diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_file_format.rs
index 4d85ce882923..c44210e55318 100644
--- a/datafusion-examples/examples/custom_file_format.rs
+++ b/datafusion-examples/examples/custom_file_format.rs
@@ -21,11 +21,14 @@ use arrow::{
     array::{AsArray, RecordBatch, StringArray, UInt8Array},
     datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
 };
-use datafusion::common::{GetExt, Statistics};
 use datafusion::datasource::data_source::FileSource;
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::physical_expr::LexRequirement;
 use datafusion::physical_expr::PhysicalExpr;
+use datafusion::{
+    catalog::Session,
+    common::{GetExt, Statistics},
+};
 use datafusion::{
     datasource::{
         file_format::{
@@ -36,7 +39,6 @@ use datafusion::{
         MemTable,
     },
     error::Result,
-    execution::context::SessionState,
     physical_plan::ExecutionPlan,
     prelude::SessionContext,
 };
@@ -84,7 +86,7 @@ impl FileFormat for TSVFileFormat {

     async fn infer_schema(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -95,7 +97,7 @@ impl FileFormat for TSVFileFormat {

     async fn infer_stats(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         object: &ObjectMeta,
@@ -107,7 +109,7 @@ impl FileFormat for TSVFileFormat {

     async fn create_physical_plan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -119,7 +121,7 @@ impl FileFormat for TSVFileFormat {
     async fn create_writer_physical_plan(
         &self,
         input: Arc<dyn ExecutionPlan>,
-        state: &SessionState,
+        state: &dyn Session,
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -153,7 +155,7 @@ impl TSVFileFactory {
 impl FileFormatFactory for TSVFileFactory {
     fn create(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         format_options: &std::collections::HashMap<String, String>,
     ) -> Result<Arc<dyn FileFormat>> {
         let mut new_options = format_options.clone();
diff --git a/datafusion/core/src/datasource/physical_plan/file_groups.rs b/datafusion/catalog-listing/src/file_groups.rs
similarity index 99%
rename from datafusion/core/src/datasource/physical_plan/file_groups.rs
rename to datafusion/catalog-listing/src/file_groups.rs
index f681dfe219b5..2c2b791f2365 100644
--- a/datafusion/core/src/datasource/physical_plan/file_groups.rs
+++ b/datafusion/catalog-listing/src/file_groups.rs
@@ -17,7 +17,7 @@

 //! Logic for managing groups of [`PartitionedFile`]s in DataFusion

-use crate::datasource::listing::{FileRange, PartitionedFile};
+use crate::{FileRange, PartitionedFile};
 use itertools::Itertools;
 use std::cmp::min;
 use std::collections::BinaryHeap;
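Note: `FileGroupPartitioner` moves with `file_groups.rs` into the `datafusion-catalog-listing` crate; the `physical_plan/mod.rs` hunk near the end of this diff re-exports it from the new location, so existing `datafusion::datasource::physical_plan::FileGroupPartitioner` imports keep compiling. A minimal sketch of the builder API, assuming it is unchanged by the move:

```rust
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::FileGroupPartitioner;

/// Split `groups` across 4 target partitions, leaving files smaller than
/// 1 MiB alone; returns None when no repartitioning is possible.
fn repartition(groups: &[Vec<PartitionedFile>]) -> Option<Vec<Vec<PartitionedFile>>> {
    FileGroupPartitioner::new()
        .with_target_partitions(4)
        .with_repartition_file_min_size(1024 * 1024)
        .repartition_file_groups(groups)
}
```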
diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs
index e952e39fd479..709fa88b5867 100644
--- a/datafusion/catalog-listing/src/mod.rs
+++ b/datafusion/catalog-listing/src/mod.rs
@@ -18,9 +18,9 @@

 //! A table that uses the `ObjectStore` listing capability
 //! to get the list of files to process.
+pub mod file_groups;
 pub mod helpers;
 pub mod url;
-
 use chrono::TimeZone;
 use datafusion_common::Result;
 use datafusion_common::{ScalarValue, Statistics};
diff --git a/datafusion/core/src/datasource/data_source.rs b/datafusion/core/src/datasource/data_source.rs
index 3984ffa64c76..03bfb4175022 100644
--- a/datafusion/core/src/datasource/data_source.rs
+++ b/datafusion/core/src/datasource/data_source.rs
@@ -62,4 +62,9 @@ pub trait FileSource: Send + Sync {
     fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
         Ok(())
     }
+    /// Return true if the file format supports repartition
+    ///
+    /// If this returns true, the DataSourceExec may repartition the data
+    /// by breaking up the input files into multiple smaller groups.
+    fn supports_repartition(&self, config: &FileScanConfig) -> bool;
 }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index e88144c2ea16..5a4bf103e7ce 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -35,7 +35,6 @@ use crate::datasource::physical_plan::{
     ArrowSource, FileGroupDisplay, FileScanConfig, FileSink, FileSinkConfig,
 };
 use crate::error::Result;
-use crate::execution::context::SessionState;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

 use arrow::ipc::convert::fb_to_schema;
@@ -43,6 +42,7 @@ use arrow::ipc::reader::FileReader;
 use arrow::ipc::writer::IpcWriteOptions;
 use arrow::ipc::{root_as_message, CompressionType};
 use arrow_schema::{ArrowError, Schema, SchemaRef};
+use datafusion_catalog::Session;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
     not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
@@ -84,7 +84,7 @@ impl ArrowFormatFactory {
 impl FileFormatFactory for ArrowFormatFactory {
     fn create(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _format_options: &HashMap<String, String>,
     ) -> Result<Arc<dyn FileFormat>> {
         Ok(Arc::new(ArrowFormat))
@@ -135,7 +135,7 @@ impl FileFormat for ArrowFormat {

     async fn infer_schema(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -159,7 +159,7 @@ impl FileFormat for ArrowFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         _object: &ObjectMeta,
@@ -169,7 +169,7 @@ impl FileFormat for ArrowFormat {

     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         mut conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -180,7 +180,7 @@ impl FileFormat for ArrowFormat {
     async fn create_writer_physical_plan(
         &self,
         input: Arc<dyn ExecutionPlan>,
-        _state: &SessionState,
+        _state: &dyn Session,
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
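Note: the new `FileSource::supports_repartition` above is a required method (it has no default body), so every implementor, including third-party sources, must now supply it. A sketch of the decision a custom line-delimited source might encode (the free-function name is illustrative; the CSV and JSON implementations later in this diff use the same predicate):

```rust
use datafusion::datasource::physical_plan::FileScanConfig;

/// Byte-range splitting is only sound when the input is uncompressed and
/// newlines cannot appear inside quoted values.
fn my_source_supports_repartition(config: &FileScanConfig) -> bool {
    !(config.file_compression_type.is_compressed() || config.new_lines_in_values)
}
```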
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 8e5192d12f5d..100aa4fd51e2 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -28,7 +28,6 @@ use super::FileFormatFactory;
 use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
 use crate::datasource::physical_plan::{AvroSource, FileScanConfig};
 use crate::error::Result;
-use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;

@@ -36,6 +35,7 @@ use crate::datasource::data_source::FileSource;
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 use datafusion_common::internal_err;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::GetExt;
@@ -57,7 +57,7 @@ impl AvroFormatFactory {
 impl FileFormatFactory for AvroFormatFactory {
     fn create(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _format_options: &HashMap<String, String>,
     ) -> Result<Arc<dyn FileFormat>> {
         Ok(Arc::new(AvroFormat))
@@ -112,7 +112,7 @@ impl FileFormat for AvroFormat {

     async fn infer_schema(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -137,7 +137,7 @@ impl FileFormat for AvroFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         _object: &ObjectMeta,
@@ -147,7 +147,7 @@ impl FileFormat for AvroFormat {

     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         mut conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -505,7 +505,7 @@ mod tests {
     }

     async fn get_exec(
-        state: &SessionState,
+        state: &dyn Session,
         file_name: &str,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 89df3a70963f..eab3494be026 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -44,6 +44,7 @@ use arrow::array::RecordBatch;
 use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
 use arrow_schema::ArrowError;
+use datafusion_catalog::Session;
 use datafusion_common::config::{ConfigField, ConfigFileType, CsvOptions};
 use datafusion_common::file_options::csv_writer::CsvWriterOptions;
 use datafusion_common::{
@@ -95,9 +96,10 @@ impl Debug for CsvFormatFactory {
 impl FileFormatFactory for CsvFormatFactory {
     fn create(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         format_options: &HashMap<String, String>,
     ) -> Result<Arc<dyn FileFormat>> {
+        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
         let csv_options = match &self.options {
             None => {
                 let mut table_options = state.default_table_options();
@@ -365,7 +367,7 @@ impl FileFormat for CsvFormat {

     async fn infer_schema(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -400,7 +402,7 @@ impl FileFormat for CsvFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         _object: &ObjectMeta,
@@ -410,7 +412,7 @@ impl FileFormat for CsvFormat {

     async fn create_physical_plan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         mut conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -440,7 +442,7 @@ impl FileFormat for CsvFormat {
     async fn create_writer_physical_plan(
         &self,
         input: Arc<dyn ExecutionPlan>,
-        state: &SessionState,
+        state: &dyn Session,
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -485,7 +487,7 @@ impl CsvFormat {
     /// number of lines that were read
     async fn infer_schema_from_stream(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         mut records_to_read: usize,
         stream: impl Stream<Item = Result<Bytes>>,
     ) -> Result<(Schema, usize)> {
@@ -1147,7 +1149,7 @@ mod tests {
     }

     async fn get_exec(
-        state: &SessionState,
+        state: &dyn Session,
         file_name: &str,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
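Note: `CsvFormatFactory::create` still needs `SessionState`-only APIs such as `default_table_options`, so it downcasts the `&dyn Session` and unwraps; the JSON and Parquet factories below do the same. If a custom `Session` implementation were ever passed in, the unwrap would panic. A defensive variant (a sketch, not what this diff does) would surface an error instead:

```rust
use datafusion::catalog::Session;
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::session_state::SessionState;

/// Recover the concrete SessionState, failing gracefully instead of
/// panicking when some other Session implementation is supplied.
fn as_session_state(state: &dyn Session) -> Result<&SessionState> {
    state.as_any().downcast_ref::<SessionState>().ok_or_else(|| {
        DataFusionError::Internal("expected a concrete SessionState".to_string())
    })
}
```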
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -36,7 +36,7 @@ use crate::datasource::physical_plan::{
     FileGroupDisplay, FileSink, FileSinkConfig, JsonSource,
 };
 use crate::error::Result;
-use crate::execution::context::SessionState;
+use crate::execution::SessionState;
 use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{
     DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
@@ -48,6 +48,7 @@ use arrow::json;
 use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
 use arrow_array::RecordBatch;
 use arrow_schema::ArrowError;
+use datafusion_catalog::Session;
 use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
@@ -87,9 +88,10 @@ impl JsonFormatFactory {
 impl FileFormatFactory for JsonFormatFactory {
     fn create(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         format_options: &HashMap<String, String>,
     ) -> Result<Arc<dyn FileFormat>> {
+        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
         let json_options = match &self.options {
             None => {
                 let mut table_options = state.default_table_options();
@@ -189,7 +191,7 @@ impl FileFormat for JsonFormat {

     async fn infer_schema(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -237,7 +239,7 @@ impl FileFormat for JsonFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         _object: &ObjectMeta,
@@ -247,7 +249,7 @@ impl FileFormat for JsonFormat {

     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         mut conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -261,7 +263,7 @@ impl FileFormat for JsonFormat {
     async fn create_writer_physical_plan(
         &self,
         input: Arc<dyn ExecutionPlan>,
-        _state: &SessionState,
+        _state: &dyn Session,
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -538,7 +540,7 @@ mod tests {
     }

     async fn get_exec(
-        state: &SessionState,
+        state: &dyn Session,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index bb7e42d3f854..ab5ce91ec57d 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -40,11 +40,11 @@ use std::task::Poll;
 use crate::arrow::datatypes::SchemaRef;
 use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
 use crate::error::Result;
-use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};

 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema};
+use datafusion_catalog::Session;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{internal_err, not_impl_err, GetExt};
 use datafusion_expr::Expr;
@@ -66,7 +66,7 @@ pub trait FileFormatFactory: Sync + Send + GetExt + Debug {
     /// Initialize a [FileFormat] and configure based on session and command level options
     fn create(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         format_options: &HashMap<String, String>,
     ) -> Result<Arc<dyn FileFormat>>;

@@ -104,7 +104,7 @@ pub trait FileFormat: Send + Sync + Debug {
     /// the files have schemas that cannot be merged.
     async fn infer_schema(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef>;
@@ -118,7 +118,7 @@ pub trait FileFormat: Send + Sync + Debug {
     /// TODO: should the file source return statistics for only columns referred to in the table schema?
     async fn infer_stats(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         object: &ObjectMeta,
@@ -128,7 +128,7 @@ pub trait FileFormat: Send + Sync + Debug {
     /// according to this file format.
     async fn create_physical_plan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
@@ -138,7 +138,7 @@ pub trait FileFormat: Send + Sync + Debug {
     async fn create_writer_physical_plan(
         &self,
         _input: Arc<dyn ExecutionPlan>,
-        _state: &SessionState,
+        _state: &dyn Session,
         _conf: FileSinkConfig,
         _order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -569,7 +569,7 @@ pub(crate) mod test_util {
     };

     pub async fn scan_format(
-        state: &SessionState,
+        state: &dyn Session,
         format: &dyn FileFormat,
         store_root: &str,
         file_name: &str,
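Note: because `SessionState` implements the `Session` trait, call sites holding a concrete state are unaffected by the trait change above: `&SessionState` coerces to `&dyn Session`. A small sketch against the updated trait (the path is illustrative, and the empty object list is passed purely to show the types):

```rust
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn infer_example(ctx: &SessionContext) -> Result<()> {
    let state = ctx.state(); // concrete SessionState
    let url = ListingTableUrl::parse("file:///tmp/data/")?;
    let store = state.runtime_env().object_store(&url)?;
    // &SessionState coerces to &dyn Session, so this call compiles
    // unchanged before and after this diff.
    let _schema = ParquetFormat::default()
        .infer_schema(&state, &store, &[])
        .await?;
    Ok(())
}
```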
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 70121c96ae16..8b91bc2cfdbe 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -39,7 +39,7 @@ use crate::datasource::physical_plan::parquet::source::ParquetSource;
 use crate::datasource::physical_plan::{FileGroupDisplay, FileSink, FileSinkConfig};
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
-use crate::execution::context::SessionState;
+use crate::execution::SessionState;
 use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{
     Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
@@ -47,6 +47,7 @@ use crate::physical_plan::{
 };

 use arrow::compute::sum;
+use datafusion_catalog::Session;
 use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
@@ -121,9 +122,10 @@ impl ParquetFormatFactory {
 impl FileFormatFactory for ParquetFormatFactory {
     fn create(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         format_options: &std::collections::HashMap<String, String>,
     ) -> Result<Arc<dyn FileFormat>> {
+        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
         let parquet_options = match &self.options {
             None => {
                 let mut table_options = state.default_table_options();
@@ -325,7 +327,7 @@ impl FileFormat for ParquetFormat {

     async fn infer_schema(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -378,7 +380,7 @@ impl FileFormat for ParquetFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         object: &ObjectMeta,
@@ -395,7 +397,7 @@ impl FileFormat for ParquetFormat {

     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         mut conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -429,7 +431,7 @@ impl FileFormat for ParquetFormat {
     async fn create_writer_physical_plan(
         &self,
         input: Arc<dyn ExecutionPlan>,
-        _state: &SessionState,
+        _state: &dyn Session,
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -1295,6 +1297,7 @@ pub(crate) mod test_util {
 mod tests {
     use super::super::test_util::scan_format;
     use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
+    use crate::execution::SessionState;
     use crate::physical_plan::collect;
     use crate::test_util::bounded_stream;
     use std::fmt::{Display, Formatter};
@@ -2229,13 +2232,13 @@ mod tests {
     }

     async fn get_exec(
-        state: &SessionState,
+        state: &dyn Session,
         file_name: &str,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
-
+        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
         let format = state
             .get_file_format_factory("parquet")
             .map(|factory| factory.create(state, &Default::default()).unwrap())
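Note: the `ListingTableConfig` inference entry points in the next hunks take `&dyn Session` as well. Typical usage is unchanged for callers holding a `SessionState` (the path below is illustrative):

```rust
use datafusion::datasource::listing::{
    ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn listing_table_example(ctx: &SessionContext) -> Result<ListingTable> {
    let state = ctx.state();
    let table_path = ListingTableUrl::parse("file:///tmp/data/")?;
    // infer_options + infer_schema, both now accepting &dyn Session.
    let config = ListingTableConfig::new(table_path).infer(&state).await?;
    ListingTable::try_new(config)
}
```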
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 7d95a40186b2..f1d00ba9a37a 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -137,7 +137,7 @@ impl ListingTableConfig {
     }

     /// Infer `ListingOptions` based on `table_path` suffix.
-    pub async fn infer_options(self, state: &SessionState) -> Result<Self> {
+    pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
         let store = if let Some(url) = self.table_paths.first() {
             state.runtime_env().object_store(url)?
         } else {
@@ -164,7 +164,7 @@ impl ListingTableConfig {
             format_options
                 .insert("format.compression".to_string(), compression_type.clone());
         }
-
+        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
         let file_format = state
             .get_file_format_factory(&file_extension)
             .ok_or(config_datafusion_err!(
@@ -191,7 +191,7 @@ impl ListingTableConfig {
     }

     /// Infer the [`SchemaRef`] based on `table_path` suffix.  Requires `self.options` to be set prior to using.
-    pub async fn infer_schema(self, state: &SessionState) -> Result<Self> {
+    pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
         match self.options {
             Some(options) => {
                 let schema = if let Some(url) = self.table_paths.first() {
@@ -211,12 +211,12 @@ impl ListingTableConfig {
     }

     /// Convenience wrapper for calling `infer_options` and `infer_schema`
-    pub async fn infer(self, state: &SessionState) -> Result<Self> {
+    pub async fn infer(self, state: &dyn Session) -> Result<Self> {
         self.infer_options(state).await?.infer_schema(state).await
     }

     /// Infer the partition columns from the path. Requires `self.options` to be set prior to using.
-    pub async fn infer_partitions_from_path(self, state: &SessionState) -> Result<Self> {
+    pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result<Self> {
         match self.options {
             Some(options) => {
                 let Some(url) = self.table_paths.first() else {
@@ -484,7 +484,7 @@ impl ListingOptions {
     /// locally or ask a remote service to do it (e.g a scheduler).
     pub async fn infer_schema<'a>(
         &'a self,
-        state: &SessionState,
+        state: &dyn Session,
         table_path: &'a ListingTableUrl,
     ) -> Result<SchemaRef> {
         let store = state.runtime_env().object_store(table_path)?;
@@ -509,7 +509,7 @@ impl ListingOptions {
     /// Allows specifying partial partitions.
     pub async fn validate_partitions(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         table_path: &ListingTableUrl,
     ) -> Result<()> {
         if self.table_partition_cols.is_empty() {
@@ -563,7 +563,7 @@ impl ListingOptions {
     /// and therefore may fail to detect invalid partitioning.
     pub(crate) async fn infer_partitions(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         table_path: &ListingTableUrl,
     ) -> Result<Vec<String>> {
         let store = state.runtime_env().object_store(table_path)?;
@@ -1095,7 +1095,7 @@ impl ListingTable {
     /// be distributed to different threads / executors.
     async fn list_files_for_scan<'a>(
         &'a self,
-        ctx: &'a SessionState,
+        ctx: &'a dyn Session,
         filters: &'a [Expr],
         limit: Option<usize>,
     ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
@@ -1156,7 +1156,7 @@ impl ListingTable {
     /// If they are not, it infers the statistics from the file and stores them in the cache.
     async fn do_collect_statistics(
         &self,
-        ctx: &SessionState,
+        ctx: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
     ) -> Result<Arc<Statistics>> {
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 5c5dce46e936..1a486a54ca39 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -256,6 +256,10 @@ impl FileSource for ArrowSource {
     fn file_type(&self) -> &str {
         "arrow"
     }
+
+    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
+        !(config.file_compression_type.is_compressed() || config.new_lines_in_values)
+    }
 }

 /// The struct arrow that implements `[FileOpener]` trait
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index ee3f7071c11f..b148c412c48e 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -255,6 +255,11 @@ impl FileSource for AvroSource {
     fn file_type(&self) -> &str {
         "avro"
     }
+    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
+        !(config.file_compression_type.is_compressed()
+            || config.new_lines_in_values
+            || self.as_any().downcast_ref::<AvroSource>().is_some())
+    }
 }

 #[cfg(feature = "avro")]
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 269af4df903a..bfc2c1df8eab 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -618,6 +618,9 @@ impl FileSource for CsvSource {
     fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, ", has_header={}", self.has_header)
     }
+    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
+        !(config.file_compression_type.is_compressed() || config.new_lines_in_values)
+    }
 }

 impl FileOpener for CsvOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 652632c31554..e31c7dfc247a 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -19,8 +19,8 @@
 //! file sources.
 use super::{
-    get_projected_output_ordering, statistics::MinMaxStatistics, AvroSource,
-    FileGroupPartitioner, FileGroupsDisplay, FileStream,
+    get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner,
+    FileGroupsDisplay, FileStream,
 };
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
@@ -209,7 +209,7 @@ impl DataSource for FileScanConfig {
         repartition_file_min_size: usize,
         output_ordering: Option<LexOrdering>,
     ) -> Result<Option<Arc<dyn DataSource>>> {
-        if !self.supports_repartition() {
+        if !self.source.supports_repartition(self) {
             return Ok(None);
         }
@@ -598,12 +598,6 @@ impl FileScanConfig {
     pub fn file_source(&self) -> &Arc<dyn FileSource> {
         &self.source
     }
-
-    fn supports_repartition(&self) -> bool {
-        !(self.file_compression_type.is_compressed()
-            || self.new_lines_in_values
-            || self.source.as_any().downcast_ref::<AvroSource>().is_some())
-    }
 }

 /// A helper that projects partition columns into the file record batches.
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index f581f5cd0b4a..76cb657b0c5f 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -313,6 +313,10 @@ impl FileSource for JsonSource {
     fn file_type(&self) -> &str {
         "json"
     }
+
+    fn supports_repartition(&self, config: &FileScanConfig) -> bool {
+        !(config.file_compression_type.is_compressed() || config.new_lines_in_values)
+    }
 }

 impl FileOpener for JsonOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 28ac73cef521..4cc65714d53e 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -20,14 +20,12 @@
 mod arrow_file;
 mod avro;
 mod csv;
-mod file_groups;
 mod file_scan_config;
 mod file_stream;
 mod json;
 #[cfg(feature = "parquet")]
 pub mod parquet;
 mod statistics;
-
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
@@ -37,7 +35,6 @@ pub use self::parquet::source::ParquetSource;
 pub use self::parquet::{
     ParquetExec, ParquetExecBuilder, ParquetFileMetrics, ParquetFileReaderFactory,
 };
-
 #[allow(deprecated)]
 pub use arrow_file::ArrowExec;
 pub use arrow_file::ArrowSource;
@@ -47,8 +44,8 @@ pub use avro::AvroSource;
 #[allow(deprecated)]
 pub use csv::{CsvExec, CsvExecBuilder};
 pub use csv::{CsvOpener, CsvSource};
+pub use datafusion_catalog_listing::file_groups::FileGroupPartitioner;
 use datafusion_expr::dml::InsertOp;
-pub use file_groups::FileGroupPartitioner;
 pub use file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
index c00fe91b859f..0705a398f4fb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
@@ -586,4 +586,7 @@ impl FileSource for ParquetSource {
         }
     }
+    fn supports_repartition(&self, _config: &FileScanConfig) -> bool {
+        true
+    }
 }
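Note: tying the pieces together, a third-party format factory written against the new `&dyn Session` signatures is registered exactly as before. A sketch reusing the `TSVFileFactory` from the example at the top of this diff (assumed in scope; `register_file_format` is the existing `SessionState` API that example uses):

```rust
use std::sync::Arc;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionContext;

fn context_with_tsv() -> SessionContext {
    let mut state = SessionStateBuilder::new().with_default_features().build();
    // TSVFileFactory comes from datafusion-examples/examples/custom_file_format.rs
    state
        .register_file_format(Arc::new(TSVFileFactory::new()), true)
        .expect("'tsv' should not clash with a built-in extension");
    SessionContext::new_with_state(state)
}
```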