Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 10, 2023
1 parent d4baeea commit bb3938f
Show file tree
Hide file tree
Showing 16 changed files with 505 additions and 125 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ pub type InsertIdVec = SmallVec<[u64; 4]>;
/// so far.
///
/// See also [`DataStore::lookup_datatype`].
#[derive(Default)]
#[derive(Debug, Default)]
pub struct DataTypeRegistry(IntMap<ComponentName, DataType>);

impl DataTypeRegistry {
// TODO: why is this here tho
// Estimated size in bytes of the registry.
pub fn size_bytes(&self) -> u64 {
type K = ComponentName;
Expand Down Expand Up @@ -104,10 +105,11 @@ impl std::ops::DerefMut for DataTypeRegistry {
}

/// Keeps track of arbitrary per-row metadata.
#[derive(Default)]
#[derive(Debug, Default)]
pub struct MetadataRegistry<T: Clone>(BTreeMap<RowId, T>);

impl MetadataRegistry<TimePoint> {
// TODO: why is this here tho
// Estimated size in bytes of the registry.
pub fn size_bytes(&self) -> u64 {
type K = RowId;
Expand All @@ -118,9 +120,12 @@ impl MetadataRegistry<TimePoint> {
let inner: &BTreeMap<K, V> = &self.0;

let keys_size_bytes = std::mem::size_of::<K>() * inner.len();
let values_size_bytes = std::mem::size_of::<V>() * inner.len();
let values_size_bytes = inner
.values()
.map(|timepoint| timepoint.size_bytes())
.sum::<u64>();

keys_size_bytes as u64 + values_size_bytes as u64
keys_size_bytes as u64 + values_size_bytes
}
}

Expand Down Expand Up @@ -178,7 +183,7 @@ pub struct DataStore {
/// Keeps track of arbitrary per-row metadata.
///
/// Only used to map `RowId`s to their original [`TimePoint`]s at the moment.
pub(crate) metadata_registry: MetadataRegistry<TimePoint>,
pub metadata_registry: MetadataRegistry<TimePoint>,

// TODO: part of the size stats
/// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, ...)
Expand Down
208 changes: 207 additions & 1 deletion crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use re_log_types::{RowId, TimeInt};

use crate::{store::IndexedBucketInner, DataStore, DataStoreStats};

// ---

#[derive(Debug, Clone, Copy)]
pub enum GarbageCollectionTarget {
/// Try to drop _at least_ the given percentage.
Expand All @@ -17,4 +23,204 @@ impl std::fmt::Display for GarbageCollectionTarget {
}
}

// TODO(#1619): Implement garbage collection.
// TODO: test with
// 1. clocks infinite
// 2. canny infinite
// 3. objectron but not insanely fast

impl DataStore {
    /// Triggers a garbage collection according to the desired `target`.
    ///
    /// Returns the list of `RowId`s that were purged from the store, alongside the statistics
    /// diff (`before - after`) resulting from the collection.
    ///
    /// ## Semantics
    ///
    /// Garbage collection works on a row-level basis and is driven by `RowId` order, i.e. the order
    /// defined by the clients' wall-clocks, allowing it to drop data across the different timelines
    /// in a fair, deterministic manner.
    /// Similarly, out-of-order data is supported out of the box.
    ///
    /// The garbage collector doesn't deallocate data in and of itself: all it does is drop the
    /// store's internal references to that data (the `DataCell`s), which will be deallocated once
    /// their reference count reaches 0.
    ///
    /// ## Limitations
    ///
    /// The garbage collector is currently unaware of our latest-at semantics, i.e. it will drop
    /// old data even if doing so would impact the results of recent queries.
    /// See <issue>.
    //
    // TODO(cmc): There shouldn't be any need to return the purged `RowId`s, all secondary
    // datastructures should be able to purge themselves based solely off of
    // `DataStore::oldest_time_per_timeline`.
    //
    // TODO(cmc): The GC should be aware of latest-at semantics and make sure they are upheld when
    // purging data.
    pub fn gc(&mut self, target: GarbageCollectionTarget) -> (Vec<RowId>, DataStoreStats) {
        crate::profile_function!();

        self.gc_id += 1;

        let stats_before = DataStoreStats::from_store(self);
        let initial_nb_rows = stats_before.total_temporal_rows;
        // NOTE: only temporal data and row metadata get purged!
        let initial_size_bytes = (stats_before.total_temporal_size_bytes
            + stats_before.metadata_registry_size_bytes) as f64;

        let row_ids = match target {
            GarbageCollectionTarget::DropAtLeastPercentage(p) => {
                assert!((0.0..=1.0).contains(&p));

                let drop_at_least_size_bytes = initial_size_bytes * p;
                let target_size_bytes = initial_size_bytes - drop_at_least_size_bytes;

                re_log::debug!(
                    kind = "gc",
                    id = self.gc_id,
                    %target,
                    initial_nb_rows = re_format::format_large_number(initial_nb_rows as _),
                    initial_size_bytes = re_format::format_bytes(initial_size_bytes),
                    target_size_bytes = re_format::format_bytes(target_size_bytes),
                    drop_at_least_size_bytes = re_format::format_bytes(drop_at_least_size_bytes),
                    "starting GC"
                );

                self.gc_drop_at_least_size_bytes(drop_at_least_size_bytes)
            }
        };

        #[cfg(debug_assertions)]
        self.sanity_check().unwrap();

        let stats_after = DataStoreStats::from_store(self);
        let new_nb_rows = stats_after.total_temporal_rows;
        // NOTE: only temporal data and row metadata get purged!
        //
        // BUG FIX: this used to be computed from `stats_before`, which made the "GC done" log
        // below always report the _pre_-collection size as the new size.
        let new_size_bytes = (stats_after.total_temporal_size_bytes
            + stats_after.metadata_registry_size_bytes) as f64;

        re_log::debug!(
            kind = "gc",
            id = self.gc_id,
            %target,
            initial_nb_rows = re_format::format_large_number(initial_nb_rows as _),
            initial_size_bytes = re_format::format_bytes(initial_size_bytes),
            new_nb_rows = re_format::format_large_number(new_nb_rows as _),
            new_size_bytes = re_format::format_bytes(new_size_bytes),
            "GC done"
        );

        let stats_diff = stats_before - stats_after;

        (row_ids, stats_diff)
    }

    /// Drops rows in `RowId` order until at least `drop_at_least_size_bytes` bytes' worth of
    /// temporal data and row metadata have been dropped, or the store runs out of rows.
    ///
    /// Returns the `RowId`s of all the rows that were dropped.
    fn gc_drop_at_least_size_bytes(&mut self, mut drop_at_least_size_bytes: f64) -> Vec<RowId> {
        let mut row_ids = Vec::new();

        while drop_at_least_size_bytes > 0.0 {
            // Pop the next row to drop: `RowId`s are ordered by the clients' wall-clocks, so
            // popping the first registry entry drops the oldest data first.
            let Some((row_id, timepoint)) = self.metadata_registry.pop_first() else {
                break;
            };
            drop_at_least_size_bytes -= std::mem::size_of_val(&row_id) as f64;
            drop_at_least_size_bytes -= timepoint.size_bytes() as f64;
            row_ids.push(row_id);

            // Only visit the tables whose timeline actually appears in this row's timepoint.
            let tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| {
                timepoint.get(timeline).map(|time| (*time, table))
            });

            for (time, table) in tables {
                let table_has_more_than_one_bucket = table.buckets.len() > 1;

                let (bucket_key, bucket) = table.find_bucket_mut(time);
                // NOTE: captured _before_ dropping any rows, so that it accounts for the full
                // contents of the bucket in case we end up dropping the bucket altogether.
                let bucket_size_bytes = bucket.size_bytes();

                let (dropped_rows, mut dropped_size_bytes) = {
                    let inner = &mut *bucket.inner.write();
                    // Sorting guarantees that all rows sharing `time` form one contiguous run,
                    // which the forward scan below relies on.
                    inner.sort();

                    let IndexedBucketInner {
                        is_sorted,
                        time_range: _, // NOTE: Lazily updated when sorting
                        col_time,
                        col_insert_id,
                        col_row_id,
                        col_num_instances,
                        columns,
                        size_bytes,
                    } = inner;

                    let mut dropped_rows = 0u64;
                    let mut dropped_size_bytes = 0u64;

                    let time_i64 = time.as_i64();

                    // BUG FIX: `binary_search` returns an _unspecified_ index within a run of
                    // duplicate times, which could make the forward-only scan below start in the
                    // middle of the run and miss the target row entirely. `partition_point`
                    // always yields the first index of the run (or the insertion point if the
                    // time is absent, in which case the scan is a no-op).
                    let mut col_index = col_time.partition_point(|&t| t < time_i64);

                    while col_time.get(col_index) == Some(&time_i64) {
                        if col_row_id[col_index] != row_id {
                            col_index += 1;
                            continue;
                        }

                        // col_row_id
                        //
                        // NOTE: `swap_remove` moves the _last_ element into `col_index`; that
                        // element necessarily belongs to a different row, so skipping past it
                        // with `col_index += 1` below can never miss the row we're looking for.
                        let removed_row_id = col_row_id.swap_remove(col_index);
                        debug_assert_eq!(row_id, removed_row_id);
                        dropped_size_bytes += std::mem::size_of_val(&removed_row_id) as u64;

                        // col_time
                        let row_time = col_time.swap_remove(col_index);
                        dropped_size_bytes += std::mem::size_of_val(&row_time) as u64;

                        // col_insert_id (if present)
                        if !col_insert_id.is_empty() {
                            dropped_size_bytes +=
                                std::mem::size_of_val(&col_insert_id.swap_remove(col_index)) as u64;
                        }

                        // col_num_instances
                        dropped_size_bytes +=
                            std::mem::size_of_val(&col_num_instances.swap_remove(col_index)) as u64;

                        // each data column
                        for column in columns.values_mut() {
                            dropped_size_bytes += column
                                .0
                                .swap_remove(col_index)
                                .map_or(0, |cell| cell.size_bytes());
                        }

                        // `swap_remove` just broke the ordering invariant.
                        *is_sorted = false;

                        dropped_rows += 1;
                        col_index += 1;
                    }

                    *size_bytes -= dropped_size_bytes;

                    (dropped_rows, dropped_size_bytes)
                };

                // NOTE: We always need to keep at least one bucket alive, otherwise we have
                // nowhere to write to.
                if table_has_more_than_one_bucket && bucket.num_rows() == 0 {
                    // NOTE: We're dropping the bucket itself in this case, rather than just its
                    // contents.
                    dropped_size_bytes = bucket_size_bytes;
                    table.buckets.remove(&bucket_key);

                    // The first bucket must always be keyed at `TimeInt::MIN` so that any future
                    // insertion finds a bucket to land in: re-key the new first bucket if we just
                    // removed the one holding that slot.
                    if bucket_key == TimeInt::MIN {
                        if let Some((_, bucket)) = table.buckets.pop_first() {
                            table.buckets.insert(TimeInt::MIN, bucket);
                        }
                    }
                }

                drop_at_least_size_bytes -= dropped_size_bytes as f64;
                table.buckets_size_bytes -= dropped_size_bytes;
                table.buckets_num_rows -= dropped_rows;
            }
        }

        row_ids
    }
}
5 changes: 4 additions & 1 deletion crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ impl IndexedBucketInner {
pub fn sort(&mut self) {
let Self {
is_sorted,
time_range: _,
time_range,
col_time,
col_insert_id,
col_row_id,
Expand Down Expand Up @@ -967,6 +967,9 @@ impl IndexedBucketInner {
}

*is_sorted = true;

time_range.min = col_time.first().copied().unwrap_or(i64::MAX).into();
time_range.max = col_time.last().copied().unwrap_or(i64::MIN).into();
}
}

Expand Down
14 changes: 7 additions & 7 deletions crates/re_arrow_store/src/store_sanity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ impl IndexedTable {
}
}

// Run individual bucket sanity check suites too.
for bucket in self.buckets.values() {
bucket.sanity_check()?;
}

// Make sure size values aren't out of sync
{
let total_size_bytes = self.total_size_bytes();
Expand All @@ -125,11 +130,6 @@ impl IndexedTable {
}
}

// Run individual bucket sanity check suites too.
for bucket in self.buckets.values() {
bucket.sanity_check()?;
}

Ok(())
}
}
Expand Down Expand Up @@ -191,7 +191,7 @@ impl IndexedBucket {
}

// The cluster column must be fully dense.
{
if self.num_rows() > 0 {
let cluster_column =
columns
.get(cluster_key)
Expand Down Expand Up @@ -272,7 +272,7 @@ impl PersistentIndexedTable {
}

// The cluster column must be fully dense.
{
if self.total_rows() > 0 {
let cluster_column =
columns
.get(cluster_key)
Expand Down
26 changes: 26 additions & 0 deletions crates/re_arrow_store/src/store_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ pub struct DataStoreStats {
pub total_size_bytes: u64,
}

impl std::ops::Sub for DataStoreStats {
    type Output = Self;

    /// Field-wise difference between two sets of store statistics.
    ///
    /// NOTE(review): each field uses a plain `u64` subtraction, which panics on underflow in
    /// debug builds — callers are expected to pass `before - after` where `after <= before`
    /// holds for every field.
    fn sub(self, rhs: Self) -> Self::Output {
        // Exhaustively destructure the right-hand side so that adding a field to
        // `DataStoreStats` forces this implementation to be updated.
        let Self {
            type_registry_rows,
            type_registry_size_bytes,
            metadata_registry_rows,
            metadata_registry_size_bytes,
            total_autogenerated_rows,
            total_autogenerated_size_bytes,
            total_timeless_rows,
            total_timeless_size_bytes,
            total_temporal_rows,
            total_temporal_size_bytes,
            total_temporal_buckets,
            total_rows,
            total_size_bytes,
        } = rhs;

        Self {
            type_registry_rows: self.type_registry_rows - type_registry_rows,
            type_registry_size_bytes: self.type_registry_size_bytes - type_registry_size_bytes,
            metadata_registry_rows: self.metadata_registry_rows - metadata_registry_rows,
            metadata_registry_size_bytes: self.metadata_registry_size_bytes
                - metadata_registry_size_bytes,
            total_autogenerated_rows: self.total_autogenerated_rows - total_autogenerated_rows,
            total_autogenerated_size_bytes: self.total_autogenerated_size_bytes
                - total_autogenerated_size_bytes,
            total_timeless_rows: self.total_timeless_rows - total_timeless_rows,
            total_timeless_size_bytes: self.total_timeless_size_bytes - total_timeless_size_bytes,
            total_temporal_rows: self.total_temporal_rows - total_temporal_rows,
            total_temporal_size_bytes: self.total_temporal_size_bytes - total_temporal_size_bytes,
            total_temporal_buckets: self.total_temporal_buckets - total_temporal_buckets,
            total_rows: self.total_rows - total_rows,
            total_size_bytes: self.total_size_bytes - total_size_bytes,
        }
    }
}

impl DataStoreStats {
pub fn from_store(store: &DataStore) -> Self {
crate::profile_function!();
Expand Down
Loading

0 comments on commit bb3938f

Please sign in to comment.