Scroom 0.14-49-gb7ae7a6d
Loading...
Searching...
No Matches
ThreadPool Class Reference

#include <threadpool.hh>

Collaboration diagram for ThreadPool:
Collaboration graph

Classes

struct  Job
 
class  PrivateData
 
class  Queue
 
class  WeakQueue
 

Public Types

using Ptr = std::shared_ptr< ThreadPool >
 
using ThreadPtr = std::shared_ptr< boost::thread >
 

Public Member Functions

 ThreadPool (bool completeAllJobsBeforeDestruction=false)
 ThreadPool.
 
 ThreadPool (int count, bool completeAllJobsBeforeDestruction=false)
 
 ThreadPool (const ThreadPool &)=delete
 
 ThreadPool (ThreadPool &&)=delete
 
ThreadPool operator= (const ThreadPool &)=delete
 
ThreadPool operator= (ThreadPool &&)=delete
 
 ~ThreadPool ()
 
void schedule (boost::function< void()> const &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
void schedule (boost::function< void()> const &fn, const Queue::Ptr &queue)
 
template<typename T >
void schedule (std::shared_ptr< T > fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
template<typename T >
void schedule (std::shared_ptr< T > fn, const Queue::Ptr &queue)
 
void schedule (boost::function< void()> const &fn, int priority, const WeakQueue::Ptr &queue)
 
void schedule (boost::function< void()> const &fn, const WeakQueue::Ptr &queue)
 
template<typename T >
void schedule (std::shared_ptr< T > fn, int priority, const WeakQueue::Ptr &queue)
 
template<typename T >
void schedule (std::shared_ptr< T > fn, WeakQueue::Ptr queue)
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, const Queue::Ptr &queue)
 
template<typename R , typename T >
boost::unique_future< R > schedule (const std::shared_ptr< T > &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
 
template<typename R , typename T >
boost::unique_future< R > schedule (std::shared_ptr< T > fn, const Queue::Ptr &queue)
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, int priority, const WeakQueue::Ptr &queue)
 
template<typename R >
boost::unique_future< R > schedule (boost::function< R()> const &fn, WeakQueue::Ptr queue)
 
template<typename R , typename T >
boost::unique_future< R > schedule (const std::shared_ptr< T > &fn, int priority, const WeakQueue::Ptr &queue)
 
template<typename R , typename T >
boost::unique_future< R > schedule (std::shared_ptr< T > fn, WeakQueue::Ptr queue)
 
ThreadPtr add ()
 
std::vector< ThreadPtradd (int count)
 

Static Public Member Functions

static ThreadPool::Ptr create (bool completeAllJobsBeforeDestruction=false)
 
static ThreadPool::Ptr create (int count, bool completeAllJobsBeforeDestruction=false)
 

Static Private Member Functions

static void work (const PrivateData::Ptr &priv)
 
static void do_one (const PrivateData::Ptr &priv)
 
static Queue::Ptr defaultQueue ()
 

Private Attributes

std::list< ThreadPtrthreads
 
PrivateData::Ptr priv
 

Static Private Attributes

static const int defaultPriority = PRIO_NORMAL
 

Detailed Description

Generic threadpool

A ThreadPool is basically a collection of threads you can schedule() work on.

Member Typedef Documentation

◆ Ptr

using ThreadPool::Ptr = std::shared_ptr<ThreadPool>

◆ ThreadPtr

using ThreadPool::ThreadPtr = std::shared_ptr<boost::thread>

Constructor & Destructor Documentation

◆ ThreadPool() [1/4]

ThreadPool::ThreadPool ( bool  completeAllJobsBeforeDestruction = false)
explicit

ThreadPool.

Create a ThreadPool with one thread for each core in the system

211 : priv(PrivateData::create(completeAllJobsBeforeDestruction))
212{
213 const int count = boost::thread::hardware_concurrency();
214#ifndef MULTITHREADING
215 if(count > 1)
216 count = 1;
217#endif
218 add(count);
219}
static Ptr create(bool completeAllJobsBeforeDestruction)
Definition threadpoolimpl.cc:201
PrivateData::Ptr priv
Definition threadpool.hh:243
ThreadPtr add()
Definition threadpoolimpl.cc:241
const size_t count
Definition pageprovider-tests.cc:21
Here is the call graph for this function:

◆ ThreadPool() [2/4]

ThreadPool::ThreadPool ( int  count,
bool  completeAllJobsBeforeDestruction = false 
)
explicit

Create a ThreadPool with the given number of threads

222 : priv(PrivateData::create(completeAllJobsBeforeDestruction))
223{
224#ifndef MULTITHREADING
225 if(count > 1)
226 count = 1;
227#endif
228 add(count);
229}
Here is the call graph for this function:

◆ ThreadPool() [3/4]

ThreadPool::ThreadPool ( const ThreadPool )
delete

◆ ThreadPool() [4/4]

ThreadPool::ThreadPool ( ThreadPool &&  )
delete

◆ ~ThreadPool()

ThreadPool::~ThreadPool ( )

Destructor

  • stop all threads
  • throw away all jobs that haven't been executed
261{
262 // Destroying the threadpool used to be done by interrupting all
263 // threads, but this doesn't work reliably, at least until boost
264 // 1.45. Hence, we're back to using an "alive" boolean and a regular
265 // condition variable.
266 //
267 // See also https://svn.boost.org/trac/boost/ticket/2330
268
269 {
270 boost::mutex::scoped_lock const lock(priv->mut);
271 priv->alive = false;
272 priv->cond.notify_all();
273 }
274}

Member Function Documentation

◆ add() [1/2]

ThreadPool::ThreadPtr ThreadPool::add ( )

Add an additional thread to the pool.

This is mostly used for testing

Returns
a reference to the newly added thread.
242{
243 auto t = std::make_shared<boost::thread>([data = priv] { ThreadPool::work(data); });
244 threads.push_back(t);
246 return t;
247}
uint8_t data
Definition blob-tests.cc:36
std::list< ThreadPtr > threads
Definition threadpool.hh:242
static void work(const PrivateData::Ptr &priv)
Definition threadpoolimpl.cc:276
static Ptr instance()
Definition threadpoolimpl.cc:91
ThreadPool t(0)

Referenced by add(), add(), on_presentation_added_to_view(), Layer::registerObserver(), Scroom::Bookkeeping::MapBase< K, V >::reReserve(), Scroom::Bookkeeping::MapBase< K, V >::reserve(), ThreadPool(), and ThreadPool().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ add() [2/2]

std::vector< ThreadPool::ThreadPtr > ThreadPool::add ( int  count)

Add the given number of threads to the pool.

This is mostly used for testing

Returns
references to the newly added threads.
250{
251 std::vector<ThreadPool::ThreadPtr> result(count);
252 for(int i = 0; i < count; i++)
253 {
254 result[i] = add();
255 }
256
257 return result;
258}
SampleIterator< const uint8_t > result
Definition sampleiterator-tests.cc:94
Here is the call graph for this function:

◆ create() [1/2]

ThreadPool::Ptr ThreadPool::create ( bool  completeAllJobsBeforeDestruction = false)
static

Create a ThreadPool with one thread for each core in the system

232{
233 return std::make_shared<ThreadPool>(completeAllJobsBeforeDestruction);
234}

◆ create() [2/2]

ThreadPool::Ptr ThreadPool::create ( int  count,
bool  completeAllJobsBeforeDestruction = false 
)
static

Create a ThreadPool with the given number of threads

237{
238 return std::make_shared<ThreadPool>(count, completeAllJobsBeforeDestruction);
239}

◆ defaultQueue()

ThreadPool::Queue::Ptr ThreadPool::defaultQueue ( )
staticprivate
369{
371 return queue;
372}
std::shared_ptr< Queue > Ptr
Definition threadpool.hh:78
static Ptr create()
ThreadPool::Queue.
Definition threadpoolimpl.cc:380
Here is the call graph for this function:

◆ do_one()

void ThreadPool::do_one ( const PrivateData::Ptr priv)
staticprivate

Execute one job.

This gets called from work(). It fetches and executes the highest-prio job from ThreadPool::jobs

312{
313 ThreadPool::Job job;
314
315 {
316 boost::mutex::scoped_lock const lock(priv->mut);
317
318 while(!priv->jobs.empty() && priv->jobs.begin()->second.empty())
319 {
320 priv->jobs.erase(priv->jobs.begin());
321 }
322
323 if(!priv->jobs.empty() && !priv->jobs.begin()->second.empty())
324 {
325 job = priv->jobs.begin()->second.front();
326 priv->jobs.begin()->second.pop();
327 }
328 else
329 {
330 defect_message("JobQueue empty while it shouldn't be");
331 }
332 }
333
334 if(job.queue)
335 {
336 QueueLock const l(job.queue);
337 if(l.queueExists())
338 {
339 boost::this_thread::disable_interruption const while_executing_jobs;
340 job.fn();
341 }
342 }
343}
#define defect_message(m)
Definition assertions.hh:49
PageList const l
Definition compression-tests.cc:33
Definition threadpool.hh:156
std::shared_ptr< Scroom::Detail::ThreadPool::QueueImpl > queue
Definition threadpool.hh:157
boost::function< void()> fn
Definition threadpool.hh:158

Referenced by work().

Here is the caller graph for this function:

◆ operator=() [1/2]

ThreadPool ThreadPool::operator= ( const ThreadPool )
delete

◆ operator=() [2/2]

ThreadPool ThreadPool::operator= ( ThreadPool &&  )
delete

◆ schedule() [1/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
const Queue::Ptr queue 
)
58{
59 return schedule(fn, defaultPriority, queue);
60}
void schedule(boost::function< void()> const &fn, int priority=defaultPriority, const Queue::Ptr &queue=defaultQueue())
Definition threadpoolimpl.cc:345
static const int defaultPriority
Definition threadpool.hh:266
Here is the call graph for this function:

◆ schedule() [2/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
int  priority,
const WeakQueue::Ptr queue 
)
77{
78 // Todo: If boost::function supported move semantics, we could do without
79 // the shared pointer.
80
81 // Todo: Without the static cast, Boost 1.53 packaged_task stores a
82 // reference to fn, which is a temporary, and hence results in
83 // undefined behaviour. Move semantics seem to work OK there...
84 //
85 // See https://svn.boost.org/trac/boost/ticket/8596
86 std::shared_ptr<boost::packaged_task<R>> const t(new boost::packaged_task<R>(static_cast<boost::function<R()>>(fn)));
87 boost::unique_future<R> f = t->get_future();
88 schedule([t] { Detail::threadPoolExecute<void>(t); }, priority, queue);
89 return f;
90}
f
Definition gtkhelper-tests.cc:43
Here is the call graph for this function:

◆ schedule() [3/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)
52{
53 return schedule(fn, priority, queue->getWeak());
54}
Here is the call graph for this function:

◆ schedule() [4/16]

template<typename R >
boost::unique_future< R > ThreadPool::schedule ( boost::function< R()> const &  fn,
WeakQueue::Ptr  queue 
)
94{
95 return schedule(fn, defaultPriority, queue);
96}
Here is the call graph for this function:

◆ schedule() [5/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
const Queue::Ptr queue 
)

Schedule the given job at the given queue

351{
352 schedule(fn, defaultPriority, std::move(queue));
353}
Here is the call graph for this function:

◆ schedule() [6/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
const WeakQueue::Ptr queue 
)

Schedule the given job at the given queue

364{
365 schedule(fn, defaultPriority, std::move(queue));
366}
Here is the call graph for this function:

◆ schedule() [7/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
int  priority,
const WeakQueue::Ptr queue 
)

Schedule the given job at the given priority

356{
357 boost::mutex::scoped_lock const lock(priv->mut);
358 priv->jobs[priority].emplace(fn, queue);
359 priv->jobcount++;
360 priv->cond.notify_one();
361}

◆ schedule() [8/16]

void ThreadPool::schedule ( boost::function< void()> const &  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)

Schedule the given job at the given priority

346{
347 schedule(fn, priority, queue->getWeak());
348}

Referenced by has_at_least_n_threads(), WaitForAsyncOp::operator()(), schedule(), schedule(), schedule(), schedule(), schedule(), schedule(), schedule(), schedule(), schedule(), schedule(), schedule(), and schedule().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ schedule() [9/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( const std::shared_ptr< T > &  fn,
int  priority,
const WeakQueue::Ptr queue 
)
100{
101 // Todo: If boost::function supported move semantics, we could do without
102 // the shared pointer.
103 const auto t = std::make_shared<boost::packaged_task<R>>([fn] { return Detail::threadPoolExecute<R>(fn); });
104 boost::unique_future<R> f = t->get_future();
105 schedule([t] { Detail::threadPoolExecute<void>(t); }, priority, queue);
106 return f;
107}
Here is the call graph for this function:

◆ schedule() [10/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( const std::shared_ptr< T > &  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)
64{
65 return schedule<R, T>(fn, priority, queue->getWeak());
66}

◆ schedule() [11/16]

template<typename T >
void ThreadPool::schedule ( std::shared_ptr< T >  fn,
const Queue::Ptr queue 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
34{
35 schedule(fn, defaultPriority, queue);
36}
Here is the call graph for this function:

◆ schedule() [12/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( std::shared_ptr< T >  fn,
const Queue::Ptr queue 
)
70{
71 return schedule<R, T>(fn, defaultPriority, queue);
72}

◆ schedule() [13/16]

template<typename T >
void ThreadPool::schedule ( std::shared_ptr< T >  fn,
int  priority,
const WeakQueue::Ptr queue 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
40{
41 schedule([fn = std::move(fn)] { Detail::threadPoolExecute<void>(fn); }, priority, std::move(queue));
42}
Here is the call graph for this function:

◆ schedule() [14/16]

template<typename T >
void ThreadPool::schedule ( std::shared_ptr< T >  fn,
int  priority = defaultPriority,
const Queue::Ptr queue = defaultQueue() 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
28{
29 schedule(std::move(fn), priority, queue->getWeak());
30}
Here is the call graph for this function:

◆ schedule() [15/16]

template<typename T >
void ThreadPool::schedule ( std::shared_ptr< T >  fn,
WeakQueue::Ptr  queue 
)

Schedule the given job at the given priority

Precondition
T::operator()() must be defined
46{
47 schedule(fn, defaultPriority, queue);
48}
Here is the call graph for this function:

◆ schedule() [16/16]

template<typename R , typename T >
boost::unique_future< R > ThreadPool::schedule ( std::shared_ptr< T >  fn,
WeakQueue::Ptr  queue 
)
111{
112 return schedule(fn, defaultPriority, queue);
113}
Here is the call graph for this function:

◆ work()

void ThreadPool::work ( const PrivateData::Ptr priv)
staticprivate

This is executed by each of the threads in the ThreadPool

  • Wait for a job to be scheduled
  • Fetch the highest-prio job from ThreadPool::jobs
  • Execute

Those last two tasks will be performed by do_one()

277{
278 boost::mutex::scoped_lock lock(priv->mut);
279 while(priv->alive)
280 {
281 if(priv->jobcount > 0)
282 {
283 priv->jobcount--;
284 lock.unlock();
285 do_one(priv);
286 lock.lock();
287 }
288 else
289 {
290 priv->cond.wait(lock);
291 }
292 }
293
294 bool busy = priv->completeAllJobsBeforeDestruction;
295 while(busy)
296 {
297 if(priv->jobcount > 0)
298 {
299 priv->jobcount--;
300 lock.unlock();
301 do_one(priv);
302 lock.lock();
303 }
304 else
305 {
306 busy = false;
307 }
308 }
309}
static void do_one(const PrivateData::Ptr &priv)
Definition threadpoolimpl.cc:311

Referenced by add().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ defaultPriority

const int ThreadPool::defaultPriority = PRIO_NORMAL
staticprivate

◆ priv

PrivateData::Ptr ThreadPool::priv
private

Referenced by add(), do_one(), schedule(), work(), and ~ThreadPool().

◆ threads

std::list<ThreadPtr> ThreadPool::threads
private

Threads in this ThreadPool

Referenced by add().


The documentation for this class was generated from the following files: