Skip to content

Commit d8d41ec

Browse files
author
Matt Howlett
committed
tweaks
1 parent c6538f2 commit d8d41ec

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+296
-165
lines changed

doc/docfx.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"src": [
55
{
66
"files": [
7-
"src/Confluent.Kafka/*.cs"
7+
"src/**/*.cs"
88
],
99
"cwd": ".."
1010
}

src/Confluent.Kafka/Consumer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ namespace Confluent.Kafka
3535
/// consumer <see cref="Confluent.Kafka.Consumer{TKey,TValue}" /> where possible
3636
/// (use the byte[] deserializer).
3737
/// </summary>
38-
public class Consumer : IConsumer, IDisposable
38+
public class Consumer : IConsumer
3939
{
4040
private SafeKafkaHandle kafkaHandle;
4141

src/Confluent.Kafka/Consumer_KV.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ namespace Confluent.Kafka
3131
/// Implements a high-level Apache Kafka consumer (with
3232
/// key and value deserialization).
3333
/// </summary>
34-
public class Consumer<TKey, TValue> : IConsumer<TKey, TValue>, IDisposable
34+
public class Consumer<TKey, TValue> : IConsumer<TKey, TValue>
3535
{
3636
private readonly Consumer consumer;
3737

src/Confluent.Kafka/ErrorCode.cs

+20
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,26 @@ public enum ErrorCode
233233
/// </summary>
234234
Local_ValueDeserialization = -159,
235235

236+
/// <summary>
237+
/// Partial response
238+
/// </summary>
239+
Local_Partial = -158,
240+
241+
/// <summary>
242+
/// Modification attempted on read-only object
243+
/// </summary>
244+
Local_ReadOnly = -157,
245+
246+
/// <summary>
247+
/// No such entry / item not found
248+
/// </summary>
249+
Local_NoEnt = -156,
250+
251+
/// <summary>
252+
/// Read underflow
253+
/// </summary>
254+
Local_Underflow = -155,
255+
236256

237257
/// <summary>
238258
/// Unknown broker error

src/Confluent.Kafka/Headers.cs

-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414
//
15-
// Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
16-
//
1715
// Refer to LICENSE for more information.
1816

1917
using System.Collections.Generic;

src/Confluent.Kafka/IConsumer.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
1+
// Copyright 2018 Confluent Inc.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414
//
15-
// Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
16-
//
1715
// Refer to LICENSE for more information.
1816

1917
using System;
@@ -35,7 +33,7 @@ namespace Confluent.Kafka
3533
/// consumer <see cref="Confluent.Kafka.IConsumer{TKey,TValue}" /> where possible
3634
/// (use the byte[] deserializer).
3735
/// </summary>
38-
public interface IConsumer
36+
public interface IConsumer : IDisposable
3937
{
4038
/// <include file='include_docs_consumer.xml' path='API/Member[@name="OnPartitionsAssigned"]/*' />
4139
event EventHandler<List<TopicPartition>> OnPartitionsAssigned;

src/Confluent.Kafka/IConsumer_KV.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
1+
// Copyright 2018 Confluent Inc.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414
//
15-
// Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
16-
//
1715
// Refer to LICENSE for more information.
1816

1917
using System;
@@ -31,7 +29,7 @@ namespace Confluent.Kafka
3129
/// Defines a high-level Apache Kafka consumer (with key and
3230
/// value deserialization).
3331
/// </summary>
34-
public interface IConsumer<TKey, TValue>
32+
public interface IConsumer<TKey, TValue> : IDisposable
3533
{
3634
/// <include file='include_docs_consumer.xml' path='API/Member[@name="KeyDeserializer"]/*' />
3735
IDeserializer<TKey> KeyDeserializer { get; }

src/Confluent.Kafka/IProducer.cs

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
1+
// Copyright 2018 Confluent Inc.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414
//
15-
// Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
16-
//
1715
// Refer to LICENSE for more information.
1816

1917
using System;
@@ -31,7 +29,8 @@
3129
namespace Confluent.Kafka
3230
{
3331
/// <summary>
34-
/// Defines a high-level Apache Kafka producer (without serialization).
32+
/// Defines a high-level Apache Kafka producer client (without
33+
/// serialization capability).
3534
/// </summary>
3635
public interface IProducer : IDisposable
3736
{

src/Confluent.Kafka/IProducer_KV.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
1+
// Copyright 2018 Confluent Inc.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414
//
15-
// Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
16-
//
1715
// Refer to LICENSE for more information.
1816

1917
using System;
@@ -31,7 +29,7 @@
3129
namespace Confluent.Kafka
3230
{
3331
/// <summary>
34-
/// Defines a high-level Apache Kafka producer with key
32+
/// Defines a high-level Apache Kafka producer client that provides key
3533
/// and value serialization.
3634
/// </summary>
3735
public interface IProducer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable

src/Confluent.Kafka/ISerializingProducer.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
namespace Confluent.Kafka
2424
{
2525
/// <summary>
26-
/// Describes the minimum functionality provided by a high level (serializing) Kafka producer.
26+
/// A focused interface for producing messages to Kafka with key and
27+
/// value serialization (excludes general client functionality).
2728
/// </summary>
2829
public interface ISerializingProducer<TKey, TValue>
2930
{
@@ -33,9 +34,6 @@ public interface ISerializingProducer<TKey, TValue>
3334
/// <include file='include_docs_producer.xml' path='API/Member[@name="ValueSerializer"]/*' />
3435
ISerializer<TValue> ValueSerializer { get; }
3536

36-
/// <include file='include_docs_client.xml' path='API/Member[@name="Name"]/*' />
37-
string Name { get; }
38-
3937
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Message"]/*' />
4038
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Common"]/*' />
4139
Task<Message<TKey, TValue>> ProduceAsync(Message<TKey, TValue> message);

src/Confluent.Kafka/Ignore.cs

+2-3
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
namespace Confluent.Kafka
2121
{
2222
/// <summary>
23-
/// A type for use in conjunction with <see cref="Confluent.Kafka.Serialization.IgnoreDeserializer" />
24-
/// that enables message keys or values to be read as null, regardless
25-
/// of their value.
23+
/// A type for use in conjunction with IgnoreDeserializer that enables
24+
/// message keys or values to be read as null, regardless of their value.
2625
/// </summary>
2726
public sealed class Ignore
2827
{

src/Confluent.Kafka/Null.cs

+3-5
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17-
using Confluent.Kafka.Serialization;
18-
1917

2018
namespace Confluent.Kafka
2119
{
2220
/// <summary>
23-
/// A type for use in conjunction with <see cref="Confluent.Kafka.Serialization.NullSerializer" />
24-
/// and <see cref="Confluent.Kafka.Serialization.NullDeserializer" /> that enables null key or
25-
/// values to be enforced when producing or consuming messages.
21+
/// A type for use in conjunction with NullSerializer and NullDeserializer
22+
/// that enables null key or values to be enforced when producing or
23+
/// consuming messages.
2624
/// </summary>
2725
public sealed class Null
2826
{

src/Confluent.Kafka/Partition.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ public struct Partition
3434
/// <summary>
3535
/// A special value that refers to an unspecified / unknown partition.
3636
/// </summary>
37-
/// <returns></returns>
38-
public static Partition NotSpecified { get { return new Partition(RD_KAFKA_PARTITION_UA); } }
37+
public static Partition Any { get { return new Partition(RD_KAFKA_PARTITION_UA); } }
3938

4039
/// <summary>
4140
/// Initializes a new instance of the Partition structure.

src/Confluent.Kafka/Producer.cs

+37-18
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,24 @@ private static void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntP
150150
{
151151
var msg = Util.Marshal.PtrToStructureUnsafe<rd_kafka_message>(rkmessage);
152152

153+
Headers headers = new Headers();
154+
LibRdKafka.message_headers(rkmessage, out IntPtr hdrsPtr);
155+
if (hdrsPtr != IntPtr.Zero)
156+
{
157+
for (var i=0; ; ++i)
158+
{
159+
var err = LibRdKafka.header_get_all(hdrsPtr, (IntPtr)i, out IntPtr namep, out IntPtr valuep, out IntPtr sizep);
160+
if (err != ErrorCode.NoError)
161+
{
162+
break;
163+
}
164+
var headerName = Util.Marshal.PtrToStringUTF8(namep);
165+
var headerValue = new byte[(int)sizep];
166+
Marshal.Copy(valuep, headerValue, 0, (int)sizep);
167+
headers.Add(new KeyValuePair<string, byte[]>(headerName, headerValue));
168+
}
169+
}
170+
153171
// the msg._private property has dual purpose. Here, it is an opaque pointer set
154172
// by Topic.Produce to be an IDeliveryHandler. When Consuming, it's for internal
155173
// use (hence the name).
@@ -193,7 +211,7 @@ private static void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntP
193211
key,
194212
val,
195213
new Timestamp(timestamp, (TimestampType)timestampType),
196-
null,
214+
headers,
197215
msg.err
198216
)
199217
);
@@ -321,22 +339,22 @@ public Producer(IEnumerable<KeyValuePair<string, object>> config, bool manualPol
321339
prop.Key != EnableBackgroundPollPropertyName &&
322340
prop.Key != EnableDeliveryReportsPropertyName);
323341

324-
var enableBackgroundPollStr = (string)config.FirstOrDefault(prop => prop.Key == EnableBackgroundPollPropertyName).Value;
325-
if (enableBackgroundPollStr != null)
342+
var enableBackgroundPollObj = config.FirstOrDefault(prop => prop.Key == EnableBackgroundPollPropertyName).Value;
343+
if (enableBackgroundPollObj != null)
326344
{
327-
this.manualPoll = !bool.Parse(enableBackgroundPollStr);
345+
this.manualPoll = !bool.Parse(enableBackgroundPollObj.ToString());
328346
}
329347

330-
var enableDeliveryReportsStr = (string)config.FirstOrDefault(prop => prop.Key == EnableDeliveryReportsPropertyName).Value;
331-
if (enableDeliveryReportsStr != null)
348+
var enableDeliveryReportsObj = config.FirstOrDefault(prop => prop.Key == EnableDeliveryReportsPropertyName).Value;
349+
if (enableDeliveryReportsObj != null)
332350
{
333-
this.disableDeliveryReports = !bool.Parse(enableDeliveryReportsStr);
351+
this.disableDeliveryReports = !bool.Parse(enableDeliveryReportsObj.ToString());
334352
}
335353

336-
var blockIfQueueFullStr = (string)config.FirstOrDefault(prop => prop.Key == BlockIfQueueFullPropertyName).Value;
337-
if (blockIfQueueFullStr != null)
354+
var blockIfQueueFullObj = config.FirstOrDefault(prop => prop.Key == BlockIfQueueFullPropertyName).Value;
355+
if (blockIfQueueFullObj != null)
338356
{
339-
this.blockIfQueueFullPropertyValue = bool.Parse(blockIfQueueFullStr);
357+
this.blockIfQueueFullPropertyValue = bool.Parse(blockIfQueueFullObj.ToString());
340358
}
341359

342360
// Note: Setting default topic configuration properties via default.topic.config is depreciated
@@ -404,8 +422,9 @@ public Producer(IEnumerable<KeyValuePair<string, object>> config, bool manualPol
404422
/// librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
405423
/// </param>
406424
public Producer(IEnumerable<KeyValuePair<string, object>> config)
425+
#pragma warning disable CS0618
407426
: this(config, false, false) {}
408-
427+
#pragma warning restore CS0618
409428

410429
/// <include file='include_docs_producer.xml' path='API/Member[@name="Poll_int"]/*' />
411430
public int Poll(int millisecondsTimeout)
@@ -454,7 +473,7 @@ public Task<Message> ProduceAsync(Message message)
454473
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_string_TKey_TValue"]/*' />
455474
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Common"]/*' />
456475
public Task<Message> ProduceAsync(string topic, byte[] key, byte[] val)
457-
=> ProduceImpl(topic, val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, Timestamp.Default, Partition.NotSpecified, null, this.blockIfQueueFullPropertyValue);
476+
=> ProduceImpl(topic, val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, Timestamp.Default, Partition.Any, null, this.blockIfQueueFullPropertyValue);
458477

459478
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_string_Partition_TKey_TValue_Timestamp_IEnumerable"]/*' />
460479
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Common"]/*' />
@@ -492,7 +511,7 @@ public void Produce(Message message, IDeliveryHandler deliveryHandler)
492511
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_string_TKey_TValue"]/*' />
493512
/// <include file='include_docs_producer.xml' path='API/Member[@name="Produce_IDeliveryHandler"]/*' />
494513
public void Produce(string topic, byte[] key, byte[] val, IDeliveryHandler deliveryHandler)
495-
=> ProduceImpl(topic, val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, Timestamp.Default, Partition.NotSpecified, null, this.blockIfQueueFullPropertyValue, deliveryHandler);
514+
=> ProduceImpl(topic, val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, Timestamp.Default, Partition.Any, null, this.blockIfQueueFullPropertyValue, deliveryHandler);
496515

497516
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_string_Partition_TKey_TValue_Timestamp_IEnumerable"]/*' />
498517
/// <include file='include_docs_producer.xml' path='API/Member[@name="Produce_IDeliveryHandler"]/*' />
@@ -520,7 +539,7 @@ IDeliveryHandler deliveryHandler
520539
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Obsolete"]/*' />
521540
[Obsolete("The Producer API has been revised and this overload of ProduceAsync has been depreciated. Please use another variant of ProduceAsync.")]
522541
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength)
523-
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.NotSpecified, null, this.blockIfQueueFullPropertyValue);
542+
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.Any, null, this.blockIfQueueFullPropertyValue);
524543

525544
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Obsolete"]/*' />
526545
[Obsolete("The Producer API has been revised and this overload of ProduceAsync has been depreciated. Please use another variant of ProduceAsync.")]
@@ -535,17 +554,17 @@ public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int k
535554
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Obsolete"]/*' />
536555
[Obsolete("Variants of ProduceAsync that include a blockIfQueueFull parameter are depreciated - use the dotnet.producer.block.if.queue.full configuration property instead.")]
537556
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull)
538-
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.NotSpecified, null, blockIfQueueFull);
557+
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.Any, null, blockIfQueueFull);
539558

540559
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Obsolete"]/*' />
541560
[Obsolete("Variants of ProduceAsync that include a IDeliveryHandler parameter are depreciated - use a variant of Produce instead.")]
542561
public void ProduceAsync(string topic, byte[] key, byte[] val, IDeliveryHandler deliveryHandler)
543-
=> ProduceImpl(topic, val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, Timestamp.Default, Partition.NotSpecified, null, this.blockIfQueueFullPropertyValue, deliveryHandler);
562+
=> ProduceImpl(topic, val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, Timestamp.Default, Partition.Any, null, this.blockIfQueueFullPropertyValue, deliveryHandler);
544563

545564
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Obsolete"]/*' />
546565
[Obsolete("Variants of ProduceAsync that include a IDeliveryHandler parameter are depreciated - use a variant of Produce instead.")]
547566
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, IDeliveryHandler deliveryHandler)
548-
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.NotSpecified, null, this.blockIfQueueFullPropertyValue, deliveryHandler);
567+
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.Any, null, this.blockIfQueueFullPropertyValue, deliveryHandler);
549568

550569
/// <include file='include_docs_producer.xml' path='API/Member[@name="ProduceAsync_Obsolete"]/*' />
551570
[Obsolete("Variants of ProduceAsync that include a IDeliveryHandler parameter are depreciated - use a variant of Produce instead.")]
@@ -564,7 +583,7 @@ public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength,
564583
"Variants of ProduceAsync that include a IDeliveryHandler parameter are depreciated - use a variant of Produce instead. " +
565584
"Variants of ProduceAsync that include a blockIfQueueFull parameter are depreciated - use the dotnet.producer.block.if.queue.full configuration property instead.")]
566585
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull, IDeliveryHandler deliveryHandler)
567-
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.NotSpecified, null, blockIfQueueFull, deliveryHandler);
586+
=> ProduceImpl(topic, val, valOffset, valLength, key, keyOffset, keyLength, Timestamp.Default, Partition.Any, null, blockIfQueueFull, deliveryHandler);
568587

569588
#endregion
570589

0 commit comments

Comments
 (0)