Skip to content

Commit

Permalink
chore: Update SDK (#26)
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Nov 19, 2024
1 parent 4b7c646 commit 74326a3
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,7 @@ jobs:
run: cargo doc --workspace --all-features --no-deps --document-private-items
- name: Check formatting
run: cargo fmt --all -- --check
- name: check clippy
run: cargo clippy --workspace --all-features --all-targets -- -D warnings --allow deprecated
- name: Check Cargo.toml sorting
run: cargo sort --workspace --check
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pin-project-lite = "0.2"
pin-utils = "0.1.0"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "ead36a9bf8177dab744ac158b625ffec9ea6c50a" }
streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "f39cb52c15869b0d02c92e441b2828a27dfeac72" }
thiserror = "1.0.67"
tokio = { version = "*", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["io-util"] }
Expand Down
17 changes: 8 additions & 9 deletions src/account.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use streamstore::{
client::Client,
types::{
BasinConfig, BasinMetadata, CreateBasinRequest, DeleteBasinRequest, ListBasinsRequest,
ListBasinsResponse, ReconfigureBasinRequest, StreamConfig,
BasinConfig, BasinMetadata, BasinName, CreateBasinRequest, DeleteBasinRequest,
ListBasinsRequest, ListBasinsResponse, ReconfigureBasinRequest, StreamConfig,
},
};

