Skip to content

Commit 3b0ede4

Browse files
authored
Replace lz4 with lz4_flex Allowing Compilation for WASM (#4884)
* Use lz4_flex
* Fix features
* Install clang for zlib
* Update arrow-ipc
* Fix CI
* Use LZ4F
* Support LZ4F fallback
* Restore support for LZ4F compressed CSV
* Clippy
* Fix features
* Add benchmark
* Additional system dependencies
1 parent 8c495b6 commit 3b0ede4

File tree

8 files changed

+191
-70
lines changed

8 files changed

+191
-70
lines changed

.github/workflows/parquet.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,12 @@ jobs:
123123
uses: ./.github/actions/setup-builder
124124
with:
125125
target: wasm32-unknown-unknown,wasm32-wasi
126+
- name: Install clang # Needed for zlib compilation
127+
run: apt-get update && apt-get install -y clang gcc-multilib
126128
- name: Build wasm32-unknown-unknown
127-
run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-unknown-unknown
129+
run: cargo build -p parquet --target wasm32-unknown-unknown
128130
- name: Build wasm32-wasi
129-
run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-wasi
131+
run: cargo build -p parquet --target wasm32-wasi
130132

131133
pyspark-integration-test:
132134
name: PySpark Integration Test

arrow-integration-test/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ impl ArrowJson {
183183
return Ok(false);
184184
}
185185
}
186-
_ => return Ok(false),
186+
Some(Err(e)) => return Err(e),
187+
None => return Ok(false),
187188
}
188189
}
189190

arrow-ipc/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,12 @@ arrow-cast = { workspace = true }
4040
arrow-data = { workspace = true }
4141
arrow-schema = { workspace = true }
4242
flatbuffers = { version = "23.1.21", default-features = false }
43-
lz4 = { version = "1.23", default-features = false, optional = true }
43+
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
4444
zstd = { version = "0.12.0", default-features = false, optional = true }
4545

46+
[features]
47+
default = []
48+
lz4 = ["lz4_flex"]
49+
4650
[dev-dependencies]
4751
tempfile = "3.3"

arrow-ipc/src/compression.rs

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,15 @@ impl CompressionCodec {
103103
} else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
104104
// no compression
105105
input.slice(LENGTH_OF_PREFIX_DATA as usize)
106-
} else {
106+
} else if let Ok(decompressed_length) = usize::try_from(decompressed_length) {
107107
// decompress data using the codec
108-
let mut uncompressed_buffer =
109-
Vec::with_capacity(decompressed_length as usize);
110108
let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
111-
self.decompress(input_data, &mut uncompressed_buffer)?;
112-
Buffer::from(uncompressed_buffer)
109+
self.decompress(input_data, decompressed_length as _)?
110+
.into()
111+
} else {
112+
return Err(ArrowError::IpcError(format!(
113+
"Invalid uncompressed length: {decompressed_length}"
114+
)));
113115
};
114116
Ok(buffer)
115117
}
@@ -128,21 +130,30 @@ impl CompressionCodec {
128130
fn decompress(
129131
&self,
130132
input: &[u8],
131-
output: &mut Vec<u8>,
132-
) -> Result<usize, ArrowError> {
133-
match self {
134-
CompressionCodec::Lz4Frame => decompress_lz4(input, output),
135-
CompressionCodec::Zstd => decompress_zstd(input, output),
133+
decompressed_size: usize,
134+
) -> Result<Vec<u8>, ArrowError> {
135+
let ret = match self {
136+
CompressionCodec::Lz4Frame => decompress_lz4(input, decompressed_size)?,
137+
CompressionCodec::Zstd => decompress_zstd(input, decompressed_size)?,
138+
};
139+
if ret.len() != decompressed_size {
140+
return Err(ArrowError::IpcError(format!(
141+
"Expected compressed length of {decompressed_size} got {}",
142+
ret.len()
143+
)));
136144
}
145+
Ok(ret)
137146
}
138147
}
139148

140149
#[cfg(feature = "lz4")]
141150
fn compress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
142151
use std::io::Write;
143-
let mut encoder = lz4::EncoderBuilder::new().build(output)?;
152+
let mut encoder = lz4_flex::frame::FrameEncoder::new(output);
144153
encoder.write_all(input)?;
145-
encoder.finish().1?;
154+
encoder
155+
.finish()
156+
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
146157
Ok(())
147158
}
148159

@@ -155,14 +166,19 @@ fn compress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError>
155166
}
156167

