Skip to content

perf: Reuse row converter during sort #15302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 88 additions & 12 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ use arrow::array::{
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::{internal_datafusion_err, internal_err, Result};
use arrow::row::{RowConverter, Rows, SortField};
use datafusion_common::{
exec_datafusion_err, internal_datafusion_err, internal_err, Result,
};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
Expand Down Expand Up @@ -203,6 +205,8 @@ struct ExternalSorter {
schema: SchemaRef,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
/// RowConverter corresponding to the sort expressions
sort_keys_row_converter: Arc<RowConverter>,
/// If Some, the maximum number of output rows that will be produced
fetch: Option<usize>,
/// The target number of rows for output batches
Expand Down Expand Up @@ -265,7 +269,7 @@ impl ExternalSorter {
sort_in_place_threshold_bytes: usize,
metrics: &ExecutionPlanMetricsSet,
runtime: Arc<RuntimeEnv>,
) -> Self {
) -> Result<Self> {
let metrics = ExternalSorterMetrics::new(metrics, partition_id);
let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
.with_can_spill(true)
Expand All @@ -275,19 +279,36 @@ impl ExternalSorter {
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
.register(&runtime.memory_pool);

// Construct RowConverter for sort keys
let sort_fields = expr
.iter()
.map(|e| {
let data_type = e
.expr
.data_type(&schema)
.map_err(|e| e.context("Resolving sort expression data type"))?;
Ok(SortField::new_with_options(data_type, e.options))
})
.collect::<Result<Vec<_>>>()?;

let converter = RowConverter::new(sort_fields).map_err(|e| {
exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
})?;

let spill_manager = SpillManager::new(
Arc::clone(&runtime),
metrics.spill_metrics.clone(),
Arc::clone(&schema),
);

Self {
Ok(Self {
schema,
in_mem_batches: vec![],
in_mem_batches_sorted: false,
in_progress_spill_file: None,
finished_spill_files: vec![],
expr: expr.into(),
sort_keys_row_converter: Arc::new(converter),
metrics,
fetch,
reservation,
Expand All @@ -297,7 +318,7 @@ impl ExternalSorter {
batch_size,
sort_spill_reservation_bytes,
sort_in_place_threshold_bytes,
}
})
}

/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
Expand Down Expand Up @@ -723,15 +744,29 @@ impl ExternalSorter {

let fetch = self.fetch;
let expressions: LexOrdering = self.expr.iter().cloned().collect();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let timer = metrics.elapsed_compute().timer();
let sorted = sort_batch(&batch, &expressions, fetch)?;
timer.done();
let row_converter = Arc::clone(&self.sort_keys_row_converter);
let stream = futures::stream::once(async move {
let _timer = metrics.elapsed_compute().timer();

let sort_columns = expressions
.iter()
.map(|expr| expr.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<_>>>()?;

let sorted = if is_multi_column_with_lists(&sort_columns) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is took from sort_batch() and the same logic inside sort_batch() is kept unchanged. This is due to sort_batch() is a public interface and I want to avoid changing its behavior.
After we move to always using row format, we can clean it up by deprecating sort_batch()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes please

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a good heuristic is to use RowConverter for multi-column + no limit cases as documented in lexsort_to_indices

/// Note: for multi-column sorts without a limit, using the [row format](https://docs.rs/arrow-row/latest/arrow_row/)
/// may be significantly faster

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, now always using row format shows a mixed benchmark result, because converted rows are not reused during sorting and sort-preserving merging, and the conversion overhead outweighs the benefits.
We'd better do it as a follow-up with more benchmark analysis.

// lex_sort_to_indices doesn't support List with more than one column
// https://github.com/apache/arrow-rs/issues/5454
sort_batch_row_based(&batch, &expressions, row_converter, fetch)?
} else {
sort_batch(&batch, &expressions, fetch)?
};

metrics.record_output(sorted.num_rows());
drop(batch);
drop(reservation);
Ok(sorted)
}));
});

Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

Expand Down Expand Up @@ -776,6 +811,45 @@ impl Debug for ExternalSorter {
}
}

/// Converts rows into a sorted array of indices based on their order.
/// This function returns the indices that represent the sorted order of the rows.
fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
let mut sort: Vec<_> = rows.iter().enumerate().collect();
sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

let mut len = rows.num_rows();
if let Some(limit) = limit {
len = limit.min(len);
}
let indices =
UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));
Ok(indices)
}

/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format for faster comparison.
fn sort_batch_row_based(
batch: &RecordBatch,
expressions: &LexOrdering,
row_converter: Arc<RowConverter>,
fetch: Option<usize>,
) -> Result<RecordBatch> {
let sort_columns = expressions
.iter()
.map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
.collect::<Result<Vec<_>>>()?;
let rows = row_converter.convert_columns(&sort_columns)?;
let indices = rows_to_indices(rows, fetch)?;
let columns = take_arrays(batch.columns(), &indices, None)?;

let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));

Ok(RecordBatch::try_new_with_options(
batch.schema(),
columns,
&options,
)?)
}

pub fn sort_batch(
batch: &RecordBatch,
expressions: &LexOrdering,
Expand Down Expand Up @@ -838,7 +912,9 @@ pub(crate) fn lexsort_to_indices_multi_columns(
},
);

// TODO reuse converter and rows, refer to TopK.
// Note: row converter is reused through `sort_batch_row_based()`, this function
// is not used during normal sort execution, but it's kept temporarily because
// it's inside a public interface `sort_batch()`.
let converter = RowConverter::new(fields)?;
let rows = converter.convert_columns(&columns)?;
let mut sort: Vec<_> = rows.iter().enumerate().collect();
Expand Down Expand Up @@ -1154,7 +1230,7 @@ impl ExecutionPlan for SortExec {
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
);
)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
Expand Down