diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index 43dc592b997e..bb1cf3c8f78d 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -504,7 +504,7 @@ impl TableProvider for IndexTableProvider {
             .with_file(partitioned_file);

         // Finally, put it all together into a DataSourceExec
-        Ok(file_scan_config.new_exec())
+        Ok(file_scan_config.build())
     }

     /// Tell DataFusion to push filters down to the scan method
diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index f465699abed2..3851dca2a775 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -258,7 +258,7 @@ impl TableProvider for IndexTableProvider {
                 file_size,
             ));
         }
-        Ok(file_scan_config.new_exec())
+        Ok(file_scan_config.build())
     }

     /// Tell DataFusion to push filters down to the scan method
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index dd56b4c137ed..09121eba6702 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -171,11 +171,10 @@ impl FileFormat for ArrowFormat {
     async fn create_physical_plan(
         &self,
         _state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        conf = conf.with_source(Arc::new(ArrowSource::default()));
-        Ok(conf.new_exec())
+        Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
     }

     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 100aa4fd51e2..c0c8f25722c2 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -148,11 +148,10 @@ impl FileFormat for AvroFormat {
     async fn create_physical_plan(
         &self,
         _state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        conf = conf.with_source(self.file_source());
-        Ok(conf.new_exec())
+        Ok(conf.with_source(self.file_source()).build())
     }

     fn file_source(&self) -> Arc<dyn FileSource> {
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 7d06648d7ba8..4991a96dc3d3 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -434,9 +434,7 @@ impl FileFormat for CsvFormat {
                 .with_terminator(self.options.terminator)
                 .with_comment(self.options.comment),
         );
-        conf = conf.with_source(source);
-
-        Ok(conf.new_exec())
+        Ok(conf.with_source(source).build())
     }

     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 32a527bc5876..94e74b144499 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -254,9 +254,7 @@ impl FileFormat for JsonFormat {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let source = Arc::new(JsonSource::new());
         conf.file_compression_type = FileCompressionType::from(self.options.compression);
-        conf = conf.with_source(source);
-
-        Ok(conf.new_exec())
+        Ok(conf.with_source(source).build())
     }

     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 9774792133cd..7dbc510eca09 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -398,7 +398,7 @@ impl FileFormat for ParquetFormat {
     async fn create_physical_plan(
         &self,
         _state: &dyn Session,
-        mut conf: FileScanConfig,
+        conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut predicate = None;
@@ -424,8 +424,7 @@ impl FileFormat for ParquetFormat {
         if let Some(metadata_size_hint) = metadata_size_hint {
             source = source.with_metadata_size_hint(metadata_size_hint)
         }
-        conf = conf.with_source(Arc::new(source));
-        Ok(conf.new_exec())
+        Ok(conf.with_source(Arc::new(source)).build())
     }

     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index 6aa330caffab..b0a1d8c8c9e2 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -399,7 +399,7 @@ mod tests {
             .with_file(meta.into())
             .with_projection(Some(vec![0, 1, 2]));

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();
         assert_eq!(
             source_exec
                 .properties()
@@ -472,7 +472,7 @@ mod tests {
             .with_file(meta.into())
             .with_projection(projection);

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();
         assert_eq!(
             source_exec
                 .properties()
@@ -546,7 +546,7 @@ mod tests {
             .with_file(partitioned_file)
             .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);

-        let source_exec = conf.new_exec();
+        let source_exec = conf.build();

         assert_eq!(
             source_exec
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 5e017b992581..c0952229b5e0 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -425,7 +425,7 @@ impl ExecutionPlan for CsvExec {
 /// let file_scan_config = FileScanConfig::new(object_store_url, file_schema, source)
 ///     .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
 ///     .with_newlines_in_values(true); // The file contains newlines in values;
-/// let exec = file_scan_config.new_exec();
+/// let exec = file_scan_config.build();
 /// ```
 #[derive(Debug, Clone, Default)]
 pub struct CsvSource {
@@ -836,14 +836,14 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_file_compression_type(file_compression_type)
-            .with_newlines_in_values(false);
-        config.projection = Some(vec![0, 2, 4]);
-
-        let csv = config.new_exec();
+            .with_newlines_in_values(false)
+            .with_projection(Some(vec![0, 2, 4]));

         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
+
         assert_eq!(3, csv.schema().fields().len());

         let mut stream = csv.execute(0, task_ctx)?;
@@ -901,12 +901,12 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.projection = Some(vec![4, 0, 2]);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_projection(Some(vec![4, 0, 2]));
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(3, csv.schema().fields().len());

         let mut stream = csv.execute(0, task_ctx)?;
@@ -964,12 +964,12 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.limit = Some(5);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_limit(Some(5));
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(13, csv.schema().fields().len());

         let mut it = csv.execute(0, task_ctx)?;
@@ -1024,12 +1024,12 @@ mod tests {
         )?;

         let source = Arc::new(CsvSource::new(true, b',', b'"'));
-        let mut config = partitioned_csv_config(file_schema, file_groups, source)
+        let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
-            .with_file_compression_type(file_compression_type.to_owned());
-        config.limit = Some(5);
-        let csv = config.new_exec();
+            .with_file_compression_type(file_compression_type.to_owned())
+            .with_limit(Some(5));
         assert_eq!(14, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(14, csv.schema().fields().len());

         // errors due to https://github.com/apache/datafusion/issues/4918
@@ -1089,8 +1089,8 @@ mod tests {
         // we don't have `/date=xx/` in the path but that is ok because
         // partitions are resolved during scan anyway

-        let csv = config.new_exec();
         assert_eq!(13, config.file_schema.fields().len());
+        let csv = config.build();
         assert_eq!(2, csv.schema().fields().len());

         let mut it = csv.execute(0, task_ctx)?;
@@ -1179,7 +1179,7 @@ mod tests {
         let config = partitioned_csv_config(file_schema, file_groups, source)
             .with_newlines_in_values(false)
             .with_file_compression_type(file_compression_type.to_owned());
-        let csv = config.new_exec();
+        let csv = config.build();

         let it = csv.execute(0, task_ctx).unwrap();
         let batches: Vec<_> = it.try_collect().await.unwrap();
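Beyond the rename, the CSV tests above switch from mutating public fields (`config.projection = ...`, `config.limit = ...`) to the chained `with_projection`/`with_limit` builders, which also lets the `mut` bindings disappear. A self-contained sketch of that consuming-builder pattern with a toy config type (the names are illustrative, not DataFusion's):

```rust
use std::sync::Arc;

// Toy stand-in for FileScanConfig; names are illustrative, not DataFusion's.
#[derive(Default)]
struct ScanConfig {
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
}

impl ScanConfig {
    // Chainable setters take and return `self`, so no `mut` binding is needed.
    fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }
    fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }
    // Consuming build, like the new `FileScanConfig::build(self)`: no internal
    // clone is needed, unlike the old `new_exec(&self)`.
    fn build(self) -> Arc<ScanConfig> {
        Arc::new(self)
    }
}

fn main() {
    // The whole config becomes a single expression, mirroring the updated tests.
    let plan = ScanConfig::default()
        .with_projection(Some(vec![0, 2, 4]))
        .with_limit(Some(5))
        .build();
    assert_eq!(plan.projection, Some(vec![0, 2, 4]));
    assert_eq!(plan.limit, Some(5));
}
```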
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index e979eb49d0f6..123ecc2f9582 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -68,21 +68,30 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
     ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
 }

-/// The base configurations to provide when creating a physical plan for
+/// The base configurations for a [`DataSourceExec`], the physical plan for
 /// any given file format.
 ///
+/// Use [`Self::build`] to create a [`DataSourceExec`] from a [`FileScanConfig`].
+///
 /// # Example
 /// ```
 /// # use std::sync::Arc;
-/// # use arrow::datatypes::Schema;
+/// # use arrow::datatypes::{Field, Fields, DataType, Schema};
 /// # use datafusion::datasource::listing::PartitionedFile;
 /// # use datafusion::datasource::physical_plan::FileScanConfig;
 /// # use datafusion_execution::object_store::ObjectStoreUrl;
 /// # use datafusion::datasource::physical_plan::ArrowSource;
-/// # let file_schema = Arc::new(Schema::empty());
-/// // create FileScan config for reading data from file://
+/// # use datafusion_physical_plan::ExecutionPlan;
+/// # let file_schema = Arc::new(Schema::new(vec![
+/// #  Field::new("c1", DataType::Int32, false),
+/// #  Field::new("c2", DataType::Int32, false),
+/// #  Field::new("c3", DataType::Int32, false),
+/// #  Field::new("c4", DataType::Int32, false),
+/// # ]));
+/// // create FileScan config for reading arrow files from file://
 /// let object_store_url = ObjectStoreUrl::local_filesystem();
-/// let config = FileScanConfig::new(object_store_url, file_schema, Arc::new(ArrowSource::default()))
+/// let file_source = Arc::new(ArrowSource::default());
+/// let config = FileScanConfig::new(object_store_url, file_schema, file_source)
 ///     .with_limit(Some(1000))            // read only the first 1000 records
 ///     .with_projection(Some(vec![2, 3])) // project columns 2 and 3
 ///     // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
@@ -93,6 +102,8 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 ///     PartitionedFile::new("file2.parquet", 56),
 ///     PartitionedFile::new("file3.parquet", 78),
 /// ]);
+/// // create an execution plan from the config
+/// let plan: Arc<dyn ExecutionPlan> = config.build();
 /// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
@@ -252,19 +263,20 @@ impl DataSource for FileScanConfig {
         // If there is any non-column or alias-carrier expression, Projection should not be removed.
         // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
         Ok(all_alias_free_columns(projection.expr()).then(|| {
-            let mut file_scan = self.clone();
+            let file_scan = self.clone();
             let source = Arc::clone(&file_scan.source);
             let new_projections = new_projections_for_columns(
                 projection,
                 &file_scan
                     .projection
+                    .clone()
                     .unwrap_or((0..self.file_schema.fields().len()).collect()),
             );
-            file_scan.projection = Some(new_projections);
-            // Assign projected statistics to source
-            file_scan = file_scan.with_source(source);
-
-            file_scan.new_exec() as _
+            file_scan
+                // Assign projected statistics to source
+                .with_projection(Some(new_projections))
+                .with_source(source)
+                .build() as _
         }))
     }
 }
@@ -574,9 +586,9 @@ impl FileScanConfig {
     }

     // TODO: This function should be moved into DataSourceExec once FileScanConfig moved out of datafusion/core
-    /// Returns a new [`DataSourceExec`] from file configurations
-    pub fn new_exec(&self) -> Arc<DataSourceExec> {
-        Arc::new(DataSourceExec::new(Arc::new(self.clone())))
+    /// Returns a new [`DataSourceExec`] to scan the files specified by this config
+    pub fn build(self) -> Arc<DataSourceExec> {
+        Arc::new(DataSourceExec::new(Arc::new(self)))
     }

     /// Write the data_type based on file_source
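For reference, a minimal end-to-end sketch of the new API, mirroring the updated doc example above; the schema, file name, and size are placeholders, and the crate paths follow the imports shown in the doc comment:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfig};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;

fn main() {
    // Placeholder schema and file, as in the doc example above.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]));

    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        Arc::new(ArrowSource::default()),
    )
    .with_limit(Some(1000))
    .with_file(PartitionedFile::new("file1.parquet", 1234));

    // `build` consumes the config and wraps it in a DataSourceExec;
    // the resulting Arc<DataSourceExec> coerces to Arc<dyn ExecutionPlan>.
    let plan: Arc<dyn ExecutionPlan> = config.build();
    assert_eq!(plan.schema().fields().len(), 2);
}
```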
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 51e0a46d942e..590b1cb88dcd 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -589,7 +589,7 @@ mod tests {
             .with_file_groups(file_groups)
             .with_limit(Some(3))
             .with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();

         // TODO: this is not where schema inference should be tested
@@ -660,7 +660,7 @@ mod tests {
             .with_file_groups(file_groups)
             .with_limit(Some(3))
             .with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();

         let mut it = exec.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
@@ -700,7 +700,7 @@ mod tests {
             .with_file_groups(file_groups)
             .with_projection(Some(vec![0, 2]))
             .with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();

         let inferred_schema = exec.schema();
         assert_eq!(inferred_schema.fields().len(), 2);
@@ -745,7 +745,7 @@ mod tests {
             .with_file_groups(file_groups)
             .with_projection(Some(vec![3, 0, 2]))
             .with_file_compression_type(file_compression_type.to_owned());
-        let exec = conf.new_exec();
+        let exec = conf.build();

         let inferred_schema = exec.schema();
         assert_eq!(inferred_schema.fields().len(), 3);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a1c2bb4207ef..4bd43cd1aaca 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -708,7 +708,7 @@ mod tests {
             let session_ctx = SessionContext::new();
             let task_ctx = session_ctx.task_ctx();

-            let parquet_exec = base_config.new_exec();
+            let parquet_exec = base_config.clone().build();
             RoundTripResult {
                 batches: collect(parquet_exec.clone(), task_ctx).await,
                 parquet_exec,
@@ -1354,7 +1354,7 @@ mod tests {
             Arc::new(ParquetSource::default()),
         )
         .with_file_groups(file_groups)
-        .new_exec();
+        .build();
         assert_eq!(
             parquet_exec
                 .properties()
@@ -1468,7 +1468,7 @@ mod tests {
                 false,
             ),
         ])
-        .new_exec();
+        .build();
         let partition_count = parquet_exec
             .source()
             .output_partitioning()
@@ -1531,7 +1531,7 @@ mod tests {
             Arc::new(ParquetSource::default()),
         )
         .with_file(partitioned_file)
-        .new_exec();
+        .build();

         let mut results = parquet_exec.execute(0, state.task_ctx())?;
         let batch = results.next().await.unwrap();
@@ -2188,7 +2188,7 @@ mod tests {
             extensions: None,
             metadata_size_hint: None,
         })
-        .new_exec();
+        .build();

         let res = collect(exec, ctx.task_ctx()).await.unwrap();
         assert_eq!(res.len(), 2);
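Because `build(self)` consumes the config where `new_exec(&self)` cloned internally, call sites that keep using the config afterwards now clone explicitly, as `base_config.clone().build()` does in the round-trip test above. A hedged sketch of that call-site pattern (`make_config` is a hypothetical stand-in for the test's config construction):

```rust
use datafusion::datasource::physical_plan::FileScanConfig;

fn build_keeping_config(make_config: impl Fn() -> FileScanConfig) {
    let base_config = make_config();

    // Old API: `base_config.new_exec()` borrowed `&self` and cloned internally.
    // New API: `build(self)` takes ownership, so clone first when the config
    // is still needed afterwards (FileScanConfig derives Clone).
    let parquet_exec = base_config.clone().build();

    // The original config remains usable, e.g. for assertions in tests.
    assert!(base_config.file_schema.fields().len() > 0);
    drop(parquet_exec);
}
```

Pure builder chains that never touch the config again avoid the clone entirely, which is the common case across this patch.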
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
index a98524b0bead..21881112075d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
@@ -94,7 +94,7 @@ use object_store::ObjectStore;
 /// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
 /// let file_scan_config = FileScanConfig::new(object_store_url, file_schema, source)
 ///     .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024));
-/// let exec = file_scan_config.new_exec();
+/// let exec = file_scan_config.build();
 /// ```
 ///
 /// # Features
@@ -176,7 +176,7 @@ use object_store::ObjectStore;
 ///         .clone()
 ///         .with_file_groups(vec![file_group.clone()]);
 ///
-///     new_config.new_exec()
+///     new_config.build()
 ///   })
 ///   .collect::<Vec<_>>();
 /// ```
@@ -219,7 +219,7 @@ use object_store::ObjectStore;
 ///     .with_file(partitioned_file);
 /// // this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional
 /// // pruning based on predicates may also happen
-/// let exec = file_scan_config.new_exec();
+/// let exec = file_scan_config.build();
 /// ```
 ///
 /// For a complete example, see the [`advanced_parquet_index` example]).
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index e59d7b669ce0..41e375cf81f8 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -507,7 +507,7 @@ mod tests {
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
                 .with_file(partitioned_file);

-        let parquet_exec = base_conf.new_exec();
+        let parquet_exec = base_conf.build();

         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 5b7a9d8a16eb..ba85f9afb6da 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -93,7 +93,7 @@ pub fn scan_partitioned_csv(
     let source = Arc::new(CsvSource::new(true, b'"', b'"'));
     let config = partitioned_csv_config(schema, file_groups, source)
         .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
-    Ok(config.new_exec())
+    Ok(config.build())
 }

 /// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning `partitions` of `filename`
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 67e0e1726917..0e0090ef028e 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -156,7 +156,7 @@ impl TestParquetFile {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let parquet_options = ctx.copied_table_options().parquet;
         let source = Arc::new(ParquetSource::new(parquet_options.clone()));
-        let mut scan_config = FileScanConfig::new(
+        let scan_config = FileScanConfig::new(
             self.object_store_url.clone(),
             Arc::clone(&self.schema),
             source,
@@ -185,13 +185,12 @@ impl TestParquetFile {
                 Arc::clone(&scan_config.file_schema),
                 Arc::clone(&physical_filter_expr),
             ));
-            scan_config = scan_config.with_source(source);
-            let parquet_exec = scan_config.new_exec();
+            let parquet_exec = scan_config.with_source(source).build();

             let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
             Ok(exec)
         } else {
-            Ok(scan_config.new_exec())
+            Ok(scan_config.build())
         }
     }
diff --git a/datafusion/core/tests/fuzz_cases/pruning.rs b/datafusion/core/tests/fuzz_cases/pruning.rs
index e11d472b9b8a..ba7dd7272d62 100644
--- a/datafusion/core/tests/fuzz_cases/pruning.rs
+++ b/datafusion/core/tests/fuzz_cases/pruning.rs
@@ -321,7 +321,7 @@ async fn execute_with_predicate(
             })
             .collect(),
     );
-    let exec = scan.new_exec();
+    let exec = scan.build();
     let exec =
         Arc::new(FilterExec::try_new(predicate, exec).unwrap()) as Arc<dyn ExecutionPlan>;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 928b650e0300..b12b3be2d435 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -90,7 +90,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
     )
     .with_file_group(file_group);

-    let parquet_exec = base_config.new_exec();
+    let parquet_exec = base_config.build();

     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs
index cbf6580b7e4b..1eacbe42c525 100644
--- a/datafusion/core/tests/parquet/external_access_plan.rs
+++ b/datafusion/core/tests/parquet/external_access_plan.rs
@@ -351,7 +351,7 @@ impl TestFull {
         let config = FileScanConfig::new(object_store_url, schema.clone(), source)
             .with_file(partitioned_file);

-        let plan: Arc<dyn ExecutionPlan> = config.new_exec();
+        let plan: Arc<dyn ExecutionPlan> = config.build();

         // run the DataSourceExec and collect the results
         let results =
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index 90793028f209..4cbbcf12f32b 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -64,7 +64,7 @@ async fn multi_parquet_coercion() {
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, source)
             .with_file_group(file_group);

-    let parquet_exec = conf.new_exec();
+    let parquet_exec = conf.build();

     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
@@ -121,7 +121,7 @@ async fn multi_parquet_coercion_projection() {
     )
     .with_file_group(file_group)
     .with_projection(Some(vec![1, 0, 2]))
-    .new_exec();
+    .build();

     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 855550dc748a..50c67f09c704 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -183,7 +183,7 @@ fn parquet_exec_multiple_sorted(
         vec![PartitionedFile::new("y".to_string(), 100)],
     ])
     .with_output_ordering(output_ordering)
-    .new_exec()
+    .build()
 }

 fn csv_exec() -> Arc<DataSourceExec> {
@@ -198,7 +198,7 @@ fn csv_exec_with_sort(output_ordering: Vec<LexOrdering>) -> Arc<DataSourceExec>
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_output_ordering(output_ordering)
-    .new_exec()
+    .build()
 }

 fn csv_exec_multiple() -> Arc<DataSourceExec> {
@@ -217,7 +217,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSourceExec>
     ])
     .with_output_ordering(output_ordering)
-    .new_exec()
+    .build()
 }
@@ … @@ fn … -> Result<()> {
         )
         .with_file(PartitionedFile::new("x".to_string(), 100))
         .with_file_compression_type(compression_type)
-        .new_exec(),
+        .build(),
         vec![("a".to_string(), "a".to_string())],
     );
     assert_optimized!(expected, plan, true, false, 2, true, 10, false);
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 4b358e47361b..3412b962d859 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -69,7 +69,7 @@ fn csv_exec_ordered(
     )
     .with_file(PartitionedFile::new("file_path".to_string(), 100))
     .with_output_ordering(vec![sort_exprs])
-    .new_exec()
+    .build()
 }

 /// Created a sorted parquet exec
@@ -87,7 +87,7 @@ pub fn parquet_exec_sorted(
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_output_ordering(vec![sort_exprs])
-    .new_exec()
+    .build()
 }

 /// Create a sorted Csv exec
@@ -104,7 +104,7 @@ fn csv_exec_sorted(
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_output_ordering(vec![sort_exprs])
-    .new_exec()
+    .build()
 }

 /// Runs the sort enforcement optimizer and asserts the plan
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index c9eadf009130..ac86abcb7fed 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -382,7 +382,7 @@ fn create_simple_csv_exec() -> Arc<dyn ExecutionPlan> {
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_projection(Some(vec![0, 1, 2, 3, 4]))
-    .new_exec()
+    .build()
 }

 fn create_projecting_csv_exec() -> Arc<dyn ExecutionPlan> {
@@ -399,7 +399,7 @@ fn create_projecting_csv_exec() -> Arc<dyn ExecutionPlan> {
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_projection(Some(vec![3, 2, 1]))
-    .new_exec()
+    .build()
 }

 fn create_projecting_memory_exec() -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 5e486a715b41..162f93facc90 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -75,7 +75,7 @@ pub fn parquet_exec(schema: &SchemaRef) -> Arc<DataSourceExec> {
         Arc::new(ParquetSource::default()),
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
-    .new_exec()
+    .build()
 }

 /// Create a single parquet file that is sorted
@@ -89,7 +89,7 @@ pub(crate) fn parquet_exec_with_sort(
     )
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_output_ordering(output_ordering)
-    .new_exec()
+    .build()
 }

 pub fn schema() -> SchemaRef {
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 84b952965958..a575a42d0b6c 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -243,7 +243,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                 )?
                 .with_newlines_in_values(scan.newlines_in_values)
                 .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
-                Ok(conf.new_exec())
+                Ok(conf.build())
             }
             #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
             PhysicalPlanType::ParquetScan(scan) => {
@@ -280,7 +280,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                         extension_codec,
                         Arc::new(source),
                     )?;
-                    Ok(base_config.new_exec())
+                    Ok(base_config.build())
                 }
                 #[cfg(not(feature = "parquet"))]
                 panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled")
@@ -292,7 +292,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                     extension_codec,
                     Arc::new(AvroSource::new()),
                 )?;
-                Ok(conf.new_exec())
+                Ok(conf.build())
             }
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
                 let input: Arc<dyn ExecutionPlan> = into_physical_plan(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 7418184fcac1..a8ecb2d0749e 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -741,7 +741,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
         source,
     };

-    roundtrip_test(scan_config.new_exec())
+    roundtrip_test(scan_config.build())
 }

 #[tokio::test]
@@ -772,7 +772,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
         source,
     };

-    roundtrip_test(scan_config.new_exec())
+    roundtrip_test(scan_config.build())
 }

 #[test]
@@ -918,7 +918,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
         }
     }

-    let exec_plan = scan_config.new_exec();
+    let exec_plan = scan_config.build();
     let ctx = SessionContext::new();

     roundtrip_test_and_return(exec_plan, &ctx, &CustomPhysicalExtensionCodec {})?;
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index ce056ddac664..7bbdfc2a5d94 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -152,7 +152,7 @@ pub async fn from_substrait_rel(
                 }
             }

-            Ok(base_config.new_exec() as Arc<dyn ExecutionPlan>)
+            Ok(base_config.build() as Arc<dyn ExecutionPlan>)
         }
         _ => not_impl_err!(
             "Only LocalFile reads are supported when parsing physical"
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index 04c5e8ada758..f1284db2ad46 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -49,7 +49,7 @@ async fn parquet_exec() -> Result<()> {
             123,
         )],
     ]);
-    let parquet_exec: Arc<dyn ExecutionPlan> = scan_config.new_exec();
+    let parquet_exec: Arc<dyn ExecutionPlan> = scan_config.build();

     let mut extension_info: (
         Vec<