From f68cdd235515d7a7b76b8fa6c980a3f14769404e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 14 Mar 2025 12:12:31 -0500 Subject: [PATCH 01/17] wip --- datafusion-testing | 2 +- .../datasource-parquet/src/row_filter.rs | 2 +- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/datasource/src/schema_adapter.rs | 232 ++++++++++++++---- 4 files changed, 189 insertions(+), 49 deletions(-) diff --git a/datafusion-testing b/datafusion-testing index 3462eaa78745..5b424aefd7f6 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit 3462eaa787459957e38df267a4a21f5bea605807 +Subproject commit 5b424aefd7f6bf198220c37f59d39dbb25b47695 diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 39fcecf37c6d..263413d090c9 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -684,7 +684,7 @@ mod test { let table_ref = Arc::new(table_schema.clone()); let schema_adapter = - DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref), table_ref); + DefaultSchemaAdapterFactory::default().create(Arc::clone(&table_ref), table_ref); let (schema_mapping, _) = schema_adapter .map_schema(&file_schema) .expect("creating schema mapping"); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 683d62a1df49..ca28a882acb0 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -473,7 +473,7 @@ impl FileSource for ParquetSource { let schema_adapter_factory = self .schema_adapter_factory .clone() - .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); + .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default())); let parquet_file_reader_factory = self.parquet_file_reader_factory.clone().unwrap_or_else(|| { diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index e3a4ea4918c1..f64df0bda52f 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -21,10 +21,12 @@ //! physical format into how they should be used by DataFusion. For instance, a schema //! can be stored external to a parquet file that maps parquet logical types to arrow types. -use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions}; +use arrow::array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions}; use arrow::compute::{can_cast_types, cast}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::plan_err; +use itertools::Itertools; +use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -111,6 +113,45 @@ pub trait SchemaMapper: Debug + Send + Sync { ) -> datafusion_common::Result; } +pub trait MissingColumnGeneratorFactory: Debug + Send + Sync { + /// Create a [`MissingColumnGenerator`] for the given `field` and `file_schema`. + /// Returns None if the column cannot be generated by this generator. + /// Otherwise, returns a [`MissingColumnGenerator`] that can generate the missing column. + fn create( + &self, + field: &Field, + file_schema: &Schema, + ) -> Option>; +} + +pub trait MissingColumnGenerator: Debug + Send + Sync { + /// Generate a missing column for the given `field` from the provided `batch`. 
+ /// When this method is called `batch` will contain all of the columns declared as dependencies in `dependencies`. + /// If the column cannot be generated, this method should return an error. + /// Otherwise, it should return the generated column as an `ArrayRef`. + /// No casting or post processing is done by this method, so the generated column should match the data type + /// of the `field` it is being generated for. + /// The order of + fn generate(&self, batch: RecordBatch) -> datafusion_common::Result; + + /// Returns a list of column names that this generator depends on to generate the missing column. + /// This is used when creating the `RecordBatch` to ensure that all dependencies are present before calling `generate`. + /// The dependencies do not need to be declared in any particular order. + fn dependencies(&self) -> Vec; +} + +pub type ColumnGeneratorFactories = Vec>; + +#[derive(Debug)] +enum FieldSource { + /// The field is present in the (projected) file schema at the given index + Table(usize), + /// The field is generated by the given generator + Generated(Arc), + /// The field will be populated with null + Null, +} + /// Default [`SchemaAdapterFactory`] for mapping schemas. /// /// This can be used to adapt file-level record batches to a table schema and @@ -197,7 +238,13 @@ pub trait SchemaMapper: Debug + Send + Sync { /// assert_eq!(mapped_batch, expected_batch); /// ``` #[derive(Clone, Debug, Default)] -pub struct DefaultSchemaAdapterFactory; +pub struct DefaultSchemaAdapterFactory { + /// Optional generator for missing columns + /// + /// This is used to fill in missing columns with a default value other than null. + /// If this is `None`, then missing columns will be filled with nulls. + column_generators: ColumnGeneratorFactories, +} impl DefaultSchemaAdapterFactory { /// Create a new factory for mapping batches from a file schema to a table @@ -207,7 +254,23 @@ impl DefaultSchemaAdapterFactory { /// the same schema for both the projected table schema and the table /// schema. pub fn from_schema(table_schema: SchemaRef) -> Box { - Self.create(Arc::clone(&table_schema), table_schema) + Self { column_generators: vec![] }.create(Arc::clone(&table_schema), table_schema) + } + + pub fn with_column_generator( + self, + generator: Arc, + ) -> Self { + let mut generators = self.column_generators; + generators.push(generator); + Self { column_generators: generators } + } + + pub fn with_column_generators( + self, + generators: ColumnGeneratorFactories, + ) -> Self { + Self { column_generators: generators } } } @@ -220,6 +283,7 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { Box::new(DefaultSchemaAdapter { projected_table_schema, table_schema, + column_generators: self.column_generators.clone(), }) } } @@ -237,6 +301,8 @@ pub(crate) struct DefaultSchemaAdapter { /// which may refer to columns that are not referred to anywhere /// else in the plan. 
table_schema: SchemaRef, + /// The column generators to use when a column is missing + column_generators: ColumnGeneratorFactories, } impl SchemaAdapter for DefaultSchemaAdapter { @@ -258,45 +324,119 @@ impl SchemaAdapter for DefaultSchemaAdapter { /// /// Returns a [`SchemaMapping`] that can be applied to the output batch /// along with an ordered list of columns to project from the file - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)> { + fn map_schema(&self, file_schema: &Schema) -> datafusion_common::Result<(Arc, Vec)> { + // Projection is the indexes into the file schema that we need to read + // Note that readers will NOT respect the order of the columns in projection let mut projection = Vec::with_capacity(file_schema.fields().len()); - let mut field_mappings = vec![None; self.projected_table_schema.fields().len()]; - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if let Some((table_idx, table_field)) = - self.projected_table_schema.fields().find(file_field.name()) - { - match can_cast_types(file_field.data_type(), table_field.data_type()) { - true => { - field_mappings[table_idx] = Some(projection.len()); - projection.push(file_idx); + + // Additions to the projection which will be needed to generate missing columns + // TODO can we refactor this away? + let mut dependency_projection = Vec::with_capacity(file_schema.fields().len()); + + let field_sources: Vec<_> = self + .projected_table_schema + .fields() + .iter() + .map(|table_field| { + if let Some((file_idx, file_field)) = file_schema.fields.find(table_field.name()) { + match can_cast_types(file_field.data_type(), table_field.data_type()) { + true => { + projection.push(file_idx); + } + false => { + return plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) + } } - false => { - return plan_err!( - "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", - file_field.name(), - file_field.data_type(), - table_field.data_type() - ) + let projected_idx = projection.len(); + Ok(FieldSource::Table(projected_idx)) + } else if let Some((generator, dependencies)) = self.column_generators.iter().find_map(|factory| { + if let Some(generator) = factory.create(table_field, file_schema) { + let dependencies = generator.dependencies(); + dependencies.into_iter().map(|dep| { + file_schema.index_of(&dep).map_err(|_| { + plan_err!( + "Generator for column {} depends on column {} which is not present in the file schema, columns present in the file schema are: {:?}", + table_field.name(), dep, file_schema.fields().iter().map(|f| f.name()).collect::>() + ) + }) + }) + .collect::, _>>()?; + Ok(Some((generator, dependencies))) + } else { + Ok(None) } + }) { + dependency_projection.extend(dependencies); + Ok(FieldSource::Generated(generator)) + } else if table_field.is_nullable() { + Ok(FieldSource::Null) + } else { + plan_err!( + "Column {} is missing from the file schema, cannot be generated, and is non-nullable", + table_field.name() + ) } + }) + .try_collect()?; + + for dep in dependency_projection { + if !projection.contains(&dep) { + projection.push(dep); } } + let (sorted_projection, field_sources) = sort_projections_and_sources(&projection, field_sources); + Ok(( Arc::new(SchemaMapping { - projected_table_schema: Arc::clone(&self.projected_table_schema), - field_mappings, - table_schema: Arc::clone(&self.table_schema), + 
projected_table_schema: self.projected_table_schema.clone(), + field_sources, + table_schema: self.table_schema.clone(), }), - projection, + sorted_projection, )) } } + +/// The parquet reader needs projections to be sorted (it does not respect the order of the columns in the projection, only the values) +/// This function adjusts the projections and mappings so that they all reference sorted projections +fn sort_projections_and_sources( + projection: &[usize], + mut field_sources: Vec, +) -> (Vec, Vec) { + // Sort projection and create a mapping from old to new positions + let mut sorted_projection = projection.to_vec(); + sorted_projection.sort_unstable(); + + // Create a mapping from old projection values to their new positions + let mut new_position_map = HashMap::new(); + for (new_pos, &proj_val) in sorted_projection.iter().enumerate() { + new_position_map.insert(proj_val, new_pos); + } + + // Create a mapping from old positions to new positions in the projected schema + let mut position_mapping = vec![0; projection.len()]; + for (old_pos, &proj_val) in projection.iter().enumerate() { + position_mapping[old_pos] = *new_position_map.get(&proj_val).expect("should always be present"); + } + + // Update field_sources to reflect the new positions + for source in &mut field_sources { + if let FieldSource::Table(pos) = source { + *pos = position_mapping[*pos]; + } + } + + (sorted_projection, field_sources) +} + + /// The SchemaMapping struct holds a mapping from the file schema to the table /// schema and any necessary type conversions. /// @@ -326,12 +466,9 @@ pub struct SchemaMapping { /// The schema of the table. This is the expected schema after conversion /// and it should match the schema of the query result. projected_table_schema: SchemaRef, - /// Mapping from field index in `projected_table_schema` to index in - /// projected file_schema. - /// - /// They are Options instead of just plain `usize`s because the table could - /// have fields that don't exist in the file. - field_mappings: Vec>, + /// A mapping from the fields in `projected_table_schema`` to the way to materialize + /// them from the projected file schema. + field_sources: Vec, /// The entire table schema, as opposed to the projected_table_schema (which /// only contains the columns that we are projecting out of this query). /// This contains all fields in the table, regardless of if they will be @@ -348,6 +485,8 @@ impl SchemaMapper for SchemaMapping { let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); + debug_assert_eq!(self.projected_table_schema.fields().len(), self.field_sources.len()); + let cols = self .projected_table_schema // go through each field in the projected schema @@ -355,24 +494,25 @@ impl SchemaMapper for SchemaMapping { .iter() // and zip it with the index that maps fields from the projected table schema to the // projected file schema in `batch` - .zip(&self.field_mappings) + .zip(&self.field_sources) // and for each one... - .map(|(field, file_idx)| { - file_idx.map_or_else( - // If this field only exists in the table, and not in the file, then we know - // that it's null, so just return that. 
- || Ok(new_null_array(field.data_type(), batch_rows)), - // However, if it does exist in both, then try to cast it to the correct output - // type - |batch_idx| cast(&batch_cols[batch_idx], field.data_type()), - ) + .map(|(field, source)| -> datafusion_common::Result<_> { + let column = match source { + FieldSource::Table(file_idx) => batch_cols[*file_idx].clone(), + FieldSource::Generated(generator) => generator.generate(batch.clone())?, + FieldSource::Null => { + debug_assert!(field.is_nullable()); + new_null_array(field.data_type(), batch_rows) + } + }; + Ok(cast(&column, field.data_type())?) }) .collect::, _>>()?; // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let schema = Arc::clone(&self.projected_table_schema); + let schema = self.projected_table_schema.clone(); let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } From 58057f5a9c374f0d8025b3de9e94e069c52e59ed Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 14 Mar 2025 12:32:19 -0500 Subject: [PATCH 02/17] wip --- .../core/src/datasource/physical_plan/mod.rs | 5 +- .../datasource-parquet/src/row_filter.rs | 4 +- datafusion/datasource/src/schema_adapter.rs | 144 ++++++++++-------- 3 files changed, 85 insertions(+), 68 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index cae04e5ee6b8..b4b1be03ef22 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -89,7 +89,7 @@ mod tests { Field::new("c3", DataType::Float64, true), ])); - let adapter = DefaultSchemaAdapterFactory + let adapter = DefaultSchemaAdapterFactory::default() .create(table_schema.clone(), table_schema.clone()); let file_schema = Schema::new(vec![ @@ -147,7 +147,8 @@ mod tests { let indices = vec![1, 2, 4]; let schema = SchemaRef::from(table_schema.project(&indices).unwrap()); - let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone()); + let adapter = + DefaultSchemaAdapterFactory::default().create(schema, table_schema.clone()); let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 263413d090c9..1c8940c9fe24 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -683,8 +683,8 @@ mod test { )]); let table_ref = Arc::new(table_schema.clone()); - let schema_adapter = - DefaultSchemaAdapterFactory::default().create(Arc::clone(&table_ref), table_ref); + let schema_adapter = DefaultSchemaAdapterFactory::default() + .create(Arc::clone(&table_ref), table_ref); let (schema_mapping, _) = schema_adapter .map_schema(&file_schema) .expect("creating schema mapping"); diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index f64df0bda52f..fb21a342c0a6 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -25,7 +25,6 @@ use arrow::array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions}; use arrow::compute::{can_cast_types, cast}; use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::plan_err; -use itertools::Itertools; use std::collections::HashMap; 
use std::fmt::Debug; use std::sync::Arc; @@ -131,7 +130,7 @@ pub trait MissingColumnGenerator: Debug + Send + Sync { /// Otherwise, it should return the generated column as an `ArrayRef`. /// No casting or post processing is done by this method, so the generated column should match the data type /// of the `field` it is being generated for. - /// The order of + /// The order of fn generate(&self, batch: RecordBatch) -> datafusion_common::Result; /// Returns a list of column names that this generator depends on to generate the missing column. @@ -140,7 +139,8 @@ pub trait MissingColumnGenerator: Debug + Send + Sync { fn dependencies(&self) -> Vec; } -pub type ColumnGeneratorFactories = Vec>; +pub type ColumnGeneratorFactories = + Vec>; #[derive(Debug)] enum FieldSource { @@ -254,7 +254,10 @@ impl DefaultSchemaAdapterFactory { /// the same schema for both the projected table schema and the table /// schema. pub fn from_schema(table_schema: SchemaRef) -> Box { - Self { column_generators: vec![] }.create(Arc::clone(&table_schema), table_schema) + Self { + column_generators: vec![], + } + .create(Arc::clone(&table_schema), table_schema) } pub fn with_column_generator( @@ -263,14 +266,15 @@ impl DefaultSchemaAdapterFactory { ) -> Self { let mut generators = self.column_generators; generators.push(generator); - Self { column_generators: generators } + Self { + column_generators: generators, + } } - pub fn with_column_generators( - self, - generators: ColumnGeneratorFactories, - ) -> Self { - Self { column_generators: generators } + pub fn with_column_generators(self, generators: ColumnGeneratorFactories) -> Self { + Self { + column_generators: generators, + } } } @@ -324,7 +328,10 @@ impl SchemaAdapter for DefaultSchemaAdapter { /// /// Returns a [`SchemaMapping`] that can be applied to the output batch /// along with an ordered list of columns to project from the file - fn map_schema(&self, file_schema: &Schema) -> datafusion_common::Result<(Arc, Vec)> { + fn map_schema( + &self, + file_schema: &Schema, + ) -> datafusion_common::Result<(Arc, Vec)> { // Projection is the indexes into the file schema that we need to read // Note that readers will NOT respect the order of the columns in projection let mut projection = Vec::with_capacity(file_schema.fields().len()); @@ -333,56 +340,59 @@ impl SchemaAdapter for DefaultSchemaAdapter { // TODO can we refactor this away? 
let mut dependency_projection = Vec::with_capacity(file_schema.fields().len()); - let field_sources: Vec<_> = self - .projected_table_schema - .fields() - .iter() - .map(|table_field| { - if let Some((file_idx, file_field)) = file_schema.fields.find(table_field.name()) { - match can_cast_types(file_field.data_type(), table_field.data_type()) { - true => { - projection.push(file_idx); - } - false => { - return plan_err!( - "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", - file_field.name(), - file_field.data_type(), - table_field.data_type() - ) - } + let mut field_sources = Vec::with_capacity(file_schema.fields().len()); + // For each field in the projected table schema, check if it exists in the file schema + // and if not, check if we can generate it using one of the column generators + // If we can't generate it, then we need to fill it with nulls + // If we can't fill it with nulls, then we have a schema error + for table_field in self.projected_table_schema.fields() { + if let Some((file_idx, file_field)) = file_schema.fields.find(table_field.name()) { + // If the field exists in the file schema, check if we can cast it to the table schema + match can_cast_types(file_field.data_type(), table_field.data_type()) { + true => { + projection.push(file_idx); } - let projected_idx = projection.len(); - Ok(FieldSource::Table(projected_idx)) - } else if let Some((generator, dependencies)) = self.column_generators.iter().find_map(|factory| { - if let Some(generator) = factory.create(table_field, file_schema) { - let dependencies = generator.dependencies(); - dependencies.into_iter().map(|dep| { - file_schema.index_of(&dep).map_err(|_| { - plan_err!( - "Generator for column {} depends on column {} which is not present in the file schema, columns present in the file schema are: {:?}", - table_field.name(), dep, file_schema.fields().iter().map(|f| f.name()).collect::>() - ) - }) - }) - .collect::, _>>()?; - Ok(Some((generator, dependencies))) - } else { - Ok(None) + false => { + return plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) } - }) { - dependency_projection.extend(dependencies); - Ok(FieldSource::Generated(generator)) - } else if table_field.is_nullable() { - Ok(FieldSource::Null) - } else { - plan_err!( - "Column {} is missing from the file schema, cannot be generated, and is non-nullable", - table_field.name() - ) } - }) - .try_collect()?; + let projected_idx = projection.len(); + field_sources.push(FieldSource::Table(projected_idx)); + } else if let Some(Ok(generator)) = self.column_generators.iter().find_map(|factory| { + factory.create(table_field, file_schema).map(|generator| { + let dependencies = generator.dependencies(); + // Check that all of the dependencies are present in the file schema, if not raise an error + for dep in &dependencies { + if let Ok(dep_idx) = file_schema.index_of(dep) { + dependency_projection.push(dep_idx); + } else { + return plan_err!( + "Generated column {} depends on column {} but column {} is not present in the file schema, columns present: {:?}", + table_field.name(), + dep, + dep, + file_schema.fields().iter().map(|f| f.name()).collect::>() + ); + } + } + Ok(generator) + }) + }) { + field_sources.push(FieldSource::Generated(generator)); + } else if table_field.is_nullable() { + field_sources.push(FieldSource::Null); + } else { + return plan_err!( + "Column {} is missing from the file schema, 
cannot be generated, and is non-nullable", + table_field.name() + ) + } + } for dep in dependency_projection { if !projection.contains(&dep) { @@ -390,7 +400,8 @@ impl SchemaAdapter for DefaultSchemaAdapter { } } - let (sorted_projection, field_sources) = sort_projections_and_sources(&projection, field_sources); + let (sorted_projection, field_sources) = + sort_projections_and_sources(&projection, field_sources); Ok(( Arc::new(SchemaMapping { @@ -403,7 +414,6 @@ impl SchemaAdapter for DefaultSchemaAdapter { } } - /// The parquet reader needs projections to be sorted (it does not respect the order of the columns in the projection, only the values) /// This function adjusts the projections and mappings so that they all reference sorted projections fn sort_projections_and_sources( @@ -423,7 +433,9 @@ fn sort_projections_and_sources( // Create a mapping from old positions to new positions in the projected schema let mut position_mapping = vec![0; projection.len()]; for (old_pos, &proj_val) in projection.iter().enumerate() { - position_mapping[old_pos] = *new_position_map.get(&proj_val).expect("should always be present"); + position_mapping[old_pos] = *new_position_map + .get(&proj_val) + .expect("should always be present"); } // Update field_sources to reflect the new positions @@ -436,7 +448,6 @@ fn sort_projections_and_sources( (sorted_projection, field_sources) } - /// The SchemaMapping struct holds a mapping from the file schema to the table /// schema and any necessary type conversions. /// @@ -485,7 +496,10 @@ impl SchemaMapper for SchemaMapping { let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); - debug_assert_eq!(self.projected_table_schema.fields().len(), self.field_sources.len()); + debug_assert_eq!( + self.projected_table_schema.fields().len(), + self.field_sources.len() + ); let cols = self .projected_table_schema @@ -499,7 +513,9 @@ impl SchemaMapper for SchemaMapping { .map(|(field, source)| -> datafusion_common::Result<_> { let column = match source { FieldSource::Table(file_idx) => batch_cols[*file_idx].clone(), - FieldSource::Generated(generator) => generator.generate(batch.clone())?, + FieldSource::Generated(generator) => { + generator.generate(batch.clone())? + } FieldSource::Null => { debug_assert!(field.is_nullable()); new_null_array(field.data_type(), batch_rows) From 6bfe3d4ca610e81301d2f76ab2c6b98c4f5ea9e5 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 14 Mar 2025 12:49:06 -0500 Subject: [PATCH 03/17] wip --- datafusion/datasource/src/schema_adapter.rs | 68 +++++++++++---------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index fb21a342c0a6..2d3b35ef78ea 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -25,6 +25,7 @@ use arrow::array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions}; use arrow::compute::{can_cast_types, cast}; use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::plan_err; +use itertools::Itertools; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -340,57 +341,55 @@ impl SchemaAdapter for DefaultSchemaAdapter { // TODO can we refactor this away? 
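        // Each field in the projected table schema resolves to a `FieldSource`: a column read
        // from the file, a column produced by a registered generator, or a null fill. Generator
        // dependencies are collected into `dependency_projection` so they can be appended to
        // `projection` afterwards and the combined projection sorted for the parquet reader.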
let mut dependency_projection = Vec::with_capacity(file_schema.fields().len()); - let mut field_sources = Vec::with_capacity(file_schema.fields().len()); - // For each field in the projected table schema, check if it exists in the file schema - // and if not, check if we can generate it using one of the column generators - // If we can't generate it, then we need to fill it with nulls - // If we can't fill it with nulls, then we have a schema error - for table_field in self.projected_table_schema.fields() { - if let Some((file_idx, file_field)) = file_schema.fields.find(table_field.name()) { + let mut field_sources = + Vec::with_capacity(self.projected_table_schema.fields().len()); + + for field in self.projected_table_schema.fields() { + if let Some((file_idx, file_field)) = file_schema.fields.find(field.name()) { // If the field exists in the file schema, check if we can cast it to the table schema - match can_cast_types(file_field.data_type(), table_field.data_type()) { + match can_cast_types(file_field.data_type(), field.data_type()) { true => { projection.push(file_idx); + field_sources.push(FieldSource::Table(projection.len() - 1)); } false => { return plan_err!( "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", file_field.name(), file_field.data_type(), - table_field.data_type() + field.data_type() ) } } - let projected_idx = projection.len(); - field_sources.push(FieldSource::Table(projected_idx)); - } else if let Some(Ok(generator)) = self.column_generators.iter().find_map(|factory| { - factory.create(table_field, file_schema).map(|generator| { - let dependencies = generator.dependencies(); - // Check that all of the dependencies are present in the file schema, if not raise an error - for dep in &dependencies { - if let Ok(dep_idx) = file_schema.index_of(dep) { - dependency_projection.push(dep_idx); - } else { - return plan_err!( - "Generated column {} depends on column {} but column {} is not present in the file schema, columns present: {:?}", - table_field.name(), - dep, - dep, - file_schema.fields().iter().map(|f| f.name()).collect::>() - ); - } + } else if let Some(generator) = self + .column_generators + .iter() + .find_map(|factory| factory.create(field, file_schema)) + { + let dependencies = generator.dependencies(); + let mut dependency_indices = Vec::with_capacity(dependencies.len()); + for dep in &dependencies { + if let Ok(dep_idx) = file_schema.index_of(dep) { + dependency_indices.push(dep_idx); + } else { + return plan_err!( + "Generated column {} depends on column {} but column {} is not present in the file schema, columns present: {:?}", + field.name(), + dep, + dep, + file_schema.fields().iter().map(|f| f.name()).collect::>() + ); } - Ok(generator) - }) - }) { + } + dependency_projection.extend(dependency_indices); field_sources.push(FieldSource::Generated(generator)); - } else if table_field.is_nullable() { + } else if field.is_nullable() { field_sources.push(FieldSource::Null); } else { return plan_err!( "Column {} is missing from the file schema, cannot be generated, and is non-nullable", - table_field.name() - ) + field.name() + ); } } @@ -403,6 +402,9 @@ impl SchemaAdapter for DefaultSchemaAdapter { let (sorted_projection, field_sources) = sort_projections_and_sources(&projection, field_sources); + debug_assert!(sorted_projection.is_sorted()); + debug_assert!(sorted_projection.iter().all_unique()); + Ok(( Arc::new(SchemaMapping { projected_table_schema: self.projected_table_schema.clone(), From 
6ecf25dfa5c1480d507cb2a136896b4e910544fa Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 14 Mar 2025 13:04:04 -0500 Subject: [PATCH 04/17] add tests --- .../core/src/datasource/physical_plan/mod.rs | 349 ++++++++++++++++++ 1 file changed, 349 insertions(+) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index b4b1be03ef22..a06337e6abc6 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -198,4 +198,353 @@ mod tests { assert_eq!(c4.value(1), 2.0_f32); assert_eq!(c4.value(2), 3.0_f32); } + + #[test] + fn schema_adapter_with_column_generator() { + use crate::datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, MissingColumnGenerator, MissingColumnGeneratorFactory, + }; + use arrow::array::{ArrayRef, Int32Array}; + use arrow::datatypes::Int32Type; + use std::fmt; + + // A simple generator that produces a constant value + #[derive(Debug)] + struct ConstantGenerator(Int32Array); + + impl MissingColumnGenerator for ConstantGenerator { + fn generate(&self, _batch: RecordBatch) -> datafusion_common::Result { + Ok(Arc::new(self.0.clone())) + } + + fn dependencies(&self) -> Vec { + vec![] + } + } + + // A factory that produces a constant generator for a specific field + #[derive(Debug)] + struct ConstantGeneratorFactory { + field_name: String, + value: i32, + } + + impl fmt::Display for ConstantGeneratorFactory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ConstantGeneratorFactory({}={})", self.field_name, self.value) + } + } + + impl MissingColumnGeneratorFactory for ConstantGeneratorFactory { + fn create( + &self, + field: &Field, + _file_schema: &Schema, + ) -> Option> { + if field.name() == &self.field_name && field.data_type() == &DataType::Int32 { + let array = Int32Array::from(vec![self.value; 3]); + Some(Arc::new(ConstantGenerator(array))) + } else { + None + } + } + } + + // A generator that depends on another column + #[derive(Debug)] + struct MultiplyByTwoGenerator { + dependency: String, + } + + impl MissingColumnGenerator for MultiplyByTwoGenerator { + fn generate(&self, batch: RecordBatch) -> datafusion_common::Result { + let idx = batch + .schema() + .index_of(&self.dependency) + .expect("dependency should exist"); + let col = batch.column(idx); + let col = col.as_primitive::(); + + let result: Int32Array = col.iter().map(|v| v.map(|x| x * 2)).collect(); + Ok(Arc::new(result)) + } + + fn dependencies(&self) -> Vec { + vec![self.dependency.clone()] + } + } + + #[derive(Debug)] + struct MultiplyByTwoGeneratorFactory; + + impl fmt::Display for MultiplyByTwoGeneratorFactory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "MultiplyByTwoGeneratorFactory") + } + } + + impl MissingColumnGeneratorFactory for MultiplyByTwoGeneratorFactory { + fn create( + &self, + field: &Field, + file_schema: &Schema, + ) -> Option> { + if field.name() == "doubled_id" && field.data_type() == &DataType::Int32 { + // Look for id column to use as our dependency + if file_schema.column_with_name("id").is_some() { + Some(Arc::new(MultiplyByTwoGenerator { + dependency: "id".to_string(), + })) + } else { + None + } + } else { + None + } + } + } + + // Test with a constant generator + let table_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("c1", DataType::Boolean, true), + Field::new("missing_column", 
DataType::Int32, true), + ])); + + let file_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("c1", DataType::Boolean, true), + ]); + + // Create a factory that will generate a constant value for the missing column + let generator_factory = Arc::new(ConstantGeneratorFactory { + field_name: "missing_column".to_string(), + value: 42, + }); + + let adapter = DefaultSchemaAdapterFactory::default() + .with_column_generator(generator_factory) + .create(table_schema.clone(), table_schema.clone()); + + let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); + + // Create a batch to test + let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); + let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]); + let batch = RecordBatch::try_new( + Arc::new(file_schema.clone()), + vec![Arc::new(id), Arc::new(c1)], + ) + .unwrap(); + + let projected = batch.project(&projection).unwrap(); + let mapped_batch = mapping.map_batch(projected).unwrap(); + + // Verify the result + assert_eq!(mapped_batch.schema(), table_schema); + assert_eq!(mapped_batch.num_columns(), 3); + assert_eq!(mapped_batch.num_rows(), 3); + + // Check the missing column was generated with constant value + let missing_col = mapped_batch.column(2).as_primitive::(); + assert_eq!(missing_col.value(0), 42); + assert_eq!(missing_col.value(1), 42); + assert_eq!(missing_col.value(2), 42); + + // Test with a generator that depends on another column + let table_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("doubled_id", DataType::Int32, true), + ])); + + let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]); + + // Set up the generator factory + let adapter = DefaultSchemaAdapterFactory::default() + .with_column_generator(Arc::new(MultiplyByTwoGeneratorFactory)) + .create(table_schema.clone(), table_schema.clone()); + + let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); + + // Create a batch with just an id column + let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); + let batch = RecordBatch::try_new( + Arc::new(file_schema.clone()), + vec![Arc::new(id)], + ) + .unwrap(); + + let projected = batch.project(&projection).unwrap(); + let mapped_batch = mapping.map_batch(projected).unwrap(); + + // Verify the result + assert_eq!(mapped_batch.schema(), table_schema); + assert_eq!(mapped_batch.num_columns(), 2); + assert_eq!(mapped_batch.num_rows(), 3); + + // Check the doubled_id column was generated correctly + let id_col = mapped_batch.column(0).as_primitive::(); + let doubled_col = mapped_batch.column(1).as_primitive::(); + + assert_eq!(doubled_col.value(0), id_col.value(0) * 2); + assert_eq!(doubled_col.value(1), id_col.value(1) * 2); + assert_eq!(doubled_col.value(2), id_col.value(2) * 2); + } + + #[test] + fn schema_adapter_with_multiple_generators() { + use crate::datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, MissingColumnGenerator, MissingColumnGeneratorFactory, + }; + use arrow::array::{ArrayRef, Int32Array, StringArray}; + use arrow::datatypes::Int32Type; + use std::fmt; + + // A generator for creating a description from an id + #[derive(Debug)] + struct IdToDescriptionGenerator; + + impl MissingColumnGenerator for IdToDescriptionGenerator { + fn generate(&self, batch: RecordBatch) -> datafusion_common::Result { + let idx = batch.schema().index_of("id").expect("id should exist"); + let col = batch.column(idx); + let col = col.as_primitive::(); + + let result: 
StringArray = col + .iter() + .map(|v| { + v.map(|id| match id { + 1 => "Product One", + 2 => "Product Two", + 3 => "Product Three", + _ => "Unknown Product", + }) + }) + .collect(); + Ok(Arc::new(result)) + } + + fn dependencies(&self) -> Vec { + vec!["id".to_string()] + } + } + + #[derive(Debug)] + struct IdToDescriptionGeneratorFactory; + + impl fmt::Display for IdToDescriptionGeneratorFactory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "IdToDescriptionGeneratorFactory") + } + } + + impl MissingColumnGeneratorFactory for IdToDescriptionGeneratorFactory { + fn create( + &self, + field: &Field, + file_schema: &Schema, + ) -> Option> { + if field.name() == "description" && field.data_type() == &DataType::Utf8 { + if file_schema.column_with_name("id").is_some() { + Some(Arc::new(IdToDescriptionGenerator)) + } else { + None + } + } else { + None + } + } + } + + // A generator for creating a score column with constant value + #[derive(Debug)] + struct ScoreGenerator(i32); + + impl MissingColumnGenerator for ScoreGenerator { + fn generate(&self, batch: RecordBatch) -> datafusion_common::Result { + let len = batch.num_rows(); + Ok(Arc::new(Int32Array::from(vec![self.0; len]))) + } + + fn dependencies(&self) -> Vec { + vec![] + } + } + + #[derive(Debug)] + struct ScoreGeneratorFactory; + + impl fmt::Display for ScoreGeneratorFactory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ScoreGeneratorFactory") + } + } + + impl MissingColumnGeneratorFactory for ScoreGeneratorFactory { + fn create( + &self, + field: &Field, + _file_schema: &Schema, + ) -> Option> { + if field.name() == "score" && field.data_type() == &DataType::Int32 { + Some(Arc::new(ScoreGenerator(100))) + } else { + None + } + } + } + + // Set up the schema with multiple missing columns + let table_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("description", DataType::Utf8, true), + Field::new("score", DataType::Int32, true), + ])); + + let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]); + + // Create factory that will generate multiple missing columns + let adapter = DefaultSchemaAdapterFactory::default() + .with_column_generator(Arc::new(IdToDescriptionGeneratorFactory)) + .with_column_generator(Arc::new(ScoreGeneratorFactory)) + .create(table_schema.clone(), table_schema.clone()); + + let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); + + // Create a batch to test + let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); + let batch = RecordBatch::try_new( + Arc::new(file_schema.clone()), + vec![Arc::new(id)], + ) + .unwrap(); + + let projected = batch.project(&projection).unwrap(); + let mapped_batch = mapping.map_batch(projected).unwrap(); + + // Verify the result + assert_eq!(mapped_batch.schema(), table_schema); + assert_eq!(mapped_batch.num_columns(), 3); + assert_eq!(mapped_batch.num_rows(), 3); + + // Check both missing columns were generated correctly + let id_col = mapped_batch.column(0).as_primitive::(); + let description_col = mapped_batch.column(1).as_string::(); + let score_col = mapped_batch.column(2).as_primitive::(); + + // Verify id column + assert_eq!(id_col.value(0), 1); + assert_eq!(id_col.value(1), 2); + assert_eq!(id_col.value(2), 3); + + // Verify description column generated from id + assert_eq!(description_col.value(0), "Product One"); + assert_eq!(description_col.value(1), "Product Two"); + assert_eq!(description_col.value(2), "Product Three"); + + // Verify 
score column has constant value + assert_eq!(score_col.value(0), 100); + assert_eq!(score_col.value(1), 100); + assert_eq!(score_col.value(2), 100); + } } From e581bc44572d182cd5f637915e6d9c898f2950cd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 14 Mar 2025 13:21:00 -0500 Subject: [PATCH 05/17] wip --- .../src/datasource/physical_plan/parquet.rs | 103 ++++++++++++++++++ .../datasource-parquet/src/row_filter.rs | 60 +++++----- 2 files changed, 133 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 888f3ad9e3b9..324dc48c3b51 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -52,6 +52,7 @@ mod tests { use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_scan_config::FileScanConfig; + use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, MissingColumnGenerator, MissingColumnGeneratorFactory, SchemaAdapterFactory}; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::{FileRange, PartitionedFile}; @@ -90,6 +91,7 @@ mod tests { /// options. #[derive(Debug, Default)] struct RoundTrip { + schema_adapter_factory: Option>, projection: Option>, schema: Option, predicate: Option, @@ -102,6 +104,11 @@ mod tests { Default::default() } + fn with_schema_adapter_factory(mut self, schema_adapter_factory: Arc) -> Self { + self.schema_adapter_factory = Some(schema_adapter_factory); + self + } + fn with_projection(mut self, projection: Vec) -> Self { self.projection = Some(projection); self @@ -143,6 +150,7 @@ mod tests { predicate, pushdown_predicate, page_index_predicate, + schema_adapter_factory, } = self; let file_schema = match schema { @@ -174,6 +182,10 @@ mod tests { .with_reorder_filters(true); } + if let Some(schema_adapter_factory) = schema_adapter_factory { + source = source.with_schema_adapter_factory(schema_adapter_factory); + } + if page_index_predicate { source = source.with_enable_page_index(true); } @@ -224,6 +236,97 @@ mod tests { ) } + #[tokio::test] + async fn test_pushdown_with_column_generators() { + #[derive(Debug, Clone)] + struct ConstantMissingColumnGenerator { + default_value: ScalarValue, + } + + impl MissingColumnGenerator for ConstantMissingColumnGenerator { + fn dependencies(&self) -> Vec { + vec![] + } + + fn generate(&self, batch: RecordBatch) -> Result { + let num_rows = batch.num_rows(); + let array = self.default_value.to_array_of_size(num_rows)?; + Ok(array) + } + } + + #[derive(Debug, Clone)] + struct ConstantMissingColumnGeneratorFactory { + column: String, + default_value: ScalarValue, + } + + impl MissingColumnGeneratorFactory for ConstantMissingColumnGeneratorFactory { + fn create( + &self, + field: &Field, + _file_schema: &Schema, + ) -> Option> { + if *field.name() == self.column { + Some(Arc::new(ConstantMissingColumnGenerator { + default_value: self.default_value.clone(), + })) + } else { + None + } + } + } + + let c1: ArrayRef = + Arc::new(Int32Array::from(vec![1, 2, 3])); + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + ])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); 
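+        // `c2` is not present in the file schema; the generator factory registered below is
+        // expected to supply it (as a constant 1) instead of the default null fill, so the
+        // pushed-down filter `c2 = 1` can still be evaluated against rows read from the file.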
+ + let filter = col("c2").eq(lit(1_i32)); + + let missing_column_generator_factory = Arc::new(ConstantMissingColumnGeneratorFactory { + column: "c2".to_string(), + default_value: ScalarValue::Int32(Some(1)), + }); + + let schema_adapter_factory = Arc::new( + DefaultSchemaAdapterFactory::default() + .with_column_generator(missing_column_generator_factory) + ); + + let read = RoundTrip::new() + .with_schema(table_schema) + .with_predicate(filter) + .with_pushdown_predicate() + .with_schema_adapter_factory(schema_adapter_factory) + .round_trip_to_batches(vec![batch]) + .await + .unwrap(); + let expected = [ + "+-----+----+----+", + "| c1 | c3 | c2 |", + "+-----+----+----+", + "| | | |", + "| | 10 | 1 |", + "| | 20 | |", + "| | 20 | 2 |", + "| Foo | 10 | |", + "| bar | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + + } + #[tokio::test] async fn evolved_schema() { let c1: ArrayRef = diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 1c8940c9fe24..9a812fe24bf1 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -352,36 +352,36 @@ impl TreeNodeRewriter for PushdownChecker<'_> { Ok(Transformed::no(node)) } - /// After visiting all children, rewrite column references to nulls if - /// they are not in the file schema. - /// We do this because they won't be relevant if they're not in the file schema, since that's - /// the only thing we're dealing with here as this is only used for the parquet pushdown during - /// scanning - fn f_up( - &mut self, - expr: Arc, - ) -> Result>> { - if let Some(column) = expr.as_any().downcast_ref::() { - // if the expression is a column, is it in the file schema? - if self.file_schema.field_with_name(column.name()).is_err() { - return self - .table_schema - .field_with_name(column.name()) - .and_then(|field| { - // Replace the column reference with a NULL (using the type from the table schema) - // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'` - // - // See comments on `FilterCandidateBuilder` for more information - let null_value = ScalarValue::try_from(field.data_type())?; - Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _)) - }) - // If the column is not in the table schema, should throw the error - .map_err(|e| arrow_datafusion_err!(e)); - } - } - - Ok(Transformed::no(expr)) - } + // /// After visiting all children, rewrite column references to nulls if + // /// they are not in the file schema. + // /// We do this because they won't be relevant if they're not in the file schema, since that's + // /// the only thing we're dealing with here as this is only used for the parquet pushdown during + // /// scanning + // fn f_up( + // &mut self, + // expr: Arc, + // ) -> Result>> { + // if let Some(column) = expr.as_any().downcast_ref::() { + // // if the expression is a column, is it in the file schema? + // if self.file_schema.field_with_name(column.name()).is_err() { + // return self + // .table_schema + // .field_with_name(column.name()) + // .and_then(|field| { + // // Replace the column reference with a NULL (using the type from the table schema) + // // e.g. 
`column = 'foo'` is rewritten be transformed to `NULL = 'foo'` + // // + // // See comments on `FilterCandidateBuilder` for more information + // let null_value = ScalarValue::try_from(field.data_type())?; + // Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _)) + // }) + // // If the column is not in the table schema, should throw the error + // .map_err(|e| arrow_datafusion_err!(e)); + // } + // } + + // Ok(Transformed::no(expr)) + // } } type ProjectionAndExpr = (BTreeSet, Arc); From a14e4121b8c2c497aa375292f6c7b2eba46fb926 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Mar 2025 14:22:39 -0500 Subject: [PATCH 06/17] wip --- datafusion/datasource-parquet/src/row_filter.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 9a812fe24bf1..dd5eb5f46c08 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -272,6 +272,9 @@ impl<'a> FilterCandidateBuilder<'a> { return Ok(None); }; + println!("required_indices: {required_indices:?}"); + println!("rewritten_expr: {rewritten_expr}"); + let required_bytes = size_of_columns(&required_indices, metadata)?; let can_use_index = columns_sorted(&required_indices, metadata)?; From b3396dbb0116d9f728e010609db28da0b26fcd44 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Mar 2025 15:44:48 -0500 Subject: [PATCH 07/17] fix --- datafusion/core/src/datasource/mod.rs | 7 - .../core/src/datasource/physical_plan/mod.rs | 52 ++- .../src/datasource/physical_plan/parquet.rs | 60 ++- datafusion/datasource-parquet/src/opener.rs | 3 +- .../datasource-parquet/src/row_filter.rs | 414 +++++++----------- datafusion/datasource/src/schema_adapter.rs | 97 +--- 6 files changed, 234 insertions(+), 399 deletions(-) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 18a1318dd40d..fd186fbfe7dd 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -347,12 +347,5 @@ mod tests { Ok(RecordBatch::try_new(schema, new_columns).unwrap()) } - - fn map_partial_batch( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { - self.map_batch(batch) - } } } diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index a06337e6abc6..834bd522789b 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -202,7 +202,8 @@ mod tests { #[test] fn schema_adapter_with_column_generator() { use crate::datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, MissingColumnGenerator, MissingColumnGeneratorFactory, + DefaultSchemaAdapterFactory, MissingColumnGenerator, + MissingColumnGeneratorFactory, }; use arrow::array::{ArrayRef, Int32Array}; use arrow::datatypes::Int32Type; @@ -213,7 +214,10 @@ mod tests { struct ConstantGenerator(Int32Array); impl MissingColumnGenerator for ConstantGenerator { - fn generate(&self, _batch: RecordBatch) -> datafusion_common::Result { + fn generate( + &self, + _batch: RecordBatch, + ) -> datafusion_common::Result { Ok(Arc::new(self.0.clone())) } @@ -231,7 +235,11 @@ mod tests { impl fmt::Display for ConstantGeneratorFactory { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "ConstantGeneratorFactory({}={})", 
self.field_name, self.value) + write!( + f, + "ConstantGeneratorFactory({}={})", + self.field_name, self.value + ) } } @@ -241,7 +249,9 @@ mod tests { field: &Field, _file_schema: &Schema, ) -> Option> { - if field.name() == &self.field_name && field.data_type() == &DataType::Int32 { + if field.name() == &self.field_name + && field.data_type() == &DataType::Int32 + { let array = Int32Array::from(vec![self.value; 3]); Some(Arc::new(ConstantGenerator(array))) } else { @@ -257,7 +267,10 @@ mod tests { } impl MissingColumnGenerator for MultiplyByTwoGenerator { - fn generate(&self, batch: RecordBatch) -> datafusion_common::Result { + fn generate( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { let idx = batch .schema() .index_of(&self.dependency) @@ -368,11 +381,9 @@ mod tests { // Create a batch with just an id column let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); - let batch = RecordBatch::try_new( - Arc::new(file_schema.clone()), - vec![Arc::new(id)], - ) - .unwrap(); + let batch = + RecordBatch::try_new(Arc::new(file_schema.clone()), vec![Arc::new(id)]) + .unwrap(); let projected = batch.project(&projection).unwrap(); let mapped_batch = mapping.map_batch(projected).unwrap(); @@ -394,7 +405,8 @@ mod tests { #[test] fn schema_adapter_with_multiple_generators() { use crate::datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, MissingColumnGenerator, MissingColumnGeneratorFactory, + DefaultSchemaAdapterFactory, MissingColumnGenerator, + MissingColumnGeneratorFactory, }; use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::datatypes::Int32Type; @@ -405,7 +417,10 @@ mod tests { struct IdToDescriptionGenerator; impl MissingColumnGenerator for IdToDescriptionGenerator { - fn generate(&self, batch: RecordBatch) -> datafusion_common::Result { + fn generate( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { let idx = batch.schema().index_of("id").expect("id should exist"); let col = batch.column(idx); let col = col.as_primitive::(); @@ -461,7 +476,10 @@ mod tests { struct ScoreGenerator(i32); impl MissingColumnGenerator for ScoreGenerator { - fn generate(&self, batch: RecordBatch) -> datafusion_common::Result { + fn generate( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { let len = batch.num_rows(); Ok(Arc::new(Int32Array::from(vec![self.0; len]))) } @@ -513,11 +531,9 @@ mod tests { // Create a batch to test let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); - let batch = RecordBatch::try_new( - Arc::new(file_schema.clone()), - vec![Arc::new(id)], - ) - .unwrap(); + let batch = + RecordBatch::try_new(Arc::new(file_schema.clone()), vec![Arc::new(id)]) + .unwrap(); let projected = batch.project(&projection).unwrap(); let mapped_batch = mapping.map_batch(projected).unwrap(); diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 324dc48c3b51..d3937a5ecfeb 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -52,7 +52,10 @@ mod tests { use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_scan_config::FileScanConfig; - use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, MissingColumnGenerator, MissingColumnGeneratorFactory, SchemaAdapterFactory}; + use datafusion_datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, MissingColumnGenerator, + 
MissingColumnGeneratorFactory, SchemaAdapterFactory, + }; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::{FileRange, PartitionedFile}; @@ -104,7 +107,10 @@ mod tests { Default::default() } - fn with_schema_adapter_factory(mut self, schema_adapter_factory: Arc) -> Self { + fn with_schema_adapter_factory( + mut self, + schema_adapter_factory: Arc, + ) -> Self { self.schema_adapter_factory = Some(schema_adapter_factory); self } @@ -236,6 +242,36 @@ mod tests { ) } + #[tokio::test] + async fn test_pushdown_with_missing_column_in_file() { + let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + + let file_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); + + // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // the default behavior is to fill in missing columns with nulls. + // Thus this predicate will come back as false. + let filter = col("c1").eq(lit(1_i32)).and(col("c2").eq(lit(1_i32))); + + let read = RoundTrip::new() + .with_schema(table_schema) + .with_predicate(filter) + .with_pushdown_predicate() + .round_trip_to_batches(vec![batch]) + .await + .unwrap(); + let total_rows = read.iter().map(|b| b.num_rows()).sum::(); + assert_eq!(total_rows, 0, "Expected no rows to match the predicate"); + } + #[tokio::test] async fn test_pushdown_with_column_generators() { #[derive(Debug, Clone)] @@ -277,12 +313,10 @@ mod tests { } } - let c1: ArrayRef = - Arc::new(Int32Array::from(vec![1, 2, 3])); + let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); - let file_schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, true), - ])); + let file_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)])); let table_schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, true), @@ -293,14 +327,15 @@ mod tests { let filter = col("c2").eq(lit(1_i32)); - let missing_column_generator_factory = Arc::new(ConstantMissingColumnGeneratorFactory { - column: "c2".to_string(), - default_value: ScalarValue::Int32(Some(1)), - }); + let missing_column_generator_factory = + Arc::new(ConstantMissingColumnGeneratorFactory { + column: "c2".to_string(), + default_value: ScalarValue::Int32(Some(1)), + }); let schema_adapter_factory = Arc::new( DefaultSchemaAdapterFactory::default() - .with_column_generator(missing_column_generator_factory) + .with_column_generator(missing_column_generator_factory), ); let read = RoundTrip::new() @@ -324,7 +359,6 @@ mod tests { "+-----+----+----+", ]; assert_batches_sorted_eq!(expected, &read); - } #[tokio::test] diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3c623f558e43..38a06aa94162 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -107,6 +107,7 @@ impl FileOpener for ParquetOpener { let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); + let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory); let schema_adapter = self .schema_adapter_factory .create(projected_schema, Arc::clone(&self.table_schema)); @@ -173,7 +174,7 @@ impl FileOpener for ParquetOpener { builder.metadata(), reorder_predicates, &file_metrics, - Arc::clone(&schema_mapping), 
+ schema_adapter_factory.clone(), ); match row_filter { diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index dd5eb5f46c08..264c13735a18 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -64,7 +64,7 @@ use std::collections::BTreeSet; use std::sync::Arc; use arrow::array::BooleanArray; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; @@ -72,12 +72,10 @@ use parquet::arrow::ProjectionMask; use parquet::file::metadata::ParquetMetaData; use datafusion_common::cast::as_boolean_array; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, -}; -use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; -use datafusion_datasource::schema_adapter::SchemaMapper; -use datafusion_physical_expr::expressions::{Column, Literal}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::Result; +use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper}; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; @@ -102,8 +100,6 @@ pub(crate) struct DatafusionArrowPredicate { /// Path to the columns in the parquet schema required to evaluate the /// expression projection_mask: ProjectionMask, - /// Columns required to evaluate the expression in the arrow schema - projection: Vec, /// how many rows were filtered out by this predicate rows_pruned: metrics::Count, /// how many rows passed this predicate @@ -118,27 +114,17 @@ impl DatafusionArrowPredicate { /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate` pub fn try_new( candidate: FilterCandidate, - schema: &Schema, metadata: &ParquetMetaData, rows_pruned: metrics::Count, rows_matched: metrics::Count, time: metrics::Time, - schema_mapping: Arc, ) -> Result { - let schema = Arc::new(schema.project(&candidate.projection)?); - let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?; - - // ArrowPredicate::evaluate is passed columns in the order they appear in the file - // If the predicate has multiple columns, we therefore must project the columns based - // on the order they appear in the file - let projection = match candidate.projection.len() { - 0 | 1 => vec![], - 2.. 
=> remap_projection(&candidate.projection),
-        };
+        let projected_schema = candidate.filter_schema.clone();
+        let physical_expr =
+            reassign_predicate_columns(candidate.expr, &projected_schema, true)?;

         Ok(Self {
             physical_expr,
-            projection,
             projection_mask: ProjectionMask::roots(
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
@@ -146,7 +132,7 @@ impl DatafusionArrowPredicate {
             rows_pruned,
             rows_matched,
             time,
-            schema_mapping,
+            schema_mapping: candidate.schema_mapper,
         })
     }
 }
@@ -156,12 +142,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
         &self.projection_mask
     }

-    fn evaluate(&mut self, mut batch: RecordBatch) -> ArrowResult<BooleanArray> {
-        if !self.projection.is_empty() {
-            batch = batch.project(&self.projection)?;
-        };
-
-        let batch = self.schema_mapping.map_partial_batch(batch)?;
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let batch = self.schema_mapping.map_batch(batch)?;

         // scoped timer updates on drop
         let mut timer = self.time.timer();
@@ -194,9 +176,22 @@ impl ArrowPredicate for DatafusionArrowPredicate {
 /// See the module level documentation for more information.
 pub(crate) struct FilterCandidate {
     expr: Arc<dyn PhysicalExpr>,
+    /// Estimate for the total number of bytes that will need to be processed
+    /// to evaluate this filter. This is used to estimate the cost of evaluating
+    /// the filter and to order the filters when `reorder_predicates` is true.
+    /// This is generated by summing the compressed size of all columns that the filter references.
     required_bytes: usize,
+    /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
+    /// The projection to read from the file schema to get the columns
+    /// required to pass through a `SchemaMapper` to the table schema
+    /// upon which we then evaluate the filter expression.
     projection: Vec<usize>,
+    /// A `SchemaMapper` used to map batches read from the file schema to
+    /// the filter's projection of the table schema.
+    schema_mapper: Arc<dyn SchemaMapper>,
+    /// The projected table schema that this filter references
+    filter_schema: SchemaRef,
 }

 /// Helper to build a `FilterCandidate`.
@@ -220,41 +215,40 @@ pub(crate) struct FilterCandidate {
 /// but old files do not have the columns.
 ///
 /// When a file is missing a column from the table schema, the value of the
-/// missing column is filled in with `NULL` via a `SchemaAdapter`.
+/// missing column is filled in by a `SchemaAdapter` (by default as `NULL`).
 ///
 /// When a predicate is pushed down to the parquet reader, the predicate is
-/// evaluated in the context of the file schema. If the predicate references a
-/// column that is in the table schema but not in the file schema, the column
-/// reference must be rewritten to a literal expression that represents the
-/// `NULL` value that would be produced by the `SchemaAdapter`.
-///
-/// For example, if:
-/// * The table schema is `id, name, address`
-/// * The file schema is `id, name` (missing the `address` column)
-/// * predicate is `address = 'foo'`
-///
-/// When evaluating the predicate as a filter on the parquet file, the predicate
-/// must be rewritten to `NULL = 'foo'` as the `address` column will be filled
-/// in with `NULL` values during the rest of the evaluation.
-struct FilterCandidateBuilder<'a> {
+/// evaluated in the context of the file schema.
+/// For each predicate we build a filter schema which is the projection of the table
+/// schema that contains only the columns that this filter references.
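+/// For example, if the table schema is `id, name, address` and the predicate is
+/// `address = 'foo'`, the filter schema contains just the `address` column,
+/// regardless of which of those columns a particular file provides.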
+/// If any columns from the file schema are missing from a particular file they are +/// added by the `SchemaAdapter`, by default as `NULL`. +struct FilterCandidateBuilder { expr: Arc, - /// The schema of this parquet file - file_schema: &'a Schema, + /// The schema of this parquet file. + /// Columns may have different types from the table schema and there may be + /// columns in the file schema that are not in the table schema or columns that + /// are in the table schema that are not in the file schema. + file_schema: SchemaRef, /// The schema of the table (merged schema) -- columns may be in different /// order than in the file and have columns that are not in the file schema - table_schema: &'a Schema, + table_schema: SchemaRef, + /// A `SchemaAdapterFactory` used to map the file schema to the table schema. + schema_adapter_factory: Arc, } -impl<'a> FilterCandidateBuilder<'a> { +impl FilterCandidateBuilder { pub fn new( expr: Arc, - file_schema: &'a Schema, - table_schema: &'a Schema, + file_schema: Arc, + table_schema: Arc, + schema_adapter_factory: Arc, ) -> Self { Self { expr, file_schema, table_schema, + schema_adapter_factory, } } @@ -266,23 +260,32 @@ impl<'a> FilterCandidateBuilder<'a> { /// * `Ok(None)` if the expression cannot be used as an ArrowFilter /// * `Err(e)` if an error occurs while building the candidate pub fn build(self, metadata: &ParquetMetaData) -> Result> { - let Some((required_indices, rewritten_expr)) = - pushdown_columns(self.expr, self.file_schema, self.table_schema)? + let Some(required_indices_into_table_schema) = + pushdown_columns(&self.expr, &self.table_schema)? else { return Ok(None); }; - println!("required_indices: {required_indices:?}"); - println!("rewritten_expr: {rewritten_expr}"); + let projected_table_schema = Arc::new( + self.table_schema + .project(&required_indices_into_table_schema)?, + ); + + let (schema_mapper, projection_into_file_schema) = self + .schema_adapter_factory + .create(projected_table_schema.clone(), self.table_schema) + .map_schema(&self.file_schema)?; - let required_bytes = size_of_columns(&required_indices, metadata)?; - let can_use_index = columns_sorted(&required_indices, metadata)?; + let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?; + let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?; Ok(Some(FilterCandidate { - expr: rewritten_expr, + expr: self.expr, required_bytes, can_use_index, - projection: required_indices.into_iter().collect(), + projection: projection_into_file_schema, + schema_mapper: Arc::clone(&schema_mapper), + filter_schema: Arc::clone(&projected_table_schema), })) } } @@ -297,33 +300,29 @@ struct PushdownChecker<'schema> { /// Does the expression reference any columns that are in the table /// schema but not in the file schema? 
projected_columns: bool, - // the indices of all the columns found within the given expression which exist inside the given - // [`file_schema`] - required_column_indices: BTreeSet, - file_schema: &'schema Schema, + // Indices into the table schema of the columns required to evaluate the expression + required_columns: BTreeSet, table_schema: &'schema Schema, } impl<'schema> PushdownChecker<'schema> { - fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self { + fn new(table_schema: &'schema Schema) -> Self { Self { non_primitive_columns: false, projected_columns: false, - required_column_indices: BTreeSet::default(), - file_schema, + required_columns: BTreeSet::default(), table_schema, } } fn check_single_column(&mut self, column_name: &str) -> Option { - if let Ok(idx) = self.file_schema.index_of(column_name) { - self.required_column_indices.insert(idx); - - if DataType::is_nested(self.file_schema.field(idx).data_type()) { + if let Ok(idx) = self.table_schema.index_of(column_name) { + self.required_columns.insert(idx); + if DataType::is_nested(self.table_schema.field(idx).data_type()) { self.non_primitive_columns = true; return Some(TreeNodeRecursion::Jump); } - } else if self.table_schema.index_of(column_name).is_err() { + } else { // If the column does not exist in the (un-projected) table schema then // it must be a projected column. self.projected_columns = true; @@ -339,82 +338,40 @@ impl<'schema> PushdownChecker<'schema> { } } -impl TreeNodeRewriter for PushdownChecker<'_> { +impl TreeNodeVisitor<'_> for PushdownChecker<'_> { type Node = Arc; - fn f_down( - &mut self, - node: Arc, - ) -> Result>> { + fn f_down(&mut self, node: &Self::Node) -> Result { if let Some(column) = node.as_any().downcast_ref::() { if let Some(recursion) = self.check_single_column(column.name()) { - return Ok(Transformed::new(node, false, recursion)); + return Ok(recursion); } } - Ok(Transformed::no(node)) + Ok(TreeNodeRecursion::Continue) } - - // /// After visiting all children, rewrite column references to nulls if - // /// they are not in the file schema. - // /// We do this because they won't be relevant if they're not in the file schema, since that's - // /// the only thing we're dealing with here as this is only used for the parquet pushdown during - // /// scanning - // fn f_up( - // &mut self, - // expr: Arc, - // ) -> Result>> { - // if let Some(column) = expr.as_any().downcast_ref::() { - // // if the expression is a column, is it in the file schema? - // if self.file_schema.field_with_name(column.name()).is_err() { - // return self - // .table_schema - // .field_with_name(column.name()) - // .and_then(|field| { - // // Replace the column reference with a NULL (using the type from the table schema) - // // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'` - // // - // // See comments on `FilterCandidateBuilder` for more information - // let null_value = ScalarValue::try_from(field.data_type())?; - // Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _)) - // }) - // // If the column is not in the table schema, should throw the error - // .map_err(|e| arrow_datafusion_err!(e)); - // } - // } - - // Ok(Transformed::no(expr)) - // } } -type ProjectionAndExpr = (BTreeSet, Arc); - // Checks if a given expression can be pushed down into `DataSourceExec` as opposed to being evaluated // post-parquet-scan in a `FilterExec`. 
If it can be pushed down, this returns all the // columns in the given expression so that they can be used in the parquet scanning, along with the // expression rewritten as defined in [`PushdownChecker::f_up`] fn pushdown_columns( - expr: Arc, - file_schema: &Schema, + expr: &Arc, table_schema: &Schema, -) -> Result> { - let mut checker = PushdownChecker::new(file_schema, table_schema); - - let expr = expr.rewrite(&mut checker).data()?; - - Ok((!checker.prevents_pushdown()).then_some((checker.required_column_indices, expr))) +) -> Result>> { + let mut checker = PushdownChecker::new(table_schema); + expr.visit(&mut checker)?; + Ok((!checker.prevents_pushdown()) + .then_some(checker.required_columns.into_iter().collect())) } /// creates a PushdownChecker for a single use to check a given column with the given schemes. Used /// to check preemptively if a column name would prevent pushdowning. /// effectively does the inverse of [`pushdown_columns`] does, but with a single given column /// (instead of traversing the entire tree to determine this) -fn would_column_prevent_pushdown( - column_name: &str, - file_schema: &Schema, - table_schema: &Schema, -) -> bool { - let mut checker = PushdownChecker::new(file_schema, table_schema); +fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bool { + let mut checker = PushdownChecker::new(table_schema); // the return of this is only used for [`PushdownChecker::f_down()`], so we can safely ignore // it here. I'm just verifying we know the return type of this so nobody accidentally changes @@ -430,14 +387,13 @@ fn would_column_prevent_pushdown( /// Otherwise, true. pub fn can_expr_be_pushed_down_with_schemas( expr: &datafusion_expr::Expr, - file_schema: &Schema, + _file_schema: &Schema, table_schema: &Schema, ) -> bool { let mut can_be_pushed = true; expr.apply(|expr| match expr { datafusion_expr::Expr::Column(column) => { - can_be_pushed &= - !would_column_prevent_pushdown(column.name(), file_schema, table_schema); + can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema); Ok(if can_be_pushed { TreeNodeRecursion::Jump } else { @@ -450,41 +406,12 @@ pub fn can_expr_be_pushed_down_with_schemas( can_be_pushed } -/// Computes the projection required to go from the file's schema order to the projected -/// order expected by this filter -/// -/// Effectively this computes the rank of each element in `src` -fn remap_projection(src: &[usize]) -> Vec { - let len = src.len(); - - // Compute the column mapping from projected order to file order - // i.e. the indices required to sort projected schema into the file schema - // - // e.g. projection: [5, 9, 0] -> [2, 0, 1] - let mut sorted_indexes: Vec<_> = (0..len).collect(); - sorted_indexes.sort_unstable_by_key(|x| src[*x]); - - // Compute the mapping from schema order to projected order - // i.e. the indices required to sort file schema into the projected schema - // - // Above we computed the order of the projected schema according to the file - // schema, and so we can use this as the comparator - // - // e.g. sorted_indexes [2, 0, 1] -> [1, 2, 0] - let mut projection: Vec<_> = (0..len).collect(); - projection.sort_unstable_by_key(|x| sorted_indexes[*x]); - projection -} - /// Calculate the total compressed size of all `Column`'s required for /// predicate `Expr`. /// /// This value represents the total amount of IO required to evaluate the /// predicate. 
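 /// For example, a predicate that references the columns at indexes `[0, 2]` costs
 /// the sum of those two columns' compressed sizes across all row groups in the file.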
-fn size_of_columns( - columns: &BTreeSet, - metadata: &ParquetMetaData, -) -> Result { +fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result { let mut total_size = 0; let row_groups = metadata.row_groups(); for idx in columns { @@ -501,10 +428,7 @@ fn size_of_columns( /// /// Sorted columns may be queried more efficiently in the presence of /// a PageIndex. -fn columns_sorted( - _columns: &BTreeSet, - _metadata: &ParquetMetaData, -) -> Result { +fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result { // TODO How do we know this? Ok(false) } @@ -530,7 +454,7 @@ pub fn build_row_filter( metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, - schema_mapping: Arc, + schema_adapter_factory: Arc, ) -> Result> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; @@ -540,12 +464,20 @@ pub fn build_row_filter( // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`] let predicates = split_conjunction(expr); + let file_schema = Arc::new(file_schema.clone()); + let table_schema = Arc::new(table_schema.clone()); + // Determine which conjuncts can be evaluated as ArrowPredicates, if any let mut candidates: Vec = predicates .into_iter() .map(|expr| { - FilterCandidateBuilder::new(Arc::clone(expr), file_schema, table_schema) - .build(metadata) + FilterCandidateBuilder::new( + Arc::clone(expr), + file_schema.clone(), + table_schema.clone(), + schema_adapter_factory.clone(), + ) + .build(metadata) }) .collect::, _>>()? .into_iter() @@ -571,12 +503,10 @@ pub fn build_row_filter( .map(|candidate| { DatafusionArrowPredicate::try_new( candidate, - file_schema, metadata, rows_pruned.clone(), rows_matched.clone(), time.clone(), - Arc::clone(&schema_mapping), ) .map(|pred| Box::new(pred) as _) }) @@ -587,19 +517,17 @@ pub fn build_row_filter( #[cfg(test)] mod test { use super::*; - use datafusion_datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapterFactory, - }; + use datafusion_common::ScalarValue; + use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use arrow::datatypes::{Field, Fields, TimeUnit::Nanosecond}; - use datafusion_expr::{cast, col, lit, Expr}; + use datafusion_expr::{col, Expr}; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::metrics::{Count, Time}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::reader::{FileReader, SerializedFileReader}; - use rand::prelude::*; // We should ignore predicate that read non-primitive columns #[test] @@ -619,51 +547,19 @@ mod test { let expr = col("int64_list").is_not_null(); let expr = logical2physical(&expr, &table_schema); - let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema) - .build(metadata) - .expect("building candidate"); - - assert!(candidate.is_none()); - } - - // If a column exists in the table schema but not the file schema it should be rewritten to a null expression - #[test] - fn test_filter_candidate_builder_rewrite_missing_column() { - let testdata = datafusion_common::test_util::parquet_test_data(); - let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) - .expect("opening file"); - - let reader = SerializedFileReader::new(file).expect("creating reader"); - - let metadata = reader.metadata(); - - let table_schema = - parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) - 
.expect("parsing schema"); - - let file_schema = Schema::new(vec![ - Field::new("bigint_col", DataType::Int64, true), - Field::new("float_col", DataType::Float32, true), - ]); - - // The parquet file with `file_schema` just has `bigint_col` and `float_col` column, and don't have the `int_col` - let expr = col("bigint_col").eq(cast(col("int_col"), DataType::Int64)); - let expr = logical2physical(&expr, &table_schema); - let expected_candidate_expr = - col("bigint_col").eq(cast(lit(ScalarValue::Int32(None)), DataType::Int64)); - let expected_candidate_expr = - logical2physical(&expected_candidate_expr, &table_schema); - - let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) - .build(metadata) - .expect("building candidate"); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default()); + let table_schema = Arc::new(table_schema.clone()); - assert!(candidate.is_some()); + let candidate = FilterCandidateBuilder::new( + expr, + table_schema.clone(), + table_schema, + schema_adapter_factory, + ) + .build(metadata) + .expect("building candidate"); - assert_eq!( - candidate.unwrap().expr.to_string(), - expected_candidate_expr.to_string() - ); + assert!(candidate.is_none()); } #[test] @@ -685,42 +581,43 @@ mod test { false, )]); - let table_ref = Arc::new(table_schema.clone()); - let schema_adapter = DefaultSchemaAdapterFactory::default() - .create(Arc::clone(&table_ref), table_ref); - let (schema_mapping, _) = schema_adapter - .map_schema(&file_schema) - .expect("creating schema mapping"); - - let mut parquet_reader = parquet_reader_builder.build().expect("building reader"); - - // Parquet file is small, we only need 1 record batch - let first_rb = parquet_reader - .next() - .expect("expected record batch") - .expect("expected error free record batch"); - // Test all should fail let expr = col("timestamp_col").lt(Expr::Literal( ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))), )); let expr = logical2physical(&expr, &table_schema); - let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) - .build(&metadata) - .expect("building candidate") - .expect("candidate expected"); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default()); + let table_schema = Arc::new(table_schema.clone()); + let candidate = FilterCandidateBuilder::new( + expr, + file_schema.clone(), + table_schema.clone(), + schema_adapter_factory, + ) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); let mut row_filter = DatafusionArrowPredicate::try_new( candidate, - &file_schema, &metadata, Count::new(), Count::new(), Time::new(), - Arc::clone(&schema_mapping), ) .expect("creating filter predicate"); + let mut parquet_reader = parquet_reader_builder + .with_projection(row_filter.projection().clone()) + .build() + .expect("building reader"); + + // Parquet file is small, we only need 1 record batch + let first_rb = parquet_reader + .next() + .expect("expected record batch") + .expect("expected error free record batch"); + let filtered = row_filter.evaluate(first_rb.clone()); assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8]))); @@ -729,19 +626,23 @@ mod test { ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))), )); let expr = logical2physical(&expr, &table_schema); - let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) - .build(&metadata) - .expect("building candidate") - .expect("candidate expected"); + let schema_adapter_factory = 
Arc::new(DefaultSchemaAdapterFactory::default()); + let candidate = FilterCandidateBuilder::new( + expr, + file_schema, + table_schema, + schema_adapter_factory, + ) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); let mut row_filter = DatafusionArrowPredicate::try_new( candidate, - &file_schema, &metadata, Count::new(), Count::new(), Time::new(), - schema_mapping, ) .expect("creating filter predicate"); @@ -749,24 +650,6 @@ mod test { assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8]))); } - #[test] - fn test_remap_projection() { - let mut rng = thread_rng(); - for _ in 0..100 { - // A random selection of column indexes in arbitrary order - let projection: Vec<_> = (0..100).map(|_| rng.gen()).collect(); - - // File order is the projection sorted - let mut file_order = projection.clone(); - file_order.sort_unstable(); - - let remap = remap_projection(&projection); - // Applying the remapped projection to the file order should yield the original - let remapped: Vec<_> = remap.iter().map(|r| file_order[*r]).collect(); - assert_eq!(projection, remapped) - } - } - #[test] fn nested_data_structures_prevent_pushdown() { let table_schema = get_basic_table_schema(); @@ -806,9 +689,10 @@ mod test { fn basic_expr_doesnt_prevent_pushdown() { let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new("str_col", DataType::Utf8, true)]); + let file_schema = + Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]); - let expr = col("str_col").is_null(); + let expr = col("string_col").is_null(); assert!(can_expr_be_pushed_down_with_schemas( &expr, @@ -822,13 +706,13 @@ mod test { let table_schema = get_basic_table_schema(); let file_schema = Schema::new(vec![ - Field::new("str_col", DataType::Utf8, true), - Field::new("int_col", DataType::UInt64, true), + Field::new("string_col", DataType::Utf8, true), + Field::new("bigint_col", DataType::Int64, true), ]); - let expr = col("str_col") + let expr = col("string_col") .is_not_null() - .or(col("int_col").gt(Expr::Literal(ScalarValue::UInt64(Some(5))))); + .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5))))); assert!(can_expr_be_pushed_down_with_schemas( &expr, diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index 2d3b35ef78ea..23313bfe872f 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -98,19 +98,6 @@ pub trait SchemaAdapter: Send + Sync { pub trait SchemaMapper: Debug + Send + Sync { /// Adapts a `RecordBatch` to match the `table_schema` fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; - - /// Adapts a [`RecordBatch`] that does not have all the columns from the - /// file schema. - /// - /// This method is used, for example, when applying a filter to a subset of - /// the columns as part of `DataFusionArrowPredicate` when `filter_pushdown` - /// is enabled. - /// - /// This method is slower than `map_batch` as it looks up columns by name. 
- fn map_partial_batch( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result; } pub trait MissingColumnGeneratorFactory: Debug + Send + Sync { @@ -283,11 +270,10 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { fn create( &self, projected_table_schema: SchemaRef, - table_schema: SchemaRef, + _table_schema: SchemaRef, ) -> Box { Box::new(DefaultSchemaAdapter { projected_table_schema, - table_schema, column_generators: self.column_generators.clone(), }) } @@ -300,12 +286,6 @@ pub(crate) struct DefaultSchemaAdapter { /// The schema for the table, projected to include only the fields being output (projected) by the /// associated ParquetSource projected_table_schema: SchemaRef, - /// The entire table schema for the table we're using this to adapt. - /// - /// This is used to evaluate any filters pushed down into the scan - /// which may refer to columns that are not referred to anywhere - /// else in the plan. - table_schema: SchemaRef, /// The column generators to use when a column is missing column_generators: ColumnGeneratorFactories, } @@ -409,7 +389,6 @@ impl SchemaAdapter for DefaultSchemaAdapter { Arc::new(SchemaMapping { projected_table_schema: self.projected_table_schema.clone(), field_sources, - table_schema: self.table_schema.clone(), }), sorted_projection, )) @@ -453,27 +432,12 @@ fn sort_projections_and_sources( /// The SchemaMapping struct holds a mapping from the file schema to the table /// schema and any necessary type conversions. /// -/// Note, because `map_batch` and `map_partial_batch` functions have different -/// needs, this struct holds two schemas: -/// -/// 1. The projected **table** schema -/// 2. The full table schema -/// /// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which /// has the projected schema, since that's the schema which is supposed to come /// out of the execution of this query. Thus `map_batch` uses /// `projected_table_schema` as it can only operate on the projected fields. /// -/// [`map_partial_batch`] is used to create a RecordBatch with a schema that -/// can be used for Parquet predicate pushdown, meaning that it may contain -/// fields which are not in the projected schema (as the fields that parquet -/// pushdown filters operate can be completely distinct from the fields that are -/// projected (output) out of the ParquetSource). `map_partial_batch` thus uses -/// `table_schema` to create the resulting RecordBatch (as it could be operating -/// on any fields in the schema). -/// /// [`map_batch`]: Self::map_batch -/// [`map_partial_batch`]: Self::map_partial_batch #[derive(Debug)] pub struct SchemaMapping { /// The schema of the table. This is the expected schema after conversion @@ -482,18 +446,11 @@ pub struct SchemaMapping { /// A mapping from the fields in `projected_table_schema`` to the way to materialize /// them from the projected file schema. field_sources: Vec, - /// The entire table schema, as opposed to the projected_table_schema (which - /// only contains the columns that we are projecting out of this query). - /// This contains all fields in the table, regardless of if they will be - /// projected out or not. - table_schema: SchemaRef, } impl SchemaMapper for SchemaMapping { /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and - /// conversions. 
The produced RecordBatch has a schema that contains only the projected - /// columns, so if one needs a RecordBatch with a schema that references columns which are not - /// in the projected, it would be better to use `map_partial_batch` + /// conversions. fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); @@ -534,54 +491,4 @@ impl SchemaMapper for SchemaMapping { let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } - - /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only - /// contains the fields that exist in both the file schema and table schema. - /// - /// Unlike `map_batch` this method also preserves the columns that - /// may not appear in the final output (`projected_table_schema`) but may - /// appear in push down predicates - fn map_partial_batch( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { - let batch_cols = batch.columns().to_vec(); - let schema = batch.schema(); - - // for each field in the batch's schema (which is based on a file, not a table)... - let (cols, fields) = schema - .fields() - .iter() - .zip(batch_cols.iter()) - .flat_map(|(field, batch_col)| { - self.table_schema - // try to get the same field from the table schema that we have stored in self - .field_with_name(field.name()) - // and if we don't have it, that's fine, ignore it. This may occur when we've - // created an external table whose fields are a subset of the fields in this - // file, then tried to read data from the file into this table. If that is the - // case here, it's fine to ignore because we don't care about this field - // anyways - .ok() - // but if we do have it, - .map(|table_field| { - // try to cast it into the correct output type. we don't want to ignore this - // error, though, so it's propagated. - cast(batch_col, table_field.data_type()) - // and if that works, return the field and column. - .map(|new_col| (new_col, table_field.clone())) - }) - }) - .collect::, _>>()? 
- .into_iter() - .unzip::<_, _, Vec<_>, Vec<_>>(); - - // Necessary to handle empty batches - let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - - let schema = - Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); - let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; - Ok(record_batch) - } } From 210616c9b0983f8ad3e0ca506704d4a3bfdaaba6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Mar 2025 15:54:53 -0500 Subject: [PATCH 08/17] fix --- datafusion/core/src/datasource/mod.rs | 10 ++-------- .../src/datasource/physical_plan/parquet.rs | 18 ++++++++---------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index fd186fbfe7dd..c40e75eaa90d 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -276,14 +276,8 @@ mod tests { ]); let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema)); - let (mapper, indices) = adapter.map_schema(&file_schema).unwrap(); - assert_eq!(indices, vec![0]); - - let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap(); - - // Mapping fails because it tries to fill in a non-nullable column with nulls - let err = mapper.map_batch(file_batch).unwrap_err().to_string(); - assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}"); + let err = adapter.map_schema(&file_schema).unwrap_err().to_string(); + assert_eq!(err, "Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable"); } #[derive(Debug)] diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index d3937a5ecfeb..a6352a3e5140 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -346,17 +346,15 @@ mod tests { .round_trip_to_batches(vec![batch]) .await .unwrap(); + #[rustfmt::skip] let expected = [ - "+-----+----+----+", - "| c1 | c3 | c2 |", - "+-----+----+----+", - "| | | |", - "| | 10 | 1 |", - "| | 20 | |", - "| | 20 | 2 |", - "| Foo | 10 | |", - "| bar | | |", - "+-----+----+----+", + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 1 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "+----+----+", ]; assert_batches_sorted_eq!(expected, &read); } From 896eb8b014ed01e890ee9781381d85dd811a7140 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Mar 2025 16:04:45 -0500 Subject: [PATCH 09/17] fix --- datafusion/core/src/datasource/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index c40e75eaa90d..69483b343855 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -277,7 +277,7 @@ mod tests { let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema)); let err = adapter.map_schema(&file_schema).unwrap_err().to_string(); - assert_eq!(err, "Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable"); + assert!(err.contains("Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable")); } #[derive(Debug)] From 
43e2123caee76818f1c83dfa37a63a64d590d91a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Mar 2025 20:22:58 -0500 Subject: [PATCH 10/17] better test --- .../core/src/datasource/physical_plan/mod.rs | 370 +----------------- .../src/datasource/physical_plan/parquet.rs | 133 ++----- .../datasource-parquet/src/row_filter.rs | 8 +- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/datasource/src/schema_adapter.rs | 223 ++--------- 5 files changed, 68 insertions(+), 668 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 834bd522789b..cae04e5ee6b8 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -89,7 +89,7 @@ mod tests { Field::new("c3", DataType::Float64, true), ])); - let adapter = DefaultSchemaAdapterFactory::default() + let adapter = DefaultSchemaAdapterFactory .create(table_schema.clone(), table_schema.clone()); let file_schema = Schema::new(vec![ @@ -147,8 +147,7 @@ mod tests { let indices = vec![1, 2, 4]; let schema = SchemaRef::from(table_schema.project(&indices).unwrap()); - let adapter = - DefaultSchemaAdapterFactory::default().create(schema, table_schema.clone()); + let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone()); let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); @@ -198,369 +197,4 @@ mod tests { assert_eq!(c4.value(1), 2.0_f32); assert_eq!(c4.value(2), 3.0_f32); } - - #[test] - fn schema_adapter_with_column_generator() { - use crate::datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, MissingColumnGenerator, - MissingColumnGeneratorFactory, - }; - use arrow::array::{ArrayRef, Int32Array}; - use arrow::datatypes::Int32Type; - use std::fmt; - - // A simple generator that produces a constant value - #[derive(Debug)] - struct ConstantGenerator(Int32Array); - - impl MissingColumnGenerator for ConstantGenerator { - fn generate( - &self, - _batch: RecordBatch, - ) -> datafusion_common::Result { - Ok(Arc::new(self.0.clone())) - } - - fn dependencies(&self) -> Vec { - vec![] - } - } - - // A factory that produces a constant generator for a specific field - #[derive(Debug)] - struct ConstantGeneratorFactory { - field_name: String, - value: i32, - } - - impl fmt::Display for ConstantGeneratorFactory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "ConstantGeneratorFactory({}={})", - self.field_name, self.value - ) - } - } - - impl MissingColumnGeneratorFactory for ConstantGeneratorFactory { - fn create( - &self, - field: &Field, - _file_schema: &Schema, - ) -> Option> { - if field.name() == &self.field_name - && field.data_type() == &DataType::Int32 - { - let array = Int32Array::from(vec![self.value; 3]); - Some(Arc::new(ConstantGenerator(array))) - } else { - None - } - } - } - - // A generator that depends on another column - #[derive(Debug)] - struct MultiplyByTwoGenerator { - dependency: String, - } - - impl MissingColumnGenerator for MultiplyByTwoGenerator { - fn generate( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { - let idx = batch - .schema() - .index_of(&self.dependency) - .expect("dependency should exist"); - let col = batch.column(idx); - let col = col.as_primitive::(); - - let result: Int32Array = col.iter().map(|v| v.map(|x| x * 2)).collect(); - Ok(Arc::new(result)) - } - - fn 
dependencies(&self) -> Vec { - vec![self.dependency.clone()] - } - } - - #[derive(Debug)] - struct MultiplyByTwoGeneratorFactory; - - impl fmt::Display for MultiplyByTwoGeneratorFactory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "MultiplyByTwoGeneratorFactory") - } - } - - impl MissingColumnGeneratorFactory for MultiplyByTwoGeneratorFactory { - fn create( - &self, - field: &Field, - file_schema: &Schema, - ) -> Option> { - if field.name() == "doubled_id" && field.data_type() == &DataType::Int32 { - // Look for id column to use as our dependency - if file_schema.column_with_name("id").is_some() { - Some(Arc::new(MultiplyByTwoGenerator { - dependency: "id".to_string(), - })) - } else { - None - } - } else { - None - } - } - } - - // Test with a constant generator - let table_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("c1", DataType::Boolean, true), - Field::new("missing_column", DataType::Int32, true), - ])); - - let file_schema = Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("c1", DataType::Boolean, true), - ]); - - // Create a factory that will generate a constant value for the missing column - let generator_factory = Arc::new(ConstantGeneratorFactory { - field_name: "missing_column".to_string(), - value: 42, - }); - - let adapter = DefaultSchemaAdapterFactory::default() - .with_column_generator(generator_factory) - .create(table_schema.clone(), table_schema.clone()); - - let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); - - // Create a batch to test - let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); - let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]); - let batch = RecordBatch::try_new( - Arc::new(file_schema.clone()), - vec![Arc::new(id), Arc::new(c1)], - ) - .unwrap(); - - let projected = batch.project(&projection).unwrap(); - let mapped_batch = mapping.map_batch(projected).unwrap(); - - // Verify the result - assert_eq!(mapped_batch.schema(), table_schema); - assert_eq!(mapped_batch.num_columns(), 3); - assert_eq!(mapped_batch.num_rows(), 3); - - // Check the missing column was generated with constant value - let missing_col = mapped_batch.column(2).as_primitive::(); - assert_eq!(missing_col.value(0), 42); - assert_eq!(missing_col.value(1), 42); - assert_eq!(missing_col.value(2), 42); - - // Test with a generator that depends on another column - let table_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("doubled_id", DataType::Int32, true), - ])); - - let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]); - - // Set up the generator factory - let adapter = DefaultSchemaAdapterFactory::default() - .with_column_generator(Arc::new(MultiplyByTwoGeneratorFactory)) - .create(table_schema.clone(), table_schema.clone()); - - let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); - - // Create a batch with just an id column - let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); - let batch = - RecordBatch::try_new(Arc::new(file_schema.clone()), vec![Arc::new(id)]) - .unwrap(); - - let projected = batch.project(&projection).unwrap(); - let mapped_batch = mapping.map_batch(projected).unwrap(); - - // Verify the result - assert_eq!(mapped_batch.schema(), table_schema); - assert_eq!(mapped_batch.num_columns(), 2); - assert_eq!(mapped_batch.num_rows(), 3); - - // Check the doubled_id column was generated correctly - let id_col = 
mapped_batch.column(0).as_primitive::(); - let doubled_col = mapped_batch.column(1).as_primitive::(); - - assert_eq!(doubled_col.value(0), id_col.value(0) * 2); - assert_eq!(doubled_col.value(1), id_col.value(1) * 2); - assert_eq!(doubled_col.value(2), id_col.value(2) * 2); - } - - #[test] - fn schema_adapter_with_multiple_generators() { - use crate::datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, MissingColumnGenerator, - MissingColumnGeneratorFactory, - }; - use arrow::array::{ArrayRef, Int32Array, StringArray}; - use arrow::datatypes::Int32Type; - use std::fmt; - - // A generator for creating a description from an id - #[derive(Debug)] - struct IdToDescriptionGenerator; - - impl MissingColumnGenerator for IdToDescriptionGenerator { - fn generate( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { - let idx = batch.schema().index_of("id").expect("id should exist"); - let col = batch.column(idx); - let col = col.as_primitive::(); - - let result: StringArray = col - .iter() - .map(|v| { - v.map(|id| match id { - 1 => "Product One", - 2 => "Product Two", - 3 => "Product Three", - _ => "Unknown Product", - }) - }) - .collect(); - Ok(Arc::new(result)) - } - - fn dependencies(&self) -> Vec { - vec!["id".to_string()] - } - } - - #[derive(Debug)] - struct IdToDescriptionGeneratorFactory; - - impl fmt::Display for IdToDescriptionGeneratorFactory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "IdToDescriptionGeneratorFactory") - } - } - - impl MissingColumnGeneratorFactory for IdToDescriptionGeneratorFactory { - fn create( - &self, - field: &Field, - file_schema: &Schema, - ) -> Option> { - if field.name() == "description" && field.data_type() == &DataType::Utf8 { - if file_schema.column_with_name("id").is_some() { - Some(Arc::new(IdToDescriptionGenerator)) - } else { - None - } - } else { - None - } - } - } - - // A generator for creating a score column with constant value - #[derive(Debug)] - struct ScoreGenerator(i32); - - impl MissingColumnGenerator for ScoreGenerator { - fn generate( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { - let len = batch.num_rows(); - Ok(Arc::new(Int32Array::from(vec![self.0; len]))) - } - - fn dependencies(&self) -> Vec { - vec![] - } - } - - #[derive(Debug)] - struct ScoreGeneratorFactory; - - impl fmt::Display for ScoreGeneratorFactory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "ScoreGeneratorFactory") - } - } - - impl MissingColumnGeneratorFactory for ScoreGeneratorFactory { - fn create( - &self, - field: &Field, - _file_schema: &Schema, - ) -> Option> { - if field.name() == "score" && field.data_type() == &DataType::Int32 { - Some(Arc::new(ScoreGenerator(100))) - } else { - None - } - } - } - - // Set up the schema with multiple missing columns - let table_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("description", DataType::Utf8, true), - Field::new("score", DataType::Int32, true), - ])); - - let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]); - - // Create factory that will generate multiple missing columns - let adapter = DefaultSchemaAdapterFactory::default() - .with_column_generator(Arc::new(IdToDescriptionGeneratorFactory)) - .with_column_generator(Arc::new(ScoreGeneratorFactory)) - .create(table_schema.clone(), table_schema.clone()); - - let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); - - // Create a batch to test - let id = Int32Array::from(vec![Some(1), 
Some(2), Some(3)]); - let batch = - RecordBatch::try_new(Arc::new(file_schema.clone()), vec![Arc::new(id)]) - .unwrap(); - - let projected = batch.project(&projection).unwrap(); - let mapped_batch = mapping.map_batch(projected).unwrap(); - - // Verify the result - assert_eq!(mapped_batch.schema(), table_schema); - assert_eq!(mapped_batch.num_columns(), 3); - assert_eq!(mapped_batch.num_rows(), 3); - - // Check both missing columns were generated correctly - let id_col = mapped_batch.column(0).as_primitive::(); - let description_col = mapped_batch.column(1).as_string::(); - let score_col = mapped_batch.column(2).as_primitive::(); - - // Verify id column - assert_eq!(id_col.value(0), 1); - assert_eq!(id_col.value(1), 2); - assert_eq!(id_col.value(2), 3); - - // Verify description column generated from id - assert_eq!(description_col.value(0), "Product One"); - assert_eq!(description_col.value(1), "Product Two"); - assert_eq!(description_col.value(2), "Product Three"); - - // Verify score column has constant value - assert_eq!(score_col.value(0), 100); - assert_eq!(score_col.value(1), 100); - assert_eq!(score_col.value(2), 100); - } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index a6352a3e5140..5280d013ec84 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -52,10 +52,6 @@ mod tests { use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_scan_config::FileScanConfig; - use datafusion_datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, MissingColumnGenerator, - MissingColumnGeneratorFactory, SchemaAdapterFactory, - }; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::{FileRange, PartitionedFile}; @@ -94,7 +90,6 @@ mod tests { /// options. #[derive(Debug, Default)] struct RoundTrip { - schema_adapter_factory: Option>, projection: Option>, schema: Option, predicate: Option, @@ -107,14 +102,6 @@ mod tests { Default::default() } - fn with_schema_adapter_factory( - mut self, - schema_adapter_factory: Arc, - ) -> Self { - self.schema_adapter_factory = Some(schema_adapter_factory); - self - } - fn with_projection(mut self, projection: Vec) -> Self { self.projection = Some(projection); self @@ -156,7 +143,6 @@ mod tests { predicate, pushdown_predicate, page_index_predicate, - schema_adapter_factory, } = self; let file_schema = match schema { @@ -188,10 +174,6 @@ mod tests { .with_reorder_filters(true); } - if let Some(schema_adapter_factory) = schema_adapter_factory { - source = source.with_schema_adapter_factory(schema_adapter_factory); - } - if page_index_predicate { source = source.with_enable_page_index(true); } @@ -259,104 +241,45 @@ mod tests { // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, // the default behavior is to fill in missing columns with nulls. // Thus this predicate will come back as false. 
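        // (Under SQL three-valued logic `c2 = 1` evaluates to NULL rather than true
        // when `c2` is NULL, so a conjunct on the null-filled column can never match
        // and every row is pruned.)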
-        let filter = col("c1").eq(lit(1_i32)).and(col("c2").eq(lit(1_i32)));
-
-        let read = RoundTrip::new()
-            .with_schema(table_schema)
-            .with_predicate(filter)
+        let filter = col("c2").eq(lit(1_i32));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
             .with_pushdown_predicate()
-            .round_trip_to_batches(vec![batch])
-            .await
-            .unwrap();
-        let total_rows = read.iter().map(|b| b.num_rows()).sum::<usize>();
+            .round_trip(vec![batch.clone()])
+            .await;
+        let total_rows = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
         assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
-    }
-
-    #[tokio::test]
-    async fn test_pushdown_with_column_generators() {
-        #[derive(Debug, Clone)]
-        struct ConstantMissingColumnGenerator {
-            default_value: ScalarValue,
-        }
-
-        impl MissingColumnGenerator for ConstantMissingColumnGenerator {
-            fn dependencies(&self) -> Vec<String> {
-                vec![]
-            }
-
-            fn generate(&self, batch: RecordBatch) -> Result<ArrayRef> {
-                let num_rows = batch.num_rows();
-                let array = self.default_value.to_array_of_size(num_rows)?;
-                Ok(array)
-            }
-        }
-
-        #[derive(Debug, Clone)]
-        struct ConstantMissingColumnGeneratorFactory {
-            column: String,
-            default_value: ScalarValue,
-        }
-
-        impl MissingColumnGeneratorFactory for ConstantMissingColumnGeneratorFactory {
-            fn create(
-                &self,
-                field: &Field,
-                _file_schema: &Schema,
-            ) -> Option<Arc<dyn MissingColumnGenerator>> {
-                if *field.name() == self.column {
-                    Some(Arc::new(ConstantMissingColumnGenerator {
-                        default_value: self.default_value.clone(),
-                    }))
-                } else {
-                    None
-                }
-            }
-        }
-
-        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
-
-        let file_schema =
-            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
-
-        let table_schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, true),
-            Field::new("c2", DataType::Int32, true),
-        ]));
-
-        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
-
-        let filter = col("c2").eq(lit(1_i32));
-
-        let missing_column_generator_factory =
-            Arc::new(ConstantMissingColumnGeneratorFactory {
-                column: "c2".to_string(),
-                default_value: ScalarValue::Int32(Some(1)),
-            });
-
-        let schema_adapter_factory = Arc::new(
-            DefaultSchemaAdapterFactory::default()
-                .with_column_generator(missing_column_generator_factory),
-        );
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 3, "Expected all rows to be pruned");

-        let read = RoundTrip::new()
-            .with_schema(table_schema)
-            .with_predicate(filter)
+        // If we explicitly allow nulls the rest of the predicate should work
+        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
             .with_pushdown_predicate()
-            .with_schema_adapter_factory(schema_adapter_factory)
-            .round_trip_to_batches(vec![batch])
-            .await
-            .unwrap();
+            .round_trip(vec![batch.clone()])
+            .await;
+        let batches = rt.batches.unwrap();
         #[rustfmt::skip]
         let expected = [
             "+----+----+",
             "| c1 | c2 |",
             "+----+----+",
-            "| 1  | 1  |",
-            "| 2  | 1  |",
-            "| 3  | 1  |",
+            "| 1  |    |",
             "+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 2, "Expected 2 of the 3 rows to be pruned");
     }

     #[tokio::test]
diff --git a/datafusion/datasource-parquet/src/row_filter.rs
b/datafusion/datasource-parquet/src/row_filter.rs index 264c13735a18..810b3c1bf71c 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -518,9 +518,9 @@ pub fn build_row_filter( mod test { use super::*; use datafusion_common::ScalarValue; - use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use arrow::datatypes::{Field, Fields, TimeUnit::Nanosecond}; + use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion_expr::{col, Expr}; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::metrics::{Count, Time}; @@ -547,7 +547,7 @@ mod test { let expr = col("int64_list").is_not_null(); let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default()); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); let table_schema = Arc::new(table_schema.clone()); let candidate = FilterCandidateBuilder::new( @@ -586,7 +586,7 @@ mod test { ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))), )); let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default()); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); let table_schema = Arc::new(table_schema.clone()); let candidate = FilterCandidateBuilder::new( expr, @@ -626,7 +626,7 @@ mod test { ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))), )); let expr = logical2physical(&expr, &table_schema); - let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default()); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); let candidate = FilterCandidateBuilder::new( expr, file_schema, diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index ca28a882acb0..683d62a1df49 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -473,7 +473,7 @@ impl FileSource for ParquetSource { let schema_adapter_factory = self .schema_adapter_factory .clone() - .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default())); + .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); let parquet_file_reader_factory = self.parquet_file_reader_factory.clone().unwrap_or_else(|| { diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index 23313bfe872f..4164cda8cba1 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -21,12 +21,10 @@ //! physical format into how they should be used by DataFusion. For instance, a schema //! can be stored external to a parquet file that maps parquet logical types to arrow types. -use arrow::array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions}; +use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions}; use arrow::compute::{can_cast_types, cast}; -use arrow::datatypes::{Field, Schema, SchemaRef}; +use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::plan_err; -use itertools::Itertools; -use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -100,46 +98,6 @@ pub trait SchemaMapper: Debug + Send + Sync { fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; } -pub trait MissingColumnGeneratorFactory: Debug + Send + Sync { - /// Create a [`MissingColumnGenerator`] for the given `field` and `file_schema`. 
- /// Returns None if the column cannot be generated by this generator. - /// Otherwise, returns a [`MissingColumnGenerator`] that can generate the missing column. - fn create( - &self, - field: &Field, - file_schema: &Schema, - ) -> Option>; -} - -pub trait MissingColumnGenerator: Debug + Send + Sync { - /// Generate a missing column for the given `field` from the provided `batch`. - /// When this method is called `batch` will contain all of the columns declared as dependencies in `dependencies`. - /// If the column cannot be generated, this method should return an error. - /// Otherwise, it should return the generated column as an `ArrayRef`. - /// No casting or post processing is done by this method, so the generated column should match the data type - /// of the `field` it is being generated for. - /// The order of - fn generate(&self, batch: RecordBatch) -> datafusion_common::Result; - - /// Returns a list of column names that this generator depends on to generate the missing column. - /// This is used when creating the `RecordBatch` to ensure that all dependencies are present before calling `generate`. - /// The dependencies do not need to be declared in any particular order. - fn dependencies(&self) -> Vec; -} - -pub type ColumnGeneratorFactories = - Vec>; - -#[derive(Debug)] -enum FieldSource { - /// The field is present in the (projected) file schema at the given index - Table(usize), - /// The field is generated by the given generator - Generated(Arc), - /// The field will be populated with null - Null, -} - /// Default [`SchemaAdapterFactory`] for mapping schemas. /// /// This can be used to adapt file-level record batches to a table schema and @@ -226,13 +184,7 @@ enum FieldSource { /// assert_eq!(mapped_batch, expected_batch); /// ``` #[derive(Clone, Debug, Default)] -pub struct DefaultSchemaAdapterFactory { - /// Optional generator for missing columns - /// - /// This is used to fill in missing columns with a default value other than null. - /// If this is `None`, then missing columns will be filled with nulls. - column_generators: ColumnGeneratorFactories, -} +pub struct DefaultSchemaAdapterFactory; impl DefaultSchemaAdapterFactory { /// Create a new factory for mapping batches from a file schema to a table @@ -242,27 +194,7 @@ impl DefaultSchemaAdapterFactory { /// the same schema for both the projected table schema and the table /// schema. 
pub fn from_schema(table_schema: SchemaRef) -> Box { - Self { - column_generators: vec![], - } - .create(Arc::clone(&table_schema), table_schema) - } - - pub fn with_column_generator( - self, - generator: Arc, - ) -> Self { - let mut generators = self.column_generators; - generators.push(generator); - Self { - column_generators: generators, - } - } - - pub fn with_column_generators(self, generators: ColumnGeneratorFactories) -> Self { - Self { - column_generators: generators, - } + Self.create(Arc::clone(&table_schema), table_schema) } } @@ -274,7 +206,6 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { ) -> Box { Box::new(DefaultSchemaAdapter { projected_table_schema, - column_generators: self.column_generators.clone(), }) } } @@ -286,8 +217,6 @@ pub(crate) struct DefaultSchemaAdapter { /// The schema for the table, projected to include only the fields being output (projected) by the /// associated ParquetSource projected_table_schema: SchemaRef, - /// The column generators to use when a column is missing - column_generators: ColumnGeneratorFactories, } impl SchemaAdapter for DefaultSchemaAdapter { @@ -313,122 +242,40 @@ impl SchemaAdapter for DefaultSchemaAdapter { &self, file_schema: &Schema, ) -> datafusion_common::Result<(Arc, Vec)> { - // Projection is the indexes into the file schema that we need to read - // Note that readers will NOT respect the order of the columns in projection let mut projection = Vec::with_capacity(file_schema.fields().len()); + let mut field_mappings = vec![None; self.projected_table_schema.fields().len()]; - // Additions to the projection which will be needed to generate missing columns - // TODO can we refactor this away? - let mut dependency_projection = Vec::with_capacity(file_schema.fields().len()); - - let mut field_sources = - Vec::with_capacity(self.projected_table_schema.fields().len()); - - for field in self.projected_table_schema.fields() { - if let Some((file_idx, file_field)) = file_schema.fields.find(field.name()) { - // If the field exists in the file schema, check if we can cast it to the table schema - match can_cast_types(file_field.data_type(), field.data_type()) { + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if let Some((table_idx, table_field)) = + self.projected_table_schema.fields().find(file_field.name()) + { + match can_cast_types(file_field.data_type(), table_field.data_type()) { true => { + field_mappings[table_idx] = Some(projection.len()); projection.push(file_idx); - field_sources.push(FieldSource::Table(projection.len() - 1)); } false => { return plan_err!( "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", file_field.name(), file_field.data_type(), - field.data_type() + table_field.data_type() ) } } - } else if let Some(generator) = self - .column_generators - .iter() - .find_map(|factory| factory.create(field, file_schema)) - { - let dependencies = generator.dependencies(); - let mut dependency_indices = Vec::with_capacity(dependencies.len()); - for dep in &dependencies { - if let Ok(dep_idx) = file_schema.index_of(dep) { - dependency_indices.push(dep_idx); - } else { - return plan_err!( - "Generated column {} depends on column {} but column {} is not present in the file schema, columns present: {:?}", - field.name(), - dep, - dep, - file_schema.fields().iter().map(|f| f.name()).collect::>() - ); - } - } - dependency_projection.extend(dependency_indices); - field_sources.push(FieldSource::Generated(generator)); - } else if field.is_nullable() { - 
field_sources.push(FieldSource::Null); - } else { - return plan_err!( - "Column {} is missing from the file schema, cannot be generated, and is non-nullable", - field.name() - ); - } - } - - for dep in dependency_projection { - if !projection.contains(&dep) { - projection.push(dep); } } - let (sorted_projection, field_sources) = - sort_projections_and_sources(&projection, field_sources); - - debug_assert!(sorted_projection.is_sorted()); - debug_assert!(sorted_projection.iter().all_unique()); - Ok(( Arc::new(SchemaMapping { - projected_table_schema: self.projected_table_schema.clone(), - field_sources, + projected_table_schema: Arc::clone(&self.projected_table_schema), + field_mappings, }), - sorted_projection, + projection, )) } } -/// The parquet reader needs projections to be sorted (it does not respect the order of the columns in the projection, only the values) -/// This function adjusts the projections and mappings so that they all reference sorted projections -fn sort_projections_and_sources( - projection: &[usize], - mut field_sources: Vec, -) -> (Vec, Vec) { - // Sort projection and create a mapping from old to new positions - let mut sorted_projection = projection.to_vec(); - sorted_projection.sort_unstable(); - - // Create a mapping from old projection values to their new positions - let mut new_position_map = HashMap::new(); - for (new_pos, &proj_val) in sorted_projection.iter().enumerate() { - new_position_map.insert(proj_val, new_pos); - } - - // Create a mapping from old positions to new positions in the projected schema - let mut position_mapping = vec![0; projection.len()]; - for (old_pos, &proj_val) in projection.iter().enumerate() { - position_mapping[old_pos] = *new_position_map - .get(&proj_val) - .expect("should always be present"); - } - - // Update field_sources to reflect the new positions - for source in &mut field_sources { - if let FieldSource::Table(pos) = source { - *pos = position_mapping[*pos]; - } - } - - (sorted_projection, field_sources) -} - /// The SchemaMapping struct holds a mapping from the file schema to the table /// schema and any necessary type conversions. /// @@ -443,23 +290,22 @@ pub struct SchemaMapping { /// The schema of the table. This is the expected schema after conversion /// and it should match the schema of the query result. projected_table_schema: SchemaRef, - /// A mapping from the fields in `projected_table_schema`` to the way to materialize - /// them from the projected file schema. - field_sources: Vec, + /// Mapping from field index in `projected_table_schema` to index in + /// projected file_schema. + /// + /// They are Options instead of just plain `usize`s because the table could + /// have fields that don't exist in the file. + field_mappings: Vec>, } impl SchemaMapper for SchemaMapping { /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and /// conversions. + /// The produced RecordBatch has a schema that contains only the projected columns. 
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); - debug_assert_eq!( - self.projected_table_schema.fields().len(), - self.field_sources.len() - ); - let cols = self .projected_table_schema // go through each field in the projected schema @@ -467,27 +313,24 @@ impl SchemaMapper for SchemaMapping { .iter() // and zip it with the index that maps fields from the projected table schema to the // projected file schema in `batch` - .zip(&self.field_sources) + .zip(&self.field_mappings) // and for each one... - .map(|(field, source)| -> datafusion_common::Result<_> { - let column = match source { - FieldSource::Table(file_idx) => batch_cols[*file_idx].clone(), - FieldSource::Generated(generator) => { - generator.generate(batch.clone())? - } - FieldSource::Null => { - debug_assert!(field.is_nullable()); - new_null_array(field.data_type(), batch_rows) - } - }; - Ok(cast(&column, field.data_type())?) + .map(|(field, file_idx)| { + file_idx.map_or_else( + // If this field only exists in the table, and not in the file, then we know + // that it's null, so just return that. + || Ok(new_null_array(field.data_type(), batch_rows)), + // However, if it does exist in both, then try to cast it to the correct output + // type + |batch_idx| cast(&batch_cols[batch_idx], field.data_type()), + ) }) .collect::, _>>()?; // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let schema = self.projected_table_schema.clone(); + let schema = Arc::clone(&self.projected_table_schema); let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } From 369e385b825e5eaa3fe6fb69c98e7dfcd3519e80 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Mar 2025 20:35:54 -0500 Subject: [PATCH 11/17] more reverts --- datafusion/core/src/datasource/mod.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 69483b343855..fd186fbfe7dd 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -276,8 +276,14 @@ mod tests { ]); let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema)); - let err = adapter.map_schema(&file_schema).unwrap_err().to_string(); - assert!(err.contains("Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable")); + let (mapper, indices) = adapter.map_schema(&file_schema).unwrap(); + assert_eq!(indices, vec![0]); + + let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap(); + + // Mapping fails because it tries to fill in a non-nullable column with nulls + let err = mapper.map_batch(file_batch).unwrap_err().to_string(); + assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}"); } #[derive(Debug)] From 3f49dced9e8960b6713d21d6beec8ab2887f687f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Mar 2025 20:49:05 -0500 Subject: [PATCH 12/17] fix --- datafusion/datasource-parquet/src/row_filter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 810b3c1bf71c..9724ee9de59d 100644 --- 
a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -107,7 +107,7 @@ pub(crate) struct DatafusionArrowPredicate { /// how long was spent evaluating this predicate time: metrics::Time, /// used to perform type coercion while filtering rows - schema_mapping: Arc, + schema_mapper: Arc, } impl DatafusionArrowPredicate { @@ -132,7 +132,7 @@ impl DatafusionArrowPredicate { rows_pruned, rows_matched, time, - schema_mapping: candidate.schema_mapper, + schema_mapper: candidate.schema_mapper, }) } } @@ -143,7 +143,7 @@ impl ArrowPredicate for DatafusionArrowPredicate { } fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult { - let batch = self.schema_mapping.map_batch(batch)?; + let batch = self.schema_mapper.map_batch(batch)?; // scoped timer updates on drop let mut timer = self.time.timer(); From 09062dde177421e6d4d8839b25f50fb1eb6affb1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 17 Mar 2025 16:12:48 -0400 Subject: [PATCH 13/17] Reduce Schema clones in predicate --- datafusion-testing | 2 +- datafusion/datasource-parquet/src/opener.rs | 2 +- datafusion/datasource-parquet/src/row_filter.rs | 15 ++++++--------- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/datafusion-testing b/datafusion-testing index 5b424aefd7f6..3462eaa78745 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit 5b424aefd7f6bf198220c37f59d39dbb25b47695 +Subproject commit 3462eaa787459957e38df267a4a21f5bea605807 diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 38a06aa94162..fed5e6d5d59c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -174,7 +174,7 @@ impl FileOpener for ParquetOpener { builder.metadata(), reorder_predicates, &file_metrics, - schema_adapter_factory.clone(), + &schema_adapter_factory, ); match row_filter { diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 9724ee9de59d..b2f40d1ca48e 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -449,12 +449,12 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result, - file_schema: &Schema, - table_schema: &Schema, + file_schema: &SchemaRef, + table_schema: &SchemaRef, metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, - schema_adapter_factory: Arc, + schema_adapter_factory: &Arc, ) -> Result> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; @@ -464,18 +464,15 @@ pub fn build_row_filter( // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`] let predicates = split_conjunction(expr); - let file_schema = Arc::new(file_schema.clone()); - let table_schema = Arc::new(table_schema.clone()); - // Determine which conjuncts can be evaluated as ArrowPredicates, if any let mut candidates: Vec = predicates .into_iter() .map(|expr| { FilterCandidateBuilder::new( Arc::clone(expr), - file_schema.clone(), - table_schema.clone(), - schema_adapter_factory.clone(), + Arc::clone(file_schema), + Arc::clone(table_schema), + Arc::clone(schema_adapter_factory), ) .build(metadata) }) From 8252b62eb4346b24ef7c9b26f54f9b84e4844ffc Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 17 Mar 2025 16:55:29 -0500 Subject: [PATCH 14/17] add more tests --- 
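Note on the tests that follow: they all rely on the default schema adapter filling a table column that is absent from the file with nulls. Below is a minimal, self-contained sketch of that behavior, using only the `DefaultSchemaAdapterFactory::from_schema`, `map_schema` and `map_batch` calls already exercised in the datasource test above; the schemas and values here are illustrative.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array, RecordBatch};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;

    fn null_fill_sketch() -> datafusion_common::Result<()> {
        // The table exposes c1 and c2, but the file only stores c1.
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);

        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema));
        let (mapper, projection) = adapter.map_schema(&file_schema)?;
        // Only c1 needs to be read from the file.
        assert_eq!(projection, vec![0]);

        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let file_batch = RecordBatch::try_new(Arc::new(file_schema), vec![c1])?;
        let mapped = mapper.map_batch(file_batch)?;

        // The mapped batch has the table schema, and the missing c2 column comes
        // back as all nulls; this is why a predicate like `c2 = 'abc'` prunes
        // every row in the tests below.
        assert_eq!(mapped.schema(), table_schema);
        assert_eq!(mapped.column(1).null_count(), mapped.num_rows());
        Ok(())
    }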
datafusion-testing | 2 +- .../src/datasource/physical_plan/parquet.rs | 182 ++++++++++++++++++ 2 files changed, 183 insertions(+), 1 deletion(-) diff --git a/datafusion-testing b/datafusion-testing index 3462eaa78745..5b424aefd7f6 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit 3462eaa787459957e38df267a4a21f5bea605807 +Subproject commit 5b424aefd7f6bf198220c37f59d39dbb25b47695 diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 5280d013ec84..aee278404b7f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -282,6 +282,188 @@ mod tests { assert_eq!(metric, 2, "Expected all rows to be pruned"); } + #[tokio::test] + async fn test_pushdown_with_missing_column_in_file_multiple_types() { + let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + + let file_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap(); + + // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // the default behavior is to fill in missing columns with nulls. + // Thus this predicate will come back as false. + let filter = col("c2").eq(lit("abc")); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let total_rows = rt + .batches + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum::(); + assert_eq!(total_rows, 0, "Expected no rows to match the predicate"); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 3, "Expected all rows to be pruned"); + + // If we excplicitly allow nulls the rest of the predicate should work + let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let batches = rt.batches.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 1 | |", + "+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 2, "Expected all rows to be pruned"); + } + + #[tokio::test] + async fn test_pushdown_with_missing_middle_column() { + let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let c3 = Arc::new(Int32Array::from(vec![7, 8, 9])); + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c3", DataType::Int32, true), + ])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Utf8, true), + Field::new("c3", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); + + // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // the default behavior is to fill in missing columns with nulls. + // Thus this predicate will come back as false. 
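+ // c2 sits between c1 and c3 in the table schema; the default adapter matches
+ // c1 and c3 by name and fills the missing middle column with nulls in the
+ // mapped batch.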
+ let filter = col("c2").eq(lit("abc")); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let total_rows = rt + .batches + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum::(); + assert_eq!(total_rows, 0, "Expected no rows to match the predicate"); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 3, "Expected all rows to be pruned"); + + // If we excplicitly allow nulls the rest of the predicate should work + let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let batches = rt.batches.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+----+", + "| c1 | c2 | c3 |", + "+----+----+----+", + "| 1 | | 7 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 2, "Expected all rows to be pruned"); + } + + #[tokio::test] + async fn test_pushdown_with_file_column_order_mismatch() { + let c3 = Arc::new(Int32Array::from(vec![7, 8, 9])); + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c3", DataType::Int32, true), + Field::new("c3", DataType::Int32, true), + ])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Utf8, true), + Field::new("c3", DataType::Int32, true), + ])); + + let batch = + RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap(); + + // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // the default behavior is to fill in missing columns with nulls. + // Thus this predicate will come back as false. 
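+ // The file's column layout differs from the table schema's; the schema mapper
+ // matches columns by name rather than position, so the file's values still
+ // land in the table's c3 column.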
+ let filter = col("c2").eq(lit("abc")); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let total_rows = rt + .batches + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum::(); + assert_eq!(total_rows, 0, "Expected no rows to match the predicate"); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 3, "Expected all rows to be pruned"); + + // If we excplicitly allow nulls the rest of the predicate should work + let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32))); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let batches = rt.batches.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+----+", + "| c1 | c2 | c3 |", + "+----+----+----+", + "| | | 7 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 2, "Expected all rows to be pruned"); + } + #[tokio::test] async fn evolved_schema() { let c1: ArrayRef = From 5ec3c152cc4ac6072ac9a15fbea16ff393d4f018 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 17 Mar 2025 18:03:01 -0500 Subject: [PATCH 15/17] add another test --- .../src/datasource/physical_plan/parquet.rs | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index aee278404b7f..b5534d6b3d1c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -464,6 +464,87 @@ mod tests { assert_eq!(metric, 2, "Expected all rows to be pruned"); } + #[tokio::test] + async fn test_pushdown_with_missing_column_nested_conditions() { + // Create test data with c1 and c3 columns + let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + let c3: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])); + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c3", DataType::Int32, true), + ])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Int32, true), + Field::new("c3", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); + + // Test with complex nested AND/OR: + // (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL) + // Should return 1 row where c1=1 AND c3=10 (since c2 IS NULL is always true) + let filter = col("c1") + .eq(lit(1_i32)) + .or(col("c2").eq(lit(5_i32))) + .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null())); + + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + + let batches = rt.batches.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+----+", + "| c1 | c2 | c3 |", + "+----+----+----+", + "| 1 | | 10 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, 
"pushdown_rows_pruned"); + assert_eq!(metric, 4, "Expected 4 rows to be pruned"); + + // Test a more complex nested condition: + // (c1 < 3 AND c2 IS NOT NULL) OR (c3 > 20 AND c2 IS NULL) + // First part should return 0 rows (c2 IS NOT NULL is always false) + // Second part should return rows where c3 > 20 (3 rows: where c3 is 30, 40, 50) + let filter = col("c1") + .lt(lit(3_i32)) + .and(col("c2").is_not_null()) + .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null())); + + let rt = RoundTrip::new() + .with_schema(table_schema) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch]) + .await; + + let batches = rt.batches.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+----+", + "| c1 | c2 | c3 |", + "+----+----+----+", + "| 3 | | 30 |", + "| 4 | | 40 |", + "| 5 | | 50 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 2, "Expected 2 rows to be pruned"); + } + #[tokio::test] async fn evolved_schema() { let c1: ArrayRef = From 37b4e0348b398d905ab76f70f83f416bff8d8655 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 18 Mar 2025 10:58:49 -0400 Subject: [PATCH 16/17] Fix datafusion testing pin --- datafusion-testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-testing b/datafusion-testing index 5b424aefd7f6..3462eaa78745 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit 5b424aefd7f6bf198220c37f59d39dbb25b47695 +Subproject commit 3462eaa787459957e38df267a4a21f5bea605807 From 2620c6a46fba979d3c7a17a1c9cb32f52c1d5b1b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 18 Mar 2025 12:48:12 -0500 Subject: [PATCH 17/17] fix clippy --- datafusion-testing | 1 - datafusion/datasource-parquet/src/row_filter.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) delete mode 160000 datafusion-testing diff --git a/datafusion-testing b/datafusion-testing deleted file mode 160000 index 3462eaa78745..000000000000 --- a/datafusion-testing +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 3462eaa787459957e38df267a4a21f5bea605807 diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index b2f40d1ca48e..da6bf114d71d 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -119,7 +119,7 @@ impl DatafusionArrowPredicate { rows_matched: metrics::Count, time: metrics::Time, ) -> Result { - let projected_schema = candidate.filter_schema.clone(); + let projected_schema = Arc::clone(&candidate.filter_schema); let physical_expr = reassign_predicate_columns(candidate.expr, &projected_schema, true)?; @@ -273,7 +273,7 @@ impl FilterCandidateBuilder { let (schema_mapper, projection_into_file_schema) = self .schema_adapter_factory - .create(projected_table_schema.clone(), self.table_schema) + .create(Arc::clone(&projected_table_schema), self.table_schema) .map_schema(&self.file_schema)?; let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?;