Skip to content

Commit 1cf74f8

Browse files
committed
move store into store and add back the protocol and net_protocol
1 parent a335cab commit 1cf74f8

31 files changed

+5286
-323
lines changed

Cargo.lock

+3,056-247
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ quinn = { package = "iroh-quinn", version = "0.13.0" }
1313
n0-future = "0.1.2"
1414
range-collections = "0.4.5"
1515
redb = "2.4.0"
16-
smallvec = { version = "1", features = ["serde"] }
16+
smallvec = { version = "1", features = ["serde", "const_new"] }
1717
thiserror = "2.0.11"
1818
tokio = { version = "1.43.0", features = ["full"] }
1919
tokio-util = { version = "0.7.13", features = ["full"] }
@@ -28,6 +28,7 @@ chrono = "0.4.39"
2828
nested_enum_utils = "0.1.0"
2929
ref-cast = "1.0.24"
3030
arrayvec = "0.7.6"
31+
iroh = "0.33.0"
3132

3233
[dev-dependencies]
3334
hex = "0.4.3"

src/fs/entry_state.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
55
use smallvec::SmallVec;
66

77
use super::meta::{ActorError, ActorResult};
8-
use crate::{hash::DD, util::SliceInfoExt};
8+
use crate::util::SliceInfoExt;
99

1010
/// Location of the data.
1111
///

src/hash.rs

+2-14
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use bao_tree::blake3;
77
use postcard::experimental::max_size::MaxSize;
88
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
99

10+
use crate::store::util::DD;
11+
1012
/// Hash type used throughout.
1113
#[derive(PartialEq, Eq, Copy, Clone, Hash)]
1214
pub struct Hash(blake3::Hash);
@@ -17,20 +19,6 @@ impl fmt::Debug for Hash {
1719
}
1820
}
1921

