Scroom  0.14
ThreadPool Class Reference

#include <threadpool.hh>

Collaboration diagram for ThreadPool:
Collaboration graph

Classes

struct  Job
 
class  PrivateData
 
class  Queue
 
class  WeakQueue
 

Public Types

using Ptr = boost::shared_ptr< ThreadPool >
 
using ThreadPtr = boost::shared_ptr< boost::thread >
 

Public Member Functions

 ThreadPool (bool completeAllJobsBeforeDestruction=false)
 ThreadPool. More...
 
 ThreadPool (int count, bool completeAllJobsBeforeDestruction=false)
 
 ThreadPool (const ThreadPool &)=delete
 
 ThreadPool (ThreadPool &&)=delete
 
ThreadPool operator= (const ThreadPool &)=delete
 
ThreadPool operator= (ThreadPool &&)=delete
 
 ~ThreadPool ()
 
void schedule (boost::function< void()> const &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
void schedule (boost::function< void()> const &fn, const Queue::Ptr &queue)
 
template<typename T >
void schedule (boost::shared_ptr< T > fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
template<typename T >
void schedule (boost::shared_ptr< T > fn, const Queue::Ptr &queue)
 
void schedule (boost::function< void()> const &fn, int priority, const WeakQueue::Ptr &queue)
 
void schedule (boost::function< void()> const &fn, const WeakQueue::Ptr &queue)
 
template<typename T >
void schedule (boost::shared_ptr< T > fn, int priority, WeakQueue::Ptr queue)
 
template<typename T >
void schedule (boost::shared_ptr< T > fn, WeakQueue::Ptr queue)
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, const Queue::Ptr &queue)
 
template<typename R , typename T >
boost::unique_future< R > schedule (boost::shared_ptr< T > fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
template<typename R , typename T >
boost::unique_future< R > schedule (boost::shared_ptr< T > fn, const Queue::Ptr &queue)
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, int priority, WeakQueue::Ptr queue)
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, WeakQueue::Ptr queue)
 
template<typename R , typename T >
boost::unique_future< R > schedule (boost::shared_ptr< T > fn, int priority, WeakQueue::Ptr queue)
 
template<typename R , typename T >
boost::unique_future< R > schedule (boost::shared_ptr< T > fn, WeakQueue::Ptr queue)
 
ThreadPtr add ()
 
std::vector< ThreadPtradd (int count)
 

Static Public Member Functions

static ThreadPool::Ptr create (bool completeAllJobsBeforeDestruction=false)
 
static ThreadPool::Ptr create (int count, bool completeAllJobsBeforeDestruction=false)
 

Static Private Member Functions

static void work (const PrivateData::Ptr &priv)
 
static void do_one (const PrivateData::Ptr &priv)
 
static Queue::Ptr defaultQueue ()
 

Private Attributes

std::list< ThreadPtrthreads
 
PrivateData::Ptr priv
 

Static Private Attributes

static const int defaultPriority = PRIO_NORMAL
 

Detailed Description

Generic threadpool

A ThreadPool is basically a collection of threads you can schedule() work on.

Member Typedef Documentation

◆ Ptr

using ThreadPool::Ptr = boost::shared_ptr<ThreadPool>

◆ ThreadPtr

using ThreadPool::ThreadPtr = boost::shared_ptr<boost::thread>

Constructor & Destructor Documentation

◆ ThreadPool() [1/4]

ThreadPool::ThreadPool ( bool  completeAllJobsBeforeDestruction = false)
explicit

ThreadPool.

Create a ThreadPool with one thread for each core in the system

208  : priv(PrivateData::create(completeAllJobsBeforeDestruction))
209 {
210  const int count = boost::thread::hardware_concurrency();
211 #ifndef MULTITHREADING
212  if(count > 1)
213  count = 1;
214 #endif
215  add(count);
216 }

Referenced by CpuBound(), create(), and Sequentially().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ThreadPool() [2/4]

ThreadPool::ThreadPool ( int  count,
bool  completeAllJobsBeforeDestruction = false 
)
explicit

Create a ThreadPool with the given number of threads

219  : priv(PrivateData::create(completeAllJobsBeforeDestruction))
220 {
221 #ifndef MULTITHREADING
222  if(count > 1)
223  count = 1;
224 #endif
225  add(count);
226 }
Here is the call graph for this function:

