00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "stdafx.h"
00037 #include "ThreadPool.h"
00038
00039 #include "ErrorHandlerMacros.h"
00040 #include "GenericSemaphore.h"
00041 #include "GenericCriticalSection.h"
00042 #include "GenericEvent.h"
00043 #include "WaitList.h"
00044 #include "OSManager.h"
00045
00046 #include <objbase.h>
00047
00048 #ifdef _MEMORY_DEBUG
00049 #define new DEBUG_NEW
00050 #define malloc DEBUG_MALLOC
00051 static char THIS_FILE[] = __FILE__;
00052 #endif
00053
00054 KOMODIA_NAMESPACE_START
00055
00056
00057
00058 CThreadPoolManager CThreadPoolManager::m_aInstance;
00059
00060 CThreadPoolManager::CThreadPoolManager()
00061 {
00062
00063 m_pCS=COSManager::CreateCriticalSection();
00064 }
00065
00066 CThreadPoolManager::~CThreadPoolManager()
00067 {
00068
00069 delete m_pCS;
00070 }
00071
00072 CThreadPool* CThreadPoolManager::GetPool(DWORD dwThreads)
00073 {
00074
00075 CCriticalAutoRelease aRelease(m_pCS);
00076
00077
00078 PoolMap::iterator aIterator;
00079 aIterator=m_aData.find(dwThreads);
00080
00081
00082 if (aIterator==m_aData.end())
00083 {
00084
00085 aRelease.Exit();
00086
00087
00088 return new CThreadPool(dwThreads);
00089 }
00090 else
00091 {
00092
00093 CThreadPool* pPool;
00094 pPool=aIterator->second;
00095
00096
00097 m_aData.erase(aIterator);
00098
00099
00100 aRelease.Exit();
00101
00102
00103 pPool->Clear();
00104
00105
00106 return pPool;
00107 }
00108 }
00109
00110 void CThreadPoolManager::ReleasePool(CThreadPool** ppPool)
00111 {
00112
00113 if (!ppPool ||
00114 !*ppPool)
00115 return;
00116
00117
00118 CThreadPool* pTmp;
00119 pTmp=*ppPool;
00120 *ppPool=NULL;
00121
00122
00123 ReleasePool(pTmp);
00124 }
00125
00126 void CThreadPoolManager::ReleasePool(CThreadPool* pPool)
00127 {
00128
00129 if (!pPool)
00130 return;
00131
00132
00133 pPool->Clear();
00134
00135
00136 DWORD dwTimeout;
00137 dwTimeout=pPool->GetTimeout();
00138 if (!dwTimeout)
00139 dwTimeout=10000;
00140
00141
00142 DWORD dwTick;
00143 dwTick=GetTickCount();
00144
00145
00146 while (1)
00147 {
00148
00149 if (!pPool->GetRunningThreads())
00150 break;
00151
00152
00153 if (GetTickCount()-dwTick>=dwTimeout)
00154 break;
00155
00156
00157 Sleep(50);
00158 }
00159
00160
00161 if (pPool->GetRunningThreads())
00162 {
00163
00164 pPool->SetTimeout(0);
00165
00166
00167 delete pPool;
00168
00169
00170 return;
00171 }
00172
00173
00174 CCriticalAutoRelease aRelease(m_pCS);
00175
00176
00177 m_aData.insert(PoolMap::value_type(pPool->GetMaxThreads(),pPool));
00178 }
00179
00180 CThreadPoolManager& CThreadPoolManager::GetInstance()
00181 {
00182 return m_aInstance;
00183 }
00184
00185
00186
00187
00188
00189 #define CThreadPool_Class "CThreadPool"
00190
00191
00192 #define THREAD_TIMEOUT 10000
00193
00194 CThreadPool::CThreadPool(unsigned long ulNumberOfThreads,
00195 unsigned long ulMaxJobsPending,
00196 LPInitializePoolProc pInitializeProc,
00197 CGenericThread::ThreadPriority aPriority,
00198 COMInit bInitializeCOM) : CErrorHandler(),
00199 m_ulThreadCount(ulNumberOfThreads),
00200 m_pSemaphore(NULL),
00201 m_ppThreads(NULL),
00202 m_pCSection(NULL),
00203 m_ulJobsCount(0),
00204 m_ppThreadData(NULL),
00205 m_iMaxJobsPending(ulMaxJobsPending),
00206 m_pCSectionCounter(NULL),
00207 m_iRunningThreads(0),
00208 m_bInitialized(FALSE),
00209 m_dwSleepInterval(1),
00210 m_bDrop(FALSE),
00211 m_iTotalJobsRan(0),
00212 m_bInitializeCOM(bInitializeCOM),
00213 m_dwTimeout(0)
00214 {
00215 try
00216 {
00217
00218 SetName(CThreadPool_Class);
00219
00220
00221 if (!ulNumberOfThreads)
00222 {
00223
00224 ReportError("CThreadPool","Received zero threads!");
00225
00226
00227 throw std::string("Received zero threads!");
00228 }
00229
00230
00231 m_pSemaphore=COSManager::CreateSemaphore(0,
00232 m_iMaxJobsPending);
00233
00234
00235 m_pCSection=COSManager::CreateCriticalSection();
00236 m_pCSectionCounter=COSManager::CreateCriticalSection();
00237
00238
00239 m_bInitialized=SpawnThreads(pInitializeProc,
00240 aPriority,
00241 m_bInitializeCOM);
00242 }
00243 ERROR_HANDLER_RETHROW("CThreadPool")
00244 }
00245
00246 CThreadPool::~CThreadPool()
00247 {
00248 try
00249 {
00250
00251 ClearThreads();
00252
00253
00254 delete m_pSemaphore;
00255
00256
00257 delete m_pCSection;
00258 delete m_pCSectionCounter;
00259 }
00260 ERROR_HANDLER("~CThreadPool")
00261 }
00262
00263 void CThreadPool::SetTimeout(DWORD dwTimeout)
00264 {
00265 m_dwTimeout=dwTimeout;
00266 }
00267
00268 DWORD CThreadPool::GetTimeout()const
00269 {
00270 return m_dwTimeout;
00271 }
00272
00273 BOOL CThreadPool::SpawnThreads(LPInitializePoolProc pInitializeProc,
00274 CGenericThread::ThreadPriority aPriority,
00275 COMInit bCOM)
00276 {
00277 try
00278 {
00279
00280 m_ppThreads=new CGenericThread*[m_ulThreadCount];
00281
00282
00283 m_ppThreadData=new ThreadData*[m_ulThreadCount];
00284
00285
00286 memset(m_ppThreads,
00287 0,
00288 sizeof(CGenericThread*)*m_ulThreadCount);
00289 memset(m_ppThreadData,
00290 0,
00291 sizeof(ThreadData*)*m_ulThreadCount);
00292
00293
00294 CWaitList aWaitingList;
00295
00296
00297 for (int iCount=0;
00298 iCount<m_ulThreadCount;
00299 ++iCount)
00300 {
00301 m_ppThreadData[iCount]=new ThreadData;
00302
00303
00304 m_ppThreadData[iCount]->pEvent=COSManager::CreateEvent();
00305 m_ppThreadData[iCount]->pExitEvent=COSManager::CreateEvent();
00306 m_ppThreadData[iCount]->bCOM=bCOM;
00307
00308
00309 aWaitingList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00310
00311
00312 m_ppThreadData[iCount]->pClass=this;
00313
00314
00315 m_ppThreadData[iCount]->pInitializeProc=pInitializeProc;
00316
00317
00318 m_ppThreads[iCount]=COSManager::CreateThread(PoolThread);
00319
00320
00321 m_ppThreads[iCount]->SetPriority(aPriority);
00322
00323
00324 m_ppThreads[iCount]->Start((LPVOID)m_ppThreadData[iCount]);
00325 }
00326
00327
00328 BOOL bError;
00329 bError=FALSE;
00330
00331
00332 int iTimeout;
00333
00334
00335 if (m_ulThreadCount>20)
00336 iTimeout=THREAD_TIMEOUT+(m_ulThreadCount-20)*100;
00337 else
00338 iTimeout=THREAD_TIMEOUT;
00339
00340
00341 DWORD dwTmp;
00342 if (aWaitingList.Wait(TRUE,
00343 dwTmp,
00344 THREAD_TIMEOUT))
00345 {
00346
00347 ReportError("SpawnThreads","Timeout waiting for threads!");
00348
00349
00350 bError=TRUE;
00351 }
00352
00353
00354 return !bError;
00355 }
00356 ERROR_HANDLER_RETURN("SpawnThreads",FALSE)
00357 }
00358
00359 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc,
00360 LPVOID lpData)
00361 {
00362 try
00363 {
00364 return SubmitJob(pJobProc,
00365 NULL,
00366 NULL,
00367 lpData);
00368 }
00369 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00370 }
00371
00372 BOOL CThreadPool::SubmitJob(LPThreadDWORDPoolProc pJobProc,
00373 LPVOID lpData)
00374 {
00375 try
00376 {
00377 return SubmitJob(NULL,
00378 pJobProc,
00379 NULL,
00380 lpData);
00381 }
00382 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00383 }
00384
00385 BOOL CThreadPool::SubmitJob(LPThreadPoolDataProc pJobProc,
00386 LPVOID lpData)
00387 {
00388 try
00389 {
00390 return SubmitJob(NULL,
00391 NULL,
00392 pJobProc,
00393 lpData);
00394 }
00395 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00396 }
00397
00398 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc,
00399 LPThreadDWORDPoolProc pDWORDJobProc,
00400 LPThreadPoolDataProc pDataProc,
00401 LPVOID lpData)
00402 {
00403 try
00404 {
00405
00406 BOOL bRelease;
00407 bRelease=FALSE;
00408
00409
00410 JobData aData;
00411 aData.pProc=pJobProc;
00412 aData.pDWORDProc=pDWORDJobProc;
00413 aData.pDataProc=pDataProc;
00414 aData.pParam=lpData;
00415
00416
00417 CCriticalAutoRelease aRelease(m_pCSection);
00418
00419
00420 if (m_ulJobsCount>=m_iMaxJobsPending &&
00421 m_bDrop)
00422
00423 return FALSE;
00424
00425
00426 m_aJobList.push_back(aData);
00427
00428
00429 ++m_ulJobsCount;
00430
00431
00432 aRelease.Exit();
00433
00434
00435 while (m_pSemaphore->Release()==-1)
00436 Sleep(m_dwSleepInterval);
00437
00438
00439 return TRUE;
00440 }
00441 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00442 }
00443
00444 DWORD CThreadPool::PoolThread(LPVOID pParam)
00445 {
00446 try
00447 {
00448
00449 ThreadData* pData;
00450 pData=(ThreadData*)pParam;
00451
00452
00453
00454
00455
00456 BOOL bError;
00457 bError=FALSE;
00458
00459
00460 LPVOID lpData;
00461 lpData=NULL;
00462
00463
00464 if (pData->pInitializeProc)
00465 {
00466 try
00467 {
00468 bError=!(*pData->pInitializeProc)(lpData,TRUE);
00469 }
00470 catch (...)
00471 {
00472 bError=TRUE;
00473 }
00474 }
00475
00476
00477 if (pData->bCOM!=ciNone)
00478 {
00479
00480 HRESULT hr;
00481 switch (pData->bCOM)
00482 {
00483 case ciSingle:
00484 hr=CoInitialize(NULL);
00485 break;
00486 case ciApartment:
00487 hr=CoInitializeEx(NULL,
00488 COINIT_APARTMENTTHREADED);
00489 case ciMulti:
00490 hr=CoInitializeEx(NULL,
00491 COINIT_MULTITHREADED);
00492 }
00493
00494 if (FAILED(hr))
00495 {
00496
00497 ReportStaticError(CThreadPool_Class,"PoolThread","Failed to initialize COM!",hr);
00498
00499
00500 bError=TRUE;
00501 }
00502 }
00503
00504
00505 if (pData->pEvent)
00506 if (!bError)
00507 pData->pEvent->Set();
00508 else
00509 {
00510
00511 ReportStaticError(CThreadPool_Class,"PoolThread","Initialize proc failed!");
00512
00513
00514 return FALSE;
00515 }
00516 else
00517 {
00518
00519 ReportStaticError(CThreadPool_Class,"PoolThread","Received null event!");
00520
00521
00522 return FALSE;
00523 }
00524
00525
00526 CWaitList aList;
00527 aList.AddObject(pData->pClass->GetSemaphore(),
00528 TRUE);
00529 aList.AddObject(pData->pExitEvent);
00530
00531
00532 while (1)
00533 {
00534 DWORD dwObject;
00535
00536
00537 aList.Wait(FALSE,
00538 dwObject);
00539
00540
00541 if (dwObject)
00542 break;
00543
00544
00545 JobData aJobData;
00546 pData->pClass->GetJob(aJobData);
00547
00548
00549 if (aJobData.pProc ||
00550 aJobData.pDWORDProc ||
00551 aJobData.pDataProc)
00552 {
00553
00554 pData->bInsideThread=true;
00555
00556
00557 pData->pClass->ReSetThreadCount(1);
00558
00559
00560 try
00561 {
00562
00563 if (aJobData.pProc)
00564
00565 (*aJobData.pProc)(aJobData.pParam);
00566 else if (aJobData.pDWORDProc)
00567
00568 (*aJobData.pDWORDProc)(aJobData.pParam);
00569 else if (aJobData.pDataProc)
00570 (*aJobData.pDataProc)(aJobData.pParam,lpData);
00571 }
00572 ERROR_HANDLER_STATIC(CThreadPool_Class,"PoolThread - Proc")
00573
00574
00575 pData->pClass->ReSetThreadCount(-1);
00576
00577
00578 pData->bInsideThread=false;
00579 }
00580 }
00581
00582 if (pData->bCOM)
00583
00584 CoUninitialize();
00585
00586
00587 if (pData->pInitializeProc)
00588 {
00589 try
00590 {
00591 bError=!(*pData->pInitializeProc)(lpData,FALSE);
00592 }
00593 catch (...)
00594 {
00595 bError=TRUE;
00596 }
00597 }
00598
00599
00600 pData->pEvent->Set();
00601
00602
00603 return TRUE;
00604 }
00605 ERROR_HANDLER_STATIC_RETURN(CThreadPool_Class,"PoolThread",FALSE)
00606 }
00607
00608 CGenericSemaphore* CThreadPool::GetSemaphore() const
00609 {
00610 return m_pSemaphore;
00611 }
00612
00613 void CThreadPool::GetJob(JobData& aData)
00614 {
00615 try
00616 {
00617
00618 CCriticalAutoRelease aRelease(m_pCSection);
00619
00620
00621 if (!m_ulJobsCount)
00622 {
00623
00624 aRelease.Exit();
00625
00626
00627 aData.pProc=NULL;
00628 aData.pDWORDProc=NULL;
00629 aData.pDataProc=NULL;
00630 aData.pParam=NULL;
00631
00632
00633 return;
00634 }
00635
00636
00637 aData=m_aJobList.front();
00638
00639
00640 m_aJobList.pop_front();
00641
00642
00643 --m_ulJobsCount;
00644 }
00645 ERROR_HANDLER("GetJob")
00646 }
00647
00648 void CThreadPool::FastStop()
00649 {
00650 try
00651 {
00652
00653 Clear();
00654
00655
00656 ClearThreads(true);
00657 }
00658 ERROR_HANDLER("FastStop")
00659 }
00660
00661 void CThreadPool::ClearThreads(bool bFast)
00662 {
00663 try
00664 {
00665
00666 if (m_bInitialized)
00667 {
00668
00669 int iTimeout;
00670
00671
00672 if (!bFast)
00673
00674 if (!m_dwTimeout)
00675 if (m_ulThreadCount>20)
00676 iTimeout=THREAD_TIMEOUT+(m_ulThreadCount-20)*100;
00677 else
00678 iTimeout=THREAD_TIMEOUT;
00679 else
00680 iTimeout=m_dwTimeout;
00681 else
00682 iTimeout=1000;
00683
00684
00685 CWaitList aList;
00686
00687
00688 for (int iCount=0;
00689 iCount<m_ulThreadCount;
00690 iCount++)
00691 {
00692
00693 aList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00694
00695
00696 m_ppThreadData[iCount]->pExitEvent->Set();
00697 }
00698
00699
00700 DWORD dwTmp;
00701 if (aList.Wait(TRUE,
00702 dwTmp,
00703 iTimeout))
00704
00705 ReportError("ClearThreads","Timeout waiting for threads!");
00706 else
00707
00708 for (int iRun=0;
00709 iRun<iTimeout/200;
00710 iRun++)
00711 {
00712 bool bNotFinished;
00713 bNotFinished=false;
00714
00715
00716 for (int iCount=0;
00717 iCount<m_ulThreadCount;
00718 iCount++)
00719 if (m_ppThreads[iCount]->GetThreadStatus()!=CGenericThread::tsStopped)
00720 {
00721
00722 bNotFinished=true;
00723
00724
00725 break;
00726 }
00727
00728
00729 if (!bNotFinished)
00730 break;
00731 else
00732 Sleep(100);
00733 }
00734 }
00735
00736 if (m_ppThreads &&
00737 m_ppThreadData)
00738 {
00739
00740 for (int iCount=0;
00741 iCount<m_ulThreadCount;
00742 iCount++)
00743 {
00744
00745 delete m_ppThreads[iCount];
00746
00747
00748 delete m_ppThreadData[iCount]->pEvent;
00749 delete m_ppThreadData[iCount]->pExitEvent;
00750
00751
00752 delete m_ppThreadData[iCount];
00753 }
00754
00755
00756 delete [] m_ppThreads;
00757 delete [] m_ppThreadData;
00758 }
00759
00760
00761 m_ppThreads=NULL;
00762 m_ppThreadData=NULL;
00763
00764
00765 m_bInitialized=false;
00766 }
00767 ERROR_HANDLER("ClearThreads")
00768 }
00769
00770 int CThreadPool::GetWaitingJobs()const
00771 {
00772 try
00773 {
00774
00775 int iJobs;
00776 iJobs=m_ulJobsCount;
00777
00778
00779 return iJobs;
00780 }
00781 ERROR_HANDLER_RETURN("GetWaitingJobs",0)
00782 }
00783
00784 void CThreadPool::ReSetThreadCount(int iIncrement)
00785 {
00786 try
00787 {
00788
00789 CCriticalAutoRelease aRelease(m_pCSectionCounter);
00790
00791
00792 m_iRunningThreads+=iIncrement;
00793
00794
00795 if (iIncrement<0)
00796 m_iTotalJobsRan-=iIncrement;
00797 }
00798 ERROR_HANDLER("ReSetThreadCount")
00799 }
00800
00801 BOOL CThreadPool::IsFinished()const
00802 {
00803 return !m_ulJobsCount &&
00804 !m_iRunningThreads;
00805 }
00806
00807 DWORD CThreadPool::GetMaxThreads()const
00808 {
00809 return m_ulThreadCount;
00810 }
00811
00812 BOOL CThreadPool::IsInitialized()const
00813 {
00814 return m_bInitialized;
00815 }
00816
00817 void CThreadPool::SetSleepInterval(DWORD dwSleepInterval)
00818 {
00819 m_dwSleepInterval=dwSleepInterval;
00820 }
00821
00822 void CThreadPool::SetExtraDataDrop(BOOL bDrop)
00823 {
00824 m_bDrop=bDrop;
00825 }
00826
00827 int CThreadPool::GetRunningThreads()const
00828 {
00829 return m_iRunningThreads;
00830 }
00831
00832 void CThreadPool::Clear()
00833 {
00834 try
00835 {
00836
00837 CCriticalAutoRelease aRelease(m_pCSection);
00838
00839
00840 if (!m_ulJobsCount)
00841 return;
00842
00843
00844 m_aJobList.clear();
00845 m_ulJobsCount=0;
00846 }
00847 ERROR_HANDLER("Clear")
00848 }
00849
00850 int CThreadPool::GetJobsRan()const
00851 {
00852 return m_iTotalJobsRan;
00853 }
00854
00855 void CThreadPool::SetPoolName(const std::string& rName)
00856 {
00857 m_sName=rName;
00858 }
00859
00860
00861
00862 KOMODIA_NAMESPACE_END