Skip to content

Commit

Permalink
feat: tester
Browse files Browse the repository at this point in the history
  • Loading branch information
dbcfd committed Feb 5, 2024
1 parent acc6109 commit 8a94f87
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 15 deletions.
103 changes: 100 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"schema",
"util",
"checkpointer",
"tester",
"fluence/src/services/event_joiner/modules/event_joiner"
]

Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ build:
# Build with all features
$(CARGO) build --locked --release --all-features

.PHONY: fluence-build
fluence-build:
# Build with default features
fluence build

.PHONY: release
release: RUSTFLAGS += -D warnings
release:
Expand All @@ -55,7 +60,7 @@ test:
$(CARGO) test -p checkpointer --locked --release --all-features

.PHONY: test-event-joiner
test:
test-event-joiner:
# Setup scaffolding
IT_TEST_CHECKPOINTER=1 ./ci-scripts/setup_test_env.sh
# Test with default features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ anyhow = "1.0.79"
#ceramic-http-client = { git = "https://github.com/3box/ceramic-http-client-rs.git", branch = "main", default-features = false, features = ["url"] }
#ceramic-http-client = { path = "/Users/dbrowning/code/3box/ceramic-http-client-rs", default-features = false }
ceramic-http-client = { git = "https://github.com/3box/ceramic-http-client-rs.git", branch = "feat/wasi", default-features = false }
env_logger = "0.11.1"
httparse = "1.8.0"
log = "0.4.20"
marine-rs-sdk = "0.10.1"
schema = { path = "../../../../../../schema" }
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
url.workspace = true
util = { path = "../../../../../../util" }
wasm-rs-async-executor = "0.9.0"

[dev-dependencies]
Expand Down
25 changes: 16 additions & 9 deletions fluence/src/services/event_joiner/modules/event_joiner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use schema::Event;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use httparse::Header;
use tracing::*;
use log::*;
use url::Url;
use wasm_rs_async_executor::single_threaded as executor;

Expand Down Expand Up @@ -48,15 +48,15 @@ pub struct ExecutionConfig {

#[marine]
pub struct SseResponse {
error: String,
events: u32,
pub error: String,
pub events: u32,
}

pub fn main() {}

#[marine]
pub fn process_events(cfg: ExecutionConfig) -> SseResponse {
let _guard = util::init_tracing();
let _ = env_logger::try_init();

match executor::block_on(try_process_events(cfg)) {
Ok(res) => res,
Expand Down Expand Up @@ -96,7 +96,7 @@ async fn try_process_events(cfg: ExecutionConfig) -> Result<SseResponse, anyhow:
let (resp, _) = parse_http_response(&res, &mut headers)?;
match resp.code {
Some(200) | Some(409) => {
debug!("Consumption started");
info!("Consumption started");
}
_ => {
anyhow::bail!("Error({:?}): {:?}", resp.code, resp.reason);
Expand All @@ -111,6 +111,7 @@ async fn try_process_events(cfg: ExecutionConfig) -> Result<SseResponse, anyhow:
loop {
let res = curl(cmd.clone());
let events: Vec<Event> = from_http_response(res)?;
info!("Received {} ceramic events", events.len());
for event in events {
process_event(&cfg, event).await?;
events_processed += 1;
Expand Down Expand Up @@ -196,10 +197,16 @@ mod tests {
fn test_get_events(iface: marine_test_env::event_joiner::ModuleInterface) {
let cfg = marine_test_env::event_joiner::ExecutionConfig {
client_id: "client".to_string(),
public_key: "did".to_string(),
private_key: "key".to_string(),
checkpointer_endpoint: "http://localhost:8080".to_string(),
ceramic_endpoint: "http://localhost:7007".to_string(),
public_key: std::env::var("DID_DOCUMENT").unwrap_or_else(|_| {
"did:key:z6MkeqCTPhHPVg3HaAAtsR7vZ6FXkAHPXEbTJs7Y4CQABV9Z".to_string()
}),
private_key: std::env::var("DID_PRIVATE_KEY").unwrap(),
checkpointer_endpoint: std::env::var("CHECKPOINTER_URL").unwrap_or_else(|_| {
"http://localhost:8080".to_string()
}),
ceramic_endpoint: std::env::var("CERAMIC_URL").unwrap_or_else(|_| {
"http://localhost:8080".to_string()
}),
depin_stream_id: "depin".to_string(),
proof_of_data_stream_id: "proof".to_string(),
};
Expand Down
17 changes: 17 additions & 0 deletions tester/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "tester"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.79"
#ceramic-http-client = { git = "https://github.com/3box/ceramic-http-client-rs.git", branch = "main", default-features = false, features = ["url"] }
#ceramic-http-client = { path = "/Users/dbrowning/code/3box/ceramic-http-client-rs", default-features = false }
ceramic-http-client = { git = "https://github.com/3box/ceramic-http-client-rs.git", branch = "feat/wasi" }
serde.workspace = true
serde_json.workspace = true
tokio = { version = "1.36.0", default-features = false, features = ["macros", "rt-multi-thread"] }
url.workspace = true
util = { path = "../util" }
55 changes: 55 additions & 0 deletions tester/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use ceramic_http_client::{json_patch, schemars::{self, JsonSchema}, ceramic_event::{DidDocument, JwkSigner}, ModelDefinition, ModelAccountRelation, GetRootSchema};
use ceramic_http_client::json_patch::ReplaceOperation;
use ceramic_http_client::remote::CeramicRemoteHttpClient;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[schemars(rename_all = "camelCase", deny_unknown_fields)]
struct AttendedEvent1 {
controller: String,
jwt: String,
}

impl GetRootSchema for AttendedEvent1 {}

#[derive(Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[schemars(rename_all = "camelCase", deny_unknown_fields)]
struct AttendedEvent2 {
controller: String,
jwt: String,
}

impl GetRootSchema for AttendedEvent2 {}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let _ = util::init_tracing();

let did = std::env::var("DID_DOCUMENT").unwrap_or_else(|_| {
"did:key:z6MkeqCTPhHPVg3HaAAtsR7vZ6FXkAHPXEbTJs7Y4CQABV9Z".to_string()
});
let signer = JwkSigner::new(
DidDocument::new(&did),
&std::env::var("DID_PRIVATE_KEY").unwrap(),
)
.await?;

let url = url::Url::parse("http://localhost:7007").unwrap();
let client = CeramicRemoteHttpClient::new(signer, url);

let event1_model = ModelDefinition::new::<AttendedEvent1>("AttendedEvent1", ModelAccountRelation::Single)?;
let event1_model = client.create_model(&event1_model).await?;
let event2_model = ModelDefinition::new::<AttendedEvent1>("AttendedEvent2", ModelAccountRelation::Single)?;
let event2_model = client.create_model(&event2_model).await?;

let event1 = client.create_single_instance(&event1_model).await?;
let patch = json_patch::Patch(vec![json_patch::PatchOperation::Replace(
ReplaceOperation {
path: "controller".to_string(),
value: serde_json::Value::String(did.clone()),
},
)]);
client.update(&event1_model, &event1, patch).await.unwrap();

Ok(())
}

0 comments on commit 8a94f87

Please sign in to comment.