Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/l1 msg queue #1055

Draft
wants to merge 5 commits into
base: feat/l1-state-tracker
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,22 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) *
queueIndex := binary.BigEndian.Uint64(data)
return &queueIndex
}

// WriteL1MsgStorageState writes the L1MsgStorage state
func WriteL1MsgStorageState(db ethdb.KeyValueWriter, state []byte) {
if err := db.Put(l1MsgStorageStateKey, state); err != nil {
log.Crit("Failed to update L1MsgStorage state", "err", err)
}
}

// ReadL1MsgStorageState retrieves the L1MsgStorage state
func ReadL1MsgStorageState(db ethdb.Reader) []byte {
data, err := db.Get(l1MsgStorageStateKey)
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to read highest synced L1 message queue index from database", "err", err)
}
return data
}
2 changes: 2 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ var (
firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex")

l1MsgStorageStateKey = []byte("L1MsgStorageState")

// Scroll rollup event store
rollupEventSyncedL1BlockNumberKey = []byte("R-LastRollupEventSyncedL1BlockNumber")
batchMetaPrefix = []byte("R-bm")
Expand Down
10 changes: 6 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,12 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client l1.Client) (*Ether
eth.syncingPipeline.Start()
}

// initialize and start L1 message sync service
eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb, l1Client)
if err != nil {
return nil, fmt.Errorf("cannot initialize L1 sync service: %w", err)
if !config.EnableDASyncing {
// initialize and start L1 message sync service
eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb, l1Client)
if err != nil {
return nil, fmt.Errorf("cannot initialize L1 sync service: %w", err)
}
}
eth.syncService.Start()
if config.EnableRollupVerify {
Expand Down
38 changes: 15 additions & 23 deletions rollup/da_syncer/da/calldata_blob_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/scroll-tech/da-codec/encoding"

"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/l1"
Expand All @@ -32,34 +30,28 @@ var (
)

type CalldataBlobSource struct {
ctx context.Context
l1Reader *l1.Reader
blobClient blob_client.BlobClient
l1height uint64
scrollChainABI *abi.ABI
l1CommitBatchEventSignature common.Hash
l1RevertBatchEventSignature common.Hash
l1FinalizeBatchEventSignature common.Hash
db ethdb.Database
ctx context.Context
l1Reader *l1.Reader
blobClient blob_client.BlobClient
l1height uint64
scrollChainABI *abi.ABI
msgStorage *l1.MsgStorage

l1Finalized uint64
}

func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Reader, blobClient blob_client.BlobClient, db ethdb.Database) (*CalldataBlobSource, error) {
func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Reader, blobClient blob_client.BlobClient, msgStorage *l1.MsgStorage) (*CalldataBlobSource, error) {
scrollChainABI, err := l1.ScrollChainMetaData.GetAbi()
if err != nil {
return nil, fmt.Errorf("failed to get scroll chain abi: %w", err)
}
return &CalldataBlobSource{
ctx: ctx,
l1Reader: l1Reader,
blobClient: blobClient,
l1height: l1height,
scrollChainABI: scrollChainABI,
l1CommitBatchEventSignature: scrollChainABI.Events[commitBatchEventName].ID,
l1RevertBatchEventSignature: scrollChainABI.Events[revertBatchEventName].ID,
l1FinalizeBatchEventSignature: scrollChainABI.Events[finalizeBatchEventName].ID,
db: db,
ctx: ctx,
l1Reader: l1Reader,
blobClient: blobClient,
l1height: l1height,
scrollChainABI: scrollChainABI,
msgStorage: msgStorage,
}, nil
}

Expand Down Expand Up @@ -148,9 +140,9 @@ func (ds *CalldataBlobSource) getCommitBatchDA(commitEvent *l1.CommitBatchEvent)