Expand Down Expand Up @@ -54,7 +54,7 @@ impl AccountService {

pub async fn create_basin(
&self,
basin: String,
basin: BasinName,
storage_class: Option<crate::types::StorageClass>,
retention_policy: Option<crate::types::RetentionPolicy>,
) -> Result<BasinMetadata, AccountServiceError> {
Expand All @@ -77,7 +77,7 @@ impl AccountService {
.map_err(|e| AccountServiceError::CreateBasin(s2_status(&e)))
}

pub async fn delete_basin(&self, basin: String) -> Result<(), AccountServiceError> {
pub async fn delete_basin(&self, basin: BasinName) -> Result<(), AccountServiceError> {
let delete_basin_req = DeleteBasinRequest::new(basin);
self.client
.delete_basin(delete_basin_req)
Expand All @@ -88,18 +88,17 @@ impl AccountService {

pub async fn get_basin_config(
&self,
basin: String,
basin: BasinName,
) -> Result<BasinConfig, AccountServiceError> {
Ok(self
.client
self.client
.get_basin_config(basin)
.await
.map_err(|e| AccountServiceError::GetBasinConfig(s2_status(&e)))?)
.map_err(|e| AccountServiceError::GetBasinConfig(s2_status(&e)))
}

pub async fn reconfigure_basin(
&self,
basin: String,
basin: BasinName,
basin_config: BasinConfig,
mask: Vec<String>,
) -> Result<(), AccountServiceError> {
Expand Down
5 changes: 2 additions & 3 deletions src/basin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,10 @@ impl BasinService {
&self,
stream: String,
) -> Result<StreamConfig, BasinServiceError> {
Ok(self
.client
self.client
.get_stream_config(stream)
.await
.map_err(|e| BasinServiceError::GetStreamConfig(s2_status(&e)))?)
.map_err(|e| BasinServiceError::GetStreamConfig(s2_status(&e)))
}

pub async fn reconfigure_stream(
Expand Down
11 changes: 7 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use miette::Diagnostic;
use streamstore::client::{ClientError, ConnectionError, ParseError};
use streamstore::{
client::{ClientError, ParseError},
types::ConvertError,
};
use thiserror::Error;

use crate::{
Expand Down Expand Up @@ -28,9 +31,9 @@ pub enum S2CliError {
#[diagnostic(transparent)]
Config(#[from] S2ConfigError),

#[error("Failed to connect to s2: {0}")]
#[diagnostic(help("Are you connected to the internet?"))]
Connection(#[from] ConnectionError),
#[error(transparent)]
#[diagnostic(help("Are you trying to operate on an invalid basin?"))]
ConvertError(#[from] ConvertError),

#[error("{0}")]
#[diagnostic(help("Are you overriding `S2_CLOUD`, `S2_CELL`, or `S2_BASIN_ZONE`?"))]
Expand Down
56 changes: 31 additions & 25 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use stream::{RecordStream, StreamService, StreamServiceError};
use streamstore::{
bytesize::ByteSize,
client::{BasinClient, Client, ClientConfig, HostEndpoints, ParseError, StreamClient},
types::{BasinMetadata, MeteredSize as _, ReadOutput},
types::{BasinMetadata, BasinName, MeteredSize as _, ReadOutput},
HeaderValue,
};
use tokio::{
fs::{File, OpenOptions},
Expand Down Expand Up @@ -275,7 +276,7 @@ fn parse_records_output_source(s: &str) -> Result<RecordsIO, std::io::Error> {

fn client_config(auth_token: String) -> Result<ClientConfig, ParseError> {
Ok(ClientConfig::new(auth_token.to_string())
.with_user_agent("s2-cli")
.with_user_agent("s2-cli".parse::<HeaderValue>().expect("valid user agent"))
.with_host_endpoints(HostEndpoints::from_env()?)
.with_connection_timeout(std::time::Duration::from_secs(5)))
}
Expand Down Expand Up @@ -317,7 +318,7 @@ async fn run() -> Result<(), S2CliError> {
Commands::Account { action } => {
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let account_service = AccountService::new(Client::new(client_config)?);
let account_service = AccountService::new(Client::new(client_config));
match action {
AccountActions::ListBasins {
prefix,
Expand Down Expand Up @@ -354,19 +355,19 @@ async fn run() -> Result<(), S2CliError> {
None => (None, None),
};
account_service
.create_basin(basin, storage_class, retention_policy)
.create_basin(basin.try_into()?, storage_class, retention_policy)
.await?;

eprintln!("{}", "✓ Basin created".green().bold());
}

AccountActions::DeleteBasin { basin } => {
account_service.delete_basin(basin).await?;
account_service.delete_basin(basin.try_into()?).await?;
eprintln!("{}", "✓ Basin deletion requested".green().bold());
}

AccountActions::GetBasinConfig { basin } => {
let basin_config = account_service.get_basin_config(basin).await?;
let basin_config = account_service.get_basin_config(basin.try_into()?).await?;
let basin_config: BasinConfig = basin_config.into();
println!("{}", serde_json::to_string_pretty(&basin_config)?);
}
Expand All @@ -383,13 +384,14 @@ async fn run() -> Result<(), S2CliError> {
}

account_service
.reconfigure_basin(basin, config.into(), mask)
.reconfigure_basin(basin.try_into()?, config.into(), mask)
.await?;
}
}
}

Commands::Basin { basin, action } => {
let basin = BasinName::try_from(basin)?;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
match action {
Expand All @@ -398,7 +400,7 @@ async fn run() -> Result<(), S2CliError> {
start_after,
limit,
} => {
let basin_client = BasinClient::new(client_config, basin)?;
let basin_client = BasinClient::new(client_config, basin);
let streams = BasinService::new(basin_client)
.list_streams(
prefix.unwrap_or_default(),
Expand All @@ -412,23 +414,23 @@ async fn run() -> Result<(), S2CliError> {
}

BasinActions::CreateStream { stream, config } => {
let basin_client = BasinClient::new(client_config, basin)?;
let basin_client = BasinClient::new(client_config, basin);
BasinService::new(basin_client)
.create_stream(stream, config.map(Into::into))
.await?;
eprintln!("{}", "✓ Stream created".green().bold());
}

BasinActions::DeleteStream { stream } => {
let basin_client = BasinClient::new(client_config, basin)?;
let basin_client = BasinClient::new(client_config, basin);
BasinService::new(basin_client)
.delete_stream(stream)
.await?;
eprintln!("{}", "✓ Stream deleted".green().bold());
}

BasinActions::GetStreamConfig { stream } => {
let basin_client = BasinClient::new(client_config, basin)?;
let basin_client = BasinClient::new(client_config, basin);
let config = BasinService::new(basin_client)
.get_stream_config(stream)
.await?;
Expand All @@ -437,7 +439,7 @@ async fn run() -> Result<(), S2CliError> {
}

BasinActions::ReconfigureStream { stream, config } => {
let basin_client = BasinClient::new(client_config, basin)?;
let basin_client = BasinClient::new(client_config, basin);
let mut mask = Vec::new();

if config.storage_class.is_some() {
Expand All @@ -461,16 +463,17 @@ async fn run() -> Result<(), S2CliError> {
stream,
action,
} => {
let basin = BasinName::try_from(basin)?;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
match action {
StreamActions::CheckTail => {
let stream_client = StreamClient::new(client_config, basin, stream)?;
let stream_client = StreamClient::new(client_config, basin, stream);
let next_seq_num = StreamService::new(stream_client).check_tail().await?;
println!("{}", next_seq_num);
}
StreamActions::Append { records } => {
let stream_client = StreamClient::new(client_config, basin, stream)?;
let stream_client = StreamClient::new(client_config, basin, stream);
let append_input_stream = RecordStream::new(
records
.into_reader()
Expand Down Expand Up @@ -504,7 +507,7 @@ async fn run() -> Result<(), S2CliError> {
start_seq_num,
output,
} => {
let stream_client = StreamClient::new(client_config, basin, stream)?;
let stream_client = StreamClient::new(client_config, basin, stream);
let mut read_output_stream = StreamService::new(stream_client)
.read_session(start_seq_num)
.await?;
Expand All @@ -524,7 +527,7 @@ async fn run() -> Result<(), S2CliError> {
let read_result = read_result
.map_err(|e| StreamServiceError::ReadSession(e.to_string()))?;

match read_result.output {
match read_result {
ReadOutput::Batch(sequenced_record_batch) => {
let num_records = sequenced_record_batch.records.len();
let mut batch_len = ByteSize::b(0);
Expand Down Expand Up @@ -552,7 +555,7 @@ async fn run() -> Result<(), S2CliError> {

if let Some(ref mut writer) = writer {
writer
.write_all(&data)
.write_all(data)
.await
.map_err(|e| S2CliError::RecordWrite(e.to_string()))?;
writer
Expand All @@ -569,13 +572,14 @@ async fn run() -> Result<(), S2CliError> {
/ 1024.0;

eprintln!(
"{}",
format!(
"{throughput_mibps:.2} MiB/s ({num_records} records in range {seq_range:?})",
)
.blue()
.bold()
);
"{}",
format!(
"{throughput_mibps:.2} MiB/s \
({num_records} records in range {seq_range:?})",
)
.blue()
.bold()
);
}
// TODO: better message for these cases
ReadOutput::FirstSeqNum(seq_num) => {
Expand All @@ -594,7 +598,9 @@ async fn run() -> Result<(), S2CliError> {
eprintln!(
"{}",
format!(
"{total_data_len} metered bytes in {total_elapsed_time} seconds at {total_throughput_mibps:.2} MiB/s"
"{total_data_len} metered bytes in \
{total_elapsed_time} seconds \
at {total_throughput_mibps:.2} MiB/s"
)
.yellow()
.bold()
Expand Down
10 changes: 5 additions & 5 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use streamstore::{
batching::AppendRecordsBatchingStream,
client::StreamClient,
streams::AppendRecordStream,
types::{AppendOutput, ReadSessionRequest, ReadSessionResponse},
types::{AppendOutput, ReadOutput, ReadSessionRequest},
Streaming,
};
use tokio::io::AsyncBufRead;
Expand Down Expand Up @@ -78,8 +78,8 @@ impl StreamService {
&self,
append_input_stream: RecordStream<Box<dyn AsyncBufRead + Send + Unpin>>,
) -> Result<Streaming<AppendOutput>, StreamServiceError> {
let append_record_stream = AppendRecordStream::new(append_input_stream, Default::default())
.expect("stream init can't fail");
let append_record_stream =
AppendRecordsBatchingStream::new(append_input_stream, Default::default());

self.client
.append_session(append_record_stream)
Expand All @@ -90,7 +90,7 @@ impl StreamService {
pub async fn read_session(
&self,
start_seq_num: Option<u64>,
) -> Result<Streaming<ReadSessionResponse>, StreamServiceError> {
) -> Result<Streaming<ReadOutput>, StreamServiceError> {
let mut read_session_req = ReadSessionRequest::new();
if let Some(start_seq_num) = start_seq_num {
read_session_req = read_session_req.with_start_seq_num(start_seq_num);
Expand Down

0 comments on commit 74326a3

Please sign in to comment.