Implement window scaling for PseudoTCP.

git-svn-id: http://libjingle.googlecode.com/svn/trunk@77 dd674b97-3498-5ee5-1854-bdd07cd0ff33
diff --git a/talk/base/stream.cc b/talk/base/stream.cc
index 25dce7b..9fcb1a8 100644
--- a/talk/base/stream.cc
+++ b/talk/base/stream.cc
@@ -772,6 +772,18 @@
   return true;
 }
 
+StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
+                                    size_t offset, size_t* bytes_read) {
+  CritScope cs(&crit_);
+  return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
+}
+
+StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
+                                     size_t offset, size_t* bytes_written) {
+  CritScope cs(&crit_);
+  return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
+}
+
 StreamState FifoBuffer::GetState() const {
   return state_;
 }
@@ -779,60 +791,48 @@
 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
                               size_t* bytes_read, int* error) {
   CritScope cs(&crit_);
-  const size_t available = data_length_;
-  if (0 == available) {
-    return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
-  }
-
   const bool was_writable = data_length_ < buffer_length_;
-  const size_t copy = _min(bytes, available);
-  const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
-  char* const p = static_cast<char*>(buffer);
-  memcpy(p, &buffer_[read_position_], tail_copy);
-  memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
-  read_position_ = (read_position_ + copy) % buffer_length_;
-  data_length_ -= copy;
-  if (bytes_read) {
-    *bytes_read = copy;
-  }
-  // if we were full before, and now we're not, post an event
-  if (!was_writable && copy > 0) {
-    PostEvent(owner_, SE_WRITE, 0);
-  }
+  size_t copy = 0;
+  StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
 
-  return SR_SUCCESS;
+  if (result == SR_SUCCESS) {
+    // If read was successful then adjust the read position and number of
+    // bytes buffered.
+    read_position_ = (read_position_ + copy) % buffer_length_;
+    data_length_ -= copy;
+    if (bytes_read) {
+      *bytes_read = copy;
+    }
+
+    // if we were full before, and now we're not, post an event
+    if (!was_writable && copy > 0) {
+      PostEvent(owner_, SE_WRITE, 0);
+    }
+  }
+  return result;
 }
 
 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
                                size_t* bytes_written, int* error) {
   CritScope cs(&crit_);
-  if (state_ == SS_CLOSED) {
-    return SR_EOS;
-  }
-
-  const size_t available = buffer_length_ - data_length_;
-  if (0 == available) {
-    return SR_BLOCK;
-  }
 
   const bool was_readable = (data_length_ > 0);
-  const size_t write_position = (read_position_ + data_length_)
-      % buffer_length_;
-  const size_t copy = _min(bytes, available);
-  const size_t tail_copy = _min(copy, buffer_length_ - write_position);
-  const char* const p = static_cast<const char*>(buffer);
-  memcpy(&buffer_[write_position], p, tail_copy);
-  memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
-  data_length_ += copy;
-  if (bytes_written) {
-    *bytes_written = copy;
-  }
-  // if we didn't have any data to read before, and now we do, post an event
-  if (!was_readable && copy > 0) {
-    PostEvent(owner_, SE_READ, 0);
-  }
+  size_t copy = 0;
+  StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
 
-  return SR_SUCCESS;
+  if (result == SR_SUCCESS) {
+    // If write was successful then adjust the number of readable bytes.
+    data_length_ += copy;
+    if (bytes_written) {
+      *bytes_written = copy;
+    }
+
+    // if we didn't have any data to read before, and now we do, post an event
+    if (!was_readable && copy > 0) {
+      PostEvent(owner_, SE_READ, 0);
+    }
+  }
+  return result;
 }
 
 void FifoBuffer::Close() {
@@ -887,6 +887,63 @@
   }
 }
 
+bool FifoBuffer::GetWriteRemaining(size_t* size) const {
+  CritScope cs(&crit_);
+  *size = buffer_length_ - data_length_;
+  return true;
+}
+
+StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
+                                          size_t bytes,
+                                          size_t offset,
+                                          size_t* bytes_read) {
+  if (offset >= data_length_) {
+    return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
+  }
+
+  const size_t available = data_length_ - offset;
+  const size_t read_position = (read_position_ + offset) % buffer_length_;
+  const size_t copy = _min(bytes, available);
+  const size_t tail_copy = _min(copy, buffer_length_ - read_position);
+  char* const p = static_cast<char*>(buffer);
+  memcpy(p, &buffer_[read_position], tail_copy);
+  memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
+
+  if (bytes_read) {
+    *bytes_read = copy;
+  }
+  return SR_SUCCESS;
+}
+
+StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
+                                           size_t bytes,
+                                           size_t offset,
+                                           size_t* bytes_written) {
+  if (state_ == SS_CLOSED) {
+    return SR_EOS;
+  }
+
+  if (data_length_ + offset >= buffer_length_) {
+    return SR_BLOCK;
+  }
+
+  const size_t available = buffer_length_ - data_length_ - offset;
+  const size_t write_position = (read_position_ + data_length_ + offset)
+      % buffer_length_;
+  const size_t copy = _min(bytes, available);
+  const size_t tail_copy = _min(copy, buffer_length_ - write_position);
+  const char* const p = static_cast<const char*>(buffer);
+  memcpy(&buffer_[write_position], p, tail_copy);
+  memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
+
+  if (bytes_written) {
+    *bytes_written = copy;
+  }
+  return SR_SUCCESS;
+}
+
+
+
 ///////////////////////////////////////////////////////////////////////////////
 // LoggingAdapter
 ///////////////////////////////////////////////////////////////////////////////
