Skip to content

Commit

Permalink
pipeline replay mode
Browse files Browse the repository at this point in the history
  • Loading branch information
omerfirmak committed Jul 10, 2024
1 parent b1491c4 commit 4f2963a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
23 changes: 15 additions & 8 deletions rollup/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ var (
)

type Pipeline struct {
chain *core.BlockChain
vmConfig vm.Config
parent *types.Block
start time.Time
wg sync.WaitGroup
ctx context.Context
cancelCtx context.CancelFunc
chain *core.BlockChain
vmConfig vm.Config
parent *types.Block
start time.Time
wg sync.WaitGroup
ctx context.Context
cancelCtx context.CancelFunc
replayMode bool

// accumulators
ccc *circuitcapacitychecker.CircuitCapacityChecker
Expand Down Expand Up @@ -114,6 +115,12 @@ func (p *Pipeline) WithBeforeTxHook(beforeTxHook func()) *Pipeline {
return p
}

// WithReplayMode enables the replay mode that allows L1 messages to be skipped
func (p *Pipeline) WithReplayMode() *Pipeline {
p.replayMode = true
return p
}

func (p *Pipeline) Start(deadline time.Time) error {
p.start = time.Now()
p.txnQueue = make(chan *types.Transaction)
Expand Down Expand Up @@ -266,7 +273,7 @@ func (p *Pipeline) traceAndApplyStage(txsIn <-chan *types.Transaction) (<-chan e
return
}

if tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != p.nextL1MsgIndex {
if !p.replayMode && tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != p.nextL1MsgIndex {
// Continue, we might still be able to include some L2 messages
sendCancellable(resCh, ErrUnexpectedL1MessageIndex, p.ctx.Done())
continue
Expand Down
2 changes: 1 addition & 1 deletion rollup/state_processor/pipelined_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (p *Processor) Process(block *types.Block, statedb *state.StateDB, cfg vm.C
}
}

pl := pipeline.NewPipeline(p.chain, cfg, statedb, header, nextL1MsgIndex, p.ccc)
pl := pipeline.NewPipeline(p.chain, cfg, statedb, header, nextL1MsgIndex, p.ccc).WithReplayMode()
pl.Start(time.Now().Add(time.Minute))
defer pl.Release()

Expand Down

0 comments on commit 4f2963a

Please sign in to comment.