From 6bfe65656be0ed219e459f47c058724ccdfa8638 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Thu, 5 Dec 2024 23:25:51 +0530 Subject: [PATCH] feat: Stderr `CommandRecord` when reading Fixes: #45 Signed-off-by: Vaibhav Rabber --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/main.rs | 32 +++++++++++++++++++++++--------- src/stream.rs | 6 ++---- 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6eed313..4034dbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1665,7 +1665,7 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" version = "0.2.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=1754135396dcc0ab1bb7256c8d83a46de2adebde#1754135396dcc0ab1bb7256c8d83a46de2adebde" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=07b0186fc19b7354aa5cef919c0bf0e098b9dcd4#07b0186fc19b7354aa5cef919c0bf0e098b9dcd4" dependencies = [ "async-stream", "backon", @@ -1738,7 +1738,7 @@ dependencies = [ [[package]] name = "sync_docs" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=1754135396dcc0ab1bb7256c8d83a46de2adebde#1754135396dcc0ab1bb7256c8d83a46de2adebde" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=07b0186fc19b7354aa5cef919c0bf0e098b9dcd4#07b0186fc19b7354aa5cef919c0bf0e098b9dcd4" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index a5f8065..9249ea4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,4 +26,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } [dependencies.streamstore] git = "https://github.com/s2-streamstore/s2-sdk-rust.git" -rev = "1754135396dcc0ab1bb7256c8d83a46de2adebde" +rev = "07b0186fc19b7354aa5cef919c0bf0e098b9dcd4" diff --git a/src/main.rs b/src/main.rs index 94ab5f0..7a508f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -649,17 +649,31 @@ async fn run() -> Result<(), S2CliError> { _ => panic!("empty batch"), }; for sequenced_record in sequenced_record_batch.records { - let data = &sequenced_record.body; batch_len += sequenced_record.metered_bytes(); - writer - .write_all(data) - .await - .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; - writer - .write_all(b"\n") - .await - .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; + if let Some(command_record) = sequenced_record.as_command_record() { + let (cmd, description) = match command_record { + CommandRecord::Fence { fencing_token } => ( + "fence", + format!("{fencing_token:?}"), + ), + CommandRecord::Trim { seq_num } => ( + "trim", + format!("TrimPoint({seq_num})"), + ), + }; + eprintln!("{} with {}", cmd.bold(), description.green().bold()); + } else { + let data = &sequenced_record.body; + writer + .write_all(data) + .await + .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; + writer + .write_all(b"\n") + .await + .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; + } } total_data_len += batch_len; diff --git a/src/stream.rs b/src/stream.rs index f371bcb..68a73f8 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ use streamstore::{ batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream}, - client::{ClientError, StreamClient}, + client::StreamClient, types::{ AppendInput, AppendOutput, AppendRecordBatch, CommandRecord, FencingToken, ReadLimit, ReadOutput, ReadSessionRequest, @@ -79,9 +79,7 @@ impl StreamService { CommandRecord::Fence { .. } => ServiceErrorContext::Fence, CommandRecord::Trim { .. } => ServiceErrorContext::Trim, }; - let record = AppendRecord::try_from(cmd) - .map_err(|e| ServiceError::new(context, ClientError::Conversion(e)))?; - let batch = AppendRecordBatch::try_from_iter([record]).expect("single valid append record"); + let batch = AppendRecordBatch::try_from_iter([cmd]).expect("single valid append record"); self.client .append(AppendInput::new(batch)) .await