Skip to content

Commit

Permalink
Resync with a PDS by comparing revs instead of commit CIDs (#373)
Browse files Browse the repository at this point in the history
When resyncing with a PDS, if users are active then the CID we get from
`ListRepos` can become out of date by the time we start comparing the
CIDs against our local repo heads (i.e. new events come in for some
users, we update the repo head in our local Carstore, and our CID is
fresher than the one in-memory from `ListRepos`).

Since the Repo Rev is monotonically increasing, we can use the Rev we
get back from `ListRepos` as a "low-water-mark" such that, if our local
Rev for that repo is > the one we get from the PDS, we know we're
current. This should reduce the number of unnecessary repos being
recrawled during a resync.
  • Loading branch information
ericvolp12 authored Oct 5, 2023
2 parents 8a63a7a + e4d0b55 commit c2be586
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 26 deletions.
1 change: 1 addition & 0 deletions api/atproto/synclistRepos.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 20 additions & 26 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,12 +1250,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*comp
}, nil
}

type repoHead struct {
Did string
Head string
}

type headCheckResult struct {
// revCheckResult is the per-repo outcome of a rev check during a PDS
// resync, sent back over the results channel: err is set when the check
// itself failed, ai is set when the repo is stale and should be
// recrawled, and the zero value means the local copy is already current.
type revCheckResult struct {
ai *models.ActorInfo
err error
}
Expand Down Expand Up @@ -1343,7 +1338,7 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
cursor := ""
limit := int64(500)

repos := []repoHead{}
repos := []comatproto.SyncListRepos_Repo{}

pages := 0

Expand All @@ -1367,10 +1362,9 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
}

for _, r := range repoList.Repos {
repos = append(repos, repoHead{
Did: r.Did,
Head: r.Head,
})
if r != nil {
repos = append(repos, *r)
}
}

if repoList.Cursor == nil || *repoList.Cursor == "" {
Expand All @@ -1386,45 +1380,45 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
repolistDone := time.Now()

log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start))
resync = bgs.SetResyncStatus(pds.ID, "checking heads")
resync = bgs.SetResyncStatus(pds.ID, "checking revs")

// Create a buffered channel for collecting results
results := make(chan headCheckResult, len(repos))
results := make(chan revCheckResult, len(repos))
sem := semaphore.NewWeighted(40)

// Check repo heads against our local copy and enqueue crawls for any that are out of date
// Check repo revs against our local copy and enqueue crawls for any that are out of date
for _, r := range repos {
go func(r repoHead) {
go func(r comatproto.SyncListRepos_Repo) {
if err := sem.Acquire(ctx, 1); err != nil {
log.Errorw("failed to acquire semaphore", "error", err)
results <- headCheckResult{err: err}
results <- revCheckResult{err: err}
return
}
defer sem.Release(1)

log := log.With("did", r.Did, "head", r.Head)
log := log.With("did", r.Did, "remote_rev", r.Rev)
// Fetches the user if we have it, otherwise automatically enqueues it for crawling
ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did)
if err != nil {
log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err)
results <- headCheckResult{err: err}
results <- revCheckResult{err: err}
return
}

head, err := bgs.repoman.GetRepoRoot(ctx, ai.Uid)
rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid)
if err != nil {
log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid)
results <- headCheckResult{ai: ai}
results <- revCheckResult{ai: ai}
return
}

if head.String() != r.Head {
log.Warnw("recrawling because the repo head from the PDS is different from our local repo root", "local_head", head.String())
results <- headCheckResult{ai: ai}
if rev == "" || rev < r.Rev {
log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev)
results <- revCheckResult{ai: ai}
return
}

results <- headCheckResult{}
results <- revCheckResult{}
}(r)
}

Expand All @@ -1442,8 +1436,8 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did)
}
}
if i%10_000 == 0 {
log.Warnw("checked heads during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt))
if i%100_000 == 0 {
log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt))
resync.NumReposChecked = i
resync.NumReposToResync = numReposToResync
bgs.UpdateResync(resync)
Expand Down

0 comments on commit c2be586

Please sign in to comment.