00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "stdafx.h"
00028 #include "SocketPool.h"
00029
00030 #include "ErrorHandlerMacros.h"
00031 #include "OSManager.h"
00032 #include "GenericCriticalSection.h"
00033
00034 KOMODIA_NAMESPACE_START
00035
00036 #define CSocketPool_Class "CSocketPool"
00037
00038 CSocketPool::CSocketPool(int iMaxSockets,
00039 LPSocketAllocator pAllocator,
00040 int iMaxQueue) : CErrorHandler(),
00041 m_iMaxSocket(iMaxSockets),
00042 m_pAllocator(pAllocator),
00043 m_pCSection(NULL),
00044 m_pCSectionQueue(NULL),
00045 m_iActiveSockets(0),
00046 m_bClosing(FALSE),
00047 m_iMaxBackQueue(iMaxQueue)
00048 {
00049 try
00050 {
00051
00052 SetName(CSocketPool_Class);
00053
00054
00055 m_pCSection=COSManager::CreateCriticalSection();
00056 m_pCSectionQueue=COSManager::CreateCriticalSection();
00057 }
00058 ERROR_HANDLER("CSocketPool")
00059 }
00060
00061 CSocketPool::~CSocketPool()
00062 {
00063 try
00064 {
00065
00066 Clear();
00067
00068
00069 delete m_pCSection;
00070 delete m_pCSectionQueue;
00071 }
00072 ERROR_HANDLER("~CSocketPool")
00073 }
00074
00075 void CSocketPool::Clear()
00076 {
00077 try
00078 {
00079
00080 CCriticalAutoRelease aRelease(m_pCSection);
00081
00082
00083 m_bClosing=TRUE;
00084
00085
00086 CCriticalAutoRelease aRelease2(m_pCSectionQueue);
00087
00088
00089 SocketMap::iterator aIterator;
00090 aIterator=m_aSocketMap.begin();
00091
00092
00093 while (aIterator!=m_aSocketMap.end())
00094 {
00095
00096 aIterator->second->DeleteSocketFromThread(1000);
00097
00098
00099 ++aIterator;
00100 }
00101
00102
00103 Sleep(5000);
00104
00105
00106 m_bClosing=FALSE;
00107 }
00108 ERROR_HANDLER("Clear")
00109 }
00110
00111 BOOL CSocketPool::Connect(const std::string& rAddress,
00112 unsigned short usPort)
00113 {
00114 try
00115 {
00116
00117 CSocketPoolSocket* pSocket;
00118
00119
00120 if (!m_pAllocator)
00121 {
00122
00123 ReportError("Connect","No allocator!");
00124
00125
00126 return FALSE;
00127 }
00128
00129
00130 pSocket=(*m_pAllocator)(this);
00131
00132
00133 return Connect(rAddress,
00134 usPort,
00135 pSocket);
00136 }
00137 ERROR_HANDLER_RETURN("Connect",FALSE)
00138 }
00139
00140 BOOL CSocketPool::Connect(const std::string& rAddress,
00141 unsigned short usPort,
00142 CSocketPoolSocket* pSocket)
00143 {
00144 try
00145 {
00146
00147 std::auto_ptr<CSocketPoolSocket> pProtection(pSocket);
00148
00149
00150 if (m_bClosing)
00151 return TRUE;
00152
00153
00154 pSocket->SetConnectionData(rAddress,
00155 usPort);
00156
00157
00158 CCriticalAutoRelease aRelease(m_pCSection);
00159
00160
00161 if (m_iActiveSockets>=m_iMaxSocket)
00162 {
00163
00164 if (m_aQueue.size()>m_iMaxBackQueue)
00165 return FALSE;
00166
00167
00168 m_aQueue.push_back(pSocket);
00169
00170
00171 pProtection.release();
00172
00173
00174 return TRUE;
00175 }
00176
00177
00178 ++m_iActiveSockets;
00179
00180
00181 aRelease.Exit();
00182
00183
00184 if (!pSocket->Connect())
00185 {
00186
00187 CheckQueue();
00188
00189
00190 return FALSE;
00191 }
00192 else
00193 {
00194
00195 pProtection.release();
00196
00197
00198 return TRUE;
00199 }
00200 }
00201 ERROR_HANDLER_RETURN("Connect",FALSE)
00202 }
00203
00204 void CSocketPool::SocketConnected(CSocketPoolSocket* pSocket)
00205 {
00206 try
00207 {
00208
00209 CCriticalAutoRelease aRelease(m_pCSectionQueue);
00210
00211
00212 m_aSocketMap.insert(SocketMap::value_type(pSocket->GetID(),
00213 pSocket));
00214 }
00215 ERROR_HANDLER("SocketConnected")
00216 }
00217
00218 void CSocketPool::SocketClosed(CSocketPoolSocket* pSocket)
00219 {
00220 try
00221 {
00222 {
00223
00224 CCriticalAutoRelease aRelease(m_pCSectionQueue);
00225
00226
00227 m_aSocketMap.erase(pSocket->GetID());
00228
00229
00230 pSocket->DeleteSocketFromThread(1000);
00231 }
00232
00233
00234 if (!m_bClosing)
00235
00236 CheckQueue();
00237 }
00238 ERROR_HANDLER("SocketClosed")
00239 }
00240
00241 void CSocketPool::CheckQueue()
00242 {
00243 try
00244 {
00245
00246 CCriticalAutoRelease aRelease(m_pCSection);
00247
00248
00249 if (m_aQueue.empty())
00250
00251 --m_iActiveSockets;
00252 else
00253 {
00254
00255 CSocketPoolSocket* pSocket;
00256 pSocket=m_aQueue.front();
00257
00258
00259 std::auto_ptr<CSocketPoolSocket> pProtection(pSocket);
00260
00261
00262 m_aQueue.pop_front();
00263
00264
00265 aRelease.Exit();
00266
00267
00268 if (!pSocket->Connect())
00269
00270 CheckQueue();
00271 else
00272
00273 pProtection.release();
00274 }
00275 }
00276 ERROR_HANDLER("CheckQueue")
00277 }
00278
00279 KOMODIA_NAMESPACE_END