Skip to content

Commit

Permalink
fix(batcher): altda concurrent blob responses are reordered to respec…
Browse files Browse the repository at this point in the history
…t holocene strict ordering rules
  • Loading branch information
samlaf committed Dec 8, 2024
1 parent 6dd280b commit d80a4d3
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 24 deletions.
78 changes: 73 additions & 5 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package batcher
import (
"math"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand All @@ -20,7 +21,16 @@ type channel struct {

// pending channel builder
channelBuilder *ChannelBuilder
// Set of unconfirmed txID -> tx data. For tx resubmission
// Temporary cache for altDACommitments that are received potentially out of order from the da layer.
// Map: first frameNumber in txData -> txData (that contains an altDACommitment)
// Once the txData containing altDANextFrame is received, it will be pulled out of the
// channel on the next driver iteration, and sent to L1.
altDACommitments map[uint16]txData
// Points to the next frame number to send to L1 in order to maintain holocene strict ordering rules.
// When altDACommitments[altDAFrameCursor] is non-nil, it will be sent to L1.
altDAFrameCursor uint16
// Set of unconfirmed txID -> tx data. For tx resubmission.
// Also used for altda for the entirity of the submission (data -> commitment -> tx).
pendingTransactions map[string]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[string]eth.BlockID
Expand All @@ -38,21 +48,60 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
metr: metr,
cfg: cfg,
channelBuilder: cb,
altDACommitments: make(map[uint16]txData),
pendingTransactions: make(map[string]txData),
confirmedTransactions: make(map[string]eth.BlockID),
minInclusionBlock: math.MaxUint64,
}
}

func (s *channel) CacheAltDACommitment(txData txData, commitment altda.CommitmentData) {
if commitment == nil {
panic("expected non-nil commitment")
}
if len(txData.frames) == 0 {
panic("expected txData to have frames")
}
txData.altDACommitment = commitment
s.log.Debug("caching altDA commitment", "frame", txData.frames[0].id.frameNumber, "commitment", commitment.String())
s.altDACommitments[txData.frames[0].id.frameNumber] = txData
}

func (s *channel) rewindAltDAFrameCursor(txData txData) {
if len(txData.frames) == 0 {
panic("expected txData to have frames")
}
s.altDAFrameCursor = txData.frames[0].id.frameNumber
}

