/*
    EQ2Emulator:  Everquest II Server Emulator
    Copyright (C) 2007  EQ2EMulator Development Team (http://www.eq2emulator.net)

    This file is part of EQ2Emulator.

    EQ2Emulator is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EQ2Emulator is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with EQ2Emulator.  If not, see <http://www.gnu.org/licenses/>.
*/
#include "../common/debug.h"

#include <iostream>
using namespace std;
#include <string.h>
#include <stdio.h>
#include <iomanip>
using namespace std;

#include "TCPConnection.h"
#include "../common/servertalk.h"
#include "../common/timer.h"
#include "../common/packet_dump.h"
#include "Log.h"

#ifdef FREEBSD //Timothy Whitman - January 7, 2003
	#define MSG_NOSIGNAL 0
#endif

#ifdef WIN32
InitWinsock winsock;
#endif

#define LOOP_GRANULARITY 3	//# of ms between checking our socket/queues
#define SERVER_LOOP_GRANULARITY 3	//# of ms between checking our socket/queues

#define TCPN_DEBUG				0
#define TCPN_DEBUG_Console		0
#define TCPN_DEBUG_Memory		0
#define TCPN_LOG_PACKETS		0
#define TCPN_LOG_RAW_DATA_OUT	0
#define TCPN_LOG_RAW_DATA_IN	0

// Builds the wire representation of a ServerPacket in a freshly allocated
// uchar[] buffer (returned as a TCPNetPacket_Struct*). The layout is the fixed
// header, then a 4-byte inflated size when pack->compressed is set, then a
// 4-byte destination when iDestination is non-zero, then the payload.
// The caller owns the returned buffer; it was allocated with new uchar[].
TCPConnection::TCPNetPacket_Struct* TCPConnection::MakePacket(ServerPacket* pack, int32 iDestination) {
	sint32 size = sizeof(TCPNetPacket_Struct) + pack->size;
	if (pack->compressed) {
		size += 4;
	}
	if (iDestination) {
		size += 4;
	}
	TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) new uchar[size];
	tnps->size = size;
	tnps->opcode = pack->opcode;
	*((int8*) &tnps->flags) = 0;
	uchar* buffer = tnps->buffer;
	if (pack->compressed) {
		tnps->flags.compressed = 1;
		*((sint32*) buffer) = pack->InflatedSize;
		buffer += 4;
	}
	if (iDestination) {
		tnps->flags.destination = 1;
		*((sint32*) buffer) = iDestination;
		buffer += 4;
	}
	// Keepalive packets carry no payload; do not hand memcpy a null source.
	if (pack->size > 0)
		memcpy(buffer, pack->pBuffer, pack->size);
	return tnps;
}

// Constructs an Outgoing connection object in the TCPS_Ready state.
// A non-null iRelayServer puts the object in RelayServer mode.
TCPConnection::TCPConnection(bool iOldFormat, TCPServer* iRelayServer, eTCPMode iMode) {
	id = 0;
	Server = iRelayServer;
	if (Server)
		RelayServer = true;
	else
		RelayServer = false;
	RelayLink = 0;
	RelayCount = 0;
	RemoteID = 0;
	pOldFormat = iOldFormat;
	ConnectionType = Outgoing;
	TCPMode = iMode;
	pState = TCPS_Ready;
	pFree = false;
	pEcho = false;
	sock = 0;
	rIP = 0;
	rPort = 0;
	keepalive_timer = new Timer(SERVER_TIMEOUT);
	timeout_timer = new Timer(SERVER_TIMEOUT * 2);
	recvbuf = 0;
	sendbuf = 0;
	pRunLoop = false;
	charAsyncConnect = 0;
	pAsyncConnect = false;
	connection_socket = 0;
	recvbuf_size = 0;
	recvbuf_used = 0;
	recvbuf_echo = 0;
	sendbuf_size = 0;
	sendbuf_used = 0;
#if TCPN_DEBUG_Memory >= 7
	cout << "Constructor #1 on outgoing TCP# " << GetID() << endl;
#endif
}

// Constructs an Incomming connection around an already accepted socket.
// The object starts in TCPS_Connected / modePacket and takes its id from iServer.
TCPConnection::TCPConnection(TCPServer* iServer, SOCKET in_socket, int32 irIP, int16 irPort, bool iOldFormat) {
	Server = iServer;
	RelayLink = 0;
	RelayServer = false;
	RelayCount = 0;
	RemoteID = 0;
	id = Server->GetNextID();
	ConnectionType = Incomming;
	pOldFormat = iOldFormat;
	TCPMode = modePacket;
	pState = TCPS_Connected;
	pFree = false;
	pEcho = false;
	sock = 0;
	connection_socket = in_socket;
	rIP = irIP;
	rPort = irPort;
	keepalive_timer = new Timer(SERVER_TIMEOUT);
	timeout_timer = new Timer(SERVER_TIMEOUT * 2);
	recvbuf = 0;
	sendbuf = 0;
	pRunLoop = false;
	charAsyncConnect = 0;
	pAsyncConnect = false;
	recvbuf_size = 0;
	recvbuf_used = 0;
	recvbuf_echo = 0;
	sendbuf_size = 0;
	sendbuf_used = 0;
#if TCPN_DEBUG_Memory >= 7
	cout << "Constructor #2 on outgoing TCP# " << GetID() << endl;
#endif
}

// Constructs a relayed connection that has no socket of its own: packets are
// forwarded through iRelayLink using iRemoteID. Note that both timers are left
// null for this kind of connection.
TCPConnection::TCPConnection(TCPServer* iServer, TCPConnection* iRelayLink, int32 iRemoteID, int32 irIP, int16 irPort) {
	Server = iServer;
	RelayLink = iRelayLink;
	RelayServer = true;
	id = Server->GetNextID();
	RelayCount = 0;
	RemoteID = iRemoteID;
	if (!RemoteID)
		ThrowError("Error: TCPConnection: RemoteID == 0 on RelayLink constructor");
	pOldFormat = false;
	ConnectionType = Incomming;
	TCPMode = modePacket;
	pState = TCPS_Connected;
	pFree = false;
	pEcho = false;
	sock = 0;
	connection_socket = 0;
	rIP = irIP;
	rPort = irPort;
	keepalive_timer = 0;
	timeout_timer = 0;
	recvbuf = 0;
	sendbuf = 0;
	pRunLoop = false;
	charAsyncConnect = 0;
	pAsyncConnect = false;
	recvbuf_size = 0;
	recvbuf_used = 0;
	recvbuf_echo = 0;
	sendbuf_size = 0;
	sendbuf_used = 0;
#if TCPN_DEBUG_Memory >= 7
	cout << "Constructor #3 on outgoing TCP# " << GetID() << endl;
#endif
}

// Disconnects, drops all queued data and, for Outgoing connections, clears
// pRunLoop and then acquires/releases MLoopRunning, which TCPConnectionLoop
// holds for as long as it is running.
TCPConnection::~TCPConnection() {
	Disconnect();
	ClearBuffers();
	if (ConnectionType == Outgoing) {
		MRunLoop.lock();
		pRunLoop = false;
		MRunLoop.unlock();
		MLoopRunning.lock();
		MLoopRunning.unlock();
#if TCPN_DEBUG_Memory >= 6
		cout << "Deconstructor on outgoing TCP# " << GetID() << endl;
#endif
	}
#if TCPN_DEBUG_Memory >= 5
	else {
		cout << "Deconstructor on incomming TCP# " << GetID() << endl;
	}
#endif
	safe_delete(keepalive_timer);
	safe_delete(timeout_timer);
	safe_delete_array(recvbuf);
	safe_delete_array(sendbuf);
	safe_delete_array(charAsyncConnect);
}

