Skip to content

Commit

Permalink
add memcached for did cache to relay
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 21, 2024
1 parent d619a37 commit aff0005
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 9 deletions.
37 changes: 28 additions & 9 deletions cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func run(args []string) error {
EnvVars: []string{"RELAY_DID_CACHE_SIZE"},
Value: 5_000_000,
},
&cli.StringSliceFlag{
Name: "did-memcached",
EnvVars: []string{"RELAY_DID_MEMCACHED"},
},
&cli.DurationFlag{
Name: "event-playback-ttl",
Usage: "time to live for event playback buffering (only applies to disk persister)",
Expand Down Expand Up @@ -339,18 +343,33 @@ func runBigsky(cctx *cli.Context) error {
return err
}

mr := did.NewMultiResolver()
// DID RESOLUTION
// 1. the outside world, PLCSerever or Web
// 2. (maybe memcached)
// 3. in-process cache
var cachedidr did.Resolver
{
mr := did.NewMultiResolver()

didr := &api.PLCServer{Host: cctx.String("plc-host")}
mr.AddHandler("plc", didr)
didr := &api.PLCServer{Host: cctx.String("plc-host")}
mr.AddHandler("plc", didr)

webr := did.WebResolver{}
if cctx.Bool("crawl-insecure-ws") {
webr.Insecure = true
}
mr.AddHandler("web", &webr)
webr := did.WebResolver{}
if cctx.Bool("crawl-insecure-ws") {
webr.Insecure = true
}
mr.AddHandler("web", &webr)

cachedidr := plc.NewCachingDidResolver(mr, time.Hour*24, cctx.Int("did-cache-size"))
var prevResolver did.Resolver
memcachedServers := cctx.StringSlice("did-memcached")
if len(memcachedServers) > 0 {
prevResolver = plc.NewMemcachedDidResolver(mr, time.Hour*24, memcachedServers)
} else {
prevResolver = mr
}

cachedidr = plc.NewCachingDidResolver(prevResolver, time.Hour*24, cctx.Int("did-cache-size"))
}

kmgr := indexer.NewKeyManager(cachedidr, nil)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b
github.com/adrg/xdg v0.5.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874
github.com/brianvoe/gofakeit/v6 v6.25.0
github.com/carlmjohnson/versioninfo v0.22.5
github.com/cockroachdb/pebble v1.1.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous=
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
github.com/brianvoe/gofakeit/v6 v6.25.0 h1:ZpFjktOpLZUeF8q223o0rUuXtA+m5qW5srjvVi+JkXk=
github.com/brianvoe/gofakeit/v6 v6.25.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand Down
90 changes: 90 additions & 0 deletions plc/memcached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package plc

import (
"context"
"encoding/json"
"github.com/bradfitz/gomemcache/memcache"
"go.opentelemetry.io/otel/attribute"
"time"

"github.com/bluesky-social/indigo/did"
"go.opentelemetry.io/otel"
)

type MemcachedDidResolver struct {
mcd *memcache.Client
res did.Resolver
maxAge int32
}

func NewMemcachedDidResolver(res did.Resolver, maxAge time.Duration, servers []string) *MemcachedDidResolver {
expiry := int32(0)
if maxAge.Seconds() > (30 * 24 * 60 * 60) {
// clamp expiry at 30 days minus a minute for memcached
expiry = (30 * 24 * 60 * 60) - 60
} else {
expiry = int32(maxAge.Seconds())
}
client := memcache.New(servers...)
return &MemcachedDidResolver{
mcd: client,
res: res,
maxAge: expiry,
}
}

func (r *MemcachedDidResolver) FlushCacheFor(didstr string) {
r.mcd.Delete(didstr)
r.res.FlushCacheFor(didstr)
}

func (r *MemcachedDidResolver) tryCache(didstr string) (*did.Document, bool) {
ob, err := r.mcd.Get(didstr)
if (ob == nil) || (err != nil) {
return nil, false
}
var doc did.Document
err = json.Unmarshal(ob.Value, &doc)
if err != nil {
// TODO: log error?
return nil, false
}

return &doc, true
}

func (r *MemcachedDidResolver) putCache(did string, doc *did.Document) {
blob, err := json.Marshal(doc)
if err != nil {
// TODO: log error
return
}
item := memcache.Item{
Key: did,
Value: blob,
Expiration: int32(r.maxAge),
}
r.mcd.Set(&item)
}

func (r *MemcachedDidResolver) GetDocument(ctx context.Context, didstr string) (*did.Document, error) {
ctx, span := otel.Tracer("cacheResolver").Start(ctx, "getDocument")
defer span.End()

doc, ok := r.tryCache(didstr)
if ok {
span.SetAttributes(attribute.Bool("cache", true))
cacheHitsTotal.Inc()
return doc, nil
}
cacheMissesTotal.Inc()
span.SetAttributes(attribute.Bool("cache", false))

doc, err := r.res.GetDocument(ctx, didstr)
if err != nil {
return nil, err
}

r.putCache(didstr, doc)
return doc, nil
}

0 comments on commit aff0005

Please sign in to comment.