| /* |
| * libjingle |
| * Copyright 2004--2005, Google Inc. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * 3. The name of the author may not be used to endorse or promote products |
| * derived from this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED |
| * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF |
| * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO |
| * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; |
| * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, |
| * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR |
| * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF |
| * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #if defined(_MSC_VER) && _MSC_VER < 1300 |
| #pragma warning(disable:4786) |
| #endif |
| #include "talk/base/logging.h" |
| #include "talk/base/asynctcpsocket.h" |
| #include "talk/base/helpers.h" |
| #include "talk/p2p/base/relayport.h" |
| #include <iostream> |
| #include <cassert> |
| #ifdef OSX |
| #include <errno.h> |
| #endif |
| |
| #if defined(_MSC_VER) && _MSC_VER < 1300 |
| namespace std { |
| using ::strerror; |
| } |
| #endif |
| |
| #ifdef POSIX |
| extern "C" { |
| #include <errno.h> |
| } |
| #endif // POSIX |
| |
// Forward declaration of the TCP socket type that RelayEntry's connect/close
// callbacks receive by pointer.
namespace talk_base {
class AsyncTCPSocket;
}  // namespace talk_base
| |
| namespace cricket { |
| |
| const int KEEPALIVE_DELAY = 10 * 60 * 1000; |
| const int RETRY_DELAY = 50; // 50ms, from ICE spec |
| const uint32 RETRY_TIMEOUT = 50 * 1000; // ICE says 50 secs |
| |
| // Manages a single connection to the relayserver. We aim to use each |
| // connection for only a specific destination address so that we can avoid |
| // wrapping every packet in a STUN send / data indication. |
| class RelayEntry : public sigslot::has_slots<> { |
| public: |
| RelayEntry(RelayPort* port, const talk_base::SocketAddress& ext_addr, |
| const talk_base::SocketAddress& local_addr); |
| ~RelayEntry(); |
| |
| RelayPort* port() { return port_; } |
| |
| const talk_base::SocketAddress& address() const { return ext_addr_; } |
| void set_address(const talk_base::SocketAddress& addr) { ext_addr_ = addr; } |
| |
| talk_base::AsyncPacketSocket* socket() { return socket_; } |
| |
| bool connected() const { return connected_; } |
| bool locked() const { return locked_; } |
| |
| // Returns the last error on the socket of this entry. |
| int GetError() const { return socket_->GetError(); } |
| |
| // Sends the STUN requests to the server to initiate this connection. |
| void Connect(); |
| |
| // Called when this entry becomes connected. The address given is the one |
| // exposed to the outside world on the relay server. |
| void OnConnect(const talk_base::SocketAddress& mapped_addr); |
| |
| // Sends a packet to the given destination address using the socket of this |
| // entry. This will wrap the packet in STUN if necessary. |
| int SendTo(const void* data, size_t size, |
| const talk_base::SocketAddress& addr); |
| |
| // Schedules a keep-alive allocate request. |
| void ScheduleKeepAlive(); |
| |
| void SetServerIndex(size_t sindex) { server_index_ = sindex; } |
| size_t ServerIndex() const { return server_index_; } |
| |
| // Try a different server address |
| void HandleConnectFailure(); |
| |
| private: |
| RelayPort* port_; |
| talk_base::SocketAddress ext_addr_, local_addr_; |
| size_t server_index_; |
| talk_base::AsyncPacketSocket* socket_; |
| bool connected_; |
| bool locked_; |
| StunRequestManager requests_; |
| |
| // Called when a TCP connection is established or fails |
| void OnSocketConnect(talk_base::AsyncTCPSocket* socket); |
| void OnSocketClose(talk_base::AsyncTCPSocket* socket, int error); |
| |
| // Called when a packet is received on this socket. |
| void OnReadPacket( |
| const char* data, size_t size, |
| const talk_base::SocketAddress& remote_addr, |
| talk_base::AsyncPacketSocket* socket); |
| |
| // Called on behalf of a StunRequest to write data to the socket. This is |
| // already STUN intended for the server, so no wrapping is necessary. |
| void OnSendPacket(const void* data, size_t size, StunRequest* req); |
| |
| // Sends the given data on the socket to the server with no wrapping. This |
| // returns the number of bytes written or -1 if an error occurred. |
| int SendPacket(const void* data, size_t size); |
| }; |
| |
| // Handles an allocate request for a particular RelayEntry. |
| class AllocateRequest : public StunRequest { |
| public: |
| AllocateRequest(RelayEntry* entry); |
| virtual ~AllocateRequest() {} |
| |
| virtual void Prepare(StunMessage* request); |
| |
| virtual int GetNextDelay(); |
| |
| virtual void OnResponse(StunMessage* response); |
| virtual void OnErrorResponse(StunMessage* response); |
| virtual void OnTimeout(); |
| |
| private: |
| RelayEntry* entry_; |
| uint32 start_time_; |
| }; |
| |
// Port type string passed to the Port base class by every RelayPort.
const std::string RELAY_PORT_TYPE = "relay";
| |
| RelayPort::RelayPort( |
| talk_base::Thread* thread, talk_base::SocketFactory* factory, |
| talk_base::Network* network, const talk_base::SocketAddress& local_addr, |
| const std::string& username, const std::string& password, |
| const std::string& magic_cookie) |
| : Port(thread, RELAY_PORT_TYPE, factory, network), local_addr_(local_addr), |
| ready_(false), magic_cookie_(magic_cookie), error_(0) { |
| |
| entries_.push_back( |
| new RelayEntry(this, talk_base::SocketAddress(), local_addr_)); |
| |
| set_username_fragment(username); |
| set_password(password); |
| |
| if (magic_cookie_.size() == 0) |
| magic_cookie_.append(STUN_MAGIC_COOKIE_VALUE, 4); |
| } |
| |
| RelayPort::~RelayPort() { |
| for (unsigned i = 0; i < entries_.size(); ++i) |
| delete entries_[i]; |
| thread_->Clear(this); |
| } |
| |
| void RelayPort::AddServerAddress(const ProtocolAddress& addr) { |
| // Since HTTP proxies usually only allow 443, let's up the priority on PROTO_SSLTCP |
| if ((addr.proto == PROTO_SSLTCP) |
| && ((proxy().type == talk_base::PROXY_HTTPS) |
| || (proxy().type == talk_base::PROXY_UNKNOWN))) { |
| server_addr_.push_front(addr); |
| } else { |
| server_addr_.push_back(addr); |
| } |
| } |
| |
| void RelayPort::AddExternalAddress(const ProtocolAddress& addr) { |
| std::string proto_name = ProtoToString(addr.proto); |
| for (std::vector<Candidate>::const_iterator it = candidates().begin(); it != candidates().end(); ++it) { |
| if ((it->address() == addr.address) && (it->protocol() == proto_name)) { |
| LOG(INFO) << "Redundant relay address: " << proto_name << " @ " << addr.address.ToString(); |
| return; |
| } |
| } |
| AddAddress(addr.address, proto_name, false); |
| } |
| |
| void RelayPort::SetReady() { |
| if (!ready_) { |
| ready_ = true; |
| SignalAddressReady(this); |
| } |
| } |
| |
| const ProtocolAddress * RelayPort::ServerAddress(size_t index) const { |
| if ((index >= 0) && (index < server_addr_.size())) |
| return &server_addr_[index]; |
| return 0; |
| } |
| |
| bool RelayPort::HasMagicCookie(const char* data, size_t size) { |
| if (size < 24 + magic_cookie_.size()) { |
| return false; |
| } else { |
| return 0 == std::memcmp(data + 24, |
| magic_cookie_.c_str(), |
| magic_cookie_.size()); |
| } |
| } |
| |
| void RelayPort::PrepareAddress() { |
| // We initiate a connect on the first entry. If this completes, it will fill |
| // in the server address as the address of this port. |
| assert(entries_.size() == 1); |
| entries_[0]->Connect(); |
| ready_ = false; |
| } |
| |
| Connection* RelayPort::CreateConnection(const Candidate& address, CandidateOrigin origin) { |
| // We only create connections to non-udp sockets if they are incoming on this port |
| if ((address.protocol() != "udp") && (origin != ORIGIN_THIS_PORT)) |
| return 0; |
| |
| // We don't support loopback on relays |
| if (address.type() == type()) |
| return 0; |
| |
| size_t index = 0; |
| for (size_t i = 0; i < candidates().size(); ++i) { |
| const Candidate& local = candidates()[i]; |
| if (local.protocol() == address.protocol()) { |
| index = i; |
| break; |
| } |
| } |
| |
| Connection * conn = new ProxyConnection(this, index, address); |
| AddConnection(conn); |
| return conn; |
| } |
| |
| int RelayPort::SendTo(const void* data, size_t size, |
| const talk_base::SocketAddress& addr, bool payload) { |
| |
| // Try to find an entry for this specific address. Note that the first entry |
| // created was not given an address initially, so it can be set to the first |
| // address that comes along. |
| |
| RelayEntry* entry = 0; |
| |
| for (unsigned i = 0; i < entries_.size(); ++i) { |
| if (entries_[i]->address().IsAny() && payload) { |
| entry = entries_[i]; |
| entry->set_address(addr); |
| break; |
| } else if (entries_[i]->address() == addr) { |
| entry = entries_[i]; |
| break; |
| } |
| } |
| |
| // If we did not find one, then we make a new one. This will not be useable |
| // until it becomes connected, however. |
| if (!entry && payload) { |
| entry = new RelayEntry(this, addr, local_addr_); |
| if (!entries_.empty()) { |
| // Use the same port to connect to relay server |
| entry->SetServerIndex(entries_[0]->ServerIndex()); |
| } |
| entry->Connect(); |
| entries_.push_back(entry); |
| } |
| |
| // If the entry is connected, then we can send on it (though wrapping may |
| // still be necessary). Otherwise, we can't yet use this connection, so we |
| // default to the first one. |
| if (!entry || !entry->connected()) { |
| assert(!entries_.empty()); |
| entry = entries_[0]; |
| if (!entry->connected()) { |
| error_ = EWOULDBLOCK; |
| return SOCKET_ERROR; |
| } |
| } |
| |
| // Send the actual contents to the server using the usual mechanism. |
| int sent = entry->SendTo(data, size, addr); |
| if (sent <= 0) { |
| assert(sent < 0); |
| error_ = entry->GetError(); |
| return SOCKET_ERROR; |
| } |
| |
| // The caller of the function is expecting the number of user data bytes, |
| // rather than the size of the packet. |
| return (int)size; |
| } |
| |
| int RelayPort::SetOption(talk_base::Socket::Option opt, int value) { |
| int result = 0; |
| for (unsigned i = 0; i < entries_.size(); ++i) { |
| if (entries_[i]->socket()->SetOption(opt, value) < 0) { |
| result = -1; |
| error_ = entries_[i]->socket()->GetError(); |
| } |
| } |
| options_.push_back(OptionValue(opt, value)); |
| return result; |
| } |
| |
| int RelayPort::GetError() { |
| return error_; |
| } |
| |
| void RelayPort::OnReadPacket( |
| const char* data, size_t size, |
| const talk_base::SocketAddress& remote_addr) { |
| if (Connection* conn = GetConnection(remote_addr)) { |
| conn->OnReadPacket(data, size); |
| } else { |
| Port::OnReadPacket(data, size, remote_addr); |
| } |
| } |
| |
| void RelayPort::DisposeSocket(talk_base::AsyncPacketSocket * socket) { |
| thread_->Dispose(socket); |
| } |
| |
| RelayEntry::RelayEntry(RelayPort* port, |
| const talk_base::SocketAddress& ext_addr, |
| const talk_base::SocketAddress& local_addr) |
| : port_(port), ext_addr_(ext_addr), local_addr_(local_addr), server_index_(0), |
| socket_(0), connected_(false), locked_(false), requests_(port->thread()) { |
| |
| requests_.SignalSendPacket.connect(this, &RelayEntry::OnSendPacket); |
| } |
| |
| RelayEntry::~RelayEntry() { |
| delete socket_; |
| } |
| |
| void RelayEntry::Connect() { |
| assert(socket_ == 0); |
| const ProtocolAddress * ra = port()->ServerAddress(server_index_); |
| if (!ra) { |
| LOG(INFO) << "Out of relay server connections"; |
| return; |
| } |
| |
| LOG(INFO) << "Connecting to relay via " << ProtoToString(ra->proto) << " @ " << ra->address.ToString(); |
| |
| socket_ = port_->CreatePacketSocket(ra->proto); |
| assert(socket_ != 0); |
| |
| socket_->SignalReadPacket.connect(this, &RelayEntry::OnReadPacket); |
| if (socket_->Bind(local_addr_) < 0) |
| LOG(INFO) << "bind: " << std::strerror(socket_->GetError()); |
| |
| for (unsigned i = 0; i < port_->options().size(); ++i) |
| socket_->SetOption(port_->options()[i].first, port_->options()[i].second); |
| |
| if ((ra->proto == PROTO_TCP) || (ra->proto == PROTO_SSLTCP)) { |
| talk_base::AsyncTCPSocket * tcp |
| = static_cast<talk_base::AsyncTCPSocket *>(socket_); |
| tcp->SignalClose.connect(this, &RelayEntry::OnSocketClose); |
| tcp->SignalConnect.connect(this, &RelayEntry::OnSocketConnect); |
| tcp->Connect(ra->address); |
| } else { |
| requests_.Send(new AllocateRequest(this)); |
| } |
| } |
| |
| void RelayEntry::OnConnect(const talk_base::SocketAddress& mapped_addr) { |
| ProtocolType proto = PROTO_UDP; |
| LOG(INFO) << "Relay allocate succeeded: " << ProtoToString(proto) << " @ " << mapped_addr.ToString(); |
| connected_ = true; |
| |
| port_->AddExternalAddress(ProtocolAddress(mapped_addr, proto)); |
| port_->SetReady(); |
| } |
| |
| int RelayEntry::SendTo(const void* data, size_t size, |
| const talk_base::SocketAddress& addr) { |
| |
| // If this connection is locked to the address given, then we can send the |
| // packet with no wrapper. |
| if (locked_ && (ext_addr_ == addr)) |
| return SendPacket(data, size); |
| |
| // Otherwise, we must wrap the given data in a STUN SEND request so that we |
| // can communicate the destination address to the server. |
| // |
| // Note that we do not use a StunRequest here. This is because there is |
| // likely no reason to resend this packet. If it is late, we just drop it. |
| // The next send to this address will try again. |
| |
| StunMessage request; |
| request.SetType(STUN_SEND_REQUEST); |
| request.SetTransactionID(CreateRandomString(16)); |
| |
| StunByteStringAttribute* magic_cookie_attr = |
| StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); |
| magic_cookie_attr->CopyBytes(port_->magic_cookie().c_str(), |
| (uint16)port_->magic_cookie().size()); |
| request.AddAttribute(magic_cookie_attr); |
| |
| StunByteStringAttribute* username_attr = |
| StunAttribute::CreateByteString(STUN_ATTR_USERNAME); |
| username_attr->CopyBytes(port_->username_fragment().c_str(), |
| (uint16)port_->username_fragment().size()); |
| request.AddAttribute(username_attr); |
| |
| StunAddressAttribute* addr_attr = |
| StunAttribute::CreateAddress(STUN_ATTR_DESTINATION_ADDRESS); |
| addr_attr->SetFamily(1); |
| addr_attr->SetIP(addr.ip()); |
| addr_attr->SetPort(addr.port()); |
| request.AddAttribute(addr_attr); |
| |
| // Attempt to lock |
| if (ext_addr_ == addr) { |
| StunUInt32Attribute* options_attr = |
| StunAttribute::CreateUInt32(STUN_ATTR_OPTIONS); |
| options_attr->SetValue(0x1); |
| request.AddAttribute(options_attr); |
| } |
| |
| StunByteStringAttribute* data_attr = |
| StunAttribute::CreateByteString(STUN_ATTR_DATA); |
| data_attr->CopyBytes(data, (uint16)size); |
| request.AddAttribute(data_attr); |
| |
| // TODO: compute the HMAC. |
| |
| talk_base::ByteBuffer buf; |
| request.Write(&buf); |
| |
| return SendPacket(buf.Data(), buf.Length()); |
| } |
| |
| void RelayEntry::ScheduleKeepAlive() { |
| requests_.SendDelayed(new AllocateRequest(this), KEEPALIVE_DELAY); |
| } |
| |
| void RelayEntry::HandleConnectFailure() { |
| //if (GetMillisecondCount() - start_time_ > RETRY_TIMEOUT) |
| // return; |
| //ScheduleKeepAlive(); |
| |
| connected_ = false; |
| port()->DisposeSocket(socket_); |
| socket_ = 0; |
| requests_.Clear(); |
| |
| server_index_ += 1; |
| Connect(); |
| } |
| |
| void RelayEntry::OnSocketConnect(talk_base::AsyncTCPSocket* socket) { |
| assert(socket == socket_); |
| LOG(INFO) << "relay tcp connected to " << socket->GetRemoteAddress().ToString(); |
| requests_.Send(new AllocateRequest(this)); |
| } |
| |
| void RelayEntry::OnSocketClose(talk_base::AsyncTCPSocket* socket, int error) { |
| assert(socket == socket_); |
| PLOG(LERROR, error) << "relay tcp connect failed"; |
| HandleConnectFailure(); |
| } |
| |
| void RelayEntry::OnReadPacket(const char* data, size_t size, |
| const talk_base::SocketAddress& remote_addr, |
| talk_base::AsyncPacketSocket* socket) { |
| assert(socket == socket_); |
| //assert(remote_addr == port_->server_addr()); TODO: are we worried about this? |
| |
| // If the magic cookie is not present, then this is an unwrapped packet sent |
| // by the server, The actual remote address is the one we recorded. |
| if (!port_->HasMagicCookie(data, size)) { |
| if (locked_) { |
| port_->OnReadPacket(data, size, ext_addr_); |
| } else { |
| LOG(WARNING) << "Dropping packet: entry not locked"; |
| } |
| return; |
| } |
| |
| talk_base::ByteBuffer buf(data, size); |
| StunMessage msg; |
| if (!msg.Read(&buf)) { |
| LOG(INFO) << "Incoming packet was not STUN"; |
| return; |
| } |
| |
| // The incoming packet should be a STUN ALLOCATE response, SEND response, or |
| // DATA indication. |
| if (requests_.CheckResponse(&msg)) { |
| return; |
| } else if (msg.type() == STUN_SEND_RESPONSE) { |
| if (const StunUInt32Attribute* options_attr = msg.GetUInt32(STUN_ATTR_OPTIONS)) { |
| if (options_attr->value() & 0x1) { |
| locked_ = true; |
| } |
| } |
| return; |
| } else if (msg.type() != STUN_DATA_INDICATION) { |
| LOG(INFO) << "Received BAD stun type from server: " << msg.type() |
| ; |
| return; |
| } |
| |
| // This must be a data indication. |
| |
| const StunAddressAttribute* addr_attr = |
| msg.GetAddress(STUN_ATTR_SOURCE_ADDRESS2); |
| if (!addr_attr) { |
| LOG(INFO) << "Data indication has no source address"; |
| return; |
| } else if (addr_attr->family() != 1) { |
| LOG(INFO) << "Source address has bad family"; |
| return; |
| } |
| |
| talk_base::SocketAddress remote_addr2(addr_attr->ip(), addr_attr->port()); |
| |
| const StunByteStringAttribute* data_attr = msg.GetByteString(STUN_ATTR_DATA); |
| if (!data_attr) { |
| LOG(INFO) << "Data indication has no data"; |
| return; |
| } |
| |
| // Process the actual data and remote address in the normal manner. |
| port_->OnReadPacket(data_attr->bytes(), data_attr->length(), remote_addr2); |
| } |
| |
| void RelayEntry::OnSendPacket(const void* data, size_t size, StunRequest* req) { |
| SendPacket(data, size); |
| } |
| |
| int RelayEntry::SendPacket(const void* data, size_t size) { |
| const ProtocolAddress * ra = port_->ServerAddress(server_index_); |
| if (!ra) { |
| if (socket_) |
| socket_->SetError(ENOTCONN); |
| return SOCKET_ERROR; |
| } |
| int sent = socket_->SendTo(data, size, ra->address); |
| if (sent <= 0) { |
| LOG(LS_VERBOSE) << "sendto: " << std::strerror(socket_->GetError()); |
| assert(sent < 0); |
| } |
| return sent; |
| } |
| |
| AllocateRequest::AllocateRequest(RelayEntry* entry) : entry_(entry) { |
| start_time_ = talk_base::GetMillisecondCount(); |
| } |
| |
| void AllocateRequest::Prepare(StunMessage* request) { |
| request->SetType(STUN_ALLOCATE_REQUEST); |
| |
| StunByteStringAttribute* magic_cookie_attr = |
| StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); |
| magic_cookie_attr->CopyBytes( |
| entry_->port()->magic_cookie().c_str(), |
| (uint16)entry_->port()->magic_cookie().size()); |
| request->AddAttribute(magic_cookie_attr); |
| |
| StunByteStringAttribute* username_attr = |
| StunAttribute::CreateByteString(STUN_ATTR_USERNAME); |
| username_attr->CopyBytes( |
| entry_->port()->username_fragment().c_str(), |
| (uint16)entry_->port()->username_fragment().size()); |
| request->AddAttribute(username_attr); |
| } |
| |
| int AllocateRequest::GetNextDelay() { |
| int delay = 100 * talk_base::_max(1 << count_, 2); |
| count_ += 1; |
| if (count_ == 5) |
| timeout_ = true; |
| return delay; |
| } |
| |
| void AllocateRequest::OnResponse(StunMessage* response) { |
| const StunAddressAttribute* addr_attr = |
| response->GetAddress(STUN_ATTR_MAPPED_ADDRESS); |
| if (!addr_attr) { |
| LOG(INFO) << "Allocate response missing mapped address."; |
| } else if (addr_attr->family() != 1) { |
| LOG(INFO) << "Mapped address has bad family"; |
| } else { |
| talk_base::SocketAddress addr(addr_attr->ip(), addr_attr->port()); |
| entry_->OnConnect(addr); |
| } |
| |
| // We will do a keep-alive regardless of whether this request suceeds. |
| // This should have almost no impact on network usage. |
| entry_->ScheduleKeepAlive(); |
| } |
| |
| void AllocateRequest::OnErrorResponse(StunMessage* response) { |
| const StunErrorCodeAttribute* attr = response->GetErrorCode(); |
| if (!attr) { |
| LOG(INFO) << "Bad allocate response error code"; |
| } else { |
| LOG(INFO) << "Allocate error response:" |
| << " code=" << static_cast<int>(attr->error_code()) |
| << " reason='" << attr->reason() << "'"; |
| } |
| |
| if (talk_base::GetMillisecondCount() - start_time_ <= RETRY_TIMEOUT) |
| entry_->ScheduleKeepAlive(); |
| } |
| |
| void AllocateRequest::OnTimeout() { |
| LOG(INFO) << "Allocate request timed out"; |
| entry_->HandleConnectFailure(); |
| } |
| |
| } // namespace cricket |