diff --git a/atproto/identity/base_directory.go b/atproto/identity/base_directory.go
index 40577eac3..15cec3e24 100644
--- a/atproto/identity/base_directory.go
+++ b/atproto/identity/base_directory.go
@@ -7,12 +7,17 @@ import (
 	"net/http"
 
 	"github.com/bluesky-social/indigo/atproto/syntax"
+	"golang.org/x/time/rate"
 )
 
 // The zero value ('BaseDirectory{}') is a usable Directory.
 type BaseDirectory struct {
 	// if non-empty, this string should have URL method, hostname, and optional port; it should not have a path or trailing slash
 	PLCURL string
+	// If not nil, this limiter will be used to rate-limit requests to the PLCURL
+	PLCLimiter *rate.Limiter
+	// If not nil, this function will be called inline with DID Web lookups, and can be used to limit the number of requests to a given hostname
+	DIDWebLimitFunc func(ctx context.Context, hostname string) error
 	// HTTP client used for did:web, did:plc, and HTTP (well-known) handle resolution
 	HTTPClient http.Client
 	// DNS resolver used for DNS handle resolution. Calling code can use a custom Dialer to query against a specific DNS server, or re-implement the interface for even more control over the resolution process
diff --git a/atproto/identity/cache_directory.go b/atproto/identity/cache_directory.go
index 7e9575afc..1428526e0 100644
--- a/atproto/identity/cache_directory.go
+++ b/atproto/identity/cache_directory.go
@@ -6,6 +6,8 @@ import (
 	"time"
 
 	"github.com/bluesky-social/indigo/atproto/syntax"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 
 	"github.com/hashicorp/golang-lru/v2/expirable"
 )
@@ -29,6 +31,26 @@ type IdentityEntry struct {
 	Err      error
 }
 
+var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{
+	Name: "atproto_directory_handle_cache_hits",
+	Help: "Number of cache hits for ATProto handle lookups",
+})
+
+var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
+	Name: "atproto_directory_handle_cache_misses",
+	Help: "Number of cache misses for ATProto handle lookups",
+})
+
+var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{
+	Name: "atproto_directory_identity_cache_hits",
+	Help: "Number of cache hits for ATProto identity lookups",
+})
+
+var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
+	Name: "atproto_directory_identity_cache_misses",
+	Help: "Number of cache misses for ATProto identity lookups",
+})
+
 var _ Directory = (*CacheDirectory)(nil)
 
 // Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration.
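// Example (editor's sketch, not part of this diff): how calling code might
// wire the new PLCLimiter field added above into a BaseDirectory and wrap it
// in a CacheDirectory. The limiter rate and cache parameters are illustrative
// values, not defaults introduced by this change.
package example

import (
	"net/http"
	"time"

	"github.com/bluesky-social/indigo/atproto/identity"
	"golang.org/x/time/rate"
)

func newDirectory() identity.Directory {
	base := identity.BaseDirectory{
		// leaving PLCURL empty falls back to the package default PLC host
		HTTPClient: http.Client{Timeout: 15 * time.Second},
		// allow at most 100 PLC lookups per second, with no burst allowance
		PLCLimiter: rate.NewLimiter(rate.Limit(100), 1),
	}
	dir := identity.NewCacheDirectory(&base, 200_000, time.Hour*24, time.Minute*2)
	return &dir
}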
@@ -89,6 +111,7 @@ func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy
 
 	maybeEntry, ok := d.handleCache.Get(h)
 	if !ok {
+		handleCacheMisses.Inc()
 		entry, err = d.updateHandle(ctx, h)
 		if err != nil {
 			return "", err
@@ -97,10 +120,13 @@ func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy
 		entry = &maybeEntry
 	}
 	if d.IsHandleStale(entry) {
+		handleCacheMisses.Inc()
 		entry, err = d.updateHandle(ctx, h)
 		if err != nil {
 			return "", err
 		}
+	} else {
+		handleCacheHits.Inc()
 	}
 	return entry.DID, entry.Err
 }
@@ -136,6 +162,7 @@ func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identi
 
 	maybeEntry, ok := d.identityCache.Get(did)
 	if !ok {
+		identityCacheMisses.Inc()
 		entry, err = d.updateDID(ctx, did)
 		if err != nil {
 			return nil, err
@@ -144,10 +171,13 @@ func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identi
 		entry = &maybeEntry
 	}
 	if d.IsIdentityStale(entry) {
+		identityCacheMisses.Inc()
 		entry, err = d.updateDID(ctx, did)
 		if err != nil {
 			return nil, err
 		}
+	} else {
+		identityCacheHits.Inc()
 	}
 	return entry.Identity, entry.Err
 }
diff --git a/atproto/identity/did.go b/atproto/identity/did.go
index bca976f5c..b1af03ff5 100644
--- a/atproto/identity/did.go
+++ b/atproto/identity/did.go
@@ -67,6 +67,13 @@ func (d *BaseDirectory) ResolveDIDWeb(ctx context.Context, did syntax.DID) (*DID
 
 	// TODO: use a more robust client
 	// TODO: allow ctx to specify unsafe http:// resolution, for testing?
+
+	if d.DIDWebLimitFunc != nil {
+		if err := d.DIDWebLimitFunc(ctx, hostname); err != nil {
+			return nil, fmt.Errorf("did:web limit func returned an error for (%s): %w", hostname, err)
+		}
+	}
+
 	resp, err := http.Get("https://" + hostname + "/.well-known/did.json")
 	// look for NXDOMAIN
 	var dnsErr *net.DNSError
@@ -101,6 +108,13 @@ func (d *BaseDirectory) ResolveDIDPLC(ctx context.Context, did syntax.DID) (*DID
 	if plcURL == "" {
 		plcURL = DefaultPLCURL
 	}
+
+	if d.PLCLimiter != nil {
+		if err := d.PLCLimiter.Wait(ctx); err != nil {
+			return nil, fmt.Errorf("failed to wait for PLC limiter: %w", err)
+		}
+	}
+
 	resp, err := http.Get(plcURL + "/" + did.String())
 	if err != nil {
 		return nil, fmt.Errorf("failed did:plc directory resolution: %w", err)
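// Example (editor's sketch, not part of this diff): one way calling code could
// implement the DIDWebLimitFunc hook invoked in ResolveDIDWeb above, using a
// token bucket per did:web hostname so a single host cannot absorb all
// resolution capacity. The per-host rate and burst values are illustrative.
package example

import (
	"context"
	"sync"

	"golang.org/x/time/rate"
)

type perHostLimiter struct {
	mu    sync.Mutex
	hosts map[string]*rate.Limiter
}

// Limit blocks until the per-hostname limiter grants a token, or returns an
// error if the context is cancelled first.
func (p *perHostLimiter) Limit(ctx context.Context, hostname string) error {
	p.mu.Lock()
	lim, ok := p.hosts[hostname]
	if !ok {
		lim = rate.NewLimiter(rate.Limit(2), 5) // 2 req/sec, burst of 5 per host
		p.hosts[hostname] = lim
	}
	p.mu.Unlock()
	return lim.Wait(ctx)
}

// Wiring: base.DIDWebLimitFunc = (&perHostLimiter{hosts: map[string]*rate.Limiter{}}).Limit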
diff --git a/backfill/backfill.go b/backfill/backfill.go
index 2199147cb..14d6741a9 100644
--- a/backfill/backfill.go
+++ b/backfill/backfill.go
@@ -101,7 +101,7 @@ func DefaultBackfillOptions() *BackfillOptions {
 		ParallelRecordCreates: 100,
 		NSIDFilter:            "",
 		SyncRequestsPerSecond: 2,
-		CheckoutPath:          "https://bsky.social/xrpc/com.atproto.sync.getCheckout",
+		CheckoutPath:          "https://bsky.social/xrpc/com.atproto.sync.getRepo",
 	}
 }
 
@@ -261,7 +261,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
 	// GET and CAR decode the body
 	client := &http.Client{
 		Transport: otelhttp.NewTransport(http.DefaultTransport),
-		Timeout:   120 * time.Second,
+		Timeout:   600 * time.Second,
 	}
 	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 	if err != nil {
diff --git a/backfill/gormstore.go b/backfill/gormstore.go
index c874d7a7a..f5ec87a71 100644
--- a/backfill/gormstore.go
+++ b/backfill/gormstore.go
@@ -45,29 +45,37 @@ func NewGormstore(db *gorm.DB) *Gormstore {
 }
 
 func (s *Gormstore) LoadJobs(ctx context.Context) error {
-	// Load all jobs from the database
-	var dbjobs []*GormDBJob
-	if err := s.db.Find(&dbjobs).Error; err != nil {
-		return err
-	}
-
+	limit := 20_000
+	offset := 0
 	s.lk.Lock()
 	defer s.lk.Unlock()
 
-	// Convert them to in-memory jobs
-	for i := range dbjobs {
-		dbj := dbjobs[i]
-		j := &Gormjob{
-			repo:        dbj.Repo,
-			state:       dbj.State,
-			bufferedOps: map[string][]*bufferedOp{},
-			createdAt:   dbj.CreatedAt,
-			updatedAt:   dbj.UpdatedAt,
-
-			dbj: dbj,
-			db:  s.db,
+	for {
+		var dbjobs []*GormDBJob
+		// Load all jobs from the database
+		if err := s.db.Limit(limit).Offset(offset).Find(&dbjobs).Error; err != nil {
+			return err
+		}
+		if len(dbjobs) == 0 {
+			break
+		}
+		offset += len(dbjobs)
+
+		// Convert them to in-memory jobs
+		for i := range dbjobs {
+			dbj := dbjobs[i]
+			j := &Gormjob{
+				repo:        dbj.Repo,
+				state:       dbj.State,
+				bufferedOps: map[string][]*bufferedOp{},
+				createdAt:   dbj.CreatedAt,
+				updatedAt:   dbj.UpdatedAt,
+
+				dbj: dbj,
+				db:  s.db,
+			}
+			s.jobs[dbj.Repo] = j
 		}
-		s.jobs[dbj.Repo] = j
 	}
 
 	return nil
diff --git a/cmd/palomar/main.go b/cmd/palomar/main.go
index 1b8641c79..8b1a362bc 100644
--- a/cmd/palomar/main.go
+++ b/cmd/palomar/main.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	_ "github.com/joho/godotenv/autoload"
+	"golang.org/x/time/rate"
 
 	"github.com/bluesky-social/indigo/atproto/identity"
 	"github.com/bluesky-social/indigo/search"
@@ -132,6 +133,12 @@ var runCmd = &cli.Command{
 			Value:   20,
 			EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"},
 		},
+		&cli.IntFlag{
+			Name:    "plc-rate-limit",
+			Usage:   "max number of requests per second to PLC registry",
+			Value:   100,
+			EnvVars: []string{"PALOMAR_PLC_RATE_LIMIT"},
+		},
 	},
 	Action: func(cctx *cli.Context) error {
 		logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
@@ -155,10 +162,11 @@ var runCmd = &cli.Command{
 			HTTPClient: http.Client{
 				Timeout: time.Second * 15,
 			},
+			PLCLimiter:            rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1),
 			TryAuthoritativeDNS:   true,
 			SkipDNSDomainSuffixes: []string{".bsky.social"},
 		}
-		dir := identity.NewCacheDirectory(&base, 200000, time.Hour*24, time.Minute*2)
+		dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2)
 
 		srv, err := search.NewServer(
 			db,
diff --git a/search/firehose.go b/search/firehose.go
index e987f1c1f..102c6112b 100644
--- a/search/firehose.go
+++ b/search/firehose.go
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
+	"time"
 
 	comatproto "github.com/bluesky-social/indigo/api/atproto"
 	bsky "github.com/bluesky-social/indigo/api/bsky"
@@ -51,6 +52,7 @@ func (s *Server) RunIndexer(ctx context.Context) error {
 		return fmt.Errorf("loading backfill jobs: %w", err)
 	}
 	go s.bf.Start()
+	go s.discoverRepos()
 
 	d := websocket.DefaultDialer
 	u, err := url.Parse(s.bgshost)
@@ -92,6 +94,15 @@ func (s *Server) RunIndexer(ctx context.Context) error {
 				return nil
 			}
 
+			// Check if we've backfilled this repo, if not, we should enqueue it
+			job, err := s.bfs.GetJob(ctx, evt.Repo)
+			if job == nil && err == nil {
+				logEvt.Info("enqueueing backfill job for new repo")
+				if err := s.bfs.EnqueueJob(evt.Repo); err != nil {
+					logEvt.Warn("failed to enqueue backfill job", "err", err)
+				}
+			}
+
 			r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
 			if err != nil {
 				// TODO: handle this case (instead of return nil)
repo page", "count", len(resp.Repos), "cursor", resp.Cursor) + enqueued := 0 + skipped := 0 + errored := 0 + for _, repo := range resp.Repos { + job, err := s.bfs.GetJob(ctx, repo.Did) + if job == nil && err == nil { + log.Info("enqueuing backfill job for new repo", "did", repo.Did) + if err := s.bfs.EnqueueJob(repo.Did); err != nil { + log.Warn("failed to enqueue backfill job", "err", err) + errored++ + continue + } + enqueued++ + } else if err != nil { + log.Warn("failed to get backfill job", "did", repo.Did, "err", err) + errored++ + } else { + skipped++ + } + } + log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored) + totalEnqueued += enqueued + totalSkipped += skipped + totalErrored += errored + if resp.Cursor != nil && *resp.Cursor != "" { + cursor = *resp.Cursor + } else { + break + } + } + + log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored) +} + func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { // Since this gets called in a backfill job, we need to check if the path is a post or profile if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { diff --git a/search/handlers.go b/search/handlers.go index 6232da073..22db33332 100644 --- a/search/handlers.go +++ b/search/handlers.go @@ -123,6 +123,59 @@ func (s *Server) handleSearchActorsSkeleton(e echo.Context) error { return e.JSON(200, out) } +type IndexError struct { + DID string `json:"did"` + Err string `json:"err"` +} + +func (s *Server) handleIndexRepos(e echo.Context) error { + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleIndexRepos") + defer span.End() + + dids, ok := e.QueryParams()["did"] + if !ok { + return e.JSON(400, map[string]any{ + "error": "must pass at least one did to index", + }) + } + + for _, did := range dids { + _, err := syntax.ParseDID(did) + if err != nil { + return e.JSON(400, map[string]any{ + "error": fmt.Sprintf("invalid DID (%s): %s", did, err), + }) + } + } + + errs := []IndexError{} + successes := 0 + skipped := 0 + for _, did := range dids { + job, err := s.bfs.GetJob(ctx, did) + if job == nil && err == nil { + err := s.bfs.EnqueueJob(did) + if err != nil { + errs = append(errs, IndexError{ + DID: did, + Err: err.Error(), + }) + continue + } + successes++ + continue + } + skipped++ + } + + return e.JSON(200, map[string]any{ + "numEnqueued": successes, + "numSkipped": skipped, + "numErrored": len(errs), + "errors": errs, + }) +} + func (s *Server) SearchPosts(ctx context.Context, q string, offset, size int) (*SearchPostsSkeletonResp, error) { resp, err := DoSearchPosts(ctx, s.dir, s.escli, s.postIndex, q, offset, size) if err != nil { diff --git a/search/server.go b/search/server.go index a69e6f4b3..7a80b27d2 100644 --- a/search/server.go +++ b/search/server.go @@ -93,6 +93,7 @@ func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Con } else { opts.SyncRequestsPerSecond = 8 } + opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", bgshttp) if config.IndexMaxConcurrency > 0 { opts.ParallelRecordCreates = config.IndexMaxConcurrency } else { @@ -199,6 +200,7 @@ func (s *Server) RunAPI(listen string) error { e.GET("/metrics", echoprometheus.NewHandler()) e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton) e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", 
diff --git a/search/server.go b/search/server.go
index a69e6f4b3..7a80b27d2 100644
--- a/search/server.go
+++ b/search/server.go
@@ -93,6 +93,7 @@ func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Con
 	} else {
 		opts.SyncRequestsPerSecond = 8
 	}
+	opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", bgshttp)
 	if config.IndexMaxConcurrency > 0 {
 		opts.ParallelRecordCreates = config.IndexMaxConcurrency
 	} else {
@@ -199,6 +200,7 @@ func (s *Server) RunAPI(listen string) error {
 	e.GET("/metrics", echoprometheus.NewHandler())
 	e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton)
 	e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton)
+	e.GET("/xrpc/app.bsky.unspecced.indexRepos", s.handleIndexRepos)
 
 	s.echo = e
 	s.logger.Info("starting search API daemon", "bind", listen)