Skip to content

Commit

Permalink
automod: redis identity cache; early parallelism; fix rules
Browse files Browse the repository at this point in the history
  • Loading branch information
bnewbold committed Nov 15, 2023
1 parent 2e93643 commit 0ad1010
Show file tree
Hide file tree
Showing 7 changed files with 467 additions and 9 deletions.
4 changes: 2 additions & 2 deletions automod/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (e *Engine) ProcessRecord(ctx context.Context, did syntax.DID, path, recCID
return fmt.Errorf("mismatch between collection (%s) and type", collection)
}
evt := e.NewPostEvent(ident, path, recCID, post)
e.Logger.Info("processing post", "did", ident.DID, "path", path)
e.Logger.Debug("processing post", "did", ident.DID, "path", path)
if err := e.Rules.CallPostRules(&evt); err != nil {
return err
}
Expand All @@ -103,7 +103,7 @@ func (e *Engine) ProcessRecord(ctx context.Context, did syntax.DID, path, recCID
}
default:
evt := e.NewRecordEvent(ident, path, recCID, rec)
e.Logger.Info("processing record", "did", ident.DID, "path", path)
e.Logger.Debug("processing record", "did", ident.DID, "path", path)
if err := e.Rules.CallRecordRules(&evt); err != nil {
return err
}
Expand Down
336 changes: 336 additions & 0 deletions automod/redis_directory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
package automod

import (
"context"
"fmt"
"sync"
"time"

"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"

"github.com/go-redis/cache/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/redis/go-redis/v9"
)

// RedisDirectory is an identity.Directory that caches lookups from an inner
// directory in redis, with a local in-process cache layer in front for hot keys.
type RedisDirectory struct {
	// Inner is the directory consulted when the cache has no usable entry.
	Inner identity.Directory
	// ErrTTL bounds how long a cached lookup failure is considered fresh;
	// HitTTL is the cache lifetime used for successful lookups.
	ErrTTL, HitTTL time.Duration

	// handleCache is keyed by handle string, identityCache by DID string.
	handleCache, identityCache *cache.Cache
	// In-flight lookups, keyed by DID / handle string, used to coalesce
	// concurrent requests for the same key. Values are chan struct{}.
	didLookupChans, handleLookupChans sync.Map
}

// HandleEntry is the cached outcome of resolving a handle: the DID on success,
// or the error returned by the inner directory on failure.
type HandleEntry struct {
// Updated is the time the entry was created; error entries older than ErrTTL are treated as stale.
Updated time.Time
// DID is the resolved DID; it is left empty when the lookup failed.
DID syntax.DID
// Err is the lookup error, or nil on success.
// NOTE(review): this is an interface value stored through the cache serializer; confirm it round-trips as intended.
Err error
}

// IdentityEntry is the cached outcome of a DID lookup: the identity on
// success, or the error returned by the inner directory on failure.
type IdentityEntry struct {
// Updated is the time the entry was created; error entries older than ErrTTL are treated as stale.
Updated time.Time
// Identity is the looked-up identity as returned by the inner directory.
Identity *identity.Identity
// Err is the lookup error, or nil on success.
// NOTE(review): this is an interface value stored through the cache serializer; confirm it round-trips as intended.
Err error
}

// Compile-time assertion that *RedisDirectory implements identity.Directory.
var _ identity.Directory = (*RedisDirectory)(nil)

// NewRedisDirectory builds a RedisDirectory that caches lookups from inner in
// the redis instance at redisURL.
//
// hitTTL is the lifetime of successful lookups (and of the local in-process
// cache layer); errTTL is the lifetime of cached failures. The redis
// connection is verified with a PING before returning; if that fails the
// client is closed again so its connection pool is not leaked.
func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL time.Duration) (*RedisDirectory, error) {
	opt, err := redis.ParseURL(redisURL)
	if err != nil {
		return nil, fmt.Errorf("parsing redis URL: %w", err)
	}
	rdb := redis.NewClient(opt)

	// check redis connection
	if _, err := rdb.Ping(context.TODO()).Result(); err != nil {
		// Best-effort cleanup: the ping error is the one worth reporting.
		_ = rdb.Close()
		return nil, fmt.Errorf("connecting to redis: %w", err)
	}

	// Both caches share the redis client but keep separate local LFU layers.
	handleCache := cache.New(&cache.Options{
		Redis:      rdb,
		LocalCache: cache.NewTinyLFU(10_000, hitTTL),
	})
	identityCache := cache.New(&cache.Options{
		Redis:      rdb,
		LocalCache: cache.NewTinyLFU(10_000, hitTTL),
	})
	return &RedisDirectory{
		Inner:         inner,
		ErrTTL:        errTTL,
		HitTTL:        hitTTL,
		handleCache:   handleCache,
		identityCache: identityCache,
	}, nil
}

