diff --git a/.github/workflows/container-rainbow-aws.yaml b/.github/workflows/container-rainbow-aws.yaml
new file mode 100644
index 000000000..412be454a
--- /dev/null
+++ b/.github/workflows/container-rainbow-aws.yaml
@@ -0,0 +1,52 @@
+name: container-rainbow-aws
+on: [push]
+env:
+  REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
+  USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
+  PASSWORD: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_PASSWORD }}
+  # github.repository as <account>/<repo>
+  IMAGE_NAME: rainbow
+
+jobs:
+  container-rainbow-aws:
+    if: github.repository == 'bluesky-social/indigo'
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      packages: write
+      id-token: write
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v3
+
+      - name: Setup Docker buildx
+        uses: docker/setup-buildx-action@v1
+
+      - name: Log into registry ${{ env.REGISTRY }}
+        uses: docker/login-action@v2
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ env.USERNAME }}
+          password: ${{ env.PASSWORD }}
+
+      - name: Extract Docker metadata
+        id: meta
+        uses: docker/metadata-action@v4
+        with:
+          images: |
+            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+          tags: |
+            type=sha,enable=true,priority=100,prefix=,suffix=,format=long
+
+      - name: Build and push Docker image
+        id: build-and-push
+        uses: docker/build-push-action@v4
+        with:
+          context: .
+          file: ./cmd/rainbow/Dockerfile
+          push: ${{ github.event_name != 'pull_request' }}
+          tags: ${{ steps.meta.outputs.tags }}
+          labels: ${{ steps.meta.outputs.labels }}
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
diff --git a/cmd/rainbow/Dockerfile b/cmd/rainbow/Dockerfile
new file mode 100644
index 000000000..72bfc3572
--- /dev/null
+++ b/cmd/rainbow/Dockerfile
@@ -0,0 +1,43 @@
+FROM golang:1.22-bullseye AS build-env
+
+ENV DEBIAN_FRONTEND=noninteractive
+ENV TZ=Etc/UTC
+ENV GODEBUG="netdns=go"
+ENV GOOS="linux"
+ENV GOARCH="amd64"
+ENV CGO_ENABLED="1"
+
+WORKDIR /usr/src/rainbow
+
+COPY . .
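+# Modules are downloaded and verified up front so a bad or tampered
+# dependency fails the image build before compilation starts.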
+
+RUN go mod download && \
+    go mod verify
+
+RUN go build \
+    -v \
+    -trimpath \
+    -tags timetzdata \
+    -o /rainbow-bin \
+    ./cmd/rainbow
+
+FROM debian:bullseye-slim
+
+ENV DEBIAN_FRONTEND="noninteractive"
+ENV TZ=Etc/UTC
+ENV GODEBUG="netdns=go"
+
+RUN apt-get update && apt-get install --yes \
+  dumb-init \
+  ca-certificates \
+  runit
+
+WORKDIR /rainbow
+COPY --from=build-env /rainbow-bin /usr/bin/rainbow
+
+ENTRYPOINT ["/usr/bin/dumb-init", "--"]
+CMD ["/usr/bin/rainbow"]
+
+LABEL org.opencontainers.image.source=https://github.com/bluesky-social/indigo
+LABEL org.opencontainers.image.description="bsky.app rainbow"
+LABEL org.opencontainers.image.licenses=MIT
diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go
new file mode 100644
index 000000000..398f56387
--- /dev/null
+++ b/cmd/rainbow/main.go
@@ -0,0 +1,201 @@
+package main
+
+import (
+	"context"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/bluesky-social/indigo/events"
+	"github.com/bluesky-social/indigo/splitter"
+	"github.com/carlmjohnson/versioninfo"
+	_ "go.uber.org/automaxprocs"
+
+	_ "net/http/pprof"
+
+	_ "github.com/joho/godotenv/autoload"
+
+	logging "github.com/ipfs/go-log"
+	"github.com/urfave/cli/v2"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
+	"go.opentelemetry.io/otel/sdk/resource"
+	tracesdk "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
+)
+
+var log = logging.Logger("splitter")
+
+func init() {
+	// control log level using, eg, GOLOG_LOG_LEVEL=debug
+	logging.SetAllLoggers(logging.LevelDebug)
+}
+
+func main() {
+	run(os.Args)
+}
+
+func run(args []string) {
+	app := cli.App{
+		Name:    "splitter",
+		Usage:   "firehose proxy",
+		Version: versioninfo.Short(),
+	}
+
+	app.Flags = []cli.Flag{
+		&cli.BoolFlag{
+			Name:  "crawl-insecure-ws",
+			Usage: "when connecting to PDS instances, use ws:// instead of wss://",
+		},
+		&cli.StringFlag{
+			Name:  "splitter-host",
+			Value: "bsky.network",
+		},
+		&cli.StringFlag{
+			Name:  "persist-db",
+			Value: "",
+			Usage: "path to persistence db",
+		},
+		&cli.StringFlag{
+			Name:  "cursor-file",
+			Value: "",
+			Usage: "write upstream cursor number to this file",
+		},
+		&cli.StringFlag{
+			Name:  "api-listen",
+			Value: ":2480",
+		},
+		&cli.StringFlag{
+			Name:    "metrics-listen",
+			Value:   ":2481",
+			EnvVars: []string{"SPLITTER_METRICS_LISTEN"},
+		},
+		&cli.Float64Flag{
+			Name:    "persist-hours",
+			Value:   24 * 7,
+			EnvVars: []string{"SPLITTER_PERSIST_HOURS"},
+			Usage:   "hours to buffer (float, may be fractional)",
+		},
+		&cli.Int64Flag{
+			Name:    "persist-bytes",
+			Value:   0,
+			Usage:   "max bytes target for event cache, 0 to disable size target trimming",
+			EnvVars: []string{"SPLITTER_PERSIST_BYTES"},
+		},
+	}
+
+	app.Action = Splitter
+	err := app.Run(args)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+func Splitter(cctx *cli.Context) error {
+	// Trap SIGINT and SIGTERM to trigger a shutdown.
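+	// dumb-init, the container entrypoint above, forwards both signals to this
+	// process, so shutdown behaves the same in and out of the container.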
+ signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + + // Enable OTLP HTTP exporter + // For relevant environment variables: + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables + // At a minimum, you need to set + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { + log.Infow("setting up trace exporter", "endpoint", ep) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exp, err := otlptracehttp.New(ctx) + if err != nil { + log.Fatalw("failed to create trace exporter", "error", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + log.Errorw("failed to shutdown trace exporter", "error", err) + } + }() + + tp := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exp), + tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("splitter"), + attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog + attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others + attribute.Int64("ID", 1), + )), + ) + otel.SetTracerProvider(tp) + } + + persistPath := cctx.String("persist-db") + upstreamHost := cctx.String("splitter-host") + var spl *splitter.Splitter + var err error + if persistPath != "" { + log.Infof("building splitter with storage at: %s", persistPath) + ppopts := events.PebblePersistOptions{ + DbPath: persistPath, + PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")), + GCPeriod: 5 * time.Minute, + MaxBytes: uint64(cctx.Int64("persist-bytes")), + } + conf := splitter.SplitterConfig{ + UpstreamHost: upstreamHost, + CursorFile: cctx.String("cursor-file"), + PebbleOptions: &ppopts, + } + spl, err = splitter.NewSplitter(conf) + } else { + log.Info("building in-memory splitter") + conf := splitter.SplitterConfig{ + UpstreamHost: upstreamHost, + CursorFile: cctx.String("cursor-file"), + } + spl, err = splitter.NewSplitter(conf) + } + if err != nil { + log.Fatalw("failed to create splitter", "path", persistPath, "error", err) + return err + } + + // set up metrics endpoint + go func() { + if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil { + log.Fatalf("failed to start metrics endpoint: %s", err) + } + }() + + runErr := make(chan error, 1) + + go func() { + err := spl.Start(cctx.String("api-listen")) + runErr <- err + }() + + log.Infow("startup complete") + select { + case <-signals: + log.Info("received shutdown signal") + if err := spl.Shutdown(); err != nil { + log.Errorw("error during Splitter shutdown", "err", err) + } + case err := <-runErr: + if err != nil { + log.Errorw("error during Splitter startup", "err", err) + } + log.Info("shutting down") + if err := spl.Shutdown(); err != nil { + log.Errorw("error during Splitter shutdown", "err", err) + } + } + + log.Info("shutdown complete") + + return nil +} diff --git a/events/dbpersist_test.go b/events/dbpersist_test.go index ad8d266b6..4e7ecdc74 100644 --- a/events/dbpersist_test.go +++ b/events/dbpersist_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "context" @@ -11,19 +11,18 @@ import ( atproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/carstore" - "github.com/bluesky-social/indigo/events" lexutil 
"github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/models" - "github.com/bluesky-social/indigo/pds" + pds "github.com/bluesky-social/indigo/pds/data" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/util" - "github.com/ipfs/go-log/v2" + logging "github.com/ipfs/go-log/v2" "gorm.io/driver/sqlite" "gorm.io/gorm" ) func init() { - log.SetAllLoggers(log.LevelDebug) + logging.SetAllLoggers(logging.LevelDebug) } func BenchmarkDBPersist(b *testing.B) { @@ -61,24 +60,24 @@ func BenchmarkDBPersist(b *testing.B) { defer os.RemoveAll(tempPath) // Initialize a DBPersister - dbp, err := events.NewDbPersistence(db, cs, nil) + dbp, err := NewDbPersistence(db, cs, nil) if err != nil { b.Fatal(err) } // Create a bunch of events - evtman := events.NewEventManager(dbp) + evtman := NewEventManager(dbp) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { b.Fatal(err) } - inEvts := make([]*events.XRPCStreamEvent, b.N) + inEvts := make([]*XRPCStreamEvent, b.N) for i := 0; i < b.N; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -136,7 +135,7 @@ func BenchmarkDBPersist(b *testing.B) { b.StopTimer() - dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { outEvtCount++ return nil }) @@ -183,24 +182,24 @@ func BenchmarkPlayback(b *testing.B) { defer os.RemoveAll(tempPath) // Initialize a DBPersister - dbp, err := events.NewDbPersistence(db, cs, nil) + dbp, err := NewDbPersistence(db, cs, nil) if err != nil { b.Fatal(err) } // Create a bunch of events - evtman := events.NewEventManager(dbp) + evtman := NewEventManager(dbp) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { b.Fatal(err) } - inEvts := make([]*events.XRPCStreamEvent, n) + inEvts := make([]*XRPCStreamEvent, n) for i := 0; i < n; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -256,7 +255,7 @@ func BenchmarkPlayback(b *testing.B) { b.ResetTimer() - dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { outEvtCount++ return nil }) diff --git a/events/diskpersist_test.go b/events/diskpersist_test.go index 5d09c0fc2..74d0d62a5 100644 --- a/events/diskpersist_test.go +++ b/events/diskpersist_test.go @@ -1,4 +1,4 @@ -package events_test +package events import ( "context" @@ -14,16 +14,15 @@ import ( atproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/carstore" - "github.com/bluesky-social/indigo/events" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/models" - "github.com/bluesky-social/indigo/pds" + pds "github.com/bluesky-social/indigo/pds/data" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/util" "gorm.io/gorm" ) -func TestDiskPersist(t *testing.T) { +func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (EventPersistence, error)) { ctx := context.Background() db, _, cs, tempPath, err := setupDBs(t) @@ -57,19 +56,14 @@ func TestDiskPersist(t *testing.T) { defer os.RemoveAll(tempPath) - // 
Initialize a DBPersister - - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ - EventsPerFile: 10, - UIDCacheSize: 100000, - DIDCacheSize: 100000, - }) + // Initialize a persister + dp, err := perisistenceFactory(tempPath, db) if err != nil { t.Fatal(err) } // Create a bunch of events - evtman := events.NewEventManager(dp) + evtman := NewEventManager(dp) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { @@ -77,11 +71,11 @@ func TestDiskPersist(t *testing.T) { } n := 100 - inEvts := make([]*events.XRPCStreamEvent, n) + inEvts := make([]*XRPCStreamEvent, n) for i := 0; i < n; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -93,6 +87,7 @@ func TestDiskPersist(t *testing.T) { }, }, Time: time.Now().Format(util.ISO8601), + Seq: int64(i), }, } } @@ -112,7 +107,7 @@ func TestDiskPersist(t *testing.T) { outEvtCount := 0 expectedEvtCount := n - dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + dp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { outEvtCount++ return nil }) @@ -125,7 +120,7 @@ func TestDiskPersist(t *testing.T) { time.Sleep(time.Millisecond * 100) - dp2, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp2, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 10, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -134,13 +129,13 @@ func TestDiskPersist(t *testing.T) { t.Fatal(err) } - evtman2 := events.NewEventManager(dp2) + evtman2 := NewEventManager(dp2) - inEvts = make([]*events.XRPCStreamEvent, n) + inEvts = make([]*XRPCStreamEvent, n) for i := 0; i < n; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -163,6 +158,16 @@ func TestDiskPersist(t *testing.T) { } } } +func TestDiskPersist(t *testing.T) { + factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { + return NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ + EventsPerFile: 10, + UIDCacheSize: 100000, + DIDCacheSize: 100000, + }) + } + testPersister(t, factory) +} func BenchmarkDiskPersist(b *testing.B) { db, _, cs, tempPath, err := setupDBs(b) @@ -174,7 +179,7 @@ func BenchmarkDiskPersist(b *testing.B) { // Initialize a DBPersister - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 5000, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -187,7 +192,7 @@ func BenchmarkDiskPersist(b *testing.B) { } -func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { +func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { ctx := context.Background() db.AutoMigrate(&pds.User{}) @@ -215,18 +220,18 @@ func 
runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p ev } // Create a bunch of events - evtman := events.NewEventManager(p) + evtman := NewEventManager(p) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { b.Fatal(err) } - inEvts := make([]*events.XRPCStreamEvent, b.N) + inEvts := make([]*XRPCStreamEvent, b.N) for i := 0; i < b.N; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -290,7 +295,7 @@ func TestDiskPersister(t *testing.T) { // Initialize a DBPersister - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 20, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -302,7 +307,7 @@ func TestDiskPersister(t *testing.T) { runEventManagerTest(t, cs, db, dp) } -func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { +func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { ctx := context.Background() db.AutoMigrate(&pds.User{}) @@ -329,7 +334,7 @@ func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p even t.Fatal(err) } - evtman := events.NewEventManager(p) + evtman := NewEventManager(p) userRepoHead, err := mgr.GetRepoRoot(ctx, 1) if err != nil { @@ -337,11 +342,11 @@ func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p even } testSize := 100 // you can adjust this number as needed - inEvts := make([]*events.XRPCStreamEvent, testSize) + inEvts := make([]*XRPCStreamEvent, testSize) for i := 0; i < testSize; i++ { cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: "did:example:123", Commit: headLink, @@ -368,7 +373,7 @@ func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p even } outEvtCount := 0 - p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { // Check that the contents of the output events match the input events // Clear cache, don't care if one has it and not the other inEvts[outEvtCount].Preserialized = nil @@ -397,7 +402,7 @@ func TestDiskPersisterTakedowns(t *testing.T) { // Initialize a DBPersister - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ EventsPerFile: 10, UIDCacheSize: 100000, DIDCacheSize: 100000, @@ -409,7 +414,7 @@ func TestDiskPersisterTakedowns(t *testing.T) { runTakedownTest(t, cs, db, dp) } -func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { +func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { ctx := context.TODO() db.AutoMigrate(&pds.User{}) @@ -439,10 +444,10 @@ func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.E } } - evtman := events.NewEventManager(p) + evtman := NewEventManager(p) 
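+	// Write testSize events for each of the generated users; one user is then
+	// taken down and playback is checked below for the absence of their events.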
testSize := 100 // you can adjust this number as needed - inEvts := make([]*events.XRPCStreamEvent, testSize*userCount) + inEvts := make([]*XRPCStreamEvent, testSize*userCount) for i := 0; i < testSize*userCount; i++ { user := users[i%userCount] _, cid, err := mgr.CreateRecord(ctx, user.Uid, "app.bsky.feed.post", &bsky.FeedPost{ @@ -460,7 +465,7 @@ func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.E cidLink := lexutil.LexLink(cid) headLink := lexutil.LexLink(userRepoHead) - inEvts[i] = &events.XRPCStreamEvent{ + inEvts[i] = &XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: user.Did, Commit: headLink, @@ -495,7 +500,7 @@ func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.E // Verify that the events of the user have been removed from the event stream var evtsCount int - if err := p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { + if err := p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { evtsCount++ if evt.RepoCommit.Repo == takeDownUser.Did { t.Fatalf("found event for user %d after takedown", takeDownUser.Uid) diff --git a/events/events.go b/events/events.go index 915beaf19..ba6d0faad 100644 --- a/events/events.go +++ b/events/events.go @@ -219,6 +219,78 @@ func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error { return obj.MarshalCBOR(cborWriter) } +func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error { + var header EventHeader + if err := header.UnmarshalCBOR(r); err != nil { + return fmt.Errorf("reading header: %w", err) + } + switch header.Op { + case EvtKindMessage: + switch header.MsgType { + case "#commit": + var evt comatproto.SyncSubscribeRepos_Commit + if err := evt.UnmarshalCBOR(r); err != nil { + return fmt.Errorf("reading repoCommit event: %w", err) + } + xevt.RepoCommit = &evt + case "#handle": + var evt comatproto.SyncSubscribeRepos_Handle + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoHandle = &evt + case "#identity": + var evt comatproto.SyncSubscribeRepos_Identity + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoIdentity = &evt + case "#account": + var evt comatproto.SyncSubscribeRepos_Account + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoAccount = &evt + case "#info": + // TODO: this might also be a LabelInfo (as opposed to RepoInfo) + var evt comatproto.SyncSubscribeRepos_Info + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoInfo = &evt + case "#migrate": + var evt comatproto.SyncSubscribeRepos_Migrate + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoMigrate = &evt + case "#tombstone": + var evt comatproto.SyncSubscribeRepos_Tombstone + if err := evt.UnmarshalCBOR(r); err != nil { + return err + } + xevt.RepoTombstone = &evt + case "#labels": + var evt comatproto.LabelSubscribeLabels_Labels + if err := evt.UnmarshalCBOR(r); err != nil { + return fmt.Errorf("reading Labels event: %w", err) + } + xevt.LabelLabels = &evt + } + case EvtKindErrorFrame: + var errframe ErrorFrame + if err := errframe.UnmarshalCBOR(r); err != nil { + return err + } + xevt.Error = &errframe + default: + return fmt.Errorf("unrecognized event stream type: %d", header.Op) + } + return nil +} + +var ErrNoSeq = errors.New("event has no sequence number") + // serialize content into Preserialized cache func (evt *XRPCStreamEvent) Preserialize() error { if evt.Preserialized != nil { @@ -290,7 +362,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident 
string, filter func case <-done: return ErrPlaybackShutdown case out <- e: - seq := sequenceForEvent(e) + seq := SequenceForEvent(e) if seq > 0 { lastSeq = seq } @@ -315,8 +387,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func // run playback again to get us to the events that have started buffering if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error { - seq := sequenceForEvent(e) - if seq > sequenceForEvent(first) { + seq := SequenceForEvent(e) + if seq > SequenceForEvent(first) { return ErrCaughtUp } @@ -351,7 +423,11 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func return out, sub.cleanup, nil } -func sequenceForEvent(evt *XRPCStreamEvent) int64 { +func SequenceForEvent(evt *XRPCStreamEvent) int64 { + return evt.Sequence() +} + +func (evt *XRPCStreamEvent) Sequence() int64 { switch { case evt == nil: return -1 diff --git a/events/pebblepersist.go b/events/pebblepersist.go new file mode 100644 index 000000000..2c1c787e5 --- /dev/null +++ b/events/pebblepersist.go @@ -0,0 +1,262 @@ +package events + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "time" + + "github.com/bluesky-social/indigo/models" + "github.com/cockroachdb/pebble" +) + +type PebblePersist struct { + broadcast func(*XRPCStreamEvent) + db *pebble.DB + + prevSeq int64 + prevSeqExtra uint32 + + cancel func() + + options PebblePersistOptions +} + +type PebblePersistOptions struct { + // path where pebble will create a directory full of files + DbPath string + + // Throw away posts older than some time ago + PersistDuration time.Duration + + // Throw away old posts every so often + GCPeriod time.Duration + + // MaxBytes is what we _try_ to keep disk usage under + MaxBytes uint64 +} + +var DefaultPebblePersistOptions = PebblePersistOptions{ + PersistDuration: time.Minute * 20, + GCPeriod: time.Minute * 5, + MaxBytes: 1024 * 1024 * 1024, // 1 GiB +} + +// Create a new EventPersistence which stores data in pebbledb +// nil opts is ok +func NewPebblePersistance(opts *PebblePersistOptions) (*PebblePersist, error) { + if opts == nil { + opts = &DefaultPebblePersistOptions + } + db, err := pebble.Open(opts.DbPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("%s: %w", opts.DbPath, err) + } + pp := new(PebblePersist) + pp.options = *opts + pp.db = db + return pp, nil +} + +func setKeySeqMillis(key []byte, seq, millis int64) { + binary.BigEndian.PutUint64(key[:8], uint64(seq)) + binary.BigEndian.PutUint64(key[8:16], uint64(millis)) +} + +func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error { + err := e.Preserialize() + if err != nil { + return err + } + blob := e.Preserialized + + seq := e.Sequence() + nowMillis := time.Now().UnixMilli() + + if seq < 0 { + // persist with longer key {prev 8 byte key}{time}{int32 extra counter} + pp.prevSeqExtra++ + var key [20]byte + setKeySeqMillis(key[:], seq, nowMillis) + binary.BigEndian.PutUint32(key[16:], pp.prevSeqExtra) + + err = pp.db.Set(key[:], blob, pebble.Sync) + } else { + pp.prevSeq = seq + pp.prevSeqExtra = 0 + var key [16]byte + setKeySeqMillis(key[:], seq, nowMillis) + + err = pp.db.Set(key[:], blob, pebble.Sync) + } + + if err != nil { + return err + } + pp.broadcast(e) + + return err +} + +func eventFromPebbleIter(iter *pebble.Iterator) (*XRPCStreamEvent, error) { + blob, err := iter.ValueAndErr() + if err != nil { + return nil, err + } + br := bytes.NewReader(blob) + evt := new(XRPCStreamEvent) + err = 
evt.Deserialize(br)
+	if err != nil {
+		return nil, err
+	}
+	evt.Preserialized = bytes.Clone(blob)
+	return evt, nil
+}
+
+func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
+	var key [8]byte
+	binary.BigEndian.PutUint64(key[:], uint64(since))
+
+	iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{LowerBound: key[:]})
+	if err != nil {
+		return err
+	}
+	defer iter.Close()
+
+	for iter.First(); iter.Valid(); iter.Next() {
+		evt, err := eventFromPebbleIter(iter)
+		if err != nil {
+			return err
+		}
+
+		err = cb(evt)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (pp *PebblePersist) TakeDownRepo(ctx context.Context, usr models.Uid) error {
+	// TODO: implement filter on playback to ignore taken-down-repos?
+	return nil
+}
+
+func (pp *PebblePersist) Flush(context.Context) error {
+	return pp.db.Flush()
+}
+
+func (pp *PebblePersist) Shutdown(context.Context) error {
+	if pp.cancel != nil {
+		pp.cancel()
+	}
+	err := pp.db.Close()
+	pp.db = nil
+	return err
+}
+
+func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent)) {
+	pp.broadcast = broadcast
+}
+
+var ErrNoLast = errors.New("no last event")
+
+func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error) {
+	iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{})
+	if err != nil {
+		return 0, 0, nil, err
+	}
+	ok := iter.Last()
+	if !ok {
+		return 0, 0, nil, ErrNoLast
+	}
+	evt, err = eventFromPebbleIter(iter)
+	keyblob := iter.Key()
+	seq = int64(binary.BigEndian.Uint64(keyblob[:8]))
+	millis = int64(binary.BigEndian.Uint64(keyblob[8:16]))
+	return seq, millis, evt, err
+}
+
+// example:
+// ```
+// opts := DefaultPebblePersistOptions
+// opts.DbPath = "/tmp/foo.pebble"
+// pp, err := NewPebblePersistance(&opts)
+// go pp.GCThread(context.Background())
+// ```
+func (pp *PebblePersist) GCThread(ctx context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	pp.cancel = cancel
+	ticker := time.NewTicker(pp.options.GCPeriod)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			err := pp.GarbageCollect(ctx)
+			if err != nil {
+				log.Errorw("GC err", "err", err)
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+var zeroKey [16]byte
+var ffffKey [16]byte
+
+func init() {
+	setKeySeqMillis(zeroKey[:], 0, 0)
+	for i := range ffffKey {
+		ffffKey[i] = 0xff
+	}
+}
+
+func (pp *PebblePersist) GarbageCollect(ctx context.Context) error {
+	nowMillis := time.Now().UnixMilli()
+	expired := nowMillis - pp.options.PersistDuration.Milliseconds()
+	iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{})
+	if err != nil {
+		return err
+	}
+	defer iter.Close()
+	// scan keys to find last expired, then delete range
+	var seq int64 = int64(-1)
+	var lastKeyTime int64
+	for iter.First(); iter.Valid(); iter.Next() {
+		keyblob := iter.Key()
+
+		keyTime := int64(binary.BigEndian.Uint64(keyblob[8:16]))
+		if keyTime <= expired {
+			lastKeyTime = keyTime
+			seq = int64(binary.BigEndian.Uint64(keyblob[:8]))
+		} else {
+			break
+		}
+	}
+
+	// TODO: use pp.options.MaxBytes
+
+	sizeBefore, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:])
+	if seq == -1 {
+		// nothing to delete
+		log.Infow("pebble gc nop", "size", sizeBefore)
+		return nil
+	}
+	var key [16]byte
+	setKeySeqMillis(key[:], seq, lastKeyTime)
+	log.Infow("pebble gc start", "to", hex.EncodeToString(key[:]))
+	err = pp.db.DeleteRange(zeroKey[:], key[:], pebble.Sync)
+	if err != nil {
+		return err
+	}
+	sizeAfter, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:])
+
log.Infow("pebble gc", "before", sizeBefore, "after", sizeAfter) + start := time.Now() + err = pp.db.Compact(zeroKey[:], key[:], true) + if err != nil { + log.Warnw("pebble gc compact", "err", err) + } + dt := time.Since(start) + log.Infow("pebble gc compact ok", "dt", dt) + return nil +} diff --git a/events/pebblepersist_test.go b/events/pebblepersist_test.go new file mode 100644 index 000000000..901365c5d --- /dev/null +++ b/events/pebblepersist_test.go @@ -0,0 +1,16 @@ +package events + +import ( + "gorm.io/gorm" + "path/filepath" + "testing" +) + +func TestPebblePersist(t *testing.T) { + factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { + opts := DefaultPebblePersistOptions + opts.DbPath = filepath.Join(tempPath, "pebble.db") + return NewPebblePersistance(&opts) + } + testPersister(t, factory) +} diff --git a/go.mod b/go.mod index 66391db61..da1a77d84 100644 --- a/go.mod +++ b/go.mod @@ -10,11 +10,11 @@ require ( github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/brianvoe/gofakeit/v6 v6.25.0 github.com/carlmjohnson/versioninfo v0.22.5 + github.com/cockroachdb/pebble v1.1.2 github.com/dustinkirkland/golang-petname v0.0.0-20231002161417-6a283f1aaaf2 github.com/flosch/pongo2/v6 v6.0.0 github.com/go-redis/cache/v9 v9.0.0 github.com/goccy/go-json v0.10.2 - github.com/gocql/gocql v1.7.0 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/gorilla/websocket v1.5.1 github.com/hashicorp/go-retryablehttp v0.7.5 @@ -66,7 +66,7 @@ require ( go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/automaxprocs v1.5.3 golang.org/x/crypto v0.21.0 - golang.org/x/sync v0.5.0 + golang.org/x/sync v0.7.0 golang.org/x/text v0.14.0 golang.org/x/time v0.3.0 golang.org/x/tools v0.15.0 @@ -78,23 +78,32 @@ require ( ) require ( + github.com/DataDog/zstd v1.4.5 // indirect + github.com/cockroachdb/errors v1.11.3 // indirect + github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect + github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect + github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-redis/redis v6.15.9+incompatible // indirect - github.com/golang/snappy v0.0.3 // indirect - github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/klauspost/compress v1.17.3 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/labstack/gommon v0.4.1 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/vmihailenco/go-tinylfu v0.2.2 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect - gopkg.in/inf.v0 v0.9.1 // indirect ) require ( @@ -167,12 +176,12 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect 
golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.22.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/grpc v1.59.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect diff --git a/go.sum b/go.sum index 8cd2edd60..446851e37 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ contrib.go.opencensus.io/exporter/prometheus v0.4.2/go.mod h1:dvEHbiKmgvbr5pjaF9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= +github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/PuerkitoBio/purell v1.2.1 h1:QsZ4TjvwiMpat6gBCBxEQI0rcS9ehtkKtSpiUnd9N28= github.com/PuerkitoBio/purell v1.2.1/go.mod h1:ZwHcC/82TOaovDi//J/804umJFFmbOHPngi8iYYv/Eo= github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= @@ -71,10 +73,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= -github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/brianvoe/gofakeit/v6 v6.25.0 h1:ZpFjktOpLZUeF8q223o0rUuXtA+m5qW5srjvVi+JkXk= github.com/brianvoe/gofakeit/v6 v6.25.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -95,6 +93,20 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce 
h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= +github.com/cockroachdb/pebble v1.1.2 h1:CUh2IPtR4swHlEj48Rhfzw6l/d0qA31fItcIszQVIsA= +github.com/cockroachdb/pebble v1.1.2/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU= +github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= +github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/corpix/uarand v0.2.0 h1:U98xXwud/AVuCpkpgfPF7J5TQgr7R5tqT8VZP5KWbzE= github.com/corpix/uarand v0.2.0/go.mod h1:/3Z1QIqWkDIhf6XWn/08/uMHoQ8JUoTIKc2iPchBOmM= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -126,6 +138,10 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -156,8 +172,6 @@ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg78 github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= -github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -195,8 +209,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy 
v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -239,8 +253,6 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= -github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= -github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= @@ -512,8 +524,12 @@ github.com/orandin/slog-gorm v1.3.2 h1:C0lKDQPAx/pF+8K2HL7bdShPwOEJpPM0Bn80zTzxU github.com/orandin/slog-gorm v1.3.2/go.mod h1:MoZ51+b7xE9lwGNPYEhxcUtRNrYzjdcKvA8QXQQGEPA= github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -564,6 +580,7 @@ github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -795,8 +812,8 @@ golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= 
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -818,8 +835,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1061,8 +1078,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -1071,8 +1088,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= -gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 
h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pds/data/types.go b/pds/data/types.go new file mode 100644 index 000000000..f7ff96e54 --- /dev/null +++ b/pds/data/types.go @@ -0,0 +1,27 @@ +package data + +import ( + "github.com/bluesky-social/indigo/models" + "gorm.io/gorm" + "time" +) + +type User struct { + ID models.Uid `gorm:"primarykey"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt gorm.DeletedAt `gorm:"index"` + Handle string `gorm:"uniqueIndex"` + Password string + RecoveryKey string + Email string + Did string `gorm:"uniqueIndex"` + PDS uint +} + +type Peering struct { + gorm.Model + Host string + Did string + Approved bool +} diff --git a/pds/server.go b/pds/server.go index b9d1c903b..54f1dfed1 100644 --- a/pds/server.go +++ b/pds/server.go @@ -21,6 +21,7 @@ import ( lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/models" "github.com/bluesky-social/indigo/notifs" + pdsdata "github.com/bluesky-social/indigo/pds/data" "github.com/bluesky-social/indigo/plc" "github.com/bluesky-social/indigo/repomgr" "github.com/bluesky-social/indigo/util" @@ -456,18 +457,7 @@ func (s *Server) HandleResolveDid(c echo.Context) error { return c.String(200, u.Did) } -type User struct { - ID models.Uid `gorm:"primarykey"` - CreatedAt time.Time - UpdatedAt time.Time - DeletedAt gorm.DeletedAt `gorm:"index"` - Handle string `gorm:"uniqueIndex"` - Password string - RecoveryKey string - Email string - Did string `gorm:"uniqueIndex"` - PDS uint -} +type User = pdsdata.User type RefreshToken struct { gorm.Model @@ -636,12 +626,7 @@ func (s *Server) invalidateToken(ctx context.Context, u *User, tok *jwt.Token) e panic("nyi") } -type Peering struct { - gorm.Model - Host string - Did string - Approved bool -} +type Peering = pdsdata.Peering func (s *Server) EventsHandler(c echo.Context) error { conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) diff --git a/splitter/metrics.go b/splitter/metrics.go new file mode 100644 index 000000000..76161ce45 --- /dev/null +++ b/splitter/metrics.go @@ -0,0 +1,16 @@ +package splitter + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "spl_events_sent_counter", + Help: "The total number of events sent to consumers", +}, []string{"remote_addr", "user_agent"}) + +var activeClientGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "spl_active_clients", + Help: "Current number of active clients", +}) diff --git a/splitter/ringbuf.go b/splitter/ringbuf.go new file mode 100644 index 000000000..2417f4eb0 --- /dev/null +++ b/splitter/ringbuf.go @@ -0,0 +1,144 @@ +package splitter + +import ( + "context" + "sync" + + events "github.com/bluesky-social/indigo/events" + "github.com/bluesky-social/indigo/models" +) + +func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { + return &EventRingBuffer{ + chunkSize: chunkSize, + maxChunkCount: nchunks, + } +} + +type EventRingBuffer struct { + lk sync.Mutex + chunks []*ringChunk + chunkSize int + maxChunkCount int + + broadcast func(*events.XRPCStreamEvent) +} + +type ringChunk struct { + lk sync.Mutex + buf []*events.XRPCStreamEvent +} + +func (rc *ringChunk) append(evt 
*events.XRPCStreamEvent) { + rc.lk.Lock() + defer rc.lk.Unlock() + rc.buf = append(rc.buf, evt) +} + +func (rc *ringChunk) events() []*events.XRPCStreamEvent { + rc.lk.Lock() + defer rc.lk.Unlock() + return rc.buf +} + +func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { + er.lk.Lock() + defer er.lk.Unlock() + + if len(er.chunks) == 0 { + er.chunks = []*ringChunk{new(ringChunk)} + } + + last := er.chunks[len(er.chunks)-1] + if len(last.buf) >= er.chunkSize { + last = new(ringChunk) + er.chunks = append(er.chunks, last) + if len(er.chunks) > er.maxChunkCount { + er.chunks = er.chunks[1:] + } + } + + last.append(evt) + + er.broadcast(evt) + return nil +} + +func (er *EventRingBuffer) Flush(context.Context) error { + return nil +} + +func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { + // run playback a few times to get as close to 'live' as possible before returning + for i := 0; i < 10; i++ { + n, err := er.playbackRound(ctx, since, cb) + if err != nil { + return err + } + + // playback had no new events + if n-since == 0 { + return nil + } + since = n + } + + return nil +} + +func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) { + // grab a snapshot of the current chunks + er.lk.Lock() + chunks := er.chunks + er.lk.Unlock() + + i := len(chunks) - 1 + for ; i >= 0; i-- { + c := chunks[i] + evts := c.events() + if since > events.SequenceForEvent(evts[len(evts)-1]) { + i++ + break + } + } + if i < 0 { + i = 0 + } + + var lastSeq int64 = since + for _, c := range chunks[i:] { + var nread int + evts := c.events() + for nread < len(evts) { + for _, e := range evts[nread:] { + nread++ + seq := events.SequenceForEvent(e) + if seq <= since { + continue + } + + if err := cb(e); err != nil { + return 0, err + } + lastSeq = seq + } + + // recheck evts buffer to see if more were added while we were here + evts = c.events() + } + } + + return lastSeq, nil +} + +func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { + er.broadcast = brc +} + +func (er *EventRingBuffer) Shutdown(context.Context) error { + return nil +} + +func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { + return nil +} diff --git a/splitter/splitter.go b/splitter/splitter.go new file mode 100644 index 000000000..e167b7757 --- /dev/null +++ b/splitter/splitter.go @@ -0,0 +1,517 @@ +package splitter + +import ( + "context" + "errors" + "fmt" + "io" + "math/rand" + "net" + "net/http" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/bluesky-social/indigo/bgs" + events "github.com/bluesky-social/indigo/events" + "github.com/bluesky-social/indigo/events/schedulers/sequential" + "github.com/gorilla/websocket" + logging "github.com/ipfs/go-log" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + promclient "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + dto "github.com/prometheus/client_model/go" +) + +var log = logging.Logger("splitter") + +type Splitter struct { + erb *EventRingBuffer + pp *events.PebblePersist + events *events.EventManager + + // Management of Socket Consumers + consumersLk sync.RWMutex + nextConsumerID uint64 + consumers map[uint64]*SocketConsumer + + conf SplitterConfig +} + +type SplitterConfig struct { + UpstreamHost string + CursorFile string + PebbleOptions 
*events.PebblePersistOptions +} + +func NewMemSplitter(host string) *Splitter { + conf := SplitterConfig{ + UpstreamHost: host, + CursorFile: "cursor-file", + } + + erb := NewEventRingBuffer(20_000, 10_000) + + em := events.NewEventManager(erb) + return &Splitter{ + conf: conf, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), + } +} +func NewSplitter(conf SplitterConfig) (*Splitter, error) { + if conf.PebbleOptions == nil { + // mem splitter + erb := NewEventRingBuffer(20_000, 10_000) + + em := events.NewEventManager(erb) + return &Splitter{ + conf: conf, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil + } else { + pp, err := events.NewPebblePersistance(conf.PebbleOptions) + if err != nil { + return nil, err + } + + go pp.GCThread(context.Background()) + em := events.NewEventManager(pp) + return &Splitter{ + conf: conf, + pp: pp, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil + } +} +func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { + ppopts := events.PebblePersistOptions{ + DbPath: path, + PersistDuration: time.Duration(float64(time.Hour) * persistHours), + GCPeriod: 5 * time.Minute, + MaxBytes: uint64(maxBytes), + } + conf := SplitterConfig{ + UpstreamHost: host, + CursorFile: "cursor-file", + PebbleOptions: &ppopts, + } + pp, err := events.NewPebblePersistance(&ppopts) + if err != nil { + return nil, err + } + + go pp.GCThread(context.Background()) + em := events.NewEventManager(pp) + return &Splitter{ + conf: conf, + pp: pp, + events: em, + consumers: make(map[uint64]*SocketConsumer), + }, nil +} + +func (s *Splitter) Start(addr string) error { + var lc net.ListenConfig + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + curs, err := s.getLastCursor() + if err != nil { + return fmt.Errorf("loading cursor failed: %w", err) + } + + go s.subscribeWithRedialer(context.Background(), s.conf.UpstreamHost, curs) + + li, err := lc.Listen(ctx, "tcp", addr) + if err != nil { + return err + } + return s.StartWithListener(li) +} + +func (s *Splitter) StartMetrics(listen string) error { + http.Handle("/metrics", promhttp.Handler()) + return http.ListenAndServe(listen, nil) +} + +func (s *Splitter) Shutdown() error { + return nil +} + +func (s *Splitter) StartWithListener(listen net.Listener) error { + e := echo.New() + e.HideBanner = true + + e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ + AllowOrigins: []string{"*"}, + AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, + })) + + /* + if !s.ssl { + e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ + Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", + })) + } else { + e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig)) + } + */ + + e.Use(bgs.MetricsMiddleware) + + e.HTTPErrorHandler = func(err error, ctx echo.Context) { + switch err := err.(type) { + case *echo.HTTPError: + if err2 := ctx.JSON(err.Code, map[string]any{ + "error": err.Message, + }); err2 != nil { + log.Errorf("Failed to write http error: %s", err2) + } + default: + sendHeader := true + if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" { + sendHeader = false + } + + log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err) + + if strings.HasPrefix(ctx.Path(), "/admin/") { + ctx.JSON(500, map[string]any{ + "error": err.Error(), + }) + return + } + + if sendHeader { + 
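+				// Skipped for the subscribeRepos stream: once the connection has
+				// been hijacked for the websocket, writing a status header is invalid.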
+				ctx.Response().WriteHeader(500)
+			}
+		}
+	}
+
+	// TODO: this API is temporary until we formalize what we want here
+
+	e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
+	e.GET("/xrpc/_health", s.HandleHealthCheck)
+
+	// In order to support booting on random ports in tests, we need to tell the
+	// Echo instance it's already got a port, and then use its StartServer
+	// method to re-use that listener.
+	e.Listener = listen
+	srv := &http.Server{}
+	return e.StartServer(srv)
+}
+
+type HealthStatus struct {
+	Status  string `json:"status"`
+	Message string `json:"msg,omitempty"`
+}
+
+func (s *Splitter) HandleHealthCheck(c echo.Context) error {
+	return c.JSON(200, HealthStatus{Status: "ok"})
+}
+
+func (s *Splitter) EventsHandler(c echo.Context) error {
+	var since *int64
+	if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
+		sval, err := strconv.ParseInt(sinceVal, 10, 64)
+		if err != nil {
+			return err
+		}
+		since = &sval
+	}
+
+	ctx, cancel := context.WithCancel(c.Request().Context())
+	defer cancel()
+
+	// TODO: auth
+	conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10)
+	if err != nil {
+		return fmt.Errorf("upgrading websocket: %w", err)
+	}
+
+	lastWriteLk := sync.Mutex{}
+	lastWrite := time.Now()
+
+	// Start a goroutine to ping the client every 30 seconds to check if it's
+	// still alive. If the client doesn't respond to a ping within 5 seconds,
+	// we'll close the connection and tear down the consumer.
+	go func() {
+		ticker := time.NewTicker(30 * time.Second)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				lastWriteLk.Lock()
+				lw := lastWrite
+				lastWriteLk.Unlock()
+
+				if time.Since(lw) < 30*time.Second {
+					continue
+				}
+
+				if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
+					log.Errorf("failed to ping client: %s", err)
+					cancel()
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	conn.SetPingHandler(func(message string) error {
+		err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60))
+		if err == websocket.ErrCloseSent {
+			return nil
+		} else if e, ok := err.(net.Error); ok && e.Temporary() {
+			return nil
+		}
+		return err
+	})
+
+	// Start a goroutine to read messages from the client and discard them.
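+	// Note: gorilla/websocket only processes incoming control frames (ping,
+	// pong, close) during reads, so this loop also keeps the ping/pong
+	// handlers above serviced.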
+	go func() {
+		for {
+			_, _, err := conn.ReadMessage()
+			if err != nil {
+				log.Errorf("failed to read message from client: %s", err)
+				cancel()
+				return
+			}
+		}
+	}()
+
+	ident := c.RealIP() + "-" + c.Request().UserAgent()
+
+	evts, cleanup, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since)
+	if err != nil {
+		return err
+	}
+	defer cleanup()
+
+	// Keep track of the consumer for metrics and admin endpoints
+	consumer := SocketConsumer{
+		RemoteAddr:  c.RealIP(),
+		UserAgent:   c.Request().UserAgent(),
+		ConnectedAt: time.Now(),
+	}
+	sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
+	consumer.EventsSent = sentCounter
+
+	consumerID := s.registerConsumer(&consumer)
+	defer s.cleanupConsumer(consumerID)
+
+	log.Infow("new consumer",
+		"remote_addr", consumer.RemoteAddr,
+		"user_agent", consumer.UserAgent,
+		"cursor", since,
+		"consumer_id", consumerID,
+	)
+	activeClientGauge.Inc()
+	defer activeClientGauge.Dec()
+
+	for {
+		select {
+		case evt, ok := <-evts:
+			if !ok {
+				log.Error("event stream closed unexpectedly")
+				return nil
+			}
+
+			wc, err := conn.NextWriter(websocket.BinaryMessage)
+			if err != nil {
+				log.Errorf("failed to get next writer: %s", err)
+				return err
+			}
+
+			if evt.Preserialized != nil {
+				_, err = wc.Write(evt.Preserialized)
+			} else {
+				err = evt.Serialize(wc)
+			}
+			if err != nil {
+				return fmt.Errorf("failed to write event: %w", err)
+			}
+
+			if err := wc.Close(); err != nil {
+				log.Warnf("failed to flush-close our event write: %s", err)
+				return nil
+			}
+
+			lastWriteLk.Lock()
+			lastWrite = time.Now()
+			lastWriteLk.Unlock()
+			sentCounter.Inc()
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
+type SocketConsumer struct {
+	UserAgent   string
+	RemoteAddr  string
+	ConnectedAt time.Time
+	EventsSent  promclient.Counter
+}
+
+func (s *Splitter) registerConsumer(c *SocketConsumer) uint64 {
+	s.consumersLk.Lock()
+	defer s.consumersLk.Unlock()
+
+	id := s.nextConsumerID
+	s.nextConsumerID++
+
+	s.consumers[id] = c
+
+	return id
+}
+
+func (s *Splitter) cleanupConsumer(id uint64) {
+	s.consumersLk.Lock()
+	defer s.consumersLk.Unlock()
+
+	c := s.consumers[id]
+
+	var m = &dto.Metric{}
+	if err := c.EventsSent.Write(m); err != nil {
+		log.Errorf("failed to get sent counter: %s", err)
+	}
+
+	log.Infow("consumer disconnected",
+		"consumer_id", id,
+		"remote_addr", c.RemoteAddr,
+		"user_agent", c.UserAgent,
+		"events_sent", m.Counter.GetValue())
+
+	delete(s.consumers, id)
+}
+
+func sleepForBackoff(b int) time.Duration {
+	if b == 0 {
+		return 0
+	}
+
+	if b < 50 {
+		return time.Millisecond * time.Duration(rand.Intn(100)+(5*b))
+	}
+
+	return time.Second * 5
+}
+
+func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, cursor int64) {
+	d := websocket.Dialer{}
+
+	protocol := "wss"
+
+	var backoff int
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		header := http.Header{
+			"User-Agent": []string{"bgs-rainbow-v0"},
+		}
+
+		var url string
+		if cursor < 0 {
+			url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos", protocol, host)
+		} else {
+			url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor)
+		}
+		con, res, err := d.DialContext(ctx, url, header)
+		if err != nil {
+			log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff)
+			time.Sleep(sleepForBackoff(backoff))
+			backoff++
+
+			continue
+		}
+
+		log.Info("event subscription response code: ", res.StatusCode)
+
+		// reset the backoff counter after a successful dial so earlier
+		// failures don't permanently inflate the retry delay
+		backoff = 0
+
+		if err := s.handleConnection(ctx, host, con, &cursor); err != nil {
+			log.Warnf("connection to %q failed: %s", host, err)
+		}
+	}
+}
+
+func (s *Splitter) handleConnection(ctx context.Context, host string, con *websocket.Conn, lastCursor *int64) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error {
+		seq := events.SequenceForEvent(evt)
+		if seq < 0 {
+			// ignore info events and other unsupported types
+			return nil
+		}
+
+		if err := s.events.AddEvent(ctx, evt); err != nil {
+			return err
+		}
+
+		if seq%5000 == 0 {
+			// TODO: don't need this after we move to getting seq from pebble
+			if err := s.writeCursor(seq); err != nil {
+				log.Errorf("write cursor failed: %s", err)
+			}
+		}
+
+		*lastCursor = seq
+		return nil
+	})
+
+	return events.HandleRepoStream(ctx, con, sched)
+}
+
+func (s *Splitter) getLastCursor() (int64, error) {
+	if s.pp != nil {
+		seq, millis, _, err := s.pp.GetLast(context.Background())
+		if err == nil {
+			log.Debugw("got last cursor from pebble", "seq", seq, "millis", millis)
+			return seq, nil
+		} else if errors.Is(err, events.ErrNoLast) {
+			log.Info("no last cursor found in pebble")
+		} else {
+			log.Errorw("getting last cursor from pebble failed", "err", err)
+		}
+	}
+
+	fi, err := os.Open(s.conf.CursorFile)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return -1, nil
+		}
+		return -1, err
+	}
+	defer fi.Close()
+
+	b, err := io.ReadAll(fi)
+	if err != nil {
+		return -1, err
+	}
+
+	v, err := strconv.ParseInt(string(b), 10, 64)
+	if err != nil {
+		return -1, err
+	}
+
+	return v, nil
+}
+
+func (s *Splitter) writeCursor(curs int64) error {
+	return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664)
+}
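
A minimal sketch of how the splitter package added above can be wired up. The listen addresses mirror the CLI flag defaults; the db path is hypothetical and error handling is deliberately simple:

	package main

	import (
		"log"

		"github.com/bluesky-social/indigo/splitter"
	)

	func main() {
		// Buffer roughly one week of events on disk; maxBytes=0 disables
		// size-based trimming.
		s, err := splitter.NewDiskSplitter("bsky.network", "/data/rainbow/events", 24*7, 0)
		if err != nil {
			log.Fatal(err)
		}

		// Expose prometheus metrics on a side listener.
		go func() { log.Fatal(s.StartMetrics(":2481")) }()

		// Start resumes the upstream subscription from the persisted cursor
		// and serves com.atproto.sync.subscribeRepos to downstream consumers.
		log.Fatal(s.Start(":2480"))
	}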