diff --git a/event-svc/src/event/ordering_task.rs b/event-svc/src/event/ordering_task.rs index 55c2d57e..5a133e4c 100644 --- a/event-svc/src/event/ordering_task.rs +++ b/event-svc/src/event/ordering_task.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::anyhow; use ceramic_event::unvalidated; +use ceramic_sql::sqlite::SqliteConnection; use cid::Cid; use ipld_core::ipld::Ipld; use tokio::sync::mpsc::Sender; @@ -157,16 +158,25 @@ impl StreamEvent { } /// Builds a stream event from the database if it exists. - async fn load_by_cid(event_access: Arc, cid: EventCid) -> Result> { + async fn load_by_cid( + event_access: &Arc, + conn: &mut SqliteConnection, + cid: EventCid, + ) -> Result> { // TODO: Condense the multiple DB queries happening here into a single query - let (exists, deliverable) = event_access.deliverable_by_cid(&cid).await?; + let (exists, deliverable) = event_access + .deliverable_by_cid_with_conn(&cid, conn) + .await?; if exists { - let data = event_access.value_by_cid(&cid).await?.ok_or_else(|| { - Error::new_app(anyhow!( - "Missing event data for event that must exist: CID={}", - cid - )) - })?; + let data = event_access + .value_by_cid_with_conn(&cid, conn) + .await? + .ok_or_else(|| { + Error::new_app(anyhow!( + "Missing event data for event that must exist: CID={}", + cid + )) + })?; let (_cid, parsed) = unvalidated::Event::::decode_car(data.as_slice(), false) .map_err(Error::new_app)?; @@ -383,6 +393,8 @@ impl StreamEvents { debug!(count=%undelivered_q.len(), "undelivered events to process"); + // use one connection for all reads in the loop. will close when dropped + let mut conn = event_access.detach_ro_connection().await?; while let Some(StreamEventMetadata { cid: undelivered_cid, prev: desired_prev, @@ -405,7 +417,7 @@ impl StreamEvents { // nothing to do until it arrives on the channel } } else if let Some(discovered_prev) = - StreamEvent::load_by_cid(Arc::clone(&event_access), desired_prev).await? + StreamEvent::load_by_cid(&event_access, &mut conn, desired_prev).await? { match &discovered_prev { // we found our prev in the database and it's deliverable, so we're deliverable now @@ -535,14 +547,15 @@ impl OrderingState { tx: Sender, ) -> Result { info!("Attempting to process all undelivered events. This could take some time."); - let mut state = Self::new(); let mut iter_cnt = 0; let mut event_cnt = 0; let mut highwater = 0; + // use one connection for all our reads in this loop + let mut conn = event_access.detach_ro_connection().await?; while iter_cnt < max_iterations { iter_cnt += 1; let (undelivered, new_hw) = event_access - .undelivered_with_values(highwater, batch_size.into()) + .undelivered_with_values(&mut conn, highwater, batch_size.into()) .await?; highwater = new_hw; let found_something = !undelivered.is_empty(); @@ -553,9 +566,12 @@ impl OrderingState { // at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory. // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them // or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay. - let number_processed = state - .process_undelivered_events_batch(&event_access, undelivered, &tx) - .await?; + let number_processed = OrderingState::process_undelivered_events_batch( + &event_access, + undelivered, + &tx, + ) + .await?; event_cnt += number_processed; if event_cnt % LOG_EVERY_N_ENTRIES < number_processed { info!(count=%event_cnt, highwater=%new_hw, "Processed undelivered events"); @@ -573,7 +589,6 @@ impl OrderingState { } async fn process_undelivered_events_batch( - &mut self, event_access: &EventAccess, event_data: Vec<(Cid, unvalidated::Event)>, tx: &Sender, diff --git a/event-svc/src/store/sql/access/event.rs b/event-svc/src/store/sql/access/event.rs index 131705c8..92d865eb 100644 --- a/event-svc/src/store/sql/access/event.rs +++ b/event-svc/src/store/sql/access/event.rs @@ -8,7 +8,7 @@ use anyhow::anyhow; use ceramic_anchor_service::AnchorRequest; use ceramic_core::{event_id::InvalidEventId, Cid, EventId, NodeId}; use ceramic_event::unvalidated; -use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction}; +use ceramic_sql::sqlite::{SqliteConnection, SqlitePool, SqliteTransaction}; use ipld_core::ipld::Ipld; use itertools::Itertools; use recon::{AssociativeHash, HashCount, Key, Sha256a}; @@ -166,6 +166,12 @@ impl EventAccess { self.pool.begin_tx().await } + /// Remove a readonly connection from the pool (permanently). + /// It may exceed max connections setting in the meantime. + pub async fn detach_ro_connection(&self) -> Result { + self.pool.detach_ro_connection().await + } + /// Get the current highwater mark for delivered events. pub async fn get_highwater_mark(&self) -> Result { Ok(self.delivered_counter.load(Ordering::Relaxed)) @@ -432,6 +438,7 @@ impl EventAccess { /// The highwater mark can be used on the next call to get the next batch of events and will be 0 when done. pub async fn undelivered_with_values( &self, + conn: &mut SqliteConnection, highwater_mark: i64, limit: i64, ) -> Result<(Vec<(Cid, unvalidated::Event)>, i64)> { @@ -454,7 +461,7 @@ impl EventAccess { sqlx::query_as(EventQuery::undelivered_with_values()) .bind(highwater_mark) .bind(limit) - .fetch_all(self.pool.reader()) + .fetch_all(conn.inner()) .await?; let max_highwater = all_blocks.iter().map(|row| row.row_id).max().unwrap_or(0); // if there's nothing in the list we just return 0 @@ -472,6 +479,18 @@ impl EventAccess { rebuild_car(blocks).await } + /// Finds the event data for the given CIDs i.e. the root CID in the carfile of the event. + pub async fn value_by_cid_conn( + conn: &mut sqlx::SqliteConnection, + key: &Cid, + ) -> Result>> { + let blocks: Vec = sqlx::query_as(EventQuery::value_blocks_by_cid_one()) + .bind(key.to_bytes()) + .fetch_all(conn) + .await?; + rebuild_car(blocks).await + } + /// Finds the event data by a given CID i.e. the root CID in the carfile of the event. pub async fn value_by_cid(&self, key: &Cid) -> Result>> { let blocks: Vec = sqlx::query_as(EventQuery::value_blocks_by_cid_one()) @@ -481,6 +500,19 @@ impl EventAccess { rebuild_car(blocks).await } + /// Finds the event data by a given CID i.e. the root CID in the carfile of the event using a provided connection + pub async fn value_by_cid_with_conn( + &self, + key: &Cid, + conn: &mut SqliteConnection, + ) -> Result>> { + let blocks: Vec = sqlx::query_as(EventQuery::value_blocks_by_cid_one()) + .bind(key.to_bytes()) + .fetch_all(conn.inner()) + .await?; + rebuild_car(blocks).await + } + /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered. /// returns (bool, bool) = (exists, deliverable) /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could. @@ -497,6 +529,26 @@ impl EventAccess { Ok(exist.map_or((false, false), |row| (row.exists, row.delivered))) } + /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered. + /// returns (bool, bool) = (exists, deliverable) + /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could. + pub async fn deliverable_by_cid_with_conn( + &self, + key: &Cid, + conn: &mut SqliteConnection, + ) -> Result<(bool, bool)> { + #[derive(sqlx::FromRow)] + struct CidExists { + exists: bool, + delivered: bool, + } + let exist: Option = sqlx::query_as(EventQuery::value_delivered_by_cid()) + .bind(key.to_bytes()) + .fetch_optional(conn.inner()) + .await?; + Ok(exist.map_or((false, false), |row| (row.exists, row.delivered))) + } + /// Fetch data event CIDs from a specified source that are above the current anchoring high water mark pub async fn data_events_by_informant( &self, diff --git a/event-svc/src/store/sql/test.rs b/event-svc/src/store/sql/test.rs index 90b51cd9..06aa9453 100644 --- a/event-svc/src/store/sql/test.rs +++ b/event-svc/src/store/sql/test.rs @@ -155,8 +155,9 @@ async fn undelivered_with_values() { let pool = SqlitePool::connect_in_memory().await.unwrap(); let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap()); + let mut conn = event_access.detach_ro_connection().await.unwrap(); let (res, hw) = event_access - .undelivered_with_values(0, 10000) + .undelivered_with_values(&mut conn, 0, 10000) .await .unwrap(); assert_eq!(res.len(), 0); diff --git a/sql/src/sqlite.rs b/sql/src/sqlite.rs index 11ca3144..7ee154ad 100644 --- a/sql/src/sqlite.rs +++ b/sql/src/sqlite.rs @@ -139,6 +139,12 @@ impl SqlitePool { Ok(SqliteTransaction { tx }) } + /// Remove a connection from the pool (permanently). It may exceed max connections setting in the meantime. + pub async fn detach_ro_connection(&self) -> Result { + let conn = self.reader.acquire().await?.detach(); + Ok(SqliteConnection { conn }) + } + /// Get a reference to the writer database pool. The writer pool has only one connection. /// If you are going to do multiple writes in a row, instead use `tx` and `commit`. pub fn writer(&self) -> &sqlx::SqlitePool { @@ -157,6 +163,19 @@ impl SqlitePool { } } +#[derive(Debug)] +/// A wrapper around a sqlx Sqlite connection +pub struct SqliteConnection { + conn: sqlx::SqliteConnection, +} + +impl SqliteConnection { + /// Access to the `sqlx::SqliteConnection` directly + pub fn inner(&mut self) -> &mut sqlx::SqliteConnection { + &mut self.conn + } +} + #[derive(Debug)] /// A wrapper around a sqlx Sqlite transaction pub struct SqliteTransaction<'a> { @@ -176,6 +195,7 @@ impl<'a> SqliteTransaction<'a> { Ok(()) } + /// Access to the `sqlx::Transaction` directly pub fn inner(&mut self) -> &mut Transaction<'a, Sqlite> { &mut self.tx }