From 0f1cd6fbde03aadd73269898516bfec50f498f6b Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 12 Apr 2023 17:34:01 +0200 Subject: [PATCH] Datastore revamp 7: garbage collection --- Cargo.lock | 1 + crates/re_arrow_store/benches/data_store.rs | 49 ++- crates/re_arrow_store/src/lib.rs | 5 +- crates/re_arrow_store/src/store.rs | 94 ++++- crates/re_arrow_store/src/store_arrow.rs | 25 +- crates/re_arrow_store/src/store_dump.rs | 4 +- crates/re_arrow_store/src/store_format.rs | 15 +- crates/re_arrow_store/src/store_gc.rs | 281 +++++++++++++- crates/re_arrow_store/src/store_polars.rs | 2 +- crates/re_arrow_store/src/store_read.rs | 4 +- crates/re_arrow_store/src/store_sanity.rs | 55 ++- crates/re_arrow_store/src/store_stats.rs | 345 ++++++++++-------- crates/re_arrow_store/src/store_write.rs | 80 ++-- crates/re_arrow_store/src/test_util.rs | 30 +- crates/re_arrow_store/tests/correctness.rs | 105 ++---- crates/re_arrow_store/tests/data_store.rs | 106 +++--- crates/re_arrow_store/tests/dump.rs | 60 +-- crates/re_arrow_store/tests/internals.rs | 6 +- crates/re_data_store/Cargo.toml | 3 +- crates/re_data_store/src/log_db.rs | 15 +- crates/re_log_types/src/data_cell.rs | 71 ++-- crates/re_log_types/src/data_row.rs | 11 +- crates/re_log_types/src/data_table.rs | 49 ++- crates/re_log_types/src/lib.rs | 2 + .../re_log_types/src/path/component_name.rs | 10 + crates/re_log_types/src/path/entity_path.rs | 8 + crates/re_log_types/src/size_bytes.rs | 173 +++++++++ crates/re_log_types/src/time_point/mod.rs | 20 +- .../re_log_types/src/time_point/timeline.rs | 9 +- crates/re_log_types/src/time_range.rs | 9 +- crates/re_query/src/query.rs | 4 +- crates/re_viewer/src/ui/memory_panel.rs | 62 ++-- crates/re_viewer/src/ui/time_panel/mod.rs | 9 +- scripts/lint.py | 3 +- 34 files changed, 1226 insertions(+), 499 deletions(-) create mode 100644 crates/re_log_types/src/size_bytes.rs diff --git a/Cargo.lock b/Cargo.lock index 8ca7d48e4b0f..587df51f948f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3849,6 +3849,7 @@ dependencies = [ "puffin", "rand", "re_arrow_store", + "re_format", "re_int_histogram", "re_log", "re_log_encoding", diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 385b4f83f904..6e9aeda7a922 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -4,7 +4,10 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use arrow2::array::UnionArray; use criterion::{criterion_group, criterion_main, Criterion}; -use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, TimeInt, TimeRange}; +use re_arrow_store::{ + DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt, + TimeRange, +}; use re_log_types::{ component_types::{InstanceKey, Rect2D}, datagen::{build_frame_nr, build_some_instances, build_some_rects}, @@ -12,7 +15,7 @@ use re_log_types::{ TimeType, Timeline, }; -criterion_group!(benches, insert, latest_at, latest_at_missing, range); +criterion_group!(benches, insert, latest_at, latest_at_missing, range, gc); criterion_main!(benches); // --- @@ -258,6 +261,48 @@ fn range(c: &mut Criterion) { } } +fn gc(c: &mut Criterion) { + let mut group = c.benchmark_group(format!( + "datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/gc" + )); + group.throughput(criterion::Throughput::Elements( + (NUM_INSTANCES * NUM_ROWS) as _, + )); + + let mut table = build_table(NUM_INSTANCES as usize, false); + table.compute_all_size_bytes(); + + // Default config + group.bench_function("default", |b| { + let store = insert_table(Default::default(), InstanceKey::name(), &table); + b.iter(|| { + let mut store = store.clone(); + let (_, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0)); + stats_diff + }); + }); + + // Emulate more or less bucket + for &num_rows_per_bucket in num_rows_per_bucket() { + group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| { + let store = insert_table( + DataStoreConfig { + indexed_bucket_num_rows: num_rows_per_bucket, + ..Default::default() + }, + InstanceKey::name(), + &table, + ); + b.iter(|| { + let mut store = store.clone(); + let (_, stats_diff) = + store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0)); + stats_diff + }); + }); + } +} + // --- Helpers --- fn build_table(n: usize, packed: bool) -> DataTable { diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index 4593868e9ec6..0ac2be79a28e 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -38,11 +38,12 @@ pub use self::arrow_util::ArrayExt; pub use self::store::{DataStore, DataStoreConfig}; pub use self::store_gc::GarbageCollectionTarget; pub use self::store_read::{LatestAtQuery, RangeQuery}; -pub use self::store_stats::DataStoreStats; +pub use self::store_stats::{DataStoreRowStats, DataStoreStats}; pub use self::store_write::{WriteError, WriteResult}; pub(crate) use self::store::{ - IndexedBucket, IndexedBucketInner, IndexedTable, PersistentIndexedTable, + ClusterCellCache, DataTypeRegistry, IndexedBucket, IndexedBucketInner, IndexedTable, + MetadataRegistry, PersistentIndexedTable, }; // Re-exports diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index a2dba209616f..15056b70f490 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -9,7 +9,7 @@ use nohash_hasher::{IntMap, IntSet}; use parking_lot::RwLock; use re_log_types::{ ComponentName, DataCell, DataCellColumn, EntityPath, EntityPathHash, ErasedTimeVec, - NumInstancesVec, RowId, RowIdVec, TimeInt, TimePoint, TimeRange, Timeline, + NumInstancesVec, RowId, RowIdVec, SizeBytes, TimeInt, TimePoint, TimeRange, Timeline, }; // --- Data store --- @@ -23,8 +23,13 @@ pub struct DataStoreConfig { /// to a specific timeline _and_ a specific entity. /// /// This effectively puts an upper bound on the number of rows that need to be sorted when an - /// indexed bucket gets out of order. + /// indexed bucket gets out of order (e.g. because of new insertions or a GC pass). /// This is a tradeoff: less rows means faster sorts at the cost of more metadata overhead. + /// In particular: + /// - Query performance scales inversely logarithmically to this number (i.e. it gets better + /// the higher this number gets). + /// - GC performance scales quadratically with this number (i.e. it gets better the lower this + /// number gets). /// /// See [`Self::DEFAULT`] for defaults. pub indexed_bucket_num_rows: u64, @@ -53,7 +58,12 @@ impl Default for DataStoreConfig { impl DataStoreConfig { pub const DEFAULT: Self = Self { - indexed_bucket_num_rows: 1024, + // NOTE: Empirical testing has shown that 512 is a good balance between sorting + // and binary search costs with the current GC implementation. + // + // Garbage collection costs are entirely driven by the number of buckets around, the size + // of the data itself has no impact. + indexed_bucket_num_rows: 512, store_insert_ids: cfg!(debug_assertions), enable_typecheck: cfg!(debug_assertions), }; @@ -67,8 +77,8 @@ pub type InsertIdVec = SmallVec<[u64; 4]>; /// so far. /// /// See also [`DataStore::lookup_datatype`]. -#[derive(Default)] -pub struct DataTypeRegistry(IntMap); +#[derive(Debug, Default, Clone)] +pub struct DataTypeRegistry(pub IntMap); impl std::ops::Deref for DataTypeRegistry { type Target = IntMap; @@ -87,11 +97,11 @@ impl std::ops::DerefMut for DataTypeRegistry { } /// Keeps track of arbitrary per-row metadata. -#[derive(Default)] -pub struct MetadataRegistry(HashMap); +#[derive(Debug, Default, Clone)] +pub struct MetadataRegistry(pub BTreeMap); impl std::ops::Deref for MetadataRegistry { - type Target = HashMap; + type Target = BTreeMap; #[inline] fn deref(&self) -> &Self::Target { @@ -106,6 +116,29 @@ impl std::ops::DerefMut for MetadataRegistry { } } +/// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, ...) so that they +/// can be properly deduplicated on insertion. +#[derive(Debug, Default, Clone)] +pub struct ClusterCellCache(pub IntMap); + +impl std::ops::Deref for ClusterCellCache { + type Target = IntMap; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for ClusterCellCache { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +// --- + /// A complete data store: covers all timelines, all entities, everything. /// /// ## Debugging @@ -148,7 +181,7 @@ pub struct DataStore { /// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, ...) /// so that they can be properly deduplicated on insertion. - pub(crate) cluster_cell_cache: IntMap, + pub(crate) cluster_cell_cache: ClusterCellCache, /// All temporal [`IndexedTable`]s for all entities on all timelines. /// @@ -167,10 +200,29 @@ pub struct DataStore { pub(crate) query_id: AtomicU64, /// Monotonically increasing ID for GCs. - #[allow(dead_code)] pub(crate) gc_id: u64, } +impl Clone for DataStore { + fn clone(&self) -> Self { + Self { + cluster_key: self.cluster_key, + config: self.config.clone(), + type_registry: self.type_registry.clone(), + metadata_registry: self.metadata_registry.clone(), + cluster_cell_cache: self.cluster_cell_cache.clone(), + tables: self.tables.clone(), + timeless_tables: self.timeless_tables.clone(), + insert_id: self.insert_id, + query_id: self + .query_id + .load(std::sync::atomic::Ordering::Relaxed) + .into(), + gc_id: self.gc_id, + } + } +} + impl DataStore { /// See [`Self::cluster_key`] for more information about the cluster key. pub fn new(cluster_key: ComponentName, config: DataStoreConfig) -> Self { @@ -293,7 +345,7 @@ fn datastore_internal_repr() { /// ``` // // TODO(#1524): inline visualization once it's back to a manageable state -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct IndexedTable { /// The timeline this table operates in, for debugging purposes. pub timeline: Timeline, @@ -336,7 +388,7 @@ pub struct IndexedTable { impl IndexedTable { pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self { let bucket = IndexedBucket::new(cluster_key, timeline); - let buckets_size_bytes = bucket.size_bytes(); + let buckets_size_bytes = bucket.total_size_bytes(); Self { timeline, ent_path, @@ -364,6 +416,16 @@ pub struct IndexedBucket { pub inner: RwLock, } +impl Clone for IndexedBucket { + fn clone(&self) -> Self { + Self { + timeline: self.timeline, + cluster_key: self.cluster_key, + inner: RwLock::new(self.inner.read().clone()), + } + } +} + impl IndexedBucket { fn new(cluster_key: ComponentName, timeline: Timeline) -> Self { Self { @@ -375,7 +437,7 @@ impl IndexedBucket { } /// See [`IndexedBucket`]; this is a helper struct to simplify interior mutability. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct IndexedBucketInner { /// Are the rows in this table chunk sorted? /// @@ -412,7 +474,8 @@ pub struct IndexedBucketInner { /// (i.e. the table is sparse). pub columns: IntMap, - /// The size of both the control & component data stored in this bucket, in bytes. + /// The size of both the control & component data stored in this bucket, heap and stack + /// included, in bytes. /// /// This is a best-effort approximation, adequate for most purposes (stats, /// triggering GCs, ...). @@ -449,7 +512,8 @@ impl Default for IndexedBucketInner { /// ``` // // TODO(#1524): inline visualization once it's back to a manageable state -#[derive(Debug)] +// TODO(#1807): timeless should be row-id ordered too then +#[derive(Debug, Clone)] pub struct PersistentIndexedTable { /// The entity this table is related to, for debugging purposes. pub ent_path: EntityPath, diff --git a/crates/re_arrow_store/src/store_arrow.rs b/crates/re_arrow_store/src/store_arrow.rs index b7db9157855e..d62d8c3b8f3b 100644 --- a/crates/re_arrow_store/src/store_arrow.rs +++ b/crates/re_arrow_store/src/store_arrow.rs @@ -100,6 +100,11 @@ fn serialize( let mut schema = Schema::default(); let mut columns = Vec::new(); + // NOTE: Empty table / bucket. + if col_row_id.is_empty() { + return Ok((schema, Chunk::new(columns))); + } + { let (control_schema, control_columns) = serialize_control_columns(col_time, col_insert_id, col_row_id, col_num_instances)?; @@ -135,10 +140,13 @@ fn serialize_control_columns( // - time // - num_instances - let (insert_id_field, insert_id_column) = - DataTable::serialize_primitive_column(COLUMN_INSERT_ID, col_insert_id, None)?; - schema.fields.push(insert_id_field); - columns.push(insert_id_column); + // NOTE: Optional column, so make sure it's actually there: + if !col_insert_id.is_empty() { + let (insert_id_field, insert_id_column) = + DataTable::serialize_primitive_column(COLUMN_INSERT_ID, col_insert_id, None)?; + schema.fields.push(insert_id_field); + columns.push(insert_id_column); + } let (row_id_field, row_id_column) = DataTable::serialize_control_column(COLUMN_ROW_ID, col_row_id)?; @@ -187,9 +195,12 @@ fn serialize_data_columns( } for (component, column) in table { - let (field, column) = DataTable::serialize_data_column(component.as_str(), column)?; - schema.fields.push(field); - columns.push(column); + // NOTE: Don't serialize columns with only null values. + if column.iter().any(Option::is_some) { + let (field, column) = DataTable::serialize_data_column(component.as_str(), column)?; + schema.fields.push(field); + columns.push(column); + } } Ok((schema, columns)) diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index d24f3a317454..31845f75467d 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -48,7 +48,7 @@ impl DataStore { col_row_id: col_row_id.clone(), col_timelines: Default::default(), col_entity_path: std::iter::repeat_with(|| ent_path.clone()) - .take(table.total_rows() as _) + .take(table.num_rows() as _) .collect(), col_num_instances: col_num_instances.clone(), columns: columns.clone(), // shallow @@ -89,7 +89,7 @@ impl DataStore { col_timelines: [(*timeline, col_time.iter().copied().map(Some).collect())] .into(), col_entity_path: std::iter::repeat_with(|| table.ent_path.clone()) - .take(table.total_rows() as _) + .take(table.num_rows() as _) .collect(), col_num_instances: col_num_instances.clone(), columns: columns.clone(), // shallow diff --git a/crates/re_arrow_store/src/store_format.rs b/crates/re_arrow_store/src/store_format.rs index 975f2f81dd23..d21110f87274 100644 --- a/crates/re_arrow_store/src/store_format.rs +++ b/crates/re_arrow_store/src/store_format.rs @@ -1,4 +1,5 @@ use re_format::{format_bytes, format_number}; +use re_log_types::SizeBytes as _; use crate::{DataStore, IndexedBucket, IndexedTable, PersistentIndexedTable}; @@ -34,8 +35,8 @@ impl std::fmt::Display for DataStore { format!( "{} timeless indexed tables, for a total of {} across {} total rows\n", timeless_tables.len(), - format_bytes(self.total_timeless_size_bytes() as _), - format_number(self.total_timeless_rows() as _) + format_bytes(self.timeless_size_bytes() as _), + format_number(self.num_timeless_rows() as _) ), ))?; f.write_str(&indent::indent_all_by(4, "timeless_tables: [\n"))?; @@ -53,8 +54,8 @@ impl std::fmt::Display for DataStore { format!( "{} indexed tables, for a total of {} across {} total rows\n", tables.len(), - format_bytes(self.total_temporal_size_bytes() as _), - format_number(self.total_temporal_rows() as _) + format_bytes(self.temporal_size_bytes() as _), + format_number(self.num_temporal_rows() as _) ), ))?; f.write_str(&indent::indent_all_by(4, "tables: [\n"))?; @@ -94,7 +95,7 @@ impl std::fmt::Display for IndexedTable { "size: {} buckets for a total of {} across {} total rows\n", self.buckets.len(), format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), + format_number(self.num_rows() as _), ))?; f.write_str("buckets: [\n")?; for (time, bucket) in buckets.iter() { @@ -116,7 +117,7 @@ impl std::fmt::Display for IndexedBucket { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( "size: {} across {} rows\n", - format_bytes(self.size_bytes() as _), + format_bytes(self.total_size_bytes() as _), format_number(self.num_rows() as _), ))?; @@ -163,7 +164,7 @@ impl std::fmt::Display for PersistentIndexedTable { f.write_fmt(format_args!( "size: {} across {} rows\n", format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), + format_number(self.num_rows() as _), ))?; let (schema, columns) = self.serialize().map_err(|err| { diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 777ac333160b..a1cd819830fd 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -1,20 +1,285 @@ +use re_log_types::{RowId, SizeBytes as _, TimeInt, TimeRange}; + +use crate::{ + store::{IndexedBucketInner, IndexedTable}, + DataStore, DataStoreStats, +}; + +// --- + #[derive(Debug, Clone, Copy)] pub enum GarbageCollectionTarget { - /// Try to drop _at least_ the given percentage. + /// Try to drop _at least_ the given fraction. /// - /// The percentage must be a float in the range [0.0 : 1.0]. - DropAtLeastPercentage(f64), + /// The fraction must be a float in the range [0.0 : 1.0]. + DropAtLeastFraction(f64), } impl std::fmt::Display for GarbageCollectionTarget { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - GarbageCollectionTarget::DropAtLeastPercentage(p) => f.write_fmt(format_args!( - "DropAtLeast({}%)", - re_format::format_f64(*p * 100.0) - )), + GarbageCollectionTarget::DropAtLeastFraction(p) => { + write!(f, "DropAtLeast({:.3}%)", re_format::format_f64(*p * 100.0)) + } } } } -// TODO(#1619): Implement garbage collection. +impl DataStore { + /// Triggers a garbage collection according to the desired `target`. + /// + /// Garbage collection's performance is bounded by the number of buckets in each table (for + /// each `RowId`, we have to find the corresponding bucket, which is roughly `O(log(n))`) as + /// well as the number of rows in each of those buckets (for each `RowId`, we have to sort the + /// corresponding bucket (roughly `O(n*log(n))`) and then find the corresponding row (roughly + /// `O(log(n))`. + /// The size of the data itself has no impact on performance. + /// + /// Returns the list of `RowId`s that were purged from the store. + /// + /// ## Semantics + /// + /// Garbage collection works on a row-level basis and is driven by [`RowId`] order, + /// i.e. the order defined by the clients' wall-clocks, allowing it to drop data across + /// the different timelines + /// in a fair, deterministic manner. + /// Similarly, out-of-order data is supported out of the box. + /// + /// The garbage collector doesn't deallocate data in and of itself: all it does is drop the + /// store's internal references to that data (the `DataCell`s), which will be deallocated once + /// their reference count reaches 0. + /// + /// ## Limitations + /// + /// The garbage collector is currently unaware of our latest-at semantics, i.e. it will drop + /// old data even if doing so would impact the results of recent queries. + /// See . + // + // TODO(#1804): There shouldn't be any need to return the purged `RowId`s, all secondary + // datastructures should be able to purge themselves based solely off of + // [`DataStore::oldest_time_per_timeline`]. + // + // TODO(#1803): The GC should be aware of latest-at semantics and make sure they are upheld + // when purging data. + // + // TODO(#1823): Workload specific optimizations. + pub fn gc(&mut self, target: GarbageCollectionTarget) -> (Vec, DataStoreStats) { + crate::profile_function!(); + + self.gc_id += 1; + + // NOTE: only temporal data and row metadata get purged! + let stats_before = DataStoreStats::from_store(self); + let initial_num_rows = + stats_before.temporal.num_rows + stats_before.metadata_registry.num_rows; + let initial_num_bytes = + (stats_before.temporal.num_bytes + stats_before.metadata_registry.num_bytes) as f64; + + let row_ids = match target { + GarbageCollectionTarget::DropAtLeastFraction(p) => { + assert!((0.0..=1.0).contains(&p)); + + let num_bytes_to_drop = initial_num_bytes * p; + let target_num_bytes = initial_num_bytes - num_bytes_to_drop; + + re_log::debug!( + kind = "gc", + id = self.gc_id, + %target, + initial_num_rows = re_format::format_large_number(initial_num_rows as _), + initial_num_bytes = re_format::format_bytes(initial_num_bytes), + target_num_bytes = re_format::format_bytes(target_num_bytes), + drop_at_least_num_bytes = re_format::format_bytes(num_bytes_to_drop), + "starting GC" + ); + + self.gc_drop_at_least_num_bytes(num_bytes_to_drop) + } + }; + + #[cfg(debug_assertions)] + self.sanity_check().unwrap(); + + // NOTE: only temporal data and row metadata get purged! + let stats_after = DataStoreStats::from_store(self); + let new_num_rows = stats_after.temporal.num_rows + stats_after.metadata_registry.num_rows; + let new_num_bytes = + (stats_after.temporal.num_bytes + stats_after.metadata_registry.num_bytes) as f64; + + re_log::debug!( + kind = "gc", + id = self.gc_id, + %target, + initial_num_rows = re_format::format_large_number(initial_num_rows as _), + initial_num_bytes = re_format::format_bytes(initial_num_bytes), + new_num_rows = re_format::format_large_number(new_num_rows as _), + new_num_bytes = re_format::format_bytes(new_num_bytes), + "GC done" + ); + + let stats_diff = stats_before - stats_after; + + (row_ids, stats_diff) + } + + /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store. + /// + /// Returns the list of `RowId`s that were purged from the store. + fn gc_drop_at_least_num_bytes(&mut self, mut num_bytes_to_drop: f64) -> Vec { + crate::profile_function!(); + + let mut row_ids = Vec::new(); + + // The algorithm is straightforward: + // 1. Pop the oldest `RowId` available + // 2. Find all tables that potentially hold data associated with that `RowId` + // 3. Drop the associated row and account for the space we got back + while num_bytes_to_drop > 0.0 { + // pop next row id + let Some((row_id, timepoint)) = self.metadata_registry.pop_first() else { + break; + }; + num_bytes_to_drop -= row_id.total_size_bytes() as f64; + num_bytes_to_drop -= timepoint.total_size_bytes() as f64; + row_ids.push(row_id); + + // find all tables that could possibly contain this `RowId` + let tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| { + timepoint.get(timeline).map(|time| (*time, table)) + }); + + for (time, table) in tables { + num_bytes_to_drop -= table.try_drop_row(row_id, time.as_i64()) as f64; + } + } + + row_ids + } +} + +impl IndexedTable { + /// Tries to drop the given `row_id` from the table, which is expected to be found at the + /// specified `time`. + /// + /// Returns how many bytes were actually dropped, or zero if the row wasn't found. + fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 { + crate::profile_function!(); + + let table_has_more_than_one_bucket = self.buckets.len() > 1; + + let (bucket_key, bucket) = self.find_bucket_mut(time.into()); + let bucket_num_bytes = bucket.total_size_bytes(); + + let mut dropped_num_bytes = { + let inner = &mut *bucket.inner.write(); + inner.try_drop_row(row_id, time) + }; + + // NOTE: We always need to keep at least one bucket alive, otherwise we have + // nowhere to write to. + if table_has_more_than_one_bucket && bucket.num_rows() == 0 { + // NOTE: We're dropping the bucket itself in this case, rather than just its + // contents. + debug_assert!( + dropped_num_bytes <= bucket_num_bytes, + "Bucket contained more bytes than it thought" + ); + dropped_num_bytes = bucket_num_bytes; + self.buckets.remove(&bucket_key); + + // NOTE: If this is the first bucket of the table that we've just removed, we need the + // next one to become responsible for `-∞`. + if bucket_key == TimeInt::MIN { + if let Some((_, bucket)) = self.buckets.pop_first() { + self.buckets.insert(TimeInt::MIN, bucket); + } + } + } + + self.buckets_size_bytes -= dropped_num_bytes; + self.buckets_num_rows -= (dropped_num_bytes > 0) as u64; + + dropped_num_bytes + } +} + +impl IndexedBucketInner { + /// Tries to drop the given `row_id` from the table, which is expected to be found at the + /// specified `time`. + /// + /// Returns how many bytes were actually dropped, or zero if the row wasn't found. + fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 { + crate::profile_function!(); + + self.sort(); + + let IndexedBucketInner { + is_sorted, + time_range, + col_time, + col_insert_id, + col_row_id, + col_num_instances, + columns, + size_bytes, + } = self; + + let mut dropped_num_bytes = 0u64; + + let mut row_index = col_time.partition_point(|&time2| time2 < time); + while col_time.get(row_index) == Some(&time) { + if col_row_id[row_index] != row_id { + row_index += 1; + continue; + } + + // Update the time_range min/max: + if col_time.len() == 1 { + // We removed the last row + *time_range = TimeRange::EMPTY; + } else { + *is_sorted = false; + + // We have at least two rows, so we can safely [index] here: + if row_index == 0 { + // We removed the first row, so the second row holds the new min + time_range.min = col_time[1].into(); + } + if row_index + 1 == col_time.len() { + // We removed the last row, so the penultimate row holds the new max + time_range.max = col_time[row_index - 1].into(); + } + } + + // col_row_id + let removed_row_id = col_row_id.swap_remove(row_index); + debug_assert_eq!(row_id, removed_row_id); + dropped_num_bytes += removed_row_id.total_size_bytes(); + + // col_time + let row_time = col_time.swap_remove(row_index); + dropped_num_bytes += row_time.total_size_bytes(); + + // col_insert_id (if present) + if !col_insert_id.is_empty() { + dropped_num_bytes += col_insert_id.swap_remove(row_index).total_size_bytes(); + } + + // col_num_instances + dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes(); + + // each data column + for column in columns.values_mut() { + dropped_num_bytes += column.0.swap_remove(row_index).total_size_bytes(); + } + + // NOTE: A single `RowId` cannot possibly have more than one datapoint for + // a single timeline. + break; + } + + *size_bytes -= dropped_num_bytes; + + dropped_num_bytes + } +} diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 6ef71c714377..00a233c70d6f 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -178,7 +178,7 @@ impl PersistentIndexedTable { columns, } = self; - let num_rows = self.total_rows() as usize; + let num_rows = self.num_rows() as usize; let insert_ids = config .store_insert_ids diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index 74f75c586363..02c753ce4be3 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -1001,7 +1001,7 @@ impl PersistentIndexedTable { ); // find the primary row number's row. - let primary_row_nr = self.total_rows() - 1; + let primary_row_nr = self.num_rows() - 1; trace!( kind = "latest_at", @@ -1085,7 +1085,7 @@ impl PersistentIndexedTable { // for building the returned iterator. crate::profile_function!(); - let cells = (0..self.total_rows()).filter_map(move |row_nr| { + let cells = (0..self.num_rows()).filter_map(move |row_nr| { let mut cells = [(); N].map(|_| None); for (i, component) in components.iter().enumerate() { if let Some(column) = self.columns.get(component) { diff --git a/crates/re_arrow_store/src/store_sanity.rs b/crates/re_arrow_store/src/store_sanity.rs index 0bf32c720cab..eba65eb70011 100644 --- a/crates/re_arrow_store/src/store_sanity.rs +++ b/crates/re_arrow_store/src/store_sanity.rs @@ -1,5 +1,6 @@ use re_log_types::{ - ComponentName, DataCellColumn, COLUMN_NUM_INSTANCES, COLUMN_ROW_ID, COLUMN_TIMEPOINT, + ComponentName, DataCellColumn, SizeBytes as _, TimeRange, COLUMN_NUM_INSTANCES, COLUMN_ROW_ID, + COLUMN_TIMEPOINT, }; use crate::{DataStore, IndexedBucket, IndexedBucketInner, IndexedTable, PersistentIndexedTable}; @@ -11,6 +12,11 @@ use crate::{DataStore, IndexedBucket, IndexedBucketInner, IndexedTable, Persiste /// These violations can only stem from a bug in the store's implementation itself. #[derive(thiserror::Error, Debug)] pub enum SanityError { + #[error( + "Reported time range for indexed bucket is out of sync: got {got:?}, expected {expected:?}" + )] + TimeRangeOutOfSync { expected: TimeRange, got: TimeRange }, + #[error("Reported size for {origin} is out of sync: got {got}, expected {expected}")] SizeOutOfSync { origin: &'static str, @@ -101,21 +107,26 @@ impl IndexedTable { // Make sure row numbers aren't out of sync { - let total_rows = self.total_rows(); - let total_rows_uncached = self.total_rows_uncached(); - if total_rows != total_rows_uncached { + let num_rows = self.num_rows(); + let num_rows_uncached = self.num_rows_uncached(); + if num_rows != num_rows_uncached { return Err(SanityError::RowsOutOfSync { origin: std::any::type_name::(), - expected: re_format::format_number(total_rows_uncached as _), - got: re_format::format_number(total_rows as _), + expected: re_format::format_number(num_rows_uncached as _), + got: re_format::format_number(num_rows as _), }); } } + // Run individual bucket sanity check suites too. + for bucket in self.buckets.values() { + bucket.sanity_check()?; + } + // Make sure size values aren't out of sync { let total_size_bytes = self.total_size_bytes(); - let total_size_bytes_uncached = self.total_size_bytes_uncached(); + let total_size_bytes_uncached = self.size_bytes_uncached(); if total_size_bytes != total_size_bytes_uncached { return Err(SanityError::SizeOutOfSync { origin: std::any::type_name::(), @@ -125,11 +136,6 @@ impl IndexedTable { } } - // Run individual bucket sanity check suites too. - for bucket in self.buckets.values() { - bucket.sanity_check()?; - } - Ok(()) } } @@ -150,7 +156,7 @@ impl IndexedBucket { { let IndexedBucketInner { is_sorted: _, - time_range: _, + time_range, col_time, col_insert_id, col_row_id, @@ -159,6 +165,23 @@ impl IndexedBucket { size_bytes: _, } = &*inner.read(); + // Time ranges are eagerly maintained. + { + let mut times = col_time.clone(); + times.sort(); + + let expected_min = times.first().copied().unwrap_or(i64::MAX).into(); + let expected_max = times.last().copied().unwrap_or(i64::MIN).into(); + let expected_time_range = TimeRange::new(expected_min, expected_max); + + if expected_time_range != *time_range { + return Err(SanityError::TimeRangeOutOfSync { + expected: expected_time_range, + got: *time_range, + }); + } + } + // All columns should be `Self::num_rows` long. { let num_rows = self.num_rows(); @@ -191,7 +214,7 @@ impl IndexedBucket { } // The cluster column must be fully dense. - { + if self.num_rows() > 0 { let cluster_column = columns .get(cluster_key) @@ -243,7 +266,7 @@ impl PersistentIndexedTable { // All columns should be `Self::num_rows` long. { - let num_rows = self.total_rows(); + let num_rows = self.num_rows(); let column_lengths = [ (!col_insert_id.is_empty()) @@ -272,7 +295,7 @@ impl PersistentIndexedTable { } // The cluster column must be fully dense. - { + if self.num_rows() > 0 { let cluster_column = columns .get(cluster_key) diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index 11c91bfc6223..612c03124855 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -1,71 +1,184 @@ use nohash_hasher::IntMap; -use re_log_types::{ComponentName, DataCellColumn}; +use re_log_types::{ComponentName, SizeBytes, TimePoint}; use crate::{ - store::IndexedBucketInner, DataStore, IndexedBucket, IndexedTable, PersistentIndexedTable, + store::IndexedBucketInner, ClusterCellCache, DataStore, DataTypeRegistry, IndexedBucket, + IndexedTable, MetadataRegistry, PersistentIndexedTable, }; // --- +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] +pub struct DataStoreRowStats { + pub num_rows: u64, + pub num_bytes: u64, +} + +impl std::ops::Sub for DataStoreRowStats { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { + num_rows: self.num_rows - rhs.num_rows, + num_bytes: self.num_bytes - rhs.num_bytes, + } + } +} + +impl std::ops::Add for DataStoreRowStats { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + num_rows: self.num_rows + rhs.num_rows, + num_bytes: self.num_bytes + rhs.num_bytes, + } + } +} + #[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd)] pub struct DataStoreStats { - pub total_timeless_rows: u64, - pub total_timeless_size_bytes: u64, + pub type_registry: DataStoreRowStats, + pub metadata_registry: DataStoreRowStats, + pub autogenerated: DataStoreRowStats, + pub timeless: DataStoreRowStats, + pub temporal: DataStoreRowStats, + pub temporal_buckets: u64, + pub total: DataStoreRowStats, +} + +impl std::ops::Sub for DataStoreStats { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { + type_registry: self.type_registry - rhs.type_registry, + metadata_registry: self.metadata_registry - rhs.metadata_registry, + autogenerated: self.autogenerated - rhs.autogenerated, + timeless: self.timeless - rhs.timeless, + temporal: self.temporal - rhs.temporal, + temporal_buckets: self.temporal_buckets - rhs.temporal_buckets, + total: self.total - rhs.total, + } + } +} - pub total_temporal_rows: u64, - pub total_temporal_size_bytes: u64, - pub total_temporal_buckets: u64, +impl std::ops::Add for DataStoreStats { + type Output = Self; - pub total_rows: u64, - pub total_size_bytes: u64, + fn add(self, rhs: Self) -> Self::Output { + Self { + type_registry: self.type_registry + rhs.type_registry, + metadata_registry: self.metadata_registry + rhs.metadata_registry, + autogenerated: self.autogenerated + rhs.autogenerated, + timeless: self.timeless + rhs.timeless, + temporal: self.temporal + rhs.temporal, + temporal_buckets: self.temporal_buckets + rhs.temporal_buckets, + total: self.total + rhs.total, + } + } } impl DataStoreStats { pub fn from_store(store: &DataStore) -> Self { crate::profile_function!(); - let total_timeless_rows = store.total_timeless_rows(); - let total_timeless_size_bytes = store.total_timeless_size_bytes(); + let type_registry = DataStoreRowStats { + num_rows: store.type_registry.len() as _, + num_bytes: store.type_registry.total_size_bytes(), + }; + + let metadata_registry = DataStoreRowStats { + num_rows: store.metadata_registry.len() as _, + num_bytes: store.metadata_registry.total_size_bytes(), + }; + + let autogenerated = DataStoreRowStats { + num_rows: store.cluster_cell_cache.len() as _, + num_bytes: store.cluster_cell_cache.total_size_bytes(), + }; + + let timeless = DataStoreRowStats { + num_rows: store.num_timeless_rows(), + num_bytes: store.timeless_size_bytes(), + }; + + let temporal = DataStoreRowStats { + num_rows: store.num_temporal_rows(), + num_bytes: store.temporal_size_bytes(), + }; + let temporal_buckets = store.num_temporal_buckets(); + + let total = DataStoreRowStats { + num_rows: timeless.num_rows + temporal.num_rows, + num_bytes: type_registry.num_bytes + + metadata_registry.num_bytes + + autogenerated.num_bytes + + timeless.num_bytes + + temporal.num_bytes, + }; - let total_temporal_rows = store.total_temporal_rows(); - let total_temporal_size_bytes = store.total_temporal_size_bytes(); - let total_temporal_buckets = store.total_temporal_buckets(); + Self { + type_registry, + metadata_registry, + autogenerated, + timeless, + temporal, + temporal_buckets, + total, + } + } +} - let total_rows = total_timeless_rows + total_temporal_rows; - let total_size_bytes = total_timeless_size_bytes + total_temporal_size_bytes; +// --- Data store --- - Self { - total_timeless_rows, - total_timeless_size_bytes, +impl SizeBytes for DataTypeRegistry { + #[inline] + fn heap_size_bytes(&self) -> u64 { + type K = ComponentName; - total_temporal_rows, - total_temporal_size_bytes, - total_temporal_buckets, + // NOTE: This is only here to make sure this method fails to compile if the inner type + // changes, as the following size computation assumes POD types. + let inner: &IntMap = &self.0; - total_rows, - total_size_bytes, - } + let keys_size_bytes = std::mem::size_of::() * inner.len(); + // NOTE: It's all on the heap at this point. + let values_size_bytes = self.values().map(SizeBytes::total_size_bytes).sum::(); + + keys_size_bytes as u64 + values_size_bytes } } -// --- Data store --- +impl SizeBytes for MetadataRegistry { + #[inline] + fn heap_size_bytes(&self) -> u64 { + self.0.heap_size_bytes() + } +} + +impl SizeBytes for ClusterCellCache { + #[inline] + fn heap_size_bytes(&self) -> u64 { + self.0.heap_size_bytes() + } +} impl DataStore { /// Returns the number of timeless index rows stored across this entire store, i.e. the sum of /// the number of rows across all of its timeless indexed tables. #[inline] - pub fn total_timeless_rows(&self) -> u64 { + pub fn num_timeless_rows(&self) -> u64 { crate::profile_function!(); self.timeless_tables .values() - .map(|table| table.total_rows()) + .map(|table| table.num_rows()) .sum() } /// Returns the size of the timeless index data stored across this entire store, i.e. the sum /// of the size of the data stored across all of its timeless indexed tables, in bytes. #[inline] - pub fn total_timeless_size_bytes(&self) -> u64 { + pub fn timeless_size_bytes(&self) -> u64 { crate::profile_function!(); self.timeless_tables .values() @@ -76,15 +189,15 @@ impl DataStore { /// Returns the number of temporal index rows stored across this entire store, i.e. the sum of /// the number of rows across all of its temporal indexed tables. #[inline] - pub fn total_temporal_rows(&self) -> u64 { + pub fn num_temporal_rows(&self) -> u64 { crate::profile_function!(); - self.tables.values().map(|table| table.total_rows()).sum() + self.tables.values().map(|table| table.num_rows()).sum() } /// Returns the size of the temporal index data stored across this entire store, i.e. the sum /// of the size of the data stored across all of its temporal indexed tables, in bytes. #[inline] - pub fn total_temporal_size_bytes(&self) -> u64 { + pub fn temporal_size_bytes(&self) -> u64 { crate::profile_function!(); self.tables .values() @@ -94,12 +207,9 @@ impl DataStore { /// Returns the number of temporal indexed buckets stored across this entire store. #[inline] - pub fn total_temporal_buckets(&self) -> u64 { + pub fn num_temporal_buckets(&self) -> u64 { crate::profile_function!(); - self.tables - .values() - .map(|table| table.total_buckets()) - .sum() + self.tables.values().map(|table| table.num_buckets()).sum() } } @@ -109,7 +219,7 @@ impl IndexedTable { /// Returns the number of rows stored across this entire table, i.e. the sum of the number /// of rows stored across all of its buckets. #[inline] - pub fn total_rows(&self) -> u64 { + pub fn num_rows(&self) -> u64 { self.buckets_num_rows } @@ -118,76 +228,33 @@ impl IndexedTable { /// /// Recomputed from scratch, for sanity checking. #[inline] - pub(crate) fn total_rows_uncached(&self) -> u64 { + pub(crate) fn num_rows_uncached(&self) -> u64 { crate::profile_function!(); self.buckets.values().map(|bucket| bucket.num_rows()).sum() } - /// The size of both the control & component data stored in this table, across all of its - /// buckets, in bytes. - /// - /// This is a best-effort approximation, adequate for most purposes (stats, - /// triggering GCs, ...). #[inline] - pub fn total_size_bytes(&self) -> u64 { + pub(crate) fn size_bytes_uncached(&self) -> u64 { crate::profile_function!(); - - let Self { - timeline, - ent_path, - cluster_key, - buckets: _, - all_components, - buckets_num_rows: _, - buckets_size_bytes, - } = self; - - let size_bytes = std::mem::size_of_val(timeline) - + std::mem::size_of_val(ent_path) - + std::mem::size_of_val(cluster_key) - + (all_components.len() * std::mem::size_of::()); - - size_bytes as u64 + buckets_size_bytes + self.stack_size_bytes() + + self + .buckets + .values() + .map(|bucket| bucket.total_size_bytes()) + .sum::() } - /// The size of both the control & component data stored in this table, across all of its - /// buckets, in bytes. - /// - /// This is a best-effort approximation, adequate for most purposes (stats, - /// triggering GCs, ...). - /// - /// Recomputed from scratch, for sanity checking. + /// Returns the number of buckets stored across this entire table. #[inline] - pub(crate) fn total_size_bytes_uncached(&self) -> u64 { - crate::profile_function!(); - - let Self { - timeline, - ent_path, - cluster_key, - buckets, - all_components, - buckets_num_rows: _, - buckets_size_bytes: _, - } = self; - - let buckets_size_bytes = buckets - .values() - .map(|bucket| bucket.size_bytes()) - .sum::(); - - let size_bytes = std::mem::size_of_val(timeline) - + std::mem::size_of_val(ent_path) - + std::mem::size_of_val(cluster_key) - + (all_components.len() * std::mem::size_of::()); - - size_bytes as u64 + buckets_size_bytes + pub fn num_buckets(&self) -> u64 { + self.buckets.len() as _ } +} - /// Returns the number of buckets stored across this entire table. +impl SizeBytes for IndexedTable { #[inline] - pub fn total_buckets(&self) -> u64 { - self.buckets.len() as _ + fn heap_size_bytes(&self) -> u64 { + self.buckets_size_bytes } } @@ -198,29 +265,18 @@ impl IndexedBucket { crate::profile_function!(); self.inner.read().col_time.len() as u64 } +} - /// The size of both the control & component data stored in this bucket, in bytes. - /// - /// This is a best-effort approximation, adequate for most purposes (stats, - /// triggering GCs, ...). +impl SizeBytes for IndexedBucket { #[inline] - pub fn size_bytes(&self) -> u64 { - crate::profile_function!(); - - let Self { - timeline, - cluster_key, - inner, - } = self; - - (std::mem::size_of_val(timeline) + std::mem::size_of_val(cluster_key)) as u64 - + inner.read().size_bytes + fn heap_size_bytes(&self) -> u64 { + self.inner.read().size_bytes } } impl IndexedBucketInner { /// Computes and caches the size of both the control & component data stored in this bucket, - /// in bytes. + /// stack and heap included, in bytes. /// /// This is a best-effort approximation, adequate for most purposes (stats, /// triggering GCs, ...). @@ -239,17 +295,14 @@ impl IndexedBucketInner { size_bytes, } = self; - let control_size_bytes = std::mem::size_of_val(is_sorted) - + std::mem::size_of_val(time_range) - + std::mem::size_of_val(col_time.as_slice()) - + std::mem::size_of_val(col_insert_id.as_slice()) - + std::mem::size_of_val(col_row_id.as_slice()) - + std::mem::size_of_val(col_num_instances.as_slice()) - + std::mem::size_of_val(size_bytes); - - let data_size_bytes = compute_columns_size_bytes(columns); - - *size_bytes = control_size_bytes as u64 + data_size_bytes; + *size_bytes = is_sorted.total_size_bytes() + + time_range.total_size_bytes() + + col_time.total_size_bytes() + + col_insert_id.total_size_bytes() + + col_row_id.total_size_bytes() + + col_num_instances.total_size_bytes() + + columns.total_size_bytes() + + size_bytes.total_size_bytes(); *size_bytes } @@ -260,16 +313,14 @@ impl IndexedBucketInner { impl PersistentIndexedTable { /// Returns the number of rows stored across this table. #[inline] - pub fn total_rows(&self) -> u64 { + pub fn num_rows(&self) -> u64 { self.col_num_instances.len() as _ } +} - /// The size of both the control & component data stored in this table, in bytes. - /// - /// This is a best-effort approximation, adequate for most purposes (stats, - /// triggering GCs, ...). +impl SizeBytes for PersistentIndexedTable { #[inline] - pub fn total_size_bytes(&self) -> u64 { + fn heap_size_bytes(&self) -> u64 { crate::profile_function!(); let Self { @@ -281,35 +332,11 @@ impl PersistentIndexedTable { columns, } = self; - let control_size_bytes = std::mem::size_of_val(ent_path) - + std::mem::size_of_val(cluster_key) - + std::mem::size_of_val(col_insert_id.as_slice()) - + std::mem::size_of_val(col_row_id.as_slice()) - + std::mem::size_of_val(col_num_instances.as_slice()); - - let data_size_bytes = compute_columns_size_bytes(columns); - - control_size_bytes as u64 + data_size_bytes + ent_path.total_size_bytes() + + cluster_key.total_size_bytes() + + col_insert_id.total_size_bytes() + + col_row_id.total_size_bytes() + + col_num_instances.total_size_bytes() + + columns.total_size_bytes() } } - -// --- Common --- - -/// Computes the size in bytes of an entire table's worth of arrow data. -fn compute_columns_size_bytes(columns: &IntMap) -> u64 { - crate::profile_function!(); - let keys = (columns.keys().len() * std::mem::size_of::()) as u64; - let cells = columns - .values() - .flat_map(|column| column.iter()) - .flatten() // option - .map(|cell| cell.size_bytes()) - .sum::(); - keys + cells -} - -#[test] -fn compute_table_size_bytes_ignore_headers() { - let columns = Default::default(); - assert_eq!(0, compute_columns_size_bytes(&columns)); -} diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 2f68bedf445c..853bdf4b222c 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -7,12 +7,12 @@ use smallvec::SmallVec; use re_log::{debug, trace}; use re_log_types::{ component_types::InstanceKey, ComponentName, DataCell, DataCellColumn, DataCellError, DataRow, - DataTable, TimeInt, TimeRange, + DataTable, RowId, SizeBytes as _, TimeInt, TimePoint, TimeRange, }; use crate::{ - DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner, IndexedTable, - PersistentIndexedTable, + store::MetadataRegistry, DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner, + IndexedTable, PersistentIndexedTable, }; // TODO(#1619): @@ -184,12 +184,18 @@ impl DataStore { } } - // This is valuable information even for a timeless timepoint! - self.metadata_registry.insert(*row_id, timepoint.clone()); + self.metadata_registry.upsert(*row_id, timepoint.clone()); Ok(()) } + /// Wipes all timeless data. + /// + /// Mostly useful for testing/debugging purposes. + pub fn wipe_timeless_data(&mut self) { + self.timeless_tables = Default::default(); + } + /// Auto-generates an appropriate cluster cell for the specified number of instances and /// transparently handles caching. // TODO(#1777): shared slices for auto generated keys @@ -225,6 +231,29 @@ impl DataStore { } } +impl MetadataRegistry { + fn upsert(&mut self, row_id: RowId, timepoint: TimePoint) { + // This is valuable information even for a timeless timepoint! + match self.entry(row_id) { + std::collections::btree_map::Entry::Vacant(entry) => { + entry.insert(timepoint); + } + // NOTE: When saving and loading data from disk, it's very possible that we try to + // insert data for a single `RowId` in multiple calls (buckets are per-timeline, so a + // single `RowId` can get spread across multiple buckets)! + std::collections::btree_map::Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + for (timeline, time) in timepoint { + if let Some(old_time) = entry.insert(timeline, time) { + re_log::error!(%row_id, ?timeline, old_time = ?old_time, new_time = ?time, "detected re-used `RowId/Timeline` pair, this is illegal and will lead to undefined behavior in the datastore"); + debug_assert!(false, "detected re-used `RowId/Timeline`"); + } + } + } + } + } +} + // --- Temporal --- impl IndexedTable { @@ -250,7 +279,7 @@ impl IndexedTable { let len_overflow = len > config.indexed_bucket_num_rows; if len_overflow { - let bucket_size_before = bucket.size_bytes(); + let bucket_size_before = bucket.total_size_bytes(); if let Some((min, second_half)) = bucket.split() { trace!( kind = "insert", @@ -263,7 +292,8 @@ impl IndexedTable { "splitting off indexed bucket following overflow" ); - self.buckets_size_bytes += bucket.size_bytes() + second_half.size_bytes(); + self.buckets_size_bytes += + bucket.total_size_bytes() + second_half.total_size_bytes(); self.buckets_size_bytes -= bucket_size_before; self.buckets.insert(min, second_half); @@ -393,26 +423,28 @@ impl IndexedBucket { // append time to primary column and update time range appropriately col_time.push(time.as_i64()); *time_range = TimeRange::new(time_range.min.min(time), time_range.max.max(time)); - size_bytes_added += std::mem::size_of_val(&time.as_i64()) as u64; + size_bytes_added += time.as_i64().total_size_bytes(); // update all control columns if let Some(insert_id) = insert_id { col_insert_id.push(insert_id); - size_bytes_added += std::mem::size_of_val(&insert_id) as u64; + size_bytes_added += insert_id.total_size_bytes(); } col_row_id.push(row.row_id()); - size_bytes_added += std::mem::size_of_val(&row.row_id()) as u64; + size_bytes_added += row.row_id().total_size_bytes(); col_num_instances.push(row.num_instances()); - size_bytes_added += std::mem::size_of_val(&row.num_instances()) as u64; + size_bytes_added += row.num_instances().total_size_bytes(); // insert auto-generated cluster cell if present if let Some(cluster_cell) = generated_cluster_cell { let component = cluster_cell.component_name(); let column = columns.entry(component).or_insert_with(|| { - size_bytes_added += std::mem::size_of_val(&component) as u64; - DataCellColumn::empty(num_rows) + let column = DataCellColumn::empty(num_rows); + size_bytes_added += component.total_size_bytes(); + size_bytes_added += column.total_size_bytes(); + column }); - size_bytes_added += cluster_cell.size_bytes(); + size_bytes_added += cluster_cell.total_size_bytes(); column.0.push(Some(cluster_cell)); } @@ -422,10 +454,12 @@ impl IndexedBucket { for cell in row.cells().iter() { let component = cell.component_name(); let column = columns.entry(component).or_insert_with(|| { - size_bytes_added += std::mem::size_of_val(&component) as u64; - DataCellColumn::empty(col_time.len().saturating_sub(1)) + let column = DataCellColumn::empty(col_time.len().saturating_sub(1)); + size_bytes_added += component.total_size_bytes(); + size_bytes_added += column.total_size_bytes(); + column }); - size_bytes_added += cell.size_bytes(); + size_bytes_added += cell.total_size_bytes(); column.0.push(Some(cell.clone() /* shallow */)); } @@ -439,7 +473,9 @@ impl IndexedBucket { } if !components.contains(component) { - column.0.push(None); + let none_cell: Option = None; + size_bytes_added += none_cell.total_size_bytes(); + column.0.push(none_cell); } } @@ -605,11 +641,11 @@ impl IndexedBucket { self.sanity_check().unwrap(); bucket2.sanity_check().unwrap(); - let total_rows1 = self.num_rows() as i64; - let total_rows2 = bucket2.num_rows() as i64; + let num_rows1 = self.num_rows() as i64; + let num_rows2 = bucket2.num_rows() as i64; debug_assert_eq!( _num_rows as i64, - total_rows1 + total_rows2, + num_rows1 + num_rows2, "expected both buckets to sum up to the length of the original bucket" ); } @@ -747,7 +783,7 @@ impl PersistentIndexedTable { ) { crate::profile_function!(); - let num_rows = self.total_rows() as usize; + let num_rows = self.num_rows() as usize; let Self { ent_path: _, diff --git a/crates/re_arrow_store/src/test_util.rs b/crates/re_arrow_store/src/test_util.rs index 0fe1b28bf8c2..35dc129bddd4 100644 --- a/crates/re_arrow_store/src/test_util.rs +++ b/crates/re_arrow_store/src/test_util.rs @@ -1,28 +1,32 @@ -use crate::DataStoreConfig; +use crate::{DataStore, DataStoreConfig}; // --- #[doc(hidden)] #[macro_export] macro_rules! test_row { - ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => { - ::re_log_types::DataRow::from_cells1( + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {{ + let mut row = ::re_log_types::DataRow::from_cells1( ::re_log_types::RowId::random(), $entity.clone(), $frames, $n, $c0, - ) - }; - ($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => { - ::re_log_types::DataRow::from_cells2( + ); + row.compute_all_size_bytes(); + row + }}; + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {{ + let mut row = ::re_log_types::DataRow::from_cells2( ::re_log_types::RowId::random(), $entity.clone(), $frames, $n, ($c0, $c1), - ) - }; + ); + row.compute_all_size_bytes(); + row + }}; } pub fn all_configs() -> impl Iterator { @@ -51,3 +55,11 @@ pub fn all_configs() -> impl Iterator { enable_typecheck: idx.enable_typecheck, }) } + +pub fn sanity_unwrap(store: &mut DataStore) { + if let err @ Err(_) = store.sanity_check() { + store.sort_indices_if_needed(); + eprintln!("{store}"); + err.unwrap(); + } +} diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index d6d68fcfe683..fba86298332e 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -6,7 +6,10 @@ use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use rand::Rng; -use re_arrow_store::{test_row, DataStore, DataStoreConfig, LatestAtQuery, WriteError}; +use re_arrow_store::{ + test_row, test_util::sanity_unwrap, DataStore, DataStoreConfig, DataStoreStats, + GarbageCollectionTarget, LatestAtQuery, WriteError, +}; use re_log_types::{ component_types::InstanceKey, datagen::{ @@ -99,11 +102,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) { ] => num_instances; [build_some_instances(num_instances as _)])) .unwrap(); - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(store); let timeline_wrong_name = Timeline::new("lag_time", TimeType::Time); let timeline_wrong_kind = Timeline::new("log_time", TimeType::Sequence); @@ -282,6 +281,8 @@ fn gc_correct() { }, ); + let stats_empty = DataStoreStats::from_store(&store); + let mut rng = rand::thread_rng(); let num_frames = rng.gen_range(0..=100); @@ -300,71 +301,37 @@ fn gc_correct() { } } - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(&mut store); + check_still_readable(&store); + + let stats = DataStoreStats::from_store(&store); + + let (row_ids, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + let stats_diff = stats_diff + stats_empty; // account for fixed overhead + + assert_eq!(row_ids.len() as u64, stats.total.num_rows); + assert_eq!( + stats.metadata_registry.num_rows, + stats_diff.metadata_registry.num_rows + ); + assert_eq!( + stats.metadata_registry.num_bytes, + stats_diff.metadata_registry.num_bytes + ); + assert_eq!(stats.temporal.num_rows, stats_diff.temporal.num_rows); + + sanity_unwrap(&mut store); check_still_readable(&store); + for row_id in &row_ids { + assert!(store.get_msg_metadata(row_id).is_none()); + } + + let (row_ids, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + assert!(row_ids.is_empty()); + assert_eq!(DataStoreStats::default(), stats_diff); - // TODO(#1619): bring back garbage collection - - // let row_id_chunks = store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - - // let row_ids = row_id_chunks - // .iter() - // .flat_map(|chunk| arrow_array_deserialize_iterator::>(&**chunk).unwrap()) - // .map(Option::unwrap) // MsgId is always present - // .collect::>(); - // assert!(!row_ids.is_empty()); - - // if let err @ Err(_) = store.sanity_check() { - // store.sort_indices_if_needed(); - // eprintln!("{store}"); - // err.unwrap(); - // } - // check_still_readable(&store); - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_some()); - // } - - // store.clear_msg_metadata(&row_ids); - - // if let err @ Err(_) = store.sanity_check() { - // store.sort_indices_if_needed(); - // eprintln!("{store}"); - // err.unwrap(); - // } - // check_still_readable(&store); - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_none()); - // } - - // let row_id_chunks = store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - - // let row_ids = row_id_chunks - // .iter() - // .flat_map(|chunk| arrow_array_deserialize_iterator::>(&**chunk).unwrap()) - // .map(Option::unwrap) // MsgId is always present - // .collect::>(); - // assert!(row_ids.is_empty()); - - // if let err @ Err(_) = store.sanity_check() { - // store.sort_indices_if_needed(); - // eprintln!("{store}"); - // err.unwrap(); - // } - // check_still_readable(&store); - - // assert_eq!(2, store.total_temporal_component_rows()); + sanity_unwrap(&mut store); + check_still_readable(&store); } fn check_still_readable(_store: &DataStore) { diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 29af874ae0e3..5ba2a722bf80 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -11,8 +11,8 @@ use polars_core::{prelude::*, series::Series}; use polars_ops::prelude::DataFrameJoinOps; use rand::Rng; use re_arrow_store::{ - polars_util, test_row, DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, TimeInt, - TimeRange, + polars_util, test_row, test_util::sanity_unwrap, DataStore, DataStoreConfig, DataStoreStats, + GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt, TimeRange, }; use re_log_types::{ component_types::{ColorRGBA, InstanceKey, Point2D, Rect2D}, @@ -48,6 +48,12 @@ fn all_components() { store2.insert_table(&table).unwrap(); } + // Stress test GC + store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + for table in store.to_data_tables(None) { + store2.insert_table(&table).unwrap(); + } + let mut store = store2; let timeline = Timeline::new("frame_nr", TimeType::Sequence); @@ -110,11 +116,7 @@ fn all_components() { assert_latest_components_at(&mut store, &ent_path, Some(components_b)); - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(&mut store); } // Tiny buckets, demonstrating the harder-to-reason-about cases. @@ -172,11 +174,7 @@ fn all_components() { assert_latest_components_at(&mut store, &ent_path, Some(components_b)); - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(&mut store); } // Tiny buckets and tricky splits, demonstrating a case that is not only extremely hard to @@ -242,11 +240,7 @@ fn all_components() { assert_latest_components_at(&mut store, &ent_path, Some(components_b)); - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(&mut store); } } @@ -259,14 +253,6 @@ fn latest_at() { for config in re_arrow_store::test_util::all_configs() { let mut store = DataStore::new(InstanceKey::name(), config.clone()); latest_at_impl(&mut store); - - // TODO(#1619): bring back garbage collection - // store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - // latest_at_impl(&mut store); } } @@ -317,13 +303,14 @@ fn latest_at_impl(store: &mut DataStore) { for table in store.to_data_tables(None) { store2.insert_table(&table).unwrap(); } + // Stress test GC + store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + for table in store.to_data_tables(None) { + store2.insert_table(&table).unwrap(); + } let mut store = store2; - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(&mut store); let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); @@ -442,11 +429,7 @@ fn range_impl(store: &mut DataStore) { let row4_4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_3, points4_4]); insert(store, &row4_4); - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(store); // Each entry in `rows_at_times` corresponds to a dataframe that's expected to be returned // by the range query. @@ -462,6 +445,11 @@ fn range_impl(store: &mut DataStore) { for table in store.to_data_tables(None) { store2.insert_table(&table).unwrap(); } + store2.wipe_timeless_data(); + store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + for table in store.to_data_tables(None) { + store2.insert_table(&table).unwrap(); + } let mut store = store2; let mut expected_timeless = Vec::::new(); @@ -882,36 +870,32 @@ fn gc_impl(store: &mut DataStore) { } } - if let err @ Err(_) = store.sanity_check() { - store.sort_indices_if_needed(); - eprintln!("{store}"); - err.unwrap(); - } + sanity_unwrap(store); _ = store.to_dataframe(); // simple way of checking that everything is still readable - // TODO(#1619): bring back garbage collection - - // let row_id_chunks = store.gc( - // GarbageCollectionTarget::DropAtLeastPercentage(1.0 / 3.0), - // Timeline::new("frame_nr", TimeType::Sequence), - // MsgId::name(), - // ); - - // let row_ids = row_id_chunks - // .iter() - // .flat_map(|chunk| arrow_array_deserialize_iterator::>(&**chunk).unwrap()) - // .map(Option::unwrap) // MsgId is always present - // .collect::>(); + let stats = DataStoreStats::from_store(store); - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_some()); - // } - - // store.clear_msg_metadata(&row_ids); + let (row_ids, stats_diff) = + store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0)); + for row_id in &row_ids { + assert!(store.get_msg_metadata(row_id).is_none()); + } - // for row_id in &row_ids { - // assert!(store.get_msg_metadata(row_id).is_none()); - // } + // NOTE: only temporal data and row metadata get purged! + let num_bytes_dropped = + (stats_diff.temporal.num_bytes + stats_diff.metadata_registry.num_bytes) as f64; + let num_bytes_dropped_expected_min = + (stats.temporal.num_bytes + stats.metadata_registry.num_bytes) as f64 * 0.95 / 3.0; + let num_bytes_dropped_expected_max = + (stats.temporal.num_bytes + stats.metadata_registry.num_bytes) as f64 * 1.05 / 3.0; + assert!( + num_bytes_dropped_expected_min <= num_bytes_dropped + && num_bytes_dropped <= num_bytes_dropped_expected_max, + "{} <= {} <= {}", + re_format::format_bytes(num_bytes_dropped_expected_min), + re_format::format_bytes(num_bytes_dropped), + re_format::format_bytes(num_bytes_dropped_expected_max), + ); } } diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index 5293870bf882..5a18c4d962e9 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -3,7 +3,10 @@ use std::sync::atomic::{AtomicBool, Ordering}; use itertools::Itertools; -use re_arrow_store::{test_row, DataStore, DataStoreStats, TimeInt, TimeRange, Timeline}; +use re_arrow_store::{ + test_row, test_util::sanity_unwrap, DataStore, DataStoreStats, GarbageCollectionTarget, + TimeInt, TimeRange, Timeline, +}; use re_log_types::{ component_types::InstanceKey, datagen::{ @@ -27,6 +30,16 @@ fn data_store_dump() { let mut store3 = DataStore::new(InstanceKey::name(), config.clone()); data_store_dump_impl(&mut store1, &mut store2, &mut store3); + + // stress-test GC impl + store1.wipe_timeless_data(); + store1.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store2.wipe_timeless_data(); + store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store3.wipe_timeless_data(); + store3.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + + data_store_dump_impl(&mut store1, &mut store2, &mut store3); } } @@ -52,31 +65,19 @@ fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: for table in &tables { insert_table(store1, table); } - if let err @ Err(_) = store1.sanity_check() { - store1.sort_indices_if_needed(); - eprintln!("{store1}"); - err.unwrap(); - } + sanity_unwrap(store1); // Dump the first store into the second one. for table in store1.to_data_tables(None) { store2.insert_table(&table).unwrap(); } - if let err @ Err(_) = store2.sanity_check() { - store2.sort_indices_if_needed(); - eprintln!("{store2}"); - err.unwrap(); - } + sanity_unwrap(store2); // Dump the second store into the third one. for table in store2.to_data_tables(None) { store3.insert_table(&table).unwrap(); } - if let err @ Err(_) = store3.sanity_check() { - store3.sort_indices_if_needed(); - eprintln!("{store3}"); - err.unwrap(); - } + sanity_unwrap(store3); let store1_df = store1.to_dataframe(); let store2_df = store2.to_dataframe(); @@ -94,12 +95,14 @@ fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: let store2_stats = DataStoreStats::from_store(store2); let store3_stats = DataStoreStats::from_store(store3); assert!( - store1_stats <= store2_stats, + store1_stats.temporal.num_bytes <= store2_stats.temporal.num_bytes + && store1_stats.timeless.num_bytes <= store2_stats.timeless.num_bytes, "First store should have <= amount of data of second store:\n\ {store1_stats:#?}\n{store2_stats:#?}" ); assert!( - store2_stats <= store3_stats, + store2_stats.temporal.num_bytes <= store3_stats.temporal.num_bytes + && store2_stats.timeless.num_bytes <= store3_stats.timeless.num_bytes, "Second store should have <= amount of data of third store:\n\ {store2_stats:#?}\n{store3_stats:#?}" ); @@ -119,6 +122,12 @@ fn data_store_dump_filtered() { let mut store2 = DataStore::new(InstanceKey::name(), config.clone()); data_store_dump_filtered_impl(&mut store1, &mut store2); + + // stress-test GC impl + store1.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + + data_store_dump_filtered_impl(&mut store1, &mut store2); } } @@ -140,11 +149,7 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) for table in &tables { store1.insert_table(table).unwrap(); } - if let err @ Err(_) = store1.sanity_check() { - store1.sort_indices_if_needed(); - eprintln!("{store1}"); - err.unwrap(); - } + sanity_unwrap(store1); // Dump frame1 from the first store into the second one. for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()) { @@ -166,11 +171,7 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()) { store2.insert_table(&table).unwrap(); } - if let err @ Err(_) = store2.sanity_check() { - store2.sort_indices_if_needed(); - eprintln!("{store2}"); - err.unwrap(); - } + sanity_unwrap(store2); let store1_df = store1.to_dataframe(); let store2_df = store2.to_dataframe(); @@ -182,7 +183,8 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) let store1_stats = DataStoreStats::from_store(store1); let store2_stats = DataStoreStats::from_store(store2); assert!( - store1_stats <= store2_stats, + store1_stats.temporal.num_bytes <= store2_stats.temporal.num_bytes + && store1_stats.timeless.num_bytes <= store2_stats.timeless.num_bytes, "First store should have <= amount of data of second store:\n\ {store1_stats:#?}\n{store2_stats:#?}" ); diff --git a/crates/re_arrow_store/tests/internals.rs b/crates/re_arrow_store/tests/internals.rs index 9233438b72e0..63888bf65b21 100644 --- a/crates/re_arrow_store/tests/internals.rs +++ b/crates/re_arrow_store/tests/internals.rs @@ -53,7 +53,7 @@ fn pathological_bucket_topology() { let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]); for _ in 0..num { let row = DataRow::from_cells1( - RowId::ZERO, + RowId::random(), ent_path.clone(), timepoint.clone(), num_instances, @@ -62,7 +62,7 @@ fn pathological_bucket_topology() { store_forward.insert_row(&row).unwrap(); let row = DataRow::from_cells1( - RowId::ZERO, + RowId::random(), ent_path.clone(), timepoint.clone(), num_instances, @@ -84,7 +84,7 @@ fn pathological_bucket_topology() { .map(|frame_nr| { let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]); DataRow::from_cells1( - RowId::ZERO, + RowId::random(), ent_path.clone(), timepoint, num_instances, diff --git a/crates/re_data_store/Cargo.toml b/crates/re_data_store/Cargo.toml index 4b5991b37c0a..607efc62ef3b 100644 --- a/crates/re_data_store/Cargo.toml +++ b/crates/re_data_store/Cargo.toml @@ -25,10 +25,11 @@ serde = ["dep:serde", "re_log_types/serde"] [dependencies] re_arrow_store.workspace = true +re_format.workspace = true re_int_histogram.workspace = true +re_log.workspace = true re_log_encoding = { workspace = true, optional = true } re_log_types.workspace = true -re_log.workspace = true re_smart_channel.workspace = true ahash.workspace = true diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index 5821b97cac2a..b520262aa4f9 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -204,8 +204,8 @@ impl LogDb { } pub fn num_rows(&self) -> usize { - self.entity_db.data_store.total_timeless_rows() as usize - + self.entity_db.data_store.total_temporal_rows() as usize + self.entity_db.data_store.num_timeless_rows() as usize + + self.entity_db.data_store.num_temporal_rows() as usize } pub fn is_empty(&self) -> bool { @@ -251,9 +251,16 @@ impl LogDb { crate::profile_function!(); assert!((0.0..=1.0).contains(&fraction_to_purge)); - // TODO(#1619): bring back garbage collection - let drop_row_ids: ahash::HashSet<_> = Default::default(); + let (drop_row_ids, stats_diff) = self.entity_db.data_store.gc( + re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _), + ); + re_log::debug!( + num_row_ids_dropped = drop_row_ids.len(), + size_bytes_dropped = re_format::format_bytes(stats_diff.total.num_bytes as _), + "purged datastore" + ); + let drop_row_ids: ahash::HashSet<_> = drop_row_ids.into_iter().collect(); let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline(); let Self { diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index 316d25f7978d..568df8fe8928 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -1,8 +1,9 @@ use std::sync::Arc; +use arrow2::datatypes::DataType; use itertools::Itertools as _; -use crate::{Component, ComponentName, DeserializableComponent, SerializableComponent}; +use crate::{Component, ComponentName, DeserializableComponent, SerializableComponent, SizeBytes}; // --- @@ -114,7 +115,8 @@ pub struct DataCellInner { // TODO(#1696): Store this within the datatype itself. pub(crate) name: ComponentName, - /// The size in bytes of both the underlying arrow data _and_ the inner cell itself. + /// The pre-computed size of the cell (stack + heap) as well as its underlying arrow data, + /// in bytes. /// /// This is always zero unless [`Self::compute_size_bytes`] has been called, which is a very /// costly operation. @@ -412,7 +414,6 @@ impl DataCell { pub fn is_sorted_and_unique(&self) -> DataCellResult { use arrow2::{ array::{Array, PrimitiveArray}, - datatypes::DataType, types::NativeType, }; @@ -441,24 +442,6 @@ impl DataCell { _ => Err(DataCellError::UnsupportedDatatype(arr.data_type().clone())), } } - - /// Returns the total (heap) allocated size of the cell in bytes, provided that - /// [`Self::compute_size_bytes`] has been called first (zero otherwise). - /// - /// This is an approximation, accurate enough for most purposes (stats, GC trigger, ...). - /// - /// This is `O(1)`, the value is computed and cached by calling [`Self::compute_size_bytes`]. - #[inline] - pub fn size_bytes(&self) -> u64 { - let Self { inner } = self; - - (inner.size_bytes > 0) - .then_some(std::mem::size_of_val(inner) as u64 + inner.size_bytes) - .unwrap_or_else(|| { - re_log::warn_once!("called `DataCell::size_bytes() without computing it first"); - 0 - }) - } } // --- @@ -492,7 +475,7 @@ impl std::fmt::Display for DataCell { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( "DataCell({})", - re_format::format_bytes(self.size_bytes() as _) + re_format::format_bytes(self.total_size_bytes() as _) ))?; re_format::arrow::format_table( // NOTE: wrap in a ListArray so that it looks more cell-like (i.e. single row) @@ -506,7 +489,8 @@ impl std::fmt::Display for DataCell { // --- impl DataCell { - /// Compute and cache the total (heap) allocated size of the underlying arrow array in bytes. + /// Compute and cache the total size (stack + heap) of the inner cell and its underlying arrow + /// array, in bytes. /// This does nothing if the size has already been computed and cached before. /// /// The caller must the sole owner of this cell, as this requires mutating an `Arc` under the @@ -523,8 +507,23 @@ impl DataCell { } } +impl SizeBytes for DataCell { + #[inline] + fn heap_size_bytes(&self) -> u64 { + (self.inner.size_bytes > 0) + .then_some(self.inner.size_bytes) + .unwrap_or_else(|| { + re_log::warn_once!( + "called `DataCell::heap_size_bytes() without computing it first" + ); + 0 + }) + } +} + impl DataCellInner { - /// Compute and cache the total (heap) allocated size of the underlying arrow array in bytes. + /// Compute and cache the total size (stack + heap) of the cell and its underlying arrow array, + /// in bytes. /// This does nothing if the size has already been computed and cached before. /// /// Beware: this is _very_ costly! @@ -541,10 +540,11 @@ impl DataCellInner { return; } - *size_bytes = (std::mem::size_of_val(name) - + std::mem::size_of_val(size_bytes) - + std::mem::size_of_val(values)) as u64 - + arrow2::compute::aggregate::estimated_bytes_size(&*self.values) as u64; + *size_bytes = name.total_size_bytes() + + size_bytes.total_size_bytes() + + values.data_type().total_size_bytes() + + std::mem::size_of_val(values) as u64 + + arrow2::compute::aggregate::estimated_bytes_size(&**values) as u64; } } @@ -556,8 +556,8 @@ fn data_cell_sizes() { // not computed { let cell = DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed()); - assert_eq!(0, cell.size_bytes()); - assert_eq!(0, cell.size_bytes()); + assert_eq!(0, cell.heap_size_bytes()); + assert_eq!(0, cell.heap_size_bytes()); } // zero-sized @@ -566,9 +566,8 @@ fn data_cell_sizes() { DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed()); cell.compute_size_bytes(); - // only the size of the outer & inner cells themselves - assert_eq!(56, cell.size_bytes()); - assert_eq!(56, cell.size_bytes()); + assert_eq!(112, cell.heap_size_bytes()); + assert_eq!(112, cell.heap_size_bytes()); } // anything else @@ -579,9 +578,9 @@ fn data_cell_sizes() { ); cell.compute_size_bytes(); - // 56 bytes for the inner & outer cells + 3x u64s - assert_eq!(80, cell.size_bytes()); - assert_eq!(80, cell.size_bytes()); + // zero-sized + 3x u64s + assert_eq!(136, cell.heap_size_bytes()); + assert_eq!(136, cell.heap_size_bytes()); } } diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs index 460a645bb20a..6cee609fa8fb 100644 --- a/crates/re_log_types/src/data_row.rs +++ b/crates/re_log_types/src/data_row.rs @@ -2,7 +2,9 @@ use ahash::HashSetExt; use nohash_hasher::IntSet; use smallvec::SmallVec; -use crate::{ComponentName, DataCell, DataCellError, DataTable, EntityPath, TableId, TimePoint}; +use crate::{ + ComponentName, DataCell, DataCellError, DataTable, EntityPath, SizeBytes, TableId, TimePoint, +}; // --- @@ -126,6 +128,13 @@ impl RowId { } } +impl SizeBytes for RowId { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 + } +} + impl std::ops::Deref for RowId { type Target = re_tuid::Tuid; diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 4c54140be722..adc9df0e0344 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -7,7 +7,7 @@ use smallvec::SmallVec; use crate::{ ArrowMsg, ComponentName, DataCell, DataCellError, DataRow, DataRowError, EntityPath, RowId, - TimePoint, Timeline, + SizeBytes, TimePoint, Timeline, }; // --- @@ -41,9 +41,6 @@ pub enum DataTableError { pub type DataTableResult = ::std::result::Result; -// TODO(#1757): The timepoint should be serialized as one column per timeline... that would be both -// more efficient and yield much better debugging views of our tables. - // --- pub type RowIdVec = SmallVec<[RowId; 4]>; @@ -107,8 +104,7 @@ impl DataCellColumn { Self(smallvec::smallvec![None; num_rows]) } - /// Compute and cache the total (heap) allocated size of each individual underlying - /// [`DataCell`]. + /// Compute and cache the size of each individual underlying [`DataCell`]. /// This does nothing for cells whose size has already been computed and cached before. /// /// Beware: this is _very_ costly! @@ -120,6 +116,13 @@ impl DataCellColumn { } } +impl SizeBytes for DataCellColumn { + #[inline] + fn heap_size_bytes(&self) -> u64 { + self.0.heap_size_bytes() + } +} + // --- /// A unique ID for a [`DataTable`]. @@ -161,6 +164,13 @@ impl TableId { } } +impl SizeBytes for TableId { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 + } +} + impl std::ops::Deref for TableId { type Target = re_tuid::Tuid; @@ -515,6 +525,27 @@ impl DataTable { } } +impl SizeBytes for DataTable { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + table_id, + col_row_id, + col_timelines, + col_entity_path, + col_num_instances, + columns, + } = self; + + table_id.heap_size_bytes() + + col_row_id.heap_size_bytes() + + col_timelines.heap_size_bytes() + + col_entity_path.heap_size_bytes() + + col_num_instances.heap_size_bytes() + + columns.heap_size_bytes() + } +} + // --- Serialization --- use arrow2::{ @@ -697,8 +728,6 @@ impl DataTable { let mut field = Field::new(name, data.data_type().clone(), false) .with_metadata([(METADATA_KIND.to_owned(), METADATA_KIND_CONTROL.to_owned())].into()); - // TODO(cmc): why do we have to do this manually on the way out, but it's done - // automatically on our behalf on the way in...? if let DataType::Extension(name, _, _) = data.data_type() { field .metadata @@ -724,8 +753,6 @@ impl DataTable { let mut field = Field::new(name, datatype.clone(), false) .with_metadata([(METADATA_KIND.to_owned(), METADATA_KIND_CONTROL.to_owned())].into()); - // TODO(cmc): why do we have to do this manually on the way out, but it's done - // automatically on our behalf on the way in...? if let DataType::Extension(name, _, _) = datatype { field .metadata @@ -979,6 +1006,8 @@ impl DataTable { .downcast_ref::>() .ok_or(DataTableError::NotAColumn(component.to_string()))? .iter() + // TODO(#1805): Schema metadata gets cloned in every single array. + // This'll become a problem as soon as we enable batching. .map(|array| array.map(|values| DataCell::from_arrow(component, values))) .collect(), )) diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 0a20ae6ef4da..8f964b8195a7 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -17,6 +17,7 @@ mod data_table; pub mod hash; mod index; pub mod path; +mod size_bytes; mod time; pub mod time_point; mod time_range; @@ -53,6 +54,7 @@ pub use self::data_table::{ }; pub use self::index::*; pub use self::path::*; +pub use self::size_bytes::SizeBytes; pub use self::time::{Duration, Time}; pub use self::time_point::{TimeInt, TimePoint, TimeType, Timeline, TimelineName}; pub use self::time_range::{TimeRange, TimeRangeF}; diff --git a/crates/re_log_types/src/path/component_name.rs b/crates/re_log_types/src/path/component_name.rs index 4e0020903746..bb96e862f84c 100644 --- a/crates/re_log_types/src/path/component_name.rs +++ b/crates/re_log_types/src/path/component_name.rs @@ -1,3 +1,5 @@ +use crate::SizeBytes; + re_string_interner::declare_new_type!( /// The name of an entity component, e.g. `pos` or `color`. pub struct ComponentName; @@ -15,6 +17,7 @@ impl ComponentName { /// Excludes the rerun namespace, so you'll get `color` but `ext.confidence`. /// /// Used for most UI elements. + #[inline] pub fn short_name(&self) -> &'static str { let full_name = self.0.as_str(); if let Some(short_name) = full_name.strip_prefix("rerun.") { @@ -24,3 +27,10 @@ impl ComponentName { } } } + +impl SizeBytes for ComponentName { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 + } +} diff --git a/crates/re_log_types/src/path/entity_path.rs b/crates/re_log_types/src/path/entity_path.rs index 1a68576555c0..d23b257aa64b 100644 --- a/crates/re_log_types/src/path/entity_path.rs +++ b/crates/re_log_types/src/path/entity_path.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::{ hash::Hash64, parse_entity_path, path::entity_path_impl::EntityPathImpl, EntityPathPart, + SizeBytes, }; // ---------------------------------------------------------------------------- @@ -156,6 +157,13 @@ impl EntityPath { } } +impl SizeBytes for EntityPath { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 // NOTE: we assume it's amortized due to the `Arc` + } +} + impl FromIterator for EntityPath { fn from_iter>(parts: T) -> Self { Self::new(parts.into_iter().collect()) diff --git a/crates/re_log_types/src/size_bytes.rs b/crates/re_log_types/src/size_bytes.rs new file mode 100644 index 000000000000..a670eee44d74 --- /dev/null +++ b/crates/re_log_types/src/size_bytes.rs @@ -0,0 +1,173 @@ +use std::collections::{BTreeMap, HashMap}; + +use arrow2::datatypes::{DataType, Field}; +use smallvec::SmallVec; + +// --- + +/// Approximations of stack and heap size for both internal and external types. +/// +/// Motly used for statistics and triggering events such as garbage collection. +pub trait SizeBytes: Sized { + /// Returns the total size of `self` in bytes, accounting for both stack and heap space. + #[inline] + fn total_size_bytes(&self) -> u64 { + self.stack_size_bytes() + self.heap_size_bytes() + } + + /// Returns the total size of `self` on the stack, in bytes. + /// + /// Defaults to `std::mem::size_of_val(self)`. + #[inline] + fn stack_size_bytes(&self) -> u64 { + std::mem::size_of_val(self) as _ + } + + /// Returns the total size of `self` on the heap, in bytes. + fn heap_size_bytes(&self) -> u64; +} + +// --- Std --- + +impl SizeBytes for String { + #[inline] + fn heap_size_bytes(&self) -> u64 { + self.capacity() as u64 + } +} + +impl SizeBytes for BTreeMap { + #[inline] + fn heap_size_bytes(&self) -> u64 { + // TODO(cmc): This is sub-optimal if these types are PODs. + + // NOTE: It's all on the heap at this point. + self.keys().map(SizeBytes::total_size_bytes).sum::() + + self.values().map(SizeBytes::total_size_bytes).sum::() + } +} + +impl SizeBytes for HashMap { + #[inline] + fn heap_size_bytes(&self) -> u64 { + // TODO(cmc): This is sub-optimal if these types are PODs. + + // NOTE: It's all on the heap at this point. + self.keys().map(SizeBytes::total_size_bytes).sum::() + + self.values().map(SizeBytes::total_size_bytes).sum::() + } +} + +impl SizeBytes for Vec { + /// Does not take capacity into account. + #[inline] + fn heap_size_bytes(&self) -> u64 { + // TODO(cmc): This is sub-optimal if these types are PODs. + + // NOTE: It's all on the heap at this point. + self.iter().map(SizeBytes::total_size_bytes).sum::() + } +} + +impl SizeBytes for SmallVec<[T; N]> { + /// Does not take capacity into account. + #[inline] + fn heap_size_bytes(&self) -> u64 { + // TODO(cmc): This is sub-optimal if these types are PODs. + + // NOTE: It's all on the heap at this point. + self.iter().map(SizeBytes::total_size_bytes).sum::() + } +} + +impl SizeBytes for Option { + #[inline] + fn heap_size_bytes(&self) -> u64 { + self.as_ref().map_or(0, SizeBytes::heap_size_bytes) + } +} + +// NOTE: `impl SizeBytesExt for T {}` would be nice but violates orphan rules. +macro_rules! impl_size_bytes_pod { + ($ty:ty) => { + impl SizeBytes for $ty { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 + } + } + }; + ($ty:ty, $($rest:ty),+) => { + impl_size_bytes_pod!($ty); impl_size_bytes_pod!($($rest),+); + }; +} + +impl_size_bytes_pod!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, bool, f32, f64); + +// --- Arrow --- + +impl SizeBytes for DataType { + #[inline] + fn heap_size_bytes(&self) -> u64 { + match self { + DataType::Null + | DataType::Binary + | DataType::Boolean + | DataType::Date32 + | DataType::Date64 + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int8 + | DataType::LargeBinary + | DataType::LargeUtf8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::UInt8 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::FixedSizeBinary(_) + | DataType::Decimal(_, _) + | DataType::Decimal256(_, _) + | DataType::Utf8 => 0, + DataType::Timestamp(_, str) => str.heap_size_bytes(), + DataType::List(field) + | DataType::FixedSizeList(field, _) + | DataType::LargeList(field) + | DataType::Map(field, _) => field.total_size_bytes(), // NOTE: Boxed, it's all on the heap + DataType::Struct(fields) => fields.heap_size_bytes(), + DataType::Union(fields, indices, _) => { + fields.heap_size_bytes() + indices.heap_size_bytes() + } + DataType::Dictionary(_, datatype, _) => datatype.total_size_bytes(), // NOTE: Boxed, it's all on the heap + DataType::Extension(name, datatype, extra) => { + name.heap_size_bytes() + + datatype.total_size_bytes() // NOTE: Boxed, it's all on the heap + + extra.heap_size_bytes() + } + } + } +} + +impl SizeBytes for Field { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Field { + name, + data_type, + is_nullable, + metadata, + } = self; + + name.heap_size_bytes() + + data_type.heap_size_bytes() + + is_nullable.heap_size_bytes() + + metadata.heap_size_bytes() + } +} diff --git a/crates/re_log_types/src/time_point/mod.rs b/crates/re_log_types/src/time_point/mod.rs index 79a1eeae526f..4074810b8222 100644 --- a/crates/re_log_types/src/time_point/mod.rs +++ b/crates/re_log_types/src/time_point/mod.rs @@ -3,7 +3,7 @@ use std::collections::{btree_map, BTreeMap}; mod time_int; mod timeline; -use crate::{time::Time, TimeRange}; +use crate::{time::Time, SizeBytes, TimeRange}; // Re-exports pub use time_int::TimeInt; @@ -73,6 +73,7 @@ impl TimePoint { /// Computes the union of two `TimePoint`s, keeping the maximum time value in case of /// conflicts. + #[inline] pub fn union_max(mut self, rhs: &Self) -> Self { for (&timeline, &time) in rhs { match self.0.entry(timeline) { @@ -89,6 +90,23 @@ impl TimePoint { } } +impl SizeBytes for TimePoint { + #[inline] + fn heap_size_bytes(&self) -> u64 { + type K = Timeline; + type V = TimeInt; + + // NOTE: This is only here to make sure this method fails to compile if the inner type + // changes, as the following size computation assumes POD types. + let inner: &BTreeMap = &self.0; + + let keys_size_bytes = std::mem::size_of::() * inner.len(); + let values_size_bytes = std::mem::size_of::() * inner.len(); + + (keys_size_bytes + values_size_bytes) as u64 + } +} + // ---------------------------------------------------------------------------- /// The type of a [`TimeInt`] or [`Timeline`]. diff --git a/crates/re_log_types/src/time_point/timeline.rs b/crates/re_log_types/src/time_point/timeline.rs index 20c207365cbc..d42f92d0869c 100644 --- a/crates/re_log_types/src/time_point/timeline.rs +++ b/crates/re_log_types/src/time_point/timeline.rs @@ -1,6 +1,6 @@ use arrow2::datatypes::{DataType, TimeUnit}; -use crate::{TimeRange, TimeType}; +use crate::{SizeBytes, TimeRange, TimeType}; re_string_interner::declare_new_type!( /// The name of a timeline. Often something like `"log_time"` or `"frame_nr"`. @@ -100,6 +100,13 @@ impl Timeline { impl nohash_hasher::IsEnabled for Timeline {} +impl SizeBytes for Timeline { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 + } +} + // required for [`nohash_hasher`]. #[allow(clippy::derive_hash_xor_eq)] impl std::hash::Hash for Timeline { diff --git a/crates/re_log_types/src/time_range.rs b/crates/re_log_types/src/time_range.rs index 68b7e4a7f159..e8350932e9cc 100644 --- a/crates/re_log_types/src/time_range.rs +++ b/crates/re_log_types/src/time_range.rs @@ -1,6 +1,6 @@ use std::ops::RangeInclusive; -use crate::{TimeInt, TimeReal}; +use crate::{SizeBytes, TimeInt, TimeReal}; // ---------------------------------------------------------------------------- @@ -62,6 +62,13 @@ impl TimeRange { } } +impl SizeBytes for TimeRange { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 + } +} + impl From for RangeInclusive { fn from(range: TimeRange) -> RangeInclusive { range.min..=range.max diff --git a/crates/re_query/src/query.rs b/crates/re_query/src/query.rs index 5c4bd34194b0..e52a1deea003 100644 --- a/crates/re_query/src/query.rs +++ b/crates/re_query/src/query.rs @@ -168,7 +168,7 @@ pub fn __populate_example_store() -> DataStore { let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let row = DataRow::from_cells2( - RowId::ZERO, + RowId::random(), ent_path, timepoint, instances.len() as _, @@ -180,7 +180,7 @@ pub fn __populate_example_store() -> DataStore { let colors = vec![ColorRGBA(0xff000000)]; let row = DataRow::from_cells2( - RowId::ZERO, + RowId::random(), ent_path, timepoint, instances.len() as _, diff --git a/crates/re_viewer/src/ui/memory_panel.rs b/crates/re_viewer/src/ui/memory_panel.rs index c2e9f9450059..796c98977a9d 100644 --- a/crates/re_viewer/src/ui/memory_panel.rs +++ b/crates/re_viewer/src/ui/memory_panel.rs @@ -1,4 +1,4 @@ -use re_arrow_store::{DataStoreConfig, DataStoreStats}; +use re_arrow_store::{DataStoreConfig, DataStoreRowStats, DataStoreStats}; use re_format::{format_bytes, format_number}; use re_memory::{util::sec_since_start, MemoryHistory, MemoryLimit, MemoryUse}; use re_renderer::WgpuResourcePoolStatistics; @@ -26,7 +26,7 @@ impl MemoryPanel { (gpu_resource_stats.total_buffer_size_in_bytes + gpu_resource_stats.total_texture_size_in_bytes) as _, ), - Some(store_stats.total_size_bytes as _), + Some(store_stats.total.num_bytes as _), ); } @@ -215,13 +215,13 @@ impl MemoryPanel { .num_columns(3) .show(ui, |ui| { let DataStoreStats { - total_timeless_rows, - total_timeless_size_bytes, - total_temporal_rows, - total_temporal_size_bytes, - total_temporal_buckets, - total_rows, - total_size_bytes, + type_registry, + metadata_registry, + autogenerated, + timeless, + temporal, + temporal_buckets, + total, } = *store_stats; ui.label(egui::RichText::new("Stats").italics()); @@ -230,30 +230,44 @@ impl MemoryPanel { ui.label("Size"); ui.end_row(); - let label_buckets = |ui: &mut egui::Ui, num_buckets| { - ui.label(re_format::format_number(num_buckets as _)) - }; - let label_rows = - |ui: &mut egui::Ui, num_rows| ui.label(re_format::format_number(num_rows as _)); - let label_size = - |ui: &mut egui::Ui, size| ui.label(re_format::format_bytes(size as _)); + fn label_row_stats(ui: &mut egui::Ui, row_stats: DataStoreRowStats) { + let DataStoreRowStats { + num_rows, + num_bytes, + } = row_stats; + + ui.label(re_format::format_number(num_rows as _)); + ui.label(re_format::format_bytes(num_bytes as _)); + } + + ui.label("Type registry:"); + ui.label(""); + label_row_stats(ui, type_registry); + ui.end_row(); + + ui.label("Metadata registry:"); + ui.label(""); + label_row_stats(ui, metadata_registry); + ui.end_row(); + + ui.label("Cluster cache:"); + ui.label(""); + label_row_stats(ui, autogenerated); + ui.end_row(); ui.label("Timeless:"); ui.label(""); - label_rows(ui, total_timeless_rows); - label_size(ui, total_timeless_size_bytes); + label_row_stats(ui, timeless); ui.end_row(); ui.label("Temporal:"); - label_buckets(ui, total_temporal_buckets); - label_rows(ui, total_temporal_rows); - label_size(ui, total_temporal_size_bytes); + ui.label(re_format::format_number(temporal_buckets as _)); + label_row_stats(ui, temporal); ui.end_row(); ui.label("Total"); - label_buckets(ui, total_temporal_buckets); - label_rows(ui, total_rows); - label_size(ui, total_size_bytes); + ui.label(re_format::format_number(temporal_buckets as _)); + label_row_stats(ui, total); ui.end_row(); }); } diff --git a/crates/re_viewer/src/ui/time_panel/mod.rs b/crates/re_viewer/src/ui/time_panel/mod.rs index c937cb5374c3..e106cb4ed74a 100644 --- a/crates/re_viewer/src/ui/time_panel/mod.rs +++ b/crates/re_viewer/src/ui/time_panel/mod.rs @@ -747,9 +747,12 @@ fn initialize_time_ranges_ui( .prefix_times .get(ctx.rec_cfg.time_ctrl.timeline()) { - let timeline_axis = TimelineAxis::new(ctx.rec_cfg.time_ctrl.time_type(), times); - time_view = time_view.or_else(|| Some(view_everything(&time_x_range, &timeline_axis))); - time_range.extend(timeline_axis.ranges); + // NOTE: `times` can be empty if a GC wiped everything. + if !times.is_empty() { + let timeline_axis = TimelineAxis::new(ctx.rec_cfg.time_ctrl.time_type(), times); + time_view = time_view.or_else(|| Some(view_everything(&time_x_range, &timeline_axis))); + time_range.extend(timeline_axis.ranges); + } } TimeRangesUi::new( diff --git a/scripts/lint.py b/scripts/lint.py index c74719628d13..594748c94056 100755 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -18,7 +18,7 @@ debug_format_of_err = re.compile(r"\{\:#?\?\}.*, err") error_match_name = re.compile(r"Err\((\w+)\)") wasm_caps = re.compile(r"\bWASM\b") -nb_prefix = re.compile(r"\bnb_") +nb_prefix = re.compile(r"nb_") def lint_line(line: str) -> Optional[str]: @@ -102,6 +102,7 @@ def test_lint_line() -> None: "if let Err(error) = foo", "We use WASM in Rerun", "nb_instances", + "inner_nb_instances", ] for line in should_pass: