From 3652be89c92b920ba427d2592587e695691e8f72 Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Mon, 16 Sep 2024 11:41:45 -0500 Subject: [PATCH] Fix incorrectly merged db_test.go --- node/pkg/db/db_test.go | 368 +++++++++++++++++++++++++++++------------ 1 file changed, 264 insertions(+), 104 deletions(-) diff --git a/node/pkg/db/db_test.go b/node/pkg/db/db_test.go index e58f7dfc04..543d1b6cd6 100644 --- a/node/pkg/db/db_test.go +++ b/node/pkg/db/db_test.go @@ -1,130 +1,290 @@ -package processor +package db import ( - "encoding/hex" - "time" - - "github.com/mr-tron/base58" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "bytes" + "crypto/ecdsa" + "crypto/rand" + "fmt" + math_rand "math/rand" + "os" + "runtime" + "sync" + "sync/atomic" - ethCommon "github.com/ethereum/go-ethereum/common" + "github.com/dgraph-io/badger/v3" "github.com/ethereum/go-ethereum/crypto" + "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "github.com/certusone/wormhole/node/pkg/common" - "github.com/wormhole-foundation/wormhole/sdk/vaa" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -var ( - // SECURITY: source_chain/target_chain are untrusted uint8 values. An attacker could cause a maximum of 255**2 label - // pairs to be created, which is acceptable. +func getVAA() vaa.VAA { + return getVAAWithSeqNum(1) +} - messagesObservedTotal = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "wormhole_message_observations_total", - Help: "Total number of messages observed", - }, - []string{"emitter_chain"}) -) +func getVAAWithSeqNum(seqNum uint64) vaa.VAA { + var payload = []byte{97, 97, 97, 97, 97, 97} + var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} + + return vaa.VAA{ + Version: uint8(1), + GuardianSetIndex: uint32(1), + Signatures: nil, + Timestamp: time.Unix(0, 0), + Nonce: uint32(1), + Sequence: seqNum, + ConsistencyLevel: uint8(32), + EmitterChain: vaa.ChainIDSolana, + EmitterAddress: governanceEmitter, + Payload: payload, + } +} + +// Testing the expected default behavior of a CreateGovernanceVAA +func TestVaaIDFromString(t *testing.T) { + vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1" + vaaID, _ := VaaIDFromString(vaaIdString) + expectAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} + + assert.Equal(t, vaa.ChainIDSolana, vaaID.EmitterChain) + assert.Equal(t, expectAddr, vaaID.EmitterAddress) + assert.Equal(t, uint64(1), vaaID.Sequence) +} + +func TestVaaIDFromVAA(t *testing.T) { + testVaa := getVAA() + vaaID := VaaIDFromVAA(&testVaa) + expectAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} + + assert.Equal(t, vaa.ChainIDSolana, vaaID.EmitterChain) + assert.Equal(t, expectAddr, vaaID.EmitterAddress) + assert.Equal(t, uint64(1), vaaID.Sequence) +} + +func TestBytes(t *testing.T) { + vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1" + vaaID, _ := VaaIDFromString(vaaIdString) + expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34, 0x2f, 0x31} + + assert.Equal(t, expected, vaaID.Bytes()) +} + +func TestEmitterPrefixBytesWithChainIDAndAddress(t *testing.T) { + vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1" + vaaID, _ := VaaIDFromString(vaaIdString) + expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34} + + assert.Equal(t, expected, vaaID.EmitterPrefixBytes()) +} + +func TestEmitterPrefixBytesWithOnlyChainID(t *testing.T) { + vaaID := VAAID{EmitterChain: vaa.ChainID(26)} + assert.Equal(t, []byte("signed/26"), vaaID.EmitterPrefixBytes()) +} + +func TestStoreSignedVAAUnsigned(t *testing.T) { + dbPath := t.TempDir() + db := OpenDb(zap.NewNop(), &dbPath) + defer db.Close() + defer os.Remove(dbPath) + + testVaa := getVAA() + + // Should panic because the VAA is not signed + assert.Panics(t, func() { db.StoreSignedVAA(&testVaa) }, "The code did not panic") //nolint:errcheck +} + +func TestStoreSignedVAASigned(t *testing.T) { + dbPath := t.TempDir() + db := OpenDb(zap.NewNop(), &dbPath) + defer db.Close() + defer os.Remove(dbPath) + + testVaa := getVAA() + + privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + testVaa.AddSignature(privKey, 0) + + err2 := db.StoreSignedVAA(&testVaa) + assert.NoError(t, err2) +} + +func TestStoreSignedVAABatch(t *testing.T) { + dbPath := t.TempDir() + db := OpenDb(zap.NewNop(), &dbPath) + defer db.Close() + defer os.Remove(dbPath) -// handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An -// event may be received multiple times and must be handled in an idempotent fashion. -func (p *Processor) handleMessage(k *common.MessagePublication) { - if p.gs == nil { - p.logger.Warn("dropping observation since we haven't initialized our guardian set yet", - zap.String("message_id", k.MessageIDString()), - zap.Uint32("nonce", k.Nonce), - zap.Stringer("txhash", k.TxHash), - zap.Time("timestamp", k.Timestamp), - ) - return + privKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + require.NoError(t, err) + + require.Less(t, int64(0), db.db.MaxBatchCount()) // In testing this was 104857. + require.Less(t, int64(0), db.db.MaxBatchSize()) // In testing this was 10066329. + + // Make sure we exceed the max batch size. + numVAAs := uint64(db.db.MaxBatchCount() + 1) + + // Build the VAA batch. + vaaBatch := make([]*vaa.VAA, 0, numVAAs) + for seqNum := uint64(0); seqNum < numVAAs; seqNum++ { + v := getVAAWithSeqNum(seqNum) + v.AddSignature(privKey, 0) + vaaBatch = append(vaaBatch, &v) } - messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc() - - // All nodes will create the exact same VAA and sign its digest. - // Consensus is established on this digest. - - v := &VAA{ - VAA: vaa.VAA{ - Version: vaa.SupportedVAAVersion, - GuardianSetIndex: p.gs.Index, - Signatures: nil, - Timestamp: k.Timestamp, - Nonce: k.Nonce, - EmitterChain: k.EmitterChain, - EmitterAddress: k.EmitterAddress, - Payload: k.Payload, - Sequence: k.Sequence, - ConsistencyLevel: k.ConsistencyLevel, - }, - Unreliable: k.Unreliable, - Reobservation: k.IsReobservation, + // Store the batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) + + // Verify all the VAAs are in the database. + for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } + + // Verify that updates work as well by tweaking the VAAs and rewriting them. + for _, v := range vaaBatch { + v.Nonce += 1 } - // Generate digest of the unsigned VAA. - digest := v.SigningDigest() - hash := hex.EncodeToString(digest.Bytes()) + // Store the updated batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) - // Sign the digest using our node's guardian key. - signature, err := crypto.Sign(digest.Bytes(), p.gk) - if err != nil { - panic(err) + // Verify all the updated VAAs are in the database. + for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } +} + +func TestGetSignedVAABytes(t *testing.T) { + dbPath := t.TempDir() + db := OpenDb(zap.NewNop(), &dbPath) + defer db.Close() + defer os.Remove(dbPath) + + testVaa := getVAA() + + vaaID := VaaIDFromVAA(&testVaa) + + privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + testVaa.AddSignature(privKey, 0) + + // Store full VAA + err2 := db.StoreSignedVAA(&testVaa) + assert.NoError(t, err2) + + // Retrieve it using vaaID + vaaBytes, err2 := db.GetSignedVAABytes(*vaaID) + assert.NoError(t, err2) + + testVaaBytes, err3 := testVaa.Marshal() + assert.NoError(t, err3) + + assert.Equal(t, testVaaBytes, vaaBytes) +} + +func TestFindEmitterSequenceGap(t *testing.T) { + dbPath := t.TempDir() + db := OpenDb(zap.NewNop(), &dbPath) + defer db.Close() + defer os.Remove(dbPath) + + testVaa := getVAA() + + vaaID := VaaIDFromVAA(&testVaa) + + privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + testVaa.AddSignature(privKey, 0) + + // Store full VAA + err2 := db.StoreSignedVAA(&testVaa) + assert.NoError(t, err2) + + resp, firstSeq, lastSeq, err := db.FindEmitterSequenceGap(*vaaID) + + assert.Equal(t, []uint64{0x0}, resp) + assert.Equal(t, uint64(0x0), firstSeq) + assert.Equal(t, uint64(0x1), lastSeq) + assert.NoError(t, err) +} + +// BenchmarkVaaLookup benchmarks db.GetSignedVAABytes +// You need to set the environment variable WH_DBPATH to a path with a populated BadgerDB. +// You may want to play with the CONCURRENCY parameter. +func BenchmarkVaaLookup(b *testing.B) { + CONCURRENCY := runtime.NumCPU() + dbPath := os.Getenv("WH_DBPATH") + require.NotEqual(b, dbPath, "") + + // open DB + optionsDB := badger.DefaultOptions(dbPath) + optionsDB.Logger = nil + badgerDb, err := badger.Open(optionsDB) + require.NoError(b, err) + db := &Database{ + db: badgerDb, } - shouldPublishImmediately := p.shouldPublishImmediately(&v.VAA) - - if p.logger.Core().Enabled(zapcore.DebugLevel) { - p.logger.Debug("observed and signed confirmed message publication", - zap.String("message_id", k.MessageIDString()), - zap.Stringer("txhash", k.TxHash), - zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())), - zap.String("hash", hash), - zap.Uint32("nonce", k.Nonce), - zap.Time("timestamp", k.Timestamp), - zap.Uint8("consistency_level", k.ConsistencyLevel), - zap.String("signature", hex.EncodeToString(signature)), - zap.Bool("shouldPublishImmediately", shouldPublishImmediately), - zap.Bool("isReobservation", k.IsReobservation), - ) + if err != nil { + b.Error("failed to open database") } + defer db.Close() + + vaaIds := make(chan *VAAID, b.N) - // Broadcast the signature. - ourObs, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature, shouldPublishImmediately) + for i := 0; i < b.N; i++ { + randId := math_rand.Intn(250000) //nolint + randId = 250000 - (i / 18) + vaaId, err := VaaIDFromString(fmt.Sprintf("4/000000000000000000000000b6f6d86a8f9879a9c87f643768d9efc38c1da6e7/%d", randId)) + assert.NoError(b, err) + vaaIds <- vaaId + } - // Indicate that we observed this one. - observationsReceivedTotal.Inc() - observationsReceivedByGuardianAddressTotal.WithLabelValues(p.ourAddr.Hex()).Inc() + b.ResetTimer() - // Get / create our state entry. - s := p.state.signatures[hash] - if s == nil { - s = &state{ - firstObserved: time.Now(), - nextRetry: time.Now().Add(nextRetryDuration(0)), - signatures: map[ethCommon.Address][]byte{}, - source: "loopback", - } + // actual timed code + var errCtr atomic.Int32 + var wg sync.WaitGroup - p.state.signatures[hash] = s + for i := 0; i < CONCURRENCY; i++ { + wg.Add(1) + go func() { + for { + select { + case vaaId := <-vaaIds: + _, err = db.GetSignedVAABytes(*vaaId) + if err != nil { + fmt.Printf("error retrieving %s/%s/%d: %s\n", vaaId.EmitterChain, vaaId.EmitterAddress, vaaId.Sequence, err) + errCtr.Add(1) + } + default: + wg.Done() + return + } + } + }() } - // Update our state. - s.ourObservation = v - s.txHash = k.TxHash.Bytes() - s.source = v.GetEmitterChain().String() - s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs - s.signatures[p.ourAddr] = signature - s.ourObs = ourObs - s.ourMsg = msg - - // Fast path for our own signature. - if !s.submitted { - start := time.Now() - p.checkForQuorum(ourObs, s, s.gs, hash) - timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + wg.Wait() + + if int(errCtr.Load()) > b.N/3 { + b.Error("More than 1/3 of GetSignedVAABytes failed.") } }