Skip to content

Commit

Permalink
feat: add node_status and list_indexed_models methods
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Aug 31, 2023
1 parent 991d5ac commit be4b848
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 0 deletions.
134 changes: 134 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ pub struct IndexModelData {
pub models: Vec<ModelData>,
}

/// Request to list indexed models
#[derive(Serialize)]
pub struct ListIndexedModelsRequest {}

/// Response list of indexed models
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListIndexedModelsResponse {
/// List of indexed models
pub models: Vec<StreamId>,
}

/// Response from call to admin api /getCode
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -202,6 +214,13 @@ pub struct AdminApiRequest {
jws: String,
}

impl AdminApiRequest {
/// JWS Compact Serialization string.
pub fn jws(&self) -> &str {
self.jws.as_ref()
}
}

impl TryFrom<Jws> for AdminApiRequest {
type Error = anyhow::Error;
fn try_from(value: Jws) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -333,6 +352,121 @@ pub struct TypedQueryResponse<T> {
#[derive(Serialize)]
pub struct HealthcheckRequest {}

/// Node status request for http api
#[derive(Serialize)]
pub struct NodeStatusRequest {}

/// Node status response for http api
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NodeStatusResponse {
/// A random UUID that is generated each time a node starts up.
/// Can be used to detect when a node restarts.
pub run_id: String,
/// How long the node has been running.
pub uptime_ms: i64,
/// The Ceramic network the node is connected to.
pub network: String,
/// Information about the anchoring service.
pub anchor: AnchorStatus,
/// Information about the connected IPFS node.
pub ipfs: IpfsStatus,
/// Information about the ComposeDB operations.
pub compose_db: Option<ComposeDBStatus>,
}

/// Information about the anchoring service.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AnchorStatus {
/// The URL of the Ceramic Anchor Service used to request anchors.
pub anchor_service_url: String,
/// The ethereum rpc endpoint used to validate anchor transactions. If null, likely means
/// the node is using the default, rate-limited ethereum provider.
pub ethereum_rpc_endpoint: Option<String>,
/// The ethereum chainId used for anchors.
pub chain_id: String,
}

/// Information about the connected IPFS node.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IpfsStatus {
/// PeerId of the connected ipfs node
pub peer_id: String,
/// IPFS Swarm multiaddrs of the connected ipfs node
pub addresses: Vec<String>,
}

/// Status about the ComposeDB specific operations of the node.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ComposeDBStatus {
/// The list of models Ids that are being indexed.
pub indexed_models: Vec<String>,
/// The set of active sync operations.
pub syncs: Option<SyncStatus>,
}

/// Status of all sync operations.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncStatus {
/// Status of currently active sync operations.
pub active_syncs: Vec<ActiveSyncStatus>,
/// Status of continuously running sync operations.
pub continuous_sync: Vec<ContinuousSyncStatus>,
/// Status of pending sync operations.
pub pending_syncs: Vec<PendingSyncStatus>,
}

/// Status of currently active sync operations.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ActiveSyncStatus {
/// The block the sync starts at
pub start_block: i32,
/// The block the sync is currently processing
pub current_block: i32,
/// The block the sync will end on
pub end_block: i32,
/// Models that are being synced
pub models: Vec<StreamId>,
/// Date when the sync was requested
pub created_at: String,
/// Date when the sync started
pub started_at: String,
}

/// Status of continuously running sync operations.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContinuousSyncStatus {
/// The first block recevied form the chain on node startup
pub start_block: i32,
/// The latest block received from the chain
pub latest_block: i32,
/// The number of blocks we wait for before we process a block
pub confirmations: i32,
/// The block we are currently processing (should be latestBlock - confirmations)
pub current_block: i32,
/// Models that are being synced
pub models: Vec<StreamId>,
}
/// Status of pending sync operations.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PendingSyncStatus {
/// The block the sync starts at
pub start_block: i32,
/// The block the sync will end on
pub end_block: i32,
/// Models that are being synced
pub models: Vec<StreamId>,
/// Date when the sync was requested
pub created_at: String,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
89 changes: 89 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,21 @@ impl<S: Signer> CeramicHttpClient<S> {
pub fn index_endpoint(&self) -> &'static str {
"/api/v0/admin/modelData"
}
/// Get the models endpoint
pub fn models_endpoint(&self) -> &'static str {
"/api/v0/admin/models"
}

