From b94d04e4f006bb9d0e977b1487a46231ee991659 Mon Sep 17 00:00:00 2001
From: popcnt1
Date: Thu, 19 Sep 2024 15:39:17 +0800
Subject: [PATCH 1/6] refactor(da): refine DA stream

Enhanced the DA server with chunk and segment handling, including lz4
compression. Improved the batch submission process, addressing batch data
integrity and submission logic.
---
 .github/workflows/deploy_mainnet.yml          |   8 +-
 Cargo.lock                                    |  20 ++-
 Cargo.toml                                    |   1 +
 crates/rooch-da/Cargo.toml                    |   1 +
 crates/rooch-da/docs/stream.md                |  32 +++++
 crates/rooch-da/docs/todo.md                  |  11 ++
 crates/rooch-da/src/chunk.rs                  | 115 ++++++++++++++++--
 crates/rooch-da/src/messages.rs               |  15 ++-
 crates/rooch-da/src/segment.rs                |  51 ++------
 .../src/server/celestia/actor/server.rs       |  48 +------
 .../src/server/openda/actor/server.rs         |  51 ++------
 crates/rooch-proposer/src/actor/proposer.rs   |   2 +-
 crates/rooch-proposer/src/scc/mod.rs          |   6 +-
 crates/rooch-types/src/block.rs               |   1 -
 moveos/smt/src/jellyfish_merkle/hash.rs       |   4 +-
 15 files changed, 210 insertions(+), 156 deletions(-)
 create mode 100644 crates/rooch-da/docs/stream.md
 create mode 100644 crates/rooch-da/docs/todo.md

diff --git a/.github/workflows/deploy_mainnet.yml b/.github/workflows/deploy_mainnet.yml
index 78dcef21cf..78e01b4295 100644
--- a/.github/workflows/deploy_mainnet.yml
+++ b/.github/workflows/deploy_mainnet.yml
@@ -3,9 +3,9 @@ on:
   workflow_dispatch:
     inputs:
       tagName:
-        description: 'Tag Name to Checkout'
+        description: 'Tag or branch to deploy'
+        default: 'main'
         required: true
-        default: 'latest'
 #  workflow_run:
 #    workflows: ["Build Docker And Deploy Seed"]
 #    types:
@@ -13,7 +13,7 @@
 jobs:
   deploy-rooch-mainnet:
-    name: deploy rooch mainnet
+    name: Deploy Rooch Mainnet
     runs-on: self-hosted
     steps:
       - name: Deploy to GCP MAINNET VM
@@ -26,5 +26,5 @@ jobs:
           chmod 600 private_key.pem
           sudo apt update
          sudo apt install -y --no-install-recommends openssh-server
-          ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && && docker image prune -a -f && docker ps | grep rooch | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep rooch | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' && docker run -d -v /data:/root -p 6767:6767 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' server start -n main --btc-rpc-url '${{secrets.BTC_MAIN_RPC_URL}}' --btc-rpc-username rooch-main --btc-rpc-password '${{secrets.BTC_MAIN_RPC_PWD}}' --btc-start-block-height 0 --btc-end-block-height 767420 --data-import-mode 1"
+          ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && docker image prune -a -f && docker ps | grep rooch | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep rooch | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' && docker run -d -v /data:/root -p 6767:6767 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' server start -n main --btc-rpc-url '${{secrets.BTC_MAIN_RPC_URL}}' --btc-rpc-username rooch-main --btc-rpc-password '${{secrets.BTC_MAIN_RPC_PWD}}'"

diff --git a/Cargo.lock b/Cargo.lock
index 635127e4b7..9ffea9a783 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1580,13 +1580,13 @@ dependencies = [

 [[package]]
 name = "cc"
-version = "1.0.98"
+version = "1.1.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f"
+checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0"
 dependencies = [
  "jobserver",
  "libc",
- "once_cell",
+ "shlex",
 ]

 [[package]]
@@ -5882,11 +5882,20 @@ dependencies = [
  "hashbrown 0.14.5",
 ]

+[[package]]
+name = "lz4"
+version = "1.27.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a231296ca742e418c43660cb68e082486ff2538e8db432bc818580f3965025ed"
+dependencies = [
+ "lz4-sys",
+]
+
 [[package]]
 name = "lz4-sys"
-version = "1.10.0"
+version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868"
+checksum = "fcb44a01837a858d47e5a630d2ccf304c8efcc4b83b8f9f75b7a9ee4fcc6e57d"
 dependencies = [
  "cc",
  "libc",
@@ -9352,6 +9361,7 @@ dependencies = [
  "coerce",
  "futures",
  "log",
+ "lz4",
 "moveos-types",
 "opendal",
 "rooch-config",

diff --git a/Cargo.toml b/Cargo.toml
index 6c3034621e..841ac911bf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -332,6 +332,7 @@ uuid = { version = "1.10.0", features = ["v4", "fast-rng"] }
 protobuf = { version = "2.28", features = ["with-bytes"] }
 redb = { version = "2.1.1" }
 rocksdb = { git = "https://github.com/rooch-network/rust-rocksdb.git", rev = "41d102327ba3cf9a2335d1192e8312c92bc3d6f9", features = ["lz4"] }
+lz4 = { version = "1.27.0" }
 ripemd = { version = "0.1.3" }
 fastcrypto-zkp = { version = "0.1.3" }

diff --git a/crates/rooch-da/Cargo.toml b/crates/rooch-da/Cargo.toml
index a821494b28..675d69c60a 100644
--- a/crates/rooch-da/Cargo.toml
+++ b/crates/rooch-da/Cargo.toml
@@ -30,3 +30,4 @@ opendal = { workspace = true }
 rooch-config = { workspace = true }
 serde_yaml = { workspace = true }
 xxhash-rust = { workspace = true, features = ["xxh3"] }
+lz4 = { workspace = true }

diff --git a/crates/rooch-da/docs/stream.md b/crates/rooch-da/docs/stream.md
new file mode 100644
index 0000000000..3fb0e184f4
--- /dev/null
+++ b/crates/rooch-da/docs/stream.md
@@ -0,0 +1,32 @@
+DA Stream
+====
+
+DA Stream is a continuous flow of data from the sequencer to verifiers. It is a sequence of DA batches.
+
+## Batch
+
+A batch is a collection of transactions. It is the unit of data flow in DA Stream.
+
+Each batch maps to an L2 block.
+
+## Chunk
+
+A chunk is a collection of DA batches, grouped for a better compression ratio.
+
+Components:
+
+- Chunk Header: version, chunk number, chunk checksum
+- Chunk Body: one or more DA batches after compression
+
+The chunk version determines:
+
+1. the chunk format: serialization/deserialization and the compression algorithm of the chunk body
+2. the segment format
+
+### Segment
+
+A segment consists of chunk bytes split at a fixed size.
+
+Segments are the unit submitted to the DA backend, sized to comply with the block size restrictions of the DA backend.
+
+The segment version is inherited from the chunk version.
\ No newline at end of file

diff --git a/crates/rooch-da/docs/todo.md b/crates/rooch-da/docs/todo.md
new file mode 100644
index 0000000000..15ea2860fb
--- /dev/null
+++ b/crates/rooch-da/docs/todo.md
@@ -0,0 +1,11 @@
+TODO
+===
+
+## Chunk Builder
+
+Chunk Builder is a component that builds chunks from batches, avoiding burst I/O to the DA backend:
+
+1. Persist the batch into a buffer (a local persistence layer or other faster media) first, then return ok (if high
+   performance is preferred).
+2. Split the chunk into segments and submit the segments to the DA backend asynchronously.
+3. Clean up the batch buffer on a schedule, after its segments have been submitted to the DA backend.
diff --git a/crates/rooch-da/src/chunk.rs b/crates/rooch-da/src/chunk.rs
index 249bcde8c8..b05a1c23ad 100644
--- a/crates/rooch-da/src/chunk.rs
+++ b/crates/rooch-da/src/chunk.rs
@@ -1,17 +1,106 @@
 // Copyright (c) RoochNetwork
 // SPDX-License-Identifier: Apache-2.0

-use moveos_types::h256::H256;
-use serde::Serialize;
-
-// DABatchV0 is the first version of the batch inside a chunk, each batch is a chunk
-#[derive(Serialize, Debug, PartialEq, Clone)]
-pub struct DABatchV0 {
-    pub version: u8,
-    // each batch maps to an L2 block
-    pub block_number: u128,
-    // sha3_256 hash of the batch data
-    pub batch_hash: H256,
-    // encoded tx list
-    pub data: Vec<u8>,
+use crate::messages::Batch;
+use crate::segment::{Segment, SegmentID, SegmentV0};
+use lz4::EncoderBuilder;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+pub enum ChunkVersion {
+    V0,
+    Unknown(u8),
+}
+
+impl From<u8> for ChunkVersion {
+    fn from(num: u8) -> Self {
+        match num {
+            0 => ChunkVersion::V0,
+            // ...
+            _ => Self::Unknown(num),
+        }
+    }
+}
+
+impl From<ChunkVersion> for u8 {
+    fn from(version: ChunkVersion) -> Self {
+        match version {
+            ChunkVersion::V0 => 0,
+            ChunkVersion::Unknown(num) => num,
+        }
+    }
+}
+
+pub trait Chunk {
+    fn to_bytes(&self) -> Vec<u8>;
+    fn get_version(&self) -> ChunkVersion;
+    fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>>;
+}
+
+// ChunkV0:
+// 1. each chunk maps to a batch
+// 2. batch_data is compressed by lz4
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
+pub struct ChunkV0 {
+    pub version: ChunkVersion,
+    pub batch: Batch,
+}
+
+impl From<Batch> for ChunkV0 {
+    fn from(batch: Batch) -> Self {
+        Self {
+            version: ChunkVersion::V0,
+            batch: Batch {
+                block_number: batch.block_number,
+                tx_count: batch.tx_count,
+                prev_tx_accumulator_root: batch.prev_tx_accumulator_root,
+                tx_accumulator_root: batch.tx_accumulator_root,
+                batch_hash: batch.batch_hash,
+                data: batch.data,
+            },
+        }
+    }
+}
+
+impl Chunk for ChunkV0 {
+    fn to_bytes(&self) -> Vec<u8> {
+        let mut compressed_bytes = Vec::new();
+
+        {
+            let mut encoder = EncoderBuilder::new().build(&mut compressed_bytes).unwrap();
+            bcs::serialize_into(&mut encoder, self).unwrap();
+            let (_output, result) = encoder.finish();
+            result.unwrap();
+        }
+
+        compressed_bytes
+    }
+
+    fn get_version(&self) -> ChunkVersion {
+        ChunkVersion::V0
+    }
+
+    fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>> {
+        let bytes = self.to_bytes();
+        let segments_data = bytes.chunks(max_segment_size);
+        let segments_count = segments_data.len();
+
+        let chunk_id = self.batch.block_number;
+        segments_data
+            .enumerate()
+            .map(|(i, data)| {
+                Box::new(SegmentV0 {
+                    id: SegmentID {
+                        chunk_id,
+                        segment_number: i as u64,
+                    },
+                    is_last: i == segments_count - 1, // extra info overhead is much smaller than max_block_size - max_segment_size
+                    // *_checksum will be filled in by the to_bytes method of Segment
+                    data_checksum: 0,
+                    checksum: 0,
+                    data: data.to_vec(),
+                }) as Box<dyn Segment>
+            })
+            .collect::<Vec<_>>()
+    }
 }

diff --git a/crates/rooch-da/src/messages.rs b/crates/rooch-da/src/messages.rs
index 6f31bee1b2..a4124d62ef 100644
--- a/crates/rooch-da/src/messages.rs
+++ b/crates/rooch-da/src/messages.rs
@@ -7,13 +7,20 @@ use serde::{Deserialize, Serialize};

 use moveos_types::h256::H256;

-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
 pub struct Batch {
-    // each batch maps to an L2 block
+    /// each batch maps to an L2 block
     pub block_number: u128,
-    // sha3_256 hash of the batch data
+    /// how many transactions are in the batch
+    pub tx_count: u64,
+    /// the previous tx accumulator root of the block
+    pub prev_tx_accumulator_root: H256,
+    /// the tx accumulator root after the last transaction is appended to the accumulator
+    pub tx_accumulator_root: H256,
+
+    // sha2_256 hash of the batch data
     pub batch_hash: H256,
-    // encoded tx list
+    // encoded tx (LedgerTransaction) list
     pub data: Vec<u8>,
 }
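The lz4/bcs framing used by `ChunkV0::to_bytes` above can be exercised on its own. A minimal sketch, assuming only the `lz4` and `anyhow` crates already in the workspace; the payload is a plain byte vector rather than a real `Batch`:

```rust
use std::io::Read;

fn lz4_roundtrip(payload: &[u8]) -> anyhow::Result<Vec<u8>> {
    // Compress, mirroring ChunkV0::to_bytes: an lz4 frame encoder over a Vec.
    let mut compressed = Vec::new();
    {
        let mut encoder = lz4::EncoderBuilder::new().build(&mut compressed)?;
        std::io::copy(&mut &payload[..], &mut encoder)?;
        let (_writer, result) = encoder.finish();
        result?;
    }
    // Decompress, mirroring the decode path added later in this series.
    let mut decoder = lz4::Decoder::new(&compressed[..])?;
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed)?;
    Ok(decompressed)
}
```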
diff --git a/crates/rooch-da/src/segment.rs b/crates/rooch-da/src/segment.rs
index ad5bde3e77..ac1e244505 100644
--- a/crates/rooch-da/src/segment.rs
+++ b/crates/rooch-da/src/segment.rs
@@ -4,38 +4,14 @@
 use std::fmt;
 use std::str::FromStr;

+use crate::chunk::ChunkVersion;
 use serde::Serialize;
 use xxhash_rust::xxh3::xxh3_64;

-#[derive(Debug, PartialEq, Eq)]
-pub enum SegmentVersion {
-    V0,
-    Unknown(u8),
-}
-
-impl From<u8> for SegmentVersion {
-    fn from(num: u8) -> Self {
-        match num {
-            0 => SegmentVersion::V0,
-            // ...
-            _ => Self::Unknown(num),
-        }
-    }
-}
-
-impl From<SegmentVersion> for u8 {
-    fn from(version: SegmentVersion) -> Self {
-        match version {
-            SegmentVersion::V0 => 0,
-            SegmentVersion::Unknown(num) => num,
-        }
-    }
-}
-
-// `Segment` is the basic unit of storage in DA server.
+// A segment is the unit submitted to the DA backend, sized to comply with the block size restrictions of the DA backend.
 pub trait Segment: Send {
     fn to_bytes(&self) -> Vec<u8>;
-    fn get_version(&self) -> SegmentVersion;
+    fn get_version(&self) -> ChunkVersion;
     fn get_id(&self) -> SegmentID;
 }
@@ -97,7 +73,7 @@ impl SegmentV0 {
 impl Segment for SegmentV0 {
     fn to_bytes(&self) -> Vec<u8> {
         let mut bytes = Vec::with_capacity(SEGMENT_V0_DATA_OFFSET + self.data.len());
-        bytes.push(SegmentVersion::V0.into()); // version
+        bytes.push(ChunkVersion::V0.into()); // version
         bytes.extend_from_slice(&self.id.chunk_id.to_le_bytes());
         bytes.extend_from_slice(&self.id.segment_number.to_le_bytes());
         bytes.push(self.is_last as u8);
@@ -109,8 +85,8 @@ impl Segment for SegmentV0 {
         bytes
     }

-    fn get_version(&self) -> SegmentVersion {
-        SegmentVersion::V0
+    fn get_version(&self) -> ChunkVersion {
+        ChunkVersion::V0
     }

     fn get_id(&self) -> SegmentID {
@@ -118,21 +94,20 @@ impl Segment for SegmentV0 {
     }
 }

-pub fn get_data_offset(version: SegmentVersion) -> usize {
+pub fn get_data_offset(version: ChunkVersion) -> usize {
     match version {
-        SegmentVersion::V0 => SEGMENT_V0_DATA_OFFSET,
-        SegmentVersion::Unknown(_) => panic!("unsupported segment version"),
+        ChunkVersion::V0 => SEGMENT_V0_DATA_OFFSET,
+        ChunkVersion::Unknown(_) => panic!("unsupported segment version"),
     }
 }

-// falling back to Result here to cater for corrupted data etc
 pub fn segment_from_bytes(bytes: &[u8]) -> anyhow::Result<Box<dyn Segment>> {
     let version = bytes[0];
-    match SegmentVersion::from(version) {
-        SegmentVersion::V0 => Ok(Box::new(SegmentV0::from_bytes(bytes)?)),
+    match ChunkVersion::from(version) {
+        ChunkVersion::V0 => Ok(Box::new(SegmentV0::from_bytes(bytes)?)),
         // ...
-        SegmentVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")),
+        ChunkVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")),
     }
 }
@@ -207,7 +182,7 @@ mod tests {
         let version = segment.get_version();

         match version {
-            SegmentVersion::V0 => {
+            ChunkVersion::V0 => {
                 let recovered_segment =
                     SegmentV0::from_bytes(&bytes).expect("successful deserialization");
                 segment_v0.checksum = recovered_segment.checksum;

diff --git a/crates/rooch-da/src/server/celestia/actor/server.rs b/crates/rooch-da/src/server/celestia/actor/server.rs
index 229b677e23..c95c1d05e6 100644
--- a/crates/rooch-da/src/server/celestia/actor/server.rs
+++ b/crates/rooch-da/src/server/celestia/actor/server.rs
@@ -8,11 +8,10 @@ use coerce::actor::context::ActorContext;
 use coerce::actor::message::Handler;
 use coerce::actor::Actor;

-use crate::chunk::DABatchV0;
+use crate::chunk::{Chunk, ChunkV0};
 use rooch_config::da_config::DAServerCelestiaConfig;

 use crate::messages::PutBatchInternalDAMessage;
-use crate::segment::{SegmentID, SegmentV0};
 use crate::server::celestia::backend::Backend;

 pub struct DAServerCelestiaActor {
@@ -42,48 +41,13 @@ impl DAServerCelestiaActor {
         }
     }

-    // TODO reuse public_batch logic in openda
-    pub async fn public_batch(&self, batch: PutBatchInternalDAMessage) -> Result<()> {
-        // TODO using chunk builder to make segments:
-        // 1. persist batch into buffer then return ok
-        // 2. collect batch for better compression ratio
-        // 3. split chunk into segments
-        // 4. submit segments to celestia node
-        // 5. record segment id in order
-        // 6. clean up batch buffer
-        // TODO more chunk version supports
-        let chunk = DABatchV0 {
-            version: 0,
-            block_number: batch.batch.block_number,
-            batch_hash: batch.batch.batch_hash,
-            data: batch.batch.data,
-        };
-        let chunk_bytes = bcs::to_bytes(&chunk).unwrap();
-        let segs = chunk_bytes.chunks(self.max_segment_size);
-        let total = segs.len();
-
-        let chunk_id = batch.batch.block_number;
-        let segments = segs
-            .enumerate()
-            .map(|(i, data)| {
-                SegmentV0 {
-                    id: SegmentID {
-                        chunk_id,
-                        segment_number: i as u64,
-                    },
-                    is_last: i == total - 1, // extra info overhead is much smaller than max_block_size - max_segment_size
-                    data_checksum: 0,
-                    checksum: 0,
-                    data: data.to_vec(),
-                }
-            })
-            .collect::<Vec<_>>();
-
+    pub async fn public_batch(&self, batch_msg: PutBatchInternalDAMessage) -> Result<()> {
+        let chunk: ChunkV0 = batch_msg.batch.into();
+        let segments = chunk.to_segments(self.max_segment_size);
         for segment in segments {
-            // TODO record ok segment in order
-            // TODO segment indexer trait (local file, db, etc)
-            self.backend.submit(Box::new(segment)).await?;
+            self.backend.submit(segment).await?;
         }
+
         Ok(())
     }
 }

diff --git a/crates/rooch-da/src/server/openda/actor/server.rs b/crates/rooch-da/src/server/openda/actor/server.rs
index 1a8a0ca6f9..2c6353aa72 100644
--- a/crates/rooch-da/src/server/openda/actor/server.rs
+++ b/crates/rooch-da/src/server/openda/actor/server.rs
@@ -12,11 +12,10 @@ use rooch_config::config::retrieve_map_config_value;
 use std::collections::HashMap;
 use std::path::Path;

-use crate::chunk::DABatchV0;
+use crate::chunk::{Chunk, ChunkV0};
 use rooch_config::da_config::{DAServerOpenDAConfig, OpenDAScheme};

 use crate::messages::PutBatchInternalDAMessage;
-use crate::segment::{Segment, SegmentID, SegmentV0};

 pub struct DAServerOpenDAActor {
     max_segment_size: usize,
@@ -119,50 +118,14 @@ impl DAServerOpenDAActor {
         })
     }

-    pub async fn pub_batch(&self, batch: PutBatchInternalDAMessage) -> Result<()> {
-        // TODO using chunk builder to make segments:
-        // 1. persist batch into buffer then return ok
-        // 2. collect batch for better compression ratio
-        // 3. split chunk into segments
-        // 4. submit segments to celestia node
-        // 5. record segment id in order
-        // 6. clean up batch buffer
-
-        // TODO more chunk version supports
-        let chunk = DABatchV0 {
-            version: 0,
-            block_number: batch.batch.block_number,
-            batch_hash: batch.batch.batch_hash,
-            data: batch.batch.data,
-        };
-        let chunk_bytes = bcs::to_bytes(&chunk).unwrap();
-        let segs = chunk_bytes.chunks(self.max_segment_size);
-        let total = segs.len();
-
-        // TODO explain why block number is a good idea: easy to get next block number for segments, then we could request chunk by block number
-        let chunk_id = batch.batch.block_number;
-        let segments = segs
-            .enumerate()
-            .map(|(i, data)| {
-                SegmentV0 {
-                    id: SegmentID {
-                        chunk_id,
-                        segment_number: i as u64,
-                    },
-                    is_last: i == total - 1, // extra info overhead is much smaller than max_block_size - max_segment_size
-                    data_checksum: 0,
-                    checksum: 0,
-                    data: data.to_vec(),
-                }
-            })
-            .collect::<Vec<_>>();
-
+    pub async fn pub_batch(&self, batch_msg: PutBatchInternalDAMessage) -> Result<()> {
+        let chunk: ChunkV0 = batch_msg.batch.into();
+        let segments = chunk.to_segments(self.max_segment_size);
         for segment in segments {
             let bytes = segment.to_bytes();
-
-            // TODO record ok segment in order
-            // TODO segment indexer trait (local file, db, etc)
-            self.operator.write(&segment.id.to_string(), bytes).await?; // TODO retry logic
+            self.operator
+                .write(&segment.get_id().to_string(), bytes)
+                .await?;
         }

         Ok(())

diff --git a/crates/rooch-proposer/src/actor/proposer.rs b/crates/rooch-proposer/src/actor/proposer.rs
index 7128308d2c..7dce391c32 100644
--- a/crates/rooch-proposer/src/actor/proposer.rs
+++ b/crates/rooch-proposer/src/actor/proposer.rs
@@ -76,7 +76,7 @@ impl Handler<ProposeBlock> for ProposerActor {
                 log::debug!("[ProposeBlock] no transaction to propose block");
             }
         };
-        //TODO submit to the on-chain SCC contract use the proposer key
+        // TODO submit to the on-chain SCC contract using the proposer key
         let _proposer_key = &self.proposer_key;
         let batch_size = block.map(|v| v.batch_size).unwrap_or(0u64);
         self.metrics

diff --git a/crates/rooch-proposer/src/scc/mod.rs b/crates/rooch-proposer/src/scc/mod.rs
index 4118e1f2b5..02d23796c1 100644
--- a/crates/rooch-proposer/src/scc/mod.rs
+++ b/crates/rooch-proposer/src/scc/mod.rs
@@ -80,12 +80,14 @@ impl StateCommitmentChain {
         // submit batch to DA server
         // TODO move batch submit out of proposer
         let batch_data: Vec<u8> = self.buffer.iter().flat_map(|tx| tx.tx.encode()).collect();
-        // regard batch (tx list) as a blob: easy to check integrity
-        let batch_hash = h256::sha3_256_of(&batch_data);
+        let batch_hash = h256::sha2_256_of(&batch_data);
         if let Err(e) = self
             .da
             .submit_batch(Batch {
                 block_number,
+                tx_count: batch_size,
+                prev_tx_accumulator_root,
+                tx_accumulator_root,
                 batch_hash,
                 data: batch_data,
             })
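With `batch_hash` switched to the sha2_256 of the encoded tx list, a consumer can check a batch's integrity before decoding anything. A one-line sketch (not part of the patch), reusing the same `moveos_types::h256` helper the proposer now calls:

```rust
use moveos_types::h256;
use rooch_da::messages::Batch;

/// True if the batch payload matches its recorded hash.
fn verify_batch_integrity(batch: &Batch) -> bool {
    h256::sha2_256_of(&batch.data) == batch.batch_hash
}
```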
diff --git a/crates/rooch-types/src/block.rs b/crates/rooch-types/src/block.rs
index e0a0d7f17c..30cc44e70b 100644
--- a/crates/rooch-types/src/block.rs
+++ b/crates/rooch-types/src/block.rs
@@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize};
 #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
 pub struct Block {
     /// The index of the block
-    //TODO should we use the U256?
     pub block_number: u128,
     /// How many transactions in the block
     pub batch_size: u64,

diff --git a/moveos/smt/src/jellyfish_merkle/hash.rs b/moveos/smt/src/jellyfish_merkle/hash.rs
index c980d30da0..005ee77a6b 100644
--- a/moveos/smt/src/jellyfish_merkle/hash.rs
+++ b/moveos/smt/src/jellyfish_merkle/hash.rs
@@ -24,11 +24,11 @@ sha256t_hash_newtype! {
 #[cfg(any(test, feature = "fuzzing"))]
 impl Arbitrary for SMTNodeHash {
     type Parameters = ();
-    type Strategy = BoxedStrategy<Self>;
-
     fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
         any::<[u8; 32]>().prop_map(SMTNodeHash::new).boxed()
     }
+
+    type Strategy = BoxedStrategy<Self>;
 }

 pub(crate) fn merkle_hash(left: SMTNodeHash, right: SMTNodeHash) -> SMTNodeHash {

From 6f4e2f28c40bcda5fddce66818e51d0de79ba3b2 Mon Sep 17 00:00:00 2001
From: popcnt1
Date: Thu, 19 Sep 2024 16:27:09 +0800
Subject: [PATCH 2/6] feat(rooch-da): add batch handling in chunk and segment

Introduce batch retrieval and chunk creation from segments. This adds new
methods to handle batch data and validate segment consistency, enhancing the
chunk and segment interfaces.
---
 crates/rooch-da/src/chunk.rs    | 96 +++++++++++++++++++++++++++++++++
 crates/rooch-da/src/messages.rs |  5 +-
 crates/rooch-da/src/segment.rs  | 17 ++++--
 3 files changed, 112 insertions(+), 6 deletions(-)

diff --git a/crates/rooch-da/src/chunk.rs b/crates/rooch-da/src/chunk.rs
index b05a1c23ad..b8825672c9 100644
--- a/crates/rooch-da/src/chunk.rs
+++ b/crates/rooch-da/src/chunk.rs
@@ -5,6 +5,7 @@ use crate::messages::Batch;
 use crate::segment::{Segment, SegmentID, SegmentV0};
 use lz4::EncoderBuilder;
 use serde::{Deserialize, Serialize};
+use std::io;

 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 pub enum ChunkVersion {
@@ -35,6 +36,7 @@ pub trait Chunk {
     fn to_bytes(&self) -> Vec<u8>;
     fn get_version(&self) -> ChunkVersion;
     fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>>;
+    fn get_batch(&self) -> Batch;
 }

 // ChunkV0:
@@ -103,4 +105,98 @@ impl Chunk for ChunkV0 {
             })
             .collect::<Vec<_>>()
     }
+
+    fn get_batch(&self) -> Batch {
+        self.batch.clone()
+    }
+}
+
+pub fn chunk_from_segments(segments: Vec<Box<dyn Segment>>) -> anyhow::Result<Box<dyn Chunk>> {
+    if segments.is_empty() {
+        return Err(anyhow::anyhow!("empty segments"));
+    }
+    // check that all segments have the same version
+    let versions = segments
+        .iter()
+        .map(|segment| segment.get_version())
+        .collect::<Vec<_>>();
+    let version = versions.first().unwrap();
+    if versions.iter().any(|seg_version| *seg_version != *version) {
+        return Err(anyhow::anyhow!("inconsistent segment versions"));
+    }
+    // check that the last segment has is_last == true and all others have is_last == false
+    if let Some(last_segment) = segments.last() {
+        if last_segment.is_last() {
+            if segments
+                .iter()
+                .take(segments.len() - 1)
+                .any(|segment| segment.is_last())
+            {
+                return Err(anyhow::anyhow!("inconsistent is_last"));
+            }
+        } else {
+            return Err(anyhow::anyhow!("missing last segment"));
+        }
+    }
+    // check that all segments share the same chunk_id, and that segment_number starts at 0 and increments by 1
+    let chunk_id = segments.first().unwrap().get_id().chunk_id;
+    if segments.iter().enumerate().any(|(i, segment)| {
+        segment.get_id()
+            != SegmentID {
+                chunk_id,
+                segment_number: i as u64,
+            }
+    }) {
+        return Err(anyhow::anyhow!("inconsistent segment ids"));
+    }
+
+    match version {
+        ChunkVersion::V0 => Ok(Box::new(ChunkV0::from_segments(segments)?)),
+        // ...
+        ChunkVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")),
+    }
+}
+
+impl ChunkV0 {
+    pub fn from_segments(segments: Vec<Box<dyn Segment>>) -> anyhow::Result<Self> {
+        let bytes = segments
+            .iter()
+            .flat_map(|segment| segment.get_data())
+            .collect::<Vec<_>>();
+
+        let decoder = lz4::Decoder::new(&bytes[..])?;
+        let mut decompressed_reader = io::BufReader::new(decoder);
+        let chunk: ChunkV0 = bcs::from_reader(&mut decompressed_reader)?;
+        Ok(chunk)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use moveos_types::h256;
+
+    #[test]
+    fn test_chunk_v0() {
+        let batch = Batch {
+            block_number: 1,
+            tx_count: 1,
+            prev_tx_accumulator_root: Default::default(),
+            tx_accumulator_root: Default::default(),
+            batch_hash: h256::sha2_256_of(&[1, 2, 3, 4, 5]),
+            data: vec![1, 2, 3, 4, 5],
+        };
+
+        let chunk = ChunkV0::from(batch.clone());
+        let segments = chunk.to_segments(3);
+        assert_eq!(segments.len(), 39);
+
+        let chunk = chunk_from_segments(segments).unwrap();
+        assert_eq!(chunk.get_batch(), batch);
+
+        assert_eq!(
+            chunk.get_batch().batch_hash,
+            h256::sha2_256_of(&[1, 2, 3, 4, 5])
+        );
+    }
 }

diff --git a/crates/rooch-da/src/messages.rs b/crates/rooch-da/src/messages.rs
index a4124d62ef..624684f136 100644
--- a/crates/rooch-da/src/messages.rs
+++ b/crates/rooch-da/src/messages.rs
@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};

 use moveos_types::h256::H256;

+/// The batch in Rooch is constructed by the batch submitter; it represents a batch of transactions and maps to an L2 block
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
 pub struct Batch {
     /// each batch maps to an L2 block
@@ -18,9 +19,9 @@ pub struct Batch {
     /// the tx accumulator root after the last transaction is appended to the accumulator
     pub tx_accumulator_root: H256,

-    // sha2_256 hash of the batch data
+    /// sha2_256 hash of the batch data
     pub batch_hash: H256,
-    // encoded tx (LedgerTransaction) list
+    /// encoded tx (LedgerTransaction) list
     pub data: Vec<u8>,
 }

diff --git a/crates/rooch-da/src/segment.rs b/crates/rooch-da/src/segment.rs
index ac1e244505..354105d8ab 100644
--- a/crates/rooch-da/src/segment.rs
+++ b/crates/rooch-da/src/segment.rs
@@ -1,18 +1,19 @@
 // Copyright (c) RoochNetwork
 // SPDX-License-Identifier: Apache-2.0

-use std::fmt;
-use std::str::FromStr;
-
 use crate::chunk::ChunkVersion;
 use serde::Serialize;
+use std::fmt;
+use std::str::FromStr;
 use xxhash_rust::xxh3::xxh3_64;

 // A segment is the unit submitted to the DA backend, sized to comply with the block size restrictions of the DA backend.
-pub trait Segment: Send {
+pub trait Segment: fmt::Debug + Send {
     fn to_bytes(&self) -> Vec<u8>;
     fn get_version(&self) -> ChunkVersion;
     fn get_id(&self) -> SegmentID;
+    fn get_data(&self) -> Vec<u8>;
+    fn is_last(&self) -> bool;
 }

 pub const SEGMENT_V0_DATA_OFFSET: usize = 42;
@@ -92,6 +93,14 @@ impl Segment for SegmentV0 {
     fn get_id(&self) -> SegmentID {
         self.id.clone()
     }
+
+    fn get_data(&self) -> Vec<u8> {
+        self.data.clone()
+    }
+
+    fn is_last(&self) -> bool {
+        self.is_last
+    }
 }
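Patch 2's pieces compose into a read path: fetch a chunk's segments in order until `is_last`, then rebuild the batch. A sketch of that flow; `load_segment` is a hypothetical caller-provided lookup (by chunk id and segment number), not an API from this patch:

```rust
use rooch_da::chunk::chunk_from_segments;
use rooch_da::messages::Batch;
use rooch_da::segment::{segment_from_bytes, Segment};

fn read_batch(
    chunk_id: u128,
    // Hypothetical lookup returning the raw bytes of one stored segment.
    load_segment: impl Fn(u128, u64) -> anyhow::Result<Vec<u8>>,
) -> anyhow::Result<Batch> {
    let mut segments: Vec<Box<dyn Segment>> = Vec::new();
    for segment_number in 0.. {
        let segment = segment_from_bytes(&load_segment(chunk_id, segment_number)?)?;
        let is_last = segment.is_last();
        segments.push(segment);
        if is_last {
            break;
        }
    }
    // chunk_from_segments re-checks version, is_last, and segment-id consistency.
    Ok(chunk_from_segments(segments)?.get_batch())
}
```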
From de143eb42149aa23d3582f35d33e0f6b3cc15bfc Mon Sep 17 00:00:00 2001
From: popcnt1
Date: Thu, 19 Sep 2024 17:41:33 +0800
Subject: [PATCH 3/6] feat(rooch-da): add logging for segment submission

Add detailed log messages for segment submission to Celestia and Open-DA
nodes. Improve error handling and debuggability by logging segment IDs,
namespaces, commitments, and errors.
---
 crates/rooch-da/docs/todo.md                    | 10 +++++++
 crates/rooch-da/src/actor/da.rs                 |  1 -
 crates/rooch-da/src/server/celestia/README.md   |  2 +-
 .../src/server/celestia/actor/server.rs         | 16 +++++-----
 crates/rooch-da/src/server/celestia/backend.rs  |  6 ++--
 .../src/server/openda/actor/server.rs           | 30 +++++++++++++------
 6 files changed, 42 insertions(+), 23 deletions(-)

diff --git a/crates/rooch-da/docs/todo.md b/crates/rooch-da/docs/todo.md
index 15ea2860fb..34d5788080 100644
--- a/crates/rooch-da/docs/todo.md
+++ b/crates/rooch-da/docs/todo.md
@@ -9,3 +9,13 @@ Chunk Builder is a component that builds chunks from batches, avoiding burst I/O to the DA backend:
    performance is preferred).
 2. Split the chunk into segments and submit the segments to the DA backend asynchronously.
 3. Clean up the batch buffer on a schedule, after its segments have been submitted to the DA backend.
+
+## DA Proxy
+
+### Get API
+
+1. get by block number
+2. get by batch hash
+3. pull by stream
+
+Add FEC for SDC protection (against wrong-response attacks).
\ No newline at end of file

diff --git a/crates/rooch-da/src/actor/da.rs b/crates/rooch-da/src/actor/da.rs
index f0bb909ff0..3a8a482405 100644
--- a/crates/rooch-da/src/actor/da.rs
+++ b/crates/rooch-da/src/actor/da.rs
@@ -36,7 +36,6 @@ impl Actor for DAActor {}
 impl DAActor {
     pub async fn new(da_config: DAConfig, actor_system: &ActorSystem) -> Result<Self> {
         // internal servers
-        let mut servers: Vec<Arc<dyn DAServerProxy + Send + Sync>> = Vec::new();
         let mut submit_threshold = 1;
         let mut success_count = 0;

diff --git a/crates/rooch-da/src/server/celestia/README.md b/crates/rooch-da/src/server/celestia/README.md
index 2fccc5db3d..bff30a9e5c 100644
--- a/crates/rooch-da/src/server/celestia/README.md
+++ b/crates/rooch-da/src/server/celestia/README.md
@@ -1,3 +1,3 @@
 # Celestia Server

-server implementation of using Celestia as DA backend.
\ No newline at end of file
+DA server implementation using Celestia as the DA backend.
\ No newline at end of file

diff --git a/crates/rooch-da/src/server/celestia/actor/server.rs b/crates/rooch-da/src/server/celestia/actor/server.rs
index c95c1d05e6..dad211ae30 100644
--- a/crates/rooch-da/src/server/celestia/actor/server.rs
+++ b/crates/rooch-da/src/server/celestia/actor/server.rs
@@ -19,15 +19,8 @@ pub struct DAServerCelestiaActor {
     backend: Backend,
 }

-// TODO get request and response
-// 1. get by block number
-// 2. get by batch hash
-// 3. pull by stream
-//
 impl Actor for DAServerCelestiaActor {}

-// TODO add FEC get for SDC protection (wrong response attacks)
 impl DAServerCelestiaActor {
     pub async fn new(cfg: &DAServerCelestiaConfig) -> Self {
         let namespace_str = cfg.namespace.as_ref().unwrap().clone();
@@ -45,7 +38,14 @@ impl DAServerCelestiaActor {
         let chunk: ChunkV0 = batch_msg.batch.into();
         let segments = chunk.to_segments(self.max_segment_size);
         for segment in segments {
-            self.backend.submit(segment).await?;
+            let result = self.backend.submit(segment).await?;
+            log::info!(
+                "submitted segment to celestia node, segment_id: {:?}, namespace: {:?}, commitment: {:?}, height: {}",
+                result.segment_id,
+                result.namespace,
+                result.commitment,
+                result.height,
+            );
         }

         Ok(())

diff --git a/crates/rooch-da/src/server/celestia/backend.rs b/crates/rooch-da/src/server/celestia/backend.rs
index 82d4c99843..ae1df33d3d 100644
--- a/crates/rooch-da/src/server/celestia/backend.rs
+++ b/crates/rooch-da/src/server/celestia/backend.rs
@@ -32,7 +32,6 @@ impl Backend {
         }
     }

-    // TODO return segment id, height, commitment
     pub async fn submit(&self, segment: Box<dyn Segment>) -> Result<SubmitBackendResult> {
         let data = segment.to_bytes();
         let blob = Blob::new(self.namespace, data).unwrap();
@@ -53,9 +52,8 @@ impl Backend {
             }),
             Err(e) => {
                 log::warn!(
-                    "failed to submit segment to celestia node, chunk: {}, segment: {}, commitment: {:?}, error:{:?}",
-                    segment_id.chunk_id,
-                    segment_id.segment_number,
+                    "failed to submit segment to celestia node, segment_id: {:?}, commitment: {:?}, error:{:?}",
+                    segment_id,
                     blob.commitment,
                     e,
                 );

diff --git a/crates/rooch-da/src/server/openda/actor/server.rs b/crates/rooch-da/src/server/openda/actor/server.rs
index 2c6353aa72..408335ed60 100644
--- a/crates/rooch-da/src/server/openda/actor/server.rs
+++ b/crates/rooch-da/src/server/openda/actor/server.rs
@@ -22,15 +22,10 @@ pub struct DAServerOpenDAActor {
     operator: Operator,
 }

-// TODO get request and response
-// 1. get by block number
-// 2. get by batch hash
-// 3. pull by stream
-//
+pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024;

 impl Actor for DAServerOpenDAActor {}

-// TODO add FEC get for SDC protection (wrong response attacks)
 impl DAServerOpenDAActor {
     pub async fn new(cfg: &DAServerOpenDAConfig) -> Result<Self> {
         let mut config = cfg.clone();
@@ -113,7 +108,7 @@ impl DAServerOpenDAActor {
         };

         Ok(Self {
-            max_segment_size: cfg.max_segment_size.unwrap_or(4 * 1024 * 1024) as usize,
+            max_segment_size: cfg.max_segment_size.unwrap_or(DEFAULT_MAX_SEGMENT_SIZE) as usize,
             operator: op,
         })
     }
@@ -123,9 +118,26 @@ impl DAServerOpenDAActor {
         let segments = chunk.to_segments(self.max_segment_size);
         for segment in segments {
             let bytes = segment.to_bytes();
-            self.operator
+            match self
+                .operator
                 .write(&segment.get_id().to_string(), bytes)
-                .await?;
+                .await
+            {
+                Ok(_) => {
+                    log::info!(
+                        "submitted segment to open-da node, segment: {:?}",
+                        segment.get_id(),
+                    );
+                }
+                Err(e) => {
+                    log::warn!(
+                        "failed to submit segment to open-da node, segment_id: {:?}, error:{:?}",
+                        segment.get_id(),
+                        e,
+                    );
+                    return Err(e.into());
+                }
+            }
         }

         Ok(())

From 00b215e0a4cbe19faae5eeb4f782421f75ed9b32 Mon Sep 17 00:00:00 2001
From: popcnt1
Date: Thu, 19 Sep 2024 20:18:49 +0800
Subject: [PATCH 4/6] feat(ci): split deploy scripts for testnet and mainnet

Refactor deployment workflows for distinct environments. Added separate
scripts for testnet and mainnet deployments. Introduced a new constant for
default max retry times in DAServerOpenDAActor.
---
 .github/workflows/deploy_mainnet.yml           | 10 +++++++--
 .github/workflows/deploy_testnet.yml           |  2 +-
 .../src/server/openda/actor/server.rs          |  3 ++-
 scripts/deploy_rooch_mainnet.sh                | 21 +++++++++++++++++++
 ...eploy_rooch.sh => deploy_rooch_testnet.sh}  |  0
 5 files changed, 32 insertions(+), 4 deletions(-)
 create mode 100644 scripts/deploy_rooch_mainnet.sh
 rename scripts/{deploy_rooch.sh => deploy_rooch_testnet.sh} (100%)

diff --git a/.github/workflows/deploy_mainnet.yml b/.github/workflows/deploy_mainnet.yml
index 78e01b4295..807927689f 100644
--- a/.github/workflows/deploy_mainnet.yml
+++ b/.github/workflows/deploy_mainnet.yml
@@ -26,5 +26,11 @@ jobs:
           chmod 600 private_key.pem
           sudo apt update
           sudo apt install -y --no-install-recommends openssh-server
-          ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && docker image prune -a -f && docker ps | grep rooch | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep rooch | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' && docker run -d -v /data:/root -p 6767:6767 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' server start -n main --btc-rpc-url '${{secrets.BTC_MAIN_RPC_URL}}' --btc-rpc-username rooch-main --btc-rpc-password '${{secrets.BTC_MAIN_RPC_PWD}}'"
-
+          BRANCH=$(basename ${{ github.ref }})
+          ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \
+            "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_testnet.sh \
+            '${{ env.REF }}' \
+            '${{ secrets.BTC_MAIN_RPC_URL }}' \
+            '${{ secrets.BTC_MAIN_RPC_PWD }}' \
+            '${{ secrets.OPENDA_GCP_MAINNET_BUCKET }}' \
+            '${{ secrets.OPENDA_GCP_MAINNET_CREDENTIAL }}'"

diff --git a/.github/workflows/deploy_testnet.yml b/.github/workflows/deploy_testnet.yml
index b2506a1a07..8e708f0e06 100644
--- a/.github/workflows/deploy_testnet.yml
+++ b/.github/workflows/deploy_testnet.yml
@@ -68,7 +68,7 @@ jobs:
           sudo apt install -y --no-install-recommends openssh-server
           BRANCH=$(basename ${{ github.ref }})
           ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \
-            "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch.sh \
+            "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_testnet.sh \
             '${{ env.REF }}' \
             '${{ secrets.BTC_TEST_RPC_URL }}' \
             '${{ secrets.BTC_TEST_RPC_PWD }}' \

diff --git a/crates/rooch-da/src/server/openda/actor/server.rs b/crates/rooch-da/src/server/openda/actor/server.rs
index 408335ed60..4db1bcd310 100644
--- a/crates/rooch-da/src/server/openda/actor/server.rs
+++ b/crates/rooch-da/src/server/openda/actor/server.rs
@@ -23,6 +23,7 @@ pub struct DAServerOpenDAActor {
 }

 pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024;
+pub const DEFAULT_MAX_RETRY_TIMES: usize = 4;

 impl Actor for DAServerOpenDAActor {}

@@ -166,7 +167,7 @@ async fn new_retry_operator(
     max_retry_times: Option<usize>,
 ) -> Result<Operator> {
     let mut op = Operator::via_map(scheme, config)?;
-    let max_times = max_retry_times.unwrap_or(4);
+    let max_times = max_retry_times.unwrap_or(DEFAULT_MAX_RETRY_TIMES);
     op = op
         .layer(RetryLayer::new().with_max_times(max_times))
         .layer(LoggingLayer::default());

diff --git a/scripts/deploy_rooch_mainnet.sh b/scripts/deploy_rooch_mainnet.sh
new file mode 100644
index 0000000000..1dcde81466
--- /dev/null
+++ b/scripts/deploy_rooch_mainnet.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Copyright (c) RoochNetwork
+# SPDX-License-Identifier: Apache-2.0
+
+REF="$1"
+BTC_MAIN_RPC_URL="$2"
+BTC_MAIN_RPC_PWD="$3"
+OPENDA_GCP_MAINNET_BUCKET="$4"
+OPENDA_GCP_MAINNET_CREDENTIAL="$5"
+
+sleep 30
+docker image prune -a -f
+docker ps | grep rooch | awk '{print $1}' | xargs -r docker stop
+docker ps -a | grep rooch | awk '{print $1}' | xargs -r docker rm -f
+docker pull "ghcr.io/rooch-network/rooch:$REF"
+docker run -d --name rooch --restart unless-stopped -v /data:/root -p 6767:6767 -p 9184:9184 -e RUST_BACKTRACE=full "ghcr.io/rooch-network/rooch:$REF" \
+    server start -n main \
+    --btc-rpc-url "$BTC_MAIN_RPC_URL" \
+    --btc-rpc-username rooch-main \
+    --btc-rpc-password "$BTC_MAIN_RPC_PWD" \
+    --da "{\"internal-da-server\": {\"servers\": [{\"open-da\": {\"scheme\": \"gcs\", \"config\": {\"bucket\": \"$OPENDA_GCP_MAINNET_BUCKET\", \"credential\": \"$OPENDA_GCP_MAINNET_CREDENTIAL\"}}}]}}"

diff --git a/scripts/deploy_rooch.sh b/scripts/deploy_rooch_testnet.sh
similarity index 100%
rename from scripts/deploy_rooch.sh
rename to scripts/deploy_rooch_testnet.sh
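Patch 4's `new_retry_operator` centralizes the opendal retry policy behind `DEFAULT_MAX_RETRY_TIMES`. For orientation, a standalone sketch of the same layering pattern against opendal's in-memory backend; the `memory` scheme and the `"0-0"` object key are illustrative stand-ins, not values the patch uses:

```rust
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::{Operator, Scheme};
use std::collections::HashMap;

async fn demo_retry_operator() -> anyhow::Result<()> {
    // Same layering as new_retry_operator: bounded retries plus request logging.
    let op = Operator::via_map(Scheme::Memory, HashMap::new())?
        .layer(RetryLayer::new().with_max_times(4))
        .layer(LoggingLayer::default());
    op.write("0-0", vec![1u8, 2, 3]).await?; // illustrative segment-id-style key
    Ok(())
}
```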
From ccb52b7cc858ede8a50c81ffd4e7da8a44757179 Mon Sep 17 00:00:00 2001
From: popcnt1
Date: Thu, 19 Sep 2024 20:28:22 +0800
Subject: [PATCH 5/6] typo
---
 .github/workflows/deploy_mainnet.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/deploy_mainnet.yml b/.github/workflows/deploy_mainnet.yml
index 807927689f..8d943f18a0 100644
--- a/.github/workflows/deploy_mainnet.yml
+++ b/.github/workflows/deploy_mainnet.yml
@@ -28,7 +28,7 @@ jobs:
           sudo apt install -y --no-install-recommends openssh-server
           BRANCH=$(basename ${{ github.ref }})
           ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \
-            "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_testnet.sh \
+            "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_mainnet.sh \
             '${{ env.REF }}' \
             '${{ secrets.BTC_MAIN_RPC_URL }}' \
             '${{ secrets.BTC_MAIN_RPC_PWD }}' \

From f1f46fb0da3bd75c666c04ceb3102ac8c6502b29 Mon Sep 17 00:00:00 2001
From: popcnt1
Date: Thu, 19 Sep 2024 22:17:11 +0800
Subject: [PATCH 6/6] feat(scripts): filter out faucet container in deploy
 script

Updated deploy_rooch_mainnet.sh to exclude the faucet container when stopping
and removing Docker containers. This ensures the faucet container stays
running during the deployment process.
---
 scripts/deploy_rooch_mainnet.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/deploy_rooch_mainnet.sh b/scripts/deploy_rooch_mainnet.sh
index 1dcde81466..f65ae25c80 100644
--- a/scripts/deploy_rooch_mainnet.sh
+++ b/scripts/deploy_rooch_mainnet.sh
@@ -10,8 +10,8 @@ OPENDA_GCP_MAINNET_CREDENTIAL="$5"

 sleep 30
 docker image prune -a -f
-docker ps | grep rooch | awk '{print $1}' | xargs -r docker stop
-docker ps -a | grep rooch | awk '{print $1}' | xargs -r docker rm -f
+docker ps | grep rooch | grep -v faucet | awk '{print $1}' | xargs -r docker stop
+docker ps -a | grep rooch | grep -v faucet | awk '{print $1}' | xargs -r docker rm -f
 docker pull "ghcr.io/rooch-network/rooch:$REF"
 docker run -d --name rooch --restart unless-stopped -v /data:/root -p 6767:6767 -p 9184:9184 -e RUST_BACKTRACE=full "ghcr.io/rooch-network/rooch:$REF" \
     server start -n main \