Skip to content

Commit f5ba2a3

Browse files
committed
Make YieldStream public to allow static dispatch
1 parent 4211db7 commit f5ba2a3

File tree

6 files changed

+38
-44
lines changed

6 files changed

+38
-44
lines changed

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::aggregates::{
2222
AggregateMode,
2323
};
2424
use crate::metrics::{BaselineMetrics, RecordOutput};
25-
use crate::{RecordBatchStream, SendableRecordBatchStream};
25+
use crate::RecordBatchStream;
2626
use arrow::datatypes::SchemaRef;
2727
use arrow::record_batch::RecordBatch;
2828
use datafusion_common::Result;
@@ -35,7 +35,7 @@ use std::task::{Context, Poll};
3535

3636
use super::AggregateExec;
3737
use crate::filter::batch_filter;
38-
use crate::poll_budget::PollBudget;
38+
use crate::poll_budget::{PollBudget, YieldStream};
3939
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
4040
use futures::stream::{Stream, StreamExt};
4141

@@ -55,7 +55,7 @@ pub(crate) struct AggregateStream {
5555
struct AggregateStreamInner {
5656
schema: SchemaRef,
5757
mode: AggregateMode,
58-
input: SendableRecordBatchStream,
58+
input: YieldStream,
5959
baseline_metrics: BaselineMetrics,
6060
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
6161
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
4646
use datafusion_execution::TaskContext;
4747
use datafusion_physical_expr::equivalence::join_equivalence_properties;
4848

49-
use crate::poll_budget::PollBudget;
49+
use crate::poll_budget::{PollBudget, YieldStream};
5050
use async_trait::async_trait;
5151
use futures::{ready, Stream, StreamExt, TryStreamExt};
5252

@@ -189,7 +189,7 @@ impl CrossJoinExec {
189189

190190
/// Asynchronously collect the result of the left child
191191
async fn load_left_input(
192-
stream: SendableRecordBatchStream,
192+
stream: YieldStream,
193193
metrics: BuildProbeJoinMetrics,
194194
reservation: MemoryReservation,
195195
) -> Result<JoinLeftData> {

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use datafusion_physical_expr::equivalence::{
8181
use datafusion_physical_expr::PhysicalExprRef;
8282
use datafusion_physical_expr_common::datum::compare_op_for_nested;
8383

84-
use crate::poll_budget::PollBudget;
84+
use crate::poll_budget::{PollBudget, YieldStream};
8585
use ahash::RandomState;
8686
use datafusion_physical_expr_common::physical_expr::fmt_sql;
8787
use futures::{ready, Stream, StreamExt, TryStreamExt};
@@ -953,7 +953,7 @@ impl ExecutionPlan for HashJoinExec {
953953
/// hash table (`LeftJoinData`)
954954
async fn collect_left_input(
955955
random_state: RandomState,
956-
left_stream: SendableRecordBatchStream,
956+
left_stream: YieldStream,
957957
on_left: Vec<PhysicalExprRef>,
958958
metrics: BuildProbeJoinMetrics,
959959
reservation: MemoryReservation,

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ use datafusion_physical_expr::equivalence::{
6161
join_equivalence_properties, ProjectionMapping,
6262
};
6363

64-
use crate::poll_budget::PollBudget;
64+
use crate::poll_budget::{PollBudget, YieldStream};
6565
use futures::{ready, Stream, StreamExt, TryStreamExt};
6666
use parking_lot::Mutex;
6767

@@ -626,7 +626,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
626626

627627
/// Asynchronously collects input into a single batch and creates `JoinLeftData` from it
628628
async fn collect_left_input(
629-
stream: SendableRecordBatchStream,
629+
stream: YieldStream,
630630
join_metrics: BuildProbeJoinMetrics,
631631
reservation: MemoryReservation,
632632
with_visited_left_side: bool,

datafusion/physical-plan/src/poll_budget.rs

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,8 @@ impl PollBudget {
5454
}
5555
}
5656

57-
pub fn wrap_stream(
58-
&self,
59-
inner: SendableRecordBatchStream,
60-
) -> SendableRecordBatchStream {
61-
match self.budget {
62-
None => inner,
63-
Some(budget) => {
64-
Box::pin(YieldStream::new(inner, budget)) as SendableRecordBatchStream
65-
}
66-
}
57+
pub fn wrap_stream(&self, inner: SendableRecordBatchStream) -> YieldStream {
58+
YieldStream::new(inner, self.budget)
6759
}
6860
}
6961

@@ -102,18 +94,18 @@ impl Future for ConsumeBudget {
10294
}
10395
}
10496

105-
struct YieldStream {
97+
pub struct YieldStream {
10698
inner: SendableRecordBatchStream,
107-
budget: u8,
108-
remaining: u8,
99+
budget: Option<u8>,
100+
remaining: Option<u8>,
109101
}
110102

111103
impl YieldStream {
112-
pub fn new(inner: SendableRecordBatchStream, budget: u8) -> Self {
104+
pub fn new(inner: SendableRecordBatchStream, budget: Option<u8>) -> Self {
113105
Self {
114106
inner,
115107
budget,
116-
remaining: 0,
108+
remaining: budget,
117109
}
118110
}
119111
}
@@ -125,21 +117,23 @@ impl Stream for YieldStream {
125117
mut self: Pin<&mut Self>,
126118
cx: &mut Context<'_>,
127119
) -> Poll<Option<Self::Item>> {
128-
if self.remaining == 0 {
129-
self.remaining = self.budget;
130-
cx.waker().wake_by_ref();
131-
return Pending;
132-
}
133-
134-
match self.inner.poll_next_unpin(cx) {
135-
ready @ Ready(Some(_)) => {
136-
self.remaining -= 1;
137-
ready
138-
}
139-
other => {
120+
match self.remaining {
121+
None => self.inner.poll_next_unpin(cx),
122+
Some(0) => {
140123
self.remaining = self.budget;
141-
other
124+
cx.waker().wake_by_ref();
125+
Pending
142126
}
127+
Some(remaining) => match self.inner.poll_next_unpin(cx) {
128+
ready @ Ready(Some(_)) => {
129+
self.remaining = Some(remaining - 1);
130+
ready
131+
}
132+
other => {
133+
self.remaining = self.budget;
134+
other
135+
}
136+
},
143137
}
144138
}
145139
}

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@
1919
//! It will do in-memory sorting if it has enough memory budget
2020
//! but spills to disk if needed.
2121
22-
use std::any::Any;
23-
use std::fmt;
24-
use std::fmt::{Debug, Formatter};
25-
use std::sync::Arc;
26-
2722
use crate::common::spawn_buffered;
2823
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
2924
use crate::expressions::PhysicalSortExpr;
@@ -43,6 +38,10 @@ use crate::{
4338
ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
4439
Statistics,
4540
};
41+
use std::any::Any;
42+
use std::fmt;
43+
use std::fmt::{Debug, Formatter};
44+
use std::sync::Arc;
4645

4746
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
4847
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
@@ -1107,10 +1106,9 @@ impl ExecutionPlan for SortExec {
11071106
.equivalence_properties()
11081107
.ordering_satisfy_requirement(requirement);
11091108

1110-
let mut input = PollBudget::from(context.as_ref()).wrap_stream(input);
1111-
11121109
match (sort_satisfied, self.fetch.as_ref()) {
11131110
(true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
1111+
// `LimitStream` is not a pipeline-breaking stream, so a poll budget is not required
11141112
input,
11151113
0,
11161114
Some(*fetch),
@@ -1128,6 +1126,7 @@ impl ExecutionPlan for SortExec {
11281126
context.runtime_env(),
11291127
&self.metrics_set,
11301128
)?;
1129+
let mut input = PollBudget::from(context.as_ref()).wrap_stream(input);
11311130
Ok(Box::pin(RecordBatchStreamAdapter::new(
11321131
self.schema(),
11331132
futures::stream::once(async move {
@@ -1154,6 +1153,7 @@ impl ExecutionPlan for SortExec {
11541153
&self.metrics_set,
11551154
context.runtime_env(),
11561155
)?;
1156+
let mut input = PollBudget::from(context.as_ref()).wrap_stream(input);
11571157
Ok(Box::pin(RecordBatchStreamAdapter::new(
11581158
self.schema(),
11591159
futures::stream::once(async move {

0 commit comments

Comments
 (0)