diff --git a/crates/rooch-config/src/da_config.rs b/crates/rooch-config/src/da_config.rs index 35ff1003de..763ea8cd30 100644 --- a/crates/rooch-config/src/da_config.rs +++ b/crates/rooch-config/src/da_config.rs @@ -71,8 +71,10 @@ pub enum OpenDAScheme { // access_key_id // secret_access_key S3, - // Avail App Light Client, main config: - // endpoint + // Avail Fusion(TurboDA & Light Client), + // turbo_endpoint + // turbo_auth_token + // light_endpoint Avail, // Celestia, main config: // endpoint @@ -262,12 +264,12 @@ pub enum DABackendConfigType { #[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] #[serde(deny_unknown_fields)] -/// Open DA provides ability to access various storage services +/// Open DA provides ability to access various backends pub struct DABackendOpenDAConfig { - /// specifies the type of storage service to be used. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc + /// specifies the type of backend to be used. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc #[serde(default)] pub scheme: OpenDAScheme, - /// specifies the configuration of the storage service. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc. + /// specifies the configuration of the backend. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc. pub config: HashMap, #[serde(skip_serializing_if = "Option::is_none")] /// for fs backend: diff --git a/crates/rooch-da/src/backend/openda/avail.rs b/crates/rooch-da/src/backend/openda/avail.rs index 533e0338ed..73d43005fe 100644 --- a/crates/rooch-da/src/backend/openda/avail.rs +++ b/crates/rooch-da/src/backend/openda/avail.rs @@ -10,34 +10,259 @@ use reqwest::{Client, StatusCode}; use rooch_types::da::segment::SegmentID; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::collections::HashMap; use tokio::time::{sleep, Duration}; // small blob size for transaction to get included in a block quickly pub(crate) const DEFAULT_AVAIL_MAX_SEGMENT_SIZE: u64 = 256 * 1024; -const BACK_OFF_MIN_DELAY: Duration = Duration::from_millis(3000); + +const MIN_BACKOFF_DELAY: Duration = Duration::from_millis(3000); const SUBMIT_API_PATH: &str = "v2/submit"; +// TurboDA provides relay service, +// so the delay is shorter. +// default retry duration(seconds): 0.5, 1, 2, 4, 8, 10 +const MIN_BACKOFF_DELAY_TURBO: Duration = Duration::from_millis(500); +const MAX_BACKOFF_DELAY_TURBO: Duration = Duration::from_secs(10); +const TURBO_SUBMIT_API_PATH: &str = "user/submit_raw_data"; + +/// Avail client: A turbo and Light +/// Turbo client has higher priority, if not available, use the Light client +#[derive(Clone)] +pub struct AvailFusionClient { + turbo_client: Option, + light_client: Option, +} + +#[async_trait] +impl Operator for AvailFusionClient { + async fn submit_segment( + &self, + segment_id: SegmentID, + segment_bytes: Vec, + prefix: Option, + ) -> anyhow::Result<()> { + // Fallback to light_client if turbo_client is not available + if let Some(turbo_client) = &self.turbo_client { + let turbo_result = turbo_client + .submit_segment(segment_id, segment_bytes.clone(), prefix.clone()) + .await; + + if let Err(error) = turbo_result { + tracing::warn!( + "Failed to submit segment to Avail Turbo: {}, trying light_client if available", + error + ); + + if let Some(light_client) = &self.light_client { + return light_client + .submit_segment(segment_id, segment_bytes, prefix) + .await; + } else { + return Err(anyhow!("Light client is not available")); + } + } + + turbo_result + } else if let Some(light_client) = &self.light_client { + light_client + .submit_segment(segment_id, segment_bytes, prefix) + .await + } else { + Err(anyhow!("Both turbo and light clients are not available")) + } + } +} + +pub struct AvailFusionClientConfig { + pub turbo_endpoint: Option, + pub turbo_auth_token: Option, + pub light_endpoint: Option, + pub max_retries: usize, +} + +impl AvailFusionClientConfig { + pub fn from_scheme_config( + scheme_config: HashMap, + max_retries: usize, + ) -> anyhow::Result { + let turbo_endpoint = scheme_config.get("turbo_endpoint").cloned(); + let turbo_auth_token = scheme_config.get("turbo_auth_token").cloned(); + let light_endpoint = scheme_config.get("light_endpoint").cloned(); + + if turbo_endpoint.is_none() && light_endpoint.is_none() { + return Err(anyhow!("turbo_endpoint or light_endpoint must be provided")); + } + if turbo_endpoint.is_some() && turbo_auth_token.is_none() { + return Err(anyhow!("turbo_auth_token must be provided")); + } + + Ok(AvailFusionClientConfig { + turbo_endpoint, + turbo_auth_token, + light_endpoint, + max_retries, + }) + } + + pub fn build_client(&self) -> anyhow::Result { + let turbo_client = if let Some(endpoint) = &self.turbo_endpoint { + Some(AvailTurboClient::new( + endpoint, + self.max_retries, + self.turbo_auth_token.as_ref().unwrap(), + )?) + } else { + None + }; + let light_client = if let Some(endpoint) = &self.light_endpoint { + Some(AvailLightClient::new(endpoint, self.max_retries)?) + } else { + None + }; + + Ok(AvailFusionClient { + turbo_client, + light_client, + }) + } +} + +#[derive(Clone)] +pub(crate) struct AvailTurboClient { + endpoint: String, + http_client: Client, + max_retries: usize, + auth_token: String, +} + +impl AvailTurboClient { + pub(crate) fn new( + endpoint: &str, + max_retries: usize, + auth_token: &str, + ) -> anyhow::Result { + let client = Client::new(); + + Ok(AvailTurboClient { + endpoint: endpoint.to_string(), + http_client: client, + max_retries, + auth_token: auth_token.to_string(), + }) + } + + async fn handle_success( + segment_id: SegmentID, + response: reqwest::Response, + ) -> anyhow::Result<()> { + match response.json::().await { + Ok(submit_response) => { + tracing::info!( + "Submitted segment: {} to Avail Turbo, submission_id: {}", + segment_id, + submit_response.submission_id, + ); + Ok(()) + } + Err(json_error) => Err(anyhow!( + "Failed to parse response JSON for segment {:?}: {:?}", + segment_id, + json_error, + )), + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AvailTurboClientSubmitResponse { + submission_id: String, +} + +#[async_trait] +impl Operator for AvailTurboClient { + async fn submit_segment( + &self, + segment_id: SegmentID, + segment_bytes: Vec, + _prefix: Option, + ) -> anyhow::Result<()> { + let submit_url = format!("{}/{}", self.endpoint, TURBO_SUBMIT_API_PATH); + let max_attempts = self.max_retries + 1; // max_attempts = max_retries + first attempt + let mut attempts = 0; + let mut retry_delay = MIN_BACKOFF_DELAY_TURBO; + + // token for turbo submit, + // will support more tokens in the future + const TOKEN: &str = "avail"; + + loop { + attempts += 1; + let request = self + .http_client + .post(&submit_url) + .query(&[("token", TOKEN.to_string())]) + .bearer_auth(&self.auth_token) + .header("Content-Type", "application/json") + .body(segment_bytes.clone()); + + let response = request.send().await?; + + if response.status().is_success() { + return AvailTurboClient::handle_success(segment_id, response).await; + } + + if response.status().is_server_error() { + if attempts < max_attempts { + tracing::warn!( + "Failed to submit segment: {:?} to Avail Turbo: {}, attempts: {},retrying after {}ms", + segment_id, + response.status(), + attempts, + retry_delay.as_millis(), + ); + sleep(retry_delay).await; + retry_delay = std::cmp::min(retry_delay * 2, MAX_BACKOFF_DELAY_TURBO); + continue; + } + + return Err(anyhow!( + "Failed to submit segment: {:?} to Avail Turbo: {} after {} attempts", + segment_id, + response.status(), + attempts, + )); + } + return Err(anyhow!( + "Failed to submit segment: {:?} to Avail Turbo: {}", + segment_id, + response.status(), + )); + } + } +} + #[derive(Clone)] -pub(crate) struct AvailClient { +pub struct AvailLightClient { endpoint: String, - client: Client, + http_client: Client, max_retries: usize, } -impl AvailClient { - pub(crate) fn new(endpoint: &str, max_retries: usize) -> anyhow::Result { +impl AvailLightClient { + pub fn new(endpoint: &str, max_retries: usize) -> anyhow::Result { let client = Client::new(); - Ok(AvailClient { + Ok(AvailLightClient { endpoint: endpoint.to_string(), - client, + http_client: client, max_retries, }) } } #[derive(Debug, Serialize, Deserialize)] -pub struct AvailSubmitResponse { +pub struct AvailLightClientSubmitResponse { block_number: u32, block_hash: String, hash: String, @@ -45,7 +270,7 @@ pub struct AvailSubmitResponse { } #[async_trait] -impl Operator for AvailClient { +impl Operator for AvailLightClient { async fn submit_segment( &self, segment_id: SegmentID, @@ -56,12 +281,12 @@ impl Operator for AvailClient { let data = general_purpose::STANDARD.encode(&segment_bytes); let max_attempts = self.max_retries + 1; // max_attempts = max_retries + first attempt let mut attempts = 0; - let mut retry_delay = BACK_OFF_MIN_DELAY; + let mut retry_delay = MIN_BACKOFF_DELAY; loop { attempts += 1; let response = self - .client + .http_client .post(&submit_url) .header("Content-Type", "application/json") .body(json!({ "data": data }).to_string()) @@ -69,7 +294,7 @@ impl Operator for AvailClient { .await?; match response.status() { StatusCode::OK => { - let submit_response: AvailSubmitResponse = response.json().await?; + let submit_response: AvailLightClientSubmitResponse = response.json().await?; tracing::info!( "Submitted segment: {} to Avail, block_number: {}, block_hash: {}, hash: {}, index: {}", segment_id, diff --git a/crates/rooch-da/src/backend/openda/backend.rs b/crates/rooch-da/src/backend/openda/backend.rs index a4e3bb750b..108f8c0082 100644 --- a/crates/rooch-da/src/backend/openda/backend.rs +++ b/crates/rooch-da/src/backend/openda/backend.rs @@ -1,7 +1,7 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::backend::openda::avail::AvailClient; +use crate::backend::openda::avail::AvailFusionClientConfig; use crate::backend::openda::celestia::{CelestiaClient, WrappedNamespace}; use crate::backend::openda::opendal::BACK_OFF_MIN_DELAY; use crate::backend::openda::operator::{Operator, OperatorConfig}; @@ -31,9 +31,9 @@ impl OpenDABackend { cfg: &DABackendOpenDAConfig, genesis_namespace: String, ) -> anyhow::Result { - let (operator_config, map_config) = + let (operator_config, scheme_config) = OperatorConfig::from_backend_config(cfg.clone(), genesis_namespace)?; - let operator = new_operator(operator_config.clone(), map_config).await?; + let operator = new_operator(operator_config.clone(), scheme_config).await?; Ok(Self { operator_config, @@ -82,27 +82,32 @@ impl OpenDABackend { async fn new_operator( operator_config: OperatorConfig, - config: HashMap, + scheme_config: HashMap, ) -> anyhow::Result> { let max_retries = operator_config.max_retries; let scheme = operator_config.scheme.clone(); let operator: Box = match scheme { - OpenDAScheme::Avail => Box::new(AvailClient::new(&config["endpoint"], max_retries)?), + OpenDAScheme::Avail => { + let avail_fusion_config = + AvailFusionClientConfig::from_scheme_config(scheme_config, max_retries)?; + let avail_fusion_client = avail_fusion_config.build_client()?; + Box::new(avail_fusion_client) + } OpenDAScheme::Celestia => { let namespace = WrappedNamespace::from_string(&operator_config.namespace.clone())?; Box::new( CelestiaClient::new( namespace.into_inner(), - &config["endpoint"], - config.get("auth_token").map(|s| s.as_str()), + &scheme_config["endpoint"], + scheme_config.get("auth_token").map(|s| s.as_str()), max_retries, ) .await?, ) } _ => { - let mut op = opendal::Operator::via_iter(Scheme::from(scheme), config)?; + let mut op = opendal::Operator::via_iter(Scheme::from(scheme), scheme_config)?; op = op .layer( RetryLayer::new() diff --git a/crates/rooch-da/src/backend/openda/operator.rs b/crates/rooch-da/src/backend/openda/operator.rs index c9161495d3..1792687205 100644 --- a/crates/rooch-da/src/backend/openda/operator.rs +++ b/crates/rooch-da/src/backend/openda/operator.rs @@ -48,8 +48,8 @@ impl OperatorConfig { )); } let namespace = backend_config.namespace.unwrap_or(genesis_namespace); - let mut map_config = backend_config.config; - check_map_config(scheme.clone(), &mut map_config)?; + let mut scheme_config = backend_config.config; + check_map_config(scheme.clone(), &mut scheme_config)?; let default_max_segment_size = match scheme { OpenDAScheme::Avail => DEFAULT_AVAIL_MAX_SEGMENT_SIZE, @@ -66,7 +66,7 @@ impl OperatorConfig { max_segment_size, max_retries, }, - map_config, + scheme_config, )) } } diff --git a/crates/rooch-types/src/da/segment.rs b/crates/rooch-types/src/da/segment.rs index d570a2bab6..1b34d10473 100644 --- a/crates/rooch-types/src/da/segment.rs +++ b/crates/rooch-types/src/da/segment.rs @@ -105,7 +105,7 @@ impl Segment for SegmentV0 { } fn get_id(&self) -> SegmentID { - self.id.clone() + self.id } fn get_data(&self) -> Vec { @@ -134,7 +134,7 @@ pub fn segment_from_bytes(bytes: &[u8]) -> anyhow::Result> { } } -#[derive(Serialize, Debug, PartialEq, Clone)] +#[derive(Serialize, Debug, PartialEq, Clone, Copy)] pub struct SegmentID { // chunk id represents the sequential order of extents within a stream, commencing from 0 and incrementing successively. pub chunk_id: u128,