Skip to content

Commit

Permalink
chore: fmt, exit script properly
Browse files Browse the repository at this point in the history
  • Loading branch information
dbcfd committed Feb 5, 2024
1 parent 8a94f87 commit 9ced3dc
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ RUN --mount=type=cache,target=/home/builder/.cargo \
make $BUILD_MODE && \
cp ./target/release/checkpointer ./bin

FROM debian:bookworm-slim
FROM --platform=linux/amd64 debian:bookworm-slim

COPY --from=builder /home/builder/checkpointer/bin/* /usr/bin

Expand Down
39 changes: 21 additions & 18 deletions checkpointer/src/batcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::errors::Error;
use schema::Event;
use futures_util::StreamExt;
use reqwest_eventsource::{Event as SseEvent, EventSource};
use schema::Event;
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;
Expand Down Expand Up @@ -71,7 +71,7 @@ impl Batcher {
params.client_id.clone(),
BatchCreationRequest { params, tx },
);

rx.await.map_err(Error::Recv)?
}

Expand All @@ -97,9 +97,7 @@ impl Batcher {
loop {
let mut streams_to_process = std::mem::take(&mut streams);
for (k, mut batcher) in streams_to_process.drain() {
let entry = outstanding_events
.entry(k.clone())
.or_default();
let entry = outstanding_events.entry(k.clone()).or_default();

loop {
match batcher.rx.try_recv() {
Expand All @@ -126,7 +124,12 @@ impl Batcher {
for (client_id, req) in create_requests.into_iter() {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let shutdown = Arc::new(AtomicBool::new(false));
tokio::spawn(run_event_source(self.ceramic_url.clone(), req.params, shutdown.clone(), tx));
tokio::spawn(run_event_source(
self.ceramic_url.clone(),
req.params,
shutdown.clone(),
tx,
));
streams.insert(client_id, RunningEventSource { rx, shutdown });
req.tx.send(Ok(())).unwrap();
}
Expand Down Expand Up @@ -213,18 +216,15 @@ async fn run_event_source(

#[cfg(test)]
mod tests {
use super::{BatchCreationParameters, Batcher};
use ceramic_http_client::{
ceramic_event::{DidDocument, JwkSigner, Signer, StreamId},
json_patch,
remote,
schemars,
GetRootSchema, ModelAccountRelation, ModelDefinition
json_patch, remote, schemars, GetRootSchema, ModelAccountRelation, ModelDefinition,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use schema::CeramicMetadata;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use super::{BatchCreationParameters, Batcher};
use std::time::Duration;

// See https://github.com/ajv-validator/ajv-formats for information on valid formats
#[derive(Debug, Deserialize, Eq, schemars::JsonSchema, PartialEq, Serialize)]
Expand Down Expand Up @@ -253,8 +253,8 @@ mod tests {
DidDocument::new(&s),
&std::env::var("DID_PRIVATE_KEY").unwrap(),
)
.await
.unwrap()
.await
.unwrap()
}

pub async fn create_model(cli: &remote::CeramicRemoteHttpClient<JwkSigner>) -> StreamId {
Expand All @@ -268,9 +268,12 @@ mod tests {

let batcher = Batcher::new().unwrap();
let client_id = "test";
batcher.create_batcher(BatchCreationParameters {
client_id: client_id.to_string(),
}).await.unwrap();
batcher
.create_batcher(BatchCreationParameters {
client_id: client_id.to_string(),
})
.await
.unwrap();

let ceramic = remote::CeramicRemoteHttpClient::new(signer().await, ceramic_url());
let model = create_model(&ceramic).await;
Expand Down
2 changes: 1 addition & 1 deletion checkpointer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,4 @@ async fn main() -> Result<(), Error> {
.run()
.await?;
Ok(())
}
}
7 changes: 7 additions & 0 deletions ci-scripts/setup_test_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@ if [ -z "$IT_TEST_CHECKPOINTER" ]; then
echo "Starting Checkpointer"
docker compose -f it/docker-compose.yml up -d checkpointer

count=0
while [ $(curl -s -o /dev/null -I -w "%{http_code}" "http://localhost:8080/api/v1/healthcheck") -ne "200" ]; do
count=$((count+1))
echo "Checkpointer is not yet ready, waiting and trying again"
sleep 1
if [ $count -eq 30 ]; then
docker logs it-checkpointer-1
echo "Checkpointer failed to start"
exit 1
fi
done
fi
68 changes: 43 additions & 25 deletions fluence/src/services/event_joiner/modules/event_joiner/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::collections::HashMap;
use ceramic_http_client::{schemars, CeramicHttpClient, ceramic_event::{JwkSigner, StreamId}, OperationFilter, FilterQuery, api};
use ceramic_http_client::api::Pagination;
use ceramic_http_client::ceramic_event::DidDocument;
use ceramic_http_client::schemars::JsonSchema;
use ceramic_http_client::{
api,
ceramic_event::{JwkSigner, StreamId},
schemars, CeramicHttpClient, FilterQuery, OperationFilter,
};
use httparse::Header;
use log::*;
use marine_rs_sdk::{marine, MountedBinaryStringResult};
use schema::Event;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use httparse::Header;
use log::*;
use url::Url;
use wasm_rs_async_executor::single_threaded as executor;

Expand Down Expand Up @@ -62,7 +66,10 @@ pub fn process_events(cfg: ExecutionConfig) -> SseResponse {
Ok(res) => res,
Err(e) => {
error!("Error processing events: {}", e);
SseResponse { error: e.to_string(), events: 0 }
SseResponse {
error: e.to_string(),
events: 0,
}
}
}
}
Expand All @@ -72,10 +79,7 @@ async fn try_process_events(cfg: ExecutionConfig) -> Result<SseResponse, anyhow:
let checkpointer_endpoint = Url::parse(&cfg.checkpointer_endpoint)?;
let client_id = cfg.client_id;

let signer = JwkSigner::new(
DidDocument::new(&cfg.public_key),
&cfg.private_key,
).await?;
let signer = JwkSigner::new(DidDocument::new(&cfg.public_key), &cfg.private_key).await?;

let cfg = ProcessConfig {
ceramic_endpoint,
Expand Down Expand Up @@ -104,9 +108,9 @@ async fn try_process_events(cfg: ExecutionConfig) -> Result<SseResponse, anyhow:
}
let now = std::time::Instant::now();
let mut events_processed = 0u32;
let cmd = vec![
checkpointer_endpoint.join(&format!("/batch/{}", client_id))?.to_string(),
];
let cmd = vec![checkpointer_endpoint
.join(&format!("/batch/{}", client_id))?
.to_string()];

loop {
let res = curl(cmd.clone());
Expand All @@ -126,7 +130,10 @@ async fn try_process_events(cfg: ExecutionConfig) -> Result<SseResponse, anyhow:
})
}

fn parse_http_response<'a>(res: &'a MountedBinaryStringResult, headers: &'a mut [Header<'a>]) -> Result<(httparse::Response<'a, 'a>, &'a[u8]), anyhow::Error> {
fn parse_http_response<'a>(
res: &'a MountedBinaryStringResult,
headers: &'a mut [Header<'a>],
) -> Result<(httparse::Response<'a, 'a>, &'a [u8]), anyhow::Error> {
info!("Response: {}", res.stdout);
let mut resp = httparse::Response::new(headers);
let bytes = res.stdout.as_bytes();
Expand All @@ -135,7 +142,9 @@ fn parse_http_response<'a>(res: &'a MountedBinaryStringResult, headers: &'a mut
Ok((resp, rest))
}

fn from_http_response<T: serde::de::DeserializeOwned>(res: MountedBinaryStringResult) -> Result<T, anyhow::Error> {
fn from_http_response<T: serde::de::DeserializeOwned>(
res: MountedBinaryStringResult,
) -> Result<T, anyhow::Error> {
let mut headers = [httparse::EMPTY_HEADER; 64];
let (resp, rest) = parse_http_response(&res, &mut headers)?;
if let Some(200) = resp.code {
Expand All @@ -155,7 +164,10 @@ struct ProcessConfig {
async fn process_event(cfg: &ProcessConfig, event: Event) -> Result<(), anyhow::Error> {
info!("Processing event: {:?}", event);
if let Ok(meta) = serde_json::from_value::<schema::CeramicMetadata>(event.metadata) {
let controller = meta.controllers.first().ok_or_else(|| anyhow::anyhow!("No controller"))?;
let controller = meta
.controllers
.first()
.ok_or_else(|| anyhow::anyhow!("No controller"))?;
let depin_event = query_event(cfg, &cfg.depin_stream_id, controller).await?;
let data_event = query_event(cfg, &cfg.proof_of_data_stream_id, controller).await?;
if let (Some(depin), Some(data)) = (depin_event, data_event) {
Expand All @@ -165,11 +177,19 @@ async fn process_event(cfg: &ProcessConfig, event: Event) -> Result<(), anyhow::
Ok(())
}

async fn query_event(cfg: &ProcessConfig, model_id: &StreamId, controller: &str) -> Result<Option<EventAttendance>, anyhow::Error> {
async fn query_event(
cfg: &ProcessConfig,
model_id: &StreamId,
controller: &str,
) -> Result<Option<EventAttendance>, anyhow::Error> {
let mut where_filter = HashMap::new();
where_filter.insert("controller".to_string(), OperationFilter::EqualTo(controller.into()));
where_filter.insert(
"controller".to_string(),
OperationFilter::EqualTo(controller.into()),
);
let filter = FilterQuery::Where(where_filter);
let req = cfg.cli
let req = cfg
.cli
.create_query_request(model_id, Some(filter), Pagination::default())
.await?;
let endpoint = cfg.ceramic_endpoint.join(cfg.cli.collection_endpoint())?;
Expand Down Expand Up @@ -201,16 +221,14 @@ mod tests {
"did:key:z6MkeqCTPhHPVg3HaAAtsR7vZ6FXkAHPXEbTJs7Y4CQABV9Z".to_string()
}),
private_key: std::env::var("DID_PRIVATE_KEY").unwrap(),
checkpointer_endpoint: std::env::var("CHECKPOINTER_URL").unwrap_or_else(|_| {
"http://localhost:8080".to_string()
}),
ceramic_endpoint: std::env::var("CERAMIC_URL").unwrap_or_else(|_| {
"http://localhost:8080".to_string()
}),
checkpointer_endpoint: std::env::var("CHECKPOINTER_URL")
.unwrap_or_else(|_| "http://localhost:8080".to_string()),
ceramic_endpoint: std::env::var("CERAMIC_URL")
.unwrap_or_else(|_| "http://localhost:8080".to_string()),
depin_stream_id: "depin".to_string(),
proof_of_data_stream_id: "proof".to_string(),
};
let greeting = iface.process_events(cfg);
assert!(greeting.error.is_empty());
}
}
}
20 changes: 13 additions & 7 deletions tester/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use ceramic_http_client::{json_patch, schemars::{self, JsonSchema}, ceramic_event::{DidDocument, JwkSigner}, ModelDefinition, ModelAccountRelation, GetRootSchema};
use ceramic_http_client::json_patch::ReplaceOperation;
use ceramic_http_client::remote::CeramicRemoteHttpClient;
use ceramic_http_client::{
ceramic_event::{DidDocument, JwkSigner},
json_patch,
schemars::{self, JsonSchema},
GetRootSchema, ModelAccountRelation, ModelDefinition,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
Expand All @@ -25,21 +30,22 @@ impl GetRootSchema for AttendedEvent2 {}
async fn main() -> Result<(), anyhow::Error> {
let _ = util::init_tracing();

let did = std::env::var("DID_DOCUMENT").unwrap_or_else(|_| {
"did:key:z6MkeqCTPhHPVg3HaAAtsR7vZ6FXkAHPXEbTJs7Y4CQABV9Z".to_string()
});
let did = std::env::var("DID_DOCUMENT")
.unwrap_or_else(|_| "did:key:z6MkeqCTPhHPVg3HaAAtsR7vZ6FXkAHPXEbTJs7Y4CQABV9Z".to_string());
let signer = JwkSigner::new(
DidDocument::new(&did),
&std::env::var("DID_PRIVATE_KEY").unwrap(),
)
.await?;
.await?;

let url = url::Url::parse("http://localhost:7007").unwrap();
let client = CeramicRemoteHttpClient::new(signer, url);

let event1_model = ModelDefinition::new::<AttendedEvent1>("AttendedEvent1", ModelAccountRelation::Single)?;
let event1_model =
ModelDefinition::new::<AttendedEvent1>("AttendedEvent1", ModelAccountRelation::Single)?;
let event1_model = client.create_model(&event1_model).await?;
let event2_model = ModelDefinition::new::<AttendedEvent1>("AttendedEvent2", ModelAccountRelation::Single)?;
let event2_model =
ModelDefinition::new::<AttendedEvent1>("AttendedEvent2", ModelAccountRelation::Single)?;
let event2_model = client.create_model(&event2_model).await?;

let event1 = client.create_single_instance(&event1_model).await?;
Expand Down
1 change: 0 additions & 1 deletion util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,3 @@ pub fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
}
guard
}

0 comments on commit 9ced3dc

Please sign in to comment.