Scroom  0.14
threadpool.hh
Go to the documentation of this file.
1 /*
2  * Scroom - Generic viewer for 2D data
3  * Copyright (C) 2009-2022 Kees-Jan Dijkzeul
4  *
5  * SPDX-License-Identifier: LGPL-2.1
6  */
7 
8 #pragma once
9 
10 #ifdef HAVE_CONFIG_H
11 # include <config.h>
12 #endif
13 
14 #include <queue>
15 #include <vector>
16 
17 #include <boost/function.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/thread.hpp>
20 
21 #include <scroom/semaphore.hh>
22 
// Job priorities, passed as the `int priority` argument of ThreadPool::schedule().
// The enumerators on listing lines 27-32 were dropped from this listing; their
// names and line positions are restored from the symbol index at the end of
// this page.
// NOTE(review): implicit (sequential) values are assumed because each
// enumerator occupies exactly one listing line - confirm against the header.
24 enum
25 {
26  PRIO_HIGHEST = 100,
27  PRIO_HIGHER,
28  PRIO_HIGH,
29  PRIO_NORMAL,
30  PRIO_LOW,
31  PRIO_LOWER,
32  PRIO_LOWEST,
33 };
34 
// Forward declaration of the queue implementation type; only
// boost::shared_ptr<QueueImpl> is used in this header.
// The namespace head (listing line 35) was missing from the listing and is
// restored from the closing-brace comment below.
35 namespace Scroom::Detail::ThreadPool
36 {
37  class QueueImpl;
38 } // namespace Scroom::Detail::ThreadPool
39 
46 class ThreadPool
47 {
48 public:
// Forward declaration: Queue (declared next) refers to WeakQueue before
// WeakQueue itself is defined.
49  class WeakQueue;
50 
// Handle identifying a job queue; accepted by the ThreadPool::schedule()
// overloads that take a Queue::Ptr.
// Instances can only be obtained through the static factories: the default
// constructor is private and copy/move construction and assignment are deleted.
// NOTE(review): the original doc comments were stripped from this listing, so
// the lifetime semantics of a Queue are not visible here - see
// threadpoolimpl.cc.
79  class Queue
80  {
81  public:
82  using Ptr = boost::shared_ptr<Queue>;
83  using WeakPtr = boost::weak_ptr<Queue>;
84 
85  public:
// Factory returning a shared pointer to a new Queue.
91  static Ptr create();
92 
// Second factory with the same signature.
// NOTE(review): how it differs from create() is not visible in this header -
// confirm in threadpoolimpl.cc.
98  static Ptr createAsync();
99 
100  ~Queue();
// Returns a shared pointer to the underlying QueueImpl.
101  boost::shared_ptr<Scroom::Detail::ThreadPool::QueueImpl> get();
// Returns a shared pointer to a WeakQueue (presumably the `weak` member - confirm).
102  boost::shared_ptr<WeakQueue> getWeak();
103 
104  private:
// Private: construction goes through create()/createAsync().
105  Queue();
// Non-copyable and non-movable.
106  Queue(const Queue&) = delete;
107  Queue(Queue&&) = delete;
108  Queue& operator=(const Queue&) = delete;
109  Queue& operator=(Queue&&) = delete;
110 
111  private:
// The WeakQueue associated with this Queue.
112  boost::shared_ptr<WeakQueue> weak;
113  };
114 
// Queue handle that owns a shared pointer to a QueueImpl (member `qi`);
// accepted by the ThreadPool::schedule() overloads that take a WeakQueue::Ptr.
// Created only through create(): the default constructor is private and
// copy/move construction and assignment are deleted.
// NOTE(review): in what sense this handle is "weak" compared to Queue is not
// visible in this listing - confirm in threadpoolimpl.cc.
134  class WeakQueue
135  {
136  public:
137  using Ptr = boost::shared_ptr<WeakQueue>;
138  using WeakPtr = boost::weak_ptr<WeakQueue>;
139 
140  public:
141  ~WeakQueue() = default;
142 
// Non-copyable and non-movable.
143  WeakQueue(const WeakQueue&) = delete;
144  WeakQueue(WeakQueue&&) = delete;
145  WeakQueue& operator=(const WeakQueue&) = delete;
146  WeakQueue& operator=(WeakQueue&&) = delete;
147 
// Factory returning a shared pointer to a new WeakQueue.
148  static Ptr create();
// Returns a shared pointer to the underlying QueueImpl.
149  boost::shared_ptr<Scroom::Detail::ThreadPool::QueueImpl> get();
150 
151  private:
// Private: construction goes through create().
152  WeakQueue();
153 
154  private:
// The queue implementation object held by this handle.
155  boost::shared_ptr<Scroom::Detail::ThreadPool::QueueImpl> qi;
156  };
157 
158 private:
// One unit of scheduled work: a callable plus the queue implementation it
// belongs to. PrivateData::jobs stores these in per-priority std::queue<Job>s.
159  struct Job
160  {
// Queue implementation this job is associated with.
161  boost::shared_ptr<Scroom::Detail::ThreadPool::QueueImpl> queue;
// The work to execute.
162  boost::function<void()> fn;
163 
164  Job() = default;
// Builds a job from a callable and a WeakQueue handle.
// NOTE(review): `queue` is presumably filled from queue->get() - confirm in
// threadpoolimpl.cc.
165  Job(boost::function<void()> fn, const WeakQueue::Ptr& queue);
166  };
167 
168 public:
// Shared-ownership handle to a ThreadPool (returned by ThreadPool::create()).
169  using Ptr = boost::shared_ptr<ThreadPool>;
// Shared-ownership handle to a boost::thread (returned by add()).
170  using ThreadPtr = boost::shared_ptr<boost::thread>;
171 
172 private:
// Bookkeeping state of a ThreadPool, held through a shared pointer and passed
// to the static work()/do_one() functions.
// The class head (listing line 193) and the members on listing lines 220, 237,
// 240 and 243 were dropped from this listing; they are restored from the symbol
// index at the end of this page.
193  class PrivateData
194  {
195  public:
196  using Ptr = boost::shared_ptr<PrivateData>;
197 
198  public:
// Job counter, initially 0.
199  unsigned int jobcount{0};
// NOTE(review): presumably guards the members of this class - confirm.
200  boost::mutex mut;
// Initially true. NOTE(review): presumably cleared on pool shutdown - confirm.
201  bool alive{true};
202  boost::condition_variable cond;
// Pending jobs, grouped by integer priority.
212  std::map<int, std::queue<Job>> jobs;
213 
// Value of the flag given to the ThreadPool constructors / create().
220  bool completeAllJobsBeforeDestruction;
221 
237  Queue::Ptr defaultQueue;
238 
239  private:
// NOTE(review): the index gives the signature but no header line for this
// constructor; line 240 is the only free slot in the private section.
// `explicit` is not shown in the index - confirm.
240  explicit PrivateData(bool completeAllJobsBeforeDestruction);
241 
242  public:
// Factory returning a shared pointer to a new PrivateData.
// NOTE(review): placed on line 243, the only free slot before the closing brace.
243  static Ptr create(bool completeAllJobsBeforeDestruction);
244  };
245 
// Handles of the threads belonging to this pool.
246  std::list<ThreadPtr> threads;
// Shared bookkeeping state; the same type is passed to work() and do_one().
// This member (listing line 247) was dropped from the listing and is restored
// from the symbol index at the end of this page.
247  PrivateData::Ptr priv;
248 
249 private:
// Static helper operating on the pool's shared PrivateData.
// NOTE(review): presumably the worker-thread main loop - confirm in
// threadpoolimpl.cc.
259  static void work(const PrivateData::Ptr& priv);
260 
// Static helper operating on the pool's shared PrivateData.
// NOTE(review): presumably dequeues and runs a single job - confirm in
// threadpoolimpl.cc.
267  static void do_one(const PrivateData::Ptr& priv);
268 
// Supplies the default `queue` argument of the schedule() overloads.
269  static Queue::Ptr defaultQueue();
// Supplies the default `priority` argument of the schedule() overloads; its
// value is defined out of line.
270  static const int defaultPriority;
271 
272 public:
// Constructs a pool without an explicit thread count.
// NOTE(review): the number of threads created is decided in threadpoolimpl.cc
// and is not visible here.
// completeAllJobsBeforeDestruction defaults to false.
274  explicit ThreadPool(bool completeAllJobsBeforeDestruction = false);
275 
// Same as above, with an explicit `count` (presumably the number of threads -
// confirm).
277  explicit ThreadPool(int count, bool completeAllJobsBeforeDestruction = false);
278 
// Factory counterparts of the two constructors; each returns the new pool as a
// ThreadPool::Ptr (boost::shared_ptr<ThreadPool>).
280  static ThreadPool::Ptr create(bool completeAllJobsBeforeDestruction = false);
281 
283  static ThreadPool::Ptr create(int count, bool completeAllJobsBeforeDestruction = false);
284 
// ThreadPool is neither copyable nor movable.
// The deleted assignment operators use the conventional `ThreadPool&` return
// type, matching Queue and WeakQueue in this header. They previously returned
// ThreadPool by value, which was harmless only because they are deleted.
285  ThreadPool(const ThreadPool&) = delete;
286  ThreadPool(ThreadPool&&) = delete;
287  ThreadPool& operator=(const ThreadPool&) = delete;
288  ThreadPool& operator=(ThreadPool&&) = delete;
289 
// Destructor, defined out of line.
// NOTE(review): shutdown behaviour presumably depends on the
// completeAllJobsBeforeDestruction flag given at construction - confirm in
// threadpoolimpl.cc.
296  ~ThreadPool();
297 
// schedule() overload set. The overloads vary along three axes:
//  - the work item: a boost::function, or a boost::shared_ptr<T>
//    (NOTE(review): presumably T is callable and *fn is invoked - confirm in
//    threadpoolimpl.hh);
//  - the queue handle: Queue::Ptr or WeakQueue::Ptr;
//  - the result: void, or a boost::unique_future<R> for work returning R.
// Only the Queue::Ptr overloads that take a priority declare defaults
// (defaultPriority, defaultQueue()); in the WeakQueue::Ptr overloads every
// argument is explicit.

// --- void work, Queue::Ptr ---
299  void schedule(boost::function<void()> const& fn, int priority = defaultPriority, const Queue::Ptr& queue = defaultQueue());
300 
// No priority parameter (presumably defaultPriority is used - confirm).
302  void schedule(boost::function<void()> const& fn, const Queue::Ptr& queue);
303 
309  template <typename T>
310  void schedule(boost::shared_ptr<T> fn, int priority = defaultPriority, const Queue::Ptr& queue = defaultQueue());
311 
317  template <typename T>
318  void schedule(boost::shared_ptr<T> fn, const Queue::Ptr& queue);
319 
// --- void work, WeakQueue::Ptr ---
321  void schedule(boost::function<void()> const& fn, int priority, const WeakQueue::Ptr& queue);
322 
324  void schedule(boost::function<void()> const& fn, const WeakQueue::Ptr& queue);
325 
// Note: the templated WeakQueue overloads take the handle by value, unlike the
// two non-template overloads above.
331  template <typename T>
332  void schedule(boost::shared_ptr<T> fn, int priority, WeakQueue::Ptr queue);
333 
339  template <typename T>
340  void schedule(boost::shared_ptr<T> fn, WeakQueue::Ptr queue);
341 
// --- work returning R, Queue::Ptr; the result is delivered through a future ---
342  template <typename R>
343  boost::unique_future<R>
344  schedule(boost::function<R()> const& fn, int priority = defaultPriority, const Queue::Ptr& queue = defaultQueue());
345 
346  template <typename R>
347  boost::unique_future<R> schedule(boost::function<R()> const& fn, const Queue::Ptr& queue);
348 
// R appears only in the return type here.
349  template <typename R, typename T>
350  boost::unique_future<R>
351  schedule(boost::shared_ptr<T> fn, int priority = defaultPriority, const Queue::Ptr& queue = defaultQueue());
352 
353  template <typename R, typename T>
354  boost::unique_future<R> schedule(boost::shared_ptr<T> fn, const Queue::Ptr& queue);
355 
// --- work returning R, WeakQueue::Ptr ---
356  template <typename R>
357  boost::unique_future<R> schedule(boost::function<R()> const& fn, int priority, WeakQueue::Ptr queue);
358 
359  template <typename R>
360  boost::unique_future<R> schedule(boost::function<R()> const& fn, WeakQueue::Ptr queue);
361 
362  template <typename R, typename T>
363  boost::unique_future<R> schedule(boost::shared_ptr<T> fn, int priority, WeakQueue::Ptr queue);
364 
365  template <typename R, typename T>
366  boost::unique_future<R> schedule(boost::shared_ptr<T> fn, WeakQueue::Ptr queue);
367 
// Returns a handle to a boost::thread.
// NOTE(review): presumably starts one additional worker thread in this pool -
// confirm in threadpoolimpl.cc.
375  ThreadPtr add();
376 
// Overload taking a count; returns one ThreadPtr per thread
// (presumably `count` of them - confirm).
384  std::vector<ThreadPtr> add(int count);
385 };
386 
// Callable object (operator()) whose work can be supplied after creation
// through setWork().
// The class head (listing line 534) was dropped from this listing and is
// restored from the symbol index at the end of this page.
// NOTE(review): the intended usage ("jumping the queue") is inferred from the
// name; the original doc comments were stripped - confirm in threadpoolimpl.cc.
534 class QueueJumper
535 {
536 public:
537  using Ptr = boost::shared_ptr<QueueJumper>;
538 
539 private:
// NOTE(review): presumably guards inQueue, isSet and fn - confirm.
540  boost::mutex mut;
// Initially true.
541  bool inQueue{true};
// Initially false (presumably set once setWork() has stored work - confirm).
542  bool isSet{false};
543 
// The stored work item.
544  boost::function<void()> fn;
545 
546 protected:
// Protected: instances are obtained through create().
547  QueueJumper() = default;
548 
549 public:
// Factory returning a shared pointer to a new QueueJumper.
561  static Ptr create();
562 
// Supplies the work to run; the bool result reports the outcome.
// NOTE(review): presumably false means the work was not accepted - confirm.
571  bool setWork(boost::function<void()> const& fn);
572 
// Call operator, so a QueueJumper can itself be used as a job.
573  void operator()();
574 };
575 
588 
606 
ThreadPool::Job::Job
Job()=default
ThreadPool::ThreadPool
ThreadPool(bool completeAllJobsBeforeDestruction=false)
ThreadPool.
Definition: threadpoolimpl.cc:207
ThreadPool::Ptr
boost::shared_ptr< ThreadPool > Ptr
Definition: threadpool.hh:169
ThreadPool::Queue::Ptr
boost::shared_ptr< Queue > Ptr
Definition: threadpool.hh:82
PRIO_LOW
@ PRIO_LOW
Definition: threadpool.hh:30
ThreadPool::PrivateData::mut
boost::mutex mut
Definition: threadpool.hh:200
ThreadPool::Queue::get
boost::shared_ptr< Scroom::Detail::ThreadPool::QueueImpl > get()
Definition: threadpoolimpl.cc:388
ThreadPool::WeakQueue::Ptr
boost::shared_ptr< WeakQueue > Ptr
Definition: threadpool.hh:137
QueueJumper::inQueue
bool inQueue
Definition: threadpool.hh:541
ThreadPool::operator=
ThreadPool operator=(const ThreadPool &)=delete
ThreadPool
Definition: threadpool.hh:46
ThreadPool::Queue::createAsync
static Ptr createAsync()
Definition: threadpoolimpl.cc:379
Scroom::Detail::ThreadPool
Definition: async-deleter.hh:21
ThreadPool::PrivateData::create
static Ptr create(bool completeAllJobsBeforeDestruction)
Definition: threadpoolimpl.cc:198
QueueJumper::setWork
bool setWork(boost::function< void()> const &fn)
Definition: threadpoolimpl.cc:421
ThreadPool::Queue::operator=
Queue & operator=(const Queue &)=delete
ThreadPool::PrivateData::defaultQueue
Queue::Ptr defaultQueue
Definition: threadpool.hh:237
ThreadPool::PrivateData::PrivateData
PrivateData(bool completeAllJobsBeforeDestruction)
ThreadPool::PrivateData.
Definition: threadpoolimpl.cc:192
ThreadPool::Job
Definition: threadpool.hh:159
Sequentially
ThreadPool::Ptr Sequentially()
Definition: threadpoolimpl.cc:459
PRIO_LOWEST
@ PRIO_LOWEST
Definition: threadpool.hh:32
ThreadPool::WeakQueue::WeakQueue
WeakQueue()
Definition: threadpoolimpl.cc:398
PRIO_LOWER
@ PRIO_LOWER
Definition: threadpool.hh:31
QueueJumper::create
static Ptr create()
QueueJumper.
Definition: threadpoolimpl.cc:419
ThreadPool::Queue
Definition: threadpool.hh:79
QueueJumper
Definition: threadpool.hh:534
ThreadPool::defaultPriority
static const int defaultPriority
Definition: threadpool.hh:270
ThreadPool::Queue::weak
boost::shared_ptr< WeakQueue > weak
Definition: threadpool.hh:112
ThreadPool::Queue::create
static Ptr create()
ThreadPool::Queue.
Definition: threadpoolimpl.cc:377
ThreadPool::PrivateData::alive
bool alive
Definition: threadpool.hh:201
QueueJumper::fn
boost::function< void()> fn
Definition: threadpool.hh:544
QueueJumper::Ptr
boost::shared_ptr< QueueJumper > Ptr
Definition: threadpool.hh:537
ThreadPool::WeakQueue::WeakPtr
boost::weak_ptr< WeakQueue > WeakPtr
Definition: threadpool.hh:138
ThreadPool::WeakQueue
Definition: threadpool.hh:134
ThreadPool::WeakQueue::get
boost::shared_ptr< Scroom::Detail::ThreadPool::QueueImpl > get()
Definition: threadpoolimpl.cc:403
ThreadPool::PrivateData::cond
boost::condition_variable cond
Definition: threadpool.hh:202
ThreadPool::defaultQueue
static Queue::Ptr defaultQueue()
Definition: threadpoolimpl.cc:365
PRIO_HIGHEST
@ PRIO_HIGHEST
Definition: threadpool.hh:26
QueueJumper::mut
boost::mutex mut
Definition: threadpool.hh:540
CpuBound
ThreadPool::Ptr CpuBound()
Definition: threadpoolimpl.cc:452
ThreadPool::Queue::getWeak
boost::shared_ptr< WeakQueue > getWeak()
Definition: threadpoolimpl.cc:390
ThreadPool::WeakQueue::create
static Ptr create()
ThreadPool::WeakQueue.
Definition: threadpoolimpl.cc:396
ThreadPool::PrivateData::completeAllJobsBeforeDestruction
bool completeAllJobsBeforeDestruction
Definition: threadpool.hh:220
ThreadPool::create
static ThreadPool::Ptr create(bool completeAllJobsBeforeDestruction=false)
Definition: threadpoolimpl.cc:228
ThreadPool::PrivateData::Ptr
boost::shared_ptr< PrivateData > Ptr
Definition: threadpool.hh:196
ThreadPool::PrivateData::jobs
std::map< int, std::queue< Job > > jobs
Definition: threadpool.hh:212
PRIO_HIGH
@ PRIO_HIGH
Definition: threadpool.hh:28
ThreadPool::WeakQueue::~WeakQueue
~WeakQueue()=default
PRIO_HIGHER
@ PRIO_HIGHER
Definition: threadpool.hh:27
ThreadPool::WeakQueue::operator=
WeakQueue & operator=(const WeakQueue &)=delete
ThreadPool::Job::fn
boost::function< void()> fn
Definition: threadpool.hh:162
ThreadPool::Queue::~Queue
~Queue()
Definition: threadpoolimpl.cc:386
QueueJumper::QueueJumper
QueueJumper()=default
ThreadPool::Queue::Queue
Queue()
Definition: threadpoolimpl.cc:381
ThreadPool::PrivateData
Definition: threadpool.hh:193
ThreadPool::WeakQueue::qi
boost::shared_ptr< Scroom::Detail::ThreadPool::QueueImpl > qi
Definition: threadpool.hh:155
ThreadPool::do_one
static void do_one(const PrivateData::Ptr &priv)
Definition: threadpoolimpl.cc:308
ThreadPool::add
ThreadPtr add()
Definition: threadpoolimpl.cc:238
ThreadPool::Queue::WeakPtr
boost::weak_ptr< Queue > WeakPtr
Definition: threadpool.hh:83
ThreadPool::ThreadPtr
boost::shared_ptr< boost::thread > ThreadPtr
Definition: threadpool.hh:170
PRIO_NORMAL
@ PRIO_NORMAL
Definition: threadpool.hh:29
ThreadPool::work
static void work(const PrivateData::Ptr &priv)
Definition: threadpoolimpl.cc:273
ThreadPool::schedule
void schedule(boost::function< void()> const &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
Definition: threadpoolimpl.cc:342
QueueJumper::isSet
bool isSet
Definition: threadpool.hh:542
ThreadPool::priv
PrivateData::Ptr priv
Definition: threadpool.hh:247
ThreadPool::PrivateData::jobcount
unsigned int jobcount
Definition: threadpool.hh:199
ThreadPool::Job::queue
boost::shared_ptr< Scroom::Detail::ThreadPool::QueueImpl > queue
Definition: threadpool.hh:161
semaphore.hh
threadpoolimpl.hh
ThreadPool::threads
std::list< ThreadPtr > threads
Definition: threadpool.hh:246
ThreadPool::~ThreadPool
~ThreadPool()
Definition: threadpoolimpl.cc:257
QueueJumper::operator()
void operator()()
Definition: threadpoolimpl.cc:438