28
28
import org .apache .kafka .clients .producer .ProducerRecord ;
29
29
import org .apache .kafka .clients .producer .RecordMetadata ;
30
30
import org .apache .kafka .common .TopicPartition ;
31
- import org .apache . log4j . Level ;
32
- import org .apache . log4j . Logger ;
31
+ import org .slf4j . Logger ;
32
+ import org .slf4j . LoggerFactory ;
33
33
34
34
import javax .servlet .annotation .WebServlet ;
35
35
import javax .servlet .http .HttpServlet ;
52
52
@ WebServlet ("/KafkaServlet" )
53
53
public class KafkaServlet extends HttpServlet {
54
54
private static final long serialVersionUID = 1L ;
55
- private static final Logger logger = Logger .getLogger (KafkaServlet .class );
55
+ private static final Logger logger = LoggerFactory .getLogger (KafkaServlet .class );
56
56
57
57
private final String serverConfigDir = System .getProperty ("server.config.dir" );
58
58
private final String resourceDir = serverConfigDir + File .separator
@@ -73,9 +73,9 @@ public class KafkaServlet extends HttpServlet {
73
73
* Intialising the KafkaServlet
74
74
*/
75
75
public void init () {
76
- logger .log ( Level . WARN , "Initialising Kafka Servlet" );
77
- logger .log ( Level . WARN , "Server Config directory: " + serverConfigDir );
78
- logger .log ( Level . WARN , "Resource directory: " + resourceDir );
76
+ logger .warn ( "Initialising Kafka Servlet" );
77
+ logger .warn ( "Server Config directory: " + serverConfigDir );
78
+ logger .warn ( "Resource directory: " + resourceDir );
79
79
80
80
// Retrieve credentials from environment
81
81
EventStreamsCredentials credentials = Environment .getEventStreamsCredentials ();
@@ -93,7 +93,7 @@ public void init() {
93
93
restApi .post ("/admin/topics" , "{ \" name\" : \" " + topic + "\" }" , new int []{422 });
94
94
95
95
String topics = restApi .get ("/admin/topics" , false );
96
- logger .log ( Level . WARN , "Topics: " + topics );
96
+ logger .warn ( "Topics: " + topics );
97
97
98
98
// Initialize Kafka Producer
99
99
kafkaProducer = new KafkaProducer <>(getClientConfiguration (bootstrapServers , credentials .getApiKey (), true ));
@@ -172,12 +172,12 @@ private Properties getClientConfiguration(String broker, String apikey, boolean
172
172
}
173
173
174
174
try {
175
- logger .log ( Level . WARN , "Reading properties file from: " + fileName );
175
+ logger .warn ( "Reading properties file from: " + fileName );
176
176
propsStream = new FileInputStream (fileName );
177
177
props .load (propsStream );
178
178
propsStream .close ();
179
179
} catch (IOException e ) {
180
- logger .log ( Level . ERROR , e );
180
+ logger .error ( e . getMessage (), e );
181
181
return props ;
182
182
}
183
183
@@ -188,7 +188,7 @@ private Properties getClientConfiguration(String broker, String apikey, boolean
188
188
saslJaasConfig = saslJaasConfig .replace ("APIKEY" , apikey );
189
189
props .setProperty ("sasl.jaas.config" , saslJaasConfig );
190
190
191
- logger .log ( Level . WARN , "Using properties: " + props );
191
+ logger .warn ( "Using properties: " + props );
192
192
193
193
return props ;
194
194
}
@@ -199,7 +199,7 @@ private Properties getClientConfiguration(String broker, String apikey, boolean
199
199
* @param topic
200
200
*/
201
201
private void produce (String topic ) {
202
- logger .log ( Level . WARN , "Producer is starting." );
202
+ logger .warn ( "Producer is starting." );
203
203
204
204
String key = "key" ;
205
205
// Push a message into the list to be sent.
@@ -217,7 +217,7 @@ private void produce(String topic) {
217
217
RecordMetadata m = kafkaProducer .send (record ).get ();
218
218
producedMessages ++;
219
219
220
- logger .log ( Level . WARN , "Message produced, offset: " + m .offset ());
220
+ logger .warn ( "Message produced, offset: " + m .offset ());
221
221
222
222
Thread .sleep (1000 );
223
223
} catch (final Exception e ) {
@@ -226,7 +226,7 @@ private void produce(String topic) {
226
226
System .exit (-1 );
227
227
}
228
228
messageProduced = true ;
229
- logger .log ( Level . WARN , "Producer is shutting down." );
229
+ logger .warn ( "Producer is shutting down." );
230
230
}
231
231
232
232
/**
@@ -250,19 +250,19 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
250
250
@ Override
251
251
public void onPartitionsAssigned (Collection <org .apache .kafka .common .TopicPartition > partitions ) {
252
252
try {
253
- logger .log ( Level . WARN , "Partitions " + partitions + " assigned, consumer seeking to end." );
253
+ logger .warn ( "Partitions " + partitions + " assigned, consumer seeking to end." );
254
254
255
255
for (TopicPartition partition : partitions ) {
256
256
long position = kafkaConsumer .position (partition );
257
- logger .log ( Level . WARN , "current Position: " + position );
257
+ logger .warn ( "current Position: " + position );
258
258
259
- logger .log ( Level . WARN , "Seeking to end..." );
259
+ logger .warn ( "Seeking to end..." );
260
260
kafkaConsumer .seekToEnd (Arrays .asList (partition ));
261
- logger .log ( Level . WARN ,
261
+ logger .warn (
262
262
"Seek from the current position: " + kafkaConsumer .position (partition ));
263
263
kafkaConsumer .seek (partition , position );
264
264
}
265
- logger .log ( Level . WARN , "Producer can now begin producing messages." );
265
+ logger .warn ( "Producer can now begin producing messages." );
266
266
} catch (final Exception e ) {
267
267
logger .error ("Error when assigning partitions" , e );
268
268
}
@@ -273,7 +273,7 @@ public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartiti
273
273
274
274
@ Override
275
275
public void run () {
276
- logger .log ( Level . WARN , "Consumer is starting." );
276
+ logger .warn ( "Consumer is starting." );
277
277
278
278
while (!closing ) {
279
279
try {
@@ -291,15 +291,15 @@ public void run() {
291
291
292
292
Thread .sleep (1000 );
293
293
} catch (final InterruptedException e ) {
294
- logger .log ( Level . ERROR , "Producer/Consumer loop has been unexpectedly interrupted" );
294
+ logger .error ( "Producer/Consumer loop has been unexpectedly interrupted" );
295
295
shutdown ();
296
296
} catch (final Exception e ) {
297
- logger .log ( Level . ERROR , "Consumer has failed with exception: " + e );
297
+ logger .error ( "Consumer has failed with exception: " + e );
298
298
shutdown ();
299
299
}
300
300
}
301
301
302
- logger .log ( Level . WARN , "Consumer is shutting down." );
302
+ logger .warn ( "Consumer is shutting down." );
303
303
kafkaConsumer .close ();
304
304
consumedMessages .clear ();
305
305
}
0 commit comments