Skip to content

Commit 7a70912

Browse files
committed
Use TableInfo for StageCreate/Create or replace/Replace
1 parent 73a3aba commit 7a70912

File tree

5 files changed

+112
-86
lines changed

5 files changed

+112
-86
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java

Lines changed: 66 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@
3434

3535
/**
3636
* An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
37-
* the a table before committing the table's metadata along with its contents in CREATE TABLE AS
37+
* a table before committing the table's metadata along with its contents in CREATE TABLE AS
3838
* SELECT or REPLACE TABLE AS SELECT operations.
3939
* <p>
4040
* It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS
4141
* SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
4242
* TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
4343
* drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
44-
* {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and then perform
44+
* {@link TableCatalog#createTable(Identifier, TableInfo)}, and then perform
4545
* the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
4646
* However, if the write operation fails, the catalog will have already dropped the table, and the
4747
* planner cannot roll back the dropping of the table.
@@ -72,6 +72,21 @@ default StagedTable stageCreate(
7272
throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreate");
7373
}
7474

75+
/**
76+
* Stage the creation of a table, preparing it to be committed into the metastore.
77+
* <p>
78+
* @deprecated This is deprecated. Please override
79+
* {@link #stageCreate(Identifier, TableInfo)} instead.
80+
*/
81+
@Deprecated(since = "4.1.0")
82+
default StagedTable stageCreate(
83+
Identifier ident,
84+
Column[] columns,
85+
Transform[] partitions,
86+
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
87+
return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
88+
}
89+
7590
/**
7691
* Stage the creation of a table, preparing it to be committed into the metastore.
7792
* <p>
@@ -82,21 +97,16 @@ default StagedTable stageCreate(
8297
* committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
8398
*
8499
* @param ident a table identifier
85-
* @param columns the column of the new table
86-
* @param partitions transforms to use for partitioning data in the table
87-
* @param properties a string map of table properties
100+
* @param tableInfo information about the table
88101
* @return metadata for the new table. This can be null if the catalog does not support atomic
89102
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
90103
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
91104
* @throws UnsupportedOperationException If a requested partition transform is not supported
92105
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
93106
*/
94-
default StagedTable stageCreate(
95-
Identifier ident,
96-
Column[] columns,
97-
Transform[] partitions,
98-
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
99-
return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
107+
default StagedTable stageCreate(Identifier ident, TableInfo tableInfo)
108+
throws TableAlreadyExistsException, NoSuchNamespaceException {
109+
return stageCreate(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
100110
}
101111

102112
/**
@@ -115,6 +125,23 @@ default StagedTable stageReplace(
115125
throw QueryCompilationErrors.mustOverrideOneMethodError("stageReplace");
116126
}
117127

128+
/**
129+
* Stage the replacement of a table, preparing it to be committed into the metastore when the
130+
* returned table's {@link StagedTable#commitStagedChanges()} is called.
131+
* <p>
132+
* This is deprecated, please override
133+
* {@link #stageReplace(Identifier, TableInfo)} instead.
134+
*/
135+
@Deprecated(since = "4.1.0")
136+
default StagedTable stageReplace(
137+
Identifier ident,
138+
Column[] columns,
139+
Transform[] partitions,
140+
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
141+
return stageReplace(
142+
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
143+
}
144+
118145
/**
119146
* Stage the replacement of a table, preparing it to be committed into the metastore when the
120147
* returned table's {@link StagedTable#commitStagedChanges()} is called.
@@ -134,22 +161,16 @@ default StagedTable stageReplace(
134161
* operation.
135162
*
136163
* @param ident a table identifier
137-
* @param columns the columns of the new table
138-
* @param partitions transforms to use for partitioning data in the table
139-
* @param properties a string map of table properties
164+
* @param tableInfo information about the table
140165
* @return metadata for the new table. This can be null if the catalog does not support atomic
141166
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
142167
* @throws UnsupportedOperationException If a requested partition transform is not supported
143168
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
144169
* @throws NoSuchTableException If the table does not exist
145170
*/
146-
default StagedTable stageReplace(
147-
Identifier ident,
148-
Column[] columns,
149-
Transform[] partitions,
150-
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
151-
return stageReplace(
152-
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
171+
default StagedTable stageReplace(Identifier ident, TableInfo tableInfo)
172+
throws NoSuchNamespaceException, NoSuchTableException {
173+
return stageReplace(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties());
153174
}
154175

155176
/**
@@ -168,6 +189,23 @@ default StagedTable stageCreateOrReplace(
168189
throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreateOrReplace");
169190
}
170191

192+
/**
193+
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
194+
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
195+
* <p>
196+
* This is deprecated, please override
197+
* {@link #stageCreateOrReplace(Identifier, TableInfo)} instead.
198+
*/
199+
@Deprecated(since = "4.1.0")
200+
default StagedTable stageCreateOrReplace(
201+
Identifier ident,
202+
Column[] columns,
203+
Transform[] partitions,
204+
Map<String, String> properties) throws NoSuchNamespaceException {
205+
return stageCreateOrReplace(
206+
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
207+
}
208+
171209
/**
172210
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
173211
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
@@ -186,21 +224,18 @@ default StagedTable stageCreateOrReplace(
186224
* the staged changes are committed but the table doesn't exist at commit time.
187225
*
188226
* @param ident a table identifier
189-
* @param columns the columns of the new table
190-
* @param partitions transforms to use for partitioning data in the table
191-
* @param properties a string map of table properties
227+
* @param tableInfo information about the table
192228
* @return metadata for the new table. This can be null if the catalog does not support atomic
193229
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
194230
* @throws UnsupportedOperationException If a requested partition transform is not supported
195231
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
196232
*/
197-
default StagedTable stageCreateOrReplace(
198-
Identifier ident,
199-
Column[] columns,
200-
Transform[] partitions,
201-
Map<String, String> properties) throws NoSuchNamespaceException {
202-
return stageCreateOrReplace(
203-
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
233+
default StagedTable stageCreateOrReplace(Identifier ident, TableInfo tableInfo)
234+
throws NoSuchNamespaceException {
235+
return stageCreateOrReplace(ident,
236+
tableInfo.columns(),
237+
tableInfo.partitions(),
238+
tableInfo.properties());
204239
}
205240

206241
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,40 +31,28 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
3131
import InMemoryTableCatalog._
3232
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
3333

34-
override def stageCreate(
35-
ident: Identifier,
36-
columns: Array[Column],
37-
partitions: Array[Transform],
38-
properties: util.Map[String, String]): StagedTable = {
39-
validateStagedTable(partitions, properties)
34+
override def stageCreate(ident: Identifier, tableInfo: TableInfo): StagedTable = {
35+
validateStagedTable(tableInfo.partitions, tableInfo.properties)
4036
new TestStagedCreateTable(
4137
ident,
4238
new InMemoryTable(s"$name.${ident.quoted}",
43-
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
39+
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
4440
}
4541

46-
override def stageReplace(
47-
ident: Identifier,
48-
columns: Array[Column],
49-
partitions: Array[Transform],
50-
properties: util.Map[String, String]): StagedTable = {
51-
validateStagedTable(partitions, properties)
42+
override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable = {
43+
validateStagedTable(tableInfo.partitions, tableInfo.properties)
5244
new TestStagedReplaceTable(
5345
ident,
5446
new InMemoryTable(s"$name.${ident.quoted}",
55-
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
47+
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
5648
}
5749

58-
override def stageCreateOrReplace(
59-
ident: Identifier,
60-
columns: Array[Column],
61-
partitions: Array[Transform],
62-
properties: util.Map[String, String]): StagedTable = {
63-
validateStagedTable(partitions, properties)
50+
override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo) : StagedTable = {
51+
validateStagedTable(tableInfo.partitions, tableInfo.properties)
6452
new TestStagedCreateOrReplaceTable(
6553
ident,
6654
new InMemoryTable(s"$name.${ident.quoted}",
67-
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
55+
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties))
6856
}
6957

7058
private def validateStagedTable(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,20 @@ case class AtomicReplaceTableExec(
8080
invalidateCache(catalog, table, identifier)
8181
}
8282
val staged = if (orCreate) {
83-
catalog.stageCreateOrReplace(
84-
identifier, columns, partitioning.toArray, tableProperties.asJava)
83+
val tableInfo = new TableInfo.Builder()
84+
.withColumns(columns)
85+
.withPartitions(partitioning.toArray)
86+
.withProperties(tableProperties.asJava)
87+
.build()
88+
catalog.stageCreateOrReplace(identifier, tableInfo)
8589
} else if (catalog.tableExists(identifier)) {
8690
try {
87-
catalog.stageReplace(
88-
identifier, columns, partitioning.toArray, tableProperties.asJava)
91+
val tableInfo = new TableInfo.Builder()
92+
.withColumns(columns)
93+
.withPartitions(partitioning.toArray)
94+
.withProperties(tableProperties.asJava)
95+
.build()
96+
catalog.stageReplace(identifier, tableInfo)
8997
} catch {
9098
case e: NoSuchTableException =>
9199
throw QueryCompilationErrors.cannotReplaceMissingTableError(identifier, Some(e))

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,12 @@ case class AtomicCreateTableAsSelectExec(
123123
}
124124
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
125125
}
126-
val stagedTable = Option(catalog.stageCreate(
127-
ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
128-
partitioning.toArray, properties.asJava)
126+
val tableInfo = new TableInfo.Builder()
127+
.withColumns(getV2Columns(query.schema, catalog.useNullableQuerySchema))
128+
.withPartitions(partitioning.toArray)
129+
.withProperties(properties.asJava)
130+
.build()
131+
val stagedTable = Option(catalog.stageCreate(ident, tableInfo)
129132
).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
130133
writeToTable(catalog, stagedTable, writeOptions, ident, query)
131134
}
@@ -216,12 +219,20 @@ case class AtomicReplaceTableAsSelectExec(
216219
invalidateCache(catalog, table, ident)
217220
}
218221
val staged = if (orCreate) {
219-
catalog.stageCreateOrReplace(
220-
ident, columns, partitioning.toArray, properties.asJava)
222+
val tableInfo = new TableInfo.Builder()
223+
.withColumns(columns)
224+
.withPartitions(partitioning.toArray)
225+
.withProperties(properties.asJava)
226+
.build()
227+
catalog.stageCreateOrReplace(ident, tableInfo)
221228
} else if (catalog.tableExists(ident)) {
222229
try {
223-
catalog.stageReplace(
224-
ident, columns, partitioning.toArray, properties.asJava)
230+
val tableInfo = new TableInfo.Builder()
231+
.withColumns(columns)
232+
.withPartitions(partitioning.toArray)
233+
.withProperties(properties.asJava)
234+
.build()
235+
catalog.stageReplace(ident, tableInfo)
225236
} catch {
226237
case e: NoSuchTableException =>
227238
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident, Some(e))

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2MetricsSuite.scala

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717

1818
package org.apache.spark.sql.connector
1919

20-
import java.util
21-
2220
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
23-
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTable, InMemoryTableCatalog, StagedTable, StagingInMemoryTableCatalog}
21+
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, StagedTable, StagingInMemoryTableCatalog, TableInfo}
2422
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
25-
import org.apache.spark.sql.connector.expressions.Transform
2623
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric}
2724
import org.apache.spark.sql.execution.SparkPlan
2825
import org.apache.spark.sql.execution.datasources.v2.{AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec, AtomicReplaceTableExec, CreateTableAsSelectExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -54,30 +51,17 @@ class StagingInMemoryTableCatalogWithMetrics extends StagingInMemoryTableCatalog
5451
}
5552
}
5653

57-
override def stageCreate(
58-
ident: Identifier,
59-
columns: Array[Column],
60-
partitions: Array[Transform],
61-
properties: util.Map[String, String]): StagedTable = {
54+
override def stageCreate(ident: Identifier, tableInfo: TableInfo): StagedTable =
6255
new TestStagedTableWithMetric(
6356
ident,
6457
new InMemoryTable(s"$name.${ident.quoted}",
65-
CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties))
66-
}
58+
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
6759

68-
override def stageReplace(
69-
ident: Identifier,
70-
columns: Array[Column],
71-
partitions: Array[Transform],
72-
properties: util.Map[String, String]): StagedTable =
73-
stageCreate(ident, columns, partitions, properties)
60+
override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable =
61+
stageCreate(ident, tableInfo)
7462

75-
override def stageCreateOrReplace(
76-
ident: Identifier,
77-
columns: Array[Column],
78-
partitions: Array[Transform],
79-
properties: util.Map[String, String]): StagedTable =
80-
stageCreate(ident, columns, partitions, properties)
63+
override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo) : StagedTable =
64+
stageCreate(ident, tableInfo)
8165
}
8266

8367
object StagingInMemoryTableCatalogWithMetrics {

0 commit comments

Comments
 (0)