Track time internally using 64-bit microseconds.

Avoids strangeness with scheduling the next session to send a packet on
when the server's elapsed time wraps around the 32-bit microsecond mark
(every ~71 minutes).  We still transmit 32-bit timestamps on the wire,
the wrapping doesn't cause problems there.

Change-Id: I29a7d8158f0ae66597f9e9a149f61e7265f03150
diff --git a/cmds/isoping.cc b/cmds/isoping.cc
index 48999c3..f3bf99e 100644
--- a/cmds/isoping.cc
+++ b/cmds/isoping.cc
@@ -63,6 +63,7 @@
 
 #define ARRAY_LEN(a) (sizeof(a) / sizeof((a)[0]))
 #define DIFF(x, y) ((int32_t)((uint32_t)(x) - (uint32_t)(y)))
+#define DIFF64(x, y) ((int64_t)((uint64_t)(x) - (uint64_t)(y)))
 #define DIV(x, y) ((y) ? ((double)(x)/(y)) : 0)
 #define _STR(n) #n
 #define STR(n) _STR(n)
@@ -120,7 +121,7 @@
   DLOG("\n");
 }
 
-Session::Session(uint32_t first_send, uint32_t usec_per_pkt,
+Session::Session(uint64_t first_send, uint32_t usec_per_pkt,
                  const struct sockaddr_storage &raddr, size_t raddr_len)
     : usec_per_pkt(usec_per_pkt),
       usec_per_print(prints_per_sec > 0 ? 1e6 / prints_per_sec : 0),
@@ -149,7 +150,7 @@
   DLOG("Handshake state: NEW_SESSION\n");
 }
 
