From e3e7f600cb33d62d0bd893d4457e7cbbc85678c2 Mon Sep 17 00:00:00 2001
From: Jaz Volpert
Date: Sat, 23 Sep 2023 01:27:10 +0000
Subject: [PATCH] Discover repos by listing them from the BGS

---
 search/firehose.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/search/firehose.go b/search/firehose.go
index bff7aadb9..8b1a8b2ad 100644
--- a/search/firehose.go
+++ b/search/firehose.go
@@ -51,6 +51,7 @@ func (s *Server) RunIndexer(ctx context.Context) error {
 		return fmt.Errorf("loading backfill jobs: %w", err)
 	}
 	go s.bf.Start()
+	go s.discoverRepos()
 
 	d := websocket.DefaultDialer
 	u, err := url.Parse(s.bgshost)
@@ -168,6 +169,83 @@ func (s *Server) RunIndexer(ctx context.Context) error {
 	)
 }
 
+// discoverRepos pages through the repo listing served by the BGS
+// (comatproto.SyncListRepos) and enqueues a backfill job for every DID for
+// which GetJob reports no existing job. It logs per-page and overall counts
+// and returns once the BGS stops returning a cursor, or after
+// maxListFailures consecutive listing errors.
+func (s *Server) discoverRepos() {
+	ctx := context.Background()
+	log := s.logger.With("func", "discoverRepos")
+	log.Info("starting repo discovery")
+
+	const (
+		pageLimit       = int64(500)
+		maxListFailures = 5
+	)
+
+	cursor := ""
+	listFailures := 0
+
+	totalEnqueued := 0
+	totalSkipped := 0
+	totalErrored := 0
+
+	for {
+		resp, err := comatproto.SyncListRepos(ctx, s.bgsxrpc, cursor, pageLimit)
+		if err != nil {
+			// Retry the same cursor, but only a bounded number of times in a
+			// row: with no delay between attempts, retrying forever would
+			// spin on a persistent failure.
+			listFailures++
+			log.Error("failed to list repos", "err", err, "consecutiveFailures", listFailures)
+			if listFailures >= maxListFailures {
+				log.Error("giving up on repo discovery", "cursor", cursor)
+				break
+			}
+			continue
+		}
+		listFailures = 0
+
+		log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor)
+		enqueued := 0
+		skipped := 0
+		errored := 0
+		for _, repo := range resp.Repos {
+			job, err := s.bfs.GetJob(ctx, repo.Did)
+			switch {
+			case err != nil:
+				log.Warn("failed to get backfill job", "did", repo.Did, "err", err)
+				errored++
+			case job != nil:
+				// A job already exists for this repo; leave it alone.
+				skipped++
+			default:
+				log.Info("enqueuing backfill job for new repo", "did", repo.Did)
+				if err := s.bfs.EnqueueJob(repo.Did); err != nil {
+					log.Warn("failed to enqueue backfill job", "did", repo.Did, "err", err)
+					errored++
+					// Count a failed enqueue once, as errored only.
+					continue
+				}
+				enqueued++
+			}
+		}
+		log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored)
+		totalEnqueued += enqueued
+		totalSkipped += skipped
+		totalErrored += errored
+
+		// A missing or empty cursor marks the last page.
+		if resp.Cursor == nil || *resp.Cursor == "" {
+			break
+		}
+		cursor = *resp.Cursor
+	}
+
+	log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored)
+}
+
 func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error {
 	// Since this gets called in a backfill job, we need to check if the path is a post or profile
 	if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {