diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
index c6007b9db6c00..706606a159811 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
@@ -47,6 +47,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup currentWatermark) {
         super(parentMetricGroup);
         this.clock = clock;
-        MetricGroup splitWatermarkMetricGroup =
-                parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
+        splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
         pausedTimePerSecond =
                 splitWatermarkMetricGroup.gauge(
                         MetricNames.SPLIT_PAUSED_TIME, new TimerGauge(clock));
@@ -191,4 +191,27 @@ public Boolean isIdle() {
     public Boolean isActive() {
         return !isPaused() && !isIdle();
     }
+
+    /**
+     * Closes the metric group holding this split's watermark metrics so that they get
+     * unregistered once the split has finished.
+     *
+     * <p>Only an {@link AbstractMetricGroup} can be closed; for any other non-null group a
+     * warning naming the actual class is logged instead.
+     */
+    public void onSplitFinished() {
+        if (splitWatermarkMetricGroup instanceof AbstractMetricGroup) {
+            ((AbstractMetricGroup) splitWatermarkMetricGroup).close();
+        } else if (splitWatermarkMetricGroup != null) {
+            LOG.warn(
+                    "Split watermark metric group can not be closed, expecting an instance of AbstractMetricGroup but got: {}",
+                    splitWatermarkMetricGroup.getClass().getName());
+        }
+    }
+
+    /** Returns the metric group that holds this split's watermark metrics; for tests only. */
+    @VisibleForTesting
+    public MetricGroup getSplitWatermarkMetricGroup() {
+        return splitWatermarkMetricGroup;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index efef41ff46d33..b5fff8915da6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -729,6 +729,12 @@ public void updateCurrentSplitIdle(String splitId, boolean idle) {
     @Override
     public void splitFinished(String splitId) {
         splitCurrentWatermarks.remove(splitId);
-        this.splitMetricGroups.remove(splitId);
+        // Close only a group that is already tracked. Going through
+        // getOrCreateSplitMetricGroup could (per its name) create a group just to close it.
+        InternalSourceSplitMetricGroup finishedSplitMetricGroup =
+                this.splitMetricGroups.remove(splitId);
+        if (finishedSplitMetricGroup != null) {
+            finishedSplitMetricGroup.onSplitFinished();
+        }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index d3226c651c2b6..060689f0a557a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -28,8 +29,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import 
org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -55,10 +63,14 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for split alignment in {@link SourceOperator}. */
@@ -306,6 +318,46 @@ void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
         assertThat(sourceReader.getPausedSplits()).isEmpty();
     }
 
+    @Test
+    void testMetricGroupIsClosedForFinishedSplitAndMetricsAreUnregistered() throws Exception {
+        long idleTimeout = 100;
+        Collection<String> expectedMetricNames =
+                Arrays.asList(
+                        MetricNames.SPLIT_IDLE_TIME,
+                        MetricNames.ACC_SPLIT_IDLE_TIME,
+                        MetricNames.SPLIT_ACTIVE_TIME,
+                        MetricNames.ACC_SPLIT_ACTIVE_TIME,
+                        MetricNames.SPLIT_PAUSED_TIME,
+                        MetricNames.ACC_SPLIT_PAUSED_TIME,
+                        MetricNames.SPLIT_CURRENT_WATERMARK);
+        // Backing map of the TestMetricRegistry; mirrors what is currently registered.
+        final Map<String, Metric> registry = new ConcurrentHashMap<>();
+        MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
+        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+        SourceOperator operator =
+                createAndOpenSourceOperatorWithIdlenessAndRegistry(
+                        sourceReader, processingTimeService, idleTimeout, registry);
+
+        MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
+        split0.addRecord(5);
+
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer()));
+        CollectingDataOutput dataOutput = new CollectingDataOutput<>();
+        // Capture the group now: splitFinished removes it from the operator's bookkeeping.
+        AbstractMetricGroup metricGroup =
+                (AbstractMetricGroup)
+                        operator.getSplitMetricGroup(split0.splitId())
+                                .getSplitWatermarkMetricGroup();
+        assertThat(registry.keySet()).containsAll(expectedMetricNames);
+        while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) {
+            // split0 emits records until finished/released
+        }
+        assertThat(metricGroup.isClosed()).isTrue();
+        assertThat(registry.keySet()).doesNotContainAnyElementsOf(expectedMetricNames);
+    }
+
     @Test
     void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
         long idleTimeout = 100;
@@ -448,7 +500,39 @@ private SourceOperator createAndOpenSourceOperatorWith
             long idleTimeout)
             throws Exception {
 
-        Environment env = getTestingEnvironment();
+        return createAndOpenSourceOperatorWithIdlenessAndEnv(
+                sourceReader, processingTimeService, idleTimeout, getTestingEnvironment());
+    }
+
+    /** Variant whose task metric group reports to a registry backed by {@code registry}. */
+    private SourceOperator
+            createAndOpenSourceOperatorWithIdlenessAndRegistry(
+                    MockSourceReader sourceReader,
+                    TestProcessingTimeService processingTimeService,
+                    long idleTimeout,
+                    Map<String, Metric> registry)
+                    throws Exception {
+
+        StreamMockEnvironment env = getTestingEnvironment();
+        TaskMetricGroup metricGroup =
+                TaskManagerMetricGroup.createTaskManagerMetricGroup(
+                                new TestMetricRegistry(registry),
+                                "localhost",
+                                ResourceID.generate())
+                        .addJob(new JobID(), "jobName")
+                        .addTask(createExecutionAttemptId(), "test");
+        env.setTaskMetricGroup(metricGroup);
+        return createAndOpenSourceOperatorWithIdlenessAndEnv(
+                sourceReader, processingTimeService, idleTimeout, env);
+    }
+
+    /** Shared factory: creates the operator against the given {@code env}. */
+    private SourceOperator createAndOpenSourceOperatorWithIdlenessAndEnv(
+            MockSourceReader sourceReader,
+            TestProcessingTimeService processingTimeService,
+            long idleTimeout,
+            Environment env)
+            throws Exception {
         SourceOperator operator =
                 new TestingSourceOperator<>(
                         new StreamOperatorParameters<>(
@@ -474,7 +558,7 @@ private SourceOperator createAndOpenSourceOperatorWith
         return operator;
     }
 
-    private Environment getTestingEnvironment() {
+    private StreamMockEnvironment getTestingEnvironment() {
         return new StreamMockEnvironment(
                 new Configuration(),
                 new Configuration(),
@@ -529,4 +613,27 @@ public AnyWatermark() {
                     "any watermark");
         }
     }
+
+    /**
+     * Metric registry that records registered metrics by name in a caller-supplied map, so
+     * tests can verify which metrics are currently registered.
+     */
+    static class TestMetricRegistry extends NoOpMetricRegistry {
+        private final Map<String, Metric> metrics;
+
+        TestMetricRegistry(Map<String, Metric> metrics) {
+            this.metrics = metrics;
+        }
+
+        @Override
+        public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+            metrics.put(metricName, metric);
+        }
+
+        @Override
+        public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+            // Map#remove is a no-op for an absent key, so no lookup is needed first.
+            metrics.remove(metricName);
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 3cd58e4c8b975..491106d2f4298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -27,7 +27,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import 
org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; @@ -103,7 +102,6 @@ public TestingSourceOperator( this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; - this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); initSourceMetricGroup(); // unchecked wrapping is okay to keep tests simpler