Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.

Commit 419ca05

Browse files
authored
Actively tail worker stdio from supervisor agent (#588)
In the supervisor agent, incrementally read from the running worker agent's redirected stderr and stdout, instead of waiting until it exits. The worker agent's stderr and stdout are piped to the supervisor when tasks are run. The supervisor's `WorkerRunner` does _not_ use `wait_with_output()`, which handles this (at the cost of blocking). Instead, it makes repeated calls to `try_wait()` on timer-based state transitions, and does not try to read the pipes until the worker exits. But when one of the child's pipes is full, the child can block forever waiting on a `write(2)`, such as in a `log` facade implementation. This bug has not been caught because we control the child worker agent, and until recently, it mostly only wrote to these streams using `env_logger` at its default log level. But recent work: (1) set more-verbose `INFO` level default logging, (2) logged stderr/stdout lines of child processes of _the worker_, and (3) some user targets logged very verbosely for debugging. This surfaced the underlying issue.
1 parent 06f45f3 commit 419ca05

File tree

4 files changed

+249
-22
lines changed

4 files changed

+249
-22
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
/// Fixed-capacity byte buffer that retains only the most recently written bytes.
///
/// Once more than `capacity` bytes have been written in total, the oldest bytes
/// are discarded so that [`TailBuffer::data`] always holds the trailing
/// `capacity` bytes of everything written (or fewer, if less has been written).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TailBuffer {
    /// Retained bytes, oldest first. Never longer than `capacity`.
    data: Vec<u8>,
    /// Maximum number of bytes retained.
    capacity: usize,
}

impl TailBuffer {
    /// Creates an empty buffer that retains at most `capacity` bytes.
    pub fn new(capacity: usize) -> Self {
        let data = Vec::with_capacity(capacity);
        Self { data, capacity }
    }

    /// Returns the retained bytes, oldest first.
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Decodes the retained bytes as UTF-8, substituting U+FFFD for any
    /// invalid sequences.
    pub fn to_string_lossy(&self) -> String {
        String::from_utf8_lossy(self.data()).into_owned()
    }
}

impl std::io::Write for TailBuffer {
    /// Appends `new_data`, discarding the oldest bytes as needed so that at
    /// most `capacity` bytes are retained.
    ///
    /// Always consumes the whole input and returns `Ok(new_data.len())`.
    fn write(&mut self, new_data: &[u8]) -> std::io::Result<usize> {
        if new_data.len() >= self.capacity {
            // Nothing already buffered can survive: keep just the trailing
            // `capacity` bytes of the input. Slicing first means an oversized
            // write never allocates beyond `capacity`.
            let tail = &new_data[new_data.len() - self.capacity..];
            self.data.clear();
            self.data.extend_from_slice(tail);
        } else {
            // Drop exactly as many of the oldest bytes as the new data would
            // push past `capacity`, then append. `overflow` cannot exceed
            // `data.len()` because `new_data.len() < capacity` here.
            let overflow = (self.data.len() + new_data.len()).saturating_sub(self.capacity);
            self.data.drain(..overflow);
            self.data.extend_from_slice(new_data);
        }

        Ok(new_data.len())
    }

    /// No-op: the buffer lives entirely in memory.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
44+
45+
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Exercises the buffer through underfull, exactly-full, and overflowing
    /// writes. `write_all` is used so that the byte count returned by `write`
    /// is never silently discarded.
    #[test]
    fn test_tail_buffer() {
        let mut buf = TailBuffer::new(5);

        assert!(buf.data().is_empty());

        // Underfull: everything written is retained.
        buf.write_all(&[1, 2, 3]).unwrap();
        assert_eq!(buf.data(), &[1, 2, 3]);

        // Empty writes change nothing.
        buf.write_all(&[]).unwrap();
        assert_eq!(buf.data(), &[1, 2, 3]);

        // Exactly full.
        buf.write_all(&[4, 5]).unwrap();
        assert_eq!(buf.data(), &[1, 2, 3, 4, 5]);

        // Overflow: the oldest bytes are dropped.
        buf.write_all(&[6, 7, 8]).unwrap();
        assert_eq!(buf.data(), &[4, 5, 6, 7, 8]);

        // A write of exactly `capacity` bytes replaces the whole contents.
        buf.write_all(&[9, 10, 11, 12, 13]).unwrap();
        assert_eq!(buf.data(), &[9, 10, 11, 12, 13]);

        // A write larger than `capacity` keeps only its own tail.
        buf.write_all(&[14, 15, 16, 17, 18, 19, 20, 21, 22]).unwrap();
        assert_eq!(buf.data(), &[18, 19, 20, 21, 22]);
    }
}

