From d51a0295b54279160421b9a71fd827a6689fd980 Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Fri, 26 Jul 2024 22:42:28 +0200 Subject: [PATCH 1/8] :construction: Send account events to ozone --- automod/engine/engine.go | 4 +++ automod/engine/persist.go | 69 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/automod/engine/engine.go b/automod/engine/engine.go index ae80db3ee..9aac31fe7 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -95,6 +95,10 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn eventErrorCount.WithLabelValues("identity").Inc() return fmt.Errorf("failed to persist actions for identity event: %w", err) } + if err := eng.persistOzoneAccountEvent(&ac, typ); err != nil { + eventErrorCount.WithLabelValues("identity").Inc() + return fmt.Errorf("failed to persist ozone sync for identity event: %w", err) + } if err := eng.persistCounters(ctx, ac.effects); err != nil { eventErrorCount.WithLabelValues("identity").Inc() return fmt.Errorf("failed to persist counters for identity event: %w", err) diff --git a/automod/engine/persist.go b/automod/engine/persist.go index f38b12fe3..607a94292 100644 --- a/automod/engine/persist.go +++ b/automod/engine/persist.go @@ -6,6 +6,7 @@ import ( comatproto "github.com/bluesky-social/indigo/api/atproto" toolsozone "github.com/bluesky-social/indigo/api/ozone" + "github.com/davecgh/go-spew/spew" ) func (eng *Engine) persistCounters(ctx context.Context, eff *Effects) error { @@ -155,6 +156,74 @@ func (eng *Engine) persistAccountModActions(c *AccountContext) error { return nil } +func (eng *Engine) persistOzoneAccountEvent(c *AccountContext, typ string) error { + ctx := c.Ctx + + c.Logger.Info("ozone account event handle", "typ", typ) + comment := "[automod]: account event" + var event toolsozone.ModerationEmitEvent_Input_Event + spew.Dump(c.Account) + + // @TODO: Add duplicate check here so that the same event is not emitted more than once + + // @TODO: Inside of each of these blocks we would define a different type of event, for now, these are just comment events + if typ == "handle" { + comment = "[automod]: handle update" + event = toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_ModEventComment: &toolsozone.ModerationDefs_ModEventComment{ + Comment: comment, + }, + } + } else if typ == "tombstone" { + comment = "[automod]: account deleted" + event = toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_ModEventComment: &toolsozone.ModerationDefs_ModEventComment{ + Comment: comment, + }, + } + } else if typ == "account" { + // @TODO: Could there be other reasons for this event to fire? if so, we should be able to discard those and only handle activation/reactivation + if c.Account.Deactivated { + comment = "[automod]: account deactivated" + } else { + comment = "[automod]: account re-activated" + } + + event = toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_ModEventComment: &toolsozone.ModerationDefs_ModEventComment{ + Comment: comment, + }, + } + } else { + // Silently ignoring unknown event types + return nil + } + + c.Logger.Info("comment for account event", "comment", comment) + + // if we can't actually talk to service, bail out early + if eng.OzoneClient == nil { + c.Logger.Warn("not persisting ozone account event, mod service client not configured") + return nil + } + + xrpcc := eng.OzoneClient + _, err := toolsozone.ModerationEmitEvent(ctx, xrpcc, &toolsozone.ModerationEmitEvent_Input{ + CreatedBy: xrpcc.Auth.Did, + Event: &event, + Subject: &toolsozone.ModerationEmitEvent_Input_Subject{ + AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ + Did: c.Account.Identity.DID.String(), + }, + }, + }) + if err != nil { + c.Logger.Error("failed to send account event", "err", err) + } + + return nil +} + // Persists some record-level state: labels, takedowns, reports. // // NOTE: this method currently does *not* persist record-level flags to any storage, and does not de-dupe most actions, on the assumption that the record is new (from firehose) and has no existing mod state. From 9124fd8ed5a73045d14b718d19f5df266b31cd34 Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Tue, 13 Aug 2024 18:24:28 +0200 Subject: [PATCH 2/8] :sparkles: Refactor ozone event rerouting --- api/ozone/moderationdefs.go | 89 +++++++++++++++++++++++++ api/ozone/moderationemitEvent.go | 24 +++++++ automod/engine/engine.go | 5 +- automod/engine/persist.go | 69 ------------------- automod/engine/reroute.go | 109 +++++++++++++++++++++++++++++++ cmd/hepa/consumer.go | 9 +++ 6 files changed, 232 insertions(+), 73 deletions(-) create mode 100644 automod/engine/reroute.go diff --git a/api/ozone/moderationdefs.go b/api/ozone/moderationdefs.go index a833fbda0..96c07f3c6 100644 --- a/api/ozone/moderationdefs.go +++ b/api/ozone/moderationdefs.go @@ -13,6 +13,20 @@ import ( "github.com/bluesky-social/indigo/lex/util" ) +// ModerationDefs_AccountEvent is a "accountEvent" in the tools.ozone.moderation.defs schema. +// +// Logs account status related events on a repo subject. Normally captured by automod from the firehose and emitted to ozone for historical tracking. +// +// RECORDTYPE: ModerationDefs_AccountEvent +type ModerationDefs_AccountEvent struct { + LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#accountEvent" cborgen:"$type,const=tools.ozone.moderation.defs#accountEvent"` + // active: Indicates that the account has a repository which can be fetched from the host that emitted this event. + Active *bool `json:"active,omitempty" cborgen:"active,omitempty"` + Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` + Status string `json:"status" cborgen:"status"` + Timestamp string `json:"timestamp" cborgen:"timestamp"` +} + // ModerationDefs_BlobView is a "blobView" in the tools.ozone.moderation.defs schema. type ModerationDefs_BlobView struct { Cid string `json:"cid" cborgen:"cid"` @@ -58,6 +72,20 @@ func (t *ModerationDefs_BlobView_Details) UnmarshalJSON(b []byte) error { } } +// ModerationDefs_IdentityEvent is a "identityEvent" in the tools.ozone.moderation.defs schema. +// +// Logs identity related events on a repo subject. Normally captured by automod from the firehose and emitted to ozone for historical tracking. +// +// RECORDTYPE: ModerationDefs_IdentityEvent +type ModerationDefs_IdentityEvent struct { + LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#identityEvent" cborgen:"$type,const=tools.ozone.moderation.defs#identityEvent"` + Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` + Handle *string `json:"handle,omitempty" cborgen:"handle,omitempty"` + PdsHost *string `json:"pdsHost,omitempty" cborgen:"pdsHost,omitempty"` + Timestamp string `json:"timestamp" cborgen:"timestamp"` + Tombstone *bool `json:"tombstone,omitempty" cborgen:"tombstone,omitempty"` +} + // ModerationDefs_ImageDetails is a "imageDetails" in the tools.ozone.moderation.defs schema. // // RECORDTYPE: ModerationDefs_ImageDetails @@ -278,6 +306,9 @@ type ModerationDefs_ModEventViewDetail_Event struct { ModerationDefs_ModEventResolveAppeal *ModerationDefs_ModEventResolveAppeal ModerationDefs_ModEventDivert *ModerationDefs_ModEventDivert ModerationDefs_ModEventTag *ModerationDefs_ModEventTag + ModerationDefs_AccountEvent *ModerationDefs_AccountEvent + ModerationDefs_IdentityEvent *ModerationDefs_IdentityEvent + ModerationDefs_RecordEvent *ModerationDefs_RecordEvent } func (t *ModerationDefs_ModEventViewDetail_Event) MarshalJSON() ([]byte, error) { @@ -341,6 +372,18 @@ func (t *ModerationDefs_ModEventViewDetail_Event) MarshalJSON() ([]byte, error) t.ModerationDefs_ModEventTag.LexiconTypeID = "tools.ozone.moderation.defs#modEventTag" return json.Marshal(t.ModerationDefs_ModEventTag) } + if t.ModerationDefs_AccountEvent != nil { + t.ModerationDefs_AccountEvent.LexiconTypeID = "tools.ozone.moderation.defs#accountEvent" + return json.Marshal(t.ModerationDefs_AccountEvent) + } + if t.ModerationDefs_IdentityEvent != nil { + t.ModerationDefs_IdentityEvent.LexiconTypeID = "tools.ozone.moderation.defs#identityEvent" + return json.Marshal(t.ModerationDefs_IdentityEvent) + } + if t.ModerationDefs_RecordEvent != nil { + t.ModerationDefs_RecordEvent.LexiconTypeID = "tools.ozone.moderation.defs#recordEvent" + return json.Marshal(t.ModerationDefs_RecordEvent) + } return nil, fmt.Errorf("cannot marshal empty enum") } func (t *ModerationDefs_ModEventViewDetail_Event) UnmarshalJSON(b []byte) error { @@ -395,6 +438,15 @@ func (t *ModerationDefs_ModEventViewDetail_Event) UnmarshalJSON(b []byte) error case "tools.ozone.moderation.defs#modEventTag": t.ModerationDefs_ModEventTag = new(ModerationDefs_ModEventTag) return json.Unmarshal(b, t.ModerationDefs_ModEventTag) + case "tools.ozone.moderation.defs#accountEvent": + t.ModerationDefs_AccountEvent = new(ModerationDefs_AccountEvent) + return json.Unmarshal(b, t.ModerationDefs_AccountEvent) + case "tools.ozone.moderation.defs#identityEvent": + t.ModerationDefs_IdentityEvent = new(ModerationDefs_IdentityEvent) + return json.Unmarshal(b, t.ModerationDefs_IdentityEvent) + case "tools.ozone.moderation.defs#recordEvent": + t.ModerationDefs_RecordEvent = new(ModerationDefs_RecordEvent) + return json.Unmarshal(b, t.ModerationDefs_RecordEvent) default: return nil @@ -468,6 +520,9 @@ type ModerationDefs_ModEventView_Event struct { ModerationDefs_ModEventResolveAppeal *ModerationDefs_ModEventResolveAppeal ModerationDefs_ModEventDivert *ModerationDefs_ModEventDivert ModerationDefs_ModEventTag *ModerationDefs_ModEventTag + ModerationDefs_AccountEvent *ModerationDefs_AccountEvent + ModerationDefs_IdentityEvent *ModerationDefs_IdentityEvent + ModerationDefs_RecordEvent *ModerationDefs_RecordEvent } func (t *ModerationDefs_ModEventView_Event) MarshalJSON() ([]byte, error) { @@ -531,6 +586,18 @@ func (t *ModerationDefs_ModEventView_Event) MarshalJSON() ([]byte, error) { t.ModerationDefs_ModEventTag.LexiconTypeID = "tools.ozone.moderation.defs#modEventTag" return json.Marshal(t.ModerationDefs_ModEventTag) } + if t.ModerationDefs_AccountEvent != nil { + t.ModerationDefs_AccountEvent.LexiconTypeID = "tools.ozone.moderation.defs#accountEvent" + return json.Marshal(t.ModerationDefs_AccountEvent) + } + if t.ModerationDefs_IdentityEvent != nil { + t.ModerationDefs_IdentityEvent.LexiconTypeID = "tools.ozone.moderation.defs#identityEvent" + return json.Marshal(t.ModerationDefs_IdentityEvent) + } + if t.ModerationDefs_RecordEvent != nil { + t.ModerationDefs_RecordEvent.LexiconTypeID = "tools.ozone.moderation.defs#recordEvent" + return json.Marshal(t.ModerationDefs_RecordEvent) + } return nil, fmt.Errorf("cannot marshal empty enum") } func (t *ModerationDefs_ModEventView_Event) UnmarshalJSON(b []byte) error { @@ -585,6 +652,15 @@ func (t *ModerationDefs_ModEventView_Event) UnmarshalJSON(b []byte) error { case "tools.ozone.moderation.defs#modEventTag": t.ModerationDefs_ModEventTag = new(ModerationDefs_ModEventTag) return json.Unmarshal(b, t.ModerationDefs_ModEventTag) + case "tools.ozone.moderation.defs#accountEvent": + t.ModerationDefs_AccountEvent = new(ModerationDefs_AccountEvent) + return json.Unmarshal(b, t.ModerationDefs_AccountEvent) + case "tools.ozone.moderation.defs#identityEvent": + t.ModerationDefs_IdentityEvent = new(ModerationDefs_IdentityEvent) + return json.Unmarshal(b, t.ModerationDefs_IdentityEvent) + case "tools.ozone.moderation.defs#recordEvent": + t.ModerationDefs_RecordEvent = new(ModerationDefs_RecordEvent) + return json.Unmarshal(b, t.ModerationDefs_RecordEvent) default: return nil @@ -644,6 +720,19 @@ type ModerationDefs_ModerationDetail struct { SubjectStatus *ModerationDefs_SubjectStatusView `json:"subjectStatus,omitempty" cborgen:"subjectStatus,omitempty"` } +// ModerationDefs_RecordEvent is a "recordEvent" in the tools.ozone.moderation.defs schema. +// +// Logs lifecycle event on a record subject. Normally captured by automod from the firehose and emitted to ozone for historical tracking. +// +// RECORDTYPE: ModerationDefs_RecordEvent +type ModerationDefs_RecordEvent struct { + LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#recordEvent" cborgen:"$type,const=tools.ozone.moderation.defs#recordEvent"` + Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty"` + Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` + Op string `json:"op" cborgen:"op"` + Timestamp string `json:"timestamp" cborgen:"timestamp"` +} + // ModerationDefs_RecordView is a "recordView" in the tools.ozone.moderation.defs schema. // // RECORDTYPE: ModerationDefs_RecordView diff --git a/api/ozone/moderationemitEvent.go b/api/ozone/moderationemitEvent.go index ebd8662c8..f5d3f0541 100644 --- a/api/ozone/moderationemitEvent.go +++ b/api/ozone/moderationemitEvent.go @@ -36,6 +36,9 @@ type ModerationEmitEvent_Input_Event struct { ModerationDefs_ModEventReverseTakedown *ModerationDefs_ModEventReverseTakedown ModerationDefs_ModEventEmail *ModerationDefs_ModEventEmail ModerationDefs_ModEventTag *ModerationDefs_ModEventTag + ModerationDefs_AccountEvent *ModerationDefs_AccountEvent + ModerationDefs_IdentityEvent *ModerationDefs_IdentityEvent + ModerationDefs_RecordEvent *ModerationDefs_RecordEvent } func (t *ModerationEmitEvent_Input_Event) MarshalJSON() ([]byte, error) { @@ -91,6 +94,18 @@ func (t *ModerationEmitEvent_Input_Event) MarshalJSON() ([]byte, error) { t.ModerationDefs_ModEventTag.LexiconTypeID = "tools.ozone.moderation.defs#modEventTag" return json.Marshal(t.ModerationDefs_ModEventTag) } + if t.ModerationDefs_AccountEvent != nil { + t.ModerationDefs_AccountEvent.LexiconTypeID = "tools.ozone.moderation.defs#accountEvent" + return json.Marshal(t.ModerationDefs_AccountEvent) + } + if t.ModerationDefs_IdentityEvent != nil { + t.ModerationDefs_IdentityEvent.LexiconTypeID = "tools.ozone.moderation.defs#identityEvent" + return json.Marshal(t.ModerationDefs_IdentityEvent) + } + if t.ModerationDefs_RecordEvent != nil { + t.ModerationDefs_RecordEvent.LexiconTypeID = "tools.ozone.moderation.defs#recordEvent" + return json.Marshal(t.ModerationDefs_RecordEvent) + } return nil, fmt.Errorf("cannot marshal empty enum") } func (t *ModerationEmitEvent_Input_Event) UnmarshalJSON(b []byte) error { @@ -139,6 +154,15 @@ func (t *ModerationEmitEvent_Input_Event) UnmarshalJSON(b []byte) error { case "tools.ozone.moderation.defs#modEventTag": t.ModerationDefs_ModEventTag = new(ModerationDefs_ModEventTag) return json.Unmarshal(b, t.ModerationDefs_ModEventTag) + case "tools.ozone.moderation.defs#accountEvent": + t.ModerationDefs_AccountEvent = new(ModerationDefs_AccountEvent) + return json.Unmarshal(b, t.ModerationDefs_AccountEvent) + case "tools.ozone.moderation.defs#identityEvent": + t.ModerationDefs_IdentityEvent = new(ModerationDefs_IdentityEvent) + return json.Unmarshal(b, t.ModerationDefs_IdentityEvent) + case "tools.ozone.moderation.defs#recordEvent": + t.ModerationDefs_RecordEvent = new(ModerationDefs_RecordEvent) + return json.Unmarshal(b, t.ModerationDefs_RecordEvent) default: return nil diff --git a/automod/engine/engine.go b/automod/engine/engine.go index 9aac31fe7..970f30120 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -95,10 +95,6 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn eventErrorCount.WithLabelValues("identity").Inc() return fmt.Errorf("failed to persist actions for identity event: %w", err) } - if err := eng.persistOzoneAccountEvent(&ac, typ); err != nil { - eventErrorCount.WithLabelValues("identity").Inc() - return fmt.Errorf("failed to persist ozone sync for identity event: %w", err) - } if err := eng.persistCounters(ctx, ac.effects); err != nil { eventErrorCount.WithLabelValues("identity").Inc() return fmt.Errorf("failed to persist counters for identity event: %w", err) @@ -163,6 +159,7 @@ func (eng *Engine) ProcessRecordOp(ctx context.Context, op RecordOp) error { return fmt.Errorf("unexpected op action: %s", op.Action) } eng.CanonicalLogLineRecord(&rc) + eng.RerouteRecordOpToOzone(ctx, &op) // purge the account meta cache when profile is updated if rc.RecordOp.Collection == "app.bsky.actor.profile" { if err := eng.PurgeAccountCaches(ctx, op.DID); err != nil { diff --git a/automod/engine/persist.go b/automod/engine/persist.go index 607a94292..f38b12fe3 100644 --- a/automod/engine/persist.go +++ b/automod/engine/persist.go @@ -6,7 +6,6 @@ import ( comatproto "github.com/bluesky-social/indigo/api/atproto" toolsozone "github.com/bluesky-social/indigo/api/ozone" - "github.com/davecgh/go-spew/spew" ) func (eng *Engine) persistCounters(ctx context.Context, eff *Effects) error { @@ -156,74 +155,6 @@ func (eng *Engine) persistAccountModActions(c *AccountContext) error { return nil } -func (eng *Engine) persistOzoneAccountEvent(c *AccountContext, typ string) error { - ctx := c.Ctx - - c.Logger.Info("ozone account event handle", "typ", typ) - comment := "[automod]: account event" - var event toolsozone.ModerationEmitEvent_Input_Event - spew.Dump(c.Account) - - // @TODO: Add duplicate check here so that the same event is not emitted more than once - - // @TODO: Inside of each of these blocks we would define a different type of event, for now, these are just comment events - if typ == "handle" { - comment = "[automod]: handle update" - event = toolsozone.ModerationEmitEvent_Input_Event{ - ModerationDefs_ModEventComment: &toolsozone.ModerationDefs_ModEventComment{ - Comment: comment, - }, - } - } else if typ == "tombstone" { - comment = "[automod]: account deleted" - event = toolsozone.ModerationEmitEvent_Input_Event{ - ModerationDefs_ModEventComment: &toolsozone.ModerationDefs_ModEventComment{ - Comment: comment, - }, - } - } else if typ == "account" { - // @TODO: Could there be other reasons for this event to fire? if so, we should be able to discard those and only handle activation/reactivation - if c.Account.Deactivated { - comment = "[automod]: account deactivated" - } else { - comment = "[automod]: account re-activated" - } - - event = toolsozone.ModerationEmitEvent_Input_Event{ - ModerationDefs_ModEventComment: &toolsozone.ModerationDefs_ModEventComment{ - Comment: comment, - }, - } - } else { - // Silently ignoring unknown event types - return nil - } - - c.Logger.Info("comment for account event", "comment", comment) - - // if we can't actually talk to service, bail out early - if eng.OzoneClient == nil { - c.Logger.Warn("not persisting ozone account event, mod service client not configured") - return nil - } - - xrpcc := eng.OzoneClient - _, err := toolsozone.ModerationEmitEvent(ctx, xrpcc, &toolsozone.ModerationEmitEvent_Input{ - CreatedBy: xrpcc.Auth.Did, - Event: &event, - Subject: &toolsozone.ModerationEmitEvent_Input_Subject{ - AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ - Did: c.Account.Identity.DID.String(), - }, - }, - }) - if err != nil { - c.Logger.Error("failed to send account event", "err", err) - } - - return nil -} - // Persists some record-level state: labels, takedowns, reports. // // NOTE: this method currently does *not* persist record-level flags to any storage, and does not de-dupe most actions, on the assumption that the record is new (from firehose) and has no existing mod state. diff --git a/automod/engine/reroute.go b/automod/engine/reroute.go new file mode 100644 index 000000000..e14d98d60 --- /dev/null +++ b/automod/engine/reroute.go @@ -0,0 +1,109 @@ +package engine + +import ( + "context" + "time" + + comatproto "github.com/bluesky-social/indigo/api/atproto" + toolsozone "github.com/bluesky-social/indigo/api/ozone" +) + +func (eng *Engine) RerouteAccountEventToOzone(c context.Context, e *comatproto.SyncSubscribeRepos_Account) error { + comment := "[automod]: Account status event" + eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_AccountEvent: &toolsozone.ModerationDefs_AccountEvent{ + Comment: &comment, + Timestamp: e.Time, + Status: *e.Status, + Active: &e.Active, + }, + }, toolsozone.ModerationEmitEvent_Input_Subject{ + AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ + Did: e.Did, + }, + }) + return nil +} + +func (eng *Engine) RerouteTombstoneEventToOzone(c context.Context, e *comatproto.SyncSubscribeRepos_Tombstone) error { + comment := "[automod]: Tombstone event" + tombstone := true + eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_IdentityEvent: &toolsozone.ModerationDefs_IdentityEvent{ + Comment: &comment, + // @TODO: These don't seem to exist in the Identity event? + // Handle: e.Handle, + // PdsHost: &e.PdsHost, + Tombstone: &tombstone, + Timestamp: e.Time, + }, + }, toolsozone.ModerationEmitEvent_Input_Subject{ + AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ + Did: e.Did, + }, + }) + return nil +} + +func (eng *Engine) RerouteRecordOpToOzone(c context.Context, e *RecordOp) error { + comment := "[automod]: Record event" + + eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_RecordEvent: &toolsozone.ModerationDefs_RecordEvent{ + Comment: &comment, + Op: e.Action, + Timestamp: time.Now().Format(time.RFC3339), + }, + }, toolsozone.ModerationEmitEvent_Input_Subject{ + RepoStrongRef: &comatproto.RepoStrongRef{ + LexiconTypeID: "com.atproto.repo.strongRef", + Uri: e.ATURI().String(), + Cid: e.CID.String(), + }, + }) + + return nil +} + +func (eng *Engine) RerouteIdentityEventToOzone(c context.Context, e *comatproto.SyncSubscribeRepos_Identity) error { + comment := "[automod]: Identity event" + tombstone := false + + eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + ModerationDefs_IdentityEvent: &toolsozone.ModerationDefs_IdentityEvent{ + Comment: &comment, + Handle: e.Handle, + // @TODO: This doesn't seem to exist in the Identity event? + // PdsHost: &e.PdsHost, + Tombstone: &tombstone, + Timestamp: e.Time, + }, + }, toolsozone.ModerationEmitEvent_Input_Subject{ + AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ + Did: e.Did, + }, + }) + + return nil +} + +func (eng *Engine) RerouteEventToOzone(ctx context.Context, event toolsozone.ModerationEmitEvent_Input_Event, subject toolsozone.ModerationEmitEvent_Input_Subject) error { + // if we can't actually talk to service, bail out early + if eng.OzoneClient == nil { + eng.Logger.Warn("not persisting ozone account event, mod service client not configured") + return nil + } + + xrpcc := eng.OzoneClient + _, err := toolsozone.ModerationEmitEvent(ctx, xrpcc, &toolsozone.ModerationEmitEvent_Input{ + CreatedBy: xrpcc.Auth.Did, + Event: &event, + Subject: &subject, + }) + if err != nil { + eng.Logger.Error("failed to re route event to ozone", "err", err) + return err + } + + return nil +} diff --git a/cmd/hepa/consumer.go b/cmd/hepa/consumer.go index e9d789baa..6699484da 100644 --- a/cmd/hepa/consumer.go +++ b/cmd/hepa/consumer.go @@ -62,6 +62,9 @@ func (s *Server) RunConsumer(ctx context.Context) error { if err := s.engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { s.logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) } + if err := s.engine.RerouteIdentityEventToOzone(ctx, evt); err != nil { + s.logger.Error("rerouting identity event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + } return nil }, RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { @@ -74,6 +77,9 @@ func (s *Server) RunConsumer(ctx context.Context) error { if err := s.engine.ProcessIdentityEvent(ctx, "account", did); err != nil { s.logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err) } + if err := s.engine.RerouteAccountEventToOzone(ctx, evt); err != nil { + s.logger.Error("rerouting account event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + } return nil }, // TODO: deprecated @@ -100,6 +106,9 @@ func (s *Server) RunConsumer(ctx context.Context) error { if err := s.engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil { s.logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err) } + if err := s.engine.RerouteTombstoneEventToOzone(ctx, evt); err != nil { + s.logger.Error("rerouting tombstone event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + } return nil }, } From 7c4c67e97cdcaf44d821f297219e26b8bcfadec3 Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Tue, 13 Aug 2024 18:52:26 +0200 Subject: [PATCH 3/8] :sparkles: Add duplicate event check --- automod/engine/reroute.go | 68 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/automod/engine/reroute.go b/automod/engine/reroute.go index e14d98d60..5bb56d7f1 100644 --- a/automod/engine/reroute.go +++ b/automod/engine/reroute.go @@ -87,6 +87,63 @@ func (eng *Engine) RerouteIdentityEventToOzone(c context.Context, e *comatproto. return nil } +// For the given subject, checks if there is already an event of the given type within the 5 minutes. +func (eng *Engine) IsDuplicatingEvent(ctx context.Context, event toolsozone.ModerationEmitEvent_Input_Event, subject toolsozone.ModerationEmitEvent_Input_Subject) (bool, error) { + if eng.OzoneClient == nil { + eng.Logger.Warn("can not check if event is duplicate, mod service client not configured") + return false, nil + } + + eventType := "" + if event.ModerationDefs_AccountEvent != nil { + eventType = event.ModerationDefs_AccountEvent.LexiconTypeID + } else if event.ModerationDefs_IdentityEvent != nil { + eventType = event.ModerationDefs_IdentityEvent.LexiconTypeID + } else if event.ModerationDefs_RecordEvent != nil { + eventType = event.ModerationDefs_RecordEvent.LexiconTypeID + } + + eventSubject := "" + if subject.AdminDefs_RepoRef != nil { + eventSubject = subject.AdminDefs_RepoRef.Did + } else if subject.RepoStrongRef != nil { + eventSubject = subject.RepoStrongRef.Uri + } + + xrpcc := eng.OzoneClient + resp, err := toolsozone.ModerationQueryEvents( + ctx, + xrpcc, + nil, + nil, + "", + time.Now().Add(-time.Minute*5).Format(time.RFC3339), + "", + "", + "", + false, + false, + 1, + nil, + nil, + nil, + "", + eventSubject, + []string{eventType}, + ) + + if err != nil { + eng.Logger.Error("failed to query events", "err", err) + return false, err + } + + if len(resp.Events) > 0 { + return true, nil + } + + return false, nil +} + func (eng *Engine) RerouteEventToOzone(ctx context.Context, event toolsozone.ModerationEmitEvent_Input_Event, subject toolsozone.ModerationEmitEvent_Input_Subject) error { // if we can't actually talk to service, bail out early if eng.OzoneClient == nil { @@ -94,6 +151,17 @@ func (eng *Engine) RerouteEventToOzone(ctx context.Context, event toolsozone.Mod return nil } + isDuplicate, duplicateCheckError := eng.IsDuplicatingEvent(ctx, event, subject) + if duplicateCheckError != nil { + eng.Logger.Error("failed to check if event is duplicate", "err", duplicateCheckError) + return duplicateCheckError + } + + if isDuplicate { + eng.Logger.Info("event was already emitted, not emitting again") + return nil + } + xrpcc := eng.OzoneClient _, err := toolsozone.ModerationEmitEvent(ctx, xrpcc, &toolsozone.ModerationEmitEvent_Input{ CreatedBy: xrpcc.Auth.Did, From c8794f06d9715e35f8918b7f72434cca0ff006bd Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Wed, 14 Aug 2024 11:11:55 +0200 Subject: [PATCH 4/8] :sparkles: Check flag before attempting to reroute event to ozone --- automod/engine/engine.go | 1 - cmd/hepa/consumer.go | 40 ++++++++++++++++++++++++++++++---------- cmd/hepa/main.go | 7 +++++++ cmd/hepa/server.go | 5 +++++ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/automod/engine/engine.go b/automod/engine/engine.go index 970f30120..ae80db3ee 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -159,7 +159,6 @@ func (eng *Engine) ProcessRecordOp(ctx context.Context, op RecordOp) error { return fmt.Errorf("unexpected op action: %s", op.Action) } eng.CanonicalLogLineRecord(&rc) - eng.RerouteRecordOpToOzone(ctx, &op) // purge the account meta cache when profile is updated if rc.RecordOp.Collection == "app.bsky.actor.profile" { if err := eng.PurgeAccountCaches(ctx, op.DID); err != nil { diff --git a/cmd/hepa/consumer.go b/cmd/hepa/consumer.go index 6699484da..2039e2620 100644 --- a/cmd/hepa/consumer.go +++ b/cmd/hepa/consumer.go @@ -62,9 +62,13 @@ func (s *Server) RunConsumer(ctx context.Context) error { if err := s.engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { s.logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) } - if err := s.engine.RerouteIdentityEventToOzone(ctx, evt); err != nil { - s.logger.Error("rerouting identity event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + + if s.rerouteEvents { + if err := s.engine.RerouteIdentityEventToOzone(ctx, evt); err != nil { + s.logger.Error("rerouting identity event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + } } + return nil }, RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { @@ -77,8 +81,10 @@ func (s *Server) RunConsumer(ctx context.Context) error { if err := s.engine.ProcessIdentityEvent(ctx, "account", did); err != nil { s.logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err) } - if err := s.engine.RerouteAccountEventToOzone(ctx, evt); err != nil { - s.logger.Error("rerouting account event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + if s.rerouteEvents { + if err := s.engine.RerouteAccountEventToOzone(ctx, evt); err != nil { + s.logger.Error("rerouting account event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + } } return nil }, @@ -106,8 +112,10 @@ func (s *Server) RunConsumer(ctx context.Context) error { if err := s.engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil { s.logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err) } - if err := s.engine.RerouteTombstoneEventToOzone(ctx, evt); err != nil { - s.logger.Error("rerouting tombstone event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + if s.rerouteEvents { + if err := s.engine.RerouteTombstoneEventToOzone(ctx, evt); err != nil { + s.logger.Error("rerouting tombstone event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) + } } return nil }, @@ -215,31 +223,43 @@ func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubsc break } recCID := syntax.CID(op.Cid.String()) - err = s.engine.ProcessRecordOp(ctx, automod.RecordOp{ + op := automod.RecordOp{ Action: action, DID: did, Collection: collection, RecordKey: rkey, CID: &recCID, RecordCBOR: *recCBOR, - }) + } + err = s.engine.ProcessRecordOp(ctx, op) if err != nil { logger.Error("engine failed to process record", "err", err) continue } + if s.rerouteEvents { + if err := s.engine.RerouteRecordOpToOzone(ctx, &op); err != nil { + logger.Error("rerouting record create/update event to ozone failed", "err", err) + } + } case repomgr.EvtKindDeleteRecord: - err = s.engine.ProcessRecordOp(ctx, automod.RecordOp{ + op := automod.RecordOp{ Action: automod.DeleteOp, DID: did, Collection: collection, RecordKey: rkey, CID: nil, RecordCBOR: nil, - }) + } + err = s.engine.ProcessRecordOp(ctx, op) if err != nil { logger.Error("engine failed to process record", "err", err) continue } + if s.rerouteEvents { + if err := s.engine.RerouteRecordOpToOzone(ctx, &op); err != nil { + logger.Error("rerouting record delete event to ozone failed", "err", err) + } + } default: // TODO: should this be an error? } diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index bd2447e95..9edc79dd3 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -138,6 +138,11 @@ func run(args []string) error { Usage: "force a fixed number of parallel firehose workers. default (or 0) for auto-scaling; 200 works for a large instance", EnvVars: []string{"HEPA_FIREHOSE_PARALLELISM"}, }, + &cli.BoolFlag{ + Name: "reroute-events", + Usage: "Attempt to reroute firehose events to all configured destinations (for now, only Ozone).", + EnvVars: []string{"HEPA_REROUTE_EVENTS"}, + }, } app.Commands = []*cli.Command{ @@ -242,6 +247,7 @@ var runCmd = &cli.Command{ RatelimitBypass: cctx.String("ratelimit-bypass"), RulesetName: cctx.String("ruleset"), FirehoseParallelism: cctx.Int("firehose-parallelism"), + RerouteEvents: cctx.Bool("reroute-events"), }, ) if err != nil { @@ -316,6 +322,7 @@ func configEphemeralServer(cctx *cli.Context) (*Server, error) { RatelimitBypass: cctx.String("ratelimit-bypass"), RulesetName: cctx.String("ruleset"), FirehoseParallelism: cctx.Int("firehose-parallelism"), + RerouteEvents: cctx.Bool("reroute-events"), }, ) } diff --git a/cmd/hepa/server.go b/cmd/hepa/server.go index 024cb0178..572e3f4cc 100644 --- a/cmd/hepa/server.go +++ b/cmd/hepa/server.go @@ -41,6 +41,9 @@ type Server struct { // same as lastSeq, but for Ozone timestamp cursor. the value is a string. lastOzoneCursor atomic.Value + + // dictates if firehose events should be rerouted to configured destinations + rerouteEvents bool } type Config struct { @@ -61,6 +64,7 @@ type Config struct { RulesetName string RatelimitBypass string FirehoseParallelism int + RerouteEvents bool } func NewServer(dir identity.Directory, config Config) (*Server, error) { @@ -228,6 +232,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { logger: logger, engine: &engine, rdb: rdb, + rerouteEvents: config.RerouteEvents, } return s, nil From 1abcf05f8134ca51e36c1444f30faf92b4f4f554 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Tue, 27 Aug 2024 23:48:30 -0700 Subject: [PATCH 5/8] refactors from pair-programming --- automod/capture/testing.go | 10 +++- automod/engine/context.go | 4 ++ automod/engine/effects.go | 7 +++ automod/engine/engine.go | 97 ++++++++++++++++++++++++++++++++-- automod/engine/reroute.go | 20 +++---- automod/engine/ruleset.go | 12 +++++ automod/engine/ruletypes.go | 1 + automod/pkg.go | 2 + automod/rules/all.go | 1 + automod/rules/ozone_persist.go | 25 +++++++++ cmd/hepa/consumer.go | 76 ++------------------------ cmd/hepa/server.go | 7 ++- 12 files changed, 173 insertions(+), 89 deletions(-) create mode 100644 automod/rules/ozone_persist.go diff --git a/automod/capture/testing.go b/automod/capture/testing.go index 998aaef48..fbe00d6cb 100644 --- a/automod/capture/testing.go +++ b/automod/capture/testing.go @@ -7,6 +7,7 @@ import ( "io" "os" + comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod" @@ -38,12 +39,19 @@ func ProcessCaptureRules(eng *automod.Engine, capture AccountCapture) error { ctx := context.Background() did := capture.AccountMeta.Identity.DID + handle := capture.AccountMeta.Identity.Handle.String() dir := identity.NewMockDirectory() dir.Insert(*capture.AccountMeta.Identity) eng.Directory = &dir // initial identity rules - eng.ProcessIdentityEvent(ctx, "new", did) + identEvent := comatproto.SyncSubscribeRepos_Identity{ + Did: did.String(), + Handle: &handle, + Seq: 12345, + Time: syntax.DatetimeNow().String(), + } + eng.ProcessIdentityEvent(ctx, identEvent) // all the post rules for _, pr := range capture.PostRecords { diff --git a/automod/engine/context.go b/automod/engine/context.go index cc5b6a5e8..d925dab5a 100644 --- a/automod/engine/context.go +++ b/automod/engine/context.go @@ -292,6 +292,10 @@ func (c *RecordContext) TakedownBlob(cid string) { c.effects.TakedownBlob(cid) } +func (c *RecordContext) PersistRecordOzoneEvent() { + c.effects.PersistRecordOzoneEvent() +} + func (c *NotificationContext) Reject() { c.effects.Reject() } diff --git a/automod/engine/effects.go b/automod/engine/effects.go index ed5ac2998..806909229 100644 --- a/automod/engine/effects.go +++ b/automod/engine/effects.go @@ -58,6 +58,8 @@ type Effects struct { RejectEvent bool // Services, if any, which should blast out a notification about this even (eg, Slack) NotifyServices []string + // If "true", and Ozone event history is configured/enable, then sent a mod event to ozone backend for this event + PersistOzoneRecordEvent bool } // Enqueues the named counter to be incremented at the end of all rule processing. Will automatically increment for all time periods. @@ -199,3 +201,8 @@ func (e *Effects) Notify(srv string) { func (e *Effects) Reject() { e.RejectEvent = true } + +// Marks that this subject should be recorded in ozone history +func (e *Effects) PersistRecordOzoneEvent() { + e.PersistOzoneRecordEvent = true +} diff --git a/automod/engine/engine.go b/automod/engine/engine.go index ae80db3ee..fdbf7a949 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -7,6 +7,7 @@ import ( "net/http" "time" + comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod/cachestore" @@ -22,11 +23,17 @@ const ( notificationEventTimeout = 5 * time.Second ) +type EngineConfig struct { + // if true, sent firehose identity and account events to ozone backend as events + PersistSubjectHistoryOzone bool +} + // runtime for executing rules, managing state, and recording moderation actions. // // NOTE: careful when initializing: several fields must not be nil or zero, even though they are pointer type. type Engine struct { Logger *slog.Logger + Config EngineConfig Directory identity.Directory Rules RuleSet Counters countstore.CountStore @@ -45,10 +52,10 @@ type Engine struct { BlobClient *http.Client } -// Entrypoint for external code pushing arbitrary identity events in to the engine. +// Entrypoint for external code pushing #identity events in to the engine. // // This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel. -func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syntax.DID) error { +func (eng *Engine) ProcessIdentityEvent(ctx context.Context, evt comatproto.SyncSubscribeRepos_Identity) error { eventProcessCount.WithLabelValues("identity").Inc() start := time.Now() defer func() { @@ -56,10 +63,15 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn eventProcessDuration.WithLabelValues("identity").Observe(duration.Seconds()) }() + did, err := syntax.ParseDID(evt.Did) + if err != nil { + return fmt.Errorf("bad DID in repo #identity event (%s): %w", evt.Did, err) + } + // similar to an HTTP server, we want to recover any panics from rule execution defer func() { if r := recover(); r != nil { - eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", typ) + eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", "identity") eventErrorCount.WithLabelValues("identity").Inc() } }() @@ -70,6 +82,7 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn if err := eng.PurgeAccountCaches(ctx, did); err != nil { eng.Logger.Error("failed to purge identity cache; identity rule may not run correctly", "err", err) } + // TODO(bnewbold): if it was a tombstone, this might fail ident, err := eng.Directory.LookupDID(ctx, did) if err != nil { eventErrorCount.WithLabelValues("identity").Inc() @@ -91,6 +104,11 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn return fmt.Errorf("rule execution failed: %w", err) } eng.CanonicalLogLineAccount(&ac) + if eng.Config.PersistSubjectHistoryOzone { + if err := eng.RerouteIdentityEventToOzone(ctx, &evt); err != nil { + return fmt.Errorf("failed to persist identity event to ozone history: %w", err) + } + } if err := eng.persistAccountModActions(&ac); err != nil { eventErrorCount.WithLabelValues("identity").Inc() return fmt.Errorf("failed to persist actions for identity event: %w", err) @@ -102,6 +120,74 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn return nil } +// Entrypoint for external code pushing #account events in to the engine. +// +// This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel. +func (eng *Engine) ProcessAccountEvent(ctx context.Context, evt comatproto.SyncSubscribeRepos_Account) error { + eventProcessCount.WithLabelValues("account").Inc() + start := time.Now() + defer func() { + duration := time.Since(start) + eventProcessDuration.WithLabelValues("account").Observe(duration.Seconds()) + }() + + did, err := syntax.ParseDID(evt.Did) + if err != nil { + return fmt.Errorf("bad DID in repo #account event (%s): %w", evt.Did, err) + } + + // similar to an HTTP server, we want to recover any panics from rule execution + defer func() { + if r := recover(); r != nil { + eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", "account") + eventErrorCount.WithLabelValues("account").Inc() + } + }() + ctx, cancel := context.WithTimeout(ctx, identityEventTimeout) + defer cancel() + + // first purge any caches; we need to re-resolve from scratch on account updates + if err := eng.PurgeAccountCaches(ctx, did); err != nil { + eng.Logger.Error("failed to purge account cache; account rule may not run correctly", "err", err) + } + // TODO(bnewbold): if it was a tombstone, this might fail + ident, err := eng.Directory.LookupDID(ctx, did) + if err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("resolving identity: %w", err) + } + if ident == nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("identity not found for DID: %s", did.String()) + } + + am, err := eng.GetAccountMeta(ctx, ident) + if err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("failed to fetch account metadata: %w", err) + } + ac := NewAccountContext(ctx, eng, *am) + if err := eng.Rules.CallAccountRules(&ac); err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("rule execution failed: %w", err) + } + eng.CanonicalLogLineAccount(&ac) + if eng.Config.PersistSubjectHistoryOzone { + if err := eng.RerouteAccountEventToOzone(ctx, &evt); err != nil { + return fmt.Errorf("failed to persist account event to ozone history: %w", err) + } + } + if err := eng.persistAccountModActions(&ac); err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("failed to persist actions for account event: %w", err) + } + if err := eng.persistCounters(ctx, ac.effects); err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("failed to persist counters for account event: %w", err) + } + return nil +} + // Entrypoint for external code pushing repository updates. A simple repo commit results in multiple calls. // // This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel. @@ -173,6 +259,11 @@ func (eng *Engine) ProcessRecordOp(ctx context.Context, op RecordOp) error { eventErrorCount.WithLabelValues("record").Inc() return fmt.Errorf("failed to persist counts for record event: %w", err) } + if eng.Config.PersistSubjectHistoryOzone { + if err := eng.RerouteRecordOpToOzone(&rc); err != nil { + return fmt.Errorf("failed to persist account event to ozone history: %w", err) + } + } return nil } diff --git a/automod/engine/reroute.go b/automod/engine/reroute.go index 5bb56d7f1..21d2a0298 100644 --- a/automod/engine/reroute.go +++ b/automod/engine/reroute.go @@ -10,7 +10,7 @@ import ( func (eng *Engine) RerouteAccountEventToOzone(c context.Context, e *comatproto.SyncSubscribeRepos_Account) error { comment := "[automod]: Account status event" - eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + eng.rerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ ModerationDefs_AccountEvent: &toolsozone.ModerationDefs_AccountEvent{ Comment: &comment, Timestamp: e.Time, @@ -25,10 +25,11 @@ func (eng *Engine) RerouteAccountEventToOzone(c context.Context, e *comatproto.S return nil } +/* func (eng *Engine) RerouteTombstoneEventToOzone(c context.Context, e *comatproto.SyncSubscribeRepos_Tombstone) error { comment := "[automod]: Tombstone event" tombstone := true - eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + eng.rerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ ModerationDefs_IdentityEvent: &toolsozone.ModerationDefs_IdentityEvent{ Comment: &comment, // @TODO: These don't seem to exist in the Identity event? @@ -44,21 +45,22 @@ func (eng *Engine) RerouteTombstoneEventToOzone(c context.Context, e *comatproto }) return nil } +*/ -func (eng *Engine) RerouteRecordOpToOzone(c context.Context, e *RecordOp) error { +func (eng *Engine) RerouteRecordOpToOzone(c *RecordContext) error { comment := "[automod]: Record event" - eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + eng.rerouteEventToOzone(c.Ctx, toolsozone.ModerationEmitEvent_Input_Event{ ModerationDefs_RecordEvent: &toolsozone.ModerationDefs_RecordEvent{ Comment: &comment, - Op: e.Action, + Op: c.RecordOp.Action, Timestamp: time.Now().Format(time.RFC3339), }, }, toolsozone.ModerationEmitEvent_Input_Subject{ RepoStrongRef: &comatproto.RepoStrongRef{ LexiconTypeID: "com.atproto.repo.strongRef", - Uri: e.ATURI().String(), - Cid: e.CID.String(), + Uri: c.RecordOp.ATURI().String(), + Cid: c.RecordOp.CID.String(), }, }) @@ -69,7 +71,7 @@ func (eng *Engine) RerouteIdentityEventToOzone(c context.Context, e *comatproto. comment := "[automod]: Identity event" tombstone := false - eng.RerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ + eng.rerouteEventToOzone(c, toolsozone.ModerationEmitEvent_Input_Event{ ModerationDefs_IdentityEvent: &toolsozone.ModerationDefs_IdentityEvent{ Comment: &comment, Handle: e.Handle, @@ -144,7 +146,7 @@ func (eng *Engine) IsDuplicatingEvent(ctx context.Context, event toolsozone.Mode return false, nil } -func (eng *Engine) RerouteEventToOzone(ctx context.Context, event toolsozone.ModerationEmitEvent_Input_Event, subject toolsozone.ModerationEmitEvent_Input_Subject) error { +func (eng *Engine) rerouteEventToOzone(ctx context.Context, event toolsozone.ModerationEmitEvent_Input_Event, subject toolsozone.ModerationEmitEvent_Input_Subject) error { // if we can't actually talk to service, bail out early if eng.OzoneClient == nil { eng.Logger.Warn("not persisting ozone account event, mod service client not configured") diff --git a/automod/engine/ruleset.go b/automod/engine/ruleset.go index 0b7d90cc4..4c72ef8f9 100644 --- a/automod/engine/ruleset.go +++ b/automod/engine/ruleset.go @@ -16,6 +16,7 @@ type RuleSet struct { RecordRules []RecordRuleFunc RecordDeleteRules []RecordRuleFunc IdentityRules []IdentityRuleFunc + AccountRules []AccountRuleFunc BlobRules []BlobRuleFunc NotificationRules []NotificationRuleFunc OzoneEventRules []OzoneEventRuleFunc @@ -89,6 +90,17 @@ func (r *RuleSet) CallIdentityRules(c *AccountContext) error { return nil } +// Executes rules for account update events. +func (r *RuleSet) CallAccountRules(c *AccountContext) error { + for _, f := range r.AccountRules { + err := f(c) + if err != nil { + c.Logger.Error("account rule execution failed", "err", err) + } + } + return nil +} + func (r *RuleSet) CallNotificationRules(c *NotificationContext) error { for _, f := range r.NotificationRules { err := f(c) diff --git a/automod/engine/ruletypes.go b/automod/engine/ruletypes.go index a86567ead..27d4a149f 100644 --- a/automod/engine/ruletypes.go +++ b/automod/engine/ruletypes.go @@ -6,6 +6,7 @@ import ( ) type IdentityRuleFunc = func(c *AccountContext) error +type AccountRuleFunc = func(c *AccountContext) error type RecordRuleFunc = func(c *RecordContext) error type PostRuleFunc = func(c *RecordContext, post *appbsky.FeedPost) error type ProfileRuleFunc = func(c *RecordContext, profile *appbsky.ActorProfile) error diff --git a/automod/pkg.go b/automod/pkg.go index 2a2147e49..c8837676e 100644 --- a/automod/pkg.go +++ b/automod/pkg.go @@ -6,6 +6,7 @@ import ( ) type Engine = engine.Engine +type EngineConfig = engine.EngineConfig type AccountMeta = engine.AccountMeta type RuleSet = engine.RuleSet @@ -19,6 +20,7 @@ type NotificationContext = engine.NotificationContext type RecordOp = engine.RecordOp type IdentityRuleFunc = engine.IdentityRuleFunc +type AccountRuleFunc = engine.AccountRuleFunc type RecordRuleFunc = engine.RecordRuleFunc type PostRuleFunc = engine.PostRuleFunc type ProfileRuleFunc = engine.ProfileRuleFunc diff --git a/automod/rules/all.go b/automod/rules/all.go index 33f9c16a6..0f5ebb4c0 100644 --- a/automod/rules/all.go +++ b/automod/rules/all.go @@ -39,6 +39,7 @@ func DefaultRules() automod.RuleSet { BadWordRecordKeyRule, BadWordOtherRecordRule, TooManyRepostRule, + OzoneRecordHistoryPersistRule, }, RecordDeleteRules: []automod.RecordRuleFunc{ DeleteInteractionRule, diff --git a/automod/rules/ozone_persist.go b/automod/rules/ozone_persist.go new file mode 100644 index 000000000..c13c0a84c --- /dev/null +++ b/automod/rules/ozone_persist.go @@ -0,0 +1,25 @@ +package rules + +import ( + "github.com/bluesky-social/indigo/automod" +) + +var _ automod.RecordRuleFunc = OzoneRecordHistoryPersistRule + +func OzoneRecordHistoryPersistRule(c *automod.RecordContext) error { + // TODO: flesh out this logic + // based on record type + switch c.RecordOp.Collection { + case "app.bsky.labeler.service": + c.PersistRecordOzoneEvent() + case "app.bsky.feed.post": + if c.RecordOp.Action == "update" { + } + case "app.bsky.actor.profile": + // TODO: fix this logic + if len(c.Account.AccountLabels) > 2 { + c.PersistRecordOzoneEvent() + } + } + return nil +} diff --git a/cmd/hepa/consumer.go b/cmd/hepa/consumer.go index 2039e2620..d91cade38 100644 --- a/cmd/hepa/consumer.go +++ b/cmd/hepa/consumer.go @@ -54,71 +54,20 @@ func (s *Server) RunConsumer(ctx context.Context) error { }, RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { atomic.StoreInt64(&s.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - s.logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) - return nil - } - if err := s.engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { + if err := s.engine.ProcessIdentityEvent(ctx, *evt); err != nil { s.logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) } - - if s.rerouteEvents { - if err := s.engine.RerouteIdentityEventToOzone(ctx, evt); err != nil { - s.logger.Error("rerouting identity event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) - } - } - return nil }, RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { atomic.StoreInt64(&s.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - s.logger.Error("bad DID in RepoAccount event", "did", evt.Did, "seq", evt.Seq, "err", err) - return nil - } - if err := s.engine.ProcessIdentityEvent(ctx, "account", did); err != nil { + if err := s.engine.ProcessAccountEvent(ctx, *evt); err != nil { s.logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err) } - if s.rerouteEvents { - if err := s.engine.RerouteAccountEventToOzone(ctx, evt); err != nil { - s.logger.Error("rerouting account event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) - } - } - return nil - }, - // TODO: deprecated - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { - atomic.StoreInt64(&s.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) - return nil - } - if err := s.engine.ProcessIdentityEvent(ctx, "handle", did); err != nil { - s.logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) - } - return nil - }, - // TODO: deprecated - RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { - atomic.StoreInt64(&s.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - s.logger.Error("bad DID in RepoTombstone event", "did", evt.Did, "seq", evt.Seq, "err", err) - return nil - } - if err := s.engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil { - s.logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err) - } - if s.rerouteEvents { - if err := s.engine.RerouteTombstoneEventToOzone(ctx, evt); err != nil { - s.logger.Error("rerouting tombstone event to ozone failed", "did", evt.Did, "seq", evt.Seq, "err", err) - } - } return nil }, + // NOTE: no longer process #handle events + // NOTE: no longer process #tombstone events } var scheduler events.Scheduler @@ -184,13 +133,6 @@ func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubsc return nil } - // empty commit is a special case, temporarily, basically indicates "new account" - if len(evt.Ops) == 0 { - if err := s.engine.ProcessIdentityEvent(ctx, "create", did); err != nil { - s.logger.Error("processing handle update failed", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq, "err", err) - } - } - for _, op := range evt.Ops { logger = logger.With("eventKind", op.Action, "path", op.Path) collection, rkey, err := splitRepoPath(op.Path) @@ -236,11 +178,6 @@ func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubsc logger.Error("engine failed to process record", "err", err) continue } - if s.rerouteEvents { - if err := s.engine.RerouteRecordOpToOzone(ctx, &op); err != nil { - logger.Error("rerouting record create/update event to ozone failed", "err", err) - } - } case repomgr.EvtKindDeleteRecord: op := automod.RecordOp{ Action: automod.DeleteOp, @@ -255,11 +192,6 @@ func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubsc logger.Error("engine failed to process record", "err", err) continue } - if s.rerouteEvents { - if err := s.engine.RerouteRecordOpToOzone(ctx, &op); err != nil { - logger.Error("rerouting record delete event to ozone failed", "err", err) - } - } default: // TODO: should this be an error? } diff --git a/cmd/hepa/server.go b/cmd/hepa/server.go index 572e3f4cc..068f3718b 100644 --- a/cmd/hepa/server.go +++ b/cmd/hepa/server.go @@ -41,9 +41,6 @@ type Server struct { // same as lastSeq, but for Ozone timestamp cursor. the value is a string. lastOzoneCursor atomic.Value - - // dictates if firehose events should be rerouted to configured destinations - rerouteEvents bool } type Config struct { @@ -224,6 +221,9 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { OzoneClient: ozoneClient, AdminClient: adminClient, BlobClient: blobClient, + Config: automod.EngineConfig{ + PersistSubjectHistoryOzone: config.RerouteEvents, + }, } s := &Server{ @@ -232,7 +232,6 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { logger: logger, engine: &engine, rdb: rdb, - rerouteEvents: config.RerouteEvents, } return s, nil From 6e68a26abea6b814dab5544af192d3950e7f61cf Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Fri, 20 Sep 2024 16:32:22 +0200 Subject: [PATCH 6/8] :sparkles: Emit record delete events on post/profile with label and appeal --- automod/countstore/countstore.go | 1 + automod/countstore/countstore_mem.go | 6 +++++ automod/countstore/countstore_redis.go | 12 +++++++++ automod/engine/context.go | 3 +++ automod/engine/effects.go | 11 ++++++++ automod/rules/all.go | 1 + automod/rules/ozone_persist.go | 18 +++++++++---- automod/rules/resolve_appeal_on_delete.go | 31 +++++++++++++++++++++++ 8 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 automod/rules/resolve_appeal_on_delete.go diff --git a/automod/countstore/countstore.go b/automod/countstore/countstore.go index a3ae2b9d1..5531541fd 100644 --- a/automod/countstore/countstore.go +++ b/automod/countstore/countstore.go @@ -48,6 +48,7 @@ type CountStore interface { // TODO: batch increment method GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) IncrementDistinct(ctx context.Context, name, bucket, val string) error + Reset(ctx context.Context, name, val string) error } func periodBucket(name, val, period string) string { diff --git a/automod/countstore/countstore_mem.go b/automod/countstore/countstore_mem.go index f33c076ee..679ed9e9c 100644 --- a/automod/countstore/countstore_mem.go +++ b/automod/countstore/countstore_mem.go @@ -40,6 +40,12 @@ func (s MemCountStore) Increment(ctx context.Context, name, val string) error { return nil } +func (s *MemCountStore) Reset(ctx context.Context, name, val string) error { + key := periodBucket(name, val, PeriodTotal) + s.Counts.Delete(key) + return nil +} + func (s MemCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { k := periodBucket(name, val, period) s.Counts.Compute(k, func(oldVal int, _ bool) (int, bool) { diff --git a/automod/countstore/countstore_redis.go b/automod/countstore/countstore_redis.go index 2e42c96f8..15e7be342 100644 --- a/automod/countstore/countstore_redis.go +++ b/automod/countstore/countstore_redis.go @@ -66,6 +66,18 @@ func (s *RedisCountStore) Increment(ctx context.Context, name, val string) error return err } +// Deletes a counter key +func (s *RedisCountStore) Reset(ctx context.Context, name, val string) error { + var key string + + // increment multiple counters in a single redis round-trip + multi := s.Client.Pipeline() + key = redisCountPrefix + periodBucket(name, val, PeriodHour) + multi.Del(ctx, key) + _, err := multi.Exec(ctx) + return err +} + // Variant of Increment() which only acts on a single specified time period. The intended us of this variant is to control the total number of counters persisted, by using a relatively short time period, for which the counters will expire. func (s *RedisCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { diff --git a/automod/engine/context.go b/automod/engine/context.go index d925dab5a..108fde7cf 100644 --- a/automod/engine/context.go +++ b/automod/engine/context.go @@ -244,6 +244,9 @@ func (c *BaseContext) Increment(name, val string) { c.effects.Increment(name, val) } +func (c *BaseContext) ResetCounter(name, val string) { + c.effects.ResetCounter(name, val) +} func (c *BaseContext) IncrementDistinct(name, bucket, val string) { c.effects.IncrementDistinct(name, bucket, val) } diff --git a/automod/engine/effects.go b/automod/engine/effects.go index 806909229..c45519e35 100644 --- a/automod/engine/effects.go +++ b/automod/engine/effects.go @@ -34,6 +34,7 @@ type Effects struct { mu sync.Mutex // List of counters which should be incremented as part of processing this event. These are collected during rule execution and persisted in bulk at the end. CounterIncrements []CounterRef + CounterResets []CounterRef // Similar to "CounterIncrements", but for "distinct" style counters CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names // Label values which should be applied to the overall account, as a result of rule execution. @@ -79,6 +80,16 @@ func (e *Effects) IncrementPeriod(name, val string, period string) { e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val, Period: &period}) } +// Enqueues the named counter to be reset at the end of all rule processing. +// +// "name" is the counter namespace. +// "val" is the specific counter with that namespace. +func (e *Effects) ResetCounter(name, val string) { + e.mu.Lock() + defer e.mu.Unlock() + e.CounterResets = append(e.CounterResets, CounterRef{Name: name, Val: val}) +} + // Enqueues the named "distinct value" counter based on the supplied string value ("val") to be incremented at the end of all rule processing. Will automatically increment for all time periods. func (e *Effects) IncrementDistinct(name, bucket, val string) { e.mu.Lock() diff --git a/automod/rules/all.go b/automod/rules/all.go index ca056e46f..713f6722d 100644 --- a/automod/rules/all.go +++ b/automod/rules/all.go @@ -62,6 +62,7 @@ func DefaultRules() automod.RuleSet { }, OzoneEventRules: []automod.OzoneEventRuleFunc{ HarassmentProtectionOzoneEventRule, + MarkAppealOzoneEventRule, }, } return rules diff --git a/automod/rules/ozone_persist.go b/automod/rules/ozone_persist.go index c13c0a84c..756f6aa79 100644 --- a/automod/rules/ozone_persist.go +++ b/automod/rules/ozone_persist.go @@ -2,22 +2,30 @@ package rules import ( "github.com/bluesky-social/indigo/automod" + "github.com/bluesky-social/indigo/automod/countstore" ) var _ automod.RecordRuleFunc = OzoneRecordHistoryPersistRule func OzoneRecordHistoryPersistRule(c *automod.RecordContext) error { - // TODO: flesh out this logic - // based on record type switch c.RecordOp.Collection { case "app.bsky.labeler.service": c.PersistRecordOzoneEvent() case "app.bsky.feed.post": - if c.RecordOp.Action == "update" { + // @TODO: we should probably persist if a deleted post has reports on it but right now, we are not keeping track of reports on records + if c.RecordOp.Action == "delete" { + // If a post being deleted has an active appeal, persist the event + if c.GetCount("appeal", c.RecordOp.ATURI().String(), countstore.PeriodTotal) > 0 { + c.PersistRecordOzoneEvent() + } + } case "app.bsky.actor.profile": - // TODO: fix this logic - if len(c.Account.AccountLabels) > 2 { + hasLabels := len(c.Account.AccountLabels) > 0 + // If there is an appeal on the account or the profile record + // Appeal counts are reset when appeals are resolved so this should only be true when there is an unresolved appeal + hasAppeals := c.GetCount("appeal", c.Account.Identity.DID.String(), countstore.PeriodTotal) > 0 || c.GetCount("appeal", c.RecordOp.ATURI().String(), countstore.PeriodTotal) > 0 + if hasLabels || hasAppeals { c.PersistRecordOzoneEvent() } } diff --git a/automod/rules/resolve_appeal_on_delete.go b/automod/rules/resolve_appeal_on_delete.go new file mode 100644 index 000000000..dc228dbd2 --- /dev/null +++ b/automod/rules/resolve_appeal_on_delete.go @@ -0,0 +1,31 @@ +package rules + +import ( + "github.com/bluesky-social/indigo/automod" +) + +var _ automod.OzoneEventRuleFunc = MarkAppealOzoneEventRule + +// looks for appeals on records/accounts and flags subjects +func MarkAppealOzoneEventRule(c *automod.OzoneEventContext) error { + isResolveAppealEvent := c.Event.Event.ModerationDefs_ModEventResolveAppeal != nil + // appeals are just report events emitted by the author of the reported content with a special report type + isAppealEvent := c.Event.Event.ModerationDefs_ModEventReport != nil && *c.Event.Event.ModerationDefs_ModEventReport.ReportType == "com.atproto.moderation.defs#reasonAppeal" + + if !isAppealEvent && !isResolveAppealEvent { + return nil + } + + counterKey := c.Event.SubjectDID.String() + if c.Event.SubjectURI != nil { + counterKey = c.Event.SubjectURI.String() + } + + if isAppealEvent { + c.Increment("appeal", counterKey) + } else { + c.ResetCounter("appeal", counterKey) + } + + return nil +} From 669a712f7d8ea3f2f525f81f4bd02e7146adfef0 Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Tue, 15 Oct 2024 16:14:49 +0200 Subject: [PATCH 7/8] :sparkles: Added count for mod-event and reroute event based on mod-event count --- automod/countstore/countstore.go | 1 - automod/countstore/countstore_mem.go | 6 ----- automod/countstore/countstore_redis.go | 12 --------- automod/engine/context.go | 4 --- automod/engine/effects.go | 11 -------- automod/rules/all.go | 2 +- automod/rules/mod_event.go | 19 ++++++++++++++ automod/rules/ozone_persist.go | 12 ++------- automod/rules/resolve_appeal_on_delete.go | 31 ----------------------- 9 files changed, 22 insertions(+), 76 deletions(-) create mode 100644 automod/rules/mod_event.go delete mode 100644 automod/rules/resolve_appeal_on_delete.go diff --git a/automod/countstore/countstore.go b/automod/countstore/countstore.go index 5531541fd..a3ae2b9d1 100644 --- a/automod/countstore/countstore.go +++ b/automod/countstore/countstore.go @@ -48,7 +48,6 @@ type CountStore interface { // TODO: batch increment method GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) IncrementDistinct(ctx context.Context, name, bucket, val string) error - Reset(ctx context.Context, name, val string) error } func periodBucket(name, val, period string) string { diff --git a/automod/countstore/countstore_mem.go b/automod/countstore/countstore_mem.go index 679ed9e9c..f33c076ee 100644 --- a/automod/countstore/countstore_mem.go +++ b/automod/countstore/countstore_mem.go @@ -40,12 +40,6 @@ func (s MemCountStore) Increment(ctx context.Context, name, val string) error { return nil } -func (s *MemCountStore) Reset(ctx context.Context, name, val string) error { - key := periodBucket(name, val, PeriodTotal) - s.Counts.Delete(key) - return nil -} - func (s MemCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { k := periodBucket(name, val, period) s.Counts.Compute(k, func(oldVal int, _ bool) (int, bool) { diff --git a/automod/countstore/countstore_redis.go b/automod/countstore/countstore_redis.go index 15e7be342..2e42c96f8 100644 --- a/automod/countstore/countstore_redis.go +++ b/automod/countstore/countstore_redis.go @@ -66,18 +66,6 @@ func (s *RedisCountStore) Increment(ctx context.Context, name, val string) error return err } -// Deletes a counter key -func (s *RedisCountStore) Reset(ctx context.Context, name, val string) error { - var key string - - // increment multiple counters in a single redis round-trip - multi := s.Client.Pipeline() - key = redisCountPrefix + periodBucket(name, val, PeriodHour) - multi.Del(ctx, key) - _, err := multi.Exec(ctx) - return err -} - // Variant of Increment() which only acts on a single specified time period. The intended us of this variant is to control the total number of counters persisted, by using a relatively short time period, for which the counters will expire. func (s *RedisCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { diff --git a/automod/engine/context.go b/automod/engine/context.go index 108fde7cf..16da748df 100644 --- a/automod/engine/context.go +++ b/automod/engine/context.go @@ -243,10 +243,6 @@ func (c *BaseContext) GetAccountMeta(did syntax.DID) *AccountMeta { func (c *BaseContext) Increment(name, val string) { c.effects.Increment(name, val) } - -func (c *BaseContext) ResetCounter(name, val string) { - c.effects.ResetCounter(name, val) -} func (c *BaseContext) IncrementDistinct(name, bucket, val string) { c.effects.IncrementDistinct(name, bucket, val) } diff --git a/automod/engine/effects.go b/automod/engine/effects.go index c45519e35..806909229 100644 --- a/automod/engine/effects.go +++ b/automod/engine/effects.go @@ -34,7 +34,6 @@ type Effects struct { mu sync.Mutex // List of counters which should be incremented as part of processing this event. These are collected during rule execution and persisted in bulk at the end. CounterIncrements []CounterRef - CounterResets []CounterRef // Similar to "CounterIncrements", but for "distinct" style counters CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names // Label values which should be applied to the overall account, as a result of rule execution. @@ -80,16 +79,6 @@ func (e *Effects) IncrementPeriod(name, val string, period string) { e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val, Period: &period}) } -// Enqueues the named counter to be reset at the end of all rule processing. -// -// "name" is the counter namespace. -// "val" is the specific counter with that namespace. -func (e *Effects) ResetCounter(name, val string) { - e.mu.Lock() - defer e.mu.Unlock() - e.CounterResets = append(e.CounterResets, CounterRef{Name: name, Val: val}) -} - // Enqueues the named "distinct value" counter based on the supplied string value ("val") to be incremented at the end of all rule processing. Will automatically increment for all time periods. func (e *Effects) IncrementDistinct(name, bucket, val string) { e.mu.Lock() diff --git a/automod/rules/all.go b/automod/rules/all.go index 713f6722d..475ee3f7d 100644 --- a/automod/rules/all.go +++ b/automod/rules/all.go @@ -62,7 +62,7 @@ func DefaultRules() automod.RuleSet { }, OzoneEventRules: []automod.OzoneEventRuleFunc{ HarassmentProtectionOzoneEventRule, - MarkAppealOzoneEventRule, + CountModEventRule, }, } return rules diff --git a/automod/rules/mod_event.go b/automod/rules/mod_event.go new file mode 100644 index 000000000..f21ab2995 --- /dev/null +++ b/automod/rules/mod_event.go @@ -0,0 +1,19 @@ +package rules + +import ( + "github.com/bluesky-social/indigo/automod" +) + +var _ automod.OzoneEventRuleFunc = CountModEventRule + +// looks for appeals on records/accounts and flags subjects +func CountModEventRule(c *automod.OzoneEventContext) error { + counterKey := c.Event.SubjectDID.String() + if c.Event.SubjectURI != nil { + counterKey = c.Event.SubjectURI.String() + } + + c.Increment("mod-event", counterKey) + + return nil +} diff --git a/automod/rules/ozone_persist.go b/automod/rules/ozone_persist.go index 756f6aa79..afaabd0bd 100644 --- a/automod/rules/ozone_persist.go +++ b/automod/rules/ozone_persist.go @@ -2,7 +2,6 @@ package rules import ( "github.com/bluesky-social/indigo/automod" - "github.com/bluesky-social/indigo/automod/countstore" ) var _ automod.RecordRuleFunc = OzoneRecordHistoryPersistRule @@ -12,20 +11,13 @@ func OzoneRecordHistoryPersistRule(c *automod.RecordContext) error { case "app.bsky.labeler.service": c.PersistRecordOzoneEvent() case "app.bsky.feed.post": - // @TODO: we should probably persist if a deleted post has reports on it but right now, we are not keeping track of reports on records if c.RecordOp.Action == "delete" { - // If a post being deleted has an active appeal, persist the event - if c.GetCount("appeal", c.RecordOp.ATURI().String(), countstore.PeriodTotal) > 0 { + if c.GetCount("mod-event", c.RecordOp.ATURI().String(), automod.PeriodTotal) > 0 { c.PersistRecordOzoneEvent() } - } case "app.bsky.actor.profile": - hasLabels := len(c.Account.AccountLabels) > 0 - // If there is an appeal on the account or the profile record - // Appeal counts are reset when appeals are resolved so this should only be true when there is an unresolved appeal - hasAppeals := c.GetCount("appeal", c.Account.Identity.DID.String(), countstore.PeriodTotal) > 0 || c.GetCount("appeal", c.RecordOp.ATURI().String(), countstore.PeriodTotal) > 0 - if hasLabels || hasAppeals { + if c.GetCount("mod-event", c.RecordOp.ATURI().String(), automod.PeriodTotal) > 0 { c.PersistRecordOzoneEvent() } } diff --git a/automod/rules/resolve_appeal_on_delete.go b/automod/rules/resolve_appeal_on_delete.go deleted file mode 100644 index dc228dbd2..000000000 --- a/automod/rules/resolve_appeal_on_delete.go +++ /dev/null @@ -1,31 +0,0 @@ -package rules - -import ( - "github.com/bluesky-social/indigo/automod" -) - -var _ automod.OzoneEventRuleFunc = MarkAppealOzoneEventRule - -// looks for appeals on records/accounts and flags subjects -func MarkAppealOzoneEventRule(c *automod.OzoneEventContext) error { - isResolveAppealEvent := c.Event.Event.ModerationDefs_ModEventResolveAppeal != nil - // appeals are just report events emitted by the author of the reported content with a special report type - isAppealEvent := c.Event.Event.ModerationDefs_ModEventReport != nil && *c.Event.Event.ModerationDefs_ModEventReport.ReportType == "com.atproto.moderation.defs#reasonAppeal" - - if !isAppealEvent && !isResolveAppealEvent { - return nil - } - - counterKey := c.Event.SubjectDID.String() - if c.Event.SubjectURI != nil { - counterKey = c.Event.SubjectURI.String() - } - - if isAppealEvent { - c.Increment("appeal", counterKey) - } else { - c.ResetCounter("appeal", counterKey) - } - - return nil -} From e6daafb0f8010f62dbce3f38e3b2c4a2e280621b Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Fri, 15 Nov 2024 21:43:02 +0100 Subject: [PATCH 8/8] :sparkles: Fix breaking changes due to lexicon changes --- api/chat/convodefs.go | 1 + api/ozone/moderationdefs.go | 72 +++++++++++++++++++++++++--- api/ozone/moderationqueryStatuses.go | 12 ++++- api/ozone/settingdefs.go | 23 +++++++++ api/ozone/settinglistOptions.go | 38 +++++++++++++++ api/ozone/settingremoveOptions.go | 31 ++++++++++++ api/ozone/settingupsertOption.go | 36 ++++++++++++++ automod/engine/reroute.go | 6 ++- automod/rules/mod_event.go | 2 + 9 files changed, 212 insertions(+), 9 deletions(-) create mode 100644 api/ozone/settingdefs.go create mode 100644 api/ozone/settinglistOptions.go create mode 100644 api/ozone/settingremoveOptions.go create mode 100644 api/ozone/settingupsertOption.go diff --git a/api/chat/convodefs.go b/api/chat/convodefs.go index 9e215d0fe..59193ce08 100644 --- a/api/chat/convodefs.go +++ b/api/chat/convodefs.go @@ -18,6 +18,7 @@ type ConvoDefs_ConvoView struct { LastMessage *ConvoDefs_ConvoView_LastMessage `json:"lastMessage,omitempty" cborgen:"lastMessage,omitempty"` Members []*ActorDefs_ProfileViewBasic `json:"members" cborgen:"members"` Muted bool `json:"muted" cborgen:"muted"` + Opened *bool `json:"opened,omitempty" cborgen:"opened,omitempty"` Rev string `json:"rev" cborgen:"rev"` UnreadCount int64 `json:"unreadCount" cborgen:"unreadCount"` } diff --git a/api/ozone/moderationdefs.go b/api/ozone/moderationdefs.go index 7c8e939c2..0033c9358 100644 --- a/api/ozone/moderationdefs.go +++ b/api/ozone/moderationdefs.go @@ -21,12 +21,25 @@ import ( type ModerationDefs_AccountEvent struct { LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#accountEvent" cborgen:"$type,const=tools.ozone.moderation.defs#accountEvent"` // active: Indicates that the account has a repository which can be fetched from the host that emitted this event. - Active *bool `json:"active,omitempty" cborgen:"active,omitempty"` + Active bool `json:"active" cborgen:"active"` Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` - Status string `json:"status" cborgen:"status"` + Status *string `json:"status,omitempty" cborgen:"status,omitempty"` Timestamp string `json:"timestamp" cborgen:"timestamp"` } +// ModerationDefs_AccountHosting is a "accountHosting" in the tools.ozone.moderation.defs schema. +// +// RECORDTYPE: ModerationDefs_AccountHosting +type ModerationDefs_AccountHosting struct { + LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#accountHosting" cborgen:"$type,const=tools.ozone.moderation.defs#accountHosting"` + CreatedAt *string `json:"createdAt,omitempty" cborgen:"createdAt,omitempty"` + DeactivatedAt *string `json:"deactivatedAt,omitempty" cborgen:"deactivatedAt,omitempty"` + DeletedAt *string `json:"deletedAt,omitempty" cborgen:"deletedAt,omitempty"` + ReactivatedAt *string `json:"reactivatedAt,omitempty" cborgen:"reactivatedAt,omitempty"` + Status string `json:"status" cborgen:"status"` + UpdatedAt *string `json:"updatedAt,omitempty" cborgen:"updatedAt,omitempty"` +} + // ModerationDefs_BlobView is a "blobView" in the tools.ozone.moderation.defs schema. type ModerationDefs_BlobView struct { Cid string `json:"cid" cborgen:"cid"` @@ -180,8 +193,8 @@ type ModerationDefs_ModEventMute struct { type ModerationDefs_ModEventMuteReporter struct { LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#modEventMuteReporter" cborgen:"$type,const=tools.ozone.moderation.defs#modEventMuteReporter"` Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` - // durationInHours: Indicates how long the account should remain muted. - DurationInHours int64 `json:"durationInHours" cborgen:"durationInHours"` + // durationInHours: Indicates how long the account should remain muted. Falsy value here means a permanent mute. + DurationInHours *int64 `json:"durationInHours,omitempty" cborgen:"durationInHours,omitempty"` } // ModerationDefs_ModEventReport is a "modEventReport" in the tools.ozone.moderation.defs schema. @@ -735,6 +748,17 @@ type ModerationDefs_RecordEvent struct { Timestamp string `json:"timestamp" cborgen:"timestamp"` } +// ModerationDefs_RecordHosting is a "recordHosting" in the tools.ozone.moderation.defs schema. +// +// RECORDTYPE: ModerationDefs_RecordHosting +type ModerationDefs_RecordHosting struct { + LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#recordHosting" cborgen:"$type,const=tools.ozone.moderation.defs#recordHosting"` + CreatedAt *string `json:"createdAt,omitempty" cborgen:"createdAt,omitempty"` + DeletedAt *string `json:"deletedAt,omitempty" cborgen:"deletedAt,omitempty"` + Status string `json:"status" cborgen:"status"` + UpdatedAt *string `json:"updatedAt,omitempty" cborgen:"updatedAt,omitempty"` +} + // ModerationDefs_RecordView is a "recordView" in the tools.ozone.moderation.defs schema. // // RECORDTYPE: ModerationDefs_RecordView @@ -826,8 +850,9 @@ type ModerationDefs_SubjectStatusView struct { // comment: Sticky comment on the subject. Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` // createdAt: Timestamp referencing the first moderation status impacting event was emitted on the subject - CreatedAt string `json:"createdAt" cborgen:"createdAt"` - Id int64 `json:"id" cborgen:"id"` + CreatedAt string `json:"createdAt" cborgen:"createdAt"` + Hosting *ModerationDefs_SubjectStatusView_Hosting `json:"hosting,omitempty" cborgen:"hosting,omitempty"` + Id int64 `json:"id" cborgen:"id"` // lastAppealedAt: Timestamp referencing when the author of the subject appealed a moderation action LastAppealedAt *string `json:"lastAppealedAt,omitempty" cborgen:"lastAppealedAt,omitempty"` LastReportedAt *string `json:"lastReportedAt,omitempty" cborgen:"lastReportedAt,omitempty"` @@ -846,6 +871,41 @@ type ModerationDefs_SubjectStatusView struct { UpdatedAt string `json:"updatedAt" cborgen:"updatedAt"` } +type ModerationDefs_SubjectStatusView_Hosting struct { + ModerationDefs_AccountHosting *ModerationDefs_AccountHosting + ModerationDefs_RecordHosting *ModerationDefs_RecordHosting +} + +func (t *ModerationDefs_SubjectStatusView_Hosting) MarshalJSON() ([]byte, error) { + if t.ModerationDefs_AccountHosting != nil { + t.ModerationDefs_AccountHosting.LexiconTypeID = "tools.ozone.moderation.defs#accountHosting" + return json.Marshal(t.ModerationDefs_AccountHosting) + } + if t.ModerationDefs_RecordHosting != nil { + t.ModerationDefs_RecordHosting.LexiconTypeID = "tools.ozone.moderation.defs#recordHosting" + return json.Marshal(t.ModerationDefs_RecordHosting) + } + return nil, fmt.Errorf("cannot marshal empty enum") +} +func (t *ModerationDefs_SubjectStatusView_Hosting) UnmarshalJSON(b []byte) error { + typ, err := util.TypeExtract(b) + if err != nil { + return err + } + + switch typ { + case "tools.ozone.moderation.defs#accountHosting": + t.ModerationDefs_AccountHosting = new(ModerationDefs_AccountHosting) + return json.Unmarshal(b, t.ModerationDefs_AccountHosting) + case "tools.ozone.moderation.defs#recordHosting": + t.ModerationDefs_RecordHosting = new(ModerationDefs_RecordHosting) + return json.Unmarshal(b, t.ModerationDefs_RecordHosting) + + default: + return nil + } +} + type ModerationDefs_SubjectStatusView_Subject struct { AdminDefs_RepoRef *comatprototypes.AdminDefs_RepoRef RepoStrongRef *comatprototypes.RepoStrongRef diff --git a/api/ozone/moderationqueryStatuses.go b/api/ozone/moderationqueryStatuses.go index 969d77e9d..bbe0351f5 100644 --- a/api/ozone/moderationqueryStatuses.go +++ b/api/ozone/moderationqueryStatuses.go @@ -21,6 +21,11 @@ type ModerationQueryStatuses_Output struct { // appealed: Get subjects in unresolved appealed status // collections: If specified, subjects belonging to the given collections will be returned. When subjectType is set to 'account', this will be ignored. // comment: Search subjects by keyword from comments +// hostingDeletedAfter: Search subjects where the associated record/account was deleted after a given timestamp +// hostingDeletedBefore: Search subjects where the associated record/account was deleted before a given timestamp +// hostingStatuses: Search subjects by the status of the associated record/account +// hostingUpdatedAfter: Search subjects where the associated record/account was updated after a given timestamp +// hostingUpdatedBefore: Search subjects where the associated record/account was updated before a given timestamp // includeAllUserRecords: All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned. // includeMuted: By default, we don't include muted subjects in the results. Set this to true to include them. // lastReviewedBy: Get all subject statuses that were reviewed by a specific moderator @@ -33,7 +38,7 @@ type ModerationQueryStatuses_Output struct { // subject: The subject to get the status for. // subjectType: If specified, subjects of the given type (account or record) will be returned. When this is set to 'account' the 'collections' parameter will be ignored. When includeAllUserRecords or subject is set, this will be ignored. // takendown: Get subjects that were taken down -func ModerationQueryStatuses(ctx context.Context, c *xrpc.Client, appealed bool, collections []string, comment string, cursor string, excludeTags []string, ignoreSubjects []string, includeAllUserRecords bool, includeMuted bool, lastReviewedBy string, limit int64, onlyMuted bool, reportedAfter string, reportedBefore string, reviewState string, reviewedAfter string, reviewedBefore string, sortDirection string, sortField string, subject string, subjectType string, tags []string, takendown bool) (*ModerationQueryStatuses_Output, error) { +func ModerationQueryStatuses(ctx context.Context, c *xrpc.Client, appealed bool, collections []string, comment string, cursor string, excludeTags []string, hostingDeletedAfter string, hostingDeletedBefore string, hostingStatuses []string, hostingUpdatedAfter string, hostingUpdatedBefore string, ignoreSubjects []string, includeAllUserRecords bool, includeMuted bool, lastReviewedBy string, limit int64, onlyMuted bool, reportedAfter string, reportedBefore string, reviewState string, reviewedAfter string, reviewedBefore string, sortDirection string, sortField string, subject string, subjectType string, tags []string, takendown bool) (*ModerationQueryStatuses_Output, error) { var out ModerationQueryStatuses_Output params := map[string]interface{}{ @@ -42,6 +47,11 @@ func ModerationQueryStatuses(ctx context.Context, c *xrpc.Client, appealed bool, "comment": comment, "cursor": cursor, "excludeTags": excludeTags, + "hostingDeletedAfter": hostingDeletedAfter, + "hostingDeletedBefore": hostingDeletedBefore, + "hostingStatuses": hostingStatuses, + "hostingUpdatedAfter": hostingUpdatedAfter, + "hostingUpdatedBefore": hostingUpdatedBefore, "ignoreSubjects": ignoreSubjects, "includeAllUserRecords": includeAllUserRecords, "includeMuted": includeMuted, diff --git a/api/ozone/settingdefs.go b/api/ozone/settingdefs.go new file mode 100644 index 000000000..ccf334413 --- /dev/null +++ b/api/ozone/settingdefs.go @@ -0,0 +1,23 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package ozone + +// schema: tools.ozone.setting.defs + +import ( + "github.com/bluesky-social/indigo/lex/util" +) + +// SettingDefs_Option is a "option" in the tools.ozone.setting.defs schema. +type SettingDefs_Option struct { + CreatedAt *string `json:"createdAt,omitempty" cborgen:"createdAt,omitempty"` + CreatedBy string `json:"createdBy" cborgen:"createdBy"` + Description *string `json:"description,omitempty" cborgen:"description,omitempty"` + Did string `json:"did" cborgen:"did"` + Key string `json:"key" cborgen:"key"` + LastUpdatedBy string `json:"lastUpdatedBy" cborgen:"lastUpdatedBy"` + ManagerRole *string `json:"managerRole,omitempty" cborgen:"managerRole,omitempty"` + Scope string `json:"scope" cborgen:"scope"` + UpdatedAt *string `json:"updatedAt,omitempty" cborgen:"updatedAt,omitempty"` + Value *util.LexiconTypeDecoder `json:"value" cborgen:"value"` +} diff --git a/api/ozone/settinglistOptions.go b/api/ozone/settinglistOptions.go new file mode 100644 index 000000000..8fbdadeca --- /dev/null +++ b/api/ozone/settinglistOptions.go @@ -0,0 +1,38 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package ozone + +// schema: tools.ozone.setting.listOptions + +import ( + "context" + + "github.com/bluesky-social/indigo/xrpc" +) + +// SettingListOptions_Output is the output of a tools.ozone.setting.listOptions call. +type SettingListOptions_Output struct { + Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"` + Options []*SettingDefs_Option `json:"options" cborgen:"options"` +} + +// SettingListOptions calls the XRPC method "tools.ozone.setting.listOptions". +// +// keys: Filter for only the specified keys. Ignored if prefix is provided +// prefix: Filter keys by prefix +func SettingListOptions(ctx context.Context, c *xrpc.Client, cursor string, keys []string, limit int64, prefix string, scope string) (*SettingListOptions_Output, error) { + var out SettingListOptions_Output + + params := map[string]interface{}{ + "cursor": cursor, + "keys": keys, + "limit": limit, + "prefix": prefix, + "scope": scope, + } + if err := c.Do(ctx, xrpc.Query, "", "tools.ozone.setting.listOptions", params, nil, &out); err != nil { + return nil, err + } + + return &out, nil +} diff --git a/api/ozone/settingremoveOptions.go b/api/ozone/settingremoveOptions.go new file mode 100644 index 000000000..5171bb617 --- /dev/null +++ b/api/ozone/settingremoveOptions.go @@ -0,0 +1,31 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package ozone + +// schema: tools.ozone.setting.removeOptions + +import ( + "context" + + "github.com/bluesky-social/indigo/xrpc" +) + +// SettingRemoveOptions_Input is the input argument to a tools.ozone.setting.removeOptions call. +type SettingRemoveOptions_Input struct { + Keys []string `json:"keys" cborgen:"keys"` + Scope string `json:"scope" cborgen:"scope"` +} + +// SettingRemoveOptions_Output is the output of a tools.ozone.setting.removeOptions call. +type SettingRemoveOptions_Output struct { +} + +// SettingRemoveOptions calls the XRPC method "tools.ozone.setting.removeOptions". +func SettingRemoveOptions(ctx context.Context, c *xrpc.Client, input *SettingRemoveOptions_Input) (*SettingRemoveOptions_Output, error) { + var out SettingRemoveOptions_Output + if err := c.Do(ctx, xrpc.Procedure, "application/json", "tools.ozone.setting.removeOptions", nil, input, &out); err != nil { + return nil, err + } + + return &out, nil +} diff --git a/api/ozone/settingupsertOption.go b/api/ozone/settingupsertOption.go new file mode 100644 index 000000000..c7d12cffb --- /dev/null +++ b/api/ozone/settingupsertOption.go @@ -0,0 +1,36 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package ozone + +// schema: tools.ozone.setting.upsertOption + +import ( + "context" + + "github.com/bluesky-social/indigo/lex/util" + "github.com/bluesky-social/indigo/xrpc" +) + +// SettingUpsertOption_Input is the input argument to a tools.ozone.setting.upsertOption call. +type SettingUpsertOption_Input struct { + Description *string `json:"description,omitempty" cborgen:"description,omitempty"` + Key string `json:"key" cborgen:"key"` + ManagerRole *string `json:"managerRole,omitempty" cborgen:"managerRole,omitempty"` + Scope string `json:"scope" cborgen:"scope"` + Value *util.LexiconTypeDecoder `json:"value" cborgen:"value"` +} + +// SettingUpsertOption_Output is the output of a tools.ozone.setting.upsertOption call. +type SettingUpsertOption_Output struct { + Option *SettingDefs_Option `json:"option" cborgen:"option"` +} + +// SettingUpsertOption calls the XRPC method "tools.ozone.setting.upsertOption". +func SettingUpsertOption(ctx context.Context, c *xrpc.Client, input *SettingUpsertOption_Input) (*SettingUpsertOption_Output, error) { + var out SettingUpsertOption_Output + if err := c.Do(ctx, xrpc.Procedure, "application/json", "tools.ozone.setting.upsertOption", nil, input, &out); err != nil { + return nil, err + } + + return &out, nil +} diff --git a/automod/engine/reroute.go b/automod/engine/reroute.go index 21d2a0298..5eeec2fe0 100644 --- a/automod/engine/reroute.go +++ b/automod/engine/reroute.go @@ -14,8 +14,8 @@ func (eng *Engine) RerouteAccountEventToOzone(c context.Context, e *comatproto.S ModerationDefs_AccountEvent: &toolsozone.ModerationDefs_AccountEvent{ Comment: &comment, Timestamp: e.Time, - Status: *e.Status, - Active: &e.Active, + Status: e.Status, + Active: e.Active, }, }, toolsozone.ModerationEmitEvent_Input_Subject{ AdminDefs_RepoRef: &comatproto.AdminDefs_RepoRef{ @@ -118,6 +118,7 @@ func (eng *Engine) IsDuplicatingEvent(ctx context.Context, event toolsozone.Mode xrpcc, nil, nil, + []string{}, "", time.Now().Add(-time.Minute*5).Format(time.RFC3339), "", @@ -131,6 +132,7 @@ func (eng *Engine) IsDuplicatingEvent(ctx context.Context, event toolsozone.Mode nil, "", eventSubject, + "", []string{eventType}, ) diff --git a/automod/rules/mod_event.go b/automod/rules/mod_event.go index f21ab2995..88708e57d 100644 --- a/automod/rules/mod_event.go +++ b/automod/rules/mod_event.go @@ -2,6 +2,7 @@ package rules import ( "github.com/bluesky-social/indigo/automod" + "github.com/labstack/gommon/log" ) var _ automod.OzoneEventRuleFunc = CountModEventRule @@ -14,6 +15,7 @@ func CountModEventRule(c *automod.OzoneEventContext) error { } c.Increment("mod-event", counterKey) + log.Print("mod-event", counterKey, c.GetCount("mod-event", counterKey, automod.PeriodTotal)) return nil }