Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datastore: revamp bench suite #1733

Merged
merged 5 commits into from
Mar 30, 2023
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 179 additions & 91 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,88 +8,125 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline,
Component as _, ComponentName, DataRow, DataTable, EntityPath, MsgId, TimeType, Timeline,
};

// ---

#[cfg(not(debug_assertions))]
const NUM_FRAMES: i64 = 100;
const NUM_ROWS: i64 = 1_000;
#[cfg(not(debug_assertions))]
const NUM_RECTS: i64 = 100;
const NUM_INSTANCES: i64 = 1_000;

// `cargo test` also runs the benchmark setup code, so keep the sizes minimal in debug builds to make sure it runs quickly:
#[cfg(debug_assertions)]
const NUM_FRAMES: i64 = 1;
const NUM_ROWS: i64 = 1;
#[cfg(debug_assertions)]
const NUM_RECTS: i64 = 1;
const NUM_INSTANCES: i64 = 1;

// --- Benchmarks ---

// TODO(cmc): need additional benches for full tables

fn insert(c: &mut Criterion) {
{
let rows = build_rows(NUM_RECTS as usize);
let mut group = c.benchmark_group("datastore/insert/batch/rects");
for packed in [false, true] {
let mut group = c.benchmark_group(format!(
"datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/insert"
));
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
(NUM_INSTANCES * NUM_ROWS) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_rows(Default::default(), InstanceKey::name(), rows.iter()));

let table = build_table(NUM_INSTANCES as usize, packed);

// Default config
group.bench_function("default", |b| {
b.iter(|| insert_table(Default::default(), InstanceKey::name(), &table));
});

// Emulate more or less buckets
let num_rows_per_bucket = [0, 2, 32, 2048];
for num_rows_per_bucket in num_rows_per_bucket {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
b.iter(|| {
insert_table(
DataStoreConfig {
index_bucket_nb_rows: num_rows_per_bucket,
component_bucket_nb_rows: num_rows_per_bucket,
index_bucket_size_bytes: u64::MAX,
component_bucket_size_bytes: u64::MAX,
..Default::default()
},
InstanceKey::name(),
&table,
)
});
});
}
}
}

fn latest_at_batch(c: &mut Criterion) {
{
let rows = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), rows.iter());
let mut group = c.benchmark_group("datastore/latest_at/batch/rects");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("query", |b| {
b.iter(|| {
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
.as_ref()
.unwrap()
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
assert_eq!(NUM_RECTS as usize, rects.len());
});
/// Benchmarks latest-at queries of the `Rect2D` component.
///
/// One benchmark group is created per `packed` setting (the flag is forwarded
/// to `build_table`). Within each group the same table is inserted into stores
/// with different configurations, and the query is timed against each store:
///
/// * `default`: a store created with `Default::default()` as its config.
/// * `bucketsz=N`: a store whose per-bucket row limits are set to `N`.
///
/// Stores are always built outside of the timed closure, so only the query
/// (plus the sanity assertions on its result) is measured.
fn latest_at(c: &mut Criterion) {
    for packed in [false, true] {
        let mut group = c.benchmark_group(format!(
            "datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/latest_at"
        ));
        // A single latest-at query is expected to yield `NUM_INSTANCES` rects
        // (asserted below), hence the per-iteration element count.
        group.throughput(criterion::Throughput::Elements(NUM_INSTANCES as _));

        let table = build_table(NUM_INSTANCES as usize, packed);

        // Default config.
        //
        // The store is populated once, up front: this group measures queries, not
        // insertions (insertion has its own benchmark in `insert`).
        let store = insert_table(Default::default(), InstanceKey::name(), &table);
        group.bench_function("default", |b| {
            b.iter(|| {
                let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
                let rects = results[0]
                    .as_ref()
                    .unwrap()
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .unwrap();
                assert_eq!(NUM_INSTANCES as usize, rects.len());
            });
        });

        // Emulate more or fewer buckets.
        //
        // NOTE(review): the byte-size limits are set to `u64::MAX`, presumably so that
        // only the row-count limits decide when a bucket is split -- confirm against
        // `DataStoreConfig`.
        let num_rows_per_bucket = [0, 2, 32, 2048];
        for num_rows_per_bucket in num_rows_per_bucket {
            let store = insert_table(
                DataStoreConfig {
                    index_bucket_nb_rows: num_rows_per_bucket,
                    component_bucket_nb_rows: num_rows_per_bucket,
                    index_bucket_size_bytes: u64::MAX,
                    component_bucket_size_bytes: u64::MAX,
                    ..Default::default()
                },
                InstanceKey::name(),
                &table,
            );
            group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
                b.iter(|| {
                    let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
                    let rects = results[0]
                        .as_ref()
                        .unwrap()
                        .as_any()
                        .downcast_ref::<UnionArray>()
                        .unwrap();
                    assert_eq!(NUM_INSTANCES as usize, rects.len());
                });
            });
        }
    }
}

