diff --git a/examples/asyncio_example.py b/examples/asyncio_example.py old mode 100755 new mode 100644 index bdfaebb2b..52b26b915 --- a/examples/asyncio_example.py +++ b/examples/asyncio_example.py @@ -1,6 +1,5 @@ #!/usr/bin/env python - -# flake8: noqa +# # Copyright 2019 Confluent Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,168 +14,169 @@ # See the License for the specific language governing permissions and # limitations under the License. - -# Companion code to the blog post "Integrating Kafka With Python -# Asyncio Web Applications" -# https://www.confluent.io/blog/kafka-python-asyncio-integration/ -# -# Example Siege [https://github.com/JoeDog/siege] test: -# $ siege -c 400 -r 200 'http://localhost:8000/items1 POST {"name":"testuser"}' - - import asyncio -import confluent_kafka -from confluent_kafka import KafkaException -from fastapi import FastAPI, HTTPException -from pydantic import BaseModel -from time import time -from threading import Thread -import uvicorn - - -class AIOProducer: - def __init__(self, configs, loop=None): - self._loop = loop or asyncio.get_event_loop() - self._producer = confluent_kafka.Producer(configs) - self._cancelled = False - self._poll_thread = Thread(target=self._poll_loop) - self._poll_thread.start() - - def _poll_loop(self): - while not self._cancelled: - self._producer.poll(0.1) - - def close(self): - self._cancelled = True - self._poll_thread.join() - - def produce(self, topic, value): - """ - An awaitable produce method. - """ - result = self._loop.create_future() - - def ack(err, msg): - if err: - self._loop.call_soon_threadsafe(result.set_exception, KafkaException(err)) - else: - self._loop.call_soon_threadsafe(result.set_result, msg) - self._producer.produce(topic, value, on_delivery=ack) - return result - - def produce2(self, topic, value, on_delivery): - """ - A produce method in which delivery notifications are made available - via both the returned future and on_delivery callback (if specified). - """ - result = self._loop.create_future() - - def ack(err, msg): - if err: - self._loop.call_soon_threadsafe( - result.set_exception, KafkaException(err)) - else: - self._loop.call_soon_threadsafe( - result.set_result, msg) - if on_delivery: - self._loop.call_soon_threadsafe( - on_delivery, err, msg) - self._producer.produce(topic, value, on_delivery=ack) - return result - - -class Producer: - def __init__(self, configs): - self._producer = confluent_kafka.Producer(configs) - self._cancelled = False - self._poll_thread = Thread(target=self._poll_loop) - self._poll_thread.start() +import sys +from confluent_kafka.aio import AIOProducer +from confluent_kafka.aio import AIOConsumer +import random +import logging +import signal - def _poll_loop(self): - while not self._cancelled: - self._producer.poll(0.1) +logging.basicConfig() +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +running = True - def close(self): - self._cancelled = True - self._poll_thread.join() - def produce(self, topic, value, on_delivery=None): - self._producer.produce(topic, value, on_delivery=on_delivery) +async def error_cb(err): + logger.error(f'Kafka error: {err}') -config = {"bootstrap.servers": "localhost:9092"} +async def throttle_cb(event): + logger.warning(f'Kafka throttle event: {event}') -app = FastAPI() +async def stats_cb(stats_json_str): + logger.info(f'Kafka stats: {stats_json_str}') -class Item(BaseModel): - name: str +def configure_common(conf): + bootstrap_servers = sys.argv[1] + conf.update({ + 'bootstrap.servers': bootstrap_servers, + 'logger': logger, + 'debug': 'conf', + 'error_cb': error_cb, + 'throttle_cb': throttle_cb, + 'stats_cb': stats_cb, + 'statistics.interval.ms': 5000, + }) -aio_producer = None -producer = None + return conf -@app.on_event("startup") -async def startup_event(): - global producer, aio_producer - aio_producer = AIOProducer(config) - producer = Producer(config) +async def run_producer(): + topic = sys.argv[2] + producer = AIOProducer(configure_common( + { + 'transactional.id': 'producer1' + }), max_workers=5) - -@app.on_event("shutdown") -def shutdown_event(): - aio_producer.close() - producer.close() - - -@app.post("/items1") -async def create_item1(item: Item): + await producer.init_transactions() + # TODO: handle exceptions with transactional API + transaction_active = False try: - result = await aio_producer.produce("items", item.name) - return {"timestamp": result.timestamp()} - except KafkaException as ex: - raise HTTPException(status_code=500, detail=ex.args[0].str()) - -cnt = 0 - - -def ack(err, msg): - global cnt - cnt = cnt + 1 + while running: + await producer.begin_transaction() + transaction_active = True + + produce_futures = [asyncio.create_task( + producer.produce(topic=topic, + key=f'testkey{i}', + value=f'testvalue{i}')) + for i in range(100)] + results = await asyncio.gather(*produce_futures) + + for msg in results: + logger.info( + 'Produced to: {} [{}] @ {}'.format(msg.topic(), + msg.partition(), + msg.offset())) + + await producer.commit_transaction() + transaction_active = False + await asyncio.sleep(1) + except Exception as e: + logger.error(e) + finally: + if transaction_active: + await producer.abort_transaction() + await producer.stop() + logger.info('Closed producer') + + +async def run_consumer(): + topic = sys.argv[2] + group_id = f'{topic}_{random.randint(1, 1000)}' + consumer = AIOConsumer(configure_common( + { + 'group.id': group_id, + 'auto.offset.reset': 'latest', + 'enable.auto.commit': 'false', + 'enable.auto.offset.store': 'false', + 'partition.assignment.strategy': 'cooperative-sticky', + })) + + async def on_assign(consumer, partitions): + # Calling incremental_assign is necessary to pause the assigned partitions + # otherwise it'll be done by the consumer after callback termination. + await consumer.incremental_assign(partitions) + await consumer.pause(partitions) + logger.debug(f'on_assign {partitions}') + # Resume the partitions as it's just a pause example + await consumer.resume(partitions) + + async def on_revoke(consumer, partitions): + logger.debug(f'before on_revoke {partitions}', ) + try: + await consumer.commit() + except Exception as e: + logger.info(f'Error during commit: {e}') + logger.debug(f'after on_revoke {partitions}') + + async def on_lost(consumer, partitions): + logger.debug(f'on_lost {partitions}') - -@app.post("/items2") -async def create_item2(item: Item): try: - aio_producer.produce2("items", item.name, on_delivery=ack) - return {"timestamp": time()} - except KafkaException as ex: - raise HTTPException(status_code=500, detail=ex.args[0].str()) - + await consumer.subscribe([topic], + on_assign=on_assign, + on_revoke=on_revoke, + # Remember to set a on_lost callback + # if you're committing on revocation + # as lost partitions cannot be committed + on_lost=on_lost) + i = 0 + while running: + message = await consumer.poll(1.0) + if message is None: + continue + + if i % 100 == 0: + position = await consumer.position(await consumer.assignment()) + logger.info(f'Current position: {position}') + await consumer.commit() + logger.info('Stored offsets were committed') + + err = message.error() + if err: + logger.error(f'Error: {err}') + else: + logger.info(f'Consumed: {message.value()}') + await consumer.store_offsets(message=message) + i += 1 + finally: + await consumer.unsubscribe() + await consumer.close() + logger.info('Closed consumer') -@app.post("/items3") -async def create_item3(item: Item): - try: - producer.produce("items", item.name, on_delivery=ack) - return {"timestamp": time()} - except KafkaException as ex: - raise HTTPException(status_code=500, detail=ex.args[0].str()) +def signal_handler(*_): + global running + logger.info('Signal received, shutting down...') + running = False -@app.post("/items4") -async def create_item4(item: Item): - try: - producer.produce("items", item.name) - return {"timestamp": time()} - except KafkaException as ex: - raise HTTPException(status_code=500, detail=ex.args[0].str()) +async def main(): + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) -@app.post("/items5") -async def create_item5(item: Item): - return {"timestamp": time()} + producer_task = asyncio.create_task(run_producer()) + consumer_task = asyncio.create_task(run_consumer()) + await asyncio.gather(producer_task, consumer_task) +try: + asyncio.run(main()) +except asyncio.exceptions.CancelledError as e: + logger.warning(f'Asyncio task was cancelled: {e}') -if __name__ == '__main__': - uvicorn.run(app, host='127.0.0.1', port=8000) +logger.info('End of example') diff --git a/src/confluent_kafka/aio/_AIOConsumer.py b/src/confluent_kafka/aio/_AIOConsumer.py new file mode 100644 index 000000000..fc79c7431 --- /dev/null +++ b/src/confluent_kafka/aio/_AIOConsumer.py @@ -0,0 +1,130 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import concurrent.futures +import confluent_kafka +import functools +import confluent_kafka.aio._common as _common + + +class AIOConsumer: + def __init__(self, consumer_conf, max_workers=2, executor=None): + if executor is not None: + # Executor must have at least one worker. + # At least two workers are needed when calling re-entrant + # methods from callbacks. + self.executor = executor + else: + if max_workers < 1: + raise ValueError("max_workers must be at least 1") + self.executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers) + + loop = asyncio.get_event_loop() + wrap_common_callbacks = _common.wrap_common_callbacks + wrap_conf_callback = _common.wrap_conf_callback + wrap_common_callbacks(loop, consumer_conf) + wrap_conf_callback(loop, consumer_conf, 'on_commit') + + self._consumer = confluent_kafka.Consumer(consumer_conf) + + async def _call(self, blocking_task, *args, **kwargs): + return (await asyncio.gather( + asyncio.get_running_loop().run_in_executor(self.executor, + functools.partial( + blocking_task, + *args, + **kwargs)) + + ))[0] + + def _wrap_callback(self, loop, callback, edit_args=None, edit_kwargs=None): + def ret(*args, **kwargs): + if edit_args: + args = edit_args(args) + if edit_kwargs: + kwargs = edit_kwargs(kwargs) + f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), + loop) + return f.result() + return ret + + async def poll(self, *args, **kwargs): + return await self._call(self._consumer.poll, *args, **kwargs) + + async def consume(self, *args, **kwargs): + return await self._call(self._consumer.consume, *args, **kwargs) + + def _edit_rebalance_callbacks_args(self, args): + args = list(args) + args[0] = self + return args + + async def subscribe(self, *args, **kwargs): + loop = asyncio.get_event_loop() + for callback in ['on_assign', 'on_revoke', 'on_lost']: + if callback in kwargs: + kwargs[callback] = self._wrap_callback(loop, kwargs[callback], + self._edit_rebalance_callbacks_args) # noqa: E501 + return await self._call(self._consumer.subscribe, *args, **kwargs) + + async def unsubscribe(self, *args, **kwargs): + return await self._call(self._consumer.unsubscribe, *args, **kwargs) + + async def commit(self, *args, **kwargs): + return await self._call(self._consumer.commit, *args, **kwargs) + + async def close(self, *args, **kwargs): + return await self._call(self._consumer.close, *args, **kwargs) + + async def seek(self, *args, **kwargs): + return await self._call(self._consumer.seek, *args, **kwargs) + + async def pause(self, *args, **kwargs): + return await self._call(self._consumer.pause, *args, **kwargs) + + async def resume(self, *args, **kwargs): + return await self._call(self._consumer.resume, *args, **kwargs) + + async def store_offsets(self, *args, **kwargs): + return await self._call(self._consumer.store_offsets, *args, **kwargs) + + async def committed(self, *args, **kwargs): + return await self._call(self._consumer.committed, *args, **kwargs) + + async def assign(self, *args, **kwargs): + return await self._call(self._consumer.assign, *args, **kwargs) + + async def unassign(self, *args, **kwargs): + return await self._call(self._consumer.unassign, *args, **kwargs) + + async def incremental_assign(self, *args, **kwargs): + return await self._call(self._consumer.incremental_assign, *args, **kwargs) + + async def incremental_unassign(self, *args, **kwargs): + return await self._call(self._consumer.incremental_unassign, *args, **kwargs) + + async def assignment(self, *args, **kwargs): + return await self._call(self._consumer.assignment, *args, **kwargs) + + async def position(self, *args, **kwargs): + return await self._call(self._consumer.position, *args, **kwargs) + + async def consumer_group_metadata(self, *args, **kwargs): + return await self._call(self._consumer.consumer_group_metadata, *args, **kwargs) + + async def set_sasl_credentials(self, *args, **kwargs): + return await self._call(self._consumer.set_sasl_credentials, + *args, **kwargs) diff --git a/src/confluent_kafka/aio/_AIOProducer.py b/src/confluent_kafka/aio/_AIOProducer.py new file mode 100644 index 000000000..4ffb9a561 --- /dev/null +++ b/src/confluent_kafka/aio/_AIOProducer.py @@ -0,0 +1,106 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import concurrent.futures +import confluent_kafka +from confluent_kafka import KafkaException as _KafkaException +import functools +import confluent_kafka.aio._common as _common + + +class AIOProducer: + def __init__(self, producer_conf, max_workers=1, + executor=None, auto_poll=True): + if executor is not None: + self.executor = executor + else: + self.executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers) + loop = asyncio.get_event_loop() + wrap_common_callbacks = _common.wrap_common_callbacks + wrap_common_callbacks(loop, producer_conf) + + self._producer = confluent_kafka.Producer(producer_conf) + self._running = False + if auto_poll: + self._running = True + self._running_loop = asyncio.create_task(self._loop()) + + async def stop(self): + if self._running: + self._running = False + await self._running_loop + + async def _loop(self): + while self._running: + await self.poll(1.0) + + async def poll(self, *args, **kwargs): + await self._call(self._producer.poll, *args, **kwargs) + + async def _call(self, blocking_task, *args, **kwargs): + return (await asyncio.gather( + asyncio.get_running_loop().run_in_executor(self.executor, + functools.partial( + blocking_task, + *args, + **kwargs)) + + ))[0] + + async def produce(self, *args, **kwargs): + loop = asyncio.get_event_loop() + result = loop.create_future() + + def on_delivery(err, msg): + if err: + loop.call_soon_threadsafe(result.set_exception, + _KafkaException(err)) + else: + loop.call_soon_threadsafe(result.set_result, msg) + + kwargs['on_delivery'] = on_delivery + await self._call(self._producer.produce, *args, **kwargs) + return await result + + async def init_transactions(self, *args, **kwargs): + return await self._call(self._producer.init_transactions, + *args, **kwargs) + + async def begin_transaction(self, *args, **kwargs): + return await self._call(self._producer.begin_transaction, + *args, **kwargs) + + async def send_offsets_to_transaction(self, *args, **kwargs): + return await self._call(self._producer.send_offsets_to_transaction, + *args, **kwargs) + + async def commit_transaction(self, *args, **kwargs): + return await self._call(self._producer.commit_transaction, + *args, **kwargs) + + async def abort_transaction(self, *args, **kwargs): + return await self._call(self._producer.abort_transaction, + *args, **kwargs) + + async def flush(self, *args, **kwargs): + return await self._call(self._producer.flush, *args, **kwargs) + + async def purge(self, *args, **kwargs): + return await self._call(self._producer.purge, *args, **kwargs) + + async def set_sasl_credentials(self, *args, **kwargs): + return await self._call(self._producer.set_sasl_credentials, + *args, **kwargs) diff --git a/src/confluent_kafka/aio/__init__.py b/src/confluent_kafka/aio/__init__.py new file mode 100644 index 000000000..1edc80b8a --- /dev/null +++ b/src/confluent_kafka/aio/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._AIOConsumer import AIOConsumer +from ._AIOProducer import AIOProducer +__all__ = ['AIOConsumer', 'AIOProducer'] diff --git a/src/confluent_kafka/aio/_common.py b/src/confluent_kafka/aio/_common.py new file mode 100644 index 000000000..56aa8c8fc --- /dev/null +++ b/src/confluent_kafka/aio/_common.py @@ -0,0 +1,56 @@ +# Copyright 2025 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + + +class AsyncLogger: + + def __init__(self, loop, logger): + self.loop = loop + self.logger = logger + + def log(self, *args, **kwargs): + self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs) + + +def wrap_callback(loop, callback, edit_args=None, edit_kwargs=None): + def ret(*args, **kwargs): + if edit_args: + args = edit_args(args) + if edit_kwargs: + kwargs = edit_kwargs(kwargs) + f = asyncio.run_coroutine_threadsafe(callback(*args, **kwargs), + loop) + return f.result() + return ret + + +def wrap_conf_callback(loop, conf, name): + if name in conf: + cb = conf[name] + conf[name] = wrap_callback(loop, cb) + + +def wrap_conf_logger(loop, conf): + if 'logger' in conf: + conf['logger'] = AsyncLogger(loop, conf['logger']) + + +def wrap_common_callbacks(loop, conf): + wrap_conf_callback(loop, conf, 'error_cb') + wrap_conf_callback(loop, conf, 'throttle_cb') + wrap_conf_callback(loop, conf, 'stats_cb') + wrap_conf_callback(loop, conf, 'oauth_cb') + wrap_conf_logger(loop, conf)