Skip to content

Commit 0658124

Browse files
authored
Made message convenience properties on ConsumeResult obsolete (confluentinc#1215)
1 parent f45e6a3 commit 0658124

File tree

20 files changed

+57
-54
lines changed

20 files changed

+57
-54
lines changed

examples/AdminClient/Program.cs

-3
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
// Refer to LICENSE for more information.
1818

1919
using System;
20-
using System.Collections.Generic;
21-
using System.Linq;
2220
using System.Threading.Tasks;
23-
using Confluent.Kafka;
2421
using Confluent.Kafka.Admin;
2522

2623

examples/AvroBlogExamples/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ static void ConsumeSpecific(string bootstrapServers, string schemaRegistryUrl)
123123

124124
Console.WriteLine(
125125
consumeResult.Message.Timestamp.UtcDateTime.ToString("yyyy-MM-dd HH:mm:ss")
126-
+ $": [{consumeResult.Value.Severity}] {consumeResult.Value.Message}");
126+
+ $": [{consumeResult.Message.Value.Severity}] {consumeResult.Message.Value.Message}");
127127
}
128128
catch (ConsumeException e)
129129
{

examples/AvroGeneric/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ static async Task Main(string[] args)
7676
{
7777
var consumeResult = consumer.Consume(cts.Token);
7878

79-
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
79+
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
8080
}
8181
catch (ConsumeException e)
8282
{

examples/AvroSpecific/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ static async Task Main(string[] args)
8484
try
8585
{
8686
var consumeResult = consumer.Consume(cts.Token);
87-
Console.WriteLine($"user name: {consumeResult.Message.Key}, favorite color: {consumeResult.Value.favorite_color}, hourly_rate: {consumeResult.Value.hourly_rate}");
87+
Console.WriteLine($"user name: {consumeResult.Message.Key}, favorite color: {consumeResult.Message.Value.favorite_color}, hourly_rate: {consumeResult.Message.Value.hourly_rate}");
8888
}
8989
catch (ConsumeException e)
9090
{

examples/ConfluentCloud/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ static void Main(string[] args)
7979
try
8080
{
8181
var consumeResult = consumer.Consume();
82-
Console.WriteLine($"consumed: {consumeResult.Value}");
82+
Console.WriteLine($"consumed: {consumeResult.Message.Value}");
8383
}
8484
catch (ConsumeException e)
8585
{

examples/Consumer/Program.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public static void Run_Consume(string brokerList, List<string> topics, Cancellat
9292
continue;
9393
}
9494

95-
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
95+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
9696

9797
if (consumeResult.Offset % commitPeriod == 0)
9898
{
@@ -163,7 +163,7 @@ public static void Run_ManualAssign(string brokerList, List<string> topics, Canc
163163
// Note: End of partition notification has not been enabled, so
164164
// it is guaranteed that the ConsumeResult instance corresponds
165165
// to a Message, and not a PartitionEOF event.
166-
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
166+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Message.Value}");
167167
}
168168
catch (ConsumeException e)
169169
{

examples/Protobuf/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ static async Task Main(string[] args)
8181
try
8282
{
8383
var consumeResult = consumer.Consume(cts.Token);
84-
Console.WriteLine($"user name: {consumeResult.Message.Key}, favorite color: {consumeResult.Value.FavoriteColor}");
84+
Console.WriteLine($"user name: {consumeResult.Message.Key}, favorite color: {consumeResult.Message.Value.FavoriteColor}");
8585
}
8686
catch (ConsumeException e)
8787
{

examples/Transactions/Program.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ static void Processor_MapWords(string brokerList, string clientId, CancellationT
323323

324324
producerState[cr.TopicPartition].Offset = cr.Offset;
325325

326-
var words = Regex.Split(cr.Value.ToLower(), @"[^a-zA-Z_]").Where(s => s != String.Empty);
326+
var words = Regex.Split(cr.Message.Value.ToLower(), @"[^a-zA-Z_]").Where(s => s != String.Empty);
327327
foreach (var w in words)
328328
{
329329
while (true)
@@ -444,7 +444,7 @@ public static void LoadCountState(RocksDb db, string brokerList, IEnumerable<Par
444444
else
445445
{
446446
msgCount += 1;
447-
db.Put(Encoding.UTF8.GetBytes(cr.Key), BitConverter.GetBytes(cr.Value), columnFamily);
447+
db.Put(Encoding.UTF8.GetBytes(cr.Message.Key), BitConverter.GetBytes(cr.Message.Value), columnFamily);
448448
}
449449
}
450450
}
@@ -550,7 +550,7 @@ public static void Processor_AggregateWords(string brokerList, string clientId,
550550
var cr = consumer.Consume(ct);
551551
producerState[cr.TopicPartition].Offset = cr.Offset;
552552

553-
var kBytes = Encoding.UTF8.GetBytes(cr.Key);
553+
var kBytes = Encoding.UTF8.GetBytes(cr.Message.Key);
554554
var vBytes = db.Get(kBytes, columnFamily);
555555
var v = vBytes == null ? 0 : BitConverter.ToInt32(vBytes);
556556
var updatedV = v+1;
@@ -562,7 +562,7 @@ public static void Processor_AggregateWords(string brokerList, string clientId,
562562
try
563563
{
564564
producerState[cr.TopicPartition].Producer.Produce(
565-
Topic_Counts, new Message<string, int> { Key = cr.Key, Value = updatedV });
565+
Topic_Counts, new Message<string, int> { Key = cr.Message.Key, Value = updatedV });
566566
}
567567
catch (KafkaException e)
568568
{

src/Confluent.Kafka/ConsumeResult.cs

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
//
1717
// Refer to LICENSE for more information.
1818

19+
using System;
20+
1921

2022
namespace Confluent.Kafka
2123
{
@@ -71,6 +73,7 @@ public TopicPartitionOffset TopicPartitionOffset
7173
/// <summary>
7274
/// The Kafka message Key.
7375
/// </summary>
76+
[Obsolete("Please access the message Key via .Message.Key.")]
7477
public TKey Key
7578
{
7679
get
@@ -87,6 +90,7 @@ public TKey Key
8790
/// <summary>
8891
/// The Kafka message Value.
8992
/// </summary>
93+
[Obsolete("Please access the message Value via .Message.Value.")]
9094
public TValue Value
9195
{
9296
get
@@ -103,6 +107,7 @@ public TValue Value
103107
/// <summary>
104108
/// The Kafka message timestamp.
105109
/// </summary>
110+
[Obsolete("Please access the message Timestamp via .Message.Timestamp.")]
106111
public Timestamp Timestamp
107112
{
108113
get
@@ -119,6 +124,7 @@ public Timestamp Timestamp
119124
/// <summary>
120125
/// The Kafka message headers.
121126
/// </summary>
127+
[Obsolete("Please access the message Headers via .Message.Headers.")]
122128
public Headers Headers
123129
{
124130
get

test/Confluent.Kafka.IntegrationTests/Tests/AssignOverloads.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void AssignOverloads(string bootstrapServers)
6363
consumer.Assign(new List<TopicPartitionOffset>() { new TopicPartitionOffset(dr.TopicPartition, dr.Offset) });
6464
var cr = consumer.Consume(TimeSpan.FromSeconds(10));
6565
consumer.Commit();
66-
Assert.Equal(cr.Value, testString);
66+
Assert.Equal(cr.Message.Value, testString);
6767

6868
// Determine offset to consume from automatically.
6969
consumer.Assign(new List<TopicPartition>() { dr.TopicPartition });
@@ -76,7 +76,7 @@ public void AssignOverloads(string bootstrapServers)
7676
consumer.Assign(new TopicPartitionOffset(dr.TopicPartition, dr3.Offset));
7777
cr = consumer.Consume(TimeSpan.FromSeconds(10));
7878
consumer.Commit();
79-
Assert.Equal(cr.Value, testString3);
79+
Assert.Equal(cr.Message.Value, testString3);
8080

8181
// Determine offset to consume from automatically.
8282
consumer.Assign(dr.TopicPartition);

test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,17 @@ public void ProducerBuilder(string bootstrapServers)
123123
{
124124
c.Assign(dr.TopicPartitionOffset);
125125
var cr = c.Consume(TimeSpan.FromSeconds(10));
126-
Assert.Equal("abc", cr.Key);
127-
Assert.Equal("123", cr.Value);
126+
Assert.Equal("abc", cr.Message.Key);
127+
Assert.Equal("123", cr.Message.Value);
128128
}
129129

130130
using (var c = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
131131
{
132132
c.Assign(dr.TopicPartitionOffset);
133133
var cr = c.Consume(TimeSpan.FromSeconds(10));
134134
// check that each character is serialized into 4 bytes.
135-
Assert.Equal(3*4, cr.Key.Length);
136-
Assert.Equal(3*4, cr.Value.Length);
135+
Assert.Equal(3*4, cr.Message.Key.Length);
136+
Assert.Equal(3*4, cr.Message.Value.Length);
137137
}
138138

139139
Assert.Equal(0, Library.HandleCount);

test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public void Consumer_DisableHeaders(string bootstrapServers)
6464
var record = consumer.Consume(TimeSpan.FromSeconds(10));
6565
Assert.NotNull(record.Message);
6666
Assert.Null(record.Message.Headers);
67-
Assert.NotEqual(TimestampType.NotAvailable, record.Timestamp.Type);
68-
Assert.NotEqual(0, record.Timestamp.UnixTimestampMs);
67+
Assert.NotEqual(TimestampType.NotAvailable, record.Message.Timestamp.Type);
68+
Assert.NotEqual(0, record.Message.Timestamp.UnixTimestampMs);
6969
}
7070

7171
Assert.Equal(0, Library.HandleCount);

test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public void Consumer_DisableTimestamps(string bootstrapServers)
6464
var record = consumer.Consume(TimeSpan.FromSeconds(10));
6565
Assert.NotNull(record.Message);
6666
Assert.NotNull(record.Message.Headers);
67-
Assert.Equal(TimestampType.NotAvailable, record.Timestamp.Type);
68-
Assert.Equal(0, record.Timestamp.UnixTimestampMs);
67+
Assert.Equal(TimestampType.NotAvailable, record.Message.Timestamp.Type);
68+
Assert.Equal(0, record.Message.Timestamp.UnixTimestampMs);
6969
}
7070

7171
Assert.Equal(0, Library.HandleCount);

test/Confluent.Kafka.IntegrationTests/Tests/Headers.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public void MessageHeaderProduceConsume(string bootstrapServers)
318318
consumer.Assign(new TopicPartitionOffset(singlePartitionTopic, 0, nulldr.Offset));
319319
var cr = consumer.Consume(TimeSpan.FromSeconds(10));
320320
Assert.NotNull(cr?.Message);
321-
Assert.Single(cr.Headers);
321+
Assert.Single(cr.Message.Headers);
322322
Assert.Equal("my-header", cr.Message.Headers[0].Key);
323323
Assert.Null(cr.Message.Headers[0].GetValueBytes());
324324
}

test/Confluent.Kafka.IntegrationTests/Tests/Ignore.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ record = consumer.Consume(TimeSpan.FromSeconds(10));
7373

7474
ConsumeResult<Ignore, byte[]> record = consumer.Consume(TimeSpan.FromSeconds(10));
7575
Assert.NotNull(record.Message);
76-
Assert.Null(record.Key);
77-
Assert.NotNull(record.Value);
78-
Assert.Equal(42, record.Value[0]);
79-
Assert.Equal(240, record.Value[1]);
76+
Assert.Null(record.Message.Key);
77+
Assert.NotNull(record.Message.Value);
78+
Assert.Equal(42, record.Message.Value[0]);
79+
Assert.Equal(240, record.Message.Value[1]);
8080
}
8181

8282
Assert.Equal(0, Library.HandleCount);

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Handles.cs

+14-14
Original file line numberDiff line numberDiff line change
@@ -87,62 +87,62 @@ public void Producer_Handles(string bootstrapServers)
8787
{
8888
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 0));
8989
var r1 = consumer.Consume(TimeSpan.FromSeconds(10));
90-
Assert.Equal(new byte[] { 42 }, r1.Key);
91-
Assert.Equal(new byte[] { 33 }, r1.Value);
90+
Assert.Equal(new byte[] { 42 }, r1.Message.Key);
91+
Assert.Equal(new byte[] { 33 }, r1.Message.Value);
9292
Assert.Equal(0, r1.Offset);
9393
}
9494

9595
using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
9696
{
9797
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 1));
9898
var r2 = consumer.Consume(TimeSpan.FromSeconds(10));
99-
Assert.Equal("hello", r2.Key);
100-
Assert.Equal("world", r2.Value);
99+
Assert.Equal("hello", r2.Message.Key);
100+
Assert.Equal("world", r2.Message.Value);
101101
Assert.Equal(1, r2.Offset);
102102
}
103103

104104
using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
105105
{
106106
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 2));
107107
var r3 = consumer.Consume(TimeSpan.FromSeconds(10));
108-
Assert.Equal(new byte[] { 40 }, r3.Key);
109-
Assert.Equal(new byte[] { 31 }, r3.Value);
108+
Assert.Equal(new byte[] { 40 }, r3.Message.Key);
109+
Assert.Equal(new byte[] { 31 }, r3.Message.Value);
110110
Assert.Equal(2, r3.Offset);
111111
}
112112

113113
using (var consumer = new ConsumerBuilder<int, string>(consumerConfig).Build())
114114
{
115115
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 3));
116116
var r4 = consumer.Consume(TimeSpan.FromSeconds(10));
117-
Assert.Equal(42, r4.Key);
118-
Assert.Equal("mellow world", r4.Value);
117+
Assert.Equal(42, r4.Message.Key);
118+
Assert.Equal("mellow world", r4.Message.Value);
119119
Assert.Equal(3, r4.Offset);
120120
}
121121

122122
using (var consumer = new ConsumerBuilder<int, int>(consumerConfig).Build())
123123
{
124124
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 4));
125125
var r5 = consumer.Consume(TimeSpan.FromSeconds(10));
126-
Assert.Equal(int.MaxValue, r5.Key);
127-
Assert.Equal(int.MinValue, r5.Value);
126+
Assert.Equal(int.MaxValue, r5.Message.Key);
127+
Assert.Equal(int.MinValue, r5.Message.Value);
128128
Assert.Equal(4, r5.Offset);
129129
}
130130

131131
using (var consumer = new ConsumerBuilder<string, byte[]>(consumerConfig).Build())
132132
{
133133
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 5));
134134
var r6 = consumer.Consume(TimeSpan.FromSeconds(10));
135-
Assert.Equal("yellow mould", r6.Key);
136-
Assert.Equal(new byte[] { 69 }, r6.Value);
135+
Assert.Equal("yellow mould", r6.Message.Key);
136+
Assert.Equal(new byte[] { 69 }, r6.Message.Value);
137137
Assert.Equal(5, r6.Offset);
138138
}
139139

