Skip to content

Commit

Permalink
Add topic scoring & reduce pubsub spam (#573)
Browse files Browse the repository at this point in the history
* Add topic scoring & reduce pubsub spam

1. Add topic scoring to both the GPBFT topic and the manifest topic.
2. Hash the data + topic for GPBFT messages.
3. Hash the data + sender + topic for manifest server messages. We need
to include the sender because we reject messages from invalid senders.
4. In the manifest validator, drop messages with old sequence numbers.

* Bump topic versions

* don't refuse to start the manifest sender

* fix logging

* reduce logging verbosity

* use seqno+sender IDs in tests

* remove manifest sender interval limit

Just use the configured interval.

* final failing test

* Revert manifest sender timeout
  • Loading branch information
Stebalien authored Aug 14, 2024
1 parent e2865e7 commit f14cefe
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 20 deletions.
8 changes: 6 additions & 2 deletions cmd/f3/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ var manifestServeCmd = cli.Command{
&cli.DurationFlag{
Name: "publishInterval",
Usage: "The interval at which manifest is published on pubsub.",
Value: 20 * time.Second,
Value: 2 * pubsub.TimeCacheDuration,
},
},

Expand Down Expand Up @@ -205,7 +205,11 @@ var manifestServeCmd = cli.Command{
return fmt.Errorf("loading initial manifest: %w", err)
}

pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true))
pubSub, err := pubsub.NewGossipSub(c.Context, host,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds),
)
if err != nil {
return fmt.Errorf("initialzing pubsub: %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/f3/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/filecoin-project/go-f3/cmd/f3/msgdump"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
leveldb "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -152,7 +153,11 @@ var observerCmd = cli.Command{
// Connect to bootstrappers once as soon as we start.
connectToBootstrappers()

pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true))
pubSub, err := pubsub.NewGossipSub(c.Context, host,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds),
)
if err != nil {
return fmt.Errorf("initialzing pubsub: %w", err)
}
Expand Down Expand Up @@ -241,10 +246,13 @@ func observeManifest(ctx context.Context, manif *manifest.Manifest, pubSub *pubs
return fmt.Errorf("registering topic validator: %w", err)
}

topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(psutil.GPBFTMessageIdFn))
if err != nil {
return fmt.Errorf("joining topic: %w", err)
}
if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
return fmt.Errorf("failed to set topic params: %w", err)
}

sub, err := topic.Subscribe()
if err != nil {
Expand Down
43 changes: 43 additions & 0 deletions cmd/f3/pusub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)

func init() {
Expand All @@ -20,3 +21,45 @@ func init() {
pubsub.GossipSubHistoryLength = 10
pubsub.GossipSubGossipFactor = 0.1
}

// PubsubPeerScoreParams holds the peer-level gossipsub scoring parameters that the f3
// commands pass to pubsub.WithPeerScore when constructing their GossipSub instance.
//
// Borrowed from lotus
var PubsubPeerScoreParams = &pubsub.PeerScoreParams{
// No application-specific scoring: the callback returns 0 for every peer.
AppSpecificScore: func(p peer.ID) float64 { return 0 },
AppSpecificWeight: 1,

// This sets the IP colocation threshold to 5 peers before we apply penalties
IPColocationFactorThreshold: 5,
IPColocationFactorWeight: -100,
IPColocationFactorWhitelist: nil,

// P7: behavioural penalties, decay after 1hr
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),

// Decay settings are left at the library defaults.
DecayInterval: pubsub.DefaultDecayInterval,
DecayToZero: pubsub.DefaultDecayToZero,

// this retains non-positive scores for 6 hours
RetainScore: 6 * time.Hour,

// topic parameters; the map is created empty here.
Topics: make(map[string]*pubsub.TopicScoreParams),
}

// PubsubPeerScoreThresholds groups the peer score cut-offs that are passed, together
// with PubsubPeerScoreParams, to pubsub.WithPeerScore. Each field is populated from
// the correspondingly named *ScoreThreshold constant in this package.
var PubsubPeerScoreThresholds = &pubsub.PeerScoreThresholds{
GossipThreshold: GossipScoreThreshold,
PublishThreshold: PublishScoreThreshold,
GraylistThreshold: GraylistScoreThreshold,
AcceptPXThreshold: AcceptPXScoreThreshold,
OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold,
}

// Peer score thresholds used to populate PubsubPeerScoreThresholds.
//
// Borrowed from lotus
const (
// The three penalty thresholds are negative and ordered gossip > publish > graylist.
GossipScoreThreshold = -500
PublishScoreThreshold = -1000
GraylistScoreThreshold = -2500
// The remaining two thresholds are positive.
AcceptPXScoreThreshold = 1000
OpportunisticGraftScoreThreshold = 3.5
)
9 changes: 8 additions & 1 deletion f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
"github.com/filecoin-project/go-f3/sim/signing"

Expand All @@ -25,7 +26,13 @@ import (
"golang.org/x/sync/errgroup"
)

