-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathqueue.go
192 lines (167 loc) · 4.9 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package shoveler
import (
"container/list"
"github.com/joncrlsn/dque"
"errors"
"path"
"sync"
"time"
)
// MessageStruct is the envelope stored in the on-disk queue: Enqueue wraps
// each raw message in one before handing it to dque, and dequeueLocked
// unwraps it again on the way out. ItemBuilder returns an empty one for dque
// to fill when loading from disk.
type MessageStruct struct {
// Message is the raw message payload.
// NOTE(review): presumably exported so dque's serializer can see it - confirm.
Message []byte
}
// ConfirmationQueue is a FIFO queue of messages that keeps its contents either
// in an in-memory list or in an on-disk dque, switching between the two as the
// number of queued messages crosses MaxInMemory and LowWaterMark.
type ConfirmationQueue struct {
// diskQueue is the on-disk queue, created or opened by Init.
diskQueue *dque.DQue
// mutex is taken by Size, Enqueue, Dequeue and Close around their access
// to the fields below.
mutex sync.Mutex
// emptyCond is built on mutex in Init. Dequeue waits on it while the queue
// is empty and Enqueue broadcasts on it after adding a message.
emptyCond *sync.Cond
// memQueue holds the []byte messages while the queue is in in-memory mode.
memQueue *list.List
// usingDisk is true while messages are stored in diskQueue instead of
// memQueue.
usingDisk bool
}
var (
// ErrEmpty is returned by dequeueLocked when there is no message to hand
// out. Dequeue treats it as the signal to wait for the next Enqueue.
ErrEmpty = errors.New("queue is empty")
// MaxInMemory is the in-memory limit: when adding a message would bring
// the in-memory count up to this value, Enqueue moves everything to the
// on-disk queue.
MaxInMemory = 100
// LowWaterMark is the on-disk threshold: when removing a message would
// leave fewer than this many on disk, dequeueLocked moves everything back
// into memory.
LowWaterMark = 50
)
// NewConfirmationQueue allocates a ConfirmationQueue and initializes it from
// config by calling Init, returning the ready-to-use queue.
func NewConfirmationQueue(config *Config) *ConfirmationQueue {
	cq := &ConfirmationQueue{}
	return cq.Init(config)
}
// ItemBuilder returns a pointer to a fresh, empty MessageStruct.
// It is passed to dque so that it has an item to fill in when a segment of
// the queue is loaded from disk.
func ItemBuilder() interface{} {
	return new(MessageStruct)
}
// Init prepares the queue for use and returns it.
//
// It opens (or creates) the on-disk queue at config.QueueDir, switches on
// dque's turbo mode, resumes in disk mode if messages are already stored on
// disk, and starts the background goroutine that reports the queue size.
// Failing to open the on-disk queue panics; failing to enable turbo mode is
// only logged.
func (cq *ConfirmationQueue) Init(config *Config) *ConfirmationQueue {
	// Number of items dque keeps in each on-disk segment.
	const itemsPerSegment = 10000

	// dque takes the queue name and its parent directory separately.
	parentDir, queueName := path.Dir(config.QueueDir), path.Base(config.QueueDir)

	dq, openErr := dque.NewOrOpen(queueName, parentDir, itemsPerSegment, ItemBuilder)
	cq.diskQueue = dq
	if openErr != nil {
		log.Panicln("Failed to create queue:", openErr)
	}

	if turboErr := cq.diskQueue.TurboOn(); turboErr != nil {
		log.Errorln("Failed to turn on dque Turbo mode, the queue will be safer but much slower:", turboErr)
	}

	// Messages already on disk: keep serving from disk.
	if cq.diskQueue.Size() > 0 {
		cq.usingDisk = true
	}

	cq.emptyCond = sync.NewCond(&cq.mutex)
	cq.memQueue = list.New()

	// Start the metrics goroutine
	go cq.queueMetrics()

	return cq
}
// Size returns the number of queued messages: the in-memory list length while
// in in-memory mode, otherwise the on-disk queue size. It takes the queue
// mutex for the duration of the call.
func (cq *ConfirmationQueue) Size() int {
	cq.mutex.Lock()
	defer cq.mutex.Unlock()

	if !cq.usingDisk {
		return cq.memQueue.Len()
	}
	return cq.diskQueue.SizeUnsafe()
}
// queueMetrics publishes the current queue size to the QueueSize metric and
// the debug log every five seconds. It never returns, so it should be run in
// its own goroutine.
func (cq *ConfirmationQueue) queueMetrics() {
	const refreshInterval = 5 * time.Second

	ticker := time.NewTicker(refreshInterval)
	defer ticker.Stop()

	// One update per tick.
	for range ticker.C {
		current := cq.Size()
		QueueSize.Set(float64(current))
		log.Debugln("Queue Size:", current)
	}
}
// Enqueue adds msg to the back of the queue and wakes any goroutine blocked
// in Dequeue.
//
// While in in-memory mode the message is kept in memory as long as the
// in-memory count stays below MaxInMemory. Once the next message would reach
// that limit, everything held in memory is moved to the on-disk queue,
// followed by msg, and the queue switches to disk mode. In disk mode the
// message goes straight to disk. Disk write failures are logged and not
// reported to the caller.
func (cq *ConfirmationQueue) Enqueue(msg []byte) {
	cq.mutex.Lock()
	defer cq.mutex.Unlock()

	// writeToDisk appends one payload to the on-disk queue, logging on failure.
	writeToDisk := func(payload []byte) {
		if err := cq.diskQueue.Enqueue(&MessageStruct{Message: payload}); err != nil {
			log.Errorln("Failed to enqueue message:", err)
		}
	}

	switch {
	case cq.usingDisk:
		// Already in disk mode.
		writeToDisk(msg)
	case cq.memQueue.Len()+1 < MaxInMemory:
		// Still room in memory.
		cq.memQueue.PushBack(msg)
	default:
		// This message would hit MaxInMemory: spill the in-memory messages
		// to disk in order, then the new message, and switch modes.
		for front := cq.memQueue.Front(); front != nil; front = cq.memQueue.Front() {
			writeToDisk(cq.memQueue.Remove(front).([]byte))
		}
		writeToDisk(msg)
		cq.usingDisk = true
	}

	cq.emptyCond.Broadcast()
}
// dequeueLocked removes and returns the oldest message. The caller must
// already hold cq.mutex.
//
// It returns ErrEmpty when no message is available, and the underlying error
// when the on-disk queue fails to produce one. On a non-nil error the returned
// slice is always nil.
//
// In disk mode, if removing one message would leave fewer than LowWaterMark
// messages on disk, the whole on-disk queue is drained into memory and the
// queue switches back to in-memory mode.
//
// A failed disk read is never type-asserted: dque returns a nil item together
// with the error, and asserting on a nil interface panics. If the drain is
// interrupted by an error, the messages moved so far stay in memory and the
// queue stays in disk mode so a later call can resume the drain.
func (cq *ConfirmationQueue) dequeueLocked() ([]byte, error) {
	// Anything held in memory is older than anything on disk. In normal
	// operation memory is only populated in in-memory mode; after an
	// interrupted drain it may also hold leftovers while in disk mode, and
	// those must be served first to preserve FIFO order.
	if cq.memQueue.Len() > 0 {
		return cq.memQueue.Remove(cq.memQueue.Front()).([]byte), nil
	}
	if !cq.usingDisk || cq.diskQueue.Size() == 0 {
		return nil, ErrEmpty
	}

	if cq.diskQueue.Size()-1 >= LowWaterMark {
		// Enough stays on disk after this dequeue: remain in disk mode.
		item, err := cq.diskQueue.Dequeue()
		if err != nil {
			log.Errorln("Failed to dequeue: ", err)
			return nil, err
		}
		return item.(*MessageStruct).Message, nil
	}

	// This dequeue would drop the on-disk size below LowWaterMark: move
	// everything from disk into memory.
	var drainErr error
	for cq.diskQueue.Size() > 0 {
		item, err := cq.diskQueue.Dequeue()
		if err != nil {
			log.Errorln("Failed to dequeue: ", err)
			// Stop instead of retrying: a persistent error would otherwise
			// spin here forever while holding the mutex.
			drainErr = err
			break
		}
		cq.memQueue.PushBack(item.(*MessageStruct).Message)
	}
	if drainErr == nil {
		// Disk fully drained; serve from memory from now on.
		cq.usingDisk = false
	}

	if cq.memQueue.Len() == 0 {
		if drainErr != nil {
			return nil, drainErr
		}
		return nil, ErrEmpty
	}
	return cq.memQueue.Remove(cq.memQueue.Front()).([]byte), nil
}
// Dequeue returns the oldest message, blocking until one is available.
// Any error from the underlying queue other than ErrEmpty is returned to the
// caller with a nil message.
func (cq *ConfirmationQueue) Dequeue() ([]byte, error) {
	cq.mutex.Lock()
	defer cq.mutex.Unlock()

	for {
		msg, err := cq.dequeueLocked()
		switch err {
		case nil:
			return msg, nil
		case ErrEmpty:
			// Wait releases cq.mutex while suspended and re-acquires it
			// before returning. Being woken does not guarantee a message is
			// available, so go around the loop and check again.
			cq.emptyCond.Wait()
		default:
			return nil, err
		}
	}
}
// Close closes the on-disk queue while holding the queue mutex and returns
// whatever error the on-disk queue reports.
func (cq *ConfirmationQueue) Close() error {
	cq.mutex.Lock()
	defer cq.mutex.Unlock()

	closeErr := cq.diskQueue.Close()
	return closeErr
}