refactor(DA): refine DA stream define and implementation #2652

Merged · 7 commits · Sep 19, 2024

Changes from all commits
16 changes: 11 additions & 5 deletions .github/workflows/deploy_mainnet.yml
@@ -3,17 +3,17 @@ on:
  workflow_dispatch:
    inputs:
      tagName:
-       description: 'Tag Name to Checkout'
        description: 'Tag or branch to deploy'
        default: 'main'
        required: true
-       default: 'latest'
  # workflow_run:
  #   workflows: ["Build Docker And Deploy Seed"]
  #   types:
  #     - completed

jobs:
  deploy-rooch-mainnet:
-   name: deploy rooch mainnet
    name: Deploy Rooch Mainnet
    runs-on: self-hosted
    steps:
      - name: Deploy to GCP MAINNET VM
@@ -26,5 +26,11 @@ jobs:
          chmod 600 private_key.pem
          sudo apt update
          sudo apt install -y --no-install-recommends openssh-server
-         ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && && docker image prune -a -f && docker ps | grep rooch | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep rooch | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' && docker run -d -v /data:/root -p 6767:6767 'ghcr.io/rooch-network/rooch:${{ github.event.inputs.tagName }}' server start -n main --btc-rpc-url '${{secrets.BTC_MAIN_RPC_URL}}' --btc-rpc-username rooch-main --btc-rpc-password '${{secrets.BTC_MAIN_RPC_PWD}}' --btc-start-block-height 0 --btc-end-block-height 767420 --data-import-mode 1"

          BRANCH=$(basename ${{ github.ref }})
          ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \
            "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_mainnet.sh \
            '${{ env.REF }}' \
            '${{ secrets.BTC_MAIN_RPC_URL }}' \
            '${{ secrets.BTC_MAIN_RPC_PWD }}' \
            '${{ secrets.OPENDA_GCP_MAINNET_BUCKET }}' \
            '${{ secrets.OPENDA_GCP_MAINNET_CREDENTIAL }}'"
2 changes: 1 addition & 1 deletion .github/workflows/deploy_testnet.yml
@@ -68,7 +68,7 @@ jobs:
          sudo apt install -y --no-install-recommends openssh-server
          BRANCH=$(basename ${{ github.ref }})
          ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST \
-           "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch.sh \
            "cd /root/rooch && git fetch origin && git checkout -B $BRANCH origin/$BRANCH || git checkout $BRANCH && bash scripts/deploy_rooch_testnet.sh \
            '${{ env.REF }}' \
            '${{ secrets.BTC_TEST_RPC_URL }}' \
            '${{ secrets.BTC_TEST_RPC_PWD }}' \
20 changes: 15 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -332,6 +332,7 @@ uuid = { version = "1.10.0", features = ["v4", "fast-rng"] }
protobuf = { version = "2.28", features = ["with-bytes"] }
redb = { version = "2.1.1" }
rocksdb = { git = "https://github.com/rooch-network/rust-rocksdb.git", rev = "41d102327ba3cf9a2335d1192e8312c92bc3d6f9", features = ["lz4"] }
lz4 = { version = "1.27.0" }

ripemd = { version = "0.1.3" }
fastcrypto-zkp = { version = "0.1.3" }
1 change: 1 addition & 0 deletions crates/rooch-da/Cargo.toml
@@ -30,3 +30,4 @@ opendal = { workspace = true }
rooch-config = { workspace = true }
serde_yaml = { workspace = true }
xxhash-rust = { workspace = true, features = ["xxh3"] }
lz4 = { workspace = true }
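
The lz4 dependency added here (and in the workspace Cargo.toml above) is what the reworked chunk.rs below uses to compress chunk bodies. For reference, a minimal compression round-trip using the same lz4 crate APIs the PR calls (EncoderBuilder::build, Encoder::finish, Decoder::new); the helper function itself is an illustrative sketch, not code from this diff:

use std::io::{self, Read, Write};

fn lz4_round_trip(input: &[u8]) -> io::Result<Vec<u8>> {
    // Compress: stream `input` through an lz4 frame encoder.
    let mut compressed = Vec::new();
    {
        let mut encoder = lz4::EncoderBuilder::new().build(&mut compressed)?;
        encoder.write_all(input)?;
        let (_writer, result) = encoder.finish();
        result?;
    }

    // Decompress: read the frame back out.
    let mut decoder = lz4::Decoder::new(&compressed[..])?;
    let mut output = Vec::new();
    decoder.read_to_end(&mut output)?;
    Ok(output)
}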
32 changes: 32 additions & 0 deletions crates/rooch-da/docs/stream.md
@@ -0,0 +1,32 @@
DA Stream
====

A DA Stream is a continuous flow of data from the sequencer to verifiers. It is a sequence of DA Batches.

## Batch

A batch is a collection of transactions; it is the unit of data flow in a DA Stream.

Each batch maps to an L2 block.

## Chunk

A chunk is a collection of DA Batches, grouped for a better compression ratio.

Components:

- Chunk Header: Version, Chunk Number, Chunk Checksum
- Chunk Body: one or more DA Batches, after compression

The chunk version determines:

1. the chunk format: serialization/deserialization and the compression algorithm of the chunk body
2. the segment format

### Segment

A segment consists of chunk bytes split at a fixed size.

The segment is the unit submitted to the DA backend; it is sized to comply with the backend's block-size restrictions.

A segment's version is inherited from its chunk's version.
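
To make the chunk-to-segment mapping concrete, a small self-contained sketch (illustrative only, not part of this PR; cf. ChunkV0::to_segments in chunk.rs below):

// Split chunk bytes into segments of at most `max_segment_size` bytes;
// only the final segment is marked is_last.
fn split_into_segments(chunk_bytes: &[u8], max_segment_size: usize) -> Vec<(u64, bool, Vec<u8>)> {
    let count = chunk_bytes.len().div_ceil(max_segment_size);
    chunk_bytes
        .chunks(max_segment_size)
        .enumerate()
        .map(|(i, data)| (i as u64, i + 1 == count, data.to_vec()))
        .collect()
}

fn main() {
    // A 100-byte chunk with a 39-byte segment limit yields 39 + 39 + 22 bytes.
    let segments = split_into_segments(&[0u8; 100], 39);
    assert_eq!(segments.len(), 3);
    assert!(segments.last().unwrap().1); // is_last set on the final segment
}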
21 changes: 21 additions & 0 deletions crates/rooch-da/docs/todo.md
@@ -0,0 +1,21 @@
TODO
===

## Chunk Builder

The Chunk Builder is a component that builds chunks from batches while avoiding bursts of I/O against the DA backend:

1. Persist each batch into a buffer (a local persistence layer or other faster medium) first, then return OK (if high
   performance is preferred).
2. Split each chunk into segments and submit the segments to the DA backend asynchronously.
3. Clean up the batch buffer after its segments have been submitted to the DA backend.
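
A hypothetical skeleton of this flow (every trait and type name here is illustrative; none of them exist in this PR):

use anyhow::Result;

// Step 1 target: a fast local buffer (hypothetical trait).
pub trait BatchBuffer {
    fn put(&mut self, block_number: u128, batch_bytes: Vec<u8>) -> Result<()>;
    fn remove(&mut self, block_number: u128) -> Result<()>;
}

// Step 2 target: the DA backend (hypothetical trait).
pub trait DABackend {
    fn submit_segment(&self, segment_bytes: Vec<u8>) -> Result<()>;
}

pub struct ChunkBuilder<B: BatchBuffer, D: DABackend> {
    buffer: B,
    backend: D,
    max_segment_size: usize,
}

impl<B: BatchBuffer, D: DABackend> ChunkBuilder<B, D> {
    // Step 1: persist the batch locally and return OK immediately.
    pub fn accept_batch(&mut self, block_number: u128, batch_bytes: Vec<u8>) -> Result<()> {
        self.buffer.put(block_number, batch_bytes)
    }

    // Steps 2 and 3: later (e.g., on a timer), split the chunk into segments,
    // submit them to the DA backend, then drop the buffered batch.
    pub fn flush_chunk(&mut self, block_number: u128, chunk_bytes: Vec<u8>) -> Result<()> {
        for segment in chunk_bytes.chunks(self.max_segment_size) {
            self.backend.submit_segment(segment.to_vec())?;
        }
        self.buffer.remove(block_number)
    }
}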

## DA Proxy

### Get API

1. get by block number
2. get by batch hash
3. pull by stream

Add FEC (forward error correction) for SDC (silent data corruption) protection, e.g., against wrong-response attacks.
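
One hypothetical shape for that read side (names illustrative only; nothing here exists in the codebase yet):

use anyhow::Result;
use moveos_types::h256::H256;

pub trait DAProxyReader {
    // 1. get by block number
    fn get_batch_by_block(&self, block_number: u128) -> Result<Option<Vec<u8>>>;
    // 2. get by batch hash
    fn get_batch_by_hash(&self, batch_hash: H256) -> Result<Option<Vec<u8>>>;
    // 3. pull by stream: iterate batch payloads starting from a block number
    fn pull(&self, from_block: u128) -> Result<Box<dyn Iterator<Item = Vec<u8>> + '_>>;
}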
1 change: 0 additions & 1 deletion crates/rooch-da/src/actor/da.rs
@@ -36,7 +36,6 @@ impl Actor for DAActor {}
impl DAActor {
    pub async fn new(da_config: DAConfig, actor_system: &ActorSystem) -> Result<Self> {
        // internal servers
-
        let mut servers: Vec<Arc<dyn DAServerProxy + Send + Sync>> = Vec::new();
        let mut submit_threshold = 1;
        let mut success_count = 0;
211 changes: 198 additions & 13 deletions crates/rooch-da/src/chunk.rs
@@ -1,17 +1,202 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

-use moveos_types::h256::H256;
-use serde::Serialize;
-
-// DABatchV0 is the first version of the batch inside a chunk, each batch is a chunk
-#[derive(Serialize, Debug, PartialEq, Clone)]
-pub struct DABatchV0 {
-    pub version: u8,
-    // each batch maps to a L2 block
-    pub block_number: u128,
-    // sha3_256 hash of the batch data
-    pub batch_hash: H256,
-    // encoded tx list
-    pub data: Vec<u8>,
-}
use crate::messages::Batch;
use crate::segment::{Segment, SegmentID, SegmentV0};
use lz4::EncoderBuilder;
use serde::{Deserialize, Serialize};
use std::io;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub enum ChunkVersion {
    V0,
    Unknown(u8),
}

impl From<u8> for ChunkVersion {
    fn from(num: u8) -> Self {
        match num {
            0 => ChunkVersion::V0,
            // ...
            _ => Self::Unknown(num),
        }
    }
}

impl From<ChunkVersion> for u8 {
    fn from(version: ChunkVersion) -> Self {
        match version {
            ChunkVersion::V0 => 0,
            ChunkVersion::Unknown(num) => num,
        }
    }
}

pub trait Chunk {
    fn to_bytes(&self) -> Vec<u8>;
    fn get_version(&self) -> ChunkVersion;
    fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>>;
    fn get_batch(&self) -> Batch;
}

// ChunkV0:
// 1. each chunk maps to a batch
// 2. batch_data compressed by lz4
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct ChunkV0 {
    pub version: ChunkVersion,
    pub batch: Batch,
}

impl From<Batch> for ChunkV0 {
    fn from(batch: Batch) -> Self {
        Self {
            version: ChunkVersion::V0,
            batch: Batch {
                block_number: batch.block_number,
                tx_count: batch.tx_count,
                prev_tx_accumulator_root: batch.prev_tx_accumulator_root,
                tx_accumulator_root: batch.tx_accumulator_root,
                batch_hash: batch.batch_hash,
                data: batch.data,
            },
        }
    }
}

impl Chunk for ChunkV0 {
    fn to_bytes(&self) -> Vec<u8> {
        let mut compressed_bytes = Vec::new();

        {
            let mut encoder = EncoderBuilder::new().build(&mut compressed_bytes).unwrap();
            bcs::serialize_into(&mut encoder, self).unwrap();
            let (_output, result) = encoder.finish();
            result.unwrap();
        }

        compressed_bytes
    }

    fn get_version(&self) -> ChunkVersion {
        ChunkVersion::V0
    }

    fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>> {
        let bytes = self.to_bytes();
        let segments_data = bytes.chunks(max_segment_size);
        let segments_count = segments_data.len();

        let chunk_id = self.batch.block_number;
        segments_data
            .enumerate()
            .map(|(i, data)| {
                Box::new(SegmentV0 {
                    id: SegmentID {
                        chunk_id,
                        segment_number: i as u64,
                    },
                    is_last: i == segments_count - 1, // extra info overhead is much smaller than max_block_size - max_segment_size
                    // *_checksum will be filled in to_bytes method of Segment
                    data_checksum: 0,
                    checksum: 0,
                    data: data.to_vec(),
                }) as Box<dyn Segment>
            })
            .collect::<Vec<_>>()
    }

    fn get_batch(&self) -> Batch {
        self.batch.clone()
    }
}

pub fn chunk_from_segments(segments: Vec<Box<dyn Segment>>) -> anyhow::Result<Box<dyn Chunk>> {
    if segments.is_empty() {
        return Err(anyhow::anyhow!("empty segments"));
    }
    // check that all segments have the same version
    let versions = segments
        .iter()
        .map(|segment| segment.get_version())
        .collect::<Vec<_>>();
    let version = versions.first().unwrap();
    if versions.iter().any(|seg_version| *seg_version != *version) {
        return Err(anyhow::anyhow!("inconsistent segment versions"));
    }
    // check that the last segment has is_last == true and all others have is_last == false
    if let Some(last_segment) = segments.last() {
        if last_segment.is_last() {
            if segments
                .iter()
                .take(segments.len() - 1)
                .any(|segment| segment.is_last())
            {
                return Err(anyhow::anyhow!("inconsistent is_last"));
            }
        } else {
            return Err(anyhow::anyhow!("missing last segment"));
        }
    }
    // check that all segments share the same chunk_id, and that segment_number starts at 0 and increments by 1
    let chunk_id = segments.first().unwrap().get_id().chunk_id;
    if segments.iter().enumerate().any(|(i, segment)| {
        segment.get_id()
            != SegmentID {
                chunk_id,
                segment_number: i as u64,
            }
    }) {
        return Err(anyhow::anyhow!("inconsistent segment ids"));
    }

    match version {
        ChunkVersion::V0 => Ok(Box::new(ChunkV0::from_segments(segments)?)),
        // ...
        ChunkVersion::Unknown(_) => Err(anyhow::anyhow!("unsupported segment version")),
    }
}

impl ChunkV0 {
    pub fn from_segments(segments: Vec<Box<dyn Segment>>) -> anyhow::Result<Self> {
        let bytes = segments
            .iter()
            .flat_map(|segment| segment.get_data())
            .collect::<Vec<_>>();

        let decoder = lz4::Decoder::new(&bytes[..])?;
        let mut decompressed_reader = io::BufReader::new(decoder);
        let chunk: ChunkV0 = bcs::from_reader(&mut decompressed_reader)?;
        Ok(chunk)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use moveos_types::h256;

    #[test]
    fn test_chunk_v0() {
        let batch = Batch {
            block_number: 1,
            tx_count: 1,
            prev_tx_accumulator_root: Default::default(),
            tx_accumulator_root: Default::default(),
            batch_hash: h256::sha2_256_of(&[1, 2, 3, 4, 5]),
            data: vec![1, 2, 3, 4, 5],
        };

        let chunk = ChunkV0::from(batch.clone());
        let segments = chunk.to_segments(3);
        assert_eq!(segments.len(), 39);

        let chunk = chunk_from_segments(segments).unwrap();
        assert_eq!(chunk.get_batch(), batch);

        assert_eq!(
            chunk.get_batch().batch_hash,
            h256::sha2_256_of(&[1, 2, 3, 4, 5])
        );
    }
}
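
For reference, an illustrative round-trip through the APIs added above (the helper function is a sketch, not part of this diff):

// Build a chunk from a batch, split it for the DA backend, and reassemble it.
fn roundtrip_example(batch: Batch) -> anyhow::Result<Batch> {
    let chunk = ChunkV0::from(batch);
    // 1024 is an arbitrary example segment size; pick it to satisfy the
    // backend's block-size limit.
    let segments = chunk.to_segments(1024);
    // chunk_from_segments validates version consistency, the is_last flag,
    // and contiguous segment numbers starting at 0.
    let restored = chunk_from_segments(segments)?;
    Ok(restored.get_batch())
}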