-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpthread-lite.h
165 lines (136 loc) · 3.43 KB
/
pthread-lite.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/* pthread-lite - Template classes for C++ concurrency
* Copyright 2016 Jeremiah Wala
* Modified and expanded by Jeremiah Wala ([email protected])
* Released under the MIT license
*
* Modified from wqueue.h by Vic Hargrave
* https://github.com/vichargrave/wqueue
*/
#ifndef PTHREAD_LITE_H__
#define PTHREAD_LITE_H__
#include <list>
#include <pthread.h>
/** Thread-safe FIFO queue of work items.
 *
 * Producers call add(); consumers call remove(), which blocks until an
 * item is available. Every access to the underlying list made through
 * the member functions is serialised by m_mutex, and m_condv is
 * signalled each time an item is added.
 *
 * The queue owns a pthread mutex and condition variable, which must not
 * be copied, so the class is non-copyable.
 */
template <typename T> class WorkQueue {
  public:
    /** Create an empty queue and initialise its mutex/condition variable
     * with default attributes. */
    WorkQueue() {
        pthread_mutex_init(&m_mutex, NULL);
        pthread_cond_init(&m_condv, NULL);
    }

    /** Destroy the synchronisation primitives. No thread may still be
     * using the queue when this runs. */
    ~WorkQueue() {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_condv);
    }

    /** Append a copy of `item` to the back of the queue and wake one
     * thread waiting in remove(). */
    void add(T item) {
        ScopedLock guard(m_mutex);
        m_queue.push_back(item);
        pthread_cond_signal(&m_condv);
    }

    /** Remove and return the item at the front of the queue.
     * Blocks until the queue is non-empty. */
    T remove() {
        ScopedLock guard(m_mutex);
        // Loop, not `if`: the condition must be re-checked after every
        // wake-up because another consumer may have taken the item.
        while (m_queue.empty()) {
            pthread_cond_wait(&m_condv, &m_mutex);
        }
        T item = m_queue.front();
        m_queue.pop_front();
        return item;
    }

    /** Return the number of queued items at the moment of the call.
     * The value may be stale as soon as the call returns if other
     * threads are adding or removing items. */
    int size() {
        ScopedLock guard(m_mutex);
        return static_cast<int>(m_queue.size());
    }

    std::list<T> m_queue;     // pending items, front = oldest
    pthread_mutex_t m_mutex;  // guards m_queue
    pthread_cond_t m_condv;   // signalled by add() for each new item

  private:
    /** Locks a mutex for the lifetime of the object, so the mutex is
     * released even if copying a T throws. */
    class ScopedLock {
      public:
        explicit ScopedLock(pthread_mutex_t& mutex) : m_held(mutex) {
            pthread_mutex_lock(&m_held);
        }
        ~ScopedLock() { pthread_mutex_unlock(&m_held); }

      private:
        ScopedLock(const ScopedLock&);             // not copyable
        ScopedLock& operator=(const ScopedLock&);  // not assignable
        pthread_mutex_t& m_held;
    };

    // Declared but intentionally not defined: copying a pthread mutex or
    // condition variable is undefined, so copying the queue is disallowed.
    WorkQueue(const WorkQueue&);
    WorkQueue& operator=(const WorkQueue&);
};
/** Abstract base class wrapping a single POSIX thread.
 *
 * Derive from it and implement run(); start() executes run() on a new
 * thread. The object tracks whether a thread has been started and whether
 * it has been detached so that join(), detach() and the destructor only
 * operate on a thread ID that is still valid.
 */
class WorkThread {
  public:
    WorkThread() : m_tid(0), m_running(0), m_detached(0) {}

    /** Cancel the thread if one was started and has not been joined.
     *
     * Virtual because the class is used polymorphically (run() is pure
     * virtual), so deleting through a WorkThread* must be well defined.
     */
    virtual ~WorkThread()
    {
        if (m_running == 1) {
            // Cancel before detaching: while the thread is still joinable
            // its ID stays valid even if it has already terminated,
            // whereas after pthread_detach() a finished thread's ID may
            // no longer be used.
            pthread_cancel(m_tid);
            if (m_detached == 0) {
                pthread_detach(m_tid);
            }
        }
    }

