From c479255a8f0b0155077e64bcbc39c2cae73f5b14 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 4 Dec 2024 13:46:16 -0700 Subject: [PATCH] refactor: remove offset/limit where too generic (#624) * refactor: remove offset/limit where to generic This change removes the offset/limit pagination pattern where it implies a more flexible API than was needed or used. Having the flexible API meant that casts from usize to smaller types for limits and offsets could break pagination assumptions. Now with this change more specific APIs are exposed where its safe to make assumptions about the size of limit offset values in queries. Additionally in the API where true pagination was the desired API u32 is used instead of usize to keep page sizes small. Finally, the range_with_values method on the Recon trait was removed as it was unused * fix: add tests and fix middle syntax error --- Dockerfile | 2 +- api/src/server.rs | 40 ++--- api/src/tests.rs | 14 +- event-svc/src/event/store.rs | 48 ++---- event-svc/src/store/metrics.rs | 34 +--- event-svc/src/store/sql/access/event.rs | 48 +++--- event-svc/src/store/sql/query.rs | 39 ++++- event-svc/src/store/sql/test.rs | 2 - event-svc/src/tests/event.rs | 86 ++++++++--- event-svc/src/tests/migration.rs | 7 +- interest-svc/src/interest/store.rs | 54 ++----- interest-svc/src/store/metrics.rs | 39 +---- interest-svc/src/store/sql/access/interest.rs | 42 +++-- interest-svc/src/store/sql/entities/mod.rs | 2 + .../src/store/sql/entities/order_key.rs | 14 ++ interest-svc/src/store/sql/query.rs | 28 +++- interest-svc/src/tests/interest.rs | 141 ++++++++--------- p2p/src/node.rs | 8 +- recon/src/libp2p/tests.rs | 18 +-- recon/src/protocol.rs | 12 +- recon/src/recon.rs | 145 +++++------------- recon/src/recon/btreestore.rs | 73 +++------ 22 files changed, 401 insertions(+), 495 deletions(-) create mode 100644 interest-svc/src/store/sql/entities/order_key.rs diff --git a/Dockerfile b/Dockerfile index 667748344..519093dff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM public.ecr.aws/r5b3e0r5/3box/rust-builder:latest as builder +FROM public.ecr.aws/r5b3e0r5/3box/rust-builder:latest AS builder RUN mkdir -p /home/builder/rust-ceramic WORKDIR /home/builder/rust-ceramic diff --git a/api/src/server.rs b/api/src/server.rs index 9e8b0fa55..0de34ece7 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -150,13 +150,7 @@ impl TryFrom for ValidatedInterest { pub trait InterestService: Send + Sync { /// Returns true if the key was newly inserted, false if it already existed. async fn insert(&self, key: Interest) -> Result; - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> Result>; + async fn range(&self, start: &Interest, end: &Interest) -> Result>; } #[async_trait] @@ -165,14 +159,8 @@ impl InterestService for Arc { self.as_ref().insert(key).await } - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> Result> { - self.as_ref().range(start, end, offset, limit).await + async fn range(&self, start: &Interest, end: &Interest) -> Result> { + self.as_ref().range(start, end).await } } @@ -271,8 +259,8 @@ pub trait EventService: Send + Sync { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>>; /** @@ -313,8 +301,8 @@ impl EventService for Arc { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>> { self.as_ref().range_with_values(range, offset, limit).await } @@ -605,12 +593,7 @@ where ) -> Result { let interests = self .interest - .range( - &Interest::min_value(), - &Interest::max_value(), - 0, - usize::MAX, - ) + .range(&Interest::min_value(), &Interest::max_value()) .await .map_err(|e| ErrorResponse::new(format!("failed to get interests: {e}")))?; @@ -654,9 +637,8 @@ where offset: Option, limit: Option, ) -> Result { - let limit: usize = - limit.map_or(10000, |l| if l.is_negative() { 10000 } else { l }) as usize; - let offset = offset.map_or(0, |o| if o.is_negative() { 0 } else { o }) as usize; + let limit = limit.map_or(10000, |l| if l.is_negative() { 10000 } else { l }) as u32; + let offset = offset.map_or(0, |o| if o.is_negative() { 0 } else { o }) as u32; let sep_value = match decode_multibase_data(&sep_value) { Ok(v) => v, Err(e) => return Ok(ExperimentalEventsSepSepValueGetResponse::BadRequest(e)), @@ -674,7 +656,7 @@ where .map(|(id, data)| BuildResponse::event(id, Some(data))) .collect::>(); - let event_cnt = events.len(); + let event_cnt = events.len() as u32; Ok(ExperimentalEventsSepSepValueGetResponse::Success( models::EventsGet { resume_offset: (offset + event_cnt) as i32, diff --git a/api/src/tests.rs b/api/src/tests.rs index 460f8ca7b..3ca80ed6f 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -114,8 +114,6 @@ mock! { &self, start: &Interest, end: &Interest, - offset: usize, - limit: usize, ) -> Result>; } } @@ -128,8 +126,8 @@ mock! { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>>; async fn value_for_order_key(&self, key: &EventId) -> Result>>; async fn value_for_cid(&self, key: &Cid) -> Result>>; @@ -580,11 +578,9 @@ async fn get_interests() { .with( predicate::eq(Interest::min_value()), predicate::eq(Interest::max_value()), - predicate::eq(0), - predicate::eq(usize::MAX), ) .once() - .return_once(move |_, _, _, _| { + .return_once(move |_, _| { Ok(vec![ Interest::builder() .with_sep_key("model") @@ -662,11 +658,9 @@ async fn get_interests_for_peer() { .with( predicate::eq(Interest::min_value()), predicate::eq(Interest::max_value()), - predicate::eq(0), - predicate::eq(usize::MAX), ) .once() - .return_once(move |_, _, _, _| { + .return_once(move |_, _| { Ok(vec![ Interest::builder() .with_sep_key("model") diff --git a/event-svc/src/event/store.rs b/event-svc/src/event/store.rs index 3880889c9..146894d7d 100644 --- a/event-svc/src/event/store.rs +++ b/event-svc/src/event/store.rs @@ -47,9 +47,6 @@ impl recon::Store for EventService { type Key = EventId; type Hash = Sha256a; - /// Insert new keys into the key space. - /// Returns true for each key if it did not previously exist, in the - /// same order as the input iterator. async fn insert_many( &self, items: &[ReconItem], @@ -67,9 +64,6 @@ impl recon::Store for EventService { Ok(res.into()) } - /// Return the hash of all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// Returns ReconResult<(Hash, count), Err> async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { let res = self .event_access @@ -79,52 +73,28 @@ impl recon::Store for EventService { Ok(res) } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { Ok(Box::new( self.event_access - .range(range, offset, limit) + .range(range) .await .map_err(Error::from)? .into_iter(), )) } - - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - Ok(Box::new( - self.event_access - .range_with_values(range, offset, limit) - .await - .map_err(Error::from)? - .into_iter(), - )) + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(self.event_access.first(range).await.map_err(Error::from)?) } - /// Return the number of keys within the range. + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(self.event_access.middle(range).await.map_err(Error::from)?) + } + async fn count(&self, range: Range<&Self::Key>) -> ReconResult { Ok(self.event_access.count(range).await.map_err(Error::from)?) } - - /// value_for_key returns - /// Ok(Some(value)) if stored, - /// Ok(None) if not stored, and - /// Err(e) if retrieving failed. async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { Ok(self .event_access @@ -210,8 +180,8 @@ impl ceramic_api::EventService for EventService { async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> anyhow::Result)>> { self.event_access .range_with_values(&range.start..&range.end, offset, limit) diff --git a/event-svc/src/store/metrics.rs b/event-svc/src/store/metrics.rs index 269fc5faa..da9f2aaef 100644 --- a/event-svc/src/store/metrics.rs +++ b/event-svc/src/store/metrics.rs @@ -173,8 +173,8 @@ where async fn range_with_values( &self, range: Range, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> anyhow::Result)>> { StoreMetricsMiddleware::::record( &self.metrics, @@ -277,28 +277,14 @@ where async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range", - self.store.range(range, offset, limit), - ) - .await + StoreMetricsMiddleware::::record(&self.metrics, "range", self.store.range(range)).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range_with_values", - self.store.range_with_values(range, offset, limit), - ) - .await + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "first", self.store.first(range)).await + } + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await } async fn full_range( @@ -307,10 +293,6 @@ where StoreMetricsMiddleware::::record(&self.metrics, "full_range", self.store.full_range()) .await } - - async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { - StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await - } async fn count(&self, range: Range<&Self::Key>) -> ReconResult { StoreMetricsMiddleware::::record(&self.metrics, "count", self.store.count(range)).await } diff --git a/event-svc/src/store/sql/access/event.rs b/event-svc/src/store/sql/access/event.rs index 131705c8d..c6f45024f 100644 --- a/event-svc/src/store/sql/access/event.rs +++ b/event-svc/src/store/sql/access/event.rs @@ -1,5 +1,4 @@ use std::{ - num::TryFromIntError, ops::Range, sync::atomic::{AtomicI64, Ordering}, }; @@ -242,21 +241,10 @@ impl EventAccess { } /// Find a range of event IDs - pub async fn range( - &self, - range: Range<&EventId>, - offset: usize, - limit: usize, - ) -> Result> { - let offset: i64 = offset.try_into().map_err(|_e: TryFromIntError| { - Error::new_app(anyhow!("Offset too large to fit into i64")) - })?; - let limit = limit.try_into().unwrap_or(100000); // 100k is still a huge limit + pub async fn range(&self, range: Range<&EventId>) -> Result> { let rows: Vec = sqlx::query_as(ReconQuery::range()) .bind(range.start.as_bytes()) .bind(range.end.as_bytes()) - .bind(limit) - .bind(offset) .fetch_all(self.pool.reader()) .await .map_err(Error::from)?; @@ -267,16 +255,40 @@ impl EventAccess { Ok(rows) } + /// Find first event id within the range + pub async fn first(&self, range: Range<&EventId>) -> Result> { + let key: Option = sqlx::query_as(ReconQuery::first()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_optional(self.pool.reader()) + .await + .map_err(Error::from)?; + key.map(|k| EventId::try_from(k).map_err(|e: InvalidEventId| Error::new_app(anyhow!(e)))) + .transpose() + } + /// Find an approximate middle event id within the range + pub async fn middle(&self, range: Range<&EventId>) -> Result> { + let count = self.count(range.clone()).await?; + // (usize::MAX / 2) == i64::MAX, meaning it should always fit inside an i64. + // However to be safe we default to i64::MAX. + let half: i64 = (count / 2).try_into().unwrap_or(i64::MAX); + let key: Option = sqlx::query_as(ReconQuery::middle()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .bind(half) + .fetch_optional(self.pool.reader()) + .await + .map_err(Error::from)?; + key.map(|k| EventId::try_from(k).map_err(|e: InvalidEventId| Error::new_app(anyhow!(e)))) + .transpose() + } /// Find a range of event IDs with their values. Should replace `range` when we move to discovering values and keys simultaneously. pub async fn range_with_values( &self, range: Range<&EventId>, - offset: usize, - limit: usize, + offset: u32, + limit: u32, ) -> Result)>> { - let offset = offset.try_into().unwrap_or(i64::MAX); - let limit: i64 = limit.try_into().unwrap_or(i64::MAX); - let all_blocks: Vec = sqlx::query_as(EventQuery::value_blocks_by_order_key_many()) .bind(range.start.as_bytes()) diff --git a/event-svc/src/store/sql/query.rs b/event-svc/src/store/sql/query.rs index bcfadad6a..9b3808ca5 100644 --- a/event-svc/src/store/sql/query.rs +++ b/event-svc/src/store/sql/query.rs @@ -26,9 +26,9 @@ impl EventQuery { /// Requires binding 1 parameter. Finds the `BlockRow` values needed to rebuild the event /// Looks up the event by the EventID (ie order_key). pub fn value_blocks_by_order_key_one() -> &'static str { - r#"SELECT + r#"SELECT eb.codec, eb.root, b.multihash, b.bytes - FROM ceramic_one_event_block eb + FROM ceramic_one_event_block eb JOIN ceramic_one_block b on b.multihash = eb.block_multihash JOIN ceramic_one_event e on e.cid = eb.event_cid WHERE e.order_key = $1 @@ -129,7 +129,7 @@ impl EventQuery { } pub fn new_delivered_events_id_only() -> &'static str { - r#"SELECT + r#"SELECT cid, COALESCE(delivered, 0) as "new_highwater_mark" FROM ceramic_one_event WHERE delivered >= $1 -- we return delivered+1 so we must match it next search @@ -139,8 +139,8 @@ impl EventQuery { /// Returns the max delivered value in the event table pub fn max_delivered() -> &'static str { - r#"SELECT - COALESCE(MAX(delivered), 0) as res + r#"SELECT + COALESCE(MAX(delivered), 0) as res FROM ceramic_one_event;"# } @@ -214,13 +214,36 @@ impl ReconQuery { TOTAL(ahash_4) & 0xFFFFFFFF as ahash_4, TOTAL(ahash_5) & 0xFFFFFFFF as ahash_5, TOTAL(ahash_6) & 0xFFFFFFFF as ahash_6, TOTAL(ahash_7) & 0xFFFFFFFF as ahash_7, COUNT(1) as count - FROM ceramic_one_event + FROM ceramic_one_event WHERE order_key >= $1 AND order_key < $2;"# } } } /// Requires binding 2 parameters pub fn range() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_event + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC;"# + } + /// Requires binding 2 parameters + pub fn first() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_event + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC + LIMIT 1;"# + } + /// Requires binding 3 parameters + pub fn middle() -> &'static str { r#"SELECT order_key FROM @@ -230,9 +253,9 @@ impl ReconQuery { ORDER BY order_key ASC LIMIT - $3 + 1 OFFSET - $4;"# + $3;"# } pub fn count(db: SqlBackend) -> &'static str { diff --git a/event-svc/src/store/sql/test.rs b/event-svc/src/store/sql/test.rs index 90b51cd96..bfd5e5648 100644 --- a/event-svc/src/store/sql/test.rs +++ b/event-svc/src/store/sql/test.rs @@ -101,8 +101,6 @@ async fn range_query() { .range( &event_id_builder().with_min_event().build() ..&event_id_builder().with_max_event().build(), - 0, - usize::MAX, ) .await .unwrap(); diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs index 889e5faa3..d14c260cf 100644 --- a/event-svc/src/tests/event.rs +++ b/event-svc/src/tests/event.rs @@ -47,8 +47,8 @@ macro_rules! test_with_dbs { } test_with_dbs!( - range_query_with_values, - range_query_with_values, + range_query, + range_query, [ "delete from ceramic_one_event_block", "delete from ceramic_one_event", @@ -56,36 +56,86 @@ test_with_dbs!( ] ); -async fn range_query_with_values(store: S) +async fn range_query(store: S) where S: recon::Store, { let (model, events) = get_events_return_model().await; - let one = &events[0]; - let two = &events[1]; - let init_cid = one.key.cid().unwrap(); + let init_cid = events[0].key.cid().unwrap(); let min_id = event_id_min(&init_cid, &model); let max_id = event_id_max(&init_cid, &model); - recon::Store::insert_many(&store, &[one.clone()], NodeKey::random().id()) + recon::Store::insert_many(&store, &events, NodeKey::random().id()) .await .unwrap(); - recon::Store::insert_many(&store, &[two.clone()], NodeKey::random().id()) + let values: Vec = recon::Store::range(&store, &min_id..&max_id) .await - .unwrap(); - let values: Vec<(EventId, Vec)> = - recon::Store::range_with_values(&store, &min_id..&max_id, 0, usize::MAX) - .await - .unwrap() - .collect(); + .unwrap() + .collect(); - let mut expected = vec![ - (one.key.to_owned(), one.value.to_vec()), - (two.key.to_owned(), two.value.to_vec()), - ]; + let mut expected: Vec<_> = events.into_iter().map(|item| item.key).collect(); expected.sort(); assert_eq!(expected, values); } +test_with_dbs!( + first_query, + first_query, + [ + "delete from ceramic_one_event_block", + "delete from ceramic_one_event", + "delete from ceramic_one_block", + ] +); + +async fn first_query(store: S) +where + S: recon::Store + std::marker::Sync, +{ + let (model, events) = get_events_return_model().await; + let init_cid = events[0].key.cid().unwrap(); + let min_id = event_id_min(&init_cid, &model); + let max_id = event_id_max(&init_cid, &model); + recon::Store::insert_many(&store, &events, NodeKey::random().id()) + .await + .unwrap(); + let first = recon::Store::first(&store, &min_id..&max_id).await.unwrap(); + + // Sort events into expected because event ids are not sorted in log order + let mut expected: Vec<_> = events.into_iter().map(|item| item.key).collect(); + expected.sort(); + assert_eq!(Some(expected[0].clone()), first); +} +test_with_dbs!( + middle_query, + middle_query, + [ + "delete from ceramic_one_event_block", + "delete from ceramic_one_event", + "delete from ceramic_one_block", + ] +); + +async fn middle_query(store: S) +where + S: recon::Store, +{ + let (model, events) = get_events_return_model().await; + let init_cid = events[0].key.cid().unwrap(); + let min_id = event_id_min(&init_cid, &model); + let max_id = event_id_max(&init_cid, &model); + recon::Store::insert_many(&store, &events, NodeKey::random().id()) + .await + .unwrap(); + let middle = recon::Store::middle(&store, &min_id..&max_id) + .await + .unwrap(); + + // Sort events into expected because event ids are not sorted in log order + let mut expected: Vec<_> = events.into_iter().map(|item| item.key).collect(); + expected.sort(); + assert_eq!(Some(expected[expected.len() / 2].clone()), middle); +} + test_with_dbs!( double_insert, double_insert, diff --git a/event-svc/src/tests/migration.rs b/event-svc/src/tests/migration.rs index 63c63c4ff..c57b8a9b5 100644 --- a/event-svc/src/tests/migration.rs +++ b/event-svc/src/tests/migration.rs @@ -59,14 +59,15 @@ async fn test_migration(cars: Vec>) { .migrate_from_ipfs(Network::Local(42), blocks, false) .await .unwrap(); - let actual_events: BTreeSet<_> = recon::Store::range_with_values( + let actual_events: BTreeSet<_> = ceramic_api::EventService::range_with_values( &service, - &EventId::min_value()..&EventId::max_value(), + EventId::min_value()..EventId::max_value(), 0, - usize::MAX, + u32::MAX, ) .await .unwrap() + .into_iter() .map(|(_event_id, car)| multibase::encode(multibase::Base::Base64Url, car)) .collect(); assert_eq!(expected_events, actual_events) diff --git a/interest-svc/src/interest/store.rs b/interest-svc/src/interest/store.rs index b2f3e2184..1b22a271a 100644 --- a/interest-svc/src/interest/store.rs +++ b/interest-svc/src/interest/store.rs @@ -13,9 +13,6 @@ impl recon::Store for InterestService { type Key = Interest; type Hash = Sha256a; - /// Insert new keys into the key space. - /// Returns true for each key if it did not previously exist, in the - /// same order as the input iterator. #[instrument(skip(self))] async fn insert_many( &self, @@ -29,9 +26,6 @@ impl recon::Store for InterestService { .map_err(Error::from)?) } - /// Return the hash of all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// Returns ReconResult<(Hash, count), Err> #[instrument(skip(self))] async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult> { Ok(CeramicOneInterest::hash_range(&self.pool, range) @@ -39,43 +33,31 @@ impl recon::Store for InterestService { .map_err(Error::from)?) } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. #[instrument(skip(self))] async fn range( &self, range: Range<&Self::Key>, - - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { Ok(Box::new( - CeramicOneInterest::range(&self.pool, range, offset, limit) + CeramicOneInterest::range(&self.pool, range) .await .map_err(Error::from)? .into_iter(), )) } - - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. #[instrument(skip(self))] - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - let res = CeramicOneInterest::range(&self.pool, range, offset, limit) + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(CeramicOneInterest::first(&self.pool, range) + .await + .map_err(Error::from)?) + } + #[instrument(skip(self))] + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + Ok(CeramicOneInterest::middle(&self.pool, range) .await - .map_err(Error::from)?; - Ok(Box::new(res.into_iter().map(|key| (key, vec![])))) + .map_err(Error::from)?) } - /// Return the number of keys within the range. + #[instrument(skip(self))] async fn count(&self, range: Range<&Self::Key>) -> ReconResult { Ok(CeramicOneInterest::count(&self.pool, range) @@ -83,10 +65,6 @@ impl recon::Store for InterestService { .map_err(Error::from)?) } - /// value_for_key returns - /// Ok(Some(value)) if stored, - /// Ok(None) if not stored, and - /// Err(e) if retrieving failed. #[instrument(skip(self))] async fn value_for_key(&self, _key: &Interest) -> ReconResult>> { Ok(Some(vec![])) @@ -98,13 +76,7 @@ impl ceramic_api::InterestService for InterestService { async fn insert(&self, key: Interest) -> anyhow::Result { Ok(CeramicOneInterest::insert(&self.pool, &key).await?) } - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> anyhow::Result> { - Ok(CeramicOneInterest::range(&self.pool, start..end, offset, limit).await?) + async fn range(&self, start: &Interest, end: &Interest) -> anyhow::Result> { + Ok(CeramicOneInterest::range(&self.pool, start..end).await?) } } diff --git a/interest-svc/src/store/metrics.rs b/interest-svc/src/store/metrics.rs index 3ad852d9e..de6bb1b5e 100644 --- a/interest-svc/src/store/metrics.rs +++ b/interest-svc/src/store/metrics.rs @@ -137,17 +137,11 @@ where self.record_key_insert(new); Ok(new) } - async fn range( - &self, - start: &Interest, - end: &Interest, - offset: usize, - limit: usize, - ) -> anyhow::Result> { + async fn range(&self, start: &Interest, end: &Interest) -> anyhow::Result> { StoreMetricsMiddleware::::record( &self.metrics, "api_interest_range", - self.store.range(start, end, offset, limit), + self.store.range(start, end), ) .await } @@ -194,28 +188,14 @@ where async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range", - self.store.range(range, offset, limit), - ) - .await + StoreMetricsMiddleware::::record(&self.metrics, "range", self.store.range(range)).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - StoreMetricsMiddleware::::record( - &self.metrics, - "range_with_values", - self.store.range_with_values(range, offset, limit), - ) - .await + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "first", self.store.first(range)).await + } + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { + StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await } async fn full_range( @@ -225,9 +205,6 @@ where .await } - async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { - StoreMetricsMiddleware::::record(&self.metrics, "middle", self.store.middle(range)).await - } async fn count(&self, range: Range<&Self::Key>) -> ReconResult { StoreMetricsMiddleware::::record(&self.metrics, "count", self.store.count(range)).await } diff --git a/interest-svc/src/store/sql/access/interest.rs b/interest-svc/src/store/sql/access/interest.rs index 07a848337..4f5d6bd8a 100644 --- a/interest-svc/src/store/sql/access/interest.rs +++ b/interest-svc/src/store/sql/access/interest.rs @@ -10,7 +10,7 @@ use sqlx::Row; use crate::store::{ sql::{ - entities::ReconHash, + entities::{OrderKey, ReconHash}, query::{ReconQuery, SqlBackend}, SqliteTransaction, }, @@ -104,19 +104,12 @@ impl CeramicOneInterest { Ok(HashCount::new(Sha256a::from(bytes), res.count())) } - /// Find the interestsin the range between left_fencepost and right_fencepost. - pub async fn range( - pool: &SqlitePool, - range: Range<&Interest>, - offset: usize, - limit: usize, - ) -> Result> { + /// Find the interests in the range + pub async fn range(pool: &SqlitePool, range: Range<&Interest>) -> Result> { let query = sqlx::query(ReconQuery::range()); let rows = query .bind(range.start.as_bytes()) .bind(range.end.as_bytes()) - .bind(limit as i64) - .bind(offset as i64) .fetch_all(pool.reader()) .await?; let rows = rows @@ -130,6 +123,35 @@ impl CeramicOneInterest { Ok(rows) } + /// Find the first interest in the range + pub async fn first(pool: &SqlitePool, range: Range<&Interest>) -> Result> { + sqlx::query_as(ReconQuery::first()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .fetch_optional(pool.reader()) + .await + .map_err(Error::from)? + .map(|key: OrderKey| Interest::try_from(key)) + .transpose() + .map_err(Error::new_fatal) + } + /// Find the approximate middle interest in the range + pub async fn middle(pool: &SqlitePool, range: Range<&Interest>) -> Result> { + let count = Self::count(pool, range.clone()).await?; + // (usize::MAX / 2) == i64::MAX, meaning it should always fit inside an i64. + // However to be safe we default to i64::MAX. + let half: i64 = (count / 2).try_into().unwrap_or(i64::MAX); + sqlx::query_as(ReconQuery::middle()) + .bind(range.start.as_bytes()) + .bind(range.end.as_bytes()) + .bind(half) + .fetch_optional(pool.reader()) + .await + .map_err(Error::from)? + .map(|key: OrderKey| Interest::try_from(key)) + .transpose() + .map_err(Error::new_fatal) + } /// Count the number of keys in a given range pub async fn count(pool: &SqlitePool, range: Range<&Interest>) -> Result { let row = sqlx::query(ReconQuery::count(SqlBackend::Sqlite)) diff --git a/interest-svc/src/store/sql/entities/mod.rs b/interest-svc/src/store/sql/entities/mod.rs index 4860a3909..71602a528 100644 --- a/interest-svc/src/store/sql/entities/mod.rs +++ b/interest-svc/src/store/sql/entities/mod.rs @@ -1,5 +1,7 @@ mod hash; +mod order_key; mod version; pub use hash::ReconHash; +pub use order_key::OrderKey; pub use version::VersionRow; diff --git a/interest-svc/src/store/sql/entities/order_key.rs b/interest-svc/src/store/sql/entities/order_key.rs new file mode 100644 index 000000000..9e85bda5a --- /dev/null +++ b/interest-svc/src/store/sql/entities/order_key.rs @@ -0,0 +1,14 @@ +use ceramic_core::Interest; + +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct OrderKey { + pub order_key: Vec, +} + +impl TryFrom for Interest { + type Error = ceramic_core::interest::InvalidInterest; + + fn try_from(value: OrderKey) -> std::prelude::v1::Result { + Interest::try_from(value.order_key) + } +} diff --git a/interest-svc/src/store/sql/query.rs b/interest-svc/src/store/sql/query.rs index 98c319db5..98dec8e10 100644 --- a/interest-svc/src/store/sql/query.rs +++ b/interest-svc/src/store/sql/query.rs @@ -36,6 +36,30 @@ impl ReconQuery { } /// Requires binding 2 parameters pub fn range() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_interest + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC;"# + } + /// Requires binding 2 parameters + pub fn first() -> &'static str { + r#"SELECT + order_key + FROM + ceramic_one_interest + WHERE + order_key >= $1 AND order_key < $2 + ORDER BY + order_key ASC + LIMIT + 1;"# + } + /// Requires binding 3 parameters + pub fn middle() -> &'static str { r#"SELECT order_key FROM @@ -45,9 +69,9 @@ impl ReconQuery { ORDER BY order_key ASC LIMIT - $3 + 1 OFFSET - $4;"# + $3;"# } pub fn count(db: SqlBackend) -> &'static str { diff --git a/interest-svc/src/tests/interest.rs b/interest-svc/src/tests/interest.rs index ecad79529..2684b1ba7 100644 --- a/interest-svc/src/tests/interest.rs +++ b/interest-svc/src/tests/interest.rs @@ -85,15 +85,9 @@ async fn access_interest_model(store: impl InterestService) { InterestService::insert(&store, interest_1.clone()) .await .unwrap(); - let interests = InterestService::range( - &store, - &random_interest_min(), - &random_interest_max(), - 0, - usize::MAX, - ) - .await - .unwrap(); + let interests = InterestService::range(&store, &random_interest_min(), &random_interest_max()) + .await + .unwrap(); assert_eq!( BTreeSet::from_iter([interest_0, interest_1]), BTreeSet::from_iter(interests) @@ -101,12 +95,12 @@ async fn access_interest_model(store: impl InterestService) { } test_with_dbs!( - test_hash_range_query, - test_hash_range_query, + hash_range_query, + hash_range_query, ["delete from ceramic_one_interest"] ); -async fn test_hash_range_query(store: S) +async fn hash_range_query(store: S) where S: recon::Store, { @@ -140,93 +134,92 @@ where } test_with_dbs!( - test_range_query, - test_range_query, + range_query, + range_query, ["delete from ceramic_one_interest"] ); -async fn test_range_query(store: S) +async fn range_query(store: S) where S: recon::Store, { - let interest_0 = random_interest(None, None); - let interest_1 = random_interest(None, None); + let mut interests: Vec<_> = (0..10).map(|_| random_interest(None, None)).collect(); + let items: Vec<_> = interests + .iter() + .map(|interest| ReconItem::new(interest.clone(), Vec::new())) + .collect(); - recon::Store::insert_many( - &store, - &[ReconItem::new(interest_0.clone(), Vec::new())], - NodeKey::random().id(), - ) - .await - .unwrap(); - recon::Store::insert_many( - &store, - &[ReconItem::new(interest_1.clone(), Vec::new())], - NodeKey::random().id(), - ) - .await - .unwrap(); - let ids = recon::Store::range( - &store, - &random_interest_min()..&random_interest_max(), - 0, - usize::MAX, - ) - .await - .unwrap(); - let interests = ids.collect::>(); - assert_eq!(BTreeSet::from_iter([interest_0, interest_1]), interests); + recon::Store::insert_many(&store, &items, NodeKey::random().id()) + .await + .unwrap(); + let ids = recon::Store::range(&store, &random_interest_min()..&random_interest_max()) + .await + .unwrap(); + let mut ids: Vec = ids.collect(); + interests.sort(); + ids.sort(); + assert_eq!(interests, ids); } test_with_dbs!( - test_range_with_values_query, - test_range_with_values_query, + first_query, + first_query, ["delete from ceramic_one_interest"] ); -async fn test_range_with_values_query(store: S) +async fn first_query(store: S) where - S: recon::Store, + S: recon::Store + Sync, { - let interest_0 = random_interest(None, None); - let interest_1 = random_interest(None, None); + let mut interests: Vec<_> = (0..10).map(|_| random_interest(None, None)).collect(); + let items: Vec<_> = interests + .iter() + .map(|interest| ReconItem::new(interest.clone(), Vec::new())) + .collect(); - store - .insert_many( - &[ReconItem::new(interest_0.clone(), Vec::new())], - NodeKey::random().id(), - ) + recon::Store::insert_many(&store, &items, NodeKey::random().id()) .await .unwrap(); - store - .insert_many( - &[ReconItem::new(interest_1.clone(), Vec::new())], - NodeKey::random().id(), - ) + let first = recon::Store::first(&store, &random_interest_min()..&random_interest_max()) + .await + .unwrap(); + interests.sort(); + assert_eq!(Some(interests[0].clone()), first); +} + +test_with_dbs!( + middle_query, + middle_query, + ["delete from ceramic_one_interest"] +); + +async fn middle_query(store: S) +where + S: recon::Store + Sync, +{ + let mut interests: Vec<_> = (0..10).map(|_| random_interest(None, None)).collect(); + let items: Vec<_> = interests + .iter() + .map(|interest| ReconItem::new(interest.clone(), Vec::new())) + .collect(); + + recon::Store::insert_many(&store, &items, NodeKey::random().id()) .await .unwrap(); - let ids = store - .range_with_values( - &random_interest_min()..&random_interest_max(), - 0, - usize::MAX, - ) + let middle = recon::Store::middle(&store, &random_interest_min()..&random_interest_max()) .await .unwrap(); - let interests = ids - .into_iter() - .map(|(i, _v)| i) - .collect::>(); - assert_eq!(BTreeSet::from_iter([interest_0, interest_1]), interests); + interests.sort(); + assert_eq!(Some(interests[interests.len() / 2].clone()), middle); } test_with_dbs!( - test_double_insert, - test_double_insert, + double_insert, + double_insert, ["delete from ceramic_one_interest"] ); -async fn test_double_insert(store: S) +async fn double_insert(store: S) where S: recon::Store, { @@ -253,12 +246,12 @@ where } test_with_dbs!( - test_value_for_key, - test_value_for_key, + value_for_key, + value_for_key, ["delete from ceramic_one_interest"] ); -async fn test_value_for_key(store: S) +async fn value_for_key(store: S) where S: recon::Store, { diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 32d71fb62..0c0fed5d0 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -1269,13 +1269,7 @@ mod tests { unreachable!() } - async fn range( - &self, - _left_fencepost: Self::Key, - _right_fencepost: Self::Key, - _offset: usize, - _limit: usize, - ) -> ReconResult> { + async fn range(&self, _range: std::ops::Range<&Self::Key>) -> ReconResult> { unreachable!() } diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 7f603f90e..17ece75b3 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -82,22 +82,20 @@ where async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> ReconResult + Send + 'static>> { self.as_error()?; - self.inner.range(range, offset, limit).await + self.inner.range(range).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { + async fn first(&self, range: Range<&Self::Key>) -> ReconResult> { + self.as_error()?; + + self.inner.first(range).await + } + async fn middle(&self, range: Range<&Self::Key>) -> ReconResult> { self.as_error()?; - self.inner.range_with_values(range, offset, limit).await + self.inner.middle(range).await } async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index 59215a548..0ac52b20c 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -11,6 +11,8 @@ //! Encoding and framing of messages is outside the scope of this crate. //! However the message types do implement serde::Serialize and serde::Deserialize. +use std::ops::Range; + use anyhow::{anyhow, bail, Context, Result}; use async_stream::try_stream; use async_trait::async_trait; @@ -747,7 +749,7 @@ where // TODO this holds all keys in memory // Paginate or otherwise stream the keys and values back out let keys = recon - .range(range.first, range.last, 0, usize::MAX) + .range(&range.first..&range.last) .await?; for key in keys { let value = recon.value_for_key(key.clone()).await?.ok_or_else(|| anyhow!("recon key does not have a value: key={}", key))?; @@ -824,13 +826,7 @@ pub trait Recon: Clone + Send + Sync + 'static { ) -> ReconResult>; /// Get all keys in the specified range - async fn range( - &self, - left_fencepost: Self::Key, - right_fencepost: Self::Key, - offset: usize, - limit: usize, - ) -> ReconResult>; + async fn range(&self, range: Range<&Self::Key>) -> ReconResult>; /// Reports total number of keys async fn len(&self) -> ReconResult; diff --git a/recon/src/recon.rs b/recon/src/recon.rs index c03bfc717..beaf132a7 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -72,11 +72,7 @@ where trace!(count, ?range, "small split sending all keys"); // We have only a few keys in the range. Let's short circuit the roundtrips and // send the keys directly. - let keys: Vec = self - .store - .range(&range.first..&range.last, 0, usize::MAX) - .await? - .collect(); + let keys: Vec = self.store.range(&range.first..&range.last).await?.collect(); let mut ranges = Vec::with_capacity(keys.len() + 1); let mut prev: Option = None; @@ -147,22 +143,6 @@ where self.store.is_empty().await } - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// The upper range bound is exclusive. - /// - /// Offset and limit values are applied within the range of keys. - pub async fn range_with_values( - &self, - left_fencepost: &K, - right_fencepost: &K, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - self.store - .range_with_values(left_fencepost..right_fencepost, offset, limit) - .await - } - /// Return all keys. pub async fn full_range(&self) -> Result + Send + 'static>> { self.store.full_range().await @@ -185,22 +165,11 @@ where self.store.insert_many(&items, informant).await } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// The upper range bound is exclusive. + /// Return all keys in the range. /// /// Offset and limit values are applied within the range of keys. - async fn range( - &self, - left_fencepost: Self::Key, - right_fencepost: Self::Key, - offset: usize, - limit: usize, - ) -> Result> { - Ok(self - .store - .range(&left_fencepost..&right_fencepost, offset, limit) - .await? - .collect()) + async fn range(&self, range: Range<&Self::Key>) -> Result> { + Ok(self.store.range(range).await?.collect()) } async fn len(&self) -> Result { @@ -296,9 +265,8 @@ where // in the range already. let split_key = self .store - .range(&range.first..&range.last, 0, 1) + .first(&range.first..&range.last,) .await? - .next() .ok_or_else(|| { Error::new_fatal(anyhow!( "unreachable, at least one key should exist in range given the conditional guard above" @@ -543,80 +511,31 @@ pub trait Store { async fn hash_range(&self, range: Range<&Self::Key>) -> Result>; /// Return all keys in the range. - /// The upper range bound is exclusive. - /// - /// Offset and limit values are applied within the range of keys. async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>>; - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// The upper range bound is exclusive. + /// Return the first key in the range. /// - /// Offset and limit values are applied within the range of keys. - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>>; + /// Default implementation uses range and reports only the first key + async fn first(&self, range: Range<&Self::Key>) -> Result> { + self.range(range).await.map(|mut keys| keys.next()) + } /// Return all keys. async fn full_range(&self) -> Result + Send + 'static>> { - self.range( - &Self::Key::min_value()..&Self::Key::max_value(), - 0, - usize::MAX, - ) - .await + self.range(&Self::Key::min_value()..&Self::Key::max_value()) + .await } /// Return a key that is approximately in the middle of the range. /// An exact middle is not necessary but performance will be better with a better approximation. - /// - /// The default implementation will count all elements and then find the middle. - async fn middle(&self, range: Range<&Self::Key>) -> Result> { - let count = self.count(range.clone()).await?; - if count == 0 { - Ok(None) - } else { - Ok(self.range(range, count / 2, 1).await?.next()) - } - } - - /// Return any number of splits of the range. - /// An exact split is not necessary but performance will be better with a better approximation. - /// - /// The input range bounds are not part of the returned split as they are the - /// implicit outermost bounds of the split. - /// - /// The default implementation uses middle to split the range approximately in two. - async fn split(&self, range: Range<&Self::Key>) -> Result> { - if let Some(middle) = self.middle(range.clone()).await? { - let left = self.hash_range(range.start..&middle).await?; - let right = self.hash_range(&middle..range.end).await?; - Ok(Split { - keys: vec![middle], - hashes: vec![left, right], - }) - } else { - // No keys in range return empty split - Ok(Split { - keys: vec![], - hashes: vec![HashCount { - hash: Self::Hash::identity(), - count: 0, - }], - }) - } - } + async fn middle(&self, range: Range<&Self::Key>) -> Result>; /// Return the number of keys within the range. async fn count(&self, range: Range<&Self::Key>) -> Result { - Ok(self.range(range, 0, usize::MAX).await?.count()) + Ok(self.range(range).await?.count()) } /// Reports total number of keys @@ -633,6 +552,8 @@ pub trait Store { async fn value_for_key(&self, key: &Self::Key) -> Result>>; } +// Explicitly implement every member of the trait so we do not mask any non default implementations +// on the S instance. #[async_trait::async_trait] impl Store for std::sync::Arc where @@ -650,29 +571,33 @@ where ) -> Result> { self.as_ref().insert_many(items, informant).await } - async fn hash_range(&self, range: Range<&Self::Key>) -> Result> { self.as_ref().hash_range(range).await } - async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>> { - self.as_ref().range(range, offset, limit).await + self.as_ref().range(range).await } - - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - self.as_ref().range_with_values(range, offset, limit).await + async fn first(&self, range: Range<&Self::Key>) -> Result> { + self.as_ref().first(range).await + } + async fn full_range(&self) -> Result + Send + 'static>> { + self.as_ref().full_range().await + } + async fn middle(&self, range: Range<&Self::Key>) -> Result> { + self.as_ref().middle(range).await + } + async fn count(&self, range: Range<&Self::Key>) -> Result { + self.as_ref().count(range).await + } + async fn len(&self) -> Result { + self.as_ref().len().await + } + async fn is_empty(&self) -> Result { + self.as_ref().is_empty().await } - async fn value_for_key(&self, key: &Self::Key) -> Result>> { self.as_ref().value_for_key(key).await } @@ -859,7 +784,7 @@ where async fn interests(&self) -> Result>> { self.store - .range(&self.start..&self.end, 0, usize::MAX) + .range(&self.start..&self.end) .await? .map(|interest| { if let Some(RangeOpen { start, end }) = interest.range() { diff --git a/recon/src/recon/btreestore.rs b/recon/src/recon/btreestore.rs index c6a441ce6..0f4bc49b4 100644 --- a/recon/src/recon/btreestore.rs +++ b/recon/src/recon/btreestore.rs @@ -51,9 +51,7 @@ where r } - /// Return the hash of all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - pub async fn hash_range(&self, range: Range<&K>) -> Result> { + async fn hash_range(&self, range: Range<&K>) -> Result> { if range.start >= range.end { return Ok(HashCount { hash: H::identity(), @@ -74,15 +72,9 @@ where }) } - /// Return all keys in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. - pub async fn range( + async fn range( &self, range: Range<&K>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>> { let keys: Vec = self .inner @@ -90,38 +82,30 @@ where .await .keys .range(range) - .skip(offset) - .take(limit) .map(|(key, _hash)| key) .cloned() .collect(); Ok(Box::new(keys.into_iter())) } - /// Return all keys and values in the range between left_fencepost and right_fencepost. - /// Both range bounds are exclusive. - /// - /// Offset and limit values are applied within the range of keys. - pub async fn range_with_values( - &self, - range: Range<&K>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - let inner = self.inner.lock().await; + async fn first(&self, range: Range<&K>) -> Result> { + Ok(self + .inner + .lock() + .await + .keys + .range(range) + .next() + .map(|(k, _)| k.clone())) + } - let keys: Vec<(K, Vec)> = inner + async fn middle(&self, range: Range<&K>) -> Result> { + let inner = self.inner.lock().await; + let count = inner.keys.range(range.clone()).count(); + Ok(inner .keys .range(range) - .skip(offset) - .take(limit) - .filter_map(|(key, _hash)| { - inner - .values - .get(key) - .map(|value| (key.clone(), value.clone())) - }) - .collect(); - Ok(Box::new(keys.into_iter())) + .nth(count / 2) + .map(|(k, _)| k.clone())) } async fn insert(&self, item: &ReconItem, _informant: NodeId) -> Result { @@ -160,28 +144,21 @@ where } async fn hash_range(&self, range: Range<&Self::Key>) -> Result> { - // Self does not need async to implement hash_range, so it exposes a pub non async hash_range function - // and we delegate to its implementation here. BTreeStore::hash_range(self, range).await } async fn range( &self, range: Range<&Self::Key>, - offset: usize, - limit: usize, ) -> Result + Send + 'static>> { - // Self does not need async to implement range, so it exposes a pub non async range function - // and we delegate to its implementation here. - BTreeStore::range(self, range, offset, limit).await + BTreeStore::range(self, range).await } - async fn range_with_values( - &self, - range: Range<&Self::Key>, - offset: usize, - limit: usize, - ) -> Result)> + Send + 'static>> { - BTreeStore::range_with_values(self, range, offset, limit).await + async fn first(&self, range: Range<&Self::Key>) -> Result> { + BTreeStore::first(self, range).await + } + + async fn middle(&self, range: Range<&Self::Key>) -> Result> { + BTreeStore::middle(self, range).await } async fn value_for_key(&self, key: &Self::Key) -> Result>> {