-
Notifications
You must be signed in to change notification settings - Fork 100
/
task_runner.go
171 lines (141 loc) · 3.79 KB
/
task_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package goquic
// #include <stddef.h>
// #include "src/adaptor.h"
import "C"
import (
"container/heap"
"time"
)
// HeapItem is the scheduling record kept for one alarm: it pairs the alarm
// with the bookkeeping AlarmHeap needs to order and locate it.
type HeapItem struct {
	// alarm is the alarm this record schedules.
	alarm *GoQuicAlarm
	// deadline is the heap's own copy of the alarm deadline. It should be managed explicitly by the heap and
	// should not be shared with GoQuicAlarm, so that heap ordering stays thread-safe.
	deadline int64
	// heapIdx is negative while this alarm is not in the heap; otherwise this alarm is the heapIdx-th element of the heap.
	heapIdx int
	// insertOrd is the insertion order, used for tie-breaking between equal deadlines.
	insertOrd int
}
// AlarmHeap stores HeapItems ordered by deadline, with insertion order as the
// tie-breaker. Its Len/Less/Swap/Push/Pop methods are driven through
// container/heap.
type AlarmHeap struct {
	// items is the heap storage; items[0] is treated as the top of the heap.
	items []*HeapItem
	// insertNum counts pushes so far and supplies the insertOrd of each pushed item.
	insertNum int
}
// TaskRunner schedules GoQuicAlarms with a deadline-ordered heap and a single timer.
// This TaskRunner is NOT THREAD SAFE (and need not be), so be careful:
// all heap operations should be called from the main loop, not from a separate goroutine.
type TaskRunner struct {
	// alarmList maps every registered alarm to its heap record.
	alarmList map[*GoQuicAlarm]*HeapItem
	// alarmHeap holds the records of the alarms that are currently scheduled.
	alarmHeap *AlarmHeap
	// deadlineTop caches the heap-top deadline the timer was armed for; RunAlarm and
	// CancelAlarm compare against it to decide whether the timer must be re-armed.
	deadlineTop int64
	// timer is the timer whose channel is handed out by WaitTimer.
	timer *time.Timer
}
// Len reports how many items are currently stored in the heap.
func (ht *AlarmHeap) Len() int {
	return len(ht.items)
}
// Less reports whether item i must sort before item j: the earlier deadline
// wins, and equal deadlines are ordered by insertion order.
func (ht *AlarmHeap) Less(i, j int) bool {
	a, b := ht.items[i], ht.items[j]
	if a.deadline != b.deadline {
		return a.deadline < b.deadline
	}
	return a.insertOrd < b.insertOrd
}
// Swap exchanges the items at positions i and j and updates each item's
// heapIdx so it keeps matching the item's position.
func (ht *AlarmHeap) Swap(i, j int) {
	atI, atJ := ht.items[i], ht.items[j]
	ht.items[i], ht.items[j] = atJ, atI
	atJ.heapIdx = i
	atI.heapIdx = j
}
// Push appends x, which must be a *HeapItem, to the end of the heap storage.
// The item is stamped with its position and with the next insertion sequence
// number. Ordering is restored by container/heap, not by this method.
func (ht *AlarmHeap) Push(x interface{}) {
	entry := x.(*HeapItem)
	ht.insertNum++
	entry.insertOrd = ht.insertNum
	entry.heapIdx = len(ht.items)
	ht.items = append(ht.items, entry)
}
// Pop removes and returns the last item of the heap storage as a *HeapItem.
// container/heap moves the item to be removed to the end before calling it.
// The returned item's heapIdx is set to -1 to mark it as no longer in the heap.
func (ht *AlarmHeap) Pop() interface{} {
	old := ht.items
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot: the backing array outlives the shortened slice,
	// and a leftover pointer would keep the item (and its alarm) reachable.
	old[n-1] = nil
	item.heapIdx = -1 // for safety
	ht.items = old[:n-1]
	return item
}
// newAlarmHeap returns an empty heap whose insertion counter starts at zero.
func newAlarmHeap() *AlarmHeap {
	return &AlarmHeap{
		items:     make([]*HeapItem, 0),
		insertNum: 0,
	}
}
// CreateTaskRunner returns a TaskRunner with no registered alarms, an empty
// heap, and a timer parked roughly 200 years in the future, so the timer
// does not fire until resetTimer arms it for a real deadline.
func CreateTaskRunner() *TaskRunner {
	const parkedHours = 200 * 365 * 24 // ~ 200 year
	return &TaskRunner{
		alarmList: make(map[*GoQuicAlarm]*HeapItem),
		alarmHeap: newAlarmHeap(),
		timer:     time.NewTimer(time.Duration(parkedHours) * time.Hour),
	}
}
// RunAlarm schedules (or reschedules) a registered alarm. It copies the
// alarm's current deadline into its heap record, then either pushes the
// record onto the heap or, if it is already there, fixes its position.
// The timer is re-armed only when the heap-top deadline differs from the
// cached deadlineTop.
func (t *TaskRunner) RunAlarm(alarm *GoQuicAlarm) {
	entry := t.alarmList[alarm]
	entry.deadline = entry.alarm.deadline

	if entry.heapIdx >= 0 {
		heap.Fix(t.alarmHeap, entry.heapIdx)
	} else {
		heap.Push(t.alarmHeap, entry)
	}

	if t.alarmHeap.Len() == 0 {
		return
	}
	if top := t.alarmHeap.items[0]; top.deadline != t.deadlineTop {
		t.resetTimer()
	}
}
// CancelAlarm removes alarm from the heap if it is currently scheduled.
//
// An alarm that was never registered (or was already unregistered) is ignored
// rather than dereferencing the missing record.
//
// If the heap still has items and its top deadline no longer matches the
// cached deadlineTop, the timer is re-armed. If the heap is now empty,
// deadlineTop is cleared back to its initial value of 0. Without that, a later
// RunAlarm whose deadline happened to equal the stale cached value would skip
// re-arming the timer even though the timer may already have fired.
func (t *TaskRunner) CancelAlarm(alarm *GoQuicAlarm) {
	item, ok := t.alarmList[alarm]
	if !ok {
		return
	}
	if item.heapIdx >= 0 {
		heap.Remove(t.alarmHeap, item.heapIdx)
	}
	if t.alarmHeap.Len() == 0 {
		// Nothing is scheduled any more: forget the cached deadline so the
		// next scheduled alarm always re-arms the timer.
		t.deadlineTop = 0
		return
	}
	if t.deadlineTop != t.alarmHeap.items[0].deadline {
		t.resetTimer()
	}
}
// resetTimer arms the timer for the deadline at the top of the heap and
// caches that deadline in deadlineTop. A deadline that has already passed is
// clamped to a zero wait.
//
// When the heap is empty the timer is left untouched and deadlineTop is
// cleared back to its initial value of 0. Without that, the cached value
// would still name the deadline of an alarm that has already fired, and a
// later RunAlarm with that same deadline would skip re-arming the timer.
func (t *TaskRunner) resetTimer() {
	if t.alarmHeap.Len() == 0 {
		t.deadlineTop = 0
		return
	}
	top := t.alarmHeap.items[0]
	t.deadlineTop = top.deadline

	remaining := top.deadline - top.alarm.Now()
	if remaining < 0 {
		remaining = 0
	}
	// C++ clocks: Microseconds
	// Go duration: Nanoseconds
	t.timer.Reset(time.Duration(remaining) * time.Microsecond)
}
// DoTasks fires every scheduled alarm whose deadline is strictly earlier than
// the current time, then re-arms the timer for whatever remains. The current
// time is read once, from the alarm at the top of the heap. It does nothing
// when the heap is empty.
//
// All due alarms are popped first and their OnAlarm callbacks run afterwards.
//
// NOTE(review): because due alarms are collected before any callback runs, a
// callback that cancels another already-collected alarm does not stop that
// alarm's OnAlarm from being called here (CancelAlarm only removes items that
// are still in the heap) — confirm OnAlarm tolerates this.
func (t *TaskRunner) DoTasks() {
	if t.alarmHeap.Len() == 0 {
		return
	}

	now := t.alarmHeap.items[0].alarm.Now()

	var due []*HeapItem
	for t.alarmHeap.Len() > 0 && t.alarmHeap.items[0].deadline-now < 0 {
		due = append(due, heap.Pop(t.alarmHeap).(*HeapItem))
	}

	for i := range due {
		due[i].alarm.OnAlarm()
	}

	t.resetTimer()
}
// WaitTimer returns the receive-only channel of the runner's timer.
func (t *TaskRunner) WaitTimer() <-chan time.Time {
	expiry := t.timer.C
	return expiry
}
// RegisterAlarm creates the heap record for alarm and stores it in alarmList.
// The record starts outside the heap (heapIdx -1). Holding the alarm in the
// map is what prevents it from being garbage collected; the entry is cleaned
// up in UnregisterAlarm().
func (t *TaskRunner) RegisterAlarm(alarm *GoQuicAlarm) {
	entry := new(HeapItem)
	entry.alarm = alarm
	entry.heapIdx = -1
	t.alarmList[alarm] = entry
}
// UnregisterAlarm forgets alarm: if it is still scheduled it is cancelled
// first, then its record is dropped from alarmList.
//
// Unregistering an alarm that is not registered (for example a second
// unregister of the same alarm) is a no-op instead of a nil-pointer panic.
func (t *TaskRunner) UnregisterAlarm(alarm *GoQuicAlarm) {
	item, ok := t.alarmList[alarm]
	if !ok {
		return
	}
	// heapIdx is non-negative exactly while the record is in the heap.
	if item.heapIdx >= 0 {
		t.CancelAlarm(alarm)
	}
	delete(t.alarmList, alarm)
}