= Spring Boot with Java Example: Confluent Cloud Client
:experimental:

NOTE: Produce messages to and consume messages from https://www.confluent.io/confluent-cloud/?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud[Confluent Cloud] using a Java Spring Boot application.

== Prerequisites

* Java 1.8
* Create a local file (e.g. at `$HOME/.ccloud/java.config`) with configuration parameters to connect to your Kafka cluster, which can be on your local host, https://www.confluent.io/confluent-cloud/?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud[Confluent Cloud], or any other cluster.
Follow https://github.com/confluentinc/configuration-templates/tree/master/README.md[these detailed instructions] to properly create this file.
* If you are running on Confluent Cloud, you must have access to a link:https://www.confluent.io/confluent-cloud/?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud[Confluent Cloud] cluster.

[source]
.Confluent Cloud config file example
----
$ cat $HOME/.ccloud/java.config
bootstrap.servers=<BROKER ENDPOINT>
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
schema.registry.url=<SR ENDPOINT>
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR_KEY:SR_PASSWORD>
----

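In a Spring Boot application, the same client properties are typically supplied through `application.properties` (or `application.yml`) rather than loaded by hand. The sketch below is an assumption about how such wiring could look, not this example's actual configuration file; it uses Spring Boot's standard `spring.kafka.*` namespace, where arbitrary client properties pass through via `spring.kafka.properties.*`:

[source,properties]
----
# Hedged sketch: mirrors the java.config above in Spring Boot form.
spring.kafka.bootstrap-servers=<BROKER ENDPOINT>
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";
spring.kafka.properties.schema.registry.url=<SR ENDPOINT>
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.schema.registry.basic.auth.user.info=<SR_KEY:SR_PASSWORD>
----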
== Example 1: Producing and Consuming Avro Messages

This example uses values formatted as Avro and integrates with the Confluent Cloud Schema Registry.
Before using Confluent Cloud Schema Registry, check its https://docs.confluent.io/current/cloud/limits.html?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud[availability and limits].
Note that your VPC must be able to connect to the Confluent Cloud Schema Registry public internet endpoint.

. As described in the https://docs.confluent.io/current/quickstart/cloud-quickstart/schema-registry.html?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud[Confluent Cloud quickstart], in the Confluent Cloud GUI, enable Confluent Cloud Schema Registry and create an API key and secret to connect to it.

. Verify that your Confluent Cloud Schema Registry credentials work from your host.
In the command below, substitute your values for `<SR API KEY>`, `<SR API SECRET>`, and `<SR ENDPOINT>`.
+
[source,shell]
.View the list of registered subjects
----
$ curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
----

This Spring Boot application has two components: a producer (`ProducerExample.java`) and a consumer (`ConsumerExample.java`).
Both components are initialized during Spring Boot application startup.
The producer writes Kafka data to a topic in Confluent Cloud.
Each record has a String key representing a username (e.g. `alice`) and a value of a count, formatted as an Avro object:

[source,json]
----
{"namespace": "io.confluent.examples.clients.cloud",
 "type": "record",
 "name": "DataRecordAvro",
 "fields": [
   {"name": "count", "type": "long"}
 ]
}
----

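The produce loop can be sketched independently of Kafka. In this sketch (the names and shapes here are illustrative assumptions, not the example's actual code), `send` stands in for a `KafkaTemplate.send` call, and the Avro-generated class is modeled as a plain Java record:

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class ProduceLoopSketch {
    // Stand-in for the Avro-generated DataRecordAvro class.
    record DataRecordAvro(long count) {}

    // Produce n records for one key, handing each to `send`
    // (in the real application, a KafkaTemplate<String, DataRecordAvro>).
    static void produce(String key, int n, BiConsumer<String, DataRecordAvro> send) {
        for (int i = 0; i < n; i++) {
            send.accept(key, new DataRecordAvro(i));
        }
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        produce("alice", 10, (k, v) -> lines.add(k + " {\"count\": " + v.count() + "}"));
        lines.forEach(System.out::println);
    }
}
----

Separating the loop from the transport like this makes the record-building logic trivially unit-testable without a broker.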
.Run the Kafka producer/consumer application.
[source,shell]
----
./startProducerConsumer.sh
----

This command builds the jar and runs the `spring-kafka`-powered producer and consumer.
The consumer reads the same topic from Confluent Cloud and prints the data to the console.

You should see the following in the console:

[source,shell]
----
...
2020-02-13 14:41:57.924 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 20
2020-02-13 14:41:57.927 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 21
2020-02-13 14:41:57.927 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 22
2020-02-13 14:41:57.927 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 23
2020-02-13 14:41:57.928 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 24
2020-02-13 14:41:57.928 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 25
2020-02-13 14:41:57.928 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 26
2020-02-13 14:41:57.929 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 27
2020-02-13 14:41:57.929 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 28
2020-02-13 14:41:57.930 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 29
10 messages were produced to topic test
...
----

=== Verify that the consumer received all the messages

.You should see:
[source,shell]
----
...
2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 0}
2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 1}
2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 2}
2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 3}
2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 4}
2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 5}
2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 6}
2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 7}
2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 8}
2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 9}
----

NOTE: When you are done, press kbd:[Ctrl + c].

== Example 2: Kafka Streams with Spring Boot

The Kafka Streams API reads the same topic from Confluent Cloud and performs a stateful aggregation: a rolling sum of the counts as it processes each record.

.Run the Kafka Streams application.
[source,shell]
----
./startStreams.sh
----

.You should see:
[source,shell]
----
...
[Consumed record]: alice, 0
[Consumed record]: alice, 1
[Consumed record]: alice, 2
[Consumed record]: alice, 3
[Consumed record]: alice, 4
[Consumed record]: alice, 5
[Consumed record]: alice, 6
[Consumed record]: alice, 7
[Consumed record]: alice, 8
[Consumed record]: alice, 9
...
[Running count]: alice, 0
[Running count]: alice, 1
[Running count]: alice, 3
[Running count]: alice, 6
[Running count]: alice, 10
[Running count]: alice, 15
[Running count]: alice, 21
[Running count]: alice, 28
[Running count]: alice, 36
[Running count]: alice, 45
...
----

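The rolling-sum logic itself can be sketched without any Kafka dependency (a minimal illustration, not this example's actual Streams topology): each consumed count is added to the running total for its key, which is what produces the 0, 1, 3, 6, 10, ... sequence for counts 0 through 9.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

public class RunningSumSketch {
    private final Map<String, Long> totals = new HashMap<>();

    // Equivalent of the Streams aggregation step:
    // new running count = previous running count + consumed count.
    long aggregate(String key, long count) {
        return totals.merge(key, count, Long::sum);
    }

    public static void main(String[] args) {
        RunningSumSketch sketch = new RunningSumSketch();
        for (long count = 0; count < 10; count++) {
            System.out.println("[Running count]: alice, " + sketch.aggregate("alice", count));
        }
    }
}
----

In the real application this per-key state lives in a Kafka Streams state store rather than an in-memory map, so it survives restarts and scales across partitions.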
NOTE: When you are done, press kbd:[Ctrl + c].