Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace some of our use of polars #4801

Merged
merged 7 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 22 additions & 5 deletions crates/re_data_store/src/store_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::BTreeMap;

use arrow2::Either;
use re_log_types::{
DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
DataCellColumn, DataRow, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
};

use crate::{
Expand All @@ -13,13 +13,14 @@ use crate::{
// ---

impl DataStore {
/// Serializes the entire datastore into one big sorted [`DataTable`].
/// Serializes the entire datastore into one big sorted list of [`DataRow`].
///
/// Individual [`re_log_types::DataRow`]s that were split apart due to bucketing are merged back together.
///
/// Beware: this is extremely costly, don't use this in hot paths.
pub fn to_data_table(&self) -> re_log_types::DataReadResult<DataTable> {
use re_log_types::{DataRow, RowId};
pub fn to_rows(&self) -> re_log_types::DataReadResult<Vec<DataRow>> {
use re_log_types::RowId;
re_tracing::profile_function!();

let mut rows = ahash::HashMap::<RowId, DataRow>::default();
for table in self.to_data_tables(None) {
Expand All @@ -39,7 +40,23 @@ impl DataStore {
}

let mut rows = rows.into_values().collect::<Vec<_>>();
rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
{
re_tracing::profile_scope!("sort_rows");
rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
}

Ok(rows)
}

/// Serializes the entire datastore into one big sorted [`DataTable`].
///
/// Individual [`re_log_types::DataRow`]s that were split apart due to bucketing are merged back together.
///
/// Beware: this is extremely costly, don't use this in hot paths.
pub fn to_data_table(&self) -> re_log_types::DataReadResult<DataTable> {
re_tracing::profile_function!();

let rows = self.to_rows()?;

Ok(re_log_types::DataTable::from_rows(
re_log_types::TableId::new(),
Expand Down
7 changes: 2 additions & 5 deletions crates/re_data_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,8 @@ fn gc_correct() {
check_still_readable(&store);
}

fn check_still_readable(_store: &DataStore) {
#[cfg(feature = "polars")]
{
_ = _store.to_dataframe(); // simple way of checking that everything is still readable
}
fn check_still_readable(store: &DataStore) {
store.to_data_table().unwrap(); // simple way of checking that everything is still readable
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure what the comment means by "readable"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a simple sanity check that it is still possible to iterate through the entire store and get all the data out in a table.
Pretty basic but has failed quite a few times 😇

}

// This used to panic because the GC will decrement the metadata_registry size trackers before
Expand Down
164 changes: 112 additions & 52 deletions crates/re_data_store/tests/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use re_data_store::{
test_row, test_util::sanity_unwrap, DataStore, DataStoreStats, GarbageCollectionOptions,
TimeInt, TimeRange, Timeline,
};
use re_log_types::{build_frame_nr, build_log_time, DataTable, EntityPath, TableId};
use re_log_types::{
build_frame_nr, build_log_time, DataRow, DataTable, EntityPath, RowId, TableId,
};
use re_types::components::InstanceKey;
use re_types::datagen::{build_some_colors, build_some_instances, build_some_positions2d};
use re_types_core::Loggable as _;
Expand All @@ -31,6 +33,58 @@ fn insert_table_with_retries(store: &mut DataStore, table: &DataTable) {
}
}

// Panic on RowId clash.
fn insert_table(store: &mut DataStore, table: &DataTable) {
for row in table.to_rows() {
let row = row.unwrap();
store.insert_row(&row).unwrap();
}
}

// ---

/// Allows adding more data to the same `RowId`.
#[derive(Default)]
struct RowSet(ahash::HashMap<RowId, DataRow>);

impl RowSet {
fn insert_tables(&mut self, tables: impl Iterator<Item = DataTable>) {
for table in tables {
self.insert_table(&table);
}
}

fn insert_table(&mut self, table: &DataTable) {
for row in table.to_rows() {
self.insert_row(row.unwrap());
}
}

fn insert_row(&mut self, row: re_log_types::DataRow) {
match self.0.entry(row.row_id()) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
assert_eq!(entry.get().entity_path(), row.entity_path());
assert_eq!(entry.get().cells(), row.cells());
assert_eq!(entry.get().num_instances(), row.num_instances());
for (timeline, time) in row.timepoint() {
entry.get_mut().timepoint.insert(*timeline, *time);
}
emilk marked this conversation as resolved.
Show resolved Hide resolved
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(row);
}
}
}

fn insert_into(self, store: &mut DataStore) {
let mut rows = self.0.into_values().collect::<Vec<_>>();
rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
for row in rows {
store.insert_row(&row).unwrap();
}
}
}

// --- Dump ---

#[test]
Expand Down Expand Up @@ -69,17 +123,6 @@ fn data_store_dump() {
}

fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) {
// helper to insert a table both as a temporal and timeless payload
let insert_table = |store: &mut DataStore, table: &DataTable| {
// insert temporal
insert_table_with_retries(store, table);

// insert timeless
let mut table_timeless = table.clone();
table_timeless.col_timelines = Default::default();
insert_table_with_retries(store, &table_timeless);
};

let ent_paths = ["this/that", "other", "yet/another/one"];
let tables = ent_paths
.iter()
Expand All @@ -88,34 +131,45 @@ fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3:

// Fill the first store.
for table in &tables {
// insert temporal
insert_table(store1, table);

// insert timeless
let mut table_timeless = table.clone();
table_timeless.col_timelines = Default::default();
insert_table_with_retries(store1, &table_timeless);
}
sanity_unwrap(store1);

// Dump the first store into the second one.
for table in store1.to_data_tables(None) {
insert_table_with_retries(store2, &table);
{
// We use a RowSet instead of a DataTable to handle duplicate RowIds.
let mut row_set = RowSet::default();
row_set.insert_tables(store1.to_data_tables(None));
row_set.insert_into(store2);
sanity_unwrap(store2);
Comment on lines +145 to +150
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old code would use "retrying" to generate new row-ids, which mean the resulting DataTables weren't the same anymore, hence the new RowSet test struct.

The old test worked because to_dataframe ignored rowids

}
sanity_unwrap(store2);

// Dump the second store into the third one.
for table in store2.to_data_tables(None) {
insert_table_with_retries(store3, &table);
{
let mut row_set = RowSet::default();
row_set.insert_tables(store2.to_data_tables(None));
row_set.insert_into(store3);
sanity_unwrap(store3);
}
sanity_unwrap(store3);

#[cfg(feature = "polars")]
{
let store1_df = store1.to_dataframe();
let store2_df = store2.to_dataframe();
let store3_df = store3.to_dataframe();
let table_id = TableId::new(); // Reuse TableId so == works
let table1 = DataTable::from_rows(table_id, store1.to_rows().unwrap());
let table2 = DataTable::from_rows(table_id, store2.to_rows().unwrap());
let table3 = DataTable::from_rows(table_id, store3.to_rows().unwrap());
assert!(
store1_df == store2_df,
"First & second stores differ:\n{store1_df}\n{store2_df}"
table1 == table2,
"First & second stores differ:\n{table1}\n{table2}"
);
assert!(
store1_df == store3_df,
"First & third stores differ:\n{store1_df}\n{store3_df}"
table1 == table3,
"First & third stores differ:\n{table1}\n{table3}"
);
}

Expand Down Expand Up @@ -183,39 +237,45 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore)

// Fill the first store.
for table in &tables {
insert_table_with_retries(store1, table);
insert_table(store1, table);
}
sanity_unwrap(store1);

// Dump frame1 from the first store into the second one.
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()) {
insert_table_with_retries(store2, &table);
}
// Dump frame2 from the first store into the second one.
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()) {
insert_table_with_retries(store2, &table);
}
// Dump frame3 from the first store into the second one.
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()) {
insert_table_with_retries(store2, &table);
}
// Dump the other frame3 from the first store into the second one.
for table in store1.to_data_tables((timeline_log_time, TimeRange::new(frame3, frame3)).into()) {
insert_table_with_retries(store2, &table);
}
// Dump frame4 from the first store into the second one.
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()) {
insert_table_with_retries(store2, &table);
}
// We use a RowSet instead of a DataTable to handle duplicate RowIds.
let mut row_set = RowSet::default();

// Dump frame1 from the first store.
row_set.insert_tables(
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()),
);
// Dump frame2 from the first store.
row_set.insert_tables(
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()),
);
// Dump frame3 from the first store.
row_set.insert_tables(
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()),
);
// Dump frame3 _from the other timeline_, from the first store.
// This will produce the same RowIds again!
row_set.insert_tables(
store1.to_data_tables((timeline_log_time, TimeRange::new(frame3, frame3)).into()),
);
// Dump frame4 from the first store.
row_set.insert_tables(
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()),
);

