Skip to content

[KIP-848] Test changes required for KIP-848 Preview #1967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 17, 2025
3 changes: 2 additions & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ execution_time_limit:
global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.8.0
value: v2.10.0-RC3
prologue:
commands:
- checkout
Expand Down Expand Up @@ -208,6 +208,7 @@ blocks:
- name: Build and Tests with 'consumer' group protocol
commands:
- sem-version python 3.9
- sem-version java 17
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements-tests-install.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
-r requirements-avro.txt
-r requirements-protobuf.txt
-r requirements-json.txt
tests/trivup/trivup-0.12.7.tar.gz
tests/trivup/trivup-0.12.10.tar.gz
46 changes: 45 additions & 1 deletion tests/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,26 @@ def _trivup_cluster_type_kraft():


class TestUtils:
@staticmethod
def broker_version():
    """Return the Apache Kafka broker version to run the test cluster with.

    Broker 4.0.0 is needed for the KIP-848 `consumer` group protocol;
    the classic protocol keeps running against 3.9.0.
    """
    if TestUtils.use_group_protocol_consumer():
        return '4.0.0'
    return '3.9.0'

@staticmethod
def broker_conf():
    """Build the broker configuration list for the trivup test cluster.

    The transaction-log settings allow a single-broker cluster; when the
    KIP-848 `consumer` protocol is in use, the new group coordinator
    protocol is enabled alongside the classic one.
    """
    conf = [
        'transaction.state.log.replication.factor=1',
        'transaction.state.log.min.isr=1',
    ]
    if TestUtils.use_group_protocol_consumer():
        conf = conf + ['group.coordinator.rebalance.protocols=classic,consumer']
    return conf

@staticmethod
def _broker_major_version():
    """Return the major component of the broker version as an int."""
    major, _, _ = TestUtils.broker_version().partition('.')
    return int(major)

@staticmethod
def use_kraft():
return TestUtils.use_group_protocol_consumer() or _trivup_cluster_type_kraft()
return (TestUtils.use_group_protocol_consumer() or
_trivup_cluster_type_kraft())

@staticmethod
def use_group_protocol_consumer():
Expand All @@ -41,8 +58,35 @@ def update_conf_group_protocol(conf=None):
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
conf['group.protocol'] = 'consumer'

@staticmethod
def remove_forbidden_conf_group_protocol_consumer(conf):
    """Strip client configuration properties that are forbidden when the
    KIP-848 `consumer` group protocol is in use.

    Mutates ``conf`` in place. No-op when ``conf`` is None or when the
    classic group protocol is selected.
    """
    if conf is None:
        return
    if TestUtils.use_group_protocol_consumer():
        forbidden_conf_properties = ["session.timeout.ms",
                                     "partition.assignment.strategy",
                                     "heartbeat.interval.ms",
                                     "group.protocol.type"]
        for prop in forbidden_conf_properties:
            if prop in conf:
                # Bug fix: the original string lacked the f-prefix, so
                # "{prop}" was printed literally instead of the property name.
                print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
                del conf[prop]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a message here as in other clients



class TestConsumer(Consumer):
    """Consumer wrapper that adapts configuration and assignment calls to
    the group protocol selected for the test run (classic vs KIP-848
    `consumer`)."""

    def __init__(self, conf=None, **kwargs):
        TestUtils.update_conf_group_protocol(conf)
        TestUtils.remove_forbidden_conf_group_protocol_consumer(conf)
        super().__init__(conf, **kwargs)

    def assign(self, partitions):
        # The `consumer` protocol only supports incremental assignment.
        if not TestUtils.use_group_protocol_consumer():
            super().assign(partitions)
        else:
            super().incremental_assign(partitions)

    def unassign(self, partitions):
        # Classic protocol drops the full assignment; `consumer` protocol
        # revokes only the given partitions.
        if not TestUtils.use_group_protocol_consumer():
            super().unassign()
        else:
            super().incremental_unassign(partitions)
