Skip to content

Commit

Permalink
feat: introduce custom errors and mark RPC related errors as temporar…
Browse files Browse the repository at this point in the history
…y so that they can be retried in pipeline
  • Loading branch information
jonastheis committed Aug 13, 2024
1 parent f8eadfe commit 8f1f6b7
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 30 deletions.
7 changes: 4 additions & 3 deletions rollup/da_syncer/blob_client/blob_client_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package blob_client

import (
"context"
"errors"
"fmt"
"io"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
)

type BlobClientList struct {
Expand Down Expand Up @@ -37,8 +38,8 @@ func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHa
log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos)
}

// if we iterated over entire list, return EOF error that will be handled in syncing_pipeline with a backoff and retry
return nil, io.EOF
// if we iterated over entire list, return a temporary error that will be handled in syncing_pipeline with a backoff and retry
return nil, serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients"))
}

func (c *BlobClientList) nextPos() int {
Expand Down
5 changes: 1 addition & 4 deletions rollup/da_syncer/block_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ func (bq *BlockQueue) getBlocksFromBatch(ctx context.Context) error {
return fmt.Errorf("unexpected type of daEntry: %T", daEntry)
}

bq.blocks, err = entryWithBlocks.Blocks()
if err != nil {
return fmt.Errorf("failed to get blocks from daEntry: %w", err)
}
bq.blocks = entryWithBlocks.Blocks()

return nil
}
Expand Down
19 changes: 11 additions & 8 deletions rollup/da_syncer/da/calldata_blob_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
)

Expand Down Expand Up @@ -72,7 +73,7 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) {
if to > ds.l1Finalized {
ds.l1Finalized, err = ds.l1Client.GetLatestFinalizedBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to query GetLatestFinalizedBlockNumber, error: %v", err)
return nil, serrors.WrapWithTemporary(fmt.Errorf("failed to query GetLatestFinalizedBlockNumber, error: %v", err))
}
// make sure we don't request more than finalized blocks
to = min(to, ds.l1Finalized)
Expand All @@ -84,13 +85,15 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) {

logs, err := ds.l1Client.FetchRollupEventsInRange(ds.l1height, to)
if err != nil {
return nil, fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err)
return nil, serrors.WrapWithTemporary(fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err))
}
da, err := ds.processLogsToDA(logs)
if err == nil {
ds.l1height = to + 1
if err != nil {
return nil, serrors.WrapWithTemporary(fmt.Errorf("failed to process logs to DA, error: %v", err))
}
return da, err

ds.l1height = to + 1
return da, nil
}

func (ds *CalldataBlobSource) L1Height() uint64 {
Expand Down Expand Up @@ -119,7 +122,7 @@ func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error)

case ds.l1RevertBatchEventSignature:
event := &rollup_sync_service.L1RevertBatchEvent{}
if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil {
if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil {
return nil, fmt.Errorf("failed to unpack revert rollup event log, err: %w", err)
}

Expand All @@ -129,7 +132,7 @@ func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error)