const ManifestSenderTimeout = 30 * time.Second
// init replaces psutil.GPBFTMessageIdFn with the pubsub default message ID function
// before any test in this package runs.
func init() {
// Hash-based deduplication breaks fast rebroadcast, even if we set the time window to be
// really short because gossipsub has a minimum 1m cache scan interval.
psutil.GPBFTMessageIdFn = pubsub.DefaultMsgIdFn
}

var ManifestSenderTimeout = 30 * time.Second

func TestF3Simple(t *testing.T) {
t.Parallel()
Expand Down
9 changes: 8 additions & 1 deletion host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
"go.opentelemetry.io/otel/metric"

Expand Down Expand Up @@ -329,11 +330,17 @@ func (h *gpbftRunner) setupPubsub() error {
// Derive message IDs with psutil.GPBFTMessageIdFn instead of relying on the default
// (sender + seqno) message de-duplication mechanism. With a content-derived ID, a
// re-broadcast of a message with the same content is de-duplicated by pubsub.
topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(psutil.GPBFTMessageIdFn))
if err != nil {
return fmt.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err)
}

if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Infow("failed to set topic score params", "error", err)
}

h.topic = topic

return nil
}

Expand Down
97 changes: 97 additions & 0 deletions internal/psutil/psutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package psutil

import (
"encoding/binary"
"time"

"golang.org/x/crypto/blake2b"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// ManifestMessageIdFn is the message ID function used when joining the manifest
// pubsub topic. It is a variable, so it can be reassigned.
var ManifestMessageIdFn = pubsubMsgIdHashDataAndSender

// GPBFTMessageIdFn is the message ID function used when joining the GPBFT pubsub
// topic. It is a variable, so it can be reassigned (the f3 tests swap in
// pubsub.DefaultMsgIdFn).
var GPBFTMessageIdFn = pubsubMsgIdHashData

// pubsubMsgIdHashData generates a pubsub message ID from the message topic + data.
//
// The ID is the BLAKE2b-256 digest of:
//
//	uint32 big-endian length of the topic || topic || data
//
// The topic is length-prefixed so that the topic/data boundary is unambiguous.
// The data needs no prefix because it is the final field.
//
// It returns the raw 32-byte digest as a string. It panics if the hasher cannot
// be constructed or written to.
func pubsubMsgIdHashData(m *pubsub_pb.Message) string {
	hasher, err := blake2b.New256(nil)
	if err != nil {
		panic("failed to construct hasher")
	}

	topic := []byte(m.GetTopic())
	if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
		panic(err)
	}
	if _, err := hasher.Write(topic); err != nil {
		panic(err)
	}

	// Feed the payload into the same hasher. Hashing m.Data on its own would
	// silently drop the topic from the ID.
	if _, err := hasher.Write(m.Data); err != nil {
		panic(err)
	}

	return string(hasher.Sum(nil))
}

// pubsubMsgIdHashDataAndSender generates a pubsub message ID from the message
// topic + sender + data.
//
// The ID is the BLAKE2b-256 digest of:
//
//	uint32 big-endian length of the topic  || topic ||
//	uint32 big-endian length of the sender || sender (m.From) ||
//	data
//
// Topic and sender are length-prefixed so that field boundaries are
// unambiguous. The data needs no prefix because it is the final field.
//
// It returns the raw 32-byte digest as a string. It panics if the hasher cannot
// be constructed or written to.
func pubsubMsgIdHashDataAndSender(m *pubsub_pb.Message) string {
	hasher, err := blake2b.New256(nil)
	if err != nil {
		panic("failed to construct hasher")
	}

	topic := []byte(m.GetTopic())
	if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
		panic(err)
	}
	if _, err := hasher.Write(topic); err != nil {
		panic(err)
	}
	if err := binary.Write(hasher, binary.BigEndian, uint32(len(m.From))); err != nil {
		panic(err)
	}
	if _, err := hasher.Write(m.From); err != nil {
		panic(err)
	}

	// Feed the payload into the same hasher. Hashing m.Data on its own would
	// silently drop both the topic and the sender from the ID, so identical
	// payloads from different senders would collide.
	if _, err := hasher.Write(m.Data); err != nil {
		panic(err)
	}

	return string(hasher.Sum(nil))
}

// PubsubTopicScoreParams holds the per-topic gossipsub scoring parameters. The same
// value is applied via topic.SetScoreParams to both the GPBFT topic and the manifest
// topic.
//
// Borrowed from lotus
var PubsubTopicScoreParams = &pubsub.TopicScoreParams{
// expected > 400 msgs/second on average.
//
TopicWeight: 0.1, // max cap is 5, single invalid message is -100

// 1 tick per second, maxes at 1 hour
// XXX
// NOTE(review): with TimeInMeshCap set to 1 and a one-second quantum, this counter
// looks like it saturates after a single tick rather than after an hour — confirm
// against the gossipsub scoring documentation.
TimeInMeshWeight: 0.0002778, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,

// NOTE: Gives weight to the peer that tends to deliver first.
// deliveries decay after 10min, cap at 100 tx
FirstMessageDeliveriesWeight: 0.5, // max value is 50
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute),
FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes

// Mesh Delivery Failure is currently turned off for messages
// This is on purpose as the network is still too small, which results in
// asymmetries and potential unmeshing from negative scores.
// // tracks deliveries in the last minute
// // penalty activates at 1 min and expects 2.5 txs
// MeshMessageDeliveriesWeight: -16, // max penalty is -100
// MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute),
// MeshMessageDeliveriesCap: 100, // 100 txs in a minute
// MeshMessageDeliveriesThreshold: 2.5, // 60/12/2 txs/minute
// MeshMessageDeliveriesWindow: 10 * time.Millisecond,
// MeshMessageDeliveriesActivation: time.Minute,

// // decays after 5min
// MeshFailurePenaltyWeight: -16,
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute),

// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -20,7 +22,7 @@ var log = logging.Logger("f3/dynamic-manifest")

var _ ManifestProvider = (*DynamicManifestProvider)(nil)

const ManifestPubSubTopicName = "/f3/manifests/0.0.1"
const ManifestPubSubTopicName = "/f3/manifests/0.0.2"

// DynamicManifestProvider is a manifest provider that allows
// the manifest to be changed at runtime.
Expand All @@ -35,6 +37,7 @@ type DynamicManifestProvider struct {

initialManifest *Manifest
manifestChanges chan *Manifest
sequenceNumber atomic.Uint64
}

// ManifestUpdateMessage updates the GPBFT manifest.
Expand Down Expand Up @@ -96,23 +99,24 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
return err
}

// Force the default (sender + seqno) message de-duplication mechanism instead of hashing
// the message (as lotus does) as validation depends on the sender, not the contents of the
// message.
manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
// Use the message hash as the message ID to reduce the chances of routing cycles. We ensure
// our rebroadcast interval is greater than our cache timeout.
manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.ManifestMessageIdFn))
if err != nil {
return fmt.Errorf("could not join manifest pubsub topic: %w", err)
}

if err := manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Infow("failed to set topic score params", "error", err)
}

manifestSub, err := manifestTopic.Subscribe()
if err != nil {
return fmt.Errorf("subscribing to manifest pubsub topic: %w", err)
}

var msgSeqNumber uint64
var currentManifest *Manifest
if mBytes, err := m.ds.Get(startCtx, latestManifestKey); errors.Is(err, datastore.ErrNotFound) {
msgSeqNumber = 0
currentManifest = m.initialManifest
} else if err != nil {
return fmt.Errorf("error while checking saved manifest")
Expand All @@ -123,7 +127,7 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
return fmt.Errorf("decoding saved manifest: %w", err)
}

msgSeqNumber = update.MessageSequence
m.sequenceNumber.Store(update.MessageSequence)
currentManifest = &update.Manifest
}

Expand Down Expand Up @@ -164,10 +168,13 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
continue
}

if update.MessageSequence <= msgSeqNumber {
log.Debugw("discarded manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", msgSeqNumber)
oldSeq := m.sequenceNumber.Load()

if update.MessageSequence <= oldSeq {
log.Debugw("discarded manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", oldSeq)
continue
}
m.sequenceNumber.Store(update.MessageSequence)

if err := update.Manifest.Validate(); err != nil {
log.Errorw("received invalid manifest, discarded", "error", err)
Expand All @@ -180,7 +187,6 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
}

log.Infow("received manifest update", "seqNo", update.MessageSequence)
msgSeqNumber = update.MessageSequence

oldManifest := currentManifest
manifestCopy := update.Manifest
Expand Down Expand Up @@ -227,6 +233,11 @@ func (m *DynamicManifestProvider) registerTopicValidator() error {
return pubsub.ValidationReject
}

// Only allow the latest sequence number through.
if update.MessageSequence < m.sequenceNumber.Load() {
return pubsub.ValidationIgnore
}

// TODO: Any additional validation?
// Expect a sequence number that is over our current sequence number.
// Expect an BootstrapEpoch over the BootstrapEpoch of the current manifests?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -43,10 +45,13 @@ func NewManifestSender(ctx context.Context, h host.Host, ps *pubsub.PubSub, firs
}

var err error
m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.ManifestMessageIdFn))
if err != nil {
return nil, fmt.Errorf("could not join on pubsub topic: %s: %w", ManifestPubSubTopicName, err)
}
if err := m.manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Infow("could not set topic score params", "error", err)
}

// Record one-off attributes about the sender for easier runtime debugging.
metrics.senderInfo.Record(ctx, 1, metric.WithAttributes(
Expand Down
2 changes: 1 addition & 1 deletion manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (m *Manifest) PubSubTopic() string {
}

func PubSubTopicFromNetworkName(nn gpbft.NetworkName) string {
return "/f3/granite/0.0.1/" + string(nn)
return "/f3/granite/0.0.2/" + string(nn)
}

func (m *Manifest) GpbftOptions() []gpbft.Option {
Expand Down
Loading

0 comments on commit f14cefe

Please sign in to comment.