/*
    EQ2Emulator:  Everquest II Server Emulator
    Copyright (C) 2007  EQ2EMulator Development Team (http://www.eq2emulator.net)

    This file is part of EQ2Emulator.

    EQ2Emulator is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EQ2Emulator is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with EQ2Emulator.  If not, see <http://www.gnu.org/licenses/>.
*/
#include "EQStreamFactory.h"
#include "Log.h"

// NOTE(review): the <...> header names (and the template arguments further
// down) were missing from the copy of this file that was reviewed. The lists
// below are reconstructed from what this translation unit actually uses --
// please confirm them against version control.
#ifdef WIN32
	#include <WinSock2.h>
	#include <windows.h>
	#include <process.h>
	#include <io.h>
	#include <stdio.h>
#else
	#include <sys/socket.h>
	#include <netinet/in.h>
	#include <sys/select.h>
	#include <arpa/inet.h>
	#include <netdb.h>
	#include <pthread.h>
	#include <unistd.h>
#endif
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <string>
#include <map>
#include <deque>
#include <vector>
#include "op_codes.h"
#include "EQStream.h"
#include "packet_dump.h"
#ifdef WORLD
	#include "../WorldServer/client.h"
#endif

using namespace std;

#ifdef WORLD
	extern ClientList client_list;
#endif

/**
 * Thread entry point: runs EQStreamFactory::ReaderLoop() on the factory
 * passed in as an opaque pointer. Does nothing when the pointer is null.
 */
ThreadReturnType EQStreamFactoryReaderLoop(void *eqfs)
{
	if (eqfs) {
		EQStreamFactory *fs = (EQStreamFactory *)eqfs;
		fs->ReaderLoop();
	}

	THREAD_RETURN(NULL);
}

/**
 * Thread entry point: runs EQStreamFactory::WriterLoop() on the factory
 * passed in as an opaque pointer. Does nothing when the pointer is null.
 */
ThreadReturnType EQStreamFactoryWriterLoop(void *eqfs)
{
	if (eqfs) {
		EQStreamFactory *fs = (EQStreamFactory *)eqfs;
		fs->WriterLoop();
	}

	THREAD_RETURN(NULL);
}

/**
 * Thread entry point: runs EQStreamFactory::CombinePacketLoop() on the
 * factory passed in as an opaque pointer. Does nothing when the pointer is
 * null.
 */
ThreadReturnType EQStreamFactoryCombinePacketLoop(void *eqfs)
{
	if (eqfs) {
		EQStreamFactory *fs = (EQStreamFactory *)eqfs;
		fs->CombinePacketLoop();
	}

	THREAD_RETURN(NULL);
}

/**
 * Creates a factory for the given stream type that will listen on `port`
 * once Open() is called. No socket is created here.
 */
EQStreamFactory::EQStreamFactory(EQStreamType type, int port)
{
	StreamType = type;
	Port = port;
	listen_ip_address = 0;
	// Close() and the worker loops treat -1 as "no socket"; make sure that is
	// the state before Open() has ever been called.
	sock = -1;
}

/**
 * Removes every stream (CheckTimeout(true)), asks the worker loops to stop
 * and closes the listening socket if one is open.
 */
void EQStreamFactory::Close()
{
	CheckTimeout(true);
	Stop();
	if (sock != -1) {
#ifdef WIN32
		closesocket(sock);
#else
		close(sock);
#endif
		sock = -1;
	}
}

/**
 * Creates and binds the non-blocking UDP socket on `Port`, then starts the
 * reader, writer and combine-packet threads.
 *
 * @return true when the socket was bound and the threads were started;
 *         false when socket() or bind() failed (sock is left at -1 if bind
 *         failed).
 */
