From bb3938fc40b921b12c6c1e57a376cd3e0eab3b6d Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Mon, 10 Apr 2023 02:00:02 +0200 Subject: [PATCH] wip --- Cargo.lock | 1 + crates/re_arrow_store/src/store.rs | 15 +- crates/re_arrow_store/src/store_gc.rs | 208 ++++++++++++++++++++- crates/re_arrow_store/src/store_read.rs | 5 +- crates/re_arrow_store/src/store_sanity.rs | 14 +- crates/re_arrow_store/src/store_stats.rs | 26 +++ crates/re_arrow_store/src/test_util.rs | 20 +- crates/re_arrow_store/tests/correctness.rs | 97 ++++------ crates/re_arrow_store/tests/data_store.rs | 52 +++--- crates/re_arrow_store/tests/dump.rs | 24 ++- crates/re_data_store/Cargo.toml | 3 +- crates/re_data_store/src/log_db.rs | 18 +- crates/re_log_types/src/data_cell.rs | 80 +++++++- crates/re_log_types/src/data_table.rs | 48 +++++ crates/re_log_types/src/time_point/mod.rs | 16 ++ examples/rust/objectron/Cargo.toml | 3 +- 16 files changed, 505 insertions(+), 125 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c59f66796d836..db79d1079f95c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3839,6 +3839,7 @@ dependencies = [ "puffin", "rand", "re_arrow_store", + "re_format", "re_int_histogram", "re_log", "re_log_encoding", diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index fa191ab12aa0f..823a692f99cb0 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -67,10 +67,11 @@ pub type InsertIdVec = SmallVec<[u64; 4]>; /// so far. /// /// See also [`DataStore::lookup_datatype`]. -#[derive(Default)] +#[derive(Debug, Default)] pub struct DataTypeRegistry(IntMap); impl DataTypeRegistry { + // TODO: why is this here tho // Estimated size in bytes of the registry. pub fn size_bytes(&self) -> u64 { type K = ComponentName; @@ -104,10 +105,11 @@ impl std::ops::DerefMut for DataTypeRegistry { } /// Keeps track of arbitrary per-row metadata. -#[derive(Default)] +#[derive(Debug, Default)] pub struct MetadataRegistry(BTreeMap); impl MetadataRegistry { + // TODO: why is this here tho // Estimated size in bytes of the registry. pub fn size_bytes(&self) -> u64 { type K = RowId; @@ -118,9 +120,12 @@ impl MetadataRegistry { let inner: &BTreeMap = &self.0; let keys_size_bytes = std::mem::size_of::() * inner.len(); - let values_size_bytes = std::mem::size_of::() * inner.len(); + let values_size_bytes = inner + .values() + .map(|timepoint| timepoint.size_bytes()) + .sum::(); - keys_size_bytes as u64 + values_size_bytes as u64 + keys_size_bytes as u64 + values_size_bytes } } @@ -178,7 +183,7 @@ pub struct DataStore { /// Keeps track of arbitrary per-row metadata. /// /// Only used to map `RowId`s to their original [`TimePoint`]s at the moment. - pub(crate) metadata_registry: MetadataRegistry, + pub metadata_registry: MetadataRegistry, // TODO: part of the size stats /// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, ...) diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 777ac333160bd..b9cbb9802fe9b 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -1,3 +1,9 @@ +use re_log_types::{RowId, TimeInt}; + +use crate::{store::IndexedBucketInner, DataStore, DataStoreStats}; + +// --- + #[derive(Debug, Clone, Copy)] pub enum GarbageCollectionTarget { /// Try to drop _at least_ the given percentage. @@ -17,4 +23,204 @@ impl std::fmt::Display for GarbageCollectionTarget { } } -// TODO(#1619): Implement garbage collection. +// TODO: test with +// 1. clocks infinite +// 2. canny infinite +// 3. objectron but not insanely fast + +impl DataStore { + /// Triggers a garbage collection according to the desired `target`. + /// + /// Returns the list of `RowId`s that were purged from the store. + /// + /// ## Semantics + /// + /// Garbage collection works a row-level basis and is driven by `RowId` order, i.e. the order + /// defined by the clients' wall-clocks, allowing it to drop data across the different timelines + /// in a fair, deterministic manner. + /// Similarly, out-of-order data is supported out of the box. + /// + /// The garbage collector doesn't deallocate data in and of itself: all it does is drop the + /// store's internal references to that data (the `DataCell`s), which will be deallocated once + /// their reference count reaches 0. + /// + /// ## Limitations + /// + /// The garbage collector is currently unaware of our latest-at semantics, i.e. it will drop + /// old data even if doing so would impact the results of recent queries. + /// See . + // + // TODO(cmc): There shouldn't be any need to return the purged `RowId`s, all secondary + // datastructures should be able to purge themselves based solely off of + // `DataStore::oldest_time_per_timeline`. + // + // TODO(cmc): The GC should be aware of latest-at semantics and make sure they are upheld when + // purging data. + pub fn gc(&mut self, target: GarbageCollectionTarget) -> (Vec, DataStoreStats) { + crate::profile_function!(); + + self.gc_id += 1; + + let stats_before = DataStoreStats::from_store(self); + let initial_nb_rows = stats_before.total_temporal_rows; + // NOTE: only temporal data and row metadata get purged! + let initial_size_bytes = (stats_before.total_temporal_size_bytes + + stats_before.metadata_registry_size_bytes) as f64; + + let row_ids = match target { + GarbageCollectionTarget::DropAtLeastPercentage(p) => { + assert!((0.0..=1.0).contains(&p)); + + let drop_at_least_size_bytes = initial_size_bytes * p; + let target_size_bytes = initial_size_bytes - drop_at_least_size_bytes; + + re_log::debug!( + kind = "gc", + id = self.gc_id, + %target, + initial_nb_rows = re_format::format_large_number(initial_nb_rows as _), + initial_size_bytes = re_format::format_bytes(initial_size_bytes), + target_size_bytes = re_format::format_bytes(target_size_bytes), + drop_at_least_size_bytes = re_format::format_bytes(drop_at_least_size_bytes), + "starting GC" + ); + + self.gc_drop_at_least_size_bytes(drop_at_least_size_bytes) + } + }; + + #[cfg(debug_assertions)] + self.sanity_check().unwrap(); + + let stats_after = DataStoreStats::from_store(self); + let new_nb_rows = stats_after.total_temporal_rows; + // NOTE: only temporal data and row metadata get purged! + let new_size_bytes = (stats_before.total_temporal_size_bytes + + stats_before.metadata_registry_size_bytes) as f64; + + re_log::debug!( + kind = "gc", + id = self.gc_id, + %target, + initial_nb_rows = re_format::format_large_number(initial_nb_rows as _), + initial_size_bytes = re_format::format_bytes(initial_size_bytes), + new_nb_rows = re_format::format_large_number(new_nb_rows as _), + new_size_bytes = re_format::format_bytes(new_size_bytes), + "GC done" + ); + + let stats_diff = stats_before - stats_after; + + (row_ids, stats_diff) + } + + fn gc_drop_at_least_size_bytes(&mut self, mut drop_at_least_size_bytes: f64) -> Vec { + let mut row_ids = Vec::new(); + + while drop_at_least_size_bytes > 0.0 { + // pop next row id + let Some((row_id, timepoint)) = self.metadata_registry.pop_first() else { + break; + }; + drop_at_least_size_bytes -= std::mem::size_of_val(&row_id) as f64; + drop_at_least_size_bytes -= timepoint.size_bytes() as f64; + row_ids.push(row_id); + + let tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| { + timepoint.get(timeline).map(|time| (*time, table)) + }); + + for (time, table) in tables { + let table_has_more_than_one_bucket = table.buckets.len() > 1; + + let (bucket_key, bucket) = table.find_bucket_mut(time); + let bucket_size_bytes = bucket.size_bytes(); + + let (dropped_rows, mut dropped_size_bytes) = { + let inner = &mut *bucket.inner.write(); + inner.sort(); + + let IndexedBucketInner { + is_sorted, + time_range: _, // NOTE: Lazily updated when sorting + col_time, + col_insert_id, + col_row_id, + col_num_instances, + columns, + size_bytes, + } = inner; + + let mut dropped_rows = 0u64; + let mut dropped_size_bytes = 0u64; + + let mut col_index = col_time.binary_search(&time.as_i64()).unwrap_or(0); + + while col_time.get(col_index) == Some(&time.as_i64()) { + if col_row_id[col_index] != row_id { + col_index += 1; + continue; + } + + // col_row_id + let removed_row_id = col_row_id.swap_remove(col_index); + debug_assert_eq!(row_id, removed_row_id); + dropped_size_bytes += std::mem::size_of_val(&removed_row_id) as u64; + + // col_time + let row_time = col_time.swap_remove(col_index); + dropped_size_bytes += std::mem::size_of_val(&row_time) as u64; + + // col_insert_id (if present) + if !col_insert_id.is_empty() { + dropped_size_bytes += + std::mem::size_of_val(&col_insert_id.swap_remove(col_index)) as u64; + } + + // col_num_instances + dropped_size_bytes += + std::mem::size_of_val(&col_num_instances.swap_remove(col_index)) as u64; + + // each data column + for column in columns.values_mut() { + dropped_size_bytes += column + .0 + .swap_remove(col_index) + .map_or(0, |cell| cell.size_bytes()); + } + + *is_sorted = false; + + dropped_rows += 1; + col_index += 1; + } + + *size_bytes -= dropped_size_bytes; + + (dropped_rows, dropped_size_bytes) + }; + + // NOTE: We always need to keep at least one bucket alive, otherwise we have + // nowhere to write to. + if table_has_more_than_one_bucket && bucket.num_rows() == 0 { + // NOTE: We're dropping the bucket itself in this case, rather than just its + // contents. + dropped_size_bytes = bucket_size_bytes; + table.buckets.remove(&bucket_key); + + if bucket_key == TimeInt::MIN { + if let Some((_, bucket)) = table.buckets.pop_first() { + table.buckets.insert(TimeInt::MIN, bucket); + } + } + } + + drop_at_least_size_bytes -= dropped_size_bytes as f64; + table.buckets_size_bytes -= dropped_size_bytes; + table.buckets_num_rows -= dropped_rows; + } + } + + row_ids + } +} diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index 74f75c5863636..c559705acd2bb 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -892,7 +892,7 @@ impl IndexedBucketInner { pub fn sort(&mut self) { let Self { is_sorted, - time_range: _, + time_range, col_time, col_insert_id, col_row_id, @@ -967,6 +967,9 @@ impl IndexedBucketInner { } *is_sorted = true; + + time_range.min = col_time.first().copied().unwrap_or(i64::MAX).into(); + time_range.max = col_time.last().copied().unwrap_or(i64::MIN).into(); } } diff --git a/crates/re_arrow_store/src/store_sanity.rs b/crates/re_arrow_store/src/store_sanity.rs index 0bf32c720cab1..a3ee197f2e53f 100644 --- a/crates/re_arrow_store/src/store_sanity.rs +++ b/crates/re_arrow_store/src/store_sanity.rs @@ -112,6 +112,11 @@ impl IndexedTable { } } + // Run individual bucket sanity check suites too. + for bucket in self.buckets.values() { + bucket.sanity_check()?; + } + // Make sure size values aren't out of sync { let total_size_bytes = self.total_size_bytes(); @@ -125,11 +130,6 @@ impl IndexedTable { } } - // Run individual bucket sanity check suites too. - for bucket in self.buckets.values() { - bucket.sanity_check()?; - } - Ok(()) } } @@ -191,7 +191,7 @@ impl IndexedBucket { } // The cluster column must be fully dense. - { + if self.num_rows() > 0 { let cluster_column = columns .get(cluster_key) @@ -272,7 +272,7 @@ impl PersistentIndexedTable { } // The cluster column must be fully dense. - { + if self.total_rows() > 0 { let cluster_column = columns .get(cluster_key) diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index 2984e5e388902..47d5d735392d1 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -29,6 +29,32 @@ pub struct DataStoreStats { pub total_size_bytes: u64, } +impl std::ops::Sub for DataStoreStats { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { + type_registry_rows: self.type_registry_rows - rhs.type_registry_rows, + type_registry_size_bytes: self.type_registry_size_bytes - rhs.type_registry_size_bytes, + metadata_registry_rows: self.metadata_registry_rows - rhs.metadata_registry_rows, + metadata_registry_size_bytes: self.metadata_registry_size_bytes + - rhs.metadata_registry_size_bytes, + total_autogenerated_rows: self.total_autogenerated_rows - rhs.total_autogenerated_rows, + total_autogenerated_size_bytes: self.total_autogenerated_size_bytes + - rhs.total_autogenerated_size_bytes, + total_timeless_rows: self.total_timeless_rows - rhs.total_timeless_rows, + total_timeless_size_bytes: self.total_timeless_size_bytes + - rhs.total_timeless_size_bytes, + total_temporal_rows: self.total_temporal_rows - rhs.total_temporal_rows, + total_temporal_size_bytes: self.total_temporal_size_bytes + - rhs.total_temporal_size_bytes, + total_temporal_buckets: self.total_temporal_buckets - rhs.total_temporal_buckets, + total_rows: self.total_rows - rhs.total_rows, + total_size_bytes: self.total_size_bytes - rhs.total_size_bytes, + } + } +} + impl DataStoreStats { pub fn from_store(store: &DataStore) -> Self { crate::profile_function!(); diff --git a/crates/re_arrow_store/src/test_util.rs b/crates/re_arrow_store/src/test_util.rs index 0fe1b28bf8c26..623866f2d567c 100644 --- a/crates/re_arrow_store/src/test_util.rs +++ b/crates/re_arrow_store/src/test_util.rs @@ -5,24 +5,28 @@ use crate::DataStoreConfig; #[doc(hidden)] #[macro_export] macro_rules! test_row { - ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => { - ::re_log_types::DataRow::from_cells1( + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {{ + let mut row = ::re_log_types::DataRow::from_cells1( ::re_log_types::RowId::random(), $entity.clone(), $frames, $n, $c0, - ) - }; - ($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => { - ::re_log_types::DataRow::from_cells2( + ); + row.compute_all_size_bytes(); + row + }}; + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {{ + let mut row = ::re_log_types::DataRow::from_cells2( ::re_log_types::RowId::random(), $entity.clone(), $frames, $n, ($c0, $c1), - ) - }; + ); + row.compute_all_size_bytes(); + row + }}; } pub fn all_configs() -> impl Iterator { diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index d6d68fcfe6836..03e41e49697a4 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -6,7 +6,10 @@ use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use rand::Rng; -use re_arrow_store::{test_row, DataStore, DataStoreConfig, LatestAtQuery, WriteError}; +use re_arrow_store::{ + test_row, DataStore, DataStoreConfig, DataStoreStats, GarbageCollectionTarget, LatestAtQuery, + WriteError, +}; use re_log_types::{ component_types::InstanceKey, datagen::{ @@ -307,64 +310,40 @@ fn gc_correct() { } check_still_readable(&store); - // TODO(#1619): bring back garbage collection - - // let row_id_chunks = store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - - // let row_ids = row_id_chunks - // .iter() - // .flat_map(|chunk| arrow_array_deserialize_iterator::>(&**chunk).unwrap()) - // .map(Option::unwrap) // MsgId is always present - // .collect::>(); - // assert!(!row_ids.is_empty()); - - // if let err @ Err(_) = store.sanity_check() { - // store.sort_indices_if_needed(); - // eprintln!("{store}"); - // err.unwrap(); - // } - // check_still_readable(&store); - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_some()); - // } - - // store.clear_msg_metadata(&row_ids); - - // if let err @ Err(_) = store.sanity_check() { - // store.sort_indices_if_needed(); - // eprintln!("{store}"); - // err.unwrap(); - // } - // check_still_readable(&store); - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_none()); - // } - - // let row_id_chunks = store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - - // let row_ids = row_id_chunks - // .iter() - // .flat_map(|chunk| arrow_array_deserialize_iterator::>(&**chunk).unwrap()) - // .map(Option::unwrap) // MsgId is always present - // .collect::>(); - // assert!(row_ids.is_empty()); - - // if let err @ Err(_) = store.sanity_check() { - // store.sort_indices_if_needed(); - // eprintln!("{store}"); - // err.unwrap(); - // } - // check_still_readable(&store); - - // assert_eq!(2, store.total_temporal_component_rows()); + let stats = DataStoreStats::from_store(&store); + + let (row_ids, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + assert_eq!(row_ids.len() as u64, stats.total_rows); + assert_eq!( + stats.metadata_registry_rows, + stats_diff.metadata_registry_rows + ); + assert_eq!( + stats.metadata_registry_size_bytes, + stats_diff.metadata_registry_size_bytes + ); + assert_eq!(stats.total_temporal_rows, stats_diff.total_temporal_rows); + + if let err @ Err(_) = store.sanity_check() { + store.sort_indices_if_needed(); + eprintln!("{store}"); + err.unwrap(); + } + check_still_readable(&store); + for row_id in &row_ids { + assert!(store.get_msg_metadata(row_id).is_none()); + } + + let (row_ids, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + assert!(row_ids.is_empty()); + assert_eq!(DataStoreStats::default(), stats_diff); + + if let err @ Err(_) = store.sanity_check() { + store.sort_indices_if_needed(); + eprintln!("{store}"); + err.unwrap(); + } + check_still_readable(&store); } fn check_still_readable(_store: &DataStore) { diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 29af874ae0e32..ae74084f9739b 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -11,8 +11,8 @@ use polars_core::{prelude::*, series::Series}; use polars_ops::prelude::DataFrameJoinOps; use rand::Rng; use re_arrow_store::{ - polars_util, test_row, DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, TimeInt, - TimeRange, + polars_util, test_row, DataStore, DataStoreConfig, DataStoreStats, GarbageCollectionTarget, + LatestAtQuery, RangeQuery, TimeInt, TimeRange, }; use re_log_types::{ component_types::{ColorRGBA, InstanceKey, Point2D, Rect2D}, @@ -28,6 +28,7 @@ use re_log_types::{ // --- LatestComponentsAt --- +// TODO: stress test GC in here #[test] fn all_components() { init_logs(); @@ -259,14 +260,9 @@ fn latest_at() { for config in re_arrow_store::test_util::all_configs() { let mut store = DataStore::new(InstanceKey::name(), config.clone()); latest_at_impl(&mut store); - - // TODO(#1619): bring back garbage collection - // store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - // latest_at_impl(&mut store); + // stress-test GC impl + store.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + latest_at_impl(&mut store); } } @@ -380,6 +376,10 @@ fn range() { for config in re_arrow_store::test_util::all_configs() { let mut store = DataStore::new(InstanceKey::name(), config.clone()); range_impl(&mut store); + // TODO + // // stress-test GC impl + // store.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + // range_impl(&mut store); } } @@ -889,29 +889,19 @@ fn gc_impl(store: &mut DataStore) { } _ = store.to_dataframe(); // simple way of checking that everything is still readable - // TODO(#1619): bring back garbage collection - - // let row_id_chunks = store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0 / 3.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - - // let row_ids = row_id_chunks - // .iter() - // .flat_map(|chunk| arrow_array_deserialize_iterator::>(&**chunk).unwrap()) - // .map(Option::unwrap) // MsgId is always present - // .collect::>(); + let stats = DataStoreStats::from_store(store); - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_some()); - // } - - // store.clear_msg_metadata(&row_ids); + let (row_ids, stats_diff) = + store.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0 / 3.0)); + for row_id in &row_ids { + assert!(store.get_msg_metadata(row_id).is_none()); + } - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_none()); - // } + assert!( + (stats.total_size_bytes as f64 * 0.95 / 3.0) as u64 <= stats_diff.total_size_bytes + && stats_diff.total_size_bytes + <= (stats.total_size_bytes as f64 * 1.05 / 3.0) as u64 + ); } } diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index 5293870bf882e..dfa07c806bdc6 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -27,6 +27,14 @@ fn data_store_dump() { let mut store3 = DataStore::new(InstanceKey::name(), config.clone()); data_store_dump_impl(&mut store1, &mut store2, &mut store3); + + // TODO + // // stress-test GC impl + // store1.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + // store2.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + // store3.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + + // data_store_dump_impl(&mut store1, &mut store2, &mut store3); } } @@ -94,12 +102,14 @@ fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: let store2_stats = DataStoreStats::from_store(store2); let store3_stats = DataStoreStats::from_store(store3); assert!( - store1_stats <= store2_stats, + store1_stats.total_temporal_size_bytes <= store2_stats.total_temporal_size_bytes + && store1_stats.total_timeless_size_bytes <= store2_stats.total_timeless_size_bytes, "First store should have <= amount of data of second store:\n\ {store1_stats:#?}\n{store2_stats:#?}" ); assert!( - store2_stats <= store3_stats, + store2_stats.total_temporal_size_bytes <= store3_stats.total_temporal_size_bytes + && store2_stats.total_timeless_size_bytes <= store3_stats.total_timeless_size_bytes, "Second store should have <= amount of data of third store:\n\ {store2_stats:#?}\n{store3_stats:#?}" ); @@ -119,6 +129,13 @@ fn data_store_dump_filtered() { let mut store2 = DataStore::new(InstanceKey::name(), config.clone()); data_store_dump_filtered_impl(&mut store1, &mut store2); + + // TODO + // // stress-test GC impl + // store1.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + // store2.gc(GarbageCollectionTarget::DropAtLeastPercentage(1.0)); + + // data_store_dump_filtered_impl(&mut store1, &mut store2); } } @@ -182,7 +199,8 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) let store1_stats = DataStoreStats::from_store(store1); let store2_stats = DataStoreStats::from_store(store2); assert!( - store1_stats <= store2_stats, + store1_stats.total_temporal_size_bytes <= store2_stats.total_temporal_size_bytes + && store1_stats.total_timeless_size_bytes <= store2_stats.total_timeless_size_bytes, "First store should have <= amount of data of second store:\n\ {store1_stats:#?}\n{store2_stats:#?}" ); diff --git a/crates/re_data_store/Cargo.toml b/crates/re_data_store/Cargo.toml index 4b5991b37c0a0..607efc62ef3b9 100644 --- a/crates/re_data_store/Cargo.toml +++ b/crates/re_data_store/Cargo.toml @@ -25,10 +25,11 @@ serde = ["dep:serde", "re_log_types/serde"] [dependencies] re_arrow_store.workspace = true +re_format.workspace = true re_int_histogram.workspace = true +re_log.workspace = true re_log_encoding = { workspace = true, optional = true } re_log_types.workspace = true -re_log.workspace = true re_smart_channel.workspace = true ahash.workspace = true diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index 5821b97cac2a0..453905f28eaae 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -36,7 +36,12 @@ impl Default for EntityDb { tree: crate::EntityTree::root(), data_store: re_arrow_store::DataStore::new( InstanceKey::name(), - DataStoreConfig::default(), + DataStoreConfig { + // NOTE: Empiral testing has shown that 128 is a good balance between sorting + // and binary search costs with the current GC implementation. + indexed_bucket_num_rows: 128, + ..Default::default() + }, ), } } @@ -251,9 +256,16 @@ impl LogDb { crate::profile_function!(); assert!((0.0..=1.0).contains(&fraction_to_purge)); - // TODO(#1619): bring back garbage collection - let drop_row_ids: ahash::HashSet<_> = Default::default(); + let (drop_row_ids, stats_diff) = self.entity_db.data_store.gc( + re_arrow_store::GarbageCollectionTarget::DropAtLeastPercentage(fraction_to_purge as _), + ); + re_log::debug!( + num_events_dropped = drop_row_ids.len(), + size_bytes_dropped = re_format::format_bytes(stats_diff.total_size_bytes as _), + "purged datastore" + ); + let drop_row_ids: ahash::HashSet<_> = drop_row_ids.into_iter().collect(); let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline(); let Self { diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index 316d25f7978df..c5cc6c52403d0 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use arrow2::datatypes::{DataType, Field}; use itertools::Itertools as _; use crate::{Component, ComponentName, DeserializableComponent, SerializableComponent}; @@ -412,7 +413,6 @@ impl DataCell { pub fn is_sorted_and_unique(&self) -> DataCellResult { use arrow2::{ array::{Array, PrimitiveArray}, - datatypes::DataType, types::NativeType, }; @@ -544,10 +544,80 @@ impl DataCellInner { *size_bytes = (std::mem::size_of_val(name) + std::mem::size_of_val(size_bytes) + std::mem::size_of_val(values)) as u64 + + datatype_size_bytes(self.values.data_type()) + arrow2::compute::aggregate::estimated_bytes_size(&*self.values) as u64; } } +fn datatype_size_bytes(datatype: &DataType) -> u64 { + std::mem::size_of_val(datatype) as u64 + + match datatype { + DataType::Null + | DataType::Binary + | DataType::Boolean + | DataType::Date32 + | DataType::Date64 + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int8 + | DataType::LargeBinary + | DataType::LargeUtf8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::UInt8 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::FixedSizeBinary(_) + | DataType::Decimal(_, _) + | DataType::Decimal256(_, _) + | DataType::Utf8 => 0, + DataType::Timestamp(_, str) => str.as_ref().map_or(0, string_size_bytes), + DataType::List(field) + | DataType::FixedSizeList(field, _) + | DataType::LargeList(field) + | DataType::Map(field, _) => field_size_bytes(field), + DataType::Struct(fields) => fields.iter().map(field_size_bytes).sum(), + DataType::Union(fields, indices, _) => { + fields.iter().map(field_size_bytes).sum::() + + indices.as_ref().map_or(0, |indices| { + std::mem::size_of_val(indices.as_slice()) as u64 + }) + } + DataType::Dictionary(_, datatype, _) => datatype_size_bytes(datatype), + DataType::Extension(name, datatype, extra) => { + datatype_size_bytes(datatype) + + string_size_bytes(name) + + extra.as_ref().map_or(0, string_size_bytes) + } + } +} + +fn field_size_bytes(field: &Field) -> u64 { + let Field { + name, + data_type, + is_nullable, + metadata, + } = field; + + datatype_size_bytes(data_type) + + string_size_bytes(name) + + std::mem::size_of_val(is_nullable) as u64 + + metadata.keys().map(string_size_bytes).sum::() + + metadata.values().map(string_size_bytes).sum::() +} + +fn string_size_bytes(s: &String) -> u64 { + (std::mem::size_of_val(s) + std::mem::size_of_val(s.as_bytes())) as u64 +} + #[test] fn data_cell_sizes() { use crate::{component_types::InstanceKey, Component as _}; @@ -567,8 +637,8 @@ fn data_cell_sizes() { cell.compute_size_bytes(); // only the size of the outer & inner cells themselves - assert_eq!(56, cell.size_bytes()); - assert_eq!(56, cell.size_bytes()); + assert_eq!(120, cell.size_bytes()); + assert_eq!(120, cell.size_bytes()); } // anything else @@ -580,8 +650,8 @@ fn data_cell_sizes() { cell.compute_size_bytes(); // 56 bytes for the inner & outer cells + 3x u64s - assert_eq!(80, cell.size_bytes()); - assert_eq!(80, cell.size_bytes()); + assert_eq!(144, cell.size_bytes()); + assert_eq!(144, cell.size_bytes()); } } diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 4c54140be7224..3b73804da6ec8 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -118,6 +118,24 @@ impl DataCellColumn { cell.as_mut().map(|cell| cell.compute_size_bytes()); } } + + // TODO + /// Returns the total (heap) allocated size of the cell in bytes, provided that + /// [`Self::compute_size_bytes`] has been called first (zero otherwise). + /// + /// This is an approximation, accurate enough for most purposes (stats, GC trigger, ...). + /// + /// This is `O(1)`, the value is computed and cached by calling [`Self::compute_size_bytes`]. + #[inline] + pub fn size_bytes(&self) -> u64 { + self.0 + .iter() + .map(|cell| { + std::mem::size_of_val(cell) as u64 + + cell.as_ref().map_or(0, |cell| cell.size_bytes()) + }) + .sum() + } } // --- @@ -513,6 +531,34 @@ impl DataTable { column.compute_all_size_bytes(); } } + + /// Returns the total (heap) allocated size of the table in bytes, provided that + /// [`Self::compute_all_size_bytes`] has been called first (zero otherwise). + /// + /// This is an approximation, accurate enough for most purposes (stats, GC trigger, ...). + #[inline] + pub fn size_bytes(&self) -> u64 { + let Self { + table_id, + col_row_id, + col_timelines, + col_entity_path, + col_num_instances, + columns, + } = self; + + (std::mem::size_of_val(table_id) + + std::mem::size_of_val(col_row_id.as_slice()) + + std::mem::size_of_val(col_entity_path.as_slice()) + + std::mem::size_of_val(col_num_instances.as_slice()) + + (col_timelines.len() * std::mem::size_of::()) + + col_timelines + .values() + .map(|col| std::mem::size_of_val(col.as_slice())) + .sum::() + + (columns.len() * std::mem::size_of::())) as u64 + + columns.values().map(|col| col.size_bytes()).sum::() + } } // --- Serialization --- @@ -979,6 +1025,8 @@ impl DataTable { .downcast_ref::>() .ok_or(DataTableError::NotAColumn(component.to_string()))? .iter() + // TODO: Pretty sure this clones the schema metadata in each and every array -.- + // This'll become a problem as soon as we enable batching. .map(|array| array.map(|values| DataCell::from_arrow(component, values))) .collect(), )) diff --git a/crates/re_log_types/src/time_point/mod.rs b/crates/re_log_types/src/time_point/mod.rs index 79a1eeae526fd..b931203118091 100644 --- a/crates/re_log_types/src/time_point/mod.rs +++ b/crates/re_log_types/src/time_point/mod.rs @@ -87,6 +87,22 @@ impl TimePoint { } self } + + // TODO + // Estimated size in bytes of the registry. + pub fn size_bytes(&self) -> u64 { + type K = Timeline; + type V = TimeInt; + + // NOTE: This is only here to make sure this method fails to compile if the inner type + // changes, as the following size computation assumes types with shallow depths. + let inner: &BTreeMap = &self.0; + + let keys_size_bytes = std::mem::size_of::() * inner.len(); + let values_size_bytes = std::mem::size_of::() * inner.len(); + + (std::mem::size_of_val(self) + keys_size_bytes + values_size_bytes) as u64 + } } // ---------------------------------------------------------------------------- diff --git a/examples/rust/objectron/Cargo.toml b/examples/rust/objectron/Cargo.toml index e96c1b651a733..03c888e7fac47 100644 --- a/examples/rust/objectron/Cargo.toml +++ b/examples/rust/objectron/Cargo.toml @@ -8,7 +8,8 @@ publish = false [dependencies] -rerun = { workspace = true, features = ["web_viewer"] } +# rerun = { workspace = true, features = ["web_viewer"] } +rerun = { workspace = true } anyhow.workspace = true clap = { workspace = true, features = ["derive"] }