case ds.l1FinalizeBatchEventSignature:
event := &rollup_sync_service.L1FinalizeBatchEvent{}
if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil {
if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil {
return nil, fmt.Errorf("failed to unpack finalized rollup event log, err: %w", err)
}

Expand Down Expand Up @@ -188,7 +191,7 @@ func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, vLog *types.Lo

txData, err := ds.l1Client.FetchTxData(vLog)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to fetch tx data, tx hash: %v, err: %w", vLog.TxHash.Hex(), err)
}
if len(txData) < methodIDLength {
return nil, fmt.Errorf("transaction data is too short, length of tx data: %v, minimum length required: %v", len(txData), methodIDLength)
Expand Down
11 changes: 6 additions & 5 deletions rollup/da_syncer/da/commitV0.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package da
import (
"encoding/binary"
"fmt"
"io"

"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/da-codec/encoding/codecv0"

"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
)

type CommitBatchDAV0 struct {
Expand Down Expand Up @@ -92,7 +92,7 @@ func (c *CommitBatchDAV0) CompareTo(other Entry) int {
return 0
}

func (c *CommitBatchDAV0) Blocks() ([]*PartialBlock, error) {
func (c *CommitBatchDAV0) Blocks() []*PartialBlock {
var blocks []*PartialBlock
l1TxPointer := 0

Expand Down Expand Up @@ -125,7 +125,8 @@ func (c *CommitBatchDAV0) Blocks() ([]*PartialBlock, error) {
blocks = append(blocks, block)
}
}
return blocks, nil

return blocks
}

func getTotalMessagesPoppedFromChunks(decodedChunks []*codecv0.DAChunkRawTx) int {
Expand Down Expand Up @@ -156,8 +157,8 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped
l1Tx := rawdb.ReadL1Message(db, currentIndex)
if l1Tx == nil {
// message not yet available
// we return io.EOF as this will be handled in the syncing pipeline with a backoff and retry
return nil, io.EOF
// we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
return nil, serrors.EOFError
}
txs = append(txs, l1Tx)
currentIndex++
Expand Down
2 changes: 1 addition & 1 deletion rollup/da_syncer/da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Entry interface {

type EntryWithBlocks interface {
Entry
Blocks() ([]*PartialBlock, error)
Blocks() []*PartialBlock
}

type Entries []Entry
Expand Down
5 changes: 4 additions & 1 deletion rollup/da_syncer/da_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"

"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
)

// DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage.
Expand Down Expand Up @@ -54,7 +55,9 @@ func (dq *DAQueue) getNextData(ctx context.Context) error {
if errors.Is(err, da.ErrSourceExhausted) {
dq.l1height = dq.dataSource.L1Height()
dq.dataSource = nil
return dq.getNextData(ctx)

// we return EOFError to be handled in pipeline
return serrors.EOFError
}

return err
Expand Down
69 changes: 69 additions & 0 deletions rollup/da_syncer/serrors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package serrors

import (
"fmt"
)

const (
temporary Type = iota
eof
)

var (
TemporaryError = NewTemporaryError(nil)
EOFError = NewEOFError(nil)
)

type Type uint8

func (t Type) String() string {
switch t {
case temporary:
return "temporary"
case eof:
return "EOF"
default:
return "unknown"
}
}

type syncError struct {
t Type
err error
}

func NewTemporaryError(err error) error {
return &syncError{t: temporary, err: err}
}

func NewEOFError(err error) error {
return &syncError{t: eof, err: err}
}

func (s *syncError) Error() string {
return fmt.Sprintf("%s: %v", s.t, s.err)
}

func (s *syncError) Unwrap() error {
return s.err
}

func (s *syncError) Is(target error) bool {
if target == nil {
return s == nil
}

targetSyncErr, ok := target.(*syncError)
if !ok {
return false
}

return s.t == targetSyncErr.t
}

func WrapWithTemporary(err error) error {
return &syncError{
t: temporary,
err: err,
}
}
28 changes: 21 additions & 7 deletions rollup/da_syncer/syncing_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -15,6 +14,7 @@ import (
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
)
Expand Down Expand Up @@ -114,6 +114,7 @@ func (s *SyncingPipeline) mainLoop() {
stepCh := make(chan struct{}, 1)
var delayedStepCh <-chan time.Time
var resetCounter int
var tempErrorCounter int

// reqStep is a helper function to request a step to be executed.
// If delay is true, it will request a delayed step with exponential backoff, otherwise it will request an immediate step.
Expand Down Expand Up @@ -157,29 +158,42 @@ func (s *SyncingPipeline) mainLoop() {
reqStep(false)
s.expBackoff.Reset()
resetCounter = 0
tempErrorCounter = 0
continue
}

if errors.Is(err, io.EOF) {
if errors.Is(err, serrors.EOFError) {
// pipeline is empty, request a delayed step
// TODO: eventually (with state manager) this should not trigger a delayed step because external events will trigger a new step anyway
reqStep(true)
tempErrorCounter = 0
continue
}
if errors.Is(err, ErrBlockTooLow) {
} else if errors.Is(err, serrors.TemporaryError) {
log.Warn("syncing pipeline step failed due to temporary error, retrying", "err", err)
if tempErrorCounter > 100 {
log.Warn("syncing pipeline step failed due to 100 consecutive temporary errors, stopping pipeline worker", "last err", err)
return
}

// temporary error, request a delayed step
reqStep(true)
tempErrorCounter++
continue
} else if errors.Is(err, ErrBlockTooLow) {
// block number returned by the block queue is too low,
// we skip the blocks until we reach the correct block number again.
reqStep(false)
tempErrorCounter = 0
continue
} else if errors.Is(err, ErrBlockTooHigh) {
// block number returned by the block queue is too high,
// reset the pipeline and move backwards from the last L1 block we read
s.reset(resetCounter)
resetCounter++
reqStep(false)
tempErrorCounter = 0
continue
}

if errors.Is(err, context.Canceled) {
} else if errors.Is(err, context.Canceled) {
log.Info("syncing pipeline stopped due to cancelled context", "err", err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion rollup/rollup_sync_service/l1client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewL1Client(ctx context.Context, l1Client sync_service.EthClient, l1ChainId
return &client, nil
}

// fetcRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to].
// FetchRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to].
func (c *L1Client) FetchRollupEventsInRange(from, to uint64) ([]types.Log, error) {
log.Trace("L1Client fetchRollupEventsInRange", "fromBlock", from, "toBlock", to)

Expand Down

0 comments on commit 8f1f6b7

Please sign in to comment.