Ability to chunk download from object store #274

Open
trungda opened this issue Jul 24, 2024 · 21 comments
@trungda
Contributor

trungda commented Jul 24, 2024

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When downloading large objects (> 300 MB) using the object_store crate, I often hit timeouts with the default configuration (30 seconds connection timeout). Interestingly, when I increase the timeout, the download speed is actually lower (not sure if it's the same for everyone?).

Describe the solution you'd like
I am wondering whether it makes sense to split a file into smaller ranges (say, 100 MB each), download each range in parallel over a separate connection, and reassemble the ranges behind the same interface.
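
As an illustration of the splitting step only, here is a minimal sketch that turns an object size into consecutive byte ranges. The function name `chunk_ranges` and its parameters are hypothetical and not part of object_store; it assumes the object size is already known.

```rust
use std::ops::Range;

/// Split an object of `size` bytes into consecutive half-open byte ranges of
/// at most `chunk_size` bytes each. The last range may be shorter.
/// (Hypothetical helper for illustration; not an object_store API.)
fn chunk_ranges(size: u64, chunk_size: u64) -> Vec<Range<u64>> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut ranges = Vec::new();
    let mut start = 0u64;
    while start < size {
        // Clamp the end of the final range to the object size.
        let end = start.saturating_add(chunk_size).min(size);
        ranges.push(start..end);
        start = end;
    }
    ranges
}
```

For a 250-byte object and 100-byte chunks this yields `0..100`, `100..200` and `200..250`; an empty object yields no ranges.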

Describe alternatives you've considered
Not sure if such a capability can be composed using the existing interfaces.

Additional context

@trungda trungda added the enhancement New feature or request label Jul 24, 2024
@trungda
Contributor Author

trungda commented Jul 24, 2024

I originally submitted this issue in the datafusion repo, which I think is the wrong repo. Quoting the reply from @alamb:

Thank you @trungda

I think it would be very interesting to build a "parallel downloader" ObjectStore implementation, though I am not sure it necessarily belongs in the core object_store crate (though it could be added if there is enough interest)

There might also be some interesting ideas to explore around "racing reads" to avoid latency

There are many good ideas in this paper, BTW: https://dl.acm.org/doi/10.14778/3611479.3611486

I think you could compose this kind of smart client from the existing interfaces

@tustvold
Contributor

It should be relatively straightforward to achieve this using buffered from the futures crate; we may just need to document how to do this

@alamb
Contributor

alamb commented Jul 25, 2024

Maybe it would make a good example

@trungda
Contributor Author

trungda commented Jul 25, 2024

I can write an example. We already use buffered to download multiple files concurrently, something like this:

use futures::{stream, StreamExt};

let parallelism = 10;
let mut downloaders = Vec::new();
for path in paths.iter() {
    // Each future downloads one whole file.
    downloaders.push(download(path));
}
// Poll at most `parallelism` downloads at a time; results come back in input order.
let mut buffered = stream::iter(downloaders).buffered(parallelism);
while let Some(_) = buffered.next().await {}

But it's not obvious to me how to use the stream interface with buffered, i.e., how to reconcile different streams (from different parts of the file) into one stream. Is that really needed, though?

@alamb
Contributor

alamb commented Jul 26, 2024

how can we reconcile different streams (from different parts of the file) into one stream

I was imagining that it would look something like making multiple calls to ObjectStore::get_ranges for each file
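
To make the idea of issuing several range requests and stitching the results back together concrete, here is a minimal std-only sketch. It deliberately swaps in scoped threads and an in-memory byte slice for async futures and a real store, so `get_range` and `fetch_in_order` are hypothetical stand-ins, not object_store or futures APIs. It also processes fixed batches rather than the sliding window that buffered provides, so one slow chunk stalls its whole batch.

```rust
use std::ops::Range;
use std::thread;

/// Hypothetical stand-in for a ranged GET: copies `range` out of an
/// in-memory object. Panics if the range is out of bounds.
fn get_range(object: &[u8], range: Range<usize>) -> Vec<u8> {
    object[range].to_vec()
}

/// Fetch `ranges` with at most `max_concurrency` requests in flight and
/// return the chunks in the same order as the ranges were given.
fn fetch_in_order(
    object: &[u8],
    ranges: &[Range<usize>],
    max_concurrency: usize,
) -> Vec<Vec<u8>> {
    assert!(max_concurrency > 0, "max_concurrency must be non-zero");
    let mut chunks = Vec::with_capacity(ranges.len());
    // Process one batch of ranges at a time; each batch runs in parallel.
    for batch in ranges.chunks(max_concurrency) {
        let fetched: Vec<Vec<u8>> = thread::scope(|s| {
            let handles: Vec<_> = batch
                .iter()
                .cloned()
                .map(|range| s.spawn(move || get_range(object, range)))
                .collect();
            // Joining in spawn order preserves the order of the ranges.
            handles
                .into_iter()
                .map(|h| h.join().expect("fetch thread panicked"))
                .collect()
        });
        chunks.extend(fetched);
    }
    chunks
}
```

Concatenating the returned chunks reproduces the original object as long as the ranges are contiguous and cover it completely.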

@alamb
Contributor

alamb commented Mar 20, 2025

I believe @crepererum is working on something like this, called "chunked downloading"

@alamb alamb transferred this issue from apache/arrow-rs Mar 20, 2025
@crepererum crepererum self-assigned this Mar 21, 2025
@crepererum
Contributor

I do. We have code for that at InfluxData and I plan to upstream this in the following order:

  1. a mock helper, to make object store wrappers easier to test
  2. an extension that tells object store wrappers the pre-known size of an object (we use this extensively for caching, and it saves a normal ObjectStore::get request from needing an initial HEAD request to know what it should be chunking)
  3. the actual chunking implementation

@tustvold
Contributor

tustvold commented Mar 21, 2025

FWIW my preference would be to build this into the store implementations, e.g. into GetClient, as opposed to adding further wrapper types. I'd very much like to move away from wrapping things at the ObjectStore interface.

Edit: Actually my real preference would be to build this into something akin to the buffered interfaces as opposed to baking it into ObjectStore at all. This would allow for out of order chunking, avoid the issue of providing size and Etag information, and generally be far more flexible...

@alamb
Contributor

alamb commented Mar 21, 2025

FWIW my preference would be to build this into the store implementations, e.g. into GetClient, as opposed to adding further wrapper types. I'd very much like to move away from wrapping things at the ObjectStore interface.

Edit: Actually my real preference would be to build this into something akin to the buffered interfaces as opposed to baking it into ObjectStore at all. This would allow for out of order chunking, avoid the issue of providing size and Etag information, and generally be far more flexible...

What do you mean by "buffered interfaces" ?

I mean a more general implementation sounds great, but if we have one that is implemented as an ObjectStore wrapper that also seems fine to me (we could potentially always work on the more flexible implementation later)

@tustvold
Contributor

tustvold commented Mar 21, 2025

What do you mean by "buffered interfaces" ?

I am referring to things like BufReader.

if we have one that is implemented as an ObjectStore wrapper that also seems fine to me

My understanding from Marco's comment is that we would need to use the extension mechanism in order to get the size (and possibly ETag) through to the wrapper. Given this already implies a non-standard invocation of the ObjectStore::get API by the caller, I don't really see the advantage over using a separate utility helper akin to BufReader in order to achieve this. We avoid overloading the ObjectStore interface, can return data out of order, and have a more clean and focused API.

a more general implementation sounds great

TBC I am not suggesting an initial cut needs to implement all of the above, but that we should adopt an approach to this issue that allows for this down the line. Tbh the utility approach should be significantly simpler than an ObjectStore wrapper.

@alamb
Contributor

alamb commented Mar 21, 2025

The utility approach certainly looked nice with an alternate tokio runtime

@crepererum
Contributor

I also don't really follow the vague interface design that @tustvold proposes here. TBH I have an implementation that works. It's pragmatic, maybe not the most beautiful one, but it works.

@tustvold
Contributor

I also don't really follow the vague interface design that @tustvold proposes here

As a very first cut one could take your existing implementation that yields a stream and wrap it up as follows

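pub struct ChunkedDownloader {}

impl ChunkedDownloader {
    pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {  ... }

    // Builder-style setters; the parameter names and types are illustrative.
    pub fn with_chunk_size(self, chunk_size: usize) -> Self {...}

    pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {...}

    // Yields the chunks of the object in order.
    pub fn into_stream(self) -> impl Stream<Item = Result<Bytes>> {...}
}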

One could in future envision extending that with additional functionality, for example

pub async fn sink<W: AsyncWrite + AsyncSeek>(self, out: W) -> Result<()> {...}

pub fn into_unordered_stream(self) -> impl Stream<Item = Result<(u64, Bytes)>> {...}
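
To show what a consumer of such `(offset, bytes)` pairs might do with them, here is a minimal std-only sketch that places chunks arriving in any order into a pre-sized buffer. The function `assemble` is hypothetical, uses `Vec<u8>` in place of `Bytes`, and assumes the chunks do not overlap.

```rust
/// Place chunks that arrive in arbitrary order into a buffer of `total_size`
/// bytes. Each item is `(offset, bytes)`. Assumes chunks do not overlap.
/// Returns an error if a chunk falls outside the object or bytes are missing.
/// (Hypothetical helper for illustration; not an object_store API.)
fn assemble(
    total_size: usize,
    chunks: impl IntoIterator<Item = (u64, Vec<u8>)>,
) -> Result<Vec<u8>, String> {
    let mut out = vec![0u8; total_size];
    let mut written = 0usize;
    for (offset, bytes) in chunks {
        let start = usize::try_from(offset).map_err(|e| e.to_string())?;
        let end = start
            .checked_add(bytes.len())
            .filter(|&end| end <= total_size)
            .ok_or_else(|| {
                format!("chunk at offset {offset} exceeds object size {total_size}")
            })?;
        // Write the chunk at its own offset, independent of arrival order.
        out[start..end].copy_from_slice(&bytes);
        written += bytes.len();
    }
    if written != total_size {
        return Err(format!("received {written} of {total_size} bytes"));
    }
    Ok(out)
}
```

Writing by offset is the same idea as the `sink` variant with `AsyncSeek`: the destination is addressed by position, so the chunks never need to be reordered first.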

@crepererum
Contributor

So we're back to the discussion of generic/exchangeable interfaces and the issue that the chunked downloader provides a rather large integration churn.

@tustvold
Contributor

tustvold commented Mar 26, 2025

rather large integration churn

Isn't churn unavoidable as you need some way to provide the size of the target object and its version information? No approach is going to be a drop-in replacement?

@crepererum
Contributor

rather large integration churn

Isn't churn unavoidable as you need some way to provide the size of the target object and its version information? No approach is going to be a drop-in replacement?

The version information is NOT required. You get it from the individual sub-GETs and just need to compare the values.
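
A minimal sketch of that comparison, assuming each sub-GET reports its version as a string such as an ETag. The function `common_version` is hypothetical and not taken from any existing implementation.

```rust
/// Check that every sub-GET reported the same version string (for example
/// an ETag). Returns the shared version, `None` for an empty input, or an
/// error describing the first mismatch.
/// (Hypothetical helper for illustration; not an object_store API.)
fn common_version<'a>(versions: &[&'a str]) -> Result<Option<&'a str>, String> {
    let mut iter = versions.iter().copied();
    let Some(first) = iter.next() else {
        return Ok(None);
    };
    for (i, version) in iter.enumerate() {
        if version != first {
            // `i` counts from the second chunk, so the chunk index is i + 1.
            return Err(format!(
                "chunk {} has version {version:?}, expected {first:?}",
                i + 1
            ));
        }
    }
    Ok(Some(first))
}
```

A mismatch means the object changed between sub-GETs, so the assembled bytes would mix two versions and the download should fail or restart.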

The size is required for the ideal case, but you can also send an initial HEAD request, and I would argue that for a 100MB file and 16MB chunks that's still better than not chunking at all. So that extension is (from a user PoV) optional.

@tustvold
Contributor

tustvold commented Mar 26, 2025

In which case what did you think of my initial suggestion of adding this logic to GetClient?

Also FWIW one could just fetch the first chunk instead of doing a separate head request, so long as some of the range requested is satisfiable it should be ok.

I also suspect many users will already be issuing range requests, in which case we just chunk that if necessary.

Edit: actually going back to the original description the request is for files on the order of 100s of MB, an approach that returns in order runs the risk of pulling the entire file into memory... This feels problematic...

@crepererum
Contributor

crepererum commented Mar 27, 2025

Also FWIW one could just fetch the first chunk instead of doing a separate head request, so long as some of the range requested is satisfiable it should be ok.

Can you request a range that is potentially larger than the file?

Edit: actually going back to the original description the request is for files on the order of 100s of MB, an approach that returns in order runs the risk of pulling the entire file into memory... This feels problematic...

I think it depends on your concrete deployment / integration. Enabling chunking also increases cost, so I wouldn't do that by default anyways. Also you can chunk even for very large files if you use a buffered-stream like approach. That enables concurrency while limiting the maximum number of pre-fetched chunks.
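
The two trade-offs mentioned here, more requests and bounded memory, can be put into numbers with a small sketch. Both function names are hypothetical, and the memory figure is only an upper bound under the assumption that at most `max_prefetch` full chunks are buffered ahead of the consumer.

```rust
/// Number of ranged GET requests needed to fetch `size` bytes in chunks of
/// `chunk_size` bytes (rounded up).
fn request_count(size: u64, chunk_size: u64) -> u64 {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    size / chunk_size + u64::from(size % chunk_size != 0)
}

/// Upper bound on the bytes held in memory when at most `max_prefetch`
/// chunks are buffered ahead of the consumer.
fn peak_buffered_bytes(chunk_size: u64, max_prefetch: u64) -> u64 {
    chunk_size.saturating_mul(max_prefetch)
}
```

With the figures used earlier in the thread, a 100 MiB object in 16 MiB chunks takes 7 requests instead of 1, and a prefetch limit of 4 chunks caps buffering at 64 MiB regardless of the object size.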

In which case what did you think of my initial suggestion of adding this logic to GetClient?

I think that could work. I'm just a bit afraid that we have so much complexity/features fully baked into the builtin clients that it is impossible for people to write their own implementations / backends.

@tustvold
Contributor

Can you request a range that is potentially larger than the file?

According to the HTTP spec, yes, but I have not tested it.
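
For context, RFC 9110 treats a byte range whose last position lies beyond the end of the representation as a request for the remainder, and a 206 response carries a `Content-Range` header that includes the complete length. Whether every object store backend behaves this way is not confirmed here. Under that assumption, the first chunk's response can reveal the object size, and a minimal sketch of parsing the header value looks like this (the function `parse_content_range` is hypothetical):

```rust
/// Parse a `Content-Range` header value such as `bytes 0-99/1000` into
/// `(first, last, total)`. `total` is `None` when the server reports `*`.
/// Returns `None` for anything else, including the unsatisfied-range form
/// `bytes */1000`.
/// (Hypothetical helper for illustration; not an object_store API.)
fn parse_content_range(value: &str) -> Option<(u64, u64, Option<u64>)> {
    let rest = value.trim().strip_prefix("bytes ")?;
    let (range, total) = rest.split_once('/')?;
    let (first, last) = range.split_once('-')?;
    let first: u64 = first.trim().parse().ok()?;
    let last: u64 = last.trim().parse().ok()?;
    if last < first {
        return None;
    }
    let total = match total.trim() {
        "*" => None,
        t => Some(t.parse::<u64>().ok()?),
    };
    Some((first, last, total))
}
```

If the first request asks for 16 MiB of a smaller object, the returned `last` position is clamped to the end of the object and `total` tells the downloader that no further chunks are needed.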

I'm just a bit afraid that we have so much complexity/features fully baked into the builtin clients that it is impossible for people to write their own implementations / backends.

Provided we encapsulate the functionality, we should be able to expose it much like we expose coalesce_ranges, without needing to expose GetClient itself.

@crepererum
Contributor

Sounds like a fair deal to me.
