thread_pool.hpp
Go to the documentation of this file.
/*********************************************************************************\
* *
* https://github.com/cr-marcstevens/snippets/tree/master/cxxheaderonly *
* *
* thread_pool.hpp - A header only C++ light-weight thread pool *
* Copyright (c) 2017 Marc Stevens *
* *
* MIT License *
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy *
* of this software and associated documentation files (the "Software"), to deal *
* in the Software without restriction, including without limitation the rights *
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *
* copies of the Software, and to permit persons to whom the Software is *
* furnished to do so, subject to the following conditions: *
* *
* The above copyright notice and this permission notice shall be included in all *
* copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *
* SOFTWARE. *
* *
\*********************************************************************************/
29
30#ifndef THREAD_POOL_HPP
31#define THREAD_POOL_HPP
32
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <vector>
44
/*************************** example usage ***************************************\
grep "^//test.cpp" thread_pool.hpp -A33 > test.cpp
g++ -std=c++11 -o test test.cpp -pthread -lpthread

//test.cpp:
#include "thread_pool.hpp"
#include <iostream>

int main()
{
    // use the main thread as a worker too (via wait_work()), so create one thread fewer; hardware_concurrency() may return 0
    // (alternatively use wait_sleep() and create threads for all logical hardware cores)
    thread_pool::thread_pool tp(std::thread::hardware_concurrency() > 1 ? std::thread::hardware_concurrency() - 1 : 0);

    std::atomic_uint aui(0);
    for (unsigned i = 0; i < 100; ++i)
        tp.push( [&aui](){ ++aui; } );
    std::cout << aui << std::endl;
    tp.wait_work();
    std::cout << aui << std::endl;

    // blocking run 'void f()' function on all threads and main thread
    tp.run( [](){
        std::cout << "Run 1: Thread started." << std::endl;
    });

    // blocking run 'void f(int)' function on all threads and main thread
    tp.run( [](int threadid){
        std::cout << "Run 2: Thread " << threadid << " started." << std::endl;
    });

    // blocking run 'void f(int,int)' function on all threads and main thread
    tp.run( [](int threadid, int threads){
        std::cout << "Run 3: Thread " << threadid << " of " << threads << " started." << std::endl;
    });
    return 0;
}

\************************* end example usage *************************************/
84
85namespace thread_pool {
86
// detail::result_of<F(Args...)>::type names the type produced by invoking a
// callable of type F with arguments of types Args...
//
// std::result_of is deprecated in C++17 and removed in C++20, while
// std::invoke_result only exists from C++17 on. The choice is made on the
// library feature-test macro rather than on __cplusplus: MSVC keeps
// __cplusplus at 199711L unless /Zc:__cplusplus is passed, which would select
// the removed std::result_of under /std:c++20 and fail to compile.
namespace detail {
    template<class> struct result_of;
#if defined(__cpp_lib_is_invocable) && __cpp_lib_is_invocable >= 201703L
    template<class F, class... ArgTypes>
    struct result_of<F(ArgTypes...)> : std::invoke_result<F, ArgTypes...> {};
#else
    template<class F, class... ArgTypes>
    struct result_of<F(ArgTypes...)> : std::result_of<F(ArgTypes...)> {};
#endif
}
100
102 public:
103 thread_pool(std::size_t nrthreads = 0);
104 ~thread_pool();
105
106 std::size_t size() const;
107
108 // resize the thread pool
109 void resize(std::size_t nrthreads);
110
111 // enqueue a function and obtain a future on its return value
112 template<typename F, typename... Args>
113 auto enqueue(F&& f, Args&&... args) -> std::future<typename detail::result_of<F(Args...)>::type>;
114
115 // push a trivial function without a future
116 void push(const std::function<void()>& f);
117 void push(std::function<void()>&& f);
118
119 // stop the thread pool
120 void stop();
121
122 // process single task
123 bool work();
124
125 // process tasks & then wait until all threads are idle
126 void wait_work();
127
128 // sleep until all threads are idle
129 void wait_sleep();
130
131 // run a job 'void f()' on #threads <= #threadpoolsize+1
132 // (-1 => #threads = #threadpoolsize + 1)
133 void run(const std::function<void()>& f, int threads = -1);
134 // run a job given function 'void f(int threadid)'
135 void run(const std::function<void(int)>& f, int threads = -1);
136 // run a job given function 'void f(int threadid, int threads)' (0 <= threadid < threads)
137 void run(const std::function<void(int,int)>& f, int threads = -1);
138
139 private:
140 void _init_thread();
141
142 std::atomic_uint _threads_busy;
143 std::vector< std::unique_ptr<std::thread> > _threads;
144 std::vector< std::shared_ptr<std::atomic_bool> > _threads_stop;
145 std::queue< std::function<void()> > _tasks;
146 std::mutex _mutex;
147 std::condition_variable _condition;
148 };
149
// Reusable rendezvous point: threads call wait(), and the `count`-th arrival
// resets the arrival counter and wakes the threads blocked before it.
class barrier {
public:
    barrier(std::size_t count);
    // Throws std::runtime_error if the current round has unreleased arrivals.
    ~barrier() noexcept(false);

