30#ifndef THREAD_POOL_HPP
31#define THREAD_POOL_HPP
41#include <condition_variable>
// Compatibility shim that names the result type of calling F with ArgTypes...
// (used elsewhere in this header as detail::result_of<F(Args...)>::type).
// The preprocessor test selects on the language standard in use.
// NOTE(review): the primary template for the first variant and the #else/#endif
// separating the two variants are not visible in this excerpt.
87#if __cplusplus < 201703L
// Variant that forwards to std::result_of.
90 template<
class F,
class... ArgTypes>
91 struct result_of<F(ArgTypes...)> : std::result_of<F(ArgTypes...)> {};
// Variant that forwards to std::invoke_result: a primary template that is only
// declared, followed by the F(ArgTypes...) specialization.
95 template<
class>
struct result_of;
96 template<
class F,
class... ArgTypes>
97 struct result_of<F(ArgTypes...)> : std::invoke_result<F, ArgTypes...> {};
// Number of worker threads held by the pool (the definition returns _threads.size()).
106 std::size_t size()
const;
// Grow or shrink the pool so that it holds nrthreads worker threads.
109 void resize(std::size_t nrthreads);
// Queue the call f(args...) as a task and return a std::future for its result;
// the result type is detail::result_of<F(Args...)>::type.
112 template<
typename F,
typename... Args>
113 auto enqueue(F&& f, Args&&... args) -> std::future<
typename detail::result_of<F(Args...)>::type>;
// Hand a task to the pool. Both definitions take _mutex and then call
// _condition.notify_one(). This overload receives the task by const reference.
116 void push(
const std::function<
void()>& f);
// Rvalue overload: the definition moves f into the task queue.
117 void push(std::function<
void()>&& f);
// run(): execute f using up to `threads` threads. In every definition a value of
// `threads` below 1 (including the default -1) or above size()+1 is replaced by
// size()+1, and all waiting workers are notified.
//
// Overload taking a callable with no arguments.
133 void run(
const std::function<
void()>& f,
int threads = -1);
// Overload whose callable receives an index: the queued tasks call f(i) for
// i = 0 .. threads-2.
135 void run(
const std::function<
void(
int)>& f,
int threads = -1);
// Overload whose callable receives an index and the (clamped) thread count: the
// queued tasks call f(i, threads) for i = 0 .. threads-2, and the definition
// itself calls f(threads-1, threads).
137 void run(
const std::function<
void(
int,
int)>& f,
int threads = -1);
// Busy counter: a worker increments it just before taking a task from the queue
// and decrements it later in its body; the wait functions spin until it is zero.
142 std::atomic_uint _threads_busy;
// The worker threads, owned through unique_ptr.
143 std::vector< std::unique_ptr<std::thread> > _threads;
// One stop flag per worker: resize() sets it to true for workers being removed,
// and each worker checks its own flag when the task queue is empty.
144 std::vector< std::shared_ptr<std::atomic_bool> > _threads_stop;
// Queue of pending tasks.
145 std::queue< std::function<void()> > _tasks;
// Workers wait on this when the queue is empty; push(), run() and resize() notify it.
147 std::condition_variable _condition;
// A constructor initializer list visible later in this file sets _i to 0 and
// _count to its `count` argument.
// NOTE(review): presumably these are the barrier's arrival counter and expected
// thread count; the enclosing class header is not visible here, so confirm.
157 std::size_t _i, _count;
// Condition variable that is waited on and notified (notify_all) by code later in the file.
159 std::condition_variable _condition;
171 inline thread_pool::~thread_pool()
// Report how many worker threads the pool currently holds.
176 inline std::size_t thread_pool::size()
const
178 return _threads.size();
181 inline void thread_pool::stop()
// Take a task from the shared queue on the calling thread.
// NOTE(review): only part of the body is visible in this excerpt; the emptiness
// check, the pop, the task invocation and the bool return value are not shown.
186 inline bool thread_pool::work()
188 std::function<void()> task;
// _mutex is taken before the front task is moved out of _tasks.
190 std::lock_guard<std::mutex> lock(_mutex);
193 task = std::move(this->_tasks.front());
// Wait until no worker is busy.
// NOTE(review): the statements between the signature and the loop below are not
// visible in this excerpt, so any work done before spinning is not shown.
200 inline void thread_pool::wait_work()
// Spin, yielding the time slice, while _threads_busy is non-zero.
204 while (_threads_busy != 0)
205 std::this_thread::yield();
// Busy-wait (yielding the time slice) until the task queue is empty and no
// worker is busy.
// NOTE(review): _tasks.size() is read here with no lock visible in this excerpt,
// while other functions modify _tasks under _mutex; confirm this is intended.
208 inline void thread_pool::wait_sleep()
210 while (_tasks.size() != 0 || _threads_busy != 0)
211 std::this_thread::yield();
// Submit a task passed by const reference.
// NOTE(review): the statement that stores f in _tasks is not visible in this excerpt.
214 inline void thread_pool::push(
const std::function<
void()>& f)
// Take the pool mutex.
217 std::unique_lock<std::mutex> lock(_mutex);
// Wake one waiting worker.
220 _condition.notify_one();
// Submit a task passed as an rvalue: it is moved into the queue rather than copied.
223 inline void thread_pool::push(std::function<
void()>&& f)
// Take the pool mutex, then move the task to the back of _tasks.
226 std::unique_lock<std::mutex> lock(_mutex);
227 _tasks.emplace(std::move(f));
// Wake one waiting worker.
229 _condition.notify_one();
// Wrap the call f(args...) in a std::packaged_task, queue it through push(), and
// return the future tied to that task.
// NOTE(review): the trailing return type and the definition of return_type are
// not visible in this excerpt.
232 template<
typename F,
typename... Args>
233 inline auto thread_pool::enqueue(F&& f, Args&&... args)
// The callable and its arguments are bound together now; the packaged_task is
// held in a shared_ptr so the lambda queued below can share it by copy.
237 auto task = std::make_shared< std::packaged_task<return_type()> >
238 ( std::bind(std::forward<F>(f), std::forward<Args>(args)...) );
// The queued task simply invokes the shared packaged_task.
239 push( [task](){ (*task)(); } );
// NOTE(review): the future is obtained after the task has been queued, so a
// worker may already be running it; confirm that is acceptable here.
240 return task->get_future();
// Run the argument-less callable f using up to `threads` threads.
// NOTE(review): the loop body and any statements after notify_all() are not
// visible in this excerpt.
243 inline void thread_pool::run(
const std::function<
void()>& f,
int threads)
// Clamp: a request below 1 or above size()+1 becomes size()+1.
245 if (threads < 1 || threads >
int(_threads.size())+1)
246 threads = int(_threads.size())+1;
// Take the pool mutex, then iterate threads-1 times.
248 std::unique_lock<std::mutex> lock(_mutex);
249 for (
int i = 0; i < threads-1; ++i)
// Wake every waiting worker.
252 _condition.notify_all();
// Run f(i) using up to `threads` threads, with i identifying the task.
// NOTE(review): any statements after notify_all() are not visible in this excerpt.
257 inline void thread_pool::run(
const std::function<
void(
int)>& f,
int threads)
// Clamp: a request below 1 or above size()+1 becomes size()+1.
259 if (threads < 1 || threads >
int(_threads.size())+1)
260 threads = int(_threads.size())+1;
// Take the pool mutex, then queue threads-1 tasks; each lambda captures f and
// its index i by value and calls f(i).
262 std::unique_lock<std::mutex> lock(_mutex);
263 for (
int i = 0; i < threads-1; ++i)
264 _tasks.emplace( [=](){f(i);} );
// Wake every waiting worker.
266 _condition.notify_all();
// Run f(i, threads) using up to `threads` threads: indices 0 .. threads-2 are
// queued as tasks and index threads-1 is executed by this function itself.
271 inline void thread_pool::run(
const std::function<
void(
int,
int)>& f,
int threads)
// Clamp: a request below 1 or above size()+1 becomes size()+1.
273 if (threads < 1 || threads >
int(_threads.size())+1)
274 threads = int(_threads.size())+1;
// Take the pool mutex, then queue threads-1 tasks; each lambda captures f, i and
// the clamped thread count by value.
276 std::unique_lock<std::mutex> lock(_mutex);
277 for (
int i = 0; i < threads-1; ++i)
278 _tasks.emplace( [=](){f(i,threads);} );
// Wake every waiting worker.
280 _condition.notify_all();
// The last index is run directly by the caller.
281 f(threads-1,threads);
// Change the number of worker threads to nrthreads.
// NOTE(review): several lines of this body (braces, the body of the second
// shrink loop, and the statement that starts new threads) are not visible in
// this excerpt.
285 inline void thread_pool::resize(std::size_t nrthreads)
// Shrinking: under _mutex, raise the stop flag of every worker with index
// nrthreads or higher, then wake all workers.
287 if (nrthreads < _threads.size())
290 std::unique_lock<std::mutex> lock(_mutex);
291 for (std::size_t i = nrthreads; i < _threads.size(); ++i)
292 *(_threads_stop[i]) =
true;
293 _condition.notify_all();
// Second pass over the surplus workers (body not visible; presumably joins them, confirm).
295 for (std::size_t i = nrthreads; i < _threads.size(); ++i)
// Drop the stop flags and thread handles of the removed workers.
299 _threads_stop.resize(nrthreads);
300 _threads.resize(nrthreads);
// Growing: under _mutex, reserve capacity and append one stop flag, initialised
// to false, per new worker.
302 else if (nrthreads > _threads.size())
307 std::unique_lock<std::mutex> _lock(_mutex);
308 _threads_stop.reserve(nrthreads);
309 _threads.reserve(nrthreads);
310 for (std::size_t i = _threads.size(); i < nrthreads; ++i)
312 _threads_stop.emplace_back(
new std::atomic_bool(
false));
// Create one more worker thread and append it to _threads.
// Throws std::runtime_error when no stop flag exists yet for the new index.
// NOTE(review): the worker's loop structure, its exit path, the pop of the task
// and the task invocation are not visible in this excerpt.
318 inline void thread_pool::_init_thread()
// The new worker's index is the current number of threads.
320 std::size_t i = _threads.size();
// A stop flag for index i must already be present in _threads_stop.
321 if (i >= _threads_stop.size())
322 throw std::runtime_error(
"thread_pool::_init_thread(): index out of range!");
// Worker body; it captures the pool pointer and this worker's index.
323 auto f = [
this,i]() {
324 std::function<void()> task;
// The worker takes the pool mutex before inspecting the queue.
325 std::unique_lock<std::mutex> lock(this->_mutex);
// With an empty queue the worker checks its own stop flag and waits on the
// condition variable.
327 if (this->_tasks.empty())
329 if (*(this->_threads_stop[i]))
331 this->_condition.wait(lock);
// The busy counter is raised before the front task is moved out and lowered
// again further down.
335 ++this->_threads_busy;
336 task = std::move(this->_tasks.front());
341 --this->_threads_busy;
// Start a thread running f; ownership of the thread object goes to _threads.
345 _threads.emplace_back(
new std::thread(f));
352 : _i(0), _count(count)
359 throw std::runtime_error(
"barrier::~barrier: threads are still waiting on barrier");
364 std::unique_lock<std::mutex> lock(_mutex);
369 _condition.notify_all();
373 _condition.wait(lock);
Definition: thread_pool.hpp:150
~barrier() noexcept(false)
Definition: thread_pool.hpp:356
void wait()
Definition: thread_pool.hpp:362
barrier(std::size_t count)
Definition: thread_pool.hpp:351
Definition: thread_pool.hpp:101
Definition: thread_pool.hpp:85
Definition: thread_pool.hpp:89
FPLLL_BEGIN_NAMESPACE typedef std::mutex mutex
Definition: threadpool.h:25