diff --git a/.github/workflows/deploy_mainnet.yml b/.github/workflows/deploy_mainnet.yml index 78dcef21cf..8d943f18a0 100644 --- a/.github/workflows/deploy_mainnet.yml +++ b/.github/workflows/deploy_mainnet.yml @@ -3,9 +3,9 @@ on: workflow_dispatch: inputs: tagName: - description: 'Tag Name to Checkout' + description: 'Tag or branch to deploy' + default: 'main' required: true - default: 'latest' # workflow_run: # workflows: ["Build Docker And Deploy Seed"] # types: @@ -13,7 +13,7 @@ on: jobs: deploy-rooch-mainnet: - name: deploy rooch mainnet + name: Deploy Rooch Mainnet runs-on: self-hosted steps: - name: Deploy to GCP MAINNET VM @@ -26,5 +26,11 @@ jobs: chmod 600 private_key.pem sudo apt update sudo apt install -y --no-install-recommends openssh-server - ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && && docker image prune -a -f && docker ps | grep rooch | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep rooch | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' && docker run -d -v /data:/root -p 6767:6767 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' server start -n main --btc-rpc-url '${{secrets.BTC_MAIN_RPC_URL}}' --btc-rpc-username rooch-main --btc-rpc-password '${{secrets.BTC_MAIN_RPC_PWD}}' --btc-start-block-height 0 --btc-end-block-height 767420 --data-import-mode 1" - + BRANCH=$(basename ${{ github.ref }}) + ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \ + "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_mainnet.sh \ + '${{ env.REF }}' \ + '${{ secrets.BTC_MAIN_RPC_URL }}' \ + '${{ secrets.BTC_MAIN_RPC_PWD }}' \ + '${{ secrets.OPENDA_GCP_MAINNET_BUCKET }}' \ + '${{ secrets.OPENDA_GCP_MAINNET_CREDENTIAL }}'" diff --git a/.github/workflows/deploy_testnet.yml b/.github/workflows/deploy_testnet.yml index b2506a1a07..8e708f0e06 100644 --- a/.github/workflows/deploy_testnet.yml +++ b/.github/workflows/deploy_testnet.yml @@ -68,7 +68,7 @@ jobs: sudo apt install -y --no-install-recommends openssh-server BRANCH=$(basename ${{ github.ref }}) ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \ - "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch.sh \ + "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_testnet.sh \ '${{ env.REF }}' \ '${{ secrets.BTC_TEST_RPC_URL }}' \ '${{ secrets.BTC_TEST_RPC_PWD }}' \ diff --git a/Cargo.lock b/Cargo.lock index a54b2dfdc4..dfb4c9afa7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1580,13 +1580,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.98" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" +checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" dependencies = [ "jobserver", "libc", - "once_cell", + "shlex", ] [[package]] @@ -5882,11 +5882,20 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "lz4" +version = "1.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a231296ca742e418c43660cb68e082486ff2538e8db432bc818580f3965025ed" +dependencies = [ + "lz4-sys", +] + [[package]] name = "lz4-sys" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" +checksum = "fcb44a01837a858d47e5a630d2ccf304c8efcc4b83b8f9f75b7a9ee4fcc6e57d" dependencies = [ "cc", "libc", @@ -9352,6 +9361,7 @@ dependencies = [ "coerce", "futures", "log", + "lz4", "moveos-types", "opendal", "rooch-config", diff --git a/Cargo.toml b/Cargo.toml index afd277e320..9fe48227ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -332,6 +332,7 @@ uuid = { version = "1.10.0", features = ["v4", "fast-rng"] } protobuf = { version = "2.28", features = ["with-bytes"] } redb = { version = "2.1.1" } rocksdb = { git = "https://github.com/rooch-network/rust-rocksdb.git", rev = "41d102327ba3cf9a2335d1192e8312c92bc3d6f9", features = ["lz4"] } +lz4 = { version = "1.27.0" } ripemd = { version = "0.1.3" } fastcrypto-zkp = { version = "0.1.3" } diff --git a/crates/rooch-da/Cargo.toml b/crates/rooch-da/Cargo.toml index a821494b28..675d69c60a 100644 --- a/crates/rooch-da/Cargo.toml +++ b/crates/rooch-da/Cargo.toml @@ -30,3 +30,4 @@ opendal = { workspace = true } rooch-config = { workspace = true } serde_yaml = { workspace = true } xxhash-rust = { workspace = true, features = ["xxh3"] } +lz4 = { workspace = true } diff --git a/crates/rooch-da/docs/stream.md b/crates/rooch-da/docs/stream.md new file mode 100644 index 0000000000..3fb0e184f4 --- /dev/null +++ b/crates/rooch-da/docs/stream.md @@ -0,0 +1,32 @@ +DA Stream +==== + +DA Stream is a continuous flow of data from sequencer to verifier. It is a sequence of DA Batch. + +## Batch + +A batch is a collection of transactions. It is the unit of data flow in DA Stream. + +Each batch maps to a L2 block. + +## Chunk + +A chunk is a collection of DA Batch for better compression ratio. + +Components: + +- Chunk Header: Version, Chunk Number, Chunk Checksum +- Chunk Body: One or more DA batch after compression + +Version of chunk determines: + +1. chunk format: serialization/deserialization, compression algorithm of chunk body +2. segment format + +### Segment + +Segment consists of chunk bytes split by a certain size. + +Segment is the unit submitted to DA backend, designed to comply with the block size restrictions of the DA backend. + +Version of segment inherits from chunk version. \ No newline at end of file diff --git a/crates/rooch-da/docs/todo.md b/crates/rooch-da/docs/todo.md new file mode 100644 index 0000000000..34d5788080 --- /dev/null +++ b/crates/rooch-da/docs/todo.md @@ -0,0 +1,21 @@ +TODO +=== + +## Chunk Builder + +Chunk Builder is a component to build chunks from batches, avoiding burst I/O to DA backend. + +1. Persist batch into buffer(local persistence layer or other faster media) first, then return ok(if high-performance is + preferred). +2. Split chunk into segments and submit segments to DA backend asynchronously. +3. Clean up batch buffer after segments being submitted to DA backend schedule. + +## DA Proxy + +### Get API + +1. get by block number +2. get by batch hash +3. pull by stream + +add FEC for SDC protection (wrong response attacks) \ No newline at end of file diff --git a/crates/rooch-da/src/actor/da.rs b/crates/rooch-da/src/actor/da.rs index f0bb909ff0..3a8a482405 100644 --- a/crates/rooch-da/src/actor/da.rs +++ b/crates/rooch-da/src/actor/da.rs @@ -36,7 +36,6 @@ impl Actor for DAActor {} impl DAActor { pub async fn new(da_config: DAConfig, actor_system: &ActorSystem) -> Result { // internal servers - let mut servers: Vec> = Vec::new(); let mut submit_threshold = 1; let mut success_count = 0; diff --git a/crates/rooch-da/src/chunk.rs b/crates/rooch-da/src/chunk.rs index 249bcde8c8..b8825672c9 100644 --- a/crates/rooch-da/src/chunk.rs +++ b/crates/rooch-da/src/chunk.rs @@ -1,17 +1,202 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use moveos_types::h256::H256; -use serde::Serialize; - -// DABatchV0 is the first version of the batch inside a chunk, each batch is a chunk -#[derive(Serialize, Debug, PartialEq, Clone)] -pub struct DABatchV0 { - pub version: u8, - // each batch maps to a L2 block - pub block_number: u128, - // sha3_256 hash of the batch data - pub batch_hash: H256, - // encoded tx list - pub data: Vec, +use crate::messages::Batch; +use crate::segment::{Segment, SegmentID, SegmentV0}; +use lz4::EncoderBuilder; +use serde::{Deserialize, Serialize}; +use std::io; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub enum ChunkVersion { + V0, + Unknown(u8), +} + +impl From for ChunkVersion { + fn from(num: u8) -> Self { + match num { + 0 => ChunkVersion::V0, + // ... + _ => Self::Unknown(num), + } + } +} + +impl From for u8 { + fn from(version: ChunkVersion) -> Self { + match version { + ChunkVersion::V0 => 0, + ChunkVersion::Unknown(num) => num, + } + } +} + +pub trait Chunk { + fn to_bytes(&self) -> Vec; + fn get_version(&self) -> ChunkVersion; + fn to_segments(&self, max_segment_size: usize) -> Vec>; + fn get_batch(&self) -> Batch; +} + +// ChunkV0: +// 1. each chunk maps to a batch +// 2. batch_data compressed by lz4 +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct ChunkV0 { + pub version: ChunkVersion, + pub batch: Batch, +} + +impl From for ChunkV0 { + fn from(batch: Batch) -> Self { + Self { + version: ChunkVersion::V0, + batch: Batch { + block_number: batch.block_number, + tx_count: batch.tx_count, + prev_tx_accumulator_root: batch.prev_tx_accumulator_root, + tx_accumulator_root: batch.tx_accumulator_root, + batch_hash: batch.batch_hash, + data: batch.data, + }, + } + } +} + +impl Chunk for ChunkV0 { + fn to_bytes(&self) -> Vec { + let mut compressed_bytes = Vec::new(); + + { + let mut encoder = EncoderBuilder::new().build(&mut compressed_bytes).unwrap(); + bcs::serialize_into(&mut encoder, self).unwrap(); + let (_output, result) = encoder.finish(); + result.unwrap(); + } + + compressed_bytes + } + + fn get_version(&self) -> ChunkVersion { + ChunkVersion::V0 + } + + fn to_segments(&self, max_segment_size: usize) -> Vec> { + let bytes = self.to_bytes(); + let segments_data = bytes.chunks(max_segment_size); + let segments_count = segments_data.len(); + + let chunk_id = self.batch.block_number; + segments_data + .enumerate() + .map(|(i, data)| { + Box::new(SegmentV0 { + id: SegmentID { + chunk_id, + segment_number: i as u64, + }, + is_last: i == segments_count - 1, // extra info overhead is much smaller than max_block_size - max_segment_size + // *_checksum will be filled in to_bytes method of Segment + data_checksum: 0, + checksum: 0, + data: data.to_vec(), + }) as Box + }) + .collect::>() + } + + fn get_batch(&self) -> Batch { + self.batch.clone() + } +} + +pub fn chunk_from_segments(segments: Vec>) -> anyhow::Result> { + if segments.is_empty() { + return Err(anyhow::anyhow!("empty segments")); + } + // check all segments have the same version + let versions = segments + .iter() + .map(|segment| segment.get_version()) + .collect::>(); + let version = versions.first().unwrap(); + if versions.iter().any(|seg_version| *seg_version != *version) { + return Err(anyhow::anyhow!("inconsistent segment versions")); + } + // check last segment.is_last == true, others must be false + if let Some(last_segment) = segments.last() { + if last_segment.is_last() { + if segments + .iter() + .take(segments.len() - 1) + .any(|segment| segment.is_last()) + { + return Err(anyhow::anyhow!("inconsistent is_last")); + } + } else { + return Err(anyhow::anyhow!("missing last segments")); + } + } + // check all segments have the same chunk_id, segment_number starts from 0 and increments by 1 + let chunk_id = segments.first().unwrap().get_id().chunk_id; + if segments.iter().enumerate().any(|(i, segment)| { + segment.get_id() + != SegmentID { + chunk_id, + segment_number: i as u64, + } + }) { + return Err(anyhow::anyhow!("inconsistent segment ids")); + } + + match version { + ChunkVersion::V0 => Ok(Box::new(ChunkV0::from_segments(segments)?)), + // ... + ChunkVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")), + } +} + +impl ChunkV0 { + pub fn from_segments(segments: Vec>) -> anyhow::Result { + let bytes = segments + .iter() + .flat_map(|segment| segment.get_data()) + .collect::>(); + + let decoder = lz4::Decoder::new(&bytes[..])?; + let mut decompressed_reader = io::BufReader::new(decoder); + let chunk: ChunkV0 = bcs::from_reader(&mut decompressed_reader)?; + Ok(chunk) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use moveos_types::h256; + + #[test] + fn test_chunk_v0() { + let batch = Batch { + block_number: 1, + tx_count: 1, + prev_tx_accumulator_root: Default::default(), + tx_accumulator_root: Default::default(), + batch_hash: h256::sha2_256_of(&[1, 2, 3, 4, 5]), + data: vec![1, 2, 3, 4, 5], + }; + + let chunk = ChunkV0::from(batch.clone()); + let segments = chunk.to_segments(3); + assert_eq!(segments.len(), 39); + + let chunk = chunk_from_segments(segments).unwrap(); + assert_eq!(chunk.get_batch(), batch); + + assert_eq!( + chunk.get_batch().batch_hash, + h256::sha2_256_of(&[1, 2, 3, 4, 5]) + ); + } } diff --git a/crates/rooch-da/src/messages.rs b/crates/rooch-da/src/messages.rs index 6f31bee1b2..624684f136 100644 --- a/crates/rooch-da/src/messages.rs +++ b/crates/rooch-da/src/messages.rs @@ -7,13 +7,21 @@ use serde::{Deserialize, Serialize}; use moveos_types::h256::H256; -#[derive(Debug, Serialize, Deserialize, Clone)] +/// The batch in Rooch is constructed by the batch submitter, representing a batch of transactions, mapping to a L2 block +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Batch { - // each batch maps to a L2 block + /// each batch maps to a L2 block pub block_number: u128, - // sha3_256 hash of the batch data + /// How many transactions in the batch + pub tx_count: u64, + /// The previous tx accumulator root of the block + pub prev_tx_accumulator_root: H256, + /// The tx accumulator root after the last transaction append to the accumulator + pub tx_accumulator_root: H256, + + /// sha256h of data pub batch_hash: H256, - // encoded tx list + /// encoded tx(LedgerTransaction) list pub data: Vec, } diff --git a/crates/rooch-da/src/segment.rs b/crates/rooch-da/src/segment.rs index ad5bde3e77..354105d8ab 100644 --- a/crates/rooch-da/src/segment.rs +++ b/crates/rooch-da/src/segment.rs @@ -1,42 +1,19 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use crate::chunk::ChunkVersion; +use serde::Serialize; use std::fmt; use std::str::FromStr; - -use serde::Serialize; use xxhash_rust::xxh3::xxh3_64; -#[derive(Debug, PartialEq, Eq)] -pub enum SegmentVersion { - V0, - Unknown(u8), -} - -impl From for SegmentVersion { - fn from(num: u8) -> Self { - match num { - 0 => SegmentVersion::V0, - // ... - _ => Self::Unknown(num), - } - } -} - -impl From for u8 { - fn from(version: SegmentVersion) -> Self { - match version { - SegmentVersion::V0 => 0, - SegmentVersion::Unknown(num) => num, - } - } -} - -// `Segment` is the basic unit of storage in DA server. -pub trait Segment: Send { +// Segment is the unit submitted to DA backend, designed to comply with the block size restrictions of the DA backend. +pub trait Segment: fmt::Debug + Send { fn to_bytes(&self) -> Vec; - fn get_version(&self) -> SegmentVersion; + fn get_version(&self) -> ChunkVersion; fn get_id(&self) -> SegmentID; + fn get_data(&self) -> Vec; + fn is_last(&self) -> bool; } pub const SEGMENT_V0_DATA_OFFSET: usize = 42; @@ -97,7 +74,7 @@ impl SegmentV0 { impl Segment for SegmentV0 { fn to_bytes(&self) -> Vec { let mut bytes = Vec::with_capacity(SEGMENT_V0_DATA_OFFSET + self.data.len()); - bytes.push(SegmentVersion::V0.into()); // version + bytes.push(ChunkVersion::V0.into()); // version bytes.extend_from_slice(&self.id.chunk_id.to_le_bytes()); bytes.extend_from_slice(&self.id.segment_number.to_le_bytes()); bytes.push(self.is_last as u8); @@ -109,30 +86,37 @@ impl Segment for SegmentV0 { bytes } - fn get_version(&self) -> SegmentVersion { - SegmentVersion::V0 + fn get_version(&self) -> ChunkVersion { + ChunkVersion::V0 } fn get_id(&self) -> SegmentID { self.id.clone() } + + fn get_data(&self) -> Vec { + self.data.clone() + } + + fn is_last(&self) -> bool { + self.is_last + } } -pub fn get_data_offset(version: SegmentVersion) -> usize { +pub fn get_data_offset(version: ChunkVersion) -> usize { match version { - SegmentVersion::V0 => SEGMENT_V0_DATA_OFFSET, - SegmentVersion::Unknown(_) => panic!("unsupported segment version"), + ChunkVersion::V0 => SEGMENT_V0_DATA_OFFSET, + ChunkVersion::Unknown(_) => panic!("unsupported segment version"), } } -// falling back to Result here to cater for corrupted data etc pub fn segment_from_bytes(bytes: &[u8]) -> anyhow::Result> { let version = bytes[0]; - match SegmentVersion::from(version) { - SegmentVersion::V0 => Ok(Box::new(SegmentV0::from_bytes(bytes)?)), + match ChunkVersion::from(version) { + ChunkVersion::V0 => Ok(Box::new(SegmentV0::from_bytes(bytes)?)), // ... - SegmentVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")), + ChunkVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")), } } @@ -207,7 +191,7 @@ mod tests { let version = segment.get_version(); match version { - SegmentVersion::V0 => { + ChunkVersion::V0 => { let recovered_segment = SegmentV0::from_bytes(&bytes).expect("successful deserialization"); segment_v0.checksum = recovered_segment.checksum; diff --git a/crates/rooch-da/src/server/celestia/README.md b/crates/rooch-da/src/server/celestia/README.md index 2fccc5db3d..bff30a9e5c 100644 --- a/crates/rooch-da/src/server/celestia/README.md +++ b/crates/rooch-da/src/server/celestia/README.md @@ -1,3 +1,3 @@ # Celestia Server -server implementation of using Celestia as DA backend. \ No newline at end of file +DA Server implementation of using Celestia as DA backend. \ No newline at end of file diff --git a/crates/rooch-da/src/server/celestia/actor/server.rs b/crates/rooch-da/src/server/celestia/actor/server.rs index 229b677e23..dad211ae30 100644 --- a/crates/rooch-da/src/server/celestia/actor/server.rs +++ b/crates/rooch-da/src/server/celestia/actor/server.rs @@ -8,11 +8,10 @@ use coerce::actor::context::ActorContext; use coerce::actor::message::Handler; use coerce::actor::Actor; -use crate::chunk::DABatchV0; +use crate::chunk::{Chunk, ChunkV0}; use rooch_config::da_config::DAServerCelestiaConfig; use crate::messages::PutBatchInternalDAMessage; -use crate::segment::{SegmentID, SegmentV0}; use crate::server::celestia::backend::Backend; pub struct DAServerCelestiaActor { @@ -20,15 +19,8 @@ pub struct DAServerCelestiaActor { backend: Backend, } -// TODO get request and response -// 1. get by block number -// 2. get by batch hash -// 3. pull by stream -// - impl Actor for DAServerCelestiaActor {} -// TODO add FEC get for SDC protection (wrong response attacks) impl DAServerCelestiaActor { pub async fn new(cfg: &DAServerCelestiaConfig) -> Self { let namespace_str = cfg.namespace.as_ref().unwrap().clone(); @@ -42,48 +34,20 @@ impl DAServerCelestiaActor { } } - // TODO reuse public_batch logic in openda - pub async fn public_batch(&self, batch: PutBatchInternalDAMessage) -> Result<()> { - // TODO using chunk builder to make segments: - // 1. persist batch into buffer then return ok - // 2. collect batch for better compression ratio - // 3. split chunk into segments - // 4. submit segments to celestia node - // 5. record segment id in order - // 6. clean up batch buffer - // TODO more chunk version supports - let chunk = DABatchV0 { - version: 0, - block_number: batch.batch.block_number, - batch_hash: batch.batch.batch_hash, - data: batch.batch.data, - }; - let chunk_bytes = bcs::to_bytes(&chunk).unwrap(); - let segs = chunk_bytes.chunks(self.max_segment_size); - let total = segs.len(); - - let chunk_id = batch.batch.block_number; - let segments = segs - .enumerate() - .map(|(i, data)| { - SegmentV0 { - id: SegmentID { - chunk_id, - segment_number: i as u64, - }, - is_last: i == total - 1, // extra info overhead is much smaller than max_block_size - max_segment_size - data_checksum: 0, - checksum: 0, - data: data.to_vec(), - } - }) - .collect::>(); - + pub async fn public_batch(&self, batch_msg: PutBatchInternalDAMessage) -> Result<()> { + let chunk: ChunkV0 = batch_msg.batch.into(); + let segments = chunk.to_segments(self.max_segment_size); for segment in segments { - // TODO record ok segment in order - // TODO segment indexer trait (local file, db, etc) - self.backend.submit(Box::new(segment)).await?; + let result = self.backend.submit(segment).await?; + log::info!( + "submitted segment to celestia node, segment_id: {:?}, namespace: {:?}, commitment: {:?}, height: {}", + result.segment_id, + result.namespace, + result.commitment, + result.height, + ); } + Ok(()) } } diff --git a/crates/rooch-da/src/server/celestia/backend.rs b/crates/rooch-da/src/server/celestia/backend.rs index 82d4c99843..ae1df33d3d 100644 --- a/crates/rooch-da/src/server/celestia/backend.rs +++ b/crates/rooch-da/src/server/celestia/backend.rs @@ -32,7 +32,6 @@ impl Backend { } } - // TODO return segment id, height, commitment pub async fn submit(&self, segment: Box) -> Result { let data = segment.to_bytes(); let blob = Blob::new(self.namespace, data).unwrap(); @@ -53,9 +52,8 @@ impl Backend { }), Err(e) => { log::warn!( - "failed to submit segment to celestia node, chunk: {}, segment: {}, commitment: {:?}, error:{:?}", - segment_id.chunk_id, - segment_id.segment_number, + "failed to submit segment to celestia node, segment_id: {:?}, commitment: {:?}, error:{:?}", + segment_id, blob.commitment, e, ); diff --git a/crates/rooch-da/src/server/openda/actor/server.rs b/crates/rooch-da/src/server/openda/actor/server.rs index 1a8a0ca6f9..4db1bcd310 100644 --- a/crates/rooch-da/src/server/openda/actor/server.rs +++ b/crates/rooch-da/src/server/openda/actor/server.rs @@ -12,26 +12,21 @@ use rooch_config::config::retrieve_map_config_value; use std::collections::HashMap; use std::path::Path; -use crate::chunk::DABatchV0; +use crate::chunk::{Chunk, ChunkV0}; use rooch_config::da_config::{DAServerOpenDAConfig, OpenDAScheme}; use crate::messages::PutBatchInternalDAMessage; -use crate::segment::{Segment, SegmentID, SegmentV0}; pub struct DAServerOpenDAActor { max_segment_size: usize, operator: Operator, } -// TODO get request and response -// 1. get by block number -// 2. get by batch hash -// 3. pull by stream -// +pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024; +pub const DEFAULT_MAX_RETRY_TIMES: usize = 4; impl Actor for DAServerOpenDAActor {} -// TODO add FEC get for SDC protection (wrong response attacks) impl DAServerOpenDAActor { pub async fn new(cfg: &DAServerOpenDAConfig) -> Result { let mut config = cfg.clone(); @@ -114,55 +109,36 @@ impl DAServerOpenDAActor { }; Ok(Self { - max_segment_size: cfg.max_segment_size.unwrap_or(4 * 1024 * 1024) as usize, + max_segment_size: cfg.max_segment_size.unwrap_or(DEFAULT_MAX_SEGMENT_SIZE) as usize, operator: op, }) } - pub async fn pub_batch(&self, batch: PutBatchInternalDAMessage) -> Result<()> { - // TODO using chunk builder to make segments: - // 1. persist batch into buffer then return ok - // 2. collect batch for better compression ratio - // 3. split chunk into segments - // 4. submit segments to celestia node - // 5. record segment id in order - // 6. clean up batch buffer - - // TODO more chunk version supports - let chunk = DABatchV0 { - version: 0, - block_number: batch.batch.block_number, - batch_hash: batch.batch.batch_hash, - data: batch.batch.data, - }; - let chunk_bytes = bcs::to_bytes(&chunk).unwrap(); - let segs = chunk_bytes.chunks(self.max_segment_size); - let total = segs.len(); - - // TODO explain why block number is a good idea: easy to get next block number for segments, then we could request chunk by block number - let chunk_id = batch.batch.block_number; - let segments = segs - .enumerate() - .map(|(i, data)| { - SegmentV0 { - id: SegmentID { - chunk_id, - segment_number: i as u64, - }, - is_last: i == total - 1, // extra info overhead is much smaller than max_block_size - max_segment_size - data_checksum: 0, - checksum: 0, - data: data.to_vec(), - } - }) - .collect::>(); - + pub async fn pub_batch(&self, batch_msg: PutBatchInternalDAMessage) -> Result<()> { + let chunk: ChunkV0 = batch_msg.batch.into(); + let segments = chunk.to_segments(self.max_segment_size); for segment in segments { let bytes = segment.to_bytes(); - - // TODO record ok segment in order - // TODO segment indexer trait (local file, db, etc) - self.operator.write(&segment.id.to_string(), bytes).await?; // TODO retry logic + match self + .operator + .write(&segment.get_id().to_string(), bytes) + .await + { + Ok(_) => { + log::info!( + "submitted segment to open-da node, segment: {:?}", + segment.get_id(), + ); + } + Err(e) => { + log::warn!( + "failed to submit segment to open-da node, segment_id: {:?}, error:{:?}", + segment.get_id(), + e, + ); + return Err(e.into()); + } + } } Ok(()) @@ -191,7 +167,7 @@ async fn new_retry_operator( max_retry_times: Option, ) -> Result { let mut op = Operator::via_map(scheme, config)?; - let max_times = max_retry_times.unwrap_or(4); + let max_times = max_retry_times.unwrap_or(DEFAULT_MAX_RETRY_TIMES); op = op .layer(RetryLayer::new().with_max_times(max_times)) .layer(LoggingLayer::default()); diff --git a/crates/rooch-proposer/src/actor/proposer.rs b/crates/rooch-proposer/src/actor/proposer.rs index 7128308d2c..7dce391c32 100644 --- a/crates/rooch-proposer/src/actor/proposer.rs +++ b/crates/rooch-proposer/src/actor/proposer.rs @@ -76,7 +76,7 @@ impl Handler for ProposerActor { log::debug!("[ProposeBlock] no transaction to propose block"); } }; - //TODO submit to the on-chain SCC contract use the proposer key + // TODO submit to the on-chain SCC contract use the proposer key let _proposer_key = &self.proposer_key; let batch_size = block.map(|v| v.batch_size).unwrap_or(0u64); self.metrics diff --git a/crates/rooch-proposer/src/scc/mod.rs b/crates/rooch-proposer/src/scc/mod.rs index 4118e1f2b5..02d23796c1 100644 --- a/crates/rooch-proposer/src/scc/mod.rs +++ b/crates/rooch-proposer/src/scc/mod.rs @@ -80,12 +80,14 @@ impl StateCommitmentChain { // submit batch to DA server // TODO move batch submit out of proposer let batch_data: Vec = self.buffer.iter().flat_map(|tx| tx.tx.encode()).collect(); - // regard batch(tx list) as a blob: easy to check integrity - let batch_hash = h256::sha3_256_of(&batch_data); + let batch_hash = h256::sha2_256_of(&batch_data); if let Err(e) = self .da .submit_batch(Batch { block_number, + tx_count: batch_size, + prev_tx_accumulator_root, + tx_accumulator_root, batch_hash, data: batch_data, }) diff --git a/crates/rooch-types/src/block.rs b/crates/rooch-types/src/block.rs index e0a0d7f17c..30cc44e70b 100644 --- a/crates/rooch-types/src/block.rs +++ b/crates/rooch-types/src/block.rs @@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct Block { /// The index if the block - //TODO should we use the U256? pub block_number: u128, /// How many transactions in the block pub batch_size: u64, diff --git a/moveos/smt/src/jellyfish_merkle/hash.rs b/moveos/smt/src/jellyfish_merkle/hash.rs index c980d30da0..005ee77a6b 100644 --- a/moveos/smt/src/jellyfish_merkle/hash.rs +++ b/moveos/smt/src/jellyfish_merkle/hash.rs @@ -24,11 +24,11 @@ sha256t_hash_newtype! { #[cfg(any(test, feature = "fuzzing"))] impl Arbitrary for SMTNodeHash { type Parameters = (); - type Strategy = BoxedStrategy; - fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { any::<[u8; 32]>().prop_map(SMTNodeHash::new).boxed() } + + type Strategy = BoxedStrategy; } pub(crate) fn merkle_hash(left: SMTNodeHash, right: SMTNodeHash) -> SMTNodeHash { diff --git a/scripts/deploy_rooch_mainnet.sh b/scripts/deploy_rooch_mainnet.sh new file mode 100644 index 0000000000..f65ae25c80 --- /dev/null +++ b/scripts/deploy_rooch_mainnet.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Copyright (c) RoochNetwork +# SPDX-License-Identifier: Apache-2.0 + +REF="$1" +BTC_MAIN_RPC_URL="$2" +BTC_MAIN_RPC_PWD="$3" +OPENDA_GCP_MAINNET_BUCKET="$4" +OPENDA_GCP_MAINNET_CREDENTIAL="$5" + +sleep 30 +docker image prune -a -f +docker ps | grep rooch | grep -v faucet | awk '{print $1}' | xargs -r docker stop +docker ps -a | grep rooch | grep -v faucet | awk '{print $1}' | xargs -r docker rm -f +docker pull "ghcr.io/rooch-network/rooch:$REF" +docker run -d --name rooch --restart unless-stopped -v /data:/root -p 6767:6767 -p 9184:9184 -e RUST_BACKTRACE=full "ghcr.io/rooch-network/rooch:$REF" \ + server start -n main \ + --btc-rpc-url "$BTC_MAIN_RPC_URL" \ + --btc-rpc-username rooch-main \ + --btc-rpc-password "$BTC_MAIN_RPC_PWD" \ + --da "{\"internal-da-server\": {\"servers\": [{\"open-da\": {\"scheme\": \"gcs\", \"config\": {\"bucket\": \"$OPENDA_GCP_MAINNET_BUCKET\", \"credential\": \"$OPENDA_GCP_MAINNET_CREDENTIAL\"}}}]}}" diff --git a/scripts/deploy_rooch.sh b/scripts/deploy_rooch_testnet.sh similarity index 100% rename from scripts/deploy_rooch.sh rename to scripts/deploy_rooch_testnet.sh