Skip to content

Commit 96bc130

Browse files
committed
Start dealing with the case where a file is partial but really complete.
This can happen if the the store crashes right a the transition from partial to complete
1 parent 9d0fccd commit 96bc130

File tree

3 files changed

+21
-11
lines changed

3 files changed

+21
-11
lines changed

proptest-regressions/protocol/range_spec.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@
66
# everyone who runs the test benefits from these saved cases.
77
cc 5ff4de8531a81c637b4d202c97b724a41a989bc6894464e84db5ac2a519c08a9 # shrinks to ranges = [RangeSet{1..2}]
88
cc 50cb338763aa276705bb970d57d3d87e834f31a7e57bba810f46690c6d1e9955 # shrinks to ranges = [RangeSet{7..98}, RangeSet{7..98}]
9+
cc 8579821a8bde7872fed2cfa38e8a5923706b9915f3920e9c2d101a06bc789309 # shrinks to ranges = []

src/api/downloader.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ use tracing::instrument::Instrument;
2323

2424
use super::{Store, remote::GetConnection};
2525
use crate::{
26-
protocol::{GetManyRequest, GetRequest}, util::sink::{Drain, IrpcSenderRefSink, Sink, TokioMpscSenderSink}, BlobFormat, Hash, HashAndFormat
26+
BlobFormat, Hash, HashAndFormat,
27+
get::request,
28+
protocol::{GetManyRequest, GetRequest},
29+
util::sink::{Drain, IrpcSenderRefSink, Sink, TokioMpscSenderSink},
2730
};
2831

2932
#[derive(Debug, Clone)]
@@ -134,12 +137,16 @@ async fn handle_download_split_impl(
134137
request: DownloadRequest,
135138
tx: &mut spsc::Sender<DownloadProgessItem>,
136139
) -> anyhow::Result<()> {
140+
println!("Handling download split");
137141
let providers = request.providers;
138142
let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?;
143+
let requests = requests.collect::<Vec<_>>();
144+
println!("Split into {:?} requests", requests);
139145
// todo: this is it's own mini actor, we should probably refactor this out
140146
let (progress_tx, progress_rx) = mpsc::channel(32);
141147
let mut futs = stream::iter(requests.into_iter().enumerate())
142148
.map(|(id, request)| {
149+
println!("Spawning download fut for request {id} {request:?}");
143150
let pool = pool.clone();
144151
let providers = providers.clone();
145152
let store = store.clone();
@@ -178,7 +185,6 @@ async fn handle_download_split_impl(
178185
tx.send(item).await?;
179186
},
180187
res = futs.next() => {
181-
println!("Got result: {:?}", res);
182188
match res {
183189
Some((hash, Ok(()))) => {
184190
}
@@ -245,7 +251,8 @@ impl SupportedRequest for HashAndFormat {
245251
(match self.format {
246252
BlobFormat::Raw => GetRequest::blob(self.hash),
247253
BlobFormat::HashSeq => GetRequest::all(self.hash),
248-
}).into()
254+
})
255+
.into()
249256
}
250257
}
251258

@@ -387,7 +394,7 @@ async fn split_request<'a>(
387394
) -> anyhow::Result<Box<dyn Iterator<Item = GetRequest> + Send + 'a>> {
388395
Ok(match request {
389396
FiniteRequest::Get(req) => {
390-
let Some(first) = req.ranges.iter().next() else {
397+
let Some(first) = req.ranges.iter_infinite().next() else {
391398
return Ok(Box::new(std::iter::empty()));
392399
};
393400
let first = GetRequest::blob(req.hash);
@@ -397,7 +404,7 @@ async fn split_request<'a>(
397404
let n = size / 32;
398405
Box::new(
399406
req.ranges
400-
.iter()
407+
.iter_infinite()
401408
.take(n as usize + 1)
402409
.enumerate()
403410
.filter_map(|(i, ranges)| {

src/store/fs.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,13 @@ impl HashContext {
320320
self.ctx.options.clone(),
321321
)),
322322
};
323-
res
323+
Ok((res?, ()))
324324
})
325325
.await
326326
.map_err(api::Error::from);
327327
trace!("{res:?}");
328-
res
328+
let (res, _) = res?;
329+
Ok(res)
329330
}
330331
}
331332

@@ -384,19 +385,20 @@ impl Slot {
384385
/// If there is nothing in the database, create a new in-memory handle.
385386
///
386387
/// `make` will be called if the a live handle does not exist.
387-
pub async fn get_or_create<F, Fut>(&self, make: F) -> io::Result<BaoFileHandle>
388+
pub async fn get_or_create<F, Fut, T>(&self, make: F) -> io::Result<(BaoFileHandle, T)>
388389
where
389390
F: FnOnce() -> Fut,
390-
Fut: std::future::Future<Output = io::Result<BaoFileHandle>>,
391+
Fut: std::future::Future<Output = io::Result<(BaoFileHandle, T)>>,
392+
T: Default,
391393
{
392394
let mut slot = self.0.lock().await;
393395
if let Some(weak) = &*slot {
394396
if let Some(handle) = weak.upgrade() {
395-
return Ok(handle);
397+
return Ok((handle, Default::default()));
396398
}
397399
}
398400
let handle = make().await;
399-
if let Ok(handle) = &handle {
401+
if let Ok((handle, _)) = &handle {
400402
*slot = Some(handle.downgrade());
401403
}
402404
handle

0 commit comments

Comments
 (0)