diff --git a/atproto/identity/cache_directory.go b/atproto/identity/cache_directory.go index 55951bbf0..2df1ac5c5 100644 --- a/atproto/identity/cache_directory.go +++ b/atproto/identity/cache_directory.go @@ -7,8 +7,6 @@ import ( "time" "github.com/bluesky-social/indigo/atproto/syntax" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/hashicorp/golang-lru/v2/expirable" ) @@ -35,40 +33,10 @@ type IdentityEntry struct { Err error } -var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_handle_cache_hits", - Help: "Number of cache hits for ATProto handle lookups", -}) - -var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_handle_cache_misses", - Help: "Number of cache misses for ATProto handle lookups", -}) - -var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_identity_cache_hits", - Help: "Number of cache hits for ATProto identity lookups", -}) - -var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_identity_cache_misses", - Help: "Number of cache misses for ATProto identity lookups", -}) - -var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_identity_requests_coalesced", - Help: "Number of identity requests coalesced", -}) - -var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ - Name: "atproto_directory_handle_requests_coalesced", - Help: "Number of handle requests coalesced", -}) - var _ Directory = (*CacheDirectory)(nil) // Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration. -func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL time.Duration, invalidHandleTTL time.Duration) CacheDirectory { +func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL, invalidHandleTTL time.Duration) CacheDirectory { return CacheDirectory{ ErrTTL: errTTL, InvalidHandleTTL: invalidHandleTTL, @@ -124,6 +92,9 @@ func (d *CacheDirectory) updateHandle(ctx context.Context, h syntax.Handle) Hand } func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { + if h.IsInvalidHandle() { + return "", fmt.Errorf("invalid handle") + } entry, ok := d.handleCache.Get(h) if ok && !d.IsHandleStale(&entry) { handleCacheHits.Inc() diff --git a/atproto/identity/handle.go b/atproto/identity/handle.go index 024ac6e32..77a157f85 100644 --- a/atproto/identity/handle.go +++ b/atproto/identity/handle.go @@ -168,6 +168,10 @@ func (d *BaseDirectory) ResolveHandle(ctx context.Context, handle syntax.Handle) var dnsErr error var did syntax.DID + if handle.IsInvalidHandle() { + return "", fmt.Errorf("invalid handle") + } + if !handle.AllowedTLD() { return "", ErrHandleReservedTLD } diff --git a/atproto/identity/metrics.go b/atproto/identity/metrics.go new file mode 100644 index 000000000..d6061c9ce --- /dev/null +++ b/atproto/identity/metrics.go @@ -0,0 +1,36 @@ +package identity + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_handle_cache_hits", + Help: "Number of cache hits for ATProto handle lookups", +}) + +var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_handle_cache_misses", + Help: "Number of cache misses for ATProto handle lookups", +}) + +var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_identity_cache_hits", + Help: "Number of cache hits for ATProto identity lookups", +}) + +var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_identity_cache_misses", + Help: "Number of cache misses for ATProto identity lookups", +}) + +var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_identity_requests_coalesced", + Help: "Number of identity requests coalesced", +}) + +var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ + Name: "atproto_directory_handle_requests_coalesced", + Help: "Number of handle requests coalesced", +}) diff --git a/atproto/identity/redisdir/live_test.go b/atproto/identity/redisdir/live_test.go new file mode 100644 index 000000000..433c45b8c --- /dev/null +++ b/atproto/identity/redisdir/live_test.go @@ -0,0 +1,136 @@ +package redisdir + +import ( + "context" + "log/slog" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" + "golang.org/x/time/rate" + + "github.com/stretchr/testify/assert" +) + +var redisLocalTestURL string = "redis://localhost:6379/0" + +// NOTE: this hits the open internet! marked as skip below by default +func testDirectoryLive(t *testing.T, d identity.Directory) { + assert := assert.New(t) + ctx := context.Background() + + handle := syntax.Handle("atproto.com") + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") + pdsSuffix := "host.bsky.network" + + resp, err := d.LookupHandle(ctx, handle) + assert.NoError(err) + assert.Equal(handle, resp.Handle) + assert.Equal(did, resp.DID) + assert.True(strings.HasSuffix(resp.PDSEndpoint(), pdsSuffix)) + dh, err := resp.DeclaredHandle() + assert.NoError(err) + assert.Equal(handle, dh) + pk, err := resp.PublicKey() + assert.NoError(err) + assert.NotNil(pk) + + resp, err = d.LookupDID(ctx, did) + assert.NoError(err) + assert.Equal(handle, resp.Handle) + assert.Equal(did, resp.DID) + assert.True(strings.HasSuffix(resp.PDSEndpoint(), pdsSuffix)) + + _, err = d.LookupHandle(ctx, syntax.Handle("fake-dummy-no-resolve.atproto.com")) + assert.Error(err) + //assert.ErrorIs(err, identity.ErrHandleNotFound) + + _, err = d.LookupDID(ctx, syntax.DID("did:web:fake-dummy-no-resolve.atproto.com")) + assert.Error(err) + //assert.ErrorIs(err, identity.ErrDIDNotFound) + + _, err = d.LookupDID(ctx, syntax.DID("did:plc:fake-dummy-no-resolve.atproto.com")) + assert.Error(err) + //assert.ErrorIs(err, identity.ErrDIDNotFound) + + _, err = d.LookupHandle(ctx, syntax.HandleInvalid) + assert.Error(err) +} + +func TestRedisDirectory(t *testing.T) { + t.Skip("TODO: skipping live network test") + assert := assert.New(t) + ctx := context.Background() + inner := identity.BaseDirectory{} + d, err := NewRedisDirectory(&inner, redisLocalTestURL, time.Hour*1, time.Hour*1, time.Hour*1, 1000) + if err != nil { + t.Fatal(err) + } + + err = d.Purge(ctx, syntax.Handle("atproto.com").AtIdentifier()) + assert.NoError(err) + err = d.Purge(ctx, syntax.Handle("fake-dummy-no-resolve.atproto.com").AtIdentifier()) + assert.NoError(err) + err = d.Purge(ctx, syntax.DID("did:web:fake-dummy-no-resolve.atproto.com").AtIdentifier()) + assert.NoError(err) + err = d.Purge(ctx, syntax.DID("did:plc:fake-dummy-no-resolve.atproto.com").AtIdentifier()) + assert.NoError(err) + + for i := 0; i < 3; i = i + 1 { + testDirectoryLive(t, d) + } +} + +func TestRedisCoalesce(t *testing.T) { + t.Skip("TODO: skipping live network test") + + assert := assert.New(t) + handle := syntax.Handle("atproto.com") + did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") + + base := identity.BaseDirectory{ + PLCURL: "https://plc.directory", + HTTPClient: http.Client{ + Timeout: time.Second * 15, + }, + // Limit the number of requests we can make to the PLC to 1 per second + PLCLimiter: rate.NewLimiter(1, 1), + TryAuthoritativeDNS: true, + SkipDNSDomainSuffixes: []string{".bsky.social"}, + } + dir, err := NewRedisDirectory(&base, redisLocalTestURL, time.Hour*1, time.Hour*1, time.Hour*1, 1000) + if err != nil { + t.Fatal(err) + } + // All 60 routines launch at the same time, so they should all miss the cache initially + routines := 60 + wg := sync.WaitGroup{} + + // Cancel the context after 2 seconds, if we're coalescing correctly, we should only make 1 request + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + for i := 0; i < routines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ident, err := dir.LookupDID(ctx, did) + if err != nil { + slog.Error("Failed lookup", "error", err) + } + assert.NoError(err) + assert.Equal(handle, ident.Handle) + + ident, err = dir.LookupHandle(ctx, handle) + if err != nil { + slog.Error("Failed lookup", "error", err) + } + assert.NoError(err) + assert.Equal(did, ident.DID) + }() + } + wg.Wait() +} diff --git a/atproto/identity/redisdir/redis_directory.go b/atproto/identity/redisdir/redis_directory.go index 3ea9f354a..f1d67f256 100644 --- a/atproto/identity/redisdir/redis_directory.go +++ b/atproto/identity/redisdir/redis_directory.go @@ -2,6 +2,7 @@ package redisdir import ( "context" + "errors" "fmt" "sync" "time" @@ -20,9 +21,10 @@ var redisDirPrefix string = "dir/" // // Includes an in-process LRU cache as well (provided by the redis client library), for hot key (identities). type RedisDirectory struct { - Inner identity.Directory - ErrTTL time.Duration - HitTTL time.Duration + Inner identity.Directory + ErrTTL time.Duration + HitTTL time.Duration + InvalidHandleTTL time.Duration handleCache *cache.Cache identityCache *cache.Cache @@ -32,8 +34,9 @@ type RedisDirectory struct { type handleEntry struct { Updated time.Time - DID syntax.DID - Err error + // needs to be pointer type, because unmarshalling empty string would be an error + DID *syntax.DID + Err error } type identityEntry struct { @@ -49,7 +52,9 @@ var _ identity.Directory = (*RedisDirectory)(nil) // `redisURL` contains all the redis connection config options. // `hitTTL` and `errTTL` define how long successful and errored identity metadata should be cached (respectively). errTTL is expected to be shorted than hitTTL. // `lruSize` is the size of the in-process cache, for each of the handle and identity caches. 10000 is a reasonable default. -func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL time.Duration, lruSize int) (*RedisDirectory, error) { +// +// NOTE: Errors returned may be inconsistent with the base directory, or between calls. This is because cached errors are serialized/deserialized and that may break equality checks. +func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL, invalidHandleTTL time.Duration, lruSize int) (*RedisDirectory, error) { opt, err := redis.ParseURL(redisURL) if err != nil { return nil, err @@ -69,11 +74,12 @@ func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL LocalCache: cache.NewTinyLFU(lruSize, hitTTL), }) return &RedisDirectory{ - Inner: inner, - ErrTTL: errTTL, - HitTTL: hitTTL, - handleCache: handleCache, - identityCache: identityCache, + Inner: inner, + ErrTTL: errTTL, + HitTTL: hitTTL, + InvalidHandleTTL: invalidHandleTTL, + handleCache: handleCache, + identityCache: identityCache, }, nil } @@ -88,16 +94,19 @@ func (d *RedisDirectory) isIdentityStale(e *identityEntry) bool { if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { return true } + if e.Identity != nil && e.Identity.Handle.IsInvalidHandle() && time.Since(e.Updated) > d.InvalidHandleTTL { + return true + } return false } -func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*handleEntry, error) { +func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) handleEntry { h = h.Normalize() ident, err := d.Inner.LookupHandle(ctx, h) if err != nil { he := handleEntry{ Updated: time.Now(), - DID: "", + DID: nil, Err: err, } err = d.handleCache.Set(&cache.Item{ @@ -107,9 +116,11 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*ha TTL: d.ErrTTL, }) if err != nil { - return nil, err + he.DID = nil + he.Err = fmt.Errorf("identity cache write: %w", err) + return he } - return &he, nil + return he } entry := identityEntry{ @@ -119,7 +130,7 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*ha } he := handleEntry{ Updated: time.Now(), - DID: ident.DID, + DID: &ident.DID, Err: nil, } @@ -130,7 +141,9 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*ha TTL: d.HitTTL, }) if err != nil { - return nil, err + he.DID = nil + he.Err = fmt.Errorf("identity cache write: %w", err) + return he } err = d.handleCache.Set(&cache.Item{ Ctx: ctx, @@ -139,20 +152,31 @@ func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*ha TTL: d.HitTTL, }) if err != nil { - return nil, err + he.DID = nil + he.Err = fmt.Errorf("identity cache write: %w", err) + return he } - return &he, nil + return he } func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { + if h.IsInvalidHandle() { + return "", errors.New("invalid handle") + } var entry handleEntry err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return "", err + return "", fmt.Errorf("identity cache read: %w", err) } - if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { + if err == nil && !d.isHandleStale(&entry) { // if no error... handleCacheHits.Inc() - return entry.DID, entry.Err + if entry.Err != nil { + return "", entry.Err + } else if entry.DID != nil { + return *entry.DID, nil + } else { + return "", errors.New("code flow error in redis identity directory") + } } handleCacheMisses.Inc() @@ -167,32 +191,41 @@ func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (sy // The result should now be in the cache err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), entry) if err != nil && err != cache.ErrCacheMiss { - return "", err + return "", fmt.Errorf("identity cache read: %w", err) } - if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { - return entry.DID, entry.Err + if err == nil && !d.isHandleStale(&entry) { // if no error... + if entry.Err != nil { + return "", entry.Err + } else if entry.DID != nil { + return *entry.DID, nil + } else { + return "", errors.New("code flow error in redis identity directory") + } } - return "", fmt.Errorf("identity not found in cache after coalesce returned") + return "", errors.New("identity not found in cache after coalesce returned") case <-ctx.Done(): return "", ctx.Err() } } - var did syntax.DID // Update the Handle Entry from PLC and cache the result - newEntry, err := d.updateHandle(ctx, h) - if err == nil && newEntry != nil { - did = newEntry.DID - } + newEntry := d.updateHandle(ctx, h) + // Cleanup the coalesce map and close the results channel d.handleLookupChans.Delete(h.String()) // Callers waiting will now get the result from the cache close(res) - return did, err + if newEntry.Err != nil { + return "", newEntry.Err + } + if newEntry.DID != nil { + return *newEntry.DID, nil + } + return "", errors.New("unexpected control-flow error") } -func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identityEntry, error) { +func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) identityEntry { ident, err := d.Inner.LookupDID(ctx, did) // persist the identity lookup error, instead of processing it immediately entry := identityEntry{ @@ -202,10 +235,10 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identi } var he *handleEntry // if *not* an error, then also update the handle cache - if nil == err && !ident.Handle.IsInvalidHandle() { + if err == nil && !ident.Handle.IsInvalidHandle() { he = &handleEntry{ Updated: time.Now(), - DID: did, + DID: &did, Err: nil, } } @@ -217,7 +250,9 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identi TTL: d.HitTTL, }) if err != nil { - return nil, err + entry.Identity = nil + entry.Err = fmt.Errorf("identity cache write: %v", err) + return entry } if he != nil { err = d.handleCache.Set(&cache.Item{ @@ -227,21 +262,28 @@ func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identi TTL: d.HitTTL, }) if err != nil { - return nil, err + entry.Identity = nil + entry.Err = fmt.Errorf("identity cache write: %v", err) + return entry } } - return &entry, nil + return entry } func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { + id, _, err := d.LookupDIDWithCacheState(ctx, did) + return id, err +} + +func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*identity.Identity, bool, error) { var entry identityEntry err := d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return nil, err + return nil, false, fmt.Errorf("identity cache read: %v", err) } - if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { + if err == nil && !d.isIdentityStale(&entry) { // if no error... identityCacheHits.Inc() - return entry.Identity, entry.Err + return entry.Identity, true, entry.Err } identityCacheMisses.Inc() @@ -256,72 +298,89 @@ func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identi // The result should now be in the cache err = d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) if err != nil && err != cache.ErrCacheMiss { - return nil, err + return nil, false, fmt.Errorf("identity cache read: %v", err) } - if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { - return entry.Identity, entry.Err + if err == nil && !d.isIdentityStale(&entry) { // if no error... + return entry.Identity, false, entry.Err } - return nil, fmt.Errorf("identity not found in cache after coalesce returned") + return nil, false, errors.New("identity not found in cache after coalesce returned") case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() } } - var doc *identity.Identity // Update the Identity Entry from PLC and cache the result - newEntry, err := d.updateDID(ctx, did) - if err == nil && newEntry != nil { - doc = newEntry.Identity - } + newEntry := d.updateDID(ctx, did) + // Cleanup the coalesce map and close the results channel d.didLookupChans.Delete(did.String()) // Callers waiting will now get the result from the cache close(res) - return doc, err + if newEntry.Err != nil { + return nil, false, newEntry.Err + } + if newEntry.Identity != nil { + return newEntry.Identity, false, nil + } + return nil, false, errors.New("unexpected control-flow error") } func (d *RedisDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*identity.Identity, error) { + ident, _, err := d.LookupHandleWithCacheState(ctx, h) + return ident, err +} + +func (d *RedisDirectory) LookupHandleWithCacheState(ctx context.Context, h syntax.Handle) (*identity.Identity, bool, error) { + h = h.Normalize() did, err := d.ResolveHandle(ctx, h) if err != nil { - return nil, err + return nil, false, err } - ident, err := d.LookupDID(ctx, did) + ident, hit, err := d.LookupDIDWithCacheState(ctx, did) if err != nil { - return nil, err + return nil, hit, err } declared, err := ident.DeclaredHandle() if err != nil { - return nil, err + return nil, hit, err } if declared != h { - return nil, fmt.Errorf("handle does not match that declared in DID document") + return nil, hit, identity.ErrHandleMismatch } - return ident, nil + return ident, hit, nil } func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) { handle, err := a.AsHandle() - if nil == err { // if not an error, is a handle + if err == nil { // if not an error, is a handle return d.LookupHandle(ctx, handle) } did, err := a.AsDID() - if nil == err { // if not an error, is a DID + if err == nil { // if not an error, is a DID return d.LookupDID(ctx, did) } - return nil, fmt.Errorf("at-identifier neither a Handle nor a DID") + return nil, errors.New("at-identifier neither a Handle nor a DID") } func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error { handle, err := a.AsHandle() - if nil == err { // if not an error, is a handle + if err == nil { // if not an error, is a handle handle = handle.Normalize() - return d.handleCache.Delete(ctx, handle.String()) + err = d.handleCache.Delete(ctx, redisDirPrefix+handle.String()) + if err == cache.ErrCacheMiss { + return nil + } + return err } did, err := a.AsDID() - if nil == err { // if not an error, is a DID - return d.identityCache.Delete(ctx, did.String()) + if err == nil { // if not an error, is a DID + err = d.identityCache.Delete(ctx, redisDirPrefix+did.String()) + if err == cache.ErrCacheMiss { + return nil + } + return err } - return fmt.Errorf("at-identifier neither a Handle nor a DID") + return errors.New("at-identifier neither a Handle nor a DID") } diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index 6cc0bcb82..dbef6c488 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -172,7 +172,7 @@ func configDirectory(cctx *cli.Context) (identity.Directory, error) { } var dir identity.Directory if cctx.String("redis-url") != "" { - rdir, err := redisdir.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2, 10_000) + rdir, err := redisdir.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2, time.Minute*5, 10_000) if err != nil { return nil, err }