/// Configuration for controlling transfer behavior.
#[derive(Debug, Clone, Copy)]
pub struct TransferOptions {
    /// Maximum number of concurrent chunks to transfer.
    pub concurrent_tasks: usize,
    /// Maximum number of chunks to buffer in memory during the transfer.
    /// Defaults to `concurrent_tasks` if `None`.
    pub buffer_capacity: Option<usize>,
    /// Maximum number of retries for a chunk transfer. Defaults to 3 if `None`.
    pub max_retries: Option<usize>,
}

impl TransferOptions {
    /// Creates a new `TransferOptions` with the specified parameters.
    ///
    /// `buffer_capacity` falls back to `concurrent_tasks` and `max_retries`
    /// falls back to `3` when not provided, so both optional fields are
    /// guaranteed to be `Some` after construction.
    pub fn new(
        concurrent_tasks: usize,
        buffer_capacity: Option<usize>,
        max_retries: Option<usize>,
    ) -> Self {
        Self {
            concurrent_tasks,
            buffer_capacity: Some(buffer_capacity.unwrap_or(concurrent_tasks)),
            max_retries: Some(max_retries.unwrap_or(3)),
        }
    }
}
+impl Default for TransferOptions { + fn default() -> Self { + Self { + concurrent_tasks: 1, + buffer_capacity: Some(1), + max_retries: None, + } + } +} + /// Configure preconditions for the put operation #[derive(Debug, Clone, PartialEq, Eq, Default)] pub enum PutMode { diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 78fce9c26224..aa1339ef7722 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -38,8 +38,9 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, util::InvalidGetRange, - Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, + Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, + TransferOptions, UploadPart, }; /// A specialized `Error` for filesystem object store-related errors @@ -155,6 +156,30 @@ pub(crate) enum Error { #[snafu(display("Upload aborted"))] Aborted, + + #[snafu(display("Failed to seek to position in file: {}", source))] + SeekFile { + source: io::Error, + }, + + #[snafu(display("Failed to write to file: {}", source))] + WriteFile { + source: io::Error, + }, + + #[snafu(display("Failed to send to channel: {}", source))] + UnableSendToChannel { + source: Box, + }, + + #[snafu(display("Failed to download file from '{}': {}", path, source))] + DownloadFile { + source: io::Error, + path: String, + }, + + #[snafu(display("Download was aborted by user or system"))] + DownloadAborted, } impl From for super::Error { @@ -995,6 +1020,231 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result { }) } +/// Downloads a single chunk from the object store. +/// +/// # Arguments +/// - `store`: A reference-counted `ObjectStore` instance used for downloading. 
+/// - `location`: The path in the object store to download from. +/// - `opts`: Options for the download request, such as range and metadata preferences. +/// - `sender`: A sender for transmitting downloaded data chunks to the processing pipeline. +/// - `cancellation_alert`: A watch receiver to monitor for cancellation signals, allowing +/// the download to abort gracefully if needed. +/// - `max_retries`: The maximum number of retry attempts for a failed download. +/// +/// # Returns +/// A `Result` indicating success or failure. On success, it returns `()`; on failure, it returns +/// an `Error`. +/// +/// # Details +/// The function listens for cancellation signals using `cancellation_alert` and exits early +/// if a cancellation is detected. It also retries download attempts up to `max_retries` in case +/// of transient errors. +async fn download_chunk( + store: Arc, + location: Path, + opts: GetOptions, + sender: tokio::sync::mpsc::Sender<(usize, Bytes)>, + cancellation_alert: tokio::sync::watch::Receiver, + max_retries: usize, +) -> Result<(), Error> { + let mut attempt = 0; + let request = store + .get_opts(&location, opts) + .await + .map_err(|e| Error::Metadata { + source: e.into(), + path: location.to_string(), + })?; + + if let GetResultPayload::Stream(mut stream) = request.payload { + let mut offset = request.range.start; + 'download_chunk: while !*cancellation_alert.borrow() { + let buffer = match stream.try_next().await { + Ok(Some(buffer)) => buffer, + Ok(None) => break 'download_chunk, + Err(_) if attempt < max_retries => { + attempt += 1; + continue; + } + Err(e) => { + return Err(Error::DownloadFile { + source: e.into(), + path: location.to_string(), + }); + } + }; + + let bytes_readed = buffer.len(); + sender + .send((offset, buffer)) + .await + .map_err(|e| Error::UnableSendToChannel { source: e.into() })?; + offset += bytes_readed; + } + + if *cancellation_alert.borrow() { + return Err(Error::DownloadAborted); + } + } + Ok(()) +} + +/// 
Writes multiple chunks of downloaded data into a local file. +/// +/// # Arguments +/// - `file`: A mutable reference to the target local file for writing. +/// - `receiver`: A mutable receiver for fetching data chunks from the download pipeline. +/// - `cancellation_alert`: A watch receiver to monitor for cancellation signals, allowing +/// the writing process to abort gracefully if needed. +/// +/// # Returns +/// A `Result` containing the total number of bytes written or an error if the write operation fails. +/// +/// # Details +/// This function listens for cancellation signals via `cancellation_alert` and stops processing +/// further chunks if cancellation is detected. It ensures that no unnecessary writes are performed +/// once a cancellation request is received. + +async fn write_multi_chunks( + file: &mut File, + receiver: &mut tokio::sync::mpsc::Receiver<(usize, Bytes)>, + cancellation_alert: tokio::sync::watch::Receiver, +) -> Result { + let mut data = 0; + + while let Some((offset, buffer)) = receiver.recv().await { + if *cancellation_alert.borrow() { + return Err(Error::DownloadAborted); + } + + file.seek(SeekFrom::Start(offset as u64)) + .map_err(|e| Error::SeekFile { source: e })?; + file.write_all(&buffer) + .map_err(|e| Error::WriteFile { source: e })?; + + data += buffer.len() as u64; + } + + Ok(data) +} + +/// Downloads a file from the object store to a local file. +/// +/// # Arguments +/// - `store`: A reference-counted `ObjectStore` instance used for the download. +/// - `location`: The path in the object store to download from. +/// - `opts`: Options for the download request. +/// - `file`: A mutable reference to the local file where the downloaded data will be written. +/// - `transfer_opts`: Optional transfer configuration for managing concurrent downloads and memory usage. +/// +/// # Details +/// The function determines the number of concurrent tasks (`concurrent_tasks`) and the size of the +/// channel buffer (`channel_size`). 
If `buffer_capacity` is not provided, it defaults to the value of +/// `concurrent_tasks`. The total size of the file is divided into chunks, and each chunk is +/// processed by a concurrent task. +/// +/// The download process listens for cancellation signals via a shared `cancellation_alert`. If a +/// cancellation is requested, the process terminates early, and no further data is downloaded or written. +/// +/// # Returns +/// A `Result` containing the total number of bytes written to the local file or an error if the download fails. +/// +/// # Notes +/// This function spawns concurrent tasks for downloading and writing data chunks. The `write_multi_chunks` +/// task ensures that cancellation signals are respected, stopping all operations promptly. + +pub async fn download( + store: Arc, + location: &Path, + opts: GetOptions, + mut file: std::fs::File, + transfer_opts: Option<&TransferOptions>, +) -> Result { + let req = store.get_opts(&location, opts.clone()).await?; + let transfer_opts = *transfer_opts.unwrap_or(&TransferOptions::default()); + let concurrent_tasks = transfer_opts.concurrent_tasks; + let channel_size = transfer_opts.buffer_capacity.unwrap_or(concurrent_tasks); + let mut written_bytes: u64 = 0; + match req.payload { + GetResultPayload::Stream(_) => { + let obj_size = req.meta.size; + let chunk_size = (obj_size as f64 / concurrent_tasks as f64).ceil() as usize; + let (sender, mut receiver) = tokio::sync::mpsc::channel(channel_size); + let (notify_cancellation, cancellation_alert) = tokio::sync::watch::channel(false); + + #[derive(Debug)] + enum TaskResult { + Download(Result<(), Error>), + Write(Result), + } + + let mut tasks = tokio::task::JoinSet::new(); + for i in 0..transfer_opts.concurrent_tasks { + let chunk_start = i * chunk_size; + let chunk_end = std::cmp::min((i + 1) * chunk_size - 1, obj_size - 1); + let ranged_opts = GetOptions { + range: Some(GetRange::Bounded(chunk_start..(chunk_end + 1)).into()), + ..opts.clone() + }; + let 
location_clone = location.clone(); + let sender_clone = sender.clone(); + let store_clone = Arc::clone(&store); + let max_retries = transfer_opts.max_retries.unwrap_or(0); + let cancellation_alert_clone = cancellation_alert.clone(); + tasks.spawn(async move { + TaskResult::Download( + download_chunk( + store_clone, + location_clone, + ranged_opts, + sender_clone, + cancellation_alert_clone, + max_retries, + ) + .await, + ) + }); + } + drop(sender); + tasks.spawn(async move { + TaskResult::Write( + write_multi_chunks(&mut file, &mut receiver, cancellation_alert).await, + ) + }); + + while let Some(result) = tasks.join_next().await { + match result { + Ok(TaskResult::Download(Ok(()))) => {} + Ok(TaskResult::Download(Err(e))) => { + eprintln!("Error en descarga: {:?}", e); + let _ = notify_cancellation.send(true); + return Err(e.into()); + } + Ok(TaskResult::Write(Ok(bytes))) => { + written_bytes = bytes; + } + Ok(TaskResult::Write(Err(e))) => { + eprintln!("Error en escritura: {:?}", e); + let _ = notify_cancellation.send(true); + return Err(e.into()); + } + Err(e) => { + eprintln!("Error crítico al ejecutar una tarea: {:?}", e); + let _ = notify_cancellation.send(true); + return Err(e.into()); + } + } + } + } + GetResultPayload::File(mut source_file, _path) => { + let mut file = file.try_clone().unwrap(); + written_bytes = std::io::copy(&mut source_file, &mut file) + .map_err(|e| Error::WriteFile { source: e })?; + } + } + Ok(written_bytes) +} + #[cfg(unix)] /// We include the inode when available to yield an ETag more resistant to collisions /// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)