Skip to content

Commit 15d1cc8

Browse files
MINOR: Improvements and fixes for Trogdor payload generators. (#10621)
* Changes the new Throughput Generators to track messages per window instead of making per-second calculations which can have rounding errors. Also, one of these had a calculation error which prompted this change in the first place. * Fixes a couple typos. * Fixes an error where certain JSON fields were not exposed, causing the workloads to not behave as intended. * Fixes a bug where we use wait not in a loop, which exits too quickly. * Adds additional constant payload generators. * Fixes problems with an example spec. * Fixes several off-by-one comparisons. Reviewers: Colin P. McCabe <[email protected]>
1 parent 45d7440 commit 15d1cc8

8 files changed

+260
-54
lines changed

trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java

+19-20
Original file line numberDiff line numberDiff line change
@@ -26,50 +26,47 @@
2626
* The lower the window size, the smoother the traffic will be. Using a 100ms window offers no noticeable spikes in
2727
* traffic while still being long enough to avoid too much overhead.
2828
*
29-
* WARNING: Due to binary nature of throughput in terms of messages sent in a window, each window will send at least 1
30-
* message, and each window sends the same number of messages, rounded down. For example, 99 messages per second with a
31-
* 100ms window will only send 90 messages per second, or 9 messages per window. Another example, in order to send only
32-
* 5 messages per second, a window size of 200ms is required. In cases like these, both the `messagesPerSecond` and
33-
* `windowSizeMs` parameters should be adjusted together to achieve more accurate throughput.
34-
*
3529
* Here is an example spec:
3630
*
3731
* {
3832
* "type": "constant",
39-
* "messagesPerSecond": 500,
33+
* "messagesPerWindow": 50,
4034
* "windowSizeMs": 100
4135
* }
4236
*
4337
* This will produce a workload that runs 500 messages per second, with a maximum resolution of 50 messages per 100
4438
* millisecond.
39+
*
40+
* If `messagesPerWindow` is less than or equal to 0, `throttle` will not throttle at all and will return immediately.
4541
*/
4642

4743
public class ConstantThroughputGenerator implements ThroughputGenerator {
48-
private final int messagesPerSecond;
4944
private final int messagesPerWindow;
5045
private final long windowSizeMs;
5146

5247
private long nextWindowStarts = 0;
5348
private int messageTracker = 0;
5449

5550
@JsonCreator
56-
public ConstantThroughputGenerator(@JsonProperty("messagesPerSecond") int messagesPerSecond,
51+
public ConstantThroughputGenerator(@JsonProperty("messagesPerWindow") int messagesPerWindow,
5752
@JsonProperty("windowSizeMs") long windowSizeMs) {
58-
// Calcualte the default values.
53+
// Calculate the default values.
5954
if (windowSizeMs <= 0) {
6055
windowSizeMs = 100;
6156
}
6257
this.windowSizeMs = windowSizeMs;
63-
this.messagesPerSecond = messagesPerSecond;
64-
65-
// Use the rest of the parameters to calculate window properties.
66-
this.messagesPerWindow = (int) ((long) messagesPerSecond / windowSizeMs);
58+
this.messagesPerWindow = messagesPerWindow;
6759
calculateNextWindow();
6860
}
6961

7062
@JsonProperty
71-
public int messagesPerSecond() {
72-
return messagesPerSecond;
63+
public long windowSizeMs() {
64+
return windowSizeMs;
65+
}
66+
67+
@JsonProperty
68+
public int messagesPerWindow() {
69+
return messagesPerWindow;
7370
}
7471

7572
private void calculateNextWindow() {
@@ -79,7 +76,7 @@ private void calculateNextWindow() {
7976
// Calculate the next window start time.
8077
long now = Time.SYSTEM.milliseconds();
8178
if (nextWindowStarts > 0) {
82-
while (nextWindowStarts < now) {
79+
while (nextWindowStarts <= now) {
8380
nextWindowStarts += windowSizeMs;
8481
}
8582
} else {
@@ -89,8 +86,8 @@ private void calculateNextWindow() {
8986

9087
@Override
9188
public synchronized void throttle() throws InterruptedException {
92-
// Run unthrottled if messagesPerSecond is negative.
93-
if (messagesPerSecond < 0) {
89+
// Run unthrottled if messagesPerWindow is not positive.
90+
if (messagesPerWindow <= 0) {
9491
return;
9592
}
9693

@@ -106,7 +103,9 @@ public synchronized void throttle() throws InterruptedException {
106103
if (messageTracker >= messagesPerWindow) {
107104

108105
// Wait the difference in time between now and when the next window starts.
109-
wait(nextWindowStarts - Time.SYSTEM.milliseconds());
106+
while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
107+
wait(nextWindowStarts - Time.SYSTEM.milliseconds());
108+
}
110109
}
111110
}
112111
}

trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
public class GaussianFlushGenerator implements FlushGenerator {
5454
private final int messagesPerFlushAverage;
55-
private final int messagesPerFlushDeviation;
55+
private final double messagesPerFlushDeviation;
5656

5757
private final Random random = new Random();
5858

@@ -61,7 +61,7 @@ public class GaussianFlushGenerator implements FlushGenerator {
6161

6262
@JsonCreator
6363
public GaussianFlushGenerator(@JsonProperty("messagesPerFlushAverage") int messagesPerFlushAverage,
64-
@JsonProperty("messagesPerFlushDeviation") int messagesPerFlushDeviation) {
64+
@JsonProperty("messagesPerFlushDeviation") double messagesPerFlushDeviation) {
6565
this.messagesPerFlushAverage = messagesPerFlushAverage;
6666
this.messagesPerFlushDeviation = messagesPerFlushDeviation;
6767
calculateFlushSize();
@@ -73,7 +73,7 @@ public int messagesPerFlushAverage() {
7373
}
7474

7575
@JsonProperty
76-
public long messagesPerFlushDeviation() {
76+
public double messagesPerFlushDeviation() {
7777
return messagesPerFlushDeviation;
7878
}
7979

trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java

+23-26
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,17 @@
2525
/*
2626
* This throughput generator configures throughput with a gaussian normal distribution on a per-window basis. You can
2727
* specify how many windows to keep the throughput at the rate before changing. All traffic will follow a gaussian
28-
* distribution centered around `messagesPerSecondAverage` with a deviation of `messagesPerSecondDeviation`.
28+
* distribution centered around `messagesPerWindowAverage` with a deviation of `messagesPerWindowDeviation`.
2929
*
3030
* The lower the window size, the smoother the traffic will be. Using a 100ms window offers no noticeable spikes in
3131
* traffic while still being long enough to avoid too much overhead.
3232
*
33-
* WARNING: Due to binary nature of throughput in terms of messages sent in a window, this does not work well for an
34-
* average throughput of less than 5 messages per window. In cases where you want lower throughput, please adjust the
35-
* `windowSizeMs` accordingly.
36-
*
3733
* Here is an example spec:
3834
*
3935
* {
4036
* "type": "gaussian",
41-
* "messagesPerSecondAverage": 500,
42-
* "messagesPerSecondDeviation": 50,
37+
* "messagesPerWindowAverage": 50,
38+
* "messagesPerWindowDeviation": 5,
4339
* "windowsUntilRateChange": 100,
4440
* "windowSizeMs": 100
4541
* }
@@ -56,10 +52,8 @@
5652
*/
5753

5854
public class GaussianThroughputGenerator implements ThroughputGenerator {
59-
private final int messagesPerSecondAverage;
60-
private final int messagesPerSecondDeviation;
6155
private final int messagesPerWindowAverage;
62-
private final int messagesPerWindowDeviation;
56+
private final double messagesPerWindowDeviation;
6357
private final int windowsUntilRateChange;
6458
private final long windowSizeMs;
6559

@@ -71,42 +65,43 @@ public class GaussianThroughputGenerator implements ThroughputGenerator {
7165
private int throttleMessages = 0;
7266

7367
@JsonCreator
74-
public GaussianThroughputGenerator(@JsonProperty("messagesPerSecondAverage") int messagesPerSecondAverage,
75-
@JsonProperty("messagesPerSecondDeviation") int messagesPerSecondDeviation,
68+
public GaussianThroughputGenerator(@JsonProperty("messagesPerWindowAverage") int messagesPerWindowAverage,
69+
@JsonProperty("messagesPerWindowDeviation") double messagesPerWindowDeviation,
7670
@JsonProperty("windowsUntilRateChange") int windowsUntilRateChange,
7771
@JsonProperty("windowSizeMs") long windowSizeMs) {
78-
// Calcualte the default values.
72+
// Calculate the default values.
7973
if (windowSizeMs <= 0) {
8074
windowSizeMs = 100;
8175
}
8276
this.windowSizeMs = windowSizeMs;
83-
this.messagesPerSecondAverage = messagesPerSecondAverage;
84-
this.messagesPerSecondDeviation = messagesPerSecondDeviation;
77+
this.messagesPerWindowAverage = messagesPerWindowAverage;
78+
this.messagesPerWindowDeviation = messagesPerWindowDeviation;
8579
this.windowsUntilRateChange = windowsUntilRateChange;
8680

87-
// Take per-second calculations and convert them to per-window calculations.
88-
messagesPerWindowAverage = (int) (messagesPerSecondAverage * windowSizeMs / 1000);
89-
messagesPerWindowDeviation = (int) (messagesPerSecondDeviation * windowSizeMs / 1000);
90-
91-
// Calcualte the first window.
81+
// Calculate the first window.
9282
calculateNextWindow(true);
9383
}
9484

9585
@JsonProperty
96-
public int messagesPerSecondAverage() {
97-
return messagesPerSecondAverage;
86+
public int messagesPerWindowAverage() {
87+
return messagesPerWindowAverage;
9888
}
9989

10090
@JsonProperty
101-
public long messagesPerSecondDeviation() {
102-
return messagesPerSecondDeviation;
91+
public double messagesPerWindowDeviation() {
92+
return messagesPerWindowDeviation;
10393
}
10494

10595
@JsonProperty
10696
public long windowsUntilRateChange() {
10797
return windowsUntilRateChange;
10898
}
10999

100+
@JsonProperty
101+
public long windowSizeMs() {
102+
return windowSizeMs;
103+
}
104+
110105
private synchronized void calculateNextWindow(boolean force) {
111106
// Reset the message count.
112107
messageTracker = 0;
@@ -127,7 +122,7 @@ private synchronized void calculateNextWindow(boolean force) {
127122

128123
// Calculate the number of messages allowed in this window using a normal distribution.
129124
// The formula is: Messages = Gaussian * Deviation + Average
130-
throttleMessages = Math.max((int) (random.nextGaussian() * (double) messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
125+
throttleMessages = Math.max((int) (random.nextGaussian() * messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
131126
}
132127
windowTracker += 1;
133128
}
@@ -146,7 +141,9 @@ public synchronized void throttle() throws InterruptedException {
146141
if (messageTracker >= throttleMessages) {
147142

148143
// Wait the difference in time between now and when the next window starts.
149-
wait(nextWindowStarts - Time.SYSTEM.milliseconds());
144+
while (nextWindowStarts > Time.SYSTEM.milliseconds()) {
145+
wait(nextWindowStarts - Time.SYSTEM.milliseconds());
146+
}
150147
}
151148
}
152149
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.trogdor.workload;
19+
20+
import com.fasterxml.jackson.annotation.JsonCreator;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
import org.apache.kafka.common.utils.Time;
23+
24+
import java.nio.ByteBuffer;
25+
import java.nio.ByteOrder;
26+
import java.util.Random;
27+
28+
/**
29+
* This class behaves identically to TimestampConstantPayloadGenerator, except the message size follows a gaussian
30+
* distribution.
31+
*
32+
* This should be used in conjunction with TimestampRecordProcessor in the Consumer to measure true end-to-end latency
33+
* of a system.
34+
*
35+
* `messageSizeAverage` - The average size in bytes of each message.
36+
* `messageSizeDeviation` - The standard deviation to use when calculating message size.
37+
* `messagesUntilSizeChange` - The number of messages to keep at the same size.
38+
*
39+
* Here is an example spec:
40+
*
41+
* {
42+
* "type": "gaussianTimestampConstant",
43+
* "messageSizeAverage": 512,
44+
* "messageSizeDeviation": 100,
45+
* "messagesUntilSizeChange": 100
46+
* }
47+
*
48+
* This will generate messages on a gaussian distribution with an average size each 512-bytes. The message sizes will
49+
* have a standard deviation of 100 bytes, and the size will only change every 100 messages. The distribution of
50+
* messages will be as follows:
51+
*
52+
* The average size of the messages are 512 bytes.
53+
* ~68% of the messages are between 412 and 612 bytes
54+
* ~95% of the messages are between 312 and 712 bytes
55+
* ~99% of the messages are between 212 and 812 bytes
56+
*/
57+
58+
public class GaussianTimestampConstantPayloadGenerator implements PayloadGenerator {
59+
private final int messageSizeAverage;
60+
private final double messageSizeDeviation;
61+
private final int messagesUntilSizeChange;
62+
private final long seed;
63+
64+
private final Random random = new Random();
65+
private final ByteBuffer buffer;
66+
67+
private int messageTracker = 0;
68+
private int messageSize = 0;
69+
70+
@JsonCreator
71+
public GaussianTimestampConstantPayloadGenerator(@JsonProperty("messageSizeAverage") int messageSizeAverage,
72+
@JsonProperty("messageSizeDeviation") double messageSizeDeviation,
73+
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
74+
@JsonProperty("seed") long seed) {
75+
this.messageSizeAverage = messageSizeAverage;
76+
this.messageSizeDeviation = messageSizeDeviation;
77+
this.seed = seed;
78+
this.messagesUntilSizeChange = messagesUntilSizeChange;
79+
buffer = ByteBuffer.allocate(Long.BYTES);
80+
buffer.order(ByteOrder.LITTLE_ENDIAN);
81+
}
82+
83+
@JsonProperty
84+
public int messageSizeAverage() {
85+
return messageSizeAverage;
86+
}
87+
88+
@JsonProperty
89+
public double messageSizeDeviation() {
90+
return messageSizeDeviation;
91+
}
92+
93+
@JsonProperty
94+
public int messagesUntilSizeChange() {
95+
return messagesUntilSizeChange;
96+
}
97+
98+
@JsonProperty
99+
public long seed() {
100+
return seed;
101+
}
102+
103+
@Override
104+
public synchronized byte[] generate(long position) {
105+
// Make the random number generator deterministic for unit tests.
106+
random.setSeed(seed + position);
107+
108+
// Calculate the next message size based on a gaussian distribution.
109+
if ((messageSize == 0) || (messageTracker >= messagesUntilSizeChange)) {
110+
messageTracker = 0;
111+
messageSize = Math.max((int) (random.nextGaussian() * messageSizeDeviation) + messageSizeAverage, Long.BYTES);
112+
}
113+
messageTracker += 1;
114+
115+
// Generate the byte array before the timestamp generation.
116+
byte[] result = new byte[messageSize];
117+
118+
// Do the timestamp generation as the very last task.
119+
buffer.clear();
120+
buffer.putLong(Time.SYSTEM.milliseconds());
121+
buffer.rewind();
122+
System.arraycopy(buffer.array(), 0, result, 0, Long.BYTES);
123+
return result;
124+
}
125+
}

trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858

5959
public class GaussianTimestampRandomPayloadGenerator implements PayloadGenerator {
6060
private final int messageSizeAverage;
61-
private final int messageSizeDeviation;
61+
private final double messageSizeDeviation;
6262
private final int messagesUntilSizeChange;
6363
private final long seed;
6464

@@ -70,7 +70,7 @@ public class GaussianTimestampRandomPayloadGenerator implements PayloadGenerator
7070

7171
@JsonCreator
7272
public GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int messageSizeAverage,
73-
@JsonProperty("messageSizeDeviation") int messageSizeDeviation,
73+
@JsonProperty("messageSizeDeviation") double messageSizeDeviation,
7474
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
7575
@JsonProperty("seed") long seed) {
7676
this.messageSizeAverage = messageSizeAverage;
@@ -87,10 +87,15 @@ public int messageSizeAverage() {
8787
}
8888

8989
@JsonProperty
90-
public long messageSizeDeviation() {
90+
public double messageSizeDeviation() {
9191
return messageSizeDeviation;
9292
}
9393

94+
@JsonProperty
95+
public int messagesUntilSizeChange() {
96+
return messagesUntilSizeChange;
97+
}
98+
9499
@JsonProperty
95100
public long seed() {
96101
return seed;

0 commit comments

Comments
 (0)