Skip to content

Commit 66f31fd

Browse files
authored
Merge pull request #230 from image-rs/rayon-blocking
Restructure multithreading
2 parents 222c264 + ac9f2e2 commit 66f31fd

File tree

11 files changed

+401
-72
lines changed

11 files changed

+401
-72
lines changed

.github/workflows/rust.yml

+1
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ jobs:
4949
steps:
5050
- name: Installing emulator and linker
5151
run: |
52+
sudo apt-get update
5253
sudo apt-get install qemu binfmt-support qemu-user-static gcc-aarch64-linux-gnu binutils-aarch64-linux-gnu
5354
5455
- name: Installing Rust toolchain

Cargo.toml

+21-4
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,13 @@ png = "0.16"
1919
walkdir = "2.0"
2020
criterion = "0.3"
2121

22+
[features]
23+
default = ["rayon"]
24+
platform_independent = []
25+
nightly_aarch64_neon = []
26+
27+
## Internal development configuration: testing and benchmarking
28+
2229
[[bench]]
2330
name = "decoding_benchmark"
2431
harness = false
@@ -27,8 +34,18 @@ harness = false
2734
name = "large_image"
2835
harness = false
2936

30-
[features]
31-
default = ["rayon"]
32-
platform_independent = []
33-
nightly_aarch64_neon = []
37+
[[test]]
38+
name = "rayon"
39+
required-features = ["rayon"]
40+
41+
[[test]]
42+
name = "rayon-0"
43+
required-features = ["rayon"]
44+
45+
[[test]]
46+
name = "rayon-1"
47+
required-features = ["rayon"]
3448

49+
[[test]]
50+
name = "rayon-2"
51+
required-features = ["rayon"]

src/decoder.rs

+25-20
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ use crate::parser::{
1717
IccChunk, ScanInfo,
1818
};
1919
use crate::upsampler::Upsampler;
20-
use crate::worker::{PlatformWorker, RowData, Worker};
20+
use crate::worker::{PreferWorkerKind, RowData, Worker, with_worker};
2121

2222
pub const MAX_COMPONENTS: usize = 4;
2323

