Skip to content

Commit 0d55f0f

Browse files
stanislavkozlovskicmccabe
authored andcommitted
KAFKA-8102: Add an interval-based Trogdor transaction generator (#6444)
This patch adds a TimeIntervalTransactionsGenerator class which enables the Trogdor ProduceBench worker to commit transactions based on a configurable millisecond time interval. Also, we now handle 409 create task responses in the coordinator command-line client by printing a more informative message Reviewers: Colin P. McCabe <[email protected]>
1 parent 1baba1b commit 0d55f0f

File tree

7 files changed

+123
-3
lines changed

7 files changed

+123
-3
lines changed

TROGDOR.md

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ Trogdor can run several workloads. Workloads perform operations on the cluster
105105

106106
### ProduceBench
107107
ProduceBench starts a Kafka producer on a single agent node, producing to several partitions. The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency.
108+
It can be configured to use a transactional producer which can commit transactions based on a set time interval or number of messages.
108109

109110
### RoundTripWorkload
110111
RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer.

tests/spec/transactional-produce-bench.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
//
1717
// An example task specification for running a transactional producer benchmark
18-
in Trogdor. See TROGDOR.md for details.
18+
// in Trogdor. See TROGDOR.md for details.
1919
//
2020

2121
{

tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.kafka.trogdor.rest.TaskState;
4545
import org.apache.kafka.trogdor.rest.TasksResponse;
4646
import org.apache.kafka.trogdor.task.TaskSpec;
47+
import org.apache.kafka.trogdor.rest.RequestConflictException;
4748
import org.apache.kafka.trogdor.rest.UptimeResponse;
4849
import org.slf4j.Logger;
4950
import org.slf4j.LoggerFactory;
@@ -426,8 +427,15 @@ public static void main(String[] args) throws Exception {
426427
TaskSpec taskSpec = JsonUtil.
427428
objectFromCommandLineArgument(res.getString("taskSpec"), TaskSpec.class);
428429
CreateTaskRequest req = new CreateTaskRequest(taskId, taskSpec);
429-
client.createTask(req);
430-
System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id());
430+
try {
431+
client.createTask(req);
432+
System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id());
433+
} catch (RequestConflictException rce) {
434+
System.out.printf("CreateTaskRequest for task %s got a 409 status code - " +
435+
"a task with the same ID but a different specification already exists.%nException: %s%n",
436+
req.id(), rce.getMessage());
437+
Exit.exit(1);
438+
}
431439
break;
432440
}
433441
case "stopTask": {

tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java

+1
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ TreeMap<String, Long> activeWorkerIds() {
302302
*
303303
* @param id The ID of the task to create.
304304
* @param spec The specification of the task to create.
305+
* @throws RequestConflictException - if a task with the same ID but different spec exists
305306
*/
306307
public void createTask(final String id, TaskSpec spec)
307308
throws Throwable {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.trogdor.workload;
18+
19+
import com.fasterxml.jackson.annotation.JsonCreator;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
import org.apache.kafka.common.utils.Time;
22+
23+
/**
24+
* A transactions generator where we commit a transaction every N milliseconds
25+
*/
26+
public class TimeIntervalTransactionsGenerator implements TransactionGenerator {
27+
28+
private static final long NULL_START_MS = -1;
29+
30+
private final Time time;
31+
private final int intervalMs;
32+
33+
private long lastTransactionStartMs = NULL_START_MS;
34+
35+
@JsonCreator
36+
public TimeIntervalTransactionsGenerator(@JsonProperty("transactionIntervalMs") int intervalMs) {
37+
this(intervalMs, Time.SYSTEM);
38+
}
39+
40+
TimeIntervalTransactionsGenerator(@JsonProperty("transactionIntervalMs") int intervalMs,
41+
Time time) {
42+
if (intervalMs < 1) {
43+
throw new IllegalArgumentException("Cannot have a negative interval");
44+
}
45+
this.time = time;
46+
this.intervalMs = intervalMs;
47+
}
48+
49+
@JsonProperty
50+
public int transactionIntervalMs() {
51+
return intervalMs;
52+
}
53+
54+
@Override
55+
public synchronized TransactionAction nextAction() {
56+
if (lastTransactionStartMs == NULL_START_MS) {
57+
lastTransactionStartMs = time.milliseconds();
58+
return TransactionAction.BEGIN_TRANSACTION;
59+
}
60+
if (time.milliseconds() - lastTransactionStartMs >= intervalMs) {
61+
lastTransactionStartMs = NULL_START_MS;
62+
return TransactionAction.COMMIT_TRANSACTION;
63+
}
64+
65+
return TransactionAction.NO_OP;
66+
}
67+
}

tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
property = "type")
2828
@JsonSubTypes(value = {
2929
@JsonSubTypes.Type(value = UniformTransactionsGenerator.class, name = "uniform"),
30+
@JsonSubTypes.Type(value = TimeIntervalTransactionsGenerator.class, name = "interval"),
3031
})
3132
public interface TransactionGenerator {
3233
enum TransactionAction {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.trogdor.workload;
19+
20+
import org.apache.kafka.common.utils.MockTime;
21+
import org.junit.Test;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
public class TimeIntervalTransactionsGeneratorTest {
26+
@Test
27+
public void testCommitsTransactionAfterIntervalPasses() {
28+
MockTime time = new MockTime();
29+
TimeIntervalTransactionsGenerator generator = new TimeIntervalTransactionsGenerator(100, time);
30+
31+
assertEquals(100, generator.transactionIntervalMs());
32+
assertEquals(TransactionGenerator.TransactionAction.BEGIN_TRANSACTION, generator.nextAction());
33+
assertEquals(TransactionGenerator.TransactionAction.NO_OP, generator.nextAction());
34+
time.sleep(50);
35+
assertEquals(TransactionGenerator.TransactionAction.NO_OP, generator.nextAction());
36+
time.sleep(49);
37+
assertEquals(TransactionGenerator.TransactionAction.NO_OP, generator.nextAction());
38+
time.sleep(1);
39+
assertEquals(TransactionGenerator.TransactionAction.COMMIT_TRANSACTION, generator.nextAction());
40+
assertEquals(TransactionGenerator.TransactionAction.BEGIN_TRANSACTION, generator.nextAction());
41+
}
42+
}

0 commit comments

Comments
 (0)