◆ ThreadPool() [3/4]

ThreadPool::ThreadPool ( const ThreadPool )
delete

◆ ThreadPool() [4/4]

ThreadPool::ThreadPool ( ThreadPool &&  )
delete

◆ ~ThreadPool()

ThreadPool::~ThreadPool ( )

Destructor

  • stop all threads
  • throw away all jobs that haven't been executed
258 {
259  // Destroying the threadpool used to be done by interrupting all
260  // threads, but this doesn't work reliably, at least until boost
261  // 1.45. Hence, we're back to using an "alive" boolean and a regular
262  // condition variable.
263  //
264  // See also https://svn.boost.org/trac/boost/ticket/2330
265 
266  {
267  boost::mutex::scoped_lock const lock(priv->mut);
268  priv->alive = false;
269  priv->cond.notify_all();
270  }
271 }

Member Function Documentation

◆ add() [1/2]

ThreadPool::ThreadPtr ThreadPool::add ( )

Add an additional thread to the pool.

This is mostly used for testing

Returns
a reference to the newly added thread.
239 {
240  ThreadPool::ThreadPtr t = ThreadPool::ThreadPtr(new boost::thread(boost::bind(&ThreadPool::work, priv)));
241  threads.push_back(t);
242  ThreadList::instance()->add(t);
243  return t;
244 }

Referenced by add(), BOOST_AUTO_TEST_CASE(), and ThreadPool().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ add() [2/2]

std::vector< ThreadPool::ThreadPtr > ThreadPool::add ( int  count)

Add the given number of threads to the pool.

This is mostly used for testing

Returns
references to the newly added threads.
247 {
248  std::vector<ThreadPool::ThreadPtr> result(count);
249  for(int i = 0; i < count; i++)
250  {
251  result[i] = add();
252  }
253 
254  return result;
255 }
Here is the call graph for this function:

◆ create() [1/2]

ThreadPool::Ptr ThreadPool::create ( bool  completeAllJobsBeforeDestruction = false)
static

Create a ThreadPool with one thread for each core in the system

229 {
230  return ThreadPool::Ptr(new ThreadPool(completeAllJobsBeforeDestruction));
231 }

Referenced by BOOST_AUTO_TEST_CASE().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ create() [2/2]

ThreadPool::Ptr ThreadPool::create ( int  count,
bool  completeAllJobsBeforeDestruction = false 
)
static

Create a ThreadPool with the given number of threads

234 {
235  return ThreadPool::Ptr(new ThreadPool(count, completeAllJobsBeforeDestruction));
236 }
Here is the call graph for this function:

◆ defaultQueue()

ThreadPool::Queue::Ptr ThreadPool::defaultQueue ( )
staticprivate
366 {
367  static ThreadPool::Queue::Ptr const queue = ThreadPool::Queue::create();
368  return queue;
369 }
Here is the call graph for this function:

◆ do_one()

void ThreadPool::do_one ( const PrivateData::Ptr priv)
staticprivate

Execute one job.

This gets called from work(). It fetches and executes the highest-prio job from ThreadPool::jobs

309 {
310  ThreadPool::Job job;
311 
312  {
313  boost::mutex::scoped_lock const lock(priv->mut);
314 
315  while(!priv->jobs.empty() && priv->jobs.begin()->second.empty())
316  {
317  priv->jobs.erase(priv->jobs.begin());
318  }
319 
320  if(!priv->jobs.empty() && !priv->jobs.begin()->second.empty())
321  {
322  job = priv->jobs.begin()->second.front();
323  priv->jobs.begin()->second.pop();
324  }
325  else
326  {
327  defect_message("JobQueue empty while it shouldn't be");
328  }
329  }
330 
331  if(job.queue)
332  {
333  QueueLock const l(job.queue);
334  if(l.queueExists())
335  {
336  boost::this_thread::disable_interruption const while_executing_jobs;
337  job.fn();
338  }
339  }
340 }

Referenced by work().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ operator=() [1/2]

ThreadPool ThreadPool::operator= ( const ThreadPool )
delete

◆ operator=() [2/2]

ThreadPool ThreadPool::operator= ( ThreadPool &&  )
delete

