Skip to content

Commit

Permalink
in the case of missing blocks, fall back to full repo fetch (#332)
Browse files Browse the repository at this point in the history
If for some reason we end up with missing data, add a multi-stage
fallback to first fetch the missing revision range ( in the case of an
event slice erroring) and then if data is still missing, fall back to
fetching the entire repo.

also added a debug command to fetch and verify data in a repo.
  • Loading branch information
whyrusleeping authored Sep 22, 2023
2 parents 6442e52 + 0eca951 commit 976cb2b
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
3 changes: 2 additions & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/gorilla/websocket"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
Expand Down Expand Up @@ -765,7 +766,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())

if errors.Is(err, carstore.ErrRepoBaseMismatch) {
if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ type DeltaSession struct {
seq int
readonly bool
cs *CarStore
lastRev string
}

func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard {
Expand Down Expand Up @@ -332,6 +333,7 @@ func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, since
baseCid: lastShard.Root.CID,
cs: cs,
seq: lastShard.Seq + 1,
lastRev: lastShard.Rev,
}, nil
}

Expand Down Expand Up @@ -880,7 +882,7 @@ func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *stri

rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks)
if err != nil {
return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s): %w", ds.baseCid, err)
return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s,since=%v,rev=%s): %w", ds.baseCid, since, ds.lastRev, err)
}

ds.rmcids = rmcids
Expand Down
43 changes: 43 additions & 0 deletions cmd/gosky/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var debugCmd = &cli.Command{
debugFeedGenCmd,
debugFeedViewCmd,
compareStreamsCmd,
debugGetRepoCmd,
},
}

Expand Down Expand Up @@ -746,3 +747,45 @@ func saveCache(filename string, data map[string]*bsky.FeedDefs_PostView) error {

return nil
}

var debugGetRepoCmd = &cli.Command{
Name: "get-repo",
Flags: []cli.Flag{},
ArgsUsage: `<did>`,
Action: func(cctx *cli.Context) error {
xrpcc, err := cliutil.GetXrpcClient(cctx, false)
if err != nil {
return err
}

ctx := context.TODO()

repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "")
if err != nil {
return fmt.Errorf("getting repo: %w", err)
}

rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repobytes))
if err != nil {
return err
}

fmt.Println("Rev: ", rep.SignedCommit().Rev)
var count int
if err := rep.ForEach(ctx, "", func(k string, v cid.Cid) error {
rec, err := rep.Blockstore().Get(ctx, v)
if err != nil {
return err
}

count++
_ = rec
return nil
}); err != nil {
return err
}
fmt.Printf("scanned %d records\n", count)

return nil
},
}
17 changes: 17 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"golang.org/x/time/rate"

"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -464,6 +465,22 @@ func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
// we probably want alternative ways of doing this for 'very large' or 'very old' repos, but this works for now
if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil {
span.RecordError(err)

if ipld.IsNotFound(err) {
limiter.Wait(ctx)
log.Infow("SyncGetRepo", "did", ai.Did, "user", ai.Handle, "since", "", "fallback", true)
repo, err := comatproto.SyncGetRepo(ctx, c, ai.Did, "")
if err != nil {
reposFetched.WithLabelValues("fail").Inc()
return fmt.Errorf("failed to fetch repo: %w", err)
}
reposFetched.WithLabelValues("success").Inc()

if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), nil); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to import backup repo (%s): %w", ai.Did, err)
}
}
return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err)
}

Expand Down

0 comments on commit 976cb2b

Please sign in to comment.