9
3

EQStreamFactory.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /*
  2. EQ2Emulator: Everquest II Server Emulator
  3. Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
  4. This file is part of EQ2Emulator.
  5. EQ2Emulator is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9. EQ2Emulator is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13. You should have received a copy of the GNU General Public License
  14. along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
  15. */
  16. #include "EQStreamFactory.h"
  17. #include "Log.h"
  18. #ifdef WIN32
  19. #include <WinSock2.h>
  20. #include <windows.h>
  21. #include <process.h>
  22. #include <io.h>
  23. #include <stdio.h>
  24. #else
  25. #include <sys/socket.h>
  26. #include <netinet/in.h>
  27. #include <sys/select.h>
  28. #include <arpa/inet.h>
  29. #include <netdb.h>
  30. #include <pthread.h>
  31. #endif
  32. #include <fcntl.h>
  33. #include <fstream>
  34. #include <iostream>
  35. #include "op_codes.h"
  36. #include "EQStream.h"
  37. #include "packet_dump.h"
  38. #ifdef WORLD
  39. #include "../WorldServer/client.h"
  40. #endif
  41. using namespace std;
  42. #ifdef WORLD
  43. extern ClientList client_list;
  44. #endif
  45. ThreadReturnType EQStreamFactoryReaderLoop(void *eqfs)
  46. {
  47. if(eqfs){
  48. EQStreamFactory *fs=(EQStreamFactory *)eqfs;
  49. fs->ReaderLoop();
  50. }
  51. THREAD_RETURN(NULL);
  52. }
  53. ThreadReturnType EQStreamFactoryWriterLoop(void *eqfs)
  54. {
  55. if(eqfs){
  56. EQStreamFactory *fs=(EQStreamFactory *)eqfs;
  57. fs->WriterLoop();
  58. }
  59. THREAD_RETURN(NULL);
  60. }
  61. ThreadReturnType EQStreamFactoryCombinePacketLoop(void *eqfs)
  62. {
  63. if(eqfs){
  64. EQStreamFactory *fs=(EQStreamFactory *)eqfs;
  65. fs->CombinePacketLoop();
  66. }
  67. THREAD_RETURN(NULL);
  68. }
  69. EQStreamFactory::EQStreamFactory(EQStreamType type, int port)
  70. {
  71. StreamType=type;
  72. Port=port;
  73. listen_ip_address = 0;
  74. }
  75. void EQStreamFactory::Close()
  76. {
  77. CheckTimeout(true);
  78. Stop();
  79. if (sock != -1) {
  80. #ifdef WIN32
  81. closesocket(sock);
  82. #else
  83. close(sock);
  84. #endif
  85. sock = -1;
  86. }
  87. }
  88. bool EQStreamFactory::Open()
  89. {
  90. struct sockaddr_in address;
  91. #ifndef WIN32
  92. pthread_t t1, t2, t3;
  93. #endif
  94. /* Setup internet address information.
  95. This is used with the bind() call */
  96. memset((char *) &address, 0, sizeof(address));
  97. address.sin_family = AF_INET;
  98. address.sin_port = htons(Port);
  99. #if defined(LOGIN) || defined(MINILOGIN)
  100. if(listen_ip_address)
  101. address.sin_addr.s_addr = inet_addr(listen_ip_address);
  102. else
  103. address.sin_addr.s_addr = htonl(INADDR_ANY);
  104. #else
  105. address.sin_addr.s_addr = htonl(INADDR_ANY);
  106. #endif
  107. /* Setting up UDP port for new clients */
  108. sock = socket(AF_INET, SOCK_DGRAM, 0);
  109. if (sock < 0) {
  110. return false;
  111. }
  112. if (::bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
  113. //close(sock);
  114. sock=-1;
  115. return false;
  116. }
  117. #ifdef WIN32
  118. unsigned long nonblock = 1;
  119. ioctlsocket(sock, FIONBIO, &nonblock);
  120. #else
  121. fcntl(sock, F_SETFL, O_NONBLOCK);
  122. #endif
  123. //moved these because on windows the output was delayed and causing the console window to look bad
  124. #ifdef LOGIN
  125. LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader");
  126. LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer");
  127. #elif WORLD
  128. LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
  129. LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
  130. #endif
  131. #ifdef WIN32
  132. _beginthread(EQStreamFactoryReaderLoop,0, this);
  133. _beginthread(EQStreamFactoryWriterLoop,0, this);
  134. _beginthread(EQStreamFactoryCombinePacketLoop,0, this);
  135. #else
  136. pthread_create(&t1,NULL,EQStreamFactoryReaderLoop,this);
  137. pthread_create(&t2,NULL,EQStreamFactoryWriterLoop,this);
  138. pthread_create(&t3,NULL,EQStreamFactoryCombinePacketLoop,this);
  139. pthread_detach(t1);
  140. pthread_detach(t2);
  141. pthread_detach(t3);
  142. #endif
  143. return true;
  144. }
  145. EQStream *EQStreamFactory::Pop()
  146. {
  147. if (!NewStreams.size())
  148. return NULL;
  149. EQStream *s=NULL;
  150. //cout << "Pop():Locking MNewStreams" << endl;
  151. MNewStreams.lock();
  152. if (NewStreams.size()) {
  153. s=NewStreams.front();
  154. NewStreams.pop();
  155. s->PutInUse();
  156. }
  157. MNewStreams.unlock();
  158. //cout << "Pop(): Unlocking MNewStreams" << endl;
  159. return s;
  160. }
  161. void EQStreamFactory::Push(EQStream *s)
  162. {
  163. //cout << "Push():Locking MNewStreams" << endl;
  164. MNewStreams.lock();
  165. NewStreams.push(s);
  166. MNewStreams.unlock();
  167. //cout << "Push(): Unlocking MNewStreams" << endl;
  168. }
  169. void EQStreamFactory::ReaderLoop()
  170. {
  171. fd_set readset;
  172. map<string,EQStream *>::iterator stream_itr;
  173. int num;
  174. int length;
  175. unsigned char buffer[2048];
  176. sockaddr_in from;
  177. int socklen=sizeof(sockaddr_in);
  178. timeval sleep_time;
  179. ReaderRunning=true;
  180. while(sock!=-1) {
  181. MReaderRunning.lock();
  182. if (!ReaderRunning)
  183. break;
  184. MReaderRunning.unlock();
  185. FD_ZERO(&readset);
  186. FD_SET(sock,&readset);
  187. sleep_time.tv_sec=30;
  188. sleep_time.tv_usec=0;
  189. if ((num=select(sock+1,&readset,NULL,NULL,&sleep_time))<0) {
  190. // What do we wanna do?
  191. } else if (num==0)
  192. continue;
  193. if (FD_ISSET(sock,&readset)) {
  194. #ifdef WIN32
  195. if ((length=recvfrom(sock,(char*)buffer,sizeof(buffer),0,(struct sockaddr*)&from,(int *)&socklen))<2)
  196. #else
  197. if ((length=recvfrom(sock,buffer,2048,0,(struct sockaddr *)&from,(socklen_t *)&socklen))<2)
  198. #endif
  199. {
  200. // What do we wanna do?
  201. } else {
  202. char temp[25];
  203. sprintf(temp,"%u.%d",ntohl(from.sin_addr.s_addr),ntohs(from.sin_port));
  204. MStreams.lock();
  205. if ((stream_itr=Streams.find(temp))==Streams.end() || buffer[1]==OP_SessionRequest) {
  206. MStreams.unlock();
  207. if (buffer[1]==OP_SessionRequest) {
  208. if(stream_itr != Streams.end() && stream_itr->second)
  209. stream_itr->second->SetState(CLOSED);
  210. EQStream *s=new EQStream(from);
  211. s->SetFactory(this);
  212. s->SetStreamType(StreamType);
  213. Streams[temp]=s;
  214. WriterWork.Signal();
  215. Push(s);
  216. s->Process(buffer,length);
  217. s->SetLastPacketTime(Timer::GetCurrentTime2());
  218. }
  219. } else {
  220. EQStream *curstream = stream_itr->second;
  221. //dont bother processing incoming packets for closed connections
  222. if(curstream->CheckClosed())
  223. curstream = NULL;
  224. else
  225. curstream->PutInUse();
  226. MStreams.unlock();
  227. if(curstream) {
  228. curstream->Process(buffer,length);
  229. curstream->SetLastPacketTime(Timer::GetCurrentTime2());
  230. curstream->ReleaseFromUse();
  231. }
  232. }
  233. }
  234. }
  235. }
  236. }
  237. void EQStreamFactory::CheckTimeout(bool remove_all)
  238. {
  239. //lock streams the entire time were checking timeouts, it should be fast.
  240. MStreams.lock();
  241. unsigned long now=Timer::GetCurrentTime2();
  242. map<string,EQStream *>::iterator stream_itr;
  243. for(stream_itr=Streams.begin();stream_itr!=Streams.end();) {
  244. EQStream *s = stream_itr->second;
  245. EQStreamState state = s->GetState();
  246. if (state==CLOSING && !s->HasOutgoingData()) {
  247. stream_itr->second->SetState(CLOSED);
  248. state = CLOSED;
  249. } else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
  250. const char* stateString;
  251. switch (state){
  252. case ESTABLISHED:
  253. stateString = "Established";
  254. break;
  255. case CLOSING:
  256. stateString = "Closing";
  257. break;
  258. case CLOSED:
  259. stateString = "Closed";
  260. break;
  261. case WAIT_CLOSE:
  262. stateString = "Wait-Close";
  263. break;
  264. default:
  265. stateString = "Unknown";
  266. break;
  267. }
  268. LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s (%u)", stateString, state);
  269. if (state==ESTABLISHED) {
  270. s->Close();
  271. }
  272. else if (state == WAIT_CLOSE) {
  273. s->SetState(CLOSING);
  274. state = CLOSING;
  275. }
  276. else if (state == CLOSING) {
  277. //if we time out in the closing state, just give up
  278. s->SetState(CLOSED);
  279. state = CLOSED;
  280. }
  281. }
  282. //not part of the else so we check it right away on state change
  283. if (remove_all || state==CLOSED) {
  284. if (!remove_all && s->getTimeoutDelays()<2) {
  285. s->addTimeoutDelay();
  286. //give it a little time for everybody to finish with it
  287. } else {
  288. //everybody is done, we can delete it now
  289. #ifdef LOGIN
  290. LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection...");
  291. #else
  292. LogWrite(WORLD__DEBUG, 0, "World", "Removing connection...");
  293. #endif
  294. map<string,EQStream *>::iterator temp=stream_itr;
  295. stream_itr++;
  296. //let whoever has the stream outside delete it
  297. #ifdef WORLD
  298. client_list.RemoveConnection(temp->second);
  299. #endif
  300. EQStream* stream = temp->second;
  301. Streams.erase(temp);
  302. delete stream;
  303. continue;
  304. }
  305. }
  306. stream_itr++;
  307. }
  308. MStreams.unlock();
  309. }
  310. void EQStreamFactory::CombinePacketLoop(){
  311. deque<EQStream*> combine_que;
  312. CombinePacketRunning = true;
  313. bool packets_waiting = false;
  314. while(sock!=-1) {
  315. if (!CombinePacketRunning)
  316. break;
  317. MStreams.lock();
  318. map<string,EQStream *>::iterator stream_itr;
  319. for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) {
  320. if(!stream_itr->second){
  321. continue;
  322. }
  323. if(stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check())
  324. combine_que.push_back(stream_itr->second);
  325. }
  326. EQStream* stream = 0;
  327. packets_waiting = false;
  328. while(combine_que.size()){
  329. stream = combine_que.front();
  330. if(stream->CheckActive()){
  331. if(!stream->CheckCombineQueue())
  332. packets_waiting = true;
  333. }
  334. combine_que.pop_front();
  335. }
  336. MStreams.unlock();
  337. if(!packets_waiting)
  338. Sleep(25);
  339. Sleep(1);
  340. }
  341. }
  342. void EQStreamFactory::WriterLoop()
  343. {
  344. map<string,EQStream *>::iterator stream_itr;
  345. vector<EQStream *> wants_write;
  346. vector<EQStream *>::iterator cur,end;
  347. deque<EQStream*> resend_que;
  348. bool decay=false;
  349. uint32 stream_count;
  350. Timer DecayTimer(20);
  351. WriterRunning=true;
  352. DecayTimer.Enable();
  353. while(sock!=-1) {
  354. Timer::SetCurrentTime();
  355. //if (!havework) {
  356. //WriterWork.Wait();
  357. //}
  358. MWriterRunning.lock();
  359. if (!WriterRunning)
  360. break;
  361. MWriterRunning.unlock();
  362. wants_write.clear();
  363. decay=DecayTimer.Check();
  364. //copy streams into a seperate list so we dont have to keep
  365. //MStreams locked while we are writting
  366. MStreams.lock();
  367. for(stream_itr=Streams.begin();stream_itr!=Streams.end();stream_itr++) {
  368. // If it's time to decay the bytes sent, then let's do it before we try to write
  369. if(!stream_itr->second){
  370. Streams.erase(stream_itr);
  371. break;
  372. }
  373. if (decay)
  374. stream_itr->second->Decay();
  375. if (stream_itr->second->HasOutgoingData()) {
  376. stream_itr->second->PutInUse();
  377. wants_write.push_back(stream_itr->second);
  378. }
  379. if(stream_itr->second->resend_que_timer->Check())
  380. resend_que.push_back(stream_itr->second);
  381. }
  382. MStreams.unlock();
  383. //do the actual writes
  384. cur = wants_write.begin();
  385. end = wants_write.end();
  386. for(; cur != end; cur++) {
  387. (*cur)->Write(sock);
  388. (*cur)->ReleaseFromUse();
  389. }
  390. while(resend_que.size()){
  391. resend_que.front()->CheckResend(sock);
  392. resend_que.pop_front();
  393. }
  394. Sleep(10);
  395. MStreams.lock();
  396. stream_count=Streams.size();
  397. MStreams.unlock();
  398. if (!stream_count) {
  399. //cout << "No streams, waiting on condition" << endl;
  400. WriterWork.Wait();
  401. //cout << "Awake from condition, must have a stream now" << endl;
  402. }
  403. }
  404. }