diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 502ea3317adc..df1bd5c45923 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; -use std::vec; +use std::{mem, vec}; use crate::aggregates::group_values::{new_group_values, GroupValues}; use crate::aggregates::order::GroupOrderingFull; @@ -1066,13 +1066,11 @@ impl GroupedHashAggregateStream { sort_batch(&batch, expr.as_ref(), None) })), ))); - for spill in self.spill_state.spills.drain(..) { - let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?; - streams.push(stream); - } self.spill_state.is_stream_merging = true; self.input = StreamingMergeBuilder::new() .with_streams(streams) + .with_sorted_spill_files(mem::take(&mut self.spill_state.spills)) + .with_spill_manager(self.spill_state.spill_manager.clone()) .with_schema(schema) .with_expressions(self.spill_state.spill_expr.as_ref()) .with_metrics(self.baseline_metrics.clone()) diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index c7ffae4061c0..1caf602b8faa 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -20,6 +20,7 @@ mod builder; mod cursor; mod merge; +pub mod multi_level_sort_preserving_merge_stream; pub mod partial_sort; pub mod sort; pub mod sort_preserving_merge; diff --git a/datafusion/physical-plan/src/sorts/multi_level_sort_preserving_merge_stream.rs b/datafusion/physical-plan/src/sorts/multi_level_sort_preserving_merge_stream.rs new file mode 100644 index 000000000000..8f1497544768 --- /dev/null +++ b/datafusion/physical-plan/src/sorts/multi_level_sort_preserving_merge_stream.rs @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Merge that deals with an arbitrary size of spilled files. +//! This is an order-preserving merge. + +use crate::metrics::BaselineMetrics; +use crate::{EmptyRecordBatchStream, SpillManager}; +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use datafusion_common::{internal_err, Result}; +use datafusion_execution::memory_pool::MemoryReservation; +use std::mem; +use std::ops::Deref; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::sorts::streaming_merge::StreamingMergeBuilder; +use crate::spill::in_progress_spill_file::InProgressSpillFile; +use crate::stream::RecordBatchStreamAdapter; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use futures::{Stream, StreamExt}; + +enum State { + /// Had an error + Aborted, + + /// Stream did not start yet or between passes + Uninitialized, + + /// In progress of merging multiple sorted streams + MultiLevel { + stream: SendableRecordBatchStream, + in_progress_file: InProgressSpillFile, + }, + + /// This is the last level of the merge, just pass through the stream + Passthrough(SendableRecordBatchStream), +} + +pub struct MultiLevelSortPreservingMergeStream { + schema: SchemaRef, + spill_manager: SpillManager, + sorted_spill_files: Vec, + sorted_streams: Vec, + expr: Arc, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + fetch: Option, + enable_round_robin_tie_breaker: bool, + + /// The number of blocking threads to use for merging sorted streams + max_blocking_threads: usize, + + /// The current state of the stream + state: State, +} + +impl MultiLevelSortPreservingMergeStream { + #[allow(clippy::too_many_arguments)] + pub fn new( + spill_manager: SpillManager, + schema: SchemaRef, + sorted_spill_files: Vec, + sorted_streams: Vec, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + + max_blocking_threads: Option, + + fetch: Option, + enable_round_robin_tie_breaker: bool, + ) -> Result { + // TODO - add a check to see the actual number of available blocking threads + let max_blocking_threads = max_blocking_threads.unwrap_or(128); + + if max_blocking_threads <= 1 { + return internal_err!("max_blocking_threads must be greater than 1"); + } + + Ok(Self { + spill_manager, + schema, + sorted_spill_files, + sorted_streams, + expr: Arc::new(expr), + metrics, + batch_size, + reservation, + fetch, + state: State::Uninitialized, + enable_round_robin_tie_breaker, + max_blocking_threads, + }) + } + + fn created_sorted_stream(&mut self) -> Result { + let mut sorted_streams = mem::take(&mut self.sorted_streams); + + match (self.sorted_spill_files.len(), sorted_streams.len()) { + // No data so empty batch + (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + &self.schema, + )))), + + // Only in-memory stream + (0, 1) => Ok(sorted_streams.into_iter().next().unwrap()), + + // Only single sorted spill file so stream it + (1, 0) => self + .spill_manager + .read_spill_as_stream(self.sorted_spill_files.drain(..).next().unwrap()), + + // Need to merge multiple streams + (_, _) => { + let sorted_spill_files_to_read = + self.sorted_spill_files.len().min(self.max_blocking_threads); + + for spill in self.sorted_spill_files.drain(..sorted_spill_files_to_read) { + let stream = self.spill_manager.read_spill_as_stream(spill)?; + sorted_streams.push(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + stream, + ))); + } + + StreamingMergeBuilder::new() + .with_schema(Arc::clone(&self.schema)) + .with_expressions(self.expr.deref()) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .with_metrics(if self.sorted_spill_files.is_empty() { + // Only add the metrics to the last run + self.metrics.clone() + } else { + self.metrics.intermediate() + }) + .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker) + .with_streams(sorted_streams) + .with_reservation(self.reservation.new_empty()) + .build() + } + } + } + + fn poll_next_inner( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + match &mut self.state { + State::Aborted => return Poll::Ready(None), + State::Uninitialized => { + let stream = self.created_sorted_stream()?; + + if self.sorted_spill_files.is_empty() { + self.state = State::Passthrough(stream); + } else { + let in_progress_file = + self.spill_manager.create_in_progress_file("spill")?; + + self.state = State::MultiLevel { + stream, + in_progress_file, + }; + } + } + State::MultiLevel { + stream, + in_progress_file, + } => { + 'write_sorted_run: loop { + match futures::ready!(stream.poll_next_unpin(cx)) { + // This stream is finished. + None => { + // finish the file and add it to the sorted spill files + if let Some(sorted_spill_file) = + in_progress_file.finish()? + { + self.sorted_spill_files.push(sorted_spill_file); + } + + // Reset the state to create a stream from the current sorted spill files + self.state = State::Uninitialized; + + break 'write_sorted_run; + } + Some(Err(e)) => { + self.state = State::Aborted; + + // Abort + return Poll::Ready(Some(Err(e))); + } + Some(Ok(batch)) => { + // Got a batch, write it to file + in_progress_file.append_batch(&batch)?; + } + } + } + } + + // Last + State::Passthrough(s) => return s.poll_next_unpin(cx), + } + } + } +} + +impl Stream for MultiLevelSortPreservingMergeStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.poll_next_inner(cx) + } +} + +impl RecordBatchStream for MultiLevelSortPreservingMergeStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c0bae7761370..075d87dbab7d 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -20,9 +20,9 @@ //! but spills to disk if needed. use std::any::Any; -use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use std::{fmt, mem}; use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; @@ -355,8 +355,6 @@ impl ExternalSorter { self.merge_reservation.free(); if self.spilled_before() { - let mut streams = vec![]; - // Sort `in_mem_batches` and spill it first. If there are many // `in_mem_batches` and the memory limit is almost reached, merging // them with the spilled files at the same time might cause OOM. @@ -364,18 +362,11 @@ impl ExternalSorter { self.sort_and_spill_in_mem_batches().await?; } - for spill in self.finished_spill_files.drain(..) { - if !spill.path().exists() { - return internal_err!("Spill file {:?} does not exist", spill.path()); - } - let stream = self.spill_manager.read_spill_as_stream(spill)?; - streams.push(stream); - } - let expressions: LexOrdering = self.expr.iter().cloned().collect(); StreamingMergeBuilder::new() - .with_streams(streams) + .with_spill_manager(self.spill_manager.clone()) + .with_sorted_spill_files(self.finished_spill_files.drain(..).collect()) .with_schema(Arc::clone(&self.schema)) .with_expressions(expressions.as_ref()) .with_metrics(self.metrics.baseline.clone()) @@ -428,7 +419,7 @@ impl ExternalSorter { debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - let batches_to_spill = std::mem::take(globally_sorted_batches); + let batches_to_spill = mem::take(globally_sorted_batches); self.reservation.free(); let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| { @@ -683,7 +674,7 @@ impl ExternalSorter { return self.sort_batch_stream(batch, metrics, reservation); } - let streams = std::mem::take(&mut self.in_mem_batches) + let streams = mem::take(&mut self.in_mem_batches) .into_iter() .map(|batch| { let metrics = self.metrics.baseline.intermediate(); diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 3f022ec6095a..8c47fc991fac 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -19,14 +19,16 @@ //! This is an order-preserving merge. use crate::metrics::BaselineMetrics; +use crate::sorts::multi_level_sort_preserving_merge_stream::MultiLevelSortPreservingMergeStream; use crate::sorts::{ merge::SortPreservingMergeStream, stream::{FieldCursorStream, RowCursorStream}, }; -use crate::SendableRecordBatchStream; +use crate::{SendableRecordBatchStream, SpillManager}; use arrow::array::*; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common::{internal_err, Result}; +use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -61,6 +63,10 @@ pub struct StreamingMergeBuilder<'a> { fetch: Option, reservation: Option, enable_round_robin_tie_breaker: bool, + + spill_manager: Option, + sorted_spill_files: Vec, + max_blocking_threads: Option, } impl Default for StreamingMergeBuilder<'_> { @@ -74,6 +80,9 @@ impl Default for StreamingMergeBuilder<'_> { fetch: None, reservation: None, enable_round_robin_tie_breaker: false, + spill_manager: None, + sorted_spill_files: vec![], + max_blocking_threads: None, } } } @@ -133,6 +142,24 @@ impl<'a> StreamingMergeBuilder<'a> { self } + pub fn with_spill_manager(mut self, spill_manager: SpillManager) -> Self { + self.spill_manager = Some(spill_manager); + self + } + + pub fn with_sorted_spill_files( + mut self, + sorted_spill_files: Vec, + ) -> Self { + self.sorted_spill_files = sorted_spill_files; + self + } + + pub fn with_max_blocking_threads(mut self, max_blocking_threads: usize) -> Self { + self.max_blocking_threads = Some(max_blocking_threads); + self + } + pub fn build(self) -> Result { let Self { streams, @@ -143,8 +170,27 @@ impl<'a> StreamingMergeBuilder<'a> { fetch, expressions, enable_round_robin_tie_breaker, + spill_manager, + sorted_spill_files, + max_blocking_threads, } = self; + if spill_manager.is_some() && !sorted_spill_files.is_empty() { + return Ok(Box::pin(MultiLevelSortPreservingMergeStream::new( + spill_manager.unwrap(), + schema.clone().unwrap(), + sorted_spill_files, + streams, + expressions.clone(), + metrics.clone().unwrap(), + batch_size.unwrap(), + reservation.unwrap(), + max_blocking_threads, + fetch, + enable_round_robin_tie_breaker, + )?)); + } + // Early return if streams or expressions are empty let checks = [ ( @@ -155,6 +201,10 @@ impl<'a> StreamingMergeBuilder<'a> { expressions.is_empty(), "Sort expressions cannot be empty for streaming merge", ), + ( + !sorted_spill_files.is_empty(), + "Sorted spill files cannot be used with streaming merge", + ), ]; if let Some((_, error_message)) = checks.iter().find(|(condition, _)| *condition)