From 53686c83d1cebd6d7168c224ac108968a0b713ad Mon Sep 17 00:00:00 2001 From: Patrick Nordahl Date: Thu, 21 Nov 2024 04:24:01 -0600 Subject: [PATCH 1/2] shuffling error types and refactor dag label/node lookups --- src/app/analyze.rs | 6 +-- src/app/run.rs | 11 +--- src/core/error.rs | 45 ++++------------ src/core/graph.rs | 126 ++++++++++++++++++++++++++++++--------------- src/core/mod.rs | 31 +++-------- 5 files changed, 106 insertions(+), 113 deletions(-) diff --git a/src/app/analyze.rs b/src/app/analyze.rs index 5853e69..5ace994 100644 --- a/src/app/analyze.rs +++ b/src/app/analyze.rs @@ -5,7 +5,7 @@ use std::result::Result; use serde::Serialize; -use crate::core::error::{GraphError, MonorailError}; +use crate::core::error::MonorailError; use crate::core::{self, Change, ChangeProviderKind}; use crate::core::{git, tracking}; @@ -357,9 +357,7 @@ pub(crate) fn analyze( for group in groups.iter().rev() { let mut pg: Vec = vec![]; for id in group { - let label = index.dag.node2label.get(id).ok_or_else(|| { - MonorailError::DependencyGraph(GraphError::LabelNotFound(*id)) - })?; + let label = index.dag.get_label_by_node(id)?; if output_targets.contains::(label) { pg.push(label.to_owned()); } diff --git a/src/app/run.rs b/src/app/run.rs index 22eca74..40ab2bf 100644 --- a/src/app/run.rs +++ b/src/app/run.rs @@ -14,7 +14,7 @@ use sha2::Digest; use tracing::{debug, error, info, instrument}; use crate::app::{analyze, log, result, target}; -use crate::core::error::{GraphError, MonorailError}; +use crate::core::error::MonorailError; use crate::core::{self, file, git, tracking, ChangeProviderKind, Target}; #[derive(Debug)] @@ -409,14 +409,7 @@ fn get_plan<'a>( .targets .insert(target_path.to_string(), target_hash.clone()); let logs = Logs::new(run_path, cmd.as_str(), &target_hash)?; - let target_index = index - .dag - .label2node - .get(target_path.as_str()) - .copied() - .ok_or(MonorailError::DependencyGraph( - GraphError::LabelNodeNotFound(target_path.to_owned()), - ))?; + let target_index = index.dag.get_node_by_label(target_path)?; let tar = targets .get(target_index) .ok_or(MonorailError::from("Target not found"))?; diff --git a/src/core/error.rs b/src/core/error.rs index 3d704a5..90f4c7b 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -2,37 +2,7 @@ use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::{fmt, io, num, str}; -use crate::core::server; - -#[derive(Debug, Serialize)] -pub enum GraphError { - LabelNotFound(usize), - Cycle(usize, String), - Connected, - DuplicateLabel(String), - LabelNodeNotFound(String), -} -impl fmt::Display for GraphError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - GraphError::LabelNotFound(node) => { - write!(f, "Label not found for node: {}", node) - } - GraphError::Cycle(node, label) => { - write!(f, "Cycle detected at node: {}, label: {}", node, label) - } - GraphError::Connected => { - write!(f, "Graph is fully connected, with no free nodes",) - } - GraphError::DuplicateLabel(label) => { - write!(f, "Duplicate label provided: {}", label) - } - GraphError::LabelNodeNotFound(label) => { - write!(f, "Node not found for label: {}", label) - } - } - } -} +use crate::core::{graph, server}; #[derive(Debug)] pub enum MonorailError { @@ -43,7 +13,7 @@ pub enum MonorailError { SerdeJSON(serde_json::error::Error), Utf8(str::Utf8Error), ParseInt(num::ParseIntError), - DependencyGraph(GraphError), + Graph(graph::GraphError), Join(tokio::task::JoinError), TrackingCheckpointNotFound(io::Error), TrackingRunNotFound(io::Error), @@ -58,6 +28,11 @@ impl From for MonorailError { MonorailError::Server(error) } } +impl From for MonorailError { + fn from(error: graph::GraphError) -> Self { + MonorailError::Graph(error) + } +} impl From> for MonorailError { fn from(error: tokio::sync::mpsc::error::SendError) -> Self { MonorailError::ChannelSend(error.to_string()) @@ -109,7 +84,7 @@ impl fmt::Display for MonorailError { MonorailError::SerdeJSON(error) => write!(f, "{}", error), MonorailError::Utf8(error) => write!(f, "{}", error), MonorailError::ParseInt(error) => write!(f, "{}", error), - MonorailError::DependencyGraph(error) => { + MonorailError::Graph(error) => { write!(f, "{}", error) } MonorailError::Join(error) => write!(f, "Task join error; {}", error), @@ -170,8 +145,8 @@ impl Serialize for MonorailError { state.serialize_field("type", "parse_int")?; state.serialize_field("message", &self.to_string())?; } - MonorailError::DependencyGraph(_) => { - state.serialize_field("type", "dependency_graph")?; + MonorailError::Graph(_) => { + state.serialize_field("type", "graph")?; state.serialize_field("message", &self.to_string())?; } MonorailError::PathDNE(_) => { diff --git a/src/core/graph.rs b/src/core/graph.rs index acf66f5..d3d413a 100644 --- a/src/core/graph.rs +++ b/src/core/graph.rs @@ -1,7 +1,40 @@ -use crate::core::error::{GraphError, MonorailError}; use std::cmp::{Eq, PartialEq}; use std::collections::{HashMap, HashSet, VecDeque}; -use std::io::Write; +use std::{fmt, io::Write, str}; + +#[derive(Debug)] +pub enum GraphError { + DotFileIo(std::io::Error), + LabelNotFound(usize), + Cycle(usize, String), + Connected, + DuplicateLabel(String), + LabelNodeNotFound(String), +} +impl fmt::Display for GraphError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + GraphError::DotFileIo(error) => { + write!(f, "Dot file i/o error: {}", error) + } + GraphError::LabelNotFound(node) => { + write!(f, "Label not found for node: {}", node) + } + GraphError::Cycle(node, label) => { + write!(f, "Cycle detected at node: {}, label: {}", node, label) + } + GraphError::Connected => { + write!(f, "Graph is fully connected, with no free nodes",) + } + GraphError::DuplicateLabel(label) => { + write!(f, "Duplicate label provided: {}", label) + } + GraphError::LabelNodeNotFound(label) => { + write!(f, "Node not found for label: {}", label) + } + } + } +} #[derive(Debug, Eq, PartialEq)] enum CycleState { @@ -37,15 +70,13 @@ impl Dag { } // TODO: test - pub fn get_labeled_groups(&mut self) -> Result>, MonorailError> { + pub fn get_labeled_groups(&mut self) -> Result>, GraphError> { let groups = self.get_groups()?; let mut o = Vec::with_capacity(groups.len()); for group in groups.iter().rev() { let mut labels = vec![]; for node in group { - let label = self.node2label.get(node).ok_or_else(|| { - MonorailError::DependencyGraph(GraphError::LabelNotFound(*node)) - })?; + let label = self.get_label_by_node(node)?; labels.push(label.to_owned()); } o.push(labels); @@ -58,21 +89,34 @@ impl Dag { self.adj_list[node] = nodes; } - pub fn set_label(&mut self, label: &str, node: usize) { + pub fn set_label(&mut self, label: &str, node: usize) -> Result<(), GraphError> { + if self.label2node.contains_key(label) { + return Err(GraphError::DuplicateLabel(label.to_owned())); + } // update internal hashmaps let l = label.to_owned(); self.label2node.insert(l.clone(), node); self.node2label.insert(node, l); + Ok(()) + } + + pub(crate) fn get_node_by_label(&self, label: &str) -> Result { + self.label2node + .get(label) + .copied() + .ok_or(GraphError::LabelNodeNotFound(label.to_owned())) + } + + pub(crate) fn get_label_by_node(&self, node: &usize) -> Result<&String, GraphError> { + self.node2label + .get(node) + .ok_or_else(|| GraphError::LabelNotFound(*node)) } // Walk the graph from node and mark all descendents with the provided visibility. // By default, all nodes are false, and calling this is required to make a // subtree visible during graph traversals. - pub fn set_subtree_visibility( - &mut self, - node: usize, - visible: bool, - ) -> Result<(), MonorailError> { + pub fn set_subtree_visibility(&mut self, node: usize, visible: bool) -> Result<(), GraphError> { let mut work: VecDeque = VecDeque::new(); let mut visited = HashSet::new(); let mut active = HashSet::new(); @@ -83,13 +127,8 @@ impl Dag { active.remove(&n); for &depn in &self.adj_list[n] { if active.contains(&depn) { - let label = self.node2label.get(&depn).ok_or_else(|| { - MonorailError::DependencyGraph(GraphError::LabelNotFound(depn)) - })?; - return Err(MonorailError::DependencyGraph(GraphError::Cycle( - depn, - label.to_owned(), - ))); + let label = self.get_label_by_node(&depn)?; + return Err(GraphError::Cycle(depn, label.to_owned())); } if !visited.contains(&depn) { work.push_back(depn); @@ -102,7 +141,7 @@ impl Dag { } // Uses Kahn's Algorithm to walk the graph and build lists of nodes that are independent of each other at that level. In addition, this function will compute a cycle detection if it is not yet known. - pub fn get_groups(&mut self) -> Result>, MonorailError> { + pub fn get_groups(&mut self) -> Result>, GraphError> { self.check_acyclic()?; let mut groups = Vec::new(); @@ -152,15 +191,10 @@ impl Dag { Ok(groups) } - fn check_acyclic(&self) -> Result<(), MonorailError> { + fn check_acyclic(&self) -> Result<(), GraphError> { if let CycleState::Yes(cycle_node) = self.cycle_state { - let label = self.node2label.get(&cycle_node).ok_or_else(|| { - MonorailError::DependencyGraph(GraphError::LabelNotFound(cycle_node)) - })?; - return Err(MonorailError::DependencyGraph(GraphError::Cycle( - cycle_node, - label.to_owned(), - ))); + let label = self.get_label_by_node(&cycle_node)?; + return Err(GraphError::Cycle(cycle_node, label.to_owned())); } Ok(()) } @@ -176,12 +210,13 @@ impl Dag { } // Render the graph as a .dot file for use with graphviz, etc. - pub(crate) fn render_dotfile(&self, p: &std::path::Path) -> Result<(), MonorailError> { + pub(crate) fn render_dotfile(&self, p: &std::path::Path) -> Result<(), GraphError> { let mut f = std::fs::OpenOptions::new() .create(true) .write(true) .truncate(true) - .open(p)?; + .open(p) + .map_err(GraphError::DotFileIo)?; let mut s = String::new(); s.push_str("digraph DAG {\n"); @@ -191,9 +226,7 @@ impl Dag { s.push_str(&format!( "{} [label=\"{}\"];\n", n, - self.node2label - .get(&n) - .ok_or(MonorailError::DependencyGraph(GraphError::LabelNotFound(n)))? + self.get_label_by_node(&n)? )); } // emit edges @@ -210,7 +243,7 @@ impl Dag { s.push('}'); // write string to file - write!(f, "{}", s)?; + write!(f, "{}", s).map_err(GraphError::DotFileIo)?; Ok(()) } } @@ -220,16 +253,21 @@ mod tests { use super::*; use crate::core::testing::*; - fn fill_dag(data: Vec<(&str, usize, Vec, bool)>) -> Dag { + fn fill_dag(data: Vec<(&str, usize, Vec, bool)>) -> Result { let mut dag = Dag::new(data.len()); for d in data.iter() { - dag.set_label(d.0, d.1); + dag.set_label(d.0, d.1)?; dag.set(d.1, d.2.clone()); dag.visibility[d.1] = d.3; } - dag + Ok(dag) } + #[test] + fn test_label_exists_err() { + let mut dag = fill_dag(vec![("0", 0, vec![1, 2], true), ("1", 1, vec![2], true)]).unwrap(); + assert!(dag.set_label("1", 0).is_err()) + } #[test] fn test_dag_render_dotfile() { let td = new_testdir().unwrap(); @@ -240,7 +278,8 @@ mod tests { ("1", 1, vec![2], true), ("2", 2, vec![], true), ("3", 3, vec![1], true), - ]); + ]) + .unwrap(); let res = dag.render_dotfile(&p); assert!(res.is_ok()); @@ -263,7 +302,8 @@ mod tests { ("1", 1, vec![2], true), ("2", 2, vec![], true), ("3", 3, vec![1], true), - ]); + ]) + .unwrap(); let g = dag.get_groups().unwrap(); assert_eq!(g[0], &[0, 3]); @@ -273,7 +313,7 @@ mod tests { #[test] fn test_dag_get_groups_err_cyclic() { - let mut dag = fill_dag(vec![("0", 0, vec![1], true), ("1", 1, vec![0], true)]); + let mut dag = fill_dag(vec![("0", 0, vec![1], true), ("1", 1, vec![0], true)]).unwrap(); assert!(dag.get_groups().is_err()); } @@ -285,7 +325,8 @@ mod tests { ("1", 1, vec![2], true), ("2", 2, vec![], true), ("3", 3, vec![1], true), - ]); + ]) + .unwrap(); dag.set_subtree_visibility(1, false).unwrap(); assert!(dag.visibility[0]); @@ -301,7 +342,8 @@ mod tests { ("1", 1, vec![2], false), ("2", 2, vec![0], false), ("3", 3, vec![1], false), - ]); + ]) + .unwrap(); assert!(dag.set_subtree_visibility(0, true).is_err()); } } diff --git a/src/core/mod.rs b/src/core/mod.rs index 363f389..3112d15 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; use sha2::Digest; use trie_rs::{Trie, TrieBuilder}; -use crate::core::error::{GraphError, MonorailError}; +use crate::core::error::MonorailError; use tracing::error; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -387,12 +387,9 @@ impl<'a> Index<'a> { targets.push(target.path.to_owned()); let target_path_str = target.path.as_str(); file::contains_file(&work_path.join(target_path_str))?; - if dag.label2node.contains_key(target_path_str) { - return Err(MonorailError::DependencyGraph(GraphError::DuplicateLabel( - target.path.to_owned(), - ))); - } - dag.set_label(&target.path, i); + + dag.set_label(&target.path, i) + .map_err(MonorailError::from)?; targets_builder.push(&target.path); if let Some(ignores) = target.ignores.as_ref() { @@ -404,7 +401,7 @@ impl<'a> Index<'a> { .push(target_path_str); }); } - Ok(()) + Ok::<(), MonorailError>(()) })?; let targets_trie = targets_builder.build(); @@ -416,11 +413,7 @@ impl<'a> Index<'a> { let mut nodes = targets_trie .common_prefix_search(target_path_str) .filter(|t: &String| t != &target.path) - .map(|t| { - dag.label2node.get(t.as_str()).copied().ok_or_else(|| { - MonorailError::DependencyGraph(GraphError::LabelNodeNotFound(t)) - }) - }) + .map(|t| dag.get_node_by_label(&t).map_err(MonorailError::from)) .collect::, MonorailError>>()?; if let Some(uses) = &target.uses { @@ -436,13 +429,7 @@ impl<'a> Index<'a> { matching_targets .iter() .filter(|&t| t != &target.path) - .map(|t| { - dag.label2node.get(t.as_str()).copied().ok_or_else(|| { - MonorailError::DependencyGraph(GraphError::LabelNodeNotFound( - t.to_owned(), - )) - }) - }) + .map(|t| dag.get_node_by_label(t).map_err(MonorailError::from)) .collect::, MonorailError>>()?, ); } @@ -455,9 +442,7 @@ impl<'a> Index<'a> { // now that the graph is fully constructed, set subtree visibility for t in visible_targets { - let node = dag.label2node.get(t.as_str()).copied().ok_or_else(|| { - MonorailError::DependencyGraph(GraphError::LabelNodeNotFound(t.to_owned().clone())) - })?; + let node = dag.get_node_by_label(t)?; dag.set_subtree_visibility(node, true)?; } From b3b67b25d1a8647e6e0fa5deebbe0c16f857fe3e Mon Sep 17 00:00:00 2001 From: Patrick Nordahl Date: Thu, 21 Nov 2024 06:01:21 -0600 Subject: [PATCH 2/2] log server config --- CHANGELOG.md | 5 ++ Monorail.reference.js | 21 ++++++ TUTORIAL.md | 15 +++- src/api/cli.rs | 6 +- src/app/log.rs | 83 ++++++----------------- src/app/run.rs | 15 ++-- src/core/mod.rs | 26 +++---- src/core/server.rs | 154 ++++++++++++++++++++++++++++++++++++++---- 8 files changed, 215 insertions(+), 110 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c1143b..77b2b07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Change Log +## [3.5.5] - 2024-11-21 + +### Added +- `server.log` config + ## [3.5.4] - 2024-11-20 ### Added diff --git a/Monorail.reference.js b/Monorail.reference.js index 28232ab..9743bff 100644 --- a/Monorail.reference.js +++ b/Monorail.reference.js @@ -147,6 +147,27 @@ const reference = { ] }, "server": { + // Log server for command output streams + "log": { + /* + Host to bind. + + Optional: default: "127.0.0.1" + */ + "host": "127.0.0.1", + /* + Port to bind. + + Optional: default: 5917 + */ + "port": 5918, + /* + Milliseconds to wait for a successful bind of the host:port. + + Optional: default: 1000 + */ + "bind_timeout_ms": 1000, + }, // Lock server used for concurrency control of a subset of monorail APIs. "lock": { /* diff --git a/TUTORIAL.md b/TUTORIAL.md index eac6ca0..4c8c029 100644 --- a/TUTORIAL.md +++ b/TUTORIAL.md @@ -76,13 +76,22 @@ monorail config show | jq } } ], - "log": { - "flush_interval_ms": 500 + "server": { + "log": { + "host": "127.0.0.1", + "port": 5918, + "bind_timeout_ms": 1000 + }, + "lock": { + "host": "127.0.0.1", + "port": 5917, + "bind_timeout_ms": 1000 + } } } ``` -This output includes some default values for things not specified, but otherwise reflects what we have entered. An additional note about the location of the `Monorail.json` file; you can specify an absolute path with `-c`, e.g. `monorail -c `, and this will be used instead of the default (`$(pwd)/Monorail.json`). All of `monorail`s commands are executed, and internal tracking files and logs stored, _relative to this path_. +This output includes some default values for things not specified (irrelevant for the purposes of this tutorial), but otherwise reflects what we have entered. An additional note about the location of the `Monorail.json` file; you can specify an absolute path with `-c`, e.g. `monorail -c `, and this will be used instead of the default (`$(pwd)/Monorail.json`). All of `monorail`s commands are executed, and internal tracking files and logs stored, _relative to this path_. Before we continue, let's create an initial `checkpoint`. The checkpoint will be described in more detail in a later section of this tutorial, but think of it as a "marker" for the beginning of a change detection window. For now, just run the command: diff --git a/src/api/cli.rs b/src/api/cli.rs index 16c6cdc..4331eba 100644 --- a/src/api/cli.rs +++ b/src/api/cli.rs @@ -897,12 +897,12 @@ impl<'a> TryFrom<&'a clap::ArgMatches> for app::log::LogTailInput { type Error = MonorailError; fn try_from(cmd: &'a clap::ArgMatches) -> Result { Ok(Self { - filter_input: app::log::LogFilterInput::try_from(cmd)?, + filter_input: core::server::LogFilterInput::try_from(cmd)?, }) } } -impl<'a> TryFrom<&'a clap::ArgMatches> for app::log::LogFilterInput { +impl<'a> TryFrom<&'a clap::ArgMatches> for core::server::LogFilterInput { type Error = MonorailError; fn try_from(cmd: &'a clap::ArgMatches) -> Result { Ok(Self { @@ -928,7 +928,7 @@ impl<'a> TryFrom<&'a clap::ArgMatches> for app::log::LogShowInput<'a> { type Error = MonorailError; fn try_from(cmd: &'a clap::ArgMatches) -> Result { Ok(Self { - filter_input: app::log::LogFilterInput::try_from(cmd)?, + filter_input: core::server::LogFilterInput::try_from(cmd)?, id: cmd.get_one::(ARG_ID), }) } diff --git a/src/app/log.rs b/src/app/log.rs index d68469c..a5d0195 100644 --- a/src/app/log.rs +++ b/src/app/log.rs @@ -3,7 +3,7 @@ use std::io::Write; use std::result::Result; use std::{path, sync}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use sha2::Digest; use std::io::BufRead; @@ -11,27 +11,24 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; use tokio::sync::mpsc; use tracing::{debug, info, instrument, trace}; -use crate::core::{self, error::MonorailError, tracking}; +use crate::core::{self, error::MonorailError, server, tracking}; pub(crate) const STDOUT_FILE: &str = "stdout.zst"; pub(crate) const STDERR_FILE: &str = "stderr.zst"; pub(crate) const RESET_COLOR: &str = "\x1b[0m"; -#[derive(Debug, Clone, Deserialize, Serialize)] -pub(crate) struct LogFilterInput { - pub(crate) commands: HashSet, - pub(crate) targets: HashSet, - pub(crate) include_stdout: bool, - pub(crate) include_stderr: bool, -} +// How often to flush accumulated logs to clients and compressors. +// While this could technically be configurable, a user is unlikely +// to know what to set this to, so we'll leave it a constant for now. +const FLUSH_INTERVAL_MS: u64 = 500_u64; #[derive(Serialize)] pub(crate) struct LogTailInput { - pub(crate) filter_input: LogFilterInput, + pub(crate) filter_input: server::LogFilterInput, } pub(crate) async fn log_tail<'a>( - _cfg: &'a core::Config, + cfg: &'a core::Config, input: &LogTailInput, ) -> Result<(), MonorailError> { // require at least one of the log types be opted into @@ -40,14 +37,14 @@ pub(crate) async fn log_tail<'a>( "No stream selected; provide one or both of: --stdout, --stderr", )); } - let mut lss = StreamServer::new("127.0.0.1:9201", &input.filter_input); - lss.listen().await + let ls = server::LogServer::new(cfg.server.log.clone(), &input.filter_input); + ls.serve().await.map_err(MonorailError::from) } #[derive(Serialize)] pub(crate) struct LogShowInput<'a> { pub(crate) id: Option<&'a usize>, - pub(crate) filter_input: LogFilterInput, + pub(crate) filter_input: server::LogFilterInput, } pub(crate) fn log_show<'a>( @@ -174,48 +171,12 @@ fn stream_archive_file_to_stdout( Ok(()) } -pub(crate) struct StreamServer<'a> { - address: &'a str, - args: &'a LogFilterInput, -} -impl<'a> StreamServer<'a> { - pub(crate) fn new(address: &'a str, args: &'a LogFilterInput) -> Self { - Self { address, args } - } - pub(crate) async fn listen(&mut self) -> Result<(), MonorailError> { - let listener = tokio::net::TcpListener::bind(self.address).await?; - let args_data = serde_json::to_vec(&self.args)?; - debug!("Log stream server listening"); - loop { - let (mut socket, _) = listener.accept().await?; - debug!("Client connected"); - // first, write to the client what we're interested in receiving - socket.write_all(&args_data).await?; - _ = socket.write(b"\n").await?; - debug!("Sent log stream arguments"); - Self::process(socket).await?; - } - } - #[instrument] - async fn process(socket: tokio::net::TcpStream) -> Result<(), MonorailError> { - let br = tokio::io::BufReader::new(socket); - let mut lines = br.lines(); - let mut stdout = tokio::io::stdout(); - while let Some(line) = lines.next_line().await? { - stdout.write_all(line.as_bytes()).await?; - _ = stdout.write(b"\n").await?; - stdout.flush().await?; - } - Ok(()) - } -} - #[derive(Debug, Clone)] -pub(crate) struct StreamClient { +pub(crate) struct LogServerClient { stream: sync::Arc>, - pub(crate) args: LogFilterInput, + pub(crate) args: server::LogFilterInput, } -impl StreamClient { +impl LogServerClient { #[instrument] pub(crate) async fn data( &mut self, @@ -230,14 +191,14 @@ impl StreamClient { Ok(()) } #[instrument] - pub(crate) async fn connect(addr: &str) -> Result { - let mut stream = tokio::net::TcpStream::connect(addr).await?; - info!(address = addr, "Connected to log stream server"); + pub(crate) async fn connect(cfg: &server::LogServerConfig) -> Result { + let mut stream = tokio::net::TcpStream::connect(cfg.address().as_str()).await?; + info!(address = cfg.address(), "Connected to log stream server"); let mut args_data = Vec::new(); // pull arg preferences from the server on connect let mut br = tokio::io::BufReader::new(&mut stream); br.read_until(b'\n', &mut args_data).await?; - let args: LogFilterInput = serde_json::from_slice(args_data.as_slice())?; + let args: server::LogFilterInput = serde_json::from_slice(args_data.as_slice())?; debug!("Received log stream arguments"); if args.include_stdout || args.include_stderr { let targets = if args.targets.is_empty() { @@ -271,18 +232,16 @@ impl StreamClient { } pub(crate) async fn process_reader( - config: &core::LogConfig, mut reader: tokio::io::BufReader, compressor_client: CompressorClient, header: String, - mut log_stream_client: Option, + mut log_stream_client: Option, token: sync::Arc, ) -> Result<(), MonorailError> where R: tokio::io::AsyncRead + Unpin, { - let mut interval = - tokio::time::interval(tokio::time::Duration::from_millis(config.flush_interval_ms)); + let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(FLUSH_INTERVAL_MS)); loop { let mut bufs = Vec::new(); loop { @@ -321,7 +280,7 @@ async fn process_bufs( header: &str, bufs: Vec>, compressor_client: &CompressorClient, - log_stream_client: &mut Option, + log_stream_client: &mut Option, should_end: bool, ) -> Result<(), MonorailError> { if !bufs.is_empty() { diff --git a/src/app/run.rs b/src/app/run.rs index 40ab2bf..c216f0f 100644 --- a/src/app/run.rs +++ b/src/app/run.rs @@ -15,7 +15,7 @@ use tracing::{debug, error, info, instrument}; use crate::app::{analyze, log, result, target}; use crate::core::error::MonorailError; -use crate::core::{self, file, git, tracking, ChangeProviderKind, Target}; +use crate::core::{self, file, git, server, tracking, ChangeProviderKind, Target}; #[derive(Debug)] pub(crate) struct HandleRunInput<'a> { @@ -492,10 +492,9 @@ struct CommandTask { token: sync::Arc, target: sync::Arc, command: sync::Arc, - log_config: sync::Arc, stdout_client: log::CompressorClient, stderr_client: log::CompressorClient, - log_stream_client: Option, + log_stream_client: Option, } impl CommandTask { #[allow(clippy::too_many_arguments)] @@ -533,7 +532,6 @@ impl CommandTask { true, ); let stdout_fut = log::process_reader( - &self.log_config, tokio::io::BufReader::new( child .stdout @@ -558,7 +556,6 @@ impl CommandTask { true, ); let stderr_fut = log::process_reader( - &self.log_config, tokio::io::BufReader::new( child .stderr @@ -656,8 +653,8 @@ fn get_all_commands<'a>( Ok(all_commands) } -async fn initialize_log_stream(addr: &str) -> Option { - match log::StreamClient::connect(addr).await { +async fn initialize_log_stream(cfg: &server::LogServerConfig) -> Option { + match log::LogServerClient::connect(cfg).await { Ok(client) => Some(client), Err(e) => { debug!(error = e.to_string(), "Log streaming disabled"); @@ -898,7 +895,7 @@ async fn process_plan( fail_on_undefined: bool, ) -> Result<(Vec, bool), MonorailError> { // TODO: parameterize addr from cfg - let log_stream_client = initialize_log_stream("127.0.0.1:9201").await; + let log_stream_client = initialize_log_stream(&cfg.server.log).await; let mut results = Vec::new(); let mut failed = false; @@ -923,7 +920,6 @@ async fn process_plan( continue; } - let log_config = sync::Arc::new(cfg.log.clone()); let mut crr = CommandRunResult::new(command); for plan_targets in plan_command_target_group.target_groups.iter() { @@ -949,7 +945,6 @@ async fn process_plan( token: token.clone(), target, command, - log_config: log_config.clone(), stdout_client: clients.0.clone(), stderr_client: clients.1.clone(), log_stream_client: log_stream_client.clone(), diff --git a/src/core/mod.rs b/src/core/mod.rs index 3112d15..11c8b59 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -53,21 +53,6 @@ impl FromStr for ChangeProviderKind { } } -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub(crate) struct LogConfig { - // Tick frequency for flushing accumulated logs to stream - // and compression tasks - pub(crate) flush_interval_ms: u64, -} -impl Default for LogConfig { - fn default() -> Self { - Self { - flush_interval_ms: 500, - } - } -} - #[derive(Debug, Clone, Deserialize, Serialize)] pub(crate) enum AlgorithmKind { #[serde(rename = "sha256")] @@ -85,6 +70,13 @@ pub(crate) struct ConfigSource { pub(crate) checksum: Option, } +#[derive(Debug, Default, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub(crate) struct ServerConfig { + pub(crate) log: server::LogServerConfig, + pub(crate) lock: server::LockServerConfig, +} + #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub(crate) struct Config { @@ -101,9 +93,7 @@ pub(crate) struct Config { #[serde(skip_serializing_if = "Option::is_none")] pub(crate) sequences: Option>>, #[serde(default)] - pub(crate) log: LogConfig, - #[serde(default)] - pub(crate) server: server::ServerConfig, + pub(crate) server: ServerConfig, // sha256 of the file used to deserialize #[serde(skip)] diff --git a/src/core/server.rs b/src/core/server.rs index 2fd3ea1..d7b1f39 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -1,15 +1,32 @@ use serde::{Deserialize, Serialize}; use std::{fmt, str}; -use tracing::{error, info}; + +use std::collections::HashSet; +use std::result::Result; + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; +use tracing::{debug, error, info, instrument}; #[derive(Debug)] pub enum ServerError { + Filter(String), + LogClient(std::io::Error), + LogServer(std::io::Error), BindTimeout(tokio::time::error::Elapsed), Lock(std::io::Error), } impl fmt::Display for ServerError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + ServerError::LogServer(e) => { + write!(f, "Log server error: {}", e) + } + ServerError::LogClient(e) => { + write!(f, "Log client error: {}", e) + } + ServerError::Filter(e) => { + write!(f, "Invalid log filter: {}", e) + } ServerError::BindTimeout(e) => { write!(f, "Bind timed out: {}", e) } @@ -20,42 +37,151 @@ impl fmt::Display for ServerError { } } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -pub(crate) struct ServerConfig { - pub(crate) lock: LockServerConfig, +pub(crate) struct LogServerConfig { + #[serde(default = "default_host")] + pub(crate) host: String, + #[serde(default = "LogServerConfig::default_port")] + pub(crate) port: usize, + #[serde(default = "default_bind_timeout_ms")] + pub(crate) bind_timeout_ms: u64, +} +impl LogServerConfig { + pub(crate) fn address(&self) -> String { + format!("{}:{}", self.host, self.port) + } + fn default_port() -> usize { + 5918 + } +} +impl Default for LogServerConfig { + fn default() -> Self { + Self { + host: default_host(), + port: Self::default_port(), + bind_timeout_ms: default_bind_timeout_ms(), + } + } } + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub(crate) struct LogFilterInput { + pub(crate) commands: HashSet, + pub(crate) targets: HashSet, + pub(crate) include_stdout: bool, + pub(crate) include_stderr: bool, +} + +pub(crate) struct LogServer<'a> { + config: LogServerConfig, + address: String, + bind_timeout: std::time::Duration, + filter: &'a LogFilterInput, +} +impl<'a> LogServer<'a> { + pub(crate) fn new(config: LogServerConfig, filter: &'a LogFilterInput) -> Self { + let address = config.address(); + let bind_timeout = std::time::Duration::from_millis(config.bind_timeout_ms); + Self { + config, + address, + bind_timeout, + filter, + } + } + pub(crate) async fn serve(self) -> Result<(), ServerError> { + info!( + address = &self.address, + timeout = &self.config.bind_timeout_ms, + "Log server binding" + ); + let timeout_res = tokio::time::timeout( + self.bind_timeout, + tokio::net::TcpListener::bind(&self.address), + ) + .await; + match timeout_res { + Ok(Ok(listener)) => { + info!("Log server accepting clients"); + let mut args_data = serde_json::to_vec(&self.filter) + .map_err(|e| ServerError::Filter(e.to_string()))?; + args_data.push(b'\n'); + loop { + let (mut socket, _) = + listener.accept().await.map_err(ServerError::LogServer)?; + debug!("Log server client connected"); + // first, write to the client what we're interested in receiving + socket + .write_all(&args_data) + .await + .map_err(ServerError::LogServer)?; + debug!("Sent log stream arguments to client"); + Self::process(socket).await?; + } + } + Ok(Err(e)) => { + error!(address = &self.address, "Log server bind failed"); + Err(ServerError::Lock(e)) + } + Err(e) => Err(ServerError::BindTimeout(e)), + } + } + #[instrument] + async fn process(socket: tokio::net::TcpStream) -> Result<(), ServerError> { + let br = tokio::io::BufReader::new(socket); + let mut lines = br.lines(); + let mut stdout = tokio::io::stdout(); + while let Some(line) = lines.next_line().await.map_err(ServerError::LogClient)? { + stdout + .write_all(line.as_bytes()) + .await + .map_err(ServerError::LogClient)?; + _ = stdout.write(b"\n").await.map_err(ServerError::LogClient)?; + stdout.flush().await.map_err(ServerError::LogClient)?; + } + Ok(()) + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub(crate) struct LockServerConfig { - #[serde(default = "LockServerConfig::default_host")] + #[serde(default = "default_host")] pub(crate) host: String, #[serde(default = "LockServerConfig::default_port")] pub(crate) port: usize, - #[serde(default = "LockServerConfig::default_bind_timeout_ms")] + #[serde(default = "default_bind_timeout_ms")] pub(crate) bind_timeout_ms: u64, } impl LockServerConfig { - fn default_host() -> String { - "127.0.0.1".to_string() - } fn default_port() -> usize { 5917 } - fn default_bind_timeout_ms() -> u64 { - 1000 - } } impl Default for LockServerConfig { fn default() -> Self { Self { - host: Self::default_host(), + host: default_host(), port: Self::default_port(), - bind_timeout_ms: Self::default_bind_timeout_ms(), + bind_timeout_ms: default_bind_timeout_ms(), } } } +fn default_host() -> String { + "127.0.0.1".to_string() +} +fn default_bind_timeout_ms() -> u64 { + 1000 +} + +// The lock server prevents concurrent use of a subset of monorail +// APIs, such as `run` and `checkpoint update`. +// A server bind is used instead of a lock file, because it's more +// robust and easier to use than an ephemeral file that requires +// pid tracking and manual cleanup. This technique essentially defers +// that responsibility to the OS, which already performs these tasks. pub(crate) struct LockServer { config: LockServerConfig, address: String,