diff --git a/bgs/bgs.go b/bgs/bgs.go index 35dfab9d9..8e584cb5f 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -27,6 +27,7 @@ import ( "github.com/bluesky-social/indigo/models" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/xrpc" + lru "github.com/hashicorp/golang-lru/v2" "golang.org/x/sync/semaphore" "golang.org/x/time/rate" @@ -87,6 +88,9 @@ type BGS struct { // Management of Compaction compactor *Compactor + + // User cache + userCache *lru.Cache[string, *User] } type PDSResync struct { @@ -136,6 +140,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm db.AutoMigrate(models.PDS{}) db.AutoMigrate(models.DomainBan{}) + uc, _ := lru.New[string, *User](1_000_000) + bgs := &BGS{ Index: ix, db: db, @@ -151,6 +157,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm consumers: make(map[uint64]*SocketConsumer), pdsResyncs: make(map[uint]*PDSResync), + + userCache: uc, } ix.CreateExternalUser = bgs.createExternalUser @@ -520,7 +528,45 @@ type User struct { Tombstoned bool // UpstreamStatus is the state of the user as reported by the upstream PDS - UpstreamStatus string `gorm:"index"` + UpstreamStatusT string `gorm:"index"` + + lk sync.Mutex +} + +func (u *User) SetTakenDown(v bool) { + u.lk.Lock() + defer u.lk.Unlock() + u.TakenDown = v +} + +func (u *User) GetTakenDown() bool { + u.lk.Lock() + defer u.lk.Unlock() + return u.TakenDown +} + +func (u *User) SetTombstoned(v bool) { + u.lk.Lock() + defer u.lk.Unlock() + u.Tombstoned = v +} + +func (u *User) GetTombstoned() bool { + u.lk.Lock() + defer u.lk.Unlock() + return u.Tombstoned +} + +func (u *User) SetUpstreamStatus(v string) { + u.lk.Lock() + defer u.lk.Unlock() + u.UpstreamStatusT = v +} + +func (u *User) GetUpstreamStatus() string { + u.lk.Lock() + defer u.lk.Unlock() + return u.UpstreamStatusT } type addTargetBody struct { @@ -771,6 +817,11 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) ctx, span := tracer.Start(ctx, "lookupUserByDid") defer span.End() + cu, ok := bgs.userCache.Get(did) + if ok { + return cu, nil + } + var u User if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil { return nil, err @@ -780,6 +831,8 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) return nil, gorm.ErrRecordNotFound } + bgs.userCache.Add(did, &u) + return &u, nil } @@ -840,20 +893,21 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event u.Did = evt.Repo } - span.SetAttributes(attribute.String("upstream_status", u.UpstreamStatus)) + ustatus := u.GetUpstreamStatus() + span.SetAttributes(attribute.String("upstream_status", ustatus)) - if u.TakenDown || u.UpstreamStatus == events.AccountStatusTakendown { - span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown)) + if u.GetTakenDown() || ustatus == events.AccountStatusTakendown { + span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown())) log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) return nil } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) return nil } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) return nil } @@ -877,12 +931,13 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event } } - if u.Tombstoned { + if u.GetTombstoned() { span.SetAttributes(attribute.Bool("tombstoned", true)) // we've checked the authority of the users PDS, so reinstate the account if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil { return fmt.Errorf("failed to un-tombstone a user: %w", err) } + u.SetTombstoned(false) ai, err := bgs.Index.LookupUser(ctx, u.ID) if err != nil { @@ -1041,7 +1096,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event return fmt.Errorf("failed to look up user by did: %w", err) } - if u.TakenDown { + if u.GetTakenDown() { shouldBeActive = false status = &events.AccountStatusTakendown } @@ -1370,18 +1425,22 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil { return fmt.Errorf("failed to set user active status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusActive) case events.AccountStatusDeactivated: if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil { return fmt.Errorf("failed to set user deactivation status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusDeactivated) case events.AccountStatusSuspended: if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil { return fmt.Errorf("failed to set user suspension status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusSuspended) case events.AccountStatusTakendown: if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil { return fmt.Errorf("failed to set user taken down status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusTakendown) if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ "handle": nil, @@ -1396,6 +1455,7 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri }).Error; err != nil { return err } + u.SetUpstreamStatus(events.AccountStatusDeleted) if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ "handle": nil, @@ -1422,6 +1482,7 @@ func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error { if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil { return err } + u.SetTakenDown(true) if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil { return err @@ -1443,6 +1504,7 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error { if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil { return err } + u.SetTakenDown(false) return nil } diff --git a/bgs/handlers.go b/bgs/handlers.go index da87c9521..28cf1f0f2 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -34,23 +34,24 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } - if u.Tombstoned { + if u.GetTombstoned() { return nil, fmt.Errorf("account was deleted") } - if u.TakenDown { + if u.GetTakenDown() { return nil, fmt.Errorf("account was taken down by the Relay") } - if u.UpstreamStatus == events.AccountStatusTakendown { + ustatus := u.GetUpstreamStatus() + if ustatus == events.AccountStatusTakendown { return nil, fmt.Errorf("account was taken down by its PDS") } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { return nil, fmt.Errorf("account is temporarily deactivated") } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { return nil, fmt.Errorf("account is suspended by its PDS") } @@ -91,23 +92,24 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } - if u.Tombstoned { + if u.GetTombstoned() { return nil, fmt.Errorf("account was deleted") } - if u.TakenDown { + if u.GetTakenDown() { return nil, fmt.Errorf("account was taken down by the Relay") } - if u.UpstreamStatus == events.AccountStatusTakendown { + ustatus := u.GetUpstreamStatus() + if ustatus == events.AccountStatusTakendown { return nil, fmt.Errorf("account was taken down by its PDS") } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { return nil, fmt.Errorf("account is temporarily deactivated") } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { return nil, fmt.Errorf("account is suspended by its PDS") } @@ -253,23 +255,24 @@ func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did strin return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } - if u.Tombstoned { + if u.GetTombstoned() { return nil, fmt.Errorf("account was deleted") } - if u.TakenDown { + if u.GetTakenDown() { return nil, fmt.Errorf("account was taken down by the Relay") } - if u.UpstreamStatus == events.AccountStatusTakendown { + ustatus := u.GetUpstreamStatus() + if ustatus == events.AccountStatusTakendown { return nil, fmt.Errorf("account was taken down by its PDS") } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { return nil, fmt.Errorf("account is temporarily deactivated") } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { return nil, fmt.Errorf("account is suspended by its PDS") }