// Stores in_state into pState under MState.
void TCPConnection::SetState(int8 in_state) {
	MState.lock();
	pState = in_state;
	MState.unlock();
}

// Returns a copy of pState read under MState.
int8 TCPConnection::GetState() {
	int8 ret;
	MState.lock();
	ret = pState;
	MState.unlock();
	return ret;
}

// Disconnects and sets pFree. Calling this on an Outgoing connection raises an
// error via ThrowError.
void TCPConnection::Free() {
	if (ConnectionType == Outgoing) {
		ThrowError("TCPConnection::Free() called on an Outgoing connection");
	}
#if TCPN_DEBUG_Memory >= 5
	cout << "Free on TCP# " << GetID() << endl;
#endif
	Disconnect();
	pFree = true;
}

// Queues a ServerPacket for sending. Returns false when not connected or when
// the mode is neither modePacket nor modeTransition. Relayed connections
// (RemoteID != 0) forward through RelayLink; in modeTransition the packet is
// parked on InModeQueue. The caller keeps ownership of pack.
bool TCPConnection::SendPacket(ServerPacket* pack, int32 iDestination) {
	LockMutex lock(&MState);
	if (!Connected())
		return false;
	eTCPMode tmp = GetMode();
	if (tmp != modePacket && tmp != modeTransition)
		return false;
	if (RemoteID)
		return RelayLink->SendPacket(pack, RemoteID);
	else {
		TCPNetPacket_Struct* tnps = MakePacket(pack, iDestination);
		if (tmp == modeTransition) {
			InModeQueuePush(tnps);
		}
		else {
#if TCPN_LOG_PACKETS >= 1
			if (pack && pack->opcode != 0) {
				struct in_addr in;
				in.s_addr = GetrIP();
				CoutTimestamp(true);
				cout << ": Logging outgoing TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
	#if TCPN_LOG_PACKETS == 2
				if (pack->size >= 32)
					DumpPacket(pack->pBuffer, 32);
				else
					DumpPacket(pack);
	#endif
	#if TCPN_LOG_PACKETS >= 3
				DumpPacket(pack);
	#endif
			}
#endif
			// Ownership of tnps passes to the send queue here.
			ServerSendQueuePushEnd((uchar**) &tnps, tnps->size);
		}
	}
	return true;
}

// Queues an already serialized packet. The caller keeps ownership of tnps: it
// is copied both when parked on InModeQueue (modeTransition) and when appended
// to the send buffer (modePacket). Returns false for relayed connections, when
// not connected, or in any other mode.
bool TCPConnection::SendPacket(TCPNetPacket_Struct* tnps) {
	LockMutex lock(&MState);
	if (RemoteID)
		return false;
	if (!Connected())
		return false;
	eTCPMode tmp = GetMode();
	if (tmp == modeTransition) {
		TCPNetPacket_Struct* tnps2 = (TCPNetPacket_Struct*) new uchar[tnps->size];
		memcpy(tnps2, tnps, tnps->size);
		InModeQueuePush(tnps2);
		return true;
	}
	if (GetMode() != modePacket)
		return false;
#if TCPN_LOG_PACKETS >= 1
	if (tnps && tnps->opcode != 0) {
		struct in_addr in;
		in.s_addr = GetrIP();
		CoutTimestamp(true);
		cout << ": Logging outgoing TCP NetPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << tnps->opcode << dec << ", size: " << setw(5) << setfill(' ') << tnps->size << " " << inet_ntoa(in) << ":" << GetrPort();
		if (pOldFormat)
			cout << " (OldFormat)";
		cout << endl;
	#if TCPN_LOG_PACKETS == 2
		if (tnps->size >= 32)
			DumpPacket((uchar*) tnps, 32);
		else
			DumpPacket((uchar*) tnps, tnps->size);
	#endif
	#if TCPN_LOG_PACKETS >= 3
		DumpPacket((uchar*) tnps, tnps->size);
	#endif
	}
#endif
	ServerSendQueuePushEnd((const uchar*) tnps, tnps->size);
	return true;
}

// Queues raw bytes; only valid while connected in modeConsole.
// A zero size is accepted and queues nothing.
bool TCPConnection::Send(const uchar* data, sint32 size) {
	if (!Connected())
		return false;
	if (GetMode() != modeConsole)
		return false;
	if (!size)
		return true;
	ServerSendQueuePushEnd(data, size);
	return true;
}

// Parks a serialized packet on InModeQueue (guarded by MSendQueue).
void TCPConnection::InModeQueuePush(TCPNetPacket_Struct* tnps) {
	MSendQueue.lock();
	InModeQueue.push(tnps);
	MSendQueue.unlock();
}

// Appends a copy of data[0..size) to the send buffer, growing it when needed.
void TCPConnection::ServerSendQueuePushEnd(const uchar* data, sint32 size) {
	MSendQueue.lock();
	if (sendbuf == 0) {
		sendbuf = new uchar[size];
		sendbuf_size = size;
		sendbuf_used = 0;
	}
	else if (size > (sendbuf_size - sendbuf_used)) {
		sendbuf_size += size + 1024;
		uchar* tmp = new uchar[sendbuf_size];
		memcpy(tmp, sendbuf, sendbuf_used);
		safe_delete_array(sendbuf);
		sendbuf = tmp;
	}
	memcpy(&sendbuf[sendbuf_used], data, size);
	sendbuf_used += size;
	MSendQueue.unlock();
}

// Appends *data to the send buffer and takes ownership of it. When the send
// buffer is empty, *data itself becomes the send buffer; otherwise its bytes
// are copied and the buffer is released. *data must have been allocated with
// new uchar[] and is always set to 0 on return.
void TCPConnection::ServerSendQueuePushEnd(uchar** data, sint32 size) {
	MSendQueue.lock();
	if (sendbuf == 0) {
		sendbuf = *data;
		sendbuf_size = size;
		sendbuf_used = size;
		MSendQueue.unlock();
		*data = 0;
		return;
	}
	if (size > (sendbuf_size - sendbuf_used)) {
		sendbuf_size += size;
		uchar* tmp = new uchar[sendbuf_size];
		memcpy(tmp, sendbuf, sendbuf_used);
		safe_delete_array(sendbuf);
		sendbuf = tmp;
	}
	memcpy(&sendbuf[sendbuf_used], *data, size);
	sendbuf_used += size;
	MSendQueue.unlock();
	// Release with the same form it was allocated with (new uchar[]) and clear
	// the caller's pointer, exactly as the adopt path above does.
	delete[] *data;
	*data = 0;
}

// Inserts data[0..size) in front of whatever is already queued for sending.
void TCPConnection::ServerSendQueuePushFront(uchar* data, sint32 size) {
	MSendQueue.lock();
	if (sendbuf == 0) {
		sendbuf = new uchar[size];
		sendbuf_size = size;
		sendbuf_used = 0;
	}
	else if (size > (sendbuf_size - sendbuf_used)) {
		sendbuf_size += size;
		uchar* tmp = new uchar[sendbuf_size];
		memcpy(&tmp[size], sendbuf, sendbuf_used);
		safe_delete_array(sendbuf);
		sendbuf = tmp;
	}
	else {
		// Enough spare room: shift the queued bytes back so the copy below
		// does not overwrite them. Ranges overlap, hence memmove.
		memmove(&sendbuf[size], sendbuf, sendbuf_used);
	}
	memcpy(sendbuf, data, size);
	sendbuf_used += size;
	MSendQueue.unlock();
}