2 changes: 2 additions & 0 deletions tests/common/schema_registry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
class TestDeserializingConsumer(DeserializingConsumer):
    """DeserializingConsumer variant whose configuration is adjusted for
    the group protocol selected for the test run."""

    def __init__(self, conf=None, **kwargs):
        TestUtils.update_conf_group_protocol(conf)
        TestUtils.remove_forbidden_conf_group_protocol_consumer(conf)
        super().__init__(conf, **kwargs)


class TestAvroConsumer(AvroConsumer):
    """AvroConsumer variant whose configuration is adjusted for the group
    protocol selected for the test run."""

    def __init__(self, conf=None, **kwargs):
        TestUtils.update_conf_group_protocol(conf)
        TestUtils.remove_forbidden_conf_group_protocol_consumer(conf)
        super().__init__(conf, **kwargs)
3 changes: 2 additions & 1 deletion tests/integration/admin/test_basic_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
assert topic_partition.offset == 0


def test_basic_operations(kafka_cluster):

Check failure on line 202 in tests/integration/admin/test_basic_operations.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/integration/admin/test_basic_operations.py#L202

Refactor this function to reduce its Cognitive Complexity from 27 to the 15 allowed.
num_partitions = 2
topic_config = {"compression.type": "gzip"}

Expand Down Expand Up @@ -277,7 +277,8 @@
print('Read all the required messages: exiting')
break
except ConsumeError as e:
if msg is not None and e.code == KafkaError._PARTITION_EOF:
if e.code == KafkaError._PARTITION_EOF:
msg = e.kafka_message
print('Reached end of %s [%d] at offset %d' % (
msg.topic(), msg.partition(), msg.offset()))
eof_reached[(msg.topic(), msg.partition())] = True
Expand Down
20 changes: 4 additions & 16 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,26 @@
work_dir = os.path.dirname(os.path.realpath(__file__))


def _broker_conf():
broker_conf = ['transaction.state.log.replication.factor=1',
'transaction.state.log.min.isr=1']
if TestUtils.use_group_protocol_consumer():
broker_conf.append('group.coordinator.rebalance.protocols=classic,consumer')
return broker_conf


def _broker_version():
return 'trunk@3a0efa2845e6a0d237772adfe6364579af50ce18' if TestUtils.use_group_protocol_consumer() else '3.8.0'


def create_trivup_cluster(conf={}):
trivup_fixture_conf = {'with_sr': True,
'debug': True,
'cp_version': '7.6.0',
'kraft': TestUtils.use_kraft(),
'version': _broker_version(),
'broker_conf': _broker_conf()}
'version': TestUtils.broker_version(),
'broker_conf': TestUtils.broker_conf()}
trivup_fixture_conf.update(conf)
return TrivupFixture(trivup_fixture_conf)


def create_sasl_cluster(conf={}):
trivup_fixture_conf = {'with_sr': False,
'version': _broker_version(),
'version': TestUtils.broker_version(),
'sasl_mechanism': "PLAIN",
'kraft': TestUtils.use_kraft(),
'sasl_users': 'sasl_user=sasl_user',
'debug': True,
'cp_version': 'latest',
'broker_conf': _broker_conf()}
'broker_conf': TestUtils.broker_conf()}
trivup_fixture_conf.update(conf)
return TrivupFixture(trivup_fixture_conf)

Expand Down
21 changes: 19 additions & 2 deletions tests/integration/consumer/test_consumer_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from confluent_kafka.error import ConsumeError
from confluent_kafka.serialization import StringSerializer
from tests.common import TestUtils


def test_consume_error(kafka_cluster):
Expand All @@ -46,6 +47,14 @@ def test_consume_error(kafka_cluster):
"Expected _PARTITION_EOF, not {}".format(exc_info)