20-
pub(crate) struct DD<T: fmt::Display>(T);
21-
22-
impl<T: fmt::Display> From<T> for DD<T> {
23-
fn from(value: T) -> Self {
24-
DD(value)
25-
}
26-
}
27-
28-
impl<T: fmt::Display> fmt::Debug for DD<T> {
29-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30-
fmt::Display::fmt(&self.0, f)
31-
}
32-
}
33-
3422
impl Hash {
3523
/// The hash for the empty byte range (`b""`).
3624
pub const EMPTY: Hash = Hash::from_bytes([

src/hashseq.rs

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
//! Traits related to collections of blobs.
2+
use std::{fmt::Debug, io};
3+
4+
use bytes::Bytes;
5+
use iroh_io::{AsyncSliceReader, AsyncSliceReaderExt};
6+
7+
use crate::Hash;
8+
9+
/// A sequence of links, backed by a [`Bytes`] object.
10+
#[derive(Debug, Clone, derive_more::Into)]
11+
pub struct HashSeq(Bytes);
12+
13+
impl FromIterator<Hash> for HashSeq {
14+
fn from_iter<T: IntoIterator<Item = Hash>>(iter: T) -> Self {
15+
let iter = iter.into_iter();
16+
let (lower, _upper) = iter.size_hint();
17+
let mut bytes = Vec::with_capacity(lower * 32);
18+
for hash in iter {
19+
bytes.extend_from_slice(hash.as_ref());
20+
}
21+
Self(bytes.into())
22+
}
23+
}
24+
25+
impl TryFrom<Bytes> for HashSeq {
26+
type Error = anyhow::Error;
27+
28+
fn try_from(bytes: Bytes) -> Result<Self, Self::Error> {
29+
Self::new(bytes).ok_or_else(|| anyhow::anyhow!("invalid hash sequence"))
30+
}
31+
}
32+
33+
impl IntoIterator for HashSeq {
34+
type Item = Hash;
35+
type IntoIter = HashSeqIter;
36+
37+
fn into_iter(self) -> Self::IntoIter {
38+
HashSeqIter(self)
39+
}
40+
}
41+
42+
/// Stream over the hashes in a [`HashSeq`].
43+
///
44+
/// todo: make this wrap a reader instead of a [`HashSeq`].
45+
#[derive(Debug, Clone)]
46+
pub struct HashSeqStream(HashSeq);
47+
48+
impl HashSeqStream {
49+
/// Get the next hash in the sequence.
50+
#[allow(clippy::should_implement_trait, clippy::unused_async)]
51+
pub async fn next(&mut self) -> io::Result<Option<Hash>> {
52+
Ok(self.0.pop_front())
53+
}
54+
55+
/// Skip a number of hashes in the sequence.
56+
#[allow(clippy::unused_async)]
57+
pub async fn skip(&mut self, n: u64) -> io::Result<()> {
58+
let ok = self.0.drop_front(n as usize);
59+
if !ok {
60+
Err(io::Error::new(
61+
io::ErrorKind::UnexpectedEof,
62+
"end of sequence",
63+
))
64+
} else {
65+
Ok(())
66+
}
67+
}
68+
}
69+
70+
impl HashSeq {
71+
/// Create a new sequence of hashes.
72+
pub fn new(bytes: Bytes) -> Option<Self> {
73+
if bytes.len() % 32 == 0 {
74+
Some(Self(bytes))
75+
} else {
76+
None
77+
}
78+
}
79+
80+
fn drop_front(&mut self, n: usize) -> bool {
81+
let start = n * 32;
82+
if start > self.0.len() {
83+
false
84+
} else {
85+
self.0 = self.0.slice(start..);
86+
true
87+
}
88+
}
89+
90+
/// Iterate over the hashes in this sequence.
91+
pub fn iter(&self) -> impl Iterator<Item = Hash> + '_ {
92+
self.0.chunks_exact(32).map(|chunk| {
93+
let hash: [u8; 32] = chunk.try_into().unwrap();
94+
hash.into()
95+
})
96+
}
97+
98+
/// Get the number of hashes in this sequence.
99+
pub fn len(&self) -> usize {
100+
self.0.len() / 32
101+
}
102+
103+
/// Check if this sequence is empty.
104+
pub fn is_empty(&self) -> bool {
105+
self.0.is_empty()
106+
}
107+
108+
/// Get the hash at the given index.
109+
pub fn get(&self, index: usize) -> Option<Hash> {
110+
if index < self.len() {
111+
let hash: [u8; 32] = self.0[index * 32..(index + 1) * 32].try_into().unwrap();
112+
Some(hash.into())
113+
} else {
114+
None
115+
}
116+
}
117+
118+
/// Get and remove the first hash in this sequence.
119+
pub fn pop_front(&mut self) -> Option<Hash> {
120+
if self.is_empty() {
121+
None
122+
} else {
123+
let hash = self.get(0).unwrap();
124+
self.0 = self.0.slice(32..);
125+
Some(hash)
126+
}
127+
}
128+
129+
/// Get the underlying bytes.
130+
pub fn into_inner(self) -> Bytes {
131+
self.0
132+
}
133+
}
134+
135+
/// Iterator over the hashes in a [`HashSeq`].
136+
#[derive(Debug, Clone)]
137+
pub struct HashSeqIter(HashSeq);
138+
139+
impl Iterator for HashSeqIter {
140+
type Item = Hash;
141+
142+
fn next(&mut self) -> Option<Self::Item> {
143+
self.0.pop_front()
144+
}
145+
}
146+
147+
/// Parse a sequence of hashes.
148+
pub async fn parse_hash_seq<'a, R: AsyncSliceReader + 'a>(
149+
mut reader: R,
150+
) -> anyhow::Result<(HashSeqStream, u64)> {
151+
let bytes = reader.read_to_end().await?;
152+
let hashes = HashSeq::try_from(bytes)?;
153+
let num_hashes = hashes.len() as u64;
154+
let stream = HashSeqStream(hashes);
155+
Ok((stream, num_hashes))
156+
}

src/lib.rs

+7-30
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,9 @@
1-
use bao_tree::BlockSize;
2-
mod api;
3-
mod bitfield;
4-
mod fs;
5-
mod hash;
6-
mod mem;
7-
mod proto;
8-
mod readonly_mem;
9-
mod test;
10-
mod util;
1+
pub mod store;
112

3+
mod hash;
124
pub use hash::{BlobFormat, Hash, HashAndFormat};
13-
use ref_cast::RefCast;
14-
15-
#[derive(Debug, Clone, ref_cast::RefCast)]
16-
#[repr(transparent)]
17-
pub struct Store {
18-
sender: tokio::sync::mpsc::Sender<proto::Command>,
19-
}
20-
21-
impl Store {
22-
pub fn from_sender(sender: tokio::sync::mpsc::Sender<proto::Command>) -> Self {
23-
Self { sender }
24-
}
25-
26-
pub fn ref_from_sender(sender: &tokio::sync::mpsc::Sender<proto::Command>) -> &Self {
27-
Store::ref_cast(sender)
28-
}
29-
}
30-
31-
/// Block size used by iroh, 2^4*1024 = 16KiB
32-
pub const IROH_BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4);
5+
pub use store::IROH_BLOCK_SIZE;
6+
pub mod hashseq;
7+
pub mod net_protocol;
8+
pub mod protocol;
9+
pub mod provider;

src/net_protocol.rs

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
//! Adaptation of `iroh-blobs` as an `iroh` protocol.
2+
3+
// TODO: reduce API surface and add documentation
4+
#![allow(missing_docs)]
5+
6+
use std::{
7+
collections::BTreeSet,
8+
fmt::Debug,
9+
ops::{Deref, DerefMut},
10+
sync::Arc,
11+
};
12+
13+
use anyhow::{bail, Result};
14+
use futures_lite::future::Boxed as BoxedFuture;
15+
use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
16+
use serde::{Deserialize, Serialize};
17+
use tracing::debug;
18+
19+
use crate::{store::Store, BlobFormat, Hash};
20+
21+
#[derive(Debug)]
22+
pub(crate) struct BlobsInner {
23+
pub(crate) store: Store,
24+
pub(crate) endpoint: Endpoint,
25+
}
26+
27+
#[derive(Debug, Clone)]
28+
pub struct Blobs {
29+
pub(crate) inner: Arc<BlobsInner>,
30+
}
31+
32+
impl Blobs {
33+
fn new(store: Store, endpoint: Endpoint) -> Self {
34+
Self {
35+
inner: Arc::new(BlobsInner { store, endpoint }),
36+
}
37+
}
38+
39+
pub fn store(&self) -> &Store {
40+
&self.inner.store
41+
}
42+
43+
pub fn endpoint(&self) -> &Endpoint {
44+
&self.inner.endpoint
45+
}
46+
}
47+
48+
impl ProtocolHandler for Blobs {
49+
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
50+
let store = self.store().clone();
51+
52+
Box::pin(async move {
53+
crate::provider::handle_connection(conn.await?, store).await;
54+
Ok(())
55+
})
56+
}
57+
58+
fn shutdown(&self) -> BoxedFuture<()> {
59+
let store = self.store().clone();
60+
Box::pin(async move {
61+
store.shutdown().await;
62+
})
63+
}
64+
}

src/proto.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use serde::{Deserialize, Serialize};
1414
use tokio::sync::{mpsc, oneshot};
1515

1616
use crate::{
17-
bitfield::Bitfield,
18-
hash::DD,
17+
bitfield::Bitfield,
1918
util::{observer::Observer, Tag},
2019
BlobFormat, HashAndFormat,
2120
};

0 commit comments

Comments
 (0)