Reduce coordinator tests contention re: cosign messages
kayabaNerve committed Mar 20, 2024
1 parent 4a6496a commit 13b147c
Showing 2 changed files with 157 additions and 161 deletions.
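
The core of the change: previously, send_message, recv_message, and the background cosign handler all locked one shared AsyncMutex<(u64, u64, MessageQueue)>, so they contended with each other. After this commit, a single spawned task owns the receiving queue, services cosign traffic itself, and forwards every other coordinator message over an unbounded tokio mpsc channel, while sends go through a separate queue_for_sending handle. Below is a minimal, self-contained sketch of that forwarding pattern, not the repository's actual code: Message, FakeQueue, and is_cosign are invented stand-ins, and it assumes tokio with the "full" feature set.

// Sketch only: hypothetical types illustrating the queue-owning forwarder task.
use std::time::Duration;
use tokio::sync::mpsc;

#[derive(Debug)]
enum Message {
  Cosign(u64),
  Other(String),
}

fn is_cosign(msg: &Message) -> bool {
  matches!(msg, Message::Cosign(_))
}

// Stand-in for the real MessageQueue: yields a few canned messages, then None.
struct FakeQueue(Vec<Message>);
impl FakeQueue {
  async fn next(&mut self) -> Option<Message> {
    self.0.pop()
  }
}

#[tokio::main]
async fn main() {
  let mut queue = FakeQueue(vec![
    Message::Other("batch".into()),
    Message::Cosign(5),
    Message::Other("key gen".into()),
  ]);

  let (msg_send, mut msg_recv) = mpsc::unbounded_channel();

  // The spawned task is the sole owner of the inbound queue, so no mutex is needed.
  tokio::spawn(async move {
    while let Some(msg) = queue.next().await {
      if is_cosign(&msg) {
        // Cosign traffic is handled here (the real task signs and replies).
        println!("handled cosign: {msg:?}");
        continue;
      }
      // Everything else is forwarded to whoever calls recv_message.
      msg_send.send(msg).unwrap();
    }
  });

  // recv_message reduces to a timed await on the channel.
  while let Ok(Some(msg)) = tokio::time::timeout(Duration::from_secs(1), msg_recv.recv()).await {
    println!("received: {msg:?}");
  }
}

Because the spawned task is the queue's only owner, no lock is needed around receives, which is what removes the contention named in the commit title.
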
tests/coordinator/src/lib.rs: 316 changes (156 additions & 160 deletions)
@@ -5,7 +5,10 @@ use std::{
   time::Duration,
 };
 
-use tokio::{task::AbortHandle, sync::Mutex as AsyncMutex};
+use tokio::{
+  task::AbortHandle,
+  sync::{Mutex as AsyncMutex, mpsc},
+};
 
 use rand_core::{RngCore, OsRng};
@@ -96,15 +99,15 @@ pub struct Handles {
   pub(crate) message_queue: String,
 }
 
-#[derive(Clone)]
 pub struct Processor {
   network: NetworkId,
 
   serai_rpc: String,
   #[allow(unused)]
   handles: Handles,
 
-  queue: Arc<AsyncMutex<(u64, u64, MessageQueue)>>,
+  msgs: mpsc::UnboundedReceiver<messages::CoordinatorMessage>,
+  queue_for_sending: MessageQueue,
   abort_handle: Option<Arc<AbortHandle>>,
 
   substrate_key: Arc<AsyncMutex<Option<Zeroizing<<Ristretto as Ciphersuite>::F>>>>,
@@ -145,156 +148,173 @@ impl Processor {
     // The Serai RPC may or may not be started
     // Assume it is and continue, so if it's a few seconds late, it's still within tolerance
 
+    // Create the queue
+    let mut queue = (
+      0,
+      Arc::new(MessageQueue::new(
+        Service::Processor(network),
+        message_queue_rpc.clone(),
+        Zeroizing::new(processor_key),
+      )),
+    );
+
+    let (msg_send, msg_recv) = mpsc::unbounded_channel();
+
+    let substrate_key = Arc::new(AsyncMutex::new(None));
     let mut res = Processor {
       network,
 
       serai_rpc,
       handles,
 
-      queue: Arc::new(AsyncMutex::new((
-        0,
-        0,
-        MessageQueue::new(
-          Service::Processor(network),
-          message_queue_rpc,
-          Zeroizing::new(processor_key),
-        ),
-      ))),
+      queue_for_sending: MessageQueue::new(
+        Service::Processor(network),
+        message_queue_rpc,
+        Zeroizing::new(processor_key),
+      ),
+      msgs: msg_recv,
       abort_handle: None,
 
-      substrate_key: Arc::new(AsyncMutex::new(None)),
+      substrate_key: substrate_key.clone(),
     };
 
-    // Handle any cosigns which come up
-    res.abort_handle = Some(Arc::new(
-      tokio::spawn({
-        let mut res = res.clone();
-        async move {
-          loop {
-            tokio::task::yield_now().await;
-
-            let msg = {
-              let mut queue_lock = res.queue.lock().await;
-              let (_, next_recv_id, queue) = &mut *queue_lock;
-              let Ok(msg) =
-                tokio::time::timeout(Duration::from_secs(1), queue.next(Service::Coordinator))
-                  .await
-              else {
-                continue;
-              };
-              assert_eq!(msg.from, Service::Coordinator);
-              assert_eq!(msg.id, *next_recv_id);
-
-              let msg_msg = borsh::from_slice(&msg.msg).unwrap();
-              // Remove any BatchReattempts clogging the pipe
-              // TODO: Set up a wrapper around serai-client so we aren't throwing this away yet
-              // leave it for the tests
-              if matches!(
-                msg_msg,
-                messages::CoordinatorMessage::Coordinator(
-                  messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
-                )
-              ) {
-                queue.ack(Service::Coordinator, msg.id).await;
-                *next_recv_id += 1;
-                continue;
-              }
-              if !is_cosign_message(&msg_msg) {
-                continue;
-              };
-              queue.ack(Service::Coordinator, msg.id).await;
-              *next_recv_id += 1;
-              msg_msg
-            };
-
-            struct CurrentCosign {
-              block_number: u64,
-              block: [u8; 32],
-            }
-            static CURRENT_COSIGN: OnceLock<AsyncMutex<Option<CurrentCosign>>> = OnceLock::new();
-            let mut current_cosign =
-              CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await;
-            match msg {
-              // If this is a CosignSubstrateBlock, reset the CurrentCosign
-              // While technically, each processor should individually track the current cosign,
-              // this is fine for current testing purposes
-              CoordinatorMessage::Coordinator(
-                messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
-                  id,
-                  block_number,
-                },
-              ) => {
-                let SubstrateSignId {
-                  id: SubstrateSignableId::CosigningSubstrateBlock(block), ..
-                } = id
-                else {
-                  panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID")
-                };
-
-                let new_cosign = CurrentCosign { block_number, block };
-                if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) {
-                  *current_cosign = Some(new_cosign);
-                }
-                res
-                  .send_message(messages::coordinator::ProcessorMessage::CosignPreprocess {
-                    id: id.clone(),
-                    preprocesses: vec![[raw_i; 64]],
-                  })
-                  .await;
-              }
-              CoordinatorMessage::Coordinator(
-                messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. },
-              ) => {
-                // TODO: Assert the ID matches CURRENT_COSIGN
-                // TODO: Verify the received preprocesses
-                res
-                  .send_message(messages::coordinator::ProcessorMessage::SubstrateShare {
-                    id,
-                    shares: vec![[raw_i; 32]],
-                  })
-                  .await;
-              }
-              CoordinatorMessage::Coordinator(
-                messages::coordinator::CoordinatorMessage::SubstrateShares { .. },
-              ) => {
-                // TODO: Assert the ID matches CURRENT_COSIGN
-                // TODO: Verify the shares
-
-                let block_number = current_cosign.as_ref().unwrap().block_number;
-                let block = current_cosign.as_ref().unwrap().block;
-
-                let substrate_key = res.substrate_key.lock().await.clone().unwrap();
-
-                // Expand to a key pair as Schnorrkel expects
-                // It's the private key + 32-bytes of entropy for nonces + the public key
-                let mut schnorrkel_key_pair = [0; 96];
-                schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr());
-                OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]);
-                schnorrkel_key_pair[64 ..].copy_from_slice(
-                  &(<Ristretto as Ciphersuite>::generator() * *substrate_key).to_bytes(),
-                );
-                let signature = Signature(
-                  schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair)
-                    .unwrap()
-                    .sign_simple(b"substrate", &cosign_block_msg(block_number, block))
-                    .to_bytes(),
-                );
-
-                res
-                  .send_message(messages::coordinator::ProcessorMessage::CosignedBlock {
-                    block_number,
-                    block,
-                    signature: signature.0.to_vec(),
-                  })
-                  .await;
-              }
-              _ => panic!("unexpected message passed is_cosign_message"),
-            }
-          }
-        }
-      })
-      .abort_handle(),
-    ));
+    // Spawn a task to handle cosigns and forward messages as appropriate
+    let abort_handle = tokio::spawn({
+      async move {
+        loop {
+          // Get new messages
+          let (next_recv_id, queue) = &mut queue;
+          let msg = queue.next(Service::Coordinator).await;
+          assert_eq!(msg.from, Service::Coordinator);
+          assert_eq!(msg.id, *next_recv_id);
+          queue.ack(Service::Coordinator, msg.id).await;
+          *next_recv_id += 1;
+
+          let msg_msg = borsh::from_slice(&msg.msg).unwrap();
+
+          // Remove any BatchReattempts clogging the pipe
+          // TODO: Set up a wrapper around serai-client so we aren't throwing this away yet
+          // leave it for the tests
+          if matches!(
+            msg_msg,
+            messages::CoordinatorMessage::Coordinator(
+              messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
+            )
+          ) {
+            continue;
+          }
+
+          if !is_cosign_message(&msg_msg) {
+            msg_send.send(msg_msg).unwrap();
+            continue;
+          }
+          let msg = msg_msg;
+
+          let send_message = |msg: ProcessorMessage| async move {
+            queue
+              .queue(
+                Metadata {
+                  from: Service::Processor(network),
+                  to: Service::Coordinator,
+                  intent: msg.intent(),
+                },
+                borsh::to_vec(&msg).unwrap(),
+              )
+              .await;
+          };
+
+          struct CurrentCosign {
+            block_number: u64,
+            block: [u8; 32],
+          }
+          static CURRENT_COSIGN: OnceLock<AsyncMutex<Option<CurrentCosign>>> = OnceLock::new();
+          let mut current_cosign =
+            CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await;
+          match msg {
+            // If this is a CosignSubstrateBlock, reset the CurrentCosign
+            // While technically, each processor should individually track the current cosign,
+            // this is fine for current testing purposes
+            CoordinatorMessage::Coordinator(
+              messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { id, block_number },
+            ) => {
+              let SubstrateSignId {
+                id: SubstrateSignableId::CosigningSubstrateBlock(block), ..
+              } = id
+              else {
+                panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID")
+              };
+
+              let new_cosign = CurrentCosign { block_number, block };
+              if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) {
+                *current_cosign = Some(new_cosign);
+              }
+              send_message(
+                messages::coordinator::ProcessorMessage::CosignPreprocess {
+                  id: id.clone(),
+                  preprocesses: vec![[raw_i; 64]],
+                }
+                .into(),
+              )
+              .await;
+            }
+            CoordinatorMessage::Coordinator(
+              messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. },
+            ) => {
+              // TODO: Assert the ID matches CURRENT_COSIGN
+              // TODO: Verify the received preprocesses
+              send_message(
+                messages::coordinator::ProcessorMessage::SubstrateShare {
+                  id,
+                  shares: vec![[raw_i; 32]],
+                }
+                .into(),
+              )
+              .await;
+            }
+            CoordinatorMessage::Coordinator(
+              messages::coordinator::CoordinatorMessage::SubstrateShares { .. },
+            ) => {
+              // TODO: Assert the ID matches CURRENT_COSIGN
+              // TODO: Verify the shares
+
+              let block_number = current_cosign.as_ref().unwrap().block_number;
+              let block = current_cosign.as_ref().unwrap().block;
+
+              let substrate_key = substrate_key.lock().await.clone().unwrap();
+
+              // Expand to a key pair as Schnorrkel expects
+              // It's the private key + 32-bytes of entropy for nonces + the public key
+              let mut schnorrkel_key_pair = [0; 96];
+              schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr());
+              OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]);
+              schnorrkel_key_pair[64 ..].copy_from_slice(
+                &(<Ristretto as Ciphersuite>::generator() * *substrate_key).to_bytes(),
+              );
+              let signature = Signature(
+                schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair)
+                  .unwrap()
+                  .sign_simple(b"substrate", &cosign_block_msg(block_number, block))
+                  .to_bytes(),
+              );
+
+              send_message(
+                messages::coordinator::ProcessorMessage::CosignedBlock {
+                  block_number,
+                  block,
+                  signature: signature.0.to_vec(),
+                }
+                .into(),
+              )
+              .await;
+            }
+            _ => panic!("unexpected message passed is_cosign_message"),
+          }
+        }
+      }
+    })
+    .abort_handle();
+
+    res.abort_handle = Some(Arc::new(abort_handle));
 
     res
   }
@@ -307,9 +327,8 @@ impl Processor {
   pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
     let msg: ProcessorMessage = msg.into();
 
-    let mut queue_lock = self.queue.lock().await;
-    let (next_send_id, _, queue) = &mut *queue_lock;
-    queue
+    self
+      .queue_for_sending
       .queue(
         Metadata {
           from: Service::Processor(self.network),
@@ -319,36 +338,13 @@ impl Processor {
         borsh::to_vec(&msg).unwrap(),
       )
       .await;
-    *next_send_id += 1;
   }
 
-  async fn recv_message_inner(&mut self) -> CoordinatorMessage {
-    loop {
-      tokio::task::yield_now().await;
-
-      let mut queue_lock = self.queue.lock().await;
-      let (_, next_recv_id, queue) = &mut *queue_lock;
-      let msg = queue.next(Service::Coordinator).await;
-      assert_eq!(msg.from, Service::Coordinator);
-      assert_eq!(msg.id, *next_recv_id);
-
-      // If this is a cosign message, let the cosign task handle it
-      let msg_msg = borsh::from_slice(&msg.msg).unwrap();
-      if is_cosign_message(&msg_msg) {
-        continue;
-      }
-
-      queue.ack(Service::Coordinator, msg.id).await;
-      *next_recv_id += 1;
-      return msg_msg;
-    }
-  }
-
   /// Receive a message from the coordinator as a processor.
   pub async fn recv_message(&mut self) -> CoordinatorMessage {
     // Set a timeout of 20 minutes to allow effectively any protocol to occur without a fear of
     // an arbitrary timeout cutting it short
-    tokio::time::timeout(Duration::from_secs(20 * 60), self.recv_message_inner()).await.unwrap()
+    tokio::time::timeout(Duration::from_secs(20 * 60), self.msgs.recv()).await.unwrap().unwrap()
   }
 
   pub async fn set_substrate_key(
tests/coordinator/src/tests/batch.rs: 2 changes (1 addition & 1 deletion)
@@ -245,7 +245,7 @@ pub async fn batch(
     )
   );
 
-  // Send the ack as expected, though it shouldn't trigger any observable behavior
+  // Send the ack as expected
   processor
     .send_message(messages::ProcessorMessage::Coordinator(
      messages::coordinator::ProcessorMessage::SubstrateBlockAck {