From 7a70912f07eca848b11cc4ede8628d73c6809a19 Mon Sep 17 00:00:00 2001
From: Anoop Johnson
Date: Fri, 4 Apr 2025 17:04:00 -0700
Subject: [PATCH] Use TableInfo for StageCreate/Create or replace/Replace

---
 .../catalog/StagingTableCatalog.java          | 97 +++++++++++++------
 .../catalog/StagingInMemoryTableCatalog.scala | 30 ++----
 .../datasources/v2/ReplaceTableExec.scala     | 16 ++-
 .../v2/WriteToDataSourceV2Exec.scala          | 25 +++--
 .../connector/DataSourceV2MetricsSuite.scala  | 30 ++----
 5 files changed, 112 insertions(+), 86 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
index f457a4a3d7863..6811ea380b3ae 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
@@ -34,14 +34,14 @@
 
 /**
  * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
- * the a table before committing the table's metadata along with its contents in CREATE TABLE AS
+ * a table before committing the table's metadata along with its contents in CREATE TABLE AS
  * SELECT or REPLACE TABLE AS SELECT operations.
  * <p>
  * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS
  * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
  * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
  * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
- * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and then perform
+ * {@link TableCatalog#createTable(Identifier, TableInfo)}, and then perform
  * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
  * However, if the write operation fails, the catalog will have already dropped the table, and the
  * planner cannot roll back the dropping of the table.
@@ -72,6 +72,21 @@ default StagedTable stageCreate(
     throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreate");
   }
 
+  /**
+   * Stage the creation of a table, preparing it to be committed into the metastore.
+   * <p>
+   * @deprecated Please override
+   * {@link #stageCreate(Identifier, TableInfo)} instead.
+   */
+  @Deprecated(since = "4.1.0")
+  default StagedTable stageCreate(
+      Identifier ident,
+      Column[] columns,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
+    return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
+
   /**
    * Stage the creation of a table, preparing it to be committed into the metastore.
    * <p>
@@ -82,21 +97,16 @@ default StagedTable stageCreate(
    * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
    *
    * @param ident a table identifier
-   * @param columns the column of the new table
-   * @param partitions transforms to use for partitioning data in the table
-   * @param properties a string map of table properties
+   * @param tableInfo information about the table
    * @return metadata for the new table. This can be null if the catalog does not support atomic
    *         creation for this table. Spark will call {@link #loadTable(Identifier)} later.
    * @throws TableAlreadyExistsException If a table or view already exists for the identifier
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    */
-  default StagedTable stageCreate(
-      Identifier ident,
-      Column[] columns,
-      Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
-    return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  default StagedTable stageCreate(Identifier ident, TableInfo tableInfo)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    return stageCreate(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
   }
 
   /**
@@ -115,6 +125,23 @@ default StagedTable stageReplace(
     throw QueryCompilationErrors.mustOverrideOneMethodError("stageReplace");
   }
 
+  /**
+   * Stage the replacement of a table, preparing it to be committed into the metastore when the
+   * returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * @deprecated Please override
+   * {@link #stageReplace(Identifier, TableInfo)} instead.
+   */
+  @Deprecated(since = "4.1.0")
+  default StagedTable stageReplace(
+      Identifier ident,
+      Column[] columns,
+      Transform[] partitions,
+      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
+    return stageReplace(
+        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
+
   /**
    * Stage the replacement of a table, preparing it to be committed into the metastore when the
    * returned table's {@link StagedTable#commitStagedChanges()} is called.
    * <p>
@@ -134,22 +161,16 @@ default StagedTable stageReplace(
    * operation.
    *
    * @param ident a table identifier
-   * @param columns the columns of the new table
-   * @param partitions transforms to use for partitioning data in the table
-   * @param properties a string map of table properties
+   * @param tableInfo information about the table
    * @return metadata for the new table. This can be null if the catalog does not support atomic
    *         creation for this table. Spark will call {@link #loadTable(Identifier)} later.
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    * @throws NoSuchTableException If the table does not exist
    */
-  default StagedTable stageReplace(
-      Identifier ident,
-      Column[] columns,
-      Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
-    return stageReplace(
-        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  default StagedTable stageReplace(Identifier ident, TableInfo tableInfo)
+      throws NoSuchNamespaceException, NoSuchTableException {
+    return stageReplace(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
   }
 
   /**
@@ -168,6 +189,23 @@ default StagedTable stageCreateOrReplace(
     throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreateOrReplace");
   }
 
+  /**
+   * Stage the creation or replacement of a table, preparing it to be committed into the metastore
+   * when the returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * @deprecated Please override
+   * {@link #stageCreateOrReplace(Identifier, TableInfo)} instead.
+   */
+  @Deprecated(since = "4.1.0")
+  default StagedTable stageCreateOrReplace(
+      Identifier ident,
+      Column[] columns,
+      Transform[] partitions,
+      Map<String, String> properties) throws NoSuchNamespaceException {
+    return stageCreateOrReplace(
+        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
+
   /**
    * Stage the creation or replacement of a table, preparing it to be committed into the metastore
    * when the returned table's {@link StagedTable#commitStagedChanges()} is called.
    * <p>
@@ -186,21 +224,18 @@ default StagedTable stageCreateOrReplace(
    * the staged changes are committed but the table doesn't exist at commit time.
    *
    * @param ident a table identifier
-   * @param columns the columns of the new table
-   * @param partitions transforms to use for partitioning data in the table
-   * @param properties a string map of table properties
+   * @param tableInfo information about the table
    * @return metadata for the new table. This can be null if the catalog does not support atomic
    *         creation for this table. Spark will call {@link #loadTable(Identifier)} later.
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    */
-  default StagedTable stageCreateOrReplace(
-      Identifier ident,
-      Column[] columns,
-      Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException {
-    return stageCreateOrReplace(
-        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  default StagedTable stageCreateOrReplace(Identifier ident, TableInfo tableInfo)
+      throws NoSuchNamespaceException {
+    return stageCreateOrReplace(ident,
+      tableInfo.columns(),
+      tableInfo.partitions(),
+      tableInfo.properties());
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
index 2a207901b83f5..ee2400cab35c8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
@@ -31,40 +31,28 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
   import InMemoryTableCatalog._
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
-  override def stageCreate(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable = {
-    validateStagedTable(partitions, properties)
+  override def stageCreate(ident: Identifier, tableInfo: TableInfo): StagedTable = {
+    validateStagedTable(tableInfo.partitions(), tableInfo.properties())
     new TestStagedCreateTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
+        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
   }
 
-  override def stageReplace(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable = {
-    validateStagedTable(partitions, properties)
+  override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable = {
+    validateStagedTable(tableInfo.partitions(), tableInfo.properties())
     new TestStagedReplaceTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
+        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
   }
 
-  override def stageCreateOrReplace(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable = {
-    validateStagedTable(partitions, properties)
+  override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo): StagedTable = {
+    validateStagedTable(tableInfo.partitions(), tableInfo.properties())
     new TestStagedCreateOrReplaceTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
+        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
   }
 
   private def validateStagedTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index 97f604cc58ea8..51f5c848bd27b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -80,12 +80,20 @@ case class AtomicReplaceTableExec(
       invalidateCache(catalog, table, identifier)
     }
     val staged = if (orCreate) {
-      catalog.stageCreateOrReplace(
-        identifier, columns, partitioning.toArray, tableProperties.asJava)
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(columns)
+        .withPartitions(partitioning.toArray)
+        .withProperties(tableProperties.asJava)
+        .build()
+      catalog.stageCreateOrReplace(identifier, tableInfo)
     } else if (catalog.tableExists(identifier)) {
       try {
-        catalog.stageReplace(
-          identifier, columns, partitioning.toArray, tableProperties.asJava)
+        val tableInfo = new TableInfo.Builder()
+          .withColumns(columns)
+          .withPartitions(partitioning.toArray)
+          .withProperties(tableProperties.asJava)
+          .build()
+        catalog.stageReplace(identifier, tableInfo)
       } catch {
         case e: NoSuchTableException =>
           throw QueryCompilationErrors.cannotReplaceMissingTableError(identifier, Some(e))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 104e1bea82c13..2a4cf4003c314 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -123,9 +123,12 @@ case class AtomicCreateTableAsSelectExec(
       }
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val stagedTable = Option(catalog.stageCreate(
-      ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
-      partitioning.toArray, properties.asJava)
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(getV2Columns(query.schema, catalog.useNullableQuerySchema))
+      .withPartitions(partitioning.toArray)
+      .withProperties(properties.asJava)
+      .build()
+    val stagedTable = Option(catalog.stageCreate(ident, tableInfo)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
     writeToTable(catalog, stagedTable, writeOptions, ident, query)
   }
@@ -216,12 +219,20 @@ case class AtomicReplaceTableAsSelectExec(
       invalidateCache(catalog, table, ident)
     }
     val staged = if (orCreate) {
-      catalog.stageCreateOrReplace(
-        ident, columns, partitioning.toArray, properties.asJava)
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(columns)
+        .withPartitions(partitioning.toArray)
+        .withProperties(properties.asJava)
+        .build()
+      catalog.stageCreateOrReplace(ident, tableInfo)
     } else if (catalog.tableExists(ident)) {
       try {
-        catalog.stageReplace(
-          ident, columns, partitioning.toArray, properties.asJava)
+        val tableInfo = new TableInfo.Builder()
+          .withColumns(columns)
+          .withPartitions(partitioning.toArray)
+          .withProperties(properties.asJava)
+          .build()
+        catalog.stageReplace(ident, tableInfo)
       } catch {
         case e: NoSuchTableException =>
           throw QueryCompilationErrors.cannotReplaceMissingTableError(ident, Some(e))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2MetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2MetricsSuite.scala
index fe28b85528632..64cf1c769be18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2MetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2MetricsSuite.scala
@@ -17,12 +17,9 @@
 
 package org.apache.spark.sql.connector
 
-import java.util
-
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTable, InMemoryTableCatalog, StagedTable, StagingInMemoryTableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, StagedTable, StagingInMemoryTableCatalog, TableInfo}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
-import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2.{AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec, AtomicReplaceTableExec, CreateTableAsSelectExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -54,30 +51,17 @@ class StagingInMemoryTableCatalogWithMetrics extends StagingInMemoryTableCatalog
     }
   }
 
-  override def stageCreate(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable = {
+  override def stageCreate(ident: Identifier, tableInfo: TableInfo): StagedTable =
     new TestStagedTableWithMetric(
       ident,
      new InMemoryTable(s"$name.${ident.quoted}",
-        CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
-  }
+        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
 
-  override def stageReplace(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable =
-    stageCreate(ident, columns, partitions, properties)
+  override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable =
+    stageCreate(ident, tableInfo)
 
-  override def stageCreateOrReplace(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable =
-    stageCreate(ident, columns, partitions, properties)
+  override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo): StagedTable =
+    stageCreate(ident, tableInfo)
 }
 
 object StagingInMemoryTableCatalogWithMetrics {
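
The caller-side pattern is the same in all three exec nodes touched above: the positional
columns/partitions/properties arguments are folded into a TableInfo built with its Builder.
A minimal caller-side sketch of that pattern (assuming `catalog`, `ident`, `columns`,
`partitioning`, and `properties` are already in scope, as they are in
AtomicReplaceTableAsSelectExec):

  import scala.jdk.CollectionConverters._
  import org.apache.spark.sql.connector.catalog.TableInfo

  // Fold the former positional arguments into a single TableInfo.
  val tableInfo = new TableInfo.Builder()
    .withColumns(columns)                  // Column[] describing the new table
    .withPartitions(partitioning.toArray)  // Transform[] partitioning expressions
    .withProperties(properties.asJava)     // table properties as a java.util.Map
    .build()
  // Hand the staging catalog one object instead of three arguments.
  val staged = catalog.stageCreateOrReplace(ident, tableInfo)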
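
For catalog implementers, the new TableInfo overloads have defaults that delegate to the
deprecated Column[]/Transform[]/Map overloads, so existing implementations keep compiling;
new implementations only need to override the TableInfo variants. A minimal sketch
(`MyStagingCatalog` is a hypothetical name; it extends the in-memory test catalog from this
patch purely for brevity, a real catalog would implement StagingTableCatalog directly):

  import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingInMemoryTableCatalog, TableInfo}

  class MyStagingCatalog extends StagingInMemoryTableCatalog {
    override def stageCreate(ident: Identifier, tableInfo: TableInfo): StagedTable = {
      // Everything the old signature passed positionally is now carried by tableInfo:
      // columns(), partitions(), properties(), plus schema() for the StructType
      // derived from the columns.
      super.stageCreate(ident, tableInfo)
    }
  }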