Skip to content

Commit 98c62d0

Browse files
committed
Restructure multithreading
Redefine the Worker trait, which allows the chosen worker to create a new scope. This is relevant for a newly created (and installed) rayon worker. The rayon thread pool is now local to the decoding step. This fixes an issue where improper task scheduling would deadlock decoding. It's not clear how the intended task scheduling can be reliably achieved without the guarantee of having at least a second, free, worker thread.
1 parent 222c264 commit 98c62d0

File tree

4 files changed

+166
-56
lines changed

4 files changed

+166
-56
lines changed

src/decoder.rs

+14-14
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::parser::{
1717
IccChunk, ScanInfo,
1818
};
1919
use crate::upsampler::Upsampler;
20-
use crate::worker::{PlatformWorker, RowData, Worker};
20+
use crate::worker::{PreferWorkerKind, RowData, Worker, with_worker};
2121

2222
pub const MAX_COMPONENTS: usize = 4;
2323

@@ -191,7 +191,9 @@ impl<R: Read> Decoder<R> {
191191
///
192192
/// If successful, the metadata can be obtained using the `info` method.
193193
pub fn read_info(&mut self) -> Result<()> {
194-
self.decode_internal(true).map(|_| ())
194+
with_worker(PreferWorkerKind::Multithreaded, |worker| {
195+
self.decode_internal(true, worker)
196+
}).map(|_| ())
195197
}
196198

197199
/// Configure the decoder to scale the image during decoding.
@@ -219,10 +221,16 @@ impl<R: Read> Decoder<R> {
219221

220222
/// Decodes the image and returns the decoded pixels if successful.
221223
pub fn decode(&mut self) -> Result<Vec<u8>> {
222-
self.decode_internal(false)
224+
with_worker(PreferWorkerKind::Multithreaded, |worker| {
225+
self.decode_internal(false, worker)
226+
})
223227
}
224228

225-
fn decode_internal(&mut self, stop_after_metadata: bool) -> Result<Vec<u8>> {
229+
fn decode_internal(
230+
&mut self,
231+
stop_after_metadata: bool,
232+
worker: &mut dyn Worker,
233+
) -> Result<Vec<u8>> {
226234
if stop_after_metadata && self.frame.is_some() {
227235
// The metadata has already been read.
228236
return Ok(Vec::new());
@@ -237,7 +245,6 @@ impl<R: Read> Decoder<R> {
237245

238246
let mut previous_marker = Marker::SOI;
239247
let mut pending_marker = None;
240-
let mut worker = None;
241248
let mut scans_processed = 0;
242249
let mut planes = vec![
243250
Vec::<u8>::new();
@@ -318,9 +325,6 @@ impl<R: Read> Decoder<R> {
318325
if self.frame.is_none() {
319326
return Err(Error::Format("scan encountered before frame".to_owned()));
320327
}
321-
if worker.is_none() {
322-
worker = Some(PlatformWorker::new()?);
323-
}
324328

325329
let frame = self.frame.clone().unwrap();
326330
let scan = parse_sos(&mut self.reader, &frame)?;
@@ -383,7 +387,7 @@ impl<R: Read> Decoder<R> {
383387
}
384388

385389
let (marker, data) =
386-
self.decode_scan(&frame, &scan, worker.as_mut().unwrap(), &finished)?;
390+
self.decode_scan(&frame, &scan, worker, &finished)?;
387391

388392
if let Some(data) = data {
389393
for (i, plane) in data
@@ -545,10 +549,6 @@ impl<R: Read> Decoder<R> {
545549
};
546550

547551
// Get the worker prepared
548-
if worker.is_none() {
549-
worker = Some(PlatformWorker::new()?);
550-
}
551-
let worker = worker.as_mut().unwrap();
552552
let row_data = RowData {
553553
index: i,
554554
component: component.clone(),
@@ -616,7 +616,7 @@ impl<R: Read> Decoder<R> {
616616
&mut self,
617617
frame: &FrameInfo,
618618
scan: &ScanInfo,
619-
worker: &mut PlatformWorker,
619+
worker: &mut dyn Worker,
620620
finished: &[bool; MAX_COMPONENTS],
621621
) -> Result<(Option<Marker>, Option<Vec<Vec<u8>>>)> {
622622
assert!(scan.component_indices.len() <= MAX_COMPONENTS);

src/worker/immediate.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ pub struct ImmediateWorker {
1616
quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
1717
}
1818

19+
pub fn with_immediate<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
20+
let mut worker = ImmediateWorker::new_immediate();
21+
f(&mut worker)
22+
}
23+
1924
impl ImmediateWorker {
2025
pub fn new_immediate() -> ImmediateWorker {
2126
ImmediateWorker {
@@ -25,6 +30,7 @@ impl ImmediateWorker {
2530
quantization_tables: vec![None; MAX_COMPONENTS],
2631
}
2732
}
33+
2834
pub fn start_immediate(&mut self, data: RowData) {
2935
assert!(self.results[data.index].is_empty());
3036

@@ -33,6 +39,7 @@ impl ImmediateWorker {
3339
self.components[data.index] = Some(data.component);
3440
self.quantization_tables[data.index] = Some(data.quantization_table);
3541
}
42+
3643
pub fn append_row_immediate(&mut self, (index, data): (usize, Vec<i16>)) {
3744
// Convert coefficients from a MCU row to samples.
3845

@@ -55,15 +62,13 @@ impl ImmediateWorker {
5562

5663
self.offsets[index] += block_count * component.dct_scale * component.dct_scale;
5764
}
65+
5866
pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> {
5967
mem::take(&mut self.results[index])
6068
}
6169
}
6270

6371
impl Worker for ImmediateWorker {
64-
fn new() -> Result<Self> {
65-
Ok(ImmediateWorker::new_immediate())
66-
}
6772
fn start(&mut self, data: RowData) -> Result<()> {
6873
self.start_immediate(data);
6974
Ok(())

src/worker/mod.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
mod immediate;
22
mod multithreaded;
33

4-
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
5-
pub use self::multithreaded::MultiThreadedWorker as PlatformWorker;
6-
#[cfg(any(target_arch = "asmjs", target_arch = "wasm32"))]
7-
pub use self::immediate::ImmediateWorker as PlatformWorker;
8-
94
use alloc::sync::Arc;
105
use alloc::vec::Vec;
116
use crate::error::Result;
@@ -17,9 +12,22 @@ pub struct RowData {
1712
pub quantization_table: Arc<[u16; 64]>,
1813
}
1914

20-
pub trait Worker: Sized {
21-
fn new() -> Result<Self>;
15+
pub trait Worker {
2216
fn start(&mut self, row_data: RowData) -> Result<()>;
2317
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>;
2418
fn get_result(&mut self, index: usize) -> Result<Vec<u8>>;
2519
}
20+
21+
pub enum PreferWorkerKind {
22+
Immediate,
23+
Multithreaded,
24+
}
25+
26+
/// Execute something with a worker system.
27+
pub fn with_worker<T>(prefer: PreferWorkerKind, f: impl FnOnce(&mut dyn Worker) -> T) -> T {
28+
match prefer {
29+
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
30+
PreferWorkerKind::Multithreaded => self::multithreaded::with_multithreading(f),
31+
_ => self::immediate::with_immediate(f),
32+
}
33+
}

src/worker/multithreaded.rs

+129-32
Original file line numberDiff line numberDiff line change
@@ -4,34 +4,48 @@
44
//! and allow scaling to more cores.
55
//! However, that would be more complex, so we use this as a starting point.
66
7-
use std::{mem, io, sync::mpsc::{self, Sender}};
7+
use std::{mem, sync::mpsc::{self, Receiver, Sender}};
88
use crate::decoder::MAX_COMPONENTS;
99
use crate::error::Result;
1010
use super::{RowData, Worker};
1111
use super::immediate::ImmediateWorker;
1212

13+
pub fn with_multithreading<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
14+
#[cfg(not(feature = "rayon"))]
15+
return self::enter_threads(f);
16+
17+
#[cfg(feature = "rayon")]
18+
return jpeg_rayon::enter(|mut worker| {
19+
f(&mut worker)
20+
});
21+
}
22+
1323
enum WorkerMsg {
1424
Start(RowData),
1525
AppendRow(Vec<i16>),
1626
GetResult(Sender<Vec<u8>>),
1727
}
18-
pub struct MultiThreadedWorker {
28+
29+
#[derive(Default)]
30+
pub struct MpscWorker {
1931
senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS]
2032
}
2133

22-
impl Worker for MultiThreadedWorker {
23-
fn new() -> Result<Self> {
24-
Ok(MultiThreadedWorker {
25-
senders: [None, None, None, None]
26-
})
27-
}
28-
fn start(&mut self, row_data: RowData) -> Result<()> {
34+
pub struct StdThreadWorker(MpscWorker);
35+
36+
impl MpscWorker {
37+
fn start_with(
38+
&mut self,
39+
row_data: RowData,
40+
spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>,
41+
) -> Result<()> {
2942
// if there is no worker thread for this component yet, start one
3043
let component = row_data.index;
3144
if let None = self.senders[component] {
32-
let sender = spawn_worker_thread(component)?;
45+
let sender = spawn_worker(component)?;
3346
self.senders[component] = Some(sender);
3447
}
48+
3549
// we do the "take out value and put it back in once we're done" dance here
3650
// and in all other message-passing methods because there's not that many rows
3751
// and this should be cheaper than spawning MAX_COMPONENTS many threads up front
@@ -40,25 +54,42 @@ impl Worker for MultiThreadedWorker {
4054
self.senders[component] = Some(sender);
4155
Ok(())
4256
}
57+
4358
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
4459
let component = row.0;
4560
let sender = mem::replace(&mut self.senders[component], None).unwrap();
4661
sender.send(WorkerMsg::AppendRow(row.1)).expect("jpeg-decoder worker thread error");
4762
self.senders[component] = Some(sender);
4863
Ok(())
4964
}
50-
fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
65+
66+
fn get_result_with(
67+
&mut self,
68+
index: usize,
69+
collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>,
70+
) -> Result<Vec<u8>> {
5171
let (tx, rx) = mpsc::channel();
5272
let sender = mem::replace(&mut self.senders[index], None).unwrap();
5373
sender.send(WorkerMsg::GetResult(tx)).expect("jpeg-decoder worker thread error");
54-
Ok(rx.recv().expect("jpeg-decoder worker thread error"))
74+
Ok(collect(rx))
5575
}
5676
}
5777

58-
fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> {
59-
let (tx, rx) = mpsc::channel();
78+
impl Worker for StdThreadWorker {
79+
fn start(&mut self, row_data: RowData) -> Result<()> {
80+
self.0.start_with(row_data, spawn_worker_thread)
81+
}
82+
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
83+
self.0.append_row(row)
84+
}
85+
fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
86+
self.0.get_result_with(index, collect_worker_thread)
87+
}
88+
}
6089

61-
spawn(component, move || {
90+
fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) {
91+
let (tx, rx) = mpsc::channel();
92+
let closure = move || {
6293
let mut worker = ImmediateWorker::new_immediate();
6394

6495
while let Ok(message) = rx.recv() {
@@ -79,27 +110,93 @@ fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> {
79110
},
80111
}
81112
}
82-
})?;
113+
};
114+
115+
(tx, closure)
116+
}
83117

118+
fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> {
119+
let (tx, worker) = create_worker();
120+
let thread_builder =
121+
std::thread::Builder::new().name(format!("worker thread for component {}", component));
122+
thread_builder.spawn(worker)?;
84123
Ok(tx)
85124
}
86125

87-
#[cfg(feature = "rayon")]
88-
fn spawn<F>(_component: usize, func: F) -> io::Result<()>
89-
where
90-
F: FnOnce() + Send + 'static,
91-
{
92-
rayon::spawn(func);
93-
Ok(())
126+
127+
fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> {
128+
rx.recv().expect("jpeg-decoder worker thread error")
94129
}
95130

96-
#[cfg(not(feature = "rayon"))]
97-
fn spawn<F>(component: usize, func: F) -> io::Result<()>
98-
where
99-
F: FnOnce() + Send + 'static,
100-
{
101-
let thread_builder =
102-
std::thread::Builder::new().name(format!("worker thread for component {}", component));
103-
thread_builder.spawn(func)?;
104-
Ok(())
131+
#[allow(dead_code)]
132+
fn enter_threads<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
133+
let mut worker = StdThreadWorker(MpscWorker::default());
134+
f(&mut worker)
105135
}
136+
137+
138+
#[cfg(feature = "rayon")]
139+
mod jpeg_rayon {
140+
use crate::error::Result;
141+
use super::{MpscWorker, RowData};
142+
143+
pub struct Scoped<'r, 'scope> {
144+
fifo: &'r rayon::ScopeFifo<'scope>,
145+
inner: MpscWorker,
146+
}
147+
148+
pub fn enter<T>(f: impl FnOnce(Scoped) -> T) -> T {
149+
// Note: Must be at least two threads. Otherwise, we may deadlock, due to ordering
150+
// constraints that we can not impose properly. Note that `append_row` creates a new task
151+
// while in `get_result` we wait for all tasks of a component. The only way for rayon to
152+
// impose this wait __and get a result__ is by ending an in_place_scope.
153+
//
154+
// However, the ordering of tasks is not as FIFO as the name would suggest. Indeed, even
155+
// though tasks are spawned in `start` _before_ the task spawned in `get_result`, the
156+
// `in_place_scope_fifo` will wait for ITS OWN results in fifo order. This implies, unless
157+
// there is some other thread capable of stealing the worker the work task will in fact not
158+
// get executed and the result will wait forever. It is impossible to otherwise schedule
159+
// the worker tasks specifically (e.g. join handle would be cool *cough* if you read this
160+
// and work on rayon) before while yielding from the current thread.
161+
//
162+
// So: we need at least one more worker thread that is _not_ occupied.
163+
let threads = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
164+
165+
threads.in_place_scope_fifo(|fifo| {
166+
f(Scoped { fifo, inner: MpscWorker::default() })
167+
})
168+
}
169+
170+
impl super::Worker for Scoped<'_, '_> {
171+
fn start(&mut self, row_data: RowData) -> Result<()> {
172+
let fifo = &mut self.fifo;
173+
self.inner.start_with(row_data, |_| {
174+
let (tx, worker) = super::create_worker();
175+
fifo.spawn_fifo(move |_| {
176+
worker()
177+
});
178+
Ok(tx)
179+
})
180+
}
181+
182+
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
183+
self.inner.append_row(row)
184+
}
185+
186+
fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
187+
self.inner.get_result_with(index, |rx| {
188+
let mut result = vec![];
189+
let deliver_result = &mut result;
190+
191+
rayon::in_place_scope_fifo(|scope| {
192+
scope.spawn_fifo(move |_| {
193+
*deliver_result = rx.recv().expect("jpeg-decoder worker thread error");
194+
});
195+
});
196+
197+
result
198+
})
199+
}
200+
}
201+
}
202+

0 commit comments

Comments
 (0)