Skip to content

Commit d200f79

Browse files
committed
[FLINK-37603] QueryOperation SQL Serialization customization
1 parent 39aa79e commit d200f79

File tree

75 files changed

+852
-300
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+852
-300
lines changed

flink-python/pyflink/table/tests/test_environment_settings_completeness.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def java_class(cls):
3838
def excluded_methods(cls):
3939
# internal interfaces, no need to expose to users.
4040
return {'getPlanner', 'getExecutor', 'getUserClassLoader', 'getCatalogStore',
41-
'toConfiguration', 'fromConfiguration'}
41+
'toConfiguration', 'fromConfiguration', 'getOperationSerializationContext'}
4242

4343

4444
class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
@@ -58,7 +58,8 @@ def java_class(cls):
5858
@classmethod
5959
def excluded_methods(cls):
6060
# internal interfaces, no need to expose to users.
61-
return {'withClassLoader', 'withCatalogStore'}
61+
# withOperationSerializationContext - needs to be implemented
62+
return {'withClassLoader', 'withCatalogStore', 'withOperationSerializationContext'}
6263

6364
if __name__ == '__main__':
6465
import unittest

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import org.apache.flink.table.api.config.TableConfigOptions;
2525
import org.apache.flink.table.catalog.CatalogStore;
2626
import org.apache.flink.table.functions.UserDefinedFunction;
27+
import org.apache.flink.table.operations.SerializationContext;
2728

2829
import javax.annotation.Nullable;
2930

31+
import java.util.Optional;
32+
3033
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
3134
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
3235
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
@@ -62,16 +65,17 @@ public class EnvironmentSettings {
6265
private final ClassLoader classLoader;
6366

6467
private final @Nullable CatalogStore catalogStore;
65-
66-
private EnvironmentSettings(Configuration configuration, ClassLoader classLoader) {
67-
this(configuration, classLoader, null);
68-
}
68+
private final @Nullable SerializationContext serializationContext;
6969

7070
private EnvironmentSettings(
71-
Configuration configuration, ClassLoader classLoader, CatalogStore catalogStore) {
71+
Configuration configuration,
72+
ClassLoader classLoader,
73+
CatalogStore catalogStore,
74+
SerializationContext serializationContext) {
7275
this.configuration = configuration;
7376
this.classLoader = classLoader;
7477
this.catalogStore = catalogStore;
78+
this.serializationContext = serializationContext;
7579
}
7680

7781
/**
@@ -145,6 +149,11 @@ public CatalogStore getCatalogStore() {
145149
return catalogStore;
146150
}
147151

152+
@Internal
153+
public Optional<SerializationContext> getOperationSerializationContext() {
154+
return Optional.ofNullable(serializationContext);
155+
}
156+
148157
/** A builder for {@link EnvironmentSettings}. */
149158
@PublicEvolving
150159
public static class Builder {
@@ -153,6 +162,7 @@ public static class Builder {
153162
private ClassLoader classLoader;
154163

155164
private @Nullable CatalogStore catalogStore;
165+
private @Nullable SerializationContext serializationContext;
156166

157167
public Builder() {}
158168

@@ -232,12 +242,23 @@ public Builder withCatalogStore(CatalogStore catalogStore) {
232242
return this;
233243
}
234244

245+
/**
246+
* Provides a way to customize the process of serializing operations to an SQL string. This
247+
* is useful, for example, for customizing the serialization of inline functions.
248+
*/
249+
public Builder withOperationSerializationContext(
250+
SerializationContext serializationContext) {
251+
this.serializationContext = serializationContext;
252+
return this;
253+
}
254+
235255
/** Returns an immutable instance of {@link EnvironmentSettings}. */
236256
public EnvironmentSettings build() {
237257
if (classLoader == null) {
238258
classLoader = Thread.currentThread().getContextClassLoader();
239259
}
240-
return new EnvironmentSettings(configuration, classLoader, catalogStore);
260+
return new EnvironmentSettings(
261+
configuration, classLoader, catalogStore, serializationContext);
241262
}
242263
}
243264
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.table.catalog.ResolvedSchema;
2424
import org.apache.flink.table.connector.sink.DynamicTableSink;
2525
import org.apache.flink.table.connector.source.DynamicTableSource;
26+
import org.apache.flink.table.expressions.DefaultSerializationContext;
2627
import org.apache.flink.table.expressions.Expression;
2728
import org.apache.flink.table.functions.ProcessTableFunction;
2829
import org.apache.flink.table.functions.TableFunction;
@@ -102,7 +103,8 @@ public interface Table extends Explainable<Table>, Executable {
102103
*/
103104
@Deprecated
104105
default TableSchema getSchema() {
105-
return TableSchema.fromResolvedSchema(getResolvedSchema());
106+
return TableSchema.fromResolvedSchema(
107+
getResolvedSchema(), new DefaultSerializationContext());
106108
}
107109

108110
/** Returns the resolved schema of this table. */

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.flink.table.catalog.ResolvedCatalogTable;
3131
import org.apache.flink.table.catalog.TableDistribution;
3232
import org.apache.flink.table.catalog.UniqueConstraint;
33+
import org.apache.flink.table.operations.SerializationContext;
34+
import org.apache.flink.table.operations.SerializationContextAdapters;
3335
import org.apache.flink.table.utils.EncodingUtils;
3436

3537
import org.apache.commons.lang3.StringUtils;
@@ -50,7 +52,8 @@ private ShowCreateUtil() {}
5052
public static String buildShowCreateTableRow(
5153
ResolvedCatalogBaseTable<?> table,
5254
ObjectIdentifier tableIdentifier,
53-
boolean isTemporary) {
55+
boolean isTemporary,
56+
SerializationContext context) {
5457
if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
5558
throw new TableException(
5659
String.format(
@@ -61,7 +64,7 @@ public static String buildShowCreateTableRow(
6164
new StringBuilder()
6265
.append(buildCreateFormattedPrefix("TABLE", isTemporary, tableIdentifier));
6366
sb.append(extractFormattedColumns(table, PRINT_INDENT));
64-
extractFormattedWatermarkSpecs(table, PRINT_INDENT)
67+
extractFormattedWatermarkSpecs(table, PRINT_INDENT, context)
6568
.ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs));
6669
extractFormattedPrimaryKey(table, PRINT_INDENT)
6770
.ifPresent(pk -> sb.append(",\n").append(pk));
@@ -176,7 +179,7 @@ static String extractFormattedColumns(ResolvedCatalogBaseTable<?> table, String
176179
}
177180

178181
static Optional<String> extractFormattedWatermarkSpecs(
179-
ResolvedCatalogBaseTable<?> table, String printIndent) {
182+
ResolvedCatalogBaseTable<?> table, String printIndent, SerializationContext context) {
180183
if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
181184
return Optional.empty();
182185
}
@@ -191,7 +194,9 @@ static Optional<String> extractFormattedWatermarkSpecs(
191194
watermarkSpec.getRowtimeAttribute()),
192195
watermarkSpec
193196
.getWatermarkExpression()
194-
.asSerializableString()))
197+
.asSerializableString(
198+
SerializationContextAdapters.adapt(
199+
context))))
195200
.collect(Collectors.joining("\n")));
196201
}
197202

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.flink.table.operations.CollectModifyOperation;
9292
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
9393
import org.apache.flink.table.operations.CreateTableASOperation;
94+
import org.apache.flink.table.operations.DefaultSerializationContext;
9495
import org.apache.flink.table.operations.DeleteFromFilterOperation;
9596
import org.apache.flink.table.operations.ExecutableOperation;
9697
import org.apache.flink.table.operations.ExplainOperation;
@@ -283,6 +284,9 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
283284
.config(tableConfig)
284285
.classloader(userClassLoader)
285286
.build())
287+
.operationSerializationContext(
288+
settings.getOperationSerializationContext()
289+
.orElseGet(() -> new DefaultSerializationContext()))
286290
.build();
287291

