Track multiple isoping client sessions in the server.

We keep a map of each client's session based on their IP and port, and a
priority_queue of the next scheduled outgoing packet.

Change-Id: I28423f24ccbb268226bab0b815d841f94d84f597
diff --git a/cmds/isoping.cc b/cmds/isoping.cc
index aac7935..05fffd6 100644
--- a/cmds/isoping.cc
+++ b/cmds/isoping.cc
@@ -81,11 +81,11 @@
   want_to_die = 1;
 }
 
-Session::Session(uint32_t now)
-    : usec_per_pkt(1e6 / packets_per_sec),
+Session::Session(uint32_t first_send, uint32_t usec_per_pkt,
+                 const struct sockaddr_storage &raddr, size_t raddr_len)
+    : usec_per_pkt(usec_per_pkt),
       usec_per_print(prints_per_sec > 0 ? 1e6 / prints_per_sec : 0),
-      remoteaddr(NULL),
-      remoteaddr_len(0),
+      remoteaddr_len(raddr_len),
       next_tx_id(1),
       next_rx_id(0),
       next_rxack_id(0),
@@ -94,18 +94,29 @@
       last_rxtime(0),
       min_cycle_rxdiff(0),
       next_cycle(0),
-      next_send(now + usec_per_pkt),
+      next_send(first_send),
       num_lost(0),
       next_txack_index(0),
-      last_print(now - usec_per_pkt),
+      last_print(first_send - usec_per_pkt),
       lat_tx(0), lat_tx_min(0x7fffffff), lat_tx_max(0),
       lat_tx_count(0), lat_tx_sum(0), lat_tx_var_sum(0),
       lat_rx(0), lat_rx_min(0x7fffffff), lat_rx_max(0),
       lat_rx_count(0), lat_rx_sum(0), lat_rx_var_sum(0) {
+  memcpy(&remoteaddr, &raddr, raddr_len);
   memset(&tx, 0, sizeof(tx));
   strcpy(last_ackinfo, "");
 }
 
+SessionMap::iterator Sessions::NewSession(uint32_t first_send,
+                                          uint32_t usec_per_pkt,
+                                          struct sockaddr_storage *addr,
+                                          socklen_t addr_len) {
+  std::pair<SessionMap::iterator, bool> p = session_map.insert(std::make_pair(
+      *addr, Session(first_send, usec_per_pkt, *addr, addr_len)));
+  next_sends.push(p.first);
+  return p.first;
+}
+
 // Returns the kernel monotonic timestamp in microseconds, truncated to
 // 32 bits.  That will wrap around every ~4000 seconds, which is okay
 // for our purposes.  We use 32 bits to save space in our packets.
@@ -189,6 +200,34 @@
   return addrbuf;
 }
 
+bool CompareSockaddr::operator()(const struct sockaddr_storage &lhs,
+                                 const struct sockaddr_storage &rhs) {
+  if (lhs.ss_family != rhs.ss_family) {
+    return lhs.ss_family < rhs.ss_family;
+  }
+  if (lhs.ss_family == AF_INET) {
+    const struct sockaddr_in &lhs4 = *(const struct sockaddr_in*)&lhs;
+    const struct sockaddr_in &rhs4 = *(const struct sockaddr_in*)&rhs;
+    long long c = (ntohl(lhs4.sin_addr.s_addr) - ntohl(rhs4.sin_addr.s_addr));
+    if (c == 0) {
+      return ntohs(lhs4.sin_port) < ntohs(rhs4.sin_port);
+    }
+    return c < 0;
+  } else {
+    const struct sockaddr_in6 &lhs6 = *(const struct sockaddr_in6*)&lhs;
+    const struct sockaddr_in6 &rhs6 = *(const struct sockaddr_in6*)&rhs;
+    int c = memcmp(&lhs6.sin6_addr, &rhs6.sin6_addr, sizeof(struct in6_addr));
+    if (c == 0) {
+      return ntohs(lhs6.sin6_port) < ntohs(rhs6.sin6_port);
+    }
+    return c < 0;
+  }
+}
+
+bool CompareNextSend::operator()(const SessionMap::iterator &lhs,
+                                 const SessionMap::iterator &rhs) {
+  return lhs->second.next_send > rhs->second.next_send;
+}
 
 // Print the timestamp corresponding to the current time.
 // Deliberately the same format as tcpdump uses, so we can easily sort and
