diff --git a/src/confluent_kafka/aio/_AIOConsumer.py b/src/confluent_kafka/aio/_AIOConsumer.py index fb0157070..4fe446968 100644 --- a/src/confluent_kafka/aio/_AIOConsumer.py +++ b/src/confluent_kafka/aio/_AIOConsumer.py @@ -41,7 +41,7 @@ def __init__( raise ValueError("max_workers must be at least 1") self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() wrap_common_callbacks = _common.wrap_common_callbacks wrap_conf_callback = _common.wrap_conf_callback wrap_common_callbacks(loop, consumer_conf) @@ -112,7 +112,7 @@ def _edit_rebalance_callbacks_args(self, args: Tuple[Any, ...]) -> Tuple[Any, .. return tuple(args_list) async def subscribe(self, *args: Any, **kwargs: Any) -> Any: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() for callback in ['on_assign', 'on_revoke', 'on_lost']: if callback in kwargs: kwargs[callback] = self._wrap_callback(