diff --git a/Cargo.toml b/Cargo.toml
index 4e8a0d4..4eaa6dd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,4 +20,7 @@ members = [
     "hdfs",
     "hdfs-examples",
     "hdfs-testing",
-]
\ No newline at end of file
+]
+
+[patch.crates-io]
+object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "bff6155d38e19bfe62a776731b78b435560f2c8e" }
diff --git a/hdfs/Cargo.toml b/hdfs/Cargo.toml
index 1df6333..70ea610 100644
--- a/hdfs/Cargo.toml
+++ b/hdfs/Cargo.toml
@@ -46,5 +46,5 @@ chrono = { version = "0.4" }
 fs-hdfs = { version = "^0.1.11", optional = true }
 fs-hdfs3 = { version = "^0.1.11", optional = true }
 futures = "0.3"
-object_store = "0.6.1"
+object_store = { version = "0.6.1", features = ["cloud"] }
 tokio = { version = "1.18", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
diff --git a/hdfs/src/object_store/hdfs.rs b/hdfs/src/object_store/hdfs.rs
index 5a45641..f2fdc32 100644
--- a/hdfs/src/object_store/hdfs.rs
+++ b/hdfs/src/object_store/hdfs.rs
@@ -17,11 +17,11 @@
 
 //! Object store that represents the HDFS File System.
 
-use std::collections::{BTreeSet, VecDeque};
+use std::collections::{BTreeSet, HashMap, VecDeque};
 use std::fmt::{Display, Formatter};
 use std::ops::Range;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -33,6 +33,7 @@ use object_store::{
     path::{self, Path},
     Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
 };
+use object_store::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl};
 use tokio::io::AsyncWrite;
 
 /// scheme for HDFS File System
@@ -111,6 +112,58 @@ impl Display for HadoopFileSystem {
     }
 }
 
+struct HdfsMultiPartUpload {
+    location: Path,
+    hdfs: Arc<HdfsFs>,
+    content: Arc<Mutex<HashMap<usize, Vec<u8>>>>,
+}
+
+#[async_trait]
+impl CloudMultiPartUploadImpl for HdfsMultiPartUpload {
+    async fn put_multipart_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<object_store::multipart::UploadPart, std::io::Error> {
+        let mut content = self.content.lock().unwrap();
+        content.insert(part_idx, buf);
+
+        Ok(object_store::multipart::UploadPart {
+            content_id: part_idx.to_string(),
+        })
+    }
+
+    async fn complete(&self, _completed_parts: Vec<object_store::multipart::UploadPart>) -> Result<(), std::io::Error> {
+        let hdfs = self.hdfs.clone();
+        let location = HadoopFileSystem::path_to_filesystem(&self.location.clone());
+        let content = self.content.clone();
+
+        maybe_spawn_blocking(move || {
+            let file = match hdfs.create_with_overwrite(&location, true) {
+                Ok(f) => f,
+                Err(e) => {
+                    return Err(to_error(e));
+                }
+            };
+
+            let content = content.lock().unwrap();
+            // Sort the buffered parts by index and write them to the file in order
+            let mut keys: Vec<usize> = content.keys().cloned().collect();
+            keys.sort();
+
+            assert_eq!(keys[0], 0, "Missing part 0 for multipart upload");
+            assert_eq!(keys[keys.len() - 1], keys.len() - 1, "Missing last part for multipart upload");
+
+            for key in keys {
+                let buf = content.get(&key).unwrap();
+                file.write(buf.as_slice()).map_err(to_error)?;
+            }
+
+            file.close().map_err(to_error)?;
+
+            Ok(())
+        })
+        .await
+        .map_err(to_io_error)
+    }
+}
+
 #[async_trait]
 impl ObjectStore for HadoopFileSystem {
     // Current implementation is very simple due to missing configs,
@@ -138,13 +191,20 @@
 
     async fn put_multipart(
         &self,
-        _location: &Path,
+        location: &Path,
     ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        todo!()
+        let upload = HdfsMultiPartUpload {
+            location: location.clone(),
+            hdfs: self.hdfs.clone(),
+            content: Arc::new(Mutex::new(HashMap::new())),
+        };
+
+        Ok((MultipartId::default(), Box::new(CloudMultiPartUpload::new(upload, 8))))
     }
 
     async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
-        todo!()
+        // Nothing is written to HDFS until `complete` is called, so there is nothing to clean up.
+        Ok(())
     }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -620,6 +680,27 @@ fn to_error(err: HdfsErr) -> Error {
     }
 }
 
+fn to_io_error(err: Error) -> std::io::Error {
+    match err {
+        Error::Generic { store, source } => {
+            std::io::Error::new(std::io::ErrorKind::Other, format!("{}: {}", store, source))
+        }
+        Error::NotFound { path, source } => {
+            std::io::Error::new(std::io::ErrorKind::NotFound, format!("{}: {}", path, source))
+        }
+        Error::AlreadyExists { path, source } => {
+            std::io::Error::new(std::io::ErrorKind::AlreadyExists, format!("{}: {}", path, source))
+        }
+        Error::InvalidPath { source } => {
+            std::io::Error::new(std::io::ErrorKind::InvalidInput, source)
+        }
+
+        _ => {
+            std::io::Error::new(std::io::ErrorKind::Other, format!("HadoopFileSystem: {}", err))
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
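For review context, here is a hedged caller-side sketch of how the new multipart path would be exercised through the `ObjectStore` API. It is not part of the patch; the function name `write_in_parts`, the object path, and the buffer sizes are illustrative only. It assumes the `object_store` 0.6 behaviour that shutting down the returned `AsyncWrite` drives `CloudMultiPartUpload` to call `complete`, which is the point at which this implementation actually writes the buffered parts to HDFS.

```rust
use object_store::{path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

// Illustrative only: stream two chunks through the multipart writer, then shut
// it down. Writes are buffered into parts in memory; `shutdown` triggers
// `complete`, which orders the parts and flushes them to HDFS in this patch.
async fn write_in_parts(store: &dyn ObjectStore) -> std::result::Result<(), Box<dyn std::error::Error>> {
    let location = Path::from("tmp/multipart_example.bin");

    let (_id, mut writer) = store.put_multipart(&location).await?;
    writer.write_all(&vec![1u8; 10 * 1024 * 1024]).await?; // buffered as part(s), not yet on HDFS
    writer.write_all(&vec![2u8; 10 * 1024 * 1024]).await?;
    writer.shutdown().await?; // complete(): parts are written to HDFS in order

    Ok(())
}
```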