Skip to content

Commit

Permalink
feat: Stderr CommandRecord when reading
Browse files Browse the repository at this point in the history
Fixes: #45

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal committed Dec 5, 2024
1 parent 91bf66d commit 6bfe656
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 16 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[dependencies.streamstore]
git = "https://github.com/s2-streamstore/s2-sdk-rust.git"
rev = "1754135396dcc0ab1bb7256c8d83a46de2adebde"
rev = "07b0186fc19b7354aa5cef919c0bf0e098b9dcd4"
32 changes: 23 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,17 +649,31 @@ async fn run() -> Result<(), S2CliError> {
_ => panic!("empty batch"),
};
for sequenced_record in sequenced_record_batch.records {
let data = &sequenced_record.body;
batch_len += sequenced_record.metered_bytes();

writer
.write_all(data)
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
writer
.write_all(b"\n")
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
if let Some(command_record) = sequenced_record.as_command_record() {
let (cmd, description) = match command_record {
CommandRecord::Fence { fencing_token } => (
"fence",
format!("{fencing_token:?}"),
),
CommandRecord::Trim { seq_num } => (
"trim",
format!("TrimPoint({seq_num})"),
),
};
eprintln!("{} with {}", cmd.bold(), description.green().bold());
} else {
let data = &sequenced_record.body;
writer
.write_all(data)
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
writer
.write_all(b"\n")
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
}
}
total_data_len += batch_len;

Expand Down
6 changes: 2 additions & 4 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use streamstore::{
batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
client::{ClientError, StreamClient},
client::StreamClient,
types::{
AppendInput, AppendOutput, AppendRecordBatch, CommandRecord, FencingToken, ReadLimit,
ReadOutput, ReadSessionRequest,
Expand Down Expand Up @@ -79,9 +79,7 @@ impl StreamService {
CommandRecord::Fence { .. } => ServiceErrorContext::Fence,
CommandRecord::Trim { .. } => ServiceErrorContext::Trim,
};
let record = AppendRecord::try_from(cmd)
.map_err(|e| ServiceError::new(context, ClientError::Conversion(e)))?;
let batch = AppendRecordBatch::try_from_iter([record]).expect("single valid append record");
let batch = AppendRecordBatch::try_from_iter([cmd]).expect("single valid append record");
self.client
.append(AppendInput::new(batch))
.await
Expand Down

0 comments on commit 6bfe656

Please sign in to comment.