Skip to content

Commit

Permalink
Merge pull request #338 from spacemeshos/improve-observability
Browse files Browse the repository at this point in the history
Add more logs and round members metric
  • Loading branch information
poszu authored Aug 8, 2023
2 parents 9fb74ef + 89ee249 commit beb3bdf
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 89 deletions.
24 changes: 10 additions & 14 deletions prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var persist persistFunc = func(context.Context, *merkle.Tree, *cache.Writer, uin
// Merkle proof using the challenge and the DAG.
func GenerateProof(
ctx context.Context,
leavesCounter prometheus.Counter,
treeCfg TreeConfig,
labelHashFunc func(data []byte) []byte,
merkleHashFunc merkle.HashFunc,
Expand All @@ -58,12 +59,13 @@ func GenerateProof(
}
defer treeCache.Close()

return generateProof(ctx, labelHashFunc, tree, treeCache, limit, 0, securityParam, persist)
return generateProof(ctx, leavesCounter, labelHashFunc, tree, treeCache, limit, 0, securityParam, persist)
}

// GenerateProofRecovery recovers proof generation, from a given 'nextLeafID' and for a given 'parkedNodes' snapshot.
func GenerateProofRecovery(
ctx context.Context,
leavesCounter prometheus.Counter,
treeCfg TreeConfig,
labelHashFunc func(data []byte) []byte,
merkleHashFunc merkle.HashFunc,
Expand All @@ -79,11 +81,12 @@ func GenerateProofRecovery(
}
defer treeCache.Close()

return generateProof(ctx, labelHashFunc, tree, treeCache, limit, nextLeafID, securityParam, persist)
return generateProof(ctx, leavesCounter, labelHashFunc, tree, treeCache, limit, nextLeafID, securityParam, persist)
}

// GenerateProofWithoutPersistency calls GenerateProof with disabled persistency functionality
// and potential soft/hard-shutdown recovery.
// Meant to be used for testing purposes only. It does not expose metrics either.
func GenerateProofWithoutPersistency(
ctx context.Context,
treeCfg TreeConfig,
Expand All @@ -92,7 +95,8 @@ func GenerateProofWithoutPersistency(
limit time.Time,
securityParam uint8,
) (uint64, *shared.MerkleProof, error) {
return GenerateProof(ctx, treeCfg, labelHashFunc, merkleHashFunc, limit, securityParam, persist)
leavesCounter := prometheus.NewCounter(prometheus.CounterOpts{})
return GenerateProof(ctx, leavesCounter, treeCfg, labelHashFunc, merkleHashFunc, limit, securityParam, persist)
}

func makeProofTree(treeCfg TreeConfig, merkleHashFunc merkle.HashFunc) (*merkle.Tree, *cache.Writer, error) {
Expand Down Expand Up @@ -205,12 +209,12 @@ func makeRecoveryProofTree(

func sequentialWork(
ctx context.Context,
leavesCounter prometheus.Counter,
labelHashFunc func(data []byte) []byte,
tree *merkle.Tree,
treeCache *cache.Writer,
end time.Time,
nextLeafID uint64,
securityParam uint8,
persist persistFunc,
) (uint64, error) {
var parkedNodes [][]byte
Expand All @@ -219,17 +223,8 @@ func sequentialWork(
finished := time.NewTimer(time.Until(end))
defer finished.Stop()

leavesCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "poet_leaves",
Help: "Number of generated leaves",
})
leavesCounter.Add(float64(nextLeafID))

if err := prometheus.Register(leavesCounter); err != nil {
logging.FromContext(ctx).Error("failed to register prometheus leaves counter", zap.Error(err))
}
defer prometheus.Unregister(leavesCounter)

for {
// Generate the next leaf.
parkedNodes = tree.GetParkedNodes(parkedNodes[:0])
Expand Down Expand Up @@ -264,6 +259,7 @@ func sequentialWork(

func generateProof(
ctx context.Context,
leavesCounter prometheus.Counter,
labelHashFunc func(data []byte) []byte,
tree *merkle.Tree,
treeCache *cache.Writer,
Expand All @@ -275,7 +271,7 @@ func generateProof(
logger := logging.FromContext(ctx)
logger.Info("generating proof", zap.Time("end", end), zap.Uint64("nextLeafID", nextLeafID))

leaves, err := sequentialWork(ctx, labelHashFunc, tree, treeCache, end, nextLeafID, securityParam, persist)
leaves, err := sequentialWork(ctx, leavesCounter, labelHashFunc, tree, treeCache, end, nextLeafID, persist)
if err != nil {
return 0, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func (s *Server) Start(ctx context.Context) error {
})

if err := s.svc.Start(ctx); err != nil {
return err
stop()
return fmt.Errorf("service stopped with error: %v (serverGroup: %v)", err, serverGroup.Wait())
}

rpcServer := rpc.NewServer(s.svc, proofsDb, s.cfg)
Expand All @@ -150,7 +151,7 @@ func (s *Server) Start(ctx context.Context) error {

// Start the gRPC server listening for HTTP/2 connections.
serverGroup.Go(func() error {
logger.Sugar().Infof("RPC server listening on %s", s.rpcListener.Addr())
logger.Sugar().Infof("GRPC server listening on %s", s.rpcListener.Addr())
return grpcServer.Serve(s.rpcListener)
})

Expand Down
2 changes: 2 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ func TestCannotSubmitMoreThanMaxRoundMembers(t *testing.T) {
submitChallenge([]byte("challenge 3")),
status.Error(codes.ResourceExhausted, service.ErrMaxMembersReached.Error()),
)
cancel()
req.NoError(eg.Wait())
}

func TestGettingInitialPowParams(t *testing.T) {
Expand Down
114 changes: 93 additions & 21 deletions service/round.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/spacemeshos/merkle-tree"
"github.com/spacemeshos/merkle-tree/cache"
"github.com/syndtr/goleveldb/leveldb"
Expand All @@ -24,6 +26,20 @@ import (
var (
// ErrRoundIsNotOpen is returned by round.submit when the round is not open.
ErrRoundIsNotOpen = errors.New("round is not open")
// ErrMaxMembersReached is returned by round.submit when the round already
// holds maxMembers members.
ErrMaxMembersReached = errors.New("maximum number of round members reached")

// membersMetric counts round members, with one series per round selected
// by the "id" label. It is created through promauto (which, per its
// documentation, registers it with the default Prometheus registerer).
// A round's series is removed in round.teardown.
membersMetric = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "poet",
Subsystem: "round",
Name: "members_total",
Help: "Number of members in a round",
}, []string{"id"})

// leavesMetric counts leaves generated for a round, with one series per
// round selected by the "id" label. The per-round counter is handed to the
// prover when generating (or recovering) a proof, and the series is removed
// in round.teardown.
leavesMetric = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "poet",
Subsystem: "round",
Name: "leaves_total",
Help: "Number of generated leaves in a round",
}, []string{"id"})
)

type executionState struct {
Expand All @@ -40,7 +56,6 @@ const roundStateFileBaseName = "state.bin"
type roundState struct {
ExecutionStarted time.Time
Execution *executionState
Members uint
}

func (r *round) isOpen() bool {
Expand All @@ -60,6 +75,9 @@ type round struct {
execution *executionState
members uint
maxMembers uint

membersCounter prometheus.Counter
leavesCounter prometheus.Counter
}

func (r *round) Epoch() uint32 {
Expand All @@ -76,22 +94,36 @@ func newRound(datadir string, epoch uint32, maxMembers uint) (*round, error) {
return nil, err
}

return &round{
// Note: using the panicking version here because it panics
// only if the number of label values is not the same as the number of variable labels in Desc.
// There is only 1 label (round ID), that is passed, so it's safe to use.
membersCounter := membersMetric.WithLabelValues(id)
leavesCounter := leavesMetric.WithLabelValues(id)

r := &round{
epoch: epoch,
datadir: datadir,
ID: id,
challengesDb: db,
execution: &executionState{
SecurityParam: shared.T,
},
maxMembers: maxMembers,
}, nil
members: countMembersInDB(db),
maxMembers: maxMembers,
membersCounter: membersCounter,
leavesCounter: leavesCounter,
}

membersCounter.Add(float64(r.members))

return r, nil
}

func (r *round) submit(key, challenge []byte) error {
func (r *round) submit(ctx context.Context, key, challenge []byte) error {
if !r.isOpen() {
return ErrRoundIsNotOpen
}

if r.members >= r.maxMembers {
return ErrMaxMembersReached
}
Expand All @@ -104,13 +136,13 @@ func (r *round) submit(key, challenge []byte) error {
err := r.challengesDb.Put(key, challenge, &opt.WriteOptions{Sync: true})
if err == nil {
r.members += 1
r.membersCounter.Inc()
}
return err
}

func (r *round) execute(ctx context.Context, end time.Time, minMemoryLayer, fileWriterBufSize uint) error {
logger := logging.FromContext(ctx).With(zap.String("round", r.ID))
logger.Sugar().Infof("executing until %v...", end)

r.executionStarted = time.Now()
if err := r.saveState(); err != nil {
Expand All @@ -123,12 +155,20 @@ func (r *round) execute(ctx context.Context, end time.Time, minMemoryLayer, file
r.execution.Members, r.execution.Statement = members, statement
}

logger.Info(
"executing round",
zap.Time("end", end),
zap.Int("members", len(r.execution.Members)),
zap.Binary("statement", r.execution.Statement),
)

if err := r.saveState(); err != nil {
return err
}

numLeaves, nip, err := prover.GenerateProof(
ctx,
r.leavesCounter,
prover.TreeConfig{
MinMemoryLayer: minMemoryLayer,
Datadir: r.datadir,
Expand All @@ -141,14 +181,19 @@ func (r *round) execute(ctx context.Context, end time.Time, minMemoryLayer, file
r.persistExecution,
)
if err != nil {
return err
return fmt.Errorf("generating proof: %w", err)
}
r.execution.NumLeaves, r.execution.NIP = numLeaves, nip
if err := r.saveState(); err != nil {
return err
}

logger.Sugar().Infof("execution ended, phi=%x, duration %v", r.execution.NIP.Root, time.Since(r.executionStarted))
logger.Info(
"execution ended",
zap.Binary("root", r.execution.NIP.Root),
zap.Uint64("num_leaves", r.execution.NumLeaves),
zap.Duration("duration", time.Since(r.executionStarted)),
)
return nil
}

Expand All @@ -168,16 +213,11 @@ func (r *round) persistExecution(

r.execution.NumLeaves = numLeaves
r.execution.ParkedNodes = tree.GetParkedNodes(r.execution.ParkedNodes[:0])
if err := r.saveState(); err != nil {
return err
}

return nil
return r.saveState()
}

func (r *round) recoverExecution(ctx context.Context, end time.Time, fileWriterBufSize uint) error {
logger := logging.FromContext(ctx).With(zap.String("round", r.ID))
logger.With().Info("recovering execution", zap.Time("end", end))

started := time.Now()

Expand All @@ -193,8 +233,12 @@ func (r *round) recoverExecution(ctx context.Context, end time.Time, fileWriterB
}
}

logger.With().
Info("recovering execution", zap.Time("end", end), zap.Int("members", len(r.execution.Members)), zap.Uint64("num_leaves", r.execution.NumLeaves))

numLeaves, nip, err := prover.GenerateProofRecovery(
ctx,
r.leavesCounter,
prover.TreeConfig{
Datadir: r.datadir,
FileWriterBufSize: fileWriterBufSize,
Expand All @@ -215,7 +259,12 @@ func (r *round) recoverExecution(ctx context.Context, end time.Time, fileWriterB
return err
}

logger.With().Info("finished round recovered execution", zap.Duration("duration", time.Since(started)))
logger.With().Info(
"finished round recovered execution",
zap.Binary("root", r.execution.NIP.Root),
zap.Uint64("num_leaves", r.execution.NumLeaves),
zap.Duration("duration", time.Since(started)),
)

return nil
}
Expand All @@ -225,25 +274,27 @@ func (r *round) loadState() error {
filename := filepath.Join(r.datadir, roundStateFileBaseName)
state := roundState{}
if err := load(filename, &state); err != nil {
return err
return fmt.Errorf("loading state: %w", err)
}
if r.execution.SecurityParam != state.Execution.SecurityParam {
return errors.New("SecurityParam config mismatch")
}
r.execution = state.Execution
r.executionStarted = state.ExecutionStarted
r.members = state.Members

return nil
}

// saveState persists a roundState snapshot of the round (execution start time
// and execution state) to the roundStateFileBaseName file inside r.datadir.
func (r *round) saveState() error {
filename := filepath.Join(r.datadir, roundStateFileBaseName)
return persist(filename, &roundState{
err := persist(filename, &roundState{
ExecutionStarted: r.executionStarted,
Execution: r.execution,
Members: r.members,
})
if err != nil {
return fmt.Errorf("persisting state: %w", err)
}
return nil
}

func (r *round) calcMembersAndStatement() ([][]byte, []byte, error) {
Expand Down Expand Up @@ -271,13 +322,34 @@ func (r *round) calcMembersAndStatement() ([][]byte, []byte, error) {
return members, mtree.Root(), nil
}

func (r *round) teardown(cleanup bool) error {
func (r *round) teardown(ctx context.Context, cleanup bool) error {
logger := logging.FromContext(ctx)
logger.Info("tearing down round", zap.String("round", r.ID), zap.Bool("cleanup", cleanup))
started := time.Now()
defer logger.Info(
"finished tearing down round",
zap.String("round", r.ID),
zap.Duration("duration", time.Since(started)),
)

membersMetric.DeleteLabelValues(r.ID)
leavesMetric.DeleteLabelValues(r.ID)

if err := r.challengesDb.Close(); err != nil {
return err
return fmt.Errorf("closing DB: %w", err)
}

if cleanup {
return os.RemoveAll(r.datadir)
}
return r.saveState()
}

// countMembersInDB returns the number of entries yielded by an iterator
// created over db with a nil key range and nil read options.
//
// NOTE(review): the iterator's error is never inspected, so an iteration
// that stops early is indistinguishable from reaching the end — confirm this
// best-effort count is intended.
func countMembersInDB(db *leveldb.DB) (count uint) {
	it := db.NewIterator(nil, nil)
	defer it.Release()

	for more := it.Next(); more; more = it.Next() {
		count += 1
	}
	return count
}
Loading

0 comments on commit beb3bdf

Please sign in to comment.