Commit 3fa3e8b

chore: Clean up
1 parent 7146684 commit 3fa3e8b

6 files changed, +63 -67 lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 0 additions & 1 deletion
@@ -58,7 +58,6 @@ use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::projection::ProjectionExec;
 use datafusion_physical_plan::sorts::sort::SortExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-use datafusion_physical_plan::source::DataSourceExec;
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::ExecutionPlanProperties;
 use datafusion_physical_plan::PlanProperties;

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 2 additions & 1 deletion
@@ -1096,6 +1096,7 @@ fn replace_order_preserving_variants(
     context.update_plan_from_children()
 }
 
+/// Replace the round robin repartition with on-demand repartition when prefer_round_robin_repartition is set to false.
 fn replace_round_robin_repartition_with_on_demand(
     mut context: DistributionContext,
 ) -> Result<DistributionContext> {
@@ -1413,6 +1414,7 @@ pub fn ensure_distribution(
                 }
             }
         }
+        // when prefer_round_robin_repartition is set to false, replace round robin repartition with on-demand repartition
        if !prefer_round_robin_repartition {
            child = replace_round_robin_repartition_with_on_demand(child)?;
        }
@@ -1501,6 +1503,5 @@ fn update_children(mut dist_context: DistributionContext) -> Result<Distribution
     dist_context.data = false;
     Ok(dist_context)
 }
-// See tests in datafusion/core/tests/physical_optimizer
 
 // See tests in datafusion/core/tests/physical_optimizer
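The pass added above walks the plan and swaps round-robin repartitions for on-demand ones when the flag is off. A rough, self-contained illustration of that rewrite pattern follows; it is not DataFusion's actual `DistributionContext` machinery, and `PlanNode` plus the `Partitioning` variants below are simplified stand-ins:

```rust
// Hypothetical sketch of the bottom-up rewrite performed by
// replace_round_robin_repartition_with_on_demand, on a toy plan tree.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    OnDemand(usize),
    Hash(usize),
}

#[derive(Debug, Clone)]
struct PlanNode {
    partitioning: Partitioning,
    children: Vec<PlanNode>,
}

/// Rewrite every RoundRobinBatch(n) in the tree into OnDemand(n),
/// mirroring the optimizer pass guarded by `prefer_round_robin_repartition`.
fn replace_round_robin_with_on_demand(mut node: PlanNode) -> PlanNode {
    // Rewrite children first, then the node itself.
    node.children = node
        .children
        .into_iter()
        .map(replace_round_robin_with_on_demand)
        .collect();
    if let Partitioning::RoundRobinBatch(n) = node.partitioning {
        node.partitioning = Partitioning::OnDemand(n);
    }
    node
}

fn main() {
    let plan = PlanNode {
        partitioning: Partitioning::RoundRobinBatch(4),
        children: vec![PlanNode {
            partitioning: Partitioning::Hash(4),
            children: vec![],
        }],
    };
    let rewritten = replace_round_robin_with_on_demand(plan);
    assert_eq!(rewritten.partitioning, Partitioning::OnDemand(4));
}
```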

datafusion/physical-plan/src/repartition/distributor_channels.rs

Lines changed: 3 additions & 2 deletions
@@ -79,7 +79,8 @@ pub fn channels<T>(
     (senders, receivers)
 }
 
-pub fn tokio_channels<T>(
+/// Create `n` empty mpsc channels with unbounded capacity.
+pub fn unbounded_channels<T>(
     n: usize,
 ) -> (
     Vec<tokio::sync::mpsc::UnboundedSender<T>>,
@@ -115,7 +116,7 @@ pub fn on_demand_partition_aware_channels<T>(
     OnDemandPartitionAwareSenders<T>,
     OnDemandPartitionAwareReceivers<T>,
 ) {
-    (0..n_in).map(|_| tokio_channels(n_out)).unzip()
+    (0..n_in).map(|_| unbounded_channels(n_out)).unzip()
 }
 
 /// Erroring during [send](DistributionSender::send).
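The hunk shows only the renamed signature of `unbounded_channels`; its body is not part of this diff. A minimal sketch of what the new name and doc comment imply, assuming the body simply collects `n` tokio unbounded channels (the `main` demo mirrors the `unzip` fan-out used by `on_demand_partition_aware_channels`):

```rust
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

/// Create `n` empty mpsc channels with unbounded capacity,
/// one (sender, receiver) pair per output partition.
pub fn unbounded_channels<T>(
    n: usize,
) -> (Vec<UnboundedSender<T>>, Vec<UnboundedReceiver<T>>) {
    (0..n).map(|_| unbounded_channel()).unzip()
}

fn main() {
    // Two input partitions, each fanning out to three output partitions,
    // as in on_demand_partition_aware_channels.
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..2).map(|_| unbounded_channels::<u64>(3)).unzip();
    assert_eq!(senders.len(), 2);
    assert_eq!(receivers[0].len(), 3);
}
```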

datafusion/physical-plan/src/repartition/on_demand_repartition.rs

Lines changed: 57 additions & 61 deletions
@@ -27,7 +27,9 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{any::Any, vec};
 
-use super::distributor_channels::{on_demand_partition_aware_channels, tokio_channels};
+use super::distributor_channels::{
+    on_demand_partition_aware_channels, unbounded_channels,
+};
 use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use super::{
     DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
@@ -58,51 +60,17 @@ use futures::{ready, StreamExt, TryStreamExt};
 use log::trace;
 use parking_lot::Mutex;
 
+/// Channels for sending the output partition number to the operator.
 type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
 
-/// The OnDemandRepartitionExec operator repartitions the input data based on a pull-based model.
-/// It is similar to the RepartitionExec operator, but it doesn't distribute the data to the output
-/// partitions until the output partitions request the data.
-///
-/// When polling, the operator sends the output partition number to the partition channel, and the prefetch buffer then distributes the data in the order the partition numbers arrive.
-/// Each input stream has a prefetch buffer (channel) to distribute the data to the output partitions.
-///
-/// The following diagram illustrates the data flow of the OnDemandRepartitionExec operator with 3 output partitions for input stream 1:
-/// ```text
-///         /\                   /\                   /\
-///         ││                   ││                   ││
-///         ││                   ││                   ││
-///         ││                   ││                   ││
-/// ┌───────┴┴────────┐  ┌───────┴┴────────┐  ┌───────┴┴────────┐
-/// │     Stream      │  │     Stream      │  │     Stream      │
-/// │       (1)       │  │       (2)       │  │       (3)       │
-/// └────────┬────────┘  └───────┬─────────┘  └────────┬────────┘
-///          │                   │                     │    / \
-///          │                   │                     │    | |
-///          │                   │                     │    | |
-///          └─────────────────┐ │ ┌───────────────────┘    | |
-///                            │ │ │                        | |
-///                            ▼ ▼ ▼                        | |
-///                     ┌─────────────────┐                 | |
-///  Send the partition │partition channel│                 | |
-/// number when polling │                 │                 | |
-///                     └────────┬────────┘                 | |
-///                              │                          | |
-///                              │                          | |
-///                              │ Get the partition number | |
-///                              ▼ then send data           | |
-///                     ┌─────────────────┐                 | |
-///                     │ Prefetch Buffer │─────────────────┘ |
-///                     │       (1)       │───────────────────┘
-///                     └─────────────────┘  Distribute data to the output partitions
-///
-/// ```
+/// The OnDemandRepartitionExec operator cannot use the custom DistributionSender and DistributionReceiver: their gating, which prevents channels from filling up endlessly, does not fit the pull-based model, so plain unbounded channels are used instead.
 type OnDemandDistributionSender = tokio::sync::mpsc::UnboundedSender<MaybeBatch>;
 type OnDemandDistributionReceiver = tokio::sync::mpsc::UnboundedReceiver<MaybeBatch>;
 
 type OnDemandInputPartitionsToCurrentPartitionSender = Vec<OnDemandDistributionSender>;
 type OnDemandInputPartitionsToCurrentPartitionReceiver =
     Vec<OnDemandDistributionReceiver>;
+
 /// Inner state of [`OnDemandRepartitionExec`].
 #[derive(Debug)]
 struct OnDemandRepartitionExecState {
@@ -143,7 +111,7 @@ fn create_on_demand_repartition_channels(
     // create one channel per *output* partition
     // note we use a custom channel that ensures there is always data for each receiver
     // but limits the amount of buffering if required.
-    let (txs, rxs) = tokio_channels(num_output_partitions);
+    let (txs, rxs) = unbounded_channels(num_output_partitions);
     // Clone sender for each input partitions
     let txs = txs
         .into_iter()
@@ -258,6 +226,43 @@ impl OnDemandRepartitionExecState {
     }
 }
 
+/// The OnDemandRepartitionExec operator repartitions the input data based on a pull-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polling, the operator sends the output partition number to the partition channel, and the prefetch buffer then distributes the data in the order the partition numbers arrive.
+/// Each input stream has a prefetch buffer (channel) to distribute the data to the output partitions.
+///
+/// The following diagram illustrates the data flow of the OnDemandRepartitionExec operator with 3 output partitions for input stream 1:
+/// ```text
+///         /\                   /\                   /\
+///         ││                   ││                   ││
+///         ││                   ││                   ││
+///         ││                   ││                   ││
+/// ┌───────┴┴────────┐  ┌───────┴┴────────┐  ┌───────┴┴────────┐
+/// │     Stream      │  │     Stream      │  │     Stream      │
+/// │       (1)       │  │       (2)       │  │       (3)       │
+/// └────────┬────────┘  └───────┬─────────┘  └────────┬────────┘
+///          │                   │                     │    / \
+///          │                   │                     │    | |
+///          │                   │                     │    | |
+///          └─────────────────┐ │ ┌───────────────────┘    | |
+///                            │ │ │                        | |
+///                            ▼ ▼ ▼                        | |
+///                     ┌─────────────────┐                 | |
+///  Send the partition │partition channel│                 | |
+/// number when polling │                 │                 | |
+///                     └────────┬────────┘                 | |
+///                              │                          | |
+///                              │                          | |
+///                              │ Get the partition number | |
+///                              ▼ then send data           | |
+///                     ┌─────────────────┐                 | |
+///                     │ Prefetch Buffer │─────────────────┘ |
+///                     │       (1)       │───────────────────┘
+///                     └─────────────────┘  Distribute data to the output partitions
+///
+/// ```
 #[derive(Debug, Clone)]
 pub struct OnDemandRepartitionExec {
     base: RepartitionExecBase,
@@ -961,14 +966,13 @@ mod tests {
     use super::*;
     use crate::{
         collect,
-        memory::MemorySourceConfig,
-        source::DataSourceExec,
         test::{
             assert_is_pending,
             exec::{
                 assert_strong_count_converges_to_zero, BarrierExec, BlockingExec,
                 ErrorExec, MockExec,
             },
+            TestMemoryExec,
         },
     };
 
@@ -1018,11 +1022,8 @@
     ) -> Result<Vec<Vec<RecordBatch>>> {
         let task_ctx = Arc::new(TaskContext::default());
         // create physical plan
-        let exec = MemorySourceConfig::try_new_exec(
-            &input_partitions,
-            Arc::clone(schema),
-            None,
-        )?;
+        let exec =
+            TestMemoryExec::try_new_exec(&input_partitions, Arc::clone(schema), None)?;
         let exec = OnDemandRepartitionExec::try_new(exec, partitioning)?;
 
         // execute and collect results
@@ -1082,8 +1083,7 @@
         let schema = test_schema();
         let partition: Vec<RecordBatch> = create_vec_batches(2);
         let partitions = vec![partition.clone(), partition.clone()];
-        let input =
-            MemorySourceConfig::try_new_exec(&partitions, Arc::clone(&schema), None)?;
+        let input = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;
         let exec =
             OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(3)).unwrap();
 
@@ -1452,19 +1452,15 @@
             Arc::new(UInt32Array::from(vec![1, 2, 3, 4])) as ArrayRef,
         )])?;
 
-        let source = Arc::new(DataSourceExec::new(Arc::new(
-            MemorySourceConfig::try_new(
-                &[vec![batch.clone()]],
-                Arc::clone(&schema),
-                None,
-            )
-            .unwrap()
-            .try_with_sort_information(vec![sort_exprs])
-            .unwrap(),
+        let source = Arc::new(TestMemoryExec::update_cache(Arc::new(
+            TestMemoryExec::try_new(&[vec![batch.clone()]], Arc::clone(&schema), None)
+                .unwrap()
+                .try_with_sort_information(vec![sort_exprs])
+                .unwrap(),
         )));
 
         // output has multiple partitions, and is sorted
-        let union = UnionExec::new(vec![Arc::<DataSourceExec>::clone(&source), source]);
+        let union = UnionExec::new(vec![Arc::<TestMemoryExec>::clone(&source), source]);
         let repartition_exec =
             OnDemandRepartitionExec::try_new(Arc::new(union), Partitioning::OnDemand(5))
                 .unwrap()
@@ -1559,15 +1555,15 @@
     }
 
     fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
-        MemorySourceConfig::try_new_exec(&[vec![]], Arc::clone(schema), None).unwrap()
+        TestMemoryExec::try_new_exec(&[vec![]], Arc::clone(schema), None).unwrap()
     }
 
     fn sorted_memory_exec(
         schema: &SchemaRef,
         sort_exprs: LexOrdering,
     ) -> Arc<dyn ExecutionPlan> {
-        Arc::new(DataSourceExec::new(Arc::new(
-            MemorySourceConfig::try_new(&[vec![]], Arc::clone(schema), None)
+        Arc::new(TestMemoryExec::update_cache(Arc::new(
+            TestMemoryExec::try_new(&[vec![]], Arc::clone(schema), None)
                 .unwrap()
                 .try_with_sort_information(vec![sort_exprs])
                 .unwrap(),
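The relocated doc comment above describes a pull-based handshake: an output partition announces its index on the partition channel, and a prefetch task forwards the next input batch to that partition's unbounded channel. A toy, self-contained model of that handshake follows; the names `Batch` and `run_prefetch` are illustrative, not DataFusion's API:

```rust
use tokio::sync::mpsc::{
    channel, unbounded_channel, Receiver, UnboundedReceiver, UnboundedSender,
};

type Batch = Vec<u64>; // stand-in for a RecordBatch

/// Toy prefetch task: pull a batch from the input only after some output
/// partition has announced, via `partition_rx`, that it wants data, then
/// forward the batch to that partition's unbounded channel.
async fn run_prefetch(
    mut input: UnboundedReceiver<Batch>,
    mut partition_rx: Receiver<usize>,
    outputs: Vec<UnboundedSender<Batch>>,
) {
    while let Some(partition) = partition_rx.recv().await {
        match input.recv().await {
            // Ignore send errors: the output may already be dropped.
            Some(batch) => {
                let _ = outputs[partition].send(batch);
            }
            None => break, // input exhausted
        }
    }
}

#[tokio::main]
async fn main() {
    let (input_tx, input_rx) = unbounded_channel();
    let (partition_tx, partition_rx) = channel(8);
    let (out_txs, mut out_rxs): (Vec<_>, Vec<_>) =
        (0..2).map(|_| unbounded_channel::<Batch>()).unzip();

    tokio::spawn(run_prefetch(input_rx, partition_rx, out_txs));

    input_tx.send(vec![1, 2, 3]).unwrap();
    // Output partition 1 requests the next batch by sending its index.
    partition_tx.send(1).await.unwrap();
    assert_eq!(out_rxs[1].recv().await, Some(vec![1, 2, 3]));
}
```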

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -1417,7 +1417,6 @@ impl AsLogicalPlan for LogicalPlanNode {
             Partitioning::RoundRobinBatch(partition_count) => {
                 PartitionMethod::RoundRobin(*partition_count as u64)
             }
-
             Partitioning::DistributeBy(_) => {
                 return not_impl_err!("DistributeBy")
             }

datafusion/sqllogictest/test_files/on_demand_repartition.slt

Lines changed: 1 addition & 1 deletion
@@ -426,7 +426,7 @@ EXPLAIN SELECT
 ----
 logical_plan
 01)Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
-02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 03)----Projection: aggregate_test_100.c1, aggregate_test_100.c2, sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
 04)------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 05)--------TableScan: aggregate_test_100 projection=[c1, c2, c4]
