
Commit 996bbdc: Add type relation

1 parent 5427f4b

105 files changed, +1446 / -1688 lines


datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 6 additions & 4 deletions
@@ -60,6 +60,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use tempfile::TempDir;
 use url::Url;
+use datafusion_common::logical_type::schema::LogicalSchemaRef;
 
 /// This example demonstrates using low level DataFusion APIs to read only
 /// certain row groups and ranges from parquet files, based on external
@@ -299,8 +300,9 @@ impl IndexTableProvider {
         // In this example, we use the PruningPredicate's literal guarantees to
         // analyze the predicate. In a real system, using
         // `PruningPredicate::prune` would likely be easier to do.
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
         let pruning_predicate =
-            PruningPredicate::try_new(Arc::clone(predicate), self.schema().clone())?;
+            PruningPredicate::try_new(Arc::clone(predicate), schema)?;
 
         // The PruningPredicate's guarantees must all be satisfied in order for
         // the predicate to possibly evaluate to true.
@@ -453,8 +455,8 @@ impl TableProvider for IndexTableProvider {
         self
     }
 
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.indexed_file.schema)
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.indexed_file.schema.as_ref().clone().into())
     }
 
     fn table_type(&self) -> TableType {
@@ -482,7 +484,7 @@ impl TableProvider for IndexTableProvider {
             .with_extensions(Arc::new(access_plan) as _);
 
         // Prepare for scanning
-        let schema = self.schema();
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
         let object_store_url = ObjectStoreUrl::parse("file://")?;
         let file_scan_config = FileScanConfig::new(object_store_url, schema)
             .with_limit(limit)
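The change above captures the pattern this commit applies across the examples: `TableProvider::schema` now returns a logical schema (`LogicalSchemaRef`), and call sites that still need a physical Arrow `SchemaRef` (pruning, file scans) convert back explicitly. Below is a minimal sketch of that round trip, assuming the `From` conversions between `Schema` and `LogicalSchema` that the diff relies on; the provider type and field names are illustrative only, not part of the commit.

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::logical_type::schema::LogicalSchemaRef;

// Hypothetical provider mirroring the pattern in this commit: it stores a
// physical Arrow schema, exposes it logically, and converts back when a
// physical plan needs an Arrow SchemaRef.
struct ExampleProvider {
    arrow_schema: SchemaRef,
}

impl ExampleProvider {
    // Logical view, matching the new `TableProvider::schema` return type.
    fn schema(&self) -> LogicalSchemaRef {
        // Assumes `From<Schema> for LogicalSchema`, as used in the diff.
        LogicalSchemaRef::new(self.arrow_schema.as_ref().clone().into())
    }

    // Physical view, recovered where a SchemaRef is still required,
    // e.g. for PruningPredicate::try_new or FileScanConfig::new.
    fn physical_schema(&self) -> SchemaRef {
        // Assumes `From<LogicalSchema> for Schema`, as used in the diff.
        SchemaRef::new(self.schema().as_ref().clone().into())
    }
}

fn main() {
    let arrow_schema = Arc::new(Schema::new(vec![Field::new(
        "id",
        DataType::UInt64,
        false,
    )]));
    let provider = ExampleProvider { arrow_schema };
    // The round trip should preserve the field layout.
    assert_eq!(provider.physical_schema().fields().len(), 1);
}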

datafusion-examples/examples/custom_datasource.rs

Lines changed: 9 additions & 6 deletions
@@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow::datatypes::{DataType, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::{provider_as_source, TableProvider, TableType};
 use datafusion::error::Result;
@@ -38,6 +38,8 @@ use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
 use tokio::time::timeout;
+use datafusion_common::logical_type::field::LogicalField;
+use datafusion_common::logical_type::schema::{LogicalSchema, LogicalSchemaRef};
 
 /// This example demonstrates executing a simple query against a custom datasource
 #[tokio::main]
@@ -162,10 +164,10 @@ impl TableProvider for CustomDataSource {
         self
     }
 
-    fn schema(&self) -> SchemaRef {
-        SchemaRef::new(Schema::new(vec![
-            Field::new("id", DataType::UInt8, false),
-            Field::new("bank_account", DataType::UInt64, true),
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(LogicalSchema::new(vec![
+            LogicalField::new("id", DataType::UInt8, false),
+            LogicalField::new("bank_account", DataType::UInt64, true),
         ]))
     }
 
@@ -181,7 +183,8 @@ impl TableProvider for CustomDataSource {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        return self.create_physical_plan(projection, self.schema()).await;
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
+        return self.create_physical_plan(projection, schema).await;
     }
 }

datafusion-examples/examples/dataframe_subquery.rs

Lines changed: 3 additions & 3 deletions
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::logical_type::LogicalType;
 use std::sync::Arc;
 
+use arrow_schema::DataType;
 use datafusion::error::Result;
 use datafusion::functions_aggregate::average::avg;
 use datafusion::prelude::*;
@@ -48,7 +48,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
         scalar_subquery(Arc::new(
             ctx.table("t2")
                 .await?
-                .filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
+                .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
                 .aggregate(vec![], vec![avg(col("t2.c2"))])?
                 .select(vec![avg(col("t2.c2"))])?
                 .into_unoptimized_plan(),
@@ -91,7 +91,7 @@ async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
         .filter(exists(Arc::new(
             ctx.table("t2")
                 .await?
-                .filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
+                .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
                 .select(vec![col("t2.c2")])?
                 .into_unoptimized_plan(),
         )))?

datafusion-examples/examples/expr_api.rs

Lines changed: 5 additions & 6 deletions
@@ -30,7 +30,6 @@ use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
 use datafusion::prelude::*;
 use datafusion_common::logical_type::field::LogicalField;
 use datafusion_common::logical_type::schema::LogicalSchema;
-use datafusion_common::logical_type::LogicalType;
 use datafusion_common::{ScalarValue, ToDFSchema};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::expr::BinaryExpr;
@@ -214,7 +213,7 @@ fn simplify_demo() -> Result<()> {
     // String --> Date simplification
     // `cast('2020-09-01' as date)` --> 18500
     assert_eq!(
-        simplifier.simplify(lit("2020-09-01").cast_to(&LogicalType::Date32, &schema)?)?,
+        simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32.into(), &schema)?)?,
         lit(ScalarValue::Date32(Some(18506)))
     );
 
@@ -296,14 +295,14 @@ fn expression_type_demo() -> Result<()> {
     // a schema. In this case we create a schema where the column `c` is of
     // type Utf8 (a String / VARCHAR)
     let schema = DFSchema::from_unqualified_fields(
-        vec![LogicalField::new("c", LogicalType::Utf8, true)].into(),
+        vec![LogicalField::new("c", DataType::Utf8, true)].into(),
         HashMap::new(),
     )?;
     assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap()));
 
     // Using a schema where the column `foo` is of type Int32
     let schema = DFSchema::from_unqualified_fields(
-        vec![LogicalField::new("c", LogicalType::Int32, true)].into(),
+        vec![LogicalField::new("c", DataType::Int32, true)].into(),
         HashMap::new(),
     )?;
     assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap()));
@@ -313,8 +312,8 @@ fn expression_type_demo() -> Result<()> {
     let expr = col("c1") + col("c2");
     let schema = DFSchema::from_unqualified_fields(
         vec![
-            LogicalField::new("c1", LogicalType::Int32, true),
-            LogicalField::new("c2", LogicalType::Float32, true),
+            LogicalField::new("c1", DataType::Int32, true),
+            LogicalField::new("c2", DataType::Float32, true),
         ]
         .into(),
         HashMap::new(),
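As the expr_api.rs changes show, fields and casts are now written in terms of Arrow's `DataType`, with conversion into the logical type system done via `.into()`. Below is a short sketch of the same pattern in isolation, assuming `LogicalField::new` accepts an Arrow `DataType` and `DFSchema::from_unqualified_fields` takes `LogicalFields`, as updated in this commit; the column names are illustrative.

use std::collections::HashMap;
use arrow::datatypes::DataType;
use datafusion_common::logical_type::field::LogicalField;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, ExprSchemable};

fn type_of_sum() -> Result<()> {
    // Build a two-column logical schema directly from Arrow DataTypes.
    let schema = DFSchema::from_unqualified_fields(
        vec![
            LogicalField::new("a", DataType::Int32, true),
            LogicalField::new("b", DataType::Float64, true),
        ]
        .into(),
        HashMap::new(),
    )?;

    // get_type reports the coerced result type of `a + b` under this schema.
    let expr = col("a") + col("b");
    println!("{}", expr.get_type(&schema)?);
    Ok(())
}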

datafusion-examples/examples/function_factory.rs

Lines changed: 4 additions & 3 deletions
@@ -22,7 +22,7 @@ use datafusion::error::Result;
 use datafusion::execution::context::{
     FunctionFactory, RegisterFunction, SessionContext, SessionState,
 };
-use datafusion_common::logical_type::extension::ExtensionType;
+use datafusion_common::logical_type::ExtensionType;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{exec_err, internal_err, DataFusionError};
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
@@ -218,13 +218,14 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
             return_type: definition
                 .return_type
                 .expect("Return type has to be defined!")
-                .physical_type(),
+                .physical()
+                .clone(),
             signature: Signature::exact(
                 definition
                     .args
                     .unwrap_or_default()
                     .into_iter()
-                    .map(|a| a.data_type.physical_type())
+                    .map(|a| a.data_type.physical().clone())
                     .collect(),
                 definition
                     .params
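The `ExtensionType` trait now lives at the root of the `logical_type` module, and `physical()` replaces `physical_type()`, returning a borrowed Arrow `DataType` rather than an owned one, hence the added `.clone()` calls. A hedged sketch of the same conversion in isolation, assuming `TypeRelation` implements `ExtensionType` and can be built from a `DataType` via `.into()` as the rest of this commit suggests:

use arrow::datatypes::DataType;
use datafusion_common::logical_type::{ExtensionType, TypeRelation};

// Map logical argument types to the physical Arrow types expected by an
// exact Signature (assumption: physical() borrows the underlying DataType).
fn to_physical_args(logical_args: &[TypeRelation]) -> Vec<DataType> {
    logical_args
        .iter()
        .map(|t| t.physical().clone())
        .collect()
}

fn main() {
    // Assumption: `From<DataType> for TypeRelation` exists, as implied by
    // the `.into()` conversions elsewhere in this commit.
    let args: Vec<TypeRelation> = vec![DataType::Utf8.into(), DataType::Int64.into()];
    assert_eq!(
        to_physical_args(&args),
        vec![DataType::Utf8, DataType::Int64]
    );
}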
(new file)

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+fn main() {
+
+}

datafusion-examples/examples/parquet_index.rs

Lines changed: 5 additions & 3 deletions
@@ -54,6 +54,7 @@ use std::sync::{
 };
 use tempfile::TempDir;
 use url::Url;
+use datafusion_common::logical_type::schema::LogicalSchemaRef;
 
 /// This example demonstrates building a secondary index over multiple Parquet
 /// files and using that index during query to skip ("prune") files that do not
@@ -212,8 +213,8 @@ impl TableProvider for IndexTableProvider {
         self
     }
 
-    fn schema(&self) -> SchemaRef {
-        self.index.schema().clone()
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.index.schema().into())
     }
 
     fn table_type(&self) -> TableType {
@@ -243,7 +244,8 @@ impl TableProvider for IndexTableProvider {
         let files = self.index.get_files(predicate.clone())?;
 
         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
+        let schema = SchemaRef::new(self.schema().as_ref().clone().into());
+        let mut file_scan_config = FileScanConfig::new(object_store_url, schema)
             .with_projection(projection.cloned())
             .with_limit(limit);

datafusion-examples/examples/rewrite_expr.rs

Lines changed: 5 additions & 4 deletions
@@ -17,7 +17,7 @@
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::logical_type::LogicalType;
+use datafusion_common::logical_type::TypeRelation;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::{
@@ -32,6 +32,7 @@ use datafusion_sql::sqlparser::parser::Parser;
 use datafusion_sql::TableReference;
 use std::any::Any;
 use std::sync::Arc;
+use datafusion_common::logical_type::schema::LogicalSchemaRef;
 
 pub fn main() -> Result<()> {
     // produce a logical plan using the datafusion-sql crate
@@ -212,7 +213,7 @@ impl ContextProvider for MyContextProvider {
         None
     }
 
-    fn get_variable_type(&self, _variable_names: &[String]) -> Option<LogicalType> {
+    fn get_variable_type(&self, _variable_names: &[String]) -> Option<TypeRelation> {
         None
     }
 
@@ -246,7 +247,7 @@ impl TableSource for MyTableSource {
         self
     }
 
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.schema.as_ref().clone().into())
     }
 }

datafusion-examples/examples/simple_udtf.rs

Lines changed: 4 additions & 4 deletions
@@ -35,7 +35,7 @@ use std::fs::File;
 use std::io::Seek;
 use std::path::Path;
 use std::sync::Arc;
-
+use datafusion_common::logical_type::schema::LogicalSchemaRef;
 // To define your own table function, you only need to do the following 3 things:
 // 1. Implement your own [`TableProvider`]
 // 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
@@ -85,8 +85,8 @@ impl TableProvider for LocalCsvTable {
         self
     }
 
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> LogicalSchemaRef {
+        LogicalSchemaRef::new(self.schema.clone().into())
     }
 
     fn table_type(&self) -> TableType {
@@ -121,7 +121,7 @@ impl TableProvider for LocalCsvTable {
         };
         Ok(Arc::new(MemoryExec::try_new(
             &[batches],
-            TableProvider::schema(self),
+            self.schema.clone(),
             projection.cloned(),
         )?))
     }

datafusion/common/src/dfschema.rs

Lines changed: 14 additions & 15 deletions
@@ -31,10 +31,9 @@ use crate::{
 
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
-use crate::logical_type::extension::ExtensionType;
 use crate::logical_type::field::{LogicalField, LogicalFieldRef};
 use crate::logical_type::fields::LogicalFields;
-use crate::logical_type::LogicalType;
+use crate::logical_type::{TypeRelation, ExtensionType};
 use crate::logical_type::schema::{LogicalSchema, LogicalSchemaBuilder, LogicalSchemaRef};
 
 /// A reference-counted reference to a [DFSchema].
@@ -165,7 +164,7 @@ impl DFSchema {
     /// Create a new `DFSchema` from a list of Arrow [Field]s
     #[allow(deprecated)]
     pub fn from_unqualified_fields(
-        fields: Fields,
+        fields: LogicalFields,
         metadata: HashMap<String, String>,
     ) -> Result<Self> {
         Self::from_unqualifed_fields(fields, metadata)
@@ -675,15 +674,15 @@ impl DFSchema {
         self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
             q1 == q2
                 && f1.name() == f2.name()
-                && Self::datatype_is_semantically_equal(&f1.data_type().physical_type(), &f2.data_type().physical_type())
+                && Self::datatype_is_semantically_equal(&f1.data_type().physical(), &f2.data_type().physical())
         })
     }
 
     /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
     /// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
     /// equal to a plain V type, but not semantically equal. Dictionary<K1, V1> is also
    /// logically equal to Dictionary<K2, V1>.
-    pub fn datatype_is_logically_equal(dt1: &LogicalType, dt2: &LogicalType) -> bool {
+    pub fn datatype_is_logically_equal(dt1: &TypeRelation, dt2: &TypeRelation) -> bool {
        dt1 == dt2
    }
 
@@ -916,13 +915,13 @@ pub trait ExprSchema: std::fmt::Debug {
    fn nullable(&self, col: &Column) -> Result<bool>;
 
     /// What is the datatype of this column?
-    fn data_type(&self, col: &Column) -> Result<&LogicalType>;
+    fn data_type(&self, col: &Column) -> Result<&TypeRelation>;
 
     /// Returns the column's optional metadata.
     fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;
 
     /// Return the coulmn's datatype and nullability
-    fn data_type_and_nullable(&self, col: &Column) -> Result<(&LogicalType, bool)>;
+    fn data_type_and_nullable(&self, col: &Column) -> Result<(&TypeRelation, bool)>;
 }
 
 // Implement `ExprSchema` for `Arc<DFSchema>`
@@ -931,15 +930,15 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
         self.as_ref().nullable(col)
     }
 
-    fn data_type(&self, col: &Column) -> Result<&LogicalType> {
+    fn data_type(&self, col: &Column) -> Result<&TypeRelation> {
         self.as_ref().data_type(col)
     }
 
     fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
         ExprSchema::metadata(self.as_ref(), col)
     }
 
-    fn data_type_and_nullable(&self, col: &Column) -> Result<(&LogicalType, bool)> {
+    fn data_type_and_nullable(&self, col: &Column) -> Result<(&TypeRelation, bool)> {
         self.as_ref().data_type_and_nullable(col)
     }
 }
@@ -949,15 +948,15 @@ impl ExprSchema for DFSchema {
         Ok(self.field_from_column(col)?.is_nullable())
     }
 
-    fn data_type(&self, col: &Column) -> Result<&LogicalType> {
+    fn data_type(&self, col: &Column) -> Result<&TypeRelation> {
         Ok(self.field_from_column(col)?.data_type())
     }
 
     fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
         Ok(self.field_from_column(col)?.metadata())
     }
 
-    fn data_type_and_nullable(&self, col: &Column) -> Result<(&LogicalType, bool)> {
+    fn data_type_and_nullable(&self, col: &Column) -> Result<(&TypeRelation, bool)> {
         let field = self.field_from_column(col)?;
         Ok((field.data_type(), field.is_nullable()))
     }
@@ -1080,8 +1079,8 @@ mod tests {
         let schema = DFSchema::from_field_specific_qualified_schema(
             vec![Some("t1".into()), None],
             &Arc::new(LogicalSchema::new(vec![
-                LogicalField::new("c0", LogicalType::Boolean, true),
-                LogicalField::new("c1", LogicalType::Boolean, true),
+                LogicalField::new("c0", DataType::Boolean, true),
+                LogicalField::new("c1", DataType::Boolean, true),
             ])),
         )?;
         assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
@@ -1333,8 +1332,8 @@ mod tests {
 
     fn test_schema_2() -> LogicalSchema {
         LogicalSchema::new(vec![
-            LogicalField::new("c100", LogicalType::Boolean, true),
-            LogicalField::new("c101", LogicalType::Boolean, true),
+            LogicalField::new("c100", DataType::Boolean, true),
+            LogicalField::new("c101", DataType::Boolean, true),
         ])
    }
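With these changes, schema lookups on the logical plan side surface a `TypeRelation` rather than a bare Arrow `DataType`, and the Arrow type remains reachable through `ExtensionType::physical()`. A short sketch of how a caller might use the updated `ExprSchema` API, assuming `LogicalField::new` accepts an Arrow `DataType` as elsewhere in this commit; the column name is illustrative.

use std::collections::HashMap;
use arrow::datatypes::DataType;
use datafusion_common::logical_type::field::LogicalField;
use datafusion_common::logical_type::ExtensionType;
use datafusion_common::{Column, DFSchema, ExprSchema, Result};

fn lookup_demo() -> Result<()> {
    let schema = DFSchema::from_unqualified_fields(
        vec![LogicalField::new("price", DataType::Float64, true)].into(),
        HashMap::new(),
    )?;

    let col = Column::from_name("price");
    // data_type now returns the logical TypeRelation for the column...
    let logical = schema.data_type(&col)?;
    // ...and the physical Arrow type is recovered from it via physical().
    assert_eq!(logical.physical().clone(), DataType::Float64);
    assert!(schema.nullable(&col)?);
    Ok(())
}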
