blob: 884b3244343daacbd0c6306dbbb689088bbcde7c [file] [log] [blame]
/*
* libjingle
* Copyright 2004--2005, Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <iostream>
#include <vector>
#include <errno.h>
#include "talk/base/virtualsocketserver.h"
#include "talk/base/common.h"
#include "talk/base/time.h"
namespace talk_base {
// Per-packet overhead, in bytes, charged against a socket's send queue in
// addition to the payload: a 20-byte IPv4 header plus an 8-byte UDP header.
const uint32 HEADER_SIZE = 20 + 8;
// Message id used when posting a Packet to the receiving VirtualSocket.
const uint32 MSG_ID_PACKET = 1;
// TODO: Add a message type for new connections.
// Packets are passed between sockets as messages. We copy the data just like
// the kernel does.
//
// A Packet owns a private heap copy of the payload together with the address
// it was sent from. Because it owns a raw buffer it must not be copied.
class Packet : public MessageData {
 public:
  // Copies |size| bytes from |data|; |from| is the sender's address.
  Packet(const char* data, size_t size, const SocketAddress& from)
      : data_(new char[size]), size_(size), from_(from) {
    assert(data);
    std::memcpy(data_, data, size_);
  }
  virtual ~Packet() {
    // The buffer comes from new[], so it must be released with delete[].
    delete[] data_;
  }
  const char* data() const { return data_; }
  size_t size() const { return size_; }
  const SocketAddress& from() const { return from_; }
  // Remove the first |size| bytes from the data, keeping the remainder.
  // |size| must be strictly smaller than the current size.
  void Consume(size_t size) {
    assert(size < size_);
    size_ -= size;
    char* new_data = new char[size_];
    // The bytes that remain begin |size| bytes into the old buffer, and there
    // are exactly size_ (the new size) of them.
    std::memcpy(new_data, data_ + size, size_);
    delete[] data_;
    data_ = new_data;
  }
 private:
  // Not copyable: a copy would share data_ and free it twice.
  Packet(const Packet&);
  Packet& operator=(const Packet&);

  char* data_;
  size_t size_;
  SocketAddress from_;
};
// Implements the socket interface using the virtual network. Packets are
// passed as messages using the message queue of the socket server.
class VirtualSocket : public AsyncSocket, public MessageHandler {
public:
VirtualSocket(
VirtualSocketServer* server, int type, bool async, uint32 ip)
: server_(server), type_(type), async_(async), connected_(false),
local_ip_(ip), readable_(true), queue_size_(0) {
assert((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
packets_ = new std::vector<Packet*>();
}
~VirtualSocket() {
Close();
for (unsigned i = 0; i < packets_->size(); i++)
delete (*packets_)[i];
delete packets_;
}
SocketAddress GetLocalAddress() const {
return local_addr_;
}
SocketAddress GetRemoteAddress() const {
return remote_addr_;
}
int Bind(const SocketAddress& addr) {
assert(addr.port() != 0);
int result = server_->Bind(addr, this);
if (result >= 0)
local_addr_ = addr;
else
error_ = EADDRINUSE;
return result;
}
int Connect(const SocketAddress& addr) {
assert(!connected_);
connected_ = true;
remote_addr_ = addr;
assert(type_ == SOCK_DGRAM); // stream not yet implemented
return 0;
}
int Close() {
if (!local_addr_.IsAny())
server_->Unbind(local_addr_, this);
connected_ = false;
local_addr_ = SocketAddress();
remote_addr_ = SocketAddress();
return 0;
}
int Send(const void *pv, size_t cb) {
assert(connected_);
return SendInternal(pv, cb, remote_addr_);
}
int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
assert(!connected_);
return SendInternal(pv, cb, addr);
}
int SendInternal(const void *pv, size_t cb, const SocketAddress& addr) {
// If we have not been assigned a local port, then get one.
if (local_addr_.IsAny()) {
local_addr_.SetIP(local_ip_);
int result = server_->Bind(this, &local_addr_);
if (result < 0) {
local_addr_.SetIP(0);
error_ = EADDRINUSE;
return result;
}
}
// Send the data in a message to the appropriate socket.
return server_->Send(this, pv, cb, local_addr_, addr);
}
int Recv(void *pv, size_t cb) {
SocketAddress addr;
return RecvFrom(pv, cb, &addr);
}
int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
// If we don't have a packet, then either error or wait for one to arrive.
if (packets_->size() == 0) {
if (async_) {
error_ = EAGAIN;
return -1;
}
while (packets_->size() == 0) {
Message msg;
server_->msg_queue_->Get(&msg);
server_->msg_queue_->Dispatch(&msg);
}
}
// Return the packet at the front of the queue.
Packet* packet = packets_->front();
*paddr = packet->from();
int size = (int)packet->size();
if (size <= (int)cb) {
std::memcpy(pv, packet->data(), size);
packets_->erase(packets_->begin());
delete packet;
return size;
} else {
std::memcpy(pv, packet->data(), cb);
packet->Consume(cb);
return (int)cb;
}
}
int Listen(int backlog) {
assert(false); // not yet implemented
return 0;
}
Socket* Accept(SocketAddress *paddr) {
assert(false); // not yet implemented
return 0;
}
bool readable() { return readable_; }
void set_readable(bool value) { readable_ = value; }
bool writable() { return false; }
void set_writable(bool value) {
// TODO: Send ourselves messages (delayed after the first) to give them a
// chance to write.
assert(false);
}
int GetError() const {
return error_;
}
void SetError(int error) {
error_ = error;
}
ConnState GetState() const {
return connected_ ? CS_CONNECTED : CS_CLOSED;
}
int SetOption(Option opt, int value) {
return 0;
}
int EstimateMTU(uint16* mtu) {
if (!connected_)
return ENOTCONN;
else
return 65536;
}
void OnMessage(Message *pmsg) {
if (pmsg->message_id == MSG_ID_PACKET) {
assert(pmsg->pdata);
Packet* packet = static_cast<Packet*>(pmsg->pdata);
if (!readable_)
return;
packets_->push_back(packet);
if (async_) {
SignalReadEvent(this);
// TODO: If the listeners don't want to read this packet now, we will
// need to send ourselves delayed messages to try again.
assert(packets_->size() == 0);
}
} else {
assert(false);
}
}
private:
struct QueueEntry {
uint32 size;
uint32 done_time;
};
typedef std::deque<QueueEntry> SendQueue;
VirtualSocketServer* server_;
int type_;
bool async_;
bool connected_;
uint32 local_ip_;
bool readable_;
SocketAddress local_addr_;
SocketAddress remote_addr_;
std::vector<Packet*>* packets_;
int error_;
SendQueue queue_;
uint32 queue_size_;
CriticalSection queue_crit_;
friend class VirtualSocketServer;
};
VirtualSocketServer::VirtualSocketServer()
    : fWait_(false), wait_version_(0), next_ip_(1), next_port_(45000),
      bandwidth_(0), queue_capacity_(64 * 1024), delay_mean_(0),
      delay_stddev_(0), delay_dist_(0), drop_prob_(0.0) {
  // Address -> socket table consulted by Bind, Unbind and Send.
  bindings_ = new AddressMap();
  // Private message queue (uses the physical socket server for Wait).
  msg_queue_ = new MessageQueue();
  // delay_dist_ starts out null, so this only builds the initial table.
  UpdateDelayDistribution();
}
VirtualSocketServer::~VirtualSocketServer() {
  // The sampled delay table is a standalone heap object built by
  // UpdateDelayDistribution; release it first.
  delete delay_dist_;
  // Then the objects allocated in the constructor.
  delete bindings_;
  delete msg_queue_;
}
// Hands out virtual IPs sequentially: returns the current value and advances
// the counter for the next caller.
uint32 VirtualSocketServer::GetNextIP() {
  uint32 issued = next_ip_;
  ++next_ip_;
  return issued;
}
// Both factory methods hand out the same kind of object: an asynchronous
// VirtualSocket (see CreateSocketInternal).
Socket* VirtualSocketServer::CreateSocket(int type) {
  VirtualSocket* created = CreateSocketInternal(type);
  return created;
}
AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
  VirtualSocket* created = CreateSocketInternal(type);
  return created;
}
// Creates an async socket whose default local IP is the most recently issued
// one, or 1 if GetNextIP has not been called yet.
VirtualSocket* VirtualSocketServer::CreateSocketInternal(int type) {
  uint32 ip = 1;
  if (next_ip_ > 1)
    ip = next_ip_ - 1;
  return new VirtualSocket(this, type, true, ip);
}
// Dispatches messages from the private queue until WakeUp() is called for
// this wait, the queue reports no message, or |cmsWait| milliseconds elapse
// (unless cmsWait is kForever). Always returns true.
bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
  ASSERT(process_io);  // This can't be easily supported.
  // Absolute deadline; only consulted when cmsWait != kForever.
  uint32 msEnd = 0;
  if (cmsWait != kForever)
    msEnd = GetMillisecondCount() + cmsWait;
  // Kept as int, like cmsWait, so kForever reaches Get() unchanged instead of
  // being round-tripped through an unsigned type.
  int cmsNext = cmsWait;
  fWait_ = true;
  // Bumping the version makes wake-ups posted for earlier waits stale.
  wait_version_ += 1;
  while (fWait_) {
    Message msg;
    if (!msg_queue_->Get(&msg, cmsNext))
      return true;
    msg_queue_->Dispatch(&msg);
    if (cmsWait != kForever) {
      // Signed difference: stays correct when the millisecond counter wraps.
      int cmsRemaining = static_cast<int>(msEnd - GetMillisecondCount());
      if (cmsRemaining <= 0)
        return true;
      cmsNext = cmsRemaining;
    }
  }
  return true;
}
// Message id for the wake-ups that WakeUp() posts to this server.
const uint32 MSG_WAKE_UP = 1;
// Payload of a wake-up: the wait_version_ current when it was posted.
// OnMessage only ends the wait if that version is still current.
struct WakeUpMessage : public MessageData {
  explicit WakeUpMessage(uint32 ver) : wait_version(ver) {}
  virtual ~WakeUpMessage() {}
  uint32 wait_version;
};
void VirtualSocketServer::WakeUp() {
msg_queue_->Post(this, MSG_WAKE_UP, new WakeUpMessage(wait_version_));
}
// Handles MSG_WAKE_UP: ends the current Wait() loop if the message was posted
// during that wait, then frees the payload.
void VirtualSocketServer::OnMessage(Message* pmsg) {
  assert(pmsg->message_id == MSG_WAKE_UP);
  assert(pmsg->pdata);
  WakeUpMessage* wake = static_cast<WakeUpMessage*>(pmsg->pdata);
  const bool is_current = (wake->wait_version == wait_version_);
  delete pmsg->pdata;
  if (is_current)
    fWait_ = false;
}
// Binds |socket| to the explicit address |addr|. Returns 0 on success, or -1
// if another socket already holds that address.
int VirtualSocketServer::Bind(
    const SocketAddress& addr, VirtualSocket* socket) {
  assert(addr.ip() > 0);  // don't support any-address right now
  assert(addr.port() > 0);
  assert(socket);
  const bool taken = (bindings_->find(addr) != bindings_->end());
  if (taken)
    return -1;
  (*bindings_)[addr] = socket;
  return 0;
}
// Picks a free port for |addr| (whose IP is already set) by trying up to
// 65536 candidates from next_port_ onward, skipping port 0. On success,
// records the binding and returns 0; otherwise sets errno and returns -1.
int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
  assert(addr->ip() > 0);  // don't support any-address right now
  assert(socket);
  for (int attempt = 0; attempt < 65536; ++attempt) {
    addr->SetPort(next_port_++);
    if (addr->port() == 0)
      continue;
    if (bindings_->find(*addr) != bindings_->end())
      continue;
    (*bindings_)[*addr] = socket;
    return 0;
  }
  errno = EADDRINUSE;  // TODO: is there a better error number?
  return -1;
}
int VirtualSocketServer::Unbind(
const SocketAddress& addr, VirtualSocket* socket) {
assert((*bindings_)[addr] == socket);
bindings_->erase(bindings_->find(addr));
return 0;
}
// Returns a pseudo-random value in [0, 1] derived from rand().
static double Random() {
  const double numerator = static_cast<double>(rand());
  return numerator / RAND_MAX;
}
// Simulates sending |cb| bytes from |local_addr| to |remote_addr| on behalf
// of |socket|. The packet is randomly dropped with probability drop_prob_,
// dropped if the socket's send queue would exceed queue_capacity_, and
// otherwise delivered after the queueing delay plus a random transit delay.
// Like a real datagram network, losses are not visible to the sender: the
// return value is |cb| whether or not the packet arrives.
int VirtualSocketServer::Send(
    VirtualSocket* socket, const void *pv, size_t cb,
    const SocketAddress& local_addr, const SocketAddress& remote_addr) {
  // See if we want to drop this packet.
  if (Random() < drop_prob_) {
    std::cerr << "Dropping packet: bad luck" << std::endl;
    // Lost in transit: the sender still sees a successful send.
    return static_cast<int>(cb);
  }
  uint32 cur_time = GetMillisecondCount();
  uint32 send_delay;
  // Determine whether we have enough bandwidth to accept this packet. To do
  // this, we need to update the send queue. Once we know its current size,
  // we know whether we can fit this packet.
  //
  // NOTE: There are better algorithms for maintaining such a queue (such as
  // "Derivative Random Drop"); however, this algorithm is a more accurate
  // simulation of what a normal network would do.
  {
    CritScope cs(&socket->queue_crit_);
    // Retire entries whose transmission has already completed.
    while (!socket->queue_.empty() &&
           (socket->queue_.front().done_time <= cur_time)) {
      assert(socket->queue_size_ >= socket->queue_.front().size);
      socket->queue_size_ -= socket->queue_.front().size;
      socket->queue_.pop_front();
    }
    VirtualSocket::QueueEntry entry;
    entry.size = uint32(cb) + HEADER_SIZE;
    if (socket->queue_size_ + entry.size > queue_capacity_) {
      std::cerr << "Dropping packet: queue capacity exceeded" << std::endl;
      // Not an error: the packet is simply lost, as in a real network.
      return static_cast<int>(cb);
    }
    socket->queue_size_ += entry.size;
    send_delay = SendDelay(socket->queue_size_);
    entry.done_time = cur_time + send_delay;
    socket->queue_.push_back(entry);
  }
  // Find the delay for crossing the many virtual hops of the network.
  uint32 transit_delay = GetRandomTransitDelay();
  // Post the packet as a message to be delivered (on our own thread).
  AddressMap::iterator iter = bindings_->find(remote_addr);
  if (iter != bindings_->end()) {
    Packet* p = new Packet(static_cast<const char*>(pv), cb, local_addr);
    uint32 delay = send_delay + transit_delay;
    msg_queue_->PostDelayed(delay, iter->second, MSG_ID_PACKET, p);
  } else {
    std::cerr << "No one listening at " << remote_addr.ToString() << std::endl;
  }
  return static_cast<int>(cb);
}
// Time needed to push |size| queued bytes through the link. The result is in
// milliseconds (Send adds it to GetMillisecondCount()), so bandwidth_ is
// presumably bytes per second. A bandwidth of 0 means no serialization delay.
uint32 VirtualSocketServer::SendDelay(uint32 size) {
  if (bandwidth_ == 0)
    return 0;
  // Widen before multiplying: 1000 * size overflows 32 bits once size
  // exceeds about 4 MB.
  return static_cast<uint32>(
      static_cast<unsigned long long>(size) * 1000 / bandwidth_);
}
// Debug helper: writes every (x, y) point of |f| to stdout as a
// tab-separated pair, one point per line.
void PrintFunction(std::vector<std::pair<double,double> >* f) {
  typedef std::vector<std::pair<double,double> >::const_iterator PointIter;
  for (PointIter it = f->begin(); it != f->end(); ++it)
    std::cout << it->first << '\t' << it->second << std::endl;
}
void VirtualSocketServer::UpdateDelayDistribution() {
Function* dist = GetDelayDistribution();
dist = Resample(Invert(Accumulate(dist)), 0, 1);
// We take a lock just to make sure we don't leak memory.
{
CritScope cs(&delay_crit_);
delete delay_dist_;
delay_dist_ = dist;
}
}
// Number of points used to sample the delay density and its inverse CDF.
const int NUM_SAMPLES = 100;
static const double PI = 4 * std::atan(1.0);
// Probability density of the normal distribution with the given mean and
// standard deviation, evaluated at |x|. |stddev| must be non-zero.
static double Normal(double x, double mean, double stddev) {
  double a = (x - mean) * (x - mean) / (2 * stddev * stddev);
  return std::exp(-a) / (stddev * std::sqrt(2 * PI));
}
#if 0  // static unused gives a warning
// Pareto density with scale |min| and shape |k|; zero below |min|.
static double Pareto(double x, double min, double k) {
  if (x < min)
    return 0;
  const double scale_term = k * std::pow(min, k);
  return scale_term / std::pow(x, k + 1);
}
#endif
// Returns a newly allocated sampling of the transit-delay density (caller
// owns it). With zero stddev the result is a single point at delay_mean_.
// Otherwise it holds NUM_SAMPLES evenly spaced samples of the normal density
// over delay_mean_ +/- 4 stddev, with the lower end clamped at 0 so that no
// negative delays are sampled.
VirtualSocketServer::Function* VirtualSocketServer::GetDelayDistribution() {
  Function* f = new Function();
  if (delay_stddev_ == 0) {
    f->push_back(Point(delay_mean_, 1.0));
  } else {
    const double stddev = double(delay_stddev_);
    double start = 0;
    if (delay_mean_ >= 4 * stddev)
      start = delay_mean_ - 4 * stddev;
    double end = delay_mean_ + 4 * stddev;
    for (int i = 0; i < NUM_SAMPLES; i++) {
      double x = start + (end - start) * i / (NUM_SAMPLES - 1);
      double y = Normal(x, delay_mean_, stddev);
      f->push_back(Point(x, y));
    }
  }
  return f;
}
// Draws a transit delay by picking a random entry of the inverse-CDF table.
uint32 VirtualSocketServer::GetRandomTransitDelay() {
  // UpdateDelayDistribution replaces and deletes delay_dist_ under
  // delay_crit_; hold the same lock so we never read a table being freed.
  CritScope cs(&delay_crit_);
  size_t index = rand() % delay_dist_->size();
  double delay = (*delay_dist_)[index].second;
  return uint32(delay);
}
// Orders function points by their x coordinate (the pair's first member).
// The mixed overloads let heterogeneous searches such as std::lower_bound
// compare a bare x value against a point in either argument order.
struct FunctionDomainCmp {
  bool operator()(const VirtualSocketServer::Point& p1,
                  const VirtualSocketServer::Point& p2) const {
    return p1.first < p2.first;
  }
  bool operator()(double v1, const VirtualSocketServer::Point& p2) const {
    return v1 < p2.first;
  }
  bool operator()(const VirtualSocketServer::Point& p1, double v2) const {
    return p1.first < v2;
  }
};
// Turns a sampled density into its running integral, in place, and returns
// |f|. Afterwards point i holds the integral from the first x up to x_i
// (so the first point is 0), computed with the trapezoid rule.
VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
  assert(f->size() >= 1);
  if (f->empty())
    return f;  // avoids size() - 1 underflow when asserts are disabled
  double v = 0;
  for (Function::size_type i = 0; i + 1 < f->size(); ++i) {
    double dx = (*f)[i + 1].first - (*f)[i].first;
    // Read both original y values before overwriting point i.
    double avg_y = ((*f)[i].second + (*f)[i + 1].second) / 2;
    (*f)[i].second = v;
    v += avg_y * dx;
  }
  (*f)[f->size() - 1].second = v;
  return f;
}
// Swaps the roles of x and y for every point in place, then re-sorts by the
// new x so the result is again ordered by its domain. Returns |f|.
VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {
  for (Function::iterator it = f->begin(); it != f->end(); ++it)
    std::swap(it->first, it->second);
  std::sort(f->begin(), f->end(), FunctionDomainCmp());
  return f;
}
// Evaluates |f| at NUM_SAMPLES evenly spaced points on [x1, x2] and returns
// the new function. Takes ownership of |f| and deletes it.
VirtualSocketServer::Function* VirtualSocketServer::Resample(
    Function* f, double x1, double x2) {
  Function* resampled = new Function();
  const double span = x2 - x1;
  for (int i = 0; i < NUM_SAMPLES; ++i) {
    const double x = x1 + span * i / (NUM_SAMPLES - 1);
    resampled->push_back(Point(x, Evaluate(f, x)));
  }
  delete f;
  return resampled;
}
// Evaluates the piecewise-linear function |f| (sorted by x) at |x|. Values
// outside the sampled domain are clamped to the first/last sample.
double VirtualSocketServer::Evaluate(Function* f, double x) {
  Function::iterator upper =
      std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
  // At or before the first sample.
  if (upper == f->begin())
    return f->front().second;
  // Beyond the last sample.
  if (upper == f->end()) {
    assert(f->size() >= 1);
    return f->back().second;
  }
  // Exactly on a sample point.
  if (upper->first == x)
    return upper->second;
  // Strictly between two samples: interpolate linearly.
  Function::iterator lower = upper - 1;
  return lower->second + (upper->second - lower->second) *
      (x - lower->first) / (upper->first - lower->first);
}
} // namespace talk_base