From f15a597b653f8a7f8efb28045bbfc9c1dab7a795 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 12 Apr 2023 17:27:03 +0200 Subject: [PATCH] Datastore revamp 3: efficient incremental stats --- crates/re_arrow_store/benches/arrow2.rs | 2 +- .../re_arrow_store/benches/arrow2_convert.rs | 2 +- crates/re_arrow_store/src/store.rs | 68 ++--- crates/re_arrow_store/src/store_arrow.rs | 3 +- crates/re_arrow_store/src/store_format.rs | 17 +- crates/re_arrow_store/src/store_polars.rs | 3 +- crates/re_arrow_store/src/store_read.rs | 10 +- crates/re_arrow_store/src/store_sanity.rs | 210 ++++++++------ crates/re_arrow_store/src/store_stats.rs | 265 ++++++++++++++---- crates/re_arrow_store/src/store_write.rs | 140 +++++---- crates/re_arrow_store/tests/data_store.rs | 6 +- crates/re_arrow_store/tests/internals.rs | 4 +- crates/re_data_store/examples/memory_usage.rs | 6 +- crates/re_data_store/src/entity_properties.rs | 2 +- crates/re_data_store/src/log_db.rs | 8 +- .../benches/msg_encode_benchmark.rs | 6 +- crates/re_log_types/src/arrow_msg.rs | 14 +- crates/re_log_types/src/data_cell.rs | 155 ++++++++-- crates/re_log_types/src/data_row.rs | 14 +- crates/re_log_types/src/data_table.rs | 62 +++- crates/re_query/src/query.rs | 4 +- crates/re_query/src/range.rs | 4 +- crates/re_sdk/src/msg_sender.rs | 6 +- crates/re_viewer/src/ui/data_ui/log_msg.rs | 4 +- crates/re_viewer/src/ui/event_log_view.rs | 4 +- crates/re_viewer/src/ui/memory_panel.rs | 45 +-- rerun_py/src/python_bridge.rs | 52 +--- rerun_py/src/python_session.rs | 17 +- 28 files changed, 755 insertions(+), 378 deletions(-) diff --git a/crates/re_arrow_store/benches/arrow2.rs b/crates/re_arrow_store/benches/arrow2.rs index a74d875c9945..77f682c57dc0 100644 --- a/crates/re_arrow_store/benches/arrow2.rs +++ b/crates/re_arrow_store/benches/arrow2.rs @@ -241,7 +241,7 @@ fn estimated_size_bytes(c: &mut Criterion) { { let cells = generate_cells(kind); - let arrays = cells.iter().map(|cell| cell.as_arrow()).collect_vec(); + let 
arrays = cells.iter().map(|cell| cell.to_arrow()).collect_vec(); let total_instances = arrays.iter().map(|array| array.len() as u32).sum::(); assert_eq!(total_instances, (NUM_ROWS * NUM_INSTANCES) as u32); diff --git a/crates/re_arrow_store/benches/arrow2_convert.rs b/crates/re_arrow_store/benches/arrow2_convert.rs index db96a115c257..e0d4ce95abdc 100644 --- a/crates/re_arrow_store/benches/arrow2_convert.rs +++ b/crates/re_arrow_store/benches/arrow2_convert.rs @@ -95,7 +95,7 @@ fn deserialize(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(NUM_INSTANCES as _)); let cell = DataCell::from_component::(0..NUM_INSTANCES as u64); - let data = cell.as_arrow(); + let data = cell.to_arrow(); { group.bench_function("arrow2_convert", |b| { diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index c8824e401671..0b45171cf54f 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -262,7 +262,7 @@ fn datastore_internal_repr() { }, ); - let timeless = DataTable::example(false); + let timeless = DataTable::example(true); eprintln!("{timeless}"); store.insert_table(&timeless).unwrap(); @@ -317,38 +317,31 @@ pub struct IndexedTable { /// to free up space. pub all_components: IntSet, - /// The total number of rows in this indexed table, accounting for all buckets. - pub total_rows: u64, + /// The number of rows stored in this table, across all of its buckets. + pub buckets_num_rows: u64, - /// The size of this table in bytes across all of its buckets, accounting for both data and - /// metadata. + /// The size of both the control & component data stored in this table, across all of its + /// buckets, in bytes. /// - /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we - /// cache this. - /// Also: there are many buckets. - pub total_size_bytes: u64, + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). 
+ pub buckets_size_bytes: u64, } impl IndexedTable { pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self { + let bucket = IndexedBucket::new(cluster_key, timeline); + let buckets_size_bytes = bucket.size_bytes(); Self { timeline, ent_path, - buckets: [(i64::MIN.into(), IndexedBucket::new(cluster_key, timeline))].into(), + buckets: [(i64::MIN.into(), bucket)].into(), cluster_key, all_components: Default::default(), - total_rows: 0, - total_size_bytes: 0, // TODO(#1619) + buckets_num_rows: 0, + buckets_size_bytes, } } - - /// Returns a read-only iterator over the raw buckets. - /// - /// Do _not_ use this to try and test the internal state of the datastore. - #[doc(hidden)] - pub fn iter_buckets(&self) -> impl ExactSizeIterator { - self.buckets.values() - } } /// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`] @@ -414,16 +407,18 @@ pub struct IndexedBucketInner { /// (i.e. the table is sparse). pub columns: IntMap, - /// The size of this bucket in bytes, accounting for both data and metadata. + /// The size of both the control & component data stored in this bucket, in bytes. /// - /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we - /// cache this. - pub total_size_bytes: u64, + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). + /// + /// We cache this because there can be many, many buckets. 
+ pub size_bytes: u64, } impl Default for IndexedBucketInner { fn default() -> Self { - Self { + let mut this = Self { is_sorted: true, time_range: TimeRange::new(i64::MAX.into(), i64::MIN.into()), col_time: Default::default(), @@ -431,8 +426,10 @@ impl Default for IndexedBucketInner { col_row_id: Default::default(), col_num_instances: Default::default(), columns: Default::default(), - total_size_bytes: 0, // TODO(#1619) - } + size_bytes: 0, // NOTE: computed below + }; + this.compute_size_bytes(); + this } } @@ -476,15 +473,20 @@ pub struct PersistentIndexedTable { /// The cells are optional since not all rows will have data for every single component /// (i.e. the table is sparse). pub columns: IntMap, - - /// The size of this indexed table in bytes, accounting for both data and metadata. - /// - /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we - /// cache this. - pub total_size_bytes: u64, } impl PersistentIndexedTable { + pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self { + Self { + cluster_key, + ent_path, + col_insert_id: Default::default(), + col_row_id: Default::default(), + col_num_instances: Default::default(), + columns: Default::default(), + } + } + pub fn is_empty(&self) -> bool { self.col_num_instances.is_empty() } diff --git a/crates/re_arrow_store/src/store_arrow.rs b/crates/re_arrow_store/src/store_arrow.rs index f9016ecde50e..b7db9157855e 100644 --- a/crates/re_arrow_store/src/store_arrow.rs +++ b/crates/re_arrow_store/src/store_arrow.rs @@ -38,7 +38,7 @@ impl IndexedBucket { col_row_id, col_num_instances, columns, - total_size_bytes: _, + size_bytes: _, } = &*inner.read(); serialize( @@ -72,7 +72,6 @@ impl PersistentIndexedTable { col_row_id, col_num_instances, columns, - total_size_bytes: _, } = self; serialize( diff --git a/crates/re_arrow_store/src/store_format.rs b/crates/re_arrow_store/src/store_format.rs index 9974f725c228..975f2f81dd23 100644 --- 
a/crates/re_arrow_store/src/store_format.rs +++ b/crates/re_arrow_store/src/store_format.rs @@ -34,8 +34,8 @@ impl std::fmt::Display for DataStore { format!( "{} timeless indexed tables, for a total of {} across {} total rows\n", timeless_tables.len(), - format_bytes(self.total_timeless_index_size_bytes() as _), - format_number(self.total_timeless_index_rows() as _) + format_bytes(self.total_timeless_size_bytes() as _), + format_number(self.total_timeless_rows() as _) ), ))?; f.write_str(&indent::indent_all_by(4, "timeless_tables: [\n"))?; @@ -53,8 +53,8 @@ impl std::fmt::Display for DataStore { format!( "{} indexed tables, for a total of {} across {} total rows\n", tables.len(), - format_bytes(self.total_temporal_index_size_bytes() as _), - format_number(self.total_temporal_index_rows() as _) + format_bytes(self.total_temporal_size_bytes() as _), + format_number(self.total_temporal_rows() as _) ), ))?; f.write_str(&indent::indent_all_by(4, "tables: [\n"))?; @@ -83,8 +83,8 @@ impl std::fmt::Display for IndexedTable { buckets, cluster_key: _, all_components: _, - total_rows: _, - total_size_bytes: _, + buckets_num_rows: _, + buckets_size_bytes: _, } = self; f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?; @@ -116,8 +116,8 @@ impl std::fmt::Display for IndexedBucket { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( "size: {} across {} rows\n", - format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), + format_bytes(self.size_bytes() as _), + format_number(self.num_rows() as _), ))?; let time_range = { @@ -156,7 +156,6 @@ impl std::fmt::Display for PersistentIndexedTable { col_row_id: _, col_num_instances: _, columns: _, - total_size_bytes: _, } = self; f.write_fmt(format_args!("entity: {ent_path}\n"))?; diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 494b320aad55..6ef71c714377 100644 --- 
a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -176,7 +176,6 @@ impl PersistentIndexedTable { col_row_id, col_num_instances, columns, - total_size_bytes: _, } = self; let num_rows = self.total_rows() as usize; @@ -217,7 +216,7 @@ impl IndexedBucket { col_row_id, col_num_instances, columns, - total_size_bytes: _, + size_bytes: _, } = &*self.inner.read(); let (_, times) = DataTable::serialize_primitive_column( diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index aa3038b5885b..4eea9ed463e7 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -173,7 +173,7 @@ impl DataStore { /// .map(|cell| { /// Series::try_from(( /// cell.component_name().as_str(), - /// cell.as_arrow(), + /// cell.to_arrow(), /// )) /// }) /// .collect(); @@ -332,7 +332,7 @@ impl DataStore { /// # .map(|cell| { /// # Series::try_from(( /// # cell.component_name().as_str(), - /// # cell.as_arrow(), + /// # cell.to_arrow(), /// # )) /// # }) /// # .collect(); @@ -672,7 +672,7 @@ impl IndexedBucket { col_row_id: _, col_num_instances: _, columns, - total_size_bytes: _, // TODO(#1619) + size_bytes: _, } = &*self.inner.read(); debug_assert!(is_sorted); @@ -787,7 +787,7 @@ impl IndexedBucket { col_row_id, col_num_instances: _, columns, - total_size_bytes: _, // TODO(#1619) + size_bytes: _, } = &*self.inner.read(); debug_assert!(is_sorted); @@ -899,7 +899,7 @@ impl IndexedBucketInner { col_row_id, col_num_instances, columns, - total_size_bytes: _, + size_bytes: _, } = self; if *is_sorted { diff --git a/crates/re_arrow_store/src/store_sanity.rs b/crates/re_arrow_store/src/store_sanity.rs index 7baa52de5230..0bf32c720cab 100644 --- a/crates/re_arrow_store/src/store_sanity.rs +++ b/crates/re_arrow_store/src/store_sanity.rs @@ -11,6 +11,20 @@ use crate::{DataStore, IndexedBucket, IndexedBucketInner, IndexedTable, Persiste /// These violations can only stem from a bug 
in the store's implementation itself. #[derive(thiserror::Error, Debug)] pub enum SanityError { + #[error("Reported size for {origin} is out of sync: got {got}, expected {expected}")] + SizeOutOfSync { + origin: &'static str, + expected: String, + got: String, + }, + + #[error("Reported number of rows for {origin} is out of sync: got {got}, expected {expected}")] + RowsOutOfSync { + origin: &'static str, + expected: String, + got: String, + }, + #[error("Column '{component}' has too few/many rows: got {got} instead of {expected}")] ColumnLengthMismatch { component: ComponentName, @@ -56,77 +70,7 @@ impl DataStore { } } -// --- Persistent Indices --- - -impl PersistentIndexedTable { - /// Runs the sanity check suite for the entire table. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> SanityResult<()> { - crate::profile_function!(); - - let Self { - ent_path: _, - cluster_key, - col_insert_id, - col_row_id, - col_num_instances, - columns, - total_size_bytes: _, // TODO(#1619) - } = self; - - // All columns should be `Self::num_rows` long. - { - let num_rows = self.total_rows(); - - let column_lengths = [ - (!col_insert_id.is_empty()) - .then(|| (DataStore::insert_id_key(), col_insert_id.len())), // - Some((COLUMN_ROW_ID.into(), col_row_id.len())), - Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())), - ] - .into_iter() - .flatten() - .chain( - columns - .iter() - .map(|(component, column)| (*component, column.len())), - ) - .map(|(component, len)| (component, len as u64)); - - for (component, len) in column_lengths { - if len != num_rows { - return Err(SanityError::ColumnLengthMismatch { - component, - expected: num_rows, - got: len, - }); - } - } - } - - // The cluster column must be fully dense. 
- { - let cluster_column = - columns - .get(cluster_key) - .ok_or(SanityError::ClusterColumnMissing { - cluster_key: *cluster_key, - })?; - if !cluster_column.iter().all(|cell| cell.is_some()) { - return Err(SanityError::ClusterColumnSparse { - cluster_column: cluster_column.clone().into(), - }); - } - } - - // TODO(#1619): recomputing shouldnt change the size - - Ok(()) - } -} - -// --- Indices --- +// --- Temporal --- impl IndexedTable { /// Runs the sanity check suite for the entire table. @@ -155,6 +99,32 @@ impl IndexedTable { } } + // Make sure row numbers aren't out of sync + { + let total_rows = self.total_rows(); + let total_rows_uncached = self.total_rows_uncached(); + if total_rows != total_rows_uncached { + return Err(SanityError::RowsOutOfSync { + origin: std::any::type_name::(), + expected: re_format::format_number(total_rows_uncached as _), + got: re_format::format_number(total_rows as _), + }); + } + } + + // Make sure size values aren't out of sync + { + let total_size_bytes = self.total_size_bytes(); + let total_size_bytes_uncached = self.total_size_bytes_uncached(); + if total_size_bytes != total_size_bytes_uncached { + return Err(SanityError::SizeOutOfSync { + origin: std::any::type_name::(), + expected: re_format::format_bytes(total_size_bytes_uncached as _), + got: re_format::format_bytes(total_size_bytes as _), + }); + } + } + // Run individual bucket sanity check suites too. for bucket in self.buckets.values() { bucket.sanity_check()?; @@ -177,16 +147,99 @@ impl IndexedBucket { inner, } = self; - let IndexedBucketInner { - is_sorted: _, - time_range: _, - col_time, + { + let IndexedBucketInner { + is_sorted: _, + time_range: _, + col_time, + col_insert_id, + col_row_id, + col_num_instances, + columns, + size_bytes: _, + } = &*inner.read(); + + // All columns should be `Self::num_rows` long. 
+ { + let num_rows = self.num_rows(); + + let column_lengths = [ + (!col_insert_id.is_empty()) + .then(|| (DataStore::insert_id_key(), col_insert_id.len())), // + Some((COLUMN_TIMEPOINT.into(), col_time.len())), + Some((COLUMN_ROW_ID.into(), col_row_id.len())), + Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())), + ] + .into_iter() + .flatten() + .chain( + columns + .iter() + .map(|(component, column)| (*component, column.len())), + ) + .map(|(component, len)| (component, len as u64)); + + for (component, len) in column_lengths { + if len != num_rows { + return Err(SanityError::ColumnLengthMismatch { + component, + expected: num_rows, + got: len, + }); + } + } + } + + // The cluster column must be fully dense. + { + let cluster_column = + columns + .get(cluster_key) + .ok_or(SanityError::ClusterColumnMissing { + cluster_key: *cluster_key, + })?; + if !cluster_column.iter().all(|cell| cell.is_some()) { + return Err(SanityError::ClusterColumnSparse { + cluster_column: cluster_column.clone().into(), + }); + } + } + } + + // Make sure size values aren't out of sync + { + let size_bytes = inner.read().size_bytes; + let size_bytes_uncached = inner.write().compute_size_bytes(); + if size_bytes != size_bytes_uncached { + return Err(SanityError::SizeOutOfSync { + origin: std::any::type_name::(), + expected: re_format::format_bytes(size_bytes_uncached as _), + got: re_format::format_bytes(size_bytes as _), + }); + } + } + + Ok(()) + } +} + +// --- Timeless --- + +impl PersistentIndexedTable { + /// Runs the sanity check suite for the entire table. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> SanityResult<()> { + crate::profile_function!(); + + let Self { + ent_path: _, + cluster_key, col_insert_id, col_row_id, col_num_instances, columns, - total_size_bytes: _, // TODO(#1619) - } = &*inner.read(); + } = self; // All columns should be `Self::num_rows` long. 
{ @@ -195,7 +248,6 @@ impl IndexedBucket { let column_lengths = [ (!col_insert_id.is_empty()) .then(|| (DataStore::insert_id_key(), col_insert_id.len())), // - Some((COLUMN_TIMEPOINT.into(), col_time.len())), Some((COLUMN_ROW_ID.into(), col_row_id.len())), Some((COLUMN_NUM_INSTANCES.into(), col_num_instances.len())), ] @@ -234,8 +286,6 @@ impl IndexedBucket { } } - // TODO(#1619): recomputing shouldnt change the size - Ok(()) } } diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index fb836133d5da..58d738091269 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -1,19 +1,24 @@ -use crate::{DataStore, DataStoreConfig, IndexedBucket, IndexedTable, PersistentIndexedTable}; +use nohash_hasher::IntMap; +use re_log_types::{ComponentName, DataCellColumn}; + +use crate::{ + store::IndexedBucketInner, DataStore, DataStoreConfig, IndexedBucket, IndexedTable, + PersistentIndexedTable, +}; // --- -// TODO(cmc): compute incrementally once/if this becomes too expensive. 
-#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct DataStoreStats { - pub total_timeless_index_rows: u64, - pub total_timeless_index_size_bytes: u64, + pub total_timeless_rows: u64, + pub total_timeless_size_bytes: u64, - pub total_temporal_index_rows: u64, - pub total_temporal_index_size_bytes: u64, - pub total_temporal_index_buckets: u64, + pub total_temporal_rows: u64, + pub total_temporal_size_bytes: u64, + pub total_temporal_buckets: u64, - pub total_index_rows: u64, - pub total_index_size_bytes: u64, + pub total_rows: u64, + pub total_size_bytes: u64, pub config: DataStoreConfig, } @@ -22,27 +27,26 @@ impl DataStoreStats { pub fn from_store(store: &DataStore) -> Self { crate::profile_function!(); - let total_timeless_index_rows = store.total_timeless_index_rows(); - let total_timeless_index_size_bytes = store.total_timeless_index_size_bytes(); + let total_timeless_rows = store.total_timeless_rows(); + let total_timeless_size_bytes = store.total_timeless_size_bytes(); - let total_temporal_index_rows = store.total_temporal_index_rows(); - let total_temporal_index_size_bytes = store.total_temporal_index_size_bytes(); - let total_temporal_index_buckets = store.total_temporal_index_buckets(); + let total_temporal_rows = store.total_temporal_rows(); + let total_temporal_size_bytes = store.total_temporal_size_bytes(); + let total_temporal_buckets = store.total_temporal_buckets(); - let total_index_rows = total_timeless_index_rows + total_temporal_index_rows; - let total_index_size_bytes = - total_timeless_index_size_bytes + total_temporal_index_size_bytes; + let total_rows = total_timeless_rows + total_temporal_rows; + let total_size_bytes = total_timeless_size_bytes + total_temporal_size_bytes; Self { - total_timeless_index_rows, - total_timeless_index_size_bytes, + total_timeless_rows, + total_timeless_size_bytes, - total_temporal_index_rows, - total_temporal_index_size_bytes, - total_temporal_index_buckets, + total_temporal_rows, + 
total_temporal_size_bytes, + total_temporal_buckets, - total_index_rows, - total_index_size_bytes, + total_rows, + total_size_bytes, config: store.config.clone(), } @@ -55,7 +59,7 @@ impl DataStore { /// Returns the number of timeless index rows stored across this entire store, i.e. the sum of /// the number of rows across all of its timeless indexed tables. #[inline] - pub fn total_timeless_index_rows(&self) -> u64 { + pub fn total_timeless_rows(&self) -> u64 { crate::profile_function!(); self.timeless_tables .values() @@ -66,7 +70,7 @@ impl DataStore { /// Returns the size of the timeless index data stored across this entire store, i.e. the sum /// of the size of the data stored across all of its timeless indexed tables, in bytes. #[inline] - pub fn total_timeless_index_size_bytes(&self) -> u64 { + pub fn total_timeless_size_bytes(&self) -> u64 { crate::profile_function!(); self.timeless_tables .values() @@ -77,7 +81,7 @@ impl DataStore { /// Returns the number of temporal index rows stored across this entire store, i.e. the sum of /// the number of rows across all of its temporal indexed tables. #[inline] - pub fn total_temporal_index_rows(&self) -> u64 { + pub fn total_temporal_rows(&self) -> u64 { crate::profile_function!(); self.tables.values().map(|table| table.total_rows()).sum() } @@ -85,7 +89,7 @@ impl DataStore { /// Returns the size of the temporal index data stored across this entire store, i.e. the sum /// of the size of the data stored across all of its temporal indexed tables, in bytes. #[inline] - pub fn total_temporal_index_size_bytes(&self) -> u64 { + pub fn total_temporal_size_bytes(&self) -> u64 { crate::profile_function!(); self.tables .values() @@ -95,7 +99,7 @@ impl DataStore { /// Returns the number of temporal indexed buckets stored across this entire store. 
#[inline] - pub fn total_temporal_index_buckets(&self) -> u64 { + pub fn total_temporal_buckets(&self) -> u64 { crate::profile_function!(); self.tables .values() @@ -104,37 +108,85 @@ impl DataStore { } } -// --- Persistent Indices --- +// --- Temporal --- -impl PersistentIndexedTable { - /// Returns the number of rows stored across this table. +impl IndexedTable { + /// Returns the number of rows stored across this entire table, i.e. the sum of the number + /// of rows stored across all of its buckets. #[inline] pub fn total_rows(&self) -> u64 { - self.col_num_instances.len() as _ + self.buckets_num_rows } - /// Returns the size of the data stored across this table, in bytes. - #[inline] - pub fn total_size_bytes(&self) -> u64 { - self.total_size_bytes - } -} - -// --- Indices --- - -impl IndexedTable { /// Returns the number of rows stored across this entire table, i.e. the sum of the number /// of rows stored across all of its buckets. + /// + /// Recomputed from scratch, for sanity checking. #[inline] - pub fn total_rows(&self) -> u64 { - self.total_rows + pub(crate) fn total_rows_uncached(&self) -> u64 { + crate::profile_function!(); + self.buckets.values().map(|bucket| bucket.num_rows()).sum() } - /// Returns the size of data stored across this entire table, i.e. the sum of the size of - /// the data stored across all of its buckets, in bytes. + /// The size of both the control & component data stored in this table, across all of its + /// buckets, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). 
#[inline] pub fn total_size_bytes(&self) -> u64 { - self.total_size_bytes + crate::profile_function!(); + + let Self { + timeline, + ent_path, + cluster_key, + buckets: _, + all_components, + buckets_num_rows: _, + buckets_size_bytes, + } = self; + + let size_bytes = std::mem::size_of_val(timeline) + + std::mem::size_of_val(ent_path) + + std::mem::size_of_val(cluster_key) + + (all_components.len() * std::mem::size_of::()); + + size_bytes as u64 + buckets_size_bytes + } + + /// The size of both the control & component data stored in this table, across all of its + /// buckets, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). + /// + /// Recomputed from scratch, for sanity checking. + #[inline] + pub(crate) fn total_size_bytes_uncached(&self) -> u64 { + crate::profile_function!(); + + let Self { + timeline, + ent_path, + cluster_key, + buckets, + all_components, + buckets_num_rows: _, + buckets_size_bytes: _, + } = self; + + let buckets_size_bytes = buckets + .values() + .map(|bucket| bucket.size_bytes()) + .sum::(); + + let size_bytes = std::mem::size_of_val(timeline) + + std::mem::size_of_val(ent_path) + + std::mem::size_of_val(cluster_key) + + (all_components.len() * std::mem::size_of::()); + + size_bytes as u64 + buckets_size_bytes } /// Returns the number of buckets stored across this entire table. @@ -147,13 +199,122 @@ impl IndexedTable { impl IndexedBucket { /// Returns the number of rows stored across this bucket. #[inline] - pub fn total_rows(&self) -> u64 { + pub fn num_rows(&self) -> u64 { + crate::profile_function!(); self.inner.read().col_time.len() as u64 } - /// Returns the size of the data stored across this bucket, in bytes. + /// The size of both the control & component data stored in this bucket, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). 
+ #[inline] + pub fn size_bytes(&self) -> u64 { + crate::profile_function!(); + + let Self { + timeline, + cluster_key, + inner, + } = self; + + (std::mem::size_of_val(timeline) + std::mem::size_of_val(cluster_key)) as u64 + + inner.read().size_bytes + } +} + +impl IndexedBucketInner { + /// Computes and caches the size of both the control & component data stored in this bucket, + /// in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). + #[inline] + pub fn compute_size_bytes(&mut self) -> u64 { + crate::profile_function!(); + + let Self { + is_sorted, + time_range, + col_time, + col_insert_id, + col_row_id, + col_num_instances, + columns, + size_bytes, + } = self; + + let control_size_bytes = std::mem::size_of_val(is_sorted) + + std::mem::size_of_val(time_range) + + std::mem::size_of_val(col_time.as_slice()) + + std::mem::size_of_val(col_insert_id.as_slice()) + + std::mem::size_of_val(col_row_id.as_slice()) + + std::mem::size_of_val(col_num_instances.as_slice()) + + std::mem::size_of_val(size_bytes); + + let data_size_bytes = compute_columns_size_bytes(columns); + + *size_bytes = control_size_bytes as u64 + data_size_bytes; + + *size_bytes + } +} + +// --- Timeless --- + +impl PersistentIndexedTable { + /// Returns the number of rows stored across this table. + #[inline] + pub fn total_rows(&self) -> u64 { + self.col_num_instances.len() as _ + } + + /// The size of both the control & component data stored in this table, in bytes. + /// + /// This is a best-effort approximation, adequate for most purposes (stats, + /// triggering GCs, ...). 
#[inline] pub fn total_size_bytes(&self) -> u64 { - self.inner.read().total_size_bytes + crate::profile_function!(); + + let Self { + ent_path, + cluster_key, + col_insert_id, + col_row_id, + col_num_instances, + columns, + } = self; + + let control_size_bytes = std::mem::size_of_val(ent_path) + + std::mem::size_of_val(cluster_key) + + std::mem::size_of_val(col_insert_id.as_slice()) + + std::mem::size_of_val(col_row_id.as_slice()) + + std::mem::size_of_val(col_num_instances.as_slice()); + + let data_size_bytes = compute_columns_size_bytes(columns); + + control_size_bytes as u64 + data_size_bytes } } + +// --- Common --- + +/// Computes the size in bytes of an entire table's worth of arrow data. +fn compute_columns_size_bytes(columns: &IntMap) -> u64 { + crate::profile_function!(); + let keys = (columns.keys().len() * std::mem::size_of::()) as u64; + let cells = columns + .values() + .flat_map(|column| column.iter()) + .flatten() // option + .map(|cell| cell.size_bytes()) + .sum::(); + keys + cells +} + +#[test] +fn compute_table_size_bytes_ignore_headers() { + let columns = Default::default(); + assert_eq!(0, compute_columns_size_bytes(&columns)); +} diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 772750f48c97..7994c83a992c 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -7,7 +7,7 @@ use smallvec::SmallVec; use re_log::{debug, trace}; use re_log_types::{ component_types::InstanceKey, ComponentName, DataCell, DataCellColumn, DataCellError, DataRow, - DataTable, EntityPath, TimeInt, TimeRange, + DataTable, TimeInt, TimeRange, }; use crate::{ @@ -18,7 +18,6 @@ use crate::{ // TODO(#1619): // - The store should insert column-per-column rather than row-per-row (purely a performance // matter) -// - Bring back size stats // --- Data store --- @@ -59,7 +58,7 @@ impl DataStore { /// /// See [`Self::insert_row`]. 
pub fn insert_table(&mut self, table: &DataTable) -> WriteResult<()> { - for row in table.as_rows() { + for row in table.to_rows() { self.insert_row(&row)?; } Ok(()) @@ -246,10 +245,11 @@ impl IndexedTable { let (_, bucket) = self.find_bucket_mut(time); - let len = bucket.total_rows(); + let len = bucket.num_rows(); let len_overflow = len > config.indexed_bucket_num_rows; if len_overflow { + let bucket_size_before = bucket.size_bytes(); if let Some((min, second_half)) = bucket.split() { trace!( kind = "insert", @@ -262,7 +262,10 @@ impl IndexedTable { "splitting off indexed bucket following overflow" ); + self.buckets_size_bytes += bucket.size_bytes() + second_half.size_bytes(); + self.buckets_size_bytes -= bucket_size_before; self.buckets.insert(min, second_half); + return self.insert_row(config, insert_id, time, generated_cluster_cell, row); } @@ -307,17 +310,25 @@ impl IndexedTable { new_time_bound = timeline.typ().format(new_time_bound.into()), "creating brand new indexed bucket following overflow" ); + + let (inner, inner_size_bytes) = { + let mut inner = IndexedBucketInner { + time_range: TimeRange::new(time, time), + ..Default::default() + }; + let size_bytes = inner.compute_size_bytes(); + (inner, size_bytes) + }; self.buckets.insert( (new_time_bound).into(), IndexedBucket { timeline, cluster_key: self.cluster_key, - inner: RwLock::new(IndexedBucketInner { - time_range: TimeRange::new(time, time), - ..Default::default() - }), + inner: RwLock::new(inner), }, ); + + self.buckets_size_bytes += inner_size_bytes; return self.insert_row(config, insert_id, time, generated_cluster_cell, row); } } @@ -342,7 +353,9 @@ impl IndexedTable { "inserted into indexed tables" ); - bucket.insert_row(insert_id, time, generated_cluster_cell, row, &components); + self.buckets_size_bytes += + bucket.insert_row(insert_id, time, generated_cluster_cell, row, &components); + self.buckets_num_rows += 1; // Insert components last, only if bucket-insert succeeded. 
self.all_components.extend(components); @@ -350,6 +363,7 @@ impl IndexedTable { } impl IndexedBucket { + /// Returns the size in bytes of the inserted arrow data. fn insert_row( &mut self, insert_id: Option, @@ -357,12 +371,13 @@ impl IndexedBucket { generated_cluster_cell: Option, row: &DataRow, components: &IntSet, - ) { + ) -> u64 { crate::profile_function!(); - let num_rows = self.total_rows() as usize; + let mut size_bytes_added = 0u64; + let num_rows = self.num_rows() as usize; - let mut guard = self.inner.write(); + let mut inner = self.inner.write(); let IndexedBucketInner { is_sorted, time_range, @@ -371,25 +386,32 @@ impl IndexedBucket { col_row_id, col_num_instances, columns, - total_size_bytes: _, // TODO(#1619) - } = &mut *guard; + size_bytes, + } = &mut *inner; // append time to primary column and update time range appropriately col_time.push(time.as_i64()); *time_range = TimeRange::new(time_range.min.min(time), time_range.max.max(time)); + size_bytes_added += std::mem::size_of_val(&time.as_i64()) as u64; // update all control columns if let Some(insert_id) = insert_id { col_insert_id.push(insert_id); + size_bytes_added += std::mem::size_of_val(&insert_id) as u64; } col_row_id.push(row.row_id()); + size_bytes_added += std::mem::size_of_val(&row.row_id()) as u64; col_num_instances.push(row.num_instances()); + size_bytes_added += std::mem::size_of_val(&row.num_instances()) as u64; // insert auto-generated cluster cell if present if let Some(cluster_cell) = generated_cluster_cell { - let column = columns - .entry(cluster_cell.component_name()) - .or_insert_with(|| DataCellColumn::empty(num_rows)); + let component = cluster_cell.component_name(); + let column = columns.entry(component).or_insert_with(|| { + size_bytes_added += std::mem::size_of_val(&component) as u64; + DataCellColumn::empty(num_rows) + }); + size_bytes_added += cluster_cell.size_bytes(); column.0.push(Some(cluster_cell)); } @@ -397,9 +419,12 @@ impl IndexedBucket { // 2-way merge, step 
1: left-to-right for cell in row.cells().iter() { - let column = columns - .entry(cell.component_name()) - .or_insert_with(|| DataCellColumn::empty(col_time.len().saturating_sub(1))); + let component = cell.component_name(); + let column = columns.entry(component).or_insert_with(|| { + size_bytes_added += std::mem::size_of_val(&component) as u64; + DataCellColumn::empty(col_time.len().saturating_sub(1)) + }); + size_bytes_added += cell.size_bytes(); column.0.push(Some(cell.clone() /* shallow */)); } @@ -420,11 +445,15 @@ impl IndexedBucket { // TODO(#433): re_datastore: properly handle already sorted data during insertion *is_sorted = false; + *size_bytes += size_bytes_added; + #[cfg(debug_assertions)] { - drop(guard); // sanity checking will grab the lock! + drop(inner); self.sanity_check().unwrap(); } + + size_bytes_added } /// Splits the bucket into two, potentially uneven parts. @@ -459,8 +488,8 @@ impl IndexedBucket { inner, } = self; - let mut inner = inner.write(); - inner.sort(); + let mut inner1 = inner.write(); + inner1.sort(); let IndexedBucketInner { is_sorted: _, @@ -470,8 +499,8 @@ impl IndexedBucket { col_row_id: col_row_id1, col_num_instances: col_num_instances1, columns: columns1, - total_size_bytes: _, // NOTE: recomputed from scratch for both halves - } = &mut *inner; + size_bytes: _, // NOTE: recomputed below + } = &mut *inner1; if col_time1.len() < 2 { return None; // early exit: can't split the unsplittable @@ -486,8 +515,9 @@ impl IndexedBucket { crate::profile_function!(); let timeline = *timeline; + // Used in debug builds to assert that we've left everything in a sane state. 
- let _total_rows = col_time1.len(); + let _num_rows = col_time1.len(); fn split_off_column( column: &mut SmallVec<[T; N]>, @@ -542,10 +572,8 @@ impl IndexedBucket { .collect() }; - let bucket2 = Self { - timeline, - cluster_key: self.cluster_key, - inner: RwLock::new(IndexedBucketInner { + let inner2 = { + let mut inner2 = IndexedBucketInner { is_sorted: true, time_range: time_range2, col_time: col_time2, @@ -553,36 +581,36 @@ impl IndexedBucket { col_row_id: col_row_id2, col_num_instances: col_num_instances2, columns: columns2, - total_size_bytes: 0, // TODO(#1619) - }), + size_bytes: 0, // NOTE: computed below + }; + inner2.compute_size_bytes(); + inner2 + }; + let bucket2 = Self { + timeline, + cluster_key: self.cluster_key, + inner: RwLock::new(inner2), }; - // TODO(#1619): bring back size stats - // bucket2.compute_total_size_bytes(); (time_range2.min, bucket2) }; - // TODO(#1619): bring back size stats - // inner.compute_total_size_bytes(); + inner1.compute_size_bytes(); // sanity checks #[cfg(debug_assertions)] { - drop(inner); // sanity checking will grab the lock! + drop(inner1); // sanity checking will grab the lock! self.sanity_check().unwrap(); bucket2.sanity_check().unwrap(); - // TODO(#1619): check size_bytes sum too - let total_rows1 = self.total_rows() as i64; - let total_rows2 = bucket2.total_rows() as i64; - debug_assert!( - _total_rows as i64 == total_rows1 + total_rows2, - "expected both buckets to sum up to the length of the original bucket: \ - got bucket={} vs. 
bucket1+bucket2={}", - _total_rows, + let total_rows1 = self.num_rows() as i64; + let total_rows2 = bucket2.num_rows() as i64; + debug_assert_eq!( + _num_rows as i64, total_rows1 + total_rows2, + "expected both buckets to sum up to the length of the original bucket" ); - debug_assert_eq!(_total_rows as i64, total_rows1 + total_rows2); } Some((min2, bucket2)) @@ -710,18 +738,6 @@ fn split_time_range_off( // --- Timeless --- impl PersistentIndexedTable { - pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self { - Self { - cluster_key, - ent_path, - col_insert_id: Default::default(), - col_row_id: Default::default(), - col_num_instances: Default::default(), - columns: Default::default(), - total_size_bytes: 0, // TODO(#1619) - } - } - fn insert_row( &mut self, insert_id: Option, @@ -739,19 +755,19 @@ impl PersistentIndexedTable { col_row_id, col_num_instances, columns, - total_size_bytes: _, // TODO(#1619) } = self; let components: IntSet<_> = row.component_names().collect(); - // update all control columns + // --- update all control columns --- + if let Some(insert_id) = insert_id { col_insert_id.push(insert_id); } col_row_id.push(row.row_id()); col_num_instances.push(row.num_instances()); - // append components to their respective columns (2-way merge) + // --- append components to their respective columns (2-way merge) --- // insert auto-generated cluster cell if present if let Some(cluster_cell) = generated_cluster_cell { @@ -772,7 +788,7 @@ impl PersistentIndexedTable { // 2-way merge, step 2: right-to-left // // fill unimpacted secondary indices with null values - for (component, column) in columns { + for (component, column) in columns.iter_mut() { // The cluster key always gets added one way or another, don't try to force fill it! 
if *component == self.cluster_key { continue; diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 57f989517b4b..edc1f61f9e52 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -795,14 +795,14 @@ fn joint_df(cluster_key: ComponentName, rows: &[(ComponentName, &DataRow)]) -> D .iter() .map(|(component, row)| { let cluster_comp = if let Some(idx) = row.find_cell(&cluster_key) { - Series::try_from((cluster_key.as_str(), row.cells[idx].as_arrow_monolist())) + Series::try_from((cluster_key.as_str(), row.cells[idx].to_arrow_monolist())) .unwrap() } else { let num_instances = row.num_instances(); Series::try_from(( cluster_key.as_str(), DataCell::from_component::(0..num_instances as u64) - .as_arrow_monolist(), + .to_arrow_monolist(), )) .unwrap() }; @@ -810,7 +810,7 @@ fn joint_df(cluster_key: ComponentName, rows: &[(ComponentName, &DataRow)]) -> D let comp_idx = row.find_cell(component).unwrap(); let df = DataFrame::new(vec![ cluster_comp, - Series::try_from((component.as_str(), row.cells[comp_idx].as_arrow_monolist())) + Series::try_from((component.as_str(), row.cells[comp_idx].to_arrow_monolist())) .unwrap(), ]) .unwrap(); diff --git a/crates/re_arrow_store/tests/internals.rs b/crates/re_arrow_store/tests/internals.rs index 1b776d2619dc..9233438b72e0 100644 --- a/crates/re_arrow_store/tests/internals.rs +++ b/crates/re_arrow_store/tests/internals.rs @@ -112,7 +112,7 @@ fn pathological_bucket_topology() { { let num_buckets = store_forward .iter_indices() - .flat_map(|(_, table)| table.iter_buckets()) + .flat_map(|(_, table)| table.buckets.values()) .count(); assert_eq!( 7usize, @@ -127,7 +127,7 @@ fn pathological_bucket_topology() { { let num_buckets = store_backward .iter_indices() - .flat_map(|(_, table)| table.iter_buckets()) + .flat_map(|(_, table)| table.buckets.values()) .count(); assert_eq!( 8usize, diff --git a/crates/re_data_store/examples/memory_usage.rs 
b/crates/re_data_store/examples/memory_usage.rs index 69ca6ef5d6a1..6e8518eae542 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -57,7 +57,7 @@ fn main() { fn log_messages() { use re_log_types::{ datagen::{build_frame_nr, build_some_point2d}, - ArrowMsg, LogMsg, TimeInt, TimePoint, Timeline, + LogMsg, TimeInt, TimePoint, Timeline, }; // Note: we use Box in this function so that we also count the "static" @@ -119,7 +119,7 @@ fn log_messages() { let table_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg( recording_id, - ArrowMsg::try_from(&*table).unwrap(), + table.to_arrow_msg().unwrap(), )); let log_msg_bytes = live_bytes() - used_bytes_start; println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); @@ -145,7 +145,7 @@ fn log_messages() { let table_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg( recording_id, - ArrowMsg::try_from(&*table).unwrap(), + table.to_arrow_msg().unwrap(), )); let log_msg_bytes = live_bytes() - used_bytes_start; println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); diff --git a/crates/re_data_store/src/entity_properties.rs b/crates/re_data_store/src/entity_properties.rs index 89a9235fdbb8..4c8ba0f4d6a6 100644 --- a/crates/re_data_store/src/entity_properties.rs +++ b/crates/re_data_store/src/entity_properties.rs @@ -211,7 +211,7 @@ where let cells = data_store.latest_at(query, entity_path, C::name(), &[C::name()])?; let cell = cells.get(0)?.as_ref()?; - let mut iter = cell.try_as_native::().ok()?; + let mut iter = cell.try_to_native::().ok()?; let component = iter.next(); diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index 9e5d2320432b..8bafa9dbb29c 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -53,10 +53,14 @@ impl EntityDb { } fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> 
Result<(), Error> { - let table: DataTable = msg.try_into()?; + crate::profile_function!(); + + // TODO(#1760): Compute the size of the datacells in the batching threads on the clients. + let mut table = DataTable::from_arrow_msg(msg)?; + table.compute_all_size_bytes(); // TODO(#1619): batch all of this - for row in table.as_rows() { + for row in table.to_rows() { self.try_add_data_row(&row)?; } diff --git a/crates/re_log_encoding/benches/msg_encode_benchmark.rs b/crates/re_log_encoding/benches/msg_encode_benchmark.rs index fb36e7bc34d7..373add32096e 100644 --- a/crates/re_log_encoding/benches/msg_encode_benchmark.rs +++ b/crates/re_log_encoding/benches/msg_encode_benchmark.rs @@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use re_log_types::{ datagen::{build_frame_nr, build_some_colors, build_some_point2d}, - entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, RecordingId, + entity_path, DataRow, DataTable, Index, LogMsg, MsgId, RecordingId, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -45,7 +45,7 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec { fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec { tables .iter() - .map(|table| LogMsg::ArrowMsg(recording_id, ArrowMsg::try_from(table).unwrap())) + .map(|table| LogMsg::ArrowMsg(recording_id, table.to_arrow_msg().unwrap())) .collect() } @@ -54,7 +54,7 @@ fn decode_tables(messages: &[LogMsg]) -> Vec { .iter() .map(|log_msg| { if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg { - DataTable::try_from(arrow_msg).unwrap() + DataTable::from_arrow_msg(arrow_msg).unwrap() } else { unreachable!() } diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 706635970da8..bda5c66f8434 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -146,11 +146,19 @@ mod tests { (build_some_point2d(1), build_some_rects(1)), ); - let table_in = row.into_table(); - let msg_in: 
ArrowMsg = (&table_in).try_into().unwrap(); + let table_in = { + let mut table = row.into_table(); + table.compute_all_size_bytes(); + table + }; + let msg_in = table_in.to_arrow_msg().unwrap(); let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); - let table_out: DataTable = (&msg_out).try_into().unwrap(); + let table_out = { + let mut table = DataTable::from_arrow_msg(&msg_out).unwrap(); + table.compute_all_size_bytes(); + table + }; assert_eq!(msg_in, msg_out); assert_eq!(table_in, table_out); diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index 4a198ef365f8..316d25f7978d 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -88,7 +88,7 @@ pub type DataCellResult = ::std::result::Result; /// # assert_eq!(3, cell.num_instances()); /// # assert_eq!(cell.datatype(), &Point2D::data_type()); /// # -/// # assert_eq!(points, cell.as_native().collect_vec().as_slice()); +/// # assert_eq!(points, cell.to_native().collect_vec().as_slice()); /// ``` /// #[derive(Debug, Clone, PartialEq)] @@ -114,6 +114,12 @@ pub struct DataCellInner { // TODO(#1696): Store this within the datatype itself. pub(crate) name: ComponentName, + /// The size in bytes of both the underlying arrow data _and_ the inner cell itself. + /// + /// This is always zero unless [`Self::compute_size_bytes`] has been called, which is a very + /// costly operation. 
+ pub(crate) size_bytes: u64, + /// A uniformly typed list of values for the given component type: `[C, C, C, ...]` /// /// Includes the data, its schema and probably soon the component metadata @@ -223,7 +229,11 @@ impl DataCell { values: Box, ) -> DataCellResult { Ok(Self { - inner: Arc::new(DataCellInner { name, values }), + inner: Arc::new(DataCellInner { + name, + size_bytes: 0, + values, + }), }) } @@ -256,11 +266,16 @@ impl DataCell { datatype: arrow2::datatypes::DataType, ) -> DataCellResult { // TODO(cmc): check that it is indeed a component datatype + + let mut inner = DataCellInner { + name, + size_bytes: 0, + values: arrow2::array::new_empty_array(datatype), + }; + inner.compute_size_bytes(); + Ok(Self { - inner: Arc::new(DataCellInner { - name, - values: arrow2::array::new_empty_array(datatype), - }), + inner: Arc::new(inner), }) } @@ -282,7 +297,7 @@ impl DataCell { /// If you do use them, try to keep the scope as short as possible: holding on to a raw array /// might prevent the datastore from releasing memory from garbage collected data. #[inline] - pub fn as_arrow(&self) -> Box { + pub fn to_arrow(&self) -> Box { self.inner.values.clone() /* shallow */ } @@ -310,10 +325,10 @@ impl DataCell { // TODO(cmc): effectively, this returns a `DataColumn`... think about that. #[doc(hidden)] #[inline] - pub fn as_arrow_monolist(&self) -> Box { + pub fn to_arrow_monolist(&self) -> Box { use arrow2::{array::ListArray, offset::Offsets}; - let values = self.as_arrow(); + let values = self.to_arrow(); let datatype = self.datatype().clone(); let datatype = ListArray::::default_datatype(datatype); @@ -331,7 +346,7 @@ impl DataCell { // // TODO(#1694): There shouldn't need to be HRTBs (Higher-Rank Trait Bounds) here. #[inline] - pub fn try_as_native( + pub fn try_to_native( &self, ) -> DataCellResult + '_> where @@ -344,15 +359,15 @@ impl DataCell { /// Returns the contents of the cell as an iterator of native components. 
/// /// Panics if the underlying arrow data cannot be deserialized into `C`. - /// See [`Self::try_as_native`] for a fallible alternative. + /// See [`Self::try_to_native`] for a fallible alternative. // // TODO(#1694): There shouldn't need to be HRTBs here. #[inline] - pub fn as_native(&self) -> impl Iterator + '_ + pub fn to_native(&self) -> impl Iterator + '_ where for<'a> &'a C::ArrayType: IntoIterator, { - self.try_as_native().unwrap() + self.try_to_native().unwrap() } } @@ -426,6 +441,24 @@ impl DataCell { _ => Err(DataCellError::UnsupportedDatatype(arr.data_type().clone())), } } + + /// Returns the total (heap) allocated size of the cell in bytes, provided that + /// [`Self::compute_size_bytes`] has been called first (zero otherwise). + /// + /// This is an approximation, accurate enough for most purposes (stats, GC trigger, ...). + /// + /// This is `O(1)`, the value is computed and cached by calling [`Self::compute_size_bytes`]. + #[inline] + pub fn size_bytes(&self) -> u64 { + let Self { inner } = self; + + (inner.size_bytes > 0) + .then_some(std::mem::size_of_val(inner) as u64 + inner.size_bytes) + .unwrap_or_else(|| { + re_log::warn_once!("called `DataCell::size_bytes() without computing it first"); + 0 + }) + } } // --- @@ -457,9 +490,13 @@ impl From<&Vec> for DataCell { impl std::fmt::Display for DataCell { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!( + "DataCell({})", + re_format::format_bytes(self.size_bytes() as _) + ))?; re_format::arrow::format_table( // NOTE: wrap in a ListArray so that it looks more cell-like (i.e. single row) - [&*self.as_arrow_monolist()], + [&*self.to_arrow_monolist()], [self.component_name()], ) .fmt(f) @@ -469,15 +506,82 @@ impl std::fmt::Display for DataCell { // --- impl DataCell { - /// Returns the total (heap) allocated size of the array in bytes. + /// Compute and cache the total (heap) allocated size of the underlying arrow array in bytes. 
+ /// This does nothing if the size has already been computed and cached before. /// - /// Beware: this is costly! Cache the returned value as much as possible. - pub fn size_bytes(&self) -> u64 { - let DataCellInner { name, values } = &*self.inner; + /// The caller must be the sole owner of this cell, as this requires mutating an `Arc` under the + /// hood. Returns false otherwise. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_size_bytes(&mut self) -> bool { + if let Some(inner) = Arc::get_mut(&mut self.inner) { + inner.compute_size_bytes(); + return true; + } + false + } +} - std::mem::size_of_val(name) as u64 + - // Warning: this is surprisingly costly! - arrow2::compute::aggregate::estimated_bytes_size(&**values) as u64 +impl DataCellInner { + /// Compute and cache the total (heap) allocated size of the underlying arrow array in bytes. + /// This does nothing if the size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_size_bytes(&mut self) { + let Self { + name, + size_bytes, + values, + } = self; + + // NOTE: The computed size cannot ever be zero. 
+ if *size_bytes > 0 { + return; + } + + *size_bytes = (std::mem::size_of_val(name) + + std::mem::size_of_val(size_bytes) + + std::mem::size_of_val(values)) as u64 + + arrow2::compute::aggregate::estimated_bytes_size(&*self.values) as u64; + } +} + +#[test] +fn data_cell_sizes() { + use crate::{component_types::InstanceKey, Component as _}; + use arrow2::array::UInt64Array; + + // not computed + { + let cell = DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed()); + assert_eq!(0, cell.size_bytes()); + assert_eq!(0, cell.size_bytes()); + } + + // zero-sized + { + let mut cell = + DataCell::from_arrow(InstanceKey::name(), UInt64Array::from_vec(vec![]).boxed()); + cell.compute_size_bytes(); + + // only the size of the outer & inner cells themselves + assert_eq!(56, cell.size_bytes()); + assert_eq!(56, cell.size_bytes()); + } + + // anything else + { + let mut cell = DataCell::from_arrow( + InstanceKey::name(), + UInt64Array::from_vec(vec![1, 2, 3]).boxed(), + ); + cell.compute_size_bytes(); + + // 56 bytes for the inner & outer cells + 3x u64s + assert_eq!(80, cell.size_bytes()); + assert_eq!(80, cell.size_bytes()); } } @@ -500,6 +604,15 @@ fn test_arrow_estimated_size_bytes() { offset::Offsets, }; + // empty primitive array + { + let data = vec![]; + let array = UInt64Array::from_vec(data.clone()).boxed(); + let sz = estimated_bytes_size(&*array); + assert_eq!(0, sz); + assert_eq!(std::mem::size_of_val(data.as_slice()), sz); + } + // simple primitive array { let data = vec![42u64; 100]; diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs index 3122ad478a53..a96e1c80bc48 100644 --- a/crates/re_log_types/src/data_row.rs +++ b/crates/re_log_types/src/data_row.rs @@ -326,6 +326,18 @@ impl DataRow { .map(|cell| cell.component_name()) .position(|name| name == *component) } + + /// Compute and cache the total (heap) allocated size of each individual underlying + /// [`DataCell`]. 
+ /// This does nothing for cells whose size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_all_size_bytes(&mut self) { + for cell in &mut self.cells.0 { + cell.compute_size_bytes(); + } + } } // --- @@ -482,7 +494,7 @@ impl std::fmt::Display for DataRow { } re_format::arrow::format_table( - self.cells.iter().map(|cell| cell.as_arrow_monolist()), + self.cells.iter().map(|cell| cell.to_arrow_monolist()), self.cells.iter().map(|cell| cell.component_name()), ) .fmt(f) diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 063fa44f91b1..a15e340e31e8 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -106,6 +106,18 @@ impl DataCellColumn { pub fn empty(num_rows: usize) -> Self { Self(smallvec::smallvec![None; num_rows]) } + + /// Compute and cache the total (heap) allocated size of each individual underlying + /// [`DataCell`]. + /// This does nothing for cells whose size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! + #[inline] + pub fn compute_all_size_bytes(&mut self) { + for cell in &mut self.0 { + cell.as_mut().map(|cell| cell.compute_size_bytes()); + } + } } // --- @@ -382,7 +394,7 @@ impl DataTable { } #[inline] - pub fn as_rows(&self) -> impl ExactSizeIterator + '_ { + pub fn to_rows(&self) -> impl ExactSizeIterator + '_ { let num_rows = self.num_rows() as usize; let Self { @@ -428,6 +440,18 @@ impl DataTable { } timepoint } + + /// Compute and cache the total (heap) allocated size of each individual underlying + /// [`DataCell`]. + /// This does nothing for cells whose size has already been computed and cached before. + /// + /// Beware: this is _very_ costly! 
+ #[inline] + pub fn compute_all_size_bytes(&mut self) { + for column in self.columns.values_mut() { + column.compute_all_size_bytes(); + } + } } // --- Serialization --- @@ -588,6 +612,8 @@ impl DataTable { name: &str, values: &[C], ) -> DataTableResult<(Field, Box)> { + crate::profile_function!(); + /// Transforms an array of unit values into a list of unit arrays. /// /// * Before: `[C, C, C, C, C, ...]` @@ -627,6 +653,8 @@ impl DataTable { values: &[T], datatype: Option, ) -> DataTableResult<(Field, Box)> { + crate::profile_function!(); + let data = PrimitiveArray::from_slice(values); let datatype = datatype.unwrap_or(data.data_type().clone()); @@ -679,6 +707,8 @@ impl DataTable { name: &str, column: &[Option], ) -> DataTableResult<(Field, Box)> { + crate::profile_function!(); + /// Create a list-array out of a flattened array of cell values. /// /// * Before: `[C, C, C, C, C, C, C, ...]` @@ -852,6 +882,8 @@ impl DataTable { name: &str, column: &dyn Array, ) -> DataTableResult<(Timeline, TimeOptVec)> { + crate::profile_function!(); + // See also [`Timeline::datatype`] let timeline = match column.data_type().to_logical_type() { DataType::Int64 => Timeline::new_sequence(name), @@ -879,6 +911,7 @@ impl DataTable { component: ComponentName, column: &dyn Array, ) -> DataTableResult { + crate::profile_function!(); Ok(DataCellColumn( column .as_any() @@ -893,11 +926,10 @@ impl DataTable { // --- -impl TryFrom<&ArrowMsg> for DataTable { - type Error = DataTableError; - +impl DataTable { + /// Deserializes the contents of an [`ArrowMsg`] into a `DataTable`. #[inline] - fn try_from(msg: &ArrowMsg) -> DataTableResult { + pub fn from_arrow_msg(msg: &ArrowMsg) -> DataTableResult { let ArrowMsg { table_id, timepoint_max: _, @@ -907,18 +939,17 @@ impl TryFrom<&ArrowMsg> for DataTable { Self::deserialize(*table_id, schema, chunk) } -} - -impl TryFrom<&DataTable> for ArrowMsg { - type Error = DataTableError; + /// Serializes the contents of a `DataTable` into an [`ArrowMsg`]. 
+ // + // TODO(#1760): support serializing the cell size itself, so it can be computed on the clients. #[inline] - fn try_from(table: &DataTable) -> DataTableResult { - let timepoint_max = table.timepoint_max(); - let (schema, chunk) = table.serialize()?; + pub fn to_arrow_msg(&self) -> DataTableResult { + let timepoint_max = self.timepoint_max(); + let (schema, chunk) = self.serialize()?; Ok(ArrowMsg { - table_id: table.table_id, + table_id: self.table_id, timepoint_max, schema, chunk, @@ -1003,6 +1034,9 @@ impl DataTable { ) }; - DataTable::from_rows(table_id, [row0, row1, row2]) + let mut table = DataTable::from_rows(table_id, [row0, row1, row2]); + table.compute_all_size_bytes(); + + table } } diff --git a/crates/re_query/src/query.rs b/crates/re_query/src/query.rs index 8cc818bd9db2..adb29f55aa9e 100644 --- a/crates/re_query/src/query.rs +++ b/crates/re_query/src/query.rs @@ -55,10 +55,10 @@ pub fn get_component_with_instances( Ok(ComponentWithInstances { name: component, - instance_keys: cells[0].take().map(|cell| cell.as_arrow()), + instance_keys: cells[0].take().map(|cell| cell.to_arrow()), values: cells[1] .take() - .map(|cell| cell.as_arrow()) + .map(|cell| cell.to_arrow()) .ok_or(QueryError::PrimaryNotFound)?, }) } diff --git a/crates/re_query/src/range.rs b/crates/re_query/src/range.rs index 52374810552a..4b3bdae4ca06 100644 --- a/crates/re_query/src/range.rs +++ b/crates/re_query/src/range.rs @@ -94,7 +94,7 @@ pub fn range_entity_with_primary<'a, Primary: Component + 'a, const N: usize>( store .range(query, ent_path, components) .map(move |(time, _, mut cells)| { - let instance_keys = cells[cluster_col].take().map(|cell| cell.as_arrow()); + let instance_keys = cells[cluster_col].take().map(|cell| cell.to_arrow()); let is_primary = cells[primary_col].is_some(); let cwis = cells .into_iter() @@ -103,7 +103,7 @@ pub fn range_entity_with_primary<'a, Primary: Component + 'a, const N: usize>( cell.map(|cell| ComponentWithInstances { name: components[i], 
instance_keys: instance_keys.clone(), /* shallow */ - values: cell.as_arrow(), + values: cell.to_arrow(), }) }) .collect::>(); diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index 9ab867a3d452..2bc377010444 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -251,13 +251,13 @@ impl MsgSender { if let Some(row_transforms) = row_transforms { sink.send(LogMsg::ArrowMsg( recording_id, - (&row_transforms.into_table()).try_into()?, + row_transforms.into_table().to_arrow_msg()?, )); } if let Some(row_splats) = row_splats { sink.send(LogMsg::ArrowMsg( recording_id, - (&row_splats.into_table()).try_into()?, + row_splats.into_table().to_arrow_msg()?, )); } // Always the primary component last so range-based queries will include the other data. @@ -265,7 +265,7 @@ impl MsgSender { if let Some(row_standard) = row_standard { sink.send(LogMsg::ArrowMsg( recording_id, - (&row_standard.into_table()).try_into()?, + row_standard.into_table().to_arrow_msg()?, )); } diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index 5c19a09fa3b8..260e132feb31 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -101,7 +101,7 @@ impl DataUi for ArrowMsg { verbosity: UiVerbosity, query: &re_arrow_store::LatestAtQuery, ) { - let table: DataTable = match self.try_into() { + let table = match DataTable::from_arrow_msg(self) { Ok(table) => table, Err(err) => { ui.label( @@ -113,7 +113,7 @@ impl DataUi for ArrowMsg { }; // TODO(cmc): Come up with something a bit nicer once data tables become a common sight. 
- for row in table.as_rows() { + for row in table.to_rows() { egui::Grid::new("fields").num_columns(2).show(ui, |ui| { ui.monospace("entity_path:"); ctx.entity_path_button(ui, None, row.entity_path()); diff --git a/crates/re_viewer/src/ui/event_log_view.rs b/crates/re_viewer/src/ui/event_log_view.rs index 89aea702faa2..3820710be32e 100644 --- a/crates/re_viewer/src/ui/event_log_view.rs +++ b/crates/re_viewer/src/ui/event_log_view.rs @@ -176,9 +176,9 @@ fn table_row( // NOTE: This really only makes sense because we don't yet have batches with more than a // single row at the moment... and by the time we do, the event log view will have // disappeared entirely. - LogMsg::ArrowMsg(_, msg) => match DataTable::try_from(msg) { + LogMsg::ArrowMsg(_, msg) => match DataTable::from_arrow_msg(msg) { Ok(table) => { - for datarow in table.as_rows() { + for datarow in table.to_rows() { row.col(|ui| { ctx.msg_id_button(ui, datarow.row_id()); }); diff --git a/crates/re_viewer/src/ui/memory_panel.rs b/crates/re_viewer/src/ui/memory_panel.rs index bff514d5f7b8..1a390ed267a8 100644 --- a/crates/re_viewer/src/ui/memory_panel.rs +++ b/crates/re_viewer/src/ui/memory_panel.rs @@ -26,7 +26,7 @@ impl MemoryPanel { (gpu_resource_stats.total_buffer_size_in_bytes + gpu_resource_stats.total_texture_size_in_bytes) as _, ), - Some(store_stats.total_index_size_bytes as _), + Some(store_stats.total_size_bytes as _), ); } @@ -186,7 +186,6 @@ impl MemoryPanel { ui.label(egui::RichText::new("Limits").italics()); ui.label("Row limit"); - ui.label("Size limit"); ui.end_row(); let label_rows = |ui: &mut egui::Ui, num_rows| { @@ -197,7 +196,11 @@ impl MemoryPanel { } }; - ui.label("Indices:"); + ui.label("Timeless:"); + label_rows(ui, u64::MAX); + ui.end_row(); + + ui.label("Temporal:"); label_rows(ui, config.indexed_bucket_num_rows); ui.end_row(); }); @@ -208,13 +211,13 @@ impl MemoryPanel { .num_columns(3) .show(ui, |ui| { let DataStoreStats { - total_timeless_index_rows, - 
total_timeless_index_size_bytes, - total_temporal_index_rows, - total_temporal_index_size_bytes, - total_temporal_index_buckets, - total_index_rows, - total_index_size_bytes, + total_timeless_rows, + total_timeless_size_bytes, + total_temporal_rows, + total_temporal_size_bytes, + total_temporal_buckets, + total_rows, + total_size_bytes, config: _, } = *store_stats; @@ -232,22 +235,22 @@ impl MemoryPanel { let label_size = |ui: &mut egui::Ui, size| ui.label(re_format::format_bytes(size as _)); - ui.label("Indices (timeless):"); + ui.label("Timeless:"); ui.label(""); - label_rows(ui, total_timeless_index_rows); - label_size(ui, total_timeless_index_size_bytes); + label_rows(ui, total_timeless_rows); + label_size(ui, total_timeless_size_bytes); ui.end_row(); - ui.label("Indices (temporal):"); - label_buckets(ui, total_temporal_index_buckets); - label_rows(ui, total_temporal_index_rows); - label_size(ui, total_temporal_index_size_bytes); + ui.label("Temporal:"); + label_buckets(ui, total_temporal_buckets); + label_rows(ui, total_temporal_rows); + label_size(ui, total_temporal_size_bytes); ui.end_row(); - ui.label("Indices (total):"); - label_buckets(ui, total_temporal_index_buckets); - label_rows(ui, total_index_rows); - label_size(ui, total_index_size_bytes); + ui.label("Total"); + label_buckets(ui, total_temporal_buckets); + label_rows(ui, total_rows); + label_size(ui, total_size_bytes); ui.end_row(); }); } diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 738059999692..cd641b9f7562 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -484,13 +484,7 @@ fn log_transform( [transform].as_slice(), ); - let msg = (&row.into_table()) - .try_into() - .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - - session.send_arrow_msg(msg); - - Ok(()) + session.send_row(row) } // ---------------------------------------------------------------------------- @@ -568,13 +562,7 @@ fn log_view_coordinates( 
[coordinates].as_slice(), ); - let msg = (&row.into_table()) - .try_into() - .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - - session.send_arrow_msg(msg); - - Ok(()) + session.send_row(row) } // ---------------------------------------------------------------------------- @@ -702,13 +690,7 @@ fn log_meshes( meshes, ); - let msg = (&row.into_table()) - .try_into() - .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - - session.send_arrow_msg(msg); - - Ok(()) + session.send_row(row) } #[pyfunction] @@ -783,13 +765,7 @@ fn log_mesh_file( [mesh3d].as_slice(), ); - let msg = (&row.into_table()) - .try_into() - .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - - session.send_arrow_msg(msg); - - Ok(()) + session.send_row(row) } /// Log an image file given its contents or path on disk. @@ -875,13 +851,7 @@ fn log_image_file( [tensor].as_slice(), ); - let msg = (&row.into_table()) - .try_into() - .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - - session.send_arrow_msg(msg); - - Ok(()) + session.send_row(row) } #[derive(FromPyObject)] @@ -954,13 +924,7 @@ fn log_annotation_context( [annotation_context].as_slice(), ); - let msg = (&row.into_table()) - .try_into() - .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - - session.send_arrow_msg(msg); - - Ok(()) + session.send_row(row) } #[pyfunction] @@ -987,8 +951,8 @@ fn log_arrow_msg(entity_path: &str, components: &PyDict, timeless: bool) -> PyRe let mut session = python_session(); - let msg: ArrowMsg = (&data_table) - .try_into() + let msg: ArrowMsg = data_table + .to_arrow_msg() .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; session.send_arrow_msg(msg); diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index a44a7619747b..985118e00712 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -1,8 +1,9 @@ use 
std::net::SocketAddr; +use pyo3::{exceptions::PyValueError, PyResult}; use re_log_types::{ - ApplicationId, ArrowMsg, BeginRecordingMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, - RecordingSource, Time, TimePoint, + ApplicationId, ArrowMsg, BeginRecordingMsg, DataRow, DataTableError, LogMsg, MsgId, PathOp, + RecordingId, RecordingInfo, RecordingSource, Time, TimePoint, }; use rerun::sink::LogSink; @@ -197,6 +198,18 @@ impl PythonSession { self.sink.drop_msgs_if_disconnected(); } + /// Send a single [`DataRow`]. + pub fn send_row(&mut self, row: DataRow) -> PyResult<()> { + let msg = row + .into_table() + .to_arrow_msg() + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; + + self.send(LogMsg::ArrowMsg(self.recording_id(), msg)); + + Ok(()) + } + /// Send a [`LogMsg`]. pub fn send(&mut self, log_msg: LogMsg) { if !self.enabled {