func (s *channel) AltDASubmissionFailed(id string) {
// We coopt TxFailed to rewind the frame cursor.
// This will force a resubmit of all the following frames as well,
// even if they had already successfully been submitted and their commitment cached.
// Ideally we'd have another way but for simplicity and to not tangle the altda code
// too much with the non altda code, we reuse the FrameCursor feature.
// TODO: is there a better abstraction for altda channels? FrameCursors are not well suited
// since frames do not have to be sent in order to the altda, only their commitment does.
s.TxFailed(id)
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (c *channel) TxFailed(id string) {
if data, ok := c.pendingTransactions[id]; ok {
c.log.Trace("marked transaction as failed", "id", id)
// Rewind to the first frame of the failed tx
// -- the frames are ordered, and we want to send them
// all again.
c.channelBuilder.RewindFrameCursor(data.Frames()[0])
if data.altDACommitment != nil {
// In altDA mode, we don't want to rewind the channelBuilder's frameCursor
// because that will lead to resubmitting the same data to the da layer.
// We simply need to rewind the altDAFrameCursor to the first frame of the failed txData,
// to force a resubmit of the cached altDACommitment.
c.rewindAltDAFrameCursor(data)
} else {
// Rewind to the first frame of the failed tx
// -- the frames are ordered, and we want to send them
// all again.
c.channelBuilder.RewindFrameCursor(data.Frames()[0])
}
delete(c.pendingTransactions, id)
} else {
c.log.Warn("unknown transaction marked as failed", "id", id)
Expand Down Expand Up @@ -124,6 +173,25 @@ func (c *channel) ID() derive.ChannelID {
return c.channelBuilder.ID()
}

func (c *channel) NextAltDACommitment() (txData, bool) {
if txData, ok := c.altDACommitments[c.altDAFrameCursor]; ok {
if txData.altDACommitment == nil {
panic("expected altDACommitment to be non-nil")
}
if len(txData.frames) == 0 {
panic("expected txData to have frames")
}
// update altDANextFrame to the first frame of the next txData
lastFrame := txData.frames[len(txData.frames)-1]
c.altDAFrameCursor = lastFrame.id.frameNumber + 1
// We also store it in pendingTransactions so that TxFailed can know
// that this tx's altDA commitment was already cached.
c.pendingTransactions[txData.ID().String()] = txData
return txData, true
}
return txData{}, false
}

// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
// If cfg.UseBlobs is false, it returns txData with a single frame.
// If cfg.UseBlobs is true, it will read frames from its channel builder
Expand Down
55 changes: 53 additions & 2 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"sync"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand Down Expand Up @@ -55,7 +56,7 @@ type channelManager struct {
currentChannel *channel
// channels to read frame data from, for writing batches onchain
channelQueue []*channel
// used to lookup channels by tx ID upon tx success / failure
// used to lookup channels by tx ID upon altda and tx success / failure
txChannels map[string]*channel
}

Expand Down Expand Up @@ -94,6 +95,38 @@ func (s *channelManager) pendingBlocks() int {
return s.blocks.Len() - s.blockCursor
}

func (s *channelManager) CacheAltDACommitment(txData txData, commitment altda.CommitmentData) {
s.mu.Lock()
defer s.mu.Unlock()
if len(txData.frames) == 0 {
panic("no frames in txData")
}
firstFrame, lastFrame := txData.frames[0], txData.frames[len(txData.frames)-1]
if firstFrame.id.chID != lastFrame.id.chID {
// The current implementation caches commitments inside channels,
// so it assumes that a txData only contains frames from a single channel.
// If this ever panics (hopefully in tests...) it shouldn't be too hard to fix.
panic("commitment spans multiple channels")
}
if channel, ok := s.txChannels[txData.ID().String()]; ok {
channel.CacheAltDACommitment(txData, commitment)
} else {
s.log.Crit("trying to cache altda commitment for txData from unknown channel", "id", txData.ID())
}
}

func (s *channelManager) AltDASubmissionFailed(_id txID) {
s.mu.Lock()
defer s.mu.Unlock()
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
channel.AltDASubmissionFailed(id)
} else {
s.log.Warn("transaction from unknown channel marked as failed", "id", id)
}
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channelManager) TxFailed(_id txID) {
Expand Down Expand Up @@ -188,6 +221,20 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
return tx, nil
}

func (s *channelManager) getNextAltDACommitment() (txData, bool) {
for _, channel := range s.channelQueue {
// if all frames have already been sent to altda, skip this channel
if int(channel.altDAFrameCursor) == channel.channelBuilder.TotalFrames() {
continue
}
if txData, ok := channel.NextAltDACommitment(); ok {
s.txChannels[txData.ID().String()] = channel
return txData, true
}
}
return emptyTxData, false
}

// TxData returns the next tx data that should be submitted to L1.
//
// If the current channel is
Expand All @@ -200,6 +247,10 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.mu.Lock()
defer s.mu.Unlock()
// if any altda commitment is ready, return it
if txdata, ok := s.getNextAltDACommitment(); ok {
return txdata, nil
}
channel, err := s.getReadyChannel(l1Head)
if err != nil {
return emptyTxData, err
Expand Down Expand Up @@ -257,7 +308,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
}

dataPending := firstWithTxData != nil
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.blocks.Len())
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.pendingBlocks())

// Short circuit if there is pending tx data or the channel manager is closed
if dataPending {
Expand Down
36 changes: 22 additions & 14 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,12 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
}
l.Metr.RecordLatestL1Block(l1tip)

