Scroom 0.14-48-ga0fee447
QueueJumper Class Reference

#include <threadpool.hh>


Public Types

using Ptr = std::shared_ptr< QueueJumper >
 

Public Member Functions

bool setWork (boost::function< void()> const &fn)
 
void operator() ()
 

Static Public Member Functions

static Ptr create ()
 Create a QueueJumper.
 

Protected Member Functions

 QueueJumper ()=default
 

Private Attributes

boost::mutex mut
 
bool inQueue {true}
 
bool isSet {false}
 
boost::function< void()> fn
 

Detailed Description

Jump the queue of a ThreadPool

Have you ever seen a couple going shopping in the mall, where the husband goes and waits in the queue for the register, while the wife goes out into the shop and brings items to her husband, who is already waiting to pay for them? With QueueJumper, you'll get to be the wife, while QueueJumper is the husband.

You'll create() a QueueJumper, and schedule() it on a ThreadPool. While the QueueJumper sits in the queue, waiting to be processed, you can do whatever you want. At some later time, you'll call setWork(). If the QueueJumper is still in the queue, it will accept the work, and it will eventually get executed. Of course, if the QueueJumper is no longer in the queue, you're out of luck.
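The workflow above can be sketched as follows. This is a minimal, self-contained sketch and not Scroom's code: `SketchJumper` is a hypothetical stand-in that mirrors QueueJumper with `std::mutex` and `std::function` in place of the Boost types, and `ToyQueue`, `drain()` and `runScenario()` are illustrative names. A plain `std::deque` drained on the calling thread stands in for a ThreadPool and its schedule().

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for QueueJumper, built on std:: instead of boost::.
class SketchJumper
{
public:
  using Ptr = std::shared_ptr<SketchJumper>;

  static Ptr create() { return Ptr(new SketchJumper()); }

  // Accept work only while the jumper is still waiting in the queue.
  bool setWork(std::function<void()> const& fn_)
  {
    std::lock_guard<std::mutex> const lock(mut);
    if(inQueue)
    {
      fn    = fn_;
      isSet = true;
    }
    return inQueue;
  }

  // Called by the queue when the jumper reaches the front.
  void operator()()
  {
    std::lock_guard<std::mutex> const lock(mut);
    if(isSet)
    {
      fn();
    }
    inQueue = false;
  }

protected:
  SketchJumper() = default;

private:
  std::mutex            mut;
  bool                  inQueue{true};
  bool                  isSet{false};
  std::function<void()> fn;
};

// Toy queue (assumption: no real ThreadPool): a FIFO drained on the calling thread.
using ToyQueue = std::deque<std::function<void()>>;

inline void drain(ToyQueue& queue)
{
  while(!queue.empty())
  {
    auto job = std::move(queue.front());
    queue.pop_front();
    job();
  }
}

// Returns how often the submitted work ran: once if setWork() came in time, else zero.
inline int runScenario(bool setWorkBeforeDrain)
{
  int      runs = 0;
  ToyQueue queue;
  auto     jumper = SketchJumper::create();
  queue.push_back([jumper] { (*jumper)(); }); // the jumper takes its place in line

  bool accepted = false;
  if(setWorkBeforeDrain)
  {
    accepted = jumper->setWork([&runs] { ++runs; }); // still queued: accepted
  }
  drain(queue);
  if(!setWorkBeforeDrain)
  {
    accepted = jumper->setWork([&runs] { ++runs; }); // turn has passed: rejected
  }
  assert(accepted == setWorkBeforeDrain);
  return runs;
}
```

In this sketch, `runScenario(true)` runs the work exactly once, while `runScenario(false)` never runs it, which is the "out of luck" case.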

QueueJumper helps you make full use of the cores available in your system, however many (or few) that is. Let's assume you have a single-threaded task (say, reading data from a bitmap) that spawns a lot of background tasks (e.g. pre-scaling the bitmap to some pre-defined zoom levels) that can be executed in parallel. On any given system, one of two things can happen:

  • The scaling is faster than the loading.
  • The scaling is slower than the loading.

Let's assume for the moment that reading data from a bitmap is a task that reads a given number of lines (say 1024 or some such), and then reschedules itself. Let's assume also that all of these jobs are scheduled at the same priority. You'll start out as follows:

| Queue | R

Shown here is a ThreadPool queue, with one job in it, marked R, because it is a Reading job. After the reading job has completed, the queue will look as follows:

| Queue | S S S S S S S S S S

The queue now has some (say 10) scaling jobs in it, which can be executed in parallel. Let's also reschedule our reading task:

| Queue | S S S S S S S S S S R

Wait a minute! The scaling tasks can be executed in parallel, but the reading task will be scheduled last, and then we'll be back to waiting for it to complete, while my x-core machine is using only one core. Let's fix that by executing our reading task at a higher priority, such that it gets executed first.

| Higher prio | R
| Normal prio | S S S S S S S S S S

In this scenario, our new reading task will be executed, while the other cores are busy with the scaling jobs. When the scaling jobs are finished, there are hopefully new scaling jobs waiting to be processed.

This will, in fact, work fine if the scaling is faster than the reading. By the time the Reading task has scheduled new Scaling jobs, all the previous ones will have finished. Problems may start, however, if the Reading is faster than the Scaling. When the Reading is finished (and has scheduled 10 new Scaling jobs), the old scaling jobs still haven't completed. As a result, the number of remaining scaling jobs will grow and grow. Worst case, on a 1-core machine, no scaling job will get processed, because there is always a reading job with a higher priority in the queue. That is, of course, until all reading jobs have finished. Scaling will begin only when the bitmap is completely loaded.
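The 1-core worst case described above can be reproduced with a small model. This is only a sketch under simplifying assumptions: there is no real ThreadPool, a single worker always prefers the higher priority queue, and jobs are plain characters. The name `simulateSingleCore` and its parameters are hypothetical.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Single-core model: the one worker always takes a high-priority job if there is one.
// Returns the execution order as a string of 'R' (read) and 'S' (scale) characters.
inline std::string simulateSingleCore(int readBatches, int scalesPerBatch)
{
  std::deque<char> highPrio;
  std::deque<char> normalPrio;
  std::string      order;
  int              remaining = readBatches - 1; // reads still to be rescheduled

  highPrio.push_back('R');
  while(!highPrio.empty() || !normalPrio.empty())
  {
    std::deque<char>& source = highPrio.empty() ? normalPrio : highPrio;
    char const        job    = source.front();
    source.pop_front();
    order += job;

    if(job == 'R')
    {
      // Each read spawns scaling jobs and reschedules itself at the higher priority.
      normalPrio.insert(normalPrio.end(), static_cast<std::size_t>(scalesPerBatch), 'S');
      if(remaining > 0)
      {
        highPrio.push_back('R');
        --remaining;
      }
    }
  }
  return order;
}
```

With 3 read batches of 2 scaling jobs each, this model yields `RRRSSSSSS`: every read completes before the first scaling job starts.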

For you, this may or may not be a problem. But if you are dealing with very large bitmaps, then a scenario of first loading the bitmap and then scaling it involves a lot of swapping: First you load the bitmap, and as you load it, you'll swap out the older unused parts. And then when you start scaling, you'll have to swap those parts back in. It is much more efficient to do the scaling before swapping out the old parts, and then swap out the old and the scaled parts in one go. This is where QueueJumper can help out.

Let's revisit for a moment the scenario where Reading jobs are scheduled at a higher priority:

| Higher prio | R
| Normal prio | S S S S S S S S S S

When the Reading job starts, the first thing it does is schedule a QueueJumper:

| Higher prio |
| Normal prio | S S S S S S S S S S Q

Next, it will proceed to read bitmap data, which will result in additional scaling jobs:

| Higher prio |
| Normal prio | S S S S S S S S S S Q S S S S S S S S S S

In the meantime, the other cores (if any) are busy with the scaling jobs. As we observed earlier, there are two possible scenarios:

Scaling is faster than reading
When the reading job completes, all earlier scaling jobs will have finished. Also, the QueueJumper will have been processed (doing no work).

| Higher prio |
| Normal prio | S S S S S S S S S S

In this case, QueueJumper::setWork() will fail, because the QueueJumper is no longer in the queue. You will have to reschedule your reading task at the higher priority, like we did before:

| Higher prio | R
| Normal prio | S S S S S S S S S S

In this scenario, you are not optimally using all cores, because reading is a bottleneck.

Reading is faster than scaling
When the reading job completes, there will still be scaling jobs left in the queue. Also the QueueJumper will still be there:

| Higher prio |
| Normal prio | S S S S Q S S S S S S S S S S

If we now schedule a Reading task at a higher priority, like before, then the number of scaling tasks will grow and grow. But if we call QueueJumper::setWork(), then we can effectively substitute the QueueJumper with our reading task:

| Higher prio |
| Normal prio | S S S S R S S S S S S S S S S

This way, the reading task will not be started immediately, but still sufficiently early. We can be confident that the new reading task, too, is finished before the scaling tasks that succeed it.
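The substitution can be traced with a single-threaded model. This is a sketch only: `Slot`, `Job` and `simulateWithJumper` are hypothetical names, `Slot` is a simplified jumper without a mutex or a stored callable, and one worker that prefers the higher priority queue stands in for the ThreadPool. Because nothing else runs while a read is in progress, the model always lands in the "reading is faster than scaling" case, so the fallback branch is shown but never taken.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>

// Simplified, single-threaded model of a queue jumper: no mutex, no callable.
struct Slot
{
  bool inQueue{true};
  bool isSet{false};

  bool setWork()
  {
    if(inQueue)
    {
      isSet = true;
    }
    return inQueue;
  }
};

struct Job
{
  char                  kind; // 'R' read, 'S' scale, 'Q' queue jumper
  std::shared_ptr<Slot> slot; // only used when kind == 'Q'
};

// One worker, two priorities. Returns the order in which reads and scales ran.
inline std::string simulateWithJumper(int readBatches, int scalesPerBatch)
{
  std::deque<Job> highPrio;
  std::deque<Job> normalPrio;
  std::string     order;
  int             remaining = readBatches - 1; // reads still to be scheduled

  highPrio.push_back(Job{'R', nullptr});
  while(!highPrio.empty() || !normalPrio.empty())
  {
    std::deque<Job>& source = highPrio.empty() ? normalPrio : highPrio;
    Job const        job    = source.front();
    source.pop_front();

    bool isRead = (job.kind == 'R');
    if(job.kind == 'Q')
    {
      job.slot->inQueue = false;           // the jumper's turn has come
      isRead            = job.slot->isSet; // it reads only if work was set in time
    }
    if(!isRead)
    {
      if(job.kind == 'S')
      {
        order += 'S';
      }
      continue; // an unset jumper does no work
    }

    order += 'R';
    auto const slot = std::make_shared<Slot>();
    normalPrio.push_back(Job{'Q', slot}); // first, reserve a place in the queue
    normalPrio.insert(normalPrio.end(), static_cast<std::size_t>(scalesPerBatch),
                      Job{'S', nullptr}); // then read, spawning scaling jobs
    if(remaining > 0)
    {
      --remaining;
      if(!slot->setWork())
      {
        highPrio.push_back(Job{'R', nullptr}); // jumper already gone: fall back
      }
    }
  }
  return order;
}
```

With 3 read batches of 2 scaling jobs each, this model yields `RRSSRSSSS`: the later reads run in the jumper's place, between scaling jobs, rather than all up front.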

Memory behaviour is very attractive too: We need to have only two "batches" in memory: one that is currently being read and one that is currently being scaled. After those tasks complete, we'll not need the data again until it is time to show the bitmap on the screen.

See also
TiledBitmap

Member Typedef Documentation

◆ Ptr

using QueueJumper::Ptr = std::shared_ptr<QueueJumper>

Constructor & Destructor Documentation

◆ QueueJumper()

QueueJumper::QueueJumper ( )
protected default

Member Function Documentation

◆ create()

QueueJumper::Ptr QueueJumper::create ( )
static

Create a QueueJumper.

QueueJumper objects are managed as shared pointers, because there will be two references to it:

  • one held by the ThreadPool, such that it can execute the job
  • one held by the original submitter, such that it can call setWork()

The QueueJumper will be destroyed when both parties have released their reference.
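The shared ownership described above can be illustrated with a small sketch. `Tracked` and `destroyedOnlyAfterBothRelease` are hypothetical names, not part of Scroom; the class only copies the pattern of a protected constructor combined with a static create(), and adds a flag so that destruction can be observed.

```cpp
#include <memory>

// Hypothetical class using the same pattern: protected constructor plus static create().
class Tracked
{
public:
  using Ptr = std::shared_ptr<Tracked>;

  static Ptr create(bool& destroyedFlag) { return Ptr(new Tracked(destroyedFlag)); }

  ~Tracked() { destroyed = true; }

protected:
  explicit Tracked(bool& destroyedFlag)
    : destroyed(destroyedFlag)
  {
  }

private:
  bool& destroyed; // set when the object is actually destroyed
};

// Returns true if the object survives the first release and is destroyed on the second.
inline bool destroyedOnlyAfterBothRelease()
{
  bool destroyed = false;

  Tracked::Ptr submitter = Tracked::create(destroyed); // reference held by the submitter
  Tracked::Ptr queued    = submitter;                  // reference held by the queue

  queued.reset(); // the queue has executed the job and lets go
  bool const aliveAfterFirst = !destroyed;

  submitter.reset(); // the submitter lets go as well
  return aliveAfterFirst && destroyed;
}
```

The order of release does not matter: whichever party lets go last triggers the destruction.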

{ return QueueJumper::Ptr(new QueueJumper()); }

Referenced by DataFetcher::operator()().


◆ operator()()

void QueueJumper::operator() ( )
{
  boost::mutex::scoped_lock const lock(mut);
  if(isSet)
  {
    fn();
  }
  inQueue = false;
}

◆ setWork()

bool QueueJumper::setWork ( boost::function< void()> const &  fn)

Set the function to be executed

Parameters
fn     Function to be called from within operator()().
Return values
true   if the QueueJumper is still in the queue (and the function will get executed)
false  if the QueueJumper is no longer in the queue (fn will be ignored)
{
  boost::mutex::scoped_lock const lock(mut);
  if(inQueue)
  {
    // Our turn hasn't passed yet. Accept work.
    fn = fn_;
    isSet = true;
  }
  else
  {
    // Our turn has passed. We cannot do the work
  }

  return inQueue;
}

Member Data Documentation

◆ fn

boost::function<void()> QueueJumper::fn
private

◆ inQueue

bool QueueJumper::inQueue {true}
private

◆ isSet

bool QueueJumper::isSet {false}
private

◆ mut

boost::mutex QueueJumper::mut
private
