Skip to content

Commit

Permalink
src/sync: Add Client::publish_success (#5)
Browse files Browse the repository at this point in the history
Enables test instances to signal success both via a publish to
the sync service and to Testground daemon via stdout.
  • Loading branch information
mxinden authored Mar 17, 2022
1 parent d583657 commit 0daa87b
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 9 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.1.1]
### Added
- Add `Client::publish_success` to signal instance success to daemon and sync service. See [PR 5].

[PR 5]: https://github.com/testground/sdk-rust/pull/5

## [0.1.0] - 2022-01-24
### Added
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ edition = "2021"
license = "Apache-2.0 OR MIT"
name = "testground"
repository = "https://github.com/testground/sdk-rust/"
version = "0.1.0"
version = "0.1.1"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
soketto = "0.7.1"
async-std = { version = "1.10", features = [ "attributes" ] }
log = "0.4"
serde = { version = "1", features = [ "derive" ] }
serde_json = "1"
soketto = "0.7.1"
thiserror = "1"
117 changes: 111 additions & 6 deletions src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use soketto::connection::{Receiver, Sender};
use soketto::handshake::{self, ServerResponse};
use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error;

/// Basic synchronization client enabling one to send signals and await barriers.
Expand Down Expand Up @@ -57,6 +58,7 @@ impl Client {
state: contextualized_state,
}),
barrier: None,
publish: None,
};

self.send(request).await?;
Expand All @@ -83,6 +85,7 @@ impl Client {
let request = Request {
id: id.clone(),
signal_entry: None,
publish: None,
barrier: Some(BarrierRequest {
state: contextualized_state,
target,
Expand All @@ -100,13 +103,62 @@ impl Client {
Ok(())
}

pub async fn publish_success(&mut self) -> Result<u64, PublishSuccessError> {
let id = self.next_id().to_string();

let event = Event {
success_event: SuccessEvent {
group: std::env::var("TEST_GROUP_ID").unwrap(),
},
};

let request = Request {
id: id.clone(),
signal_entry: None,
barrier: None,
publish: Some(PublishRequest {
topic: topic(),
payload: event.clone(),
}),
};

self.send(request).await?;
let resp = self.receive().await?;
if resp.id != id {
return Err(PublishSuccessError::UnexpectedId(resp.id));
}
if !resp.error.is_empty() {
return Err(PublishSuccessError::Remote(resp.error));
}
let seq = resp
.publish
.ok_or(PublishSuccessError::ExpectedPublishInResponse)
.map(|resp| resp.seq)?;

// The Testground daemon determines the success or failure of a test
// instance by parsing stdout for runtime events.
println!(
"{}",
serde_json::to_string(&LogLine {
ts: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos(),
event,
})?
);

Ok(seq)
}

fn next_id(&mut self) -> u64 {
let next_id = self.next_id;
self.next_id += 1;
next_id
}

async fn send(&mut self, req: Request) -> Result<(), SendError> {
log::debug!("Sending request: {:?}", req);
self.sender.send_text(serde_json::to_string(&req)?).await?;
self.sender.flush().await?;
Ok(())
Expand All @@ -116,20 +168,28 @@ impl Client {
let mut data = Vec::new();
self.receiver.receive_data(&mut data).await?;
let resp = serde_json::from_str(&String::from_utf8(data)?)?;
log::debug!("Received response: {:?}", resp);
Ok(resp)
}
}

fn contextualize_state(state: String) -> String {
fn context_from_env() -> String {
format!(
"run:{}:plan:{}:case:{}:states:{}",
"run:{}:plan:{}:case:{}",
std::env::var("TEST_RUN").unwrap(),
std::env::var("TEST_PLAN").unwrap(),
std::env::var("TEST_CASE").unwrap(),
state
)
}

fn contextualize_state(state: String) -> String {
format!("{}:states:{}", context_from_env(), state,)
}

fn topic() -> String {
format!("{}:run_events", context_from_env(),)
}

#[derive(Error, Debug)]
pub enum SignalError {
#[error("Remote returned error {0}.")]
Expand All @@ -156,6 +216,22 @@ pub enum BarrierError {
Receive(#[from] ReceiveError),
}

#[derive(Error, Debug)]
pub enum PublishSuccessError {
#[error("Serde: {0}")]
Serde(#[from] serde_json::error::Error),
#[error("Remote returned error {0}.")]
Remote(String),
#[error("Remote returned response with unexpected ID {0}.")]
UnexpectedId(String),
#[error("Expected remote to return publish entry in response.")]
ExpectedPublishInResponse,
#[error("Error sending request {0}")]
Send(#[from] SendError),
#[error("Error receiving response: {0}")]
Receive(#[from] ReceiveError),
}

#[derive(Error, Debug)]
pub enum SendError {
#[error("Soketto: {0}")]
Expand All @@ -174,32 +250,61 @@ pub enum ReceiveError {
FromUtf8(#[from] std::string::FromUtf8Error),
}

#[derive(Serialize)]
#[derive(Debug, Serialize)]
struct Request {
id: String,
signal_entry: Option<SignalEntryRequest>,
barrier: Option<BarrierRequest>,
publish: Option<PublishRequest>,
}

#[derive(Serialize)]
#[derive(Debug, Serialize)]
struct SignalEntryRequest {
state: String,
}

#[derive(Serialize)]
#[derive(Debug, Serialize)]
struct BarrierRequest {
state: String,
target: u64,
}

#[derive(Debug, Serialize)]
struct PublishRequest {
topic: String,
payload: Event,
}

#[derive(Debug, Serialize)]
struct LogLine {
ts: u128,
event: Event,
}

#[derive(Clone, Debug, Serialize)]
struct Event {
success_event: SuccessEvent,
}

#[derive(Clone, Debug, Serialize)]
struct SuccessEvent {
group: String,
}

#[derive(Deserialize, Debug)]
struct Response {
id: String,
signal_entry: Option<SignalEntryResponse>,
error: String,
publish: Option<PublishResponse>,
}

#[derive(Deserialize, Debug)]
struct SignalEntryResponse {
seq: u64,
}

#[derive(Deserialize, Debug)]
struct PublishResponse {
seq: u64,
}

0 comments on commit 0daa87b

Please sign in to comment.