From adef4c500f6580eeeefdfceecb8ebcfcdb6843c2 Mon Sep 17 00:00:00 2001 From: Flix Date: Sun, 18 Aug 2024 14:43:05 +0200 Subject: [PATCH] refactor!: Remodel the way paging works There is now a page wrapper, which is able to return the stream of all entries and matches. Prefetching is done via stream buffering now. --- README.md | 4 +- crates/fhir-sdk/src/client/fhir/crud.rs | 94 +++--- crates/fhir-sdk/src/client/fhir/mod.rs | 1 + crates/fhir-sdk/src/client/fhir/paging.rs | 304 +++++++++--------- .../fhir-sdk/src/extensions/any_resource.rs | 4 +- crates/fhir-sdk/src/extensions/bundle.rs | 30 +- crates/fhir-sdk/tests/client-hapi.rs | 8 +- crates/fhir-sdk/tests/client-medplum-r4.rs | 6 +- 8 files changed, 246 insertions(+), 205 deletions(-) diff --git a/README.md b/README.md index 6afb2256..51e1c1d6 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,8 @@ async fn main() -> Result<(), Error> { code: Some("false"), not: false, })) + .await? + .all_matches() .try_collect() .await?; @@ -75,7 +77,7 @@ async fn main() -> Result<(), Error> { } ``` -For more examples, see the [tests](https://github.com/FlixCoder/fhir-sdk/blob/main/crates/fhir-sdk/tests/client-r5.rs) or below. +For more examples, see the [tests](https://github.com/FlixCoder/fhir-sdk/blob/main/crates/fhir-sdk/tests/) or below. ## Development & Testing diff --git a/crates/fhir-sdk/src/client/fhir/crud.rs b/crates/fhir-sdk/src/client/fhir/crud.rs index 47751ac4..ed8c0291 100644 --- a/crates/fhir-sdk/src/client/fhir/crud.rs +++ b/crates/fhir-sdk/src/client/fhir/crud.rs @@ -1,7 +1,6 @@ //! FHIR CRUD API interactions. use fhir_model::{ParsedReference, WrongResourceType}; -use futures::{Stream, TryStreamExt}; use reqwest::{ header::{self, HeaderValue}, StatusCode, Url, @@ -10,13 +9,13 @@ use serde::{de::DeserializeOwned, Serialize}; use super::{ misc, - paging::Paged, + paging::Page, patch::{PatchViaFhir, PatchViaJson}, transaction::BatchTransaction, Client, Error, SearchParameters, }; use crate::{ - extensions::{AnyResource, BundleEntryExt, GenericResource, ReferenceExt, SearchEntryModeExt}, + extensions::{AnyResource, GenericResource, ReferenceExt}, version::FhirVersion, }; @@ -109,32 +108,25 @@ where Ok(resource) } - // TODO: Refactor to improve: - // - Do we want generic resource type instead of argument? - // - We might need/want paging? - /// Retrieve the history of a resource type or a specific resource. - pub async fn history( - &self, - resource_type: V::ResourceType, - id: Option<&str>, - ) -> Result { + /// Retrieve the history of the specified resource type or a specific resource. 
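+	///
+	/// Returns a [Page] over the history `Bundle`, so further pages can be fetched lazily.
+	///
+	/// Rough usage sketch (the `Patient` type and error handling are illustrative only):
+	///
+	/// ```ignore
+	/// use futures::TryStreamExt;
+	/// // History of all Patients on the server, paged through automatically.
+	/// let history: Vec<_> =
+	/// 	client.history::<Patient>(None).await?.all_entries().try_collect().await?;
+	/// // History of a single resource instance.
+	/// let single = client.history::<Patient>(Some("123")).await?;
+	/// ```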
+ pub async fn history(&self, id: Option<&str>) -> Result, Error> + where + R: AnyResource + TryFrom + 'static, + for<'a> &'a R: TryFrom<&'a V::Resource>, + { let url = { if let Some(id) = id { - self.url(&[resource_type.as_ref(), id, "_history"]) + self.url(&[R::TYPE_STR, id, "_history"]) } else { - self.url(&[resource_type.as_ref(), "_history"]) + self.url(&[R::TYPE_STR, "_history"]) } }; - let request = self - .0 - .client - .get(url) - .header(header::ACCEPT, V::MIME_TYPE) - .header(header::CONTENT_TYPE, V::MIME_TYPE); + let request = self.0.client.get(url).header(header::ACCEPT, V::MIME_TYPE); + let response = self.run_request(request).await?; if response.status().is_success() { - let resource: V::Bundle = response.json().await?; - Ok(resource) + let bundle: V::Bundle = response.json().await?; + Ok(Page::new(self.clone(), bundle)) } else { Err(Error::from_response::(response).await) } @@ -234,33 +226,55 @@ where } } - /// Search for any FHIR resources given the query parameters. - pub fn search_all( + /// Search for FHIR resources of any type given the query parameters. + pub async fn search_all( &self, queries: SearchParameters, - ) -> impl Stream> + Send + 'static { - let mut url = self.url(&[]); - url.query_pairs_mut().extend_pairs(queries.into_queries()).finish(); + ) -> Result, Error> { + // TODO: Use POST for long queries? + + let url = self.url(&[]); + let request = self + .0 + .client + .get(url) + .query(&queries.into_queries()) + .header(header::ACCEPT, V::MIME_TYPE); - Paged::new(self.clone(), url, |entry| { - entry.search_mode().map_or(true, SearchEntryModeExt::is_match) - }) + let response = self.run_request(request).await?; + if response.status().is_success() { + let bundle: V::Bundle = response.json().await?; + Ok(Page::new(self.clone(), bundle)) + } else { + Err(Error::from_response::(response).await) + } } /// Search for FHIR resources of a given type given the query parameters. /// This simply ignores resources of the wrong type, e.g. an additional /// OperationOutcome. - pub fn search + TryFrom>( - &self, - queries: SearchParameters, - ) -> impl Stream> + Send + 'static { - let mut url = self.url(&[R::TYPE_STR]); - url.query_pairs_mut().extend_pairs(queries.into_queries()).finish(); + pub async fn search(&self, queries: SearchParameters) -> Result, Error> + where + R: AnyResource + TryFrom + 'static, + for<'a> &'a R: TryFrom<&'a V::Resource>, + { + // TODO: Use POST for long queries? 
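+		// A POST-based search would target `{base}/{type}/_search` and send the
+		// parameters as a form body instead of query pairs. Rough sketch only
+		// (not wired up here):
+		//
+		//     let request = self.0.client
+		//         .post(self.url(&[R::TYPE_STR, "_search"]))
+		//         .form(&queries.into_queries())
+		//         .header(header::ACCEPT, V::MIME_TYPE);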
- Paged::new(self.clone(), url, |entry| { - entry.search_mode().map_or(true, SearchEntryModeExt::is_match) - }) - .try_filter_map(|resource| async move { Ok(R::try_from(resource).ok()) }) + let url = self.url(&[R::TYPE_STR]); + let request = self + .0 + .client + .get(url) + .query(&queries.into_queries()) + .header(header::ACCEPT, V::MIME_TYPE); + + let response = self.run_request(request).await?; + if response.status().is_success() { + let bundle: V::Bundle = response.json().await?; + Ok(Page::new(self.clone(), bundle)) + } else { + Err(Error::from_response::(response).await) + } } /// Begin building a patch request for a FHIR resource on the server via the diff --git a/crates/fhir-sdk/src/client/fhir/mod.rs b/crates/fhir-sdk/src/client/fhir/mod.rs index 86e3e794..8474bbc3 100644 --- a/crates/fhir-sdk/src/client/fhir/mod.rs +++ b/crates/fhir-sdk/src/client/fhir/mod.rs @@ -9,6 +9,7 @@ mod transaction; mod write; pub use self::{ + paging::Page, search_params::{ DateSearch, MissingSearch, NumberSearch, QuantitySearch, ReferenceSearch, StringSearch, TokenSearch, UriSearch, diff --git a/crates/fhir-sdk/src/client/fhir/paging.rs b/crates/fhir-sdk/src/client/fhir/paging.rs index c79e1c68..9847f37e 100644 --- a/crates/fhir-sdk/src/client/fhir/paging.rs +++ b/crates/fhir-sdk/src/client/fhir/paging.rs @@ -1,182 +1,188 @@ //! FHIR paging functionality, e.g. for search results. -use std::{collections::VecDeque, pin::Pin, task::Poll}; +use std::{any::type_name, fmt::Debug, marker::PhantomData}; -use futures::{future::BoxFuture, ready, FutureExt, Stream}; +use futures::{stream, Stream, StreamExt, TryStreamExt}; use reqwest::{StatusCode, Url}; -use serde::de::DeserializeOwned; use super::{Client, Error}; use crate::{ - extensions::{BundleEntryExt, BundleExt}, + extensions::{BundleEntryExt, BundleExt, SearchEntryModeExt}, version::FhirVersion, }; -/// Results of a query that can be paged or given via URL only. The resources -/// can be consumed via the `Stream`/`StreamExt` traits. -pub struct Paged { - /// The FHIR client to make further requests for the next pages. +/// Type alias for the `BundleEntry` type for any version. +type BundleEntry = <::Bundle as BundleExt>::Entry; + +/// Wrapper around `Bundle`s that have multiple pages of results, e.g. search results, resource +/// history, etc. +pub struct Page { + /// The FHIR client to make further requests for the next pages and resources. client: Client, - /// The URL of the next page. This is opaque and can be anything the server - /// wants. The client ensures it accesses the same server only. - next_url: Option, - /// The current set of entries cached. - entries: VecDeque<::Entry>, - /// Filter on Bundle entries, whether they should be included in the - /// results. - #[allow(clippy::type_complexity)] // Fine for now :D - filter: Box::Entry) -> bool + Send>, - /// Current future to retrieve a resource. - future_resource: Option>>, - /// Current future to retrieve the next page. - future_next_page: Option>>, -} + /// The inner Bundle result. + bundle: V::Bundle, -impl Paged { - /// Start up a new Paged stream. - pub(crate) fn new(client: Client, url: Url, filter: FilterFn) -> Self - where - FilterFn: FnMut(&::Entry) -> bool + Send + 'static, - { - let next_url = Some(url); - let filter = Box::new(filter); - - Self { - client, - next_url, - entries: VecDeque::new(), - filter, - future_resource: None, - future_next_page: None, - } - } + /// The resource type to return in matches. 
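+	/// Entry resources that fail conversion into this type are silently skipped by
+	/// the `matches*` accessors instead of producing errors.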
+ _resource_type: PhantomData, } -impl Stream for Paged +impl Page where (StatusCode, V::OperationOutcome): Into, + R: TryFrom + Send + Sync + 'static, + for<'a> &'a R: TryFrom<&'a V::Resource>, { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let span = tracing::trace_span!("Paged::poll_next"); - let _span_guard = span.enter(); - - // Check on single resource future first to output the next resource. - if let Some(future_resource) = self.future_resource.as_mut() { - let item = ready!(future_resource.as_mut().poll(cx)); - self.future_resource = None; - tracing::trace!("Next `full_url` fetched resource ready"); - return Poll::Ready(Some(item)); - } - - // Otherwise check on next page future to save the next batch of entries. - if let Some(future_next_page) = self.future_next_page.as_mut() { - if let Poll::Ready(next_page) = future_next_page.as_mut().poll(cx) { - self.future_next_page = None; - tracing::trace!("Next page fetched and ready"); - - // Get the Bundle or error out. - let bundle = match next_page { - Ok(bundle) => bundle, - Err(err) => return Poll::Ready(Some(Err(err))), - }; - - // Parse the next page's URL or error out. - if let Some(next_url_string) = bundle.next_page_url() { - let Ok(next_url) = Url::parse(next_url_string) else { - tracing::error!("Could not parse next page URL"); - return Poll::Ready(Some(Err(Error::UrlParse(next_url_string.clone())))); - }; - self.next_url = Some(next_url); - } - - // Save the `BundleEntry`s. - self.entries.extend(bundle.into_entries()); - } - } - - // If there are not enough items in the queue, query the next page. - if self.entries.len() < 5 { - if let Some(next_page_url) = self.next_url.take() { - tracing::trace!("Less than 5 entries left, starting to fetch new page"); - self.future_next_page = - Some(fetch_resource(self.client.clone(), next_page_url).boxed()); - cx.waker().wake_by_ref(); - } - } - - // Then get the next item from the queue that matches the filter. - while let Some(entry) = self.entries.pop_front() { - if !(self.filter)(&entry) { - continue; - } - - let (full_url, resource) = entry.into_parts(); - if let Some(resource) = resource { - return Poll::Ready(Some(Ok(resource))); - } else if let Some(url) = full_url { - if let Ok(url) = Url::parse(&url) { - tracing::trace!("Next entry needs to be fetched, starting to fetch it"); - self.future_resource = Some(fetch_resource(self.client.clone(), url).boxed()); - cx.waker().wake_by_ref(); - return Poll::Pending; - } else { - tracing::error!("Could not parse next entry URL"); - return Poll::Ready(Some(Err(Error::UrlParse(url)))); - } - } - } - - // Else check if all resources were consumed or if we are waiting for new - // resources to arrive. - if self.future_next_page.is_some() { - tracing::trace!("Paged results waiting for next page fetch"); - Poll::Pending - } else if self.next_url.is_some() { - tracing::trace!("Paged results waiting for next URL fetch"); - cx.waker().wake_by_ref(); - Poll::Pending - } else { - tracing::trace!("Paged results exhausted"); - Poll::Ready(None) - } + /// Create a new `Page` result from a `Bundle` and client. + pub(crate) const fn new(client: Client, bundle: V::Bundle) -> Self { + Self { client, bundle, _resource_type: PhantomData } + } + + /// Get the next page URL, if there is one. + pub fn next_page_url(&self) -> Option<&String> { + self.bundle.next_page_url() + } + + /// Fetch the next page and return it. 
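+	///
+	/// Returns `None` once the current `Bundle` has no `next` link.
+	///
+	/// Manual paging sketch (hypothetical `Patient` search; `all_entries`/`all_matches`
+	/// below do this looping for you):
+	///
+	/// ```ignore
+	/// let mut page = client.search::<Patient>(params).await?;
+	/// loop {
+	///     for patient in page.matches() {
+	///         println!("found match: {:?}", patient.id);
+	///     }
+	///     match page.next_page().await {
+	///         Some(next) => page = next?,
+	///         None => break,
+	///     }
+	/// }
+	/// ```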
+ pub async fn next_page(&self) -> Option> { + let next_page_url = self.next_page_url()?; + let url = match Url::parse(next_page_url) { + Ok(url) => url, + Err(_err) => return Some(Err(Error::UrlParse(next_page_url.clone()))), + }; + + tracing::debug!("Fetching next page from URL: {next_page_url}"); + let next_bundle = match self.client.read_generic::(url).await { + Ok(Some(bundle)) => bundle, + Ok(None) => return Some(Err(Error::ResourceNotFound(next_page_url.clone()))), + Err(err) => return Some(Err(err)), + }; + + Some(Ok(Self::new(self.client.clone(), next_bundle))) } - fn size_hint(&self) -> (usize, Option) { - if self.next_url.is_some() { - (self.entries.len(), None) - } else { - (self.entries.len(), Some(self.entries.len())) - } + /// Get the `total` field, indicating the total number of results. + pub fn total(&self) -> Option { + self.bundle.total() + } + + /// Get access to the inner `Bundle`. + pub const fn bundle(&self) -> &V::Bundle { + &self.bundle + } + + /// Consume the `Page` and return the inner `Bundle`. + pub fn into_inner(self) -> V::Bundle { + self.bundle + } + + /// Consumes the raw inner entries, leaving the page empty. Returns the entries. + pub fn take_entries(&mut self) -> Vec>> { + self.bundle.take_entries() + } + + /// Get the entries of this page, ignoring entries whenever there is no `resource` in the entry. + pub fn entries(&self) -> impl Iterator + Send { + self.bundle.entries().filter_map(|entry| entry.resource()) + } + + /// Get the matches of this page, ignoring entries whenever there is no `resource` in the entry + /// or resources of the wrong type. + pub fn matches(&self) -> impl Iterator + Send { + self.bundle + .entries() + .filter(|entry| entry.search_mode().is_some_and(SearchEntryModeExt::is_match)) + .filter_map(|entry| entry.resource()) + .filter_map(|resource| resource.try_into().ok()) + } + + /// Get the entries of this page, where the `fullUrl` is automatically resolved whenever there + /// is no `resource` in the entry. Consumes the entries, leaving the page empty. + pub fn entries_owned( + &mut self, + ) -> impl Stream> + Send + 'static { + let client = self.client.clone(); + stream::iter(self.take_entries().into_iter().flatten()) + .filter_map(move |entry| resolve_bundle_entry(entry, client.clone())) + } + + /// Get the matches of this page, where the `fullUrl` is automatically resolved whenever there + /// is no `resource` in the entry. Consumes the entries, leaving the page empty. Ignores entries + /// of the wrong resource type and entries without resource or full URL. + pub fn matches_owned(&mut self) -> impl Stream> + Send + 'static { + let client = self.client.clone(); + stream::iter( + self.take_entries() + .into_iter() + .flatten() + .filter(|entry| entry.search_mode().is_some_and(SearchEntryModeExt::is_match)), + ) + .filter_map(move |entry| resolve_bundle_entry(entry, client.clone())) + .try_filter_map(|resource| std::future::ready(Ok(resource.try_into().ok()))) + } + + /// Start automatic paging through all entries across pages. + /// + /// Hint: you can activate pre-fetching by [StreamExt::buffered]. + pub fn all_entries( + mut self, + ) -> impl Stream> + Send + 'static { + self.entries_owned() + .chain( + stream::once(async move { self.next_page().await }) + .filter_map(std::future::ready) + .map_ok(Self::all_entries) + .try_flatten(), + ) + .boxed() // Somehow gives error when using if not boxed? + } + + /// Start automatic paging through all matches across pages. 
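+	///
+	/// Typical use is to collect the stream (sketch; `params` and `Patient` are placeholders):
+	///
+	/// ```ignore
+	/// use futures::TryStreamExt;
+	/// let patients: Vec<Patient> =
+	/// 	client.search(params).await?.all_matches().try_collect().await?;
+	/// ```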
+ /// + /// Hint: you can activate pre-fetching by [StreamExt::buffered]. + pub fn all_matches(mut self) -> impl Stream> + Send + 'static { + self.matches_owned() + .chain( + stream::once(async move { self.next_page().await }) + .filter_map(std::future::ready) + .map_ok(Self::all_matches) + .try_flatten(), + ) + .boxed() // Somehow gives error when using if not boxed? } } -/// Query a resource from a given URL. -async fn fetch_resource( +/// Convert the bundle entry into a resource, resolving the `fullUrl` if there is no resource +/// inside. Returns `None` if there is neither resource nor full URL. +async fn resolve_bundle_entry( + entry: BundleEntry, client: Client, - url: Url, -) -> Result +) -> Option> where (StatusCode, V::OperationOutcome): Into, { - // Fetch a single resource from the given URL. - let resource = client.read_generic(url.clone()).await?; - resource.ok_or_else(|| Error::ResourceNotFound(url.to_string())) + if entry.resource().is_some() { + return entry.into_resource().map(Ok); + } + + let full_url = entry.full_url()?; + let url = match Url::parse(full_url) { + Ok(url) => url, + Err(_err) => return Some(Err(Error::UrlParse(full_url.clone()))), + }; + + let result = client + .read_generic::(url) + .await + .and_then(|opt| opt.ok_or_else(|| Error::ResourceNotFound(full_url.clone()))); + Some(result) } -impl std::fmt::Debug for Paged { +impl Debug for Page { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Paged") + f.debug_struct("Page") .field("client", &self.client) - .field("next_url", &self.next_url) - .field("entries", &self.entries) - .field("filter", &"_") - .field("future_resource", &self.future_resource.as_ref().map(|_| "_")) - .field("future_next_page", &self.future_next_page.as_ref().map(|_| "_")) + .field("bundle", &self.bundle) + .field("_resource_type", &type_name::()) .finish() } } diff --git a/crates/fhir-sdk/src/extensions/any_resource.rs b/crates/fhir-sdk/src/extensions/any_resource.rs index 18c52850..bb954248 100644 --- a/crates/fhir-sdk/src/extensions/any_resource.rs +++ b/crates/fhir-sdk/src/extensions/any_resource.rs @@ -7,7 +7,7 @@ use crate::version::{fhir_version, FhirVersion}; /// Basic trait to combine all resources from all FHIR versions to one. /// Especially for use in the client to accept and handly any resource. /// Only implemented if "builders" feature is active. -pub trait AnyResource { +pub trait AnyResource: Send + Sync { /// ResourceType of this resource. const TYPE: V::ResourceType; /// Resource type of the resource as `&str`. Must be valid for use in URLs. @@ -30,7 +30,7 @@ macro_rules! impl_any_resource { impl AnyResource for R where - R: $version::resources::NamedResource + $version::resources::BaseResource, + R: $version::resources::NamedResource + $version::resources::BaseResource + Send + Sync, { const TYPE: $version::resources::ResourceType = R::TYPE; const TYPE_STR: &'static str = R::TYPE.as_str(); diff --git a/crates/fhir-sdk/src/extensions/bundle.rs b/crates/fhir-sdk/src/extensions/bundle.rs index b54cbf47..d93ab302 100644 --- a/crates/fhir-sdk/src/extensions/bundle.rs +++ b/crates/fhir-sdk/src/extensions/bundle.rs @@ -26,10 +26,14 @@ pub trait BundleExt { /// See [Bundle::next_page_url]. fn next_page_url(&self) -> Option<&String>; + /// Get the total number of entries from the `total` field, if it exists. + fn total(&self) -> Option; /// Iterate over entries. 
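+	/// `None` placeholders in the underlying entry list are skipped.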
- fn entries(&self) -> impl Iterator; + fn entries(&self) -> impl Iterator + Send; + /// Take ownership of the entries, removing them from the `Bundle`. + fn take_entries(&mut self) -> Vec>; /// Iterate over owned entries, consuming this Bundle. - fn into_entries(self) -> impl Iterator; + fn into_entries(self) -> impl Iterator + Send + 'static; /// Create a new `Bundle` of type batch. fn make_batch(entries: Vec>) -> Self; @@ -55,11 +59,19 @@ macro_rules! impl_bundle_ext { Bundle::next_page_url(self) } - fn entries(&self) -> impl Iterator { + fn total(&self) -> Option { + self.total + } + + fn entries(&self) -> impl Iterator + Send { self.0.entry.iter().flatten() } - fn into_entries(self) -> impl Iterator { + fn take_entries(&mut self) -> Vec> { + std::mem::take(&mut self.0.entry) + } + + fn into_entries(self) -> impl Iterator + Send + 'static { self.0.entry.into_iter().flatten() } @@ -125,14 +137,12 @@ pub trait BundleEntryExt { /// Get the search.mode field. fn search_mode(&self) -> Option<&Self::SearchEntryMode>; /// Get the full URL field. - #[allow(dead_code)] // For future use. fn full_url(&self) -> Option<&String>; /// Get the inner resource. - #[allow(dead_code)] // For future use. fn resource(&self) -> Option<&Self::Resource>; - /// Consume the entry and turn it into its relevant parts. - fn into_parts(self) -> (Option, Option); + /// Consume the entry and turn it into its inner resource. + fn into_resource(self) -> Option; /// Create a new empty `BundleEntry`. fn empty() -> Self; @@ -172,8 +182,8 @@ macro_rules! impl_bundle_entry_ext { self.resource.as_ref() } - fn into_parts(self) -> (Option, Option) { - (self.full_url, self.resource) + fn into_resource(self) -> Option { + self.resource } fn empty() -> Self { diff --git a/crates/fhir-sdk/tests/client-hapi.rs b/crates/fhir-sdk/tests/client-hapi.rs index 9552f9be..2219ac9c 100644 --- a/crates/fhir-sdk/tests/client-hapi.rs +++ b/crates/fhir-sdk/tests/client-hapi.rs @@ -268,6 +268,8 @@ macro_rules! impl_hapi_tests { not: false, }), ) + .await? + .all_matches() .try_collect() .await?; assert_eq!(patients.len(), 1); @@ -369,6 +371,8 @@ macro_rules! impl_hapi_tests { comparator: Some(SearchComparator::Eq), value: date, })) + .await? + .all_matches() .try_collect() .await?; assert_eq!(patients.len(), n); @@ -456,7 +460,7 @@ macro_rules! impl_hapi_tests { let mut patient = Patient::builder().language("history2".to_owned()).build().unwrap(); let second_patient_id = patient.create(&client).await?; - let bundle = client.history(ResourceType::Patient, None).await?; + let bundle = client.history::(None).await?.into_inner(); assert_eq!(bundle.r#type, BundleType::History); assert!(bundle.entry.len() >= 2); for id in &[first_patient_id, second_patient_id] { @@ -489,7 +493,7 @@ macro_rules! impl_hapi_tests { patient.clone().delete(&client).await?; - let bundle = client.history(ResourceType::Patient, patient.id.as_deref()).await?; + let bundle = client.history::(patient.id.as_deref()).await?.into_inner(); assert_eq!(bundle.r#type, BundleType::History); assert_eq!(bundle.entry.len(), 3); diff --git a/crates/fhir-sdk/tests/client-medplum-r4.rs b/crates/fhir-sdk/tests/client-medplum-r4.rs index c08f6488..deb406ea 100644 --- a/crates/fhir-sdk/tests/client-medplum-r4.rs +++ b/crates/fhir-sdk/tests/client-medplum-r4.rs @@ -216,6 +216,8 @@ async fn search_inner() -> Result<()> { not: false, }), ) + .await? 
+ .all_matches() .try_collect() .await?; assert_eq!(patients.len(), 1); @@ -323,6 +325,8 @@ async fn paging_inner() -> Result<()> { comparator: Some(SearchComparator::Eq), value: date, })) + .await? + .all_matches() .try_collect() .await?; let patients_len = patients.len(); @@ -354,7 +358,7 @@ async fn history_with_id_inner() -> Result<()> { patient.clone().delete(&client).await?; - let bundle = client.history(ResourceType::Patient, patient.id.as_deref()).await?; + let bundle = client.history::(patient.id.as_deref()).await?.into_inner(); assert_eq!(bundle.r#type, BundleType::History); assert_eq!(bundle.entry.len(), 3);