row_set.insert_into(store2);
sanity_unwrap(store2);

#[cfg(feature = "polars")]
{
let store1_df = store1.to_dataframe();
let store2_df = store2.to_dataframe();
let table_id = TableId::new(); // Reuse TableId so == works
let table1 = DataTable::from_rows(table_id, store1.to_rows().unwrap());
let table2 = DataTable::from_rows(table_id, store2.to_rows().unwrap());
assert!(
store1_df == store2_df,
"First & second stores differ:\n{store1_df}\n{store2_df}"
table1 == table2,
"First & second stores differ:\n{table1}\n{table2}"
);
}

Expand Down
15 changes: 14 additions & 1 deletion crates/re_log_types/src/data_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl PartialEq for DataCell {
/// virtual calls.
///
/// See #1746 for details.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub struct DataCellInner {
/// Name of the component type used in this cell.
//
Expand All @@ -163,6 +163,19 @@ pub struct DataCellInner {
pub(crate) values: Box<dyn arrow2::array::Array>,
}

impl PartialEq for DataCellInner {
#[inline]
fn eq(&self, rhs: &Self) -> bool {
let Self {
name,
size_bytes: _, // we ignore the size (it may be 0 = uncomputed)
values,
} = self;

name == &rhs.name && values.eq(&rhs.values)
}
}

// TODO(#1696): We shouldn't have to specify the component name separately, this should be
// part of the metadata by using an extension.
// TODO(#1696): Check that the array is indeed a leaf / component type when building a cell from an
Expand Down
1 change: 1 addition & 0 deletions crates/re_log_types/src/example_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use re_types_core::{Loggable, SizeBytes};

// ----------------------------------------------------------------------------

#[derive(Debug)]
pub struct MyPoints;

impl re_types_core::Archetype for MyPoints {
Expand Down
1 change: 1 addition & 0 deletions crates/re_query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ arrow2.workspace = true
backtrace.workspace = true
document-features.workspace = true
itertools = { workspace = true }
smallvec.workspace = true
thiserror.workspace = true

# Optional dependencies:
Expand Down
37 changes: 36 additions & 1 deletion crates/re_query/src/archetype_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, marker::PhantomData};

use arrow2::array::{Array, PrimitiveArray};
use re_format::arrow;
use re_log_types::{DataCell, RowId};
use re_log_types::{DataCell, DataCellRow, RowId};
use re_types_core::{
components::InstanceKey, Archetype, Component, ComponentName, DeserializationError,
DeserializationResult, Loggable, SerializationResult,
Expand Down Expand Up @@ -116,6 +116,15 @@ impl ComponentWithInstances {
values: DataCell::from_arrow(C::name(), values),
})
}