// In AltDA mode, before pulling data out of the state, we make sure
// that the daGroup has not reached the maximum number of goroutines.
// This is to prevent blocking the main event loop when submitting the data to the DA Provider.
if l.Config.UseAltDA && !daGroup.TryGo(func() error { return nil }) {
return io.EOF
}
// Collect next transaction data. This pulls data out of the channel, so we need to make sure
// to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx.
txdata, err := l.state.TxData(l1tip.ID())
Expand Down Expand Up @@ -765,8 +771,9 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
l.sendTx(txData{}, true, candidate, queue, receiptsCh)
}

// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1.
func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
// publishToAltDAAndStoreCommitment posts the txdata to the DA Provider and stores the returned commitment
// in the state. The commitment will later be sent to the L1 while making sure to follow holocene's strict ordering rules.
func (l *BatchSubmitter) publishToAltDAAndStoreCommitment(txdata txData, daGroup *errgroup.Group) {
// sanity checks
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
Expand All @@ -777,7 +784,7 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t

// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
goroutineSpawned := daGroup.TryGo(func() error {
daGroup.Go(func() error {
// TODO: probably shouldn't be using the global shutdownCtx here, see https://go.dev/blog/context-and-structs
// but sendTransaction receives l.killCtx as an argument, which currently is only canceled after waiting for the main loop
// to exit, which would wait on this DA call to finish, which would take a long time.
Expand All @@ -796,17 +803,10 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t
}
return nil
}
l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID())
candidate := l.calldataTxCandidate(comm.TxData())
l.sendTx(txdata, false, candidate, queue, receiptsCh)
l.Log.Info("Sent txdata to altda layer and received commitment", "commitment", comm, "tx", txdata.ID())
l.state.CacheAltDACommitment(txdata, comm)
return nil
})
if !goroutineSpawned {
// We couldn't start the goroutine because the errgroup.Group limit
// is already reached. Since we can't send the txdata, we have to
// return it for later processing. We use nil error to skip error logging.
l.recordFailedDARequest(txdata.ID(), nil)
}
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
Expand All @@ -817,7 +817,15 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
if txdata.altDACommitment == nil {
l.publishToAltDAAndStoreCommitment(txdata, daGroup)
} else {
// This means the txdata was already sent to the DA Provider and we have the commitment
// so we can send the commitment to the L1
l.Log.Info("Sending altda commitment to L1", "commitment", txdata.altDACommitment, "tx", txdata.ID())
candidate := l.calldataTxCandidate(txdata.altDACommitment.TxData())
l.sendTx(txdata, false, candidate, queue, receiptsCh)
}
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}
Expand Down Expand Up @@ -894,7 +902,7 @@ func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
if err != nil {
l.Log.Warn("DA request failed", logFields(id, err)...)
}
l.state.TxFailed(id)
l.state.AltDASubmissionFailed(id)
}

func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
Expand Down
3 changes: 0 additions & 3 deletions op-batcher/batcher/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ func TestBatchSubmitter_AltDA_FailureCase2_FailedL1Tx(t *testing.T) {
err = bs.StopBatchSubmitting(context.Background())
require.NoError(t, err)

// FIXME: storeCount=7 with current buggy implementation, because when an L1 tx fails,
// we BOTH rewind the altdaChannelCursor (to resubmit the failed tx) AND push back the frames into the channelManager.
// A quick fix (?) is to not push back if the failed tx was an altda tx.
require.Equal(t, 4, mockAltDAClient.StoreCount)
// TODO: we should prob also check that the commitments are in order?
require.Equal(t, uint64(4), fakeTxMgr.Nonce)
Expand Down
4 changes: 4 additions & 0 deletions op-batcher/batcher/tx_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
Expand All @@ -16,6 +17,9 @@ import (
type txData struct {
frames []frameData
asBlob bool // indicates whether this should be sent as blob
// altDACommitment is non-nil when the frames have been sent to the alt-da server,
// and the received commitment needs to be sent to the L1.
altDACommitment altda.CommitmentData
}

func singleFrameTxData(frame frameData) txData {
Expand Down

0 comments on commit d80a4d3

Please sign in to comment.