Skip to content

Commit a3c3f55

Browse files
authored
Add arrow feature to re_chunk and conversions to RecordBatch (#7355)
### What Basic type conversions from TransportChunk to RecordBatch and back. Adding the round-trip test turned up an interesting issue. TransportChunk <-> RecordBatch fails to round-trip successfully because we lose the ExtensionType encapsulation that used to be encoded by arrow2. While on the surface this isn't immediately problematic, as we don't care about ExtensionTypes, the discussion indicates there are in fact going to be very real pain points when it comes to writing semantic data processing engines using arrow-rs. This is because the metadata is attached to the FIELD, not the DATATYPE, and there exist many processing contexts where the context of that field itself is lost. apache/arrow-rs#4472 ### Checklist * [x] I have read and agree to [Contributor Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and the [Code of Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md) * [x] I've included a screenshot or gif (if applicable) * [x] I have tested the web demo (if applicable): * Using examples from latest `main` build: [rerun.io/viewer](https://rerun.io/viewer/pr/7355?manifest_url=https://app.rerun.io/version/main/examples_manifest.json) * Using full set of examples from `nightly` build: [rerun.io/viewer](https://rerun.io/viewer/pr/7355?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json) * [x] The PR title and labels are set such as to maximize their usefulness for the next release's CHANGELOG * [x] If applicable, add a new check to the [release checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)! * [x] I have noted any breaking changes to the log API in `CHANGELOG.md` and the migration guide - [PR Build Summary](https://build.rerun.io/pr/7355) - [Recent benchmark results](https://build.rerun.io/graphs/crates.html) - [Wasm size tracking](https://build.rerun.io/graphs/sizes.html) To run all checks from `main`, comment on the PR with `@rerun-bot full-check`.
1 parent 2643ebc commit a3c3f55

File tree

6 files changed

+160
-2
lines changed

6 files changed

+160
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4784,6 +4784,7 @@ version = "0.19.0-alpha.1+dev"
47844784
dependencies = [
47854785
"ahash",
47864786
"anyhow",
4787+
"arrow",
47874788
"bytemuck",
47884789
"criterion",
47894790
"crossbeam",

crates/store/re_chunk/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ serde = [
3030
"re_types_core/serde",
3131
]
3232

33+
## Enable conversion to and from arrow-rs types
34+
arrow = ["arrow2/arrow", "dep:arrow"]
35+
3336

3437
[dependencies]
3538

@@ -60,6 +63,7 @@ thiserror.workspace = true
6063

6164
# Optional dependencies:
6265
serde = { workspace = true, optional = true, features = ["derive", "rc"] }
66+
arrow = { workspace = true, optional = true }
6367

6468
# Native dependencies:
6569
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]

crates/store/re_chunk/src/arrow.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use arrow::{
2+
array::{make_array, RecordBatch},
3+
datatypes::{Field, Schema},
4+
error::ArrowError,
5+
};
6+
7+
use crate::TransportChunk;
8+
9+
impl TransportChunk {
10+
/// Create an arrow-rs [`RecordBatch`] containing the data from this [`TransportChunk`].
11+
///
12+
/// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
13+
/// but does incur overhead of generating an alternative representation of the arrow-
14+
/// related rust structures that refer to those data buffers.
15+
pub fn try_to_arrow_record_batch(&self) -> Result<RecordBatch, ArrowError> {
16+
let fields: Vec<Field> = self
17+
.schema
18+
.fields
19+
.iter()
20+
.map(|f| f.clone().into())
21+
.collect();
22+
23+
let metadata = self.schema.metadata.clone().into_iter().collect();
24+
25+
let schema = Schema::new(fields).with_metadata(metadata);
26+
27+
let columns: Vec<_> = self
28+
.data
29+
.columns()
30+
.iter()
31+
.map(|arr2_array| {
32+
let data = arrow2::array::to_data(arr2_array.as_ref());
33+
make_array(data)
34+
})
35+
.collect();
36+
37+
RecordBatch::try_new(std::sync::Arc::new(schema), columns)
38+
}
39+
40+
/// Create a [`TransportChunk`] from an arrow-rs [`RecordBatch`].
41+
///
42+
/// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
43+
/// but does incur overhead of generating an alternative representation of the arrow-
44+
/// related rust structures that refer to those data buffers.
45+
pub fn from_arrow_record_batch(batch: &RecordBatch) -> Self {
46+
let fields: Vec<arrow2::datatypes::Field> = batch
47+
.schema()
48+
.fields
49+
.iter()
50+
.map(|f| f.clone().into())
51+
.collect();
52+
53+
let metadata = batch.schema().metadata.clone().into_iter().collect();
54+
55+
let schema = arrow2::datatypes::Schema::from(fields).with_metadata(metadata);
56+
57+
let columns: Vec<_> = batch
58+
.columns()
59+
.iter()
60+
.map(|array| arrow2::array::from_data(&array.to_data()))
61+
.collect();
62+
63+
let data = arrow2::chunk::Chunk::new(columns);
64+
65+
Self { schema, data }
66+
}
67+
}

crates/store/re_chunk/src/chunk.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,71 @@ impl Chunk {
152152
}
153153
&& *components == rhs.components
154154
}
155+
156+
/// Check for equality while ignoring possible `Extension` type information
157+
///
158+
/// This is necessary because `arrow2` loses the `Extension` datatype
159+
/// when deserializing back from the `arrow_schema::DataType` representation.
160+
///
161+
/// In theory we could fix this, but as we're moving away from arrow2 anyways
162+
/// it's unlikely worth the effort.
163+
pub fn are_equal_ignoring_extension_types(&self, other: &Self) -> bool {
164+
let Self {
165+
id,
166+
entity_path,
167+
heap_size_bytes: _,
168+
is_sorted,
169+
row_ids,
170+
timelines,
171+
components,
172+
} = self;
173+
174+
let row_ids_no_extension = arrow2::array::StructArray::new(
175+
row_ids.data_type().to_logical_type().clone(),
176+
row_ids.values().to_vec(),
177+
row_ids.validity().cloned(),
178+
);
179+
180+
let components_no_extension: BTreeMap<_, _> = components
181+
.iter()
182+
.map(|(name, arr)| {
183+
let arr = arrow2::array::ListArray::new(
184+
arr.data_type().to_logical_type().clone(),
185+
arr.offsets().clone(),
186+
arr.values().clone(),
187+
arr.validity().cloned(),
188+
);
189+
(name, arr)
190+
})
191+
.collect();
192+
193+
let other_components_no_extension: BTreeMap<_, _> = other
194+
.components
195+
.iter()
196+
.map(|(name, arr)| {
197+
let arr = arrow2::array::ListArray::new(
198+
arr.data_type().to_logical_type().clone(),
199+
arr.offsets().clone(),
200+
arr.values().clone(),
201+
arr.validity().cloned(),
202+
);
203+
(name, arr)
204+
})
205+
.collect();
206+
207+
let other_row_ids_no_extension = arrow2::array::StructArray::new(
208+
other.row_ids.data_type().to_logical_type().clone(),
209+
other.row_ids.values().to_vec(),
210+
other.row_ids.validity().cloned(),
211+
);
212+
213+
*id == other.id
214+
&& *entity_path == other.entity_path
215+
&& *is_sorted == other.is_sorted
216+
&& row_ids_no_extension == other_row_ids_no_extension
217+
&& *timelines == other.timelines
218+
&& components_no_extension == other_components_no_extension
219+
}
155220
}
156221

