From d1f34b2d5a2147593b46084f84c6028de4b0b6e8 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 7 May 2024 21:59:26 -0700 Subject: [PATCH] ipfs: raise bulk send parallelism --- ipfs/node.go | 18 +++++++++++------- ipfs/provide.go | 44 +++++++++++++++++++++----------------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/ipfs/node.go b/ipfs/node.go index 7fdea41..e82ced8 100644 --- a/ipfs/node.go +++ b/ipfs/node.go @@ -215,14 +215,18 @@ func NewNode(ctx context.Context, privateKey crypto.PrivKey, cfg config.IPFS, rs return nil, fmt.Errorf("failed to create libp2p host: %w", err) } - dhtOpts := []dht.Option{ - dht.Mode(dht.ModeServer), - dht.BootstrapPeers(bootstrapPeers...), - dht.BucketSize(20), - dht.Concurrency(30), - dht.Datastore(ds), + fullRTOpts := []fullrt.Option{ + fullrt.DHTOption([]dht.Option{ + dht.Mode(dht.ModeServer), + dht.BootstrapPeers(bootstrapPeers...), + dht.BucketSize(40), + dht.Concurrency(60), + dht.Datastore(ds), + }...), + fullrt.WithBulkSendParallelism(256), } - frt, err := fullrt.NewFullRT(host, dht.DefaultPrefix, fullrt.DHTOption(dhtOpts...)) + + frt, err := fullrt.NewFullRT(host, dht.DefaultPrefix, fullRTOpts...) if err != nil { return nil, fmt.Errorf("failed to create fullrt: %w", err) } diff --git a/ipfs/provide.go b/ipfs/provide.go index c02d953..be4fd86 100644 --- a/ipfs/provide.go +++ b/ipfs/provide.go @@ -71,52 +71,50 @@ func (r *Reprovider) Run(ctx context.Context, interval time.Duration) { continue } + doProvide := func(ctx context.Context, keys []multihash.Multihash) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + return r.provider.ProvideMany(ctx, keys) + } + for { start := time.Now() - cids, err := r.store.ProvideCIDs(5000) + + cids, err := r.store.ProvideCIDs(1000) if err != nil { + reprovideSleep = time.Minute r.log.Error("failed to fetch CIDs to provide", zap.Error(err)) break - } - - if len(cids) == 0 { - r.log.Debug("no CIDs to provide") - reprovideSleep = 15 * time.Minute // set a minimum sleep time even + } else if len(cids) == 0 { + reprovideSleep = 10 * time.Minute + r.log.Debug("reprovide complete") break } - nextAnnounce := time.Until(cids[0].LastAnnouncement.Add(interval)) - - if nextAnnounce > 0 { - r.log.Debug("sleeping until next reprovide time", zap.Duration("duration", nextAnnounce)) - reprovideSleep = nextAnnounce + rem := time.Until(cids[0].LastAnnouncement.Add(interval)) + if rem > 0 { + reprovideSleep = rem + r.log.Debug("reprovide complete") break } announced := make([]cid.Cid, 0, len(cids)) keys := make([]multihash.Multihash, 0, len(cids)) for _, c := range cids { - if time.Since(c.LastAnnouncement) < interval { - reprovideSleep = time.Until(c.LastAnnouncement.Add(interval)) - break - } - keys = append(keys, c.CID.Hash()) announced = append(announced, c.CID) } - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() - - if err := r.provider.ProvideMany(ctx, keys); err != nil { + if err := doProvide(ctx, keys); err != nil { + reprovideSleep = time.Minute r.log.Error("failed to provide CIDs", zap.Error(err)) break } else if err := r.store.SetLastAnnouncement(announced, time.Now()); err != nil { - r.log.Error("failed to update last announcement", zap.Error(err)) + reprovideSleep = time.Minute + r.log.Error("failed to update last announcement time", zap.Error(err)) break } - - r.log.Debug("announced CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start))) + r.log.Debug("provided CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start))) } } })