19
19
# Example Admin clients.
20
20
#
21
21
22
- from confluent_kafka .admin import AdminClient , NewTopic , NewPartitions , ConfigResource , ConfigSource
22
+ from confluent_kafka .admin import (AdminClient , NewTopic , NewPartitions , ConfigResource , ConfigSource ,
23
+ AclBinding , AclBindingFilter , ResourceType , ResourcePatternType ,
24
+ AclOperation , AclPermissionType )
23
25
from confluent_kafka import KafkaException
24
26
import sys
25
27
import threading
28
30
logging .basicConfig ()
29
31
30
32
33
+ def parse_nullable_string (s ):
34
+ if s == "None" :
35
+ return None
36
+ else :
37
+ return s
38
+
39
+
31
40
def example_create_topics (a , topics ):
32
41
""" Create topics """
33
42
@@ -117,6 +126,133 @@ def example_describe_configs(a, args):
117
126
raise
118
127
119
128
129
+ def example_create_acls (a , args ):
130
+ """ create acls """
131
+
132
+ acl_bindings = [
133
+ AclBinding (
134
+ ResourceType [restype ],
135
+ parse_nullable_string (resname ),
136
+ ResourcePatternType [resource_pattern_type ],
137
+ parse_nullable_string (principal ),
138
+ parse_nullable_string (host ),
139
+ AclOperation [operation ],
140
+ AclPermissionType [permission_type ]
141
+ )
142
+ for restype , resname , resource_pattern_type ,
143
+ principal , host , operation , permission_type
144
+ in zip (
145
+ args [0 ::7 ],
146
+ args [1 ::7 ],
147
+ args [2 ::7 ],
148
+ args [3 ::7 ],
149
+ args [4 ::7 ],
150
+ args [5 ::7 ],
151
+ args [6 ::7 ],
152
+ )
153
+ ]
154
+
155
+ fs = a .create_acls (acl_bindings , request_timeout = 10 )
156
+
157
+ # Wait for operation to finish.
158
+ for res , f in fs .items ():
159
+ try :
160
+ result = f .result ()
161
+ if result is None :
162
+ print ("Created {}" .format (res ))
163
+
164
+ except KafkaException as e :
165
+ print ("Failed to create ACL {}: {}" .format (res , e ))
166
+ except Exception :
167
+ raise
168
+
169
+
170
+ def example_describe_acls (a , args ):
171
+ """ describe acls """
172
+
173
+ acl_binding_filters = [
174
+ AclBindingFilter (
175
+ ResourceType [restype ],
176
+ parse_nullable_string (resname ),
177
+ ResourcePatternType [resource_pattern_type ],
178
+ parse_nullable_string (principal ),
179
+ parse_nullable_string (host ),
180
+ AclOperation [operation ],
181
+ AclPermissionType [permission_type ]
182
+ )
183
+ for restype , resname , resource_pattern_type ,
184
+ principal , host , operation , permission_type
185
+ in zip (
186
+ args [0 ::7 ],
187
+ args [1 ::7 ],
188
+ args [2 ::7 ],
189
+ args [3 ::7 ],
190
+ args [4 ::7 ],
191
+ args [5 ::7 ],
192
+ args [6 ::7 ],
193
+ )
194
+ ]
195
+
196
+ fs = [
197
+ a .describe_acls (acl_binding_filter , request_timeout = 10 )
198
+ for acl_binding_filter in acl_binding_filters
199
+ ]
200
+ # Wait for operations to finish.
201
+ for acl_binding_filter , f in zip (acl_binding_filters , fs ):
202
+ try :
203
+ print ("Acls matching filter: {}" .format (acl_binding_filter ))
204
+ acl_bindings = f .result ()
205
+ for acl_binding in acl_bindings :
206
+ print (acl_binding )
207
+
208
+ except KafkaException as e :
209
+ print ("Failed to describe {}: {}" .format (acl_binding_filter , e ))
210
+ except Exception :
211
+ raise
212
+
213
+
214
+ def example_delete_acls (a , args ):
215
+ """ delete acls """
216
+
217
+ acl_binding_filters = [
218
+ AclBindingFilter (
219
+ ResourceType [restype ],
220
+ parse_nullable_string (resname ),
221
+ ResourcePatternType [resource_pattern_type ],
222
+ parse_nullable_string (principal ),
223
+ parse_nullable_string (host ),
224
+ AclOperation [operation ],
225
+ AclPermissionType [permission_type ]
226
+ )
227
+ for restype , resname , resource_pattern_type ,
228
+ principal , host , operation , permission_type
229
+ in zip (
230
+ args [0 ::7 ],
231
+ args [1 ::7 ],
232
+ args [2 ::7 ],
233
+ args [3 ::7 ],
234
+ args [4 ::7 ],
235
+ args [5 ::7 ],
236
+ args [6 ::7 ],
237
+ )
238
+ ]
239
+
240
+ fs = a .delete_acls (acl_binding_filters , request_timeout = 10 )
241
+
242
+ # Wait for operation to finish.
243
+ for res , f in fs .items ():
244
+ try :
245
+ acl_bindings = f .result ()
246
+ print ("Deleted acls matching filter: {}" .format (res ))
247
+ for acl_binding in acl_bindings :
248
+ print (" " , acl_binding )
249
+
250
+ except KafkaException as e :
251
+ print ("Failed to delete {}: {}" .format (res , e ))
252
+ except Exception :
253
+ raise
254
+
255
+
120
256
def example_alter_configs (a , args ):
121
257
""" Alter configs atomically, replacing non-specified
122
258
configuration properties with their default values.
@@ -300,6 +436,12 @@ def example_list(a, args):
300
436
'<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n ' )
301
437
sys .stderr .write (' delta_alter_configs <resource_type1> <resource_name1> ' +
302
438
'<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n ' )
439
+ sys .stderr .write (' create_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
440
+ '<principal1> <host1> <operation1> <permission_type1> ..\n ' )
441
+ sys .stderr .write (' describe_acls <resource_type1 <resource_name1> <resource_patter_type1> ' +
442
+ '<principal1> <host1> <operation1> <permission_type1> ..\n ' )
443
+ sys .stderr .write (' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
444
+ '<principal1> <host1> <operation1> <permission_type1> ..\n ' )
303
445
sys .stderr .write (' list [<all|topics|brokers|groups>]\n ' )
304
446
sys .exit (1 )
305
447
@@ -316,6 +458,9 @@ def example_list(a, args):
316
458
'describe_configs' : example_describe_configs ,
317
459
'alter_configs' : example_alter_configs ,
318
460
'delta_alter_configs' : example_delta_alter_configs ,
461
+ 'create_acls' : example_create_acls ,
462
+ 'describe_acls' : example_describe_acls ,
463
+ 'delete_acls' : example_delete_acls ,
319
464
'list' : example_list }
320
465
321
466
if operation not in opsmap :
0 commit comments