Commit 3ff7405

refactor automod consumer into package (#776)

bnewbold authored Oct 29, 2024
2 parents 150e051 + d52ce4d

Showing 12 changed files with 619 additions and 518 deletions.
2 changes: 2 additions & 0 deletions automod/consumer/doc.go
@@ -0,0 +1,2 @@
// Code for consuming from the atproto firehose and ozone event stream, pushing events into the automod engine.
package consumer
312 changes: 312 additions & 0 deletions automod/consumer/firehose.go
@@ -0,0 +1,312 @@
package consumer

import (
"bytes"
"context"
"fmt"
"log/slog"
"net/http"
"net/url"
"sync/atomic"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/automod"
"github.com/bluesky-social/indigo/events/schedulers/autoscaling"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
lexutil "github.com/bluesky-social/indigo/lex/util"

"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/repomgr"
"github.com/carlmjohnson/versioninfo"
"github.com/gorilla/websocket"
"github.com/redis/go-redis/v9"
)

// TODO: should probably make this not hepa-specific; or even configurable
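// redis key under which the most recently processed firehose sequence number ("cursor") is persisted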
var firehoseCursorKey = "hepa/seq"

type FirehoseConsumer struct {
Parallelism int
Logger *slog.Logger
RedisClient *redis.Client
Engine *automod.Engine
Host string

// TODO: prefilter record collections; or predicate function?
// TODO: enable/disable event types; or predicate function?

// lastSeq is the most recent event sequence number we've received and begun to handle.
// This number is periodically persisted to redis, if redis is present.
// The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic),
// but nonetheless, you must use atomics when updating or reading this (to avoid data races).
lastSeq int64
}

func (fc *FirehoseConsumer) Run(ctx context.Context) error {

if fc.Engine == nil {
return fmt.Errorf("nil engine")
}

cur, err := fc.ReadLastCursor(ctx)
if err != nil {
return err
}

dialer := websocket.DefaultDialer
u, err := url.Parse(fc.Host)
if err != nil {
return fmt.Errorf("invalid Host URI: %w", err)
}
u.Path = "xrpc/com.atproto.sync.subscribeRepos"
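// if a cursor was previously persisted, resume the subscription from that sequence number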
if cur != 0 {
u.RawQuery = fmt.Sprintf("cursor=%d", cur)
}
fc.Logger.Info("subscribing to repo event stream", "upstream", fc.Host, "cursor", cur)
con, _, err := dialer.Dial(u.String(), http.Header{
"User-Agent": []string{fmt.Sprintf("hepa/%s", versioninfo.Short())},
})
if err != nil {
return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
}

rsc := &events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
return fc.HandleRepoCommit(ctx, evt)
},
RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "identity", did); err != nil {
fc.Logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err)
}
return nil
},
RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoAccount event", "did", evt.Did, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "account", did); err != nil {
fc.Logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err)
}
return nil
},
// TODO: deprecated
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "handle", did); err != nil {
fc.Logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
}
return nil
},
// TODO: deprecated
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoTombstone event", "did", evt.Did, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil {
fc.Logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err)
}
return nil
},
}

var scheduler events.Scheduler
if fc.Parallelism > 0 {
// use a fixed-parallelism scheduler if configured
scheduler = parallel.NewScheduler(
fc.Parallelism,
1000,
fc.Host,
rsc.EventHandler,
)
fc.Logger.Info("hepa scheduler configured", "scheduler", "parallel", "initial", fc.Parallelism)
} else {
// otherwise use auto-scaling scheduler
scaleSettings := autoscaling.DefaultAutoscaleSettings()
// start at higher parallelism (somewhat arbitrary)
scaleSettings.Concurrency = 4
scaleSettings.MaxConcurrency = 200
scheduler = autoscaling.NewScheduler(scaleSettings, fc.Host, rsc.EventHandler)
fc.Logger.Info("hepa scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency)
}

return events.HandleRepoStream(ctx, con, scheduler)
}

// NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better.
func (fc *FirehoseConsumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {

logger := fc.Logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
logger.Debug("received commit event")

if evt.TooBig {
logger.Warn("skipping tooBig events for now")
return nil
}

did, err := syntax.ParseDID(evt.Repo)
if err != nil {
logger.Error("bad DID syntax in event", "err", err)
return nil
}

rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
logger.Error("failed to read repo from car", "err", err)
return nil
}

// empty commit is a special case, temporarily, basically indicates "new account"
if len(evt.Ops) == 0 {
if err := fc.Engine.ProcessIdentityEvent(ctx, "create", did); err != nil {
fc.Logger.Error("processing handle update failed", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq, "err", err)
}
}

for _, op := range evt.Ops {
logger = logger.With("eventKind", op.Action, "path", op.Path)
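// op.Path has the form "<collection>/<rkey>"; split it into the collection NSID and record key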
collection, rkey, err := splitRepoPath(op.Path)
if err != nil {
logger.Error("invalid path in repo op")
return nil
}

ek := repomgr.EventKind(op.Action)
switch ek {
case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
// read the record bytes from blocks, and verify CID
rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path)
if err != nil {
logger.Error("reading record from event blocks (CAR)", "err", err)
break
}
if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid {
logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
break
}
var action string
switch ek {
case repomgr.EvtKindCreateRecord:
action = automod.CreateOp
case repomgr.EvtKindUpdateRecord:
action = automod.UpdateOp
default:
logger.Error("impossible event kind", "kind", ek)
break
}
recCID := syntax.CID(op.Cid.String())
err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{
Action: action,
DID: did,
Collection: collection,
RecordKey: rkey,
CID: &recCID,
RecordCBOR: *recCBOR,
})
if err != nil {
logger.Error("engine failed to process record", "err", err)
continue
}
case repomgr.EvtKindDeleteRecord:
err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{
Action: automod.DeleteOp,
DID: did,
Collection: collection,
RecordKey: rkey,
CID: nil,
RecordCBOR: nil,
})
if err != nil {
logger.Error("engine failed to process record", "err", err)
continue
}
default:
// TODO: should this be an error?
}
}

return nil
}

func (fc *FirehoseConsumer) ReadLastCursor(ctx context.Context) (int64, error) {
// if redis isn't configured, just skip
if fc.RedisClient == nil {
fc.Logger.Info("redis not configured, skipping cursor read")
return 0, nil
}

val, err := fc.RedisClient.Get(ctx, firehoseCursorKey).Int64()
if err == redis.Nil {
fc.Logger.Info("no pre-existing cursor in redis")
return 0, nil
} else if err != nil {
return 0, err
}
fc.Logger.Info("successfully found prior subscription cursor seq in redis", "seq", val)
return val, nil
}

func (fc *FirehoseConsumer) PersistCursor(ctx context.Context) error {
// if redis isn't configured, just skip
if fc.RedisClient == nil {
return nil
}
lastSeq := atomic.LoadInt64(&fc.lastSeq)
if lastSeq <= 0 {
return nil
}
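// store with a two-week TTL so a stale cursor eventually expires from redis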
err := fc.RedisClient.Set(ctx, firehoseCursorKey, lastSeq, 14*24*time.Hour).Err()
return err
}

// this method runs in a loop, persisting the current cursor state every 5 seconds
func (fc *FirehoseConsumer) RunPersistCursor(ctx context.Context) error {

// if redis isn't configured, just skip
if fc.RedisClient == nil {
return nil
}
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ctx.Done():
lastSeq := atomic.LoadInt64(&fc.lastSeq)
if lastSeq >= 1 {
fc.Logger.Info("persisting final cursor seq value", "seq", lastSeq)
err := fc.PersistCursor(ctx)
if err != nil {
fc.Logger.Error("failed to persist cursor", "err", err, "seq", lastSeq)
}
}
return nil
case <-ticker.C:
lastSeq := atomic.LoadInt64(&fc.lastSeq)
if lastSeq >= 1 {
err := fc.PersistCursor(ctx)
if err != nil {
fc.Logger.Error("failed to persist cursor", "err", err, "seq", lastSeq)
}
}
}
}
}
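For reference, a minimal sketch of how a caller (such as the hepa service) might wire up the new consumer package. The FirehoseConsumer fields and the Run/RunPersistCursor methods come from the code above; the engine construction, redis address, and relay host below are placeholders/assumptions, since the caller-side configuration is not part of this file.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/bluesky-social/indigo/automod"
	"github.com/bluesky-social/indigo/automod/consumer"

	"github.com/redis/go-redis/v9"
	"golang.org/x/sync/errgroup"
)

func main() {
	ctx := context.Background()
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// engine construction is application-specific and not shown in this diff (placeholder)
	var engine *automod.Engine

	fc := &consumer.FirehoseConsumer{
		Engine:      engine,
		Logger:      logger,
		Host:        "wss://bsky.network", // example relay host
		Parallelism: 8,                    // 0 selects the autoscaling scheduler instead
		RedisClient: redis.NewClient(&redis.Options{Addr: "localhost:6379"}), // nil disables cursor persistence
	}

	// run the firehose subscription and the periodic cursor persister side by side
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return fc.Run(ctx) })
	eg.Go(func() error { return fc.RunPersistCursor(ctx) })
	if err := eg.Wait(); err != nil {
		logger.Error("consumer exited", "err", err)
	}
}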