Skip to content

Commit

Permalink
Allow process names to be dynamically generated
Browse files Browse the repository at this point in the history
  • Loading branch information
nviennot committed May 18, 2021
1 parent 336bba3 commit daf5cdb
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 19 deletions.
4 changes: 2 additions & 2 deletions src/cli/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ pub fn do_checkpoint(opts: Checkpoint) -> Result<Stats> {
img_streamer.process.join(&mut pgrp);

// Spawn the upload processes connected to the image streamer's output
for (upload_cmd, shard_pipe) in shard_upload_cmds.into_iter().zip(img_streamer.shard_pipes) {
for (i, (upload_cmd, shard_pipe)) in shard_upload_cmds.into_iter().zip(img_streamer.shard_pipes).enumerate() {
Command::new_shell(&upload_cmd)
.stdin(Stdio::from(shard_pipe))
.enable_stderr_logging("upload shard")
.enable_stderr_logging(format!("upload shard {}", i+1))
.spawn()?
.join(&mut pgrp);
}
Expand Down
4 changes: 2 additions & 2 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ fn restore(
img_streamer.process.join(&mut pgrp);

// Spawn the download processes connected to the image streamer's input
for (download_cmd, shard_pipe) in shard_download_cmds.into_iter().zip(img_streamer.shard_pipes) {
for (i, (download_cmd, shard_pipe)) in shard_download_cmds.into_iter().zip(img_streamer.shard_pipes).enumerate() {
Command::new_shell(&download_cmd)
.stdout(Stdio::from(shard_pipe))
.enable_stderr_logging("download shard")
.enable_stderr_logging(format!("download shard {}", i))
.spawn()?
.join(&mut pgrp);
}
Expand Down
11 changes: 7 additions & 4 deletions src/process/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::{Result, Context};
use std::{
borrow::Cow,
io::Result as IoResult,
os::unix::io::AsRawFd,
ffi::{OsString, OsStr},
Expand Down Expand Up @@ -45,7 +46,7 @@ pub struct Command {
inner: StdCommand,
display_args: Vec<String>,
show_cmd_on_spawn: bool,
stderr_log_prefix: Option<&'static str>,
stderr_log_prefix: Option<Cow<'static, str>>,
}

impl Command {
Expand Down Expand Up @@ -114,7 +115,7 @@ impl Command {
if self.show_cmd_on_spawn {
debug!("+ {}", display_cmd);
}
Ok(Process::new(inner, display_cmd, self.stderr_log_prefix))
Ok(Process::new(inner, display_cmd, self.stderr_log_prefix.clone()))
}

pub fn exec(&mut self) -> Result<()> {
Expand All @@ -126,8 +127,10 @@ impl Command {
/// log lines are prefixed with `log_prefix`.
/// 2) A fixed sized backlog is kept, and included in the error message.
/// The process' stderr is drained when calling try_wait(), wait(), or drain_stderr_logger().
pub fn enable_stderr_logging(&mut self, log_prefix: &'static str) -> &mut Command {
self.stderr_log_prefix = Some(log_prefix);
pub fn enable_stderr_logging<S>(&mut self, log_prefix: S) -> &mut Command
where S: Into<Cow<'static, str>>
{
self.stderr_log_prefix = Some(log_prefix.into());
// We'd also like to redirect stdout to stderr in some cases.
// But I can't find a way to do this in a simple way with the Rust std library.
self.stderr(Stdio::piped());
Expand Down
2 changes: 1 addition & 1 deletion src/process/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct ProcessError {
impl ProcessError {
pub fn to_json(&self) -> Value {
self.stderr_tail.as_ref().map(|st| json!({
st.log_prefix: {
st.log_prefix.as_ref(): {
"exit_status": self.formatted_exit_status(),
"log": &st.tail,
}
Expand Down
10 changes: 6 additions & 4 deletions src/process/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::{Result, Context};
use std::{
borrow::Cow,
os::unix::io::RawFd,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -49,11 +50,12 @@ pub struct Process {
}

impl Process {
pub fn new(mut inner: Child, display_cmd: String, stderr_log_prefix: Option<&'static str>) -> Self {
let stderr_tail = stderr_log_prefix.map(StderrTail::new);
let stderr_reader = stderr_log_prefix.map(|_|
pub fn new(mut inner: Child, display_cmd: String, stderr_log_prefix: Option<Cow<'static, str>>) -> Self {
let stderr_reader = stderr_log_prefix.as_ref().map(|_|
StderrReader::new(inner.stderr.take().expect("stderr not captured"))
);
let stderr_tail = stderr_log_prefix.map(StderrTail::new);

Self { inner, display_cmd, stderr_reader, stderr_tail }
}

Expand Down Expand Up @@ -189,7 +191,7 @@ impl Output {
ensure_successful_exit_status(self.status, self.display_cmd.clone(), None)
}

pub fn ensure_success_with_stderr_log(&self, log_prefix: &'static str) -> Result<()> {
pub fn ensure_success_with_stderr_log(&self, log_prefix: Cow<'static, str>) -> Result<()> {
if self.status.success() {
Ok(())
} else {
Expand Down
7 changes: 4 additions & 3 deletions src/process/stderr_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::{Result, Error};
use std::{
borrow::Cow,
os::unix::io::{AsRawFd, RawFd},
collections::VecDeque,
io::{BufReader, BufRead, ErrorKind},
Expand Down Expand Up @@ -72,7 +73,7 @@ impl StderrReader {
// programs we run (tar, criu) are well behaved.
match self.reader.read_line(&mut line) {
Err(err) if err.kind() == ErrorKind::WouldBlock => break,
Err(err) => bail!(self.format_read_error(anyhow!(err), tail.log_prefix, &line)),
Err(err) => bail!(self.format_read_error(anyhow!(err), &tail.log_prefix, &line)),
Ok(0) => break, // Reached EOF
Ok(_) => tail.log_line(&line),
}
Expand All @@ -94,14 +95,14 @@ impl StderrReader {

#[derive(Clone, Debug)]
pub struct StderrTail {
pub log_prefix: &'static str,
pub log_prefix: Cow<'static, str>,
/// We buffer the last few lines of stderr so that we can emit metrics with the
/// stderr of the process.
pub tail: VecDeque<Box<str>>,
}

impl StderrTail {
pub fn new(log_prefix: &'static str) -> Self {
pub fn new(log_prefix: Cow<'static, str>) -> Self {
let tail = VecDeque::with_capacity(STDERR_TAIL_NUM_LINES);
Self { log_prefix, tail }
}
Expand Down
12 changes: 9 additions & 3 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ mod s3;
mod gs;

use anyhow::{Result, Context};
use std::fmt;
use std::io::Write;
use std::{
borrow::Cow,
fmt,
io::Write,
};
use url::{Url, ParseError};
use crate::process::{Stdio, Command};

Expand Down Expand Up @@ -76,7 +79,10 @@ pub trait FileExt: File {
}

/// Reads a file. Returns None if it doesn't exist.
fn try_read(&self, log_prefix: &'static str) -> Result<Option<Vec<u8>>> {
fn try_read<S>(&self, log_prefix: S) -> Result<Option<Vec<u8>>>
where S: Into<Cow<'static, str>>
{
let log_prefix = log_prefix.into();
let p = Command::new_shell(&self.download_shell_cmd())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand Down

0 comments on commit daf5cdb

Please sign in to comment.