Skip to content

Commit 71e2a73

Browse files
committed
add decompressed gzip stream support, tests
and a bit of refactoring adding the bytes
1 parent 1c2f1d0 commit 71e2a73

File tree

6 files changed

+153
-27
lines changed

6 files changed

+153
-27
lines changed

src/stream/flate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for DecompressedStream<S> {
9494
if *this.flushing {
9595
return Poll::Ready(None);
9696
} else if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
97-
*this.input_buffer = bytes?;
97+
this.input_buffer.extend_from_slice(&bytes?);
9898
} else {
9999
*this.flushing = true;
100100
}

src/stream/gzip.rs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ use core::{
22
pin::Pin,
33
task::{Context, Poll},
44
};
5-
use std::io::Result;
5+
use std::io::{Error, ErrorKind, Result};
66

77
use bytes::{Bytes, BytesMut};
88
pub use flate2::Compression;
9-
use flate2::{Compress, Crc, Decompress, FlushCompress};
9+
use flate2::{Compress, Crc, Decompress, FlushCompress, FlushDecompress};
1010
use futures::{ready, stream::Stream};
1111
use pin_project::unsafe_project;
1212

@@ -42,15 +42,15 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for GzipStream<S> {
4242
if *this.flushing {
4343
if !*this.footer_appended {
4444
let mut footer = Bytes::from(&this.crc.sum().to_le_bytes()[..]);
45-
let length_read = &this.crc.amount().to_le_bytes()[..];
46-
footer.extend_from_slice(length_read);
45+
let bytes_read = &this.crc.amount().to_le_bytes()[..];
46+
footer.extend_from_slice(bytes_read);
4747
*this.footer_appended = true;
4848
return Poll::Ready(Some(Ok(footer)));
4949
} else {
5050
return Poll::Ready(None);
5151
}
5252
} else if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
53-
*this.input_buffer = bytes?;
53+
this.input_buffer.extend_from_slice(&bytes?);
5454
} else {
5555
*this.flushing = true;
5656
}
@@ -121,7 +121,6 @@ pub struct DecompressedGzipStream<S: Stream<Item = Result<Bytes>>> {
121121
output_buffer: BytesMut,
122122
crc: Crc,
123123
header_stripped: bool,
124-
footer_stripped: bool,
125124
decompress: Decompress,
126125
}
127126

@@ -133,17 +132,63 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for DecompressedGzipStream<S> {
133132

134133
let this = self.project();
135134

136-
if this.input_buffer.is_empty() {
135+
if this.input_buffer.len() <= 8 {
137136
if *this.flushing {
137+
// check crc and len in the footer
138+
let crc = &this.crc.sum().to_le_bytes()[..];
139+
let bytes_read = &this.crc.amount().to_le_bytes()[..];
140+
if crc != this.input_buffer.slice(0, 4) {
141+
return Poll::Ready(Some(Err(Error::new(
142+
ErrorKind::InvalidData,
143+
"CRC computed does not match",
144+
))));
145+
} else if bytes_read != this.input_buffer.slice(4, 8) {
146+
return Poll::Ready(Some(Err(Error::new(
147+
ErrorKind::InvalidData,
148+
"amount of bytes read does not match",
149+
))));
150+
}
138151
return Poll::Ready(None);
139152
} else if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
140-
*this.input_buffer = bytes?;
153+
this.input_buffer.extend_from_slice(&bytes?);
141154
} else {
142155
*this.flushing = true;
143156
}
144157
}
145158

146-
unimplemented!()
159+
if !*this.header_stripped {
160+
let header = this.input_buffer.split_to(10);
161+
if header[0..3] != [0x1f, 0x8b, 0x08] {
162+
return Poll::Ready(Some(Err(Error::new(
163+
ErrorKind::InvalidData,
164+
"Invalid file header",
165+
))));
166+
}
167+
*this.header_stripped = true;
168+
return Poll::Ready(Some(Ok(Bytes::new())));
169+
}
170+
171+
this.output_buffer.resize(OUTPUT_BUFFER_SIZE, 0);
172+
173+
let flush = if *this.flushing {
174+
FlushDecompress::Finish
175+
} else {
176+
FlushDecompress::None
177+
};
178+
179+
let (prior_in, prior_out) = (this.decompress.total_in(), this.decompress.total_out());
180+
this.decompress
181+
.decompress(this.input_buffer, this.output_buffer, flush)?;
182+
let input = this.decompress.total_in() - prior_in;
183+
let output = this.decompress.total_out() - prior_out;
184+
185+
this.crc.update(&this.output_buffer[..output as usize]);
186+
this.input_buffer.advance(input as usize);
187+
188+
Poll::Ready(Some(Ok(this
189+
.output_buffer
190+
.split_to(output as usize)
191+
.freeze())))
147192
}
148193
}
149194

@@ -156,7 +201,6 @@ impl<S: Stream<Item = Result<Bytes>>> DecompressedGzipStream<S> {
156201
output_buffer: BytesMut::new(),
157202
crc: Crc::new(),
158203
header_stripped: false,
159-
footer_stripped: false,
160204
decompress: Decompress::new(false),
161205
}
162206
}

tests/brotli.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use brotli2::bufread::BrotliDecoder;
22
use bytes::Bytes;
33
use futures::{
44
executor::block_on,
5-
//io::AsyncReadExt,
65
stream::{self, StreamExt},
76
};
87
use std::io::{self, Read};
@@ -27,18 +26,19 @@ fn brotli_stream() {
2726
assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
2827
}
2928

