From 5fdfd70573000ea42b746a03b94a4f338a28be80 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Tue, 29 Oct 2024 16:48:42 -0700 Subject: [PATCH] automod: refactor identity and account event processing --- automod/capture/testing.go | 10 ++++- automod/consumer/firehose.go | 59 +++++-------------------- automod/engine/engine.go | 84 ++++++++++++++++++++++++++++++++++-- automod/engine/ruleset.go | 12 ++++++ automod/engine/ruletypes.go | 1 + 5 files changed, 113 insertions(+), 53 deletions(-) diff --git a/automod/capture/testing.go b/automod/capture/testing.go index 998aaef48..fbe00d6cb 100644 --- a/automod/capture/testing.go +++ b/automod/capture/testing.go @@ -7,6 +7,7 @@ import ( "io" "os" + comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod" @@ -38,12 +39,19 @@ func ProcessCaptureRules(eng *automod.Engine, capture AccountCapture) error { ctx := context.Background() did := capture.AccountMeta.Identity.DID + handle := capture.AccountMeta.Identity.Handle.String() dir := identity.NewMockDirectory() dir.Insert(*capture.AccountMeta.Identity) eng.Directory = &dir // initial identity rules - eng.ProcessIdentityEvent(ctx, "new", did) + identEvent := comatproto.SyncSubscribeRepos_Identity{ + Did: did.String(), + Handle: &handle, + Seq: 12345, + Time: syntax.DatetimeNow().String(), + } + eng.ProcessIdentityEvent(ctx, identEvent) // all the post rules for _, pr := range capture.PostRecords { diff --git a/automod/consumer/firehose.go b/automod/consumer/firehose.go index df6d8f91b..f210b3055 100644 --- a/automod/consumer/firehose.go +++ b/automod/consumer/firehose.go @@ -80,54 +80,20 @@ func (fc *FirehoseConsumer) Run(ctx context.Context) error { }, RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { atomic.StoreInt64(&fc.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - fc.Logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) - return nil - } - if err := fc.Engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { + if err := fc.Engine.ProcessIdentityEvent(ctx, *evt); err != nil { fc.Logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) } return nil }, RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { atomic.StoreInt64(&fc.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - fc.Logger.Error("bad DID in RepoAccount event", "did", evt.Did, "seq", evt.Seq, "err", err) - return nil - } - if err := fc.Engine.ProcessIdentityEvent(ctx, "account", did); err != nil { + if err := fc.Engine.ProcessAccountEvent(ctx, *evt); err != nil { fc.Logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err) } return nil }, - // TODO: deprecated - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { - atomic.StoreInt64(&fc.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - fc.Logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) - return nil - } - if err := fc.Engine.ProcessIdentityEvent(ctx, "handle", did); err != nil { - fc.Logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) - } - return nil - }, - // TODO: deprecated - RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { - atomic.StoreInt64(&fc.lastSeq, evt.Seq) - did, err := syntax.ParseDID(evt.Did) - if err != nil { - fc.Logger.Error("bad DID in RepoTombstone event", "did", evt.Did, "seq", evt.Seq, "err", err) - return nil - } - if err := fc.Engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil { - fc.Logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err) - } - return nil - }, + // NOTE: no longer process #handle events + // NOTE: no longer process #tombstone events } var scheduler events.Scheduler @@ -176,13 +142,6 @@ func (fc *FirehoseConsumer) HandleRepoCommit(ctx context.Context, evt *comatprot return nil } - // empty commit is a special case, temporarily, basically indicates "new account" - if len(evt.Ops) == 0 { - if err := fc.Engine.ProcessIdentityEvent(ctx, "create", did); err != nil { - fc.Logger.Error("processing handle update failed", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq, "err", err) - } - } - for _, op := range evt.Ops { logger = logger.With("eventKind", op.Action, "path", op.Path) collection, rkey, err := splitRepoPath(op.Path) @@ -215,27 +174,29 @@ func (fc *FirehoseConsumer) HandleRepoCommit(ctx context.Context, evt *comatprot break } recCID := syntax.CID(op.Cid.String()) - err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{ + op := automod.RecordOp{ Action: action, DID: did, Collection: collection, RecordKey: rkey, CID: &recCID, RecordCBOR: *recCBOR, - }) + } + err = fc.Engine.ProcessRecordOp(ctx, op) if err != nil { logger.Error("engine failed to process record", "err", err) continue } case repomgr.EvtKindDeleteRecord: - err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{ + op := automod.RecordOp{ Action: automod.DeleteOp, DID: did, Collection: collection, RecordKey: rkey, CID: nil, RecordCBOR: nil, - }) + } + err = fc.Engine.ProcessRecordOp(ctx, op) if err != nil { logger.Error("engine failed to process record", "err", err) continue diff --git a/automod/engine/engine.go b/automod/engine/engine.go index 4ad71ba15..8ed864371 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -7,6 +7,7 @@ import ( "net/http" "time" + comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod/cachestore" @@ -53,10 +54,10 @@ type EngineConfig struct { SkipAccountMeta bool } -// Entrypoint for external code pushing arbitrary identity events in to the engine. +// Entrypoint for external code pushing #identity events in to the engine. // // This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel. -func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syntax.DID) error { +func (eng *Engine) ProcessIdentityEvent(ctx context.Context, evt comatproto.SyncSubscribeRepos_Identity) error { eventProcessCount.WithLabelValues("identity").Inc() start := time.Now() defer func() { @@ -64,10 +65,15 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn eventProcessDuration.WithLabelValues("identity").Observe(duration.Seconds()) }() + did, err := syntax.ParseDID(evt.Did) + if err != nil { + return fmt.Errorf("bad DID in repo #identity event (%s): %w", evt.Did, err) + } + // similar to an HTTP server, we want to recover any panics from rule execution defer func() { if r := recover(); r != nil { - eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", typ) + eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", "identity") eventErrorCount.WithLabelValues("identity").Inc() } }() @@ -78,6 +84,7 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn if err := eng.PurgeAccountCaches(ctx, did); err != nil { eng.Logger.Error("failed to purge identity cache; identity rule may not run correctly", "err", err) } + // TODO(bnewbold): if it was a tombstone, this might fail ident, err := eng.Directory.LookupDID(ctx, did) if err != nil { eventErrorCount.WithLabelValues("identity").Inc() @@ -118,6 +125,77 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn return nil } +// Entrypoint for external code pushing #account events in to the engine. +// +// This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel. +func (eng *Engine) ProcessAccountEvent(ctx context.Context, evt comatproto.SyncSubscribeRepos_Account) error { + eventProcessCount.WithLabelValues("account").Inc() + start := time.Now() + defer func() { + duration := time.Since(start) + eventProcessDuration.WithLabelValues("account").Observe(duration.Seconds()) + }() + + did, err := syntax.ParseDID(evt.Did) + if err != nil { + return fmt.Errorf("bad DID in repo #account event (%s): %w", evt.Did, err) + } + + // similar to an HTTP server, we want to recover any panics from rule execution + defer func() { + if r := recover(); r != nil { + eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", "account") + eventErrorCount.WithLabelValues("account").Inc() + } + }() + ctx, cancel := context.WithTimeout(ctx, identityEventTimeout) + defer cancel() + + // first purge any caches; we need to re-resolve from scratch on account updates + if err := eng.PurgeAccountCaches(ctx, did); err != nil { + eng.Logger.Error("failed to purge account cache; account rule may not run correctly", "err", err) + } + // TODO(bnewbold): if it was a tombstone, this might fail + ident, err := eng.Directory.LookupDID(ctx, did) + if err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("resolving identity: %w", err) + } + if ident == nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("identity not found for DID: %s", did.String()) + } + + var am *AccountMeta + if !eng.Config.SkipAccountMeta { + am, err = eng.GetAccountMeta(ctx, ident) + if err != nil { + eventErrorCount.WithLabelValues("identity").Inc() + return fmt.Errorf("failed to fetch account metadata: %w", err) + } + } else { + am = &AccountMeta{ + Identity: ident, + Profile: ProfileSummary{}, + } + } + ac := NewAccountContext(ctx, eng, *am) + if err := eng.Rules.CallAccountRules(&ac); err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("rule execution failed: %w", err) + } + eng.CanonicalLogLineAccount(&ac) + if err := eng.persistAccountModActions(&ac); err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("failed to persist actions for account event: %w", err) + } + if err := eng.persistCounters(ctx, ac.effects); err != nil { + eventErrorCount.WithLabelValues("account").Inc() + return fmt.Errorf("failed to persist counters for account event: %w", err) + } + return nil +} + // Entrypoint for external code pushing repository updates. A simple repo commit results in multiple calls. // // This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel. diff --git a/automod/engine/ruleset.go b/automod/engine/ruleset.go index 0b7d90cc4..4c72ef8f9 100644 --- a/automod/engine/ruleset.go +++ b/automod/engine/ruleset.go @@ -16,6 +16,7 @@ type RuleSet struct { RecordRules []RecordRuleFunc RecordDeleteRules []RecordRuleFunc IdentityRules []IdentityRuleFunc + AccountRules []AccountRuleFunc BlobRules []BlobRuleFunc NotificationRules []NotificationRuleFunc OzoneEventRules []OzoneEventRuleFunc @@ -89,6 +90,17 @@ func (r *RuleSet) CallIdentityRules(c *AccountContext) error { return nil } +// Executes rules for account update events. +func (r *RuleSet) CallAccountRules(c *AccountContext) error { + for _, f := range r.AccountRules { + err := f(c) + if err != nil { + c.Logger.Error("account rule execution failed", "err", err) + } + } + return nil +} + func (r *RuleSet) CallNotificationRules(c *NotificationContext) error { for _, f := range r.NotificationRules { err := f(c) diff --git a/automod/engine/ruletypes.go b/automod/engine/ruletypes.go index a86567ead..27d4a149f 100644 --- a/automod/engine/ruletypes.go +++ b/automod/engine/ruletypes.go @@ -6,6 +6,7 @@ import ( ) type IdentityRuleFunc = func(c *AccountContext) error +type AccountRuleFunc = func(c *AccountContext) error type RecordRuleFunc = func(c *RecordContext) error type PostRuleFunc = func(c *RecordContext, post *appbsky.FeedPost) error type ProfileRuleFunc = func(c *RecordContext, profile *appbsky.ActorProfile) error