
Commit 5a2fcdd

cmccabe authored and junrao committed
KAFKA-4894; Fix findbugs "default character set in use" warnings
Author: Colin P. Mccabe <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #2683 from cmccabe/KAFKA-4894
1 parent 783900c commit 5a2fcdd
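The findbugs warning addressed here (reported as DM_DEFAULT_ENCODING, "reliance on default encoding") fires wherever a String/byte conversion silently uses the JVM's platform-default charset, so the same bytes can decode differently from machine to machine. A minimal sketch of the pitfall and of the fix applied throughout this commit; the demo class and payload are illustrative, not part of the change:

import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    public static void main(String[] args) {
        // Bytes produced by one component, encoded explicitly as UTF-8.
        byte[] payload = "héllo".getBytes(StandardCharsets.UTF_8);

        // Platform-dependent: decodes with the JVM default charset, so this
        // prints "héllo" on a UTF-8 JVM but "hÃ©llo" on a windows-1252 one.
        String risky = new String(payload);

        // Deterministic: always decodes as UTF-8, matching the encoder above.
        String safe = new String(payload, StandardCharsets.UTF_8);

        System.out.println(risky);
        System.out.println(safe);
    }
}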

24 files changed: 168 additions & 121 deletions

clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java

Lines changed: 4 additions & 2 deletions
@@ -21,6 +21,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+
 public class ErrorLoggingCallback implements Callback {
     private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
     private String topic;
@@ -44,9 +46,9 @@ public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logA
     public void onCompletion(RecordMetadata metadata, Exception e) {
         if (e != null) {
             String keyString = (key == null) ? "null" :
-                    logAsString ? new String(key) : key.length + " bytes";
+                    logAsString ? new String(key, StandardCharsets.UTF_8) : key.length + " bytes";
             String valueString = (valueLength == -1) ? "null" :
-                    logAsString ? new String(value) : valueLength + " bytes";
+                    logAsString ? new String(value, StandardCharsets.UTF_8) : valueLength + " bytes";
             log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
                     topic, keyString, valueString, e);
         }

clients/src/main/java/org/apache/kafka/common/utils/Shell.java

Lines changed: 5 additions & 2 deletions
@@ -19,6 +19,7 @@
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,8 +88,10 @@ private void runCommand() throws IOException {
             //One time scheduling.
             timeoutTimer.schedule(new ShellTimeoutTimerTask(this), timeout);
         }
-        final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
-        BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+        final BufferedReader errReader = new BufferedReader(
+            new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8));
+        BufferedReader inReader = new BufferedReader(
+            new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8));
         final StringBuffer errMsg = new StringBuffer();
 
         // read error and input streams as this would free up the buffers
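The same wrapping applies to any stream: pass the charset to InputStreamReader rather than letting it pick up the JVM default. A standalone sketch of reading a child process's output, as Shell.java now does (the echo command is illustrative only; Shell's real commands come from its callers):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ProcessOutputDemo {
    public static void main(String[] args) throws IOException {
        Process process = new ProcessBuilder("echo", "hello").start();
        // Decode the child's stdout as UTF-8 explicitly instead of with
        // whatever default charset this JVM happens to have.
        try (BufferedReader out = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = out.readLine()) != null)
                System.out.println(line);
        }
    }
}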

connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java

Lines changed: 5 additions & 2 deletions
@@ -27,6 +27,8 @@
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Map;
 
@@ -60,8 +62,9 @@ public void start(Map<String, String> props) {
             outputStream = System.out;
         } else {
             try {
-                outputStream = new PrintStream(new FileOutputStream(filename, true));
-            } catch (FileNotFoundException e) {
+                outputStream = new PrintStream(new FileOutputStream(filename, true), false,
+                        StandardCharsets.UTF_8.name());
+            } catch (FileNotFoundException | UnsupportedEncodingException e) {
                 throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
             }
         }
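The widened catch is forced by the API: in the Java versions this commit targets, the charset-aware PrintStream constructors take a charset name and declare UnsupportedEncodingException (a Charset-typed overload only arrived in Java 10). A self-contained sketch of the same construction; the file name is hypothetical:

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class AppendUtf8Demo {
    public static void main(String[] args) {
        try {
            // (OutputStream, autoFlush, encodingName): the pre-Java-10 way to
            // get a charset-aware PrintStream; the String-named charset is why
            // UnsupportedEncodingException must be caught.
            PrintStream out = new PrintStream(new FileOutputStream("sink.txt", true), false,
                    StandardCharsets.UTF_8.name());
            out.println("appended as UTF-8");
            out.close();
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            throw new RuntimeException("Couldn't open sink.txt", e);
        }
    }
}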

connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java

Lines changed: 3 additions & 2 deletions
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -64,7 +65,7 @@ public void start(Map<String, String> props) {
             stream = System.in;
             // Tracking offset for stdin doesn't make sense
             streamOffset = null;
-            reader = new BufferedReader(new InputStreamReader(stream));
+            reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
         }
         topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
         if (topic == null)
@@ -99,7 +100,7 @@ public List<SourceRecord> poll() throws InterruptedException {
                 } else {
                     streamOffset = 0L;
                 }
-                reader = new BufferedReader(new InputStreamReader(stream));
+                reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
                 log.debug("Opened {} for reading", logFilename());
             } catch (FileNotFoundException e) {
                 log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

Lines changed: 2 additions & 1 deletion
@@ -37,6 +37,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -339,8 +340,8 @@ protected Connector getConnector(String connType) {
 
     private String trace(Throwable t) {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        t.printStackTrace(new PrintStream(output));
         try {
+            t.printStackTrace(new PrintStream(output, false, StandardCharsets.UTF_8.name()));
             return output.toString("UTF-8");
         } catch (UnsupportedEncodingException e) {
             return null;
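Besides naming the charset, this hunk moves printStackTrace inside the try, since the charset-name PrintStream constructor can throw; the stack trace is now encoded and decoded with the same UTF-8 charset. Roughly, as a standalone sketch:

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class TraceDemo {
    // Render a stack trace to a String, using one explicit charset for both
    // the byte-producing PrintStream and the byte-consuming toString.
    static String trace(Throwable t) {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        try {
            t.printStackTrace(new PrintStream(output, false, StandardCharsets.UTF_8.name()));
            return output.toString(StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            return null; // unreachable in practice: UTF-8 is a mandatory charset
        }
    }

    public static void main(String[] args) {
        System.out.println(trace(new IllegalStateException("example")));
    }
}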

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java

Lines changed: 2 additions & 1 deletion
@@ -53,6 +53,7 @@
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -213,7 +214,7 @@ public static <T> HttpResponse<T> httpRequest(String url, String method, Object
             connection.setDoOutput(true);
 
             OutputStream os = connection.getOutputStream();
-            os.write(serializedBody.getBytes());
+            os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
             os.flush();
             os.close();
         }

core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala

Lines changed: 9 additions & 8 deletions
@@ -19,6 +19,7 @@ package kafka.coordinator
 
 import java.io.PrintStream
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
@@ -1045,10 +1046,10 @@ object GroupMetadataManager {
         val formattedValue =
           if (value == null) "NULL"
           else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
-        output.write(groupTopicPartition.toString.getBytes)
-        output.write("::".getBytes)
-        output.write(formattedValue.getBytes)
-        output.write("\n".getBytes)
+        output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
+        output.write("::".getBytes(StandardCharsets.UTF_8))
+        output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+        output.write("\n".getBytes(StandardCharsets.UTF_8))
       case _ => // no-op
     }
   }
@@ -1066,10 +1067,10 @@
         val formattedValue =
           if (value == null) "NULL"
          else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
-        output.write(groupId.getBytes)
-        output.write("::".getBytes)
-        output.write(formattedValue.getBytes)
-        output.write("\n".getBytes)
+        output.write(groupId.getBytes(StandardCharsets.UTF_8))
+        output.write("::".getBytes(StandardCharsets.UTF_8))
+        output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+        output.write("\n".getBytes(StandardCharsets.UTF_8))
       case _ => // no-op
     }
  }

core/src/main/scala/kafka/server/OffsetCheckpoint.scala

Lines changed: 3 additions & 2 deletions
@@ -25,6 +25,7 @@ import scala.collection._
 import kafka.utils.{Exit, Logging}
 import kafka.common._
 import java.io._
+import java.nio.charset.StandardCharsets
 
 import org.apache.kafka.common.TopicPartition
 
@@ -47,7 +48,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
     lock synchronized {
       // write to temp file and then swap with the existing file
      val fileOutputStream = new FileOutputStream(tempPath.toFile)
-      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
+      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
      try {
        writer.write(CurrentVersion.toString)
        writer.newLine()
@@ -83,7 +84,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
      new IOException(s"Malformed line in offset checkpoint file: $line'")
 
    lock synchronized {
-      val reader = new BufferedReader(new FileReader(file))
+      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
      var line: String = null
      try {
        line = reader.readLine()
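FileReader (like the FileWriter replaced in ExportZkOffsets below) offers no charset parameter in the Java versions contemporary with this commit (charset-taking overloads only appeared in Java 11), so the fix composes FileInputStream and InputStreamReader by hand. A sketch of the read side, with a hypothetical checkpoint file name:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class CheckpointReadDemo {
    public static void main(String[] args) throws IOException {
        // InputStreamReader over FileInputStream: what FileReader does
        // internally, but with the charset pinned to UTF-8.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new FileInputStream("replication-offset-checkpoint"), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null)
                System.out.println(line);
        }
    }
}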

core/src/main/scala/kafka/tools/ConsoleConsumer.scala

Lines changed: 14 additions & 10 deletions
@@ -18,8 +18,10 @@
 package kafka.tools
 
 import java.io.PrintStream
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.CountDownLatch
 import java.util.{Locale, Properties, Random}
+
 import joptsimple._
 import kafka.api.OffsetRequest
 import kafka.common.{MessageFormatter, StreamEndException}
@@ -33,6 +35,7 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
+
 import scala.collection.JavaConverters._
 
 /**
@@ -412,8 +415,8 @@ object ConsoleConsumer extends Logging {
 class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printTimestamp = false
-  var keySeparator = "\t".getBytes
-  var lineSeparator = "\n".getBytes
+  var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
+  var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
 
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
@@ -424,9 +427,9 @@ class DefaultMessageFormatter extends MessageFormatter {
    if (props.containsKey("print.key"))
      printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
    if (props.containsKey("key.separator"))
-      keySeparator = props.getProperty("key.separator").getBytes
+      keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
    if (props.containsKey("line.separator"))
-      lineSeparator = props.getProperty("line.separator").getBytes
+      lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
    // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
    if (props.containsKey("key.deserializer"))
      keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
@@ -438,8 +441,9 @@ class DefaultMessageFormatter extends MessageFormatter {
  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
 
    def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
-      val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)
-      val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes)
+      val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
+      val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
+        getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
      output.write(convertedBytes)
      output.write(separator)
    }
@@ -448,9 +452,9 @@ class DefaultMessageFormatter extends MessageFormatter {
 
    if (printTimestamp) {
      if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
-        output.write(s"$timestampType:$timestamp".getBytes)
+        output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
      else
-        output.write(s"NO_TIMESTAMP".getBytes)
+        output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
      output.write(keySeparator)
    }
 
@@ -470,8 +474,8 @@ class LoggingMessageFormatter extends MessageFormatter {
    defaultWriter.writeTo(consumerRecord, output)
    if (logger.isInfoEnabled)
      logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} +
-                  s"key:${if (key == null) "null" else new String(key)}, " +
-                  s"value:${if (value == null) "null" else new String(value)}")
+                  s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " +
+                  s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}")
  }
 }

core/src/main/scala/kafka/tools/ConsoleProducer.scala

Lines changed: 6 additions & 5 deletions
@@ -24,6 +24,7 @@ import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
 import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
+import java.nio.charset.StandardCharsets
 
 import joptsimple._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
@@ -301,7 +302,7 @@ object ConsoleProducer {
        keySeparator = props.getProperty("key.separator")
      if (props.containsKey("ignore.error"))
        ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
-      reader = new BufferedReader(new InputStreamReader(inputStream))
+      reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    }
 
    override def readMessage() = {
@@ -312,14 +313,14 @@ object ConsoleProducer {
        case (line, true) =>
          line.indexOf(keySeparator) match {
            case -1 =>
-              if (ignoreError) new ProducerRecord(topic, line.getBytes)
+              if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
              else throw new KafkaException(s"No key found on line $lineNumber: $line")
            case n =>
-              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
-              new ProducerRecord(topic, line.substring(0, n).getBytes, value)
+              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
+              new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
          }
        case (line, false) =>
-          new ProducerRecord(topic, line.getBytes)
+          new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
      }
    }
  }

core/src/main/scala/kafka/tools/EndToEndLatency.scala

Lines changed: 3 additions & 2 deletions
@@ -17,6 +17,7 @@
 
 package kafka.tools
 
+import java.nio.charset.StandardCharsets
 import java.util.{Arrays, Collections, Properties}
 
 import kafka.utils.Exit
@@ -113,8 +114,8 @@ object EndToEndLatency {
      }
 
      //Check result matches the original record
-      val sent = new String(message)
-      val read = new String(recordIter.next().value())
+      val sent = new String(message, StandardCharsets.UTF_8)
+      val read = new String(recordIter.next().value(), StandardCharsets.UTF_8)
      if (!read.equals(sent)) {
        finalise()
        throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")

core/src/main/scala/kafka/tools/ExportZkOffsets.scala

Lines changed: 4 additions & 2 deletions
@@ -17,7 +17,8 @@
 
 package kafka.tools
 
-import java.io.FileWriter
+import java.io.{FileOutputStream, FileWriter, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
 
 import joptsimple._
 import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
@@ -76,7 +77,8 @@ object ExportZkOffsets extends Logging {
    val outfile = options.valueOf(outFileOpt)
 
    var zkUtils : ZkUtils = null
-    val fileWriter : FileWriter = new FileWriter(outfile)
+    val fileWriter : OutputStreamWriter =
+      new OutputStreamWriter(new FileOutputStream(outfile), StandardCharsets.UTF_8)
 
    try {
      zkUtils = ZkUtils(zkConnect,

core/src/main/scala/kafka/tools/ImportZkOffsets.scala

Lines changed: 3 additions & 3 deletions
@@ -17,8 +17,8 @@
 
 package kafka.tools
 
-import java.io.BufferedReader
-import java.io.FileReader
+import java.io.{BufferedReader, FileInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
 
 import joptsimple._
 import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
@@ -77,7 +77,7 @@ object ImportZkOffsets extends Logging {
  }
 
  private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
-    val fr = new FileReader(filename)
+    val fr = new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)
    val br = new BufferedReader(fr)
    var partOffsetsMap: Map[String,String] = Map()
core/src/main/scala/kafka/tools/ProducerPerformance.scala

Lines changed: 3 additions & 2 deletions
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util._
 import java.text.SimpleDateFormat
 import java.math.BigInteger
+import java.nio.charset.StandardCharsets
 
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
@@ -245,7 +246,7 @@ object ProducerPerformance extends Logging {
 
    val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
    debug(seqMsgString)
-    seqMsgString.getBytes()
+    seqMsgString.getBytes(StandardCharsets.UTF_8)
  }
 
  private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
@@ -276,7 +277,7 @@ object ProducerPerformance extends Logging {
          Thread.sleep(config.messageSendGapMs)
        })
      } catch {
-        case e: Throwable => error("Error when sending message " + new String(message), e)
+        case e: Throwable => error("Error when sending message " + new String(message, StandardCharsets.UTF_8), e)
      }
      i += 1
    }