    void wait();

private:
    std::size_t _i;      // arrivals in the current round
    std::size_t _count;  // arrivals needed to release a round
    std::mutex _mutex;
    std::condition_variable _condition;
};
161
162
163
164
165 inline thread_pool::thread_pool(std::size_t nrthreads)
166 {
167 _threads_busy = 0;
168 resize(nrthreads);
169 }
170
171 inline thread_pool::~thread_pool()
172 {
173 stop();
174 }
175
176 inline std::size_t thread_pool::size() const
177 {
178 return _threads.size();
179 }
180
181 inline void thread_pool::stop()
182 {
183 resize(0);
184 }
185
186 inline bool thread_pool::work()
187 {
188 std::function<void()> task;
189 {
190 std::lock_guard<std::mutex> lock(_mutex);
191 if (_tasks.empty())
192 return false;
193 task = std::move(this->_tasks.front());
194 this->_tasks.pop();
195 }
196 task();
197 return true;
198 }
199
200 inline void thread_pool::wait_work()
201 {
202 while (work())
203 ;
204 while (_threads_busy != 0)
205 std::this_thread::yield();
206 }
207
208 inline void thread_pool::wait_sleep()
209 {
210 while (_tasks.size() != 0 || _threads_busy != 0)
211 std::this_thread::yield();
212 }
213
214 inline void thread_pool::push(const std::function<void()>& f)
215 {
216 {
217 std::unique_lock<std::mutex> lock(_mutex);
218 _tasks.emplace(f);
219 }
220 _condition.notify_one();
221 }
222
223 inline void thread_pool::push(std::function<void()>&& f)
224 {
225 {
226 std::unique_lock<std::mutex> lock(_mutex);
227 _tasks.emplace(std::move(f));
228 }
229 _condition.notify_one();
230 }
231
232 template<typename F, typename... Args>
233 inline auto thread_pool::enqueue(F&& f, Args&&... args)
234 -> std::future<typename detail::result_of<F(Args...)>::type>
235 {
236 typedef typename detail::result_of<F(Args...)>::type return_type;
237 auto task = std::make_shared< std::packaged_task<return_type()> >
238 ( std::bind(std::forward<F>(f), std::forward<Args>(args)...) );
239 push( [task](){ (*task)(); } );
240 return task->get_future();
241 }
242
243 inline void thread_pool::run(const std::function<void()>& f, int threads)
244 {
245 if (threads < 1 || threads > int(_threads.size())+1)
246 threads = int(_threads.size())+1;
247 {
248 std::unique_lock<std::mutex> lock(_mutex);
249 for (int i = 0; i < threads-1; ++i)
250 _tasks.emplace(f);
251 }
252 _condition.notify_all();
253 f();
254 this->wait_sleep();
255 }
256
257 inline void thread_pool::run(const std::function<void(int)>& f, int threads)
258 {
259 if (threads < 1 || threads > int(_threads.size())+1)
260 threads = int(_threads.size())+1;
261 {
262 std::unique_lock<std::mutex> lock(_mutex);
263 for (int i = 0; i < threads-1; ++i)
264 _tasks.emplace( [=](){f(i);} );
265 }
266 _condition.notify_all();
267 f(threads-1);
268 this->wait_sleep();
269 }
270
271 inline void thread_pool::run(const std::function<void(int,int)>& f, int threads)
272 {
273 if (threads < 1 || threads > int(_threads.size())+1)
274 threads = int(_threads.size())+1;
275 {
276 std::unique_lock<std::mutex> lock(_mutex);
277 for (int i = 0; i < threads-1; ++i)
278 _tasks.emplace( [=](){f(i,threads);} );
279 }
280 _condition.notify_all();
281 f(threads-1,threads);
282 this->wait_sleep();
283 }
284
285 inline void thread_pool::resize(std::size_t nrthreads)
286 {
287 if (nrthreads < _threads.size())
288 {
289 // decreasing number of active threads
290 std::unique_lock<std::mutex> lock(_mutex);
291 for (std::size_t i = nrthreads; i < _threads.size(); ++i)
292 *(_threads_stop[i]) = true;
293 _condition.notify_all();
294 lock.unlock();
295 for (std::size_t i = nrthreads; i < _threads.size(); ++i)
296 _threads[i]->join();
297
298 lock.lock();
299 _threads_stop.resize(nrthreads);
300 _threads.resize(nrthreads);
301 }
302 else if (nrthreads > _threads.size())
303 {
304 // wait before resizing because it may cause reallocation
305 wait_work();
306
307 std::unique_lock<std::mutex> _lock(_mutex);
308 _threads_stop.reserve(nrthreads);
309 _threads.reserve(nrthreads);
310 for (std::size_t i = _threads.size(); i < nrthreads; ++i)
311 {
312 _threads_stop.emplace_back(new std::atomic_bool(false));
313 _init_thread();
314 }
315 }
316 }
317
318 inline void thread_pool::_init_thread()
319 {
320 std::size_t i = _threads.size();
321 if (i >= _threads_stop.size())
322 throw std::runtime_error("thread_pool::_init_thread(): index out of range!");
323 auto f = [this,i]() {
324 std::function<void()> task;
325 std::unique_lock<std::mutex> lock(this->_mutex);
326 while (true) {
327 if (this->_tasks.empty())
328 {
329 if (*(this->_threads_stop[i]))
330 return;
331 this->_condition.wait(lock);
332 continue;
333 }
334
335 ++this->_threads_busy;
336 task = std::move(this->_tasks.front());
337 this->_tasks.pop();
338
339 lock.unlock();
340 task();
341 --this->_threads_busy;
342 lock.lock();
343 }
344 };
345 _threads.emplace_back(new std::thread(f));
346 }
347
348
349
350
351 inline barrier::barrier(std::size_t count)
352 : _i(0), _count(count)
353 {
354 }
355
356 inline barrier::~barrier() noexcept(false)
357 {
358 if (_i != 0)
359 throw std::runtime_error("barrier::~barrier: threads are still waiting on barrier");
360 }
361
362 inline void barrier::wait()
363 {
364 std::unique_lock<std::mutex> lock(_mutex);
365 if (++_i >= _count)
366 {
367 _i = 0;
368 lock.unlock();
369 _condition.notify_all();
370 }
371 else
372 {
373 _condition.wait(lock);
374 }
375 }
376
377} // namespace thread_pool
378
379#endif // THREAD_POOL_HPP
Definition: thread_pool.hpp:150
~barrier() noexcept(false)
Definition: thread_pool.hpp:356
void wait()
Definition: thread_pool.hpp:362
barrier(std::size_t count)
Definition: thread_pool.hpp:351
Definition: thread_pool.hpp:101
Definition: thread_pool.hpp:85
Definition: thread_pool.hpp:89
FPLLL_BEGIN_NAMESPACE typedef std::mutex mutex
Definition: threadpool.h:25