Skip to content

Commit

Permalink
Merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Oct 2, 2023
2 parents 176fdf8 + cf60dd2 commit b02b058
Show file tree
Hide file tree
Showing 11 changed files with 464 additions and 61 deletions.
55 changes: 49 additions & 6 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bluesky-social/indigo/models"
"github.com/labstack/echo/v4"
dto "github.com/prometheus/client_model/go"
"go.opentelemetry.io/otel"
"golang.org/x/time/rate"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -397,7 +398,8 @@ func (bgs *BGS) handleAdminChangePDSCrawlLimit(e echo.Context) error {
}

func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
ctx := e.Request().Context()
ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactRepo")
defer span.End()

did := e.QueryParam("did")
if did == "" {
Expand All @@ -421,15 +423,30 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
}

func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
ctx := e.Request().Context()
ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos")
defer span.End()

var dry bool
if strings.ToLower(e.QueryParam("dry")) == "true" {
dry = true
}

lim := 50
if limstr := e.QueryParam("limit"); limstr != "" {
v, err := strconv.Atoi(limstr)
if err != nil {
return err
}

lim = v
}

if err := bgs.runRepoCompaction(ctx); err != nil {
stats, err := bgs.runRepoCompaction(ctx, lim, dry)
if err != nil {
return fmt.Errorf("compaction run failed: %w", err)
}

return e.JSON(200, map[string]any{
"success": "true",
})
return e.JSON(200, stats)
}

func (bgs *BGS) handleAdminPostResyncPDS(e echo.Context) error {
Expand Down Expand Up @@ -481,3 +498,29 @@ func (bgs *BGS) handleAdminGetResyncPDS(e echo.Context) error {
"resync": resync,
})
}

func (bgs *BGS) handleAdminResetRepo(e echo.Context) error {
ctx := e.Request().Context()

did := e.QueryParam("did")
if did == "" {
return fmt.Errorf("must pass a did")
}

ai, err := bgs.Index.LookupUserByDid(ctx, did)
if err != nil {
return fmt.Errorf("no such user: %w", err)
}

if err := bgs.repoman.ResetRepo(ctx, ai.Uid); err != nil {
return err
}

if err := bgs.Index.Crawler.Crawl(ctx, ai); err != nil {
return err
}

return e.JSON(200, map[string]any{
"success": true,
})
}
110 changes: 104 additions & 6 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {

log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err)

if strings.HasPrefix(ctx.Path(), "/admin/") {
ctx.JSON(500, map[string]any{
"error": err.Error(),
})
return
}

if sendHeader {
ctx.Response().WriteHeader(500)
}
Expand Down Expand Up @@ -326,6 +333,7 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {
admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown)
admin.POST("/repo/compact", bgs.handleAdminCompactRepo)
admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos)
admin.POST("/repo/reset", bgs.handleAdminResetRepo)

// PDS-related Admin API
admin.GET("/pds/list", bgs.handleListPDSs)
Expand Down Expand Up @@ -445,7 +453,8 @@ type User struct {
// TakenDown is set to true if the user in question has been taken down.
// A user in this state will have all future events related to it dropped
// and no data about this user will be served.
TakenDown bool
TakenDown bool
Tombstoned bool
}

type addTargetBody struct {
Expand Down Expand Up @@ -768,6 +777,33 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
}

if host.ID != u.PDS {
log.Infow("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
subj, err := bgs.createExternalUser(ctx, evt.Repo)
if err != nil {
return err
}

if subj.PDS != host.ID {
return fmt.Errorf("event from non-authoritative pds")
}
}

if u.Tombstoned {
// we've checked the authority of the users PDS, so reinstate the account
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
return fmt.Errorf("failed to un-tombstone a user: %w", err)
}

ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
}

// Now a simple re-crawl should suffice to bring the user back online
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

// skip the fast path for rebases or if the user is already in the slow path
if bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) {
rebasesCounter.WithLabelValues(host.Host).Add(1)
Expand Down Expand Up @@ -852,12 +888,52 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return err
}

return nil
case env.RepoTombstone != nil:
if err := bgs.handleRepoTombstone(ctx, host, env.RepoTombstone); err != nil {
return err
}

return nil
default:
return fmt.Errorf("invalid fed event")
}
}