-SessionMap::iterator Sessions::NewSession(uint32_t first_send,
+SessionMap::iterator Sessions::NewSession(uint64_t first_send,
                                           uint32_t usec_per_pkt,
                                           struct sockaddr_storage *addr,
                                           socklen_t addr_len) {
@@ -242,7 +243,7 @@
   return true;
 }
 
-void Sessions::MaybeRotateCookieSecrets(uint32_t now, int is_server) {
+void Sessions::MaybeRotateCookieSecrets(uint64_t now, int is_server) {
   if (is_server && (now - last_secret_update_time) > 1000000) {
     // Round off the unix timestamp to 64 seconds as an epoch, so we don't have
     // to track which ones we've already used.
@@ -272,11 +273,9 @@
   DLOG("Generated new cookie secret.\n");
 }
 
-// Returns the kernel monotonic timestamp in microseconds, truncated to
-// 32 bits.  That will wrap around every ~4000 seconds, which is okay
-// for our purposes.  We use 32 bits to save space in our packets.
-// This function never returns the value 0; it returns 1 instead, so that
-// 0 can be used as a magic value.
+// Returns the kernel monotonic timestamp in microseconds.  This function never
+// returns the value 0; it returns 1 instead, so that 0 can be used as a magic
+// value.
 #ifdef __MACH__  // MacOS X doesn't have clock_gettime()
 #include <mach/mach.h>
 #include <mach/mach_time.h>
@@ -310,11 +309,6 @@
 #endif
 
 
-static uint32_t ustime(void) {
-  return (uint32_t)ustime64();
-}
-
-
 static void usage_and_die(char *argv0) {
   fprintf(stderr,
           "\n"
@@ -368,16 +362,13 @@
 // Print the timestamp corresponding to the current time.
 // Deliberately the same format as tcpdump uses, so we can easily sort and
 // correlate messages between isoping and tcpdump.
-static void print_timestamp(uint32_t when) {
-  uint64_t now = ustime64();
-  int32_t nowdiff = DIFF(now, when);
-  uint64_t when64 = now - nowdiff;
-  time_t t = when64 / 1000000;
+static void print_timestamp(uint64_t when) {
+  time_t t = when / 1000000;
   struct tm tm;
   memset(&tm, 0, sizeof(tm));
   localtime_r(&t, &tm);
   printf("%02d:%02d:%02d.%06d ", tm.tm_hour, tm.tm_min, tm.tm_sec,
-         (int)(when64 % 1000000));
+         (int)(when % 1000000));
 }
 
 
@@ -442,14 +433,14 @@
 }
 
 
-void prepare_handshake_reply_packet(Packet *tx, Packet *rx, uint32_t now) {
+void prepare_handshake_reply_packet(Packet *tx, Packet *rx, uint64_t now) {
   memset(tx, 0, sizeof(*tx));
   tx->magic = htonl(MAGIC);
   tx->id = rx->id;
   tx->usec_per_pkt = htonl(
       std::max(ntohl(rx->usec_per_pkt), (uint32_t)(1e6 / packets_per_sec)));
-  tx->txtime = now;
-  tx->clockdiff = htonl(now - ntohl(rx->txtime));
+  tx->txtime = (uint32_t)now;
+  tx->clockdiff = htonl((uint32_t)now - ntohl(rx->txtime));
   tx->num_lost = htonl(0);
   tx->packet_type = PACKET_TYPE_HANDSHAKE;
 }
@@ -462,7 +453,8 @@
       perror("sendto");
     }
   } else {
-    DLOG("Calling send on socket %d, size=%ld\n", sock, sizeof(s->tx));
+    DLOG("Calling send on socket %d, size=%ld, is_server=%d\n", sock,
+         sizeof(s->tx), is_server);
     if (send(sock, &s->tx, sizeof(s->tx), 0) < 0) {
       int e = errno;
       perror("send");
@@ -472,7 +464,7 @@
   if (is_server ||
       s->handshake_state == Session::ESTABLISHED ||
       s->handshake_state == Session::COOKIE_GENERATED) {
-    DLOG("send_packet: ack packet, next_send in %d (from %d to %d)\n",
+    DLOG("send_packet: ack packet, next_send in %d (from %ld to %ld)\n",
          s->usec_per_pkt, s->next_send, s->next_send + s->usec_per_pkt);
     s->next_send += s->usec_per_pkt;
   } else {
@@ -490,9 +482,10 @@
     // Limit the backoff to a factor of 2^10.
     uint32_t timeout = Session::handshake_timeout_usec *
                        (1 << std::min(10, s->handshake_retry_count));
-    DLOG("Sending handshake, retries=%d, next_send in %d us (from %u to %u)\n",
-         s->handshake_retry_count, timeout, s->next_send,
-         s->next_send + timeout);
+    DLOG(
+        "Sending handshake, retries=%d, next_send in %d us (from %lu to %lu)\n",
+        s->handshake_retry_count, timeout, s->next_send,
+        s->next_send + timeout);
     s->next_send += timeout;
     // Don't count the handshake packet as part of the sequence.
     s->next_tx_id--;
@@ -500,18 +493,19 @@
   return 0;
 }
 
-int send_waiting_packets(Sessions *sessions, int sock, uint32_t now,
+int send_waiting_packets(Sessions *sessions, int sock, uint64_t now,
                          int is_server) {
   if (sessions == NULL) {
     return -1;
   }
-  // TODO(pmccurdy): This will incorrectly send packets too early during the
-  // time period where now + usec_per_pkt wraps around, i.e. about once per 71
-  // minutes, and will end up blasting packets as fast as possible for that
-  // duration.  Consider calculating next_send and now as 64-bit values, and
-  // truncating to 32 bits when transmitting.
+  DLOG("send_waiting_packets: %ld waiting, now=%lu (0x%lx), "
+       "next send time=%ld, diff=%ld\n",
+       sessions->next_sends.size(), now, now, sessions->next_send_time(),
+       DIFF64(now, sessions->next_send_time()));
   while (sessions->next_sends.size() > 0 &&
-         DIFF(now, sessions->next_send_time()) >= 0) {
+         DIFF64(now, sessions->next_send_time()) >= 0) {
+    DLOG("%ld waiting packets, now=%ld, next send time=%ld\n",
+         sessions->next_sends.size(), now, sessions->next_send_time());
     SessionMap::iterator it = sessions->next_sends.top();
     sessions->next_sends.pop();
     Session &s = it->second;
@@ -539,7 +533,7 @@
   return 0;
 }
 
-int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server) {
+int read_incoming_packet(Sessions *s, int sock, uint64_t now, int is_server) {
   struct sockaddr_storage rxaddr;
   socklen_t rxaddr_len = sizeof(rxaddr);
 
@@ -603,7 +597,7 @@
 // connection.
 void handle_packet(struct Sessions *s, struct Session *session, Packet *rx,
                    int sock, struct sockaddr_storage *rxaddr,
-                   socklen_t rxaddr_len, uint32_t now, int is_server) {
+                   socklen_t rxaddr_len, uint64_t now, int is_server) {
   switch (rx->packet_type) {
     case PACKET_TYPE_HANDSHAKE:
       if (is_server) {
@@ -624,7 +618,7 @@
           // Now we know the server has accepted our connection.  Clear out the
           // handshake data from the send buffer and prepare to track acks.
           DLOG("Ack from server on new connection; moving to state "
-               "ESTABLISHED.");
+               "ESTABLISHED.\n");
           session->handshake_state = Session::ESTABLISHED;
           memset(&session->tx.data.acks, 0, sizeof(session->tx.data.acks));
         }
@@ -640,7 +634,7 @@
 
 void handle_new_client_handshake_packet(Sessions *s, Packet *rx, int sock,
                                        struct sockaddr_storage *remoteaddr,
-                                       size_t remoteaddr_len, uint32_t now) {
+                                       size_t remoteaddr_len, uint64_t now) {
   assert(s != NULL);
   assert(rx != NULL);
   assert(remoteaddr != NULL);
@@ -683,7 +677,7 @@
   }
 }
 
-void handle_server_handshake_packet(Sessions *s, Packet *rx, uint32_t now) {
+void handle_server_handshake_packet(Sessions *s, Packet *rx, uint64_t now) {
   assert(s != NULL);
   assert(rx != NULL);
   assert(s->session_map.size() == 1);
@@ -705,7 +699,7 @@
     session.usec_per_pkt = usec_per_pkt;
   }
   DLOG("Handshake state: client received cookie from server, moving to "
-       "COOKIE_GENERATED; next_send=%d (was %d)\n",
+       "COOKIE_GENERATED; next_send=%ld (was %ld)\n",
        now, session.next_send);
   DLOG("Handshake: cookie epoch=%d, cookie=0x",
        rx->data.handshake.cookie_epoch);
@@ -715,7 +709,7 @@
   s->next_sends.push(it);
 }
 
-void handle_ack_packet(struct Session *s, uint32_t now) {
+void handle_ack_packet(struct Session *s, uint64_t now) {
   assert(s != NULL);
   assert(s->rx.packet_type == PACKET_TYPE_ACK);
   // process the incoming packet header.
@@ -727,7 +721,8 @@
   // tick... except for inevitable clock rate errors, which we have to
   // account for occasionally.
 
-  uint32_t txtime = ntohl(s->rx.txtime), rxtime = now;
+  uint32_t txtime = ntohl(s->rx.txtime);
+  uint64_t rxtime = now;
   uint32_t id = ntohl(s->rx.id);
   if (!s->next_rx_id) {
     // The remote txtime is told to us by the sender, so it is always perfectly
@@ -770,7 +765,7 @@
   }
 
   // fix up the clock offset if there's any drift.
-  tmpdiff = DIFF(rxtime, s->start_rxtime + id * s->usec_per_pkt);
+  tmpdiff = DIFF64(rxtime, s->start_rxtime + id * s->usec_per_pkt);
   if (tmpdiff < -20) {
     // packet arrived before predicted time, so prediction was based on
     // a packet that was "slow" before, or else one of our clocks is
@@ -779,8 +774,8 @@
             (long)tmpdiff);
     s->start_rxtime = rxtime - id * s->usec_per_pkt;
   }
-  int32_t rxdiff = DIFF(rxtime, s->start_rxtime + id * s->usec_per_pkt);
-  DLOG("ack: rxdiff=%d, rxtime=%u, start_rxtime=%u, id=%d, usec_per_pkt=%d\n",
+  int32_t rxdiff = DIFF64(rxtime, s->start_rxtime + id * s->usec_per_pkt);
+  DLOG("ack: rxdiff=%d, rxtime=%lu, start_rxtime=%lu, id=%d, usec_per_pkt=%d\n",
        rxdiff, rxtime, s->start_rxtime, id, s->usec_per_pkt);
 
   // Figure out the offset between our clock and the remote's clock, so we can
@@ -967,7 +962,7 @@
   }
 
   int is_server;
-  uint32_t now = ustime();       // current time
+  uint64_t now = ustime64();     // current time
 
   if (argc - optind == 0) {
     is_server = 1;
@@ -1045,7 +1040,7 @@
     struct timeval tv;
     tv.tv_sec = 0;
 
-    now = ustime();
+    now = ustime64();
     if (sessions->next_sends.size() == 0 ||
         DIFF(sessions->next_send_time(), now) < 0 ||
         extrasock > 0) {
@@ -1058,7 +1053,7 @@
       tvp = &tv;
     }
     int nfds = select(std::max(sock, extrasock) + 1, &rfds, NULL, NULL, tvp);
-    now = ustime();
+    now = ustime64();
     if (nfds < 0 && errno != EINTR) {
       perror("select");
       return 1;
diff --git a/cmds/isoping.h b/cmds/isoping.h
index 8ff819d..45c9bef 100644
--- a/cmds/isoping.h
+++ b/cmds/isoping.h
@@ -66,7 +66,7 @@
 
 // Data we track per session.
 struct Session {
-  Session(uint32_t first_send, uint32_t usec_per_pkt,
+  Session(uint64_t first_send, uint32_t usec_per_pkt,
           const struct sockaddr_storage &remoteaddr, size_t remoteaddr_len);
   int32_t usec_per_pkt;
   int32_t usec_per_print;
@@ -94,11 +94,11 @@
   uint32_t next_rx_id;       // expected id field for next receive
   uint32_t next_rxack_id;    // expected ack.id field in next received ack
   uint32_t start_rtxtime;    // remote's txtime at startup
-  uint32_t start_rxtime;     // local rxtime at startup
+  uint64_t start_rxtime;     // local rxtime at startup
   uint32_t last_rxtime;      // local rxtime of last received packet
   int32_t min_cycle_rxdiff;  // smallest packet delay seen this cycle
   uint32_t next_cycle;       // time when next cycle begins
-  uint32_t next_send;        // time when we'll send next pkt
+  uint64_t next_send;        // time when we'll send next pkt
   uint32_t num_lost;         // number of rx packets not received
   int next_txack_index;      // next array item to fill in tx.acks
   struct Packet tx, rx;      // transmit and received packet buffers
@@ -134,7 +134,7 @@
   ~Sessions();
 
   // Rotates the cookie secrets if they haven't been changed in a while.
-  virtual void MaybeRotateCookieSecrets(uint32_t now, int is_server);
+  virtual void MaybeRotateCookieSecrets(uint64_t now, int is_server);
 
   // Rotate the cookie secrets using the given epoch directly.  Only for use in
   // unit tests.
@@ -150,12 +150,12 @@
   virtual bool ValidateCookie(Packet *p, struct sockaddr_storage *addr,
                               socklen_t addr_len);
 
-  SessionMap::iterator NewSession(uint32_t first_send,
+  SessionMap::iterator NewSession(uint64_t first_send,
                                   uint32_t usec_per_pkt,
                                   struct sockaddr_storage *addr,
                                   socklen_t addr_len);
 
-  uint32_t next_send_time() {
+  uint64_t next_send_time() {
     if (next_sends.size() == 0) {
       return 0;
     }
@@ -189,35 +189,35 @@
 // Process an incoming packet from the socket.
 void handle_packet(struct Sessions *s, struct Session *session, Packet *rx,
                    int sock, struct sockaddr_storage *rxaddr,
-                   socklen_t rxaddr_len, uint32_t now, int is_server);
+                   socklen_t rxaddr_len, uint64_t now, int is_server);
 
 // Process an established Session's incoming ack packet, from s->rx.
-void handle_ack_packet(struct Session *s, uint32_t now);
+void handle_ack_packet(struct Session *s, uint64_t now);
 
 // Server-only: processes a handshake packet from a new client in rx. Replies
 // with a cookie if no cookie provided, or validates the provided cookie and
 // establishes a new Session.
 void handle_new_client_handshake_packet(Sessions *s, Packet *rx, int sock,
                                        struct sockaddr_storage *remoteaddr,
-                                       size_t remoteaddr_len, uint32_t now);
+                                       size_t remoteaddr_len, uint64_t now);
 
 // Client-only: processes a handshake packet received from the server.
 // Configures the Session to echo the provided cookie back to the server.
-void handle_server_handshake_packet(Sessions *s, Packet *rx, uint32_t now);
+void handle_server_handshake_packet(Sessions *s, Packet *rx, uint64_t now);
 
 // Sets all the elements of s->tx to be ready to be sent to the other side.
 void prepare_tx_packet(struct Session *s);
 
 // Sends a packet to all waiting sessions where the appropriate amount of time
 // has passed.
-int send_waiting_packets(Sessions *s, int sock, uint32_t now, int is_server);
+int send_waiting_packets(Sessions *s, int sock, uint64_t now, int is_server);
 
 // Sends a packet from the given session to the given socket immediately.
 int send_packet(struct Session *s, int sock, int is_server);
 
 // Reads a packet from sock and stores it in s->rx.  Assumes a packet is
 // currently readable.
-int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server);
+int read_incoming_packet(Sessions *s, int sock, uint64_t now, int is_server);
 
 // Sets the global packets_per_sec value.  Used for test purposes only.
 void set_packets_per_sec(double new_pps);
diff --git a/cmds/isoping_test.cc b/cmds/isoping_test.cc
index acf96fd..d1f9ef7 100644
--- a/cmds/isoping_test.cc
+++ b/cmds/isoping_test.cc
@@ -27,17 +27,17 @@
 
 #include "isoping.h"
 
-uint32_t send_next_ack_packet(Session *from, uint32_t from_base,
-                          Session *to, uint32_t to_base, uint32_t latency) {
-  uint32_t t = from->next_send - from_base;
+uint32_t send_next_ack_packet(Session *from, uint64_t from_base, Session *to,
+                              uint64_t to_base, uint32_t latency) {
+  uint64_t t = from->next_send - from_base;
   prepare_tx_packet(from);
   to->rx = from->tx;
   from->next_send += from->usec_per_pkt;
   t += latency;
   handle_ack_packet(to, to_base + t);
   fprintf(stderr,
-          "**Sent packet: txtime=%d, start_txtime=%d, rxtime=%d, "
-          "start_rxtime=%d, latency=%d, t_from=%d, t_to=%d\n",
+          "**Sent packet: txtime=%ld, start_txtime=%d, rxtime=%lu, "
+          "start_rxtime=%lu, latency=%d, t_from=%lu, t_to=%lu\n",
           from->next_send,
           to->start_rtxtime,
           to_base + t,
@@ -49,15 +49,91 @@
   return t;
 }
 
+// Returns a new socket, connected to the given address.  Returns a negative
+// value on error.
+int create_client_socket(struct sockaddr_storage *listenaddr,
+                         socklen_t listenaddr_len, struct addrinfo *res) {
+  int csock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (!WVPASS(csock >= 0)) {
+    perror("client socket");
+    return -1;
+  }
+  if (!WVPASS(!connect(csock, (struct sockaddr *)listenaddr, listenaddr_len))) {
+    perror("connect");
+    return -1;
+  }
+  struct sockaddr_in6 caddr;
+  socklen_t caddr_len = sizeof(caddr);
+  memset(&caddr, 0, caddr_len);
+  if (!WVPASS(!getsockname(csock, (struct sockaddr *)&caddr, &caddr_len))) {
+    perror("getsockname");
+    return -1;
+  }
+  char buf[128];
+  inet_ntop(AF_INET6, (struct sockaddr *)&caddr, buf, sizeof(buf));
+  printf("Created client connection on %s:%d\n", buf, ntohs(caddr.sin6_port));
+  return csock;
+}
+
+// Creates two sockets and puts them in *csock and *ssock.  *ssock is listening
+// to the given address, and *csock is connected to it.  Returns true on
+// success.
+bool create_local_socketpair(struct sockaddr_storage *listenaddr,
+                             socklen_t listenaddr_len, int *csock, int *ssock,
+                             struct addrinfo **res) {
+  struct addrinfo hints;
+
+  // Get local interface information.
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = AF_INET6;
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_flags = AI_PASSIVE | AI_V4MAPPED;
+  int err = getaddrinfo(NULL, "0", &hints, res);
+  if (err != 0) {
+    WVPASSEQ("Error from getaddrinfo: ", gai_strerror(err));
+    return false;
+  }
+
+  *ssock = socket((*res)->ai_family, (*res)->ai_socktype, (*res)->ai_protocol);
+  if (!WVPASS(*ssock >= 0)) {
+    perror("server socket");
+    return false;
+  }
+
+  if (!WVPASS(!bind(*ssock, (*res)->ai_addr, (*res)->ai_addrlen))) {
+    perror("bind");
+    return false;
+  }
+
+  // Figure out the local port we got.
+  memset(listenaddr, 0, listenaddr_len);
+  if (!WVPASS(!getsockname(*ssock, (struct sockaddr *)listenaddr,
+                           &listenaddr_len))) {
+    perror("getsockname");
+    return false;
+  }
+
+  printf("Bound server socket to port=%d\n",
+         listenaddr->ss_family == AF_INET
+             ? ntohs(((struct sockaddr_in *)listenaddr)->sin_port)
+             : ntohs(((struct sockaddr_in6 *)listenaddr)->sin6_port));
+
+  *csock = create_client_socket(listenaddr, listenaddr_len, *res);
+  if (*csock < 0) {
+    return false;
+  }
+  return true;
+}
+
 WVTEST_MAIN("isoping algorithm logic") {
   // Establish a positive base time for client and server.  This is conceptually
   // the instant when the client sends its first message to the server, as
   // measured by the clocks on each side (note: this is before the server
   // receives the message).
-  uint32_t cbase = 400 * 1000;
-  uint32_t sbase = 600 * 1000;
-  uint32_t real_clockdiff = sbase - cbase;
-  uint32_t usec_per_pkt = 100 * 1000;
+  uint64_t cbase = 400 * 1000;
+  uint64_t sbase = 600 * 1000;
+  uint64_t real_clockdiff = sbase - cbase;
+  uint64_t usec_per_pkt = 100 * 1000;
 
   // The states of the client and server.
   struct sockaddr_storage empty_sockaddr;
@@ -434,68 +510,14 @@
 
   // Sockets for the client and server.
   int ssock, csock;
-  struct addrinfo hints, *res;
-
-  // Get local interface information.
-  memset(&hints, 0, sizeof(hints));
-  hints.ai_family = AF_INET6;
-  hints.ai_socktype = SOCK_DGRAM;
-  hints.ai_flags = AI_PASSIVE | AI_V4MAPPED;
-  int err = getaddrinfo(NULL, "0", &hints, &res);
-  if (err != 0) {
-    WVPASSEQ("Error from getaddrinfo: ", gai_strerror(err));
-    return;
-  }
-
-  ssock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-  if (!WVPASS(ssock >= 0)) {
-    perror("server socket");
-    return;
-  }
-
-  csock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-  if (!WVPASS(csock >= 0)) {
-    perror("client socket");
-    return;
-  }
-
-  if (!WVPASS(!bind(ssock, res->ai_addr, res->ai_addrlen))) {
-    perror("bind");
-    return;
-  }
-
-  // Figure out the local port we got.
   struct sockaddr_storage listenaddr;
   socklen_t listenaddr_len = sizeof(listenaddr);
-  memset(&listenaddr, 0, listenaddr_len);
-  if (!WVPASS(!getsockname(ssock, (struct sockaddr *)&listenaddr,
-                           &listenaddr_len))) {
-    perror("getsockname");
+  struct addrinfo *res;
+  if (!create_local_socketpair(&listenaddr, listenaddr_len, &csock, &ssock,
+                               &res)) {
     return;
   }
 
-  printf("Bound server socket to port=%d\n",
-         listenaddr.ss_family == AF_INET
-             ? ntohs(((struct sockaddr_in *)&listenaddr)->sin_port)
-             : ntohs(((struct sockaddr_in6 *)&listenaddr)->sin6_port));
-
-  // Connect the client's socket.
-  if (!WVPASS(
-          !connect(csock, (struct sockaddr *)&listenaddr, listenaddr_len))) {
-    perror("connect");
-    return;
-  }
-  struct sockaddr_in6 caddr;
-  socklen_t caddr_len = sizeof(caddr);
-  memset(&caddr, 0, caddr_len);
-  if (!WVPASS(!getsockname(csock, (struct sockaddr *)&caddr, &caddr_len))) {
-    perror("getsockname");
-    return;
-  }
-  char buf[128];
-  inet_ntop(AF_INET6, (struct sockaddr *)&caddr, buf, sizeof(buf));
-  printf("Created client connection on %s:%d\n", buf, ntohs(caddr.sin6_port));
-
   // All active sessions for the client and server.
   Sessions c;
   Sessions s;
@@ -539,7 +561,7 @@
   WVPASS(!send_waiting_packets(&c, csock, cbase + t, is_client));
   FD_ZERO(&rfds);
   FD_SET(csock, &rfds);
-  nfds = select(csock + 1, &rfds, NULL, NULL, &tv);
+  nfds = select(ssock + 1, &rfds, NULL, NULL, &tv);
   WVPASSEQ(nfds, 0);
 
   // Wait for the client to time out and resend the initial handshake packet.
@@ -620,26 +642,7 @@
   Sessions c2;
   set_packets_per_sec(4e6/usec_per_pkt);
   c2.NewSession(cbase, usec_per_pkt/10, &listenaddr, listenaddr_len);
-  int c2sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-  if (!WVPASS(c2sock > 0)) {
-    perror("client socket 2");
-    return;
-  }
-  if (!WVPASS(
-          !connect(c2sock, (struct sockaddr *)&listenaddr, listenaddr_len))) {
-    perror("connect");
-    return;
-  }
-  struct sockaddr_in6 c2addr;
-  socklen_t c2addr_len = sizeof(c2addr);
-  memset(&c2addr, 0, c2addr_len);
-  if (!WVPASS(!getsockname(c2sock, (struct sockaddr *)&c2addr, &c2addr_len))) {
-    perror("getsockname");
-    return;
-  }
-  inet_ntop(AF_INET6, (struct sockaddr *)&c2addr, buf, sizeof(buf));
-  printf("Created new client connection on %s:%d\n", buf,
-         ntohs(c2addr.sin6_port));
+  int c2sock = create_client_socket(&listenaddr, listenaddr_len, res);
 
   Session &c2Session = c2.session_map.begin()->second;
   // Perform the handshake dance so the server knows we're legit.
@@ -853,3 +856,119 @@
   close(sock);
   freeaddrinfo(res);
 }
+
+WVTEST_MAIN("sending packets when time rolls over 32 bits") {
+  // Have the server's time be close to rolling over the 32-bit microsecond
+  // boundary.
+  uint32_t usec_per_pkt = 100 * 1000;
+  uint64_t cbase = 400 * 1000;
+  uint64_t wrap = 1ull << 32;
+  // It takes one round-trip to establish the handshake
+  uint64_t sbase = wrap - 1.5 * usec_per_pkt;
+
+  // Sockets for the clients and server.
+  int ssock, csock;
+  struct sockaddr_storage listenaddr;
+  socklen_t listenaddr_len = sizeof(listenaddr);
+  struct addrinfo *res;
+  if (!create_local_socketpair(&listenaddr, listenaddr_len, &csock, &ssock,
+                               &res)) {
+    return;
+  }
+
+  int c2sock = create_client_socket(&listenaddr, listenaddr_len, res);
+
+  // All active sessions for the clients and server.
+  Sessions s;
+  Sessions c;
+  Sessions c2;
+
+  int is_server = 1;
+  int is_client = 0;
+
+  s.MaybeRotateCookieSecrets(sbase, is_server);
+  // The first session sends slowly, the second one sends more frequently.
+  c.NewSession(cbase, 4 * usec_per_pkt, &listenaddr, listenaddr_len);
+  c2.NewSession(cbase, usec_per_pkt, &listenaddr, listenaddr_len);
+
+  // Send the initial handshake packets.
+  Session &cSession = c.session_map.begin()->second;
+  uint64_t t = cSession.next_send - cbase;
+  WVPASS(!send_waiting_packets(&c, csock, cbase + t, is_client));
+
+  //Session &c2Session = c2.session_map.begin()->second;
+  WVPASS(!send_waiting_packets(&c2, c2sock, cbase + t, is_client));
+
+  // The server sends the handshake response immediately.
+  WVPASS(!read_incoming_packet(&s, ssock, sbase, is_server));
+  WVPASS(!read_incoming_packet(&s, ssock, sbase, is_server));
+
+  // Finish the handshake.
+  WVPASS(!read_incoming_packet(&c, csock, cbase + t, is_client));
+  WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+  t = cSession.next_send - cbase;
+  WVPASS(!send_waiting_packets(&c, csock, cbase + t, is_client));
+  WVPASS(!send_waiting_packets(&c2, c2sock, cbase + t, is_client));
+
+  // Receive the full handshake packets at a time where the fast client can
+  // still send before the time wraps, but the slow client will exceed 32 bits.
+  t = wrap - 2 * usec_per_pkt - sbase;
+  WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+  WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+
+  WVPASSEQ(s.session_map.size(), 2);
+
+  Session &sSession = s.session_map.begin()->second;
+  t += usec_per_pkt;
+  printf("Finishing handshake: next_send_time=%lu (0x%lx)\n",
+         s.next_send_time(), s.next_send_time());
+  printf("last_rxtime: %d\n", sSession.last_rxtime);
+  printf("min_cycle_rxdiff: %d\n", sSession.min_cycle_rxdiff);
+  WVPASS(s.next_send_time() < wrap);
+  WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+  printf("Finished handshake, next_send_time=%lu (0x%lx)\n",
+         s.next_send_time(), s.next_send_time());
+  // The fast client still needs to send before we wrap.
+  WVPASS(s.next_send_time() < wrap);
+
+  // Verify we don't spuriously send any packets.
+  WVPASS(!read_incoming_packet(&c, csock, cbase + t, is_client));
+  WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+  WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+
+  fd_set rfds;
+  FD_ZERO(&rfds);
+  FD_SET(csock, &rfds);
+  FD_SET(c2sock, &rfds);
+  struct timeval tv = {0, 0};
+  int nfds = select(std::max(csock, c2sock) + 1, &rfds, NULL, NULL, &tv);
+  WVPASSEQ(nfds, 0);
+
+  // Verify that we still send pre-wrap-scheduled packets before the server's
+  // time actually wraps.
+  t = wrap - 1 - sbase;
+  WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+  WVPASS(wrap < s.next_send_time());
+
+  FD_ZERO(&rfds);
+  FD_SET(c2sock, &rfds);
+  nfds = select(std::max(csock, c2sock) + 1, &rfds, NULL, NULL, &tv);
+  WVPASSEQ(nfds, 1);
+  WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+
+  // Verify we can still send packets after crossing the 32-bit boundary.
+  t = s.next_send_time() - sbase;
+  WVPASS(!send_waiting_packets(&s, ssock, sbase + t, is_server));
+
+  FD_ZERO(&rfds);
+  FD_SET(c2sock, &rfds);
+  nfds = select(std::max(csock, c2sock) + 1, &rfds, NULL, NULL, &tv);
+  WVPASSEQ(nfds, 1);
+  WVPASS(!read_incoming_packet(&c2, c2sock, cbase + t, is_client));
+
+  // Cleanup
+  close(ssock);
+  close(csock);
+  close(c2sock);
+  freeaddrinfo(res);
+}