Feat(Stream): Use redis stream #5
Merged
This PR was inspired by these two PRs: here and here. I also looked at this library for ideas.
The first PR tries to use Redis streams but still relies on _pool_iteration, which effectively reuses the old implementation without any benefit. I'm not sure which stream feature is actually being used there.
However, it has some nice ideas, like passing a flag to decide whether the user wants to use the stream or not.
The second PR, on the other hand, uses streams more accurately but still makes some odd implementation choices. First, it assumes the user only wants streams, since it removes the old implementation when enqueuing the job.
The second thing I didn't like was registering 3 different tasks in the worker:
run_delayed_queue_poller always tries to read jobs from the simple queue and put them in the stream. If you look closely, some code paths keep putting tasks in the queue, only for them to be read back here and moved to the stream. Strange.
run_stream_reader is the actual method that reads tasks from the stream.
run_idle_consumer_cleanup tries to remove idle consumers from Redis. Imagine multiple workers all constantly trying to clean up Redis. Strange again.
Instead, I remove the consumer in the close method, like this part of the library. We need to clean up the consumer because we generate a new consumer name each time the worker is brought up. I can't think of an easy way to consistently generate a unique name for our consumers.
I also removed _unclaim_job from the implementation. As I understand it, its purpose was to unclaim a job that a consumer is trying to get while it has already been taken by another consumer. Preventing that scenario is the whole point of using Redis streams, so it shouldn't happen unless something is wrong with Redis.
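The consumer cleanup described above could look roughly like the sketch below. It is not the code in this PR: StreamWorker, the stream and group names, and the "worker-" prefix are hypothetical, and the redis argument is assumed to behave like redis.asyncio.Redis. The only Redis call used is xgroup_delconsumer(name, groupname, consumername).

```python
import asyncio
import uuid


class StreamWorker:
    """Hypothetical worker that owns exactly one consumer in a consumer group."""

    def __init__(self, redis, stream: str, group: str) -> None:
        # Assumption: `redis` exposes the same async API as redis.asyncio.Redis.
        self.redis = redis
        self.stream = stream
        self.group = group
        # A fresh random name on every start-up, so a restarted worker
        # never reuses the name of a consumer that may still exist.
        self.consumer_name = f"worker-{uuid.uuid4().hex}"

    async def close(self) -> None:
        # Each worker deletes only its own consumer on shutdown, so no
        # background task has to scan Redis for idle consumers.
        await self.redis.xgroup_delconsumer(
            self.stream, self.group, self.consumer_name
        )
```

One caveat: XGROUP DELCONSUMER also drops that consumer's pending entries, so a worker would probably want to acknowledge or hand off its in-flight jobs before calling close.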
As a final note, I also made improvements in a few places to make the logic more robust, like if self.use_stream
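To illustrate the flag idea, here is a minimal sketch of an enqueue path that keeps the old queue and only uses the stream when asked. It is an assumption about the shape, not the code in this PR: enqueue_job, queue_name, stream_name, and the job_id field are hypothetical names, and redis is assumed to behave like redis.asyncio.Redis (only xadd and zadd are used).

```python
import asyncio
import time


async def enqueue_job(
    redis,
    job_id: str,
    *,
    use_stream: bool,
    queue_name: str = "queue",
    stream_name: str = "queue:stream",
) -> str:
    """Enqueue a job on the stream or on the original sorted-set queue."""
    if use_stream:
        # Stream path: append an entry that a consumer group can read.
        await redis.xadd(stream_name, {"job_id": job_id})
        return "stream"
    # Old path, kept intact: sorted set scored by enqueue time in ms.
    await redis.zadd(queue_name, {job_id: time.time() * 1000})
    return "sorted_set"
```

Keeping both branches behind one flag means users who do not opt in see no change in behaviour.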