Skip to content

Commit 5a4bbb9

Browse files
authored
KAFKA-7341 Migrate core module to JUnit 5 (apache#9855)
Reviewers: Ismael Juma <[email protected]>
1 parent 7bd10ff commit 5a4bbb9

File tree

276 files changed

+3250
-3877
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

276 files changed

+3250
-3877
lines changed

build.gradle

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ subprojects {
242242
}
243243

244244
// Remove the relevant project name once it's converted to JUnit 5
245-
def shouldUseJUnit5 = !(["connect", "core", "generator", "runtime", "examples",
246-
"streams-scala", "streams"].contains(it.project.name) || it.project.name.startsWith("upgrade-system-tests-"))
245+
def shouldUseJUnit5 = !(["connect", "generator", "runtime", "examples", "streams-scala",
246+
"streams"].contains(it.project.name) || it.project.name.startsWith("upgrade-system-tests-"))
247247

248248
def testLoggingEvents = ["passed", "skipped", "failed"]
249249
def testShowStandardStreams = false
@@ -786,9 +786,7 @@ project(':core') {
786786
testCompile libs.apachedsLdifPartition
787787
testCompile libs.apachedsMavibotPartition
788788
testCompile libs.apachedsJdbmPartition
789-
testCompile libs.junitJupiterApi
790-
testCompile libs.junitVintageEngine
791-
testCompile libs.scalatest
789+
testCompile libs.junitJupiter
792790
testCompile libs.slf4jlog4j
793791
testCompile libs.jfreechart
794792
}

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import javax.management.MBeanServer;
7474
import javax.management.ObjectName;
7575
import java.lang.management.ManagementFactory;
76+
import java.nio.charset.StandardCharsets;
7677
import java.time.Duration;
7778
import java.util.ArrayList;
7879
import java.util.Arrays;
@@ -1308,6 +1309,13 @@ public void testUnusedConfigs() {
13081309
}
13091310
}
13101311

