Skip to content

Commit 5427f4b

Browse files
committed
draft: add logical types and support extension types
1 parent c80da91 commit 5427f4b

File tree

101 files changed

+2382
-1327
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+2382
-1327
lines changed

datafusion-examples/examples/dataframe_subquery.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow_schema::DataType;
18+
use datafusion_common::logical_type::LogicalType;
1919
use std::sync::Arc;
2020

2121
use datafusion::error::Result;
@@ -48,7 +48,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
4848
scalar_subquery(Arc::new(
4949
ctx.table("t2")
5050
.await?
51-
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
51+
.filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
5252
.aggregate(vec![], vec![avg(col("t2.c2"))])?
5353
.select(vec![avg(col("t2.c2"))])?
5454
.into_unoptimized_plan(),
@@ -91,7 +91,7 @@ async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
9191
.filter(exists(Arc::new(
9292
ctx.table("t2")
9393
.await?
94-
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
94+
.filter(out_ref_col(LogicalType::Utf8, "t1.c1").eq(col("t2.c1")))?
9595
.select(vec![col("t2.c2")])?
9696
.into_unoptimized_plan(),
9797
)))?

datafusion-examples/examples/expr_api.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ use datafusion::functions_aggregate::first_last::first_value_udaf;
2828
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
2929
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
3030
use datafusion::prelude::*;
31+
use datafusion_common::logical_type::field::LogicalField;
32+
use datafusion_common::logical_type::schema::LogicalSchema;
33+
use datafusion_common::logical_type::LogicalType;
3134
use datafusion_common::{ScalarValue, ToDFSchema};
3235
use datafusion_expr::execution_props::ExecutionProps;
3336
use datafusion_expr::expr::BinaryExpr;
@@ -156,7 +159,7 @@ fn simplify_demo() -> Result<()> {
156159
// However, DataFusion's simplification logic can do this for you
157160

158161
// you need to tell DataFusion the type of column "ts":
159-
let schema = Schema::new(vec![make_ts_field("ts")]).to_dfschema_ref()?;
162+
let schema = LogicalSchema::from(Schema::new(vec![make_ts_field("ts")])).to_dfschema_ref()?;
160163

161164
// And then build a simplifier
162165
// the ExecutionProps carries information needed to simplify
@@ -177,10 +180,10 @@ fn simplify_demo() -> Result<()> {
177180
);
178181

179182
// here are some other examples of what DataFusion is capable of
180-
let schema = Schema::new(vec![
183+
let schema = LogicalSchema::from(Schema::new(vec![
181184
make_field("i", DataType::Int64),
182185
make_field("b", DataType::Boolean),
183-
])
186+
]))
184187
.to_dfschema_ref()?;
185188
let context = SimplifyContext::new(&props).with_schema(schema.clone());
186189
let simplifier = ExprSimplifier::new(context);
@@ -211,7 +214,7 @@ fn simplify_demo() -> Result<()> {
211214
// String --> Date simplification
212215
// `cast('2020-09-01' as date)` --> 18500
213216
assert_eq!(
214-
simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
217+
simplifier.simplify(lit("2020-09-01").cast_to(&LogicalType::Date32, &schema)?)?,
215218
lit(ScalarValue::Date32(Some(18506)))
216219
);
217220

@@ -258,7 +261,7 @@ fn range_analysis_demo() -> Result<()> {
258261
let analysis_result = analyze(
259262
&physical_expr,
260263
AnalysisContext::new(boundaries),
261-
df_schema.as_ref(),
264+
&df_schema.into(),
262265
)?;
263266

264267
// The result of the analysis is a range, encoded as an `Interval`, for
@@ -293,14 +296,14 @@ fn expression_type_demo() -> Result<()> {
293296
// a schema. In this case we create a schema where the column `c` is of
294297
// type Utf8 (a String / VARCHAR)
295298
let schema = DFSchema::from_unqualified_fields(
296-
vec![Field::new("c", DataType::Utf8, true)].into(),
299+
vec![LogicalField::new("c", LogicalType::Utf8, true)].into(),
297300
HashMap::new(),
298301
)?;
299302
assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap()));
300303

301304
// Using a schema where the column `c` is of type Int32
302305
let schema = DFSchema::from_unqualified_fields(
303-
vec![Field::new("c", DataType::Int32, true)].into(),
306+
vec![LogicalField::new("c", LogicalType::Int32, true)].into(),
304307
HashMap::new(),
305308
)?;
306309
assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap()));
@@ -310,8 +313,8 @@ fn expression_type_demo() -> Result<()> {
310313
let expr = col("c1") + col("c2");
311314
let schema = DFSchema::from_unqualified_fields(
312315
vec![
313-
Field::new("c1", DataType::Int32, true),
314-
Field::new("c2", DataType::Float32, true),
316+
LogicalField::new("c1", LogicalType::Int32, true),
317+
LogicalField::new("c2", LogicalType::Float32, true),
315318
]
316319
.into(),
317320
HashMap::new(),

datafusion-examples/examples/function_factory.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use datafusion::error::Result;
2222
use datafusion::execution::context::{
2323
FunctionFactory, RegisterFunction, SessionContext, SessionState,
2424
};
25+
use datafusion_common::logical_type::extension::ExtensionType;
2526
use datafusion_common::tree_node::{Transformed, TreeNode};
2627
use datafusion_common::{exec_err, internal_err, DataFusionError};
2728
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
@@ -216,13 +217,14 @@ impl TryFrom<CreateFunction> for ScalarFunctionWrapper {
216217
.expect("Expression has to be defined!"),
217218
return_type: definition
218219
.return_type
219-
.expect("Return type has to be defined!"),
220+
.expect("Return type has to be defined!")
221+
.physical_type(),
220222
signature: Signature::exact(
221223
definition
222224
.args
223225
.unwrap_or_default()
224226
.into_iter()
225-
.map(|a| a.data_type)
227+
.map(|a| a.data_type.physical_type())
226228
.collect(),
227229
definition
228230
.params

datafusion-examples/examples/rewrite_expr.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1919
use datafusion_common::config::ConfigOptions;
20+
use datafusion_common::logical_type::LogicalType;
2021
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
2122
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
2223
use datafusion_expr::{
@@ -211,7 +212,7 @@ impl ContextProvider for MyContextProvider {
211212
None
212213
}
213214

214-
fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
215+
fn get_variable_type(&self, _variable_names: &[String]) -> Option<LogicalType> {
215216
None
216217
}
217218

datafusion/common/src/column.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
//! Column
1919
20-
use arrow_schema::{Field, FieldRef};
21-
2220
use crate::error::_schema_err;
2321
use crate::utils::{parse_identifiers_normalized, quote_identifier};
2422
use crate::{DFSchema, DataFusionError, Result, SchemaError, TableReference};
@@ -27,6 +25,7 @@ use std::convert::Infallible;
2725
use std::fmt;
2826
use std::str::FromStr;
2927
use std::sync::Arc;
28+
use crate::logical_type::field::{LogicalField, LogicalFieldRef};
3029

3130
/// A named reference to a qualified field in a schema.
3231
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
@@ -349,15 +348,15 @@ impl From<String> for Column {
349348
}
350349

351350
/// Create a column from an optional table qualifier and the field's name
352-
impl From<(Option<&TableReference>, &Field)> for Column {
353-
fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
351+
impl From<(Option<&TableReference>, &LogicalField)> for Column {
352+
fn from((relation, field): (Option<&TableReference>, &LogicalField)) -> Self {
354353
Self::new(relation.cloned(), field.name())
355354
}
356355
}
357356

358357
/// Create a column from an optional table qualifier and the referenced field's name
359-
impl From<(Option<&TableReference>, &FieldRef)> for Column {
360-
fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
358+
impl From<(Option<&TableReference>, &LogicalFieldRef)> for Column {
359+
fn from((relation, field): (Option<&TableReference>, &LogicalFieldRef)) -> Self {
361360
Self::new(relation.cloned(), field.name())
362361
}
363362
}
@@ -380,7 +379,7 @@ impl fmt::Display for Column {
380379
mod tests {
381380
use super::*;
382381
use arrow::datatypes::DataType;
383-
use arrow_schema::SchemaBuilder;
382+
use arrow_schema::{Field, SchemaBuilder};
384383

385384
fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
386385
let mut schema_builder = SchemaBuilder::new();
@@ -389,7 +388,7 @@ mod tests {
389388
.iter()
390389
.map(|f| Field::new(*f, DataType::Boolean, true)),
391390
);
392-
let schema = Arc::new(schema_builder.finish());
391+
let schema = Arc::new(schema_builder.finish().into());
393392
DFSchema::try_from_qualified_schema(qualifier, &schema)
394393
}
395394

0 commit comments

Comments
 (0)