Skip to content

Commit

Permalink
s
Browse files Browse the repository at this point in the history
  • Loading branch information
shikhar committed Nov 7, 2024
1 parent 4670074 commit 63228a6
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 21 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pin-project-lite = "0.2"
pin-utils = "0.1.0"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", branch = "main" }
streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "54e36154a1aaf9ead2512bb446677cb7e94a4dc2" }
thiserror = "1.0.67"
tokio = { version = "*", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["io-util"] }
Expand Down
6 changes: 5 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use miette::Diagnostic;
use streamstore::client::ClientError;
use streamstore::client::{ClientError, InvalidHostError};
use thiserror::Error;

use crate::{
Expand Down Expand Up @@ -32,6 +32,10 @@ pub enum S2CliError {
#[diagnostic(help("Are you connected to the internet?"))]
Connection(#[from] ClientError),

#[error("{0}")]
#[diagnostic(help("Are you overriding `S2_CLOUD`, `S2_CELL`, or `S2_BASIN_ZONE`?"))]
InvalidHost(#[from] InvalidHostError),

#[error(transparent)]
#[diagnostic(help("{}", HELP))]
AccountService(#[from] AccountServiceError),
Expand Down
34 changes: 17 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use config::{config_path, create_config};
use error::S2CliError;
use stream::{RecordStream, StreamService, StreamServiceError};
use streamstore::{
client::{BasinClient, Client, ClientConfig, HostCloud, StreamClient},
client::{BasinClient, Client, ClientConfig, HostEndpoints, InvalidHostError, StreamClient},
types::{BasinMetadata, ReadOutput},
};
use tokio::{
Expand Down Expand Up @@ -266,10 +266,10 @@ fn parse_records_output_source(s: &str) -> Result<RecordsIO, std::io::Error> {
}
}

fn s2_config(auth_token: String) -> ClientConfig {
ClientConfig::new(auth_token.to_string())
.with_host_uri(HostCloud::Local)
.with_connection_timeout(std::time::Duration::from_secs(5))
fn client_config(auth_token: String) -> Result<ClientConfig, InvalidHostError> {
Ok(ClientConfig::new(auth_token.to_string())
.with_host_endpoint(HostEndpoints::from_env()?)
.with_connection_timeout(std::time::Duration::from_secs(5)))
}

#[tokio::main]
Expand Down Expand Up @@ -308,8 +308,8 @@ async fn run() -> Result<(), S2CliError> {

Commands::Account { action } => {
let cfg = config::load_config(&config_path)?;
let account_service =
AccountService::new(Client::connect(s2_config(cfg.auth_token)).await?);
let client_config = client_config(cfg.auth_token)?;
let account_service = AccountService::new(Client::connect(client_config).await?);
match action {
AccountActions::ListBasins {
prefix,
Expand Down Expand Up @@ -383,14 +383,14 @@ async fn run() -> Result<(), S2CliError> {

Commands::Basin { basin, action } => {
let cfg = config::load_config(&config_path)?;
let basin_config = s2_config(cfg.auth_token);
let client_config = client_config(cfg.auth_token)?;
match action {
BasinActions::ListStreams {
prefix,
start_after,
limit,
} => {
let basin_client = BasinClient::connect(basin_config, basin).await?;
let basin_client = BasinClient::connect(client_config, basin).await?;
let streams = BasinService::new(basin_client)
.list_streams(
prefix.unwrap_or_default(),
Expand All @@ -404,23 +404,23 @@ async fn run() -> Result<(), S2CliError> {
}

BasinActions::CreateStream { stream, config } => {
let basin_client = BasinClient::connect(basin_config, basin).await?;
let basin_client = BasinClient::connect(client_config, basin).await?;
BasinService::new(basin_client)
.create_stream(stream, config.map(Into::into))
.await?;
eprintln!("{}", "✓ Stream created successfully".green().bold());
}

BasinActions::DeleteStream { stream } => {
let basin_client = BasinClient::connect(basin_config, basin).await?;
let basin_client = BasinClient::connect(client_config, basin).await?;
BasinService::new(basin_client)
.delete_stream(stream)
.await?;
eprintln!("{}", "✓ Stream deleted successfully".green().bold());
}

BasinActions::GetStreamConfig { stream } => {
let basin_client = BasinClient::connect(basin_config, basin).await?;
let basin_client = BasinClient::connect(client_config, basin).await?;
let config = BasinService::new(basin_client)
.get_stream_config(stream)
.await?;
Expand All @@ -429,7 +429,7 @@ async fn run() -> Result<(), S2CliError> {
}

BasinActions::ReconfigureStream { stream, config } => {
let basin_client = BasinClient::connect(basin_config, basin).await?;
let basin_client = BasinClient::connect(client_config, basin).await?;
let mut mask = Vec::new();

if config.storage_class.is_some() {
Expand All @@ -454,15 +454,15 @@ async fn run() -> Result<(), S2CliError> {
action,
} => {
let cfg = config::load_config(&config_path)?;
let basin_config = s2_config(cfg.auth_token);
let client_config = client_config(cfg.auth_token)?;
match action {
StreamActions::CheckTail => {
let stream_client = StreamClient::connect(basin_config, basin, stream).await?;
let stream_client = StreamClient::connect(client_config, basin, stream).await?;
let next_seq_num = StreamService::new(stream_client).check_tail().await?;
println!("{}", next_seq_num);
}
StreamActions::Append { records } => {
let stream_client = StreamClient::connect(basin_config, basin, stream).await?;
let stream_client = StreamClient::connect(client_config, basin, stream).await?;
let append_input_stream = RecordStream::new(
records
.into_reader()
Expand Down Expand Up @@ -496,7 +496,7 @@ async fn run() -> Result<(), S2CliError> {
start_seq_num,
output,
} => {
let stream_client = StreamClient::connect(basin_config, basin, stream).await?;
let stream_client = StreamClient::connect(client_config, basin, stream).await?;
let mut read_output_stream = StreamService::new(stream_client)
.read_session(start_seq_num)
.await?;
Expand Down

0 comments on commit 63228a6

Please sign in to comment.