Track time internally using 64-bit microseconds.
Avoids strangeness with scheduling the next session to send a packet on
when the server's elapsed time wraps around the 32-bit microsecond mark
(every ~71 minutes). We still transmit 32-bit timestamps on the wire,
the wrapping doesn't cause problems there.
Change-Id: I29a7d8158f0ae66597f9e9a149f61e7265f03150
diff --git a/cmds/isoping.cc b/cmds/isoping.cc
index 48999c3..f3bf99e 100644
--- a/cmds/isoping.cc
+++ b/cmds/isoping.cc
@@ -63,6 +63,7 @@
#define ARRAY_LEN(a) (sizeof(a) / sizeof((a)[0]))
#define DIFF(x, y) ((int32_t)((uint32_t)(x) - (uint32_t)(y)))
+#define DIFF64(x, y) ((int64_t)((uint64_t)(x) - (uint64_t)(y)))
#define DIV(x, y) ((y) ? ((double)(x)/(y)) : 0)
#define _STR(n) #n
#define STR(n) _STR(n)
@@ -120,7 +121,7 @@
DLOG("\n");
}
-Session::Session(uint32_t first_send, uint32_t usec_per_pkt,
+Session::Session(uint64_t first_send, uint32_t usec_per_pkt,
const struct sockaddr_storage &raddr, size_t raddr_len)
: usec_per_pkt(usec_per_pkt),
usec_per_print(prints_per_sec > 0 ? 1e6 / prints_per_sec : 0),
@@ -149,7 +150,7 @@
DLOG("Handshake state: NEW_SESSION\n");
}
-SessionMap::iterator Sessions::NewSession(uint32_t first_send,
+SessionMap::iterator Sessions::NewSession(uint64_t first_send,
uint32_t usec_per_pkt,
struct sockaddr_storage *addr,
socklen_t addr_len) {
@@ -242,7 +243,7 @@
return true;
}
-void Sessions::MaybeRotateCookieSecrets(uint32_t now, int is_server) {
+void Sessions::MaybeRotateCookieSecrets(uint64_t now, int is_server) {
if (is_server && (now - last_secret_update_time) > 1000000) {
// Round off the unix timestamp to 64 seconds as an epoch, so we don't have
// to track which ones we've already used.
@@ -272,11 +273,9 @@
DLOG("Generated new cookie secret.\n");
}
-// Returns the kernel monotonic timestamp in microseconds, truncated to
-// 32 bits. That will wrap around every ~4000 seconds, which is okay
-// for our purposes. We use 32 bits to save space in our packets.
-// This function never returns the value 0; it returns 1 instead, so that
-// 0 can be used as a magic value.
+// Returns the kernel monotonic timestamp in microseconds. This function never
+// returns the value 0; it returns 1 instead, so that 0 can be used as a magic
+// value.
#ifdef __MACH__ // MacOS X doesn't have clock_gettime()
#include <mach/mach.h>
#include <mach/mach_time.h>
@@ -310,11 +309,6 @@
#endif
-static uint32_t ustime(void) {
- return (uint32_t)ustime64();
-}
-
-
static void usage_and_die(char *argv0) {
fprintf(stderr,
"\n"
@@ -368,16 +362,13 @@
// Print the timestamp corresponding to the current time.
// Deliberately the same format as tcpdump uses, so we can easily sort and
// correlate messages between isoping and tcpdump.
-static void print_timestamp(uint32_t when) {
- uint64_t now = ustime64();
- int32_t nowdiff = DIFF(now, when);
- uint64_t when64 = now - nowdiff;
- time_t t = when64 / 1000000;
+static void print_timestamp(uint64_t when) {
+ time_t t = when / 1000000;
struct tm tm;
memset(&tm, 0, sizeof(tm));
localtime_r(&t, &tm);
printf("%02d:%02d:%02d.%06d ", tm.tm_hour, tm.tm_min, tm.tm_sec,
- (int)(when64 % 1000000));
+ (int)(when % 1000000));
}
@@ -442,14 +433,14 @@
}
-void prepare_handshake_reply_packet(Packet *tx, Packet *rx, uint32_t now) {
+void prepare_handshake_reply_packet(Packet *tx, Packet *rx, uint64_t now) {
memset(tx, 0, sizeof(*tx));
tx->magic = htonl(MAGIC);
tx->id = rx->id;
tx->usec_per_pkt = htonl(
std::max(ntohl(rx->usec_per_pkt), (uint32_t)(1e6 / packets_per_sec)));
- tx->txtime = now;
- tx->clockdiff = htonl(now - ntohl(rx->txtime));
+ tx->txtime = (uint32_t)now;
+ tx->clockdiff = htonl((uint32_t)now - ntohl(rx->txtime));
tx->num_lost = htonl(0);
tx->packet_type = PACKET_TYPE_HANDSHAKE;
}
@@ -462,7 +453,8 @@
perror("sendto");
}
} else {
- DLOG("Calling send on socket %d, size=%ld\n", sock, sizeof(s->tx));
+ DLOG("Calling send on socket %d, size=%ld, is_server=%d\n", sock,
+ sizeof(s->tx), is_server);
if (send(sock, &s->tx, sizeof(s->tx), 0) < 0) {
int e = errno;
perror("send");
@@ -472,7 +464,7 @@
if (is_server ||
s->handshake_state == Session::ESTABLISHED ||
s->handshake_state == Session::COOKIE_GENERATED) {
- DLOG("send_packet: ack packet, next_send in %d (from %d to %d)\n",
+ DLOG("send_packet: ack packet, next_send in %d (from %ld to %ld)\n",
s->usec_per_pkt, s->next_send, s->next_send + s->usec_per_pkt);
s->next_send += s->usec_per_pkt;
} else {
@@ -490,9 +482,10 @@
// Limit the backoff to a factor of 2^10.
uint32_t timeout = Session::handshake_timeout_usec *
(1 << std::min(10, s->handshake_retry_count));
- DLOG("Sending handshake, retries=%d, next_send in %d us (from %u to %u)\n",
- s->handshake_retry_count, timeout, s->next_send,
- s->next_send + timeout);
+ DLOG(
+ "Sending handshake, retries=%d, next_send in %d us (from %lu to %lu)\n",
+ s->handshake_retry_count, timeout, s->next_send,
+ s->next_send + timeout);
s->next_send += timeout;
// Don't count the handshake packet as part of the sequence.
s->next_tx_id--;
@@ -500,18 +493,19 @@
return 0;
}
-int send_waiting_packets(Sessions *sessions, int sock, uint32_t now,
+int send_waiting_packets(Sessions *sessions, int sock, uint64_t now,
int is_server) {
if (sessions == NULL) {
return -1;
}
- // TODO(pmccurdy): This will incorrectly send packets too early during the
- // time period where now + usec_per_pkt wraps around, i.e. about once per 71
- // minutes, and will end up blasting packets as fast as possible for that
- // duration. Consider calculating next_send and now as 64-bit values, and
- // truncating to 32 bits when transmitting.
+ DLOG("send_waiting_packets: %ld waiting, now=%lu (0x%lx), "
+ "next send time=%ld, diff=%ld\n",
+ sessions->next_sends.size(), now, now, sessions->next_send_time(),
+ DIFF64(now, sessions->next_send_time()));
while (sessions->next_sends.size() > 0 &&
- DIFF(now, sessions->next_send_time()) >= 0) {
+ DIFF64(now, sessions->next_send_time()) >= 0) {
+ DLOG("%ld waiting packets, now=%ld, next send time=%ld\n",
+ sessions->next_sends.size(), now, sessions->next_send_time());
SessionMap::iterator it = sessions->next_sends.top();
sessions->next_sends.pop();
Session &s = it->second;
@@ -539,7 +533,7 @@
return 0;
}
-int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server) {
+int read_incoming_packet(Sessions *s, int sock, uint64_t now, int is_server) {
struct sockaddr_storage rxaddr;
socklen_t rxaddr_len = sizeof(rxaddr);
@@ -603,7 +597,7 @@
// connection.
void handle_packet(struct Sessions *s, struct Session *session, Packet *rx,
int sock, struct sockaddr_storage *rxaddr,
- socklen_t rxaddr_len, uint32_t now, int is_server) {
+ socklen_t rxaddr_len, uint64_t now, int is_server) {
switch (rx->packet_type) {
case PACKET_TYPE_HANDSHAKE:
if (is_server) {
@@ -624,7 +618,7 @@
// Now we know the server has accepted our connection. Clear out the
// handshake data from the send buffer and prepare to track acks.
DLOG("Ack from server on new connection; moving to state "
- "ESTABLISHED.");
+ "ESTABLISHED.\n");
session->handshake_state = Session::ESTABLISHED;
memset(&session->tx.data.acks, 0, sizeof(session->tx.data.acks));
}
@@ -640,7 +634,7 @@
void handle_new_client_handshake_packet(Sessions *s, Packet *rx, int sock,
struct sockaddr_storage *remoteaddr,
- size_t remoteaddr_len, uint32_t now) {
+ size_t remoteaddr_len, uint64_t now) {
assert(s != NULL);
assert(rx != NULL);
assert(remoteaddr != NULL);
@@ -683,7 +677,7 @@
}
}
-void handle_server_handshake_packet(Sessions *s, Packet *rx, uint32_t now) {
+void handle_server_handshake_packet(Sessions *s, Packet *rx, uint64_t now) {
assert(s != NULL);
assert(rx != NULL);
assert(s->session_map.size() == 1);
@@ -705,7 +699,7 @@
session.usec_per_pkt = usec_per_pkt;
}
DLOG("Handshake state: client received cookie from server, moving to "
- "COOKIE_GENERATED; next_send=%d (was %d)\n",
+ "COOKIE_GENERATED; next_send=%ld (was %ld)\n",
now, session.next_send);
DLOG("Handshake: cookie epoch=%d, cookie=0x",
rx->data.handshake.cookie_epoch);
@@ -715,7 +709,7 @@
s->next_sends.push(it);
}
-void handle_ack_packet(struct Session *s, uint32_t now) {
+void handle_ack_packet(struct Session *s, uint64_t now) {
assert(s != NULL);
assert(s->rx.packet_type == PACKET_TYPE_ACK);
// process the incoming packet header.
@@ -727,7 +721,8 @@
// tick... except for inevitable clock rate errors, which we have to
// account for occasionally.
- uint32_t txtime = ntohl(s->rx.txtime), rxtime = now;
+ uint32_t txtime = ntohl(s->rx.txtime);
+ uint64_t rxtime = now;
uint32_t id = ntohl(s->rx.id);
if (!s->next_rx_id) {
// The remote txtime is told to us by the sender, so it is always perfectly
@@ -770,7 +765,7 @@
}
// fix up the clock offset if there's any drift.
- tmpdiff = DIFF(rxtime, s->start_rxtime + id * s->usec_per_pkt);
+ tmpdiff = DIFF64(rxtime, s->start_rxtime + id * s->usec_per_pkt);
if (tmpdiff < -20) {
// packet arrived before predicted time, so prediction was based on
// a packet that was "slow" before, or else one of our clocks is
@@ -779,8 +774,8 @@
(long)tmpdiff);
s->start_rxtime = rxtime - id * s->usec_per_pkt;
}
- int32_t rxdiff = DIFF(rxtime, s->start_rxtime + id * s->usec_per_pkt);
- DLOG("ack: rxdiff=%d, rxtime=%u, start_rxtime=%u, id=%d, usec_per_pkt=%d\n",
+ int32_t rxdiff = DIFF64(rxtime, s->start_rxtime + id * s->usec_per_pkt);
+ DLOG("ack: rxdiff=%d, rxtime=%lu, start_rxtime=%lu, id=%d, usec_per_pkt=%d\n",
rxdiff, rxtime, s->start_rxtime, id, s->usec_per_pkt);
// Figure out the offset between our clock and the remote's clock, so we can
@@ -967,7 +962,7 @@
}
int is_server;
- uint32_t now = ustime(); // current time
+ uint64_t now = ustime64(); // current time
if (argc - optind == 0) {
is_server = 1;
@@ -1045,7 +1040,7 @@
struct timeval tv;
tv.tv_sec = 0;
- now = ustime();
+ now = ustime64();
if (sessions->next_sends.size() == 0 ||
DIFF(sessions->next_send_time(), now) < 0 ||
extrasock > 0) {
@@ -1058,7 +1053,7 @@
tvp = &tv;
}
int nfds = select(std::max(sock, extrasock) + 1, &rfds, NULL, NULL, tvp);
- now = ustime();
+ now = ustime64();
if (nfds < 0 && errno != EINTR) {
perror("select");
return 1;
diff --git a/cmds/isoping.h b/cmds/isoping.h
index 8ff819d..45c9bef 100644
--- a/cmds/isoping.h
+++ b/cmds/isoping.h
@@ -66,7 +66,7 @@
// Data we track per session.
struct Session {
- Session(uint32_t first_send, uint32_t usec_per_pkt,
+ Session(uint64_t first_send, uint32_t usec_per_pkt,
const struct sockaddr_storage &remoteaddr, size_t remoteaddr_len);
int32_t usec_per_pkt;
int32_t usec_per_print;
@@ -94,11 +94,11 @@
uint32_t next_rx_id; // expected id field for next receive
uint32_t next_rxack_id; // expected ack.id field in next received ack
uint32_t start_rtxtime; // remote's txtime at startup
- uint32_t start_rxtime; // local rxtime at startup
+ uint64_t start_rxtime; // local rxtime at startup
uint32_t last_rxtime; // local rxtime of last received packet
int32_t min_cycle_rxdiff; // smallest packet delay seen this cycle
uint32_t next_cycle; // time when next cycle begins
- uint32_t next_send; // time when we'll send next pkt
+ uint64_t next_send; // time when we'll send next pkt
uint32_t num_lost; // number of rx packets not received
int next_txack_index; // next array item to fill in tx.acks
struct Packet tx, rx; // transmit and received packet buffers
@@ -134,7 +134,7 @@
~Sessions();
// Rotates the cookie secrets if they haven't been changed in a while.
- virtual void MaybeRotateCookieSecrets(uint32_t now, int is_server);
+ virtual void MaybeRotateCookieSecrets(uint64_t now, int is_server);
// Rotate the cookie secrets using the given epoch directly. Only for use in
// unit tests.
@@ -150,12 +150,12 @@
virtual bool ValidateCookie(Packet *p, struct sockaddr_storage *addr,
socklen_t addr_len);
- SessionMap::iterator NewSession(uint32_t first_send,
+ SessionMap::iterator NewSession(uint64_t first_send,
uint32_t usec_per_pkt,
struct sockaddr_storage *addr,
socklen_t addr_len);
- uint32_t next_send_time() {
+ uint64_t next_send_time() {
if (next_sends.size() == 0) {
return 0;
}
@@ -189,35 +189,35 @@
// Process an incoming packet from the socket.
void handle_packet(struct Sessions *s, struct Session *session, Packet *rx,
int sock, struct sockaddr_storage *rxaddr,
- socklen_t rxaddr_len, uint32_t now, int is_server);
+ socklen_t rxaddr_len, uint64_t now, int is_server);
// Process an established Session's incoming ack packet, from s->rx.
-void handle_ack_packet(struct Session *s, uint32_t now);
+void handle_ack_packet(struct Session *s, uint64_t now);
// Server-only: processes a handshake packet from a new client in rx. Replies
// with a cookie if no cookie provided, or validates the provided cookie and
// establishes a new Session.
void handle_new_client_handshake_packet(Sessions *s, Packet *rx, int sock,
struct sockaddr_storage *remoteaddr,
- size_t remoteaddr_len, uint32_t now);
+ size_t remoteaddr_len, uint64_t now);
// Client-only: processes a handshake packet received from the server.
// Configures the Session to echo the provided cookie back to the server.
-void handle_server_handshake_packet(Sessions *s, Packet *rx, uint32_t now);
+void handle_server_handshake_packet(Sessions *s, Packet *rx, uint64_t now);
// Sets all the elements of s->tx to be ready to be sent to the other side.
void prepare_tx_packet(struct Session *s);
// Sends a packet to all waiting sessions where the appropriate amount of time
// has passed.
-int send_waiting_packets(Sessions *s, int sock, uint32_t now, int is_server);
+int send_waiting_packets(Sessions *s, int sock, uint64_t now, int is_server);
// Sends a packet from the given session to the given socket immediately.
int send_packet(struct Session *s, int sock, int is_server);
// Reads a packet from sock and stores it in s->rx. Assumes a packet is
// currently readable.
-int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server);
+int read_incoming_packet(Sessions *s, int sock, uint64_t now, int is_server);
// Sets the global packets_per_sec value. Used for test purposes only.
void set_packets_per_sec(double new_pps);
diff --git a/cmds/isoping_test.cc b/cmds/isoping_test.cc
index acf96fd..d1f9ef7 100644
--- a/cmds/isoping_test.cc
+++ b/cmds/isoping_test.cc
@@ -27,17 +27,17 @@
#include "isoping.h"
-uint32_t send_next_ack_packet(Session *from, uint32_t from_base,
- Session *to, uint32_t to_base, uint32_t latency) {
- uint32_t t = from->next_send - from_base;
+uint32_t send_next_ack_packet(Session *from, uint64_t from_base, Session *to,
+ uint64_t to_base, uint32_t latency) {
+ uint64_t t = from->next_send - from_base;
prepare_tx_packet(from);
to->rx = from->tx;
from->next_send += from->usec_per_pkt;
t += latency;
handle_ack_packet(to, to_base + t);
fprintf(stderr,
- "**Sent packet: txtime=%d, start_txtime=%d, rxtime=%d, "
- "start_rxtime=%d, latency=%d, t_from=%d, t_to=%d\n",
+ "**Sent packet: txtime=%ld, start_txtime=%d, rxtime=%lu, "
+ "start_rxtime=%lu, latency=%d, t_from=%lu, t_to=%lu\n",
from->next_send,
to->start_rtxtime,
to_base + t,
@@ -49,15 +49,91 @@
return t;
}
+// Returns a new socket, connected to the given address. Returns a negative
+// value on error.
+int create_client_socket(struct sockaddr_storage *listenaddr,
+ socklen_t listenaddr_len, struct addrinfo *res) {
+ int csock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (!WVPASS(csock >= 0)) {
+ perror("client socket");
+ return -1;
+ }
+ if (!WVPASS(!connect(csock, (struct sockaddr *)listenaddr, listenaddr_len))) {
+ perror("connect");
+ return -1;
+ }
+ struct sockaddr_in6 caddr;
+ socklen_t caddr_len = sizeof(caddr);
+ memset(&caddr, 0, caddr_len);
+ if (!WVPASS(!getsockname(csock, (struct sockaddr *)&caddr, &caddr_len))) {
+ perror("getsockname");
+ return -1;
+ }
+ char buf[128];
+ inet_ntop(AF_INET6, (struct sockaddr *)&caddr, buf, sizeof(buf));
+ printf("Created client connection on %s:%d\n", buf, ntohs(caddr.sin6_port));
+ return csock;
+}
+
+// Creates two sockets and puts them in *csock and *ssock. *ssock is listening
+// to the given address, and *csock is connected to it. Returns true on
+// success.
+bool create_local_socketpair(struct sockaddr_storage *listenaddr,
+ socklen_t listenaddr_len, int *csock, int *ssock,
+ struct addrinfo **res) {
+ struct addrinfo hints;
+
+ // Get local interface information.
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET6;
+ hints.ai_socktype = SOCK_DGRAM;
+ hints.ai_flags = AI_PASSIVE | AI_V4MAPPED;
+ int err = getaddrinfo(NULL, "0", &hints, res);
+ if (err != 0) {
+ WVPASSEQ("Error from getaddrinfo: ", gai_strerror(err));
+ return false;
+ }
+
+ *ssock = socket((*res)->ai_family, (*res)->ai_socktype, (*res)->ai_protocol);
+ if (!WVPASS(*ssock >= 0)) {
+ perror("server socket");
+ return false;
+ }
+
+ if (!WVPASS(!bind(*ssock, (*res)->ai_addr, (*res)->ai_addrlen))) {
+ perror("bind");
+ return false;
+ }
+
+ // Figure out the local port we got.
+ memset(listenaddr, 0, listenaddr_len);
+ if (!WVPASS(!getsockname(*ssock, (struct sockaddr *)listenaddr,
+ &listenaddr_len))) {
+ perror("getsockname");
+ return false;
+ }
+
+ printf("Bound server socket to port=%d\n",
+ listenaddr->ss_family == AF_INET
+ ? ntohs(((struct sockaddr_in *)listenaddr)->sin_port)
+ : ntohs(((struct sockaddr_in6 *)listenaddr)->sin6_port));
+
+ *csock = create_client_socket(listenaddr, listenaddr_len, *res);
+ if (*csock < 0) {
+ return false;
+ }
+ return true;
+}
+
WVTEST_MAIN("isoping algorithm logic") {
// Establish a positive base time for client and server. This is conceptually
// the instant when the client sends its first message to the server, as
// measured by the clocks on each side (note: this is before the server
// receives the message).
- uint32_t cbase = 400 * 1000;
- uint32_t sbase = 600 * 1000;
- uint32_t real_clockdiff = sbase - cbase;
- uint32_t usec_per_pkt = 100 * 1000;
+ uint64_t cbase = 400 * 1000;
+ uint64_t sbase = 600 * 1000;
+ uint64_t real_clockdiff = sbase - cbase;
+ uint64_t usec_per_pkt = 100 * 1000;
// The states of the client and server.
struct sockaddr_storage empty_sockaddr;
@@ -434,68 +510,14 @@
// Sockets for the client and server.
int ssock, csock;
- struct addrinfo hints, *res;
-
- // Get local interface information.
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET6;
- hints.ai_socktype = SOCK_DGRAM;
- hints.ai_flags = AI_PASSIVE | AI_V4MAPPED;
- int err = getaddrinfo(NULL, "0", &hints, &res);
- if (err != 0) {
- WVPASSEQ("Error from getaddrinfo: ", gai_strerror(err));
- return;
- }
-
- ssock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- if (!WVPASS(ssock >= 0)) {
- perror("server socket");
- return;
- }
-
- csock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- if (!WVPASS(csock >= 0)) {
- perror("client socket");
- return;
- }
-
- if (!WVPASS(!bind(ssock, res->ai_addr, res->ai_addrlen))) {
- perror("bind");
- return;
- }
-
- // Figure out the local port we got.
struct sockaddr_storage listenaddr;
socklen_t listenaddr_len = sizeof(listenaddr);
- memset(&listenaddr, 0, listenaddr_len);
- if (!WVPASS(!getsockname(ssock, (struct sockaddr *)&listenaddr,
- &listenaddr_len))) {
- perror("getsockname");
+ struct addrinfo *res;
+ if (!create_local_socketpair(&listenaddr, listenaddr_len, &csock, &ssock,
+ &res)) {
return;
}
- printf("Bound server socket to port=%d\n",
- listenaddr.ss_family == AF_INET
- ? ntohs(((struct sockaddr_in *)&listenaddr)->sin_port)
- : ntohs(((struct sockaddr_in6 *)&listenaddr)->sin6_port));
-
- // Connect the client's socket.
- if (!WVPASS(
- !connect(csock, (struct sockaddr *)&listenaddr, listenaddr_len))) {
- perror("connect");
- return;
- }
- struct sockaddr_in6 caddr;
- socklen_t caddr_len = sizeof(caddr);
- memset(&caddr, 0, caddr_len);
- if (!WVPASS(!getsockname(csock, (struct sockaddr *)&caddr, &caddr_len))) {
- perror("getsockname");
- return;
- }
- char buf[128];
- inet_ntop(AF_INET6, (struct sockaddr *)&caddr, buf, sizeof(buf));
- printf("Created client connection on %s:%d\n", buf, ntohs(caddr.sin6_port));
-
// All active sessions for the client and server.
Sessions c;
Sessions s;
@@ -539,7 +561,7 @@
WVPASS(!send_waiting_packets(&c, csock, cbase + t, is_client));
FD_ZERO(&rfds);
FD_SET(csock, &rfds);
- nfds = select(csock + 1, &rfds, NULL, NULL, &tv);
+ nfds = select(ssock + 1, &rfds, NULL, NULL, &tv);
WVPASSEQ(nfds, 0);
// Wait for the client to time out and resend the initial handshake packet.
@@ -620,26 +642,7 @@
Sessions c2;
set_packets_per_sec(4e6/usec_per_pkt);
c2.NewSession(cbase, usec_per_pkt/10, &listenaddr, listenaddr_len);
- int c2sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- if (!WVPASS(c2sock > 0)) {
- perror("client socket 2");
- return;
- }
- if (!WVPASS(
- !connect(c2sock, (struct sockaddr *)&listenaddr, listenaddr_len))) {
- perror("connect");
- return;
- }
- struct sockaddr_in6 c2addr;
- socklen_t c2addr_len = sizeof(c2addr);
- memset(&c2addr, 0, c2addr_len);
- if (!WVPASS(!getsockname(c2sock, (struct sockaddr *)&c2addr, &c2addr_len))) {
- perror("getsockname");
- return;
- }
- inet_ntop(AF_INET6, (struct sockaddr *)&c2addr, buf, sizeof(buf));
- printf("Created new client connection on %s:%d\n", buf,
- ntohs(c2addr.sin6_port));
+ int c2sock = create_client_socket(&listenaddr, listenaddr_len, res);
Session &c2Session = c2.session_map.begin()->second;
// Perform the handshake dance so the server knows we're legit.
@@ -853,3 +856,119 @@
close(sock);
freeaddrinfo(res);
}
+
+WVTEST_MAIN("sending packets when time rolls over 32 bits") {
+ // Have the server's time be close to rolling over the 32-bit microsecond
+ // boundary.
+ uint32_t usec_per_pkt = 100 * 1000;
+ uint64_t cbase = 400 * 1000;
+ uint64_t wrap = 1ull << 32;
+ // It takes one round-trip to establish the handshake
+ uint64_t sbase = wrap - 1.5 * usec_per_pkt;
+
+ // Sockets for the clients and server.
+ int ssock, csock;
+ struct sockaddr_storage listenaddr;
+ socklen_t listenaddr_len = sizeof(listenaddr);
+ struct addrinfo *res;
+ if (!create_local_socketpair(&listenaddr, listenaddr_len, &csock, &ssock,
+ &res)) {
+ return;
+ }
+
+ int c2sock = create_client_socket(&listenaddr, listenaddr_len, res);
+
+ // All active sessions for the clients and server.
+ Sessions s;
+ Sessions c;
+ Sessions c2;
+
+ int is_server = 1;
+ int is_client = 0;
+
+ s.MaybeRotateCookieSecrets(sbase, is_server);
+ // The first session sends slowly, the second one sends more frequently.
+ c.NewSession(cbase, 4 * usec_per_pkt, &listenaddr, listenaddr_len);
+ c2.NewSession(cbase, usec_per_pkt, &listenaddr, listenaddr_len);
+
+ // Send the initial handshake packets.
+ Session &cSession = c.session_map.begin()->second;
+ uint64_t t = cSession.next_send - cbase;
+ WVPASS(!send_waiting_packets(&c, csock, cbase + t, is_client));
+
+ //Session &c2Session = c2.session_map.begin()->second;
+ WVPASS(!send_waiting_packets(&c2, c2sock, cbase + t, is_client));
+
+ // The server sends the handshake response immediately.
+ WVPASS(!read_incoming_packet(&s, ssock, sbase, is_server));
+ WVPASS(!read_incoming_packet(&s, ssock, sbase, is_server));
+
+ // Finish the handshake.
+ WVPASS(!read_incoming_packet(&c, csock, cbase + t, is_client));
+ WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+ t = cSession.next_send - cbase;
+ WVPASS(!send_waiting_packets(&c, csock, cbase + t, is_client));
+ WVPASS(!send_waiting_packets(&c2, c2sock, cbase + t, is_client));
+
+ // Receive the full handshake packets at a time where the fast client can
+ // still send before the time wraps, but the slow client will exceed 32 bits.
+ t = wrap - 2 * usec_per_pkt - sbase;
+ WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+ WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+
+ WVPASSEQ(s.session_map.size(), 2);
+
+ Session &sSession = s.session_map.begin()->second;
+ t += usec_per_pkt;
+ printf("Finishing handshake: next_send_time=%lu (0x%lx)\n",
+ s.next_send_time(), s.next_send_time());
+ printf("last_rxtime: %d\n", sSession.last_rxtime);
+ printf("min_cycle_rxdiff: %d\n", sSession.min_cycle_rxdiff);
+ WVPASS(s.next_send_time() < wrap);
+ WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+ printf("Finished handshake, next_send_time=%lu (0x%lx)\n",
+ s.next_send_time(), s.next_send_time());
+ // The fast client still needs to send before we wrap.
+ WVPASS(s.next_send_time() < wrap);
+
+ // Verify we don't spuriously send any packets.
+ WVPASS(!read_incoming_packet(&c, csock, cbase + t, is_client));
+ WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+ WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+
+ fd_set rfds;
+ FD_ZERO(&rfds);
+ FD_SET(csock, &rfds);
+ FD_SET(c2sock, &rfds);
+ struct timeval tv = {0, 0};
+ int nfds = select(std::max(csock, c2sock) + 1, &rfds, NULL, NULL, &tv);
+ WVPASSEQ(nfds, 0);
+
+ // Verify that we still send pre-wrap-scheduled packets before the server's
+ // time actually wraps.
+ t = wrap - 1 - sbase;
+ WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+ WVPASS(wrap < s.next_send_time());
+
+ FD_ZERO(&rfds);
+ FD_SET(c2sock, &rfds);
+ nfds = select(std::max(csock, c2sock) + 1, &rfds, NULL, NULL, &tv);
+ WVPASSEQ(nfds, 1);
+ WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+
+ // Verify we can still send packets after crossing the 32-bit boundary.
+ t = s.next_send_time() - sbase;
+ WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+
+ FD_ZERO(&rfds);
+ FD_SET(c2sock, &rfds);
+ nfds = select(std::max(csock, c2sock) + 1, &rfds, NULL, NULL, &tv);
+ WVPASSEQ(nfds, 1);
+ WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+
+ // Cleanup
+ close(ssock);
+ close(csock);
+ close(c2sock);
+ freeaddrinfo(res);
+}