Skip to content

Commit

Permalink
refactor(DA): refine DA stream define and implementation (#2652)
Browse files Browse the repository at this point in the history
1. clarify the relations among stream, batch, L2 block, chunk, and segment
2. implement necessary methods of chunk and segment
3. refine code logic of open-da/celestia submit
4. refine mainnet deploy workflow
  • Loading branch information
popcnt1 authored Sep 19, 2024
1 parent 652c31e commit 0c15e33
Show file tree
Hide file tree
Showing 21 changed files with 389 additions and 182 deletions.
16 changes: 11 additions & 5 deletions .github/workflows/deploy_mainnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ on:
workflow_dispatch:
inputs:
tagName:
description: 'Tag Name to Checkout'
description: 'Tag or branch to deploy'
default: 'main'
required: true
default: 'latest'
# workflow_run:
# workflows: ["Build Docker And Deploy Seed"]
# types:
# - completed

jobs:
deploy-rooch-mainnet:
name: deploy rooch mainnet
name: Deploy Rooch Mainnet
runs-on: self-hosted
steps:
- name: Deploy to GCP MAINNET VM
Expand All @@ -26,5 +26,11 @@ jobs:
chmod 600 private_key.pem
sudo apt update
sudo apt install -y --no-install-recommends openssh-server
ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && && docker image prune -a -f && docker ps | grep rooch | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep rooch | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' && docker run -d -v /data:/root -p 6767:6767 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' server start -n main --btc-rpc-url '${{secrets.BTC_MAIN_RPC_URL}}' --btc-rpc-username rooch-main --btc-rpc-password '${{secrets.BTC_MAIN_RPC_PWD}}' --btc-start-block-height 0 --btc-end-block-height 767420 --data-import-mode 1"
BRANCH=$(basename ${{ github.ref }})
ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \
"cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_mainnet.sh \
'${{ env.REF }}' \
'${{ secrets.BTC_MAIN_RPC_URL }}' \
'${{ secrets.BTC_MAIN_RPC_PWD }}' \
'${{ secrets.OPENDA_GCP_MAINNET_BUCKET }}' \
'${{ secrets.OPENDA_GCP_MAINNET_CREDENTIAL }}'"
2 changes: 1 addition & 1 deletion .github/workflows/deploy_testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
sudo apt install -y --no-install-recommends openssh-server
BRANCH=$(basename ${{ github.ref }})
ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \
"cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch.sh \
"cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_testnet.sh \
'${{ env.REF }}' \
'${{ secrets.BTC_TEST_RPC_URL }}' \
'${{ secrets.BTC_TEST_RPC_PWD }}' \
Expand Down
20 changes: 15 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ uuid = { version = "1.10.0", features = ["v4", "fast-rng"] }
protobuf = { version = "2.28", features = ["with-bytes"] }
redb = { version = "2.1.1" }
rocksdb = { git = "https://github.com/rooch-network/rust-rocksdb.git", rev = "41d102327ba3cf9a2335d1192e8312c92bc3d6f9", features = ["lz4"] }
lz4 = { version = "1.27.0" }

ripemd = { version = "0.1.3" }
fastcrypto-zkp = { version = "0.1.3" }
Expand Down
1 change: 1 addition & 0 deletions crates/rooch-da/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ opendal = { workspace = true }
rooch-config = { workspace = true }
serde_yaml = { workspace = true }
xxhash-rust = { workspace = true, features = ["xxh3"] }
lz4 = { workspace = true }
32 changes: 32 additions & 0 deletions crates/rooch-da/docs/stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
DA Stream
====

DA Stream is a continuous flow of data from sequencer to verifier. It is a sequence of DA Batches.

## Batch

A batch is a collection of transactions. It is the unit of data flow in DA Stream.

Each batch maps to an L2 block.

## Chunk

A chunk is a collection of DA Batches grouped for a better compression ratio.

Components:

- Chunk Header: Version, Chunk Number, Chunk Checksum
- Chunk Body: One or more DA batches after compression

Version of chunk determines:

1. chunk format: serialization/deserialization, compression algorithm of chunk body
2. segment format

### Segment

Segment consists of chunk bytes split by a certain size.

Segment is the unit submitted to DA backend, designed to comply with the block size restrictions of the DA backend.

Version of segment inherits from chunk version.
21 changes: 21 additions & 0 deletions crates/rooch-da/docs/todo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
TODO
===

## Chunk Builder

Chunk Builder is a component to build chunks from batches, avoiding burst I/O to DA backend.

1. Persist batch into buffer(local persistence layer or other faster media) first, then return ok(if high-performance is
preferred).
2. Split chunk into segments and submit segments to DA backend asynchronously.
3. Clean up the batch buffer after its segments have been submitted to the DA backend.

## DA Proxy

### Get API

1. get by block number
2. get by batch hash
3. pull by stream

Add FEC (forward error correction) for SDC (silent data corruption) protection against wrong-response attacks.
1 change: 0 additions & 1 deletion crates/rooch-da/src/actor/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ impl Actor for DAActor {}
impl DAActor {
pub async fn new(da_config: DAConfig, actor_system: &ActorSystem) -> Result<Self> {
// internal servers

let mut servers: Vec<Arc<dyn DAServerProxy + Send + Sync>> = Vec::new();
let mut submit_threshold = 1;
let mut success_count = 0;
Expand Down
211 changes: 198 additions & 13 deletions crates/rooch-da/src/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,202 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use moveos_types::h256::H256;
use serde::Serialize;

// DABatchV0 is the first version of the batch inside a chunk, each batch is a chunk
#[derive(Serialize, Debug, PartialEq, Clone)]
pub struct DABatchV0 {
pub version: u8,
// each batch maps to a L2 block
pub block_number: u128,
// sha3_256 hash of the batch data
pub batch_hash: H256,
// encoded tx list
pub data: Vec<u8>,
use crate::messages::Batch;
use crate::segment::{Segment, SegmentID, SegmentV0};
use lz4::EncoderBuilder;
use serde::{Deserialize, Serialize};
use std::io;

/// Version tag for a chunk. Per the DA stream design, the version determines
/// the chunk's serialization format, the compression algorithm of the chunk
/// body, and the segment format derived from it.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub enum ChunkVersion {
    /// The first (and currently only supported) chunk format.
    V0,
    /// A version byte this build does not recognize; carried through so
    /// decoding a newer stream does not fail outright.
    Unknown(u8),
}

impl From<u8> for ChunkVersion {
    /// Decodes a raw version byte; any unrecognized value becomes
    /// `Unknown`, so this conversion can never fail.
    fn from(raw: u8) -> Self {
        if raw == 0 {
            ChunkVersion::V0
        } else {
            ChunkVersion::Unknown(raw)
        }
    }
}

impl From<ChunkVersion> for u8 {
    /// Encodes a `ChunkVersion` back into its raw wire byte — the exact
    /// inverse of `ChunkVersion::from(u8)`.
    fn from(version: ChunkVersion) -> Self {
        match version {
            ChunkVersion::Unknown(raw) => raw,
            ChunkVersion::V0 => 0,
        }
    }
}

/// A chunk is the compression/framing unit of the DA stream: batch data is
/// serialized, compressed, and split into segments for submission to the
/// DA backend.
pub trait Chunk {
    /// Serializes and compresses the chunk into its wire-format bytes.
    fn to_bytes(&self) -> Vec<u8>;
    /// Returns the format version of this chunk.
    fn get_version(&self) -> ChunkVersion;
    /// Splits the chunk's bytes into segments of at most `max_segment_size`
    /// bytes each, ready for submission to the DA backend.
    fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>>;
    /// Returns the batch carried by this chunk.
    fn get_batch(&self) -> Batch;
}

// ChunkV0:
// 1. each chunk maps to a batch
// 2. batch_data compressed by lz4
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct ChunkV0 {
    // format version; always ChunkVersion::V0 for this struct
    pub version: ChunkVersion,
    // the single batch this chunk carries (one batch per chunk in V0)
    pub batch: Batch,
}

impl From<Batch> for ChunkV0 {
    /// Wraps a batch into a V0 chunk (V0 carries exactly one batch per chunk).
    fn from(batch: Batch) -> Self {
        // Move the batch in directly: reconstructing it field by field is
        // redundant and produces an identical value.
        Self {
            version: ChunkVersion::V0,
            batch,
        }
    }
}

impl Chunk for ChunkV0 {
    /// Serializes the chunk with BCS and compresses it with lz4.
    ///
    /// # Panics
    /// Panics if serialization or compression fails. Both write into an
    /// in-memory `Vec`, so a failure indicates a bug rather than a
    /// recoverable runtime condition; the `expect` messages state the
    /// invariant instead of a bare `unwrap`.
    fn to_bytes(&self) -> Vec<u8> {
        let mut compressed_bytes = Vec::new();

        {
            let mut encoder = EncoderBuilder::new()
                .build(&mut compressed_bytes)
                .expect("building an lz4 encoder over an in-memory Vec cannot fail");
            bcs::serialize_into(&mut encoder, self)
                .expect("BCS serialization of ChunkV0 into the lz4 encoder cannot fail");
            let (_output, result) = encoder.finish();
            result.expect("finishing lz4 compression into an in-memory Vec cannot fail");
        }

        compressed_bytes
    }

    fn get_version(&self) -> ChunkVersion {
        ChunkVersion::V0
    }

    /// Splits the compressed chunk bytes into at-most-`max_segment_size`
    /// pieces; the final piece is flagged with `is_last` so the reassembly
    /// side can detect truncation.
    fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>> {
        let bytes = self.to_bytes();
        let segments_data = bytes.chunks(max_segment_size);
        let segments_count = segments_data.len();

        // In V0 one chunk carries one batch, so the block number doubles
        // as the chunk id.
        let chunk_id = self.batch.block_number;
        segments_data
            .enumerate()
            .map(|(i, data)| {
                Box::new(SegmentV0 {
                    id: SegmentID {
                        chunk_id,
                        segment_number: i as u64,
                    },
                    is_last: i == segments_count - 1, // extra info overhead is much smaller than max_block_size - max_segment_size
                    // *_checksum will be filled in to_bytes method of Segment
                    data_checksum: 0,
                    checksum: 0,
                    data: data.to_vec(),
                }) as Box<dyn Segment>
            })
            .collect::<Vec<_>>()
    }

    fn get_batch(&self) -> Batch {
        self.batch.clone()
    }
}

/// Validates an ordered list of segments and reassembles them into a chunk.
///
/// Checks performed, in order:
/// 1. the list is non-empty;
/// 2. every segment carries the same version;
/// 3. exactly the final segment has `is_last == true`;
/// 4. all segments share one `chunk_id` and `segment_number` runs 0,1,2,…
///
/// # Errors
/// Returns an error describing the first violated invariant, or
/// "unsupported segment version" for versions this build cannot decode.
pub fn chunk_from_segments(segments: Vec<Box<dyn Segment>>) -> anyhow::Result<Box<dyn Chunk>> {
    if segments.is_empty() {
        return Err(anyhow::anyhow!("empty segments"));
    }
    // All segments must agree on the version. Checked with `any` against the
    // first element; no intermediate Vec of versions is allocated.
    let version = segments.first().unwrap().get_version();
    if segments
        .iter()
        .any(|segment| segment.get_version() != version)
    {
        return Err(anyhow::anyhow!("inconsistent segment versions"));
    }
    // The final segment must be flagged is_last, and only the final one.
    // `last()` is Some here because emptiness was checked above.
    if !segments.last().unwrap().is_last() {
        return Err(anyhow::anyhow!("missing last segments"));
    }
    if segments[..segments.len() - 1]
        .iter()
        .any(|segment| segment.is_last())
    {
        return Err(anyhow::anyhow!("inconsistent is_last"));
    }
    // All segments share the first segment's chunk_id, and segment_number
    // must increase from 0 by exactly 1 per segment.
    let chunk_id = segments.first().unwrap().get_id().chunk_id;
    if segments.iter().enumerate().any(|(i, segment)| {
        segment.get_id()
            != SegmentID {
                chunk_id,
                segment_number: i as u64,
            }
    }) {
        return Err(anyhow::anyhow!("inconsistent segment ids"));
    }

    match version {
        ChunkVersion::V0 => Ok(Box::new(ChunkV0::from_segments(segments)?)),
        // ...
        ChunkVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")),
    }
}

impl ChunkV0 {
    /// Reassembles a V0 chunk from its ordered segments: concatenates the
    /// segment payloads, lz4-decompresses the stream, and BCS-decodes the
    /// chunk. Callers are expected to have validated segment ordering
    /// (see `chunk_from_segments`).
    pub fn from_segments(segments: Vec<Box<dyn Segment>>) -> anyhow::Result<Self> {
        // Stitch every segment payload back into one compressed byte stream.
        let mut compressed = Vec::new();
        for segment in &segments {
            compressed.extend(segment.get_data());
        }

        // Decompress and decode without materializing the decompressed
        // bytes up front.
        let decoder = lz4::Decoder::new(&compressed[..])?;
        let mut reader = io::BufReader::new(decoder);
        let chunk = bcs::from_reader::<ChunkV0>(&mut reader)?;
        Ok(chunk)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use moveos_types::h256;

    // Round-trip test: Batch -> ChunkV0 -> segments -> chunk, verifying the
    // batch survives segmentation and reassembly intact.
    #[test]
    fn test_chunk_v0() {
        let batch = Batch {
            block_number: 1,
            tx_count: 1,
            prev_tx_accumulator_root: Default::default(),
            tx_accumulator_root: Default::default(),
            batch_hash: h256::sha2_256_of(&[1, 2, 3, 4, 5]),
            data: vec![1, 2, 3, 4, 5],
        };

        let chunk = ChunkV0::from(batch.clone());
        // 3-byte segments over the compressed bytes. NOTE(review): the
        // expected count 39 is pinned by the current BCS + lz4 encoding of
        // this exact batch and will break if either format changes — confirm
        // that is intended.
        let segments = chunk.to_segments(3);
        assert_eq!(segments.len(), 39);

        let chunk = chunk_from_segments(segments).unwrap();
        assert_eq!(chunk.get_batch(), batch);

        // The batch hash is preserved verbatim through the round trip.
        assert_eq!(
            chunk.get_batch().batch_hash,
            h256::sha2_256_of(&[1, 2, 3, 4, 5])
        );
    }
}
Loading

0 comments on commit 0c15e33

Please sign in to comment.