From f14cefe2170ce93394ee4fdab83277b260ce70a5 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 14 Aug 2024 19:22:42 +0000 Subject: [PATCH] Add topic scoring & reduce pubsub spam (#573) * Add topic scoring & reduce pubsub spam 1. Add topic scoring to both the GPBFT topic and the manifest topic. 2. Hash the data + topic for GPBFT messages. 3. Hash the data + sender + topic for manifest server messages. We need to include the sender because we reject messages from invalid senders. 4. In the manifest validator, drop messages with old sequence numbers. * Bump topic versions * don't refuse to start the manifest sender * fix logging * reduce logging verbosity * use seqno+sender IDs in tests * remove manifest sender interval limit Just use the configured interval. * final failing test * Revert manifest sender timeout --- cmd/f3/manifest.go | 8 +- cmd/f3/observer.go | 12 ++- cmd/f3/pusub.go | 43 ++++++++ f3_test.go | 9 +- host.go | 9 +- internal/psutil/psutil.go | 97 +++++++++++++++++++ ...nifest.go => dynamic_manifest_provider.go} | 33 ++++--- ...t_sender.go => dynamic_manifest_sender.go} | 7 +- manifest/manifest.go | 2 +- manifest/manifest_test.go | 2 +- 10 files changed, 202 insertions(+), 20 deletions(-) create mode 100644 internal/psutil/psutil.go rename manifest/{dynamic_manifest.go => dynamic_manifest_provider.go} (88%) rename manifest/{manifest_sender.go => dynamic_manifest_sender.go} (91%) diff --git a/cmd/f3/manifest.go b/cmd/f3/manifest.go index 600a1e51..9bde7176 100644 --- a/cmd/f3/manifest.go +++ b/cmd/f3/manifest.go @@ -125,7 +125,7 @@ var manifestServeCmd = cli.Command{ &cli.DurationFlag{ Name: "publishInterval", Usage: "The interval at which manifest is published on pubsub.", - Value: 20 * time.Second, + Value: 2 * pubsub.TimeCacheDuration, }, }, @@ -205,7 +205,11 @@ var manifestServeCmd = cli.Command{ return fmt.Errorf("loading initial manifest: %w", err) } - pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true)) + pubSub,
err := pubsub.NewGossipSub(c.Context, host, + pubsub.WithPeerExchange(true), + pubsub.WithFloodPublish(true), + pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds), + ) if err != nil { return fmt.Errorf("initialzing pubsub: %w", err) } diff --git a/cmd/f3/observer.go b/cmd/f3/observer.go index bfe7bbff..6bfa958d 100644 --- a/cmd/f3/observer.go +++ b/cmd/f3/observer.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/go-f3/cmd/f3/msgdump" "github.com/filecoin-project/go-f3/gpbft" + "github.com/filecoin-project/go-f3/internal/psutil" "github.com/filecoin-project/go-f3/manifest" leveldb "github.com/ipfs/go-ds-leveldb" logging "github.com/ipfs/go-log/v2" @@ -152,7 +153,11 @@ var observerCmd = cli.Command{ // Connect to bootstrappers once as soon as we start. connectToBootstrappers() - pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true)) + pubSub, err := pubsub.NewGossipSub(c.Context, host, + pubsub.WithPeerExchange(true), + pubsub.WithFloodPublish(true), + pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds), + ) if err != nil { return fmt.Errorf("initialzing pubsub: %w", err) } @@ -241,10 +246,13 @@ func observeManifest(ctx context.Context, manif *manifest.Manifest, pubSub *pubs return fmt.Errorf("registering topic validator: %w", err) } - topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn)) + topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(psutil.GPBFTMessageIdFn)) if err != nil { return fmt.Errorf("joining topic: %w", err) } + if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil { + return fmt.Errorf("failed to set topic params: %w", err) + } sub, err := topic.Subscribe() if err != nil { diff --git a/cmd/f3/pusub.go b/cmd/f3/pusub.go index d0671159..32e34106 100644 --- a/cmd/f3/pusub.go +++ b/cmd/f3/pusub.go @@ -4,6 +4,7 @@ import ( "time" pubsub "github.com/libp2p/go-libp2p-pubsub" + 
"github.com/libp2p/go-libp2p/core/peer" ) func init() { @@ -20,3 +21,45 @@ func init() { pubsub.GossipSubHistoryLength = 10 pubsub.GossipSubGossipFactor = 0.1 } + +// Borrowed from lotus +var PubsubPeerScoreParams = &pubsub.PeerScoreParams{ + AppSpecificScore: func(p peer.ID) float64 { return 0 }, + AppSpecificWeight: 1, + + // This sets the IP colocation threshold to 5 peers before we apply penalties + IPColocationFactorThreshold: 5, + IPColocationFactorWeight: -100, + IPColocationFactorWhitelist: nil, + + // P7: behavioural penalties, decay after 1hr + BehaviourPenaltyThreshold: 6, + BehaviourPenaltyWeight: -10, + BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour), + + DecayInterval: pubsub.DefaultDecayInterval, + DecayToZero: pubsub.DefaultDecayToZero, + + // this retains non-positive scores for 6 hours + RetainScore: 6 * time.Hour, + + // topic parameters + Topics: make(map[string]*pubsub.TopicScoreParams), +} + +var PubsubPeerScoreThresholds = &pubsub.PeerScoreThresholds{ + GossipThreshold: GossipScoreThreshold, + PublishThreshold: PublishScoreThreshold, + GraylistThreshold: GraylistScoreThreshold, + AcceptPXThreshold: AcceptPXScoreThreshold, + OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold, +} + +// Borrowed from lotus +const ( + GossipScoreThreshold = -500 + PublishScoreThreshold = -1000 + GraylistScoreThreshold = -2500 + AcceptPXScoreThreshold = 1000 + OpportunisticGraftScoreThreshold = 3.5 +) diff --git a/f3_test.go b/f3_test.go index 01fca7c6..3342f563 100644 --- a/f3_test.go +++ b/f3_test.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-f3/ec" "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/internal/clock" + "github.com/filecoin-project/go-f3/internal/psutil" "github.com/filecoin-project/go-f3/manifest" "github.com/filecoin-project/go-f3/sim/signing" @@ -25,7 +26,13 @@ import ( "golang.org/x/sync/errgroup" ) -const ManifestSenderTimeout = 30 * time.Second +func init() { + // Hash-based 
deduplication breaks fast rebroadcast, even if we set the time window to be + // really short because gossipsub has a minimum 1m cache scan interval. + psutil.GPBFTMessageIdFn = pubsub.DefaultMsgIdFn +} + +var ManifestSenderTimeout = 30 * time.Second func TestF3Simple(t *testing.T) { t.Parallel() diff --git a/host.go b/host.go index 17d9cd8f..9dd8dbbd 100644 --- a/host.go +++ b/host.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/go-f3/ec" "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/internal/clock" + "github.com/filecoin-project/go-f3/internal/psutil" "github.com/filecoin-project/go-f3/manifest" "go.opentelemetry.io/otel/metric" @@ -329,11 +330,17 @@ func (h *gpbftRunner) setupPubsub() error { // Force the default (sender + seqno) message de-duplication mechanism instead of hashing // the message (as lotus does) as we need to be able to re-broadcast duplicate messages with // the same content. - topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn)) + topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(psutil.GPBFTMessageIdFn)) if err != nil { return fmt.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err) } + + if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil { + log.Infow("failed to set topic score params", "error", err) + } + h.topic = topic + return nil } diff --git a/internal/psutil/psutil.go b/internal/psutil/psutil.go new file mode 100644 index 00000000..bd5060ac --- /dev/null +++ b/internal/psutil/psutil.go @@ -0,0 +1,97 @@ +package psutil + +import ( + "encoding/binary" + "time" + + "golang.org/x/crypto/blake2b" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" +) + +var ManifestMessageIdFn = pubsubMsgIdHashDataAndSender +var GPBFTMessageIdFn = pubsubMsgIdHashData + +// Generate a pubsub ID from the message topic + data. 
+func pubsubMsgIdHashData(m *pubsub_pb.Message) string {
+	hasher, err := blake2b.New256(nil)
+	if err != nil {
+		panic("failed to construct hasher")
+	}
+
+	topic := []byte(m.GetTopic()) // length-prefixed below so the topic/payload boundary is unambiguous
+	if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
+		panic(err)
+	}
+	if _, err := hasher.Write(topic); err != nil {
+		panic(err)
+	}
+
+	_, _ = hasher.Write(m.Data) // feed the payload into the SAME hasher as the topic; hash.Hash.Write never returns an error
+	return string(hasher.Sum(nil))
+}
+
+// pubsubMsgIdHashDataAndSender generates a pubsub ID by hashing the length-prefixed topic, the length-prefixed sender (m.From) and the data with BLAKE2b-256.
+func pubsubMsgIdHashDataAndSender(m *pubsub_pb.Message) string {
+	hasher, err := blake2b.New256(nil)
+	if err != nil {
+		panic("failed to construct hasher")
+	}
+
+	topic := []byte(m.GetTopic()) // topic and sender are both length-prefixed so adjacent fields cannot be confused
+	if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
+		panic(err)
+	}
+	if _, err := hasher.Write(topic); err != nil {
+		panic(err)
+	}
+	if err := binary.Write(hasher, binary.BigEndian, uint32(len(m.From))); err != nil {
+		panic(err)
+	}
+	if _, err := hasher.Write(m.From); err != nil {
+		panic(err)
+	}
+
+	_, _ = hasher.Write(m.Data) // feed the payload into the SAME hasher as topic+sender; hash.Hash.Write never returns an error
+	return string(hasher.Sum(nil))
+}
+
+// Borrowed from lotus
+var PubsubTopicScoreParams = &pubsub.TopicScoreParams{
+	// expected > 400 msgs/second on average.
+	//
+	TopicWeight: 0.1, // max cap is 5, single invalid message is -100
+
+	// 1 tick per second, maxes at 1 hour
+	// XXX
+	TimeInMeshWeight:  0.0002778, // ~1/3600
+	TimeInMeshQuantum: time.Second,
+	TimeInMeshCap:     1,
+
+	// NOTE: Gives weight to the peer that tends to deliver first.
+	// deliveries decay after 10min, cap at 100 tx
+	FirstMessageDeliveriesWeight: 0.5, // max value is 50
+	FirstMessageDeliveriesDecay:  pubsub.ScoreParameterDecay(10 * time.Minute),
+	FirstMessageDeliveriesCap:    100, // 100 messages in 10 minutes
+
+	// Mesh Delivery Failure is currently turned off for messages
+	// This is on purpose as the network is still too small, which results in
+	// asymmetries and potential unmeshing from negative scores.
+ // // tracks deliveries in the last minute + // // penalty activates at 1 min and expects 2.5 txs + // MeshMessageDeliveriesWeight: -16, // max penalty is -100 + // MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute), + // MeshMessageDeliveriesCap: 100, // 100 txs in a minute + // MeshMessageDeliveriesThreshold: 2.5, // 60/12/2 txs/minute + // MeshMessageDeliveriesWindow: 10 * time.Millisecond, + // MeshMessageDeliveriesActivation: time.Minute, + + // // decays after 5min + // MeshFailurePenaltyWeight: -16, + // MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute), + + // invalid messages decay after 1 hour + InvalidMessageDeliveriesWeight: -1000, + InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), +} diff --git a/manifest/dynamic_manifest.go b/manifest/dynamic_manifest_provider.go similarity index 88% rename from manifest/dynamic_manifest.go rename to manifest/dynamic_manifest_provider.go index d08cef53..56c2378b 100644 --- a/manifest/dynamic_manifest.go +++ b/manifest/dynamic_manifest_provider.go @@ -7,7 +7,9 @@ import ( "errors" "fmt" "io" + "sync/atomic" + "github.com/filecoin-project/go-f3/internal/psutil" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -20,7 +22,7 @@ var log = logging.Logger("f3/dynamic-manifest") var _ ManifestProvider = (*DynamicManifestProvider)(nil) -const ManifestPubSubTopicName = "/f3/manifests/0.0.1" +const ManifestPubSubTopicName = "/f3/manifests/0.0.2" // DynamicManifestProvider is a manifest provider that allows // the manifest to be changed at runtime. @@ -35,6 +37,7 @@ type DynamicManifestProvider struct { initialManifest *Manifest manifestChanges chan *Manifest + sequenceNumber atomic.Uint64 } // ManifestUpdateMessage updates the GPBFT manifest. 
@@ -96,23 +99,24 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error { return err } - // Force the default (sender + seqno) message de-duplication mechanism instead of hashing - // the message (as lotus does) as validation depends on the sender, not the contents of the - // message. - manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn)) + // Use the message hash as the message ID to reduce the chances of routing cycles. We ensure + // our rebroadcast interval is greater than our cache timeout. + manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.ManifestMessageIdFn)) if err != nil { return fmt.Errorf("could not join manifest pubsub topic: %w", err) } + if err := manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil { + log.Infow("failed to set topic score params", "error", err) + } + manifestSub, err := manifestTopic.Subscribe() if err != nil { return fmt.Errorf("subscribing to manifest pubsub topic: %w", err) } - var msgSeqNumber uint64 var currentManifest *Manifest if mBytes, err := m.ds.Get(startCtx, latestManifestKey); errors.Is(err, datastore.ErrNotFound) { - msgSeqNumber = 0 currentManifest = m.initialManifest } else if err != nil { return fmt.Errorf("error while checking saved manifest") @@ -123,7 +127,7 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error { return fmt.Errorf("decoding saved manifest: %w", err) } - msgSeqNumber = update.MessageSequence + m.sequenceNumber.Store(update.MessageSequence) currentManifest = &update.Manifest } @@ -164,10 +168,13 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error { continue } - if update.MessageSequence <= msgSeqNumber { - log.Debugw("discarded manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", msgSeqNumber) + oldSeq := m.sequenceNumber.Load() + + if update.MessageSequence <= oldSeq { + log.Debugw("discarded 
manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", oldSeq) continue } + m.sequenceNumber.Store(update.MessageSequence) if err := update.Manifest.Validate(); err != nil { log.Errorw("received invalid manifest, discarded", "error", err) @@ -180,7 +187,6 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error { } log.Infow("received manifest update", "seqNo", update.MessageSequence) - msgSeqNumber = update.MessageSequence oldManifest := currentManifest manifestCopy := update.Manifest @@ -227,6 +233,11 @@ func (m *DynamicManifestProvider) registerTopicValidator() error { return pubsub.ValidationReject } + // Only allow the latest sequence number through. + if update.MessageSequence < m.sequenceNumber.Load() { + return pubsub.ValidationIgnore + } + // TODO: Any additional validation? // Expect a sequence number that is over our current sequence number. // Expect an BootstrapEpoch over the BootstrapEpoch of the current manifests? diff --git a/manifest/manifest_sender.go b/manifest/dynamic_manifest_sender.go similarity index 91% rename from manifest/manifest_sender.go rename to manifest/dynamic_manifest_sender.go index 96dff4e5..8f8f5475 100644 --- a/manifest/manifest_sender.go +++ b/manifest/dynamic_manifest_sender.go @@ -7,6 +7,8 @@ import ( "time" "github.com/filecoin-project/go-f3/internal/clock" + "github.com/filecoin-project/go-f3/internal/psutil" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -43,10 +45,13 @@ func NewManifestSender(ctx context.Context, h host.Host, ps *pubsub.PubSub, firs } var err error - m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn)) + m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.ManifestMessageIdFn)) if err != nil { return nil, fmt.Errorf("could not join on pubsub topic: %s: %w", ManifestPubSubTopicName, err) } + 
if err := m.manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil { + log.Infow("could not set topic score params", "error", err) + } // Record one-off attributes about the sender for easier runtime debugging. metrics.senderInfo.Record(ctx, 1, metric.WithAttributes( diff --git a/manifest/manifest.go b/manifest/manifest.go index e077a885..63a81602 100644 --- a/manifest/manifest.go +++ b/manifest/manifest.go @@ -324,7 +324,7 @@ func (m *Manifest) PubSubTopic() string { } func PubSubTopicFromNetworkName(nn gpbft.NetworkName) string { - return "/f3/granite/0.0.1/" + string(nn) + return "/f3/granite/0.0.2/" + string(nn) } func (m *Manifest) GpbftOptions() []gpbft.Option { diff --git a/manifest/manifest_test.go b/manifest/manifest_test.go index ca588da4..b4070671 100644 --- a/manifest/manifest_test.go +++ b/manifest/manifest_test.go @@ -111,7 +111,7 @@ func TestManifest_NetworkName(t *testing.T) { require.True(t, strings.HasPrefix(gotDsPrefix, "/f3")) require.True(t, strings.HasSuffix(gotDsPrefix, string(test.subject))) gotPubSubTopic := m.PubSubTopic() - require.True(t, strings.HasPrefix(gotPubSubTopic, "/f3/granite/0.0.1/")) + require.True(t, strings.HasPrefix(gotPubSubTopic, "/f3/granite/0.0.2/")) require.True(t, strings.HasSuffix(gotPubSubTopic, string(test.subject))) }) }