TCPConnection.cpp 42 KB


  1. /*
  2. EQ2Emulator: Everquest II Server Emulator
  3. Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
  4. This file is part of EQ2Emulator.
  5. EQ2Emulator is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9. EQ2Emulator is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13. You should have received a copy of the GNU General Public License
  14. along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
  15. */
  16. #include "../common/debug.h"
  17. #include <iostream>
  18. using namespace std;
  19. #include <string.h>
  20. #include <stdio.h>
  21. #include <iomanip>
  22. using namespace std;
  23. #include "TCPConnection.h"
  24. #include "../common/servertalk.h"
  25. #include "../common/timer.h"
  26. #include "../common/packet_dump.h"
  27. #include "Log.h"
  28. #ifdef FREEBSD //Timothy Whitman - January 7, 2003
  29. #define MSG_NOSIGNAL 0
  30. #endif
  31. #ifdef WIN32
  32. InitWinsock winsock;
  33. #endif
  34. #define LOOP_GRANULARITY 3 //# of ms between checking our socket/queues
  35. #define SERVER_LOOP_GRANULARITY 3 //# of ms between checking our socket/queues
  36. #define TCPN_DEBUG 0
  37. #define TCPN_DEBUG_Console 0
  38. #define TCPN_DEBUG_Memory 0
  39. #define TCPN_LOG_PACKETS 0
  40. #define TCPN_LOG_RAW_DATA_OUT 0
  41. #define TCPN_LOG_RAW_DATA_IN 0
  42. TCPConnection::TCPNetPacket_Struct* TCPConnection::MakePacket(ServerPacket* pack, int32 iDestination) {
  43. sint32 size = sizeof(TCPNetPacket_Struct) + pack->size;
  44. if (pack->compressed) {
  45. size += 4;
  46. }
  47. if (iDestination) {
  48. size += 4;
  49. }
  50. TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) new uchar[size];
  51. tnps->size = size;
  52. tnps->opcode = pack->opcode;
  53. *((int8*) &tnps->flags) = 0;
  54. uchar* buffer = tnps->buffer;
  55. if (pack->compressed) {
  56. tnps->flags.compressed = 1;
  57. *((sint32*) buffer) = pack->InflatedSize;
  58. buffer += 4;
  59. }
  60. if (iDestination) {
  61. tnps->flags.destination = 1;
  62. *((sint32*) buffer) = iDestination;
  63. buffer += 4;
  64. }
  65. memcpy(buffer, pack->pBuffer, pack->size);
  66. return tnps;
  67. }
  68. TCPConnection::TCPConnection(bool iOldFormat, TCPServer* iRelayServer, eTCPMode iMode) {
  69. id = 0;
  70. Server = iRelayServer;
  71. if (Server)
  72. RelayServer = true;
  73. else
  74. RelayServer = false;
  75. RelayLink = 0;
  76. RelayCount = 0;
  77. RemoteID = 0;
  78. pOldFormat = iOldFormat;
  79. ConnectionType = Outgoing;
  80. TCPMode = iMode;
  81. pState = TCPS_Ready;
  82. pFree = false;
  83. pEcho = false;
  84. sock = 0;
  85. rIP = 0;
  86. rPort = 0;
  87. keepalive_timer = new Timer(SERVER_TIMEOUT);
  88. timeout_timer = new Timer(SERVER_TIMEOUT * 2);
  89. recvbuf = 0;
  90. sendbuf = 0;
  91. pRunLoop = false;
  92. charAsyncConnect = 0;
  93. pAsyncConnect = false;
  94. connection_socket = 0;
  95. recvbuf_size = 0;
  96. recvbuf_used = 0;
  97. recvbuf_echo = 0;
  98. sendbuf_size = 0;
  99. sendbuf_used = 0;
  100. #if TCPN_DEBUG_Memory >= 7
  101. cout << "Constructor #1 on outgoing TCP# " << GetID() << endl;
  102. #endif
  103. }
  104. TCPConnection::TCPConnection(TCPServer* iServer, SOCKET in_socket, int32 irIP, int16 irPort, bool iOldFormat) {
  105. Server = iServer;
  106. RelayLink = 0;
  107. RelayServer = false;
  108. RelayCount = 0;
  109. RemoteID = 0;
  110. id = Server->GetNextID();
  111. ConnectionType = Incomming;
  112. pOldFormat = iOldFormat;
  113. TCPMode = modePacket;
  114. pState = TCPS_Connected;
  115. pFree = false;
  116. pEcho = false;
  117. sock = 0;
  118. connection_socket = in_socket;
  119. rIP = irIP;
  120. rPort = irPort;
  121. keepalive_timer = new Timer(SERVER_TIMEOUT);
  122. timeout_timer = new Timer(SERVER_TIMEOUT * 2);
  123. recvbuf = 0;
  124. sendbuf = 0;
  125. pRunLoop = false;
  126. charAsyncConnect = 0;
  127. pAsyncConnect = false;
  128. recvbuf_size = 0;
  129. recvbuf_used = 0;
  130. recvbuf_echo = 0;
  131. sendbuf_size = 0;
  132. sendbuf_used = 0;
  133. #if TCPN_DEBUG_Memory >= 7
  134. cout << "Constructor #2 on outgoing TCP# " << GetID() << endl;
  135. #endif
  136. }
  137. TCPConnection::TCPConnection(TCPServer* iServer, TCPConnection* iRelayLink, int32 iRemoteID, int32 irIP, int16 irPort) {
  138. Server = iServer;
  139. RelayLink = iRelayLink;
  140. RelayServer = true;
  141. id = Server->GetNextID();
  142. RelayCount = 0;
  143. RemoteID = iRemoteID;
  144. if (!RemoteID)
  145. ThrowError("Error: TCPConnection: RemoteID == 0 on RelayLink constructor");
  146. pOldFormat = false;
  147. ConnectionType = Incomming;
  148. TCPMode = modePacket;
  149. pState = TCPS_Connected;
  150. pFree = false;
  151. pEcho = false;
  152. sock = 0;
  153. connection_socket = 0;
  154. rIP = irIP;
  155. rPort = irPort;
  156. keepalive_timer = 0;
  157. timeout_timer = 0;
  158. recvbuf = 0;
  159. sendbuf = 0;
  160. pRunLoop = false;
  161. charAsyncConnect = 0;
  162. pAsyncConnect = false;
  163. recvbuf_size = 0;
  164. recvbuf_used = 0;
  165. recvbuf_echo = 0;
  166. sendbuf_size = 0;
  167. sendbuf_used = 0;
  168. #if TCPN_DEBUG_Memory >= 7
  169. cout << "Constructor #3 on outgoing TCP# " << GetID() << endl;
  170. #endif
  171. }
  172. TCPConnection::~TCPConnection() {
  173. Disconnect();
  174. ClearBuffers();
  175. if (ConnectionType == Outgoing) {
  176. MRunLoop.lock();
  177. pRunLoop = false;
  178. MRunLoop.unlock();
  179. MLoopRunning.lock();
  180. MLoopRunning.unlock();
  181. #if TCPN_DEBUG_Memory >= 6
  182. cout << "Deconstructor on outgoing TCP# " << GetID() << endl;
  183. #endif
  184. }
  185. #if TCPN_DEBUG_Memory >= 5
  186. else {
  187. cout << "Deconstructor on incomming TCP# " << GetID() << endl;
  188. }
  189. #endif
  190. safe_delete(keepalive_timer);
  191. safe_delete(timeout_timer);
  192. safe_delete_array(recvbuf);
  193. safe_delete_array(sendbuf);
  194. safe_delete_array(charAsyncConnect);
  195. }
  196. void TCPConnection::SetState(int8 in_state) {
  197. MState.lock();
  198. pState = in_state;
  199. MState.unlock();
  200. }
  201. int8 TCPConnection::GetState() {
  202. int8 ret;
  203. MState.lock();
  204. ret = pState;
  205. MState.unlock();
  206. return ret;
  207. }
  208. void TCPConnection::Free() {
  209. if (ConnectionType == Outgoing) {
  210. ThrowError("TCPConnection::Free() called on an Outgoing connection");
  211. }
  212. #if TCPN_DEBUG_Memory >= 5
  213. cout << "Free on TCP# " << GetID() << endl;
  214. #endif
  215. Disconnect();
  216. pFree = true;
  217. }
  218. bool TCPConnection::SendPacket(ServerPacket* pack, int32 iDestination) {
  219. LockMutex lock(&MState);
  220. if (!Connected())
  221. return false;
  222. eTCPMode tmp = GetMode();
  223. if (tmp != modePacket && tmp != modeTransition)
  224. return false;
  225. if (RemoteID)
  226. return RelayLink->SendPacket(pack, RemoteID);
  227. else {
  228. TCPNetPacket_Struct* tnps = MakePacket(pack, iDestination);
  229. if (tmp == modeTransition) {
  230. InModeQueuePush(tnps);
  231. }
  232. else {
  233. #if TCPN_LOG_PACKETS >= 1
  234. if (pack && pack->opcode != 0) {
  235. struct in_addr in;
  236. in.s_addr = GetrIP();
  237. CoutTimestamp(true);
  238. cout << ": Logging outgoing TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  239. #if TCPN_LOG_PACKETS == 2
  240. if (pack->size >= 32)
  241. DumpPacket(pack->pBuffer, 32);
  242. else
  243. DumpPacket(pack);
  244. #endif
  245. #if TCPN_LOG_PACKETS >= 3
  246. DumpPacket(pack);
  247. #endif
  248. }
  249. #endif
  250. ServerSendQueuePushEnd((uchar**) &tnps, tnps->size);
  251. }
  252. }
  253. return true;
  254. }
  255. bool TCPConnection::SendPacket(TCPNetPacket_Struct* tnps) {
  256. LockMutex lock(&MState);
  257. if (RemoteID)
  258. return false;
  259. if (!Connected())
  260. return false;
  261. eTCPMode tmp = GetMode();
  262. if (tmp == modeTransition) {
  263. TCPNetPacket_Struct* tnps2 = (TCPNetPacket_Struct*) new uchar[tnps->size];
  264. memcpy(tnps2, tnps, tnps->size);
  265. InModeQueuePush(tnps2);
  266. return true;
  267. }
  268. if (GetMode() != modePacket)
  269. return false;
  270. #if TCPN_LOG_PACKETS >= 1
  271. if (tnps && tnps->opcode != 0) {
  272. struct in_addr in;
  273. in.s_addr = GetrIP();
  274. CoutTimestamp(true);
  275. cout << ": Logging outgoing TCP NetPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << tnps->opcode << dec << ", size: " << setw(5) << setfill(' ') << tnps->size << " " << inet_ntoa(in) << ":" << GetrPort();
  276. if (pOldFormat)
  277. cout << " (OldFormat)";
  278. cout << endl;
  279. #if TCPN_LOG_PACKETS == 2
  280. if (tnps->size >= 32)
  281. DumpPacket((uchar*) tnps, 32);
  282. else
  283. DumpPacket((uchar*) tnps, tnps->size);
  284. #endif
  285. #if TCPN_LOG_PACKETS >= 3
  286. DumpPacket((uchar*) tnps, tnps->size);
  287. #endif
  288. }
  289. #endif
  290. ServerSendQueuePushEnd((const uchar*) tnps, tnps->size);
  291. return true;
  292. }
  293. bool TCPConnection::Send(const uchar* data, sint32 size) {
  294. if (!Connected())
  295. return false;
  296. if (GetMode() != modeConsole)
  297. return false;
  298. if (!size)
  299. return true;
  300. ServerSendQueuePushEnd(data, size);
  301. return true;
  302. }
  303. void TCPConnection::InModeQueuePush(TCPNetPacket_Struct* tnps) {
  304. MSendQueue.lock();
  305. InModeQueue.push(tnps);
  306. MSendQueue.unlock();
  307. }
  308. void TCPConnection::ServerSendQueuePushEnd(const uchar* data, sint32 size) {
  309. MSendQueue.lock();
  310. if (sendbuf == 0) {
  311. sendbuf = new uchar[size];
  312. sendbuf_size = size;
  313. sendbuf_used = 0;
  314. }
  315. else if (size > (sendbuf_size - sendbuf_used)) {
  316. sendbuf_size += size + 1024;
  317. uchar* tmp = new uchar[sendbuf_size];
  318. memcpy(tmp, sendbuf, sendbuf_used);
  319. safe_delete_array(sendbuf);
  320. sendbuf = tmp;
  321. }
  322. memcpy(&sendbuf[sendbuf_used], data, size);
  323. sendbuf_used += size;
  324. MSendQueue.unlock();
  325. }
  326. void TCPConnection::ServerSendQueuePushEnd(uchar** data, sint32 size) {
  327. MSendQueue.lock();
  328. if (sendbuf == 0) {
  329. sendbuf = *data;
  330. sendbuf_size = size;
  331. sendbuf_used = size;
  332. MSendQueue.unlock();
  333. *data = 0;
  334. return;
  335. }
  336. if (size > (sendbuf_size - sendbuf_used)) {
  337. sendbuf_size += size;
  338. uchar* tmp = new uchar[sendbuf_size];
  339. memcpy(tmp, sendbuf, sendbuf_used);
  340. safe_delete_array(sendbuf);
  341. sendbuf = tmp;
  342. }
  343. memcpy(&sendbuf[sendbuf_used], *data, size);
  344. sendbuf_used += size;
  345. MSendQueue.unlock();
  346. delete[] (TCPNetPacket_Struct*)*data;
  347. }
  348. void TCPConnection::ServerSendQueuePushFront(uchar* data, sint32 size) {
  349. MSendQueue.lock();
  350. if (sendbuf == 0) {
  351. sendbuf = new uchar[size];
  352. sendbuf_size = size;
  353. sendbuf_used = 0;
  354. }
  355. else if (size > (sendbuf_size - sendbuf_used)) {
  356. sendbuf_size += size;
  357. uchar* tmp = new uchar[sendbuf_size];
  358. memcpy(&tmp[size], sendbuf, sendbuf_used);
  359. safe_delete_array(sendbuf);
  360. sendbuf = tmp;
  361. }
  362. memcpy(sendbuf, data, size);
  363. sendbuf_used += size;
  364. MSendQueue.unlock();
  365. }
  366. bool TCPConnection::ServerSendQueuePop(uchar** data, sint32* size) {
  367. bool ret;
  368. if (!MSendQueue.trylock())
  369. return false;
  370. if (sendbuf) {
  371. *data = sendbuf;
  372. *size = sendbuf_used;
  373. sendbuf = 0;
  374. ret = true;
  375. }
  376. else {
  377. ret = false;
  378. }
  379. MSendQueue.unlock();
  380. return ret;
  381. }
  382. ServerPacket* TCPConnection::PopPacket() {
  383. ServerPacket* ret;
  384. if (!MOutQueueLock.trylock())
  385. return 0;
  386. ret = OutQueue.pop();
  387. MOutQueueLock.unlock();
  388. return ret;
  389. }
  390. char* TCPConnection::PopLine() {
  391. char* ret;
  392. if (!MOutQueueLock.trylock())
  393. return 0;
  394. ret = (char*) LineOutQueue.pop();
  395. MOutQueueLock.unlock();
  396. return ret;
  397. }
  398. void TCPConnection::OutQueuePush(ServerPacket* pack) {
  399. MOutQueueLock.lock();
  400. OutQueue.push(pack);
  401. MOutQueueLock.unlock();
  402. }
  403. void TCPConnection::LineOutQueuePush(char* line) {
  404. #if defined(GOTFRAGS) && 0
  405. if (strcmp(line, "**CRASHME**") == 0) {
  406. int i = 0;
  407. cout << (5 / i) << endl;
  408. }
  409. #endif
  410. if (strcmp(line, "**PACKETMODE**") == 0) {
  411. MSendQueue.lock();
  412. safe_delete_array(sendbuf);
  413. if (TCPMode == modeConsole)
  414. Send((const uchar*) "\0**PACKETMODE**\r", 16);
  415. TCPMode = modePacket;
  416. TCPNetPacket_Struct* tnps = 0;
  417. while ((tnps = InModeQueue.pop())) {
  418. SendPacket(tnps);
  419. safe_delete_array(tnps);
  420. }
  421. MSendQueue.unlock();
  422. safe_delete_array(line);
  423. return;
  424. }
  425. MOutQueueLock.lock();
  426. LineOutQueue.push(line);
  427. MOutQueueLock.unlock();
  428. }
  429. void TCPConnection::Disconnect(bool iSendRelayDisconnect) {
  430. if (connection_socket != INVALID_SOCKET && connection_socket != 0) {
  431. MState.lock();
  432. if (pState == TCPS_Connected || pState == TCPS_Disconnecting || pState == TCPS_Disconnected)
  433. SendData();
  434. pState = TCPS_Closing;
  435. MState.unlock();
  436. shutdown(connection_socket, 0x01);
  437. shutdown(connection_socket, 0x00);
  438. #ifdef WIN32
  439. closesocket(connection_socket);
  440. #else
  441. close(connection_socket);
  442. #endif
  443. connection_socket = 0;
  444. rIP = 0;
  445. rPort = 0;
  446. ClearBuffers();
  447. }
  448. SetState(TCPS_Ready);
  449. if (RelayLink) {
  450. RelayLink->RemoveRelay(this, iSendRelayDisconnect);
  451. RelayLink = 0;
  452. }
  453. }
  454. bool TCPConnection::GetAsyncConnect() {
  455. bool ret;
  456. MAsyncConnect.lock();
  457. ret = pAsyncConnect;
  458. MAsyncConnect.unlock();
  459. return ret;
  460. }
  461. bool TCPConnection::SetAsyncConnect(bool iValue) {
  462. bool ret;
  463. MAsyncConnect.lock();
  464. ret = pAsyncConnect;
  465. pAsyncConnect = iValue;
  466. MAsyncConnect.unlock();
  467. return ret;
  468. }
  469. void TCPConnection::AsyncConnect(char* irAddress, int16 irPort) {
  470. if (ConnectionType != Outgoing) {
  471. // If this code runs, we got serious problems
  472. // Crash and burn.
  473. ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
  474. return;
  475. }
  476. if (GetState() != TCPS_Ready)
  477. return;
  478. MAsyncConnect.lock();
  479. if (pAsyncConnect) {
  480. MAsyncConnect.unlock();
  481. return;
  482. }
  483. pAsyncConnect = true;
  484. safe_delete_array(charAsyncConnect);
  485. charAsyncConnect = new char[strlen(irAddress) + 1];
  486. strcpy(charAsyncConnect, irAddress);
  487. rPort = irPort;
  488. MAsyncConnect.unlock();
  489. if (!pRunLoop) {
  490. pRunLoop = true;
  491. #ifdef WIN32
  492. _beginthread(TCPConnectionLoop, 0, this);
  493. #else
  494. pthread_t thread;
  495. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  496. pthread_detach(thread);
  497. #endif
  498. }
  499. return;
  500. }
  501. void TCPConnection::AsyncConnect(int32 irIP, int16 irPort) {
  502. if (ConnectionType != Outgoing) {
  503. // If this code runs, we got serious problems
  504. // Crash and burn.
  505. ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
  506. return;
  507. }
  508. if (GetState() != TCPS_Ready)
  509. return;
  510. MAsyncConnect.lock();
  511. if (pAsyncConnect) {
  512. MAsyncConnect.unlock();
  513. return;
  514. }
  515. pAsyncConnect = true;
  516. safe_delete(charAsyncConnect);
  517. rIP = irIP;
  518. rPort = irPort;
  519. MAsyncConnect.unlock();
  520. if (!pRunLoop) {
  521. pRunLoop = true;
  522. #ifdef WIN32
  523. _beginthread(TCPConnectionLoop, 0, this);
  524. #else
  525. pthread_t thread;
  526. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  527. pthread_detach(thread);
  528. #endif
  529. }
  530. return;
  531. }
  532. bool TCPConnection::Connect(char* irAddress, int16 irPort, char* errbuf) {
  533. if (errbuf)
  534. errbuf[0] = 0;
  535. int32 tmpIP = ResolveIP(irAddress);
  536. if (!tmpIP) {
  537. if (errbuf) {
  538. #ifdef WIN32
  539. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error: %i", WSAGetLastError());
  540. #else
  541. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error #%i: %s", errno, strerror(errno));
  542. #endif
  543. }
  544. return false;
  545. }
  546. return Connect(tmpIP, irPort, errbuf);
  547. }
  548. bool TCPConnection::Connect(int32 in_ip, int16 in_port, char* errbuf) {
  549. if (errbuf)
  550. errbuf[0] = 0;
  551. if (ConnectionType != Outgoing) {
  552. // If this code runs, we got serious problems
  553. // Crash and burn.
  554. ThrowError("TCPConnection::Connect() call on a Incomming connection object!");
  555. return false;
  556. }
  557. MState.lock();
  558. if (pState == TCPS_Ready) {
  559. pState = TCPS_Connecting;
  560. }
  561. else {
  562. MState.unlock();
  563. SetAsyncConnect(false);
  564. return false;
  565. }
  566. MState.unlock();
  567. if (!pRunLoop) {
  568. pRunLoop = true;
  569. #ifdef WIN32
  570. _beginthread(TCPConnectionLoop, 0, this);
  571. #else
  572. pthread_t thread;
  573. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  574. pthread_detach(thread);
  575. #endif
  576. }
  577. connection_socket = INVALID_SOCKET;
  578. struct sockaddr_in server_sin;
  579. // struct in_addr in;
  580. if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET || connection_socket == 0) {
  581. #ifdef WIN32
  582. if (errbuf)
  583. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %i", WSAGetLastError());
  584. #else
  585. if (errbuf)
  586. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %s", strerror(errno));
  587. #endif
  588. SetState(TCPS_Ready);
  589. SetAsyncConnect(false);
  590. return false;
  591. }
  592. server_sin.sin_family = AF_INET;
  593. server_sin.sin_addr.s_addr = in_ip;
  594. server_sin.sin_port = htons(in_port);
  595. // Establish a connection to the server socket.
  596. #ifdef WIN32
  597. if (connect(connection_socket, (PSOCKADDR) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
  598. if (errbuf)
  599. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %i", WSAGetLastError());
  600. closesocket(connection_socket);
  601. connection_socket = 0;
  602. SetState(TCPS_Ready);
  603. SetAsyncConnect(false);
  604. return false;
  605. }
  606. #else
  607. if (connect(connection_socket, (struct sockaddr *) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
  608. if (errbuf)
  609. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %s", strerror(errno));
  610. close(connection_socket);
  611. connection_socket = 0;
  612. SetState(TCPS_Ready);
  613. SetAsyncConnect(false);
  614. return false;
  615. }
  616. #endif
  617. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  618. setsockopt(connection_socket, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  619. #ifdef WIN32
  620. unsigned long nonblocking = 1;
  621. ioctlsocket(connection_socket, FIONBIO, &nonblocking);
  622. #else
  623. fcntl(connection_socket, F_SETFL, O_NONBLOCK);
  624. #endif
  625. SetEcho(false);
  626. MSendQueue.lock();
  627. ClearBuffers();
  628. TCPMode = modePacket;
  629. MSendQueue.unlock();
  630. rIP = in_ip;
  631. rPort = in_port;
  632. SetState(TCPS_Connected);
  633. SetAsyncConnect(false);
  634. return true;
  635. }
  636. void TCPConnection::ClearBuffers() {
  637. LockMutex lock1(&MSendQueue);
  638. LockMutex lock2(&MOutQueueLock);
  639. LockMutex lock3(&MRunLoop);
  640. LockMutex lock4(&MState);
  641. safe_delete_array(recvbuf);
  642. safe_delete_array(sendbuf);
  643. ServerPacket* pack = 0;
  644. while ((pack = PopPacket()))
  645. safe_delete(pack);
  646. TCPNetPacket_Struct* tnps = 0;
  647. while ((tnps = InModeQueue.pop()))
  648. safe_delete(tnps);
  649. char* line = 0;
  650. while ((line = LineOutQueue.pop()))
  651. safe_delete_array(line);
  652. keepalive_timer->Start();
  653. timeout_timer->Start();
  654. }
  655. bool TCPConnection::CheckNetActive() {
  656. MState.lock();
  657. if (pState == TCPS_Connected || pState == TCPS_Disconnecting) {
  658. MState.unlock();
  659. return true;
  660. }
  661. MState.unlock();
  662. return false;
  663. }
  664. bool TCPConnection::Process() {
  665. char errbuf[TCPConnection_ErrorBufferSize];
  666. if (!CheckNetActive()) {
  667. if (ConnectionType == Outgoing) {
  668. if (GetAsyncConnect()) {
  669. if (charAsyncConnect)
  670. rIP = ResolveIP(charAsyncConnect);
  671. Connect(rIP, rPort);
  672. }
  673. }
  674. if (GetState() == TCPS_Disconnected) {
  675. Disconnect();
  676. return false;
  677. }
  678. else if (GetState() == TCPS_Connecting)
  679. return true;
  680. else
  681. return false;
  682. }
  683. if (!SendData(errbuf)) {
  684. struct in_addr in;
  685. in.s_addr = GetrIP();
  686. cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
  687. return false;
  688. }
  689. if (!Connected())
  690. return false;
  691. if (!RecvData(errbuf)) {
  692. struct in_addr in;
  693. in.s_addr = GetrIP();
  694. cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
  695. return false;
  696. }
  697. return true;
  698. }
  699. bool TCPConnection::RecvData(char* errbuf) {
  700. if (errbuf)
  701. errbuf[0] = 0;
  702. if (!Connected()) {
  703. return false;
  704. }
  705. int status = 0;
  706. if (recvbuf == 0) {
  707. recvbuf = new uchar[5120];
  708. recvbuf_size = 5120;
  709. recvbuf_used = 0;
  710. recvbuf_echo = 0;
  711. }
  712. else if ((recvbuf_size - recvbuf_used) < 2048) {
  713. uchar* tmpbuf = new uchar[recvbuf_size + 5120];
  714. memcpy(tmpbuf, recvbuf, recvbuf_used);
  715. recvbuf_size += 5120;
  716. safe_delete_array(recvbuf);
  717. recvbuf = tmpbuf;
  718. if (recvbuf_size >= MaxTCPReceiveBufferSize) {
  719. if (errbuf)
  720. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): recvbuf_size >= MaxTCPReceiveBufferSize");
  721. return false;
  722. }
  723. }
  724. status = recv(connection_socket, (char *) &recvbuf[recvbuf_used], (recvbuf_size - recvbuf_used), 0);
  725. if (status >= 1) {
  726. #if TCPN_LOG_RAW_DATA_IN >= 1
  727. struct in_addr in;
  728. in.s_addr = GetrIP();
  729. CoutTimestamp(true);
  730. cout << ": Read " << status << " bytes from network. (recvbuf_used = " << recvbuf_used << ") " << inet_ntoa(in) << ":" << GetrPort();
  731. if (pOldFormat)
  732. cout << " (OldFormat)";
  733. cout << endl;
  734. #if TCPN_LOG_RAW_DATA_IN == 2
  735. sint32 tmp = status;
  736. if (tmp > 32)
  737. tmp = 32;
  738. DumpPacket(&recvbuf[recvbuf_used], status);
  739. #elif TCPN_LOG_RAW_DATA_IN >= 3
  740. DumpPacket(&recvbuf[recvbuf_used], status);
  741. #endif
  742. #endif
  743. recvbuf_used += status;
  744. timeout_timer->Start();
  745. if (!ProcessReceivedData(errbuf))
  746. return false;
  747. }
  748. else if (status == SOCKET_ERROR) {
  749. #ifdef WIN32
  750. if (!(WSAGetLastError() == WSAEWOULDBLOCK)) {
  751. if (errbuf)
  752. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %i", WSAGetLastError());
  753. return false;
  754. }
  755. #else
  756. if (!(errno == EWOULDBLOCK)) {
  757. if (errbuf)
  758. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %s", strerror(errno));
  759. return false;
  760. }
  761. #endif
  762. }
  763. if ((TCPMode == modePacket || TCPMode == modeTransition) && timeout_timer->Check()) {
  764. if (errbuf)
  765. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Connection timeout");
  766. return false;
  767. }
  768. return true;
  769. }
  770. bool TCPConnection::GetEcho() {
  771. bool ret;
  772. MEcho.lock();
  773. ret = pEcho;
  774. MEcho.unlock();
  775. return ret;
  776. }
  777. void TCPConnection::SetEcho(bool iValue) {
  778. MEcho.lock();
  779. pEcho = iValue;
  780. MEcho.unlock();
  781. }
  782. bool TCPConnection::ProcessReceivedData(char* errbuf) {
  783. if (errbuf)
  784. errbuf[0] = 0;
  785. if (!recvbuf)
  786. return true;
  787. if (TCPMode == modePacket) {
  788. //if (pOldFormat)
  789. // return ProcessReceivedDataAsOldPackets(errbuf);
  790. //else
  791. return ProcessReceivedDataAsPackets(errbuf);
  792. }
  793. else {
  794. #if TCPN_DEBUG_Console >= 4
  795. if (recvbuf_used) {
  796. cout << "Starting Processing: recvbuf=" << recvbuf_used << endl;
  797. DumpPacket(recvbuf, recvbuf_used);
  798. }
  799. #endif
  800. for (int i=0; i < recvbuf_used; i++) {
  801. if (GetEcho() && i >= recvbuf_echo) {
  802. Send(&recvbuf[i], 1);
  803. recvbuf_echo = i + 1;
  804. }
  805. switch(recvbuf[i]) {
  806. case 0: { // 0 is the code for clear buffer
  807. if (i==0) {
  808. recvbuf_used--;
  809. recvbuf_echo--;
  810. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  811. i = -1;
  812. } else {
  813. if (i == recvbuf_used) {
  814. safe_delete_array(recvbuf);
  815. i = -1;
  816. }
  817. else {
  818. uchar* tmpdel = recvbuf;
  819. recvbuf = new uchar[recvbuf_size];
  820. memcpy(recvbuf, &tmpdel[i+1], recvbuf_used-i);
  821. recvbuf_used -= i + 1;
  822. recvbuf_echo -= i + 1;
  823. safe_delete(tmpdel);
  824. i = -1;
  825. }
  826. }
  827. #if TCPN_DEBUG_Console >= 5
  828. cout << "Removed 0x00" << endl;
  829. if (recvbuf_used) {
  830. cout << "recvbuf left: " << recvbuf_used << endl;
  831. DumpPacket(recvbuf, recvbuf_used);
  832. }
  833. else
  834. cout << "recbuf left: None" << endl;
  835. #endif
  836. break;
  837. }
  838. case 10:
  839. case 13: // newline marker
  840. {
  841. if (i==0) { // empty line
  842. recvbuf_used--;
  843. recvbuf_echo--;
  844. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  845. i = -1;
  846. } else {
  847. char* line = new char[i+1];
  848. memset(line, 0, i+1);
  849. memcpy(line, recvbuf, i);
  850. #if TCPN_DEBUG_Console >= 3
  851. cout << "Line Out: " << endl;
  852. DumpPacket((uchar*) line, i);
  853. #endif
  854. //line[i] = 0;
  855. uchar* tmpdel = recvbuf;
  856. recvbuf = new uchar[recvbuf_size];
  857. recvbuf_used -= i+1;
  858. recvbuf_echo -= i+1;
  859. memcpy(recvbuf, &tmpdel[i+1], recvbuf_used);
  860. #if TCPN_DEBUG_Console >= 5
  861. cout << "i+1=" << i+1 << endl;
  862. if (recvbuf_used) {
  863. cout << "recvbuf left: " << recvbuf_used << endl;
  864. DumpPacket(recvbuf, recvbuf_used);
  865. }
  866. else
  867. cout << "recbuf left: None" << endl;
  868. #endif
  869. safe_delete(tmpdel);
  870. if (strlen(line) > 0)
  871. LineOutQueuePush(line);
  872. else
  873. safe_delete_array(line);
  874. if (TCPMode == modePacket) {
  875. return ProcessReceivedDataAsPackets(errbuf);
  876. }
  877. i = -1;
  878. }
  879. break;
  880. }
  881. case 8: // backspace
  882. {
  883. if (i==0) { // nothin to backspace
  884. recvbuf_used--;
  885. recvbuf_echo--;
  886. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  887. i = -1;
  888. } else {
  889. uchar* tmpdel = recvbuf;
  890. recvbuf = new uchar[recvbuf_size];
  891. memcpy(recvbuf, tmpdel, i-1);
  892. memcpy(&recvbuf[i-1], &tmpdel[i+1], recvbuf_used-i);
  893. recvbuf_used -= 2;
  894. recvbuf_echo -= 2;
  895. safe_delete(tmpdel);
  896. i -= 2;
  897. }
  898. break;
  899. }
  900. }
  901. }
  902. if (recvbuf_used < 0)
  903. safe_delete_array(recvbuf);
  904. }
  905. return true;
  906. }
  907. bool TCPConnection::ProcessReceivedDataAsPackets(char* errbuf) {
  908. if (errbuf)
  909. errbuf[0] = 0;
  910. sint32 base = 0;
  911. sint32 size = 7;
  912. uchar* buffer;
  913. ServerPacket* pack = 0;
  914. while ((recvbuf_used - base) >= size) {
  915. TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) &recvbuf[base];
  916. buffer = tnps->buffer;
  917. size = tnps->size;
  918. if (size >= MaxTCPReceiveBufferSize) {
  919. #if TCPN_DEBUG_Memory >= 1
  920. cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
  921. #endif
  922. if (errbuf)
  923. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
  924. return false;
  925. }
  926. if ((recvbuf_used - base) >= size) {
  927. // ok, we got enough data to make this packet!
  928. safe_delete(pack);
  929. pack = new ServerPacket;
  930. pack->size = size - sizeof(TCPNetPacket_Struct);
  931. // read headers
  932. pack->opcode = tnps->opcode;
  933. if (tnps->flags.compressed) {
  934. pack->compressed = true;
  935. pack->InflatedSize = *((sint32*)buffer);
  936. pack->size -= 4;
  937. buffer += 4;
  938. }
  939. if (tnps->flags.destination) {
  940. pack->destination = *((sint32*)buffer);
  941. pack->size -= 4;
  942. buffer += 4;
  943. }
  944. // end read headers
  945. if (pack->size > 0) {
  946. if (tnps->flags.compressed) {
  947. // Lets decompress the packet here
  948. pack->compressed = false;
  949. if(pack->InflatedSize < MaxTCPReceiveBufferSize)
  950. {
  951. pack->pBuffer = new uchar[pack->InflatedSize];
  952. pack->size = InflatePacket(buffer, pack->size, pack->pBuffer, pack->InflatedSize);
  953. }
  954. else
  955. {
  956. cout << "Invalid inflated packet." << endl;
  957. safe_delete(pack);
  958. return false;
  959. }
  960. }
  961. else {
  962. pack->pBuffer = new uchar[pack->size];
  963. memcpy(pack->pBuffer, buffer, pack->size);
  964. }
  965. }
  966. if (pack->opcode == 0) {
  967. if (pack->size) {
  968. #if TCPN_DEBUG >= 2
  969. cout << "Received TCP Network layer packet" << endl;
  970. #endif
  971. ProcessNetworkLayerPacket(pack);
  972. }
  973. #if TCPN_DEBUG >= 5
  974. else {
  975. cout << "Received TCP keepalive packet. (opcode=0)" << endl;
  976. }
  977. #endif
  978. }
  979. else {
  980. #if TCPN_LOG_PACKETS >= 1
  981. if (pack && pack->opcode != 0) {
  982. struct in_addr in;
  983. in.s_addr = GetrIP();
  984. CoutTimestamp(true);
  985. cout << ": Logging incoming TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  986. #if TCPN_LOG_PACKETS == 2
  987. if (pack->size >= 32)
  988. DumpPacket(pack->pBuffer, 32);
  989. else
  990. DumpPacket(pack);
  991. #endif
  992. #if TCPN_LOG_PACKETS >= 3
  993. DumpPacket(pack);
  994. #endif
  995. }
  996. #endif
  997. if (RelayServer && Server && pack->destination) {
  998. TCPConnection* con = Server->GetConnection(pack->destination);
  999. if (!con) {
  1000. #if TCPN_DEBUG >= 1
  1001. cout << "Error relaying packet: con = 0" << endl;
  1002. #endif
  1003. }
  1004. else{
  1005. con->OutQueuePush(pack);
  1006. pack = 0;
  1007. }
  1008. }
  1009. else{
  1010. OutQueuePush(pack);
  1011. pack = 0;
  1012. }
  1013. }
  1014. base += size;
  1015. size = 7;
  1016. }
  1017. }
  1018. safe_delete(pack);
  1019. if (base != 0) {
  1020. if (base >= recvbuf_used) {
  1021. safe_delete_array(recvbuf);
  1022. }
  1023. else {
  1024. uchar* tmpbuf = new uchar[recvbuf_size - base];
  1025. memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
  1026. safe_delete_array(recvbuf);
  1027. recvbuf = tmpbuf;
  1028. recvbuf_used -= base;
  1029. recvbuf_size -= base;
  1030. }
  1031. }
  1032. return true;
  1033. }
  1034. bool TCPConnection::ProcessReceivedDataAsOldPackets(char* errbuf) {
  1035. sint32 base = 0;
  1036. sint32 size = 4;
  1037. uchar* buffer;
  1038. ServerPacket* pack = 0;
  1039. while ((recvbuf_used - base) >= size) {
  1040. buffer = &recvbuf[base];
  1041. memcpy(&size, &buffer[2], 2);
  1042. if (size >= MaxTCPReceiveBufferSize) {
  1043. #if TCPN_DEBUG_Memory >= 1
  1044. cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
  1045. #endif
  1046. if (errbuf)
  1047. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
  1048. return false;
  1049. }
  1050. if ((recvbuf_used - base) >= size) {
  1051. // ok, we got enough data to make this packet!
  1052. pack = new ServerPacket;
  1053. memcpy(&pack->opcode, &buffer[0], 2);
  1054. pack->size = size - 4;
  1055. LogWrite(MISC__TODO, 1, "TODO", "Checksum or size check or something similar\n\t(%s, function: %s, line #: %i)", __FILE__, __FUNCTION__, __LINE__);
  1056. /*
  1057. if () { // TODO: Checksum or size check or something similar
  1058. // Datastream corruption, get the hell outta here!
  1059. delete pack;
  1060. return false;
  1061. }
  1062. */
  1063. if (pack->size > 0) {
  1064. pack->pBuffer = new uchar[pack->size];
  1065. memcpy(pack->pBuffer, &buffer[4], pack->size);
  1066. }
  1067. if (pack->opcode == 0) {
  1068. // keepalive, no need to process
  1069. safe_delete(pack);
  1070. }
  1071. else {
  1072. #if TCPN_LOG_PACKETS >= 1
  1073. if (pack && pack->opcode != 0) {
  1074. struct in_addr in;
  1075. in.s_addr = GetrIP();
  1076. CoutTimestamp(true);
  1077. cout << ": Logging incoming TCP OldPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  1078. #if TCPN_LOG_PACKETS == 2
  1079. if (pack->size >= 32)
  1080. DumpPacket(pack->pBuffer, 32);
  1081. else
  1082. DumpPacket(pack);
  1083. #endif
  1084. #if TCPN_LOG_PACKETS >= 3
  1085. DumpPacket(pack);
  1086. #endif
  1087. }
  1088. #endif
  1089. OutQueuePush(pack);
  1090. }
  1091. base += size;
  1092. size = 4;
  1093. }
  1094. }
  1095. if (base != 0) {
  1096. if (base >= recvbuf_used) {
  1097. safe_delete_array(recvbuf);
  1098. }
  1099. else {
  1100. uchar* tmpbuf = new uchar[recvbuf_size - base];
  1101. memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
  1102. safe_delete_array(recvbuf);
  1103. recvbuf = tmpbuf;
  1104. recvbuf_used -= base;
  1105. recvbuf_size -= base;
  1106. }
  1107. }
  1108. return true;
  1109. }
  1110. void TCPConnection::ProcessNetworkLayerPacket(ServerPacket* pack) {
  1111. int8 opcode = pack->pBuffer[0];
  1112. /** disabling RELAY capabilities, this functionality is poorly implemented
  1113. even if such a feature needs to be re-used need authentication BEFORE allowing relay to take place
  1114. secondly we need to protect the LS accepting new connections as bogus data can be passed to open
  1115. fake TCP connections
  1116. opcode 0 is OK, that is Keep-Alive
  1117. **/
  1118. if (opcode > 0)
  1119. {
  1120. Disconnect();
  1121. return;
  1122. }
  1123. int8* data = &pack->pBuffer[1];
  1124. switch (opcode) {
  1125. case 0: {
  1126. break;
  1127. }
  1128. case 1: { // Switch to RelayServer mode
  1129. if (pack->size != 1) {
  1130. SendNetErrorPacket("New RelayClient: wrong size, expected 1");
  1131. break;
  1132. }
  1133. if (RelayServer) {
  1134. SendNetErrorPacket("Switch to RelayServer mode when already in RelayServer mode");
  1135. break;
  1136. }
  1137. if (RemoteID) {
  1138. SendNetErrorPacket("Switch to RelayServer mode by a Relay Client");
  1139. break;
  1140. }
  1141. if (ConnectionType != Incomming) {
  1142. SendNetErrorPacket("Switch to RelayServer mode on outgoing connection");
  1143. break;
  1144. }
  1145. #if TCPC_DEBUG >= 3
  1146. struct in_addr in;
  1147. in.s_addr = GetrIP();
  1148. cout << "Switching to RelayServer mode: " << inet_ntoa(in) << ":" << GetPort() << endl;
  1149. #endif
  1150. RelayServer = true;
  1151. break;
  1152. }
  1153. case 2: { // New Relay Client
  1154. if (!RelayServer) {
  1155. SendNetErrorPacket("New RelayClient when not in RelayServer mode");
  1156. break;
  1157. }
  1158. if (pack->size != 11) {
  1159. SendNetErrorPacket("New RelayClient: wrong size, expected 11");
  1160. break;
  1161. }
  1162. if (ConnectionType != Incomming) {
  1163. SendNetErrorPacket("New RelayClient: illegal on outgoing connection");
  1164. break;
  1165. }
  1166. TCPConnection* con = new TCPConnection(Server, this, *((int32*) data), *((int32*) &data[4]), *((int16*) &data[8]));
  1167. Server->AddConnection(con);
  1168. RelayCount++;
  1169. break;
  1170. }
  1171. case 3: { // Delete Relay Client
  1172. if (!RelayServer) {
  1173. SendNetErrorPacket("Delete RelayClient when not in RelayServer mode");
  1174. break;
  1175. }
  1176. if (pack->size != 5) {
  1177. SendNetErrorPacket("Delete RelayClient: wrong size, expected 5");
  1178. break;
  1179. }
  1180. TCPConnection* con = Server->GetConnection(*((int32*)data));
  1181. if (con) {
  1182. if (ConnectionType == Incomming) {
  1183. if (con->GetRelayLink() != this) {
  1184. SendNetErrorPacket("Delete RelayClient: RelayLink != this");
  1185. break;
  1186. }
  1187. }
  1188. con->Disconnect(false);
  1189. }
  1190. break;
  1191. }
  1192. case 255: {
  1193. #if TCPC_DEBUG >= 1
  1194. struct in_addr in;
  1195. in.s_addr = GetrIP();
  1196. cout "Received NetError: '";
  1197. if (pack->size > 1)
  1198. cout << (char*) data;
  1199. cout << "': " << inet_ntoa(in) << ":" << GetPort() << endl;
  1200. #endif
  1201. break;
  1202. }
  1203. }
  1204. }
  1205. void TCPConnection::SendNetErrorPacket(const char* reason) {
  1206. #if TCPC_DEBUG >= 1
  1207. struct in_addr in;
  1208. in.s_addr = GetrIP();
  1209. cout "NetError: '";
  1210. if (reason)
  1211. cout << reason;
  1212. cout << "': " << inet_ntoa(in) << ":" << GetPort() << endl;
  1213. #endif
  1214. ServerPacket* pack = new ServerPacket(0);
  1215. pack->size = 1;
  1216. if (reason)
  1217. pack->size += strlen(reason) + 1;
  1218. pack->pBuffer = new uchar[pack->size];
  1219. memset(pack->pBuffer, 0, pack->size);
  1220. pack->pBuffer[0] = 255;
  1221. strcpy((char*) &pack->pBuffer[1], reason);
  1222. SendPacket(pack);
  1223. safe_delete(pack);
  1224. }
  1225. void TCPConnection::RemoveRelay(TCPConnection* relay, bool iSendRelayDisconnect) {
  1226. if (iSendRelayDisconnect) {
  1227. ServerPacket* pack = new ServerPacket(0, 5);
  1228. pack->pBuffer[0] = 3;
  1229. *((int32*) &pack->pBuffer[1]) = relay->GetRemoteID();
  1230. SendPacket(pack);
  1231. safe_delete(pack);
  1232. }
  1233. RelayCount--;
  1234. }
  1235. bool TCPConnection::SendData(char* errbuf) {
  1236. if (errbuf)
  1237. errbuf[0] = 0;
  1238. /************ Get first send packet on queue and send it! ************/
  1239. uchar* data = 0;
  1240. sint32 size = 0;
  1241. int status = 0;
  1242. if (ServerSendQueuePop(&data, &size)) {
  1243. #ifdef WIN32
  1244. status = send(connection_socket, (const char *) data, size, 0);
  1245. #else
  1246. status = send(connection_socket, data, size, MSG_NOSIGNAL);
  1247. if(errno==EPIPE) status = SOCKET_ERROR;
  1248. #endif
  1249. if (status >= 1) {
  1250. #if TCPN_LOG_RAW_DATA_OUT >= 1
  1251. struct in_addr in;
  1252. in.s_addr = GetrIP();
  1253. CoutTimestamp(true);
  1254. cout << ": Wrote " << status << " bytes to network. " << inet_ntoa(in) << ":" << GetrPort();
  1255. if (pOldFormat)
  1256. cout << " (OldFormat)";
  1257. cout << endl;
  1258. #if TCPN_LOG_RAW_DATA_OUT == 2
  1259. sint32 tmp = status;
  1260. if (tmp > 32)
  1261. tmp = 32;
  1262. DumpPacket(data, status);
  1263. #elif TCPN_LOG_RAW_DATA_OUT >= 3
  1264. DumpPacket(data, status);
  1265. #endif
  1266. #endif
  1267. keepalive_timer->Start();
  1268. if (status < (signed)size) {
  1269. #if TCPN_LOG_RAW_DATA_OUT >= 1
  1270. struct in_addr in;
  1271. in.s_addr = GetrIP();
  1272. CoutTimestamp(true);
  1273. cout << ": Pushed " << (size - status) << " bytes back onto the send queue. " << inet_ntoa(in) << ":" << GetrPort();
  1274. if (pOldFormat)
  1275. cout << " (OldFormat)";
  1276. cout << endl;
  1277. #endif
  1278. // If there's network congestion, the number of bytes sent can be less than
  1279. // what we tried to give it... Push the extra back on the queue for later
  1280. ServerSendQueuePushFront(&data[status], size - status);
  1281. }
  1282. else if (status > (signed)size) {
  1283. ThrowError("TCPConnection::SendData(): WTF! status > size");
  1284. return false;
  1285. }
  1286. // else if (status == size) {}
  1287. }
  1288. else {
  1289. ServerSendQueuePushFront(data, size);
  1290. }
  1291. safe_delete_array(data);
  1292. if (status == SOCKET_ERROR) {
  1293. #ifdef WIN32
  1294. if (WSAGetLastError() != WSAEWOULDBLOCK)
  1295. #else
  1296. if (errno != EWOULDBLOCK)
  1297. #endif
  1298. {
  1299. if (errbuf) {
  1300. #ifdef WIN32
  1301. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %i", WSAGetLastError());
  1302. #else
  1303. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %s", strerror(errno));
  1304. #endif
  1305. }
  1306. return false;
  1307. }
  1308. }
  1309. }
  1310. if (TCPMode == modePacket && keepalive_timer->Check()) {
  1311. ServerPacket* pack = new ServerPacket(0, 0);
  1312. SendPacket(pack);
  1313. safe_delete(pack);
  1314. #if TCPN_DEBUG >= 5
  1315. cout << "Sending TCP keepalive packet. (timeout=" << timeout_timer->GetRemainingTime() << " remaining)" << endl;
  1316. #endif
  1317. }
  1318. return true;
  1319. }
  1320. ThreadReturnType TCPConnectionLoop(void* tmp) {
  1321. #ifdef WIN32
  1322. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  1323. #endif
  1324. if (tmp == 0) {
  1325. ThrowError("TCPConnectionLoop(): tmp = 0!");
  1326. THREAD_RETURN(NULL);
  1327. }
  1328. TCPConnection* tcpc = (TCPConnection*) tmp;
  1329. tcpc->MLoopRunning.lock();
  1330. while (tcpc->RunLoop()) {
  1331. Sleep(LOOP_GRANULARITY);
  1332. if (tcpc->GetState() != TCPS_Ready) {
  1333. if (!tcpc->Process()) {
  1334. tcpc->Disconnect();
  1335. }
  1336. }
  1337. else if (tcpc->GetAsyncConnect()) {
  1338. if (tcpc->charAsyncConnect)
  1339. tcpc->Connect(tcpc->charAsyncConnect, tcpc->GetrPort());
  1340. else
  1341. tcpc->Connect(tcpc->GetrIP(), tcpc->GetrPort());
  1342. tcpc->SetAsyncConnect(false);
  1343. }
  1344. else
  1345. Sleep(10);
  1346. }
  1347. tcpc->MLoopRunning.unlock();
  1348. THREAD_RETURN(NULL);
  1349. }
  1350. bool TCPConnection::RunLoop() {
  1351. bool ret;
  1352. MRunLoop.lock();
  1353. ret = pRunLoop;
  1354. MRunLoop.unlock();
  1355. return ret;
  1356. }
  1357. TCPServer::TCPServer(int16 in_port, bool iOldFormat) {
  1358. NextID = 1;
  1359. pPort = in_port;
  1360. sock = 0;
  1361. pOldFormat = iOldFormat;
  1362. list = new LinkedList<TCPConnection*>;
  1363. pRunLoop = true;
  1364. #ifdef WIN32
  1365. _beginthread(TCPServerLoop, 0, this);
  1366. #else
  1367. pthread_t thread;
  1368. pthread_create(&thread, NULL, &TCPServerLoop, this);
  1369. pthread_detach(thread);
  1370. #endif
  1371. }
  1372. TCPServer::~TCPServer() {
  1373. MRunLoop.lock();
  1374. pRunLoop = false;
  1375. MRunLoop.unlock();
  1376. MLoopRunning.lock();
  1377. MLoopRunning.unlock();
  1378. while (NewQueue.pop()); // the objects are deleted with the list, clear this queue so it doesnt try to delete them again
  1379. safe_delete(list);
  1380. }
  1381. bool TCPServer::RunLoop() {
  1382. bool ret;
  1383. MRunLoop.lock();
  1384. ret = pRunLoop;
  1385. MRunLoop.unlock();
  1386. return ret;
  1387. }
  1388. ThreadReturnType TCPServerLoop(void* tmp) {
  1389. #ifdef WIN32
  1390. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  1391. #endif
  1392. if (tmp == 0) {
  1393. ThrowError("TCPServerLoop(): tmp = 0!");
  1394. THREAD_RETURN(NULL);
  1395. }
  1396. TCPServer* tcps = (TCPServer*) tmp;
  1397. tcps->MLoopRunning.lock();
  1398. while (tcps->RunLoop()) {
  1399. Sleep(SERVER_LOOP_GRANULARITY);
  1400. tcps->Process();
  1401. }
  1402. tcps->MLoopRunning.unlock();
  1403. THREAD_RETURN(NULL);
  1404. }
  1405. void TCPServer::Process() {
  1406. CheckInQueue();
  1407. ListenNewConnections();
  1408. LinkedListIterator<TCPConnection*> iterator(*list);
  1409. iterator.Reset();
  1410. while(iterator.MoreElements()) {
  1411. if (iterator.GetData()->IsFree() && (!iterator.GetData()->CheckNetActive())) {
  1412. #if _DEBUG
  1413. LogWrite(NET__DEBUG, 0, "Net", "EQStream Connection deleted.");
  1414. #endif
  1415. iterator.RemoveCurrent();
  1416. }
  1417. else {
  1418. if (!iterator.GetData()->Process())
  1419. iterator.GetData()->Disconnect();
  1420. iterator.Advance();
  1421. }
  1422. }
  1423. }
  1424. void TCPServer::ListenNewConnections() {
  1425. SOCKET tmpsock;
  1426. struct sockaddr_in from;
  1427. struct in_addr in;
  1428. unsigned int fromlen;
  1429. TCPConnection* con;
  1430. from.sin_family = AF_INET;
  1431. fromlen = sizeof(from);
  1432. LockMutex lock(&MSock);
  1433. if (!sock)
  1434. return;
  1435. // Check for pending connects
  1436. #ifdef WIN32
  1437. unsigned long nonblocking = 1;
  1438. while ((tmpsock = accept(sock, (struct sockaddr*) &from, (int *) &fromlen)) != INVALID_SOCKET) {
  1439. ioctlsocket (tmpsock, FIONBIO, &nonblocking);
  1440. #else
  1441. while ((tmpsock = accept(sock, (struct sockaddr*) &from, &fromlen)) != INVALID_SOCKET) {
  1442. fcntl(tmpsock, F_SETFL, O_NONBLOCK);
  1443. #endif
  1444. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  1445. setsockopt(tmpsock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  1446. in.s_addr = from.sin_addr.s_addr;
  1447. // New TCP connection
  1448. con = new TCPConnection(this, tmpsock, in.s_addr, ntohs(from.sin_port), pOldFormat);
  1449. #if TCPN_DEBUG >= 1
  1450. cout << "New TCP connection: " << inet_ntoa(in) << ":" << con->GetrPort() << endl;
  1451. #endif
  1452. AddConnection(con);
  1453. }
  1454. }
  1455. bool TCPServer::Open(int16 in_port, char* errbuf) {
  1456. if (errbuf)
  1457. errbuf[0] = 0;
  1458. LockMutex lock(&MSock);
  1459. if (sock != 0) {
  1460. if (errbuf)
  1461. snprintf(errbuf, TCPConnection_ErrorBufferSize, "Listening socket already open");
  1462. return false;
  1463. }
  1464. if (in_port != 0) {
  1465. pPort = in_port;
  1466. }
  1467. #ifdef WIN32
  1468. SOCKADDR_IN address;
  1469. unsigned long nonblocking = 1;
  1470. #else
  1471. struct sockaddr_in address;
  1472. #endif
  1473. int reuse_addr = 1;
  1474. // Setup internet address information.
  1475. // This is used with the bind() call
  1476. memset((char *) &address, 0, sizeof(address));
  1477. address.sin_family = AF_INET;
  1478. address.sin_port = htons(pPort);
  1479. address.sin_addr.s_addr = htonl(INADDR_ANY);
  1480. // Setting up TCP port for new TCP connections
  1481. sock = socket(AF_INET, SOCK_STREAM, 0);
  1482. if (sock == INVALID_SOCKET) {
  1483. if (errbuf)
  1484. snprintf(errbuf, TCPConnection_ErrorBufferSize, "socket(): INVALID_SOCKET");
  1485. return false;
  1486. }
  1487. // Quag: dont think following is good stuff for TCP, good for UDP
  1488. // Mis: SO_REUSEADDR shouldn't be a problem for tcp--allows you to restart
  1489. // without waiting for conns in TIME_WAIT to die
  1490. setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse_addr, sizeof(reuse_addr));
  1491. if (bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
  1492. #ifdef WIN32
  1493. closesocket(sock);
  1494. #else
  1495. close(sock);
  1496. #endif
  1497. sock = 0;
  1498. if (errbuf)
  1499. sprintf(errbuf, "bind(): <0");
  1500. return false;
  1501. }
  1502. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  1503. setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  1504. #ifdef WIN32
  1505. ioctlsocket (sock, FIONBIO, &nonblocking);
  1506. #else
  1507. fcntl(sock, F_SETFL, O_NONBLOCK);
  1508. #endif
  1509. if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
  1510. #ifdef WIN32
  1511. closesocket(sock);
  1512. if (errbuf)
  1513. snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %d", WSAGetLastError());
  1514. #else
  1515. close(sock);
  1516. if (errbuf)
  1517. snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %s", strerror(errno));
  1518. #endif
  1519. sock = 0;
  1520. return false;
  1521. }
  1522. return true;
  1523. }
  1524. void TCPServer::Close() {
  1525. LockMutex lock(&MSock);
  1526. if (sock) {
  1527. #ifdef WIN32
  1528. closesocket(sock);
  1529. #else
  1530. close(sock);
  1531. #endif
  1532. }
  1533. sock = 0;
  1534. }
  1535. bool TCPServer::IsOpen() {
  1536. MSock.lock();
  1537. bool ret = (bool) (sock != 0);
  1538. MSock.unlock();
  1539. return ret;
  1540. }
  1541. TCPConnection* TCPServer::NewQueuePop() {
  1542. TCPConnection* ret;
  1543. MNewQueue.lock();
  1544. ret = NewQueue.pop();
  1545. MNewQueue.unlock();
  1546. return ret;
  1547. }
  1548. void TCPServer::AddConnection(TCPConnection* con) {
  1549. list->Append(con);
  1550. MNewQueue.lock();
  1551. NewQueue.push(con);
  1552. MNewQueue.unlock();
  1553. }
  1554. TCPConnection* TCPServer::GetConnection(int32 iID) {
  1555. LinkedListIterator<TCPConnection*> iterator(*list);
  1556. iterator.Reset();
  1557. while(iterator.MoreElements()) {
  1558. if (iterator.GetData()->GetID() == iID)
  1559. return iterator.GetData();
  1560. iterator.Advance();
  1561. }
  1562. return 0;
  1563. }
  1564. void TCPServer::SendPacket(ServerPacket* pack) {
  1565. TCPConnection::TCPNetPacket_Struct* tnps = TCPConnection::MakePacket(pack);
  1566. SendPacket(&tnps);
  1567. }
  1568. void TCPServer::SendPacket(TCPConnection::TCPNetPacket_Struct** tnps) {
  1569. MInQueue.lock();
  1570. InQueue.push(*tnps);
  1571. MInQueue.unlock();
  1572. tnps = 0;
  1573. }
  1574. void TCPServer::CheckInQueue() {
  1575. LinkedListIterator<TCPConnection*> iterator(*list);
  1576. TCPConnection::TCPNetPacket_Struct* tnps = 0;
  1577. while (( tnps = InQueuePop() )) {
  1578. iterator.Reset();
  1579. while(iterator.MoreElements()) {
  1580. if (iterator.GetData()->GetMode() != modeConsole && iterator.GetData()->GetRemoteID() == 0)
  1581. iterator.GetData()->SendPacket(tnps);
  1582. iterator.Advance();
  1583. }
  1584. safe_delete(tnps);
  1585. }
  1586. }
  1587. TCPConnection::TCPNetPacket_Struct* TCPServer::InQueuePop() {
  1588. TCPConnection::TCPNetPacket_Struct* ret;
  1589. MInQueue.lock();
  1590. ret = InQueue.pop();
  1591. MInQueue.unlock();
  1592. return ret;
  1593. }