Skip to content

Commit 197e363

Browse files
committed
support slice view data on writing ipc
1 parent 51c1b4b commit 197e363

File tree

1 file changed

+63
-8
lines changed

1 file changed

+63
-8
lines changed

arrow-ipc/src/writer.rs

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,12 @@ fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
580580
DataType::BinaryView | DataType::Utf8View => {
581581
// The spec documents the counts only includes the variadic buffers, not the view/null buffers.
582582
// https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
583-
counts.push(array.buffers().len() as i64 - 1);
583+
let views = array.buffers()[0].typed_data::<u128>();
584+
if views.iter().any(|view| *view as u32 > 12) {
585+
counts.push(array.buffers().len() as i64 - 1);
586+
} else {
587+
counts.push(0);
588+
}
584589
}
585590
DataType::Dictionary(_, _) => {
586591
// Do nothing
@@ -1245,6 +1250,61 @@ fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Arra
12451250
(offsets, child_data)
12461251
}
12471252

1253+
/// Overwrites the buffer-index field of a 128-bit byte view in place.
///
/// A view packs four little-endian `u32` fields into a `u128` (see `ByteView`
/// for details), from the least significant bits upwards:
///
/// | bits      | field          |
/// |-----------|----------------|
/// | `0..32`   | length         |
/// | `32..64`  | prefix         |
/// | `64..96`  | buffer index   |
/// | `96..128` | offset         |
///
/// Only bits `64..96` are modified; length, prefix and offset are preserved.
///
/// * `value` - the view to rewrite.
/// * `new_buffer_index` - the buffer index to store in the view.
fn update_buffer_index(value: &mut u128, new_buffer_index: u32) {
    const BUFFER_INDEX_SHIFT: u32 = 64;
    const BUFFER_INDEX_MASK: u128 = (u32::MAX as u128) << BUFFER_INDEX_SHIFT;

    // Clear the old buffer index, then splice the new one into the same field.
    let cleared = *value & !BUFFER_INDEX_MASK;
    *value = cleared | ((new_buffer_index as u128) << BUFFER_INDEX_SHIFT);
}
1264+
1265+
fn select_data_buffers(mut views_slice: Vec<u128>, data: &ArrayData) -> Vec<Buffer> {
1266+
let first_buffer = views_slice.iter().find(|view| (**view) as u32 > 12);
1267+
// all values shorter than 12 bytes.
1268+
if first_buffer.is_none() {
1269+
return vec![Buffer::from_vec(views_slice)];
1270+
}
1271+
let first_buffer_index = ((*first_buffer.unwrap()) >> 96) as u32 as usize;
1272+
let last_buffer = views_slice
1273+
.iter()
1274+
.rfind(|view| (**view) as u32 > 12)
1275+
.unwrap();
1276+
let last_buffer_index = ((*last_buffer) >> 96) as u32 as usize;
1277+
let data_buffers = &data.buffers()[1..];
1278+
let sliced_data_buffers = &data_buffers[first_buffer_index..last_buffer_index + 1];
1279+
1280+
// if first buffer index not 0, we need re-mapping view's buffer index to sliced data buffers
1281+
if first_buffer_index != 0 {
1282+
views_slice
1283+
.iter_mut()
1284+
.filter(|view| (**view) as u32 > 12)
1285+
.for_each(|view| {
1286+
// new buffer index = original buffer index - offset
1287+
let new_buffer_index = ((*view >> 96) as u32) - first_buffer_index as u32;
1288+
update_buffer_index(view, new_buffer_index);
1289+
});
1290+
}
1291+
1292+
let mut buffers = Vec::with_capacity(sliced_data_buffers.len() + 1);
1293+
buffers.push(views_slice.iter().map(|view| *view).collect());
1294+
buffers.extend_from_slice(sliced_data_buffers);
1295+
buffers
1296+
}
1297+
1298+
fn get_byte_view_buffers(data: &ArrayData) -> Vec<Buffer> {
1299+
if data.is_empty() {
1300+
return Vec::with_capacity(0);
1301+
}
1302+
1303+
let views_slice = data.buffers()[0].typed_data::<u128>();
1304+
let views_slice = &views_slice[data.offset()..data.offset() + data.len()];
1305+
select_data_buffers(views_slice.to_vec(), data)
1306+
}
1307+
12481308
/// Write array data to a vector of bytes
12491309
#[allow(clippy::too_many_arguments)]
12501310
fn write_array_data(
@@ -1303,13 +1363,8 @@ fn write_array_data(
13031363
)?;
13041364
}
13051365
} else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1306-
// Slicing the views buffer is safe and easy,
1307-
// but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
1308-
//
1309-
// Current implementation just serialize the raw arrays as given and not try to optimize anything.
1310-
// If users wants to "compact" the arrays prior to sending them over IPC,
1311-
// they should consider the gc API suggested in #5513
1312-
for buffer in array_data.buffers() {
1366+
let view_buffers = get_byte_view_buffers(array_data);
1367+
for buffer in view_buffers {
13131368
offset = write_buffer(
13141369
buffer.as_slice(),
13151370
buffers,

0 commit comments

Comments
 (0)