bool EQStreamFactory::Open()
{
	struct sockaddr_in address;
#ifndef WIN32
	pthread_t t1, t2, t3;
#endif

	/* Setup internet address information.
	   This is used with the bind() call */
	memset((char *) &address, 0, sizeof(address));
	address.sin_family = AF_INET;
	address.sin_port = htons(Port);
#if defined(LOGIN) || defined(MINILOGIN)
	if (listen_ip_address)
		address.sin_addr.s_addr = inet_addr(listen_ip_address);
	else
		address.sin_addr.s_addr = htonl(INADDR_ANY);
#else
	address.sin_addr.s_addr = htonl(INADDR_ANY);
#endif

	/* Setting up UDP port for new clients */
	sock = socket(AF_INET, SOCK_DGRAM, 0);
	if (sock < 0) {
		return false;
	}

	if (bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
		// Release the descriptor instead of leaking it; same platform split
		// as Close().
#ifdef WIN32
		closesocket(sock);
#else
		close(sock);
#endif
		sock = -1;
		return false;
	}

#ifdef WIN32
	unsigned long nonblock = 1;
	ioctlsocket(sock, FIONBIO, &nonblock);
#else
	fcntl(sock, F_SETFL, O_NONBLOCK);
#endif

	//moved these because on windows the output was delayed and causing the console window to look bad
#ifdef LOGIN
	LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Reader");
	LogWrite(LOGIN__DEBUG, 0, "Login", "Starting factory Writer");
#elif WORLD
	LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Reader");
	LogWrite(WORLD__DEBUG, 0, "World", "Starting factory Writer");
#endif

#ifdef WIN32
	_beginthread(EQStreamFactoryReaderLoop, 0, this);
	_beginthread(EQStreamFactoryWriterLoop, 0, this);
	_beginthread(EQStreamFactoryCombinePacketLoop, 0, this);
#else
	pthread_create(&t1, NULL, EQStreamFactoryReaderLoop, this);
	pthread_create(&t2, NULL, EQStreamFactoryWriterLoop, this);
	pthread_create(&t3, NULL, EQStreamFactoryCombinePacketLoop, this);
	pthread_detach(t1);
	pthread_detach(t2);
	pthread_detach(t3);
#endif
	return true;
}

/**
 * Takes the oldest newly-created stream off the NewStreams queue.
 *
 * @return the stream, already marked in use via PutInUse(), or NULL when the
 *         queue is empty.
 */
EQStream *EQStreamFactory::Pop()
{
	// Cheap unlocked pre-check; the size is re-checked under the lock below.
	if (!NewStreams.size())
		return NULL;

	EQStream *s = NULL;
	MNewStreams.lock();
	if (NewStreams.size()) {
		s = NewStreams.front();
		NewStreams.pop();
		s->PutInUse();
	}
	MNewStreams.unlock();

	return s;
}

/**
 * Appends a stream to the NewStreams queue under MNewStreams.
 */
void EQStreamFactory::Push(EQStream *s)
{
	MNewStreams.lock();
	NewStreams.push(s);
	MNewStreams.unlock();
}

/**
 * Receive loop. Waits on the socket with select(), reads one datagram at a
 * time and hands it to the stream keyed by the sender's "ip.port" string.
 * A datagram whose second byte is OP_SessionRequest creates a new EQStream
 * (closing any existing stream under the same key first).
 *
 * Runs until `sock` becomes -1 or ReaderRunning is cleared.
 */