switch codec.Version() {
case 0:
return NewCommitBatchDAV0(ds.db, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
return NewCommitBatchDAV0(ds.msgStorage, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
case 1, 2, 3, 4:
return NewCommitBatchDAWithBlob(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
return NewCommitBatchDAWithBlob(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
default:
return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version)
}
Expand Down
14 changes: 6 additions & 8 deletions rollup/da_syncer/da/commitV0.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (

"github.com/scroll-tech/da-codec/encoding"

"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/l1"
)
Expand All @@ -24,7 +22,7 @@ type CommitBatchDAV0 struct {
l1BlockNumber uint64
}

func NewCommitBatchDAV0(db ethdb.Database,
func NewCommitBatchDAV0(msgStorage *l1.MsgStorage,
codec encoding.Codec,
commitEvent *l1.CommitBatchEvent,
parentBatchHeader []byte,
Expand All @@ -36,10 +34,10 @@ func NewCommitBatchDAV0(db ethdb.Database,
return nil, fmt.Errorf("failed to unpack chunks: %d, err: %w", commitEvent.BatchIndex().Uint64(), err)
}

return NewCommitBatchDAV0WithChunks(db, uint8(codec.Version()), commitEvent.BatchIndex().Uint64(), parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber())
return NewCommitBatchDAV0WithChunks(msgStorage, uint8(codec.Version()), commitEvent.BatchIndex().Uint64(), parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber())
}

func NewCommitBatchDAV0WithChunks(db ethdb.Database,
func NewCommitBatchDAV0WithChunks(msgStorage *l1.MsgStorage,
version uint8,
batchIndex uint64,
parentBatchHeader []byte,
Expand All @@ -48,7 +46,7 @@ func NewCommitBatchDAV0WithChunks(db ethdb.Database,
l1BlockNumber uint64,
) (*CommitBatchDAV0, error) {
parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(parentBatchHeader)
l1Txs, err := getL1Messages(db, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks))
l1Txs, err := getL1Messages(msgStorage, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks))
if err != nil {
return nil, fmt.Errorf("failed to get L1 messages for v0 batch %d: %w", batchIndex, err)
}
Expand Down Expand Up @@ -138,7 +136,7 @@ func getTotalMessagesPoppedFromChunks(decodedChunks []*encoding.DAChunkRawTx) in
return totalL1MessagePopped
}

func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) {
func getL1Messages(msgStorage *l1.MsgStorage, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) {
var txs []*types.L1MessageTx

decodedSkippedBitmap, err := encoding.DecodeBitmap(skippedBitmap, totalL1MessagePopped)
Expand All @@ -153,7 +151,7 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped
currentIndex++
continue
}
l1Tx := rawdb.ReadL1Message(db, currentIndex)
l1Tx := msgStorage.ReadL1Message(currentIndex)
if l1Tx == nil {
// message not yet available
// we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
Expand Down
5 changes: 2 additions & 3 deletions rollup/da_syncer/da/commitV1.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
"github.com/scroll-tech/go-ethereum/ethdb"
)

type CommitBatchDAV1 struct {
*CommitBatchDAV0
}

func NewCommitBatchDAWithBlob(ctx context.Context, db ethdb.Database,
func NewCommitBatchDAWithBlob(ctx context.Context, msgStorage *l1.MsgStorage,
l1Reader *l1.Reader,
blobClient blob_client.BlobClient,
codec encoding.Codec,
Expand Down Expand Up @@ -70,7 +69,7 @@ func NewCommitBatchDAWithBlob(ctx context.Context, db ethdb.Database,
return nil, fmt.Errorf("decodedChunks is nil after decoding")
}

v0, err := NewCommitBatchDAV0WithChunks(db, uint8(codec.Version()), commitEvent.BatchIndex().Uint64(), parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber())
v0, err := NewCommitBatchDAV0WithChunks(msgStorage, uint8(codec.Version()), commitEvent.BatchIndex().Uint64(), parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber())
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions rollup/da_syncer/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"

"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
Expand All @@ -22,22 +21,22 @@ type DataSourceFactory struct {
genesisConfig *params.ChainConfig
l1Reader *l1.Reader
blobClient blob_client.BlobClient
db ethdb.Database
msgStorage *l1.MsgStorage
}

func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Reader *l1.Reader, blobClient blob_client.BlobClient, db ethdb.Database) *DataSourceFactory {
func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Reader *l1.Reader, msgStorage *l1.MsgStorage, blobClient blob_client.BlobClient) *DataSourceFactory {
return &DataSourceFactory{
config: config,
genesisConfig: genesisConfig,
l1Reader: l1Reader,
blobClient: blobClient,
db: db,
msgStorage: msgStorage,
}
}

func (ds *DataSourceFactory) OpenDataSource(ctx context.Context, l1height uint64) (DataSource, error) {
if ds.config.FetcherMode == L1RPC {
return da.NewCalldataBlobSource(ctx, l1height, ds.l1Reader, ds.blobClient, ds.db)
return da.NewCalldataBlobSource(ctx, l1height, ds.l1Reader, ds.blobClient, ds.msgStorage)
} else {
return nil, errors.New("snapshot_data_source: not implemented")
}
Expand Down
43 changes: 42 additions & 1 deletion rollup/da_syncer/syncing_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/scroll-tech/go-ethereum/rollup/l1"
)

const (
defaultPruneInterval = 30 * time.Second
)

// Config is the configuration parameters of data availability syncing.
type Config struct {
FetcherMode FetcherMode // mode of fetcher
Expand All @@ -36,6 +40,8 @@ type SyncingPipeline struct {
expBackoff *backoff.Exponential

l1DeploymentBlock uint64
l1Tracker *l1.Tracker
msgStorage *l1.MsgStorage

db ethdb.Database
blockchain *core.BlockChain
Expand All @@ -51,6 +57,16 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
if err != nil {
return nil, fmt.Errorf("failed to initialize l1.Reader, err = %w", err)
}
l1DeploymentBlockHeader, err := l1Reader.FetchBlockHeaderByNumber(l1DeploymentBlock)
if err != nil {
return nil, fmt.Errorf("reader failed to fetch block header by number, err = %w", err)
}
l1Tracker := l1.NewTracker(ctx, ethClient, blockchain.Genesis().Hash())

msgStorage, err := l1.NewMsgStorage(ctx, l1Tracker, l1Reader, db, l1.LatestChainHead, l1DeploymentBlockHeader)
if err != nil {
return nil, fmt.Errorf("failed to initialize msg storage, err = %w", err)
}

blobClientList := blob_client.NewBlobClients()
if config.BeaconNodeAPIEndpoint != "" {
Expand All @@ -71,7 +87,7 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
return nil, errors.New("DA syncing is enabled but no blob client is configured. Please provide at least one blob client via command line flag")
}

dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Reader, blobClientList, db)
dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Reader, msgStorage, blobClientList)
syncedL1Height := l1DeploymentBlock - 1
from := rawdb.ReadDASyncedL1BlockNumber(db)
if from != nil {
Expand All @@ -90,6 +106,8 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond),
wg: sync.WaitGroup{},
l1DeploymentBlock: l1DeploymentBlock,
l1Tracker: l1Tracker,
msgStorage: msgStorage,
db: db,
blockchain: blockchain,
blockQueue: blockQueue,
Expand All @@ -109,6 +127,9 @@ func (s *SyncingPipeline) Step() error {
func (s *SyncingPipeline) Start() {
log.Info("sync from DA: starting pipeline")

s.msgStorage.Start()
s.l1Tracker.Start()

s.wg.Add(1)
go func() {
s.mainLoop()
Expand All @@ -122,6 +143,9 @@ func (s *SyncingPipeline) mainLoop() {
var resetCounter int
var tempErrorCounter int

l1msgPruneTicker := time.NewTicker(defaultPruneInterval)
defer l1msgPruneTicker.Stop()

// reqStep is a helper function to request a step to be executed.
// If delay is true, it will request a delayed step with exponential backoff, otherwise it will request an immediate step.
reqStep := func(delay bool) {
Expand Down Expand Up @@ -150,6 +174,21 @@ func (s *SyncingPipeline) mainLoop() {
return
default:
}
select {
case <-s.ctx.Done():
return
case <-l1msgPruneTicker.C:
nextQueueIndex := rawdb.ReadFirstQueueIndexNotInL2Block(s.db, s.blockchain.CurrentBlock().Hash())
if nextQueueIndex == nil {
// should not happen
log.Warn("FirstQueueIndexNotInL2Block is nil for block", "hash", s.blockchain.CurrentBlock().Hash())
continue
}
if *nextQueueIndex > 0 {
s.msgStorage.PruneMessages(*nextQueueIndex - 1)
}
default:
}

select {
case <-s.ctx.Done():
Expand Down Expand Up @@ -213,6 +252,8 @@ func (s *SyncingPipeline) mainLoop() {
func (s *SyncingPipeline) Stop() {
log.Info("sync from DA: stopping pipeline...")
s.cancel()
s.msgStorage.Stop()
s.l1Tracker.Stop()
s.wg.Wait()
log.Info("sync from DA: stopping pipeline... done")
}
Expand Down
1 change: 0 additions & 1 deletion rollup/l1/abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func (c *CommitBatchEvent) BlockHash() common.Hash {
func (c *CommitBatchEvent) BlockNumber() uint64 {
return c.blockNumber
}

func (c *CommitBatchEvent) CompareTo(other *CommitBatchEvent) int {
return c.batchIndex.Cmp(other.batchIndex)
}
Expand Down
Loading
Loading