// Hands the whole send buffer to the caller (who must delete[] it) and leaves
// the queue empty. Returns false when the queue is empty or MSendQueue could
// not be acquired without blocking.
bool TCPConnection::ServerSendQueuePop(uchar** data, sint32* size) {
	bool ret;
	if (!MSendQueue.trylock())
		return false;
	if (sendbuf) {
		*data = sendbuf;
		*size = sendbuf_used;
		sendbuf = 0;
		ret = true;
	}
	else {
		ret = false;
	}
	MSendQueue.unlock();
	return ret;
}

// Pops the next received packet, or returns 0 when MOutQueueLock is busy.
ServerPacket* TCPConnection::PopPacket() {
	ServerPacket* ret;
	if (!MOutQueueLock.trylock())
		return 0;
	ret = OutQueue.pop();
	MOutQueueLock.unlock();
	return ret;
}

// Pops the next received console line, or returns 0 when MOutQueueLock is busy.
char* TCPConnection::PopLine() {
	char* ret;
	if (!MOutQueueLock.trylock())
		return 0;
	ret = (char*) LineOutQueue.pop();
	MOutQueueLock.unlock();
	return ret;
}

// Pushes a received packet onto OutQueue.
void TCPConnection::OutQueuePush(ServerPacket* pack) {
	MOutQueueLock.lock();
	OutQueue.push(pack);
	MOutQueueLock.unlock();
}

// Pushes a received console line onto LineOutQueue. The literal line
// "**PACKETMODE**" is not queued: it switches the connection to modePacket,
// flushes InModeQueue through SendPacket() and frees the line.
void TCPConnection::LineOutQueuePush(char* line) {
#if defined(GOTFRAGS) && 0
	if (strcmp(line, "**CRASHME**") == 0) {
		int i = 0;
		cout << (5 / i) << endl;
	}
#endif
	if (strcmp(line, "**PACKETMODE**") == 0) {
		MSendQueue.lock();
		safe_delete_array(sendbuf);
		if (TCPMode == modeConsole)
			Send((const uchar*) "\0**PACKETMODE**\r", 16);
		TCPMode = modePacket;
		TCPNetPacket_Struct* tnps = 0;
		while ((tnps = InModeQueue.pop())) {
			SendPacket(tnps);
			// Queued packets are new uchar[] buffers (see MakePacket()).
			delete[] (uchar*) tnps;
		}
		MSendQueue.unlock();
		safe_delete_array(line);
		return;
	}
	MOutQueueLock.lock();
	LineOutQueue.push(line);
	MOutQueueLock.unlock();
}

// Flushes pending output, shuts down and closes the socket, clears the
// buffers, returns the state to TCPS_Ready and detaches from RelayLink.
void TCPConnection::Disconnect(bool iSendRelayDisconnect) {
	if (connection_socket != INVALID_SOCKET && connection_socket != 0) {
		MState.lock();
		if (pState == TCPS_Connected || pState == TCPS_Disconnecting || pState == TCPS_Disconnected)
			SendData();
		pState = TCPS_Closing;
		MState.unlock();
		shutdown(connection_socket, 0x01);
		shutdown(connection_socket, 0x00);
#ifdef WIN32
		closesocket(connection_socket);
#else
		close(connection_socket);
#endif
		connection_socket = 0;
		rIP = 0;
		rPort = 0;
		ClearBuffers();
	}
	SetState(TCPS_Ready);
	if (RelayLink) {
		RelayLink->RemoveRelay(this, iSendRelayDisconnect);
		RelayLink = 0;
	}
}

// Returns pAsyncConnect read under MAsyncConnect.
bool TCPConnection::GetAsyncConnect() {
	bool ret;
	MAsyncConnect.lock();
	ret = pAsyncConnect;
	MAsyncConnect.unlock();
	return ret;
}

// Sets pAsyncConnect under MAsyncConnect and returns its previous value.
bool TCPConnection::SetAsyncConnect(bool iValue) {
	bool ret;
	MAsyncConnect.lock();
	ret = pAsyncConnect;
	pAsyncConnect = iValue;
	MAsyncConnect.unlock();
	return ret;
}

// Requests a connect to a host name; the connect itself is performed by the
// TCPConnectionLoop thread, which is started here if it is not running yet.
// Ignored unless the state is TCPS_Ready and no request is already pending.
void TCPConnection::AsyncConnect(char* irAddress, int16 irPort) {
	if (ConnectionType != Outgoing) {
		// If this code runs, we got serious problems
		// Crash and burn.
		ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
		return;
	}
	if (GetState() != TCPS_Ready)
		return;
	MAsyncConnect.lock();
	if (pAsyncConnect) {
		MAsyncConnect.unlock();
		return;
	}
	pAsyncConnect = true;
	safe_delete_array(charAsyncConnect);
	charAsyncConnect = new char[strlen(irAddress) + 1];
	strcpy(charAsyncConnect, irAddress);
	rPort = irPort;
	MAsyncConnect.unlock();
	if (!pRunLoop) {
		pRunLoop = true;
#ifdef WIN32
		_beginthread(TCPConnectionLoop, 0, this);
#else
		pthread_t thread;
		pthread_create(&thread, NULL, TCPConnectionLoop, this);
		pthread_detach(thread);
#endif
	}
	return;
}

// Same as the host-name overload, but for an already resolved address.
// Any previously stored host name is discarded.
void TCPConnection::AsyncConnect(int32 irIP, int16 irPort) {
	if (ConnectionType != Outgoing) {
		// If this code runs, we got serious problems
		// Crash and burn.
		ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
		return;
	}
	if (GetState() != TCPS_Ready)
		return;
	MAsyncConnect.lock();
	if (pAsyncConnect) {
		MAsyncConnect.unlock();
		return;
	}
	pAsyncConnect = true;
	// charAsyncConnect is allocated with new char[], so use the array form.
	safe_delete_array(charAsyncConnect);
	rIP = irIP;
	rPort = irPort;
	MAsyncConnect.unlock();
	if (!pRunLoop) {
		pRunLoop = true;
#ifdef WIN32
		_beginthread(TCPConnectionLoop, 0, this);
#else
		pthread_t thread;
		pthread_create(&thread, NULL, TCPConnectionLoop, this);
		pthread_detach(thread);
#endif
	}
	return;
}

// Resolves irAddress and connects to it. On failure returns false and, when
// errbuf is non-null, writes a message of at most
// TCPConnection_ErrorBufferSize bytes into it.
bool TCPConnection::Connect(char* irAddress, int16 irPort, char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	int32 tmpIP = ResolveIP(irAddress);
	if (!tmpIP) {
		if (errbuf) {
#ifdef WIN32
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error: %i", WSAGetLastError());
#else
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error #%i: %s", errno, strerror(errno));
#endif
		}
		return false;
	}
	return Connect(tmpIP, irPort, errbuf);
}

