diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ba7aea9..3a00f58 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,5 +55,7 @@ jobs: run: cargo doc --workspace --all-features --no-deps --document-private-items - name: Check formatting run: cargo fmt --all -- --check + - name: check clippy + run: cargo clippy --workspace --all-features --all-targets -- -D warnings --allow deprecated - name: Check Cargo.toml sorting run: cargo sort --workspace --check diff --git a/Cargo.lock b/Cargo.lock index c4f820d..c2529fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1671,8 +1671,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=ead36a9bf8177dab744ac158b625ffec9ea6c50a#ead36a9bf8177dab744ac158b625ffec9ea6c50a" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=f39cb52c15869b0d02c92e441b2828a27dfeac72#f39cb52c15869b0d02c92e441b2828a27dfeac72" dependencies = [ + "async-stream", "backon", "bytesize", "futures", @@ -1681,9 +1682,11 @@ dependencies = [ "hyper-util", "prost", "prost-types", + "regex", "secrecy", "sync_docs", "thiserror", + "tokio", "tonic", "tonic-build", "tower-service", @@ -1736,7 +1739,7 @@ dependencies = [ [[package]] name = "sync_docs" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=ead36a9bf8177dab744ac158b625ffec9ea6c50a#ead36a9bf8177dab744ac158b625ffec9ea6c50a" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=f39cb52c15869b0d02c92e441b2828a27dfeac72#f39cb52c15869b0d02c92e441b2828a27dfeac72" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index d195362..7466772 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ pin-project-lite = "0.2" pin-utils = "0.1.0" serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" -streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "ead36a9bf8177dab744ac158b625ffec9ea6c50a" } +streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "f39cb52c15869b0d02c92e441b2828a27dfeac72" } thiserror = "1.0.67" tokio = { version = "*", features = ["full"] } tokio-stream = { version = "0.1.16", features = ["io-util"] } diff --git a/src/account.rs b/src/account.rs index 16738c0..9335ab1 100644 --- a/src/account.rs +++ b/src/account.rs @@ -1,8 +1,8 @@ use streamstore::{ client::Client, types::{ - BasinConfig, BasinMetadata, CreateBasinRequest, DeleteBasinRequest, ListBasinsRequest, - ListBasinsResponse, ReconfigureBasinRequest, StreamConfig, + BasinConfig, BasinMetadata, BasinName, CreateBasinRequest, DeleteBasinRequest, + ListBasinsRequest, ListBasinsResponse, ReconfigureBasinRequest, StreamConfig, }, }; @@ -54,7 +54,7 @@ impl AccountService { pub async fn create_basin( &self, - basin: String, + basin: BasinName, storage_class: Option, retention_policy: Option, ) -> Result { @@ -77,7 +77,7 @@ impl AccountService { .map_err(|e| AccountServiceError::CreateBasin(s2_status(&e))) } - pub async fn delete_basin(&self, basin: String) -> Result<(), AccountServiceError> { + pub async fn delete_basin(&self, basin: BasinName) -> Result<(), AccountServiceError> { let delete_basin_req = DeleteBasinRequest::new(basin); self.client .delete_basin(delete_basin_req) @@ -88,18 +88,17 @@ impl AccountService { pub async fn get_basin_config( &self, - basin: String, + basin: BasinName, ) -> Result { - Ok(self - .client + self.client .get_basin_config(basin) .await - .map_err(|e| AccountServiceError::GetBasinConfig(s2_status(&e)))?) + .map_err(|e| AccountServiceError::GetBasinConfig(s2_status(&e))) } pub async fn reconfigure_basin( &self, - basin: String, + basin: BasinName, basin_config: BasinConfig, mask: Vec, ) -> Result<(), AccountServiceError> { diff --git a/src/basin.rs b/src/basin.rs index 20f27f3..356a76d 100644 --- a/src/basin.rs +++ b/src/basin.rs @@ -85,11 +85,10 @@ impl BasinService { &self, stream: String, ) -> Result { - Ok(self - .client + self.client .get_stream_config(stream) .await - .map_err(|e| BasinServiceError::GetStreamConfig(s2_status(&e)))?) + .map_err(|e| BasinServiceError::GetStreamConfig(s2_status(&e))) } pub async fn reconfigure_stream( diff --git a/src/error.rs b/src/error.rs index 31834af..96cf899 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,8 @@ use miette::Diagnostic; -use streamstore::client::{ClientError, ConnectionError, ParseError}; +use streamstore::{ + client::{ClientError, ParseError}, + types::ConvertError, +}; use thiserror::Error; use crate::{ @@ -28,9 +31,9 @@ pub enum S2CliError { #[diagnostic(transparent)] Config(#[from] S2ConfigError), - #[error("Failed to connect to s2: {0}")] - #[diagnostic(help("Are you connected to the internet?"))] - Connection(#[from] ConnectionError), + #[error(transparent)] + #[diagnostic(help("Are you trying to operate on an invalid basin?"))] + ConvertError(#[from] ConvertError), #[error("{0}")] #[diagnostic(help("Are you overriding `S2_CLOUD`, `S2_CELL`, or `S2_BASIN_ZONE`?"))] diff --git a/src/main.rs b/src/main.rs index d10de64..ce0727a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,8 @@ use stream::{RecordStream, StreamService, StreamServiceError}; use streamstore::{ bytesize::ByteSize, client::{BasinClient, Client, ClientConfig, HostEndpoints, ParseError, StreamClient}, - types::{BasinMetadata, MeteredSize as _, ReadOutput}, + types::{BasinMetadata, BasinName, MeteredSize as _, ReadOutput}, + HeaderValue, }; use tokio::{ fs::{File, OpenOptions}, @@ -275,7 +276,7 @@ fn parse_records_output_source(s: &str) -> Result { fn client_config(auth_token: String) -> Result { Ok(ClientConfig::new(auth_token.to_string()) - .with_user_agent("s2-cli") + .with_user_agent("s2-cli".parse::().expect("valid user agent")) .with_host_endpoints(HostEndpoints::from_env()?) .with_connection_timeout(std::time::Duration::from_secs(5))) } @@ -317,7 +318,7 @@ async fn run() -> Result<(), S2CliError> { Commands::Account { action } => { let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; - let account_service = AccountService::new(Client::new(client_config)?); + let account_service = AccountService::new(Client::new(client_config)); match action { AccountActions::ListBasins { prefix, @@ -354,19 +355,19 @@ async fn run() -> Result<(), S2CliError> { None => (None, None), }; account_service - .create_basin(basin, storage_class, retention_policy) + .create_basin(basin.try_into()?, storage_class, retention_policy) .await?; eprintln!("{}", "✓ Basin created".green().bold()); } AccountActions::DeleteBasin { basin } => { - account_service.delete_basin(basin).await?; + account_service.delete_basin(basin.try_into()?).await?; eprintln!("{}", "✓ Basin deletion requested".green().bold()); } AccountActions::GetBasinConfig { basin } => { - let basin_config = account_service.get_basin_config(basin).await?; + let basin_config = account_service.get_basin_config(basin.try_into()?).await?; let basin_config: BasinConfig = basin_config.into(); println!("{}", serde_json::to_string_pretty(&basin_config)?); } @@ -383,13 +384,14 @@ async fn run() -> Result<(), S2CliError> { } account_service - .reconfigure_basin(basin, config.into(), mask) + .reconfigure_basin(basin.try_into()?, config.into(), mask) .await?; } } } Commands::Basin { basin, action } => { + let basin = BasinName::try_from(basin)?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; match action { @@ -398,7 +400,7 @@ async fn run() -> Result<(), S2CliError> { start_after, limit, } => { - let basin_client = BasinClient::new(client_config, basin)?; + let basin_client = BasinClient::new(client_config, basin); let streams = BasinService::new(basin_client) .list_streams( prefix.unwrap_or_default(), @@ -412,7 +414,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::CreateStream { stream, config } => { - let basin_client = BasinClient::new(client_config, basin)?; + let basin_client = BasinClient::new(client_config, basin); BasinService::new(basin_client) .create_stream(stream, config.map(Into::into)) .await?; @@ -420,7 +422,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::DeleteStream { stream } => { - let basin_client = BasinClient::new(client_config, basin)?; + let basin_client = BasinClient::new(client_config, basin); BasinService::new(basin_client) .delete_stream(stream) .await?; @@ -428,7 +430,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::GetStreamConfig { stream } => { - let basin_client = BasinClient::new(client_config, basin)?; + let basin_client = BasinClient::new(client_config, basin); let config = BasinService::new(basin_client) .get_stream_config(stream) .await?; @@ -437,7 +439,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::ReconfigureStream { stream, config } => { - let basin_client = BasinClient::new(client_config, basin)?; + let basin_client = BasinClient::new(client_config, basin); let mut mask = Vec::new(); if config.storage_class.is_some() { @@ -461,16 +463,17 @@ async fn run() -> Result<(), S2CliError> { stream, action, } => { + let basin = BasinName::try_from(basin)?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; match action { StreamActions::CheckTail => { - let stream_client = StreamClient::new(client_config, basin, stream)?; + let stream_client = StreamClient::new(client_config, basin, stream); let next_seq_num = StreamService::new(stream_client).check_tail().await?; println!("{}", next_seq_num); } StreamActions::Append { records } => { - let stream_client = StreamClient::new(client_config, basin, stream)?; + let stream_client = StreamClient::new(client_config, basin, stream); let append_input_stream = RecordStream::new( records .into_reader() @@ -504,7 +507,7 @@ async fn run() -> Result<(), S2CliError> { start_seq_num, output, } => { - let stream_client = StreamClient::new(client_config, basin, stream)?; + let stream_client = StreamClient::new(client_config, basin, stream); let mut read_output_stream = StreamService::new(stream_client) .read_session(start_seq_num) .await?; @@ -524,7 +527,7 @@ async fn run() -> Result<(), S2CliError> { let read_result = read_result .map_err(|e| StreamServiceError::ReadSession(e.to_string()))?; - match read_result.output { + match read_result { ReadOutput::Batch(sequenced_record_batch) => { let num_records = sequenced_record_batch.records.len(); let mut batch_len = ByteSize::b(0); @@ -552,7 +555,7 @@ async fn run() -> Result<(), S2CliError> { if let Some(ref mut writer) = writer { writer - .write_all(&data) + .write_all(data) .await .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; writer @@ -569,13 +572,14 @@ async fn run() -> Result<(), S2CliError> { / 1024.0; eprintln!( - "{}", - format!( - "{throughput_mibps:.2} MiB/s ({num_records} records in range {seq_range:?})", - ) - .blue() - .bold() - ); + "{}", + format!( + "{throughput_mibps:.2} MiB/s \ + ({num_records} records in range {seq_range:?})", + ) + .blue() + .bold() + ); } // TODO: better message for these cases ReadOutput::FirstSeqNum(seq_num) => { @@ -594,7 +598,9 @@ async fn run() -> Result<(), S2CliError> { eprintln!( "{}", format!( - "{total_data_len} metered bytes in {total_elapsed_time} seconds at {total_throughput_mibps:.2} MiB/s" + "{total_data_len} metered bytes in \ + {total_elapsed_time} seconds \ + at {total_throughput_mibps:.2} MiB/s" ) .yellow() .bold() diff --git a/src/stream.rs b/src/stream.rs index 01d382b..e4a68d9 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,7 +1,7 @@ use streamstore::{ + batching::AppendRecordsBatchingStream, client::StreamClient, - streams::AppendRecordStream, - types::{AppendOutput, ReadSessionRequest, ReadSessionResponse}, + types::{AppendOutput, ReadOutput, ReadSessionRequest}, Streaming, }; use tokio::io::AsyncBufRead; @@ -78,8 +78,8 @@ impl StreamService { &self, append_input_stream: RecordStream>, ) -> Result, StreamServiceError> { - let append_record_stream = AppendRecordStream::new(append_input_stream, Default::default()) - .expect("stream init can't fail"); + let append_record_stream = + AppendRecordsBatchingStream::new(append_input_stream, Default::default()); self.client .append_session(append_record_stream) @@ -90,7 +90,7 @@ impl StreamService { pub async fn read_session( &self, start_seq_num: Option, - ) -> Result, StreamServiceError> { + ) -> Result, StreamServiceError> { let mut read_session_req = ReadSessionRequest::new(); if let Some(start_seq_num) = start_seq_num { read_session_req = read_session_req.with_start_seq_num(start_seq_num);