Skip to content

Commit 47bdfe2

Browse files
authored
Merge pull request #10 from fairingrey/decompression
Decompressed Stream Support
2 parents 0ff6229 + 01cd407 commit 47bdfe2

File tree

9 files changed

+601
-34
lines changed

9 files changed

+601
-34
lines changed

src/stream/brotli.rs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use core::{
22
pin::Pin,
33
task::{Context, Poll},
44
};
5-
use std::io::Result;
5+
use std::io::{Error, ErrorKind, Result};
66

7-
use brotli2::raw::{CoStatus, CompressOp};
7+
use brotli2::raw::{CoStatus, CompressOp, DeStatus, Decompress};
88
pub use brotli2::{raw::Compress, CompressParams};
99
use bytes::{Bytes, BytesMut};
1010
use futures::{ready, stream::Stream};
@@ -72,3 +72,67 @@ impl<S: Stream<Item = Result<Bytes>>> BrotliStream<S> {
7272
}
7373
}
7474
}
75+
76+
#[unsafe_project(Unpin)]
77+
pub struct DecompressedBrotliStream<S: Stream<Item = Result<Bytes>>> {
78+
#[pin]
79+
inner: S,
80+
flush: bool,
81+
decompress: Decompress,
82+
}
83+
84+
impl<S: Stream<Item = Result<Bytes>>> Stream for DecompressedBrotliStream<S> {
85+
type Item = Result<Bytes>;
86+
87+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
88+
const OUTPUT_BUFFER_SIZE: usize = 8_000;
89+
90+
let this = self.project();
91+
92+
if *this.flush {
93+
return Poll::Ready(None);
94+
}
95+
96+
let input_buffer = if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
97+
bytes?
98+
} else {
99+
*this.flush = true;
100+
Bytes::new()
101+
};
102+
103+
let mut decompressed_output = BytesMut::with_capacity(OUTPUT_BUFFER_SIZE);
104+
let input_ref = &mut input_buffer.as_ref();
105+
let output_ref = &mut &mut [][..];
106+
loop {
107+
let status = this.decompress.decompress(input_ref, output_ref)?;
108+
while let Some(buf) = this.decompress.take_output(None) {
109+
decompressed_output.extend_from_slice(buf);
110+
}
111+
match status {
112+
DeStatus::Finished => break,
113+
DeStatus::NeedInput => {
114+
if *this.flush {
115+
return Poll::Ready(Some(Err(Error::new(
116+
ErrorKind::UnexpectedEof,
117+
"reached unexpected EOF",
118+
))));
119+
}
120+
break;
121+
}
122+
DeStatus::NeedOutput => (),
123+
}
124+
}
125+
126+
Poll::Ready(Some(Ok(decompressed_output.freeze())))
127+
}
128+
}
129+
130+
impl<S: Stream<Item = Result<Bytes>>> DecompressedBrotliStream<S> {
131+
pub fn new(stream: S) -> DecompressedBrotliStream<S> {
132+
DecompressedBrotliStream {
133+
inner: stream,
134+
flush: false,
135+
decompress: Decompress::new(),
136+
}
137+
}
138+
}

src/stream/deflate.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ use core::{
55
use std::io::Result;
66
use std::marker::Unpin;
77

8-
use super::flate::CompressedStream;
8+
use super::flate::{CompressedStream, DecompressedStream};
99
use bytes::Bytes;
10-
use flate2::Compress;
1110
pub use flate2::Compression;
11+
use flate2::{Compress, Decompress};
1212
use futures::{stream::Stream, stream::StreamExt};
1313

1414
pub struct DeflateStream<S: Stream<Item = Result<Bytes>> + Unpin> {
@@ -30,3 +30,23 @@ impl<S: Stream<Item = Result<Bytes>> + Unpin> DeflateStream<S> {
3030
}
3131
}
3232
}
33+
34+
pub struct DecompressedDeflateStream<S: Stream<Item = Result<Bytes>> + Unpin> {
35+
inner: DecompressedStream<S>,
36+
}
37+
38+
impl<S: Stream<Item = Result<Bytes>> + Unpin> Stream for DecompressedDeflateStream<S> {
39+
type Item = Result<Bytes>;
40+
41+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
42+
self.inner.poll_next_unpin(cx)
43+
}
44+
}
45+
46+
impl<S: Stream<Item = Result<Bytes>> + Unpin> DecompressedDeflateStream<S> {
47+
pub fn new(stream: S) -> DecompressedDeflateStream<S> {
48+
DecompressedDeflateStream {
49+
inner: DecompressedStream::new(stream, Decompress::new(false)),
50+
}
51+
}
52+
}

