Main Page | Class List | File List | Class Members

BoundedThreadPool.hpp

00001 /*
00002  * Copyright (C) 2003-2004
00003  * Slawomir Lisznianski <slisznianski@asyncnet.com>
00004  *
00005  * Permission to use, copy, modify, distribute and sell this software
00006  * and its documentation for any purpose is hereby granted without fee,
00007  * provided that the above copyright notice appear in all copies and
00008  * that both that copyright notice and this permission notice appear
00009  * in supporting documentation. Slawomir Lisznianski makes no
00010  * representations about the suitability of this software for any
00011  * purpose. It is provided "as is" without express or implied warranty.
00012  *
00013  */
00014 
00015 #ifndef RHA_BOUNDEDTHREADPOOL_HPP
00016 #define RHA_BOUNDEDTHREADPOOL_HPP
00017 
00018 #include <map> // For thread group.
00019 #include <vector> // For expired threads collection.
00020 
00021 // Threading and mutexing
00022 // NOTE: compile with BOOST_HAS_THREADS
00023 //
00024 #include <boost/config.hpp>
00025 #ifndef BOOST_HAS_THREADS
00026 #   error   Thread support is unavailable!
00027 #endif
00028 
00029 // Threading and mutexing.
00030 //
00031 #include <boost/thread/condition.hpp>
00032 #include <boost/thread/thread.hpp>
00033 
00034 // Smart pointers
00035 //
00036 #include <boost/scoped_ptr.hpp>
00037 #include <boost/shared_ptr.hpp>
00038 
00039 namespace Rha
00040 {
00050   template<class Q>
00051   class BoundedThreadPool
00052   {
00053   public:
00054     typedef BoundedThreadPool<Q> OfType;
00055     typedef typename Q::Task     Task;
00056     typedef Q                    Queue;
00057 
00070     class Configurator
00071     {
00072     public:
00073       // Default values.
00074       // VC++ 6 doesn't support in-place initialization of static const
00075       // per C2258 and C225.
00076       //
00077       enum {defaultMinThreads = 0,
00078             defaultMaxThreads = 5,
00079             defaultGracefulTermination = 1};
00080 
00086       Configurator()
00087       {
00088         init__();
00089       }
00090 
00096       unsigned int minThreads() const
00097       {
00098         return M_minThreads;
00099       }
00100 
00107       Configurator& minThreads(unsigned int minThreads)
00108       {
00109         M_minThreads = minThreads;
00110         validate__();
00111         return (*this);
00112       }
00113 
00119       unsigned int maxThreads() const
00120       {
00121         return M_maxThreads;
00122       }
00123 
00130       Configurator& maxThreads(unsigned int maxThreads)
00131       {
00132         M_maxThreads = maxThreads;
00133         validate__();
00134         return (*this);
00135       }
00136 
00142       bool gracefulTermination() const
00143       {
00144         return M_gracefulTermination;
00145       }
00146 
00157       Configurator& gracefulTermination(bool gracefulTermination)
00158       {
00159         M_gracefulTermination = gracefulTermination;
00160         validate__();
00161         return (*this);
00162       }
00163 
00164     private:
00165       void init__()
00166       {
00167         M_minThreads = defaultMinThreads;
00168         M_maxThreads = defaultMaxThreads;
00169         M_gracefulTermination = (defaultGracefulTermination != 0);
00170       }
00171 
00172       void validate__()
00173       {
00174         // Are parameters sane?
00175         //
00176         if (M_maxThreads < M_minThreads)
00177           throw std::invalid_argument("upper thread boundry less than low thread boundry");
00178         if (M_maxThreads < 1)
00179           throw std::invalid_argument("upper thread boundry less than one");
00180       }
00181 
00182       unsigned int M_minThreads;
00183       unsigned int M_maxThreads;
00184       bool         M_gracefulTermination;
00185     };
00186 
00188     BoundedThreadPool();
00189 
00196     BoundedThreadPool(const OfType& exec);
00197 
00204     BoundedThreadPool(const Configurator& config);
00205 
00213     ~BoundedThreadPool();
00214 
00222     void setUp(Queue* queuePtr);
00223 
00229     void operator()(const Task& task);
00230 
00231   private:
00232     // Helper struct used to encapsulate thread object and its
00233     // executor data. Its destructor is responsible for joining
00234     // with the thread it wraps prior to returning.
00235     //
00236     struct ThreadHandler :
00237       boost::noncopyable
00238     {
00239       ThreadHandler(BoundedThreadPool& executor, const unsigned int threadId, boost::thread* threadPtr) :
00240         M_threadImplPtr(threadPtr),
00241         M_executor(executor),
00242         M_threadId(threadId)
00243       {
00244         ++executor.M_createdThreads;
00245       }
00246 
00247       ~ThreadHandler()
00248       {
00249         M_threadImplPtr->join();
00250       }
00251 
00252       boost::scoped_ptr<boost::thread> M_threadImplPtr;
00253       BoundedThreadPool& M_executor;
00254       const unsigned int M_threadId;
00255     };
00256     friend struct ThreadHandler;
00257     // Helper struct used to encapsulate user's job. It has various
00258     // hooks to help in keeping track of job execution status.
00259     //
00260     class ExecutableTask
00261     {
00262     public:
00263       ExecutableTask(const unsigned int threadId, BoundedThreadPool& executor) :
00264         M_executor(executor),
00265         M_threadId(threadId)
00266       {}
00267 
00268       void operator ()()
00269       {
00270         try
00271         {
00272           // This aux flag helps in reducing number of mutexing
00273           // required when updating number of working threads.
00274           //
00275           for(bool firstIteration__(true);; firstIteration__ = false)
00276           {
00277             boost::scoped_ptr<Task> taskPtr__;
00278             {
00279               // Synchronize on the queue.
00280               //
00281               QueueScopedLock lock__(M_executor.MQueuePtrAccessToken);
00282               // If this is not our first iteration
00283               // we have to decrement working thread count.
00284               //
00285               if (!firstIteration__)
00286                 --M_executor.M_workingThreads;
00287 
00288               // Wait until task queue is not empty or
00289               // we are exiting.
00290               //
00291               while (!M_executor.M_stopped && M_executor.MQueuePtr->empty())
00292               {
00293                 // Stick around only if number of created
00294                 // threads is below or equal to the minimum threads required.
00295                 //
00296                 if (M_executor.M_createdThreads > M_executor.M_configurator.minThreads())
00297                 {
00298                   threadCleanUp__();
00299                   return;
00300                 }
00301                 M_executor.M_taskPushed.wait(lock__);
00302               }
00303 
00304               // There are two scenarios in which we will exit...
00305               // 1) there are no more jobs queued and we are told to stop running or
00306               // 2) there are outstanting jobs, but GracefulShutdown flag is
00307               //    false in which case outstanding jobs are ignored.
00308               //
00309               if (M_executor.M_stopped && (!M_executor.M_configurator.gracefulTermination() ||
00310                                               M_executor.MQueuePtr->empty()))
00311               {
00312                 threadCleanUp__();
00313                 return;
00314               }
00315 
00316               // Make a copy of the task. It may throw...
00317               //
00318               taskPtr__.reset(new Task(M_executor.MQueuePtr->front()));
00319 
00320               // Remove it from the job queue.
00321               //
00322               M_executor.MQueuePtr->pop();
00323 
00324               // Signal the task was popped.
00325               //
00326               M_executor.M_taskPoped.notify_one();
00327 
00328               // Increment the working threads count
00329               //
00330               ++M_executor.M_workingThreads;
00331 
00332               // Since there could be outstanding tasks in the queue
00333               // notify fellow threads if they are waiting.
00334               //
00335               if (!M_executor.MQueuePtr->empty())
00336               {
00337                 // We may need more threads too.
00338                 //
00339                 M_executor.createThreadIfNeeded__();
00340                 M_executor.M_taskPushed.notify_one();
00341               }
00342             }
00343             (*taskPtr__)();
00344           }
00345         }
00346         catch (...)
00347         {
00348           // Synchronize on the queue.
00349           //
00350           QueueScopedLock lock__(M_executor.MQueuePtrAccessToken);
00351           onException__();
00352         }
00353       }
00354     private:
00355       // Cleans up thread resources.
00356       //
00357       void threadCleanUp__()
00358       {
00359         // Pass the exiting thread to thread collector.
00360         //
00361         M_executor.M_expiredThreads.push_back(M_threadId);
00362         // Decrement number of created threads to prevent other threads from expiring
00363         // unnceseraly. New threads will not be created until existing 'dead' threads
00364         // are joined, so we should never leak threads.
00365         // This operation was intentionally not put in an auto-variable (RAII idiom)
00366         // as we want to reuse existing synchronized section.
00367         //
00368         --M_executor.M_createdThreads;
00369       }
00370 
00371       // Handles exception within thread.
00372       //
00373       void onException__()
00374       {
00375         // Since tasks are not allowed to leak exceptions,
00376         // we consider this condition unexpected.
00377         //
00378         std::unexpected();
00379       }
00380 
00381       BoundedThreadPool& M_executor;
00382       const unsigned int M_threadId;
00383     };
00384     friend class ExecutableTask;
00385 
00386     // Unconditionally creates a new thread.
00387     // This method must be invoked within a synchronized section.
00388     //
00389     void createThread__();
00390 
00391     // Conditionally creates a new thread.
00392     // This method must be invoked within a synchronized section.
00393     //
00394     void createThreadIfNeeded__();
00395 
00396     typedef boost::condition                                           Condition;
00397     typedef std::vector<unsigned int>                                  ThreadIdSeq;
00398     typedef std::map<unsigned int,  boost::shared_ptr<ThreadHandler> > ThreadGroup;
00399     typedef boost::mutex                                               QueueAccessToken;
00400     typedef boost::mutex::scoped_lock                                  QueueScopedLock;
00401 
00402     // Executor configurator
00403     Configurator M_configurator;
00404     // Thread accounting
00405     unsigned int     M_workingThreads;
00406     unsigned int     M_createdThreads;
00407     ThreadIdSeq      M_expiredThreads;
00408     ThreadGroup      M_threadGroup;
00409     unsigned int     M_threadId;
00410     // State signals, access control and state holders.
00411     QueueAccessToken MQueuePtrAccessToken;
00412     Condition        M_taskPushed;
00413     Condition        M_taskPoped;
00414     bool             M_stopped;
00415     Queue*           MQueuePtr;
00416   };
00417 }
00418 
00419 using namespace Rha;
00420 
00421 template<class Q>
00422 BoundedThreadPool<Q>::BoundedThreadPool()
00423 { }
00424 
00425 template<class Q>
00426 BoundedThreadPool<Q>::BoundedThreadPool(const Configurator& config) :
00427   M_configurator(config)
00428 { }
00429 
00430 template<class Q>
00431 BoundedThreadPool<Q>::BoundedThreadPool(const OfType& exec) :
00432   M_configurator(exec.M_configurator)
00433 { }
00434 
00435 template<class Q>
00436 void
00437 BoundedThreadPool<Q>::setUp(Queue* queuePtr)
00438 {
00439   // Reset stats.
00440   //
00441   M_workingThreads = 0;
00442   M_createdThreads = 0;
00443   M_threadId = 0;
00444   M_stopped = false;
00445 
00446   // Set pointer to queue.
00447   //
00448   MQueuePtr = queuePtr;
00449 
00450   // Optionally start worker threads.
00451   //
00452   for (unsigned int i = 0; i < M_configurator.minThreads(); ++i)
00453     createThread__();
00454 }
00455 
00456 template<class Q>
00457 BoundedThreadPool<Q>::~BoundedThreadPool()
00458 {
00459   {
00460     // Synchronize on the queue.
00461     //
00462     QueueScopedLock lock__(MQueuePtrAccessToken);
00463 
00464     // Set the flag to indicate we are exiting...
00465     //
00466     M_stopped = true;
00467 
00468     // Emit signal to force worker threads
00469     // check their 'stopped' flag.
00470     //
00471     M_taskPushed.notify_all();
00472 
00473     // Emit signal to force incoming call threads
00474     // check their 'stopped' flag.
00475     //
00476     M_taskPoped.notify_all();
00477   } // Store mutex released.
00478   //
00479   // Clear thread group. It will wait for
00480   // each worker thread to exit.
00481   //
00482   M_threadGroup.clear();
00483 }
00484 
00485 template<class Q>
00486 void
00487 BoundedThreadPool<Q>::operator()(const Task& task)
00488 {
00489   // Synchronize queue access.
00490   //
00491   QueueScopedLock lock__(MQueuePtrAccessToken);
00492 
00493   // Check if there are any expired threads
00494   // that need to be cleaned up first.
00495   //
00496   if (!M_expiredThreads.empty())
00497   {
00498     //std::for_each(M_expiredThreads.begin(), M_expiredThreads.end(), boost::bind(&ThreadGroup::erase, M_threadGroup, _1));
00499     for (ThreadIdSeq::const_iterator threadIdIt__ = M_expiredThreads.begin();
00500          threadIdIt__ != M_expiredThreads.end();
00501          ++threadIdIt__)
00502     {
00503       M_threadGroup.erase(*threadIdIt__);
00504     }
00505   }
00506   // We may have to create a thread
00507   // if user specified 0 (zero) for minimum.
00508   //
00509   createThreadIfNeeded__();
00510 
00511   // If queue reports full, we have to wait.
00512   //
00513   while (MQueuePtr->full())
00514     M_taskPoped.wait(lock__);
00515 
00516   // Push job for pickup by worker thread.
00517   //
00518   MQueuePtr->push(task);
00519 
00520   // Send out the signal that the job is ready
00521   // for processing.
00522   //
00523   M_taskPushed.notify_one();
00524 }
00525 
00526 template<class Q>
00527 void
00528 BoundedThreadPool<Q>::createThread__()
00529 {
00530   ++M_threadId;
00531   std::auto_ptr<boost::thread> threadPtr__(new boost::thread(ExecutableTask(M_threadId, *this)));
00532   boost::shared_ptr<ThreadHandler> tHandlerPtr__(new ThreadHandler(*this, M_threadId, threadPtr__.release()));
00533   M_threadGroup.insert(std::make_pair<unsigned int, boost::shared_ptr<ThreadHandler> >(M_threadId, tHandlerPtr__));
00534 }
00535 
00536 template<class Q>
00537 void
00538 BoundedThreadPool<Q>::createThreadIfNeeded__()
00539 {
00540   // Check if there is enough worker threads
00541   // handling the load.
00542   // We will create a new thread if one of the
00543   // following conditions exists:
00544   // - There is currently more working threads than
00545   //   user specified miniumum threads and number of created
00546   //   threads is less than maximum allowed threads,
00547   // - Or, number of created threads is less than minimum
00548   //   threads. This is an abnormal condition unless exception
00549   //   is allowed to leak from a job and thread exits,
00550   // - Or, number of user specified minimum threads equals
00551   //   zero and number of created threads is zero
00552   //   (boostrap).
00553   //
00554   if ((M_workingThreads >= M_configurator.minThreads() && M_createdThreads < M_configurator.maxThreads()) ||
00555        M_createdThreads < M_configurator.minThreads() ||
00556       M_configurator.minThreads() == M_createdThreads && M_configurator.minThreads() == 0)
00557   {
00558     createThread__();
00559   }
00560 }
00561 
00562 #endif // RHA_BOUNDEDTHREADPOOL_HPP