157168
#[cfg(feature = "lz4")]
158-
fn decompress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
169+
fn decompress_lz4(input: &[u8], decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
159170
use std::io::Read;
160-
Ok(lz4::Decoder::new(input)?.read_to_end(output)?)
171+
let mut output = Vec::with_capacity(decompressed_size);
172+
lz4_flex::frame::FrameDecoder::new(input).read_to_end(&mut output)?;
173+
Ok(output)
161174
}
162175

163176
#[cfg(not(feature = "lz4"))]
164177
#[allow(clippy::ptr_arg)]
165-
fn decompress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
178+
fn decompress_lz4(
179+
_input: &[u8],
180+
_decompressed_size: usize,
181+
) -> Result<Vec<u8>, ArrowError> {
166182
Err(ArrowError::InvalidArgumentError(
167183
"lz4 IPC decompression requires the lz4 feature".to_string(),
168184
))
@@ -186,14 +202,22 @@ fn compress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError>
186202
}
187203

188204
#[cfg(feature = "zstd")]
189-
fn decompress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
205+
fn decompress_zstd(
206+
input: &[u8],
207+
decompressed_size: usize,
208+
) -> Result<Vec<u8>, ArrowError> {
190209
use std::io::Read;
191-
Ok(zstd::Decoder::new(input)?.read_to_end(output)?)
210+
let mut output = Vec::with_capacity(decompressed_size);
211+
zstd::Decoder::with_buffer(input)?.read_to_end(&mut output)?;
212+
Ok(output)
192213
}
193214

194215
#[cfg(not(feature = "zstd"))]
195216
#[allow(clippy::ptr_arg)]
196-
fn decompress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
217+
fn decompress_zstd(
218+
_input: &[u8],
219+
_decompressed_size: usize,
220+
) -> Result<Vec<u8>, ArrowError> {
197221
Err(ArrowError::InvalidArgumentError(
198222
"zstd IPC decompression requires the zstd feature".to_string(),
199223
))
@@ -216,28 +240,26 @@ mod tests {
216240
#[test]
217241
#[cfg(feature = "lz4")]
218242
fn test_lz4_compression() {
219-
let input_bytes = "hello lz4".as_bytes();
243+
let input_bytes = b"hello lz4";
220244
let codec = super::CompressionCodec::Lz4Frame;
221245
let mut output_bytes: Vec<u8> = Vec::new();
222246
codec.compress(input_bytes, &mut output_bytes).unwrap();
223-
let mut result_output_bytes: Vec<u8> = Vec::new();
224-
codec
225-
.decompress(output_bytes.as_slice(), &mut result_output_bytes)
247+
let result = codec
248+
.decompress(output_bytes.as_slice(), input_bytes.len())
226249
.unwrap();
227-
assert_eq!(input_bytes, result_output_bytes.as_slice());
250+
assert_eq!(input_bytes, result.as_slice());
228251
}
229252

230253
#[test]
231254
#[cfg(feature = "zstd")]
232255
fn test_zstd_compression() {
233-
let input_bytes = "hello zstd".as_bytes();
256+
let input_bytes = b"hello zstd";
234257
let codec = super::CompressionCodec::Zstd;
235258
let mut output_bytes: Vec<u8> = Vec::new();
236259
codec.compress(input_bytes, &mut output_bytes).unwrap();
237-
let mut result_output_bytes: Vec<u8> = Vec::new();
238-
codec
239-
.decompress(output_bytes.as_slice(), &mut result_output_bytes)
260+
let result = codec
261+
.decompress(output_bytes.as_slice(), input_bytes.len())
240262
.unwrap();
241-
assert_eq!(input_bytes, result_output_bytes.as_slice());
263+
assert_eq!(input_bytes, result.as_slice());
242264
}
243265
}

parquet/Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ thrift = { version = "0.17", default-features = false }
5151
snap = { version = "1.0", default-features = false, optional = true }
5252
brotli = { version = "3.3", default-features = false, features = ["std"], optional = true }
5353
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
54-
lz4 = { version = "1.23", default-features = false, optional = true }
54+
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
5555
zstd = { version = "0.12.0", optional = true, default-features = false }
5656
chrono = { workspace = true }
5757
num = { version = "0.4", default-features = false }
@@ -74,7 +74,7 @@ snap = { version = "1.0", default-features = false }
7474
tempfile = { version = "3.0", default-features = false }
7575
brotli = { version = "3.3", default-features = false, features = ["std"] }
7676
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] }
77-
lz4 = { version = "1.23", default-features = false }
77+
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] }
7878
zstd = { version = "0.12", default-features = false }
7979
serde_json = { version = "1.0", features = ["std"], default-features = false }
8080
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
@@ -86,6 +86,8 @@ all-features = true
8686

