Skip to content

Commit

Permalink
Attempt to address repeated puts (#170)
Browse files Browse the repository at this point in the history
* update

* update

* remove test
  • Loading branch information
decentralgabe authored Apr 9, 2024
1 parent f08a311 commit 6a7c36c
Show file tree
Hide file tree
Showing 21 changed files with 214 additions and 75 deletions.
2 changes: 1 addition & 1 deletion impl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ docker run \
### Postgres

To use a postgres database as the storage backend, set configuration option `storage_uri` to a `postgres://` URI with
the database connection string. The schema will be created or updated as needed while the program starts.
the database connection string. The schema will be created or updated as needed while the program starts.
3 changes: 1 addition & 2 deletions impl/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ func run() error {
}

// create a channel of buffer size 1 to handle shutdown.
// buffer's size is 1 in order to ignore any additional ctrl+c
// spamming.
// buffer's size is 1 in order to ignore any additional ctrl+c spamming.
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

Expand Down
14 changes: 8 additions & 6 deletions impl/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ type DHTServiceConfig struct {
}

type PKARRServiceConfig struct {
RepublishCRON string `toml:"republish_cron"`
CacheTTLSeconds int `toml:"cache_ttl_seconds"`
CacheSizeLimitMB int `toml:"cache_size_limit_mb"`
RepublishCRON string `toml:"republish_cron"`
CacheTTLSeconds int `toml:"cache_ttl_seconds"`
CacheSizeLimitMB int `toml:"cache_size_limit_mb"`
PutTimeoutSeconds int `toml:"put_timeout_seconds"`
}

type LogConfig struct {
Expand All @@ -81,9 +82,10 @@ func GetDefaultConfig() Config {
BootstrapPeers: GetDefaultBootstrapPeers(),
},
PkarrConfig: PKARRServiceConfig{
RepublishCRON: "0 */2 * * *",
CacheTTLSeconds: 600,
CacheSizeLimitMB: 500,
RepublishCRON: "0 */2 * * *",
CacheTTLSeconds: 600,
CacheSizeLimitMB: 500,
PutTimeoutSeconds: 5,
},
Log: LogConfig{
Level: logrus.InfoLevel.String(),
Expand Down
3 changes: 2 additions & 1 deletion impl/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht.
[pkarr]
republish_cron = "0 */2 * * *" # every 2 hours
cache_ttl_seconds = 600 # 10 minutes
cache_size_limit_mb = 500 # 512 MB
cache_size_limit_mb = 1000 # 1000 MB
put_timeout_seconds = 5 # 5 seconds before puts time out
6 changes: 3 additions & 3 deletions impl/integrationtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func run(server string) {
continue
}

if err := get(ctx, server, suffix); err != nil {
if err = get(ctx, server, suffix); err != nil {
logrus.WithError(err).Error("error making GET request")
continue
}
Expand All @@ -71,7 +71,7 @@ func put(ctx context.Context, server string) (string, error) {
return "", err
}

if err := doRequest(ctx, req); err != nil {
if err = doRequest(ctx, req); err != nil {
return "", err
}

Expand All @@ -84,7 +84,7 @@ func get(ctx context.Context, server string, suffix string) error {
return err
}

if err := doRequest(ctx, req); err != nil {
if err = doRequest(ctx, req); err != nil {
return err
}

Expand Down
14 changes: 12 additions & 2 deletions impl/pkg/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"testing"
"time"

errutil "github.com/TBD54566975/ssi-sdk/util"
"github.com/anacrolix/dht/v2"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/anacrolix/torrent/types/infohash"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

dhtint "github.com/TBD54566975/did-dht-method/internal/dht"
"github.com/TBD54566975/did-dht-method/internal/util"
Expand All @@ -29,6 +31,9 @@ func NewDHT(bootstrapPeers []string) (*DHT, error) {
logrus.WithField("bootstrap_peers", len(bootstrapPeers)).Info("initializing DHT")

c := dht.NewDefaultServerConfig()
// change default expire to 24 hours
c.Exp = time.Hour * 24
c.NoSecurity = false
conn, err := net.ListenPacket("udp", "0.0.0.0:6881")
if err != nil {
return nil, errutil.LoggingErrorMsg(err, "failed to listen on udp port 6881")
Expand All @@ -37,6 +42,8 @@ func NewDHT(bootstrapPeers []string) (*DHT, error) {
c.Logger = log.NewLogger().WithFilterLevel(log.Debug)
c.Logger.SetHandlers(logHandler{})
c.StartingNodes = func() ([]dht.Addr, error) { return dht.ResolveHostPorts(bootstrapPeers) }
// set up rate limiter - 100 requests per second, 500 requests burst
c.SendLimiter = rate.NewLimiter(100, 500)
s, err := dht.NewServer(c)
if err != nil {
return nil, errutil.LoggingErrorMsg(err, "failed to create dht server")
Expand Down Expand Up @@ -84,14 +91,17 @@ func (d *DHT) Put(ctx context.Context, request bep44.Put) (string, error) {
logrus.Warn("no nodes available in the DHT for publishing")
}

key := util.Z32Encode(request.K[:])
t, err := getput.Put(ctx, request.Target(), d.Server, nil, func(int64) bep44.Put {
return request
})
if err != nil {
if t == nil {
return "", errutil.LoggingNewErrorf("failed to put key into dht: %v", err)
return "", errutil.LoggingNewErrorf("failed to put key[%s] into dht: %v", key, err)
}
return "", errutil.LoggingNewErrorf("failed to put key into dht, tried %d nodes, got %d responses", t.NumAddrsTried, t.NumResponses)
return "", errutil.LoggingNewErrorf("failed to put key[%s] into dht, tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses)
} else {
logrus.WithField("key", key).Debug("successfully put key into dht")
}
return util.Z32Encode(request.K[:]), nil
}
Expand Down
39 changes: 33 additions & 6 deletions impl/pkg/pkarr/record.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
package pkarr

import (
"bytes"
"crypto/sha256"
"encoding/base64"
"errors"
"fmt"

"github.com/TBD54566975/ssi-sdk/util"
"github.com/anacrolix/dht/v2/bep44"
"github.com/anacrolix/torrent/bencode"
"github.com/goccy/go-json"
"github.com/tv42/zbase32"
)

type Response struct {
V []byte `validate:"required"`
Seq int64 `validate:"required"`
Sig [64]byte `validate:"required"`
}

// Equals returns true if the response is equal to the other response
func (r Response) Equals(other Response) bool {
return r.Seq == other.Seq && bytes.Equal(r.V, other.V) && r.Sig == other.Sig
}

type Record struct {
Value []byte `json:"v" validate:"required"`
Key [32]byte `json:"k" validate:"required"`
Signature [64]byte `json:"sig" validate:"required"`
SequenceNumber int64 `json:"seq" validate:"required"`
}

type Response struct {
V []byte `validate:"required"`
Seq int64 `validate:"required"`
Sig [64]byte `validate:"required"`
}

// NewRecord returns a new Record with the given key, value, signature, and sequence number
func NewRecord(k []byte, v []byte, sig []byte, seq int64) (*Record, error) {
record := Record{SequenceNumber: seq}

Expand Down Expand Up @@ -67,6 +76,7 @@ func (r Record) IsValid() error {
return nil
}

// Response returns the record as a Response
func (r Record) Response() Response {
return Response{
V: r.Value,
Expand All @@ -75,6 +85,7 @@ func (r Record) Response() Response {
}
}

// BEP44 returns the record as a BEP44 Put message
func (r Record) BEP44() bep44.Put {
return bep44.Put{
V: r.Value,
Expand All @@ -84,11 +95,27 @@ func (r Record) BEP44() bep44.Put {
}
}

// String returns a string representation of the record
func (r Record) String() string {
e := base64.RawURLEncoding
return fmt.Sprintf("pkarr.Record{K=%s V=%s Sig=%s Seq=%d}", zbase32.EncodeToString(r.Key[:]), e.EncodeToString(r.Value), e.EncodeToString(r.Signature[:]), r.SequenceNumber)
}

// ID returns the base32 encoded key as a string
func (r Record) ID() string {
return zbase32.EncodeToString(r.Key[:])
}

// Hash returns the SHA256 hash of the record as a string
func (r Record) Hash() (string, error) {
recordBytes, err := json.Marshal(r)
if err != nil {
return "", err
}
return string(sha256.New().Sum(recordBytes)), nil
}

// RecordFromBEP44 returns a Record from a BEP44 Put message
func RecordFromBEP44(putMsg *bep44.Put) Record {
return Record{
Key: *putMsg.K,
Expand Down
8 changes: 8 additions & 0 deletions impl/pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -44,6 +45,13 @@ func NewServer(cfg *config.Config, shutdown chan os.Signal, d *dht.DHT) (*Server
return nil, util.LoggingErrorMsg(err, "failed to instantiate storage")
}

recordCnt, err := db.RecordCount(context.Background())
if err != nil {
logrus.WithError(err).Error("failed to get record count")
} else {
logrus.WithField("record_count", recordCnt).Info("storage instantiated with record count")
}

pkarrService, err := service.NewPkarrService(cfg, db, d)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not instantiate pkarr service")
Expand Down
35 changes: 24 additions & 11 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,23 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr
return err
}

// check if the message is already in the cache
if got, err := s.cache.Get(id); err == nil {
var resp pkarr.Response
if err = json.Unmarshal(got, &resp); err == nil && record.Response().Equals(resp) {
logrus.WithField("record_id", id).Debug("resolved pkarr record from cache with matching response")
return nil
}
}

// write to db and cache
if err := s.db.WriteRecord(ctx, record); err != nil {
return err
}

recordBytes, err := json.Marshal(record.Response())
if err != nil {
return err
}

if err = s.cache.Set(id, recordBytes); err != nil {
return err
}
Expand All @@ -90,11 +97,11 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr
// TODO(gabe): consider a background process to monitor failures
go func() {
// Create a new context with a timeout so that the parent context does not cancel the put
putCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
putCtx, cancel := context.WithTimeout(context.Background(), time.Duration(s.cfg.PkarrConfig.PutTimeoutSeconds)*time.Second)
defer cancel()

if _, err = s.dht.Put(putCtx, record.BEP44()); err != nil {
logrus.WithError(err).Error("error from dht.Put")
logrus.WithError(err).Errorf("error from dht.Put for record: %s", id)
}
}()

Expand All @@ -109,12 +116,11 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
// first do a cache lookup
if got, err := s.cache.Get(id); err == nil {
var resp pkarr.Response
err = json.Unmarshal(got, &resp)
if err == nil {
if err = json.Unmarshal(got, &resp); err == nil {
logrus.WithField("record_id", id).Debug("resolved pkarr record from cache")
return &resp, nil
}
logrus.WithError(err).WithField("record", id).Warn("failed to unmarshal pkarr record from cache, falling back to dht")
logrus.WithError(err).WithField("record", id).Warn("failed to get pkarr record from cache, falling back to dht")
}

// next do a dht lookup
Expand Down Expand Up @@ -182,9 +188,15 @@ func (s *PkarrService) republish() {
ctx, span := telemetry.GetTracer().Start(context.Background(), "PkarrService.republish")
defer span.End()

recordCnt, err := s.db.RecordCount(ctx)
if err != nil {
logrus.WithError(err).Error("failed to get record count")
} else {
logrus.WithField("record_count", recordCnt).Info("republishing records")
}

var nextPageToken []byte
var allRecords []pkarr.Record
var err error
errCnt := 0
successCnt := 0
for {
Expand All @@ -199,12 +211,13 @@ func (s *PkarrService) republish() {
return
}

logrus.WithField("record_count", len(allRecords)).Info("Republishing record")
logrus.WithField("record_count", len(allRecords)).Info("republishing records in batch")

for _, record := range allRecords {
logrus.Infof("Republishing record: %s", zbase32.EncodeToString(record.Key[:]))
recordID := zbase32.EncodeToString(record.Key[:])
logrus.Debugf("republishing record: %s", recordID)
if _, err = s.dht.Put(ctx, record.BEP44()); err != nil {
logrus.WithError(err).Error("failed to republish record")
logrus.WithError(err).Errorf("failed to republish record: %s", recordID)
errCnt++
continue
}
Expand Down
10 changes: 5 additions & 5 deletions impl/pkg/service/pkarr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/TBD54566975/did-dht-method/pkg/storage"
)

func TestPKARRService(t *testing.T) {
svc := newPKARRService(t, "a")
func TestPkarrService(t *testing.T) {
svc := newPkarrService(t, "a")

t.Run("test put bad record", func(t *testing.T) {
err := svc.PublishPkarr(context.Background(), "", pkarr.Record{})
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestPKARRService(t *testing.T) {
}

func TestDHT(t *testing.T) {
svc1 := newPKARRService(t, "b")
svc1 := newPkarrService(t, "b")

// create and publish a record to service1
sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{})
Expand All @@ -155,7 +155,7 @@ func TestDHT(t *testing.T) {
assert.Equal(t, putMsg.Seq, got.Seq)

// create service2 with service1 as a bootstrap peer
svc2 := newPKARRService(t, "c", anacrolixdht.NewAddr(svc1.dht.Addr()))
svc2 := newPkarrService(t, "c", anacrolixdht.NewAddr(svc1.dht.Addr()))

// get the record via service2
gotFrom2, err := svc2.GetPkarr(context.Background(), suffix)
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestNoConfig(t *testing.T) {
assert.Nil(t, svc)
}

func newPKARRService(t *testing.T, id string, bootstrapPeers ...anacrolixdht.Addr) PkarrService {
func newPkarrService(t *testing.T, id string, bootstrapPeers ...anacrolixdht.Addr) PkarrService {
defaultConfig := config.GetDefaultConfig()

db, err := storage.NewStorage(fmt.Sprintf("bolt://diddht-test-%s.db", id))
Expand Down
Loading

0 comments on commit 6a7c36c

Please sign in to comment.