Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds transitive sharing of peer information #618

Draft
wants to merge 7 commits into
base: feat/peer-recon-ring
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
.idea
.vscode

peer*
sync_test.sh

openapi-generator-cli.jar
Expand Down
38 changes: 37 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"metrics",
"one",
"p2p",
"peer-svc",
"pipeline",
"recon",
"sql",
Expand Down Expand Up @@ -76,6 +77,7 @@ ceramic-metadata = { path = "./metadata" }
ceramic-metrics = { path = "./metrics" }
ceramic-one = { path = "./one" }
ceramic-p2p = { path = "./p2p" }
ceramic-peer-svc = { path = "./peer-svc" }
ceramic-pipeline = { path = "./pipeline" }
ceramic-sql = { path = "./sql" }
ceramic-validation = { path = "./validation" }
Expand Down
1 change: 0 additions & 1 deletion anchor-remote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ expect-test.workspace = true
hex.workspace = true
multihash-codetable.workspace = true
reqwest.workspace = true
ring.workspace = true
serde.workspace = true
serde_ipld_dagcbor.workspace = true
serde_json.workspace = true
Expand Down
41 changes: 15 additions & 26 deletions anchor-remote/src/cas_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use base64::{
Engine as _,
};
use multihash_codetable::{Code, MultihashDigest};
use ring::signature::Ed25519KeyPair;
use serde::{Deserialize, Serialize};
use tokio::time::interval;
use tracing::{debug, info, warn};
Expand All @@ -17,7 +16,7 @@ use ceramic_anchor_service::{
DetachedTimeEvent, MerkleNode, MerkleNodes, RootTimeEvent, TransactionManager,
};
use ceramic_car::CarReader;
use ceramic_core::{Cid, NodeId, StreamId};
use ceramic_core::{Cid, NodeKey, StreamId};
use ceramic_event::unvalidated::AnchorProof;

pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION"));
Expand Down Expand Up @@ -63,8 +62,7 @@ struct CasAnchorResponse {

/// Remote CAS transaction manager
pub struct RemoteCas {
node_id: NodeId,
signing_key: Ed25519KeyPair,
node_key: NodeKey,
url: String,
poll_interval: Duration,
poll_retry_count: u64,
Expand Down Expand Up @@ -112,13 +110,12 @@ impl TransactionManager for RemoteCas {
impl RemoteCas {
/// Create a new RemoteCas instance
pub fn new(
node_id: NodeId,
keypair: Ed25519KeyPair,
node_key: NodeKey,
remote_anchor_service_url: String,
anchor_poll_interval: Duration,
anchor_poll_retry_count: u64,
) -> Self {
let controller = node_id.did_key();
let controller = node_key.did_key();
let jws_header = Header {
kid: format!(
"{}#{}",
Expand All @@ -133,8 +130,7 @@ impl RemoteCas {
let jws_header_b64 =
b64.encode(serde_json::to_vec(&jws_header).expect("invalid jws header"));
Self {
node_id,
signing_key: keypair,
node_key,
url: format!("{}/api/v0/requests", remote_anchor_service_url),
poll_interval: anchor_poll_interval,
poll_retry_count: anchor_poll_retry_count,
Expand All @@ -146,7 +142,7 @@ impl RemoteCas {
/// Create an anchor request on the remote CAS
pub async fn create_anchor_request(&self, root_cid: Cid) -> Result<String> {
let cas_request_body = serde_json::to_string(&CasAnchorRequest {
stream_id: self.node_id.stream_id(),
stream_id: self.node_key.stream_id(),
cid: root_cid.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
ceramic_one_version: AGENT_VERSION.to_owned(),
Expand Down Expand Up @@ -175,7 +171,7 @@ impl RemoteCas {
};
let body_b64 = b64.encode(serde_json::to_vec(&body)?);
let message = [self.jws_header_b64.clone(), body_b64].join(".");
let sig_b64 = b64.encode(self.signing_key.sign(message.as_bytes()));
let sig_b64 = b64.encode(self.node_key.sign(message.as_bytes()));
Ok([message.clone(), sig_b64].join("."))
}
}
Expand Down Expand Up @@ -237,16 +233,15 @@ mod tests {

use expect_test::expect_file;
use multihash_codetable::{Code, MultihashDigest};
use ring::signature::Ed25519KeyPair;

use ceramic_anchor_service::{
AnchorService, MockAnchorEventService, Store, TransactionManager,
};
use ceramic_core::Cid;
use ceramic_core::{Cid, NodeKey};
use ceramic_sql::sqlite::SqlitePool;

fn node_id_and_private_key() -> (NodeId, Ed25519KeyPair) {
NodeId::try_from_secret(
fn node_key() -> NodeKey {
NodeKey::try_from_secret(
std::env::var("NODE_PRIVATE_KEY")
// The following secret is NOT authenticated with CAS, it is only used for testing.
.unwrap_or(
Expand All @@ -263,13 +258,11 @@ mod tests {
async fn test_anchor_batch_with_cas() {
let anchor_client = Arc::new(MockAnchorEventService::new(10));
let anchor_requests = anchor_client
.events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
.events_since_high_water_mark(NodeKey::random().id(), 0, 1_000_000)
.await
.unwrap();
let (node_id, keypair) = node_id_and_private_key();
let remote_cas = Arc::new(RemoteCas::new(
node_id,
keypair,
node_key(),
"https://cas-dev.3boxlabs.com".to_owned(),
Duration::from_secs(1),
1,
Expand All @@ -278,7 +271,7 @@ mod tests {
remote_cas,
anchor_client,
SqlitePool::connect_in_memory().await.unwrap(),
NodeId::random().0,
NodeKey::random().id(),
Duration::from_secs(1),
10,
);
Expand All @@ -295,11 +288,9 @@ mod tests {
async fn test_create_anchor_request_with_cas() {
let mock_root_cid =
Cid::from_str("bafyreia776z4jdg5zgycivcpr3q6lcu6llfowkrljkmq3bex2k5hkzat54").unwrap();
let (node_id, keypair) = node_id_and_private_key();

let remote_cas = RemoteCas::new(
node_id,
keypair,
node_key(),
"https://cas-dev.3boxlabs.com".to_owned(),
Duration::from_secs(1),
1,
Expand All @@ -323,10 +314,8 @@ mod tests {
async fn test_jwt() {
let mock_data = serde_ipld_dagcbor::to_vec(b"mock root").unwrap();
let mock_hash = MultihashDigest::digest(&Code::Sha2_256, &mock_data);
let (node_id, keypair) = node_id_and_private_key();
let remote_cas = Arc::new(RemoteCas::new(
node_id,
keypair,
node_key(),
"https://cas-dev.3boxlabs.com".to_owned(),
Duration::from_secs(1),
1,
Expand Down
6 changes: 3 additions & 3 deletions anchor-service/src/anchor_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl AnchorService {
mod tests {
use std::{sync::Arc, time::Duration};

use ceramic_core::NodeId;
use ceramic_core::NodeKey;
use ceramic_sql::sqlite::SqlitePool;
use expect_test::expect_file;
use tokio::{sync::broadcast, time::sleep};
Expand All @@ -245,7 +245,7 @@ mod tests {
let tx_manager = Arc::new(MockCas);
let event_service = Arc::new(MockAnchorEventService::new(10));
let pool = SqlitePool::connect_in_memory().await.unwrap();
let node_id = NodeId::random().0;
let node_id = NodeKey::random().id();
let anchor_interval = Duration::from_millis(5);
let anchor_batch_size = 1000000;
let mut anchor_service = AnchorService::new(
Expand Down Expand Up @@ -277,7 +277,7 @@ mod tests {
let tx_manager = Arc::new(MockCas);
let event_service = Arc::new(MockAnchorEventService::new(1));
let pool = SqlitePool::connect_in_memory().await.unwrap();
let node_id = NodeId::random().0;
let node_id = NodeKey::random().id();
let anchor_interval = Duration::from_millis(5);
let anchor_batch_size = 1000000;
let mut anchor_service = AnchorService::new(
Expand Down
26 changes: 13 additions & 13 deletions anchor-service/src/cas_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Store for MockAnchorEventService {
#[cfg(test)]
pub mod tests {
use super::*;
use ceramic_core::NodeId;
use ceramic_core::NodeKey;
use ceramic_sql::sqlite::SqlitePool;
use expect_test::expect_file;
use std::sync::Arc;
Expand All @@ -105,14 +105,14 @@ pub mod tests {
async fn test_anchor_batch_with_10_requests() {
let anchor_client = Arc::new(MockAnchorEventService::new(10));
let anchor_requests = anchor_client
.events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
.events_since_high_water_mark(NodeKey::random().id(), 0, 1_000_000)
.await
.unwrap();
let anchor_service = AnchorService::new(
Arc::new(MockCas),
anchor_client,
SqlitePool::connect_in_memory().await.unwrap(),
NodeId::random().0,
NodeKey::random().id(),
Duration::from_secs(1),
10,
);
Expand All @@ -128,14 +128,14 @@ pub mod tests {
async fn test_anchor_batch_with_pow2_requests() {
let anchor_client = Arc::new(MockAnchorEventService::new(16));
let anchor_requests = anchor_client
.events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
.events_since_high_water_mark(NodeKey::random().id(), 0, 1_000_000)
.await
.unwrap();
let anchor_service = AnchorService::new(
Arc::new(MockCas),
anchor_client,
SqlitePool::connect_in_memory().await.unwrap(),
NodeId::random().0,
NodeKey::random().id(),
Duration::from_secs(1),
16,
);
Expand All @@ -151,14 +151,14 @@ pub mod tests {
async fn test_anchor_batch_with_more_than_pow2_requests() {
let anchor_client = Arc::new(MockAnchorEventService::new(18));
let anchor_requests = anchor_client
.events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
.events_since_high_water_mark(NodeKey::random().id(), 0, 1_000_000)
.await
.unwrap();
let anchor_service = AnchorService::new(
Arc::new(MockCas),
anchor_client,
SqlitePool::connect_in_memory().await.unwrap(),
NodeId::random().0,
NodeKey::random().id(),
Duration::from_secs(1),
18,
);
Expand All @@ -174,14 +174,14 @@ pub mod tests {
async fn test_anchor_batch_with_less_than_pow2_requests() {
let anchor_client = Arc::new(MockAnchorEventService::new(15));
let anchor_requests = anchor_client
.events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
.events_since_high_water_mark(NodeKey::random().id(), 0, 1_000_000)
.await
.unwrap();
let anchor_service = AnchorService::new(
Arc::new(MockCas),
anchor_client,
SqlitePool::connect_in_memory().await.unwrap(),
NodeId::random().0,
NodeKey::random().id(),
Duration::from_secs(1),
15,
);
Expand All @@ -197,14 +197,14 @@ pub mod tests {
async fn test_anchor_batch_with_0_requests() {
let anchor_client = Arc::new(MockAnchorEventService::new(0));
let anchor_requests = anchor_client
.events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
.events_since_high_water_mark(NodeKey::random().id(), 0, 1_000_000)
.await
.unwrap();
let anchor_service = AnchorService::new(
Arc::new(MockCas),
anchor_client,
SqlitePool::connect_in_memory().await.unwrap(),
NodeId::random().0,
NodeKey::random().id(),
Duration::from_secs(1),
1,
);
Expand All @@ -219,14 +219,14 @@ pub mod tests {
async fn test_anchor_batch_with_1_request() {
let anchor_client = Arc::new(MockAnchorEventService::new(1));
let anchor_requests = anchor_client
.events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
.events_since_high_water_mark(NodeKey::random().id(), 0, 1_000_000)
.await
.unwrap();
let anchor_service = AnchorService::new(
Arc::new(MockCas),
anchor_client,
SqlitePool::connect_in_memory().await.unwrap(),
NodeId::random().0,
NodeKey::random().id(),
Duration::from_secs(1),
1,
);
Expand Down
Loading