From 5cd5814f5119e4780341a7087ef79b0e628eedcd Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 4 Apr 2023 12:23:46 +0200 Subject: [PATCH] wip --- crates/re_arrow_store/benches/data_store.rs | 2 +- crates/re_arrow_store/src/store.rs | 68 ++--- crates/re_arrow_store/src/store_arrow.rs | 3 +- crates/re_arrow_store/src/store_format.rs | 17 +- crates/re_arrow_store/src/store_polars.rs | 3 +- crates/re_arrow_store/src/store_read.rs | 6 +- crates/re_arrow_store/src/store_sanity.rs | 210 ++++++++------ crates/re_arrow_store/src/store_stats.rs | 266 ++++++++++++++---- crates/re_arrow_store/src/store_write.rs | 151 ++++++---- crates/re_arrow_store/tests/internals.rs | 4 +- crates/re_data_store/examples/memory_usage.rs | 6 +- crates/re_data_store/src/log_db.rs | 3 +- .../benches/msg_encode_benchmark.rs | 6 +- crates/re_log_types/src/arrow_msg.rs | 4 +- crates/re_log_types/src/data_cell.rs | 84 +++++- crates/re_log_types/src/data_row.rs | 12 + crates/re_log_types/src/data_table.rs | 79 ++++-- crates/re_sdk/src/msg_sender.rs | 8 +- crates/re_viewer/src/ui/data_ui/log_msg.rs | 2 +- crates/re_viewer/src/ui/event_log_view.rs | 2 +- crates/re_viewer/src/ui/memory_panel.rs | 45 +-- rerun_py/src/arrow.rs | 5 +- rerun_py/src/python_bridge.rs | 30 +- 23 files changed, 694 insertions(+), 322 deletions(-) diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 34531e8aef9d0..c448355999693 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -259,7 +259,7 @@ fn build_table(n: usize, packed: bool) -> DataTable { // Do a serialization roundtrip to pack everything in contiguous memory. if packed { let (schema, columns) = table.serialize().unwrap(); - table = DataTable::deserialize(MsgId::ZERO, &schema, &columns).unwrap(); + table = DataTable::deserialize(MsgId::ZERO, &schema, &columns, false).unwrap(); } table diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index c8824e4016712..5399bd9903410 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -262,7 +262,7 @@ fn datastore_internal_repr() { }, ); - let timeless = DataTable::example(false); + let timeless = DataTable::example(true); eprintln!("{timeless}"); store.insert_table(&timeless).unwrap(); @@ -317,38 +317,31 @@ pub struct IndexedTable { /// to free up space. pub all_components: IntSet, - /// The total number of rows in this indexed table, accounting for all buckets. - pub total_rows: u64, + /// The number of rows stored in this table, across all of its buckets, in bytes. + pub buckets_num_rows: u64, - /// The size of this table in bytes across all of its buckets, accounting for both data and - /// metadata. + /// The size of both the control & component data stored in this table, across all of its + /// buckets, in bytes. /// - /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we - /// cache this. - /// Also: there are many buckets. - pub total_size_bytes: u64, + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). 
+ pub buckets_size_bytes: u64, } impl IndexedTable { pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self { + let bucket = IndexedBucket::new(cluster_key, timeline); + let buckets_size_bytes = bucket.size_bytes(); Self { timeline, ent_path, - buckets: [(i64::MIN.into(), IndexedBucket::new(cluster_key, timeline))].into(), + buckets: [(i64::MIN.into(), bucket)].into(), cluster_key, all_components: Default::default(), - total_rows: 0, - total_size_bytes: 0, // TODO(#1619) + buckets_num_rows: 0, + buckets_size_bytes, } } - - /// Returns a read-only iterator over the raw buckets. - /// - /// Do _not_ use this to try and test the internal state of the datastore. - #[doc(hidden)] - pub fn iter_buckets(&self) -> impl ExactSizeIterator { - self.buckets.values() - } } /// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`] @@ -414,16 +407,18 @@ pub struct IndexedBucketInner { /// (i.e. the table is sparse). pub columns: IntMap, - /// The size of this bucket in bytes, accounting for both data and metadata. + /// The size of both the control & component data stored in this bucket, in bytes. /// - /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we - /// cache this. - pub total_size_bytes: u64, + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). + /// + /// We cache this because there can be many, many buckets. + pub size_bytes: u64, } impl Default for IndexedBucketInner { fn default() -> Self { - Self { + let mut this = Self { is_sorted: true, time_range: TimeRange::new(i64::MAX.into(), i64::MIN.into()), col_time: Default::default(), @@ -431,8 +426,10 @@ impl Default for IndexedBucketInner { col_row_id: Default::default(), col_num_instances: Default::default(), columns: Default::default(), - total_size_bytes: 0, // TODO(#1619) - } + size_bytes: 0, // NOTE: computed below + }; + this.compute_size_bytes(); + this } } @@ -476,15 +473,20 @@ pub struct PersistentIndexedTable { /// The cells are optional since not all rows will have data for every single component /// (i.e. the table is sparse). pub columns: IntMap, - - /// The size of this indexed table in bytes, accounting for both data and metadata. - /// - /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we - /// cache this. 
- pub total_size_bytes: u64, } impl PersistentIndexedTable { + pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self { + Self { + cluster_key, + ent_path, + col_insert_id: Default::default(), + col_row_id: Default::default(), + col_num_instances: Default::default(), + columns: Default::default(), + } + } + pub fn is_empty(&self) -> bool { self.col_num_instances.is_empty() } diff --git a/crates/re_arrow_store/src/store_arrow.rs b/crates/re_arrow_store/src/store_arrow.rs index f9016ecde50e2..b7db9157855e4 100644 --- a/crates/re_arrow_store/src/store_arrow.rs +++ b/crates/re_arrow_store/src/store_arrow.rs @@ -38,7 +38,7 @@ impl IndexedBucket { col_row_id, col_num_instances, columns, - total_size_bytes: _, + size_bytes: _, } = &*inner.read(); serialize( @@ -72,7 +72,6 @@ impl PersistentIndexedTable { col_row_id, col_num_instances, columns, - total_size_bytes: _, } = self; serialize( diff --git a/crates/re_arrow_store/src/store_format.rs b/crates/re_arrow_store/src/store_format.rs index 9974f725c2281..975f2f81dd239 100644 --- a/crates/re_arrow_store/src/store_format.rs +++ b/crates/re_arrow_store/src/store_format.rs @@ -34,8 +34,8 @@ impl std::fmt::Display for DataStore { format!( "{} timeless indexed tables, for a total of {} across {} total rows\n", timeless_tables.len(), - format_bytes(self.total_timeless_index_size_bytes() as _), - format_number(self.total_timeless_index_rows() as _) + format_bytes(self.total_timeless_size_bytes() as _), + format_number(self.total_timeless_rows() as _) ), ))?; f.write_str(&indent::indent_all_by(4, "timeless_tables: [\n"))?; @@ -53,8 +53,8 @@ impl std::fmt::Display for DataStore { format!( "{} indexed tables, for a total of {} across {} total rows\n", tables.len(), - format_bytes(self.total_temporal_index_size_bytes() as _), - format_number(self.total_temporal_index_rows() as _) + format_bytes(self.total_temporal_size_bytes() as _), + format_number(self.total_temporal_rows() as _) ), ))?; f.write_str(&indent::indent_all_by(4, "tables: [\n"))?; @@ -83,8 +83,8 @@ impl std::fmt::Display for IndexedTable { buckets, cluster_key: _, all_components: _, - total_rows: _, - total_size_bytes: _, + buckets_num_rows: _, + buckets_size_bytes: _, } = self; f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?; @@ -116,8 +116,8 @@ impl std::fmt::Display for IndexedBucket { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( "size: {} across {} rows\n", - format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), + format_bytes(self.size_bytes() as _), + format_number(self.num_rows() as _), ))?; let time_range = { @@ -156,7 +156,6 @@ impl std::fmt::Display for PersistentIndexedTable { col_row_id: _, col_num_instances: _, columns: _, - total_size_bytes: _, } = self; f.write_fmt(format_args!("entity: {ent_path}\n"))?; diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 494b320aad550..6ef71c714377f 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -176,7 +176,6 @@ impl PersistentIndexedTable { col_row_id, col_num_instances, columns, - total_size_bytes: _, } = self; let num_rows = self.total_rows() as usize; @@ -217,7 +216,7 @@ impl IndexedBucket { col_row_id, col_num_instances, columns, - total_size_bytes: _, + size_bytes: _, } = &*self.inner.read(); let (_, times) = DataTable::serialize_primitive_column( diff --git a/crates/re_arrow_store/src/store_read.rs 
b/crates/re_arrow_store/src/store_read.rs index 59ba178c41d90..e9820914feb09 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -655,7 +655,7 @@ impl IndexedBucket { col_row_id: _, col_num_instances: _, columns, - total_size_bytes: _, // TODO(#1619) + size_bytes: _, } = &*self.inner.read(); debug_assert!(is_sorted); @@ -761,7 +761,7 @@ impl IndexedBucket { col_row_id, col_num_instances: _, columns, - total_size_bytes: _, // TODO(#1619) + size_bytes: _, } = &*self.inner.read(); debug_assert!(is_sorted); @@ -873,7 +873,7 @@ impl IndexedBucketInner { col_row_id, col_num_instances, columns, - total_size_bytes: _, + size_bytes: _, } = self; if *is_sorted { diff --git a/crates/re_arrow_store/src/store_sanity.rs b/crates/re_arrow_store/src/store_sanity.rs index 7f5b416c753c0..a8ba1f8d4ae38 100644 --- a/crates/re_arrow_store/src/store_sanity.rs +++ b/crates/re_arrow_store/src/store_sanity.rs @@ -8,6 +8,20 @@ use crate::{DataStore, IndexedBucket, IndexedBucketInner, IndexedTable, Persiste #[derive(thiserror::Error, Debug)] pub enum SanityError { + #[error("Reported size for {origin} is out of sync: got {got}, expected {expected}")] + SizeOutOfSync { + origin: &'static str, + expected: String, + got: String, + }, + + #[error("Reported number of rows for {origin} is out of sync: got {got}, expected {expected}")] + RowsOutOfSync { + origin: &'static str, + expected: String, + got: String, + }, + #[error("Column '{component}' has too few/many rows: got {got} instead of {expected}")] ColumnLengthMismatch { component: ComponentName, @@ -53,77 +67,7 @@ impl DataStore { } } -// --- Persistent Indices --- - -impl PersistentIndexedTable { - /// Runs the sanity check suite for the entire table. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> SanityResult<()> { - crate::profile_function!(); - - let Self { - ent_path: _, - cluster_key, - col_insert_id, - col_row_id, - col_num_instances, - columns, - total_size_bytes: _, // TODO(#1619) - } = self; - - // All columns should be `Self::num_rows` long. - { - let num_rows = self.total_rows(); - - let column_lengths = [ - (!col_insert_id.is_empty()) - .then(|| (DataStore::insert_id_key(), col_insert_id.len())), // - Some((COLUMN_ROW_ID.into(), col_row_id.len())), - Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())), - ] - .into_iter() - .flatten() - .chain( - columns - .iter() - .map(|(component, column)| (*component, column.len())), - ) - .map(|(component, len)| (component, len as u64)); - - for (component, len) in column_lengths { - if len != num_rows { - return Err(SanityError::ColumnLengthMismatch { - component, - expected: num_rows, - got: len, - }); - } - } - } - - // The cluster column must be fully dense. - { - let cluster_column = - columns - .get(cluster_key) - .ok_or(SanityError::ClusterColumnMissing { - cluster_key: *cluster_key, - })?; - if !cluster_column.iter().all(|cell| cell.is_some()) { - return Err(SanityError::ClusterColumnSparse { - cluster_column: cluster_column.clone().into(), - }); - } - } - - // TODO(#1619): recomputing shouldnt change the size - - Ok(()) - } -} - -// --- Indices --- +// --- Temporal --- impl IndexedTable { /// Runs the sanity check suite for the entire table. 
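Context for the hunks that follow: the new sanity checks compare the cached row/size counters against values recomputed from scratch and report a `SizeOutOfSync`/`RowsOutOfSync` error on mismatch. A minimal standalone sketch of that pattern, using hypothetical stand-in types rather than the real store types:

#[derive(Debug)]
enum SanityError {
    SizeOutOfSync { origin: &'static str, expected: u64, got: u64 },
}

struct Bucket {
    rows: Vec<Vec<u8>>,
    size_bytes: u64, // cached: updated incrementally on insert/split
}

impl Bucket {
    // Recomputed from scratch; only used to validate the cached value.
    fn size_bytes_uncached(&self) -> u64 {
        self.rows.iter().map(|row| row.len() as u64).sum()
    }

    fn sanity_check(&self) -> Result<(), SanityError> {
        let expected = self.size_bytes_uncached();
        if self.size_bytes != expected {
            return Err(SanityError::SizeOutOfSync {
                origin: std::any::type_name::<Self>(),
                expected,
                got: self.size_bytes,
            });
        }
        Ok(())
    }
}

fn main() {
    let bucket = Bucket { rows: vec![vec![0; 16], vec![0; 8]], size_bytes: 24 };
    assert!(bucket.sanity_check().is_ok());

    let stale = Bucket { rows: vec![vec![0; 16]], size_bytes: 0 };
    assert!(stale.sanity_check().is_err());
}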
@@ -152,6 +96,32 @@ impl IndexedTable { } } + // Make sure row numbers aren't out of sync + { + let total_rows = self.total_rows(); + let total_rows_uncached = self.total_rows_uncached(); + if total_rows != total_rows_uncached { + return Err(SanityError::RowsOutOfSync { + origin: std::any::type_name::(), + expected: re_format::format_number(total_rows_uncached as _), + got: re_format::format_number(total_rows as _), + }); + } + } + + // Make sure size values aren't out of sync + { + let total_size_bytes = self.total_size_bytes(); + let total_size_bytes_uncached = self.total_size_bytes_uncached(); + if total_size_bytes != total_size_bytes_uncached { + return Err(SanityError::SizeOutOfSync { + origin: std::any::type_name::(), + expected: re_format::format_bytes(total_size_bytes_uncached as _), + got: re_format::format_bytes(total_size_bytes as _), + }); + } + } + // Run individual bucket sanity check suites too. for bucket in self.buckets.values() { bucket.sanity_check()?; @@ -174,16 +144,99 @@ impl IndexedBucket { inner, } = self; - let IndexedBucketInner { - is_sorted: _, - time_range: _, - col_time, + { + let IndexedBucketInner { + is_sorted: _, + time_range: _, + col_time, + col_insert_id, + col_row_id, + col_num_instances, + columns, + size_bytes: _, + } = &*inner.read(); + + // All columns should be `Self::num_rows` long. + { + let num_rows = self.num_rows(); + + let column_lengths = [ + (!col_insert_id.is_empty()) + .then(|| (DataStore::insert_id_key(), col_insert_id.len())), // + Some((COLUMN_TIMEPOINT.into(), col_time.len())), + Some((COLUMN_ROW_ID.into(), col_row_id.len())), + Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())), + ] + .into_iter() + .flatten() + .chain( + columns + .iter() + .map(|(component, column)| (*component, column.len())), + ) + .map(|(component, len)| (component, len as u64)); + + for (component, len) in column_lengths { + if len != num_rows { + return Err(SanityError::ColumnLengthMismatch { + component, + expected: num_rows, + got: len, + }); + } + } + } + + // The cluster column must be fully dense. + { + let cluster_column = + columns + .get(cluster_key) + .ok_or(SanityError::ClusterColumnMissing { + cluster_key: *cluster_key, + })?; + if !cluster_column.iter().all(|cell| cell.is_some()) { + return Err(SanityError::ClusterColumnSparse { + cluster_column: cluster_column.clone().into(), + }); + } + } + } + + // Make sure size values aren't out of sync + { + let size_bytes = inner.read().size_bytes; + let size_bytes_uncached = inner.write().compute_size_bytes(); + if size_bytes != size_bytes_uncached { + return Err(SanityError::SizeOutOfSync { + origin: std::any::type_name::(), + expected: re_format::format_bytes(size_bytes_uncached as _), + got: re_format::format_bytes(size_bytes as _), + }); + } + } + + Ok(()) + } +} + +// --- Timeless --- + +impl PersistentIndexedTable { + /// Runs the sanity check suite for the entire table. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> SanityResult<()> { + crate::profile_function!(); + + let Self { + ent_path: _, + cluster_key, col_insert_id, col_row_id, col_num_instances, columns, - total_size_bytes: _, // TODO(#1619) - } = &*inner.read(); + } = self; // All columns should be `Self::num_rows` long. 
{ @@ -192,7 +245,6 @@ impl IndexedBucket { let column_lengths = [ (!col_insert_id.is_empty()) .then(|| (DataStore::insert_id_key(), col_insert_id.len())), // - Some((COLUMN_TIMEPOINT.into(), col_time.len())), Some((COLUMN_ROW_ID.into(), col_row_id.len())), Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())), ] @@ -231,8 +283,6 @@ impl IndexedBucket { } } - // TODO(#1619): recomputing shouldnt change the size - Ok(()) } } diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index fb836133d5da1..65bc49fcc2b46 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -1,19 +1,24 @@ -use crate::{DataStore, DataStoreConfig, IndexedBucket, IndexedTable, PersistentIndexedTable}; +use nohash_hasher::IntMap; +use re_log_types::{ComponentName, DataCellColumn}; + +use crate::{ + store::IndexedBucketInner, DataStore, DataStoreConfig, IndexedBucket, IndexedTable, + PersistentIndexedTable, +}; // --- -// TODO(cmc): compute incrementally once/if this becomes too expensive. -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct DataStoreStats { - pub total_timeless_index_rows: u64, - pub total_timeless_index_size_bytes: u64, + pub total_timeless_rows: u64, + pub total_timeless_size_bytes: u64, - pub total_temporal_index_rows: u64, - pub total_temporal_index_size_bytes: u64, - pub total_temporal_index_buckets: u64, + pub total_temporal_rows: u64, + pub total_temporal_size_bytes: u64, + pub total_temporal_buckets: u64, - pub total_index_rows: u64, - pub total_index_size_bytes: u64, + pub total_rows: u64, + pub total_size_bytes: u64, pub config: DataStoreConfig, } @@ -22,27 +27,26 @@ impl DataStoreStats { pub fn from_store(store: &DataStore) -> Self { crate::profile_function!(); - let total_timeless_index_rows = store.total_timeless_index_rows(); - let total_timeless_index_size_bytes = store.total_timeless_index_size_bytes(); + let total_timeless_rows = store.total_timeless_rows(); + let total_timeless_size_bytes = store.total_timeless_size_bytes(); - let total_temporal_index_rows = store.total_temporal_index_rows(); - let total_temporal_index_size_bytes = store.total_temporal_index_size_bytes(); - let total_temporal_index_buckets = store.total_temporal_index_buckets(); + let total_temporal_rows = store.total_temporal_rows(); + let total_temporal_size_bytes = store.total_temporal_size_bytes(); + let total_temporal_buckets = store.total_temporal_buckets(); - let total_index_rows = total_timeless_index_rows + total_temporal_index_rows; - let total_index_size_bytes = - total_timeless_index_size_bytes + total_temporal_index_size_bytes; + let total_rows = total_timeless_rows + total_temporal_rows; + let total_size_bytes = total_timeless_size_bytes + total_temporal_size_bytes; Self { - total_timeless_index_rows, - total_timeless_index_size_bytes, + total_timeless_rows, + total_timeless_size_bytes, - total_temporal_index_rows, - total_temporal_index_size_bytes, - total_temporal_index_buckets, + total_temporal_rows, + total_temporal_size_bytes, + total_temporal_buckets, - total_index_rows, - total_index_size_bytes, + total_rows, + total_size_bytes, config: store.config.clone(), } @@ -55,7 +59,7 @@ impl DataStore { /// Returns the number of timeless index rows stored across this entire store, i.e. the sum of /// the number of rows across all of its timeless indexed tables. 
#[inline] - pub fn total_timeless_index_rows(&self) -> u64 { + pub fn total_timeless_rows(&self) -> u64 { crate::profile_function!(); self.timeless_tables .values() @@ -66,7 +70,7 @@ impl DataStore { /// Returns the size of the timeless index data stored across this entire store, i.e. the sum /// of the size of the data stored across all of its timeless indexed tables, in bytes. #[inline] - pub fn total_timeless_index_size_bytes(&self) -> u64 { + pub fn total_timeless_size_bytes(&self) -> u64 { crate::profile_function!(); self.timeless_tables .values() @@ -77,7 +81,7 @@ impl DataStore { /// Returns the number of temporal index rows stored across this entire store, i.e. the sum of /// the number of rows across all of its temporal indexed tables. #[inline] - pub fn total_temporal_index_rows(&self) -> u64 { + pub fn total_temporal_rows(&self) -> u64 { crate::profile_function!(); self.tables.values().map(|table| table.total_rows()).sum() } @@ -85,7 +89,7 @@ impl DataStore { /// Returns the size of the temporal index data stored across this entire store, i.e. the sum /// of the size of the data stored across all of its temporal indexed tables, in bytes. #[inline] - pub fn total_temporal_index_size_bytes(&self) -> u64 { + pub fn total_temporal_size_bytes(&self) -> u64 { crate::profile_function!(); self.tables .values() @@ -95,7 +99,7 @@ impl DataStore { /// Returns the number of temporal indexed buckets stored across this entire store. #[inline] - pub fn total_temporal_index_buckets(&self) -> u64 { + pub fn total_temporal_buckets(&self) -> u64 { crate::profile_function!(); self.tables .values() @@ -104,42 +108,92 @@ impl DataStore { } } -// --- Persistent Indices --- +// --- Temporal --- -impl PersistentIndexedTable { - /// Returns the number of rows stored across this table. +impl IndexedTable { + /// Returns the number of rows stored across this entire table, i.e. the sum of the number + /// of rows stored across all of its buckets. #[inline] pub fn total_rows(&self) -> u64 { - self.col_num_instances.len() as _ - } - - /// Returns the size of the data stored across this table, in bytes. - #[inline] - pub fn total_size_bytes(&self) -> u64 { - self.total_size_bytes + crate::profile_function!(); + self.buckets_num_rows } -} -// --- Indices --- - -impl IndexedTable { /// Returns the number of rows stored across this entire table, i.e. the sum of the number /// of rows stored across all of its buckets. + /// + /// Recomputed from scratch, for sanity checking. #[inline] - pub fn total_rows(&self) -> u64 { - self.total_rows + pub fn total_rows_uncached(&self) -> u64 { + crate::profile_function!(); + self.buckets.values().map(|bucket| bucket.num_rows()).sum() } - /// Returns the size of data stored across this entire table, i.e. the sum of the size of - /// the data stored across all of its buckets, in bytes. + /// The size of both the control & component data stored in this table, across all of its + /// buckets, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). 
#[inline] pub fn total_size_bytes(&self) -> u64 { - self.total_size_bytes + crate::profile_function!(); + + let Self { + timeline, + ent_path, + cluster_key, + buckets: _, + all_components, + buckets_num_rows: _, + buckets_size_bytes, + } = self; + + let size_bytes = std::mem::size_of_val(timeline) + + std::mem::size_of_val(ent_path) + + std::mem::size_of_val(cluster_key) + + (all_components.len() * std::mem::size_of::()); + + size_bytes as u64 + buckets_size_bytes + } + + /// The size of both the control & component data stored in this table, across all of its + /// buckets, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). + /// + /// Recomputed from scratch, for sanity checking. + #[inline] + pub fn total_size_bytes_uncached(&self) -> u64 { + crate::profile_function!(); + + let Self { + timeline, + ent_path, + cluster_key, + buckets, + all_components, + buckets_num_rows: _, + buckets_size_bytes: _, + } = self; + + let buckets_size_bytes = buckets + .values() + .map(|bucket| bucket.size_bytes()) + .sum::(); + + let size_bytes = std::mem::size_of_val(timeline) + + std::mem::size_of_val(ent_path) + + std::mem::size_of_val(cluster_key) + + (all_components.len() * std::mem::size_of::()); + + size_bytes as u64 + buckets_size_bytes } /// Returns the number of buckets stored across this entire table. #[inline] pub fn total_buckets(&self) -> u64 { + crate::profile_function!(); self.buckets.len() as _ } } @@ -147,13 +201,121 @@ impl IndexedTable { impl IndexedBucket { /// Returns the number of rows stored across this bucket. #[inline] - pub fn total_rows(&self) -> u64 { + pub fn num_rows(&self) -> u64 { + crate::profile_function!(); self.inner.read().col_time.len() as u64 } - /// Returns the size of the data stored across this bucket, in bytes. + /// The size of both the control & component data stored in this bucket, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). + #[inline] + pub fn size_bytes(&self) -> u64 { + crate::profile_function!(); + + let Self { + timeline, + cluster_key, + inner, + } = self; + + (std::mem::size_of_val(timeline) + std::mem::size_of_val(cluster_key)) as u64 + + inner.read().size_bytes + } +} + +impl IndexedBucketInner { + /// Computes and caches the size of both the control & component data stored in this bucket, + /// in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). + #[inline] + pub fn compute_size_bytes(&mut self) -> u64 { + crate::profile_function!(); + + let Self { + is_sorted, + time_range, + col_time, + col_insert_id, + col_row_id, + col_num_instances, + columns, + size_bytes, + } = self; + + let control_size_bytes = std::mem::size_of_val(is_sorted) + + std::mem::size_of_val(time_range) + + std::mem::size_of_val(col_time.as_slice()) + + std::mem::size_of_val(col_insert_id.as_slice()) + + std::mem::size_of_val(col_row_id.as_slice()) + + std::mem::size_of_val(col_num_instances.as_slice()) + + std::mem::size_of_val(size_bytes); + + let data_size_bytes = compute_columns_size_bytes(columns); + + *size_bytes = control_size_bytes as u64 + data_size_bytes; + + *size_bytes + } +} + +// --- Timeless --- + +impl PersistentIndexedTable { + /// Returns the number of rows stored across this table. 
+ #[inline] + pub fn total_rows(&self) -> u64 { + self.col_num_instances.len() as _ + } + + /// The size of both the control & component data stored in this table, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). #[inline] pub fn total_size_bytes(&self) -> u64 { - self.inner.read().total_size_bytes + let Self { + ent_path, + cluster_key, + col_insert_id, + col_row_id, + col_num_instances, + columns, + } = self; + + let control_size_bytes = std::mem::size_of_val(ent_path) + + std::mem::size_of_val(cluster_key) + + std::mem::size_of_val(col_insert_id.as_slice()) + + std::mem::size_of_val(col_row_id.as_slice()) + + std::mem::size_of_val(col_num_instances.as_slice()); + + let data_size_bytes = compute_columns_size_bytes(columns); + + control_size_bytes as u64 + data_size_bytes } } + +// --- Common --- + +/// Computes the size in bytes of an entire table's worth of arrow data. +#[allow(dead_code)] +fn compute_columns_size_bytes(columns: &IntMap) -> u64 { + crate::profile_function!(); + let keys = (columns.keys().len() * std::mem::size_of::()) as u64; + let cells = columns + .values() + .flat_map(|column| column.iter()) + .flatten() // option + .map(|cell| cell.size_bytes()) + .sum::(); + keys + cells +} + +#[test] +fn compute_table_size_bytes_ignore_headers() { + let columns = Default::default(); + assert_eq!(0, compute_columns_size_bytes(&columns)); +} diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index b85822ca5a1dc..7ad3081ca19f4 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -7,7 +7,7 @@ use smallvec::SmallVec; use re_log::{debug, trace}; use re_log_types::{ component_types::InstanceKey, ComponentName, DataCell, DataCellColumn, DataCellError, DataRow, - DataTable, EntityPath, TimeInt, TimeRange, + DataTable, TimeInt, TimeRange, }; use crate::{ @@ -18,7 +18,6 @@ use crate::{ // TODO(#1619): // - The store should insert column-per-column rather than row-per-row (purely a performance // matter) -// - Bring back size stats // --- Data store --- @@ -245,10 +244,11 @@ impl IndexedTable { let (_, bucket) = self.find_bucket_mut(time); - let len = bucket.total_rows(); + let len = bucket.num_rows(); let len_overflow = len > config.indexed_bucket_num_rows; if len_overflow { + let bucket_size_before = bucket.size_bytes(); if let Some((min, second_half)) = bucket.split() { trace!( kind = "insert", @@ -261,7 +261,12 @@ impl IndexedTable { "splitting off indexed bucket following overflow" ); + let bucket_size_after = bucket.size_bytes(); + self.buckets_size_bytes -= bucket_size_before - bucket_size_after; + + self.buckets_size_bytes += second_half.size_bytes(); self.buckets.insert(min, second_half); + return self.insert_row(config, insert_id, time, generated_cluster_cell, row); } @@ -306,17 +311,25 @@ impl IndexedTable { new_time_bound = timeline.typ().format(new_time_bound.into()), "creating brand new indexed bucket following overflow" ); + + let (inner, inner_size_bytes) = { + let mut inner = IndexedBucketInner { + time_range: TimeRange::new(time, time), + ..Default::default() + }; + let size_bytes = inner.compute_size_bytes(); + (inner, size_bytes) + }; self.buckets.insert( (new_time_bound).into(), IndexedBucket { timeline, cluster_key: self.cluster_key, - inner: RwLock::new(IndexedBucketInner { - time_range: TimeRange::new(time, time), - ..Default::default() - }), + inner: RwLock::new(inner), }, ); + + 
self.buckets_size_bytes += inner_size_bytes; return self.insert_row(config, insert_id, time, generated_cluster_cell, row); } } @@ -341,7 +354,9 @@ impl IndexedTable { "inserted into indexed tables" ); - bucket.insert_row(insert_id, time, generated_cluster_cell, row, &components); + self.buckets_size_bytes += + bucket.insert_row(insert_id, time, generated_cluster_cell, row, &components); + self.buckets_num_rows += 1; // Insert components last, only if bucket-insert succeeded. self.all_components.extend(components); @@ -349,6 +364,7 @@ impl IndexedTable { } impl IndexedBucket { + /// Returns the size in bytes of the inserted arrow data. fn insert_row( &mut self, insert_id: Option, @@ -356,12 +372,13 @@ impl IndexedBucket { generated_cluster_cell: Option, row: &DataRow, components: &IntSet, - ) { + ) -> u64 { crate::profile_function!(); - let num_rows = self.total_rows() as usize; + let mut size_bytes_added = 0u64; + let num_rows = self.num_rows() as usize; - let mut guard = self.inner.write(); + let mut inner = self.inner.write(); let IndexedBucketInner { is_sorted, time_range, @@ -370,25 +387,32 @@ impl IndexedBucket { col_row_id, col_num_instances, columns, - total_size_bytes: _, // TODO(#1619) - } = &mut *guard; + size_bytes, + } = &mut *inner; // append time to primary column and update time range appropriately col_time.push(time.as_i64()); *time_range = TimeRange::new(time_range.min.min(time), time_range.max.max(time)); + size_bytes_added += std::mem::size_of_val(&time.as_i64()) as u64; // update all control columns if let Some(insert_id) = insert_id { col_insert_id.push(insert_id); + size_bytes_added += std::mem::size_of_val(&insert_id) as u64; } col_row_id.push(row.row_id()); + size_bytes_added += std::mem::size_of_val(&row.row_id()) as u64; col_num_instances.push(row.num_instances()); + size_bytes_added += std::mem::size_of_val(&row.num_instances()) as u64; // insert auto-generated cluster cell if present if let Some(cluster_cell) = generated_cluster_cell { - let column = columns - .entry(cluster_cell.component_name()) - .or_insert_with(|| DataCellColumn::empty(num_rows)); + let component = cluster_cell.component_name(); + let column = columns.entry(component).or_insert_with(|| { + size_bytes_added += std::mem::size_of_val(&component) as u64; + DataCellColumn::empty(num_rows) + }); + size_bytes_added += cluster_cell.size_bytes(); column.0.push(Some(cluster_cell)); } @@ -396,9 +420,12 @@ impl IndexedBucket { // 2-way merge, step 1: left-to-right for cell in row.cells().iter() { - let column = columns - .entry(cell.component_name()) - .or_insert_with(|| DataCellColumn::empty(col_time.len().saturating_sub(1))); + let component = cell.component_name(); + let column = columns.entry(component).or_insert_with(|| { + size_bytes_added += std::mem::size_of_val(&component) as u64; + DataCellColumn::empty(col_time.len().saturating_sub(1)) + }); + size_bytes_added += cell.size_bytes(); column.0.push(Some(cell.clone() /* shallow */)); } @@ -419,11 +446,15 @@ impl IndexedBucket { // TODO(#433): re_datastore: properly handle already sorted data during insertion *is_sorted = false; + *size_bytes += size_bytes_added; + #[cfg(debug_assertions)] { - drop(guard); // sanity checking will grab the lock! + drop(inner); self.sanity_check().unwrap(); } + + size_bytes_added } /// Splits the bucket into two, potentially uneven parts. 
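The `insert_row` changes above accumulate a per-row byte delta (control columns measured with `std::mem::size_of_val`, cells with their cached `size_bytes`) and return it to the owning table instead of recomputing whole buckets. A standalone sketch of that bookkeeping, with hypothetical stand-in columns:

struct Bucket {
    col_time: Vec<i64>,
    col_num_instances: Vec<u32>,
    size_bytes: u64,
}

impl Bucket {
    // Returns the number of bytes added by this insertion, mirroring how the
    // patched insert_row reports its delta back to the owning table.
    fn insert_row(&mut self, time: i64, num_instances: u32) -> u64 {
        let mut added = 0u64;

        self.col_time.push(time);
        added += std::mem::size_of_val(&time) as u64;

        self.col_num_instances.push(num_instances);
        added += std::mem::size_of_val(&num_instances) as u64;

        self.size_bytes += added;
        added
    }
}

fn main() {
    let mut bucket = Bucket { col_time: vec![], col_num_instances: vec![], size_bytes: 0 };
    let added = bucket.insert_row(42, 3);
    assert_eq!(added, 12); // 8 bytes for the i64 time + 4 for the u32 count
    assert_eq!(bucket.size_bytes, 12);
}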
@@ -458,8 +489,8 @@ impl IndexedBucket { inner, } = self; - let mut inner = inner.write(); - inner.sort(); + let mut inner1 = inner.write(); + inner1.sort(); let IndexedBucketInner { is_sorted: _, @@ -469,8 +500,8 @@ impl IndexedBucket { col_row_id: col_row_id1, col_num_instances: col_num_instances1, columns: columns1, - total_size_bytes: _, // NOTE: recomputed from scratch for both halves - } = &mut *inner; + size_bytes: _, // NOTE: recomputed below + } = &mut *inner1; if col_time1.len() < 2 { return None; // early exit: can't split the unsplittable @@ -485,8 +516,9 @@ impl IndexedBucket { crate::profile_function!(); let timeline = *timeline; + // Used in debug builds to assert that we've left everything in a sane state. - let _total_rows = col_time1.len(); + let _num_rows = col_time1.len(); fn split_off_column( column: &mut SmallVec<[T; N]>, @@ -541,10 +573,8 @@ impl IndexedBucket { .collect() }; - let bucket2 = Self { - timeline, - cluster_key: self.cluster_key, - inner: RwLock::new(IndexedBucketInner { + let inner2 = { + let mut inner2 = IndexedBucketInner { is_sorted: true, time_range: time_range2, col_time: col_time2, @@ -552,36 +582,47 @@ impl IndexedBucket { col_row_id: col_row_id2, col_num_instances: col_num_instances2, columns: columns2, - total_size_bytes: 0, // TODO(#1619) - }), + size_bytes: 0, // NOTE: computed below + }; + inner2.compute_size_bytes(); + inner2 + }; + let bucket2 = Self { + timeline, + cluster_key: self.cluster_key, + inner: RwLock::new(inner2), }; - // TODO(#1619): bring back size stats - // bucket2.compute_total_size_bytes(); (time_range2.min, bucket2) }; - // TODO(#1619): bring back size stats - // inner.compute_total_size_bytes(); + inner1.compute_size_bytes(); // sanity checks #[cfg(debug_assertions)] { - drop(inner); // sanity checking will grab the lock! + drop(inner1); // sanity checking will grab the lock! self.sanity_check().unwrap(); bucket2.sanity_check().unwrap(); - // TODO(#1619): check size_bytes sum too - let total_rows1 = self.total_rows() as i64; - let total_rows2 = bucket2.total_rows() as i64; - debug_assert!( - _total_rows as i64 == total_rows1 + total_rows2, - "expected both buckets to sum up to the length of the original bucket: \ - got bucket={} vs. bucket1+bucket2={}", - _total_rows, + let total_rows1 = self.num_rows() as i64; + let total_rows2 = bucket2.num_rows() as i64; + debug_assert_eq!( + _num_rows as i64, total_rows1 + total_rows2, + "expected both buckets to sum up to the length of the original bucket" ); - debug_assert_eq!(_total_rows as i64, total_rows1 + total_rows2); + + // TODO(cmc): Cannot work without more effort because of the fixed overhead... fixable, + // but not today. 
+ // + // let total_size_bytes1 = self.size_bytes() as i64; + // let total_size_bytes2 = bucket2.size_bytes() as i64; + // debug_assert_eq!( + // _size_bytes as i64, + // total_size_bytes1 + total_size_bytes2, + // "expected both buckets to sum up to the size in bytes of the original bucket" + // ); } Some((min2, bucket2)) @@ -709,18 +750,6 @@ fn split_time_range_off( // --- Timeless --- impl PersistentIndexedTable { - pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self { - Self { - cluster_key, - ent_path, - col_insert_id: Default::default(), - col_row_id: Default::default(), - col_num_instances: Default::default(), - columns: Default::default(), - total_size_bytes: 0, // TODO(#1619) - } - } - fn insert_row( &mut self, insert_id: Option, @@ -738,19 +767,19 @@ impl PersistentIndexedTable { col_row_id, col_num_instances, columns, - total_size_bytes: _, // TODO(#1619) } = self; let components: IntSet<_> = row.component_names().collect(); - // update all control columns + // --- update all control columns --- + if let Some(insert_id) = insert_id { col_insert_id.push(insert_id); } col_row_id.push(row.row_id()); col_num_instances.push(row.num_instances()); - // append components to their respective columns (2-way merge) + // --- append components to their respective columns (2-way merge) --- // insert auto-generated cluster cell if present if let Some(cluster_cell) = generated_cluster_cell { @@ -771,7 +800,7 @@ impl PersistentIndexedTable { // 2-way merge, step 2: right-to-left // // fill unimpacted secondary indices with null values - for (component, column) in columns { + for (component, column) in columns.iter_mut() { // The cluster key always gets added one way or another, don't try to force fill it! if *component == self.cluster_key { continue; diff --git a/crates/re_arrow_store/tests/internals.rs b/crates/re_arrow_store/tests/internals.rs index 1b776d2619dcf..9233438b72e0a 100644 --- a/crates/re_arrow_store/tests/internals.rs +++ b/crates/re_arrow_store/tests/internals.rs @@ -112,7 +112,7 @@ fn pathological_bucket_topology() { { let num_buckets = store_forward .iter_indices() - .flat_map(|(_, table)| table.iter_buckets()) + .flat_map(|(_, table)| table.buckets.values()) .count(); assert_eq!( 7usize, @@ -127,7 +127,7 @@ fn pathological_bucket_topology() { { let num_buckets = store_backward .iter_indices() - .flat_map(|(_, table)| table.iter_buckets()) + .flat_map(|(_, table)| table.buckets.values()) .count(); assert_eq!( 8usize, diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 105d7e1a00145..8cc4c8d27c217 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -57,7 +57,7 @@ fn main() { fn log_messages() { use re_log_types::{ datagen::{build_frame_nr, build_some_point2d}, - ArrowMsg, LogMsg, TimeInt, TimePoint, Timeline, + LogMsg, TimeInt, TimePoint, Timeline, }; // Note: we use Box in this function so that we also count the "static" @@ -116,7 +116,7 @@ fn log_messages() { .into_table(), ); let table_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap())); + let log_msg = Box::new(LogMsg::ArrowMsg(table.as_arrow_msg().unwrap())); let log_msg_bytes = live_bytes() - used_bytes_start; println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); @@ -139,7 +139,7 @@ fn log_messages() { .into_table(), ); let table_bytes = live_bytes() - 
used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap())); + let log_msg = Box::new(LogMsg::ArrowMsg(table.as_arrow_msg().unwrap())); let log_msg_bytes = live_bytes() - used_bytes_start; println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index d8812b56f0a9f..8732ac4ae7893 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -53,7 +53,8 @@ impl EntityDb { } fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { - let table: DataTable = msg.try_into()?; + // TODO: pass sizes over the wire + let table = DataTable::from_arrow_msg(msg, true)?; // TODO(#1619): batch all of this for row in table.as_rows() { diff --git a/crates/re_log_types/benches/msg_encode_benchmark.rs b/crates/re_log_types/benches/msg_encode_benchmark.rs index d9131ef9f9f90..306c3e10eb11d 100644 --- a/crates/re_log_types/benches/msg_encode_benchmark.rs +++ b/crates/re_log_types/benches/msg_encode_benchmark.rs @@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use re_log_types::{ datagen::{build_frame_nr, build_some_colors, build_some_point2d}, - entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, + entity_path, DataRow, DataTable, Index, LogMsg, MsgId, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -45,7 +45,7 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec { fn generate_messages(tables: &[DataTable]) -> Vec { tables .iter() - .map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap())) + .map(|table| LogMsg::ArrowMsg(table.as_arrow_msg().unwrap())) .collect() } @@ -54,7 +54,7 @@ fn decode_tables(messages: &[LogMsg]) -> Vec { .iter() .map(|log_msg| { if let LogMsg::ArrowMsg(arrow_msg) = log_msg { - DataTable::try_from(arrow_msg).unwrap() + DataTable::from_arrow_msg(arrow_msg, false).unwrap() } else { unreachable!() } diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 81f48c032057f..15df060d9a2e5 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -140,10 +140,10 @@ mod tests { ); let table_in = row.into_table(); - let msg_in: ArrowMsg = (&table_in).try_into().unwrap(); + let msg_in = table_in.as_arrow_msg().unwrap(); let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); - let table_out: DataTable = (&msg_out).try_into().unwrap(); + let table_out = DataTable::from_arrow_msg(&msg_out, true).unwrap(); assert_eq!(msg_in, msg_out); assert_eq!(table_in, table_out); diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index 4a198ef365f8f..a1fb3098d8e04 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -114,6 +114,12 @@ pub struct DataCellInner { // TODO(#1696): Store this within the datatype itself. pub(crate) name: ComponentName, + /// The size in bytes of the underlying arrow data. + /// + /// This is always zero unless [`Self::compute_size_bytes`] has been called, which is a very + /// costly operation. 
+ pub(crate) values_size_bytes: u64, + /// A uniformly typed list of values for the given component type: `[C, C, C, ...]` /// /// Includes the data, its schema and probably soon the component metadata @@ -223,7 +229,11 @@ impl DataCell { values: Box, ) -> DataCellResult { Ok(Self { - inner: Arc::new(DataCellInner { name, values }), + inner: Arc::new(DataCellInner { + name, + values_size_bytes: 0, + values, + }), }) } @@ -256,11 +266,16 @@ impl DataCell { datatype: arrow2::datatypes::DataType, ) -> DataCellResult { // TODO(cmc): check that it is indeed a component datatype + + let mut inner = DataCellInner { + name, + values_size_bytes: 0, + values: arrow2::array::new_empty_array(datatype), + }; + inner.compute_size_bytes(); + Ok(Self { - inner: Arc::new(DataCellInner { - name, - values: arrow2::array::new_empty_array(datatype), - }), + inner: Arc::new(inner), }) } @@ -426,6 +441,28 @@ impl DataCell { _ => Err(DataCellError::UnsupportedDatatype(arr.data_type().clone())), } } + + /// Returns the total (heap) allocated size of the cell in bytes, provided that + /// [`Self::compute_size_bytes`] has been called first. + /// + /// This is an approximation, accurate enough for most purposes (stats, GC trigger, ...). + /// + /// This is `O(1)`, the value is computed and cached by calling [`Self::compute_size_bytes`]. + #[inline] + pub fn size_bytes(&self) -> u64 { + let Self { inner } = self; + let DataCellInner { + name, + values_size_bytes, + values, + } = &**inner; + + (std::mem::size_of_val(inner) + + std::mem::size_of_val(name) + + std::mem::size_of_val(values_size_bytes) + + std::mem::size_of_val(values)) as u64 + + *values_size_bytes + } } // --- @@ -457,6 +494,10 @@ impl From<&Vec> for DataCell { impl std::fmt::Display for DataCell { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!( + "DataCell({})", + re_format::format_bytes(self.size_bytes() as _) + ))?; re_format::arrow::format_table( // NOTE: wrap in a ListArray so that it looks more cell-like (i.e. single row) [&*self.as_arrow_monolist()], @@ -469,15 +510,34 @@ impl std::fmt::Display for DataCell { // --- impl DataCell { - /// Returns the total (heap) allocated size of the array in bytes. + /// Compute and cache the total (heap) allocated size of the underlying arrow array in bytes. + /// This does nothing if the size has already been computed and cached before. /// - /// Beware: this is costly! Cache the returned value as much as possible. - pub fn size_bytes(&self) -> u64 { - let DataCellInner { name, values } = &*self.inner; + /// The caller must the sole owner of this cell, as this requires mutating an `Arc` under the + /// hood. Returns false otherwise. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_size_bytes(&mut self) -> bool { + if let Some(inner) = Arc::get_mut(&mut self.inner) { + inner.compute_size_bytes(); + return true; + } + false + } +} - std::mem::size_of_val(name) as u64 + - // Warning: this is surprisingly costly! - arrow2::compute::aggregate::estimated_bytes_size(&**values) as u64 +impl DataCellInner { + /// Compute and cache the total (heap) allocated size of the underlying arrow array in bytes. + /// This does nothing if the size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! 
+ #[inline] + pub fn compute_size_bytes(&mut self) { + if self.values_size_bytes == 0 { + self.values_size_bytes = + arrow2::compute::aggregate::estimated_bytes_size(&*self.values) as u64; + } } } diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs index 3122ad478a539..04304ece662ee 100644 --- a/crates/re_log_types/src/data_row.rs +++ b/crates/re_log_types/src/data_row.rs @@ -326,6 +326,18 @@ impl DataRow { .map(|cell| cell.component_name()) .position(|name| name == *component) } + + /// Compute and cache the total (heap) allocated size of each individual underlying + /// [`DataCell`]. + /// This does nothing for cells whose size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_all_size_bytes(&mut self) { + for cell in &mut self.cells.0 { + cell.compute_size_bytes(); + } + } } // --- diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 7ac6dbd3dbc37..67c2f3b69ef88 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -97,6 +97,18 @@ impl DataCellColumn { pub fn empty(num_rows: usize) -> Self { Self(smallvec::smallvec![None; num_rows]) } + + /// Compute and cache the total (heap) allocated size of each individual underlying + /// [`DataCell`]. + /// This does nothing for cells whose size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_all_size_bytes(&mut self) { + for cell in &mut self.0 { + cell.as_mut().map(|cell| cell.compute_size_bytes()); + } + } } // --- @@ -433,6 +445,18 @@ impl DataTable { .iter() .fold(TimePoint::timeless(), |acc, tp| acc.union_max(tp)) } + + /// Compute and cache the total (heap) allocated size of each individual underlying + /// [`DataCell`]. + /// This does nothing for cells whose size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_all_size_bytes(&mut self) { + for column in self.columns.values_mut() { + column.compute_all_size_bytes(); + } + } } // --- Serialization --- @@ -699,10 +723,15 @@ impl DataTable { impl DataTable { /// Deserializes an entire table from an arrow payload and schema. + /// + /// If `compute_cell_sizes` is enabled, this will estimate the size of the arrow data within + /// each cell and cache it there. + /// See [`DataCellInner::compute_size_bytes`] for more information. pub fn deserialize( table_id: TableId, schema: &Schema, chunk: &Chunk>, + compute_cell_sizes: bool, ) -> DataTableResult { crate::profile_function!(); @@ -749,7 +778,7 @@ impl DataTable { .get(index) .ok_or(DataTableError::MissingColumn(name.to_owned())) .and_then(|column| { - Self::deserialize_data_column(component, &**column) + Self::deserialize_data_column(component, &**column, compute_cell_sizes) .map(|data| (component, data)) }) }) @@ -767,9 +796,14 @@ impl DataTable { } /// Deserializes a sparse data column. + /// + /// If `compute_cell_sizes` is enabled, this will estimate the size of the arrow data within + /// each cell and cache it there. + /// See [`DataCellInner::compute_size_bytes`] for more information. fn deserialize_data_column( component: ComponentName, column: &dyn Array, + compute_cell_sizes: bool, ) -> DataTableResult { Ok(DataCellColumn( column @@ -777,7 +811,15 @@ impl DataTable { .downcast_ref::>() .ok_or(DataTableError::NotAColumn(component.to_string()))? 
.iter() - .map(|array| array.map(|values| DataCell::from_arrow(component, values))) + .map(|array| { + array.map(|values| { + let mut cell = DataCell::from_arrow(component, values); + if compute_cell_sizes { + cell.compute_size_bytes(); // WARN: costly! + } + cell + }) + }) .collect(), )) } @@ -785,11 +827,14 @@ impl DataTable { // --- -impl TryFrom<&ArrowMsg> for DataTable { - type Error = DataTableError; - +impl DataTable { + /// Deserializes the contents of an [`ArrowMsg`] into a `DataTable`. + /// + /// If `compute_cell_sizes` is enabled, this will estimate the size of the arrow data within + /// each cell and cache it there. + /// See [`DataCellInner::compute_size_bytes`] for more information. #[inline] - fn try_from(msg: &ArrowMsg) -> DataTableResult { + pub fn from_arrow_msg(msg: &ArrowMsg, compute_cell_sizes: bool) -> DataTableResult { let ArrowMsg { table_id, timepoint_max: _, @@ -797,20 +842,19 @@ impl TryFrom<&ArrowMsg> for DataTable { chunk, } = msg; - Self::deserialize(*table_id, schema, chunk) + Self::deserialize(*table_id, schema, chunk, compute_cell_sizes) } -} - -impl TryFrom<&DataTable> for ArrowMsg { - type Error = DataTableError; + /// Serializes the contents of a `DataTable` into an [`ArrowMsg`]. + // + // TODO: support serializing the cell size itself, so it can be computed on the clients. #[inline] - fn try_from(table: &DataTable) -> DataTableResult { - let timepoint_max = table.timepoint_max(); - let (schema, chunk) = table.serialize()?; + pub fn as_arrow_msg(&self) -> DataTableResult { + let timepoint_max = self.timepoint_max(); + let (schema, chunk) = self.serialize()?; Ok(ArrowMsg { - table_id: table.table_id, + table_id: self.table_id, timepoint_max, schema, chunk, @@ -895,6 +939,9 @@ impl DataTable { ) }; - DataTable::from_rows(table_id, [row0, row1, row2]) + let mut table = DataTable::from_rows(table_id, [row0, row1, row2]); + table.compute_all_size_bytes(); + + table } } diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index 790a06e71bd41..b11f98f755f70 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -243,15 +243,17 @@ impl MsgSender { let [row_standard, row_transforms, row_splats] = self.into_rows(); if let Some(row_transforms) = row_transforms { - sink.send(LogMsg::ArrowMsg((&row_transforms.into_table()).try_into()?)); + sink.send(LogMsg::ArrowMsg( + row_transforms.into_table().as_arrow_msg()?, + )); } if let Some(row_splats) = row_splats { - sink.send(LogMsg::ArrowMsg((&row_splats.into_table()).try_into()?)); + sink.send(LogMsg::ArrowMsg(row_splats.into_table().as_arrow_msg()?)); } // Always the primary component last so range-based queries will include the other data. // Since the primary component can't be splatted it must be in msg_standard, see(#1215). 
if let Some(row_standard) = row_standard { - sink.send(LogMsg::ArrowMsg((&row_standard.into_table()).try_into()?)); + sink.send(LogMsg::ArrowMsg(row_standard.into_table().as_arrow_msg()?)); } Ok(()) diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index b536284f6eebe..926889f728ac2 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -101,7 +101,7 @@ impl DataUi for ArrowMsg { verbosity: UiVerbosity, query: &re_arrow_store::LatestAtQuery, ) { - let table: DataTable = match self.try_into() { + let table = match DataTable::from_arrow_msg(self, false) { Ok(table) => table, Err(err) => { ui.label( diff --git a/crates/re_viewer/src/ui/event_log_view.rs b/crates/re_viewer/src/ui/event_log_view.rs index 94c96edddfe8a..fd937fd7f2d8d 100644 --- a/crates/re_viewer/src/ui/event_log_view.rs +++ b/crates/re_viewer/src/ui/event_log_view.rs @@ -176,7 +176,7 @@ fn table_row( // NOTE: This really only makes sense because we don't yet have batches with more than a // single row at the moment... and by the time we do, the event log view will have // disappeared entirely. - LogMsg::ArrowMsg(msg) => match DataTable::try_from(msg) { + LogMsg::ArrowMsg(msg) => match DataTable::from_arrow_msg(msg, false) { Ok(table) => { for datarow in table.as_rows() { row.col(|ui| { diff --git a/crates/re_viewer/src/ui/memory_panel.rs b/crates/re_viewer/src/ui/memory_panel.rs index bff514d5f7b84..1a390ed267a86 100644 --- a/crates/re_viewer/src/ui/memory_panel.rs +++ b/crates/re_viewer/src/ui/memory_panel.rs @@ -26,7 +26,7 @@ impl MemoryPanel { (gpu_resource_stats.total_buffer_size_in_bytes + gpu_resource_stats.total_texture_size_in_bytes) as _, ), - Some(store_stats.total_index_size_bytes as _), + Some(store_stats.total_size_bytes as _), ); } @@ -186,7 +186,6 @@ impl MemoryPanel { ui.label(egui::RichText::new("Limits").italics()); ui.label("Row limit"); - ui.label("Size limit"); ui.end_row(); let label_rows = |ui: &mut egui::Ui, num_rows| { @@ -197,7 +196,11 @@ impl MemoryPanel { } }; - ui.label("Indices:"); + ui.label("Timeless:"); + label_rows(ui, u64::MAX); + ui.end_row(); + + ui.label("Temporal:"); label_rows(ui, config.indexed_bucket_num_rows); ui.end_row(); }); @@ -208,13 +211,13 @@ impl MemoryPanel { .num_columns(3) .show(ui, |ui| { let DataStoreStats { - total_timeless_index_rows, - total_timeless_index_size_bytes, - total_temporal_index_rows, - total_temporal_index_size_bytes, - total_temporal_index_buckets, - total_index_rows, - total_index_size_bytes, + total_timeless_rows, + total_timeless_size_bytes, + total_temporal_rows, + total_temporal_size_bytes, + total_temporal_buckets, + total_rows, + total_size_bytes, config: _, } = *store_stats; @@ -232,22 +235,22 @@ impl MemoryPanel { let label_size = |ui: &mut egui::Ui, size| ui.label(re_format::format_bytes(size as _)); - ui.label("Indices (timeless):"); + ui.label("Timeless:"); ui.label(""); - label_rows(ui, total_timeless_index_rows); - label_size(ui, total_timeless_index_size_bytes); + label_rows(ui, total_timeless_rows); + label_size(ui, total_timeless_size_bytes); ui.end_row(); - ui.label("Indices (temporal):"); - label_buckets(ui, total_temporal_index_buckets); - label_rows(ui, total_temporal_index_rows); - label_size(ui, total_temporal_index_size_bytes); + ui.label("Temporal:"); + label_buckets(ui, total_temporal_buckets); + label_rows(ui, total_temporal_rows); + label_size(ui, total_temporal_size_bytes); ui.end_row(); - ui.label("Indices (total):"); - 
label_buckets(ui, total_temporal_index_buckets); - label_rows(ui, total_index_rows); - label_size(ui, total_index_size_bytes); + ui.label("Total"); + label_buckets(ui, total_temporal_buckets); + label_rows(ui, total_rows); + label_size(ui, total_size_bytes); ui.end_row(); }); } diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 8e75cc60bc4d1..3edd4bb297e14 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -112,8 +112,9 @@ pub fn build_chunk_from_components( cells, ); - let msg = (&row.into_table()) - .try_into() + let msg = row + .into_table() + .as_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; Ok(LogMsg::ArrowMsg(msg)) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 0437757f6b3fa..9dd1f1ba3b77b 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -472,8 +472,9 @@ fn log_transform( [transform].as_slice(), ); - let msg = (&row.into_table()) - .try_into() + let msg = row + .into_table() + .as_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -556,8 +557,9 @@ fn log_view_coordinates( [coordinates].as_slice(), ); - let msg = (&row.into_table()) - .try_into() + let msg = row + .into_table() + .as_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -690,8 +692,9 @@ fn log_meshes( meshes, ); - let msg = (&row.into_table()) - .try_into() + let msg = row + .into_table() + .as_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -771,8 +774,9 @@ fn log_mesh_file( [mesh3d].as_slice(), ); - let msg = (&row.into_table()) - .try_into() + let msg = row + .into_table() + .as_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -863,8 +867,9 @@ fn log_image_file( [tensor].as_slice(), ); - let msg = (&row.into_table()) - .try_into() + let msg = row + .into_table() + .as_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg)); @@ -942,8 +947,9 @@ fn log_annotation_context( [annotation_context].as_slice(), ); - let msg = (&row.into_table()) - .try_into() + let msg = row + .into_table() + .as_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send(LogMsg::ArrowMsg(msg));
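For reference, the cell-size caching introduced in `DataCell` leans on arrow2's `estimated_bytes_size`, paid at most once per cell. A minimal sketch of that caching, assuming the arrow2 crate (with its compute feature) as already used by this patch; the `Cell` type here is a hypothetical stand-in for `DataCellInner`:

use arrow2::array::{Array, Int64Array};
use arrow2::compute::aggregate::estimated_bytes_size;

// Stand-in for DataCellInner: the estimate is cached so the costly call
// happens at most once per cell.
struct Cell {
    values: Box<dyn Array>,
    values_size_bytes: u64, // 0 until compute_size_bytes has run
}

impl Cell {
    fn compute_size_bytes(&mut self) {
        if self.values_size_bytes == 0 {
            self.values_size_bytes = estimated_bytes_size(&*self.values) as u64;
        }
    }
}

fn main() {
    let mut cell = Cell {
        values: Box::new(Int64Array::from_vec(vec![1, 2, 3])),
        values_size_bytes: 0,
    };
    cell.compute_size_bytes();
    assert!(cell.values_size_bytes > 0);
}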