Skip to content

Commit 2ed1871

Browse files
committed
box all futures that are larger than 8KiB
1 parent f02a444 commit 2ed1871

File tree

44 files changed

+635
-329
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+635
-329
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/src/catalog.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4276,6 +4276,7 @@ mod tests {
42764276
use std::time::{Duration, Instant};
42774277
use std::{env, iter};
42784278

4279+
use futures::future::FutureExt;
42794280
use itertools::Itertools;
42804281
use tokio_postgres::types::Type;
42814282
use tokio_postgres::NoTls;
@@ -4388,6 +4389,7 @@ mod tests {
43884389
}
43894390
catalog.expire().await;
43904391
})
4392+
.boxed()
43914393
.await
43924394
}
43934395

@@ -4586,6 +4588,7 @@ mod tests {
45864588
);
45874589
catalog.expire().await;
45884590
})
4591+
.boxed()
45894592
.await
45904593
}
45914594

@@ -4612,6 +4615,7 @@ mod tests {
46124615
);
46134616
catalog.expire().await;
46144617
})
4618+
.boxed()
46154619
.await;
46164620
}
46174621

@@ -4791,6 +4795,7 @@ mod tests {
47914795
);
47924796
catalog.expire().await;
47934797
})
4798+
.boxed()
47944799
.await;
47954800
}
47964801

@@ -4814,6 +4819,7 @@ mod tests {
48144819
);
48154820
catalog.expire().await;
48164821
})
4822+
.boxed()
48174823
.await;
48184824
}
48194825

@@ -5212,7 +5218,7 @@ mod tests {
52125218
catalog.expire().await;
52135219
}
52145220

5215-
Catalog::with_debug(NOW_ZERO.clone(), inner).await
5221+
Catalog::with_debug(NOW_ZERO.clone(), inner).boxed().await
52165222
}
52175223

52185224
// Execute all builtin functions with all combinations of arguments from interesting datums.
@@ -5362,8 +5368,9 @@ mod tests {
53625368
handles
53635369
}
53645370

5365-
let handles =
5366-
Catalog::with_debug(NOW_ZERO.clone(), |catalog| async { inner(catalog) }).await;
5371+
let handles = Catalog::with_debug(NOW_ZERO.clone(), |catalog| async { inner(catalog) })
5372+
.boxed()
5373+
.await;
53675374
for handle in handles {
53685375
handle.await.expect("must succeed");
53695376
}
@@ -5582,6 +5589,7 @@ mod tests {
55825589
}
55835590
catalog.expire().await;
55845591
})
5592+
.boxed()
55855593
.await
55865594
}
55875595
}

src/adapter/src/catalog/open.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1854,6 +1854,7 @@ fn default_logging_config() -> ReplicaLogging {
18541854
mod builtin_migration_tests {
18551855
use std::collections::{BTreeMap, BTreeSet};
18561856

1857+
use futures::FutureExt;
18571858
use itertools::Itertools;
18581859
use mz_catalog::memory::objects::Table;
18591860

@@ -2152,6 +2153,7 @@ mod builtin_migration_tests {
21522153
);
21532154
catalog.expire().await;
21542155
})
2156+
.boxed()
21552157
.await
21562158
}
21572159

src/adapter/src/config/sync.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
use std::time::Duration;
1111

12+
use futures::FutureExt;
1213
use tokio::time;
1314

