Skip to content

Added new IConsumer.Consume overload taking target ConsumeResult/Message as param for low-alloc flows. #2369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

UBSaxo
Copy link

@UBSaxo UBSaxo commented Nov 27, 2024

This PR adds a new IConsumer.Consume(int, ConsumeResult) overload that allows caller to bring his own (likely reused) ConsumeResult with embedded Message.

This makes it easy for caller to reuse single ConsumeResult instance and single Message instance and thereby reduce overall allocations.

Headers class has also been optimized internally to only create the backing list when first header is added. This avoids allocating an empty list for all messages without headers.

@UBSaxo UBSaxo requested review from a team as code owners November 27, 2024 13:40
@confluent-cla-assistant
Copy link

confluent-cla-assistant bot commented Nov 27, 2024

🎉 All Contributor License Agreements have been signed. Ready to merge.
✅ UBSaxo
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@emasab
Copy link
Contributor

emasab commented May 15, 2025

@UBSaxo thanks for your contribution and sorry for the long wait. We're starting to work on your PRs as promised.

I understand and appreciate the goal here, just we need some validation on why these changes are needed, that is how much they're reducing memory allocation or improving throughput.

This is a delicate matter as it's public API and we chose to not diverge from Java public API to avoid incompatibilities in future KIPs. Given rest of the language bindings and Java also use a garbage collector if we have significative improvements here we should have them in other languages as well, in that case we can go ahead and create a KIP to apply the same changes in all languages, including Java.

If you have these tests already you can share them otherwise I can use the Confluent.Kafka.Benchmark to compare it.

Same reasoning applies to #2367

@UBSaxo
Copy link
Author

UBSaxo commented May 19, 2025

I have done a small test consumer that consumes 1000 Message<byte[], byte[]> messages from a sample topic. I have disabled consume of headers, topic name and timestamp.

I then ran the Visual Studio memory profiler on it and tracked the managed memory allocations:

Type Allocations
byte[] 2006
Confluent.Kafka.TopicPartitionOffset 1.001
Confluent.Kafka.Impl.rd_kafka_message 1.000
Confluent.Kafka.ConsumeResult<System.Byte[], System.Byte[]> 1.000
Confluent.Kafka.Message<System.Byte[], System.Byte[]> 1.000

The byte[] allocations are primarily the message key and value components. The rd_kafka_message cannot be avoided, but the remaining 3.000 allocations can be completely eliminated if the API changes proposed by this PR are utilized. If headers were consumed even more could be gained.

So in short, with the proposed changes, the number of allocations can be reduced by 50%.

I agree that allocations generally are very cheap, and that these improvements likely will be insignificant in most real life use cases. We are more concerned about the garbage collection. We have business services where constant predictable low latency is critical, and we have seen real-life issues caused by garbage collection kicking in after excessive object allocations. These were not Kafka enabled at that time, but we would like them to be and are thus trying to minimize the memory allocation impact by doing so.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants