Skip to content

Commit bbe170a

Browse files
authored
MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide (#10755)
Quick followup to KIP-740 to actually deprecate this constructor, and update the upgrade guide with what we changed in KIP-740. I also noticed the TaskId#parse method had been modified previously, and should be re-added to the public TaskId class. It had no tests, so now it does Reviewers: Matthias J. Sax <[email protected]>, Luke Chen <[email protected]>
1 parent 38e8391 commit bbe170a

File tree

4 files changed

+65
-0
lines changed

4 files changed

+65
-0
lines changed

docs/streams/upgrade-guide.html

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ <h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API
117117
<p>
118118
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
119119
</p>
120+
121+
<p>
122+
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
123+
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
124+
and will be removed, as they were never intended for public use. Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as well as the <code>TaskMetadata</code> constructor.
125+
These have been replaced with APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new <code>TaskMetadata#getTaskId</code>
126+
method. See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more details.
127+
</p>
120128
<p>
121129
We removed the following deprecated APIs:
122130
</p>

streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
*/
1717
package org.apache.kafka.streams.processor;
1818

19+
import org.apache.kafka.streams.errors.TaskIdFormatException;
20+
1921
import java.io.DataInputStream;
2022
import java.io.DataOutputStream;
2123
import java.io.IOException;
2224
import java.nio.ByteBuffer;
2325
import java.util.Objects;
26+
2427
import org.slf4j.Logger;
2528
import org.slf4j.LoggerFactory;
2629

@@ -80,6 +83,35 @@ public String toString() {
8083
return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
8184
}
8285

86+
/**
87+
* @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
88+
*/
89+
public static TaskId parse(final String taskIdStr) {
90+
final int firstIndex = taskIdStr.indexOf('_');
91+
final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1);
92+
if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) {
93+
throw new TaskIdFormatException(taskIdStr);
94+
}
95+
96+
try {
97+
// If only one copy of '_' exists, there is no named topology in the string
98+
if (secondIndex < 0) {
99+
final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, firstIndex));
100+
final int partition = Integer.parseInt(taskIdStr.substring(firstIndex + 1));
101+
102+
return new TaskId(topicGroupId, partition);
103+
} else {
104+
final String namedTopology = taskIdStr.substring(0, firstIndex);
105+
final int topicGroupId = Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex));
106+
final int partition = Integer.parseInt(taskIdStr.substring(secondIndex + 1));
107+
108+
return new TaskId(topicGroupId, partition, namedTopology);
109+
}
110+
} catch (final Exception e) {
111+
throw new TaskIdFormatException(taskIdStr);
112+
}
113+
}
114+
83115
/**
84116
* @throws IOException if cannot write to output stream
85117
* @deprecated since 3.0, for internal use, will be removed

streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ public class TaskMetadata {
4040

4141
private final Optional<Long> timeCurrentIdlingStarted;
4242

43+
/**
44+
* @deprecated since 3.0, not intended for public use
45+
*/
46+
@Deprecated
47+
public TaskMetadata(final String taskId,
48+
final Set<TopicPartition> topicPartitions,
49+
final Map<TopicPartition, Long> committedOffsets,
50+
final Map<TopicPartition, Long> endOffsets,
51+
final Optional<Long> timeCurrentIdlingStarted) {
52+
this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
53+
}
54+
55+
// For internal use -- not a public API
4356
public TaskMetadata(final TaskId taskId,
4457
final Set<TopicPartition> topicPartitions,
4558
final Map<TopicPartition, Long> committedOffsets,

streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,18 @@ private void assertPermissions(final File file) {
148148
}
149149
}
150150

151+
@Test
152+
public void shouldParseUnnamedTaskId() {
153+
final TaskId task = new TaskId(1, 0);
154+
assertThat(TaskId.parse(task.toString()), equalTo(task));
155+
}
156+
157+
@Test
158+
public void shouldParseNamedTaskId() {
159+
final TaskId task = new TaskId(1, 0, "namedTopology");
160+
assertThat(TaskId.parse(task.toString()), equalTo(task));
161+
}
162+
151163
@Test
152164
public void shouldCreateTaskStateDirectory() {
153165
final TaskId taskId = new TaskId(0, 0);

0 commit comments

Comments
 (0)