| /* |
| * libjingle |
| * Copyright 2004--2005, Google Inc. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * 3. The name of the author may not be used to endorse or promote products |
| * derived from this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED |
| * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF |
| * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO |
| * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; |
| * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, |
| * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR |
| * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF |
| * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "talk/p2p/base/transport.h" |
| |
| #include "talk/base/common.h" |
| #include "talk/base/logging.h" |
| #include "talk/p2p/base/candidate.h" |
| #include "talk/p2p/base/constants.h" |
| #include "talk/p2p/base/sessionmanager.h" |
| #include "talk/p2p/base/parsing.h" |
| #include "talk/p2p/base/transportchannelimpl.h" |
| #include "talk/xmllite/xmlelement.h" |
| #include "talk/xmpp/constants.h" |
| |
| namespace cricket { |
| |
| struct ChannelParams { |
| ChannelParams() : channel(NULL), candidate(NULL) {} |
| explicit ChannelParams(const std::string& name) |
| : name(name), channel(NULL), candidate(NULL) {} |
| ChannelParams(const std::string& name, |
| const std::string& content_type) |
| : name(name), content_type(content_type), |
| channel(NULL), candidate(NULL) {} |
| explicit ChannelParams(cricket::Candidate* candidate) : |
| channel(NULL), candidate(candidate) { |
| name = candidate->name(); |
| } |
| |
| ~ChannelParams() { |
| delete candidate; |
| } |
| |
| std::string name; |
| std::string content_type; |
| cricket::TransportChannelImpl* channel; |
| cricket::Candidate* candidate; |
| }; |
| // TODO: Merge ChannelParams and ChannelMessage. |
| typedef talk_base::ScopedMessageData<ChannelParams> ChannelMessage; |
| |
// Identifiers of the messages Transport delivers to itself via OnMessage().
enum {
  // Handled by the *_w methods; delivered through worker_thread().
  MSG_CREATECHANNEL      = 1,
  MSG_DESTROYCHANNEL     = 2,
  MSG_DESTROYALLCHANNELS = 3,
  MSG_CONNECTCHANNELS    = 4,
  MSG_RESETCHANNELS      = 5,
  MSG_ONSIGNALINGREADY   = 6,
  MSG_ONREMOTECANDIDATE  = 7,

  // Handled by the *_s methods; posted to signaling_thread().
  MSG_READSTATE          = 8,
  MSG_WRITESTATE         = 9,
  MSG_REQUESTSIGNALING   = 10,
  MSG_CANDIDATEREADY     = 11,
  MSG_ROUTECHANGE        = 12,
  MSG_CONNECTING         = 13,
};
| |
| Transport::Transport(talk_base::Thread* signaling_thread, |
| talk_base::Thread* worker_thread, |
| const std::string& type, |
| PortAllocator* allocator) |
| : signaling_thread_(signaling_thread), |
| worker_thread_(worker_thread), type_(type), allocator_(allocator), |
| destroyed_(false), readable_(false), writable_(false), |
| connect_requested_(false), allow_local_ips_(false) { |
| } |
| |
| Transport::~Transport() { |
| ASSERT(signaling_thread_->IsCurrent()); |
| ASSERT(destroyed_); |
| } |
| |
| TransportChannelImpl* Transport::CreateChannel( |
| const std::string& name, const std::string& content_type) { |
| ChannelMessage msg(new ChannelParams(name, content_type)); |
| worker_thread()->Send(this, MSG_CREATECHANNEL, &msg); |
| return msg.data()->channel; |
| } |
| |
| TransportChannelImpl* Transport::CreateChannel_w( |
| const std::string& name, const std::string& content_type) { |
| ASSERT(worker_thread()->IsCurrent()); |
| |
| TransportChannelImpl* impl = CreateTransportChannel(name, content_type); |
| impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState); |
| impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState); |
| impl->SignalRequestSignaling.connect( |
| this, &Transport::OnChannelRequestSignaling); |
| impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady); |
| impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange); |
| |
| talk_base::CritScope cs(&crit_); |
| ASSERT(channels_.find(name) == channels_.end()); |
| channels_[name] = impl; |
| destroyed_ = false; |
| if (connect_requested_) { |
| impl->Connect(); |
| if (channels_.size() == 1) { |
| // If this is the first channel, then indicate that we have started |
| // connecting. |
| signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
| } |
| } |
| return impl; |
| } |
| |
| TransportChannelImpl* Transport::GetChannel(const std::string& name) { |
| talk_base::CritScope cs(&crit_); |
| ChannelMap::iterator iter = channels_.find(name); |
| return (iter != channels_.end()) ? iter->second : NULL; |
| } |
| |
| bool Transport::HasChannels() { |
| talk_base::CritScope cs(&crit_); |
| return !channels_.empty(); |
| } |
| |
| void Transport::DestroyChannel(const std::string& name) { |
| ChannelMessage msg(new ChannelParams(name)); |
| worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg); |
| } |
| |
| void Transport::DestroyChannel_w(const std::string& name) { |
| ASSERT(worker_thread()->IsCurrent()); |
| |
| TransportChannelImpl* impl = NULL; |
| { |
| talk_base::CritScope cs(&crit_); |
| ChannelMap::iterator iter = channels_.find(name); |
| if (iter == channels_.end()) |
| return; |
| impl = iter->second; |
| channels_.erase(iter); |
| } |
| |
| if (connect_requested_ && channels_.empty()) { |
| // We're no longer attempting to connect. |
| signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
| } |
| |
| if (impl) { |
| // Check in case the deleted channel was the only non-writable channel. |
| OnChannelWritableState(impl); |
| DestroyTransportChannel(impl); |
| } |
| } |
| |
| void Transport::ConnectChannels() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL); |
| } |
| |
| void Transport::ConnectChannels_w() { |
| ASSERT(worker_thread()->IsCurrent()); |
| if (connect_requested_ || channels_.empty()) |
| return; |
| connect_requested_ = true; |
| signaling_thread()->Post( |
| this, MSG_CANDIDATEREADY, NULL); |
| CallChannels_w(&TransportChannelImpl::Connect); |
| if (!channels_.empty()) { |
| signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
| } |
| } |
| |
| void Transport::OnConnecting_s() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| SignalConnecting(this); |
| } |
| |
| void Transport::DestroyAllChannels() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL); |
| worker_thread()->Clear(this); |
| signaling_thread()->Clear(this); |
| destroyed_ = true; |
| } |
| |
| void Transport::DestroyAllChannels_w() { |
| ASSERT(worker_thread()->IsCurrent()); |
| std::vector<TransportChannelImpl*> impls; |
| { |
| talk_base::CritScope cs(&crit_); |
| for (ChannelMap::iterator iter = channels_.begin(); |
| iter != channels_.end(); |
| ++iter) { |
| impls.push_back(iter->second); |
| } |
| channels_.clear(); |
| } |
| |
| for (size_t i = 0; i < impls.size(); ++i) |
| DestroyTransportChannel(impls[i]); |
| } |
| |
| void Transport::ResetChannels() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| worker_thread()->Send(this, MSG_RESETCHANNELS, NULL); |
| } |
| |
| void Transport::ResetChannels_w() { |
| ASSERT(worker_thread()->IsCurrent()); |
| |
| // We are no longer attempting to connect |
| connect_requested_ = false; |
| |
| // Clear out the old messages, they aren't relevant |
| talk_base::CritScope cs(&crit_); |
| ready_candidates_.clear(); |
| |
| // Reset all of the channels |
| CallChannels_w(&TransportChannelImpl::Reset); |
| } |
| |
| void Transport::OnSignalingReady() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| if (destroyed_) return; |
| |
| worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL); |
| |
| // Notify the subclass. |
| OnTransportSignalingReady(); |
| } |
| |
| void Transport::CallChannels_w(TransportChannelFunc func) { |
| ASSERT(worker_thread()->IsCurrent()); |
| talk_base::CritScope cs(&crit_); |
| for (ChannelMap::iterator iter = channels_.begin(); |
| iter != channels_.end(); |
| ++iter) { |
| ((iter->second)->*func)(); |
| } |
| } |
| |
| bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) { |
| if (cand.address().IsLocalIP() && !allow_local_ips_) |
| return BadParse("candidate has local IP address", error); |
| |
| // No address zero. |
| if (cand.address().IsAny()) { |
| return BadParse("candidate has address of zero", error); |
| } |
| |
| // Disallow all ports below 1024, except for 80 and 443 on public addresses. |
| int port = cand.address().port(); |
| if (port < 1024) { |
| if ((port != 80) && (port != 443)) |
| return BadParse( |
| "candidate has port below 1024, but not 80 or 443", error); |
| if (cand.address().IsPrivateIP()) { |
| return BadParse( |
| "candidate has port of 80 or 443 with private IP address", error); |
| } |
| } |
| |
| return true; |
| } |
| |
| void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) { |
| for (std::vector<Candidate>::const_iterator iter = candidates.begin(); |
| iter != candidates.end(); |
| ++iter) { |
| OnRemoteCandidate(*iter); |
| } |
| } |
| |
| void Transport::OnRemoteCandidate(const Candidate& candidate) { |
| ASSERT(signaling_thread()->IsCurrent()); |
| if (destroyed_) return; |
| if (!HasChannel(candidate.name())) { |
| LOG(LS_WARNING) << "Ignoring candidate for unknown channel " |
| << candidate.name(); |
| return; |
| } |
| |
| ChannelMessage* msg = new ChannelMessage( |
| new ChannelParams(new Candidate(candidate))); |
| worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg); |
| } |
| |
| void Transport::OnRemoteCandidate_w(const Candidate& candidate) { |
| ASSERT(worker_thread()->IsCurrent()); |
| ChannelMap::iterator iter = channels_.find(candidate.name()); |
| // It's ok for a channel to go away while this message is in transit. |
| if (iter != channels_.end()) { |
| iter->second->OnCandidate(candidate); |
| } |
| } |
| |
| void Transport::OnChannelReadableState(TransportChannel* channel) { |
| ASSERT(worker_thread()->IsCurrent()); |
| signaling_thread()->Post(this, MSG_READSTATE, NULL); |
| } |
| |
| void Transport::OnChannelReadableState_s() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| bool readable = GetTransportState_s(true); |
| if (readable_ != readable) { |
| readable_ = readable; |
| SignalReadableState(this); |
| } |
| } |
| |
| void Transport::OnChannelWritableState(TransportChannel* channel) { |
| ASSERT(worker_thread()->IsCurrent()); |
| signaling_thread()->Post(this, MSG_WRITESTATE, NULL); |
| } |
| |
| void Transport::OnChannelWritableState_s() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| bool writable = GetTransportState_s(false); |
| if (writable_ != writable) { |
| writable_ = writable; |
| SignalWritableState(this); |
| } |
| } |
| |
| bool Transport::GetTransportState_s(bool read) { |
| ASSERT(signaling_thread()->IsCurrent()); |
| bool result = false; |
| talk_base::CritScope cs(&crit_); |
| for (ChannelMap::iterator iter = channels_.begin(); |
| iter != channels_.end(); |
| ++iter) { |
| bool b = (read ? iter->second->readable() : iter->second->writable()); |
| result = result || b; |
| } |
| return result; |
| } |
| |
| void Transport::OnChannelRequestSignaling() { |
| ASSERT(worker_thread()->IsCurrent()); |
| signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL); |
| } |
| |
| void Transport::OnChannelRequestSignaling_s() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| SignalRequestSignaling(this); |
| } |
| |
| void Transport::OnChannelCandidateReady(TransportChannelImpl* channel, |
| const Candidate& candidate) { |
| ASSERT(worker_thread()->IsCurrent()); |
| talk_base::CritScope cs(&crit_); |
| ready_candidates_.push_back(candidate); |
| |
| // We hold any messages until the client lets us connect. |
| if (connect_requested_) { |
| signaling_thread()->Post( |
| this, MSG_CANDIDATEREADY, NULL); |
| } |
| } |
| |
| void Transport::OnChannelCandidateReady_s() { |
| ASSERT(signaling_thread()->IsCurrent()); |
| ASSERT(connect_requested_); |
| |
| std::vector<Candidate> candidates; |
| { |
| talk_base::CritScope cs(&crit_); |
| candidates.swap(ready_candidates_); |
| } |
| |
| // we do the deleting of Candidate* here to keep the new above and |
| // delete below close to each other |
| if (!candidates.empty()) { |
| SignalCandidatesReady(this, candidates); |
| } |
| } |
| |
| void Transport::OnChannelRouteChange(TransportChannel* channel, |
| const Candidate& remote_candidate) { |
| ASSERT(worker_thread()->IsCurrent()); |
| ChannelParams* params = new ChannelParams(new Candidate(remote_candidate)); |
| signaling_thread()->Post(this, MSG_ROUTECHANGE, new ChannelMessage(params)); |
| } |
| |
| void Transport::OnChannelRouteChange_s(const std::string& name, |
| const Candidate& remote_candidate) { |
| ASSERT(signaling_thread()->IsCurrent()); |
| SignalRouteChange(this, name, remote_candidate); |
| } |
| |
| void Transport::OnMessage(talk_base::Message* msg) { |
| switch (msg->message_id) { |
| case MSG_CREATECHANNEL: |
| { |
| ChannelParams* params = |
| static_cast<ChannelMessage*>(msg->pdata)->data().get(); |
| params->channel = CreateChannel_w(params->name, params->content_type); |
| } |
| break; |
| case MSG_DESTROYCHANNEL: |
| { |
| ChannelParams* params = |
| static_cast<ChannelMessage*>(msg->pdata)->data().get(); |
| DestroyChannel_w(params->name); |
| } |
| break; |
| case MSG_CONNECTCHANNELS: |
| ConnectChannels_w(); |
| break; |
| case MSG_RESETCHANNELS: |
| ResetChannels_w(); |
| break; |
| case MSG_DESTROYALLCHANNELS: |
| DestroyAllChannels_w(); |
| break; |
| case MSG_ONSIGNALINGREADY: |
| CallChannels_w(&TransportChannelImpl::OnSignalingReady); |
| break; |
| case MSG_ONREMOTECANDIDATE: |
| { |
| ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata); |
| OnRemoteCandidate_w(*(channel_msg->data()->candidate)); |
| delete channel_msg; |
| } |
| break; |
| case MSG_CONNECTING: |
| OnConnecting_s(); |
| break; |
| case MSG_READSTATE: |
| OnChannelReadableState_s(); |
| break; |
| case MSG_WRITESTATE: |
| OnChannelWritableState_s(); |
| break; |
| case MSG_REQUESTSIGNALING: |
| OnChannelRequestSignaling_s(); |
| break; |
| case MSG_CANDIDATEREADY: |
| OnChannelCandidateReady_s(); |
| break; |
| case MSG_ROUTECHANGE: |
| { |
| ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata); |
| ChannelParams* params = channel_msg->data().get(); |
| OnChannelRouteChange_s(params->name, *params->candidate); |
| delete channel_msg; |
| } |
| break; |
| } |
| } |
| |
| bool TransportParser::ParseAddress(const buzz::XmlElement* elem, |
| const buzz::QName& address_name, |
| const buzz::QName& port_name, |
| talk_base::SocketAddress* address, |
| ParseError* error) { |
| if (!elem->HasAttr(address_name)) |
| return BadParse("address does not have " + address_name.LocalPart(), error); |
| if (!elem->HasAttr(port_name)) |
| return BadParse("address does not have " + port_name.LocalPart(), error); |
| |
| address->SetIP(elem->Attr(address_name)); |
| std::istringstream ist(elem->Attr(port_name)); |
| int port = 0; |
| ist >> port; |
| address->SetPort(port); |
| |
| return true; |
| } |
| |
| } // namespace cricket |