diff --git a/bgs/admin.go b/bgs/admin.go index 1e8a6d837..15f754dd6 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -106,6 +106,12 @@ type enrichedPDS struct { EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"` IngestRate rateLimit `json:"IngestRate"` CrawlRate rateLimit `json:"CrawlRate"` + UserCount int64 `json:"UserCount"` +} + +type UserCount struct { + PDSID uint `gorm:"column:pds"` + UserCount int64 `gorm:"column:user_count"` } func (bgs *BGS) handleListPDSs(e echo.Context) error { @@ -118,6 +124,20 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error { activePDSHosts := bgs.slurper.GetActiveList() + var userCounts []UserCount + if err := bgs.db.Model(&User{}). + Select("pds, count(*) as user_count"). + Group("pds"). + Find(&userCounts).Error; err != nil { + return err + } + + // Create a map for fast lookup + userCountMap := make(map[uint]int64) + for _, count := range userCounts { + userCountMap[count.PDSID] = count.UserCount + } + for i, p := range pds { enrichedPDSs[i].PDS = p enrichedPDSs[i].HasActiveConnection = false @@ -133,6 +153,7 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error { continue } enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) + enrichedPDSs[i].UserCount = userCountMap[p.ID] // Get the ingest rate limit for this PDS ingestRate := rateLimit{ diff --git a/bgs/handlers.go b/bgs/handlers.go index cb7ea6170..d93ac2bbe 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -14,15 +14,18 @@ import ( atproto "github.com/bluesky-social/indigo/api/atproto" comatprototypes "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/blobs" + "github.com/bluesky-social/indigo/carstore" "github.com/bluesky-social/indigo/mst" "gorm.io/gorm" "github.com/bluesky-social/indigo/xrpc" "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/ipld/go-car" "github.com/labstack/echo/v4" ) -func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) { +func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) { u, err := s.lookupUserByDid(ctx, did) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { @@ -40,16 +43,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri return nil, fmt.Errorf("account was taken down") } - reqCid := cid.Undef - if commit != "" { - reqCid, err = cid.Decode(commit) - if err != nil { - log.Errorw("failed to decode commit cid", "err", err, "cid", commit) - return nil, echo.NewHTTPError(http.StatusBadRequest, "failed to decode commit cid") - } - } - - _, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, reqCid) + root, blocks, err := s.repoman.GetRecordProof(ctx, u.ID, collection, rkey) if err != nil { if errors.Is(err, mst.ErrNotFound) { return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo") @@ -59,10 +53,18 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri } buf := new(bytes.Buffer) - err = record.MarshalCBOR(buf) - if err != nil { - log.Errorw("failed to marshal record to CBOR", "err", err, "did", did, "collection", collection, "rkey", rkey) - return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to marshal record to CBOR") + hb, err := cbor.DumpObject(&car.CarHeader{ + Roots: []cid.Cid{root}, + Version: 1, + }) + if _, err := carstore.LdWrite(buf, hb); err != nil { + return nil, err + } + + for _, blk := range blocks { + if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { + return nil, err + } } return buf, nil diff --git a/bgs/stubs.go b/bgs/stubs.go index f5308d915..f6cb02188 100644 --- a/bgs/stubs.go +++ b/bgs/stubs.go @@ -112,7 +112,6 @@ func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRecord") defer span.End() collection := c.QueryParam("collection") - commit := c.QueryParam("commit") did := c.QueryParam("did") rkey := c.QueryParam("rkey") @@ -131,17 +130,10 @@ func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error { return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) } - if commit != "" { - _, err = cid.Parse(commit) - if err != nil { - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid commit: %s", commit)}) - } - } - var out io.Reader var handleErr error // func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context,collection string,commit string,did string,rkey string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetRecord(ctx, collection, commit, did, rkey) + out, handleErr = s.handleComAtprotoSyncGetRecord(ctx, collection, did, rkey) if handleErr != nil { return handleErr } diff --git a/carstore/bs.go b/carstore/bs.go index 7a8bbedb2..a992a2776 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -875,7 +875,7 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n keepset[c] = true oblk, err := bs.Get(ctx, c) if err != nil { - return nil, err + return nil, fmt.Errorf("get failed in new tree: %w", err) } if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { @@ -901,10 +901,14 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n oblk, err := bs.Get(ctx, c) if err != nil { - return nil, err + return nil, fmt.Errorf("get failed in old tree: %w", err) } if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { + if lnk.Prefix().Codec != cid.DagCBOR { + return + } + if !keepset[lnk] { dropset[lnk] = true queue = append(queue, lnk) diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index d2e231241..fd572282b 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -25,6 +25,7 @@ import ( "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/util/cliutil" "github.com/bluesky-social/indigo/xrpc" + "golang.org/x/time/rate" "github.com/gorilla/websocket" lru "github.com/hashicorp/golang-lru" @@ -143,6 +144,10 @@ var readRepoStreamCmd = &cli.Command{ &cli.BoolFlag{ Name: "resolve-handles", }, + &cli.Float64Flag{ + Name: "max-throughput", + Usage: "limit event consumption to a given # of req/sec (debug utility)", + }, }, ArgsUsage: `[ [cursor]]`, Action: func(cctx *cli.Context) error { @@ -203,9 +208,17 @@ var readRepoStreamCmd = &cli.Command{ return h, nil } + var limiter *rate.Limiter + if cctx.Float64("max-throughput") > 0 { + limiter = rate.NewLimiter(rate.Limit(cctx.Float64("max-throughput")), 1) + } rsc := &events.RepoStreamCallbacks{ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { + if limiter != nil { + limiter.Wait(ctx) + } + if jsonfmt { b, err := json.Marshal(evt) if err != nil { diff --git a/cmd/gosky/sync.go b/cmd/gosky/sync.go index f27c8b1af..6304205da 100644 --- a/cmd/gosky/sync.go +++ b/cmd/gosky/sync.go @@ -27,6 +27,11 @@ var syncGetRepoCmd = &cli.Command{ Name: "get-repo", Usage: "download repo from account's PDS to local file (or '-' for stdout). for hex combine with 'xxd -ps -u -c 0'", ArgsUsage: ` []`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "host", + }, + }, Action: func(cctx *cli.Context) error { ctx := context.Background() arg := cctx.Args().First() @@ -57,6 +62,10 @@ var syncGetRepoCmd = &cli.Command{ return fmt.Errorf("no PDS endpoint for identity") } + if h := cctx.String("host"); h != "" { + xrpcc.Host = h + } + log.Infof("downloading from %s to: %s", xrpcc.Host, carPath) repoBytes, err := comatproto.SyncGetRepo(ctx, xrpcc, ident.DID.String(), "") if err != nil { diff --git a/events/events.go b/events/events.go index 6f0b529e5..92c3417e4 100644 --- a/events/events.go +++ b/events/events.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" comatproto "github.com/bluesky-social/indigo/api/atproto" label "github.com/bluesky-social/indigo/api/label" @@ -73,11 +74,24 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { select { case s.outgoing <- evt: case <-s.done: + default: + log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) go func(torem *Subscriber) { - em.rmSubscriber(torem) + torem.lk.Lock() + if !torem.cleanedUp { + select { + case torem.outgoing <- &XRPCStreamEvent{ + Error: &ErrorFrame{ + Error: "ConsumerTooSlow", + }, + }: + case <-time.After(time.Second * 5): + log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) + } + } + torem.lk.Unlock() + torem.cleanup() }(s) - default: - log.Warnf("event overflow (%d)", len(s.outgoing)) } s.broadcastCounter.Inc() } @@ -100,6 +114,11 @@ type Subscriber struct { done chan struct{} + cleanup func() + + lk sync.Mutex + cleanedUp bool + ident string enqueuedCounter prometheus.Counter broadcastCounter prometheus.Counter @@ -164,15 +183,18 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func broadcastCounter: eventsBroadcast.WithLabelValues(ident), } - cleanup := func() { + sub.cleanup = sync.OnceFunc(func() { + sub.lk.Lock() + defer sub.lk.Unlock() close(done) em.rmSubscriber(sub) close(sub.outgoing) - } + sub.cleanedUp = true + }) if since == nil { em.addSubscriber(sub) - return sub.outgoing, cleanup, nil + return sub.outgoing, sub.cleanup, nil } out := make(chan *XRPCStreamEvent, em.bufferSize) @@ -243,11 +265,13 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func } }() - return out, cleanup, nil + return out, sub.cleanup, nil } func sequenceForEvent(evt *XRPCStreamEvent) int64 { switch { + case evt == nil: + return -1 case evt.RepoCommit != nil: return evt.RepoCommit.Seq case evt.RepoHandle != nil: @@ -258,6 +282,8 @@ func sequenceForEvent(evt *XRPCStreamEvent) int64 { return evt.RepoTombstone.Seq case evt.RepoInfo != nil: return -1 + case evt.Error != nil: + return -1 default: return -1 } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index f3c681370..14e61f6bc 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -16,7 +16,9 @@ import ( "github.com/bluesky-social/indigo/models" "github.com/bluesky-social/indigo/mst" "github.com/bluesky-social/indigo/repo" + "github.com/bluesky-social/indigo/util" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -447,6 +449,32 @@ func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collectio return ocid, val, nil } +func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, collection string, rkey string) (cid.Cid, []blocks.Block, error) { + robs, err := rm.cs.ReadOnlySession(user) + if err != nil { + return cid.Undef, nil, err + } + + bs := util.NewLoggingBstore(robs) + + head, err := rm.cs.GetUserRepoHead(ctx, user) + if err != nil { + return cid.Undef, nil, err + } + + r, err := repo.OpenRepo(ctx, bs, head, true) + if err != nil { + return cid.Undef, nil, err + } + + _, _, err = r.GetRecord(ctx, collection+"/"+rkey) + if err != nil { + return cid.Undef, nil, err + } + + return head, bs.GetLoggedBlocks(), nil +} + func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.ActorProfile, error) { bs, err := rm.cs.ReadOnlySession(uid) if err != nil { @@ -741,6 +769,12 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD return err } + if rev == nil { + // if 'rev' is nil, this implies a fresh sync. + // in this case, ignore any existing blocks we have and treat this like a clean import. + curhead = cid.Undef + } + if rev != nil && *rev != currev { // TODO: we could probably just deal with this return fmt.Errorf("ImportNewRepo called with incorrect base") diff --git a/ts/bgs-dash/src/components/Dash/Dash.tsx b/ts/bgs-dash/src/components/Dash/Dash.tsx index ed4a38d26..787acdf6f 100644 --- a/ts/bgs-dash/src/components/Dash/Dash.tsx +++ b/ts/bgs-dash/src/components/Dash/Dash.tsx @@ -593,6 +593,37 @@ const Dash: FC<{}> = () => { + + { + setSortField("UserCount"); + setSortOrder(sortOrder === "asc" ? "desc" : "asc"); + }} + > + Users + + {sortField === "UserCount" && sortOrder === "asc" ? ( + + + = () => { )} + + {pds.UserCount?.toLocaleString()} + {pds.EventsSeenSinceStartup?.toLocaleString()} diff --git a/ts/bgs-dash/src/models/pds.ts b/ts/bgs-dash/src/models/pds.ts index 29d0075f8..b2e640e9f 100644 --- a/ts/bgs-dash/src/models/pds.ts +++ b/ts/bgs-dash/src/models/pds.ts @@ -18,6 +18,7 @@ interface PDS { EventsSeenSinceStartup?: number; IngestRate: RateLimit; CrawlRate: RateLimit; + UserCount: number; } type PDSKey = keyof PDS; diff --git a/util/readrecordbs.go b/util/readrecordbs.go new file mode 100644 index 000000000..41cb36143 --- /dev/null +++ b/util/readrecordbs.go @@ -0,0 +1,72 @@ +package util + +import ( + "context" + "fmt" + + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +type LoggingBstore struct { + base blockstore.Blockstore + + set map[cid.Cid]blockformat.Block +} + +func NewLoggingBstore(base blockstore.Blockstore) *LoggingBstore { + return &LoggingBstore{ + base: base, + set: make(map[cid.Cid]blockformat.Block), + } +} + +var _ blockstore.Blockstore = (*LoggingBstore)(nil) + +func (bs *LoggingBstore) GetLoggedBlocks() []blockformat.Block { + var out []blockformat.Block + for _, v := range bs.set { + out = append(out, v) + } + return out +} + +func (bs *LoggingBstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + return bs.base.Has(ctx, c) +} + +func (bs *LoggingBstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { + blk, err := bs.base.Get(ctx, c) + if err != nil { + return nil, err + } + + bs.set[c] = blk + + return blk, nil +} + +func (bs *LoggingBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + return bs.base.GetSize(ctx, c) +} + +func (bs *LoggingBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { + return fmt.Errorf("deletes not allowed on logging blockstore") +} + +func (bs *LoggingBstore) Put(context.Context, blockformat.Block) error { + return fmt.Errorf("writes not allowed on logging blockstore") +} + +func (bs *LoggingBstore) PutMany(context.Context, []blockformat.Block) error { + return fmt.Errorf("writes not allowed on logging blockstore") +} + +func (bs *LoggingBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, fmt.Errorf("iteration not allowed on logging blockstore") +} + +func (bs *LoggingBstore) HashOnRead(enabled bool) { + +}