// Connects to in_ip:in_port. Only valid on Outgoing connections in the
// TCPS_Ready state. On success the socket is switched to non-blocking, the
// buffers are cleared, the mode is set to modePacket and the state becomes
// TCPS_Connected. Every exit path clears the async-connect flag (except the
// ThrowError path).
bool TCPConnection::Connect(int32 in_ip, int16 in_port, char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	if (ConnectionType != Outgoing) {
		// If this code runs, we got serious problems
		// Crash and burn.
		ThrowError("TCPConnection::Connect() call on a Incomming connection object!");
		return false;
	}
	MState.lock();
	if (pState == TCPS_Ready) {
		pState = TCPS_Connecting;
	}
	else {
		MState.unlock();
		SetAsyncConnect(false);
		return false;
	}
	MState.unlock();
	if (!pRunLoop) {
		pRunLoop = true;
#ifdef WIN32
		_beginthread(TCPConnectionLoop, 0, this);
#else
		pthread_t thread;
		pthread_create(&thread, NULL, TCPConnectionLoop, this);
		pthread_detach(thread);
#endif
	}

	connection_socket = INVALID_SOCKET;
	struct sockaddr_in server_sin;
	// struct in_addr in;

	if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET || connection_socket == 0) {
#ifdef WIN32
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %i", WSAGetLastError());
#else
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %s", strerror(errno));
#endif
		SetState(TCPS_Ready);
		SetAsyncConnect(false);
		return false;
	}
	server_sin.sin_family = AF_INET;
	server_sin.sin_addr.s_addr = in_ip;
	server_sin.sin_port = htons(in_port);

	// Establish a connection to the server socket.