140140
using (var consumer = new ConsumerBuilder<double, double>(consumerConfig).Build())
141141
{
142142
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 6));
143143
var r7 = consumer.Consume(TimeSpan.FromSeconds(10));
144-
Assert.Equal(44.0, r7.Key);
145-
Assert.Equal(234.4, r7.Value);
144+
Assert.Equal(44.0, r7.Message.Key);
145+
Assert.Equal(234.4, r7.Message.Value);
146146
Assert.Equal(6, r7.Offset);
147147
}
148148
}

test/Confluent.Kafka.Transactions/TestConsumer.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ public void Run()
3939
{
4040
var cr = consumer.Consume();
4141

42-
if (!lasts.ContainsKey(cr.Key)) { lasts.Add(cr.Key, -1); }
43-
if (cr.Value == lasts[cr.Key] + 1) { Console.Write("."); }
44-
else { Console.Write($"[producer {cr.Key} expected seq {lasts[cr.Key]+1} but got {cr.Value}]"); break; }
42+
if (!lasts.ContainsKey(cr.Message.Key)) { lasts.Add(cr.Message.Key, -1); }
43+
if (cr.Message.Value == lasts[cr.Message.Key] + 1) { Console.Write("."); }
44+
else { Console.Write($"[producer {cr.Message.Key} expected seq {lasts[cr.Message.Key]+1} but got {cr.Message.Value}]"); break; }
4545
Console.Out.Flush();
46-
lasts[cr.Key] = cr.Value;
46+
lasts[cr.Message.Key] = cr.Message.Value;
4747
}
4848
}
4949
catch (Exception e)

