Skip to content

Commit

Permalink
Merge branch 'main' into trusted_slurp_domains
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Nov 13, 2023
2 parents f7372c1 + 1e3e99d commit 9f01575
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 33 deletions.
21 changes: 21 additions & 0 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ type enrichedPDS struct {
EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"`
IngestRate rateLimit `json:"IngestRate"`
CrawlRate rateLimit `json:"CrawlRate"`
UserCount int64 `json:"UserCount"`
}

type UserCount struct {
PDSID uint `gorm:"column:pds"`
UserCount int64 `gorm:"column:user_count"`
}

func (bgs *BGS) handleListPDSs(e echo.Context) error {
Expand All @@ -118,6 +124,20 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error {

activePDSHosts := bgs.slurper.GetActiveList()

var userCounts []UserCount
if err := bgs.db.Model(&User{}).
Select("pds, count(*) as user_count").
Group("pds").
Find(&userCounts).Error; err != nil {
return err
}

// Create a map for fast lookup
userCountMap := make(map[uint]int64)
for _, count := range userCounts {
userCountMap[count.PDSID] = count.UserCount
}

for i, p := range pds {
enrichedPDSs[i].PDS = p
enrichedPDSs[i].HasActiveConnection = false
Expand All @@ -133,6 +153,7 @@ func (bgs *BGS) handleListPDSs(e echo.Context) error {
continue
}
enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue())
enrichedPDSs[i].UserCount = userCountMap[p.ID]

// Get the ingest rate limit for this PDS
ingestRate := rateLimit{
Expand Down
32 changes: 17 additions & 15 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ import (
atproto "github.com/bluesky-social/indigo/api/atproto"
comatprototypes "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/blobs"
"github.com/bluesky-social/indigo/carstore"
"github.com/bluesky-social/indigo/mst"
"gorm.io/gorm"

"github.com/bluesky-social/indigo/xrpc"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/ipld/go-car"
"github.com/labstack/echo/v4"
)

func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) {
func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) {
u, err := s.lookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand All @@ -40,16 +43,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
return nil, fmt.Errorf("account was taken down")
}

reqCid := cid.Undef
if commit != "" {
reqCid, err = cid.Decode(commit)
if err != nil {
log.Errorw("failed to decode commit cid", "err", err, "cid", commit)
return nil, echo.NewHTTPError(http.StatusBadRequest, "failed to decode commit cid")
}
}

_, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, reqCid)
root, blocks, err := s.repoman.GetRecordProof(ctx, u.ID, collection, rkey)
if err != nil {
if errors.Is(err, mst.ErrNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo")
Expand All @@ -59,10 +53,18 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
}

buf := new(bytes.Buffer)
err = record.MarshalCBOR(buf)
if err != nil {
log.Errorw("failed to marshal record to CBOR", "err", err, "did", did, "collection", collection, "rkey", rkey)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to marshal record to CBOR")
hb, err := cbor.DumpObject(&car.CarHeader{
Roots: []cid.Cid{root},
Version: 1,
})
if _, err := carstore.LdWrite(buf, hb); err != nil {
return nil, err
}

for _, blk := range blocks {
if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
return nil, err
}
}

return buf, nil
Expand Down
10 changes: 1 addition & 9 deletions bgs/stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error {
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRecord")
defer span.End()
collection := c.QueryParam("collection")
commit := c.QueryParam("commit")
did := c.QueryParam("did")
rkey := c.QueryParam("rkey")

Expand All @@ -131,17 +130,10 @@ func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error {
return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)})
}

if commit != "" {
_, err = cid.Parse(commit)
if err != nil {
return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid commit: %s", commit)})
}
}

var out io.Reader
var handleErr error
// func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context,collection string,commit string,did string,rkey string) (io.Reader, error)
out, handleErr = s.handleComAtprotoSyncGetRecord(ctx, collection, commit, did, rkey)
out, handleErr = s.handleComAtprotoSyncGetRecord(ctx, collection, did, rkey)
if handleErr != nil {
return handleErr
}
Expand Down
8 changes: 6 additions & 2 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n
keepset[c] = true
oblk, err := bs.Get(ctx, c)
if err != nil {
return nil, err
return nil, fmt.Errorf("get failed in new tree: %w", err)
}

if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
Expand All @@ -901,10 +901,14 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n

oblk, err := bs.Get(ctx, c)
if err != nil {
return nil, err
return nil, fmt.Errorf("get failed in old tree: %w", err)
}

if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
if lnk.Prefix().Codec != cid.DagCBOR {
return
}

