EQStreamFactory.cpp 10 KB

  1. /*
  2. EQ2Emulator: Everquest II Server Emulator
  3. Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
  4. This file is part of EQ2Emulator.
  5. EQ2Emulator is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9. EQ2Emulator is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13. You should have received a copy of the GNU General Public License
  14. along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
  15. */
  16. #include "EQStreamFactory.h"
  17. #include "Log.h"
  18. #ifdef WIN32
  19. #include <WinSock2.h>
  20. #include <windows.h>
  21. #include <process.h>
  22. #include <io.h>
  23. #include <stdio.h>
  24. #else
  25. #include <sys/socket.h>
  26. #include <netinet/in.h>
  27. #include <sys/select.h>
  28. #include <arpa/inet.h>
  29. #include <netdb.h>
  30. #include <pthread.h>
  31. #endif
  32. #include <fcntl.h>
  33. #include <fstream>
  34. #include <iostream>
  35. #include "op_codes.h"
  36. #include "EQStream.h"
  37. #include "packet_dump.h"
  38. #ifdef WORLD
  39. #include "../WorldServer/client.h"
  40. #endif
  41. using namespace std;
  42. #ifdef WORLD
  43. extern ClientList client_list;
  44. #endif
  45. ThreadReturnType EQStreamFactoryReaderLoop(void *eqfs)
  46. {
  47. if(eqfs){
  48. EQStreamFactory *fs=(EQStreamFactory *)eqfs;
  49. fs->ReaderLoop();
  50. }
  52. }
  53. ThreadReturnType EQStreamFactoryWriterLoop(void *eqfs)
  54. {
  55. if(eqfs){
  56. EQStreamFactory *fs=(EQStreamFactory *)eqfs;
  57. fs->WriterLoop();
  58. }
  60. }
  61. ThreadReturnType EQStreamFactoryCombinePacketLoop(void *eqfs)
  62. {
  63. if(eqfs){
  64. EQStreamFactory *fs=(EQStreamFactory *)eqfs;
  65. fs->CombinePacketLoop();
  66. }
  68. }
  69. EQStreamFactory::EQStreamFactory(EQStreamType type, int port)
  70. {
  71. StreamType=type;
  72. Port=port;
  73. listen_ip_address = 0;
  74. }
  75. void EQStreamFactory::Close()
  76. {
  77. CheckTimeout(true);
  78. Stop();
  79. #ifdef WIN32
  80. closesocket(sock);
  81. #else
  82. close(sock);
  83. #endif
  84. sock=-1;
  85. }
  86. bool EQStreamFactory::Open()
  87. {
  88. struct sockaddr_in address;
  89. #ifndef WIN32
  90. pthread_t t1, t2, t3;
  91. #endif
  92. /* Setup internet address information.
  93. This is used with the bind() call */
  94. memset((char *) &address, 0, sizeof(address));
  95. address.sin_family = AF_INET;
  96. address.sin_port = htons(Port);
  97. #if defined(LOGIN) || defined(MINILOGIN)
  98. if(listen_ip_address)
  99. address.sin_addr.s_addr = inet_addr(listen_ip_address);
  100. else
  101. address.sin_addr.s_addr = htonl(INADDR_ANY);
  102. #else
  103. address.sin_addr.s_addr = htonl(INADDR_ANY);
  104. #endif
  105. /* Setting up UDP port for new clients */
  106. sock = socket(AF_INET, SOCK_DGRAM, 0);
  107. if (sock < 0) {
  108. return false;
  109. }
  110. if (bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
  111. close(sock);
  112. sock=-1;
  113. return false;
  114. }
  115. #ifdef WIN32
  116. unsigned long nonblock = 1;
  117. ioctlsocket(sock, FIONBIO, &nonblock);
  118. #else
  119. fcntl(sock, F_SETFL, O_NONBLOCK);
  120. #endif
  121. //moved these because on windows the output was delayed and causing the console window to look bad
  122. LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
  123. LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
  124. #ifdef WIN32
  125. _beginthread(EQStreamFactoryReaderLoop,0, this);
  126. _beginthread(EQStreamFactoryWriterLoop,0, this);
  127. _beginthread(EQStreamFactoryCombinePacketLoop,0, this);
  128. #else
  129. pthread_create(&t1,NULL,EQStreamFactoryReaderLoop,this);
  130. pthread_create(&t2,NULL,EQStreamFactoryWriterLoop,this);
  131. pthread_create(&t3,NULL,EQStreamFactoryCombinePacketLoop,this);
  132. pthread_detach(t1);
  133. pthread_detach(t2);
  134. pthread_detach(t3);
  135. #endif
  136. return true;
  137. }
  138. EQStream *EQStreamFactory::Pop()
  139. {
  140. EQStream *s=NULL;
  141. //cout << "Pop():Locking MNewStreams" << endl;
  142. MNewStreams.lock();
  143. if (NewStreams.size()) {
  144. s=NewStreams.front();
  145. NewStreams.pop();
  146. s->PutInUse();
  147. }
  148. MNewStreams.unlock();
  149. //cout << "Pop(): Unlocking MNewStreams" << endl;
  150. return s;
  151. }
  152. void EQStreamFactory::Push(EQStream *s)
  153. {
  154. //cout << "Push():Locking MNewStreams" << endl;
  155. MNewStreams.lock();
  156. NewStreams.push(s);
  157. MNewStreams.unlock();
  158. //cout << "Push(): Unlocking MNewStreams" << endl;
  159. }
  160. void EQStreamFactory::ReaderLoop()
  161. {
  162. fd_set readset;
  163. map<string,EQStream *>::iterator stream_itr;
  164. int num;
  165. int length;
  166. unsigned char buffer[2048];
  167. sockaddr_in from;
  168. int socklen=sizeof(sockaddr_in);
  169. timeval sleep_time;
  170. ReaderRunning=true;
  171. while(sock!=-1) {
  172. MReaderRunning.lock();
  173. if (!ReaderRunning)
  174. break;
  175. MReaderRunning.unlock();
  176. FD_ZERO(&readset);
  177. FD_SET(sock,&readset);
  178. sleep_time.tv_sec=30;
  179. sleep_time.tv_usec=0;
  180. if ((num=select(sock+1,&readset,NULL,NULL,&sleep_time))<0) {
  181. // What do we wanna do?
  182. } else if (num==0)
  183. continue;
  184. if (FD_ISSET(sock,&readset)) {
  185. #ifdef WIN32
  186. if ((length=recvfrom(sock,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&from,(int *)&socklen))<0)
  187. #else
  188. if ((length=recvfrom(sock,buffer,2048,0,(struct sockaddr *)&from,(socklen_t *)&socklen))<0)
  189. #endif
  190. {
  191. // What do we wanna do?
  192. } else {
  193. char temp[25];
  194. sprintf(temp,"%u.%d",ntohl(from.sin_addr.s_addr),ntohs(from.sin_port));
  195. MStreams.lock();
  196. if ((stream_itr=Streams.find(temp))==Streams.end() || buffer[1]==OP_SessionRequest) {
  197. MStreams.unlock();
  198. if (buffer[1]==OP_SessionRequest) {
  199. if(stream_itr != Streams.end() && stream_itr->second)
  200. stream_itr->second->SetState(CLOSED);
  201. EQStream *s=new EQStream(from);
  202. s->SetFactory(this);
  203. s->SetStreamType(StreamType);
  204. Streams[temp]=s;
  205. WriterWork.Signal();
  206. Push(s);
  207. s->Process(buffer,length);
  208. s->SetLastPacketTime(Timer::GetCurrentTime2());
  209. }
  210. } else {
  211. EQStream *curstream = stream_itr->second;
  212. //dont bother processing incoming packets for closed connections
  213. if(curstream->CheckClosed())
  214. curstream = NULL;
  215. else
  216. curstream->PutInUse();
  217. MStreams.unlock();
  218. if(curstream) {
  219. curstream->Process(buffer,length);
  220. curstream->SetLastPacketTime(Timer::GetCurrentTime2());
  221. curstream->ReleaseFromUse();
  222. }
  223. }
  224. }
  225. }
  226. }
  227. }
  228. void EQStreamFactory::CheckTimeout(bool remove_all)
  229. {
  230. //lock streams the entire time were checking timeouts, it should be fast.
  231. MStreams.lock();
  232. unsigned long now=Timer::GetCurrentTime2();
  233. map<string,EQStream *>::iterator stream_itr;
  234. for(stream_itr=Streams.begin();stream_itr!=Streams.end();) {
  235. EQStream *s = stream_itr->second;
  236. EQStreamState state = s->GetState();
  237. if (state==CLOSING && !s->HasOutgoingData()) {
  238. stream_itr->second->SetState(CLOSED);
  239. state = CLOSED;
  240. } else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
  241. const char* stateString;
  242. switch (state){
  243. case ESTABLISHED:
  244. stateString = "Established";
  245. break;
  246. case CLOSING:
  247. stateString = "Closing";
  248. break;
  249. case CLOSED:
  250. stateString = "Closed";
  251. break;
  252. default:
  253. stateString = "Unknown";
  254. break;
  255. }
  256. LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s", stateString);
  257. if (state==ESTABLISHED) {
  258. s->Close();
  259. } else if (state == CLOSING) {
  260. //if we time out in the closing state, just give up
  261. s->SetState(CLOSED);
  262. state = CLOSED;
  263. }
  264. }
  265. //not part of the else so we check it right away on state change
  266. if (remove_all || state==CLOSED) {
  267. if (!remove_all && s->getTimeoutDelays()<2) {
  268. s->addTimeoutDelay();
  269. //give it a little time for everybody to finish with it
  270. } else {
  271. //everybody is done, we can delete it now
  272. LogWrite(WORLD__DEBUG, 0, "World", "Removing connection...");
  273. map<string,EQStream *>::iterator temp=stream_itr;
  274. stream_itr++;
  275. //let whoever has the stream outside delete it
  276. #ifdef WORLD
  277. client_list.RemoveConnection(temp->second);
  278. #endif
  279. delete temp->second;
  280. Streams.erase(temp);
  281. continue;
  282. }
  283. }
  284. stream_itr++;
  285. }
  286. MStreams.unlock();
  287. }
  288. void EQStreamFactory::CombinePacketLoop(){
  289. deque<EQStream*> combine_que;
  290. CombinePacketRunning = true;
  291. bool packets_waiting = false;
  292. while(sock!=-1) {
  293. if (!CombinePacketRunning)
  294. break;
  295. MStreams.lock();
  296. map<string,EQStream *>::iterator stream_itr;
  297. for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) {
  298. if(!stream_itr->second){
  299. continue;
  300. }
  301. if(stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check())
  302. combine_que.push_back(stream_itr->second);
  303. }
  304. EQStream* stream = 0;
  305. packets_waiting = false;
  306. while(combine_que.size()){
  307. stream = combine_que.front();
  308. if(stream->CheckActive()){
  309. if(!stream->CheckCombineQueue())
  310. packets_waiting = true;
  311. }
  312. combine_que.pop_front();
  313. }
  314. MStreams.unlock();
  315. if(!packets_waiting)
  316. Sleep(25);
  317. }
  318. }
  319. void EQStreamFactory::WriterLoop()
  320. {
  321. map<string,EQStream *>::iterator stream_itr;
  322. vector<EQStream *> wants_write;
  323. vector<EQStream *>::iterator cur,end;
  324. deque<EQStream*> resend_que;
  325. bool decay=false;
  326. uint32 stream_count;
  327. Timer DecayTimer(20);
  328. WriterRunning=true;
  329. DecayTimer.Enable();
  330. while(sock!=-1) {
  331. Timer::SetCurrentTime();
  332. //if (!havework) {
  333. //WriterWork.Wait();
  334. //}
  335. MWriterRunning.lock();
  336. if (!WriterRunning)
  337. break;
  338. MWriterRunning.unlock();
  339. wants_write.clear();
  340. decay=DecayTimer.Check();
  341. //copy streams into a seperate list so we dont have to keep
  342. //MStreams locked while we are writting
  343. MStreams.lock();
  344. for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) {
  345. // If it's time to decay the bytes sent, then let's do it before we try to write
  346. if(!stream_itr->second){
  347. Streams.erase(stream_itr);
  348. break;
  349. }
  350. if (decay)
  351. stream_itr->second->Decay();
  352. if (stream_itr->second->HasOutgoingData()) {
  353. stream_itr->second->PutInUse();
  354. wants_write.push_back(stream_itr->second);
  355. }
  356. if(stream_itr->second->resend_que_timer->Check())
  357. resend_que.push_back(stream_itr->second);
  358. }
  359. MStreams.unlock();
  360. //do the actual writes
  361. cur = wants_write.begin();
  362. end = wants_write.end();
  363. for(; cur != end; cur++) {
  364. (*cur)->Write(sock);
  365. (*cur)->ReleaseFromUse();
  366. }
  367. while(resend_que.size()){
  368. resend_que.front()->CheckResend(sock);
  369. resend_que.pop_front();
  370. }
  371. Sleep(10);
  372. MStreams.lock();
  373. stream_count=Streams.size();
  374. MStreams.unlock();
  375. if (!stream_count) {
  376. //cout << "No streams, waiting on condition" << endl;
  377. WriterWork.Wait();
  378. //cout << "Awake from condition, must have a stream now" << endl;
  379. }
  380. }
  381. }