#ifdef WIN32
	if (connect(connection_socket, (PSOCKADDR) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %i", WSAGetLastError());
		closesocket(connection_socket);
		connection_socket = 0;
		SetState(TCPS_Ready);
		SetAsyncConnect(false);
		return false;
	}
#else
	if (connect(connection_socket, (struct sockaddr *) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %s", strerror(errno));
		close(connection_socket);
		connection_socket = 0;
		SetState(TCPS_Ready);
		SetAsyncConnect(false);
		return false;
	}
#endif
	int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
	setsockopt(connection_socket, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
#ifdef WIN32
	unsigned long nonblocking = 1;
	ioctlsocket(connection_socket, FIONBIO, &nonblocking);
#else
	fcntl(connection_socket, F_SETFL, O_NONBLOCK);
#endif

	SetEcho(false);
	MSendQueue.lock();
	ClearBuffers();
	TCPMode = modePacket;
	MSendQueue.unlock();

	rIP = in_ip;
	rPort = in_port;
	SetState(TCPS_Connected);
	SetAsyncConnect(false);
	return true;
}

// Frees the receive/send buffers and everything still queued, then restarts
// the keepalive and timeout timers when they exist.
void TCPConnection::ClearBuffers() {
	LockMutex lock1(&MSendQueue);
	LockMutex lock2(&MOutQueueLock);
	LockMutex lock3(&MRunLoop);
	LockMutex lock4(&MState);
	safe_delete_array(recvbuf);
	safe_delete_array(sendbuf);

	ServerPacket* pack = 0;
	while ((pack = PopPacket()))
		safe_delete(pack);

	TCPNetPacket_Struct* tnps = 0;
	while ((tnps = InModeQueue.pop()))
		delete[] (uchar*) tnps;	// allocated as new uchar[] (see MakePacket())

	char* line = 0;
	while ((line = LineOutQueue.pop()))
		safe_delete_array(line);

	// The relay-link constructor leaves both timers null, and the destructor
	// calls this function unconditionally.
	if (keepalive_timer)
		keepalive_timer->Start();
	if (timeout_timer)
		timeout_timer->Start();
}

// True while the state is TCPS_Connected or TCPS_Disconnecting.
bool TCPConnection::CheckNetActive() {
	MState.lock();
	if (pState == TCPS_Connected || pState == TCPS_Disconnecting) {
		MState.unlock();
		return true;
	}
	MState.unlock();
	return false;
}

// One service pass: performs a pending async connect when the connection is
// not active, otherwise sends queued data and then receives. Returns false
// when the connection should be dropped; send/receive errors are written to
// cout prefixed with the remote address.
bool TCPConnection::Process() {
	char errbuf[TCPConnection_ErrorBufferSize];
	if (!CheckNetActive()) {
		if (ConnectionType == Outgoing) {
			if (GetAsyncConnect()) {
				if (charAsyncConnect)
					rIP = ResolveIP(charAsyncConnect);
				Connect(rIP, rPort);
			}
		}
		if (GetState() == TCPS_Disconnected) {
			Disconnect();
			return false;
		}
		else if (GetState() == TCPS_Connecting)
			return true;
		else
			return false;
	}
	if (!SendData(errbuf)) {
		struct in_addr in;
		in.s_addr = GetrIP();
		cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
		return false;
	}
	if (!Connected())
		return false;
	if (!RecvData(errbuf)) {
		struct in_addr in;
		in.s_addr = GetrIP();
		cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
		return false;
	}
	return true;
}

// Reads whatever is available on the socket into recvbuf (growing it in
// 5120-byte steps) and passes it to ProcessReceivedData(). Returns false,
// with a message in errbuf when it is non-null, on a socket error, when the
// peer closed the connection, when the buffer limit is reached, or on timeout
// in packet/transition mode.
bool TCPConnection::RecvData(char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	if (!Connected()) {
		return false;
	}

	int status = 0;
	if (recvbuf == 0) {
		recvbuf = new uchar[5120];
		recvbuf_size = 5120;
		recvbuf_used = 0;
		recvbuf_echo = 0;
	}
	else if ((recvbuf_size - recvbuf_used) < 2048) {
		uchar* tmpbuf = new uchar[recvbuf_size + 5120];
		memcpy(tmpbuf, recvbuf, recvbuf_used);
		recvbuf_size += 5120;
		safe_delete_array(recvbuf);
		recvbuf = tmpbuf;
		if (recvbuf_size >= MaxTCPReceiveBufferSize) {
			if (errbuf)
				snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): recvbuf_size >= MaxTCPReceiveBufferSize");
			return false;
		}
	}

	status = recv(connection_socket, (char *) &recvbuf[recvbuf_used], (recvbuf_size - recvbuf_used), 0);
	if (status >= 1) {
#if TCPN_LOG_RAW_DATA_IN >= 1
		struct in_addr in;
		in.s_addr = GetrIP();
		CoutTimestamp(true);
		cout << ": Read " << status << " bytes from network. (recvbuf_used = " << recvbuf_used << ") " << inet_ntoa(in) << ":" << GetrPort();
		if (pOldFormat)
			cout << " (OldFormat)";
		cout << endl;
	#if TCPN_LOG_RAW_DATA_IN == 2
		sint32 tmp = status;
		if (tmp > 32)
			tmp = 32;
		DumpPacket(&recvbuf[recvbuf_used], status);
	#elif TCPN_LOG_RAW_DATA_IN >= 3
		DumpPacket(&recvbuf[recvbuf_used], status);
	#endif
#endif
		recvbuf_used += status;
		timeout_timer->Start();
		if (!ProcessReceivedData(errbuf))
			return false;
	}
	else if (status == SOCKET_ERROR) {
#ifdef WIN32
		if (!(WSAGetLastError() == WSAEWOULDBLOCK)) {
			if (errbuf)
				snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %i", WSAGetLastError());
			return false;
		}
#else
		if (!(errno == EWOULDBLOCK)) {
			if (errbuf)
				snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %s", strerror(errno));
			return false;
		}
#endif
	}
	else if (status == 0) {
		// A non-zero length was requested (at least 2048 bytes are free above),
		// so 0 means the peer performed an orderly shutdown.
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Connection closed");
		return false;
	}
	if ((TCPMode == modePacket || TCPMode == modeTransition) && timeout_timer->Check()) {
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Connection timeout");
		return false;
	}

	return true;
}

// Returns pEcho read under MEcho.
bool TCPConnection::GetEcho() {
	bool ret;
	MEcho.lock();
	ret = pEcho;
	MEcho.unlock();
	return ret;
}

// Sets pEcho under MEcho.
void TCPConnection::SetEcho(bool iValue) {
	MEcho.lock();
	pEcho = iValue;
	MEcho.unlock();
}

// Dispatches the bytes in recvbuf. In modePacket they are parsed as packets;
// otherwise they are treated as console input: NUL clears everything typed so
// far, CR/LF ends a line (pushed through LineOutQueuePush), backspace removes
// the previous byte, and bytes are echoed back when echo is enabled.
bool TCPConnection::ProcessReceivedData(char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	if (!recvbuf)
		return true;
	if (TCPMode == modePacket) {
		//if (pOldFormat)
		//	return ProcessReceivedDataAsOldPackets(errbuf);
		//else
			return ProcessReceivedDataAsPackets(errbuf);
	}
	else {
#if TCPN_DEBUG_Console >= 4
		if (recvbuf_used) {
			cout << "Starting Processing: recvbuf=" << recvbuf_used << endl;
			DumpPacket(recvbuf, recvbuf_used);
		}
#endif
		for (int i=0; i < recvbuf_used; i++) {
			if (GetEcho() && i >= recvbuf_echo) {
				Send(&recvbuf[i], 1);
				recvbuf_echo = i + 1;
			}
			switch(recvbuf[i]) {
			case 0: { // 0 is the code for clear buffer
				if (i==0) {
					recvbuf_used--;
					recvbuf_echo--;
					// Source and destination overlap, so memmove is required.
					memmove(recvbuf, &recvbuf[1], recvbuf_used);
					i = -1;
				} else {
					// Drop everything up to and including the NUL. (The loop
					// condition guarantees i < recvbuf_used here.)
					uchar* tmpdel = recvbuf;
					recvbuf = new uchar[recvbuf_size];
					recvbuf_used -= i + 1;
					recvbuf_echo -= i + 1;
					memcpy(recvbuf, &tmpdel[i+1], recvbuf_used);
					safe_delete_array(tmpdel);
					i = -1;
				}
#if TCPN_DEBUG_Console >= 5
				cout << "Removed 0x00" << endl;
				if (recvbuf_used) {
					cout << "recvbuf left: " << recvbuf_used << endl;
					DumpPacket(recvbuf, recvbuf_used);
				}
				else
					cout << "recbuf left: None" << endl;
#endif
				break;
			}
			case 10:
			case 13: // newline marker
			{
				if (i==0) { // empty line
					recvbuf_used--;
					recvbuf_echo--;
					memmove(recvbuf, &recvbuf[1], recvbuf_used);
					i = -1;
				} else {
					char* line = new char[i+1];
					memset(line, 0, i+1);
					memcpy(line, recvbuf, i);
#if TCPN_DEBUG_Console >= 3
					cout << "Line Out: " << endl;
					DumpPacket((uchar*) line, i);
#endif
					//line[i] = 0;
					uchar* tmpdel = recvbuf;
					recvbuf = new uchar[recvbuf_size];
					recvbuf_used -= i+1;
					recvbuf_echo -= i+1;
					memcpy(recvbuf, &tmpdel[i+1], recvbuf_used);
#if TCPN_DEBUG_Console >= 5
					cout << "i+1=" << i+1 << endl;
					if (recvbuf_used) {
						cout << "recvbuf left: " << recvbuf_used << endl;
						DumpPacket(recvbuf, recvbuf_used);
					}
					else
						cout << "recbuf left: None" << endl;
#endif
					safe_delete_array(tmpdel);
					if (strlen(line) > 0)
						LineOutQueuePush(line);
					else
						safe_delete_array(line);
					// LineOutQueuePush() switches to modePacket on "**PACKETMODE**".
					if (TCPMode == modePacket) {
						return ProcessReceivedDataAsPackets(errbuf);
					}
					i = -1;
				}
				break;
			}
			case 8: // backspace
			{
				if (i==0) { // nothin to backspace
					recvbuf_used--;
					recvbuf_echo--;
					memmove(recvbuf, &recvbuf[1], recvbuf_used);
					i = -1;
				} else {
					uchar* tmpdel = recvbuf;
					recvbuf = new uchar[recvbuf_size];
					memcpy(recvbuf, tmpdel, i-1);
					// Only the bytes after the backspace are still valid.
					memcpy(&recvbuf[i-1], &tmpdel[i+1], recvbuf_used - (i+1));
					recvbuf_used -= 2;
					recvbuf_echo -= 2;
					safe_delete_array(tmpdel);
					i -= 2;
				}
				break;
			}
			}
		}
		if (recvbuf_used < 0)
			safe_delete_array(recvbuf);
	}
	return true;
}

// Parses every complete TCPNetPacket_Struct in recvbuf into a ServerPacket.
// Opcode 0 packets go to ProcessNetworkLayerPacket() when they carry data;
// all others are pushed to an OutQueue (of the destination connection when
// relaying). Consumed bytes are then removed from recvbuf. Returns false, with
// a message in errbuf when it is non-null, when the stream is malformed.
bool TCPConnection::ProcessReceivedDataAsPackets(char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	const sint32 headerSize = (sint32) sizeof(TCPNetPacket_Struct);
	sint32 base = 0;
	sint32 size = 7;
	uchar* buffer;
	ServerPacket* pack = 0;
	while ((recvbuf_used - base) >= size) {
		TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) &recvbuf[base];
		buffer = tnps->buffer;
		size = tnps->size;
		if (size >= MaxTCPReceiveBufferSize) {
#if TCPN_DEBUG_Memory >= 1
			cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
#endif
			if (errbuf)
				snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
			safe_delete(pack);
			return false;
		}
		// The size field comes from the peer. Anything smaller than the header
		// plus the optional fields it announces would underflow pack->size, and
		// a size of 0 would never advance base (endless loop).
		sint32 optionBytes = 0;
		if (tnps->flags.compressed)
			optionBytes += 4;
		if (tnps->flags.destination)
			optionBytes += 4;
		if (size < headerSize + optionBytes) {
			if (errbuf)
				snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size smaller than packet header");
			safe_delete(pack);
			return false;
		}
		if ((recvbuf_used - base) >= size) {
			// ok, we got enough data to make this packet!
			safe_delete(pack);
			pack = new ServerPacket;
			pack->size = size - sizeof(TCPNetPacket_Struct);
			// read headers
			pack->opcode = tnps->opcode;
			if (tnps->flags.compressed) {
				pack->compressed = true;
				pack->InflatedSize = *((sint32*)buffer);
				pack->size -= 4;
				buffer += 4;
			}
			if (tnps->flags.destination) {
				pack->destination = *((sint32*)buffer);
				pack->size -= 4;
				buffer += 4;
			}
			// end read headers
			if (pack->size > 0) {
				if (tnps->flags.compressed) {
					// Lets decompress the packet here
					pack->compressed = false;
					if(pack->InflatedSize < MaxTCPReceiveBufferSize) {
						pack->pBuffer = new uchar[pack->InflatedSize];
						pack->size = InflatePacket(buffer, pack->size, pack->pBuffer, pack->InflatedSize);
					}
					else {
						cout << "Invalid inflated packet." << endl;
						safe_delete(pack);
						return false;
					}
				}
				else {
					pack->pBuffer = new uchar[pack->size];
					memcpy(pack->pBuffer, buffer, pack->size);
				}
			}
			if (pack->opcode == 0) {
				if (pack->size) {
#if TCPN_DEBUG >= 2
					cout << "Received TCP Network layer packet" << endl;
#endif
					ProcessNetworkLayerPacket(pack);
				}
#if TCPN_DEBUG >= 5
				else {
					cout << "Received TCP keepalive packet. (opcode=0)" << endl;
				}
#endif
			}
			else {
#if TCPN_LOG_PACKETS >= 1
				if (pack && pack->opcode != 0) {
					struct in_addr in;
					in.s_addr = GetrIP();
					CoutTimestamp(true);
					cout << ": Logging incoming TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
	#if TCPN_LOG_PACKETS == 2
					if (pack->size >= 32)
						DumpPacket(pack->pBuffer, 32);
					else
						DumpPacket(pack);
	#endif
	#if TCPN_LOG_PACKETS >= 3
					DumpPacket(pack);
	#endif
				}
#endif
				if (RelayServer && Server && pack->destination) {
					TCPConnection* con = Server->GetConnection(pack->destination);
					if (!con) {
#if TCPN_DEBUG >= 1
						cout << "Error relaying packet: con = 0" << endl;
#endif
					}
					else{
						con->OutQueuePush(pack);
						pack = 0;
					}
				}
				else{
					OutQueuePush(pack);
					pack = 0;
				}
			}
			base += size;
			size = 7;
		}
	}
	safe_delete(pack);
	if (base != 0) {
		if (base >= recvbuf_used) {
			safe_delete_array(recvbuf);
		}
		else {
			uchar* tmpbuf = new uchar[recvbuf_size - base];
			memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
			safe_delete_array(recvbuf);
			recvbuf = tmpbuf;
			recvbuf_used -= base;
			recvbuf_size -= base;
		}
	}
	return true;
}

