Implement window scaling for PseudoTCP.
git-svn-id: http://libjingle.googlecode.com/svn/trunk@77 dd674b97-3498-5ee5-1854-bdd07cd0ff33
diff --git a/talk/base/stream.cc b/talk/base/stream.cc
index 25dce7b..9fcb1a8 100644
--- a/talk/base/stream.cc
+++ b/talk/base/stream.cc
@@ -772,6 +772,18 @@
return true;
}
+StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
+ size_t offset, size_t* bytes_read) {
+ CritScope cs(&crit_);
+ return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
+}
+
+StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
+ size_t offset, size_t* bytes_written) {
+ CritScope cs(&crit_);
+ return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
+}
+
StreamState FifoBuffer::GetState() const {
return state_;
}
@@ -779,60 +791,48 @@
StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
size_t* bytes_read, int* error) {
CritScope cs(&crit_);
- const size_t available = data_length_;
- if (0 == available) {
- return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
- }
-
const bool was_writable = data_length_ < buffer_length_;
- const size_t copy = _min(bytes, available);
- const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
- char* const p = static_cast<char*>(buffer);
- memcpy(p, &buffer_[read_position_], tail_copy);
- memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
- read_position_ = (read_position_ + copy) % buffer_length_;
- data_length_ -= copy;
- if (bytes_read) {
- *bytes_read = copy;
- }
- // if we were full before, and now we're not, post an event
- if (!was_writable && copy > 0) {
- PostEvent(owner_, SE_WRITE, 0);
- }
+ size_t copy = 0;
+ StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
- return SR_SUCCESS;
+ if (result == SR_SUCCESS) {
+ // If read was successful then adjust the read position and number of
+ // bytes buffered.
+ read_position_ = (read_position_ + copy) % buffer_length_;
+ data_length_ -= copy;
+ if (bytes_read) {
+ *bytes_read = copy;
+ }
+
+ // if we were full before, and now we're not, post an event
+ if (!was_writable && copy > 0) {
+ PostEvent(owner_, SE_WRITE, 0);
+ }
+ }
+ return result;
}
StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
size_t* bytes_written, int* error) {
CritScope cs(&crit_);
- if (state_ == SS_CLOSED) {
- return SR_EOS;
- }
-
- const size_t available = buffer_length_ - data_length_;
- if (0 == available) {
- return SR_BLOCK;
- }
const bool was_readable = (data_length_ > 0);
- const size_t write_position = (read_position_ + data_length_)
- % buffer_length_;
- const size_t copy = _min(bytes, available);
- const size_t tail_copy = _min(copy, buffer_length_ - write_position);
- const char* const p = static_cast<const char*>(buffer);
- memcpy(&buffer_[write_position], p, tail_copy);
- memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
- data_length_ += copy;
- if (bytes_written) {
- *bytes_written = copy;
- }
- // if we didn't have any data to read before, and now we do, post an event
- if (!was_readable && copy > 0) {
- PostEvent(owner_, SE_READ, 0);
- }
+ size_t copy = 0;
+ StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
- return SR_SUCCESS;
+ if (result == SR_SUCCESS) {
+ // If write was successful then adjust the number of readable bytes.
+ data_length_ += copy;
+ if (bytes_written) {
+ *bytes_written = copy;
+ }
+
+ // if we didn't have any data to read before, and now we do, post an event
+ if (!was_readable && copy > 0) {
+ PostEvent(owner_, SE_READ, 0);
+ }
+ }
+ return result;
}
void FifoBuffer::Close() {
@@ -887,6 +887,63 @@
}
}
+bool FifoBuffer::GetWriteRemaining(size_t* size) const {
+ CritScope cs(&crit_);
+ *size = buffer_length_ - data_length_;
+ return true;
+}
+
+StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_read) {
+ if (offset >= data_length_) {
+ return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
+ }
+
+ const size_t available = data_length_ - offset;
+ const size_t read_position = (read_position_ + offset) % buffer_length_;
+ const size_t copy = _min(bytes, available);
+ const size_t tail_copy = _min(copy, buffer_length_ - read_position);
+ char* const p = static_cast<char*>(buffer);
+ memcpy(p, &buffer_[read_position], tail_copy);
+ memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
+
+ if (bytes_read) {
+ *bytes_read = copy;
+ }
+ return SR_SUCCESS;
+}
+
+StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_written) {
+ if (state_ == SS_CLOSED) {
+ return SR_EOS;
+ }
+
+ if (data_length_ + offset >= buffer_length_) {
+ return SR_BLOCK;
+ }
+
+ const size_t available = buffer_length_ - data_length_ - offset;
+ const size_t write_position = (read_position_ + data_length_ + offset)
+ % buffer_length_;
+ const size_t copy = _min(bytes, available);
+ const size_t tail_copy = _min(copy, buffer_length_ - write_position);
+ const char* const p = static_cast<const char*>(buffer);
+ memcpy(&buffer_[write_position], p, tail_copy);
+ memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
+
+ if (bytes_written) {
+ *bytes_written = copy;
+ }
+ return SR_SUCCESS;
+}
+
+
+
///////////////////////////////////////////////////////////////////////////////
// LoggingAdapter
///////////////////////////////////////////////////////////////////////////////
diff --git a/talk/base/stream.h b/talk/base/stream.h
index 37fcae7..0798741 100644
--- a/talk/base/stream.h
+++ b/talk/base/stream.h
@@ -562,6 +562,20 @@
// Resizes the buffer to the specified capacity. Fails if data_length_ > size
bool SetCapacity(size_t length);
+ // Read into |buffer| with an offset from the current read position, offset
+ // is specified in number of bytes.
+ // This method doesn't adjust read position nor the number of available
+ // bytes, user has to call ConsumeReadData() to do this.
+ StreamResult ReadOffset(void* buffer, size_t bytes, size_t offset,
+ size_t* bytes_read);
+
+ // Write |buffer| with an offset from the current write position, offset is
+ // specified in number of bytes.
+ // This method doesn't adjust the number of buffered bytes, user has to call
+ // ConsumeWriteBuffer() to do this.
+ StreamResult WriteOffset(const void* buffer, size_t bytes, size_t offset,
+ size_t* bytes_written);
+
// StreamInterface methods
virtual StreamState GetState() const;
virtual StreamResult Read(void* buffer, size_t bytes,
@@ -573,8 +587,19 @@
virtual void ConsumeReadData(size_t used);
virtual void* GetWriteBuffer(size_t *buf_len);
virtual void ConsumeWriteBuffer(size_t used);
+ virtual bool GetWriteRemaining(size_t* size) const;
private:
+ // Helper method that implements ReadOffset. Caller must acquire a lock
+ // when calling this method.
+ StreamResult ReadOffsetLocked(void* buffer, size_t bytes, size_t offset,
+ size_t* bytes_read);
+
+ // Helper method that implements WriteOffset. Caller must acquire a lock
+ // when calling this method.
+ StreamResult WriteOffsetLocked(const void* buffer, size_t bytes,
+ size_t offset, size_t* bytes_written);
+
StreamState state_; // keeps the opened/closed state of the stream
scoped_array<char> buffer_; // the allocated buffer
size_t buffer_length_; // size of the allocated buffer
diff --git a/talk/main.scons b/talk/main.scons
index de1d84b..0464252 100644
--- a/talk/main.scons
+++ b/talk/main.scons
@@ -523,6 +523,6 @@
print '***Unfortunately the vsproj creator isn\'t smart enough to '
print '***automatically get the correct output locations. It is very easy'
print '***though to change it in the properties pane to the following'
- print '***($SolutionDir)/build/<foo>/staging/<bar>.exe'
+ print '***$(SolutionDir)/build/<foo>/staging/<bar>.exe'
Default(None)
Default([s])
diff --git a/talk/p2p/base/pseudotcp.cc b/talk/p2p/base/pseudotcp.cc
index 685a39f..2e8cea3 100644
--- a/talk/p2p/base/pseudotcp.cc
+++ b/talk/p2p/base/pseudotcp.cc
@@ -29,8 +29,10 @@
#include <cstdio>
#include <cstdlib>
+#include <set>
#include "talk/base/basictypes.h"
+#include "talk/base/bytebuffer.h"
#include "talk/base/byteorder.h"
#include "talk/base/common.h"
#include "talk/base/logging.h"
@@ -83,6 +85,10 @@
// TODO: Make JINGLE_HEADER_SIZE transparent to this code?
const uint32 JINGLE_HEADER_SIZE = 64; // when relay framing is in use
+// Default size for receive and send buffer.
+const uint32 DEFAULT_RCV_BUF_SIZE = 60 * 1024;
+const uint32 DEFAULT_SND_BUF_SIZE = 90 * 1024;
+
//////////////////////////////////////////////////////////////////////
// Global Constants and Functions
//////////////////////////////////////////////////////////////////////
@@ -127,6 +133,12 @@
//const uint8 CTL_REDIRECT = 1;
const uint8 CTL_EXTRA = 255;
+// TCP options.
+const uint8 TCP_OPT_EOL = 0; // End of list.
+const uint8 TCP_OPT_NOOP = 1; // No-op.
+const uint8 TCP_OPT_MSS = 2; // Maximum segment size.
+const uint8 TCP_OPT_WND_SCALE = 3; // Window scale factor.
+
/*
const uint8 FLAG_FIN = 0x01;
const uint8 FLAG_SYN = 0x02;
@@ -219,19 +231,26 @@
}
PseudoTcp::PseudoTcp(IPseudoTcpNotify* notify, uint32 conv)
- : m_notify(notify), m_shutdown(SD_NONE), m_error(0) {
+ : m_notify(notify),
+ m_shutdown(SD_NONE),
+ m_error(0),
+ m_rbuf_len(DEFAULT_RCV_BUF_SIZE),
+ m_rbuf(m_rbuf_len),
+ m_sbuf_len(DEFAULT_SND_BUF_SIZE),
+ m_sbuf(m_sbuf_len) {
// Sanity check on buffer sizes (needed for OnTcpWriteable notification logic)
- ASSERT(sizeof(m_rbuf) + MIN_PACKET < sizeof(m_sbuf));
+ ASSERT(m_rbuf_len + MIN_PACKET < m_sbuf_len);
uint32 now = Now();
m_state = TCP_LISTEN;
m_conv = conv;
- m_rcv_wnd = sizeof(m_rbuf);
- m_snd_nxt = m_slen = 0;
+ m_rcv_wnd = m_rbuf_len;
+ m_rwnd_scale = m_swnd_scale = 0;
+ m_snd_nxt = 0;
m_snd_wnd = 1;
- m_snd_una = m_rcv_nxt = m_rlen = m_rpos = 0;
+ m_snd_una = m_rcv_nxt = 0;
m_bReadEnable = true;
m_bWriteEnable = false;
m_t_ack = 0;
@@ -245,7 +264,7 @@
m_rto_base = 0;
m_cwnd = 2 * m_mss;
- m_ssthresh = sizeof(m_rbuf);
+ m_ssthresh = m_rbuf_len;
m_lastrecv = m_lastsend = m_lasttraffic = now;
m_bOutgoing = false;
@@ -259,6 +278,7 @@
m_use_nagling = true;
m_ack_delay = DEF_ACK_DELAY;
+ m_support_wnd_scale = true;
}
PseudoTcp::~PseudoTcp() {
@@ -273,9 +293,7 @@
m_state = TCP_SYN_SENT;
LOG(LS_INFO) << "State: TCP_SYN_SENT";
- char buffer[1];
- buffer[0] = CTL_CONNECT;
- queue(buffer, 1, true);
+ queueConnectMessage();
attemptSend();
return 0;
@@ -375,16 +393,25 @@
*value = m_use_nagling ? 0 : 1;
} else if (opt == OPT_ACKDELAY) {
*value = m_ack_delay;
+ } else if (opt == OPT_SNDBUF) {
+ *value = m_sbuf_len;
+ } else if (opt == OPT_RCVBUF) {
+ *value = m_rbuf_len;
} else {
ASSERT(false);
}
}
-
void PseudoTcp::SetOption(Option opt, int value) {
if (opt == OPT_NODELAY) {
m_use_nagling = value == 0;
} else if (opt == OPT_ACKDELAY) {
m_ack_delay = value;
+ } else if (opt == OPT_SNDBUF) {
+ ASSERT(m_state == TCP_LISTEN);
+ resizeSendBuffer(value);
+ } else if (opt == OPT_RCVBUF) {
+ ASSERT(m_state == TCP_LISTEN);
+ resizeReceiveBuffer(value);
} else {
ASSERT(false);
}
@@ -400,23 +427,24 @@
return SOCKET_ERROR;
}
- // Make sure read position is correct.
- ASSERT(m_rpos <= m_rlen);
- if (m_rlen == m_rpos) {
+ size_t read = 0;
+ talk_base::StreamResult result = m_rbuf.Read(buffer, len, &read, NULL);
+
+ // If there's no data in |m_rbuf|.
+ if (result == talk_base::SR_BLOCK) {
m_bReadEnable = true;
m_error = EWOULDBLOCK;
return SOCKET_ERROR;
}
+ ASSERT(result == talk_base::SR_SUCCESS);
- uint32 read = talk_base::_min(uint32(len), m_rlen - m_rpos);
- memcpy(buffer, m_rbuf + m_rpos, read);
- m_rpos += read;
+ size_t available_space = 0;
+ m_rbuf.GetWriteRemaining(&available_space);
- if (getReceiveBufferSpace() - m_rcv_wnd >=
- talk_base::_min<uint32>(sizeof(m_rbuf) / 2, m_mss)) {
+ if (uint32(available_space) - m_rcv_wnd >=
+ talk_base::_min<uint32>(m_rbuf_len / 2, m_mss)) {
bool bWasClosed = (m_rcv_wnd == 0); // !?! Not sure about this was closed business
-
- m_rcv_wnd = getReceiveBufferSpace();
+ m_rcv_wnd = available_space;
if (bWasClosed) {
attemptSend(sfImmediateAck);
@@ -432,7 +460,10 @@
return SOCKET_ERROR;
}
- if (m_slen == sizeof(m_sbuf)) {
+ size_t available_space = 0;
+ m_sbuf.GetWriteRemaining(&available_space);
+
+ if (!available_space) {
m_bWriteEnable = true;
m_error = EWOULDBLOCK;
return SOCKET_ERROR;
@@ -457,9 +488,12 @@
//
uint32 PseudoTcp::queue(const char* data, uint32 len, bool bCtrl) {
- if (len > sizeof(m_sbuf) - m_slen) {
+ size_t available_space = 0;
+ m_sbuf.GetWriteRemaining(&available_space);
+
+ if (len > static_cast<uint32>(available_space)) {
ASSERT(!bCtrl);
- len = sizeof(m_sbuf) - m_slen;
+ len = static_cast<uint32>(available_space);
}
// We can concatenate data if the last segment is the same type
@@ -467,18 +501,19 @@
if (!m_slist.empty() && (m_slist.back().bCtrl == bCtrl) && (m_slist.back().xmit == 0)) {
m_slist.back().len += len;
} else {
- SSegment sseg(m_snd_una + m_slen, len, bCtrl);
+ size_t snd_buffered = 0;
+ m_sbuf.GetBuffered(&snd_buffered);
+ SSegment sseg(m_snd_una + snd_buffered, len, bCtrl);
m_slist.push_back(sseg);
}
- memcpy(m_sbuf + m_slen, data, len);
- m_slen += len;
- //LOG(LS_INFO) << "PseudoTcp::queue - m_slen = " << m_slen;
- return len;
+ size_t written = 0;
+ m_sbuf.Write(data, len, &written, NULL);
+ return written;
}
IPseudoTcpNotify::WriteResult PseudoTcp::packet(uint32 seq, uint8 flags,
- const char* data, uint32 len) {
+ uint32 offset, uint32 len) {
ASSERT(HEADER_SIZE + len <= MAX_PACKET);
uint32 now = Now();
@@ -489,14 +524,22 @@
long_to_bytes(m_rcv_nxt, buffer + 8);
buffer[12] = 0;
buffer[13] = flags;
- short_to_bytes(uint16(m_rcv_wnd), buffer + 14);
+ short_to_bytes(static_cast<uint16>(m_rcv_wnd >> m_rwnd_scale), buffer + 14);
// Timestamp computations
long_to_bytes(now, buffer + 16);
long_to_bytes(m_ts_recent, buffer + 20);
m_ts_lastack = m_rcv_nxt;
- memcpy(buffer + HEADER_SIZE, data, len);
+ if (len) {
+ size_t bytes_read = 0;
+ talk_base::StreamResult result = m_sbuf.ReadOffset(buffer + HEADER_SIZE,
+ len,
+ offset,
+ &bytes_read);
+ ASSERT(result == talk_base::SR_SUCCESS);
+ ASSERT(static_cast<uint32>(bytes_read) == len);
+ }
#if _DEBUGMSG >= _DBG_VERBOSE
LOG(LS_INFO) << "<-- <CONV=" << m_conv
@@ -510,10 +553,10 @@
#endif // _DEBUGMSG
IPseudoTcpNotify::WriteResult wres = m_notify->TcpWritePacket(this, reinterpret_cast<char *>(buffer), len + HEADER_SIZE);
- // Note: When data is NULL, this is an ACK packet. We don't read the return value for those,
+ // Note: When len is 0, this is an ACK packet. We don't read the return value for those,
// and thus we won't retry. So go ahead and treat the packet as a success (basically simulate
// as if it were dropped), which will prevent our timers from being messed up.
- if ((wres != IPseudoTcpNotify::WR_SUCCESS) && (NULL != data))
+ if ((wres != IPseudoTcpNotify::WR_SUCCESS) && (0 != len))
return wres;
m_t_ack = 0;
@@ -561,9 +604,11 @@
if (m_shutdown == SD_FORCEFUL)
return false;
+ size_t snd_buffered = 0;
+ m_sbuf.GetBuffered(&snd_buffered);
if ((m_shutdown == SD_GRACEFUL)
&& ((m_state != TCP_ESTABLISHED)
- || ((m_slen == 0) && (m_t_ack == 0)))) {
+ || ((snd_buffered == 0) && (m_t_ack == 0)))) {
return false;
}
@@ -628,13 +673,15 @@
return false;
} else if (seg.data[0] == CTL_CONNECT) {
bConnect = true;
+
+ // TCP options are in the remainder of the payload after CTL_CONNECT.
+ parseOptions(&seg.data[1], seg.len - 1);
+
if (m_state == TCP_LISTEN) {
m_state = TCP_SYN_RECEIVED;
LOG(LS_INFO) << "State: TCP_SYN_RECEIVED";
//m_notify->associate(addr);
- char buffer[1];
- buffer[0] = CTL_CONNECT;
- queue(buffer, 1, true);
+ queueConnectMessage();
} else if (m_state == TCP_SYN_SENT) {
m_state = TCP_ESTABLISHED;
LOG(LS_INFO) << "State: TCP_ESTABLISHED";
@@ -680,16 +727,14 @@
}
}
- m_snd_wnd = seg.wnd;
+ m_snd_wnd = static_cast<uint32>(seg.wnd) << m_swnd_scale;
uint32 nAcked = seg.ack - m_snd_una;
m_snd_una = seg.ack;
m_rto_base = (m_snd_una == m_snd_nxt) ? 0 : now;
- m_slen -= nAcked;
- memmove(m_sbuf, m_sbuf + nAcked, m_slen);
- //LOG(LS_INFO) << "PseudoTcp::process - m_slen = " << m_slen;
+ m_sbuf.ConsumeReadData(nAcked);
for (uint32 nFree = nAcked; nFree > 0; ) {
ASSERT(!m_slist.empty());
@@ -732,32 +777,9 @@
m_cwnd += talk_base::_max<uint32>(1, m_mss * m_mss / m_cwnd);
}
}
-
- // !?! A bit hacky
- if ((m_state == TCP_SYN_RECEIVED) && !bConnect) {
- m_state = TCP_ESTABLISHED;
- LOG(LS_INFO) << "State: TCP_ESTABLISHED";
- adjustMTU();
- if (m_notify) {
- m_notify->OnTcpOpen(this);
- }
- //notify(evOpen);
- }
-
- // If we make room in the send queue, notify the user
- // The goal it to make sure we always have at least enough data to fill the
- // window. We'd like to notify the app when we are halfway to that point.
- const uint32 kIdealRefillSize = (sizeof(m_sbuf) + sizeof(m_rbuf)) / 2;
- if (m_bWriteEnable && (m_slen < kIdealRefillSize)) {
- m_bWriteEnable = false;
- if (m_notify) {
- m_notify->OnTcpWriteable(this);
- }
- //notify(evWrite);
- }
} else if (seg.ack == m_snd_una) {
// !?! Note, tcp says don't do this... but otherwise how does a closed window become open?
- m_snd_wnd = seg.wnd;
+ m_snd_wnd = static_cast<uint32>(seg.wnd) << m_swnd_scale;
// Check duplicate acks
if (seg.len > 0) {
@@ -786,6 +808,31 @@
}
}
+ // !?! A bit hacky
+ if ((m_state == TCP_SYN_RECEIVED) && !bConnect) {
+ m_state = TCP_ESTABLISHED;
+ LOG(LS_INFO) << "State: TCP_ESTABLISHED";
+ adjustMTU();
+ if (m_notify) {
+ m_notify->OnTcpOpen(this);
+ }
+ //notify(evOpen);
+ }
+
+ // If we make room in the send queue, notify the user
+ // The goal it to make sure we always have at least enough data to fill the
+ // window. We'd like to notify the app when we are halfway to that point.
+ const uint32 kIdealRefillSize = (m_sbuf_len + m_rbuf_len) / 2;
+ size_t snd_buffered = 0;
+ m_sbuf.GetBuffered(&snd_buffered);
+ if (m_bWriteEnable && static_cast<uint32>(snd_buffered) < kIdealRefillSize) {
+ m_bWriteEnable = false;
+ if (m_notify) {
+ m_notify->OnTcpWriteable(this);
+ }
+ //notify(evWrite);
+ }
+
// Conditions were acks must be sent:
// 1) Segment is too old (they missed an ACK) (immediately)
// 2) Segment is too new (we missed a segment) (immediately)
@@ -823,8 +870,12 @@
seg.len = 0;
}
}
- if ((seg.seq + seg.len - m_rcv_nxt) > getReceiveBufferSpace()) {
- uint32 nAdjust = seg.seq + seg.len - m_rcv_nxt - getReceiveBufferSpace();
+
+ size_t available_space = 0;
+ m_rbuf.GetWriteRemaining(&available_space);
+
+ if ((seg.seq + seg.len - m_rcv_nxt) > static_cast<uint32>(available_space)) {
+ uint32 nAdjust = seg.seq + seg.len - m_rcv_nxt - static_cast<uint32>(available_space);
if (nAdjust < seg.len) {
seg.len -= nAdjust;
} else {
@@ -843,14 +894,12 @@
} else {
uint32 nOffset = seg.seq - m_rcv_nxt;
- if (getReceiveBufferConsecutiveSpace() < seg.len + nOffset) {
- consolidateReceiveBufferSpace();
- ASSERT(getReceiveBufferConsecutiveSpace() >= seg.len + nOffset);
- }
+ talk_base::StreamResult result = m_rbuf.WriteOffset(seg.data, seg.len,
+ nOffset, NULL);
+ ASSERT(result == talk_base::SR_SUCCESS);
- memcpy(m_rbuf + m_rlen + nOffset, seg.data, seg.len);
if (seg.seq == m_rcv_nxt) {
- m_rlen += seg.len;
+ m_rbuf.ConsumeWriteBuffer(seg.len);
m_rcv_nxt += seg.len;
m_rcv_wnd -= seg.len;
bNewData = true;
@@ -863,7 +912,7 @@
#if _DEBUGMSG >= _DBG_NORMAL
LOG(LS_INFO) << "Recovered " << nAdjust << " bytes (" << m_rcv_nxt << " -> " << m_rcv_nxt + nAdjust << ")";
#endif // _DEBUGMSG
- m_rlen += nAdjust;
+ m_rbuf.ConsumeWriteBuffer(nAdjust);
m_rcv_nxt += nAdjust;
m_rcv_wnd -= nAdjust;
}
@@ -910,8 +959,10 @@
while (true) {
uint32 seq = seg->seq;
uint8 flags = (seg->bCtrl ? FLAG_CTL : 0);
- const char* buffer = m_sbuf + (seg->seq - m_snd_una);
- IPseudoTcpNotify::WriteResult wres = this->packet(seq, flags, buffer, nTransmit);
+ IPseudoTcpNotify::WriteResult wres = packet(seq,
+ flags,
+ seg->seq - m_snd_una,
+ nTransmit);
if (wres == IPseudoTcpNotify::WR_SUCCESS)
break;
@@ -987,7 +1038,10 @@
uint32 nInFlight = m_snd_nxt - m_snd_una;
uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
- uint32 nAvailable = talk_base::_min(m_slen - nInFlight, m_mss);
+ size_t snd_buffered = 0;
+ m_sbuf.GetBuffered(&snd_buffered);
+ uint32 nAvailable =
+ talk_base::_min(static_cast<uint32>(snd_buffered) - nInFlight, m_mss);
if (nAvailable > nUseable) {
if (nUseable * 4 < nWindow) {
@@ -1000,13 +1054,16 @@
#if _DEBUGMSG >= _DBG_VERBOSE
if (bFirst) {
+ size_t available_space = 0;
+ m_sbuf.GetWriteRemaining(&available_space);
+
bFirst = false;
LOG(LS_INFO) << "[cwnd: " << m_cwnd
<< " nWindow: " << nWindow
<< " nInFlight: " << nInFlight
<< " nAvailable: " << nAvailable
- << " nQueued: " << m_slen - nInFlight
- << " nEmpty: " << sizeof(m_sbuf) - m_slen
+ << " nQueued: " << snd_buffered
+ << " nEmpty: " << available_space
<< " ssthresh: " << m_ssthresh << "]";
}
#endif // _DEBUGMSG
@@ -1059,8 +1116,6 @@
void
PseudoTcp::closedown(uint32 err) {
- m_slen = 0;
-
LOG(LS_INFO) << "State: TCP_CLOSED";
m_state = TCP_CLOSED;
if (m_notify) {
@@ -1089,24 +1144,131 @@
bool
PseudoTcp::isReceiveBufferFull() const {
- return !getReceiveBufferSpace();
-}
-
-uint32
-PseudoTcp::getReceiveBufferSpace() const {
- return sizeof(m_rbuf) - m_rlen + m_rpos;
-}
-
-uint32
-PseudoTcp::getReceiveBufferConsecutiveSpace() const {
- return sizeof(m_rbuf) - m_rlen;
+ size_t available_space = 0;
+ m_rbuf.GetWriteRemaining(&available_space);
+ return !available_space;
}
void
-PseudoTcp::consolidateReceiveBufferSpace() {
- memmove(m_rbuf, m_rbuf + m_rpos, sizeof(m_rbuf) - m_rpos);
- m_rlen -= m_rpos;
- m_rpos = 0;
+PseudoTcp::disableWindowScale() {
+ m_support_wnd_scale = false;
+}
+
+void
+PseudoTcp::queueConnectMessage() {
+ talk_base::ByteBuffer buf(talk_base::ByteBuffer::ORDER_NETWORK);
+
+ buf.WriteUInt8(CTL_CONNECT);
+ if (m_support_wnd_scale) {
+ buf.WriteUInt8(TCP_OPT_WND_SCALE);
+ buf.WriteUInt8(1);
+ buf.WriteUInt8(m_rwnd_scale);
+ }
+ m_snd_wnd = buf.Length();
+ queue(buf.Data(), buf.Length(), true);
+}
+
+void
+PseudoTcp::parseOptions(const char* data, uint32 len) {
+ std::set<uint8> options_specified;
+
+ // See http://www.freesoft.org/CIE/Course/Section4/8.htm for
+ // parsing the options list.
+ talk_base::ByteBuffer buf(data, len);
+ while (buf.Length()) {
+ uint8 kind = TCP_OPT_EOL;
+ buf.ReadUInt8(&kind);
+
+ if (kind == TCP_OPT_EOL) {
+ // End of option list.
+ break;
+ } else if (kind == TCP_OPT_NOOP) {
+ // No op.
+ continue;
+ }
+
+ // Length of this option.
+ ASSERT(len);
+ uint8 opt_len = 0;
+ buf.ReadUInt8(&opt_len);
+
+ // Content of this option.
+ if (opt_len <= buf.Length()) {
+ applyOption(kind, buf.Data(), opt_len);
+ buf.Consume(opt_len);
+ } else {
+ LOG(LS_ERROR) << "Invalid option length received.";
+ return;
+ }
+ options_specified.insert(kind);
+ }
+
+ if (options_specified.find(TCP_OPT_WND_SCALE) == options_specified.end()) {
+ LOG(LS_WARNING) << "Peer doesn't support window scaling";
+
+ if (m_rwnd_scale > 0) {
+ // Peer doesn't support TCP options and window scaling.
+ // Revert receive buffer size to default value.
+ resizeReceiveBuffer(DEFAULT_RCV_BUF_SIZE);
+ m_swnd_scale = 0;
+ }
+ }
+}
+
+void
+PseudoTcp::applyOption(char kind, const char* data, uint32 len) {
+ if (kind == TCP_OPT_MSS) {
+ LOG(LS_WARNING) << "Peer specified MSS option which is not supported.";
+ // TODO: Implement.
+ } else if (kind == TCP_OPT_WND_SCALE) {
+ // Window scale factor.
+ // http://www.ietf.org/rfc/rfc1323.txt
+ if (len != 1) {
+ LOG_F(WARNING) << "Invalid window scale option received.";
+ return;
+ }
+ applyWindowScaleOption(data[0]);
+ }
+}
+
+void
+PseudoTcp::applyWindowScaleOption(uint8 scale_factor) {
+ m_swnd_scale = scale_factor;
+}
+
+void
+PseudoTcp::resizeSendBuffer(uint32 new_size) {
+ m_sbuf_len = new_size;
+ m_sbuf.SetCapacity(new_size);
+}
+
+void
+PseudoTcp::resizeReceiveBuffer(uint32 new_size) {
+ uint8 scale_factor = 0;
+
+ // Determine the scale factor such that the scaled window size can fit
+ // in a 16-bit unsigned integer.
+ while (new_size > 0xFFFF) {
+ ++scale_factor;
+ new_size >>= 1;
+ }
+
+ // Determine the proper size of the buffer.
+ new_size <<= scale_factor;
+ bool result = m_rbuf.SetCapacity(new_size);
+
+ // Make sure the new buffer is large enough to contain data in the old
+ // buffer. This should always be true because this method is called either
+ // before connection is established or when peers are exchanging connect
+ // messages.
+ ASSERT(result);
+ m_rbuf_len = new_size;
+ m_rwnd_scale = scale_factor;
+ m_ssthresh = new_size;
+
+ size_t available_space = 0;
+ m_rbuf.GetWriteRemaining(&available_space);
+ m_rcv_wnd = available_space;
}
} // namespace cricket
diff --git a/talk/p2p/base/pseudotcp.h b/talk/p2p/base/pseudotcp.h
index fd11c86..f07a555 100644
--- a/talk/p2p/base/pseudotcp.h
+++ b/talk/p2p/base/pseudotcp.h
@@ -31,6 +31,7 @@
#include <list>
#include "talk/base/basictypes.h"
+#include "talk/base/stream.h"
namespace cricket {
@@ -95,24 +96,20 @@
// Call these to get/set option values to tailor this PseudoTcp
// instance's behaviour for the kind of data it will carry.
// If an unrecognized option is set or got, an assertion will fire.
+ //
+ // Setting options for OPT_RCVBUF or OPT_SNDBUF after Connect() is called
+ // will result in an assertion.
enum Option {
OPT_NODELAY, // Whether to enable Nagle's algorithm (0 == off)
OPT_ACKDELAY, // The Delayed ACK timeout (0 == off).
- //kOptRcvBuf, // Set the receive buffer size, in bytes.
- //kOptSndBuf, // Set the send buffer size, in bytes.
+ OPT_RCVBUF, // Set the receive buffer size, in bytes.
+ OPT_SNDBUF, // Set the send buffer size, in bytes.
};
void GetOption(Option opt, int* value);
void SetOption(Option opt, int value);
protected:
enum SendFlags { sfNone, sfDelayedAck, sfImmediateAck };
- enum {
- // Note: can't go as high as 1024 * 64, because of uint16 precision
- kRcvBufSize = 1024 * 60,
- // Note: send buffer should be larger to make sure we can always fill the
- // receiver window
- kSndBufSize = 1024 * 90
- };
struct Segment {
uint32 conv, seq, ack;
@@ -140,8 +137,16 @@
uint32 queue(const char* data, uint32 len, bool bCtrl);
+ // Creates a packet and submits it to the network. This method can either
+ // send payload or just an ACK packet.
+ //
+ // |seq| is the sequence number of this packet.
+ // |flags| is the flags for sending this packet.
+ // |offset| is the offset to read from |m_sbuf|.
+ // |len| is the number of bytes to read from |m_sbuf| as payload. If this
+ // value is 0 then this is an ACK packet, otherwise this packet has payload.
IPseudoTcpNotify::WriteResult packet(uint32 seq, uint8 flags,
- const char* data, uint32 len);
+ uint32 offset, uint32 len);
bool parse(const uint8* buffer, uint32 size);
void attemptSend(SendFlags sflags = sfNone);
@@ -159,15 +164,29 @@
// This method is used in test only to query receive buffer state.
bool isReceiveBufferFull() const;
+ // This method is only used in tests, to disable window scaling
+ // support for testing backward compatibility.
+ void disableWindowScale();
+
private:
- // Get the total number of bytes of free space in m_rbuf, consecutive or not.
- uint32 getReceiveBufferSpace() const;
+ // Queue the connect message with TCP options.
+ void queueConnectMessage();
- // Get the number of bytes that can be written to m_rbuf.
- uint32 getReceiveBufferConsecutiveSpace() const;
+ // Parse TCP options in the header.
+ void parseOptions(const char* data, uint32 len);
- // Consolidate free space in m_rbuf so that it is a consecutive segment.
- void consolidateReceiveBufferSpace();
+ // Apply a TCP option that has been read from the header.
+ void applyOption(char kind, const char* data, uint32 len);
+
+ // Apply window scale option.
+ void applyWindowScaleOption(uint8 scale_factor);
+
+ // Resize the send buffer with |new_size| in bytes.
+ void resizeSendBuffer(uint32 new_size);
+
+ // Resize the receive buffer with |new_size| in bytes. This call adjusts
+ // window scale factor |m_swnd_scale| accordingly.
+ void resizeReceiveBuffer(uint32 new_size);
IPseudoTcpNotify* m_notify;
enum Shutdown { SD_NONE, SD_GRACEFUL, SD_FORCEFUL } m_shutdown;
@@ -182,13 +201,16 @@
// Incoming data
typedef std::list<RSegment> RList;
RList m_rlist;
- char m_rbuf[kRcvBufSize];
- uint32 m_rcv_nxt, m_rcv_wnd, m_rpos, m_rlen, m_lastrecv;
+ uint32 m_rbuf_len, m_rcv_nxt, m_rcv_wnd, m_lastrecv;
+ uint8 m_rwnd_scale; // Window scale factor.
+ talk_base::FifoBuffer m_rbuf;
// Outgoing data
SList m_slist;
- char m_sbuf[kSndBufSize];
- uint32 m_snd_nxt, m_snd_wnd, m_slen, m_lastsend, m_snd_una;
+ uint32 m_sbuf_len, m_snd_nxt, m_snd_wnd, m_lastsend, m_snd_una;
+ uint8 m_swnd_scale; // Window scale factor.
+ talk_base::FifoBuffer m_sbuf;
+
// Maximum segment size, estimated protocol level, largest segment sent
uint32 m_mss, m_msslevel, m_largest, m_mtu_advise;
// Retransmit timer
@@ -209,6 +231,10 @@
// Configuration options
bool m_use_nagling;
uint32 m_ack_delay;
+
+ // This is used by unit tests to test backward compatibility of
+ // PseudoTcp implementations that don't support window scaling.
+ bool m_support_wnd_scale;
};
} // namespace cricket