From 9833254dc0ce588c071c364f47de8d6c8a358b60 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 22 Nov 2024 23:08:04 +0000 Subject: [PATCH 1/5] Handle requestCrawl fanout in Rainbow --- cmd/rainbow/main.go | 18 +++-- indexer/metrics.go | 5 ++ splitter/splitter.go | 157 ++++++++++++++++++++++++++++++++++--------- 3 files changed, 141 insertions(+), 39 deletions(-) diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go index 93dc2f8c..d574d626 100644 --- a/cmd/rainbow/main.go +++ b/cmd/rainbow/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "github.com/bluesky-social/indigo/events" "log/slog" _ "net/http/pprof" "os" @@ -10,12 +9,11 @@ import ( "syscall" "time" + "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/splitter" - _ "github.com/joho/godotenv/autoload" - _ "go.uber.org/automaxprocs" - "github.com/carlmjohnson/versioninfo" + _ "github.com/joho/godotenv/autoload" "github.com/urfave/cli/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -23,6 +21,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + _ "go.uber.org/automaxprocs" ) var log = slog.Default().With("system", "rainbow") @@ -88,6 +87,11 @@ func run(args []string) { Usage: "max bytes target for event cache, 0 to disable size target trimming", EnvVars: []string{"RAINBOW_PERSIST_BYTES", "SPLITTER_PERSIST_BYTES"}, }, + &cli.StringSliceFlag{ + Name: "next-crawler", + Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", + EnvVars: []string{"RELAY_NEXT_CRAWLER"}, + }, } // TODO: slog.SetDefault and set module `var log *slog.Logger` based on flags and env @@ -143,6 +147,8 @@ func Splitter(cctx *cli.Context) error { persistPath := cctx.String("persist-db") upstreamHost := cctx.String("splitter-host") + nextCrawlers := cctx.StringSlice("next-crawler") + var spl *splitter.Splitter var err error if persistPath != "" { @@ -158,14 +164,14 @@ func Splitter(cctx *cli.Context) error { CursorFile: cctx.String("cursor-file"), PebbleOptions: &ppopts, } - spl, err = splitter.NewSplitter(conf) + spl, err = splitter.NewSplitter(conf, nextCrawlers) } else { log.Info("building in-memory splitter") conf := splitter.SplitterConfig{ UpstreamHost: upstreamHost, CursorFile: cctx.String("cursor-file"), } - spl, err = splitter.NewSplitter(conf) + spl, err = splitter.NewSplitter(conf, nextCrawlers) } if err != nil { log.Error("failed to create splitter", "path", persistPath, "error", err) diff --git a/indexer/metrics.go b/indexer/metrics.go index 447460e8..68602efa 100644 --- a/indexer/metrics.go +++ b/indexer/metrics.go @@ -44,3 +44,8 @@ var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{ Name: "indexer_catchup_repos", Help: "Number of repos waiting on catchup", }) + +var usersAddedToCatchupQueue = promauto.NewCounter(prometheus.CounterOpts{ + Name: "indexer_users_added_to_catchup_queue", + Help: "Number of users added to catchup queue", +}) diff --git a/splitter/splitter.go b/splitter/splitter.go index ca63aa05..076f1c8b 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -1,7 +1,9 @@ package splitter import ( + "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -9,15 +11,20 @@ import ( "math/rand" "net" "net/http" + "net/url" "os" "strconv" "strings" "sync" "time" + "github.com/bluesky-social/indigo/api/atproto" + comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/bgs" events "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/sequential" + "github.com/bluesky-social/indigo/util" + "github.com/bluesky-social/indigo/xrpc" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" @@ -39,6 +46,9 @@ type Splitter struct { conf SplitterConfig log *slog.Logger + + httpC *http.Client + nextCrawlers []*url.URL } type SplitterConfig struct { @@ -47,52 +57,45 @@ type SplitterConfig struct { PebbleOptions *events.PebblePersistOptions } -func NewMemSplitter(host string) *Splitter { - conf := SplitterConfig{ - UpstreamHost: host, - CursorFile: "cursor-file", +func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) { + var nextCrawlerURLs []*url.URL + log := slog.Default().With("system", "splitter") + if len(nextCrawlers) > 0 { + nextCrawlerURLs = make([]*url.URL, len(nextCrawlers)) + for i, tu := range nextCrawlers { + var err error + nextCrawlerURLs[i], err = url.Parse(tu) + if err != nil { + return nil, fmt.Errorf("failed to parse next-crawler url: %w", err) + } + log.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i]) + } } - erb := NewEventRingBuffer(20_000, 10_000) - - em := events.NewEventManager(erb) - return &Splitter{ - conf: conf, - erb: erb, - events: em, - consumers: make(map[uint64]*SocketConsumer), - log: slog.Default().With("system", "splitter"), + s := &Splitter{ + conf: conf, + consumers: make(map[uint64]*SocketConsumer), + log: log, + httpC: util.RobustHTTPClient(), + nextCrawlers: nextCrawlerURLs, } -} -func NewSplitter(conf SplitterConfig) (*Splitter, error) { + if conf.PebbleOptions == nil { // mem splitter erb := NewEventRingBuffer(20_000, 10_000) - - em := events.NewEventManager(erb) - return &Splitter{ - conf: conf, - erb: erb, - events: em, - consumers: make(map[uint64]*SocketConsumer), - log: slog.Default().With("system", "splitter"), - }, nil + s.erb = erb + s.events = events.NewEventManager(erb) } else { pp, err := events.NewPebblePersistance(conf.PebbleOptions) if err != nil { return nil, err } - go pp.GCThread(context.Background()) - em := events.NewEventManager(pp) - return &Splitter{ - conf: conf, - pp: pp, - events: em, - consumers: make(map[uint64]*SocketConsumer), - log: slog.Default().With("system", "splitter"), - }, nil + s.pp = pp + s.events = events.NewEventManager(pp) } + + return s, nil } func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { ppopts := events.PebblePersistOptions{ @@ -200,6 +203,9 @@ func (s *Splitter) StartWithListener(listen net.Listener) error { } } + // TODO: this API is temporary until we formalize what we want here + + e.GET("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler) e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) e.GET("/xrpc/_health", s.HandleHealthCheck) @@ -238,6 +244,91 @@ func (s *Splitter) HandleHomeMessage(c echo.Context) error { return c.String(http.StatusOK, homeMessage) } +type XRPCError struct { + Message string `json:"message"` +} + +func (s *Splitter) RequestCrawlHandler(c echo.Context) error { + ctx := c.Request().Context() + var body comatproto.SyncRequestCrawl_Input + if err := c.Bind(&body); err != nil { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)}) + } + + host := body.Hostname + if host == "" { + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") + } + + if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { + host = "https://" + host + } + + u, err := url.Parse(host) + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") + } + + if u.Scheme == "http" { + return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") + } + if u.Path != "" { + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") + } + + if u.Query().Encode() != "" { + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") + } + + host = u.Host // potentially hostname:port + + clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) + + xrpcC := &xrpc.Client{ + Host: clientHost, + Client: http.DefaultClient, // not using the client that auto-retries + } + + desc, err := atproto.ServerDescribeServer(ctx, xrpcC) + if err != nil { + errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) + return echo.NewHTTPError(http.StatusBadRequest, errMsg) + } + + // Maybe we could do something with this response later + _ = desc + + if len(s.nextCrawlers) != 0 { + blob, err := json.Marshal(body) + if err != nil { + s.log.Warn("could not forward requestCrawl, json err", "err", err) + } else { + go func(bodyBlob []byte) { + for _, remote := range s.nextCrawlers { + if remote == nil { + continue + } + + pu := remote.JoinPath("/xrpc/com.atproto.sync.requestCrawl") + response, err := s.httpC.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob)) + if response != nil && response.Body != nil { + response.Body.Close() + } + if err != nil || response == nil { + s.log.Warn("requestCrawl forward failed", "host", remote, "err", err) + } else if response.StatusCode != http.StatusOK { + s.log.Warn("requestCrawl forward failed", "host", remote, "status", response.Status) + } else { + s.log.Info("requestCrawl forward successful", "host", remote) + } + } + }(blob) + } + } + + return c.JSON(200, HealthStatus{Status: "ok"}) +} + func (s *Splitter) EventsHandler(c echo.Context) error { var since *int64 if sinceVal := c.QueryParam("cursor"); sinceVal != "" { From b225d61a32f3c02af01ce2bb652d02680251b356 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 22 Nov 2024 23:22:19 +0000 Subject: [PATCH 2/5] requestCrawl should be a post --- splitter/splitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index 076f1c8b..89341419 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -205,7 +205,7 @@ func (s *Splitter) StartWithListener(listen net.Listener) error { // TODO: this API is temporary until we formalize what we want here - e.GET("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler) + e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler) e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) e.GET("/xrpc/_health", s.HandleHealthCheck) From ad69fef36bc6519d83d7d2ca5ae57ddb60a3faa6 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Tue, 17 Dec 2024 01:59:27 -0500 Subject: [PATCH 3/5] slog fix --- splitter/splitter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/splitter/splitter.go b/splitter/splitter.go index 89341419..103d3f4d 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -78,6 +78,7 @@ func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) log: log, httpC: util.RobustHTTPClient(), nextCrawlers: nextCrawlerURLs, + log: log, } if conf.PebbleOptions == nil { From 070d7654fe6e5ac3622395477c31076eede85059 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Wed, 18 Dec 2024 13:27:05 -0500 Subject: [PATCH 4/5] fix merge copy-paste-o --- splitter/splitter.go | 1 - 1 file changed, 1 deletion(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index 103d3f4d..89341419 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -78,7 +78,6 @@ func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) log: log, httpC: util.RobustHTTPClient(), nextCrawlers: nextCrawlerURLs, - log: log, } if conf.PebbleOptions == nil { From d2169bebad6dbe6827a723f9e515ff9ef4c096d6 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Wed, 18 Dec 2024 15:05:55 -0500 Subject: [PATCH 5/5] drop dead code --- indexer/metrics.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/indexer/metrics.go b/indexer/metrics.go index 68602efa..447460e8 100644 --- a/indexer/metrics.go +++ b/indexer/metrics.go @@ -44,8 +44,3 @@ var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{ Name: "indexer_catchup_repos", Help: "Number of repos waiting on catchup", }) - -var usersAddedToCatchupQueue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "indexer_users_added_to_catchup_queue", - Help: "Number of users added to catchup queue", -})