
Commit ec8c6a2

Merge pull request #702 from alexheretic/work-pool-dos
Address work-pool DoS with long running tasks
2 parents 52f5348 + d778b9e

8 files changed: +117 -35 lines

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ serde_json = "1.0"
 serde_derive = "1.0"
 url = "1.1.0"
 rayon = "0.9"
+num_cpus = "1"
 
 [dev-dependencies]
 json = "0.11"

src/actions/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ macro_rules! parse_file_path {
     }
 }
 
+pub mod work_pool;
 pub mod post_build;
 pub mod requests;
 pub mod notifications;

src/actions/requests.rs

Lines changed: 4 additions & 30 deletions
@@ -22,8 +22,9 @@ use rustfmt::{FileName, format_input, Input as FmtInput};
 use rustfmt::file_lines::{FileLines, Range as RustfmtRange};
 use serde_json;
 use span;
-use rayon;
 
+use actions::work_pool;
+use actions::work_pool::WorkDescription;
 use lsp_data;
 use lsp_data::*;
 use server;
@@ -50,7 +51,6 @@ pub use lsp_data::FindImpls;
 
 use std::collections::HashMap;
 use std::path::Path;
-use std::sync::mpsc;
 
 /// Represent the result of a deglob action for a single wildcard import.
 ///
@@ -222,14 +222,14 @@ impl RequestAction for Definition {
         // If configured start racer concurrently and fallback to racer result
         let racer_receiver = {
             if ctx.config.lock().unwrap().goto_def_racer_fallback {
-                Some(receive_from_thread(move || {
+                Some(work_pool::receive_from_thread(move || {
                     let cache = racer::FileCache::new(vfs);
                     let session = racer::Session::new(&cache);
                     let location = pos_to_racer_location(params.position);
 
                     racer::find_definition(file_path, location, &session)
                         .and_then(location_from_racer_match)
-                }))
+                }, WorkDescription("textDocument/definition-racer")))
             } else {
                 None
             }
@@ -850,29 +850,3 @@ fn location_from_racer_match(a_match: racer::Match) -> Option<Location> {
         ls_util::rls_location_to_location(&loc)
     })
 }
-
-lazy_static! {
-    /// Thread pool for request execution allowing concurrent request processing.
-    static ref WORK_POOL: rayon::ThreadPool = rayon::ThreadPool::new(
-        rayon::Configuration::default()
-            .thread_name(|num| format!("request-worker-{}", num))
-            // panic details will be on stderr, otherwise ignore the work panic as it
-            // will already cause a mpsc disconnect-error & there isn't anything else to log
-            .panic_handler(|_| {})
-    ).unwrap();
-}
-
-/// Runs work in a new thread on the `WORK_POOL` returning a result `Receiver`
-/// Panicking work will receive `Err(RecvError)` / `Err(RecvTimeoutError::Disconnected)`
-pub fn receive_from_thread<T, F>(work_fn: F) -> mpsc::Receiver<T>
-where
-    T: Send + 'static,
-    F: FnOnce() -> T + Send + 'static,
-{
-    let (sender, receiver) = mpsc::channel();
-    WORK_POOL.spawn(move || {
-        // an error here simply means the work took too long and the receiver has been dropped
-        let _ = sender.send(work_fn());
-    });
-    receiver
-}
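
The hunk above only changes how the racer task is started: it now runs on the shared work pool and is tagged with a `WorkDescription`. The surrounding fallback logic is unchanged and not shown here, so the following is only a rough, self-contained sketch of the race-and-fallback shape being preserved, with a plain thread standing in for the pool and `primary_lookup`/`racer_lookup` as illustrative placeholders rather than RLS code:

// Illustrative only: a plain thread stands in for the work pool, and the
// lookup functions are placeholders, not RLS internals.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn primary_lookup() -> Option<String> {
    None // pretend the compiler-based lookup found nothing
}

fn racer_lookup() -> Option<String> {
    Some("racer definition".to_owned())
}

fn main() {
    // Start the fallback lookup concurrently, as the Definition handler does.
    let (tx, racer_receiver) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(racer_lookup());
    });

    // Consult the racer answer only if the primary lookup came up empty,
    // waiting a bounded time so a stuck fallback cannot hold up the response.
    let result = primary_lookup().or_else(|| {
        racer_receiver
            .recv_timeout(Duration::from_millis(500))
            .ok()
            .and_then(|opt| opt)
    });
    println!("{:?}", result);
}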

src/actions/work_pool.rs

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+use rayon;
+use server::DEFAULT_REQUEST_TIMEOUT;
+use std::{fmt, panic};
+use std::time::{Duration, Instant};
+use std::sync::{mpsc, Mutex};
+
+/// Description of work on the request work pool. Equality implies two pieces of work are the same
+/// kind of thing. The `str` should be human readable for logging, ie the language server protocol
+/// request message name or similar.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct WorkDescription(pub &'static str);
+
+impl fmt::Display for WorkDescription {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+lazy_static! {
+    /// Maximum total concurrent working tasks
+    static ref NUM_THREADS: usize = ::num_cpus::get();
+
+    /// Duration of work after which we should warn something is taking a long time
+    static ref WARN_TASK_DURATION: Duration = *DEFAULT_REQUEST_TIMEOUT * 5;
+
+    /// Current work descriptions active on the work pool
+    static ref WORK: Mutex<Vec<WorkDescription>> = Mutex::new(vec![]);
+
+    /// Thread pool for request execution allowing concurrent request processing.
+    static ref WORK_POOL: rayon::ThreadPool = rayon::ThreadPool::new(
+        rayon::Configuration::default()
+            .thread_name(|num| format!("request-worker-{}", num))
+            .num_threads(*NUM_THREADS)
+    ).unwrap();
+}
+
+/// Maximum concurrent working tasks of the same type
+/// Note: `2` allows a single task to run immediately after a similar task has timed out.
+/// Once multiple tasks have timed out but remain running we start refusing to start new ones.
+const MAX_SIMILAR_CONCURRENT_WORK: usize = 2;
+
+/// Runs work in a new thread on the `WORK_POOL` returning a result `Receiver`
+///
+/// Panicking work will receive `Err(RecvError)` / `Err(RecvTimeoutError::Disconnected)`
+///
+/// If too many tasks are already running the work will not be done and the receiver will
+/// immediately return `Err(RecvTimeoutError::Disconnected)`
+pub fn receive_from_thread<T, F>(work_fn: F, description: WorkDescription) -> mpsc::Receiver<T>
+where
+    T: Send + 'static,
+    F: FnOnce() -> T + Send + panic::UnwindSafe + 'static,
+{
+    let (sender, receiver) = mpsc::channel();
+
+    {
+        let mut work = WORK.lock().unwrap();
+        if work.len() >= *NUM_THREADS {
+            // there are already N ongoing tasks, that may or may not have timed out
+            // don't add yet more to the queue fail fast to allow the work pool to recover
+            warn!(
+                "Could not start `{}` as at work capacity, {:?} in progress",
+                description, *work,
+            );
+            return receiver;
+        }
+        if work.iter().filter(|desc| *desc == &description).count() >= MAX_SIMILAR_CONCURRENT_WORK {
+            // this type of work is already filling around half the work pool, so there's
+            // good reason to believe it may fill the entire pool => fail fast to allow
+            // other task-types to run
+            info!(
+                "Could not start `{}` as same work-type is filling half capacity, {:?} in progress",
+                description, *work,
+            );
+            return receiver;
+        }
+        work.push(description);
+    }
+
+    WORK_POOL.spawn(move || {
+        let start = Instant::now();
+
+        // panic details will be on stderr, otherwise ignore the work panic as it
+        // will already cause a mpsc disconnect-error & there isn't anything else to log
+        if let Ok(work_result) = panic::catch_unwind(work_fn) {
+            // an error here simply means the work took too long and the receiver has been dropped
+            let _ = sender.send(work_result);
+        }
+
+        let mut work = WORK.lock().unwrap();
+        if let Some(index) = work.iter().position(|desc| desc == &description) {
+            work.swap_remove(index);
+        }
+
+        let elapsed = start.elapsed();
+        if elapsed >= *WARN_TASK_DURATION {
+            let secs =
+                elapsed.as_secs() as f64 + f64::from(elapsed.subsec_nanos()) / 1_000_000_000_f64;
+            warn!("`{}` took {:.1}s", description, secs);
+        }
+    });
+    receiver
+}
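
The doc comments above define a fire-and-forget contract: callers always get an `mpsc::Receiver` back, and a refused, panicked, or overly slow task all surface as a receive error rather than as a separate result type. Below is a minimal sketch of that contract using only the standard library, with a plain `thread::spawn` standing in for the rayon pool and `do_expensive_lookup` as a placeholder task; the admission checks themselves (total capacity and `MAX_SIMILAR_CONCURRENT_WORK`) are omitted here:

// Simplified analogue of `work_pool::receive_from_thread`; not the RLS code.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn do_expensive_lookup() -> String {
    thread::sleep(Duration::from_millis(50)); // stand-in for real work
    "result".to_owned()
}

fn receive_from_thread<T, F>(work_fn: F) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // If the caller has already given up (receiver dropped), the send
        // simply fails and the result is discarded.
        let _ = sender.send(work_fn());
    });
    receiver
}

fn main() {
    let receiver = receive_from_thread(do_expensive_lookup);
    // Refused, panicked, or slow work all appear here as an Err, so the
    // caller can fall back to a default response instead of blocking forever.
    match receiver.recv_timeout(Duration::from_millis(1500)) {
        Ok(result) => println!("work finished: {}", result),
        Err(_) => println!("no result in time; using fallback response"),
    }
}

The capacity and same-work-type limits are what turn a flood of long-running requests into fast failures instead of an ever-growing backlog, which is the work-pool DoS this pull request addresses.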

src/main.rs

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ extern crate languageserver_types as ls_types;
 extern crate lazy_static;
 #[macro_use]
 extern crate log;
+extern crate num_cpus;
 extern crate racer;
 extern crate rayon;
 extern crate rls_analysis as analysis;

src/server/dispatch.rs

Lines changed: 6 additions & 4 deletions
@@ -9,6 +9,8 @@
 // except according to those terms.
 
 use super::requests::*;
+use actions::work_pool;
+use actions::work_pool::WorkDescription;
 use jsonrpc_core as jsonrpc;
 use server;
 use server::{Request, Response};
@@ -20,7 +22,7 @@ use std::thread;
 use std::time::Duration;
 
 lazy_static! {
-    static ref TIMEOUT: Duration = Duration::from_millis(::COMPILER_TIMEOUT);
+    pub static ref DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_millis(::COMPILER_TIMEOUT);
 }
 
 /// Macro enum `DispatchRequest` packing in various similar `Request` types
@@ -48,7 +50,7 @@ macro_rules! define_dispatch_request_enum {
         let Request { id, params, received, .. } = req;
         let timeout = $request_type::timeout();
 
-        let receiver = receive_from_thread(move || {
+        let receiver = work_pool::receive_from_thread(move || {
             // checking timeout here can prevent starting expensive work that has
             // already timed out due to previous long running requests
             // Note: done here on the threadpool as pool scheduling may incur
@@ -59,7 +61,7 @@ macro_rules! define_dispatch_request_enum {
             else {
                 $request_type::handle(ctx, params)
            }
-        });
+        }, WorkDescription($request_type::METHOD));
 
         match receiver.recv_timeout(timeout)
             .unwrap_or_else(|_| $request_type::fallback_response()) {
@@ -160,7 +162,7 @@ pub trait RequestAction: LSPRequest {
 
     /// Max duration this request should finish within, also see `fallback_response()`
     fn timeout() -> Duration {
-        *TIMEOUT
+        *DEFAULT_REQUEST_TIMEOUT
    }
 
     /// Returns a response used in timeout scenarios
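
The comment inside the pooled closure captures the key guard: by the time the pool schedules a task, the request may already be past its deadline, so the handler re-checks the time since `received` against `timeout` before doing anything expensive (the exact check sits between the two hunks and is not shown), while the dispatcher separately falls back via `recv_timeout`. A hedged, self-contained sketch of that deadline re-check, with `expensive_handle` and `fallback_response` as illustrative placeholders rather than the actual macro code:

// Illustrative deadline re-check; names mirror the hunk but this is not the
// actual dispatcher code.
use std::thread;
use std::time::{Duration, Instant};

fn expensive_handle() -> String {
    thread::sleep(Duration::from_millis(100)); // stand-in for real request handling
    "real response".to_owned()
}

fn fallback_response() -> String {
    "timed-out fallback response".to_owned()
}

fn main() {
    let received = Instant::now(); // when the request arrived
    let timeout = Duration::from_millis(1500);

    // Imagine pool scheduling delayed this closure for a while.
    thread::sleep(Duration::from_millis(10));

    // Re-checking the deadline on the worker thread avoids starting expensive
    // work whose result nobody will read, because the dispatcher has already
    // answered with the fallback after `recv_timeout` expired.
    let response = if received.elapsed() >= timeout {
        fallback_response()
    } else {
        expensive_handle()
    };
    println!("{}", response);
}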

src/server/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ use config::Config;
 pub use server::io::{MessageReader, Output};
 use server::io::{StdioMsgReader, StdioOutput};
 use server::dispatch::Dispatcher;
-pub use server::dispatch::{RequestAction, ResponseError};
+pub use server::dispatch::{RequestAction, ResponseError, DEFAULT_REQUEST_TIMEOUT};
 
 pub use ls_types::request::Shutdown as ShutdownRequest;
 pub use ls_types::request::Initialize as InitializeRequest;