1312+
@Test
1313+
public void testNullTopicName() {
1314+
// send a record with null topic should fail
1315+
assertThrows(IllegalArgumentException.class, () -> new ProducerRecord<>(null, 1,
1316+
"key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
1317+
}
1318+
13111319
private static final List<String> CLIENT_IDS = new ArrayList<>();
13121320

13131321
public static class SerializerForClientId implements Serializer<byte[]> {

core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@ package kafka.admin
1919

2020
import java.io.{ByteArrayOutputStream, PrintStream}
2121
import java.nio.charset.StandardCharsets
22-
2322
import scala.collection.Seq
24-
2523
import kafka.integration.KafkaServerTestHarness
2624
import kafka.server.KafkaConfig
2725
import kafka.utils.TestUtils
2826
import org.apache.kafka.clients.NodeApiVersions
2927
import org.apache.kafka.common.protocol.ApiKeys
30-
import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
31-
import org.junit.Test
28+
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
29+
import org.junit.jupiter.api.Test
30+
import org.junit.jupiter.api.Timeout
3231

3332
import scala.jdk.CollectionConverters._
3433

@@ -45,7 +44,8 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
4544
props
4645
}).map(KafkaConfig.fromProps)
4746

48-
@Test(timeout=120000)
47+
@Timeout(120)
48+
@Test
4949
def checkBrokerApiVersionCommandOutput(): Unit = {
5050
val byteArrayOutputStream = new ByteArrayOutputStream
5151
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())

core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,18 @@ import org.apache.kafka.common.network.ListenerName
3232
import org.apache.kafka.common.security.auth.SecurityProtocol
3333
import org.apache.kafka.common.utils.Utils
3434
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
35-
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
36-
import org.junit.rules.Timeout
37-
import org.junit.{After, Rule, Test}
35+
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
36+
import org.junit.jupiter.api.{AfterEach, Test, Timeout}
3837

3938
import scala.collection.{Map, Seq, mutable}
4039
import scala.jdk.CollectionConverters._
4140

41+
@Timeout(300)
4242
class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
43-
@Rule
44-
def globalTimeout: Timeout = Timeout.millis(300000)
4543

4644
var cluster: ReassignPartitionsTestCluster = null
4745

48-
@After
46+
@AfterEach
4947
override def tearDown(): Unit = {
5048
Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster")
5149
super.tearDown()
@@ -122,8 +120,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
122120

123121
// Execute the assignment
124122
runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
125-
assertEquals(unthrottledBrokerConfigs,
126-
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
123+
assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
127124
val finalAssignment = Map(
128125
new TopicPartition("foo", 0) ->
129126
PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
@@ -193,9 +190,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
193190
cluster.servers(3).replicaManager.nonOfflinePartition(part).
194191
flatMap(_.leaderLogIfLocal).isDefined
195192
}, "broker 3 should be the new leader", pause = 10L)
196-
assertEquals(s"Expected broker 3 to have the correct high water mark for the " +
197-
"partition.", 123L, cluster.servers(3).replicaManager.
198-
localLogOrException(part).highWatermark)
193+
assertEquals(123L, cluster.servers(3).replicaManager.localLogOrException(part).highWatermark,
194+
s"Expected broker 3 to have the correct high water mark for the partition.")
199195
}
200196

201197
@Test
@@ -251,12 +247,9 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
251247
PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 3, 2), true),
252248
new TopicPartition("baz", 2) ->
253249
PartitionReassignmentState(Seq(0, 2, 1), Seq(3, 2, 1), true))
254-
assertEquals(VerifyAssignmentResult(initialAssignment),
255-
runVerifyAssignment(cluster.adminClient, assignment, false))
256-
assertEquals(VerifyAssignmentResult(initialAssignment),
257-
runVerifyAssignment(zkClient, assignment, false))
258-
assertEquals(unthrottledBrokerConfigs,
259-
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
250+
assertEquals(VerifyAssignmentResult(initialAssignment), runVerifyAssignment(cluster.adminClient, assignment, false))
251+
assertEquals(VerifyAssignmentResult(initialAssignment), runVerifyAssignment(zkClient, assignment, false))
252+
assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
260253

261254
// Execute the assignment
262255
val interBrokerThrottle = 300000L
@@ -277,12 +270,9 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
277270
if (!result.partsOngoing) {
278271
true
279272
} else {
280-
assertTrue("Expected at least one partition reassignment to be ongoing when " +
281-
s"result = ${result}", !result.partStates.forall(_._2.done))
282-
assertEquals(Seq(0, 3, 2),
283-
result.partStates(new TopicPartition("foo", 0)).targetReplicas)
284-
assertEquals(Seq(3, 2, 1),
285-
result.partStates(new TopicPartition("baz", 2)).targetReplicas)
273+
assertFalse(result.partStates.forall(_._2.done), s"Expected at least one partition reassignment to be ongoing when result = $result")
274+
assertEquals(Seq(0, 3, 2), result.partStates(new TopicPartition("foo", 0)).targetReplicas)
275+
assertEquals(Seq(3, 2, 1), result.partStates(new TopicPartition("baz", 2)).targetReplicas)
286276
logger.info(s"Current result: ${result}")
287277
waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
288278
false
@@ -469,8 +459,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
469459

470460
val info1 = new BrokerDirs(cluster.adminClient.describeLogDirs(0.to(4).
471461
map(_.asInstanceOf[Integer]).asJavaCollection), 0)
472-
assertEquals(reassignment.targetDir,
473-
info1.curLogDirs.getOrElse(topicPartition, ""))
462+
assertEquals(reassignment.targetDir, info1.curLogDirs.getOrElse(topicPartition, ""))
474463
}
475464

476465
@Test

core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import org.apache.kafka.common.record.TimestampType
2626
import org.apache.kafka.common.TopicPartition
2727
import kafka.utils.{ShutdownableThread, TestUtils}
2828
import kafka.server.{BaseRequestTest, KafkaConfig}
29-
import org.junit.Assert._
30-
import org.junit.Before
29+
import org.junit.jupiter.api.Assertions._
30+
import org.junit.jupiter.api.BeforeEach
3131

3232
import scala.jdk.CollectionConverters._
3333
import scala.collection.mutable.{ArrayBuffer, Buffer}
@@ -73,7 +73,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
7373
properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
7474
}
7575

76-
@Before
76+
@BeforeEach
7777
override def setUp(): Unit = {
7878
super.setUp()
7979

@@ -134,8 +134,8 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
134134
val timestamp = startingTimestamp + i
135135
assertEquals(timestamp.toLong, record.timestamp)
136136
} else
137-
assertTrue(s"Got unexpected timestamp ${record.timestamp}. Timestamp should be between [$startingTimestamp, $now}]",
138-
record.timestamp >= startingTimestamp && record.timestamp <= now)
137+
assertTrue(record.timestamp >= startingTimestamp && record.timestamp <= now,
138+
s"Got unexpected timestamp ${record.timestamp}. Timestamp should be between [$startingTimestamp, $now}]")
139139
assertEquals(offset.toLong, record.offset)
140140
val keyAndValueIndex = startingKeyAndValueIndex + i
141141
assertEquals(s"key $keyAndValueIndex", new String(record.key))

core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package kafka.api
1616
import java.util
1717
import java.util.Properties
1818
import java.util.concurrent.ExecutionException
19-
2019
import kafka.integration.KafkaServerTestHarness
2120
import kafka.log.LogConfig
2221
import kafka.server.{Defaults, KafkaConfig}
@@ -26,33 +25,30 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
2625
import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException}
2726
import org.apache.kafka.common.utils.Utils
2827
import org.apache.kafka.server.policy.AlterConfigPolicy
29-
import org.junit.Assert.{assertEquals, assertNull, assertTrue, assertThrows}
30-
import org.junit.{After, Before, Rule, Test}
31-
import org.junit.rules.Timeout
28+
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
29+
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
3230

