-
Notifications
You must be signed in to change notification settings - Fork 5
/
concurrent_queue_bench.py
80 lines (59 loc) · 2.32 KB
/
concurrent_queue_bench.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Copyright (c) Meta Platforms, Inc. and affiliates.
# pyre-strict
import os
import queue
from ft_utils.benchmark_utils import BenchmarkProvider, execute_benchmarks
from ft_utils.concurrent import ConcurrentQueue, StdConcurrentQueue
from ft_utils.local import LocalWrapper
# Alias push/pop as put/get on ConcurrentQueue so the shared benchmark loops
# below, which call .put() and .get(), can drive it exactly like queue.Queue.
# NOTE: this patches the class object itself, so the aliases are visible to
# anything else in the process that uses ConcurrentQueue after this import.
ConcurrentQueue.put = ConcurrentQueue.push # type: ignore
ConcurrentQueue.get = ConcurrentQueue.pop # type: ignore
class ConcurretQueueBenchmarkProvider(BenchmarkProvider):
    """Benchmark put/get traffic across four queue implementations.

    The queues compared are a default ``ConcurrentQueue``, a
    ``ConcurrentQueue`` built with ``lock_free=True``, a
    ``StdConcurrentQueue`` and the standard library ``queue.Queue``. Each one
    is exercised with an interleaved workload (``_bm``) and a batched
    workload (``_bmb``).
    """

    def __init__(self, operations: int) -> None:
        """Record the workload size; the queues are created later in set_up.

        Args:
            operations: Number of put/get pairs performed by the interleaved
                benchmarks. The batched benchmarks run ``operations // 100``
                rounds of 100 puts followed by 100 gets.
        """
        self._operations = operations
        self._queue: ConcurrentQueue | None = None
        self._queue_lf: ConcurrentQueue | None = None
        self._queue_queue: queue.Queue | None = None  # type: ignore
        self._queue_std: StdConcurrentQueue | None = None  # type: ignore

    def set_up(self) -> None:
        """Instantiate one queue of each kind under test."""
        # Both ConcurrentQueue variants receive the CPU count as their first
        # argument; only the lock_free flag differs between them.
        cpu_total = os.cpu_count()
        self._queue = ConcurrentQueue(cpu_total)
        self._queue_lf = ConcurrentQueue(cpu_total, lock_free=True)
        self._queue_queue = queue.Queue()
        self._queue_std = StdConcurrentQueue()

    # ------------------------------------------------------------------
    # Interleaved workload: one put immediately followed by one get.
    # ------------------------------------------------------------------

    def benchmark_locked(self) -> None:
        """Interleaved workload on the default ConcurrentQueue."""
        self._bm(LocalWrapper(self._queue))

    def benchmark_lock_free(self) -> None:
        """Interleaved workload on the lock_free=True ConcurrentQueue."""
        self._bm(LocalWrapper(self._queue_lf))

    def benchmark_std(self) -> None:
        """Interleaved workload on the StdConcurrentQueue."""
        self._bm(LocalWrapper(self._queue_std))

    def benchmark_queue(self) -> None:
        """Interleaved workload on the standard library queue.Queue."""
        self._bm(LocalWrapper(self._queue_queue))

    def _bm(self, lw) -> None:  # type: ignore
        """Put then get a single item, repeated ``self._operations`` times.

        Args:
            lw: Object exposing ``put(item)`` and ``get()``; callers in this
                class pass a LocalWrapper around one of the queues.
        """
        for item in range(self._operations):
            lw.put(item)
            lw.get()

    # ------------------------------------------------------------------
    # Batched workload: fill with 100 items, then drain 100 items.
    # ------------------------------------------------------------------

    def benchmark_locked_batch(self) -> None:
        """Batched workload on the default ConcurrentQueue."""
        self._bmb(LocalWrapper(self._queue))

    def benchmark_lock_free_batch(self) -> None:
        """Batched workload on the lock_free=True ConcurrentQueue."""
        self._bmb(LocalWrapper(self._queue_lf))

    def benchmark_std_batch(self) -> None:
        """Batched workload on the StdConcurrentQueue."""
        self._bmb(LocalWrapper(self._queue_std))

    def benchmark_queue_batch(self) -> None:
        """Batched workload on the standard library queue.Queue."""
        self._bmb(LocalWrapper(self._queue_queue))

    def _bmb(self, lw) -> None:  # type: ignore
        """Run rounds of 100 puts followed by 100 gets.

        The number of rounds is ``self._operations // 100``, so any remainder
        below a full batch is not executed. Every item put during a round is
        that round's index.

        Args:
            lw: Object exposing ``put(item)`` and ``get()``; callers in this
                class pass a LocalWrapper around one of the queues.
        """
        batch_size = 100
        for round_index in range(self._operations // batch_size):
            for _ in range(batch_size):
                lw.put(round_index)
            for _ in range(batch_size):
                lw.get()
def invoke_main() -> None:
    """Entry point: pass the queue benchmark provider to execute_benchmarks."""
    provider_cls = ConcurretQueueBenchmarkProvider
    execute_benchmarks(provider_cls)
# Only launch the benchmarks when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    invoke_main()