From d57574dd28377f3edce6773bf6926c1723bf024f Mon Sep 17 00:00:00 2001 From: Gabe <7622243+decentralgabe@users.noreply.github.com> Date: Tue, 5 Dec 2023 20:43:47 +0100 Subject: [PATCH] Update cache (#63) * update logging; set cache size limit * add size limit * testing * update swagger --- impl/config/config.go | 10 ++++++---- impl/config/config.toml | 1 + impl/docs/docs.go | 2 +- impl/docs/swagger.yaml | 12 ++++++------ impl/internal/did/client_test.go | 5 +++++ impl/pkg/dht/dht.go | 4 ++-- impl/pkg/server/pkarr.go | 26 +++++++++++++------------- impl/pkg/server/server.go | 4 ++-- impl/pkg/server/server_pkarr_test.go | 2 +- impl/pkg/service/pkarr.go | 21 +++++++++++---------- impl/pkg/storage/storage.go | 16 ++++++++-------- impl/pkg/storage/storage_test.go | 4 ++-- 12 files changed, 58 insertions(+), 49 deletions(-) diff --git a/impl/config/config.go b/impl/config/config.go index 97c37eab..d31ed494 100644 --- a/impl/config/config.go +++ b/impl/config/config.go @@ -56,8 +56,9 @@ type DHTServiceConfig struct { } type PKARRServiceConfig struct { - RepublishCRON string `toml:"republish_cron"` - CacheTTLSeconds int `toml:"cache_ttl_seconds"` + RepublishCRON string `toml:"republish_cron"` + CacheTTLSeconds int `toml:"cache_ttl_seconds"` + CacheSizeLimitMB int `toml:"cache_size_limit_mb"` } func GetDefaultConfig() Config { @@ -75,8 +76,9 @@ func GetDefaultConfig() Config { BootstrapPeers: GetDefaultBootstrapPeers(), }, PkarrConfig: PKARRServiceConfig{ - RepublishCRON: "0 */2 * * *", - CacheTTLSeconds: 600, + RepublishCRON: "0 */2 * * *", + CacheTTLSeconds: 600, + CacheSizeLimitMB: 2000, }, } } diff --git a/impl/config/config.toml b/impl/config/config.toml index 849f0520..1fe520d3 100644 --- a/impl/config/config.toml +++ b/impl/config/config.toml @@ -13,3 +13,4 @@ bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht. [pkarr] republish_cron = "0 */2 * * *" # every 2 hours cache_ttl_seconds = 600 # 10 minutes +cache_size_limit_mb = 2000 # 2GB diff --git a/impl/docs/docs.go b/impl/docs/docs.go index 0309e016..ab9a07fb 100644 --- a/impl/docs/docs.go +++ b/impl/docs/docs.go @@ -5,7 +5,7 @@ package docs import "github.com/swaggo/swag/v2" const docTemplate = `{ - "schemes": {{ marshal .Schemes }},"swagger":"2.0","info":{"description":"{{escape .Description}}","title":"{{.Title}}","contact":{"name":"TBD","url":"https://github.com/TBD54566975/did-dht-method/issues","email":"tbd-developer@squareup.com"},"license":{"name":"Apache 2.0","url":"http://www.apache.org/licenses/LICENSE-2.0.html"},"version":"{{.Version}}"},"host":"{{.Host}}","basePath":"{{.BasePath}}","paths":{"/health":{"get":{"description":"Health is a simple handler that always responds with a 200 OK","consumes":["application/json"],"produces":["application/json"],"tags":["Health"],"summary":"Health Check","responses":{"200":{"description":"OK","schema":{"$ref":"#/definitions/pkg_server.GetHealthCheckResponse"}}}}},"/{id}":{"get":{"description":"GetRecord a PKARR record from the DHT","consumes":["application/octet-stream"],"produces":["application/octet-stream"],"tags":["PKARR"],"summary":"GetRecord a PKARR record from the DHT","parameters":[{"type":"string","description":"ID to get","name":"id","in":"path","required":true}],"responses":{"200":{"description":"64 bytes sig, 8 bytes u64 big-endian seq, 0-1000 bytes of v.","schema":{"type":"array","items":{"type":"integer"}}},"400":{"description":"Bad request","schema":{"type":"string"}},"404":{"description":"Not found","schema":{"type":"string"}},"500":{"description":"Internal server error","schema":{"type":"string"}}}},"put":{"description":"PutRecord a PKARR record into the DHT","consumes":["application/octet-stream"],"tags":["PKARR"],"summary":"PutRecord a PKARR record into the DHT","parameters":[{"type":"string","description":"ID of the record to put","name":"id","in":"path","required":true},{"description":"64 bytes sig, 8 bytes u64 big-endian seq, 0-1000 bytes of v.","name":"request","in":"body","required":true,"schema":{"type":"array","items":{"type":"integer"}}}],"responses":{"200":{"description":"OK"},"400":{"description":"Bad request","schema":{"type":"string"}},"500":{"description":"Internal server error","schema":{"type":"string"}}}}}},"definitions":{"pkg_server.GetHealthCheckResponse":{"type":"object","properties":{"status":{"description":"Status is always equal to ` + "`" + `OK` + "`" + `.","type":"string"}}}}}` + "schemes": {{ marshal .Schemes }},"swagger":"2.0","info":{"description":"{{escape .Description}}","title":"{{.Title}}","contact":{"name":"TBD","url":"https://github.com/TBD54566975/did-dht-method/issues","email":"tbd-developer@squareup.com"},"license":{"name":"Apache 2.0","url":"http://www.apache.org/licenses/LICENSE-2.0.html"},"version":"{{.Version}}"},"host":"{{.Host}}","basePath":"{{.BasePath}}","paths":{"/health":{"get":{"description":"Health is a simple handler that always responds with a 200 OK","consumes":["application/json"],"produces":["application/json"],"tags":["Health"],"summary":"Health Check","responses":{"200":{"description":"OK","schema":{"$ref":"#/definitions/pkg_server.GetHealthCheckResponse"}}}}},"/{id}":{"get":{"description":"GetRecord a Pkarr record from the DHT","consumes":["application/octet-stream"],"produces":["application/octet-stream"],"tags":["Pkarr"],"summary":"GetRecord a Pkarr record from the DHT","parameters":[{"type":"string","description":"ID to get","name":"id","in":"path","required":true}],"responses":{"200":{"description":"64 bytes sig, 8 bytes u64 big-endian seq, 0-1000 bytes of v.","schema":{"type":"array","items":{"type":"integer"}}},"400":{"description":"Bad request","schema":{"type":"string"}},"404":{"description":"Not found","schema":{"type":"string"}},"500":{"description":"Internal server error","schema":{"type":"string"}}}},"put":{"description":"PutRecord a Pkarr record into the DHT","consumes":["application/octet-stream"],"tags":["Pkarr"],"summary":"PutRecord a Pkarr record into the DHT","parameters":[{"type":"string","description":"ID of the record to put","name":"id","in":"path","required":true},{"description":"64 bytes sig, 8 bytes u64 big-endian seq, 0-1000 bytes of v.","name":"request","in":"body","required":true,"schema":{"type":"array","items":{"type":"integer"}}}],"responses":{"200":{"description":"OK"},"400":{"description":"Bad request","schema":{"type":"string"}},"500":{"description":"Internal server error","schema":{"type":"string"}}}}}},"definitions":{"pkg_server.GetHealthCheckResponse":{"type":"object","properties":{"status":{"description":"Status is always equal to ` + "`" + `OK` + "`" + `.","type":"string"}}}}}` // SwaggerInfo holds exported Swagger Info so clients can modify it var SwaggerInfo = &swag.Spec{ diff --git a/impl/docs/swagger.yaml b/impl/docs/swagger.yaml index 1a3cd03c..1e18e6da 100644 --- a/impl/docs/swagger.yaml +++ b/impl/docs/swagger.yaml @@ -22,7 +22,7 @@ paths: get: consumes: - application/octet-stream - description: GetRecord a PKARR record from the DHT + description: GetRecord a Pkarr record from the DHT parameters: - description: ID to get in: path @@ -50,13 +50,13 @@ paths: description: Internal server error schema: type: string - summary: GetRecord a PKARR record from the DHT + summary: GetRecord a Pkarr record from the DHT tags: - - PKARR + - Pkarr put: consumes: - application/octet-stream - description: PutRecord a PKARR record into the DHT + description: PutRecord a Pkarr record into the DHT parameters: - description: ID of the record to put in: path @@ -82,9 +82,9 @@ paths: description: Internal server error schema: type: string - summary: PutRecord a PKARR record into the DHT + summary: PutRecord a Pkarr record into the DHT tags: - - PKARR + - Pkarr /health: get: consumes: diff --git a/impl/internal/did/client_test.go b/impl/internal/did/client_test.go index 2a0b96f0..5f00b678 100644 --- a/impl/internal/did/client_test.go +++ b/impl/internal/did/client_test.go @@ -2,6 +2,7 @@ package did import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,6 +22,7 @@ func TestClient(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) + start := time.Now() bep44Put, err := dht.CreatePKARRPublishRequest(sk, *packet) assert.NoError(t, err) assert.NotEmpty(t, bep44Put) @@ -31,4 +33,7 @@ func TestClient(t *testing.T) { gotDID, _, err := client.GetDIDDocument(doc.ID) assert.NoError(t, err) assert.EqualValues(t, doc, gotDID) + + since := time.Since(start) + t.Logf("time to put and get: %s", since) } diff --git a/impl/pkg/dht/dht.go b/impl/pkg/dht/dht.go index d1f4b98a..f14a0b98 100644 --- a/impl/pkg/dht/dht.go +++ b/impl/pkg/dht/dht.go @@ -49,7 +49,7 @@ func (d *DHT) Get(ctx context.Context, key string) (*getput.GetResult, error) { } res, t, err := getput.Get(ctx, infohash.HashBytes(z32Decoded), d.Server, nil, nil) if err != nil { - return nil, errutil.LoggingNewErrorf("failed to get key<%s> from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) + return nil, errutil.LoggingNewErrorf("failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) } return &res, nil } @@ -64,7 +64,7 @@ func (d *DHT) GetFull(ctx context.Context, key string) (*dhtint.FullGetResult, e } res, t, err := dhtint.Get(ctx, infohash.HashBytes(z32Decoded), d.Server, nil, nil) if err != nil { - return nil, errutil.LoggingNewErrorf("failed to get key<%s> from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) + return nil, errutil.LoggingNewErrorf("failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) } return &res, nil } diff --git a/impl/pkg/server/pkarr.go b/impl/pkg/server/pkarr.go index 12bbab99..5c5a2e86 100644 --- a/impl/pkg/server/pkarr.go +++ b/impl/pkg/server/pkarr.go @@ -12,21 +12,21 @@ import ( "github.com/TBD54566975/did-dht-method/pkg/service" ) -// PKARRRouter is the router for the PKARR API -type PKARRRouter struct { +// PkarrRouter is the router for the Pkarr API +type PkarrRouter struct { service *service.PkarrService } -// NewPKARRRouter returns a new instance of the Relay router -func NewPKARRRouter(service *service.PkarrService) (*PKARRRouter, error) { - return &PKARRRouter{service: service}, nil +// NewPkarrRouter returns a new instance of the Relay router +func NewPkarrRouter(service *service.PkarrService) (*PkarrRouter, error) { + return &PkarrRouter{service: service}, nil } // GetRecord godoc // -// @Summary GetRecord a PKARR record from the DHT -// @Description GetRecord a PKARR record from the DHT -// @Tags PKARR +// @Summary GetRecord a Pkarr record from the DHT +// @Description GetRecord a Pkarr record from the DHT +// @Tags Pkarr // @Accept octet-stream // @Produce octet-stream // @Param id path string true "ID to get" @@ -35,7 +35,7 @@ func NewPKARRRouter(service *service.PkarrService) (*PKARRRouter, error) { // @Failure 404 {string} string "Not found" // @Failure 500 {string} string "Internal server error" // @Router /{id} [get] -func (r *PKARRRouter) GetRecord(c *gin.Context) { +func (r *PkarrRouter) GetRecord(c *gin.Context) { id := GetParam(c, IDParam) if id == nil || *id == "" { LoggingRespondErrMsg(c, "missing id param", http.StatusBadRequest) @@ -63,9 +63,9 @@ func (r *PKARRRouter) GetRecord(c *gin.Context) { // PutRecord godoc // -// @Summary PutRecord a PKARR record into the DHT -// @Description PutRecord a PKARR record into the DHT -// @Tags PKARR +// @Summary PutRecord a Pkarr record into the DHT +// @Description PutRecord a Pkarr record into the DHT +// @Tags Pkarr // @Accept octet-stream // @Param id path string true "ID of the record to put" // @Param request body []byte true "64 bytes sig, 8 bytes u64 big-endian seq, 0-1000 bytes of v." @@ -73,7 +73,7 @@ func (r *PKARRRouter) GetRecord(c *gin.Context) { // @Failure 400 {string} string "Bad request" // @Failure 500 {string} string "Internal server error" // @Router /{id} [put] -func (r *PKARRRouter) PutRecord(c *gin.Context) { +func (r *PkarrRouter) PutRecord(c *gin.Context) { id := GetParam(c, IDParam) if id == nil || *id == "" { LoggingRespondErrMsg(c, "missing id param", http.StatusBadRequest) diff --git a/impl/pkg/server/server.go b/impl/pkg/server/server.go index 0a15e67e..c09678cc 100644 --- a/impl/pkg/server/server.go +++ b/impl/pkg/server/server.go @@ -84,7 +84,7 @@ func setupLogger(level string) { logLevel, err := logrus.ParseLevel(level) if err != nil { - logrus.WithError(err).Errorf("could not parse log level<%s>, setting to info", level) + logrus.WithError(err).Errorf("could not parse log level[%s], setting to info", level) logrus.SetLevel(logrus.InfoLevel) } else { logrus.SetLevel(logLevel) @@ -114,7 +114,7 @@ func setupHandler(env config.Environment) *gin.Engine { // PkarrAPI sets up the relay API routes according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md func PkarrAPI(rg *gin.RouterGroup, service *service.PkarrService) error { - relayRouter, err := NewPKARRRouter(service) + relayRouter, err := NewPkarrRouter(service) if err != nil { return util.LoggingErrorMsg(err, "could not instantiate relay router") } diff --git a/impl/pkg/server/server_pkarr_test.go b/impl/pkg/server/server_pkarr_test.go index 9d11268c..dcd281f7 100644 --- a/impl/pkg/server/server_pkarr_test.go +++ b/impl/pkg/server/server_pkarr_test.go @@ -21,7 +21,7 @@ import ( func TestPKARRRouter(t *testing.T) { pkarrSvc := testPKARRService(t) - pkarrRouter, err := NewPKARRRouter(&pkarrSvc) + pkarrRouter, err := NewPkarrRouter(&pkarrSvc) require.NoError(t, err) require.NotEmpty(t, pkarrRouter) diff --git a/impl/pkg/service/pkarr.go b/impl/pkg/service/pkarr.go index bac3003c..16e79e2a 100644 --- a/impl/pkg/service/pkarr.go +++ b/impl/pkg/service/pkarr.go @@ -43,9 +43,10 @@ func NewPkarrService(cfg *config.Config, db *storage.Storage) (*PkarrService, er } // create and start cache and scheduler - ttl := time.Duration(cfg.PkarrConfig.CacheTTLSeconds) * time.Second - // TODO(gabe): consider setting size limits on the cache - cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(ttl)) + cacheTTL := time.Duration(cfg.PkarrConfig.CacheTTLSeconds) * time.Second + cacheConfig := bigcache.DefaultConfig(cacheTTL) + cacheConfig.HardMaxCacheSize = cfg.PkarrConfig.CacheSizeLimitMB + cache, err := bigcache.New(context.Background(), cacheConfig) if err != nil { return nil, util.LoggingErrorMsg(err, "failed to instantiate cache") } @@ -104,7 +105,6 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request Publ } // write to db and cache - // TODO(gabe): if putting to the DHT fails we should note that in the db and retry later record := request.toRecord() if err := s.db.WriteRecord(record); err != nil { return err @@ -122,6 +122,7 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request Publ } // return here and put it in the DHT asynchronously + // TODO(gabe): consider a background process to monitor failures go s.dht.Put(ctx, bep44.Put{ V: request.V, K: &request.K, @@ -173,13 +174,13 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrRespon if err != nil { // try to resolve from storage before returning and error // if we detect this and have the record we should republish to the DHT - logrus.WithError(err).Warnf("failed to get pkarr<%s> from dht, attempting to resolve from storage", id) + logrus.WithError(err).Warnf("failed to get pkarr record[%s] from dht, attempting to resolve from storage", id) record, err := s.db.ReadRecord(id) if err != nil || record == nil { - logrus.WithError(err).Errorf("failed to resolve pkarr<%s> from storage", id) + logrus.WithError(err).Errorf("failed to resolve pkarr record[%s] from storage", id) return nil, err } - logrus.Debugf("resolved pkarr<%s> from storage", id) + logrus.Debugf("resolved pkarr record[%s] from storage", id) return fromPkarrRecord(*record) } @@ -201,10 +202,10 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrRespon // add the record to cache, do it here to avoid duplicate calculations recordBytes, err := json.Marshal(resp) if err != nil { - return nil, util.LoggingErrorMsgf(err, "failed to marshal pkarr record<%s> for cache", id) + return nil, util.LoggingErrorMsgf(err, "failed to marshal pkarr record[%s] for cache", id) } if err = s.cache.Set(id, recordBytes); err != nil { - return nil, util.LoggingErrorMsgf(err, "failed to set pkarr record<%s> in cache", id) + return nil, util.LoggingErrorMsgf(err, "failed to set pkarr record[%s] in cache", id) } return &resp, nil @@ -221,7 +222,7 @@ func (s *PkarrService) republish() { logrus.Info("No records to republish") return } - logrus.Infof("Republishing %d record(s)", len(allRecords)) + logrus.Infof("Republishing [%d] record(s)", len(allRecords)) errCnt := 0 for _, record := range allRecords { put, err := recordToBEP44Put(record) diff --git a/impl/pkg/storage/storage.go b/impl/pkg/storage/storage.go index e38a5b0f..6b458df3 100644 --- a/impl/pkg/storage/storage.go +++ b/impl/pkg/storage/storage.go @@ -82,7 +82,7 @@ func (s *Storage) Read(namespace, key string) ([]byte, error) { err := s.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - logrus.Infof("namespace<%s> does not exist", namespace) + logrus.Infof("namespace[%s] does not exist", namespace) return nil } result = bucket.Get([]byte(key)) @@ -97,7 +97,7 @@ func (s *Storage) ReadPrefix(namespace, prefix string) (map[string][]byte, error err := s.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - errMsg := fmt.Sprintf("namespace<%s> does not exist", namespace) + errMsg := fmt.Sprintf("namespace[%s] does not exist", namespace) logrus.Error(errMsg) return errors.New(errMsg) } @@ -115,7 +115,7 @@ func (s *Storage) ReadAll(namespace string) (map[string][]byte, error) { err := s.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - logrus.Warnf("namespace<%s> does not exist", namespace) + logrus.Warnf("namespace[%s] does not exist", namespace) return nil } cursor := bucket.Cursor() @@ -132,7 +132,7 @@ func (s *Storage) ReadAllKeys(namespace string) ([]string, error) { err := s.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - logrus.Warnf("namespace<%s> does not exist", namespace) + logrus.Warnf("namespace[%s] does not exist", namespace) return nil } cursor := bucket.Cursor() @@ -148,10 +148,10 @@ func (s *Storage) Update(namespace string, key string, value []byte) error { return s.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - return fmt.Errorf("namespace<%s> does not exist", namespace) + return fmt.Errorf("namespace[%s] does not exist", namespace) } if bucket.Get([]byte(key)) == nil { - return fmt.Errorf("key<%s> does not exist", key) + return fmt.Errorf("key[%s] does not exist", key) } if err := bucket.Put([]byte(key), value); err != nil { return err @@ -164,7 +164,7 @@ func (s *Storage) Delete(namespace, key string) error { return s.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - return fmt.Errorf("namespace<%s> does not exist", namespace) + return fmt.Errorf("namespace[%s] does not exist", namespace) } return bucket.Delete([]byte(key)) }) @@ -173,7 +173,7 @@ func (s *Storage) Delete(namespace, key string) error { func (s *Storage) DeleteNamespace(namespace string) error { return s.db.Update(func(tx *bolt.Tx) error { if err := tx.DeleteBucket([]byte(namespace)); err != nil { - return errors.Wrapf(err, "could not delete namespace<%s>, n", namespace) + return errors.Wrapf(err, "could not delete namespace[%s], n", namespace) } return nil }) diff --git a/impl/pkg/storage/storage_test.go b/impl/pkg/storage/storage_test.go index 58a48844..63374fea 100644 --- a/impl/pkg/storage/storage_test.go +++ b/impl/pkg/storage/storage_test.go @@ -73,11 +73,11 @@ func TestBoltDB_ReadWrite(t *testing.T) { // delete value in a dhtNamespace that doesn't exist err = db.Delete("bad", team2) assert.Error(t, err) - assert.Contains(t, err.Error(), "namespace does not exist") + assert.Contains(t, err.Error(), "namespace[bad] does not exist") // delete a dhtNamespace that doesn't exist err = db.DeleteNamespace("bad") - assert.Contains(t, err.Error(), "could not delete namespace") + assert.Contains(t, err.Error(), "could not delete namespace[bad]") // delete dhtNamespace err = db.DeleteNamespace(namespace)