Skip to content

Commit a015798

Browse files
authored
drop schema refactored (#6096)
1 parent 2f852b3 commit a015798

File tree

10 files changed

+264
-13
lines changed

10 files changed

+264
-13
lines changed

datafusion/common/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub mod parsers;
2626
#[cfg(feature = "pyarrow")]
2727
mod pyarrow;
2828
pub mod scalar;
29+
mod schema_reference;
2930
pub mod stats;
3031
mod table_reference;
3132
pub mod test_util;
@@ -39,6 +40,7 @@ pub use error::{
3940
SharedResult,
4041
};
4142
pub use scalar::{ScalarType, ScalarValue};
43+
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
4244
pub use stats::{ColumnStatistics, Statistics};
4345
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
4446

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::borrow::Cow;
19+
20+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
21+
pub enum SchemaReference<'a> {
22+
Bare {
23+
schema: Cow<'a, str>,
24+
},
25+
Full {
26+
schema: Cow<'a, str>,
27+
catalog: Cow<'a, str>,
28+
},
29+
}
30+
31+
impl SchemaReference<'_> {
32+
/// Get only the schema name that this references.
33+
pub fn schema_name(&self) -> &str {
34+
match self {
35+
SchemaReference::Bare { schema } => schema,
36+
SchemaReference::Full { schema, catalog: _ } => schema,
37+
}
38+
}
39+
}
40+
41+
pub type OwnedSchemaReference = SchemaReference<'static>;
42+
43+
impl std::fmt::Display for SchemaReference<'_> {
44+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45+
match self {
46+
Self::Bare { schema } => write!(f, "{schema}"),
47+
Self::Full { schema, catalog } => write!(f, "{catalog}.{schema}"),
48+
}
49+
}
50+
}
51+
52+
impl<'a> From<&'a OwnedSchemaReference> for SchemaReference<'a> {
53+
fn from(value: &'a OwnedSchemaReference) -> Self {
54+
match value {
55+
SchemaReference::Bare { schema } => SchemaReference::Bare {
56+
schema: Cow::Borrowed(schema),
57+
},
58+
SchemaReference::Full { schema, catalog } => SchemaReference::Full {
59+
schema: Cow::Borrowed(schema),
60+
catalog: Cow::Borrowed(catalog),
61+
},
62+
}
63+
}
64+
}

datafusion/core/src/catalog/catalog.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,26 @@ pub trait CatalogProvider: Sync + Send {
124124
"Registering new schemas is not supported".to_string(),
125125
))
126126
}
127+
128+
/// Removes a schema from this catalog. Implementations of this method should return
129+
/// errors if the schema exists but cannot be dropped. For example, in DataFusion's
130+
/// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema
131+
/// will only be successfully dropped when `cascade` is true.
132+
/// This is equivalent to how DROP SCHEMA works in PostgreSQL.
133+
///
134+
/// Implementations of this method should return None if schema with `name`
135+
/// does not exist.
136+
///
137+
/// By default returns a "Not Implemented" error
138+
fn deregister_schema(
139+
&self,
140+
_name: &str,
141+
_cascade: bool,
142+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
143+
Err(DataFusionError::NotImplemented(
144+
"Deregistering new schemas is not supported".to_string(),
145+
))
146+
}
127147
}
128148

129149
/// Simple in-memory implementation of a catalog.
@@ -160,13 +180,38 @@ impl CatalogProvider for MemoryCatalogProvider {
160180
) -> Result<Option<Arc<dyn SchemaProvider>>> {
161181
Ok(self.schemas.insert(name.into(), schema))
162182
}
183+
184+
fn deregister_schema(
185+
&self,
186+
name: &str,
187+
cascade: bool,
188+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
189+
if let Some(schema) = self.schema(name) {
190+
let table_names = schema.table_names();
191+
match (table_names.is_empty(), cascade) {
192+
(true, _) | (false, true) => {
193+
let (_, removed) = self.schemas.remove(name).unwrap();
194+
Ok(Some(removed))
195+
}
196+
(false, false) => Err(DataFusionError::Execution(format!(
197+
"Cannot drop schema {} because other tables depend on it: {}",
198+
name,
199+
itertools::join(table_names.iter(), ", ")
200+
))),
201+
}
202+
} else {
203+
Ok(None)
204+
}
205+
}
163206
}
164207

