diff --git a/systest/testcontext/context.go b/systest/testcontext/context.go index 8eb41fb0aff..6e92112bef8 100644 --- a/systest/testcontext/context.go +++ b/systest/testcontext/context.go @@ -250,7 +250,8 @@ func updateContext(ctx *Context) error { keep, err := strconv.ParseBool(keepval) if err != nil { ctx.Log.Panicw("invalid state. keep label should be parsable as a boolean", - "keepval", keepval) + "keepval", keepval, + ) } ctx.Keep = ctx.Keep || keep @@ -261,7 +262,8 @@ func updateContext(ctx *Context) error { psize, err := strconv.Atoi(psizeval) if err != nil { ctx.Log.Panicw("invalid state. poet size label should be parsable as an integer", - "psizeval", psizeval) + "psizeval", psizeval, + ) } ctx.PoetSize = psize return nil diff --git a/systest/tests/checkpoint_test.go b/systest/tests/checkpoint_test.go index 239d8f76469..dba1d534df4 100644 --- a/systest/tests/checkpoint_test.go +++ b/systest/tests/checkpoint_test.go @@ -39,7 +39,7 @@ func TestCheckpoint(t *testing.T) { tctx := testcontext.New(t) addedLater := 2 - size := min(tctx.ClusterSize, 30) + size := min(tctx.ClusterSize, 20) oldSize := size - addedLater if tctx.ClusterSize > oldSize { tctx.Log.Info("cluster size changed to ", oldSize) @@ -63,14 +63,15 @@ func TestCheckpoint(t *testing.T) { require.EqualValues(t, 4, layersPerEpoch, "checkpoint layer require tuning as layersPerEpoch is changed") layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) - eg, ctx := errgroup.WithContext(tctx) first := layersPerEpoch * 2 stop := first + 2 receiver := types.GenerateAddress([]byte{11, 1, 1}) tctx.Log.Infow("sending transactions", "from", first, "to", stop-1) - require.NoError(t, sendTransactions(ctx, eg, tctx.Log, cl, first, stop, receiver, 1, 100)) - require.NoError(t, eg.Wait()) + deadline := cl.Genesis().Add(time.Duration(stop+1) * layerDuration) + ctx, cancel := context.WithDeadline(tctx, deadline) + defer cancel() + require.NoError(t, sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, 1, 100)) require.NoError(t, waitLayer(tctx, cl.Client(0), snapshotLayer)) tctx.Log.Debugw("getting account balances") @@ -100,7 +101,8 @@ func TestCheckpoint(t *testing.T) { diffs = append(diffs, cl.Client(i).Name) tctx.Log.Errorw("diff checkpoint data", fmt.Sprintf("reference %v", cl.Client(0).Name), string(checkpoints[0]), - fmt.Sprintf("client %v", cl.Client(i).Name), string(checkpoints[i])) + fmt.Sprintf("client %v", cl.Client(i).Name), string(checkpoints[i]), + ) } } require.Empty(t, diffs) diff --git a/systest/tests/common.go b/systest/tests/common.go index 86cd7e10921..46121ac1bee 100644 --- a/systest/tests/common.go +++ b/systest/tests/common.go @@ -32,65 +32,55 @@ var retryBackoff = 10 * time.Second func sendTransactions( ctx context.Context, - eg *errgroup.Group, - logger *zap.SugaredLogger, + logger *zap.Logger, cl *cluster.Cluster, first, stop uint32, receiver types.Address, batch, amount int, ) error { + eg, ctx := errgroup.WithContext(ctx) for i := range cl.Accounts() { client := cl.Client(i % cl.Total()) nonce, err := getNonce(ctx, client, cl.Address(i)) if err != nil { - return fmt.Errorf("get nonce failed (%s:%s): %w", client.Name, cl.Address(i), err) + return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i), err) } - watchLayers(ctx, eg, client, logger.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number == stop { + watchLayers(ctx, eg, client, logger, func(layer *pb.LayerStreamResponse) (bool, error) { + if layer.Layer.Number.Number >= stop { return false, nil } - if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPROVED || - layer.Layer.Number.Number < first { + if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPLIED || layer.Layer.Number.Number < first { return true, nil } - // give some time for a previous layer to be applied - // TODO(dshulyak) introduce api that simply subscribes to internal clock - // and outputs events when the tick for the layer is available - time.Sleep(200 * time.Millisecond) if nonce == 0 { - logger.Infow("address needs to be spawned", "account", i) + logger.Info("address needs to be spawned", zap.Stringer("address", cl.Address(i))) if err := submitSpawn(ctx, cl, i, client); err != nil { return false, fmt.Errorf("failed to spawn %w", err) } nonce++ return true, nil } - logger.Debugw("submitting transactions", - "layer", layer.Layer.Number.Number, - "client", client.Name, - "account", i, - "nonce", nonce, - "batch", batch, + logger.Debug("submitting transactions", + zap.Uint32("layer", layer.Layer.Number.Number), + zap.String("client", client.Name), + zap.Stringer("address", cl.Address(i)), + zap.Uint64("nonce", nonce), + zap.Int("batch", batch), ) - for j := 0; j < batch; j++ { + for j := range batch { // in case spawn isn't executed on this particular client retries := 3 spendClient := client - for k := 0; k < retries; k++ { + for k := range retries { err = submitSpend(ctx, cl, i, receiver, uint64(amount), nonce+uint64(j), spendClient) if err == nil { break } - logger.Warnw( - "failed to spend", - "client", - spendClient.Name, - "account", - i, - "nonce", - nonce+uint64(j), - "err", - err.Error(), + logger.Warn("failed to spend", + zap.String("client", spendClient.Name), + zap.Stringer("address", cl.Address(i)), + zap.Uint64("nonce", nonce+uint64(j)), + zap.Error(err), ) spendClient = cl.Client((i + k + 1) % cl.Total()) } @@ -102,7 +92,7 @@ func sendTransactions( return true, nil }) } - return nil + return eg.Wait() } func submitTransaction(ctx context.Context, tx []byte, node *cluster.NodeClient) ([]byte, error) { @@ -522,13 +512,8 @@ func submitSpend( ) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - _, err := submitTransaction(ctx, - wallet.Spend( - cluster.Private(account), receiver, amount, - nonce, - sdk.WithGenesisID(cluster.GenesisID()), - ), - client) + tx := wallet.Spend(cluster.Private(account), receiver, amount, nonce, sdk.WithGenesisID(cluster.GenesisID())) + _, err := submitTransaction(ctx, tx, client) return err } diff --git a/systest/tests/partition_test.go b/systest/tests/partition_test.go index 99cd3184af3..2f5ccf7c552 100644 --- a/systest/tests/partition_test.go +++ b/systest/tests/partition_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/stretchr/testify/require" @@ -16,8 +17,8 @@ import ( "github.com/spacemeshos/go-spacemesh/systest/testcontext" ) -func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, pct int, wait uint32) { - require.Greater(t, cl.Bootnodes(), 1) +func testPartition(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster, pct int, wait uint32) { + require.Greater(tb, cl.Bootnodes(), 1) layersPerEpoch := uint32(testcontext.LayersPerEpoch.Get(tctx.Parameters)) var ( @@ -60,9 +61,16 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, // start sending transactions tctx.Log.Debug("sending transactions...") - eg2, ctx2 := errgroup.WithContext(tctx) receiver := types.GenerateAddress([]byte{11, 1, 1}) - require.NoError(t, sendTransactions(ctx2, eg2, tctx.Log, cl, first, stop, receiver, 10, 100)) + + layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) + deadline := cl.Genesis().Add(time.Duration(stop) * layerDuration) + ctx2, cancel := context.WithDeadline(tctx, deadline) + eg2, ctx2 := errgroup.WithContext(ctx2) + defer cancel() + eg2.Go(func() error { + return sendTransactions(ctx2, tctx.Log.Desugar(), cl, first, stop, receiver, 10, 100) + }) type stateUpdate struct { layer uint32 @@ -80,7 +88,7 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, return stateHashStream(ctx, node, tctx.Log.Desugar(), func(state *pb.GlobalStateStreamResponse) (bool, error) { data := state.Datum.Datum - require.IsType(t, &pb.GlobalStateData_GlobalState{}, data) + require.IsType(tb, &pb.GlobalStateData_GlobalState{}, data) resp := data.(*pb.GlobalStateData_GlobalState) layer := resp.GlobalState.Layer.Number @@ -92,7 +100,8 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, tctx.Log.Debugw("state hash collected", "client", node.Name, "layer", layer, - "state", stateHash.ShortString()) + "state", stateHash.ShortString(), + ) stateCh <- &stateUpdate{ layer: layer, hash: stateHash, @@ -154,7 +163,8 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, "ref_client", cl.Client(0).Name, "layer", layer, "client_hash", clientState[layer], - "ref_hash", refState[layer]) + "ref_hash", refState[layer], + ) agree = false break } @@ -162,13 +172,14 @@ func testPartition(t *testing.T, tctx *testcontext.Context, cl *cluster.Cluster, if agree { tctx.Log.Debugw("client agreed with ref client on all layers", "client", cl.Client(i).Name, - "ref_client", cl.Client(0).Name) + "ref_client", cl.Client(0).Name, + ) } pass = pass && agree } - require.NoError(t, finalErr) - require.True(t, pass) - eg2.Wait() + require.NoError(tb, finalErr) + require.True(tb, pass) + require.NoError(tb, eg2.Wait()) } // TestPartition_30_70 tests the network partitioning with 30% and 70% of the nodes in each partition. @@ -176,9 +187,9 @@ func TestPartition_30_70(t *testing.T) { t.Parallel() tctx := testcontext.New(t) - if tctx.ClusterSize > 30 { - tctx.Log.Info("cluster size changed to 30") - tctx.ClusterSize = 30 + if tctx.ClusterSize > 20 { + tctx.Log.Info("cluster size changed to 20") + tctx.ClusterSize = 20 } cl, err := cluster.ReuseWait(tctx, cluster.WithKeys(tctx.ClusterSize)) require.NoError(t, err) @@ -191,9 +202,9 @@ func TestPartition_50_50(t *testing.T) { t.Parallel() tctx := testcontext.New(t) - if tctx.ClusterSize > 30 { - tctx.Log.Info("cluster size changed to 30") - tctx.ClusterSize = 30 + if tctx.ClusterSize > 20 { + tctx.Log.Info("cluster size changed to 20") + tctx.ClusterSize = 20 } cl, err := cluster.ReuseWait(tctx, cluster.WithKeys(tctx.ClusterSize)) require.NoError(t, err) diff --git a/systest/tests/transactions_test.go b/systest/tests/transactions_test.go index 4b722bba013..e4982382776 100644 --- a/systest/tests/transactions_test.go +++ b/systest/tests/transactions_test.go @@ -1,8 +1,10 @@ package tests import ( + "context" "encoding/hex" "testing" + "time" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/stretchr/testify/require" @@ -21,10 +23,10 @@ func testTransactions( ) { var ( // start sending transactions after two layers or after genesis - first = max(currentLayer(tctx, tb, cl.Client(0))+2, 8) - stopSending = first + sendFor - batch = 10 - amount = 100 + first = max(currentLayer(tctx, tb, cl.Client(0))+2, 8) + stop = first + sendFor + batch = 10 + amount = 100 // each account creates spawn transaction in the first layer // plus batch number of spend transactions in every layer after that @@ -32,7 +34,7 @@ func testTransactions( ) tctx.Log.Debugw("running transactions test", "from", first, - "stop sending", stopSending, + "stop sending", stop, "expected transactions", expectedCount, ) receiver := types.GenerateAddress([]byte{11, 1, 1}) @@ -44,16 +46,19 @@ func testTransactions( require.NoError(tb, err) before := response.AccountWrapper.StateCurrent.Balance - eg, ctx := errgroup.WithContext(tctx) - require.NoError( - tb, - sendTransactions(ctx, eg, tctx.Log, cl, first, stopSending, receiver, batch, amount), - ) + layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) + deadline := cl.Genesis().Add(time.Duration(stop) * layerDuration) + ctx, cancel := context.WithDeadline(tctx, deadline) + defer cancel() + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, batch, amount) + }) txs := make([][]*pb.Transaction, cl.Total()) for i := range cl.Total() { client := cl.Client(i) - watchTransactionResults(tctx.Context, eg, client, tctx.Log.Desugar(), + watchTransactionResults(tctx, eg, client, tctx.Log.Desugar(), func(rst *pb.TransactionResult) (bool, error) { txs[i] = append(txs[i], rst.Tx) count := len(txs[i])