157222
impl Clone for Chunk {

crates/store/re_chunk/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ pub mod util;
2020
#[cfg(not(target_arch = "wasm32"))]
2121
mod batcher;
2222

23+
#[cfg(feature = "arrow")]
24+
mod arrow;
25+
2326
pub use self::builder::{ChunkBuilder, TimeColumnBuilder};
2427
pub use self::chunk::{Chunk, ChunkError, ChunkResult, TimeColumn};
2528
pub use self::helpers::{ChunkShared, UnitChunkShared};

crates/store/re_chunk/src/transport.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,18 @@ mod tests {
711711

712712
for _ in 0..3 {
713713
let chunk_in_transport = chunk_before.to_transport()?;
714-
let chunk_after = Chunk::from_transport(&chunk_in_transport)?;
714+
let chunk_roundtrip;
715+
#[cfg(feature = "arrow")]
716+
{
717+
let chunk_in_record_batch = chunk_in_transport.try_to_arrow_record_batch()?;
718+
chunk_roundtrip =
719+
TransportChunk::from_arrow_record_batch(&chunk_in_record_batch);
720+
}
721+
#[cfg(not(feature = "arrow"))]
722+
{
723+
chunk_roundtrip = &chunk_in_transport;
724+
}
725+
let chunk_after = Chunk::from_transport(&chunk_roundtrip)?;
715726

716727
assert_eq!(
717728
chunk_in_transport.entity_path()?,
@@ -762,7 +773,14 @@ mod tests {
762773
eprintln!("{chunk_in_transport}");
763774
eprintln!("{chunk_after}");
764775

765-
assert_eq!(chunk_before, chunk_after);
776+
#[cfg(not(feature = "arrow"))]
777+
{
778+
// This will fail when round-tripping all the way to record-batch
779+
// the below check should always pass regardless.
780+
assert_eq!(chunk_before, &chunk_after);
781+
}
782+
783+
assert!(chunk_before.are_equal_ignoring_extension_types(&chunk_after));
766784

767785
chunk_before = chunk_after;
768786
}

0 commit comments

Comments
 (0)