AIOKafkaConsumer.getone infinite loop #712

Comments
I got the same bug. I did some code review and found the point where the error message is emitted:

```python
except (OSError, asyncio.TimeoutError, KafkaError) as err:
    log.error('Unable connect to node with id %s: %s', node_id, err)
    if group == ConnectionGroup.DEFAULT:
        # Connection failures imply that our metadata is stale, so
        # let's refresh
        self.force_metadata_update()  # <-- doing nothing
        return None
    else:
```

So I think the problem is in this line of `force_metadata_update`:

```python
def force_metadata_update(self):
    # . . .
    return asyncio.shield(self._md_update_fut)
```

...which means that a simple, unawaited call of this function does nothing. Moreover, I looked for other callers of `force_metadata_update` and found the same pattern in `_on_connection_closed`:

```python
def _on_connection_closed(self, conn, reason):
    """ Callback called when connection is closed
    """
    # Connection failures imply that our metadata is stale, so let's
    # refresh
    if reason == CloseReason.CONNECTION_BROKEN or \
            reason == CloseReason.CONNECTION_TIMEOUT:
        self.force_metadata_update()  # <-- doing nothing
```
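(For illustration: a minimal, self-contained sketch — not aiokafka code, names made up — of the pattern in question. A plain function returns a shielded future; calling it without `await` means the caller does not wait for the refresh, although work scheduled behind that future can still complete on its own.)

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def force_update():
        # Schedule something that resolves the future a bit later and hand
        # back a shielded view of it, similar in shape to force_metadata_update().
        loop.call_later(0.1, fut.set_result, "updated")
        return asyncio.shield(fut)

    force_update()           # not awaited: execution continues immediately
    print(fut.done())        # False -> the caller did not wait for the refresh
    await asyncio.sleep(0.2)
    print(fut.result())      # "updated" -> the scheduled work still completed

asyncio.run(main())
```

---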
OK! Here is my workaround:

```python
import logging as m_log
import logging.handlers as m_logh  # WatchedFileHandler lives in logging.handlers
import sys as m_sys
import io as m_io
import os as m_os
import asyncio as m_aio
import aiokafka as m_ak
import signal as m_sig

gv_log: m_log.Logger = m_log.getLogger(__name__)


class MyConsumerCommitter:
    """Tracks consumed offsets per partition in a local text file."""

    def __init__(self, iv_consumer: m_ak.AIOKafkaConsumer, i_cache_file: str):
        self.l_partition = dict()
        self._consumer = iv_consumer
        self._cache_file = i_cache_file
        self._changed = False

    def _save(self) -> None:
        with m_io.open(self._cache_file, mode='r', encoding='utf-8', newline='\n') as v_rd:
            l_line_new: list = []
            l_partition_new: dict = self.l_partition.copy()
            for v_line in v_rd:
                l_line_parsed = v_line.split(maxsplit=2)
                v_tp_committed = m_ak.TopicPartition(l_line_parsed[2], int(l_line_parsed[1]))
                if v_tp_committed in self.l_partition:
                    l_line_parsed[0] = str(self.l_partition[v_tp_committed])
                    del l_partition_new[v_tp_committed]
                l_line_new.append(str(' ').join(l_line_parsed))
            l_line_new += \
                [f'{v_offset} {v_tp_committed.partition} {v_tp_committed.topic}'
                 for v_tp_committed, v_offset in l_partition_new.items()]
        with m_io.open(self._cache_file, mode='w', encoding='utf-8', newline='\n') as v_wr:
            v_wr.writelines(l_line_new)

    def _load(self) -> None:
        with m_io.open(self._cache_file, mode='r', encoding='utf-8', newline='\n') as v_rd:
            for v_line in v_rd:
                l_line_parsed = v_line.split(maxsplit=2)
                v_tp_committed = m_ak.TopicPartition(l_line_parsed[2], int(l_line_parsed[1]))
                if v_tp_committed in self.l_partition:
                    self.l_partition[v_tp_committed] = int(l_line_parsed[0])

    def partition_assign(self, il_partition) -> None:
        gv_log.info(f'Rebalancer: assigned {len(il_partition)}, {il_partition}')
        self._changed = True
        for v_tp in il_partition:
            if v_tp not in self.l_partition:
                self.l_partition[v_tp] = 0
        self._load()
        for v_tp in il_partition:
            self._consumer.seek(v_tp, self.l_partition[v_tp] + 1)

    def partition_revoke(self, il_partition) -> None:
        gv_log.info(f'Rebalancer: revoked {len(il_partition)}, {il_partition}')
        self._changed = True
        for v_tp in il_partition:
            del self.l_partition[v_tp]

    def is_changed(self) -> bool:
        # Returns True exactly once after a rebalance flagged a change.
        if self._changed:
            self._changed = False
            return not self._changed
        else:
            return self._changed

    def offset_set(self, iv_topic, iv_partition, iv_offset) -> None:
        self.l_partition[m_ak.TopicPartition(iv_topic, iv_partition)] = iv_offset
        self._save()

    def offset_get(self, iv_topic, iv_partition) -> int:
        return self.l_partition[m_ak.TopicPartition(iv_topic, iv_partition)]


gv_running: bool = True


def on_signal_exit():
    gv_log.info("on signal to exit")
    global gv_running
    gv_running = False


async def consume():
    global gv_running
    v_loop = m_aio.get_running_loop()
    v_loop.add_signal_handler(m_sig.SIGTERM, on_signal_exit)
    v_loop.add_signal_handler(m_sig.SIGINT, on_signal_exit)
    gv_log.info(m_sys.argv)
    v_kc = m_ak.AIOKafkaConsumer(
        bootstrap_servers=['kafka1.tz.local:9092']
        , group_id='user1'
        , security_protocol='SASL_PLAINTEXT'
        , sasl_mechanism='PLAIN'
        , sasl_plain_username='user1'
        , sasl_plain_password='mUATUysl9BxpbdI'
        , auto_offset_reset='earliest'
        , enable_auto_commit=False)
    try:
        gv_log.info(f'{__file__} Begin')
        v_co_cache = MyConsumerCommitter(v_kc, m_os.path.splitext(__file__)[0] + '.commit.txt')
        gv_log.debug(f'Kafka: +start')
        await v_kc.start()
        gv_log.debug(f'Kafka: -start')
        v_kc.subscribe(topics=['simple-01'],
            listener=type('', (m_ak.ConsumerRebalanceListener,), {
                'on_partitions_assigned': lambda self, assigned: v_co_cache.partition_assign(assigned),
                'on_partitions_revoked': lambda self, revoked: v_co_cache.partition_revoke(revoked)
            })()
        )
        l_end_offset: dict = {}
        gv_log.info(f'Init done')
        if v_co_cache.is_changed():
            gv_log.info(f'Kafka: on rebalance [early]')
            for v_tp, v_offs in v_co_cache.l_partition.items():
                v_kc.seek(v_tp, v_offs + 1)
        v_cnt: int = 10
        while gv_running and v_cnt:
            gv_log.debug(f'+loop')
            v_cnt -= 1
            gv_log.debug(f'Kafka: +getone')
            v_msg = await m_aio.wait_for(v_kc.getone(), timeout=60)
            gv_log.debug(f'Kafka: -getone')
            if v_co_cache.is_changed():
                gv_log.info(f'Kafka: on rebalance')
                for v_tp, v_offs in v_co_cache.l_partition.items():
                    gv_log.debug(f'Kafka: seek')
                    v_kc.seek(v_tp, v_offs + 1)
                continue
            gv_log.info(f': consumed {v_msg.partition}:{v_msg.offset}')
            with m_io.open(m_os.path.splitext(__file__)[0] + f'.{v_msg.offset % 10}.msg', mode='w+b') as v_wr:
                v_wr.write(bytes(f'{len(v_msg.key)}\n', encoding='utf-8') + v_msg.key)
                v_wr.write(bytes(f'\n{len(v_msg.value)}\n', encoding='utf-8') + v_msg.value)
            v_co_cache.offset_set('simple-01', v_msg.partition, v_msg.offset)
            await m_aio.sleep(1)
        gv_log.debug(f': -loop [{gv_running}]')
    except BaseException as x:
        gv_log.exception(x.__repr__())
        raise
    finally:
        try:
            gv_log.info(f'Kafka: +stop')
            await m_aio.wait_for(v_kc.stop(), timeout=60)
        finally:
            gv_log.info(f'Kafka: -stop')
            gv_log.info(f'End')


m_log.basicConfig(
    level=m_log.DEBUG,
    handlers=(m_logh.WatchedFileHandler(m_os.extsep.join(__file__.split(m_os.extsep)[:-1] + ['log']), mode='w'),))
m_aio.run(consume())
```

I also updated the source code at aiokafka/client.py:458 from:

```python
except (OSError, asyncio.TimeoutError, KafkaError) as err:
    log.error('Unable connect to node with id %s: %s', node_id, err)
    if group == ConnectionGroup.DEFAULT:
        # Connection failures imply that our metadata is stale, so
        # let's refresh
        self.force_metadata_update()  # <--
        return None
```

to:

```python
except (OSError, asyncio.TimeoutError, KafkaError) as err:
    log.error('Unable connect to node with id %s: %s', node_id, err)
    if group == ConnectionGroup.DEFAULT:
        # Connection failures imply that our metadata is stale, so
        # let's refresh
        await self.force_metadata_update()
        return None
```

If I don't, I get the infinite loop.

---
If I'm not mistaken, there is a similar unawaited function call here 😞

---
@TEH30P hi. I'm doing research related to zombie consumers after network problems, and maybe your issue is related to it. I tried to use your snippet to reproduce it, but everything works fine; after 60 seconds I got:

Could you explain how you simulate "network was down during"?

About "which means that simple call of this function doing nothing" — that is not actually true. In

```python
def force_metadata_update(self):
    if self._md_update_fut is None:
        if not self._md_update_waiter.done():
            self._md_update_waiter.set_result(None)
        self._md_update_fut = self._loop.create_future()
```

this attribute is used in

```python
if self._sync_task is None:
    # starting metadata synchronizer task
    self._sync_task = create_task(self._md_synchronizer())
```

```python
async def _md_synchronizer(self):
    while True:
        await asyncio.wait(
            [self._md_update_waiter],
            timeout=self._metadata_max_age_ms / 1000)
        ret = await self._metadata_update(self.cluster, topics)
```

so, this function will run.
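(One possible way to simulate the "network was down" condition asked about above: run a small TCP proxy between the consumer and the broker and stop it mid-poll. This is only a sketch; the listen port and broker address are assumptions, and a firewall rule that silently drops packets would model a real outage more closely than an abrupt close.)

```python
import asyncio

LISTEN_PORT = 19092                    # point bootstrap_servers at 127.0.0.1:19092
BROKER = ('kafka1.tz.local', 9092)     # real broker address (assumed)

async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Copy bytes in one direction until the source side closes.
    try:
        while True:
            data = await reader.read(65536)
            if not data:
                break
            writer.write(data)
            await writer.drain()
    finally:
        writer.close()

async def handle(client_reader, client_writer):
    broker_reader, broker_writer = await asyncio.open_connection(*BROKER)
    # Forward traffic in both directions for this connection.
    await asyncio.gather(
        pipe(client_reader, broker_writer),
        pipe(broker_reader, client_writer),
    )

async def main():
    server = await asyncio.start_server(handle, '127.0.0.1', LISTEN_PORT)
    async with server:
        # Ctrl-C while the consumer is polling to break its connection.
        await server.serve_forever()

asyncio.run(main())
```

---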
```python
def force_metadata_update(self):
    """Update cluster metadata
    Returns:
        True/False - metadata updated or not
    """
    if self._md_update_fut is None:
        # Wake up the `_md_synchronizer` task
        if not self._md_update_waiter.done():
            self._md_update_waiter.set_result(None)  # <----------- this is not nothing, it wakes a task to update metadata
        self._md_update_fut = self._loop.create_future()
    # Metadata will be updated in the background by syncronizer
    return asyncio.shield(self._md_update_fut)
```

Not awaiting it should not matter. See the following code:

```python
async def _md_synchronizer(self):
    """routine (async task) for synchronize cluster metadata every
    `metadata_max_age_ms` milliseconds"""
    while True:
        await asyncio.wait(
            [self._md_update_waiter],
            timeout=self._metadata_max_age_ms / 1000)  # <---- this should wake periodically

        topics = self._topics
        if self._md_update_fut is None:
            self._md_update_fut = create_future()  # <--- this should ensure next call to force_metadata_update() wakes us, whether it is awaited or not

        ret = await self._metadata_update(self.cluster, topics)  # <---- this should update metadata
        # If list of topics changed during metadata update we must update
        # it again right away.
        if topics != self._topics:
            continue
        # Earlier this waiter was set before sending metadata_request,
        # but that was to avoid topic list changes being unnoticed, which
        # is handled explicitly now.
        self._md_update_waiter = create_future()

        self._md_update_fut.set_result(ret)
        self._md_update_fut = None
```

Hosts should eventually get shuffled, as I understand it. Can you still repro this? If you Ctrl-C when…
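(A runnable toy model — an assumed simplification, not aiokafka's real client — of the waiter/synchronizer interaction described above. `force_update()` is a plain function, yet calling it without `await` still wakes the background task, because all it has to do is resolve the waiter future.)

```python
import asyncio


class ToyClient:
    """Toy stand-in for the client's metadata machinery."""

    def __init__(self) -> None:
        loop = asyncio.get_running_loop()
        self._md_update_waiter = loop.create_future()
        self._md_update_fut = None
        self.updates = 0

    def force_update(self):
        # Plain (non-async) call: it only wakes the synchronizer by
        # resolving the waiter; awaiting the returned future would merely
        # wait for the refresh to finish.
        if self._md_update_fut is None:
            if not self._md_update_waiter.done():
                self._md_update_waiter.set_result(None)
            self._md_update_fut = asyncio.get_running_loop().create_future()
        return asyncio.shield(self._md_update_fut)

    async def synchronizer(self, max_age: float = 5.0) -> None:
        while True:
            # Wakes on the periodic timeout or when force_update() resolves
            # the waiter, whichever comes first.
            await asyncio.wait([self._md_update_waiter], timeout=max_age)
            if self._md_update_fut is None:
                self._md_update_fut = asyncio.get_running_loop().create_future()
            self.updates += 1  # stands in for the real metadata request
            self._md_update_waiter = asyncio.get_running_loop().create_future()
            self._md_update_fut.set_result(True)
            self._md_update_fut = None


async def main() -> None:
    client = ToyClient()
    sync_task = asyncio.create_task(client.synchronizer())
    await asyncio.sleep(0)     # let the synchronizer start waiting
    client.force_update()      # NOT awaited
    await asyncio.sleep(0.1)
    print('updates performed:', client.updates)  # 1 - the refresh still ran
    sync_task.cancel()


asyncio.run(main())
```

---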
Not sure if I am reproducing this issue (could it be the cause of #625?), but I will try the following approach to wrap the hanging call:

```python
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
_log = logging.getLogger()

async def task() -> None:
    loop = asyncio.get_running_loop()
    forever = loop.create_future()
    await forever  # simulating a hang

async def main() -> None:
    t = asyncio.create_task(task())  # imagine this is getone()
    try:
        await asyncio.wait_for(t, timeout=2)  # set your timeout
    except asyncio.TimeoutError:
        t.cancel()
        try:
            await t
        except BaseException:
            _log.exception('backtrace for getone() when it timed out')

asyncio.run(main())
```
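(Applied to the consumer, the same pattern could be packaged as a helper like the sketch below; `AIOKafkaConsumer.getone()` is real, but the helper name and the 60-second default are assumptions.)

```python
import asyncio
import logging

_log = logging.getLogger(__name__)


async def getone_with_timeout(consumer, timeout: float = 60.0):
    """Await consumer.getone(), but turn a hang into asyncio.TimeoutError
    and log where the call was stuck instead of blocking forever."""
    task = asyncio.create_task(consumer.getone())
    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        task.cancel()
        try:
            await task
        except BaseException:
            _log.exception('getone() stack at the moment it timed out')
        raise
```

---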
I'm having a similar error. Our Kafka cluster is set up in GCP by Confluent Cloud; we notice occasional "Unable connect to node with id X" errors, but today we ran into the infinite loop of those until our process was restarted.

Notice that the first error log is missing the stringified error (lines 487 to 489 in 7544add).

It lasted 10 hours and produced 915k errors. We're using…

---
It's certainly a bug, and it's easy to fix.

---
Looks like this is still an issue.

---
Describe the bug

If the network was down during `await AIOKafkaConsumer.getone`, the `await` will never get done. Even wrapping in `asyncio.wait_for` doesn't help.

Expected behaviour

Some kind of exception should happen.
Environment (please complete the following information):

- `python -c "import aiokafka; print(aiokafka.__version__)"`: 0.7.0
- `python -c "import kafka; print(kafka.__version__)"`: 2.0.2
- `kafka-topics.sh --version`:

Reproducible example

Part of the log with the messages that were created at the start of the infinite loop: