Attempting to call list_topics() on a closed Kafka Consumer object results in a segmentation fault. #1942

Open
prashant-kumar-27-06 opened this issue Mar 13, 2025 · 1 comment

Comments

prashant-kumar-27-06 commented Mar 13, 2025

Description:

Calling list_topics() on a Kafka Consumer object after close() has been invoked causes a segmentation fault. The Python interpreter crashes and takes the terminal session down with it. Other methods called on the closed consumer, such as assignment(), raise RuntimeError: Consumer closed instead.

Steps to Reproduce:

  1. Create and configure a Kafka Consumer object.
  2. Call the close() method on the Consumer object.
  3. Attempt to call list_topics() on the closed Consumer object.
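
The steps above can be sketched as a standalone script that runs the reproduction in a child interpreter, so a crash does not take down the calling session. This is a minimal sketch under assumptions: the broker address `localhost:9092`, the group id `repro-group`, and the helper names `run_isolated` and `describe` are placeholders that do not come from the original report, and whether the crash also occurs without a reachable broker has not been verified here.

```python
import signal
import subprocess
import sys

# Hypothetical minimal reproduction. The broker address and group id are
# placeholders, not values from the original report.
REPRO = """
from confluent_kafka import Consumer

consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "repro-group"})
consumer.close()
consumer.list_topics(timeout=5)  # reported to segfault on a closed consumer
"""


def run_isolated(code, timeout=60):
    """Run `code` in a fresh interpreter so a crash cannot kill the caller."""
    return subprocess.run(
        [sys.executable, "-X", "faulthandler", "-c", code],
        capture_output=True,
        text=True,
        timeout=timeout,
    )


def describe(returncode):
    """Turn a subprocess return code into a short description.

    On POSIX, a negative return code means the child was killed by that signal.
    """
    if returncode < 0:
        return f"killed by {signal.Signals(-returncode).name}"
    return f"exited with code {returncode}"
```

Calling `run_isolated(REPRO)` and passing its `returncode` to `describe` should then report either a signal (if the child crashed) or a normal exit code (if the library raised an exception instead), with the faulthandler output available in `stderr`.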

Observed Behavior:

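The Python process crashes with a segmentation fault instead of raising an exception. The following IPython session shows the crash (the helpers get_kafka_server_host_port, deep_merge, and get_sasl_kerberos_config are not defined in the session as posted):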

In [2]: from confluent_kafka import Consumer as ConfluentKafkaConsumer

In [5]: from datetime import datetime, timedelta

In [6]: base_prefix = "testkafka.kuprasha.testing.confluent.upgrade."

In [7]: def get_a_group_name():
   ...:     return f"{base_prefix}{datetime.now().strftime('%Y%m%d%H%M%S')}.group"
   ...: 

In [8]: def on_commit(err, topics_partitions):
   ...:     if err:
   ...:         print(f"ERROR _ON_COMMIT: {err}")
   ...:     else:
   ...:         print(f"Offset commit succeeded: {topics_partitions}")
   ...: 
   ...: def on_error(err):
   ...:     print(f"Encountered error {err.name()}: {err}")
   ...: 

In [10]: my_server = ",".join(get_kafka_server_host_port('qa'))

In [11]: configs = {'bootstrap.servers': my_server,
    ...:            'group.id': get_a_group_name(),
    ...:            'on_commit': on_commit,
    ...:            'error_cb': on_error,
    ...:            'auto.offset.reset': 'earliest',
    ...:            "enable.auto.commit": False,
    ...:           }
    ...: configs = deep_merge(get_sasl_kerberos_config(), configs)
    ...: 
    ...: my_consumer = ConfluentKafkaConsumer(configs)

In [12]: my_consumer.assignment()
Out[12]: []

In [13]: topic_info = my_consumer.list_topics()

In [14]: type(topic_info)
Out[14]: confluent_kafka.admin._metadata.ClusterMetadata

In [15]: my_consumer.close()

In [16]: my_consumer.assignment()
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[16], line 1
----> 1 my_consumer.assignment()

RuntimeError: Consumer closed

In [17]: my_consumer.list_topics()
Fatal Python error: Segmentation fault

Thread 0x00007f5c78f8e700 (most recent call first):
  File "/usr/local/python/python-3.11/std/lib64/python3.11/site-packages/traitlets/traitlets.py", line 0 in get
  File "kwargs", line 0 in sig
Segmentation fault (core dumped)

Impact:

This issue results in the abrupt termination of the application and loss of any unsaved work in the IPython environment.
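
Until the library rejects this call itself, one possible workaround is to wrap the consumer so that nothing reaches it after close(). The sketch below is an assumption-laden illustration, not part of confluent_kafka: `ClosedGuard` is a hypothetical name, and the error message simply mirrors the `RuntimeError: Consumer closed` that assignment() raises in the session above.

```python
class ClosedGuard:
    """Wrap a consumer-like object and refuse every call made after close().

    Hypothetical helper, not part of confluent_kafka.
    """

    def __init__(self, consumer):
        self._consumer = consumer
        self._closed = False

    def close(self):
        # Close the wrapped consumer once; later close() calls are no-ops.
        if not self._closed:
            self._closed = True
            self._consumer.close()

    def __getattr__(self, name):
        # Only reached for attributes not defined on the wrapper itself,
        # i.e. everything that would be forwarded to the wrapped consumer.
        if self._closed:
            raise RuntimeError("Consumer closed")
        return getattr(self._consumer, name)
```

With `my_consumer = ClosedGuard(ConfluentKafkaConsumer(configs))`, a call to `my_consumer.list_topics()` after `my_consumer.close()` would raise a Python exception in the wrapper and never reach the native code.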

@githubyb

I ran into a similar problem: when the Kafka server is stopped for a while and then restarted, the Python interpreter crashes. It does not happen every time, and I have not identified the cause.
