@@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
19
19
package org .apache .flink .streaming .api .operators ;
20
20
21
21
import org .apache .flink .api .common .ExecutionConfig ;
22
+ import org .apache .flink .api .common .JobID ;
22
23
import org .apache .flink .api .common .eventtime .Watermark ;
23
24
import org .apache .flink .api .common .eventtime .WatermarkGenerator ;
24
25
import org .apache .flink .api .common .eventtime .WatermarkOutput ;
@@ -28,8 +29,15 @@ Licensed to the Apache Software Foundation (ASF) under one
28
29
import org .apache .flink .api .connector .source .mocks .MockSourceSplit ;
29
30
import org .apache .flink .api .connector .source .mocks .MockSourceSplitSerializer ;
30
31
import org .apache .flink .configuration .Configuration ;
32
+ import org .apache .flink .metrics .Metric ;
33
+ import org .apache .flink .runtime .clusterframework .types .ResourceID ;
31
34
import org .apache .flink .runtime .execution .Environment ;
35
+ import org .apache .flink .runtime .metrics .MetricNames ;
36
+ import org .apache .flink .runtime .metrics .NoOpMetricRegistry ;
37
+ import org .apache .flink .runtime .metrics .groups .AbstractMetricGroup ;
32
38
import org .apache .flink .runtime .metrics .groups .TaskIOMetricGroup ;
39
+ import org .apache .flink .runtime .metrics .groups .TaskManagerMetricGroup ;
40
+ import org .apache .flink .runtime .metrics .groups .TaskMetricGroup ;
33
41
import org .apache .flink .runtime .operators .coordination .MockOperatorEventGateway ;
34
42
import org .apache .flink .runtime .operators .testutils .MockInputSplitProvider ;
35
43
import org .apache .flink .runtime .source .event .AddSplitEvent ;
@@ -55,10 +63,14 @@ Licensed to the Apache Software Foundation (ASF) under one
55
63
import java .time .Duration ;
56
64
import java .util .ArrayList ;
57
65
import java .util .Arrays ;
66
+ import java .util .Collection ;
58
67
import java .util .Collections ;
59
68
import java .util .List ;
69
+ import java .util .Map ;
70
+ import java .util .concurrent .ConcurrentHashMap ;
60
71
import java .util .stream .Collectors ;
61
72
73
+ import static org .apache .flink .runtime .executiongraph .ExecutionGraphTestUtils .createExecutionAttemptId ;
62
74
import static org .assertj .core .api .Assertions .assertThat ;
63
75
64
76
/** Unit test for split alignment in {@link SourceOperator}. */
@@ -306,6 +318,44 @@ void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
306
318
assertThat (sourceReader .getPausedSplits ()).isEmpty ();
307
319
}
308
320
321
+ @ Test
322
+ void testMetricGroupIsClosedForFinishedSplitAndMetricsAreUnregistered () throws Exception {
323
+ long idleTimeout = 100 ;
324
+ Collection <String > expectedMetricNames =
325
+ Arrays .asList (
326
+ MetricNames .SPLIT_IDLE_TIME ,
327
+ MetricNames .ACC_SPLIT_IDLE_TIME ,
328
+ MetricNames .SPLIT_ACTIVE_TIME ,
329
+ MetricNames .ACC_SPLIT_ACTIVE_TIME ,
330
+ MetricNames .SPLIT_PAUSED_TIME ,
331
+ MetricNames .ACC_SPLIT_PAUSED_TIME ,
332
+ MetricNames .SPLIT_CURRENT_WATERMARK );
333
+ final Map <String , Metric > registry = new ConcurrentHashMap <>();
334
+ MockSourceReader sourceReader =
335
+ new MockSourceReader (WaitingForSplits .DO_NOT_WAIT_FOR_SPLITS , false , true );
336
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService ();
337
+ SourceOperator <Integer , MockSourceSplit > operator =
338
+ createAndOpenSourceOperatorWithIdlenessAndRegistry (
339
+ sourceReader , processingTimeService , idleTimeout , registry );
340
+
341
+ MockSourceSplit split0 = new MockSourceSplit (0 , 0 , 1 );
342
+ split0 .addRecord (5 );
343
+
344
+ operator .handleOperatorEvent (
345
+ new AddSplitEvent <>(Arrays .asList (split0 ), new MockSourceSplitSerializer ()));
346
+ CollectingDataOutput <Integer > dataOutput = new CollectingDataOutput <>();
347
+ AbstractMetricGroup metricGroup =
348
+ (AbstractMetricGroup )
349
+ operator .getSplitMetricGroup (split0 .splitId ())
350
+ .getSplitWatermarkMetricGroup ();
351
+ expectedMetricNames .forEach (metric -> assertThat (registry .containsKey (metric )).isTrue ());
352
+ while (operator .emitNext (dataOutput ) == DataInputStatus .MORE_AVAILABLE ) {
353
+ // split0 emits records until finished/released
354
+ }
355
+ assertThat (metricGroup .isClosed ()).isTrue ();
356
+ expectedMetricNames .forEach (metric -> assertThat (registry .containsKey (metric )).isFalse ());
357
+ }
358
+
309
359
@ Test
310
360
void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness () throws Exception {
311
361
long idleTimeout = 100 ;
@@ -448,7 +498,37 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
448
498
long idleTimeout )
449
499
throws Exception {
450
500
451
- Environment env = getTestingEnvironment ();
501
+ return createAndOpenSourceOperatorWithIdlenessAndEnv (
502
+ sourceReader , processingTimeService , idleTimeout , getTestingEnvironment ());
503
+ }
504
+
505
+ private SourceOperator <Integer , MockSourceSplit >
506
+ createAndOpenSourceOperatorWithIdlenessAndRegistry (
507
+ MockSourceReader sourceReader ,
508
+ TestProcessingTimeService processingTimeService ,
509
+ long idleTimeout ,
510
+ Map <String , Metric > registry )
511
+ throws Exception {
512
+
513
+ StreamMockEnvironment env = getTestingEnvironment ();
514
+ TaskMetricGroup metricGroup =
515
+ TaskManagerMetricGroup .createTaskManagerMetricGroup (
516
+ new TestMetricRegistry (registry ),
517
+ "localhost" ,
518
+ ResourceID .generate ())
519
+ .addJob (new JobID (), "jobName" )
520
+ .addTask (createExecutionAttemptId (), "test" );
521
+ env .setTaskMetricGroup (metricGroup );
522
+ return createAndOpenSourceOperatorWithIdlenessAndEnv (
523
+ sourceReader , processingTimeService , idleTimeout , env );
524
+ }
525
+
526
+ private SourceOperator <Integer , MockSourceSplit > createAndOpenSourceOperatorWithIdlenessAndEnv (
527
+ MockSourceReader sourceReader ,
528
+ TestProcessingTimeService processingTimeService ,
529
+ long idleTimeout ,
530
+ Environment env )
531
+ throws Exception {
452
532
SourceOperator <Integer , MockSourceSplit > operator =
453
533
new TestingSourceOperator <>(
454
534
new StreamOperatorParameters <>(
@@ -474,7 +554,7 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
474
554
return operator ;
475
555
}
476
556
477
- private Environment getTestingEnvironment () {
557
+ private StreamMockEnvironment getTestingEnvironment () {
478
558
return new StreamMockEnvironment (
479
559
new Configuration (),
480
560
new Configuration (),
@@ -529,4 +609,26 @@ public AnyWatermark() {
529
609
"any watermark" );
530
610
}
531
611
}
612
+
613
+ /** The metric registry for storing the registered metrics to verify in tests. */
614
+ static class TestMetricRegistry extends NoOpMetricRegistry {
615
+ private final Map <String , Metric > metrics ;
616
+
617
+ TestMetricRegistry (Map <String , Metric > metrics ) {
618
+ super ();
619
+ this .metrics = metrics ;
620
+ }
621
+
622
+ @ Override
623
+ public void register (Metric metric , String metricName , AbstractMetricGroup <?> group ) {
624
+ metrics .put (metricName , metric );
625
+ }
626
+
627
+ @ Override
628
+ public void unregister (Metric metric , String metricName , AbstractMetricGroup <?> group ) {
629
+ if (metrics .get (metricName ) != null ) {
630
+ metrics .remove (metricName );
631
+ }
632
+ }
633
+ }
532
634
}
0 commit comments