Skip to content

Commit

Permalink
feat(da-indexer): update celestia integration (#1132)
Browse files Browse the repository at this point in the history
* feat: update celestia integration

* fix: format

* fix: refactor code

* Update da-indexer/da-indexer-logic/src/celestia/parser.rs

Co-authored-by: Lev Lymarenko <[email protected]>

---------

Co-authored-by: Lev Lymarenko <[email protected]>
  • Loading branch information
AllFi and sevenzing authored Dec 3, 2024
1 parent ec1c755 commit 8d701b7
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 254 deletions.
414 changes: 305 additions & 109 deletions da-indexer/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions da-indexer/da-indexer-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ sea-orm = { version = "0.12.2", features = [
"postgres-array",
] }

celestia-rpc = "0.1.1"
celestia-types = "0.1.1"
celestia-rpc = "0.7.0"
celestia-types = "0.7.0"
tokio = { version = "1", features = ["full"] }
hex = "0.4.3"
lazy_static = "1.4.0"
sha3 = "0.10.8"
futures = "0.3"
jsonrpsee = { version = "0.20", features = ["client-core", "macros"] }
jsonrpsee = { version = "0.24.7", features = ["client-core", "macros"] }
serde = "1.0"
serde_with = "3.6.1"
serde_json = "1.0.96"
async-trait = "0.1"
http = "0.2.9"
http = "1.1.0"
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
prost = "0.10"
ethabi = "18.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use celestia_rpc::{Error, Result};
use http::{header, HeaderValue};
use jsonrpsee::{
http_client::{HeaderMap, HttpClientBuilder},
ws_client::WsClientBuilder,
};
pub mod share;

use celestia_rpc::{Client, Error, Result};
use http::{header, HeaderMap, HeaderValue};
use jsonrpsee::{http_client::HttpClientBuilder, ws_client::WsClientBuilder};

/// The maximum request size in the default client in celestia_rpc is not sufficient for some blocks,
/// therefore, we need to customize client initialization
Expand All @@ -12,7 +11,7 @@ pub async fn new_celestia_client(
auth_token: Option<&str>,
max_request_size: u32,
max_response_size: u32,
) -> Result<celestia_rpc::Client> {
) -> Result<Client> {
let mut headers = HeaderMap::new();

if let Some(token) = auth_token {
Expand Down
45 changes: 45 additions & 0 deletions da-indexer/da-indexer-logic/src/celestia/client/share.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::future::Future;

use celestia_types::{AppVersion, ExtendedDataSquare};
use jsonrpsee::core::client::{ClientT, Error};

// celestia_rpc::Client doesn't support new version of share.GetEDS method
// so we need to implement it manually
mod rpc {
use celestia_types::eds::RawExtendedDataSquare;
use jsonrpsee::proc_macros::rpc;

#[rpc(client)]
pub trait Share {
#[method(name = "share.GetEDS")]
async fn share_get_eds(&self, height: u64) -> Result<RawExtendedDataSquare, client::Error>;
}
}

pub trait ShareClient: ClientT {
/// GetEDS gets the full EDS identified by the given root.
fn share_get_eds<'a, 'b, 'fut>(
&'a self,
height: u64,
app_version: u64,
) -> impl Future<Output = Result<ExtendedDataSquare, Error>> + Send + 'fut
where
'a: 'fut,
'b: 'fut,
Self: Sized + Sync + 'fut,
{
async move {
let app_version = AppVersion::from_u64(app_version).ok_or_else(|| {
let e = format!("Invalid or unsupported AppVersion: {app_version}");
Error::Custom(e)
})?;

let raw_eds = rpc::ShareClient::share_get_eds(self, height).await?;

ExtendedDataSquare::from_raw(raw_eds, app_version)
.map_err(|e| Error::Custom(e.to_string()))
}
}
}

impl<T> ShareClient for T where T: ClientT {}
30 changes: 20 additions & 10 deletions da-indexer/da-indexer-logic/src/celestia/da.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use crate::{
celestia::{client, repository::blobs},
indexer::{Job, DA},
};
use anyhow::Result;
use async_trait::async_trait;
use celestia_rpc::{Client, HeaderClient, ShareClient};
use celestia_rpc::{Client, HeaderClient};
use celestia_types::{Blob, ExtendedHeader};
use sea_orm::{DatabaseConnection, TransactionTrait};
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};

use crate::{
celestia::{repository::blobs, rpc_client},
indexer::{Job, DA},
use super::{
client::share::ShareClient, job::CelestiaJob, parser, repository::blocks,
settings::IndexerSettings,
};

use super::{job::CelestiaJob, parser, repository::blocks, settings::IndexerSettings};

pub struct CelestiaDA {
client: Client,
db: Arc<DatabaseConnection>,
Expand All @@ -25,7 +27,7 @@ pub struct CelestiaDA {

impl CelestiaDA {
pub async fn new(db: Arc<DatabaseConnection>, settings: IndexerSettings) -> Result<Self> {
let client = rpc_client::new_celestia_client(
let client = client::new_celestia_client(
&settings.rpc.url,
settings.rpc.auth_token.as_deref(),
settings.rpc.max_request_size,
Expand Down Expand Up @@ -54,11 +56,14 @@ impl CelestiaDA {

async fn get_blobs_by_height(&self, height: u64) -> Result<(ExtendedHeader, Vec<Blob>)> {
let header = self.client.header_get_by_height(height).await?;
let mut blobs = vec![];

let mut blobs = vec![];
if parser::maybe_contains_blobs(&header.dah) {
let eds = self.client.share_get_eds(&header).await?;
blobs = parser::parse_eds(&eds, header.dah.square_len())?;
let eds = self
.client
.share_get_eds(height, header.header.version.app)
.await?;
blobs = parser::parse_eds(&eds, header.header.version.app)?;
}

Ok((header, blobs))
Expand Down Expand Up @@ -103,6 +108,11 @@ impl DA for CelestiaDA {
let height = self.client.header_local_head().await?.header.height.value();
tracing::info!(height, "latest block");

if height <= self.last_known_height.load(Ordering::Acquire) {
tracing::info!("latest block is below last known height, skipping...");
return Ok(vec![]);
}

let from = self.last_known_height.swap(height, Ordering::AcqRel) + 1;
Ok((from..=height)
.map(|height| Job::Celestia(CelestiaJob { height }))
Expand Down
2 changes: 1 addition & 1 deletion da-indexer/da-indexer-logic/src/celestia/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod client;
pub mod da;
pub mod job;
pub mod l2_router;
mod parser;
pub mod repository;
mod rpc_client;
pub mod settings;
#[cfg(test)]
pub mod tests;
83 changes: 10 additions & 73 deletions da-indexer/da-indexer-logic/src/celestia/parser.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{Error, Result};
use anyhow::Result;
use celestia_types::{
blob::Blob, consts::appconsts, nmt::Namespace, Commitment, DataAvailabilityHeader,
ExtendedDataSquare, Share,
blob::Blob, nmt::Namespace, AppVersion, DataAvailabilityHeader, ExtendedDataSquare,
};

lazy_static! {
Expand All @@ -17,81 +16,19 @@ lazy_static! {

/// Checks if the DataAvailabilityHeader might contain blobs.
pub fn maybe_contains_blobs(dah: &DataAvailabilityHeader) -> bool {
dah.row_roots.iter().any(|row| {
dah.row_roots().iter().any(|row| {
*PAY_FOR_BLOB_NAMESPACE >= row.min_namespace().into()
&& *PAY_FOR_BLOB_NAMESPACE <= row.max_namespace().into()
})
}

/// Extracts blobs from the ExtendedDataSquare.
/// The format described here: https://github.com/celestiaorg/celestia-app/blob/main/specs/src/specs/shares.md
pub fn parse_eds(eds: &ExtendedDataSquare, width: usize) -> Result<Vec<Blob>> {
// sanity check
if width * width != eds.data_square.len() {
return Err(Error::msg("data square length mismatch"));
}
pub fn parse_eds(eds: &ExtendedDataSquare, app_version: u64) -> Result<Vec<Blob>> {
let app_version = AppVersion::from_u64(app_version)
.ok_or_else(|| anyhow::anyhow!("invalid or unsupported app_version: {app_version}"))?;

let mut blobs: Vec<Blob> = vec![];
let mut sequence_length = 0;
let mut parsed_length = 0;

for row in eds.data_square.chunks(width).take(width / 2) {
for share in row.iter().take(width / 2) {
let share = Share::from_raw(share)?;
let ns = share.namespace();

if ns == *TAIL_PADDING_NAMESPACE {
break;
}

if ns.is_reserved_on_celestia() {
continue;
}

let info_byte = share.info_byte();

let mut share_data;
if info_byte.is_sequence_start() {
assert!(parsed_length == sequence_length);

sequence_length = share.sequence_length().unwrap() as usize;
parsed_length = 0;

if sequence_length == 0
&& blobs.last().is_some()
&& blobs.last().unwrap().namespace == ns
{
// Namespace Padding Share, should be ignored
continue;
}

blobs.push(Blob {
namespace: ns,
data: vec![0; sequence_length],
share_version: info_byte.version(),
commitment: Commitment([0; 32]),
});

// first share: skip info byte and sequence length
share_data = &share.data()[1 + appconsts::SEQUENCE_LEN_BYTES..];
} else {
// continuation share: skip info byte
share_data = &share.data()[1..];
}

let data_length = share_data.len().min(sequence_length - parsed_length);
share_data = &share_data[..data_length];

let last_blob = blobs.last_mut().unwrap();
last_blob.data[parsed_length..(parsed_length + data_length)]
.copy_from_slice(share_data);
parsed_length += data_length;

if parsed_length == sequence_length {
last_blob.commitment =
Commitment::from_blob(ns, info_byte.version(), &last_blob.data)?;
}
}
}
Ok(blobs)
Blob::reconstruct_all(eds.data_square(), app_version).map_err(|err| {
tracing::error!(err = ?err, "failed to parse EDS");
anyhow::anyhow!(err)
})
}
14 changes: 12 additions & 2 deletions da-indexer/da-indexer-logic/src/celestia/tests/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use celestia_types::{nmt::Namespace, Blob as CelestiaBlob, Commitment};
use celestia_types::{
consts::appconsts::subtree_root_threshold, nmt::Namespace, AppVersion, Blob as CelestiaBlob,
Commitment,
};

use crate::celestia::{
repository::{blobs, blocks},
Expand Down Expand Up @@ -49,12 +52,19 @@ fn celestia_blob(seed: u32) -> CelestiaBlob {
Namespace::new(0, &[&[0_u8; 18], &sha3("namespace", seed)[..10]].concat()).unwrap();
let data = sha3("data", seed).to_vec();
let share_version = 0;
let commitment = Commitment::from_blob(namespace, share_version, &data).unwrap();
let commitment = Commitment::from_blob(
namespace,
&data,
share_version,
subtree_root_threshold(AppVersion::latest()),
)
.unwrap();
CelestiaBlob {
namespace,
data,
share_version,
commitment,
index: None,
}
}

Expand Down

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion da-indexer/da-indexer-logic/src/celestia/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod blobs;
pub mod blocks;
pub mod l2_router;
pub mod parser;

use blockscout_service_launcher::test_database::TestDbGuard;

Expand Down
45 changes: 0 additions & 45 deletions da-indexer/da-indexer-logic/src/celestia/tests/parser.rs

This file was deleted.

0 comments on commit 8d701b7

Please sign in to comment.