Skip to content

Commit

Permalink
Housekeeping (#34)
Browse files Browse the repository at this point in the history
* shuffling error types and refactor dag label/node lookups

* log server config
  • Loading branch information
pnordahl authored Nov 21, 2024
1 parent 72bf299 commit da9d798
Show file tree
Hide file tree
Showing 11 changed files with 321 additions and 223 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change Log

## [3.5.5] - 2024-11-21

### Added
- `server.log` config

## [3.5.4] - 2024-11-20

### Added
Expand Down
21 changes: 21 additions & 0 deletions Monorail.reference.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,27 @@ const reference = {
]
},
"server": {
// Log server for command output streams
"log": {
/*
Host to bind.
Optional: default: "127.0.0.1"
*/
"host": "127.0.0.1",
/*
Port to bind.
Optional: default: 5917
*/
"port": 5918,
/*
Milliseconds to wait for a successful bind of the host:port.
Optional: default: 1000
*/
"bind_timeout_ms": 1000,
},
// Lock server used for concurrency control of a subset of monorail APIs.
"lock": {
/*
Expand Down
15 changes: 12 additions & 3 deletions TUTORIAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,22 @@ monorail config show | jq
}
}
],
"log": {
"flush_interval_ms": 500
"server": {
"log": {
"host": "127.0.0.1",
"port": 5918,
"bind_timeout_ms": 1000
},
"lock": {
"host": "127.0.0.1",
"port": 5917,
"bind_timeout_ms": 1000
}
}
}
```

This output includes some default values for things not specified, but otherwise reflects what we have entered. An additional note about the location of the `Monorail.json` file; you can specify an absolute path with `-c`, e.g. `monorail -c </path/to/your/config/file>`, and this will be used instead of the default (`$(pwd)/Monorail.json`). All of `monorail`s commands are executed, and internal tracking files and logs stored, _relative to this path_.
This output includes some default values for things not specified (irrelevant for the purposes of this tutorial), but otherwise reflects what we have entered. An additional note about the location of the `Monorail.json` file; you can specify an absolute path with `-c`, e.g. `monorail -c </path/to/your/config/file>`, and this will be used instead of the default (`$(pwd)/Monorail.json`). All of `monorail`s commands are executed, and internal tracking files and logs stored, _relative to this path_.

Before we continue, let's create an initial `checkpoint`. The checkpoint will be described in more detail in a later section of this tutorial, but think of it as a "marker" for the beginning of a change detection window. For now, just run the command:

Expand Down
6 changes: 3 additions & 3 deletions src/api/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,12 +897,12 @@ impl<'a> TryFrom<&'a clap::ArgMatches> for app::log::LogTailInput {
type Error = MonorailError;
fn try_from(cmd: &'a clap::ArgMatches) -> Result<Self, Self::Error> {
Ok(Self {
filter_input: app::log::LogFilterInput::try_from(cmd)?,
filter_input: core::server::LogFilterInput::try_from(cmd)?,
})
}
}

impl<'a> TryFrom<&'a clap::ArgMatches> for app::log::LogFilterInput {
impl<'a> TryFrom<&'a clap::ArgMatches> for core::server::LogFilterInput {
type Error = MonorailError;
fn try_from(cmd: &'a clap::ArgMatches) -> Result<Self, Self::Error> {
Ok(Self {
Expand All @@ -928,7 +928,7 @@ impl<'a> TryFrom<&'a clap::ArgMatches> for app::log::LogShowInput<'a> {
type Error = MonorailError;
fn try_from(cmd: &'a clap::ArgMatches) -> Result<Self, Self::Error> {
Ok(Self {
filter_input: app::log::LogFilterInput::try_from(cmd)?,
filter_input: core::server::LogFilterInput::try_from(cmd)?,
id: cmd.get_one::<usize>(ARG_ID),
})
}
Expand Down
6 changes: 2 additions & 4 deletions src/app/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::result::Result;

use serde::Serialize;

use crate::core::error::{GraphError, MonorailError};
use crate::core::error::MonorailError;
use crate::core::{self, Change, ChangeProviderKind};
use crate::core::{git, tracking};

Expand Down Expand Up @@ -357,9 +357,7 @@ pub(crate) fn analyze(
for group in groups.iter().rev() {
let mut pg: Vec<String> = vec![];
for id in group {
let label = index.dag.node2label.get(id).ok_or_else(|| {
MonorailError::DependencyGraph(GraphError::LabelNotFound(*id))
})?;
let label = index.dag.get_label_by_node(id)?;
if output_targets.contains::<String>(label) {
pg.push(label.to_owned());
}
Expand Down
83 changes: 21 additions & 62 deletions src/app/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,32 @@ use std::io::Write;
use std::result::Result;
use std::{path, sync};

use serde::{Deserialize, Serialize};
use serde::Serialize;
use sha2::Digest;
use std::io::BufRead;

use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tracing::{debug, info, instrument, trace};

use crate::core::{self, error::MonorailError, tracking};
use crate::core::{self, error::MonorailError, server, tracking};

pub(crate) const STDOUT_FILE: &str = "stdout.zst";
pub(crate) const STDERR_FILE: &str = "stderr.zst";
pub(crate) const RESET_COLOR: &str = "\x1b[0m";

#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct LogFilterInput {
pub(crate) commands: HashSet<String>,
pub(crate) targets: HashSet<String>,
pub(crate) include_stdout: bool,
pub(crate) include_stderr: bool,
}
// How often to flush accumulated logs to clients and compressors.
// While this could technically be configurable, a user is unlikely
// to know what to set this to, so we'll leave it a constant for now.
const FLUSH_INTERVAL_MS: u64 = 500_u64;

#[derive(Serialize)]
pub(crate) struct LogTailInput {
pub(crate) filter_input: LogFilterInput,
pub(crate) filter_input: server::LogFilterInput,
}

pub(crate) async fn log_tail<'a>(
_cfg: &'a core::Config,
cfg: &'a core::Config,
input: &LogTailInput,
) -> Result<(), MonorailError> {
// require at least one of the log types be opted into
Expand All @@ -40,14 +37,14 @@ pub(crate) async fn log_tail<'a>(
"No stream selected; provide one or both of: --stdout, --stderr",
));
}
let mut lss = StreamServer::new("127.0.0.1:9201", &input.filter_input);
lss.listen().await
let ls = server::LogServer::new(cfg.server.log.clone(), &input.filter_input);
ls.serve().await.map_err(MonorailError::from)
}

#[derive(Serialize)]
pub(crate) struct LogShowInput<'a> {
pub(crate) id: Option<&'a usize>,
pub(crate) filter_input: LogFilterInput,
pub(crate) filter_input: server::LogFilterInput,
}

pub(crate) fn log_show<'a>(
Expand Down Expand Up @@ -174,48 +171,12 @@ fn stream_archive_file_to_stdout(
Ok(())
}

pub(crate) struct StreamServer<'a> {
address: &'a str,
args: &'a LogFilterInput,
}
impl<'a> StreamServer<'a> {
pub(crate) fn new(address: &'a str, args: &'a LogFilterInput) -> Self {
Self { address, args }
}
pub(crate) async fn listen(&mut self) -> Result<(), MonorailError> {
let listener = tokio::net::TcpListener::bind(self.address).await?;
let args_data = serde_json::to_vec(&self.args)?;
debug!("Log stream server listening");
loop {
let (mut socket, _) = listener.accept().await?;
debug!("Client connected");
// first, write to the client what we're interested in receiving
socket.write_all(&args_data).await?;
_ = socket.write(b"\n").await?;
debug!("Sent log stream arguments");
Self::process(socket).await?;
}
}
#[instrument]
async fn process(socket: tokio::net::TcpStream) -> Result<(), MonorailError> {
let br = tokio::io::BufReader::new(socket);
let mut lines = br.lines();
let mut stdout = tokio::io::stdout();
while let Some(line) = lines.next_line().await? {
stdout.write_all(line.as_bytes()).await?;
_ = stdout.write(b"\n").await?;
stdout.flush().await?;
}
Ok(())
}
}

#[derive(Debug, Clone)]
pub(crate) struct StreamClient {
pub(crate) struct LogServerClient {
stream: sync::Arc<tokio::sync::Mutex<tokio::net::TcpStream>>,
pub(crate) args: LogFilterInput,
pub(crate) args: server::LogFilterInput,
}
impl StreamClient {
impl LogServerClient {
#[instrument]
pub(crate) async fn data(
&mut self,
Expand All @@ -230,14 +191,14 @@ impl StreamClient {
Ok(())
}
#[instrument]
pub(crate) async fn connect(addr: &str) -> Result<Self, MonorailError> {
let mut stream = tokio::net::TcpStream::connect(addr).await?;
info!(address = addr, "Connected to log stream server");
pub(crate) async fn connect(cfg: &server::LogServerConfig) -> Result<Self, MonorailError> {
let mut stream = tokio::net::TcpStream::connect(cfg.address().as_str()).await?;
info!(address = cfg.address(), "Connected to log stream server");
let mut args_data = Vec::new();
// pull arg preferences from the server on connect
let mut br = tokio::io::BufReader::new(&mut stream);
br.read_until(b'\n', &mut args_data).await?;
let args: LogFilterInput = serde_json::from_slice(args_data.as_slice())?;
let args: server::LogFilterInput = serde_json::from_slice(args_data.as_slice())?;
debug!("Received log stream arguments");
if args.include_stdout || args.include_stderr {
let targets = if args.targets.is_empty() {
Expand Down Expand Up @@ -271,18 +232,16 @@ impl StreamClient {
}

pub(crate) async fn process_reader<R>(
config: &core::LogConfig,
mut reader: tokio::io::BufReader<R>,
compressor_client: CompressorClient,
header: String,
mut log_stream_client: Option<StreamClient>,
mut log_stream_client: Option<LogServerClient>,
token: sync::Arc<tokio_util::sync::CancellationToken>,
) -> Result<(), MonorailError>
where
R: tokio::io::AsyncRead + Unpin,
{
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(config.flush_interval_ms));
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(FLUSH_INTERVAL_MS));
loop {
let mut bufs = Vec::new();
loop {
Expand Down Expand Up @@ -321,7 +280,7 @@ async fn process_bufs(
header: &str,
bufs: Vec<Vec<u8>>,
compressor_client: &CompressorClient,
log_stream_client: &mut Option<StreamClient>,
log_stream_client: &mut Option<LogServerClient>,
should_end: bool,
) -> Result<(), MonorailError> {
if !bufs.is_empty() {
Expand Down
26 changes: 7 additions & 19 deletions src/app/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use sha2::Digest;
use tracing::{debug, error, info, instrument};

use crate::app::{analyze, log, result, target};
use crate::core::error::{GraphError, MonorailError};
use crate::core::{self, file, git, tracking, ChangeProviderKind, Target};
use crate::core::error::MonorailError;
use crate::core::{self, file, git, server, tracking, ChangeProviderKind, Target};

#[derive(Debug)]
pub(crate) struct HandleRunInput<'a> {
Expand Down Expand Up @@ -409,14 +409,7 @@ fn get_plan<'a>(
.targets
.insert(target_path.to_string(), target_hash.clone());
let logs = Logs::new(run_path, cmd.as_str(), &target_hash)?;
let target_index = index
.dag
.label2node
.get(target_path.as_str())
.copied()
.ok_or(MonorailError::DependencyGraph(
GraphError::LabelNodeNotFound(target_path.to_owned()),
))?;
let target_index = index.dag.get_node_by_label(target_path)?;
let tar = targets
.get(target_index)
.ok_or(MonorailError::from("Target not found"))?;
Expand Down Expand Up @@ -499,10 +492,9 @@ struct CommandTask {
token: sync::Arc<tokio_util::sync::CancellationToken>,
target: sync::Arc<String>,
command: sync::Arc<String>,
log_config: sync::Arc<core::LogConfig>,
stdout_client: log::CompressorClient,
stderr_client: log::CompressorClient,
log_stream_client: Option<log::StreamClient>,
log_stream_client: Option<log::LogServerClient>,
}
impl CommandTask {
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -540,7 +532,6 @@ impl CommandTask {
true,
);
let stdout_fut = log::process_reader(
&self.log_config,
tokio::io::BufReader::new(
child
.stdout
Expand All @@ -565,7 +556,6 @@ impl CommandTask {
true,
);
let stderr_fut = log::process_reader(
&self.log_config,
tokio::io::BufReader::new(
child
.stderr
Expand Down Expand Up @@ -663,8 +653,8 @@ fn get_all_commands<'a>(
Ok(all_commands)
}

async fn initialize_log_stream(addr: &str) -> Option<log::StreamClient> {
match log::StreamClient::connect(addr).await {
async fn initialize_log_stream(cfg: &server::LogServerConfig) -> Option<log::LogServerClient> {
match log::LogServerClient::connect(cfg).await {
Ok(client) => Some(client),
Err(e) => {
debug!(error = e.to_string(), "Log streaming disabled");
Expand Down Expand Up @@ -905,7 +895,7 @@ async fn process_plan(
fail_on_undefined: bool,
) -> Result<(Vec<CommandRunResult>, bool), MonorailError> {
// TODO: parameterize addr from cfg
let log_stream_client = initialize_log_stream("127.0.0.1:9201").await;
let log_stream_client = initialize_log_stream(&cfg.server.log).await;

let mut results = Vec::new();
let mut failed = false;
Expand All @@ -930,7 +920,6 @@ async fn process_plan(
continue;
}

let log_config = sync::Arc::new(cfg.log.clone());
let mut crr = CommandRunResult::new(command);

for plan_targets in plan_command_target_group.target_groups.iter() {
Expand All @@ -956,7 +945,6 @@ async fn process_plan(
token: token.clone(),
target,
command,
log_config: log_config.clone(),
stdout_client: clients.0.clone(),
stderr_client: clients.1.clone(),
log_stream_client: log_stream_client.clone(),
Expand Down
Loading

0 comments on commit da9d798

Please sign in to comment.