Skip to content

Commit

Permalink
identity/redisdir: various fixes (#534)
Browse files Browse the repository at this point in the history
A bunch of cleanups and bug fixes.

The biggest fix ports a bug fix from the cache directory, for a bug that
resulted in "nil/nil" responses. That bug was fixed in
#479, but the fix was not copied to the
redis directory implementation, which has the same coalescing code. This
recently impacted @whyrusleeping's use in discover. This is a
backwards-compatible fix.

Another fix handles cached error messages. An empty string was being
stored for the DID. Because redis cache serializes, and the
de-serialized DID is re-parsed, the empty strings fail to parse,
resulting in an error. We could consider having text parsing of
DIDs not actually parse the string, but that would break auto-validation
where it is expected. The fix was to make that field nullable. This might
make the change not backwards-compatible with existing cached metadata,
but it only impacts cached errors, which have a shorter TTL, so it might not
be that big of a deal in practice.

Other smaller cleanups and fixes, which rolled in here to make the code
align better with the in-memory caching directory, so that merge/review
is easier:

- adds invalid handle TTL to redisdir
- adds "WithCacheState" variants of methods, for metrics/perf
measurement
- fix deletion/purging
- special-case handling of resolving invalid handle

One remaining issue is that the cached error types come back as generic
errors (with error kind/type stripped). It would be nice if that wasn't
the case, so that code could detect specific types of errors?
Considering doing a manual enum/integer hack to encode the type and
convert the errors to the actual type on read, for a small set of known
identity errors.
  • Loading branch information
whyrusleeping authored Oct 7, 2024
2 parents 2e5d3d7 + dc4176c commit 160af4a
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 100 deletions.
37 changes: 4 additions & 33 deletions atproto/identity/cache_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"time"

"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/hashicorp/golang-lru/v2/expirable"
)
Expand All @@ -35,40 +33,10 @@ type IdentityEntry struct {
Err error
}

// handleCacheHits counts cache hits for ATProto handle lookups.
var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{
Name: "atproto_directory_handle_cache_hits",
Help: "Number of cache hits for ATProto handle lookups",
})

// handleCacheMisses counts cache misses for ATProto handle lookups.
var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
Name: "atproto_directory_handle_cache_misses",
Help: "Number of cache misses for ATProto handle lookups",
})

// identityCacheHits counts cache hits for ATProto identity lookups.
var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{
Name: "atproto_directory_identity_cache_hits",
Help: "Number of cache hits for ATProto identity lookups",
})

// identityCacheMisses counts cache misses for ATProto identity lookups.
var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
Name: "atproto_directory_identity_cache_misses",
Help: "Number of cache misses for ATProto identity lookups",
})

// identityRequestsCoalesced counts identity requests that were coalesced.
var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{
Name: "atproto_directory_identity_requests_coalesced",
Help: "Number of identity requests coalesced",
})

// handleRequestsCoalesced counts handle requests that were coalesced.
var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{
Name: "atproto_directory_handle_requests_coalesced",
Help: "Number of handle requests coalesced",
})

var _ Directory = (*CacheDirectory)(nil)

// Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration.
func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL time.Duration, invalidHandleTTL time.Duration) CacheDirectory {
func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL, invalidHandleTTL time.Duration) CacheDirectory {
return CacheDirectory{
ErrTTL: errTTL,
InvalidHandleTTL: invalidHandleTTL,
Expand Down Expand Up @@ -124,6 +92,9 @@ func (d *CacheDirectory) updateHandle(ctx context.Context, h syntax.Handle) Hand
}

func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) {
if h.IsInvalidHandle() {
return "", fmt.Errorf("invalid handle")
}
entry, ok := d.handleCache.Get(h)
if ok && !d.IsHandleStale(&entry) {
handleCacheHits.Inc()
Expand Down
4 changes: 4 additions & 0 deletions atproto/identity/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (d *BaseDirectory) ResolveHandle(ctx context.Context, handle syntax.Handle)
var dnsErr error
var did syntax.DID

if handle.IsInvalidHandle() {
return "", fmt.Errorf("invalid handle")
}

if !handle.AllowedTLD() {
return "", ErrHandleReservedTLD
}
Expand Down
36 changes: 36 additions & 0 deletions atproto/identity/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package identity

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Prometheus counters shared by the identity directory implementations.
// Each counter is created with promauto.NewCounter at package initialization.
var (
	// handleCacheHits counts cache hits for ATProto handle lookups.
	handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_handle_cache_hits",
		Help: "Number of cache hits for ATProto handle lookups",
	})

	// handleCacheMisses counts cache misses for ATProto handle lookups.
	handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_handle_cache_misses",
		Help: "Number of cache misses for ATProto handle lookups",
	})

	// identityCacheHits counts cache hits for ATProto identity lookups.
	identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_identity_cache_hits",
		Help: "Number of cache hits for ATProto identity lookups",
	})

	// identityCacheMisses counts cache misses for ATProto identity lookups.
	identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_identity_cache_misses",
		Help: "Number of cache misses for ATProto identity lookups",
	})

	// identityRequestsCoalesced counts identity requests that were coalesced.
	identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_identity_requests_coalesced",
		Help: "Number of identity requests coalesced",
	})

	// handleRequestsCoalesced counts handle requests that were coalesced.
	handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{
		Name: "atproto_directory_handle_requests_coalesced",
		Help: "Number of handle requests coalesced",
	})
)
136 changes: 136 additions & 0 deletions atproto/identity/redisdir/live_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package redisdir

import (
"context"
"log/slog"
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"golang.org/x/time/rate"

"github.com/stretchr/testify/assert"
)

// redisLocalTestURL is the Redis connection URL handed to NewRedisDirectory
// by the tests in this file.
var redisLocalTestURL = "redis://localhost:6379/0"

// testDirectoryLive runs a fixed set of lookups against the given directory
// and asserts on the results: a known handle and DID must resolve to each
// other with a PDS endpoint ending in the expected suffix, and several
// non-resolvable or invalid identifiers must return an error.
//
// NOTE: this hits the open internet! marked as skip below by default
//
// Field and method accesses on a lookup result are only performed when the
// lookup returned no error and a non-nil result, so a failed lookup is
// reported as a test failure instead of a nil-pointer panic. All later,
// independent checks still run after such a failure.
func testDirectoryLive(t *testing.T, d identity.Directory) {
	assert := assert.New(t)
	ctx := context.Background()

	handle := syntax.Handle("atproto.com")
	did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz")
	pdsSuffix := "host.bsky.network"

	resp, err := d.LookupHandle(ctx, handle)
	// Guard the dereferences: resp is unusable when the lookup failed.
	if assert.NoError(err) && assert.NotNil(resp) {
		assert.Equal(handle, resp.Handle)
		assert.Equal(did, resp.DID)
		assert.True(strings.HasSuffix(resp.PDSEndpoint(), pdsSuffix))
		dh, dhErr := resp.DeclaredHandle()
		assert.NoError(dhErr)
		assert.Equal(handle, dh)
		pk, pkErr := resp.PublicKey()
		assert.NoError(pkErr)
		assert.NotNil(pk)
	}

	resp, err = d.LookupDID(ctx, did)
	if assert.NoError(err) && assert.NotNil(resp) {
		assert.Equal(handle, resp.Handle)
		assert.Equal(did, resp.DID)
		assert.True(strings.HasSuffix(resp.PDSEndpoint(), pdsSuffix))
	}

	// NOTE(review): the ErrorIs checks below are disabled; presumably because
	// errors served from a cache come back as generic errors — confirm.
	_, err = d.LookupHandle(ctx, syntax.Handle("fake-dummy-no-resolve.atproto.com"))
	assert.Error(err)
	//assert.ErrorIs(err, identity.ErrHandleNotFound)

	_, err = d.LookupDID(ctx, syntax.DID("did:web:fake-dummy-no-resolve.atproto.com"))
	assert.Error(err)
	//assert.ErrorIs(err, identity.ErrDIDNotFound)

	_, err = d.LookupDID(ctx, syntax.DID("did:plc:fake-dummy-no-resolve.atproto.com"))
	assert.Error(err)
	//assert.ErrorIs(err, identity.ErrDIDNotFound)

	// Looking up the invalid-handle sentinel must fail rather than resolve.
	_, err = d.LookupHandle(ctx, syntax.HandleInvalid)
	assert.Error(err)
}