165208
#[cfg(test)]
166209
mod tests {
167-
use crate::catalog::schema::MemorySchemaProvider;
168-
169210
use super::*;
211+
use crate::catalog::schema::MemorySchemaProvider;
212+
use crate::datasource::empty::EmptyTable;
213+
use crate::datasource::TableProvider;
214+
use arrow::datatypes::Schema;
170215

171216
#[test]
172217
fn default_register_schema_not_supported() {
@@ -194,4 +239,38 @@ mod tests {
194239
Err(e) => assert_eq!(e.to_string(), "This feature is not implemented: Registering new schemas is not supported"),
195240
};
196241
}
242+
243+
#[test]
244+
fn memory_catalog_dereg_nonempty_schema() {
245+
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
246+
247+
let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
248+
let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
249+
as Arc<dyn TableProvider>;
250+
schema.register_table("t".into(), test_table).unwrap();
251+
252+
cat.register_schema("foo", schema.clone()).unwrap();
253+
254+
assert!(
255+
cat.deregister_schema("foo", false).is_err(),
256+
"dropping empty schema without cascade should error"
257+
);
258+
assert!(cat.deregister_schema("foo", true).unwrap().is_some());
259+
}
260+
261+
#[test]
262+
fn memory_catalog_dereg_empty_schema() {
263+
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
264+
265+
let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
266+
cat.register_schema("foo", schema.clone()).unwrap();
267+
268+
assert!(cat.deregister_schema("foo", false).unwrap().is_some());
269+
}
270+
271+
#[test]
272+
fn memory_catalog_dereg_missing() {
273+
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
274+
assert!(cat.deregister_schema("foo", false).unwrap().is_none());
275+
}
197276
}

