Skip to content

Commit afbc07a

Browse files
committedApr 4, 2025
[FLINK-37603] QueryOperation SQL Serialization customization
1 parent 39aa79e commit afbc07a

File tree

74 files changed

+849
-298
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+849
-298
lines changed
 

‎flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import org.apache.flink.table.api.config.TableConfigOptions;
2525
import org.apache.flink.table.catalog.CatalogStore;
2626
import org.apache.flink.table.functions.UserDefinedFunction;
27+
import org.apache.flink.table.operations.SerializationContext;
2728

2829
import javax.annotation.Nullable;
2930

31+
import java.util.Optional;
32+
3033
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
3134
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
3235
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
@@ -62,16 +65,17 @@ public class EnvironmentSettings {
6265
private final ClassLoader classLoader;
6366

6467
private final @Nullable CatalogStore catalogStore;
65-
66-
private EnvironmentSettings(Configuration configuration, ClassLoader classLoader) {
67-
this(configuration, classLoader, null);
68-
}
68+
private final @Nullable SerializationContext serializationContext;
6969

7070
private EnvironmentSettings(
71-
Configuration configuration, ClassLoader classLoader, CatalogStore catalogStore) {
71+
Configuration configuration,
72+
ClassLoader classLoader,
73+
CatalogStore catalogStore,
74+
SerializationContext serializationContext) {
7275
this.configuration = configuration;
7376
this.classLoader = classLoader;
7477
this.catalogStore = catalogStore;
78+
this.serializationContext = serializationContext;
7579
}
7680

7781
/**
@@ -145,6 +149,11 @@ public CatalogStore getCatalogStore() {
145149
return catalogStore;
146150
}
147151

152+
@Internal
153+
public Optional<SerializationContext> getOperationSerializationContext() {
154+
return Optional.ofNullable(serializationContext);
155+
}
156+
148157
/** A builder for {@link EnvironmentSettings}. */
149158
@PublicEvolving
150159
public static class Builder {
@@ -153,6 +162,7 @@ public static class Builder {
153162
private ClassLoader classLoader;
154163

155164
private @Nullable CatalogStore catalogStore;
165+
private @Nullable SerializationContext serializationContext;
156166

157167
public Builder() {}
158168

@@ -232,12 +242,23 @@ public Builder withCatalogStore(CatalogStore catalogStore) {
232242
return this;
233243
}
234244

245+
/**
246+
* Provides a way to customize the process of serializing operations to an SQL string. This
247+
* is useful, for example, for customizing the serialization of inline functions.
248+
*/
249+
public Builder withOperationSerializationContext(
250+
SerializationContext serializationContext) {
251+
this.serializationContext = serializationContext;
252+
return this;
253+
}
254+
235255
/** Returns an immutable instance of {@link EnvironmentSettings}. */
236256
public EnvironmentSettings build() {
237257
if (classLoader == null) {
238258
classLoader = Thread.currentThread().getContextClassLoader();
239259
}
240-
return new EnvironmentSettings(configuration, classLoader, catalogStore);
260+
return new EnvironmentSettings(
261+
configuration, classLoader, catalogStore, serializationContext);
241262
}
242263
}
243264
}

‎flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.table.catalog.ResolvedSchema;
2424
import org.apache.flink.table.connector.sink.DynamicTableSink;
2525
import org.apache.flink.table.connector.source.DynamicTableSource;
26+
import org.apache.flink.table.expressions.DefaultSerializationContext;
2627
import org.apache.flink.table.expressions.Expression;
2728
import org.apache.flink.table.functions.ProcessTableFunction;
2829
import org.apache.flink.table.functions.TableFunction;
@@ -102,7 +103,8 @@ public interface Table extends Explainable<Table>, Executable {
102103
*/
103104
@Deprecated
104105
default TableSchema getSchema() {
105-
return TableSchema.fromResolvedSchema(getResolvedSchema());
106+
return TableSchema.fromResolvedSchema(
107+
getResolvedSchema(), new DefaultSerializationContext());
106108
}
107109

108110
/** Returns the resolved schema of this table. */

0 commit comments

Comments
 (0)