src/stream/flate.rs

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use std::{
66
};
77

88
use bytes::{Bytes, BytesMut};
9-
pub(crate) use flate2::Compress;
10-
use flate2::{FlushCompress, Status};
9+
pub(crate) use flate2::{Compress, Decompress};
10+
use flate2::{FlushCompress, FlushDecompress, Status};
1111
use futures::{ready, stream::Stream};
1212
use pin_project::unsafe_project;
1313

@@ -125,3 +125,109 @@ impl<S: Stream<Item = Result<Bytes>>> CompressedStream<S> {
125125
}
126126
}
127127
}
128+
129+
#[unsafe_project(Unpin)]
130+
pub(crate) struct DecompressedStream<S: Stream<Item = Result<Bytes>>> {
131+
#[pin]
132+
inner: S,
133+
state: State,
134+
output: BytesMut,
135+
decompress: Decompress,
136+
}
137+
138+
impl<S: Stream<Item = Result<Bytes>>> Stream for DecompressedStream<S> {
139+
type Item = Result<Bytes>;
140+
141+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
142+
let mut this = self.project();
143+
144+
fn decompress(
145+
decompress: &mut Decompress,
146+
input: &mut Bytes,
147+
output: &mut BytesMut,
148+
flush: FlushDecompress,
149+
) -> Result<(Status, Bytes)> {
150+
const OUTPUT_BUFFER_SIZE: usize = 8_000;
151+
152+
if output.len() < OUTPUT_BUFFER_SIZE {
153+
output.resize(OUTPUT_BUFFER_SIZE, 0);
154+
}
155+
156+
let (prior_in, prior_out) = (decompress.total_in(), decompress.total_out());
157+
let status = decompress.decompress(input, output, flush)?;
158+
let input_len = decompress.total_in() - prior_in;
159+
let output_len = decompress.total_out() - prior_out;
160+
161+
input.advance(input_len as usize);
162+
Ok((status, output.split_to(output_len as usize).freeze()))
163+
}
164+
165+
#[allow(clippy::never_loop)] // https://github.com/rust-lang/rust-clippy/issues/4058
166+
loop {
167+
break match mem::replace(this.state, State::Invalid) {
168+
State::Reading => {
169+
*this.state = State::Reading;
170+
*this.state = match ready!(this.inner.as_mut().poll_next(cx)) {
171+
Some(chunk) => State::Writing(chunk?),
172+
None => State::Flushing,
173+
};
174+
continue;
175+
}
176+
177+
State::Writing(mut input) => {
178+
if input.is_empty() {
179+
*this.state = State::Reading;
180+
continue;
181+
}
182+
183+
let (status, chunk) = decompress(
184+
&mut this.decompress,
185+
&mut input,
186+
&mut this.output,
187+
FlushDecompress::None,
188+
)?;
189+
190+
*this.state = match status {
191+
Status::Ok => State::Writing(input),
192+
Status::StreamEnd => State::Reading,
193+
Status::BufError => panic!("unexpected BufError"),
194+
};
195+
196+
Poll::Ready(Some(Ok(chunk)))
197+
}
198+
199+
State::Flushing => {
200+
let (status, chunk) = decompress(
201+
&mut this.decompress,
202+
&mut Bytes::new(),
203+
&mut this.output,
204+
FlushDecompress::Finish,
205+
)?;
206+
207+
*this.state = match status {
208+
Status::Ok => State::Flushing,
209+
Status::StreamEnd => State::Done,
210+
Status::BufError => panic!("unexpected BufError"),
211+
};
212+
213+
Poll::Ready(Some(Ok(chunk)))
214+
}
215+
216+
State::Done => Poll::Ready(None),
217+
218+
State::Invalid => panic!("CompressedStream reached invalid state"),
219+
};
220+
}
221+
}
222+
}
223+
224+
impl<S: Stream<Item = Result<Bytes>>> DecompressedStream<S> {
225+
pub(crate) fn new(stream: S, decompress: Decompress) -> DecompressedStream<S> {
226+
DecompressedStream {
227+
inner: stream,
228+
state: State::Reading,
229+
output: BytesMut::new(),
230+
decompress,
231+
}
232+
}
233+
}

0 commit comments

Comments
 (0)