Skip to content

Commit

Permalink
Allow test runner to serve PoET proofs during distributed post test (#…
Browse files Browse the repository at this point in the history
…6456)

## Motivation

Sometimes the system test `TestPostMalfeasanceProof` fails because dependencies for the created ATX cannot be fetched by the receiver of it's gossip. This adds a fetcher that allows another peer to request that data if necessary.
  • Loading branch information
fasmat committed Nov 15, 2024
1 parent ea3286f commit 046279e
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 46 deletions.
6 changes: 4 additions & 2 deletions fetch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ func (h *handler) doHandleHashReq(ctx context.Context, data []byte, hint datasto
h.logger.Debug("remote peer requested nonexistent hash",
log.ZContext(ctx),
zap.Stringer("hash", r.Hash),
zap.String("hint", string(r.Hint)))
zap.String("hint", string(r.Hint)),
)
hashMissing.WithLabelValues(string(r.Hint)).Add(1)
continue
} else if len(blob.Bytes) == 0 {
Expand All @@ -302,7 +303,8 @@ func (h *handler) doHandleHashReq(ctx context.Context, data []byte, hint datasto
h.logger.Debug("responded to hash request",
log.ZContext(ctx),
zap.Stringer("hash", r.Hash),
zap.Int("dataSize", len(blob.Bytes)))
zap.Int("dataSize", len(blob.Bytes)),
)
}
// add response to batch
m := ResponseMessage{
Expand Down
6 changes: 6 additions & 0 deletions malfeasance/wire/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ type InvalidPostIndexProof struct {
InvalidIdx uint32
}

func (p *InvalidPostIndexProof) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddObject("atx", &p.Atx)
encoder.AddUint32("invalid_index", p.InvalidIdx)
return nil
}

type BallotProofMsg struct {
InnerMsg types.BallotMetadata

Expand Down
54 changes: 24 additions & 30 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,24 @@ func (app *App) initServices(ctx context.Context) error {
return nil
})

fetcherWrapped := &layerFetcher{}
proposalsStore := store.New(
store.WithEvictedLayer(app.clock.CurrentLayer()),
store.WithLogger(app.addLogger(ProposalStoreLogger, lg).Zap()),
store.WithCapacity(app.Config.Tortoise.Zdist+1),
)

flog := app.addLogger(Fetcher, lg)
fetcher, err := fetch.NewFetch(app.cachedDB, proposalsStore, app.host,
fetch.WithContext(ctx),
fetch.WithConfig(app.Config.FETCH),
fetch.WithLogger(flog.Zap()),
)
if err != nil {
return fmt.Errorf("create fetcher: %w", err)
}
app.eg.Go(func() error {
return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher)
})

