Commit c5f9930

logging
1 parent b2622a4 commit c5f9930

8 files changed: +47 -6 lines changed

datafusion/core/src/physical_planner.rs
Lines changed: 2 additions & 0 deletions

@@ -1322,6 +1322,7 @@ impl DefaultPhysicalPlanner {
             }
             LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct }) => {
                 let static_term = self.create_initial_plan(static_term, session_state).await?;
+                println!("recursiving term: {:?}", recursive_term);
                 let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;

                 Ok(Arc::new(RecursiveQueryExec::new(name.clone(), static_term, recursive_term, *is_distinct)))

@@ -1989,6 +1990,7 @@ impl DefaultPhysicalPlanner {
         F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule),
     {
         let optimizers = session_state.physical_optimizers();
+        println!("optimizers: {}", optimizers.len());
         debug!(
             "Input physical plan:\n{}\n",
             displayable(plan.as_ref()).indent(false)

datafusion/execution/src/task.rs
Lines changed: 17 additions & 1 deletion

@@ -33,7 +33,7 @@ use crate::{
     runtime_env::{RuntimeConfig, RuntimeEnv},
 };

-use arrow::record_batch::RecordBatch;
+use arrow::{compute::kernels::partition, record_batch::RecordBatch};
 // use futures::channel::mpsc::Receiver as SingleChannelReceiver;
 use tokio::sync::mpsc::Receiver as SingleChannelReceiver;
 // use futures::lock::Mutex;

@@ -65,6 +65,8 @@ pub struct TaskContext {
     window_functions: HashMap<String, Arc<WindowUDF>>,
     /// Runtime environment associated with this task context
     runtime: Arc<RuntimeEnv>,
+    // TODO: to remove, this is only for testing
+    active_partition: Mutex<Option<usize>>,
     /// Registered relation handlers
     relation_handlers: Mutex<HashMap<String, RelationHandler>>,
 }

@@ -84,6 +86,7 @@ impl Default for TaskContext {
             window_functions: HashMap::new(),
             runtime: Arc::new(runtime),
             relation_handlers: Mutex::new(HashMap::new()),
+            active_partition: Mutex::new(None),
         }
     }
 }

@@ -112,6 +115,19 @@ impl TaskContext {
             window_functions,
             runtime,
             relation_handlers: Mutex::new(HashMap::new()),
+            active_partition: Mutex::new(None),
+        }
+    }
+
+    pub fn set_and_increment_partition(&self) -> usize {
+        let mut partition_guard = self.active_partition.lock();
+        if let Some(ref mut partition) = &mut *partition_guard {
+            let prev = *partition;
+            *partition += 1;
+            prev
+        } else {
+            *partition_guard = Some(1);
+            0
         }
     }
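
The set_and_increment_partition helper added above is a small interior-mutability counter on the task context. For reference, a minimal self-contained sketch of the same pattern, assuming a parking_lot::Mutex (whose lock() returns a guard rather than a Result, matching how the guard is dereferenced in the diff); the PartitionCounter type is illustrative only, not DataFusion's actual TaskContext:

// Sketch only; assumes the parking_lot crate as a dependency.
use parking_lot::Mutex;

struct PartitionCounter {
    active_partition: Mutex<Option<usize>>,
}

impl PartitionCounter {
    fn new() -> Self {
        Self { active_partition: Mutex::new(None) }
    }

    /// Returns the current partition index and advances the counter,
    /// handing out 0, 1, 2, ... across successive calls.
    fn set_and_increment_partition(&self) -> usize {
        let mut guard = self.active_partition.lock();
        match guard.as_mut() {
            Some(partition) => {
                let prev = *partition;
                *partition += 1;
                prev
            }
            None => {
                // First call: start the counter and hand out partition 0.
                *guard = Some(1);
                0
            }
        }
    }
}

fn main() {
    let counter = PartitionCounter::new();
    assert_eq!(counter.set_and_increment_partition(), 0);
    assert_eq!(counter.set_and_increment_partition(), 1);
    assert_eq!(counter.set_and_increment_partition(), 2);
}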

datafusion/physical-expr/src/expressions/binary.rs
Lines changed: 8 additions & 0 deletions

@@ -38,6 +38,7 @@ use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar;
 use arrow::compute::kernels::concat_elements::concat_elements_utf8;
 use arrow::datatypes::*;
 use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::print_batches;
 use arrow_array::Datum;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};

@@ -292,6 +293,13 @@ impl PhysicalExpr for BinaryExpr {
         let schema = batch.schema();
         let input_schema = schema.as_ref();

+        println!(
+            "evaluate binary on batch left {:?}, right {:?}, operator {:?}",
+            left_value, right_value, self.op
+        );
+
+        print_batches(&[batch.clone()]).unwrap();
+
         if self.is_datum_operator() {
             return match (&left_value, &right_value) {
                 (ColumnarValue::Array(left), ColumnarValue::Array(right)) => {
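
The print_batches call added above comes from arrow's pretty-printing utilities and renders record batches as an ASCII table on stdout. A small standalone usage sketch, independent of this commit (the column name and values are made up for illustration):

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;

fn main() -> arrow::error::Result<()> {
    // Build a one-column batch and print it as an ASCII table.
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
    print_batches(&[batch])?;
    Ok(())
}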

datafusion/physical-plan/src/coalesce_batches.rs
Lines changed: 1 addition & 0 deletions

@@ -159,6 +159,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        println!("CoalesceBatchesExec::execute partition={}", partition);
         Ok(Box::pin(CoalesceBatchesStream {
             input: self.input.execute(partition, context)?,
             schema: self.input.schema(),

datafusion/physical-plan/src/continuance.rs
Lines changed: 1 addition & 1 deletion

@@ -102,7 +102,7 @@ impl ExecutionPlan for ContinuanceExec {
     }

     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(0)
+        Partitioning::UnknownPartitioning(1)
     }

     fn maintains_input_order(&self) -> Vec<bool> {

datafusion/physical-plan/src/recursive_query.rs
Lines changed: 10 additions & 3 deletions

@@ -41,7 +41,7 @@ use super::{
     metrics::{ExecutionPlanMetricsSet, MetricsSet},
     SendableRecordBatchStream, Statistics,
 };
-use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::error::ArrowError;
 use tokio::sync::mpsc::{Receiver, Sender};

 // use crate::execution::context::TaskContext;

@@ -150,6 +150,7 @@ impl ExecutionPlan for RecursiveQueryExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        println!("recursive query exec execute");
         // All partitions must be coalesced before coming to RecursiveQueryExec.
         // TODO: we might be able to handle multiple partitions in the future.
         if partition != 0 {

@@ -306,8 +307,14 @@ impl RecursiveQueryStream {
             }
         }

-        self.recursive_stream =
-            Some(self.recursive_term.execute(0, self.task_context.clone())?);
+        println!("executing recursive term");
+
+        // let partition = self.task_context.set_and_increment_partition();
+        let partition = 0;
+        self.recursive_stream = Some(
+            self.recursive_term
+                .execute(partition, self.task_context.clone())?,
+        );
         self.poll_next(cx)
     }
 }
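
For context, the surrounding RecursiveQueryStream drives a seed-then-iterate loop: the static term runs once, and the recursive term is then re-executed against the previous iteration's output until no new rows are produced. A simplified, self-contained sketch of that loop over plain Vecs rather than the actual ExecutionPlan/stream machinery; all names here are illustrative:

// Sketch of the recursive-query iteration, not DataFusion's real API.
fn recursive_eval<T: Clone>(
    static_term: impl Fn() -> Vec<T>,
    recursive_term: impl Fn(&[T]) -> Vec<T>,
) -> Vec<T> {
    let mut result = static_term(); // seed rows from the static term
    let mut working = result.clone(); // rows produced in the last step
    while !working.is_empty() {
        // Run the recursive term against the previous iteration's output.
        working = recursive_term(working.as_slice());
        result.extend(working.iter().cloned());
    }
    result
}

fn main() {
    // Counts 1..=5, analogous to
    // WITH RECURSIVE t(n) AS (SELECT 1 UNION ALL SELECT n + 1 FROM t WHERE n < 5)
    let out = recursive_eval(
        || vec![1],
        |prev| prev.iter().map(|n| n + 1).filter(|n| *n <= 5).collect(),
    );
    assert_eq!(out, vec![1, 2, 3, 4, 5]);
}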

datafusion/physical-plan/src/repartition/mod.rs
Lines changed: 6 additions & 1 deletion

@@ -483,6 +483,7 @@ impl ExecutionPlan for RepartitionExec {

         // if this is the first partition to be invoked then we need to set up initial state
         if state.channels.is_empty() {
+            println!("channels are empty");
             let (txs, rxs) = if self.preserve_order {
                 let (txs, rxs) =
                     partition_aware_channels(num_input_partitions, num_output_partitions);

@@ -555,6 +556,8 @@ impl ExecutionPlan for RepartitionExec {
         );

         println!("execute partition: {}", partition);
+        println!("channel number: {}", state.channels.len());
+        println!("preserve order {}", self.preserve_order);

         // now return stream for the specified *output* partition which will
         // read from the channel

@@ -597,11 +600,13 @@
             merge_reservation,
         )
     } else {
+        let input = rx.swap_remove(0);
+        println!("input partition: {:?}", input);
         Ok(Box::pin(RepartitionStream {
             num_input_partitions,
             num_input_partitions_processed: 0,
             schema: self.input.schema(),
-            input: rx.swap_remove(0),
+            input,
             drop_helper: Arc::clone(&state.abort_helper),
             reservation,
         }))

datafusion/sql/src/query.rs
Lines changed: 2 additions & 0 deletions

@@ -120,6 +120,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             // it as a CTE.
             planner_context.insert_cte(cte_name.clone(), named_rel);

+            // this uses the named_relation we inserted above to resolve the
+            // relation
             let recursive_plan = self
                 .set_expr_to_plan(*right, &mut planner_context.clone())?;
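
This planner path resolves the recursive half of a WITH RECURSIVE query against the CTE name registered just above it. A hedged end-to-end sketch of exercising that path, assuming a build from this branch where recursive CTE support is complete and enabled; the query itself is only an example:

// Sketch only; recursive CTEs are work in progress in this commit and may
// not execute successfully without the feature being finished/enabled.
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // A simple recursive CTE that counts from 1 to 5.
    let df = ctx
        .sql(
            "WITH RECURSIVE nums(n) AS (
                 SELECT 1
                 UNION ALL
                 SELECT n + 1 FROM nums WHERE n < 5
             )
             SELECT n FROM nums",
        )
        .await?;
    df.show().await?;
    Ok(())
}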
