Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37596][metrics] Close metric group of a finished split #26388

Merged
merged 1 commit into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup
private static final String WATERMARK = "watermark";
private static final long SPLIT_NOT_STARTED = -1L;
private long splitStartTime = SPLIT_NOT_STARTED;
private final MetricGroup splitWatermarkMetricGroup;

private InternalSourceSplitMetricGroup(
MetricGroup parentMetricGroup,
Expand All @@ -55,8 +56,7 @@ private InternalSourceSplitMetricGroup(
Gauge<Long> currentWatermark) {
super(parentMetricGroup);
this.clock = clock;
MetricGroup splitWatermarkMetricGroup =
parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
pausedTimePerSecond =
splitWatermarkMetricGroup.gauge(
MetricNames.SPLIT_PAUSED_TIME, new TimerGauge(clock));
Expand Down Expand Up @@ -191,4 +191,21 @@ public Boolean isIdle() {
public Boolean isActive() {
return !isPaused() && !isIdle();
}

public void onSplitFinished() {
if (splitWatermarkMetricGroup instanceof AbstractMetricGroup) {
((AbstractMetricGroup) splitWatermarkMetricGroup).close();
} else {
if (splitWatermarkMetricGroup != null) {
LOG.warn(
"Split watermark metric group can not be closed, expecting an instance of AbstractMetricGroup but got: ",
splitWatermarkMetricGroup.getClass().getName());
}
}
}

@VisibleForTesting
public MetricGroup getSplitWatermarkMetricGroup() {
return splitWatermarkMetricGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ public void updateCurrentSplitIdle(String splitId, boolean idle) {
@Override
public void splitFinished(String splitId) {
splitCurrentWatermarks.remove(splitId);
getOrCreateSplitMetricGroup(splitId).onSplitFinished();
this.splitMetricGroups.remove(splitId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
Expand All @@ -28,8 +29,15 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
Expand All @@ -55,10 +63,14 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.assertj.core.api.Assertions.assertThat;

/** Unit test for split alignment in {@link SourceOperator}. */
Expand Down Expand Up @@ -306,6 +318,44 @@ void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
assertThat(sourceReader.getPausedSplits()).isEmpty();
}

@Test
void testMetricGroupIsClosedForFinishedSplitAndMetricsAreUnregistered() throws Exception {
long idleTimeout = 100;
Collection<String> expectedMetricNames =
Arrays.asList(
MetricNames.SPLIT_IDLE_TIME,
MetricNames.ACC_SPLIT_IDLE_TIME,
MetricNames.SPLIT_ACTIVE_TIME,
MetricNames.ACC_SPLIT_ACTIVE_TIME,
MetricNames.SPLIT_PAUSED_TIME,
MetricNames.ACC_SPLIT_PAUSED_TIME,
MetricNames.SPLIT_CURRENT_WATERMARK);
final Map<String, Metric> registry = new ConcurrentHashMap<>();
MockSourceReader sourceReader =
new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
SourceOperator<Integer, MockSourceSplit> operator =
createAndOpenSourceOperatorWithIdlenessAndRegistry(
sourceReader, processingTimeService, idleTimeout, registry);

MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
split0.addRecord(5);

operator.handleOperatorEvent(
new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer()));
CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
AbstractMetricGroup metricGroup =
(AbstractMetricGroup)
operator.getSplitMetricGroup(split0.splitId())
.getSplitWatermarkMetricGroup();
expectedMetricNames.forEach(metric -> assertThat(registry.containsKey(metric)).isTrue());
while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) {
// split0 emits records until finished/released
}
assertThat(metricGroup.isClosed()).isTrue();
expectedMetricNames.forEach(metric -> assertThat(registry.containsKey(metric)).isFalse());
}

@Test
void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
long idleTimeout = 100;
Expand Down Expand Up @@ -448,7 +498,37 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
long idleTimeout)
throws Exception {

Environment env = getTestingEnvironment();
return createAndOpenSourceOperatorWithIdlenessAndEnv(
sourceReader, processingTimeService, idleTimeout, getTestingEnvironment());
}

private SourceOperator<Integer, MockSourceSplit>
createAndOpenSourceOperatorWithIdlenessAndRegistry(
MockSourceReader sourceReader,
TestProcessingTimeService processingTimeService,
long idleTimeout,
Map<String, Metric> registry)
throws Exception {

StreamMockEnvironment env = getTestingEnvironment();
TaskMetricGroup metricGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
new TestMetricRegistry(registry),
"localhost",
ResourceID.generate())
.addJob(new JobID(), "jobName")
.addTask(createExecutionAttemptId(), "test");
env.setTaskMetricGroup(metricGroup);
return createAndOpenSourceOperatorWithIdlenessAndEnv(
sourceReader, processingTimeService, idleTimeout, env);
}

private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdlenessAndEnv(
MockSourceReader sourceReader,
TestProcessingTimeService processingTimeService,
long idleTimeout,
Environment env)
throws Exception {
SourceOperator<Integer, MockSourceSplit> operator =
new TestingSourceOperator<>(
new StreamOperatorParameters<>(
Expand All @@ -474,7 +554,7 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
return operator;
}

private Environment getTestingEnvironment() {
private StreamMockEnvironment getTestingEnvironment() {
return new StreamMockEnvironment(
new Configuration(),
new Configuration(),
Expand Down Expand Up @@ -529,4 +609,26 @@ public AnyWatermark() {
"any watermark");
}
}

/** The metric registry for storing the registered metrics to verify in tests. */
static class TestMetricRegistry extends NoOpMetricRegistry {
private final Map<String, Metric> metrics;

TestMetricRegistry(Map<String, Metric> metrics) {
super();
this.metrics = metrics;
}

@Override
public void register(Metric metric, String metricName, AbstractMetricGroup<?> group) {
metrics.put(metricName, metric);
}

@Override
public void unregister(Metric metric, String metricName, AbstractMetricGroup<?> group) {
if (metrics.get(metricName) != null) {
metrics.remove(metricName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
Expand Down Expand Up @@ -103,7 +102,6 @@ public TestingSourceOperator(

this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initSourceMetricGroup();

// unchecked wrapping is okay to keep tests simpler
Expand Down