blob: dfa2dfd4b4e359370f9d31287f1fb3f13c976b8a [file] [log] [blame]
/*
* libjingle
* Copyright 2004--2005, Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "talk/base/common.h"
#include "talk/p2p/base/transport.h"
#include "talk/p2p/base/sessionmanager.h"
#include "talk/p2p/base/transportchannelimpl.h"
#include "talk/p2p/base/constants.h"
#include "talk/xmllite/xmlelement.h"
#include "talk/xmpp/constants.h"
namespace {
struct ChannelParams {
std::string name;
std::string session_type;
cricket::TransportChannelImpl* channel;
buzz::XmlElement* elem;
ChannelParams() : channel(NULL), elem(NULL) {}
};
typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage;
const int MSG_CREATECHANNEL = 1;
const int MSG_DESTROYCHANNEL = 2;
const int MSG_DESTROYALLCHANNELS = 3;
const int MSG_CONNECTCHANNELS = 4;
const int MSG_RESETCHANNELS = 5;
const int MSG_ONSIGNALINGREADY = 6;
const int MSG_FORWARDCHANNELMESSAGE = 7;
const int MSG_READSTATE = 8;
const int MSG_WRITESTATE = 9;
const int MSG_REQUESTSIGNALING = 10;
const int MSG_ONCHANNELMESSAGE = 11;
const int MSG_CONNECTING = 12;
} // namespace
namespace cricket {
Transport::Transport(SessionManager* session_manager, const std::string& name)
: session_manager_(session_manager), name_(name), destroyed_(false),
readable_(false), writable_(false), connect_requested_(false),
allow_local_ips_(false) {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
}
Transport::~Transport() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
ASSERT(destroyed_);
}
TransportChannelImpl* Transport::CreateChannel(const std::string& name, const std::string &session_type) {
ChannelParams params;
params.name = name;
params.session_type = session_type;
ChannelMessage msg(&params);
session_manager_->worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
return msg.data()->channel;
}
TransportChannelImpl* Transport::CreateChannel_w(const std::string& name, const std::string &session_type) {
ASSERT(session_manager_->worker_thread()->IsCurrent());
TransportChannelImpl* impl = CreateTransportChannel(name, session_type);
impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
impl->SignalRequestSignaling.connect(
this, &Transport::OnChannelRequestSignaling);
impl->SignalChannelMessage.connect(this, &Transport::OnChannelMessage);
talk_base::CritScope cs(&crit_);
ASSERT(channels_.find(name) == channels_.end());
channels_[name] = impl;
destroyed_ = false;
if (connect_requested_) {
impl->Connect();
if (channels_.size() == 1) {
// If this is the first channel, then indicate that we have started
// connecting.
session_manager_->signaling_thread()->Post(this, MSG_CONNECTING, NULL);
}
}
return impl;
}
TransportChannelImpl* Transport::GetChannel(const std::string& name) {
talk_base::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(name);
return (iter != channels_.end()) ? iter->second : NULL;
}
bool Transport::HasChannels() {
talk_base::CritScope cs(&crit_);
return !channels_.empty();
}
void Transport::DestroyChannel(const std::string& name) {
ChannelParams params;
params.name = name;
ChannelMessage msg(&params);
session_manager_->worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
}
void Transport::DestroyChannel_w(const std::string& name) {
ASSERT(session_manager_->worker_thread()->IsCurrent());
TransportChannelImpl* impl = NULL;
{
talk_base::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(name);
ASSERT(iter != channels_.end());
impl = iter->second;
channels_.erase(iter);
}
if (connect_requested_ && channels_.empty()) {
// We're not longer attempting to connect.
session_manager_->signaling_thread()->Post(this, MSG_CONNECTING, NULL);
}
if (impl)
DestroyTransportChannel(impl);
}
void Transport::ConnectChannels() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
session_manager_->worker_thread()->Post(this, MSG_CONNECTCHANNELS, NULL);
}
void Transport::ConnectChannels_w() {
ASSERT(session_manager_->worker_thread()->IsCurrent());
if (connect_requested_)
return;
connect_requested_ = true;
session_manager_->signaling_thread()->Post(this, MSG_ONCHANNELMESSAGE, NULL);
CallChannels_w(&TransportChannelImpl::Connect);
if (!channels_.empty()) {
session_manager_->signaling_thread()->Post(this, MSG_CONNECTING, NULL);
}
}
void Transport::OnConnecting_s() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
SignalConnecting(this);
}
void Transport::DestroyAllChannels() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
session_manager_->worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
destroyed_ = true;
}
void Transport::DestroyAllChannels_w() {
ASSERT(session_manager_->worker_thread()->IsCurrent());
std::vector<TransportChannelImpl*> impls;
{
talk_base::CritScope cs(&crit_);
for (ChannelMap::iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
impls.push_back(iter->second);
}
channels_.clear();
}
for (size_t i = 0; i < impls.size(); ++i)
DestroyTransportChannel(impls[i]);
}
void Transport::ResetChannels() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
session_manager_->worker_thread()->Post(this, MSG_RESETCHANNELS, NULL);
}
void Transport::ResetChannels_w() {
ASSERT(session_manager_->worker_thread()->IsCurrent());
// We are no longer attempting to connect
connect_requested_ = false;
// Clear out the old messages, they aren't relevant
talk_base::CritScope cs(&crit_);
for (size_t i=0; i<messages_.size(); ++i) {
delete messages_[i];
}
messages_.clear();
// Reset all of the channels
CallChannels_w(&TransportChannelImpl::Reset);
}
void Transport::OnSignalingReady() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
session_manager_->worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
// Notify the subclass.
OnTransportSignalingReady();
}
void Transport::CallChannels_w(TransportChannelFunc func) {
ASSERT(session_manager_->worker_thread()->IsCurrent());
talk_base::CritScope cs(&crit_);
for (ChannelMap::iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
((iter->second)->*func)();
}
}
void Transport::ForwardChannelMessage(const std::string& name,
buzz::XmlElement* elem) {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
ASSERT(HasChannel(name));
ChannelParams* params = new ChannelParams();
params->name = name;
params->elem = elem;
ChannelMessage* msg = new ChannelMessage(params);
session_manager_->worker_thread()->Post(this, MSG_FORWARDCHANNELMESSAGE, msg);
}
void Transport::ForwardChannelMessage_w(const std::string& name,
buzz::XmlElement* elem) {
ASSERT(session_manager_->worker_thread()->IsCurrent());
ChannelMap::iterator iter = channels_.find(name);
// It's ok for a channel to go away while this message is in transit.
if (iter != channels_.end()) {
iter->second->OnChannelMessage(elem);
}
delete elem;
}
void Transport::OnChannelReadableState(TransportChannel* channel) {
ASSERT(session_manager_->worker_thread()->IsCurrent());
session_manager_->signaling_thread()->Post(this, MSG_READSTATE, NULL);
}
void Transport::OnChannelReadableState_s() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
bool readable = GetTransportState_s(true);
if (readable_ != readable) {
readable_ = readable;
SignalReadableState(this);
}
}
void Transport::OnChannelWritableState(TransportChannel* channel) {
ASSERT(session_manager_->worker_thread()->IsCurrent());
session_manager_->signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
}
void Transport::OnChannelWritableState_s() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
bool writable = GetTransportState_s(false);
if (writable_ != writable) {
writable_ = writable;
SignalWritableState(this);
}
}
bool Transport::GetTransportState_s(bool read) {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
bool result = false;
talk_base::CritScope cs(&crit_);
for (ChannelMap::iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
bool b = (read ? iter->second->readable() : iter->second->writable());
result = result || b;
}
return result;
}
void Transport::OnChannelRequestSignaling() {
ASSERT(session_manager_->worker_thread()->IsCurrent());
session_manager_->signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);
}
void Transport::OnChannelRequestSignaling_s() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
SignalRequestSignaling(this);
}
void Transport::OnChannelMessage(TransportChannelImpl* impl,
buzz::XmlElement* elem) {
ASSERT(session_manager_->worker_thread()->IsCurrent());
talk_base::CritScope cs(&crit_);
messages_.push_back(elem);
// We hold any messages until the client lets us connect.
if (connect_requested_) {
session_manager_->signaling_thread()->Post(
this, MSG_ONCHANNELMESSAGE, NULL);
}
}
void Transport::OnChannelMessage_s() {
ASSERT(session_manager_->signaling_thread()->IsCurrent());
ASSERT(connect_requested_);
std::vector<buzz::XmlElement*> msgs;
{
talk_base::CritScope cs(&crit_);
msgs.swap(messages_);
}
if (!msgs.empty())
OnTransportChannelMessages(msgs);
}
void Transport::OnTransportChannelMessages(
const std::vector<buzz::XmlElement*>& msgs) {
std::vector<buzz::XmlElement*> elems;
for (size_t i = 0; i < msgs.size(); ++i) {
buzz::XmlElement* elem =
new buzz::XmlElement(buzz::QName(name(), "transport"));
elem->AddElement(msgs[i]);
elems.push_back(elem);
}
SignalTransportMessage(this, elems);
}
void Transport::OnMessage(talk_base::Message* msg) {
switch (msg->message_id) {
case MSG_CREATECHANNEL:
{
ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
params->channel = CreateChannel_w(params->name, params->session_type);
}
break;
case MSG_DESTROYCHANNEL:
{
ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
DestroyChannel_w(params->name);
}
break;
case MSG_CONNECTCHANNELS:
ConnectChannels_w();
break;
case MSG_RESETCHANNELS:
ResetChannels_w();
break;
case MSG_DESTROYALLCHANNELS:
DestroyAllChannels_w();
break;
case MSG_ONSIGNALINGREADY:
CallChannels_w(&TransportChannelImpl::OnSignalingReady);
break;
case MSG_FORWARDCHANNELMESSAGE:
{
ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
ForwardChannelMessage_w(params->name, params->elem);
delete params;
}
break;
case MSG_CONNECTING:
OnConnecting_s();
break;
case MSG_READSTATE:
OnChannelReadableState_s();
break;
case MSG_WRITESTATE:
OnChannelWritableState_s();
break;
case MSG_REQUESTSIGNALING:
OnChannelRequestSignaling_s();
break;
case MSG_ONCHANNELMESSAGE:
OnChannelMessage_s();
break;
}
}
bool Transport::BadRequest(const buzz::XmlElement* stanza,
const std::string& text,
const buzz::XmlElement* extra_info) {
SignalTransportError(this, stanza, buzz::QN_STANZA_BAD_REQUEST, "modify",
text, extra_info);
return false;
}
bool Transport::ParseAddress(const buzz::XmlElement* stanza,
const buzz::XmlElement* elem,
talk_base::SocketAddress* address) {
ASSERT(elem->HasAttr(QN_ADDRESS));
ASSERT(elem->HasAttr(QN_PORT));
// Record the parts of the address.
address->SetIP(elem->Attr(QN_ADDRESS));
std::istringstream ist(elem->Attr(QN_PORT));
int port;
ist >> port;
address->SetPort(port);
// No address zero.
if (address->IsAny())
return BadRequest(stanza, "candidate has address of zero", NULL);
// Always disallow addresses that refer to the local host.
if (address->IsLocalIP() && !allow_local_ips_)
return BadRequest(stanza, "candidate has local IP address", NULL);
// Disallow all ports below 1024, except for 80 and 443 on public addresses.
if (port < 1024) {
if ((port != 80) && (port != 443))
return BadRequest(stanza,
"candidate has port below 1024, but not 80 or 443",
NULL);
if (address->IsPrivateIP()) {
return BadRequest(stanza, "candidate has port of 80 or 443 with private "
"IP address", NULL);
}
}
return true;
}
} // namespace cricket