00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "stdafx.h"
00037 #include "ThreadPool.h"
00038
00039 #include "ErrorHandlerMacros.h"
00040 #include "GenericSemaphore.h"
00041 #include "GenericCriticalSection.h"
00042 #include "GenericEvent.h"
00043 #include "WaitList.h"
00044 #include "OSManager.h"
00045
00046 #ifdef _MEMORY_DEBUG
00047 #define new DEBUG_NEW
00048 #define malloc DEBUG_MALLOC
00049 static char THIS_FILE[] = __FILE__;
00050 #endif
00051
00052 KOMODIA_NAMESPACE_START
00053
00054 #define CThreadPool_Class "CThreadPool"
00055
00056
00057 #define THREAD_TIMEOUT 10000
00058
00059 CThreadPool::CThreadPool(unsigned long ulNumberOfThreads,
00060 unsigned long ulMaxJobsPending,
00061 LPInitializePoolProc pInitializeProc,
00062 CGenericThread::ThreadPriority aPriority) : CErrorHandler(),
00063 m_ulThreadCount(ulNumberOfThreads),
00064 m_pSemaphore(NULL),
00065 m_ppThreads(NULL),
00066 m_pCSection(NULL),
00067 m_ulJobsCount(0),
00068 m_ppThreadData(NULL),
00069 m_iMaxJobsPending(ulMaxJobsPending),
00070 m_pCSectionCounter(NULL),
00071 m_iRunningThreads(0),
00072 m_bInitialized(FALSE),
00073 m_dwSleepInterval(1),
00074 m_bDrop(FALSE)
00075 {
00076 try
00077 {
00078
00079 SetName(CThreadPool_Class);
00080
00081
00082 m_pSemaphore=COSManager::CreateSemaphore(0,m_iMaxJobsPending);
00083
00084
00085 m_pCSection=COSManager::CreateCriticalSection();
00086 m_pCSectionCounter=COSManager::CreateCriticalSection();
00087
00088
00089 m_bInitialized=SpawnThreads(pInitializeProc,
00090 aPriority);
00091 }
00092 ERROR_HANDLER("CThreadPool")
00093 }
00094
00095 CThreadPool::~CThreadPool()
00096 {
00097 try
00098 {
00099
00100 ClearThreads();
00101
00102
00103 delete m_pSemaphore;
00104
00105
00106 delete m_pCSection;
00107 delete m_pCSectionCounter;
00108 }
00109 ERROR_HANDLER("~CThreadPool")
00110 }
00111
00112 BOOL CThreadPool::SpawnThreads(LPInitializePoolProc pInitializeProc,
00113 CGenericThread::ThreadPriority aPriority)
00114 {
00115 try
00116 {
00117
00118 m_ppThreads=new CGenericThread*[m_ulThreadCount];
00119
00120
00121 m_ppThreadData=new ThreadData*[m_ulThreadCount];
00122
00123
00124 memset(m_ppThreads,
00125 0,
00126 sizeof(CGenericThread*)*m_ulThreadCount);
00127 memset(m_ppThreadData,
00128 0,
00129 sizeof(ThreadData*)*m_ulThreadCount);
00130
00131
00132 CWaitList aWaitingList;
00133
00134
00135 for (int iCount=0;iCount<m_ulThreadCount;++iCount)
00136 {
00137 m_ppThreadData[iCount]=new ThreadData;
00138
00139
00140 m_ppThreadData[iCount]->pEvent=COSManager::CreateEvent();
00141 m_ppThreadData[iCount]->pExitEvent=COSManager::CreateEvent();
00142
00143
00144 aWaitingList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00145
00146
00147 m_ppThreadData[iCount]->pClass=this;
00148
00149
00150 m_ppThreadData[iCount]->pInitializeProc=pInitializeProc;
00151
00152
00153 m_ppThreads[iCount]=COSManager::CreateThread(PoolThread);
00154
00155
00156 m_ppThreads[iCount]->SetPriority(aPriority);
00157
00158
00159 m_ppThreads[iCount]->Start((LPVOID)m_ppThreadData[iCount]);
00160 }
00161
00162
00163 BOOL bError;
00164 bError=FALSE;
00165
00166
00167 DWORD dwTmp;
00168 if (aWaitingList.Wait(TRUE,
00169 dwTmp,
00170 THREAD_TIMEOUT))
00171 {
00172
00173 ReportError("SpawnThreads","Timeout waiting for threads!");
00174
00175
00176 bError=TRUE;
00177 }
00178
00179
00180 return !bError;
00181 }
00182 ERROR_HANDLER_RETURN("SpawnThreads",FALSE)
00183 }
00184
00185 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc,
00186 LPVOID lpData)
00187 {
00188 try
00189 {
00190 return SubmitJob(pJobProc,
00191 NULL,
00192 NULL,
00193 lpData);
00194 }
00195 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00196 }
00197
00198 BOOL CThreadPool::SubmitJob(LPThreadDWORDPoolProc pJobProc,
00199 LPVOID lpData)
00200 {
00201 try
00202 {
00203 return SubmitJob(NULL,
00204 pJobProc,
00205 NULL,
00206 lpData);
00207 }
00208 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00209 }
00210
00211 BOOL CThreadPool::SubmitJob(LPThreadPoolDataProc pJobProc,
00212 LPVOID lpData)
00213 {
00214 try
00215 {
00216 return SubmitJob(NULL,
00217 NULL,
00218 pJobProc,
00219 lpData);
00220 }
00221 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00222 }
00223
00224 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc,
00225 LPThreadDWORDPoolProc pDWORDJobProc,
00226 LPThreadPoolDataProc pDataProc,
00227 LPVOID lpData)
00228 {
00229 try
00230 {
00231
00232 BOOL bRelease;
00233 bRelease=FALSE;
00234
00235
00236 JobData aData;
00237 aData.pProc=pJobProc;
00238 aData.pDWORDProc=pDWORDJobProc;
00239 aData.pDataProc=pDataProc;
00240 aData.pParam=lpData;
00241
00242
00243 CCriticalAutoRelease aRelease(m_pCSection);
00244
00245
00246 if (m_ulJobsCount>=m_iMaxJobsPending &&
00247 m_bDrop)
00248
00249 return FALSE;
00250
00251
00252 m_aJobList.push_back(aData);
00253
00254
00255 ++m_ulJobsCount;
00256
00257
00258 aRelease.Exit();
00259
00260
00261 while (m_pSemaphore->Release()==-1)
00262 Sleep(m_dwSleepInterval);
00263
00264
00265 return TRUE;
00266 }
00267 ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00268 }
00269
00270 DWORD CThreadPool::PoolThread(LPVOID pParam)
00271 {
00272 try
00273 {
00274
00275 ThreadData* pData;
00276 pData=(ThreadData*)pParam;
00277
00278
00279 srand(GetTickCount()+((unsigned long)pData->pEvent)*2);
00280
00281
00282 BOOL bError;
00283 bError=FALSE;
00284
00285
00286 LPVOID lpData;
00287 lpData=NULL;
00288
00289
00290 if (pData->pInitializeProc)
00291 {
00292 try
00293 {
00294 bError=!(*pData->pInitializeProc)(lpData,TRUE);
00295 }
00296 catch (...)
00297 {
00298 bError=TRUE;
00299 }
00300 }
00301
00302
00303 if (pData->pEvent)
00304 if (!bError)
00305 pData->pEvent->Set();
00306 else
00307 {
00308
00309 ReportStaticError(CThreadPool_Class,"PoolThread","Initialize proc failed!");
00310
00311
00312 return FALSE;
00313 }
00314 else
00315 {
00316
00317 ReportStaticError(CThreadPool_Class,"PoolThread","Received null event!");
00318
00319
00320 return FALSE;
00321 }
00322
00323
00324 CWaitList aList;
00325 aList.AddObject(pData->pClass->GetSemaphore(),TRUE);
00326 aList.AddObject(pData->pExitEvent);
00327
00328
00329 while (1)
00330 {
00331 DWORD dwObject;
00332
00333
00334 aList.Wait(FALSE,dwObject);
00335
00336
00337 if (dwObject)
00338 break;
00339
00340
00341 pData->pClass->ReSetThreadCount(1);
00342
00343
00344 JobData aJobData;
00345 pData->pClass->GetJob(aJobData);
00346
00347
00348 try
00349 {
00350
00351 if (aJobData.pProc)
00352
00353 (*aJobData.pProc)(aJobData.pParam);
00354 else if (aJobData.pDWORDProc)
00355
00356 (*aJobData.pDWORDProc)(aJobData.pParam);
00357 else if (aJobData.pDataProc)
00358 (*aJobData.pDataProc)(aJobData.pParam,lpData);
00359 }
00360 ERROR_HANDLER_STATIC(CThreadPool_Class,"PoolThread - Proc")
00361
00362
00363 pData->pClass->ReSetThreadCount(-1);
00364 }
00365
00366
00367 if (pData->pInitializeProc)
00368 {
00369 try
00370 {
00371 bError=!(*pData->pInitializeProc)(lpData,FALSE);
00372 }
00373 catch (...)
00374 {
00375 bError=TRUE;
00376 }
00377 }
00378
00379
00380 pData->pEvent->Set();
00381
00382
00383 return TRUE;
00384 }
00385 ERROR_HANDLER_STATIC_RETURN(CThreadPool_Class,"PoolThread",FALSE)
00386 }
00387
00388 CGenericSemaphore* CThreadPool::GetSemaphore() const
00389 {
00390 return m_pSemaphore;
00391 }
00392
00393 void CThreadPool::GetJob(JobData& aData)
00394 {
00395 try
00396 {
00397 CCriticalAutoRelease aRelease(m_pCSection);
00398
00399
00400 if (!m_ulJobsCount)
00401 {
00402
00403 aRelease.Exit();
00404
00405
00406 aData.pProc=NULL;
00407 aData.pDWORDProc=NULL;
00408 aData.pDataProc=NULL;
00409 aData.pParam=NULL;
00410
00411
00412 return;
00413 }
00414
00415
00416 aData=m_aJobList.front();
00417
00418
00419 m_aJobList.pop_front();
00420
00421
00422 --m_ulJobsCount;
00423 }
00424 ERROR_HANDLER("GetJob")
00425 }
00426
00427 void CThreadPool::ClearThreads()
00428 {
00429 try
00430 {
00431
00432 if (m_bInitialized)
00433 {
00434
00435 CWaitList aList;
00436
00437
00438 for (int iCount=0;iCount<m_ulThreadCount;iCount++)
00439 {
00440
00441 aList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00442
00443
00444 m_ppThreadData[iCount]->pExitEvent->Set();
00445 }
00446
00447
00448 DWORD dwTmp;
00449 if (aList.Wait(TRUE,dwTmp,THREAD_TIMEOUT))
00450
00451 ReportError("ClearThreads","Timeout waiting for threads!");
00452 }
00453
00454
00455 for (int iCount=0;iCount<m_ulThreadCount;iCount++)
00456 {
00457
00458 delete m_ppThreads[iCount];
00459
00460
00461 delete m_ppThreadData[iCount]->pEvent;
00462 delete m_ppThreadData[iCount]->pExitEvent;
00463
00464
00465 delete m_ppThreadData[iCount];
00466 }
00467
00468
00469 delete [] m_ppThreads;
00470 delete [] m_ppThreadData;
00471
00472
00473 m_ppThreads=NULL;
00474 m_ppThreadData=NULL;
00475 }
00476 ERROR_HANDLER("ClearThreads")
00477 }
00478
00479 int CThreadPool::GetWaitingJobs()const
00480 {
00481 try
00482 {
00483 CCriticalAutoRelease aRelease(m_pCSection);
00484
00485
00486 int iJobs;
00487 iJobs=m_ulJobsCount;
00488
00489
00490 return iJobs;
00491 }
00492 ERROR_HANDLER_RETURN("GetWaitingJobs",0)
00493 }
00494
00495 void CThreadPool::ReSetThreadCount(int iIncrement)
00496 {
00497 try
00498 {
00499
00500 CCriticalAutoRelease aRelease(m_pCSectionCounter);
00501
00502
00503 m_iRunningThreads+=iIncrement;
00504 }
00505 ERROR_HANDLER("ReSetThreadCount")
00506 }
00507
00508 BOOL CThreadPool::IsFinished()const
00509 {
00510 return m_ulJobsCount==0 && m_iRunningThreads==0;
00511 }
00512
00513 DWORD CThreadPool::GetMaxThreads()const
00514 {
00515 return m_ulThreadCount;
00516 }
00517
00518 BOOL CThreadPool::IsInitialized()const
00519 {
00520 return m_bInitialized;
00521 }
00522
00523 void CThreadPool::SetSleepInterval(DWORD dwSleepInterval)
00524 {
00525 m_dwSleepInterval=dwSleepInterval;
00526 }
00527
00528 void CThreadPool::SetExtraDataDrop(BOOL bDrop)
00529 {
00530 m_bDrop=bDrop;
00531 }
00532
00533 int CThreadPool::GetRunningThreads()const
00534 {
00535 return m_iRunningThreads;
00536 }
00537
00538 void CThreadPool::Clear()
00539 {
00540 try
00541 {
00542 CCriticalAutoRelease aRelease(m_pCSection);
00543
00544
00545 if (!m_ulJobsCount)
00546 return;
00547
00548
00549 m_aJobList.clear();
00550 m_ulJobsCount=0;
00551 }
00552 ERROR_HANDLER("Clear")
00553 }
00554
00555 KOMODIA_NAMESPACE_END