// Parses recvbuf using the old framing: a 2-byte opcode followed by a 2-byte
// total size (header included), then the payload. Opcode 0 packets are
// discarded as keepalives; all others are pushed to OutQueue. Returns false,
// with a message in errbuf when it is non-null, on a malformed size.
bool TCPConnection::ProcessReceivedDataAsOldPackets(char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	sint32 base = 0;
	sint32 size = 4;
	uchar* buffer;
	ServerPacket* pack = 0;
	while ((recvbuf_used - base) >= size) {
		buffer = &recvbuf[base];
		// Read the 16-bit size field into a 16-bit variable so the result does
		// not depend on which half of a 32-bit value the bytes land in.
		int16 wireSize = 0;
		memcpy(&wireSize, &buffer[2], 2);
		size = wireSize;
		if (size >= MaxTCPReceiveBufferSize) {
#if TCPN_DEBUG_Memory >= 1
			cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
#endif
			if (errbuf)
				snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
			return false;
		}
		// A size below the 4-byte header would underflow pack->size, and a
		// size of 0 would never advance base.
		if (size < 4) {
			if (errbuf)
				snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsOldPackets(): size smaller than packet header");
			return false;
		}
		if ((recvbuf_used - base) >= size) {
			// ok, we got enough data to make this packet!
			pack = new ServerPacket;
			memcpy(&pack->opcode, &buffer[0], 2);
			pack->size = size - 4;

			LogWrite(MISC__TODO, 1, "TODO", "Checksum or size check or something similar\n\t(%s, function: %s, line #: %i)", __FILE__, __FUNCTION__, __LINE__);
			/*
			if () { // TODO: Checksum or size check or something similar
				// Datastream corruption, get the hell outta here!
				delete pack;
				return false;
			}
			*/

			if (pack->size > 0) {
				pack->pBuffer = new uchar[pack->size];
				memcpy(pack->pBuffer, &buffer[4], pack->size);
			}
			if (pack->opcode == 0) {
				// keepalive, no need to process
				safe_delete(pack);
			}
			else {
#if TCPN_LOG_PACKETS >= 1
				if (pack && pack->opcode != 0) {
					struct in_addr in;
					in.s_addr = GetrIP();
					CoutTimestamp(true);
					cout << ": Logging incoming TCP OldPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
	#if TCPN_LOG_PACKETS == 2
					if (pack->size >= 32)
						DumpPacket(pack->pBuffer, 32);
					else
						DumpPacket(pack);
	#endif
	#if TCPN_LOG_PACKETS >= 3
					DumpPacket(pack);
	#endif
				}
#endif
				OutQueuePush(pack);
			}
			base += size;
			size = 4;
		}
	}
	if (base != 0) {
		if (base >= recvbuf_used) {
			safe_delete_array(recvbuf);
		}
		else {
			uchar* tmpbuf = new uchar[recvbuf_size - base];
			memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
			safe_delete_array(recvbuf);
			recvbuf = tmpbuf;
			recvbuf_used -= base;
			recvbuf_size -= base;
		}
	}
	return true;
}

// Handles an opcode-0 packet that carries data; the first payload byte is the
// network-layer opcode. Any opcode other than 0 currently disconnects (see the
// note below), so the relay cases in the switch are not reached.
void TCPConnection::ProcessNetworkLayerPacket(ServerPacket* pack) {
	int8 opcode = pack->pBuffer[0];

	/** disabling RELAY capabilities, this functionality is poorly implemented
	even if such a feature needs to be re-used need authentication BEFORE allowing relay to take place
	secondly we need to protect the LS accepting new connections as bogus data can be passed to open
	fake TCP connections
	opcode 0 is OK, that is Keep-Alive
	**/
	if (opcode > 0) {
		Disconnect();
		return;
	}

	int8* data = &pack->pBuffer[1];
	switch (opcode) {
		case 0: {
			break;
		}
		case 1: { // Switch to RelayServer mode
			if (pack->size != 1) {
				SendNetErrorPacket("New RelayClient: wrong size, expected 1");
				break;
			}
			if (RelayServer) {
				SendNetErrorPacket("Switch to RelayServer mode when already in RelayServer mode");
				break;
			}
			if (RemoteID) {
				SendNetErrorPacket("Switch to RelayServer mode by a Relay Client");
				break;
			}
			if (ConnectionType != Incomming) {
				SendNetErrorPacket("Switch to RelayServer mode on outgoing connection");
				break;
			}
#if TCPC_DEBUG >= 3
			struct in_addr in;
			in.s_addr = GetrIP();
			cout << "Switching to RelayServer mode: " << inet_ntoa(in) << ":" << GetrPort() << endl;
#endif
			RelayServer = true;
			break;
		}
		case 2: { // New Relay Client
			if (!RelayServer) {
				SendNetErrorPacket("New RelayClient when not in RelayServer mode");
				break;
			}
			if (pack->size != 11) {
				SendNetErrorPacket("New RelayClient: wrong size, expected 11");
				break;
			}
			if (ConnectionType != Incomming) {
				SendNetErrorPacket("New RelayClient: illegal on outgoing connection");
				break;
			}
			TCPConnection* con = new TCPConnection(Server, this, *((int32*) data), *((int32*) &data[4]), *((int16*) &data[8]));
			Server->AddConnection(con);
			RelayCount++;
			break;
		}
		case 3: { // Delete Relay Client
			if (!RelayServer) {
				SendNetErrorPacket("Delete RelayClient when not in RelayServer mode");
				break;
			}
			if (pack->size != 5) {
				SendNetErrorPacket("Delete RelayClient: wrong size, expected 5");
				break;
			}
			TCPConnection* con = Server->GetConnection(*((int32*)data));
			if (con) {
				if (ConnectionType == Incomming) {
					if (con->GetRelayLink() != this) {
						SendNetErrorPacket("Delete RelayClient: RelayLink != this");
						break;
					}
				}
				con->Disconnect(false);
			}
			break;
		}
		case 255: {
#if TCPC_DEBUG >= 1
			struct in_addr in;
			in.s_addr = GetrIP();
			cout << "Received NetError: '";
			if (pack->size > 1)
				cout << (char*) data;
			cout << "': " << inet_ntoa(in) << ":" << GetrPort() << endl;
#endif
			break;
		}
	}
}

