From 6b31ee0ea5098eab51e973a37c71d2b4e63f36a7 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 3 Nov 2023 10:23:37 -0700 Subject: [PATCH 01/22] drop consumers if their out buffer fills up --- events/events.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/events/events.go b/events/events.go index 6f0b529e5..e7b1c633b 100644 --- a/events/events.go +++ b/events/events.go @@ -77,7 +77,10 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { em.rmSubscriber(torem) }(s) default: - log.Warnf("event overflow (%d)", len(s.outgoing)) + log.Warnw("event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) + go func(torem *Subscriber) { + em.rmSubscriber(torem) + }(s) } s.broadcastCounter.Inc() } From 150a0cc374d9f9db7cf2cbaf8aaae0eeec92c1f1 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 3 Nov 2023 10:58:24 -0700 Subject: [PATCH 02/22] properly close out subscribe on event overflow --- events/events.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/events/events.go b/events/events.go index e7b1c633b..670c8f454 100644 --- a/events/events.go +++ b/events/events.go @@ -73,13 +73,10 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { select { case s.outgoing <- evt: case <-s.done: - go func(torem *Subscriber) { - em.rmSubscriber(torem) - }(s) default: log.Warnw("event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) go func(torem *Subscriber) { - em.rmSubscriber(torem) + torem.cleanup() }(s) } s.broadcastCounter.Inc() @@ -103,6 +100,8 @@ type Subscriber struct { done chan struct{} + cleanup func() + ident string enqueuedCounter prometheus.Counter broadcastCounter prometheus.Counter @@ -167,15 +166,15 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func broadcastCounter: eventsBroadcast.WithLabelValues(ident), } - cleanup := func() { + sub.cleanup = sync.OnceFunc(func() { close(done) em.rmSubscriber(sub) close(sub.outgoing) - } + }) if since == nil { em.addSubscriber(sub) - return sub.outgoing, cleanup, nil + return sub.outgoing, sub.cleanup, nil } out := make(chan *XRPCStreamEvent, em.bufferSize) @@ -246,7 +245,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func } }() - return out, cleanup, nil + return out, sub.cleanup, nil } func sequenceForEvent(evt *XRPCStreamEvent) int64 { From 26f36d3a71adde7526282cb3946ce360ea5f00cd Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Wed, 8 Nov 2023 19:27:39 +0000 Subject: [PATCH 03/22] Remove commit from getRecord since there's no repo history anymore --- bgs/handlers.go | 13 ++----------- bgs/stubs.go | 10 +--------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/bgs/handlers.go b/bgs/handlers.go index 33b0bcef8..324733a0a 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -22,7 +22,7 @@ import ( "github.com/labstack/echo/v4" ) -func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) { +func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) { u, err := s.lookupUserByDid(ctx, did) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { @@ -40,16 +40,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri return nil, fmt.Errorf("account was taken down") } - reqCid := cid.Undef - if commit != "" { - reqCid, err = cid.Decode(commit) - if err != nil { - log.Errorw("failed to decode commit cid", "err", err, "cid", commit) - return nil, echo.NewHTTPError(http.StatusBadRequest, "failed to decode commit cid") - } - } - - _, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, reqCid) + _, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, cid.Undef) if err != nil { if errors.Is(err, mst.ErrNotFound) { return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo") diff --git a/bgs/stubs.go b/bgs/stubs.go index f5308d915..f6cb02188 100644 --- a/bgs/stubs.go +++ b/bgs/stubs.go @@ -112,7 +112,6 @@ func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error { ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRecord") defer span.End() collection := c.QueryParam("collection") - commit := c.QueryParam("commit") did := c.QueryParam("did") rkey := c.QueryParam("rkey") @@ -131,17 +130,10 @@ func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error { return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) } - if commit != "" { - _, err = cid.Parse(commit) - if err != nil { - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid commit: %s", commit)}) - } - } - var out io.Reader var handleErr error // func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context,collection string,commit string,did string,rkey string) (io.Reader, error) - out, handleErr = s.handleComAtprotoSyncGetRecord(ctx, collection, commit, did, rkey) + out, handleErr = s.handleComAtprotoSyncGetRecord(ctx, collection, did, rkey) if handleErr != nil { return handleErr } From 19866f4928ae65f8f02eb771e487abe34cbdc2b0 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Wed, 8 Nov 2023 19:41:07 +0000 Subject: [PATCH 04/22] Fix backfill test --- backfill/backfill_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backfill/backfill_test.go b/backfill/backfill_test.go index 4c7f7f4b7..885196e21 100644 --- a/backfill/backfill_test.go +++ b/backfill/backfill_test.go @@ -32,6 +32,7 @@ func TestBackfill(t *testing.T) { ts := &testState{} opts := backfill.DefaultBackfillOptions() + opts.CheckoutPath = "https://bsky.network/xrpc/com.atproto.sync.getRepo" opts.NSIDFilter = "app.bsky.feed.follow/" bf := backfill.NewBackfiller( From 8fb248ddc5a67849be8c27aa7cd33f52f3edf89e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Nov 2023 14:55:21 -0800 Subject: [PATCH 05/22] implement sync.GetRecord correctly --- bgs/handlers.go | 21 ++++++++++--- repomgr/repomgr.go | 28 +++++++++++++++++ util/readrecordbs.go | 72 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 5 deletions(-) create mode 100644 util/readrecordbs.go diff --git a/bgs/handlers.go b/bgs/handlers.go index 324733a0a..768a89775 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -13,12 +13,15 @@ import ( atproto "github.com/bluesky-social/indigo/api/atproto" comatprototypes "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/blobs" + "github.com/bluesky-social/indigo/carstore" "github.com/bluesky-social/indigo/mst" "gorm.io/gorm" "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/xrpc" "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/ipld/go-car" "github.com/labstack/echo/v4" ) @@ -40,7 +43,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri return nil, fmt.Errorf("account was taken down") } - _, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, cid.Undef) + root, blocks, err := s.repoman.GetRecordProof(ctx, u.ID, collection, rkey) if err != nil { if errors.Is(err, mst.ErrNotFound) { return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo") @@ -50,10 +53,18 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri } buf := new(bytes.Buffer) - err = record.MarshalCBOR(buf) - if err != nil { - log.Errorw("failed to marshal record to CBOR", "err", err, "did", did, "collection", collection, "rkey", rkey) - return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to marshal record to CBOR") + hb, err := cbor.DumpObject(&car.CarHeader{ + Roots: []cid.Cid{root}, + Version: 1, + }) + if _, err := carstore.LdWrite(buf, hb); err != nil { + return nil, err + } + + for _, blk := range blocks { + if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { + return nil, err + } } return buf, nil diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index f3c681370..9461a1760 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -16,7 +16,9 @@ import ( "github.com/bluesky-social/indigo/models" "github.com/bluesky-social/indigo/mst" "github.com/bluesky-social/indigo/repo" + "github.com/bluesky-social/indigo/util" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -447,6 +449,32 @@ func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collectio return ocid, val, nil } +func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, collection string, rkey string) (cid.Cid, []blocks.Block, error) { + robs, err := rm.cs.ReadOnlySession(user) + if err != nil { + return cid.Undef, nil, err + } + + bs := util.NewReadRecordBstore(robs) + + head, err := rm.cs.GetUserRepoHead(ctx, user) + if err != nil { + return cid.Undef, nil, err + } + + r, err := repo.OpenRepo(ctx, bs, head, true) + if err != nil { + return cid.Undef, nil, err + } + + _, _, err = r.GetRecord(ctx, collection+"/"+rkey) + if err != nil { + return cid.Undef, nil, err + } + + return head, bs.AllReadBlocks(), nil +} + func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.ActorProfile, error) { bs, err := rm.cs.ReadOnlySession(uid) if err != nil { diff --git a/util/readrecordbs.go b/util/readrecordbs.go new file mode 100644 index 000000000..53262d0a4 --- /dev/null +++ b/util/readrecordbs.go @@ -0,0 +1,72 @@ +package util + +import ( + "context" + "fmt" + + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +type ReadRecordBstore struct { + base blockstore.Blockstore + + set map[cid.Cid]blockformat.Block +} + +func NewReadRecordBstore(base blockstore.Blockstore) *ReadRecordBstore { + return &ReadRecordBstore{ + base: base, + set: make(map[cid.Cid]blockformat.Block), + } +} + +var _ blockstore.Blockstore = (*ReadRecordBstore)(nil) + +func (bs *ReadRecordBstore) AllReadBlocks() []blockformat.Block { + var out []blockformat.Block + for _, v := range bs.set { + out = append(out, v) + } + return out +} + +func (bs *ReadRecordBstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + return bs.base.Has(ctx, c) +} + +func (bs *ReadRecordBstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { + blk, err := bs.base.Get(ctx, c) + if err != nil { + return nil, err + } + + bs.set[c] = blk + + return blk, nil +} + +func (bs *ReadRecordBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + return bs.base.GetSize(ctx, c) +} + +func (bs *ReadRecordBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { + return fmt.Errorf("deletes not supported") +} + +func (bs *ReadRecordBstore) Put(context.Context, blockformat.Block) error { + return fmt.Errorf("writes not allows on read-record blockstore") +} + +func (bs *ReadRecordBstore) PutMany(context.Context, []blockformat.Block) error { + return fmt.Errorf("writes not allows on read-record blockstore") +} + +func (bs *ReadRecordBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, fmt.Errorf("iteration not supported on read-record blockstore") +} + +func (bs *ReadRecordBstore) HashOnRead(enabled bool) { + +} From 0355a321ef7049cf76628b5efe60f933b732ee2e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Nov 2023 15:02:54 -0800 Subject: [PATCH 06/22] grammar are good --- util/readrecordbs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/readrecordbs.go b/util/readrecordbs.go index 53262d0a4..8e5a077c7 100644 --- a/util/readrecordbs.go +++ b/util/readrecordbs.go @@ -56,11 +56,11 @@ func (bs *ReadRecordBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { } func (bs *ReadRecordBstore) Put(context.Context, blockformat.Block) error { - return fmt.Errorf("writes not allows on read-record blockstore") + return fmt.Errorf("writes not allowed on read-record blockstore") } func (bs *ReadRecordBstore) PutMany(context.Context, []blockformat.Block) error { - return fmt.Errorf("writes not allows on read-record blockstore") + return fmt.Errorf("writes not allowed on read-record blockstore") } func (bs *ReadRecordBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { From 93b7e18532b8dbdf3c7fdca415966fafe4249069 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Nov 2023 15:08:01 -0800 Subject: [PATCH 07/22] standardize error messages --- util/readrecordbs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/readrecordbs.go b/util/readrecordbs.go index 8e5a077c7..ee92cf159 100644 --- a/util/readrecordbs.go +++ b/util/readrecordbs.go @@ -52,7 +52,7 @@ func (bs *ReadRecordBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) } func (bs *ReadRecordBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { - return fmt.Errorf("deletes not supported") + return fmt.Errorf("deletes not allowed on read-record blockstore") } func (bs *ReadRecordBstore) Put(context.Context, blockformat.Block) error { From bb835a144385a533b83c0fa2da2bd1e62fc403d1 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Nov 2023 15:17:35 -0800 Subject: [PATCH 08/22] suck less at naming --- repomgr/repomgr.go | 4 ++-- util/readrecordbs.go | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 9461a1760..77fb5adf6 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -455,7 +455,7 @@ func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, coll return cid.Undef, nil, err } - bs := util.NewReadRecordBstore(robs) + bs := util.NewLoggingBstore(robs) head, err := rm.cs.GetUserRepoHead(ctx, user) if err != nil { @@ -472,7 +472,7 @@ func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, coll return cid.Undef, nil, err } - return head, bs.AllReadBlocks(), nil + return head, bs.GetLoggedBlocks(), nil } func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.ActorProfile, error) { diff --git a/util/readrecordbs.go b/util/readrecordbs.go index ee92cf159..62271ca4d 100644 --- a/util/readrecordbs.go +++ b/util/readrecordbs.go @@ -9,22 +9,22 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" ) -type ReadRecordBstore struct { +type LoggingBstore struct { base blockstore.Blockstore set map[cid.Cid]blockformat.Block } -func NewReadRecordBstore(base blockstore.Blockstore) *ReadRecordBstore { - return &ReadRecordBstore{ +func NewLoggingBstore(base blockstore.Blockstore) *LoggingBstore { + return &LoggingBstore{ base: base, set: make(map[cid.Cid]blockformat.Block), } } -var _ blockstore.Blockstore = (*ReadRecordBstore)(nil) +var _ blockstore.Blockstore = (*LoggingBstore)(nil) -func (bs *ReadRecordBstore) AllReadBlocks() []blockformat.Block { +func (bs *LoggingBstore) GetLoggedBlocks() []blockformat.Block { var out []blockformat.Block for _, v := range bs.set { out = append(out, v) @@ -32,11 +32,11 @@ func (bs *ReadRecordBstore) AllReadBlocks() []blockformat.Block { return out } -func (bs *ReadRecordBstore) Has(ctx context.Context, c cid.Cid) (bool, error) { +func (bs *LoggingBstore) Has(ctx context.Context, c cid.Cid) (bool, error) { return bs.base.Has(ctx, c) } -func (bs *ReadRecordBstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { +func (bs *LoggingBstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { blk, err := bs.base.Get(ctx, c) if err != nil { return nil, err @@ -47,26 +47,26 @@ func (bs *ReadRecordBstore) Get(ctx context.Context, c cid.Cid) (blockformat.Blo return blk, nil } -func (bs *ReadRecordBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { +func (bs *LoggingBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { return bs.base.GetSize(ctx, c) } -func (bs *ReadRecordBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { +func (bs *LoggingBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { return fmt.Errorf("deletes not allowed on read-record blockstore") } -func (bs *ReadRecordBstore) Put(context.Context, blockformat.Block) error { +func (bs *LoggingBstore) Put(context.Context, blockformat.Block) error { return fmt.Errorf("writes not allowed on read-record blockstore") } -func (bs *ReadRecordBstore) PutMany(context.Context, []blockformat.Block) error { +func (bs *LoggingBstore) PutMany(context.Context, []blockformat.Block) error { return fmt.Errorf("writes not allowed on read-record blockstore") } -func (bs *ReadRecordBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { +func (bs *LoggingBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return nil, fmt.Errorf("iteration not supported on read-record blockstore") } -func (bs *ReadRecordBstore) HashOnRead(enabled bool) { +func (bs *LoggingBstore) HashOnRead(enabled bool) { } From bcef4e20878d9d7153e230b201646c2d38bb4baa Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Nov 2023 15:20:27 -0800 Subject: [PATCH 09/22] fix error logs again --- util/readrecordbs.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/util/readrecordbs.go b/util/readrecordbs.go index 62271ca4d..41cb36143 100644 --- a/util/readrecordbs.go +++ b/util/readrecordbs.go @@ -52,19 +52,19 @@ func (bs *LoggingBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { } func (bs *LoggingBstore) DeleteBlock(ctx context.Context, c cid.Cid) error { - return fmt.Errorf("deletes not allowed on read-record blockstore") + return fmt.Errorf("deletes not allowed on logging blockstore") } func (bs *LoggingBstore) Put(context.Context, blockformat.Block) error { - return fmt.Errorf("writes not allowed on read-record blockstore") + return fmt.Errorf("writes not allowed on logging blockstore") } func (bs *LoggingBstore) PutMany(context.Context, []blockformat.Block) error { - return fmt.Errorf("writes not allowed on read-record blockstore") + return fmt.Errorf("writes not allowed on logging blockstore") } func (bs *LoggingBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return nil, fmt.Errorf("iteration not supported on read-record blockstore") + return nil, fmt.Errorf("iteration not allowed on logging blockstore") } func (bs *LoggingBstore) HashOnRead(enabled bool) { From d80ed5a029ae7e400b093da25f87701f4e2b723e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Nov 2023 17:00:29 -0800 Subject: [PATCH 10/22] emit tooSlow error frame --- events/events.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/events/events.go b/events/events.go index 670c8f454..1ab8de26b 100644 --- a/events/events.go +++ b/events/events.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" comatproto "github.com/bluesky-social/indigo/api/atproto" label "github.com/bluesky-social/indigo/api/label" @@ -74,8 +75,17 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { case s.outgoing <- evt: case <-s.done: default: - log.Warnw("event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) + log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) go func(torem *Subscriber) { + select { + case s.outgoing <- &XRPCStreamEvent{ + Error: &ErrorFrame{ + Error: "ConsumerTooSlow", + }, + }: + case <-time.After(time.Second * 5): + log.Warnw("failed to send error frame to backed up consumer", "ident", s.ident) + } torem.cleanup() }(s) } From 2811332dcedf5a6a9d2a34b54c3720387ba26df8 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 9 Nov 2023 10:30:58 -0800 Subject: [PATCH 11/22] only traverse cbor blocks when running mst diff --- carstore/bs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/carstore/bs.go b/carstore/bs.go index 7a8bbedb2..7feb9d697 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -905,6 +905,10 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n } if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { + if lnk.Prefix().Codec != cid.DagCBOR { + return + } + if !keepset[lnk] { dropset[lnk] = true queue = append(queue, lnk) From aaf2596609db2a51081f99ac9b2cd61795aa3057 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 9 Nov 2023 10:44:52 -0800 Subject: [PATCH 12/22] use closure variable for error send --- events/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/events/events.go b/events/events.go index 1ab8de26b..06ccb8cb5 100644 --- a/events/events.go +++ b/events/events.go @@ -78,7 +78,7 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) go func(torem *Subscriber) { select { - case s.outgoing <- &XRPCStreamEvent{ + case torem.outgoing <- &XRPCStreamEvent{ Error: &ErrorFrame{ Error: "ConsumerTooSlow", }, From 92bcf49595f879d474020d3adb6be8ff1c4b9613 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 9 Nov 2023 10:53:11 -0800 Subject: [PATCH 13/22] use closure variable for error message --- events/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/events/events.go b/events/events.go index 06ccb8cb5..f89aec303 100644 --- a/events/events.go +++ b/events/events.go @@ -84,7 +84,7 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { }, }: case <-time.After(time.Second * 5): - log.Warnw("failed to send error frame to backed up consumer", "ident", s.ident) + log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) } torem.cleanup() }(s) From edbc1c95a0bf5dec88e81e3d4f077148614e46e7 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 05:58:30 +0000 Subject: [PATCH 14/22] Add user count to pds list endpoint --- bgs/admin.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bgs/admin.go b/bgs/admin.go index 1e8a6d837..48a863095 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -106,6 +106,7 @@ type enrichedPDS struct { EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"` IngestRate rateLimit `json:"IngestRate"` CrawlRate rateLimit `json:"CrawlRate"` + UserCount int64 `json:"UserCount"` } func (bgs *BGS) handleListPDSs(e echo.Context) error { @@ -134,6 +135,11 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error { } enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) + // Get the number of users for this PDS + if err := bgs.db.Model(&User{}).Where("pds_id = ?", p.ID).Count(&enrichedPDSs[i].UserCount).Error; err != nil { + return err + } + // Get the ingest rate limit for this PDS ingestRate := rateLimit{ MaxEventsPerSecond: p.RateLimit, From cc8f80fa22c1e8c93a46cc626d9103dfd272edf1 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 06:00:23 +0000 Subject: [PATCH 15/22] Update dashboard to show usercount --- ts/bgs-dash/src/components/Dash/Dash.tsx | 34 ++++++++++++++++++++++++ ts/bgs-dash/src/models/pds.ts | 1 + 2 files changed, 35 insertions(+) diff --git a/ts/bgs-dash/src/components/Dash/Dash.tsx b/ts/bgs-dash/src/components/Dash/Dash.tsx index ed4a38d26..787acdf6f 100644 --- a/ts/bgs-dash/src/components/Dash/Dash.tsx +++ b/ts/bgs-dash/src/components/Dash/Dash.tsx @@ -593,6 +593,37 @@ const Dash: FC<{}> = () => { + + { + setSortField("UserCount"); + setSortOrder(sortOrder === "asc" ? "desc" : "asc"); + }} + > + Users + + {sortField === "UserCount" && sortOrder === "asc" ? ( + + + = () => { )} + + {pds.UserCount?.toLocaleString()} + {pds.EventsSeenSinceStartup?.toLocaleString()} diff --git a/ts/bgs-dash/src/models/pds.ts b/ts/bgs-dash/src/models/pds.ts index 29d0075f8..b2e640e9f 100644 --- a/ts/bgs-dash/src/models/pds.ts +++ b/ts/bgs-dash/src/models/pds.ts @@ -18,6 +18,7 @@ interface PDS { EventsSeenSinceStartup?: number; IngestRate: RateLimit; CrawlRate: RateLimit; + UserCount: number; } type PDSKey = keyof PDS; From 04aa8f4a0a2243774516d55230a513cf9a9d52a6 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 06:12:33 +0000 Subject: [PATCH 16/22] typo --- bgs/admin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bgs/admin.go b/bgs/admin.go index 48a863095..b2bfb2c38 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -136,7 +136,7 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error { enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) // Get the number of users for this PDS - if err := bgs.db.Model(&User{}).Where("pds_id = ?", p.ID).Count(&enrichedPDSs[i].UserCount).Error; err != nil { + if err := bgs.db.Model(&User{}).Where("pds = ?", p.ID).Count(&enrichedPDSs[i].UserCount).Error; err != nil { return err } From d51abd2ca6404624044b2af3ec133107f25709bc Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 06:21:47 +0000 Subject: [PATCH 17/22] Query for user counts _once_ --- bgs/admin.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/bgs/admin.go b/bgs/admin.go index b2bfb2c38..15f754dd6 100644 --- a/bgs/admin.go +++ b/bgs/admin.go @@ -109,6 +109,11 @@ type enrichedPDS struct { UserCount int64 `json:"UserCount"` } +type UserCount struct { + PDSID uint `gorm:"column:pds"` + UserCount int64 `gorm:"column:user_count"` +} + func (bgs *BGS) handleListPDSs(e echo.Context) error { var pds []models.PDS if err := bgs.db.Find(&pds).Error; err != nil { @@ -119,6 +124,20 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error { activePDSHosts := bgs.slurper.GetActiveList() + var userCounts []UserCount + if err := bgs.db.Model(&User{}). + Select("pds, count(*) as user_count"). + Group("pds"). + Find(&userCounts).Error; err != nil { + return err + } + + // Create a map for fast lookup + userCountMap := make(map[uint]int64) + for _, count := range userCounts { + userCountMap[count.PDSID] = count.UserCount + } + for i, p := range pds { enrichedPDSs[i].PDS = p enrichedPDSs[i].HasActiveConnection = false @@ -134,11 +153,7 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error { continue } enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) - - // Get the number of users for this PDS - if err := bgs.db.Model(&User{}).Where("pds = ?", p.ID).Count(&enrichedPDSs[i].UserCount).Error; err != nil { - return err - } + enrichedPDSs[i].UserCount = userCountMap[p.ID] // Get the ingest rate limit for this PDS ingestRate := rateLimit{ From 520b87a2512e43beb1032a9f3d6b6b1f542f0144 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 06:40:21 +0000 Subject: [PATCH 18/22] Handle channel closure gracefully without panicing --- events/events.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/events/events.go b/events/events.go index f89aec303..92c3417e4 100644 --- a/events/events.go +++ b/events/events.go @@ -77,15 +77,19 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { default: log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) go func(torem *Subscriber) { - select { - case torem.outgoing <- &XRPCStreamEvent{ - Error: &ErrorFrame{ - Error: "ConsumerTooSlow", - }, - }: - case <-time.After(time.Second * 5): - log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) + torem.lk.Lock() + if !torem.cleanedUp { + select { + case torem.outgoing <- &XRPCStreamEvent{ + Error: &ErrorFrame{ + Error: "ConsumerTooSlow", + }, + }: + case <-time.After(time.Second * 5): + log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) + } } + torem.lk.Unlock() torem.cleanup() }(s) } @@ -112,6 +116,9 @@ type Subscriber struct { cleanup func() + lk sync.Mutex + cleanedUp bool + ident string enqueuedCounter prometheus.Counter broadcastCounter prometheus.Counter @@ -177,9 +184,12 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func } sub.cleanup = sync.OnceFunc(func() { + sub.lk.Lock() + defer sub.lk.Unlock() close(done) em.rmSubscriber(sub) close(sub.outgoing) + sub.cleanedUp = true }) if since == nil { @@ -260,6 +270,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func func sequenceForEvent(evt *XRPCStreamEvent) int64 { switch { + case evt == nil: + return -1 case evt.RepoCommit != nil: return evt.RepoCommit.Seq case evt.RepoHandle != nil: @@ -270,6 +282,8 @@ func sequenceForEvent(evt *XRPCStreamEvent) int64 { return evt.RepoTombstone.Seq case evt.RepoInfo != nil: return -1 + case evt.Error != nil: + return -1 default: return -1 } From 5cf0703417fa3640a4ced35e931fe6462b1ebea0 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 10 Nov 2023 09:29:32 -0800 Subject: [PATCH 19/22] make testing slow consumers easy --- cmd/gosky/main.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index d2e231241..3c2fefa88 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -143,6 +143,10 @@ var readRepoStreamCmd = &cli.Command{ &cli.BoolFlag{ Name: "resolve-handles", }, + &cli.DurationFlag{ + Name: "read-delay", + Usage: "make handling each event take at least this long (debug utility)", + }, }, ArgsUsage: `[ [cursor]]`, Action: func(cctx *cli.Context) error { @@ -204,8 +208,14 @@ var readRepoStreamCmd = &cli.Command{ return h, nil } + rr := cctx.Duration("read-delay") + rsc := &events.RepoStreamCallbacks{ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { + if rr != 0 { + time.Sleep(rr) + } + if jsonfmt { b, err := json.Marshal(evt) if err != nil { From 2e132ee2ccbbd3639ab08e3be3ecae3821cd6e61 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 17:43:40 +0000 Subject: [PATCH 20/22] One more fun channel closing bugfix --- bgs/bgs.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index d44963a1c..fa0e41565 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -606,7 +606,10 @@ func (bgs *BGS) EventsHandler(c echo.Context) error { header := events.EventHeader{Op: events.EvtKindMessage} for { select { - case evt := <-evts: + case evt, ok := <-evts: + if !ok { + return nil + } wc, err := conn.NextWriter(websocket.BinaryMessage) if err != nil { log.Errorf("failed to get next writer: %s", err) From ab48aed75834bbd8ecfd66ceeb019f108e8077b5 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 21:24:29 +0000 Subject: [PATCH 21/22] Use a rate.limit --- cmd/gosky/main.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index 3c2fefa88..fd572282b 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -25,6 +25,7 @@ import ( "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/util/cliutil" "github.com/bluesky-social/indigo/xrpc" + "golang.org/x/time/rate" "github.com/gorilla/websocket" lru "github.com/hashicorp/golang-lru" @@ -143,9 +144,9 @@ var readRepoStreamCmd = &cli.Command{ &cli.BoolFlag{ Name: "resolve-handles", }, - &cli.DurationFlag{ - Name: "read-delay", - Usage: "make handling each event take at least this long (debug utility)", + &cli.Float64Flag{ + Name: "max-throughput", + Usage: "limit event consumption to a given # of req/sec (debug utility)", }, }, ArgsUsage: `[ [cursor]]`, @@ -207,13 +208,15 @@ var readRepoStreamCmd = &cli.Command{ return h, nil } - - rr := cctx.Duration("read-delay") + var limiter *rate.Limiter + if cctx.Float64("max-throughput") > 0 { + limiter = rate.NewLimiter(rate.Limit(cctx.Float64("max-throughput")), 1) + } rsc := &events.RepoStreamCallbacks{ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { - if rr != 0 { - time.Sleep(rr) + if limiter != nil { + limiter.Wait(ctx) } if jsonfmt { From f5d06167023e76ceb418df4f43fbff66e4ffff01 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 10 Nov 2023 15:15:52 -0800 Subject: [PATCH 22/22] don't diff against existing data if performing a resync --- carstore/bs.go | 4 ++-- cmd/gosky/sync.go | 9 +++++++++ repomgr/repomgr.go | 6 ++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/carstore/bs.go b/carstore/bs.go index 7feb9d697..a992a2776 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -875,7 +875,7 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n keepset[c] = true oblk, err := bs.Get(ctx, c) if err != nil { - return nil, err + return nil, fmt.Errorf("get failed in new tree: %w", err) } if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { @@ -901,7 +901,7 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n oblk, err := bs.Get(ctx, c) if err != nil { - return nil, err + return nil, fmt.Errorf("get failed in old tree: %w", err) } if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { diff --git a/cmd/gosky/sync.go b/cmd/gosky/sync.go index f27c8b1af..6304205da 100644 --- a/cmd/gosky/sync.go +++ b/cmd/gosky/sync.go @@ -27,6 +27,11 @@ var syncGetRepoCmd = &cli.Command{ Name: "get-repo", Usage: "download repo from account's PDS to local file (or '-' for stdout). for hex combine with 'xxd -ps -u -c 0'", ArgsUsage: ` []`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "host", + }, + }, Action: func(cctx *cli.Context) error { ctx := context.Background() arg := cctx.Args().First() @@ -57,6 +62,10 @@ var syncGetRepoCmd = &cli.Command{ return fmt.Errorf("no PDS endpoint for identity") } + if h := cctx.String("host"); h != "" { + xrpcc.Host = h + } + log.Infof("downloading from %s to: %s", xrpcc.Host, carPath) repoBytes, err := comatproto.SyncGetRepo(ctx, xrpcc, ident.DID.String(), "") if err != nil { diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 77fb5adf6..14e61f6bc 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -769,6 +769,12 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD return err } + if rev == nil { + // if 'rev' is nil, this implies a fresh sync. + // in this case, ignore any existing blocks we have and treat this like a clean import. + curhead = cid.Undef + } + if rev != nil && *rev != currev { // TODO: we could probably just deal with this return fmt.Errorf("ImportNewRepo called with incorrect base")