diff --git a/src/main.rs b/src/main.rs index fdf2474..c7b2875 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ use tokio::{ io::{self, AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, }; use tokio_stream::StreamExt; +use tracing::trace; use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt}; use types::{BasinConfig, StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH}; @@ -237,6 +238,7 @@ impl RecordsIO { pub async fn into_writer(&self) -> io::Result> { match self { RecordsIO::File(path) => { + trace!(?path, "opening file writer"); let file = OpenOptions::new() .write(true) .create(true) @@ -244,9 +246,12 @@ impl RecordsIO { .open(path) .await?; - Ok(Box::new(tokio::io::BufWriter::new(file))) + Ok(Box::new(BufWriter::new(file))) + } + RecordsIO::Stdout => { + trace!("stdout writer"); + Ok(Box::new(BufWriter::new(tokio::io::stdout()))) } - RecordsIO::Stdout => Ok(Box::new(BufWriter::new(tokio::io::stdout()))), RecordsIO::Stdin => panic!("unsupported record source"), } } @@ -538,6 +543,9 @@ async fn run() -> Result<(), S2CliError> { ); } } + if let Some(ref mut writer) = writer { + writer.flush().await.expect("writer flush"); + } } } }