src/agent/onefuzz-supervisor/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use structopt::StructOpt;
2929

3030
pub mod agent;
3131
pub mod auth;
32+
pub mod buffer;
3233
pub mod commands;
3334
pub mod config;
3435
pub mod coordinator;

src/agent/onefuzz-supervisor/src/worker.rs

Lines changed: 111 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
3-
43
use std::{
54
path::{Path, PathBuf},
6-
process::{Child, Command, Stdio},
5+
process::{Child, ChildStderr, ChildStdout, Command, Stdio},
6+
thread::{self, JoinHandle},
77
};
88

9-
use anyhow::{Context as AnyhowContext, Result};
9+
use anyhow::{format_err, Context as AnyhowContext, Result};
1010
use downcast_rs::Downcast;
1111
use onefuzz::process::{ExitStatus, Output};
1212
use tokio::fs;
1313

14+
use crate::buffer::TailBuffer;
1415
use crate::work::*;
1516

17+
// Max length of captured output streams from worker child processes.
18+
const MAX_TAIL_LEN: usize = 4096;
19+
1620
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1721
#[serde(rename_all = "snake_case")]
1822
pub enum WorkerEvent {
@@ -231,24 +235,118 @@ impl IWorkerRunner for WorkerRunner {
231235
cmd.stderr(Stdio::piped());
232236
cmd.stdout(Stdio::piped());
233237

234-
let child = cmd.spawn().context("onefuzz-agent failed to start")?;
235-
let child = Box::new(child);
238+
Ok(Box::new(RedirectedChild::spawn(cmd)?))
239+
}
240+
}
241+
242+
/// Child process with redirected output streams, tailed by two worker threads.
243+
struct RedirectedChild {
244+
/// The child process.
245+
child: Child,
246+
247+
/// Worker threads which continuously read from the redirected streams.
248+
streams: Option<StreamReaderThreads>,
249+
}
250+
251+
impl RedirectedChild {
252+
pub fn spawn(mut cmd: Command) -> Result<Self> {
253+
// Make sure we capture the child's output streams.
254+
cmd.stderr(Stdio::piped());
255+
cmd.stdout(Stdio::piped());
256+
257+
let mut child = cmd.spawn().context("onefuzz-agent failed to start")?;
258+
259+
// Guaranteed by the above.
260+
let stderr = child.stderr.take().unwrap();
261+
let stdout = child.stdout.take().unwrap();
262+
let streams = Some(StreamReaderThreads::new(stderr, stdout));
236263

237-
Ok(child)
264+
Ok(Self { child, streams })
238265
}
239266
}
240267

