Skip to content

Commit

Permalink
feat: exercise limits for read session (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets authored Nov 20, 2024
1 parent 74326a3 commit e260538
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum S2CliError {
#[diagnostic(help("Are you trying to operate on an invalid basin?"))]
ConvertError(#[from] ConvertError),

#[error("{0}")]
#[error(transparent)]
#[diagnostic(help("Are you overriding `S2_CLOUD`, `S2_CELL`, or `S2_BASIN_ZONE`?"))]
HostEndpoints(#[from] ParseError),

Expand Down
107 changes: 55 additions & 52 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,43 +204,62 @@ enum StreamActions {

/// Append records to a stream. Currently, only newline delimited records are supported.
Append {
/// Newline delimited records to append from a file or stdin (all records are treated as plain text).
/// Input newline delimited records to append from a file or stdin.
/// All records are treated as plain text.
/// Use "-" to read from stdin.
#[arg(value_parser = parse_records_input_source)]
records: RecordsIO,
#[arg(short = 'i', long, value_parser = parse_records_input_source, default_value = "-")]
input: RecordsIn,
},

/// Read records from a stream.
/// If a limit is specified, reading will stop when the limit is reached or there are no more records on the stream.
/// If a limit is not specified, the reader will keep tailing and wait for new records.
Read {
/// Starting sequence number (inclusive). This argument is required.
start_seq_num: Option<u64>,
#[arg(short = 's', long)]
start_seq_num: u64,

/// Output records to a file or stdout.
/// Use "-" to write to stdout.
#[arg(value_parser = parse_records_output_source)]
output: Option<RecordsIO>,
#[arg(short = 'o', long, value_parser = parse_records_output_source, default_value = "-")]
output: RecordsOut,

/// Limit the number of records returned.
#[arg(short = 'n', long)]
limit_count: Option<u64>,

/// Limit the number of bytes returned.
#[arg(short = 'b', long)]
limit_bytes: Option<ByteSize>,
},
}

/// Source of records for an append session.
#[derive(Debug, Clone)]
pub enum RecordsIO {
pub enum RecordsIn {
File(PathBuf),
Stdin,
}

/// Sink for records in a read session.
#[derive(Debug, Clone)]
pub enum RecordsOut {
File(PathBuf),
Stdout,
}

impl RecordsIO {
impl RecordsIn {
pub async fn into_reader(&self) -> std::io::Result<Box<dyn AsyncBufRead + Send + Unpin>> {
match self {
RecordsIO::File(path) => Ok(Box::new(BufReader::new(File::open(path).await?))),
RecordsIO::Stdin => Ok(Box::new(BufReader::new(tokio::io::stdin()))),
_ => panic!("unsupported record source"),
RecordsIn::File(path) => Ok(Box::new(BufReader::new(File::open(path).await?))),
RecordsIn::Stdin => Ok(Box::new(BufReader::new(tokio::io::stdin()))),
}
}
}

impl RecordsOut {
pub async fn into_writer(&self) -> io::Result<Box<dyn AsyncWrite + Send + Unpin>> {
match self {
RecordsIO::File(path) => {
RecordsOut::File(path) => {
trace!(?path, "opening file writer");
let file = OpenOptions::new()
.write(true)
Expand All @@ -251,26 +270,25 @@ impl RecordsIO {

Ok(Box::new(BufWriter::new(file)))
}
RecordsIO::Stdout => {
RecordsOut::Stdout => {
trace!("stdout writer");
Ok(Box::new(BufWriter::new(tokio::io::stdout())))
}
RecordsIO::Stdin => panic!("unsupported record source"),
}
}
}

fn parse_records_input_source(s: &str) -> Result<RecordsIO, std::io::Error> {
fn parse_records_input_source(s: &str) -> Result<RecordsIn, std::io::Error> {
match s {
"-" => Ok(RecordsIO::Stdin),
_ => Ok(RecordsIO::File(PathBuf::from(s))),
"" | "-" => Ok(RecordsIn::Stdin),
_ => Ok(RecordsIn::File(PathBuf::from(s))),
}
}

fn parse_records_output_source(s: &str) -> Result<RecordsIO, std::io::Error> {
fn parse_records_output_source(s: &str) -> Result<RecordsOut, std::io::Error> {
match s {
"-" => Ok(RecordsIO::Stdout),
_ => Ok(RecordsIO::File(PathBuf::from(s))),
"" | "-" => Ok(RecordsOut::Stdout),
_ => Ok(RecordsOut::File(PathBuf::from(s))),
}
}

Expand Down Expand Up @@ -472,10 +490,10 @@ async fn run() -> Result<(), S2CliError> {
let next_seq_num = StreamService::new(stream_client).check_tail().await?;
println!("{}", next_seq_num);
}
StreamActions::Append { records } => {
StreamActions::Append { input } => {
let stream_client = StreamClient::new(client_config, basin, stream);
let append_input_stream = RecordStream::new(
records
input
.into_reader()
.await
.map_err(|e| S2CliError::RecordReaderInit(e.to_string()))?
Expand Down Expand Up @@ -506,15 +524,14 @@ async fn run() -> Result<(), S2CliError> {
StreamActions::Read {
start_seq_num,
output,
limit_count,
limit_bytes,
} => {
let stream_client = StreamClient::new(client_config, basin, stream);
let mut read_output_stream = StreamService::new(stream_client)
.read_session(start_seq_num)
.read_session(start_seq_num, limit_count, limit_bytes)
.await?;
let mut writer = match output {
Some(output) => Some(output.into_writer().await.unwrap()),
None => None,
};
let mut writer = output.into_writer().await.unwrap();

let mut start = None;
let mut total_data_len = ByteSize::b(0);
Expand All @@ -540,29 +557,17 @@ async fn run() -> Result<(), S2CliError> {
_ => panic!("empty batch"),
};
for sequenced_record in sequenced_record_batch.records {
eprintln!(
"{}",
format!(
"✓ [READ] got record batch: seq_num: {}",
sequenced_record.seq_num,
)
.green()
.bold()
);

let data = &sequenced_record.body;
batch_len += sequenced_record.metered_size();

if let Some(ref mut writer) = writer {
writer
.write_all(data)
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
writer
.write_all(b"\n")
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
}
writer
.write_all(data)
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
writer
.write_all(b"\n")
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
}
total_data_len += batch_len;

Expand All @@ -574,7 +579,7 @@ async fn run() -> Result<(), S2CliError> {
eprintln!(
"{}",
format!(
"{throughput_mibps:.2} MiB/s \
"⦿ {throughput_mibps:.2} MiB/s \
({num_records} records in range {seq_range:?})",
)
.blue()
Expand Down Expand Up @@ -606,9 +611,7 @@ async fn run() -> Result<(), S2CliError> {
.bold()
);

if let Some(ref mut writer) = writer {
writer.flush().await.expect("writer flush");
}
writer.flush().await.expect("writer flush");
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use streamstore::{
batching::AppendRecordsBatchingStream,
client::StreamClient,
types::{AppendOutput, ReadOutput, ReadSessionRequest},
types::{AppendOutput, ReadLimit, ReadOutput, ReadSessionRequest},
Streaming,
};
use tokio::io::AsyncBufRead;
Expand All @@ -14,6 +14,7 @@ use tokio::io::Lines;
use tokio_stream::Stream;

use crate::error::s2_status;
use crate::ByteSize;

pin_project! {
#[derive(Debug)]
Expand Down Expand Up @@ -89,12 +90,20 @@ impl StreamService {

pub async fn read_session(
&self,
start_seq_num: Option<u64>,
start_seq_num: u64,
limit_count: Option<u64>,
limit_bytes: Option<ByteSize>,
) -> Result<Streaming<ReadOutput>, StreamServiceError> {
let mut read_session_req = ReadSessionRequest::new();
if let Some(start_seq_num) = start_seq_num {
read_session_req = read_session_req.with_start_seq_num(start_seq_num);
}
let read_session_req = ReadSessionRequest {
start_seq_num: Some(start_seq_num),
limit: match (limit_count, limit_bytes.map(|b| b.as_u64())) {
(Some(count), Some(bytes)) => Some(ReadLimit { count, bytes }),
(Some(count), None) => Some(ReadLimit { count, bytes: 0 }),
(None, Some(bytes)) => Some(ReadLimit { count: 0, bytes }),
(None, None) => None,
},
};

self.client
.read_session(read_session_req)
.await
Expand Down

0 comments on commit e260538

Please sign in to comment.