/// Get the healthcheck endpoint
pub fn healthcheck_endpoint(&self) -> &'static str {
"/api/v0/node/healthcheck"
}

/// Get the status endpoint
pub fn node_status_endpoint(&self) -> &'static str {
"/api/v0/admin/status"
}

/// Create a serde compatible request for model creation
pub async fn create_model_request(
&self,
Expand Down Expand Up @@ -118,6 +127,21 @@ impl<S: Signer> CeramicHttpClient<S> {
api::AdminApiRequest::try_from(jws)
}

/// Create a serde compatible request for listing indexed models
pub async fn create_list_indexed_models_request(
&self,
code: &str,
) -> anyhow::Result<api::AdminApiRequest> {
let data = api::ListIndexedModelsRequest {};
let req = api::AdminApiPayload {
code: code.to_string(),
request_path: self.models_endpoint().to_string(),
request_body: data,
};
let jws = Jws::for_data(&self.signer, &req).await?;
api::AdminApiRequest::try_from(jws)
}

/// Create a serde compatible request for a single instance per account creation of a model
pub async fn create_single_instance_request(
&self,
Expand Down Expand Up @@ -249,6 +273,20 @@ impl<S: Signer> CeramicHttpClient<S> {
pub async fn create_healthcheck_request(&self) -> anyhow::Result<api::HealthcheckRequest> {
Ok(api::HealthcheckRequest {})
}
/// Create a serde compatible request for the node status
pub async fn create_node_status_request(
&self,
code: &str,
) -> anyhow::Result<api::AdminApiRequest> {
let data = api::NodeStatusRequest {};
let req = api::AdminApiPayload {
code: code.to_string(),
request_path: self.node_status_endpoint().to_string(),
request_body: data,
};
let jws = Jws::for_data(&self.signer, &req).await?;
api::AdminApiRequest::try_from(jws)
}
}

/// Remote HTTP Functionality
Expand Down Expand Up @@ -329,6 +367,33 @@ pub mod remote {
}
}

/// List indexed models on the remote ceramic
pub async fn list_indexed_models(&self) -> anyhow::Result<api::ListIndexedModelsResponse> {
let resp: api::AdminCodeResponse = self
.remote
.get(self.url_for_path(self.cli.admin_code_endpoint())?)
.send()
.await?
.json()
.await?;
let req = self
.cli
.create_list_indexed_models_request(&resp.code)
.await?;
let resp = self
.remote
.get(self.url_for_path(self.cli.models_endpoint())?)
.header(
reqwest::header::AUTHORIZATION,
format!("Basic {}", req.jws()),
)
.send()
.await?
.json()
.await?;
Ok(resp)
}

/// Create an instance of a model that allows a single instance on the remote ceramic
pub async fn create_single_instance(
&self,
Expand Down Expand Up @@ -487,6 +552,30 @@ pub mod remote {
.await?;
Ok(resp)
}

/// Get the node status
pub async fn node_status(&self) -> anyhow::Result<api::NodeStatusResponse> {
let resp: api::AdminCodeResponse = self
.remote
.get(self.url_for_path(self.cli.admin_code_endpoint())?)
.send()
.await?
.json()
.await?;
let req = self.cli.create_node_status_request(&resp.code).await?;
let resp = self
.remote
.get(self.url_for_path(self.cli.node_status_endpoint())?)
.header(
reqwest::header::AUTHORIZATION,
format!("Basic {}", req.jws()),
)
.send()
.await?
.json()
.await?;
Ok(resp)
}
}
}

Expand Down

0 comments on commit be4b848

Please sign in to comment.