1
+ /*
2
+ ** Kafka Connect for TxEventQ.
3
+ **
4
+ ** Copyright (c) 2024, 2025 Oracle and/or its affiliates.
5
+ ** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6
+ */
7
+
8
+ /*
9
+ * Licensed to the Apache Software Foundation (ASF) under one or more
10
+ * contributor license agreements. See the NOTICE file distributed with
11
+ * this work for additional information regarding copyright ownership.
12
+ * The ASF licenses this file to You under the Apache License, Version 2.0
13
+ * (the "License"); you may not use this file except in compliance with
14
+ * the License. You may obtain a copy of the License at
15
+ *
16
+ * http://www.apache.org/licenses/LICENSE-2.0
17
+ *
18
+ * Unless required by applicable law or agreed to in writing, software
19
+ * distributed under the License is distributed on an "AS IS" BASIS,
20
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21
+ * See the License for the specific language governing permissions and
22
+ * limitations under the License.
23
+ */
24
+
25
+ import java .util .Properties ;
26
+ import java .util .List ;
27
+ import java .util .UUID ;
28
+ import java .util .Iterator ;
29
+ import java .time .Duration ;
30
+ import org .apache .kafka .clients .consumer .KafkaConsumer ;
31
+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
32
+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
33
+ import org .json .JSONObject ;
34
+ import java .util .Base64 ;
35
+ import java .nio .charset .StandardCharsets ;
36
+
37
+ /*
38
+ * Pass in the name of the Kafka topic to consume from for the first argument.
39
+ * Pass in the number of messages to consume from the Kafka topic for the second argument.
40
+ */
41
+ public class ConsumerOfJmsSchema {
42
+ public static void main (String [] args ) {
43
+
44
+ String kafkaTopic = args [0 ];
45
+ int numOfMsgToConsume = Integer .valueOf (args [1 ]);
46
+ final int BLOCK_TIMEOUT_MS = 3000 ;
47
+ Properties props = new Properties ();
48
+ props .put ("bootstrap.servers" , "localhost:9092" );
49
+ props .put ("group.id" , UUID .randomUUID ().toString ());
50
+ props .put ("auto.offset.reset" ,"earliest" );
51
+
52
+ props .put ("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
53
+ props .put ("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
54
+ KafkaConsumer <String , String > consumer = new KafkaConsumer <String , String >(props );
55
+
56
+ consumer .subscribe (List .of (kafkaTopic ));
57
+ int msgCounter = 0 ;
58
+
59
+ while (msgCounter < numOfMsgToConsume )
60
+ {
61
+ ConsumerRecords <String , String > records = consumer .poll (Duration .ofMillis (BLOCK_TIMEOUT_MS ));
62
+ System .out .println ("" );
63
+
64
+ for (ConsumerRecord <String , String > record : records )
65
+ {
66
+ msgCounter ++;
67
+ JSONObject jsonObject = new JSONObject (record .value ());
68
+
69
+ System .out .println ("********Gets list of JSON keys for the schema.********" );
70
+
71
+ Iterator <?> keys = jsonObject .keys ();
72
+ while (keys .hasNext ()) {
73
+ String key = (String )keys .next ();
74
+ System .out .println ("JSON Keys:" + key );
75
+ }
76
+
77
+ System .out .println ("" );
78
+
79
+ String messageId = jsonObject .getString ("messageId" );
80
+ System .out .println ("messageId = " + messageId );
81
+
82
+ String messageType = jsonObject .getString ("messageType" );
83
+ System .out .println ("messageType = " + messageType );
84
+
85
+ if (!jsonObject .get ("correlationId" ).equals (null )) {
86
+ String correlationId = jsonObject .getString ("correlationId" );
87
+ System .out .println ("correlationId = " + correlationId );
88
+ }
89
+
90
+ // Gets the payloadBytes property and decodes the message.
91
+ if (!jsonObject .get ("payloadBytes" ).equals (null )) {
92
+ String payloadBytes = jsonObject .get ("payloadBytes" ).toString ();
93
+ System .out .println ("Undecoded payloadBytes: " + payloadBytes );
94
+ System .out .println ("Decoded payloadBytes: " + decodeKafkaMessage (payloadBytes ));
95
+ }
96
+
97
+ // Gets the payloadText message.
98
+ if (!jsonObject .get ("payloadText" ).equals (null )) {
99
+ String payloadText = jsonObject .get ("payloadText" ).toString ();
100
+ System .out .println ("payloadText: " + payloadText );
101
+ }
102
+
103
+ /**
104
+ * Gets the payloadMap message that is a Map of keys/values.
105
+ * If access to different property values are required follow the example
106
+ * below for the properties schema.
107
+ */
108
+ if (!jsonObject .get ("payloadMap" ).equals (null )) {
109
+ JSONObject payloadMapJsonObject = jsonObject .getJSONObject ("payloadMap" );
110
+ System .out .println ("payloadMap: " + payloadMapJsonObject );
111
+ }
112
+
113
+ System .out .println ("" );
114
+
115
+ /*********************************************************/
116
+ /* Gets the different values from the properties schema. */
117
+ /*********************************************************/
118
+
119
+ JSONObject propertiesJsonObject = jsonObject .getJSONObject ("properties" );
120
+ System .out .println ("********Gets the different values from the properties schema.********" );
121
+
122
+ // Since the propertyType for JMSXRcvTimestamp property is a long use the getLong to get the value.
123
+ JSONObject jmsXrcvTimestamp = propertiesJsonObject .getJSONObject ("JMSXRcvTimestamp" );
124
+ long jmsXrcvTimestampVal = jmsXrcvTimestamp .getLong ("long" );
125
+ System .out .println ("JMSXRcvTimestamp: " + jmsXrcvTimestampVal );
126
+
127
+ // Since the propertyType for JMSXDeliveryCount property is a integer use the getInt to get the value.
128
+ JSONObject jmsXdeliveryCount = propertiesJsonObject .getJSONObject ("JMSXDeliveryCount" );
129
+ int jmsXdeliveryCountVal = jmsXdeliveryCount .getInt ("integer" );
130
+ System .out .println ("JMSXDeliveryCount: " + jmsXdeliveryCountVal );
131
+
132
+ // Since the propertyType for JMSXState property is a integer use the getInt to get the value.
133
+ JSONObject jmsXstate = propertiesJsonObject .getJSONObject ("JMSXState" );
134
+ int jmsXstateVal = jmsXstate .getInt ("integer" );
135
+ System .out .println ("JMSXState: " + jmsXstateVal );
136
+
137
+ System .out .println ("" );
138
+
139
+ /*********************************************************/
140
+ /* Gets the different values from the destination schema. */
141
+ /*********************************************************/
142
+
143
+ JSONObject destinationJsonObject = jsonObject .getJSONObject ("destination" );
144
+ System .out .println ("********Gets the different values from the Destination schema.********" );
145
+ String destinationType = destinationJsonObject .getString ("type" );
146
+ System .out .println ("type= " + destinationType );
147
+
148
+ String destinationName = destinationJsonObject .getString ("name" );
149
+ System .out .println ("name= " + destinationName );
150
+
151
+ if (destinationType .equals ("topic" ) || destinationType .equals ("queue" )) {
152
+ String destinationOwner = destinationJsonObject .getString ("owner" );
153
+ System .out .println ("owner= " + destinationOwner );
154
+
155
+ String destinationCompleteName = destinationJsonObject .getString ("completeName" );
156
+ System .out .println ("completeName= " + destinationCompleteName );
157
+
158
+ String destinationCompleteTableName = destinationJsonObject .getString ("completeTableName" );
159
+ System .out .println ("completeTableName= " + destinationCompleteTableName );
160
+
161
+ }
162
+
163
+ System .out .println ("" );
164
+ }
165
+ }
166
+ }
167
+
168
+ // This method is used to decode the JMS Bytes message stored in the payloadBytes schema property.
169
+ public static String decodeKafkaMessage (String encodedMessage ) {
170
+ // Decode the Base64 encoded string
171
+ byte [] decodedBytes = Base64 .getDecoder ().decode (encodedMessage );
172
+
173
+ // Convert the byte array to a String (assuming UTF-8 encoding)
174
+ return new String (decodedBytes , StandardCharsets .UTF_8 );
175
+ }
176
+ }
0 commit comments