◆ schedule() [1/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
const Queue::Ptr queue 
)
56 {
57  return schedule(fn, defaultPriority, queue);
58 }
Here is the call graph for this function:

◆ schedule() [2/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
int  priority,
WeakQueue::Ptr  queue 
)
74 {
75  // Todo: If boost::function supported move semantics, we could do without
76  // the shared pointer.
77 
78  // Todo: Without the static cast, Boost 1.53 packaged_task stores a
79  // reference to fn, which is a temporary, and hence results in
80  // undefined behaviour. Move semantics seem to work OK there...
81  //
82  // See https://svn.boost.org/trac/boost/ticket/8596
83  boost::shared_ptr<boost::packaged_task<R>> const t(new boost::packaged_task<R>(static_cast<boost::function<R()>>(fn)));
84  boost::unique_future<R> f = t->get_future();
85  schedule(boost::bind(Detail::threadPoolExecute<void, boost::packaged_task<R>>, t), priority, queue);
86  return f;
87 }
Here is the call graph for this function:

◆ schedule() [3/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)
50 {
51  return schedule(fn, priority, queue->getWeak());
52 }
Here is the call graph for this function:

◆ schedule() [4/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
WeakQueue::Ptr  queue 
)
91 {
92  return schedule(fn, defaultPriority, queue);
93 }
Here is the call graph for this function:

◆ schedule() [5/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
const Queue::Ptr queue 
)

Schedule the given job at the given queue

348 {
349  schedule(fn, defaultPriority, std::move(queue));
350 }
Here is the call graph for this function:

◆ schedule() [6/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
const WeakQueue::Ptr queue 
)

Schedule the given job at the given queue

361 {
362  schedule(fn, defaultPriority, std::move(queue));
363 }
Here is the call graph for this function:

◆ schedule() [7/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
int  priority,
const WeakQueue::Ptr queue 
)

Schedule the given job at the given priority

353 {
354  boost::mutex::scoped_lock const lock(priv->mut);
355  priv->jobs[priority].emplace(fn, queue);
356  priv->jobcount++;
357  priv->cond.notify_one();
358 }

◆ schedule() [8/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)

Schedule the given job at the given priority

343 {
344  schedule(fn, priority, queue->getWeak());
345 }

Referenced by BOOST_AUTO_TEST_CASE(), has_at_least_n_threads(), and schedule().

Here is the caller graph for this function:

◆ schedule() [9/16]