# Skipping the test for consumer protocol for now. Update the test to use
# IncrementalAlterConfigs Admin operation to update
# group.session.timeout.ms and enable the test again.
@pytest.mark.skipif(TestUtils.use_group_protocol_consumer(),
reason="session.timeout.ms is not supported on client side for "
"consumer protocol. Update this test to use IncrementalAlterConfigs "
"Admin operation to update group.session.timeout.ms and enable "
"the test again.")
def test_consume_error_commit(kafka_cluster):
"""
Tests to ensure that we handle messages with errors when committing.
Expand All @@ -63,13 +72,21 @@ def test_consume_error_commit(kafka_cluster):
try:
# Since the session timeout value is low, JoinGroupRequest will fail
# and we get error in a message while polling.
m = consumer.poll(1)
m = consumer.poll(2)
consumer.commit(m)
except KafkaException as e:
assert e.args[0].code() == KafkaError._INVALID_ARG, \
"Expected INVALID_ARG, not {}".format(e)


# Skipping the test for consumer protocol for now. Update the test to use
# IncrementalAlterConfigs Admin operation to update
# group.session.timeout.ms and enable the test again.
@pytest.mark.skipif(TestUtils.use_group_protocol_consumer(),
reason="session.timeout.ms is not supported on client side for "
"consumer protocol. Update this test to use IncrementalAlterConfigs "
"Admin operation to update group.session.timeout.ms and enable "
"the test again.")
def test_consume_error_store_offsets(kafka_cluster):
"""
Tests to ensure that we handle messages with errors when storing offsets.
Expand All @@ -89,7 +106,7 @@ def test_consume_error_store_offsets(kafka_cluster):
try:
# Since the session timeout value is low, JoinGroupRequest will fail
# and we get error in a message while polling.
m = consumer.poll(1)
m = consumer.poll(2)
consumer.store_offsets(m)
except KafkaException as e:
assert e.args[0].code() == KafkaError._INVALID_ARG, \
Expand Down
23 changes: 18 additions & 5 deletions tests/integration/consumer/test_consumer_memberid.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limit

import pytest
from tests.common import TestUtils


def test_consumer_memberid(kafka_cluster):
Expand All @@ -32,17 +33,29 @@ def test_consumer_memberid(kafka_cluster):
consumer = kafka_cluster.consumer(consumer_conf)

assert consumer is not None
assert consumer.memberid() is None
before_memberid = consumer.memberid()

# With implementation of KIP-1082, member id is generated on the client
# side for ConsumerGroupHeartbeat used in the `consumer` protocol
# introduced in KIP-848
if TestUtils.use_group_protocol_consumer():
assert before_memberid is not None
assert isinstance(before_memberid, str)
assert len(before_memberid) > 0
else:
assert before_memberid is None

kafka_cluster.seed_topic(topic, value_source=[b'memberid'])

consumer.subscribe([topic])
msg = consumer.poll(10)
assert msg is not None
assert msg.value() == b'memberid'
memberid = consumer.memberid()
print("Member Id is -----> " + memberid)
assert isinstance(memberid, str)
assert len(memberid) > 0
after_memberid = consumer.memberid()
assert isinstance(after_memberid, str)
assert len(after_memberid) > 0
if TestUtils.use_group_protocol_consumer():
assert before_memberid == after_memberid
consumer.close()

with pytest.raises(RuntimeError) as error_info:
Expand Down
42 changes: 21 additions & 21 deletions tests/integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ def verify_producer_performance(with_dr_cb=True):
bar.finish()

print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
(msgs_produced, bytecnt / (1024*1024), t_produce_spent,
(msgs_produced, bytecnt / (1024 * 1024), t_produce_spent,
msgs_produced / t_produce_spent,
(bytecnt/t_produce_spent) / (1024*1024)))
(bytecnt / t_produce_spent) / (1024 * 1024)))
print('# %d temporary produce() failures due to backpressure (local queue full)' % msgs_backpressure)

print('waiting for %d/%d deliveries' % (len(p), msgs_produced))
Expand All @@ -344,9 +344,9 @@ def verify_producer_performance(with_dr_cb=True):
t_delivery_spent = time.time() - t_produce_start

