Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

for now, add duplicate stale refs to the keep set #353

Merged
merged 6 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bluesky-social/indigo/models"
"github.com/labstack/echo/v4"
dto "github.com/prometheus/client_model/go"
"go.opentelemetry.io/otel"
"golang.org/x/time/rate"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -397,7 +398,8 @@ func (bgs *BGS) handleAdminChangePDSCrawlLimit(e echo.Context) error {
}

func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
ctx := e.Request().Context()
ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactRepo")
defer span.End()

did := e.QueryParam("did")
if did == "" {
Expand All @@ -421,7 +423,8 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
}

func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
ctx := e.Request().Context()
ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos")
defer span.End()

if err := bgs.runRepoCompaction(ctx); err != nil {
return fmt.Errorf("compaction run failed: %w", err)
Expand Down Expand Up @@ -481,3 +484,29 @@ func (bgs *BGS) handleAdminGetResyncPDS(e echo.Context) error {
"resync": resync,
})
}

func (bgs *BGS) handleAdminResetRepo(e echo.Context) error {
ctx := e.Request().Context()

did := e.QueryParam("did")
if did == "" {
return fmt.Errorf("must pass a did")
}

ai, err := bgs.Index.LookupUserByDid(ctx, did)
if err != nil {
return fmt.Errorf("no such user: %w", err)
}

if err := bgs.repoman.ResetRepo(ctx, ai.Uid); err != nil {
return err
}

if err := bgs.Index.Crawler.Crawl(ctx, ai); err != nil {
return err
}

return e.JSON(200, map[string]any{
"success": true,
})
}
8 changes: 8 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {

log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err)

if strings.HasPrefix(ctx.Path(), "/admin/") {
ctx.JSON(500, map[string]any{
"error": err.Error(),
})
return
}

if sendHeader {
ctx.Response().WriteHeader(500)
}
Expand Down Expand Up @@ -326,6 +333,7 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {
admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown)
admin.POST("/repo/compact", bgs.handleAdminCompactRepo)
admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos)
admin.POST("/repo/reset", bgs.handleAdminResetRepo)

// PDS-related Admin API
admin.GET("/pds/list", bgs.handleListPDSs)
Expand Down
48 changes: 44 additions & 4 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-libipfs/blocks"
logging "github.com/ipfs/go-log"
car "github.com/ipld/go-car"
carutil "github.com/ipld/go-car/util"
cbg "github.com/whyrusleeping/cbor-gen"
Expand All @@ -31,6 +32,8 @@ import (
"gorm.io/gorm"
)

var log = logging.Logger("carstore")

const MaxSliceLength = 2 << 20

type CarStore struct {
Expand Down Expand Up @@ -273,6 +276,13 @@ func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard {
return nil
}

func (cs *CarStore) removeLastShardCache(user models.Uid) {
cs.lscLk.Lock()
defer cs.lscLk.Unlock()

delete(cs.lastShardCache, user)
}

func (cs *CarStore) putLastShardCache(ls *CarShard) {
cs.lscLk.Lock()
defer cs.lscLk.Unlock()
Expand Down Expand Up @@ -948,7 +958,7 @@ func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error
return out, nil
}

func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error {
func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error {
var shards []*CarShard
if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil {
return err
Expand All @@ -960,6 +970,8 @@ func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error {
}
}

cs.removeLastShardCache(user)

return nil
}

Expand Down Expand Up @@ -1180,6 +1192,7 @@ type CompactionStats struct {
SkippedShards int `json:"skippedShards"`
ShardsDeleted int `json:"shardsDeleted"`
RefsDeleted int `json:"refsDeleted"`
DupeCount int `json:"dupeCount"`
}

func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) {
Expand Down Expand Up @@ -1214,13 +1227,19 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co
}

stale := make(map[cid.Cid]bool)
var dupes []cid.Cid
var hasDirtyDupes bool
for _, br := range staleRefs {
if stale[br.Cid.CID] {
hasDirtyDupes = true
break
dupes = append(dupes, br.Cid.CID)
} else {
stale[br.Cid.CID] = true
}
stale[br.Cid.CID] = true
}

for _, dupe := range dupes {
delete(stale, dupe) // remove dupes from stale list, see comment below
}

if hasDirtyDupes {
Expand All @@ -1233,7 +1252,10 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co
// focus on compacting everything else. it leaves *some* dirty blocks
// still around but we're doing that anyways since compaction isnt a
// perfect process
return nil, fmt.Errorf("WIP: not currently handling this case")

log.Warnw("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs))

//return nil, fmt.Errorf("WIP: not currently handling this case")
}

keep := make(map[cid.Cid]bool)
Expand All @@ -1243,6 +1265,10 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co
}
}

