Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay splitter daemon + pebble persistence #810

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c39f17c
wip, start pebble persister
brianolson Nov 14, 2024
61c836c
pebble persister
brianolson Nov 14, 2024
44ce34e
fix events tests
brianolson Nov 14, 2024
3f77d63
GC thread with retention and period
brianolson Nov 14, 2024
81a9ce8
connect rainbow to pebble persister
brianolson Nov 14, 2024
31c5697
Merge remote-tracking branch 'origin/feat/splitter-rebase' into feat/…
brianolson Nov 15, 2024
a71ce5a
delete dead code
brianolson Nov 15, 2024
096fc7c
PR feedback, use pebble DeleteRange
brianolson Nov 15, 2024
e8c9d2e
identity: default dir with 100 max idle conns, and 1sec idle
bnewbold Nov 15, 2024
ed0a5c6
identity: drop default HTTP timeout from 15s to 10s
bnewbold Nov 15, 2024
d88346a
identity: drop default DNS timeout from 5s to 3s
bnewbold Nov 15, 2024
5dc06c0
add gauge spl_active_clients
brianolson Nov 15, 2024
586cc75
--persist-hours
brianolson Nov 15, 2024
43c1d8e
fix err log
brianolson Nov 15, 2024
f5a301b
fix broadcast
brianolson Nov 15, 2024
0fc4c7e
identity package default tweaks (#811)
bnewbold Nov 15, 2024
9780691
last seq from pebble
brianolson Nov 15, 2024
766dc86
log gc sizes
brianolson Nov 15, 2024
1b7e54e
allow carstore to use multiple directories, round robin style
whyrusleeping Nov 15, 2024
fd6ae47
fixup build
whyrusleeping Nov 15, 2024
ffe7fb6
more test fixups
whyrusleeping Nov 15, 2024
9e941db
reorg config
brianolson Nov 15, 2024
be9e655
Merge remote-tracking branch 'origin/feat/splitter-rebase' into feat/…
brianolson Nov 16, 2024
05b8751
allow carstore to use multiple directories, round robin style (#812)
whyrusleeping Nov 16, 2024
0fb485d
add a user cache on the bgs
whyrusleeping Nov 16, 2024
7057577
fixup refactor
whyrusleeping Nov 16, 2024
4a15b38
fix lint
whyrusleeping Nov 16, 2024
a422903
add a user cache on the bgs (#816)
whyrusleeping Nov 16, 2024
75d29f8
add compaction to gc
brianolson Nov 17, 2024
30a2725
Add more metrics to the relay
ericvolp12 Nov 17, 2024
bd727d1
dockerize
brianolson Nov 17, 2024
15004ab
whoops
ericvolp12 Nov 17, 2024
3a4136f
Rename series
ericvolp12 Nov 17, 2024
1eeb496
Track disk vs meta write durations
ericvolp12 Nov 17, 2024
be57b0d
Add more metrics to the relay (#817)
ericvolp12 Nov 17, 2024
bf3ffd5
reorg config, make --cursor-file
brianolson Nov 17, 2024
93497b7
no-cursor subscribe
brianolson Nov 18, 2024
83811b9
Merge remote-tracking branch 'origin/main' into feat/splitter-rebase
brianolson Nov 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions .github/workflows/container-rainbow-aws.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Builds the rainbow container image from cmd/rainbow/Dockerfile and pushes it
# to the registry configured through the AWS ECR repository secrets.
name: container-rainbow-aws
# Triggered on push events only.
on: [push]
env:
# Registry address and login credentials come from repository secrets.
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
PASSWORD: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_PASSWORD }}
# Fixed image name within the registry (not derived from github.repository).
IMAGE_NAME: rainbow

jobs:
container-rainbow-aws:
# Run only when the workflow executes in the bluesky-social/indigo repository.
if: github.repository == 'bluesky-social/indigo'
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
id-token: write

steps:
- name: Checkout repository
uses: actions/checkout@v3

- name: Setup Docker buildx
uses: docker/setup-buildx-action@v1

# Authenticate against the registry using the env values defined above.
- name: Log into registry ${{ env.REGISTRY }}
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ env.USERNAME }}
password: ${{ env.PASSWORD }}

