diff --git a/cmd/splitter/main.go b/cmd/splitter/main.go
new file mode 100644
index 000000000..a6d92e17e
--- /dev/null
+++ b/cmd/splitter/main.go
@@ -0,0 +1,147 @@
+package main
+
+import (
+	"context"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/bluesky-social/indigo/splitter"
+	"github.com/bluesky-social/indigo/util/version"
+	_ "go.uber.org/automaxprocs"
+
+	_ "net/http/pprof"
+
+	_ "github.com/joho/godotenv/autoload"
+
+	logging "github.com/ipfs/go-log"
+	"github.com/urfave/cli/v2"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
+	"go.opentelemetry.io/otel/sdk/resource"
+	tracesdk "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
+)
+
+var log = logging.Logger("splitter")
+
+func init() {
+	// control log level using, eg, GOLOG_LOG_LEVEL=debug
+	//logging.SetAllLoggers(logging.LevelDebug)
+}
+
+func main() {
+	run(os.Args)
+}
+
+func run(args []string) {
+	app := cli.App{
+		Name:    "splitter",
+		Usage:   "firehose proxy",
+		Version: version.Version,
+	}
+
+	app.Flags = []cli.Flag{
+		&cli.BoolFlag{
+			Name:  "crawl-insecure-ws",
+			Usage: "when connecting to PDS instances, use ws:// instead of wss://",
+		},
+		&cli.StringFlag{
+			Name:    "bgs-host",
+			Usage:   "host (and port) of the upstream BGS to proxy the firehose from",
+			EnvVars: []string{"SPLITTER_BGS_HOST"},
+		},
+		&cli.StringFlag{
+			Name:  "api-listen",
+			Value: ":2480",
+		},
+		&cli.StringFlag{
+			Name:    "metrics-listen",
+			Value:   ":2481",
+			EnvVars: []string{"SPLITTER_METRICS_LISTEN"},
+		},
+	}
+
+	app.Action = Splitter
+	err := app.Run(args)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+func Splitter(cctx *cli.Context) error {
+	// Trap SIGINT to trigger a shutdown.
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
+
+	// Enable OTLP HTTP exporter
+	// For relevant environment variables:
+	// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
+	// At a minimum, you need to set
+	// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
+	if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
+		log.Infow("setting up trace exporter", "endpoint", ep)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		exp, err := otlptracehttp.New(ctx)
+		if err != nil {
+			log.Fatalw("failed to create trace exporter", "error", err)
+		}
+		defer func() {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+			defer cancel()
+			if err := exp.Shutdown(ctx); err != nil {
+				log.Errorw("failed to shutdown trace exporter", "error", err)
+			}
+		}()
+
+		tp := tracesdk.NewTracerProvider(
+			tracesdk.WithBatcher(exp),
+			tracesdk.WithResource(resource.NewWithAttributes(
+				semconv.SchemaURL,
+				semconv.ServiceNameKey.String("splitter"),
+				attribute.String("env", os.Getenv("ENVIRONMENT")),         // DataDog
+				attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
+				attribute.Int64("ID", 1),
+			)),
+		)
+		otel.SetTracerProvider(tp)
+	}
+
+	spl := splitter.NewSplitter(cctx.String("bgs-host"))
+
+	// set up metrics endpoint
+	go func() {
+		if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil {
+			log.Fatalf("failed to start metrics endpoint: %s", err)
+		}
+	}()
+
+	runErr := make(chan error, 1)
+
+	go func() {
+		err := spl.Start(cctx.String("api-listen"))
+		runErr <- err
+	}()
+
+	log.Infow("startup complete")
+	select {
+	case <-signals:
+		log.Info("received shutdown signal")
+		errs := spl.Shutdown()
+		for err := range errs {
+			log.Errorw("error during Splitter shutdown", "err", err)
+		}
+	case err := <-runErr:
+		if err != nil {
+			log.Errorw("error during Splitter startup", "err", err)
+		}
+		log.Info("shutting down")
+		errs := spl.Shutdown()
+		for err := range errs {
+			log.Errorw("error during Splitter shutdown", "err", err)
+		}
+	}
+
+	log.Info("shutdown complete")
+
+	return nil
+}
diff --git a/splitter/splitter.go b/splitter/splitter.go
new file mode 100644
index 000000000..efe573a09
--- /dev/null
+++ b/splitter/splitter.go
@@ -0,0 +1,218 @@
+package splitter
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	events "github.com/bluesky-social/indigo/events"
+	"github.com/bluesky-social/indigo/events/schedulers/sequential"
+	"github.com/bluesky-social/indigo/models"
+	"github.com/gorilla/websocket"
+	logging "github.com/ipfs/go-log"
+)
+
+var log = logging.Logger("splitter")
+
+type Splitter struct {
+	Host   string
+	erb    *EventRingBuffer
+	events *events.EventManager
+}
+
+func NewSplitter(host string) *Splitter {
+	erb := NewEventRingBuffer(20000, 1000)
+
+	em := events.NewEventManager(erb)
+	return &Splitter{
+		Host:   host,
+		erb:    erb,
+		events: em,
+	}
+}
+
+func (s *Splitter) Start(addr string) error {
+	return nil
+}
+
+func sleepForBackoff(b int) time.Duration {
+	if b == 0 {
+		return 0
+	}
+
+	if b < 50 {
+		return time.Millisecond * time.Duration(rand.Intn(100)+(5*b))
+	}
+
+	return time.Second * 5
+}
+
+func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, cursor int64) {
+	d := websocket.Dialer{}
+
+	protocol := "wss"
+
+	var backoff int
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor)
+		con, res, err := d.DialContext(ctx, url, nil)
+		if err != nil {
+			log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff)
+			time.Sleep(sleepForBackoff(backoff))
+			backoff++
+
+			continue
+		}
+
+		log.Info("event subscription response code: ", res.StatusCode)
+
+		if err := s.handleConnection(ctx, host, con, &cursor); err != nil {
+			log.Warnf("connection to %q failed: %s", host, err)
+		}
+	}
+}
+
+func (s *Splitter) handleConnection(ctx context.Context, host string, con *websocket.Conn, lastCursor *int64) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	sched := sequential.NewScheduler("splitter", s.events.AddEvent)
+	return events.HandleRepoStream(ctx, con, sched)
+}
+
+func sequenceForEvent(evt *events.XRPCStreamEvent) int64 {
+	switch {
+	case evt.RepoCommit != nil:
+		return evt.RepoCommit.Seq
+	case evt.RepoHandle != nil:
+		return evt.RepoHandle.Seq
+	case evt.RepoMigrate != nil:
+		return evt.RepoMigrate.Seq
+	case evt.RepoTombstone != nil:
+		return evt.RepoTombstone.Seq
+	case evt.RepoInfo != nil:
+		return -1
+	default:
+		return -1
+	}
+}
+
+func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer {
+	return &EventRingBuffer{
+		chunkSize:     chunkSize,
+		maxChunkCount: nchunks,
+	}
+}
+
+type EventRingBuffer struct {
+	lk            sync.Mutex
+	chunks        []*ringChunk
+	chunkSize     int
+	maxChunkCount int
+
+	broadcast func(*events.XRPCStreamEvent)
+}
+
+type ringChunk struct {
+	lk  sync.Mutex
+	buf []*events.XRPCStreamEvent
+}
+
+func (rc *ringChunk) append(evt *events.XRPCStreamEvent) {
+	rc.lk.Lock()
+	defer rc.lk.Unlock()
+	rc.buf = append(rc.buf, evt)
+}
+
+func (rc *ringChunk) events() []*events.XRPCStreamEvent {
+	rc.lk.Lock()
+	defer rc.lk.Unlock()
+	return rc.buf
+}
+
+func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error {
+	er.lk.Lock()
+	defer er.lk.Unlock()
+
+	if len(er.chunks) == 0 {
+		er.chunks = []*ringChunk{new(ringChunk)}
+	}
+
+	last := er.chunks[len(er.chunks)-1]
+	if len(last.buf) >= er.chunkSize {
+		last = new(ringChunk)
+		er.chunks = append(er.chunks, last)
+		if len(er.chunks) > er.maxChunkCount {
+			er.chunks = er.chunks[1:]
+		}
+	}
+
+	last.append(evt)
+
+	er.broadcast(evt)
+	return nil
+}
+
+func (er *EventRingBuffer) Flush(context.Context) error {
+	return nil
+}
+
+func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
+	// grab a snapshot of the current chunks
+	er.lk.Lock()
+	chunks := er.chunks
+	er.lk.Unlock()
+
+	i := len(chunks) - 1
+	for ; i >= 0; i-- {
+		c := chunks[i]
+		evts := c.events()
+		if since > sequenceForEvent(evts[len(evts)-1]) {
+			i++
+			break
+		}
+	}
+	if i < 0 {
+		// 'since' predates everything still buffered (or the buffer is empty);
+		// play back from the oldest chunk.
+		i = 0
+	}
+
+	for _, c := range chunks[i:] {
+		var nread int
+		evts := c.events()
+		for nread < len(evts) {
+			for _, e := range evts[nread:] {
+				nread++
+				if since > 0 && sequenceForEvent(e) < since {
+					continue
+				}
+				since = 0
+
+				if err := cb(e); err != nil {
+					return err
+				}
+			}
+
+			// recheck evts buffer to see if more were added while we were here
+			evts = c.events()
+		}
+	}
+
+	// TODO: probably also check for if new chunks were added while we were iterating...
+	return nil
+}
+
+func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
+	er.broadcast = brc
+}
+
+func (er *EventRingBuffer) Shutdown(context.Context) error {
+	return nil
+}
+
+func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error {
+	return nil
+}
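
A minimal, standalone sketch of how the ring buffer is driven, assuming only the exported API in the hunk above plus the existing `events.XRPCStreamEvent` and `api/atproto` `SyncSubscribeRepos_Commit` types; the no-op broadcaster stands in for the hook that `events.NewEventManager` would normally install via `SetEventBroadcaster`, and the tiny chunk sizes are toy values chosen so `Playback`'s chunk-selection path is exercised:

```go
package main

import (
	"context"
	"fmt"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/splitter"
)

func main() {
	// 4 events per chunk, at most 3 chunks retained (toy values).
	erb := splitter.NewEventRingBuffer(4, 3)

	// Normally the EventManager installs its broadcast hook; a no-op stands in here.
	erb.SetEventBroadcaster(func(evt *events.XRPCStreamEvent) {})

	ctx := context.Background()
	for seq := int64(1); seq <= 10; seq++ {
		evt := &events.XRPCStreamEvent{
			RepoCommit: &comatproto.SyncSubscribeRepos_Commit{Seq: seq},
		}
		if err := erb.Persist(ctx, evt); err != nil {
			panic(err)
		}
	}

	// Replay everything still buffered with a sequence number >= 6.
	err := erb.Playback(ctx, 6, func(evt *events.XRPCStreamEvent) error {
		fmt.Println("replayed seq", evt.RepoCommit.Seq)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```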
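
`cmd/splitter/main.go` also calls `spl.StartMetrics`, which does not appear in the `splitter/splitter.go` hunk above. A plausible minimal version, assuming the usual `promhttp` handler pattern rather than anything confirmed by this diff, could look like:

```go
package splitter

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// StartMetrics is a sketch (not taken from this diff): it serves the default
// prometheus registry at /metrics on the given listen address and blocks
// until the HTTP server exits.
func (s *Splitter) StartMetrics(listen string) error {
	http.Handle("/metrics", promhttp.Handler())
	return http.ListenAndServe(listen, nil)
}
```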