3331
import scala.annotation.nowarn
3432
import scala.jdk.CollectionConverters._
3533

3634
/**
3735
* Tests AdminClient calls when the broker is configured with policies like AlterConfigPolicy, CreateTopicPolicy, etc.
3836
*/
37+
@Timeout(120)
3938
class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Logging {
4039

4140
import AdminClientWithPoliciesIntegrationTest._
4241

4342
var client: Admin = null
4443
val brokerCount = 3
4544

46-
@Rule
47-
def globalTimeout = Timeout.millis(120000)
48-
49-
@Before
45+
@BeforeEach
5046
override def setUp(): Unit = {
5147
super.setUp()
5248
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
5349
}
5450

55-
@After
51+
@AfterEach
5652
override def tearDown(): Unit = {
5753
if (client != null)
5854
Utils.closeQuietly(client, "AdminClient")
@@ -141,10 +137,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
141137
var configs = describeResult.all.get
142138
assertEquals(4, configs.size)
143139

144-
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
145-
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
146-
assertEquals(Defaults.MinInSyncReplicas.toString,
147-
configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
140+
assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
141+
assertEquals(Defaults.MinInSyncReplicas.toString, configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
148142

149143
assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
150144

@@ -171,10 +165,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
171165
configs = describeResult.all.get
172166
assertEquals(4, configs.size)
173167

174-
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
175-
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
176-
assertEquals(Defaults.MinInSyncReplicas.toString,
177-
configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
168+
assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
169+
assertEquals(Defaults.MinInSyncReplicas.toString, configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value)
178170

179171
assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
180172

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
5858
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
5959
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, requests, Uuid}
6060
import org.apache.kafka.test.{TestUtils => JTestUtils}
61-
import org.junit.Assert._
62-
import org.junit.{After, Before, Test}
61+
import org.junit.jupiter.api.Assertions._
62+
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
6363

6464
import scala.annotation.nowarn
6565
import scala.jdk.CollectionConverters._
@@ -276,7 +276,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
276276
ApiKeys.OFFSET_DELETE -> groupReadAcl
277277
)
278278

279-
@Before
279+
@BeforeEach
280280
override def setUp(): Unit = {
281281
doSetup(createOffsetsTopic = false)
282282

@@ -286,7 +286,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
286286
TestUtils.createOffsetsTopic(zkClient, servers)
287287
}
288288

289-
@After
289+
@AfterEach
290290
override def tearDown(): Unit = {
291291
adminClients.foreach(_.close())
292292
removeAllClientAcls()
@@ -1640,23 +1640,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
16401640
removeAclIdempotenceRequired()
16411641
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource)
16421642

1643-
try {
1644-
// the send should now fail with a cluster auth error
1645-
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
1646-
fail("Should have raised ClusterAuthorizationException")
1647-
} catch {
1648-
case e: ExecutionException =>
1649-
assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
1650-
}
1651-
try {
1652-
// the second time, the call to send itself should fail (the producer becomes unusable
1653-
// if no producerId can be obtained)
1654-
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
1655-
fail("Should have raised ClusterAuthorizationException")
1656-
} catch {
1657-
case e: ExecutionException =>
1658-
assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
1659-
}
1643+
// the send should now fail with a cluster auth error
1644+
var e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get())
1645+
assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
1646+
1647+
// the second time, the call to send itself should fail (the producer becomes unusable
1648+
// if no producerId can be obtained)
1649+
e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get())
1650+
assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
16601651
}
16611652

16621653
@Test
@@ -1776,7 +1767,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
17761767
val request = new requests.MetadataRequest.Builder(List.empty.asJava, false).build()
17771768
val response = connectAndReceive[MetadataResponse](request)
17781769
assertEquals(Collections.emptyMap, response.errorCounts)
1779-
assertFalse("Cluster id not returned", response.clusterId.isEmpty)
1770+
assertFalse(response.clusterId.isEmpty, "Cluster id not returned")
17801771
}
17811772

17821773
@Test
@@ -1893,14 +1884,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
18931884

18941885
if (topicExists)
18951886
if (isAuthorized)
1896-
assertFalse(s"$apiKey should be allowed. Found unexpected authorization error $error", authorizationErrors.contains(error))
1887+
assertFalse(authorizationErrors.contains(error), s"$apiKey should be allowed. Found unexpected authorization error $error")
18971888
else
1898-
assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors", authorizationErrors.contains(error))
1889+
assertTrue(authorizationErrors.contains(error), s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors")
18991890
else if (resources == Set(TOPIC))
19001891
if (isAuthorized)
1901-
assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
1892+
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, error, s"$apiKey had an unexpected error")
19021893
else
1903-
assertEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
1894+
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, error, s"$apiKey had an unexpected error")
19041895

19051896
response
19061897
}

0 commit comments

Comments
 (0)