From eecbf9088488a4b680161c5a31d67af313fc6fc0 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 28 Sep 2023 11:30:38 -0700 Subject: [PATCH 01/12] for now, add duplicate stale refs to the keep set --- bgs/bgs.go | 7 +++++++ carstore/bs.go | 21 ++++++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index 5ac58ab90..c3f22fad8 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -281,6 +281,13 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { sendHeader = false } + if strings.HasPrefix(ctx.Path(), "/admin/") { + ctx.JSON(500, map[string]any{ + "error": err.Error(), + }) + return + } + log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err) if sendHeader { diff --git a/carstore/bs.go b/carstore/bs.go index 288236183..027542a00 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -23,6 +23,7 @@ import ( cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-libipfs/blocks" + logging "github.com/ipfs/go-log" car "github.com/ipld/go-car" carutil "github.com/ipld/go-car/util" cbg "github.com/whyrusleeping/cbor-gen" @@ -31,6 +32,8 @@ import ( "gorm.io/gorm" ) +var log = logging.Logger("carstore") + const MaxSliceLength = 2 << 20 type CarStore struct { @@ -1180,6 +1183,7 @@ type CompactionStats struct { SkippedShards int `json:"skippedShards"` ShardsDeleted int `json:"shardsDeleted"` RefsDeleted int `json:"refsDeleted"` + DupeCount int `json:"dupeCount"` } func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) { @@ -1214,13 +1218,16 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co } stale := make(map[cid.Cid]bool) + var dupes []cid.Cid var hasDirtyDupes bool for _, br := range staleRefs { if stale[br.Cid.CID] { + delete(stale, br.Cid.CID) // remove dupes from stale list, see comment below hasDirtyDupes = true - break + dupes = append(dupes, br.Cid.CID) + } else { + stale[br.Cid.CID] = true } - stale[br.Cid.CID] = true } if hasDirtyDupes { @@ -1233,7 +1240,10 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co // focus on compacting everything else. it leaves *some* dirty blocks // still around but we're doing that anyways since compaction isnt a // perfect process - return nil, fmt.Errorf("WIP: not currently handling this case") + + log.Warnw("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs)) + + //return nil, fmt.Errorf("WIP: not currently handling this case") } keep := make(map[cid.Cid]bool) @@ -1243,6 +1253,10 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co } } + for _, dupe := range dupes { + keep[dupe] = true + } + results := aggrRefs(brefs, shardsById, stale) var sum int for _, r := range results { @@ -1335,6 +1349,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co } stats.RefsDeleted = num + stats.DupeCount = len(dupes) return stats, nil } From 97850b81a4ddf7b14767d7520d36524abf73d4f7 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 28 Sep 2023 14:01:00 -0700 Subject: [PATCH 02/12] use background contexts for repo compaction admin routes --- bgs/admin.go | 7 +++++-- carstore/bs.go | 7 +++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/bgs/admin.go b/bgs/admin.go index 3e8d92de8..547d83fa2 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -12,6 +12,7 @@ import ( "github.com/bluesky-social/indigo/models" "github.com/labstack/echo/v4" dto "github.com/prometheus/client_model/go" + "go.opentelemetry.io/otel" "golang.org/x/time/rate" "gorm.io/gorm" ) @@ -397,7 +398,8 @@ func (bgs *BGS) handleAdminChangePDSCrawlLimit(e echo.Context) error { } func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error { - ctx := e.Request().Context() + ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactRepo") + defer span.End() did := e.QueryParam("did") if did == "" { @@ -421,7 +423,8 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error { } func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error { - ctx := e.Request().Context() + ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos") + defer span.End() if err := bgs.runRepoCompaction(ctx); err != nil { return fmt.Errorf("compaction run failed: %w", err) diff --git a/carstore/bs.go b/carstore/bs.go index 027542a00..74833756c 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -1447,6 +1447,13 @@ func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compB } if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil { + // if writing the shard fails, we should also delete the file + _ = fi.Close() + + if err2 := os.Remove(fi.Name()); err2 != nil { + log.Errorf("failed to remove shard file (%s) after failed db transaction: %w", fi.Name(), err2) + } + return err } return nil From 61c8c55c82842f84f0f9111ed4b721bde9a0463e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 29 Sep 2023 10:06:06 -0700 Subject: [PATCH 03/12] add endpoint to reset a given repo, other tweaks to compaction --- bgs/admin.go | 24 ++++++++++++++++++++++++ bgs/bgs.go | 1 + carstore/bs.go | 13 +++++++++++-- repomgr/repomgr.go | 10 +++++++++- 4 files changed, 45 insertions(+), 3 deletions(-) diff --git a/bgs/admin.go b/bgs/admin.go index 547d83fa2..71697bbd4 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -484,3 +484,27 @@ func (bgs *BGS) handleAdminGetResyncPDS(e echo.Context) error { "resync": resync, }) } + +func (bgs *BGS) handleAdminResetRepo(e echo.Context) error { + ctx := e.Request().Context() + + did := e.QueryParam("did") + if did == "" { + return fmt.Errorf("must pass a did") + } + + ai, err := bgs.Index.LookupUserByDid(ctx, did) + if err != nil { + return fmt.Errorf("no such user: %w", err) + } + + if err := bgs.repoman.ResetRepo(ctx, ai.Uid); err != nil { + return err + } + + if err := bgs.Index.Crawler.Crawl(ctx, ai); err != nil { + return err + } + + return nil +} diff --git a/bgs/bgs.go b/bgs/bgs.go index c3f22fad8..8de298404 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -333,6 +333,7 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown) admin.POST("/repo/compact", bgs.handleAdminCompactRepo) admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos) + admin.POST("/repo/reset", bgs.handleAdminResetRepo) // PDS-related Admin API admin.GET("/pds/list", bgs.handleListPDSs) diff --git a/carstore/bs.go b/carstore/bs.go index 74833756c..a5d46da75 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -951,7 +951,7 @@ func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error return out, nil } -func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error { +func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error { var shards []*CarShard if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { return err @@ -1222,7 +1222,6 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co var hasDirtyDupes bool for _, br := range staleRefs { if stale[br.Cid.CID] { - delete(stale, br.Cid.CID) // remove dupes from stale list, see comment below hasDirtyDupes = true dupes = append(dupes, br.Cid.CID) } else { @@ -1230,6 +1229,10 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co } } + for _, dupe := range dupes { + delete(stale, dupe) // remove dupes from stale list, see comment below + } + if hasDirtyDupes { // if we have no duplicates, then the keep set is simply all the 'clean' blockRefs // in the case we have duplicate dirty references we have to compute @@ -1415,9 +1418,14 @@ func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compB offset := hnw var nbrefs []map[string]any + written := make(map[cid.Cid]bool) for _, s := range b.shards { sh := shardsById[s.ID] if err := cs.iterateShardBlocks(ctx, &sh, func(blk blockformat.Block) error { + if written[blk.Cid()] { + return nil + } + if keep[blk.Cid()] { nw, err := LdWrite(fi, blk.Cid().Bytes(), blk.RawData()) if err != nil { @@ -1430,6 +1438,7 @@ func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compB }) offset += nw + written[blk.Cid()] = true } return nil }); err != nil { diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 5b11645ef..e98b5a4c3 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -951,5 +951,13 @@ func (rm *RepoManager) TakeDownRepo(ctx context.Context, uid models.Uid) error { unlock := rm.lockUser(ctx, uid) defer unlock() - return rm.cs.TakeDownRepo(ctx, uid) + return rm.cs.WipeUserData(ctx, uid) +} + +// technically identical to TakeDownRepo, for now +func (rm *RepoManager) ResetRepo(ctx context.Context, uid models.Uid) error { + unlock := rm.lockUser(ctx, uid) + defer unlock() + + return rm.cs.WipeUserData(ctx, uid) } From 9cd705933aa1ae29f4e646c003bff2c6e483b6ba Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 29 Sep 2023 10:31:41 -0700 Subject: [PATCH 04/12] cli command to trigger reset repo --- cmd/gosky/bgs.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/cmd/gosky/bgs.go b/cmd/gosky/bgs.go index 727f56f31..dfbddb680 100644 --- a/cmd/gosky/bgs.go +++ b/cmd/gosky/bgs.go @@ -33,6 +33,7 @@ var bgsAdminCmd = &cli.Command{ bgsSetNewSubsEnabledCmd, bgsCompactRepo, bgsCompactAll, + bgsResetRepo, }, } @@ -381,3 +382,44 @@ var bgsCompactAll = &cli.Command{ return nil }, } + +var bgsResetRepo = &cli.Command{ + Name: "reset-repo", + Action: func(cctx *cli.Context) error { + url := cctx.String("bgs") + "/admin/repo/reset" + + did := cctx.Args().First() + url += fmt.Sprintf("?did=%s", did) + + req, err := http.NewRequest("POST", url, nil) + if err != nil { + return err + } + + auth := cctx.String("key") + req.Header.Set("Authorization", "Bearer "+auth) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + var e xrpc.XRPCError + if err := json.NewDecoder(resp.Body).Decode(&e); err != nil { + return err + } + + return &e + } + + var out map[string]any + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return err + } + + fmt.Println(out) + + return nil + }, +} From b320b8a17b67cc59574ce490f2656c2cedf8ed1e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 29 Sep 2023 11:36:27 -0700 Subject: [PATCH 05/12] drop shard cache when wiping user data from carstore --- bgs/admin.go | 4 +++- bgs/bgs.go | 4 ++-- carstore/bs.go | 9 +++++++++ repomgr/repomgr.go | 2 +- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/bgs/admin.go b/bgs/admin.go index 71697bbd4..135041252 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -506,5 +506,7 @@ func (bgs *BGS) handleAdminResetRepo(e echo.Context) error { return err } - return nil + return e.JSON(200, map[string]any{ + "success": true, + }) } diff --git a/bgs/bgs.go b/bgs/bgs.go index 8de298404..1c359fa2b 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -281,6 +281,8 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { sendHeader = false } + log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err) + if strings.HasPrefix(ctx.Path(), "/admin/") { ctx.JSON(500, map[string]any{ "error": err.Error(), @@ -288,8 +290,6 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error { return } - log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err) - if sendHeader { ctx.Response().WriteHeader(500) } diff --git a/carstore/bs.go b/carstore/bs.go index a5d46da75..cf2e2a8d8 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -276,6 +276,13 @@ func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard { return nil } +func (cs *CarStore) removeLastShardCache(user models.Uid) { + cs.lscLk.Lock() + defer cs.lscLk.Unlock() + + delete(cs.lastShardCache, user) +} + func (cs *CarStore) putLastShardCache(ls *CarShard) { cs.lscLk.Lock() defer cs.lscLk.Unlock() @@ -963,6 +970,8 @@ func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error { } } + cs.removeLastShardCache(user) + return nil } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index e98b5a4c3..e9ffece46 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -731,7 +731,7 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD diffops, err := r.DiffSince(ctx, curhead) if err != nil { - return fmt.Errorf("diff trees: %w", err) + return fmt.Errorf("diff trees (curhead: %s): %w", curhead, err) } var ops []RepoOp From c492597e9c3897d1ee67eed176000e2c72bb429d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 29 Sep 2023 15:44:17 -0700 Subject: [PATCH 06/12] properly account for dupe-ref staleRefs --- carstore/bs.go | 22 ++++++++++------ carstore/repo_test.go | 59 +++++++++++++++++++++++++++++++------------ cmd/gosky/debug.go | 2 +- mst/mst.go | 8 +++--- 4 files changed, 62 insertions(+), 29 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index cf2e2a8d8..e646df8f1 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -751,12 +751,11 @@ func generateInsertQuery(data []map[string]any) (string, []any) { // Function to create in batches func createInBatches(ctx context.Context, tx *gorm.DB, data []map[string]any, batchSize int) error { for i := 0; i < len(data); i += batchSize { - end := i + batchSize - if end > len(data) { - end = len(data) + batch := data[i:] + if len(batch) > batchSize { + batch = batch[:batchSize] } - batch := data[i:end] query, values := generateInsertQuery(batch) if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil { @@ -1227,14 +1226,21 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co } stale := make(map[cid.Cid]bool) + for _, br := range staleRefs { + stale[br.Cid.CID] = true + } + + // if we have a staleRef that references multiple blockRefs, we consider that block a 'dirty duplicate' var dupes []cid.Cid var hasDirtyDupes bool - for _, br := range staleRefs { - if stale[br.Cid.CID] { - hasDirtyDupes = true + seenBlocks := make(map[cid.Cid]bool) + for _, br := range brefs { + if seenBlocks[br.Cid.CID] { dupes = append(dupes, br.Cid.CID) + hasDirtyDupes = true + delete(stale, br.Cid.CID) } else { - stale[br.Cid.CID] = true + seenBlocks[br.Cid.CID] = true } } diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 5ce81ece7..1093f4592 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -3,6 +3,7 @@ package carstore import ( "bytes" "context" + "errors" "fmt" "io" "os" @@ -17,6 +18,7 @@ import ( "github.com/ipfs/go-cid" flatfs "github.com/ipfs/go-ds-flatfs" blockstore "github.com/ipfs/go-ipfs-blockstore" + ipld "github.com/ipfs/go-ipld-format" "gorm.io/driver/sqlite" "gorm.io/gorm" ) @@ -138,7 +140,7 @@ func TestBasicOperation(t *testing.T) { if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) } - checkRepo(t, buf, recs) + checkRepo(t, cs, buf, recs) if _, err := cs.CompactUserShards(ctx, 1); err != nil { t.Fatal(err) @@ -148,7 +150,7 @@ func TestBasicOperation(t *testing.T) { if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) } - checkRepo(t, buf, recs) + checkRepo(t, cs, buf, recs) } func TestRepeatedCompactions(t *testing.T) { @@ -177,8 +179,10 @@ func TestRepeatedCompactions(t *testing.T) { var recs []cid.Cid head := ncid + var lastRec string + for loop := 0; loop < 50; loop++ { - for i := 0; i < 100; i++ { + for i := 0; i < 20; i++ { ds, err := cs.NewDeltaSession(ctx, 1, &rev) if err != nil { t.Fatal(err) @@ -188,16 +192,23 @@ func TestRepeatedCompactions(t *testing.T) { if err != nil { t.Fatal(err) } - - rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ - Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), - }) - if err != nil { - t.Fatal(err) + if i%4 == 3 { + if err := rr.DeleteRecord(ctx, lastRec); err != nil { + t.Fatal(err) + } + recs = recs[:len(recs)-1] + } else { + rc, tid, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), + }) + if err != nil { + t.Fatal(err) + } + + recs = append(recs, rc) + lastRec = "app.bsky.feed.post/" + tid } - recs = append(recs, rc) - kmgr := &util.FakeKeyManager{} nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) if err != nil { @@ -228,21 +239,21 @@ func TestRepeatedCompactions(t *testing.T) { if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) } - checkRepo(t, buf, recs) + checkRepo(t, cs, buf, recs) } buf := new(bytes.Buffer) if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { t.Fatal(err) } - checkRepo(t, buf, recs) + checkRepo(t, cs, buf, recs) } -func checkRepo(t *testing.T, r io.Reader, expRecs []cid.Cid) { +func checkRepo(t *testing.T, cs *CarStore, r io.Reader, expRecs []cid.Cid) { t.Helper() rep, err := repo.ReadRepoFromCar(context.TODO(), r) if err != nil { - t.Fatal(err) + t.Fatal("Reading repo: ", err) } set := make(map[cid.Cid]bool) @@ -259,7 +270,23 @@ func checkRepo(t *testing.T, r io.Reader, expRecs []cid.Cid) { return nil }); err != nil { - t.Fatal(err) + var ierr ipld.ErrNotFound + if errors.As(err, &ierr) { + fmt.Println("matched error") + bs, err := cs.ReadOnlySession(1) + if err != nil { + fmt.Println("couldnt read session: ", err) + } + + blk, err := bs.Get(context.TODO(), ierr.Cid) + if err != nil { + fmt.Println("also failed the local get: ", err) + } else { + fmt.Println("LOCAL GET SUCCESS", len(blk.RawData())) + } + } + + t.Fatal("walking repo: ", err) } if len(set) > 0 { diff --git a/cmd/gosky/debug.go b/cmd/gosky/debug.go index 382b5aab0..3822e392d 100644 --- a/cmd/gosky/debug.go +++ b/cmd/gosky/debug.go @@ -775,7 +775,7 @@ var debugGetRepoCmd = &cli.Command{ if err := rep.ForEach(ctx, "", func(k string, v cid.Cid) error { rec, err := rep.Blockstore().Get(ctx, v) if err != nil { - return err + return fmt.Errorf("getting record %q: %w", k, err) } count++ diff --git a/mst/mst.go b/mst/mst.go index 47c9b0fd8..ee648add4 100644 --- a/mst/mst.go +++ b/mst/mst.go @@ -944,26 +944,26 @@ func (mst *MerkleSearchTree) WalkLeavesFrom(ctx context.Context, from string, cb entries, err := mst.getEntries(ctx) if err != nil { - return err + return fmt.Errorf("get entries: %w", err) } if index > 0 { prev := entries[index-1] if !prev.isUndefined() && prev.isTree() { if err := prev.Tree.WalkLeavesFrom(ctx, from, cb); err != nil { - return err + return fmt.Errorf("walk leaves %d: %w", index, err) } } } - for _, e := range entries[index:] { + for i, e := range entries[index:] { if e.isLeaf() { if err := cb(e.Key, e.Val); err != nil { return err } } else { if err := e.Tree.WalkLeavesFrom(ctx, from, cb); err != nil { - return err + return fmt.Errorf("walk leaves from (%d): %w", i, err) } } } From 98242a4dfae69057b7699d455f3aa02fb20d8449 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Sun, 1 Oct 2023 16:37:52 +0000 Subject: [PATCH 07/12] Use custom metrics middleware for Palomar to avoid 404DOSing --- search/metrics.go | 93 +++++++++++++++++++++++++++++++++++++++++++++++ search/server.go | 6 +-- 2 files changed, 96 insertions(+), 3 deletions(-) diff --git a/search/metrics.go b/search/metrics.go index 829c9bc1d..bd53581e3 100644 --- a/search/metrics.go +++ b/search/metrics.go @@ -1,6 +1,12 @@ package search import ( + "errors" + "net/http" + "strconv" + "time" + + "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -49,3 +55,90 @@ var currentSeq = promauto.NewGauge(prometheus.GaugeOpts{ Name: "search_current_seq", Help: "Current sequence number", }) + +var reqSz = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "http_request_size_bytes", + Help: "A histogram of request sizes for requests.", + Buckets: prometheus.ExponentialBuckets(100, 10, 8), +}, []string{"code", "method", "path"}) + +var reqDur = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "A histogram of latencies for requests.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 18), +}, []string{"code", "method", "path"}) + +var reqCnt = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "http_requests_total", + Help: "A counter for requests to the wrapped handler.", +}, []string{"code", "method", "path"}) + +var resSz = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "http_response_size_bytes", + Help: "A histogram of response sizes for requests.", + Buckets: prometheus.ExponentialBuckets(100, 10, 8), +}, []string{"code", "method", "path"}) + +// MetricsMiddleware defines handler function for metrics middleware +func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + path := c.Path() + if path == "/metrics" || path == "/_health" { + return next(c) + } + + start := time.Now() + requestSize := computeApproximateRequestSize(c.Request()) + + err := next(c) + + status := c.Response().Status + if err != nil { + var httpError *echo.HTTPError + if errors.As(err, &httpError) { + status = httpError.Code + } + if status == 0 || status == http.StatusOK { + status = http.StatusInternalServerError + } + } + + elapsed := float64(time.Since(start)) / float64(time.Second) + + statusStr := strconv.Itoa(status) + method := c.Request().Method + + responseSize := float64(c.Response().Size) + + reqDur.WithLabelValues(statusStr, method, path).Observe(elapsed) + reqCnt.WithLabelValues(statusStr, method, path).Inc() + reqSz.WithLabelValues(statusStr, method, path).Observe(float64(requestSize)) + resSz.WithLabelValues(statusStr, method, path).Observe(responseSize) + + return err + } +} + +func computeApproximateRequestSize(r *http.Request) int { + s := 0 + if r.URL != nil { + s = len(r.URL.Path) + } + + s += len(r.Method) + s += len(r.Proto) + for name, values := range r.Header { + s += len(name) + for _, value := range values { + s += len(value) + } + } + s += len(r.Host) + + // N.B. r.Form and r.MultipartForm are assumed to be included in r.URL. + + if r.ContentLength != -1 { + s += int(r.ContentLength) + } + return s +} diff --git a/search/server.go b/search/server.go index 7a80b27d2..82dbccd23 100644 --- a/search/server.go +++ b/search/server.go @@ -13,8 +13,8 @@ import ( "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/util/version" "github.com/bluesky-social/indigo/xrpc" + "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/labstack/echo-contrib/echoprometheus" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" es "github.com/opensearch-project/opensearch-go/v2" @@ -183,7 +183,7 @@ func (s *Server) RunAPI(listen string) error { e.HideBanner = true e.Use(slogecho.New(s.logger)) e.Use(middleware.Recover()) - e.Use(echoprometheus.NewMiddleware("palomar")) + e.Use(MetricsMiddleware) e.Use(middleware.BodyLimit("64M")) e.HTTPErrorHandler = func(err error, ctx echo.Context) { @@ -197,7 +197,7 @@ func (s *Server) RunAPI(listen string) error { e.Use(middleware.CORS()) e.GET("/_health", s.handleHealthCheck) - e.GET("/metrics", echoprometheus.NewHandler()) + e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton) e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton) e.GET("/xrpc/app.bsky.unspecced.indexRepos", s.handleIndexRepos) From a33e87b4859b353e072cab4b0aba52e4383ee109 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 1 Oct 2023 13:53:21 -0700 Subject: [PATCH 08/12] have compaction endpoints return more information --- bgs/admin.go | 22 ++++++++++++++++++---- bgs/bgs.go | 31 ++++++++++++++++++++++++++----- carstore/bs.go | 10 ++++++---- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/bgs/admin.go b/bgs/admin.go index 135041252..4044c9c1c 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -426,13 +426,27 @@ func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error { ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos") defer span.End() - if err := bgs.runRepoCompaction(ctx); err != nil { + var dry bool + if strings.ToLower(e.QueryParam("dry")) == "true" { + dry = true + } + + lim := 50 + if limstr := e.QueryParam("limit"); limstr != "" { + v, err := strconv.Atoi(limstr) + if err != nil { + return err + } + + lim = v + } + + stats, err := bgs.runRepoCompaction(ctx, lim, dry) + if err != nil { return fmt.Errorf("compaction run failed: %w", err) } - return e.JSON(200, map[string]any{ - "success": "true", - }) + return e.JSON(200, stats) } func (bgs *BGS) handleAdminPostResyncPDS(e echo.Context) error { diff --git a/bgs/bgs.go b/bgs/bgs.go index 1c359fa2b..e3d87bdce 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1127,23 +1127,44 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error { return nil } -func (bgs *BGS) runRepoCompaction(ctx context.Context) error { +type compactionStats struct { + Completed map[models.Uid]*carstore.CompactionStats + Targets []carstore.CompactionTarget +} + +func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*compactionStats, error) { ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction") defer span.End() - repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx) + repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, 50) if err != nil { - return fmt.Errorf("failed to get repos to compact: %w", err) + return nil, fmt.Errorf("failed to get repos to compact: %w", err) + } + + if lim > 0 && len(repos) > lim { + repos = repos[:lim] } + if dry { + return &compactionStats{ + Targets: repos, + }, nil + } + + results := make(map[models.Uid]*carstore.CompactionStats) for _, r := range repos { - if _, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { + st, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr) + if err != nil { log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) continue } + results[r.Usr] = st } - return nil + return &compactionStats{ + Targets: repos, + Completed: results, + }, nil } type repoHead struct { diff --git a/carstore/bs.go b/carstore/bs.go index e646df8f1..4dd057357 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -727,7 +727,7 @@ func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) e ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs") defer span.End() - if err := createInBatches(ctx, tx, brefs, 100); err != nil { + if err := createInBatches(ctx, tx, brefs, 1000); err != nil { return err } @@ -1150,12 +1150,12 @@ type CompactionTarget struct { NumShards int } -func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarget, error) { +func (cs *CarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets") defer span.End() var targets []CompactionTarget - if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > 50 order by num_shards desc`).Scan(&targets).Error; err != nil { + if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, shardCount).Scan(&targets).Error; err != nil { return nil, err } @@ -1186,6 +1186,7 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) } type CompactionStats struct { + TotalRefs int `json:"totalRefs"` StartShards int `json:"startShards"` NewShards int `json:"newShards"` SkippedShards int `json:"skippedShards"` @@ -1326,6 +1327,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co stats := &CompactionStats{ StartShards: len(shards), + TotalRefs: len(brefs), } removedShards := make(map[uint]bool) @@ -1397,7 +1399,7 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, stale } } - chunkSize := 100 + chunkSize := 500 for i := 0; i < len(staleToDelete); i += chunkSize { sl := staleToDelete[i:] if len(sl) > chunkSize { From bf081a3f88917931f09d58f2c3238c220f6fa831 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Oct 2023 10:56:40 -0700 Subject: [PATCH 09/12] improvement to compact-all command --- cmd/gosky/bgs.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/cmd/gosky/bgs.go b/cmd/gosky/bgs.go index dfbddb680..6764a3045 100644 --- a/cmd/gosky/bgs.go +++ b/cmd/gosky/bgs.go @@ -347,10 +347,29 @@ var bgsCompactRepo = &cli.Command{ var bgsCompactAll = &cli.Command{ Name: "compact-all", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "dry", + }, + &cli.IntFlag{ + Name: "limit", + }, + }, Action: func(cctx *cli.Context) error { - url := cctx.String("bgs") + "/admin/repo/compactAll" + uu, err := url.Parse(cctx.String("bgs") + "/admin/repo/compactAll") + if err != nil { + return err + } - req, err := http.NewRequest("POST", url, nil) + if cctx.Bool("dry") { + uu.Query().Add("dry", "true") + } + + if cctx.IsSet("limit") { + uu.Query().Add("limit", fmt.Sprint(cctx.Int("limit"))) + } + + req, err := http.NewRequest("POST", uu.String(), nil) if err != nil { return err } From 1d320433ea790323ffc05164fd74d9235cd19925 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 25 Sep 2023 14:37:59 -0700 Subject: [PATCH 10/12] implement repo tombstone handler --- bgs/bgs.go | 43 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index e3d87bdce..019be9416 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -453,7 +453,8 @@ type User struct { // TakenDown is set to true if the user in question has been taken down. // A user in this state will have all future events related to it dropped // and no data about this user will be served. - TakenDown bool + TakenDown bool + Tombstoned bool } type addTargetBody struct { @@ -860,12 +861,52 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event return err } + return nil + case env.RepoTombstone != nil: + if err := bgs.handleRepoTombstone(ctx, host, env.RepoTombstone); err != nil { + return err + } + return nil default: return fmt.Errorf("invalid fed event") } } +func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *atproto.SyncSubscribeRepos_Tombstone) error { + u, err := bgs.lookupUserByDid(ctx, evt.Did) + if err != nil { + return err + } + + if u.PDS != pds.ID { + return fmt.Errorf("unauthoritative tombstone event from %s for %s", pds.Host, evt.Did) + } + + if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{ + "tombstoned": true, + "handle": nil, + }).Error; err != nil { + return err + } + + if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ + "handle": nil, + }).Error; err != nil { + return err + } + + // delete data from carstore + if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil { + // don't let a failure here prevent us from propagating this event + log.Errorf("failed to delete user data from carstore: %s", err) + } + + return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{ + RepoTombstone: evt, + }) +} + func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user models.Uid, blobs []string) error { if s.blobs == nil { log.Debugf("blob syncing disabled") From 735db31d5c508468d01acb96312ed79c08d0fda1 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 25 Sep 2023 15:20:39 -0700 Subject: [PATCH 11/12] respect user flags in sync endpoints --- bgs/handlers.go | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/bgs/handlers.go b/bgs/handlers.go index dc2827178..133cc052e 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -21,7 +21,7 @@ import ( ) func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) { - u, err := s.Index.LookupUserByDid(ctx, did) + u, err := s.lookupUserByDid(ctx, did) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") @@ -29,6 +29,14 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } + if u.Tombstoned { + return nil, fmt.Errorf("account was deleted") + } + + if u.TakenDown { + return nil, fmt.Errorf("account was taken down") + } + reqCid := cid.Undef if commit != "" { reqCid, err = cid.Decode(commit) @@ -37,7 +45,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri } } - _, record, err := s.repoman.GetRecord(ctx, u.Uid, collection, rkey, reqCid) + _, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, reqCid) if err != nil { return nil, fmt.Errorf("failed to get record: %w", err) } @@ -52,7 +60,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri } func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) { - u, err := s.Index.LookupUserByDid(ctx, did) + u, err := s.lookupUserByDid(ctx, did) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") @@ -60,9 +68,17 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } + if u.Tombstoned { + return nil, fmt.Errorf("account was deleted") + } + + if u.TakenDown { + return nil, fmt.Errorf("account was taken down") + } + // TODO: stream the response buf := new(bytes.Buffer) - if err := s.repoman.ReadRepo(ctx, u.Uid, since, buf); err != nil { + if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil { return nil, fmt.Errorf("failed to read repo: %w", err) } @@ -158,7 +174,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, } users := []User{} - if err := s.db.Model(&User{}).Where("id > ?", c).Order("id").Limit(limit).Find(&users).Error; err != nil { + if err := s.db.Model(&User{}).Where("id > ? AND NOT tombstoned AND NOT taken_down", c).Order("id").Limit(limit).Find(&users).Error; err != nil { if err == gorm.ErrRecordNotFound { return &comatprototypes.SyncListRepos_Output{}, nil } @@ -175,6 +191,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, for i := range users { user := users[i] + root, err := s.repoman.GetRepoRoot(ctx, user.ID) if err != nil { return nil, fmt.Errorf("failed to get repo root for (%s): %w", user.Did, err) @@ -194,7 +211,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, } func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) { - u, err := s.Index.LookupUserByDid(ctx, did) + u, err := s.lookupUserByDid(ctx, did) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") @@ -202,12 +219,20 @@ func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did strin return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } - root, err := s.repoman.GetRepoRoot(ctx, u.Uid) + if u.Tombstoned { + return nil, fmt.Errorf("account was deleted") + } + + if u.TakenDown { + return nil, fmt.Errorf("account was taken down") + } + + root, err := s.repoman.GetRepoRoot(ctx, u.ID) if err != nil { return nil, fmt.Errorf("failed to get repo root: %w", err) } - rev, err := s.repoman.GetRepoRev(ctx, u.Uid) + rev, err := s.repoman.GetRepoRev(ctx, u.ID) if err != nil { return nil, fmt.Errorf("failed to get repo rev: %w", err) } From b60c71ea3359f46702c9e3839cfedcfefffe2050 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 28 Sep 2023 11:04:43 -0700 Subject: [PATCH 12/12] handle un-tombstoning --- bgs/bgs.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/bgs/bgs.go b/bgs/bgs.go index 019be9416..5b22595c1 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -777,6 +777,33 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host) } + if host.ID != u.PDS { + log.Infow("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host) + subj, err := bgs.createExternalUser(ctx, evt.Repo) + if err != nil { + return err + } + + if subj.PDS != host.ID { + return fmt.Errorf("event from non-authoritative pds") + } + } + + if u.Tombstoned { + // we've checked the authority of the users PDS, so reinstate the account + if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil { + return fmt.Errorf("failed to un-tombstone a user: %w", err) + } + + ai, err := bgs.Index.LookupUser(ctx, u.ID) + if err != nil { + return fmt.Errorf("failed to look up user (tombstone recover): %w", err) + } + + // Now a simple re-crawl should suffice to bring the user back online + return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt) + } + // skip the fast path for rebases or if the user is already in the slow path if bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) { rebasesCounter.WithLabelValues(host.Host).Add(1) @@ -1050,6 +1077,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor return nil, fmt.Errorf("failed to update users pds: %w", err) } + exu.PDS = peering.ID } if exu.Handle.String != handle {