21
21
22
22
from confluent_kafka .error import ConsumeError
23
23
from confluent_kafka .serialization import StringSerializer
24
+ from tests .common import TestUtils
24
25
25
26
26
27
def test_consume_error (kafka_cluster ):
@@ -46,6 +47,14 @@ def test_consume_error(kafka_cluster):
46
47
"Expected _PARTITION_EOF, not {}" .format (exc_info )
47
48
48
49
50
+ # Skipping the test for consumer protocol for now. Update the test to use
51
+ # IncrementalAlterConfigs Admin operation to update
52
+ # group.session.timeout.ms and enable the test again.
53
+ @pytest .mark .skipif (TestUtils .use_group_protocol_consumer (),
54
+ reason = "session.timeout.ms is not supported on client side for "
55
+ "consumer protocol. Update this test to use IncrementalAlterConfigs "
56
+ "Admin operation to update group.session.timeout.ms and enable "
57
+ "the test again." )
49
58
def test_consume_error_commit (kafka_cluster ):
50
59
"""
51
60
Tests to ensure that we handle messages with errors when commiting.
@@ -63,13 +72,21 @@ def test_consume_error_commit(kafka_cluster):
63
72
try :
64
73
# Since the session timeout value is low, JoinGroupRequest will fail
65
74
# and we get error in a message while polling.
66
- m = consumer .poll (1 )
75
+ m = consumer .poll (2 )
67
76
consumer .commit (m )
68
77
except KafkaException as e :
69
78
assert e .args [0 ].code () == KafkaError ._INVALID_ARG , \
70
79
"Expected INVALID_ARG, not {}" .format (e )
71
80
72
81
82
+ # Skipping the test for consumer protocol for now. Update the test to use
83
+ # IncrementalAlterConfigs Admin operation to update
84
+ # group.session.timeout.ms and enable the test again.
85
+ @pytest .mark .skipif (TestUtils .use_group_protocol_consumer (),
86
+ reason = "session.timeout.ms is not supported on client side for "
87
+ "consumer protocol. Update this test to use IncrementalAlterConfigs "
88
+ "Admin operation to update group.session.timeout.ms and enable "
89
+ "the test again." )
73
90
def test_consume_error_store_offsets (kafka_cluster ):
74
91
"""
75
92
Tests to ensure that we handle messages with errors when storing offsets.
@@ -89,7 +106,7 @@ def test_consume_error_store_offsets(kafka_cluster):
89
106
try :
90
107
# Since the session timeout value is low, JoinGroupRequest will fail
91
108
# and we get error in a message while polling.
92
- m = consumer .poll (1 )
109
+ m = consumer .poll (2 )
93
110
consumer .store_offsets (m )
94
111
except KafkaException as e :
95
112
assert e .args [0 ].code () == KafkaError ._INVALID_ARG , \
0 commit comments