Skip to content

Commit

Permalink
Merge pull request #5112 from onflow/yahya/6915-fixing-flakey-integra…
Browse files Browse the repository at this point in the history
…tion-test-part-2

[Networking] Restores skipped `TestGossipSubSpamMitigationIntegration`
  • Loading branch information
yhassanzadeh13 authored Dec 7, 2023
2 parents 6cbb9d7 + 3e3d2ed commit db3a542
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

// TestMetricsInspector_ObserveRPC ensures that the gossipsub rpc metrics inspector observes metrics for control messages as expected.
func TestMetricsInspector_ObserveRPC(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
idProvider := unittest.NewUpdatableIDProvider(flow.IdentityList{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
// - malformed topic: topic is malformed in some way
// - invalid spork ID: spork ID prepended to topic and current spork ID do not match
func TestValidationInspector_InvalidTopicId_Detection(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
Expand Down Expand Up @@ -175,7 +174,6 @@ func TestValidationInspector_InvalidTopicId_Detection(t *testing.T) {
// TestValidationInspector_DuplicateTopicId_Detection ensures that when an RPC control message contains a duplicate topic ID an invalid control message
// notification is disseminated with the expected error.
func TestValidationInspector_DuplicateTopicId_Detection(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
Expand Down Expand Up @@ -287,7 +285,6 @@ func TestValidationInspector_DuplicateTopicId_Detection(t *testing.T) {
// TestValidationInspector_IHaveDuplicateMessageId_Detection ensures that when an RPC iHave control message contains a duplicate message ID for a single topic
// notification is disseminated with the expected error.
func TestValidationInspector_IHaveDuplicateMessageId_Detection(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
Expand Down Expand Up @@ -391,7 +388,6 @@ func TestValidationInspector_IHaveDuplicateMessageId_Detection(t *testing.T) {
// TestValidationInspector_UnknownClusterId_Detection ensures that when an RPC control message contains a topic with an unknown cluster ID an invalid control message
// notification is disseminated with the expected error.
func TestValidationInspector_UnknownClusterId_Detection(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
Expand Down Expand Up @@ -504,7 +500,6 @@ func TestValidationInspector_UnknownClusterId_Detection(t *testing.T) {
// cluster prefix hard threshold when the active cluster IDs not set and an invalid control message notification is disseminated with the expected error.
// This test involves Graft control messages.
func TestValidationInspector_ActiveClusterIdsNotSet_Graft_Detection(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
Expand Down Expand Up @@ -594,7 +589,6 @@ func TestValidationInspector_ActiveClusterIdsNotSet_Graft_Detection(t *testing.T
// cluster prefix hard threshold when the active cluster IDs not set and an invalid control message notification is disseminated with the expected error.
// This test involves Prune control messages.
func TestValidationInspector_ActiveClusterIdsNotSet_Prune_Detection(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
Expand Down Expand Up @@ -680,7 +674,6 @@ func TestValidationInspector_ActiveClusterIdsNotSet_Prune_Detection(t *testing.T
// TestValidationInspector_UnstakedNode_Detection ensures that RPC control message inspector disseminates an invalid control message notification when an unstaked peer
// sends a control message for a cluster prefixed topic.
func TestValidationInspector_UnstakedNode_Detection(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
Expand Down Expand Up @@ -779,7 +772,6 @@ func TestValidationInspector_UnstakedNode_Detection(t *testing.T) {
// TestValidationInspector_InspectIWants_CacheMissThreshold ensures that expected invalid control message notification is disseminated when the number of iWant message Ids
// without a corresponding iHave message sent with the same message ID exceeds the configured cache miss threshold.
func TestValidationInspector_InspectIWants_CacheMissThreshold(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
// create our RPC validation inspector
Expand Down Expand Up @@ -885,7 +877,6 @@ func TestValidationInspector_InspectIWants_CacheMissThreshold(t *testing.T) {
// TestValidationInspector_InspectRpcPublishMessages ensures that expected invalid control message notification is disseminated when the number of errors encountered during
// RPC publish message validation exceeds the configured error threshold.
func TestValidationInspector_InspectRpcPublishMessages(t *testing.T) {
t.Parallel()
role := flow.RoleConsensus
sporkID := unittest.IdentifierFixture()
// create our RPC validation inspector
Expand Down Expand Up @@ -1031,8 +1022,6 @@ func TestValidationInspector_InspectRpcPublishMessages(t *testing.T) {
// The victim node is configured to use the GossipSubInspector to detect spam and the scoring system to mitigate spam.
// The test ensures that the victim node is disconnected from the spammer node on the GossipSub mesh after the spam detection is triggered.
func TestGossipSubSpamMitigationIntegration(t *testing.T) {
unittest.SkipUnless(t, unittest.TEST_FLAKY, "flaky in CI")
t.Parallel()
idProvider := mock.NewIdentityProvider(t)
sporkID := unittest.IdentifierFixture()
spammer := corruptlibp2p.NewGossipSubRouterSpammer(t, sporkID, flow.RoleConsensus, idProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestControlMessageValidationInspector_truncateRPC(t *testing.T) {
distributor.On("Distribute", mock.AnythingOfType("*p2p.InvCtrlMsgNotif")).Return(nil).Twice()

inspector.Start(signalerCtx)
//unittest.RequireCloseBefore(t, inspector.Ready(), 100*time.Millisecond, "inspector did not start")
// unittest.RequireCloseBefore(t, inspector.Ready(), 100*time.Millisecond, "inspector did not start")
// topic validation not performed, so we can use random strings
prunesGreaterThanMaxSampleSize := unittest.P2PRPCFixture(unittest.WithPrunes(unittest.P2PRPCPruneFixtures(unittest.IdentifierListFixture(2000).Strings()...)...))
require.Greater(t, len(prunesGreaterThanMaxSampleSize.GetControl().GetPrune()), graftPruneMessageMaxSampleSize)
Expand Down Expand Up @@ -520,35 +520,36 @@ func TestControlMessageValidationInspector_processInspectRPCReq(t *testing.T) {
stopInspector(t, cancel, inspector)
})

t.Run("inspectIWantMessages should not disseminate invalid control message notification for iWant messages when cache misses exceeds allowed threshold if cache miss check size not exceeded", func(t *testing.T) {
inspector, signalerCtx, cancel, distributor, rpcTracker, _, _, _ := inspectorFixture(t, func(params *validation.InspectorParams) {
// if size of iwants not greater than 10 cache misses will not be checked
params.Config.CacheMissCheckSize = 10
// set high cache miss threshold to ensure we only disseminate notification when it is exceeded
params.Config.IWantRPCInspectionConfig.CacheMissThreshold = .9
})
// oracle must be set even though iWant messages do not have topic IDs
defer distributor.AssertNotCalled(t, "Distribute")
t.Run("inspectIWantMessages should not disseminate invalid control message notification for iWant messages when cache misses exceeds allowed threshold if cache miss check size not exceeded",
func(t *testing.T) {
inspector, signalerCtx, cancel, distributor, rpcTracker, _, _, _ := inspectorFixture(t, func(params *validation.InspectorParams) {
// if size of iwants not greater than 10 cache misses will not be checked
params.Config.CacheMissCheckSize = 10
// set high cache miss threshold to ensure we only disseminate notification when it is exceeded
params.Config.IWantRPCInspectionConfig.CacheMissThreshold = .9
})
// oracle must be set even though iWant messages do not have topic IDs
defer distributor.AssertNotCalled(t, "Distribute")

msgIds := unittest.IdentifierListFixture(100).Strings()
inspectMsgRpc := unittest.P2PRPCFixture(unittest.WithIWants(unittest.P2PRPCIWantFixture(msgIds...)))
rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe()
// return false each time to eventually force a notification to be disseminated when the cache miss count finally exceeds the 90% threshold
rpcTracker.On("WasIHaveRPCSent", mock.AnythingOfType("string")).Return(false).Run(func(args mock.Arguments) {
id, ok := args[0].(string)
require.True(t, ok)
require.Contains(t, msgIds, id)
})

from := unittest.PeerIdFixture(t)
inspector.Start(signalerCtx)

msgIds := unittest.IdentifierListFixture(100).Strings()
inspectMsgRpc := unittest.P2PRPCFixture(unittest.WithIWants(unittest.P2PRPCIWantFixture(msgIds...)))
rpcTracker.On("LastHighestIHaveRPCSize").Return(int64(100)).Maybe()
// return false each time to eventually force a notification to be disseminated when the cache miss count finally exceeds the 90% threshold
rpcTracker.On("WasIHaveRPCSent", mock.AnythingOfType("string")).Return(false).Run(func(args mock.Arguments) {
id, ok := args[0].(string)
require.True(t, ok)
require.Contains(t, msgIds, id)
require.NoError(t, inspector.Inspect(from, inspectMsgRpc))
// sleep for 1 second to ensure rpc's is processed
time.Sleep(time.Second)
stopInspector(t, cancel, inspector)
})

from := unittest.PeerIdFixture(t)
inspector.Start(signalerCtx)

require.NoError(t, inspector.Inspect(from, inspectMsgRpc))
// sleep for 1 second to ensure rpc's is processed
time.Sleep(time.Second)
stopInspector(t, cancel, inspector)
})

t.Run("inspectRpcPublishMessages should disseminate invalid control message notification when invalid pubsub messages count greater than configured RpcMessageErrorThreshold", func(t *testing.T) {
errThreshold := 500
inspector, signalerCtx, cancel, distributor, _, sporkID, _, topicProviderOracle := inspectorFixture(t, func(params *validation.InspectorParams) {
Expand Down Expand Up @@ -804,7 +805,11 @@ func invalidTopics(t *testing.T, sporkID flow.Identifier) (string, string, strin
}

// checkNotificationFunc returns util func used to ensure invalid control message notification disseminated contains expected information.
func checkNotificationFunc(t *testing.T, expectedPeerID peer.ID, expectedMsgType p2pmsg.ControlMessageType, isExpectedErr func(err error) bool, topicType p2p.CtrlMsgTopicType) func(args mock.Arguments) {
func checkNotificationFunc(t *testing.T,
expectedPeerID peer.ID,
expectedMsgType p2pmsg.ControlMessageType,
isExpectedErr func(err error) bool,
topicType p2p.CtrlMsgTopicType) func(args mock.Arguments) {
return func(args mock.Arguments) {
notification, ok := args[0].(*p2p.InvCtrlMsgNotif)
require.True(t, ok)
Expand All @@ -815,7 +820,14 @@ func checkNotificationFunc(t *testing.T, expectedPeerID peer.ID, expectedMsgType
}
}

func inspectorFixture(t *testing.T, opts ...func(params *validation.InspectorParams)) (*validation.ControlMsgValidationInspector, *irrecoverable.MockSignalerContext, context.CancelFunc, *mockp2p.GossipSubInspectorNotificationDistributor, *mockp2p.RpcControlTracking, flow.Identifier, *mockmodule.IdentityProvider, *internal.MockUpdatableTopicProvider) {
func inspectorFixture(t *testing.T, opts ...func(params *validation.InspectorParams)) (*validation.ControlMsgValidationInspector,
*irrecoverable.MockSignalerContext,
context.CancelFunc,
*mockp2p.GossipSubInspectorNotificationDistributor,
*mockp2p.RpcControlTracking,
flow.Identifier,
*mockmodule.IdentityProvider,
*internal.MockUpdatableTopicProvider) {
sporkID := unittest.IdentifierFixture()
flowConfig, err := config.DefaultConfig()
require.NoError(t, err)
Expand Down Expand Up @@ -850,7 +862,7 @@ func inspectorFixture(t *testing.T, opts ...func(params *validation.InspectorPar

func stopInspector(t *testing.T, cancel context.CancelFunc, inspector *validation.ControlMsgValidationInspector) {
cancel()
unittest.RequireCloseBefore(t, inspector.Done(), 500*time.Millisecond, "inspector did not stop")
unittest.RequireCloseBefore(t, inspector.Done(), 5*time.Second, "inspector did not stop")
}

func defaultTopicOracle() []string {
Expand Down

0 comments on commit db3a542

Please sign in to comment.