test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Avro/AvroAndRegular.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ public static void AvoAndRegular(string bootstrapServers, string schemaRegistryS
112112
{
113113
consumer.Assign(new TopicPartitionOffset(topic1.Name, 0, 0));
114114
var cr = consumer.Consume();
115-
Assert.Equal("hello", cr.Key);
116-
Assert.Equal("world", cr.Value);
115+
Assert.Equal("hello", cr.Message.Key);
116+
Assert.Equal("world", cr.Message.Value);
117117
}
118118

119119
using (var consumer =
@@ -123,8 +123,8 @@ public static void AvoAndRegular(string bootstrapServers, string schemaRegistryS
123123
{
124124
consumer.Assign(new TopicPartitionOffset(topic2.Name, 0, 0));
125125
var cr = consumer.Consume();
126-
Assert.Equal("hello", cr.Key);
127-
Assert.Equal("world", cr.Value);
126+
Assert.Equal("hello", cr.Message.Key);
127+
Assert.Equal("world", cr.Message.Value);
128128
}
129129

130130
using (var consumer =

test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Avro/PrimitiveTypes.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,8 @@ public static void PrimitiveTypes(string bootstrapServers, string schemaRegistry
244244
{
245245
consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(nullTopic, 0, 0) });
246246
var result8 = consumer.Consume(TimeSpan.FromSeconds(10));
247-
Assert.Null(result8.Key);
248-
Assert.Null(result8.Value);
247+
Assert.Null(result8.Message.Key);
248+
Assert.Null(result8.Message.Value);
249249
}
250250
}
251251
}

test/Confluent.SchemaRegistry.Serdes.UnitTests/Configuration.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public ConfigurationTests()
3333
{
3434
testTopic = "topic";
3535
var schemaRegistryMock = new Mock<ISchemaRegistryClient>();
36-
schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(testTopic + "-value", It.IsAny<string>())).ReturnsAsync(
36+
schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(testTopic + "-value", It.IsAny<Schema>())).ReturnsAsync(
3737
(string topic, string schema) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1
3838
);
3939
schemaRegistryMock.Setup(x => x.GetSchemaAsync(It.IsAny<int>(), It.IsAny<string>())).ReturnsAsync(

0 commit comments

Comments (0)