// IsHandleStale reports whether a cached handle entry must be refreshed.
// Only error entries can be stale: one is stale once it is older than ErrTTL.
func (d *RedisDirectory) IsHandleStale(e *HandleEntry) bool {
	if e.Err == nil {
		return false
	}
	age := time.Since(e.Updated)
	return age > d.ErrTTL
}

// IsIdentityStale reports whether a cached identity entry must be refreshed.
// Only error entries can be stale: one is stale once it is older than ErrTTL.
func (d *RedisDirectory) IsIdentityStale(e *IdentityEntry) bool {
	if e.Err == nil {
		return false
	}
	age := time.Since(e.Updated)
	return age > d.ErrTTL
}

// updateHandle looks up h in the inner directory and caches the outcome.
//
// A failed lookup is stored in the handle cache with ErrTTL and reported via
// the returned entry's Err field. A successful lookup populates both the
// identity cache (keyed by DID) and the handle cache with HitTTL. The returned
// error is always nil.
func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*HandleEntry, error) {
	ident, lookupErr := d.Inner.LookupHandle(ctx, h)
	if lookupErr != nil {
		failed := &HandleEntry{Updated: time.Now(), DID: "", Err: lookupErr}
		// Cache writes are best-effort; their result is intentionally ignored.
		_ = d.handleCache.Set(&cache.Item{
			Ctx:   ctx,
			Key:   h.String(),
			Value: *failed,
			TTL:   d.ErrTTL,
		})
		return failed, nil
	}

	identEntry := IdentityEntry{Updated: time.Now(), Identity: ident, Err: nil}
	resolved := &HandleEntry{Updated: time.Now(), DID: ident.DID, Err: nil}

	// Identity first, then handle, both with the success TTL.
	_ = d.identityCache.Set(&cache.Item{
		Ctx:   ctx,
		Key:   ident.DID.String(),
		Value: identEntry,
		TTL:   d.HitTTL,
	})
	_ = d.handleCache.Set(&cache.Item{
		Ctx:   ctx,
		Key:   h.String(),
		Value: *resolved,
		TTL:   d.HitTTL,
	})
	return resolved, nil
}

