Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/ethstorage/es-node into fix…
Browse files Browse the repository at this point in the history
…-underflow
  • Loading branch information
syntrust committed Nov 7, 2024
2 parents 3335bb9 + 72ee4ac commit a5431da
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 70 deletions.
78 changes: 47 additions & 31 deletions cmd/priv-dashboard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethstorage/go-ethstorage/ethstorage/metrics"
"github.com/ethstorage/go-ethstorage/ethstorage/node"
Expand All @@ -31,6 +30,7 @@ var (
portFlag = flag.Int("port", 8080, "Listener port for the es-node to report node status")
grafanaPortFlag = flag.Int("grafana", 9500, "Listener port for the metrics report")
logFlag = flag.Int("loglevel", 3, "Log level to use for Ethereum and the faucet")
contractFlag = flag.String("contract", "0x804C520d3c084C805E37A35E90057Ac32831F96f", "Default contract address used to compatible with older versions of es-node")
)

type record struct {
Expand All @@ -46,6 +46,13 @@ type dashboard struct {
logger log.Logger
}

type statistics struct {
count int
versions map[string]int
shards map[uint64]int
phasesOfShard map[uint64]map[string]int
}

func newDashboard() (*dashboard, error) {
var (
m = metrics.NewNetworkMetrics()
Expand Down Expand Up @@ -82,27 +89,31 @@ func (d *dashboard) ReportStateHandler(w http.ResponseWriter, r *http.Request) {
}
err = json.Unmarshal(body, &state)
if err != nil {
log.Warn("Parse node state failed", "error", err.Error())
log.Warn("Parse node state failed", "state", string(body), "error", err.Error())
w.Write([]byte(fmt.Sprintf(`{"status":"error", "err message":"%s"}`, err.Error())))
return
}
if err = d.checkState(&state); err != nil {
log.Warn("check node state failed", "error", err.Error())
log.Warn("check node state failed", "state", string(body), "error", err.Error())
w.Write([]byte(fmt.Sprintf(`{"status":"error", "err message":"%s"}`, err.Error())))
return
}

if state.Contract == "" {
state.Contract = *contractFlag
}

log.Info("Get state from peer", "peer id", state.Id, "state", string(body))
d.lock.Lock()
d.nodes[state.Id] = &record{receivedTime: time.Now(), state: &state}
d.lock.Unlock()
for _, shard := range state.Shards {
d.m.SetPeerInfo(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner)
d.m.SetPeerInfo(state.Id, state.Contract, state.Version, state.Address, shard.ShardId, shard.Miner)
sync, mining, submission := shard.SyncState, shard.MiningState, shard.SubmissionState
d.m.SetSyncState(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner, sync.PeerCount, sync.SyncProgress,
d.m.SetSyncState(state.Id, state.Contract, state.Version, state.Address, shard.ShardId, shard.Miner, sync.PeerCount, sync.SyncProgress,
sync.SyncedSeconds, sync.FillEmptyProgress, sync.FillEmptySeconds, shard.ProvidedBlob)
d.m.SetMiningState(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner, mining.MiningPower, mining.SamplingTime)
d.m.SetSubmissionState(state.Id, state.Version, state.Address, shard.ShardId, shard.Miner, submission.Succeeded,
d.m.SetMiningState(state.Id, state.Contract, state.Version, state.Address, shard.ShardId, shard.Miner, mining.MiningPower, mining.SamplingTime)
d.m.SetSubmissionState(state.Id, state.Contract, state.Version, state.Address, shard.ShardId, shard.Miner, submission.Succeeded,
submission.Failed, submission.Dropped, submission.LastSucceededTime)
}

Expand All @@ -126,58 +137,63 @@ func (d *dashboard) checkState(state *node.NodeState) error {
}

func (d *dashboard) Report() {
var (
minerOfShards = make(map[uint64]map[common.Address]struct{})
versions = make(map[string]int)
shards = make(map[uint64]int)
phasesOfShard = make(map[uint64]map[string]int)
)
summary := make(map[string]*statistics)

d.lock.Lock()
defer d.lock.Unlock()
for id, r := range d.nodes {
if time.Since(r.receivedTime) > timeoutTime {
delete(d.nodes, id)
for _, shard := range r.state.Shards {
d.m.DeletePeerInfo(r.state.Id, r.state.Version, r.state.Address, shard.ShardId, shard.Miner)
d.m.DeletePeerInfo(r.state.Id, r.state.Contract, r.state.Version, r.state.Address, shard.ShardId, shard.Miner)
}
continue
}

if _, ok := versions[r.state.Version]; !ok {
versions[r.state.Version] = 0
if _, ok := summary[r.state.Contract]; !ok {
summary[r.state.Contract] = &statistics{
count: 0,
versions: make(map[string]int),
shards: make(map[uint64]int),
phasesOfShard: make(map[uint64]map[string]int),
}
}
versions[r.state.Version] = versions[r.state.Version] + 1
sd := summary[r.state.Contract]
sd.count++

for _, s := range r.state.Shards {
if _, ok := shards[s.ShardId]; !ok {
shards[s.ShardId] = 0
}
shards[s.ShardId] = shards[s.ShardId] + 1
if _, ok := sd.versions[r.state.Version]; !ok {
sd.versions[r.state.Version] = 0
}
sd.versions[r.state.Version] = sd.versions[r.state.Version] + 1

if _, ok := minerOfShards[s.ShardId]; !ok {
minerOfShards[s.ShardId] = make(map[common.Address]struct{})
for _, s := range r.state.Shards {
shard := s.ShardId
if _, ok := sd.shards[shard]; !ok {
sd.shards[shard] = 0
}
minerOfShards[s.ShardId][s.Miner] = struct{}{}
sd.shards[shard] = sd.shards[shard] + 1

if _, ok := phasesOfShard[s.ShardId]; !ok {
if _, ok := sd.phasesOfShard[shard]; !ok {
phases := make(map[string]int)
phases["syncing"] = 0
phases["mining"] = 0
phases["mined"] = 0
phasesOfShard[s.ShardId] = phases
sd.phasesOfShard[shard] = phases
}
if s.SyncState.SyncProgress < 10000 || s.SyncState.FillEmptyProgress < 10000 {
phasesOfShard[s.ShardId]["syncing"] = phasesOfShard[s.ShardId]["syncing"] + 1
sd.phasesOfShard[shard]["syncing"] = sd.phasesOfShard[shard]["syncing"] + 1
} else if s.SubmissionState.Succeeded > 0 {
phasesOfShard[s.ShardId]["mined"] = phasesOfShard[s.ShardId]["mined"] + 1
sd.phasesOfShard[shard]["mined"] = sd.phasesOfShard[shard]["mined"] + 1
} else {
phasesOfShard[s.ShardId]["mining"] = phasesOfShard[s.ShardId]["mining"] + 1
sd.phasesOfShard[shard]["mining"] = sd.phasesOfShard[shard]["mining"] + 1
}
}
}

d.m.SetStaticMetrics(len(d.nodes), minerOfShards, versions, shards, phasesOfShard)
d.m.ResetStaticMetrics()
for contract, data := range summary {
d.m.SetStaticMetrics(contract, data.count, data.versions, data.shards, data.phasesOfShard)
}
}

func (d *dashboard) loop() {
Expand Down
Loading

0 comments on commit a5431da

Please sign in to comment.