From 74819953d53b253e7553da0ab27300f99dba228d Mon Sep 17 00:00:00 2001
From: Mehul Arora <mehul_arora@live.com>
Date: Sat, 28 Sep 2024 19:31:08 -0400
Subject: [PATCH] fmly

---
 Cargo.lock   | 30 ++++++++++++++++++------
 Cargo.toml   |  4 +++-
 src/error.rs |  9 ++++++++
 src/main.rs  | 64 ++++++++++++++++++++++++++++++++++++++++++++++++----
 4 files changed, 95 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8abf64d..51bc708 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -128,9 +128,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
 
 [[package]]
 name = "axum"
-version = "0.7.6"
+version = "0.7.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f43644eed690f5374f1af436ecd6aea01cd201f6fbdf0178adaf6907afb2cec"
+checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae"
 dependencies = [
  "async-trait",
  "axum-core",
@@ -155,9 +155,9 @@ dependencies = [
 
 [[package]]
 name = "axum-core"
-version = "0.4.4"
+version = "0.4.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5e6b8ba012a258d63c9adfa28b9ddcf66149da6f986c5b5452e629d5ee64bf00"
+checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
 dependencies = [
  "async-trait",
  "bytes",
@@ -792,6 +792,18 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "json_dotpath"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbdcfef3cf5591f0cef62da413ae795e3d1f5a00936ccec0b2071499a32efd1a"
+dependencies = [
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "thiserror",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.5.0"
@@ -1358,13 +1370,15 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
 [[package]]
 name = "s2"
 version = "0.1.0"
-source = "git+ssh://git@github.com/s2-streamstore/s2.rs.git?branch=main#153aa1b2d7e9eb328ce4ae50a1eeb972a780fdc9"
 dependencies = [
  "backon",
  "http",
  "prost",
  "prost-types",
  "secrecy",
+ "serde",
+ "serde_derive",
+ "serde_json",
  "thiserror",
  "tonic",
  "tonic-build",
@@ -1381,9 +1395,11 @@ dependencies = [
  "config",
  "dirs",
  "humantime",
+ "json_dotpath",
  "miette",
  "s2",
  "serde",
+ "serde_json",
  "thiserror",
  "tokio",
  "toml",
@@ -1536,9 +1552,9 @@ checksum = "b7401a30af6cb5818bb64852270bb722533397edcfc7344954a38f420819ece2"
 
 [[package]]
 name = "syn"
-version = "2.0.77"
+version = "2.0.79"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed"
+checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/Cargo.toml b/Cargo.toml
index 3483ee1..23b8b6a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,10 +11,12 @@ dirs = "5.0.1"
 serde = { version = "1.0.210", features = ["derive"] }
 thiserror = "1.0.63"
 toml = "0.8.19"
-s2 = { git = "ssh://git@github.com/s2-streamstore/s2.rs.git", branch = "main" }
+s2 = { path = "../../s2.rs" }
 tokio = { version = "*", features = ["full"] }
 humantime = "2.1.0"
 miette = { version = "7.2.0", features = ["fancy"] }
 color-print = "0.3.6"
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+serde_json = "1.0.128"
+json_dotpath = "1.1.0"
diff --git a/src/error.rs b/src/error.rs
index 8ae177a..4945473 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -41,4 +41,13 @@ pub enum S2CliError {
     #[error(transparent)]
     #[diagnostic(help("{}", get_help()))]
     BasinService(#[from] BasinServiceError),
+
+    #[error(transparent)]
+    InvalidConfigSubPath(#[from] json_dotpath::Error),
+
+    #[error(transparent)]
+    InvalidConfig(#[from] serde_json::Error),
+
+    #[error("Path: {0} not found!")]
+    PathKeyNotFound(String),
 }
diff --git a/src/main.rs b/src/main.rs
index 58877b1..2b8a664 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,13 +1,17 @@
+use std::{error::Error, time::Duration};
+
 use account::AccountService;
 use basin::BasinService;
 use clap::{builder::styling, Parser, Subcommand};
 use colored::*;
 use config::{config_path, create_config};
 use error::S2CliError;
+use json_dotpath::DotPaths;
 use s2::{
     client::{Client, ClientConfig, HostCloud},
-    types::{BasinMetadata, StorageClass},
+    types::{BasinConfig, BasinMetadata, RetentionPolicy, StorageClass, StreamConfig},
 };
+use serde_json::Value;
 use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
 
 mod account;
@@ -115,8 +119,8 @@ enum AccountActions {
         basin: String,
 
         /// Configuration to apply.
-        #[arg(short, long)]
-        config: Vec<String>,
+        #[arg(short, long, value_parser = parse_key_val::<String, String>, num_args = 1..)]
+        config: Vec<(String, String)>,
     },
 }
 
@@ -154,6 +158,19 @@ async fn s2_client(auth_token: String) -> Result<Client, S2CliError> {
     Ok(Client::connect(config).await?)
 }
 
+fn parse_key_val<T, U>(s: &str) -> Result<(T, U), Box<dyn Error + Send + Sync + 'static>>
+where
+    T: std::str::FromStr,
+    T::Err: Error + Send + Sync + 'static,
+    U: std::str::FromStr,
+    U::Err: Error + Send + Sync + 'static,
+{
+    let pos = s
+        .find('=')
+        .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?;
+    Ok((s[..pos].parse()?, s[pos + 1..].parse()?))
+}
+
 #[tokio::main]
 async fn main() -> miette::Result<()> {
     run().await?;
@@ -237,7 +254,46 @@ async fn run() -> Result<(), S2CliError> {
                     println!("{:?}", basin_config);
                 }
                 AccountActions::ReconfigureBasin { basin, config } => {
-                    unimplemented!()
+                    // dummy basin config for full path matching
+                    let basin_config = BasinConfig::builder()
+                        .default_stream_config(Some(
+                            StreamConfig::builder()
+                                .storage_class(StorageClass::Unspecified)
+                                .retention_policy(RetentionPolicy::Age(Duration::from_secs(60)))
+                                .build(),
+                        ))
+                        .build();
+
+                    let mut json_config: Value = serde_json::to_value(basin_config)
+                        .expect("Failed to convert basin_config to Value");                    
+
+                    for (key, value) in config {
+                        match value.as_str() {
+                            "null" => {
+                                json_config.dot_remove(&key)?;
+                            }
+                            _ => {
+                                let parsed_value = match humantime::parse_duration(&value) {
+                                    Ok(duration) => serde_json::json!({
+                                        "secs": duration.as_secs(),
+                                        "nanos": duration.subsec_nanos()
+                                    }),
+                                    Err(_) => Value::String(value.clone()),
+                                };
+
+                                match json_config.dot_has_checked(&key) {
+                                    Ok(true) => {
+                                        json_config.dot_set(&key, parsed_value)?;
+                                    }
+                                    _ => {
+                                        Err(S2CliError::PathKeyNotFound(key.clone()))?;
+                                    }
+                                }
+                            }
+                        }
+                    }                    
+
+                    let basin_config: BasinConfig = serde_json::from_value(json_config)?;                    
                 }
             }
         }