-
Notifications
You must be signed in to change notification settings - Fork 384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace some of our use of polars
#4801
Changes from 6 commits
ec8668d
d0105e3
cf9b77e
531a92d
df6f82d
99f09de
5f0bfe1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,9 @@ use re_data_store::{ | |
test_row, test_util::sanity_unwrap, DataStore, DataStoreStats, GarbageCollectionOptions, | ||
TimeInt, TimeRange, Timeline, | ||
}; | ||
use re_log_types::{build_frame_nr, build_log_time, DataTable, EntityPath, TableId}; | ||
use re_log_types::{ | ||
build_frame_nr, build_log_time, DataRow, DataTable, EntityPath, RowId, TableId, | ||
}; | ||
use re_types::components::InstanceKey; | ||
use re_types::datagen::{build_some_colors, build_some_instances, build_some_positions2d}; | ||
use re_types_core::Loggable as _; | ||
|
@@ -31,6 +33,55 @@ fn insert_table_with_retries(store: &mut DataStore, table: &DataTable) { | |
} | ||
} | ||
|
||
// Panic on RowId clash. | ||
fn insert_table(store: &mut DataStore, table: &DataTable) { | ||
for row in table.to_rows() { | ||
let row = row.unwrap(); | ||
store.insert_row(&row).unwrap(); | ||
} | ||
} | ||
|
||
// --- | ||
|
||
/// Allows adding more data to the same `RowId`. | ||
#[derive(Default)] | ||
struct RowSet(ahash::HashMap<RowId, DataRow>); | ||
|
||
impl RowSet { | ||
fn insert_tables(&mut self, tables: impl Iterator<Item = DataTable>) { | ||
for table in tables { | ||
self.insert_table(&table); | ||
} | ||
} | ||
|
||
fn insert_table(&mut self, table: &DataTable) { | ||
for row in table.to_rows() { | ||
self.insert_row(row.unwrap()); | ||
} | ||
} | ||
|
||
fn insert_row(&mut self, row: re_log_types::DataRow) { | ||
match self.0.entry(row.row_id()) { | ||
std::collections::hash_map::Entry::Occupied(mut entry) => { | ||
for (timeline, time) in row.timepoint() { | ||
entry.get_mut().timepoint.insert(*timeline, *time); | ||
} | ||
emilk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
std::collections::hash_map::Entry::Vacant(entry) => { | ||
entry.insert(row); | ||
} | ||
} | ||
} | ||
|
||
fn insert_into(self, store: &mut DataStore) { | ||
let mut rows = self.0.into_values().collect::<Vec<_>>(); | ||
rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id)); | ||
for row in rows { | ||
store.insert_row(&row).unwrap(); | ||
} | ||
} | ||
} | ||
|
||
// --- Dump --- | ||
|
||
#[test] | ||
|
@@ -69,17 +120,6 @@ fn data_store_dump() { | |
} | ||
|
||
fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) { | ||
// helper to insert a table both as a temporal and timeless payload | ||
let insert_table = |store: &mut DataStore, table: &DataTable| { | ||
// insert temporal | ||
insert_table_with_retries(store, table); | ||
|
||
// insert timeless | ||
let mut table_timeless = table.clone(); | ||
table_timeless.col_timelines = Default::default(); | ||
insert_table_with_retries(store, &table_timeless); | ||
}; | ||
|
||
let ent_paths = ["this/that", "other", "yet/another/one"]; | ||
let tables = ent_paths | ||
.iter() | ||
|
@@ -88,34 +128,45 @@ fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: | |
|
||
// Fill the first store. | ||
for table in &tables { | ||
// insert temporal | ||
insert_table(store1, table); | ||
|
||
// insert timeless | ||
let mut table_timeless = table.clone(); | ||
table_timeless.col_timelines = Default::default(); | ||
insert_table_with_retries(store1, &table_timeless); | ||
} | ||
sanity_unwrap(store1); | ||
|
||
// Dump the first store into the second one. | ||
for table in store1.to_data_tables(None) { | ||
insert_table_with_retries(store2, &table); | ||
{ | ||
// We use a RowSet instead of a DataTable to handle duplicate RowIds. | ||
let mut row_set = RowSet::default(); | ||
row_set.insert_tables(store1.to_data_tables(None)); | ||
row_set.insert_into(store2); | ||
sanity_unwrap(store2); | ||
Comment on lines
+145
to
+150
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The old code would use "retrying" to generate new row-ids, which mean the resulting DataTables weren't the same anymore, hence the new The old test worked because |
||
} | ||
sanity_unwrap(store2); | ||
|
||
// Dump the second store into the third one. | ||
for table in store2.to_data_tables(None) { | ||
insert_table_with_retries(store3, &table); | ||
{ | ||
let mut row_set = RowSet::default(); | ||
row_set.insert_tables(store2.to_data_tables(None)); | ||
row_set.insert_into(store3); | ||
sanity_unwrap(store3); | ||
} | ||
sanity_unwrap(store3); | ||
|
||
#[cfg(feature = "polars")] | ||
{ | ||
let store1_df = store1.to_dataframe(); | ||
let store2_df = store2.to_dataframe(); | ||
let store3_df = store3.to_dataframe(); | ||
let table_id = TableId::new(); // Reuse TableId so == works | ||
let table1 = DataTable::from_rows(table_id, store1.to_rows().unwrap()); | ||
let table2 = DataTable::from_rows(table_id, store2.to_rows().unwrap()); | ||
let table3 = DataTable::from_rows(table_id, store3.to_rows().unwrap()); | ||
assert!( | ||
store1_df == store2_df, | ||
"First & second stores differ:\n{store1_df}\n{store2_df}" | ||
table1 == table2, | ||
"First & second stores differ:\n{table1}\n{table2}" | ||
); | ||
assert!( | ||
store1_df == store3_df, | ||
"First & third stores differ:\n{store1_df}\n{store3_df}" | ||
table1 == table3, | ||
"First & third stores differ:\n{table1}\n{table3}" | ||
); | ||
} | ||
|
||
|
@@ -183,39 +234,45 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) | |
|
||
// Fill the first store. | ||
for table in &tables { | ||
insert_table_with_retries(store1, table); | ||
insert_table(store1, table); | ||
} | ||
sanity_unwrap(store1); | ||
|
||
// Dump frame1 from the first store into the second one. | ||
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()) { | ||
insert_table_with_retries(store2, &table); | ||
} | ||
// Dump frame2 from the first store into the second one. | ||
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()) { | ||
insert_table_with_retries(store2, &table); | ||
} | ||
// Dump frame3 from the first store into the second one. | ||
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()) { | ||
insert_table_with_retries(store2, &table); | ||
} | ||
// Dump the other frame3 from the first store into the second one. | ||
for table in store1.to_data_tables((timeline_log_time, TimeRange::new(frame3, frame3)).into()) { | ||
insert_table_with_retries(store2, &table); | ||
} | ||
// Dump frame4 from the first store into the second one. | ||
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()) { | ||
insert_table_with_retries(store2, &table); | ||
} | ||
// We use a RowSet instead of a DataTable to handle duplicate RowIds. | ||
let mut row_set = RowSet::default(); | ||
|
||
// Dump frame1 from the first store. | ||
row_set.insert_tables( | ||
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()), | ||
); | ||
// Dump frame2 from the first store. | ||
row_set.insert_tables( | ||
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()), | ||
); | ||
// Dump frame3 from the first store. | ||
row_set.insert_tables( | ||
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()), | ||
); | ||
// Dump frame3 _from the other timeline_, from the first store. | ||
// This will produce the same RowIds again! | ||
row_set.insert_tables( | ||
store1.to_data_tables((timeline_log_time, TimeRange::new(frame3, frame3)).into()), | ||
); | ||
// Dump frame4 from the first store. | ||
row_set.insert_tables( | ||
store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()), | ||
); | ||
|
||
row_set.insert_into(store2); | ||
sanity_unwrap(store2); | ||
|
||
#[cfg(feature = "polars")] | ||
{ | ||
let store1_df = store1.to_dataframe(); | ||
let store2_df = store2.to_dataframe(); | ||
let table_id = TableId::new(); // Reuse TableId so == works | ||
let table1 = DataTable::from_rows(table_id, store1.to_rows().unwrap()); | ||
let table2 = DataTable::from_rows(table_id, store2.to_rows().unwrap()); | ||
assert!( | ||
store1_df == store2_df, | ||
"First & second stores differ:\n{store1_df}\n{store2_df}" | ||
table1 == table2, | ||
"First & second stores differ:\n{table1}\n{table2}" | ||
); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure what the comment means by "readable"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a simple sanity check that it is still possible to iterate through the entire store and get all the data out in a table.
Pretty basic but has failed quite a few times 😇