Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37603] QueryOperation SQL Serialization customization #26389

Merged
merged 4 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def java_class(cls):
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
return {'getPlanner', 'getExecutor', 'getUserClassLoader', 'getCatalogStore',
'toConfiguration', 'fromConfiguration'}
'toConfiguration', 'fromConfiguration', 'getSqlFactory'}


class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
Expand All @@ -58,7 +58,8 @@ def java_class(cls):
@classmethod
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
return {'withClassLoader', 'withCatalogStore'}
# withSqlFactory - needs to be implemented
return {'withClassLoader', 'withCatalogStore', 'withSqlFactory'}

if __name__ == '__main__':
import unittest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.expressions.SqlFactory;
import org.apache.flink.table.functions.UserDefinedFunction;

import javax.annotation.Nullable;

import java.util.Optional;

import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
Expand Down Expand Up @@ -62,16 +65,17 @@ public class EnvironmentSettings {
private final ClassLoader classLoader;

private final @Nullable CatalogStore catalogStore;

private EnvironmentSettings(Configuration configuration, ClassLoader classLoader) {
this(configuration, classLoader, null);
}
private final @Nullable SqlFactory sqlFactory;

private EnvironmentSettings(
Configuration configuration, ClassLoader classLoader, CatalogStore catalogStore) {
Configuration configuration,
ClassLoader classLoader,
CatalogStore catalogStore,
SqlFactory sqlFactory) {
this.configuration = configuration;
this.classLoader = classLoader;
this.catalogStore = catalogStore;
this.sqlFactory = sqlFactory;
}

/**
Expand Down Expand Up @@ -145,6 +149,11 @@ public CatalogStore getCatalogStore() {
return catalogStore;
}

/**
 * Returns the {@code SqlFactory} configured for these settings, or {@link Optional#empty()}
 * if none was provided (callers are expected to fall back to a default factory).
 */
@Internal
public Optional<SqlFactory> getSqlFactory() {
return Optional.ofNullable(sqlFactory);
}

/** A builder for {@link EnvironmentSettings}. */
@PublicEvolving
public static class Builder {
Expand All @@ -153,6 +162,7 @@ public static class Builder {
private ClassLoader classLoader;

private @Nullable CatalogStore catalogStore;
private @Nullable SqlFactory sqlFactory;

public Builder() {}

Expand Down Expand Up @@ -232,12 +242,21 @@ public Builder withCatalogStore(CatalogStore catalogStore) {
return this;
}

/**
* Provides a way to customize the process of serializing Table API to a SQL string. This is
* useful, for example, for customizing the serialization of inline functions.
*
* @param sqlFactory the custom factory used when serializing query operations to SQL
* @return this builder, for call chaining
*/
public Builder withSqlFactory(SqlFactory sqlFactory) {
this.sqlFactory = sqlFactory;
return this;
}

/** Returns an immutable instance of {@link EnvironmentSettings}. */
public EnvironmentSettings build() {
if (classLoader == null) {
classLoader = Thread.currentThread().getContextClassLoader();
}
return new EnvironmentSettings(configuration, classLoader, catalogStore);
return new EnvironmentSettings(configuration, classLoader, catalogStore, sqlFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.expressions.DefaultSqlFactory;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.TableFunction;
Expand Down Expand Up @@ -102,7 +103,7 @@ public interface Table extends Explainable<Table>, Executable {
*/
@Deprecated
default TableSchema getSchema() {
return TableSchema.fromResolvedSchema(getResolvedSchema());
return TableSchema.fromResolvedSchema(getResolvedSchema(), DefaultSqlFactory.INSTANCE);
}

/** Returns the resolved schema of this table. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.expressions.SqlFactory;
import org.apache.flink.table.utils.EncodingUtils;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -50,7 +51,8 @@ private ShowCreateUtil() {}
public static String buildShowCreateTableRow(
ResolvedCatalogBaseTable<?> table,
ObjectIdentifier tableIdentifier,
boolean isTemporary) {
boolean isTemporary,
SqlFactory sqlFactory) {
if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
throw new TableException(
String.format(
Expand All @@ -61,7 +63,7 @@ public static String buildShowCreateTableRow(
new StringBuilder()
.append(buildCreateFormattedPrefix("TABLE", isTemporary, tableIdentifier));
sb.append(extractFormattedColumns(table, PRINT_INDENT));
extractFormattedWatermarkSpecs(table, PRINT_INDENT)
extractFormattedWatermarkSpecs(table, PRINT_INDENT, sqlFactory)
.ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs));
extractFormattedPrimaryKey(table, PRINT_INDENT)
.ifPresent(pk -> sb.append(",\n").append(pk));
Expand Down Expand Up @@ -176,7 +178,7 @@ static String extractFormattedColumns(ResolvedCatalogBaseTable<?> table, String
}

static Optional<String> extractFormattedWatermarkSpecs(
ResolvedCatalogBaseTable<?> table, String printIndent) {
ResolvedCatalogBaseTable<?> table, String printIndent, SqlFactory sqlFactory) {
if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
return Optional.empty();
}
Expand All @@ -191,7 +193,7 @@ static Optional<String> extractFormattedWatermarkSpecs(
watermarkSpec.getRowtimeAttribute()),
watermarkSpec
.getWatermarkExpression()
.asSerializableString()))
.asSerializableString(sqlFactory)))
.collect(Collectors.joining("\n")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.execution.StagingSinkJobStatusHook;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.DefaultSqlFactory;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
Expand Down Expand Up @@ -283,6 +284,9 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
.config(tableConfig)
.classloader(userClassLoader)
.build())
.sqlFactory(
settings.getSqlFactory()
.orElseGet(() -> DefaultSqlFactory.INSTANCE))
.build();

final FunctionCatalog functionCatalog =
Expand Down Expand Up @@ -1257,7 +1261,7 @@ public Pipeline generatePipelineFromQueryOperation(
String defaultJobName = "collect";

try {
defaultJobName = operation.asSerializableString();
defaultJobName = operation.asSerializableString(catalogManager.getSqlFactory());
} catch (Throwable e) {
// ignore error for unsupported operations and use 'collect' as default job name
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.flink.table.catalog.listener.DropTableEvent;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.DefaultSqlFactory;
import org.apache.flink.table.expressions.SqlFactory;
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.operations.Operation;
Expand Down Expand Up @@ -119,12 +121,15 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {

private final CatalogStoreHolder catalogStoreHolder;

private SqlFactory sqlFactory;

private CatalogManager(
String defaultCatalogName,
Catalog defaultCatalog,
DataTypeFactory typeFactory,
List<CatalogModificationListener> catalogModificationListeners,
CatalogStoreHolder catalogStoreHolder) {
CatalogStoreHolder catalogStoreHolder,
SqlFactory sqlFactory) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
"Default catalog name cannot be null or empty");
Expand All @@ -145,6 +150,8 @@ private CatalogManager(
this.catalogModificationListeners = catalogModificationListeners;

this.catalogStoreHolder = catalogStoreHolder;

this.sqlFactory = sqlFactory;
}

@VisibleForTesting
Expand Down Expand Up @@ -180,6 +187,8 @@ public static final class Builder {
Collections.emptyList();
private CatalogStoreHolder catalogStoreHolder;

private SqlFactory sqlFactory = DefaultSqlFactory.INSTANCE;

public Builder classLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
Expand Down Expand Up @@ -217,6 +226,11 @@ public Builder catalogStoreHolder(CatalogStoreHolder catalogStoreHolder) {
return this;
}

/**
 * Sets the {@code SqlFactory} used when serializing query operations to SQL strings.
 *
 * @param sqlFactory the non-null factory to use; a default is used if never called
 * @return this builder, for call chaining
 */
public Builder sqlFactory(SqlFactory sqlFactory) {
this.sqlFactory = checkNotNull(sqlFactory);
return this;
}

public CatalogManager build() {
checkNotNull(classLoader, "Class loader cannot be null");
checkNotNull(config, "Config cannot be null");
Expand All @@ -233,7 +247,8 @@ public CatalogManager build() {
? null
: executionConfig.getSerializerConfig()),
catalogModificationListeners,
catalogStoreHolder);
catalogStoreHolder,
sqlFactory);
}
}

Expand Down Expand Up @@ -305,6 +320,14 @@ public DataTypeFactory getDataTypeFactory() {
return typeFactory;
}

/** Returns the {@code SqlFactory} currently in use for SQL serialization; never null. */
public SqlFactory getSqlFactory() {
return sqlFactory;
}

/**
 * Replaces the {@code SqlFactory} used for SQL serialization.
 *
 * <p>NOTE(review): this field is mutable (not final) and this setter is not synchronized —
 * presumably it is only called during setup before concurrent use; confirm with callers.
 *
 * @param sqlFactory the non-null factory to use from now on
 */
public void setSqlFactory(SqlFactory sqlFactory) {
this.sqlFactory = checkNotNull(sqlFactory);
}

/**
* Creates a catalog under the given name. The catalog name must be unique.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public String asSummaryString() {
}

@Override
public String asSerializableString() {
public String asSerializableString(SqlFactory sqlFactory) {
return EncodingUtils.escapeIdentifier(name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ public List<ResolvedExpression> getResolvedChildren() {
}

@Override
public String asSerializableString() {
public String asSerializableString(SqlFactory sqlFactory) {
if (queryOperation instanceof PartitionQueryOperation) {
return OperationUtils.indent(queryOperation.asSerializableString());
return OperationUtils.indent(queryOperation.asSerializableString(sqlFactory));
}
return String.format(
"(%s\n)", OperationUtils.indent(queryOperation.asSerializableString()));
"(%s\n)", OperationUtils.indent(queryOperation.asSerializableString(sqlFactory)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.DefaultSqlFactory;
import org.apache.flink.table.legacy.factories.TableFactory;
import org.apache.flink.table.legacy.factories.TableSinkFactory;
import org.apache.flink.table.legacy.factories.TableSourceFactory;
Expand All @@ -54,7 +55,8 @@ public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Con
try {
return TableFactoryService.find(
TableSourceFactory.class,
((ResolvedCatalogTable) context.getTable()).toProperties())
((ResolvedCatalogTable) context.getTable())
.toProperties(DefaultSqlFactory.INSTANCE))
.createTableSource(context);
} catch (Throwable t) {
throw new TableException("findAndCreateTableSource failed.", t);
Expand Down Expand Up @@ -84,7 +86,8 @@ public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context c
try {
return TableFactoryService.find(
TableSinkFactory.class,
((ResolvedCatalogTable) context.getTable()).toProperties())
((ResolvedCatalogTable) context.getTable())
.toProperties(DefaultSqlFactory.INSTANCE))
.createTableSink(context);
} catch (Throwable t) {
throw new TableException("findAndCreateTableSink failed.", t);
Expand Down
Loading