atxHandler := activation.NewHandler(
app.host.ID(),
Expand All @@ -744,7 +761,7 @@ func (app *App) initServices(ctx context.Context) error {
app.edVerifier,
app.clock,
app.host,
fetcherWrapped,
fetcher,
goldenATXID,
validator,
beaconProtocol,
Expand All @@ -768,8 +785,9 @@ func (app *App) initServices(ctx context.Context) error {
)
}

blockHandler := blocks.NewHandler(fetcherWrapped, app.db, trtl, msh,
blocks.WithLogger(app.addLogger(BlockHandlerLogger, lg).Zap()))
blockHandler := blocks.NewHandler(fetcher, app.db, trtl, msh,
blocks.WithLogger(app.addLogger(BlockHandlerLogger, lg).Zap()),
)

app.txHandler = txs.NewTxHandler(
app.conState,
Expand Down Expand Up @@ -819,26 +837,6 @@ func (app *App) initServices(ctx context.Context) error {
app.certifier.Register(sig)
}

proposalsStore := store.New(
store.WithEvictedLayer(app.clock.CurrentLayer()),
store.WithLogger(app.addLogger(ProposalStoreLogger, lg).Zap()),
store.WithCapacity(app.Config.Tortoise.Zdist+1),
)

flog := app.addLogger(Fetcher, lg)
fetcher, err := fetch.NewFetch(app.cachedDB, proposalsStore, app.host,
fetch.WithContext(ctx),
fetch.WithConfig(app.Config.FETCH),
fetch.WithLogger(flog.Zap()),
)
if err != nil {
return fmt.Errorf("create fetcher: %w", err)
}
fetcherWrapped.Fetcher = fetcher
app.eg.Go(func() error {
return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher)
})

patrol := layerpatrol.New()
syncerConf := app.Config.Sync
syncerConf.HareDelayLayers = app.Config.Tortoise.Zdist
Expand Down Expand Up @@ -954,7 +952,7 @@ func (app *App) initServices(ctx context.Context) error {
propHare,
app.edVerifier,
app.host,
fetcherWrapped,
fetcher,
beaconProtocol,
msh,
trtl,
Expand All @@ -977,7 +975,7 @@ func (app *App) initServices(ctx context.Context) error {
proposalsStore,
executor,
msh,
fetcherWrapped,
fetcher,
app.certifier,
patrol,
blocks.WithConfig(blocks.Config{
Expand Down Expand Up @@ -2283,10 +2281,6 @@ func (app *App) Host() *p2p.Host {
return app.host
}

type layerFetcher struct {
system.Fetcher
}

func decodeLoggerLevel(cfg *config.Config, name string) (zap.AtomicLevel, error) {
lvl := zap.NewAtomicLevel()
loggers := map[string]string{}
Expand Down
60 changes: 46 additions & 14 deletions systest/tests/distributed_post_verification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch"
mwire "github.com/spacemeshos/go-spacemesh/malfeasance/wire"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/handshake"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/proposals/store"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql/localsql"
"github.com/spacemeshos/go-spacemesh/sql/localsql/nipost"
Expand Down Expand Up @@ -95,10 +97,50 @@ func TestPostMalfeasanceProof(t *testing.T) {
require.NoError(t, err)
logger.Info("p2p host created", zap.Stringer("id", host.ID()))
host.Register(pubsub.AtxProtocol, func(context.Context, peer.ID, []byte) error { return nil })

require.NoError(t, host.Start())
t.Cleanup(func() { assert.NoError(t, host.Stop()) })

db := statesql.InMemoryTest(t)
cdb := datastore.NewCachedDB(db, zap.NewNop())
t.Cleanup(func() { assert.NoError(t, cdb.Close()) })

clock, err := timesync.NewClock(
timesync.WithLayerDuration(cfg.LayerDuration),
timesync.WithTickInterval(1*time.Second),
timesync.WithGenesisTime(cl.Genesis()),
timesync.WithLogger(logger.Named("clock")),
)
require.NoError(t, err)
t.Cleanup(clock.Close)

proposalsStore := store.New(
store.WithEvictedLayer(clock.CurrentLayer()),
store.WithLogger(logger.Named("proposals-store")),
store.WithCapacity(cfg.Tortoise.Zdist+1),
)

fetcher, err := fetch.NewFetch(cdb, proposalsStore, host,
fetch.WithContext(ctx),
fetch.WithConfig(cfg.FETCH),
fetch.WithLogger(logger.Named("fetcher")),
)
require.NoError(t, err)

fetcher.SetValidators(
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
fetch.ValidatorFunc(func(context.Context, types.Hash32, peer.ID, []byte) error { return nil }),
)

require.NoError(t, fetcher.Start())
t.Cleanup(fetcher.Stop)

ctrl := gomock.NewController(t)
syncer := activation.NewMocksyncer(ctrl)
syncer.EXPECT().RegisterForATXSynced().DoAndReturn(func() <-chan struct{} {
Expand All @@ -108,9 +150,6 @@ func TestPostMalfeasanceProof(t *testing.T) {
}).AnyTimes()

// 1. Initialize
db := statesql.InMemoryTest(t)
cdb := datastore.NewCachedDB(db, zap.NewNop())
t.Cleanup(func() { assert.NoError(t, cdb.Close()) })
postSetupMgr, err := activation.NewPostSetupManager(
cfg.POST,
logger.Named("post"),
Expand All @@ -135,15 +174,6 @@ func TestPostMalfeasanceProof(t *testing.T) {
t.Cleanup(func() { assert.NoError(t, postSupervisor.Stop(false)) })

// 2. create ATX with invalid POST labels
clock, err := timesync.NewClock(
timesync.WithLayerDuration(cfg.LayerDuration),
timesync.WithTickInterval(1*time.Second),
timesync.WithGenesisTime(cl.Genesis()),
timesync.WithLogger(logger.Named("clock")),
)
require.NoError(t, err)
t.Cleanup(clock.Close)

grpcPostService := grpcserver.NewPostService(
logger.Named("grpc-post-service"),
grpcserver.PostServiceQueryInterval(500*time.Millisecond),
Expand Down Expand Up @@ -248,6 +278,8 @@ func TestPostMalfeasanceProof(t *testing.T) {
Pow: challenge.InitialPost.Pow,
},
}
err = nipost.AddChallenge(localDb, signer.NodeID(), nipostChallenge)
require.NoError(t, err)

nipost, err := nipostBuilder.BuildNIPost(ctx, signer, challenge.Hash(), nipostChallenge)
require.NoError(t, err)
Expand Down Expand Up @@ -326,7 +358,7 @@ func TestPostMalfeasanceProof(t *testing.T) {
require.NoError(t, codec.Decode(malf.Proof.Proof, &proof))
require.Equal(t, mwire.InvalidPostIndex, proof.Proof.Type)
invalidPostProof := proof.Proof.Data.(*mwire.InvalidPostIndexProof)
logger.Sugar().Infow("malfeasance post proof", "proof", invalidPostProof)
logger.Info("malfeasance post proof", zap.Object("proof", invalidPostProof))
invalidAtx := invalidPostProof.Atx
require.Equal(t, atx.PublishEpoch, invalidAtx.PublishEpoch)
require.Equal(t, atx.SmesherID, invalidAtx.SmesherID)
Expand Down

0 comments on commit 046279e

Please sign in to comment.