From c2bd668cb2c4859c00190111b76315f9d5d01c8c Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Tue, 25 Jun 2024 09:52:15 -0700 Subject: [PATCH] ipfs: remove useless once --- ipfs/provide.go | 130 +++++++++++++++++++++++------------------------- 1 file changed, 63 insertions(+), 67 deletions(-) diff --git a/ipfs/provide.go b/ipfs/provide.go index d37f194..1ab3732 100644 --- a/ipfs/provide.go +++ b/ipfs/provide.go @@ -2,7 +2,6 @@ package ipfs import ( "context" - "sync" "time" "github.com/ipfs/boxo/provider" @@ -51,85 +50,82 @@ func (r *Reprovider) Trigger() { // Run starts the reprovider loop, which periodically announces CIDs that // have not been announced in the last interval. func (r *Reprovider) Run(ctx context.Context, interval, timeout time.Duration, batchSize int) { - var once sync.Once - once.Do(func() { - var reprovideSleep time.Duration + var reprovideSleep time.Duration - for { - if r.provider.Ready() { - break - } - r.log.Debug("provider not ready") - time.Sleep(30 * time.Second) + for { + if r.provider.Ready() { + break + } + r.log.Debug("provider not ready") + time.Sleep(30 * time.Second) + } + + for { + r.log.Debug("sleeping until next reprovide time", zap.Duration("duration", reprovideSleep)) + select { + case <-ctx.Done(): + return + case <-r.triggerProvide: + r.log.Debug("reprovide triggered") + case <-time.After(reprovideSleep): + r.log.Debug("reprovide sleep expired") + } + + doProvide := func(ctx context.Context, keys []multihash.Multihash) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return r.provider.ProvideMany(ctx, keys) } for { - r.log.Debug("sleeping until next reprovide time", zap.Duration("duration", reprovideSleep)) - select { - case <-ctx.Done(): - return - case <-r.triggerProvide: - r.log.Debug("reprovide triggered") - case <-time.After(reprovideSleep): - r.log.Debug("reprovide sleep expired") - } + start := time.Now() - doProvide := func(ctx context.Context, keys []multihash.Multihash) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - return r.provider.ProvideMany(ctx, keys) + cids, err := r.store.ProvideCIDs(batchSize) + if err != nil { + reprovideSleep = time.Minute + r.log.Error("failed to fetch CIDs to provide", zap.Error(err)) + break + } else if len(cids) == 0 { + reprovideSleep = 10 * time.Minute + r.log.Debug("reprovide complete") + break } - for { - start := time.Now() - - cids, err := r.store.ProvideCIDs(batchSize) - if err != nil { - reprovideSleep = time.Minute - r.log.Error("failed to fetch CIDs to provide", zap.Error(err)) - break - } else if len(cids) == 0 { - reprovideSleep = 10 * time.Minute - r.log.Debug("reprovide complete") - break - } + rem := time.Until(cids[0].LastAnnouncement.Add(interval)) + if rem > 0 { + reprovideSleep = rem + r.log.Debug("reprovide complete") + break + } - rem := time.Until(cids[0].LastAnnouncement.Add(interval)) - if rem > 0 { - reprovideSleep = rem - r.log.Debug("reprovide complete") + announced := make([]cid.Cid, 0, len(cids)) + keys := make([]multihash.Multihash, 0, len(cids)) + // include a slight buffer for CIDs that are about to expire + // so they will be provided as one batch + buffer := interval / 10 + minAnnouncement := time.Now().Add(-(interval - buffer)) + for _, c := range cids { + // only provide CIDs that have not been provided within the + // last interval + if c.LastAnnouncement.After(minAnnouncement) { break } + keys = append(keys, c.CID.Hash()) + announced = append(announced, c.CID) + } - announced := make([]cid.Cid, 0, len(cids)) - keys := make([]multihash.Multihash, 0, len(cids)) - // include a slight buffer for CIDs that are about to expire - // so they will be provided as one batch - buffer := interval / 10 - minAnnouncement := time.Now().Add(-(interval - buffer)) - for _, c := range cids { - // only provide CIDs that have not been provided within the - // last interval - if c.LastAnnouncement.After(minAnnouncement) { - break - } - keys = append(keys, c.CID.Hash()) - announced = append(announced, c.CID) - } - - if err := doProvide(ctx, keys); err != nil { - reprovideSleep = time.Minute - r.log.Error("failed to provide CIDs", zap.Error(err)) - break - } else if err := r.store.SetLastAnnouncement(announced, time.Now()); err != nil { - reprovideSleep = time.Minute - r.log.Error("failed to update last announcement time", zap.Error(err)) - break - } - r.log.Debug("provided CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start))) + if err := doProvide(ctx, keys); err != nil { + reprovideSleep = time.Minute + r.log.Error("failed to provide CIDs", zap.Error(err)) + break + } else if err := r.store.SetLastAnnouncement(announced, time.Now()); err != nil { + reprovideSleep = time.Minute + r.log.Error("failed to update last announcement time", zap.Error(err)) + break } + r.log.Debug("provided CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start))) } - }) + } } // NewReprovider creates a new reprovider.