datafusion/core/src/execution/context.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ use crate::datasource::{
6565
use crate::error::{DataFusionError, Result};
6666
use crate::logical_expr::{
6767
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
68-
CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
69-
SetVariable, TableSource, TableType, UNNAMED_TABLE,
68+
CreateView, DropCatalogSchema, DropTable, DropView, Explain, LogicalPlan,
69+
LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE,
7070
};
7171
use crate::optimizer::OptimizerRule;
7272
use datafusion_sql::{planner::ParserOptions, ResolvedTableReference, TableReference};
@@ -86,7 +86,7 @@ use crate::physical_plan::PhysicalPlanner;
8686
use crate::variable::{VarProvider, VarType};
8787
use async_trait::async_trait;
8888
use chrono::{DateTime, Utc};
89-
use datafusion_common::OwnedTableReference;
89+
use datafusion_common::{OwnedTableReference, SchemaReference};
9090
use datafusion_sql::{
9191
parser::DFParser,
9292
planner::{ContextProvider, SqlToRel},
@@ -383,6 +383,7 @@ impl SessionContext {
383383
DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await,
384384
DdlStatement::DropTable(cmd) => self.drop_table(cmd).await,
385385
DdlStatement::DropView(cmd) => self.drop_view(cmd).await,
386+
DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await,
386387
},
387388
// TODO what about the other statements (like TransactionStart and TransactionEnd)
388389
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
@@ -653,6 +654,46 @@ impl SessionContext {
653654
}
654655
}
655656

657+
async fn drop_schema(&self, cmd: DropCatalogSchema) -> Result<DataFrame> {
658+
let DropCatalogSchema {
659+
name,
660+
if_exists: allow_missing,
661+
cascade,
662+
schema: _,
663+
} = cmd;
664+
let catalog = {
665+
let state = self.state.read();
666+
let catalog_name = match &name {
667+
SchemaReference::Full { catalog, .. } => catalog.to_string(),
668+
SchemaReference::Bare { .. } => {
669+
state.config_options().catalog.default_catalog.to_string()
670+
}
671+
};
672+
if let Some(catalog) = state.catalog_list.catalog(&catalog_name) {
673+
catalog
674+
} else if allow_missing {
675+
return self.return_empty_dataframe();
676+
} else {
677+
return self.schema_doesnt_exist_err(name);
678+
}
679+
};
680+
let dereg = catalog.deregister_schema(name.schema_name(), cascade)?;
681+
match (dereg, allow_missing) {
682+
(None, true) => self.return_empty_dataframe(),
683+
(None, false) => self.schema_doesnt_exist_err(name),
684+
(Some(_), _) => self.return_empty_dataframe(),
685+
}
686+
}
687+
688+
fn schema_doesnt_exist_err(
689+
&self,
690+
schemaref: SchemaReference<'_>,
691+
) -> Result<DataFrame> {
692+
Err(DataFusionError::Execution(format!(
693+
"Schema '{schemaref}' doesn't exist."
694+
)))
695+
}
696+
656697
async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> {
657698
let SetVariable {
658699
variable, value, ..

datafusion/core/tests/sqllogictests/test_files/ddl.slt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,3 +643,25 @@ select * from t;
643643

644644
statement ok
645645
drop table t;
646+
647+
##########
648+
# Dropping schemas
649+
##########
650+
651+
statement error DataFusion error: Execution error: Cannot drop schema foo_schema because other tables depend on it: bar
652+
DROP SCHEMA foo_schema;
653+
654+
statement ok
655+
DROP SCHEMA foo_schema CASCADE;
656+
657+
statement error DataFusion error: Execution error: Schema 'doesnt_exist' doesn't exist.
658+
DROP SCHEMA doesnt_exist;
659+
660+
statement ok
661+
DROP SCHEMA IF EXISTS doesnt_exist;
662+
663+
statement ok
664+
CREATE SCHEMA empty_schema;
665+
666+
statement ok
667+
DROP SCHEMA empty_schema;

datafusion/core/tests/sqllogictests/test_files/information_schema.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ SHOW CREATE TABLE test.xyz
375375
----
376376
datafusion test xyz CREATE VIEW test.xyz AS SELECT * FROM abc
377377

378-
statement error DataFusion error: This feature is not implemented: Only `DROP TABLE/VIEW
378+
statement error DataFusion error: Execution error: Cannot drop schema test because other tables depend on it: xyz
379379
DROP SCHEMA test;
380380

381381
statement ok

datafusion/expr/src/logical_plan/ddl.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion_common::Column;
1918
use datafusion_common::{
2019
parsers::CompressionTypeVariant, DFSchemaRef, OwnedTableReference,
2120
};
21+
use datafusion_common::{Column, OwnedSchemaReference};
2222
use std::collections::HashMap;
2323
use std::sync::Arc;
2424
use std::{
@@ -45,6 +45,8 @@ pub enum DdlStatement {
4545
DropTable(DropTable),
4646
/// Drops a view.
4747
DropView(DropView),
48+
/// Drops a catalog schema
49+
DropCatalogSchema(DropCatalogSchema),
4850
}
4951

5052
impl DdlStatement {
@@ -62,6 +64,7 @@ impl DdlStatement {
6264
DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
6365
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
6466
DdlStatement::DropView(DropView { schema, .. }) => schema,
67+
DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
6568
}
6669
}
6770

@@ -76,6 +79,7 @@ impl DdlStatement {
7679
DdlStatement::CreateCatalog(_) => "CreateCatalog",
7780
DdlStatement::DropTable(_) => "DropTable",
7881
DdlStatement::DropView(_) => "DropView",
82+
DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
7983
}
8084
}
8185

@@ -91,6 +95,7 @@ impl DdlStatement {
9195
DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
9296
DdlStatement::DropTable(_) => vec![],
9397
DdlStatement::DropView(_) => vec![],
98+
DdlStatement::DropCatalogSchema(_) => vec![],
9499
}
95100
}
96101

@@ -147,6 +152,14 @@ impl DdlStatement {
147152
}) => {
148153
write!(f, "DropView: {name:?} if not exist:={if_exists}")
149154
}
155+
DdlStatement::DropCatalogSchema(DropCatalogSchema {
156+
name,
157+
if_exists,
158+
cascade,
159+
..
160+
}) => {
161+
write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}")
162+
}
150163
}
151164
}
152165
}
@@ -273,3 +286,16 @@ pub struct DropView {
273286
/// Dummy schema
274287
pub schema: DFSchemaRef,
275288
}
289+
290+
/// Drops a schema
291+
#[derive(Clone, PartialEq, Eq, Hash)]
292+
pub struct DropCatalogSchema {
293+
/// The schema name
294+
pub name: OwnedSchemaReference,
295+
/// If the schema exists
296+
pub if_exists: bool,
297+
/// Whether drop should cascade
298+
pub cascade: bool,
299+
/// Dummy schema
300+
pub schema: DFSchemaRef,
301+
}

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub use builder::{
2929
};
3030
pub use ddl::{
3131
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
32-
CreateView, DdlStatement, DropTable, DropView,
32+
CreateView, DdlStatement, DropCatalogSchema, DropTable, DropView,
3333
};
3434
pub use dml::{DmlStatement, WriteOp};
3535
pub use plan::{

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,6 +1372,9 @@ impl AsLogicalPlan for LogicalPlanNode {
13721372
LogicalPlan::Ddl(DdlStatement::DropView(_)) => Err(proto_error(
13731373
"LogicalPlan serde is not yet implemented for DropView",
13741374
)),
1375+
LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1376+
"LogicalPlan serde is not yet implemented for DropCatalogSchema",
1377+
)),
13751378
LogicalPlan::Statement(_) => Err(proto_error(
13761379
"LogicalPlan serde is not yet implemented for Statement",
13771380
)),

0 commit comments

Comments
 (0)