Skip to content

Commit

Permalink
Tweak integration tests
Browse files Browse the repository at this point in the history
- Use random TCP ports
- Use random output files
- Use goldenfile (has the practical UPDATE_GOLDENFILES=1 flag)
  • Loading branch information
Anviking committed Nov 12, 2024
1 parent 365210b commit f32dded
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 105 deletions.
115 changes: 50 additions & 65 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,6 @@ futures-util = "0.3"
bytes = "1.7.2"

[dev-dependencies]
assert_cmd = "2"
goldenfile = "1.7.3"
tempfile = "3.4"
port-selector = "0.1.6"
2 changes: 1 addition & 1 deletion src/sources/hydra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl gasket::framework::Worker<Stage> for Worker {

#[derive(Deserialize)]
pub struct Config {
hydra_socket_url: String,
pub hydra_socket_url: String,
}

impl Config {
Expand Down
100 changes: 62 additions & 38 deletions tests/hydra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ use std::time::Duration;
use anyhow::Result;
use futures_util::SinkExt;
use oura::sources::hydra::{HydraMessage, HydraMessagePayload};
use oura::sinks::Config::FileRotate;
use serde_json::json;
use tokio::net::TcpListener;
use oura::sources::Config::Hydra;
use oura::daemon::{run_daemon, ConfigRoot};
use gasket::daemon::Daemon;
use goldenfile::Mint;
use std::io::Write;
use tempfile::NamedTempFile;
use tokio::runtime::Runtime;
use tokio::time;
use tokio_tungstenite::accept_async;
use port_selector::random_free_port;
use tokio_tungstenite::tungstenite::protocol::Message;

type TestResult = Result<(), Box<dyn std::error::Error>>;
Expand Down Expand Up @@ -455,74 +461,92 @@ fn scenario_2() -> TestResult {

#[test]
fn hydra_oura_stdout_scenario_1() -> TestResult {
let golden_jsons = fs::read_to_string("tests/hydra/golden_1.txt")?;
hydra_oura_stdout_test("tests/hydra/scenario_1.txt".to_string(), golden_jsons)
hydra_oura_stdout_test("tests/hydra/scenario_1.txt".to_string(), "golden_1".to_string())
}

#[test]
fn hydra_oura_stdout_scenario_2() -> TestResult {
let golden_jsons = fs::read_to_string("tests/hydra/golden_2.txt")?;
hydra_oura_stdout_test("tests/hydra/scenario_2.txt".to_string(), golden_jsons)
hydra_oura_stdout_test("tests/hydra/scenario_2.txt".to_string(), "golden_2".to_string())
}

// Run:
// cargo test hydra_oura -- --nocapture
// in order to see println
fn hydra_oura_stdout_test(file: String, expected: String) -> TestResult {
fn hydra_oura_stdout_test(scenario_file: String, golden_name: String) -> TestResult {

let mut mint = Mint::new("tests/hydra");
let mut golden = mint.new_goldenfile(golden_name.clone()).unwrap();

let rt = Runtime::new().unwrap();
let _ = rt.block_on(async move {
let addr = "127.0.0.1:4001".to_string();
let port: u16 = random_free_port().unwrap();
let addr= format!("127.0.0.1:{}", port);
let server = TcpListener::bind(&addr).await?;
println!("WebSocket server started on ws://{}", addr);
let output_file = NamedTempFile::new()?;
let config = test_config(&output_file, &addr);

let _ = tokio::spawn(async move { oura_pipeline().await });
println!("WebSocket server starting on ws://{}", addr);

mock_hydra_node(server, file).await;

let emitted_jsons = fs::read_to_string("tests/hydra/logs.txt")?;
assert_eq!(emitted_jsons, expected);
let _ = tokio::spawn(async move { run_oura(config) });
let _ = mock_hydra_node(server, scenario_file).await;

// After the connection is established, give oura time to process
// the chain data and write the output to disk.
time::sleep(Duration::from_secs(3)).await;
let emitted_jsons= fs::read_to_string(output_file)?;
golden.write_all(emitted_jsons.as_bytes()).unwrap();
println!("test done, will exit");
Ok::<(), std::io::Error>(())
});
Ok(())
}

/// Will await the first connection, and then return while handling it in the
/// background.
async fn mock_hydra_node(server: TcpListener, file: String) {
let (tx, _rx) = mpsc::channel();
while let Ok((stream, _)) = server.accept().await {
tokio::spawn(handle_connection(stream, file, tx));
time::sleep(Duration::from_secs(3)).await;
break;
async fn handle_connection(
stream: tokio::net::TcpStream,
file: String,
tx: mpsc::Sender<usize>,
) -> Result<()> {
let mut ws_stream = accept_async(stream).await?;
println!("WebSocket server oura connection established");

let to_send = fs::read_to_string(file)?;

let mut lines = 0;
for line in to_send.lines() {
ws_stream.send(Message::Text(line.to_string())).await?;
lines += 1;
}
tx.send(lines).unwrap();
Ok(())
}

let (tx, _rx) = mpsc::channel();
let (stream, _) = server.accept().await.unwrap();
let _ = tokio::spawn(handle_connection(stream, file, tx));
}

async fn handle_connection(
stream: tokio::net::TcpStream,
file: String,
tx: mpsc::Sender<usize>,
) -> Result<()> {
let mut ws_stream = accept_async(stream).await?;
println!("WebSocket server oura connection established");

let to_send = fs::read_to_string(file)?;
fn test_config(tmp_output_file: &NamedTempFile, socket_path: &String) -> ConfigRoot {
let mut config = ConfigRoot::new(&Some(PathBuf::from("tests/daemon.toml"))).unwrap();

let mut lines = 0;
for line in to_send.lines() {
ws_stream.send(Message::Text(line.to_string())).await?;
lines += 1;
if let FileRotate(ref mut file_rotate) = config.sink {
file_rotate.output_path = Some(tmp_output_file.path().to_string_lossy().to_string());
} else {
panic!("assumed config template to use file_rotate sink");
}
tx.send(lines).unwrap();

Ok(())
}
if let Hydra(ref mut hydra_config) = config.source {
hydra_config.hydra_socket_url = "ws://".to_string() + socket_path;
} else {
panic!("assumed config template to use hydra source");
}

async fn oura_pipeline() -> Result<()> {
tokio::spawn(invoke_pipeline());
time::sleep(Duration::from_secs(1)).await;
Ok(())
config
}

async fn invoke_pipeline() -> Result<Daemon> {
let config = ConfigRoot::new(&Some(PathBuf::from("tests/daemon.toml")))?;
fn run_oura(config: ConfigRoot) -> Result<Daemon> {
run_daemon(config).map_err(|e| anyhow::anyhow!(e))
}
File renamed without changes.
File renamed without changes.

0 comments on commit f32dded

Please sign in to comment.