Skip to content

Commit 7d3565a

Browse files
committed
partway through porting over isidentical's work
1 parent 78d9613 commit 7d3565a

File tree

4 files changed

+208
-0
lines changed

4 files changed

+208
-0
lines changed

datafusion/execution/src/task.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ use crate::{
3333
runtime_env::{RuntimeConfig, RuntimeEnv},
3434
};
3535

36+
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
37+
use futures::channel::mpsc::Receiver as SingleChannelReceiver;
38+
// use futures::lock::Mutex;
39+
use parking_lot::Mutex;
40+
// use futures::
41+
42+
type RelationHandler = SingleChannelReceiver<ArrowResult<RecordBatch>>;
43+
3644
/// Task Execution Context
3745
///
3846
/// A [`TaskContext`] contains the state available during a single
@@ -56,6 +64,8 @@ pub struct TaskContext {
5664
window_functions: HashMap<String, Arc<WindowUDF>>,
5765
/// Runtime environment associated with this task context
5866
runtime: Arc<RuntimeEnv>,
67+
/// Registered relation handlers
68+
relation_handlers: Mutex<HashMap<String, RelationHandler>>,
5969
}
6070

6171
impl Default for TaskContext {
@@ -72,6 +82,7 @@ impl Default for TaskContext {
7282
aggregate_functions: HashMap::new(),
7383
window_functions: HashMap::new(),
7484
runtime: Arc::new(runtime),
85+
relation_handlers: Mutex::new(HashMap::new()),
7586
}
7687
}
7788
}
@@ -99,6 +110,7 @@ impl TaskContext {
99110
aggregate_functions,
100111
window_functions,
101112
runtime,
113+
relation_handlers: Mutex::new(HashMap::new()),
102114
}
103115
}
104116

@@ -171,6 +183,34 @@ impl TaskContext {
171183
self.runtime = runtime;
172184
self
173185
}
186+
187+
/// Register a new relation handler. If a handler with the same name already exists
188+
/// this function will return an error.
189+
pub fn push_relation_handler(
190+
&self,
191+
name: String,
192+
handler: RelationHandler,
193+
) -> Result<()> {
194+
let mut handlers = self.relation_handlers.lock();
195+
if handlers.contains_key(&name) {
196+
return Err(DataFusionError::Internal(format!(
197+
"Relation handler {} already registered",
198+
name
199+
)));
200+
}
201+
handlers.insert(name, handler);
202+
Ok(())
203+
}
204+
205+
/// Retrieve the relation handler for the given name. It will remove the handler from
206+
/// the storage if it exists, and return it as is.
207+
pub fn pop_relation_handler(&self, name: String) -> Result<RelationHandler> {
208+
let mut handlers = self.relation_handlers.lock();
209+
210+
handlers.remove(name.as_str()).ok_or_else(|| {
211+
DataFusionError::Internal(format!("Relation handler {} not registered", name))
212+
})
213+
}
174214
}
175215

