Skip to content

Commit 3f1e786

Browse files
committed
WIP
1 parent 2bda04d commit 3f1e786

File tree

4 files changed

+72
-23
lines changed

4 files changed

+72
-23
lines changed

src/api/remote.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ impl Remote {
316316
&self,
317317
mut conn: impl GetConnection,
318318
content: impl Into<HashAndFormat>,
319-
progress: impl Sink<u64, Error = anyhow::Error> + Unpin,
319+
progress: impl Sink<u64, Error = io::Error> + Unpin,
320320
) -> anyhow::Result<Stats> {
321321
let content = content.into();
322322
let local = self.local(content).await?;
@@ -435,7 +435,7 @@ impl Remote {
435435
conn: Connection,
436436
request: GetRequest,
437437
counters: RequestCounters,
438-
mut progress: impl Sink<u64, Error = anyhow::Error> + Unpin,
438+
mut progress: impl Sink<u64, Error = io::Error> + Unpin,
439439
) -> GetResult<Stats> {
440440
let store = self.store();
441441
let root = request.hash;
@@ -505,7 +505,7 @@ impl Remote {
505505
&self,
506506
conn: Connection,
507507
request: GetManyRequest,
508-
mut progress: impl Sink<u64, Error = anyhow::Error> + Unpin,
508+
mut progress: impl Sink<u64, Error = io::Error> + Unpin,
509509
) -> GetResult<Stats> {
510510
let store = self.store();
511511
let hash_seq = request.hashes.iter().copied().collect::<HashSeq>();
@@ -654,7 +654,7 @@ async fn get_blob_ranges_impl(
654654
header: AtBlobHeader,
655655
hash: Hash,
656656
store: &Store,
657-
mut progress: impl Sink<u64, Error = anyhow::Error> + Unpin,
657+
mut progress: impl Sink<u64, Error = io::Error> + Unpin,
658658
) -> GetResult<AtEndBlob> {
659659
let (mut content, size) = header.next().await?;
660660
let Some(size) = NonZeroU64::new(size) else {

src/api/swarm.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,13 @@ async fn handle_download_split_impl(
157157
.map(|x| into_stream(x))
158158
.flatten();
159159
let mut offsets = HashMap::new();
160+
let mut total = 0;
160161
while let Some((id, offset)) = stream.next().await {
161-
offsets.insert(id, offset);
162-
println!("Got progress: {:#?}", offsets);
162+
total += offset;
163+
if let Some(prev) = offsets.insert(id, offset) {
164+
total -= prev;
165+
}
166+
println!("Progress: {total}");
163167
}
164168
};
165169
tokio::pin!(forward_progress);
@@ -473,7 +477,7 @@ async fn execute_get(
473477
request: GetRequest,
474478
providers: &Arc<dyn ContentDiscovery>,
475479
store: &Store,
476-
mut progress: impl Sink<u64, Error = anyhow::Error> + Unpin,
480+
mut progress: impl Sink<u64, Error = io::Error> + Unpin,
477481
) -> anyhow::Result<Stats> {
478482
let mut last_error = None;
479483
let remote = store.remote();

src/store/fs/import.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,10 @@ async fn get_import_source(
342342
#[repr(transparent)]
343343
struct OutboardProgress(spsc::Sender<AddProgressItem>);
344344

345-
impl Progress for OutboardProgress {
345+
impl Progress<ChunkNum> for OutboardProgress {
346346
type Error = irpc::channel::SendError;
347347

348-
async fn progress(&mut self, offset: ChunkNum) -> std::result::Result<(), Self::Error> {
348+
async fn send(&mut self, offset: ChunkNum) -> std::result::Result<(), Self::Error> {
349349
// if offset.0 % 1024 != 0 {
350350
// return Ok(());
351351
// }

src/util.rs

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ where
121121
pub mod outboard_with_progress {
122122
use std::{
123123
future::Future,
124-
io::{self, BufReader, Read},
124+
io::{self, BufReader, Read}, sync::mpsc,
125125
};
126126

127127
use bao_tree::{
@@ -135,40 +135,85 @@ pub mod outboard_with_progress {
135135
use blake3::guts::parent_cv;
136136
use smallvec::SmallVec;
137137

138-
pub trait Progress {
138+
pub trait Progress<Item> {
139139
type Error;
140-
fn progress(
140+
fn send(
141141
&mut self,
142-
offset: ChunkNum,
142+
value: Item,
143143
) -> impl Future<Output = std::result::Result<(), Self::Error>>;
144+
145+
fn with_map<F, U>(
146+
self,
147+
f: F,
148+
) -> WithMap<Self, F>
149+
where
150+
Self: Sized,
151+
F: Fn(Item) -> U + Send + 'static,
152+
{
153+
WithMap {
154+
inner: self,
155+
f,
156+
}
157+
}
158+
}
159+
160+
impl<T> Progress<T> for tokio::sync::mpsc::Sender<T> {
161+
type Error = tokio::sync::mpsc::error::SendError<T>;
162+
163+
async fn send(
164+
&mut self,
165+
value: T,
166+
) -> std::result::Result<(), Self::Error> {
167+
tokio::sync::mpsc::Sender::send(self, value).await
168+
}
169+
}
170+
171+
pub struct WithMap<P, F> {
172+
inner: P,
173+
f: F,
174+
}
175+
176+
impl<P, F, T, U> Progress<T> for WithMap<P, F>
177+
where
178+
P: Progress<U>,
179+
F: Fn(T) -> U + Send + 'static,
180+
{
181+
type Error = P::Error;
182+
183+
async fn send(
184+
&mut self,
185+
value: T,
186+
) -> std::result::Result<(), Self::Error> {
187+
self.inner.send((self.f)(value)).await
188+
}
144189
}
145190

146191
pub struct NoProgress;
147192

148193
impl<Item> n0_future::Sink<Item> for NoProgress {
149-
type Error = anyhow::Error;
194+
type Error = io::Error;
150195

151196
fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
152-
std::task::Poll::Ready(anyhow::Result::Ok(()))
197+
std::task::Poll::Ready(io::Result::Ok(()))
153198
}
154199

155200
fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
156-
anyhow::Result::Ok(())
201+
io::Result::Ok(())
157202
}
158203

159204
fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
160-
std::task::Poll::Ready(anyhow::Result::Ok(()))
205+
std::task::Poll::Ready(io::Result::Ok(()))
161206
}
162207

163208
fn poll_close(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
164-
std::task::Poll::Ready(anyhow::Result::Ok(()))
209+
std::task::Poll::Ready(io::Result::Ok(()))
165210
}
166211
}
167212

168-
impl Progress for NoProgress {
213+
impl<T> Progress<T> for NoProgress {
169214
type Error = io::Error;
170215

171-
async fn progress(&mut self, _offset: ChunkNum) -> std::result::Result<(), Self::Error> {
216+
async fn send(&mut self, _offset: T) -> std::result::Result<(), Self::Error> {
172217
io::Result::Ok(())
173218
}
174219
}
@@ -181,7 +226,7 @@ pub mod outboard_with_progress {
181226
where
182227
W: WriteAt,
183228
R: Read,
184-
P: Progress,
229+
P: Progress<ChunkNum>,
185230
{
186231
// wrap the reader in a buffered reader, so we read in large chunks
187232
// this reduces the number of io ops
@@ -203,7 +248,7 @@ pub mod outboard_with_progress {
203248
) -> io::Result<std::result::Result<(), P::Error>>
204249
where
205250
W: WriteAt,
206-
P: Progress,
251+
P: Progress<ChunkNum>,
207252
{
208253
// do not allocate for small trees
209254
let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
@@ -223,7 +268,7 @@ pub mod outboard_with_progress {
223268
start_chunk,
224269
..
225270
} => {
226-
if let Err(err) = progress.progress(start_chunk).await {
271+
if let Err(err) = progress.send(start_chunk).await {
227272
return Ok(Err(err));
228273
}
229274
let buf = &mut buffer[..size];

0 commit comments

Comments
 (0)