From 63a73e56578ec45e8f487de7f3e36199594f6528 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 20 Feb 2025 16:35:47 +0000 Subject: [PATCH 1/2] simplify fn signature --- datafusion-examples/examples/csv_json_opener.rs | 2 +- datafusion/core/src/datasource/data_source.rs | 2 +- .../src/datasource/physical_plan/arrow_file.rs | 4 ++-- .../core/src/datasource/physical_plan/avro.rs | 6 +++--- .../core/src/datasource/physical_plan/csv.rs | 4 ++-- .../datasource/physical_plan/file_scan_config.rs | 2 +- .../core/src/datasource/physical_plan/json.rs | 4 ++-- .../datasource/physical_plan/parquet/source.rs | 16 ++++++---------- 8 files changed, 18 insertions(+), 22 deletions(-) diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs index 7613578e8c3a..585d8ff29dca 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -71,7 +71,7 @@ async fn csv_opener() -> Result<()> { .with_batch_size(8192) .with_projection(&scan_config); - let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?; + let opener = config.create_file_opener(object_store, &scan_config, 0)?; let mut result = vec![]; let mut stream = diff --git a/datafusion/core/src/datasource/data_source.rs b/datafusion/core/src/datasource/data_source.rs index d31b68019e30..6e66bc0e1882 100644 --- a/datafusion/core/src/datasource/data_source.rs +++ b/datafusion/core/src/datasource/data_source.rs @@ -38,7 +38,7 @@ pub trait FileSource: Send + Sync { /// Creates a `dyn FileOpener` based on given parameters fn create_file_opener( &self, - object_store: datafusion_common::Result>, + object_store: Arc, base_config: &FileScanConfig, partition: usize, ) -> datafusion_common::Result>; diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 4a7cdc192cd3..6e81d7d26388 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -211,12 +211,12 @@ pub struct ArrowSource { impl FileSource for ArrowSource { fn create_file_opener( &self, - object_store: Result>, + object_store: Arc, base_config: &FileScanConfig, _partition: usize, ) -> Result> { Ok(Arc::new(ArrowOpener { - object_store: object_store?, + object_store, projection: base_config.file_column_projection_indices(), })) } diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index b0a1d8c8c9e2..9a1d4944127f 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -194,20 +194,20 @@ impl FileSource for AvroSource { #[cfg(feature = "avro")] fn create_file_opener( &self, - object_store: Result>, + object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { Ok(Arc::new(private::AvroOpener { config: Arc::new(self.clone()), - object_store: object_store?, + object_store, })) } #[cfg(not(feature = "avro"))] fn create_file_opener( &self, - _object_store: Result>, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index c0952229b5e0..f0c6045c573d 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -564,14 +564,14 @@ impl CsvOpener { impl FileSource for CsvSource { fn create_file_opener( &self, - object_store: Result>, + object_store: Arc, base_config: &FileScanConfig, _partition: usize, ) -> Result> { Ok(Arc::new(CsvOpener { config: Arc::new(self.clone()), file_compression_type: base_config.file_compression_type, - object_store: object_store?, + object_store, })) } diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 123ecc2f9582..a17e122d9392 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -162,7 +162,7 @@ impl DataSource for FileScanConfig { partition: usize, context: Arc, ) -> Result { - let object_store = context.runtime_env().object_store(&self.object_store_url); + let object_store = context.runtime_env().object_store(&self.object_store_url)?; let source = self .source diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 590b1cb88dcd..f2b88f112e34 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -262,7 +262,7 @@ impl JsonSource { impl FileSource for JsonSource { fn create_file_opener( &self, - object_store: Result>, + object_store: Arc, base_config: &FileScanConfig, _partition: usize, ) -> Result> { @@ -272,7 +272,7 @@ impl FileSource for JsonSource { .expect("Batch size must set before creating opener"), projected_schema: base_config.projected_file_schema(), file_compression_type: base_config.file_compression_type, - object_store: object_store?, + object_store, })) } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs index 21881112075d..63a6a7fb4b76 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs @@ -463,7 +463,7 @@ impl ParquetSource { impl FileSource for ParquetSource { fn create_file_opener( &self, - object_store: datafusion_common::Result>, + object_store: Arc, base_config: &FileScanConfig, partition: usize, ) -> datafusion_common::Result> { @@ -475,15 +475,10 @@ impl FileSource for ParquetSource { .clone() .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); - let parquet_file_reader_factory = self - .parquet_file_reader_factory - .as_ref() - .map(|f| Ok(Arc::clone(f))) - .unwrap_or_else(|| { - object_store.map(|store| { - Arc::new(DefaultParquetFileReaderFactory::new(store)) as _ - }) - })?; + let parquet_file_reader_factory = + self.parquet_file_reader_factory.clone().unwrap_or_else(|| { + Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ + }); Ok(Arc::new(ParquetOpener { partition_index: partition, @@ -586,6 +581,7 @@ impl FileSource for ParquetSource { } } } + fn supports_repartition(&self, _config: &FileScanConfig) -> bool { true } From 2ce456d3d9347a8aa564aee2cc446d7a04876786 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 20 Feb 2025 18:49:20 +0000 Subject: [PATCH 2/2] . --- datafusion-examples/examples/csv_json_opener.rs | 2 +- datafusion/core/src/datasource/data_source.rs | 2 +- .../core/src/datasource/physical_plan/arrow_file.rs | 6 +++--- datafusion/core/src/datasource/physical_plan/avro.rs | 8 ++++---- datafusion/core/src/datasource/physical_plan/csv.rs | 6 +++--- .../core/src/datasource/physical_plan/file_scan_config.rs | 2 +- datafusion/core/src/datasource/physical_plan/json.rs | 6 +++--- .../core/src/datasource/physical_plan/parquet/source.rs | 6 +++--- 8 files changed, 19 insertions(+), 19 deletions(-) diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs index 585d8ff29dca..ef4ff9d51e7f 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -71,7 +71,7 @@ async fn csv_opener() -> Result<()> { .with_batch_size(8192) .with_projection(&scan_config); - let opener = config.create_file_opener(object_store, &scan_config, 0)?; + let opener = config.create_file_opener(object_store, &scan_config, 0); let mut result = vec![]; let mut stream = diff --git a/datafusion/core/src/datasource/data_source.rs b/datafusion/core/src/datasource/data_source.rs index 6e66bc0e1882..e1daf1b9fdad 100644 --- a/datafusion/core/src/datasource/data_source.rs +++ b/datafusion/core/src/datasource/data_source.rs @@ -41,7 +41,7 @@ pub trait FileSource: Send + Sync { object_store: Arc, base_config: &FileScanConfig, partition: usize, - ) -> datafusion_common::Result>; + ) -> Arc; /// Any fn as_any(&self) -> &dyn Any; /// Initialize new type with batch size configuration diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 6e81d7d26388..53a46e54bf79 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -214,11 +214,11 @@ impl FileSource for ArrowSource { object_store: Arc, base_config: &FileScanConfig, _partition: usize, - ) -> Result> { - Ok(Arc::new(ArrowOpener { + ) -> Arc { + Arc::new(ArrowOpener { object_store, projection: base_config.file_column_projection_indices(), - })) + }) } fn as_any(&self) -> &dyn Any { diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 9a1d4944127f..ded6adbe68b5 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -197,11 +197,11 @@ impl FileSource for AvroSource { object_store: Arc, _base_config: &FileScanConfig, _partition: usize, - ) -> Result> { - Ok(Arc::new(private::AvroOpener { + ) -> Arc { + Arc::new(private::AvroOpener { config: Arc::new(self.clone()), object_store, - })) + }) } #[cfg(not(feature = "avro"))] @@ -210,7 +210,7 @@ impl FileSource for AvroSource { _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, - ) -> Result> { + ) -> Arc { panic!("Avro feature is not enabled in this build") } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index f0c6045c573d..85308c7dfeef 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -567,12 +567,12 @@ impl FileSource for CsvSource { object_store: Arc, base_config: &FileScanConfig, _partition: usize, - ) -> Result> { - Ok(Arc::new(CsvOpener { + ) -> Arc { + Arc::new(CsvOpener { config: Arc::new(self.clone()), file_compression_type: base_config.file_compression_type, object_store, - })) + }) } fn as_any(&self) -> &dyn Any { diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index a17e122d9392..fb8454fd766d 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -170,7 +170,7 @@ impl DataSource for FileScanConfig { .with_schema(Arc::clone(&self.file_schema)) .with_projection(self); - let opener = source.create_file_opener(object_store, self, partition)?; + let opener = source.create_file_opener(object_store, self, partition); let stream = FileStream::new(self, partition, opener, source.metrics())?; Ok(Box::pin(stream)) diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index f2b88f112e34..a960be832180 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -265,15 +265,15 @@ impl FileSource for JsonSource { object_store: Arc, base_config: &FileScanConfig, _partition: usize, - ) -> Result> { - Ok(Arc::new(JsonOpener { + ) -> Arc { + Arc::new(JsonOpener { batch_size: self .batch_size .expect("Batch size must set before creating opener"), projected_schema: base_config.projected_file_schema(), file_compression_type: base_config.file_compression_type, object_store, - })) + }) } fn as_any(&self) -> &dyn Any { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs index 63a6a7fb4b76..4920b28e0e73 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs @@ -466,7 +466,7 @@ impl FileSource for ParquetSource { object_store: Arc, base_config: &FileScanConfig, partition: usize, - ) -> datafusion_common::Result> { + ) -> Arc { let projection = base_config .file_column_projection_indices() .unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect()); @@ -480,7 +480,7 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); - Ok(Arc::new(ParquetOpener { + Arc::new(ParquetOpener { partition_index: partition, projection: Arc::from(projection), batch_size: self @@ -499,7 +499,7 @@ impl FileSource for ParquetSource { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), schema_adapter_factory, - })) + }) } fn as_any(&self) -> &dyn Any {