Update mac build with async sockets.
git-svn-id: http://libjingle.googlecode.com/svn/trunk@66 dd674b97-3498-5ee5-1854-bdd07cd0ff33
diff --git a/CHANGELOG b/CHANGELOG
index 88c5d23..4909ffe 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,7 @@
Libjingle
0.5.6 - Jun 2, 2011
+ - Improved mac socket server
- Add IqTask
- Flush output in examples/call
- Bug fixes
diff --git a/talk/base/macasyncsocket.cc b/talk/base/macasyncsocket.cc
new file mode 100644
index 0000000..e31be57
--- /dev/null
+++ b/talk/base/macasyncsocket.cc
@@ -0,0 +1,437 @@
+// Copyright 2010 Google Inc. All Rights Reserved.
+
+// thaloun@google.com (Tim Haloun)
+//
+// MacAsyncSocket is a kind of AsyncSocket. It does not support the SOCK_DGRAM
+// type (yet). It works asynchronously, which means that users of this socket
+// should connect to the various events declared in asyncsocket.h to receive
+// notifications about this socket. It uses CFSockets for signals, but prefers
+// the basic bsd socket operations rather than their CFSocket wrappers when
+// possible.
+
+#include <CoreFoundation/CoreFoundation.h>
+#include <fcntl.h>
+
+#include "talk/base/macasyncsocket.h"
+
+#include "talk/base/logging.h"
+#include "talk/base/macsocketserver.h"
+
+namespace talk_base {
+
+static const int kCallbackFlags = kCFSocketReadCallBack |
+ kCFSocketConnectCallBack |
+ kCFSocketWriteCallBack;
+
+MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss)
+ : ss_(ss),
+ socket_(NULL),
+ native_socket_(INVALID_SOCKET),
+ source_(NULL),
+ current_callbacks_(0),
+ disabled_(false),
+ error_(0),
+ state_(CS_CLOSED) {
+ Initialize();
+}
+
+MacAsyncSocket::~MacAsyncSocket() {
+ Close();
+}
+
+// Returns the address to which the socket is bound. If the socket is not
+// bound, then the any-address is returned.
+SocketAddress MacAsyncSocket::GetLocalAddress() const {
+ SocketAddress address;
+
+ // The CFSocket doesn't pick up on implicit binds from the connect call.
+ // Calling bind in before connect explicitly causes errors, so just query
+ // the underlying bsd socket.
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+ int result = ::getsockname(native_socket_,
+ reinterpret_cast<sockaddr*>(&addr), &addrlen);
+ if (result >= 0) {
+ ASSERT(addrlen == sizeof(addr));
+ address.FromSockAddr(addr);
+ }
+ return address;
+}
+
+// Returns the address to which the socket is connected. If the socket is not
+// connected, then the any-address is returned.
+SocketAddress MacAsyncSocket::GetRemoteAddress() const {
+ SocketAddress address;
+
+ // Use native_socket for consistency with GetLocalAddress.
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+ int result = ::getpeername(native_socket_,
+ reinterpret_cast<sockaddr*>(&addr), &addrlen);
+ if (result >= 0) {
+ ASSERT(addrlen == sizeof(addr));
+ address.FromSockAddr(addr);
+ }
+ return address;
+}
+
+// Bind the socket to a local address.
+int MacAsyncSocket::Bind(const SocketAddress& address) {
+ sockaddr_in saddr;
+ address.ToSockAddr(&saddr);
+ int err = ::bind(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
+ sizeof(saddr));
+ if (err == SOCKET_ERROR) error_ = errno;
+ return err;
+}
+
+// Connect to a remote address.
+int MacAsyncSocket::Connect(const SocketAddress& address) {
+ if (!valid()) {
+ Initialize();
+ if (!valid())
+ return SOCKET_ERROR;
+ }
+
+ SocketAddress addr2(address);
+ if (addr2.IsUnresolved()) {
+ LOG(LS_VERBOSE) << "Resolving addr in MacAsyncSocket::Connect";
+ // TODO: Convert to using AsyncResolver
+ if (!addr2.ResolveIP(false, &error_)) {
+ return SOCKET_ERROR;
+ }
+ }
+
+ sockaddr_in saddr;
+ addr2.ToSockAddr(&saddr);
+ int result = ::connect(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
+ sizeof(saddr));
+
+ if (result != SOCKET_ERROR) {
+ state_ = CS_CONNECTED;
+ } else {
+ error_ = errno;
+ if (error_ == EINPROGRESS) {
+ state_ = CS_CONNECTING;
+ result = 0;
+ }
+ }
+ return result;
+}
+
+// Send to the remote end we're connected to.
+int MacAsyncSocket::Send(const void* pv, size_t cb) {
+ if (!valid()) {
+ return SOCKET_ERROR;
+ }
+
+ int sent = ::send(native_socket_, pv, cb, 0);
+
+ if (sent == SOCKET_ERROR) {
+ error_ = errno;
+
+ if (IsBlocking()) {
+ // Reenable the writable callback (once), since we are flow controlled.
+ LOG(LS_VERBOSE) << "Enabling flow control callback.";
+ CFSocketEnableCallBacks(socket_, kCallbackFlags);
+ current_callbacks_ = kCallbackFlags;
+ }
+ }
+ return sent;
+}
+
+// Send to the given address. We may or may not be connected to anyone.
+int MacAsyncSocket::SendTo(const void* pv, size_t cb,
+ const SocketAddress& address) {
+ if (!valid()) {
+ return SOCKET_ERROR;
+ }
+
+ sockaddr_in saddr;
+ address.ToSockAddr(&saddr);
+ int sent = ::sendto(native_socket_, pv, cb, 0,
+ reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr));
+
+ if (sent == SOCKET_ERROR) {
+ error_ = errno;
+ }
+
+ return sent;
+}
+
+// Read data received from the remote end we're connected to.
+int MacAsyncSocket::Recv(void* pv, size_t cb) {
+ int received = ::recv(native_socket_, reinterpret_cast<char*>(pv), cb, 0);
+ if (received == SOCKET_ERROR) error_ = errno;
+
+ // Recv should only be called when there is data to read
+ ASSERT((received != 0) || (cb == 0));
+ return received;
+}
+
+// Read data received from any remote party
+int MacAsyncSocket::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) {
+ sockaddr_in saddr;
+ socklen_t cbAddr = sizeof(saddr);
+ int received = ::recvfrom(native_socket_, reinterpret_cast<char*>(pv), cb, 0,
+ reinterpret_cast<sockaddr*>(&saddr), &cbAddr);
+ if (received >= 0 && paddr != NULL) {
+ paddr->FromSockAddr(saddr);
+ } else if (received == SOCKET_ERROR) {
+ error_ = errno;
+ }
+ return received;
+}
+
+int MacAsyncSocket::Listen(int backlog) {
+ if (!valid()) {
+ return SOCKET_ERROR;
+ }
+
+ int res = ::listen(native_socket_, backlog);
+ if (res != SOCKET_ERROR)
+ state_ = CS_CONNECTING;
+ else
+ error_ = errno;
+
+ return res;
+}
+
+MacAsyncSocket* MacAsyncSocket::Accept(SocketAddress* paddr) {
+ sockaddr_in saddr;
+ socklen_t cbAddr = sizeof(saddr);
+
+ int socket_fd = ::accept(native_socket_, reinterpret_cast<sockaddr*>(&saddr),
+ &cbAddr);
+ if (socket_fd == INVALID_SOCKET) {
+ error_ = errno;
+ return NULL;
+ }
+
+ MacAsyncSocket* s = new MacAsyncSocket(ss_, socket_fd);
+ if (s && s->valid()) {
+ s->state_ = CS_CONNECTED;
+ if (paddr)
+ paddr->FromSockAddr(saddr);
+ } else {
+ delete s;
+ s = NULL;
+ }
+ return s;
+}
+
+int MacAsyncSocket::Close() {
+ if (source_ != NULL) {
+ CFRunLoopSourceInvalidate(source_);
+ CFRelease(source_);
+ if (ss_) ss_->UnregisterSocket(this);
+ source_ = NULL;
+ }
+
+ if (socket_ != NULL) {
+ CFSocketInvalidate(socket_);
+ CFRelease(socket_);
+ socket_ = NULL;
+ }
+
+ native_socket_ = INVALID_SOCKET; // invalidates the socket
+ error_ = 0;
+ state_ = CS_CLOSED;
+ return 0;
+}
+
+int MacAsyncSocket::EstimateMTU(uint16* mtu) {
+ ASSERT(false && "NYI");
+ return -1;
+}
+
+int MacAsyncSocket::GetError() const {
+ return error_;
+}
+
+void MacAsyncSocket::SetError(int error) {
+ error_ = error;
+}
+
+Socket::ConnState MacAsyncSocket::GetState() const {
+ return state_;
+}
+
+int MacAsyncSocket::GetOption(Option opt, int* value) {
+ ASSERT(false && "NYI");
+ return -1;
+}
+
+int MacAsyncSocket::SetOption(Option opt, int value) {
+ ASSERT(false && "NYI");
+ return -1;
+}
+
+void MacAsyncSocket::EnableCallbacks() {
+ if (valid()) {
+ disabled_ = false;
+ CFSocketEnableCallBacks(socket_, current_callbacks_);
+ }
+}
+
+void MacAsyncSocket::DisableCallbacks() {
+ if (valid()) {
+ disabled_ = true;
+ CFSocketDisableCallBacks(socket_, kCallbackFlags);
+ }
+}
+
+MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss, int native_socket)
+ : ss_(ss),
+ socket_(NULL),
+ native_socket_(native_socket),
+ source_(NULL),
+ current_callbacks_(0),
+ disabled_(false),
+ error_(0),
+ state_(CS_CLOSED) {
+ Initialize();
+}
+
+// Create a new socket, wrapping the native socket if provided or creating one
+// otherwise. In case of any failure, consume the native socket. We assume the
+// wrapped socket is in the closed state. If this is not the case you must
+// update the state_ field for this socket yourself.
+void MacAsyncSocket::Initialize() {
+ CFSocketContext ctx = { 0 };
+ ctx.info = this;
+
+ // First create the CFSocket
+ CFSocketRef cf_socket = NULL;
+ bool res = false;
+ if (native_socket_ == INVALID_SOCKET) {
+ cf_socket = CFSocketCreate(kCFAllocatorDefault,
+ PF_INET, SOCK_STREAM, IPPROTO_TCP,
+ kCallbackFlags, MacAsyncSocketCallBack, &ctx);
+ } else {
+ cf_socket = CFSocketCreateWithNative(kCFAllocatorDefault,
+ native_socket_, kCallbackFlags,
+ MacAsyncSocketCallBack, &ctx);
+ }
+
+ if (cf_socket) {
+ res = true;
+ socket_ = cf_socket;
+ native_socket_ = CFSocketGetNative(cf_socket);
+ current_callbacks_ = kCallbackFlags;
+ }
+
+ if (res) {
+ // Make the underlying socket asynchronous
+ res = (-1 != ::fcntl(native_socket_, F_SETFL,
+ ::fcntl(native_socket_, F_GETFL, 0) | O_NONBLOCK));
+ }
+
+ if (res) {
+ // Add this socket to the run loop
+ source_ = CFSocketCreateRunLoopSource(kCFAllocatorDefault, socket_, 0);
+ res = (source_ != NULL);
+ }
+
+ if (res) {
+ if (ss_) ss_->RegisterSocket(this);
+ CFRunLoopAddSource(CFRunLoopGetCurrent(), source_, kCFRunLoopCommonModes);
+ }
+
+ if (!res) {
+ int error = errno;
+ Close(); // Clears error_.
+ error_ = error;
+ }
+}
+
+// Call CFRelease on the result when done using it
+CFDataRef MacAsyncSocket::CopyCFAddress(const SocketAddress& address) {
+ sockaddr_in saddr;
+ address.ToSockAddr(&saddr);
+
+ const UInt8* bytes = reinterpret_cast<UInt8*>(&saddr);
+
+ CFDataRef cf_address = CFDataCreate(kCFAllocatorDefault,
+ bytes, sizeof(saddr));
+
+ ASSERT(cf_address != NULL);
+ return cf_address;
+}
+
+void MacAsyncSocket::MacAsyncSocketCallBack(CFSocketRef s,
+ CFSocketCallBackType callbackType,
+ CFDataRef address,
+ const void* data,
+ void* info) {
+ MacAsyncSocket* this_socket =
+ reinterpret_cast<MacAsyncSocket*>(info);
+ ASSERT(this_socket != NULL && this_socket->socket_ == s);
+
+ // Don't signal any socket messages if the socketserver is not listening on
+ // them. When we are reenabled they will be requeued and will fire again.
+ if (this_socket->disabled_)
+ return;
+
+ switch (callbackType) {
+ case kCFSocketReadCallBack:
+ // This callback is invoked in one of 3 situations:
+ // 1. A new connection is waiting to be accepted.
+ // 2. The remote end closed the connection (a recv will return 0).
+ // 3. Data is available to read.
+ // 4. The connection closed unhappily (recv will return -1).
+ if (this_socket->state_ == CS_CONNECTING) {
+ // Case 1.
+ this_socket->SignalReadEvent(this_socket);
+ } else {
+ char ch, amt;
+ amt = ::recv(this_socket->native_socket_, &ch, 1, MSG_PEEK);
+ if (amt == 0) {
+ // Case 2.
+ this_socket->state_ = CS_CLOSED;
+
+ // Disable additional callbacks or we will signal close twice.
+ CFSocketDisableCallBacks(this_socket->socket_, kCFSocketReadCallBack);
+ this_socket->current_callbacks_ &= ~kCFSocketReadCallBack;
+ this_socket->SignalCloseEvent(this_socket, 0);
+ } else if (amt > 0) {
+ // Case 3.
+ this_socket->SignalReadEvent(this_socket);
+ } else {
+ // Case 4.
+ int error = errno;
+ if (error == EAGAIN) {
+ // Observed in practice. Let's hope it's a spurious or out of date
+ // signal, since we just eat it.
+ } else {
+ this_socket->error_ = error;
+ this_socket->SignalCloseEvent(this_socket, error);
+ }
+ }
+ }
+ break;
+
+ case kCFSocketConnectCallBack:
+ if (data != NULL) {
+ // An error occured in the background while connecting
+ this_socket->error_ = errno;
+ this_socket->state_ = CS_CLOSED;
+ this_socket->SignalCloseEvent(this_socket, this_socket->error_);
+ } else {
+ this_socket->state_ = CS_CONNECTED;
+ this_socket->SignalConnectEvent(this_socket);
+ }
+ break;
+
+ case kCFSocketWriteCallBack:
+ // Update our callback tracking. Write doesn't reenable, so it's off now.
+ this_socket->current_callbacks_ &= ~kCFSocketWriteCallBack;
+ this_socket->SignalWriteEvent(this_socket);
+ break;
+
+ default:
+ ASSERT(false && "Invalid callback type for socket");
+ }
+}
+
+} // namespace talk_base
diff --git a/talk/base/macasyncsocket.h b/talk/base/macasyncsocket.h
new file mode 100644
index 0000000..10a1c10
--- /dev/null
+++ b/talk/base/macasyncsocket.h
@@ -0,0 +1,84 @@
+// Copyright 2008 Google Inc. All Rights Reserved.
+
+//
+// MacAsyncSocket is a kind of AsyncSocket. It only creates sockets
+// of the TCP type, and does not (yet) support listen and accept. It works
+// asynchronously, which means that users of this socket should connect to
+// the various events declared in asyncsocket.h to receive notifications about
+// this socket.
+
+#ifndef TALK_BASE_MACASYNCSOCKET_H__
+#define TALK_BASE_MACASYNCSOCKET_H__
+
+#include <CoreFoundation/CoreFoundation.h>
+
+#include "talk/base/asyncsocket.h"
+
+namespace talk_base {
+
+class MacBaseSocketServer;
+
+class MacAsyncSocket : public AsyncSocket {
+ public:
+ MacAsyncSocket(MacBaseSocketServer* ss);
+ virtual ~MacAsyncSocket();
+
+ bool valid() const { return source_ != NULL; }
+
+ // Socket interface
+ virtual SocketAddress GetLocalAddress() const;
+ virtual SocketAddress GetRemoteAddress() const;
+ virtual int Bind(const SocketAddress& addr);
+ virtual int Connect(const SocketAddress& addr);
+ virtual int Send(const void* pv, size_t cb);
+ virtual int SendTo(const void* pv, size_t cb, const SocketAddress& addr);
+ virtual int Recv(void* pv, size_t cb);
+ virtual int RecvFrom(void* pv, size_t cb, SocketAddress* paddr);
+ virtual int Listen(int backlog);
+ virtual MacAsyncSocket* Accept(SocketAddress* paddr);
+ virtual int Close();
+ virtual int GetError() const;
+ virtual void SetError(int error);
+ virtual ConnState GetState() const;
+ virtual int EstimateMTU(uint16* mtu);
+ virtual int GetOption(Option opt, int* value);
+ virtual int SetOption(Option opt, int value);
+
+ // For the MacBaseSocketServer to disable callbacks when process_io is false.
+ void EnableCallbacks();
+ void DisableCallbacks();
+
+ private:
+ // Creates an async socket from an existing bsd socket
+ explicit MacAsyncSocket(MacBaseSocketServer* ss, int native_socket);
+
+ // Attaches the socket to the CFRunloop and sets the wrapped bsd socket
+ // to async mode
+ void Initialize();
+
+ // Translate the SocketAddress into a CFDataRef to pass to CF socket
+ // functions. Caller must call CFRelease on the result when done.
+ static CFDataRef CopyCFAddress(const SocketAddress& address);
+
+ // Callback for the underlying CFSocketRef.
+ static void MacAsyncSocketCallBack(CFSocketRef s,
+ CFSocketCallBackType callbackType,
+ CFDataRef address,
+ const void* data,
+ void* info);
+
+ MacBaseSocketServer* ss_;
+ CFSocketRef socket_;
+ int native_socket_;
+ CFRunLoopSourceRef source_;
+ int current_callbacks_;
+ bool disabled_;
+ int error_;
+ ConnState state_;
+
+ DISALLOW_EVIL_CONSTRUCTORS(MacAsyncSocket);
+};
+
+} // namespace talk_base
+
+#endif // TALK_BASE_MACASYNCSOCKET_H__
diff --git a/talk/base/macsocketserver.cc b/talk/base/macsocketserver.cc
new file mode 100644
index 0000000..154b83d
--- /dev/null
+++ b/talk/base/macsocketserver.cc
@@ -0,0 +1,359 @@
+
+
+#include "talk/base/macsocketserver.h"
+
+#include "talk/base/common.h"
+#include "talk/base/logging.h"
+#include "talk/base/macasyncsocket.h"
+#include "talk/base/macutils.h"
+#include "talk/base/thread.h"
+
+namespace talk_base {
+
+///////////////////////////////////////////////////////////////////////////////
+// MacBaseSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+MacBaseSocketServer::MacBaseSocketServer() {
+}
+
+MacBaseSocketServer::~MacBaseSocketServer() {
+}
+
+AsyncSocket* MacBaseSocketServer::CreateAsyncSocket(int type) {
+ if (SOCK_STREAM != type)
+ return NULL;
+
+ MacAsyncSocket* socket = new MacAsyncSocket(this);
+ if (!socket->valid()) {
+ delete socket;
+ return NULL;
+ }
+ return socket;
+}
+
+void MacBaseSocketServer::RegisterSocket(MacAsyncSocket* s) {
+ sockets_.insert(s);
+}
+
+void MacBaseSocketServer::UnregisterSocket(MacAsyncSocket* s) {
+ size_t found = sockets_.erase(s);
+ ASSERT(found == 1);
+}
+
+void MacBaseSocketServer::EnableSocketCallbacks(bool enable) {
+ for (std::set<MacAsyncSocket*>::iterator it = sockets().begin();
+ it != sockets().end(); ++it) {
+ if (enable) {
+ (*it)->EnableCallbacks();
+ } else {
+ (*it)->DisableCallbacks();
+ }
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCFSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+void WakeUpCallback(void* info) {
+ MacCFSocketServer* server = static_cast<MacCFSocketServer*>(info);
+ ASSERT(NULL != server);
+ server->OnWakeUpCallback();
+}
+
+MacCFSocketServer::MacCFSocketServer()
+ : run_loop_(CFRunLoopGetCurrent()),
+ wake_up_(NULL) {
+ CFRunLoopSourceContext ctx;
+ memset(&ctx, 0, sizeof(ctx));
+ ctx.info = this;
+ ctx.perform = &WakeUpCallback;
+ wake_up_ = CFRunLoopSourceCreate(NULL, 0, &ctx);
+ ASSERT(NULL != wake_up_);
+ if (wake_up_) {
+ CFRunLoopAddSource(run_loop_, wake_up_, kCFRunLoopCommonModes);
+ }
+}
+
+MacCFSocketServer::~MacCFSocketServer() {
+ if (wake_up_) {
+ CFRunLoopSourceInvalidate(wake_up_);
+ CFRelease(wake_up_);
+ }
+}
+
+bool MacCFSocketServer::Wait(int cms, bool process_io) {
+ ASSERT(CFRunLoopGetCurrent() == run_loop_);
+
+ if (!process_io && cms == 0) {
+ // No op.
+ return true;
+ }
+
+ if (!process_io) {
+ // No way to listen to common modes and not get socket events, unless
+ // we disable each one's callbacks.
+ EnableSocketCallbacks(false);
+ }
+
+ SInt32 result;
+ if (kForever == cms) {
+ do {
+ // Would prefer to run in a custom mode that only listens to wake_up,
+ // but we have qtkit sending work to the main thread which is effectively
+ // blocked here, causing deadlock. Thus listen to the common modes.
+ // TODO: If QTKit becomes thread safe, do the above.
+ result = CFRunLoopRunInMode(kCFRunLoopDefaultMode, 10000000, false);
+ } while (result != kCFRunLoopRunFinished && result != kCFRunLoopRunStopped);
+ } else {
+ // TODO: In the case of 0ms wait, this will only process one event, so we
+ // may want to loop until it returns TimedOut.
+ CFTimeInterval seconds = cms / 1000.0;
+ result = CFRunLoopRunInMode(kCFRunLoopDefaultMode, seconds, false);
+ }
+
+ if (!process_io) {
+ // Reenable them. Hopefully this won't cause spurious callbacks or
+ // missing ones while they were disabled.
+ EnableSocketCallbacks(true);
+ }
+
+ if (kCFRunLoopRunFinished == result) {
+ return false;
+ }
+ return true;
+}
+
+void MacCFSocketServer::WakeUp() {
+ if (wake_up_) {
+ CFRunLoopSourceSignal(wake_up_);
+ CFRunLoopWakeUp(run_loop_);
+ }
+}
+
+void MacCFSocketServer::OnWakeUpCallback() {
+ ASSERT(run_loop_ == CFRunLoopGetCurrent());
+ CFRunLoopStop(run_loop_);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+const UInt32 kEventClassSocketServer = 'MCSS';
+const UInt32 kEventWakeUp = 'WAKE';
+const EventTypeSpec kEventWakeUpSpec[] = {
+ { kEventClassSocketServer, kEventWakeUp }
+};
+
+MacCarbonSocketServer::MacCarbonSocketServer()
+ : event_queue_(GetCurrentEventQueue()), wake_up_(NULL) {
+ VERIFY(noErr == CreateEvent(NULL, kEventClassSocketServer, kEventWakeUp, 0,
+ kEventAttributeUserEvent, &wake_up_));
+}
+
+MacCarbonSocketServer::~MacCarbonSocketServer() {
+ if (wake_up_) {
+ ReleaseEvent(wake_up_);
+ }
+}
+
+bool MacCarbonSocketServer::Wait(int cms, bool process_io) {
+ ASSERT(GetCurrentEventQueue() == event_queue_);
+
+ // Listen to all events if we're processing I/O.
+ // Only listen for our wakeup event if we're not.
+ UInt32 num_types = 0;
+ const EventTypeSpec* events = NULL;
+ if (!process_io) {
+ num_types = GetEventTypeCount(kEventWakeUpSpec);
+ events = kEventWakeUpSpec;
+ }
+
+ EventTargetRef target = GetEventDispatcherTarget();
+ EventTimeout timeout =
+ (kForever == cms) ? kEventDurationForever : cms / 1000.0;
+ EventTimeout end_time = GetCurrentEventTime() + timeout;
+
+ bool done = false;
+ while (!done) {
+ EventRef event;
+ OSStatus result = ReceiveNextEvent(num_types, events, timeout, true,
+ &event);
+ if (noErr == result) {
+ if (wake_up_ != event) {
+ LOG_F(LS_VERBOSE) << "Dispatching event: " << DecodeEvent(event);
+ result = SendEventToEventTarget(event, target);
+ if ((noErr != result) && (eventNotHandledErr != result)) {
+ LOG_E(LS_ERROR, OS, result) << "SendEventToEventTarget";
+ }
+ } else {
+ done = true;
+ }
+ ReleaseEvent(event);
+ } else if (eventLoopTimedOutErr == result) {
+ ASSERT(cms != kForever);
+ done = true;
+ } else if (eventLoopQuitErr == result) {
+ // Ignore this... we get spurious quits for a variety of reasons.
+ LOG_E(LS_VERBOSE, OS, result) << "ReceiveNextEvent";
+ } else {
+ // Some strange error occurred. Log it.
+ LOG_E(LS_WARNING, OS, result) << "ReceiveNextEvent";
+ return false;
+ }
+ if (kForever != cms) {
+ timeout = end_time - GetCurrentEventTime();
+ }
+ }
+ return true;
+}
+
+void MacCarbonSocketServer::WakeUp() {
+ if (!IsEventInQueue(event_queue_, wake_up_)) {
+ RetainEvent(wake_up_);
+ OSStatus result = PostEventToQueue(event_queue_, wake_up_,
+ kEventPriorityStandard);
+ if (noErr != result) {
+ LOG_E(LS_ERROR, OS, result) << "PostEventToQueue";
+ }
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonAppSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+MacCarbonAppSocketServer::MacCarbonAppSocketServer()
+ : event_queue_(GetCurrentEventQueue()) {
+ // Install event handler
+ VERIFY(noErr == InstallApplicationEventHandler(
+ NewEventHandlerUPP(WakeUpEventHandler), 1, kEventWakeUpSpec, this,
+ &event_handler_));
+
+ // Install a timer and set it idle to begin with.
+ VERIFY(noErr == InstallEventLoopTimer(GetMainEventLoop(),
+ kEventDurationForever,
+ kEventDurationForever,
+ NewEventLoopTimerUPP(TimerHandler),
+ this,
+ &timer_));
+}
+
+MacCarbonAppSocketServer::~MacCarbonAppSocketServer() {
+ RemoveEventLoopTimer(timer_);
+ RemoveEventHandler(event_handler_);
+}
+
+OSStatus MacCarbonAppSocketServer::WakeUpEventHandler(
+ EventHandlerCallRef next, EventRef event, void *data) {
+ QuitApplicationEventLoop();
+ return noErr;
+}
+
+void MacCarbonAppSocketServer::TimerHandler(
+ EventLoopTimerRef timer, void *data) {
+ QuitApplicationEventLoop();
+}
+
+bool MacCarbonAppSocketServer::Wait(int cms, bool process_io) {
+ if (!process_io && cms == 0) {
+ // No op.
+ return true;
+ }
+ if (kForever != cms) {
+ // Start a timer.
+ OSStatus error =
+ SetEventLoopTimerNextFireTime(timer_, cms / 1000.0);
+ if (error != noErr) {
+ LOG(LS_ERROR) << "Failed setting next fire time.";
+ }
+ }
+ if (!process_io) {
+ // No way to listen to common modes and not get socket events, unless
+ // we disable each one's callbacks.
+ EnableSocketCallbacks(false);
+ }
+ RunApplicationEventLoop();
+ if (!process_io) {
+ // Reenable them. Hopefully this won't cause spurious callbacks or
+ // missing ones while they were disabled.
+ EnableSocketCallbacks(true);
+ }
+ return true;
+}
+
+void MacCarbonAppSocketServer::WakeUp() {
+ // TODO: No-op if there's already a WakeUp in flight.
+ EventRef wake_up;
+ VERIFY(noErr == CreateEvent(NULL, kEventClassSocketServer, kEventWakeUp, 0,
+ kEventAttributeUserEvent, &wake_up));
+ OSStatus result = PostEventToQueue(event_queue_, wake_up,
+ kEventPriorityStandard);
+ if (noErr != result) {
+ LOG_E(LS_ERROR, OS, result) << "PostEventToQueue";
+ }
+ ReleaseEvent(wake_up);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// MacNotificationsSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+static const CFStringRef kNotificationName =
+ CFSTR("MacNotificationsSocketServer");
+
+MacNotificationsSocketServer::MacNotificationsSocketServer()
+ : sent_notification_(false) {
+ CFNotificationCenterRef nc = CFNotificationCenterGetLocalCenter();
+
+ // Passing NULL for the observed object
+ CFNotificationCenterAddObserver(
+ nc, this, NotificationCallBack, kNotificationName, NULL,
+ CFNotificationSuspensionBehaviorDeliverImmediately);
+}
+
+MacNotificationsSocketServer::~MacNotificationsSocketServer() {
+ CFNotificationCenterRef nc = CFNotificationCenterGetLocalCenter();
+ CFNotificationCenterRemoveObserver(nc, this, kNotificationName, NULL);
+}
+
+bool MacNotificationsSocketServer::Wait(int cms, bool process_io) {
+ return cms == 0;
+}
+
+void MacNotificationsSocketServer::WakeUp() {
+ // We could be invoked recursively, so this stops the infinite loop
+ if (!sent_notification_) {
+ sent_notification_ = true;
+ CFNotificationCenterRef nc = CFNotificationCenterGetLocalCenter();
+ CFNotificationCenterPostNotification(nc, kNotificationName, this, NULL,
+ true);
+ sent_notification_ = false;
+ }
+}
+
+void MacNotificationsSocketServer::NotificationCallBack(
+ CFNotificationCenterRef center, void* observer, CFStringRef name,
+ const void* object, CFDictionaryRef userInfo) {
+
+ ASSERT(CFStringCompare(name, kNotificationName, 0) == kCFCompareEqualTo);
+ ASSERT(userInfo == NULL);
+
+ // We have thread messages to process.
+ Thread* thread = Thread::Current();
+ if (thread == NULL) {
+ // We're shutting down
+ return;
+ }
+
+ Message msg;
+ while (thread->Get(&msg, 0)) {
+ thread->Dispatch(&msg);
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace talk_base
diff --git a/talk/base/macsocketserver.h b/talk/base/macsocketserver.h
new file mode 100644
index 0000000..6a4a729
--- /dev/null
+++ b/talk/base/macsocketserver.h
@@ -0,0 +1,159 @@
+// Copyright 2007, Google Inc.
+
+
+#ifndef TALK_BASE_MACSOCKETSERVER_H__
+#define TALK_BASE_MACSOCKETSERVER_H__
+
+#include <set>
+#ifdef OSX // Invalid on IOS
+#include <Carbon/Carbon.h>
+#endif
+#include "talk/base/physicalsocketserver.h"
+
+namespace talk_base {
+
+///////////////////////////////////////////////////////////////////////////////
+// MacBaseSocketServer
+///////////////////////////////////////////////////////////////////////////////
+class MacAsyncSocket;
+
+class MacBaseSocketServer : public PhysicalSocketServer {
+ public:
+ MacBaseSocketServer();
+ virtual ~MacBaseSocketServer();
+
+ // SocketServer Interface
+ virtual Socket* CreateSocket(int type) { return NULL; }
+ virtual AsyncSocket* CreateAsyncSocket(int type);
+ virtual bool Wait(int cms, bool process_io) = 0;
+ virtual void WakeUp() = 0;
+
+ void RegisterSocket(MacAsyncSocket* socket);
+ void UnregisterSocket(MacAsyncSocket* socket);
+
+ protected:
+ void EnableSocketCallbacks(bool enable);
+ const std::set<MacAsyncSocket*>& sockets() {
+ return sockets_;
+ }
+
+ private:
+ std::set<MacAsyncSocket*> sockets_;
+};
+
+// Core Foundation implementation of the socket server. While idle it
+// will run the current CF run loop. When the socket server has work
+// to do the run loop will be paused. Does not support Carbon or Cocoa
+// UI interaction.
+class MacCFSocketServer : public MacBaseSocketServer {
+ public:
+ MacCFSocketServer();
+ virtual ~MacCFSocketServer();
+
+ // SocketServer Interface
+ virtual bool Wait(int cms, bool process_io);
+ virtual void WakeUp();
+ void OnWakeUpCallback();
+
+ private:
+ CFRunLoopRef run_loop_;
+ CFRunLoopSourceRef wake_up_;
+};
+
+#ifdef OSX
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+// Interacts with the Carbon event queue. While idle it will block,
+// waiting for events. When the socket server has work to do, it will
+// post a 'wake up' event to the queue, causing the thread to exit the
+// event loop until the next call to Wait. Other events are dispatched
+// to their target. Supports Carbon and Cocoa UI interaction.
+class MacCarbonSocketServer : public MacBaseSocketServer {
+ public:
+ MacCarbonSocketServer();
+ virtual ~MacCarbonSocketServer();
+
+ // SocketServer Interface
+ virtual bool Wait(int cms, bool process_io);
+ virtual void WakeUp();
+
+ private:
+ EventQueueRef event_queue_;
+ EventRef wake_up_;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+// MacCarbonAppSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+// Runs the Carbon application event loop on the current thread while
+// idle. When the socket server has work to do, it will post an event
+// to the queue, causing the thread to exit the event loop until the
+// next call to Wait. Other events are automatically dispatched to
+// their target.
+class MacCarbonAppSocketServer : public MacBaseSocketServer {
+ public:
+ MacCarbonAppSocketServer();
+ virtual ~MacCarbonAppSocketServer();
+
+ // SocketServer Interface
+ virtual bool Wait(int cms, bool process_io);
+ virtual void WakeUp();
+
+ private:
+ static OSStatus WakeUpEventHandler(EventHandlerCallRef next, EventRef event,
+ void *data);
+ static void TimerHandler(EventLoopTimerRef timer, void *data);
+
+ EventQueueRef event_queue_;
+ EventHandlerRef event_handler_;
+ EventLoopTimerRef timer_;
+};
+
+#endif
+
+///////////////////////////////////////////////////////////////////////////////
+// MacNotificationsSocketServer
+///////////////////////////////////////////////////////////////////////////////
+
+// The name "SocketServer" is misleading for this class. This class inherits
+// from SocketServer, some variants of which create/use physical sockets
+// (specifically, PhysicalSocketServer). But generally, this class is a way for
+// a thread to schedule tasks (see task.h, thread.h and taskrunner.h).
+//
+// Since we don't want to write a custom Cocoa event loop, we will use this
+// in a non-standard way. The "Wait" method will never actually wait - it will
+// return false if cms > 0. Whenever a task needs to be woken up, the WakeUp
+// method here will get called, and will cause the thread to cycle through all
+// messages currently available.
+
+class MacNotificationsSocketServer : public SocketServer {
+ public:
+ MacNotificationsSocketServer();
+ virtual ~MacNotificationsSocketServer();
+
+ // SocketServer Interface
+ virtual Socket* CreateSocket(int type) { return NULL; }
+ virtual AsyncSocket* CreateAsyncSocket(int type) { return NULL; }
+ // process_io argument is ignored.
+ virtual bool Wait(int cms, bool process_io);
+ virtual void WakeUp();
+
+ private:
+ static void NotificationCallBack(CFNotificationCenterRef center,
+ void* observer,
+ CFStringRef name,
+ const void* object,
+ CFDictionaryRef userInfo);
+
+ bool sent_notification_;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace talk_base
+
+#endif // TALK_BASE_MACSOCKETSERVER_H__
diff --git a/talk/libjingle.scons b/talk/libjingle.scons
index 989ac70..1b7f10b 100644
--- a/talk/libjingle.scons
+++ b/talk/libjingle.scons
@@ -71,7 +71,9 @@
"sound/soundsystemproxy.cc",
],
mac_srcs = [
+ "base/macasyncsocket.cc",
"base/macconversion.cc",
+ "base/macsocketserver.cc",
"base/macutils.cc",
"session/phone/carbonvideorenderer.cc",
"session/phone/devicemanager_mac.mm",