00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "stdafx.h"
00037 #include "ThreadPool.h"
00038
00039 #include "ErrorHandlerMacros.h"
00040 #include "GenericSemaphore.h"
00041 #include "GenericCriticalSection.h"
00042 #include "GenericEvent.h"
00043 #include "WaitList.h"
00044 #include "OSManager.h"
00045
00046 #include <objbase.h>
00047
00048 #ifdef _MEMORY_DEBUG
00049 #define new DEBUG_NEW
00050 #define malloc DEBUG_MALLOC
00051 static char THIS_FILE[] = __FILE__;
00052 #endif
00053
00054 KOMODIA_NAMESPACE_START
00055
00056 #define CThreadPool_Class "CThreadPool"
00057
00058
00059 #define THREAD_TIMEOUT 10000
00060
00061 CThreadPool::CThreadPool(unsigned long ulNumberOfThreads,
00062 unsigned long ulMaxJobsPending,
00063 LPInitializePoolProc pInitializeProc,
00064 CGenericThread::ThreadPriority aPriority,
00065 BOOL bInitializeCOM) : CErrorHandler(),
00066 m_ulThreadCount(ulNumberOfThreads),
00067 m_pSemaphore(NULL),
00068 m_ppThreads(NULL),
00069 m_pCSection(NULL),
00070 m_ulJobsCount(0),
00071 m_ppThreadData(NULL),
00072 m_iMaxJobsPending(ulMaxJobsPending),
00073 m_pCSectionCounter(NULL),
00074 m_iRunningThreads(0),
00075 m_bInitialized(FALSE),
00076 m_dwSleepInterval(1),
00077 m_bDrop(FALSE),
00078 m_iTotalJobsRan(0),
00079 m_bInitializeCOM(bInitializeCOM)
00080 {
00081 try
00082 {
00083
00084 SetName(CThreadPool_Class);
00085
00086
00087 if (!ulNumberOfThreads)
00088 {
00089
00090 ReportError("CThreadPool","Received zero threads!");
00091
00092
00093 throw std::string("Received zero threads!");
00094 }
00095
00096
00097 m_pSemaphore=COSManager::CreateSemaphore(0,
00098 m_iMaxJobsPending);
00099
00100
00101 m_pCSection=COSManager::CreateCriticalSection();
00102 m_pCSectionCounter=COSManager::CreateCriticalSection();
00103
00104
00105 m_bInitialized=SpawnThreads(pInitializeProc,
00106 aPriority,
00107 m_bInitializeCOM);
00108 }
00109 ERROR_HANDLER_RETHROW("CThreadPool")
00110 }
00111
00112 CThreadPool::~CThreadPool()
00113 {
00114 try
00115 {
00116
00117 ClearThreads();
00118
00119
00120 delete m_pSemaphore;
00121
00122
00123 delete m_pCSection;
00124 delete m_pCSectionCounter;
00125 }
00126 ERROR_HANDLER("~CThreadPool")
00127 }
00128
00129 BOOL CThreadPool::SpawnThreads(LPInitializePoolProc pInitializeProc,
00130 CGenericThread::ThreadPriority aPriority,
00131 BOOL bCOM)
00132 {
00133 try
00134 {
00135
00136 m_ppThreads=new CGenericThread*[m_ulThreadCount];
00137
00138
00139 m_ppThreadData=new ThreadData*[m_ulThreadCount];
00140
00141
00142 memset(m_ppThreads,
00143 0,
00144 sizeof(CGenericThread*)*m_ulThreadCount);
00145 memset(m_ppThreadData,
00146 0,
00147 sizeof(ThreadData*)*m_ulThreadCount);
00148
00149
00150 CWaitList aWaitingList;
00151
00152
00153 for (int iCount=0;
00154 iCount<m_ulThreadCount;
00155 ++iCount)
00156 {
00157 m_ppThreadData[iCount]=new ThreadData;
00158
00159
00160 m_ppThreadData[iCount]->pEvent=COSManager::CreateEvent();
00161 m_ppThreadData[iCount]->pExitEvent=COSManager::CreateEvent();
00162 m_ppThreadData[iCount]->bCOM=bCOM;
00163
00164
00165 aWaitingList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00166
00167
00168 m_ppThreadData[iCount]->pClass=this;
00169
00170
00171 m_ppThreadData[iCount]->pInitializeProc=pInitializeProc;
00172
00173
00174 m_ppThreads[iCount]=COSManager::CreateThread(PoolThread);
00175
00176
00177 m_ppThreads[iCount]->SetPriority(aPriority);
00178
00179
00180 m_ppThreads[iCount]->Start((LPVOID)m_ppThreadData[iCount]);
00181 }
00182
00183
00184 BOOL bError;
00185 bError=FALSE;
00186
00187
00188 int iTimeout;
00189
00190
00191 if (m_ulThreadCount>20)
00192 iTimeout=THREAD_TIMEOUT+(m_ulThreadCount-20)*100;
00193 else
00194 iTimeout=THREAD_TIMEOUT;
00195
00196
00197 DWORD dwTmp;
00198 if (aWaitingList.Wait(TRUE,
00199 dwTmp,
00200 THREAD_TIMEOUT))
00201 {
00202
00203 ReportError("SpawnThreads","Timeout waiting for threads!");
00204
00205
00206 bError=TRUE;
00207 }
00208
00209
00210 return !bError;
00211 }
00212 ERROR_HANDLER_RETURN("SpawnThreads",FALSE)
00213 }
00214
00215 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc,
00216 LPVOID lpData)
00217 {
00218 try
00219 {
00220 return SubmitJob(pJobProc,
00221 NULL,
00222 NULL,
00223 lpData);
00224 }
00225 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00226 }
00227
00228 BOOL CThreadPool::SubmitJob(LPThreadDWORDPoolProc pJobProc,
00229 LPVOID lpData)
00230 {
00231 try
00232 {
00233 return SubmitJob(NULL,
00234 pJobProc,
00235 NULL,
00236 lpData);
00237 }
00238 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00239 }
00240
00241 BOOL CThreadPool::SubmitJob(LPThreadPoolDataProc pJobProc,
00242 LPVOID lpData)
00243 {
00244 try
00245 {
00246 return SubmitJob(NULL,
00247 NULL,
00248 pJobProc,
00249 lpData);
00250 }
00251 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00252 }
00253
00254 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc,
00255 LPThreadDWORDPoolProc pDWORDJobProc,
00256 LPThreadPoolDataProc pDataProc,
00257 LPVOID lpData)
00258 {
00259 try
00260 {
00261
00262 BOOL bRelease;
00263 bRelease=FALSE;
00264
00265
00266 JobData aData;
00267 aData.pProc=pJobProc;
00268 aData.pDWORDProc=pDWORDJobProc;
00269 aData.pDataProc=pDataProc;
00270 aData.pParam=lpData;
00271
00272
00273 CCriticalAutoRelease aRelease(m_pCSection);
00274
00275
00276 if (m_ulJobsCount>=m_iMaxJobsPending &&
00277 m_bDrop)
00278
00279 return FALSE;
00280
00281
00282 m_aJobList.push_back(aData);
00283
00284
00285 ++m_ulJobsCount;
00286
00287
00288 aRelease.Exit();
00289
00290
00291 while (m_pSemaphore->Release()==-1)
00292 Sleep(m_dwSleepInterval);
00293
00294
00295 return TRUE;
00296 }
00297 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00298 }
00299
00300 DWORD CThreadPool::PoolThread(LPVOID pParam)
00301 {
00302 try
00303 {
00304
00305 ThreadData* pData;
00306 pData=(ThreadData*)pParam;
00307
00308
00309 srand(GetTickCount()+((unsigned long)pData->pEvent)*2);
00310
00311
00312 BOOL bError;
00313 bError=FALSE;
00314
00315
00316 LPVOID lpData;
00317 lpData=NULL;
00318
00319
00320 if (pData->pInitializeProc)
00321 {
00322 try
00323 {
00324 bError=!(*pData->pInitializeProc)(lpData,TRUE);
00325 }
00326 catch (...)
00327 {
00328 bError=TRUE;
00329 }
00330 }
00331
00332
00333 if (pData->bCOM)
00334 {
00335
00336 HRESULT hr;
00337 if (FAILED(hr=CoInitialize(NULL)))
00338 {
00339
00340 ReportStaticError(CThreadPool_Class,"PoolThread","Failed to initialize COM!",hr);
00341
00342
00343 bError=TRUE;
00344 }
00345 }
00346
00347
00348 if (pData->pEvent)
00349 if (!bError)
00350 pData->pEvent->Set();
00351 else
00352 {
00353
00354 ReportStaticError(CThreadPool_Class,"PoolThread","Initialize proc failed!");
00355
00356
00357 return FALSE;
00358 }
00359 else
00360 {
00361
00362 ReportStaticError(CThreadPool_Class,"PoolThread","Received null event!");
00363
00364
00365 return FALSE;
00366 }
00367
00368
00369 CWaitList aList;
00370 aList.AddObject(pData->pClass->GetSemaphore(),
00371 TRUE);
00372 aList.AddObject(pData->pExitEvent);
00373
00374
00375 while (1)
00376 {
00377 DWORD dwObject;
00378
00379
00380 aList.Wait(FALSE,
00381 dwObject);
00382
00383
00384 if (dwObject)
00385 break;
00386
00387
00388 pData->pClass->ReSetThreadCount(1);
00389
00390
00391 JobData aJobData;
00392 pData->pClass->GetJob(aJobData);
00393
00394
00395 try
00396 {
00397
00398 if (aJobData.pProc)
00399
00400 (*aJobData.pProc)(aJobData.pParam);
00401 else if (aJobData.pDWORDProc)
00402
00403 (*aJobData.pDWORDProc)(aJobData.pParam);
00404 else if (aJobData.pDataProc)
00405 (*aJobData.pDataProc)(aJobData.pParam,lpData);
00406 }
00407 ERROR_HANDLER_STATIC(CThreadPool_Class,"PoolThread - Proc")
00408
00409
00410 pData->pClass->ReSetThreadCount(-1);
00411 }
00412
00413 if (pData->bCOM)
00414
00415 CoUninitialize();
00416
00417
00418 if (pData->pInitializeProc)
00419 {
00420 try
00421 {
00422 bError=!(*pData->pInitializeProc)(lpData,FALSE);
00423 }
00424 catch (...)
00425 {
00426 bError=TRUE;
00427 }
00428 }
00429
00430
00431 pData->pEvent->Set();
00432
00433
00434 return TRUE;
00435 }
00436 ERROR_HANDLER_STATIC_RETURN(CThreadPool_Class,"PoolThread",FALSE)
00437 }
00438
00439 CGenericSemaphore* CThreadPool::GetSemaphore() const
00440 {
00441 return m_pSemaphore;
00442 }
00443
00444 void CThreadPool::GetJob(JobData& aData)
00445 {
00446 try
00447 {
00448
00449 CCriticalAutoRelease aRelease(m_pCSection);
00450
00451
00452 if (!m_ulJobsCount)
00453 {
00454
00455 aRelease.Exit();
00456
00457
00458 aData.pProc=NULL;
00459 aData.pDWORDProc=NULL;
00460 aData.pDataProc=NULL;
00461 aData.pParam=NULL;
00462
00463
00464 return;
00465 }
00466
00467
00468 aData=m_aJobList.front();
00469
00470
00471 m_aJobList.pop_front();
00472
00473
00474 --m_ulJobsCount;
00475 }
00476 ERROR_HANDLER("GetJob")
00477 }
00478
00479 void CThreadPool::ClearThreads()
00480 {
00481 try
00482 {
00483
00484 if (m_bInitialized)
00485 {
00486
00487 int iTimeout;
00488
00489
00490 if (m_ulThreadCount>20)
00491 iTimeout=THREAD_TIMEOUT+(m_ulThreadCount-20)*100;
00492 else
00493 iTimeout=THREAD_TIMEOUT;
00494
00495
00496 CWaitList aList;
00497
00498
00499 for (int iCount=0;
00500 iCount<m_ulThreadCount;
00501 iCount++)
00502 {
00503
00504 aList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00505
00506
00507 m_ppThreadData[iCount]->pExitEvent->Set();
00508 }
00509
00510
00511 DWORD dwTmp;
00512 if (aList.Wait(TRUE,
00513 dwTmp,
00514 iTimeout))
00515
00516 ReportError("ClearThreads","Timeout waiting for threads!");
00517 }
00518
00519
00520 for (int iCount=0;
00521 iCount<m_ulThreadCount;
00522 iCount++)
00523 {
00524
00525 delete m_ppThreads[iCount];
00526
00527
00528 delete m_ppThreadData[iCount]->pEvent;
00529 delete m_ppThreadData[iCount]->pExitEvent;
00530
00531
00532 delete m_ppThreadData[iCount];
00533 }
00534
00535
00536 delete [] m_ppThreads;
00537 delete [] m_ppThreadData;
00538
00539
00540 m_ppThreads=NULL;
00541 m_ppThreadData=NULL;
00542 }
00543 ERROR_HANDLER("ClearThreads")
00544 }
00545
00546 int CThreadPool::GetWaitingJobs()const
00547 {
00548 try
00549 {
00550
00551 CCriticalAutoRelease aRelease(m_pCSection);
00552
00553
00554 int iJobs;
00555 iJobs=m_ulJobsCount;
00556
00557
00558 return iJobs;
00559 }
00560 ERROR_HANDLER_RETURN("GetWaitingJobs",0)
00561 }
00562
00563 void CThreadPool::ReSetThreadCount(int iIncrement)
00564 {
00565 try
00566 {
00567
00568 CCriticalAutoRelease aRelease(m_pCSectionCounter);
00569
00570
00571 m_iRunningThreads+=iIncrement;
00572
00573
00574 if (iIncrement<0)
00575 m_iTotalJobsRan-=iIncrement;
00576 }
00577 ERROR_HANDLER("ReSetThreadCount")
00578 }
00579
00580 BOOL CThreadPool::IsFinished()const
00581 {
00582 return !m_ulJobsCount &&
00583 !m_iRunningThreads;
00584 }
00585
00586 DWORD CThreadPool::GetMaxThreads()const
00587 {
00588 return m_ulThreadCount;
00589 }
00590
00591 BOOL CThreadPool::IsInitialized()const
00592 {
00593 return m_bInitialized;
00594 }
00595
00596 void CThreadPool::SetSleepInterval(DWORD dwSleepInterval)
00597 {
00598 m_dwSleepInterval=dwSleepInterval;
00599 }
00600
00601 void CThreadPool::SetExtraDataDrop(BOOL bDrop)
00602 {
00603 m_bDrop=bDrop;
00604 }
00605
00606 int CThreadPool::GetRunningThreads()const
00607 {
00608 return m_iRunningThreads;
00609 }
00610
00611 void CThreadPool::Clear()
00612 {
00613 try
00614 {
00615
00616 CCriticalAutoRelease aRelease(m_pCSection);
00617
00618
00619 if (!m_ulJobsCount)
00620 return;
00621
00622
00623 m_aJobList.clear();
00624 m_ulJobsCount=0;
00625 }
00626 ERROR_HANDLER("Clear")
00627 }
00628
00629 int CThreadPool::GetJobsRan()const
00630 {
00631 return m_iTotalJobsRan;
00632 }
00633
00634 KOMODIA_NAMESPACE_END