Skip to content

Commit

Permalink
Update cache (#63)
Browse files Browse the repository at this point in the history
* update logging; set cache size limit

* add size limit

* testing

* update swagger
  • Loading branch information
decentralgabe authored Dec 5, 2023
1 parent 2808dc4 commit d57574d
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 49 deletions.
10 changes: 6 additions & 4 deletions impl/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ type DHTServiceConfig struct {
}

type PKARRServiceConfig struct {
RepublishCRON string `toml:"republish_cron"`
CacheTTLSeconds int `toml:"cache_ttl_seconds"`
RepublishCRON string `toml:"republish_cron"`
CacheTTLSeconds int `toml:"cache_ttl_seconds"`
CacheSizeLimitMB int `toml:"cache_size_limit_mb"`
}

func GetDefaultConfig() Config {
Expand All @@ -75,8 +76,9 @@ func GetDefaultConfig() Config {
BootstrapPeers: GetDefaultBootstrapPeers(),
},
PkarrConfig: PKARRServiceConfig{
RepublishCRON: "0 */2 * * *",
CacheTTLSeconds: 600,
RepublishCRON: "0 */2 * * *",
CacheTTLSeconds: 600,
CacheSizeLimitMB: 2000,
},
}
}
Expand Down
1 change: 1 addition & 0 deletions impl/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht.
[pkarr]
republish_cron = "0 */2 * * *" # every 2 hours
cache_ttl_seconds = 600 # 10 minutes
cache_size_limit_mb = 2000 # 2GB
2 changes: 1 addition & 1 deletion impl/docs/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions impl/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ paths:
get:
consumes:
- application/octet-stream
description: GetRecord a PKARR record from the DHT
description: GetRecord a Pkarr record from the DHT
parameters:
- description: ID to get
in: path
Expand Down Expand Up @@ -50,13 +50,13 @@ paths:
description: Internal server error
schema:
type: string
summary: GetRecord a PKARR record from the DHT
summary: GetRecord a Pkarr record from the DHT
tags:
- PKARR
- Pkarr
put:
consumes:
- application/octet-stream
description: PutRecord a PKARR record into the DHT
description: PutRecord a Pkarr record into the DHT
parameters:
- description: ID of the record to put
in: path
Expand All @@ -82,9 +82,9 @@ paths:
description: Internal server error
schema:
type: string
summary: PutRecord a PKARR record into the DHT
summary: PutRecord a Pkarr record into the DHT
tags:
- PKARR
- Pkarr
/health:
get:
consumes:
Expand Down
5 changes: 5 additions & 0 deletions impl/internal/did/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package did

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -21,6 +22,7 @@ func TestClient(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, packet)

