Skip to content

Commit d7a7812

Browse files
andygrove and viirya authored
feat: Implement custom RecordBatch serde for shuffle for improved performance (#1190)
* Implement faster encoder for shuffle blocks
* make code more concise
* enable fast encoding for columnar shuffle
* update benches
* test all int types
* test float
* remaining types
* add Snappy and Zstd(6) back to benchmark
* fix regression
* Update native/core/src/execution/shuffle/codec.rs

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* address feedback
* support nullable flag

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
1 parent e8261fb commit d7a7812

File tree

15 files changed: +1090 -187 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -295,6 +295,14 @@ object CometConf extends ShimCometConf {
295295
.intConf
296296
.createWithDefault(1)
297297

298+
val COMET_SHUFFLE_ENABLE_FAST_ENCODING: ConfigEntry[Boolean] =
299+
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enableFastEncoding")
300+
.doc("Whether to enable Comet's faster proprietary encoding for shuffle blocks " +
301+
"rather than using Arrow IPC.")
302+
.internal()
303+
.booleanConf
304+
.createWithDefault(true)
305+
298306
val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
299307
conf("spark.comet.columnar.shuffle.async.enabled")
300308
.doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")

native/core/benches/row_columnar.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,7 @@ fn benchmark(c: &mut Criterion) {
7979
0,
8080
None,
8181
&CompressionCodec::Zstd(1),
82+
true,
8283
)
8384
.unwrap();
8485
});

native/core/benches/shuffle_writer.rs

Lines changed: 67 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -15,10 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow_array::builder::Int32Builder;
18+
use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
1919
use arrow_array::{builder::StringBuilder, RecordBatch};
2020
use arrow_schema::{DataType, Field, Schema};
21-
use comet::execution::shuffle::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
21+
use comet::execution::shuffle::{CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec};
2222
use criterion::{criterion_group, criterion_main, Criterion};
2323
use datafusion::physical_plan::metrics::Time;
2424
use datafusion::{
@@ -31,67 +31,56 @@ use std::sync::Arc;
3131
use tokio::runtime::Runtime;
3232

3333
fn criterion_benchmark(c: &mut Criterion) {
34+
let batch = create_batch(8192, true);
3435
let mut group = c.benchmark_group("shuffle_writer");
35-
group.bench_function("shuffle_writer: encode (no compression))", |b| {
36-
let batch = create_batch(8192, true);
37-
let mut buffer = vec![];
38-
let ipc_time = Time::default();
39-
b.iter(|| {
40-
buffer.clear();
41-
let mut cursor = Cursor::new(&mut buffer);
42-
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)
43-
});
44-
});
45-
group.bench_function("shuffle_writer: encode and compress (snappy)", |b| {
46-
let batch = create_batch(8192, true);
47-
let mut buffer = vec![];
48-
let ipc_time = Time::default();
49-
b.iter(|| {
50-
buffer.clear();
51-
let mut cursor = Cursor::new(&mut buffer);
52-
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Snappy, &ipc_time)
53-
});
54-
});
55-
group.bench_function("shuffle_writer: encode and compress (lz4)", |b| {
56-
let batch = create_batch(8192, true);
57-
let mut buffer = vec![];
58-
let ipc_time = Time::default();
59-
b.iter(|| {
60-
buffer.clear();
61-
let mut cursor = Cursor::new(&mut buffer);
62-
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time)
63-
});
64-
});
65-
group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| {
66-
let batch = create_batch(8192, true);
67-
let mut buffer = vec![];
68-
let ipc_time = Time::default();
69-
b.iter(|| {
70-
buffer.clear();
71-
let mut cursor = Cursor::new(&mut buffer);
72-
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)
73-
});
74-
});
75-
group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| {
76-
let batch = create_batch(8192, true);
77-
let mut buffer = vec![];
78-
let ipc_time = Time::default();
79-
b.iter(|| {
80-
buffer.clear();
81-
let mut cursor = Cursor::new(&mut buffer);
82-
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)
83-
});
84-
});
85-
group.bench_function("shuffle_writer: end to end", |b| {
86-
let ctx = SessionContext::new();
87-
let exec = create_shuffle_writer_exec(CompressionCodec::Zstd(1));
88-
b.iter(|| {
89-
let task_ctx = ctx.task_ctx();
90-
let stream = exec.execute(0, task_ctx).unwrap();
91-
let rt = Runtime::new().unwrap();
92-
criterion::black_box(rt.block_on(collect(stream)).unwrap());
93-
});
94-
});
36+
for compression_codec in &[
37+
CompressionCodec::None,
38+
CompressionCodec::Lz4Frame,
39+
CompressionCodec::Snappy,
40+
CompressionCodec::Zstd(1),
41+
CompressionCodec::Zstd(6),
42+
] {
43+
for enable_fast_encoding in [true, false] {
44+
let name = format!("shuffle_writer: write encoded (enable_fast_encoding={enable_fast_encoding}, compression={compression_codec:?})");
45+
group.bench_function(name, |b| {
46+
let mut buffer = vec![];
47+
let ipc_time = Time::default();
48+
let w = ShuffleBlockWriter::try_new(
49+
&batch.schema(),
50+
enable_fast_encoding,
51+
compression_codec.clone(),
52+
)
53+
.unwrap();
54+
b.iter(|| {
55+
buffer.clear();
56+
let mut cursor = Cursor::new(&mut buffer);
57+
w.write_batch(&batch, &mut cursor, &ipc_time).unwrap();
58+
});
59+
});
60+
}
61+
}
62+
63+
for compression_codec in [
64+
CompressionCodec::None,
65+
CompressionCodec::Lz4Frame,
66+
CompressionCodec::Snappy,
67+
CompressionCodec::Zstd(1),
68+
CompressionCodec::Zstd(6),
69+
] {
70+
group.bench_function(
71+
format!("shuffle_writer: end to end (compression = {compression_codec:?}"),
72+
|b| {
73+
let ctx = SessionContext::new();
74+
let exec = create_shuffle_writer_exec(compression_codec.clone());
75+
b.iter(|| {
76+
let task_ctx = ctx.task_ctx();
77+
let stream = exec.execute(0, task_ctx).unwrap();
78+
let rt = Runtime::new().unwrap();
79+
rt.block_on(collect(stream)).unwrap();
80+
});
81+
},
82+
);
83+
}
9584
}
9685

