From 01ea706942dc30116aefebb0718464db3c7ddeb1 Mon Sep 17 00:00:00 2001
From: Efrat Levitan <efrat890089@gmail.com>
Date: Wed, 2 Apr 2025 09:32:44 +0300
Subject: [PATCH] [FLINK-37596][metrics] Close metric group of a finished split

[FLINK-37596] Remove redundant metricGroup initialization
---
 .../InternalSourceSplitMetricGroup.java       |  21 +++-
 .../api/operators/SourceOperator.java         |   1 +
 ...ceOperatorSplitWatermarkAlignmentTest.java | 106 +++++++++++++++++-
 .../source/TestingSourceOperator.java         |   2 -
 4 files changed, 124 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
index c6007b9db6c00..706606a159811 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
@@ -47,6 +47,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup
     private static final String WATERMARK = "watermark";
     private static final long SPLIT_NOT_STARTED = -1L;
     private long splitStartTime = SPLIT_NOT_STARTED;
+    private final MetricGroup splitWatermarkMetricGroup;
 
     private InternalSourceSplitMetricGroup(
             MetricGroup parentMetricGroup,
@@ -55,8 +56,7 @@ private InternalSourceSplitMetricGroup(
             Gauge<Long> currentWatermark) {
         super(parentMetricGroup);
         this.clock = clock;
-        MetricGroup splitWatermarkMetricGroup =
-                parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
+        splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
         pausedTimePerSecond =
                 splitWatermarkMetricGroup.gauge(
                         MetricNames.SPLIT_PAUSED_TIME, new TimerGauge(clock));
@@ -191,4 +191,21 @@ public Boolean isIdle() {
     public Boolean isActive() {
         return !isPaused() && !isIdle();
     }
+
+    /** Closes the watermark metric group of this split once the split has finished. */
+    public void onSplitFinished() {
+        if (splitWatermarkMetricGroup instanceof AbstractMetricGroup) {
+            ((AbstractMetricGroup<?>) splitWatermarkMetricGroup).close();
+        } else if (splitWatermarkMetricGroup != null) {
+            // Cannot close other MetricGroup implementations; log the actual type instead.
+            LOG.warn(
+                    "Split watermark metric group can not be closed, expecting an instance of AbstractMetricGroup but got: {}",
+                    splitWatermarkMetricGroup.getClass().getName());
+        }
+    }
+
+    @VisibleForTesting
+    public MetricGroup getSplitWatermarkMetricGroup() {
+        return splitWatermarkMetricGroup; // the group that onSplitFinished() closes
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index efef41ff46d33..b5fff8915da6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -729,6 +729,11 @@ public void updateCurrentSplitIdle(String splitId, boolean idle) {
     @Override
     public void splitFinished(String splitId) {
         splitCurrentWatermarks.remove(splitId);
-        this.splitMetricGroups.remove(splitId);
+        // Close only a group that is actually tracked for this split; never create one just to
+        // close it again.
+        InternalSourceSplitMetricGroup finishedSplitGroup = splitMetricGroups.remove(splitId);
+        if (finishedSplitGroup != null) {
+            finishedSplitGroup.onSplitFinished();
+        }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index d3226c651c2b6..060689f0a557a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -28,8 +29,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -55,10 +63,14 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for split alignment in {@link SourceOperator}. */
@@ -306,6 +318,44 @@ void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
         assertThat(sourceReader.getPausedSplits()).isEmpty();
     }
 
+    @Test
+    void testMetricGroupIsClosedForFinishedSplitAndMetricsAreUnregistered() throws Exception {
+        long idleTimeout = 100;
+        Collection<String> expectedMetricNames =
+                Arrays.asList(
+                        MetricNames.SPLIT_IDLE_TIME,
+                        MetricNames.ACC_SPLIT_IDLE_TIME,
+                        MetricNames.SPLIT_ACTIVE_TIME,
+                        MetricNames.ACC_SPLIT_ACTIVE_TIME,
+                        MetricNames.SPLIT_PAUSED_TIME,
+                        MetricNames.ACC_SPLIT_PAUSED_TIME,
+                        MetricNames.SPLIT_CURRENT_WATERMARK);
+        final Map<String, Metric> registry = new ConcurrentHashMap<>();
+        MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
+        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+        SourceOperator<Integer, MockSourceSplit> operator =
+                createAndOpenSourceOperatorWithIdlenessAndRegistry(
+                        sourceReader, processingTimeService, idleTimeout, registry);
+
+        MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
+        split0.addRecord(5);
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer()));
+        CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
+        // Fetch the group now; splitFinished() removes it from the operator afterwards.
+        AbstractMetricGroup<?> metricGroup =
+                (AbstractMetricGroup<?>)
+                        operator.getSplitMetricGroup(split0.splitId())
+                                .getSplitWatermarkMetricGroup();
+        expectedMetricNames.forEach(metric -> assertThat(registry).containsKey(metric));
+        while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) {
+            // split0 emits records until finished/released
+        }
+        assertThat(metricGroup.isClosed()).isTrue();
+        expectedMetricNames.forEach(metric -> assertThat(registry).doesNotContainKey(metric));
+    }
+
     @Test
     void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
         long idleTimeout = 100;
@@ -448,7 +498,37 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
             long idleTimeout)
             throws Exception {
 
-        Environment env = getTestingEnvironment();
+        return createAndOpenSourceOperatorWithIdlenessAndEnv(
+                sourceReader, processingTimeService, idleTimeout, getTestingEnvironment());
+    }
+
+    private SourceOperator<Integer, MockSourceSplit>
+            createAndOpenSourceOperatorWithIdlenessAndRegistry(
+                    MockSourceReader sourceReader,
+                    TestProcessingTimeService processingTimeService,
+                    long idleTimeout,
+                    Map<String, Metric> registry)
+                    throws Exception {
+        // Swap in a task metric group whose registry records every metric into the given map.
+        StreamMockEnvironment environment = getTestingEnvironment();
+        TaskManagerMetricGroup taskManagerGroup =
+                TaskManagerMetricGroup.createTaskManagerMetricGroup(
+                        new TestMetricRegistry(registry), "localhost", ResourceID.generate());
+        TaskMetricGroup taskGroup =
+                taskManagerGroup
+                        .addJob(new JobID(), "jobName")
+                        .addTask(createExecutionAttemptId(), "test");
+        environment.setTaskMetricGroup(taskGroup);
+        return createAndOpenSourceOperatorWithIdlenessAndEnv(
+                sourceReader, processingTimeService, idleTimeout, environment);
+    }
+
+    private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdlenessAndEnv(
+            MockSourceReader sourceReader,
+            TestProcessingTimeService processingTimeService,
+            long idleTimeout,
+            Environment env)
+            throws Exception {
         SourceOperator<Integer, MockSourceSplit> operator =
                 new TestingSourceOperator<>(
                         new StreamOperatorParameters<>(
@@ -474,7 +554,7 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
         return operator;
     }
 
-    private Environment getTestingEnvironment() {
+    private StreamMockEnvironment getTestingEnvironment() {
         return new StreamMockEnvironment(
                 new Configuration(),
                 new Configuration(),
@@ -529,4 +609,26 @@ public AnyWatermark() {
                     "any watermark");
         }
     }
+
+    /** The metric registry for storing the registered metrics to verify in tests. */
+    static class TestMetricRegistry extends NoOpMetricRegistry {
+        /** Registered metrics keyed by metric name; the map is supplied by the creating test. */
+        private final Map<String, Metric> metrics;
+
+        TestMetricRegistry(Map<String, Metric> metrics) {
+            this.metrics = metrics;
+        }
+
+        @Override
+        public void register(Metric metric, String metricName, AbstractMetricGroup<?> group) {
+            metrics.put(metricName, metric);
+        }
+
+        @Override
+        public void unregister(Metric metric, String metricName, AbstractMetricGroup<?> group) {
+            // Map#remove is already a no-op for absent keys, so no prior lookup is needed;
+            // a single call also avoids a check-then-act gap on a concurrent map.
+            metrics.remove(metricName);
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 3cd58e4c8b975..491106d2f4298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -27,7 +27,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
@@ -103,7 +102,6 @@ public TestingSourceOperator(
 
         this.subtaskIndex = subtaskIndex;
         this.parallelism = parallelism;
-        this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
         initSourceMetricGroup();
 
         // unchecked wrapping is okay to keep tests simpler