Skip to content

Commit 0efbc08

Browse files
committed
Convert re_dataframe/src/qeury.rs
1 parent b45c16f commit 0efbc08

File tree

1 file changed

+19
-59
lines changed

1 file changed

+19
-59
lines changed

crates/store/re_dataframe/src/query.rs

Lines changed: 19 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ use std::{
77
};
88

99
use arrow::{
10-
array::RecordBatch as ArrowRecordBatch,
10+
array::{ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch},
1111
buffer::ScalarBuffer as ArrowScalarBuffer,
12-
datatypes::SchemaRef as ArrowSchemaRef,
13-
datatypes::{DataType as ArrowDataType, Fields as ArrowFields, Schema as ArrowSchema},
12+
datatypes::{
13+
DataType as ArrowDataType, Fields as ArrowFields, Schema as ArrowSchema,
14+
SchemaRef as ArrowSchemaRef,
15+
},
1416
};
1517
use arrow2::{
1618
array::{
@@ -792,39 +794,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
792794
#[inline]
793795
pub fn next_row(&self) -> Option<Vec<ArrayRef>> {
794796
self.engine
795-
.with(|store, cache| self._next_row_arrow2(store, cache))
796-
.map(|vec| vec.into_iter().map(|a| a.into()).collect())
797-
}
798-
799-
/// Returns the next row's worth of data.
800-
///
801-
/// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
802-
/// Columns that do not yield any data will still be present in the results, filled with null values.
803-
///
804-
/// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
805-
/// the index, for each respective `ColumnDescriptor`.
806-
/// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
807-
///
808-
/// Example:
809-
/// ```ignore
810-
/// while let Some(row) = query_handle.next_row() {
811-
/// // …
812-
/// }
813-
/// ```
814-
///
815-
/// ## Pagination
816-
///
817-
/// Use [`Self::seek_to_row`]:
818-
/// ```ignore
819-
/// query_handle.seek_to_row(42);
820-
/// for row in query_handle.into_iter().take(len) {
821-
/// // …
822-
/// }
823-
/// ```
824-
#[inline]
825-
fn next_row_arrow2(&self) -> Option<Vec<Box<dyn Arrow2Array>>> {
826-
self.engine
827-
.with(|store, cache| self._next_row_arrow2(store, cache))
797+
.with(|store, cache| self._next_row(store, cache))
828798
}
829799

830800
/// Asynchronously returns the next row's worth of data.
@@ -843,15 +813,13 @@ impl<E: StorageEngineLike> QueryHandle<E> {
843813
/// }
844814
/// ```
845815
#[cfg(not(target_arch = "wasm32"))]
846-
pub fn next_row_async_arrow2(
847-
&self,
848-
) -> impl std::future::Future<Output = Option<Vec<Box<dyn Arrow2Array>>>>
816+
pub fn next_row_async(&self) -> impl std::future::Future<Output = Option<Vec<ArrayRef>>>
849817
where
850818
E: 'static + Send + Clone,
851819
{
852820
let res: Option<Option<_>> = self
853821
.engine
854-
.try_with(|store, cache| self._next_row_arrow2(store, cache));
822+
.try_with(|store, cache| self._next_row(store, cache));
855823

856824
let engine = self.engine.clone();
857825
std::future::poll_fn(move |cx| {
@@ -881,11 +849,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
881849
})
882850
}
883851

884-
pub fn _next_row_arrow2(
885-
&self,
886-
store: &ChunkStore,
887-
cache: &QueryCache,
888-
) -> Option<Vec<Box<dyn Arrow2Array>>> {
852+
pub fn _next_row(&self, store: &ChunkStore, cache: &QueryCache) -> Option<Vec<ArrowArrayRef>> {
889853
re_tracing::profile_function!();
890854

891855
/// Temporary state used to resolve the streaming join for the current iteration.
@@ -1238,20 +1202,17 @@ impl<E: StorageEngineLike> QueryHandle<E> {
12381202
.map(|(view_idx, column)| match column {
12391203
ColumnDescriptor::Time(descr) => {
12401204
max_value_per_index.get(&descr.timeline()).map_or_else(
1241-
|| arrow2::array::new_null_array(column.arrow_datatype().into(), 1),
1242-
|(_time, time_sliced)| {
1243-
descr.typ().make_arrow_array(time_sliced.clone()).into()
1244-
},
1205+
|| arrow::array::new_null_array(&column.arrow_datatype(), 1),
1206+
|(_time, time_sliced)| descr.typ().make_arrow_array(time_sliced.clone()),
12451207
)
12461208
}
12471209

12481210
ColumnDescriptor::Component(_descr) => view_sliced_arrays
12491211
.get(*view_idx)
12501212
.cloned()
12511213
.flatten()
1252-
.unwrap_or_else(|| {
1253-
arrow2::array::new_null_array(column.arrow_datatype().into(), 1)
1254-
}),
1214+
.map(|a| a.into())
1215+
.unwrap_or_else(|| arrow::array::new_null_array(&column.arrow_datatype(), 1)),
12551216
})
12561217
.collect_vec();
12571218

@@ -1278,28 +1239,27 @@ impl<E: StorageEngineLike> QueryHandle<E> {
12781239
where
12791240
E: 'static + Send + Clone,
12801241
{
1281-
let row = self.next_row_async_arrow2().await?;
1242+
let row = self.next_row_async().await?;
12821243

12831244
// If we managed to get a row, then the state must be initialized already.
12841245
#[allow(clippy::unwrap_used)]
12851246
let schema = self.state.get().unwrap().arrow_schema.clone();
12861247

1287-
// TODO(#3741): remove the collect
1288-
ArrowRecordBatch::try_new(schema, row.into_iter().map(|a| a.into()).collect()).ok()
1248+
ArrowRecordBatch::try_new(schema, row).ok()
12891249
}
12901250
}
12911251

12921252
impl<E: StorageEngineLike> QueryHandle<E> {
12931253
/// Returns an iterator backed by [`Self::next_row`].
12941254
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
1295-
pub fn iter(&self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> + '_ {
1296-
std::iter::from_fn(move || self.next_row_arrow2())
1255+
pub fn iter(&self) -> impl Iterator<Item = Vec<ArrowArrayRef>> + '_ {
1256+
std::iter::from_fn(move || self.next_row())
12971257
}
12981258

12991259
/// Returns an iterator backed by [`Self::next_row`].
13001260
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
1301-
pub fn into_iter(self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> {
1302-
std::iter::from_fn(move || self.next_row_arrow2())
1261+
pub fn into_iter(self) -> impl Iterator<Item = Vec<ArrowArrayRef>> {
1262+
std::iter::from_fn(move || self.next_row())
13031263
}
13041264

13051265
/// Returns an iterator backed by [`Self::next_row_batch`].

0 commit comments

Comments
 (0)