Simplify logs fetching #1540
Conversation
@leoyvens I made some suggestions, but I am approving and leaving it up to you if you want to address these comments. If you make the requested changes feel free to re-request review as you see fit.
```diff
@@ -264,7 +260,7 @@ where
     from: u64,
     to: u64,
     filter: EthGetLogsFilter,
-) -> impl Future<Item = Vec<Log>, Error = Error> {
+) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<Log>, Error>> + Send>> {
```
I put a name in the prelude to make this common pattern terse. Prefer `DynTryFuture<'static, Vec<Log>>`.

For bonus points, you can do `DynTryFuture<'a, Vec<Log>>` to allow borrowing from `self` and other arguments in the future, eliminating both the existing pesky clones like the `let eth = self.clone()` on line 280 and the freshly introduced clones on lines 291-294. I verified that doing so compiles and removes all the clones from the method except for the `Arc` (which could also be removed if the callees were modified to take references).

The idiomatic thing to do when migrating to async/await is to return `DynTryFuture<'_, Vec<Log>>` and take all non-`Copy` types by reference. This is just a suggestion though, because it means fixing the caller as well, as discussed below.
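For illustration, a minimal sketch of the borrowing variant (the alias definition and the stand-in types here are assumptions made to keep the snippet self-contained, not the crate's actual definitions):

```rust
use std::future::Future;
use std::pin::Pin;

// Stand-ins so the sketch compiles on its own.
struct Error;
struct Log;
struct EthGetLogsFilter;
struct EthAdapter;

// Assumed shape of the prelude alias: a boxed, pinned future
// that may borrow for lifetime 'a.
type DynTryFuture<'a, T> =
    Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>;

impl EthAdapter {
    // Returning `DynTryFuture<'a, _>` lets the async block borrow
    // `self` and `filter`, so neither needs to be cloned.
    fn logs_with_sigs<'a>(
        &'a self,
        _from: u64,
        _to: u64,
        filter: &'a EthGetLogsFilter,
    ) -> DynTryFuture<'a, Vec<Log>> {
        Box::pin(async move {
            let _ = (self, filter); // the borrows live inside the future
            Ok(Vec::new())
        })
    }
}
```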
Oh nice, I'll use `DynTryFuture`.

Did you manage to also fix the caller (`logs_in_block_range`) when trying that change? The resulting lifetime error is pretty nasty.
Instead of fixing it I cut it off with a `todo!()` just to test changes to the first function in isolation.

The way I've been fixing these is just to migrate the use of references up the stack until the compiler stops complaining. Sometimes this means going all the way up to `main`. It's a bit of a pain, but each function migrated is progress, so it gets easier with time: there are fewer and fewer steps up the call stack before you find something that's already been migrated. Sometimes you luck out, though, and the compiler is just happy.
I fixed a lot of these in this PR: #1498. So if you get stuck on a lifetime error, let's screen share.
While I am curious to know the solution to this particular lifetime puzzle, I wouldn't want to bubble up lifetimes through the calling functions, given the difficulty I already had trying to do it.

All these clones are cheap and insignificant compared to a network call, so this would be more a discussion about code style. On that, `'static` things have less of an annotational burden and, most importantly, are easier to work with; lifetimes will generate more lifetime puzzles up the stack, which costs development time now and in the future.
Oops, my suggestion to use `DynTryFuture` may be misguided here (though it's not wrong). Better would be to return some kind of `impl Future`. `Box<dyn Future<..>>` is a hack to get around the fact that GATs would be required to return `impl Future` from traits. Since traits are not in play here, the `Box` is unnecessary, and the quickest way to get the `impl Future` is just to use the `async` keyword.
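As a sketch of the point (names and types are stand-ins, not the PR's actual signature):

```rust
struct Error;
struct Log;

// In a free function or inherent method, `async fn` already returns
// an anonymous `impl Future<Output = Result<Vec<Log>, Error>>`,
// so no `Box` or `Pin` is needed.
async fn logs_with_sigs(_from: u64, _to: u64) -> Result<Vec<Log>, Error> {
    Ok(Vec::new())
}

// Boxing only becomes necessary when the future must be returned from
// a trait method, since (without GATs) a trait can't name the
// anonymous future type.
```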
I strongly disagree with the points against using lifetimes.

> All these clones are cheap

Didn't you just fix a bug that was eating up gigabytes because of a seemingly innocent `clone()`? In the previous PR I just linked to, there were a bunch of clones removed that were not cheap either. The problem is that when the use of clone is ubiquitous, it is easy to miss performance problems because the clones become noise.

(That gives me an idea for a trait `CheapClone`, which would be implemented for things like `Arc` but not `Vec`, making the performance story more self-documenting.)

Performance aside, a clone is semantically different from a shared reference. A clone gives you owned mutable data; a shared reference keeps things in sync.

The way that lifetimes work, with bubbling up the stack being the solution to the problem, we are encouraged to use a consistent style across the codebase. Things become more difficult when the two styles are intermingled: by-reference vs `'static`, async vs not async.

> On that, 'static things have less of an annotational burden and most importantly are easier to work with

They are more familiar to you, perhaps, but not easier to work with. The style of this codebase is more of an outlier in this regard. Once you start working with mutable data, references are the only reasonable option. Don't be afraid of references. Working with them will make you a much better Rust developer.
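A minimal sketch of what such a trait might look like (hypothetical at this point in the thread; a later comment notes one was actually added):

```rust
use std::sync::Arc;

/// Marker for types whose `clone` is trivially cheap (e.g. a
/// reference-count bump), so calling it never hides a real cost.
pub trait CheapClone: Clone {
    fn cheap_clone(&self) -> Self {
        self.clone()
    }
}

// Cloning an `Arc` only bumps a refcount, so it qualifies.
impl<T: ?Sized> CheapClone for Arc<T> {}

// Deliberately no impl for `Vec<T>`: a deep copy is not cheap.
```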
The first thing I tried was making `log_stream` an `async fn`, but that resulted in the most inscrutable Rust error I've seen in a while: it said "one type is more general than the other" 🤷‍♂️.

I've introduced a `CheapClone` trait and used it in all these clones. The `filter` was being cloned around a bunch, so I put it in an `Arc`. It's a vec of event signatures and contract addresses; I even benchmarked it out of curiosity to see the impact of cloning a structure like that.

On using lifetimes for everything: if we had more async/await on the path between the task being spawned and this function, I might feel adventurous enough to try it, but that refactor would be disproportionate to the size of the change in this PR.
Ooh, I got that "more general" error once when learning Rust, for a completely different reason. This time it might have been a missing `async move`.

Glad to see the `CheapClone` trait working out. The one thing I'd do there is add an `#[inline]` attribute.

In any case, I'm happy with the code as-is. These and the previous comments are all just possible areas for improvement, so let's get this merged.
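For illustration, with the trait shape sketched earlier, the attribute would go on the default method so its trivial body can be inlined at call sites:

```rust
pub trait CheapClone: Clone {
    #[inline]
    fn cheap_clone(&self) -> Self {
        self.clone()
    }
}
```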
```diff
@@ -1042,15 +1035,17 @@ where
     from: u64,
     to: u64,
     log_filter: EthereumLogFilter,
-) -> Box<dyn Future<Item = Vec<Log>, Error = Error> + Send> {
-    let eth = self.clone();
+) -> Box<dyn std::future::Future<Output = Result<Vec<Log>, Error>> + Send + Unpin> {
```
You can use `DynTryFuture` here as well, and use `boxed` instead of `Box::new`.
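Presumably this refers to `FutureExt::boxed` from futures 0.3; a small sketch of the difference (the function here is a stand-in):

```rust
use futures::future::{BoxFuture, FutureExt};

async fn get_logs() -> Result<Vec<String>, String> {
    Ok(Vec::new())
}

// `.boxed()` both boxes and pins the future, yielding a
// `BoxFuture<'static, _>`; a bare `Box::new` would produce a box
// that still needs pinning before it can be polled.
fn boxed_logs() -> BoxFuture<'static, Result<Vec<String>, String>> {
    get_logs().boxed()
}
```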
Thanks, I did.
```rust
        }),
    )
}
Ok(logs) => Ok(Some((logs, (end + 1, step)))),
```
Is the nature of asking for too many logs that there are simply too many in that range, but that a future range may contain fewer logs and successfully complete with a larger step?

If so, I would consider increasing the step size on successful requests, e.g. `Ok(Some((logs, (end + 1, max_step.min(step * 2)))))`. As the code is, it can (and will) only decrease the step size, slowing down and never recovering its speed. The `max_step` variable would simply be the value set earlier at `let step = match filter.contracts.is_empty() {` ... That value would serve as both `max_step` and the initial `step`.
I'm not convinced that it's simply the number of logs in all cases; the Alchemy Rinkeby nodes were returning -32000 at any range over 500 blocks, even for the first blocks of the network. Hopefully this is something they can improve on their side.

For Infura it really is about the number of logs, but the limit is now 10,000 logs, which is well above our target triggers per range (currently 100 by default), so this should be a rare event anyway.

So I don't think it's worth having logic that tries to find the optimal range here; that's kind of what our adaptive block range already does, and this is just a failsafe.
Ok.
```rust
// Typically this will loop only once and fetch the entire range in one request. But if the
// node returns an error that signifies the request is too heavy to process, the range will
// be broken down into smaller steps.
futures03::stream::try_unfold((from, step), move |(start, step)| {
```
I checked everything in here carefully for bugs and everything checks out. Nice.
Thanks! It's easy to make an off-by-one error here.
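To make the pattern concrete, here's a self-contained model of the retry-with-smaller-steps loop (the fetch function, the error string, and the halving policy are stand-ins, not the PR's exact code):

```rust
use futures::stream;
use futures::TryStreamExt;

// Pretend the node rejects ranges wider than 3 blocks as too heavy.
async fn fetch(start: u64, end: u64) -> Result<Vec<u64>, String> {
    if end - start + 1 > 3 {
        Err("too heavy".to_string())
    } else {
        Ok((start..=end).collect())
    }
}

async fn logs_in_range(from: u64, to: u64, step: u64) -> Result<Vec<u64>, String> {
    stream::try_unfold((from, step), move |(start, step)| async move {
        if start > to {
            return Ok(None); // past the end of the range: the stream is done
        }
        let end = (start + step - 1).min(to);
        match fetch(start, end).await {
            // On success, move on to the next chunk. (The suggestion above
            // would also grow `step` back toward its maximum here.)
            Ok(logs) => Ok(Some((logs, (end + 1, step)))),
            // On a "too heavy" error, halve the step and retry from the
            // same start instead of failing outright.
            Err(e) if e == "too heavy" && step > 1 => {
                Ok(Some((Vec::new(), (start, step / 2))))
            }
            Err(e) => Err(e),
        }
    })
    .try_concat()
    .await
}
```

The off-by-one hazards mentioned above live in the `end + 1` and `(start + step - 1).min(to)` arithmetic.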
```rust
// respecting `LOG_STREAM_PARALLEL_CHUNKS`.
let mut chunk_futures = vec![];
let mut low = start;
for _ in 0..*LOG_STREAM_PARALLEL_CHUNKS {
```
What's the reasoning behind dropping parallel chunks?
My rationale is in the second paragraph of the PR description.
LGTM
Force-pushed from af736ca to fb2bcc3.
This PR simplifies how we fetch logs so that it's more robust and we can better reason about how many requests are made, and when.

First, the `log_stream` function no longer makes requests in parallel if the first request fails. This used to be important for performance before we started splitting the `eth_getLogs` requests, but those are now split up in `logs_in_block_range`, so we already have a good level of parallelism. The parallelism was also hard on the Ethereum service: we only make requests in parallel after detecting that the node considered the original request too heavy and the range had to be split up, but if the original request was overloading the service, requesting the logs in parallel may well overload it too. Sequential requests give the service a better chance to cope. This should help with the recent issues we've had with our Rinkeby service being rate limited due to these parallel requests.

The other use of the `ETHEREUM_PARALLEL_BLOCK_RANGES` env var was to limit the parallel `eth_getLogs` requests in `logs_in_block_range`. Only complex subgraphs would have enough requests to hit a limit here, and limiting parallelism at this level would hurt performance for exactly those subgraphs. We already have `ETHEREUM_RPC_MAX_PARALLEL_REQUESTS` to limit the total number of parallel requests a node can make to an Ethereum endpoint, so in the spirit of having less but more meaningful configuration I removed that env var completely.

The code around this was refactored a bit to use futures 0.3 and async/await.
I verified on the Moloch subgraph that syncing is done correctly.