func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *atproto.SyncSubscribeRepos_Tombstone) error {
u, err := bgs.lookupUserByDid(ctx, evt.Did)
if err != nil {
return err
}

if u.PDS != pds.ID {
return fmt.Errorf("unauthoritative tombstone event from %s for %s", pds.Host, evt.Did)
}

if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{
"tombstoned": true,
"handle": nil,
}).Error; err != nil {
return err
}

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
}).Error; err != nil {
return err
}

// delete data from carstore
if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
// don't let a failure here prevent us from propagating this event
log.Errorf("failed to delete user data from carstore: %s", err)
}

return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
RepoTombstone: evt,
})
}

func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user models.Uid, blobs []string) error {
if s.blobs == nil {
log.Debugf("blob syncing disabled")
Expand Down Expand Up @@ -1001,6 +1077,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
return nil, fmt.Errorf("failed to update users pds: %w", err)
}

exu.PDS = peering.ID
}

if exu.Handle.String != handle {
Expand Down Expand Up @@ -1119,23 +1196,44 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error {
return nil
}

func (bgs *BGS) runRepoCompaction(ctx context.Context) error {
type compactionStats struct {
Completed map[models.Uid]*carstore.CompactionStats
Targets []carstore.CompactionTarget
}

func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*compactionStats, error) {
ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction")
defer span.End()

repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx)
repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, 50)
if err != nil {
return fmt.Errorf("failed to get repos to compact: %w", err)
return nil, fmt.Errorf("failed to get repos to compact: %w", err)
}

if lim > 0 && len(repos) > lim {
repos = repos[:lim]
}

if dry {
return &compactionStats{
Targets: repos,
}, nil
}

results := make(map[models.Uid]*carstore.CompactionStats)
for _, r := range repos {
if _, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil {
st, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr)
if err != nil {
log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
continue
}
results[r.Usr] = st
}

return nil
return &compactionStats{
Targets: repos,
Completed: results,
}, nil
}

type repoHead struct {
Expand Down
41 changes: 33 additions & 8 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
u, err := s.lookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
Expand All @@ -32,6 +32,14 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
return nil, fmt.Errorf("account was taken down")
}

reqCid := cid.Undef
if commit != "" {
reqCid, err = cid.Decode(commit)
Expand All @@ -41,7 +49,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
}
}

_, record, err := s.repoman.GetRecord(ctx, u.Uid, collection, rkey, reqCid)
_, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, reqCid)
if err != nil {
if errors.Is(err, mst.ErrNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo")
Expand All @@ -61,7 +69,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
}

func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
u, err := s.lookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
Expand All @@ -70,9 +78,17 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
return nil, fmt.Errorf("account was taken down")
}

// TODO: stream the response
buf := new(bytes.Buffer)
if err := s.repoman.ReadRepo(ctx, u.Uid, since, buf); err != nil {
if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil {
log.Errorw("failed to read repo into buffer", "err", err, "did", did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to read repo into buffer")
}
Expand Down Expand Up @@ -164,7 +180,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string,
}

users := []User{}
if err := s.db.Model(&User{}).Where("id > ?", c).Order("id").Limit(limit).Find(&users).Error; err != nil {
if err := s.db.Model(&User{}).Where("id > ? AND NOT tombstoned AND NOT taken_down", c).Order("id").Limit(limit).Find(&users).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return &comatprototypes.SyncListRepos_Output{}, nil
}
Expand All @@ -182,6 +198,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string,

for i := range users {
user := users[i]

root, err := s.repoman.GetRepoRoot(ctx, user.ID)
if err != nil {
log.Errorw("failed to get repo root", "err", err, "did", user.Did)
Expand All @@ -202,21 +219,29 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string,
}

func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
u, err := s.lookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

root, err := s.repoman.GetRepoRoot(ctx, u.Uid)
if u.Tombstoned {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
return nil, fmt.Errorf("account was taken down")
}

root, err := s.repoman.GetRepoRoot(ctx, u.ID)
if err != nil {
log.Errorw("failed to get repo root", "err", err, "did", u.Did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo root")
}

rev, err := s.repoman.GetRepoRev(ctx, u.Uid)
rev, err := s.repoman.GetRepoRev(ctx, u.ID)
if err != nil {
log.Errorw("failed to get repo rev", "err", err, "did", u.Did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo rev")
Expand Down
Loading

0 comments on commit b02b058

Please sign in to comment.