@@ -191,7 +191,9 @@ impl<R: Read> Decoder<R> {
191191
///
192192
/// If successful, the metadata can be obtained using the `info` method.
193193
pub fn read_info(&mut self) -> Result<()> {
194-
self.decode_internal(true).map(|_| ())
194+
with_worker(PreferWorkerKind::Multithreaded, |worker| {
195+
self.decode_internal(true, worker)
196+
}).map(|_| ())
195197
}
196198

197199
/// Configure the decoder to scale the image during decoding.
@@ -219,10 +221,16 @@ impl<R: Read> Decoder<R> {
219221

220222
/// Decodes the image and returns the decoded pixels if successful.
221223
pub fn decode(&mut self) -> Result<Vec<u8>> {
222-
self.decode_internal(false)
224+
with_worker(PreferWorkerKind::Multithreaded, |worker| {
225+
self.decode_internal(false, worker)
226+
})
223227
}
224228

225-
fn decode_internal(&mut self, stop_after_metadata: bool) -> Result<Vec<u8>> {
229+
fn decode_internal(
230+
&mut self,
231+
stop_after_metadata: bool,
232+
worker: &mut dyn Worker,
233+
) -> Result<Vec<u8>> {
226234
if stop_after_metadata && self.frame.is_some() {
227235
// The metadata has already been read.
228236
return Ok(Vec::new());
@@ -237,7 +245,6 @@ impl<R: Read> Decoder<R> {
237245

238246
let mut previous_marker = Marker::SOI;
239247
let mut pending_marker = None;
240-
let mut worker = None;
241248
let mut scans_processed = 0;
242249
let mut planes = vec![
243250
Vec::<u8>::new();
@@ -318,9 +325,6 @@ impl<R: Read> Decoder<R> {
318325
if self.frame.is_none() {
319326
return Err(Error::Format("scan encountered before frame".to_owned()));
320327
}
321-
if worker.is_none() {
322-
worker = Some(PlatformWorker::new()?);
323-
}
324328

325329
let frame = self.frame.clone().unwrap();
326330
let scan = parse_sos(&mut self.reader, &frame)?;
@@ -383,7 +387,7 @@ impl<R: Read> Decoder<R> {
383387
}
384388

385389
let (marker, data) =
386-
self.decode_scan(&frame, &scan, worker.as_mut().unwrap(), &finished)?;
390+
self.decode_scan(&frame, &scan, worker, &finished)?;
387391

388392
if let Some(data) = data {
389393
for (i, plane) in data
@@ -545,10 +549,6 @@ impl<R: Read> Decoder<R> {
545549
};
546550

547551
// Get the worker prepared
548-
if worker.is_none() {
549-
worker = Some(PlatformWorker::new()?);
550-
}
551-
let worker = worker.as_mut().unwrap();
552552
let row_data = RowData {
553553
index: i,
554554
component: component.clone(),
@@ -560,14 +560,17 @@ impl<R: Read> Decoder<R> {
560560
let coefficients_per_mcu_row = usize::from(component.block_size.width)
561561
* usize::from(component.vertical_sampling_factor)
562562
* 64;
563-
for mcu_y in 0..frame.mcu_size.height {
564-
let row_coefficients = {
563+
564+
let mut tasks = (0..frame.mcu_size.height)
565+
.map(|mcu_y| {
565566
let offset = usize::from(mcu_y) * coefficients_per_mcu_row;
566-
self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec()
567-
};
567+
let row_coefficients = self.coefficients[i][offset..offset + coefficients_per_mcu_row].to_vec();
568+
(i, row_coefficients)
569+
});
568570

569-
worker.append_row((i, row_coefficients))?;
570-
}
571+
// FIXME: additional potential work stealing opportunities for rayon case if we
572+
// also internally can parallelize over components.
573+
worker.append_rows(&mut tasks)?;
571574
planes[i] = worker.get_result(i)?;
572575
}
573576
}
@@ -616,7 +619,7 @@ impl<R: Read> Decoder<R> {
616619
&mut self,
617620
frame: &FrameInfo,
618621
scan: &ScanInfo,
619-
worker: &mut PlatformWorker,
622+
worker: &mut dyn Worker,
620623
finished: &[bool; MAX_COMPONENTS],
621624
) -> Result<(Option<Marker>, Option<Vec<Vec<u8>>>)> {
622625
assert!(scan.component_indices.len() <= MAX_COMPONENTS);
@@ -871,6 +874,8 @@ impl<R: Read> Decoder<R> {
871874
)
872875
};
873876

877+
// FIXME: additional potential work stealing opportunities for rayon case if we
878+
// also internally can parallelize over components.
874879
worker.append_row((i, row_coefficients))?;
875880
}
876881
}

src/worker/immediate.rs

+8-3
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,11 @@ pub struct ImmediateWorker {
1616
quantization_tables: Vec<Option<Arc<[u16; 64]>>>,
1717
}
1818

19+
pub fn with_immediate<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
20+
let mut worker = ImmediateWorker::new_immediate();
21+
f(&mut worker)
22+
}
23+
1924
impl ImmediateWorker {
2025
pub fn new_immediate() -> ImmediateWorker {
2126
ImmediateWorker {
@@ -25,6 +30,7 @@ impl ImmediateWorker {
2530
quantization_tables: vec![None; MAX_COMPONENTS],
2631
}
2732
}
33+
2834
pub fn start_immediate(&mut self, data: RowData) {
2935
assert!(self.results[data.index].is_empty());
3036

@@ -33,6 +39,7 @@ impl ImmediateWorker {
3339
self.components[data.index] = Some(data.component);
3440
self.quantization_tables[data.index] = Some(data.quantization_table);
3541
}
42+
3643
pub fn append_row_immediate(&mut self, (index, data): (usize, Vec<i16>)) {
3744
// Convert coefficients from a MCU row to samples.
3845

@@ -55,15 +62,13 @@ impl ImmediateWorker {
5562

5663
self.offsets[index] += block_count * component.dct_scale * component.dct_scale;
5764
}
65+
5866
pub fn get_result_immediate(&mut self, index: usize) -> Vec<u8> {
5967
mem::take(&mut self.results[index])
6068
}
6169
}
6270

6371
impl Worker for ImmediateWorker {
64-
fn new() -> Result<Self> {
65-
Ok(ImmediateWorker::new_immediate())
66-
}
6772
fn start(&mut self, data: RowData) -> Result<()> {
6873
self.start_immediate(data);
6974
Ok(())

src/worker/mod.rs

+29-7
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,7 @@
11
mod immediate;
22
mod multithreaded;
3-
4-
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
5-
pub use self::multithreaded::MultiThreadedWorker as PlatformWorker;
6-
#[cfg(any(target_arch = "asmjs", target_arch = "wasm32"))]
7-
pub use self::immediate::ImmediateWorker as PlatformWorker;
3+
#[cfg(feature = "rayon")]
4+
mod rayon;
85

96
use alloc::sync::Arc;
107
use alloc::vec::Vec;
@@ -17,9 +14,34 @@ pub struct RowData {
1714
pub quantization_table: Arc<[u16; 64]>,
1815
}
1916

20-
pub trait Worker: Sized {
21-
fn new() -> Result<Self>;
17+
pub trait Worker {
2218
fn start(&mut self, row_data: RowData) -> Result<()>;
2319
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()>;
2420
fn get_result(&mut self, index: usize) -> Result<Vec<u8>>;
21+
/// Default implementation for spawning multiple tasks.
22+
fn append_rows(&mut self, row: &mut dyn Iterator<Item=(usize, Vec<i16>)>)
23+
-> Result<()>
24+
{
25+
for item in row {
26+
self.append_row(item)?;
27+
}
28+
Ok(())
29+
}
30+
}
31+
32+
pub enum PreferWorkerKind {
33+
Immediate,
34+
Multithreaded,
35+
}
36+
37+
/// Execute something with a worker system.
38+
pub fn with_worker<T>(prefer: PreferWorkerKind, f: impl FnOnce(&mut dyn Worker) -> T) -> T {
39+
match prefer {
40+
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
41+
#[cfg(feature = "rayon")]
42+
PreferWorkerKind::Multithreaded => self::rayon::with_rayon(f),
43+
#[cfg(not(any(target_arch = "asmjs", target_arch = "wasm32")))]
44+
PreferWorkerKind::Multithreaded => self::multithreaded::with_multithreading(f),
45+
_ => self::immediate::with_immediate(f),
46+
}
2547
}

0 commit comments

Comments
 (0)