diff --git a/Cargo.lock b/Cargo.lock index c708c516ab36..c9cce17dd3c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,9 +246,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc208515aa0151028e464cc94a692156e945ce5126abd3537bb7fd6ba2143ed1" +checksum = "84ef243634a39fb6e9d1710737e7a5ef96c9bacabd2326859ff889bc9ef755e5" dependencies = [ "arrow-arith", "arrow-array", @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e07e726e2b3f7816a85c6a45b6ec118eeeabf0b2a8c208122ad949437181f49a" +checksum = "8f420c6aef51dad2e4a96ce29c0ec90ad84880bdb60b321c74c652a6be07b93f" dependencies = [ "arrow-array", "arrow-buffer", @@ -284,9 +284,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2262eba4f16c78496adfd559a29fe4b24df6088efc9985a873d58e92be022d5" +checksum = "24bda5ff6461a4ff9739959b3d57b377f45e3f878f7be1a4f28137c0a8f339fa" dependencies = [ "ahash 0.8.11", "arrow-buffer", @@ -301,9 +301,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e899dade2c3b7f5642eb8366cfd898958bcca099cde6dfea543c7e8d3ad88d4" +checksum = "bc6ed265c73f134a583d02c3cab5e16afab9446d8048ede8707e31f85fad58a0" dependencies = [ "bytes", "half", @@ -312,9 +312,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4103d88c5b441525ed4ac23153be7458494c2b0c9a11115848fdb9b81f6f886a" +checksum = "01c648572391edcef10e5fd458db70ba27ed6f71bcaee04397d0cfb100b34f8b" dependencies = [ "arrow-array", "arrow-buffer", @@ -333,9 +333,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d3cb0914486a3cae19a5cad2598e44e225d53157926d0ada03c20521191a65" +checksum = "a02fb265a6d8011a7d3ad1a36f25816ad0a3bb04cb8e9fe7929c165b98c0cbcd" dependencies = [ "arrow-array", "arrow-cast", @@ -349,9 +349,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a329fb064477c9ec5f0870d2f5130966f91055c7c5bce2b3a084f116bc28c3b" +checksum = "5f2cebf504bb6a92a134a87fff98f01b14fbb3a93ecf7aef90cd0f888c5fffa4" dependencies = [ "arrow-buffer", "arrow-schema", @@ -361,9 +361,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7408f2bf3b978eddda272c7699f439760ebc4ac70feca25fefa82c5b8ce808d" +checksum = "aecab6e2ed46c553354ffbc6cb76ac3eaa61dc07bd13d7de8ece233933bcf679" dependencies = [ "arrow-arith", "arrow-array", @@ -388,9 +388,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddecdeab02491b1ce88885986e25002a3da34dd349f682c7cfe67bab7cc17b86" +checksum = "8e6405b287671c88846e7751f7291f717b164911474cabac6d3d8614d5aa7374" dependencies = [ "arrow-array", "arrow-buffer", @@ -402,9 +402,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03b9340013413eb84868682ace00a1098c81a5ebc96d279f7ebf9a4cac3c0fd" +checksum = "5329bf9e7390cbb6b117ddd4d82e94c5362ea4cab5095697139429f36a38350c" dependencies = [ "arrow-array", "arrow-buffer", @@ -415,16 +415,18 @@ dependencies = [ "half", "indexmap 2.8.0", "lexical-core", + "memchr", "num", "serde", "serde_json", + "simdutf8", ] [[package]] name = "arrow-ord" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f841bfcc1997ef6ac48ee0305c4dfceb1f7c786fe31e67c1186edf775e1f1160" +checksum = "e103c13d4b80da28339c1d7aa23dd85bd59f42158acc45d39eeb6770627909ce" dependencies = [ "arrow-array", "arrow-buffer", @@ -435,9 +437,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1eeb55b0a0a83851aa01f2ca5ee5648f607e8506ba6802577afdda9d75cdedcd" +checksum = "170549a11b8534f3097a0619cfe89c42812345dc998bcf81128fc700b84345b8" dependencies = [ "arrow-array", "arrow-buffer", @@ -448,9 +450,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85934a9d0261e0fa5d4e2a5295107d743b543a6e0484a835d4b8db2da15306f9" +checksum = "a5c53775bba63f319189f366d2b86e9a8889373eb198f07d8544938fc9f8ed9a" dependencies = [ "bitflags 2.8.0", "serde", @@ -458,9 +460,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e2932aece2d0c869dd2125feb9bd1709ef5c445daa3838ac4112dcfa0fda52c" +checksum = "0a99003b2eb562b8d9c99dfb672306f15e94b20d3734179d596895703e821dcf" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -472,9 +474,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "912e38bd6a7a7714c1d9b61df80315685553b7455e8a6045c27531d8ecd5b458" +checksum = "90fdb130ee8325f4cd8262e19bb6baa3cbcef2b2573c4bee8c6fda7ea08199d7" dependencies = [ "arrow-array", "arrow-buffer", @@ -1361,9 +1363,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.39" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1371,7 +1373,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-link", ] [[package]] @@ -2489,6 +2491,7 @@ dependencies = [ "rand 0.8.5", "rstest", "rstest_reuse", + "tempfile", "tokio", ] @@ -4363,9 +4366,9 @@ dependencies = [ [[package]] name = "parquet" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f88838dca3b84d41444a0341b19f347e8098a3898b0f21536654b8b799e11abd" +checksum = "94243778210509a5a5e9e012872127180c155d73a9cd6e2df9243d213e81e100" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -4395,7 +4398,6 @@ dependencies = [ "tokio", "twox-hash", "zstd", - "zstd-sys", ] [[package]] @@ -7125,6 +7127,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index d26446c11167..a930174e2a30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,19 +87,19 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.17", default-features = false } -arrow = { version = "54.2.1", features = [ +arrow = { version = "54.3.0", features = [ "prettyprint", "chrono-tz", ] } -arrow-buffer = { version = "54.1.0", default-features = false } -arrow-flight = { version = "54.2.1", features = [ +arrow-buffer = { version = "54.3.0", default-features = false } +arrow-flight = { version = "54.3.0", features = [ "flight-sql-experimental", ] } -arrow-ipc = { version = "54.2.0", default-features = false, features = [ +arrow-ipc = { version = "54.3.0", default-features = false, features = [ "lz4", ] } -arrow-ord = { version = "54.1.0", default-features = false } -arrow-schema = { version = "54.1.0", default-features = false } +arrow-ord = { version = "54.3.0", default-features = false } +arrow-schema = { version = "54.3.0", default-features = false } async-trait = "0.1.88" bigdecimal = "0.4.7" bytes = "1.10" @@ -149,7 +149,7 @@ itertools = "0.14" log = "^0.4" object_store = { version = "0.11.0", default-features = false } parking_lot = "0.12" -parquet = { version = "54.2.1", default-features = false, features = [ +parquet = { version = "54.3.0", default-features = false, features = [ "arrow", "async", "object_store", diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 1f38e2ed3126..5210ee26755c 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -72,6 +72,7 @@ insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" +tempfile = "3.19.1" tokio = { workspace = true, features = [ "rt-multi-thread", "fs", @@ -81,3 +82,7 @@ tokio = { workspace = true, features = [ [[bench]] harness = false name = "partial_ordering" + +[[bench]] +harness = false +name = "spill_io" diff --git a/datafusion/physical-plan/benches/spill_io.rs b/datafusion/physical-plan/benches/spill_io.rs new file mode 100644 index 000000000000..3b877671ad58 --- /dev/null +++ b/datafusion/physical-plan/benches/spill_io.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder, +}; +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_physical_plan::common::collect; +use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; +use datafusion_physical_plan::SpillManager; +use std::sync::Arc; +use tokio::runtime::Runtime; + +pub fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Date32, true), + Field::new("c3", DataType::Decimal128(11, 2), true), + ])); + + let mut a = Int32Builder::new(); + let mut b = StringBuilder::new(); + let mut c = Date32Builder::new(); + let mut d = Decimal128Builder::new() + .with_precision_and_scale(11, 2) + .unwrap(); + + for i in 0..num_rows { + a.append_value(i as i32); + c.append_value(i as i32); + d.append_value((i * 1000000) as i128); + if allow_nulls && i % 10 == 0 { + b.append_null(); + } else { + b.append_value(format!("this is string number {i}")); + } + } + + let a = a.finish(); + let b = b.finish(); + let c = c.finish(); + let d = d.finish(); + + RecordBatch::try_new( + schema.clone(), + vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)], + ) + .unwrap() +} + +// BENCHMARK: REVALIDATION OVERHEAD COMPARISON +// --------------------------------------------------------- +// To compare performance with/without Arrow IPC validation: +// +// 1. Locate the function `read_spill` +// 2. Modify the `skip_validation` flag: +// - Set to `false` to enable validation +// 3. Rerun `cargo bench --bench spill_io` +fn bench_spill_io(c: &mut Criterion) { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Date32, true), + Field::new("c3", DataType::Decimal128(11, 2), true), + ])); + let spill_manager = SpillManager::new(env, metrics, schema); + + let mut group = c.benchmark_group("spill_io"); + let rt = Runtime::new().unwrap(); + + group.bench_with_input( + BenchmarkId::new("StreamReader/read_100", ""), + &spill_manager, + |b, spill_manager| { + b.iter_batched( + // Setup phase: Create fresh state for each benchmark iteration. + // - generate an ipc file. + // This ensures each iteration starts with clean resources. + || { + let batch = create_batch(8192, true); + spill_manager + .spill_record_batch_and_finish(&vec![batch; 100], "Test") + .unwrap() + .unwrap() + }, + // Benchmark phase: + // - Execute the read operation via SpillManager + // - Wait for the consumer to finish processing + |spill_file| { + rt.block_on(async { + let stream = + spill_manager.read_spill_as_stream(spill_file).unwrap(); + let _ = collect(stream).await.unwrap(); + }) + }, + BatchSize::LargeInput, + ) + }, + ); + group.finish(); +} + +criterion_group!(benches, bench_spill_io); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 04fbd06fabcd..b256e615b232 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -50,6 +50,7 @@ pub use crate::ordering::InputOrderMode; pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; +pub use spill::spill_manager::SpillManager; mod ordering; mod render_tree; diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 88bf7953daeb..c32711a96b97 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -35,7 +35,10 @@ use datafusion_common::{exec_datafusion_err, HashSet, Result}; fn read_spill(sender: Sender>, path: &Path) -> Result<()> { let file = BufReader::new(File::open(path)?); - let reader = StreamReader::try_new(file, None)?; + // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications + // with validated schemas and buffers. Skip redundant validation during read + // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written. + let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) }; for batch in reader { sender .blocking_send(batch.map_err(Into::into)) diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index 4425eee33373..148f0dfe64bb 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -183,7 +183,7 @@ query error input contains invalid characters SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+') # to_date with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_date('2020-09-08 12/00/00+00:00', '%q') statement ok diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index e042e3863b69..16168eeae4a1 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -2268,23 +2268,23 @@ query error input contains invalid characters SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%c', '%+') # to_timestamp with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_nanos with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_nanos('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_millis with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_millis('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_micros with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_micros('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_seconds with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%q') # Create string timestamp table with different formats