Skip to content

Commit c2b38bc

Browse files
committed
comments addressed
1 parent 7d65e8b commit c2b38bc

File tree

68 files changed

+258
-377
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+258
-377
lines changed

flink-python/pyflink/table/tests/test_environment_settings_completeness.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def java_class(cls):
3838
def excluded_methods(cls):
3939
# internal interfaces, no need to expose to users.
4040
return {'getPlanner', 'getExecutor', 'getUserClassLoader', 'getCatalogStore',
41-
'toConfiguration', 'fromConfiguration', 'getSerializationContext'}
41+
'toConfiguration', 'fromConfiguration', 'getSqlFactory'}
4242

4343

4444
class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
@@ -58,8 +58,8 @@ def java_class(cls):
5858
@classmethod
5959
def excluded_methods(cls):
6060
# internal interfaces, no need to expose to users.
61-
# withSerializationContext - needs to be implemented
62-
return {'withClassLoader', 'withCatalogStore', 'withSerializationContext'}
61+
# withSqlFactory - needs to be implemented
62+
return {'withClassLoader', 'withCatalogStore', 'withSqlFactory'}
6363

6464
if __name__ == '__main__':
6565
import unittest

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java

+12-13
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.apache.flink.configuration.Configuration;
2424
import org.apache.flink.table.api.config.TableConfigOptions;
2525
import org.apache.flink.table.catalog.CatalogStore;
26+
import org.apache.flink.table.expressions.SqlFactory;
2627
import org.apache.flink.table.functions.UserDefinedFunction;
27-
import org.apache.flink.table.operations.SerializationContext;
2828

2929
import javax.annotation.Nullable;
3030