if !keepset[lnk] {
dropset[lnk] = true
queue = append(queue, lnk)
Expand Down
13 changes: 13 additions & 0 deletions cmd/gosky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/bluesky-social/indigo/util"
"github.com/bluesky-social/indigo/util/cliutil"
"github.com/bluesky-social/indigo/xrpc"
"golang.org/x/time/rate"

"github.com/gorilla/websocket"
lru "github.com/hashicorp/golang-lru"
Expand Down Expand Up @@ -143,6 +144,10 @@ var readRepoStreamCmd = &cli.Command{
&cli.BoolFlag{
Name: "resolve-handles",
},
&cli.Float64Flag{
Name: "max-throughput",
Usage: "limit event consumption to a given # of req/sec (debug utility)",
},
},
ArgsUsage: `[<repo> [cursor]]`,
Action: func(cctx *cli.Context) error {
Expand Down Expand Up @@ -203,9 +208,17 @@ var readRepoStreamCmd = &cli.Command{

return h, nil
}
var limiter *rate.Limiter
if cctx.Float64("max-throughput") > 0 {
limiter = rate.NewLimiter(rate.Limit(cctx.Float64("max-throughput")), 1)
}

rsc := &events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
if limiter != nil {
limiter.Wait(ctx)
}

if jsonfmt {
b, err := json.Marshal(evt)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions cmd/gosky/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ var syncGetRepoCmd = &cli.Command{
Name: "get-repo",
Usage: "download repo from account's PDS to local file (or '-' for stdout). for hex combine with 'xxd -ps -u -c 0'",
ArgsUsage: `<at-identifier> [<car-file-path>]`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "host",
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
arg := cctx.Args().First()
Expand Down Expand Up @@ -57,6 +62,10 @@ var syncGetRepoCmd = &cli.Command{
return fmt.Errorf("no PDS endpoint for identity")
}

if h := cctx.String("host"); h != "" {
xrpcc.Host = h
}

log.Infof("downloading from %s to: %s", xrpcc.Host, carPath)
repoBytes, err := comatproto.SyncGetRepo(ctx, xrpcc, ident.DID.String(), "")
if err != nil {
Expand Down
40 changes: 33 additions & 7 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
label "github.com/bluesky-social/indigo/api/label"
Expand Down Expand Up @@ -73,11 +74,24 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
select {
case s.outgoing <- evt:
case <-s.done:
default:
log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident)
go func(torem *Subscriber) {
em.rmSubscriber(torem)
torem.lk.Lock()
if !torem.cleanedUp {
select {
case torem.outgoing <- &XRPCStreamEvent{
Error: &ErrorFrame{
Error: "ConsumerTooSlow",
},
}:
case <-time.After(time.Second * 5):
log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident)
}
}
torem.lk.Unlock()
torem.cleanup()
}(s)
default:
log.Warnf("event overflow (%d)", len(s.outgoing))
}
s.broadcastCounter.Inc()
}
Expand All @@ -100,6 +114,11 @@ type Subscriber struct {

done chan struct{}

cleanup func()

lk sync.Mutex
cleanedUp bool

ident string
enqueuedCounter prometheus.Counter
broadcastCounter prometheus.Counter
Expand Down Expand Up @@ -164,15 +183,18 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
broadcastCounter: eventsBroadcast.WithLabelValues(ident),
}

cleanup := func() {
sub.cleanup = sync.OnceFunc(func() {
sub.lk.Lock()
defer sub.lk.Unlock()
close(done)
em.rmSubscriber(sub)
close(sub.outgoing)
}
sub.cleanedUp = true
})

if since == nil {
em.addSubscriber(sub)
return sub.outgoing, cleanup, nil
return sub.outgoing, sub.cleanup, nil
}

out := make(chan *XRPCStreamEvent, em.bufferSize)
Expand Down Expand Up @@ -243,11 +265,13 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
}
}()

return out, cleanup, nil
return out, sub.cleanup, nil
}

func sequenceForEvent(evt *XRPCStreamEvent) int64 {
switch {
case evt == nil:
return -1
case evt.RepoCommit != nil:
return evt.RepoCommit.Seq
case evt.RepoHandle != nil:
Expand All @@ -258,6 +282,8 @@ func sequenceForEvent(evt *XRPCStreamEvent) int64 {
return evt.RepoTombstone.Seq
case evt.RepoInfo != nil:
return -1
case evt.Error != nil:
return -1
default:
return -1
}
Expand Down
34 changes: 34 additions & 0 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/bluesky-social/indigo/models"
"github.com/bluesky-social/indigo/mst"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/util"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -447,6 +449,32 @@ func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collectio
return ocid, val, nil
}

func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, collection string, rkey string) (cid.Cid, []blocks.Block, error) {
robs, err := rm.cs.ReadOnlySession(user)
if err != nil {
return cid.Undef, nil, err
}

bs := util.NewLoggingBstore(robs)

head, err := rm.cs.GetUserRepoHead(ctx, user)
if err != nil {
return cid.Undef, nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
if err != nil {
return cid.Undef, nil, err
}

_, _, err = r.GetRecord(ctx, collection+"/"+rkey)
if err != nil {
return cid.Undef, nil, err
}

return head, bs.GetLoggedBlocks(), nil
}

func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.ActorProfile, error) {
bs, err := rm.cs.ReadOnlySession(uid)
if err != nil {
Expand Down Expand Up @@ -741,6 +769,12 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoD
return err
}

if rev == nil {
// if 'rev' is nil, this implies a fresh sync.
// in this case, ignore any existing blocks we have and treat this like a clean import.
curhead = cid.Undef
}

if rev != nil && *rev != currev {
// TODO: we could probably just deal with this
return fmt.Errorf("ImportNewRepo called with incorrect base")
Expand Down
Loading

0 comments on commit 9f01575

Please sign in to comment.