
Allow task-based subscription processing via the subscription manager #3709


Open · wants to merge 6 commits into main from subscription-throughput-increase
Conversation

@fselmo (Collaborator) commented May 23, 2025

What was wrong?

Closes #3672

How was it fixed?

  • For high-throughput subscriptions, where order of processing is not important, we should allow a configuration that enables task-based processing - creating a task for each subscription and processing them in parallel.

  • Add a parallelize flag to the subscription manager, defaulting to False, but allowing this to be set to True to enable task-based, parallelized processing.

  • This should also be controllable on a per-subscription basis. If sub-second blocks need to be processed in order but, say, processing some information about transactions in the mempool (newPendingTransactions) does not, that should be possible too. Adds a parallelize flag to subscriptions that overrides the manager setting for that particular subscription; it defaults to None, which means the manager controls the global processing state (see the sketch after this list).
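
As a rough usage sketch of the flags described above — the `parallelize` kwarg and manager flag are proposals from this PR, so their exact names and how the manager flag is set are assumptions; the subscription types and manager calls follow web3.py's existing async subscription utilities:

```python
import asyncio

from web3 import AsyncWeb3, WebSocketProvider
from web3.utils.subscriptions import (
    NewHeadsSubscription,
    NewHeadsSubscriptionContext,
    PendingTxSubscription,
    PendingTxSubscriptionContext,
)


async def handle_new_heads(context: NewHeadsSubscriptionContext) -> None:
    # Processed in order: with parallelize left as None, this subscription
    # defers to the manager setting (False here) and stays in the main loop.
    print(f"new block: {context.result['number']}")


async def handle_pending_tx(context: PendingTxSubscriptionContext) -> None:
    # Processed task-based: each message is dispatched as its own task,
    # so completion order is not guaranteed.
    print(f"pending tx: {context.result.hex()}")


async def main() -> None:
    async with AsyncWeb3(WebSocketProvider("ws://localhost:8546")) as w3:
        # Hypothetical manager-level flag from this PR (global default):
        w3.subscription_manager.parallelize = False

        await w3.subscription_manager.subscribe(
            [
                NewHeadsSubscription(handler=handle_new_heads),
                # Hypothetical per-subscription override from this PR:
                PendingTxSubscription(handler=handle_pending_tx, parallelize=True),
            ]
        )
        await w3.subscription_manager.handle_subscriptions()


asyncio.run(main())
```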

Todo:

  • Clean up commit history
  • Add or update documentation related to these changes
  • Add entry to the release notes

Cute Animal Picture


@fselmo fselmo force-pushed the subscription-throughput-increase branch from e6f5210 to 3d0a47d on May 23, 2025 16:44
- For high-throughput subscriptions, where order of processing is not important,
  we should allow a configuration to be set to process tasks in a task-based
  manner - creating a task for each subscription and processing them in parallel.

- Add ``task_based`` configuration to the subscription manager, defaulting to
  ``False``, but allowing this to be set to ``True`` to enable task-based processing.
@fselmo fselmo force-pushed the subscription-throughput-increase branch from 3d0a47d to 9af9808 on May 23, 2025 17:05
@kclowes (Collaborator) left a comment

LGTM! 🚀 (after tests pass :) )

- Propagate subscription manager task exceptions to the main loop.
- Allow configuring exceptions to be logged silently, rather than raised.
- Add task cleanup.
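
A hedged sketch of one common shape for the behavior this commit describes — capture background-task exceptions in a done callback, either log them silently or queue them for the main loop to re-raise, and discard finished tasks as cleanup. This is not the PR's actual implementation; `TaskManager`, `silence_exceptions`, and `raise_if_errored` are illustrative names:

```python
import asyncio
import logging
from typing import Any, Coroutine

logger = logging.getLogger(__name__)


class TaskManager:
    """Illustrative only -- not this PR's actual implementation."""

    def __init__(self, silence_exceptions: bool = False) -> None:
        self.silence_exceptions = silence_exceptions
        self._tasks: set[asyncio.Task] = set()
        self._errors: asyncio.Queue = asyncio.Queue()

    def spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)  # task cleanup
        if task.cancelled():
            return
        exc = task.exception()
        if exc is None:
            return
        if self.silence_exceptions:
            # Configured to log quietly rather than raise.
            logger.warning("subscription task failed: %r", exc)
        else:
            # Stash the exception so the main loop can re-raise it.
            self._errors.put_nowait(exc)

    async def raise_if_errored(self) -> None:
        # Called periodically from the main processing loop.
        if not self._errors.empty():
            raise self._errors.get_nowait()
```
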
@fselmo (Collaborator, Author) commented May 27, 2025

I cleaned this up a bit, and while I think this mostly keeps FIFO, I think the correct implementation here is setting off a task for each subscription that processes its own loop in the background while keeping messages in order for that subscription. For most tasks this won't matter, but either way it would be the right way to do this.

I'm going to put this away for a little bit as there are more pressing tasks ahead of this one but I already have a clean idea for how to handle that with the current manager structure.


also, rip py38 🙁

@fselmo fselmo force-pushed the subscription-throughput-increase branch from 55a6fac to 5f4c57a on May 28, 2025 16:14
fselmo added 2 commits May 28, 2025 10:19
- Some subscriptions may be ok to parallelize, but others may not.
  If a user wants to parallelize pending transactions but not sub-second
  block headers, they should be able to set ``parallelize`` on a
  per-subscription basis.

- Add a ``parallelize`` parameter to subscriptions and rename the
  flag on the manager to be the same.

- Add tests for overriding the parallelization of certain subscriptions
  when the manager is not parallelizing, and for when the manager is
  parallelizing by default but the subscription should not be.
- Add test for eth.subscribe with all kwargs and make sure they
  are passed through when creating the subscription
- Clean up old ``_subscribe_with_args``
@fselmo fselmo force-pushed the subscription-throughput-increase branch from 5f4c57a to ec2278b on May 28, 2025 16:19
@fselmo (Collaborator, Author) commented May 28, 2025

OK, never mind. I thought about it some more, and I think this may be the only way to go. I don't see enough subscriptions on a single web3 instance for each to need its own queue. That feels like over-engineering for a niche case that I don't think even exists today. If a user is subscribing across many chains, they will have many different web3 instances, so they can parallelize each instance, and each will be within its own context with its own manager.

I do think it's useful to parallelize on a per-subscription basis, as you may want to parallelize newPendingTransactions, for example, but not necessarily newHeads. For something like newHeads on a chain that produces sub-second blocks, ordering matters, and you likely want to process blocks in order.

So instead, I built on the manager configuration as a global flag and allowed per-subscription management as well. I added some good tests for this, but it's still missing the docs. I will try to update those soon, but I'm marking this ready for review so the code can be reviewed for now.

@fselmo fselmo marked this pull request as ready for review May 28, 2025 16:19
@fselmo fselmo requested review from kclowes, reedsa and pacrob May 28, 2025 16:19
@fselmo fselmo self-assigned this May 28, 2025
@fubuloubu (Contributor) left a comment

This PR looks good (will try to test it early next week if I can). However, I'm a bit confused about the disclaimer concerning OOO processing of subscription data. Shouldn't it be fairly transparent based on the behavior of the node, regardless of whether it's processed sequentially or thrown into a background task queue?

At least in my experience the data flows in based on the order the node provides it. I don't see anything in this update that would modify that behavior, so I want to make sure I'm not missing something about the change.

Is the disclaimer more that you cannot guarantee the order in which the background tasks get executed/completed once the callback is made on new message data? If so, that's okay for our use case, because we are using the callback to populate a message queue which maintains ordered execution.
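
For reference, a minimal sketch (not from this PR) of that pattern — the subscription handler only enqueues, and a single consumer task drains the queue, so processing stays in arrival order even if the handlers themselves are dispatched as parallel tasks:

```python
import asyncio
from typing import Any

message_queue: asyncio.Queue = asyncio.Queue()


async def enqueue_handler(message: Any) -> None:
    # Subscription callback: cheap and safe to parallelize, since it
    # only hands the message off to the queue.
    await message_queue.put(message)


async def ordered_consumer() -> None:
    # Single consumer: all real processing happens here, strictly in
    # the order messages were enqueued.
    while True:
        message = await message_queue.get()
        print(f"processing {message!r}")
        message_queue.task_done()


async def main() -> None:
    consumer = asyncio.create_task(ordered_consumer())
    for n in range(5):  # simulate callbacks firing
        await enqueue_handler({"block": n})
    await message_queue.join()  # wait for the consumer to drain the queue
    consumer.cancel()


asyncio.run(main())
```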

@fselmo (Collaborator, Author) commented May 30, 2025

> Is the disclaimer more that you cannot guarantee the order in which the background tasks get executed/completed once the callback is made on new message data?

Yeah, I just mean that you can't guarantee the order in which the task processing finishes. The tasks will still all start in the order they were received, but web3.py can't account for the amount of logic you're throwing at each task in the handler. So let's say you are keeping track of balances and putting this data in a database: if you end up processing a future block faster than the previous block, because you had to do more calculations on that block, you can run into a race condition where your shared state is not updated sequentially, which could lead to miscalculations.

That's just one example, but it really depends on the conditionals defined in the handler of the task. If tasks are running through the same logic, they'll still all likely be processed in order; it's just no longer a guarantee when you're yielding control across tasks.

Does that make sense? The documentation for this change should be quite clear on this, and I haven't started on it yet. But, for this reason, the default setting should be that all processing is done within the main loop imo.
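
To make the hazard concrete, a minimal, self-contained sketch (not from this PR): tasks start in FIFO order, but completion order depends on per-message work, so naive shared state can go backwards:

```python
import asyncio
import random

latest_block_seen = 0  # shared state updated by handlers


async def handle_block(block_number: int) -> None:
    global latest_block_seen
    # Simulate variable per-block work (heavier calculations on some blocks).
    await asyncio.sleep(random.uniform(0, 0.05))
    # With parallel tasks, an older block can finish after a newer one,
    # silently rewinding the shared state.
    latest_block_seen = block_number


async def main() -> None:
    # Tasks are created in block order (FIFO start)...
    tasks = [asyncio.create_task(handle_block(n)) for n in range(1, 11)]
    await asyncio.gather(*tasks)
    # ...but the last write wins, and it isn't necessarily block 10.
    print(f"latest_block_seen = {latest_block_seen}")


asyncio.run(main())
```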

@fubuloubu (Contributor) commented

Okay, yeah, that makes sense. I agree with the default behavior being essentially synchronous, and with this feature carrying the disclaimer that even though the subscription loop is single-threaded/single-process, it pays to understand that OOO execution across coroutines may affect when tasks complete with parallelize=True.

@pacrob pacrob removed their request for review June 2, 2025 18:55
@fselmo fselmo removed the request for review from reedsa June 5, 2025 17:27