00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifndef RHA_BOUNDEDTHREADPOOL_HPP
00016 #define RHA_BOUNDEDTHREADPOOL_HPP
00017
#include <exception>
#include <map>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

#include <boost/config.hpp>
#ifndef BOOST_HAS_THREADS
# error Thread support is unavailable!
#endif

#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
00038
00039 namespace Rha
00040 {
00050 template<class Q>
00051 class BoundedThreadPool
00052 {
00053 public:
00054 typedef BoundedThreadPool<Q> OfType;
00055 typedef typename Q::Task Task;
00056 typedef Q Queue;
00057
00070 class Configurator
00071 {
00072 public:
00073
00074
00075
00076
00077 enum {defaultMinThreads = 0,
00078 defaultMaxThreads = 5,
00079 defaultGracefulTermination = 1};
00080
00086 Configurator()
00087 {
00088 init__();
00089 }
00090
00096 unsigned int minThreads() const
00097 {
00098 return M_minThreads;
00099 }
00100
00107 Configurator& minThreads(unsigned int minThreads)
00108 {
00109 M_minThreads = minThreads;
00110 validate__();
00111 return (*this);
00112 }
00113
00119 unsigned int maxThreads() const
00120 {
00121 return M_maxThreads;
00122 }
00123
00130 Configurator& maxThreads(unsigned int maxThreads)
00131 {
00132 M_maxThreads = maxThreads;
00133 validate__();
00134 return (*this);
00135 }
00136
00142 bool gracefulTermination() const
00143 {
00144 return M_gracefulTermination;
00145 }
00146
00157 Configurator& gracefulTermination(bool gracefulTermination)
00158 {
00159 M_gracefulTermination = gracefulTermination;
00160 validate__();
00161 return (*this);
00162 }
00163
00164 private:
00165 void init__()
00166 {
00167 M_minThreads = defaultMinThreads;
00168 M_maxThreads = defaultMaxThreads;
00169 M_gracefulTermination = (defaultGracefulTermination != 0);
00170 }
00171
00172 void validate__()
00173 {
00174
00175
00176 if (M_maxThreads < M_minThreads)
00177 throw std::invalid_argument("upper thread boundry less than low thread boundry");
00178 if (M_maxThreads < 1)
00179 throw std::invalid_argument("upper thread boundry less than one");
00180 }
00181
00182 unsigned int M_minThreads;
00183 unsigned int M_maxThreads;
00184 bool M_gracefulTermination;
00185 };
00186
00188 BoundedThreadPool();
00189
00196 BoundedThreadPool(const OfType& exec);
00197
00204 BoundedThreadPool(const Configurator& config);
00205
00213 ~BoundedThreadPool();
00214
00222 void setUp(Queue* queuePtr);
00223
00229 void operator()(const Task& task);
00230
00231 private:
00232
00233
00234
00235
00236 struct ThreadHandler :
00237 boost::noncopyable
00238 {
00239 ThreadHandler(BoundedThreadPool& executor, const unsigned int threadId, boost::thread* threadPtr) :
00240 M_threadImplPtr(threadPtr),
00241 M_executor(executor),
00242 M_threadId(threadId)
00243 {
00244 ++executor.M_createdThreads;
00245 }
00246
00247 ~ThreadHandler()
00248 {
00249 M_threadImplPtr->join();
00250 }
00251
00252 boost::scoped_ptr<boost::thread> M_threadImplPtr;
00253 BoundedThreadPool& M_executor;
00254 const unsigned int M_threadId;
00255 };
00256 friend struct ThreadHandler;
00257
00258
00259
00260 class ExecutableTask
00261 {
00262 public:
00263 ExecutableTask(const unsigned int threadId, BoundedThreadPool& executor) :
00264 M_executor(executor),
00265 M_threadId(threadId)
00266 {}
00267
00268 void operator ()()
00269 {
00270 try
00271 {
00272
00273
00274
00275 for(bool firstIteration__(true);; firstIteration__ = false)
00276 {
00277 boost::scoped_ptr<Task> taskPtr__;
00278 {
00279
00280
00281 QueueScopedLock lock__(M_executor.MQueuePtrAccessToken);
00282
00283
00284
00285 if (!firstIteration__)
00286 --M_executor.M_workingThreads;
00287
00288
00289
00290
00291 while (!M_executor.M_stopped && M_executor.MQueuePtr->empty())
00292 {
00293
00294
00295
00296 if (M_executor.M_createdThreads > M_executor.M_configurator.minThreads())
00297 {
00298 threadCleanUp__();
00299 return;
00300 }
00301 M_executor.M_taskPushed.wait(lock__);
00302 }
00303
00304
00305
00306
00307
00308
00309 if (M_executor.M_stopped && (!M_executor.M_configurator.gracefulTermination() ||
00310 M_executor.MQueuePtr->empty()))
00311 {
00312 threadCleanUp__();
00313 return;
00314 }
00315
00316
00317
00318 taskPtr__.reset(new Task(M_executor.MQueuePtr->front()));
00319
00320
00321
00322 M_executor.MQueuePtr->pop();
00323
00324
00325
00326 M_executor.M_taskPoped.notify_one();
00327
00328
00329
00330 ++M_executor.M_workingThreads;
00331
00332
00333
00334
00335 if (!M_executor.MQueuePtr->empty())
00336 {
00337
00338
00339 M_executor.createThreadIfNeeded__();
00340 M_executor.M_taskPushed.notify_one();
00341 }
00342 }
00343 (*taskPtr__)();
00344 }
00345 }
00346 catch (...)
00347 {
00348
00349
00350 QueueScopedLock lock__(M_executor.MQueuePtrAccessToken);
00351 onException__();
00352 }
00353 }
00354 private:
00355
00356
00357 void threadCleanUp__()
00358 {
00359
00360
00361 M_executor.M_expiredThreads.push_back(M_threadId);
00362
00363
00364
00365
00366
00367
00368 --M_executor.M_createdThreads;
00369 }
00370
00371
00372
00373 void onException__()
00374 {
00375
00376
00377
00378 std::unexpected();
00379 }
00380
00381 BoundedThreadPool& M_executor;
00382 const unsigned int M_threadId;
00383 };
00384 friend class ExecutableTask;
00385
00386
00387
00388
00389 void createThread__();
00390
00391
00392
00393
00394 void createThreadIfNeeded__();
00395
00396 typedef boost::condition Condition;
00397 typedef std::vector<unsigned int> ThreadIdSeq;
00398 typedef std::map<unsigned int, boost::shared_ptr<ThreadHandler> > ThreadGroup;
00399 typedef boost::mutex QueueAccessToken;
00400 typedef boost::mutex::scoped_lock QueueScopedLock;
00401
00402
00403 Configurator M_configurator;
00404
00405 unsigned int M_workingThreads;
00406 unsigned int M_createdThreads;
00407 ThreadIdSeq M_expiredThreads;
00408 ThreadGroup M_threadGroup;
00409 unsigned int M_threadId;
00410
00411 QueueAccessToken MQueuePtrAccessToken;
00412 Condition M_taskPushed;
00413 Condition M_taskPoped;
00414 bool M_stopped;
00415 Queue* MQueuePtr;
00416 };
00417 }
00418
00419 using namespace Rha;
00420
00421 template<class Q>
00422 BoundedThreadPool<Q>::BoundedThreadPool()
00423 { }
00424
00425 template<class Q>
00426 BoundedThreadPool<Q>::BoundedThreadPool(const Configurator& config) :
00427 M_configurator(config)
00428 { }
00429
00430 template<class Q>
00431 BoundedThreadPool<Q>::BoundedThreadPool(const OfType& exec) :
00432 M_configurator(exec.M_configurator)
00433 { }
00434
00435 template<class Q>
00436 void
00437 BoundedThreadPool<Q>::setUp(Queue* queuePtr)
00438 {
00439
00440
00441 M_workingThreads = 0;
00442 M_createdThreads = 0;
00443 M_threadId = 0;
00444 M_stopped = false;
00445
00446
00447
00448 MQueuePtr = queuePtr;
00449
00450
00451
00452 for (unsigned int i = 0; i < M_configurator.minThreads(); ++i)
00453 createThread__();
00454 }
00455
00456 template<class Q>
00457 BoundedThreadPool<Q>::~BoundedThreadPool()
00458 {
00459 {
00460
00461
00462 QueueScopedLock lock__(MQueuePtrAccessToken);
00463
00464
00465
00466 M_stopped = true;
00467
00468
00469
00470
00471 M_taskPushed.notify_all();
00472
00473
00474
00475
00476 M_taskPoped.notify_all();
00477 }
00478
00479
00480
00481
00482 M_threadGroup.clear();
00483 }
00484
00485 template<class Q>
00486 void
00487 BoundedThreadPool<Q>::operator()(const Task& task)
00488 {
00489
00490
00491 QueueScopedLock lock__(MQueuePtrAccessToken);
00492
00493
00494
00495
00496 if (!M_expiredThreads.empty())
00497 {
00498
00499 for (ThreadIdSeq::const_iterator threadIdIt__ = M_expiredThreads.begin();
00500 threadIdIt__ != M_expiredThreads.end();
00501 ++threadIdIt__)
00502 {
00503 M_threadGroup.erase(*threadIdIt__);
00504 }
00505 }
00506
00507
00508
00509 createThreadIfNeeded__();
00510
00511
00512
00513 while (MQueuePtr->full())
00514 M_taskPoped.wait(lock__);
00515
00516
00517
00518 MQueuePtr->push(task);
00519
00520
00521
00522
00523 M_taskPushed.notify_one();
00524 }
00525
00526 template<class Q>
00527 void
00528 BoundedThreadPool<Q>::createThread__()
00529 {
00530 ++M_threadId;
00531 std::auto_ptr<boost::thread> threadPtr__(new boost::thread(ExecutableTask(M_threadId, *this)));
00532 boost::shared_ptr<ThreadHandler> tHandlerPtr__(new ThreadHandler(*this, M_threadId, threadPtr__.release()));
00533 M_threadGroup.insert(std::make_pair<unsigned int, boost::shared_ptr<ThreadHandler> >(M_threadId, tHandlerPtr__));
00534 }
00535
00536 template<class Q>
00537 void
00538 BoundedThreadPool<Q>::createThreadIfNeeded__()
00539 {
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554 if ((M_workingThreads >= M_configurator.minThreads() && M_createdThreads < M_configurator.maxThreads()) ||
00555 M_createdThreads < M_configurator.minThreads() ||
00556 M_configurator.minThreads() == M_createdThreads && M_configurator.minThreads() == 0)
00557 {
00558 createThread__();
00559 }
00560 }
00561
00562 #endif // RHA_BOUNDEDTHREADPOOL_HPP