241-
impl IWorkerChild for Child {
268+
/// Worker threads that tail the redirected output streams of a running child process.
269+
struct StreamReaderThreads {
270+
stderr: JoinHandle<TailBuffer>,
271+
stdout: JoinHandle<TailBuffer>,
272+
}
273+
274+
struct CapturedStreams {
275+
stderr: String,
276+
stdout: String,
277+
}
278+
279+
impl StreamReaderThreads {
280+
pub fn new(mut stderr: ChildStderr, mut stdout: ChildStdout) -> Self {
281+
use std::io::Read;
282+
283+
let stderr = thread::spawn(move || {
284+
let mut buf = TailBuffer::new(MAX_TAIL_LEN);
285+
let mut tmp = [0u8; MAX_TAIL_LEN];
286+
287+
while let Ok(count) = stderr.read(&mut tmp) {
288+
if count == 0 {
289+
break;
290+
}
291+
if let Err(err) = std::io::copy(&mut &tmp[..count], &mut buf) {
292+
log::error!("error copying to circular buffer: {}", err);
293+
break;
294+
}
295+
}
296+
297+
buf
298+
});
299+
300+
let stdout = thread::spawn(move || {
301+
let mut buf = TailBuffer::new(MAX_TAIL_LEN);
302+
let mut tmp = [0u8; MAX_TAIL_LEN];
303+
304+
while let Ok(count) = stdout.read(&mut tmp) {
305+
if count == 0 {
306+
break;
307+
}
308+
309+
if let Err(err) = std::io::copy(&mut &tmp[..count], &mut buf) {
310+
log::error!("error copying to circular buffer: {}", err);
311+
break;
312+
}
313+
}
314+
315+
buf
316+
});
317+
318+
Self { stderr, stdout }
319+
}
320+
321+
pub fn join(self) -> Result<CapturedStreams> {
322+
let stderr = self
323+
.stderr
324+
.join()
325+
.map_err(|_| format_err!("stderr tail thread panicked"))?
326+
.to_string_lossy();
327+
let stdout = self
328+
.stdout
329+
.join()
330+
.map_err(|_| format_err!("stdout tail thread panicked"))?
331+
.to_string_lossy();
332+
333+
Ok(CapturedStreams { stderr, stdout })
334+
}
335+
}
336+
337+
impl IWorkerChild for RedirectedChild {
242338
fn try_wait(&mut self) -> Result<Option<Output>> {
243-
let output = if let Some(exit_status) = self.try_wait()? {
339+
let output = if let Some(exit_status) = self.child.try_wait()? {
244340
let exit_status = exit_status.into();
245-
let stderr = read_to_string(&mut self.stderr)?;
246-
let stdout = read_to_string(&mut self.stdout)?;
341+
let streams = self.streams.take();
342+
let streams = streams
343+
.ok_or_else(|| format_err!("onefuzz-agent streams not captured"))?
344+
.join()?;
247345

248346
Some(Output {
249347
exit_status,
250-
stderr,
251-
stdout,
348+
stderr: streams.stderr,
349+
stdout: streams.stdout,
252350
})
253351
} else {
254352
None
@@ -260,7 +358,7 @@ impl IWorkerChild for Child {
260358
fn kill(&mut self) -> Result<()> {
261359
use std::io::ErrorKind;
262360

263-
let killed = self.kill();
361+
let killed = self.child.kill();
264362

265363
if let Err(err) = &killed {
266364
if let ErrorKind::InvalidInput = err.kind() {
@@ -273,15 +371,6 @@ impl IWorkerChild for Child {
273371
}
274372
}
275373

276-
fn read_to_string(stream: &mut Option<impl std::io::Read>) -> Result<String> {
277-
let mut data = Vec::new();
278-
if let Some(stream) = stream {
279-
stream.read_to_end(&mut data)?;
280-
}
281-
282-
Ok(String::from_utf8_lossy(&data).into_owned())
283-
}
284-
285374
#[cfg(test)]
286375
pub mod double;
287376

src/agent/onefuzz-supervisor/src/worker/tests.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,3 +226,65 @@ async fn test_worker_done() {
226226
assert!(matches!(worker, Worker::Done(..)));
227227
assert_eq!(events, vec![]);
228228
}
229+
230+
#[cfg(target_os = "linux")]
#[test]
fn test_redirected_child() {
    use std::process::Command;

    // Assume the OS pipe holds 16 pages of 4096 bytes each.
    //
    // On each stream, the child:
    //
    // 1. Writes enough of one char to fill the OS pipe.
    // 2. Writes a smaller count (< tail size) of a second char to overflow it.
    //
    // The tailing buffer holds 4096 bytes, so the capture should contain every
    // byte of the second char, preceded by the remainder from the first char.
    let script = "import sys;\
                  sys.stdout.write('A' * 65536 + 'B' * 4000);\
                  sys.stderr.write('C' * 65536 + 'D' * 4000)";

    let mut python = Command::new("python3");
    python.args(&["-c", script]);

    let RedirectedChild { mut child, streams } = RedirectedChild::spawn(python).unwrap();
    child.wait().unwrap();
    let captured = streams.unwrap().join().unwrap();

    let expected_stdout = format!("{}{}", "A".repeat(96), "B".repeat(4000));
    assert_eq!(captured.stdout, expected_stdout);

    let expected_stderr = format!("{}{}", "C".repeat(96), "D".repeat(4000));
    assert_eq!(captured.stderr, expected_stderr);
}
262+
263+
#[cfg(target_os = "windows")]
#[test]
fn test_redirected_child() {
    use std::process::Command;

    // The child writes to stdout only.
    let script = "Write-Output ('A' * 65536 + 'B' * 4000)";

    let mut powershell = Command::new("powershell.exe");
    powershell.args(&[
        "-NonInteractive",
        "-ExecutionPolicy",
        "Unrestricted",
        "-Command",
        script,
    ]);

    let RedirectedChild { mut child, streams } = RedirectedChild::spawn(powershell).unwrap();
    child.wait().unwrap();
    let captured = streams.unwrap().join().unwrap();

    // The expected tail ends with a trailing CRLF, which takes up two bytes of
    // the capture.
    let expected_stdout = format!("{}{}\r\n", "A".repeat(94), "B".repeat(4000));
    assert_eq!(captured.stdout, expected_stdout);

    assert_eq!(captured.stderr, "");
}

0 commit comments

Comments
 (0)