# Produces the tags and labels consumed by the build step below.
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v4
with:
images: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# Tag by commit SHA in long format, with empty prefix and suffix.
tags: |
type=sha,enable=true,priority=100,prefix=,suffix=,format=long

- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v4
with:
context: .
file: ./cmd/rainbow/Dockerfile
# Push unless the triggering event is a pull request.
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
# Read from and write to the gha build cache.
cache-from: type=gha
cache-to: type=gha,mode=max
9 changes: 7 additions & 2 deletions atproto/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,16 @@ func DefaultDirectory() Directory {
base := BaseDirectory{
PLCURL: DefaultPLCURL,
HTTPClient: http.Client{
Timeout: time.Second * 15,
Timeout: time.Second * 10,
Transport: &http.Transport{
// would want this around 100ms for services doing lots of handle resolution. Impacts PLC connections as well, but not too bad.
IdleConnTimeout: time.Millisecond * 1000,
MaxIdleConns: 100,
},
},
Resolver: net.Resolver{
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
d := net.Dialer{Timeout: time.Second * 5}
d := net.Dialer{Timeout: time.Second * 3}
return d.DialContext(ctx, network, address)
},
},
Expand Down
81 changes: 74 additions & 7 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/bluesky-social/indigo/models"
"github.com/bluesky-social/indigo/repomgr"
"github.com/bluesky-social/indigo/xrpc"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -87,6 +88,9 @@ type BGS struct {

// Management of Compaction
compactor *Compactor

// User cache
userCache *lru.Cache[string, *User]
}

type PDSResync struct {
Expand Down Expand Up @@ -136,6 +140,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
db.AutoMigrate(models.PDS{})
db.AutoMigrate(models.DomainBan{})

uc, _ := lru.New[string, *User](1_000_000)

bgs := &BGS{
Index: ix,
db: db,
Expand All @@ -151,6 +157,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
consumers: make(map[uint64]*SocketConsumer),

pdsResyncs: make(map[uint]*PDSResync),

userCache: uc,
}

ix.CreateExternalUser = bgs.createExternalUser
Expand Down Expand Up @@ -521,6 +529,44 @@ type User struct {

// UpstreamStatus is the state of the user as reported by the upstream PDS
UpstreamStatus string `gorm:"index"`

lk sync.Mutex
}

// SetTakenDown stores v in the user's TakenDown field while holding u.lk.
func (u *User) SetTakenDown(v bool) {
	u.lk.Lock()
	u.TakenDown = v
	u.lk.Unlock()
}

// GetTakenDown returns the user's TakenDown field, read while holding u.lk.
func (u *User) GetTakenDown() bool {
	u.lk.Lock()
	takenDown := u.TakenDown
	u.lk.Unlock()
	return takenDown
}

// SetTombstoned stores v in the user's Tombstoned field while holding u.lk.
func (u *User) SetTombstoned(v bool) {
	u.lk.Lock()
	u.Tombstoned = v
	u.lk.Unlock()
}

// GetTombstoned returns the user's Tombstoned field, read while holding u.lk.
func (u *User) GetTombstoned() bool {
	u.lk.Lock()
	tombstoned := u.Tombstoned
	u.lk.Unlock()
	return tombstoned
}

// SetUpstreamStatus stores v in the user's UpstreamStatus field while
// holding u.lk.
func (u *User) SetUpstreamStatus(v string) {
	u.lk.Lock()
	u.UpstreamStatus = v
	u.lk.Unlock()
}

// GetUpstreamStatus returns the user's UpstreamStatus field, read while
// holding u.lk.
func (u *User) GetUpstreamStatus() string {
	u.lk.Lock()
	status := u.UpstreamStatus
	u.lk.Unlock()
	return status
}

type addTargetBody struct {
Expand Down Expand Up @@ -771,6 +817,11 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error)
ctx, span := tracer.Start(ctx, "lookupUserByDid")
defer span.End()

cu, ok := bgs.userCache.Get(did)
if ok {
return cu, nil
}

var u User
if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil {
return nil, err
Expand All @@ -780,6 +831,8 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error)
return nil, gorm.ErrRecordNotFound
}

bgs.userCache.Add(did, &u)

return &u, nil
}