template<typename T >
void ThreadPool::schedule ( boost::shared_ptr< T >  fn,
const Queue::Ptr queue 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
32 {
33  schedule(fn, defaultPriority, queue);
34 }
Here is the call graph for this function:

◆ schedule() [10/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( boost::shared_ptr< T >  fn,
const Queue::Ptr queue 
)
68 {
69  return schedule<R, T>(fn, defaultPriority, queue);
70 }

◆ schedule() [11/16]

template<typename T >
void ThreadPool::schedule ( boost::shared_ptr< T >  fn,
int  priority,
WeakQueue::Ptr  queue 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
38 {
39  schedule(boost::bind(Detail::threadPoolExecute<void, T>, fn), priority, queue);
40 }
Here is the call graph for this function:

◆ schedule() [12/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( boost::shared_ptr< T >  fn,
int  priority,
WeakQueue::Ptr  queue 
)
97 {
98  // Todo: If boost::function supported move semantics, we could do without
99  // the shared pointer.
100  boost::shared_ptr<boost::packaged_task<R>> const t(
101  new boost::packaged_task<R>(boost::bind(Detail::threadPoolExecute<R, T>, fn)));
102  boost::unique_future<R> f = t->get_future();
103  schedule(boost::bind(Detail::threadPoolExecute<void, boost::packaged_task<R>>, t), priority, queue);
104  return f;
105 }
Here is the call graph for this function:

◆ schedule() [13/16]

template<typename T >
void ThreadPool::schedule ( boost::shared_ptr< T >  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
26 {
27  schedule(fn, priority, queue->getWeak());
28 }
Here is the call graph for this function:

◆ schedule() [14/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( boost::shared_ptr< T >  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)
62 {
63  return schedule<R, T>(fn, priority, queue->getWeak());
64 }

◆ schedule() [15/16]

template<typename T >
void ThreadPool::schedule ( boost::shared_ptr< T >  fn,
WeakQueue::Ptr  queue 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
44 {
45  schedule(fn, defaultPriority, queue);
46 }
Here is the call graph for this function:

◆ schedule() [16/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( boost::shared_ptr< T >  fn,
WeakQueue::Ptr  queue 
)
109 {
110  return schedule(fn, defaultPriority, queue);
111 }
Here is the call graph for this function:

◆ work()

void ThreadPool::work ( const PrivateData::Ptr priv)
staticprivate

This is executed by each of the threads in the ThreadPool

  • Wait for a job to be scheduled
  • Fetch the highest-prio job from ThreadPool::jobs
  • Execute

Those last two tasks will be performed by do_one()

274 {
275  boost::mutex::scoped_lock lock(priv->mut);
276  while(priv->alive)
277  {
278  if(priv->jobcount > 0)
279  {
280  priv->jobcount--;
281  lock.unlock();
282  do_one(priv);
283  lock.lock();
284  }
285  else
286  {
287  priv->cond.wait(lock);
288  }
289  }
290 
291  bool busy = priv->completeAllJobsBeforeDestruction;
292  while(busy)
293  {
294  if(priv->jobcount > 0)
295  {
296  priv->jobcount--;
297  lock.unlock();
298  do_one(priv);
299  lock.lock();
300  }
301  else
302  {
303  busy = false;
304  }
305  }
306 }

Referenced by add().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ defaultPriority

const int ThreadPool::defaultPriority = PRIO_NORMAL
staticprivate

Referenced by schedule().

◆ priv

PrivateData::Ptr ThreadPool::priv
private

Referenced by add(), do_one(), schedule(), work(), and ~ThreadPool().

◆ threads

std::list<ThreadPtr> ThreadPool::threads
private

Threads in this ThreadPool

Referenced by add().


The documentation for this class was generated from the following files:
ThreadPool::ThreadPool
ThreadPool(bool completeAllJobsBeforeDestruction=false)
ThreadPool.
Definition: threadpoolimpl.cc:207
ThreadPool::Ptr
boost::shared_ptr< ThreadPool > Ptr
Definition: threadpool.hh:169
ThreadPool::Queue::Ptr
boost::shared_ptr< Queue > Ptr
Definition: threadpool.hh:82
ThreadPool::PrivateData::create
static Ptr create(bool completeAllJobsBeforeDestruction)
Definition: threadpoolimpl.cc:198
Detail::threadPoolExecute
R threadPoolExecute(boost::shared_ptr< T > fn)
Definition: threadpoolimpl.hh:18
ThreadPool::Job
Definition: threadpool.hh:159
ThreadPool::defaultPriority
static const int defaultPriority
Definition: threadpool.hh:270
ThreadPool::Queue::create
static Ptr create()
ThreadPool::Queue.
Definition: threadpoolimpl.cc:377
Scroom::Detail::ThreadPool::QueueLock
Definition: queue.hh:63
anonymous_namespace{progressbarmanager.cc}::instance
ProgressBarPulser::Ptr instance()
Definition: progressbarmanager.cc:43
ThreadPool::Job::fn
boost::function< void()> fn
Definition: threadpool.hh:162
ThreadPool::do_one
static void do_one(const PrivateData::Ptr &priv)
Definition: threadpoolimpl.cc:308
defect_message
#define defect_message(m)
Definition: assertions.hh:43
ThreadPool::add
ThreadPtr add()
Definition: threadpoolimpl.cc:238
ThreadPool::ThreadPtr
boost::shared_ptr< boost::thread > ThreadPtr
Definition: threadpool.hh:170
ThreadPool::work
static void work(const PrivateData::Ptr &priv)
Definition: threadpoolimpl.cc:273
ThreadPool::schedule
void schedule(boost::function< void()> const &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
Definition: threadpoolimpl.cc:342
ThreadPool::priv
PrivateData::Ptr priv
Definition: threadpool.hh:247
ThreadPool::Job::queue
boost::shared_ptr< Scroom::Detail::ThreadPool::QueueImpl > queue
Definition: threadpool.hh:161
ThreadPool::threads
std::list< ThreadPtr > threads
Definition: threadpool.hh:246