Replace some of our use of polars (#4801)
### What
* Part of #4789

We're using polars in some of our tests.
This PR replaces that usage with our own data tables.


https://github.com/rerun-io/rerun/blob/main/crates/re_data_store/tests/data_store.rs
is still left, and is a pain in the ass to migrate.
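
Where the migrated tests used to compare stores by converting them to polars dataframes (behind the `polars` feature), they now serialize each store with `to_rows()` and compare the resulting `DataTable`s directly. Roughly, the new pattern looks like this sketch (the `assert_stores_equal` helper itself is illustrative, not code from this PR):

```rust
use re_data_store::DataStore;
use re_log_types::{DataTable, TableId};

// Illustrative helper, not part of this PR: compare two stores by dumping
// them to rows and wrapping both in a `DataTable` under one shared `TableId`
// (a fresh `TableId` per table would make `==` fail spuriously).
fn assert_stores_equal(store1: &DataStore, store2: &DataStore) {
    let table_id = TableId::new();
    let table1 = DataTable::from_rows(table_id, store1.to_rows().unwrap());
    let table2 = DataTable::from_rows(table_id, store2.to_rows().unwrap());
    assert!(table1 == table2, "Stores differ:\n{table1}\n{table2}");
}
```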

### Checklist
* [x] I have read and agree to the [Contributor Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and the [Code of Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
  * Using newly built examples: [app.rerun.io](https://app.rerun.io/pr/4801/index.html)
  * Using examples from latest `main` build: [app.rerun.io](https://app.rerun.io/pr/4801/index.html?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
  * Using full set of examples from `nightly` build: [app.rerun.io](https://app.rerun.io/pr/4801/index.html?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set so as to maximize their usefulness for the next release's CHANGELOG

- [PR Build Summary](https://build.rerun.io/pr/4801)
- [Docs preview](https://rerun.io/preview/5f0bfe1996e12b974d6ed306977628e8e9572386/docs) <!--DOCS-PREVIEW-->
- [Examples preview](https://rerun.io/preview/5f0bfe1996e12b974d6ed306977628e8e9572386/examples) <!--EXAMPLES-PREVIEW-->
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

---------

Co-authored-by: Clement Rey <[email protected]>
emilk and teh-cmc authored Jan 15, 2024
1 parent 2e451af · commit d8569ee
Showing 13 changed files with 442 additions and 290 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


27 changes: 22 additions & 5 deletions crates/re_data_store/src/store_dump.rs

```diff
@@ -2,7 +2,7 @@ use std::collections::BTreeMap;
 
 use arrow2::Either;
 use re_log_types::{
-    DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
+    DataCellColumn, DataRow, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
 };
 
 use crate::{
@@ -13,13 +13,14 @@ use crate::{
 // ---
 
 impl DataStore {
-    /// Serializes the entire datastore into one big sorted [`DataTable`].
+    /// Serializes the entire datastore into one big sorted list of [`DataRow`].
     ///
     /// Individual [`re_log_types::DataRow`]s that were split apart due to bucketing are merged back together.
     ///
     /// Beware: this is extremely costly, don't use this in hot paths.
-    pub fn to_data_table(&self) -> re_log_types::DataReadResult<DataTable> {
-        use re_log_types::{DataRow, RowId};
+    pub fn to_rows(&self) -> re_log_types::DataReadResult<Vec<DataRow>> {
+        use re_log_types::RowId;
+
         re_tracing::profile_function!();
 
         let mut rows = ahash::HashMap::<RowId, DataRow>::default();
         for table in self.to_data_tables(None) {
@@ -39,7 +40,23 @@ impl DataStore {
         }
 
         let mut rows = rows.into_values().collect::<Vec<_>>();
-        rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
+        {
+            re_tracing::profile_scope!("sort_rows");
+            rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
+        }
+
+        Ok(rows)
+    }
+
+    /// Serializes the entire datastore into one big sorted [`DataTable`].
+    ///
+    /// Individual [`re_log_types::DataRow`]s that were split apart due to bucketing are merged back together.
+    ///
+    /// Beware: this is extremely costly, don't use this in hot paths.
+    pub fn to_data_table(&self) -> re_log_types::DataReadResult<DataTable> {
+        re_tracing::profile_function!();
+
+        let rows = self.to_rows()?;
 
         Ok(re_log_types::DataTable::from_rows(
             re_log_types::TableId::new(),
```
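
The net effect of this hunk: `to_data_table` now delegates to the new `to_rows`, which merges bucketed rows back together and sorts them. A minimal sketch (the `dump_both` helper and its pre-populated `store` are hypothetical):

```rust
use re_data_store::DataStore;

// Hypothetical illustration, not part of this diff: after this change,
// `to_data_table` is just `to_rows` plus a wrap into one big `DataTable`.
fn dump_both(store: &DataStore) -> re_log_types::DataReadResult<()> {
    let rows = store.to_rows()?; // bucketed rows merged back together, then sorted
    let table = store.to_data_table()?; // the same rows, behind a fresh `TableId`
    assert_eq!(table.to_rows().count(), rows.len());
    Ok(())
}
```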
7 changes: 2 additions & 5 deletions crates/re_data_store/tests/correctness.rs

```diff
@@ -552,11 +552,8 @@ fn gc_correct() {
     check_still_readable(&store);
 }
 
-fn check_still_readable(_store: &DataStore) {
-    #[cfg(feature = "polars")]
-    {
-        _ = _store.to_dataframe(); // simple way of checking that everything is still readable
-    }
+fn check_still_readable(store: &DataStore) {
+    store.to_data_table().unwrap(); // simple way of checking that everything is still readable
 }
 
 // This used to panic because the GC will decrement the metadata_registry size trackers before
```
164 changes: 112 additions & 52 deletions crates/re_data_store/tests/dump.rs

```diff
@@ -8,7 +8,9 @@ use re_data_store::{
     test_row, test_util::sanity_unwrap, DataStore, DataStoreStats, GarbageCollectionOptions,
     TimeInt, TimeRange, Timeline,
 };
-use re_log_types::{build_frame_nr, build_log_time, DataTable, EntityPath, TableId};
+use re_log_types::{
+    build_frame_nr, build_log_time, DataRow, DataTable, EntityPath, RowId, TableId,
+};
 use re_types::components::InstanceKey;
 use re_types::datagen::{build_some_colors, build_some_instances, build_some_positions2d};
 use re_types_core::Loggable as _;
@@ -31,6 +33,58 @@ fn insert_table_with_retries(store: &mut DataStore, table: &DataTable) {
     }
 }
 
+// Panic on RowId clash.
+fn insert_table(store: &mut DataStore, table: &DataTable) {
+    for row in table.to_rows() {
+        let row = row.unwrap();
+        store.insert_row(&row).unwrap();
+    }
+}
+
+// ---
+
+/// Allows adding more data to the same `RowId`.
+#[derive(Default)]
+struct RowSet(ahash::HashMap<RowId, DataRow>);
+
+impl RowSet {
+    fn insert_tables(&mut self, tables: impl Iterator<Item = DataTable>) {
+        for table in tables {
+            self.insert_table(&table);
+        }
+    }
+
+    fn insert_table(&mut self, table: &DataTable) {
+        for row in table.to_rows() {
+            self.insert_row(row.unwrap());
+        }
+    }
+
+    fn insert_row(&mut self, row: re_log_types::DataRow) {
+        match self.0.entry(row.row_id()) {
+            std::collections::hash_map::Entry::Occupied(mut entry) => {
+                assert_eq!(entry.get().entity_path(), row.entity_path());
+                assert_eq!(entry.get().cells(), row.cells());
+                assert_eq!(entry.get().num_instances(), row.num_instances());
+                for (timeline, time) in row.timepoint() {
+                    entry.get_mut().timepoint.insert(*timeline, *time);
+                }
+            }
+            std::collections::hash_map::Entry::Vacant(entry) => {
+                entry.insert(row);
+            }
+        }
+    }
+
+    fn insert_into(self, store: &mut DataStore) {
+        let mut rows = self.0.into_values().collect::<Vec<_>>();
+        rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
+        for row in rows {
+            store.insert_row(&row).unwrap();
+        }
+    }
+}
+
 // --- Dump ---
 
 #[test]
@@ -69,17 +123,6 @@ fn data_store_dump() {
 }
 
 fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) {
-    // helper to insert a table both as a temporal and timeless payload
-    let insert_table = |store: &mut DataStore, table: &DataTable| {
-        // insert temporal
-        insert_table_with_retries(store, table);
-
-        // insert timeless
-        let mut table_timeless = table.clone();
-        table_timeless.col_timelines = Default::default();
-        insert_table_with_retries(store, &table_timeless);
-    };
-
     let ent_paths = ["this/that", "other", "yet/another/one"];
     let tables = ent_paths
         .iter()
@@ -88,34 +131,45 @@ fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3:
 
     // Fill the first store.
     for table in &tables {
+        // insert temporal
         insert_table(store1, table);
+
+        // insert timeless
+        let mut table_timeless = table.clone();
+        table_timeless.col_timelines = Default::default();
+        insert_table_with_retries(store1, &table_timeless);
     }
     sanity_unwrap(store1);
 
     // Dump the first store into the second one.
-    for table in store1.to_data_tables(None) {
-        insert_table_with_retries(store2, &table);
+    {
+        // We use a RowSet instead of a DataTable to handle duplicate RowIds.
+        let mut row_set = RowSet::default();
+        row_set.insert_tables(store1.to_data_tables(None));
+        row_set.insert_into(store2);
+        sanity_unwrap(store2);
     }
-    sanity_unwrap(store2);
 
     // Dump the second store into the third one.
-    for table in store2.to_data_tables(None) {
-        insert_table_with_retries(store3, &table);
+    {
+        let mut row_set = RowSet::default();
+        row_set.insert_tables(store2.to_data_tables(None));
+        row_set.insert_into(store3);
+        sanity_unwrap(store3);
     }
-    sanity_unwrap(store3);
 
-    #[cfg(feature = "polars")]
     {
-        let store1_df = store1.to_dataframe();
-        let store2_df = store2.to_dataframe();
-        let store3_df = store3.to_dataframe();
+        let table_id = TableId::new(); // Reuse TableId so == works
+        let table1 = DataTable::from_rows(table_id, store1.to_rows().unwrap());
+        let table2 = DataTable::from_rows(table_id, store2.to_rows().unwrap());
+        let table3 = DataTable::from_rows(table_id, store3.to_rows().unwrap());
         assert!(
-            store1_df == store2_df,
-            "First & second stores differ:\n{store1_df}\n{store2_df}"
+            table1 == table2,
+            "First & second stores differ:\n{table1}\n{table2}"
         );
         assert!(
-            store1_df == store3_df,
-            "First & third stores differ:\n{store1_df}\n{store3_df}"
+            table1 == table3,
+            "First & third stores differ:\n{table1}\n{table3}"
        );
     }
 
@@ -183,39 +237,45 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore)
 
     // Fill the first store.
     for table in &tables {
-        insert_table_with_retries(store1, table);
+        insert_table(store1, table);
    }
     sanity_unwrap(store1);
 
-    // Dump frame1 from the first store into the second one.
-    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()) {
-        insert_table_with_retries(store2, &table);
-    }
-    // Dump frame2 from the first store into the second one.
-    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()) {
-        insert_table_with_retries(store2, &table);
-    }
-    // Dump frame3 from the first store into the second one.
-    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()) {
-        insert_table_with_retries(store2, &table);
-    }
-    // Dump the other frame3 from the first store into the second one.
-    for table in store1.to_data_tables((timeline_log_time, TimeRange::new(frame3, frame3)).into()) {
-        insert_table_with_retries(store2, &table);
-    }
-    // Dump frame4 from the first store into the second one.
-    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()) {
-        insert_table_with_retries(store2, &table);
-    }
+    // We use a RowSet instead of a DataTable to handle duplicate RowIds.
+    let mut row_set = RowSet::default();
+
+    // Dump frame1 from the first store.
+    row_set.insert_tables(
+        store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()),
+    );
+    // Dump frame2 from the first store.
+    row_set.insert_tables(
+        store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()),
+    );
+    // Dump frame3 from the first store.
+    row_set.insert_tables(
+        store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()),
+    );
+    // Dump frame3 _from the other timeline_, from the first store.
+    // This will produce the same RowIds again!
+    row_set.insert_tables(
+        store1.to_data_tables((timeline_log_time, TimeRange::new(frame3, frame3)).into()),
+    );
+    // Dump frame4 from the first store.
+    row_set.insert_tables(
+        store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()),
+    );
+
+    row_set.insert_into(store2);
     sanity_unwrap(store2);
 
-    #[cfg(feature = "polars")]
     {
-        let store1_df = store1.to_dataframe();
-        let store2_df = store2.to_dataframe();
+        let table_id = TableId::new(); // Reuse TableId so == works
+        let table1 = DataTable::from_rows(table_id, store1.to_rows().unwrap());
+        let table2 = DataTable::from_rows(table_id, store2.to_rows().unwrap());
         assert!(
-            store1_df == store2_df,
-            "First & second stores differ:\n{store1_df}\n{store2_df}"
+            table1 == table2,
+            "First & second stores differ:\n{table1}\n{table2}"
         );
     }
 
```
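
The key trick in this file is the `RowSet`: when the same frame is dumped once per timeline, the same `RowId`s come back again, and re-inserting them must merge timepoints rather than fail. A simplified, self-contained model of that merge (`Row`, `insert_row`, and the `u64` row ids here are illustrative stand-ins, not the real rerun types):

```rust
use std::collections::HashMap;

// Simplified model of the RowSet merge above: rows sharing an id are merged
// by unioning their (timeline -> time) entries, after checking that the
// payloads really match.
#[derive(Debug)]
struct Row {
    id: u64,
    timepoint: HashMap<String, i64>, // timeline name -> time
    payload: Vec<u8>,                // stand-in for entity path + cells
}

fn insert_row(set: &mut HashMap<u64, Row>, row: Row) {
    match set.entry(row.id) {
        std::collections::hash_map::Entry::Occupied(mut entry) => {
            // The same row id must carry the same data...
            assert_eq!(entry.get().payload, row.payload);
            // ...but possibly a new timeline: union the timepoints.
            for (timeline, time) in row.timepoint {
                entry.get_mut().timepoint.insert(timeline, time);
            }
        }
        std::collections::hash_map::Entry::Vacant(entry) => {
            entry.insert(row);
        }
    }
}
```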
15 changes: 14 additions & 1 deletion crates/re_log_types/src/data_cell.rs

```diff
@@ -138,7 +138,7 @@ impl PartialEq for DataCell {
 /// virtual calls.
 ///
 /// See #1746 for details.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub struct DataCellInner {
     /// Name of the component type used in this cell.
     //
@@ -163,6 +163,19 @@ pub struct DataCellInner {
     pub(crate) values: Box<dyn arrow2::array::Array>,
 }
 
+impl PartialEq for DataCellInner {
+    #[inline]
+    fn eq(&self, rhs: &Self) -> bool {
+        let Self {
+            name,
+            size_bytes: _, // we ignore the size (it may be 0 = uncomputed)
+            values,
+        } = self;
+
+        name == &rhs.name && values.eq(&rhs.values)
+    }
+}
+
 // TODO(#1696): We shouldn't have to specify the component name separately, this should be
 // part of the metadata by using an extension.
 // TODO(#1696): Check that the array is indeed a leaf / component type when building a cell from an
```
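
For context, the reason `PartialEq` can no longer be derived: `size_bytes` is a lazily computed cache (0 means "not computed yet"), and two otherwise-identical cells must compare equal regardless of whether their sizes have been computed. The same pattern in miniature, on a made-up `Inner` type:

```rust
// Illustrative stand-in, not rerun's actual type: a hand-written PartialEq
// that skips a lazily computed cache field.
#[derive(Debug, Clone)]
struct Inner {
    name: String,
    size_bytes: u64, // cached; 0 means "not computed yet"
    values: Vec<i64>,
}

impl PartialEq for Inner {
    fn eq(&self, rhs: &Self) -> bool {
        let Self {
            name,
            size_bytes: _, // deliberately ignored in comparisons
            values,
        } = self;
        name == &rhs.name && values == &rhs.values
    }
}
```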
1 change: 1 addition & 0 deletions crates/re_log_types/src/example_components.rs

```diff
@@ -4,6 +4,7 @@ use re_types_core::{Loggable, SizeBytes};
 
 // ----------------------------------------------------------------------------
 
+#[derive(Debug)]
 pub struct MyPoints;
 
 impl re_types_core::Archetype for MyPoints {
```
1 change: 1 addition & 0 deletions crates/re_query/Cargo.toml

```diff
@@ -39,6 +39,7 @@ arrow2.workspace = true
 backtrace.workspace = true
 document-features.workspace = true
 itertools = { workspace = true }
+smallvec.workspace = true
 thiserror.workspace = true
 
 # Optional dependencies:
```
37 changes: 36 additions & 1 deletion crates/re_query/src/archetype_view.rs

```diff
@@ -2,7 +2,7 @@ use std::{collections::BTreeMap, marker::PhantomData};
 
 use arrow2::array::{Array, PrimitiveArray};
 use re_format::arrow;
-use re_log_types::{DataCell, RowId};
+use re_log_types::{DataCell, DataCellRow, RowId};
 use re_types_core::{
     components::InstanceKey, Archetype, Component, ComponentName, DeserializationError,
     DeserializationResult, Loggable, SerializationResult,
@@ -116,6 +116,15 @@ impl ComponentWithInstances {
             values: DataCell::from_arrow(C::name(), values),
         })
     }
+
+    #[inline]
+    pub fn into_data_cell_row(self) -> DataCellRow {
+        let Self {
+            instance_keys,
+            values,
+        } = self;
+        DataCellRow(smallvec::smallvec![instance_keys, values])
+    }
 }
 
 /// Iterator over a single [`Component`] joined onto a primary [`Component`]
@@ -519,6 +528,32 @@ impl<A: Archetype> ArchetypeView<A> {
                 .map(|comp| (comp.name(), comp.values.to_arrow())),
         )?)
     }
+
+    /// Useful for tests.
+    pub fn to_data_cell_row_1<
+        'a,
+        C1: re_types_core::Component + Clone + Into<::std::borrow::Cow<'a, C1>> + 'a,
+    >(
+        &self,
+    ) -> crate::Result<DataCellRow> {
+        let cell0 = DataCell::from_native(self.iter_instance_keys());
+        let cell1 = DataCell::from_native_sparse(self.iter_optional_component::<C1>()?);
+        Ok(DataCellRow(smallvec::smallvec![cell0, cell1]))
+    }
+
+    /// Useful for tests.
+    pub fn to_data_cell_row_2<
+        'a,
+        C1: re_types_core::Component + Clone + Into<::std::borrow::Cow<'a, C1>> + 'a,
+        C2: re_types_core::Component + Clone + Into<::std::borrow::Cow<'a, C2>> + 'a,
+    >(
+        &self,
+    ) -> crate::Result<DataCellRow> {
+        let cell0 = DataCell::from_native(self.iter_instance_keys());
+        let cell1 = DataCell::from_native_sparse(self.iter_optional_component::<C1>()?);
+        let cell2 = DataCell::from_native_sparse(self.iter_optional_component::<C2>()?);
+        Ok(DataCellRow(smallvec::smallvec![cell0, cell1, cell2]))
+    }
 }
 
 #[test]
```
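
A hypothetical test-side use of the new helpers (illustrative only: the `positions_and_colors` helper and the component/archetype choices are not from this PR, and this assumes `re_query` exports its usual `Result` alias): flatten a queried view into one `DataCellRow` of instance keys plus two components.

```rust
use re_log_types::DataCellRow;
use re_query::ArchetypeView;
use re_types::archetypes::Points2D;
use re_types::components::{Color, Position2D};

// Hypothetical helper, not part of this diff: build a row of
// [instance keys, positions, colors] from a queried Points2D view.
fn positions_and_colors(view: &ArchetypeView<Points2D>) -> re_query::Result<DataCellRow> {
    view.to_data_cell_row_2::<Position2D, Color>()
}
```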