288292
final FunctionCatalog functionCatalog =
@@ -1257,7 +1261,9 @@ public Pipeline generatePipelineFromQueryOperation(
12571261
String defaultJobName = "collect";
12581262

12591263
try {
1260-
defaultJobName = operation.asSerializableString();
1264+
defaultJobName =
1265+
operation.asSerializableString(
1266+
catalogManager.getOperationSerializationContext());
12611267
} catch (Throwable e) {
12621268
// ignore error for unsupported operations and use 'collect' as default job name
12631269
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@
5252
import org.apache.flink.table.delegation.Planner;
5353
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
5454
import org.apache.flink.table.factories.FactoryUtil;
55+
import org.apache.flink.table.operations.DefaultSerializationContext;
5556
import org.apache.flink.table.operations.Operation;
5657
import org.apache.flink.table.operations.QueryOperation;
58+
import org.apache.flink.table.operations.SerializationContext;
5759
import org.apache.flink.util.Preconditions;
5860
import org.apache.flink.util.StringUtils;
5961

@@ -119,12 +121,15 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {
119121

120122
private final CatalogStoreHolder catalogStoreHolder;
121123

124+
private SerializationContext serializationContext;
125+
122126
private CatalogManager(
123127
String defaultCatalogName,
124128
Catalog defaultCatalog,
125129
DataTypeFactory typeFactory,
126130
List<CatalogModificationListener> catalogModificationListeners,
127-
CatalogStoreHolder catalogStoreHolder) {
131+
CatalogStoreHolder catalogStoreHolder,
132+
SerializationContext serializationContext) {
128133
checkArgument(
129134
!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
130135
"Default catalog name cannot be null or empty");
@@ -145,6 +150,8 @@ private CatalogManager(
145150
this.catalogModificationListeners = catalogModificationListeners;
146151

147152
this.catalogStoreHolder = catalogStoreHolder;
153+
154+
this.serializationContext = serializationContext;
148155
}
149156

150157
@VisibleForTesting
@@ -180,6 +187,8 @@ public static final class Builder {
180187
Collections.emptyList();
181188
private CatalogStoreHolder catalogStoreHolder;
182189

190+
private SerializationContext serializationContext = new DefaultSerializationContext();
191+
183192
public Builder classLoader(ClassLoader classLoader) {
184193
this.classLoader = classLoader;
185194
return this;
@@ -217,6 +226,11 @@ public Builder catalogStoreHolder(CatalogStoreHolder catalogStoreHolder) {
217226
return this;
218227
}
219228

229+
public Builder operationSerializationContext(SerializationContext serializationContext) {
230+
this.serializationContext = checkNotNull(serializationContext);
231+
return this;
232+
}
233+
220234
public CatalogManager build() {
221235
checkNotNull(classLoader, "Class loader cannot be null");
222236
checkNotNull(config, "Config cannot be null");
@@ -233,7 +247,8 @@ public CatalogManager build() {
233247
? null
234248
: executionConfig.getSerializerConfig()),
235249
catalogModificationListeners,
236-
catalogStoreHolder);
250+
catalogStoreHolder,
251+
serializationContext);
237252
}
238253
}
239254

@@ -305,6 +320,14 @@ public DataTypeFactory getDataTypeFactory() {
305320
return typeFactory;
306321
}
307322

323+
public SerializationContext getOperationSerializationContext() {
324+
return serializationContext;
325+
}
326+
327+
public void setOperationSerializationContext(SerializationContext serializationContext) {
328+
this.serializationContext = checkNotNull(serializationContext);
329+
}
330+
308331
/**
309332
* Creates a catalog under the given name. The catalog name must be unique.
310333
*

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalReferenceExpression.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public String asSummaryString() {
6464
}
6565

6666
@Override
67-
public String asSerializableString() {
67+
public String asSerializableString(SerializationContext context) {
6868
return EncodingUtils.escapeIdentifier(name);
6969
}
7070

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/TableReferenceExpression.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.table.operations.OperationUtils;
2424
import org.apache.flink.table.operations.PartitionQueryOperation;
2525
import org.apache.flink.table.operations.QueryOperation;
26+
import org.apache.flink.table.operations.SerializationContextAdapters;
2627
import org.apache.flink.table.types.DataType;
2728
import org.apache.flink.table.types.utils.DataTypeUtils;
2829
import org.apache.flink.util.Preconditions;
@@ -85,12 +86,17 @@ public List<ResolvedExpression> getResolvedChildren() {
8586
}
8687

8788
@Override
88-
public String asSerializableString() {
89+
public String asSerializableString(SerializationContext context) {
90+
final org.apache.flink.table.operations.SerializationContext
91+
operationsSerializationContext = SerializationContextAdapters.adapt(context);
8992
if (queryOperation instanceof PartitionQueryOperation) {
90-
return OperationUtils.indent(queryOperation.asSerializableString());
93+
return OperationUtils.indent(
94+
queryOperation.asSerializableString(operationsSerializationContext));
9195
}
9296
return String.format(
93-
"(%s\n)", OperationUtils.indent(queryOperation.asSerializableString()));
97+
"(%s\n)",
98+
OperationUtils.indent(
99+
queryOperation.asSerializableString(operationsSerializationContext)));
94100
}
95101

96102
@Override

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
3434
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
3535
import org.apache.flink.table.descriptors.DescriptorProperties;
36+
import org.apache.flink.table.expressions.DefaultSerializationContext;
3637
import org.apache.flink.table.legacy.factories.TableFactory;
3738
import org.apache.flink.table.legacy.factories.TableSinkFactory;
3839
import org.apache.flink.table.legacy.factories.TableSourceFactory;
@@ -54,7 +55,8 @@ public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Con
5455
try {
5556
return TableFactoryService.find(
5657
TableSourceFactory.class,
57-
((ResolvedCatalogTable) context.getTable()).toProperties())
58+
((ResolvedCatalogTable) context.getTable())
59+
.toProperties(new DefaultSerializationContext()))
5860
.createTableSource(context);
5961
} catch (Throwable t) {
6062
throw new TableException("findAndCreateTableSource failed.", t);
@@ -84,7 +86,8 @@ public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context c
8486
try {
8587
return TableFactoryService.find(
8688
TableSinkFactory.class,
87-
((ResolvedCatalogTable) context.getTable()).toProperties())
89+
((ResolvedCatalogTable) context.getTable())
90+
.toProperties(new DefaultSerializationContext()))
8891
.createTableSink(context);
8992
} catch (Throwable t) {
9093
throw new TableException("findAndCreateTableSink failed.", t);

0 commit comments

Comments (0)