8787
[features]
8888
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
89+
# Enable lz4
90+
lz4 = ["lz4_flex"]
8991
# Enable arrow reader/writer APIs
9092
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
9193
# Enable CLI tools
@@ -166,5 +168,10 @@ name = "arrow_reader"
166168
required-features = ["arrow", "test_common", "experimental"]
167169
harness = false
168170

171+
[[bench]]
172+
name = "compression"
173+
required-features = ["experimental", "default"]
174+
harness = false
175+
169176
[lib]
170177
bench = false

parquet/benches/compression.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use criterion::*;
19+
use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
20+
use parquet::compression::create_codec;
21+
use rand::distributions::Alphanumeric;
22+
use rand::prelude::*;
23+
24+
fn do_bench(c: &mut Criterion, name: &str, uncompressed: &[u8]) {
25+
let codecs = [
26+
Compression::BROTLI(BrotliLevel::default()),
27+
Compression::GZIP(GzipLevel::default()),
28+
Compression::LZ4,
29+
Compression::LZ4_RAW,
30+
Compression::SNAPPY,
31+
Compression::GZIP(GzipLevel::default()),
32+
Compression::ZSTD(ZstdLevel::default()),
33+
];
34+
35+
for compression in codecs {
36+
let mut codec = create_codec(compression, &Default::default())
37+
.unwrap()
38+
.unwrap();
39+
40+
c.bench_function(&format!("compress {compression} - {name}"), |b| {
41+
b.iter(|| {
42+
let mut out = Vec::new();
43+
codec.compress(uncompressed, &mut out).unwrap();
44+
out
45+
});
46+
});
47+
48+
let mut compressed = Vec::new();
49+
codec.compress(uncompressed, &mut compressed).unwrap();
50+
println!(
51+
"{compression} compressed {} bytes of {name} to {} bytes",
52+
uncompressed.len(),
53+
compressed.len()
54+
);
55+
56+
c.bench_function(&format!("decompress {compression} - {name}"), |b| {
57+
b.iter(|| {
58+
let mut out = Vec::new();
59+
codec
60+
.decompress(
61+
black_box(&compressed),
62+
&mut out,
63+
Some(uncompressed.len()),
64+
)
65+
.unwrap();
66+
out
67+
});
68+
});
69+
}
70+
}
71+
72+
fn criterion_benchmark(c: &mut Criterion) {
73+
let mut rng = StdRng::seed_from_u64(42);
74+
let rng = &mut rng;
75+
const DATA_SIZE: usize = 1024 * 1024;
76+
77+
let uncompressed: Vec<_> = rng.sample_iter(&Alphanumeric).take(DATA_SIZE).collect();
78+
do_bench(c, "alphanumeric", &uncompressed);
79+
80+
// Create a collection of 64 words
81+
let words: Vec<Vec<_>> = (0..64)
82+
.map(|_| {
83+
let len = rng.gen_range(1..12);
84+
rng.sample_iter(&Alphanumeric).take(len).collect()
85+
})
86+
.collect();
87+
88+
// Build data by concatenating these words randomly together
89+
let mut uncompressed = Vec::with_capacity(DATA_SIZE);
90+
while uncompressed.len() < DATA_SIZE {
91+
let word = &words[rng.gen_range(0..words.len())];
92+
uncompressed
93+
.extend_from_slice(&word[..word.len().min(DATA_SIZE - uncompressed.len())])
94+
}
95+
assert_eq!(uncompressed.len(), DATA_SIZE);
96+
97+
do_bench(c, "words", &uncompressed);
98+
}
99+
100+
criterion_group!(benches, criterion_benchmark);
101+
criterion_main!(benches);

parquet/src/bin/parquet-fromcsv.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,9 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
386386
Compression::BROTLI(_) => {
387387
Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
388388
}
389-
Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e| {
390-
ParquetFromCsvError::with_context(e, "Failed to create lz4::Decoder")
391-
})?) as Box<dyn Read>,
389+
Compression::LZ4 => {
390+
Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
391+
}
392392
Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| {
393393
ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
394394
})?) as Box<dyn Read>,
@@ -692,19 +692,9 @@ mod tests {
692692
encoder.into_inner()
693693
}
694694
Compression::LZ4 => {
695-
let mut encoder = lz4::EncoderBuilder::new()
696-
.build(input_file)
697-
.map_err(|e| {
698-
ParquetFromCsvError::with_context(
699-
e,
700-
"Failed to create lz4::Encoder",
701-
)
702-
})
703-
.unwrap();
695+
let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file);
704696
write_tmp_file(&mut encoder);
705-
let (inner, err) = encoder.finish();
706-
err.unwrap();
707-
inner
697+
encoder.finish().unwrap()
708698
}
709699

710700
Compression::ZSTD(level) => {

0 commit comments

Comments
 (0)