Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement repo tombstone handler #340

Merged
merged 3 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ type User struct {
// TakenDown is set to true if the user in question has been taken down.
// A user in this state will have all future events related to it dropped
// and no data about this user will be served.
TakenDown bool
TakenDown bool
Tombstoned bool
}

type addTargetBody struct {
Expand Down Expand Up @@ -776,6 +777,33 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
}

if host.ID != u.PDS {
log.Infow("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
subj, err := bgs.createExternalUser(ctx, evt.Repo)
if err != nil {
return err
}

if subj.PDS != host.ID {
return fmt.Errorf("event from non-authoritative pds")
}
}

if u.Tombstoned {
// we've checked the authority of the users PDS, so reinstate the account
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
return fmt.Errorf("failed to un-tombstone a user: %w", err)
}

ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
}

// Now a simple re-crawl should suffice to bring the user back online
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

// skip the fast path for rebases or if the user is already in the slow path
if bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) {
rebasesCounter.WithLabelValues(host.Host).Add(1)
Expand Down Expand Up @@ -860,12 +888,52 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return err
}

return nil
case env.RepoTombstone != nil:
if err := bgs.handleRepoTombstone(ctx, host, env.RepoTombstone); err != nil {
return err
}

return nil
default:
return fmt.Errorf("invalid fed event")
}
}

func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *atproto.SyncSubscribeRepos_Tombstone) error {
u, err := bgs.lookupUserByDid(ctx, evt.Did)
if err != nil {
return err
}

if u.PDS != pds.ID {
return fmt.Errorf("unauthoritative tombstone event from %s for %s", pds.Host, evt.Did)
}

if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{
"tombstoned": true,
"handle": nil,
}).Error; err != nil {
return err
}

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
}).Error; err != nil {
return err
}

// delete data from carstore
if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
// don't let a failure here prevent us from propagating this event
log.Errorf("failed to delete user data from carstore: %s", err)
}

return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
RepoTombstone: evt,
})
}

func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user models.Uid, blobs []string) error {
if s.blobs == nil {
log.Debugf("blob syncing disabled")
Expand Down Expand Up @@ -1009,6 +1077,7 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
return nil, fmt.Errorf("failed to update users pds: %w", err)
}

exu.PDS = peering.ID
}

if exu.Handle.String != handle {
Expand Down
41 changes: 33 additions & 8 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@ import (
)

func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
u, err := s.lookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
return nil, fmt.Errorf("account was taken down")
}

reqCid := cid.Undef
if commit != "" {
reqCid, err = cid.Decode(commit)
Expand All @@ -37,7 +45,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
}
}

_, record, err := s.repoman.GetRecord(ctx, u.Uid, collection, rkey, reqCid)
_, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, reqCid)
if err != nil {
return nil, fmt.Errorf("failed to get record: %w", err)
}
Expand All @@ -52,17 +60,25 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
}

func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
u, err := s.lookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
return nil, fmt.Errorf("account was taken down")
}

// TODO: stream the response
buf := new(bytes.Buffer)
if err := s.repoman.ReadRepo(ctx, u.Uid, since, buf); err != nil {
if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil {
return nil, fmt.Errorf("failed to read repo: %w", err)
}

Expand Down Expand Up @@ -158,7 +174,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string,
}

users := []User{}
if err := s.db.Model(&User{}).Where("id > ?", c).Order("id").Limit(limit).Find(&users).Error; err != nil {
if err := s.db.Model(&User{}).Where("id > ? AND NOT tombstoned AND NOT taken_down", c).Order("id").Limit(limit).Find(&users).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return &comatprototypes.SyncListRepos_Output{}, nil
}
Expand All @@ -175,6 +191,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string,

for i := range users {
user := users[i]

root, err := s.repoman.GetRepoRoot(ctx, user.ID)
if err != nil {
return nil, fmt.Errorf("failed to get repo root for (%s): %w", user.Did, err)
Expand All @@ -194,20 +211,28 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string,
}

func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) {
u, err := s.Index.LookupUserByDid(ctx, did)
u, err := s.lookupUserByDid(ctx, did)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

root, err := s.repoman.GetRepoRoot(ctx, u.Uid)
if u.Tombstoned {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
return nil, fmt.Errorf("account was taken down")
}

root, err := s.repoman.GetRepoRoot(ctx, u.ID)
if err != nil {
return nil, fmt.Errorf("failed to get repo root: %w", err)
}

rev, err := s.repoman.GetRepoRev(ctx, u.Uid)
rev, err := s.repoman.GetRepoRev(ctx, u.ID)
if err != nil {
return nil, fmt.Errorf("failed to get repo rev: %w", err)
}
Expand Down