From af8753cb5b3476a8be2e3884d12764a7309edf94 Mon Sep 17 00:00:00 2001 From: Hailey Date: Tue, 19 Nov 2024 15:16:54 -0800 Subject: [PATCH 1/5] configurable circuit breakers --- automod/engine/effects.go | 12 ------------ automod/engine/engine.go | 8 ++++++++ automod/engine/persisthelpers.go | 10 +++++----- cmd/hepa/main.go | 28 ++++++++++++++++++++++++++++ cmd/hepa/server.go | 11 +++++++++++ 5 files changed, 52 insertions(+), 17 deletions(-) diff --git a/automod/engine/effects.go b/automod/engine/effects.go index a318615cc..10bbc1fb0 100644 --- a/automod/engine/effects.go +++ b/automod/engine/effects.go @@ -2,18 +2,6 @@ package engine import ( "sync" - "time" -) - -var ( - // time period within which automod will not re-report an account for the same reasonType - ReportDupePeriod = 1 * 24 * time.Hour - // number of reports automod can file per day, for all subjects and types combined (circuit breaker) - QuotaModReportDay = 2000 - // number of takedowns automod can action per day, for all subjects combined (circuit breaker) - QuotaModTakedownDay = 200 - // number of misc actions automod can do per day, for all subjects combined (circuit breaker) - QuotaModActionDay = 1000 ) type CounterRef struct { diff --git a/automod/engine/engine.go b/automod/engine/engine.go index b313a4492..933987072 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -52,6 +52,14 @@ type Engine struct { type EngineConfig struct { // if enabled, account metadata is not hydrated for every event by default SkipAccountMeta bool + // time period within which automod will not re-report an account for the same reasonType + ReportDupePeriod time.Duration + // number of reports automod can file per day, for all subjects and types combined (circuit breaker) + QuotaModReportDay int + // number of takedowns automod can action per day, for all subjects combined (circuit breaker) + QuotaModTakedownDay int + // number of misc actions automod can do per day, for all subjects combined (circuit breaker) + QuotaModActionDay int } // Entrypoint for external code pushing #identity events in to the engine. diff --git a/automod/engine/persisthelpers.go b/automod/engine/persisthelpers.go index 42ba8934e..b08322223 100644 --- a/automod/engine/persisthelpers.go +++ b/automod/engine/persisthelpers.go @@ -98,7 +98,7 @@ func (eng *Engine) circuitBreakReports(ctx context.Context, reports []ModReport) if err != nil { return nil, fmt.Errorf("checking report action quota: %w", err) } - if c >= QuotaModReportDay { + if c >= eng.Config.QuotaModReportDay { eng.Logger.Warn("CIRCUIT BREAKER: automod reports") return []ModReport{}, nil } @@ -117,7 +117,7 @@ func (eng *Engine) circuitBreakTakedown(ctx context.Context, takedown bool) (boo if err != nil { return false, fmt.Errorf("checking takedown action quota: %w", err) } - if c >= QuotaModTakedownDay { + if c >= eng.Config.QuotaModTakedownDay { eng.Logger.Warn("CIRCUIT BREAKER: automod takedowns") return false, nil } @@ -137,7 +137,7 @@ func (eng *Engine) circuitBreakModAction(ctx context.Context, action bool) (bool if err != nil { return false, fmt.Errorf("checking mod action quota: %w", err) } - if c >= QuotaModActionDay { + if c >= eng.Config.QuotaModActionDay { eng.Logger.Warn("CIRCUIT BREAKER: automod action") return false, nil } @@ -191,7 +191,7 @@ func (eng *Engine) createReportIfFresh(ctx context.Context, xrpcc *xrpc.Client, if err != nil { return false, err } - if time.Since(created.Time()) > ReportDupePeriod { + if time.Since(created.Time()) > eng.Config.ReportDupePeriod { continue } @@ -267,7 +267,7 @@ func (eng *Engine) createRecordReportIfFresh(ctx context.Context, xrpcc *xrpc.Cl if err != nil { return false, err } - if time.Since(created.Time()) > ReportDupePeriod { + if time.Since(created.Time()) > eng.Config.ReportDupePeriod { continue } diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index 7883d9c29..9e3029b6c 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -149,6 +149,30 @@ func run(args []string) error { Usage: "secret token for prescreen server", EnvVars: []string{"HEPA_PRESCREEN_TOKEN"}, }, + &cli.DurationFlag{ + Name: "report-dupe-period", + Usage: "time period within which automod will not re-report an account for the same reasonType", + EnvVars: []string{"HEPA_REPORT_DUPE_PERIOD"}, + Value: 1 * 24 * time.Hour, + }, + &cli.IntFlag{ + Name: "quota-mod-report-day", + Usage: "number of reports automod can file per day, for all subjects and types combined (circuit breaker)", + EnvVars: []string{"HEPA_QUOTA_MOD_REPORT_DAY"}, + Value: 10000, + }, + &cli.IntFlag{ + Name: "quota-mod-takedown-day", + Usage: "number of takedowns automod can action per day, for all subjects combined (circuit breaker)", + EnvVars: []string{"HEPA_QUOTA_MOD_TAKEDOWN_DAY"}, + Value: 200, + }, + &cli.IntFlag{ + Name: "quota-mod-action-day", + Usage: "number of misc actions automod can do per day, for all subjects combined (circuit breaker)", + EnvVars: []string{"HEPA_QUOTA_MOD_ACTION_DAY"}, + Value: 2000, + }, } app.Commands = []*cli.Command{ @@ -255,6 +279,10 @@ var runCmd = &cli.Command{ FirehoseParallelism: cctx.Int("firehose-parallelism"), // DEPRECATED PreScreenHost: cctx.String("prescreen-host"), PreScreenToken: cctx.String("prescreen-token"), + ReportDupePeriod: cctx.Duration("report-dupe-period"), + QuotaModReportDay: cctx.Int("quota-mod-report-day"), + QuotaModTakedownDay: cctx.Int("quota-mod-takedown-day"), + QuotaModActionDay: cctx.Int("quota-mod-action-day"), }, ) if err != nil { diff --git a/cmd/hepa/server.go b/cmd/hepa/server.go index 9fe08f98e..a4ca252d1 100644 --- a/cmd/hepa/server.go +++ b/cmd/hepa/server.go @@ -14,6 +14,7 @@ import ( "github.com/bluesky-social/indigo/automod" "github.com/bluesky-social/indigo/automod/cachestore" "github.com/bluesky-social/indigo/automod/countstore" + "github.com/bluesky-social/indigo/automod/engine" "github.com/bluesky-social/indigo/automod/flagstore" "github.com/bluesky-social/indigo/automod/rules" "github.com/bluesky-social/indigo/automod/setstore" @@ -54,6 +55,10 @@ type Config struct { FirehoseParallelism int // DEPRECATED PreScreenHost string PreScreenToken string + ReportDupePeriod time.Duration + QuotaModReportDay int + QuotaModTakedownDay int + QuotaModActionDay int } func NewServer(dir identity.Directory, config Config) (*Server, error) { @@ -219,6 +224,12 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { OzoneClient: ozoneClient, AdminClient: adminClient, BlobClient: blobClient, + Config: engine.EngineConfig{ + ReportDupePeriod: config.ReportDupePeriod, + QuotaModReportDay: config.QuotaModReportDay, + QuotaModTakedownDay: config.QuotaModTakedownDay, + QuotaModActionDay: config.QuotaModActionDay, + }, } s := &Server{ From 23deea7877ab7674abffe43aa1a85b6d16806f29 Mon Sep 17 00:00:00 2001 From: Hailey Date: Tue, 19 Nov 2024 15:20:17 -0800 Subject: [PATCH 2/5] fix test --- automod/engine/circuit_breaker_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/automod/engine/circuit_breaker_test.go b/automod/engine/circuit_breaker_test.go index 43c38393a..dee3dcc52 100644 --- a/automod/engine/circuit_breaker_test.go +++ b/automod/engine/circuit_breaker_test.go @@ -44,7 +44,7 @@ func TestTakedownCircuitBreaker(t *testing.T) { p1cbor := p1buf.Bytes() // generate double the quote of events; expect to only count the quote worth of actions - for i := 0; i < 2*QuotaModTakedownDay; i++ { + for i := 0; i < 2*eng.Config.QuotaModTakedownDay; i++ { ident := identity.Identity{ DID: syntax.DID(fmt.Sprintf("did:plc:abc%d", i)), Handle: syntax.Handle("handle.example.com"), @@ -63,7 +63,7 @@ func TestTakedownCircuitBreaker(t *testing.T) { takedowns, err := eng.Counters.GetCount(ctx, "automod-quota", "takedown", countstore.PeriodDay) assert.NoError(err) - assert.Equal(QuotaModTakedownDay, takedowns) + assert.Equal(eng.Config.QuotaModTakedownDay, takedowns) reports, err := eng.Counters.GetCount(ctx, "automod-quota", "report", countstore.PeriodDay) assert.NoError(err) @@ -89,7 +89,7 @@ func TestReportCircuitBreaker(t *testing.T) { p1cbor := p1buf.Bytes() // generate double the quota of events; expect to only count the quota worth of actions - for i := 0; i < 2*QuotaModReportDay; i++ { + for i := 0; i < 2*eng.Config.QuotaModReportDay; i++ { ident := identity.Identity{ DID: syntax.DID(fmt.Sprintf("did:plc:abc%d", i)), Handle: syntax.Handle("handle.example.com"), @@ -112,5 +112,5 @@ func TestReportCircuitBreaker(t *testing.T) { reports, err := eng.Counters.GetCount(ctx, "automod-quota", "report", countstore.PeriodDay) assert.NoError(err) - assert.Equal(QuotaModReportDay, reports) + assert.Equal(eng.Config.QuotaModReportDay, reports) } From 40a55ceda860e96bff656efaf47437ab49d1c733 Mon Sep 17 00:00:00 2001 From: Hailey Date: Tue, 19 Nov 2024 15:39:02 -0800 Subject: [PATCH 3/5] default values --- automod/engine/persisthelpers.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/automod/engine/persisthelpers.go b/automod/engine/persisthelpers.go index b08322223..a584d5ca7 100644 --- a/automod/engine/persisthelpers.go +++ b/automod/engine/persisthelpers.go @@ -98,7 +98,12 @@ func (eng *Engine) circuitBreakReports(ctx context.Context, reports []ModReport) if err != nil { return nil, fmt.Errorf("checking report action quota: %w", err) } - if c >= eng.Config.QuotaModReportDay { + + quotaModReportDay := eng.Config.QuotaModReportDay + if quotaModReportDay == 0 { + quotaModReportDay = 10000 + } + if c >= quotaModReportDay { eng.Logger.Warn("CIRCUIT BREAKER: automod reports") return []ModReport{}, nil } @@ -117,7 +122,11 @@ func (eng *Engine) circuitBreakTakedown(ctx context.Context, takedown bool) (boo if err != nil { return false, fmt.Errorf("checking takedown action quota: %w", err) } - if c >= eng.Config.QuotaModTakedownDay { + quotaModTakedownDay := eng.Config.QuotaModTakedownDay + if quotaModTakedownDay == 0 { + quotaModTakedownDay = 200 + } + if c >= quotaModTakedownDay { eng.Logger.Warn("CIRCUIT BREAKER: automod takedowns") return false, nil } @@ -137,7 +146,11 @@ func (eng *Engine) circuitBreakModAction(ctx context.Context, action bool) (bool if err != nil { return false, fmt.Errorf("checking mod action quota: %w", err) } - if c >= eng.Config.QuotaModActionDay { + quotaModActionDay := eng.Config.QuotaModActionDay + if quotaModActionDay == 0 { + quotaModActionDay = 2000 + } + if c >= quotaModActionDay { eng.Logger.Warn("CIRCUIT BREAKER: automod action") return false, nil } @@ -191,7 +204,11 @@ func (eng *Engine) createReportIfFresh(ctx context.Context, xrpcc *xrpc.Client, if err != nil { return false, err } - if time.Since(created.Time()) > eng.Config.ReportDupePeriod { + reportDupePeriod := eng.Config.ReportDupePeriod + if reportDupePeriod == 0 { + reportDupePeriod = 1 * 24 * time.Hour + } + if time.Since(created.Time()) > reportDupePeriod { continue } From cd076e2e48a2059fa926c84b951eaa7c9ee4cca8 Mon Sep 17 00:00:00 2001 From: Hailey Date: Tue, 19 Nov 2024 15:43:03 -0800 Subject: [PATCH 4/5] one more default value --- automod/engine/persisthelpers.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/automod/engine/persisthelpers.go b/automod/engine/persisthelpers.go index a584d5ca7..906422ab9 100644 --- a/automod/engine/persisthelpers.go +++ b/automod/engine/persisthelpers.go @@ -284,7 +284,11 @@ func (eng *Engine) createRecordReportIfFresh(ctx context.Context, xrpcc *xrpc.Cl if err != nil { return false, err } - if time.Since(created.Time()) > eng.Config.ReportDupePeriod { + reportDupePeriod := eng.Config.ReportDupePeriod + if reportDupePeriod == 0 { + reportDupePeriod = 1 * 24 * time.Hour + } + if time.Since(created.Time()) > reportDupePeriod { continue } From 85b52ddd8dc5627bd99786088767b7a544cc6555 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Tue, 19 Nov 2024 15:53:03 -0500 Subject: [PATCH 5/5] stream resync to indexer crawler --- bgs/bgs.go | 41 ++++++++++++----------------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index b64df715e..18eab263f 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1646,15 +1646,13 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start)) resync = bgs.SetResyncStatus(pds.ID, "checking revs") - // Create a buffered channel for collecting results - results := make(chan revCheckResult, len(repos)) + // run loop over repos with some concurrency sem := semaphore.NewWeighted(40) // Check repo revs against our local copy and enqueue crawls for any that are out of date - for _, r := range repos { + for i, r := range repos { if err := sem.Acquire(ctx, 1); err != nil { log.Errorw("failed to acquire semaphore", "error", err) - results <- revCheckResult{err: err} continue } go func(r comatproto.SyncListRepos_Repo) { @@ -1664,56 +1662,41 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did) if err != nil { log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err) - results <- revCheckResult{err: err} return } rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid) if err != nil { log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid) - results <- revCheckResult{ai: ai} + err := bgs.Index.Crawler.Crawl(ctx, ai) + if err != nil { + log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did) + } return } if rev == "" || rev < r.Rev { log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev) - results <- revCheckResult{ai: ai} + err := bgs.Index.Crawler.Crawl(ctx, ai) + if err != nil { + log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did) + } return } - - results <- revCheckResult{} }(r) - } - - var numReposToResync int - for i := 0; i < len(repos); i++ { - res := <-results - if res.err != nil { - log.Errorw("failed to process repo during resync", "error", res.err) - - } - if res.ai != nil { - numReposToResync++ - err := bgs.Index.Crawler.Crawl(ctx, res.ai) - if err != nil { - log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did) - } - } if i%100 == 0 { if i%10_000 == 0 { - log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt)) + log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Now().Sub(resync.StatusChangedAt)) } resync.NumReposChecked = i - resync.NumReposToResync = numReposToResync bgs.UpdateResync(resync) } } resync.NumReposChecked = len(repos) - resync.NumReposToResync = numReposToResync bgs.UpdateResync(resync) - log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", numReposToResync) + log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", -1) return nil }