Skip to content

Commit

Permalink
chore: hack to try reusing a sqlite connection for undelivered processing
Browse files Browse the repository at this point in the history
  • Loading branch information
dav1do committed Dec 3, 2024
1 parent a5a6624 commit 1d03275
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 18 deletions.
45 changes: 30 additions & 15 deletions event-svc/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use anyhow::anyhow;
use ceramic_event::unvalidated;
use ceramic_sql::sqlite::SqliteConnection;
use cid::Cid;
use ipld_core::ipld::Ipld;
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -157,16 +158,25 @@ impl StreamEvent {
}

/// Builds a stream event from the database if it exists.
async fn load_by_cid(event_access: Arc<EventAccess>, cid: EventCid) -> Result<Option<Self>> {
async fn load_by_cid(
event_access: &Arc<EventAccess>,
conn: &mut SqliteConnection,
cid: EventCid,
) -> Result<Option<Self>> {
// TODO: Condense the multiple DB queries happening here into a single query
let (exists, deliverable) = event_access.deliverable_by_cid(&cid).await?;
let (exists, deliverable) = event_access
.deliverable_by_cid_with_conn(&cid, conn)
.await?;
if exists {
let data = event_access.value_by_cid(&cid).await?.ok_or_else(|| {
Error::new_app(anyhow!(
"Missing event data for event that must exist: CID={}",
cid
))
})?;
let data = event_access
.value_by_cid_with_conn(&cid, conn)
.await?
.ok_or_else(|| {
Error::new_app(anyhow!(
"Missing event data for event that must exist: CID={}",
cid
))
})?;
let (_cid, parsed) = unvalidated::Event::<Ipld>::decode_car(data.as_slice(), false)
.map_err(Error::new_app)?;

Expand Down Expand Up @@ -383,6 +393,8 @@ impl StreamEvents {

debug!(count=%undelivered_q.len(), "undelivered events to process");

// use one connection for all reads in the loop. will close when dropped
let mut conn = event_access.detach_ro_connection().await?;
while let Some(StreamEventMetadata {
cid: undelivered_cid,
prev: desired_prev,
Expand All @@ -405,7 +417,7 @@ impl StreamEvents {
// nothing to do until it arrives on the channel
}
} else if let Some(discovered_prev) =
StreamEvent::load_by_cid(Arc::clone(&event_access), desired_prev).await?
StreamEvent::load_by_cid(&event_access, &mut conn, desired_prev).await?
{
match &discovered_prev {
// we found our prev in the database and it's deliverable, so we're deliverable now
Expand Down Expand Up @@ -535,14 +547,15 @@ impl OrderingState {
tx: Sender<DiscoveredEvent>,
) -> Result<usize> {
info!("Attempting to process all undelivered events. This could take some time.");
let mut state = Self::new();
let mut iter_cnt = 0;
let mut event_cnt = 0;
let mut highwater = 0;
// use one connection for all our reads in this loop
let mut conn = event_access.detach_ro_connection().await?;
while iter_cnt < max_iterations {
iter_cnt += 1;
let (undelivered, new_hw) = event_access
.undelivered_with_values(highwater, batch_size.into())
.undelivered_with_values(&mut conn, highwater, batch_size.into())
.await?;
highwater = new_hw;
let found_something = !undelivered.is_empty();
Expand All @@ -553,9 +566,12 @@ impl OrderingState {
// at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory.
// In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
// or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
let number_processed = state
.process_undelivered_events_batch(&event_access, undelivered, &tx)
.await?;
let number_processed = OrderingState::process_undelivered_events_batch(
&event_access,
undelivered,
&tx,
)
.await?;
event_cnt += number_processed;
if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
info!(count=%event_cnt, highwater=%new_hw, "Processed undelivered events");
Expand All @@ -573,7 +589,6 @@ impl OrderingState {
}

async fn process_undelivered_events_batch(
&mut self,
event_access: &EventAccess,
event_data: Vec<(Cid, unvalidated::Event<Ipld>)>,
tx: &Sender<DiscoveredEvent>,
Expand Down
56 changes: 54 additions & 2 deletions event-svc/src/store/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::anyhow;
use ceramic_anchor_service::AnchorRequest;
use ceramic_core::{event_id::InvalidEventId, Cid, EventId, NodeId};
use ceramic_event::unvalidated;
use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction};
use ceramic_sql::sqlite::{SqliteConnection, SqlitePool, SqliteTransaction};
use ipld_core::ipld::Ipld;
use itertools::Itertools;
use recon::{AssociativeHash, HashCount, Key, Sha256a};
Expand Down Expand Up @@ -166,6 +166,12 @@ impl EventAccess {
self.pool.begin_tx().await
}

/// Hands the caller a read-only connection that has been permanently removed from the pool.
///
/// This simply forwards to the pool's `detach_ro_connection`. While the detached
/// connection is alive the process may hold more connections than the pool's
/// max connections setting allows.
///
/// # Errors
/// Returns whatever error the underlying pool call reports.
pub async fn detach_ro_connection(&self) -> Result<SqliteConnection> {
    let pool = &self.pool;
    pool.detach_ro_connection().await
}

/// Get the current highwater mark for delivered events.
pub async fn get_highwater_mark(&self) -> Result<i64> {
Ok(self.delivered_counter.load(Ordering::Relaxed))
Expand Down Expand Up @@ -432,6 +438,7 @@ impl EventAccess {
/// The highwater mark can be used on the next call to get the next batch of events and will be 0 when done.
pub async fn undelivered_with_values(
&self,
conn: &mut SqliteConnection,
highwater_mark: i64,
limit: i64,
) -> Result<(Vec<(Cid, unvalidated::Event<Ipld>)>, i64)> {
Expand All @@ -454,7 +461,7 @@ impl EventAccess {
sqlx::query_as(EventQuery::undelivered_with_values())
.bind(highwater_mark)
.bind(limit)
.fetch_all(self.pool.reader())
.fetch_all(conn.inner())
.await?;

let max_highwater = all_blocks.iter().map(|row| row.row_id).max().unwrap_or(0); // if there's nothing in the list we just return 0
Expand All @@ -472,6 +479,18 @@ impl EventAccess {
rebuild_car(blocks).await
}

/// Looks up the event data for a single CID, i.e. the root CID in the carfile of the event,
/// running the query on a caller-supplied raw `sqlx` connection.
///
/// # Arguments
/// * `conn` - connection the block query is executed on.
/// * `key` - CID whose bytes are bound as the only query parameter.
///
/// # Returns
/// The output of `rebuild_car` applied to the block rows the query returned.
///
/// # Errors
/// Propagates a failure of the query or of `rebuild_car`.
pub async fn value_by_cid_conn(
    conn: &mut sqlx::SqliteConnection,
    key: &Cid,
) -> Result<Option<Vec<u8>>> {
    let cid_bytes = key.to_bytes();
    let rows = sqlx::query_as::<_, BlockRow>(EventQuery::value_blocks_by_cid_one())
        .bind(cid_bytes)
        .fetch_all(conn)
        .await?;
    rebuild_car(rows).await
}

/// Finds the event data by a given CID i.e. the root CID in the carfile of the event.
pub async fn value_by_cid(&self, key: &Cid) -> Result<Option<Vec<u8>>> {
let blocks: Vec<BlockRow> = sqlx::query_as(EventQuery::value_blocks_by_cid_one())
Expand All @@ -481,6 +500,19 @@ impl EventAccess {
rebuild_car(blocks).await
}

/// Finds the event data by a given CID i.e. the root CID in the carfile of the event using a provided connection
pub async fn value_by_cid_with_conn(
&self,
key: &Cid,
conn: &mut SqliteConnection,
) -> Result<Option<Vec<u8>>> {
let blocks: Vec<BlockRow> = sqlx::query_as(EventQuery::value_blocks_by_cid_one())
.bind(key.to_bytes())
.fetch_all(conn.inner())
.await?;
rebuild_car(blocks).await
}

/// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered.
/// returns (bool, bool) = (exists, deliverable)
/// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could.
Expand All @@ -497,6 +529,26 @@ impl EventAccess {
Ok(exist.map_or((false, false), |row| (row.exists, row.delivered)))
}

/// Reports whether an event is stored and whether it has already been marked delivered,
/// running the lookup on a provided connection.
///
/// Returns `(exists, deliverable)`. When the query yields no row both values are `false`.
/// A `true` deliverable flag only means the event was marked as deliverable, so anything
/// depending on it can be delivered; it does not guarantee a client has seen the event.
///
/// # Errors
/// Propagates a failure of the query.
pub async fn deliverable_by_cid_with_conn(
    &self,
    key: &Cid,
    conn: &mut SqliteConnection,
) -> Result<(bool, bool)> {
    // `sqlx::FromRow` maps result columns to these fields by name, so the
    // field names are part of the query contract.
    #[derive(sqlx::FromRow)]
    struct DeliveryStatus {
        exists: bool,
        delivered: bool,
    }

    let status = sqlx::query_as::<_, DeliveryStatus>(EventQuery::value_delivered_by_cid())
        .bind(key.to_bytes())
        .fetch_optional(conn.inner())
        .await?;

    Ok(match status {
        Some(DeliveryStatus { exists, delivered }) => (exists, delivered),
        None => (false, false),
    })
}

/// Fetch data event CIDs from a specified source that are above the current anchoring high water mark
pub async fn data_events_by_informant(
&self,
Expand Down
3 changes: 2 additions & 1 deletion event-svc/src/store/sql/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ async fn undelivered_with_values() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

let mut conn = event_access.detach_ro_connection().await.unwrap();
let (res, hw) = event_access
.undelivered_with_values(0, 10000)
.undelivered_with_values(&mut conn, 0, 10000)
.await
.unwrap();
assert_eq!(res.len(), 0);
Expand Down
20 changes: 20 additions & 0 deletions sql/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ impl SqlitePool {
Ok(SqliteTransaction { tx })
}

/// Acquires a connection from the reader pool and detaches it, permanently removing it
/// from the pool and handing ownership to the caller.
///
/// While the detached connection is alive the total number of open connections may
/// exceed the max connections setting.
///
/// # Errors
/// Fails if a connection cannot be acquired from the reader pool.
pub async fn detach_ro_connection(&self) -> Result<SqliteConnection> {
    let pooled = self.reader.acquire().await?;
    Ok(SqliteConnection {
        conn: pooled.detach(),
    })
}

/// Get a reference to the writer database pool. The writer pool has only one connection.
/// If you are going to do multiple writes in a row, instead use `tx` and `commit`.
pub fn writer(&self) -> &sqlx::SqlitePool {
Expand All @@ -157,6 +163,19 @@ impl SqlitePool {
}
}

/// Thin owning wrapper around a single `sqlx` Sqlite connection.
#[derive(Debug)]
pub struct SqliteConnection {
    /// The wrapped `sqlx` connection.
    conn: sqlx::SqliteConnection,
}

impl SqliteConnection {
    /// Mutably borrows the wrapped `sqlx::SqliteConnection`, e.g. to pass it to a
    /// `sqlx` query as the executor.
    pub fn inner(&mut self) -> &mut sqlx::SqliteConnection {
        let Self { conn } = self;
        conn
    }
}

#[derive(Debug)]
/// A wrapper around a sqlx Sqlite transaction
pub struct SqliteTransaction<'a> {
Expand All @@ -176,6 +195,7 @@ impl<'a> SqliteTransaction<'a> {
Ok(())
}

/// Access to the `sqlx::Transaction` directly
pub fn inner(&mut self) -> &mut Transaction<'a, Sqlite> {
&mut self.tx
}
Expand Down

0 comments on commit 1d03275

Please sign in to comment.