Pass the resolved object store to `FileSource::create_file_opener`.

`create_file_opener` previously accepted a `Result`-wrapped object store and
returned a `Result`-wrapped opener, so every implementation unwrapped the
store with `?` and re-wrapped its opener in `Ok(..)`. This patch makes the
method take the store directly and return the opener directly. The caller in
the `DataSource for FileScanConfig` impl now applies `?` to the
`runtime_env().object_store(..)` lookup before calling `create_file_opener`.

Implementations updated: ArrowSource, AvroSource (both `cfg` variants),
CsvSource, JsonSource and ParquetSource, plus the `csv_json_opener` example.

Behavior note: the old ParquetSource code only consumed the object-store
`Result` when no custom `parquet_file_reader_factory` was set. With this
patch the lookup error is propagated by the caller in every case.

NOTE(review): this copy of the patch had been flattened onto a few lines and
its generic arguments (`<...>`) had been stripped. Line breaks, indentation
and blank context lines were restored so that every hunk matches its
`@@` line counts. The generic arguments (`Arc<dyn ObjectStore>`,
`Arc<dyn FileOpener>`, `Arc<TaskContext>`, `Result<SendableRecordBatchStream>`)
are reconstructions -- confirm against the upstream files before applying.

diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs
index 7613578e8c3a..ef4ff9d51e7f 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -71,7 +71,7 @@ async fn csv_opener() -> Result<()> {
         .with_batch_size(8192)
         .with_projection(&scan_config);
 
-    let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;
+    let opener = config.create_file_opener(object_store, &scan_config, 0);
 
     let mut result = vec![];
     let mut stream =
diff --git a/datafusion/core/src/datasource/data_source.rs b/datafusion/core/src/datasource/data_source.rs
index fcb31194eab1..2db79c5c839d 100644
--- a/datafusion/core/src/datasource/data_source.rs
+++ b/datafusion/core/src/datasource/data_source.rs
@@ -40,10 +40,10 @@ pub trait FileSource: Send + Sync {
     /// Creates a `dyn FileOpener` based on given parameters
     fn create_file_opener(
         &self,
-        object_store: datafusion_common::Result<Arc<dyn ObjectStore>>,
+        object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         partition: usize,
-    ) -> datafusion_common::Result<Arc<dyn FileOpener>>;
+    ) -> Arc<dyn FileOpener>;
     /// Any
     fn as_any(&self) -> &dyn Any;
     /// Initialize new type with batch size configuration
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index d0d037924862..e5523063c782 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -211,14 +211,14 @@ pub struct ArrowSource {
 impl FileSource for ArrowSource {
     fn create_file_opener(
         &self,
-        object_store: Result<Arc<dyn ObjectStore>>,
+        object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         _partition: usize,
-    ) -> Result<Arc<dyn FileOpener>> {
-        Ok(Arc::new(ArrowOpener {
-            object_store: object_store?,
+    ) -> Arc<dyn FileOpener> {
+        Arc::new(ArrowOpener {
+            object_store,
             projection: base_config.file_column_projection_indices(),
-        }))
+        })
     }
 
     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index ae98c19a1615..1674814d76a7 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -194,23 +194,23 @@ impl FileSource for AvroSource {
     #[cfg(feature = "avro")]
     fn create_file_opener(
         &self,
-        object_store: Result<Arc<dyn ObjectStore>>,
+        object_store: Arc<dyn ObjectStore>,
         _base_config: &FileScanConfig,
         _partition: usize,
-    ) -> Result<Arc<dyn FileOpener>> {
-        Ok(Arc::new(private::AvroOpener {
+    ) -> Arc<dyn FileOpener> {
+        Arc::new(private::AvroOpener {
             config: Arc::new(self.clone()),
-            object_store: object_store?,
-        }))
+            object_store,
+        })
     }
 
     #[cfg(not(feature = "avro"))]
     fn create_file_opener(
         &self,
-        _object_store: Result<Arc<dyn ObjectStore>>,
+        _object_store: Arc<dyn ObjectStore>,
         _base_config: &FileScanConfig,
         _partition: usize,
-    ) -> Result<Arc<dyn FileOpener>> {
+    ) -> Arc<dyn FileOpener> {
         panic!("Avro feature is not enabled in this build")
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 8fcfd6b41e85..629d452064f5 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -564,15 +564,15 @@ impl CsvOpener {
 impl FileSource for CsvSource {
     fn create_file_opener(
         &self,
-        object_store: Result<Arc<dyn ObjectStore>>,
+        object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         _partition: usize,
-    ) -> Result<Arc<dyn FileOpener>> {
-        Ok(Arc::new(CsvOpener {
+    ) -> Arc<dyn FileOpener> {
+        Arc::new(CsvOpener {
             config: Arc::new(self.clone()),
             file_compression_type: base_config.file_compression_type,
-            object_store: object_store?,
-        }))
+            object_store,
+        })
     }
 
     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 5c882ed75109..6b74f6be79eb 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -162,7 +162,7 @@ impl DataSource for FileScanConfig {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let object_store = context.runtime_env().object_store(&self.object_store_url);
+        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
 
         let source = self
             .source
@@ -170,7 +170,7 @@ impl DataSource for FileScanConfig {
             .with_schema(Arc::clone(&self.file_schema))
             .with_projection(self);
 
-        let opener = source.create_file_opener(object_store, self, partition)?;
+        let opener = source.create_file_opener(object_store, self, partition);
 
         let stream = FileStream::new(self, partition, opener, source.metrics())?;
         Ok(Box::pin(stream))
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index f2304ed8a342..d1ae13b083ab 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -262,18 +262,18 @@ impl JsonSource {
 impl FileSource for JsonSource {
     fn create_file_opener(
         &self,
-        object_store: Result<Arc<dyn ObjectStore>>,
+        object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         _partition: usize,
-    ) -> Result<Arc<dyn FileOpener>> {
-        Ok(Arc::new(JsonOpener {
+    ) -> Arc<dyn FileOpener> {
+        Arc::new(JsonOpener {
             batch_size: self
                 .batch_size
                 .expect("Batch size must set before creating opener"),
             projected_schema: base_config.projected_file_schema(),
             file_compression_type: base_config.file_compression_type,
-            object_store: object_store?,
-        }))
+            object_store,
+        })
     }
 
     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
index 26a5877e2d38..178de8f51ae4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
@@ -463,10 +463,10 @@ impl ParquetSource {
 impl FileSource for ParquetSource {
     fn create_file_opener(
         &self,
-        object_store: datafusion_common::Result<Arc<dyn ObjectStore>>,
+        object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         partition: usize,
-    ) -> datafusion_common::Result<Arc<dyn FileOpener>> {
+    ) -> Arc<dyn FileOpener> {
         let projection = base_config
             .file_column_projection_indices()
             .unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect());
@@ -475,17 +475,12 @@ impl FileSource for ParquetSource {
             .clone()
             .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
 
-        let parquet_file_reader_factory = self
-            .parquet_file_reader_factory
-            .as_ref()
-            .map(|f| Ok(Arc::clone(f)))
-            .unwrap_or_else(|| {
-                object_store.map(|store| {
-                    Arc::new(DefaultParquetFileReaderFactory::new(store)) as _
-                })
-            })?;
-
-        Ok(Arc::new(ParquetOpener {
+        let parquet_file_reader_factory =
+            self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
+                Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
+            });
+
+        Arc::new(ParquetOpener {
             partition_index: partition,
             projection: Arc::from(projection),
             batch_size: self
@@ -504,7 +499,7 @@ impl FileSource for ParquetSource {
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
             schema_adapter_factory,
-        }))
+        })
     }
 
     fn as_any(&self) -> &dyn Any {