From 855e3a8c84ad416ab284719262b03834d86e315d Mon Sep 17 00:00:00 2001
From: beliefer <beliefer@163.com>
Date: Mon, 31 Mar 2025 11:53:48 +0800
Subject: [PATCH] [FLINK-37579] Avoid duplicate generate ResourceProfile

---
 .../taskexecutor/TaskManagerRunner.java       | 16 +++++++-------
 .../taskexecutor/TaskManagerServices.java     | 13 ++++++------
 .../TaskManagerServicesConfiguration.java     | 21 +++++++++++++++++++
 ...skExecutorLocalStateStoresManagerTest.java | 10 ++++++++-
 .../taskexecutor/TaskExecutorBuilder.java     |  2 ++
 .../TaskManagerRunnerConfigurationTest.java   |  9 +++++++-
 6 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 47b71ceaa1aa1..954dcd90a3a08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -614,6 +614,13 @@ public static TaskExecutor startTaskManager(
         final TaskExecutorResourceSpec taskExecutorResourceSpec =
                 TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
 
+        TaskManagerConfiguration taskManagerConfiguration =
+                TaskManagerConfiguration.fromConfiguration(
+                        configuration,
+                        taskExecutorResourceSpec,
+                        externalAddress,
+                        workingDirectory.getTmpDirectory());
+
         TaskManagerServicesConfiguration taskManagerServicesConfiguration =
                 TaskManagerServicesConfiguration.fromConfiguration(
                         configuration,
@@ -621,6 +628,8 @@ public static TaskExecutor startTaskManager(
                         externalAddress,
                         localCommunicationOnly,
                         taskExecutorResourceSpec,
+                        taskManagerConfiguration.getDefaultSlotResourceProfile(),
+                        taskManagerConfiguration.getTotalResourceProfile(),
                         workingDirectory);
 
         Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup =
@@ -650,13 +659,6 @@ public static TaskExecutor startTaskManager(
                 taskManagerServices.getTaskSlotTable(),
                 taskManagerServices::getManagedMemorySize);
 
-        TaskManagerConfiguration taskManagerConfiguration =
-                TaskManagerConfiguration.fromConfiguration(
-                        configuration,
-                        taskExecutorResourceSpec,
-                        externalAddress,
-                        workingDirectory.getTmpDirectory());
-
         String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
 
         return new TaskExecutor(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e514123ee77b9..9d6a06b024629 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -25,6 +25,7 @@
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -374,7 +375,8 @@ public static TaskManagerServices fromConfiguration(
         final TaskSlotTable<Task> taskSlotTable =
                 createTaskSlotTable(
                         taskManagerServicesConfiguration.getNumberOfSlots(),
-                        taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
+                        taskManagerServicesConfiguration.getTotalResourceProfile(),
+                        taskManagerServicesConfiguration.getDefaultSlotResourceProfile(),
                         taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
                         taskManagerServicesConfiguration.getPageSize(),
                         ioExecutor);
@@ -467,7 +469,8 @@ public static TaskManagerServices fromConfiguration(
 
     private static TaskSlotTable<Task> createTaskSlotTable(
             final int numberOfSlots,
-            final TaskExecutorResourceSpec taskExecutorResourceSpec,
+            final ResourceProfile totalAvailableResourceProfile,
+            final ResourceProfile defaultSlotResourceProfile,
             final long timerServiceShutdownTimeout,
             final int pageSize,
             final Executor memoryVerificationExecutor) {
@@ -476,10 +479,8 @@ private static TaskSlotTable<Task> createTaskSlotTable(
                         new ScheduledThreadPoolExecutor(1), timerServiceShutdownTimeout);
         return new TaskSlotTableImpl<>(
                 numberOfSlots,
-                TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(
-                        taskExecutorResourceSpec),
-                TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(
-                        taskExecutorResourceSpec, numberOfSlots),
+                totalAvailableResourceProfile,
+                defaultSlotResourceProfile,
                 pageSize,
                 timerService,
                 memoryVerificationExecutor);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 80cc556967fab..ed87d6e9a62a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -29,6 +29,7 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
@@ -91,6 +92,10 @@ public class TaskManagerServicesConfiguration {
 
     private final TaskExecutorResourceSpec taskExecutorResourceSpec;
 
+    private final ResourceProfile totalResourceProfile;
+
+    private final ResourceProfile defaultSlotResourceProfile;
+
     private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
 
     private final String[] alwaysParentFirstLoaderPatterns;
@@ -112,6 +117,8 @@ private TaskManagerServicesConfiguration(
             int numberOfSlots,
             int pageSize,
             TaskExecutorResourceSpec taskExecutorResourceSpec,
+            ResourceProfile defaultSlotResourceProfile,
+            ResourceProfile totalAvailableResourceProfile,
             long timerServiceShutdownTimeout,
             RetryingRegistrationConfiguration retryingRegistrationConfiguration,
             Optional<Duration> systemResourceMetricsProbingInterval,
@@ -136,6 +143,8 @@ private TaskManagerServicesConfiguration(
         this.pageSize = pageSize;
 
         this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+        this.defaultSlotResourceProfile = defaultSlotResourceProfile;
+        this.totalResourceProfile = totalAvailableResourceProfile;
         this.classLoaderResolveOrder = classLoaderResolveOrder;
         this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
         this.numIoThreads = numIoThreads;
@@ -213,6 +222,14 @@ public TaskExecutorResourceSpec getTaskExecutorResourceSpec() {
         return taskExecutorResourceSpec;
     }
 
+    public ResourceProfile getDefaultSlotResourceProfile() {
+        return defaultSlotResourceProfile;
+    }
+
+    public ResourceProfile getTotalResourceProfile() {
+        return totalResourceProfile;
+    }
+
     public MemorySize getNetworkMemorySize() {
         return taskExecutorResourceSpec.getNetworkMemSize();
     }
@@ -273,6 +290,8 @@ public static TaskManagerServicesConfiguration fromConfiguration(
             String externalAddress,
             boolean localCommunicationOnly,
             TaskExecutorResourceSpec taskExecutorResourceSpec,
+            ResourceProfile defaultSlotResourceProfile,
+            ResourceProfile totalAvailableResourceProfile,
             WorkingDirectory workingDirectory)
             throws Exception {
         String[] localStateRootDirs = ConfigurationUtils.parseLocalStateDirectories(configuration);
@@ -342,6 +361,8 @@ public static TaskManagerServicesConfiguration fromConfiguration(
                 ConfigurationParserUtils.getSlot(configuration),
                 ConfigurationParserUtils.getPageSize(configuration),
                 taskExecutorResourceSpec,
+                defaultSlotResourceProfile,
+                totalAvailableResourceProfile,
                 timerServiceShutdownTimeout,
                 retryingRegistrationConfiguration,
                 ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 464b42bcc26a8..bb42a76e05a8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -29,10 +29,12 @@
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.testutils.WorkingDirectoryExtension;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Reference;
@@ -423,12 +425,18 @@ private void checkRootDirsClean(File[] rootDirs) {
 
     private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
             Configuration config, WorkingDirectory workingDirectory) throws Exception {
+        final TaskExecutorResourceSpec taskExecutorResourceSpec =
+                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config);
         return TaskManagerServicesConfiguration.fromConfiguration(
                 config,
                 ResourceID.generate(),
                 InetAddress.getLocalHost().getHostName(),
                 true,
-                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config),
+                taskExecutorResourceSpec,
+                TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(
+                        taskExecutorResourceSpec, ConfigurationParserUtils.getSlot(config)),
+                TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(
+                        taskExecutorResourceSpec),
                 workingDirectory);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
index 73f0b7d6e9a14..66f7fe79e2bf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
@@ -132,6 +132,8 @@ public TaskExecutor build() throws Exception {
                             rpcService.getAddress(),
                             true,
                             taskExecutorResourceSpec,
+                            resolvedTaskManagerConfiguration.getDefaultSlotResourceProfile(),
+                            resolvedTaskManagerConfiguration.getTotalResourceProfile(),
                             workingDirectory);
             resolvedTaskManagerServices =
                     TaskManagerServices.fromConfiguration(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index 3f68bc6bdbb31..9f49e365ff18c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -36,6 +36,7 @@
 import org.apache.flink.runtime.rpc.AddressResolution;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.concurrent.Executors;
 
@@ -235,12 +236,18 @@ void testNodeIdShouldBeExternalAddressIfNotExplicitlySet() throws Exception {
 
     private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
             Configuration config) throws Exception {
+        final TaskExecutorResourceSpec taskExecutorResourceSpec =
+                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config);
         return TaskManagerServicesConfiguration.fromConfiguration(
                 config,
                 ResourceID.generate(),
                 InetAddress.getLocalHost().getHostName(),
                 true,
-                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config),
+                taskExecutorResourceSpec,
+                TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(
+                        taskExecutorResourceSpec, ConfigurationParserUtils.getSlot(config)),
+                TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(
+                        taskExecutorResourceSpec),
                 WorkingDirectory.create(
                         Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
                                 .toFile()));