void EQStreamFactory::ReaderLoop()
{
	fd_set readset;
	// NOTE(review): template arguments reconstructed; Streams is looked up
	// and indexed with a C string below, so the key is taken to be
	// std::string -- confirm against EQStreamFactory.h.
	map<string, EQStream *>::iterator stream_itr;
	int num;
	int length;
	unsigned char buffer[2048];
	sockaddr_in from;
	int socklen;
	timeval sleep_time;

	ReaderRunning = true;
	while (sock != -1) {
		// Read the flag under its mutex, but release the mutex before leaving
		// the loop so it is never left locked by a finished thread.
		MReaderRunning.lock();
		const bool keep_running = ReaderRunning;
		MReaderRunning.unlock();
		if (!keep_running)
			break;

		FD_ZERO(&readset);
		FD_SET(sock, &readset);

		sleep_time.tv_sec = 30;
		sleep_time.tv_usec = 0;
		num = select(sock + 1, &readset, NULL, NULL, &sleep_time);
		// On timeout there is nothing to read; on error the contents of
		// readset cannot be relied on. Either way, go round again.
		if (num <= 0)
			continue;

		if (!FD_ISSET(sock, &readset))
			continue;

		// recvfrom() treats the length as an in/out argument, so reset it for
		// every datagram.
		socklen = sizeof(sockaddr_in);
#ifdef WIN32
		length = recvfrom(sock, (char *)buffer, sizeof(buffer), 0, (struct sockaddr *)&from, (int *)&socklen);
#else
		length = recvfrom(sock, buffer, sizeof(buffer), 0, (struct sockaddr *)&from, (socklen_t *)&socklen);
#endif
		if (length < 0) {
			// What do we wanna do?
			continue;
		}

		// Stream key: "<host-order ip>.<port>". Worst case is 10 + 1 + 5
		// characters plus the terminator, which fits in temp.
		char temp[25];
		sprintf(temp, "%u.%d", ntohl(from.sin_addr.s_addr), ntohs(from.sin_port));

		// Only look at the opcode byte when the datagram actually contains
		// it; otherwise buffer[1] still holds data from an earlier packet.
		const bool session_request = (length >= 2 && buffer[1] == OP_SessionRequest);

		MStreams.lock();
		stream_itr = Streams.find(temp);
		if (stream_itr == Streams.end() || session_request) {
			if (session_request) {
				if (stream_itr != Streams.end() && stream_itr->second)
					stream_itr->second->SetState(CLOSED);
				// NOTE(review): the assignment below overwrites the map entry
				// for an existing key; the previous EQStream is then no longer
				// reachable through Streams -- confirm who deletes it.
				EQStream *s = new EQStream(from);
				s->SetFactory(this);
				s->SetStreamType(StreamType);
				// Insert while MStreams is still held: the other loops
				// iterate Streams under this mutex.
				Streams[temp] = s;
				MStreams.unlock();

				WriterWork.Signal();
				Push(s);
				s->Process(buffer, length);
				s->SetLastPacketTime(Timer::GetCurrentTime2());
			} else {
				// Unknown sender and not a session request: drop it.
				MStreams.unlock();
			}
		} else {
			EQStream *curstream = stream_itr->second;
			//dont bother processing incoming packets for closed connections
			if (curstream->CheckClosed())
				curstream = NULL;
			else
				curstream->PutInUse();
			MStreams.unlock();

			if (curstream) {
				curstream->Process(buffer, length);
				curstream->SetLastPacketTime(Timer::GetCurrentTime2());
				curstream->ReleaseFromUse();
			}
		}
	}
}

/**
 * Walks every stream under MStreams and advances its shutdown state:
 *  - CLOSING with no outgoing data becomes CLOSED;
 *  - a stream that has timed out is Close()d when ESTABLISHED, or forced to
 *    CLOSED when already CLOSING;
 *  - a CLOSED stream is deleted after it has been given two timeout delays.
 *
 * @param remove_all when true, every stream is deleted immediately
 *                   regardless of state.
 */
void EQStreamFactory::CheckTimeout(bool remove_all)
{
	//lock streams the entire time were checking timeouts, it should be fast.
	MStreams.lock();

	unsigned long now = Timer::GetCurrentTime2();
	// NOTE(review): template arguments reconstructed -- see ReaderLoop().
	map<string, EQStream *>::iterator stream_itr;

	for (stream_itr = Streams.begin(); stream_itr != Streams.end();) {
		EQStream *s = stream_itr->second;
		EQStreamState state = s->GetState();

		if (state == CLOSING && !s->HasOutgoingData()) {
			stream_itr->second->SetState(CLOSED);
			state = CLOSED;
		} else if (s->CheckTimeout(now, STREAM_TIMEOUT)) {
			const char *stateString;
			switch (state) {
			case ESTABLISHED:
				stateString = "Established";
				break;
			case CLOSING:
				stateString = "Closing";
				break;
			case CLOSED:
				stateString = "Closed";
				break;
			default:
				stateString = "Unknown";
				break;
			}
			LogWrite(WORLD__DEBUG, 0, "World", "Timeout up!, state=%s", stateString);
			if (state == ESTABLISHED) {
				s->Close();
			} else if (state == CLOSING) {
				//if we time out in the closing state, just give up
				s->SetState(CLOSED);
				state = CLOSED;
			}
		}

		//not part of the else so we check it right away on state change
		if (remove_all || state == CLOSED) {
			if (!remove_all && s->getTimeoutDelays() < 2) {
				s->addTimeoutDelay();
				//give it a little time for everybody to finish with it
			} else {
				//everybody is done, we can delete it now
#ifdef LOGIN
				LogWrite(LOGIN__DEBUG, 0, "Login", "Removing connection...");
#else
				LogWrite(WORLD__DEBUG, 0, "World", "Removing connection...");
#endif
				// Step past the entry before erasing it so the loop iterator
				// stays valid.
				map<string, EQStream *>::iterator temp = stream_itr;
				stream_itr++;
#ifdef WORLD
				client_list.RemoveConnection(temp->second);
#endif
				delete temp->second;
				Streams.erase(temp);
				continue;
			}
		}

		stream_itr++;
	}
	MStreams.unlock();
}

