Browse Source

EQStream updates (sequencing/packet writing)

Fixes #17
Image 4 years ago
parent
commit
7cea150b49

+ 14 - 2
EQ2/source/common/EQPacket.cpp

@@ -119,14 +119,26 @@ uint32 EQProtocolPacket::serialize(unsigned char *dest, int8 offset) const
 
 uint32 EQApplicationPacket::serialize(unsigned char *dest) const
 {
+	uint8 OpCodeBytes = app_opcode_size;
+
 	if (app_opcode_size==1)
 		*(unsigned char *)dest=opcode;
 	else
-		*(uint16 *)dest=opcode;
+	{
+		// Application opcodes with a low order byte of 0x00 require an extra 0x00 byte inserting prior to the opcode.
+		if ((opcode & 0x00ff) == 0)
+		{
+			*(uint8*)dest = 0;
+			*(uint16*)(dest + 1) = opcode;
+			++OpCodeBytes;
+		}
+		else
+			*(uint16*)dest = opcode;
+	}
 
 	memcpy(dest+app_opcode_size,pBuffer,size);
 
-	return size+app_opcode_size;
+	return size+ OpCodeBytes;
 }
 
 EQPacket::~EQPacket()

+ 2 - 0
EQ2/source/common/EQPacket.h

@@ -95,6 +95,7 @@ public:
 		sequence = 0;
 		sent_time = 0;
 		attempt_count = 0;
+		acked = false;
 	} 
 	EQProtocolPacket(const unsigned char *buf, uint32 len, int in_opcode = -1);
 	bool combine(const EQProtocolPacket *rhs);
@@ -117,6 +118,7 @@ public:
 	bool eq2_compressed;
 	bool packet_prepared;
 	bool packet_encrypted;
+	bool acked;
 	int32 sent_time;
 	int8  attempt_count;
 	int32 sequence;

+ 257 - 86
EQ2/source/common/EQStream.cpp