for _, dupe := range dupes {
keep[dupe] = true
}

results := aggrRefs(brefs, shardsById, stale)
var sum int
for _, r := range results {
Expand Down Expand Up @@ -1335,6 +1361,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co
}

stats.RefsDeleted = num
stats.DupeCount = len(dupes)

return stats, nil
}
Expand Down Expand Up @@ -1400,9 +1427,14 @@ func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compB

offset := hnw
var nbrefs []map[string]any
written := make(map[cid.Cid]bool)
for _, s := range b.shards {
sh := shardsById[s.ID]
if err := cs.iterateShardBlocks(ctx, &sh, func(blk blockformat.Block) error {
if written[blk.Cid()] {
return nil
}

if keep[blk.Cid()] {
nw, err := LdWrite(fi, blk.Cid().Bytes(), blk.RawData())
if err != nil {
Expand All @@ -1415,6 +1447,7 @@ func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compB
})

offset += nw
written[blk.Cid()] = true
}
return nil
}); err != nil {
Expand All @@ -1432,6 +1465,13 @@ func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compB
}

if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil {
// if writing the shard fails, we should also delete the file
_ = fi.Close()

if err2 := os.Remove(fi.Name()); err2 != nil {
log.Errorf("failed to remove shard file (%s) after failed db transaction: %w", fi.Name(), err2)
}

return err
}
return nil
Expand Down
42 changes: 42 additions & 0 deletions cmd/gosky/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var bgsAdminCmd = &cli.Command{
bgsSetNewSubsEnabledCmd,
bgsCompactRepo,
bgsCompactAll,
bgsResetRepo,
},
}

Expand Down Expand Up @@ -381,3 +382,44 @@ var bgsCompactAll = &cli.Command{
return nil
},
}

var bgsResetRepo = &cli.Command{
Name: "reset-repo",
Action: func(cctx *cli.Context) error {
url := cctx.String("bgs") + "/admin/repo/reset"

did := cctx.Args().First()
url += fmt.Sprintf("?did=%s", did)

req, err := http.NewRequest("POST", url, nil)
if err != nil {
return err
}

auth := cctx.String("key")
req.Header.Set("Authorization", "Bearer "+auth)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

if resp.StatusCode != 200 {
var e xrpc.XRPCError
if err := json.NewDecoder(resp.Body).Decode(&e); err != nil {
return err
}

return &e
}

var out map[string]any
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return err
}

fmt.Println(out)

return nil
},
}
12 changes: 10 additions & 2 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD

diffops, err := r.DiffSince(ctx, curhead)
if err != nil {
return fmt.Errorf("diff trees: %w", err)
return fmt.Errorf("diff trees (curhead: %s): %w", curhead, err)
}

var ops []RepoOp
Expand Down Expand Up @@ -951,5 +951,13 @@ func (rm *RepoManager) TakeDownRepo(ctx context.Context, uid models.Uid) error {
unlock := rm.lockUser(ctx, uid)
defer unlock()

return rm.cs.TakeDownRepo(ctx, uid)
return rm.cs.WipeUserData(ctx, uid)
}

// technically identical to TakeDownRepo, for now
func (rm *RepoManager) ResetRepo(ctx context.Context, uid models.Uid) error {
unlock := rm.lockUser(ctx, uid)
defer unlock()

return rm.cs.WipeUserData(ctx, uid)
}