Skip to content

[SPARK-51726][SQL] Use TableInfo for Stage CREATE/REPLACE/CREATE OR REPLACE table #50521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -34,14 +34,14 @@

/**
* An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
* the a table before committing the table's metadata along with its contents in CREATE TABLE AS
* a table before committing the table's metadata along with its contents in CREATE TABLE AS
* SELECT or REPLACE TABLE AS SELECT operations.
* <p>
* It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS
* SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
* TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
* drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
* {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and then perform
* {@link TableCatalog#createTable(Identifier, TableInfo)}, and then perform
* the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
* However, if the write operation fails, the catalog will have already dropped the table, and the
* planner cannot roll back the dropping of the table.
@@ -72,6 +72,21 @@ default StagedTable stageCreate(
throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreate");
}

/**
* Stage the creation of a table, preparing it to be committed into the metastore.
* <p>
* @deprecated This is deprecated. Please override
* {@link #stageCreate(Identifier, TableInfo)} instead.
*/
@Deprecated(since = "4.1.0")
default StagedTable stageCreate(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
* Stage the creation of a table, preparing it to be committed into the metastore.
* <p>
@@ -82,21 +97,16 @@ default StagedTable stageCreate(
* committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
*
* @param ident a table identifier
* @param columns the column of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @param tableInfo information about the table
* @return metadata for the new table. This can be null if the catalog does not support atomic
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
default StagedTable stageCreate(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
default StagedTable stageCreate(Identifier ident, TableInfo tableInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: My personal preference would be to format it this way:

default StagedTable stageCreate(
    Identifier ident,
    TableInfo tableInfo) throws TableAlreadyExistsException, NoSuchNamespaceException {
  ...
}

throws TableAlreadyExistsException, NoSuchNamespaceException {
return stageCreate(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
}

/**
@@ -115,6 +125,23 @@ default StagedTable stageReplace(
throw QueryCompilationErrors.mustOverrideOneMethodError("stageReplace");
}

/**
* Stage the replacement of a table, preparing it to be committed into the metastore when the
* returned table's {@link StagedTable#commitStagedChanges()} is called.
* <p>
* This is deprecated, please override
* {@link #stageReplace(Identifier, TableInfo)} instead.
*/
@Deprecated(since = "4.1.0")
default StagedTable stageReplace(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
return stageReplace(
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: What about putting each argument on a separate line?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems consistent with existing code in this file

}

/**
* Stage the replacement of a table, preparing it to be committed into the metastore when the
* returned table's {@link StagedTable#commitStagedChanges()} is called.
@@ -134,22 +161,16 @@ default StagedTable stageReplace(
* operation.
*
* @param ident a table identifier
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @param tableInfo information about the table
* @return metadata for the new table. This can be null if the catalog does not support atomic
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
* @throws NoSuchTableException If the table does not exist
*/
default StagedTable stageReplace(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
return stageReplace(
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
default StagedTable stageReplace(Identifier ident, TableInfo tableInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: Same comment about formatting as in create.

throws NoSuchNamespaceException, NoSuchTableException {
return stageReplace(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
}

/**
@@ -168,6 +189,23 @@ default StagedTable stageCreateOrReplace(
throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreateOrReplace");
}

/**
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
* <p>
* This is deprecated, please override
* {@link #stageCreateOrReplace(Identifier, TableInfo)} instead.
*/
@Deprecated(since = "4.1.0")
default StagedTable stageCreateOrReplace(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException {
return stageCreateOrReplace(
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
@@ -186,21 +224,18 @@ default StagedTable stageCreateOrReplace(
* the staged changes are committed but the table doesn't exist at commit time.
*
* @param ident a table identifier
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @param tableInfo information about the table
* @return metadata for the new table. This can be null if the catalog does not support atomic
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
default StagedTable stageCreateOrReplace(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException {
return stageCreateOrReplace(
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
default StagedTable stageCreateOrReplace(Identifier ident, TableInfo tableInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too, method + invocation.

throws NoSuchNamespaceException {
return stageCreateOrReplace(ident,
tableInfo.columns(),
tableInfo.partitions(),
tableInfo.properties());
}

/**
Original file line number Diff line number Diff line change
@@ -31,40 +31,28 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
import InMemoryTableCatalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override def stageCreate(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
validateStagedTable(partitions, properties)
override def stageCreate(ident: Identifier, tableInfo: TableInfo): StagedTable = {
validateStagedTable(tableInfo.partitions, tableInfo.properties)
new TestStagedCreateTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
}

override def stageReplace(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
validateStagedTable(partitions, properties)
override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable = {
validateStagedTable(tableInfo.partitions, tableInfo.properties)
new TestStagedReplaceTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
}

override def stageCreateOrReplace(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
validateStagedTable(partitions, properties)
override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo) : StagedTable = {
validateStagedTable(tableInfo.partitions, tableInfo.properties)
new TestStagedCreateOrReplaceTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties))
}

private def validateStagedTable(
Original file line number Diff line number Diff line change
@@ -80,12 +80,20 @@ case class AtomicReplaceTableExec(
invalidateCache(catalog, table, identifier)
}
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
identifier, columns, partitioning.toArray, tableProperties.asJava)
val tableInfo = new TableInfo.Builder()
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(tableProperties.asJava)
.build()
catalog.stageCreateOrReplace(identifier, tableInfo)
} else if (catalog.tableExists(identifier)) {
try {
catalog.stageReplace(
identifier, columns, partitioning.toArray, tableProperties.asJava)
val tableInfo = new TableInfo.Builder()
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(tableProperties.asJava)
.build()
catalog.stageReplace(identifier, tableInfo)
} catch {
case e: NoSuchTableException =>
throw QueryCompilationErrors.cannotReplaceMissingTableError(identifier, Some(e))
Original file line number Diff line number Diff line change
@@ -123,9 +123,12 @@ case class AtomicCreateTableAsSelectExec(
}
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
val stagedTable = Option(catalog.stageCreate(
ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
partitioning.toArray, properties.asJava)
val tableInfo = new TableInfo.Builder()
.withColumns(getV2Columns(query.schema, catalog.useNullableQuerySchema))
.withPartitions(partitioning.toArray)
.withProperties(properties.asJava)
.build()
val stagedTable = Option(catalog.stageCreate(ident, tableInfo)
).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
writeToTable(catalog, stagedTable, writeOptions, ident, query)
}
@@ -216,12 +219,20 @@ case class AtomicReplaceTableAsSelectExec(
invalidateCache(catalog, table, ident)
}
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
ident, columns, partitioning.toArray, properties.asJava)
val tableInfo = new TableInfo.Builder()
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(properties.asJava)
.build()
catalog.stageCreateOrReplace(ident, tableInfo)
} else if (catalog.tableExists(ident)) {
try {
catalog.stageReplace(
ident, columns, partitioning.toArray, properties.asJava)
val tableInfo = new TableInfo.Builder()
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(properties.asJava)
.build()
catalog.stageReplace(ident, tableInfo)
} catch {
case e: NoSuchTableException =>
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident, Some(e))
Original file line number Diff line number Diff line change
@@ -17,12 +17,9 @@

package org.apache.spark.sql.connector

import java.util

import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTable, InMemoryTableCatalog, StagedTable, StagingInMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, StagedTable, StagingInMemoryTableCatalog, TableInfo}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.{AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec, AtomicReplaceTableExec, CreateTableAsSelectExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -54,30 +51,17 @@ class StagingInMemoryTableCatalogWithMetrics extends StagingInMemoryTableCatalog
}
}

override def stageCreate(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
override def stageCreate(ident: Identifier, tableInfo: TableInfo): StagedTable =
new TestStagedTableWithMetric(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
}
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))

override def stageReplace(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable =
stageCreate(ident, columns, partitions, properties)
override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable =
stageCreate(ident, tableInfo)

override def stageCreateOrReplace(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable =
stageCreate(ident, columns, partitions, properties)
override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo) : StagedTable =
stageCreate(ident, tableInfo)
}

object StagingInMemoryTableCatalogWithMetrics {