Expand Down Expand Up @@ -823,14 +876,19 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1)
evt := env.RepoCommit
log.Debugw("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)

s := time.Now()
u, err := bgs.lookupUserByDid(ctx, evt.Repo)
userLookupDuration.Observe(time.Since(s).Seconds())
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("looking up event user: %w", err)
}

newUsersDiscovered.Inc()
start := time.Now()
subj, err := bgs.createExternalUser(ctx, evt.Repo)
newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
if err != nil {
return fmt.Errorf("fed event create external user: %w", err)
}
Expand All @@ -840,20 +898,21 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
u.Did = evt.Repo
}

span.SetAttributes(attribute.String("upstream_status", u.UpstreamStatus))
ustatus := u.GetUpstreamStatus()
span.SetAttributes(attribute.String("upstream_status", ustatus))

if u.TakenDown || u.UpstreamStatus == events.AccountStatusTakendown {
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown))
if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}
Expand All @@ -877,12 +936,13 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
}
}

if u.Tombstoned {
if u.GetTombstoned() {
span.SetAttributes(attribute.Bool("tombstoned", true))
// we've checked the authority of the users PDS, so reinstate the account
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
return fmt.Errorf("failed to un-tombstone a user: %w", err)
}
u.SetTombstoned(false)

ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
Expand Down Expand Up @@ -1041,7 +1101,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return fmt.Errorf("failed to look up user by did: %w", err)
}

if u.TakenDown {
if u.GetTakenDown() {
shouldBeActive = false
status = &events.AccountStatusTakendown
}
Expand Down Expand Up @@ -1370,18 +1430,22 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil {
return fmt.Errorf("failed to set user active status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusActive)
case events.AccountStatusDeactivated:
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil {
return fmt.Errorf("failed to set user deactivation status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusDeactivated)
case events.AccountStatusSuspended:
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil {
return fmt.Errorf("failed to set user suspension status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusSuspended)
case events.AccountStatusTakendown:
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil {
return fmt.Errorf("failed to set user taken down status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusTakendown)

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
Expand All @@ -1396,6 +1460,7 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri
}).Error; err != nil {
return err
}
u.SetUpstreamStatus(events.AccountStatusDeleted)

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
Expand All @@ -1422,6 +1487,7 @@ func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error {
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil {
return err
}
u.SetTakenDown(true)

if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
return err
Expand All @@ -1443,6 +1509,7 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error {
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil {
return err
}
u.SetTakenDown(false)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion bgs/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStra
return state, nil
}

func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {
func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) {
ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo")
defer span.End()
log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
Expand Down
33 changes: 18 additions & 15 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,24 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
if u.GetTombstoned() {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
if u.GetTakenDown() {
return nil, fmt.Errorf("account was taken down by the Relay")
}

if u.UpstreamStatus == events.AccountStatusTakendown {
ustatus := u.GetUpstreamStatus()
if ustatus == events.AccountStatusTakendown {
return nil, fmt.Errorf("account was taken down by its PDS")
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
return nil, fmt.Errorf("account is temporarily deactivated")
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
return nil, fmt.Errorf("account is suspended by its PDS")
}

Expand Down Expand Up @@ -91,23 +92,24 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
if u.GetTombstoned() {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
if u.GetTakenDown() {
return nil, fmt.Errorf("account was taken down by the Relay")
}

if u.UpstreamStatus == events.AccountStatusTakendown {
ustatus := u.GetUpstreamStatus()
if ustatus == events.AccountStatusTakendown {
return nil, fmt.Errorf("account was taken down by its PDS")
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
return nil, fmt.Errorf("account is temporarily deactivated")
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
return nil, fmt.Errorf("account is suspended by its PDS")
}

Expand Down Expand Up @@ -253,23 +255,24 @@ func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did strin
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
if u.GetTombstoned() {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
if u.GetTakenDown() {
return nil, fmt.Errorf("account was taken down by the Relay")
}

if u.UpstreamStatus == events.AccountStatusTakendown {
ustatus := u.GetUpstreamStatus()
if ustatus == events.AccountStatusTakendown {
return nil, fmt.Errorf("account was taken down by its PDS")
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
return nil, fmt.Errorf("account is temporarily deactivated")
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
return nil, fmt.Errorf("account is suspended by its PDS")
}

Expand Down
Loading
Loading