Skip to content

Commit

Permalink
chore: make ClientConfig fields private + revise docs (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
shikhar authored Nov 20, 2024
1 parent c51d541 commit dfaea7e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 51 deletions.
6 changes: 3 additions & 3 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use futures::StreamExt;
use streamstore::{
batching::AppendRecordsBatchingStream,
client::{Client, ClientConfig, ClientError, HostEndpoints},
client::{Client, ClientConfig, ClientError, S2Endpoints},
types::{
AppendInput, AppendRecord, AppendRecordBatch, BasinName, CreateBasinRequest,
CreateStreamRequest, DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest,
Expand All @@ -15,10 +15,10 @@ use streamstore::{
async fn main() {
let token = std::env::var("S2_AUTH_TOKEN").unwrap();

let host_endpoints = HostEndpoints::from_env().unwrap();
let endpoints = S2Endpoints::from_env().unwrap();

let config = ClientConfig::new(token)
.with_host_endpoints(host_endpoints)
.with_endpoints(endpoints)
.with_request_timeout(Duration::from_secs(10));

println!("Connecting with {config:#?}");
Expand Down
86 changes: 38 additions & 48 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ use crate::{

const DEFAULT_HTTP_CONNECTOR: Option<HttpConnector> = None;

/// Cloud deployment to be used to connect the client with.
/// S2 cloud environment to connect with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HostCloud {
/// S2 hosted on AWS.
/// S2 running on AWS.
#[default]
Aws,
}
Expand Down Expand Up @@ -163,26 +163,26 @@ impl ParseError {
}
}

/// Endpoints for the hosted S2 environment.
/// Endpoints for the S2 environment.
#[derive(Debug, Clone)]
pub struct HostEndpoints {
pub struct S2Endpoints {
cell: Authority,
basin_zone: Option<Authority>,
}

impl From<HostCloud> for HostEndpoints {
impl From<HostCloud> for S2Endpoints {
fn from(cloud: HostCloud) -> Self {
HostEndpoints::for_cloud(cloud)
S2Endpoints::for_cloud(cloud)
}
}

impl Default for HostEndpoints {
impl Default for S2Endpoints {
fn default() -> Self {
Self::for_cloud(HostCloud::default())
}
}

impl HostEndpoints {
impl S2Endpoints {
pub fn for_cloud(cloud: HostCloud) -> Self {
Self::from_parts(cloud, HostEnv::default(), None, None)
}
Expand Down Expand Up @@ -244,35 +244,26 @@ impl HostEndpoints {
}
}

/// Client configuration to be used to connect with the host.
/// Client configuration.
#[derive(Debug, Clone)]
pub struct ClientConfig {
/// Auth token for the client.
pub token: SecretString,
/// Host URI to connect with.
pub host_endpoints: HostEndpoints,
/// Timeout for connecting/reconnecting.
pub connection_timeout: Duration,
/// Timeout for a particular request.
pub request_timeout: Duration,
/// User agent to be used for the client.
pub user_agent: HeaderValue,
/// URI scheme to use to connect.
pub(crate) token: SecretString,
pub(crate) endpoints: S2Endpoints,
pub(crate) connection_timeout: Duration,
pub(crate) request_timeout: Duration,
pub(crate) user_agent: HeaderValue,
#[cfg(feature = "connector")]
pub uri_scheme: http::uri::Scheme,
/// Backoff duration for retries.
pub retry_backoff_duration: Duration,
/// Maximum number of retries.
pub max_attempts: usize,
pub(crate) uri_scheme: http::uri::Scheme,
pub(crate) retry_backoff_duration: Duration,
pub(crate) max_attempts: usize,
}

impl ClientConfig {
/// Construct a new client configuration with given auth token and other
/// defaults.
/// Initialize a default client configuration with the specified authentication token.
pub fn new(token: impl Into<String>) -> Self {
Self {
token: token.into().into(),
host_endpoints: HostEndpoints::default(),
endpoints: S2Endpoints::default(),
connection_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
Expand All @@ -283,40 +274,39 @@ impl ClientConfig {
}
}

/// Construct from an existing configuration with the new host URIs.
pub fn with_host_endpoints(self, host_endpoints: impl Into<HostEndpoints>) -> Self {
/// S2 endpoints to connect to.
pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
Self {
host_endpoints: host_endpoints.into(),
endpoints: host_endpoints.into(),
..self
}
}

/// Construct from an existing configuration with the new connection
/// timeout.
/// Timeout for connecting and transparently reconnecting. Defaults to 3s.
pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
Self {
connection_timeout: connection_timeout.into(),
..self
}
}

/// Construct from an existing configuration with the new request timeout.
/// Timeout for a particular request. Defaults to 5s.
pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
Self {
request_timeout: request_timeout.into(),
..self
}
}

/// Construct from an existing configuration with the new user agent.
/// User agent. Defaults to `s2-sdk-rust`. Feel free to say hi.
pub fn with_user_agent(self, user_agent: impl Into<HeaderValue>) -> Self {
Self {
user_agent: user_agent.into(),
..self
}
}

/// Construct from an existing configuration with the new URI scheme.
/// URI scheme to use when connecting with a custom connector. Defaults to `https`.
#[cfg(feature = "connector")]
pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
Self {
Expand All @@ -325,15 +315,19 @@ impl ClientConfig {
}
}

/// Construct from an existing configuration with the retry backoff duration.
/// Backoff duration when retrying.
/// Defaults to 100ms.
/// A jitter is always applied.
pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
Self {
retry_backoff_duration: retry_backoff_duration.into(),
..self
}
}

/// Construct from an existing configuration with maximum number of retries.
/// Maximum number of attempts per request.
/// Setting it to 1 disables retrying.
/// The default is to make 3 attempts.
pub fn max_attempts(self, max_attempts: usize) -> Self {
assert!(max_attempts > 0, "max attempts must be greater than 0");
Self {
Expand All @@ -351,14 +345,13 @@ pub enum ClientError {
Service(#[from] tonic::Status),
}

/// The S2 client to interact with the API.
/// Client for account-level operations.
#[derive(Debug, Clone)]
pub struct Client {
inner: ClientInner,
}

impl Client {
/// Create the client to connect with the S2 API.
pub fn new(config: ClientConfig) -> Self {
Self {
inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR),
Expand All @@ -378,7 +371,6 @@ impl Client {
}
}

/// Get the client to interact with the S2 basin service API.
pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
BasinClient {
inner: self.inner.new_basin(basin),
Expand Down Expand Up @@ -448,14 +440,13 @@ impl Client {
}
}

/// Client to interact with the S2 basin service API.
/// Client for basin-level operations.
#[derive(Debug, Clone)]
pub struct BasinClient {
inner: ClientInner,
}

impl BasinClient {
/// Create the client to connect with the S2 basin service API.
pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
Client::new(config).basin_client(basin)
}
Expand Down Expand Up @@ -543,15 +534,14 @@ impl BasinClient {
}
}

/// Client to interact with the S2 stream service API.
/// Client for stream-level operations.
#[derive(Debug, Clone)]
pub struct StreamClient {
inner: ClientInner,
stream: String,
}

impl StreamClient {
/// Create the client to connect with the S2 stream service API.
pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
BasinClient::new(config, basin).stream_client(stream)
}
Expand Down Expand Up @@ -660,12 +650,12 @@ impl ClientInner {
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let cell_endpoint = config.host_endpoints.cell.clone();
let cell_endpoint = config.endpoints.cell.clone();
Self::new(config, cell_endpoint, connector)
}

fn new_basin(&self, basin: types::BasinName) -> Self {
match self.config.host_endpoints.basin_zone.clone() {
match self.config.endpoints.basin_zone.clone() {
Some(endpoint) => {
let basin_endpoint: Authority = format!("{basin}.{endpoint}")
.parse()
Expand Down Expand Up @@ -708,7 +698,7 @@ impl ClientInner {

let channel = if let Some(connector) = connector {
assert!(
config.host_endpoints.basin_zone.is_none(),
config.endpoints.basin_zone.is_none(),
"cannot connect with connector if basin zone is provided"
);
endpoint.connect_with_connector_lazy(connector)
Expand Down

0 comments on commit dfaea7e

Please sign in to comment.