From 63228a6e79496bc61059cb966a4096d6c3b22207 Mon Sep 17 00:00:00 2001 From: shikhar Date: Wed, 6 Nov 2024 19:08:00 -0500 Subject: [PATCH] s --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/error.rs | 6 +++++- src/main.rs | 34 +++++++++++++++++----------------- 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b39beb..df627d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1671,7 +1671,7 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?branch=main#326610dfd4229a8d66c4429ec7820c8f83636290" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=54e36154a1aaf9ead2512bb446677cb7e94a4dc2#54e36154a1aaf9ead2512bb446677cb7e94a4dc2" dependencies = [ "backon", "bytesize", @@ -1733,7 +1733,7 @@ dependencies = [ [[package]] name = "sync_docs" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?branch=main#326610dfd4229a8d66c4429ec7820c8f83636290" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=54e36154a1aaf9ead2512bb446677cb7e94a4dc2#54e36154a1aaf9ead2512bb446677cb7e94a4dc2" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 301540e..f711daf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ pin-project-lite = "0.2" pin-utils = "0.1.0" serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" -streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", branch = "main" } +streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "54e36154a1aaf9ead2512bb446677cb7e94a4dc2" } thiserror = "1.0.67" tokio = { version = "*", features = ["full"] } tokio-stream = { version = "0.1.16", features = ["io-util"] } diff --git a/src/error.rs b/src/error.rs index 0639b27..1bb1a37 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,5 @@ use miette::Diagnostic; -use streamstore::client::ClientError; +use streamstore::client::{ClientError, InvalidHostError}; use thiserror::Error; use crate::{ @@ -32,6 +32,10 @@ pub enum S2CliError { #[diagnostic(help("Are you connected to the internet?"))] Connection(#[from] ClientError), + #[error("{0}")] + #[diagnostic(help("Are you overriding `S2_CLOUD`, `S2_CELL`, or `S2_BASIN_ZONE`?"))] + InvalidHost(#[from] InvalidHostError), + #[error(transparent)] #[diagnostic(help("{}", HELP))] AccountService(#[from] AccountServiceError), diff --git a/src/main.rs b/src/main.rs index 3f282f0..2a4a788 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use config::{config_path, create_config}; use error::S2CliError; use stream::{RecordStream, StreamService, StreamServiceError}; use streamstore::{ - client::{BasinClient, Client, ClientConfig, HostCloud, StreamClient}, + client::{BasinClient, Client, ClientConfig, HostEndpoints, InvalidHostError, StreamClient}, types::{BasinMetadata, ReadOutput}, }; use tokio::{ @@ -266,10 +266,10 @@ fn parse_records_output_source(s: &str) -> Result { } } -fn s2_config(auth_token: String) -> ClientConfig { - ClientConfig::new(auth_token.to_string()) - .with_host_uri(HostCloud::Local) - .with_connection_timeout(std::time::Duration::from_secs(5)) +fn client_config(auth_token: String) -> Result { + Ok(ClientConfig::new(auth_token.to_string()) + .with_host_endpoint(HostEndpoints::from_env()?) + .with_connection_timeout(std::time::Duration::from_secs(5))) } #[tokio::main] @@ -308,8 +308,8 @@ async fn run() -> Result<(), S2CliError> { Commands::Account { action } => { let cfg = config::load_config(&config_path)?; - let account_service = - AccountService::new(Client::connect(s2_config(cfg.auth_token)).await?); + let client_config = client_config(cfg.auth_token)?; + let account_service = AccountService::new(Client::connect(client_config).await?); match action { AccountActions::ListBasins { prefix, @@ -383,14 +383,14 @@ async fn run() -> Result<(), S2CliError> { Commands::Basin { basin, action } => { let cfg = config::load_config(&config_path)?; - let basin_config = s2_config(cfg.auth_token); + let client_config = client_config(cfg.auth_token)?; match action { BasinActions::ListStreams { prefix, start_after, limit, } => { - let basin_client = BasinClient::connect(basin_config, basin).await?; + let basin_client = BasinClient::connect(client_config, basin).await?; let streams = BasinService::new(basin_client) .list_streams( prefix.unwrap_or_default(), @@ -404,7 +404,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::CreateStream { stream, config } => { - let basin_client = BasinClient::connect(basin_config, basin).await?; + let basin_client = BasinClient::connect(client_config, basin).await?; BasinService::new(basin_client) .create_stream(stream, config.map(Into::into)) .await?; @@ -412,7 +412,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::DeleteStream { stream } => { - let basin_client = BasinClient::connect(basin_config, basin).await?; + let basin_client = BasinClient::connect(client_config, basin).await?; BasinService::new(basin_client) .delete_stream(stream) .await?; @@ -420,7 +420,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::GetStreamConfig { stream } => { - let basin_client = BasinClient::connect(basin_config, basin).await?; + let basin_client = BasinClient::connect(client_config, basin).await?; let config = BasinService::new(basin_client) .get_stream_config(stream) .await?; @@ -429,7 +429,7 @@ async fn run() -> Result<(), S2CliError> { } BasinActions::ReconfigureStream { stream, config } => { - let basin_client = BasinClient::connect(basin_config, basin).await?; + let basin_client = BasinClient::connect(client_config, basin).await?; let mut mask = Vec::new(); if config.storage_class.is_some() { @@ -454,15 +454,15 @@ async fn run() -> Result<(), S2CliError> { action, } => { let cfg = config::load_config(&config_path)?; - let basin_config = s2_config(cfg.auth_token); + let client_config = client_config(cfg.auth_token)?; match action { StreamActions::CheckTail => { - let stream_client = StreamClient::connect(basin_config, basin, stream).await?; + let stream_client = StreamClient::connect(client_config, basin, stream).await?; let next_seq_num = StreamService::new(stream_client).check_tail().await?; println!("{}", next_seq_num); } StreamActions::Append { records } => { - let stream_client = StreamClient::connect(basin_config, basin, stream).await?; + let stream_client = StreamClient::connect(client_config, basin, stream).await?; let append_input_stream = RecordStream::new( records .into_reader() @@ -496,7 +496,7 @@ async fn run() -> Result<(), S2CliError> { start_seq_num, output, } => { - let stream_client = StreamClient::connect(basin_config, basin, stream).await?; + let stream_client = StreamClient::connect(client_config, basin, stream).await?; let mut read_output_stream = StreamService::new(stream_client) .read_session(start_seq_num) .await?;