1415
use crate::config::{
@@ -32,7 +33,9 @@ pub async fn system_parameter_sync(
3233

3334
// Ensure the frontend client is initialized.
3435
let mut frontend = Option::<SystemParameterFrontend>::None; // lazy initialize the frontend below
35-
let mut backend = SystemParameterBackend::new(adapter_client).await?;
36+
37+
// Note: the Future is intentionally boxed because it is very large.
38+
let mut backend = SystemParameterBackend::new(adapter_client).boxed().await?;
3639

3740
// Tick every `tick_duration` ms, skipping missed ticks.
3841
let mut interval = time::interval(tick_interval);

src/adapter/src/coord/command_handler.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ impl Coordinator {
106106

107107
self.handle_execute(portal_name, session, tx, outer_ctx_extra)
108108
.instrument(span)
109+
// Note: the Future is intentionally boxed because it is very large.
110+
.boxed_local()
109111
.await;
110112
}
111113

@@ -445,7 +447,10 @@ impl Coordinator {
445447
_ => {}
446448
}
447449

448-
self.handle_execute_inner(stmt, params, ctx).await
450+
self.handle_execute_inner(stmt, params, ctx)
451+
// Note: the Future is intentionally boxed because it is very large.
452+
.boxed_local()
453+
.await
449454
}
450455

451456
#[tracing::instrument(level = "debug", skip(self, ctx))]

src/adapter/src/coord/message_handler.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@ use crate::util::{ComputeSinkId, ResultExt};
4242
use crate::{catalog, AdapterNotice, TimestampContext};
4343

4444
impl Coordinator {
45-
/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. This would
46-
/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
47-
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
45+
/// Note: the returned Future is intentionally boxed because it is very large.
4846
pub(crate) fn handle_message<'a>(&'a mut self, msg: Message) -> LocalBoxFuture<'a, ()> {
4947
async move {
5048
match msg {
@@ -60,13 +58,22 @@ impl Coordinator {
6058
}
6159
}
6260
Message::PurifiedStatementReady(ready) => {
63-
self.message_purified_statement_ready(ready).await
61+
self.message_purified_statement_ready(ready)
62+
// Note: the Future is intentionally boxed because it is very large.
63+
.boxed_local()
64+
.await
6465
}
6566
Message::CreateConnectionValidationReady(ready) => {
66-
self.message_create_connection_validation_ready(ready).await
67+
self.message_create_connection_validation_ready(ready)
68+
// Note: the Future is intentionally boxed because it is very large.
69+
.boxed_local()
70+
.await
6771
}
6872
Message::WriteLockGrant(write_lock_guard) => {
69-
self.message_write_lock_grant(write_lock_guard).await;
73+
self.message_write_lock_grant(write_lock_guard)
74+
// Note: the Future is intentionally boxed because it is very large.
75+
.boxed_local()
76+
.await;
7077
}
7178
Message::GroupCommitInitiate(span, permit) => {
7279
// Add an OpenTelemetry link to our current span.
@@ -118,26 +125,35 @@ impl Coordinator {
118125
real_time_recency_ts,
119126
validity,
120127
)
128+
// Note: the Future is intentionally boxed because it is very large.
129+
.boxed_local()
121130
.await;
122131
}
123132
Message::RetireExecute { data, reason } => {
124133
self.retire_execution(reason, data);
125134
}
126135
Message::ExecuteSingleStatementTransaction { ctx, stmt, params } => {
127136
self.sequence_execute_single_statement_transaction(ctx, stmt, params)
137+
// Note: the Future is intentionally boxed because it is very large.
138+
.boxed_local()
128139
.await;
129140
}
130141
Message::PeekStageReady { ctx, stage } => {
131-
self.sequence_peek_stage(ctx, stage).await;
142+
// Note: the Future is intentionally boxed because it is very large.
143+
self.sequence_peek_stage(ctx, stage).boxed_local().await;
132144
}
133145
Message::PeekStageDeprecatedReady { ctx, stage } => {
134146
// Allow while the introduction of the new optimizer API in
135147
// #20569 is in progress.
136148
#[allow(deprecated)]
137-
self.sequence_peek_stage_deprecated(ctx, stage).await;
149+
self.sequence_peek_stage_deprecated(ctx, stage)
150+
// Note: the Future is intentionally boxed because it is very large.
151+
.boxed_local()
152+
.await;
138153
}
139154
Message::DrainStatementLog => {
140-
self.drain_statement_log().await;
155+
// Note: the Future is intentionally boxed because it is very large.
156+
self.drain_statement_log().boxed_local().await;
141157
}
142158
}
143159
}
@@ -362,7 +378,10 @@ impl Coordinator {
362378
.iter()
363379
.all(|id| self.catalog().try_get_entry(id).is_some())
364380
{
365-
self.handle_execute_inner(original_stmt, params, ctx).await;
381+
self.handle_execute_inner(original_stmt, params, ctx)
382+
// Note: the Future is intentionally boxed because it is very large.
383+
.boxed_local()
384+
.await;
366385
return;
367386
}
368387

@@ -711,6 +730,8 @@ impl Coordinator {
711730
global_mir_plan,
712731
}),
713732
)
733+
// Note: the Future is intentionally boxed because it is very large.
734+
.boxed_local()
714735
.await;
715736
}
716737
RealTimeRecencyContext::PeekDeprecated {
@@ -754,6 +775,8 @@ impl Coordinator {
754775
dataflow_metainfo,
755776
}),
756777
)
778+
// Note: the Future is intentionally boxed because it is very large.
779+
.boxed_local()
757780
.await;
758781
}
759782
}

