Skip to content

Commit

Permalink
refactor: remove offset/limit where too generic (#624)
Browse files Browse the repository at this point in the history
* refactor: remove offset/limit where to generic

This change removes the offset/limit pagination pattern where it implies
a more flexible API than was needed or used. Having the flexible API
meant that casts from usize to smaller types for limits and offsets
could break pagination assumptions. Now with this change more specific
APIs are exposed where its safe to make assumptions about the size of
limit offset values in queries.

Additionally in the API where true pagination was the desired API
u32 is used instead of usize to keep page sizes small.

Finally, the range_with_values method on the Recon trait was removed as
it was unused

* fix: add tests and fix middle syntax error
  • Loading branch information
nathanielc authored Dec 4, 2024
1 parent 763c7e4 commit c479255
Show file tree
Hide file tree
Showing 22 changed files with 401 additions and 495 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM public.ecr.aws/r5b3e0r5/3box/rust-builder:latest as builder
FROM public.ecr.aws/r5b3e0r5/3box/rust-builder:latest AS builder

RUN mkdir -p /home/builder/rust-ceramic
WORKDIR /home/builder/rust-ceramic
Expand Down
40 changes: 11 additions & 29 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,7 @@ impl TryFrom<models::Interest> for ValidatedInterest {
pub trait InterestService: Send + Sync {
/// Returns true if the key was newly inserted, false if it already existed.
async fn insert(&self, key: Interest) -> Result<bool>;
async fn range(
&self,
start: &Interest,
end: &Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Interest>>;
async fn range(&self, start: &Interest, end: &Interest) -> Result<Vec<Interest>>;
}

#[async_trait]
Expand All @@ -165,14 +159,8 @@ impl<S: InterestService> InterestService for Arc<S> {
self.as_ref().insert(key).await
}

async fn range(
&self,
start: &Interest,
end: &Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Interest>> {
self.as_ref().range(start, end, offset, limit).await
async fn range(&self, start: &Interest, end: &Interest) -> Result<Vec<Interest>> {
self.as_ref().range(start, end).await
}
}

Expand Down Expand Up @@ -271,8 +259,8 @@ pub trait EventService: Send + Sync {
async fn range_with_values(
&self,
range: Range<EventId>,
offset: usize,
limit: usize,
offset: u32,
limit: u32,
) -> Result<Vec<(Cid, Vec<u8>)>>;

/**
Expand Down Expand Up @@ -313,8 +301,8 @@ impl<S: EventService> EventService for Arc<S> {
async fn range_with_values(
&self,
range: Range<EventId>,
offset: usize,
limit: usize,
offset: u32,
limit: u32,
) -> Result<Vec<(Cid, Vec<u8>)>> {
self.as_ref().range_with_values(range, offset, limit).await
}
Expand Down Expand Up @@ -605,12 +593,7 @@ where
) -> Result<ExperimentalInterestsGetResponse, ErrorResponse> {
let interests = self
.interest
.range(
&Interest::min_value(),
&Interest::max_value(),
0,
usize::MAX,
)
.range(&Interest::min_value(), &Interest::max_value())
.await
.map_err(|e| ErrorResponse::new(format!("failed to get interests: {e}")))?;

Expand Down Expand Up @@ -654,9 +637,8 @@ where
offset: Option<i32>,
limit: Option<i32>,
) -> Result<ExperimentalEventsSepSepValueGetResponse, ErrorResponse> {
let limit: usize =
limit.map_or(10000, |l| if l.is_negative() { 10000 } else { l }) as usize;
let offset = offset.map_or(0, |o| if o.is_negative() { 0 } else { o }) as usize;
let limit = limit.map_or(10000, |l| if l.is_negative() { 10000 } else { l }) as u32;
let offset = offset.map_or(0, |o| if o.is_negative() { 0 } else { o }) as u32;
let sep_value = match decode_multibase_data(&sep_value) {
Ok(v) => v,
Err(e) => return Ok(ExperimentalEventsSepSepValueGetResponse::BadRequest(e)),
Expand All @@ -674,7 +656,7 @@ where
.map(|(id, data)| BuildResponse::event(id, Some(data)))
.collect::<Vec<_>>();

let event_cnt = events.len();
let event_cnt = events.len() as u32;
Ok(ExperimentalEventsSepSepValueGetResponse::Success(
models::EventsGet {
resume_offset: (offset + event_cnt) as i32,
Expand Down
14 changes: 4 additions & 10 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ mock! {
&self,
start: &Interest,
end: &Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Interest>>;
}
}
Expand All @@ -128,8 +126,8 @@ mock! {
async fn range_with_values(
&self,
range: Range<EventId>,
offset: usize,
limit: usize,
offset: u32,
limit: u32,
) -> Result<Vec<(Cid, Vec<u8>)>>;
async fn value_for_order_key(&self, key: &EventId) -> Result<Option<Vec<u8>>>;
async fn value_for_cid(&self, key: &Cid) -> Result<Option<Vec<u8>>>;
Expand Down Expand Up @@ -580,11 +578,9 @@ async fn get_interests() {
.with(
predicate::eq(Interest::min_value()),
predicate::eq(Interest::max_value()),
predicate::eq(0),
predicate::eq(usize::MAX),
)
.once()
.return_once(move |_, _, _, _| {
.return_once(move |_, _| {
Ok(vec![
Interest::builder()
.with_sep_key("model")
Expand Down Expand Up @@ -662,11 +658,9 @@ async fn get_interests_for_peer() {
.with(
predicate::eq(Interest::min_value()),
predicate::eq(Interest::max_value()),
predicate::eq(0),
predicate::eq(usize::MAX),
)
.once()
.return_once(move |_, _, _, _| {
.return_once(move |_, _| {
Ok(vec![
Interest::builder()
.with_sep_key("model")
Expand Down
48 changes: 9 additions & 39 deletions event-svc/src/event/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ impl recon::Store for EventService {
type Key = EventId;
type Hash = Sha256a;

/// Insert new keys into the key space.
/// Returns true for each key if it did not previously exist, in the
/// same order as the input iterator.
async fn insert_many(
&self,
items: &[ReconItem<Self::Key>],
Expand All @@ -67,9 +64,6 @@ impl recon::Store for EventService {
Ok(res.into())
}

/// Return the hash of all keys in the range between left_fencepost and right_fencepost.
/// Both range bounds are exclusive.
/// Returns ReconResult<(Hash, count), Err>
async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult<HashCount<Self::Hash>> {
let res = self
.event_access
Expand All @@ -79,52 +73,28 @@ impl recon::Store for EventService {
Ok(res)
}

/// Return all keys in the range between left_fencepost and right_fencepost.
/// Both range bounds are exclusive.
///
/// Offset and limit values are applied within the range of keys.
async fn range(
&self,
range: Range<&Self::Key>,
offset: usize,
limit: usize,
) -> ReconResult<Box<dyn Iterator<Item = Self::Key> + Send + 'static>> {
Ok(Box::new(
self.event_access
.range(range, offset, limit)
.range(range)
.await
.map_err(Error::from)?
.into_iter(),
))
}

/// Return all keys and values in the range between left_fencepost and right_fencepost.
/// Both range bounds are exclusive.
///
/// Offset and limit values are applied within the range of keys.
async fn range_with_values(
&self,
range: Range<&Self::Key>,
offset: usize,
limit: usize,
) -> ReconResult<Box<dyn Iterator<Item = (Self::Key, Vec<u8>)> + Send + 'static>> {
Ok(Box::new(
self.event_access
.range_with_values(range, offset, limit)
.await
.map_err(Error::from)?
.into_iter(),
))
async fn first(&self, range: Range<&Self::Key>) -> ReconResult<Option<Self::Key>> {
Ok(self.event_access.first(range).await.map_err(Error::from)?)
}
/// Return the number of keys within the range.
async fn middle(&self, range: Range<&Self::Key>) -> ReconResult<Option<Self::Key>> {
Ok(self.event_access.middle(range).await.map_err(Error::from)?)
}

async fn count(&self, range: Range<&Self::Key>) -> ReconResult<usize> {
Ok(self.event_access.count(range).await.map_err(Error::from)?)
}

/// value_for_key returns
/// Ok(Some(value)) if stored,
/// Ok(None) if not stored, and
/// Err(e) if retrieving failed.
async fn value_for_key(&self, key: &Self::Key) -> ReconResult<Option<Vec<u8>>> {
Ok(self
.event_access
Expand Down Expand Up @@ -210,8 +180,8 @@ impl ceramic_api::EventService for EventService {
async fn range_with_values(
&self,
range: Range<EventId>,
offset: usize,
limit: usize,
offset: u32,
limit: u32,
) -> anyhow::Result<Vec<(Cid, Vec<u8>)>> {
self.event_access
.range_with_values(&range.start..&range.end, offset, limit)
Expand Down
34 changes: 8 additions & 26 deletions event-svc/src/store/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ where
async fn range_with_values(
&self,
range: Range<EventId>,
offset: usize,
limit: usize,
offset: u32,
limit: u32,
) -> anyhow::Result<Vec<(Cid, Vec<u8>)>> {
StoreMetricsMiddleware::<S>::record(
&self.metrics,
Expand Down Expand Up @@ -277,28 +277,14 @@ where
async fn range(
&self,
range: Range<&Self::Key>,
offset: usize,
limit: usize,
) -> ReconResult<Box<dyn Iterator<Item = Self::Key> + Send + 'static>> {
StoreMetricsMiddleware::<S>::record(
&self.metrics,
"range",
self.store.range(range, offset, limit),
)
.await
StoreMetricsMiddleware::<S>::record(&self.metrics, "range", self.store.range(range)).await
}
async fn range_with_values(
&self,
range: Range<&Self::Key>,
offset: usize,
limit: usize,
) -> ReconResult<Box<dyn Iterator<Item = (Self::Key, Vec<u8>)> + Send + 'static>> {
StoreMetricsMiddleware::<S>::record(
&self.metrics,
"range_with_values",
self.store.range_with_values(range, offset, limit),
)
.await
async fn first(&self, range: Range<&Self::Key>) -> ReconResult<Option<Self::Key>> {
StoreMetricsMiddleware::<S>::record(&self.metrics, "first", self.store.first(range)).await
}
async fn middle(&self, range: Range<&Self::Key>) -> ReconResult<Option<Self::Key>> {
StoreMetricsMiddleware::<S>::record(&self.metrics, "middle", self.store.middle(range)).await
}

async fn full_range(
Expand All @@ -307,10 +293,6 @@ where
StoreMetricsMiddleware::<S>::record(&self.metrics, "full_range", self.store.full_range())
.await
}

async fn middle(&self, range: Range<&Self::Key>) -> ReconResult<Option<Self::Key>> {
StoreMetricsMiddleware::<S>::record(&self.metrics, "middle", self.store.middle(range)).await
}
async fn count(&self, range: Range<&Self::Key>) -> ReconResult<usize> {
StoreMetricsMiddleware::<S>::record(&self.metrics, "count", self.store.count(range)).await
}
Expand Down
48 changes: 30 additions & 18 deletions event-svc/src/store/sql/access/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
num::TryFromIntError,
ops::Range,
sync::atomic::{AtomicI64, Ordering},
};
Expand Down Expand Up @@ -242,21 +241,10 @@ impl EventAccess {
}

/// Find a range of event IDs
pub async fn range(
&self,
range: Range<&EventId>,
offset: usize,
limit: usize,
) -> Result<Vec<EventId>> {
let offset: i64 = offset.try_into().map_err(|_e: TryFromIntError| {
Error::new_app(anyhow!("Offset too large to fit into i64"))
})?;
let limit = limit.try_into().unwrap_or(100000); // 100k is still a huge limit
pub async fn range(&self, range: Range<&EventId>) -> Result<Vec<EventId>> {
let rows: Vec<OrderKey> = sqlx::query_as(ReconQuery::range())
.bind(range.start.as_bytes())
.bind(range.end.as_bytes())
.bind(limit)
.bind(offset)
.fetch_all(self.pool.reader())
.await
.map_err(Error::from)?;
Expand All @@ -267,16 +255,40 @@ impl EventAccess {
Ok(rows)
}

/// Find first event id within the range
pub async fn first(&self, range: Range<&EventId>) -> Result<Option<EventId>> {
let key: Option<OrderKey> = sqlx::query_as(ReconQuery::first())
.bind(range.start.as_bytes())
.bind(range.end.as_bytes())
.fetch_optional(self.pool.reader())
.await
.map_err(Error::from)?;
key.map(|k| EventId::try_from(k).map_err(|e: InvalidEventId| Error::new_app(anyhow!(e))))
.transpose()
}
/// Find an approximate middle event id within the range
pub async fn middle(&self, range: Range<&EventId>) -> Result<Option<EventId>> {
let count = self.count(range.clone()).await?;
// (usize::MAX / 2) == i64::MAX, meaning it should always fit inside an i64.
// However to be safe we default to i64::MAX.
let half: i64 = (count / 2).try_into().unwrap_or(i64::MAX);
let key: Option<OrderKey> = sqlx::query_as(ReconQuery::middle())
.bind(range.start.as_bytes())
.bind(range.end.as_bytes())
.bind(half)
.fetch_optional(self.pool.reader())
.await
.map_err(Error::from)?;
key.map(|k| EventId::try_from(k).map_err(|e: InvalidEventId| Error::new_app(anyhow!(e))))
.transpose()
}
/// Find a range of event IDs with their values. Should replace `range` when we move to discovering values and keys simultaneously.
pub async fn range_with_values(
&self,
range: Range<&EventId>,
offset: usize,
limit: usize,
offset: u32,
limit: u32,
) -> Result<Vec<(EventId, Vec<u8>)>> {
let offset = offset.try_into().unwrap_or(i64::MAX);
let limit: i64 = limit.try_into().unwrap_or(i64::MAX);

let all_blocks: Vec<ReconEventBlockRaw> =
sqlx::query_as(EventQuery::value_blocks_by_order_key_many())
.bind(range.start.as_bytes())
Expand Down
Loading

0 comments on commit c479255

Please sign in to comment.