00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "stdafx.h"
00037 #include "SocketThreadManager.h"
00038
00039 #include "ErrorHandlerMacros.h"
00040 #include "OSManager.h"
00041
00042 #include "GenericCriticalSection.h"
00043 #include "GenericThread.h"
00044 #include "GenericEvent.h"
00045 #include "WaitList.h"
00046
00047 #ifdef _MEMORY_DEBUG
00048 #define new DEBUG_NEW
00049 #define malloc DEBUG_MALLOC
00050 static char THIS_FILE[] = __FILE__;
00051 #endif
00052
00053 KOMODIA_NAMESPACE_START
00054
// Name this class reports errors under (see SetName) and the prefix of the
// window class names generated in SpawnThreads
#define CSocketThreadManager_Class "CSocketThreadManager"


// Timeout handed to CWaitList::Wait when waiting on the worker threads' events
// (presumably milliseconds -- TODO confirm against CWaitList::Wait)
#define THREADS_TIMEOUT 20000


// Shared by all instances: TRUE once RegisterClass() registered a window class
BOOL CSocketThreadManager::m_bRegisteredWindow=FALSE;


// The static manager instance, built through the BOOL constructor
CSocketThreadManager CSocketThreadManager::m_sManager(TRUE);
00065
00066 CSocketThreadManager::CSocketThreadManager(HINSTANCE hInstance) : CErrorHandler(),
00067 m_ulThreadCount(0),
00068 m_hInstance(hInstance),
00069 m_pThreadData(NULL),
00070 m_pCSection(NULL),
00071 m_bStaticClass(FALSE),
00072 m_bInitialized(FALSE)
00073 {
00074 try
00075 {
00076
00077 SetName(CSocketThreadManager_Class);
00078
00079
00080 m_pCSection=COSManager::CreateCriticalSection();
00081 }
00082 ERROR_HANDLER("CSocketThreadManager")
00083 }
00084
00085 CSocketThreadManager::CSocketThreadManager(BOOL bStatic) : CErrorHandler(),
00086 m_bStaticClass(bStatic)
00087 {
00088 try
00089 {
00090
00091 SetName(CSocketThreadManager_Class);
00092 }
00093 ERROR_HANDLER("CSocketThreadManager")
00094 }
00095
00096 CSocketThreadManager::~CSocketThreadManager()
00097 {
00098 try
00099 {
00100
00101 if (m_bInitialized)
00102 Uninitialize();
00103
00104
00105 delete m_pCSection;
00106 }
00107 ERROR_HANDLER("~CSocketThreadManager")
00108 }
00109
00110 BOOL CSocketThreadManager::Initialize(unsigned long ulThreadCount,
00111 LPCSTR lpClassName)
00112 {
00113 try
00114 {
00115
00116 if (m_bInitialized)
00117 {
00118
00119 ReportError("Initialize","Already initialized!");
00120
00121
00122 return FALSE;
00123 }
00124
00125
00126 if (ulThreadCount>MAX_THREADS)
00127 {
00128
00129 ReportError("Initialize","Too many threads to spawn!");
00130
00131
00132 return FALSE;
00133 }
00134
00135
00136 if (!ulThreadCount)
00137 {
00138
00139 ReportError("Initialize","Can't have zero threads!");
00140
00141
00142 return FALSE;
00143 }
00144
00145
00146 m_ulThreadCount=ulThreadCount;
00147
00148
00149 return SpawnThreads(lpClassName);
00150 }
00151 ERROR_HANDLER_RETURN("Initialize",FALSE)
00152 }
00153
00154 BOOL CSocketThreadManager::Uninitialize()
00155 {
00156 try
00157 {
00158
00159 if (!m_bInitialized)
00160 {
00161
00162 ReportError("Initialize","Not initialized!");
00163
00164
00165 return FALSE;
00166 }
00167
00168
00169 if (m_bStaticClass)
00170
00171 UnregisterClass();
00172 else
00173
00174 CleanThreads(FALSE);
00175
00176
00177 return TRUE;
00178 }
00179 ERROR_HANDLER_RETURN("Uninitialize",FALSE)
00180 }
00181
00182 BOOL CSocketThreadManager::SpawnThreads(LPCSTR lpClassName)
00183 {
00184 try
00185 {
00186
00187 if (!lpClassName)
00188 {
00189
00190 int iStringLength;
00191 iStringLength=strlen(CSocketThreadManager_Class);
00192
00193
00194 iStringLength+=50;
00195
00196
00197 std::auto_ptr<char> pString(new char[iStringLength]);
00198
00199
00200 DWORD dwThreadID;
00201 dwThreadID=GetCurrentThreadId();
00202
00203
00204 DWORD dwProcessID;
00205 dwProcessID=GetTickCount();
00206
00207
00208 sprintf(pString.get(),"%s_%lu_%lu",CSocketThreadManager_Class,
00209 GetCurrentThreadId(),
00210 GetTickCount());
00211
00212
00213
00214 if (!m_bRegisteredWindow)
00215 if (!RegisterClass(pString.get()))
00216 {
00217
00218 ReportError("SpawnThreads","Failed to register window!");
00219
00220
00221 return FALSE;
00222 }
00223
00224
00225 m_sClassName=pString.get();
00226 }
00227 else
00228 m_sClassName=lpClassName;
00229
00230
00231
00232 m_pThreadData=new ThreadData[m_ulThreadCount];
00233
00234
00235 for (int iCount=0;iCount<m_ulThreadCount;++iCount)
00236 {
00237 m_pThreadData[iCount].bFreeze=FALSE;
00238 m_pThreadData[iCount].pEvent=NULL;
00239 m_pThreadData[iCount].hInstance=NULL;
00240 m_pThreadData[iCount].hWindowHandle=NULL;
00241 m_pThreadData[iCount].iSocketCount=0;
00242 m_pThreadData[iCount].iTimeoutCount=0;
00243 m_pThreadData[iCount].pThread=NULL;
00244 m_pThreadData[iCount].pCSection=COSManager::CreateCriticalSection();
00245 }
00246
00247
00248 CWaitList aList;
00249
00250
00251 for (int iCounter=0;iCounter<m_ulThreadCount;++iCounter)
00252 {
00253
00254 m_pThreadData[iCounter].sClassName=m_sClassName;
00255
00256
00257 m_pThreadData[iCounter].pEvent=COSManager::CreateEvent();
00258
00259
00260 aList.AddObject(m_pThreadData[iCounter].pEvent,TRUE);
00261
00262
00263 m_pThreadData[iCounter].hInstance=m_hInstance;
00264
00265
00266 m_pThreadData[iCounter].pThread=COSManager::CreateThread(SocketThread);
00267
00268
00269 if (!m_pThreadData[iCounter].pThread->GetThreadID())
00270 {
00271
00272 ReportError("SpawnThreads","Failed to create thread!");
00273
00274
00275 CleanThreads(TRUE);
00276
00277
00278 return FALSE;
00279 }
00280 else
00281
00282 m_pThreadData[iCounter].pThread->Start((LPVOID)&m_pThreadData[iCounter]);
00283 }
00284
00285
00286 DWORD dwTmp;
00287
00288 if (aList.Wait(TRUE,
00289 dwTmp,
00290 THREADS_TIMEOUT))
00291 {
00292
00293 ReportError("SpawnThreads","Timeout waiting for threads!");
00294
00295
00296 CleanThreads(TRUE);
00297
00298
00299 return FALSE;
00300 }
00301 else
00302 {
00303
00304 m_bInitialized=TRUE;
00305
00306
00307 return TRUE;
00308 }
00309 }
00310 ERROR_HANDLER_RETURN("SpawnThreads",FALSE)
00311 }
00312
00313 DWORD CSocketThreadManager::SocketThread(LPVOID lpParameter)
00314 {
00315 try
00316 {
00317
00318 ThreadData* pData;
00319 pData=(ThreadData*)lpParameter;
00320
00321
00322 srand(GetTickCount()+(unsigned long)pData->pEvent+(unsigned long)pData->hWindowHandle);
00323
00324
00325 pData->hWindowHandle=CreateWindowEx(0,
00326 pData->sClassName.c_str(),
00327 SOCKET_WINDOW_NAME,
00328 WS_OVERLAPPED,
00329 0,
00330 0,
00331 0,
00332 0,
00333 0,
00334 NULL,
00335 pData->hInstance,
00336 NULL);
00337
00338
00339 pData->pEvent->Set();
00340
00341
00342 if (pData->hWindowHandle)
00343 {
00344
00345 MSG msg;
00346
00347 while (GetMessage(&msg,NULL,0,0))
00348 {
00349
00350 TranslateMessage(&msg);
00351
00352
00353 if (msg.message==WM_TIMER)
00354 {
00355
00356 CCriticalAutoRelease aRelease(pData->pCSection);
00357
00358
00359 TOMap::iterator aTheIterator;
00360 aTheIterator=pData->pMap.find(msg.wParam);
00361
00362
00363 if (aTheIterator!=pData->pMap.end())
00364 {
00365
00366 TimeoutData aData;
00367 aData=aTheIterator->second;
00368
00369
00370 if (aData.bClearTimeout)
00371 {
00372
00373 KillTimer(pData->hWindowHandle,msg.wParam);
00374
00375
00376 if (aData.pTimer)
00377 memset(aData.pTimer,0,sizeof(TimerID));
00378
00379
00380 pData->pMap.erase(aTheIterator);
00381
00382
00383 --pData->iTimeoutCount;
00384 }
00385
00386
00387 aRelease.Exit();
00388
00389
00390 try
00391 {
00392
00393 (*(aData.pTimeoutProc))(aData.pData);
00394 }
00395 ERROR_HANDLER_STATIC(CSocketThreadManager_Class,"SocketThread - Proc")
00396 }
00397 else
00398 {
00399
00400 aRelease.Exit();
00401
00402
00403 DispatchMessage(&msg);
00404 }
00405 }
00406 else
00407
00408 DispatchMessage(&msg);
00409 }
00410 }
00411 else
00412
00413 ReportStaticError(CSocketThreadManager_Class,"SocketThread","Failed to create window!");
00414
00415
00416 pData->pEvent->Set();
00417
00418
00419 return FALSE;
00420 }
00421 ERROR_HANDLER_STATIC_RETURN(CSocketThreadManager_Class,"SocketThread",TRUE)
00422 }
00423
00424 int CSocketThreadManager::GetMostAvailableThread()const
00425 {
00426 try
00427 {
00428
00429 int iStartPosition;
00430 iStartPosition=1;
00431
00432 int iIndex;
00433 iIndex=0;
00434
00435
00436 CCriticalAutoRelease aRelease(m_pCSection);
00437
00438
00439 while (m_pThreadData[iIndex].bFreeze &&
00440 iIndex<m_ulThreadCount)
00441 {
00442
00443 m_pThreadData[iIndex].bFreeze=FALSE;
00444
00445 ++iIndex;
00446 ++iStartPosition;
00447 }
00448
00449
00450 if (iIndex==m_ulThreadCount)
00451 return GetMostAvailableThread();
00452
00453
00454 for (int iCounter=iStartPosition;iCounter<m_ulThreadCount;++iCounter)
00455
00456 if (m_pThreadData[iCounter].iSocketCount+m_pThreadData[iCounter].iTimeoutCount<
00457 m_pThreadData[iIndex].iSocketCount+m_pThreadData[iIndex].iTimeoutCount &&
00458 m_pThreadData[iCounter].pThread->GetThreadID())
00459
00460 if (m_pThreadData[iCounter].bFreeze)
00461
00462 m_pThreadData[iCounter].bFreeze=FALSE;
00463 else
00464
00465 iIndex=iCounter;
00466
00467
00468 return iIndex+1;
00469 }
00470 ERROR_HANDLER_RETURN("GetMostAvailableThread",0)
00471 }
00472
00473 HWND CSocketThreadManager::GetWindowHandle(BOOL bTimeout)
00474 {
00475 try
00476 {
00477
00478 CCriticalAutoRelease aRelease(m_pCSection);
00479
00480
00481 int iIndex;
00482 iIndex=GetMostAvailableThread();
00483
00484
00485 if (!iIndex)
00486
00487 return 0;
00488
00489
00490 if (bTimeout)
00491 ++m_pThreadData[iIndex-1].iTimeoutCount;
00492 else
00493
00494 ++m_pThreadData[iIndex-1].iSocketCount;
00495
00496
00497 return m_pThreadData[iIndex-1].hWindowHandle;
00498 }
00499 ERROR_HANDLER_RETURN("GetWindowHandle",0)
00500 }
00501
00502 void CSocketThreadManager::DecreaseSocketCount(HWND hWindowHandle,
00503 BOOL bFreeze)
00504 {
00505 try
00506 {
00507
00508 int iIndex;
00509 iIndex=GetIndexByHWND(hWindowHandle);
00510
00511
00512 if (!iIndex)
00513 return;
00514
00515
00516 CCriticalAutoRelease aRelease(m_pCSection);
00517
00518
00519 if (m_pThreadData[iIndex-1].iSocketCount>0)
00520 {
00521 --m_pThreadData[iIndex-1].iSocketCount;
00522 m_pThreadData[iIndex-1].bFreeze=bFreeze;
00523 }
00524 }
00525 ERROR_HANDLER("DecreaseSocketCount")
00526 }
00527
00528 void CSocketThreadManager::DecreaseTimeoutCount(HWND hWindowHandle,
00529 BOOL bFreeze)
00530 {
00531 try
00532 {
00533
00534 int iIndex;
00535 iIndex=GetIndexByHWND(hWindowHandle);
00536
00537
00538 if (!iIndex)
00539 return;
00540
00541
00542 CCriticalAutoRelease aRelease(m_pCSection);
00543
00544
00545 if (m_pThreadData[iIndex-1].iTimeoutCount>0)
00546 {
00547 --m_pThreadData[iIndex-1].iTimeoutCount;
00548 m_pThreadData[iIndex-1].bFreeze=bFreeze;
00549 }
00550 }
00551 ERROR_HANDLER("DecreaseTimeoutCount")
00552 }
00553
00554 int CSocketThreadManager::GetIndexByHWND(HWND hHandle)const
00555 {
00556 try
00557 {
00558 for (int iCounter=0;iCounter<m_ulThreadCount;++iCounter)
00559 if (m_pThreadData[iCounter].hWindowHandle==hHandle)
00560
00561 return iCounter+1;
00562
00563
00564 return 0;
00565 }
00566 ERROR_HANDLER_RETURN("GetIndexByHWND",0)
00567 }
00568
00569 void CSocketThreadManager::CleanThreads(BOOL bError)
00570 {
00571 try
00572 {
00573
00574 CWaitList aList;
00575
00576
00577 if (m_pThreadData)
00578 {
00579
00580 int iCounter;
00581
00582
00583 for (iCounter=0;iCounter<m_ulThreadCount;++iCounter)
00584
00585 if (m_pThreadData[iCounter].pThread->GetThreadID())
00586 {
00587
00588 PostThreadMessage(m_pThreadData[iCounter].pThread->GetThreadID(),WM_QUIT,0,0);
00589
00590
00591 if (bError)
00592
00593 DestroyWindow(m_pThreadData[iCounter].hWindowHandle);
00594 else if (m_pThreadData[iCounter].pEvent)
00595 aList.AddObject(m_pThreadData[iCounter].pEvent,TRUE);
00596 }
00597
00598
00599 if (bError)
00600
00601 Sleep(1000);
00602 else
00603 {
00604
00605 DWORD dwTmp;
00606
00607
00608 if (aList.Wait(TRUE,dwTmp,THREADS_TIMEOUT))
00609 ReportError("CleanThreads","Timeout waiting for threads!");
00610 }
00611
00612
00613 for (iCounter=0;iCounter<m_ulThreadCount;++iCounter)
00614 {
00615
00616 delete m_pThreadData[iCounter].pThread;
00617
00618
00619 delete m_pThreadData[iCounter].pCSection;
00620
00621
00622 delete m_pThreadData[iCounter].pEvent;
00623
00624
00625 if (!bError)
00626
00627 DestroyWindow(m_pThreadData[iCounter].hWindowHandle);
00628 }
00629
00630
00631 delete [] m_pThreadData;
00632 }
00633 }
00634 ERROR_HANDLER("CleanThreads")
00635 }
00636
00637 CSocketThreadManager::TimerID CSocketThreadManager::RegisterTimeout(int iMS,
00638 LPTimeoutProc pProc,
00639 LPVOID pData,
00640 BOOL bClearTimeout,
00641 HWND hHandle,
00642 TimerID* pTimer)
00643 {
00644 TimerID aTimerID;
00645 aTimerID.iIndex=0;
00646
00647 try
00648 {
00649
00650 if (pTimer)
00651 memset(pTimer,0,sizeof(TimerID));
00652
00653 int iIndex;
00654
00655
00656 TimeoutData aData;
00657 aData.pTimeoutProc=pProc;
00658 aData.pData=pData;
00659 aData.bClearTimeout=bClearTimeout;
00660 aData.pTimer=pTimer;
00661
00662
00663 CCriticalAutoRelease aRelease(m_pCSection);
00664
00665
00666 if (hHandle)
00667 {
00668
00669 iIndex=GetIndexByHWND(hHandle);
00670
00671
00672 if (!iIndex)
00673 {
00674
00675 ReportError("RegisterTimeout","Failed to find handle!");
00676
00677
00678 return aTimerID;
00679 }
00680 else
00681 --iIndex;
00682 }
00683 else
00684 {
00685
00686 iIndex=GetMostAvailableThread();
00687
00688
00689 if (!iIndex)
00690 {
00691
00692 ReportError("RegisterTimeout","Failed to find thread!");
00693
00694
00695 return aTimerID;
00696 }
00697
00698
00699 --iIndex;
00700 }
00701
00702
00703 unsigned int iTimerID;
00704 iTimerID=0;
00705
00706
00707 static int sTimerCounter=1000000;
00708
00709
00710 int iSetTimerID;
00711 iSetTimerID=sTimerCounter++;
00712
00713
00714 CCriticalAutoRelease aRelease2(m_pThreadData[iIndex].pCSection);
00715
00716
00717 ++m_pThreadData[iIndex].iTimeoutCount;
00718
00719
00720 m_pThreadData[iIndex].pMap.insert(TOMap::value_type(iSetTimerID,aData));
00721
00722
00723 aRelease2.Exit();
00724 aRelease.Exit();
00725
00726
00727 iTimerID=SetTimer(m_pThreadData[iIndex].hWindowHandle,
00728 iSetTimerID,
00729 iMS,
00730 NULL);
00731
00732
00733 if (!iTimerID)
00734 {
00735 {
00736
00737
00738 CCriticalAutoRelease aRelease2(m_pThreadData[iIndex].pCSection);
00739
00740
00741 --m_pThreadData[iIndex].iTimeoutCount;
00742
00743
00744 m_pThreadData[iIndex].pMap.erase(iSetTimerID);
00745 }
00746
00747
00748 ReportError("RegisterTimeout","Failed to create the timer!");
00749
00750
00751 return aTimerID;
00752 }
00753
00754
00755 aTimerID.iTimerID=iSetTimerID;
00756 aTimerID.iIndex=iIndex+1;
00757 aTimerID.iMS=iMS;
00758
00759
00760 if (pTimer)
00761 memcpy(pTimer,&aTimerID,sizeof(aTimerID));
00762
00763
00764 return aTimerID;
00765 }
00766 ERROR_HANDLER_RETURN("RegisterTimeout",aTimerID)
00767 }
00768
00769 BOOL CSocketThreadManager::RemoveTimeout(TimerID& aTimerID)
00770 {
00771
00772 if (!aTimerID.iIndex)
00773 {
00774
00775 ReportError("RemoveTimeout","Received invalid TimerID");
00776
00777
00778 return FALSE;
00779 }
00780
00781 try
00782 {
00783
00784 TimerID aID;
00785 aID=aTimerID;
00786
00787 {
00788
00789 CCriticalAutoRelease aRelease(m_pThreadData[aID.iIndex-1].pCSection);
00790
00791
00792 if (!aTimerID.iIndex)
00793 return TRUE;
00794
00795
00796 m_pThreadData[aTimerID.iIndex-1].pMap.erase(aTimerID.iTimerID);
00797
00798
00799 --m_pThreadData[aTimerID.iIndex-1].iTimeoutCount;
00800
00801
00802 aTimerID.iIndex=0;
00803 aTimerID.iMS=0;
00804 aTimerID.iTimerID=0;
00805 }
00806
00807
00808 if (!KillTimer(m_pThreadData[aID.iIndex-1].hWindowHandle,
00809 aID.iTimerID))
00810
00811 ReportErrorOS("RemoveTimeout","Failed to delete timer!");
00812
00813
00814 return TRUE;
00815 }
00816 ERROR_HANDLER_RETURN("RemoveTimeout",FALSE)
00817 }
00818
00819 BOOL CSocketThreadManager::UnregisterClass()
00820 {
00821 if (!m_bRegisteredWindow)
00822 return TRUE;
00823
00824 try
00825 {
00826
00827 m_bRegisteredWindow=!::UnregisterClass(m_sClassName.c_str(),
00828 m_hInstance);
00829
00830
00831 return !m_bRegisteredWindow;
00832 }
00833 ERROR_HANDLER_RETURN("UnregisterClass",FALSE)
00834 }
00835
00836 BOOL CSocketThreadManager::RegisterClass(LPCSTR lpClassName)
00837 {
00838
00839 if (m_bRegisteredWindow)
00840 return TRUE;
00841
00842 try
00843 {
00844 WNDCLASS wc;
00845
00846
00847
00848
00849 wc.style = 0;
00850 wc.lpfnWndProc = (WNDPROC)DefWindowProc;
00851
00852 wc.cbClsExtra = 0;
00853 wc.cbWndExtra = 0;
00854 wc.hIcon = NULL;
00855 wc.hInstance = m_hInstance;
00856 wc.hCursor = NULL;
00857 wc.hbrBackground = NULL;
00858 wc.lpszMenuName = NULL;
00859 wc.lpszClassName = lpClassName ;
00860
00861
00862 m_bRegisteredWindow=::RegisterClass(&wc)!=0;
00863
00864
00865 return m_bRegisteredWindow;
00866 }
00867 ERROR_HANDLER_RETURN("RegisterClass",FALSE)
00868 }
00869
00870 BOOL CSocketThreadManager::ReSetTimeout(const TimerID &rTimerID)
00871 {
00872
00873 if (!rTimerID.iTimerID)
00874 {
00875
00876 ReportError("ReSetTimeout","Someone send an empty TimerID!");
00877
00878
00879 return FALSE;
00880 }
00881
00882 try
00883 {
00884
00885 int iResult;
00886 iResult=SetTimer(m_pThreadData[rTimerID.iIndex-1].hWindowHandle,
00887 rTimerID.iTimerID,
00888 rTimerID.iMS,
00889 NULL);
00890
00891
00892 if (!iResult)
00893 ReportError("ReSetTimeout","Failed to reset the timeout!");
00894
00895
00896 return iResult!=0;
00897 }
00898 ERROR_HANDLER_RETURN("ReSetTimeout",FALSE)
00899 }
00900
00901 const std::string& CSocketThreadManager::GetClassName()const
00902 {
00903 return m_sClassName;
00904 }
00905
00906 KOMODIA_NAMESPACE_END