Skip to content

Commit

Permalink
Clean up coalescing code
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Sep 29, 2023
1 parent 0abc722 commit 0c7204d
Showing 1 changed file with 39 additions and 70 deletions.
109 changes: 39 additions & 70 deletions atproto/identity/cache_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,64 +118,50 @@ func (d *CacheDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*Ha
return &he, nil
}

func (d *CacheDirectory) coalescedResolveHandle(ctx context.Context, handle syntax.Handle) (syntax.DID, error) {
resC := make(chan struct{}, 1)
actualLookup := false
func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) {
entry, ok := d.handleCache.Get(h)
if ok && !d.IsHandleStale(&entry) {
handleCacheHits.Inc()
return entry.DID, entry.Err
}
handleCacheMisses.Inc()

val, loaded := d.handleLookupChans.LoadOrStore(handle.String(), resC)
// Coalesce multiple requests for the same Handle
res := make(chan struct{})
val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res)
if loaded {
handleRequestsCoalesced.Inc()
// Wait for the result from the original goroutine
select {
case <-val.(chan struct{}):
// The result should now be in the cache
entry, ok := d.handleCache.Get(handle)
entry, ok := d.handleCache.Get(h)
if ok && !d.IsHandleStale(&entry) {
return entry.DID, entry.Err
}
return "", fmt.Errorf("identity not found in cache after coalesce returned")
case <-ctx.Done():
return "", ctx.Err()
}
} else {
actualLookup = true
}

var did syntax.DID
var err error

// Perform actual lookup only if this goroutine is the one doing it
if actualLookup {
var entry *HandleEntry
// Update the cache entry for this Handle
entry, err = d.updateHandle(ctx, handle)
if err == nil && entry != nil {
did = entry.DID
}
// Cleanup the coalesce map and close the results channel
d.handleLookupChans.Delete(handle.String())
// Callers waiting will now get the result from the cache
close(resC)
var newEntry *HandleEntry
// Update the cache entry for this Handle
newEntry, err := d.updateHandle(ctx, h)
if err == nil && newEntry != nil {
did = newEntry.DID
}
// Cleanup the coalesce map and close the results channel
d.handleLookupChans.Delete(h.String())
// Callers waiting will now get the result from the cache
close(res)

return did, err
}

func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) {
entry, ok := d.handleCache.Get(h)
if ok && !d.IsHandleStale(&entry) {
handleCacheHits.Inc()
return entry.DID, entry.Err
}
handleCacheMisses.Inc()

did, err := d.coalescedResolveHandle(ctx, h)
if err != nil {
return "", err
}
return did, nil
}

func (d *CacheDirectory) updateDID(ctx context.Context, did syntax.DID) (*IdentityEntry, error) {
ident, err := d.Inner.LookupDID(ctx, did)
// persist the identity lookup error, instead of processing it immediately
Expand All @@ -201,14 +187,20 @@ func (d *CacheDirectory) updateDID(ctx context.Context, did syntax.DID) (*Identi
return &entry, nil
}

func (d *CacheDirectory) coalescedResolveDID(ctx context.Context, did syntax.DID) (*Identity, error) {
resC := make(chan struct{}, 1)
actualLookup := false
func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identity, error) {
entry, ok := d.identityCache.Get(did)
if ok && !d.IsIdentityStale(&entry) {
identityCacheHits.Inc()
return entry.Identity, entry.Err
}
identityCacheMisses.Inc()

val, loaded := d.didLookupChans.LoadOrStore(did.String(), resC)
// Coalesce multiple requests for the same DID
res := make(chan struct{})
val, loaded := d.didLookupChans.LoadOrStore(did.String(), res)
if loaded {
identityRequestsCoalesced.Inc()
// Wait for the result from the original goroutine
// Wait for the result from the pending request
select {
case <-val.(chan struct{}):
// The result should now be in the cache
Expand All @@ -220,45 +212,22 @@ func (d *CacheDirectory) coalescedResolveDID(ctx context.Context, did syntax.DID
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
actualLookup = true
}

var doc *Identity
var err error

// Perform actual lookup only if this goroutine is the one doing it
if actualLookup {
var entry *IdentityEntry
// Update the cache entry for this DID
entry, err = d.updateDID(ctx, did)
if err == nil && entry != nil {
doc = entry.Identity
}
// Cleanup the coalesce map and close the results channel
d.didLookupChans.Delete(did.String())
// Callers waiting will now get the result from the cache
close(resC)
// Update the Identity Entry from PLC and cache the result
newEntry, err := d.updateDID(ctx, did)
if err == nil && newEntry != nil {
doc = newEntry.Identity
}
// Cleanup the coalesce map and close the results channel
d.didLookupChans.Delete(did.String())
// Callers waiting will now get the result from the cache
close(res)

return doc, err
}

func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identity, error) {
entry, ok := d.identityCache.Get(did)
if ok && !d.IsIdentityStale(&entry) {
identityCacheHits.Inc()
return entry.Identity, entry.Err
}
identityCacheMisses.Inc()

doc, err := d.coalescedResolveDID(ctx, did)
if err != nil {
return nil, err
}
return doc, nil
}

func (d *CacheDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*Identity, error) {
did, err := d.ResolveHandle(ctx, h)
if err != nil {
Expand Down

0 comments on commit 0c7204d

Please sign in to comment.