Commit 9802684

timsaucer authored and Nirnay Roy committed
Add Extension Type / Metadata support for Scalar UDFs (apache#15646)
* Add plumbing to pass metadata around for physical expressions
* Add argument metadata to the scalar argument struct
* Since every caller immediately clones this, return an owned version of the metadata for simplicity
* Cargo fmt
* Benchmarks required args_metadata in tests
* Fix clippy warnings
* Switch to passing Field around instead of metadata so we can handle extension types directly
* Switch return_type_from_args to return_field_from_args
* Update unit tests for the switch from data_type to field
* Resolve unit test issues
* Update after rebase on main
* GetFieldFunc should return the field it finds instead of creating a new one
* Get metadata from scalar functions
* Change expr_schema to use to_field primarily instead of individual calls for data type, nullability, and metadata
* Scalar function arguments should now take the return field instead of the return data type
* Subqueries should simply take the field from below and not lose potential metadata
* Update comment
* Remove output_field now that we determine it via return_field_from_args
* Rename to_field to field_from_column to be more consistent with its usage and avoid the misconception that a conversion is happening
* Minor rearrangement of the explicit lifetimes in the struct definition
* Change physical expressions to output a Field, which requires many unit test updates, especially because the scalar arguments pass around borrowed values
* Rename output_field to return_field for consistency
* Update the DF48 migration guide for user-defined functions
* Whitespace
* Docstring correction
1 parent b85a0ee commit 9802684
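
The heart of the change is that scalar UDFs now describe their output as an Arrow Field (data type, nullability, and metadata) rather than a bare DataType, which is what lets extension-type metadata flow through planning. Below is a minimal sketch of the kind of return field a UDF can now hand back, using only arrow-schema APIs; the column name and the arrow.uuid extension are illustrative choices, not taken from the commit.

use std::collections::HashMap;

use arrow_schema::{DataType, Field};

// Build a return field that carries canonical extension-type metadata next to
// the physical type. Before this change a UDF could only report a DataType,
// so the extension name below had nowhere to live.
fn example_return_field() -> Field {
    Field::new("uuid_out", DataType::FixedSizeBinary(16), false).with_metadata(
        HashMap::from([(
            "ARROW:extension:name".to_string(),
            "arrow.uuid".to_string(),
        )]),
    )
}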

95 files changed: +2102 lines added, -1294 lines removed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

datafusion/common/src/dfschema.rs

Lines changed: 26 additions & 28 deletions
@@ -472,7 +472,7 @@ impl DFSchema {
         let matches = self.qualified_fields_with_unqualified_name(name);
         match matches.len() {
             0 => Err(unqualified_field_not_found(name, self)),
-            1 => Ok((matches[0].0, (matches[0].1))),
+            1 => Ok((matches[0].0, matches[0].1)),
             _ => {
                 // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
                 // Because name may generate from Alias/... . It means that it don't own qualifier.
@@ -515,14 +515,6 @@ impl DFSchema {
         Ok(self.field(idx))
     }
 
-    /// Find the field with the given qualified column
-    pub fn field_from_column(&self, column: &Column) -> Result<&Field> {
-        match &column.relation {
-            Some(r) => self.field_with_qualified_name(r, &column.name),
-            None => self.field_with_unqualified_name(&column.name),
-        }
-    }
-
     /// Find the field with the given qualified column
     pub fn qualified_field_from_column(
         &self,
@@ -969,16 +961,28 @@ impl Display for DFSchema {
 /// widely used in the DataFusion codebase.
 pub trait ExprSchema: std::fmt::Debug {
     /// Is this column reference nullable?
-    fn nullable(&self, col: &Column) -> Result<bool>;
+    fn nullable(&self, col: &Column) -> Result<bool> {
+        Ok(self.field_from_column(col)?.is_nullable())
+    }
 
     /// What is the datatype of this column?
-    fn data_type(&self, col: &Column) -> Result<&DataType>;
+    fn data_type(&self, col: &Column) -> Result<&DataType> {
+        Ok(self.field_from_column(col)?.data_type())
+    }
 
     /// Returns the column's optional metadata.
-    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;
+    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
+        Ok(self.field_from_column(col)?.metadata())
+    }
 
     /// Return the column's datatype and nullability
-    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)>;
+    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
+        let field = self.field_from_column(col)?;
+        Ok((field.data_type(), field.is_nullable()))
+    }
+
+    // Return the column's field
+    fn field_from_column(&self, col: &Column) -> Result<&Field>;
 }
 
 // Implement `ExprSchema` for `Arc<DFSchema>`
@@ -998,24 +1002,18 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
     fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
         self.as_ref().data_type_and_nullable(col)
     }
-}
-
-impl ExprSchema for DFSchema {
-    fn nullable(&self, col: &Column) -> Result<bool> {
-        Ok(self.field_from_column(col)?.is_nullable())
-    }
-
-    fn data_type(&self, col: &Column) -> Result<&DataType> {
-        Ok(self.field_from_column(col)?.data_type())
-    }
 
-    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
-        Ok(self.field_from_column(col)?.metadata())
+    fn field_from_column(&self, col: &Column) -> Result<&Field> {
+        self.as_ref().field_from_column(col)
     }
+}
 
-    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
-        let field = self.field_from_column(col)?;
-        Ok((field.data_type(), field.is_nullable()))
+impl ExprSchema for DFSchema {
+    fn field_from_column(&self, col: &Column) -> Result<&Field> {
+        match &col.relation {
+            Some(r) => self.field_with_qualified_name(r, &col.name),
+            None => self.field_with_unqualified_name(&col.name),
+        }
     }
 }

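With the ExprSchema refactor above, an implementor only has to supply field_from_column; nullable, data_type, metadata, and data_type_and_nullable all fall out of the new default method bodies. A minimal sketch of a custom implementor under that assumption follows; the SingleFieldSchema type and its error message are invented for illustration, and the imports assume the usual datafusion_common re-exports.

use arrow_schema::Field;
use datafusion_common::{plan_err, Column, ExprSchema, Result};

// A toy schema exposing exactly one field. Because the trait now provides
// default method bodies, only `field_from_column` needs to be written; the
// nullability, data type, and metadata lookups are derived from the returned
// Field.
#[derive(Debug)]
struct SingleFieldSchema {
    field: Field,
}

impl ExprSchema for SingleFieldSchema {
    fn field_from_column(&self, col: &Column) -> Result<&Field> {
        if self.field.name() == col.name.as_str() {
            Ok(&self.field)
        } else {
            plan_err!("No field named {}", col.name)
        }
    }
}
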
datafusion/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -95,7 +95,7 @@ extended_tests = []
 [dependencies]
 arrow = { workspace = true }
 arrow-ipc = { workspace = true }
-arrow-schema = { workspace = true }
+arrow-schema = { workspace = true, features = ["canonical_extension_types"] }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 bzip2 = { version = "0.5.2", optional = true }

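The canonical_extension_types feature enables arrow-schema's typed helpers for the canonical extension types; the underlying storage is still the field's key/value metadata map, so consumers can also inspect it directly. A small sketch of that plain-metadata view; the helper function is ours, not part of the commit.

use arrow_schema::Field;

// Read the canonical extension-type name, if any, straight from the field's
// metadata. The `canonical_extension_types` feature enabled above additionally
// offers typed wrappers for the canonical types, but either way the data lives
// under this standard metadata key.
fn extension_name(field: &Field) -> Option<&str> {
    field.metadata().get("ARROW:extension:name").map(String::as_str)
}
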
datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 4 additions & 4 deletions
@@ -128,7 +128,7 @@ fn test_update_matching_exprs() -> Result<()> {
                     Arc::new(Column::new("b", 1)),
                 )),
             ],
-            DataType::Int32,
+            Field::new("f", DataType::Int32, true),
         )),
         Arc::new(CaseExpr::try_new(
             Some(Arc::new(Column::new("d", 2))),
@@ -193,7 +193,7 @@ fn test_update_matching_exprs() -> Result<()> {
                     Arc::new(Column::new("b", 1)),
                 )),
             ],
-            DataType::Int32,
+            Field::new("f", DataType::Int32, true),
         )),
         Arc::new(CaseExpr::try_new(
             Some(Arc::new(Column::new("d", 3))),
@@ -261,7 +261,7 @@ fn test_update_projected_exprs() -> Result<()> {
                     Arc::new(Column::new("b", 1)),
                 )),
             ],
-            DataType::Int32,
+            Field::new("f", DataType::Int32, true),
         )),
         Arc::new(CaseExpr::try_new(
            Some(Arc::new(Column::new("d", 2))),
@@ -326,7 +326,7 @@ fn test_update_projected_exprs() -> Result<()> {
                     Arc::new(Column::new("b_new", 1)),
                 )),
             ],
-            DataType::Int32,
+            Field::new("f", DataType::Int32, true),
         )),
         Arc::new(CaseExpr::try_new(
             Some(Arc::new(Column::new("d_new", 3))),

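The test updates above are mechanical: wherever a bare return DataType used to be passed when constructing a ScalarFunctionExpr, a full Field is now required. A tiny helper in the spirit of that change is sketched below; the placeholder name "f" mirrors the tests, and real callers would pick a meaningful name and attach any extension metadata to the Field as well.

use arrow_schema::{DataType, Field};

// Wrap the old (data type, nullability) pair into the Field that physical
// scalar expressions now expect as their return description.
fn return_field(data_type: DataType, nullable: bool) -> Field {
    Field::new("f", data_type, nullable)
}
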
datafusion/core/tests/tpc-ds/49.sql

Lines changed: 1 addition & 1 deletion
@@ -110,7 +110,7 @@ select channel, item, return_ratio, return_rank, currency_rank from
 where
         sr.sr_return_amt > 10000
         and sts.ss_net_profit > 1
-        and sts.ss_net_paid > 0 
+        and sts.ss_net_paid > 0
         and sts.ss_quantity > 0
         and ss_sold_date_sk = d_date_sk
         and d_year = 2000
