-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathnodestats.go
156 lines (134 loc) · 5.4 KB
/
nodestats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package mir
import (
"fmt"
"sync"
"time"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/util/maputil"
t "github.com/filecoin-project/mir/stdtypes"
)
// ==============================================================================================================
// Event dispatching statistics
// ==============================================================================================================
// StatsConfig configures the generation of event processing statistics.
type StatsConfig struct {
// Logger to send event processing statistics to.
Logger logging.Logger
// Level with which to log event processing statistics.
LogLevel logging.LogLevel
// If not zero, the Node will emit a log entry every period containing statistics about event processing.
Period time.Duration
}
// eventDispatchStats saves statistical information about the dispatching of events between modules,
// such as the numbers of events dispatched for each module.
type eventDispatchStats struct {
dispatchCounts map[t.ModuleID]int
eventCounts map[t.ModuleID]int
numDispatches int
lastDispatch time.Time
}
// newDispatchStats returns a new eventDispatchStats object with all counters set to 0
// and the last dispatch time set to the current time.
func newDispatchStats(moduleIDs []t.ModuleID) eventDispatchStats {
stats := eventDispatchStats{
dispatchCounts: make(map[t.ModuleID]int),
eventCounts: make(map[t.ModuleID]int),
numDispatches: 0,
lastDispatch: time.Now(),
}
for _, moduleID := range moduleIDs {
stats.dispatchCounts[moduleID] = 0
stats.eventCounts[moduleID] = 0
}
return stats
}
// AddDispatch registers the dispatching of an event list between two modules to the statistics.
func (ds *eventDispatchStats) AddDispatch(mID t.ModuleID, numEvents int) {
ds.numDispatches++
ds.dispatchCounts[mID]++
ds.eventCounts[mID] += numEvents
}
// CombinedStats returns the dispatching statistics combined with information about event buffer occupancy
// in a format that can directly be passed to the logger.
// For each module, it includes the number of event lists dispatched to that module (d),
// the total number of events in those lists (e),
// and the number of events still buffered at the input of that module (b).
func (ds *eventDispatchStats) CombinedStats(
bufferStats map[t.ModuleID]int,
processingTimes map[t.ModuleID]time.Duration,
period time.Duration,
) []interface{} {
// Calculate the column width for the module IDs.
maxModuleIDLength := 0
for mID := range ds.eventCounts {
if len(string(mID)) > maxModuleIDLength {
maxModuleIDLength = len(string(mID))
}
}
//
logVals := make([]interface{}, 0, len(ds.eventCounts)+2)
totalEventsDispatched := 0
totalEventsBuffered := 0
maputil.IterateSortedCustom(ds.dispatchCounts, func(mID t.ModuleID, cnt int) (cont bool) {
// TODO: Pass these as separate values somehow ahd have the logger figure out what to do with them.
logVals = append(logVals, fmt.Sprintf("%"+fmt.Sprint(maxModuleIDLength)+"s", mID),
fmt.Sprintf("d(%10d)-e(%10d)-b(%10d)-t(%6.2f%%)\n",
cnt*int(time.Second)/int(period),
ds.eventCounts[mID]*int(time.Second)/int(period),
bufferStats[mID],
100*float32(processingTimes[mID])/float32(period)),
)
totalEventsDispatched += ds.eventCounts[mID]
totalEventsBuffered += bufferStats[mID]
return true
}, func(mID1 t.ModuleID, mID2 t.ModuleID) bool {
// For now, sort by processing time.
return processingTimes[mID1] >= processingTimes[mID2]
})
logVals = append(logVals,
"numDispatches/s", ds.numDispatches*int(time.Second)/int(period),
"totalEventsDispatched/s", totalEventsDispatched*int(time.Second)/int(period),
"totalEventsBuffered", totalEventsBuffered,
)
return logVals
}
// ==============================================================================================================
// Additional methods of Node that deal with stats.
// ==============================================================================================================
// monitorStats prints and resets the dispatching statistics every given time interval, until the node is stopped.
func (n *Node) monitorStats(interval time.Duration, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(interval)
for {
select {
case <-n.workErrNotifier.ExitC():
ticker.Stop()
n.Config.Logger.Log(logging.LevelInfo, "Event processing monitoring finished.")
n.flushStats()
return
case <-ticker.C:
n.flushStats()
}
}
}
func (n *Node) flushStats() {
n.statsLock.Lock()
defer n.statsLock.Unlock()
eventBufferStats := n.pendingEvents.Stats()
stats := n.dispatchStats.CombinedStats(eventBufferStats, n.resetStopwatches(), n.Config.Stats.Period)
if n.inputIsPaused() {
n.Config.Stats.Logger.Log(n.Config.Stats.LogLevel, "External event processing paused.\n", stats...)
} else {
n.Config.Stats.Logger.Log(n.Config.Stats.LogLevel, "External event processing running.\n", stats...)
}
n.dispatchStats = newDispatchStats(maputil.GetSortedKeys(n.modules))
}
// resetStopwatches resets all the nodes stopwatches tracking the processing time of each module.
// It returns the durations read from the stopwatches just before resetting.
func (n *Node) resetStopwatches() map[t.ModuleID]time.Duration {
processingTimes := make(map[t.ModuleID]time.Duration, len(n.stopwatches))
for mID, stopwatch := range n.stopwatches {
processingTimes[mID] = stopwatch.Reset()
}
return processingTimes
}