@@ -89,9 +89,11 @@ def dr_cb(self, err, msg):
89
89
else :
90
90
self .dr_cnt += 1
91
91
self .dd_incr ("producer.drok" , 1 )
92
+ self .dd_gauge ("producer.latency" , msg .latency (),
93
+ tags = ["partition:{}" .format (msg .partition ())])
92
94
if (self .dr_cnt % self .disprate ) == 0 :
93
- self .logger .debug ("producer: delivered message to {} [{}] at offset {}" .format (
94
- msg .topic (), msg .partition (), msg .offset ()))
95
+ self .logger .debug ("producer: delivered message to {} [{}] at offset {} in {}s " .format (
96
+ msg .topic (), msg .partition (), msg .offset (), msg . latency () ))
95
97
96
98
def produce_record (self ):
97
99
""" Asynchronously produce a single record, but block and
@@ -236,7 +238,8 @@ def consumer_run(self):
236
238
txtime = headers .get ('time' , None )
237
239
if txtime is not None :
238
240
latency = time .time () - float (txtime )
239
- self .dd_gauge ("consumer.e2e_latency" , latency )
241
+ self .dd_gauge ("consumer.e2e_latency" , latency ,
242
+ tags = ["partition:{}" .format (msg .partition ())])
240
243
else :
241
244
latency = None
242
245
@@ -310,6 +313,25 @@ def producer_error_cb(self, err):
310
313
self .producer_error_cb_cnt += 1
311
314
self .dd_incr ("producer.errorcb" , 1 )
312
315
316
def rtt_stats(self, d):
    """ Extract broker rtt statistics from the stats dict in @param d

    For every broker entry in d['brokers'] that carries toppars, the
    broker's rtt p99 and avg values are reported as the
    "broker.rtt.p99" and "broker.rtt.avg" gauges. Each gauge is tagged
    with the broker nodeid, the comma-separated list of partitions
    found in the broker's toppars, and the client type (d['type']).

    Broker entries without toppars or without rtt stats are skipped
    rather than raising KeyError, so an incomplete stats blob does
    not raise out of the stats callback. """

    # Get leader RTT stats
    brokers = d.get('brokers') or {}
    for broker in brokers.values():
        toppars = broker.get('toppars')
        if toppars is None:
            continue

        rtt = broker.get('rtt')
        if not rtt:
            # Nothing to report for this broker.
            continue

        parts = ','.join(str(tp['partition']) for tp in toppars.values())

        tags = ["broker:{}".format(broker['nodeid']),
                "partitions:{}".format(parts),
                "type:{}".format(d['type'])]

        # NOTE(review): presumably the rtt values are in microseconds
        # and are scaled to seconds here - confirm against the
        # librdkafka statistics documentation.
        self.dd_gauge("broker.rtt.p99",
                      float(rtt['p99']) / 1000000.0, tags=tags)
        self.dd_gauge("broker.rtt.avg",
                      float(rtt['avg']) / 1000000.0, tags=tags)
334
+
313
335
def stats_cb (self , json_str ):
314
336
""" Common statistics callback. """
315
337
d = json .loads (json_str )
@@ -332,6 +354,8 @@ def stats_cb(self, json_str):
332
354
if (self .stats_cnt [d ['type' ]] % 11 ) == 0 :
333
355
self .logger .info ("{} raw stats: {}" .format (d ['name' ], json_str ))
334
356
357
+ self .rtt_stats (d )
358
+
335
359
# Sample the producer queue length
336
360
if d ['type' ] == 'producer' :
337
361
self .dd_gauge ("producer.outq" , len (self .producer ))
@@ -451,9 +475,10 @@ def dd_incr(self, metric_name, incrval):
451
475
""" Increment datadog metric counter by incrval """
452
476
self .dd .increment (self .DD_PFX + metric_name , incrval , host = self .hostname )
453
477
454
def dd_gauge(self, metric_name, val, tags=None):
    """ Report val as the current value of a datadog gauge.

    The metric name is prefixed with self.DD_PFX and the sample is
    reported for self.hostname. @param tags is handed to the datadog
    client unchanged (None when the caller supplies no tags). """
    full_name = self.DD_PFX + metric_name
    self.dd.gauge(full_name, val, host=self.hostname, tags=tags)
457
482
458
483
def calc_rusage_deltas (self , curr , prev , elapsed ):
459
484
""" Calculate deltas between previous and current resource usage """
0 commit comments