// Sends a network-layer error packet: byte 255 followed by the NUL-terminated
// reason text. A null reason sends only the 255 byte.
void TCPConnection::SendNetErrorPacket(const char* reason) {
#if TCPC_DEBUG >= 1
	struct in_addr in;
	in.s_addr = GetrIP();
	cout << "NetError: '";
	if (reason)
		cout << reason;
	cout << "': " << inet_ntoa(in) << ":" << GetrPort() << endl;
#endif
	ServerPacket* pack = new ServerPacket(0);
	pack->size = 1;
	if (reason)
		pack->size += strlen(reason) + 1;
	pack->pBuffer = new uchar[pack->size];
	memset(pack->pBuffer, 0, pack->size);
	pack->pBuffer[0] = 255;
	// The buffer only has room for the text when reason is non-null.
	if (reason)
		strcpy((char*) &pack->pBuffer[1], reason);
	SendPacket(pack);
	safe_delete(pack);
}

// Called by a relayed connection when it disconnects. Optionally sends a
// "Delete Relay Client" packet (network-layer opcode 3) carrying the relay's
// remote id, then decrements RelayCount.
void TCPConnection::RemoveRelay(TCPConnection* relay, bool iSendRelayDisconnect) {
	if (iSendRelayDisconnect) {
		ServerPacket* pack = new ServerPacket(0, 5);
		pack->pBuffer[0] = 3;
		*((int32*) &pack->pBuffer[1]) = relay->GetRemoteID();
		SendPacket(pack);
		safe_delete(pack);
	}
	RelayCount--;
}

// Writes the queued send buffer to the socket. Bytes the socket did not
// accept are pushed back to the front of the queue. In modePacket a keepalive
// packet is queued whenever keepalive_timer fires. Returns false, with a
// message in errbuf when it is non-null, on a send error other than
// "would block".
bool TCPConnection::SendData(char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	/************ Get first send packet on queue and send it! ************/
	uchar* data = 0;
	sint32 size = 0;
	int status = 0;
	if (ServerSendQueuePop(&data, &size)) {
#ifdef WIN32
		status = send(connection_socket, (const char *) data, size, 0);
#else
		status = send(connection_socket, data, size, MSG_NOSIGNAL);
		// errno is only meaningful when send() reported a failure; a stale
		// EPIPE from an earlier call must not turn a successful send into an
		// error (which would also re-queue bytes that were already sent).
		if (status < 0 && errno == EPIPE)
			status = SOCKET_ERROR;
#endif
		if (status >= 1) {
#if TCPN_LOG_RAW_DATA_OUT >= 1
			struct in_addr in;
			in.s_addr = GetrIP();
			CoutTimestamp(true);
			cout << ": Wrote " << status << " bytes to network. " << inet_ntoa(in) << ":" << GetrPort();
			if (pOldFormat)
				cout << " (OldFormat)";
			cout << endl;
	#if TCPN_LOG_RAW_DATA_OUT == 2
			sint32 tmp = status;
			if (tmp > 32)
				tmp = 32;
			DumpPacket(data, status);
	#elif TCPN_LOG_RAW_DATA_OUT >= 3
			DumpPacket(data, status);
	#endif
#endif
			keepalive_timer->Start();
			if (status < (signed)size) {
#if TCPN_LOG_RAW_DATA_OUT >= 1
				struct in_addr in;
				in.s_addr = GetrIP();
				CoutTimestamp(true);
				cout << ": Pushed " << (size - status) << " bytes back onto the send queue. " << inet_ntoa(in) << ":" << GetrPort();
				if (pOldFormat)
					cout << " (OldFormat)";
				cout << endl;
#endif
				// If there's network congestion, the number of bytes sent can be less than
				// what we tried to give it... Push the extra back on the queue for later
				ServerSendQueuePushFront(&data[status], size - status);
			}
			else if (status > (signed)size) {
				// Release the popped buffer before reporting the error.
				safe_delete_array(data);
				ThrowError("TCPConnection::SendData(): WTF! status > size");
				return false;
			}
			// else if (status == size) {}
		}
		else {
			ServerSendQueuePushFront(data, size);
		}

		safe_delete_array(data);
		if (status == SOCKET_ERROR) {
#ifdef WIN32
			if (WSAGetLastError() != WSAEWOULDBLOCK)
#else
			if (errno != EWOULDBLOCK)
#endif
			{
				if (errbuf) {
#ifdef WIN32
					snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %i", WSAGetLastError());
#else
					snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %s", strerror(errno));
#endif
				}
				return false;
			}
		}
	}
	if (TCPMode == modePacket && keepalive_timer->Check()) {
		ServerPacket* pack = new ServerPacket(0, 0);
		SendPacket(pack);
		safe_delete(pack);
#if TCPN_DEBUG >= 5
		cout << "Sending TCP keepalive packet. (timeout=" << timeout_timer->GetRemainingTime() << " remaining)" << endl;
#endif
	}
	return true;
}

// Thread entry point for Outgoing connections. Holds MLoopRunning while the
// loop runs; each pass sleeps LOOP_GRANULARITY ms, then either services the
// connection or performs a pending async connect.
ThreadReturnType TCPConnectionLoop(void* tmp) {
#ifdef WIN32
	SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
#endif
	if (tmp == 0) {
		ThrowError("TCPConnectionLoop(): tmp = 0!");
		THREAD_RETURN(NULL);
	}
	TCPConnection* tcpc = (TCPConnection*) tmp;
	tcpc->MLoopRunning.lock();
	while (tcpc->RunLoop()) {
		Sleep(LOOP_GRANULARITY);
		if (tcpc->GetState() != TCPS_Ready) {
			if (!tcpc->Process()) {
				tcpc->Disconnect();
			}
		}
		else if (tcpc->GetAsyncConnect()) {
			if (tcpc->charAsyncConnect)
				tcpc->Connect(tcpc->charAsyncConnect, tcpc->GetrPort());
			else
				tcpc->Connect(tcpc->GetrIP(), tcpc->GetrPort());
			tcpc->SetAsyncConnect(false);
		}
		else
			Sleep(10);
	}
	tcpc->MLoopRunning.unlock();

	THREAD_RETURN(NULL);
}

// Returns pRunLoop read under MRunLoop.
bool TCPConnection::RunLoop() {
	bool ret;
	MRunLoop.lock();
	ret = pRunLoop;
	MRunLoop.unlock();
	return ret;
}

// Creates the server object and starts its TCPServerLoop thread. The
// listening socket is not opened here (sock starts at 0).
TCPServer::TCPServer(int16 in_port, bool iOldFormat) {
	NextID = 1;
	pPort = in_port;
	sock = 0;
	pOldFormat = iOldFormat;
	list = new LinkedList<TCPConnection*>;
	pRunLoop = true;
#ifdef WIN32
	_beginthread(TCPServerLoop, 0, this);
#else
	pthread_t thread;
	pthread_create(&thread, NULL, &TCPServerLoop, this);
	pthread_detach(thread);
#endif
}

