Skip to content

Commit e89c094

Browse files
committed
[BAEL-11415] - Initial commit with sparing-kafka version and topic creation configuration onstatup
1 parent e4dd6e0 commit e89c094

File tree

4 files changed

+92
-2
lines changed

4 files changed

+92
-2
lines changed

spring-kafka/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
</dependencies>
3434

3535
<properties>
36-
<spring-kafka.version>1.1.3.RELEASE</spring-kafka.version>
36+
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
3737
<jackson.version>2.9.7</jackson.version>
3838
</properties>
3939

spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
import org.springframework.kafka.annotation.TopicPartition;
1414
import org.springframework.kafka.core.KafkaTemplate;
1515
import org.springframework.kafka.support.KafkaHeaders;
16+
import org.springframework.kafka.support.SendResult;
1617
import org.springframework.messaging.handler.annotation.Header;
1718
import org.springframework.messaging.handler.annotation.Payload;
19+
import org.springframework.util.concurrent.ListenableFuture;
20+
import org.springframework.util.concurrent.ListenableFutureCallback;
1821

1922
@SpringBootApplication
2023
public class KafkaApplication {
@@ -98,7 +101,20 @@ public static class MessageProducer {
98101
private String greetingTopicName;
99102

100103
public void sendMessage(String message) {
101-
kafkaTemplate.send(topicName, message);
104+
105+
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
106+
107+
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
108+
109+
@Override
110+
public void onSuccess(SendResult<String, String> result) {
111+
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
112+
}
113+
@Override
114+
public void onFailure(Throwable ex) {
115+
System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
116+
}
117+
});
102118
}
103119

104120
public void sendMessageToPartion(String message, int partition) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.baeldung.spring.kafka;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import org.apache.kafka.clients.admin.AdminClientConfig;
7+
import org.apache.kafka.clients.admin.NewTopic;
8+
import org.springframework.beans.factory.annotation.Value;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
import org.springframework.kafka.core.KafkaAdmin;
12+
13+
@Configuration
14+
public class KafkaTopicConfig {
15+
16+
@Value(value = "${kafka.bootstrapAddress}")
17+
private String bootstrapAddress;
18+
19+
@Value(value = "${message.topic.name}")
20+
private String topicName;
21+
22+
@Value(value = "${partitioned.topic.name}")
23+
private String partionedTopicName;
24+
25+
@Value(value = "${filtered.topic.name}")
26+
private String filteredTopicName;
27+
28+
@Value(value = "${greeting.topic.name}")
29+
private String greetingTopicName;
30+
31+
@Bean
32+
public KafkaAdmin kafkaAdmin() {
33+
Map<String, Object> configs = new HashMap<>();
34+
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
35+
return new KafkaAdmin(configs);
36+
}
37+
38+
@Bean
39+
public NewTopic topic1() {
40+
return new NewTopic(topicName, 1, (short) 1);
41+
}
42+
43+
@Bean
44+
public NewTopic topic2() {
45+
return new NewTopic(partionedTopicName, 6, (short) 1);
46+
}
47+
48+
@Bean
49+
public NewTopic topic3() {
50+
return new NewTopic(filteredTopicName, 1, (short) 1);
51+
}
52+
53+
@Bean
54+
public NewTopic topic4() {
55+
return new NewTopic(greetingTopicName, 1, (short) 1);
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.baeldung;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.springframework.boot.test.context.SpringBootTest;
6+
import org.springframework.test.context.junit4.SpringRunner;
7+
8+
import com.baeldung.spring.kafka.KafkaApplication;
9+
10+
@RunWith(SpringRunner.class)
11+
@SpringBootTest(classes = KafkaApplication.class)
12+
public class SpringContextLiveTest {
13+
14+
@Test
15+
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
16+
}
17+
}

0 commit comments

Comments
 (0)