// TestRedisDirectory builds a redis-backed directory around a zero-value
// BaseDirectory, purges the identifiers used by the live checks, and then
// runs testDirectoryLive three times against the same directory.
//
// The test is skipped unconditionally because it needs network access.
func TestRedisDirectory(t *testing.T) {
	t.Skip("TODO: skipping live network test")

	check := assert.New(t)
	bg := context.Background()

	base := identity.BaseDirectory{}
	dir, err := NewRedisDirectory(&base, redisLocalTestURL, time.Hour, time.Hour, time.Hour, 1000)
	if err != nil {
		t.Fatal(err)
	}

	// Purge every identifier the live checks touch before running them.
	check.NoError(dir.Purge(bg, syntax.Handle("atproto.com").AtIdentifier()))
	check.NoError(dir.Purge(bg, syntax.Handle("fake-dummy-no-resolve.atproto.com").AtIdentifier()))
	check.NoError(dir.Purge(bg, syntax.DID("did:web:fake-dummy-no-resolve.atproto.com").AtIdentifier()))
	check.NoError(dir.Purge(bg, syntax.DID("did:plc:fake-dummy-no-resolve.atproto.com").AtIdentifier()))

	// Repeat the checks three times against the same directory instance.
	for round := 0; round < 3; round++ {
		testDirectoryLive(t, dir)
	}
}

// TestRedisCoalesce launches many concurrent lookups for the same DID and
// handle through a redis-backed directory whose PLC limiter allows one
// request per second, under a two-second context deadline. Every goroutine
// is expected to succeed, which relies on concurrent requests being
// coalesced.
//
// The test is skipped unconditionally because it needs network access.
//
// Results are dereferenced only when the lookup returned no error and a
// non-nil identity. A nil-pointer panic inside one of the goroutines would
// otherwise take down the whole test binary instead of reporting a failure.
func TestRedisCoalesce(t *testing.T) {
	t.Skip("TODO: skipping live network test")

	assert := assert.New(t)
	handle := syntax.Handle("atproto.com")
	did := syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz")

	base := identity.BaseDirectory{
		PLCURL: "https://plc.directory",
		HTTPClient: http.Client{
			Timeout: time.Second * 15,
		},
		// Limit the number of requests we can make to the PLC to 1 per second
		PLCLimiter:            rate.NewLimiter(1, 1),
		TryAuthoritativeDNS:   true,
		SkipDNSDomainSuffixes: []string{".bsky.social"},
	}
	dir, err := NewRedisDirectory(&base, redisLocalTestURL, time.Hour*1, time.Hour*1, time.Hour*1, 1000)
	if err != nil {
		t.Fatal(err)
	}
	// All 60 routines launch at the same time, so they should all miss the cache initially
	routines := 60
	wg := sync.WaitGroup{}

	// Cancel the context after 2 seconds, if we're coalescing correctly, we should only make 1 request
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()
	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ident, err := dir.LookupDID(ctx, did)
			if err != nil {
				slog.Error("Failed lookup", "error", err)
			}
			// Non-fatal asserts only: FailNow-style helpers must not be
			// called from goroutines other than the test goroutine.
			if assert.NoError(err) && assert.NotNil(ident) {
				assert.Equal(handle, ident.Handle)
			}

			ident, err = dir.LookupHandle(ctx, handle)
			if err != nil {
				slog.Error("Failed lookup", "error", err)
			}
			if assert.NoError(err) && assert.NotNil(ident) {
				assert.Equal(did, ident.DID)
			}
		}()
	}
	wg.Wait()
}
Loading

0 comments on commit 160af4a

Please sign in to comment.