From 4033d5602eff08850b99f9bcf978cf38c4562bec Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 11 Feb 2025 18:43:54 +0530 Subject: [PATCH 1/9] exploration with cargo machete --- Cargo.lock | 19 ------------------- datafusion-examples/Cargo.toml | 2 -- .../ffi/ffi_example_table_provider/Cargo.toml | 1 - datafusion/catalog/Cargo.toml | 1 - datafusion/functions/Cargo.toml | 3 +-- datafusion/physical-optimizer/Cargo.toml | 4 ---- datafusion/physical-plan/Cargo.toml | 1 - datafusion/wasmtest/Cargo.toml | 9 --------- 8 files changed, 1 insertion(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10f437c47ddb..080bcb463910 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1816,7 +1816,6 @@ dependencies = [ "itertools 0.14.0", "log", "parking_lot", - "sqlparser", "tokio", ] @@ -1924,10 +1923,8 @@ version = "45.0.0" dependencies = [ "arrow", "arrow-flight", - "arrow-schema", "async-trait", "bytes", - "dashmap", "datafusion", "datafusion-proto", "env_logger", @@ -2029,7 +2026,6 @@ dependencies = [ "datafusion-expr", "datafusion-expr-common", "datafusion-macros", - "hashbrown 0.14.5", "hex", "itertools 0.14.0", "log", @@ -2217,13 +2213,9 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "futures", "itertools 0.14.0", "log", "recursive", - "rstest", - "tokio", - "url", ] [[package]] @@ -2233,7 +2225,6 @@ dependencies = [ "ahash 0.8.11", "arrow", "arrow-ord", - "arrow-schema", "async-trait", "chrono", "criterion", @@ -2376,23 +2367,14 @@ dependencies = [ "chrono", "console_error_panic_hook", "datafusion", - "datafusion-catalog", "datafusion-common", - "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", - "datafusion-expr-common", - "datafusion-functions", - "datafusion-functions-aggregate", - "datafusion-functions-aggregate-common", - "datafusion-functions-table", "datafusion-optimizer", "datafusion-physical-expr", - "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-sql", "getrandom 0.2.15", - "parquet", "tokio", "wasm-bindgen", "wasm-bindgen-futures", @@ -2622,7 +2604,6 @@ version = "0.1.0" dependencies = [ "abi_stable", "arrow", - "arrow-schema", "datafusion", "datafusion-ffi", "ffi_module_interface", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index ec6e0ab71d50..6ba87ed625cf 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -56,10 +56,8 @@ path = "examples/external_dependency/query-aws-s3.rs" arrow = { workspace = true } # arrow_schema is required for record_batch! 
macro :sad: arrow-flight = { workspace = true } -arrow-schema = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } -dashmap = { workspace = true } # note only use main datafusion crate for examples datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-proto = { workspace = true } diff --git a/datafusion-examples/examples/ffi/ffi_example_table_provider/Cargo.toml b/datafusion-examples/examples/ffi/ffi_example_table_provider/Cargo.toml index 2d91ea2329e4..e9c0c5b43d68 100644 --- a/datafusion-examples/examples/ffi/ffi_example_table_provider/Cargo.toml +++ b/datafusion-examples/examples/ffi/ffi_example_table_provider/Cargo.toml @@ -24,7 +24,6 @@ publish = false [dependencies] abi_stable = "0.11.3" arrow = { workspace = true } -arrow-schema = { workspace = true } datafusion = { workspace = true } datafusion-ffi = { workspace = true } ffi_module_interface = { path = "../ffi_module_interface" } diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index 749457855ca2..73ac44a0316e 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -40,7 +40,6 @@ futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } parking_lot = { workspace = true } -sqlparser = { workspace = true } [dev-dependencies] tokio = { workspace = true } diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index db3e6838f6a5..a890b7c7d65b 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -56,7 +56,7 @@ regex_expressions = ["regex"] # enable string functions string_expressions = ["uuid"] # enable unicode functions -unicode_expressions = ["hashbrown", "unicode-segmentation"] +unicode_expressions = ["unicode-segmentation"] [lib] name = "datafusion_functions" @@ -77,7 +77,6 @@ datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-expr-common = { workspace = true } datafusion-macros = { workspace = true } -hashbrown = { workspace = true, optional = true } hex = { version = "0.4", optional = true } itertools = { workspace = true } log = { workspace = true } diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index d189dc3920af..c9c86e9c8d5c 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -43,14 +43,10 @@ datafusion-expr-common = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } -futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } recursive = { workspace = true, optional = true } -url = { workspace = true } [dev-dependencies] datafusion-expr = { workspace = true } datafusion-functions-nested = { workspace = true } -rstest = { workspace = true } -tokio = { workspace = true } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index b84243b1b56b..410863272b31 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -42,7 +42,6 @@ path = "src/lib.rs" ahash = { workspace = true } arrow = { workspace = true } arrow-ord = { workspace = true } -arrow-schema = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } diff --git 
a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml index aae66e6b9a97..7db051ad191f 100644 --- a/datafusion/wasmtest/Cargo.toml +++ b/datafusion/wasmtest/Cargo.toml @@ -43,25 +43,16 @@ chrono = { version = "0.4", features = ["wasmbind"] } # code size when deploying. console_error_panic_hook = { version = "0.1.1", optional = true } datafusion = { workspace = true } -datafusion-catalog = { workspace = true } datafusion-common = { workspace = true, default-features = true } -datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } -datafusion-expr-common = { workspace = true } -datafusion-functions = { workspace = true } -datafusion-functions-aggregate = { workspace = true } -datafusion-functions-aggregate-common = { workspace = true } -datafusion-functions-table = { workspace = true } datafusion-optimizer = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true, default-features = true } -datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-sql = { workspace = true } # getrandom must be compiled with js feature getrandom = { version = "0.2.8", features = ["js"] } -parquet = { workspace = true } wasm-bindgen = "0.2.99" wasm-bindgen-futures = "0.4.49" From 15d2a4d0310dec9e8112811d5a2956f30390901e Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 11 Feb 2025 19:28:17 +0530 Subject: [PATCH 2/9] readdition --- Cargo.lock | 2 ++ datafusion-examples/Cargo.toml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 080bcb463910..b54b86b78d55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1923,8 +1923,10 @@ version = "45.0.0" dependencies = [ "arrow", "arrow-flight", + "arrow-schema", "async-trait", "bytes", + "dashmap", "datafusion", "datafusion-proto", "env_logger", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 6ba87ed625cf..ec6e0ab71d50 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -56,8 +56,10 @@ path = "examples/external_dependency/query-aws-s3.rs" arrow = { workspace = true } # arrow_schema is required for record_batch! 
macro :sad: arrow-flight = { workspace = true } +arrow-schema = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +dashmap = { workspace = true } # note only use main datafusion crate for examples datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-proto = { workspace = true } From bc10844310f2b3a5107fab3bc606207e645db89c Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 11 Feb 2025 22:26:07 +0530 Subject: [PATCH 3/9] more dependency removals --- Cargo.lock | 2 +- datafusion/core/Cargo.toml | 1 - datafusion/physical-plan/Cargo.toml | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b54b86b78d55..8cfb217bfb02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1734,7 +1734,6 @@ dependencies = [ "datafusion-catalog-listing", "datafusion-common", "datafusion-common-runtime", - "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions", @@ -2227,6 +2226,7 @@ dependencies = [ "ahash 0.8.11", "arrow", "arrow-ord", + "arrow-schema", "async-trait", "chrono", "criterion", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 7b5e2d58d3af..108cbf76e63e 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -95,7 +95,6 @@ datafusion-catalog = { workspace = true } datafusion-catalog-listing = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } -datafusion-doc = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions = { workspace = true } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 410863272b31..b84243b1b56b 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -42,6 +42,7 @@ path = "src/lib.rs" ahash = { workspace = true } arrow = { workspace = true } arrow-ord = { workspace = true } +arrow-schema = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } From 5786e9677366f2b6ca01d2cc815b5552c67a4db1 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 11 Feb 2025 23:10:31 +0530 Subject: [PATCH 4/9] fix: ci --- Cargo.lock | 1 + datafusion/core/Cargo.toml | 1 + datafusion/core/src/datasource/physical_plan/file_stream.rs | 4 ++-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cfb217bfb02..c4518133de2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1734,6 +1734,7 @@ dependencies = [ "datafusion-catalog-listing", "datafusion-common", "datafusion-common-runtime", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 108cbf76e63e..26b4a3d8d1bb 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -129,6 +129,7 @@ xz2 = { version = "0.1", optional = true, features = ["static"] } zstd = { version = "0.13", optional = true, default-features = false } [dev-dependencies] +datafusion-doc = { workspace = true } async-trait = { workspace = true } criterion = { version = "0.5", features = ["async_tokio"] } ctor = { workspace = true } diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 497af101bee7..8515d954b4df 100644 --- 
a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -525,7 +525,7 @@ mod tests { use crate::prelude::SessionContext; use crate::test::{make_partition, object_store::register_test_store}; - use crate::datasource::physical_plan::CsvSource; + use crate::datasource::physical_plan::AvroSource; use arrow::datatypes::Schema; use datafusion_common::internal_err; @@ -649,7 +649,7 @@ mod tests { let config = FileScanConfig::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema, - Arc::new(CsvSource::default()), + Arc::new(AvroSource::default()), ) .with_file_group(file_group) .with_limit(self.limit); From e8d50e463b7a3504b1f5fc06f571419febbaa202 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Tue, 11 Feb 2025 23:11:16 +0530 Subject: [PATCH 5/9] fix: format --- datafusion/core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 26b4a3d8d1bb..1fa1fd340c59 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -129,11 +129,11 @@ xz2 = { version = "0.1", optional = true, features = ["static"] } zstd = { version = "0.13", optional = true, default-features = false } [dev-dependencies] -datafusion-doc = { workspace = true } async-trait = { workspace = true } criterion = { version = "0.5", features = ["async_tokio"] } ctor = { workspace = true } dashmap = "6.1.0" +datafusion-doc = { workspace = true } datafusion-functions-window-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } doc-comment = { workspace = true } From b5467944f33866cd36857458404d076103ef0508 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 12 Feb 2025 00:06:22 +0530 Subject: [PATCH 6/9] revert unnecessary --- datafusion/core/src/datasource/physical_plan/file_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 8515d954b4df..497af101bee7 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -525,7 +525,7 @@ mod tests { use crate::prelude::SessionContext; use crate::test::{make_partition, object_store::register_test_store}; - use crate::datasource::physical_plan::AvroSource; + use crate::datasource::physical_plan::CsvSource; use arrow::datatypes::Schema; use datafusion_common::internal_err; @@ -649,7 +649,7 @@ mod tests { let config = FileScanConfig::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema, - Arc::new(AvroSource::default()), + Arc::new(CsvSource::default()), ) .with_file_group(file_group) .with_limit(self.limit); From 9dd90dc6db1a527ad97bf5c1f6313e182b2cbaae Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 12 Feb 2025 01:29:51 +0530 Subject: [PATCH 7/9] First Iteration --- datafusion/catalog-listing/src/file_meta.rs | 52 ++++ .../catalog-listing/src/file_scan_config.rs | 260 +++++++++++++++++ .../catalog-listing/src/file_stream_part.rs | 214 ++++++++++++++ datafusion/catalog-listing/src/mod.rs | 3 + .../core/src/datasource/physical_plan/avro.rs | 2 +- .../core/src/datasource/physical_plan/csv.rs | 2 +- .../physical_plan/file_scan_config.rs | 269 +----------------- .../datasource/physical_plan/file_stream.rs | 181 +----------- .../core/src/datasource/physical_plan/json.rs | 2 +- .../core/src/datasource/physical_plan/mod.rs | 34 +-- 10 files changed, 
543 insertions(+), 476 deletions(-) create mode 100644 datafusion/catalog-listing/src/file_meta.rs create mode 100644 datafusion/catalog-listing/src/file_scan_config.rs create mode 100644 datafusion/catalog-listing/src/file_stream_part.rs diff --git a/datafusion/catalog-listing/src/file_meta.rs b/datafusion/catalog-listing/src/file_meta.rs new file mode 100644 index 000000000000..098a15eeb38a --- /dev/null +++ b/datafusion/catalog-listing/src/file_meta.rs @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.  See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.  The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.  You may obtain a copy of the License at +// +//   http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.  See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use object_store::{path::Path, ObjectMeta}; + +use crate::FileRange; + +/// A single file or part of a file that should be read, along with its schema, statistics +pub struct FileMeta { +    /// Path for the file (e.g. URL, filesystem path, etc) +    pub object_meta: ObjectMeta, +    /// An optional file range for a more fine-grained parallel execution +    pub range: Option<FileRange>, +    /// An optional field for user defined per object metadata +    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>, +    /// Size hint for the metadata of this file +    pub metadata_size_hint: Option<usize>, +} + +impl FileMeta { +    /// The full path to the object +    pub fn location(&self) -> &Path { +        &self.object_meta.location +    } +} + +impl From<ObjectMeta> for FileMeta { +    fn from(object_meta: ObjectMeta) -> Self { +        Self { +            object_meta, +            range: None, +            extensions: None, +            metadata_size_hint: None, +        } +    } +} diff --git a/datafusion/catalog-listing/src/file_scan_config.rs b/datafusion/catalog-listing/src/file_scan_config.rs new file mode 100644 index 000000000000..f4d5af130d37 --- /dev/null +++ b/datafusion/catalog-listing/src/file_scan_config.rs @@ -0,0 +1,260 @@ +use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc}; + +use arrow::{ +    array::{ +        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch, +        RecordBatchOptions, +    }, +    buffer::Buffer, +    datatypes::{ArrowNativeType, DataType, SchemaRef, UInt16Type}, +}; +use datafusion_common::{exec_err, Result}; +use datafusion_common::{DataFusionError, ScalarValue}; +use log::warn; +/// A helper that projects partition columns into the file record batches. +/// +/// One interesting trick is the usage of a cache for the key buffers of the partition column +/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them +/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches, +/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count). 
+pub struct PartitionColumnProjector { + /// An Arrow buffer initialized to zeros that represents the key array of all partition + /// columns (partition columns are materialized by dictionary arrays with only one + /// value in the dictionary, thus all the keys are equal to zero). + key_buffer_cache: ZeroBufferGenerators, + /// Mapping between the indexes in the list of partition columns and the target + /// schema. Sorted by index in the target schema so that we can iterate on it to + /// insert the partition columns in the target record batch. + projected_partition_indexes: Vec<(usize, usize)>, + /// The schema of the table once the projection was applied. + projected_schema: SchemaRef, +} + +impl PartitionColumnProjector { + // Create a projector to insert the partitioning columns into batches read from files + // - `projected_schema`: the target schema with both file and partitioning columns + // - `table_partition_cols`: all the partitioning column names + pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { + let mut idx_map = HashMap::new(); + for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { + if let Ok(schema_idx) = projected_schema.index_of(partition_name) { + idx_map.insert(partition_idx, schema_idx); + } + } + + let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect(); + projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b)); + + Self { + projected_partition_indexes, + key_buffer_cache: Default::default(), + projected_schema, + } + } + + // Transform the batch read from the file by inserting the partitioning columns + // to the right positions as deduced from `projected_schema` + // - `file_batch`: batch read from the file, with internal projection applied + // - `partition_values`: the list of partition values, one for each partition column + pub fn project( + &mut self, + file_batch: RecordBatch, + partition_values: &[ScalarValue], + ) -> Result { + let expected_cols = + self.projected_schema.fields().len() - self.projected_partition_indexes.len(); + + if file_batch.columns().len() != expected_cols { + return exec_err!( + "Unexpected batch schema from file, expected {} cols but got {}", + expected_cols, + file_batch.columns().len() + ); + } + + let mut cols = file_batch.columns().to_vec(); + for &(pidx, sidx) in &self.projected_partition_indexes { + let p_value = + partition_values + .get(pidx) + .ok_or(DataFusionError::Execution( + "Invalid partitioning found on disk".to_string(), + ))?; + + let mut partition_value = Cow::Borrowed(p_value); + + // check if user forgot to dict-encode the partition value + let field = self.projected_schema.field(sidx); + let expected_data_type = field.data_type(); + let actual_data_type = partition_value.data_type(); + if let DataType::Dictionary(key_type, _) = expected_data_type { + if !matches!(actual_data_type, DataType::Dictionary(_, _)) { + warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name()); + partition_value = Cow::Owned(ScalarValue::Dictionary( + key_type.clone(), + Box::new(partition_value.as_ref().clone()), + )); + } + } + + cols.insert( + sidx, + create_output_array( + &mut self.key_buffer_cache, + partition_value.as_ref(), + file_batch.num_rows(), + )?, + ) + } + + RecordBatch::try_new_with_options( + Arc::clone(&self.projected_schema), + cols, + &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())), + ) + .map_err(Into::into) + } +} + +#[derive(Debug, Default)] +struct 
ZeroBufferGenerators { + gen_i8: ZeroBufferGenerator, + gen_i16: ZeroBufferGenerator, + gen_i32: ZeroBufferGenerator, + gen_i64: ZeroBufferGenerator, + gen_u8: ZeroBufferGenerator, + gen_u16: ZeroBufferGenerator, + gen_u32: ZeroBufferGenerator, + gen_u64: ZeroBufferGenerator, +} + +/// Generate a arrow [`Buffer`] that contains zero values. +#[derive(Debug, Default)] +struct ZeroBufferGenerator +where + T: ArrowNativeType, +{ + cache: Option, + _t: PhantomData, +} + +impl ZeroBufferGenerator +where + T: ArrowNativeType, +{ + const SIZE: usize = size_of::(); + + fn get_buffer(&mut self, n_vals: usize) -> Buffer { + match &mut self.cache { + Some(buf) if buf.len() >= n_vals * Self::SIZE => { + buf.slice_with_length(0, n_vals * Self::SIZE) + } + _ => { + let mut key_buffer_builder = BufferBuilder::::new(n_vals); + key_buffer_builder.advance(n_vals); // keys are all 0 + self.cache.insert(key_buffer_builder.finish()).clone() + } + } + } +} + +fn create_dict_array( + buffer_gen: &mut ZeroBufferGenerator, + dict_val: &ScalarValue, + len: usize, + data_type: DataType, +) -> Result +where + T: ArrowNativeType, +{ + let dict_vals = dict_val.to_array()?; + + let sliced_key_buffer = buffer_gen.get_buffer(len); + + // assemble pieces together + let mut builder = ArrayData::builder(data_type) + .len(len) + .add_buffer(sliced_key_buffer); + builder = builder.add_child_data(dict_vals.to_data()); + Ok(Arc::new(DictionaryArray::::from( + builder.build().unwrap(), + ))) +} + +fn create_output_array( + key_buffer_cache: &mut ZeroBufferGenerators, + val: &ScalarValue, + len: usize, +) -> Result { + if let ScalarValue::Dictionary(key_type, dict_val) = &val { + match key_type.as_ref() { + DataType::Int8 => { + return create_dict_array( + &mut key_buffer_cache.gen_i8, + dict_val, + len, + val.data_type(), + ); + } + DataType::Int16 => { + return create_dict_array( + &mut key_buffer_cache.gen_i16, + dict_val, + len, + val.data_type(), + ); + } + DataType::Int32 => { + return create_dict_array( + &mut key_buffer_cache.gen_i32, + dict_val, + len, + val.data_type(), + ); + } + DataType::Int64 => { + return create_dict_array( + &mut key_buffer_cache.gen_i64, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt8 => { + return create_dict_array( + &mut key_buffer_cache.gen_u8, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt16 => { + return create_dict_array( + &mut key_buffer_cache.gen_u16, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt32 => { + return create_dict_array( + &mut key_buffer_cache.gen_u32, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt64 => { + return create_dict_array( + &mut key_buffer_cache.gen_u64, + dict_val, + len, + val.data_type(), + ); + } + _ => {} + } + } + + val.to_array_of_size(len) +} diff --git a/datafusion/catalog-listing/src/file_stream_part.rs b/datafusion/catalog-listing/src/file_stream_part.rs new file mode 100644 index 000000000000..570ca6678538 --- /dev/null +++ b/datafusion/catalog-listing/src/file_stream_part.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +//   http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.  See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A generic stream over file format readers that can be used by +//! any file format that read its files from start to end. +//! +//! Note: Most traits here need to be marked `Sync + Send` to be +//! compliant with the `SendableRecordBatchStream` trait. + +use crate::file_meta::FileMeta; +use datafusion_common::error::Result; +use datafusion_physical_plan::metrics::{ +    Count, ExecutionPlanMetricsSet, MetricBuilder, Time, +}; + +use arrow::error::ArrowError; +use arrow::record_batch::RecordBatch; +use datafusion_common::instant::Instant; +use datafusion_common::ScalarValue; + +use futures::future::BoxFuture; +use futures::stream::BoxStream; + +/// A fallible future that resolves to a stream of [`RecordBatch`] +pub type FileOpenFuture = +    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>; + +/// Describes the behavior of the `FileStream` if file opening or scanning fails +pub enum OnError { +    /// Fail the entire stream and return the underlying error +    Fail, +    /// Continue scanning, ignoring the failed file +    Skip, +} + +impl Default for OnError { +    fn default() -> Self { +        Self::Fail +    } +} + +/// Generic API for opening a file using an [`ObjectStore`] and resolving to a +/// stream of [`RecordBatch`] +/// +/// [`ObjectStore`]: object_store::ObjectStore +pub trait FileOpener: Unpin + Send + Sync { +    /// Asynchronously open the specified file and return a stream +    /// of [`RecordBatch`] +    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>; +} + +/// Represents the state of the next `FileOpenFuture`. Since we need to poll +/// this future while scanning the current file, we need to store the result if it +/// is ready +pub enum NextOpen { +    Pending(FileOpenFuture), +    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>), +} + +pub enum FileStreamState { +    /// The idle state, no file is currently being read +    Idle, +    /// Currently performing asynchronous IO to obtain a stream of RecordBatch +    /// for a given file +    Open { +        /// A [`FileOpenFuture`] returned by [`FileOpener::open`] +        future: FileOpenFuture, +        /// The partition values for this file +        partition_values: Vec<ScalarValue>, +    }, +    /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] +    /// returned by [`FileOpener::open`] +    Scan { +        /// Partitioning column values for the current batch_iter +        partition_values: Vec<ScalarValue>, +        /// The reader instance +        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>, +        /// A [`FileOpenFuture`] for the next file to be processed, +        /// and its corresponding partition column values, if any. +        /// This allows the next file to be opened in parallel while the +        /// current file is read. +        next: Option<(NextOpen, Vec<ScalarValue>)>, +    }, +    /// Encountered an error +    Error, +    /// Reached the row limit +    Limit, +} + +/// A timer that can be started and stopped. +pub struct StartableTime { +    pub metrics: Time, +    // use for record each part cost time, will eventually add into 'metrics'. 
+ pub start: Option, +} + +impl StartableTime { + pub fn start(&mut self) { + assert!(self.start.is_none()); + self.start = Some(Instant::now()); + } + + pub fn stop(&mut self) { + if let Some(start) = self.start.take() { + self.metrics.add_elapsed(start); + } + } +} + +#[allow(rustdoc::broken_intra_doc_links)] +/// Metrics for [`FileStream`] +/// +/// Note that all of these metrics are in terms of wall clock time +/// (not cpu time) so they include time spent waiting on I/O as well +/// as other operators. +/// +/// [`FileStream`]: +pub struct FileStreamMetrics { + /// Wall clock time elapsed for file opening. + /// + /// Time between when [`FileOpener::open`] is called and when the + /// [`FileStream`] receives a stream for reading. + /// + /// If there are multiple files being scanned, the stream + /// will open the next file in the background while scanning the + /// current file. This metric will only capture time spent opening + /// while not also scanning. + /// [`FileStream`]: + pub time_opening: StartableTime, + /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding + /// + /// Time between when the [`FileStream`] requests data from the + /// stream and when the first [`RecordBatch`] is produced. + /// [`FileStream`]: + pub time_scanning_until_data: StartableTime, + /// Total elapsed wall clock time for scanning + record batch decompression / decoding + /// + /// Sum of time between when the [`FileStream`] requests data from + /// the stream and when a [`RecordBatch`] is produced for all + /// record batches in the stream. Note that this metric also + /// includes the time of the parent operator's execution. + pub time_scanning_total: StartableTime, + /// Wall clock time elapsed for data decompression + decoding + /// + /// Time spent waiting for the FileStream's input. + pub time_processing: StartableTime, + /// Count of errors opening file. + /// + /// If using `OnError::Skip` this will provide a count of the number of files + /// which were skipped and will not be included in the scan results. + pub file_open_errors: Count, + /// Count of errors scanning file + /// + /// If using `OnError::Skip` this will provide a count of the number of files + /// which were skipped and will not be included in the scan results. 
+ pub file_scan_errors: Count, +} + +impl FileStreamMetrics { + pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + let time_opening = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_opening", partition), + start: None, + }; + + let time_scanning_until_data = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_scanning_until_data", partition), + start: None, + }; + + let time_scanning_total = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_scanning_total", partition), + start: None, + }; + + let time_processing = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_processing", partition), + start: None, + }; + + let file_open_errors = + MetricBuilder::new(metrics).counter("file_open_errors", partition); + + let file_scan_errors = + MetricBuilder::new(metrics).counter("file_scan_errors", partition); + + Self { + time_opening, + time_scanning_until_data, + time_scanning_total, + time_processing, + file_open_errors, + file_scan_errors, + } + } +} diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index 786c27acb95e..e56f589d31ed 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -20,7 +20,10 @@ pub mod file_compression_type; pub mod file_groups; +pub mod file_meta; +pub mod file_scan_config; pub mod file_sink_config; +pub mod file_stream_part; pub mod helpers; pub mod url; pub mod write; diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index b148c412c48e..6aa330caffab 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -265,8 +265,8 @@ impl FileSource for AvroSource { #[cfg(feature = "avro")] mod private { use super::*; - use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; use crate::datasource::physical_plan::FileMeta; + use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use bytes::Buf; use futures::StreamExt; diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index bfc2c1df8eab..5e017b992581 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -28,8 +28,8 @@ use crate::datasource::data_source::FileSource; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::file_format::{deserialize_stream, DecoderDeserializer}; use crate::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile}; -use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; use crate::datasource::physical_plan::FileMeta; +use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index c714fad6e9c1..3708fe6abd5e 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -27,23 +27,15 @@ use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{error::Result, scalar::ScalarValue}; use 
std::any::Any; use std::fmt::Formatter; -use std::{ - borrow::Cow, collections::HashMap, fmt, fmt::Debug, marker::PhantomData, - mem::size_of, sync::Arc, vec, -}; +use std::{fmt, sync::Arc}; -use arrow::array::{ - ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch, RecordBatchOptions, -}; -use arrow::buffer::Buffer; -use arrow::datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::{ - exec_err, ColumnStatistics, Constraints, DataFusionError, Statistics, -}; +use datafusion_common::{ColumnStatistics, Constraints, Statistics}; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning}; use crate::datasource::data_source::FileSource; +pub use datafusion_catalog_listing::file_scan_config::*; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_plan::display::{display_orderings, ProjectSchemaDisplay}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -52,7 +44,6 @@ use datafusion_physical_plan::projection::{ }; use datafusion_physical_plan::source::{DataSource, DataSourceExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; -use log::warn; /// Convert type to a type suitable for use as a [`ListingTable`] /// partition column. Returns `Dictionary(UInt16, val_type)`, which is @@ -600,261 +591,13 @@ impl FileScanConfig { } } -/// A helper that projects partition columns into the file record batches. -/// -/// One interesting trick is the usage of a cache for the key buffers of the partition column -/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them -/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches, -/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count). -pub struct PartitionColumnProjector { - /// An Arrow buffer initialized to zeros that represents the key array of all partition - /// columns (partition columns are materialized by dictionary arrays with only one - /// value in the dictionary, thus all the keys are equal to zero). - key_buffer_cache: ZeroBufferGenerators, - /// Mapping between the indexes in the list of partition columns and the target - /// schema. Sorted by index in the target schema so that we can iterate on it to - /// insert the partition columns in the target record batch. - projected_partition_indexes: Vec<(usize, usize)>, - /// The schema of the table once the projection was applied. 
- projected_schema: SchemaRef, -} - -impl PartitionColumnProjector { - // Create a projector to insert the partitioning columns into batches read from files - // - `projected_schema`: the target schema with both file and partitioning columns - // - `table_partition_cols`: all the partitioning column names - pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { - let mut idx_map = HashMap::new(); - for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { - if let Ok(schema_idx) = projected_schema.index_of(partition_name) { - idx_map.insert(partition_idx, schema_idx); - } - } - - let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect(); - projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b)); - - Self { - projected_partition_indexes, - key_buffer_cache: Default::default(), - projected_schema, - } - } - - // Transform the batch read from the file by inserting the partitioning columns - // to the right positions as deduced from `projected_schema` - // - `file_batch`: batch read from the file, with internal projection applied - // - `partition_values`: the list of partition values, one for each partition column - pub fn project( - &mut self, - file_batch: RecordBatch, - partition_values: &[ScalarValue], - ) -> Result { - let expected_cols = - self.projected_schema.fields().len() - self.projected_partition_indexes.len(); - - if file_batch.columns().len() != expected_cols { - return exec_err!( - "Unexpected batch schema from file, expected {} cols but got {}", - expected_cols, - file_batch.columns().len() - ); - } - - let mut cols = file_batch.columns().to_vec(); - for &(pidx, sidx) in &self.projected_partition_indexes { - let p_value = - partition_values - .get(pidx) - .ok_or(DataFusionError::Execution( - "Invalid partitioning found on disk".to_string(), - ))?; - - let mut partition_value = Cow::Borrowed(p_value); - - // check if user forgot to dict-encode the partition value - let field = self.projected_schema.field(sidx); - let expected_data_type = field.data_type(); - let actual_data_type = partition_value.data_type(); - if let DataType::Dictionary(key_type, _) = expected_data_type { - if !matches!(actual_data_type, DataType::Dictionary(_, _)) { - warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name()); - partition_value = Cow::Owned(ScalarValue::Dictionary( - key_type.clone(), - Box::new(partition_value.as_ref().clone()), - )); - } - } - - cols.insert( - sidx, - create_output_array( - &mut self.key_buffer_cache, - partition_value.as_ref(), - file_batch.num_rows(), - )?, - ) - } - - RecordBatch::try_new_with_options( - Arc::clone(&self.projected_schema), - cols, - &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())), - ) - .map_err(Into::into) - } -} - -#[derive(Debug, Default)] -struct ZeroBufferGenerators { - gen_i8: ZeroBufferGenerator, - gen_i16: ZeroBufferGenerator, - gen_i32: ZeroBufferGenerator, - gen_i64: ZeroBufferGenerator, - gen_u8: ZeroBufferGenerator, - gen_u16: ZeroBufferGenerator, - gen_u32: ZeroBufferGenerator, - gen_u64: ZeroBufferGenerator, -} - -/// Generate a arrow [`Buffer`] that contains zero values. 
-#[derive(Debug, Default)] -struct ZeroBufferGenerator -where - T: ArrowNativeType, -{ - cache: Option, - _t: PhantomData, -} - -impl ZeroBufferGenerator -where - T: ArrowNativeType, -{ - const SIZE: usize = size_of::(); - - fn get_buffer(&mut self, n_vals: usize) -> Buffer { - match &mut self.cache { - Some(buf) if buf.len() >= n_vals * Self::SIZE => { - buf.slice_with_length(0, n_vals * Self::SIZE) - } - _ => { - let mut key_buffer_builder = BufferBuilder::::new(n_vals); - key_buffer_builder.advance(n_vals); // keys are all 0 - self.cache.insert(key_buffer_builder.finish()).clone() - } - } - } -} - -fn create_dict_array( - buffer_gen: &mut ZeroBufferGenerator, - dict_val: &ScalarValue, - len: usize, - data_type: DataType, -) -> Result -where - T: ArrowNativeType, -{ - let dict_vals = dict_val.to_array()?; - - let sliced_key_buffer = buffer_gen.get_buffer(len); - - // assemble pieces together - let mut builder = ArrayData::builder(data_type) - .len(len) - .add_buffer(sliced_key_buffer); - builder = builder.add_child_data(dict_vals.to_data()); - Ok(Arc::new(DictionaryArray::::from( - builder.build().unwrap(), - ))) -} - -fn create_output_array( - key_buffer_cache: &mut ZeroBufferGenerators, - val: &ScalarValue, - len: usize, -) -> Result { - if let ScalarValue::Dictionary(key_type, dict_val) = &val { - match key_type.as_ref() { - DataType::Int8 => { - return create_dict_array( - &mut key_buffer_cache.gen_i8, - dict_val, - len, - val.data_type(), - ); - } - DataType::Int16 => { - return create_dict_array( - &mut key_buffer_cache.gen_i16, - dict_val, - len, - val.data_type(), - ); - } - DataType::Int32 => { - return create_dict_array( - &mut key_buffer_cache.gen_i32, - dict_val, - len, - val.data_type(), - ); - } - DataType::Int64 => { - return create_dict_array( - &mut key_buffer_cache.gen_i64, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt8 => { - return create_dict_array( - &mut key_buffer_cache.gen_u8, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt16 => { - return create_dict_array( - &mut key_buffer_cache.gen_u16, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt32 => { - return create_dict_array( - &mut key_buffer_cache.gen_u32, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt64 => { - return create_dict_array( - &mut key_buffer_cache.gen_u64, - dict_val, - len, - val.data_type(), - ); - } - _ => {} - } - } - - val.to_array_of_size(len) -} - #[cfg(test)] mod tests { - use arrow::array::Int32Array; - use super::*; use crate::datasource::physical_plan::ArrowSource; use crate::{test::columns, test_util::aggr_test_schema}; + use arrow::array::{Int32Array, RecordBatch}; + use std::collections::HashMap; #[test] fn physical_plan_config_no_projection() { diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 497af101bee7..4d46c2006445 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -31,49 +31,17 @@ use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector; use crate::datasource::physical_plan::{FileMeta, FileScanConfig}; use crate::error::Result; -use crate::physical_plan::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, -}; +use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use crate::physical_plan::RecordBatchStream; use 
arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use datafusion_common::instant::Instant; +pub use datafusion_catalog_listing::file_stream_part::*; use datafusion_common::ScalarValue; -use futures::future::BoxFuture; -use futures::stream::BoxStream; use futures::{ready, FutureExt, Stream, StreamExt}; -/// A fallible future that resolves to a stream of [`RecordBatch`] -pub type FileOpenFuture = - BoxFuture<'static, Result>>>; - -/// Describes the behavior of the `FileStream` if file opening or scanning fails -pub enum OnError { - /// Fail the entire stream and return the underlying error - Fail, - /// Continue scanning, ignoring the failed file - Skip, -} - -impl Default for OnError { - fn default() -> Self { - Self::Fail - } -} - -/// Generic API for opening a file using an [`ObjectStore`] and resolving to a -/// stream of [`RecordBatch`] -/// -/// [`ObjectStore`]: object_store::ObjectStore -pub trait FileOpener: Unpin + Send + Sync { - /// Asynchronously open the specified file and return a stream - /// of [`RecordBatch`] - fn open(&self, file_meta: FileMeta) -> Result; -} - /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { /// An iterator over input files. @@ -98,151 +66,6 @@ pub struct FileStream { on_error: OnError, } -/// Represents the state of the next `FileOpenFuture`. Since we need to poll -/// this future while scanning the current file, we need to store the result if it -/// is ready -enum NextOpen { - Pending(FileOpenFuture), - Ready(Result>>), -} - -enum FileStreamState { - /// The idle state, no file is currently being read - Idle, - /// Currently performing asynchronous IO to obtain a stream of RecordBatch - /// for a given file - Open { - /// A [`FileOpenFuture`] returned by [`FileOpener::open`] - future: FileOpenFuture, - /// The partition values for this file - partition_values: Vec, - }, - /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] - /// returned by [`FileOpener::open`] - Scan { - /// Partitioning column values for the current batch_iter - partition_values: Vec, - /// The reader instance - reader: BoxStream<'static, Result>, - /// A [`FileOpenFuture`] for the next file to be processed, - /// and its corresponding partition column values, if any. - /// This allows the next file to be opened in parallel while the - /// current file is read. - next: Option<(NextOpen, Vec)>, - }, - /// Encountered an error - Error, - /// Reached the row limit - Limit, -} - -/// A timer that can be started and stopped. -pub struct StartableTime { - pub(crate) metrics: Time, - // use for record each part cost time, will eventually add into 'metrics'. - pub(crate) start: Option, -} - -impl StartableTime { - pub(crate) fn start(&mut self) { - assert!(self.start.is_none()); - self.start = Some(Instant::now()); - } - - pub(crate) fn stop(&mut self) { - if let Some(start) = self.start.take() { - self.metrics.add_elapsed(start); - } - } -} - -/// Metrics for [`FileStream`] -/// -/// Note that all of these metrics are in terms of wall clock time -/// (not cpu time) so they include time spent waiting on I/O as well -/// as other operators. -struct FileStreamMetrics { - /// Wall clock time elapsed for file opening. - /// - /// Time between when [`FileOpener::open`] is called and when the - /// [`FileStream`] receives a stream for reading. 
- /// - /// If there are multiple files being scanned, the stream - /// will open the next file in the background while scanning the - /// current file. This metric will only capture time spent opening - /// while not also scanning. - pub time_opening: StartableTime, - /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding - /// - /// Time between when the [`FileStream`] requests data from the - /// stream and when the first [`RecordBatch`] is produced. - pub time_scanning_until_data: StartableTime, - /// Total elapsed wall clock time for scanning + record batch decompression / decoding - /// - /// Sum of time between when the [`FileStream`] requests data from - /// the stream and when a [`RecordBatch`] is produced for all - /// record batches in the stream. Note that this metric also - /// includes the time of the parent operator's execution. - pub time_scanning_total: StartableTime, - /// Wall clock time elapsed for data decompression + decoding - /// - /// Time spent waiting for the FileStream's input. - pub time_processing: StartableTime, - /// Count of errors opening file. - /// - /// If using `OnError::Skip` this will provide a count of the number of files - /// which were skipped and will not be included in the scan results. - pub file_open_errors: Count, - /// Count of errors scanning file - /// - /// If using `OnError::Skip` this will provide a count of the number of files - /// which were skipped and will not be included in the scan results. - pub file_scan_errors: Count, -} - -impl FileStreamMetrics { - fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { - let time_opening = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_opening", partition), - start: None, - }; - - let time_scanning_until_data = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_scanning_until_data", partition), - start: None, - }; - - let time_scanning_total = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_scanning_total", partition), - start: None, - }; - - let time_processing = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_processing", partition), - start: None, - }; - - let file_open_errors = - MetricBuilder::new(metrics).counter("file_open_errors", partition); - - let file_scan_errors = - MetricBuilder::new(metrics).counter("file_scan_errors", partition); - - Self { - time_opening, - time_scanning_until_data, - time_scanning_total, - time_processing, - file_open_errors, - file_scan_errors, - } - } -} - impl FileStream { /// Create a new `FileStream` using the give `FileOpener` to scan underlying files pub fn new( diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 76cb657b0c5f..51e0a46d942e 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -27,8 +27,8 @@ use crate::datasource::data_source::FileSource; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::file_format::{deserialize_stream, DecoderDeserializer}; use crate::datasource::listing::{ListingTableUrl, PartitionedFile}; -use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; use crate::datasource::physical_plan::FileMeta; +use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use crate::error::{DataFusionError, 
Result}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index f08981605f2f..18174bd54e4f 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -51,6 +51,7 @@ pub use avro::AvroSource; pub use csv::{CsvExec, CsvExecBuilder}; pub use csv::{CsvOpener, CsvSource}; pub use datafusion_catalog_listing::file_groups::FileGroupPartitioner; +pub use datafusion_catalog_listing::file_meta::FileMeta; pub use datafusion_catalog_listing::file_sink_config::*; pub use file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, @@ -61,7 +62,7 @@ use futures::StreamExt; pub use json::NdJsonExec; pub use json::{JsonOpener, JsonSource}; use log::debug; -use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore}; +use object_store::{path::Path, GetOptions, GetRange, ObjectStore}; use std::{ fmt::{Debug, Formatter, Result as FmtResult}, ops::Range, @@ -219,36 +220,6 @@ where Ok(()) } -/// A single file or part of a file that should be read, along with its schema, statistics -pub struct FileMeta { - /// Path for the file (e.g. URL, filesystem path, etc) - pub object_meta: ObjectMeta, - /// An optional file range for a more fine-grained parallel execution - pub range: Option, - /// An optional field for user defined per object metadata - pub extensions: Option>, - /// Size hint for the metadata of this file - pub metadata_size_hint: Option, -} - -impl FileMeta { - /// The full path to the object - pub fn location(&self) -> &Path { - &self.object_meta.location - } -} - -impl From for FileMeta { - fn from(object_meta: ObjectMeta) -> Self { - Self { - object_meta, - range: None, - extensions: None, - metadata_size_hint: None, - } - } -} - /// The various listing tables does not attempt to read all files /// concurrently, instead they will read files in sequence within a /// partition. This is an important property as it allows plans to @@ -490,6 +461,7 @@ mod tests { StringArray, UInt64Array, }; use arrow::datatypes::{DataType, Field, Schema}; + use object_store::ObjectMeta; use crate::datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, From cc61f6aaf7d2958ab8e775f3cc93cb0c985c93b7 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 12 Feb 2025 02:01:36 +0530 Subject: [PATCH 8/9] fix: forgotten Header --- .../catalog-listing/src/file_scan_config.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/datafusion/catalog-listing/src/file_scan_config.rs b/datafusion/catalog-listing/src/file_scan_config.rs index f4d5af130d37..bfddbc3a1fc4 100644 --- a/datafusion/catalog-listing/src/file_scan_config.rs +++ b/datafusion/catalog-listing/src/file_scan_config.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc}; use arrow::{ @@ -11,6 +28,7 @@ use arrow::{ use datafusion_common::{exec_err, Result}; use datafusion_common::{DataFusionError, ScalarValue}; use log::warn; + /// A helper that projects partition columns into the file record batches. /// /// One interesting trick is the usage of a cache for the key buffers of the partition column From 709e0f6b8cf259c1eb4d24bc4971fef628a2fb7b Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 12 Feb 2025 18:40:49 +0530 Subject: [PATCH 9/9] Tweaks --- .../src/{file_stream_part.rs => file_stream.rs} | 0 datafusion/catalog-listing/src/mod.rs | 2 +- datafusion/core/src/datasource/physical_plan/file_stream.rs | 5 ++++- 3 files changed, 5 insertions(+), 2 deletions(-) rename datafusion/catalog-listing/src/{file_stream_part.rs => file_stream.rs} (100%) diff --git a/datafusion/catalog-listing/src/file_stream_part.rs b/datafusion/catalog-listing/src/file_stream.rs similarity index 100% rename from datafusion/catalog-listing/src/file_stream_part.rs rename to datafusion/catalog-listing/src/file_stream.rs diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index e56f589d31ed..9eb79ec07ac8 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -23,7 +23,7 @@ pub mod file_groups; pub mod file_meta; pub mod file_scan_config; pub mod file_sink_config; -pub mod file_stream_part; +pub mod file_stream; pub mod helpers; pub mod url; pub mod write; diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 4d46c2006445..c88d4c4458a5 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -37,7 +37,10 @@ use crate::physical_plan::RecordBatchStream; use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -pub use datafusion_catalog_listing::file_stream_part::*; +pub use datafusion_catalog_listing::file_stream::{FileOpenFuture, FileOpener, OnError}; +use datafusion_catalog_listing::file_stream::{ + FileStreamMetrics, FileStreamState, NextOpen, +}; use datafusion_common::ScalarValue; use futures::{ready, FutureExt, Stream, StreamExt};
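Beyond the mechanical move, the series keeps `FileMeta`, `FileOpener`, `FileOpenFuture` and `OnError` re-exported under `datafusion::datasource::physical_plan`, so downstream implementations of the file-opening API should keep compiling unchanged. A minimal sketch of such an implementation written against those re-exports follows; the `EmptyOpener` type and the exact import paths are illustrative assumptions, not part of this change:

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::Result;
use futures::stream::{self, BoxStream, StreamExt};

/// Hypothetical opener that yields no batches; it only demonstrates the trait shape.
struct EmptyOpener;

impl FileOpener for EmptyOpener {
    fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
        // The returned future resolves to a (here empty) stream of `RecordBatch` results.
        Ok(Box::pin(async move {
            let batches: BoxStream<'static, std::result::Result<RecordBatch, ArrowError>> =
                stream::empty().boxed();
            Ok(batches)
        }))
    }
}
```

Because the trait and `FileMeta` now live in `datafusion-catalog-listing` and are only re-exported from core, code like the sketch above is insulated from the relocation.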