diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 947dd786c..76b504638 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -11,6 +11,7 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use iroh_resolver::resolver::Resolver; use iroh_rpc_client::{Client, ClientStatus}; +use iroh_unixfs::balanced_tree::DEFAULT_CODE; use iroh_unixfs::{ builder::Entry as UnixfsEntry, content_loader::{FullLoader, FullLoaderConfig}, @@ -182,15 +183,19 @@ impl Api { ) -> Result>> { let blocks = match entry { UnixfsEntry::File(f) => f.encode().await?.boxed(), - UnixfsEntry::Directory(d) => d.encode(), + UnixfsEntry::Directory(d) => d.encode(DEFAULT_CODE), UnixfsEntry::Symlink(s) => Box::pin(async_stream::try_stream! { - yield s.encode()? + yield s.encode(DEFAULT_CODE)? + }), + UnixfsEntry::RawBlock(r) => Box::pin(async_stream::try_stream! { + yield r.encode()? }), }; - Ok(Box::pin( - add_blocks_to_store(Some(self.client.clone()), blocks).await, - )) + Ok(Box::pin(add_blocks_to_store( + Some(self.client.clone()), + blocks, + ))) } /// The `add` method encodes the entry into a DAG and adds the resulting diff --git a/iroh-api/src/store.rs b/iroh-api/src/store.rs index 921f39e1a..96e471f7a 100644 --- a/iroh-api/src/store.rs +++ b/iroh-api/src/store.rs @@ -85,7 +85,7 @@ fn add_blocks_to_store_chunked( } } -pub async fn add_blocks_to_store( +pub fn add_blocks_to_store( store: Option, blocks: Pin> + Send>>, ) -> impl Stream> { diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index ae1b3f1ba..f58fc056f 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -77,21 +77,16 @@ pub struct Bitswap { _workers: Arc>>, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] enum PeerState { Connected(ConnectionId), Responsive(ConnectionId, ProtocolId), Unresponsive, + #[default] Disconnected, DialFailure(Instant), } -impl Default for PeerState { - fn default() -> Self { - PeerState::Disconnected - } -} - impl PeerState { fn is_connected(self) -> bool { matches!(self, PeerState::Connected(_) | PeerState::Responsive(_, _)) @@ -146,7 +141,7 @@ impl Bitswap { }; let client = Client::new(network.clone(), store, cb, config.client).await; - let (sender_msg, mut receiver_msg) = mpsc::channel(2048); + let (sender_msg, mut receiver_msg) = mpsc::channel::<(PeerId, BitswapMessage)>(2048); let (sender_con, mut receiver_con) = mpsc::channel(2048); let (sender_dis, mut receiver_dis) = mpsc::channel(2048); @@ -157,7 +152,13 @@ impl Bitswap { async move { // process messages serially but without blocking the p2p loop - while let Some((peer, message)) = receiver_msg.recv().await { + while let Some((peer, mut message)) = receiver_msg.recv().await { + let message = tokio::task::spawn_blocking(move || { + message.verify_blocks(); + message + }) + .await + .expect("cannot spawn blocking thread"); if let Some(ref server) = server { futures::future::join( client.receive_message(&peer, &message), @@ -490,14 +491,8 @@ impl NetworkBehaviour for Bitswap { } } } - HandlerEvent::Message { - mut message, - protocol, - } => { - // mark peer as responsive + HandlerEvent::Message { message, protocol } => { self.set_peer_state(&peer_id, PeerState::Responsive(connection, protocol)); - - message.verify_blocks(); self.receive_message(peer_id, message); } HandlerEvent::FailedToSendMessage { .. } => { diff --git a/iroh-gateway/assets/404.html b/iroh-gateway/assets/404.html index e2d50089e..d49656a96 100644 --- a/iroh-gateway/assets/404.html +++ b/iroh-gateway/assets/404.html @@ -21,8 +21,8 @@ - - + + {{ root_path }} diff --git a/iroh-gateway/assets/dir_list.html b/iroh-gateway/assets/dir_list.html index 1a3b2ace0..3bbf98332 100644 --- a/iroh-gateway/assets/dir_list.html +++ b/iroh-gateway/assets/dir_list.html @@ -18,8 +18,8 @@ - - + + {{ root_path }} diff --git a/iroh-gateway/src/client.rs b/iroh-gateway/src/client.rs index 3873a70a5..d5fcba74b 100644 --- a/iroh-gateway/src/client.rs +++ b/iroh-gateway/src/client.rs @@ -6,7 +6,7 @@ use anyhow::Result; use bytes::Bytes; use cid::Cid; use futures::{StreamExt, TryStream}; -use http::HeaderMap; +use http::{HeaderMap, StatusCode}; use iroh_car::{CarHeader, CarWriter}; use iroh_metrics::{ core::{MObserver, MRecorder}, @@ -15,15 +15,18 @@ use iroh_metrics::{ resolver::OutMetrics, }; use iroh_resolver::dns_resolver::Config; -use iroh_resolver::resolver::{CidOrDomain, Metadata, Out, OutPrettyReader, OutType, Resolver}; +use iroh_resolver::resolver::{Metadata, Out, OutPrettyReader, OutType, Resolver}; use iroh_unixfs::{content_loader::ContentLoader, Source}; use mime::Mime; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWrite}; use tokio_util::io::ReaderStream; use tracing::{info, warn}; +use crate::constants::RECURSION_LIMIT; +use crate::error::GatewayError; +use crate::handler_params::GetParams; +use crate::ipfs_request::IpfsRequest; use crate::response::ResponseFormat; -use crate::{constants::RECURSION_LIMIT, handler_params::GetParams}; #[derive(Debug, Clone)] pub struct Client { @@ -95,26 +98,58 @@ impl Client { } } + #[tracing::instrument(skip(self))] + pub async fn build_ipfs_request( + &self, + path: &iroh_resolver::resolver::Path, + query_params: &GetParams, + format: ResponseFormat, + subdomain_mode: bool, + ) -> Result { + info!("build ipfs request {}", path); + let path_metadata = match self + .retrieve_path_metadata(path.clone(), format == ResponseFormat::Raw) + .await + { + Ok(metadata) => metadata, + Err(e) => { + if e == "offline" { + return Err(GatewayError::new(StatusCode::SERVICE_UNAVAILABLE, &e)); + } else if e.starts_with("failed to find") { + return Err(GatewayError::new(StatusCode::NOT_FOUND, &e)); + } else { + return Err(GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e)); + } + } + }; + Ok(IpfsRequest { + format, + cid: path.root().clone(), + resolved_path: path.clone(), + query_params: query_params.clone(), + subdomain_mode, + path_metadata, + }) + } + #[tracing::instrument(skip(self))] pub async fn retrieve_path_metadata( &self, path: iroh_resolver::resolver::Path, - format: Option, + raw_format: bool, ) -> Result { info!("retrieve path metadata {}", path); - if let Some(f) = format { - if f == ResponseFormat::Raw { - return self - .resolver - .resolve_raw(path) - .await - .map_err(|e| e.to_string()); - } + if raw_format { + self.resolver + .resolve_raw(path) + .await + .map_err(|e| e.to_string()) + } else { + self.resolver.resolve(path).await.map_err(|e| e.to_string()) } - self.resolver.resolve(path).await.map_err(|e| e.to_string()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self, path_metadata))] pub async fn get_file( &self, path: iroh_resolver::resolver::Path, @@ -126,7 +161,7 @@ impl Client { let path_metadata = if let Some(path_metadata) = path_metadata { path_metadata } else { - self.retrieve_path_metadata(path.clone(), None).await? + self.retrieve_path_metadata(path.clone(), false).await? }; let metadata = path_metadata.metadata().clone(); record_ttfb_metrics(start_time, &metadata.source); @@ -241,28 +276,6 @@ impl Client { } } -#[derive(Debug, Clone)] -pub struct IpfsRequest { - pub format: ResponseFormat, - pub cid: CidOrDomain, - pub resolved_path: iroh_resolver::resolver::Path, - pub query_file_name: String, - pub download: bool, - pub query_params: GetParams, - pub subdomain_mode: bool, - pub path_metadata: Out, -} - -impl IpfsRequest { - pub fn request_path_for_redirection(&self) -> String { - if self.subdomain_mode { - self.resolved_path.to_relative_string() - } else { - self.resolved_path.to_string() - } - } -} - async fn fetch_car_recursive( resolver: &Resolver, path: iroh_resolver::resolver::Path, diff --git a/iroh-gateway/src/core.rs b/iroh-gateway/src/core.rs index 32b971721..55fc9f76f 100644 --- a/iroh-gateway/src/core.rs +++ b/iroh-gateway/src/core.rs @@ -130,6 +130,7 @@ mod tests { use iroh_rpc_client::Config as RpcClientConfig; use iroh_rpc_types::store::StoreAddr; use iroh_rpc_types::Addr; + use iroh_unixfs::balanced_tree::DEFAULT_CODE; use iroh_unixfs::builder::{DirectoryBuilder, FileBuilder}; use iroh_unixfs::content_loader::{FullLoader, FullLoaderConfig}; use iroh_unixfs::unixfs::UnixfsNode; @@ -214,13 +215,12 @@ mod tests { .name(name) .content_bytes(content.to_vec()) .build() - .await .unwrap(); dir_builder = dir_builder.add_file(file); } - let root_dir = dir_builder.build().await.unwrap(); - let mut parts = root_dir.encode(); + let root_dir = dir_builder.build().unwrap(); + let mut parts = root_dir.encode(DEFAULT_CODE); while let Some(part) = parts.next().await { let (cid, bytes, links) = part.unwrap().into_parts(); cids.push(cid); diff --git a/iroh-gateway/src/handlers.rs b/iroh-gateway/src/handlers.rs index 73343ee2f..c57a601fe 100644 --- a/iroh-gateway/src/handlers.rs +++ b/iroh-gateway/src/handlers.rs @@ -42,9 +42,10 @@ use crate::handler_params::{ inlined_dns_link_to_dns_link, recode_path_to_inlined_dns_link, DefaultHandlerPathParams, GetParams, SubdomainHandlerPathParams, }; +use crate::ipfs_request::IpfsRequest; use crate::text::IpfsSubdomain; use crate::{ - client::{FileResult, IpfsRequest}, + client::FileResult, constants::*, core::State, error::GatewayError, @@ -183,25 +184,12 @@ async fn request_preprocessing( // parse query params let format = get_response_format(request_headers, &query_params.format) .map_err(|err| GatewayError::new(StatusCode::BAD_REQUEST, &err))?; - - let path_metadata = match state + let ipfs_request = state .client - .retrieve_path_metadata(path.clone(), Some(format.clone())) - .await - { - Ok(metadata) => metadata, - Err(e) => { - if e == "offline" { - return Err(GatewayError::new(StatusCode::SERVICE_UNAVAILABLE, &e)); - } else if e.starts_with("failed to find") { - return Err(GatewayError::new(StatusCode::NOT_FOUND, &e)); - } else { - return Err(GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e)); - } - } - }; + .build_ipfs_request(path, query_params, format.clone(), subdomain_mode) + .await?; - let resolved_cid = path_metadata.metadata().resolved_path.last(); + let resolved_cid = ipfs_request.path_metadata.metadata().resolved_path.last(); let resolved_cid = match resolved_cid { Some(cid) => cid, None => { @@ -243,22 +231,9 @@ async fn request_preprocessing( }; response_headers.insert(&HEADER_X_IPFS_PATH, hv); - // handle request and fetch data - let req = IpfsRequest { - format, - cid: path.root().clone(), - resolved_path: path.clone(), - query_file_name: query_params - .filename - .as_deref() - .unwrap_or_default() - .to_string(), - download: query_params.download.unwrap_or_default(), - query_params: query_params.clone(), - subdomain_mode, - path_metadata, - }; - Ok(RequestPreprocessingResult::ShouldRequestData(Box::new(req))) + Ok(RequestPreprocessingResult::ShouldRequestData(Box::new( + ipfs_request, + ))) } pub async fn handler( @@ -604,9 +579,9 @@ async fn serve_raw( match body { FileResult::File(body) | FileResult::Raw(body) => { - let file_name = match req.query_file_name.is_empty() { + let file_name = match req.query_file_name().is_empty() { true => format!("{}.bin", req.cid), - false => req.query_file_name.clone(), + false => req.query_file_name().to_string(), }; set_content_disposition_headers(&mut headers, &file_name, DISPOSITION_ATTACHMENT); @@ -662,9 +637,9 @@ async fn serve_car( match body { FileResult::File(body) | FileResult::Raw(body) => { - let file_name = match req.query_file_name.is_empty() { + let file_name = match req.query_file_name().is_empty() { true => format!("{}.car", req.cid), - false => req.query_file_name.clone(), + false => req.query_file_name().to_string(), }; set_content_disposition_headers(&mut headers, &file_name, DISPOSITION_ATTACHMENT); @@ -700,9 +675,9 @@ async fn serve_car_recursive( .await .map_err(|e| GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e))?; - let file_name = match req.query_file_name.is_empty() { + let file_name = match req.query_file_name().is_empty() { true => format!("{}.car", req.cid), - false => req.query_file_name.clone(), + false => req.query_file_name().to_string(), }; set_content_disposition_headers(&mut headers, &file_name, DISPOSITION_ATTACHMENT); @@ -717,7 +692,7 @@ async fn serve_car_recursive( Ok(GatewayResponse::new(StatusCode::OK, body, headers)) } -#[tracing::instrument()] +#[tracing::instrument(skip_all)] #[async_recursion] async fn serve_fs( req: &IpfsRequest, @@ -783,9 +758,9 @@ async fn serve_fs( } let name = add_content_disposition_headers( &mut headers, - &req.query_file_name, + req.query_file_name(), &req.resolved_path, - req.download, + req.query_download(), ); if metadata.unixfs_type == Some(UnixfsType::Symlink) { headers.insert( @@ -831,9 +806,9 @@ async fn serve_fs( } let name = add_content_disposition_headers( &mut headers, - &req.query_file_name, + req.query_file_name(), &req.resolved_path, - req.download, + req.query_params.download.unwrap_or_default(), ); let content_sniffed_mime = body.get_mime(); add_content_type_headers(&mut headers, &name, content_sniffed_mime); @@ -842,7 +817,7 @@ async fn serve_fs( } } -#[tracing::instrument()] +#[tracing::instrument(skip_all)] async fn serve_fs_dir( dir_list: &[Link], req: &IpfsRequest, @@ -861,14 +836,22 @@ async fn serve_fs_dir( if !force_dir && has_index { if !req.resolved_path.has_trailing_slash() { let redirect_path = format!( - "{}/{}", + "{}{}", req.request_path_for_redirection(), req.query_params.to_query_string() ); return Ok(GatewayResponse::redirect_permanently(&redirect_path)); } - let mut new_req = req.clone(); - new_req.resolved_path.push("index.html"); + let modified_path = req.resolved_path.with_suffix("index.html"); + let new_req = state + .client + .build_ipfs_request( + &modified_path, + &req.query_params, + req.format.clone(), + req.subdomain_mode, + ) + .await?; return serve_fs(&new_req, state, headers, http_req, start_time).await; } diff --git a/iroh-gateway/src/ipfs_request.rs b/iroh-gateway/src/ipfs_request.rs new file mode 100644 index 000000000..f158fe3a4 --- /dev/null +++ b/iroh-gateway/src/ipfs_request.rs @@ -0,0 +1,31 @@ +use crate::handler_params::GetParams; +use crate::response::ResponseFormat; +use iroh_resolver::resolver::{CidOrDomain, Out}; + +#[derive(Debug)] +pub struct IpfsRequest { + pub format: ResponseFormat, + pub cid: CidOrDomain, + pub resolved_path: iroh_resolver::resolver::Path, + pub query_params: GetParams, + pub subdomain_mode: bool, + pub path_metadata: Out, +} + +impl IpfsRequest { + pub fn request_path_for_redirection(&self) -> String { + if self.subdomain_mode { + self.resolved_path.to_relative_string() + } else { + self.resolved_path.to_string() + } + } + + pub fn query_file_name(&self) -> &str { + self.query_params.filename.as_deref().unwrap_or_default() + } + + pub fn query_download(&self) -> bool { + self.query_params.download.unwrap_or_default() + } +} diff --git a/iroh-gateway/src/lib.rs b/iroh-gateway/src/lib.rs index 76d53efed..03f7816bd 100644 --- a/iroh-gateway/src/lib.rs +++ b/iroh-gateway/src/lib.rs @@ -9,6 +9,7 @@ mod error; pub mod handler_params; pub mod handlers; pub mod headers; +mod ipfs_request; pub mod metrics; pub mod response; mod rpc; diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs index 7b9b128d0..df5a9e614 100644 --- a/iroh-p2p/src/node.rs +++ b/iroh-p2p/src/node.rs @@ -380,7 +380,7 @@ impl Node { let worker = tokio::task::spawn(async move { tokio::select! { _ = closer_r => { - // Explicit sesssion stop. + // Explicit session stop. debug!("session {}: stopped: closed", ctx); } _ = chan.closed() => { diff --git a/iroh-resolver/src/resolver.rs b/iroh-resolver/src/resolver.rs index 98aa59719..e7b4b3e0d 100644 --- a/iroh-resolver/src/resolver.rs +++ b/iroh-resolver/src/resolver.rs @@ -125,6 +125,12 @@ impl Path { self.tail.push(str.as_ref().to_owned()); } + pub fn with_suffix(&self, suffix: impl AsRef) -> Self { + let mut suffixed = self.clone(); + suffixed.push(suffix); + suffixed + } + // Empty path segments in the *middle* shouldn't occur, // though they can occur at the end, which `join` handles. // TODO(faassen): it would make sense to return a `RelativePathBuf` here at some diff --git a/iroh-resolver/tests/roundtrip.rs b/iroh-resolver/tests/roundtrip.rs index b509bdd16..170782efd 100644 --- a/iroh-resolver/tests/roundtrip.rs +++ b/iroh-resolver/tests/roundtrip.rs @@ -16,6 +16,7 @@ use std::collections::BTreeMap; use tokio::io::AsyncReadExt; use iroh_resolver::resolver::{read_to_vec, stream_to_resolver, Out, Resolver}; +use iroh_unixfs::balanced_tree::DEFAULT_CODE; #[derive(Debug, Clone, PartialEq, Eq)] enum TestDirEntry { @@ -37,8 +38,7 @@ async fn build_directory(name: &str, dir: &TestDir, hamt: bool) -> Result { @@ -47,7 +47,7 @@ async fn build_directory(name: &str, dir: &TestDir, hamt: bool) -> Result Result { let directory = build_directory("", &dir, hamt).await?; - let stream = directory.encode(); + let stream = directory.encode(DEFAULT_CODE); let (root, resolver) = stream_to_resolver(stream).await?; let stream = resolver.resolve_recursive_with_paths(iroh_resolver::resolver::Path::from_cid(root)); @@ -138,8 +138,7 @@ async fn file_roundtrip_test( .fixed_chunker(chunk_size) .degree(degree) .content_bytes(data.clone()) - .build() - .await?; + .build()?; let stream = file.encode().await?; let (root, resolver) = stream_to_resolver(stream).await?; let out = resolver @@ -156,7 +155,7 @@ async fn symlink_roundtrip_test() -> Result<()> { let target = "../../bar.txt"; builder.target(target); let sym = builder.build().await?; - let block = sym.encode()?; + let block = sym.encode(DEFAULT_CODE)?; let stream = async_stream::try_stream! { yield block; }; diff --git a/iroh-resolver/tests/unixfs.rs b/iroh-resolver/tests/unixfs.rs index 9cd62505a..abd6015ae 100644 --- a/iroh-resolver/tests/unixfs.rs +++ b/iroh-resolver/tests/unixfs.rs @@ -74,8 +74,7 @@ async fn test_dagger_testdata() -> Result<()> { .chunker(param.chunker.clone()) .degree(param.degree) .content_bytes(data.clone()) - .build() - .await?; + .build()?; let stream = file.encode().await?; let (root, resolver) = stream_to_resolver(stream).await?; let out = resolver.resolve(Path::from_cid(root)).await?; diff --git a/iroh-share/src/lib.rs b/iroh-share/src/lib.rs index 720134553..756199168 100644 --- a/iroh-share/src/lib.rs +++ b/iroh-share/src/lib.rs @@ -139,8 +139,7 @@ mod tests { let file_1 = FileBuilder::new() .name("bar.txt") .content_bytes(&b"bar"[..]) - .build() - .await?; + .build()?; let mut bytes = vec![0u8; 5 * 1024 * 1024 - 8]; rand::thread_rng().fill_bytes(&mut bytes); @@ -149,8 +148,7 @@ mod tests { let file_2 = FileBuilder::new() .name("baz.txt") .content_reader(f) - .build() - .await?; + .build()?; let dir_builder = DirectoryBuilder::new() .name("foo") .add_file(file_1) diff --git a/iroh-share/src/sender.rs b/iroh-share/src/sender.rs index 5b6174961..019ca3167 100644 --- a/iroh-share/src/sender.rs +++ b/iroh-share/src/sender.rs @@ -5,6 +5,7 @@ use bytes::Bytes; use futures::channel::oneshot::{channel as oneshot, Receiver as OneShotReceiver}; use futures::StreamExt; use iroh_p2p::{GossipsubEvent, NetworkEvent}; +use iroh_unixfs::balanced_tree::DEFAULT_CODE; use iroh_unixfs::builder::{DirectoryBuilder, FileBuilder}; use libp2p::gossipsub::Sha256Topic; use rand::Rng; @@ -58,14 +59,14 @@ impl Sender { } = self; let t = Sha256Topic::new(format!("iroh-share-{id}")); - let root_dir = dir_builder.build().await?; + let root_dir = dir_builder.build()?; let (done_sender, done_receiver) = oneshot(); let p2p_rpc = p2p.rpc().try_p2p()?; let store = p2p.rpc().try_store()?; let (root, num_parts) = { - let parts = root_dir.encode(); + let parts = root_dir.encode(DEFAULT_CODE); tokio::pin!(parts); let mut num_parts = 0; let mut root_cid = None; @@ -157,11 +158,7 @@ impl Sender { ) -> Result { let name = name.into(); // wrap in directory to preserve the name - let file = FileBuilder::new() - .name(name) - .content_bytes(data) - .build() - .await?; + let file = FileBuilder::new().name(name).content_bytes(data).build()?; let root_dir = DirectoryBuilder::new().add_file(file); self.transfer_from_dir_builder(root_dir).await diff --git a/iroh-unixfs/src/balanced_tree.rs b/iroh-unixfs/src/balanced_tree.rs index 9b11584ad..e62efd6b3 100644 --- a/iroh-unixfs/src/balanced_tree.rs +++ b/iroh-unixfs/src/balanced_tree.rs @@ -13,22 +13,26 @@ use crate::unixfs::{dag_pb, unixfs_pb, DataType, Node, UnixfsNode}; /// Default degree number for balanced tree, taken from unixfs specs /// pub const DEFAULT_DEGREE: usize = 174; +pub const DEFAULT_CODE: multihash::Code = multihash::Code::Sha2_256; #[derive(Debug, PartialEq, Eq)] pub enum TreeBuilder { /// TreeBuilder that builds a "balanced tree" with a max degree size of /// degree - Balanced { degree: usize }, + Balanced { + degree: usize, + code: multihash::Code, + }, } impl TreeBuilder { pub fn balanced_tree() -> Self { - Self::balanced_tree_with_degree(DEFAULT_DEGREE) + Self::balanced_tree_with_degree_and_code(DEFAULT_DEGREE, DEFAULT_CODE) } - pub fn balanced_tree_with_degree(degree: usize) -> Self { + pub fn balanced_tree_with_degree_and_code(degree: usize, code: multihash::Code) -> Self { assert!(degree > 1); - TreeBuilder::Balanced { degree } + TreeBuilder::Balanced { degree, code } } pub fn stream_tree( @@ -36,20 +40,30 @@ impl TreeBuilder { chunks: impl Stream> + Send, ) -> impl Stream> { match self { - TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, *degree), + TreeBuilder::Balanced { degree, code } => stream_balanced_tree(chunks, *degree, *code), } } } #[derive(Clone, Debug, PartialEq)] -struct LinkInfo { - raw_data_len: u64, - encoded_len: u64, +pub struct LinkInfo { + pub raw_data_len: u64, + pub encoded_len: u64, +} + +impl LinkInfo { + pub fn new(raw_data_len: u64, encoded_len: u64) -> LinkInfo { + LinkInfo { + raw_data_len, + encoded_len, + } + } } fn stream_balanced_tree( in_stream: impl Stream> + Send, degree: usize, + code: multihash::Code, ) -> impl Stream> { try_stream! { // degree = 8 @@ -80,8 +94,9 @@ fn stream_balanced_tree( let hash_par: usize = 8; let in_stream = in_stream.err_into::().map(|chunk| { - tokio::task::spawn_blocking(|| { - chunk.and_then(|chunk| TreeNode::Leaf(chunk).encode()) + let code = code.clone(); + tokio::task::spawn_blocking(move || { + chunk.and_then(|chunk| TreeNode::Leaf(chunk).encode(code)) }).err_into::() }).buffered(hash_par).map(|x| x.and_then(|x| x)); @@ -94,7 +109,7 @@ fn stream_balanced_tree( // check if the leaf node of the tree is full if tree[0].len() == degree { // if so, iterate through nodes - for i in 0..tree_len { + for i in 1..tree_len { // if we encounter any nodes that are not full, break if tree[i].len() < degree { break; @@ -108,20 +123,20 @@ fn stream_balanced_tree( // create node, keeping the cid let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree)); - let (block, link_info) = TreeNode::Stem(links).encode()?; + let (block, link_info) = TreeNode::Stem(links).encode(code)?; let cid = *block.cid(); yield block; // add link_info to parent node tree[i+1].push((cid, link_info)); } - // at this point the tree will be able to recieve new links + // at this point the tree will be able to receive new links // without "overflowing", aka the leaf node and stem nodes // have fewer than `degree` number of links } // now that we know the tree is in a "healthy" state to - // recieve more links, add the link to the tree + // receive more links, add the link to the tree tree[0].push((*block.cid(), link_info)); yield block; // at this point, the leaf node may have `degree` number of @@ -134,10 +149,10 @@ fn stream_balanced_tree( } // clean up, aka yield the rest of the stem nodes - // since all the stem nodes are able to recieve links + // since all the stem nodes are able to receive links // we don't have to worry about "overflow" while let Some(links) = tree.pop_front() { - let (block, link_info) = TreeNode::Stem(links).encode()?; + let (block, link_info) = TreeNode::Stem(links).encode(code)?; let cid = *block.cid(); yield block; @@ -191,18 +206,19 @@ fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result), } impl TreeNode { - fn encode(self) -> Result<(Block, LinkInfo)> { + pub fn encode(self, code: multihash::Code) -> Result<(Block, LinkInfo)> { match self { TreeNode::Leaf(bytes) => { let len = bytes.len(); let node = UnixfsNode::Raw(bytes); - let block = node.encode()?; + let block = node.encode(code)?; let link_info = LinkInfo { // in a leaf the raw data len and encoded len are the same since our leaf // nodes are raw unixfs nodes @@ -214,7 +230,7 @@ impl TreeNode { TreeNode::Stem(links) => { let mut encoded_len: u64 = links.iter().map(|(_, l)| l.encoded_len).sum(); let node = create_unixfs_node_from_links(links)?; - let block = node.encode()?; + let block = node.encode(code)?; encoded_len += block.data().len() as u64; let raw_data_len = node .filesize() @@ -253,7 +269,7 @@ mod tests { if num_chunks / degree == 0 { let chunk = chunks.next().await.unwrap().unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, _) = leaf.encode().unwrap(); + let (block, _) = leaf.encode(multihash::Code::Sha2_256).unwrap(); tree[0].push(block); return tree; } @@ -261,7 +277,7 @@ mod tests { while let Some(chunk) = chunks.next().await { let chunk = chunk.unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, link_info) = leaf.encode().unwrap(); + let (block, link_info) = leaf.encode(multihash::Code::Sha2_256).unwrap(); links[0].push((*block.cid(), link_info)); tree[0].push(block); } @@ -273,7 +289,7 @@ mod tests { let mut links_layer = Vec::with_capacity(count); for links in prev_layer.chunks(degree) { let stem = TreeNode::Stem(links.to_vec()); - let (block, link_info) = stem.encode().unwrap(); + let (block, link_info) = stem.encode(multihash::Code::Sha2_256).unwrap(); links_layer.push((*block.cid(), link_info)); tree_layer.push(block); } @@ -337,12 +353,14 @@ mod tests { fn make_leaf(data: usize) -> (Block, LinkInfo) { TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze()) - .encode() + .encode(multihash::Code::Sha2_256) .unwrap() } fn make_stem(links: Vec<(Cid, LinkInfo)>) -> (Block, LinkInfo) { - TreeNode::Stem(links).encode().unwrap() + TreeNode::Stem(links) + .encode(multihash::Code::Sha2_256) + .unwrap() } #[tokio::test] @@ -450,7 +468,7 @@ mod tests { async fn balanced_tree_test_leaf() { let num_chunks = 1; let expect = build_expect(num_chunks, 3).await; - let got = stream_balanced_tree(test_chunk_stream(1), 3); + let got = stream_balanced_tree(test_chunk_stream(1), 3, multihash::Code::Sha2_256); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -460,7 +478,11 @@ mod tests { let num_chunks = 3; let degrees = 3; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -470,7 +492,11 @@ mod tests { let degrees = 3; let num_chunks = 9; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -480,7 +506,11 @@ mod tests { let degrees = 3; let num_chunks = 10; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -490,7 +520,11 @@ mod tests { let num_chunks = 125; let degrees = 5; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -500,7 +534,11 @@ mod tests { let num_chunks = 780; let degrees = 11; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index 89b574d78..486da9295 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -15,6 +15,7 @@ use futures::{ use prost::Message; use tokio::io::AsyncRead; +use crate::balanced_tree::DEFAULT_CODE; use crate::{ balanced_tree::{TreeBuilder, DEFAULT_DEGREE}, chunker::{self, Chunker, ChunkerConfig, DEFAULT_CHUNK_SIZE_LIMIT}, @@ -92,9 +93,9 @@ impl Directory { Directory::single("".into(), Entry::Directory(self)) } - pub async fn encode_root(self) -> Result { + pub async fn encode_root(self, code: multihash::Code) -> Result { let mut current = None; - let parts = self.encode(); + let parts = self.encode(code); tokio::pin!(parts); while let Some(part) = parts.next().await { @@ -104,33 +105,35 @@ impl Directory { current.expect("must not be empty") } - pub fn encode<'a>(self) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { match self { - Directory::Basic(basic) => basic.encode(), - Directory::Hamt(hamt) => hamt.encode(), + Directory::Basic(basic) => basic.encode(code), + Directory::Hamt(hamt) => hamt.encode(code), } } } impl BasicDirectory { - pub fn encode<'a>(self) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { async_stream::try_stream! { let mut links = Vec::new(); for entry in self.entries { let name = entry.name().to_string(); - let parts = entry.encode().await?; + let parts = entry.encode(code).await?; tokio::pin!(parts); let mut root = None; + let mut size = 0u64; while let Some(part) = parts.next().await { let block = part?; root = Some(block.clone()); + size += block.data().len() as u64; yield block; } let root_block = root.expect("file must not be empty"); links.push(dag_pb::PbLink { hash: Some(root_block.cid().to_bytes()), name: Some(name), - tsize: Some(root_block.data().len() as u64), + tsize: Some(size), }); } @@ -141,15 +144,15 @@ impl BasicDirectory { }; let outer = encode_unixfs_pb(&inner, links)?; let node = UnixfsNode::Directory(Node { outer, inner }); - yield node.encode()?; + yield node.encode(code)?; } .boxed() } } impl HamtDirectory { - pub fn encode<'a>(self) -> BoxStream<'a, Result> { - self.hamt.encode() + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { + self.hamt.encode(code) } } @@ -260,7 +263,7 @@ impl Symlink { &self.name } - pub fn encode(self) -> Result { + pub fn encode(self, code: multihash::Code) -> Result { let target = self .target .to_str() @@ -273,7 +276,38 @@ impl Symlink { }; let outer = encode_unixfs_pb(&inner, Vec::new())?; let node = UnixfsNode::Symlink(Node { outer, inner }); - node.encode() + node.encode(code) + } +} + +/// Representation of a raw block +#[derive(Debug, PartialEq, Eq)] +pub struct RawBlock { + name: String, + block: Block, +} + +impl RawBlock { + pub fn new(name: &str, block: Block) -> Self { + RawBlock { + name: name.to_string(), + block, + } + } + pub fn name(&self) -> &str { + &self.name + } + + pub fn into_block(self) -> Block { + self.block + } + + pub fn wrap(self) -> Directory { + Directory::single("".into(), Entry::RawBlock(self)) + } + + pub fn encode(self) -> Result { + Ok(self.into_block()) } } @@ -284,6 +318,7 @@ pub struct FileBuilder { reader: Option>>, chunker: Chunker, degree: usize, + code: multihash::Code, } impl Default for FileBuilder { @@ -294,6 +329,7 @@ impl Default for FileBuilder { reader: None, chunker: Chunker::Fixed(chunker::Fixed::default()), degree: DEFAULT_DEGREE, + code: DEFAULT_CODE, } } } @@ -353,6 +389,11 @@ impl FileBuilder { self } + pub fn code(mut self, code: multihash::Code) -> Self { + self.code = code; + self + } + pub fn content_bytes>(mut self, content: B) -> Self { let bytes = content.into(); self.reader = Some(Box::pin(std::io::Cursor::new(bytes))); @@ -364,10 +405,10 @@ impl FileBuilder { self } - pub async fn build(self) -> Result { + pub fn build(self) -> Result { let degree = self.degree; let chunker = self.chunker; - let tree_builder = TreeBuilder::balanced_tree_with_degree(degree); + let tree_builder = TreeBuilder::balanced_tree_with_degree_and_code(degree, self.code); if let Some(path) = self.path { let name = match self.name { Some(n) => n, @@ -408,6 +449,7 @@ pub enum Entry { File(File), Directory(Directory), Symlink(Symlink), + RawBlock(RawBlock), } impl Entry { @@ -416,14 +458,16 @@ impl Entry { Entry::File(f) => f.name(), Entry::Directory(d) => d.name(), Entry::Symlink(s) => s.name(), + Entry::RawBlock(r) => r.name(), } } - pub async fn encode(self) -> Result>> { + pub async fn encode(self, code: multihash::Code) -> Result>> { Ok(match self { Entry::File(f) => f.encode().await?.boxed(), - Entry::Directory(d) => d.encode(), - Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed(), + Entry::Directory(d) => d.encode(code), + Entry::Symlink(s) => stream::iter(Some(s.encode(code))).boxed(), + Entry::RawBlock(r) => stream::iter(Some(r.encode())).boxed(), }) } @@ -433,9 +477,9 @@ impl Entry { let chunker = chunker_config.into(); let dir = DirectoryBuilder::new() .chunker(chunker) - .path(path) - .build() - .await?; + .add_path(path) + .await? + .build()?; Entry::Directory(dir) } else { anyhow::bail!("expected a ChunkerConfig in the Config"); @@ -443,11 +487,7 @@ impl Entry { } else if path.is_file() { if let Some(chunker_config) = config.chunker { let chunker = chunker_config.into(); - let file = FileBuilder::new() - .chunker(chunker) - .path(path) - .build() - .await?; + let file = FileBuilder::new().chunker(chunker).path(path).build()?; Entry::File(file) } else { anyhow::bail!("expected a ChunkerConfig in the Config"); @@ -469,6 +509,7 @@ impl Entry { Entry::File(f) => f.wrap(), Entry::Directory(d) => d.wrap(), Entry::Symlink(s) => s.wrap(), + Entry::RawBlock(r) => r.wrap(), } } } @@ -481,7 +522,6 @@ pub struct DirectoryBuilder { typ: DirectoryType, chunker: Chunker, degree: usize, - path: Option, } impl Default for DirectoryBuilder { @@ -492,7 +532,6 @@ impl Default for DirectoryBuilder { typ: DirectoryType::Basic, chunker: Chunker::Fixed(chunker::Fixed::default()), degree: DEFAULT_DEGREE, - path: None, } } } @@ -512,11 +551,6 @@ impl DirectoryBuilder { self } - pub fn path(mut self, path: &Path) -> Self { - self.path = Some(path.into()); - self - } - pub fn chunker(mut self, chunker: Chunker) -> Self { self.chunker = chunker; self @@ -528,18 +562,22 @@ impl DirectoryBuilder { } pub fn add_dir(self, dir: Directory) -> Result { - Ok(self.entry(Entry::Directory(dir))) + Ok(self.add_entry(Entry::Directory(dir))) } pub fn add_file(self, file: File) -> Self { - self.entry(Entry::File(file)) + self.add_entry(Entry::File(file)) + } + + pub fn add_raw_block(self, raw_block: RawBlock) -> Self { + self.add_entry(Entry::RawBlock(raw_block)) } pub fn add_symlink(self, symlink: Symlink) -> Self { - self.entry(Entry::Symlink(symlink)) + self.add_entry(Entry::Symlink(symlink)) } - fn entry(mut self, entry: Entry) -> Self { + pub fn add_entry(mut self, entry: Entry) -> Self { if self.typ == DirectoryType::Basic && self.entries.len() >= DIRECTORY_LINK_LIMIT { self.typ = DirectoryType::Hamt } @@ -547,34 +585,38 @@ impl DirectoryBuilder { self } - pub async fn build(self) -> Result { + pub fn add_entries(mut self, entries: impl Iterator) -> Self { + for entry in entries { + self = self.add_entry(entry); + } + self + } + + pub async fn add_path(self, path: impl Into) -> Result { + let chunker = self.chunker.clone(); + let degree = self.degree; + Ok(self.add_entries( + make_entries_from_path(path, chunker, degree) + .await? + .into_iter(), + )) + } + + pub fn build(self) -> Result { let DirectoryBuilder { - name, - entries, - typ, - path, - chunker, - degree, + name, entries, typ, .. } = self; - Ok(if let Some(path) = path { - let mut dir = make_dir_from_path(path, chunker.clone(), degree).await?; - if let Some(name) = name { - dir.set_name(name); - } - dir - } else { - let name = name.unwrap_or_default(); - match typ { - DirectoryType::Basic => Directory::Basic(BasicDirectory { name, entries }), - DirectoryType::Hamt => { - let hamt = HamtNode::new(entries) - .context("unable to build hamt. Probably a hash collision.")?; - Directory::Hamt(HamtDirectory { - name, - hamt: Box::new(hamt), - }) - } + let name = name.unwrap_or_default(); + Ok(match typ { + DirectoryType::Basic => Directory::Basic(BasicDirectory { name, entries }), + DirectoryType::Hamt => { + let hamt = HamtNode::new(entries) + .context("unable to build hamt. Probably a hash collision.")?; + Directory::Hamt(HamtDirectory { + name, + hamt: Box::new(hamt), + }) } }) } @@ -637,7 +679,7 @@ impl HamtNode { } } - pub fn encode<'a>(self) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { match self { Self::Branch(tree) => { async_stream::try_stream! { @@ -646,7 +688,7 @@ impl HamtNode { for (prefix, node) in tree { let name = format!("{:02X}{}", prefix, node.name()); bitfield.set_bit(prefix); - let blocks = node.encode(); + let blocks = node.encode(code); let mut root = None; tokio::pin!(blocks); while let Some(block) = blocks.next().await { @@ -671,11 +713,11 @@ impl HamtNode { // it does not really matter what enum variant we choose here as long as // it is not raw. The type of the node will be HamtShard from above. let node = UnixfsNode::Directory(crate::unixfs::Node { outer, inner }); - yield node.encode()?; + yield node.encode(code)?; } .boxed() } - Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode().await } + Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode(code).await } .try_flatten_stream() .boxed(), } @@ -717,7 +759,7 @@ impl SymlinkBuilder { } } -pub(crate) fn encode_unixfs_pb( +pub fn encode_unixfs_pb( inner: &unixfs_pb::Data, links: Vec, ) -> Result { @@ -743,40 +785,45 @@ pub struct Config { } #[async_recursion(?Send)] -async fn make_dir_from_path>( +async fn make_entries_from_path>( path: P, chunker: Chunker, degree: usize, -) -> Result { - let path = path.into(); - let mut dir = DirectoryBuilder::new().name( - path.file_name() - .and_then(|s| s.to_str()) - .unwrap_or_default(), - ); - - let mut directory_reader = tokio::fs::read_dir(path.clone()).await?; +) -> Result> { + let mut directory_reader = tokio::fs::read_dir(path.into()).await?; + let mut entries = vec![]; while let Some(entry) = directory_reader.next_entry().await? { let path = entry.path(); if path.is_symlink() { - let s = SymlinkBuilder::new(path).build().await?; - dir = dir.add_symlink(s); + entries.push(Entry::Symlink(SymlinkBuilder::new(path).build().await?)) } else if path.is_file() { - let f = FileBuilder::new() - .chunker(chunker.clone()) - .degree(degree) - .path(path) - .build() - .await?; - dir = dir.add_file(f); + entries.push(Entry::File( + FileBuilder::new() + .chunker(chunker.clone()) + .degree(degree) + .path(path) + .build()?, + )); } else if path.is_dir() { - let d = make_dir_from_path(path, chunker.clone(), degree).await?; - dir = dir.add_dir(d)?; + entries.push(Entry::Directory( + DirectoryBuilder::new() + .name( + path.file_name() + .and_then(|s| s.to_str()) + .unwrap_or_default(), + ) + .add_entries( + make_entries_from_path(path, chunker.clone(), degree) + .await? + .into_iter(), + ) + .build()?, + )) } else { - anyhow::bail!("directory entry is neither file nor directory") + anyhow::bail!("directory entry is neither file nor directory nor symlink") } } - dir.build().await + Ok(entries) } #[cfg(test)] @@ -795,14 +842,12 @@ mod tests { let bar = FileBuilder::new() .name("bar.txt") .content_bytes(b"bar".to_vec()) - .build() - .await?; + .build()?; let bar_encoded: Vec<_> = { let bar = FileBuilder::new() .name("bar.txt") .content_bytes(b"bar".to_vec()) - .build() - .await?; + .build()?; bar.encode().await?.try_collect().await? }; assert_eq!(bar_encoded.len(), 1); @@ -815,12 +860,12 @@ mod tests { let mut baz = SymlinkBuilder::new("baz.txt"); baz.target("bat.txt"); let baz = baz.build().await?; - baz.encode()? + baz.encode(multihash::Code::Sha2_256)? }; - let dir = dir.add_file(bar).add_symlink(baz).build().await?; + let dir = dir.add_file(bar).add_symlink(baz).build()?; - let dir_block = dir.encode_root().await?; + let dir_block = dir.encode_root(multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); @@ -835,7 +880,7 @@ mod tests { #[tokio::test] async fn test_recursive_dir_builder() -> Result<()> { - let dir = DirectoryBuilder::new().build().await?; + let dir = DirectoryBuilder::new().build()?; DirectoryBuilder::new() .add_dir(dir) @@ -853,15 +898,13 @@ mod tests { let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; let bar_encoded: Vec<_> = { let bar_reader = std::io::Cursor::new(b"bar"); let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; bar.encode().await?.try_collect().await? }; assert_eq!(bar_encoded.len(), 1); @@ -874,12 +917,12 @@ mod tests { let mut baz = SymlinkBuilder::new("baz.txt"); baz.target("bat.txt"); let baz = baz.build().await?; - baz.encode()? + baz.encode(multihash::Code::Sha2_256)? }; - let dir = dir.add_file(bar).add_symlink(baz).build().await?; + let dir = dir.add_file(bar).add_symlink(baz).build()?; - let dir_block = dir.encode_root().await?; + let dir_block = dir.encode_root(multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); @@ -916,15 +959,13 @@ mod tests { let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; let bar_encoded: Vec<_> = { let bar_reader = std::io::Cursor::new(vec![1u8; 1024 * 1024]); let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; bar.encode().await?.try_collect().await? }; assert_eq!(bar_encoded.len(), 5); @@ -941,22 +982,20 @@ mod tests { let baz = FileBuilder::new() .name("baz.txt") .content_reader(baz_reader) - .build() - .await?; + .build()?; let baz_encoded: Vec<_> = { let baz_reader = std::io::Cursor::new(baz_content); let baz = FileBuilder::new() .name("baz.txt") .content_reader(baz_reader) - .build() - .await?; + .build()?; baz.encode().await?.try_collect().await? }; assert_eq!(baz_encoded.len(), 9); - let dir = dir.add_file(bar).add_file(baz).build().await?; + let dir = dir.add_file(bar).add_file(baz).build()?; - let dir_block = dir.encode_root().await?; + let dir_block = dir.encode_root(multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); @@ -995,8 +1034,7 @@ mod tests { let file = FileBuilder::new() .name("foo.txt") .content_bytes(Bytes::from("hello world")) - .build() - .await?; + .build()?; builder = builder.add_file(file); } @@ -1006,8 +1044,7 @@ mod tests { let file = FileBuilder::new() .name("foo.txt") .content_bytes(Bytes::from("hello world")) - .build() - .await?; + .build()?; builder = builder.add_file(file); // at directory link limit should be processed as a hamt @@ -1023,11 +1060,10 @@ mod tests { let file = FileBuilder::new() .name("foo.txt") .content_bytes(Bytes::from("hello world")) - .build() - .await?; + .build()?; builder = builder.add_file(file); } - assert!(builder.build().await.is_err()); + assert!(builder.build().is_err()); Ok(()) } @@ -1058,7 +1094,7 @@ mod tests { file.write_all(b"hello world").unwrap(); // create directory manually - let nested_file = FileBuilder::new().path(nested_file_path).build().await?; + let nested_file = FileBuilder::new().path(nested_file_path).build()?; let nested_dir = Directory::single( String::from( nested_dir_path @@ -1069,19 +1105,24 @@ mod tests { Entry::File(nested_file), ); - let file = FileBuilder::new().path(file_path).build().await?; + let file = FileBuilder::new().path(file_path).build()?; let expected = Directory::basic( String::from(dir.clone().file_name().and_then(|s| s.to_str()).unwrap()), vec![Entry::File(file), Entry::Directory(nested_dir)], ); - let got = make_dir_from_path( - dir, - Chunker::Fixed(chunker::Fixed::default()), - DEFAULT_DEGREE, - ) - .await?; + let got = DirectoryBuilder::new() + .add_entries( + make_entries_from_path( + dir, + Chunker::Fixed(chunker::Fixed::default()), + DEFAULT_DEGREE, + ) + .await? + .into_iter(), + ) + .build()?; let basic_entries = |dir: Directory| match dir { Directory::Basic(basic) => basic.entries, diff --git a/iroh-unixfs/src/content_loader.rs b/iroh-unixfs/src/content_loader.rs index 0399253c7..35a615353 100644 --- a/iroh-unixfs/src/content_loader.rs +++ b/iroh-unixfs/src/content_loader.rs @@ -226,10 +226,9 @@ impl FullLoader { #[async_trait] impl ContentLoader for FullLoader { async fn stop_session(&self, ctx: ContextId) -> Result<()> { - self.client - .try_p2p()? - .stop_session_bitswap(ctx.into()) - .await?; + if let Ok(p2p) = self.client.try_p2p() { + p2p.stop_session_bitswap(ctx.into()).await?; + } Ok(()) } diff --git a/iroh-unixfs/src/unixfs.rs b/iroh-unixfs/src/unixfs.rs index 4fe8553e0..56db1226b 100644 --- a/iroh-unixfs/src/unixfs.rs +++ b/iroh-unixfs/src/unixfs.rs @@ -21,12 +21,12 @@ use crate::{ types::{Block, Link, LinkRef, Links, PbLinks}, }; -pub(crate) mod unixfs_pb { +pub mod unixfs_pb { #![allow(clippy::all)] include!(concat!(env!("OUT_DIR"), "/unixfs_pb.rs")); } -pub(crate) mod dag_pb { +pub mod dag_pb { #![allow(clippy::all)] include!(concat!(env!("OUT_DIR"), "/merkledag_pb.rs")); } @@ -179,12 +179,12 @@ impl UnixfsNode { } } - pub fn encode(&self) -> Result { + pub fn encode(&self, code: multihash::Code) -> Result { let res = match self { UnixfsNode::Raw(data) => { let out = data.clone(); let links = vec![]; - let cid = Cid::new_v1(Codec::Raw as _, cid::multihash::Code::Sha2_256.digest(&out)); + let cid = Cid::new_v1(Codec::Raw as _, code.digest(&out)); Block::new(cid, out, links) } UnixfsNode::RawNode(node) @@ -197,10 +197,7 @@ impl UnixfsNode { .links() .map(|x| Ok(x?.cid)) .collect::>>()?; - let cid = Cid::new_v1( - Codec::DagPb as _, - cid::multihash::Code::Sha2_256.digest(&out), - ); + let cid = Cid::new_v1(Codec::DagPb as _, code.digest(&out)); Block::new(cid, out, links) } };