// ResolveHandle returns the DID that handle h resolves to.
//
// The cache is consulted first; a usable entry (including a still-fresh cached
// failure) is returned directly. On a miss or a stale error entry the inner
// directory is queried. Concurrent misses for the same handle are coalesced:
// the first caller performs the lookup, the others wait for it to finish and
// then re-read the cache, or give up when ctx is cancelled.
//
// A failed lookup is returned as an error, never as an empty DID with a nil
// error.
func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) {
	key := h.String()

	var entry HandleEntry
	err := d.handleCache.Get(ctx, key, &entry)
	if err != nil && err != cache.ErrCacheMiss {
		return "", err
	}
	if err != cache.ErrCacheMiss && !d.IsHandleStale(&entry) {
		handleCacheHits.Inc()
		return entry.DID, entry.Err
	}
	handleCacheMisses.Inc()

	// Coalesce multiple requests for the same Handle
	res := make(chan struct{})
	val, loaded := d.handleLookupChans.LoadOrStore(key, res)
	if loaded {
		handleRequestsCoalesced.Inc()
		// Wait for the result from the pending request
		select {
		case <-val.(chan struct{}):
			// The result should now be in the cache. Decode into a fresh value
			// (passed by pointer) so nothing from the stale entry lingers.
			var fresh HandleEntry
			err := d.handleCache.Get(ctx, key, &fresh)
			if err != nil && err != cache.ErrCacheMiss {
				return "", err
			}
			if err != cache.ErrCacheMiss && !d.IsHandleStale(&fresh) {
				return fresh.DID, fresh.Err
			}
			return "", fmt.Errorf("identity not found in cache after coalesce returned")
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}

	// This caller owns the lookup. Always clean up the coalesce map and wake
	// the waiters, even if the lookup panics, so they do not block until their
	// contexts expire.
	defer func() {
		d.handleLookupChans.Delete(key)
		close(res)
	}()

	// Update the Handle Entry from PLC and cache the result
	newEntry, err := d.updateHandle(ctx, h)
	if err != nil {
		return "", err
	}
	if newEntry == nil {
		return "", fmt.Errorf("handle lookup produced no result")
	}
	// updateHandle reports upstream failures inside the entry, not via err.
	if newEntry.Err != nil {
		return "", newEntry.Err
	}
	return newEntry.DID, nil
}

// updateDID looks up did in the inner directory and caches the outcome.
//
// The result, success or failure, is written to the identity cache; a lookup
// failure is carried in the entry's Err field rather than returned. When the
// lookup succeeds and the identity's handle is not the invalid-handle marker,
// the handle cache is updated as well. The returned error is always nil.
func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*IdentityEntry, error) {
	ident, lookupErr := d.Inner.LookupDID(ctx, did)
	found := lookupErr == nil
	if found {
		// wipe parsed public key; it's a waste of space
		ident.ParsedPublicKey = nil
	}

	// persist the identity lookup error, instead of processing it immediately
	result := &IdentityEntry{Updated: time.Now(), Identity: ident, Err: lookupErr}

	// Prepare the handle cache write up front; it is only issued on success.
	var handleItem *cache.Item
	if found && !ident.Handle.IsInvalidHandle() {
		handleItem = &cache.Item{
			Ctx:   ctx,
			Key:   ident.Handle.String(),
			Value: HandleEntry{Updated: time.Now(), DID: did, Err: nil},
			TTL:   d.HitTTL,
		}
	}

	// Cache writes are best-effort; their results are intentionally ignored.
	// NOTE(review): failures are stored with HitTTL here, whereas updateHandle
	// stores failures with ErrTTL; confirm this difference is intended.
	_ = d.identityCache.Set(&cache.Item{
		Ctx:   ctx,
		Key:   did.String(),
		Value: *result,
		TTL:   d.HitTTL,
	})
	if handleItem != nil {
		_ = d.handleCache.Set(handleItem)
	}
	return result, nil
}

