
Commit

feat: use many tasks to order streams and discover undelivered events at startup (#620)

* feat: use a reader and a writer task to process undelivered events at startup

This helps because we can do all the sorting/reading of event history in one task while the other finds new events that need to be added. It mirrors the existing insert/ordering task flow.

* chore: add more undelivered startup tests

* chore: clippy

* feat: use multiple tasks to read events during ordering

We can process each stream individually, so we spawn tasks to handle batches of streams, letting the DB reads run in parallel (a rough sketch follows this change list).

* feat: use multiple tasks to order events for streams

* chore: fix nonsense doc comment

* chore: reduce ordering task message to debug on startup

* chore: one more info -> debug

* chore: fmt
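
Taken together, these changes split startup ordering into partitioned reader tasks that feed a single ordering/writer task over a bounded channel. Below is a minimal sketch of that shape, not the real `ordering_task.rs` implementation; `StreamBatch` and `fetch_stream_batch` are hypothetical stand-ins for illustration only.

use tokio::sync::mpsc;

// Hypothetical stand-in for one reader's batch of undelivered events.
struct StreamBatch {
    task_id: u32,
    rowids: Vec<u64>,
}

// Hypothetical reader: fetch only this task's partition of the table
// (e.g. rowid % num_tasks == task_id, as in the SQL change below).
async fn fetch_stream_batch(task_id: u32, num_tasks: u32) -> StreamBatch {
    let rowids = (1..=20u64)
        .filter(|r| r % num_tasks as u64 == task_id as u64)
        .collect();
    StreamBatch { task_id, rowids }
}

#[tokio::main]
async fn main() {
    let num_tasks: u32 = 4;
    // Bounded channel: senders wait for room instead of dropping events.
    let (tx, mut rx) = mpsc::channel::<StreamBatch>(16);

    // Reader tasks run DB reads in parallel, one disjoint partition each.
    for task_id in 0..num_tasks {
        let tx = tx.clone();
        tokio::spawn(async move {
            let batch = fetch_stream_batch(task_id, num_tasks).await;
            tx.send(batch).await.expect("ordering task stopped");
        });
    }
    drop(tx); // the channel closes once every reader finishes

    // Single ordering/writer task: sorts streams and marks events delivered.
    while let Some(batch) = rx.recv().await {
        println!("ordering {} rows from reader {}", batch.rowids.len(), batch.task_id);
    }
}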
dav1do authored Dec 5, 2024
1 parent 1206a48 commit c959cc3
Showing 9 changed files with 449 additions and 136 deletions.
4 changes: 2 additions & 2 deletions anchor-service/src/anchor_batch.rs
@@ -290,13 +290,13 @@ mod tests {
         );
         let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
         // let mut shutdown_signal = shutdown_signal_rx.resubscribe();
-        Some(tokio::spawn(async move {
+        tokio::spawn(async move {
             anchor_service
                 .run(async move {
                     let _ = shutdown_signal.recv().await;
                 })
                 .await
-        }));
+        });
         while event_service.events.lock().unwrap().is_empty() {
             sleep(Duration::from_millis(1)).await;
         }
556 changes: 430 additions & 126 deletions event-svc/src/event/ordering_task.rs

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions event-svc/src/event/service.rs
@@ -27,15 +27,16 @@ use crate::{Error, Result};
 /// How many events to select at once to see if they've become deliverable when we have downtime
 /// Used at startup and occasionally in case we ever dropped something
 /// We keep the number small for now as we may need to traverse many prevs for each one of these and load them into memory.
-const DELIVERABLE_EVENTS_BATCH_SIZE: u32 = 1000;
+const DELIVERABLE_EVENTS_BATCH_SIZE: u32 = 250;
 
 /// How many batches of undelivered events are we willing to process on start up?
 /// To avoid an infinite loop. It's going to take a long time to process `DELIVERABLE_EVENTS_BATCH_SIZE * MAX_ITERATIONS` events
 const MAX_ITERATIONS: usize = 100_000_000;
 
-/// The max number of events we can have pending for delivery in the channel before we start dropping them.
-/// This is currently 304 bytes per event, so this is 3 MB of data
-const PENDING_EVENTS_CHANNEL_DEPTH: usize = 1_000_000;
+/// The max number of events we can have pending for delivery in the channel.
+/// This is currently 304 bytes per event, but the task may have more in memory so to avoid growing indefinitely we apply backpressure
+/// as we no longer use `try_send` and will wait for room to be available before accepting data.
+pub(crate) const PENDING_EVENTS_CHANNEL_DEPTH: usize = 10_000;
 
 /// The max number of events that can be waiting on a previous event before we can validate them
 /// that are allowed to live in memory. This is possible during recon conversations where we discover things out of order,
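
The new doc comment above is the heart of the drop from 1_000_000 to 10_000: the sender used to `try_send` and drop events when the channel filled, while an awaited `send` applies backpressure instead. A small sketch of that difference with tokio's bounded mpsc channel (the depth of 2 is illustrative only):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    // Old behavior: `try_send` errors as soon as the channel is full,
    // which is why the depth had to be huge to avoid dropping events.
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    assert!(tx.try_send(3).is_err()); // full; this event would be dropped

    // New behavior: `send().await` suspends until the receiver makes room,
    // so a small fixed depth bounds memory without losing any events.
    let pending = tokio::spawn(async move {
        tx.send(3).await.unwrap(); // waits here until a slot frees up
    });

    assert_eq!(rx.recv().await, Some(1)); // frees a slot; the send completes
    pending.await.unwrap();
    assert_eq!(rx.recv().await, Some(2));
    assert_eq!(rx.recv().await, Some(3));
}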
5 changes: 5 additions & 0 deletions event-svc/src/store/sql/access/event.rs
@@ -446,7 +446,10 @@ impl EventAccess {
         &self,
         highwater_mark: i64,
         limit: i64,
+        num_tasks: u32,
+        task_id: u32,
     ) -> Result<(Vec<(Cid, unvalidated::Event<Ipld>)>, i64)> {
+        debug_assert!(task_id < num_tasks, "task_id must be in 0..num_tasks");
         struct UndeliveredEventBlockRow {
             block: ReconEventBlockRaw,
             row_id: i64,
@@ -466,6 +469,8 @@
             sqlx::query_as(EventQuery::undelivered_with_values())
                 .bind(highwater_mark)
                 .bind(limit)
+                .bind(num_tasks)
+                .bind(task_id)
                 .fetch_all(self.pool.reader())
                 .await?;
 
3 changes: 3 additions & 0 deletions event-svc/src/store/sql/query.rs
@@ -84,6 +84,8 @@ impl EventQuery {
     /// Requires binding two parameters:
     /// $1: limit (i64)
     /// $2: rowid (i64)
+    /// $3: number_of_tasks/partitions (i32)
+    /// $4: task_id (i32)
     pub fn undelivered_with_values() -> &'static str {
         r#"SELECT
             key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid
@@ -94,6 +96,7 @@
         WHERE
             EXISTS (SELECT 1 FROM ceramic_one_event_block where event_cid = e.cid)
             AND e.delivered IS NULL and e.rowid > $1
+            AND (e.rowid % $3) = $4
         LIMIT
            $2
        ) key
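
The added `(e.rowid % $3) = $4` predicate is what lets the reader tasks share the table without overlap: each of `num_tasks` readers sees a disjoint residue class of rowids. A tiny illustration in plain Rust (the values are arbitrary):

// Each task filters rowids by its residue class, so every undelivered
// row is read by exactly one task and the partitions are disjoint.
fn main() {
    let num_tasks: u32 = 4;
    for task_id in 0..num_tasks {
        let mine: Vec<u32> = (1..=12)
            .filter(|rowid| rowid % num_tasks == task_id)
            .collect();
        println!("task {task_id} reads rowids {mine:?}"); // task 1 -> [1, 5, 9]
    }
}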
2 changes: 1 addition & 1 deletion event-svc/src/store/sql/test.rs
@@ -154,7 +154,7 @@ async fn undelivered_with_values() {
     let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
 
     let (res, hw) = event_access
-        .undelivered_with_values(0, 10000)
+        .undelivered_with_values(0, 10000, 1, 0)
         .await
         .unwrap();
     assert_eq!(res.len(), 0);
2 changes: 1 addition & 1 deletion event-svc/src/tests/event.rs
@@ -786,7 +786,7 @@ fn events_to_table(conclusion_events: &[ConclusionEvent]) -> String {
                 .join(", ")
             )),
             Cell::new(&event_cid.to_string()),
-            Cell::new(&data),
+            Cell::new(data),
             Cell::new(&previous),
         ]));
     }
2 changes: 1 addition & 1 deletion event-svc/src/tests/mod.rs
@@ -58,7 +58,7 @@ pub(crate) fn random_cid() -> Cid {
     Cid::new_v1(0x00, hash)
 }
 pub(crate) fn deterministic_cid(data: &[u8]) -> Cid {
-    let hash = MultihashDigest::digest(&Code::Sha2_256, &data);
+    let hash = MultihashDigest::digest(&Code::Sha2_256, data);
     Cid::new_v1(0x00, hash)
 }
 
2 changes: 1 addition & 1 deletion flight/tests/server.rs
@@ -69,7 +69,7 @@ async fn execute_flight(
         batches.append(&mut endpoint_batches);
     }
 
-    Ok(concat_batches(&schema, &batches).context("concat_batches for output")?)
+    concat_batches(&schema, &batches).context("concat_batches for output")
 }
 
 mock! {