start := time.Now()
bep44Put, err := dht.CreatePKARRPublishRequest(sk, *packet)
assert.NoError(t, err)
assert.NotEmpty(t, bep44Put)
Expand All @@ -31,4 +33,7 @@ func TestClient(t *testing.T) {
gotDID, _, err := client.GetDIDDocument(doc.ID)
assert.NoError(t, err)
assert.EqualValues(t, doc, gotDID)

since := time.Since(start)
t.Logf("time to put and get: %s", since)
}
4 changes: 2 additions & 2 deletions impl/pkg/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (d *DHT) Get(ctx context.Context, key string) (*getput.GetResult, error) {
}
res, t, err := getput.Get(ctx, infohash.HashBytes(z32Decoded), d.Server, nil, nil)
if err != nil {
return nil, errutil.LoggingNewErrorf("failed to get key<%s> from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses)
return nil, errutil.LoggingNewErrorf("failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses)
}
return &res, nil
}
Expand All @@ -64,7 +64,7 @@ func (d *DHT) GetFull(ctx context.Context, key string) (*dhtint.FullGetResult, e
}
res, t, err := dhtint.Get(ctx, infohash.HashBytes(z32Decoded), d.Server, nil, nil)
if err != nil {
return nil, errutil.LoggingNewErrorf("failed to get key<%s> from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses)
return nil, errutil.LoggingNewErrorf("failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses)
}
return &res, nil
}
26 changes: 13 additions & 13 deletions impl/pkg/server/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@ import (
"github.com/TBD54566975/did-dht-method/pkg/service"
)

// PKARRRouter is the router for the PKARR API
type PKARRRouter struct {
// PkarrRouter is the router for the Pkarr API
type PkarrRouter struct {
service *service.PkarrService
}

// NewPKARRRouter returns a new instance of the Relay router
func NewPKARRRouter(service *service.PkarrService) (*PKARRRouter, error) {
return &PKARRRouter{service: service}, nil
// NewPkarrRouter returns a new instance of the Relay router
func NewPkarrRouter(service *service.PkarrService) (*PkarrRouter, error) {
return &PkarrRouter{service: service}, nil
}

// GetRecord godoc
//
// @Summary GetRecord a PKARR record from the DHT
// @Description GetRecord a PKARR record from the DHT
// @Tags PKARR
// @Summary GetRecord a Pkarr record from the DHT
// @Description GetRecord a Pkarr record from the DHT
// @Tags Pkarr
// @Accept octet-stream
// @Produce octet-stream
// @Param id path string true "ID to get"
Expand All @@ -35,7 +35,7 @@ func NewPKARRRouter(service *service.PkarrService) (*PKARRRouter, error) {
// @Failure 404 {string} string "Not found"
// @Failure 500 {string} string "Internal server error"
// @Router /{id} [get]
func (r *PKARRRouter) GetRecord(c *gin.Context) {
func (r *PkarrRouter) GetRecord(c *gin.Context) {
id := GetParam(c, IDParam)
if id == nil || *id == "" {
LoggingRespondErrMsg(c, "missing id param", http.StatusBadRequest)
Expand Down Expand Up @@ -63,17 +63,17 @@ func (r *PKARRRouter) GetRecord(c *gin.Context) {

// PutRecord godoc
//
// @Summary PutRecord a PKARR record into the DHT
// @Description PutRecord a PKARR record into the DHT
// @Tags PKARR
// @Summary PutRecord a Pkarr record into the DHT
// @Description PutRecord a Pkarr record into the DHT
// @Tags Pkarr
// @Accept octet-stream
// @Param id path string true "ID of the record to put"
// @Param request body []byte true "64 bytes sig, 8 bytes u64 big-endian seq, 0-1000 bytes of v."
// @Success 200
// @Failure 400 {string} string "Bad request"
// @Failure 500 {string} string "Internal server error"
// @Router /{id} [put]
func (r *PKARRRouter) PutRecord(c *gin.Context) {
func (r *PkarrRouter) PutRecord(c *gin.Context) {
id := GetParam(c, IDParam)
if id == nil || *id == "" {
LoggingRespondErrMsg(c, "missing id param", http.StatusBadRequest)
Expand Down
4 changes: 2 additions & 2 deletions impl/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func setupLogger(level string) {

logLevel, err := logrus.ParseLevel(level)
if err != nil {
logrus.WithError(err).Errorf("could not parse log level<%s>, setting to info", level)
logrus.WithError(err).Errorf("could not parse log level[%s], setting to info", level)
logrus.SetLevel(logrus.InfoLevel)
} else {
logrus.SetLevel(logLevel)
Expand Down Expand Up @@ -114,7 +114,7 @@ func setupHandler(env config.Environment) *gin.Engine {

// PkarrAPI sets up the relay API routes according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md
func PkarrAPI(rg *gin.RouterGroup, service *service.PkarrService) error {
relayRouter, err := NewPKARRRouter(service)
relayRouter, err := NewPkarrRouter(service)
if err != nil {
return util.LoggingErrorMsg(err, "could not instantiate relay router")
}
Expand Down
2 changes: 1 addition & 1 deletion impl/pkg/server/server_pkarr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

func TestPKARRRouter(t *testing.T) {
pkarrSvc := testPKARRService(t)
pkarrRouter, err := NewPKARRRouter(&pkarrSvc)
pkarrRouter, err := NewPkarrRouter(&pkarrSvc)
require.NoError(t, err)
require.NotEmpty(t, pkarrRouter)

Expand Down
21 changes: 11 additions & 10 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func NewPkarrService(cfg *config.Config, db *storage.Storage) (*PkarrService, er
}

// create and start cache and scheduler
ttl := time.Duration(cfg.PkarrConfig.CacheTTLSeconds) * time.Second
// TODO(gabe): consider setting size limits on the cache
cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(ttl))
cacheTTL := time.Duration(cfg.PkarrConfig.CacheTTLSeconds) * time.Second
cacheConfig := bigcache.DefaultConfig(cacheTTL)
cacheConfig.HardMaxCacheSize = cfg.PkarrConfig.CacheSizeLimitMB
cache, err := bigcache.New(context.Background(), cacheConfig)
if err != nil {
return nil, util.LoggingErrorMsg(err, "failed to instantiate cache")
}
Expand Down Expand Up @@ -104,7 +105,6 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request Publ
}

// write to db and cache
// TODO(gabe): if putting to the DHT fails we should note that in the db and retry later
record := request.toRecord()
if err := s.db.WriteRecord(record); err != nil {
return err
Expand All @@ -122,6 +122,7 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request Publ
}

// return here and put it in the DHT asynchronously
// TODO(gabe): consider a background process to monitor failures
go s.dht.Put(ctx, bep44.Put{
V: request.V,
K: &request.K,
Expand Down Expand Up @@ -173,13 +174,13 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrRespon
if err != nil {
// try to resolve from storage before returning and error
// if we detect this and have the record we should republish to the DHT
logrus.WithError(err).Warnf("failed to get pkarr<%s> from dht, attempting to resolve from storage", id)
logrus.WithError(err).Warnf("failed to get pkarr record[%s] from dht, attempting to resolve from storage", id)
record, err := s.db.ReadRecord(id)
if err != nil || record == nil {
logrus.WithError(err).Errorf("failed to resolve pkarr<%s> from storage", id)
logrus.WithError(err).Errorf("failed to resolve pkarr record[%s] from storage", id)
return nil, err
}
logrus.Debugf("resolved pkarr<%s> from storage", id)
logrus.Debugf("resolved pkarr record[%s] from storage", id)
return fromPkarrRecord(*record)
}

Expand All @@ -201,10 +202,10 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrRespon
// add the record to cache, do it here to avoid duplicate calculations
recordBytes, err := json.Marshal(resp)
if err != nil {
return nil, util.LoggingErrorMsgf(err, "failed to marshal pkarr record<%s> for cache", id)
return nil, util.LoggingErrorMsgf(err, "failed to marshal pkarr record[%s] for cache", id)
}
if err = s.cache.Set(id, recordBytes); err != nil {
return nil, util.LoggingErrorMsgf(err, "failed to set pkarr record<%s> in cache", id)
return nil, util.LoggingErrorMsgf(err, "failed to set pkarr record[%s] in cache", id)
}

return &resp, nil
Expand All @@ -221,7 +222,7 @@ func (s *PkarrService) republish() {
logrus.Info("No records to republish")
return
}
logrus.Infof("Republishing %d record(s)", len(allRecords))
logrus.Infof("Republishing [%d] record(s)", len(allRecords))
errCnt := 0
for _, record := range allRecords {
put, err := recordToBEP44Put(record)
Expand Down
16 changes: 8 additions & 8 deletions impl/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Storage) Read(namespace, key string) ([]byte, error) {
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
logrus.Infof("namespace<%s> does not exist", namespace)
logrus.Infof("namespace[%s] does not exist", namespace)
return nil
}
result = bucket.Get([]byte(key))
Expand All @@ -97,7 +97,7 @@ func (s *Storage) ReadPrefix(namespace, prefix string) (map[string][]byte, error
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
errMsg := fmt.Sprintf("namespace<%s> does not exist", namespace)
errMsg := fmt.Sprintf("namespace[%s] does not exist", namespace)
logrus.Error(errMsg)
return errors.New(errMsg)
}
Expand All @@ -115,7 +115,7 @@ func (s *Storage) ReadAll(namespace string) (map[string][]byte, error) {
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
logrus.Warnf("namespace<%s> does not exist", namespace)
logrus.Warnf("namespace[%s] does not exist", namespace)
return nil
}
cursor := bucket.Cursor()
Expand All @@ -132,7 +132,7 @@ func (s *Storage) ReadAllKeys(namespace string) ([]string, error) {
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
logrus.Warnf("namespace<%s> does not exist", namespace)
logrus.Warnf("namespace[%s] does not exist", namespace)
return nil
}
cursor := bucket.Cursor()
Expand All @@ -148,10 +148,10 @@ func (s *Storage) Update(namespace string, key string, value []byte) error {
return s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
return fmt.Errorf("namespace<%s> does not exist", namespace)
return fmt.Errorf("namespace[%s] does not exist", namespace)
}
if bucket.Get([]byte(key)) == nil {
return fmt.Errorf("key<%s> does not exist", key)
return fmt.Errorf("key[%s] does not exist", key)
}
if err := bucket.Put([]byte(key), value); err != nil {
return err
Expand All @@ -164,7 +164,7 @@ func (s *Storage) Delete(namespace, key string) error {
return s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(namespace))
if bucket == nil {
return fmt.Errorf("namespace<%s> does not exist", namespace)
return fmt.Errorf("namespace[%s] does not exist", namespace)
}
return bucket.Delete([]byte(key))
})
Expand All @@ -173,7 +173,7 @@ func (s *Storage) Delete(namespace, key string) error {
func (s *Storage) DeleteNamespace(namespace string) error {
return s.db.Update(func(tx *bolt.Tx) error {
if err := tx.DeleteBucket([]byte(namespace)); err != nil {
return errors.Wrapf(err, "could not delete namespace<%s>, n", namespace)
return errors.Wrapf(err, "could not delete namespace[%s], n", namespace)
}
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions impl/pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ func TestBoltDB_ReadWrite(t *testing.T) {
// delete value in a dhtNamespace that doesn't exist
err = db.Delete("bad", team2)
assert.Error(t, err)
assert.Contains(t, err.Error(), "namespace<bad> does not exist")
assert.Contains(t, err.Error(), "namespace[bad] does not exist")

// delete a dhtNamespace that doesn't exist
err = db.DeleteNamespace("bad")
assert.Contains(t, err.Error(), "could not delete namespace<bad>")
assert.Contains(t, err.Error(), "could not delete namespace[bad]")

// delete dhtNamespace
err = db.DeleteNamespace(namespace)
Expand Down

0 comments on commit d57574d

Please sign in to comment.