src/adapter/src/coord/sequencer.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ impl Coordinator {
156156
}
157157
Plan::CreateConnection(plan) => {
158158
self.sequence_create_connection(ctx, plan, resolved_ids)
159+
// Note: the Future is intentionally boxed because it is very large.
160+
.boxed_local()
159161
.await;
160162
}
161163
Plan::CreateDatabase(plan) => {
@@ -196,7 +198,10 @@ impl Coordinator {
196198
ctx.retire(result);
197199
}
198200
Plan::CreateSink(plan) => {
199-
self.sequence_create_sink(ctx, plan, resolved_ids).await;
201+
self.sequence_create_sink(ctx, plan, resolved_ids)
202+
// Note: the Future is intentionally boxed because it is very large.
203+
.boxed_local()
204+
.await;
200205
}
201206
Plan::CreateView(plan) => {
202207
let result = self
@@ -207,6 +212,8 @@ impl Coordinator {
207212
Plan::CreateMaterializedView(plan) => {
208213
let result = self
209214
.sequence_create_materialized_view(ctx.session_mut(), plan, resolved_ids)
215+
// Note: the Future is intentionally boxed because it is very large.
216+
.boxed_local()
210217
.await;
211218
ctx.retire(result);
212219
}
@@ -298,10 +305,16 @@ impl Coordinator {
298305
AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
299306
);
300307
}
301-
self.sequence_end_transaction(ctx, action).await;
308+
self.sequence_end_transaction(ctx, action)
309+
// Note: the Future is intentionally boxed because it is very large.
310+
.boxed_local()
311+
.await;
302312
}
303313
Plan::Select(plan) => {
304-
self.sequence_peek(ctx, plan, target_cluster).await;
314+
self.sequence_peek(ctx, plan, target_cluster)
315+
// Note: the Future is intentionally boxed because it is very large.
316+
.boxed_local()
317+
.await;
305318
}
306319
Plan::Subscribe(plan) => {
307320
if enable_unified_optimizer_api {
@@ -327,6 +340,8 @@ impl Coordinator {
327340
}
328341
Plan::ShowColumns(show_columns_plan) => {
329342
self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster)
343+
// Note: the Future is intentionally boxed because it is very large.
344+
.boxed_local()
330345
.await;
331346
}
332347
Plan::CopyFrom(plan) => {
@@ -353,10 +368,12 @@ impl Coordinator {
353368
.await;
354369
}
355370
Plan::Insert(plan) => {
356-
self.sequence_insert(ctx, plan).await;
371+
// Note: the Future is intentionally boxed because it is very large.
372+
self.sequence_insert(ctx, plan).boxed_local().await;
357373
}
358374
Plan::ReadThenWrite(plan) => {
359-
self.sequence_read_then_write(ctx, plan).await;
375+
// Note: the Future is intentionally boxed because it is very large.
376+
self.sequence_read_then_write(ctx, plan).boxed_local().await;
360377
}
361378
Plan::AlterNoop(plan) => {
362379
ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
@@ -439,6 +456,8 @@ impl Coordinator {
439456
} => {
440457
let result = self
441458
.sequence_alter_source(ctx.session_mut(), alter_source, subsources)
459+
// Note: the Future is intentionally boxed because it is very large.
460+
.boxed_local()
442461
.await;
443462
ctx.retire(result);
444463
}
@@ -627,7 +646,10 @@ impl Coordinator {
627646
let (sub_tx, sub_rx) = oneshot::channel();
628647
let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
629648
let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
630-
self.handle_execute_inner(stmt, params, sub_ctx).await;
649+
self.handle_execute_inner(stmt, params, sub_ctx)
650+
// Note: the Future is intentionally boxed because it is very large.
651+
.boxed_local()
652+
.await;
631653

632654
// The response can need off-thread processing. Wait for it elsewhere so the coordinator can
633655
// continue processing.

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::sync::Arc;
1616
use std::time::{Duration, Instant};
1717

1818
use anyhow::anyhow;
19-
use futures::future::BoxFuture;
19+
use futures::future::{BoxFuture, FutureExt};
2020
use itertools::Itertools;
2121
use maplit::{btreemap, btreeset};
2222
use mz_cloud_resources::VpcEndpointConfig;
@@ -2171,6 +2171,8 @@ impl Coordinator {
21712171
target_cluster,
21722172
}),
21732173
)
2174+
// Note: the Future is intentionally boxed because it is very large.
2175+
.boxed_local()
21742176
.await;
21752177
} else {
21762178
// Allow while the introduction of the new optimizer API in
@@ -2183,6 +2185,8 @@ impl Coordinator {
21832185
target_cluster,
21842186
}),
21852187
)
2188+
// Note: the Future is intentionally boxed because it is very large.
2189+
.boxed_local()
21862190
.await;
21872191
}
21882192
}
@@ -4060,6 +4064,8 @@ impl Coordinator {
40604064
};
40614065

40624066
self.sequence_read_then_write(ctx, read_then_write_plan)
4067+
// Note: the Future is intentionally boxed because it is very large.
4068+
.boxed_local()
40634069
.await;
40644070
}
40654071
}
@@ -4178,6 +4184,8 @@ impl Coordinator {
41784184
},
41794185
TargetCluster::Active,
41804186
)
4187+
// Note: the Future is intentionally boxed because it is very large.
4188+
.boxed_local()
41814189
.await;
41824190

41834191
let internal_cmd_tx = self.internal_cmd_tx.clone();

0 commit comments

Comments
 (0)