176216
impl FunctionRegistry for TaskContext {
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Defines the continuance query plan
19+
20+
use std::any::Any;
21+
use std::sync::Arc;
22+
23+
// use crate::error::{DataFusionError, Result};
24+
// use crate::physical_plan::{
25+
// DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
26+
// };
27+
use arrow::datatypes::SchemaRef;
28+
use datafusion_execution::TaskContext;
29+
use datafusion_physical_expr::Partitioning;
30+
31+
use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
32+
33+
use super::expressions::PhysicalSortExpr;
34+
use super::stream::RecordBatchReceiverStream;
35+
use super::{
36+
metrics::{ExecutionPlanMetricsSet, MetricsSet},
37+
SendableRecordBatchStream, Statistics,
38+
};
39+
use datafusion_common::{DataFusionError, Result};
40+
41+
// use crate::exe::context::TaskContext;
42+
43+
/// A temporary "working table" operation where the input data will be
44+
/// taken from the named handle during the execution and will be re-published
45+
/// as is (kind of like a mirror).
46+
///
47+
/// Most notably used in the implementation of recursive queries where the
48+
/// underlying relation does not exist yet but the data will come as the previous
49+
/// term is evaluated.
50+
#[derive(Debug)]
51+
pub struct ContinuanceExec {
52+
/// Name of the relation handler
53+
name: String,
54+
/// The schema of the stream
55+
schema: SchemaRef,
56+
/// Execution metrics
57+
metrics: ExecutionPlanMetricsSet,
58+
}
59+
60+
impl ContinuanceExec {
61+
/// Create a new execution plan for a continuance stream. The given relation
62+
/// handler must exist in the task context before calling [`execute`] on this
63+
/// plan.
64+
pub fn new(name: String, schema: SchemaRef) -> Self {
65+
Self {
66+
name,
67+
schema,
68+
metrics: ExecutionPlanMetricsSet::new(),
69+
}
70+
}
71+
}
72+
73+
impl DisplayAs for ContinuanceExec {
74+
fn fmt_as(
75+
&self,
76+
t: DisplayFormatType,
77+
f: &mut std::fmt::Formatter,
78+
) -> std::fmt::Result {
79+
match t {
80+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
81+
// TODO: add more details
82+
write!(f, "ContinuanceExec: name={}", self.name)
83+
}
84+
}
85+
}
86+
}
87+
88+
impl ExecutionPlan for ContinuanceExec {
89+
fn as_any(&self) -> &dyn Any {
90+
self
91+
}
92+
93+
fn schema(&self) -> SchemaRef {
94+
self.schema.clone()
95+
}
96+
97+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
98+
vec![]
99+
}
100+
101+
fn output_partitioning(&self) -> Partitioning {
102+
Partitioning::UnknownPartitioning(1)
103+
}
104+
105+
fn maintains_input_order(&self) -> Vec<bool> {
106+
vec![false]
107+
}
108+
109+
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
110+
vec![false]
111+
}
112+
113+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
114+
None
115+
}
116+
117+
fn with_new_children(
118+
self: Arc<Self>,
119+
_: Vec<Arc<dyn ExecutionPlan>>,
120+
) -> Result<Arc<dyn ExecutionPlan>> {
121+
Ok(Arc::new(ContinuanceExec::new(
122+
self.name.clone(),
123+
self.schema.clone(),
124+
)))
125+
}
126+
127+
/// This plan does not come with any special streams, but rather we use
128+
/// the existing [`RecordBatchReceiverStream`] to receive the data from
129+
/// the registered handle.
130+
fn execute(
131+
&self,
132+
partition: usize,
133+
context: Arc<TaskContext>,
134+
) -> Result<SendableRecordBatchStream> {
135+
// Continuance streams must be the plan base.
136+
if partition != 0 {
137+
return Err(DataFusionError::Internal(format!(
138+
"ContinuanceExec got an invalid partition {} (expected 0)",
139+
partition
140+
)));
141+
}
142+
143+
// let stream = Box::pin(CombinedRecordBatchStream::new(
144+
// self.schema(),
145+
// input_stream_vec,
146+
// ));
147+
// return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
148+
149+
// The relation handler must be already registered by the
150+
// parent op.
151+
let receiver = context.pop_relation_handler(self.name.clone())?;
152+
// TODO: this looks wrong.
153+
Ok(RecordBatchReceiverStream::builder(self.schema.clone(), 1).build())
154+
}
155+
156+
fn metrics(&self) -> Option<MetricsSet> {
157+
Some(self.metrics.clone_inner())
158+
}
159+
160+
fn statistics(&self) -> Statistics {
161+
Statistics::default()
162+
}
163+
}
164+
165+
#[cfg(test)]
166+
mod tests {}

datafusion/physical-plan/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ pub mod analyze;
337337
pub mod coalesce_batches;
338338
pub mod coalesce_partitions;
339339
pub mod common;
340+
pub mod continuance;
340341
pub mod display;
341342
pub mod empty;
342343
pub mod explain;

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
4747
use datafusion::physical_plan::{
4848
udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
4949
};
50+
// use datafusion::physical_plan::con
5051
use datafusion_common::FileCompressionType;
5152
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
5253
use prost::bytes::BufMut;

0 commit comments

Comments
 (0)