Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use "real" car files for events in scenarios #171

Merged
merged 3 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 193 additions & 142 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ env_logger = "0.10.0"
expect-patch = { path = "./expect-patch/" }
hex = "0.4.3"
keramik-common = { path = "./common/", default-features = false }
multiaddr = "0.17"
multiaddr = "0.18"
multibase = "0.9.1"
multihash = "0.18"
multihash = { version = "0.19" }
multihash-codetable = { version = "0.1", features = ["sha2", "sha3"] }
# multihash-derive = { version = "0.9" }
opentelemetry = { version = "0.21", features = ["metrics", "trace"] }
opentelemetry-otlp = { version = "0.14", features = [
"metrics",
Expand Down
1 change: 1 addition & 0 deletions operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ kube = { version = "0.88", features = [
multiaddr = { workspace = true, optional = true }
multibase = { workspace = true, optional = true }
multihash = { workspace = true, optional = true }
libp2p-identity = "0.2"
opentelemetry = { workspace = true, optional = true }
rand = { version = "0.8.5" }
reqwest = { workspace = true, optional = true }
Expand Down
8 changes: 4 additions & 4 deletions operator/src/network/ipfs_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ impl IpfsRpcClient for HttpRpcClient {
addresses: Vec<String>,
}
let data: Response = resp.json().await?;

let p2p_proto = Protocol::P2p(Multihash::from_bytes(
&multibase::Base::Base58Btc.decode(data.id.clone())?,
)?);
let hash = Multihash::from_bytes(&multibase::Base::Base58Btc.decode(data.id.clone())?)?;
let peer_id = libp2p_identity::PeerId::from_multihash(hash)
.map_err(|e| anyhow!("failed to build multiash: {:?}", e))?;
let p2p_proto = Protocol::P2p(peer_id);
// We expect to find at least one non loop back address
let p2p_addrs = data
.addresses
Expand Down
8 changes: 7 additions & 1 deletion runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ did-method-key = "0.2"
goose = { version = "0.16", features = ["gaggle"] }
hex.workspace = true
keramik-common = { workspace = true, features = ["telemetry", "tokio-console"] }
libipld = { version = "0.16.0", features = ["serde-codec"] }
ipld-core = "0.4"
ipld-dagpb = "0.2"
multihash.workspace = true
multihash-codetable.workspace = true
opentelemetry.workspace = true
rand = "0.8.5"
redis = { version = "0.24", features = ["tokio-comp"] }
reqwest.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_ipld_dagcbor = "0.6"
serde_ipld_dagjson = "0.2"
schemars.workspace = true
serde_json.workspace = true
tokio.workspace = true
Expand All @@ -35,5 +39,7 @@ uuid = { version = "1.6.1", features = ["v4"] }
chrono = "0.4.31"
ed25519-dalek = "2.1"

unsigned-varint = "0.8" # temporary until we can use our http client updated for new c1

[dev-dependencies]
test-log = "0.2"
60 changes: 12 additions & 48 deletions runner/src/scenario/ceramic/anchor.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use anyhow::Result;
use base64::{engine::general_purpose, Engine};
use ceramic_http_client::ceramic_event::{
Cid, DidDocument, JwkSigner, Jws, StreamId, StreamIdType,
};
use ceramic_core::{Cid, DagCborEncoded};
use ceramic_http_client::ceramic_event::{DidDocument, JwkSigner, Jws, StreamId};
use chrono::Utc;
use goose::prelude::*;
use ipld_core::ipld;
use iroh_car::{CarHeader, CarWriter};
use libipld::{cbor::DagCborCodec, ipld, prelude::Codec, Ipld, IpldCodec};
use multihash::{Code::Sha2_256, MultihashDigest};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use multihash_codetable::{Code, MultihashDigest};

use redis::{aio::MultiplexedConnection, AsyncCommands};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, sync::Arc};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

use crate::scenario::get_redis_client;
use crate::scenario::{get_redis_client, util::DAG_CBOR_CODEC};

#[derive(Serialize, Deserialize)]
struct CasAuthPayload {
Expand Down Expand Up @@ -59,8 +57,8 @@ pub async fn stream_tip_car(
"tip": genesis_cid,
});

let ipld_bytes = DagCborCodec.encode(&root_block)?;
let root_cid = Cid::new_v1(IpldCodec::DagCbor.into(), Sha2_256.digest(&ipld_bytes));
let ipld_bytes = DagCborEncoded::new(&root_block)?;
let root_cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(ipld_bytes.as_ref()));
let car_header = CarHeader::new_v1(vec![root_cid]);
let mut car_writer = CarWriter::new(car_header, Vec::new());
car_writer.write(root_cid, ipld_bytes).await.unwrap();
Expand All @@ -80,14 +78,14 @@ pub async fn create_anchor_request_on_cas(
.unwrap_or_else(|_| "https://cas-dev.3boxlabs.com".to_string());
let node_controller = std::env::var("node_controller")
.unwrap_or_else(|_| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());
let (stream_id, genesis_cid, genesis_block) = create_stream(StreamIdType::Tile).unwrap();
let (stream_id, genesis_cid, genesis_block) = crate::scenario::util::create_stream().unwrap();

let (root_cid, car_bytes) = stream_tip_car(
stream_id.clone(),
genesis_cid,
genesis_block.clone(),
genesis_block.as_ref().to_vec(),
genesis_cid,
genesis_block,
genesis_block.as_ref().to_vec(),
)
.await
.unwrap();
Expand Down Expand Up @@ -131,37 +129,3 @@ pub async fn cas_benchmark() -> Result<Scenario, GooseError> {

Ok(scenario!("CeramicCasBenchmark").register_transaction(create_anchor_request))
}

/// Create a new Ceramic stream
pub fn create_stream(stream_type: StreamIdType) -> Result<(StreamId, Cid, Vec<u8>)> {
let controller: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(32)
.map(char::from)
.collect();

let genesis_commit = ipld!({
"header": {
"unique": stream_unique_header(),
"controllers": [controller]
}
});
// Deserialize the genesis commit, encode it as CBOR, and compute the CID.
let ipld_map: BTreeMap<String, Ipld> = libipld::serde::from_ipld(genesis_commit)?;
let ipld_bytes = DagCborCodec.encode(&ipld_map)?;
let genesis_cid = Cid::new_v1(IpldCodec::DagCbor.into(), Sha2_256.digest(&ipld_bytes));
Ok((
StreamId {
r#type: stream_type,
cid: genesis_cid,
},
genesis_cid,
ipld_bytes,
))
}

fn stream_unique_header() -> String {
let mut data = [0u8; 8];
thread_rng().fill(&mut data);
general_purpose::STANDARD.encode(data)
}
15 changes: 7 additions & 8 deletions runner/src/scenario/ceramic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ mod models;
pub mod new_streams;
pub mod query;
pub mod simple;
pub mod util;
pub mod write_only;

use ceramic_core::ssi::did::{DIDMethod, DocumentBuilder, Source};
use ceramic_core::ssi::did::{DIDMethod, Document, DocumentBuilder, Source};
use ceramic_core::ssi::jwk::{self, Base64urlUInt, Params, JWK};
use ceramic_http_client::ceramic_event::{DidDocument, JwkSigner};
use ceramic_http_client::ceramic_event::JwkSigner;
use ceramic_http_client::CeramicHttpClient;

use models::RandomModelInstance;
Expand All @@ -22,12 +21,12 @@ pub type CeramicClient = CeramicHttpClient<JwkSigner>;
#[derive(Clone)]
pub struct Credentials {
pub signer: JwkSigner,
pub did: DidDocument,
pub did: Document,
}

impl Credentials {
pub async fn from_env() -> Result<Self, anyhow::Error> {
let did = DidDocument::new(&std::env::var("DID_KEY").expect("DID_KEY is required"));
let did = Document::new(&std::env::var("DID_KEY").expect("DID_KEY is required"));
let private_key = std::env::var("DID_PRIVATE_KEY").expect("DID_PRIVATE_KEY is required");
let signer = JwkSigner::new(did.clone(), &private_key).await?;
Ok(Self { signer, did })
Expand Down Expand Up @@ -60,12 +59,12 @@ impl Credentials {
Ok(Self { signer, did })
}

fn generate_did_for_jwk(key: &JWK) -> anyhow::Result<DidDocument> {
fn generate_did_for_jwk(key: &JWK) -> anyhow::Result<Document> {
let did = did_method_key::DIDKey
.generate(&Source::Key(key))
.ok_or_else(|| anyhow::anyhow!("Failed to generate DID"))?;

let doc: DidDocument = DocumentBuilder::default()
let doc = DocumentBuilder::default()
.id(did)
.build()
.map_err(|e| anyhow::anyhow!("failed to build DID document: {}", e))?;
Expand All @@ -74,7 +73,7 @@ impl Credentials {
}

/// Returns (Private Key, DID Document)
fn generate_did_and_pk() -> anyhow::Result<(String, DidDocument)> {
fn generate_did_and_pk() -> anyhow::Result<(String, Document)> {
let key = jwk::JWK::generate_ed25519()?;
let private_key = if let Params::OKP(params) = &key.params {
let pk = params
Expand Down
23 changes: 0 additions & 23 deletions runner/src/scenario/ceramic/util.rs

This file was deleted.

23 changes: 13 additions & 10 deletions runner/src/scenario/ipfs_block_fetch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use ceramic_core::Cid;
use ceramic_core::{Cid, DagCborEncoded};
use goose::prelude::*;
use libipld::prelude::Codec;
use libipld::{ipld, json::DagJsonCodec};
use multihash::{Code, MultihashDigest};
use std::{sync::Arc, time::Duration};
use ipld_core::ipld;
use multihash_codetable::{Code, MultihashDigest};

use crate::simulate::Topology;

use super::util::DAG_CBOR_CODEC;

pub fn scenario(topo: Topology) -> Result<Scenario> {
let put: Transaction = Transaction::new(Arc::new(move |user| {
Box::pin(async move { put(topo, user).await })
Expand Down Expand Up @@ -40,22 +42,22 @@ fn global_user_id(user: usize, topo: Topology) -> u64 {
}

/// Produce DAG-JSON IPLD node that contains determisiticly unique data for the user.
fn user_data(local_user: usize, topo: Topology) -> (Cid, Vec<u8>) {
fn user_data(local_user: usize, topo: Topology) -> (Cid, DagCborEncoded) {
let id = global_user_id(local_user, topo);
let data = ipld!({
"user": id,
"nonce": topo.nonce,
});

let bytes = DagJsonCodec.encode(&data).unwrap();

let hash = Code::Sha2_256.digest(bytes.as_slice());
(Cid::new_v1(DagJsonCodec.into(), hash), bytes)
let bytes = DagCborEncoded::new(&data).unwrap();
let cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(bytes.as_ref()));
(cid, bytes)
}

// Generate determisitic random data and put it into IPFS
async fn put(topo: Topology, user: &mut GooseUser) -> TransactionResult {
let (cid, data) = user_data(user.weighted_users_index, topo);
let data = data.as_ref().to_vec();
println!(
"put id: {} user: {} nonce: {} cid: {}",
topo.target_worker, user.weighted_users_index, topo.nonce, cid,
Expand Down Expand Up @@ -118,6 +120,7 @@ async fn get(mut topo: Topology, user: &mut GooseUser) -> TransactionResult {
// Check that all written data is accounted for.
async fn check(topo: Topology, user: &mut GooseUser) -> TransactionResult {
let (cid, data) = user_data(user.weighted_users_index, topo);
let data = data.as_ref().to_vec();
println!(
"stop id: {} user: {} cid: {}",
topo.target_worker, user.weighted_users_index, cid,
Expand Down
3 changes: 2 additions & 1 deletion runner/src/scenario/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::scenario::ceramic::util::goose_error;
use crate::scenario::util::goose_error;
use goose::GooseError;

pub mod ceramic;
pub mod ipfs_block_fetch;
pub mod recon_sync;
pub mod util;

static FIRST_USER: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true);

Expand Down
Loading
Loading