Debug unstable system tests (#6486)
## Motivation

This PR attempts to improve unstable (flaky) system tests.
fasmat committed Nov 25, 2024
1 parent fc05e13 commit d475044
Showing 5 changed files with 78 additions and 73 deletions.
6 changes: 4 additions & 2 deletions systest/testcontext/context.go
@@ -250,7 +250,8 @@ func updateContext(ctx *Context) error {
keep, err := strconv.ParseBool(keepval)
if err != nil {
ctx.Log.Panicw("invalid state. keep label should be parsable as a boolean",
"keepval", keepval)
"keepval", keepval,
)
}
ctx.Keep = ctx.Keep || keep

@@ -261,7 +262,8 @@ func updateContext(ctx *Context) error {
psize, err := strconv.Atoi(psizeval)
if err != nil {
ctx.Log.Panicw("invalid state. poet size label should be parsable as an integer",
"psizeval", psizeval)
"psizeval", psizeval,
)
}
ctx.PoetSize = psize
return nil
12 changes: 7 additions & 5 deletions systest/tests/checkpoint_test.go
@@ -39,7 +39,7 @@ func TestCheckpoint(t *testing.T) {

tctx := testcontext.New(t)
addedLater := 2
size := min(tctx.ClusterSize, 30)
size := min(tctx.ClusterSize, 20)
oldSize := size - addedLater
if tctx.ClusterSize > oldSize {
tctx.Log.Info("cluster size changed to ", oldSize)
@@ -63,14 +63,15 @@ func TestCheckpoint(t *testing.T) {
require.EqualValues(t, 4, layersPerEpoch, "checkpoint layer require tuning as layersPerEpoch is changed")
layerDuration := testcontext.LayerDuration.Get(tctx.Parameters)

eg, ctx := errgroup.WithContext(tctx)
first := layersPerEpoch * 2
stop := first + 2
receiver := types.GenerateAddress([]byte{11, 1, 1})
tctx.Log.Infow("sending transactions", "from", first, "to", stop-1)
require.NoError(t, sendTransactions(ctx, eg, tctx.Log, cl, first, stop, receiver, 1, 100))
require.NoError(t, eg.Wait())

deadline := cl.Genesis().Add(time.Duration(stop+1) * layerDuration)
ctx, cancel := context.WithDeadline(tctx, deadline)
defer cancel()
require.NoError(t, sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, 1, 100))
require.NoError(t, waitLayer(tctx, cl.Client(0), snapshotLayer))

tctx.Log.Debugw("getting account balances")
@@ -100,7 +101,8 @@ func TestCheckpoint(t *testing.T) {
diffs = append(diffs, cl.Client(i).Name)
tctx.Log.Errorw("diff checkpoint data",
fmt.Sprintf("reference %v", cl.Client(0).Name), string(checkpoints[0]),
fmt.Sprintf("client %v", cl.Client(i).Name), string(checkpoints[i]))
fmt.Sprintf("client %v", cl.Client(i).Name), string(checkpoints[i]),
)
}
}
require.Empty(t, diffs)
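A notable pattern in the checkpoint test change: instead of waiting on a bare errgroup, transaction sending is now bounded by a context deadline derived from the cluster genesis time plus the number of layers the test covers. A minimal sketch of that pattern, with `genesis`, `layerDuration` and `stop` as stand-in values (the real test reads them from `cl.Genesis()` and `testcontext.LayerDuration.Get(tctx.Parameters)`):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Stand-in values; the test derives these from the cluster and its parameters.
	genesis := time.Now().Add(-3 * time.Minute)
	layerDuration := 30 * time.Second
	stop := uint32(10) // last layer up to which transactions are sent

	// Deadline: genesis + (stop+1) layers, so the context expires shortly
	// after the last relevant layer has elapsed instead of hanging forever.
	deadline := genesis.Add(time.Duration(stop+1) * layerDuration)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	defer cancel()

	select {
	case <-ctx.Done():
		fmt.Println("context expired:", ctx.Err())
	case <-time.After(time.Second): // placeholder for the actual test work
		fmt.Println("work finished before the deadline")
	}
}
```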
61 changes: 23 additions & 38 deletions systest/tests/common.go
@@ -32,65 +32,55 @@ var retryBackoff = 10 * time.Second

func sendTransactions(
ctx context.Context,
eg *errgroup.Group,
logger *zap.SugaredLogger,
logger *zap.Logger,
cl *cluster.Cluster,
first, stop uint32,
receiver types.Address,
batch, amount int,
) error {
eg, ctx := errgroup.WithContext(ctx)
for i := range cl.Accounts() {
client := cl.Client(i % cl.Total())
nonce, err := getNonce(ctx, client, cl.Address(i))
if err != nil {
return fmt.Errorf("get nonce failed (%s:%s): %w", client.Name, cl.Address(i), err)
return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i), err)
}
watchLayers(ctx, eg, client, logger.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) {
if layer.Layer.Number.Number == stop {
watchLayers(ctx, eg, client, logger, func(layer *pb.LayerStreamResponse) (bool, error) {
if layer.Layer.Number.Number >= stop {
return false, nil
}
if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPROVED ||
layer.Layer.Number.Number < first {
if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPLIED || layer.Layer.Number.Number < first {
return true, nil
}
// give some time for a previous layer to be applied
// TODO(dshulyak) introduce api that simply subscribes to internal clock
// and outputs events when the tick for the layer is available
time.Sleep(200 * time.Millisecond)
if nonce == 0 {
logger.Infow("address needs to be spawned", "account", i)
logger.Info("address needs to be spawned", zap.Stringer("address", cl.Address(i)))
if err := submitSpawn(ctx, cl, i, client); err != nil {
return false, fmt.Errorf("failed to spawn %w", err)
}
nonce++
return true, nil
}
logger.Debugw("submitting transactions",
"layer", layer.Layer.Number.Number,
"client", client.Name,
"account", i,
"nonce", nonce,
"batch", batch,
logger.Debug("submitting transactions",
zap.Uint32("layer", layer.Layer.Number.Number),
zap.String("client", client.Name),
zap.Stringer("address", cl.Address(i)),
zap.Uint64("nonce", nonce),
zap.Int("batch", batch),
)
for j := 0; j < batch; j++ {
for j := range batch {
// in case spawn isn't executed on this particular client
retries := 3
spendClient := client
for k := 0; k < retries; k++ {
for k := range retries {
err = submitSpend(ctx, cl, i, receiver, uint64(amount), nonce+uint64(j), spendClient)
if err == nil {
break
}
logger.Warnw(
"failed to spend",
"client",
spendClient.Name,
"account",
i,
"nonce",
nonce+uint64(j),
"err",
err.Error(),
logger.Warn("failed to spend",
zap.String("client", spendClient.Name),
zap.Stringer("address", cl.Address(i)),
zap.Uint64("nonce", nonce+uint64(j)),
zap.Error(err),
)
spendClient = cl.Client((i + k + 1) % cl.Total())
}
@@ -102,7 +92,7 @@ func sendTransactions(
return true, nil
})
}
return nil
return eg.Wait()
}

func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient) ([]byte, error) {
@@ -522,13 +512,8 @@ func submitSpend(
) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err := submitTransaction(ctx,
wallet.Spend(
cluster.Private(account), receiver, amount,
nonce,
sdk.WithGenesisID(cluster.GenesisID()),
),
client)
tx := wallet.Spend(cluster.Private(account), receiver, amount, nonce, sdk.WithGenesisID(cluster.GenesisID()))
_, err := submitTransaction(ctx, tx, client)
return err
}

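After this change `sendTransactions` owns its concurrency: it creates the errgroup internally and returns only once `eg.Wait()` has finished, instead of taking an `*errgroup.Group` from the caller and returning `nil` immediately. A rough sketch of that shape, with placeholder names (`sendAll`, the simulated send) rather than the real cluster API:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// sendAll fans out one worker per account, waits for all of them, and
// surfaces the first error — callers no longer manage an errgroup themselves.
func sendAll(ctx context.Context, accounts []string) error {
	eg, ctx := errgroup.WithContext(ctx)
	for _, acct := range accounts {
		eg.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(10 * time.Millisecond): // placeholder for submitting a transaction
				fmt.Println("sent from", acct)
				return nil
			}
		})
	}
	return eg.Wait()
}

func main() {
	if err := sendAll(context.Background(), []string{"a", "b", "c"}); err != nil {
		fmt.Println("error:", err)
	}
}
```

With this shape the caller either invokes the helper directly and checks its error, or wraps it in its own `eg.Go(...)`, as the partition and transactions tests below do.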
45 changes: 28 additions & 17 deletions systest/tests/partition_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/stretchr/testify/require"
@@ -16,8 +17,8 @@ import (
"github.com/spacemeshos/go-spacemesh/systest/testcontext"
)

func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, pct int, wait uint32) {
require.Greater(t, cl.Bootnodes(), 1)
func testPartition(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster, pct int, wait uint32) {
require.Greater(tb, cl.Bootnodes(), 1)
layersPerEpoch := uint32(testcontext.LayersPerEpoch.Get(tctx.Parameters))

var (
@@ -60,9 +61,16 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,

// start sending transactions
tctx.Log.Debug("sending transactions...")
eg2, ctx2 := errgroup.WithContext(tctx)
receiver := types.GenerateAddress([]byte{11, 1, 1})
require.NoError(t, sendTransactions(ctx2, eg2, tctx.Log, cl, first, stop, receiver, 10, 100))

layerDuration := testcontext.LayerDuration.Get(tctx.Parameters)
deadline := cl.Genesis().Add(time.Duration(stop) * layerDuration)
ctx2, cancel := context.WithDeadline(tctx, deadline)
eg2, ctx2 := errgroup.WithContext(ctx2)
defer cancel()
eg2.Go(func() error {
return sendTransactions(ctx2, tctx.Log.Desugar(), cl, first, stop, receiver, 10, 100)
})

type stateUpdate struct {
layer uint32
@@ -80,7 +88,7 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,
return stateHashStream(ctx, node, tctx.Log.Desugar(),
func(state *pb.GlobalStateStreamResponse) (bool, error) {
data := state.Datum.Datum
require.IsType(t, &pb.GlobalStateData_GlobalState{}, data)
require.IsType(tb, &pb.GlobalStateData_GlobalState{}, data)

resp := data.(*pb.GlobalStateData_GlobalState)
layer := resp.GlobalState.Layer.Number
@@ -92,7 +100,8 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,
tctx.Log.Debugw("state hash collected",
"client", node.Name,
"layer", layer,
"state", stateHash.ShortString())
"state", stateHash.ShortString(),
)
stateCh <- &stateUpdate{
layer: layer,
hash: stateHash,
@@ -154,31 +163,33 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster,
"ref_client", cl.Client(0).Name,
"layer", layer,
"client_hash", clientState[layer],
"ref_hash", refState[layer])
"ref_hash", refState[layer],
)
agree = false
break
}
}
if agree {
tctx.Log.Debugw("client agreed with ref client on all layers",
"client", cl.Client(i).Name,
"ref_client", cl.Client(0).Name)
"ref_client", cl.Client(0).Name,
)
}
pass = pass && agree
}
require.NoError(t, finalErr)
require.True(t, pass)
eg2.Wait()
require.NoError(tb, finalErr)
require.True(tb, pass)
require.NoError(tb, eg2.Wait())
}

// TestPartition_30_70 tests the network partitioning with 30% and 70% of the nodes in each partition.
func TestPartition_30_70(t *testing.T) {
t.Parallel()

tctx := testcontext.New(t)
if tctx.ClusterSize > 30 {
tctx.Log.Info("cluster size changed to 30")
tctx.ClusterSize = 30
if tctx.ClusterSize > 20 {
tctx.Log.Info("cluster size changed to 20")
tctx.ClusterSize = 20
}
cl, err := cluster.ReuseWait(tctx, cluster.WithKeys(tctx.ClusterSize))
require.NoError(t, err)
@@ -191,9 +202,9 @@ func TestPartition_50_50(t *testing.T) {
t.Parallel()

tctx := testcontext.New(t)
if tctx.ClusterSize > 30 {
tctx.Log.Info("cluster size changed to 30")
tctx.ClusterSize = 30
if tctx.ClusterSize > 20 {
tctx.Log.Info("cluster size changed to 20")
tctx.ClusterSize = 20
}
cl, err := cluster.ReuseWait(tctx, cluster.WithKeys(tctx.ClusterSize))
require.NoError(t, err)
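The helper signature also switched from `*testing.T` to the `testing.TB` interface, so it can serve both tests and benchmarks, and the assertions use `tb` accordingly. A small illustration of that pattern with made-up names (not from this repository):

```go
package example

import "testing"

// requirePositive works for both tests and benchmarks because it only
// depends on the testing.TB interface, not on *testing.T.
func requirePositive(tb testing.TB, name string, v int) {
	tb.Helper()
	if v <= 0 {
		tb.Fatalf("%s must be positive, got %d", name, v)
	}
}

func TestHelper(t *testing.T) {
	requirePositive(t, "cluster size", 20)
}

func BenchmarkHelper(b *testing.B) {
	for i := 0; i < b.N; i++ {
		requirePositive(b, "cluster size", 20)
	}
}
```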
27 changes: 16 additions & 11 deletions systest/tests/transactions_test.go
@@ -1,8 +1,10 @@
package tests

import (
"context"
"encoding/hex"
"testing"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/stretchr/testify/require"
@@ -21,18 +23,18 @@ func testTransactions(
) {
var (
// start sending transactions after two layers or after genesis
first = max(currentLayer(tctx, tb, cl.Client(0))+2, 8)
stopSending = first + sendFor
batch = 10
amount = 100
first = max(currentLayer(tctx, tb, cl.Client(0))+2, 8)
stop = first + sendFor
batch = 10
amount = 100

// each account creates spawn transaction in the first layer
// plus batch number of spend transactions in every layer after that
expectedCount = cl.Accounts() * (1 + int(sendFor-1)*batch)
)
tctx.Log.Debugw("running transactions test",
"from", first,
"stop sending", stopSending,
"stop sending", stop,
"expected transactions", expectedCount,
)
receiver := types.GenerateAddress([]byte{11, 1, 1})
@@ -44,16 +46,19 @@ func testTransactions(
require.NoError(tb, err)
before := response.AccountWrapper.StateCurrent.Balance

eg, ctx := errgroup.WithContext(tctx)
require.NoError(
tb,
sendTransactions(ctx, eg, tctx.Log, cl, first, stopSending, receiver, batch, amount),
)
layerDuration := testcontext.LayerDuration.Get(tctx.Parameters)
deadline := cl.Genesis().Add(time.Duration(stop) * layerDuration)
ctx, cancel := context.WithDeadline(tctx, deadline)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, batch, amount)
})
txs := make([][]*pb.Transaction, cl.Total())

for i := range cl.Total() {
client := cl.Client(i)
watchTransactionResults(tctx.Context, eg, client, tctx.Log.Desugar(),
watchTransactionResults(tctx, eg, client, tctx.Log.Desugar(),
func(rst *pb.TransactionResult) (bool, error) {
txs[i] = append(txs[i], rst.Tx)
count := len(txs[i])
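Across these files the helpers now receive a structured `*zap.Logger` (obtained via `tctx.Log.Desugar()`) and log with typed fields (`zap.Uint32`, `zap.Stringer`, `zap.Error`, ...) instead of the sugared key-value pairs. A minimal sketch of the two styles side by side, using a development logger as an assumption for illustration:

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// Sugared style (before): loosely typed key-value pairs.
	sugar := logger.Sugar()
	sugar.Debugw("submitting transactions", "layer", 12, "nonce", uint64(3))

	// Structured style (after): typed fields on the plain logger,
	// reached from a sugared logger via Desugar().
	plain := sugar.Desugar()
	plain.Debug("submitting transactions",
		zap.Uint32("layer", 12),
		zap.Uint64("nonce", 3),
	)
}
```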
