diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index a257ccf09994..f2edacd90847 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -33,7 +33,7 @@ use crate::execution::context::TaskContext; use arrow::compute::kernels::concat::concat; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -291,7 +291,11 @@ pub fn concat_batches( batches.len(), row_count ); - RecordBatch::try_new(schema.clone(), arrays) + + let mut options = RecordBatchOptions::default(); + options.row_count = Some(row_count); + + RecordBatch::try_new_with_options(schema.clone(), arrays, &options) } #[cfg(test)] diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 89609fada393..d58483484624 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -765,14 +765,22 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec .unwrap(); let logical_schema = plan.schema(); + // We are not really interested in the direct output of optimized_logical_plan + // since the physical plan construction already optimizes the given logical plan + // and we want to avoid double-optimization as a consequence. So we just construct + // it here to make sure that it doesn't fail at this step and get the optimized + // schema (to assert later that the logical and optimized schemas are the same). let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); - let plan = ctx + let optimized_logical_plan = ctx .optimize(&plan) .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); - let optimized_logical_schema = plan.schema(); + let optimized_logical_schema = optimized_logical_plan.schema(); - let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); + let msg = format!( + "Creating physical plan for '{}': {:?}", + sql, optimized_logical_plan + ); let plan = ctx .create_physical_plan(&plan) .await