blob: fdd8520da6d51a43d95f19226a8dee9148323f87 [file] [log] [blame]
/*
* libjingle
* Copyright 2004--2005, Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "talk/p2p/base/transport.h"
#include "talk/base/common.h"
#include "talk/base/logging.h"
#include "talk/p2p/base/candidate.h"
#include "talk/p2p/base/constants.h"
#include "talk/p2p/base/sessionmanager.h"
#include "talk/p2p/base/parsing.h"
#include "talk/p2p/base/transportchannelimpl.h"
#include "talk/xmllite/xmlelement.h"
#include "talk/xmpp/constants.h"
namespace cricket {
struct ChannelParams {
ChannelParams() : channel(NULL), candidate(NULL) {}
explicit ChannelParams(const std::string& name)
: name(name), channel(NULL), candidate(NULL) {}
ChannelParams(const std::string& name,
const std::string& content_type)
: name(name), content_type(content_type),
channel(NULL), candidate(NULL) {}
explicit ChannelParams(cricket::Candidate* candidate) :
channel(NULL), candidate(candidate) {
name = candidate->name();
}
~ChannelParams() {
delete candidate;
}
std::string name;
std::string content_type;
cricket::TransportChannelImpl* channel;
cricket::Candidate* candidate;
};
// TODO: Merge ChannelParams and ChannelMessage.
typedef talk_base::ScopedMessageData<ChannelParams> ChannelMessage;
enum {
MSG_CREATECHANNEL = 1,
MSG_DESTROYCHANNEL = 2,
MSG_DESTROYALLCHANNELS = 3,
MSG_CONNECTCHANNELS = 4,
MSG_RESETCHANNELS = 5,
MSG_ONSIGNALINGREADY = 6,
MSG_ONREMOTECANDIDATE = 7,
MSG_READSTATE = 8,
MSG_WRITESTATE = 9,
MSG_REQUESTSIGNALING = 10,
MSG_CANDIDATEREADY = 11,
MSG_ROUTECHANGE = 12,
MSG_CONNECTING = 13,
};
Transport::Transport(talk_base::Thread* signaling_thread,
talk_base::Thread* worker_thread,
const std::string& type,
PortAllocator* allocator)
: signaling_thread_(signaling_thread),
worker_thread_(worker_thread), type_(type), allocator_(allocator),
destroyed_(false), readable_(false), writable_(false),
connect_requested_(false), allow_local_ips_(false) {
}
Transport::~Transport() {
ASSERT(signaling_thread_->IsCurrent());
ASSERT(destroyed_);
}
TransportChannelImpl* Transport::CreateChannel(
const std::string& name, const std::string& content_type) {
ChannelMessage msg(new ChannelParams(name, content_type));
worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
return msg.data()->channel;
}
TransportChannelImpl* Transport::CreateChannel_w(
const std::string& name, const std::string& content_type) {
ASSERT(worker_thread()->IsCurrent());
TransportChannelImpl* impl = CreateTransportChannel(name, content_type);
impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
impl->SignalRequestSignaling.connect(
this, &Transport::OnChannelRequestSignaling);
impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
talk_base::CritScope cs(&crit_);
ASSERT(channels_.find(name) == channels_.end());
channels_[name] = impl;
destroyed_ = false;
if (connect_requested_) {
impl->Connect();
if (channels_.size() == 1) {
// If this is the first channel, then indicate that we have started
// connecting.
signaling_thread()->Post(this, MSG_CONNECTING, NULL);
}
}
return impl;
}
TransportChannelImpl* Transport::GetChannel(const std::string& name) {
talk_base::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(name);
return (iter != channels_.end()) ? iter->second : NULL;
}
bool Transport::HasChannels() {
talk_base::CritScope cs(&crit_);
return !channels_.empty();
}
void Transport::DestroyChannel(const std::string& name) {
ChannelMessage msg(new ChannelParams(name));
worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
}
void Transport::DestroyChannel_w(const std::string& name) {
ASSERT(worker_thread()->IsCurrent());
TransportChannelImpl* impl = NULL;
{
talk_base::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(name);
if (iter == channels_.end())
return;
impl = iter->second;
channels_.erase(iter);
}
if (connect_requested_ && channels_.empty()) {
// We're no longer attempting to connect.
signaling_thread()->Post(this, MSG_CONNECTING, NULL);
}
if (impl) {
// Check in case the deleted channel was the only non-writable channel.
OnChannelWritableState(impl);
DestroyTransportChannel(impl);
}
}
void Transport::ConnectChannels() {
ASSERT(signaling_thread()->IsCurrent());
worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
}
void Transport::ConnectChannels_w() {
ASSERT(worker_thread()->IsCurrent());
if (connect_requested_ || channels_.empty())
return;
connect_requested_ = true;
signaling_thread()->Post(
this, MSG_CANDIDATEREADY, NULL);
CallChannels_w(&TransportChannelImpl::Connect);
if (!channels_.empty()) {
signaling_thread()->Post(this, MSG_CONNECTING, NULL);
}
}
void Transport::OnConnecting_s() {
ASSERT(signaling_thread()->IsCurrent());
SignalConnecting(this);
}
void Transport::DestroyAllChannels() {
ASSERT(signaling_thread()->IsCurrent());
worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
worker_thread()->Clear(this);
signaling_thread()->Clear(this);
destroyed_ = true;
}
void Transport::DestroyAllChannels_w() {
ASSERT(worker_thread()->IsCurrent());
std::vector<TransportChannelImpl*> impls;
{
talk_base::CritScope cs(&crit_);
for (ChannelMap::iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
impls.push_back(iter->second);
}
channels_.clear();
}
for (size_t i = 0; i < impls.size(); ++i)
DestroyTransportChannel(impls[i]);
}
void Transport::ResetChannels() {
ASSERT(signaling_thread()->IsCurrent());
worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
}
void Transport::ResetChannels_w() {
ASSERT(worker_thread()->IsCurrent());
// We are no longer attempting to connect
connect_requested_ = false;
// Clear out the old messages, they aren't relevant
talk_base::CritScope cs(&crit_);
ready_candidates_.clear();
// Reset all of the channels
CallChannels_w(&TransportChannelImpl::Reset);
}
void Transport::OnSignalingReady() {
ASSERT(signaling_thread()->IsCurrent());
if (destroyed_) return;
worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
// Notify the subclass.
OnTransportSignalingReady();
}
void Transport::CallChannels_w(TransportChannelFunc func) {
ASSERT(worker_thread()->IsCurrent());
talk_base::CritScope cs(&crit_);
for (ChannelMap::iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
((iter->second)->*func)();
}
}
bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
if (cand.address().IsLocalIP() && !allow_local_ips_)
return BadParse("candidate has local IP address", error);
// No address zero.
if (cand.address().IsAny()) {
return BadParse("candidate has address of zero", error);
}
// Disallow all ports below 1024, except for 80 and 443 on public addresses.
int port = cand.address().port();
if (port < 1024) {
if ((port != 80) && (port != 443))
return BadParse(
"candidate has port below 1024, but not 80 or 443", error);
if (cand.address().IsPrivateIP()) {
return BadParse(
"candidate has port of 80 or 443 with private IP address", error);
}
}
return true;
}
void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
for (std::vector<Candidate>::const_iterator iter = candidates.begin();
iter != candidates.end();
++iter) {
OnRemoteCandidate(*iter);
}
}
void Transport::OnRemoteCandidate(const Candidate& candidate) {
ASSERT(signaling_thread()->IsCurrent());
if (destroyed_) return;
if (!HasChannel(candidate.name())) {
LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
<< candidate.name();
return;
}
ChannelMessage* msg = new ChannelMessage(
new ChannelParams(new Candidate(candidate)));
worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
}
void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
ASSERT(worker_thread()->IsCurrent());
ChannelMap::iterator iter = channels_.find(candidate.name());
// It's ok for a channel to go away while this message is in transit.
if (iter != channels_.end()) {
iter->second->OnCandidate(candidate);
}
}
void Transport::OnChannelReadableState(TransportChannel* channel) {
ASSERT(worker_thread()->IsCurrent());
signaling_thread()->Post(this, MSG_READSTATE, NULL);
}
void Transport::OnChannelReadableState_s() {
ASSERT(signaling_thread()->IsCurrent());
bool readable = GetTransportState_s(true);
if (readable_ != readable) {
readable_ = readable;
SignalReadableState(this);
}
}
void Transport::OnChannelWritableState(TransportChannel* channel) {
ASSERT(worker_thread()->IsCurrent());
signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
}
void Transport::OnChannelWritableState_s() {
ASSERT(signaling_thread()->IsCurrent());
bool writable = GetTransportState_s(false);
if (writable_ != writable) {
writable_ = writable;
SignalWritableState(this);
}
}
bool Transport::GetTransportState_s(bool read) {
ASSERT(signaling_thread()->IsCurrent());
bool result = false;
talk_base::CritScope cs(&crit_);
for (ChannelMap::iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
bool b = (read ? iter->second->readable() : iter->second->writable());
result = result || b;
}
return result;
}
void Transport::OnChannelRequestSignaling() {
ASSERT(worker_thread()->IsCurrent());
signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);
}
void Transport::OnChannelRequestSignaling_s() {
ASSERT(signaling_thread()->IsCurrent());
SignalRequestSignaling(this);
}
void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
const Candidate& candidate) {
ASSERT(worker_thread()->IsCurrent());
talk_base::CritScope cs(&crit_);
ready_candidates_.push_back(candidate);
// We hold any messages until the client lets us connect.
if (connect_requested_) {
signaling_thread()->Post(
this, MSG_CANDIDATEREADY, NULL);
}
}
void Transport::OnChannelCandidateReady_s() {
ASSERT(signaling_thread()->IsCurrent());
ASSERT(connect_requested_);
std::vector<Candidate> candidates;
{
talk_base::CritScope cs(&crit_);
candidates.swap(ready_candidates_);
}
// we do the deleting of Candidate* here to keep the new above and
// delete below close to each other
if (!candidates.empty()) {
SignalCandidatesReady(this, candidates);
}
}
void Transport::OnChannelRouteChange(TransportChannel* channel,
const Candidate& remote_candidate) {
ASSERT(worker_thread()->IsCurrent());
ChannelParams* params = new ChannelParams(new Candidate(remote_candidate));
signaling_thread()->Post(this, MSG_ROUTECHANGE, new ChannelMessage(params));
}
void Transport::OnChannelRouteChange_s(const std::string& name,
const Candidate& remote_candidate) {
ASSERT(signaling_thread()->IsCurrent());
SignalRouteChange(this, name, remote_candidate);
}
void Transport::OnMessage(talk_base::Message* msg) {
switch (msg->message_id) {
case MSG_CREATECHANNEL:
{
ChannelParams* params =
static_cast<ChannelMessage*>(msg->pdata)->data().get();
params->channel = CreateChannel_w(params->name, params->content_type);
}
break;
case MSG_DESTROYCHANNEL:
{
ChannelParams* params =
static_cast<ChannelMessage*>(msg->pdata)->data().get();
DestroyChannel_w(params->name);
}
break;
case MSG_CONNECTCHANNELS:
ConnectChannels_w();
break;
case MSG_RESETCHANNELS:
ResetChannels_w();
break;
case MSG_DESTROYALLCHANNELS:
DestroyAllChannels_w();
break;
case MSG_ONSIGNALINGREADY:
CallChannels_w(&TransportChannelImpl::OnSignalingReady);
break;
case MSG_ONREMOTECANDIDATE:
{
ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
OnRemoteCandidate_w(*(channel_msg->data()->candidate));
delete channel_msg;
}
break;
case MSG_CONNECTING:
OnConnecting_s();
break;
case MSG_READSTATE:
OnChannelReadableState_s();
break;
case MSG_WRITESTATE:
OnChannelWritableState_s();
break;
case MSG_REQUESTSIGNALING:
OnChannelRequestSignaling_s();
break;
case MSG_CANDIDATEREADY:
OnChannelCandidateReady_s();
break;
case MSG_ROUTECHANGE:
{
ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
ChannelParams* params = channel_msg->data().get();
OnChannelRouteChange_s(params->name, *params->candidate);
delete channel_msg;
}
break;
}
}
bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
const buzz::QName& address_name,
const buzz::QName& port_name,
talk_base::SocketAddress* address,
ParseError* error) {
if (!elem->HasAttr(address_name))
return BadParse("address does not have " + address_name.LocalPart(), error);
if (!elem->HasAttr(port_name))
return BadParse("address does not have " + port_name.LocalPart(), error);
address->SetIP(elem->Attr(address_name));
std::istringstream ist(elem->Attr(port_name));
int port = 0;
ist >> port;
address->SetPort(port);
return true;
}
} // namespace cricket