Revised on 4 Mar 2004
Rhapsodia.Scheduler Documentation
The Rhapsodia.Scheduler library provides minimal and, hopefully, complete versions of classes commonly used in concurrent C++ programming. The library consists of concepts that have been successfully used in production systems and follows well-established concurrency design patterns and language idioms. Above all, the emphasis has been on code that works well with the C++ Standard Library and the Boost library.
Overview
Configuration, build and installation
Examples
Download
Bibliography
Acknowledgments
Overview
One way to categorize the functionality associated with threads is to take a task-based view. A task might range from a single asynchronous method invocation to an entire application session. Task-based designs are most commonly used in event frameworks, parallel computations, and IO systems. Whenever there is a need to invoke an operation asynchronously rather than relying on a direct procedural invocation, a task-based approach is usually chosen.
Unfortunately, while performance characteristics vary across hardware and software platforms, the overhead of creating a thread to handle a task is still significantly greater than that of a direct method invocation. Unless a task involves an out-of-process remote procedure call, network IO, or database access, the payoff of creating a thread is often negative. Rhapsodia.Scheduler addresses this problem by mapping tasks to threads much as modern operating systems map threads to processes. Calls triggering conceptually autonomous activities are maintained as events, queued, and processed by a relatively small pool of threads. The goal is to reduce the utilization of system-level resources while still allowing concurrent execution techniques when expressing logically independent activities.
Design of Rhapsodia.Scheduler
There are three components that constitute Rhapsodia.Scheduler: the Scheduler, the Executor and the Queue.
Application programs use the Scheduler interface to pass tasks for execution. Apart from providing the interface for scheduling tasks, the Scheduler controls the lifetime of the Executor and the Queue by containing their instances. This is somewhat similar to how an fstream object contains a file stream buffer in the C++ Standard Library. Although the Scheduler is the most visible component of the three, its primary job is to pass tasks onto the Queue and signal the Executor to take over.
When tasks are scheduled, the Scheduler puts them on the Queue and usually returns control to the caller immediately. The Queue serves as a buffer between the caller and the task execution mechanism and may support various saturation policies, such as:
- Size policy: how big should the queue grow before it starts to reject more tasks?
- Priority policy: should queued tasks be ordered, so that high-priority tasks get executed first?
- Expiration policy: should old tasks expire and be removed from the queue?
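The Queue's concrete interface is not spelled out above, so as a rough sketch of how a size policy might behave, here is a minimal bounded task queue. All names here are illustrative, not Rhapsodia.Scheduler's API, and C++11 facilities are used for brevity:

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

// Illustrative sketch only: a queue that refuses to grow past a fixed
// size, approximating the "Size" saturation policy described above.
// The names below are hypothetical, not Rhapsodia.Scheduler's real API.
class BoundedTaskQueue {
public:
    explicit BoundedTaskQueue(std::size_t maxSize) : maxSize_(maxSize) {}

    // Returns false instead of growing past the configured bound.
    bool tryPush(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (tasks_.size() >= maxSize_)
            return false;                 // saturation: reject the task
        tasks_.push_back(std::move(task));
        ready_.notify_one();
        return true;
    }

    // Blocks until a task is available, then hands it to the caller.
    std::function<void()> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !tasks_.empty(); });
        std::function<void()> task = std::move(tasks_.front());
        tasks_.pop_front();
        return task;
    }

private:
    std::size_t maxSize_;
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::function<void()> > tasks_;
};
```

A push that would exceed the bound is rejected rather than queued; a real implementation could equally block the caller or drop the oldest task, which is exactly the kind of choice a saturation policy encodes.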
Executor
The Executor is responsible for thread creation, thread pool management, and the assignment of threads to tasks. Whenever a task is put on the Queue, a signal is sent to the Executor to inform it of pending work. Like the Queue, the Executor supports policies to control its runtime characteristics. In particular, it can be instructed on:
- Maximum: the number of threads that should be created to cope with the load.
- Minimum: the number of threads that should always stay in the pool, no matter how light the load is.
- On exit: should the Executor finish pending jobs in the queue, or ignore them and exit immediately?
The separation and independent configuration of each component allow tuning Rhapsodia.Scheduler to achieve the performance characteristics best suited to a particular application's needs.
Rhapsodia.Scheduler uses Boost.Threads to gain access to the underlying, operating-system-specific threading and synchronization support. Apart from the thread library, it is imperative to use multithreaded versions of all runtime libraries used by the program, including the C++ Standard Library implementation.
Rhapsodia.Scheduler follows the C++ Standard Library practice of allowing all functions except destructors or other specified functions to throw exceptions on errors. Unless otherwise specified, Rhapsodia.Scheduler functions that do not have an exception-specification may throw implementation-defined exceptions.
Scheduler meets the NonCopyable requirement, disallowing copy construction and copy assignment. It would be overly confusing to have to remember the rules controlling copying semantics for a multithreaded class like Scheduler.
Configuration, build and installation
The Rhapsodia.Scheduler authors will make every effort to support the compilers that the Boost library has been ported to. Your help is much appreciated in testing Rhapsodia.Scheduler with compilers and operating systems to which the authors of this library do not have access.
The list of compilers on which Rhapsodia.Scheduler passed regression tests includes:
- GNU g++ (GCC) 3.2.2 20030222
- Microsoft Visual C++ 7.1 (.NET)
- Microsoft Visual C++ 6, Service Pack 5 (Futures are not supported on this compiler)
Library configuration
Rhapsodia.Scheduler uses the configuration macros found in boost/config.hpp. Please make sure that this header file is accessible when compiling your projects. For information on how to build the sample programs, please read Building sample programs.
Unix-like systems
Use the configure script located in the Rhapsodia.Scheduler library root directory. If you installed the Boost library in a location that the configure script cannot find automatically, you may need to specify it using the --with-boost-include and --with-boost-thread-lib options of the configure script. Note: Version 1.31.0 of the Boost distribution does not create symbolic links from the augmented shared library names to their generic equivalents. To fix this, you may need to create a symbolic link yourself. For example, when Boost is compiled with gcc, libboost_thread.so needs to point to libboost_thread-gcc-mt-1_31.so (e.g. ln -s libboost_thread-gcc-mt-1_31.so libboost_thread.so in the Boost library directory).
Microsoft Windows (Win32)
Currently, only project files for Visual C++ 7 (.NET) are provided for the Windows platform. The Visual Studio project file is located under the VCppExamples directory of the Rhapsodia.Scheduler library.
Examples
Consider the following scenario. Imagine a program that needs to perform three separate tasks, represented by the functions task1, task2 and task3. Once all three tasks are done, our program should inform the user of completion and exit. Here is code that executes those tasks sequentially:
int main(int, char**)
{
    task1();
    task2("string argument");
    task3("string argument followed by number", 9999);
    std::cout << "Tasks execution completed" << std::endl;
}
If the tasks are purely CPU bound and we are running on a single-CPU machine, then that is probably the fastest way to execute this program. If, however, any of those tasks involve some sort of I/O, such as disk or database access, or make remote procedure calls, such as CORBA or DCOM calls, then chances are that our CPU will sit idle waiting for one task to finish before it can proceed to the second or third. Conversely, if the calls are of a purely computational nature and our program runs on a multi-CPU system, we will likely not see the "expected" improvement in completion time.
The solution is to somehow express the fact that our tasks are autonomous and can run concurrently. Below is code that uses Rhapsodia.Scheduler to produce the same result, except that this time the tasks might run in parallel:
#include <BasicScheduler.hpp>

int main(int, char**)
{
    Rha::BasicScheduler() << &task1
                          << boost::bind(task2, "arg")
                          << boost::bind(task3, "arg", 9999);
    std::cout << "Tasks execution completed" << std::endl;
}
In the code above, the std::cout statement will be reached only after all three tasks complete their execution (normally or via an exception).
The files NonMemberFunction.cpp and ExceptionHandling.cpp provide working, more complete examples of the above scenario. In particular, the latter shows how to deal with exceptions thrown during task execution.
Tasks stored in a container
Now that we are past the simple case, let's move on to a more challenging one. Consider a batch program that stores tasks in a container. When run, this program iterates through the container, executing each task it finds. When there are no more tasks to run, it informs the user of completion and exits. Here is how a simplified version might look:
class Task {
public:
    void perform();
};

int main(int, char**)
{
    std::vector<Task> taskList;
    std::for_each(taskList.begin(), taskList.end(), boost::mem_fn(&Task::perform));
    std::cout << "Tasks execution completed" << std::endl;
}
The above program follows the same strategy as in the first example, except that this time the problem becomes more apparent. Imagine a situation where each task runs for a few seconds and our container holds hundreds or thousands of tasks. Clearly, we need to introduce some concurrency. Here is an example that appears to fix the problem using thread_group from the Boost thread library:
int main(int, char**)
{
    std::vector<Task> taskList;
    boost::thread_group threads;
    for (std::vector<Task>::iterator it = taskList.begin(), end = taskList.end();
         it != end; ++it)
    {
        threads.create_thread(boost::bind(&Task::perform, boost::ref(*it)));
    }
    threads.join_all();
    std::cout << "Tasks execution completed" << std::endl;
}
Under certain conditions the above program will run significantly faster, but not always as fast as expected. For example, if each task runs only for a very brief moment, the overhead of creating a thread, handling a single task, and immediately destroying that thread may be substantial. In other cases, where tasks run longer and there is a large number of them in the container, we may run out of system resources and terminate. In other words, creating a new thread per task does not scale.
A much better solution would be to limit the number of threads that concurrently run tasks. Also, a thread that finishes processing its task should be reassigned another task without exiting. Let's see how this can be done using Rhapsodia.Scheduler and a threading policy:
#include <BasicScheduler.hpp>

1:  typedef BasicScheduler::Executor::Configurator ExecCfg;

int main(int, char**)
{
    std::vector<Task> taskList;
    {
3:      BasicScheduler scheduler( ExecCfg().maxThreads(20) );
        for (std::vector<Task>::iterator it = taskList.begin(), end = taskList.end();
             it != end; ++it)
        {
            scheduler.push( boost::bind( &Task::perform, boost::ref(*it) ) );
        }
    }
4:  std::cout << "Tasks execution completed" << std::endl;
}
On line 1, we define ExecCfg as a shorthand for the Executor's Configurator type. Among other things, this configurator lets us limit the number of threads the Executor creates. On line 3, we construct the Scheduler and specify 20 as the maximum number of threads that will work on tasks. The number 20 was picked artificially; a possibly better alternative would be to detect the number of CPUs that the hardware provides and set the limit accordingly. Notice how we put the scheduler instance in a separate scope, just around the for-loop. This ensures that by the time we reach line 4, all tasks have been processed. Remember, the Scheduler follows the Resource Acquisition Is Initialization idiom, and its destructor will not return until all threads finish running. Those of you familiar with threading terminology will recognize this as an implementation of the Barrier pattern.
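As suggested above, the thread limit is better derived from the hardware than hard-coded. Here is a small sketch of such a helper, using C++11 standard facilities; the fallback value is an arbitrary assumption, not a library default:

```cpp
#include <cstddef>
#include <thread>

// hardware_concurrency() may return 0 when the value is not computable,
// so fall back to a small fixed pool size in that case.
// The fallback of 4 is arbitrary, chosen for illustration only.
std::size_t defaultPoolSize() {
    unsigned n = std::thread::hardware_concurrency();
    return n != 0 ? n : 4;
}
```

The result could then be passed wherever a maximum thread count is expected, instead of a hard-coded constant such as 20.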
The file BoundedQueue.cpp demonstrates the usage of yet another policy, which limits the number of tasks that can be queued before the Scheduler starts blocking. Additionally, policies can be combined, so it is possible to define a Scheduler type that has bounds on both the number of threads and the number of queued tasks. The file PriorityQueue.cpp shows how to parameterize the Scheduler with a queue that runs tasks according to their priority. Finally, one can write her own Executor or Queue implementing special features not available in the stock implementations and plug them into the Scheduler.
Using Futures with Rhapsodia.Scheduler
Occasionally, you may want to synchronize with a task immediately after it runs, without having to wait for other tasks to complete. Another example is when an application framework provides you with a reference to a Scheduler for pushing your tasks. In such a case, you have no generic way of knowing when they complete, as the lifetime of the Scheduler is no longer under your control. You can, of course, implement your own signaling system so that when a task completes it informs registered parties, but this requires considerable effort. For cases like these, the use of Futures is recommended. A Future is a handle to the result of an asynchronous call made by the client. The client is then free to try to obtain the result from this future object whenever it pleases. If the client tries to extract the result from the future object before it has been initialized, the client will block. If the client does not wish to block, it can poll the future object, asking whether the result is ready.
Here is a code snippet demonstrating use of Futures with Rhapsodia.Scheduler:
The file Futures.cpp provides more examples of how Futures can be utilized.
Download
Bibliography
Doug Lea, Concurrent Programming in Java: Design Principles and Patterns, Second Edition, Addison-Wesley, 1999
Douglas C. Schmidt, Michael Stal, Hans Rohnert and Frank Buschmann, Pattern-Oriented Software Architecture, Volume 2: Patterns for Concurrent and Networked Objects, Wiley, 2000, ISBN 0-471-60695-2
Bjarne Stroustrup, The C++ Programming Language, Special Edition, Addison-Wesley, 2000, ISBN 0-201-70073-5
Andrei Alexandrescu, Modern C++ Design: Generic Programming and Design Patterns Applied, Addison-Wesley, ISBN 0-201-70431-5
Nicolai M. Josuttis, The C++ Standard Library: A Tutorial and Reference, Addison-Wesley, ISBN 0-201-37926-0
Acknowledgments
William E. Kempf, who is the architect, designer, and implementor of Boost.Threads.
Doug Lea, whose book Concurrent Programming in Java was an inspiration for writing this library. Discussions of the rationale and applications of several of Rhapsodia.Scheduler's classes can be found in the second edition of his book.