diff --git a/atproto/data/data.go b/atproto/data/data.go index 1a4a8c5b9..560566c2d 100644 --- a/atproto/data/data.go +++ b/atproto/data/data.go @@ -66,17 +66,11 @@ func extractBlobsAtom(atom any) []Blob { out = append(out, v) case []any: for _, el := range v { - down := extractBlobsAtom(el) - for _, d := range down { - out = append(out, d) - } + out = append(out, extractBlobsAtom(el)...) } case map[string]any: for _, val := range v { - down := extractBlobsAtom(val) - for _, d := range down { - out = append(out, d) - } + out = append(out, extractBlobsAtom(val)...) } default: } diff --git a/automod/engine/fetch_account_meta.go b/automod/engine/fetch_account_meta.go index 5c55b417f..39b4df1eb 100644 --- a/automod/engine/fetch_account_meta.go +++ b/automod/engine/fetch_account_meta.go @@ -139,14 +139,16 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) ( ap.AccountTags = dedupeStrings(rd.Moderation.SubjectStatus.Tags) if rd.Moderation.SubjectStatus.ReviewState != nil { switch *rd.Moderation.SubjectStatus.ReviewState { - case "#reviewOpen": + case "tools.ozone.moderation.defs#reviewOpen": ap.ReviewState = ReviewStateOpen - case "#reviewEscalated": + case "tools.ozone.moderation.defs#reviewEscalated": ap.ReviewState = ReviewStateEscalated - case "#reviewClosed": + case "tools.ozone.moderation.defs#reviewClosed": ap.ReviewState = ReviewStateClosed - case "#reviewNonde": + case "tools.ozone.moderation.defs#reviewNone": ap.ReviewState = ReviewStateNone + default: + logger.Warn("unexpected ozone moderation review state", "state", rd.Moderation.SubjectStatus.ReviewState, "did", ident.DID) } } } diff --git a/automod/engine/slack.go b/automod/engine/slack.go index ae7c00d20..fe21abb72 100644 --- a/automod/engine/slack.go +++ b/automod/engine/slack.go @@ -86,7 +86,7 @@ func slackBody(header string, acct AccountMeta, newLabels, newFlags []string, ne msg += fmt.Sprintf("Report `%s`: %s\n", rep.ReasonType, rep.Comment) } if newTakedown { - msg += 
fmt.Sprintf("Takedown!\n") + msg += "Takedown!\n" } return msg } diff --git a/automod/flagstore/flagstore_mem.go b/automod/flagstore/flagstore_mem.go index 4f128075a..bc068475c 100644 --- a/automod/flagstore/flagstore_mem.go +++ b/automod/flagstore/flagstore_mem.go @@ -27,9 +27,7 @@ func (s MemFlagStore) Add(ctx context.Context, key string, flags []string) error if !ok { v = []string{} } - for _, f := range flags { - v = append(v, f) - } + v = append(v, flags...) v = dedupeStrings(v) s.Data[key] = v return nil diff --git a/automod/helpers/bsky.go b/automod/helpers/bsky.go index c7416f2dd..a38ecda92 100644 --- a/automod/helpers/bsky.go +++ b/automod/helpers/bsky.go @@ -11,9 +11,7 @@ import ( func ExtractHashtagsPost(post *appbsky.FeedPost) []string { var tags []string - for _, tag := range post.Tags { - tags = append(tags, tag) - } + tags = append(tags, post.Tags...) for _, facet := range post.Facets { for _, feat := range facet.Features { if feat.RichtextFacet_Tag != nil { diff --git a/automod/rules/harassment.go b/automod/rules/harassment.go index 2cf7ce194..e38e7c44a 100644 --- a/automod/rules/harassment.go +++ b/automod/rules/harassment.go @@ -130,7 +130,7 @@ func HarassmentTrivialPostRule(c *automod.RecordContext, post *appbsky.FeedPost) if count > 5 { //c.AddRecordFlag("trivial-harassing-post") - c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("possible targetted harassment (also labeled; remove label if this isn't harassment!)")) + c.ReportAccount(automod.ReportReasonOther, "possible targetted harassment (also labeled; remove label if this isn't harassment!)") c.AddAccountLabel("!hide") c.Notify("slack") } diff --git a/automod/rules/nostr.go b/automod/rules/nostr.go index 5f91e7ee6..6ad623ae3 100644 --- a/automod/rules/nostr.go +++ b/automod/rules/nostr.go @@ -1,7 +1,6 @@ package rules import ( - "fmt" "strings" "time" @@ -37,7 +36,7 @@ func NostrSpamPostRule(c *automod.RecordContext, post *appbsky.FeedPost) error { return nil } - 
c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("likely nostr spam account (also labeled; remove label if this isn't spam!)")) + c.ReportAccount(automod.ReportReasonOther, "likely nostr spam account (also labeled; remove label if this isn't spam!)") c.AddAccountLabel("!hide") c.Notify("slack") return nil diff --git a/automod/rules/promo.go b/automod/rules/promo.go index f6fe23a24..001dbc715 100644 --- a/automod/rules/promo.go +++ b/automod/rules/promo.go @@ -1,7 +1,6 @@ package rules import ( - "fmt" "net/url" "strings" "time" @@ -54,7 +53,7 @@ func AggressivePromotionRule(c *automod.RecordContext, post *appbsky.FeedPost) e uniqueReplies := c.GetCountDistinct("reply-to", did, countstore.PeriodDay) if uniqueReplies >= 10 { c.AddAccountFlag("promo-multi-reply") - c.ReportAccount(automod.ReportReasonSpam, fmt.Sprintf("possible aggressive self-promotion")) + c.ReportAccount(automod.ReportReasonSpam, "possible aggressive self-promotion") c.Notify("slack") } diff --git a/automod/rules/quick.go b/automod/rules/quick.go index ea6a69e36..127914c7a 100644 --- a/automod/rules/quick.go +++ b/automod/rules/quick.go @@ -29,7 +29,7 @@ func BotLinkProfileRule(c *automod.RecordContext, profile *appbsky.ActorProfile) } if strings.Contains(*profile.Description, "🏈🍕🌀") { c.AddAccountFlag("profile-bot-string") - c.ReportAccount(automod.ReportReasonSpam, fmt.Sprintf("possible bot based on string in profile")) + c.ReportAccount(automod.ReportReasonSpam, "possible bot based on string in profile") c.Notify("slack") return nil } @@ -89,7 +89,7 @@ func TrivialSpamPostRule(c *automod.RecordContext, post *appbsky.FeedPost) error return nil } - c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("trivial spam account (also labeled; remove label if this isn't spam!)")) + c.ReportAccount(automod.ReportReasonOther, "trivial spam account (also labeled; remove label if this isn't spam!)") c.AddAccountLabel("!hide") c.Notify("slack") return nil diff --git a/backfill/backfill.go 
b/backfill/backfill.go index 6f199677c..9232fa9f0 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -12,8 +12,11 @@ import ( "time" "github.com/bluesky-social/indigo/api/atproto" + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/repo" "github.com/bluesky-social/indigo/repomgr" + "github.com/ipfs/go-cid" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" @@ -71,8 +74,8 @@ type Backfiller struct { ParallelRecordCreates int // Prefix match for records to backfill i.e. app.bsky.feed.app/ // If empty, all records will be backfilled - NSIDFilter string - CheckoutPath string + NSIDFilter string + RelayHost string syncLimiter *rate.Limiter @@ -80,6 +83,8 @@ type Backfiller struct { magicHeaderVal string stop chan chan struct{} + + Directory identity.Directory } var ( @@ -110,7 +115,7 @@ type BackfillOptions struct { ParallelRecordCreates int NSIDFilter string SyncRequestsPerSecond int - CheckoutPath string + RelayHost string } func DefaultBackfillOptions() *BackfillOptions { @@ -119,7 +124,7 @@ func DefaultBackfillOptions() *BackfillOptions { ParallelRecordCreates: 100, NSIDFilter: "", SyncRequestsPerSecond: 2, - CheckoutPath: "https://bsky.network/xrpc/com.atproto.sync.getRepo", + RelayHost: "https://bsky.network", } } @@ -145,8 +150,9 @@ func NewBackfiller( ParallelRecordCreates: opts.ParallelRecordCreates, NSIDFilter: opts.NSIDFilter, syncLimiter: rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1), - CheckoutPath: opts.CheckoutPath, + RelayHost: opts.RelayHost, stop: make(chan chan struct{}, 1), + Directory: identity.DefaultDirectory(), } } @@ -292,25 +298,12 @@ type recordResult struct { err error } -// BackfillRepo backfills a repo -func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) { - ctx, span := tracer.Start(ctx, "BackfillRepo") - defer span.End() - - start := time.Now() - - repoDid := 
job.Repo() +// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo +func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) { + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did) - log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid) - if job.RetryCount() > 0 { - log = log.With("retry_count", job.RetryCount()) - } - log.Info(fmt.Sprintf("processing backfill for %s", repoDid)) - - url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) - - if job.Rev() != "" { - url = url + fmt.Sprintf("&since=%s", job.Rev()) + if since != "" { + url = url + fmt.Sprintf("&since=%s", since) } // GET and CAR decode the body @@ -320,8 +313,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) } req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - state := fmt.Sprintf("failed (create request: %s)", err.Error()) - return state, fmt.Errorf("failed to create request: %w", err) + return nil, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Accept", "application/vnd.ipld.car") @@ -334,8 +326,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) resp, err := client.Do(req) if err != nil { - state := fmt.Sprintf("failed (do request: %s)", err.Error()) - return state, fmt.Errorf("failed to send request: %w", err) + return nil, fmt.Errorf("failed to send request: %w", err) } if resp.StatusCode != http.StatusOK { @@ -345,8 +336,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) } else { reason = resp.Status } - state := fmt.Sprintf("failed (%s)", reason) - return state, fmt.Errorf("failed to get repo: %s", reason) + return nil, fmt.Errorf("failed to get repo: %s", reason) } instrumentedReader := instrumentedReader{ @@ -356,10 +346,47 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) defer 
instrumentedReader.Close() - r, err := repo.ReadRepoFromCar(ctx, instrumentedReader) + repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader) if err != nil { - state := "failed (couldn't read repo CAR from response body)" - return state, fmt.Errorf("failed to read repo from car: %w", err) + return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err) + } + return repo, nil +} + +// BackfillRepo backfills a repo +func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) { + ctx, span := tracer.Start(ctx, "BackfillRepo") + defer span.End() + + start := time.Now() + + repoDID := job.Repo() + + log := slog.With("source", "backfiller_backfill_repo", "repo", repoDID) + if job.RetryCount() > 0 { + log = log.With("retry_count", job.RetryCount()) + } + log.Info(fmt.Sprintf("processing backfill for %s", repoDID)) + + // first try with Relay endpoint + r, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost) + if err != nil { + slog.Warn("repo CAR fetch from relay failed", "did", repoDID, "since", job.Rev(), "relayHost", b.RelayHost, "err", err) + // fallback to direct PDS fetch + ident, err := b.Directory.LookupDID(ctx, syntax.DID(repoDID)) + if err != nil { + return "failed resolving DID to PDS repo", fmt.Errorf("resolving DID for PDS repo fetch: %w", err) + } + pdsHost := ident.PDSEndpoint() + if pdsHost == "" { + return "DID document missing PDS endpoint", fmt.Errorf("no PDS endpoint for DID: %s", repoDID) + } + r, err = b.fetchRepo(ctx, repoDID, job.Rev(), pdsHost) + if err != nil { + slog.Warn("repo CAR fetch from PDS failed", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err) + return "repo CAR fetch from PDS failed", err + } + slog.Info("repo CAR fetch from PDS successful", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost) } numRecords := 0 @@ -396,7 +423,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) raw := blk.RawData() - err = b.HandleCreateRecord(ctx, 
repoDid, rev, item.recordPath, &raw, &item.nodeCid) + err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid) if err != nil { recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} continue diff --git a/bgs/bgs.go b/bgs/bgs.go index 602530b98..d7838311e 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -896,6 +896,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event userLookupDuration.Observe(time.Since(s).Seconds()) if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { + repoCommitsResultCounter.WithLabelValues(host.Host, "nou").Inc() return fmt.Errorf("looking up event user: %w", err) } @@ -904,6 +905,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event subj, err := bgs.createExternalUser(ctx, evt.Repo) newUserDiscoveryDuration.Observe(time.Since(start).Seconds()) if err != nil { + repoCommitsResultCounter.WithLabelValues(host.Host, "uerr").Inc() return fmt.Errorf("fed event create external user: %w", err) } @@ -918,20 +920,24 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event if u.GetTakenDown() || ustatus == events.AccountStatusTakendown { span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown())) bgs.log.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) + repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc() return nil } if ustatus == events.AccountStatusSuspended { bgs.log.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) + repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc() return nil } if ustatus == events.AccountStatusDeactivated { bgs.log.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) + repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc() 
return nil } if evt.Rebase { + repoCommitsResultCounter.WithLabelValues(host.Host, "rebase").Inc() return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host) } @@ -942,10 +948,12 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event subj, err := bgs.createExternalUser(ctx, evt.Repo) if err != nil { + repoCommitsResultCounter.WithLabelValues(host.Host, "uerr2").Inc() return err } if subj.PDS != host.ID { + repoCommitsResultCounter.WithLabelValues(host.Host, "noauth").Inc() return fmt.Errorf("event from non-authoritative pds") } } @@ -954,16 +962,19 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event span.SetAttributes(attribute.Bool("tombstoned", true)) // we've checked the authority of the users PDS, so reinstate the account if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil { + repoCommitsResultCounter.WithLabelValues(host.Host, "tomb").Inc() return fmt.Errorf("failed to un-tombstone a user: %w", err) } u.SetTombstoned(false) ai, err := bgs.Index.LookupUser(ctx, u.ID) if err != nil { + repoCommitsResultCounter.WithLabelValues(host.Host, "nou2").Inc() return fmt.Errorf("failed to look up user (tombstone recover): %w", err) } // Now a simple re-crawl should suffice to bring the user back online + repoCommitsResultCounter.WithLabelValues(host.Host, "catchupt").Inc() return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt) } @@ -972,6 +983,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event rebasesCounter.WithLabelValues(host.Host).Add(1) ai, err := bgs.Index.LookupUser(ctx, u.ID) if err != nil { + repoCommitsResultCounter.WithLabelValues(host.Host, "nou3").Inc() return fmt.Errorf("failed to look up user (slow path): %w", err) } @@ -983,26 +995,33 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event // processor coming off of the pds stream, we should 
investigate // whether or not we even need this 'slow path' logic, as it makes // accounting for which events have been processed much harder + repoCommitsResultCounter.WithLabelValues(host.Host, "catchup").Inc() return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt) } if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil { - bgs.log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String()) if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) { ai, lerr := bgs.Index.LookupUser(ctx, u.ID) if lerr != nil { + bgs.log.Warn("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String()) + repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc() return fmt.Errorf("failed to look up user %s (%d) (err case: %s): %w", u.Did, u.ID, err, lerr) } span.SetAttributes(attribute.Bool("catchup_queue", true)) + bgs.log.Info("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String()) + repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc() return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt) } + bgs.log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String()) + repoCommitsResultCounter.WithLabelValues(host.Host, "err").Inc() return fmt.Errorf("handle user event failed: %w", err) } + repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc() return nil case env.RepoHandle != nil: bgs.log.Info("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle) diff --git a/bgs/fedmgr.go b/bgs/fedmgr.go index 064e32e66..c68759d91 100644 --- a/bgs/fedmgr.go 
+++ b/bgs/fedmgr.go @@ -478,6 +478,10 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s cursor := host.Cursor + connectedInbound.Inc() + defer connectedInbound.Dec() + // TODO:? maybe keep a gauge of 'in retry backoff' sources? + var backoff int for { select { diff --git a/bgs/metrics.go b/bgs/metrics.go index edd687596..5ff362a16 100644 --- a/bgs/metrics.go +++ b/bgs/metrics.go @@ -27,6 +27,11 @@ var repoCommitsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Help: "The total number of events received", }, []string{"pds"}) +var repoCommitsResultCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "repo_commits_result_counter", + Help: "The results of commit events received", +}, []string{"pds", "status"}) + var rebasesCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "event_rebases", Help: "The total number of rebase events received", @@ -42,6 +47,11 @@ var externalUserCreationAttempts = promauto.NewCounter(prometheus.CounterOpts{ Help: "The total number of external users created", }) +var connectedInbound = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "bgs_connected_inbound", + Help: "Number of inbound firehoses we are consuming", +}) + var compactionDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "compaction_duration", Help: "A histogram of compaction latencies", diff --git a/cmd/astrolabe/handlers.go b/cmd/astrolabe/handlers.go index 4a735e8ed..f7732fe4e 100644 --- a/cmd/astrolabe/handlers.go +++ b/cmd/astrolabe/handlers.go @@ -64,7 +64,7 @@ func (srv *Server) WebAccount(c echo.Context) error { atid, err := syntax.ParseAtIdentifier(c.Param("atid")) if err != nil { - return echo.NewHTTPError(404, fmt.Sprintf("failed to parse handle or DID")) + return echo.NewHTTPError(404, "failed to parse handle or DID") } ident, err := srv.dir.Lookup(ctx, *atid) @@ -96,7 +96,7 @@ func (srv *Server) WebRepo(c echo.Context) error { atid, err := syntax.ParseAtIdentifier(c.Param("atid")) 
if err != nil { - return echo.NewHTTPError(400, fmt.Sprintf("failed to parse handle or DID")) + return echo.NewHTTPError(400, "failed to parse handle or DID") } ident, err := srv.dir.Lookup(ctx, *atid) @@ -133,12 +133,12 @@ func (srv *Server) WebRepoCollection(c echo.Context) error { atid, err := syntax.ParseAtIdentifier(c.Param("atid")) if err != nil { - return echo.NewHTTPError(400, fmt.Sprintf("failed to parse handle or DID")) + return echo.NewHTTPError(400, "failed to parse handle or DID") } collection, err := syntax.ParseNSID(c.Param("collection")) if err != nil { - return echo.NewHTTPError(400, fmt.Sprintf("failed to parse collection NSID")) + return echo.NewHTTPError(400, "failed to parse collection NSID") } ident, err := srv.dir.Lookup(ctx, *atid) @@ -191,17 +191,17 @@ func (srv *Server) WebRepoRecord(c echo.Context) error { atid, err := syntax.ParseAtIdentifier(c.Param("atid")) if err != nil { - return echo.NewHTTPError(400, fmt.Sprintf("failed to parse handle or DID")) + return echo.NewHTTPError(400, "failed to parse handle or DID") } collection, err := syntax.ParseNSID(c.Param("collection")) if err != nil { - return echo.NewHTTPError(400, fmt.Sprintf("failed to parse collection NSID")) + return echo.NewHTTPError(400, "failed to parse collection NSID") } rkey, err := syntax.ParseRecordKey(c.Param("rkey")) if err != nil { - return echo.NewHTTPError(400, fmt.Sprintf("failed to parse record key")) + return echo.NewHTTPError(400, "failed to parse record key") } ident, err := srv.dir.Lookup(ctx, *atid) diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 319a0fd9b..77f7953f2 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -405,10 +405,11 @@ func runBigsky(cctx *cli.Context) error { rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency")) - ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, cctx.Bool("spidering"), false) + ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, false, 
cctx.Bool("spidering")) if err != nil { return err } + defer ix.Shutdown() rlskip := cctx.String("bsky-social-rate-limit-skip") ix.ApplyPDSClientSettings = func(c *xrpc.Client) { diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index 8b15506c5..f0541ce33 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -19,6 +19,7 @@ import ( "github.com/bluesky-social/indigo/api/atproto" comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/sequential" @@ -465,6 +466,18 @@ var getRecordCmd = &cli.Command{ return fmt.Errorf("unrecognized link") } + atid, err := syntax.ParseAtIdentifier(did) + if err != nil { + return err + } + + resp, err := identity.DefaultDirectory().Lookup(ctx, *atid) + if err != nil { + return err + } + + xrpcc.Host = resp.PDSEndpoint() + out, err := comatproto.RepoGetRecord(ctx, xrpcc, "", collection, did, rkey) if err != nil { return err @@ -493,7 +506,7 @@ var getRecordCmd = &cli.Command{ rc, rec, err := rr.GetRecord(ctx, cctx.Args().First()) if err != nil { - return err + return fmt.Errorf("get record failed: %w", err) } if cctx.Bool("raw") { diff --git a/cmd/netsync/main.go b/cmd/netsync/main.go index 24cd445ae..98d23a626 100644 --- a/cmd/netsync/main.go +++ b/cmd/netsync/main.go @@ -345,8 +345,8 @@ func Netsync(cctx *cli.Context) error { Handler: mux, } + state.wg.Add(1) go func() { - state.wg.Add(1) defer state.wg.Done() if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed { logger.Error("failed to start metrics server", "err", err) @@ -368,8 +368,8 @@ func Netsync(cctx *cli.Context) error { } // Check for empty queue + state.wg.Add(1) go func() { - state.wg.Add(1) defer state.wg.Done() t := time.NewTicker(30 * time.Second) for { diff --git a/cmd/sonar/main.go b/cmd/sonar/main.go index 
434d9be00..c5ece10ec 100644 --- a/cmd/sonar/main.go +++ b/cmd/sonar/main.go @@ -104,8 +104,8 @@ func Sonar(cctx *cli.Context) error { pool := sequential.NewScheduler(u.Host, s.HandleStreamEvent) // Start a goroutine to manage the cursor file, saving the current cursor every 5 seconds. + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() ticker := time.NewTicker(5 * time.Second) logger := logger.With("source", "cursor_file_manager") @@ -130,8 +130,8 @@ func Sonar(cctx *cli.Context) error { }() // Start a goroutine to manage the liveness checker, shutting down if no events are received for 15 seconds + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() ticker := time.NewTicker(15 * time.Second) lastSeq := int64(0) @@ -167,8 +167,8 @@ func Sonar(cctx *cli.Context) error { } // Startup metrics server + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() logger = logger.With("source", "metrics_server") @@ -194,8 +194,8 @@ func Sonar(cctx *cli.Context) error { } defer c.Close() + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() err = events.HandleRepoStream(ctx, c, pool, logger) logger.Info("HandleRepoStream returned unexpectedly", "err", err) diff --git a/fakedata/accounts.go b/fakedata/accounts.go index 85c978457..3337fc8f6 100644 --- a/fakedata/accounts.go +++ b/fakedata/accounts.go @@ -20,12 +20,8 @@ type AccountCatalog struct { func (ac *AccountCatalog) Combined() []AccountContext { var combined []AccountContext - for _, c := range ac.Celebs { - combined = append(combined, c) - } - for _, r := range ac.Regulars { - combined = append(combined, r) - } + combined = append(combined, ac.Celebs...) + combined = append(combined, ac.Regulars...) 
return combined } diff --git a/indexer/crawler.go b/indexer/crawler.go index 60ff9ff03..526da9bb6 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "sync" + "time" comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/models" @@ -30,6 +31,8 @@ type CrawlDispatcher struct { concurrency int log *slog.Logger + + done chan struct{} } func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) { @@ -37,7 +40,7 @@ func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurre return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency") } - return &CrawlDispatcher{ + out := &CrawlDispatcher{ ingest: make(chan *models.ActorInfo), repoSync: make(chan *crawlWork), complete: make(chan models.Uid), @@ -47,7 +50,11 @@ func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurre todo: make(map[models.Uid]*crawlWork), inProgress: make(map[models.Uid]*crawlWork), log: log, - }, nil + done: make(chan struct{}), + } + go out.CatchupRepoGaugePoller() + + return out, nil } func (c *CrawlDispatcher) Run() { @@ -58,6 +65,10 @@ func (c *CrawlDispatcher) Run() { } } +func (c *CrawlDispatcher) Shutdown() { + close(c.done) +} + type catchupJob struct { evt *comatproto.SyncSubscribeRepos_Commit host *models.PDS @@ -177,13 +188,13 @@ func (c *CrawlDispatcher) dequeueJob(job *crawlWork) { } func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork { - catchupEventsEnqueued.Inc() c.maplk.Lock() defer c.maplk.Unlock() // If the actor crawl is enqueued, we can append to the catchup queue which gets emptied during the crawl job, ok := c.todo[catchup.user.Uid] if ok { + catchupEventsEnqueued.WithLabelValues("todo").Inc() job.catchup = append(job.catchup, catchup) return nil } @@ -191,10 +202,12 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup 
*catchupJob) *crawlWork { // If the actor crawl is in progress, we can append to the nextr queue which gets emptied after the crawl job, ok = c.inProgress[catchup.user.Uid] if ok { + catchupEventsEnqueued.WithLabelValues("prog").Inc() job.next = append(job.next, catchup) return nil } + catchupEventsEnqueued.WithLabelValues("new").Inc() // Otherwise, we need to create a new crawl job for this actor and enqueue it cw := &crawlWork{ act: catchup.user, @@ -273,3 +286,22 @@ func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bo return false } + +func (c *CrawlDispatcher) countReposInSlowPath() int { + c.maplk.Lock() + defer c.maplk.Unlock() + return len(c.inProgress) + len(c.todo) +} + +func (c *CrawlDispatcher) CatchupRepoGaugePoller() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-c.done: + return + case <-ticker.C: + catchupReposGauge.Set(float64(c.countReposInSlowPath())) + } + } +} diff --git a/indexer/indexer.go b/indexer/indexer.go index 99ff905ce..2c3a2b53d 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -81,6 +81,12 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events return ix, nil } +func (ix *Indexer) Shutdown() { + if ix.Crawler != nil { + ix.Crawler.Shutdown() + } +} + func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error { ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent") defer span.End() diff --git a/indexer/metrics.go b/indexer/metrics.go index 21a0cecb0..447460e8f 100644 --- a/indexer/metrics.go +++ b/indexer/metrics.go @@ -25,12 +25,22 @@ var reposFetched = promauto.NewCounterVec(prometheus.CounterOpts{ Help: "Number of repos fetched", }, []string{"status"}) -var catchupEventsEnqueued = promauto.NewCounter(prometheus.CounterOpts{ +var catchupEventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "indexer_catchup_events_enqueued", Help: "Number of catchup events enqueued", -}) +}, 
[]string{"how"}) var catchupEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{ Name: "indexer_catchup_events_processed", Help: "Number of catchup events processed", }) + +var catchupEventsFailed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "indexer_catchup_events_failed", + Help: "Number of catchup events failed", +}, []string{"err"}) + +var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "indexer_catchup_repos", + Help: "Number of repos waiting on catchup", +}) diff --git a/indexer/posts_test.go b/indexer/posts_test.go index aa6fc99b3..9b9fddb4a 100644 --- a/indexer/posts_test.go +++ b/indexer/posts_test.go @@ -81,6 +81,7 @@ func (ix *testIx) Cleanup() { if ix.dir != "" { _ = os.RemoveAll(ix.dir) } + ix.ix.Shutdown() } // TODO: dedupe this out into some testing utility package diff --git a/indexer/repofetch.go b/indexer/repofetch.go index 584c8902b..33d5e0a68 100644 --- a/indexer/repofetch.go +++ b/indexer/repofetch.go @@ -111,11 +111,13 @@ func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) er var pds models.PDS if err := rf.db.First(&pds, "id = ?", ai.PDS).Error; err != nil { + catchupEventsFailed.WithLabelValues("nopds").Inc() return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err) } rev, err := rf.repoman.GetRepoRev(ctx, ai.Uid) if err != nil && !isNotFound(err) { + catchupEventsFailed.WithLabelValues("noroot").Inc() return fmt.Errorf("failed to get repo root: %w", err) } diff --git a/lex/type_schema.go b/lex/type_schema.go index fcec3575a..62c5e6702 100644 --- a/lex/type_schema.go +++ b/lex/type_schema.go @@ -223,7 +223,7 @@ func (s *TypeSchema) WriteHandlerStub(w io.Writer, fname, shortname, impname str } returndef = fmt.Sprintf("(*%s.%s, error)", impname, outname) case "application/cbor", "application/vnd.ipld.car", "*/*": - returndef = fmt.Sprintf("(io.Reader, error)") + returndef = "(io.Reader, error)" default: return 
fmt.Errorf("unrecognized output encoding (handler stub): %q", s.Output.Encoding) } diff --git a/plc/memcached.go b/plc/memcached.go index 09b883dc1..50b0e2925 100644 --- a/plc/memcached.go +++ b/plc/memcached.go @@ -74,10 +74,10 @@ func (r *MemcachedDidResolver) GetDocument(ctx context.Context, didstr string) ( doc, ok := r.tryCache(didstr) if ok { span.SetAttributes(attribute.Bool("cache", true)) - cacheHitsTotal.Inc() + memcacheHitsTotal.Inc() return doc, nil } - cacheMissesTotal.Inc() + memcacheMissesTotal.Inc() span.SetAttributes(attribute.Bool("cache", false)) doc, err := r.res.GetDocument(ctx, didstr) diff --git a/plc/metrics.go b/plc/metrics.go index 3a2521bd9..ea2af2022 100644 --- a/plc/metrics.go +++ b/plc/metrics.go @@ -14,3 +14,13 @@ var cacheMissesTotal = promauto.NewCounter(prometheus.CounterOpts{ Name: "plc_cache_misses_total", Help: "Total number of cache misses", }) + +var memcacheHitsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "plc_memcache_hits_total", + Help: "Total number of cache hits", +}) + +var memcacheMissesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "plc_memcache_misses_total", + Help: "Total number of cache misses", +}) diff --git a/repomgr/ingest_test.go b/repomgr/ingest_test.go index 38a8562e5..03444b9f3 100644 --- a/repomgr/ingest_test.go +++ b/repomgr/ingest_test.go @@ -208,14 +208,14 @@ func TestDuplicateRecord(t *testing.T) { } p1, _, err := repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ - Text: fmt.Sprintf("hello friend"), + Text: "hello friend", }) if err != nil { t.Fatal(err) } p2, _, err := repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ - Text: fmt.Sprintf("hello friend"), + Text: "hello friend", }) if err != nil { t.Fatal(err) diff --git a/search/handlers.go b/search/handlers.go index 735080dbd..a5d2b27ce 100644 --- a/search/handlers.go +++ b/search/handlers.go @@ -39,7 +39,7 @@ func parseCursorLimit(e echo.Context) (int, int, error) { if offset > 10000 { 
return 0, 0, &echo.HTTPError{ Code: 400, - Message: fmt.Sprintf("invalid value for 'cursor' (can't paginate so deep)"), + Message: "invalid value for 'cursor' (can't paginate so deep)", } } diff --git a/search/indexing.go b/search/indexing.go index 1a599be5a..2d2082a21 100644 --- a/search/indexing.go +++ b/search/indexing.go @@ -130,7 +130,7 @@ func NewIndexer(db *gorm.DB, escli *es.Client, dir identity.Directory, config In opts.SyncRequestsPerSecond = 8 } - opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", relayHTTP) + opts.RelayHost = relayHTTP if config.IndexMaxConcurrency > 0 { opts.ParallelRecordCreates = config.IndexMaxConcurrency } else { @@ -145,6 +145,8 @@ func NewIndexer(db *gorm.DB, escli *es.Client, dir identity.Directory, config In idx.handleDelete, opts, ) + // reuse identity directory (for efficient caching) + bf.Directory = dir idx.bfs = bfstore idx.bf = bf diff --git a/search/transform.go b/search/transform.go index c06cea46c..917b622d8 100644 --- a/search/transform.go +++ b/search/transform.go @@ -267,9 +267,7 @@ func parsePostTags(p *appbsky.FeedPost) []string { } } } - for _, t := range p.Tags { - ret = append(ret, t) - } + ret = append(ret, p.Tags...) if len(ret) == 0 { return nil }