
Commit 0b0f089

Fix timeouts firing while tarballs are extracted
This commit fixes #6125 by ensuring that while we're extracting tarballs or doing other synchronous work (like grabbing file locks) we don't let the timeout timers of each HTTP transfer keep ticking. Letting them tick is curl's default behavior, which we don't want in this scenario. Instead, the timeout logic is inlined directly and we manually ensure that time spent on synchronous work doesn't count towards the timeout limits.

Closes #6125
1 parent c14839b commit 0b0f089
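
The idea behind the fix, sketched here outside of cargo's own types (the names below are illustrative, not the actual API): each transfer keeps its own "last made progress" timestamps, the libcurl progress callback checks those manually, and whenever we block on synchronous work we push the timestamps forward by however long that work took, so the idle clock effectively pauses.

use std::time::{Duration, Instant};

// Illustrative per-transfer timeout state; cargo stores the equivalent
// fields in `Cell`s inside its `Download` struct (see the diff below).
struct TransferClock {
    updated_at: Instant,       // last time any data arrived
    next_speed_check: Instant, // when to next verify the minimum speed
    dur: Duration,             // allowed idle time / speed-check window
}

impl TransferClock {
    // Manual timeout check performed from the progress callback instead of
    // relying on libcurl's built-in timers.
    fn timed_out(&self, now: Instant) -> bool {
        now - self.updated_at > self.dur
    }

    // Run some blocking work (tarball extraction, file locks) and shift the
    // timestamps forward by the time it took, so that time doesn't count
    // towards the transfer's timeout.
    fn exclude_blocking<T>(&mut self, work: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = work();
        let spent = start.elapsed();
        self.updated_at += spent;
        self.next_speed_check += spent;
        out
    }
}

In the actual commit the shift happens in `finish_download` (for every still-pending transfer) and the manual checks live in a new `Downloads::progress` method, both visible in the `src/cargo/core/package.rs` diff below.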

File tree

7 files changed: +196 -47 lines changed


Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ crates-io = { path = "src/crates-io", version = "0.20" }
 crossbeam-utils = "0.5"
 crypto-hash = "0.3.1"
 curl = { version = "0.4.17", features = ['http2'] }
+curl-sys = "0.4.12"
 env_logger = "0.5.11"
 failure = "0.1.2"
 filetime = "0.2"

src/cargo/core/package.rs

Lines changed: 140 additions & 14 deletions
@@ -7,6 +7,8 @@ use std::path::{Path, PathBuf};
 use std::time::{Instant, Duration};
 
 use bytesize::ByteSize;
+use curl;
+use curl_sys;
 use curl::easy::{Easy, HttpVersion};
 use curl::multi::{Multi, EasyHandle};
 use lazycell::LazyCell;
@@ -257,7 +259,7 @@ pub struct Downloads<'a, 'cfg: 'a> {
     set: &'a PackageSet<'cfg>,
     pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
     pending_ids: HashSet<PackageId>,
-    results: Vec<(usize, CargoResult<()>)>,
+    results: Vec<(usize, Result<(), curl::Error>)>,
     next: usize,
     progress: RefCell<Option<Progress<'cfg>>>,
     downloads_finished: usize,
@@ -268,14 +270,49 @@ pub struct Downloads<'a, 'cfg: 'a> {
 }
 
 struct Download<'cfg> {
+    /// Token for this download, used as the key of the `Downloads::pending` map
+    /// and stored in `EasyHandle` as well.
     token: usize,
+
+    /// Package that we're downloading
     id: PackageId,
+
+    /// Actual downloaded data, updated throughout the lifetime of this download
     data: RefCell<Vec<u8>>,
+
+    /// The URL that we're downloading from, cached here for error messages and
+    /// reenqueuing.
     url: String,
+
+    /// A descriptive string to print when we've finished downloading this crate
     descriptor: String,
+
+    /// Statistics updated from the progress callback in libcurl
     total: Cell<u64>,
     current: Cell<u64>,
+
+    /// The moment we started this transfer at
     start: Instant,
+
+    /// Last time we noticed that we got some more data from libcurl
+    updated_at: Cell<Instant>,
+
+    /// Timeout management, both of timeout thresholds as well as whether or not
+    /// our connection has timed out (and accompanying message if it has).
+    ///
+    /// Note that timeout management is done manually here because we have a
+    /// `Multi` with a lot of active transfers but between transfers finishing
+    /// we perform some possibly slow synchronous work (like grabbing file
+    /// locks, extracting tarballs, etc). The default timers on our `Multi` keep
+    /// running during this work, but we don't want them to count towards timing
+    /// everything out. As a result, we manage this manually and take the time
+    /// for synchronous work into account manually.
+    timeout: ops::HttpTimeout,
+    timed_out: Cell<Option<String>>,
+    next_speed_check: Cell<Instant>,
+    next_speed_check_bytes_threshold: Cell<u64>,
+
+    /// Logic used to track retrying this download if it's a spurious failure.
     retry: Retry<'cfg>,
 }

@@ -409,7 +446,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
         debug!("downloading {} as {}", id, token);
         assert!(self.pending_ids.insert(id.clone()));
 
-        let mut handle = ops::http_handle(self.set.config)?;
+        let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?;
         handle.get(true)?;
         handle.url(&url)?;
         handle.follow_location(true)?; // follow redirects
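
`ops::http_handle_and_timeout` above is a new re-export (see the `src/cargo/ops/mod.rs` hunk below) that hands back the configured handle together with an `ops::HttpTimeout`. Its definition lives in `src/cargo/ops/registry.rs` and isn't part of the hunks shown here; judging purely from the fields this diff reads (`timeout.dur`, `timeout.low_speed_limit`), its shape is roughly the following (an inference for orientation, not the verbatim definition):

use std::time::Duration;

// Assumed shape of ops::HttpTimeout, inferred from how this diff uses it.
pub struct HttpTimeout {
    // Window in which some progress must be observed; also used as the
    // hard "no data at all" timeout.
    pub dur: Duration,
    // Minimum number of bytes that must arrive per `dur` window.
    pub low_speed_limit: u32,
}
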
@@ -447,14 +484,10 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
         handle.progress(true)?;
         handle.progress_function(move |dl_total, dl_cur, _, _| {
             tls::with(|downloads| {
-                let downloads = match downloads {
-                    Some(d) => d,
-                    None => return false,
-                };
-                let dl = &downloads.pending[&token].0;
-                dl.total.set(dl_total as u64);
-                dl.current.set(dl_cur as u64);
-                downloads.tick(WhyTick::DownloadUpdate).is_ok()
+                match downloads {
+                    Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
+                    None => false,
+                }
             })
         })?;

@@ -468,6 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
             self.set.config.shell().status("Downloading", "crates ...")?;
         }
 
+        let now = Instant::now();
         let dl = Download {
             token,
             data: RefCell::new(Vec::new()),
@@ -477,6 +511,11 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
             total: Cell::new(0),
             current: Cell::new(0),
             start: Instant::now(),
+            updated_at: Cell::new(now),
+            timeout,
+            timed_out: Cell::new(None),
+            next_speed_check: Cell::new(now),
+            next_speed_check_bytes_threshold: Cell::new(0),
             retry: Retry::new(self.set.config)?,
         };
         self.enqueue(dl, handle)?;
@@ -514,13 +553,35 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
         // then we want to re-enqueue our request for another attempt and
         // then we wait for another request to finish.
         let ret = {
+            let timed_out = &dl.timed_out;
             let url = &dl.url;
             dl.retry.try(|| {
-                result?;
+                if let Err(e) = result {
+                    // If this error is "aborted by callback" then that's
+                    // probably because our progress callback aborted due to
+                    // a timeout. We'll find out by looking at the
+                    // `timed_out` field, looking for a descriptive message.
+                    // If one is found we switch the error code (to ensure
+                    // it's flagged as spurious) and then attach our extra
+                    // information to the error.
+                    if !e.is_aborted_by_callback() {
+                        return Err(e.into())
+                    }
+
+                    return Err(match timed_out.replace(None) {
+                        Some(msg) => {
+                            let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
+                            let mut err = curl::Error::new(code);
+                            err.set_extra(msg);
+                            err
+                        }
+                        None => e,
+                    }.into())
+                }
 
                 let code = handle.response_code()?;
                 if code != 200 && code != 0 {
-                    let url = handle.effective_url()?.unwrap_or(&url);
+                    let url = handle.effective_url()?.unwrap_or(url);
                     return Err(HttpNot200 {
                         code,
                         url: url.to_string(),
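
Returning `false` from libcurl's progress callback aborts the transfer with `CURLE_ABORTED_BY_CALLBACK`. That's why the retry closure above first checks `is_aborted_by_callback()`: if our callback also left a message in `timed_out`, the abort really was a manual timeout, so the error is rewritten as `CURLE_OPERATION_TIMEDOUT` (which is flagged as spurious and therefore retried) with the message attached. The same remapping, pulled out into a standalone sketch (the helper name here is made up, and the `curl` and `curl_sys` crates are assumed to be in scope):

use std::cell::Cell;

// Hypothetical helper mirroring the remapping done inside `dl.retry.try(...)`.
fn remap_manual_timeout(e: curl::Error, timed_out: &Cell<Option<String>>) -> curl::Error {
    // Only aborts coming from our own progress callback can be manual timeouts.
    if !e.is_aborted_by_callback() {
        return e;
    }
    match timed_out.replace(None) {
        // The callback recorded why it aborted: surface that as a timeout.
        Some(msg) => {
            let mut err = curl::Error::new(curl_sys::CURLE_OPERATION_TIMEDOUT);
            err.set_extra(msg);
            err
        }
        // Aborted for some other reason: keep the original error.
        None => e,
    }
}
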
@@ -569,20 +630,39 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
         let source = sources
             .get_mut(dl.id.source_id())
             .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
+        let start = Instant::now();
         let pkg = source.finish_download(&dl.id, data)?;
+
+        // Assume that no time has passed while we were calling
+        // `finish_download`, update all speed checks and timeout limits of all
+        // active downloads to make sure they don't fire because of a slowly
+        // extracted tarball.
+        let finish_dur = start.elapsed();
+        for (dl, _) in self.pending.values_mut() {
+            dl.updated_at.set(dl.updated_at.get() + finish_dur);
+            dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur);
+        }
+
         let slot = &self.set.packages[&dl.id];
         assert!(slot.fill(pkg).is_ok());
         Ok(slot.borrow().unwrap())
     }
 
     fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> {
         let mut handle = self.set.multi.add(handle)?;
+        let now = Instant::now();
         handle.set_token(dl.token)?;
+        dl.timed_out.set(None);
+        dl.updated_at.set(now);
+        dl.current.set(0);
+        dl.total.set(0);
+        dl.next_speed_check.set(now + dl.timeout.dur);
+        dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64);
         self.pending.insert(dl.token, (dl, handle));
         Ok(())
     }
 
-    fn wait_for_curl(&mut self) -> CargoResult<(usize, CargoResult<()>)> {
+    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
         // This is the main workhorse loop. We use libcurl's portable `wait`
         // method to actually perform blocking. This isn't necessarily too
         // efficient in terms of fd management, but we should only be juggling
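
The arithmetic in `finish_download` above relies on a simple invariant: shifting `updated_at` (and `next_speed_check`) forward by exactly the time spent in synchronous work means the idle time the next progress callback observes is unchanged by that work. A tiny self-contained illustration of the same arithmetic (timings are arbitrary, not from the commit):

use std::thread::sleep;
use std::time::{Duration, Instant};

fn main() {
    // Last time any data arrived for some in-flight transfer.
    let mut updated_at = Instant::now();

    // Simulate slow synchronous work (extracting a tarball, file locks, ...).
    let start = Instant::now();
    sleep(Duration::from_millis(50));
    let finish_dur = start.elapsed();

    // Without the fix the transfer now looks ~50ms idle, even though the
    // network was never given a chance to deliver anything in the meantime.
    assert!(Instant::now() - updated_at >= Duration::from_millis(50));

    // With the fix the timestamp is pushed forward by the blocked time, so
    // the idle clock effectively paused while we were busy.
    updated_at += finish_dur;
    assert!(Instant::now() - updated_at < Duration::from_millis(50));
}
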
@@ -610,7 +690,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
                 let token = msg.token().expect("failed to read token");
                 let handle = &pending[&token].1;
                 if let Some(result) = msg.result_for(&handle) {
-                    results.push((token, result.map_err(|e| e.into())));
+                    results.push((token, result));
                 } else {
                     debug!("message without a result (?)");
                 }
@@ -627,6 +707,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
         }
     }
 
+    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
+        let dl = &self.pending[&token].0;
+        dl.total.set(total);
+        let now = Instant::now();
+        if cur != dl.current.get() {
+            dl.current.set(cur);
+            dl.updated_at.set(now);
+
+            if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() {
+                dl.next_speed_check.set(now + dl.timeout.dur);
+                dl.next_speed_check_bytes_threshold.set(
+                    dl.current.get() + dl.timeout.low_speed_limit as u64,
+                );
+            }
+        }
+        if !self.tick(WhyTick::DownloadUpdate).is_ok() {
+            return false
+        }
+
+        // If we've spent too long not actually receiving any data we time out.
+        if now - dl.updated_at.get() > dl.timeout.dur {
+            let msg = format!("failed to download any data for `{}` within {}s",
+                              dl.id,
+                              dl.timeout.dur.as_secs());
+            dl.timed_out.set(Some(msg));
+            return false
+        }
+
+        // If we reached the point in time that we need to check our speed
+        // limit, see if we've transferred enough data during this threshold. If
+        // it fails this check then we fail because the download is going too
+        // slowly.
+        if now >= dl.next_speed_check.get() {
+            assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get());
+            let msg = format!("download of `{}` failed to transfer more \
+                               than {} bytes in {}s",
+                              dl.id,
+                              dl.timeout.low_speed_limit,
+                              dl.timeout.dur.as_secs());
+            dl.timed_out.set(Some(msg));
+            return false
+        }
+
+        true
+    }
+
     fn tick(&self, why: WhyTick) -> CargoResult<()> {
         let mut progress = self.progress.borrow_mut();
         let progress = progress.as_mut().unwrap();
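
The manual checks in `progress` above replicate what libcurl can do natively through its low-speed timeout options; the point of this commit is that libcurl's own timers keep counting wall-clock time even while cargo is blocked in synchronous work, so the checks are reimplemented by hand. For contrast, the native approach looks roughly like this (illustrative values and function name, not cargo's configuration code):

use std::time::Duration;
use curl::easy::Easy;

fn native_low_speed_timeout(url: &str) -> Result<Easy, curl::Error> {
    let mut handle = Easy::new();
    handle.url(url)?;
    // libcurl aborts the transfer itself if fewer than `low_speed_limit`
    // bytes per second arrive for `low_speed_time` straight. It keeps
    // counting even while the application is busy extracting tarballs,
    // which is the behavior #6125 reported.
    handle.low_speed_limit(10)?;
    handle.low_speed_time(Duration::from_secs(30))?;
    Ok(handle)
}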

src/cargo/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ extern crate core_foundation;
 extern crate crates_io as registry;
 extern crate crossbeam_utils;
 extern crate curl;
+extern crate curl_sys;
 #[macro_use]
 extern crate failure;
 extern crate filetime;

src/cargo/ops/mod.rs

Lines changed: 2 additions & 1 deletion
@@ -15,7 +15,8 @@ pub use self::cargo_package::{package, PackageOpts};
 pub use self::registry::{publish, registry_configuration, RegistryConfig};
 pub use self::registry::{http_handle, needs_custom_http_transport, registry_login, search};
 pub use self::registry::{modify_owners, yank, OwnersOptions, PublishOpts};
-pub use self::registry::configure_http_handle;
+pub use self::registry::{configure_http_handle, http_handle_and_timeout};
+pub use self::registry::HttpTimeout;
 pub use self::cargo_fetch::{fetch, FetchOptions};
 pub use self::cargo_pkgid::pkgid;
 pub use self::resolve::{add_overrides, get_resolved_packages, resolve_with_previous, resolve_ws,
