Skip to content

Commit baee8a6

Browse files
authored
Add producer purge method with optional blocking argument (@peteryin21, confluentinc#548)
1 parent 7768191 commit baee8a6

File tree

2 files changed

+78
-1
lines changed

2 files changed

+78
-1
lines changed

confluent_kafka/src/Producer.c

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
145145
goto done;
146146

147147
msgobj = Message_new0(self, rkm);
148-
148+
149149
args = Py_BuildValue("(OO)", ((Message *)msgobj)->error, msgobj);
150150

151151
Py_DECREF(msgobj);
@@ -532,6 +532,36 @@ static PyObject *Producer_abort_transaction(Handle *self, PyObject *args) {
532532
Py_RETURN_NONE;
533533
}
534534

535+
static void *Producer_purge (Handle *self, PyObject *args,
536+
PyObject *kwargs) {
537+
int in_queue = 1;
538+
int in_flight = 1;
539+
int blocking = 1;
540+
int purge_strategy = 0;
541+
542+
rd_kafka_resp_err_t err;
543+
static char *kws[] = { "in_queue", "in_flight", "blocking", NULL};
544+
545+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|bbb", kws, &in_queue, &in_flight, &blocking))
546+
return NULL;
547+
if (in_queue)
548+
purge_strategy = RD_KAFKA_PURGE_F_QUEUE;
549+
if (in_flight)
550+
purge_strategy |= RD_KAFKA_PURGE_F_INFLIGHT;
551+
if (blocking)
552+
purge_strategy |= RD_KAFKA_PURGE_F_NON_BLOCKING;
553+
554+
err = rd_kafka_purge(self->rk, purge_strategy);
555+
556+
if (err) {
557+
cfl_PyErr_Format(err, "Purge failed: %s", rd_kafka_err2str(err));
558+
return NULL;
559+
}
560+
561+
Py_RETURN_NONE;
562+
}
563+
564+
535565
static PyMethodDef Producer_methods[] = {
536566
{ "produce", (PyCFunction)Producer_produce,
537567
METH_VARARGS|METH_KEYWORDS,
@@ -597,6 +627,18 @@ static PyMethodDef Producer_methods[] = {
597627
"callbacks may be triggered.\n"
598628
"\n"
599629
},
630+
{ "purge", (PyCFunction)Producer_purge, METH_VARARGS|METH_KEYWORDS,
631+
".. py:function:: purge([in_queue=True], [in_flight=True], [blocking=True])\n"
632+
"\n"
633+
" Purge messages currently handled by the producer instance.\n"
634+
" The application will need to call poll() or flush() "
635+
"afterwards to serve the delivery report callbacks of the purged messages."
636+
"\n"
637+
" :param: bool in_queue: Purge messages from internal queues. By default, true.\n"
638+
" :param: bool in_flight: Purge messages in flight to or from the broker. By default, true.\n"
639+
" :param: bool blocking: If set to False, will not wait on background thread queue\n"
640+
"purging to finish. By default, true."
641+
},
600642
{ "list_topics", (PyCFunction)list_topics, METH_VARARGS|METH_KEYWORDS,
601643
list_topics_doc
602644
},

tests/test_Producer.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,3 +235,38 @@ def test_transaction_api():
235235
assert ex.value.args[0].retriable() is False
236236
assert ex.value.args[0].fatal() is False
237237
assert ex.value.args[0].txn_requires_abort() is False
238+
239+
240+
def test_purge():
241+
"""
242+
Verify that when we have a higher message.timeout.ms timeout, we can use purge()
243+
to stop waiting for messages and get delivery reports
244+
"""
245+
p = Producer(
246+
{"socket.timeout.ms": 10, "error_cb": error_cb, "message.timeout.ms": 30000}
247+
) # 30 seconds
248+
249+
# Hack to detect on_delivery was called because inner functions can modify nonlocal objects.
250+
# When python2 support is dropped, we can use the "nonlocal" keyword instead
251+
cb_detector = {"on_delivery_called": False}
252+
253+
def on_delivery(err, msg):
254+
cb_detector["on_delivery_called"] = True
255+
# Because we are purging messages, we should see a PURGE_QUEUE kafka error
256+
assert err.code() == KafkaError._PURGE_QUEUE
257+
258+
# Our message won't be delivered, but also won't timeout yet because our timeout is 30s.
259+
p.produce(topic="some_topic", value="testing", partition=9, callback=on_delivery)
260+
p.flush(0.002)
261+
assert not cb_detector["on_delivery_called"]
262+
263+
# When in_queue set to false, we won't purge the message and get delivery callback
264+
p.purge(in_queue=False)
265+
p.flush(0.002)
266+
assert not cb_detector["on_delivery_called"]
267+
268+
# When we purge including the queue, the message should have delivered a delivery report
269+
# with a PURGE_QUEUE error
270+
p.purge()
271+
p.flush(0.002)
272+
assert cb_detector["on_delivery_called"]

0 commit comments

Comments
 (0)