Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port more of the viewer to arrow1 #8539

Merged
merged 8 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5668,6 +5668,7 @@ version = "0.22.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"criterion",
"document-features",
"indent",
Expand Down Expand Up @@ -6627,6 +6628,7 @@ dependencies = [
"itertools 0.13.0",
"re_chunk_store",
"re_dataframe",
"re_error",
"re_format",
"re_log",
"re_log_types",
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ re_types_core.workspace = true
# External dependencies:
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
arrow2 = { workspace = true, features = ["compute_concatenate"] }
document-features.workspace = true
indent.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub use re_chunk::{
pub use re_log_types::{ResolvedTimeRange, TimeInt, TimeType, Timeline};

pub mod external {
pub use arrow;
pub use arrow2;

pub use re_chunk;
Expand Down
43 changes: 38 additions & 5 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use itertools::Itertools;

use nohash_hasher::{IntMap, IntSet};
use re_chunk::{
Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared,
external::arrow::array::ArrayRef, Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt,
Timeline, UnitChunkShared,
};
use re_chunk_store::{
ChunkStore, ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor,
Expand Down Expand Up @@ -794,7 +795,39 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// }
/// ```
#[inline]
pub fn next_row(&self) -> Option<Vec<Box<dyn Arrow2Array>>> {
pub fn next_row(&self) -> Option<Vec<ArrayRef>> {
self.engine
.with(|store, cache| self._next_row(store, cache))
.map(|vec| vec.into_iter().map(|a| a.into()).collect())
}

/// Returns the next row's worth of data.
///
/// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
///
/// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
/// the index, for each respective `ColumnDescriptor`.
/// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
///
/// Example:
/// ```ignore
/// while let Some(row) = query_handle.next_row() {
/// // …
/// }
/// ```
///
/// ## Pagination
///
/// Use [`Self::seek_to_row`]:
/// ```ignore
/// query_handle.seek_to_row(42);
/// for row in query_handle.into_iter().take(len) {
/// // …
/// }
/// ```
#[inline]
fn next_row_arrow2(&self) -> Option<Vec<Box<dyn Arrow2Array>>> {
self.engine
.with(|store, cache| self._next_row(store, cache))
}
Expand Down Expand Up @@ -1239,7 +1272,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
pub fn next_row_batch(&self) -> Option<RecordBatch> {
Some(RecordBatch {
schema: self.schema().clone(),
data: Arrow2Chunk::new(self.next_row()?),
data: Arrow2Chunk::new(self.next_row_arrow2()?),
})
}

Expand All @@ -1266,13 +1299,13 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn iter(&self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> + '_ {
std::iter::from_fn(move || self.next_row())
std::iter::from_fn(move || self.next_row_arrow2())
}

/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_iter(self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> {
std::iter::from_fn(move || self.next_row())
std::iter::from_fn(move || self.next_row_arrow2())
}

/// Returns an iterator backed by [`Self::next_row_batch`].
Expand Down
4 changes: 2 additions & 2 deletions crates/utils/re_memory/src/accounting_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use crate::{
};

/// Only track allocations of at least this size.
const SMALL_SIZE: usize = 128; // TODO(emilk): make this setable by users
const SMALL_SIZE: usize = 128; // TODO(emilk): make this settable by users

/// Allocations smaller than this are stochastically sampled.
const MEDIUM_SIZE: usize = 4 * 1024; // TODO(emilk): make this setable by users
const MEDIUM_SIZE: usize = 4 * 1024; // TODO(emilk): make this settable by users

// TODO(emilk): yet another tier would maybe make sense, with a different stochastic rate.

Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_view_dataframe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ all-features = true
[dependencies]
re_chunk_store.workspace = true
re_dataframe.workspace = true
re_error.workspace = true
re_format.workspace = true
re_log_types.workspace = true
re_log.workspace = true
Expand Down
9 changes: 6 additions & 3 deletions crates/viewer/re_view_dataframe/src/dataframe_ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::collections::BTreeMap;
use std::ops::Range;

use anyhow::Context;
use arrow::array::ArrayRef;
use egui::NumExt as _;
use itertools::Itertools;

use re_chunk_store::external::re_chunk::Arrow2Array;
use re_chunk_store::{ColumnDescriptor, LatestAtQuery};
use re_dataframe::external::re_query::StorageEngineArcReadGuard;
use re_dataframe::QueryHandle;
Expand Down Expand Up @@ -142,7 +142,7 @@ struct RowsDisplayData {
impl RowsDisplayData {
fn try_new(
row_indices: &Range<u64>,
row_data: Vec<Vec<Box<dyn Arrow2Array>>>,
row_data: Vec<Vec<ArrayRef>>,
selected_columns: &[ColumnDescriptor],
query_timeline: &Timeline,
) -> Result<Self, DisplayRecordBatchError> {
Expand Down Expand Up @@ -360,7 +360,10 @@ impl egui_table::TableDelegate for DataframeTableDelegate<'_> {
let display_data = match &self.display_data {
Ok(display_data) => display_data,
Err(err) => {
error_ui(ui, format!("Error with display data: {err}"));
error_ui(
ui,
format!("Error with display data: {}", re_error::format(err)),
);
return;
}
};
Expand Down
103 changes: 60 additions & 43 deletions crates/viewer/re_view_dataframe/src/display_record_batch.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
//! Intermediate data structures to make `re_datastore`'s row data more amenable to displaying in a
//! table.

use thiserror::Error;

use re_chunk_store::external::arrow2::{
use arrow::{
array::{
Array as Arrow2Array, DictionaryArray as Arrow2DictionaryArray,
ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
Array as ArrowArray, ArrayRef as ArrowArrayRef,
Int32DictionaryArray as ArrowInt32DictionaryArray, Int64Array as ArrowInt64Array,
ListArray as ArrowListArray, TimestampNanosecondArray as ArrowTimestampNanosecondArray,
},
datatypes::DataType,
datatypes::DataType as Arrow2DataType,
datatypes::DataType as ArrowDataType,
};
use thiserror::Error;

use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, LatestAtQuery};
use re_log_types::{EntityPath, TimeInt, Timeline};
use re_types::external::arrow2::datatypes::IntegerType;
use re_types_core::ComponentName;
use re_ui::UiExt;
use re_viewer_context::{UiLayout, ViewerContext};

#[derive(Error, Debug)]
pub(crate) enum DisplayRecordBatchError {
#[error("Unexpected column data type for timeline '{0}': {1:?}")]
UnexpectedTimeColumnDataType(String, Arrow2DataType),
UnexpectedTimeColumnDataType(String, ArrowDataType),

#[error("Unexpected column data type for component '{0}': {1:?}")]
UnexpectedComponentColumnDataType(String, Arrow2DataType),
UnexpectedComponentColumnDataType(String, ArrowDataType),
}

/// A single column of component data.
Expand All @@ -33,38 +32,37 @@ pub(crate) enum DisplayRecordBatchError {
#[derive(Debug)]
pub(crate) enum ComponentData {
Null,
ListArray(Arrow2ListArray<i32>),
ListArray(ArrowListArray),
DictionaryArray {
dict: Arrow2DictionaryArray<i32>,
values: Arrow2ListArray<i32>,
dict: ArrowInt32DictionaryArray,
values: ArrowListArray,
},
}

impl ComponentData {
#[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940
fn try_new(
descriptor: &ComponentColumnDescriptor,
column_data: &Box<dyn Arrow2Array>,
column_data: &ArrowArrayRef,
) -> Result<Self, DisplayRecordBatchError> {
match column_data.data_type() {
DataType::Null => Ok(Self::Null),
DataType::List(_) => Ok(Self::ListArray(
ArrowDataType::Null => Ok(Self::Null),
ArrowDataType::List(_) => Ok(Self::ListArray(
column_data
.as_any()
.downcast_ref::<Arrow2ListArray<i32>>()
.downcast_ref::<ArrowListArray>()
.expect("`data_type` checked, failure is a bug in re_dataframe")
.clone(),
)),
DataType::Dictionary(IntegerType::Int32, _, _) => {
ArrowDataType::Dictionary(_, _) => {
let dict = column_data
.as_any()
.downcast_ref::<Arrow2DictionaryArray<i32>>()
.downcast_ref::<ArrowInt32DictionaryArray>()
.expect("`data_type` checked, failure is a bug in re_dataframe")
.clone();
let values = dict
.values()
.as_any()
.downcast_ref::<Arrow2ListArray<i32>>()
.downcast_ref::<ArrowListArray>()
.expect("`data_type` checked, failure is a bug in re_dataframe")
.clone();
Ok(Self::DictionaryArray { dict, values })
Expand All @@ -90,8 +88,8 @@ impl ComponentData {
}
}
Self::DictionaryArray { dict, values } => {
if dict.is_valid(row_index) {
values.value(dict.key_value(row_index)).len() as u64
if let Some(key) = dict.key(row_index) {
values.value(key).len() as u64
} else {
0
}
Expand Down Expand Up @@ -131,22 +129,20 @@ impl ComponentData {
Self::ListArray(list_array) => list_array
.is_valid(row_index)
.then(|| list_array.value(row_index)),
Self::DictionaryArray { dict, values } => dict
.is_valid(row_index)
.then(|| values.value(dict.key_value(row_index))),
Self::DictionaryArray { dict, values } => {
dict.key(row_index).map(|key| values.value(key))
}
};

if let Some(data) = data {
let data_to_display = if let Some(instance_index) = instance_index {
// Panics if the instance index is out of bounds. This is checked in
// `DisplayColumn::data_ui`.
data.sliced(instance_index as usize, 1)
data.slice(instance_index as usize, 1)
} else {
data
};

let data_to_display: arrow::array::ArrayRef = data_to_display.into();

ctx.component_ui_registry.ui_raw(
ctx,
ui,
Expand All @@ -169,7 +165,7 @@ impl ComponentData {
pub(crate) enum DisplayColumn {
Timeline {
timeline: Timeline,
time_data: Arrow2PrimitiveArray<i64>,
time_data: ArrowInt64Array,
},
Component {
entity_path: EntityPath,
Expand All @@ -179,26 +175,47 @@ pub(crate) enum DisplayColumn {
}

impl DisplayColumn {
#[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940
fn try_new(
column_descriptor: &ColumnDescriptor,
column_data: &Box<dyn Arrow2Array>,
column_data: &ArrowArrayRef,
) -> Result<Self, DisplayRecordBatchError> {
/// Reinterprets a nanosecond-timestamp array as a plain `i64` array.
///
/// The values buffer of the source array is reused for the result, and the validity
/// bitmap is carried over so that null timestamps remain null in the returned array
/// (rather than surfacing as whatever raw value sits in the buffer at that slot).
///
/// Returns `None` if the source array exposes no values buffer, or if the rebuilt
/// array data fails validation.
fn int64_from_nanoseconds(
    duration_array: &ArrowTimestampNanosecondArray,
) -> Option<ArrowInt64Array> {
    let data = duration_array.to_data();
    let buffer = data.buffers().first()?.clone();
    arrow::array::ArrayData::builder(arrow::datatypes::DataType::Int64)
        .len(duration_array.len())
        .add_buffer(buffer)
        // Without this the result would be treated as fully valid, losing nulls.
        .nulls(data.nulls().cloned())
        .build()
        .ok()
        .map(ArrowInt64Array::from)
}

match column_descriptor {
ColumnDescriptor::Time(desc) => {
let time_data = column_data
let timeline = desc.timeline;

// Sequence timelines are i64, but time columns are nanoseconds (also as i64)
let time_data_result = column_data
.as_any()
.downcast_ref::<Arrow2PrimitiveArray<i64>>()
.ok_or_else(|| {
DisplayRecordBatchError::UnexpectedTimeColumnDataType(
desc.timeline.name().as_str().to_owned(),
column_data.data_type().to_owned(),
)
})?
.clone();
.downcast_ref::<ArrowInt64Array>()
.cloned()
.or_else(|| {
column_data
.as_any()
.downcast_ref::<ArrowTimestampNanosecondArray>()
.and_then(int64_from_nanoseconds)
});
let time_data = time_data_result.ok_or_else(|| {
DisplayRecordBatchError::UnexpectedTimeColumnDataType(
timeline.name().as_str().to_owned(),
column_data.data_type().to_owned(),
)
})?;

Ok(Self::Timeline {
timeline: desc.timeline,
timeline,
time_data,
})
}
Expand Down Expand Up @@ -307,7 +324,7 @@ impl DisplayRecordBatch {
/// The columns in the record batch must match the selected columns. This is guaranteed by
/// `re_datastore`.
pub(crate) fn try_new(
row_data: &Vec<Box<dyn Arrow2Array>>,
row_data: &Vec<ArrowArrayRef>,
selected_columns: &[ColumnDescriptor],
) -> Result<Self, DisplayRecordBatchError> {
let num_rows = row_data.first().map(|arr| arr.len()).unwrap_or(0);
Expand Down
1 change: 1 addition & 0 deletions rerun_py/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ impl PyRecordingView {
/// This schema will only contain the columns that are included in the view via
/// the view contents.
fn schema(&self, py: Python<'_>) -> PyResult<PySchema> {
#![allow(clippy::unnecessary_wraps)] // In case of feature != "remote"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated thing I discovered when running cargo clippy (without --all-features)

match &self.recording {
PyRecordingHandle::Local(recording) => {
let borrowed: PyRef<'_, PyRecording> = recording.borrow(py);
Expand Down
Loading