Skip to content

Commit 5ab613e

Browse files
committed
Added Message.latency() to retrieve the per-message produce latency
1 parent a149304 commit 5ab613e

File tree

4 files changed

+32
-5
lines changed

4 files changed

+32
-5
lines changed

src/confluent_kafka/src/confluent_kafka.c

+23
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,12 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
483483
self->timestamp);
484484
}
485485

486+
static PyObject *Message_latency (Message *self, PyObject *ignore) {
487+
if (self->latency == -1)
488+
Py_RETURN_NONE;
489+
return PyFloat_FromDouble((double)self->latency / 1000000.0);
490+
}
491+
486492
static PyObject *Message_headers (Message *self, PyObject *ignore) {
487493
#ifdef RD_KAFKA_V_HEADERS
488494
if (self->headers) {
@@ -587,6 +593,18 @@ static PyMethodDef Message_methods[] = {
587593
" :rtype: (int, int)\n"
588594
"\n"
589595
},
596+
{ "latency", (PyCFunction)Message_latency, METH_NOARGS,
597+
"Retrieve the time it took to produce the message, from calling "
598+
"produce() to the time the acknowledgement was received from "
599+
"the broker.\n"
600+
"Must only be used with the producer for successfully produced "
601+
"messages.\n"
602+
"\n"
603+
" :returns: latency as float seconds, or None if latency "
604+
"information is not available (such as for errored messages).\n"
605+
" :rtype: float or None\n"
606+
"\n"
607+
},
590608
{ "headers", (PyCFunction)Message_headers, METH_NOARGS,
591609
" Retrieve the headers set on a message. Each header is a key value"
592610
"pair. Please note that header keys are ordered and can repeat.\n"
@@ -769,6 +787,11 @@ PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
769787

770788
self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);
771789

790+
if (handle->type == RD_KAFKA_PRODUCER)
791+
self->latency = rd_kafka_message_latency(rkm);
792+
else
793+
self->latency = -1;
794+
772795
return (PyObject *)self;
773796
}
774797

src/confluent_kafka/src/confluent_kafka.h

+1
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ typedef struct {
417417
int64_t offset;
418418
int64_t timestamp;
419419
rd_kafka_timestamp_type_t tstype;
420+
int64_t latency; /**< Producer: time it took to produce message */
420421
} Message;
421422

422423
extern PyTypeObject MessageType;

tests/integration/integration_test.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,9 @@ def _delivery(err, msg, silent=False):
135135
return 0
136136
else:
137137
if not silent:
138-
print('Message delivered to %s [%s] at offset [%s]: %s' %
139-
(msg.topic(), msg.partition(), msg.offset(), msg.value()))
138+
print('Message delivered to %s [%s] at offset [%s] in %.3fs: %s' %
139+
(msg.topic(), msg.partition(), msg.offset(),
140+
msg.latency(), msg.value()))
140141
return 1
141142

142143
def delivery(self, err, msg):
@@ -145,8 +146,9 @@ def delivery(self, err, msg):
145146
(msg.topic(), str(msg.partition()), err))
146147
return
147148
elif not self.silent:
148-
print('Message delivered to %s [%s] at offset [%s]: %s' %
149-
(msg.topic(), msg.partition(), msg.offset(), msg.value()))
149+
print('Message delivered to %s [%s] at offset [%s] in %.3fs: %s' %
150+
(msg.topic(), msg.partition(), msg.offset(),
151+
msg.latency(), msg.value()))
150152
self.msgs_delivered += 1
151153
self.bytes_delivered += len(msg)
152154

tests/test_Producer.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ def test_basic_api():
2727
p.produce('mytopic', value='somedata', key='a key')
2828

2929
def on_delivery(err, msg):
30-
print('delivery', str)
30+
print('delivery', err, msg)
3131
# Since there is no broker, produced messages should time out.
3232
assert err.code() == KafkaError._MSG_TIMED_OUT
33+
print('message latency', msg.latency())
3334

3435
p.produce(topic='another_topic', value='testing', partition=9,
3536
callback=on_delivery)

0 commit comments

Comments
 (0)