// EQStream.cpp
  1. /*
  2. EQ2Emulator: Everquest II Server Emulator
  3. Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
  4. This file is part of EQ2Emulator.
  5. EQ2Emulator is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9. EQ2Emulator is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13. You should have received a copy of the GNU General Public License
  14. along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
  15. */
  16. #ifdef WIN32
  17. #include <WinSock2.h>
  18. #include <windows.h>
  19. #endif
  20. #include "debug.h"
  21. #include <string>
  22. #include <iomanip>
  23. #include <iostream>
  24. #include <vector>
  25. #include <time.h>
  26. #include <sys/types.h>
  27. #ifdef WIN32
  28. #include <time.h>
  29. #else
  30. #include <sys/socket.h>
  31. #include <netinet/in.h>
  32. #include <sys/time.h>
  33. #include <sys/socket.h>
  34. #include <netdb.h>
  35. #include <fcntl.h>
  36. #include <arpa/inet.h>
  37. #endif
  38. #include "EQPacket.h"
  39. #include "EQStream.h"
  40. #include "EQStreamFactory.h"
  41. #include "misc.h"
  42. #include "Mutex.h"
  43. #include "op_codes.h"
  44. #include "CRC16.h"
  45. #include "packet_dump.h"
  46. #ifdef LOGIN
  47. #include "../LoginServer/login_structs.h"
  48. #endif
  49. #include "EQ2_Common_Structs.h"
  50. #include "Log.h"
  51. uint16 EQStream::MaxWindowSize=2048;
  52. void EQStream::init(bool resetSession) {
  53. if (resetSession)
  54. {
  55. streamactive = false;
  56. sessionAttempts = 0;
  57. }
  58. timeout_delays = 0;
  59. MInUse.lock();
  60. active_users = 0;
  61. MInUse.unlock();
  62. Session=0;
  63. Key=0;
  64. MaxLen=0;
  65. NextInSeq=0;
  66. NextOutSeq=0;
  67. CombinedAppPacket=NULL;
  68. MAcks.lock();
  69. MaxAckReceived = -1;
  70. NextAckToSend = -1;
  71. LastAckSent = -1;
  72. MAcks.unlock();
  73. LastSeqSent=-1;
  74. MaxSends=5;
  75. LastPacket=Timer::GetCurrentTime2();
  76. oversize_buffer=NULL;
  77. oversize_length=0;
  78. oversize_offset=0;
  79. Factory = NULL;
  80. MRate.lock();
  81. RateThreshold=RATEBASE/250;
  82. DecayRate=DECAYBASE/250;
  83. MRate.unlock();
  84. BytesWritten=0;
  85. crypto->setRC4Key(0);
  86. }
  87. EQStream::EQStream(sockaddr_in addr){
  88. crypto = new Crypto();
  89. resend_que_timer = new Timer(1000);
  90. combine_timer = new Timer(250); //250 milliseconds
  91. combine_timer->Start();
  92. resend_que_timer->Start();
  93. init();
  94. remote_ip=addr.sin_addr.s_addr;
  95. remote_port=addr.sin_port;
  96. State=CLOSED;
  97. StreamType=UnknownStream;
  98. compressed=true;
  99. encoded=false;
  100. app_opcode_size=2;
  101. #ifdef WIN32
  102. ZeroMemory(&stream, sizeof(z_stream));
  103. #else
  104. bzero(&stream, sizeof(z_stream));
  105. #endif
  106. stream.zalloc = (alloc_func)0;
  107. stream.zfree = (free_func)0;
  108. stream.opaque = (voidpf)0;
  109. deflateInit2(&stream, 9, Z_DEFLATED, 13, 9, Z_DEFAULT_STRATEGY);
  110. //deflateInit(&stream, 5);
  111. compressed_offset = 0;
  112. client_version = 0;
  113. received_packets = 0;
  114. sent_packets = 0;
  115. }
  116. EQProtocolPacket* EQStream::ProcessEncryptedData(uchar* data, int32 size, int16 opcode){
  117. //cout << "B4:\n";
  118. //DumpPacket(data, size);
  119. /*if(size >= 2 && data[0] == 0 && data[1] == 0){
  120. cout << "Attempting to fix packet!\n";
  121. //Have to fix bad packet from client or it will screw up encryption :P
  122. size--;
  123. data++;
  124. }*/
  125. crypto->RC4Decrypt(data,size);
  126. int8 offset = 0;
  127. if(data[0] == 0xFF && size > 2){
  128. offset = 3;
  129. memcpy(&opcode, data+sizeof(int8), sizeof(int16));
  130. }
  131. else{
  132. offset = 1;
  133. memcpy(&opcode, data, sizeof(int8));
  134. }
  135. //cout << "After:\n";
  136. //DumpPacket(data, size);
  137. return new EQProtocolPacket(opcode, data+offset, size - offset);
  138. }
  139. EQProtocolPacket* EQStream::ProcessEncryptedPacket(EQProtocolPacket *p){
  140. EQProtocolPacket* ret = NULL;
  141. if(p->opcode == OP_Packet && p->size > 2)
  142. ret = ProcessEncryptedData(p->pBuffer+2, p->size-2, p->opcode);
  143. else
  144. ret = ProcessEncryptedData(p->pBuffer, p->size, p->opcode);
  145. return ret;
  146. }
  147. bool EQStream::HandleEmbeddedPacket(EQProtocolPacket *p, int16 offset, int16 length){
  148. if(p && p->size >= ((uint32)(offset+2))){
  149. if(p->pBuffer[offset] == 0 && p->pBuffer[offset+1] == 0x19){
  150. if(length == 0)
  151. length = p->size-2-offset;
  152. else
  153. length-=2;
  154. #ifdef LE_DEBUG
  155. LogWrite(PACKET__DEBUG, 0, "Packet", "Creating OP_AppCombined Packet!");
  156. #endif
  157. EQProtocolPacket *subp=new EQProtocolPacket(OP_AppCombined, p->pBuffer+2+offset, length);
  158. subp->copyInfo(p);
  159. ProcessPacket(subp);
  160. safe_delete(subp);
  161. return true;
  162. }
  163. else if(p->pBuffer[offset] == 0 && p->pBuffer[offset+1] == 0){
  164. if(length == 0)
  165. length = p->size-1-offset;
  166. else
  167. length--;
  168. #ifdef LE_DEBUG
  169. LogWrite(PACKET__DEBUG, 0, "Packet", "Creating Opcode 0 Packet!");
  170. DumpPacket(p->pBuffer+1+offset, length);
  171. #endif
  172. EQProtocolPacket* newpacket = ProcessEncryptedData(p->pBuffer+1+offset, length, OP_Packet);
  173. if(newpacket){
  174. #ifdef LE_DEBUG
  175. LogWrite(PACKET__DEBUG, 0, "Packet", "Result: ");
  176. DumpPacket(newpacket);
  177. #endif
  178. EQApplicationPacket *ap = newpacket->MakeApplicationPacket(2);
  179. InboundQueuePush(ap);
  180. safe_delete(newpacket);
  181. }
  182. else
  183. LogWrite(PACKET__ERROR, 0, "Packet", "No Packet!");
  184. return true;
  185. }
  186. }
  187. return false;
  188. }
  189. void EQStream::ProcessPacket(EQProtocolPacket *p)
  190. {
  191. uint32 processed=0,subpacket_length=0;
  192. if (p) {
  193. if (p->opcode!=OP_SessionRequest && p->opcode!=OP_SessionResponse && !Session) {
  194. #ifdef EQN_DEBUG
  195. LogWrite(PACKET__ERROR, 0, "Packet", "*** Session not initialized, packet ignored ");
  196. //p->DumpRaw();
  197. #endif
  198. return;
  199. }
  200. //cout << "Received " << (int)p->opcode << ":\n";
  201. //DumpPacket(p->pBuffer, p->size);
  202. switch (p->opcode) {
  203. case OP_Combined: {
  204. processed=0;
  205. int8 offset = 0;
  206. int count = 0;
  207. #ifdef LE_DEBUG
  208. LogWrite(PACKET__DEBUG, 0, "Packet", "OP_Combined: ");
  209. DumpPacket(p);
  210. #endif
  211. while(processed<p->size) {
  212. if ((subpacket_length=(unsigned char)*(p->pBuffer+processed))==0xff) {
  213. subpacket_length=ntohs(*(uint16 *)(p->pBuffer+processed+1));
  214. offset = 3;
  215. }
  216. else
  217. offset = 1;
  218. count++;
  219. #ifdef LE_DEBUG
  220. LogWrite(PACKET__DEBUG, 0, "Packet", "OP_Combined Packet %i (%u) (%u): ", count, subpacket_length, processed);
  221. #endif
  222. EQProtocolPacket *subp=new EQProtocolPacket(p->pBuffer+processed+offset,subpacket_length);
  223. subp->copyInfo(p);
  224. #ifdef LE_DEBUG
  225. LogWrite(PACKET__DEBUG, 0, "Packet", "Opcode %i:", subp->opcode);
  226. DumpPacket(subp);
  227. #endif
  228. ProcessPacket(subp);
  229. #ifdef LE_DEBUG
  230. DumpPacket(subp);
  231. #endif
  232. delete subp;
  233. processed+=subpacket_length+offset;
  234. }
  235. break;
  236. }
  237. case OP_AppCombined: {
  238. processed=0;
  239. EQProtocolPacket* newpacket = 0;
  240. int8 offset = 0;
  241. #ifdef LE_DEBUG
  242. LogWrite(PACKET__DEBUG, 0, "Packet", "OP_AppCombined: ");
  243. DumpPacket(p);
  244. #endif
  245. int count = 0;
  246. while(processed<p->size) {
  247. count++;
  248. if ((subpacket_length=(unsigned char)*(p->pBuffer+processed))==0xff) {
  249. subpacket_length=ntohs(*(uint16 *)(p->pBuffer+processed+1));
  250. offset = 3;
  251. } else
  252. offset = 1;
  253. if(crypto->getRC4Key()==0 && p->size >= 70){
  254. processRSAKey(p);
  255. }
  256. else if(crypto->isEncrypted()){
  257. #ifdef LE_DEBUG
  258. LogWrite(PACKET__DEBUG, 0, "Packet", "OP_AppCombined Packet %i (%u) (%u): ", count, subpacket_length, processed);
  259. DumpPacket(p->pBuffer+processed+offset, subpacket_length);
  260. #endif
  261. if(!HandleEmbeddedPacket(p, processed + offset, subpacket_length)){
  262. #ifdef LE_DEBUG
  263. LogWrite(PACKET__DEBUG, 0, "Packet", "OP_AppCombined Here:");
  264. #endif
  265. newpacket = ProcessEncryptedData(p->pBuffer+processed + offset, subpacket_length, OP_AppCombined);
  266. if(newpacket){
  267. #ifdef LE_DEBUG
  268. LogWrite(PACKET__DEBUG, 0, "Packet", "Opcode %i:", newpacket->opcode);
  269. DumpPacket(newpacket);
  270. #endif
  271. EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2);
  272. #ifdef LE_DEBUG
  273. LogWrite(PACKET__DEBUG, 0, "Packet", "OP_AppCombined Here2:");
  274. DumpPacket(ap);
  275. #endif
  276. InboundQueuePush(ap);
  277. safe_delete(newpacket);
  278. }
  279. }
  280. }
  281. processed+=subpacket_length+offset;
  282. }
  283. }
  284. break;
  285. case OP_Packet: {
  286. if (!p->pBuffer || (p->Size() < 4))
  287. {
  288. break;
  289. }
  290. uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
  291. sint8 check=CompareSequence(NextInSeq,seq);
  292. if (check>0) {
  293. #ifdef EQN_DEBUG
  294. LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
  295. LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
  296. p->DumpRawHeader(seq);
  297. LogWrite(PACKET__DEBUG, 1, "Packet", "[End]");
  298. #endif
  299. OutOfOrderpackets[seq] = p->Copy();
  300. // Image (2020): Removed as this is bad contributes to infinite loop
  301. //SendOutOfOrderAck(seq);
  302. } else if (check<0) {
  303. #ifdef EQN_DEBUG
  304. LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
  305. LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
  306. p->DumpRawHeader(seq);
  307. LogWrite(PACKET__DEBUG, 1, "Packet", "[End]");
  308. #endif
  309. // Image (2020): Removed as this is bad contributes to infinite loop
  310. //OutOfOrderpackets[seq] = p->Copy();
  311. SendOutOfOrderAck(seq);
  312. } else {
  313. EQProtocolPacket* qp = RemoveQueue(seq);
  314. if (qp) {
  315. LogWrite(PACKET__DEBUG, 1, "Packet", "OP_Fragment: Removing older queued packet with sequence %i", seq);
  316. delete qp;
  317. }
  318. SetNextAckToSend(seq);
  319. NextInSeq++;
  320. if(HandleEmbeddedPacket(p))
  321. break;
  322. if(crypto->getRC4Key()==0 && p && p->size >= 70){
  323. processRSAKey(p);
  324. }
  325. else if(crypto->isEncrypted() && p){
  326. EQProtocolPacket* newpacket = ProcessEncryptedPacket(p);
  327. if(newpacket){
  328. EQApplicationPacket *ap = newpacket->MakeApplicationPacket(2);
  329. InboundQueuePush(ap);
  330. safe_delete(newpacket);
  331. }
  332. }
  333. }
  334. }
  335. break;
  336. case OP_Fragment: {
  337. if (!p->pBuffer || (p->Size() < 4))
  338. {
  339. break;
  340. }
  341. uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
  342. sint8 check=CompareSequence(NextInSeq,seq);
  343. if (check>0) {
  344. #ifdef EQN_DEBUG
  345. LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
  346. LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
  347. //p->DumpRawHeader(seq);
  348. LogWrite(PACKET__DEBUG, 1, "Packet", "[End]");
  349. #endif
  350. OutOfOrderpackets[seq] = p->Copy();
  351. //SendOutOfOrderAck(seq);
  352. } else if (check<0) {
  353. #ifdef EQN_DEBUG
  354. LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
  355. LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
  356. //p->DumpRawHeader(seq);
  357. LogWrite(PACKET__DEBUG, 1, "Packet", "[End]");
  358. #endif
  359. //OutOfOrderpackets[seq] = p->Copy();
  360. SendOutOfOrderAck(seq);
  361. } else {
  362. // In case we did queue one before as well.
  363. EQProtocolPacket* qp = RemoveQueue(seq);
  364. if (qp) {
  365. LogWrite(PACKET__DEBUG, 1, "Packet", "OP_Fragment: Removing older queued packet with sequence %i", seq);
  366. delete qp;
  367. }
  368. SetNextAckToSend(seq);
  369. NextInSeq++;
  370. if (oversize_buffer) {
  371. memcpy(oversize_buffer+oversize_offset,p->pBuffer+2,p->size-2);
  372. oversize_offset+=p->size-2;
  373. //cout << "Oversized is " << oversize_offset << "/" << oversize_length << " (" << (p->size-2) << ") Seq=" << seq << endl;
  374. if (oversize_offset==oversize_length) {
  375. if (*(p->pBuffer+2)==0x00 && *(p->pBuffer+3)==0x19) {
  376. EQProtocolPacket *subp=new EQProtocolPacket(oversize_buffer,oversize_offset);
  377. subp->copyInfo(p);
  378. ProcessPacket(subp);
  379. delete subp;
  380. } else {
  381. if(crypto->isEncrypted() && p && p->size > 2){
  382. EQProtocolPacket* p2 = ProcessEncryptedData(oversize_buffer, oversize_offset, p->opcode);
  383. EQApplicationPacket *ap = p2->MakeApplicationPacket(2);
  384. ap->copyInfo(p);
  385. InboundQueuePush(ap);
  386. safe_delete(p2);
  387. }
  388. }
  389. delete[] oversize_buffer;
  390. oversize_buffer=NULL;
  391. oversize_offset=0;
  392. }
  393. } else if (!oversize_buffer) {
  394. oversize_length=ntohl(*(uint32 *)(p->pBuffer+2));
  395. oversize_buffer=new unsigned char[oversize_length];
  396. memcpy(oversize_buffer,p->pBuffer+6,p->size-6);
  397. oversize_offset=p->size-6;
  398. //cout << "Oversized is " << oversize_offset << "/" << oversize_length << " (" << (p->size-6) << ") Seq=" << seq << endl;
  399. }
  400. }
  401. }
  402. break;
  403. case OP_KeepAlive: {
  404. #ifndef COLLECTOR
  405. NonSequencedPush(new EQProtocolPacket(p->opcode,p->pBuffer,p->size));
  406. #endif
  407. }
  408. break;
  409. case OP_Ack: {
  410. if (!p->pBuffer || (p->Size() < 4))
  411. {
  412. break;
  413. }
  414. uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
  415. SetMaxAckReceived(seq);
  416. }
  417. break;
  418. case OP_SessionRequest: {
  419. if (p->Size() < sizeof(SessionRequest))
  420. {
  421. break;
  422. }
  423. if (GetState() == ESTABLISHED) {
  424. //_log(NET__ERROR, _L "Received OP_SessionRequest in ESTABLISHED state (%d) streamactive (%i) attempt (%i)" __L, GetState(), streamactive, sessionAttempts);
  425. // client seems to try a max of 4 times (initial +3 retries) then gives up, giving it a few more attempts just in case
  426. // streamactive means we identified the opcode, we cannot re-establish this connection
  427. if (streamactive || (sessionAttempts > 30))
  428. {
  429. SendDisconnect(false);
  430. SetState(CLOSED);
  431. break;
  432. }
  433. }
  434. sessionAttempts++;
  435. init(GetState() != ESTABLISHED);
  436. OutboundQueueClear();
  437. SessionRequest *Request=(SessionRequest *)p->pBuffer;
  438. Session=ntohl(Request->Session);
  439. SetMaxLen(ntohl(Request->MaxLength));
  440. #ifndef COLLECTOR
  441. NextInSeq=0;
  442. Key=0x33624702;
  443. SendSessionResponse();
  444. #endif
  445. SetState(ESTABLISHED);
  446. }
  447. break;
  448. case OP_SessionResponse: {
  449. if (p->Size() < sizeof(SessionResponse))
  450. {
  451. break;
  452. }
  453. init();
  454. OutboundQueueClear();
  455. SetActive(true);
  456. SessionResponse *Response=(SessionResponse *)p->pBuffer;
  457. SetMaxLen(ntohl(Response->MaxLength));
  458. Key=ntohl(Response->Key);
  459. NextInSeq=0;
  460. SetState(ESTABLISHED);
  461. if (!Session)
  462. Session=ntohl(Response->Session);
  463. compressed=(Response->Format&FLAG_COMPRESSED);
  464. encoded=(Response->Format&FLAG_ENCODED);
  465. // Kinda kludgy, but trie for now
  466. if (compressed) {
  467. if (remote_port==9000 || (remote_port==0 && p->src_port==9000))
  468. SetStreamType(WorldStream);
  469. else
  470. SetStreamType(ZoneStream);
  471. } else if (encoded)
  472. SetStreamType(ChatOrMailStream);
  473. else
  474. SetStreamType(LoginStream);
  475. }
  476. break;
  477. case OP_SessionDisconnect: {
  478. //NextInSeq=0;
  479. SendDisconnect();
  480. //SetState(CLOSED);
  481. }
  482. break;
  483. case OP_OutOfOrderAck: {
  484. if (!p->pBuffer || (p->Size() < 4))
  485. {
  486. break;
  487. }
  488. #ifndef COLLECTOR
  489. uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
  490. if (CompareSequence(GetMaxAckReceived(),seq)>0 && CompareSequence(NextOutSeq,seq) < 0) {
  491. SetLastSeqSent(GetMaxAckReceived());
  492. }
  493. #endif
  494. }
  495. break;
  496. case OP_ServerKeyRequest:{
  497. if (p->Size() < sizeof(ClientSessionStats))
  498. {
  499. //_log(NET__ERROR, _L "Received OP_SessionStatRequest that was of malformed size" __L);
  500. break;
  501. }
  502. ClientSessionStats* Stats = (ClientSessionStats*)p->pBuffer;
  503. int16 request_id = Stats->RequestID;
  504. AdjustRates(ntohl(Stats->average_delta));
  505. ServerSessionStats* stats=(ServerSessionStats*)p->pBuffer;
  506. memset(stats, 0, sizeof(ServerSessionStats));
  507. stats->RequestID = request_id;
  508. stats->current_time = ntohl(Timer::GetCurrentTime2());
  509. stats->sent_packets = ntohl(sent_packets);
  510. stats->sent_packets2 = ntohl(sent_packets);
  511. stats->received_packets = ntohl(received_packets);
  512. stats->received_packets2 = ntohl(received_packets);
  513. NonSequencedPush(new EQProtocolPacket(OP_SessionStatResponse,p->pBuffer,p->size));
  514. if(!crypto->isEncrypted())
  515. SendKeyRequest();
  516. }
  517. break;
  518. case OP_SessionStatResponse: {
  519. LogWrite(PACKET__INFO, 0, "Packet", "OP_SessionStatResponse");
  520. }
  521. break;
  522. case OP_OutOfSession: {
  523. LogWrite(PACKET__INFO, 0, "Packet", "OP_OutOfSession");
  524. SendDisconnect();
  525. }
  526. break;
  527. default:
  528. EQApplicationPacket *ap = p->MakeApplicationPacket(app_opcode_size);
  529. InboundQueuePush(ap);
  530. LogWrite(PACKET__INFO, 0, "Packet", "Received unknown packet type, disconnecting client");
  531. //SendDisconnect();
  532. break;
  533. }
  534. }
  535. }
  536. int8 EQStream::EQ2_Compress(EQ2Packet* app, int8 offset){
  537. #ifdef LE_DEBUG
  538. LogWrite(PACKET__DEBUG, 0, "Packet", "Before Compress in %s, line %i:", __FUNCTION__, __LINE__);
  539. DumpPacket(app);
  540. #endif
  541. uchar* pDataPtr = app->pBuffer + offset;
  542. uchar* deflate_buff = new uchar[app->size];
  543. MCompressData.lock();
  544. stream.next_in = pDataPtr;
  545. stream.avail_in = app->size - offset;
  546. stream.next_out = deflate_buff;
  547. stream.avail_out = app->size;
  548. deflate(&stream, Z_SYNC_FLUSH);
  549. int32 newsize = app->size - stream.avail_out;
  550. safe_delete_array(app->pBuffer);
  551. app->size = newsize + offset;
  552. app->pBuffer = new uchar[app->size];
  553. app->pBuffer[(offset-1)] = 1;
  554. memcpy(app->pBuffer + offset, deflate_buff, newsize);
  555. MCompressData.unlock();
  556. safe_delete_array(deflate_buff);
  557. #ifdef LE_DEBUG
  558. LogWrite(PACKET__DEBUG, 0, "Packet", "After Compress in %s, line %i:", __FUNCTION__, __LINE__);
  559. DumpPacket(app);
  560. #endif
  561. return offset - 1;
  562. }
  563. int16 EQStream::processRSAKey(EQProtocolPacket *p){
  564. /*int16 limit = 0;
  565. int8 offset = 13;
  566. int8 offset2 = 0;
  567. if(p->pBuffer[2] == 0)
  568. limit = p->pBuffer[9];
  569. else{
  570. limit = p->pBuffer[5];
  571. offset2 = 5;
  572. offset-=1;
  573. }
  574. crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + offset + (limit-8), 8));
  575. return (limit + offset +1) - offset2;*/
  576. if(p->pBuffer[0] == 0)
  577. crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + 62, 8));
  578. else
  579. crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + 61, 8));
  580. return 0;
  581. }
  582. void EQStream::SendKeyRequest(){
  583. int32 crypto_key_size = 60;
  584. int16 size = sizeof(KeyGen_Struct) + sizeof(KeyGen_End_Struct) + crypto_key_size;
  585. EQ2Packet *outapp=new EQ2Packet(OP_WSLoginRequestMsg,NULL,size);
  586. memcpy(&outapp->pBuffer[0], &crypto_key_size, sizeof(int32));
  587. memset(&outapp->pBuffer[4], 0xFF, crypto_key_size);
  588. memset(&outapp->pBuffer[size-5], 1, 1);
  589. memset(&outapp->pBuffer[size-1], 1, 1);
  590. EQ2QueuePacket(outapp);
  591. }
  592. void EQStream::EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset){
  593. if(app->size>2 && crypto->isEncrypted()){
  594. app->packet_encrypted = true;
  595. uchar* crypt_buff = app->pBuffer;
  596. if(app->eq2_compressed)
  597. crypto->RC4Encrypt(crypt_buff + compress_offset, app->size - compress_offset);
  598. else
  599. crypto->RC4Encrypt(crypt_buff + 2 + offset, app->size - 2 - offset);
  600. }
  601. }
  602. void EQStream::EQ2QueuePacket(EQ2Packet* app, bool attempted_combine){
  603. if(CheckActive()){
  604. if(app->size < 600 && !attempted_combine){
  605. MCombineQueueLock.lock();
  606. combine_queue.push_back(app);
  607. MCombineQueueLock.unlock();
  608. }
  609. else{
  610. PreparePacket(app);
  611. #ifdef LE_DEBUG
  612. LogWrite(PACKET__DEBUG, 0, "Packet", "After B in %s, line %i:", __FUNCTION__, __LINE__);
  613. DumpPacket(app);
  614. #endif
  615. SendPacket(app);
  616. }
  617. }
  618. }
  619. void EQStream::UnPreparePacket(EQ2Packet* app){
  620. if(app->pBuffer[2] == 0 && app->pBuffer[3] == 19){
  621. uchar* new_buffer = new uchar[app->size-3];
  622. memcpy(new_buffer+2, app->pBuffer+5, app->size-3);
  623. delete[] app->pBuffer;
  624. app->size-=3;
  625. app->pBuffer = new_buffer;
  626. }
  627. }
  628. void EQStream::PreparePacket(EQ2Packet* app, int8 offset){
  629. app->setVersion(client_version);
  630. compressed_offset = 0;
  631. #ifdef LE_DEBUG
  632. LogWrite(PACKET__DEBUG, 0, "Packet", "Before A in %s, line %i:", __FUNCTION__, __LINE__);
  633. DumpPacket(app);
  634. #endif
  635. if(!app->packet_prepared){
  636. if(app->PreparePacket(MaxLen) == 255) //invalid version
  637. return;
  638. }
  639. #ifdef LE_DEBUG
  640. LogWrite(PACKET__DEBUG, 0, "Packet", "After Prepare in %s, line %i:", __FUNCTION__, __LINE__);
  641. DumpPacket(app);
  642. #endif
  643. if(!app->eq2_compressed && app->size>=0x80){
  644. compressed_offset = EQ2_Compress(app);
  645. app->eq2_compressed = true;
  646. }
  647. if(!app->packet_encrypted){
  648. EncryptPacket(app, compressed_offset, offset);
  649. if(app->size > 2 && app->pBuffer[2] == 0){
  650. uchar* new_buffer = new uchar[app->size+1];
  651. new_buffer[2] = 0;
  652. memcpy(new_buffer+3, app->pBuffer+2, app->size-2);
  653. delete[] app->pBuffer;
  654. app->pBuffer = new_buffer;
  655. app->size++;
  656. }
  657. }
  658. #ifdef LE_DEBUG
  659. LogWrite(PACKET__DEBUG, 0, "Packet", "After A in %s, line %i:", __FUNCTION__, __LINE__);
  660. DumpPacket(app);
  661. #endif
  662. }
  663. void EQStream::SendPacket(EQProtocolPacket *p)
  664. {
  665. uint32 chunksize,used;
  666. uint32 length;
  667. // Convert the EQApplicationPacket to 1 or more EQProtocolPackets
  668. if (p->size>( MaxLen-8)) { // proto-op(2), seq(2), app-op(2) ... data ... crc(2)
  669. uchar* tmpbuff=p->pBuffer;
  670. length=p->size - 2;
  671. EQProtocolPacket *out=new EQProtocolPacket(OP_Fragment,NULL,MaxLen-4);
  672. *(uint32 *)(out->pBuffer+2)=htonl(length);
  673. used=MaxLen-10;
  674. memcpy(out->pBuffer+6,tmpbuff+2,used);
  675. #ifdef LE_DEBUG
  676. LogWrite(PACKET__DEBUG, 0, "Packet", "(%s, %i) New Fragment: ", __FUNCTION__, __LINE__);
  677. DumpPacket(out);
  678. #endif
  679. SequencedPush(out);
  680. while (used<length) {
  681. chunksize=min(length-used,MaxLen-6);
  682. out=new EQProtocolPacket(OP_Fragment,NULL,chunksize+2);
  683. //memcpy(out->pBuffer+2,tmpbuff,1);
  684. memcpy(out->pBuffer+2,tmpbuff+used+2,chunksize);
  685. #ifdef LE_DEBUG
  686. LogWrite(PACKET__DEBUG, 0, "Packet", "Chunk: ");
  687. DumpPacket(out);
  688. #endif
  689. SequencedPush(out);
  690. used+=chunksize;
  691. }
  692. #ifdef LE_DEBUG
  693. LogWrite(PACKET__DEBUG, 0, "Packet", "Chunk: ");
  694. DumpPacket(out);
  695. cerr << "1: Deleting 0x" << hex << (uint32)(p) << dec << endl;
  696. #endif
  697. delete p;
  698. } else {
  699. SequencedPush(p);
  700. }
  701. }
  702. void EQStream::SendPacket(EQApplicationPacket *p)
  703. {
  704. uint32 chunksize,used;
  705. uint32 length;
  706. // Convert the EQApplicationPacket to 1 or more EQProtocolPackets
  707. if (p->size>(MaxLen-8)) { // proto-op(2), seq(2), app-op(2) ... data ... crc(2)
  708. //cout << "Making oversized packet for: " << endl;
  709. //cout << p->size << endl;
  710. //p->DumpRawHeader();
  711. //dump_message(p->pBuffer,p->size,timestamp());
  712. //cout << p->size << endl;
  713. unsigned char *tmpbuff=new unsigned char[p->size+2];
  714. //cout << hex << (int)tmpbuff << dec << endl;
  715. length=p->serialize(tmpbuff);
  716. EQProtocolPacket *out=new EQProtocolPacket(OP_Fragment,NULL,MaxLen-4);
  717. *(uint32 *)(out->pBuffer+2)=htonl(p->Size());
  718. memcpy(out->pBuffer+6,tmpbuff,MaxLen-10);
  719. used=MaxLen-10;
  720. SequencedPush(out);
  721. //cout << "Chunk #" << ++i << " size=" << used << ", length-used=" << (length-used) << endl;
  722. while (used<length) {
  723. out=new EQProtocolPacket(OP_Fragment,NULL,MaxLen-4);
  724. chunksize=min(length-used,MaxLen-6);
  725. memcpy(out->pBuffer+2,tmpbuff+used,chunksize);
  726. out->size=chunksize+2;
  727. SequencedPush(out);
  728. used+=chunksize;
  729. //cout << "Chunk #"<< ++i << " size=" << chunksize << ", length-used=" << (length-used) << endl;
  730. }
  731. //cerr << "1: Deleting 0x" << hex << (uint32)(p) << dec << endl;
  732. delete p;
  733. delete[] tmpbuff;
  734. } else {
  735. EQProtocolPacket *out=new EQProtocolPacket(OP_Packet,NULL,p->Size()+2);
  736. p->serialize(out->pBuffer+2);
  737. SequencedPush(out);
  738. //cerr << "2: Deleting 0x" << hex << (uint32)(p) << dec << endl;
  739. delete p;
  740. }
  741. }
  742. void EQStream::SequencedPush(EQProtocolPacket *p)
  743. {
  744. p->setVersion(client_version);
  745. MOutboundQueue.lock();
  746. *(uint16 *)(p->pBuffer)=htons(NextOutSeq);
  747. SequencedQueue.push_back(p);
  748. p->sequence = NextOutSeq;
  749. NextOutSeq++;
  750. MOutboundQueue.unlock();
  751. }
  752. void EQStream::NonSequencedPush(EQProtocolPacket *p)
  753. {
  754. p->setVersion(client_version);
  755. MOutboundQueue.lock();
  756. NonSequencedQueue.push_back(p);
  757. MOutboundQueue.unlock();
  758. }
  759. void EQStream::SendAck(uint16 seq)
  760. {
  761. uint16 Seq=htons(seq);
  762. SetLastAckSent(seq);
  763. NonSequencedPush(new EQProtocolPacket(OP_Ack,(unsigned char *)&Seq,sizeof(uint16)));
  764. }
  765. void EQStream::SendOutOfOrderAck(uint16 seq)
  766. {
  767. uint16 Seq=htons(seq);
  768. NonSequencedPush(new EQProtocolPacket(OP_OutOfOrderAck,(unsigned char *)&Seq,sizeof(uint16)));
  769. }
  770. bool EQStream::CheckCombineQueue(){
  771. bool ret = true; //processed all packets
  772. MCombineQueueLock.lock();
  773. if(combine_queue.size() > 0){
  774. EQ2Packet* first = combine_queue.front();
  775. combine_queue.pop_front();
  776. if(combine_queue.size() == 0){ //nothing to combine this with
  777. EQ2QueuePacket(first, true);
  778. }
  779. else{
  780. PreparePacket(first);
  781. EQ2Packet* second = 0;
  782. bool combine_worked = false;
  783. int16 count = 0;
  784. while(combine_queue.size()){
  785. count++;
  786. second = combine_queue.front();
  787. combine_queue.pop_front();
  788. MCombineQueueLock.unlock();
  789. PreparePacket(second);
  790. /*if(first->GetRawOpcode() != OP_AppCombined && first->pBuffer[2] == 0){
  791. EQ2Packet* tmp = second;
  792. second = first;
  793. first = tmp;
  794. }*/
  795. if(!first->AppCombine(second)){
  796. first->SetProtocolOpcode(OP_Packet);
  797. if(combine_worked){
  798. SequencedPush(first);
  799. }
  800. else{
  801. EQ2QueuePacket(first, true);
  802. }
  803. first = second;
  804. combine_worked = false;
  805. }
  806. else{
  807. combine_worked = true;
  808. //DumpPacket(first);
  809. }
  810. MCombineQueueLock.lock();
  811. if(count >= 20){ //other clients need packets too
  812. ret = false;
  813. break;
  814. }
  815. }
  816. if(first){
  817. first->SetProtocolOpcode(OP_Packet);
  818. if(combine_worked){
  819. SequencedPush(first);
  820. }
  821. else{
  822. EQ2QueuePacket(first, true);
  823. }
  824. }
  825. }
  826. }
  827. MCombineQueueLock.unlock();
  828. return ret;
  829. }
  830. void EQStream::CheckResend(int eq_fd){
  831. int32 curr = Timer::GetCurrentTime2();
  832. EQProtocolPacket* packet = 0;
  833. deque<EQProtocolPacket*>::iterator itr;
  834. MResendQue.lock();
  835. for(itr=resend_que.begin();itr!=resend_que.end();itr++){
  836. packet = *itr;
  837. if(packet->attempt_count >= 5){//tried to resend this packet 5 times, client must already have it but didnt ack it
  838. safe_delete(packet);
  839. itr = resend_que.erase(itr);
  840. if(itr == resend_que.end())
  841. break;
  842. }
  843. else{
  844. if((curr - packet->sent_time) < 1000)
  845. continue;
  846. packet->sent_time -=1000;
  847. packet->attempt_count++;
  848. WritePacket(eq_fd, packet);
  849. }
  850. }
  851. MResendQue.unlock();
  852. }
  853. void EQStream::Write(int eq_fd)
  854. {
  855. deque<EQProtocolPacket *> ReadyToSend;
  856. deque<EQProtocolPacket *> SeqReadyToSend;
  857. long maxack;
  858. map<uint16, EQProtocolPacket *>::iterator sitr;
  859. // Check our rate to make sure we can send more
  860. MRate.lock();
  861. sint32 threshold=RateThreshold;
  862. MRate.unlock();
  863. if (BytesWritten > threshold) {
  864. //cout << "Over threshold: " << BytesWritten << " > " << threshold << endl;
  865. return;
  866. }
  867. MCombinedAppPacket.lock();
  868. EQApplicationPacket *CombPack=CombinedAppPacket;
  869. CombinedAppPacket=NULL;
  870. MCombinedAppPacket.unlock();
  871. if (CombPack) {
  872. SendPacket(CombPack);
  873. }
  874. // If we got more packets to we need to ack, send an ack on the highest one
  875. MAcks.lock();
  876. maxack=MaxAckReceived;
  877. // Added from peaks findings
  878. if (NextAckToSend>LastAckSent || LastAckSent == 0x0000ffff)
  879. SendAck(NextAckToSend);
  880. MAcks.unlock();
  881. // Lock the outbound queues while we process
  882. MOutboundQueue.lock();
  883. // Adjust where we start sending in case we get a late ack
  884. if (maxack>LastSeqSent)
  885. LastSeqSent=maxack;
  886. // Place to hold the base packet t combine into
  887. EQProtocolPacket *p=NULL;
  888. // Loop until both are empty or MaxSends is reached
  889. // See if there are more non-sequenced packets left
  890. while (NonSequencedQueue.size()) {
  891. if (!p) {
  892. // If we don't have a packet to try to combine into, use this one as the base
  893. // And remove it form the queue
  894. p = NonSequencedQueue.front();
  895. NonSequencedQueue.pop_front();
  896. }
  897. //Check if we have a packet to combine p with...
  898. if (NonSequencedQueue.size()){
  899. if (!p->combine(NonSequencedQueue.front())) {
  900. // Tryint to combine this packet with the base didn't work (too big maybe)
  901. // So just send the base packet (we'll try this packet again later)
  902. ReadyToSend.push_back(p);
  903. BytesWritten += p->size;
  904. p = NULL;
  905. }
  906. else {
  907. // Combine worked, so just remove this packet and it's spot in the queue
  908. delete NonSequencedQueue.front();
  909. NonSequencedQueue.pop_front();
  910. }
  911. }
  912. else {
  913. //We have no packets to combine p with so just send it...
  914. ReadyToSend.push_back(p);
  915. BytesWritten += p->size;
  916. p = NULL;
  917. }
  918. if (BytesWritten > threshold) {
  919. // Sent enough this round, lets stop to be fair
  920. break;
  921. }
  922. }
  923. //The non-seq loop must have broke before we sent this packet, send it now
  924. if (p){
  925. ReadyToSend.push_back(p);
  926. BytesWritten += p->size;
  927. }
  928. if (SequencedQueue.size() && BytesWritten < threshold) {
  929. while(SequencedQueue.size()) {
  930. p = SequencedQueue.front();
  931. BytesWritten+=p->size;
  932. SeqReadyToSend.push_back(p);
  933. p->sent_time = Timer::GetCurrentTime2();
  934. resend_que.push_back(p);
  935. SequencedQueue.pop_front();
  936. LastSeqSent=p->sequence;
  937. if (BytesWritten > threshold) {
  938. break;
  939. }
  940. }
  941. }
  942. // Unlock the queue
  943. MOutboundQueue.unlock();
  944. // Send all the packets we "made"
  945. while(ReadyToSend.size()) {
  946. WritePacket(eq_fd,ReadyToSend.front());
  947. delete ReadyToSend.front();
  948. ReadyToSend.pop_front();
  949. }
  950. while(SeqReadyToSend.size()){
  951. WritePacket(eq_fd,SeqReadyToSend.front());
  952. SeqReadyToSend.pop_front();
  953. }
  954. }
  955. void EQStream::WritePacket(int eq_fd, EQProtocolPacket *p)
  956. {
  957. uint32 length = 0;
  958. sockaddr_in address;
  959. unsigned char tmpbuffer[1024];
  960. address.sin_family = AF_INET;
  961. address.sin_addr.s_addr=remote_ip;
  962. address.sin_port=remote_port;
  963. #ifdef NOWAY
  964. uint32 ip=address.sin_addr.s_addr;
  965. cout << "Sending to: "
  966. << (int)*(unsigned char *)&ip
  967. << "." << (int)*((unsigned char *)&ip+1)
  968. << "." << (int)*((unsigned char *)&ip+2)
  969. << "." << (int)*((unsigned char *)&ip+3)
  970. << "," << (int)ntohs(address.sin_port) << "(" << p->size << ")" << endl;
  971. p->DumpRaw();
  972. cout << "-------------" << endl;
  973. #endif
  974. length=p->serialize(buffer);
  975. if (p->opcode!=OP_SessionRequest && p->opcode!=OP_SessionResponse) {
  976. if (compressed) {
  977. uint32 newlen=EQProtocolPacket::Compress(buffer,length,tmpbuffer,1024);
  978. memcpy(buffer,tmpbuffer,newlen);
  979. length=newlen;
  980. }
  981. if (encoded) {
  982. EQProtocolPacket::ChatEncode(buffer,length,Key);
  983. }
  984. *(uint16 *)(buffer+length)=htons(CRC16(buffer,length,Key));
  985. length+=2;
  986. }
  987. sent_packets++;
  988. //dump_message_column(buffer,length,"Writer: ");
  989. //cout << "Raw Data:\n";
  990. //DumpPacket(buffer, length);
  991. sendto(eq_fd,(char *)buffer,length,0,(sockaddr *)&address,sizeof(address));
  992. }
  993. EQProtocolPacket *EQStream::Read(int eq_fd, sockaddr_in *from)
  994. {
  995. int socklen;
  996. int length=0;
  997. unsigned char buffer[2048];
  998. EQProtocolPacket *p=NULL;
  999. char temp[15];
  1000. socklen=sizeof(sockaddr);
  1001. #ifdef WIN32
  1002. length=recvfrom(eq_fd, (char *)buffer, 2048, 0, (struct sockaddr*)from, (int *)&socklen);
  1003. #else
  1004. length=recvfrom(eq_fd, buffer, 2048, 0, (struct sockaddr*)from, (socklen_t *)&socklen);
  1005. #endif
  1006. if (length>=2) {
  1007. DumpPacket(buffer, length);
  1008. p=new EQProtocolPacket(buffer[1],&buffer[2],length-2);
  1009. uint32 ip=from->sin_addr.s_addr;
  1010. sprintf(temp,"%d.%d.%d.%d:%d",
  1011. *(unsigned char *)&ip,
  1012. *((unsigned char *)&ip+1),
  1013. *((unsigned char *)&ip+2),
  1014. *((unsigned char *)&ip+3),
  1015. ntohs(from->sin_port));
  1016. //cout << timestamp() << "Data from: " << temp << " OpCode 0x" << hex << setw(2) << setfill('0') << (int)p->opcode << dec << endl;
  1017. //dump_message(p->pBuffer,p->size,timestamp());
  1018. }
  1019. return p;
  1020. }
  1021. void EQStream::SendSessionResponse()
  1022. {
  1023. EQProtocolPacket *out=new EQProtocolPacket(OP_SessionResponse,NULL,sizeof(SessionResponse));
  1024. SessionResponse *Response=(SessionResponse *)out->pBuffer;
  1025. Response->Session=htonl(Session);
  1026. Response->MaxLength=htonl(MaxLen);
  1027. Response->UnknownA=2;
  1028. Response->Format=0;
  1029. if (compressed)
  1030. Response->Format|=FLAG_COMPRESSED;
  1031. if (encoded)
  1032. Response->Format|=FLAG_ENCODED;
  1033. Response->Key=htonl(Key);
  1034. out->size=sizeof(SessionResponse);
  1035. NonSequencedPush(out);
  1036. }
  1037. void EQStream::SendSessionRequest()
  1038. {
  1039. EQProtocolPacket *out=new EQProtocolPacket(OP_SessionRequest,NULL,sizeof(SessionRequest));
  1040. SessionRequest *Request=(SessionRequest *)out->pBuffer;
  1041. memset(Request,0,sizeof(SessionRequest));
  1042. Request->Session=htonl(time(NULL));
  1043. Request->MaxLength=htonl(512);
  1044. NonSequencedPush(out);
  1045. }
  1046. void EQStream::SendDisconnect(bool setstate)
  1047. {
  1048. try{
  1049. if(GetState() != ESTABLISHED)
  1050. return;
  1051. EQProtocolPacket *out=new EQProtocolPacket(OP_SessionDisconnect,NULL,sizeof(uint32)+sizeof(int16));
  1052. *(uint32 *)out->pBuffer=htonl(Session);
  1053. out->pBuffer[4] = 0;
  1054. out->pBuffer[5] = 6;
  1055. NonSequencedPush(out);
  1056. if(setstate)
  1057. SetState(CLOSING);
  1058. }
  1059. catch(...){}
  1060. }
  1061. void EQStream::InboundQueuePush(EQApplicationPacket *p)
  1062. {
  1063. MInboundQueue.lock();
  1064. InboundQueue.push_back(p);
  1065. MInboundQueue.unlock();
  1066. }
  1067. EQApplicationPacket *EQStream::PopPacket()
  1068. {
  1069. EQApplicationPacket *p=NULL;
  1070. MInboundQueue.lock();
  1071. if (InboundQueue.size()) {
  1072. p=InboundQueue.front();
  1073. InboundQueue.pop_front();
  1074. }
  1075. MInboundQueue.unlock();
  1076. if(p)
  1077. p->setVersion(client_version);
  1078. return p;
  1079. }
  1080. void EQStream::InboundQueueClear()
  1081. {
  1082. MInboundQueue.lock();
  1083. while(InboundQueue.size()){
  1084. delete InboundQueue.front();
  1085. InboundQueue.pop_front();
  1086. }
  1087. MInboundQueue.unlock();
  1088. }
  1089. void EQStream::EncryptPacket(uchar* data, int16 size){
  1090. if(size>6){
  1091. }
  1092. }
  1093. bool EQStream::HasOutgoingData()
  1094. {
  1095. bool flag;
  1096. //once closed, we have nothing more to say
  1097. if(CheckClosed())
  1098. return(false);
  1099. MOutboundQueue.lock();
  1100. flag=(!NonSequencedQueue.empty());
  1101. if (!flag) {
  1102. flag = (!SequencedQueue.empty());
  1103. }
  1104. MOutboundQueue.unlock();
  1105. if (!flag) {
  1106. MAcks.lock();
  1107. flag= (NextAckToSend>LastAckSent);
  1108. MAcks.unlock();
  1109. }
  1110. if (!flag) {
  1111. MCombinedAppPacket.lock();
  1112. flag=(CombinedAppPacket!=NULL);
  1113. MCombinedAppPacket.unlock();
  1114. }
  1115. return flag;
  1116. }
  1117. void EQStream::OutboundQueueClear()
  1118. {
  1119. MOutboundQueue.lock();
  1120. while(NonSequencedQueue.size()) {
  1121. delete NonSequencedQueue.front();
  1122. NonSequencedQueue.pop_front();
  1123. }
  1124. while(SequencedQueue.size()) {
  1125. delete SequencedQueue.front();
  1126. SequencedQueue.pop_front();
  1127. }
  1128. MOutboundQueue.unlock();
  1129. }
  1130. void EQStream::Process(const unsigned char *buffer, const uint32 length)
  1131. {
  1132. received_packets++;
  1133. static unsigned char newbuffer[2048];
  1134. uint32 newlength=0;
  1135. if (EQProtocolPacket::ValidateCRC(buffer,length,Key)) {
  1136. if (compressed) {
  1137. newlength=EQProtocolPacket::Decompress(buffer,length,newbuffer,2048);
  1138. } else {
  1139. memcpy(newbuffer,buffer,length);
  1140. newlength=length;
  1141. if (encoded)
  1142. EQProtocolPacket::ChatDecode(newbuffer,newlength-2,Key);
  1143. }
  1144. if (buffer[1]!=0x01 && buffer[1]!=0x02 && buffer[1]!=0x1d)
  1145. newlength-=2;
  1146. EQProtocolPacket p(newbuffer,newlength);
  1147. ProcessPacket(&p);
  1148. ProcessQueue();
  1149. } else {
  1150. #ifdef EQN_DEBUG
  1151. cout << "Incoming packet failed checksum:" <<endl;
  1152. dump_message_column(const_cast<unsigned char *>(buffer),length,"CRC failed: ");
  1153. #endif
  1154. }
  1155. }
  1156. long EQStream::GetMaxAckReceived()
  1157. {
  1158. MAcks.lock();
  1159. long l=MaxAckReceived;
  1160. MAcks.unlock();
  1161. return l;
  1162. }
  1163. long EQStream::GetNextAckToSend()
  1164. {
  1165. MAcks.lock();
  1166. long l=NextAckToSend;
  1167. MAcks.unlock();
  1168. return l;
  1169. }
  1170. long EQStream::GetLastAckSent()
  1171. {
  1172. MAcks.lock();
  1173. long l=LastAckSent;
  1174. MAcks.unlock();
  1175. return l;
  1176. }
  1177. void EQStream::SetMaxAckReceived(uint32 seq)
  1178. {
  1179. deque<EQProtocolPacket *>::iterator itr;
  1180. MAcks.lock();
  1181. MaxAckReceived=seq;
  1182. MAcks.unlock();
  1183. MOutboundQueue.lock();
  1184. if (long(seq) > LastSeqSent)
  1185. LastSeqSent=seq;
  1186. MResendQue.lock();
  1187. EQProtocolPacket* packet = 0;
  1188. for(itr=resend_que.begin();itr!=resend_que.end();itr++){
  1189. packet = *itr;
  1190. if(packet && packet->sequence <= seq){
  1191. safe_delete(packet);
  1192. itr = resend_que.erase(itr);
  1193. if(itr == resend_que.end())
  1194. break;
  1195. }
  1196. }
  1197. MResendQue.unlock();
  1198. MOutboundQueue.unlock();
  1199. }
  1200. void EQStream::SetNextAckToSend(uint32 seq)
  1201. {
  1202. MAcks.lock();
  1203. NextAckToSend=seq;
  1204. MAcks.unlock();
  1205. }
  1206. void EQStream::SetLastAckSent(uint32 seq)
  1207. {
  1208. MAcks.lock();
  1209. LastAckSent=seq;
  1210. MAcks.unlock();
  1211. }
  1212. void EQStream::SetLastSeqSent(uint32 seq)
  1213. {
  1214. MOutboundQueue.lock();
  1215. LastSeqSent=seq;
  1216. MOutboundQueue.unlock();
  1217. }
  1218. void EQStream::SetStreamType(EQStreamType type)
  1219. {
  1220. StreamType=type;
  1221. switch (StreamType) {
  1222. case LoginStream:
  1223. app_opcode_size=1;
  1224. compressed=false;
  1225. encoded=false;
  1226. break;
  1227. case EQ2Stream:
  1228. app_opcode_size=2;
  1229. compressed=false;
  1230. encoded=false;
  1231. break;
  1232. case ChatOrMailStream:
  1233. case ChatStream:
  1234. case MailStream:
  1235. app_opcode_size=1;
  1236. compressed=false;
  1237. encoded=true;
  1238. break;
  1239. case ZoneStream:
  1240. case WorldStream:
  1241. default:
  1242. app_opcode_size=2;
  1243. compressed=true;
  1244. encoded=false;
  1245. break;
  1246. }
  1247. }
  1248. void EQStream::ProcessQueue()
  1249. {
  1250. if (OutOfOrderpackets.empty()) {
  1251. return;
  1252. }
  1253. EQProtocolPacket* qp = NULL;
  1254. while ((qp = RemoveQueue(NextInSeq)) != NULL) {
  1255. //_log(NET__DEBUG, _L "Processing Queued Packet: Seq=%d" __L, NextInSeq);
  1256. ProcessPacket(qp);
  1257. delete qp;
  1258. //_log(NET__APP_TRACE, _L "OP_Packet Queue size=%d" __L, PacketQueue.size());
  1259. }
  1260. }
  1261. EQProtocolPacket* EQStream::RemoveQueue(uint16 seq)
  1262. {
  1263. map<unsigned short, EQProtocolPacket*>::iterator itr;
  1264. EQProtocolPacket* qp = NULL;
  1265. if ((itr = OutOfOrderpackets.find(seq)) != OutOfOrderpackets.end()) {
  1266. qp = itr->second;
  1267. OutOfOrderpackets.erase(itr);
  1268. //_log(NET__APP_TRACE, _L "OP_Packet Queue size=%d" __L, PacketQueue.size());
  1269. }
  1270. return qp;
  1271. }
  1272. sint8 EQStream::CompareSequence(uint16 expected_seq , uint16 seq)
  1273. {
  1274. if (expected_seq==seq) {
  1275. // Curent
  1276. return 0;
  1277. } else if ((seq > expected_seq && (uint32)seq < ((uint32)expected_seq + EQStream::MaxWindowSize)) || seq < (expected_seq - EQStream::MaxWindowSize)) {
  1278. // Future
  1279. return 1;
  1280. } else {
  1281. // Past
  1282. return -1;
  1283. }
  1284. }
  1285. void EQStream::Decay()
  1286. {
  1287. MRate.lock();
  1288. uint32 rate=DecayRate;
  1289. MRate.unlock();
  1290. if (BytesWritten>0) {
  1291. BytesWritten-=rate;
  1292. if (BytesWritten<0)
  1293. BytesWritten=0;
  1294. }
  1295. }
  1296. void EQStream::AdjustRates(uint32 average_delta)
  1297. {
  1298. if (average_delta) {
  1299. MRate.lock();
  1300. RateThreshold=RATEBASE/average_delta;
  1301. DecayRate=DECAYBASE/average_delta;
  1302. MRate.unlock();
  1303. }
  1304. }