diff --git a/automod/action_dedupe_test.go b/automod/action_dedupe_test.go new file mode 100644 index 000000000..316f10f28 --- /dev/null +++ b/automod/action_dedupe_test.go @@ -0,0 +1,45 @@ +package automod + +import ( + "context" + "testing" + + appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" + + "github.com/stretchr/testify/assert" +) + +func alwaysReportAccountRule(evt *RecordEvent) error { + evt.ReportAccount(ReportReasonOther, "test report") + return nil +} + +func TestAccountReportDedupe(t *testing.T) { + assert := assert.New(t) + ctx := context.Background() + engine := engineFixture() + engine.Rules = RuleSet{ + RecordRules: []RecordRuleFunc{ + alwaysReportAccountRule, + }, + } + + path := "app.bsky.feed.post/abc123" + cid1 := "cid123" + p1 := appbsky.FeedPost{Text: "some post blah"} + id1 := identity.Identity{ + DID: syntax.DID("did:plc:abc111"), + Handle: syntax.Handle("handle.example.com"), + } + + // exact same event multiple times; should only report once + for i := 0; i < 5; i++ { + assert.NoError(engine.ProcessRecord(ctx, id1.DID, path, cid1, &p1)) + } + + reports, err := engine.GetCount("automod-quota", "report", PeriodDay) + assert.NoError(err) + assert.Equal(1, reports) +} diff --git a/automod/circuit_breaker_test.go b/automod/circuit_breaker_test.go new file mode 100644 index 000000000..7181f9a62 --- /dev/null +++ b/automod/circuit_breaker_test.go @@ -0,0 +1,94 @@ +package automod + +import ( + "context" + "fmt" + "testing" + + appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" + + "github.com/stretchr/testify/assert" +) + +func alwaysTakedownRecordRule(evt *RecordEvent) error { + evt.TakedownRecord() + return nil +} + +func alwaysReportRecordRule(evt *RecordEvent) error { + evt.ReportRecord(ReportReasonOther, "test report") + return 
nil +} + +func TestTakedownCircuitBreaker(t *testing.T) { + assert := assert.New(t) + ctx := context.Background() + engine := engineFixture() + dir := identity.NewMockDirectory() + engine.Directory = &dir + // note that this is a record-level action, not account-level + engine.Rules = RuleSet{ + RecordRules: []RecordRuleFunc{ + alwaysTakedownRecordRule, + }, + } + + path := "app.bsky.feed.post/abc123" + cid1 := "cid123" + p1 := appbsky.FeedPost{Text: "some post blah"} + + // generate double the quota of events; expect to only count the quota worth of actions + for i := 0; i < 2*QuotaModTakedownDay; i++ { + ident := identity.Identity{ + DID: syntax.DID(fmt.Sprintf("did:plc:abc%d", i)), + Handle: syntax.Handle("handle.example.com"), + } + dir.Insert(ident) + assert.NoError(engine.ProcessRecord(ctx, ident.DID, path, cid1, &p1)) + } + + takedowns, err := engine.GetCount("automod-quota", "takedown", PeriodDay) + assert.NoError(err) + assert.Equal(QuotaModTakedownDay, takedowns) + + reports, err := engine.GetCount("automod-quota", "report", PeriodDay) + assert.NoError(err) + assert.Equal(0, reports) +} + +func TestReportCircuitBreaker(t *testing.T) { + assert := assert.New(t) + ctx := context.Background() + engine := engineFixture() + dir := identity.NewMockDirectory() + engine.Directory = &dir + engine.Rules = RuleSet{ + RecordRules: []RecordRuleFunc{ + alwaysReportRecordRule, + }, + } + + path := "app.bsky.feed.post/abc123" + cid1 := "cid123" + p1 := appbsky.FeedPost{Text: "some post blah"} + + // generate double the quota of events; expect to only count the quota worth of actions + for i := 0; i < 2*QuotaModReportDay; i++ { + ident := identity.Identity{ + DID: syntax.DID(fmt.Sprintf("did:plc:abc%d", i)), + Handle: syntax.Handle("handle.example.com"), + } + dir.Insert(ident) + assert.NoError(engine.ProcessRecord(ctx, ident.DID, path, cid1, &p1)) + } + + takedowns, err := engine.GetCount("automod-quota", "takedown", PeriodDay) + assert.NoError(err) + assert.Equal(0, 
takedowns) + + reports, err := engine.GetCount("automod-quota", "report", PeriodDay) + assert.NoError(err) + assert.Equal(QuotaModReportDay, reports) +} diff --git a/automod/countstore/countstore_test.go b/automod/countstore/countstore_test.go index d323d83b1..e9d1d5665 100644 --- a/automod/countstore/countstore_test.go +++ b/automod/countstore/countstore_test.go @@ -20,9 +20,12 @@ func TestMemCountStoreBasics(t *testing.T) { assert.Equal(0, c) assert.NoError(cs.Increment(ctx, "test1", "val1")) assert.NoError(cs.Increment(ctx, "test1", "val1")) - c, err = cs.GetCount(ctx, "test1", "val1", PeriodTotal) - assert.NoError(err) - assert.Equal(2, c) + + for _, period := range []string{PeriodTotal, PeriodDay, PeriodHour} { + c, err = cs.GetCount(ctx, "test1", "val1", period) + assert.NoError(err) + assert.Equal(2, c) + } c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) assert.NoError(err) @@ -36,9 +39,12 @@ func TestMemCountStoreBasics(t *testing.T) { assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "two")) assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "three")) - c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) - assert.NoError(err) - assert.Equal(3, c) + + for _, period := range []string{PeriodTotal, PeriodDay, PeriodHour} { + c, err = cs.GetCountDistinct(ctx, "test2", "val2", period) + assert.NoError(err) + assert.Equal(3, c) + } } func TestMemCountStoreConcurrent(t *testing.T) { diff --git a/automod/engine.go b/automod/engine.go index 416224f85..2258fa873 100644 --- a/automod/engine.go +++ b/automod/engine.go @@ -72,6 +72,10 @@ func (e *Engine) ProcessIdentityEvent(ctx context.Context, t string, did syntax. 
if err := evt.PersistCounters(ctx); err != nil { return err } + // check for any new errors during persist + if evt.Err != nil { + return evt.Err + } return nil } @@ -114,6 +118,10 @@ func (e *Engine) ProcessRecord(ctx context.Context, did syntax.DID, path, recCID if err := evt.PersistCounters(ctx); err != nil { return err } + // check for any new errors during persist + if evt.Err != nil { + return evt.Err + } return nil } diff --git a/automod/event.go b/automod/event.go index 544c37c64..b95af1057 100644 --- a/automod/event.go +++ b/automod/event.go @@ -5,15 +5,22 @@ import ( "fmt" "log/slog" "strings" + "time" comatproto "github.com/bluesky-social/indigo/api/atproto" appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/syntax" + "github.com/bluesky-social/indigo/xrpc" ) -type ModReport struct { - ReasonType string - Comment string -} +var ( + // time period within which automod will not re-report an account for the same reasonType + ReportDupePeriod = 7 * 24 * time.Hour + // number of reports automod can file per day, for all subjects and types combined (circuit breaker) + QuotaModReportDay = 50 + // number of takedowns automod can action per day, for all subjects combined (circuit breaker) + QuotaModTakedownDay = 10 +) type CounterRef struct { Name string @@ -124,10 +131,21 @@ func (e *RepoEvent) AddAccountFlag(val string) { // Enqueues a moderation report to be filed against the account at the end of rule processing. 
func (e *RepoEvent) ReportAccount(reason, comment string) { + if comment == "" { + comment = "(no comment)" + } + comment = "automod: " + comment e.AccountReports = append(e.AccountReports, ModReport{ReasonType: reason, Comment: comment}) } -func slackBody(msg string, newLabels, newFlags []string, newReports []ModReport, newTakedown bool) string { +func slackBody(header string, acct AccountMeta, newLabels, newFlags []string, newReports []ModReport, newTakedown bool) string { + msg := header + msg += fmt.Sprintf("`%s` / `%s` / / \n", + acct.Identity.DID, + acct.Identity.Handle, + acct.Identity.DID, + acct.Identity.DID, + ) if len(newLabels) > 0 { msg += fmt.Sprintf("New Labels: `%s`\n", strings.Join(newLabels, ", ")) } @@ -143,24 +161,17 @@ func slackBody(msg string, newLabels, newFlags []string, newReports []ModReport, return msg } -// Persists account-level moderation actions: new labels, new flags, new takedowns, and reports. -// -// If necessary, will "purge" identity and account caches, so that state updates will be picked up for subsequent events. 
-// -// TODO: de-dupe reports based on existing state, similar to other state -func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { - - // de-dupe actions +func dedupeLabelActions(labels, existing, existingNegated []string) []string { newLabels := []string{} - for _, val := range dedupeStrings(e.AccountLabels) { + for _, val := range dedupeStrings(labels) { exists := false - for _, e := range e.Account.AccountNegatedLabels { + for _, e := range existingNegated { if val == e { exists = true break } } - for _, e := range e.Account.AccountLabels { + for _, e := range existing { if val == e { exists = true break @@ -170,10 +181,14 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { newLabels = append(newLabels, val) } } + return newLabels +} + +func dedupeFlagActions(flags, existing []string) []string { newFlags := []string{} - for _, val := range dedupeStrings(e.AccountFlags) { + for _, val := range dedupeStrings(flags) { exists := false - for _, e := range e.Account.AccountFlags { + for _, e := range existing { if val == e { exists = true break @@ -183,32 +198,124 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { newFlags = append(newFlags, val) } } - newReports := e.AccountReports - newTakedown := e.AccountTakedown && !e.Account.Takendown + return newFlags +} - if newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 { - if e.Engine.SlackWebhookURL != "" { - msg := fmt.Sprintf("⚠️ Automod Account Action ⚠️\n") - msg += fmt.Sprintf("`%s` / `%s` / / \n", - e.Account.Identity.DID, - e.Account.Identity.Handle, - e.Account.Identity.DID, - e.Account.Identity.DID, - ) - msg = slackBody(msg, newLabels, newFlags, newReports, newTakedown) - if err := e.Engine.SendSlackMsg(ctx, msg); err != nil { - e.Logger.Error("sending slack webhook", "err", err) - } +func dedupeReportActions(evt *RepoEvent, reports []ModReport) []ModReport { + newReports := []ModReport{} + for _, r := range reports { + 
counterName := "automod-account-report-" + reasonShortName(r.ReasonType) + existing := evt.GetCount(counterName, evt.Account.Identity.DID.String(), PeriodDay) + if existing > 0 { + evt.Logger.Debug("skipping account report due to counter", "existing", existing, "reason", reasonShortName(r.ReasonType)) + } else { + evt.Increment(counterName, evt.Account.Identity.DID.String()) + newReports = append(newReports, r) } } + return newReports +} + +func circuitBreakReports(evt *RepoEvent, reports []ModReport) []ModReport { + if len(reports) == 0 { + return []ModReport{} + } + if evt.GetCount("automod-quota", "report", PeriodDay) >= QuotaModReportDay { + evt.Logger.Warn("CIRCUIT BREAKER: automod reports") + return []ModReport{} + } + evt.Increment("automod-quota", "report") + return reports +} + +func circuitBreakTakedown(evt *RepoEvent, takedown bool) bool { + if !takedown { + return takedown + } + if evt.GetCount("automod-quota", "takedown", PeriodDay) >= QuotaModTakedownDay { + evt.Logger.Warn("CIRCUIT BREAKER: automod takedowns") + return false + } + evt.Increment("automod-quota", "takedown") + return takedown +} + +// Creates a moderation report, but checks first if there was a similar recent one, and skips if so. +// +// Returns a bool indicating if a new report was created. 
+func createReportIfFresh(ctx context.Context, xrpcc *xrpc.Client, evt RepoEvent, mr ModReport) (bool, error) { + // before creating a report, query to see if automod has already reported this account in the past week for the same reason + // NOTE: this is running in an inner loop (if there are multiple reports), which is a bit inefficient, but seems acceptable + // AdminQueryModerationEvents(ctx context.Context, c *xrpc.Client, createdBy string, cursor string, includeAllUserRecords bool, limit int64, sortDirection string, subject string, types []string) + resp, err := comatproto.AdminQueryModerationEvents(ctx, xrpcc, xrpcc.Auth.Did, "", false, 5, "", evt.Account.Identity.DID.String(), []string{"com.atproto.admin.defs#modEventReport"}) + if err != nil { + return false, err + } + for _, modEvt := range resp.Events { + // defensively ensure that our query params worked correctly + if modEvt.Event.AdminDefs_ModEventReport == nil || modEvt.CreatedBy != xrpcc.Auth.Did || modEvt.Subject.AdminDefs_RepoRef == nil || modEvt.Subject.AdminDefs_RepoRef.Did != evt.Account.Identity.DID.String() || (modEvt.Event.AdminDefs_ModEventReport.ReportType != nil && *modEvt.Event.AdminDefs_ModEventReport.ReportType != mr.ReasonType) { + continue + } + // ignore if older + created, err := syntax.ParseDatetime(modEvt.CreatedAt) + if err != nil { + return false, err + } + if time.Since(created.Time()) > ReportDupePeriod { + continue + } + + // there is a recent report which is similar to this one + evt.Logger.Info("skipping duplicate account report due to API check") + return false, nil + } + + evt.Logger.Info("reporting account", "reasonType", mr.ReasonType, "comment", mr.Comment) + _, err = comatproto.ModerationCreateReport(ctx, xrpcc, &comatproto.ModerationCreateReport_Input{ + ReasonType: &mr.ReasonType, + Reason: &mr.Comment, + Subject: &comatproto.ModerationCreateReport_Input_Subject{ + AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ + Did: evt.Account.Identity.DID.String(), + }, + 
}, + }) + return err == nil, err // surface a failed create to the caller instead of claiming the report was filed +} + +// Persists account-level moderation actions: new labels, new flags, new takedowns, and reports. +// +// If necessary, will "purge" identity and account caches, so that state updates will be picked up for subsequent events. +// +// Note that this method expects to run *before* counts are persisted (it accesses and updates some counts) +func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { + + // de-dupe actions + newLabels := dedupeLabelActions(e.AccountLabels, e.Account.AccountLabels, e.Account.AccountNegatedLabels) + newFlags := dedupeFlagActions(e.AccountFlags, e.Account.AccountFlags) + + // don't report the same account multiple times on the same day for the same reason. this is a quick check; we also query the mod service API just before creating the report. + newReports := circuitBreakReports(e, dedupeReportActions(e, e.AccountReports)) + newTakedown := circuitBreakTakedown(e, e.AccountTakedown && !e.Account.Takendown) + + anyModActions := newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 + if anyModActions && e.Engine.SlackWebhookURL != "" { + msg := slackBody("⚠️ Automod Account Action ⚠️\n", e.Account, newLabels, newFlags, newReports, newTakedown) + if err := e.Engine.SendSlackMsg(ctx, msg); err != nil { + e.Logger.Error("sending slack webhook", "err", err) + } + } + + // if we can't actually talk to service, bail out early if e.Engine.AdminClient == nil { return nil } - needsPurge := false xrpcc := e.Engine.AdminClient + if len(newLabels) > 0 { + e.Logger.Info("labeling record", "newLabels", newLabels) comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, @@ -228,27 +335,26 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { if err != nil { return err } - needsPurge = true } + if len(newFlags) > 0 { e.Engine.Flags.Add(ctx, e.Account.Identity.DID.String(), 
newFlags) - needsPurge = true } + + // reports are additionally de-duped when persisting the action, so track with a flag + createdReports := false for _, mr := range newReports { - _, err := comatproto.ModerationCreateReport(ctx, xrpcc, &comatproto.ModerationCreateReport_Input{ - ReasonType: &mr.ReasonType, - Reason: &mr.Comment, - Subject: &comatproto.ModerationCreateReport_Input_Subject{ - AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ - Did: e.Account.Identity.DID.String(), - }, - }, - }) + created, err := createReportIfFresh(ctx, xrpcc, *e, mr) if err != nil { return err } + if created { + createdReports = true + } } + if newTakedown { + e.Logger.Warn("account-takedown") comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, @@ -266,11 +372,13 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { if err != nil { return err } - needsPurge = true } - if needsPurge { + + needCachePurge := newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || createdReports + if needCachePurge { return e.Engine.PurgeAccountCaches(ctx, e.Account.Identity.DID) } + return nil } @@ -356,6 +464,11 @@ func (e *RecordEvent) AddRecordFlag(val string) { // Enqueues a moderation report to be filed against the record at the end of rule processing. func (e *RecordEvent) ReportRecord(reason, comment string) { + if comment == "" { + comment = "(automod)" + } else { + comment = "automod: " + comment + } e.RecordReports = append(e.RecordReports, ModReport{ReasonType: reason, Comment: comment}) } @@ -364,24 +477,17 @@ func (e *RecordEvent) ReportRecord(reason, comment string) { // NOTE: this method currently does *not* persist record-level flags to any storage, and does not de-dupe most actions, on the assumption that the record is new (from firehose) and has no existing mod state. 
func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { - // TODO: consider de-duping record-level actions? at least for updates and deletes. + // NOTE: record-level actions are *not* currently de-duplicated (aka, the same record could be labeled multiple times, or re-reported, etc) newLabels := dedupeStrings(e.RecordLabels) newFlags := dedupeStrings(e.RecordFlags) - newReports := e.RecordReports - newTakedown := e.RecordTakedown + newReports := circuitBreakReports(&e.RepoEvent, e.RecordReports) + newTakedown := circuitBreakTakedown(&e.RepoEvent, e.RecordTakedown) atURI := fmt.Sprintf("at://%s/%s/%s", e.Account.Identity.DID, e.Collection, e.RecordKey) if newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 { if e.Engine.SlackWebhookURL != "" { - msg := fmt.Sprintf("⚠️ Automod Record Action ⚠️\n") - msg += fmt.Sprintf("`%s` / `%s` / / \n", - e.Account.Identity.DID, - e.Account.Identity.Handle, - e.Account.Identity.DID, - e.Account.Identity.DID, - ) + msg := slackBody("⚠️ Automod Record Action ⚠️\n", e.Account, newLabels, newFlags, newReports, newTakedown) msg += fmt.Sprintf("`%s`\n", atURI) - msg = slackBody(msg, newLabels, newFlags, newReports, newTakedown) if err := e.Engine.SendSlackMsg(ctx, msg); err != nil { e.Logger.Error("sending slack webhook", "err", err) } @@ -396,6 +502,7 @@ func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { } xrpcc := e.Engine.AdminClient if len(newLabels) > 0 { + e.Logger.Info("labeling record", "newLabels", newLabels) comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, @@ -418,6 +525,7 @@ func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { e.Engine.Flags.Add(ctx, atURI, newFlags) } for _, mr := range newReports { + e.Logger.Info("reporting record", "reasonType", mr.ReasonType, "comment", mr.Comment) _, err := comatproto.ModerationCreateReport(ctx, xrpcc, 
&comatproto.ModerationCreateReport_Input{ ReasonType: &mr.ReasonType, Reason: &mr.Comment, @@ -430,6 +538,7 @@ func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { } } if newTakedown { + e.Logger.Warn("record-takedown") comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, diff --git a/automod/report.go b/automod/report.go new file mode 100644 index 000000000..774d0c0d2 --- /dev/null +++ b/automod/report.go @@ -0,0 +1,34 @@ +package automod + +type ModReport struct { + ReasonType string + Comment string +} + +var ( + ReportReasonSpam = "com.atproto.moderation.defs#reasonSpam" + ReportReasonViolation = "com.atproto.moderation.defs#reasonViolation" + ReportReasonMisleading = "com.atproto.moderation.defs#reasonMisleading" + ReportReasonSexual = "com.atproto.moderation.defs#reasonSexual" + ReportReasonRude = "com.atproto.moderation.defs#reasonRude" + ReportReasonOther = "com.atproto.moderation.defs#reasonOther" +) + +func reasonShortName(reason string) string { + switch reason { + case ReportReasonSpam: + return "spam" + case ReportReasonViolation: + return "violation" + case ReportReasonMisleading: + return "misleading" + case ReportReasonSexual: + return "sexual" + case ReportReasonRude: + return "rude" + case ReportReasonOther: + return "other" + default: + return "unknown" + } +} diff --git a/automod/rules/interaction.go b/automod/rules/interaction.go index 4cb80ba1e..20bf3b97c 100644 --- a/automod/rules/interaction.go +++ b/automod/rules/interaction.go @@ -1,6 +1,8 @@ package rules import ( + "fmt" + "github.com/bluesky-social/indigo/automod" "github.com/bluesky-social/indigo/automod/countstore" ) @@ -19,6 +21,7 @@ func InteractionChurnRule(evt *automod.RecordEvent) error { if created > interactionDailyThreshold && deleted > interactionDailyThreshold && ratio > 0.5 { evt.Logger.Info("high-like-churn", "created-today", created, "deleted-today", 
deleted) evt.AddAccountFlag("high-like-churn") + evt.ReportAccount(automod.ReportReasonSpam, fmt.Sprintf("interaction churn: %d likes, %d unlikes today (so far)", created, deleted)) } case "app.bsky.graph.follow": evt.Increment("follow", did) @@ -28,6 +31,7 @@ func InteractionChurnRule(evt *automod.RecordEvent) error { if created > interactionDailyThreshold && deleted > interactionDailyThreshold && ratio > 0.5 { evt.Logger.Info("high-follow-churn", "created-today", created, "deleted-today", deleted) evt.AddAccountFlag("high-follow-churn") + evt.ReportAccount(automod.ReportReasonSpam, fmt.Sprintf("interaction churn: %d follows, %d unfollows today (so far)", created, deleted)) } } return nil