Skip to content

.*: Box all Futures that are larger than 8KiB #23283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,5 @@ disallowed-types = [
{ path = "std::collections::HashMap", reason = "use `std::collections::BTreeMap` or `mz_ore::collections::HashMap` instead" },
{ path = "std::collections::HashSet", reason = "use `std::collections::BTreeSet` or `mz_ore::collections::HashSet` instead" },
]

future-size-threshold = 8192
3 changes: 3 additions & 0 deletions misc/python/materialize/cli/gen-lints.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@
# Implementing `From` gives you `Into` for free, but the reverse is not
# true.
"clippy::from_over_into",
# Futures get compiled into state machines, which can end up being very large (10's of KB).
# It's inefficient to moves these around on the stack, so require that they get boxed.
Comment on lines +179 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be useful to include a link to rust-lang/rust#99504 here.

"clippy::large_futures",
]


Expand Down
1 change: 1 addition & 0 deletions src/adapter-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#![warn(clippy::disallowed_macros)]
#![warn(clippy::disallowed_types)]
#![warn(clippy::from_over_into)]
#![warn(clippy::large_futures)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very exited about moving these to the workspace Cargo.toml and getting out of the business of having to touch every crate when we adjust a lint :)

// END LINT CONFIG

//! Types for the adapter.
Expand Down
1 change: 1 addition & 0 deletions src/adapter/benches/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#![warn(clippy::disallowed_macros)]
#![warn(clippy::disallowed_types)]
#![warn(clippy::from_over_into)]
#![warn(clippy::large_futures)]
// END LINT CONFIG

use std::str::FromStr;
Expand Down
14 changes: 11 additions & 3 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4272,6 +4272,7 @@ mod tests {
use std::time::{Duration, Instant};
use std::{env, iter};

use futures::future::FutureExt;
use itertools::Itertools;
use tokio_postgres::types::Type;
use tokio_postgres::NoTls;
Expand Down Expand Up @@ -4384,6 +4385,7 @@ mod tests {
}
catalog.expire().await;
})
.boxed()
.await
}

Expand Down Expand Up @@ -4582,6 +4584,7 @@ mod tests {
);
catalog.expire().await;
})
.boxed()
.await
}

Expand All @@ -4608,6 +4611,7 @@ mod tests {
);
catalog.expire().await;
})
.boxed()
.await;
}

Expand Down Expand Up @@ -4787,6 +4791,7 @@ mod tests {
);
catalog.expire().await;
})
.boxed()
.await;
}

Expand All @@ -4810,6 +4815,7 @@ mod tests {
);
catalog.expire().await;
})
.boxed()
.await;
}

Expand Down Expand Up @@ -5208,7 +5214,7 @@ mod tests {
catalog.expire().await;
}

Catalog::with_debug(NOW_ZERO.clone(), inner).await
Catalog::with_debug(NOW_ZERO.clone(), inner).boxed().await
}

// Execute all builtin functions with all combinations of arguments from interesting datums.
Expand Down Expand Up @@ -5358,8 +5364,9 @@ mod tests {
handles
}

let handles =
Catalog::with_debug(NOW_ZERO.clone(), |catalog| async { inner(catalog) }).await;
let handles = Catalog::with_debug(NOW_ZERO.clone(), |catalog| async { inner(catalog) })
.boxed()
.await;
for handle in handles {
handle.await.expect("must succeed");
}
Expand Down Expand Up @@ -5578,6 +5585,7 @@ mod tests {
}
catalog.expire().await;
})
.boxed()
.await
}
}
2 changes: 2 additions & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,7 @@ fn default_logging_config() -> ReplicaLogging {
mod builtin_migration_tests {
use std::collections::{BTreeMap, BTreeSet};

use futures::FutureExt;
use itertools::Itertools;
use mz_catalog::memory::objects::Table;

Expand Down Expand Up @@ -2152,6 +2153,7 @@ mod builtin_migration_tests {
);
catalog.expire().await;
})
.boxed()
.await
}

Expand Down
5 changes: 4 additions & 1 deletion src/adapter/src/config/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::time::Duration;

use futures::FutureExt;
use tokio::time;

