|
Scroom 0.14-49-gb7ae7a6d
|
#include <threadpool.hh>

Public Types | |
| using | Ptr = std::shared_ptr< QueueJumper > |
Public Member Functions | |
| bool | setWork (boost::function< void()> const &fn) |
| void | operator() () |
Static Public Member Functions | |
| static Ptr | create () |
| QueueJumper. | |
Protected Member Functions | |
| QueueJumper ()=default | |
Private Attributes | |
| boost::mutex | mut |
| bool | inQueue {true} |
| bool | isSet {false} |
| boost::function< void()> | fn |
Jump the queue of a ThreadPool
Have you ever seen a couple going shopping in the mall, where the husband will go and wait in the queue for the register, while the wife goes out into the shop and brings items to her husband, already waiting to pay them? With QueueJumper, you'll get to be the wife, while QueueJumper is the husband.
You'll create() a QueueJumper, and schedule() it on a ThreadPool. While the QueueJumper sits in the queue, waiting to be processed, you can do whatever you want. At some later time, you'll call setWork(). If the QueueJumper is still in the queue, it will accept the work, and it will eventually get executed. Of course, if the QueueJumper is no longer in the queue, you're out of luck.
QueueJumper is used to maximally use the number of cores you have available in your system, however many (or few) that is. Let's assume you have a single threaded task (say, reading data from a bitmap) that spawns a lot of background tasks (i.e. pre-scaling the bitmap to some pre-defined zoom levels) that can be executed in parallel. On any given system, two things can happen
Let's assume for the moment that reading data from a bitmap is a task that reads a given number of lines (say 1024 or some such), and then reschedules itself. Let's assume also that all of these threads are scheduled at the same priority. You'll start out as follows:
| Queue | R
Shown here is a ThreadPool queue, with one job in it, marked R, because it is a Reading job. After the reading job has completed, the queue will look as follows:
| Queue | S S S S S S S S S S
The queue now has some (say 10) scaling jobs in it, which can be executed in parallel. Let's also reschedule our reading task:
| Queue | S S S S S S S S S S R
Wait a minute! The scaling tasks can be executed in parallel, but the reading task will be scheduled last, and then we'll be back to waiting for it to complete, while my x-core machine is using only one core. Let's fix that by executing our reading task at a higher priority, such that it gets executed first.
| Higher prio | R | Normal prio | S S S S S S S S S S
In this scenario, our new reading task will be executed, while the other cores are busy with the scaling jobs. When the scaling jobs are finished, there are hopefully new scaling jobs waiting to be processed.
This will, in fact, work fine if the scaling is faster than the reading. By the time the Reading task has scheduled new Scaling jobs, all the previous ones will have finished. Problems may start, however, if the Reading is faster than the Scaling. When the Reading is finished (and has scheduled 10 new Scaling jobs), the old scaling jobs still haven't completed. As a result, the number of remaining scaling jobs will grow and grow. Worst case, on a 1-core machine, no scaling job will get processed, because there is always a reading job with a higher priority in the queue. That is, of course, until all reading jobs have finished. Scaling will begin only when the bitmap is completely loaded.
For you, this may or may not be a problem. But if you are dealing with very large bitmaps, then a scenario of first loading the bitmap and then scaling it involves a lot of swapping: First you load the bitmap, and as you load it, you'll swap out the older unused parts. And then when you start scaling, you'll have to swap those parts back in. It is much more efficient to do the scaling before swapping out the old parts, and then swap out the old and the scaled parts in one go. This is where QueueJumper can help out.
Let's revisit for a moment the scenario where Reading jobs are scheduled at a higher priority:
| Higher prio | R | Normal prio | S S S S S S S S S S
When the Reading job starts, the first thing it does is schedule a QueueJumper:
| Higher prio | | Normal prio | S S S S S S S S S S Q
Next, it will proceed to read bitmap data, which will result in additional scaling jobs:
| Higher prio | | Normal prio | S S S S S S S S S S Q S S S S S S S S S S
In the mean time, the other cores (if any) are busy with the scaling jobs. Like we observed earlier, there are two possible scenarios
Scaling is faster than reading
When the reading job completes, all earlier scaling jobs will have finished. Also, the QueueJumper will have been processed (doing no work)
| Higher prio | | Normal prio | S S S S S S S S S S
In this case, QueueJumper::setWork() will fail, because the QueueJumper is no longer in the queue. You will have to reschedule your reading task at the higher priority, like we did before:
| Higher prio | R | Normal prio | S S S S S S S S S S
In this scenario, you are not optimally using all cores, because reading is a bottleneck.
Reading is faster than scaling
When the reading job completes, there will still be scaling jobs left in the queue. Also the QueueJumper will still be there:
| Higher prio | | Normal prio | S S S S Q S S S S S S S S S S
If we now schedule a Reading task at a higher priority, like before, then the number of scaling tasks will grow and grow. But if we call QueueJumper::setWork(), then we can effectively substitute the QueueJumper with our reading task:
| Higher prio | | Normal prio | S S S S R S S S S S S S S S S
This way, the reading task will not be started immediately, but still sufficiently early. We can be confident that the new reading task, too, is finished before the scaling tasks that succeed it.
Memory behaviour is very attractive to: We need to have only two "batches" in memory: One that is currently being read and one that is currently being scaled. After those tasks complete, we'll not need the data again until it is time to show the bitmap on the screen.
| using QueueJumper::Ptr = std::shared_ptr<QueueJumper> |
|
protecteddefault |
|
static |
Create a QueueJumper
QueueJumper objects are managed as shared pointers, because there will be two references to it:
The QueueJumper will be destroyed when both parties have released their reference.
Referenced by DataFetcher::operator()().

| bool QueueJumper::setWork | ( | boost::function< void()> const & | fn | ) |
Set the function to be executed
| fn | Function to be called from within operator()(). |
| true | if the QueueJumper is still in the queue (and the function will get executed) |
| false | if the QueueJumper no longer is in the queue (fn will be ignored) |
|
private |
|
private |
|
private |
|
private |