diff --git a/CHANGELOG.md b/CHANGELOG.md index f23fcc2..335228e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.1.1] +### Added +- Add `Client::publish_success` to signal instance success to daemon and sync service. See [PR 5]. + +[PR 5]: https://github.com/testground/sdk-rust/pull/5 ## [0.1.0] - 2022-01-24 ### Added diff --git a/Cargo.toml b/Cargo.toml index 8592930..7d6bd0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,13 +6,14 @@ edition = "2021" license = "Apache-2.0 OR MIT" name = "testground" repository = "https://github.com/testground/sdk-rust/" -version = "0.1.0" +version = "0.1.1" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -soketto = "0.7.1" async-std = { version = "1.10", features = [ "attributes" ] } +log = "0.4" serde = { version = "1", features = [ "derive" ] } serde_json = "1" +soketto = "0.7.1" thiserror = "1" diff --git a/src/sync.rs b/src/sync.rs index f4c553d..0647764 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; use soketto::connection::{Receiver, Sender}; use soketto::handshake::{self, ServerResponse}; +use std::time::{SystemTime, UNIX_EPOCH}; use thiserror::Error; /// Basic synchronization client enabling one to send signals and await barriers. @@ -57,6 +58,7 @@ impl Client { state: contextualized_state, }), barrier: None, + publish: None, }; self.send(request).await?; @@ -83,6 +85,7 @@ impl Client { let request = Request { id: id.clone(), signal_entry: None, + publish: None, barrier: Some(BarrierRequest { state: contextualized_state, target, @@ -100,6 +103,54 @@ impl Client { Ok(()) } + pub async fn publish_success(&mut self) -> Result { + let id = self.next_id().to_string(); + + let event = Event { + success_event: SuccessEvent { + group: std::env::var("TEST_GROUP_ID").unwrap(), + }, + }; + + let request = Request { + id: id.clone(), + signal_entry: None, + barrier: None, + publish: Some(PublishRequest { + topic: topic(), + payload: event.clone(), + }), + }; + + self.send(request).await?; + let resp = self.receive().await?; + if resp.id != id { + return Err(PublishSuccessError::UnexpectedId(resp.id)); + } + if !resp.error.is_empty() { + return Err(PublishSuccessError::Remote(resp.error)); + } + let seq = resp + .publish + .ok_or(PublishSuccessError::ExpectedPublishInResponse) + .map(|resp| resp.seq)?; + + // The Testground daemon determines the success or failure of a test + // instance by parsing stdout for runtime events. + println!( + "{}", + serde_json::to_string(&LogLine { + ts: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(), + event, + })? + ); + + Ok(seq) + } + fn next_id(&mut self) -> u64 { let next_id = self.next_id; self.next_id += 1; @@ -107,6 +158,7 @@ impl Client { } async fn send(&mut self, req: Request) -> Result<(), SendError> { + log::debug!("Sending request: {:?}", req); self.sender.send_text(serde_json::to_string(&req)?).await?; self.sender.flush().await?; Ok(()) @@ -116,20 +168,28 @@ impl Client { let mut data = Vec::new(); self.receiver.receive_data(&mut data).await?; let resp = serde_json::from_str(&String::from_utf8(data)?)?; + log::debug!("Received response: {:?}", resp); Ok(resp) } } -fn contextualize_state(state: String) -> String { +fn context_from_env() -> String { format!( - "run:{}:plan:{}:case:{}:states:{}", + "run:{}:plan:{}:case:{}", std::env::var("TEST_RUN").unwrap(), std::env::var("TEST_PLAN").unwrap(), std::env::var("TEST_CASE").unwrap(), - state ) } +fn contextualize_state(state: String) -> String { + format!("{}:states:{}", context_from_env(), state,) +} + +fn topic() -> String { + format!("{}:run_events", context_from_env(),) +} + #[derive(Error, Debug)] pub enum SignalError { #[error("Remote returned error {0}.")] @@ -156,6 +216,22 @@ pub enum BarrierError { Receive(#[from] ReceiveError), } +#[derive(Error, Debug)] +pub enum PublishSuccessError { + #[error("Serde: {0}")] + Serde(#[from] serde_json::error::Error), + #[error("Remote returned error {0}.")] + Remote(String), + #[error("Remote returned response with unexpected ID {0}.")] + UnexpectedId(String), + #[error("Expected remote to return publish entry in response.")] + ExpectedPublishInResponse, + #[error("Error sending request {0}")] + Send(#[from] SendError), + #[error("Error receiving response: {0}")] + Receive(#[from] ReceiveError), +} + #[derive(Error, Debug)] pub enum SendError { #[error("Soketto: {0}")] @@ -174,32 +250,61 @@ pub enum ReceiveError { FromUtf8(#[from] std::string::FromUtf8Error), } -#[derive(Serialize)] +#[derive(Debug, Serialize)] struct Request { id: String, signal_entry: Option, barrier: Option, + publish: Option, } -#[derive(Serialize)] +#[derive(Debug, Serialize)] struct SignalEntryRequest { state: String, } -#[derive(Serialize)] +#[derive(Debug, Serialize)] struct BarrierRequest { state: String, target: u64, } +#[derive(Debug, Serialize)] +struct PublishRequest { + topic: String, + payload: Event, +} + +#[derive(Debug, Serialize)] +struct LogLine { + ts: u128, + event: Event, +} + +#[derive(Clone, Debug, Serialize)] +struct Event { + success_event: SuccessEvent, +} + +#[derive(Clone, Debug, Serialize)] +struct SuccessEvent { + group: String, +} + #[derive(Deserialize, Debug)] struct Response { id: String, signal_entry: Option, error: String, + publish: Option, } #[derive(Deserialize, Debug)] struct SignalEntryResponse { seq: u64, } + +#[derive(Deserialize, Debug)] +struct PublishResponse { + seq: u64, +}