/**
 * Periodically collects the streams whose combine_timer has fired and calls
 * CheckCombineQueue() on the active ones, all under MStreams. Sleeps an
 * extra 25 ms per pass when no stream reported packets still waiting.
 *
 * Runs until `sock` becomes -1 or CombinePacketRunning is cleared.
 */
void EQStreamFactory::CombinePacketLoop()
{
	// NOTE(review): template arguments reconstructed from usage.
	deque<EQStream *> combine_que;
	CombinePacketRunning = true;
	bool packets_waiting = false;

	while (sock != -1) {
		if (!CombinePacketRunning)
			break;

		MStreams.lock();
		map<string, EQStream *>::iterator stream_itr;
		for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) {
			if (!stream_itr->second) {
				continue;
			}
			if (stream_itr->second->combine_timer && stream_itr->second->combine_timer->Check())
				combine_que.push_back(stream_itr->second);
		}

		EQStream *stream = 0;
		packets_waiting = false;
		while (combine_que.size()) {
			stream = combine_que.front();
			if (stream->CheckActive()) {
				if (!stream->CheckCombineQueue())
					packets_waiting = true;
			}
			combine_que.pop_front();
		}
		MStreams.unlock();

		if (!packets_waiting)
			Sleep(25);

		Sleep(1);
	}
}

/**
 * Send loop. Each pass snapshots, under MStreams, the streams that have
 * outgoing data (and those whose resend timer fired), then writes to them
 * with the lock released. When the stream map is empty the loop blocks on
 * WriterWork until ReaderLoop() signals a new stream.
 *
 * Runs until `sock` becomes -1 or WriterRunning is cleared.
 */
void EQStreamFactory::WriterLoop()
{
	// NOTE(review): template arguments reconstructed from usage.
	map<string, EQStream *>::iterator stream_itr;
	vector<EQStream *> wants_write;
	vector<EQStream *>::iterator cur, end;
	deque<EQStream *> resend_que;
	bool decay = false;
	uint32 stream_count;

	Timer DecayTimer(20);

	WriterRunning = true;
	DecayTimer.Enable();
	while (sock != -1) {
		Timer::SetCurrentTime();

		// Read the flag under its mutex, but release the mutex before leaving
		// the loop so it is never left locked by a finished thread.
		MWriterRunning.lock();
		const bool keep_running = WriterRunning;
		MWriterRunning.unlock();
		if (!keep_running)
			break;

		wants_write.clear();

		decay = DecayTimer.Check();

		//copy streams into a seperate list so we dont have to keep
		//MStreams locked while we are writting
		MStreams.lock();
		for (stream_itr = Streams.begin(); stream_itr != Streams.end(); stream_itr++) {
			if (!stream_itr->second) {
				// erase() invalidates the iterator, so stop this pass; the
				// remaining streams are picked up on the next one.
				Streams.erase(stream_itr);
				break;
			}
			// If it's time to decay the bytes sent, then let's do it before we try to write
			if (decay)
				stream_itr->second->Decay();

			if (stream_itr->second->HasOutgoingData()) {
				stream_itr->second->PutInUse();
				wants_write.push_back(stream_itr->second);
			}
			if (stream_itr->second->resend_que_timer->Check())
				resend_que.push_back(stream_itr->second);
		}
		MStreams.unlock();

		//do the actual writes
		cur = wants_write.begin();
		end = wants_write.end();
		for (; cur != end; cur++) {
			(*cur)->Write(sock);
			(*cur)->ReleaseFromUse();
		}
		while (resend_que.size()) {
			resend_que.front()->CheckResend(sock);
			resend_que.pop_front();
		}
		Sleep(10);

		MStreams.lock();
		stream_count = Streams.size();
		MStreams.unlock();
		if (!stream_count) {
			// No streams: block until ReaderLoop() signals a new one.
			WriterWork.Wait();
		}
	}
}