Skip to content

Commit

Permalink
ipfs: remove useless once
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Jun 25, 2024
1 parent 02c1615 commit c2bd668
Showing 1 changed file with 63 additions and 67 deletions.
130 changes: 63 additions & 67 deletions ipfs/provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ipfs

import (
"context"
"sync"
"time"

"github.com/ipfs/boxo/provider"
Expand Down Expand Up @@ -51,85 +50,82 @@ func (r *Reprovider) Trigger() {
// Run starts the reprovider loop, which periodically announces CIDs that
// have not been announced in the last interval.
func (r *Reprovider) Run(ctx context.Context, interval, timeout time.Duration, batchSize int) {
var once sync.Once
once.Do(func() {
var reprovideSleep time.Duration
var reprovideSleep time.Duration

for {
if r.provider.Ready() {
break
}
r.log.Debug("provider not ready")
time.Sleep(30 * time.Second)
for {
if r.provider.Ready() {
break
}
r.log.Debug("provider not ready")
time.Sleep(30 * time.Second)
}

for {
r.log.Debug("sleeping until next reprovide time", zap.Duration("duration", reprovideSleep))
select {
case <-ctx.Done():
return
case <-r.triggerProvide:
r.log.Debug("reprovide triggered")
case <-time.After(reprovideSleep):
r.log.Debug("reprovide sleep expired")
}

doProvide := func(ctx context.Context, keys []multihash.Multihash) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return r.provider.ProvideMany(ctx, keys)
}

for {
r.log.Debug("sleeping until next reprovide time", zap.Duration("duration", reprovideSleep))
select {
case <-ctx.Done():
return
case <-r.triggerProvide:
r.log.Debug("reprovide triggered")
case <-time.After(reprovideSleep):
r.log.Debug("reprovide sleep expired")
}
start := time.Now()

doProvide := func(ctx context.Context, keys []multihash.Multihash) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return r.provider.ProvideMany(ctx, keys)
cids, err := r.store.ProvideCIDs(batchSize)
if err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to fetch CIDs to provide", zap.Error(err))
break
} else if len(cids) == 0 {
reprovideSleep = 10 * time.Minute
r.log.Debug("reprovide complete")
break
}

for {
start := time.Now()

cids, err := r.store.ProvideCIDs(batchSize)
if err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to fetch CIDs to provide", zap.Error(err))
break
} else if len(cids) == 0 {
reprovideSleep = 10 * time.Minute
r.log.Debug("reprovide complete")
break
}
rem := time.Until(cids[0].LastAnnouncement.Add(interval))
if rem > 0 {
reprovideSleep = rem
r.log.Debug("reprovide complete")
break
}

rem := time.Until(cids[0].LastAnnouncement.Add(interval))
if rem > 0 {
reprovideSleep = rem
r.log.Debug("reprovide complete")
announced := make([]cid.Cid, 0, len(cids))
keys := make([]multihash.Multihash, 0, len(cids))
// include a slight buffer for CIDs that are about to expire
// so they will be provided as one batch
buffer := interval / 10
minAnnouncement := time.Now().Add(-(interval - buffer))
for _, c := range cids {
// only provide CIDs that have not been provided within the
// last interval
if c.LastAnnouncement.After(minAnnouncement) {
break
}
keys = append(keys, c.CID.Hash())
announced = append(announced, c.CID)
}

announced := make([]cid.Cid, 0, len(cids))
keys := make([]multihash.Multihash, 0, len(cids))
// include a slight buffer for CIDs that are about to expire
// so they will be provided as one batch
buffer := interval / 10
minAnnouncement := time.Now().Add(-(interval - buffer))
for _, c := range cids {
// only provide CIDs that have not been provided within the
// last interval
if c.LastAnnouncement.After(minAnnouncement) {
break
}
keys = append(keys, c.CID.Hash())
announced = append(announced, c.CID)
}

if err := doProvide(ctx, keys); err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to provide CIDs", zap.Error(err))
break
} else if err := r.store.SetLastAnnouncement(announced, time.Now()); err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to update last announcement time", zap.Error(err))
break
}
r.log.Debug("provided CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start)))
if err := doProvide(ctx, keys); err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to provide CIDs", zap.Error(err))
break
} else if err := r.store.SetLastAnnouncement(announced, time.Now()); err != nil {
reprovideSleep = time.Minute
r.log.Error("failed to update last announcement time", zap.Error(err))
break
}
r.log.Debug("provided CIDs", zap.Int("count", len(announced)), zap.Duration("elapsed", time.Since(start)))
}
})
}
}

// NewReprovider creates a new reprovider.
Expand Down

0 comments on commit c2bd668

Please sign in to comment.