Skip to content

Commit

Permalink
Restructured logic for processing epoch recover to avoid a case where an invalid event modifies current state. Updated tests
Browse files Browse the repository at this point in the history
  • Loading branch information
durkmurder committed Aug 9, 2024
1 parent 610abde commit 8f7b413
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 36 deletions.
39 changes: 20 additions & 19 deletions state/protocol/protocol_state/epochs/fallback_statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,19 @@ func (m *FallbackStateMachine) ProcessEpochRecover(epochRecover *flow.EpochRecov
m.telemetry.OnInvalidServiceEvent(epochRecover.ServiceEvent(), err)
return false, nil
}
nextEpoch := m.state.NextEpoch
if nextEpoch != nil {
// accept iff the EpochRecover is the same as the one we have already recovered.
if nextEpoch.SetupID != epochRecover.EpochSetup.ID() ||
nextEpoch.CommitID != epochRecover.EpochCommit.ID() {
m.telemetry.OnInvalidServiceEvent(epochRecover.ServiceEvent(),
protocol.NewInvalidServiceEventErrorf("multiple inconsistent EpochRecover events sealed in the same block"))
return false, nil
}
}
// m.state.NextEpoch is either nil, or its EpochSetup and EpochCommit are identical to the given `epochRecover`

// assemble EpochStateContainer for next epoch:
nextEpochParticipants, err := buildNextEpochActiveParticipants(
m.state.CurrentEpoch.ActiveIdentities.Lookup(),
m.state.CurrentEpochSetup,
Expand All @@ -174,26 +186,14 @@ func (m *FallbackStateMachine) ProcessEpochRecover(epochRecover *flow.EpochRecov
m.telemetry.OnInvalidServiceEvent(epochRecover.ServiceEvent(), fmt.Errorf("rejecting EpochRecover event: %w", err))
return false, nil
}

nextEpoch := m.state.NextEpoch
if nextEpoch == nil {
// setup next epoch if there is none
m.state.NextEpoch = &flow.EpochStateContainer{
SetupID: epochRecover.EpochSetup.ID(),
CommitID: epochRecover.EpochCommit.ID(),
ActiveIdentities: nextEpochParticipants,
EpochExtensions: nil,
}
} else {
// accept iff the EpochRecover is the same as the one we have already recovered.
if nextEpoch.SetupID != epochRecover.EpochSetup.ID() ||
nextEpoch.CommitID != epochRecover.EpochCommit.ID() {
m.telemetry.OnInvalidServiceEvent(epochRecover.ServiceEvent(),
protocol.NewInvalidServiceEventErrorf("multiple inconsistent EpochRecover events sealed in the same block"))
return false, nil
}
nextEpoch = &flow.EpochStateContainer{
SetupID: epochRecover.EpochSetup.ID(),
CommitID: epochRecover.EpochCommit.ID(),
ActiveIdentities: nextEpochParticipants,
EpochExtensions: nil,
}
err = m.ejector.TrackDynamicIdentityList(m.state.NextEpoch.ActiveIdentities)

err = m.ejector.TrackDynamicIdentityList(nextEpoch.ActiveIdentities)
if err != nil {
if protocol.IsInvalidServiceEventError(err) {
m.telemetry.OnInvalidServiceEvent(epochRecover.ServiceEvent(), fmt.Errorf("rejecting EpochRecover event: %w", err))
Expand All @@ -202,6 +202,7 @@ func (m *FallbackStateMachine) ProcessEpochRecover(epochRecover *flow.EpochRecov
return false, fmt.Errorf("unexpected errors tracking identity list: %w", err)
}
// if we have processed a valid EpochRecover event, we should exit EFM.
m.state.NextEpoch = nextEpoch
m.state.EpochFallbackTriggered = false
m.telemetry.OnServiceEventProcessed(epochRecover.ServiceEvent())
return true, nil
Expand Down
76 changes: 59 additions & 17 deletions state/protocol/protocol_state/epochs/fallback_statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,28 +659,70 @@ func (s *EpochFallbackStateMachineSuite) TestEpochFallbackStateMachineInjectsMul
}
}

// TestEpochRecoverAndEjectionInSameBlock tests that processing an epoch recover event which re-admits an ejected identity results in an error.
// Such action should be considered illegal since smart contract emitted ejection before epoch recover and service events are delivered
// in an order-preserving manner.
// TestEpochRecoverAndEjectionInSameBlock tests that the state machine correctly handles ejection events and a
// subsequent epoch recover in the same block. Specifically, we test two cases:
// 1. Happy Path: the EpochRecover event excludes the previously ejected node.
// 2. Invalid Epoch Recover: an EpochRecover event which re-admits an ejected identity is ignored. Such an action
// should be considered illegal, since the smart contract emitted the ejection before the epoch recover and service
// events are delivered in an order-preserving manner. However, since the FallbackStateMachine is intended to keep
// the protocol alive even in the presence of (largely) arbitrary Epoch Smart Contract bugs, it should also handle
// this case gracefully: the EpochRecover service event should be discarded and the internal state should remain unchanged.
func (s *EpochFallbackStateMachineSuite) TestEpochRecoverAndEjectionInSameBlock() {
nextEpochParticipants := s.parentProtocolState.CurrentEpochIdentityTable.Copy()
ejectedIdentityID := nextEpochParticipants.Filter(filter.HasRole[flow.Identity](flow.RoleAccess))[0].NodeID
epochRecover := unittest.EpochRecoverFixture(func(setup *flow.EpochSetup) {
setup.Participants = nextEpochParticipants.ToSkeleton()
setup.Assignments = unittest.ClusterAssignment(1, nextEpochParticipants.ToSkeleton())
setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1
setup.FirstView = s.parentProtocolState.CurrentEpochSetup.FinalView + 1
setup.FinalView = setup.FirstView + 10_000

s.Run("happy path", func() {
wasEjected := s.stateMachine.EjectIdentity(ejectedIdentityID)
require.True(s.T(), wasEjected)

epochRecover := unittest.EpochRecoverFixture(func(setup *flow.EpochSetup) {
setup.Participants = nextEpochParticipants.ToSkeleton().Filter(
filter.Not(filter.HasNodeID[flow.IdentitySkeleton](ejectedIdentityID)))
setup.Assignments = unittest.ClusterAssignment(1, setup.Participants.ToSkeleton())
setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1
setup.FirstView = s.parentProtocolState.CurrentEpochSetup.FinalView + 1
setup.FinalView = setup.FirstView + 10_000
})
s.consumer.On("OnServiceEventReceived", epochRecover.ServiceEvent()).Once()
s.consumer.On("OnServiceEventProcessed", epochRecover.ServiceEvent()).Once()
processed, err := s.stateMachine.ProcessEpochRecover(epochRecover)
require.NoError(s.T(), err)
require.True(s.T(), processed)

updatedState, _, _ := s.stateMachine.Build()
require.False(s.T(), updatedState.EpochFallbackTriggered, "should exit EFM")
require.NotNil(s.T(), updatedState.NextEpoch, "should setup & commit next epoch")
})
ejected := s.stateMachine.EjectIdentity(ejectedIdentityID)
require.True(s.T(), ejected)
s.Run("invalid epoch recover event", func() {
s.kvstore = mockstate.NewKVStoreReader(s.T())
s.kvstore.On("GetEpochExtensionViewCount").Return(extensionViewCount).Maybe()
s.kvstore.On("GetEpochCommitSafetyThreshold").Return(uint64(200))

s.consumer.On("OnServiceEventReceived", epochRecover.ServiceEvent()).Once()
s.consumer.On("OnInvalidServiceEvent", epochRecover.ServiceEvent(),
mock.MatchedBy(func(err error) bool { return protocol.IsInvalidServiceEventError(err) })).Once()
processed, err := s.stateMachine.ProcessEpochRecover(epochRecover)
require.NoError(s.T(), err)
require.False(s.T(), processed)
var err error
s.stateMachine, err = NewFallbackStateMachine(s.kvstore, s.consumer, s.candidate.View, s.parentProtocolState.Copy())
require.NoError(s.T(), err)

wasEjected := s.stateMachine.EjectIdentity(ejectedIdentityID)
require.True(s.T(), wasEjected)

epochRecover := unittest.EpochRecoverFixture(func(setup *flow.EpochSetup) {
setup.Participants = nextEpochParticipants.ToSkeleton()
setup.Assignments = unittest.ClusterAssignment(1, nextEpochParticipants.ToSkeleton())
setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1
setup.FirstView = s.parentProtocolState.CurrentEpochSetup.FinalView + 1
setup.FinalView = setup.FirstView + 10_000
})
s.consumer.On("OnServiceEventReceived", epochRecover.ServiceEvent()).Once()
s.consumer.On("OnInvalidServiceEvent", epochRecover.ServiceEvent(),
mock.MatchedBy(func(err error) bool { return protocol.IsInvalidServiceEventError(err) })).Once()
processed, err := s.stateMachine.ProcessEpochRecover(epochRecover)
require.NoError(s.T(), err)
require.False(s.T(), processed)

updatedState, _, _ := s.stateMachine.Build()
require.True(s.T(), updatedState.EpochFallbackTriggered, "should remain in EFM")
require.Nil(s.T(), updatedState.NextEpoch, "next epoch should be nil as recover event is invalid")
})
}

// TestProcessingMultipleEventsAtTheSameBlock tests that the state machine can process multiple events at the same block.
Expand Down

0 comments on commit 8f7b413

Please sign in to comment.