@@ -65,17 +65,17 @@ public class EnvironmentSettings {
6565
private final ClassLoader classLoader;
6666

6767
private final @Nullable CatalogStore catalogStore;
68-
private final @Nullable SerializationContext serializationContext;
68+
private final @Nullable SqlFactory sqlFactory;
6969

7070
private EnvironmentSettings(
7171
Configuration configuration,
7272
ClassLoader classLoader,
7373
CatalogStore catalogStore,
74-
SerializationContext serializationContext) {
74+
SqlFactory sqlFactory) {
7575
this.configuration = configuration;
7676
this.classLoader = classLoader;
7777
this.catalogStore = catalogStore;
78-
this.serializationContext = serializationContext;
78+
this.sqlFactory = sqlFactory;
7979
}
8080

8181
/**
@@ -150,8 +150,8 @@ public CatalogStore getCatalogStore() {
150150
}
151151

152152
@Internal
153-
public Optional<SerializationContext> getSerializationContext() {
154-
return Optional.ofNullable(serializationContext);
153+
public Optional<SqlFactory> getSqlFactory() {
154+
return Optional.ofNullable(sqlFactory);
155155
}
156156

157157
/** A builder for {@link EnvironmentSettings}. */
@@ -162,7 +162,7 @@ public static class Builder {
162162
private ClassLoader classLoader;
163163

164164
private @Nullable CatalogStore catalogStore;
165-
private @Nullable SerializationContext serializationContext;
165+
private @Nullable SqlFactory sqlFactory;
166166

167167
public Builder() {}
168168

@@ -243,11 +243,11 @@ public Builder withCatalogStore(CatalogStore catalogStore) {
243243
}
244244

245245
/**
246-
* Provides a way to customize the process of serializing operations to an SQL string. This
247-
* is useful, for example, for customizing the serialization of inline functions.
246+
* Provides a way to customize the process of serializing Table API to a SQL string. This is
247+
* useful, for example, for customizing the serialization of inline functions.
248248
*/
249-
public Builder withSerializationContext(SerializationContext serializationContext) {
250-
this.serializationContext = serializationContext;
249+
public Builder withSqlFactory(SqlFactory sqlFactory) {
250+
this.sqlFactory = sqlFactory;
251251
return this;
252252
}
253253

@@ -256,8 +256,7 @@ public EnvironmentSettings build() {
256256
if (classLoader == null) {
257257
classLoader = Thread.currentThread().getContextClassLoader();
258258
}
259-
return new EnvironmentSettings(
260-
configuration, classLoader, catalogStore, serializationContext);
259+
return new EnvironmentSettings(configuration, classLoader, catalogStore, sqlFactory);
261260
}
262261
}
263262
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.flink.table.catalog.ResolvedSchema;
2424
import org.apache.flink.table.connector.sink.DynamicTableSink;
2525
import org.apache.flink.table.connector.source.DynamicTableSource;
26-
import org.apache.flink.table.expressions.DefaultSerializationContext;
26+
import org.apache.flink.table.expressions.DefaultSqlFactory;
2727
import org.apache.flink.table.expressions.Expression;
2828
import org.apache.flink.table.functions.ProcessTableFunction;
2929
import org.apache.flink.table.functions.TableFunction;
@@ -103,8 +103,7 @@ public interface Table extends Explainable<Table>, Executable {
103103
*/
104104
@Deprecated
105105
default TableSchema getSchema() {
106-
return TableSchema.fromResolvedSchema(
107-
getResolvedSchema(), new DefaultSerializationContext());
106+
return TableSchema.fromResolvedSchema(getResolvedSchema(), new DefaultSqlFactory());
108107
}
109108

110109
/** Returns the resolved schema of this table. */

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@
3030
import org.apache.flink.table.catalog.ResolvedCatalogTable;
3131
import org.apache.flink.table.catalog.TableDistribution;
3232
import org.apache.flink.table.catalog.UniqueConstraint;
33-
import org.apache.flink.table.operations.SerializationContext;
34-
import org.apache.flink.table.operations.SerializationContextAdapters;
33+
import org.apache.flink.table.expressions.SqlFactory;
3534
import org.apache.flink.table.utils.EncodingUtils;
3635

3736
import org.apache.commons.lang3.StringUtils;
@@ -53,7 +52,7 @@ public static String buildShowCreateTableRow(
5352
ResolvedCatalogBaseTable<?> table,
5453
ObjectIdentifier tableIdentifier,
5554
boolean isTemporary,
56-
SerializationContext context) {
55+
SqlFactory sqlFactory) {
5756
if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
5857
throw new TableException(
5958
String.format(
@@ -64,7 +63,7 @@ public static String buildShowCreateTableRow(
6463
new StringBuilder()
6564
.append(buildCreateFormattedPrefix("TABLE", isTemporary, tableIdentifier));
6665
sb.append(extractFormattedColumns(table, PRINT_INDENT));
67-
extractFormattedWatermarkSpecs(table, PRINT_INDENT, context)
66+
extractFormattedWatermarkSpecs(table, PRINT_INDENT, sqlFactory)
6867
.ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs));
6968
extractFormattedPrimaryKey(table, PRINT_INDENT)
7069
.ifPresent(pk -> sb.append(",\n").append(pk));
@@ -179,7 +178,7 @@ static String extractFormattedColumns(ResolvedCatalogBaseTable<?> table, String
179178
}
180179

181180
static Optional<String> extractFormattedWatermarkSpecs(
182-
ResolvedCatalogBaseTable<?> table, String printIndent, SerializationContext context) {
181+
ResolvedCatalogBaseTable<?> table, String printIndent, SqlFactory sqlFactory) {
183182
if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
184183
return Optional.empty();
185184
}
@@ -194,9 +193,7 @@ static Optional<String> extractFormattedWatermarkSpecs(
194193
watermarkSpec.getRowtimeAttribute()),
195194
watermarkSpec
196195
.getWatermarkExpression()
197-
.asSerializableString(
198-
SerializationContextAdapters.adapt(
199-
context))))
196+
.asSerializableString(sqlFactory)))
200197
.collect(Collectors.joining("\n")));
201198
}
202199

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.apache.flink.table.delegation.Planner;
7474
import org.apache.flink.table.execution.StagingSinkJobStatusHook;
7575
import org.apache.flink.table.expressions.ApiExpressionUtils;
76+
import org.apache.flink.table.expressions.DefaultSqlFactory;
7677
import org.apache.flink.table.expressions.Expression;
7778
import org.apache.flink.table.expressions.TableReferenceExpression;
7879
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
@@ -91,7 +92,6 @@
9192
import org.apache.flink.table.operations.CollectModifyOperation;
9293
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
9394
import org.apache.flink.table.operations.CreateTableASOperation;
94-
import org.apache.flink.table.operations.DefaultSerializationContext;
9595
import org.apache.flink.table.operations.DeleteFromFilterOperation;
9696
import org.apache.flink.table.operations.ExecutableOperation;
9797
import org.apache.flink.table.operations.ExplainOperation;
@@ -284,9 +284,8 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
284284
.config(tableConfig)
285285
.classloader(userClassLoader)
286286
.build())
287-
.serializationContext(
288-
settings.getSerializationContext()
289-
.orElseGet(() -> new DefaultSerializationContext()))
287+
.sqlFactory(
288+
settings.getSqlFactory().orElseGet(() -> new DefaultSqlFactory()))
290289
.build();
291290