use crate::config::{
Expand All @@ -32,7 +33,9 @@ pub async fn system_parameter_sync(

// Ensure the frontend client is initialized.
let mut frontend = Option::<SystemParameterFrontend>::None; // lazy initialize the frontend below
let mut backend = SystemParameterBackend::new(adapter_client).await?;

// Note: the Future is intentionally boxed because it is very large.
let mut backend = SystemParameterBackend::new(adapter_client).boxed().await?;

// Tick every `tick_duration` ms, skipping missed ticks.
let mut interval = time::interval(tick_interval);
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ impl Coordinator {

self.handle_execute(portal_name, session, tx, outer_ctx_extra)
.instrument(span)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}

Expand Down Expand Up @@ -445,7 +447,10 @@ impl Coordinator {
_ => {}
}

self.handle_execute_inner(stmt, params, ctx).await
self.handle_execute_inner(stmt, params, ctx)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await
}

#[tracing::instrument(level = "debug", skip(self, ctx))]
Expand Down
43 changes: 33 additions & 10 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ use crate::util::{ComputeSinkId, ResultExt};
use crate::{catalog, AdapterNotice, TimestampContext};

impl Coordinator {
/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 74KB. This would
/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
/// Note: the returned Future is intentionally boxed because it is very large.
pub(crate) fn handle_message<'a>(&'a mut self, msg: Message) -> LocalBoxFuture<'a, ()> {
async move {
match msg {
Expand All @@ -60,13 +58,22 @@ impl Coordinator {
}
}
Message::PurifiedStatementReady(ready) => {
self.message_purified_statement_ready(ready).await
self.message_purified_statement_ready(ready)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await
}
Message::CreateConnectionValidationReady(ready) => {
self.message_create_connection_validation_ready(ready).await
self.message_create_connection_validation_ready(ready)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await
}
Message::WriteLockGrant(write_lock_guard) => {
self.message_write_lock_grant(write_lock_guard).await;
self.message_write_lock_grant(write_lock_guard)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Message::GroupCommitInitiate(span, permit) => {
// Add an OpenTelemetry link to our current span.
Expand Down Expand Up @@ -118,26 +125,35 @@ impl Coordinator {
real_time_recency_ts,
validity,
)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Message::RetireExecute { data, reason } => {
self.retire_execution(reason, data);
}
Message::ExecuteSingleStatementTransaction { ctx, stmt, params } => {
self.sequence_execute_single_statement_transaction(ctx, stmt, params)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Message::PeekStageReady { ctx, stage } => {
self.sequence_peek_stage(ctx, stage).await;
// Note: the Future is intentionally boxed because it is very large.
self.sequence_peek_stage(ctx, stage).boxed_local().await;
}
Message::PeekStageDeprecatedReady { ctx, stage } => {
// Allow while the introduction of the new optimizer API in
// #20569 is in progress.
#[allow(deprecated)]
self.sequence_peek_stage_deprecated(ctx, stage).await;
self.sequence_peek_stage_deprecated(ctx, stage)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Message::DrainStatementLog => {
self.drain_statement_log().await;
// Note: the Future is intentionally boxed because it is very large.
self.drain_statement_log().boxed_local().await;
}
}
}
Expand Down Expand Up @@ -362,7 +378,10 @@ impl Coordinator {
.iter()
.all(|id| self.catalog().try_get_entry(id).is_some())
{
self.handle_execute_inner(original_stmt, params, ctx).await;
self.handle_execute_inner(original_stmt, params, ctx)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
return;
}

Expand Down Expand Up @@ -711,6 +730,8 @@ impl Coordinator {
global_mir_plan,
}),
)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
RealTimeRecencyContext::PeekDeprecated {
Expand Down Expand Up @@ -754,6 +775,8 @@ impl Coordinator {
dataflow_metainfo,
}),
)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
}
Expand Down
34 changes: 28 additions & 6 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ impl Coordinator {
}
Plan::CreateConnection(plan) => {
self.sequence_create_connection(ctx, plan, resolved_ids)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Plan::CreateDatabase(plan) => {
Expand Down Expand Up @@ -196,7 +198,10 @@ impl Coordinator {
ctx.retire(result);
}
Plan::CreateSink(plan) => {
self.sequence_create_sink(ctx, plan, resolved_ids).await;
self.sequence_create_sink(ctx, plan, resolved_ids)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Plan::CreateView(plan) => {
let result = self
Expand All @@ -207,6 +212,8 @@ impl Coordinator {
Plan::CreateMaterializedView(plan) => {
let result = self
.sequence_create_materialized_view(ctx.session_mut(), plan, resolved_ids)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
ctx.retire(result);
}
Expand Down Expand Up @@ -298,10 +305,16 @@ impl Coordinator {
AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
);
}
self.sequence_end_transaction(ctx, action).await;
self.sequence_end_transaction(ctx, action)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Plan::Select(plan) => {
self.sequence_peek(ctx, plan, target_cluster).await;
self.sequence_peek(ctx, plan, target_cluster)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Plan::Subscribe(plan) => {
if enable_unified_optimizer_api {
Expand All @@ -327,6 +340,8 @@ impl Coordinator {
}
Plan::ShowColumns(show_columns_plan) => {
self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
}
Plan::CopyFrom(plan) => {
Expand All @@ -353,10 +368,12 @@ impl Coordinator {
.await;
}
Plan::Insert(plan) => {
self.sequence_insert(ctx, plan).await;
// Note: the Future is intentionally boxed because it is very large.
self.sequence_insert(ctx, plan).boxed_local().await;
}
Plan::ReadThenWrite(plan) => {
self.sequence_read_then_write(ctx, plan).await;
// Note: the Future is intentionally boxed because it is very large.
self.sequence_read_then_write(ctx, plan).boxed_local().await;
}
Plan::AlterNoop(plan) => {
ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
Expand Down Expand Up @@ -439,6 +456,8 @@ impl Coordinator {
} => {
let result = self
.sequence_alter_source(ctx.session_mut(), alter_source, subsources)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;
ctx.retire(result);
}
Expand Down Expand Up @@ -627,7 +646,10 @@ impl Coordinator {
let (sub_tx, sub_rx) = oneshot::channel();
let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
self.handle_execute_inner(stmt, params, sub_ctx).await;
self.handle_execute_inner(stmt, params, sub_ctx)
// Note: the Future is intentionally boxed because it is very large.
.boxed_local()
.await;

// The response can need off-thread processing. Wait for it elsewhere so the coordinator can
// continue processing.
Expand Down
Loading