//
// FastQueue.h
//
// Created by Anders Cedronius
//
// Usage
// Create the queue:
// auto queue = FastQueue<Type, Size, L1-Cache size>()
// Type: the type of the data passed through the queue.
// Size: the size of the queue given as a contiguous bitmask from the LSB,
// for example 0b1111. The ring buffer acts as a rubber band between the
// producer and consumer to avoid unnecessary stalls when pushing new data.
// L1-Cache size: the cache-line size, typically 64 bytes.
//
// queue.push(object/pointer) blocks if the queue is full;
// only queue.stopQueue() or a popped entry will release the spinlock.
// auto result = queue.pop() blocks if the queue is empty;
// only queue.stopQueue() or a pushed entry will release the spinlock.
// If the result is {}, all objects have been popped and the consumer
// should not pop any more data.
//
// Use tryPush and/or tryPop if you want to avoid spinlock CPU hogging
// for low-frequency data transfers. tryPush should be followed by
// pushAfterTry if used, and tryPop should be followed by popAfterTry.
//
// Call queue.stopQueue() from any thread to signal end of transaction;
// the user may then drop the queue or keep popping until {} is returned.
// Call queue.isQueueStopped() to see the status of the queue. It may be
// used, for example, to manage the life cycle of the thread pushing data.
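//
// A minimal producer/consumer sketch (illustrative only; the element type,
// the sizes, and the thread handling are assumptions, not part of this file):
//
//   #include <thread>
//   FastQueue<uint64_t *, 0b1111, 64> lQueue;
//   std::thread lConsumer([&lQueue]() {
//     while (true) {
//       auto lData = lQueue.pop();
//       if (lData == nullptr) break;  // {} signals end of service
//       delete lData;
//     }
//   });
//   for (uint64_t i = 0; i < 100; ++i) {
//     auto lItem = new uint64_t(i);
//     lQueue.push(lItem);             // blocks while the queue is full
//   }
//   lQueue.stopQueue();               // release the consumer's spinlock
//   lConsumer.join();
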
#pragma once
#include <iostream>
#include <cstdint>
#include <vector>
#include <atomic>
#include <stdexcept>
#include <bitset>
#if __x86_64__ || _M_X64
#include <immintrin.h>
#ifdef _MSC_VER
#include <intrin.h>
#endif
#elif __aarch64__ || _M_ARM64
#ifdef _MSC_VER
#include <arm64_neon.h>
#endif
#else
#error Architecture not supported
#endif

template<typename T, uint64_t RING_BUFFER_SIZE, uint64_t L1_CACHE_LINE>
class FastQueue {
public:
    enum class FastQueueMessages : uint64_t {
        END_OF_SERVICE,
        READY_TO_POP,
        NOT_READY_TO_POP,
        READY_TO_PUSH,
        NOT_READY_TO_PUSH,
    };

    explicit FastQueue() {
        // Verify that RING_BUFFER_SIZE is a contiguous bitmask from the LSB
        // (i.e. one less than a power of two), so it can serve both as the
        // index mask and as the capacity of the ring buffer.
        uint64_t lSource = RING_BUFFER_SIZE;
        uint64_t lContiguousBits = 0;
        while (true) {
            if (!(lSource & 1)) break;
            lSource = lSource >> 1;
            lContiguousBits++;
        }
        uint64_t lBitsSetTotal = std::bitset<64>(RING_BUFFER_SIZE).count();
        if (lContiguousBits != lBitsSetTotal || !lContiguousBits) {
            throw std::runtime_error(
                    "Buffer size must be a number of contiguous bits set from LSB. Example: 0b00001111 not 0b01001111");
        }
        if ((uint64_t) &mWritePositionPush % 8 || (uint64_t) &mReadPositionPop % 8) {
            throw std::runtime_error("Queue-pointers are misaligned in memory.");
        }
    }
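
    // Example (illustrative values): FastQueue<int, 0b1111, 64> creates a
    // 16-slot ring buffer admitting at most 15 in-flight entries, whereas a
    // size such as 0b0110 or 0b1000 makes the constructor throw, since the
    // set bits are not contiguous from the LSB.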

    ///////////////////////
    /// Push part
    ///////////////////////

    // Non-blocking probe: returns READY_TO_PUSH if there is room in the
    // queue and the queue has not been stopped.
    FastQueueMessages tryPush() {
        if (mWritePositionPush - mReadPositionPush >= RING_BUFFER_SIZE || mExitThreadSemaphore) {
            return FastQueueMessages::NOT_READY_TO_PUSH;
        }
        return FastQueueMessages::READY_TO_PUSH;
    }

    // Completes a push after tryPush() returned READY_TO_PUSH.
    void pushAfterTry(T &rItem) {
        mRingBuffer[mWritePositionPush & RING_BUFFER_SIZE].mObj = std::move(rItem);
        // Make the stored object visible to the consumer before publishing
        // the new write position.
#if __x86_64__ || _M_X64
        _mm_sfence();
#elif __aarch64__ || _M_ARM64
#ifdef _MSC_VER
        __dmb(_ARM64_BARRIER_ISHST);
#else
        asm volatile("dmb ishst" : : : "memory");
#endif
#else
#error Architecture not supported
#endif
        mWritePositionPop = ++mWritePositionPush;
    }
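
    // A sketch of the non-blocking push pattern, assumed to live inside a
    // producer function (the queue instance lQueue and the item lItem are
    // assumptions, not part of this API):
    //
    //   using FQ = FastQueue<uint64_t *, 0b1111, 64>;
    //   while (lQueue.tryPush() != FQ::FastQueueMessages::READY_TO_PUSH) {
    //     if (lQueue.isQueueStopped()) return;  // the queue accepts no more data
    //     std::this_thread::yield();            // back off instead of spinning hot
    //   }
    //   lQueue.pushAfterTry(lItem);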

    // Blocking push: spins while the queue is full. The spinlock is released
    // by a popped entry or by stopQueue().
    void push(T &rItem) noexcept {
        while (mWritePositionPush - mReadPositionPush >= RING_BUFFER_SIZE) {
            if (mExitThreadSemaphore) {
                return;
            }
        }
        mRingBuffer[mWritePositionPush & RING_BUFFER_SIZE].mObj = std::move(rItem);
#if __x86_64__ || _M_X64
        _mm_sfence();
#elif __aarch64__ || _M_ARM64
#ifdef _MSC_VER
        __dmb(_ARM64_BARRIER_ISHST);
#else
        asm volatile("dmb ishst" : : : "memory");
#endif
#else
#error Architecture not supported
#endif
        mWritePositionPop = ++mWritePositionPush;
    }

    // Raw variant of push: identical, but never checks the stop flag, so it
    // spins indefinitely if the queue is full and never drained.
    void pushRaw(T &rItem) noexcept {
        while (mWritePositionPush - mReadPositionPush >= RING_BUFFER_SIZE) {
        }
        mRingBuffer[mWritePositionPush & RING_BUFFER_SIZE].mObj = std::move(rItem);
#if __x86_64__ || _M_X64
        _mm_sfence();
#elif __aarch64__ || _M_ARM64
#ifdef _MSC_VER
        __dmb(_ARM64_BARRIER_ISHST);
#else
        asm volatile("dmb ishst" : : : "memory");
#endif
#else
#error Architecture not supported
#endif
        mWritePositionPop = ++mWritePositionPush;
    }

    ///////////////////////
    /// Pop part
    ///////////////////////

    // Non-blocking probe: returns READY_TO_POP if there is data to fetch,
    // END_OF_SERVICE if the queue is stopped and fully drained, and
    // NOT_READY_TO_POP otherwise.
    FastQueueMessages tryPop() {
        if (mWritePositionPop == mReadPositionPop) {
            if ((mExitThread == mReadPositionPop) && mExitThreadSemaphore) {
                return FastQueueMessages::END_OF_SERVICE;
            }
            return FastQueueMessages::NOT_READY_TO_POP;
        }
        return FastQueueMessages::READY_TO_POP;
    }

    // Completes a pop after tryPop() returned READY_TO_POP.
    T popAfterTry() {
        T lData = std::move(mRingBuffer[mReadPositionPop & RING_BUFFER_SIZE].mObj);
        // Order the load of the object before publishing the new read position.
#if __x86_64__ || _M_X64
        _mm_lfence();
#elif __aarch64__ || _M_ARM64
#ifdef _MSC_VER
        __dmb(_ARM64_BARRIER_ISHLD);
#else
        asm volatile("dmb ishld" : : : "memory");
#endif
#else
#error Architecture not supported
#endif
        mReadPositionPush = ++mReadPositionPop;
        return lData;
    }
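
    // A sketch of the non-blocking pop pattern (the queue instance and the
    // yield-based back-off are assumptions, not part of this API):
    //
    //   using FQ = FastQueue<uint64_t *, 0b1111, 64>;
    //   while (true) {
    //     auto lStatus = lQueue.tryPop();
    //     if (lStatus == FQ::FastQueueMessages::END_OF_SERVICE) break;
    //     if (lStatus == FQ::FastQueueMessages::NOT_READY_TO_POP) {
    //       std::this_thread::yield();  // avoid hogging the CPU
    //       continue;
    //     }
    //     auto lData = lQueue.popAfterTry();
    //     delete lData;
    //   }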

    // Blocking pop: spins while the queue is empty. The spinlock is released
    // by a pushed entry or by stopQueue(); returns {} when the queue is
    // stopped and fully drained.
    T pop() noexcept {
        while (mWritePositionPop == mReadPositionPop) {
            if ((mExitThread == mReadPositionPop) && mExitThreadSemaphore) {
                return {};
            }
        }
        T lData = std::move(mRingBuffer[mReadPositionPop & RING_BUFFER_SIZE].mObj);
#if __x86_64__ || _M_X64
        _mm_lfence();
#elif __aarch64__ || _M_ARM64
#ifdef _MSC_VER
        __dmb(_ARM64_BARRIER_ISHLD);
#else
        asm volatile("dmb ishld" : : : "memory");
#endif
#else
#error Architecture not supported
#endif
        mReadPositionPush = ++mReadPositionPop;
        return lData;
    }

    // Raw variant of pop: identical, but never checks the stop flag, so it
    // spins indefinitely if the queue stays empty.
    void popRaw(T &out) noexcept {
        while (mWritePositionPop == mReadPositionPop) {
        }
        out = std::move(mRingBuffer[mReadPositionPop & RING_BUFFER_SIZE].mObj);
#if __x86_64__ || _M_X64
        _mm_lfence();
#elif __aarch64__ || _M_ARM64
#ifdef _MSC_VER
        __dmb(_ARM64_BARRIER_ISHLD);
#else
        asm volatile("dmb ishld" : : : "memory");
#endif
#else
#error Architecture not supported
#endif
        mReadPositionPush = ++mReadPositionPop;
    }

    // Stop the queue (may be called from any thread).
    void stopQueue() {
        mExitThread = mWritePositionPush;
        mExitThreadSemaphore = true;
    }

    // Is the queue stopped?
    bool isQueueStopped() {
        return mExitThreadSemaphore;
    }
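
    // A sketch of the drain semantics after stopQueue() (the queue instance
    // and the pointer element type are assumptions): entries pushed before
    // the stop are still delivered, and only then does pop() return the {}
    // end-of-service marker.
    //
    //   lQueue.stopQueue();
    //   while (auto lData = lQueue.pop()) {  // {} (nullptr here) ends the loop
    //     delete lData;
    //   }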

    /// Delete copy and move constructors and assign operators
    FastQueue(FastQueue const &) = delete;              // Copy construct
    FastQueue(FastQueue &&) = delete;                   // Move construct
    FastQueue &operator=(FastQueue const &) = delete;   // Copy assign
    FastQueue &operator=(FastQueue &&) = delete;        // Move assign

private:
    // Each slot is padded to a full cache line so the producer and consumer
    // never touch the same line when accessing neighbouring slots.
    struct alignas(L1_CACHE_LINE) mAlign {
        T mObj;
        volatile uint8_t mStuff[L1_CACHE_LINE - sizeof(T)];
    };

    // All counters live on separate cache lines, guarded by border areas
    // above and below, to avoid false sharing between the two threads.
    alignas(L1_CACHE_LINE) volatile uint8_t mBorderUpp[L1_CACHE_LINE];
    alignas(L1_CACHE_LINE) volatile uint64_t mWritePositionPush = 0;
    alignas(L1_CACHE_LINE) volatile uint64_t mReadPositionPop = 0;
    alignas(L1_CACHE_LINE) volatile uint64_t mWritePositionPop = 0;
    alignas(L1_CACHE_LINE) volatile uint64_t mReadPositionPush = 0;
    alignas(L1_CACHE_LINE) volatile uint64_t mExitThread = 0;
    alignas(L1_CACHE_LINE) volatile bool mExitThreadSemaphore = false;
    alignas(L1_CACHE_LINE) mAlign mRingBuffer[RING_BUFFER_SIZE + 1];
    alignas(L1_CACHE_LINE) volatile uint8_t mBorderDown[L1_CACHE_LINE];
};