Skip to content

Commit 4ac0ca8

Browse files
committed
[FLINK-37596][metrics] Close metric group of a finished split
1 parent 5aaff07 commit 4ac0ca8

File tree

3 files changed

+120
-4
lines changed

3 files changed

+120
-4
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup
4747
private static final String WATERMARK = "watermark";
4848
private static final long SPLIT_NOT_STARTED = -1L;
4949
private long splitStartTime = SPLIT_NOT_STARTED;
50+
private final MetricGroup splitWatermarkMetricGroup;
5051

5152
private InternalSourceSplitMetricGroup(
5253
MetricGroup parentMetricGroup,
@@ -55,8 +56,7 @@ private InternalSourceSplitMetricGroup(
5556
Gauge<Long> currentWatermark) {
5657
super(parentMetricGroup);
5758
this.clock = clock;
58-
MetricGroup splitWatermarkMetricGroup =
59-
parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
59+
splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
6060
pausedTimePerSecond =
6161
splitWatermarkMetricGroup.gauge(
6262
MetricNames.SPLIT_PAUSED_TIME, new TimerGauge(clock));
@@ -191,4 +191,17 @@ public Boolean isIdle() {
191191
public Boolean isActive() {
192192
return !isPaused() && !isIdle();
193193
}
194+
195+
public void onSplitFinished() {
196+
if (splitWatermarkMetricGroup instanceof AbstractMetricGroup) {
197+
((AbstractMetricGroup) splitWatermarkMetricGroup).close();
198+
} else {
199+
LOG.warn("Split watermark metric group can not be closed");
200+
}
201+
}
202+
203+
@VisibleForTesting
204+
public MetricGroup getSplitWatermarkMetricGroup() {
205+
return splitWatermarkMetricGroup;
206+
}
194207
}

flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java

+1
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,7 @@ public void updateCurrentSplitIdle(String splitId, boolean idle) {
729729
@Override
730730
public void splitFinished(String splitId) {
731731
splitCurrentWatermarks.remove(splitId);
732+
getOrCreateSplitMetricGroup(splitId).onSplitFinished();
732733
this.splitMetricGroups.remove(splitId);
733734
}
734735

flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java

+104-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
1919
package org.apache.flink.streaming.api.operators;
2020

2121
import org.apache.flink.api.common.ExecutionConfig;
22+
import org.apache.flink.api.common.JobID;
2223
import org.apache.flink.api.common.eventtime.Watermark;
2324
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
2425
import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -28,8 +29,15 @@ Licensed to the Apache Software Foundation (ASF) under one
2829
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
2930
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
3031
import org.apache.flink.configuration.Configuration;
32+
import org.apache.flink.metrics.Metric;
33+
import org.apache.flink.runtime.clusterframework.types.ResourceID;
3134
import org.apache.flink.runtime.execution.Environment;
35+
import org.apache.flink.runtime.metrics.MetricNames;
36+
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
37+
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
3238
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
39+
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
40+
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
3341
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
3442
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
3543
import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -55,10 +63,14 @@ Licensed to the Apache Software Foundation (ASF) under one
5563
import java.time.Duration;
5664
import java.util.ArrayList;
5765
import java.util.Arrays;
66+
import java.util.Collection;
5867
import java.util.Collections;
5968
import java.util.List;
69+
import java.util.Map;
70+
import java.util.concurrent.ConcurrentHashMap;
6071
import java.util.stream.Collectors;
6172

73+
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
6274
import static org.assertj.core.api.Assertions.assertThat;
6375

6476
/** Unit test for split alignment in {@link SourceOperator}. */
@@ -306,6 +318,44 @@ void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
306318
assertThat(sourceReader.getPausedSplits()).isEmpty();
307319
}
308320

321+
@Test
322+
void testMetricGroupIsClosedForFinishedSplitAndMetricsAreUnregistered() throws Exception {
323+
long idleTimeout = 100;
324+
Collection<String> expectedMetricNames =
325+
Arrays.asList(
326+
MetricNames.SPLIT_IDLE_TIME,
327+
MetricNames.ACC_SPLIT_IDLE_TIME,
328+
MetricNames.SPLIT_ACTIVE_TIME,
329+
MetricNames.ACC_SPLIT_ACTIVE_TIME,
330+
MetricNames.SPLIT_PAUSED_TIME,
331+
MetricNames.ACC_SPLIT_PAUSED_TIME,
332+
MetricNames.SPLIT_CURRENT_WATERMARK);
333+
final Map<String, Metric> registry = new ConcurrentHashMap<>();
334+
MockSourceReader sourceReader =
335+
new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
336+
TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
337+
SourceOperator<Integer, MockSourceSplit> operator =
338+
createAndOpenSourceOperatorWithIdlenessAndRegistry(
339+
sourceReader, processingTimeService, idleTimeout, registry);
340+
341+
MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
342+
split0.addRecord(5);
343+
344+
operator.handleOperatorEvent(
345+
new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer()));
346+
CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
347+
AbstractMetricGroup metricGroup =
348+
(AbstractMetricGroup)
349+
operator.getSplitMetricGroup(split0.splitId())
350+
.getSplitWatermarkMetricGroup();
351+
expectedMetricNames.forEach(metric -> assertThat(registry.containsKey(metric)).isTrue());
352+
while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) {
353+
// split0 emits records until finished/released
354+
}
355+
assertThat(metricGroup.isClosed()).isTrue();
356+
expectedMetricNames.forEach(metric -> assertThat(registry.containsKey(metric)).isFalse());
357+
}
358+
309359
@Test
310360
void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
311361
long idleTimeout = 100;
@@ -448,7 +498,37 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
448498
long idleTimeout)
449499
throws Exception {
450500

451-
Environment env = getTestingEnvironment();
501+
return createAndOpenSourceOperatorWithIdlenessAndEnv(
502+
sourceReader, processingTimeService, idleTimeout, getTestingEnvironment());
503+
}
504+
505+
private SourceOperator<Integer, MockSourceSplit>
506+
createAndOpenSourceOperatorWithIdlenessAndRegistry(
507+
MockSourceReader sourceReader,
508+
TestProcessingTimeService processingTimeService,
509+
long idleTimeout,
510+
Map<String, Metric> registry)
511+
throws Exception {
512+
513+
StreamMockEnvironment env = getTestingEnvironment();
514+
TaskMetricGroup metricGroup =
515+
TaskManagerMetricGroup.createTaskManagerMetricGroup(
516+
new TestMetricRegistry(registry),
517+
"localhost",
518+
ResourceID.generate())
519+
.addJob(new JobID(), "jobName")
520+
.addTask(createExecutionAttemptId(), "test");
521+
env.setTaskMetricGroup(metricGroup);
522+
return createAndOpenSourceOperatorWithIdlenessAndEnv(
523+
sourceReader, processingTimeService, idleTimeout, env);
524+
}
525+
526+
private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdlenessAndEnv(
527+
MockSourceReader sourceReader,
528+
TestProcessingTimeService processingTimeService,
529+
long idleTimeout,
530+
Environment env)
531+
throws Exception {
452532
SourceOperator<Integer, MockSourceSplit> operator =
453533
new TestingSourceOperator<>(
454534
new StreamOperatorParameters<>(
@@ -474,7 +554,7 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
474554
return operator;
475555
}
476556

477-
private Environment getTestingEnvironment() {
557+
private StreamMockEnvironment getTestingEnvironment() {
478558
return new StreamMockEnvironment(
479559
new Configuration(),
480560
new Configuration(),
@@ -529,4 +609,26 @@ public AnyWatermark() {
529609
"any watermark");
530610
}
531611
}
612+
613+
/** The metric registry for storing the registered metrics to verify in tests. */
614+
static class TestMetricRegistry extends NoOpMetricRegistry {
615+
private final Map<String, Metric> metrics;
616+
617+
TestMetricRegistry(Map<String, Metric> metrics) {
618+
super();
619+
this.metrics = metrics;
620+
}
621+
622+
@Override
623+
public void register(Metric metric, String metricName, AbstractMetricGroup<?> group) {
624+
metrics.put(metricName, metric);
625+
}
626+
627+
@Override
628+
public void unregister(Metric metric, String metricName, AbstractMetricGroup<?> group) {
629+
if (metrics.get(metricName) != null) {
630+
metrics.remove(metricName);
631+
}
632+
}
633+
}
532634
}

0 commit comments

Comments
 (0)