fn latest_at_missing_components(c: &mut Criterion) {
// Simulate the worst possible case: many many buckets.
let config = DataStoreConfig {
index_bucket_size_bytes: 0,
index_bucket_nb_rows: 0,
..Default::default()
};

{
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config.clone(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("primary", |b| {
fn latest_at_missing(c: &mut Criterion) {
for packed in [false, true] {
let mut group = c.benchmark_group(format!(
"datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/latest_at_missing"
));
group.throughput(criterion::Throughput::Elements(NUM_INSTANCES as _));

let table = build_table(NUM_INSTANCES as usize, packed);

// Default config
let store = insert_table(Default::default(), InstanceKey::name(), &table);
group.bench_function("primary/default", |b| {
b.iter(|| {
let results =
latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
assert!(results[0].is_none());
});
});
}

{
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config, InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("secondaries", |b| {
group.bench_function("secondaries/default", |b| {
b.iter(|| {
let results = latest_data_at(
&store,
Expand All @@ -105,69 +142,123 @@ fn latest_at_missing_components(c: &mut Criterion) {
assert!(results[2].is_none());
});
});
}
}

fn range_batch(c: &mut Criterion) {
{
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/range/batch/rects");
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
));
group.bench_function("query", |b| {
b.iter(|| {
let msgs = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, results)) in msgs.enumerate() {
let time = time.unwrap();
assert_eq!(cur_time as i64, time.as_i64());

// Emulate more or less buckets
let num_rows_per_bucket = [0, 2, 32, 2048];
for num_rows_per_bucket in num_rows_per_bucket {
let store = insert_table(
DataStoreConfig {
index_bucket_nb_rows: num_rows_per_bucket,
component_bucket_nb_rows: num_rows_per_bucket,
index_bucket_size_bytes: u64::MAX,
component_bucket_size_bytes: u64::MAX,
..Default::default()
},
InstanceKey::name(),
&table,
);
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
b.iter(|| {
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
.as_ref()
.unwrap()
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
assert_eq!(NUM_RECTS as usize, rects.len());
}
assert_eq!(NUM_INSTANCES as usize, rects.len());
});
});
}
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Benchmarks range queries of the `Rect2D` component.
///
/// One benchmark group is created per `packed` setting (the flag is forwarded
/// to `build_table`). Within each group the same table is inserted into stores
/// with different configurations, and a full `range_data` iteration is timed
/// against each store:
///
/// * `default`: a store created with `Default::default()` as its config.
/// * `bucketsz=N`: a store whose per-bucket row limits are set to `N`.
///
/// Stores are always built outside of the timed closure, so only the query
/// (plus the sanity assertions on its results) is measured.
fn range(c: &mut Criterion) {
    for packed in [false, true] {
        let mut group = c.benchmark_group(format!(
            "datastore/num_rows={NUM_ROWS}/num_instances={NUM_INSTANCES}/packed={packed}/range"
        ));
        // Each yielded result is asserted below to hold `NUM_INSTANCES` rects; the
        // throughput assumes one such result per inserted row.
        group.throughput(criterion::Throughput::Elements(
            (NUM_INSTANCES * NUM_ROWS) as _,
        ));

        let table = build_table(NUM_INSTANCES as usize, packed);

        // Default config.
        //
        // The store is populated once, up front: this group measures range queries,
        // not insertions (insertion has its own benchmark in `insert`).
        let store = insert_table(Default::default(), InstanceKey::name(), &table);
        group.bench_function("default", |b| {
            b.iter(|| {
                let msgs = range_data(&store, [Rect2D::name()]);
                for (cur_time, (time, results)) in msgs.enumerate() {
                    // Results are expected to come back in order, one per frame number.
                    let time = time.unwrap();
                    assert_eq!(cur_time as i64, time.as_i64());

                    let rects = results[0]
                        .as_ref()
                        .unwrap()
                        .as_any()
                        .downcast_ref::<UnionArray>()
                        .unwrap();
                    assert_eq!(NUM_INSTANCES as usize, rects.len());
                }
            });
        });

        // Emulate more or fewer buckets.
        //
        // NOTE(review): the byte-size limits are set to `u64::MAX`, presumably so that
        // only the row-count limits decide when a bucket is split -- confirm against
        // `DataStoreConfig`.
        let num_rows_per_bucket = [0, 2, 32, 2048];
        for num_rows_per_bucket in num_rows_per_bucket {
            let store = insert_table(
                DataStoreConfig {
                    index_bucket_nb_rows: num_rows_per_bucket,
                    component_bucket_nb_rows: num_rows_per_bucket,
                    index_bucket_size_bytes: u64::MAX,
                    component_bucket_size_bytes: u64::MAX,
                    ..Default::default()
                },
                InstanceKey::name(),
                &table,
            );
            group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
                b.iter(|| {
                    let msgs = range_data(&store, [Rect2D::name()]);
                    for (cur_time, (time, results)) in msgs.enumerate() {
                        // Results are expected to come back in order, one per frame number.
                        let time = time.unwrap();
                        assert_eq!(cur_time as i64, time.as_i64());

                        let rects = results[0]
                            .as_ref()
                            .unwrap()
                            .as_any()
                            .downcast_ref::<UnionArray>()
                            .unwrap();
                        assert_eq!(NUM_INSTANCES as usize, rects.len());
                    }
                });
            });
        }
    }
}

criterion_group!(
benches,
insert,
latest_at_batch,
latest_at_missing_components,
range_batch,
);
criterion_group!(benches, insert, latest_at, latest_at_missing, range);
criterion_main!(benches);

// --- Helpers ---

fn build_rows(n: usize) -> Vec<DataRow> {
(0..NUM_FRAMES)
.map(move |frame_idx| {
fn build_table(n: usize, packed: bool) -> DataTable {
let mut table = DataTable::from_rows(
MsgId::ZERO,
(0..NUM_ROWS).map(move |frame_idx| {
DataRow::from_cells2(
MsgId::random(),
"rects",
[build_frame_nr(frame_idx.into())],
n as _,
(build_some_instances(n), build_some_rects(n)),
)
})
.collect()
}),
);

// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(MsgId::ZERO, &schema, &columns).unwrap();
}

table
}

fn insert_rows<'a>(
fn insert_table(
config: DataStoreConfig,
cluster_key: ComponentName,
rows: impl Iterator<Item = &'a DataRow>,
table: &DataTable,
) -> DataStore {
let mut store = DataStore::new(cluster_key, config);
rows.for_each(|row| store.insert_row(row).unwrap());
store.insert_table(table).unwrap();
store
}

Expand All @@ -177,7 +268,7 @@ fn latest_data_at<const N: usize>(
secondaries: &[ComponentName; N],
) -> [Option<Box<dyn Array>>; N] {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let timeline_query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES / 2).into());
let timeline_query = LatestAtQuery::new(timeline_frame_nr, (NUM_ROWS / 2).into());
let ent_path = EntityPath::from("rects");

let row_indices = store
Expand All @@ -191,10 +282,7 @@ fn range_data<const N: usize>(
components: [ComponentName; N],
) -> impl Iterator<Item = (Option<TimeInt>, [Option<Box<dyn Array>>; N])> + '_ {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery::new(
timeline_frame_nr,
TimeRange::new(0.into(), NUM_FRAMES.into()),
);
let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(0.into(), NUM_ROWS.into()));
let ent_path = EntityPath::from("rects");

store
Expand Down