diff --git a/automod/countstore/countstore.go b/automod/countstore/countstore.go index a3ae2b9d1..5531541fd 100644 --- a/automod/countstore/countstore.go +++ b/automod/countstore/countstore.go @@ -48,6 +48,7 @@ type CountStore interface { // TODO: batch increment method GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) IncrementDistinct(ctx context.Context, name, bucket, val string) error + Reset(ctx context.Context, name, val string) error } func periodBucket(name, val, period string) string { diff --git a/automod/countstore/countstore_mem.go b/automod/countstore/countstore_mem.go index f33c076ee..43bfa06e0 100644 --- a/automod/countstore/countstore_mem.go +++ b/automod/countstore/countstore_mem.go @@ -40,6 +40,21 @@ func (s MemCountStore) Increment(ctx context.Context, name, val string) error { return nil } +func (s MemCountStore) Reset(ctx context.Context, name, val string) error { + for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { + if err := s.ResetPeriod(ctx, name, val, p); err != nil { + return err + } + } + return nil +} + +func (s MemCountStore) ResetPeriod(ctx context.Context, name, val, period string) error { + key := periodBucket(name, val, period) + s.Counts.Delete(key) + return nil +} + func (s MemCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { k := periodBucket(name, val, period) s.Counts.Compute(k, func(oldVal int, _ bool) (int, bool) { diff --git a/automod/countstore/countstore_redis.go b/automod/countstore/countstore_redis.go index 2e42c96f8..cc5c38846 100644 --- a/automod/countstore/countstore_redis.go +++ b/automod/countstore/countstore_redis.go @@ -66,6 +66,25 @@ func (s *RedisCountStore) Increment(ctx context.Context, name, val string) error return err } +func (s *RedisCountStore) Reset(ctx context.Context, name, val string) error { + var key string + + // delete multiple counters in a single redis round-trip + multi := s.Client.Pipeline() + + key = redisCountPrefix + periodBucket(name, val, PeriodHour) + multi.Del(ctx, key) + + key = redisCountPrefix + periodBucket(name, val, PeriodDay) + multi.Del(ctx, key) + + key = redisCountPrefix + periodBucket(name, val, PeriodTotal) + multi.Del(ctx, key) + + _, err := multi.Exec(ctx) + return err +} + // Variant of Increment() which only acts on a single specified time period. The intended us of this variant is to control the total number of counters persisted, by using a relatively short time period, for which the counters will expire. func (s *RedisCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { diff --git a/automod/engine/context.go b/automod/engine/context.go index 2e447bebd..d0de14fc6 100644 --- a/automod/engine/context.go +++ b/automod/engine/context.go @@ -250,6 +250,9 @@ func (c *BaseContext) GetAccountMeta(did syntax.DID) *AccountMeta { func (c *BaseContext) Increment(name, val string) { c.effects.Increment(name, val) } +func (c *BaseContext) ResetCounter(name, val string) { + c.effects.ResetCounter(name, val) +} func (c *BaseContext) IncrementDistinct(name, bucket, val string) { c.effects.IncrementDistinct(name, bucket, val) @@ -283,6 +286,10 @@ func (c *AccountContext) TakedownAccount() { c.effects.TakedownAccount() } +func (c *AccountContext) ResolveAccountAppeal() { + c.effects.ResolveAccountAppeal() +} + func (c *AccountContext) EscalateAccount() { c.effects.EscalateAccount() } @@ -311,6 +318,10 @@ func (c *RecordContext) TakedownRecord() { c.effects.TakedownRecord() } +func (c *RecordContext) ResolveRecordAppeal() { + c.effects.ResolveRecordAppeal() +} + func (c *RecordContext) TakedownBlob(cid string) { c.effects.TakedownBlob(cid) } diff --git a/automod/engine/effects.go b/automod/engine/effects.go index a318615cc..c9352bc5f 100644 --- a/automod/engine/effects.go +++ b/automod/engine/effects.go @@ -36,6 +36,8 @@ type Effects struct { mu sync.Mutex // List of counters which should be incremented as part of processing this event. These are collected during rule execution and persisted in bulk at the end. CounterIncrements []CounterRef + // List of counters which should be reset as part of processing this event. These are collected during rule execution and persisted in bulk at the end. + CounterResets []CounterRef // Similar to "CounterIncrements", but for "distinct" style counters CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names // Label values which should be applied to the overall account, as a result of rule execution. @@ -48,6 +50,8 @@ type Effects struct { AccountReports []ModReport // If "true", a rule decided that the entire account should have a takedown. AccountTakedown bool + // If "true", indicates that a rule indicates that appeals on the account should be resolved. + AccountAppealResolve bool // If "true", a rule decided that the reported account should be escalated. AccountEscalate bool // If "true", a rule decided that the reports on account should be resolved as acknowledged. @@ -68,6 +72,8 @@ type Effects struct { RejectEvent bool // Services, if any, which should blast out a notification about this even (eg, Slack) NotifyServices []string + // If "true", indicates that a rule indicates that any appeal on the record should be resolved + RecordAppealResolve bool } // Enqueues the named counter to be incremented at the end of all rule processing. Will automatically increment for all time periods. @@ -80,6 +86,16 @@ func (e *Effects) Increment(name, val string) { e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val}) } +// Enqueues the named counter to be reset at the end of all rule processing. +// +// "name" is the counter namespace. +// "val" is the specific counter with that namespace. +func (e *Effects) ResetCounter(name, val string) { + e.mu.Lock() + defer e.mu.Unlock() + e.CounterResets = append(e.CounterResets, CounterRef{Name: name, Val: val}) +} + // Enqueues the named counter to be incremented at the end of all rule processing. Will only increment the indicated time period bucket. func (e *Effects) IncrementPeriod(name, val string, period string) { e.mu.Lock() @@ -150,6 +166,11 @@ func (e *Effects) TakedownAccount() { e.AccountTakedown = true } +// Enqueues the accounts's appeals to be resolved at the end of rule processing. +func (e *Effects) ResolveAccountAppeal() { + e.AccountAppealResolve = true +} + // Enqueues the account to be "escalated" for mod review at the end of rule processing. func (e *Effects) EscalateAccount() { e.AccountEscalate = true @@ -196,6 +217,17 @@ func (e *Effects) AddRecordFlag(val string) { e.RecordFlags = append(e.RecordFlags, val) } +// Enqueues the provided flag (string value) to be removed (in the Engine's flagstore) at the end of rule processing. +func (e *Effects) RemoveRecordFlag(val string) { + e.mu.Lock() + defer e.mu.Unlock() + for i, v := range e.RecordFlags { + if v == val { + e.RecordFlags = append(e.RecordFlags[:i], e.RecordFlags[i+1:]...) + } + } +} + // Enqueues a moderation report to be filed against the record at the end of rule processing. func (e *Effects) ReportRecord(reason, comment string) { e.mu.Lock() @@ -216,6 +248,11 @@ func (e *Effects) TakedownRecord() { e.RecordTakedown = true } +// Enqueues the record's appeals to be resolved at the end of rule processing. +func (e *Effects) ResolveRecordAppeal() { + e.RecordAppealResolve = true +} + // Enqueues the blob CID to be taken down (aka, CDN purge) as part of any record takedown func (e *Effects) TakedownBlob(cid string) { e.mu.Lock() diff --git a/automod/engine/engine_ozone.go b/automod/engine/engine_ozone.go index bbe354225..18ee58a1e 100644 --- a/automod/engine/engine_ozone.go +++ b/automod/engine/engine_ozone.go @@ -212,6 +212,7 @@ func (e *Engine) CanonicalLogLineOzoneEvent(c *OzoneEventContext) { "accountReports", len(c.effects.AccountReports), "recordLabels", c.effects.RecordLabels, "recordFlags", c.effects.RecordFlags, + "recordAppealResolve", c.effects.RecordAppealResolve, "recordTakedown", c.effects.RecordTakedown, "recordReports", len(c.effects.RecordReports), ) diff --git a/automod/engine/persist.go b/automod/engine/persist.go index 3c4b36fd0..8bac82df3 100644 --- a/automod/engine/persist.go +++ b/automod/engine/persist.go @@ -29,6 +29,12 @@ func (eng *Engine) persistCounters(ctx context.Context, eff *Effects) error { return err } } + for _, ref := range eff.CounterResets { + err := eng.Counters.Reset(ctx, ref.Name, ref.Val) + if err != nil { + return err + } + } return nil } @@ -266,7 +272,8 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error { atURI := c.RecordOp.ATURI().String() newLabels := dedupeStrings(c.effects.RecordLabels) newTags := dedupeStrings(c.effects.RecordTags) - if (len(newLabels) > 0 || len(newTags) > 0) && eng.OzoneClient != nil { + resolveAppeal := c.effects.RecordAppealResolve + if (len(newLabels) > 0 || len(newTags) > 0 || resolveAppeal) && eng.OzoneClient != nil { // fetch existing record labels, tags, etc rv, err := toolsozone.ModerationGetRecord(ctx, eng.OzoneClient, c.RecordOp.CID.String(), c.RecordOp.ATURI().String()) if err != nil { @@ -290,6 +297,7 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error { existingTags = rv.Moderation.SubjectStatus.Tags } newTags = dedupeTagActions(newTags, existingTags) + resolveAppeal = resolveAppeal && *rv.Moderation.SubjectStatus.Appealed } } @@ -317,7 +325,12 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error { return fmt.Errorf("failed to circuit break takedowns: %w", err) } - if newTakedown || len(newLabels) > 0 || len(newTags) > 0 || len(newFlags) > 0 || len(newReports) > 0 { + resolveAppeal, err = eng.circuitBreakModAction(ctx, resolveAppeal) + if err != nil { + return fmt.Errorf("failed to circuit break resolve appeal: %w", err) + } + + if newTakedown || len(newLabels) > 0 || len(newTags) > 0 || len(newFlags) > 0 || len(newReports) > 0 || resolveAppeal { if eng.Notifier != nil { for _, srv := range dedupeStrings(c.effects.NotifyServices) { if err := eng.Notifier.SendRecord(ctx, srv, c); err != nil { @@ -337,7 +350,7 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error { } // exit early - if !newTakedown && len(newLabels) == 0 && len(newTags) == 0 && len(newReports) == 0 { + if !newTakedown && len(newLabels) == 0 && len(newReports) == 0 && len(newTags) == 0 && !resolveAppeal { return nil } @@ -433,7 +446,27 @@ func (eng *Engine) persistRecordModActions(c *RecordContext) error { if err != nil { c.Logger.Error("failed to execute record takedown", "err", err) } + resolveAppeal = false } + if resolveAppeal { + c.Logger.Warn("record-resolve-appeal") + actionNewTakedownCount.WithLabelValues("record").Inc() + comment := "[automod]: automated appeal resolution due to content deletion" + _, err := toolsozone.ModerationEmitEvent(ctx, xrpcc, &toolsozone.ModerationEmitEvent_Input{ + CreatedBy: xrpcc.Auth.Did, + Event: &toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_ModEventResolveAppeal: &toolsozone.ModerationDefs_ModEventResolveAppeal{ + Comment: &comment, + }, + }, + Subject: &toolsozone.ModerationEmitEvent_Input_Subject{ + RepoStrongRef: &strongRef, + }, + }) + if err != nil { + c.Logger.Error("failed to execute appeal resolve", "err", err) + } + } return nil } diff --git a/automod/rules/all.go b/automod/rules/all.go index 8d66b509d..8cc0a768e 100644 --- a/automod/rules/all.go +++ b/automod/rules/all.go @@ -45,6 +45,7 @@ func DefaultRules() automod.RuleSet { }, RecordDeleteRules: []automod.RecordRuleFunc{ DeleteInteractionRule, + ResolveAppealOnRecordDeleteRule, }, IdentityRules: []automod.IdentityRuleFunc{ NewAccountRule, @@ -52,6 +53,7 @@ func DefaultRules() automod.RuleSet { BadWordDIDRule, NewAccountBotEmailRule, CelebSpamIdentityRule, + ResolveAppealOnAccountDeleteRule, }, BlobRules: []automod.BlobRuleFunc{ //BlobVerifyRule, @@ -61,6 +63,7 @@ func DefaultRules() automod.RuleSet { }, OzoneEventRules: []automod.OzoneEventRuleFunc{ HarassmentProtectionOzoneEventRule, + MarkAppealOzoneEventRule, }, } return rules diff --git a/automod/rules/hashtags.go b/automod/rules/hashtags.go index 682ce746a..a4215fd24 100644 --- a/automod/rules/hashtags.go +++ b/automod/rules/hashtags.go @@ -29,6 +29,7 @@ func BadHashtagsPostRule(c *automod.RecordContext, post *appbsky.FeedPost) error break } } + return nil } diff --git a/automod/rules/resolve_appeal_on_delete.go b/automod/rules/resolve_appeal_on_delete.go new file mode 100644 index 000000000..3b756e80e --- /dev/null +++ b/automod/rules/resolve_appeal_on_delete.go @@ -0,0 +1,59 @@ +package rules + +import ( + "github.com/bluesky-social/indigo/automod" + "github.com/bluesky-social/indigo/automod/countstore" +) + +var _ automod.RecordRuleFunc = ResolveAppealOnRecordDeleteRule + +func ResolveAppealOnRecordDeleteRule(c *automod.RecordContext) error { + switch c.RecordOp.Collection { + case "app.bsky.feed.post": + hasAppeal := c.GetCount("appeal", c.RecordOp.ATURI().String(), countstore.PeriodTotal) + + if hasAppeal > 0 { + c.ResolveRecordAppeal() + } + } + return nil +} + +var _ automod.IdentityRuleFunc = ResolveAppealOnAccountDeleteRule + +func ResolveAppealOnAccountDeleteRule(c *automod.AccountContext) error { + hasAppeal := c.GetCount("appeal", c.Account.Identity.DID.String(), countstore.PeriodTotal) + + // @TODO: Check here that we check if the account has been deleted or not before resolving + // This is not currently available on the context + if hasAppeal > 0 && (c.Account.Deactivated) { + c.ResolveAccountAppeal() + } + return nil +} + +var _ automod.OzoneEventRuleFunc = MarkAppealOzoneEventRule + +// looks for appeals on records/accounts and flags subjects +func MarkAppealOzoneEventRule(c *automod.OzoneEventContext) error { + isResolveAppealEvent := c.Event.Event.ModerationDefs_ModEventResolveAppeal != nil + // appeals are just report events emitted by the author of the reported content with a special report type + isAppealEvent := c.Event.Event.ModerationDefs_ModEventReport != nil && *c.Event.Event.ModerationDefs_ModEventReport.ReportType == "com.atproto.moderation.defs#reasonAppeal" + + if !isAppealEvent && !isResolveAppealEvent { + return nil + } + + counterKey := c.Event.SubjectDID.String() + if c.Event.SubjectURI != nil { + counterKey = c.Event.SubjectURI.String() + } + + if isAppealEvent { + c.Increment("appeal", counterKey) + } else { + c.ResetCounter("appeal", counterKey) + } + + return nil +}