    /** Trampoline handed to pthread_create(); `arg` is the WorkThread. */
    static void* runThread(void* arg) {
        return static_cast<WorkThread*>(arg)->run();
    }

    /** Launch run() on a new thread.
     * @return 0 on success, otherwise the pthread_create() error code.
     */
    int start()
    {
        int result = pthread_create(&m_tid, NULL, runThread, this);
        if (result == 0) {
            m_running = 1;
            m_detached = 0;  // a freshly created thread is joinable
        }
        return result;
    }

    /** Wait for the thread to finish.
     * @return 0 on success; -1 if there is no joinable thread (never
     *         started, already joined, or detached); otherwise the
     *         pthread_join() error code.
     */
    int join()
    {
        int result = -1;
        if (m_running == 1 && m_detached == 0) {
            result = pthread_join(m_tid, NULL);
            if (result == 0) {
                // The thread ID is dead after a successful join: clear the
                // flag so the destructor does not cancel it and a second
                // join() is refused instead of re-joining.
                m_running = 0;
            }
        }
        return result;
    }

    /** Detach the thread so its resources are released when it exits.
     * @return 0 on success; -1 if there is no joinable thread; otherwise
     *         the pthread_detach() error code.
     */
    int detach()
    {
        int result = -1;
        if (m_running == 1 && m_detached == 0) {
            result = pthread_detach(m_tid);
            if (result == 0) {
                m_detached = 1;
            }
        }
        return result;
    }

    /** Return the ID of the most recently started thread (this is the
     * stored ID, not the ID of the calling thread). */
    pthread_t self() {
        return m_tid;
    }

    /** Thread body, supplied by the derived class. */
    virtual void* run() = 0;

  private:
    pthread_t m_tid;  // ID written by pthread_create() in start()
    int m_running;    // 1 after a successful start() until a successful join()
    int m_detached;   // 1 once the current thread has been detached
};
/** Worker thread that drains a shared queue of work items.
 *
 * Holds a reference to a queue of heap-allocated work items (W*) and a
 * pointer to a set of data specific to this thread (e.g. a separate
 * pointer to a location in a file, to avoid thread collisions when
 * randomly accessing files). W must provide run(T*); it is called once
 * per item, after which the item is deleted by this thread.
 */
template <typename W, typename T> // W work item, T thread item
class ConsumerThread : public WorkThread {
  public:
    ConsumerThread(WorkQueue<W*>& queue, T* thread_data)
        : m_queue(queue), m_thread_data(thread_data) {}

    /** Process items until the queue is drained or a NULL item is seen.
     *
     * The first item is fetched with a blocking remove(), so the thread
     * waits if it is started before any work is queued. After each item
     * the thread takes the next one only if the queue is non-empty at
     * that moment, and exits otherwise.
     *
     * NOTE(review): a thread whose initial remove() finds the queue
     * empty still waits until another item (or a NULL sentinel) is
     * added - confirm callers queue work accordingly.
     *
     * @return always NULL.
     */
    virtual void* run() {
        W* item = m_queue.remove();
        while (item) {
            item->run(m_thread_data); // pass thread data to worker unit always
            delete item;

            // Check for more work and take it under a single lock. Doing
            // `size() == 0` and then a blocking remove() as two separate
            // steps lets another consumer grab the last item in between,
            // leaving this thread blocked forever on an empty queue.
            pthread_mutex_lock(&m_queue.m_mutex);
            if (m_queue.m_queue.empty()) {
                pthread_mutex_unlock(&m_queue.m_mutex);
                return NULL;
            }
            item = m_queue.m_queue.front();
            m_queue.m_queue.pop_front();
            pthread_mutex_unlock(&m_queue.m_mutex);
        }
        return NULL;
    }

    // return thread data. Useful at end when want to process
    // results stored per each thread
    T* GetThreadData() const { return m_thread_data; }

  private:
    WorkQueue<W*>& m_queue; // a queue of worker units
    T* m_thread_data;       // a set of data private to this thread (often not used)
};
#endif