// Clears pRunLoop, acquires/releases MLoopRunning (held by TCPServerLoop while
// it runs), then deletes the connection list.
TCPServer::~TCPServer() {
	MRunLoop.lock();
	pRunLoop = false;
	MRunLoop.unlock();
	MLoopRunning.lock();
	MLoopRunning.unlock();

	while (NewQueue.pop()); // the objects are deleted with the list, clear this queue so it doesnt try to delete them again
	safe_delete(list);
}

// Returns pRunLoop read under MRunLoop.
bool TCPServer::RunLoop() {
	bool ret;
	MRunLoop.lock();
	ret = pRunLoop;
	MRunLoop.unlock();
	return ret;
}

// Thread entry point for a TCPServer: calls Process() every
// SERVER_LOOP_GRANULARITY ms until RunLoop() returns false.
ThreadReturnType TCPServerLoop(void* tmp) {
#ifdef WIN32
	SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
#endif
	if (tmp == 0) {
		ThrowError("TCPServerLoop(): tmp = 0!");
		THREAD_RETURN(NULL);
	}
	TCPServer* tcps = (TCPServer*) tmp;
	tcps->MLoopRunning.lock();
	while (tcps->RunLoop()) {
		Sleep(SERVER_LOOP_GRANULARITY);
		tcps->Process();
	}
	tcps->MLoopRunning.unlock();

	THREAD_RETURN(NULL);
}

// One server pass: accepts new connections, removes connections that are
// freed and no longer active, and services the rest (disconnecting any whose
// Process() returns false).
void TCPServer::Process() {
	CheckInQueue();
	ListenNewConnections();
	LinkedListIterator<TCPConnection*> iterator(*list);

	iterator.Reset();
	while(iterator.MoreElements()) {
		if (iterator.GetData()->IsFree() && (!iterator.GetData()->CheckNetActive())) {
#if _DEBUG
			LogWrite(NET__DEBUG, 0, "Net", "EQStream Connection deleted.");
#endif
			iterator.RemoveCurrent();
		}
		else {
			if (!iterator.GetData()->Process())
				iterator.GetData()->Disconnect();
			iterator.Advance();
		}
	}
}

// Accepts every pending connection on the listening socket, makes each new
// socket non-blocking with a 64 KB receive buffer, and registers a
// TCPConnection for it. Does nothing while the listening socket is not open.
void TCPServer::ListenNewConnections() {
	SOCKET tmpsock;
	struct sockaddr_in from;
	struct in_addr in;
	unsigned int fromlen;

	TCPConnection* con;

	from.sin_family = AF_INET;
	fromlen = sizeof(from);
	LockMutex lock(&MSock);
	if (!sock)
		return;

	// Check for pending connects
#ifdef WIN32
	unsigned long nonblocking = 1;
	while ((tmpsock = accept(sock, (struct sockaddr*) &from, (int *) &fromlen)) != INVALID_SOCKET) {
		ioctlsocket (tmpsock, FIONBIO, &nonblocking);
#else
	while ((tmpsock = accept(sock, (struct sockaddr*) &from, &fromlen)) != INVALID_SOCKET) {
		fcntl(tmpsock, F_SETFL, O_NONBLOCK);
#endif
		int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
		setsockopt(tmpsock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
		in.s_addr = from.sin_addr.s_addr;

		// New TCP connection
		con = new TCPConnection(this, tmpsock, in.s_addr, ntohs(from.sin_port), pOldFormat);
#if TCPN_DEBUG >= 1
		cout << "New TCP connection: " << inet_ntoa(in) << ":" << con->GetrPort() << endl;
#endif
		AddConnection(con);
	}
}

bool TCPServer::Open(int16 in_port, char* errbuf) {
	if (errbuf)
		errbuf[0] = 0;
	LockMutex lock(&MSock);
	if (sock != 0) {
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "Listening socket already open");
		return false;
	}
	if (in_port != 0) {
		pPort = in_port;
	}

#ifdef WIN32
	SOCKADDR_IN address;
	unsigned long nonblocking = 1;
#else
	struct sockaddr_in address;
#endif
	int reuse_addr = 1;

	// Setup internet address information.
	// This is used with the bind() call
	memset((char *) &address, 0, sizeof(address));
	address.sin_family = AF_INET;
	address.sin_port = htons(pPort);
	address.sin_addr.s_addr = htonl(INADDR_ANY);

	// Setting up TCP port for new TCP connections
	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock == INVALID_SOCKET) {
		if (errbuf)
			snprintf(errbuf, TCPConnection_ErrorBufferSize, "socket(): INVALID_SOCKET");
		return false;
	}

	// Quag: dont think following is good stuff for TCP, good for UDP
	// Mis: SO_REUSEADDR shouldn't be a problem for tcp--allows you to restart
	// without waiting for conns in TIME_WAIT to die
	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse_addr, sizeof(reuse_addr));

	if (bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
#ifdef WIN32
		closesocket(sock);
#else
		close(sock);
#endif
		sock = 0;
		if (errbuf)
			sprintf(errbuf, "bind(): <0");
		return false;
	}

	int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
	setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
#ifdef WIN32
	ioctlsocket (sock, FIONBIO, &nonblocking);
#else
	fcntl(sock, F_SETFL, O_NONBLOCK);
#endif

	if
(listen(sock, SOMAXCONN) == SOCKET_ERROR) { #ifdef WIN32 closesocket(sock); if (errbuf) snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %d", WSAGetLastError()); #else close(sock); if (errbuf) snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %s", strerror(errno)); #endif sock = 0; return false; } return true; } void TCPServer::Close() { LockMutex lock(&MSock); if (sock) { #ifdef WIN32 closesocket(sock); #else close(sock); #endif } sock = 0; } bool TCPServer::IsOpen() { MSock.lock(); bool ret = (bool) (sock != 0); MSock.unlock(); return ret; } TCPConnection* TCPServer::NewQueuePop() { TCPConnection* ret; MNewQueue.lock(); ret = NewQueue.pop(); MNewQueue.unlock(); return ret; } void TCPServer::AddConnection(TCPConnection* con) { list->Append(con); MNewQueue.lock(); NewQueue.push(con); MNewQueue.unlock(); } TCPConnection* TCPServer::GetConnection(int32 iID) { LinkedListIterator iterator(*list); iterator.Reset(); while(iterator.MoreElements()) { if (iterator.GetData()->GetID() == iID) return iterator.GetData(); iterator.Advance(); } return 0; } void TCPServer::SendPacket(ServerPacket* pack) { TCPConnection::TCPNetPacket_Struct* tnps = TCPConnection::MakePacket(pack); SendPacket(&tnps); } void TCPServer::SendPacket(TCPConnection::TCPNetPacket_Struct** tnps) { MInQueue.lock(); InQueue.push(*tnps); MInQueue.unlock(); tnps = 0; } void TCPServer::CheckInQueue() { LinkedListIterator iterator(*list); TCPConnection::TCPNetPacket_Struct* tnps = 0; while (( tnps = InQueuePop() )) { iterator.Reset(); while(iterator.MoreElements()) { if (iterator.GetData()->GetMode() != modeConsole && iterator.GetData()->GetRemoteID() == 0) iterator.GetData()->SendPacket(tnps); iterator.Advance(); } safe_delete(tnps); } } TCPConnection::TCPNetPacket_Struct* TCPServer::InQueuePop() { TCPConnection::TCPNetPacket_Struct* ret; MInQueue.lock(); ret = InQueue.pop(); MInQueue.unlock(); return ret; }