diff --git a/bgs/bgs.go b/bgs/bgs.go index 18eab263f..b64df715e 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1646,13 +1646,15 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start)) resync = bgs.SetResyncStatus(pds.ID, "checking revs") - // run loop over repos with some concurrency + // Create a buffered channel for collecting results + results := make(chan revCheckResult, len(repos)) sem := semaphore.NewWeighted(40) // Check repo revs against our local copy and enqueue crawls for any that are out of date - for i, r := range repos { + for _, r := range repos { if err := sem.Acquire(ctx, 1); err != nil { log.Errorw("failed to acquire semaphore", "error", err) + results <- revCheckResult{err: err} continue } go func(r comatproto.SyncListRepos_Repo) { @@ -1662,41 +1664,56 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did) if err != nil { log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err) + results <- revCheckResult{err: err} return } rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid) if err != nil { log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid) - err := bgs.Index.Crawler.Crawl(ctx, ai) - if err != nil { - log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did) - } + results <- revCheckResult{ai: ai} return } if rev == "" || rev < r.Rev { log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev) - err := bgs.Index.Crawler.Crawl(ctx, ai) - if err != nil { - log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did) - } + results <- revCheckResult{ai: ai} return } + + results <- revCheckResult{} }(r) + } + + var numReposToResync int + for i := 0; i < len(repos); i++ { + res := <-results + if res.err != nil { + log.Errorw("failed to process repo during resync", "error", res.err) + + } + if res.ai != nil { + numReposToResync++ + err := bgs.Index.Crawler.Crawl(ctx, res.ai) + if err != nil { + log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did) + } + } if i%100 == 0 { if i%10_000 == 0 { - log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Now().Sub(resync.StatusChangedAt)) + log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt)) } resync.NumReposChecked = i + resync.NumReposToResync = numReposToResync bgs.UpdateResync(resync) } } resync.NumReposChecked = len(repos) + resync.NumReposToResync = numReposToResync bgs.UpdateResync(resync) - log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", -1) + log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", numReposToResync) return nil }