// atomic_aio.stub
#include <stdio.h>
#include <stdint.h> // for uint32_t in the cmpxchg macro below
#include <assert.h>
#include <pthread.h>
// CROSS-PLATFORM
#ifndef __CROSS_PLATFORM_H__
#define __CROSS_PLATFORM_H__
// bool define
#ifdef __KERNEL__
#include <sys/stdbool.h>
#else
#include <stdbool.h>
#endif
// malloc free
#ifdef __KERNEL__
#define malloc(x) kmalloc(x, GFP_KERNEL)
#define free kfree
#define calloc(x, y) kmalloc((x) * (y), GFP_KERNEL | __GFP_ZERO)
#include <linux/string.h>
#else
#include <stdlib.h>
#include <string.h>
#endif
#ifndef asm
#define asm __asm
#endif
#define cmpxchg(ptr, _old, _new) ({ \
volatile uint32_t *__ptr = (volatile uint32_t *)(ptr); \
uint32_t __ret; \
asm volatile("lock; cmpxchgl %2,%1" \
: "=a"(__ret), "+m"(*__ptr) \
: "r"(_new), "0"(_old) \
: "memory"); \
__ret; \
})
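// cmpxchg evaluates to the value *ptr held before the attempt, so a caller
// detects success by comparing the result against _old. A minimal sketch
// (hypothetical helper, not used elsewhere in this file):
#if 0
static void cmpxchg_increment_example(volatile uint32_t *counter)
{
uint32_t seen;
do
{
seen = *counter; // snapshot the current value
} while (cmpxchg(counter, seen, seen + 1) != seen); // retry until our swap wins
}
#endif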
//#define CAS cmpxchg
#define ATOMIC_SET __sync_lock_test_and_set
#define ATOMIC_RELEASE __sync_lock_release
#if defined __GNUC__
#define ATOMIC_SUB __sync_sub_and_fetch
#define ATOMIC_SUB64 ATOMIC_SUB
#define CAS __sync_bool_compare_and_swap
#define XCHG __sync_lock_test_and_set // yes really. The 2nd arg is limited to 1 on machines with TAS but not XCHG. On x86 it's an arbitrary value
#define ATOMIC_ADD __sync_add_and_fetch
#define ATOMIC_ADD64 ATOMIC_ADD
#define mb __sync_synchronize
#if defined(__x86_64__) || defined(__i386)
// #define lmb() asm volatile( "lfence" )
// #define smb() asm volatile( "sfence" )
// #define lmb() asm volatile("":::"memory") // compiler barrier only. runtime reordering already impossible on x86
// #define smb() asm volatile("":::"memory")
// "mfence" for lmb and smb makes assertion failures rarer, but doesn't eliminate, so it's just papering over the symptoms
#endif // else no definition
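// If lmb()/smb() were ever needed on a non-x86 target, a conservative fallback
// (an assumption, not something this build defines) is to reuse the full
// barrier defined above:
// #define lmb() mb()
// #define smb() mb()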
// thread
#include <pthread.h>
// #include <sched.h>
#define THREAD_WAIT(x) pthread_join(x, NULL);
#define THREAD_ID pthread_self
#define THREAD_FN void *
#define THREAD_YIELD sched_yield
#define THREAD_TOKEN pthread_t
#else
// #include <Windows.h>
// #define ATOMIC_SUB(x,y) InterlockedExchangeAddNoFence(x, -y)
// #define ATOMIC_SUB64(x,y) InterlockedExchangeAddNoFence64(x, -y)
// #define ATOMIC_ADD InterlockedExchangeAddNoFence
// #define ATOMIC_ADD64 InterlockedExchangeAddNoFence64
// #ifdef _WIN64
// #define mb() MemoryBarrier()
// #define lmb() LoadFence()
// #define smb() StoreFence()
// inline bool __CAS(LONG64 volatile *x, LONG64 y, LONG64 z) {
// return InterlockedCompareExchangeNoFence64(x, z, y) == y;
// }
// #define CAS(x,y,z) __CAS((LONG64 volatile *)x, (LONG64)y, (LONG64)z)
// #else
// #define mb() asm mfence
// #define lmb() asm lfence
// #define smb() asm sfence
// inline bool __CAS(LONG volatile *x, LONG y, LONG z) {
// return InterlockedCompareExchangeNoFence(x, z, y) == y;
// }
// #define CAS(x,y,z) __CAS((LONG volatile *)x, (LONG)y, (LONG)z)
// #endif
// // thread
// #include <windows.h>
// #define THREAD_WAIT(x) WaitForSingleObject(x, INFINITE);
// #define THREAD_ID GetCurrentThreadId
// #define THREAD_FN WORD WINAPI
// #define THREAD_YIELD SwitchToThread
// #define THREAD_TOKEN HANDLE
#endif
#endif
// LFQ.H
#ifndef __LFQ_H__
#define __LFQ_H__
// #include <stdalign.h> // C11
struct lfq_node
{
void *data;
union
{
struct lfq_node *volatile next;
struct lfq_node *volatile free_next;
};
volatile int can_free;
};
struct lfq_ctx
{
// alignas(64) volatile struct lfq_node * volatile head;
struct lfq_node *volatile head;
int volatile count;
volatile struct lfq_node **HP;
volatile int *tid_map;
int volatile is_freeing;
volatile struct lfq_node *volatile fph; // free pool head
volatile struct lfq_node *volatile fpt; // free pool tail
int MAXHPSIZE;
// alignas(64) volatile struct lfq_node * volatile tail; // in another cache line to avoid contention
struct lfq_node *volatile tail;
};
#define LFQ_MB_DEQUEUE(ctx, ret) ({ \
do \
{ \
ret = lfq_dequeue(ctx); \
mb(); \
} while (ret == 0); \
})
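// LFQ_MB_DEQUEUE spins, with a full barrier between attempts, until the queue
// yields a non-NULL item. A blocking-pop sketch (hypothetical helper; note that
// lfq_dequeue itself is defined further down in this file):
#if 0
static void *blocking_pop(struct lfq_ctx *q)
{
void *item;
LFQ_MB_DEQUEUE(q, item); // loops while lfq_dequeue(q) keeps returning NULL
return item;
}
#endif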
#endif
#ifdef DEBUG
#include <assert.h>
#endif
#include <errno.h>
#define MAXFREE 150
static int inHP(struct lfq_ctx *ctx, struct lfq_node *lfn)
{
for (int i = 0; i < ctx->MAXHPSIZE; i++)
{
// lmb(); // not needed, we don't care if loads reorder here, just that we check all the elements
if (ctx->HP[i] == lfn)
return 1;
}
return 0;
}
static void enpool(struct lfq_ctx *ctx, struct lfq_node *lfn)
{
// add to tail of the free list
lfn->free_next = NULL;
volatile struct lfq_node *old_tail = XCHG(&ctx->fpt, lfn); // seq_cst
old_tail->free_next = lfn;
// getting nodes out of this will have exactly the same deallocation problem
// as the main queue.
// TODO: a stack might be easier to manage, but would increase contention.
/*
volatile struct lfq_node * p;
do {
p = ctx->fpt;
} while(!CAS(&ctx->fpt, p, lfn)); // exchange using CAS
p->free_next = lfn;
*/
}
static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
if (!CAS(&ctx->is_freeing, 0, 1))
return; // free_pool is single-threaded: bail out if another thread already holds the flag
volatile struct lfq_node *p;
for (int i = 0; i < MAXFREE || freeall; i++)
{
p = ctx->fph;
if ((!p->can_free) || (!p->free_next) || inHP(ctx, (struct lfq_node *)p))
goto exit;
ctx->fph = p->free_next;
free((void *)p);
}
exit:
ctx->is_freeing = false;
// smb();
}
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *lfn)
{
if (lfn->can_free && !inHP(ctx, lfn))
{
// free is not thread-safe
if (CAS(&ctx->is_freeing, 0, 1))
{
lfn->next = (void *)-1; // poison the pointer to detect use-after-free
free(lfn); // we got the lock; actually free
ctx->is_freeing = false;
// smb();
}
else // we didn't get the lock; only add to a freelist
enpool(ctx, lfn);
}
else
enpool(ctx, lfn);
free_pool(ctx, false);
}
static int alloc_tid(struct lfq_ctx *ctx)
{
for (int i = 0; i < ctx->MAXHPSIZE; i++)
if (ctx->tid_map[i] == 0)
if (CAS(&ctx->tid_map[i], 0, 1))
return i;
return -1;
}
static void free_tid(struct lfq_ctx *ctx, int tid)
{
ctx->tid_map[tid] = 0;
}
int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
struct lfq_node *tmpnode = calloc(1, sizeof(struct lfq_node));
if (!tmpnode)
return -errno;
struct lfq_node *free_pool_node = calloc(1, sizeof(struct lfq_node));
if (!free_pool_node)
return -errno;
tmpnode->can_free = free_pool_node->can_free = true;
memset(ctx, 0, sizeof(struct lfq_ctx));
ctx->MAXHPSIZE = max_consume_thread;
ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node *)); // one hazard pointer per consumer thread
ctx->tid_map = calloc(max_consume_thread, sizeof(int)); // slot-in-use flags
ctx->head = ctx->tail = tmpnode;
ctx->fph = ctx->fpt = free_pool_node;
return 0;
}
long lfg_count_freelist(const struct lfq_ctx *ctx)
{
long count = 0;
struct lfq_node *p = (struct lfq_node *)ctx->fph; // non-volatile
while (p)
{
count++;
p = p->free_next;
}
return count;
}
int lfq_clean(struct lfq_ctx *ctx)
{
if (ctx->tail && ctx->head)
{ // queue was initialized and still holds nodes (at least the dummy)
struct lfq_node *tmp;
while ((struct lfq_node *)ctx->head)
{ // walk the list and free every remaining node
tmp = (struct lfq_node *)ctx->head->next;
safe_free(ctx, (struct lfq_node *)ctx->head);
ctx->head = tmp;
}
ctx->tail = 0;
}
if (ctx->fph && ctx->fpt)
{
free_pool(ctx, true);
if (ctx->fph != ctx->fpt)
return -1;
free((void *)ctx->fpt); // free the empty node
ctx->fph = ctx->fpt = 0;
}
if (!ctx->fph && !ctx->fpt)
{
free((void *)ctx->HP);
free((void *)ctx->tid_map);
memset(ctx, 0, sizeof(struct lfq_ctx));
}
else
return -1;
return 0;
}
int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
if (!insert_node)
return -errno;
insert_node->data = data;
// mb(); // we've only written to "private" memory that other threads can't see.
volatile struct lfq_node *old_tail;
#if 0
do {
old_tail = (struct lfq_node *) ctx->tail;
} while(!CAS(&ctx->tail,old_tail,insert_node));
#else
old_tail = XCHG(&ctx->tail, insert_node);
#endif
// We've claimed our spot in the insertion order by modifying tail
// we are the only inserting thread with a pointer to the old tail.
// now we can make it part of the list by overwriting the NULL pointer in the old tail
// This is safe whether or not other threads have updated ->next in our insert_node
#if 0
assert(!(old_tail->next) && "old tail wasn't NULL");
#endif
old_tail->next = insert_node;
// TODO: could a consumer thread have freed the old tail? No, because that would leave head == NULL.
// ATOMIC_ADD( &ctx->count, 1);
return 0;
}
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
// int cn_runtimes = 0;
volatile struct lfq_node *old_head, *new_head;
#if 1 // HP[tid] stuff is necessary for deallocation. (but it's still not safe).
do
{
retry: // continue jumps to the bottom of the loop, and would attempt a CAS with uninitialized new_head
old_head = ctx->head;
ctx->HP[tid] = old_head; // seq-cst store. (better: use xchg instead of mov + mfence on x86)
mb();
if (old_head != ctx->head) // another thread freed it before seeing our HP[tid] store
goto retry;
new_head = old_head->next; // FIXME: crash with old_head=NULL during deallocation (tid=5)? (main thread=25486, this=25489)
if (new_head == 0 /* || new_head != old_head->next*/)
{ // redoing the same load isn't useful
ctx->HP[tid] = 0;
return 0; // never remove the last node
}
#ifdef DEBUG
assert(new_head != (void *)-1 && "read an already-freed node");
#endif
} while (!CAS(&ctx->head, old_head, new_head));
#else // without HP[] stuff
do
{
old_head = ctx->head;
// ctx->HP[tid] = old_head;
new_head = old_head->next;
// if (old_head != ctx->head) continue;
if (!new_head)
{
// ctx->HP[tid] = 0;
return 0; // never remove the last node
}
#ifdef DEBUG
assert(new_head != (void *)-1 && "read an already-freed node");
#endif
} while (!CAS(&ctx->head, old_head, new_head));
#endif
// mb(); // CAS is already a memory barrier, at least on x86.
// we've atomically advanced head, and we're the thread that won the race to claim a node
// We return the data from the *new* head.
// The list starts off with a dummy node, so the current head is always a node that's already been read.
ctx->HP[tid] = 0;
void *ret = new_head->data;
new_head->can_free = true;
// ATOMIC_SUB( &ctx->count, 1 );
// old_head->next = (void*)-1; // done in safe-free in the actual free() path. poison the pointer to detect use-after-free
// we need to avoid freeing until other readers are definitely not going to load its ->next in the CAS loop
safe_free(ctx, (struct lfq_node *)old_head);
// free(old_head);
return ret;
}
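// A consumer that dequeues in a loop can reserve a hazard-pointer slot once and
// reuse it, instead of paying alloc_tid()/free_tid() on every call as
// lfq_dequeue() below does. A sketch, assuming one slot per draining thread
// (hypothetical helper):
#if 0
static void drain_with_tid(struct lfq_ctx *q, void (*consume)(void *))
{
int tid = alloc_tid(q); // reserve one HP slot for this thread
if (tid == -1)
return; // all slots taken; caller must retry or use fewer threads
void *item;
while ((item = lfq_dequeue_tid(q, tid)) != NULL)
consume(item);
free_tid(q, tid); // give the slot back
}
#endif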
void *lfq_dequeue(struct lfq_ctx *ctx)
{
// return lfq_dequeue_tid(ctx, 0); // TODO: let this inline even in the shared library
// old version
int tid = alloc_tid(ctx);
if (tid == -1)
return (void *)-1; // too many threads racing: no free hazard-pointer slot
void *ret = lfq_dequeue_tid(ctx, tid);
free_tid(ctx, tid);
return ret;
}
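// End-to-end usage sketch for the queue API above: initialize with the maximum
// number of concurrently dequeuing threads, enqueue/dequeue pointers, then clean
// up. Single-threaded here for brevity; the helper name is illustrative only.
#if 0
static int lfq_selftest(void)
{
struct lfq_ctx q;
int value = 42;
if (lfq_init(&q, 1) != 0) // one consumer thread -> one hazard-pointer slot
return -1;
if (lfq_enqueue(&q, &value) != 0)
return -1;
int *got = lfq_dequeue(&q); // returns NULL when empty, (void *)-1 if no tid is free
assert(got == &value);
assert(lfq_dequeue(&q) == NULL); // queue is empty again
return lfq_clean(&q); // 0 on success
}
#endif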
// ###############################################
// MY CODE
int *ret;
struct lfq_ctx ctx;
int var_to_get[8] = {-1, -1, -1, -1, -1, -1, -1, -1};
int vars[8] = {1, 2, 3, 4, 5, 6, 7, 8};
void __VERIFIER_atomic_enq(int *num)
{
lfq_enqueue(&ctx, (void *)num);
}
int *__VERIFIER_atomic_deq()
{
return (int *)lfq_dequeue(&ctx);
}
void __VERIFIER_atomic_print()
{
int i = 0;
while ((ret = __VERIFIER_atomic_deq()) != 0)
{
var_to_get[i] = *ret;
i++;
}
}
void *thread_func_one(void *arg)
{
(void)arg; // unused
__VERIFIER_atomic_enq(&vars[0]);
__VERIFIER_atomic_enq(&vars[1]);
__VERIFIER_atomic_enq(&vars[2]);
__VERIFIER_atomic_deq();
__VERIFIER_atomic_enq(&vars[3]);
return NULL;
}
void *thread_func_two(void *arg)
{
(void)arg; // unused
__VERIFIER_atomic_enq(&vars[4]);
__VERIFIER_atomic_deq();
__VERIFIER_atomic_enq(&vars[5]);
__VERIFIER_atomic_enq(&vars[6]);
__VERIFIER_atomic_enq(&vars[7]);
return NULL;
}
int main(int argc, char const *argv[])
{
lfq_init(&ctx, 1);
pthread_t thread_one, thread_two;
pthread_create(&thread_one, NULL, thread_func_one, NULL);
pthread_create(&thread_two, NULL, thread_func_two, NULL);
pthread_join(thread_one, NULL);
pthread_join(thread_two, NULL);
__VERIFIER_atomic_print();
//CS_ASSUME
assert(0);
return 0;
}