Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets committed Nov 20, 2024
1 parent 623cac5 commit b3b88a9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 24 deletions.
18 changes: 11 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,16 @@ enum StreamActions {

/// Append records to a stream. Currently, only newline delimited records are supported.
Append {
/// Newline delimited records to append from a file or stdin (all records are treated as plain text).
/// Input newline delimited records to append from a file or stdin.
/// All records are treated as plain text.
/// Use "-" to read from stdin.
#[arg(value_parser = parse_records_input_source, default_value = "-")]
records: RecordsIn,
},

/// Read records from a stream.
/// If a limit if specified, reading will stop when the limit is reached or there are no more records on the stream.
/// If a limit is not specified, the reader will keep tailing and wait for new records.
Read {
/// Starting sequence number (inclusive). If not specified, the latest record.
start: u64,
Expand All @@ -219,11 +223,11 @@ enum StreamActions {
#[arg(value_parser = parse_records_output_source, default_value = "-")]
output: Option<RecordsOut>,

// Limit the number of records to read.
/// Limit the number of records returned.
limit_count: Option<u64>,

// Limit the number of bytes to read.
limit_bytes: Option<u64>,
/// Limit the number of bytes returned
limit_bytes: Option<ByteSize>,
},
}

Expand Down Expand Up @@ -519,7 +523,7 @@ async fn run() -> Result<(), S2CliError> {
output,
limit_count,
limit_bytes,
} => {
} => {
let stream_client = StreamClient::new(client_config, basin, stream);
let mut read_output_stream = StreamService::new(stream_client)
.read_session(start, limit_count, limit_bytes)
Expand Down Expand Up @@ -552,7 +556,7 @@ async fn run() -> Result<(), S2CliError> {
(Some(first), Some(last)) => first.seq_num..=last.seq_num,
_ => panic!("empty batch"),
};
for sequenced_record in sequenced_record_batch.records {
for sequenced_record in sequenced_record_batch.records {
let data = &sequenced_record.body;
batch_len += sequenced_record.metered_size();

Expand All @@ -577,7 +581,7 @@ async fn run() -> Result<(), S2CliError> {
eprintln!(
"{}",
format!(
" {throughput_mibps:.2} MiB/s \
"⦿ {throughput_mibps:.2} MiB/s \
({num_records} records in range {seq_range:?})",
)
.blue()
Expand Down
27 changes: 10 additions & 17 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::io::Lines;
use tokio_stream::Stream;

use crate::error::s2_status;
use crate::ByteSize;

pin_project! {
#[derive(Debug)]
Expand Down Expand Up @@ -91,26 +92,18 @@ impl StreamService {
&self,
start: u64,
limit_count: Option<u64>,
limit_bytes: Option<u64>,
limit_bytes: Option<ByteSize>,
) -> Result<Streaming<ReadOutput>, StreamServiceError> {
let read_limit = match (limit_count, limit_bytes) {
(Some(count), Some(bytes)) => Some(ReadLimit { count, bytes }),
(Some(count), None) => Some(ReadLimit {
count,
bytes: 1024 * 1024,
}),

(None, Some(bytes)) => Some(ReadLimit { count: 1000, bytes }),
_ => None,
let read_session_req = ReadSessionRequest {
start_seq_num: Some(start),
limit: match (limit_count, limit_bytes.map(|b| b.as_u64())) {
(Some(count), Some(bytes)) => Some(ReadLimit { count, bytes }),
(Some(count), None) => Some(ReadLimit { count, bytes: 0 }),
(None, Some(bytes)) => Some(ReadLimit { count: 0, bytes }),
(None, None) => None,
},
};

let mut read_session_req = ReadSessionRequest::new()
.with_start_seq_num(start);

if let Some(read_limit) = read_limit {
read_session_req = read_session_req.with_limit(read_limit);
}

self.client
.read_session(read_session_req)
.await
Expand Down

0 comments on commit b3b88a9

Please sign in to comment.