Update mac build with async sockets.

git-svn-id: http://libjingle.googlecode.com/svn/trunk@66 dd674b97-3498-5ee5-1854-bdd07cd0ff33
diff --git a/CHANGELOG b/CHANGELOG
index 88c5d23..4909ffe 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,7 @@
 Libjingle
 
 0.5.6 - Jun 2, 2011
+  - Improved mac socket server
   - Add IqTask
   - Flush output in examples/call
   - Bug fixes
diff --git a/talk/base/macasyncsocket.cc b/talk/base/macasyncsocket.cc
new file mode 100644
index 0000000..e31be57
--- /dev/null
+++ b/talk/base/macasyncsocket.cc
@@ -0,0 +1,437 @@
+// Copyright 2010 Google Inc. All Rights Reserved.
+
+//         thaloun@google.com (Tim Haloun)
+//
+// MacAsyncSocket is a kind of AsyncSocket. It does not support the SOCK_DGRAM
+// type (yet). It works asynchronously, which means that users of this socket
+// should connect to the various events declared in asyncsocket.h to receive
+// notifications about this socket.  It uses CFSockets for signals, but prefers
+// the basic bsd socket operations rather than their CFSocket wrappers when
+// possible.
+
+#include <CoreFoundation/CoreFoundation.h>
+#include <fcntl.h>
+
+#include "talk/base/macasyncsocket.h"
+
+#include "talk/base/logging.h"
+#include "talk/base/macsocketserver.h"
+
+namespace talk_base {
+
+static const int kCallbackFlags = kCFSocketReadCallBack |
+                                  kCFSocketConnectCallBack |
+                                  kCFSocketWriteCallBack;
+
+MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss)
+    : ss_(ss),
+      socket_(NULL),
+      native_socket_(INVALID_SOCKET),
+      source_(NULL),
+      current_callbacks_(0),
+      disabled_(false),
+      error_(0),
+      state_(CS_CLOSED) {
+  Initialize();
+}
+
+MacAsyncSocket::~MacAsyncSocket() {
+  Close();
+}
+
+// Returns the address to which the socket is bound.  If the socket is not
+// bound, then the any-address is returned.
+SocketAddress MacAsyncSocket::GetLocalAddress() const {
+  SocketAddress address;
+
+  // The CFSocket doesn't pick up on implicit binds from the connect call.
+  // Calling bind in before connect explicitly causes errors, so just query
+  // the underlying bsd socket.
+  sockaddr_in addr;
+  socklen_t addrlen = sizeof(addr);
+  int result = ::getsockname(native_socket_,
+                             reinterpret_cast<sockaddr*>(&addr), &addrlen);
+  if (result >= 0) {
+    ASSERT(addrlen == sizeof(addr));
+    address.FromSockAddr(addr);
+  }
+  return address;
+}
+
+// Returns the address to which the socket is connected.  If the socket is not
+// connected, then the any-address is returned.
+SocketAddress MacAsyncSocket::GetRemoteAddress() const {
+  SocketAddress address;
+
+  // Use native_socket for consistency with GetLocalAddress.
+  sockaddr_in addr;
+  socklen_t addrlen = sizeof(addr);
+  int result = ::getpeername(native_socket_,
+                             reinterpret_cast<sockaddr*>(&addr), &addrlen);
+  if (result >= 0) {
+    ASSERT(addrlen == sizeof(addr));
+    address.FromSockAddr(addr);
+  }
+  return address;
+}
+
+// Bind the socket to a local address.
+int MacAsyncSocket::Bind(const SocketAddress& address) {
+  sockaddr_in saddr;
+  address.ToSockAddr(&saddr);
+  int err = ::bind(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
+                   sizeof(saddr));
+  if (err == SOCKET_ERROR) error_ = errno;
+  return err;
+}
+
+// Connect to a remote address.
+int MacAsyncSocket::Connect(const SocketAddress& address) {
+  if (!valid()) {
+    Initialize();
+    if (!valid())
+      return SOCKET_ERROR;
+  }
+
+  SocketAddress addr2(address);
+  if (addr2.IsUnresolved()) {
+    LOG(LS_VERBOSE) << "Resolving addr in MacAsyncSocket::Connect";
+    // TODO: Convert to using AsyncResolver
+    if (!addr2.ResolveIP(false, &error_)) {
+      return SOCKET_ERROR;
+    }
+  }
+
+  sockaddr_in saddr;
+  addr2.ToSockAddr(&saddr);
+  int result = ::connect(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
+                         sizeof(saddr));
+
+  if (result != SOCKET_ERROR) {
+    state_ = CS_CONNECTED;
+  } else {
+    error_ = errno;
+    if (error_ == EINPROGRESS) {
+      state_ = CS_CONNECTING;
+      result = 0;
+    }
+  }
+  return result;
+}
+
+// Send to the remote end we're connected to.
+int MacAsyncSocket::Send(const void* pv, size_t cb) {
+  if (!valid()) {
+    return SOCKET_ERROR;
+  }
+
+  int sent = ::send(native_socket_, pv, cb, 0);
+
+  if (sent == SOCKET_ERROR) {
+    error_ = errno;
+
+    if (IsBlocking()) {
+      // Reenable the writable callback (once), since we are flow controlled.
+      LOG(LS_VERBOSE) << "Enabling flow control callback.";
+      CFSocketEnableCallBacks(socket_, kCallbackFlags);
+      current_callbacks_ = kCallbackFlags;
+    }
+  }
+  return sent;
+}
+
+// Send to the given address. We may or may not be connected to anyone.
+int MacAsyncSocket::SendTo(const void* pv, size_t cb,
+                           const SocketAddress& address) {
+  if (!valid()) {
+    return SOCKET_ERROR;
+  }
+
+  sockaddr_in saddr;
+  address.ToSockAddr(&saddr);
+  int sent = ::sendto(native_socket_, pv, cb, 0,
+                      reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr));
+
+  if (sent == SOCKET_ERROR) {
+    error_ = errno;
+  }
+
+  return sent;
+}
+
+// Read data received from the remote end we're connected to.
+int MacAsyncSocket::Recv(void* pv, size_t cb) {
+  int received = ::recv(native_socket_, reinterpret_cast<char*>(pv), cb, 0);
+  if (received == SOCKET_ERROR) error_ = errno;
+
+  // Recv should only be called when there is data to read
+  ASSERT((received != 0) || (cb == 0));
+  return received;
+}
+
+// Read data received from any remote party
+int MacAsyncSocket::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) {
+  sockaddr_in saddr;
+  socklen_t cbAddr = sizeof(saddr);
+  int received = ::recvfrom(native_socket_, reinterpret_cast<char*>(pv), cb, 0,
+                          reinterpret_cast<sockaddr*>(&saddr), &cbAddr);
+  if (received >= 0 && paddr != NULL) {
+    paddr->FromSockAddr(saddr);
+  } else if (received == SOCKET_ERROR) {
+    error_ = errno;
+  }
+  return received;
+}
+
+int MacAsyncSocket::Listen(int backlog) {
+  if (!valid()) {
+    return SOCKET_ERROR;
+  }
+
+  int res = ::listen(native_socket_, backlog);
+  if (res != SOCKET_ERROR)
+    state_ = CS_CONNECTING;
+  else
+    error_ = errno;
+
+  return res;
+}
+
+MacAsyncSocket* MacAsyncSocket::Accept(SocketAddress* paddr) {
+  sockaddr_in saddr;
+  socklen_t cbAddr = sizeof(saddr);
+
+  int socket_fd = ::accept(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
+                           &cbAddr);
+  if (socket_fd == INVALID_SOCKET) {
+    error_ = errno;
+    return NULL;
+  }
+
+  MacAsyncSocket* s = new MacAsyncSocket(ss_, socket_fd);
+  if (s && s->valid()) {
+    s->state_ = CS_CONNECTED;
+    if (paddr)
+      paddr->FromSockAddr(saddr);
+  } else {
+    delete s;
+    s = NULL;
+  }
+  return s;
+}
+
+int MacAsyncSocket::Close() {
+  if (source_ != NULL) {
+    CFRunLoopSourceInvalidate(source_);
+    CFRelease(source_);
+    if (ss_) ss_->UnregisterSocket(this);
+    source_ = NULL;
+  }
+
+  if (socket_ != NULL) {
+    CFSocketInvalidate(socket_);
+    CFRelease(socket_);
+    socket_ = NULL;
+  }
+
+  native_socket_ = INVALID_SOCKET;  // invalidates the socket
+  error_ = 0;
+  state_ = CS_CLOSED;
+  return 0;
+}
+
+int MacAsyncSocket::EstimateMTU(uint16* mtu) {
+  ASSERT(false && "NYI");
+  return -1;
+}
+
+int MacAsyncSocket::GetError() const {
+  return error_;
+}
+
+void MacAsyncSocket::SetError(int error) {
+  error_ = error;
+}
+
+Socket::ConnState MacAsyncSocket::GetState() const {
+  return state_;
+}
+
+int MacAsyncSocket::GetOption(Option opt, int* value) {
+  ASSERT(false && "NYI");
+  return -1;
+}
+
+int MacAsyncSocket::SetOption(Option opt, int value) {
+  ASSERT(false && "NYI");
+  return -1;
+}
+
+void MacAsyncSocket::EnableCallbacks() {
+  if (valid()) {
+    disabled_ = false;
+    CFSocketEnableCallBacks(socket_, current_callbacks_);
+  }
+}
+
+void MacAsyncSocket::DisableCallbacks() {
+  if (valid()) {
+    disabled_ = true;
+    CFSocketDisableCallBacks(socket_, kCallbackFlags);
+  }
+}
+
+MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss, int native_socket)
+    : ss_(ss),
+      socket_(NULL),
+      native_socket_(native_socket),
+      source_(NULL),
+      current_callbacks_(0),
+      disabled_(false),
+      error_(0),
+      state_(CS_CLOSED) {
+  Initialize();
+}
+
+// Create a new socket, wrapping the native socket if provided or creating one
+// otherwise. In case of any failure, consume the native socket.  We assume the
+// wrapped socket is in the closed state.  If this is not the case you must
+// update the state_ field for this socket yourself.
+void MacAsyncSocket::Initialize() {
+  CFSocketContext ctx = { 0 };
+  ctx.info = this;
+
+  // First create the CFSocket
+  CFSocketRef cf_socket = NULL;
+  bool res = false;
+  if (native_socket_ == INVALID_SOCKET) {
+    cf_socket = CFSocketCreate(kCFAllocatorDefault,
+                               PF_INET, SOCK_STREAM, IPPROTO_TCP,
+                               kCallbackFlags, MacAsyncSocketCallBack, &ctx);
+  } else {
+    cf_socket = CFSocketCreateWithNative(kCFAllocatorDefault,
+                                         native_socket_, kCallbackFlags,
+                                         MacAsyncSocketCallBack, &ctx);
+  }
+
+  if (cf_socket) {
+    res = true;
+    socket_ = cf_socket;
+    native_socket_ = CFSocketGetNative(cf_socket);
+    current_callbacks_ = kCallbackFlags;
+  }
+
+  if (res) {
+    // Make the underlying socket asynchronous
+    res = (-1 != ::fcntl(native_socket_, F_SETFL,
+                         ::fcntl(native_socket_, F_GETFL, 0) | O_NONBLOCK));
+  }
+
+  if (res) {
+    // Add this socket to the run loop
+    source_ = CFSocketCreateRunLoopSource(kCFAllocatorDefault, socket_, 0);
+    res = (source_ != NULL);
+  }
+
+  if (res) {
+    if (ss_) ss_->RegisterSocket(this);
+    CFRunLoopAddSource(CFRunLoopGetCurrent(), source_, kCFRunLoopCommonModes);
+  }
+
+  if (!res) {
+    int error = errno;
+    Close();  //  Clears error_.
+    error_ = error;
+  }
+}
+
+// Call CFRelease on the result when done using it
+CFDataRef MacAsyncSocket::CopyCFAddress(const SocketAddress& address) {
+  sockaddr_in saddr;
+  address.ToSockAddr(&saddr);
+
+  const UInt8* bytes = reinterpret_cast<UInt8*>(&saddr);
+
+  CFDataRef cf_address = CFDataCreate(kCFAllocatorDefault,
+                                      bytes, sizeof(saddr));
+
+  ASSERT(cf_address != NULL);
+  return cf_address;
+}
+
+void MacAsyncSocket::MacAsyncSocketCallBack(CFSocketRef s,
+                                            CFSocketCallBackType callbackType,
+                                            CFDataRef address,
+                                            const void* data,
+                                            void* info) {
+  MacAsyncSocket* this_socket =
+      reinterpret_cast<MacAsyncSocket*>(info);
+  ASSERT(this_socket != NULL && this_socket->socket_ == s);
+
+  // Don't signal any socket messages if the socketserver is not listening on
+  // them.  When we are reenabled they will be requeued and will fire again.
+  if (this_socket->disabled_)
+    return;
+
+  switch (callbackType) {
+    case kCFSocketReadCallBack:
+      // This callback is invoked in one of 3 situations:
+      // 1. A new connection is waiting to be accepted.
+      // 2. The remote end closed the connection (a recv will return 0).
+      // 3. Data is available to read.
+      // 4. The connection closed unhappily (recv will return -1).
+      if (this_socket->state_ == CS_CONNECTING) {
+        // Case 1.
+        this_socket->SignalReadEvent(this_socket);
+      } else {
+        char ch, amt;
+        amt = ::recv(this_socket->native_socket_, &ch, 1, MSG_PEEK);
+        if (amt == 0) {
+          // Case 2.
+          this_socket->state_ = CS_CLOSED;
+
+          // Disable additional callbacks or we will signal close twice.
+          CFSocketDisableCallBacks(this_socket->socket_, kCFSocketReadCallBack);
+          this_socket->current_callbacks_ &= ~kCFSocketReadCallBack;
+          this_socket->SignalCloseEvent(this_socket, 0);
+        } else if (amt > 0) {
+          // Case 3.
+          this_socket->SignalReadEvent(this_socket);
+        } else {
+          // Case 4.
+          int error = errno;
+          if (error == EAGAIN) {
+            // Observed in practice.  Let's hope it's a spurious or out of date
+            // signal, since we just eat it.
+          } else {
+            this_socket->error_ = error;
+            this_socket->SignalCloseEvent(this_socket, error);
+          }
+        }
+      }
+      break;
+
+    case kCFSocketConnectCallBack:
+      if (data != NULL) {
+        // An error occured in the background while connecting
+        this_socket->error_ = errno;
+        this_socket->state_ = CS_CLOSED;
+        this_socket->SignalCloseEvent(this_socket, this_socket->error_);
+      } else {
+        this_socket->state_ = CS_CONNECTED;
+        this_socket->SignalConnectEvent(this_socket);
+      }
+      break;
+
+    case kCFSocketWriteCallBack:
+      // Update our callback tracking.  Write doesn't reenable, so it's off now.
+      this_socket->current_callbacks_ &= ~kCFSocketWriteCallBack;
+      this_socket->SignalWriteEvent(this_socket);
+      break;
+
+    default:
+      ASSERT(false && "Invalid callback type for socket");
+  }
+}
+
+}  // namespace talk_base
diff --git a/talk/base/macasyncsocket.h b/talk/base/macasyncsocket.h
new file mode 100644
index 0000000..10a1c10
--- /dev/null
+++ b/talk/base/macasyncsocket.h
@@ -0,0 +1,84 @@
+// Copyright 2008 Google Inc. All Rights Reserved.
+
+//
+// MacAsyncSocket is a kind of AsyncSocket. It only creates sockets
+// of the TCP type, and does not (yet) support listen and accept. It works
+// asynchronously, which means that users of this socket should connect to
+// the various events declared in asyncsocket.h to receive notifications about
+// this socket.
+
+#ifndef TALK_BASE_MACASYNCSOCKET_H__
+#define TALK_BASE_MACASYNCSOCKET_H__
+
+#include <CoreFoundation/CoreFoundation.h>
+
+#include "talk/base/asyncsocket.h"
+
+namespace talk_base {
+
+class MacBaseSocketServer;
+
+class MacAsyncSocket : public AsyncSocket {
+ public:
+  MacAsyncSocket(MacBaseSocketServer* ss);
+  virtual ~MacAsyncSocket();
+
+  bool valid() const { return source_ != NULL; }
+
+  // Socket interface
+  virtual SocketAddress GetLocalAddress() const;
+  virtual SocketAddress GetRemoteAddress() const;
+  virtual int Bind(const SocketAddress& addr);
+  virtual int Connect(const SocketAddress& addr);
+  virtual int Send(const void* pv, size_t cb);
+  virtual int SendTo(const void* pv, size_t cb, const SocketAddress& addr);
+  virtual int Recv(void* pv, size_t cb);
+  virtual int RecvFrom(void* pv, size_t cb, SocketAddress* paddr);
+  virtual int Listen(int backlog);
+  virtual MacAsyncSocket* Accept(SocketAddress* paddr);
+  virtual int Close();
+  virtual int GetError() const;
+  virtual void SetError(int error);
+  virtual ConnState GetState() const;
+  virtual int EstimateMTU(uint16* mtu);
+  virtual int GetOption(Option opt, int* value);
+  virtual int SetOption(Option opt, int value);
+
+  // For the MacBaseSocketServer to disable callbacks when process_io is false.
+  void EnableCallbacks();
+  void DisableCallbacks();
+
+ private:
+  // Creates an async socket from an existing bsd socket
+  explicit MacAsyncSocket(MacBaseSocketServer* ss, int native_socket);
+
+   // Attaches the socket to the CFRunloop and sets the wrapped bsd socket
+  // to async mode
+  void Initialize();
+
+  // Translate the SocketAddress into a CFDataRef to pass to CF socket
+  // functions. Caller must call CFRelease on the result when done.
+  static CFDataRef CopyCFAddress(const SocketAddress& address);
+
+  // Callback for the underlying CFSocketRef.
+  static void MacAsyncSocketCallBack(CFSocketRef s,
+                                     CFSocketCallBackType callbackType,
+                                     CFDataRef address,
+                                     const void* data,
+                                     void* info);
+
+  MacBaseSocketServer* ss_;
+  CFSocketRef socket_;
+  int native_socket_;
+  CFRunLoopSourceRef source_;
+  int current_callbacks_;
+  bool disabled_;
+  int error_;
+  ConnState state_;
+
+  DISALLOW_EVIL_CONSTRUCTORS(MacAsyncSocket);
+};
+
+}  // namespace talk_base
+
+#endif  // TALK_BASE_MACASYNCSOCKET_H__
diff --git a/talk/base/macsocketserver.cc b/talk/base/macsocketserver.cc
new file mode 100644
index 0000000..154b83d
--- /dev/null
+++ b/talk/base/macsocketserver.cc
@@ -0,0 +1,359 @@
+
+
+#include "talk/base/macsocketserver.h"
+
+#include "talk/base/common.h"
+#include "talk/base/logging.h"
+#include "talk/base/macasyncsocket.h"
+#include "talk/base/macutils.h"
+#include "talk/base/thread.h"
+
+namespace talk_base {
+
+///////////////////////////////////////////////////////////////////////////////
+// MacBaseSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+MacBaseSocketServer::MacBaseSocketServer() {
+}
+
+MacBaseSocketServer::~MacBaseSocketServer() {
+}
+
+AsyncSocket* MacBaseSocketServer::CreateAsyncSocket(int type) {
+  if (SOCK_STREAM != type)
+    return NULL;
+
+  MacAsyncSocket* socket = new MacAsyncSocket(this);
+  if (!socket->valid()) {
+    delete socket;
+    return NULL;
+  }
+  return socket;
+}
+
+void MacBaseSocketServer::RegisterSocket(MacAsyncSocket* s) {
+  sockets_.insert(s);
+}
+
+void MacBaseSocketServer::UnregisterSocket(MacAsyncSocket* s) {
+  size_t found = sockets_.erase(s);
+  ASSERT(found == 1);
+}
+
+void MacBaseSocketServer::EnableSocketCallbacks(bool enable) {
+  for (std::set<MacAsyncSocket*>::iterator it = sockets().begin();
+       it != sockets().end(); ++it) {
+    if (enable) {
+      (*it)->EnableCallbacks();
+    } else {
+      (*it)->DisableCallbacks();
+    }
+  }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCFSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+void WakeUpCallback(void* info) {
+  MacCFSocketServer* server = static_cast<MacCFSocketServer*>(info);
+  ASSERT(NULL != server);
+  server->OnWakeUpCallback();
+}
+
+MacCFSocketServer::MacCFSocketServer()
+    : run_loop_(CFRunLoopGetCurrent()),
+      wake_up_(NULL) {
+  CFRunLoopSourceContext ctx;
+  memset(&ctx, 0, sizeof(ctx));
+  ctx.info = this;
+  ctx.perform = &WakeUpCallback;
+  wake_up_ = CFRunLoopSourceCreate(NULL, 0, &ctx);
+  ASSERT(NULL != wake_up_);
+  if (wake_up_) {
+    CFRunLoopAddSource(run_loop_, wake_up_, kCFRunLoopCommonModes);
+  }
+}
+
+MacCFSocketServer::~MacCFSocketServer() {
+  if (wake_up_) {
+    CFRunLoopSourceInvalidate(wake_up_);
+    CFRelease(wake_up_);
+  }
+}
+
+bool MacCFSocketServer::Wait(int cms, bool process_io) {
+  ASSERT(CFRunLoopGetCurrent() == run_loop_);
+
+  if (!process_io && cms == 0) {
+    // No op.
+    return true;
+  }
+
+  if (!process_io) {
+    // No way to listen to common modes and not get socket events, unless
+    // we disable each one's callbacks.
+    EnableSocketCallbacks(false);
+  }
+
+  SInt32 result;
+  if (kForever == cms) {
+    do {
+      // Would prefer to run in a custom mode that only listens to wake_up,
+      // but we have qtkit sending work to the main thread which is effectively
+      // blocked here, causing deadlock.  Thus listen to the common modes.
+      // TODO: If QTKit becomes thread safe, do the above.
+      result = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 10000000, false);
+    } while (result != kCFRunLoopRunFinished && result != kCFRunLoopRunStopped);
+  } else {
+    // TODO: In the case of 0ms wait, this will only process one event, so we
+    // may want to loop until it returns TimedOut.
+    CFTimeInterval seconds = cms / 1000.0;
+    result = CFRunLoopRunInMode(kCFRunLoopDefaultMode, seconds, false);
+  }
+
+  if (!process_io) {
+    // Reenable them.  Hopefully this won't cause spurious callbacks or
+    // missing ones while they were disabled.
+    EnableSocketCallbacks(true);
+  }
+
+  if (kCFRunLoopRunFinished == result) {
+    return false;
+  }
+  return true;
+}
+
+void MacCFSocketServer::WakeUp() {
+  if (wake_up_) {
+    CFRunLoopSourceSignal(wake_up_);
+    CFRunLoopWakeUp(run_loop_);
+  }
+}
+
+void MacCFSocketServer::OnWakeUpCallback() {
+  ASSERT(run_loop_ == CFRunLoopGetCurrent());
+  CFRunLoopStop(run_loop_);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+const UInt32 kEventClassSocketServer = 'MCSS';
+const UInt32 kEventWakeUp = 'WAKE';
+const EventTypeSpec kEventWakeUpSpec[] = {
+  { kEventClassSocketServer, kEventWakeUp }
+};
+
+MacCarbonSocketServer::MacCarbonSocketServer()
+    : event_queue_(GetCurrentEventQueue()), wake_up_(NULL) {
+  VERIFY(noErr == CreateEvent(NULL, kEventClassSocketServer, kEventWakeUp, 0,
+                              kEventAttributeUserEvent, &wake_up_));
+}
+
+MacCarbonSocketServer::~MacCarbonSocketServer() {
+  if (wake_up_) {
+    ReleaseEvent(wake_up_);
+  }
+}
+
+bool MacCarbonSocketServer::Wait(int cms, bool process_io) {
+  ASSERT(GetCurrentEventQueue() == event_queue_);
+
+  // Listen to all events if we're processing I/O.
+  // Only listen for our wakeup event if we're not.
+  UInt32 num_types = 0;
+  const EventTypeSpec* events = NULL;
+  if (!process_io) {
+    num_types = GetEventTypeCount(kEventWakeUpSpec);
+    events = kEventWakeUpSpec;
+  }
+
+  EventTargetRef target = GetEventDispatcherTarget();
+  EventTimeout timeout =
+      (kForever == cms) ? kEventDurationForever : cms / 1000.0;
+  EventTimeout end_time = GetCurrentEventTime() + timeout;
+
+  bool done = false;
+  while (!done) {
+    EventRef event;
+    OSStatus result = ReceiveNextEvent(num_types, events, timeout, true,
+                                       &event);
+    if (noErr == result) {
+      if (wake_up_ != event) {
+        LOG_F(LS_VERBOSE) << "Dispatching event: " << DecodeEvent(event);
+        result = SendEventToEventTarget(event, target);
+        if ((noErr != result) && (eventNotHandledErr != result)) {
+          LOG_E(LS_ERROR, OS, result) << "SendEventToEventTarget";
+        }
+      } else {
+        done = true;
+      }
+      ReleaseEvent(event);
+    } else if (eventLoopTimedOutErr == result) {
+      ASSERT(cms != kForever);
+      done = true;
+    } else if (eventLoopQuitErr == result) {
+      // Ignore this... we get spurious quits for a variety of reasons.
+      LOG_E(LS_VERBOSE, OS, result) << "ReceiveNextEvent";
+    } else {
+      // Some strange error occurred. Log it.
+      LOG_E(LS_WARNING, OS, result) << "ReceiveNextEvent";
+      return false;
+    }
+    if (kForever != cms) {
+      timeout = end_time - GetCurrentEventTime();
+    }
+  }
+  return true;
+}
+
+void MacCarbonSocketServer::WakeUp() {
+  if (!IsEventInQueue(event_queue_, wake_up_)) {
+    RetainEvent(wake_up_);
+    OSStatus result = PostEventToQueue(event_queue_, wake_up_,
+                                       kEventPriorityStandard);
+    if (noErr != result) {
+      LOG_E(LS_ERROR, OS, result) << "PostEventToQueue";
+    }
+  }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonAppSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+MacCarbonAppSocketServer::MacCarbonAppSocketServer()
+    : event_queue_(GetCurrentEventQueue()) {
+  // Install event handler
+  VERIFY(noErr == InstallApplicationEventHandler(
+      NewEventHandlerUPP(WakeUpEventHandler), 1, kEventWakeUpSpec, this,
+      &event_handler_));
+
+  // Install a timer and set it idle to begin with.
+  VERIFY(noErr == InstallEventLoopTimer(GetMainEventLoop(),
+                                        kEventDurationForever,
+                                        kEventDurationForever,
+                                        NewEventLoopTimerUPP(TimerHandler),
+                                        this,
+                                        &timer_));
+}
+
+MacCarbonAppSocketServer::~MacCarbonAppSocketServer() {
+  RemoveEventLoopTimer(timer_);
+  RemoveEventHandler(event_handler_);
+}
+
+OSStatus MacCarbonAppSocketServer::WakeUpEventHandler(
+    EventHandlerCallRef next, EventRef event, void *data) {
+  QuitApplicationEventLoop();
+  return noErr;
+}
+
+void MacCarbonAppSocketServer::TimerHandler(
+    EventLoopTimerRef timer, void *data) {
+  QuitApplicationEventLoop();
+}
+
+bool MacCarbonAppSocketServer::Wait(int cms, bool process_io) {
+  if (!process_io && cms == 0) {
+    // No op.
+    return true;
+  }
+  if (kForever != cms) {
+    // Start a timer.
+    OSStatus error =
+        SetEventLoopTimerNextFireTime(timer_, cms / 1000.0);
+    if (error != noErr) {
+      LOG(LS_ERROR) << "Failed setting next fire time.";
+    }
+  }
+  if (!process_io) {
+    // No way to listen to common modes and not get socket events, unless
+    // we disable each one's callbacks.
+    EnableSocketCallbacks(false);
+  }
+  RunApplicationEventLoop();
+  if (!process_io) {
+    // Reenable them.  Hopefully this won't cause spurious callbacks or
+    // missing ones while they were disabled.
+    EnableSocketCallbacks(true);
+  }
+  return true;
+}
+
+void MacCarbonAppSocketServer::WakeUp() {
+  // TODO: No-op if there's already a WakeUp in flight.
+  EventRef wake_up;
+  VERIFY(noErr == CreateEvent(NULL, kEventClassSocketServer, kEventWakeUp, 0,
+                              kEventAttributeUserEvent, &wake_up));
+  OSStatus result = PostEventToQueue(event_queue_, wake_up,
+                                       kEventPriorityStandard);
+  if (noErr != result) {
+    LOG_E(LS_ERROR, OS, result) << "PostEventToQueue";
+  }
+  ReleaseEvent(wake_up);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacNotificationsSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+static const CFStringRef kNotificationName =
+  CFSTR("MacNotificationsSocketServer");
+
+MacNotificationsSocketServer::MacNotificationsSocketServer()
+    : sent_notification_(false) {
+  CFNotificationCenterRef nc = CFNotificationCenterGetLocalCenter();
+
+  // Passing NULL for the observed object
+  CFNotificationCenterAddObserver(
+      nc, this, NotificationCallBack, kNotificationName, NULL,
+      CFNotificationSuspensionBehaviorDeliverImmediately);
+}
+
+MacNotificationsSocketServer::~MacNotificationsSocketServer() {
+  CFNotificationCenterRef nc = CFNotificationCenterGetLocalCenter();
+  CFNotificationCenterRemoveObserver(nc, this, kNotificationName, NULL);
+}
+
+bool MacNotificationsSocketServer::Wait(int cms, bool process_io) {
+  return cms == 0;
+}
+
+void MacNotificationsSocketServer::WakeUp() {
+  // We could be invoked recursively, so this stops the infinite loop
+  if (!sent_notification_) {
+    sent_notification_ = true;
+    CFNotificationCenterRef nc = CFNotificationCenterGetLocalCenter();
+    CFNotificationCenterPostNotification(nc, kNotificationName, this, NULL,
+                                         true);
+    sent_notification_ = false;
+  }
+}
+
+void MacNotificationsSocketServer::NotificationCallBack(
+    CFNotificationCenterRef center, void* observer, CFStringRef name,
+    const void* object, CFDictionaryRef userInfo) {
+
+  ASSERT(CFStringCompare(name, kNotificationName, 0) == kCFCompareEqualTo);
+  ASSERT(userInfo == NULL);
+
+  // We have thread messages to process.
+  Thread* thread = Thread::Current();
+  if (thread == NULL) {
+    // We're shutting down
+    return;
+  }
+
+  Message msg;
+  while (thread->Get(&msg, 0)) {
+    thread->Dispatch(&msg);
+  }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace talk_base
diff --git a/talk/base/macsocketserver.h b/talk/base/macsocketserver.h
new file mode 100644
index 0000000..6a4a729
--- /dev/null
+++ b/talk/base/macsocketserver.h
@@ -0,0 +1,159 @@
+// Copyright 2007, Google Inc.
+
+
+#ifndef TALK_BASE_MACSOCKETSERVER_H__
+#define TALK_BASE_MACSOCKETSERVER_H__
+
+#include <set>
+#ifdef OSX // Invalid on IOS
+#include <Carbon/Carbon.h>
+#endif
+#include "talk/base/physicalsocketserver.h"
+
+namespace talk_base {
+
+///////////////////////////////////////////////////////////////////////////////
+// MacBaseSocketServer
+///////////////////////////////////////////////////////////////////////////////
+class MacAsyncSocket;
+
+class MacBaseSocketServer : public PhysicalSocketServer {
+ public:
+  MacBaseSocketServer();
+  virtual ~MacBaseSocketServer();
+
+  // SocketServer Interface
+  virtual Socket* CreateSocket(int type) { return NULL; }
+  virtual AsyncSocket* CreateAsyncSocket(int type);
+  virtual bool Wait(int cms, bool process_io) = 0;
+  virtual void WakeUp() = 0;
+
+  void RegisterSocket(MacAsyncSocket* socket);
+  void UnregisterSocket(MacAsyncSocket* socket);
+
+ protected:
+  void EnableSocketCallbacks(bool enable);
+  const std::set<MacAsyncSocket*>& sockets() {
+    return sockets_;
+  }
+
+ private:
+  std::set<MacAsyncSocket*> sockets_;
+};
+
+// Core Foundation implementation of the socket server. While idle it
+// will run the current CF run loop. When the socket server has work
+// to do the run loop will be paused. Does not support Carbon or Cocoa
+// UI interaction.
+class MacCFSocketServer : public MacBaseSocketServer {
+ public:
+  MacCFSocketServer();
+  virtual ~MacCFSocketServer();
+
+  // SocketServer Interface
+  virtual bool Wait(int cms, bool process_io);
+  virtual void WakeUp();
+  void OnWakeUpCallback();
+
+ private:
+  CFRunLoopRef run_loop_;
+  CFRunLoopSourceRef wake_up_;
+};
+
+#ifdef OSX
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+// Interacts with the Carbon event queue. While idle it will block,
+// waiting for events. When the socket server has work to do, it will
+// post a 'wake up' event to the queue, causing the thread to exit the
+// event loop until the next call to Wait. Other events are dispatched
+// to their target. Supports Carbon and Cocoa UI interaction.
+class MacCarbonSocketServer : public MacBaseSocketServer {
+ public:
+  MacCarbonSocketServer();
+  virtual ~MacCarbonSocketServer();
+
+  // SocketServer Interface
+  virtual bool Wait(int cms, bool process_io);
+  virtual void WakeUp();
+
+ private:
+  EventQueueRef event_queue_;
+  EventRef wake_up_;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonAppSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+// Runs the Carbon application event loop on the current thread while
+// idle. When the socket server has work to do, it will post an event
+// to the queue, causing the thread to exit the event loop until the
+// next call to Wait. Other events are automatically dispatched to
+// their target.
+class MacCarbonAppSocketServer : public MacBaseSocketServer {
+ public:
+  MacCarbonAppSocketServer();
+  virtual ~MacCarbonAppSocketServer();
+
+  // SocketServer Interface
+  virtual bool Wait(int cms, bool process_io);
+  virtual void WakeUp();
+
+ private:
+  static OSStatus WakeUpEventHandler(EventHandlerCallRef next, EventRef event,
+                                     void *data);
+  static void TimerHandler(EventLoopTimerRef timer, void *data);
+
+  EventQueueRef event_queue_;
+  EventHandlerRef event_handler_;
+  EventLoopTimerRef timer_;
+};
+
+#endif
+
+///////////////////////////////////////////////////////////////////////////////
+// MacNotificationsSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+// The name "SocketServer" is misleading for this class. This class inherits
+// from SocketServer, some variants of which create/use physical sockets
+// (specifically, PhysicalSocketServer). But generally, this class is a way for
+// a thread to schedule tasks (see task.h, thread.h and taskrunner.h).
+//
+// Since we don't want to write a custom Cocoa event loop, we will use this
+// in a non-standard way. The "Wait" method will never actually wait - it will
+// return false if cms > 0. Whenever a task needs to be woken up, the WakeUp
+// method here will get called, and will cause the thread to cycle through all
+// messages currently available.
+
+class MacNotificationsSocketServer : public SocketServer {
+ public:
+  MacNotificationsSocketServer();
+  virtual ~MacNotificationsSocketServer();
+
+  // SocketServer Interface
+  virtual Socket* CreateSocket(int type) { return NULL; }
+  virtual AsyncSocket* CreateAsyncSocket(int type) { return NULL; }
+  // process_io argument is ignored.
+  virtual bool Wait(int cms, bool process_io);
+  virtual void WakeUp();
+
+ private:
+  static void NotificationCallBack(CFNotificationCenterRef center,
+                                   void* observer,
+                                   CFStringRef name,
+                                   const void* object,
+                                   CFDictionaryRef userInfo);
+
+  bool sent_notification_;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace talk_base
+
+#endif  // TALK_BASE_MACSOCKETSERVER_H__
diff --git a/talk/libjingle.scons b/talk/libjingle.scons
index 989ac70..1b7f10b 100644
--- a/talk/libjingle.scons
+++ b/talk/libjingle.scons
@@ -71,7 +71,9 @@
                "sound/soundsystemproxy.cc",
              ],
              mac_srcs = [
+               "base/macasyncsocket.cc",
                "base/macconversion.cc",
+               "base/macsocketserver.cc",
                "base/macutils.cc",
                "session/phone/carbonvideorenderer.cc",
                "session/phone/devicemanager_mac.mm",