backfill: add fallback to fetch CAR file from PDS (if relay errors) (#855)

The main motivation here is that the non-archival relay returns a 4xx
error when fetching repos, and we want to try fetching those from the
actual PDS when that happens. This change adds a new code branch: when a
relay CAR fetch fails, it does an identity lookup to find the account's
PDS instance, then fetches the CAR from there.
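
A rough standalone sketch of that resolution step, using the same indigo identity API the diff below relies on (the `pdsHostForDID` helper and the example DID are placeholders, not part of this change):

```go
package main

import (
	"context"
	"fmt"

	"github.com/bluesky-social/indigo/atproto/identity"
	"github.com/bluesky-social/indigo/atproto/syntax"
)

// pdsHostForDID resolves a DID to the account's PDS endpoint, the same
// lookup the new BackfillRepo fallback performs before retrying the fetch.
func pdsHostForDID(ctx context.Context, dir identity.Directory, did string) (string, error) {
	ident, err := dir.LookupDID(ctx, syntax.DID(did))
	if err != nil {
		return "", fmt.Errorf("resolving DID: %w", err)
	}
	host := ident.PDSEndpoint()
	if host == "" {
		return "", fmt.Errorf("no PDS endpoint for DID: %s", did)
	}
	return host, nil
}

func main() {
	host, err := pdsHostForDID(context.Background(), identity.DefaultDirectory(), "did:plc:ewvi7nxzyoun6zhxrhs64oiz")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println("PDS host:", host)
}
```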

For the search code specifically, it reuses an existing identity
directory to avoid double-resolution of DIDs.
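
A small sketch of that sharing, assuming (per the new comment in search/indexing.go) that the default directory caches resolutions, so a DID resolved by one component is served from cache for the other; the component names are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"github.com/bluesky-social/indigo/atproto/identity"
	"github.com/bluesky-social/indigo/atproto/syntax"
)

func main() {
	// One directory shared by both consumers, mirroring how the indexer
	// now hands its directory to the backfiller (bf.Directory = dir).
	dir := identity.DefaultDirectory()
	ctx := context.Background()
	for _, component := range []string{"indexer", "backfiller"} {
		ident, err := dir.LookupDID(ctx, syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz"))
		if err != nil {
			fmt.Println(component, "lookup failed:", err)
			continue
		}
		fmt.Println(component, "resolved PDS:", ident.PDSEndpoint())
	}
}
```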

This also refactors how fetch URLs are constructed to use just
hostnames.
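
For reference, a minimal sketch of the hostname-based construction that the new `fetchRepo` helper performs internally (the `getRepoURL` wrapper is hypothetical):

```go
package main

import "fmt"

// getRepoURL appends the XRPC path and query parameters to a bare host,
// e.g. "https://bsky.network", rather than storing a full endpoint URL.
func getRepoURL(host, did, since string) string {
	url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)
	if since != "" {
		url += fmt.Sprintf("&since=%s", since)
	}
	return url
}

func main() {
	fmt.Println(getRepoURL("https://bsky.network", "did:plc:ewvi7nxzyoun6zhxrhs64oiz", ""))
}
```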

UPDATE: should probably *not* merge this to `main` until Jaz can review
bnewbold authored Dec 3, 2024
2 parents 72b4acb + 8d81824 commit 7fd5887
Showing 2 changed files with 63 additions and 34 deletions.
93 changes: 60 additions & 33 deletions backfill/backfill.go
```diff
@@ -12,8 +12,11 @@ import (
 	"time"
 
 	"github.com/bluesky-social/indigo/api/atproto"
+	"github.com/bluesky-social/indigo/atproto/identity"
+	"github.com/bluesky-social/indigo/atproto/syntax"
 	"github.com/bluesky-social/indigo/repo"
 	"github.com/bluesky-social/indigo/repomgr"
+
 	"github.com/ipfs/go-cid"
 	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
 	"go.opentelemetry.io/otel"
@@ -71,15 +74,17 @@ type Backfiller struct {
 	ParallelRecordCreates int
 	// Prefix match for records to backfill i.e. app.bsky.feed.app/
 	// If empty, all records will be backfilled
-	NSIDFilter   string
-	CheckoutPath string
+	NSIDFilter string
+	RelayHost  string
 
 	syncLimiter *rate.Limiter
 
 	magicHeaderKey string
 	magicHeaderVal string
 
 	stop chan chan struct{}
+
+	Directory identity.Directory
 }
 
 var (
@@ -110,7 +115,7 @@ type BackfillOptions struct {
 	ParallelRecordCreates int
 	NSIDFilter            string
 	SyncRequestsPerSecond int
-	CheckoutPath          string
+	RelayHost             string
 }
 
 func DefaultBackfillOptions() *BackfillOptions {
@@ -119,7 +124,7 @@ func DefaultBackfillOptions() *BackfillOptions {
 		ParallelRecordCreates: 100,
 		NSIDFilter:            "",
 		SyncRequestsPerSecond: 2,
-		CheckoutPath:          "https://bsky.network/xrpc/com.atproto.sync.getRepo",
+		RelayHost:             "https://bsky.network",
 	}
 }

@@ -145,8 +150,9 @@ func NewBackfiller(
 		ParallelRecordCreates: opts.ParallelRecordCreates,
 		NSIDFilter:            opts.NSIDFilter,
 		syncLimiter:           rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1),
-		CheckoutPath:          opts.CheckoutPath,
+		RelayHost:             opts.RelayHost,
 		stop:                  make(chan chan struct{}, 1),
+		Directory:             identity.DefaultDirectory(),
 	}
 }

@@ -292,25 +298,12 @@ type recordResult struct {
 	err error
 }
 
-// BackfillRepo backfills a repo
-func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
-	ctx, span := tracer.Start(ctx, "BackfillRepo")
-	defer span.End()
-
-	start := time.Now()
-
-	repoDid := job.Repo()
-
-	log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid)
-	if job.RetryCount() > 0 {
-		log = log.With("retry_count", job.RetryCount())
-	}
-	log.Info(fmt.Sprintf("processing backfill for %s", repoDid))
-
-	url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)
-
-	if job.Rev() != "" {
-		url = url + fmt.Sprintf("&since=%s", job.Rev())
+// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo
+func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) {
+	url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)
+
+	if since != "" {
+		url = url + fmt.Sprintf("&since=%s", since)
 	}
 
 	// GET and CAR decode the body
@@ -320,8 +313,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
 	}
 	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 	if err != nil {
-		state := fmt.Sprintf("failed (create request: %s)", err.Error())
-		return state, fmt.Errorf("failed to create request: %w", err)
+		return nil, fmt.Errorf("failed to create request: %w", err)
 	}
 
 	req.Header.Set("Accept", "application/vnd.ipld.car")
@@ -334,8 +326,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
 
 	resp, err := client.Do(req)
 	if err != nil {
-		state := fmt.Sprintf("failed (do request: %s)", err.Error())
-		return state, fmt.Errorf("failed to send request: %w", err)
+		return nil, fmt.Errorf("failed to send request: %w", err)
 	}
 
 	if resp.StatusCode != http.StatusOK {
@@ -345,8 +336,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
 		} else {
 			reason = resp.Status
 		}
-		state := fmt.Sprintf("failed (%s)", reason)
-		return state, fmt.Errorf("failed to get repo: %s", reason)
+		return nil, fmt.Errorf("failed to get repo: %s", reason)
 	}
 
 	instrumentedReader := instrumentedReader{
@@ -356,10 +346,47 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
 
 	defer instrumentedReader.Close()
 
-	r, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
+	repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
 	if err != nil {
-		state := "failed (couldn't read repo CAR from response body)"
-		return state, fmt.Errorf("failed to read repo from car: %w", err)
+		return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err)
 	}
+	return repo, nil
+}
+
+// BackfillRepo backfills a repo
+func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
+	ctx, span := tracer.Start(ctx, "BackfillRepo")
+	defer span.End()
+
+	start := time.Now()
+
+	repoDID := job.Repo()
+
+	log := slog.With("source", "backfiller_backfill_repo", "repo", repoDID)
+	if job.RetryCount() > 0 {
+		log = log.With("retry_count", job.RetryCount())
+	}
+	log.Info(fmt.Sprintf("processing backfill for %s", repoDID))
+
+	// first try with Relay endpoint
+	r, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost)
+	if err != nil {
+		slog.Warn("repo CAR fetch from relay failed", "did", repoDID, "since", job.Rev(), "relayHost", b.RelayHost, "err", err)
+		// fallback to direct PDS fetch
+		ident, err := b.Directory.LookupDID(ctx, syntax.DID(repoDID))
+		if err != nil {
+			return "failed resolving DID to PDS repo", fmt.Errorf("resolving DID for PDS repo fetch: %w", err)
+		}
+		pdsHost := ident.PDSEndpoint()
+		if pdsHost == "" {
+			return "DID document missing PDS endpoint", fmt.Errorf("no PDS endpoint for DID: %s", repoDID)
+		}
+		r, err = b.fetchRepo(ctx, repoDID, job.Rev(), pdsHost)
+		if err != nil {
+			slog.Warn("repo CAR fetch from PDS failed", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err)
+			return "repo CAR fetch from PDS failed", err
+		}
+		slog.Info("repo CAR fetch from PDS successful", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err)
+	}
 
 	numRecords := 0
@@ -396,7 +423,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
 
 			raw := blk.RawData()
 
-			err = b.HandleCreateRecord(ctx, repoDid, rev, item.recordPath, &raw, &item.nodeCid)
+			err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid)
 			if err != nil {
 				recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
 				continue
```
4 changes: 3 additions & 1 deletion search/indexing.go
```diff
@@ -130,7 +130,7 @@ func NewIndexer(db *gorm.DB, escli *es.Client, dir identity.Directory, config In
 		opts.SyncRequestsPerSecond = 8
 	}
 
-	opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", relayHTTP)
+	opts.RelayHost = relayHTTP
 	if config.IndexMaxConcurrency > 0 {
 		opts.ParallelRecordCreates = config.IndexMaxConcurrency
 	} else {
@@ -145,6 +145,8 @@ func NewIndexer(db *gorm.DB, escli *es.Client, dir identity.Directory, config In
 		idx.handleDelete,
 		opts,
 	)
+	// reuse identity directory (for efficient caching)
+	bf.Directory = dir
 
 	idx.bfs = bfstore
 	idx.bf = bf
```
