From f24fe0287ed4bb621800354eebb0a64cc1c9ed8c Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 24 Jan 2024 18:21:34 -0800 Subject: [PATCH 01/17] redisdir: handle purge of uncached directory info --- atproto/identity/redisdir/redis_directory.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 3ea9f354a..90817b6dc 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -317,11 +317,19 @@ func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error handle, err := a.AsHandle() if nil == err { // if not an error, is a handle handle = handle.Normalize() - return d.handleCache.Delete(ctx, handle.String()) + err = d.handleCache.Delete(ctx, handle.String()) + if err == cache.ErrCacheMiss { + return nil + } + return err } did, err := a.AsDID() if nil == err { // if not an error, is a DID - return d.identityCache.Delete(ctx, did.String()) + err = d.identityCache.Delete(ctx, did.String()) + if err == cache.ErrCacheMiss { + return nil + } + return err } return fmt.Errorf("at-identifier neither a Handle nor a DID") } From 2e695b86ae15c9347acad9958fd1ae87992e8f30 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 01:45:31 -0700 Subject: [PATCH 02/17] identity: simplify type sig --- atproto/identity/cache_directory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atproto/identity/cache_directory.go b/atproto/identity/cache_directory.go index 55951bbf0..dad78fe82 100644 --- a/atproto/identity/cache_directory.go +++ b/atproto/identity/cache_directory.go @@ -68,7 +68,7 @@ var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ var _ Directory = (*CacheDirectory)(nil) // Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration. -func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL time.Duration, invalidHandleTTL time.Duration) CacheDirectory { +func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL, invalidHandleTTL time.Duration) CacheDirectory { return CacheDirectory{ ErrTTL: errTTL, InvalidHandleTTL: invalidHandleTTL, From 048c0b9af1ad57fc31f57596f56c1272feabb3c1 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 01:46:24 -0700 Subject: [PATCH 03/17] redisdir: add invalid handle TTL --- atproto/identity/redisdir/redis_directory.go | 23 ++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 90817b6dc..698939c7b 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -20,9 +20,10 @@ var redisDirPrefix string = "dir/" // // Includes an in-process LRU cache as well (provided by the redis client library), for hot key (identities). type RedisDirectory struct { - Inner identity.Directory - ErrTTL time.Duration - HitTTL time.Duration + Inner identity.Directory + ErrTTL time.Duration + HitTTL time.Duration + InvalidHandleTTL time.Duration handleCache *cache.Cache identityCache *cache.Cache @@ -49,7 +50,7 @@ var _ identity.Directory = (*RedisDirectory)(nil) // `redisURL` contains all the redis connection config options. // `hitTTL` and `errTTL` define how long successful and errored identity metadata should be cached (respectively). errTTL is expected to be shorted than hitTTL. // `lruSize` is the size of the in-process cache, for each of the handle and identity caches. 10000 is a reasonable default. -func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL time.Duration, lruSize int) (*RedisDirectory, error) { +func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL, invalidHandleTTL time.Duration, lruSize int) (*RedisDirectory, error) { opt, err := redis.ParseURL(redisURL) if err != nil { return nil, err @@ -69,11 +70,12 @@ func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL LocalCache: cache.NewTinyLFU(lruSize, hitTTL), }) return &RedisDirectory{ - Inner: inner, - ErrTTL: errTTL, - HitTTL: hitTTL, - handleCache: handleCache, - identityCache: identityCache, + Inner: inner, + ErrTTL: errTTL, + HitTTL: hitTTL, + InvalidHandleTTL: invalidHandleTTL, + handleCache: handleCache, + identityCache: identityCache, }, nil } @@ -88,6 +90,9 @@ func (d *RedisDirectory) isIdentityStale(e *identityEntry) bool { if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { return true } + if e.Identity != nil && e.Identity.Handle.IsInvalidHandle() && time.Since(e.Updated) > d.InvalidHandleTTL { + return true + } return false } From e30e51b329d68f46c768f1a52dc8bac517aa1694 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 01:47:40 -0700 Subject: [PATCH 04/17] redisdir: have cache update code match caching directory --- atproto/identity/redisdir/redis_directory.go | 43 ++++++++++++-------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 698939c7b..0eee1dde0 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -96,7 +96,7 @@ func (d *RedisDirectory) isIdentityStale(e *identityEntry) bool { return false } -func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*handleEntry, error) { +func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) handleEntry { h = h.Normalize() ident, err := d.Inner.LookupHandle(ctx, h) if err != nil { @@ -112,9 +112,10 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*ha TTL: d.ErrTTL, }) if err != nil { - return nil, err + he.DID = "" + he.Err = err + return he } - return &he, nil } entry := identityEntry{ @@ -135,7 +136,9 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*ha TTL: d.HitTTL, }) if err != nil { - return nil, err + he.DID = "" + he.Err = err + return he } err = d.handleCache.Set(&cache.Item{ Ctx: ctx, @@ -144,9 +147,11 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*ha TTL: d.HitTTL, }) if err != nil { - return nil, err + he.DID = "" + he.Err = err + return he } - return &he, nil + return he } func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { @@ -183,21 +188,23 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy } } - var did syntax.DID // Update the Handle Entry from PLC and cache the result - newEntry, err := d.updateHandle(ctx, h) - if err == nil && newEntry != nil { - did = newEntry.DID - } + newEntry := d.updateHandle(ctx, h) // Cleanup the coalesce map and close the results channel d.handleLookupChans.Delete(h.String()) // Callers waiting will now get the result from the cache close(res) - return did, err + if newEntry.Err != nil { + return "", newEntry.Err + } + if newEntry.DID != "" { + return newEntry.DID, nil + } + return "", fmt.Errorf("unexpected control-flow error") } -func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identityEntry, error) { +func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identityEntry { ident, err := d.Inner.LookupDID(ctx, did) // persist the identity lookup error, instead of processing it immediately entry := identityEntry{ @@ -222,7 +229,9 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identi TTL: d.HitTTL, }) if err != nil { - return nil, err + entry.Identity = nil + entry.Err = err + return entry } if he != nil { err = d.handleCache.Set(&cache.Item{ @@ -232,10 +241,12 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identi TTL: d.HitTTL, }) if err != nil { - return nil, err + entry.Identity = nil + entry.Err = err + return entry } } - return &entry, nil + return entry } func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { From 23497fa3579ff5708f47ef498b51318269483436 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 01:48:14 -0700 Subject: [PATCH 05/17] redisdir: refactor to *WithCacheState, matching cached directory --- atproto/identity/redisdir/redis_directory.go | 50 +++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 0eee1dde0..936b743f3 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -250,14 +250,19 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identity } func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { + id, _, err := d.LookupDIDWithCacheState(ctx, did) + return id, err +} + +func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*identity.Identity, bool, error) { var entry identityEntry err := d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return nil, err + return nil, false, err } if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { identityCacheHits.Inc() - return entry.Identity, entry.Err + return entry.Identity, true, entry.Err } identityCacheMisses.Inc() @@ -272,51 +277,60 @@ func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identi // The result should now be in the cache err = d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return nil, err + return nil, false, err } if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { - return entry.Identity, entry.Err + return entry.Identity, false, entry.Err } - return nil, fmt.Errorf("identity not found in cache after coalesce returned") + return nil, false, fmt.Errorf("identity not found in cache after coalesce returned") case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() } } - var doc *identity.Identity // Update the Identity Entry from PLC and cache the result - newEntry, err := d.updateDID(ctx, did) - if err == nil && newEntry != nil { - doc = newEntry.Identity - } + newEntry := d.updateDID(ctx, did) // Cleanup the coalesce map and close the results channel d.didLookupChans.Delete(did.String()) // Callers waiting will now get the result from the cache close(res) - return doc, err + if newEntry.Err != nil { + return nil, false, newEntry.Err + } + if newEntry.Identity != nil { + return newEntry.Identity, false, nil + } + return nil, false, fmt.Errorf("unexpected control-flow error") } func (d *RedisDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*identity.Identity, error) { + ident, _, err := d.LookupHandleWithCacheState(ctx, h) + return ident, err +} + +func (d *RedisDirectory) LookupHandleWithCacheState(ctx context.Context, h syntax.Handle) (*identity.Identity, bool, error) { + h = h.Normalize() did, err := d.ResolveHandle(ctx, h) if err != nil { - return nil, err + return nil, false, err } - ident, err := d.LookupDID(ctx, did) + ident, hit, err := d.LookupDIDWithCacheState(ctx, did) if err != nil { - return nil, err + return nil, hit, err } declared, err := ident.DeclaredHandle() if err != nil { - return nil, err + return nil, hit, err } if declared != h { - return nil, fmt.Errorf("handle does not match that declared in DID document") + return nil, hit, identity.ErrHandleMismatch } - return ident, nil + return ident, hit, nil } + func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) { handle, err := a.AsHandle() if nil == err { // if not an error, is a handle From 22f38fbadd908e769ab95bf7218452d432bf49bc Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 02:10:43 -0700 Subject: [PATCH 06/17] fix missing return --- atproto/identity/redisdir/redis_directory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 936b743f3..8344ce783 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -116,6 +116,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand he.Err = err return he } + return he } entry := identityEntry{ From 618b150227f8ede21a710b04e88145f09f8fdf19 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:25:50 -0700 Subject: [PATCH 07/17] fix redisdir purging --- atproto/identity/redisdir/redis_directory.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 8344ce783..b0460f990 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -348,7 +348,7 @@ func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error handle, err := a.AsHandle() if nil == err { // if not an error, is a handle handle = handle.Normalize() - err = d.handleCache.Delete(ctx, handle.String()) + err = d.handleCache.Delete(ctx, redisDirPrefix+handle.String()) if err == cache.ErrCacheMiss { return nil } @@ -356,7 +356,7 @@ func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error } did, err := a.AsDID() if nil == err { // if not an error, is a DID - err = d.identityCache.Delete(ctx, did.String()) + err = d.identityCache.Delete(ctx, redisDirPrefix+did.String()) if err == cache.ErrCacheMiss { return nil } From 7e2b115921c66a0d711d66601f38117d8eb1b18e Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:26:33 -0700 Subject: [PATCH 08/17] redisdir: DID as nullable This fixes a bug where de-serializing the cached object was confusingly resulting in an unmarshal error. --- atproto/identity/redisdir/redis_directory.go | 21 ++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index b0460f990..5aef9806b 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -33,8 +33,9 @@ type RedisDirectory struct { type handleEntry struct { Updated time.Time - DID syntax.DID - Err error + // needs to be pointer type, because unmarshalling empty string would be an error + DID *syntax.DID + Err error } type identityEntry struct { @@ -102,7 +103,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand if err != nil { he := handleEntry{ Updated: time.Now(), - DID: "", + DID: nil, Err: err, } err = d.handleCache.Set(&cache.Item{ @@ -112,7 +113,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand TTL: d.ErrTTL, }) if err != nil { - he.DID = "" + he.DID = nil he.Err = err return he } @@ -126,7 +127,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand } he := handleEntry{ Updated: time.Now(), - DID: ident.DID, + DID: &ident.DID, Err: nil, } @@ -137,7 +138,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand TTL: d.HitTTL, }) if err != nil { - he.DID = "" + he.DID = nil he.Err = err return he } @@ -148,7 +149,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand TTL: d.HitTTL, }) if err != nil { - he.DID = "" + he.DID = nil he.Err = err return he } @@ -199,8 +200,8 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy if newEntry.Err != nil { return "", newEntry.Err } - if newEntry.DID != "" { - return newEntry.DID, nil + if newEntry.DID != nil { + return *newEntry.DID, nil } return "", fmt.Errorf("unexpected control-flow error") } @@ -218,7 +219,7 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identity if nil == err && !ident.Handle.IsInvalidHandle() { he = &handleEntry{ Updated: time.Now(), - DID: did, + DID: &did, Err: nil, } } From 64033934dfcbf04f828c31209458032d0fa46a56 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:34:32 -0700 Subject: [PATCH 09/17] redisdir: better meta error messages --- atproto/identity/redisdir/redis_directory.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 5aef9806b..476a17ed8 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -114,7 +114,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand }) if err != nil { he.DID = nil - he.Err = err + he.Err = fmt.Errorf("identity cache write: %w", err) return he } return he @@ -139,7 +139,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand }) if err != nil { he.DID = nil - he.Err = err + he.Err = fmt.Errorf("identity cache write: %w", err) return he } err = d.handleCache.Set(&cache.Item{ @@ -150,7 +150,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand }) if err != nil { he.DID = nil - he.Err = err + he.Err = fmt.Errorf("identity cache write: %w", err) return he } return he @@ -160,7 +160,7 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy var entry handleEntry err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return "", err + return "", fmt.Errorf("identity cache read: %w", err) } if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { handleCacheHits.Inc() @@ -179,7 +179,7 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy // The result should now be in the cache err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), entry) if err != nil && err != cache.ErrCacheMiss { - return "", err + return "", fmt.Errorf("identity cache read: %w", err) } if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { return entry.DID, entry.Err @@ -232,7 +232,7 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identity }) if err != nil { entry.Identity = nil - entry.Err = err + entry.Err = fmt.Errorf("identity cache write: %v", err) return entry } if he != nil { @@ -244,7 +244,7 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identity }) if err != nil { entry.Identity = nil - entry.Err = err + entry.Err = fmt.Errorf("identity cache write: %v", err) return entry } } @@ -260,7 +260,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax var entry identityEntry err := d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return nil, false, err + return nil, false, fmt.Errorf("identity cache read: %v", err) } if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { identityCacheHits.Inc() @@ -279,7 +279,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax // The result should now be in the cache err = d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return nil, false, err + return nil, false, fmt.Errorf("identity cache read: %v", err) } if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { return entry.Identity, false, entry.Err From f658332e03b819769be9bf5d57605084fc3ecd26 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:35:49 -0700 Subject: [PATCH 10/17] cleanups --- atproto/identity/redisdir/redis_directory.go | 27 +++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 476a17ed8..6eef690c4 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -162,9 +162,15 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy if err != nil && err != cache.ErrCacheMiss { return "", fmt.Errorf("identity cache read: %w", err) } - if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { + if nil == err && !d.isHandleStale(&entry) { handleCacheHits.Inc() - return entry.DID, entry.Err + if entry.Err != nil { + return "", entry.Err + } else if entry.DID != nil { + return *entry.DID, nil + } else { + return "", fmt.Errorf("code flow error in redis identity directory") + } } handleCacheMisses.Inc() @@ -181,8 +187,14 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy if err != nil && err != cache.ErrCacheMiss { return "", fmt.Errorf("identity cache read: %w", err) } - if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { - return entry.DID, entry.Err + if nil == err && !d.isHandleStale(&entry) { + if entry.Err != nil { + return "", entry.Err + } else if entry.DID != nil { + return *entry.DID, nil + } else { + return "", fmt.Errorf("code flow error in redis identity directory") + } } return "", fmt.Errorf("identity not found in cache after coalesce returned") case <-ctx.Done(): @@ -192,6 +204,7 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy // Update the Handle Entry from PLC and cache the result newEntry := d.updateHandle(ctx, h) + // Cleanup the coalesce map and close the results channel d.handleLookupChans.Delete(h.String()) // Callers waiting will now get the result from the cache @@ -262,7 +275,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax if err != nil && err != cache.ErrCacheMiss { return nil, false, fmt.Errorf("identity cache read: %v", err) } - if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { + if nil == err && !d.isIdentityStale(&entry) { identityCacheHits.Inc() return entry.Identity, true, entry.Err } @@ -281,7 +294,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax if err != nil && err != cache.ErrCacheMiss { return nil, false, fmt.Errorf("identity cache read: %v", err) } - if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { + if nil == err && !d.isIdentityStale(&entry) { return entry.Identity, false, entry.Err } return nil, false, fmt.Errorf("identity not found in cache after coalesce returned") @@ -292,6 +305,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax // Update the Identity Entry from PLC and cache the result newEntry := d.updateDID(ctx, did) + // Cleanup the coalesce map and close the results channel d.didLookupChans.Delete(did.String()) // Callers waiting will now get the result from the cache @@ -332,7 +346,6 @@ func (d *RedisDirectory) LookupHandleWithCacheState(ctx context.Context, h synta return ident, hit, nil } - func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) { handle, err := a.AsHandle() if nil == err { // if not an error, is a handle From a2efc12f5185e673b6b44c687e8d7439e131cf56 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:37:07 -0700 Subject: [PATCH 11/17] identity: break out cache metrics declarations --- atproto/identity/cache_directory.go | 32 ------------------------- atproto/identity/metrics.go | 36 +++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 32 deletions(-) create mode 100644 atproto/identity/metrics.go diff --git a/atproto/identity/cache_directory.go b/atproto/identity/cache_directory.go index dad78fe82..6989049a3 100644 --- a/atproto/identity/cache_directory.go +++ b/atproto/identity/cache_directory.go @@ -7,8 +7,6 @@ import ( "time" "github.com/bluesky-social/indigo/atproto/syntax" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/hashicorp/golang-lru/v2/expirable" ) @@ -35,36 +33,6 @@ type IdentityEntry struct { Err error } -var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_handle_cache_hits", - Help: "Number of cache hits for ATProto handle lookups", -}) - -var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_handle_cache_misses", - Help: "Number of cache misses for ATProto handle lookups", -}) - -var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_identity_cache_hits", - Help: "Number of cache hits for ATProto identity lookups", -}) - -var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_identity_cache_misses", - Help: "Number of cache misses for ATProto identity lookups", -}) - -var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_identity_requests_coalesced", - Help: "Number of identity requests coalesced", -}) - -var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_handle_requests_coalesced", - Help: "Number of handle requests coalesced", -}) - var _ Directory = (*CacheDirectory)(nil) // Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration. diff --git a/atproto/identity/metrics.go b/atproto/identity/metrics.go new file mode 100644 index 000000000..d6061c9ce --- /dev/null +++ b/atproto/identity/metrics.go @@ -0,0 +1,36 @@ +package identity + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_handle_cache_hits", + Help: "Number of cache hits for ATProto handle lookups", +}) + +var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_handle_cache_misses", + Help: "Number of cache misses for ATProto handle lookups", +}) + +var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_identity_cache_hits", + Help: "Number of cache hits for ATProto identity lookups", +}) + +var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_identity_cache_misses", + Help: "Number of cache misses for ATProto identity lookups", +}) + +var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_identity_requests_coalesced", + Help: "Number of identity requests coalesced", +}) + +var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_handle_requests_coalesced", + Help: "Number of handle requests coalesced", +}) From de7eb52ac7454d264ae0849e230b83200dfa749b Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:37:48 -0700 Subject: [PATCH 12/17] identity: catch invalid handle sooner --- atproto/identity/cache_directory.go | 3 +++ atproto/identity/handle.go | 4 ++++ atproto/identity/redisdir/redis_directory.go | 3 +++ 3 files changed, 10 insertions(+) diff --git a/atproto/identity/cache_directory.go b/atproto/identity/cache_directory.go index 6989049a3..2df1ac5c5 100644 --- a/atproto/identity/cache_directory.go +++ b/atproto/identity/cache_directory.go @@ -92,6 +92,9 @@ func (d *CacheDirectory) updateHandle(ctx context.Context, h syntax.Handle) Hand } func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { + if h.IsInvalidHandle() { + return "", fmt.Errorf("invalid handle") + } entry, ok := d.handleCache.Get(h) if ok && !d.IsHandleStale(&entry) { handleCacheHits.Inc() diff --git a/atproto/identity/handle.go b/atproto/identity/handle.go index 024ac6e32..77a157f85 100644 --- a/atproto/identity/handle.go +++ b/atproto/identity/handle.go @@ -168,6 +168,10 @@ func (d *BaseDirectory) ResolveHandle(ctx context.Context, handle syntax.Handle) var dnsErr error var did syntax.DID + if handle.IsInvalidHandle() { + return "", fmt.Errorf("invalid handle") + } + if !handle.AllowedTLD() { return "", ErrHandleReservedTLD } diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 6eef690c4..d8369382e 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -157,6 +157,9 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand } func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { + if h.IsInvalidHandle() { + return "", fmt.Errorf("invalid handle") + } var entry handleEntry err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) if err != nil && err != cache.ErrCacheMiss { From f570fec249bbbaf9f7241a9d4d803860de10d544 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:41:47 -0700 Subject: [PATCH 13/17] redisdir live test (skipped) --- atproto/identity/redisdir/live_test.go | 136 +++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 atproto/identity/redisdir/live_test.go diff --git a/atproto/identity/redisdir/live_test.go b/atproto/identity/redisdir/live_test.go new file mode 100644 index 000000000..433c45b8c --- /dev/null +++ b/atproto/identity/redisdir/live_test.go @@ -0,0 +1,136 @@ +package redisdir + +import ( + "context" + "log/slog" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" + "golang.org/x/time/rate" + + "github.com/stretchr/testify/assert" +) + +var redisLocalTestURL string = "redis://localhost:6379/0" + +// NOTE: this hits the open internet! marked as skip below by default +func testDirectoryLive(t *testing.T, d identity.Directory) { + assert := assert.New(t) + ctx := context.Background() + + handle := syntax.Handle("atproto.com") + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") + pdsSuffix := "host.bsky.network" + + resp, err := d.LookupHandle(ctx, handle) + assert.NoError(err) + assert.Equal(handle, resp.Handle) + assert.Equal(did, resp.DID) + assert.True(strings.HasSuffix(resp.PDSEndpoint(), pdsSuffix)) + dh, err := resp.DeclaredHandle() + assert.NoError(err) + assert.Equal(handle, dh) + pk, err := resp.PublicKey() + assert.NoError(err) + assert.NotNil(pk) + + resp, err = d.LookupDID(ctx, did) + assert.NoError(err) + assert.Equal(handle, resp.Handle) + assert.Equal(did, resp.DID) + assert.True(strings.HasSuffix(resp.PDSEndpoint(), pdsSuffix)) + + _, err = d.LookupHandle(ctx, syntax.Handle("fake-dummy-no-resolve.atproto.com")) + assert.Error(err) + //assert.ErrorIs(err, identity.ErrHandleNotFound) + + _, err = d.LookupDID(ctx, syntax.DID("did:web:fake-dummy-no-resolve.atproto.com")) + assert.Error(err) + //assert.ErrorIs(err, identity.ErrDIDNotFound) + + _, err = d.LookupDID(ctx, syntax.DID("did:plc:fake-dummy-no-resolve.atproto.com")) + assert.Error(err) + //assert.ErrorIs(err, identity.ErrDIDNotFound) + + _, err = d.LookupHandle(ctx, syntax.HandleInvalid) + assert.Error(err) +} + +func TestRedisDirectory(t *testing.T) { + t.Skip("TODO: skipping live network test") + assert := assert.New(t) + ctx := context.Background() + inner := identity.BaseDirectory{} + d, err := NewRedisDirectory(&inner, redisLocalTestURL, time.Hour*1, time.Hour*1, time.Hour*1, 1000) + if err != nil { + t.Fatal(err) + } + + err = d.Purge(ctx, syntax.Handle("atproto.com").AtIdentifier()) + assert.NoError(err) + err = d.Purge(ctx, syntax.Handle("fake-dummy-no-resolve.atproto.com").AtIdentifier()) + assert.NoError(err) + err = d.Purge(ctx, syntax.DID("did:web:fake-dummy-no-resolve.atproto.com").AtIdentifier()) + assert.NoError(err) + err = d.Purge(ctx, syntax.DID("did:plc:fake-dummy-no-resolve.atproto.com").AtIdentifier()) + assert.NoError(err) + + for i := 0; i < 3; i = i + 1 { + testDirectoryLive(t, d) + } +} + +func TestRedisCoalesce(t *testing.T) { + t.Skip("TODO: skipping live network test") + + assert := assert.New(t) + handle := syntax.Handle("atproto.com") + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") + + base := identity.BaseDirectory{ + PLCURL: "https://plc.directory", + HTTPClient: http.Client{ + Timeout: time.Second * 15, + }, + // Limit the number of requests we can make to the PLC to 1 per second + PLCLimiter: rate.NewLimiter(1, 1), + TryAuthoritativeDNS: true, + SkipDNSDomainSuffixes: []string{".bsky.social"}, + } + dir, err := NewRedisDirectory(&base, redisLocalTestURL, time.Hour*1, time.Hour*1, time.Hour*1, 1000) + if err != nil { + t.Fatal(err) + } + // All 60 routines launch at the same time, so they should all miss the cache initially + routines := 60 + wg := sync.WaitGroup{} + + // Cancel the context after 2 seconds, if we're coalescing correctly, we should only make 1 request + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + for i := 0; i < routines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ident, err := dir.LookupDID(ctx, did) + if err != nil { + slog.Error("Failed lookup", "error", err) + } + assert.NoError(err) + assert.Equal(handle, ident.Handle) + + ident, err = dir.LookupHandle(ctx, handle) + if err != nil { + slog.Error("Failed lookup", "error", err) + } + assert.NoError(err) + assert.Equal(did, ident.DID) + }() + } + wg.Wait() +} From a8f3a4edaeb606d8467fe21dfc5594f11be6160e Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 25 Sep 2024 03:53:06 -0700 Subject: [PATCH 14/17] hepa: set invalid handle TTL to 5min --- cmd/hepa/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index 6cc0bcb82..dbef6c488 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -172,7 +172,7 @@ func configDirectory(cctx *cli.Context) (identity.Directory, error) { } var dir identity.Directory if cctx.String("redis-url") != "" { - rdir, err := redisdir.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2, 10_000) + rdir, err := redisdir.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2, time.Minute*5, 10_000) if err != nil { return nil, err } From eb4057413c327e49242e614d546118ef90f591a0 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Fri, 27 Sep 2024 17:22:00 -0700 Subject: [PATCH 15/17] add warning about error equality --- atproto/identity/redisdir/redis_directory.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index d8369382e..26e937abf 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -51,6 +51,8 @@ var _ identity.Directory = (*RedisDirectory)(nil) // `redisURL` contains all the redis connection config options. // `hitTTL` and `errTTL` define how long successful and errored identity metadata should be cached (respectively). errTTL is expected to be shorted than hitTTL. // `lruSize` is the size of the in-process cache, for each of the handle and identity caches. 10000 is a reasonable default. +// +// NOTE: Errors returned may be inconsistent with the base directory, or between calls. This is because cached errors are serialized/deserialized and that may break equality checks. func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL, invalidHandleTTL time.Duration, lruSize int) (*RedisDirectory, error) { opt, err := redis.ParseURL(redisURL) if err != nil { From 873a7392139277a3d20fd446940ff1275f2c5577 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Fri, 27 Sep 2024 17:23:19 -0700 Subject: [PATCH 16/17] flip err/nill checks back around --- atproto/identity/redisdir/redis_directory.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 26e937abf..c769d42c6 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -167,7 +167,7 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy if err != nil && err != cache.ErrCacheMiss { return "", fmt.Errorf("identity cache read: %w", err) } - if nil == err && !d.isHandleStale(&entry) { + if err == nil && !d.isHandleStale(&entry) { // if no error... handleCacheHits.Inc() if entry.Err != nil { return "", entry.Err @@ -192,7 +192,7 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy if err != nil && err != cache.ErrCacheMiss { return "", fmt.Errorf("identity cache read: %w", err) } - if nil == err && !d.isHandleStale(&entry) { + if err == nil && !d.isHandleStale(&entry) { // if no error... if entry.Err != nil { return "", entry.Err } else if entry.DID != nil { @@ -234,7 +234,7 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identity } var he *handleEntry // if *not* an error, then also update the handle cache - if nil == err && !ident.Handle.IsInvalidHandle() { + if err == nil && !ident.Handle.IsInvalidHandle() { he = &handleEntry{ Updated: time.Now(), DID: &did, @@ -280,7 +280,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax if err != nil && err != cache.ErrCacheMiss { return nil, false, fmt.Errorf("identity cache read: %v", err) } - if nil == err && !d.isIdentityStale(&entry) { + if err == nil && !d.isIdentityStale(&entry) { // if no error... identityCacheHits.Inc() return entry.Identity, true, entry.Err } @@ -299,7 +299,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax if err != nil && err != cache.ErrCacheMiss { return nil, false, fmt.Errorf("identity cache read: %v", err) } - if nil == err && !d.isIdentityStale(&entry) { + if err == nil && !d.isIdentityStale(&entry) { // if no error... return entry.Identity, false, entry.Err } return nil, false, fmt.Errorf("identity not found in cache after coalesce returned") @@ -353,11 +353,11 @@ func (d *RedisDirectory) LookupHandleWithCacheState(ctx context.Context, h synta func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) { handle, err := a.AsHandle() - if nil == err { // if not an error, is a handle + if err == nil { // if not an error, is a handle return d.LookupHandle(ctx, handle) } did, err := a.AsDID() - if nil == err { // if not an error, is a DID + if err == nil { // if not an error, is a DID return d.LookupDID(ctx, did) } return nil, fmt.Errorf("at-identifier neither a Handle nor a DID") @@ -365,7 +365,7 @@ func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*id func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error { handle, err := a.AsHandle() - if nil == err { // if not an error, is a handle + if err == nil { // if not an error, is a handle handle = handle.Normalize() err = d.handleCache.Delete(ctx, redisDirPrefix+handle.String()) if err == cache.ErrCacheMiss { @@ -374,7 +374,7 @@ func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error return err } did, err := a.AsDID() - if nil == err { // if not an error, is a DID + if err == nil { // if not an error, is a DID err = d.identityCache.Delete(ctx, redisDirPrefix+did.String()) if err == cache.ErrCacheMiss { return nil From dc4176cd475c5d891f61daf48ba9f2b20fdfefd4 Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Fri, 27 Sep 2024 17:24:29 -0700 Subject: [PATCH 17/17] replace fmt.Errorf with errors.New for static strings --- atproto/identity/redisdir/redis_directory.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index c769d42c6..f1d67f256 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -2,6 +2,7 @@ package redisdir import ( "context" + "errors" "fmt" "sync" "time" @@ -160,7 +161,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) hand func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { if h.IsInvalidHandle() { - return "", fmt.Errorf("invalid handle") + return "", errors.New("invalid handle") } var entry handleEntry err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) @@ -174,7 +175,7 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy } else if entry.DID != nil { return *entry.DID, nil } else { - return "", fmt.Errorf("code flow error in redis identity directory") + return "", errors.New("code flow error in redis identity directory") } } handleCacheMisses.Inc() @@ -198,10 +199,10 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy } else if entry.DID != nil { return *entry.DID, nil } else { - return "", fmt.Errorf("code flow error in redis identity directory") + return "", errors.New("code flow error in redis identity directory") } } - return "", fmt.Errorf("identity not found in cache after coalesce returned") + return "", errors.New("identity not found in cache after coalesce returned") case <-ctx.Done(): return "", ctx.Err() } @@ -221,7 +222,7 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy if newEntry.DID != nil { return *newEntry.DID, nil } - return "", fmt.Errorf("unexpected control-flow error") + return "", errors.New("unexpected control-flow error") } func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identityEntry { @@ -302,7 +303,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax if err == nil && !d.isIdentityStale(&entry) { // if no error... return entry.Identity, false, entry.Err } - return nil, false, fmt.Errorf("identity not found in cache after coalesce returned") + return nil, false, errors.New("identity not found in cache after coalesce returned") case <-ctx.Done(): return nil, false, ctx.Err() } @@ -322,7 +323,7 @@ func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax if newEntry.Identity != nil { return newEntry.Identity, false, nil } - return nil, false, fmt.Errorf("unexpected control-flow error") + return nil, false, errors.New("unexpected control-flow error") } func (d *RedisDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*identity.Identity, error) { @@ -360,7 +361,7 @@ func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*id if err == nil { // if not an error, is a DID return d.LookupDID(ctx, did) } - return nil, fmt.Errorf("at-identifier neither a Handle nor a DID") + return nil, errors.New("at-identifier neither a Handle nor a DID") } func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error { @@ -381,5 +382,5 @@ func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error } return err } - return fmt.Errorf("at-identifier neither a Handle nor a DID") + return errors.New("at-identifier neither a Handle nor a DID") }