diff --git a/Dockerfile b/Dockerfile index 667748344..519093dff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM public.ecr.aws/r5b3e0r5/3box/rust-builder:latest as builder +FROM public.ecr.aws/r5b3e0r5/3box/rust-builder:latest AS builder RUN mkdir -p /home/builder/rust-ceramic WORKDIR /home/builder/rust-ceramic diff --git a/api/src/server.rs b/api/src/server.rs index 9e8b0fa55..0de34ece7 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -150,13 +150,7 @@ impl TryFrom for ValidatedInterest { pub trait InterestService: Send + Sync { /// Returns true if the key was newly inserted, false if it already existed. async fn insert(&self, key: Interest) -> Result; - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> Result>; + async fn range(&self, start: &Interest, end: &Interest) -> Result>; } #[async_trait] @@ -165,14 +159,8 @@ impl InterestService for Arc { self.as_ref().insert(key).await } - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> Result> { - self.as_ref().range(start, end, offset, limit).await + async fn range(&self, start: &Interest, end: &Interest) -> Result> { + self.as_ref().range(start, end).await } } @@ -271,8 +259,8 @@ pub trait EventService: Send + Sync { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>>; /** @@ -313,8 +301,8 @@ impl EventService for Arc { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>> { self.as_ref().range_with_values(range, offset, limit).await } @@ -605,12 +593,7 @@ where ) -> Result { let interests = self .interest - .range( - &Interest::min_value(), - &Interest::max_value(), - 0, - usize::MAX, - ) + .range(&Interest::min_value(), &Interest::max_value()) .await .map_err(|e| ErrorResponse::new(format!("failed to get interests: {e}")))?; @@ -654,9 +637,8 @@ where offset: Option, limit: Option, ) -> Result { - let limit: usize = - limit.map_or(10000, |l| if l.is_negative() { 10000 } else { l }) as usize; - let offset = offset.map_or(0, |o| if o.is_negative() { 0 } else { o }) as usize; + let limit = limit.map_or(10000, |l| if l.is_negative() { 10000 } else { l }) as u32; + let offset = offset.map_or(0, |o| if o.is_negative() { 0 } else { o }) as u32; let sep_value = match decode_multibase_data(&sep_value) { Ok(v) => v, Err(e) => return Ok(ExperimentalEventsSepSepValueGetResponse::BadRequest(e)), @@ -674,7 +656,7 @@ where .map(|(id, data)| BuildResponse::event(id, Some(data))) .collect::>(); - let event_cnt = events.len(); + let event_cnt = events.len() as u32; Ok(ExperimentalEventsSepSepValueGetResponse::Success( models::EventsGet { resume_offset: (offset + event_cnt) as i32, diff --git a/api/src/tests.rs b/api/src/tests.rs index 460f8ca7b..3ca80ed6f 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -114,8 +114,6 @@ mock! { &self, start: &Interest, end: &Interest, - offset: usize, - limit: usize, ) -> Result>; } } @@ -128,8 +126,8 @@ mock! { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>>; async fn value_for_order_key(&self, key: &EventId) -> Result>>; async fn value_for_cid(&self, key: &Cid) -> Result>>; @@ -580,11 +578,9 @@ async fn get_interests() { .with( predicate::eq(Interest::min_value()), predicate::eq(Interest::max_value()), - predicate::eq(0), - predicate::eq(usize::MAX), ) .once() - .return_once(move |_, _, _, _| { + .return_once(move |_, _| { Ok(vec![ Interest::builder() .with_sep_key("model") @@ -662,11 +658,9 @@ async fn get_interests_for_peer() { .with( predicate::eq(Interest::min_value()), predicate::eq(Interest::max_value()), - predicate::eq(0), - predicate::eq(usize::MAX), ) .once() - .return_once(move |_, _, _, _| { + .return_once(move |_, _| { Ok(vec![ Interest::builder() .with_sep_key("model") diff --git a/event-svc/src/event/store.rs b/event-svc/src/event/store.rs index 3880889c9..146894d7d 100644 --- a/event-svc/src/event/store.rs +++ b/event-svc/src/event/store.rs @@ -47,9 +47,6 @@ impl recon::Store for EventService { type Key = EventId; type Hash = Sha256a; - /// Insert new keys into the key space. - /// Returns true for each key if it did not previously exist, in the - /// same order as the input iterator. async fn insert_many( &self, items: &[ReconItem], @@ -67,9 +64,6 @@ impl recon::Store for EventService { Ok(res.into()) } - /// Return the hash of all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// Returns ReconResult<(Hash, count), Err> async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { let res = self .event_access @@ -79,52 +73,28 @@ impl recon::Store for EventService { Ok(res) } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { Ok(Box::new( self.event_access - .range(range, offset, limit) + .range(range) .await .map_err(Error::from)? .into_iter(), )) } - - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - Ok(Box::new( - self.event_access - .range_with_values(range, offset, limit) - .await - .map_err(Error::from)? - .into_iter(), - )) + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(self.event_access.first(range).await.map_err(Error::from)?) } - /// Return the number of keys within the range. + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(self.event_access.middle(range).await.map_err(Error::from)?) + } + async fn count(&self, range: Range<&Self::Key>) -> ReconResult { Ok(self.event_access.count(range).await.map_err(Error::from)?) } - - /// value_for_key returns - /// Ok(Some(value)) if stored, - /// Ok(None) if not stored, and - /// Err(e) if retrieving failed. async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { Ok(self .event_access @@ -210,8 +180,8 @@ impl ceramic_api::EventService for EventService { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> anyhow::Result)>> { self.event_access .range_with_values(&range.start..&range.end, offset, limit) diff --git a/event-svc/src/store/metrics.rs b/event-svc/src/store/metrics.rs index 269fc5faa..da9f2aaef 100644 --- a/event-svc/src/store/metrics.rs +++ b/event-svc/src/store/metrics.rs @@ -173,8 +173,8 @@ where async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> anyhow::Result)>> { StoreMetricsMiddleware::::record( &self.metrics, @@ -277,28 +277,14 @@ where async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range", - self.store.range(range, offset, limit), - ) - .await + StoreMetricsMiddleware::::record(&self.metrics, "range", self.store.range(range)).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range_with_values", - self.store.range_with_values(range, offset, limit), - ) - .await + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "first", self.store.first(range)).await + } + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await } async fn full_range( @@ -307,10 +293,6 @@ where StoreMetricsMiddleware::::record(&self.metrics, "full_range", self.store.full_range()) .await } - - async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { - StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await - } async fn count(&self, range: Range<&Self::Key>) -> ReconResult { StoreMetricsMiddleware::::record(&self.metrics, "count", self.store.count(range)).await } diff --git a/event-svc/src/store/sql/access/event.rs b/event-svc/src/store/sql/access/event.rs index 131705c8d..c6f45024f 100644 --- a/event-svc/src/store/sql/access/event.rs +++ b/event-svc/src/store/sql/access/event.rs @@ -1,5 +1,4 @@ use std::{ - num::TryFromIntError, ops::Range, sync::atomic::{AtomicI64, Ordering}, }; @@ -242,21 +241,10 @@ impl EventAccess { } /// Find a range of event IDs - pub async fn range( - &self, - range: Range<&EventId>, - offset: usize, - limit: usize, - ) -> Result> { - let offset: i64 = offset.try_into().map_err(|_e: TryFromIntError| { - Error::new_app(anyhow!("Offset too large to fit into i64")) - })?; - let limit = limit.try_into().unwrap_or(100000); // 100k is still a huge limit + pub async fn range(&self, range: Range<&EventId>) -> Result> { let rows: Vec = sqlx::query_as(ReconQuery::range()) .bind(range.start.as_bytes()) .bind(range.end.as_bytes()) - .bind(limit) - .bind(offset) .fetch_all(self.pool.reader()) .await .map_err(Error::from)?; @@ -267,16 +255,40 @@ impl EventAccess { Ok(rows) } + /// Find first event id within the range + pub async fn first(&self, range: Range<&EventId>) -> Result> { + let key: Option = sqlx::query_as(ReconQuery::first()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_optional(self.pool.reader()) + .await + .map_err(Error::from)?; + key.map(|k| EventId::try_from(k).map_err(|e: InvalidEventId| Error::new_app(anyhow!(e)))) + .transpose() + } + /// Find an approximate middle event id within the range + pub async fn middle(&self, range: Range<&EventId>) -> Result> { + let count = self.count(range.clone()).await?; + // (usize::MAX / 2) == i64::MAX, meaning it should always fit inside an i64. + // However to be safe we default to i64::MAX. + let half: i64 = (count / 2).try_into().unwrap_or(i64::MAX); + let key: Option = sqlx::query_as(ReconQuery::middle()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .bind(half) + .fetch_optional(self.pool.reader()) + .await + .map_err(Error::from)?; + key.map(|k| EventId::try_from(k).map_err(|e: InvalidEventId| Error::new_app(anyhow!(e)))) + .transpose() + } /// Find a range of event IDs with their values. Should replace `range` when we move to discovering values and keys simultaneously. pub async fn range_with_values( &self, range: Range<&EventId>, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>> { - let offset = offset.try_into().unwrap_or(i64::MAX); - let limit: i64 = limit.try_into().unwrap_or(i64::MAX); - let all_blocks: Vec = sqlx::query_as(EventQuery::value_blocks_by_order_key_many()) .bind(range.start.as_bytes()) diff --git a/event-svc/src/store/sql/query.rs b/event-svc/src/store/sql/query.rs index bcfadad6a..9b3808ca5 100644 --- a/event-svc/src/store/sql/query.rs +++ b/event-svc/src/store/sql/query.rs @@ -26,9 +26,9 @@ impl EventQuery { /// Requires binding 1 parameter. Finds the `BlockRow` values needed to rebuild the event /// Looks up the event by the EventID (ie order_key). pub fn value_blocks_by_order_key_one() -> &'static str { - r#"SELECT + r#"SELECT eb.codec, eb.root, b.multihash, b.bytes - FROM ceramic_one_event_block eb + FROM ceramic_one_event_block eb JOIN ceramic_one_block b on b.multihash = eb.block_multihash JOIN ceramic_one_event e on e.cid = eb.event_cid WHERE e.order_key = $1 @@ -129,7 +129,7 @@ impl EventQuery { } pub fn new_delivered_events_id_only() -> &'static str { - r#"SELECT + r#"SELECT cid, COALESCE(delivered, 0) as "new_highwater_mark" FROM ceramic_one_event WHERE delivered >= $1 -- we return delivered+1 so we must match it next search @@ -139,8 +139,8 @@ impl EventQuery { /// Returns the max delivered value in the event table pub fn max_delivered() -> &'static str { - r#"SELECT - COALESCE(MAX(delivered), 0) as res + r#"SELECT + COALESCE(MAX(delivered), 0) as res FROM ceramic_one_event;"# } @@ -214,13 +214,36 @@ impl ReconQuery { TOTAL(ahash_4) & 0xFFFFFFFF as ahash_4, TOTAL(ahash_5) & 0xFFFFFFFF as ahash_5, TOTAL(ahash_6) & 0xFFFFFFFF as ahash_6, TOTAL(ahash_7) & 0xFFFFFFFF as ahash_7, COUNT(1) as count - FROM ceramic_one_event + FROM ceramic_one_event WHERE order_key >= $1 AND order_key < $2;"# } } } /// Requires binding 2 parameters pub fn range() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_event + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC;"# + } + /// Requires binding 2 parameters + pub fn first() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_event + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC + LIMIT 1;"# + } + /// Requires binding 3 parameters + pub fn middle() -> &'static str { r#"SELECT order_key FROM @@ -230,9 +253,9 @@ impl ReconQuery { ORDER BY order_key ASC LIMIT - $3 + 1 OFFSET - $4;"# + $3;"# } pub fn count(db: SqlBackend) -> &'static str { diff --git a/event-svc/src/store/sql/test.rs b/event-svc/src/store/sql/test.rs index 90b51cd96..bfd5e5648 100644 --- a/event-svc/src/store/sql/test.rs +++ b/event-svc/src/store/sql/test.rs @@ -101,8 +101,6 @@ async fn range_query() { .range( &event_id_builder().with_min_event().build() ..&event_id_builder().with_max_event().build(), - 0, - usize::MAX, ) .await .unwrap(); diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs index 889e5faa3..d14c260cf 100644 --- a/event-svc/src/tests/event.rs +++ b/event-svc/src/tests/event.rs @@ -47,8 +47,8 @@ macro_rules! test_with_dbs { } test_with_dbs!( - range_query_with_values, - range_query_with_values, + range_query, + range_query, [ "delete from ceramic_one_event_block", "delete from ceramic_one_event", @@ -56,36 +56,86 @@ test_with_dbs!( ] ); -async fn range_query_with_values(store: S) +async fn range_query(store: S) where S: recon::Store, { let (model, events) = get_events_return_model().await; - let one = &events[0]; - let two = &events[1]; - let init_cid = one.key.cid().unwrap(); + let init_cid = events[0].key.cid().unwrap(); let min_id = event_id_min(&init_cid, &model); let max_id = event_id_max(&init_cid, &model); - recon::Store::insert_many(&store, &[one.clone()], NodeKey::random().id()) + recon::Store::insert_many(&store, &events, NodeKey::random().id()) .await .unwrap(); - recon::Store::insert_many(&store, &[two.clone()], NodeKey::random().id()) + let values: Vec = recon::Store::range(&store, &min_id..&max_id) .await - .unwrap(); - let values: Vec<(EventId, Vec)> = - recon::Store::range_with_values(&store, &min_id..&max_id, 0, usize::MAX) - .await - .unwrap() - .collect(); + .unwrap() + .collect(); - let mut expected = vec![ - (one.key.to_owned(), one.value.to_vec()), - (two.key.to_owned(), two.value.to_vec()), - ]; + let mut expected: Vec<_> = events.into_iter().map(|item| item.key).collect(); expected.sort(); assert_eq!(expected, values); } +test_with_dbs!( + first_query, + first_query, + [ + "delete from ceramic_one_event_block", + "delete from ceramic_one_event", + "delete from ceramic_one_block", + ] +); + +async fn first_query(store: S) +where + S: recon::Store + std::marker::Sync, +{ + let (model, events) = get_events_return_model().await; + let init_cid = events[0].key.cid().unwrap(); + let min_id = event_id_min(&init_cid, &model); + let max_id = event_id_max(&init_cid, &model); + recon::Store::insert_many(&store, &events, NodeKey::random().id()) + .await + .unwrap(); + let first = recon::Store::first(&store, &min_id..&max_id).await.unwrap(); + + // Sort events into expected because event ids are not sorted in log order + let mut expected: Vec<_> = events.into_iter().map(|item| item.key).collect(); + expected.sort(); + assert_eq!(Some(expected[0].clone()), first); +} +test_with_dbs!( + middle_query, + middle_query, + [ + "delete from ceramic_one_event_block", + "delete from ceramic_one_event", + "delete from ceramic_one_block", + ] +); + +async fn middle_query(store: S) +where + S: recon::Store, +{ + let (model, events) = get_events_return_model().await; + let init_cid = events[0].key.cid().unwrap(); + let min_id = event_id_min(&init_cid, &model); + let max_id = event_id_max(&init_cid, &model); + recon::Store::insert_many(&store, &events, NodeKey::random().id()) + .await + .unwrap(); + let middle = recon::Store::middle(&store, &min_id..&max_id) + .await + .unwrap(); + + // Sort events into expected because event ids are not sorted in log order + let mut expected: Vec<_> = events.into_iter().map(|item| item.key).collect(); + expected.sort(); + assert_eq!(Some(expected[expected.len() / 2].clone()), middle); +} + test_with_dbs!( double_insert, double_insert, diff --git a/event-svc/src/tests/migration.rs b/event-svc/src/tests/migration.rs index 63c63c4ff..c57b8a9b5 100644 --- a/event-svc/src/tests/migration.rs +++ b/event-svc/src/tests/migration.rs @@ -59,14 +59,15 @@ async fn test_migration(cars: Vec>) { .migrate_from_ipfs(Network::Local(42), blocks, false) .await .unwrap(); - let actual_events: BTreeSet<_> = recon::Store::range_with_values( + let actual_events: BTreeSet<_> = ceramic_api::EventService::range_with_values( &service, - &EventId::min_value()..&EventId::max_value(), + EventId::min_value()..EventId::max_value(), 0, - usize::MAX, + u32::MAX, ) .await .unwrap() + .into_iter() .map(|(_event_id, car)| multibase::encode(multibase::Base::Base64Url, car)) .collect(); assert_eq!(expected_events, actual_events) diff --git a/interest-svc/src/interest/store.rs b/interest-svc/src/interest/store.rs index b2f3e2184..1b22a271a 100644 --- a/interest-svc/src/interest/store.rs +++ b/interest-svc/src/interest/store.rs @@ -13,9 +13,6 @@ impl recon::Store for InterestService { type Key = Interest; type Hash = Sha256a; - /// Insert new keys into the key space. - /// Returns true for each key if it did not previously exist, in the - /// same order as the input iterator. #[instrument(skip(self))] async fn insert_many( &self, @@ -29,9 +26,6 @@ impl recon::Store for InterestService { .map_err(Error::from)?) } - /// Return the hash of all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// Returns ReconResult<(Hash, count), Err> #[instrument(skip(self))] async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { Ok(CeramicOneInterest::hash_range(&self.pool, range) @@ -39,43 +33,31 @@ impl recon::Store for InterestService { .map_err(Error::from)?) } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. #[instrument(skip(self))] async fn range( &self, range: Range<&Self::Key>, - - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { Ok(Box::new( - CeramicOneInterest::range(&self.pool, range, offset, limit) + CeramicOneInterest::range(&self.pool, range) .await .map_err(Error::from)? .into_iter(), )) } - - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. #[instrument(skip(self))] - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - let res = CeramicOneInterest::range(&self.pool, range, offset, limit) + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(CeramicOneInterest::first(&self.pool, range) + .await + .map_err(Error::from)?) + } + #[instrument(skip(self))] + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(CeramicOneInterest::middle(&self.pool, range) .await - .map_err(Error::from)?; - Ok(Box::new(res.into_iter().map(|key| (key, vec![])))) + .map_err(Error::from)?) } - /// Return the number of keys within the range. + #[instrument(skip(self))] async fn count(&self, range: Range<&Self::Key>) -> ReconResult { Ok(CeramicOneInterest::count(&self.pool, range) @@ -83,10 +65,6 @@ impl recon::Store for InterestService { .map_err(Error::from)?) } - /// value_for_key returns - /// Ok(Some(value)) if stored, - /// Ok(None) if not stored, and - /// Err(e) if retrieving failed. #[instrument(skip(self))] async fn value_for_key(&self, _key: &Interest) -> ReconResult>> { Ok(Some(vec![])) @@ -98,13 +76,7 @@ impl ceramic_api::InterestService for InterestService { async fn insert(&self, key: Interest) -> anyhow::Result { Ok(CeramicOneInterest::insert(&self.pool, &key).await?) } - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> anyhow::Result> { - Ok(CeramicOneInterest::range(&self.pool, start..end, offset, limit).await?) + async fn range(&self, start: &Interest, end: &Interest) -> anyhow::Result> { + Ok(CeramicOneInterest::range(&self.pool, start..end).await?) } } diff --git a/interest-svc/src/store/metrics.rs b/interest-svc/src/store/metrics.rs index 3ad852d9e..de6bb1b5e 100644 --- a/interest-svc/src/store/metrics.rs +++ b/interest-svc/src/store/metrics.rs @@ -137,17 +137,11 @@ where self.record_key_insert(new); Ok(new) } - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> anyhow::Result> { + async fn range(&self, start: &Interest, end: &Interest) -> anyhow::Result> { StoreMetricsMiddleware::::record( &self.metrics, "api_interest_range", - self.store.range(start, end, offset, limit), + self.store.range(start, end), ) .await } @@ -194,28 +188,14 @@ where async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range", - self.store.range(range, offset, limit), - ) - .await + StoreMetricsMiddleware::::record(&self.metrics, "range", self.store.range(range)).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range_with_values", - self.store.range_with_values(range, offset, limit), - ) - .await + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "first", self.store.first(range)).await + } + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await } async fn full_range( @@ -225,9 +205,6 @@ where .await } - async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { - StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await - } async fn count(&self, range: Range<&Self::Key>) -> ReconResult { StoreMetricsMiddleware::::record(&self.metrics, "count", self.store.count(range)).await } diff --git a/interest-svc/src/store/sql/access/interest.rs b/interest-svc/src/store/sql/access/interest.rs index 07a848337..4f5d6bd8a 100644 --- a/interest-svc/src/store/sql/access/interest.rs +++ b/interest-svc/src/store/sql/access/interest.rs @@ -10,7 +10,7 @@ use sqlx::Row; use crate::store::{ sql::{ - entities::ReconHash, + entities::{OrderKey, ReconHash}, query::{ReconQuery, SqlBackend}, SqliteTransaction, }, @@ -104,19 +104,12 @@ impl CeramicOneInterest { Ok(HashCount::new(Sha256a::from(bytes), res.count())) } - /// Find the interestsin the range between left_fencepost and right_fencepost. - pub async fn range( - pool: &SqlitePool, - range: Range<&Interest>, - offset: usize, - limit: usize, - ) -> Result> { + /// Find the interests in the range + pub async fn range(pool: &SqlitePool, range: Range<&Interest>) -> Result> { let query = sqlx::query(ReconQuery::range()); let rows = query .bind(range.start.as_bytes()) .bind(range.end.as_bytes()) - .bind(limit as i64) - .bind(offset as i64) .fetch_all(pool.reader()) .await?; let rows = rows @@ -130,6 +123,35 @@ impl CeramicOneInterest { Ok(rows) } + /// Find the first interest in the range + pub async fn first(pool: &SqlitePool, range: Range<&Interest>) -> Result> { + sqlx::query_as(ReconQuery::first()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_optional(pool.reader()) + .await + .map_err(Error::from)? + .map(|key: OrderKey| Interest::try_from(key)) + .transpose() + .map_err(Error::new_fatal) + } + /// Find the approximate middle interest in the range + pub async fn middle(pool: &SqlitePool, range: Range<&Interest>) -> Result> { + let count = Self::count(pool, range.clone()).await?; + // (usize::MAX / 2) == i64::MAX, meaning it should always fit inside an i64. + // However to be safe we default to i64::MAX. + let half: i64 = (count / 2).try_into().unwrap_or(i64::MAX); + sqlx::query_as(ReconQuery::middle()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .bind(half) + .fetch_optional(pool.reader()) + .await + .map_err(Error::from)? + .map(|key: OrderKey| Interest::try_from(key)) + .transpose() + .map_err(Error::new_fatal) + } /// Count the number of keys in a given range pub async fn count(pool: &SqlitePool, range: Range<&Interest>) -> Result { let row = sqlx::query(ReconQuery::count(SqlBackend::Sqlite)) diff --git a/interest-svc/src/store/sql/entities/mod.rs b/interest-svc/src/store/sql/entities/mod.rs index 4860a3909..71602a528 100644 --- a/interest-svc/src/store/sql/entities/mod.rs +++ b/interest-svc/src/store/sql/entities/mod.rs @@ -1,5 +1,7 @@ mod hash; +mod order_key; mod version; pub use hash::ReconHash; +pub use order_key::OrderKey; pub use version::VersionRow; diff --git a/interest-svc/src/store/sql/entities/order_key.rs b/interest-svc/src/store/sql/entities/order_key.rs new file mode 100644 index 000000000..9e85bda5a --- /dev/null +++ b/interest-svc/src/store/sql/entities/order_key.rs @@ -0,0 +1,14 @@ +use ceramic_core::Interest; + +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct OrderKey { + pub order_key: Vec, +} + +impl TryFrom for Interest { + type Error = ceramic_core::interest::InvalidInterest; + + fn try_from(value: OrderKey) -> std::prelude::v1::Result { + Interest::try_from(value.order_key) + } +} diff --git a/interest-svc/src/store/sql/query.rs b/interest-svc/src/store/sql/query.rs index 98c319db5..98dec8e10 100644 --- a/interest-svc/src/store/sql/query.rs +++ b/interest-svc/src/store/sql/query.rs @@ -36,6 +36,30 @@ impl ReconQuery { } /// Requires binding 2 parameters pub fn range() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_interest + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC;"# + } + /// Requires binding 2 parameters + pub fn first() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_interest + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC + LIMIT + 1;"# + } + /// Requires binding 3 parameters + pub fn middle() -> &'static str { r#"SELECT order_key FROM @@ -45,9 +69,9 @@ impl ReconQuery { ORDER BY order_key ASC LIMIT - $3 + 1 OFFSET - $4;"# + $3;"# } pub fn count(db: SqlBackend) -> &'static str { diff --git a/interest-svc/src/tests/interest.rs b/interest-svc/src/tests/interest.rs index ecad79529..2684b1ba7 100644 --- a/interest-svc/src/tests/interest.rs +++ b/interest-svc/src/tests/interest.rs @@ -85,15 +85,9 @@ async fn access_interest_model(store: impl InterestService) { InterestService::insert(&store, interest_1.clone()) .await .unwrap(); - let interests = InterestService::range( - &store, - &random_interest_min(), - &random_interest_max(), - 0, - usize::MAX, - ) - .await - .unwrap(); + let interests = InterestService::range(&store, &random_interest_min(), &random_interest_max()) + .await + .unwrap(); assert_eq!( BTreeSet::from_iter([interest_0, interest_1]), BTreeSet::from_iter(interests) @@ -101,12 +95,12 @@ async fn access_interest_model(store: impl InterestService) { } test_with_dbs!( - test_hash_range_query, - test_hash_range_query, + hash_range_query, + hash_range_query, ["delete from ceramic_one_interest"] ); -async fn test_hash_range_query(store: S) +async fn hash_range_query(store: S) where S: recon::Store, { @@ -140,93 +134,92 @@ where } test_with_dbs!( - test_range_query, - test_range_query, + range_query, + range_query, ["delete from ceramic_one_interest"] ); -async fn test_range_query(store: S) +async fn range_query(store: S) where S: recon::Store, { - let interest_0 = random_interest(None, None); - let interest_1 = random_interest(None, None); + let mut interests: Vec<_> = (0..10).map(|_| random_interest(None, None)).collect(); + let items: Vec<_> = interests + .iter() + .map(|interest| ReconItem::new(interest.clone(), Vec::new())) + .collect(); - recon::Store::insert_many( - &store, - &[ReconItem::new(interest_0.clone(), Vec::new())], - NodeKey::random().id(), - ) - .await - .unwrap(); - recon::Store::insert_many( - &store, - &[ReconItem::new(interest_1.clone(), Vec::new())], - NodeKey::random().id(), - ) - .await - .unwrap(); - let ids = recon::Store::range( - &store, - &random_interest_min()..&random_interest_max(), - 0, - usize::MAX, - ) - .await - .unwrap(); - let interests = ids.collect::>(); - assert_eq!(BTreeSet::from_iter([interest_0, interest_1]), interests); + recon::Store::insert_many(&store, &items, NodeKey::random().id()) + .await + .unwrap(); + let ids = recon::Store::range(&store, &random_interest_min()..&random_interest_max()) + .await + .unwrap(); + let mut ids: Vec = ids.collect(); + interests.sort(); + ids.sort(); + assert_eq!(interests, ids); } test_with_dbs!( - test_range_with_values_query, - test_range_with_values_query, + first_query, + first_query, ["delete from ceramic_one_interest"] ); -async fn test_range_with_values_query(store: S) +async fn first_query(store: S) where - S: recon::Store, + S: recon::Store + Sync, { - let interest_0 = random_interest(None, None); - let interest_1 = random_interest(None, None); + let mut interests: Vec<_> = (0..10).map(|_| random_interest(None, None)).collect(); + let items: Vec<_> = interests + .iter() + .map(|interest| ReconItem::new(interest.clone(), Vec::new())) + .collect(); - store - .insert_many( - &[ReconItem::new(interest_0.clone(), Vec::new())], - NodeKey::random().id(), - ) + recon::Store::insert_many(&store, &items, NodeKey::random().id()) .await .unwrap(); - store - .insert_many( - &[ReconItem::new(interest_1.clone(), Vec::new())], - NodeKey::random().id(), - ) + let first = recon::Store::first(&store, &random_interest_min()..&random_interest_max()) + .await + .unwrap(); + interests.sort(); + assert_eq!(Some(interests[0].clone()), first); +} + +test_with_dbs!( + middle_query, + middle_query, + ["delete from ceramic_one_interest"] +); + +async fn middle_query(store: S) +where + S: recon::Store + Sync, +{ + let mut interests: Vec<_> = (0..10).map(|_| random_interest(None, None)).collect(); + let items: Vec<_> = interests + .iter() + .map(|interest| ReconItem::new(interest.clone(), Vec::new())) + .collect(); + + recon::Store::insert_many(&store, &items, NodeKey::random().id()) .await .unwrap(); - let ids = store - .range_with_values( - &random_interest_min()..&random_interest_max(), - 0, - usize::MAX, - ) + let middle = recon::Store::middle(&store, &random_interest_min()..&random_interest_max()) .await .unwrap(); - let interests = ids - .into_iter() - .map(|(i, _v)| i) - .collect::>(); - assert_eq!(BTreeSet::from_iter([interest_0, interest_1]), interests); + interests.sort(); + assert_eq!(Some(interests[interests.len() / 2].clone()), middle); } test_with_dbs!( - test_double_insert, - test_double_insert, + double_insert, + double_insert, ["delete from ceramic_one_interest"] ); -async fn test_double_insert(store: S) +async fn double_insert(store: S) where S: recon::Store, { @@ -253,12 +246,12 @@ where } test_with_dbs!( - test_value_for_key, - test_value_for_key, + value_for_key, + value_for_key, ["delete from ceramic_one_interest"] ); -async fn test_value_for_key(store: S) +async fn value_for_key(store: S) where S: recon::Store, { diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 32d71fb62..0c0fed5d0 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -1269,13 +1269,7 @@ mod tests { unreachable!() } - async fn range( - &self, - _left_fencepost: Self::Key, - _right_fencepost: Self::Key, - _offset: usize, - _limit: usize, - ) -> ReconResult> { + async fn range(&self, _range: std::ops::Range<&Self::Key>) -> ReconResult> { unreachable!() } diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 7f603f90e..17ece75b3 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -82,22 +82,20 @@ where async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { self.as_error()?; - self.inner.range(range, offset, limit).await + self.inner.range(range).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + self.as_error()?; + + self.inner.first(range).await + } + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { self.as_error()?; - self.inner.range_with_values(range, offset, limit).await + self.inner.middle(range).await } async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index 59215a548..0ac52b20c 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -11,6 +11,8 @@ //! Encoding and framing of messages is outside the scope of this crate. //! However the message types do implement serde::Serialize and serde::Deserialize. +use std::ops::Range; + use anyhow::{anyhow, bail, Context, Result}; use async_stream::try_stream; use async_trait::async_trait; @@ -747,7 +749,7 @@ where // TODO this holds all keys in memory // Paginate or otherwise stream the keys and values back out let keys = recon - .range(range.first, range.last, 0, usize::MAX) + .range(&range.first..&range.last) .await?; for key in keys { let value = recon.value_for_key(key.clone()).await?.ok_or_else(|| anyhow!("recon key does not have a value: key={}", key))?; @@ -824,13 +826,7 @@ pub trait Recon: Clone + Send + Sync + 'static { ) -> ReconResult>; /// Get all keys in the specified range - async fn range( - &self, - left_fencepost: Self::Key, - right_fencepost: Self::Key, - offset: usize, - limit: usize, - ) -> ReconResult>; + async fn range(&self, range: Range<&Self::Key>) -> ReconResult>; /// Reports total number of keys async fn len(&self) -> ReconResult; diff --git a/recon/src/recon.rs b/recon/src/recon.rs index c03bfc717..beaf132a7 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -72,11 +72,7 @@ where trace!(count, ?range, "small split sending all keys"); // We have only a few keys in the range. Let's short circuit the roundtrips and // send the keys directly. - let keys: Vec = self - .store - .range(&range.first..&range.last, 0, usize::MAX) - .await? - .collect(); + let keys: Vec = self.store.range(&range.first..&range.last).await?.collect(); let mut ranges = Vec::with_capacity(keys.len() + 1); let mut prev: Option = None; @@ -147,22 +143,6 @@ where self.store.is_empty().await } - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// The upper range bound is exclusive. - /// - /// Offset and limit values are applied within the range of keys. - pub async fn range_with_values( - &self, - left_fencepost: &K, - right_fencepost: &K, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - self.store - .range_with_values(left_fencepost..right_fencepost, offset, limit) - .await - } - /// Return all keys. pub async fn full_range(&self) -> Result + Send + 'static>> { self.store.full_range().await @@ -185,22 +165,11 @@ where self.store.insert_many(&items, informant).await } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// The upper range bound is exclusive. + /// Return all keys in the range. /// /// Offset and limit values are applied within the range of keys. - async fn range( - &self, - left_fencepost: Self::Key, - right_fencepost: Self::Key, - offset: usize, - limit: usize, - ) -> Result> { - Ok(self - .store - .range(&left_fencepost..&right_fencepost, offset, limit) - .await? - .collect()) + async fn range(&self, range: Range<&Self::Key>) -> Result> { + Ok(self.store.range(range).await?.collect()) } async fn len(&self) -> Result { @@ -296,9 +265,8 @@ where // in the range already. let split_key = self .store - .range(&range.first..&range.last, 0, 1) + .first(&range.first..&range.last,) .await? - .next() .ok_or_else(|| { Error::new_fatal(anyhow!( "unreachable, at least one key should exist in range given the conditional guard above" @@ -543,80 +511,31 @@ pub trait Store { async fn hash_range(&self, range: Range<&Self::Key>) -> Result>; /// Return all keys in the range. - /// The upper range bound is exclusive. - /// - /// Offset and limit values are applied within the range of keys. async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>>; - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// The upper range bound is exclusive. + /// Return the first key in the range. /// - /// Offset and limit values are applied within the range of keys. - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>>; + /// Default implementation uses range and reports only the first key + async fn first(&self, range: Range<&Self::Key>) -> Result> { + self.range(range).await.map(|mut keys| keys.next()) + } /// Return all keys. async fn full_range(&self) -> Result + Send + 'static>> { - self.range( - &Self::Key::min_value()..&Self::Key::max_value(), - 0, - usize::MAX, - ) - .await + self.range(&Self::Key::min_value()..&Self::Key::max_value()) + .await } /// Return a key that is approximately in the middle of the range. /// An exact middle is not necessary but performance will be better with a better approximation. - /// - /// The default implementation will count all elements and then find the middle. - async fn middle(&self, range: Range<&Self::Key>) -> Result> { - let count = self.count(range.clone()).await?; - if count == 0 { - Ok(None) - } else { - Ok(self.range(range, count / 2, 1).await?.next()) - } - } - - /// Return any number of splits of the range. - /// An exact split is not necessary but performance will be better with a better approximation. - /// - /// The input range bounds are not part of the returned split as they are the - /// implicit outermost bounds of the split. - /// - /// The default implementation uses middle to split the range approximately in two. - async fn split(&self, range: Range<&Self::Key>) -> Result> { - if let Some(middle) = self.middle(range.clone()).await? { - let left = self.hash_range(range.start..&middle).await?; - let right = self.hash_range(&middle..range.end).await?; - Ok(Split { - keys: vec![middle], - hashes: vec![left, right], - }) - } else { - // No keys in range return empty split - Ok(Split { - keys: vec![], - hashes: vec![HashCount { - hash: Self::Hash::identity(), - count: 0, - }], - }) - } - } + async fn middle(&self, range: Range<&Self::Key>) -> Result>; /// Return the number of keys within the range. async fn count(&self, range: Range<&Self::Key>) -> Result { - Ok(self.range(range, 0, usize::MAX).await?.count()) + Ok(self.range(range).await?.count()) } /// Reports total number of keys @@ -633,6 +552,8 @@ pub trait Store { async fn value_for_key(&self, key: &Self::Key) -> Result>>; } +// Explicitly implement every member of the trait so we do not mask any non default implementations +// on the S instance. #[async_trait::async_trait] impl Store for std::sync::Arc where @@ -650,29 +571,33 @@ where ) -> Result> { self.as_ref().insert_many(items, informant).await } - async fn hash_range(&self, range: Range<&Self::Key>) -> Result> { self.as_ref().hash_range(range).await } - async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>> { - self.as_ref().range(range, offset, limit).await + self.as_ref().range(range).await } - - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - self.as_ref().range_with_values(range, offset, limit).await + async fn first(&self, range: Range<&Self::Key>) -> Result> { + self.as_ref().first(range).await + } + async fn full_range(&self) -> Result + Send + 'static>> { + self.as_ref().full_range().await + } + async fn middle(&self, range: Range<&Self::Key>) -> Result> { + self.as_ref().middle(range).await + } + async fn count(&self, range: Range<&Self::Key>) -> Result { + self.as_ref().count(range).await + } + async fn len(&self) -> Result { + self.as_ref().len().await + } + async fn is_empty(&self) -> Result { + self.as_ref().is_empty().await } - async fn value_for_key(&self, key: &Self::Key) -> Result>> { self.as_ref().value_for_key(key).await } @@ -859,7 +784,7 @@ where async fn interests(&self) -> Result>> { self.store - .range(&self.start..&self.end, 0, usize::MAX) + .range(&self.start..&self.end) .await? .map(|interest| { if let Some(RangeOpen { start, end }) = interest.range() { diff --git a/recon/src/recon/btreestore.rs b/recon/src/recon/btreestore.rs index c6a441ce6..0f4bc49b4 100644 --- a/recon/src/recon/btreestore.rs +++ b/recon/src/recon/btreestore.rs @@ -51,9 +51,7 @@ where r } - /// Return the hash of all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - pub async fn hash_range(&self, range: Range<&K>) -> Result> { + async fn hash_range(&self, range: Range<&K>) -> Result> { if range.start >= range.end { return Ok(HashCount { hash: H::identity(), @@ -74,15 +72,9 @@ where }) } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. - pub async fn range( + async fn range( &self, range: Range<&K>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>> { let keys: Vec = self .inner @@ -90,38 +82,30 @@ where .await .keys .range(range) - .skip(offset) - .take(limit) .map(|(key, _hash)| key) .cloned() .collect(); Ok(Box::new(keys.into_iter())) } - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. - pub async fn range_with_values( - &self, - range: Range<&K>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - let inner = self.inner.lock().await; + async fn first(&self, range: Range<&K>) -> Result> { + Ok(self + .inner + .lock() + .await + .keys + .range(range) + .next() + .map(|(k, _)| k.clone())) + } - let keys: Vec<(K, Vec)> = inner + async fn middle(&self, range: Range<&K>) -> Result> { + let inner = self.inner.lock().await; + let count = inner.keys.range(range.clone()).count(); + Ok(inner .keys .range(range) - .skip(offset) - .take(limit) - .filter_map(|(key, _hash)| { - inner - .values - .get(key) - .map(|value| (key.clone(), value.clone())) - }) - .collect(); - Ok(Box::new(keys.into_iter())) + .nth(count / 2) + .map(|(k, _)| k.clone())) } async fn insert(&self, item: &ReconItem, _informant: NodeId) -> Result { @@ -160,28 +144,21 @@ where } async fn hash_range(&self, range: Range<&Self::Key>) -> Result> { - // Self does not need async to implement hash_range, so it exposes a pub non async hash_range function - // and we delegate to its implementation here. BTreeStore::hash_range(self, range).await } async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>> { - // Self does not need async to implement range, so it exposes a pub non async range function - // and we delegate to its implementation here. - BTreeStore::range(self, range, offset, limit).await + BTreeStore::range(self, range).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - BTreeStore::range_with_values(self, range, offset, limit).await + async fn first(&self, range: Range<&Self::Key>) -> Result> { + BTreeStore::first(self, range).await + } + + async fn middle(&self, range: Range<&Self::Key>) -> Result> { + BTreeStore::middle(self, range).await } async fn value_for_key(&self, key: &Self::Key) -> Result>> {