Skip to content

Commit 6de0796

Browse files
authored
Ensure the row count is preserved when coalescing over empty records (#3439)
* Ensure the row count is preserved when coalescing over empty records
* Explain the reasoning for not using optimized_plan in tests
1 parent 69d05aa commit 6de0796

File tree

2 files changed

+17
-5
lines changed

2 files changed

+17
-5
lines changed

datafusion/core/src/physical_plan/coalesce_batches.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::execution::context::TaskContext;
3333
use arrow::compute::kernels::concat::concat;
3434
use arrow::datatypes::SchemaRef;
3535
use arrow::error::Result as ArrowResult;
36-
use arrow::record_batch::RecordBatch;
36+
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
3737
use futures::stream::{Stream, StreamExt};
3838
use log::trace;
3939

@@ -291,7 +291,11 @@ pub fn concat_batches(
291291
batches.len(),
292292
row_count
293293
);
294-
RecordBatch::try_new(schema.clone(), arrays)
294+
295+
let mut options = RecordBatchOptions::default();
296+
options.row_count = Some(row_count);
297+
298+
RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
295299
}
296300

297301
#[cfg(test)]

datafusion/core/tests/sql/mod.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -770,14 +770,22 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
770770
.unwrap();
771771
let logical_schema = plan.schema();
772772

773+
// We are not really interested in the direct output of optimized_logical_plan
774+
// since the physical plan construction already optimizes the given logical plan
775+
// and we want to avoid double-optimization as a consequence. So we just construct
776+
// it here to make sure that it doesn't fail at this step and get the optimized
777+
// schema (to assert later that the logical and optimized schemas are the same).
773778
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
774-
let plan = ctx
779+
let optimized_logical_plan = ctx
775780
.optimize(&plan)
776781
.map_err(|e| format!("{:?} at {}", e, msg))
777782
.unwrap();
778-
let optimized_logical_schema = plan.schema();
783+
let optimized_logical_schema = optimized_logical_plan.schema();
779784

780-
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
785+
let msg = format!(
786+
"Creating physical plan for '{}': {:?}",
787+
sql, optimized_logical_plan
788+
);
781789
let plan = ctx
782790
.create_physical_plan(&plan)
783791
.await

0 commit comments

Comments
 (0)