From b885639980e3ff066421b901edfe9533de0cb1d8 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 10 Oct 2023 11:37:24 -0700 Subject: [PATCH 1/7] WIP: creating bgs splitter daemon --- cmd/splitter/main.go | 147 +++++++++++++++++++++++++++++ splitter/splitter.go | 218 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 365 insertions(+) create mode 100644 cmd/splitter/main.go create mode 100644 splitter/splitter.go diff --git a/cmd/splitter/main.go b/cmd/splitter/main.go new file mode 100644 index 000000000..a6d92e17e --- /dev/null +++ b/cmd/splitter/main.go @@ -0,0 +1,147 @@ +package main + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/bluesky-social/indigo/bgs" + "github.com/bluesky-social/indigo/util/version" + _ "go.uber.org/automaxprocs" + + _ "net/http/pprof" + + _ "github.com/joho/godotenv/autoload" + + logging "github.com/ipfs/go-log" + "github.com/urfave/cli/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +var log = logging.Logger("splitter") + +func init() { + // control log level using, eg, GOLOG_LOG_LEVEL=debug + //logging.SetAllLoggers(logging.LevelDebug) +} + +func main() { + run(os.Args) +} + +func run(args []string) { + app := cli.App{ + Name: "splitter", + Usage: "firehose proxy", + Version: version.Version, + } + + app.Flags = []cli.Flag{ + &cli.BoolFlag{ + Name: "crawl-insecure-ws", + Usage: "when connecting to PDS instances, use ws:// instead of wss://", + }, + &cli.StringFlag{ + Name: "api-listen", + Value: ":2480", + }, + &cli.StringFlag{ + Name: "metrics-listen", + Value: ":2481", + EnvVars: []string{"SPLITTER_METRICS_LISTEN"}, + }, + } + + app.Action = Splitter + err := app.Run(os.Args) + if err != nil { + log.Fatal(err) + } +} + +func Splitter(cctx *cli.Context) error { + // Trap SIGINT to trigger a shutdown. + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + + // Enable OTLP HTTP exporter + // For relevant environment variables: + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables + // At a minimum, you need to set + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { + log.Infow("setting up trace exporter", "endpoint", ep) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exp, err := otlptracehttp.New(ctx) + if err != nil { + log.Fatalw("failed to create trace exporter", "error", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + log.Errorw("failed to shutdown trace exporter", "error", err) + } + }() + + tp := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exp), + tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("splitter"), + attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog + attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others + attribute.Int64("ID", 1), + )), + ) + otel.SetTracerProvider(tp) + } + + spl := splitter.New(cctx.String("bgs-host")) + + // set up metrics endpoint + go func() { + if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil { + log.Fatalf("failed to start metrics endpoint: %s", err) + } + }() + + runErr := make(chan error, 1) + + go func() { + err := spl.Start(cctx.String("api-listen")) + runErr <- err + }() + + log.Infow("startup complete") + select { + case <-signals: + log.Info("received shutdown signal") + errs := spl.Shutdown() + for err := range errs { + log.Errorw("error during Splitter shutdown", "err", err) + } + case err := <-runErr: + if err != nil { + log.Errorw("error during Splitter startup", "err", err) + } + log.Info("shutting down") + errs := bgs.Shutdown() + for err := range errs { + log.Errorw("error during Splitter shutdown", "err", err) + } + } + + log.Info("shutdown complete") + + return nil +} diff --git a/splitter/splitter.go b/splitter/splitter.go new file mode 100644 index 000000000..efe573a09 --- /dev/null +++ b/splitter/splitter.go @@ -0,0 +1,218 @@ +package splitter + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + events "github.com/bluesky-social/indigo/events" + "github.com/bluesky-social/indigo/events/schedulers/sequential" + "github.com/bluesky-social/indigo/models" + "github.com/gorilla/websocket" + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("splitter") + +type Splitter struct { + Host string + erb *EventRingBuffer + events *events.EventManager +} + +func NewSplitter(host string, persister events.EventPersistence) *Splitter { + erb := NewEventRingBuffer(20000, 1000) + + em := events.NewEventManager(erb) + return &Splitter{ + Host: host, + erb: erb, + events: em, + } +} + +func (s *Splitter) Start() error { + return nil +} + +func sleepForBackoff(b int) time.Duration { + if b == 0 { + return 0 + } + + if b < 50 { + return time.Millisecond * time.Duration(rand.Intn(100)+(5*b)) + } + + return time.Second * 5 +} + +func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, cursor int64) { + d := websocket.Dialer{} + + protocol := "wss" + + var backoff int + for { + select { + case <-ctx.Done(): + return + default: + } + + url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor) + con, res, err := d.DialContext(ctx, url, nil) + if err != nil { + log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff) + time.Sleep(sleepForBackoff(backoff)) + backoff++ + + continue + } + + log.Info("event subscription response code: ", res.StatusCode) + + if err := s.handleConnection(ctx, host, con, &cursor); err != nil { + log.Warnf("connection to %q failed: %s", host, err) + } + } +} + +func (s *Splitter) handleConnection(ctx context.Context, host string, con *websocket.Conn, lastCursor *int64) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sched := sequential.NewScheduler("splitter", s.events.AddEvent) + return events.HandleRepoStream(ctx, con, sched) +} + +func sequenceForEvent(evt *events.XRPCStreamEvent) int64 { + switch { + case evt.RepoCommit != nil: + return evt.RepoCommit.Seq + case evt.RepoHandle != nil: + return evt.RepoHandle.Seq + case evt.RepoMigrate != nil: + return evt.RepoMigrate.Seq + case evt.RepoTombstone != nil: + return evt.RepoTombstone.Seq + case evt.RepoInfo != nil: + return -1 + default: + return -1 + } +} + +func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { + return &EventRingBuffer{ + chunkSize: chunkSize, + maxChunkCount: nchunks, + } +} + +type EventRingBuffer struct { + lk sync.Mutex + chunks []*ringChunk + chunkSize int + maxChunkCount int + + broadcast func(*events.XRPCStreamEvent) +} + +type ringChunk struct { + lk sync.Mutex + buf []*events.XRPCStreamEvent +} + +func (rc *ringChunk) append(evt *events.XRPCStreamEvent) { + rc.lk.Lock() + defer rc.lk.Unlock() + rc.buf = append(rc.buf, evt) +} + +func (rc *ringChunk) events() []*events.XRPCStreamEvent { + rc.lk.Lock() + defer rc.lk.Unlock() + return rc.buf +} + +func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { + er.lk.Lock() + defer er.lk.Unlock() + + if len(er.chunks) == 0 { + er.chunks = []*ringChunk{new(ringChunk)} + } + + last := er.chunks[len(er.chunks)-1] + if len(last.buf) >= er.chunkSize { + last = new(ringChunk) + er.chunks = append(er.chunks, last) + if len(er.chunks) > er.maxChunkCount { + er.chunks = er.chunks[1:] + } + } + + last.append(evt) + + er.broadcast(evt) + return nil +} + +func (er *EventRingBuffer) Flush(context.Context) error { + return nil +} + +func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { + // grab a snapshot of the current chunks + er.lk.Lock() + chunks := er.chunks + er.lk.Unlock() + + i := len(chunks) - 1 + for ; i >= 0; i-- { + c := chunks[i] + evts := c.events() + if since > sequenceForEvent(evts[len(evts)-1]) { + i++ + break + } + } + + for _, c := range chunks[i:] { + var nread int + evts := c.events() + for nread < len(evts) { + for _, e := range evts[nread:] { + if since > 0 && sequenceForEvent(e) < since { + continue + } + since = 0 + + if err := cb(e); err != nil { + return err + } + } + + // recheck evts buffer to see if more were added while we were here + evts = c.events() + } + } + + // TODO: probably also check for if new chunks were added while we were iterating... + return nil +} + +func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { + er.broadcast = brc +} + +func (er *EventRingBuffer) Shutdown(context.Context) error { + return nil +} + +func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { + return nil +} From 8fcce340522e796e3359a5267ce6970359b47215 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 11 Oct 2023 10:33:33 -0700 Subject: [PATCH 2/7] splitter is now functional --- cmd/splitter/main.go | 147 -------------------- splitter/splitter.go | 314 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 309 insertions(+), 152 deletions(-) delete mode 100644 cmd/splitter/main.go diff --git a/cmd/splitter/main.go b/cmd/splitter/main.go deleted file mode 100644 index a6d92e17e..000000000 --- a/cmd/splitter/main.go +++ /dev/null @@ -1,147 +0,0 @@ -package main - -import ( - "context" - "os" - "os/signal" - "syscall" - "time" - - "github.com/bluesky-social/indigo/bgs" - "github.com/bluesky-social/indigo/util/version" - _ "go.uber.org/automaxprocs" - - _ "net/http/pprof" - - _ "github.com/joho/godotenv/autoload" - - logging "github.com/ipfs/go-log" - "github.com/urfave/cli/v2" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/sdk/resource" - tracesdk "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" -) - -var log = logging.Logger("splitter") - -func init() { - // control log level using, eg, GOLOG_LOG_LEVEL=debug - //logging.SetAllLoggers(logging.LevelDebug) -} - -func main() { - run(os.Args) -} - -func run(args []string) { - app := cli.App{ - Name: "splitter", - Usage: "firehose proxy", - Version: version.Version, - } - - app.Flags = []cli.Flag{ - &cli.BoolFlag{ - Name: "crawl-insecure-ws", - Usage: "when connecting to PDS instances, use ws:// instead of wss://", - }, - &cli.StringFlag{ - Name: "api-listen", - Value: ":2480", - }, - &cli.StringFlag{ - Name: "metrics-listen", - Value: ":2481", - EnvVars: []string{"SPLITTER_METRICS_LISTEN"}, - }, - } - - app.Action = Splitter - err := app.Run(os.Args) - if err != nil { - log.Fatal(err) - } -} - -func Splitter(cctx *cli.Context) error { - // Trap SIGINT to trigger a shutdown. - signals := make(chan os.Signal, 1) - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - - // Enable OTLP HTTP exporter - // For relevant environment variables: - // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables - // At a minimum, you need to set - // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 - if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { - log.Infow("setting up trace exporter", "endpoint", ep) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - exp, err := otlptracehttp.New(ctx) - if err != nil { - log.Fatalw("failed to create trace exporter", "error", err) - } - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - if err := exp.Shutdown(ctx); err != nil { - log.Errorw("failed to shutdown trace exporter", "error", err) - } - }() - - tp := tracesdk.NewTracerProvider( - tracesdk.WithBatcher(exp), - tracesdk.WithResource(resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String("splitter"), - attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog - attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others - attribute.Int64("ID", 1), - )), - ) - otel.SetTracerProvider(tp) - } - - spl := splitter.New(cctx.String("bgs-host")) - - // set up metrics endpoint - go func() { - if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil { - log.Fatalf("failed to start metrics endpoint: %s", err) - } - }() - - runErr := make(chan error, 1) - - go func() { - err := spl.Start(cctx.String("api-listen")) - runErr <- err - }() - - log.Infow("startup complete") - select { - case <-signals: - log.Info("received shutdown signal") - errs := spl.Shutdown() - for err := range errs { - log.Errorw("error during Splitter shutdown", "err", err) - } - case err := <-runErr: - if err != nil { - log.Errorw("error during Splitter startup", "err", err) - } - log.Info("shutting down") - errs := bgs.Shutdown() - for err := range errs { - log.Errorw("error during Splitter shutdown", "err", err) - } - } - - log.Info("shutdown complete") - - return nil -} diff --git a/splitter/splitter.go b/splitter/splitter.go index efe573a09..059a82fcd 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -4,14 +4,25 @@ import ( "context" "fmt" "math/rand" + "net" + "net/http" + "strconv" + "strings" "sync" "time" + "github.com/bluesky-social/indigo/bgs" events "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/sequential" + lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/models" "github.com/gorilla/websocket" logging "github.com/ipfs/go-log" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + promclient "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + dto "github.com/prometheus/client_model/go" ) var log = logging.Logger("splitter") @@ -20,23 +31,315 @@ type Splitter struct { Host string erb *EventRingBuffer events *events.EventManager + + // Management of Socket Consumers + consumersLk sync.RWMutex + nextConsumerID uint64 + consumers map[uint64]*SocketConsumer } -func NewSplitter(host string, persister events.EventPersistence) *Splitter { +func NewSplitter(host string) *Splitter { erb := NewEventRingBuffer(20000, 1000) em := events.NewEventManager(erb) return &Splitter{ - Host: host, - erb: erb, - events: em, + Host: host, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), + } +} + +func (s *Splitter) Start(addr string) error { + var lc net.ListenConfig + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + go s.subscribeWithRedialer(context.Background(), s.Host, 0) + + li, err := lc.Listen(ctx, "tcp", addr) + if err != nil { + return err } + return s.StartWithListener(li) +} + +func (s *Splitter) StartMetrics(listen string) error { + http.Handle("/metrics", promhttp.Handler()) + return http.ListenAndServe(listen, nil) } -func (s *Splitter) Start() error { +func (s *Splitter) Shutdown() error { return nil } +func (s *Splitter) StartWithListener(listen net.Listener) error { + e := echo.New() + e.HideBanner = true + + e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ + AllowOrigins: []string{"http://localhost:*", "https://bgs.bsky-sandbox.dev"}, + AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, + })) + + /* + if !s.ssl { + e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ + Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", + })) + } else { + e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig)) + } + */ + + e.Use(bgs.MetricsMiddleware) + + e.HTTPErrorHandler = func(err error, ctx echo.Context) { + switch err := err.(type) { + case *echo.HTTPError: + if err2 := ctx.JSON(err.Code, map[string]any{ + "error": err.Message, + }); err2 != nil { + log.Errorf("Failed to write http error: %s", err2) + } + default: + sendHeader := true + if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" { + sendHeader = false + } + + log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err) + + if strings.HasPrefix(ctx.Path(), "/admin/") { + ctx.JSON(500, map[string]any{ + "error": err.Error(), + }) + return + } + + if sendHeader { + ctx.Response().WriteHeader(500) + } + } + } + + // TODO: this API is temporary until we formalize what we want here + + e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) + e.GET("/xrpc/_health", s.HandleHealthCheck) + + // In order to support booting on random ports in tests, we need to tell the + // Echo instance it's already got a port, and then use its StartServer + // method to re-use that listener. + e.Listener = listen + srv := &http.Server{} + return e.StartServer(srv) +} + +type HealthStatus struct { + Status string `json:"status"` + Message string `json:"msg,omitempty"` +} + +func (s *Splitter) HandleHealthCheck(c echo.Context) error { + return c.JSON(200, HealthStatus{Status: "ok"}) +} + +func (s *Splitter) EventsHandler(c echo.Context) error { + var since *int64 + if sinceVal := c.QueryParam("cursor"); sinceVal != "" { + sval, err := strconv.ParseInt(sinceVal, 10, 64) + if err != nil { + return err + } + since = &sval + } + + ctx, cancel := context.WithCancel(c.Request().Context()) + defer cancel() + + // TODO: authhhh + conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10) + if err != nil { + return fmt.Errorf("upgrading websocket: %w", err) + } + + lastWriteLk := sync.Mutex{} + lastWrite := time.Now() + + // Start a goroutine to ping the client every 30 seconds to check if it's + // still alive. If the client doesn't respond to a ping within 5 seconds, + // we'll close the connection and teardown the consumer. + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + lastWriteLk.Lock() + lw := lastWrite + lastWriteLk.Unlock() + + if time.Since(lw) < 30*time.Second { + continue + } + + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil { + log.Errorf("failed to ping client: %s", err) + cancel() + return + } + case <-ctx.Done(): + return + } + } + }() + + conn.SetPingHandler(func(message string) error { + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) + if err == websocket.ErrCloseSent { + return nil + } else if e, ok := err.(net.Error); ok && e.Temporary() { + return nil + } + return err + }) + + // Start a goroutine to read messages from the client and discard them. + go func() { + for { + _, _, err := conn.ReadMessage() + if err != nil { + log.Errorf("failed to read message from client: %s", err) + cancel() + return + } + } + }() + + ident := c.RealIP() + "-" + c.Request().UserAgent() + + evts, cleanup, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since) + if err != nil { + return err + } + defer cleanup() + + // Keep track of the consumer for metrics and admin endpoints + consumer := SocketConsumer{ + RemoteAddr: c.RealIP(), + UserAgent: c.Request().UserAgent(), + ConnectedAt: time.Now(), + } + sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent) + consumer.EventsSent = sentCounter + + consumerID := s.registerConsumer(&consumer) + defer s.cleanupConsumer(consumerID) + + log.Infow("new consumer", + "remote_addr", consumer.RemoteAddr, + "user_agent", consumer.UserAgent, + "cursor", since, + "consumer_id", consumerID, + ) + + header := events.EventHeader{Op: events.EvtKindMessage} + for { + select { + case evt := <-evts: + wc, err := conn.NextWriter(websocket.BinaryMessage) + if err != nil { + log.Errorf("failed to get next writer: %s", err) + return err + } + + var obj lexutil.CBOR + + switch { + case evt.Error != nil: + header.Op = events.EvtKindErrorFrame + obj = evt.Error + case evt.RepoCommit != nil: + header.MsgType = "#commit" + obj = evt.RepoCommit + case evt.RepoHandle != nil: + header.MsgType = "#handle" + obj = evt.RepoHandle + case evt.RepoInfo != nil: + header.MsgType = "#info" + obj = evt.RepoInfo + case evt.RepoMigrate != nil: + header.MsgType = "#migrate" + obj = evt.RepoMigrate + case evt.RepoTombstone != nil: + header.MsgType = "#tombstone" + obj = evt.RepoTombstone + default: + return fmt.Errorf("unrecognized event kind") + } + + if err := header.MarshalCBOR(wc); err != nil { + return fmt.Errorf("failed to write header: %w", err) + } + + if err := obj.MarshalCBOR(wc); err != nil { + return fmt.Errorf("failed to write event: %w", err) + } + + if err := wc.Close(); err != nil { + return fmt.Errorf("failed to flush-close our event write: %w", err) + } + lastWriteLk.Lock() + lastWrite = time.Now() + lastWriteLk.Unlock() + sentCounter.Inc() + case <-ctx.Done(): + return nil + } + } +} + +type SocketConsumer struct { + UserAgent string + RemoteAddr string + ConnectedAt time.Time + EventsSent promclient.Counter +} + +func (s *Splitter) registerConsumer(c *SocketConsumer) uint64 { + s.consumersLk.Lock() + defer s.consumersLk.Unlock() + + id := s.nextConsumerID + s.nextConsumerID++ + + s.consumers[id] = c + + return id +} + +func (s *Splitter) cleanupConsumer(id uint64) { + s.consumersLk.Lock() + defer s.consumersLk.Unlock() + + c := s.consumers[id] + + var m = &dto.Metric{} + if err := c.EventsSent.Write(m); err != nil { + log.Errorf("failed to get sent counter: %s", err) + } + + log.Infow("consumer disconnected", + "consumer_id", id, + "remote_addr", c.RemoteAddr, + "user_agent", c.UserAgent, + "events_sent", m.Counter.GetValue()) + + delete(s.consumers, id) +} + func sleepForBackoff(b int) time.Duration { if b == 0 { return 0 @@ -139,6 +442,7 @@ func (rc *ringChunk) events() []*events.XRPCStreamEvent { } func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { + fmt.Println("persist event", sequenceForEvent(evt)) er.lk.Lock() defer er.lk.Unlock() From a1875ea37d5e159fc0467cf9a01fd28a9ab09e13 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 12 Oct 2023 11:59:49 -0700 Subject: [PATCH 3/7] fix edge cases around playback-to-live transition --- cmd/rainbow/main.go | 149 ++++++++++++++++++++++++++++++++++++++ splitter/metrics.go | 11 +++ splitter/ringbuf.go | 144 +++++++++++++++++++++++++++++++++++++ splitter/splitter.go | 168 +++++++++++++++---------------------------- 4 files changed, 361 insertions(+), 111 deletions(-) create mode 100644 cmd/rainbow/main.go create mode 100644 splitter/metrics.go create mode 100644 splitter/ringbuf.go diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go new file mode 100644 index 000000000..de3c194a9 --- /dev/null +++ b/cmd/rainbow/main.go @@ -0,0 +1,149 @@ +package main + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/bluesky-social/indigo/splitter" + "github.com/bluesky-social/indigo/util/version" + _ "go.uber.org/automaxprocs" + + _ "net/http/pprof" + + _ "github.com/joho/godotenv/autoload" + + logging "github.com/ipfs/go-log" + "github.com/urfave/cli/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +var log = logging.Logger("splitter") + +func init() { + // control log level using, eg, GOLOG_LOG_LEVEL=debug + logging.SetAllLoggers(logging.LevelDebug) +} + +func main() { + run(os.Args) +} + +func run(args []string) { + app := cli.App{ + Name: "splitter", + Usage: "firehose proxy", + Version: version.Version, + } + + app.Flags = []cli.Flag{ + &cli.BoolFlag{ + Name: "crawl-insecure-ws", + Usage: "when connecting to PDS instances, use ws:// instead of wss://", + }, + &cli.StringFlag{ + Name: "splitter-host", + Value: "bsky.network", + }, + &cli.StringFlag{ + Name: "api-listen", + Value: ":2480", + }, + &cli.StringFlag{ + Name: "metrics-listen", + Value: ":2481", + EnvVars: []string{"SPLITTER_METRICS_LISTEN"}, + }, + } + + app.Action = Splitter + err := app.Run(os.Args) + if err != nil { + log.Fatal(err) + } +} + +func Splitter(cctx *cli.Context) error { + // Trap SIGINT to trigger a shutdown. + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + + // Enable OTLP HTTP exporter + // For relevant environment variables: + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables + // At a minimum, you need to set + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { + log.Infow("setting up trace exporter", "endpoint", ep) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exp, err := otlptracehttp.New(ctx) + if err != nil { + log.Fatalw("failed to create trace exporter", "error", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + log.Errorw("failed to shutdown trace exporter", "error", err) + } + }() + + tp := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exp), + tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("splitter"), + attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog + attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others + attribute.Int64("ID", 1), + )), + ) + otel.SetTracerProvider(tp) + } + + spl := splitter.NewSplitter(cctx.String("splitter-host")) + + // set up metrics endpoint + go func() { + if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil { + log.Fatalf("failed to start metrics endpoint: %s", err) + } + }() + + runErr := make(chan error, 1) + + go func() { + err := spl.Start(cctx.String("api-listen")) + runErr <- err + }() + + log.Infow("startup complete") + select { + case <-signals: + log.Info("received shutdown signal") + if err := spl.Shutdown(); err != nil { + log.Errorw("error during Splitter shutdown", "err", err) + } + case err := <-runErr: + if err != nil { + log.Errorw("error during Splitter startup", "err", err) + } + log.Info("shutting down") + if err := spl.Shutdown(); err != nil { + log.Errorw("error during Splitter shutdown", "err", err) + } + } + + log.Info("shutdown complete") + + return nil +} diff --git a/splitter/metrics.go b/splitter/metrics.go new file mode 100644 index 000000000..a2173a639 --- /dev/null +++ b/splitter/metrics.go @@ -0,0 +1,11 @@ +package splitter + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "spl_events_sent_counter", + Help: "The total number of events sent to consumers", +}, []string{"remote_addr", "user_agent"}) diff --git a/splitter/ringbuf.go b/splitter/ringbuf.go new file mode 100644 index 000000000..9168cdf04 --- /dev/null +++ b/splitter/ringbuf.go @@ -0,0 +1,144 @@ +package splitter + +import ( + "context" + "sync" + + events "github.com/bluesky-social/indigo/events" + "github.com/bluesky-social/indigo/models" +) + +func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { + return &EventRingBuffer{ + chunkSize: chunkSize, + maxChunkCount: nchunks, + } +} + +type EventRingBuffer struct { + lk sync.Mutex + chunks []*ringChunk + chunkSize int + maxChunkCount int + + broadcast func(*events.XRPCStreamEvent) +} + +type ringChunk struct { + lk sync.Mutex + buf []*events.XRPCStreamEvent +} + +func (rc *ringChunk) append(evt *events.XRPCStreamEvent) { + rc.lk.Lock() + defer rc.lk.Unlock() + rc.buf = append(rc.buf, evt) +} + +func (rc *ringChunk) events() []*events.XRPCStreamEvent { + rc.lk.Lock() + defer rc.lk.Unlock() + return rc.buf +} + +func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { + er.lk.Lock() + defer er.lk.Unlock() + + if len(er.chunks) == 0 { + er.chunks = []*ringChunk{new(ringChunk)} + } + + last := er.chunks[len(er.chunks)-1] + if len(last.buf) >= er.chunkSize { + last = new(ringChunk) + er.chunks = append(er.chunks, last) + if len(er.chunks) > er.maxChunkCount { + er.chunks = er.chunks[1:] + } + } + + last.append(evt) + + er.broadcast(evt) + return nil +} + +func (er *EventRingBuffer) Flush(context.Context) error { + return nil +} + +func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { + // run playback a few times to get as close to 'live' as possible before returning + for i := 0; i < 10; i++ { + n, err := er.playbackRound(ctx, since, cb) + if err != nil { + return err + } + + // playback had no new events + if n-since == 0 { + return nil + } + since = n + } + + return nil +} + +func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) { + // grab a snapshot of the current chunks + er.lk.Lock() + chunks := er.chunks + er.lk.Unlock() + + i := len(chunks) - 1 + for ; i >= 0; i-- { + c := chunks[i] + evts := c.events() + if since > sequenceForEvent(evts[len(evts)-1]) { + i++ + break + } + } + if i < 0 { + i = 0 + } + + var lastSeq int64 = since + for _, c := range chunks[i:] { + var nread int + evts := c.events() + for nread < len(evts) { + for _, e := range evts[nread:] { + nread++ + seq := sequenceForEvent(e) + if seq <= since { + continue + } + + if err := cb(e); err != nil { + return 0, err + } + lastSeq = seq + } + + // recheck evts buffer to see if more were added while we were here + evts = c.events() + } + } + + return lastSeq, nil +} + +func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { + er.broadcast = brc +} + +func (er *EventRingBuffer) Shutdown(context.Context) error { + return nil +} + +func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { + return nil +} diff --git a/splitter/splitter.go b/splitter/splitter.go index 059a82fcd..b8030c534 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -3,9 +3,11 @@ package splitter import ( "context" "fmt" + "io" "math/rand" "net" "net/http" + "os" "strconv" "strings" "sync" @@ -15,7 +17,6 @@ import ( events "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/sequential" lexutil "github.com/bluesky-social/indigo/lex/util" - "github.com/bluesky-social/indigo/models" "github.com/gorilla/websocket" logging "github.com/ipfs/go-log" "github.com/labstack/echo/v4" @@ -32,6 +33,9 @@ type Splitter struct { erb *EventRingBuffer events *events.EventManager + // cursor storage + cursorFile string + // Management of Socket Consumers consumersLk sync.RWMutex nextConsumerID uint64 @@ -39,14 +43,15 @@ type Splitter struct { } func NewSplitter(host string) *Splitter { - erb := NewEventRingBuffer(20000, 1000) + erb := NewEventRingBuffer(20_000, 1000) em := events.NewEventManager(erb) return &Splitter{ - Host: host, - erb: erb, - events: em, - consumers: make(map[uint64]*SocketConsumer), + cursorFile: "cursor-file", + Host: host, + erb: erb, + events: em, + consumers: make(map[uint64]*SocketConsumer), } } @@ -55,7 +60,12 @@ func (s *Splitter) Start(addr string) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - go s.subscribeWithRedialer(context.Background(), s.Host, 0) + curs, err := s.getLastCursor() + if err != nil { + return fmt.Errorf("loading cursor failed: %w", err) + } + + go s.subscribeWithRedialer(context.Background(), s.Host, curs) li, err := lc.Listen(ctx, "tcp", addr) if err != nil { @@ -365,8 +375,12 @@ func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, curso default: } + header := http.Header{ + "User-Agent": []string{"bgs-rainbow-v0"}, + } + url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor) - con, res, err := d.DialContext(ctx, url, nil) + con, res, err := d.DialContext(ctx, url, header) if err != nil { log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff) time.Sleep(sleepForBackoff(backoff)) @@ -387,7 +401,26 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso ctx, cancel := context.WithCancel(ctx) defer cancel() - sched := sequential.NewScheduler("splitter", s.events.AddEvent) + sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error { + seq := sequenceForEvent(evt) + if seq < 0 { + // ignore info events and other unsupported types + return nil + } + + if err := s.events.AddEvent(ctx, evt); err != nil { + return err + } + + if seq%5000 == 0 { + if err := s.writeCursor(seq); err != nil { + log.Errorf("write cursor failed: %s", err) + } + } + + *lastCursor = seq + return nil + }) return events.HandleRepoStream(ctx, con, sched) } @@ -408,115 +441,28 @@ func sequenceForEvent(evt *events.XRPCStreamEvent) int64 { } } -func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { - return &EventRingBuffer{ - chunkSize: chunkSize, - maxChunkCount: nchunks, - } -} - -type EventRingBuffer struct { - lk sync.Mutex - chunks []*ringChunk - chunkSize int - maxChunkCount int - - broadcast func(*events.XRPCStreamEvent) -} - -type ringChunk struct { - lk sync.Mutex - buf []*events.XRPCStreamEvent -} - -func (rc *ringChunk) append(evt *events.XRPCStreamEvent) { - rc.lk.Lock() - defer rc.lk.Unlock() - rc.buf = append(rc.buf, evt) -} - -func (rc *ringChunk) events() []*events.XRPCStreamEvent { - rc.lk.Lock() - defer rc.lk.Unlock() - return rc.buf -} - -func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { - fmt.Println("persist event", sequenceForEvent(evt)) - er.lk.Lock() - defer er.lk.Unlock() - - if len(er.chunks) == 0 { - er.chunks = []*ringChunk{new(ringChunk)} - } - - last := er.chunks[len(er.chunks)-1] - if len(last.buf) >= er.chunkSize { - last = new(ringChunk) - er.chunks = append(er.chunks, last) - if len(er.chunks) > er.maxChunkCount { - er.chunks = er.chunks[1:] +func (s *Splitter) getLastCursor() (int64, error) { + fi, err := os.Open(s.cursorFile) + if err != nil { + if os.IsNotExist(err) { + return 0, nil } + return 0, err } - last.append(evt) - - er.broadcast(evt) - return nil -} - -func (er *EventRingBuffer) Flush(context.Context) error { - return nil -} - -func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { - // grab a snapshot of the current chunks - er.lk.Lock() - chunks := er.chunks - er.lk.Unlock() - - i := len(chunks) - 1 - for ; i >= 0; i-- { - c := chunks[i] - evts := c.events() - if since > sequenceForEvent(evts[len(evts)-1]) { - i++ - break - } + b, err := io.ReadAll(fi) + if err != nil { + return 0, err } - for _, c := range chunks[i:] { - var nread int - evts := c.events() - for nread < len(evts) { - for _, e := range evts[nread:] { - if since > 0 && sequenceForEvent(e) < since { - continue - } - since = 0 - - if err := cb(e); err != nil { - return err - } - } - - // recheck evts buffer to see if more were added while we were here - evts = c.events() - } + v, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return 0, err } - // TODO: probably also check for if new chunks were added while we were iterating... - return nil -} - -func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { - er.broadcast = brc + return v, nil } -func (er *EventRingBuffer) Shutdown(context.Context) error { - return nil -} - -func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { - return nil +func (s *Splitter) writeCursor(curs int64) error { + return os.WriteFile(s.cursorFile, []byte(fmt.Sprint(curs)), 0664) } From 07c8e94359abd140211ac798f0c270d2e00d107d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 13 Nov 2024 17:08:24 -0800 Subject: [PATCH 4/7] update version stuff --- cmd/rainbow/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go index de3c194a9..5d6ba14c6 100644 --- a/cmd/rainbow/main.go +++ b/cmd/rainbow/main.go @@ -8,7 +8,7 @@ import ( "time" "github.com/bluesky-social/indigo/splitter" - "github.com/bluesky-social/indigo/util/version" + "github.com/carlmjohnson/versioninfo" _ "go.uber.org/automaxprocs" _ "net/http/pprof" @@ -40,7 +40,7 @@ func run(args []string) { app := cli.App{ Name: "splitter", Usage: "firehose proxy", - Version: version.Version, + Version: versioninfo.Short(), } app.Flags = []cli.Flag{ From da9457bd0d8709b39252b0ea42ea6237c79c8816 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 13 Nov 2024 23:15:00 -0800 Subject: [PATCH 5/7] copy over some updates from the relay --- splitter/splitter.go | 49 ++++++++++++++------------------------------ 1 file changed, 15 insertions(+), 34 deletions(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index b8030c534..0f4a6cf8c 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -16,7 +16,6 @@ import ( "github.com/bluesky-social/indigo/bgs" events "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/sequential" - lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/gorilla/websocket" logging "github.com/ipfs/go-log" "github.com/labstack/echo/v4" @@ -43,7 +42,7 @@ type Splitter struct { } func NewSplitter(host string) *Splitter { - erb := NewEventRingBuffer(20_000, 1000) + erb := NewEventRingBuffer(20_000, 10_000) em := events.NewEventManager(erb) return &Splitter{ @@ -255,52 +254,34 @@ func (s *Splitter) EventsHandler(c echo.Context) error { "consumer_id", consumerID, ) - header := events.EventHeader{Op: events.EvtKindMessage} for { select { - case evt := <-evts: + case evt, ok := <-evts: + if !ok { + log.Error("event stream closed unexpectedly") + return nil + } + wc, err := conn.NextWriter(websocket.BinaryMessage) if err != nil { log.Errorf("failed to get next writer: %s", err) return err } - var obj lexutil.CBOR - - switch { - case evt.Error != nil: - header.Op = events.EvtKindErrorFrame - obj = evt.Error - case evt.RepoCommit != nil: - header.MsgType = "#commit" - obj = evt.RepoCommit - case evt.RepoHandle != nil: - header.MsgType = "#handle" - obj = evt.RepoHandle - case evt.RepoInfo != nil: - header.MsgType = "#info" - obj = evt.RepoInfo - case evt.RepoMigrate != nil: - header.MsgType = "#migrate" - obj = evt.RepoMigrate - case evt.RepoTombstone != nil: - header.MsgType = "#tombstone" - obj = evt.RepoTombstone - default: - return fmt.Errorf("unrecognized event kind") - } - - if err := header.MarshalCBOR(wc); err != nil { - return fmt.Errorf("failed to write header: %w", err) + if evt.Preserialized != nil { + _, err = wc.Write(evt.Preserialized) + } else { + err = evt.Serialize(wc) } - - if err := obj.MarshalCBOR(wc); err != nil { + if err != nil { return fmt.Errorf("failed to write event: %w", err) } if err := wc.Close(); err != nil { - return fmt.Errorf("failed to flush-close our event write: %w", err) + log.Warnf("failed to flush-close our event write: %s", err) + return nil } + lastWriteLk.Lock() lastWrite = time.Now() lastWriteLk.Unlock() From 54f0388053ed30863ae3b6690bdba97c7cebe5dc Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 14 Nov 2024 10:49:39 -0800 Subject: [PATCH 6/7] dont duplicate sequence for event func --- events/events.go | 8 ++++---- splitter/ringbuf.go | 4 ++-- splitter/splitter.go | 20 ++------------------ 3 files changed, 8 insertions(+), 24 deletions(-) diff --git a/events/events.go b/events/events.go index 915beaf19..83438ee44 100644 --- a/events/events.go +++ b/events/events.go @@ -290,7 +290,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func case <-done: return ErrPlaybackShutdown case out <- e: - seq := sequenceForEvent(e) + seq := SequenceForEvent(e) if seq > 0 { lastSeq = seq } @@ -315,8 +315,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func // run playback again to get us to the events that have started buffering if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error { - seq := sequenceForEvent(e) - if seq > sequenceForEvent(first) { + seq := SequenceForEvent(e) + if seq > SequenceForEvent(first) { return ErrCaughtUp } @@ -351,7 +351,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func return out, sub.cleanup, nil } -func sequenceForEvent(evt *XRPCStreamEvent) int64 { +func SequenceForEvent(evt *XRPCStreamEvent) int64 { switch { case evt == nil: return -1 diff --git a/splitter/ringbuf.go b/splitter/ringbuf.go index 9168cdf04..2417f4eb0 100644 --- a/splitter/ringbuf.go +++ b/splitter/ringbuf.go @@ -96,7 +96,7 @@ func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb fu for ; i >= 0; i-- { c := chunks[i] evts := c.events() - if since > sequenceForEvent(evts[len(evts)-1]) { + if since > events.SequenceForEvent(evts[len(evts)-1]) { i++ break } @@ -112,7 +112,7 @@ func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb fu for nread < len(evts) { for _, e := range evts[nread:] { nread++ - seq := sequenceForEvent(e) + seq := events.SequenceForEvent(e) if seq <= since { continue } diff --git a/splitter/splitter.go b/splitter/splitter.go index 0f4a6cf8c..c39fbb5e3 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -383,7 +383,7 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso defer cancel() sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error { - seq := sequenceForEvent(evt) + seq := events.SequenceForEvent(evt) if seq < 0 { // ignore info events and other unsupported types return nil @@ -402,24 +402,8 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso *lastCursor = seq return nil }) - return events.HandleRepoStream(ctx, con, sched) -} -func sequenceForEvent(evt *events.XRPCStreamEvent) int64 { - switch { - case evt.RepoCommit != nil: - return evt.RepoCommit.Seq - case evt.RepoHandle != nil: - return evt.RepoHandle.Seq - case evt.RepoMigrate != nil: - return evt.RepoMigrate.Seq - case evt.RepoTombstone != nil: - return evt.RepoTombstone.Seq - case evt.RepoInfo != nil: - return -1 - default: - return -1 - } + return events.HandleRepoStream(ctx, con, sched) } func (s *Splitter) getLastCursor() (int64, error) { From c7898a9c72e9892a78bb9ed28ec5ffa448cdd9e0 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 15 Nov 2024 11:30:41 -0800 Subject: [PATCH 7/7] cors nonsense --- splitter/splitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index c39fbb5e3..0397aa720 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -87,7 +87,7 @@ func (s *Splitter) StartWithListener(listen net.Listener) error { e.HideBanner = true e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ - AllowOrigins: []string{"http://localhost:*", "https://bgs.bsky-sandbox.dev"}, + AllowOrigins: []string{"*"}, AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, }))