Skip to content

Commit 3785b1b

Browse files
authored
KIP 848:Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config (#1856)
* add integration test to check for added group support in config apis * Add enum Group Config * Add enum CONFIG_SOURCE_GROUP_CONFIG to init.py * style fix * add TODO comment in test
1 parent 4e39f6a commit 3785b1b

File tree

4 files changed

+39
-0
lines changed

4 files changed

+39
-0
lines changed

src/confluent_kafka/admin/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG,
7373
CONFIG_SOURCE_STATIC_BROKER_CONFIG,
7474
CONFIG_SOURCE_DEFAULT_CONFIG,
75+
CONFIG_SOURCE_GROUP_CONFIG,
7576
RESOURCE_UNKNOWN,
7677
RESOURCE_ANY,
7778
RESOURCE_TOPIC,

src/confluent_kafka/admin/_config.py

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class ConfigSource(Enum):
5656
DYNAMIC_DEFAULT_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG #: Dynamic Default Broker
5757
STATIC_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_STATIC_BROKER_CONFIG #: Static Broker
5858
DEFAULT_CONFIG = _cimpl.CONFIG_SOURCE_DEFAULT_CONFIG #: Default
59+
GROUP_CONFIG = _cimpl.CONFIG_SOURCE_GROUP_CONFIG #: Group
5960

6061

6162
class ConfigEntry(object):

src/confluent_kafka/src/AdminTypes.c

+2
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,8 @@ static void AdminTypes_AddObjectsConfigSource (PyObject *m) {
514514
RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG);
515515
PyModule_AddIntConstant(m, "CONFIG_SOURCE_DEFAULT_CONFIG",
516516
RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG);
517+
PyModule_AddIntConstant(m, "CONFIG_SOURCE_GROUP_CONFIG",
518+
RD_KAFKA_CONFIG_SOURCE_GROUP_CONFIG);
517519
}
518520

519521

tests/integration/admin/test_incremental_alter_configs.py

+35
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
ConfigEntry, ResourceType, \
2020
AlterConfigOpType
2121

22+
from tests.common import TestUtils
23+
2224

2325
def assert_expected_config_entries(fs, num_fs, expected):
2426
"""
@@ -147,3 +149,36 @@ def test_incremental_alter_configs(kafka_cluster):
147149

148150
# Assert expected config entries.
149151
assert_expected_config_entries(fs, 1, expected)
152+
153+
# TODO: enable this test for the classic run too, when
154+
# Confluent Platform test cluster is upgraded to 8.0.0
155+
if TestUtils.use_group_protocol_consumer():
156+
group_id = "test-group"
157+
158+
res_group = ConfigResource(
159+
ResourceType.GROUP,
160+
group_id,
161+
incremental_configs=[
162+
ConfigEntry("consumer.session.timeout.ms", "50000",
163+
incremental_operation=AlterConfigOpType.SET)
164+
]
165+
)
166+
167+
expected[res_group] = ['consumer.session.timeout.ms="50000"']
168+
169+
#
170+
# Incrementally alter some configuration values
171+
#
172+
fs = admin_client.incremental_alter_configs([res_group])
173+
174+
assert_operation_succeeded(fs, 1)
175+
176+
time.sleep(1)
177+
178+
#
179+
# Get current group config
180+
#
181+
fs = admin_client.describe_configs([res_group])
182+
183+
# Assert expected config entries.
184+
assert_expected_config_entries(fs, 1, expected)

0 commit comments

Comments
 (0)