print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
(msgs_produced, bytecnt / (1024*1024), t_produce_spent,
(msgs_produced, bytecnt / (1024 * 1024), t_produce_spent,
msgs_produced / t_produce_spent,
(bytecnt/t_produce_spent) / (1024*1024)))
(bytecnt / t_produce_spent) / (1024 * 1024)))

# Fake numbers if not using a dr_cb
if not with_dr_cb:
Expand All @@ -355,9 +355,9 @@ def verify_producer_performance(with_dr_cb=True):
dr.bytes_delivered = bytecnt

print('# delivering %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
(dr.msgs_delivered, dr.bytes_delivered / (1024*1024), t_delivery_spent,
(dr.msgs_delivered, dr.bytes_delivered / (1024 * 1024), t_delivery_spent,
dr.msgs_delivered / t_delivery_spent,
(dr.bytes_delivered/t_delivery_spent) / (1024*1024)))
(dr.bytes_delivered / t_delivery_spent) / (1024 * 1024)))
print('# post-produce delivery wait took %.3fs' %
(t_delivery_spent - t_produce_spent))

Expand Down Expand Up @@ -447,7 +447,7 @@ def print_wmark(consumer, topic_parts):
elif (msg.offset() % 4) == 0:
offsets = c.commit(msg, asynchronous=False)
assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
assert offsets[0].offset == msg.offset()+1, \
assert offsets[0].offset == msg.offset() + 1, \
'expected offset %d to be committed, not %s' % \
(msg.offset(), offsets)
print('Sync committed offset: %s' % offsets)
Expand Down Expand Up @@ -515,7 +515,7 @@ def my_on_revoke(consumer, partitions):
print('on_revoke:', len(partitions), 'partitions:')
for p in partitions:
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
consumer.unassign()
consumer.unassign(partitions)

c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)

Expand Down Expand Up @@ -559,8 +559,8 @@ def my_on_revoke(consumer, partitions):
if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))
(msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024 * 1024)))

print('closing consumer')
c.close()
Expand Down Expand Up @@ -590,11 +590,11 @@ def verify_consumer_seek(c, seek_to_msg):
print('seek: message at offset %d (epoch %d)' %
(msg.offset(), msg.leader_epoch()))
assert msg.offset() == seek_to_msg.offset() and \
msg.leader_epoch() == seek_to_msg.leader_epoch(), \
('expected message at offset %d (epoch %d), ' % (seek_to_msg.offset(),
seek_to_msg.leader_epoch())) + \
('not %d (epoch %d)' % (msg.offset(),
msg.leader_epoch()))
msg.leader_epoch() == seek_to_msg.leader_epoch(), \
('expected message at offset %d (epoch %d), ' % (seek_to_msg.offset(),
seek_to_msg.leader_epoch())) + \
('not %d (epoch %d)' % (msg.offset(),
msg.leader_epoch()))
break


Expand Down Expand Up @@ -643,7 +643,7 @@ def verify_batch_consumer():
elif (msg.offset() % 4) == 0:
offsets = c.commit(msg, asynchronous=False)
assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
assert offsets[0].offset == msg.offset()+1, \
assert offsets[0].offset == msg.offset() + 1, \
'expected offset %d to be committed, not %s' % \
(msg.offset(), offsets)
print('Sync committed offset: %s' % offsets)
Expand Down Expand Up @@ -697,7 +697,7 @@ def my_on_revoke(consumer, partitions):
print('on_revoke:', len(partitions), 'partitions:')
for p in partitions:
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
consumer.unassign()
consumer.unassign(partitions)

c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)

Expand Down Expand Up @@ -738,8 +738,8 @@ def my_on_revoke(consumer, partitions):
if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))
(msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024 * 1024)))

print('closing consumer')
c.close()
Expand Down Expand Up @@ -1035,8 +1035,8 @@ def stats_cb(stats_json_str):
if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))
(msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024 * 1024)))

print('closing consumer')
c.close()
Expand Down
Loading