292291
final FunctionCatalog functionCatalog =
@@ -1261,8 +1260,7 @@ public Pipeline generatePipelineFromQueryOperation(
12611260
String defaultJobName = "collect";
12621261

12631262
try {
1264-
defaultJobName =
1265-
operation.asSerializableString(catalogManager.getSerializationContext());
1263+
defaultJobName = operation.asSerializableString(catalogManager.getSqlFactory());
12661264
} catch (Throwable e) {
12671265
// ignore error for unsupported operations and use 'collect' as default job name
12681266
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@
5050
import org.apache.flink.table.catalog.listener.DropTableEvent;
5151
import org.apache.flink.table.delegation.Parser;
5252
import org.apache.flink.table.delegation.Planner;
53+
import org.apache.flink.table.expressions.DefaultSqlFactory;
54+
import org.apache.flink.table.expressions.SqlFactory;
5355
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
5456
import org.apache.flink.table.factories.FactoryUtil;
55-
import org.apache.flink.table.operations.DefaultSerializationContext;
5657
import org.apache.flink.table.operations.Operation;
5758
import org.apache.flink.table.operations.QueryOperation;
58-
import org.apache.flink.table.operations.SerializationContext;
5959
import org.apache.flink.util.Preconditions;
6060
import org.apache.flink.util.StringUtils;
6161

@@ -121,15 +121,15 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {
121121

122122
private final CatalogStoreHolder catalogStoreHolder;
123123

124-
private SerializationContext serializationContext;
124+
private SqlFactory sqlFactory;
125125

126126
private CatalogManager(
127127
String defaultCatalogName,
128128
Catalog defaultCatalog,
129129
DataTypeFactory typeFactory,
130130
List<CatalogModificationListener> catalogModificationListeners,
131131
CatalogStoreHolder catalogStoreHolder,
132-
SerializationContext serializationContext) {
132+
SqlFactory sqlFactory) {
133133
checkArgument(
134134
!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
135135
"Default catalog name cannot be null or empty");
@@ -151,7 +151,7 @@ private CatalogManager(
151151

152152
this.catalogStoreHolder = catalogStoreHolder;
153153

154-
this.serializationContext = serializationContext;
154+
this.sqlFactory = sqlFactory;
155155
}
156156

157157
@VisibleForTesting
@@ -187,7 +187,7 @@ public static final class Builder {
187187
Collections.emptyList();
188188
private CatalogStoreHolder catalogStoreHolder;
189189

190-
private SerializationContext serializationContext = new DefaultSerializationContext();
190+
private SqlFactory sqlFactory = new DefaultSqlFactory();
191191

192192
public Builder classLoader(ClassLoader classLoader) {
193193
this.classLoader = classLoader;
@@ -226,8 +226,8 @@ public Builder catalogStoreHolder(CatalogStoreHolder catalogStoreHolder) {
226226
return this;
227227
}
228228

229-
public Builder serializationContext(SerializationContext serializationContext) {
230-
this.serializationContext = checkNotNull(serializationContext);
229+
public Builder sqlFactory(SqlFactory sqlFactory) {
230+
this.sqlFactory = checkNotNull(sqlFactory);
231231
return this;
232232
}
233233

@@ -248,7 +248,7 @@ public CatalogManager build() {
248248
: executionConfig.getSerializerConfig()),
249249
catalogModificationListeners,
250250
catalogStoreHolder,
251-
serializationContext);
251+
sqlFactory);
252252
}
253253
}
254254

@@ -320,12 +320,12 @@ public DataTypeFactory getDataTypeFactory() {
320320
return typeFactory;
321321
}
322322

323-
public SerializationContext getSerializationContext() {
324-
return serializationContext;
323+
public SqlFactory getSqlFactory() {
324+
return sqlFactory;
325325
}
326326

327-
public void setSerializationContext(SerializationContext serializationContext) {
328-
this.serializationContext = checkNotNull(serializationContext);
327+
public void setSqlFactory(SqlFactory sqlFactory) {
328+
this.sqlFactory = checkNotNull(sqlFactory);
329329
}
330330

331331
/**

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalReferenceExpression.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public String asSummaryString() {
6464
}
6565

6666
@Override
67-
public String asSerializableString(SerializationContext context) {
67+
public String asSerializableString(SqlFactory sqlFactory) {
6868
return EncodingUtils.escapeIdentifier(name);
6969
}
7070

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/TableReferenceExpression.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.table.operations.OperationUtils;
2424
import org.apache.flink.table.operations.PartitionQueryOperation;
2525
import org.apache.flink.table.operations.QueryOperation;
26-
import org.apache.flink.table.operations.SerializationContextAdapters;
2726
import org.apache.flink.table.types.DataType;
2827
import org.apache.flink.table.types.utils.DataTypeUtils;
2928
import org.apache.flink.util.Preconditions;
@@ -86,17 +85,12 @@ public List<ResolvedExpression> getResolvedChildren() {
8685
}
8786

8887
@Override
89-
public String asSerializableString(SerializationContext context) {
90-
final org.apache.flink.table.operations.SerializationContext
91-
operationsSerializationContext = SerializationContextAdapters.adapt(context);
88+
public String asSerializableString(SqlFactory sqlFactory) {
9289
if (queryOperation instanceof PartitionQueryOperation) {
93-
return OperationUtils.indent(
94-
queryOperation.asSerializableString(operationsSerializationContext));
90+
return OperationUtils.indent(queryOperation.asSerializableString(sqlFactory));
9591
}
9692
return String.format(
97-
"(%s\n)",
98-
OperationUtils.indent(
99-
queryOperation.asSerializableString(operationsSerializationContext)));
93+
"(%s\n)", OperationUtils.indent(queryOperation.asSerializableString(sqlFactory)));
10094
}
10195

10296
@Override

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
3434
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
3535
import org.apache.flink.table.descriptors.DescriptorProperties;
36-
import org.apache.flink.table.expressions.DefaultSerializationContext;
36+
import org.apache.flink.table.expressions.DefaultSqlFactory;
3737
import org.apache.flink.table.legacy.factories.TableFactory;
3838
import org.apache.flink.table.legacy.factories.TableSinkFactory;
3939
import org.apache.flink.table.legacy.factories.TableSourceFactory;
@@ -56,7 +56,7 @@ public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Con
5656
return TableFactoryService.find(
5757
TableSourceFactory.class,
5858
((ResolvedCatalogTable) context.getTable())
59-
.toProperties(new DefaultSerializationContext()))
59+
.toProperties(new DefaultSqlFactory()))
6060
.createTableSource(context);
6161
} catch (Throwable t) {
6262
throw new TableException("findAndCreateTableSource failed.", t);
@@ -87,7 +87,7 @@ public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context c
8787
return TableFactoryService.find(
8888
TableSinkFactory.class,
8989
((ResolvedCatalogTable) context.getTable())
90-
.toProperties(new DefaultSerializationContext()))
90+
.toProperties(new DefaultSqlFactory()))
9191
.createTableSink(context);
9292
} catch (Throwable t) {
9393
throw new TableException("findAndCreateTableSink failed.", t);

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.catalog.ResolvedSchema;
2323
import org.apache.flink.table.expressions.ResolvedExpression;
24-
import org.apache.flink.table.expressions.SerializationContext;
24+
import org.apache.flink.table.expressions.SqlFactory;
2525
import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
2626

2727
import java.util.Collections;
@@ -78,11 +78,8 @@ public List<ResolvedExpression> getAggregateExpressions() {
7878
}
7979

8080
@Override
81-
public String asSerializableString(
82-
org.apache.flink.table.operations.SerializationContext context) {
83-
final SerializationContext expressionSerializationContext =
84-
SerializationContextAdapters.adapt(context);
85-
final String groupingExprs = getGroupingExprs(expressionSerializationContext);
81+
public String asSerializableString(SqlFactory sqlFactory) {
82+
final String groupingExprs = getGroupingExprs(sqlFactory);
8683
return String.format(
8784
"SELECT %s FROM (%s\n) %s\nGROUP BY %s",
8885
Stream.concat(groupingExpressions.stream(), aggregateExpressions.stream())
@@ -92,15 +89,14 @@ public String asSerializableString(
9289
INPUT_ALIAS, expr))
9390
.map(
9491
resolvedExpression ->
95-
resolvedExpression.asSerializableString(
96-
expressionSerializationContext))
92+
resolvedExpression.asSerializableString(sqlFactory))
9793
.collect(Collectors.joining(", ")),
98-
OperationUtils.indent(child.asSerializableString(context)),
94+
OperationUtils.indent(child.asSerializableString(sqlFactory)),
9995
INPUT_ALIAS,
10096
groupingExprs);
10197
}
10298

103-
private String getGroupingExprs(SerializationContext context) {
99+
private String getGroupingExprs(SqlFactory context) {
104100
if (groupingExpressions.isEmpty()) {
105101
return "1";
106102
} else {

0 commit comments

Comments (0)