diff --git a/talk/base/stream.h b/talk/base/stream.h
index 37fcae7..0798741 100644
--- a/talk/base/stream.h
+++ b/talk/base/stream.h
@@ -562,6 +562,20 @@
   // Resizes the buffer to the specified capacity. Fails if data_length_ > size
   bool SetCapacity(size_t length);
 
+  // Read into |buffer| with an offset from the current read position, offset
+  // is specified in number of bytes.
+  // This method doesn't adjust read position nor the number of available
+  // bytes, user has to call ConsumeReadData() to do this.
+  StreamResult ReadOffset(void* buffer, size_t bytes, size_t offset,
+                          size_t* bytes_read);
+
+  // Write |buffer| with an offset from the current write position, offset is
+  // specified in number of bytes.
+  // This method doesn't adjust the number of buffered bytes, user has to call
+  // ConsumeWriteBuffer() to do this.
+  StreamResult WriteOffset(const void* buffer, size_t bytes, size_t offset,
+                           size_t* bytes_written);
+
   // StreamInterface methods
   virtual StreamState GetState() const;
   virtual StreamResult Read(void* buffer, size_t bytes,
@@ -573,8 +587,19 @@
   virtual void ConsumeReadData(size_t used);
   virtual void* GetWriteBuffer(size_t *buf_len);
   virtual void ConsumeWriteBuffer(size_t used);
+  virtual bool GetWriteRemaining(size_t* size) const;
 
  private:
+  // Helper method that implements ReadOffset. Caller must acquire a lock
+  // when calling this method.
+  StreamResult ReadOffsetLocked(void* buffer, size_t bytes, size_t offset,
+                                size_t* bytes_read);
+
+  // Helper method that implements WriteOffset. Caller must acquire a lock
+  // when calling this method.
+  StreamResult WriteOffsetLocked(const void* buffer, size_t bytes,
+                                 size_t offset, size_t* bytes_written);
+
   StreamState state_;  // keeps the opened/closed state of the stream
   scoped_array<char> buffer_;  // the allocated buffer
   size_t buffer_length_;  // size of the allocated buffer
diff --git a/talk/main.scons b/talk/main.scons
index de1d84b..0464252 100644
--- a/talk/main.scons
+++ b/talk/main.scons
@@ -523,6 +523,6 @@
   print '***Unfortunately the vsproj creator isn\'t smart enough to '
   print '***automatically get the correct output locations.  It is very easy'
   print '***though to change it in the properties pane to the following'
-  print '***($SolutionDir)/build/<foo>/staging/<bar>.exe'
+  print '***$(SolutionDir)/build/<foo>/staging/<bar>.exe'
   Default(None)
   Default([s])
diff --git a/talk/p2p/base/pseudotcp.cc b/talk/p2p/base/pseudotcp.cc
index 685a39f..2e8cea3 100644
--- a/talk/p2p/base/pseudotcp.cc
+++ b/talk/p2p/base/pseudotcp.cc
@@ -29,8 +29,10 @@
 
 #include <cstdio>
 #include <cstdlib>
+#include <set>
 
 #include "talk/base/basictypes.h"
+#include "talk/base/bytebuffer.h"
 #include "talk/base/byteorder.h"
 #include "talk/base/common.h"
 #include "talk/base/logging.h"
@@ -83,6 +85,10 @@
 // TODO: Make JINGLE_HEADER_SIZE transparent to this code?
 const uint32 JINGLE_HEADER_SIZE = 64; // when relay framing is in use
 
+// Default size for receive and send buffer.
+const uint32 DEFAULT_RCV_BUF_SIZE = 60 * 1024;
+const uint32 DEFAULT_SND_BUF_SIZE = 90 * 1024;
+
 //////////////////////////////////////////////////////////////////////
 // Global Constants and Functions
 //////////////////////////////////////////////////////////////////////
@@ -127,6 +133,12 @@
 //const uint8 CTL_REDIRECT = 1;
 const uint8 CTL_EXTRA = 255;
 
+// TCP options.
+const uint8 TCP_OPT_EOL = 0;  // End of list.
+const uint8 TCP_OPT_NOOP = 1;  // No-op.
+const uint8 TCP_OPT_MSS = 2;  // Maximum segment size.
+const uint8 TCP_OPT_WND_SCALE = 3;  // Window scale factor.
+
 /*
 const uint8 FLAG_FIN = 0x01;
 const uint8 FLAG_SYN = 0x02;
@@ -219,19 +231,26 @@
 }
 
 PseudoTcp::PseudoTcp(IPseudoTcpNotify* notify, uint32 conv)
-    : m_notify(notify), m_shutdown(SD_NONE), m_error(0) {
+    : m_notify(notify),
+      m_shutdown(SD_NONE),
+      m_error(0),
+      m_rbuf_len(DEFAULT_RCV_BUF_SIZE),
+      m_rbuf(m_rbuf_len),
+      m_sbuf_len(DEFAULT_SND_BUF_SIZE),
+      m_sbuf(m_sbuf_len) {
 
   // Sanity check on buffer sizes (needed for OnTcpWriteable notification logic)
-  ASSERT(sizeof(m_rbuf) + MIN_PACKET < sizeof(m_sbuf));
+  ASSERT(m_rbuf_len + MIN_PACKET < m_sbuf_len);
 
   uint32 now = Now();
 
   m_state = TCP_LISTEN;
   m_conv = conv;
-  m_rcv_wnd = sizeof(m_rbuf);
-  m_snd_nxt = m_slen = 0;
+  m_rcv_wnd = m_rbuf_len;
+  m_rwnd_scale = m_swnd_scale = 0;
+  m_snd_nxt = 0;
   m_snd_wnd = 1;
-  m_snd_una = m_rcv_nxt = m_rlen = m_rpos = 0;
+  m_snd_una = m_rcv_nxt = 0;
   m_bReadEnable = true;
   m_bWriteEnable = false;
   m_t_ack = 0;
@@ -245,7 +264,7 @@
   m_rto_base = 0;
 
   m_cwnd = 2 * m_mss;
-  m_ssthresh = sizeof(m_rbuf);
+  m_ssthresh = m_rbuf_len;
   m_lastrecv = m_lastsend = m_lasttraffic = now;
   m_bOutgoing = false;
 
@@ -259,6 +278,7 @@
 
   m_use_nagling = true;
   m_ack_delay = DEF_ACK_DELAY;
+  m_support_wnd_scale = true;
 }
 
 PseudoTcp::~PseudoTcp() {
@@ -273,9 +293,7 @@
   m_state = TCP_SYN_SENT;
   LOG(LS_INFO) << "State: TCP_SYN_SENT";
 
-  char buffer[1];
-  buffer[0] = CTL_CONNECT;
-  queue(buffer, 1, true);
+  queueConnectMessage();
   attemptSend();
 
   return 0;
@@ -375,16 +393,25 @@
     *value = m_use_nagling ? 0 : 1;
   } else if (opt == OPT_ACKDELAY) {
     *value = m_ack_delay;
+  } else if (opt == OPT_SNDBUF) {
+    *value = m_sbuf_len;
+  } else if (opt == OPT_RCVBUF) {
+    *value = m_rbuf_len;
   } else {
     ASSERT(false);
   }
 }
-
 void PseudoTcp::SetOption(Option opt, int value) {
   if (opt == OPT_NODELAY) {
     m_use_nagling = value == 0;
   } else if (opt == OPT_ACKDELAY) {
     m_ack_delay = value;
+  } else if (opt == OPT_SNDBUF) {
+    ASSERT(m_state == TCP_LISTEN);
+    resizeSendBuffer(value);
+  } else if (opt == OPT_RCVBUF) {
+    ASSERT(m_state == TCP_LISTEN);
+    resizeReceiveBuffer(value);
   } else {
     ASSERT(false);
   }
@@ -400,23 +427,24 @@
     return SOCKET_ERROR;
   }
 
-  // Make sure read position is correct.
-  ASSERT(m_rpos <= m_rlen);
-  if (m_rlen == m_rpos) {
+  size_t read = 0;
+  talk_base::StreamResult result = m_rbuf.Read(buffer, len, &read, NULL);
+
+  // If there's no data in |m_rbuf|.
+  if (result == talk_base::SR_BLOCK) {
     m_bReadEnable = true;
     m_error = EWOULDBLOCK;
     return SOCKET_ERROR;
   }
+  ASSERT(result == talk_base::SR_SUCCESS);
 
-  uint32 read = talk_base::_min(uint32(len), m_rlen - m_rpos);
-  memcpy(buffer, m_rbuf + m_rpos, read);
-  m_rpos += read;
+  size_t available_space = 0;
+  m_rbuf.GetWriteRemaining(&available_space);
 
-  if (getReceiveBufferSpace() - m_rcv_wnd >=
-      talk_base::_min<uint32>(sizeof(m_rbuf) / 2, m_mss)) {
+  if (uint32(available_space) - m_rcv_wnd >=
+      talk_base::_min<uint32>(m_rbuf_len / 2, m_mss)) {
     bool bWasClosed = (m_rcv_wnd == 0); // !?! Not sure about this was closed business
-
-    m_rcv_wnd = getReceiveBufferSpace();
+    m_rcv_wnd = available_space;
 
     if (bWasClosed) {
       attemptSend(sfImmediateAck);
@@ -432,7 +460,10 @@
     return SOCKET_ERROR;
   }
 
-  if (m_slen == sizeof(m_sbuf)) {
+  size_t available_space = 0;
+  m_sbuf.GetWriteRemaining(&available_space);
+
+  if (!available_space) {
     m_bWriteEnable = true;
     m_error = EWOULDBLOCK;
     return SOCKET_ERROR;
@@ -457,9 +488,12 @@
 //
 
 uint32 PseudoTcp::queue(const char* data, uint32 len, bool bCtrl) {
-  if (len > sizeof(m_sbuf) - m_slen) {
+  size_t available_space = 0;
+  m_sbuf.GetWriteRemaining(&available_space);
+
+  if (len > static_cast<uint32>(available_space)) {
     ASSERT(!bCtrl);
-    len = sizeof(m_sbuf) - m_slen;
+    len = static_cast<uint32>(available_space);
   }
 
   // We can concatenate data if the last segment is the same type
@@ -467,18 +501,19 @@
   if (!m_slist.empty() && (m_slist.back().bCtrl == bCtrl) && (m_slist.back().xmit == 0)) {
     m_slist.back().len += len;
   } else {
-    SSegment sseg(m_snd_una + m_slen, len, bCtrl);
+    size_t snd_buffered = 0;
+    m_sbuf.GetBuffered(&snd_buffered);
+    SSegment sseg(m_snd_una + snd_buffered, len, bCtrl);
     m_slist.push_back(sseg);
   }
 
-  memcpy(m_sbuf + m_slen, data, len);
-  m_slen += len;
-  //LOG(LS_INFO) << "PseudoTcp::queue - m_slen = " << m_slen;
-  return len;
+  size_t written = 0;
+  m_sbuf.Write(data, len, &written, NULL);
+  return written;
 }
 
 IPseudoTcpNotify::WriteResult PseudoTcp::packet(uint32 seq, uint8 flags,
-                                                const char* data, uint32 len) {
+                                                uint32 offset, uint32 len) {
   ASSERT(HEADER_SIZE + len <= MAX_PACKET);
 
   uint32 now = Now();
@@ -489,14 +524,22 @@
   long_to_bytes(m_rcv_nxt, buffer + 8);
   buffer[12] = 0;
   buffer[13] = flags;
-  short_to_bytes(uint16(m_rcv_wnd), buffer + 14);
+  short_to_bytes(static_cast<uint16>(m_rcv_wnd >> m_rwnd_scale), buffer + 14);
 
   // Timestamp computations
   long_to_bytes(now, buffer + 16);
   long_to_bytes(m_ts_recent, buffer + 20);
   m_ts_lastack = m_rcv_nxt;
 
-  memcpy(buffer + HEADER_SIZE, data, len);
+  if (len) {
+    size_t bytes_read = 0;
+    talk_base::StreamResult result = m_sbuf.ReadOffset(buffer + HEADER_SIZE,
+                                                       len,
+                                                       offset,
+                                                       &bytes_read);
+    ASSERT(result == talk_base::SR_SUCCESS);
+    ASSERT(static_cast<uint32>(bytes_read) == len);
+  }
 
 #if _DEBUGMSG >= _DBG_VERBOSE
   LOG(LS_INFO) << "<-- <CONV=" << m_conv
@@ -510,10 +553,10 @@
 #endif // _DEBUGMSG
 
   IPseudoTcpNotify::WriteResult wres = m_notify->TcpWritePacket(this, reinterpret_cast<char *>(buffer), len + HEADER_SIZE);
-  // Note: When data is NULL, this is an ACK packet.  We don't read the return value for those,
+  // Note: When len is 0, this is an ACK packet.  We don't read the return value for those,
   // and thus we won't retry.  So go ahead and treat the packet as a success (basically simulate
   // as if it were dropped), which will prevent our timers from being messed up.
-  if ((wres != IPseudoTcpNotify::WR_SUCCESS) && (NULL != data))
+  if ((wres != IPseudoTcpNotify::WR_SUCCESS) && (0 != len))
     return wres;
 
   m_t_ack = 0;
@@ -561,9 +604,11 @@
   if (m_shutdown == SD_FORCEFUL)
     return false;
 
+  size_t snd_buffered = 0;
+  m_sbuf.GetBuffered(&snd_buffered);
   if ((m_shutdown == SD_GRACEFUL)
       && ((m_state != TCP_ESTABLISHED)
-          || ((m_slen == 0) && (m_t_ack == 0)))) {
+          || ((snd_buffered == 0) && (m_t_ack == 0)))) {
     return false;
   }
 
@@ -628,13 +673,15 @@
       return false;
     } else if (seg.data[0] == CTL_CONNECT) {
       bConnect = true;
+
+      // TCP options are in the remainder of the payload after CTL_CONNECT.
+      parseOptions(&seg.data[1], seg.len - 1);
+
       if (m_state == TCP_LISTEN) {
         m_state = TCP_SYN_RECEIVED;
         LOG(LS_INFO) << "State: TCP_SYN_RECEIVED";
         //m_notify->associate(addr);
-        char buffer[1];
-        buffer[0] = CTL_CONNECT;
-        queue(buffer, 1, true);
+        queueConnectMessage();
       } else if (m_state == TCP_SYN_SENT) {
         m_state = TCP_ESTABLISHED;
         LOG(LS_INFO) << "State: TCP_ESTABLISHED";
@@ -680,16 +727,14 @@
       }
     }
 
-    m_snd_wnd = seg.wnd;
+    m_snd_wnd = static_cast<uint32>(seg.wnd) << m_swnd_scale;
 
     uint32 nAcked = seg.ack - m_snd_una;
     m_snd_una = seg.ack;
 
     m_rto_base = (m_snd_una == m_snd_nxt) ? 0 : now;
 
-    m_slen -= nAcked;
-    memmove(m_sbuf, m_sbuf + nAcked, m_slen);
-    //LOG(LS_INFO) << "PseudoTcp::process - m_slen = " << m_slen;
+    m_sbuf.ConsumeReadData(nAcked);
 
     for (uint32 nFree = nAcked; nFree > 0; ) {
       ASSERT(!m_slist.empty());
@@ -732,32 +777,9 @@
         m_cwnd += talk_base::_max<uint32>(1, m_mss * m_mss / m_cwnd);
       }
     }
-
-    // !?! A bit hacky
-    if ((m_state == TCP_SYN_RECEIVED) && !bConnect) {
-      m_state = TCP_ESTABLISHED;
-      LOG(LS_INFO) << "State: TCP_ESTABLISHED";
-      adjustMTU();
-      if (m_notify) {
-        m_notify->OnTcpOpen(this);
-      }
-      //notify(evOpen);
-    }
-
-    // If we make room in the send queue, notify the user
-    // The goal it to make sure we always have at least enough data to fill the
-    // window.  We'd like to notify the app when we are halfway to that point.
-    const uint32 kIdealRefillSize = (sizeof(m_sbuf) + sizeof(m_rbuf)) / 2;
-    if (m_bWriteEnable && (m_slen < kIdealRefillSize)) {
-      m_bWriteEnable = false;
-      if (m_notify) {
-        m_notify->OnTcpWriteable(this);
-      }
-      //notify(evWrite);
-    }
   } else if (seg.ack == m_snd_una) {
     // !?! Note, tcp says don't do this... but otherwise how does a closed window become open?
-    m_snd_wnd = seg.wnd;
+    m_snd_wnd = static_cast<uint32>(seg.wnd) << m_swnd_scale;
 
     // Check duplicate acks
     if (seg.len > 0) {
@@ -786,6 +808,31 @@
     }
   }
 
+  // !?! A bit hacky
+  if ((m_state == TCP_SYN_RECEIVED) && !bConnect) {
+    m_state = TCP_ESTABLISHED;
+    LOG(LS_INFO) << "State: TCP_ESTABLISHED";
+    adjustMTU();
+    if (m_notify) {
+      m_notify->OnTcpOpen(this);
+    }
+    //notify(evOpen);
+  }
+
+  // If we make room in the send queue, notify the user
+  // The goal it to make sure we always have at least enough data to fill the
+  // window.  We'd like to notify the app when we are halfway to that point.
+  const uint32 kIdealRefillSize = (m_sbuf_len + m_rbuf_len) / 2;
+  size_t snd_buffered = 0;
+  m_sbuf.GetBuffered(&snd_buffered);
+  if (m_bWriteEnable && static_cast<uint32>(snd_buffered) < kIdealRefillSize) {
+    m_bWriteEnable = false;
+    if (m_notify) {
+      m_notify->OnTcpWriteable(this);
+    }
+    //notify(evWrite);
+  }
+
   // Conditions were acks must be sent:
   // 1) Segment is too old (they missed an ACK) (immediately)
   // 2) Segment is too new (we missed a segment) (immediately)
@@ -823,8 +870,12 @@
       seg.len = 0;
     }
   }
-  if ((seg.seq + seg.len - m_rcv_nxt) > getReceiveBufferSpace()) {
-    uint32 nAdjust = seg.seq + seg.len - m_rcv_nxt - getReceiveBufferSpace();
+
+  size_t available_space = 0;
+  m_rbuf.GetWriteRemaining(&available_space);
+
+  if ((seg.seq + seg.len - m_rcv_nxt) > static_cast<uint32>(available_space)) {
+    uint32 nAdjust = seg.seq + seg.len - m_rcv_nxt - static_cast<uint32>(available_space);
     if (nAdjust < seg.len) {
       seg.len -= nAdjust;
     } else {
@@ -843,14 +894,12 @@
     } else {
       uint32 nOffset = seg.seq - m_rcv_nxt;
 
-      if (getReceiveBufferConsecutiveSpace() < seg.len + nOffset) {
-        consolidateReceiveBufferSpace();
-        ASSERT(getReceiveBufferConsecutiveSpace() >= seg.len + nOffset);
-      }
+      talk_base::StreamResult result = m_rbuf.WriteOffset(seg.data, seg.len,
+                                                          nOffset, NULL);
+      ASSERT(result == talk_base::SR_SUCCESS);
 
-      memcpy(m_rbuf + m_rlen + nOffset, seg.data, seg.len);
       if (seg.seq == m_rcv_nxt) {
-        m_rlen += seg.len;
+        m_rbuf.ConsumeWriteBuffer(seg.len);
         m_rcv_nxt += seg.len;
         m_rcv_wnd -= seg.len;
         bNewData = true;
@@ -863,7 +912,7 @@
 #if _DEBUGMSG >= _DBG_NORMAL
             LOG(LS_INFO) << "Recovered " << nAdjust << " bytes (" << m_rcv_nxt << " -> " << m_rcv_nxt + nAdjust << ")";
 #endif // _DEBUGMSG
-            m_rlen += nAdjust;
+            m_rbuf.ConsumeWriteBuffer(nAdjust);
             m_rcv_nxt += nAdjust;
             m_rcv_wnd -= nAdjust;
           }
@@ -910,8 +959,10 @@
   while (true) {
     uint32 seq = seg->seq;
     uint8 flags = (seg->bCtrl ? FLAG_CTL : 0);
-    const char* buffer = m_sbuf + (seg->seq - m_snd_una);
-    IPseudoTcpNotify::WriteResult wres = this->packet(seq, flags, buffer, nTransmit);
+    IPseudoTcpNotify::WriteResult wres = packet(seq,
+                                                flags,
+                                                seg->seq - m_snd_una,
+                                                nTransmit);
 
     if (wres == IPseudoTcpNotify::WR_SUCCESS)
       break;
@@ -987,7 +1038,10 @@
     uint32 nInFlight = m_snd_nxt - m_snd_una;
     uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
 
-    uint32 nAvailable = talk_base::_min(m_slen - nInFlight, m_mss);
+    size_t snd_buffered = 0;
+    m_sbuf.GetBuffered(&snd_buffered);
+    uint32 nAvailable =
+        talk_base::_min(static_cast<uint32>(snd_buffered) - nInFlight, m_mss);
 
     if (nAvailable > nUseable) {
       if (nUseable * 4 < nWindow) {
@@ -1000,13 +1054,16 @@
 
 #if _DEBUGMSG >= _DBG_VERBOSE
     if (bFirst) {
+      size_t available_space = 0;
+      m_sbuf.GetWriteRemaining(&available_space);
+
       bFirst = false;
       LOG(LS_INFO) << "[cwnd: " << m_cwnd
                    << "  nWindow: " << nWindow
                    << "  nInFlight: " << nInFlight
                    << "  nAvailable: " << nAvailable
-                   << "  nQueued: " << m_slen - nInFlight
-                   << "  nEmpty: " << sizeof(m_sbuf) - m_slen
+                   << "  nQueued: " << snd_buffered
+                   << "  nEmpty: " << available_space
                    << "  ssthresh: " << m_ssthresh << "]";
     }
 #endif // _DEBUGMSG
@@ -1059,8 +1116,6 @@
 
 void
 PseudoTcp::closedown(uint32 err) {
-  m_slen = 0;
-
   LOG(LS_INFO) << "State: TCP_CLOSED";
   m_state = TCP_CLOSED;
   if (m_notify) {
@@ -1089,24 +1144,131 @@
 
 bool
 PseudoTcp::isReceiveBufferFull() const {
-  return !getReceiveBufferSpace();
-}
-
-uint32
-PseudoTcp::getReceiveBufferSpace() const {
-  return sizeof(m_rbuf) - m_rlen + m_rpos;
-}
-
-uint32
-PseudoTcp::getReceiveBufferConsecutiveSpace() const {
-  return sizeof(m_rbuf) - m_rlen;
+  size_t available_space = 0;
+  m_rbuf.GetWriteRemaining(&available_space);
+  return !available_space;
 }
 
 void
-PseudoTcp::consolidateReceiveBufferSpace() {
-  memmove(m_rbuf, m_rbuf + m_rpos, sizeof(m_rbuf) - m_rpos);
-  m_rlen -= m_rpos;
-  m_rpos = 0;
+PseudoTcp::disableWindowScale() {
+  m_support_wnd_scale = false;
+}
+
+void
+PseudoTcp::queueConnectMessage() {
+  talk_base::ByteBuffer buf(talk_base::ByteBuffer::ORDER_NETWORK);
+
+  buf.WriteUInt8(CTL_CONNECT);
+  if (m_support_wnd_scale) {
+    buf.WriteUInt8(TCP_OPT_WND_SCALE);
+    buf.WriteUInt8(1);
+    buf.WriteUInt8(m_rwnd_scale);
+  }
+  m_snd_wnd = buf.Length();
+  queue(buf.Data(), buf.Length(), true);
+}
+
+void
+PseudoTcp::parseOptions(const char* data, uint32 len) {
+  std::set<uint8> options_specified;
+
+  // See http://www.freesoft.org/CIE/Course/Section4/8.htm for
+  // parsing the options list.
+  talk_base::ByteBuffer buf(data, len);
+  while (buf.Length()) {
+    uint8 kind = TCP_OPT_EOL;
+    buf.ReadUInt8(&kind);
+
+    if (kind == TCP_OPT_EOL) {
+      // End of option list.
+      break;
+    } else if (kind == TCP_OPT_NOOP) {
+      // No op.
+      continue;
+    }
+
+    // Length of this option.
+    ASSERT(len);
+    uint8 opt_len = 0;
+    buf.ReadUInt8(&opt_len);
+
+    // Content of this option.
+    if (opt_len <= buf.Length()) {
+      applyOption(kind, buf.Data(), opt_len);
+      buf.Consume(opt_len);
+    } else {
+      LOG(LS_ERROR) << "Invalid option length received.";
+      return;
+    }
+    options_specified.insert(kind);
+  }
+
+  if (options_specified.find(TCP_OPT_WND_SCALE) == options_specified.end()) {
+    LOG(LS_WARNING) << "Peer doesn't support window scaling";
+
+    if (m_rwnd_scale > 0) {
+      // Peer doesn't support TCP options and window scaling.
+      // Revert receive buffer size to default value.
+      resizeReceiveBuffer(DEFAULT_RCV_BUF_SIZE);
+      m_swnd_scale = 0;
+    }
+  }
+}
+
+void
+PseudoTcp::applyOption(char kind, const char* data, uint32 len) {
+  if (kind == TCP_OPT_MSS) {
+    LOG(LS_WARNING) << "Peer specified MSS option which is not supported.";
+    // TODO: Implement.
+  } else if (kind == TCP_OPT_WND_SCALE) {
+    // Window scale factor.
+    // http://www.ietf.org/rfc/rfc1323.txt
+    if (len != 1) {
+      LOG_F(WARNING) << "Invalid window scale option received.";
+      return;
+    }
+    applyWindowScaleOption(data[0]);
+  }
+}
+
+void
+PseudoTcp::applyWindowScaleOption(uint8 scale_factor) {
+  m_swnd_scale = scale_factor;
+}
+
+void
+PseudoTcp::resizeSendBuffer(uint32 new_size) {
+  m_sbuf_len = new_size;
+  m_sbuf.SetCapacity(new_size);
+}
+
+void
+PseudoTcp::resizeReceiveBuffer(uint32 new_size) {
+  uint8 scale_factor = 0;
+
+  // Determine the scale factor such that the scaled window size can fit
+  // in a 16-bit unsigned integer.
+  while (new_size > 0xFFFF) {
+    ++scale_factor;
+    new_size >>= 1;
+  }
+
+  // Determine the proper size of the buffer.
+  new_size <<= scale_factor;
+  bool result = m_rbuf.SetCapacity(new_size);
+
+  // Make sure the new buffer is large enough to contain data in the old
+  // buffer. This should always be true because this method is called either
+  // before connection is established or when peers are exchanging connect
+  // messages.
+  ASSERT(result);
+  m_rbuf_len = new_size;
+  m_rwnd_scale = scale_factor;
+  m_ssthresh = new_size;
+
+  size_t available_space = 0;
+  m_rbuf.GetWriteRemaining(&available_space);
+  m_rcv_wnd = available_space;
 }
 
 }  // namespace cricket
diff --git a/talk/p2p/base/pseudotcp.h b/talk/p2p/base/pseudotcp.h
index fd11c86..f07a555 100644
--- a/talk/p2p/base/pseudotcp.h
+++ b/talk/p2p/base/pseudotcp.h
@@ -31,6 +31,7 @@
 #include <list>
 
 #include "talk/base/basictypes.h"
+#include "talk/base/stream.h"
 
 namespace cricket {
 
@@ -95,24 +96,20 @@
   // Call these to get/set option values to tailor this PseudoTcp
   // instance's behaviour for the kind of data it will carry.
   // If an unrecognized option is set or got, an assertion will fire.
+  //
+  // Setting options for OPT_RCVBUF or OPT_SNDBUF after Connect() is called
+  // will result in an assertion.
   enum Option {
     OPT_NODELAY,      // Whether to enable Nagle's algorithm (0 == off)
     OPT_ACKDELAY,     // The Delayed ACK timeout (0 == off).
-    //kOptRcvBuf,     // Set the receive buffer size, in bytes.
-    //kOptSndBuf,     // Set the send buffer size, in bytes.
+    OPT_RCVBUF,       // Set the receive buffer size, in bytes.
+    OPT_SNDBUF,       // Set the send buffer size, in bytes.
   };
   void GetOption(Option opt, int* value);
   void SetOption(Option opt, int value);
 
  protected:
   enum SendFlags { sfNone, sfDelayedAck, sfImmediateAck };
-  enum {
-    // Note: can't go as high as 1024 * 64, because of uint16 precision
-    kRcvBufSize = 1024 * 60,
-    // Note: send buffer should be larger to make sure we can always fill the
-    // receiver window
-    kSndBufSize = 1024 * 90
-  };
 
   struct Segment {
     uint32 conv, seq, ack;
@@ -140,8 +137,16 @@
 
   uint32 queue(const char* data, uint32 len, bool bCtrl);
 
+  // Creates a packet and submits it to the network. This method can either
+  // send payload or just an ACK packet.
+  //
+  // |seq| is the sequence number of this packet.
+  // |flags| is the flags for sending this packet.
+  // |offset| is the offset to read from |m_sbuf|.
+  // |len| is the number of bytes to read from |m_sbuf| as payload. If this
+  // value is 0 then this is an ACK packet, otherwise this packet has payload.
   IPseudoTcpNotify::WriteResult packet(uint32 seq, uint8 flags,
-                                       const char* data, uint32 len);
+                                       uint32 offset, uint32 len);
   bool parse(const uint8* buffer, uint32 size);
 
   void attemptSend(SendFlags sflags = sfNone);
@@ -159,15 +164,29 @@
   // This method is used in test only to query receive buffer state.
   bool isReceiveBufferFull() const;
 
+  // This method is only used in tests, to disable window scaling
+  // support for testing backward compatibility.
+  void disableWindowScale();
+
  private:
-  // Get the total number of bytes of free space in m_rbuf, consecutive or not.
-  uint32 getReceiveBufferSpace() const;
+  // Queue the connect message with TCP options.
+  void queueConnectMessage();
 
-  // Get the number of bytes that can be written to m_rbuf.
-  uint32 getReceiveBufferConsecutiveSpace() const;
+  // Parse TCP options in the header.
+  void parseOptions(const char* data, uint32 len);
 
-  // Consolidate free space in m_rbuf so that it is a consecutive segment.
-  void consolidateReceiveBufferSpace();
+  // Apply a TCP option that has been read from the header.
+  void applyOption(char kind, const char* data, uint32 len);
+
+  // Apply window scale option.
+  void applyWindowScaleOption(uint8 scale_factor);
+
+  // Resize the send buffer with |new_size| in bytes.
+  void resizeSendBuffer(uint32 new_size);
+
+  // Resize the receive buffer with |new_size| in bytes. This call adjusts
+  // window scale factor |m_swnd_scale| accordingly.
+  void resizeReceiveBuffer(uint32 new_size);
 
   IPseudoTcpNotify* m_notify;
   enum Shutdown { SD_NONE, SD_GRACEFUL, SD_FORCEFUL } m_shutdown;
@@ -182,13 +201,16 @@
   // Incoming data
   typedef std::list<RSegment> RList;
   RList m_rlist;
-  char m_rbuf[kRcvBufSize];
-  uint32 m_rcv_nxt, m_rcv_wnd, m_rpos, m_rlen, m_lastrecv;
+  uint32 m_rbuf_len, m_rcv_nxt, m_rcv_wnd, m_lastrecv;
+  uint8 m_rwnd_scale;  // Window scale factor.
+  talk_base::FifoBuffer m_rbuf;
 
   // Outgoing data
   SList m_slist;
-  char m_sbuf[kSndBufSize];
-  uint32 m_snd_nxt, m_snd_wnd, m_slen, m_lastsend, m_snd_una;
+  uint32 m_sbuf_len, m_snd_nxt, m_snd_wnd, m_lastsend, m_snd_una;
+  uint8 m_swnd_scale;  // Window scale factor.
+  talk_base::FifoBuffer m_sbuf;
+
   // Maximum segment size, estimated protocol level, largest segment sent
   uint32 m_mss, m_msslevel, m_largest, m_mtu_advise;
   // Retransmit timer
@@ -209,6 +231,10 @@
   // Configuration options
   bool m_use_nagling;
   uint32 m_ack_delay;
+
+  // This is used by unit tests to test backward compatibility of
+  // PseudoTcp implementations that don't support window scaling.
+  bool m_support_wnd_scale;
 };
 
 }  // namespace cricket