@@ -95,7 +95,17 @@ void EQStream::init(bool resetSession) {
 	MRate.unlock();
 
 	BytesWritten=0;
+	SequencedBase = 0;
+	AverageDelta = 500;
+
 	crypto->setRC4Key(0);
+
+	retransmittimer = Timer::GetCurrentTime2();
+	retransmittimeout = 500 * RETRANSMIT_TIMEOUT_MULT;
+
+	if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
+		LogWrite(PACKET__DEBUG, 9, "Packet",  "init Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq);
+	}
 }
 
 EQStream::EQStream(sockaddr_in addr){ 
@@ -335,7 +345,7 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
 
 				uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
 				sint8 check=CompareSequence(NextInSeq,seq);
-				if (check>0) {
+				if (check == SeqFuture) {
 #ifdef EQN_DEBUG
 					LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
 					LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
@@ -346,7 +356,7 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
 
 					// Image (2020): Removed as this is bad contributes to infinite loop
 					//SendOutOfOrderAck(seq);
-				} else if (check<0) {
+				} else if (check == SeqPast) {
 #ifdef EQN_DEBUG
 					LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
 					LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
@@ -389,7 +399,7 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
 
 				uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
 				sint8 check=CompareSequence(NextInSeq,seq);
-				if (check>0) {
+				if (check == SeqFuture) {
 #ifdef EQN_DEBUG
 					LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
 					LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
@@ -398,7 +408,7 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
 #endif
 					OutOfOrderpackets[seq] = p->Copy();
 					//SendOutOfOrderAck(seq);
-				} else if (check<0) {
+				} else if (check == SeqPast) {
 #ifdef EQN_DEBUG
 					LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq);
 					LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]");
@@ -460,10 +470,12 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
 			case OP_Ack: {
 				if (!p->pBuffer || (p->Size() < 4))
 				{
+					LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_Ack that was of malformed size");
 					break;
 				}
-				uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
-				SetMaxAckReceived(seq);
+				uint16 seq = ntohs(*(uint16*)(p->pBuffer));
+				AckPackets(seq);
+				retransmittimer = Timer::GetCurrentTime2();
 			}
 			break;
 			case OP_SessionRequest: {
@@ -538,14 +550,47 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
 			case OP_OutOfOrderAck: {
 				if (!p->pBuffer || (p->Size() < 4))
 				{
+					LogWrite(PACKET__DEBUG, 9, "Packet",  "Received OP_OutOfOrderAck that was of malformed size");
 					break;
 				}
-#ifndef COLLECTOR
-				uint16 seq=ntohs(*(uint16 *)(p->pBuffer));
-				if (CompareSequence(GetMaxAckReceived(),seq)>0 && CompareSequence(NextOutSeq,seq) < 0) {
-					SetLastSeqSent(GetMaxAckReceived());
+				uint16 seq = ntohs(*(uint16*)(p->pBuffer));
+				MOutboundQueue.lock();
+
+				if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
+					LogWrite(PACKET__DEBUG, 9, "Packet",  "Pre-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq);
 				}
-#endif
+
+				//if the packet they got out of order is between our last acked packet and the last sent packet, then its valid.
+				if (CompareSequence(SequencedBase, seq) != SeqPast && CompareSequence(NextOutSeq, seq) == SeqPast) {
+					uint16 sqsize = SequencedQueue.size();
+					uint16 index = seq - SequencedBase;
+					LogWrite(PACKET__DEBUG, 9, "Packet",  "OP_OutOfOrderAck marking packet acked in queue (queue index = %u, queue size = %u)", index, sqsize);
+					if (index < sqsize) {
+						SequencedQueue[index]->acked = true;
+						// flag packets for a resend
+						uint16 count = 0;
+						uint32 timeout = AverageDelta * 2 + 100;
+						for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) {
+							if (!(*sitr)->acked && (*sitr)->sent_time > 0 && (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime2())) {
+								(*sitr)->sent_time = 0;
+								LogWrite(PACKET__DEBUG, 9, "Packet",  "OP_OutOfOrderAck Flagging packet %u for retransmission", SequencedBase + count);
+							}
+						}
+					}
+
+					if (RETRANSMIT_TIMEOUT_MULT) {
+						retransmittimer = Timer::GetCurrentTime2();
+					}
+				}
+				else {
+					LogWrite(PACKET__DEBUG, 9, "Packet",  "Received OP_OutOfOrderAck for out-of-window %u. Window (%u->%u)", seq, SequencedBase, NextOutSeq);
+				}
+
+				if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
+					LogWrite(PACKET__DEBUG, 9, "Packet",  "Post-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq);
+				}
+
+				MOutboundQueue.unlock();
 			}
 			break;
 			case OP_ServerKeyRequest:{
@@ -849,7 +894,7 @@ void EQStream::NonSequencedPush(EQProtocolPacket *p)
 {
 	p->setVersion(client_version);
 	MOutboundQueue.lock();
-	NonSequencedQueue.push_back(p);
+	NonSequencedQueue.push(p);
 	MOutboundQueue.unlock();
 }
 
@@ -951,12 +996,71 @@ void EQStream::CheckResend(int eq_fd){
 	MResendQue.unlock();
 }
 
+
+
+//returns SeqFuture if `seq` is later than `expected_seq`
+EQStream::SeqOrder EQStream::CompareSequence(uint16 expected_seq, uint16 seq)
+{
+	if (expected_seq == seq) {
+		// Curent
+		return SeqInOrder;
+	}
+	else if ((seq > expected_seq && (uint32)seq < ((uint32)expected_seq + EQStream::MaxWindowSize)) || seq < (expected_seq - EQStream::MaxWindowSize)) {
+		// Future
+		return SeqFuture;
+	}
+	else {
+		// Past
+		return SeqPast;
+	}
+}
+
+void EQStream::AckPackets(uint16 seq)
+{
+	std::deque<EQProtocolPacket*>::iterator itr, tmp;
+
+	MOutboundQueue.lock();
+
+	SeqOrder ord = CompareSequence(SequencedBase, seq);
+	if (ord == SeqInOrder) {
+		//they are not acking anything new...
+		LogWrite(PACKET__DEBUG, 9, "Packet",  "Received an ack with no window advancement (seq %u)", seq);
+	}
+	else if (ord == SeqPast) {
+		//they are nacking blocks going back before our buffer, wtf?
+		LogWrite(PACKET__DEBUG, 9, "Packet",  "Received an ack with backward window advancement (they gave %u, our window starts at %u). This is bad" , seq, SequencedBase);
+	}
+	else {
+		LogWrite(PACKET__DEBUG, 9, "Packet",  "Received an ack up through sequence %u. Our base is %u", seq, SequencedBase);
+
+
+		//this is a good ack, we get to ack some blocks.
+		seq++;	//we stop at the block right after their ack, counting on the wrap of both numbers.
+		while (SequencedBase != seq) {
+			if (SequencedQueue.empty()) {
+				LogWrite(PACKET__DEBUG, 9, "Packet",  "OUT OF PACKETS acked packet with sequence %u. Next send is %u before this", (unsigned long)SequencedBase, SequencedQueue.size());
+				SequencedBase = NextOutSeq;
+				break;
+			}
+			LogWrite(PACKET__DEBUG, 9, "Packet",  "Removing acked packet with sequence %u", (unsigned long)SequencedBase);
+			//clean out the acked packet
+			delete SequencedQueue.front();
+			SequencedQueue.pop_front();
+			//advance the base sequence number to the seq of the block after the one we just got rid of.
+			SequencedBase++;
+		}
+		if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
+			LogWrite(PACKET__DEBUG, 9, "Packet",  "Post-Ack on %u Invalid Sequenced queue: BS %u + SQ %u != NOS %u", seq, SequencedBase, SequencedQueue.size(), NextOutSeq);
+		}
+	}
+
+	MOutboundQueue.unlock();
+}
+
 void EQStream::Write(int eq_fd)
 {
-	deque<EQProtocolPacket *> ReadyToSend;
-	deque<EQProtocolPacket *> SeqReadyToSend;
+	queue<EQProtocolPacket *> ReadyToSend;
 	long maxack;
-	map<uint16, EQProtocolPacket *>::iterator sitr;
 
 	// Check our rate to make sure we can send more
 	MRate.lock();
@@ -988,83 +1092,148 @@ void EQStream::Write(int eq_fd)
 	MOutboundQueue.lock();
 
 	// Adjust where we start sending in case we get a late ack
-	if (maxack>LastSeqSent)
-		LastSeqSent=maxack;
+	//if (maxack>LastSeqSent)
+	//	LastSeqSent=maxack;
 
 	// Place to hold the base packet t combine into
 	EQProtocolPacket *p=NULL;
+	std::deque<EQProtocolPacket*>::iterator sitr;
+
+	// Find the next sequenced packet to send from the "queue"
+	sitr = SequencedQueue.begin();
 
+	uint16 count = 0;
+	// get to start of packets
+	while (sitr != SequencedQueue.end() && (*sitr)->sent_time > 0) {
+		++sitr;
+		++count;
+	}
+
+	bool SeqEmpty = false, NonSeqEmpty = false;
 	// Loop until both are empty or MaxSends is reached
-	
-	// See if there are more non-sequenced packets left
-	while (NonSequencedQueue.size()) {
-		if (!p) {
-			// If we don't have a packet to try to combine into, use this one as the base
-			// And remove it form the queue
-			p = NonSequencedQueue.front();
-			NonSequencedQueue.pop_front();
-		}
-		//Check if we have a packet to combine p with...
-		if (NonSequencedQueue.size()){
-			if (!p->combine(NonSequencedQueue.front())) {
-				// Tryint to combine this packet with the base didn't work (too big maybe)
+	while (!SeqEmpty || !NonSeqEmpty) {
+
+		// See if there are more non-sequenced packets left
+		if (!NonSequencedQueue.empty()) {
+			if (!p) {
+				// If we don't have a packet to try to combine into, use this one as the base
+				// And remove it form the queue
+				p = NonSequencedQueue.front();
+				LogWrite(PACKET__DEBUG, 9, "Packet", "Starting combined packet with non-seq packet of len %u",p->size);
+				NonSequencedQueue.pop();
+			}
+			else if (!p->combine(NonSequencedQueue.front())) {
+				// Trying to combine this packet with the base didn't work (too big maybe)
 				// So just send the base packet (we'll try this packet again later)
-				ReadyToSend.push_back(p);
+				LogWrite(PACKET__DEBUG, 9, "Packet", "Combined packet full at len %u, next non-seq packet is len %u", p->size, (NonSequencedQueue.front())->size);
+				ReadyToSend.push(p);
 				BytesWritten += p->size;
-				p = NULL;
+				p = nullptr;
+
+				if (BytesWritten > threshold) {
+					// Sent enough this round, lets stop to be fair
+					LogWrite(PACKET__DEBUG, 9, "Packet", "Exceeded write threshold in nonseq (%u > %u)", BytesWritten, threshold);
+					break;
+				}
 			}
 			else {
 				// Combine worked, so just remove this packet and it's spot in the queue
+				LogWrite(PACKET__DEBUG, 9, "Packet", "Combined non-seq packet of len %u, yeilding %u combined", (NonSequencedQueue.front())->size, p->size);
 				delete NonSequencedQueue.front();
-				NonSequencedQueue.pop_front();
+				NonSequencedQueue.pop();
 			}
 		}
 		else {
-			//We have no packets to combine p with so just send it...
-			ReadyToSend.push_back(p);
-			BytesWritten += p->size;
-			p = NULL;
-		}
-		if (BytesWritten > threshold) {
-			// Sent enough this round, lets stop to be fair
-			break;
+			// No more non-sequenced packets
+			NonSeqEmpty = true;
 		}
-	}
 
-	//The non-seq loop must have broke before we sent this packet, send it now
-	if (p){
-		ReadyToSend.push_back(p);
-		BytesWritten += p->size;
-	}
+		if (sitr != SequencedQueue.end()) {
+			uint16 seq_send = SequencedBase + count;	//just for logging...
 
-	if (SequencedQueue.size() && BytesWritten < threshold) {
-		while(SequencedQueue.size()) {
-			p = SequencedQueue.front();
-			BytesWritten+=p->size;
-			SeqReadyToSend.push_back(p);
-			p->sent_time = Timer::GetCurrentTime2();
-			resend_que.push_back(p);
-			SequencedQueue.pop_front();
-			LastSeqSent=p->sequence;
-			if (BytesWritten > threshold) {
-				break;
+			if (SequencedQueue.empty()) {
+				LogWrite(PACKET__DEBUG, 9, "Packet", "Tried to write a packet with an empty queue (%u is past next out %u)", seq_send, NextOutSeq);
+				SeqEmpty = true;
+				continue;
+			}
+
+				if ((*sitr)->acked || (*sitr)->sent_time != 0) {
+					++sitr;
+					++count;
+					if (p) {
+						LogWrite(PACKET__DEBUG, 9, "Packet", "Final combined packet not full, len %u", p->size);
+						ReadyToSend.push(p);
+						BytesWritten += p->size;
+						p = nullptr;
+					}
+					LogWrite(PACKET__DEBUG, 9, "Packet", "Not retransmitting seq packet %u because already marked as acked", seq_send);
+				}
+				else if (!p) {
+					// If we don't have a packet to try to combine into, use this one as the base
+					// Copy it first as it will still live until it is acked
+					p = (*sitr)->Copy();
+					LogWrite(PACKET__DEBUG, 9, "Packet", "Starting combined packet with seq packet %u of len %u", seq_send, p->size);
+					(*sitr)->sent_time = Timer::GetCurrentTime2();
+					++sitr;
+					++count;
+				}
+				else if (!p->combine(*sitr)) {
+					// Trying to combine this packet with the base didn't work (too big maybe)
+					// So just send the base packet (we'll try this packet again later)
+					LogWrite(PACKET__DEBUG, 9, "Packet", "Combined packet full at len %u, next seq packet %u is len %u", p->size, seq_send + 1, (*sitr)->size);
+					ReadyToSend.push(p);
+					BytesWritten += p->size;
+					p = nullptr;
+					if ((*sitr)->opcode != OP_Fragment && BytesWritten > threshold) {
+						// Sent enough this round, lets stop to be fair
+						LogWrite(PACKET__DEBUG, 9, "Packet", "Exceeded write threshold in seq (%u > %u)", BytesWritten, threshold);
+						break;
+					}
+				}
+				else {
+					// Combine worked
+					LogWrite(PACKET__DEBUG, 9, "Packet", "Combined seq packet %u of len %u, yeilding %u combined", seq_send, (*sitr)->size, p->size);
+					(*sitr)->sent_time = Timer::GetCurrentTime2();
+					++sitr;
+					++count;
+				}
+
+			if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
+				LogWrite(PACKET__DEBUG, 9, "Packet", "Post send Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq);
 			}
 		}
+		else {
+			// No more sequenced packets
+			SeqEmpty = true;
+		}
+	}
+	MOutboundQueue.unlock();	// Unlock the queue
+
+	// We have a packet still, must have run out of both seq and non-seq, so send it
+	if (p) {
+		LogWrite(PACKET__DEBUG, 9, "Packet",  "Final combined packet not full, len %u", p->size);
+		ReadyToSend.push(p);
+		BytesWritten += p->size;
 	}
-			
-	// Unlock the queue
-	MOutboundQueue.unlock();
 
 	// Send all the packets we "made"
-	while(ReadyToSend.size()) {
-		WritePacket(eq_fd,ReadyToSend.front());
-		delete ReadyToSend.front();
-		ReadyToSend.pop_front();
+	while (!ReadyToSend.empty()) {
+		p = ReadyToSend.front();
+		WritePacket(eq_fd, p);
+		delete p;
+		ReadyToSend.pop();
 	}
 
-	while(SeqReadyToSend.size()){
-		WritePacket(eq_fd,SeqReadyToSend.front());
-		SeqReadyToSend.pop_front();
+	//see if we need to send our disconnect and finish our close
+	if (SeqEmpty && NonSeqEmpty) {
+		//no more data to send
+		if (GetState() == CLOSING) {
+			LogWrite(PACKET__DEBUG, 9, "Packet",  "All outgoing data flushed, closing stream");
+			//we are waiting for the queues to empty, now we can do our disconnect.
+			//this packet will not actually go out until the next call to Write().
+			SendDisconnect();
+			SetState(CLOSED);
+		}
 	}
 }
 
@@ -1258,7 +1427,7 @@ void EQStream::OutboundQueueClear()
 	MOutboundQueue.lock();
 	while(NonSequencedQueue.size()) {
 		delete NonSequencedQueue.front();
-		NonSequencedQueue.pop_front();
+		NonSequencedQueue.pop();
 	}
 	while(SequencedQueue.size()) {
 		delete SequencedQueue.front();
@@ -1425,20 +1594,6 @@ EQProtocolPacket* EQStream::RemoveQueue(uint16 seq)
 	return qp;
 }
 
-sint8 EQStream::CompareSequence(uint16 expected_seq , uint16 seq)
-{
-	if (expected_seq==seq) {
-		// Curent
-		return 0;
-	}  else if ((seq > expected_seq && (uint32)seq < ((uint32)expected_seq + EQStream::MaxWindowSize)) || seq < (expected_seq - EQStream::MaxWindowSize)) {
-		// Future
-		return 1;
-	} else {
-		// Past
-		return -1;
-	}
-}
-
 void EQStream::Decay()
 {
 	MRate.lock();
@@ -1449,14 +1604,30 @@ void EQStream::Decay()
 		if (BytesWritten<0)
 			BytesWritten=0;
 	}
+
+	int count = 0;
+	MOutboundQueue.lock();
+	for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); ++sitr, count++) {
+		if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime2()) {
+			(*sitr)->sent_time = 0;
+			LogWrite(PACKET__DEBUG, 9, "Packet", "Timeout exceeded for seq %u.  Flagging packet for retransmission", SequencedBase + count);
+		}
+	}
+	MOutboundQueue.unlock();
 }
 
 void EQStream::AdjustRates(uint32 average_delta)
 {
-	if (average_delta) {
+	if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) {
 		MRate.lock();
-		RateThreshold=RATEBASE/average_delta;
-		DecayRate=DECAYBASE/average_delta;
+		AverageDelta = average_delta;
+		RateThreshold = RATEBASE / average_delta;
+		DecayRate = DECAYBASE / average_delta;
+		if (BytesWritten > RateThreshold)
+			BytesWritten = RateThreshold + DecayRate;
 		MRate.unlock();
 	}
+	else {
+		AverageDelta = AVERAGE_DELTA_MAX;
+	}
 }

+ 35 - 5
EQ2/source/common/EQStream.h

@@ -23,6 +23,7 @@
 #include <string>
 #include <vector>
 #include <deque>
+#include <queue>
 
 #include <map>
 #include <set>
@@ -52,6 +53,18 @@ typedef enum {
 #define RATEBASE	1048576 // 1 MB
 #define DECAYBASE	78642	// RATEBASE/10
 
+#ifndef RETRANSMIT_TIMEOUT_MULT
+#define RETRANSMIT_TIMEOUT_MULT 3.0
+#endif
+
+#ifndef RETRANSMIT_TIMEOUT_MAX
+#define RETRANSMIT_TIMEOUT_MAX 5000
+#endif
+
+#ifndef AVERAGE_DELTA_MAX
+#define AVERAGE_DELTA_MAX 2500
+#endif
+
 #pragma pack(1)
 struct SessionRequest {
 	uint32 UnknownA;
@@ -115,6 +128,12 @@ typedef enum {
 
 class EQStream {
 	protected:
+		typedef enum {
+			SeqPast,
+			SeqInOrder,
+			SeqFuture
+		} SeqOrder;
+
 		uint32 received_packets;
 		uint32 sent_packets;
 		uint32 remote_ip;
@@ -125,6 +144,9 @@ class EQStream {
 		uint8 app_opcode_size;
 		EQStreamType StreamType;
 		bool compressed,encoded;
+
+		uint32 retransmittimer;
+		uint32 retransmittimeout;
 		//uint32 buffer_len;
 
 		uint16 sessionAttempts;
@@ -133,6 +155,7 @@ class EQStream {
 		uint32 Session, Key;
 		uint16 NextInSeq;
 		uint16 NextOutSeq;
+		uint16 SequencedBase;	//the sequence number of SequencedQueue[0]
 		uint32  MaxLen;
 		uint16 MaxSends;
 		int8 timeout_delays;
@@ -165,8 +188,8 @@ class EQStream {
 		Mutex MAcks;
 
 		// Packets waiting to be sent
-		deque<EQProtocolPacket *> NonSequencedQueue;
-		deque<EQProtocolPacket *> SequencedQueue;
+		queue<EQProtocolPacket*> NonSequencedQueue;
+		deque<EQProtocolPacket*> SequencedQueue;
 		map<uint16, EQProtocolPacket *> OutOfOrderpackets;
 		Mutex MOutboundQueue;
 
@@ -181,6 +204,7 @@ class EQStream {
 		Mutex MRate;
 		sint32 RateThreshold;
 		sint32 DecayRate;
+		uint32 AverageDelta;
 
 		EQStreamFactory *Factory;
 
@@ -205,6 +229,9 @@ class EQStream {
 		encoded = false; app_opcode_size = 2;}
 		EQStream(sockaddr_in addr);
 		virtual ~EQStream() { 
+			MOutboundQueue.lock();
+			SetState(CLOSED);
+			MOutboundQueue.unlock();
 			RemoveData(); 
 			safe_delete(crypto);
 			safe_delete(combine_timer);
@@ -243,6 +270,8 @@ class EQStream {
 		Mutex MCompressData;
 		deque<EQProtocolPacket*>resend_que;
 		void CheckResend(int eq_fd);
+
+		void AckPackets(uint16 seq);
 		void Write(int eq_fd);
 
 		void SetActive(bool val) { streamactive = val; }
@@ -287,9 +316,11 @@ class EQStream {
 		inline bool IsInUse() { bool flag; MInUse.lock(); flag=(active_users>0); MInUse.unlock(); return flag; }
 		inline void PutInUse() { MInUse.lock(); active_users++; MInUse.unlock(); }
 		inline void ReleaseFromUse() { MInUse.lock(); if(active_users > 0) active_users--; MInUse.unlock(); }
-		
+
+		static SeqOrder CompareSequence(uint16 expected_seq, uint16 seq);
+
 		inline EQStreamState GetState() { return State; }
-		inline void SetState(EQStreamState state) { State=state;  }
+		inline void SetState(EQStreamState state) { MState.lock();  State = state; MState.unlock(); }
 
 		inline uint32 GetRemoteIP() { return remote_ip; }
 		inline uint32 GetrIP() { return remote_ip; }
@@ -298,7 +329,6 @@ class EQStream {
 		
 
 		static EQProtocolPacket *Read(int eq_fd, sockaddr_in *from);
-		static sint8 CompareSequence(uint16 expected_seq , uint16 seq);
 
 		void Close() { SendDisconnect(); }
 		bool CheckActive() { return GetState()==ESTABLISHED; }