00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 #include "stdafx.h"
00037 #include "ThreadPool.h"
00038 
00039 #include "ErrorHandlerMacros.h"
00040 #include "GenericSemaphore.h"
00041 #include "GenericCriticalSection.h"
00042 #include "GenericEvent.h"
00043 #include "WaitList.h"
00044 #include "OSManager.h"
00045 
00046 #include <objbase.h>
00047 
00048 #ifdef _MEMORY_DEBUG 
00049     #define new    DEBUG_NEW  
00050     #define malloc DEBUG_MALLOC  
00051     static char THIS_FILE[] = __FILE__;  
00052 #endif
00053 
00054 KOMODIA_NAMESPACE_START
00055 
00056 #define CThreadPool_Class "CThreadPool"
00057 
00058 
00059 #define THREAD_TIMEOUT 10000
00060 
00061 CThreadPool::CThreadPool(unsigned long ulNumberOfThreads,
00062                          unsigned long ulMaxJobsPending,
00063                          LPInitializePoolProc pInitializeProc,
00064                          CGenericThread::ThreadPriority aPriority,
00065                          BOOL bInitializeCOM) : CErrorHandler(),
00066                                                 m_ulThreadCount(ulNumberOfThreads),
00067                                                 m_pSemaphore(NULL),
00068                                                 m_ppThreads(NULL),
00069                                                 m_pCSection(NULL),
00070                                                 m_ulJobsCount(0),
00071                                                 m_ppThreadData(NULL),
00072                                                 m_iMaxJobsPending(ulMaxJobsPending),
00073                                                 m_pCSectionCounter(NULL),
00074                                                 m_iRunningThreads(0),
00075                                                 m_bInitialized(FALSE),
00076                                                 m_dwSleepInterval(1),
00077                                                 m_bDrop(FALSE),
00078                                                 m_iTotalJobsRan(0),
00079                                                 m_bInitializeCOM(bInitializeCOM)
00080 {
00081     try
00082     {
00083         
00084         SetName(CThreadPool_Class);
00085 
00086         
00087         if (!ulNumberOfThreads)
00088         {
00089             
00090             ReportError("CThreadPool","Received zero threads!");
00091 
00092             
00093             throw std::string("Received zero threads!");
00094         }
00095 
00096         
00097         m_pSemaphore=COSManager::CreateSemaphore(0,
00098                                                  m_iMaxJobsPending);
00099 
00100         
00101         m_pCSection=COSManager::CreateCriticalSection();
00102         m_pCSectionCounter=COSManager::CreateCriticalSection();
00103 
00104         
00105         m_bInitialized=SpawnThreads(pInitializeProc,
00106                                     aPriority,
00107                                     m_bInitializeCOM);
00108     }
00109     ERROR_HANDLER_RETHROW("CThreadPool")
00110 }
00111 
00112 CThreadPool::~CThreadPool()
00113 {
00114     try
00115     {
00116         
00117         ClearThreads();
00118 
00119         
00120         delete m_pSemaphore;
00121         
00122         
00123         delete m_pCSection;
00124         delete m_pCSectionCounter;
00125     }
00126     ERROR_HANDLER("~CThreadPool")
00127 }
00128 
00129 BOOL CThreadPool::SpawnThreads(LPInitializePoolProc pInitializeProc,
00130                                CGenericThread::ThreadPriority aPriority,
00131                                BOOL bCOM)
00132 {
00133     try
00134     {
00135         
00136         m_ppThreads=new CGenericThread*[m_ulThreadCount];
00137 
00138         
00139         m_ppThreadData=new ThreadData*[m_ulThreadCount];
00140 
00141         
00142         memset(m_ppThreads,
00143                0,
00144                sizeof(CGenericThread*)*m_ulThreadCount);
00145         memset(m_ppThreadData,
00146                0,
00147                sizeof(ThreadData*)*m_ulThreadCount);
00148 
00149         
00150         CWaitList aWaitingList;
00151 
00152         
00153         for (int iCount=0;
00154              iCount<m_ulThreadCount;
00155              ++iCount)
00156         {
00157             m_ppThreadData[iCount]=new ThreadData;
00158 
00159             
00160             m_ppThreadData[iCount]->pEvent=COSManager::CreateEvent();
00161             m_ppThreadData[iCount]->pExitEvent=COSManager::CreateEvent();
00162             m_ppThreadData[iCount]->bCOM=bCOM;
00163 
00164             
00165             aWaitingList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00166 
00167             
00168             m_ppThreadData[iCount]->pClass=this;
00169 
00170             
00171             m_ppThreadData[iCount]->pInitializeProc=pInitializeProc;
00172 
00173             
00174             m_ppThreads[iCount]=COSManager::CreateThread(PoolThread);
00175 
00176             
00177             m_ppThreads[iCount]->SetPriority(aPriority);
00178 
00179             
00180             m_ppThreads[iCount]->Start((LPVOID)m_ppThreadData[iCount]);
00181         }
00182 
00183         
00184         BOOL bError;
00185         bError=FALSE;
00186 
00187         
00188         int iTimeout;
00189 
00190         
00191         if (m_ulThreadCount>20)
00192             iTimeout=THREAD_TIMEOUT+(m_ulThreadCount-20)*100;
00193         else
00194             iTimeout=THREAD_TIMEOUT;
00195 
00196         
00197         DWORD dwTmp;
00198         if (aWaitingList.Wait(TRUE,
00199                               dwTmp,
00200                               THREAD_TIMEOUT))
00201         {
00202             
00203             ReportError("SpawnThreads","Timeout waiting for threads!");
00204 
00205             
00206             bError=TRUE;
00207         }
00208 
00209         
00210         return !bError;
00211     }
00212     ERROR_HANDLER_RETURN("SpawnThreads",FALSE)
00213 }
00214 
00215 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc, 
00216                             LPVOID lpData)
00217 {
00218     try
00219     {
00220         return SubmitJob(pJobProc,
00221                          NULL,
00222                          NULL,
00223                          lpData);
00224     }
00225     ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00226 }
00227 
00228 BOOL CThreadPool::SubmitJob(LPThreadDWORDPoolProc pJobProc,
00229                             LPVOID lpData)
00230 {
00231     try
00232     {
00233         return SubmitJob(NULL,
00234                          pJobProc,
00235                          NULL,
00236                          lpData);
00237     }
00238     ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00239 }
00240 
00241 BOOL CThreadPool::SubmitJob(LPThreadPoolDataProc pJobProc,
00242                             LPVOID lpData)
00243 {
00244     try
00245     {
00246         return SubmitJob(NULL,
00247                          NULL,
00248                          pJobProc,
00249                          lpData);
00250     }
00251     ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00252 }
00253 
00254 BOOL CThreadPool::SubmitJob(LPThreadPoolProc pJobProc,
00255                             LPThreadDWORDPoolProc pDWORDJobProc,
00256                             LPThreadPoolDataProc pDataProc,
00257                             LPVOID lpData)
00258 {
00259     try
00260     {
00261         
00262         BOOL bRelease;
00263         bRelease=FALSE;
00264 
00265         
00266         JobData aData;
00267         aData.pProc=pJobProc;
00268         aData.pDWORDProc=pDWORDJobProc;
00269         aData.pDataProc=pDataProc;
00270         aData.pParam=lpData;
00271 
00272         
00273         CCriticalAutoRelease aRelease(m_pCSection);
00274 
00275         
00276         if (m_ulJobsCount>=m_iMaxJobsPending &&
00277             m_bDrop)
00278             
00279             return FALSE;
00280 
00281         
00282         m_aJobList.push_back(aData);
00283 
00284         
00285         ++m_ulJobsCount;
00286 
00287         
00288         aRelease.Exit();
00289 
00290         
00291         while (m_pSemaphore->Release()==-1)
00292             Sleep(m_dwSleepInterval);
00293 
00294         
00295         return TRUE;
00296     }
00297     ERROR_HANDLER_RETURN("SubmitJob",FALSE)
00298 }
00299 
00300 DWORD CThreadPool::PoolThread(LPVOID pParam)
00301 {
00302     try
00303     {
00304         
00305         ThreadData* pData;
00306         pData=(ThreadData*)pParam;
00307 
00308         
00309         srand(GetTickCount()+((unsigned long)pData->pEvent)*2);
00310 
00311         
00312         BOOL bError;
00313         bError=FALSE;
00314 
00315         
00316         LPVOID lpData;
00317         lpData=NULL;
00318 
00319         
00320         if (pData->pInitializeProc)
00321         {
00322             try
00323             {
00324                 bError=!(*pData->pInitializeProc)(lpData,TRUE);
00325             }
00326             catch (...)
00327             {
00328                 bError=TRUE;
00329             }
00330         }
00331 
00332         
00333         if (pData->bCOM)
00334         {
00335             
00336             HRESULT hr;
00337             if (FAILED(hr=CoInitialize(NULL)))
00338             {
00339                 
00340                 ReportStaticError(CThreadPool_Class,"PoolThread","Failed to initialize COM!",hr);
00341 
00342                 
00343                 bError=TRUE;
00344             }
00345         }
00346 
00347         
00348         if (pData->pEvent)
00349             if (!bError)
00350                 pData->pEvent->Set();
00351             else
00352             {
00353                 
00354                 ReportStaticError(CThreadPool_Class,"PoolThread","Initialize proc failed!");
00355 
00356                 
00357                 return FALSE;
00358             }
00359         else
00360         {
00361             
00362             ReportStaticError(CThreadPool_Class,"PoolThread","Received null event!");
00363 
00364             
00365             return FALSE;
00366         }
00367 
00368         
00369         CWaitList aList;
00370         aList.AddObject(pData->pClass->GetSemaphore(),
00371                         TRUE);
00372         aList.AddObject(pData->pExitEvent);
00373 
00374         
00375         while (1)
00376         {
00377             DWORD dwObject;
00378 
00379             
00380             aList.Wait(FALSE,
00381                        dwObject);
00382 
00383             
00384             if (dwObject)
00385                 break;
00386 
00387             
00388             pData->pClass->ReSetThreadCount(1);
00389 
00390             
00391             JobData aJobData;
00392             pData->pClass->GetJob(aJobData);
00393 
00394             
00395             try
00396             {
00397                 
00398                 if (aJobData.pProc)
00399                     
00400                     (*aJobData.pProc)(aJobData.pParam);
00401                 else if (aJobData.pDWORDProc)
00402                     
00403                     (*aJobData.pDWORDProc)(aJobData.pParam);
00404                 else if (aJobData.pDataProc)
00405                     (*aJobData.pDataProc)(aJobData.pParam,lpData);
00406             }
00407             ERROR_HANDLER_STATIC(CThreadPool_Class,"PoolThread - Proc")
00408 
00409             
00410             pData->pClass->ReSetThreadCount(-1);
00411         }
00412         
00413         if (pData->bCOM)
00414             
00415             CoUninitialize();
00416 
00417         
00418         if (pData->pInitializeProc)
00419         {
00420             try
00421             {
00422                 bError=!(*pData->pInitializeProc)(lpData,FALSE);
00423             }
00424             catch (...)
00425             {
00426                 bError=TRUE;
00427             }
00428         }
00429 
00430         
00431         pData->pEvent->Set();
00432 
00433         
00434         return TRUE;
00435     }
00436     ERROR_HANDLER_STATIC_RETURN(CThreadPool_Class,"PoolThread",FALSE)
00437 }
00438 
00439 CGenericSemaphore* CThreadPool::GetSemaphore() const
00440 {
00441     return m_pSemaphore;
00442 }
00443 
00444 void CThreadPool::GetJob(JobData& aData)
00445 {
00446     try
00447     {
00448         
00449         CCriticalAutoRelease aRelease(m_pCSection);
00450 
00451         
00452         if (!m_ulJobsCount)
00453         {
00454             
00455             aRelease.Exit();
00456 
00457             
00458             aData.pProc=NULL;
00459             aData.pDWORDProc=NULL;
00460             aData.pDataProc=NULL;
00461             aData.pParam=NULL;
00462 
00463             
00464             return;
00465         }
00466 
00467         
00468         aData=m_aJobList.front();
00469 
00470         
00471         m_aJobList.pop_front();
00472 
00473         
00474         --m_ulJobsCount;
00475     }
00476     ERROR_HANDLER("GetJob")
00477 }
00478 
00479 void CThreadPool::ClearThreads()
00480 {
00481     try
00482     {
00483         
00484         if (m_bInitialized)
00485         {
00486             
00487             int iTimeout;
00488 
00489             
00490             if (m_ulThreadCount>20)
00491                 iTimeout=THREAD_TIMEOUT+(m_ulThreadCount-20)*100;
00492             else
00493                 iTimeout=THREAD_TIMEOUT;
00494 
00495             
00496             CWaitList aList;
00497 
00498             
00499             for (int iCount=0;
00500                  iCount<m_ulThreadCount;
00501                  iCount++)
00502             {
00503                 
00504                 aList.AddObject(m_ppThreadData[iCount]->pEvent,TRUE);
00505 
00506                 
00507                 m_ppThreadData[iCount]->pExitEvent->Set();
00508             }
00509             
00510             
00511             DWORD dwTmp;
00512             if (aList.Wait(TRUE,
00513                            dwTmp,
00514                            iTimeout))
00515                 
00516                 ReportError("ClearThreads","Timeout waiting for threads!");
00517         }
00518 
00519         
00520         for (int iCount=0;
00521              iCount<m_ulThreadCount;
00522              iCount++)
00523         {
00524             
00525             delete m_ppThreads[iCount];
00526 
00527             
00528             delete m_ppThreadData[iCount]->pEvent;
00529             delete m_ppThreadData[iCount]->pExitEvent;
00530 
00531             
00532             delete m_ppThreadData[iCount];
00533         }
00534 
00535         
00536         delete [] m_ppThreads;
00537         delete [] m_ppThreadData;
00538 
00539         
00540         m_ppThreads=NULL;
00541         m_ppThreadData=NULL;
00542     }
00543     ERROR_HANDLER("ClearThreads")
00544 }
00545 
00546 int CThreadPool::GetWaitingJobs()const
00547 {
00548     try
00549     {
00550         
00551         CCriticalAutoRelease aRelease(m_pCSection);
00552 
00553         
00554         int iJobs;
00555         iJobs=m_ulJobsCount;
00556 
00557         
00558         return iJobs;
00559     }
00560     ERROR_HANDLER_RETURN("GetWaitingJobs",0)
00561 }
00562 
00563 void CThreadPool::ReSetThreadCount(int iIncrement)
00564 {
00565     try
00566     {
00567         
00568         CCriticalAutoRelease aRelease(m_pCSectionCounter);
00569 
00570         
00571         m_iRunningThreads+=iIncrement;
00572 
00573         
00574         if (iIncrement<0)
00575             m_iTotalJobsRan-=iIncrement;
00576     }
00577     ERROR_HANDLER("ReSetThreadCount")
00578 }
00579 
00580 BOOL CThreadPool::IsFinished()const
00581 {
00582     return !m_ulJobsCount && 
00583            !m_iRunningThreads;
00584 }
00585 
00586 DWORD CThreadPool::GetMaxThreads()const
00587 {
00588     return m_ulThreadCount;
00589 }
00590 
00591 BOOL CThreadPool::IsInitialized()const
00592 {
00593     return m_bInitialized;
00594 }
00595 
00596 void CThreadPool::SetSleepInterval(DWORD dwSleepInterval)
00597 {
00598     m_dwSleepInterval=dwSleepInterval;
00599 }
00600 
00601 void CThreadPool::SetExtraDataDrop(BOOL bDrop)
00602 {
00603     m_bDrop=bDrop;
00604 }
00605 
00606 int CThreadPool::GetRunningThreads()const
00607 {
00608     return m_iRunningThreads;
00609 }
00610 
00611 void CThreadPool::Clear()
00612 {
00613     try
00614     {
00615         
00616         CCriticalAutoRelease aRelease(m_pCSection);
00617 
00618         
00619         if (!m_ulJobsCount)
00620             return;
00621 
00622         
00623         m_aJobList.clear();
00624         m_ulJobsCount=0;
00625     }
00626     ERROR_HANDLER("Clear")
00627 }
00628 
00629 int CThreadPool::GetJobsRan()const
00630 {
00631     return m_iTotalJobsRan;
00632 }
00633 
00634 KOMODIA_NAMESPACE_END