Skip to content

Commit

Permalink
feat: exercise limits for read session
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets committed Nov 20, 2024
1 parent d1f2c49 commit 623cac5
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 24 deletions.
Binary file added foo.txt
Binary file not shown.
35 changes: 16 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,21 @@ enum StreamActions {

Read {
/// Starting sequence number (inclusive). If not specified, the latest record.
start_seq_num: Option<u64>,
start: u64,

/// Output records to a file or stdout.
/// Use "-" to write to stdout.
#[arg(value_parser = parse_records_output_source, default_value = "-")]
output: Option<RecordsOut>,

// Limit the number of records to read.
limit_count: Option<u64>,

// Limit the number of bytes to read.
limit_bytes: Option<u64>,
},
}

/// Source of records for an append session.
#[derive(Debug, Clone)]
pub enum RecordsIn {
File(PathBuf),
Expand All @@ -239,7 +244,7 @@ impl RecordsIn {
pub async fn into_reader(&self) -> std::io::Result<Box<dyn AsyncBufRead + Send + Unpin>> {
match self {
RecordsIn::File(path) => Ok(Box::new(BufReader::new(File::open(path).await?))),
RecordsIn::Stdin => Ok(Box::new(BufReader::new(tokio::io::stdin()))),
RecordsIn::Stdin => Ok(Box::new(BufReader::new(tokio::io::stdin()))),
}
}
}
Expand All @@ -261,7 +266,7 @@ impl RecordsOut {
RecordsOut::Stdout => {
trace!("stdout writer");
Ok(Box::new(BufWriter::new(tokio::io::stdout())))
}
}
}
}
}
Expand Down Expand Up @@ -510,12 +515,14 @@ async fn run() -> Result<(), S2CliError> {
}
}
StreamActions::Read {
start_seq_num,
start,
output,
} => {
limit_count,
limit_bytes,
} => {
let stream_client = StreamClient::new(client_config, basin, stream);
let mut read_output_stream = StreamService::new(stream_client)
.read_session(start_seq_num)
.read_session(start, limit_count, limit_bytes)
.await?;
let mut writer = match output {
Some(output) => Some(output.into_writer().await.unwrap()),
Expand Down Expand Up @@ -545,17 +552,7 @@ async fn run() -> Result<(), S2CliError> {
(Some(first), Some(last)) => first.seq_num..=last.seq_num,
_ => panic!("empty batch"),
};
for sequenced_record in sequenced_record_batch.records {
eprintln!(
"{}",
format!(
"✓ [READ] got record batch: seq_num: {}",
sequenced_record.seq_num,
)
.green()
.bold()
);

for sequenced_record in sequenced_record_batch.records {
let data = &sequenced_record.body;
batch_len += sequenced_record.metered_size();

Expand All @@ -580,7 +577,7 @@ async fn run() -> Result<(), S2CliError> {
eprintln!(
"{}",
format!(
"{throughput_mibps:.2} MiB/s \
"{throughput_mibps:.2} MiB/s \
({num_records} records in range {seq_range:?})",
)
.blue()
Expand Down
26 changes: 21 additions & 5 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use streamstore::{
batching::AppendRecordsBatchingStream,
client::StreamClient,
types::{AppendOutput, ReadOutput, ReadSessionRequest},
types::{AppendOutput, ReadLimit, ReadOutput, ReadSessionRequest},
Streaming,
};
use tokio::io::AsyncBufRead;
Expand Down Expand Up @@ -89,12 +89,28 @@ impl StreamService {

pub async fn read_session(
&self,
start_seq_num: Option<u64>,
start: u64,
limit_count: Option<u64>,
limit_bytes: Option<u64>,
) -> Result<Streaming<ReadOutput>, StreamServiceError> {
let mut read_session_req = ReadSessionRequest::new();
if let Some(start_seq_num) = start_seq_num {
read_session_req = read_session_req.with_start_seq_num(start_seq_num);
let read_limit = match (limit_count, limit_bytes) {
(Some(count), Some(bytes)) => Some(ReadLimit { count, bytes }),
(Some(count), None) => Some(ReadLimit {
count,
bytes: 1024 * 1024,
}),

(None, Some(bytes)) => Some(ReadLimit { count: 1000, bytes }),
_ => None,
};

let mut read_session_req = ReadSessionRequest::new()
.with_start_seq_num(start);

if let Some(read_limit) = read_limit {
read_session_req = read_session_req.with_limit(read_limit);
}

self.client
.read_session(read_session_req)
.await
Expand Down

0 comments on commit 623cac5

Please sign in to comment.