#[inline]
pub fn into_data_cell_row(self) -> DataCellRow {
let Self {
instance_keys,
values,
} = self;
DataCellRow(smallvec::smallvec![instance_keys, values])
}
}

/// Iterator over a single [`Component`] joined onto a primary [`Component`]
Expand Down Expand Up @@ -519,6 +528,32 @@ impl<A: Archetype> ArchetypeView<A> {
.map(|comp| (comp.name(), comp.values.to_arrow())),
)?)
}

/// Useful for tests.
pub fn to_data_cell_row_1<
'a,
C1: re_types_core::Component + Clone + Into<::std::borrow::Cow<'a, C1>> + 'a,
>(
&self,
) -> crate::Result<DataCellRow> {
let cell0 = DataCell::from_native(self.iter_instance_keys());
let cell1 = DataCell::from_native_sparse(self.iter_optional_component::<C1>()?);
Ok(DataCellRow(smallvec::smallvec![cell0, cell1]))
}

/// Useful for tests.
pub fn to_data_cell_row_2<
'a,
C1: re_types_core::Component + Clone + Into<::std::borrow::Cow<'a, C1>> + 'a,
C2: re_types_core::Component + Clone + Into<::std::borrow::Cow<'a, C2>> + 'a,
>(
&self,
) -> crate::Result<DataCellRow> {
let cell0 = DataCell::from_native(self.iter_instance_keys());
let cell1 = DataCell::from_native_sparse(self.iter_optional_component::<C1>()?);
let cell2 = DataCell::from_native_sparse(self.iter_optional_component::<C2>()?);
Ok(DataCellRow(smallvec::smallvec![cell0, cell1, cell2]))
}
}

#[test]
Expand Down
Loading
Loading