// LookupDID returns the identity for did.
//
// The cache is consulted first; a usable entry (including a still-fresh cached
// failure) is returned directly. On a miss or a stale error entry the inner
// directory is queried. Concurrent misses for the same DID are coalesced: the
// first caller performs the lookup, the others wait for it to finish and then
// re-read the cache, or give up when ctx is cancelled.
//
// A failed lookup is returned as an error, never as a nil identity with a nil
// error.
func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) {
	key := did.String()

	var entry IdentityEntry
	err := d.identityCache.Get(ctx, key, &entry)
	if err != nil && err != cache.ErrCacheMiss {
		return nil, err
	}
	if err != cache.ErrCacheMiss && !d.IsIdentityStale(&entry) {
		identityCacheHits.Inc()
		return entry.Identity, entry.Err
	}
	identityCacheMisses.Inc()

	// Coalesce multiple requests for the same DID
	res := make(chan struct{})
	val, loaded := d.didLookupChans.LoadOrStore(key, res)
	if loaded {
		identityRequestsCoalesced.Inc()
		// Wait for the result from the pending request
		select {
		case <-val.(chan struct{}):
			// The result should now be in the cache. Decode into a fresh value
			// so nothing from the stale entry lingers.
			var fresh IdentityEntry
			err := d.identityCache.Get(ctx, key, &fresh)
			if err != nil && err != cache.ErrCacheMiss {
				return nil, err
			}
			if err != cache.ErrCacheMiss && !d.IsIdentityStale(&fresh) {
				return fresh.Identity, fresh.Err
			}
			return nil, fmt.Errorf("identity not found in cache after coalesce returned")
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	// This caller owns the lookup. Always clean up the coalesce map and wake
	// the waiters, even if the lookup panics, so they do not block until their
	// contexts expire.
	defer func() {
		d.didLookupChans.Delete(key)
		close(res)
	}()

	// Update the Identity Entry from PLC and cache the result
	newEntry, err := d.updateDID(ctx, did)
	if err != nil {
		return nil, err
	}
	if newEntry == nil {
		return nil, fmt.Errorf("identity lookup produced no result")
	}
	// updateDID reports upstream failures inside the entry, not via err.
	if newEntry.Err != nil {
		return nil, newEntry.Err
	}
	return newEntry.Identity, nil
}

// LookupHandle resolves handle h to a DID, loads that DID's identity, and
// verifies that the identity declares h as its handle.
//
// It returns an error when either step fails, when either step yields an
// empty result, or when the declared handle does not match h.
func (d *RedisDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*identity.Identity, error) {
	did, err := d.ResolveHandle(ctx, h)
	if err != nil {
		return nil, err
	}
	// Guard against a zero-value DID paired with a nil error, so it is not
	// looked up (and cached) under an empty key.
	if did == "" {
		return nil, fmt.Errorf("handle resolution returned no DID")
	}

	ident, err := d.LookupDID(ctx, did)
	if err != nil {
		return nil, err
	}
	// Guard against a nil identity paired with a nil error, so it surfaces as
	// an error rather than a nil dereference below.
	if ident == nil {
		return nil, fmt.Errorf("identity lookup returned no identity")
	}

	// The handle must match the one declared by the DID document.
	declared, err := ident.DeclaredHandle()
	if err != nil {
		return nil, err
	}
	if declared != h {
		return nil, fmt.Errorf("handle does not match that declared in DID document")
	}
	return ident, nil
}

// Lookup dispatches on the kind of at-identifier: a handle goes through
// LookupHandle, a DID through LookupDID. The handle interpretation is tried
// first. An identifier that is neither yields an error.
func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) {
	if h, herr := a.AsHandle(); herr == nil {
		return d.LookupHandle(ctx, h)
	}
	if id, derr := a.AsDID(); derr == nil {
		return d.LookupDID(ctx, id)
	}
	return nil, fmt.Errorf("at-identifier neither a Handle nor a DID")
}

// Purge removes the cached entry for the given at-identifier: a handle is
// deleted from the handle cache, a DID from the identity cache. Only the
// cache matching the identifier's kind is touched. An identifier that is
// neither yields an error.
func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error {
	if h, herr := a.AsHandle(); herr == nil {
		return d.handleCache.Delete(ctx, h.String())
	}
	if id, derr := a.AsDID(); derr == nil {
		return d.identityCache.Delete(ctx, id.String())
	}
	return fmt.Errorf("at-identifier neither a Handle nor a DID")
}

// Prometheus counters for the redis-backed directory: cache hits and misses
// for handle and identity lookups, plus the number of requests that were
// coalesced onto an already in-flight lookup.
var (
	handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_redis_directory_handle_cache_hits",
		Help: "Number of cache hits for ATProto handle lookups",
	})

	handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_redis_directory_handle_cache_misses",
		Help: "Number of cache misses for ATProto handle lookups",
	})

	identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_redis_directory_identity_cache_hits",
		Help: "Number of cache hits for ATProto identity lookups",
	})

	identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_redis_directory_identity_cache_misses",
		Help: "Number of cache misses for ATProto identity lookups",
	})

	identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_redis_directory_identity_requests_coalesced",
		Help: "Number of identity requests coalesced",
	})

	handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_redis_directory_handle_requests_coalesced",
		Help: "Number of handle requests coalesced",
	})
)
2 changes: 2 additions & 0 deletions automod/rules/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
func DefaultRules() automod.RuleSet {
rules := automod.RuleSet{
PostRules: []automod.PostRuleFunc{
MisleadingURLPostRule,
MisleadingMentionPostRule,
BanHashtagsPostRule,
},
}
Expand Down
8 changes: 5 additions & 3 deletions cmd/hepa/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ func (s *Server) RunConsumer(ctx context.Context) error {
// TODO: other event callbacks as needed
}

// start at higher parallelism (somewhat arbitrary)
scaleSettings := autoscaling.DefaultAutoscaleSettings()
scaleSettings.Concurrency = 6
return events.HandleRepoStream(
ctx, con, autoscaling.NewScheduler(
autoscaling.DefaultAutoscaleSettings(),
scaleSettings,
s.bgshost,
rsc.EventHandler,
),
Expand All @@ -72,8 +75,7 @@ func (s *Server) RunConsumer(ctx context.Context) error {
func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {

logger := s.logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
// XXX: debug, not info
logger.Info("received commit event")
logger.Debug("received commit event")

if evt.TooBig {
logger.Warn("skipping tooBig events for now")
Expand Down
Loading

0 comments on commit 0ad1010

Please sign in to comment.