@@ -235,7 +274,7 @@
   // receive the timing information for the packets it sent us.
   if (is_server) {
     if (sendto(sock, &s->tx, sizeof(s->tx), 0,
-               s->remoteaddr, s->remoteaddr_len) < 0) {
+               (struct sockaddr *)&s->remoteaddr, s->remoteaddr_len) < 0) {
       perror("sendto");
     }
   } else {
@@ -250,59 +289,84 @@
 }
 
 
-int maybe_send_packet(struct Session *s, int sock, uint32_t now) {
-  if (s->remoteaddr && DIFF(now, s->next_send) >= 0) {
-    prepare_tx_packet(s);
-    int err = send_packet(s, sock);
+int send_waiting_packets(Sessions *sessions, int sock, uint32_t now) {
+  if (sessions == NULL) {
+    return -1;
+  }
+  // TODO(pmccurdy): This will incorrectly send packets too early during the
+  // time period where now + usec_per_pkt wraps around, i.e. about once per 71
+  // minutes, and will end up blasting packets as fast as possible for that
+  // duration.  Consider calculating next_send and now as 64-bit values, and
+  // truncating to 32 bits when transmitting.
+  while (sessions->next_sends.size() > 0 &&
+         DIFF(now, sessions->next_send_time()) >= 0) {
+    SessionMap::iterator it = sessions->next_sends.top();
+    sessions->next_sends.pop();
+    Session &s = it->second;
+    prepare_tx_packet(&s);
+    int err = send_packet(&s, sock);
     if (err != 0) {
       return err;
     }
+    // TODO(pmccurdy): Detect connection refused on a per-client basis.  Use
+    // recvmsg with the MSG_ERRQUEUE flag to get error and client address,
+    // instead of waiting for timeout.
+    // TODO(pmccurdy): Support very low packet-per-second values, e.g. one
+    // packet per hour, without constantly disconnecting the client.
+    if (is_server && DIFF(now, s.last_rxtime) > 60 * 1000 * 1000) {
+      fprintf(stderr, "client %s disconnected.\n",
+              sockaddr_to_str((struct sockaddr *)&s.remoteaddr));
+      sessions->session_map.erase(s.remoteaddr);
+    } else {
+      sessions->next_sends.push(it);
+    }
   }
   return 0;
 }
 
-
-int read_incoming_packet(struct Session *s, int sock, uint32_t now) {
-  struct sockaddr_in6 rxaddr;
+int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server) {
+  struct sockaddr_storage rxaddr;
   socklen_t rxaddr_len = 0;
-  // TODO(pmccurdy): Temporary until we properly support multiple clients.
-  static struct sockaddr_in6 last_rxaddr;
 
+  Packet rx;
   rxaddr_len = sizeof(rxaddr);
-  ssize_t got = recvfrom(sock, &s->rx, sizeof(s->rx), 0,
+  ssize_t got = recvfrom(sock, &rx, sizeof(rx), 0,
                          (struct sockaddr *)&rxaddr, &rxaddr_len);
   if (got < 0) {
     int e = errno;
     perror("recvfrom");
     return e;
   }
-  if (got != sizeof(s->rx) || s->rx.magic != htonl(MAGIC)) {
+  if (got != sizeof(rx) || rx.magic != htonl(MAGIC)) {
     fprintf(stderr, "got invalid packet of length %ld\n", (long)got);
     return EINVAL;
   }
 
-  // is it a new client?
+  SessionMap::iterator it;
   if (is_server) {
-    // TODO(pmccurdy): Maintain a hash table of Sessions, look up based
-    // on rxaddr, create a new one if necessary, remove this resetting code.
-    if (!s->remoteaddr ||
-        memcmp(&rxaddr, &last_rxaddr, sizeof(rxaddr)) != 0) {
-      fprintf(stderr, "new client connected: %s\n",
+    it = s->session_map.find(rxaddr);
+    if (it == s->session_map.end()) {
+      fprintf(stderr, "New client connection: %s\n",
               sockaddr_to_str((struct sockaddr *)&rxaddr));
-      memcpy(&last_rxaddr, &rxaddr, sizeof(rxaddr));
-      s->remoteaddr = (struct sockaddr *)&last_rxaddr;
-      s->remoteaddr_len = rxaddr_len;
-
-      s->next_send = now + 10*1000;
-      s->next_tx_id = 1;
-      s->next_rx_id = s->next_rxack_id = 0;
-      s->start_rtxtime = s->start_rxtime = 0;
-      s->num_lost = 0;
-      s->next_txack_index = 0;
-      s->usec_per_pkt = ntohl(s->rx.usec_per_pkt);
-      memset(&s->tx, 0, sizeof(s->tx));
+      // TODO(pmccurdy):  This lets clients unconditionally set the usec_per_pkt
+      // values used.  Add some mechanism to let the server override or reject
+      // this value (e.g. limit to a certain range, or reduce per-client pps as
+      // more clients connect).
+      it = s->NewSession(now + 10 * 1000, ntohl(rx.usec_per_pkt), &rxaddr,
+                         rxaddr_len);
+    }
+  } else {
+    it = s->session_map.begin();
+    if (it == s->session_map.end()) {
+      fprintf(stderr, "No session configured for %s when receiving packet\n",
+              sockaddr_to_str((struct sockaddr *)&rxaddr));
+      return EINVAL;
     }
   }
+  Session &session = it->second;
+  memcpy(&session.rx, &rx, sizeof(session.rx));
+  handle_packet(&session, now);
+
   return 0;
 }
 
@@ -473,7 +537,7 @@
       uint32_t rxtime = rrxtime + offset;
       // note: already contains 1/2 rtt, unlike rxdiff
       int32_t txdiff = DIFF(rxtime, txtime);
-      if (s->usec_per_print <= 0 && s->last_ackinfo[0]) {
+      if (!quiet && s->usec_per_print <= 0 && s->last_ackinfo[0]) {
         // only print multiple acks per rx if no usec_per_print limit
         if (want_timestamps) print_timestamp(rxtime);
         printf("%12s\n", s->last_ackinfo);
@@ -550,8 +614,7 @@
   }
 
   uint32_t now = ustime();       // current time
-
-  struct Session s(now);
+  Sessions sessions;
 
   if (argc - optind == 0) {
     is_server = 1;
@@ -588,8 +651,8 @@
       perror("connect");
       return 1;
     }
-    s.remoteaddr = ai->ai_addr;
-    s.remoteaddr_len = ai->ai_addrlen;
+    sessions.NewSession(now, 1e6 / packets_per_sec,
+                        (struct sockaddr_storage *)ai->ai_addr, ai->ai_addrlen);
   } else {
     usage_and_die(argv[0]);
   }
@@ -626,55 +689,53 @@
     tv.tv_sec = 0;
 
     now = ustime();
-    if (DIFF(s.next_send, now) < 0) {
+    if (sessions.next_sends.size() == 0 ||
+        DIFF(sessions.next_send_time(), now) < 0) {
       tv.tv_usec = 0;
     } else {
-      tv.tv_usec = DIFF(s.next_send, now);
+      tv.tv_usec = DIFF(sessions.next_send_time(), now);
     }
-    int nfds = select(sock + 1, &rfds, NULL, NULL, s.remoteaddr ? &tv : NULL);
+    int nfds = select(sock + 1, &rfds, NULL, NULL,
+                      sessions.next_sends.size() > 0 ? &tv : NULL);
     now = ustime();
     if (nfds < 0 && errno != EINTR) {
       perror("select");
       return 1;
     }
 
-    // time to send the next packet?
-    int err = maybe_send_packet(&s, sock, now);
+    int err = send_waiting_packets(&sessions, sock, now);
     if (err != 0) {
       return err;
     }
-    // TODO(pmccurdy): Track disconnections across multiple clients.  Use
-    // recvmsg with the MSG_ERRQUEUE flag to detect connection refused.
-    if (is_server && DIFF(now, s.last_rxtime) > 60 * 1000 * 1000) {
-      fprintf(stderr, "client disconnected.\n");
-      s.remoteaddr = NULL;
-    }
 
     if (nfds > 0) {
-      err = read_incoming_packet(&s, sock, now);
+      err = read_incoming_packet(&sessions, sock, now, is_server);
       if (!is_server && err == ECONNREFUSED) return 2;
       if (err != 0) {
         continue;
       }
-      handle_packet(&s, now);
     }
   }
 
-  // TODO(pmccurdy): Separate out per-client and global stats.
-  printf("\n---\n");
-  printf("tx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
-         s.lat_tx_min / 1000.0,
-         DIV(s.lat_tx_sum, s.lat_tx_count) / 1000.0,
-         s.lat_tx_max / 1000.0,
-         onepass_stddev(
-             s.lat_tx_var_sum, s.lat_tx_sum, s.lat_tx_count) / 1000.0);
-  printf("rx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
-         s.lat_rx_min / 1000.0,
-         DIV(s.lat_rx_sum, s.lat_rx_count) / 1000.0,
-         s.lat_rx_max / 1000.0,
-         onepass_stddev(
-             s.lat_rx_var_sum, s.lat_rx_sum, s.lat_rx_count) / 1000.0);
-  printf("\n");
+  // TODO(pmccurdy): Separate out per-client and global stats, print stats for
+  // the server when each client disconnects.
+  if (!is_server) {
+    Session &s = sessions.session_map.begin()->second;
+    printf("\n---\n");
+    printf("tx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
+           s.lat_tx_min / 1000.0,
+           DIV(s.lat_tx_sum, s.lat_tx_count) / 1000.0,
+           s.lat_tx_max / 1000.0,
+           onepass_stddev(
+               s.lat_tx_var_sum, s.lat_tx_sum, s.lat_tx_count) / 1000.0);
+    printf("rx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
+           s.lat_rx_min / 1000.0,
+           DIV(s.lat_rx_sum, s.lat_rx_count) / 1000.0,
+           s.lat_rx_max / 1000.0,
+           onepass_stddev(
+               s.lat_rx_var_sum, s.lat_rx_sum, s.lat_rx_count) / 1000.0);
+    printf("\n");
+  }
 
   if (ai) freeaddrinfo(ai);
   if (sock >= 0) close(sock);
diff --git a/cmds/isoping.h b/cmds/isoping.h
index 11feae2..7100a51 100644
--- a/cmds/isoping.h
+++ b/cmds/isoping.h
@@ -16,6 +16,9 @@
 #ifndef ISOPING_H
 #define ISOPING_H
 
+#include <map>
+#include <netinet/in.h>
+#include <queue>
 #include <stdint.h>
 #include <sys/socket.h>
 
@@ -40,12 +43,13 @@
 
 // Data we track per session.
 struct Session {
-  Session(uint32_t now);
+  Session(uint32_t first_send, uint32_t usec_per_pkt,
+          const struct sockaddr_storage &remoteaddr, size_t remoteaddr_len);
   int32_t usec_per_pkt;
   int32_t usec_per_print;
 
   // The peer's address.
-  struct sockaddr *remoteaddr;
+  struct sockaddr_storage remoteaddr;
   socklen_t remoteaddr_len;
 
   // WARNING: lots of math below relies on well-defined uint32/int32
@@ -75,18 +79,60 @@
       lat_rx_count, lat_rx_sum, lat_rx_var_sum;
 };
 
+// Comparator for use with sockaddr_in and sockaddr_in6 values.  Sorts by
+// address family first, then on the IPv4/6 address, then the port number.
+struct CompareSockaddr {
+  bool operator()(const struct sockaddr_storage &lhs,
+                  const struct sockaddr_storage &rhs);
+};
+
+typedef std::map<struct sockaddr_storage, Session, CompareSockaddr>
+    SessionMap;
+
+// Compares the next_send values of each referenced Session, sorting the earlier
+// timestamps first.
+struct CompareNextSend {
+  bool operator()(const SessionMap::iterator &lhs,
+                  const SessionMap::iterator &rhs);
+};
+
+struct Sessions {
+ public:
+  Sessions() {}
+
+  // All active sessions, indexed by remote address/port.
+  SessionMap session_map;
+  // A queue of upcoming send times, ordered most recent first, referencing
+  // entries in the session map.
+  std::priority_queue<SessionMap::iterator, std::vector<SessionMap::iterator>,
+      CompareNextSend> next_sends;
+
+  SessionMap::iterator NewSession(uint32_t first_send,
+                                  uint32_t usec_per_pkt,
+                                  struct sockaddr_storage *addr,
+                                  socklen_t addr_len);
+
+  uint32_t next_send_time() {
+    if (next_sends.size() == 0) {
+      return 0;
+    }
+    return next_sends.top()->second.next_send;
+  }
+};
+
 // Process the Session's incoming packet, from s->rx.
 void handle_packet(struct Session *s, uint32_t now);
 
 // Sets all the elements of s->tx to be ready to be sent to the other side.
 void prepare_tx_packet(struct Session *s);
 
-// Sends a packet to the socket if the appropriate amount of time has passed.
-int maybe_send_packet(struct Session *s, int sock, uint32_t now);
+// Sends a packet to all waiting sessions where the appropriate amount of time
+// has passed.
+int send_waiting_packets(Sessions *s, int sock, uint32_t now);
 
 // Reads a packet from sock and stores it in s->rx.  Assumes a packet is
 // currently readable.
-int read_incoming_packet(struct Session *s, int sock, uint32_t now);
+int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server);
 
 // Parses arguments and runs the main loop.  Distinct from main() for unit test
 // purposes.
diff --git a/cmds/isoping_test.cc b/cmds/isoping_test.cc
index 572490b..a8da304 100644
--- a/cmds/isoping_test.cc
+++ b/cmds/isoping_test.cc
@@ -15,6 +15,7 @@
  */
 
 #include <arpa/inet.h>
+#include <errno.h>
 #include <limits.h>
 #include <memory.h>
 #include <stdio.h>
@@ -56,10 +57,12 @@
   uint32_t cbase = 400 * 1000;
   uint32_t sbase = 600 * 1000;
   uint32_t real_clockdiff = sbase - cbase;
+  uint32_t usec_per_pkt = 100 * 1000;
 
   // The states of the client and server.
-  struct Session c(cbase);
-  struct Session s(sbase);
+  struct sockaddr_storage empty_sockaddr;
+  struct Session c(cbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
+  struct Session s(sbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
 
   // One-way latencies: cs_latency is the latency from client to server;
   // sc_latency is from server to client.
@@ -72,9 +75,6 @@
 
   // Send the initial packet from client to server.  This isn't enough to let us
   // draw any useful latency conclusions.
-  // TODO(pmccurdy): Setting next_send is duplicating some work done in the main
-  // loop / send_packet.  Extract that into somewhere testable, then test it.
-  c.next_send = cbase;
   t = send_next_packet(&c, cbase, &s, sbase, cs_latency);
   uint32_t rxtime = sbase + t;
   s.next_send = rxtime + 10 * 1000;
@@ -256,10 +256,12 @@
 WVTEST_MAIN("isoping clock drift") {
   uint32_t cbase = 1400 * 1000;
   uint32_t sbase = 1600 * 1000;
+  uint32_t usec_per_pkt = 100 * 1000;
 
   // The states of the client and server.
-  struct Session c(cbase);
-  struct Session s(sbase);
+  struct sockaddr_storage empty_sockaddr;
+  struct Session c(cbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
+  struct Session s(sbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
   // Send packets infrequently, to get new cycles more often.
   s.usec_per_pkt = 1 * 1000 * 1000;
   c.usec_per_pkt = 1 * 1000 * 1000;
@@ -424,10 +426,6 @@
   uint32_t cbase = 1400 * 1000;
   uint32_t sbase = 1600 * 1000;
 
-  // The states of the client and server.
-  struct Session c(cbase);
-  struct Session s(sbase);
-
   // Sockets for the client and server.
   int ssock, csock;
   struct addrinfo hints, *res;
@@ -461,7 +459,7 @@
   }
 
   // Figure out the local port we got.
-  struct sockaddr_in6 listenaddr;
+  struct sockaddr_storage listenaddr;
   socklen_t listenaddr_len = sizeof(listenaddr);
   memset(&listenaddr, 0, listenaddr_len);
   if (!WVPASS(!getsockname(ssock, (struct sockaddr *)&listenaddr,
@@ -470,7 +468,10 @@
     return;
   }
 
-  printf("Bound server socket to port=%d\n", listenaddr.sin6_port);
+  printf("Bound server socket to port=%d\n",
+         listenaddr.ss_family == AF_INET
+             ? ntohs(((struct sockaddr_in *)&listenaddr)->sin_port)
+             : ntohs(((struct sockaddr_in6 *)&listenaddr)->sin6_port));
 
   // Connect the client's socket.
   if (!WVPASS(
@@ -478,14 +479,30 @@
     perror("connect");
     return;
   }
+  struct sockaddr_in6 caddr;
+  socklen_t caddr_len = sizeof(caddr);
+  memset(&caddr, 0, caddr_len);
+  if (!WVPASS(!getsockname(csock, (struct sockaddr *)&caddr, &caddr_len))) {
+    perror("getsockname");
+    return;
+  }
+  char buf[128];
+  inet_ntop(AF_INET6, (struct sockaddr *)&caddr, buf, sizeof(buf));
+  printf("Created client connection on %s:%d\n", buf, ntohs(caddr.sin6_port));
 
-  c.remoteaddr = (struct sockaddr *)&listenaddr;
-  c.remoteaddr_len = listenaddr_len;
+  // All active sessions for the client and server.
+  Sessions c;
+  Sessions s;
+  uint32_t usec_per_pkt = 100 * 1000;
+
+  c.NewSession(cbase + 1, usec_per_pkt, &listenaddr, listenaddr_len);
 
   uint32_t cs_latency = 4000;
   uint32_t sc_latency = 5000;
-  uint32_t t = c.usec_per_pkt - 1;
-  WVPASS(!maybe_send_packet(&c, csock, cbase + t));
+
+  Session &cSession = c.session_map.begin()->second;
+  uint32_t t = cSession.next_send - cbase - 1;
+  WVPASS(!send_waiting_packets(&c, csock, cbase + t));
 
   // Verify we didn't send a packet before its time.
   fd_set rfds;
@@ -497,8 +514,8 @@
 
   // Send a packet in each direction.
   t += 1;
-  WVPASS(!maybe_send_packet(&c, csock, cbase + t));
-  WVPASSEQ(c.next_tx_id, 2);
+  WVPASS(!send_waiting_packets(&c, csock, cbase + t));
+  WVPASSEQ(cSession.next_tx_id, 2);
 
   FD_ZERO(&rfds);
   FD_SET(ssock, &rfds);
@@ -506,23 +523,26 @@
   WVPASSEQ(nfds, 1);
 
   t += cs_latency;
-  WVPASS(!read_incoming_packet(&s, ssock, sbase + t));
+  int is_server = 1;
+  int is_client = 0;
+  WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+  WVPASSEQ(s.session_map.size(), 1);
+  WVPASSEQ(s.next_sends.size(), 1);
+  WVPASSEQ(s.next_send_time(), sbase + t + 10 * 1000);
 
-  WVPASS(s.remoteaddr != NULL);
-  WVPASS(s.remoteaddr_len > 0);
-  WVPASSEQ(s.next_tx_id, 1);
+  WVPASSEQ(s.session_map.size(), 1);
+  Session &sSession = s.session_map.begin()->second;
+  WVPASS(sSession.remoteaddr_len > 0);
+  WVPASSEQ(sSession.next_tx_id, 1);
 
-  handle_packet(&s, sbase + t);
-
-  t = s.next_send - sbase;
-  WVPASS(!maybe_send_packet(&s, ssock, sbase + t));
-  WVPASSEQ(s.next_send, sbase + t + s.usec_per_pkt);
-  WVPASSEQ(s.next_tx_id, 2);
+  t = s.next_send_time() - sbase;
+  WVPASS(!send_waiting_packets(&s, ssock, sbase + t));
+  WVPASSEQ(s.next_send_time(), sbase + t + sSession.usec_per_pkt);
+  WVPASSEQ(sSession.next_tx_id, 2);
 
   t += sc_latency;
-  WVPASS(!read_incoming_packet(&c, csock, cbase + t));
-  handle_packet(&c, cbase + t);
-  WVPASSEQ(c.lat_rx_count, 1);
+  WVPASS(!read_incoming_packet(&c, csock, cbase + t, is_client));
+  WVPASSEQ(cSession.lat_rx_count, 1);
 
   // Verify we reject garbage data.
   Packet p;
@@ -532,19 +552,18 @@
     return;
   }
 
-  WVPASSEQ(read_incoming_packet(&s, ssock, sbase + t), EINVAL);
+  WVPASSEQ(read_incoming_packet(&s, ssock, sbase + t, is_server), EINVAL);
 
-  // Make a new client, getting a new source port.
-  struct Session c2(cbase);
-  c2.usec_per_pkt *= 2;
-  c2.remoteaddr = c.remoteaddr;
-  c2.remoteaddr_len = c.remoteaddr_len;
+  // Make a new client, who sends more frequently, getting a new source port.
+  Sessions c2;
+  c2.NewSession(cbase, usec_per_pkt/4, &listenaddr, listenaddr_len);
   int c2sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
   if (!WVPASS(c2sock > 0)) {
     perror("client socket 2");
     return;
   }
-  if (!WVPASS(!connect(c2sock, c.remoteaddr, c.remoteaddr_len))) {
+  if (!WVPASS(
+          !connect(c2sock, (struct sockaddr *)&listenaddr, listenaddr_len))) {
     perror("connect");
     return;
   }
@@ -555,20 +574,22 @@
     perror("getsockname");
     return;
   }
+  inet_ntop(AF_INET6, (struct sockaddr *)&c2addr, buf, sizeof(buf));
+  printf("Created new client connection on %s:%d\n", buf,
+         ntohs(c2addr.sin6_port));
 
-  t = c2.next_send - cbase;
-  WVPASS(!maybe_send_packet(&c2, c2sock, cbase + t));
+  Session &c2Session = c2.session_map.begin()->second;
+  t = c2Session.next_send - cbase;
+  WVPASS(!send_waiting_packets(&c2, c2sock, cbase + t));
 
   t += cs_latency;
 
-  // Check that a new client resets some state.
-  WVPASS(!read_incoming_packet(&s, ssock, sbase + t));
-
-  WVPASSEQ(ntohs(((sockaddr_in6 *)s.remoteaddr)->sin6_port),
-           ntohs(c2addr.sin6_port));
-  WVPASSEQ(s.next_tx_id, 1);
-  WVPASSEQ(s.next_rx_id, 0);
-  WVPASSEQ(s.usec_per_pkt, c2.usec_per_pkt);
+  // Check that a new client is added to the server's state, and it will be sent
+  // next.
+  WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+  WVPASSEQ(s.session_map.size(), 2);
+  WVPASSEQ(s.next_sends.size(), 2);
+  WVPASSEQ(s.next_send_time(), sbase + t + 10 * 1000);
 
   // Cleanup
   close(ssock);