-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathchoc_VariableSizeFIFO.h
299 lines (250 loc) · 12.1 KB
/
choc_VariableSizeFIFO.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
//
// ██████ ██ ██ ██████ ██████
// ██ ██ ██ ██ ██ ██ ** Classy Header-Only Classes **
// ██ ███████ ██ ██ ██
// ██ ██ ██ ██ ██ ██ https://github.com/Tracktion/choc
// ██████ ██ ██ ██████ ██████
//
// CHOC is (C)2022 Tracktion Corporation, and is offered under the terms of the ISC license:
//
// Permission to use, copy, modify, and/or distribute this software for any purpose with or
// without fee is hereby granted, provided that the above copyright notice and this permission
// notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
// WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
// CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
// WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
// CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#ifndef CHOC_VARIABLE_SIZE_FIFO_HEADER_INCLUDED
#define CHOC_VARIABLE_SIZE_FIFO_HEADER_INCLUDED
#include <vector>
#include <mutex>
#include <type_traits>
#include "../platform/choc_Assert.h"
#include "../threading/choc_SpinLock.h"
namespace choc::fifo
{
//==============================================================================
/**
A multiple writer, single consumer FIFO which can store items as contiguous
blocks of data with individual sizes.
Multiple write threads may have to briefly spin-wait for each other, but the
reader thread is not blocked by the activity of writers.
Note that this class uses a circular buffer, but does not split individual
items across the end of the buffer. This means that when accessing an item,
the reader always has direct access to each item's data as a contiguous block.
But it also means that when an item is too large to fit into empty space at the
end of the circular buffer, that space is treated as padding and the item is
written at the start of the buffer, so it may not always be possible to add an
item, even if there's enough total space for it. You might want to take this
into account by making sure the capacity is at least a few times greater than
the largest item you need to store.
*/
struct VariableSizeFIFO
{
VariableSizeFIFO();
~VariableSizeFIFO();
/// Resets the FIFO with a given capacity in bytes.
/// Note that this is not thread-safe with respect to the other methods - it must
/// only be called when nothing else is pushing or popping to the FIFO.
void reset (uint32_t totalFIFOSizeBytes);
/// Pushes a chunk of data onto the FIFO.
/// If there is space in the FIFO for the given chunk, it will be added, and the
/// function will return true.
/// If the function returns false, then the FIFO didn't have space, and its state
/// will not have been modified by the call, so a caller can try again.
/// If numBytes is 0, nothing will be done, and the function will return false.
/// Note that because the FIFO stores each item's data as contiguous block, then
/// if the free space is split across the end of the circular buffer, then items
/// are not always guaranteed to fit, even if they are smaller than the space
/// returned by getFreeSpace().
bool push (const void* sourceData, uint32_t numBytes);
/// Pushes data into the FIFO, using a callback to provide the data.
/// This does the same job as the other version of push(), but by taking a functor
/// to copy the data to its target location, it allows more complex writes or
/// data generation to take place inside a single transaction.
/// The functor provided must be a function that takes a void* parameter, and writes
/// exactly the number of bytes specified by totalBytes to that location.
template <typename DataProvider>
bool push (uint32_t totalBytes, DataProvider&&);
/// Retrieves the first item's data chunk via a callback.
/// If there are any pending items in the FIFO, the handleItem function
/// provided will be called - it must be a functor or lambda with parameters which can
/// accept being called as handleItem (const void* data, uint32_t size).
/// The function returns true if a callback was made, or false if the FIFO was empty.
template <typename HandleItem>
bool pop (HandleItem&& handleItem);
/// Allows access to all the available item in the FIFO via a callback.
/// If there are any pending items in the FIFO, the handleItem function will be called
/// for each of them. HandleItem must be a functor or lambda which can be called as
/// handleItems (const void* data, uint32_t size). Optionally, the functor can return
/// a bool, in which case returning true will pop the current item and continue, but
/// false will leave the current item in the FIFO and stop.
template <typename HandleItem>
void popAllAvailable (HandleItem&&);
/// Allows multiple items to be read from the FIFO without releasing their slots
/// until the BatchReadOperation object is deleted.
struct BatchReadOperation
{
explicit BatchReadOperation (VariableSizeFIFO&) noexcept;
BatchReadOperation() = default;
BatchReadOperation (BatchReadOperation&&);
BatchReadOperation& operator= (BatchReadOperation&&);
~BatchReadOperation() noexcept;
template <typename HandleItem>
bool pop (HandleItem&& handleItem);
bool isActive() const { return fifo != nullptr; }
void release() noexcept;
private:
VariableSizeFIFO* fifo = nullptr;
uint32_t newReadPos = 0;
};
/// Returns the number of used bytes in the FIFO.
uint32_t getUsedSpace() const;
/// Returns the number of bytes free in the FIFO.
/// Bear in mind that because each item needs some header bytes, and because items
/// are stored contiguously, then the number of free bytes does not mean that an
/// item of this size can definitely be added.
uint32_t getFreeSpace() const;
private:
using ItemHeader = uint32_t; // 0 = skip to the end of the buffer
static constexpr uint32_t headerSize = static_cast<uint32_t> (sizeof (ItemHeader));
uint32_t capacity = 0;
std::atomic<uint32_t> readPos, writePos;
choc::threading::SpinLock writeLock;
std::vector<char> buffer;
};
//==============================================================================
// _ _ _ _
// __| | ___ | |_ __ _ (_)| | ___
// / _` | / _ \| __| / _` || || |/ __|
// | (_| || __/| |_ | (_| || || |\__ \ _ _ _
// \__,_| \___| \__| \__,_||_||_||___/(_)(_)(_)
//
// Code beyond this point is implementation detail...
//
//==============================================================================
inline VariableSizeFIFO::VariableSizeFIFO() { reset (8); }
inline VariableSizeFIFO::~VariableSizeFIFO() = default;
inline uint32_t VariableSizeFIFO::getUsedSpace() const { auto s = readPos.load(); auto e = writePos.load(); return e >= s ? (e - s) : (capacity + 1u - (s - e)); }
inline uint32_t VariableSizeFIFO::getFreeSpace() const { auto s = readPos.load(); auto e = writePos.load(); return e >= s ? (capacity + 1u - (e - s)) : (s - e); }
inline void VariableSizeFIFO::reset (uint32_t totalFIFOSizeBytes)
{
readPos = 0;
writePos = 0;
capacity = std::max (totalFIFOSizeBytes, headerSize + 4);
buffer.clear();
buffer.resize (capacity + 1u);
}
template <typename DataProvider>
bool VariableSizeFIFO::push (uint32_t numBytes, DataProvider&& writeSourceData)
{
if (numBytes == 0)
return false;
auto bytesNeeded = numBytes + headerSize;
const std::scoped_lock lock (writeLock);
auto destOffset = writePos.load();
auto dest = buffer.data() + destOffset;
if (destOffset >= readPos)
{
// check whether we can fit a contiguous block at the current position
if (destOffset + bytesNeeded > capacity)
{
if (bytesNeeded >= readPos) // check whether there'll be enough space after padding
return false;
std::memset (dest, 0, headerSize); // header of 0 = skip to the end
dest = buffer.data();
destOffset = 0;
}
}
else
{
if (destOffset + bytesNeeded >= readPos)
return false;
}
auto header = static_cast<ItemHeader> (numBytes);
std::memcpy (dest, std::addressof (header), headerSize);
writeSourceData (dest + headerSize);
writePos = (destOffset + bytesNeeded) % capacity;
return true;
}
inline bool VariableSizeFIFO::push (const void* sourceData, uint32_t numBytes)
{
return push (numBytes, [=] (void* dest) { std::memcpy (dest, sourceData, numBytes); });
}
template <typename HandleItem>
bool VariableSizeFIFO::pop (HandleItem&& handleItem)
{
for (;;)
{
if (readPos == writePos)
return false;
auto itemData = buffer.data() + static_cast<int32_t> (readPos);
ItemHeader itemSize;
std::memcpy (std::addressof (itemSize), itemData, headerSize);
if (itemSize != 0)
{
handleItem (static_cast<void*> (itemData + headerSize), itemSize);
readPos = (readPos + itemSize + headerSize) % capacity;
return true;
}
readPos = 0;
}
}
template <typename HandleItem>
void VariableSizeFIFO::popAllAvailable (HandleItem&& handleItem)
{
auto originalWritePos = writePos.load();
auto newReadPos = readPos.load();
while (newReadPos != originalWritePos)
{
auto itemData = buffer.data() + static_cast<int32_t> (newReadPos);
ItemHeader itemSize;
std::memcpy (std::addressof (itemSize), itemData, headerSize);
if (itemSize != 0)
{
if constexpr (std::is_void<decltype(handleItem(nullptr, 0))>::value)
{
handleItem (static_cast<void*> (itemData + headerSize), itemSize);
}
else
{
if (! handleItem (static_cast<void*> (itemData + headerSize), itemSize))
break;
}
newReadPos = (newReadPos + itemSize + headerSize) % capacity;
}
else
{
newReadPos = 0;
}
}
readPos = newReadPos;
}
inline VariableSizeFIFO::BatchReadOperation::BatchReadOperation (VariableSizeFIFO& f) noexcept : fifo (std::addressof (f)) { newReadPos = f.readPos.load(); }
inline VariableSizeFIFO::BatchReadOperation::BatchReadOperation (BatchReadOperation&& other) : fifo (other.fifo), newReadPos (other.newReadPos) { other.fifo = nullptr; }
inline VariableSizeFIFO::BatchReadOperation& VariableSizeFIFO::BatchReadOperation::operator= (BatchReadOperation&& other) { release(); fifo = other.fifo; newReadPos = other.newReadPos; other.fifo = nullptr; return *this; }
inline VariableSizeFIFO::BatchReadOperation::~BatchReadOperation() noexcept { release(); }
inline void VariableSizeFIFO::BatchReadOperation::release() noexcept { if (fifo != nullptr) { fifo->readPos = newReadPos; fifo = nullptr; } }
template <typename HandleItem>
bool VariableSizeFIFO::BatchReadOperation::pop (HandleItem&& handleItem)
{
CHOC_ASSERT (fifo != nullptr);
auto originalWritePos = fifo->writePos.load();
while (newReadPos != originalWritePos)
{
auto itemData = fifo->buffer.data() + static_cast<int32_t> (newReadPos);
ItemHeader itemSize;
std::memcpy (std::addressof (itemSize), itemData, headerSize);
if (itemSize != 0)
{
handleItem (static_cast<void*> (itemData + headerSize), itemSize);
newReadPos = (newReadPos + itemSize + headerSize) % fifo->capacity;
return true;
}
newReadPos = 0;
}
return false;
}
} // namespace choc::fifo
#endif // CHOC_VARIABLE_SIZE_FIFO_HEADER_INCLUDED