From 5c6c718b5870fd19552cf186645b8cf850766a9a Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Fri, 15 Nov 2024 14:46:45 -0500 Subject: [PATCH 1/3] feat: display throughput for read session --- Cargo.lock | 95 ++++++++++++++++++++++++++------------------------ Cargo.toml | 2 +- src/account.rs | 42 +++++++++++++--------- src/basin.rs | 48 +++++++++++++++---------- src/error.rs | 18 ++++++---- src/main.rs | 69 +++++++++++++++++++++++++++--------- src/stream.rs | 34 +++++++++++------- 7 files changed, 190 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d19c02..c4f820d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,15 +40,15 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.18" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" [[package]] name = "anstream" -version = "0.6.17" +version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23a1e53f0f5d86382dafe1cf314783b2044280f406e7e1506368220ad11b1338" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" dependencies = [ "anstyle", "anstyle-parse", @@ -95,9 +95,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "arraydeque" @@ -288,9 +288,9 @@ checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" [[package]] name = "cc" -version = "1.1.34" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b9470d453346108f93a59222a9a1a5724db32d0a4727b7ab7ace4b4d822dc9" +checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" dependencies = [ "shlex", ] @@ -303,9 +303,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", "clap_derive", @@ -313,9 +313,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", @@ -337,24 +337,24 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" [[package]] name = "color-print" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee543c60ff3888934877a5671f45494dd27ed4ba25c6670b9a7576b7ed7a8c0" +checksum = "3aa954171903797d5623e047d9ab69d91b493657917bdfb8c2c80ecaf9cdb6f4" dependencies = [ "color-print-proc-macro", ] [[package]] name = "color-print-proc-macro" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ff1a80c5f3cb1ca7c06ffdd71b6a6dd6d8f896c42141fbd43f50ed28dcdb93" +checksum = "692186b5ebe54007e45a59aea47ece9eb4108e141326c304cdc91699a7118a22" dependencies = [ "nom", "proc-macro2", @@ -428,9 +428,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" +checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" dependencies = [ "libc", ] @@ -524,9 +524,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "fixedbitset" @@ -705,9 +705,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.0" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" [[package]] name = "hashlink" @@ -852,7 +852,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown 0.15.0", + "hashbrown 0.15.1", ] [[package]] @@ -910,9 +910,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.161" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libredox" @@ -1370,7 +1370,7 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.8", + "regex-automata 0.4.9", "regex-syntax 0.8.5", ] @@ -1385,9 +1385,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -1451,9 +1451,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustix" -version = "0.38.38" +version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags", "errno", @@ -1557,18 +1557,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", @@ -1671,12 +1671,14 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=54e36154a1aaf9ead2512bb446677cb7e94a4dc2#54e36154a1aaf9ead2512bb446677cb7e94a4dc2" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=ead36a9bf8177dab744ac158b625ffec9ea6c50a#ead36a9bf8177dab744ac158b625ffec9ea6c50a" dependencies = [ "backon", "bytesize", "futures", "http", + "hyper", + "hyper-util", "prost", "prost-types", "secrecy", @@ -1684,6 +1686,7 @@ dependencies = [ "thiserror", "tonic", "tonic-build", + "tower-service", ] [[package]] @@ -1733,7 +1736,7 @@ dependencies = [ [[package]] name = "sync_docs" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=54e36154a1aaf9ead2512bb446677cb7e94a4dc2#54e36154a1aaf9ead2512bb446677cb7e94a4dc2" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=ead36a9bf8177dab744ac158b625ffec9ea6c50a#ead36a9bf8177dab744ac158b625ffec9ea6c50a" dependencies = [ "proc-macro2", "quote", @@ -1754,9 +1757,9 @@ checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" [[package]] name = "tempfile" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", @@ -1788,18 +1791,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.67" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3c6efbfc763e64eb85c11c25320f0737cb7364c4b6336db90aa9ebe27a0bbd" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.67" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b607164372e89797d78b8e23a6d67d5d1038c1c65efd52e1389ef8b77caba2a6" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", @@ -1827,9 +1830,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index a3fe5b3..8c6f779 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ pin-project-lite = "0.2" pin-utils = "0.1.0" serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" -streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "54e36154a1aaf9ead2512bb446677cb7e94a4dc2" } +streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "ead36a9bf8177dab744ac158b625ffec9ea6c50a" } thiserror = "1.0.67" tokio = { version = "*", features = ["full"] } tokio-stream = { version = "0.1.16", features = ["io-util"] } diff --git a/src/account.rs b/src/account.rs index d1cdcdf..16738c0 100644 --- a/src/account.rs +++ b/src/account.rs @@ -1,15 +1,13 @@ use streamstore::{ client::Client, - service_error::{ - CreateBasinError, DeleteBasinError, GetBasinConfigError, ReconfigureBasinError, - ServiceError, - }, types::{ BasinConfig, BasinMetadata, CreateBasinRequest, DeleteBasinRequest, ListBasinsRequest, ListBasinsResponse, ReconfigureBasinRequest, StreamConfig, }, }; +use crate::error::s2_status; + pub struct AccountService { client: Client, } @@ -19,17 +17,17 @@ pub enum AccountServiceError { #[error("Failed to list basins: {0}")] ListBasins(String), - #[error("Failed to create basin")] - CreateBasin(#[from] ServiceError), + #[error("Failed to create basin: {0}")] + CreateBasin(String), - #[error("Failed to delete basin")] - DeleteBasin(#[from] ServiceError), + #[error("Failed to delete basin: {0}")] + DeleteBasin(String), - #[error("Failed to get basin config")] - GetBasinConfig(#[from] ServiceError), + #[error("Failed to get basin config: {0}")] + GetBasinConfig(String), - #[error("Failed to reconfigure basin")] - ReconfigureBasin(#[from] ServiceError), + #[error("Failed to reconfigure basin: {0}")] + ReconfigureBasin(String), } impl AccountService { @@ -51,7 +49,7 @@ impl AccountService { self.client .list_basins(list_basins_req) .await - .map_err(|e| AccountServiceError::ListBasins(e.to_string())) + .map_err(|e| AccountServiceError::ListBasins(s2_status(&e))) } pub async fn create_basin( @@ -76,12 +74,15 @@ impl AccountService { self.client .create_basin(create_basin_req) .await - .map_err(AccountServiceError::CreateBasin) + .map_err(|e| AccountServiceError::CreateBasin(s2_status(&e))) } pub async fn delete_basin(&self, basin: String) -> Result<(), AccountServiceError> { let delete_basin_req = DeleteBasinRequest::new(basin); - self.client.delete_basin(delete_basin_req).await?; + self.client + .delete_basin(delete_basin_req) + .await + .map_err(|e| AccountServiceError::DeleteBasin(s2_status(&e)))?; Ok(()) } @@ -89,7 +90,11 @@ impl AccountService { &self, basin: String, ) -> Result { - Ok(self.client.get_basin_config(basin).await?) + Ok(self + .client + .get_basin_config(basin) + .await + .map_err(|e| AccountServiceError::GetBasinConfig(s2_status(&e)))?) } pub async fn reconfigure_basin( @@ -101,7 +106,10 @@ impl AccountService { let reconfigure_basin_req = ReconfigureBasinRequest::new(basin) .with_config(basin_config) .with_mask(mask); - self.client.reconfigure_basin(reconfigure_basin_req).await?; + self.client + .reconfigure_basin(reconfigure_basin_req) + .await + .map_err(|e| AccountServiceError::ReconfigureBasin(s2_status(&e)))?; Ok(()) } } diff --git a/src/basin.rs b/src/basin.rs index 9fbe5b1..20f27f3 100644 --- a/src/basin.rs +++ b/src/basin.rs @@ -1,15 +1,13 @@ use streamstore::{ client::BasinClient, - service_error::{ - CreateStreamError, DeleteStreamError, GetStreamConfigError, ListStreamsError, - ReconfigureStreamError, ServiceError, - }, types::{ CreateStreamRequest, DeleteStreamRequest, ListStreamsRequest, ListStreamsResponse, ReconfigureStreamRequest, StreamConfig, }, }; +use crate::error::s2_status; + pub struct BasinService { client: BasinClient, } @@ -17,19 +15,19 @@ pub struct BasinService { #[derive(Debug, thiserror::Error)] pub enum BasinServiceError { #[error("Failed to list streams: {0}")] - ListStreams(#[from] ServiceError), + ListStreams(String), - #[error("Failed to create stream")] - CreateStream(#[from] ServiceError), + #[error("Failed to create stream: {0}")] + CreateStream(String), - #[error("Failed to delete stream")] - DeleteStream(#[from] ServiceError), + #[error("Failed to delete stream: {0}")] + DeleteStream(String), - #[error("Failed to get stream config")] - GetStreamConfig(#[from] ServiceError), + #[error("Failed to get stream config: {0}")] + GetStreamConfig(String), - #[error("Failed to reconfigure stream")] - ReconfigureStream(#[from] ServiceError), + #[error("Failed to reconfigure stream: {0}")] + ReconfigureStream(String), } impl BasinService { @@ -48,8 +46,11 @@ impl BasinService { .with_start_after(start_after) .with_limit(limit); - let ListStreamsResponse { streams, .. } = - self.client.list_streams(list_streams_req).await?; + let ListStreamsResponse { streams, .. } = self + .client + .list_streams(list_streams_req) + .await + .map_err(|e| BasinServiceError::ListStreams(s2_status(&e)))?; Ok(streams) } @@ -65,14 +66,18 @@ impl BasinService { create_stream_req = create_stream_req.with_config(config); }; - self.client.create_stream(create_stream_req).await?; + self.client + .create_stream(create_stream_req) + .await + .map_err(|e| BasinServiceError::CreateStream(s2_status(&e)))?; Ok(()) } pub async fn delete_stream(&self, stream: String) -> Result<(), BasinServiceError> { self.client .delete_stream(DeleteStreamRequest::new(stream)) - .await?; + .await + .map_err(|e| BasinServiceError::DeleteStream(s2_status(&e)))?; Ok(()) } @@ -80,7 +85,11 @@ impl BasinService { &self, stream: String, ) -> Result { - Ok(self.client.get_stream_config(stream).await?) + Ok(self + .client + .get_stream_config(stream) + .await + .map_err(|e| BasinServiceError::GetStreamConfig(s2_status(&e)))?) } pub async fn reconfigure_stream( @@ -95,7 +104,8 @@ impl BasinService { self.client .reconfigure_stream(reconfigure_stream_req) - .await?; + .await + .map_err(|e| BasinServiceError::ReconfigureStream(s2_status(&e)))?; Ok(()) } } diff --git a/src/error.rs b/src/error.rs index 1bb1a37..43ae2ba 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,5 @@ use miette::Diagnostic; -use streamstore::client::{ClientError, InvalidHostError}; +use streamstore::client::{ClientError, ConnectionError, ParseError}; use thiserror::Error; use crate::{ @@ -30,11 +30,11 @@ pub enum S2CliError { #[error("Failed to connect to s2: {0}")] #[diagnostic(help("Are you connected to the internet?"))] - Connection(#[from] ClientError), + Connection(#[from] ConnectionError), #[error("{0}")] #[diagnostic(help("Are you overriding `S2_CLOUD`, `S2_CELL`, or `S2_BASIN_ZONE`?"))] - InvalidHost(#[from] InvalidHostError), + HostEndpoints(#[from] ParseError), #[error(transparent)] #[diagnostic(help("{}", HELP))] @@ -52,7 +52,13 @@ pub enum S2CliError { #[diagnostic(help("{}", BUG_HELP))] InvalidConfig(#[from] serde_json::Error), - #[error("Failed to initialize a `Record Reader`!")] - #[diagnostic(help("{}", BUG_HELP))] - RecordReaderInit, + #[error("Failed to initialize a `Record Reader`! {0}")] + RecordReaderInit(String), +} + +pub fn s2_status(error: &ClientError) -> String { + match error { + ClientError::Service(status) => status.code().to_string(), + _ => error.to_string(), + } } diff --git a/src/main.rs b/src/main.rs index fa802f7..7d59f28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,12 +8,14 @@ use config::{config_path, create_config}; use error::S2CliError; use stream::{RecordStream, StreamService, StreamServiceError}; use streamstore::{ - client::{BasinClient, Client, ClientConfig, HostEndpoints, InvalidHostError, StreamClient}, - types::{BasinMetadata, ReadOutput}, + bytesize::ByteSize, + client::{BasinClient, Client, ClientConfig, HostEndpoints, ParseError, StreamClient}, + types::{BasinMetadata, MeteredSize as _, ReadOutput}, }; use tokio::{ fs::{File, OpenOptions}, io::{self, AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, + time::Instant, }; use tokio_stream::StreamExt; use tracing::trace; @@ -271,10 +273,10 @@ fn parse_records_output_source(s: &str) -> Result { } } -fn client_config(auth_token: String) -> Result { +fn client_config(auth_token: String) -> Result { Ok(ClientConfig::new(auth_token.to_string()) .with_user_agent("s2-cli") - .with_host_endpoint(HostEndpoints::from_env()?) + .with_host_endpoints(HostEndpoints::from_env()?) .with_connection_timeout(std::time::Duration::from_secs(5))) } @@ -315,7 +317,7 @@ async fn run() -> Result<(), S2CliError> { Commands::Account { action } => { let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; - let account_service = AccountService::new(Client::connect(client_config).await?); + let account_service = AccountService::new(Client::new(client_config)?); match action { AccountActions::ListBasins { prefix, @@ -396,7 +398,7 @@ async fn run() -> Result<(), S2CliError> { start_after, limit, } => { - let basin_client = BasinClient::connect(client_config, basin).await?; + let basin_client = BasinClient::new(client_config, basin)?; let streams = BasinService::new(basin_client) .list_streams( prefix.unwrap_or_default(), @@ -410,7 +412,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::CreateStream { stream, config } => { - let basin_client = BasinClient::connect(client_config, basin).await?; + let basin_client = BasinClient::new(client_config, basin)?; BasinService::new(basin_client) .create_stream(stream, config.map(Into::into)) .await?; @@ -418,7 +420,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::DeleteStream { stream } => { - let basin_client = BasinClient::connect(client_config, basin).await?; + let basin_client = BasinClient::new(client_config, basin)?; BasinService::new(basin_client) .delete_stream(stream) .await?; @@ -426,7 +428,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::GetStreamConfig { stream } => { - let basin_client = BasinClient::connect(client_config, basin).await?; + let basin_client = BasinClient::new(client_config, basin)?; let config = BasinService::new(basin_client) .get_stream_config(stream) .await?; @@ -435,7 +437,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::ReconfigureStream { stream, config } => { - let basin_client = BasinClient::connect(client_config, basin).await?; + let basin_client = BasinClient::new(client_config, basin)?; let mut mask = Vec::new(); if config.storage_class.is_some() { @@ -463,17 +465,17 @@ async fn run() -> Result<(), S2CliError> { let client_config = client_config(cfg.auth_token)?; match action { StreamActions::CheckTail => { - let stream_client = StreamClient::connect(client_config, basin, stream).await?; + let stream_client = StreamClient::new(client_config, basin, stream)?; let next_seq_num = StreamService::new(stream_client).check_tail().await?; println!("{}", next_seq_num); } StreamActions::Append { records } => { - let stream_client = StreamClient::connect(client_config, basin, stream).await?; + let stream_client = StreamClient::new(client_config, basin, stream)?; let append_input_stream = RecordStream::new( records .into_reader() .await - .map_err(|_| S2CliError::RecordReaderInit)? + .map_err(|e| S2CliError::RecordReaderInit(e.to_string()))? .lines(), ); @@ -495,14 +497,14 @@ async fn run() -> Result<(), S2CliError> { .bold() ); }) - .map_err(StreamServiceError::AppendSession)?; + .map_err(|e| StreamServiceError::AppendSession(e.to_string()))?; } } StreamActions::Read { start_seq_num, output, } => { - let stream_client = StreamClient::connect(client_config, basin, stream).await?; + let stream_client = StreamClient::new(client_config, basin, stream)?; let mut read_output_stream = StreamService::new(stream_client) .read_session(start_seq_num) .await?; @@ -510,10 +512,30 @@ async fn run() -> Result<(), S2CliError> { Some(output) => Some(output.into_writer().await.unwrap()), None => None, }; + + let mut start = None; + let mut total_data_len = ByteSize::b(0); + while let Some(read_result) = read_output_stream.next().await { - let read_result = read_result.map_err(StreamServiceError::ReadSession)?; + if start.is_none() { + start = Some(Instant::now()); + } + + let read_result = read_result + .map_err(|e| StreamServiceError::ReadSession(e.to_string()))?; + match read_result.output { ReadOutput::Batch(sequenced_record_batch) => { + let num_records = sequenced_record_batch.records.len(); + let mut batch_len = ByteSize::b(0); + + let seq_range = match ( + sequenced_record_batch.records.first(), + sequenced_record_batch.records.last(), + ) { + (Some(first), Some(last)) => first.seq_num..=last.seq_num, + _ => panic!("empty batch"), + }; for sequenced_record in sequenced_record_batch.records { eprintln!( "{}", @@ -524,11 +546,24 @@ async fn run() -> Result<(), S2CliError> { .green() .bold() ); + + let data = &sequenced_record.body; + batch_len += sequenced_record.metered_size(); + if let Some(ref mut writer) = writer { - writer.write_all(&sequenced_record.body).await.unwrap(); + writer.write_all(&data).await.unwrap(); writer.write_all(b"\n").await.unwrap(); } } + total_data_len += batch_len; + + let throughput_mibps = ((total_data_len.0 as f64 + / start.unwrap().elapsed().as_secs_f64()) + / 1024.0 + / 1024.0) + as u64; + + eprintln!("{seq_range:?} // {batch_len} bytes in {num_records} records // {throughput_mibps:?}"); } // TODO: better message for these cases ReadOutput::FirstSeqNum(seq_num) => { diff --git a/src/stream.rs b/src/stream.rs index a610ac7..01d382b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,5 @@ use streamstore::{ client::StreamClient, - service_error::{AppendSessionError, CheckTailError, ReadSessionError, ServiceError}, streams::AppendRecordStream, types::{AppendOutput, ReadSessionRequest, ReadSessionResponse}, Streaming, @@ -14,6 +13,8 @@ use streamstore::types::AppendRecord; use tokio::io::Lines; use tokio_stream::Stream; +use crate::error::s2_status; + pin_project! { #[derive(Debug)] pub struct RecordStream { @@ -51,14 +52,14 @@ pub struct StreamService { #[derive(Debug, thiserror::Error)] pub enum StreamServiceError { - #[error("Failed to get next sequence number")] - CheckTail(#[from] ServiceError), + #[error("Failed to get next sequence number: {0}")] + CheckTail(String), - #[error("Failed to append records")] - AppendSession(#[from] ServiceError), + #[error("Failed to append records: {0}")] + AppendSession(String), - #[error("Failed to read records")] - ReadSession(#[from] ServiceError), + #[error("Failed to read records: {0}")] + ReadSession(String), } impl StreamService { @@ -67,27 +68,36 @@ impl StreamService { } pub async fn check_tail(&self) -> Result { - Ok(self.client.check_tail().await?) + self.client + .check_tail() + .await + .map_err(|e| StreamServiceError::CheckTail(s2_status(&e))) } pub async fn append_session( &self, append_input_stream: RecordStream>, - ) -> Result, StreamServiceError> { + ) -> Result, StreamServiceError> { let append_record_stream = AppendRecordStream::new(append_input_stream, Default::default()) .expect("stream init can't fail"); - Ok(self.client.append_session(append_record_stream).await?) + self.client + .append_session(append_record_stream) + .await + .map_err(|e| StreamServiceError::AppendSession(s2_status(&e))) } pub async fn read_session( &self, start_seq_num: Option, - ) -> Result, StreamServiceError> { + ) -> Result, StreamServiceError> { let mut read_session_req = ReadSessionRequest::new(); if let Some(start_seq_num) = start_seq_num { read_session_req = read_session_req.with_start_seq_num(start_seq_num); } - Ok(self.client.read_session(read_session_req).await?) + self.client + .read_session(read_session_req) + .await + .map_err(|e| StreamServiceError::ReadSession(s2_status(&e))) } } From d2efcd064b0a7a02d44938cd04e2068b7359e753 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Fri, 15 Nov 2024 15:16:15 -0500 Subject: [PATCH 2/3] change to float and display total --- src/main.rs | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7d59f28..d52d22b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -557,13 +557,22 @@ async fn run() -> Result<(), S2CliError> { } total_data_len += batch_len; - let throughput_mibps = ((total_data_len.0 as f64 + let throughput_mibps = (total_data_len.0 as f64 / start.unwrap().elapsed().as_secs_f64()) / 1024.0 - / 1024.0) - as u64; + / 1024.0; - eprintln!("{seq_range:?} // {batch_len} bytes in {num_records} records // {throughput_mibps:?}"); + eprintln!( + "{}", + format!( + "✓ [READ] Batch throughput: {:.2} MiB/s ({} records in range {:?})", + throughput_mibps, + num_records, + seq_range + ) + .blue() + .bold() + ); } // TODO: better message for these cases ReadOutput::FirstSeqNum(seq_num) => { @@ -579,6 +588,22 @@ async fn run() -> Result<(), S2CliError> { ); } } + + let total_throughput_mibps = (total_data_len.0 as f64 + / start.unwrap().elapsed().as_secs_f64()) + / 1024.0 + / 1024.0; + + eprintln!( + "{}", + format!( + "✓ [READ] Total throughput: {:.2} MiB/s", + total_throughput_mibps + ) + .yellow() + .bold() + ); + if let Some(ref mut writer) = writer { writer.flush().await.expect("writer flush"); } From dcac0112972efeca411d118f136994234c51943d Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Fri, 15 Nov 2024 19:29:57 -0500 Subject: [PATCH 3/3] . --- src/error.rs | 3 +++ src/main.rs | 36 ++++++++++++++++-------------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/src/error.rs b/src/error.rs index 43ae2ba..31834af 100644 --- a/src/error.rs +++ b/src/error.rs @@ -54,6 +54,9 @@ pub enum S2CliError { #[error("Failed to initialize a `Record Reader`! {0}")] RecordReaderInit(String), + + #[error("Failed to write records: {0}")] + RecordWrite(String), } pub fn s2_status(error: &ClientError) -> String { diff --git a/src/main.rs b/src/main.rs index d52d22b..d10de64 100644 --- a/src/main.rs +++ b/src/main.rs @@ -551,8 +551,14 @@ async fn run() -> Result<(), S2CliError> { batch_len += sequenced_record.metered_size(); if let Some(ref mut writer) = writer { - writer.write_all(&data).await.unwrap(); - writer.write_all(b"\n").await.unwrap(); + writer + .write_all(&data) + .await + .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; + writer + .write_all(b"\n") + .await + .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; } } total_data_len += batch_len; @@ -565,10 +571,7 @@ async fn run() -> Result<(), S2CliError> { eprintln!( "{}", format!( - "✓ [READ] Batch throughput: {:.2} MiB/s ({} records in range {:?})", - throughput_mibps, - num_records, - seq_range + "{throughput_mibps:.2} MiB/s ({num_records} records in range {seq_range:?})", ) .blue() .bold() @@ -576,29 +579,22 @@ async fn run() -> Result<(), S2CliError> { } // TODO: better message for these cases ReadOutput::FirstSeqNum(seq_num) => { - eprintln!( - "{}", - format!("✓ [READ] first_seq_num: {}", seq_num).blue().bold() - ); + eprintln!("{}", format!("first_seq_num: {seq_num}").blue().bold()); } ReadOutput::NextSeqNum(seq_num) => { - eprintln!( - "{}", - format!("✓ [READ] next_seq_num: {}", seq_num).blue().bold() - ); + eprintln!("{}", format!("next_seq_num: {seq_num}").blue().bold()); } } - let total_throughput_mibps = (total_data_len.0 as f64 - / start.unwrap().elapsed().as_secs_f64()) - / 1024.0 - / 1024.0; + let total_elapsed_time = start.unwrap().elapsed().as_secs_f64(); + + let total_throughput_mibps = + (total_data_len.0 as f64 / total_elapsed_time) / 1024.0 / 1024.0; eprintln!( "{}", format!( - "✓ [READ] Total throughput: {:.2} MiB/s", - total_throughput_mibps + "{total_data_len} metered bytes in {total_elapsed_time} seconds at {total_throughput_mibps:.2} MiB/s" ) .yellow() .bold()