diff --git a/.github/workflows/container-rainbow-aws.yaml b/.github/workflows/container-rainbow-aws.yaml new file mode 100644 index 000000000..412be454a --- /dev/null +++ b/.github/workflows/container-rainbow-aws.yaml @@ -0,0 +1,52 @@ +name: container-rainbow-aws +on: [push] +env: + REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} + USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} + PASSWORD: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_PASSWORD }} + # github.repository as / + IMAGE_NAME: rainbow + +jobs: + container-rainbow-aws: + if: github.repository == 'bluesky-social/indigo' + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Setup Docker buildx + uses: docker/setup-buildx-action@v1 + + - name: Log into registry ${{ env.REGISTRY }} + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ env.USERNAME }} + password: ${{ env.PASSWORD }} + + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@v4 + with: + images: | + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=sha,enable=true,priority=100,prefix=,suffix=,format=long + + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@v4 + with: + context: . 
+ file: ./cmd/rainbow/Dockerfile + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/atproto/identity/identity.go b/atproto/identity/identity.go index 02e66f22c..c0453b2af 100644 --- a/atproto/identity/identity.go +++ b/atproto/identity/identity.go @@ -65,11 +65,16 @@ func DefaultDirectory() Directory { base := BaseDirectory{ PLCURL: DefaultPLCURL, HTTPClient: http.Client{ - Timeout: time.Second * 15, + Timeout: time.Second * 10, + Transport: &http.Transport{ + // would want this around 100ms for services doing lots of handle resolution. Impacts PLC connections as well, but not too bad. + IdleConnTimeout: time.Millisecond * 1000, + MaxIdleConns: 100, + }, }, Resolver: net.Resolver{ Dial: func(ctx context.Context, network, address string) (net.Conn, error) { - d := net.Dialer{Timeout: time.Second * 5} + d := net.Dialer{Timeout: time.Second * 3} return d.DialContext(ctx, network, address) }, }, diff --git a/bgs/bgs.go b/bgs/bgs.go index 35dfab9d9..b64df715e 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -27,6 +27,7 @@ import ( "github.com/bluesky-social/indigo/models" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/xrpc" + lru "github.com/hashicorp/golang-lru/v2" "golang.org/x/sync/semaphore" "golang.org/x/time/rate" @@ -87,6 +88,9 @@ type BGS struct { // Management of Compaction compactor *Compactor + + // User cache + userCache *lru.Cache[string, *User] } type PDSResync struct { @@ -136,6 +140,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm db.AutoMigrate(models.PDS{}) db.AutoMigrate(models.DomainBan{}) + uc, _ := lru.New[string, *User](1_000_000) + bgs := &BGS{ Index: ix, db: db, @@ -151,6 +157,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm consumers: make(map[uint64]*SocketConsumer), pdsResyncs: 
make(map[uint]*PDSResync), + + userCache: uc, } ix.CreateExternalUser = bgs.createExternalUser @@ -521,6 +529,44 @@ type User struct { // UpstreamStatus is the state of the user as reported by the upstream PDS UpstreamStatus string `gorm:"index"` + + lk sync.Mutex +} + +func (u *User) SetTakenDown(v bool) { + u.lk.Lock() + defer u.lk.Unlock() + u.TakenDown = v +} + +func (u *User) GetTakenDown() bool { + u.lk.Lock() + defer u.lk.Unlock() + return u.TakenDown +} + +func (u *User) SetTombstoned(v bool) { + u.lk.Lock() + defer u.lk.Unlock() + u.Tombstoned = v +} + +func (u *User) GetTombstoned() bool { + u.lk.Lock() + defer u.lk.Unlock() + return u.Tombstoned +} + +func (u *User) SetUpstreamStatus(v string) { + u.lk.Lock() + defer u.lk.Unlock() + u.UpstreamStatus = v +} + +func (u *User) GetUpstreamStatus() string { + u.lk.Lock() + defer u.lk.Unlock() + return u.UpstreamStatus } type addTargetBody struct { @@ -771,6 +817,11 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) ctx, span := tracer.Start(ctx, "lookupUserByDid") defer span.End() + cu, ok := bgs.userCache.Get(did) + if ok { + return cu, nil + } + var u User if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil { return nil, err @@ -780,6 +831,8 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) return nil, gorm.ErrRecordNotFound } + bgs.userCache.Add(did, &u) + return &u, nil } @@ -823,14 +876,19 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1) evt := env.RepoCommit log.Debugw("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo) + + s := time.Now() u, err := bgs.lookupUserByDid(ctx, evt.Repo) + userLookupDuration.Observe(time.Since(s).Seconds()) if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { return fmt.Errorf("looking up event user: %w", err) } newUsersDiscovered.Inc() + start := time.Now() subj, err 
:= bgs.createExternalUser(ctx, evt.Repo) + newUserDiscoveryDuration.Observe(time.Since(start).Seconds()) if err != nil { return fmt.Errorf("fed event create external user: %w", err) } @@ -840,20 +898,21 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event u.Did = evt.Repo } - span.SetAttributes(attribute.String("upstream_status", u.UpstreamStatus)) + ustatus := u.GetUpstreamStatus() + span.SetAttributes(attribute.String("upstream_status", ustatus)) - if u.TakenDown || u.UpstreamStatus == events.AccountStatusTakendown { - span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown)) + if u.GetTakenDown() || ustatus == events.AccountStatusTakendown { + span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown())) log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) return nil } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) return nil } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) return nil } @@ -877,12 +936,13 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event } } - if u.Tombstoned { + if u.GetTombstoned() { span.SetAttributes(attribute.Bool("tombstoned", true)) // we've checked the authority of the users PDS, so reinstate the account if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil { return fmt.Errorf("failed to un-tombstone a user: %w", err) } + u.SetTombstoned(false) ai, err := bgs.Index.LookupUser(ctx, u.ID) if err != nil { @@ -1041,7 +1101,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host 
*models.PDS, env *event return fmt.Errorf("failed to look up user by did: %w", err) } - if u.TakenDown { + if u.GetTakenDown() { shouldBeActive = false status = &events.AccountStatusTakendown } @@ -1370,18 +1430,22 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil { return fmt.Errorf("failed to set user active status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusActive) case events.AccountStatusDeactivated: if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil { return fmt.Errorf("failed to set user deactivation status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusDeactivated) case events.AccountStatusSuspended: if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil { return fmt.Errorf("failed to set user suspension status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusSuspended) case events.AccountStatusTakendown: if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil { return fmt.Errorf("failed to set user taken down status: %w", err) } + u.SetUpstreamStatus(events.AccountStatusTakendown) if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ "handle": nil, @@ -1396,6 +1460,7 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri }).Error; err != nil { return err } + u.SetUpstreamStatus(events.AccountStatusDeleted) if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ "handle": nil, @@ -1422,6 +1487,7 @@ func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error { if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", 
true).Error; err != nil { return err } + u.SetTakenDown(true) if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil { return err @@ -1443,6 +1509,7 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error { if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil { return err } + u.SetTakenDown(false) return nil } diff --git a/bgs/compactor.go b/bgs/compactor.go index bca4d8e02..dd4ec4211 100644 --- a/bgs/compactor.go +++ b/bgs/compactor.go @@ -349,7 +349,7 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStra return state, nil } -func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) { +func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) { ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo") defer span.End() log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast) diff --git a/bgs/handlers.go b/bgs/handlers.go index da87c9521..28cf1f0f2 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -34,23 +34,24 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } - if u.Tombstoned { + if u.GetTombstoned() { return nil, fmt.Errorf("account was deleted") } - if u.TakenDown { + if u.GetTakenDown() { return nil, fmt.Errorf("account was taken down by the Relay") } - if u.UpstreamStatus == events.AccountStatusTakendown { + ustatus := u.GetUpstreamStatus() + if ustatus == events.AccountStatusTakendown { return nil, fmt.Errorf("account was taken down by its PDS") } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { return nil, fmt.Errorf("account is temporarily deactivated") } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { return nil, fmt.Errorf("account is suspended 
by its PDS") } @@ -91,23 +92,24 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } - if u.Tombstoned { + if u.GetTombstoned() { return nil, fmt.Errorf("account was deleted") } - if u.TakenDown { + if u.GetTakenDown() { return nil, fmt.Errorf("account was taken down by the Relay") } - if u.UpstreamStatus == events.AccountStatusTakendown { + ustatus := u.GetUpstreamStatus() + if ustatus == events.AccountStatusTakendown { return nil, fmt.Errorf("account was taken down by its PDS") } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { return nil, fmt.Errorf("account is temporarily deactivated") } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { return nil, fmt.Errorf("account is suspended by its PDS") } @@ -253,23 +255,24 @@ func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did strin return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") } - if u.Tombstoned { + if u.GetTombstoned() { return nil, fmt.Errorf("account was deleted") } - if u.TakenDown { + if u.GetTakenDown() { return nil, fmt.Errorf("account was taken down by the Relay") } - if u.UpstreamStatus == events.AccountStatusTakendown { + ustatus := u.GetUpstreamStatus() + if ustatus == events.AccountStatusTakendown { return nil, fmt.Errorf("account was taken down by its PDS") } - if u.UpstreamStatus == events.AccountStatusDeactivated { + if ustatus == events.AccountStatusDeactivated { return nil, fmt.Errorf("account is temporarily deactivated") } - if u.UpstreamStatus == events.AccountStatusSuspended { + if ustatus == events.AccountStatusSuspended { return nil, fmt.Errorf("account is suspended by its PDS") } diff --git a/bgs/metrics.go b/bgs/metrics.go index b33677e6e..edd687596 100644 --- a/bgs/metrics.go +++ b/bgs/metrics.go @@ 
-81,6 +81,18 @@ var resSz = promauto.NewHistogramVec(prometheus.HistogramOpts{ Buckets: prometheus.ExponentialBuckets(100, 10, 8), }, []string{"code", "method", "path"}) +var userLookupDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "relay_user_lookup_duration", + Help: "A histogram of user lookup latencies", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}) + +var newUserDiscoveryDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "relay_new_user_discovery_duration", + Help: "A histogram of new user discovery latencies", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}) + // MetricsMiddleware defines handler function for metrics middleware func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { diff --git a/carstore/bs.go b/carstore/bs.go index e7af35d12..2cf0a5093 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -62,21 +62,23 @@ type CarStore interface { } type FileCarStore struct { - meta *CarStoreGormMeta - rootDir string + meta *CarStoreGormMeta + rootDirs []string lscLk sync.Mutex lastShardCache map[models.Uid]*CarShard } -func NewCarStore(meta *gorm.DB, root string) (CarStore, error) { - if _, err := os.Stat(root); err != nil { - if !os.IsNotExist(err) { - return nil, err - } +func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) { + for _, root := range roots { + if _, err := os.Stat(root); err != nil { + if !os.IsNotExist(err) { + return nil, err + } - if err := os.Mkdir(root, 0775); err != nil { - return nil, err + if err := os.Mkdir(root, 0775); err != nil { + return nil, err + } } } if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { @@ -88,7 +90,7 @@ func NewCarStore(meta *gorm.DB, root string) (CarStore, error) { return &FileCarStore{ meta: &CarStoreGormMeta{meta: meta}, - rootDir: root, + rootDirs: roots, lastShardCache: make(map[models.Uid]*CarShard), }, nil } @@ -541,9 +543,14 @@ func (ds *DeltaSession) GetSize(ctx 
context.Context, c cid.Cid) (int, error) { func fnameForShard(user models.Uid, seq int) string { return fmt.Sprintf("sh-%d-%d", user, seq) } + +func (cs *FileCarStore) dirForUser(user models.Uid) string { + return cs.rootDirs[int(user)%len(cs.rootDirs)] +} + func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { // TODO: some overwrite protections - fname := filepath.Join(cs.rootDir, fnameForShard(user, seq)) + fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq)) fi, err := os.Create(fname) if err != nil { return nil, "", err @@ -557,7 +564,7 @@ func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid, defer span.End() // TODO: some overwrite protections - fname := filepath.Join(cs.rootDir, fnameForShard(user, seq)) + fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq)) if err := os.WriteFile(fname, data, 0664); err != nil { return "", err } @@ -638,10 +645,12 @@ func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev str offset += nw } + start := time.Now() path, err := cs.writeNewShardFile(ctx, user, seq, buf.Bytes()) if err != nil { return nil, fmt.Errorf("failed to write shard file: %w", err) } + writeShardFileDuration.Observe(time.Since(start).Seconds()) shard := CarShard{ Root: models.DbCID{CID: root}, @@ -652,9 +661,11 @@ func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev str Rev: rev, } + start = time.Now() if err := cs.putShard(ctx, &shard, brefs, rmcids, false); err != nil { return nil, err } + writeShardMetadataDuration.Observe(time.Since(start).Seconds()) return buf.Bytes(), nil } @@ -982,7 +993,7 @@ func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user mode // TODO: some overwrite protections // NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary". 
// This creates "sh-%d-%d%s" with some random stuff in the last position - fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq)) + fi, err := os.CreateTemp(cs.dirForUser(user), fnameForShard(user, seq)) if err != nil { return nil, "", err } diff --git a/carstore/metrics.go b/carstore/metrics.go new file mode 100644 index 000000000..0d2a0794a --- /dev/null +++ b/carstore/metrics.go @@ -0,0 +1,18 @@ +package carstore + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var writeShardFileDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "carstore_write_shard_file_duration", + Help: "Duration of writing shard file to disk", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}) + +var writeShardMetadataDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "carstore_write_shard_metadata_duration", + Help: "Duration of writing shard metadata to DB", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}) diff --git a/carstore/repo_test.go b/carstore/repo_test.go index a4d2c8cb8..8366cab95 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -30,8 +30,13 @@ func testCarStore() (CarStore, func(), error) { return nil, nil, err } - sharddir := filepath.Join(tempdir, "shards") - if err := os.MkdirAll(sharddir, 0775); err != nil { + sharddir1 := filepath.Join(tempdir, "shards1") + if err := os.MkdirAll(sharddir1, 0775); err != nil { + return nil, nil, err + } + + sharddir2 := filepath.Join(tempdir, "shards2") + if err := os.MkdirAll(sharddir2, 0775); err != nil { return nil, nil, err } @@ -45,7 +50,7 @@ func testCarStore() (CarStore, func(), error) { return nil, nil, err } - cs, err := NewCarStore(db, sharddir) + cs, err := NewCarStore(db, []string{sharddir1, sharddir2}) if err != nil { return nil, nil, err } diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 540796f51..54d56735d 100644 --- a/cmd/bigsky/main.go +++ 
b/cmd/bigsky/main.go @@ -200,6 +200,11 @@ func run(args []string) error { EnvVars: []string{"RELAY_NUM_COMPACTION_WORKERS"}, Value: 2, }, + &cli.StringSliceFlag{ + Name: "carstore-shard-dirs", + Usage: "specify list of shard directories for carstore storage, overrides default storage within datadir", + EnvVars: []string{"RELAY_CARSTORE_SHARD_DIRS"}, + }, } app.Action = runBigsky @@ -312,8 +317,18 @@ func runBigsky(cctx *cli.Context) error { } } - os.MkdirAll(filepath.Dir(csdir), os.ModePerm) - cstore, err := carstore.NewCarStore(csdb, csdir) + csdirs := []string{csdir} + if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 { + csdirs = paramDirs + } + + for _, csd := range csdirs { + if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil { + return err + } + } + + cstore, err := carstore.NewCarStore(csdb, csdirs) if err != nil { return err } diff --git a/cmd/laputa/main.go b/cmd/laputa/main.go index 2cedb393a..d91edfc62 100644 --- a/cmd/laputa/main.go +++ b/cmd/laputa/main.go @@ -158,7 +158,7 @@ func run(args []string) { } } - cstore, err := carstore.NewCarStore(csdb, csdir) + cstore, err := carstore.NewCarStore(csdb, []string{csdir}) if err != nil { return err } diff --git a/cmd/rainbow/Dockerfile b/cmd/rainbow/Dockerfile new file mode 100644 index 000000000..72bfc3572 --- /dev/null +++ b/cmd/rainbow/Dockerfile @@ -0,0 +1,43 @@ +FROM golang:1.22-bullseye AS build-env + +ENV DEBIAN_FRONTEND=noninteractive +ENV TZ=Etc/UTC +ENV GODEBUG="netdns=go" +ENV GOOS="linux" +ENV GOARCH="amd64" +ENV CGO_ENABLED="1" + +WORKDIR /usr/src/rainbow + +COPY . . 
+ +RUN go mod download && \ + go mod verify + +RUN go build \ + -v \ + -trimpath \ + -tags timetzdata \ + -o /rainbow-bin \ + ./cmd/rainbow + +FROM debian:bullseye-slim + +ENV DEBIAN_FRONTEND="noninteractive" +ENV TZ=Etc/UTC +ENV GODEBUG="netdns=go" + +RUN apt-get update && apt-get install --yes \ + dumb-init \ + ca-certificates \ + runit + +WORKDIR /rainbow +COPY --from=build-env /rainbow-bin /usr/bin/rainbow + +ENTRYPOINT ["/usr/bin/dumb-init", "--"] +CMD ["/usr/bin/rainbow"] + +LABEL org.opencontainers.image.source=https://github.com/bluesky-social/indigo +LABEL org.opencontainers.image.description="bsky.app rainbow" +LABEL org.opencontainers.image.licenses=MIT diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go index 5d6ba14c6..398f56387 100644 --- a/cmd/rainbow/main.go +++ b/cmd/rainbow/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "github.com/bluesky-social/indigo/events" "os" "os/signal" "syscall" @@ -52,6 +53,16 @@ func run(args []string) { Name: "splitter-host", Value: "bsky.network", }, + &cli.StringFlag{ + Name: "persist-db", + Value: "", + Usage: "path to persistence db", + }, + &cli.StringFlag{ + Name: "cursor-file", + Value: "", + Usage: "write upstream cursor number to this file", + }, &cli.StringFlag{ Name: "api-listen", Value: ":2480", @@ -61,6 +72,18 @@ func run(args []string) { Value: ":2481", EnvVars: []string{"SPLITTER_METRICS_LISTEN"}, }, + &cli.Float64Flag{ + Name: "persist-hours", + Value: 24 * 7, + EnvVars: []string{"SPLITTER_PERSIST_HOURS"}, + Usage: "hours to buffer (float, may be fractional)", + }, + &cli.Int64Flag{ + Name: "persist-bytes", + Value: 0, + Usage: "max bytes target for event cache, 0 to disable size target trimming", + EnvVars: []string{"SPLITTER_PERSIST_BYTES"}, + }, } app.Action = Splitter @@ -110,7 +133,36 @@ func Splitter(cctx *cli.Context) error { otel.SetTracerProvider(tp) } - spl := splitter.NewSplitter(cctx.String("splitter-host")) + persistPath := cctx.String("persist-db") + upstreamHost := 
cctx.String("splitter-host") + var spl *splitter.Splitter + var err error + if persistPath != "" { + log.Infof("building splitter with storage at: %s", persistPath) + ppopts := events.PebblePersistOptions{ + DbPath: persistPath, + PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")), + GCPeriod: 5 * time.Minute, + MaxBytes: uint64(cctx.Int64("persist-bytes")), + } + conf := splitter.SplitterConfig{ + UpstreamHost: upstreamHost, + CursorFile: cctx.String("cursor-file"), + PebbleOptions: &ppopts, + } + spl, err = splitter.NewSplitter(conf) + } else { + log.Info("building in-memory splitter") + conf := splitter.SplitterConfig{ + UpstreamHost: upstreamHost, + CursorFile: cctx.String("cursor-file"), + } + spl, err = splitter.NewSplitter(conf) + } + if err != nil { + log.Fatalw("failed to create splitter", "path", persistPath, "error", err) + return err + } // set up metrics endpoint go func() { diff --git a/cmd/supercollider/main.go b/cmd/supercollider/main.go index 49499ef69..1c64e71d3 100644 --- a/cmd/supercollider/main.go +++ b/cmd/supercollider/main.go @@ -565,7 +565,7 @@ func initSpeedyRepoMan(key *godid.PrivKey) (*repomgr.RepoManager, *godid.PrivKey return nil, nil, err } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { return nil, nil, err } diff --git a/did/metrics.go b/did/metrics.go index e7bea3b56..d3c8f0ecd 100644 --- a/did/metrics.go +++ b/did/metrics.go @@ -9,3 +9,9 @@ var mrResolvedDidsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "multiresolver_resolved_dids_total", Help: "Total number of DIDs resolved", }, []string{"resolver"}) + +var mrResolveDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "indigo_multiresolver_resolve_duration_seconds", + Help: "A histogram of resolve latencies", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}, []string{"resolver"}) diff --git a/did/multi.go b/did/multi.go index 
e871ff454..2c67781fe 100644 --- a/did/multi.go +++ b/did/multi.go @@ -3,6 +3,7 @@ package did import ( "context" "fmt" + "time" "github.com/whyrusleeping/go-did" ) @@ -43,12 +44,17 @@ func (mr *MultiResolver) FlushCacheFor(didstr string) { } func (mr *MultiResolver) GetDocument(ctx context.Context, didstr string) (*did.Document, error) { + s := time.Now() + pdid, err := did.ParseDID(didstr) if err != nil { return nil, err } method := pdid.Protocol() + defer func() { + mrResolveDuration.WithLabelValues(method).Observe(time.Since(s).Seconds()) + }() res, ok := mr.handlers[method] if !ok { diff --git a/events/dbpersist_test.go b/events/dbpersist_test.go index c299569da..4e7ecdc74 100644 --- a/events/dbpersist_test.go +++ b/events/dbpersist_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "context" @@ -11,19 +11,18 @@ import ( atproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/carstore" - "github.com/bluesky-social/indigo/events" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/models" - "github.com/bluesky-social/indigo/pds" + pds "github.com/bluesky-social/indigo/pds/data" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/util" - "github.com/ipfs/go-log/v2" + logging "github.com/ipfs/go-log/v2" "gorm.io/driver/sqlite" "gorm.io/gorm" ) func init() { - log.SetAllLoggers(log.LevelDebug) + logging.SetAllLoggers(logging.LevelDebug) } func BenchmarkDBPersist(b *testing.B) { @@ -61,24 +60,24 @@ func BenchmarkDBPersist(b *testing.B) { defer os.RemoveAll(tempPath) // Initialize a DBPersister - dbp, err := events.NewDbPersistence(db, cs, nil) + dbp, err := NewDbPersistence(db, cs, nil) if err != nil { b.Fatal(err) } // Create a bunch of events - evtman := events.NewEventManager(dbp) + evtman := NewEventManager(dbp) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { b.Fatal(err) } - inEvts := 
make([]*events.XRPCStreamEvent, b.N) + inEvts := make([]*XRPCStreamEvent, b.N) for i := 0; i < b.N; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -136,7 +135,7 @@ func BenchmarkDBPersist(b *testing.B) { b.StopTimer() - dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { outEvtCount++ return nil }) @@ -183,24 +182,24 @@ func BenchmarkPlayback(b *testing.B) { defer os.RemoveAll(tempPath) // Initialize a DBPersister - dbp, err := events.NewDbPersistence(db, cs, nil) + dbp, err := NewDbPersistence(db, cs, nil) if err != nil { b.Fatal(err) } // Create a bunch of events - evtman := events.NewEventManager(dbp) + evtman := NewEventManager(dbp) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { b.Fatal(err) } - inEvts := make([]*events.XRPCStreamEvent, n) + inEvts := make([]*XRPCStreamEvent, n) for i := 0; i < n; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -256,7 +255,7 @@ func BenchmarkPlayback(b *testing.B) { b.ResetTimer() - dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { outEvtCount++ return nil }) @@ -301,7 +300,7 @@ func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, erro return nil, nil, nil, "", err } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { return nil, nil, nil, "", err } diff --git a/events/diskpersist_test.go b/events/diskpersist_test.go index 5d09c0fc2..74d0d62a5 100644 --- a/events/diskpersist_test.go +++ 
b/events/diskpersist_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "context" @@ -14,16 +14,15 @@ import ( atproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/carstore" - "github.com/bluesky-social/indigo/events" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/models" - "github.com/bluesky-social/indigo/pds" + pds "github.com/bluesky-social/indigo/pds/data" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/util" "gorm.io/gorm" ) -func TestDiskPersist(t *testing.T) { +func testPersister(t *testing.T, persistenceFactory func(path string, db *gorm.DB) (EventPersistence, error)) { ctx := context.Background() db, _, cs, tempPath, err := setupDBs(t) @@ -57,19 +56,14 @@ func TestDiskPersist(t *testing.T) { defer os.RemoveAll(tempPath) - // Initialize a DBPersister - - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ - EventsPerFile: 10, - UIDCacheSize: 100000, - DIDCacheSize: 100000, - }) + // Initialize a persister + dp, err := persistenceFactory(tempPath, db) if err != nil { t.Fatal(err) } // Create a bunch of events - evtman := events.NewEventManager(dp) + evtman := NewEventManager(dp) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { @@ -77,11 +71,11 @@ func TestDiskPersist(t *testing.T) { } n := 100 - inEvts := make([]*events.XRPCStreamEvent, n) + inEvts := make([]*XRPCStreamEvent, n) for i := 0; i < n; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -93,6 +87,7 @@ func TestDiskPersist(t *testing.T) { }, }, Time: time.Now().Format(util.ISO8601), + Seq: int64(i), }, } } @@ -112,7 +107,7 @@ func 
TestDiskPersist(t *testing.T) { outEvtCount := 0 expectedEvtCount := n - dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + dp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { outEvtCount++ return nil }) @@ -125,7 +120,7 @@ func TestDiskPersist(t *testing.T) { time.Sleep(time.Millisecond * 100) - dp2, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp2, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 10, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -134,13 +129,13 @@ func TestDiskPersist(t *testing.T) { t.Fatal(err) } - evtman2 := events.NewEventManager(dp2) + evtman2 := NewEventManager(dp2) - inEvts = make([]*events.XRPCStreamEvent, n) + inEvts = make([]*XRPCStreamEvent, n) for i := 0; i < n; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -163,6 +158,16 @@ func TestDiskPersist(t *testing.T) { } } } +func TestDiskPersist(t *testing.T) { + factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { + return NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ + EventsPerFile: 10, + UIDCacheSize: 100000, + DIDCacheSize: 100000, + }) + } + testPersister(t, factory) +} func BenchmarkDiskPersist(b *testing.B) { db, _, cs, tempPath, err := setupDBs(b) @@ -174,7 +179,7 @@ func BenchmarkDiskPersist(b *testing.B) { // Initialize a DBPersister - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, 
"diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 5000, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -187,7 +192,7 @@ func BenchmarkDiskPersist(b *testing.B) { } -func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { +func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { ctx := context.Background() db.AutoMigrate(&pds.User{}) @@ -215,18 +220,18 @@ func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p ev } // Create a bunch of events - evtman := events.NewEventManager(p) + evtman := NewEventManager(p) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { b.Fatal(err) } - inEvts := make([]*events.XRPCStreamEvent, b.N) + inEvts := make([]*XRPCStreamEvent, b.N) for i := 0; i < b.N; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -290,7 +295,7 @@ func TestDiskPersister(t *testing.T) { // Initialize a DBPersister - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 20, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -302,7 +307,7 @@ func TestDiskPersister(t *testing.T) { runEventManagerTest(t, cs, db, dp) } -func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { +func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { ctx := context.Background() db.AutoMigrate(&pds.User{}) @@ -329,7 +334,7 @@ func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p even t.Fatal(err) } - evtman := 
events.NewEventManager(p) + evtman := NewEventManager(p) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { @@ -337,11 +342,11 @@ func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p even } testSize := 100 // you can adjust this number as needed - inEvts := make([]*events.XRPCStreamEvent, testSize) + inEvts := make([]*XRPCStreamEvent, testSize) for i := 0; i < testSize; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -368,7 +373,7 @@ func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p even } outEvtCount := 0 - p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { // Check that the contents of the output events match the input events // Clear cache, don't care if one has it and not the other inEvts[outEvtCount].Preserialized = nil @@ -397,7 +402,7 @@ func TestDiskPersisterTakedowns(t *testing.T) { // Initialize a DBPersister - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 10, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -409,7 +414,7 @@ func TestDiskPersisterTakedowns(t *testing.T) { runTakedownTest(t, cs, db, dp) } -func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { +func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { ctx := context.TODO() db.AutoMigrate(&pds.User{}) @@ -439,10 +444,10 @@ func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.E } } - evtman := events.NewEventManager(p) + 
evtman := NewEventManager(p) testSize := 100 // you can adjust this number as needed - inEvts := make([]*events.XRPCStreamEvent, testSize*userCount) + inEvts := make([]*XRPCStreamEvent, testSize*userCount) for i := 0; i < testSize*userCount; i++ { user := users[i%userCount] _, cid, err := mgr.CreateRecord(ctx, user.Uid, "app.bsky.feed.post", &bsky.FeedPost{ @@ -460,7 +465,7 @@ func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.E cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: user.Did, Commit: headLink, @@ -495,7 +500,7 @@ func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.E // Verify that the events of the user have been removed from the event stream var evtsCount int - if err := p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + if err := p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { evtsCount++ if evt.RepoCommit.Repo == takeDownUser.Did { t.Fatalf("found event for user %d after takedown", takeDownUser.Uid) diff --git a/events/events.go b/events/events.go index 83438ee44..ba6d0faad 100644 --- a/events/events.go +++ b/events/events.go @@ -219,6 +219,78 @@ func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error { return obj.MarshalCBOR(cborWriter) } +func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error { + var header EventHeader + if err := header.UnmarshalCBOR(r); err != nil { + return fmt.Errorf("reading header: %w", err) + } + switch header.Op { + case EvtKindMessage: + switch header.MsgType { + case "#commit": + var evt comatproto.SyncSubscribeRepos_Commit + if err := evt.UnmarshalCBOR(r); err != nil { + return fmt.Errorf("reading repoCommit event: %w", err) + } + xevt.RepoCommit = &evt + case "#handle": + var evt comatproto.SyncSubscribeRepos_Handle + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + 
xevt.RepoHandle = &evt + case "#identity": + var evt comatproto.SyncSubscribeRepos_Identity + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoIdentity = &evt + case "#account": + var evt comatproto.SyncSubscribeRepos_Account + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoAccount = &evt + case "#info": + // TODO: this might also be a LabelInfo (as opposed to RepoInfo) + var evt comatproto.SyncSubscribeRepos_Info + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoInfo = &evt + case "#migrate": + var evt comatproto.SyncSubscribeRepos_Migrate + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoMigrate = &evt + case "#tombstone": + var evt comatproto.SyncSubscribeRepos_Tombstone + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoTombstone = &evt + case "#labels": + var evt comatproto.LabelSubscribeLabels_Labels + if err := evt.UnmarshalCBOR(r); err != nil { + return fmt.Errorf("reading Labels event: %w", err) + } + xevt.LabelLabels = &evt + } + case EvtKindErrorFrame: + var errframe ErrorFrame + if err := errframe.UnmarshalCBOR(r); err != nil { + return err + } + xevt.Error = &errframe + default: + return fmt.Errorf("unrecognized event stream type: %d", header.Op) + } + return nil +} + +var ErrNoSeq = errors.New("event has no sequence number") + // serialize content into Preserialized cache func (evt *XRPCStreamEvent) Preserialize() error { if evt.Preserialized != nil { @@ -352,6 +424,10 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func } func SequenceForEvent(evt *XRPCStreamEvent) int64 { + return evt.Sequence() +} + +func (evt *XRPCStreamEvent) Sequence() int64 { switch { case evt == nil: return -1 diff --git a/events/pebblepersist.go b/events/pebblepersist.go new file mode 100644 index 000000000..2c1c787e5 --- /dev/null +++ b/events/pebblepersist.go @@ -0,0 +1,262 @@ +package events + +import ( + "bytes" + 
"context" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "time" + + "github.com/bluesky-social/indigo/models" + "github.com/cockroachdb/pebble" +) + +type PebblePersist struct { + broadcast func(*XRPCStreamEvent) + db *pebble.DB + + prevSeq int64 + prevSeqExtra uint32 + + cancel func() + + options PebblePersistOptions +} + +type PebblePersistOptions struct { + // path where pebble will create a directory full of files + DbPath string + + // Throw away posts older than some time ago + PersistDuration time.Duration + + // Throw away old posts every so often + GCPeriod time.Duration + + // MaxBytes is what we _try_ to keep disk usage under + MaxBytes uint64 +} + +var DefaultPebblePersistOptions = PebblePersistOptions{ + PersistDuration: time.Minute * 20, + GCPeriod: time.Minute * 5, + MaxBytes: 1024 * 1024 * 1024, // 1 GiB +} + +// Create a new EventPersistence which stores data in pebbledb +// nil opts is ok +func NewPebblePersistance(opts *PebblePersistOptions) (*PebblePersist, error) { + if opts == nil { + opts = &DefaultPebblePersistOptions + } + db, err := pebble.Open(opts.DbPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("%s: %w", opts.DbPath, err) + } + pp := new(PebblePersist) + pp.options = *opts + pp.db = db + return pp, nil +} + +func setKeySeqMillis(key []byte, seq, millis int64) { + binary.BigEndian.PutUint64(key[:8], uint64(seq)) + binary.BigEndian.PutUint64(key[8:16], uint64(millis)) +} + +func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error { + err := e.Preserialize() + if err != nil { + return err + } + blob := e.Preserialized + + seq := e.Sequence() + nowMillis := time.Now().UnixMilli() + + if seq < 0 { + // persist with longer key {prev 8 byte key}{time}{int32 extra counter} + pp.prevSeqExtra++ + var key [20]byte + setKeySeqMillis(key[:], seq, nowMillis) + binary.BigEndian.PutUint32(key[16:], pp.prevSeqExtra) + + err = pp.db.Set(key[:], blob, pebble.Sync) + } else { + pp.prevSeq = seq + 
pp.prevSeqExtra = 0 + var key [16]byte + setKeySeqMillis(key[:], seq, nowMillis) + + err = pp.db.Set(key[:], blob, pebble.Sync) + } + + if err != nil { + return err + } + pp.broadcast(e) + + return err +} + +func eventFromPebbleIter(iter *pebble.Iterator) (*XRPCStreamEvent, error) { + blob, err := iter.ValueAndErr() + if err != nil { + return nil, err + } + br := bytes.NewReader(blob) + evt := new(XRPCStreamEvent) + err = evt.Deserialize(br) + if err != nil { + return nil, err + } + evt.Preserialized = bytes.Clone(blob) + return evt, nil +} + +func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { + var key [8]byte + binary.BigEndian.PutUint64(key[:], uint64(since)) + + iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{LowerBound: key[:]}) + if err != nil { + return err + } + defer iter.Close() + + for iter.First(); iter.Valid(); iter.Next() { + evt, err := eventFromPebbleIter(iter) + if err != nil { + return err + } + + err = cb(evt) + if err != nil { + return err + } + } + + return nil +} +func (pp *PebblePersist) TakeDownRepo(ctx context.Context, usr models.Uid) error { + // TODO: implement filter on playback to ignore taken-down-repos? 
+ return nil +} +func (pp *PebblePersist) Flush(context.Context) error { + return pp.db.Flush() +} +func (pp *PebblePersist) Shutdown(context.Context) error { + if pp.cancel != nil { + pp.cancel() + } + err := pp.db.Close() + pp.db = nil + return err +} + +func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent)) { + pp.broadcast = broadcast +} + +var ErrNoLast = errors.New("no last event") + +func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error) { + iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{}) + if err != nil { + return 0, 0, nil, err + } + ok := iter.Last() + if !ok { + return 0, 0, nil, ErrNoLast + } + evt, err = eventFromPebbleIter(iter) + keyblob := iter.Key() + seq = int64(binary.BigEndian.Uint64(keyblob[:8])) + millis = int64(binary.BigEndian.Uint64(keyblob[8:16])) + return seq, millis, evt, nil +} + +// example; +// ``` +// pp := NewPebblePersistance("/tmp/foo.pebble") +// go pp.GCThread(context.Background(), 48 * time.Hour, 5 * time.Minute) +// ``` +func (pp *PebblePersist) GCThread(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + pp.cancel = cancel + ticker := time.NewTicker(pp.options.GCPeriod) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := pp.GarbageCollect(ctx) + if err != nil { + log.Errorw("GC err", "err", err) + } + case <-ctx.Done(): + return + } + } +} + +var zeroKey [16]byte +var ffffKey [16]byte + +func init() { + setKeySeqMillis(zeroKey[:], 0, 0) + for i := range ffffKey { + ffffKey[i] = 0xff + } +} + +func (pp *PebblePersist) GarbageCollect(ctx context.Context) error { + nowMillis := time.Now().UnixMilli() + expired := nowMillis - pp.options.PersistDuration.Milliseconds() + iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{}) + if err != nil { + return err + } + defer iter.Close() + // scan keys to find last expired, then delete range + var seq int64 = int64(-1) + var lastKeyTime int64 + for 
iter.First(); iter.Valid(); iter.Next() { + keyblob := iter.Key() + + keyTime := int64(binary.BigEndian.Uint64(keyblob[8:16])) + if keyTime <= expired { + lastKeyTime = keyTime + seq = int64(binary.BigEndian.Uint64(keyblob[:8])) + } else { + break + } + } + + // TODO: use pp.options.MaxBytes + + sizeBefore, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:]) + if seq == -1 { + // nothing to delete + log.Infow("pebble gc nop", "size", sizeBefore) + return nil + } + var key [16]byte + setKeySeqMillis(key[:], seq, lastKeyTime) + log.Infow("pebble gc start", "to", hex.EncodeToString(key[:])) + err = pp.db.DeleteRange(zeroKey[:], key[:], pebble.Sync) + if err != nil { + return err + } + sizeAfter, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:]) + log.Infow("pebble gc", "before", sizeBefore, "after", sizeAfter) + start := time.Now() + err = pp.db.Compact(zeroKey[:], key[:], true) + if err != nil { + log.Warnw("pebble gc compact", "err", err) + } + dt := time.Since(start) + log.Infow("pebble gc compact ok", "dt", dt) + return nil +} diff --git a/events/pebblepersist_test.go b/events/pebblepersist_test.go new file mode 100644 index 000000000..901365c5d --- /dev/null +++ b/events/pebblepersist_test.go @@ -0,0 +1,16 @@ +package events + +import ( + "gorm.io/gorm" + "path/filepath" + "testing" +) + +func TestPebblePersist(t *testing.T) { + factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { + opts := DefaultPebblePersistOptions + opts.DbPath = filepath.Join(tempPath, "pebble.db") + return NewPebblePersistance(&opts) + } + testPersister(t, factory) +} diff --git a/go.mod b/go.mod index 66391db61..da1a77d84 100644 --- a/go.mod +++ b/go.mod @@ -10,11 +10,11 @@ require ( github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/brianvoe/gofakeit/v6 v6.25.0 github.com/carlmjohnson/versioninfo v0.22.5 + github.com/cockroachdb/pebble v1.1.2 github.com/dustinkirkland/golang-petname v0.0.0-20231002161417-6a283f1aaaf2 
github.com/flosch/pongo2/v6 v6.0.0 github.com/go-redis/cache/v9 v9.0.0 github.com/goccy/go-json v0.10.2 - github.com/gocql/gocql v1.7.0 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/gorilla/websocket v1.5.1 github.com/hashicorp/go-retryablehttp v0.7.5 @@ -66,7 +66,7 @@ require ( go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/automaxprocs v1.5.3 golang.org/x/crypto v0.21.0 - golang.org/x/sync v0.5.0 + golang.org/x/sync v0.7.0 golang.org/x/text v0.14.0 golang.org/x/time v0.3.0 golang.org/x/tools v0.15.0 @@ -78,23 +78,32 @@ require ( ) require ( + github.com/DataDog/zstd v1.4.5 // indirect + github.com/cockroachdb/errors v1.11.3 // indirect + github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect + github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect + github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-redis/redis v6.15.9+incompatible // indirect - github.com/golang/snappy v0.0.3 // indirect - github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/klauspost/compress v1.17.3 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/labstack/gommon v0.4.1 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/vmihailenco/go-tinylfu v0.2.2 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect 
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect - gopkg.in/inf.v0 v0.9.1 // indirect ) require ( @@ -167,12 +176,12 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.22.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/grpc v1.59.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect diff --git a/go.sum b/go.sum index 8cd2edd60..446851e37 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ contrib.go.opencensus.io/exporter/prometheus v0.4.2/go.mod h1:dvEHbiKmgvbr5pjaF9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= +github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/PuerkitoBio/purell v1.2.1 h1:QsZ4TjvwiMpat6gBCBxEQI0rcS9ehtkKtSpiUnd9N28= github.com/PuerkitoBio/purell v1.2.1/go.mod h1:ZwHcC/82TOaovDi//J/804umJFFmbOHPngi8iYYv/Eo= github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= @@ -71,10 +73,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= -github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/brianvoe/gofakeit/v6 v6.25.0 h1:ZpFjktOpLZUeF8q223o0rUuXtA+m5qW5srjvVi+JkXk= github.com/brianvoe/gofakeit/v6 v6.25.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -95,6 +93,20 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo 
v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= +github.com/cockroachdb/pebble v1.1.2 h1:CUh2IPtR4swHlEj48Rhfzw6l/d0qA31fItcIszQVIsA= +github.com/cockroachdb/pebble v1.1.2/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU= +github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= +github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/corpix/uarand v0.2.0 h1:U98xXwud/AVuCpkpgfPF7J5TQgr7R5tqT8VZP5KWbzE= github.com/corpix/uarand v0.2.0/go.mod h1:/3Z1QIqWkDIhf6XWn/08/uMHoQ8JUoTIKc2iPchBOmM= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -126,6 +138,10 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw 
v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -156,8 +172,6 @@ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg78 github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= -github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -195,8 +209,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod 
h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -239,8 +253,6 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= -github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= -github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= @@ -512,8 +524,12 @@ github.com/orandin/slog-gorm v1.3.2 h1:C0lKDQPAx/pF+8K2HL7bdShPwOEJpPM0Bn80zTzxU github.com/orandin/slog-gorm v1.3.2/go.mod h1:MoZ51+b7xE9lwGNPYEhxcUtRNrYzjdcKvA8QXQQGEPA= github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= 
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -564,6 +580,7 @@ github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -795,8 +812,8 @@ golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -818,8 
+835,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1061,8 +1078,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 
v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -1071,8 +1088,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= -gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/indexer/posts_test.go b/indexer/posts_test.go index ed21ab666..aa6fc99b3 100644 --- a/indexer/posts_test.go +++ b/indexer/posts_test.go @@ -50,7 +50,7 @@ func testIndexer(t *testing.T) *testIx { t.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { t.Fatal(err) } diff --git a/pds/data/types.go b/pds/data/types.go new file mode 100644 index 000000000..f7ff96e54 --- /dev/null +++ b/pds/data/types.go @@ -0,0 +1,27 @@ +package data + +import ( + "github.com/bluesky-social/indigo/models" + "gorm.io/gorm" + "time" +) + +type User struct { + ID models.Uid `gorm:"primarykey"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt gorm.DeletedAt `gorm:"index"` + Handle string `gorm:"uniqueIndex"` + Password string + RecoveryKey string + Email string + Did string `gorm:"uniqueIndex"` + PDS uint +} + +type Peering struct { + gorm.Model + Host string + Did string + Approved bool +} diff --git a/pds/handlers_test.go b/pds/handlers_test.go index fe2bb14b8..9cecd3f91 100644 --- 
a/pds/handlers_test.go +++ b/pds/handlers_test.go @@ -29,7 +29,7 @@ func testCarStore(t *testing.T, db *gorm.DB) (carstore.CarStore, func()) { t.Fatal(err) } - cs, err := carstore.NewCarStore(db, sharddir) + cs, err := carstore.NewCarStore(db, []string{sharddir}) if err != nil { t.Fatal(err) } diff --git a/pds/server.go b/pds/server.go index b9d1c903b..54f1dfed1 100644 --- a/pds/server.go +++ b/pds/server.go @@ -21,6 +21,7 @@ import ( lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/models" "github.com/bluesky-social/indigo/notifs" + pdsdata "github.com/bluesky-social/indigo/pds/data" "github.com/bluesky-social/indigo/plc" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/util" @@ -456,18 +457,7 @@ func (s *Server) HandleResolveDid(c echo.Context) error { return c.String(200, u.Did) } -type User struct { - ID models.Uid `gorm:"primarykey"` - CreatedAt time.Time - UpdatedAt time.Time - DeletedAt gorm.DeletedAt `gorm:"index"` - Handle string `gorm:"uniqueIndex"` - Password string - RecoveryKey string - Email string - Did string `gorm:"uniqueIndex"` - PDS uint -} +type User = pdsdata.User type RefreshToken struct { gorm.Model @@ -636,12 +626,7 @@ func (s *Server) invalidateToken(ctx context.Context, u *User, tok *jwt.Token) e panic("nyi") } -type Peering struct { - gorm.Model - Host string - Did string - Approved bool -} +type Peering = pdsdata.Peering func (s *Server) EventsHandler(c echo.Context) error { conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) diff --git a/repomgr/bench_test.go b/repomgr/bench_test.go index 271813909..c01789422 100644 --- a/repomgr/bench_test.go +++ b/repomgr/bench_test.go @@ -54,7 +54,7 @@ func BenchmarkRepoMgrCreates(b *testing.B) { b.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { b.Fatal(err) } diff --git a/repomgr/ingest_test.go 
b/repomgr/ingest_test.go index dcb9097ac..38a8562e5 100644 --- a/repomgr/ingest_test.go +++ b/repomgr/ingest_test.go @@ -50,7 +50,7 @@ func TestLoadNewRepo(t *testing.T) { t.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { t.Fatal(err) } @@ -80,7 +80,7 @@ func testCarstore(t *testing.T, dir string) carstore.CarStore { t.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { t.Fatal(err) } diff --git a/repomgr/metrics.go b/repomgr/metrics.go index a92e3091d..df3e4bea1 100644 --- a/repomgr/metrics.go +++ b/repomgr/metrics.go @@ -9,3 +9,21 @@ var repoOpsImported = promauto.NewCounter(prometheus.CounterOpts{ Name: "repomgr_repo_ops_imported", Help: "Number of repo ops imported", }) + +var openAndSigCheckDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "repomgr_open_and_sig_check_duration", + Help: "Duration of opening and signature check", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}) + +var calcDiffDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "repomgr_calc_diff_duration", + Help: "Duration of calculating diff", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}) + +var writeCarSliceDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "repomgr_write_car_slice_duration", + Help: "Duration of writing car slice", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}) diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index ad90a43b2..bdf20d784 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -8,6 +8,7 @@ import ( "io" "strings" "sync" + "time" atproto "github.com/bluesky-social/indigo/api/atproto" bsky "github.com/bluesky-social/indigo/api/bsky" @@ -538,6 +539,7 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, unlock := rm.lockUser(ctx, uid) defer unlock() + start := time.Now() 
root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice) if err != nil { return fmt.Errorf("importing external carslice: %w", err) } @@ -551,6 +553,7 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, if err := rm.CheckRepoSig(ctx, r, did); err != nil { return err } + openAndSigCheckDuration.Observe(time.Since(start).Seconds()) var skipcids map[cid.Cid]bool if ds.BaseCid().Defined() { @@ -571,10 +574,12 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, } } + start = time.Now() if err := ds.CalcDiff(ctx, skipcids); err != nil { return fmt.Errorf("failed while calculating mst diff (since=%v): %w", since, err) } + calcDiffDuration.Observe(time.Since(start).Seconds()) evtops := make([]RepoOp, 0, len(ops)) @@ -631,10 +635,12 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, } } + start = time.Now() rslice, err := ds.CloseWithRoot(ctx, root, nrev) if err != nil { return fmt.Errorf("close with root: %w", err) } + writeCarSliceDuration.Observe(time.Since(start).Seconds()) if rm.events != nil { rm.events(ctx, &RepoEvent{ diff --git a/splitter/metrics.go b/splitter/metrics.go index a2173a639..76161ce45 100644 --- a/splitter/metrics.go +++ b/splitter/metrics.go @@ -9,3 +9,8 @@ var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "spl_events_sent_counter", Help: "The total number of events sent to consumers", }, []string{"remote_addr", "user_agent"}) + +var activeClientGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "spl_active_clients", + Help: "Current number of active clients", +}) diff --git a/splitter/splitter.go b/splitter/splitter.go index 0397aa720..e167b7757 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -2,6 +2,7 @@ package splitter import ( "context" + "errors" "fmt" "io" "math/rand" @@ -28,30 +29,93 @@ import ( var log = logging.Logger("splitter") type Splitter struct { - Host string erb *EventRingBuffer + pp
*events.PebblePersist events *events.EventManager - // cursor storage - cursorFile string - // Management of Socket Consumers consumersLk sync.RWMutex nextConsumerID uint64 consumers map[uint64]*SocketConsumer + + conf SplitterConfig +} + +type SplitterConfig struct { + UpstreamHost string + CursorFile string + PebbleOptions *events.PebblePersistOptions } -func NewSplitter(host string) *Splitter { +func NewMemSplitter(host string) *Splitter { + conf := SplitterConfig{ + UpstreamHost: host, + CursorFile: "cursor-file", + } + erb := NewEventRingBuffer(20_000, 10_000) em := events.NewEventManager(erb) return &Splitter{ - cursorFile: "cursor-file", - Host: host, - erb: erb, - events: em, - consumers: make(map[uint64]*SocketConsumer), + conf: conf, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), + } +} +func NewSplitter(conf SplitterConfig) (*Splitter, error) { + if conf.PebbleOptions == nil { + // mem splitter + erb := NewEventRingBuffer(20_000, 10_000) + + em := events.NewEventManager(erb) + return &Splitter{ + conf: conf, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil + } else { + pp, err := events.NewPebblePersistance(conf.PebbleOptions) + if err != nil { + return nil, err + } + + go pp.GCThread(context.Background()) + em := events.NewEventManager(pp) + return &Splitter{ + conf: conf, + pp: pp, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil + } +} +func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { + ppopts := events.PebblePersistOptions{ + DbPath: path, + PersistDuration: time.Duration(float64(time.Hour) * persistHours), + GCPeriod: 5 * time.Minute, + MaxBytes: uint64(maxBytes), + } + conf := SplitterConfig{ + UpstreamHost: host, + CursorFile: "cursor-file", + PebbleOptions: &ppopts, + } + pp, err := events.NewPebblePersistance(&ppopts) + if err != nil { + return nil, err } + + go pp.GCThread(context.Background()) + em := 
events.NewEventManager(pp) + return &Splitter{ + conf: conf, + pp: pp, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil } func (s *Splitter) Start(addr string) error { @@ -64,7 +128,7 @@ func (s *Splitter) Start(addr string) error { return fmt.Errorf("loading cursor failed: %w", err) } - go s.subscribeWithRedialer(context.Background(), s.Host, curs) + go s.subscribeWithRedialer(context.Background(), s.conf.UpstreamHost, curs) li, err := lc.Listen(ctx, "tcp", addr) if err != nil { @@ -253,6 +317,8 @@ func (s *Splitter) EventsHandler(c echo.Context) error { "cursor", since, "consumer_id", consumerID, ) + activeClientGauge.Inc() + defer activeClientGauge.Dec() for { select { @@ -360,7 +426,12 @@ func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, curso "User-Agent": []string{"bgs-rainbow-v0"}, } - url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor) + var url string + if cursor < 0 { + url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos", protocol, host) + } else { + url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor) + } con, res, err := d.DialContext(ctx, url, header) if err != nil { log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff) @@ -394,6 +465,7 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso } if seq%5000 == 0 { + // TODO: don't need this after we move to getting seq from pebble if err := s.writeCursor(seq); err != nil { log.Errorf("write cursor failed: %s", err) } @@ -407,27 +479,39 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso } func (s *Splitter) getLastCursor() (int64, error) { - fi, err := os.Open(s.cursorFile) + if s.pp != nil { + seq, millis, _, err := s.pp.GetLast(context.Background()) + if err == nil { + log.Debugw("got last cursor from pebble", "seq", seq, "millis", millis) + return seq, nil + } else if 
errors.Is(err, events.ErrNoLast) { + log.Info("pebble no last") + } else { + log.Errorw("pebble seq fail", "err", err) + } + } + + fi, err := os.Open(s.conf.CursorFile) if err != nil { if os.IsNotExist(err) { - return 0, nil + return -1, nil } - return 0, err + return -1, err } b, err := io.ReadAll(fi) if err != nil { - return 0, err + return -1, err } v, err := strconv.ParseInt(string(b), 10, 64) if err != nil { - return 0, err + return -1, err } return v, nil } func (s *Splitter) writeCursor(curs int64) error { - return os.WriteFile(s.cursorFile, []byte(fmt.Sprint(curs)), 0664) + return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664) } diff --git a/testing/utils.go b/testing/utils.go index 9b076ef17..7af6e1adc 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -117,7 +117,7 @@ func SetupPDS(ctx context.Context, suffix string, plc plc.PLCClient) (*TestPDS, return nil, err } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { return nil, err } @@ -550,7 +550,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) { return nil, err } - cs, err := carstore.NewCarStore(cardb, cspath) + cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { return nil, err }