From a14dcecd8af7e8a57c71476d2d33875ee86b475f Mon Sep 17 00:00:00 2001 From: alex-ibb Date: Tue, 10 Mar 2026 00:51:09 +0000 Subject: [PATCH] fix(aio): use get_running_loop() instead of get_event_loop() in AIOConsumer --- src/confluent_kafka/aio/_AIOConsumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/confluent_kafka/aio/_AIOConsumer.py b/src/confluent_kafka/aio/_AIOConsumer.py index 0ff986a62..160e80e41 100644 --- a/src/confluent_kafka/aio/_AIOConsumer.py +++ b/src/confluent_kafka/aio/_AIOConsumer.py @@ -38,7 +38,7 @@ def __init__( raise ValueError("max_workers must be at least 1") self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() wrap_common_callbacks = _common.wrap_common_callbacks wrap_conf_callback = _common.wrap_conf_callback wrap_common_callbacks(loop, consumer_conf) @@ -103,7 +103,7 @@ def _edit_rebalance_callbacks_args(self, args: Tuple[Any, ...]) -> Tuple[Any, .. return tuple(args_list) async def subscribe(self, *args: Any, **kwargs: Any) -> Any: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() for callback in ['on_assign', 'on_revoke', 'on_lost']: if callback in kwargs: kwargs[callback] = self._wrap_callback(