Skip to content

Commit 7d65e8b

Browse files
committedApr 4, 2025
[FLINK-37603] QueryOperation SQL Serialization customization
1 parent 39aa79e commit 7d65e8b

File tree

75 files changed

+845
-300
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+845
-300
lines changed
 

‎flink-python/pyflink/table/tests/test_environment_settings_completeness.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def java_class(cls):
3838
def excluded_methods(cls):
3939
# internal interfaces, no need to expose to users.
4040
return {'getPlanner', 'getExecutor', 'getUserClassLoader', 'getCatalogStore',
41-
'toConfiguration', 'fromConfiguration'}
41+
'toConfiguration', 'fromConfiguration', 'getSerializationContext'}
4242

4343

4444
class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
@@ -58,7 +58,8 @@ def java_class(cls):
5858
@classmethod
5959
def excluded_methods(cls):
6060
# internal interfaces, no need to expose to users.
61-
return {'withClassLoader', 'withCatalogStore'}
61+
# withSerializationContext - needs to be implemented
62+
return {'withClassLoader', 'withCatalogStore', 'withSerializationContext'}
6263

6364
if __name__ == '__main__':
6465
import unittest

‎flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import org.apache.flink.table.api.config.TableConfigOptions;
2525
import org.apache.flink.table.catalog.CatalogStore;
2626
import org.apache.flink.table.functions.UserDefinedFunction;
27+
import org.apache.flink.table.operations.SerializationContext;
2728

2829
import javax.annotation.Nullable;
2930

31+
import java.util.Optional;
32+
3033
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
3134
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
3235
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
@@ -62,16 +65,17 @@ public class EnvironmentSettings {
6265
private final ClassLoader classLoader;
6366

6467
private final @Nullable CatalogStore catalogStore;
65-
66-
private EnvironmentSettings(Configuration configuration, ClassLoader classLoader) {
67-
this(configuration, classLoader, null);
68-
}
68+
private final @Nullable SerializationContext serializationContext;
6969

7070
private EnvironmentSettings(
71-
Configuration configuration, ClassLoader classLoader, CatalogStore catalogStore) {
71+
Configuration configuration,
72+
ClassLoader classLoader,
73+
CatalogStore catalogStore,
74+
SerializationContext serializationContext) {
7275
this.configuration = configuration;
7376
this.classLoader = classLoader;
7477
this.catalogStore = catalogStore;
78+
this.serializationContext = serializationContext;
7579
}
7680

7781
/**
@@ -145,6 +149,11 @@ public CatalogStore getCatalogStore() {
145149
return catalogStore;
146150
}
147151

152+
@Internal
153+
public Optional<SerializationContext> getSerializationContext() {
154+
return Optional.ofNullable(serializationContext);
155+
}
156+
148157
/** A builder for {@link EnvironmentSettings}. */
149158
@PublicEvolving
150159
public static class Builder {
@@ -153,6 +162,7 @@ public static class Builder {
153162
private ClassLoader classLoader;
154163

155164
private @Nullable CatalogStore catalogStore;
165+
private @Nullable SerializationContext serializationContext;
156166

157167
public Builder() {}
158168

@@ -232,12 +242,22 @@ public Builder withCatalogStore(CatalogStore catalogStore) {
232242
return this;
233243
}
234244

245+
/**
246+
* Provides a way to customize the process of serializing operations to an SQL string. This
247+
* is useful, for example, for customizing the serialization of inline functions.
248+
*/
249+
public Builder withSerializationContext(SerializationContext serializationContext) {
250+
this.serializationContext = serializationContext;
251+
return this;
252+
}
253+
235254
/** Returns an immutable instance of {@link EnvironmentSettings}. */
236255
public EnvironmentSettings build() {
237256
if (classLoader == null) {
238257
classLoader = Thread.currentThread().getContextClassLoader();
239258
}
240-
return new EnvironmentSettings(configuration, classLoader, catalogStore);
259+
return new EnvironmentSettings(
260+
configuration, classLoader, catalogStore, serializationContext);
241261
}
242262
}
243263
}

0 commit comments

Comments
 (0)