Skip to content

Commit 29a5509

Browse files
committed
Add a new test, testing emulated nonblocking streams, it fails.
1 parent acc792b commit 29a5509

File tree

2 files changed

+50
-30
lines changed

2 files changed

+50
-30
lines changed

tests/blob.rs

+16-8
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ quick_error! {
5050
}
5151
}
5252

53-
fn blob(slow: bool, max_size: usize) -> Result<(), Error> {
53+
fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> {
5454
const SIZE: usize = 262_144;
5555
// This test generates a random 256KiB BLOB, sends it, and then receives the BLOB, where every byte is
5656
// added by 1.
@@ -82,13 +82,13 @@ fn blob(slow: bool, max_size: usize) -> Result<(), Error> {
8282
.with_endianness::<BigEndian>()
8383
.with_reader::<BufReader<SlowReader<TcpStream>>>()
8484
.with_max_size(max_size)
85-
.build(BufReader::new(SlowReader::new(stream.try_clone()?, slow)));
85+
.build(BufReader::new(SlowReader::new(stream.try_clone()?, slow, blocking)));
8686

8787
let mut sender = SenderBuilder::buffered()
8888
.with_type::<Response>()
8989
.with_endianness::<BigEndian>()
9090
.with_writer::<BufWriter<SlowWriter<TcpStream>>>()
91-
.build(BufWriter::new(SlowWriter::new(stream, slow)));
91+
.build(BufWriter::new(SlowWriter::new(stream, slow, blocking)));
9292

9393
while let Ok(command) = receiver.recv() {
9494
match command {
@@ -111,14 +111,14 @@ fn blob(slow: bool, max_size: usize) -> Result<(), Error> {
111111
.with_type::<Request>()
112112
.with_writer::<SlowWriter<TcpStream>>()
113113
.with_endianness::<BigEndian>()
114-
.build(SlowWriter::new(stream.try_clone()?, slow));
114+
.build(SlowWriter::new(stream.try_clone()?, slow, blocking));
115115

116116
let mut receiver = ReceiverBuilder::buffered()
117117
.with_type::<Response>()
118118
.with_reader::<BufReader<SlowReader<TcpStream>>>()
119119
.with_endianness::<BigEndian>()
120120
.with_max_size(max_size)
121-
.build(BufReader::new(SlowReader::new(stream, slow)));
121+
.build(BufReader::new(SlowReader::new(stream, slow, blocking)));
122122

123123
let blob = {
124124
let mut blob = vec! [0u8; SIZE];
@@ -150,14 +150,22 @@ fn blob(slow: bool, max_size: usize) -> Result<(), Error> {
150150
}
151151
#[test]
152152
fn fast_blob() -> Result<(), Error> {
153-
blob(false, DEFAULT_MAX_SIZE)
153+
blob(false, true, DEFAULT_MAX_SIZE)
154+
}
155+
#[test]
156+
fn fast_nonblocking_blob() -> Result<(), Error> {
157+
blob(false, false, DEFAULT_MAX_SIZE)
154158
}
155159
#[test]
156160
fn slow_blob() -> Result<(), Error> {
157-
blob(true, DEFAULT_MAX_SIZE)
161+
blob(true, true, DEFAULT_MAX_SIZE)
162+
}
163+
#[test]
164+
fn slow_nonblocking_blob() -> Result<(), Error> {
165+
blob(true, false, DEFAULT_MAX_SIZE)
158166
}
159167
#[should_panic]
160168
#[test]
161169
fn fast_blob_too_small() {
162-
blob(true, 1024).unwrap()
170+
blob(true, true, 1024).unwrap()
163171
}

tests/slow_io.rs

+34-22
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,39 @@
1-
use std::io::{Read, Result, Write};
2-
use std::time::Duration;
1+
use std::io::{Read, Result, Write, ErrorKind as IoErrorKind};
2+
use std::time::{Instant, Duration};
33

44
// In milliseconds.
55
const DELAY: u64 = 200;
66

77
pub struct SlowWriter<T: Write> {
88
inner: T,
99
slow: bool,
10+
blocking: bool,
11+
last_write: Option<Instant>,
1012
}
1113
impl<T: Write> SlowWriter<T> {
12-
pub fn new(inner: T, slow: bool) -> Self {
14+
pub fn new(inner: T, slow: bool, blocking: bool) -> Self {
1315
Self {
1416
inner,
1517
slow,
18+
blocking,
19+
last_write: None,
1620
}
1721
}
1822
}
1923
impl<T: Write> Write for SlowWriter<T> {
2024
fn write(&mut self, data: &[u8]) -> Result<usize> {
2125
if self.slow {
22-
std::thread::sleep(Duration::from_millis(DELAY));
26+
if self.blocking {
27+
std::thread::sleep(Duration::from_millis(DELAY));
28+
} else {
29+
match self.last_write {
30+
Some(last_write) => if last_write + Duration::from_millis(DELAY) > Instant::now() {
31+
} else {
32+
return Err(IoErrorKind::WouldBlock.into())
33+
},
34+
None => self.last_write = Some(Instant::now()),
35+
}
36+
}
2337
}
2438
self.inner.write(data)
2539
}
@@ -30,39 +44,37 @@ impl<T: Write> Write for SlowWriter<T> {
3044
self.inner.flush()
3145
}
3246
}
33-
impl<T: Write> From<T> for SlowWriter<T> {
34-
fn from(inner: T) -> Self {
35-
Self {
36-
inner,
37-
slow: false,
38-
}
39-
}
40-
}
4147
pub struct SlowReader<T: Read> {
4248
inner: T,
4349
slow: bool,
50+
blocking: bool,
51+
last_read: Option<Instant>,
4452
}
4553
impl<T: Read> SlowReader<T> {
46-
pub fn new(inner: T, slow: bool) -> Self {
54+
pub fn new(inner: T, slow: bool, blocking: bool) -> Self {
4755
Self {
4856
inner,
4957
slow,
58+
blocking,
59+
last_read: None,
5060
}
5161
}
5262
}
5363
impl<T: Read> Read for SlowReader<T> {
5464
fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
5565
if self.slow {
56-
std::thread::sleep(Duration::from_millis(DELAY));
66+
if self.blocking {
67+
std::thread::sleep(Duration::from_millis(DELAY));
68+
} else {
69+
match self.last_read {
70+
Some(last_write) => if last_write + Duration::from_millis(DELAY) > Instant::now() {
71+
} else {
72+
return Err(IoErrorKind::WouldBlock.into())
73+
},
74+
None => self.last_read = Some(Instant::now()),
75+
}
76+
}
5777
}
5878
self.inner.read(buffer)
5979
}
6080
}
61-
impl<T: Read> From<T> for SlowReader<T> {
62-
fn from(inner: T) -> Self {
63-
Self {
64-
inner,
65-
slow: false,
66-
}
67-
}
68-
}

0 commit comments

Comments
 (0)