30-
//#[test]
31-
//fn brotli_read() {
32-
// use async_compression::read::brotli;
33-
//
34-
// let input = &[1, 2, 3, 4, 5, 6];
35-
// let compress = brotli::Compress::new();
36-
// let mut compressed = brotli::compress_read(&input[..], compress);
37-
// let mut data = vec![];
38-
// block_on(compressed.read_to_end(&mut data)).unwrap();
39-
// let mut output = vec![];
40-
// BrotliDecoder::new(&data[..])
41-
// .read_to_end(&mut output)
42-
// .unwrap();
43-
// assert_eq!(output, input);
44-
//}
29+
#[test]
30+
fn decompressed_brotli_stream() {
31+
use async_compression::stream::brotli;
32+
33+
let stream = stream::iter(vec![
34+
Bytes::from_static(&[1, 2, 3]),
35+
Bytes::from_static(&[4, 5, 6]),
36+
]);
37+
let compress = brotli::Compress::new();
38+
let compressed = brotli::BrotliStream::new(stream.map(Ok), compress);
39+
let decompressed = brotli::DecompressedBrotliStream::new(compressed);
40+
let data: Vec<_> = block_on(decompressed.collect());
41+
let data: io::Result<Vec<_>> = data.into_iter().collect();
42+
let data: Vec<u8> = data.unwrap().into_iter().flatten().collect();
43+
assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
44+
}

tests/deflate.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,23 @@ fn deflate_stream() {
2626
assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
2727
}
2828

29+
#[test]
30+
fn decompressed_deflate_stream() {
31+
use async_compression::stream::deflate;
32+
33+
let stream = stream::iter(vec![
34+
Bytes::from_static(&[1, 2, 3]),
35+
Bytes::from_static(&[4, 5, 6]),
36+
]);
37+
let compressed = deflate::DeflateStream::new(stream.map(Ok), deflate::Compression::default());
38+
let decompressed = deflate::DecompressedDeflateStream::new(compressed);
39+
let data: Vec<_> = block_on(decompressed.collect());
40+
let data: io::Result<Vec<_>> = data.into_iter().collect();
41+
let data: Vec<u8> = data.unwrap().into_iter().flatten().collect();
42+
43+
assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
44+
}
45+
2946
#[test]
3047
fn deflate_read() {
3148
use async_compression::read::deflate;

tests/gzip.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,51 @@ fn gzip_stream() {
2222
GzDecoder::new(&data[..]).read_to_end(&mut output).unwrap();
2323
assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
2424
}
25+
26+
#[test]
27+
fn decompressed_gzip_stream_1() {
28+
use async_compression::stream::gzip;
29+
30+
let stream = stream::iter(vec![
31+
Bytes::from_static(&[1, 2, 3]),
32+
Bytes::from_static(&[4, 5, 6]),
33+
]);
34+
let compressed = gzip::GzipStream::new(stream.map(Ok), gzip::Compression::default());
35+
let decompressed = gzip::DecompressedGzipStream::new(compressed);
36+
let data: Vec<_> = block_on(decompressed.collect());
37+
let data: io::Result<Vec<_>> = data.into_iter().collect();
38+
let data: Vec<u8> = data.unwrap().into_iter().flatten().collect();
39+
assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
40+
}
41+
42+
#[test]
43+
fn decompressed_gzip_stream_2() {
44+
use async_compression::stream::gzip;
45+
46+
let stream = stream::iter(vec![
47+
Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
48+
Bytes::from_static(&[11, 12, 13, 14, 15]),
49+
]);
50+
let compressed = gzip::GzipStream::new(stream.map(Ok), gzip::Compression::default());
51+
let decompressed = gzip::DecompressedGzipStream::new(compressed);
52+
let data: Vec<_> = block_on(decompressed.collect());
53+
let data: io::Result<Vec<_>> = data.into_iter().collect();
54+
let data: Vec<u8> = data.unwrap().into_iter().flatten().collect();
55+
assert_eq!(
56+
data,
57+
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
58+
);
59+
}
60+
61+
#[test]
62+
fn decompressed_gzip_stream_3() {
63+
use async_compression::stream::gzip;
64+
65+
let stream = stream::iter(vec![Bytes::from_static(&[])]);
66+
let compressed = gzip::GzipStream::new(stream.map(Ok), gzip::Compression::default());
67+
let decompressed = gzip::DecompressedGzipStream::new(compressed);
68+
let data: Vec<_> = block_on(decompressed.collect());
69+
let data: io::Result<Vec<_>> = data.into_iter().collect();
70+
let data: Vec<u8> = data.unwrap().into_iter().flatten().collect();
71+
assert_eq!(data, vec![]);
72+
}

tests/zlib.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,23 @@ fn zlib_stream() {
2626
assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
2727
}
2828

29+
#[test]
30+
fn decompressed_zlib_stream() {
31+
use async_compression::stream::zlib;
32+
33+
let stream = stream::iter(vec![
34+
Bytes::from_static(&[1, 2, 3]),
35+
Bytes::from_static(&[4, 5, 6]),
36+
]);
37+
let compressed = zlib::ZlibStream::new(stream.map(Ok), zlib::Compression::default());
38+
let decompressed = zlib::DecompressedZlibStream::new(compressed);
39+
let data: Vec<_> = block_on(decompressed.collect());
40+
let data: io::Result<Vec<_>> = data.into_iter().collect();
41+
let data: Vec<u8> = data.unwrap().into_iter().flatten().collect();
42+
43+
assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
44+
}
45+
2946
#[test]
3047
fn zlib_read() {
3148
use async_compression::read::zlib;

0 commit comments

Comments
 (0)