blob: d5c4a3ba661503ba8844e2cc1ef6417ddc9c0d91 [file] [log] [blame]
/*
* libjingle
* Copyright 2007, Google Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "talk/base/socket_unittest.h"
#include "talk/base/asyncudpsocket.h"
#include "talk/base/gunit.h"
#include "talk/base/socketserver.h"
#include "talk/base/testclient.h"
#include "talk/base/testutils.h"
#include "talk/base/thread.h"
namespace talk_base {
static const SocketAddress kEmptyAddr;
static const SocketAddress kLoopbackAddr(INADDR_LOOPBACK, 0);
void SocketTest::TestConnect() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
EXPECT_EQ(AsyncSocket::CS_CLOSED, client->GetState());
EXPECT_EQ(kEmptyAddr, client->GetLocalAddress());
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
EXPECT_EQ(AsyncSocket::CS_CONNECTING, server->GetState());
// Ensure no pending server connections, since we haven't done anything yet.
EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
EXPECT_TRUE(NULL == server->Accept(&accept_addr));
EXPECT_EQ(kEmptyAddr, accept_addr);
// Attempt connect to listening socket.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
EXPECT_NE(kEmptyAddr, client->GetLocalAddress()); // Implicit Bind
EXPECT_NE(server->GetLocalAddress(), client->GetLocalAddress());
// Client is connecting, outcome not yet determined.
EXPECT_EQ(AsyncSocket::CS_CONNECTING, client->GetState());
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
// Server has pending connection, accept it.
EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
EXPECT_NE(kEmptyAddr, accept_addr);
EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
// Connected from server perspective, check the addresses are correct.
EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
EXPECT_EQ(server->GetLocalAddress(), accepted->GetLocalAddress());
EXPECT_EQ(client->GetLocalAddress(), accepted->GetRemoteAddress());
// Connected from client perspective, check the addresses are correct.
EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
}
void SocketTest::TestConnectWithDnsLookup() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Attempt connect to listening socket.
SocketAddress dns_addr(server->GetLocalAddress());
dns_addr.SetIP("localhost");
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
// TODO: Bind when doing DNS lookup.
//EXPECT_NE(kEmptyAddr, client->GetLocalAddress()); // Implicit Bind
// Client is connecting, outcome not yet determined.
EXPECT_EQ(AsyncSocket::CS_CONNECTING, client->GetState());
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
// Server has pending connection, accept it.
EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
EXPECT_NE(kEmptyAddr, accept_addr);
EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
// Connected from server perspective, check the addresses are correct.
EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
EXPECT_EQ(server->GetLocalAddress(), accepted->GetLocalAddress());
EXPECT_EQ(client->GetLocalAddress(), accepted->GetRemoteAddress());
// Connected from client perspective, check the addresses are correct.
EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
}
void SocketTest::TestConnectFail() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server, but don't listen yet.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
// Attempt connect to a non-existent socket.
// We don't connect to the server socket created above, since on
// MacOS it takes about 75 seconds to get back an error!
SocketAddress bogus_addr(INADDR_LOOPBACK, 65535);
EXPECT_EQ(0, client->Connect(bogus_addr));
// Wait for connection to fail (ECONNREFUSED).
EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
EXPECT_EQ(kEmptyAddr, client->GetRemoteAddress());
// Should be no pending server connections.
EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
EXPECT_TRUE(NULL == server->Accept(&accept_addr));
EXPECT_EQ(kEmptyAddr, accept_addr);
}
void SocketTest::TestConnectWithDnsLookupFail() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server, but don't listen yet.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
// Attempt connect to a non-existent host.
// We don't connect to the server socket created above, since on
// MacOS it takes about 75 seconds to get back an error!
SocketAddress bogus_dns_addr("not-a-real-hostname", 65535);
EXPECT_EQ(0, client->Connect(bogus_dns_addr));
// Wait for connection to fail (EHOSTNOTFOUND).
EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
EXPECT_EQ(kEmptyAddr, client->GetRemoteAddress());
// Should be no pending server connections.
EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
EXPECT_TRUE(NULL == server->Accept(&accept_addr));
EXPECT_EQ(kEmptyAddr, accept_addr);
}
void SocketTest::TestConnectWithClosedSocket() {
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Create a client and put in to CS_CLOSED state.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
EXPECT_EQ(0, client->Close());
EXPECT_EQ(AsyncSocket::CS_CLOSED, client->GetState());
// Connect() should reinitialize the socket, and put it in to CS_CONNECTING.
EXPECT_EQ(0, client->Connect(SocketAddress(server->GetLocalAddress())));
EXPECT_EQ(AsyncSocket::CS_CONNECTING, client->GetState());
}
void SocketTest::TestServerCloseDuringConnect() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Attempt connect to listening socket.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
// Close down the server while the socket is in the accept queue.
EXPECT_TRUE_WAIT(sink.Check(server.get(), testing::SSE_READ), kTimeout);
server->Close();
// This should fail the connection for the client. Clean up.
EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
client->Close();
}
void SocketTest::TestClientCloseDuringConnect() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Attempt connect to listening socket.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
// Close down the client while the socket is in the accept queue.
EXPECT_TRUE_WAIT(sink.Check(server.get(), testing::SSE_READ), kTimeout);
client->Close();
// The connection should still be able to be accepted.
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
sink.Monitor(accepted.get());
EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
// The accepted socket should then close (possibly with err, timing-related)
EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, accepted->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE) ||
sink.Check(accepted.get(), testing::SSE_ERROR));
// The client should not get a close event.
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
}
void SocketTest::TestServerClose() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Attempt connection.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
// Accept connection.
EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
sink.Monitor(accepted.get());
// Both sides are now connected.
EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
// Send data to the client, and then close the connection.
EXPECT_EQ(1, accepted->Send("a", 1));
accepted->Close();
EXPECT_EQ(AsyncSocket::CS_CLOSED, accepted->GetState());
// Expect that the client is notified, and has not yet closed.
EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout);
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_EQ(AsyncSocket::CS_CONNECTED, client->GetState());
// Ensure the data can be read.
char buffer[10];
EXPECT_EQ(1, client->Recv(buffer, sizeof(buffer)));
EXPECT_EQ('a', buffer[0]);
// Now we should close, but the remote address will remain.
EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_NE(kEmptyAddr, client->GetRemoteAddress());
// The closer should not get a close signal.
EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_CLOSE));
EXPECT_EQ(kEmptyAddr, accepted->GetRemoteAddress());
// And the closee should only get a single signal.
Thread::Current()->ProcessMessages(0);
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
// Close down the client and ensure all is good.
client->Close();
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_EQ(kEmptyAddr, client->GetRemoteAddress());
}
class SocketCloser : public sigslot::has_slots<> {
public:
void OnClose(AsyncSocket* socket, int error) {
socket->Close(); // Deleting here would blow up the vector of handlers
// for the socket's signal.
}
};
void SocketTest::TestCloseInClosedCallback() {
testing::StreamSink sink;
SocketCloser closer;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
client->SignalCloseEvent.connect(&closer, &SocketCloser::OnClose);
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Attempt connection.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
// Accept connection.
EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
sink.Monitor(accepted.get());
// Both sides are now connected.
EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
// Send data to the client, and then close the connection.
accepted->Close();
EXPECT_EQ(AsyncSocket::CS_CLOSED, accepted->GetState());
// Expect that the client is notified, and has not yet closed.
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_EQ(AsyncSocket::CS_CONNECTED, client->GetState());
// Now we should be closed and invalidated
EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_TRUE(Socket::CS_CLOSED == client->GetState());
}
class Sleeper : public MessageHandler {
public:
Sleeper() {}
void OnMessage(Message* msg) {
Thread::Current()->SleepMs(500);
}
};
void SocketTest::TestSocketServerWait() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create & connect server and client sockets.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
sink.Monitor(accepted.get());
EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
EXPECT_EQ(server->GetLocalAddress(), accepted->GetLocalAddress());
EXPECT_EQ(client->GetLocalAddress(), accepted->GetRemoteAddress());
EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
// Do an i/o operation, triggering an eventual callback.
EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
char buf[1024] = {0};
EXPECT_EQ(1024, client->Send(buf, 1024));
EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
// Shouldn't signal when blocked in a thread Send, where process_io is false.
scoped_ptr<Thread> thread(new Thread());
thread->Start();
Sleeper sleeper;
TypedMessageData<AsyncSocket*> data(client.get());
thread->Send(&sleeper, 0, &data);
EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
// But should signal when process_io is true.
EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout);
EXPECT_LT(0, accepted->Recv(buf, 1024));
}
void SocketTest::TestTcp() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create test data.
const size_t kDataSize = 1024 * 1024;
scoped_array<char> send_buffer(new char[kDataSize]);
scoped_array<char> recv_buffer(new char[kDataSize]);
size_t send_pos = 0, recv_pos = 0;
for (size_t i = 0; i < kDataSize; ++i) {
send_buffer[i] = i;
recv_buffer[i] = 0;
}
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Attempt connection.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
// Accept connection.
EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
sink.Monitor(accepted.get());
// Both sides are now connected.
EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
// Send and receive a bunch of data.
bool send_waiting_for_writability = false;
bool send_expect_success = true;
bool recv_waiting_for_readability = true;
bool recv_expect_success = false;
int data_in_flight = 0;
while (recv_pos < kDataSize) {
// Send as much as we can if we've been cleared to send.
while (!send_waiting_for_writability && send_pos < kDataSize) {
int tosend = kDataSize - send_pos;
int sent = accepted->Send(send_buffer.get() + send_pos, tosend);
if (send_expect_success) {
// The first Send() after connecting or getting writability should
// succeed and send some data.
EXPECT_GT(sent, 0);
send_expect_success = false;
}
if (sent >= 0) {
EXPECT_LE(sent, tosend);
send_pos += sent;
data_in_flight += sent;
} else {
ASSERT_TRUE(accepted->IsBlocking());
send_waiting_for_writability = true;
}
}
// Read all the sent data.
while (data_in_flight > 0) {
if (recv_waiting_for_readability) {
// Wait until data is available.
EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout);
recv_waiting_for_readability = false;
recv_expect_success = true;
}
// Receive as much as we can get in a single recv call.
int rcvd = client->Recv(recv_buffer.get() + recv_pos,
kDataSize - recv_pos);
if (recv_expect_success) {
// The first Recv() after getting readability should succeed and receive
// some data.
EXPECT_GT(rcvd, 0);
recv_expect_success = false;
}
if (rcvd >= 0) {
EXPECT_LE(rcvd, data_in_flight);
recv_pos += rcvd;
data_in_flight -= rcvd;
} else {
ASSERT_TRUE(client->IsBlocking());
recv_waiting_for_readability = true;
}
}
// Once all that we've sent has been rcvd, expect to be able to send again.
if (send_waiting_for_writability) {
EXPECT_TRUE_WAIT(sink.Check(accepted.get(), testing::SSE_WRITE),
kTimeout);
send_waiting_for_writability = false;
send_expect_success = true;
}
}
// The received data matches the sent data.
EXPECT_EQ(kDataSize, send_pos);
EXPECT_EQ(kDataSize, recv_pos);
EXPECT_EQ(0, memcmp(recv_buffer.get(), send_buffer.get(), kDataSize));
// Close down.
accepted->Close();
EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE));
client->Close();
}
void SocketTest::TestSingleFlowControlCallback() {
testing::StreamSink sink;
SocketAddress accept_addr;
// Create client.
scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(client.get());
// Create server and listen.
scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(SOCK_STREAM));
sink.Monitor(server.get());
EXPECT_EQ(0, server->Bind(kLoopbackAddr));
EXPECT_EQ(0, server->Listen(5));
// Attempt connection.
EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
// Accept connection.
EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
ASSERT_TRUE(NULL != accepted.get());
sink.Monitor(accepted.get());
// Both sides are now connected.
EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
// Fill the socket buffer.
char buf[1024 * 16] = {0};
while (accepted->Send(&buf, ARRAY_SIZE(buf)) != -1) {}
EXPECT_TRUE(accepted->IsBlocking());
// Expect no writable callbacks
EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_WRITE));
// Wait until data is available.
EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout);
// Pull some data.
client->Recv(buf, ARRAY_SIZE(buf));
// Expect at least one additional writable callback.
EXPECT_TRUE_WAIT(sink.Check(accepted.get(), testing::SSE_WRITE), kTimeout);
// Adding data in response to the writeable callback shouldn't cause infinite
// callbacks.
int extras = 0;
for (int i = 0; i < 100; ++i) {
accepted->Send(&buf, ARRAY_SIZE(buf));
talk_base::Thread::Current()->ProcessMessages(1);
if (sink.Check(accepted.get(), testing::SSE_WRITE)) {
extras++;
}
}
EXPECT_LT(extras, 2);
// Close down.
accepted->Close();
client->Close();
}
void SocketTest::TestUdp() {
// Test basic bind and connect behavior.
SocketAddress addr1(kLoopbackAddr);
AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
EXPECT_EQ(AsyncSocket::CS_CLOSED, socket->GetState());
EXPECT_EQ(0, socket->Bind(addr1));
addr1 = socket->GetLocalAddress();
EXPECT_EQ(0, socket->Connect(addr1));
EXPECT_EQ(AsyncSocket::CS_CONNECTED, socket->GetState());
socket->Close();
EXPECT_EQ(AsyncSocket::CS_CLOSED, socket->GetState());
delete socket;
// Test send/receive behavior.
scoped_ptr<TestClient> client1(new TestClient(
AsyncUDPSocket::Create(ss_, addr1)));
scoped_ptr<TestClient> client2(new TestClient(
AsyncUDPSocket::Create(ss_, SocketAddress())));
SocketAddress addr2;
EXPECT_EQ(3, client2->SendTo("foo", 3, addr1));
EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &addr2));
SocketAddress addr3;
EXPECT_EQ(6, client1->SendTo("bizbaz", 6, addr2));
EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &addr3));
EXPECT_EQ(addr3, addr1);
// TODO: figure out what the intent is here
for (int i = 0; i < 10; ++i) {
client2.reset(new TestClient(AsyncUDPSocket::Create(ss_, SocketAddress())));
SocketAddress addr4;
EXPECT_EQ(3, client2->SendTo("foo", 3, addr1));
EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &addr4));
EXPECT_EQ(addr4.ip(), addr2.ip());
SocketAddress addr5;
EXPECT_EQ(6, client1->SendTo("bizbaz", 6, addr4));
EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &addr5));
EXPECT_EQ(addr5, addr1);
addr2 = addr4;
}
}
void SocketTest::TestGetSetOptions() {
talk_base::scoped_ptr<AsyncSocket> socket(ss_->CreateAsyncSocket(SOCK_DGRAM));
socket->Bind(kLoopbackAddr);
// Check SNDBUF/RCVBUF.
const int desired_size = 12345;
#if defined(LINUX) || defined(ANDROID)
// Yes, really. It's in the kernel source.
const int expected_size = desired_size * 2;
#else // !LINUX && !ANDROID
const int expected_size = desired_size;
#endif // !LINUX && !ANDROID
int recv_size = 0;
int send_size = 0;
// get the initial sizes
ASSERT_NE(-1, socket->GetOption(Socket::OPT_RCVBUF, &recv_size));
ASSERT_NE(-1, socket->GetOption(Socket::OPT_SNDBUF, &send_size));
// set our desired sizes
ASSERT_NE(-1, socket->SetOption(Socket::OPT_RCVBUF, desired_size));
ASSERT_NE(-1, socket->SetOption(Socket::OPT_SNDBUF, desired_size));
// get the sizes again
ASSERT_NE(-1, socket->GetOption(Socket::OPT_RCVBUF, &recv_size));
ASSERT_NE(-1, socket->GetOption(Socket::OPT_SNDBUF, &send_size));
// make sure they are right
ASSERT_EQ(expected_size, recv_size);
ASSERT_EQ(expected_size, send_size);
// Check that we can't set NODELAY on a UDP socket.
int current_nd, desired_nd = 1;
ASSERT_EQ(-1, socket->GetOption(Socket::OPT_NODELAY, &current_nd));
ASSERT_EQ(-1, socket->SetOption(Socket::OPT_NODELAY, desired_nd));
// Try estimating MTU.
talk_base::scoped_ptr<AsyncSocket>
mtu_socket(ss_->CreateAsyncSocket(SOCK_DGRAM));
mtu_socket->Bind(kLoopbackAddr);
uint16 mtu;
// should fail until we connect
ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu));
mtu_socket->Connect(kLoopbackAddr);
#if defined(WIN32)
// now it should succeed
ASSERT_NE(-1, mtu_socket->EstimateMTU(&mtu));
ASSERT_GE(mtu, 1492); // should be at least the 1492 "plateau" on localhost
#elif defined(OSX)
// except on OSX, where it's not yet implemented
ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu));
#else
// and the behavior seems unpredictable on Linux, failing on the build machine
// but succeeding on my Ubiquity instance.
#endif
}
} // namespace talk_base