[Feature request] Support alternative future executor handling for compute-bounded futures #94

Open
@jlizen

I am willing to contribute work on this. Sharing for broader feedback after a conversation with djc@ and ctz@. There is also a thread in the rustls discord: https://discord.com/channels/976380008299917365/1313647498061025411

Overview

Today, this crate wraps a heterogeneous set of futures, some of which are I/O-bound (e.g., network hops) and some of which are compute-bound (e.g., crypto operations). During the initial handshake, those compute operations can degrade the performance of the tokio executor by starving worker threads.

I've encountered some pathological cases of this where applications need to establish many connections very quickly and the executor grinds to a halt. It also heavily impacts current-thread runtimes. The impact on multi-threaded runtimes without pathological numbers of connections is smaller, but still non-zero.

Meanwhile, approaches like delegating the TLS handshake to a secondary threadpool with lower-priority threads dramatically improve throughput, both for the TLS handshake work and for other tasks.

Given that we know the composition of the work inside these futures, it would be great to enable more sophisticated handling of the futures that might block the executor.

task::spawn_blocking as it stands today isn't quite right, as it can spawn many more threads than cores. It is also fairly opinionated, in that it can force expensive thread-local initializations and cause other issues. task::block_in_place has its own limitations.
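
To illustrate the "more threads than cores" concern, here is a minimal std-only sketch of a pool whose thread count is fixed up front. It is not tokio's API and not the proposed implementation: `ComputePool`, `sized_to_cores`, and `submit` are hypothetical names, and a real version would hand results back through an async channel rather than a blocking one.

```rust
use std::num::NonZeroUsize;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// A unit of compute-heavy work queued on the pool.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool: at most `size` jobs run concurrently,
/// no matter how many are submitted.
pub struct ComputePool {
    sender: mpsc::Sender<Job>,
}

impl ComputePool {
    /// Start `size` worker threads (at least one) that share a single queue.
    pub fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        for i in 0..size.max(1) {
            let receiver = Arc::clone(&receiver);
            thread::Builder::new()
                .name(format!("compute-pool-{i}"))
                .spawn(move || loop {
                    // The lock guards dequeuing only; it is released before the job runs.
                    let job = match receiver.lock().unwrap().recv() {
                        Ok(job) => job,
                        Err(_) => break, // every sender was dropped: shut down
                    };
                    job();
                })
                .expect("failed to spawn compute pool thread");
        }
        Self { sender }
    }

    /// Size the pool to the parallelism reported by the OS, falling back to 1.
    pub fn sized_to_cores() -> Self {
        let cores = thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1);
        Self::new(cores)
    }

    /// Queue `work` and return a receiver that yields its result.
    pub fn submit<W, R>(&self, work: W) -> mpsc::Receiver<R>
    where
        W: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // Ignore send errors: the caller may have dropped the receiver.
            let _ = tx.send(work());
        });
        self.sender
            .send(job)
            .expect("compute pool threads have exited");
        rx
    }
}
```

Submitting more jobs than the pool has threads just queues them, which is the property an unbounded blocking pool lacks.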

Ultimately I think the goal state here would be:

  • tokio-rustls is able to signal to the tokio executor that these futures are likely to block / be long running (at least for initial handshake)
  • tokio runtime has perhaps some smarter default handling than a naive spawn_blocking
  • the caller can optionally tune the tokio runtime behavior directly via tokio config, rather than bubbling up config from eg tokio-rustls -> hyper-rustls -> hyper-util -> reqwest

There is a conversation in progress about this sort of possibility in the #tokio-internals channel of the tokio discord.

As that conversation progresses, I think that there is an intermediate step available here under an experimental feature flag, that would be a good POC for such functionality:

  • create a new experimental crate that accepts a special type of call, let's call it spawn_compute_heavy_future, and applies special handling to it
  • add a wrapper call in tokio-rustls that checks a cfg flag to decide whether to delegate to the experimental crate; otherwise it just awaits the provided future
    • the cfg flag is important because intermediate crates then don't need to enable any tokio-rustls feature; only the end application sets the flag
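
A minimal sketch of the wrapper idea above, under stated assumptions: the cfg name `compute_heavy_executor_experimental` and the function `maybe_offload` are hypothetical, and `spawn_compute_heavy_future` is stubbed here so the example compiles without the experimental crate. The small `block_on` helper exists only so the sketch can be exercised without a runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Stub standing in for the experimental crate's entry point (hypothetical);
/// a real build would call into that crate instead.
#[cfg(compute_heavy_executor_experimental)]
async fn spawn_compute_heavy_future<F, R>(fut: F) -> Result<R, String>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    Ok(fut.await)
}

/// With the cfg flag set, delegate to the experimental executor.
#[cfg(compute_heavy_executor_experimental)]
pub async fn maybe_offload<F, R>(fut: F) -> Result<R, String>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    spawn_compute_heavy_future(fut).await
}

/// Without the cfg flag, just await the future in place (today's behavior).
#[cfg(not(compute_heavy_executor_experimental))]
pub async fn maybe_offload<F, R>(fut: F) -> Result<R, String>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    Ok(fut.await)
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, only for exercising the sketch.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

The end application would opt in with something like `RUSTFLAGS="--cfg compute_heavy_executor_experimental"`, so no Cargo feature has to be threaded through intermediate crates. Recent toolchains may warn about a cfg name they have not been told to expect.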

We could also embed this directly into tokio-rustls rather than an experimental crate, depending on maintainer preference.

Suggested approach

A couple of guidelines:

  • no change to the tokio-rustls interface for callers (besides the feature flag)
  • try to support both 'standard' use cases (the occasional new handshake, no need for a long-lived threadpool) and the 'hyperscaler' use case (potentially pathological cases with many connections, where a separate threadpool is wanted)

We'll have a lot more options if support for this lands in tokio, since then you can configure via runtime config, do clever things with unwinding the poll fn call for 'sometimes blocking' futures that might prefer thread locality, pre-emptively kick tasks to other worker queues, overload spawn_blocking, etc.

In the meantime I suggest that we just use a public static OnceLock which can be initialized with a strategy for spawn_compute_heavy_future. This allows the outermost caller to inject config without propagating it through intermediate crates, which better simulates what it would be like to embed it in tokio. If nothing is injected, we fall back to defaults.

If the caller doesn't specify anything, we can implicitly set it to one of two options:

  • if the current runtime is a multi-threaded flavor, use block_in_place to call received futures
  • if the current runtime is current thread flavor, use spawn_blocking

I think that the above is a decent middle ground for 'somewhat better' than the current behavior for the caller establishing occasional connections, and it's behind an experimental flag so we could document this opinionated behavior and guide users to evaluate it and consider overriding it. We'll hopefully come up with some cooler things to do natively on tokio, before we ever lift this out of the experimental flag.

Alternatively, the caller CAN specify arbitrary behavior, such as injecting their own secondary executor backed by a threadpool. We could provide a sample usage showing a simple version of that, but I don't think it belongs as a default in the crate, since calling applications that use threadpools probably have other logic they want to incorporate anyway.

Rough example:
Experimental crate contents:

    use std::{future::Future, pin::Pin, sync::OnceLock};
    use tokio::{
        runtime::{Handle, RuntimeFlavor},
        select, task,
    };

    /// optionally configured by caller, otherwise falls back to default based on runtime
    pub static EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY: OnceLock<CpuHeavyFutureExecutor> =
        OnceLock::new();

    pub type CustomExecutorClosureInput = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    pub type CustomExecutorClosure = dyn Fn(CustomExecutorClosureInput) + Send + Sync;

    pub enum CpuHeavyFutureExecutor {
        SpawnBlocking,
        BlockInPlace,
        Custom(Box<CustomExecutorClosure>),
    }

    impl CpuHeavyFutureExecutor {
        /// Send the future to a blocking tokio thread. By default, tokio will spin up a blocking thread
        /// per task, which may be more than your count of CPU cores, depending on runtime config.
        /// If you expect many concurrent cpu-heavy futures, consider limiting your blocking tokio threadpool
        /// size or using `CpuHeavyFutureExecutor::custom()` with a different threadpool.
        pub fn spawn_blocking() -> Self {
            CpuHeavyFutureExecutor::SpawnBlocking
        }

        /// Calls task::block_in_place on the current worker, and evicts other tasks on same worker thread
        /// to avoid blocking them. Can starve your executor of worker threads if called with too many
        /// concurrent cpu-heavy futures.
        pub fn block_in_place() -> Self {
            CpuHeavyFutureExecutor::BlockInPlace
        }

        /// Accepts a closure that will execute the provided future on a background task or different
        /// threadpool, and immediately return. `CpuHeavyFutureExecutor` will use a oneshot
        /// channel to await the result of the provided future, in the foreground.
        pub fn custom(custom_executor_closure: Box<CustomExecutorClosure>) -> Self {
            CpuHeavyFutureExecutor::Custom(custom_executor_closure)
        }
    }

    pub async fn spawn_compute_heavy_future<F, R>(fut: F) -> Result<R, String>
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        let executor = EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY
            .get()
            .unwrap_or_else(|| match Handle::current().runtime_flavor() {
                RuntimeFlavor::CurrentThread => &CpuHeavyFutureExecutor::SpawnBlocking,
                _ => &CpuHeavyFutureExecutor::BlockInPlace,
            });
        match executor {
            CpuHeavyFutureExecutor::BlockInPlace => execute_block_in_place(fut).await,
            CpuHeavyFutureExecutor::SpawnBlocking => execute_spawn_blocking(fut).await,
            CpuHeavyFutureExecutor::Custom(custom_executor_closure) => {
                // borrow the boxed closure instead of re-boxing it on every call
                send_to_custom_executor(fut, &**custom_executor_closure).await
            }
        }
    }

    async fn send_to_custom_executor<T: Send + 'static>(
        future: impl Future<Output = T> + Send + 'static,
        custom_executor_closure: &CustomExecutorClosure,
    ) -> Result<T, String> {
        let (mut tx, rx) = tokio::sync::oneshot::channel();
        custom_executor_closure(Box::pin(async move {
            select!(
                _ = tx.closed() => {
                    // receiver already dropped, don't need to do anything
                }
                result = future => {
                    // if this fails, the receiver already dropped, so we don't need to do anything
                    let _ = tx.send(result);
                }
            )
        }));

        rx.await
            .map_err(|err| format!("error awaiting response from threadpool: {err}"))
    }

    async fn execute_spawn_blocking<F, R>(fut: F) -> Result<R, String>
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        task::spawn_blocking(move || Handle::current().block_on(fut))
            .await
            .map_err(|err| format!("error awaiting spawn_blocking handle: {err}"))
    }

    async fn execute_block_in_place<F, R>(fut: F) -> Result<R, String>
    where
        F: Future<Output = R> + Send + 'static,
        R: Send + 'static,
    {
        Ok(task::block_in_place(move || Handle::current().block_on(fut)))
    }

And then a sample usage with a custom closure backed by a threadpool:

    use std::sync::OnceLock;

    use tokio::{runtime::Handle, sync::mpsc::error::TrySendError};

    use compute_heavy_executor_experimental::{
        CpuHeavyFutureExecutor, CustomExecutorClosureInput,
        EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY,
    };

    /// sender half of the channel feeding the background cpu-heavy runtime
    static CPU_HEAVY_THREAD_POOL: OnceLock<tokio::sync::mpsc::Sender<CustomExecutorClosureInput>> =
        OnceLock::new();

    #[tokio::main]
    async fn main() {
        // spin up our background threadpool + start a tokio executor on it, listening on our mpsc channel in our oncelock
        init_custom_executor();
        // a plain fn item already implements Fn, so a single Box is enough
        let compute_heavy_executor =
            CpuHeavyFutureExecutor::custom(Box::new(custom_executor_closure));
        EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY
            .set(compute_heavy_executor)
            .unwrap_or_else(|_| {
                panic!("EXPERIMENTAL_CPU_HEAVY_TOKIO_EXECUTOR_STRATEGY already initialized")
            });
        // application code that establishes TLS connections would run from here
    }

    fn init_custom_executor() {
        // publish the sender before spawning the thread, so custom_executor_closure
        // can never observe an uninitialized CPU_HEAVY_THREAD_POOL
        let (tx, rx) = tokio::sync::mpsc::channel(10);
        CPU_HEAVY_THREAD_POOL
            .set(tx)
            .expect("cpu heavy threadpool already initialized");

        std::thread::Builder::new()
            .name("cpu-heavy-threadpool".to_string())
            .spawn(move || {
                let rt = tokio::runtime::Builder::new_multi_thread()
                    .thread_name("cpu-heavy-pool-thread")
                    .worker_threads(num_cpus::get())
                    // ref: https://github.com/tokio-rs/tokio/issues/4941
                    // consider uncommenting if seeing heavy task contention
                    // .disable_lifo_slot()
                    .on_thread_start(move || unsafe {
                        // Reduce thread pool thread niceness, so they are lower priority
                        // than the foreground executor and don't interfere with I/O tasks
                        #[cfg(target_os = "linux")]
                        {
                            *libc::__errno_location() = 0;
                            if libc::nice(10) == -1 && *libc::__errno_location() != 0 {
                                let error = std::io::Error::last_os_error();
                                tracing::log::error!("failed to set threadpool niceness: {}", error);
                            }
                        }
                    })
                    .enable_all()
                    .build()
                    .unwrap_or_else(|e| panic!("cpu heavy runtime failed_to_initialize: {}", e));
                rt.block_on(async {
                    tracing::log::debug!("starting background cpu work");
                    process_cpu_work(rx).await;
                });
            })
            .unwrap_or_else(|e| panic!("cpu heavy thread failed_to_initialize: {}", e));
    }

    /// runs on the background runtime: spawns each received future as a task
    async fn process_cpu_work(mut rx: tokio::sync::mpsc::Receiver<CustomExecutorClosureInput>) {
        while let Some(work) = rx.recv().await {
            tokio::task::spawn(work);
        }
    }

    fn custom_executor_closure(fut: CustomExecutorClosureInput) {
        let tx = CPU_HEAVY_THREAD_POOL
            .get()
            .expect("Call init_custom_executor() before using cpu heavy threadpool")
            .clone();

        // `fut` is already a pinned box, so send it as-is rather than boxing it again
        match tx.try_send(fut) {
            Ok(_) => (),
            Err(TrySendError::Closed(_)) => {
                panic!("background cpu heavy threadpool channel is closed")
            }
            Err(TrySendError::Full(msg)) => {
                tracing::log::warn!("background channel is full, task spawning loop delayed");
                // wait for channel capacity on the calling runtime instead of blocking here
                Handle::current().spawn(async move {
                    tx.send(msg)
                        .await
                        .expect("background cpu heavy threadpool channel is closed")
                });
            }
        }
    }

Questions

  • For folks that have noticed perf issues due to TLS, does this suit your use case?
  • Any initial thoughts on what futures make sense to move behind such handling? I'm thinking, mostly during the initial TLS handshake only, whenever we process bytes? I haven't poked too deeply yet.
