Feature: Parallel message processing #796
Open
The goal of this PR is to process pending messages in parallel, grouped by address.
Related Clickup or Jira tickets : ALEPH-XXX
Self proofreading checklist
Changes
This pull request introduces significant enhancements to the handling of pending messages in the database and the processing pipeline. The changes include the addition of a new accessor method for fetching batches of messages by address, improvements to the message processing logic for concurrency, and updates to type hints and imports for better clarity and functionality.
Database Accessor Enhancements:
Added a new method, get_next_pending_messages_by_address, in src/aleph/db/accessors/pending_messages.py to fetch batches of pending messages grouped by address. The method supports filtering by fetched status, excluded hashes, and excluded addresses, and keeps queries efficient by limiting results to a specified batch size.
Message Processing Pipeline Improvements:
Updated the process_messages method in src/aleph/jobs/process_pending_messages.py to support concurrent processing of messages using asyncio tasks and semaphores. This allows multiple addresses to be processed in parallel while maintaining a limit on the number of concurrent tasks.
Added a process_message_batch method for handling batches of messages associated with the same address, ensuring that tasks are properly cleaned up upon completion.
Type Hint and Import Updates:
Updated type hints in src/aleph/db/accessors/pending_messages.py and src/aleph/jobs/process_pending_messages.py to include List and Set for improved type clarity.
Updated imports in src/aleph/jobs/process_pending_messages.py to include new dependencies such as PendingMessageDb and get_next_pending_messages_by_address.
These changes enhance the scalability and maintainability of the system by improving the efficiency of pending message retrieval and processing.
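For reviewers, the concurrency pattern described under Message Processing Pipeline Improvements can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: the names process_all, process_batch_for_address and handle_message, the in_flight set, and the use of plain strings as messages are all hypothetical stand-ins. It assumes one asyncio task per address, a semaphore that caps how many batches run at once, and cleanup in a finally block so an address is released even if its batch fails.

```python
import asyncio
from typing import Dict, List, Set


async def handle_message(address: str, message: str, log: List[str]) -> None:
    # Hypothetical placeholder for the real per-message work.
    await asyncio.sleep(0)
    log.append(f"{address}:{message}")


async def process_batch_for_address(
    address: str,
    batch: List[str],
    semaphore: asyncio.Semaphore,
    in_flight: Set[str],
    log: List[str],
) -> None:
    try:
        # The semaphore bounds how many address batches run concurrently.
        async with semaphore:
            # Messages of one address are handled sequentially, in order.
            for message in batch:
                await handle_message(address, message, log)
    finally:
        # Always release the address, even when the batch raised.
        in_flight.discard(address)


async def process_all(
    batches: Dict[str, List[str]], max_parallel: int = 2
) -> List[str]:
    semaphore = asyncio.Semaphore(max_parallel)
    in_flight: Set[str] = set()  # addresses currently owned by a task
    log: List[str] = []
    tasks = []
    for address, batch in batches.items():
        in_flight.add(address)
        tasks.append(
            asyncio.create_task(
                process_batch_for_address(
                    address, batch, semaphore, in_flight, log
                )
            )
        )
    await asyncio.gather(*tasks)
    return log


if __name__ == "__main__":
    result = asyncio.run(process_all({"0xA": ["m1", "m2"], "0xB": ["m3"]}))
    print(result)
```

In this sketch, ordering is only guaranteed within a single address; messages from different addresses may interleave. A set like in_flight could plausibly feed the accessor's excluded-addresses filter so that an address already being processed is not fetched again, but that wiring is an assumption and is not shown here.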
How to test
To test this change, sync a node.
Process
Print screen / video
Upload here print screens or videos showing the changes if relevant.
Notes