9786
fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWriterExec {
@@ -104,6 +93,7 @@ fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWri
10493
compression_codec,
10594
"/tmp/data.out".to_string(),
10695
"/tmp/index.out".to_string(),
96+
true,
10797
)
10898
.unwrap()
10999
}
@@ -121,11 +111,19 @@ fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
121111
let schema = Arc::new(Schema::new(vec![
122112
Field::new("c0", DataType::Int32, true),
123113
Field::new("c1", DataType::Utf8, true),
114+
Field::new("c2", DataType::Date32, true),
115+
Field::new("c3", DataType::Decimal128(11, 2), true),
124116
]));
125117
let mut a = Int32Builder::new();
126118
let mut b = StringBuilder::new();
119+
let mut c = Date32Builder::new();
120+
let mut d = Decimal128Builder::new()
121+
.with_precision_and_scale(11, 2)
122+
.unwrap();
127123
for i in 0..num_rows {
128124
a.append_value(i as i32);
125+
c.append_value(i as i32);
126+
d.append_value((i * 1000000) as i128);
129127
if allow_nulls && i % 10 == 0 {
130128
b.append_null();
131129
} else {
@@ -134,7 +132,13 @@ fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
134132
}
135133
let a = a.finish();
136134
let b = b.finish();
137-
RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap()
135+
let c = c.finish();
136+
let d = d.finish();
137+
RecordBatch::try_new(
138+
schema.clone(),
139+
vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)],
140+
)
141+
.unwrap()
138142
}
139143

140144
fn config() -> Criterion {

native/core/src/execution/jni_api.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -635,6 +635,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
635635
current_checksum: jlong,
636636
compression_codec: jstring,
637637
compression_level: jint,
638+
enable_fast_encoding: jboolean,
638639
) -> jlongArray {
639640
try_unwrap_or_throw(&e, |mut env| unsafe {
640641
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
@@ -686,6 +687,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
686687
checksum_algo,
687688
current_checksum,
688689
&compression_codec,
690+
enable_fast_encoding != JNI_FALSE,
689691
)?;
690692

691693
let checksum = if let Some(checksum) = checksum {

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1114,6 +1114,7 @@ impl PhysicalPlanner {
11141114
codec,
11151115
writer.output_data_file.clone(),
11161116
writer.output_index_file.clone(),
1117+
writer.enable_fast_encoding,
11171118
)?);
11181119

11191120
Ok((

0 commit comments

Comments
 (0)