Track multiple isoping client sessions in the server.
We keep a map of each client's session based on their IP and port, and a
priority_queue of the next scheduled outgoing packet.
Change-Id: I28423f24ccbb268226bab0b815d841f94d84f597
diff --git a/cmds/isoping.cc b/cmds/isoping.cc
index aac7935..05fffd6 100644
--- a/cmds/isoping.cc
+++ b/cmds/isoping.cc
@@ -81,11 +81,11 @@
want_to_die = 1;
}
-Session::Session(uint32_t now)
- : usec_per_pkt(1e6 / packets_per_sec),
+Session::Session(uint32_t first_send, uint32_t usec_per_pkt,
+ const struct sockaddr_storage &raddr, size_t raddr_len)
+ : usec_per_pkt(usec_per_pkt),
usec_per_print(prints_per_sec > 0 ? 1e6 / prints_per_sec : 0),
- remoteaddr(NULL),
- remoteaddr_len(0),
+ remoteaddr_len(raddr_len),
next_tx_id(1),
next_rx_id(0),
next_rxack_id(0),
@@ -94,18 +94,29 @@
last_rxtime(0),
min_cycle_rxdiff(0),
next_cycle(0),
- next_send(now + usec_per_pkt),
+ next_send(first_send),
num_lost(0),
next_txack_index(0),
- last_print(now - usec_per_pkt),
+ last_print(first_send - usec_per_pkt),
lat_tx(0), lat_tx_min(0x7fffffff), lat_tx_max(0),
lat_tx_count(0), lat_tx_sum(0), lat_tx_var_sum(0),
lat_rx(0), lat_rx_min(0x7fffffff), lat_rx_max(0),
lat_rx_count(0), lat_rx_sum(0), lat_rx_var_sum(0) {
+ memcpy(&remoteaddr, &raddr, raddr_len);
memset(&tx, 0, sizeof(tx));
strcpy(last_ackinfo, "");
}
+SessionMap::iterator Sessions::NewSession(uint32_t first_send,
+ uint32_t usec_per_pkt,
+ struct sockaddr_storage *addr,
+ socklen_t addr_len) {
+ std::pair<SessionMap::iterator, bool> p = session_map.insert(std::make_pair(
+ *addr, Session(first_send, usec_per_pkt, *addr, addr_len)));
+ next_sends.push(p.first);
+ return p.first;
+}
+
// Returns the kernel monotonic timestamp in microseconds, truncated to
// 32 bits. That will wrap around every ~4000 seconds, which is okay
// for our purposes. We use 32 bits to save space in our packets.
@@ -189,6 +200,34 @@
return addrbuf;
}
+bool CompareSockaddr::operator()(const struct sockaddr_storage &lhs,
+ const struct sockaddr_storage &rhs) {
+ if (lhs.ss_family != rhs.ss_family) {
+ return lhs.ss_family < rhs.ss_family;
+ }
+ if (lhs.ss_family == AF_INET) {
+ const struct sockaddr_in &lhs4 = *(const struct sockaddr_in*)&lhs;
+ const struct sockaddr_in &rhs4 = *(const struct sockaddr_in*)&rhs;
+ long long c = (ntohl(lhs4.sin_addr.s_addr) - ntohl(rhs4.sin_addr.s_addr));
+ if (c == 0) {
+ return ntohs(lhs4.sin_port) < ntohs(rhs4.sin_port);
+ }
+ return c < 0;
+ } else {
+ const struct sockaddr_in6 &lhs6 = *(const struct sockaddr_in6*)&lhs;
+ const struct sockaddr_in6 &rhs6 = *(const struct sockaddr_in6*)&rhs;
+ int c = memcmp(&lhs6.sin6_addr, &rhs6.sin6_addr, sizeof(struct in6_addr));
+ if (c == 0) {
+ return ntohs(lhs6.sin6_port) < ntohs(rhs6.sin6_port);
+ }
+ return c < 0;
+ }
+}
+
+bool CompareNextSend::operator()(const SessionMap::iterator &lhs,
+ const SessionMap::iterator &rhs) {
+ return lhs->second.next_send > rhs->second.next_send;
+}
// Print the timestamp corresponding to the current time.
// Deliberately the same format as tcpdump uses, so we can easily sort and
@@ -235,7 +274,7 @@
// receive the timing information for the packets it sent us.
if (is_server) {
if (sendto(sock, &s->tx, sizeof(s->tx), 0,
- s->remoteaddr, s->remoteaddr_len) < 0) {
+ (struct sockaddr *)&s->remoteaddr, s->remoteaddr_len) < 0) {
perror("sendto");
}
} else {
@@ -250,59 +289,84 @@
}
-int maybe_send_packet(struct Session *s, int sock, uint32_t now) {
- if (s->remoteaddr && DIFF(now, s->next_send) >= 0) {
- prepare_tx_packet(s);
- int err = send_packet(s, sock);
+int send_waiting_packets(Sessions *sessions, int sock, uint32_t now) {
+ if (sessions == NULL) {
+ return -1;
+ }
+ // TODO(pmccurdy): This will incorrectly send packets too early during the
+ // time period where now + usec_per_pkt wraps around, i.e. about once per 71
+ // minutes, and will end up blasting packets as fast as possible for that
+ // duration. Consider calculating next_send and now as 64-bit values, and
+ // truncating to 32 bits when transmitting.
+ while (sessions->next_sends.size() > 0 &&
+ DIFF(now, sessions->next_send_time()) >= 0) {
+ SessionMap::iterator it = sessions->next_sends.top();
+ sessions->next_sends.pop();
+ Session &s = it->second;
+ prepare_tx_packet(&s);
+ int err = send_packet(&s, sock);
if (err != 0) {
return err;
}
+ // TODO(pmccurdy): Detect connection refused on a per-client basis. Use
+ // recvmsg with the MSG_ERRQUEUE flag to get error and client address,
+ // instead of waiting for timeout.
+ // TODO(pmccurdy): Support very low packet-per-second values, e.g. one
+ // packet per hour, without constantly disconnecting the client.
+ if (is_server && DIFF(now, s.last_rxtime) > 60 * 1000 * 1000) {
+ fprintf(stderr, "client %s disconnected.\n",
+ sockaddr_to_str((struct sockaddr *)&s.remoteaddr));
+ sessions->session_map.erase(s.remoteaddr);
+ } else {
+ sessions->next_sends.push(it);
+ }
}
return 0;
}
-
-int read_incoming_packet(struct Session *s, int sock, uint32_t now) {
- struct sockaddr_in6 rxaddr;
+int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server) {
+ struct sockaddr_storage rxaddr;
socklen_t rxaddr_len = 0;
- // TODO(pmccurdy): Temporary until we properly support multiple clients.
- static struct sockaddr_in6 last_rxaddr;
+ Packet rx;
rxaddr_len = sizeof(rxaddr);
- ssize_t got = recvfrom(sock, &s->rx, sizeof(s->rx), 0,
+ ssize_t got = recvfrom(sock, &rx, sizeof(rx), 0,
(struct sockaddr *)&rxaddr, &rxaddr_len);
if (got < 0) {
int e = errno;
perror("recvfrom");
return e;
}
- if (got != sizeof(s->rx) || s->rx.magic != htonl(MAGIC)) {
+ if (got != sizeof(rx) || rx.magic != htonl(MAGIC)) {
fprintf(stderr, "got invalid packet of length %ld\n", (long)got);
return EINVAL;
}
- // is it a new client?
+ SessionMap::iterator it;
if (is_server) {
- // TODO(pmccurdy): Maintain a hash table of Sessions, look up based
- // on rxaddr, create a new one if necessary, remove this resetting code.
- if (!s->remoteaddr ||
- memcmp(&rxaddr, &last_rxaddr, sizeof(rxaddr)) != 0) {
- fprintf(stderr, "new client connected: %s\n",
+ it = s->session_map.find(rxaddr);
+ if (it == s->session_map.end()) {
+ fprintf(stderr, "New client connection: %s\n",
sockaddr_to_str((struct sockaddr *)&rxaddr));
- memcpy(&last_rxaddr, &rxaddr, sizeof(rxaddr));
- s->remoteaddr = (struct sockaddr *)&last_rxaddr;
- s->remoteaddr_len = rxaddr_len;
-
- s->next_send = now + 10*1000;
- s->next_tx_id = 1;
- s->next_rx_id = s->next_rxack_id = 0;
- s->start_rtxtime = s->start_rxtime = 0;
- s->num_lost = 0;
- s->next_txack_index = 0;
- s->usec_per_pkt = ntohl(s->rx.usec_per_pkt);
- memset(&s->tx, 0, sizeof(s->tx));
+ // TODO(pmccurdy): This lets clients unconditionally set the usec_per_pkt
+ // values used. Add some mechanism to let the server override or reject
+ // this value (e.g. limit to a certain range, or reduce per-client pps as
+ // more clients connect).
+ it = s->NewSession(now + 10 * 1000, ntohl(rx.usec_per_pkt), &rxaddr,
+ rxaddr_len);
+ }
+ } else {
+ it = s->session_map.begin();
+ if (it == s->session_map.end()) {
+ fprintf(stderr, "No session configured for %s when receiving packet\n",
+ sockaddr_to_str((struct sockaddr *)&rxaddr));
+ return EINVAL;
}
}
+ Session &session = it->second;
+ memcpy(&session.rx, &rx, sizeof(session.rx));
+ handle_packet(&session, now);
+
return 0;
}
@@ -473,7 +537,7 @@
uint32_t rxtime = rrxtime + offset;
// note: already contains 1/2 rtt, unlike rxdiff
int32_t txdiff = DIFF(rxtime, txtime);
- if (s->usec_per_print <= 0 && s->last_ackinfo[0]) {
+ if (!quiet && s->usec_per_print <= 0 && s->last_ackinfo[0]) {
// only print multiple acks per rx if no usec_per_print limit
if (want_timestamps) print_timestamp(rxtime);
printf("%12s\n", s->last_ackinfo);
@@ -550,8 +614,7 @@
}
uint32_t now = ustime(); // current time
-
- struct Session s(now);
+ Sessions sessions;
if (argc - optind == 0) {
is_server = 1;
@@ -588,8 +651,8 @@
perror("connect");
return 1;
}
- s.remoteaddr = ai->ai_addr;
- s.remoteaddr_len = ai->ai_addrlen;
+ sessions.NewSession(now, 1e6 / packets_per_sec,
+ (struct sockaddr_storage *)ai->ai_addr, ai->ai_addrlen);
} else {
usage_and_die(argv[0]);
}
@@ -626,55 +689,53 @@
tv.tv_sec = 0;
now = ustime();
- if (DIFF(s.next_send, now) < 0) {
+ if (sessions.next_sends.size() == 0 ||
+ DIFF(sessions.next_send_time(), now) < 0) {
tv.tv_usec = 0;
} else {
- tv.tv_usec = DIFF(s.next_send, now);
+ tv.tv_usec = DIFF(sessions.next_send_time(), now);
}
- int nfds = select(sock + 1, &rfds, NULL, NULL, s.remoteaddr ? &tv : NULL);
+ int nfds = select(sock + 1, &rfds, NULL, NULL,
+ sessions.next_sends.size() > 0 ? &tv : NULL);
now = ustime();
if (nfds < 0 && errno != EINTR) {
perror("select");
return 1;
}
- // time to send the next packet?
- int err = maybe_send_packet(&s, sock, now);
+ int err = send_waiting_packets(&sessions, sock, now);
if (err != 0) {
return err;
}
- // TODO(pmccurdy): Track disconnections across multiple clients. Use
- // recvmsg with the MSG_ERRQUEUE flag to detect connection refused.
- if (is_server && DIFF(now, s.last_rxtime) > 60 * 1000 * 1000) {
- fprintf(stderr, "client disconnected.\n");
- s.remoteaddr = NULL;
- }
if (nfds > 0) {
- err = read_incoming_packet(&s, sock, now);
+ err = read_incoming_packet(&sessions, sock, now, is_server);
if (!is_server && err == ECONNREFUSED) return 2;
if (err != 0) {
continue;
}
- handle_packet(&s, now);
}
}
- // TODO(pmccurdy): Separate out per-client and global stats.
- printf("\n---\n");
- printf("tx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
- s.lat_tx_min / 1000.0,
- DIV(s.lat_tx_sum, s.lat_tx_count) / 1000.0,
- s.lat_tx_max / 1000.0,
- onepass_stddev(
- s.lat_tx_var_sum, s.lat_tx_sum, s.lat_tx_count) / 1000.0);
- printf("rx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
- s.lat_rx_min / 1000.0,
- DIV(s.lat_rx_sum, s.lat_rx_count) / 1000.0,
- s.lat_rx_max / 1000.0,
- onepass_stddev(
- s.lat_rx_var_sum, s.lat_rx_sum, s.lat_rx_count) / 1000.0);
- printf("\n");
+ // TODO(pmccurdy): Separate out per-client and global stats, print stats for
+ // the server when each client disconnects.
+ if (!is_server) {
+ Session &s = sessions.session_map.begin()->second;
+ printf("\n---\n");
+ printf("tx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
+ s.lat_tx_min / 1000.0,
+ DIV(s.lat_tx_sum, s.lat_tx_count) / 1000.0,
+ s.lat_tx_max / 1000.0,
+ onepass_stddev(
+ s.lat_tx_var_sum, s.lat_tx_sum, s.lat_tx_count) / 1000.0);
+ printf("rx: min/avg/max/mdev = %.2f/%.2f/%.2f/%.2f ms\n",
+ s.lat_rx_min / 1000.0,
+ DIV(s.lat_rx_sum, s.lat_rx_count) / 1000.0,
+ s.lat_rx_max / 1000.0,
+ onepass_stddev(
+ s.lat_rx_var_sum, s.lat_rx_sum, s.lat_rx_count) / 1000.0);
+ printf("\n");
+ }
if (ai) freeaddrinfo(ai);
if (sock >= 0) close(sock);
diff --git a/cmds/isoping.h b/cmds/isoping.h
index 11feae2..7100a51 100644
--- a/cmds/isoping.h
+++ b/cmds/isoping.h
@@ -16,6 +16,9 @@
#ifndef ISOPING_H
#define ISOPING_H
+#include <map>
+#include <netinet/in.h>
+#include <queue>
#include <stdint.h>
#include <sys/socket.h>
@@ -40,12 +43,13 @@
// Data we track per session.
struct Session {
- Session(uint32_t now);
+ Session(uint32_t first_send, uint32_t usec_per_pkt,
+ const struct sockaddr_storage &remoteaddr, size_t remoteaddr_len);
int32_t usec_per_pkt;
int32_t usec_per_print;
// The peer's address.
- struct sockaddr *remoteaddr;
+ struct sockaddr_storage remoteaddr;
socklen_t remoteaddr_len;
// WARNING: lots of math below relies on well-defined uint32/int32
@@ -75,18 +79,60 @@
lat_rx_count, lat_rx_sum, lat_rx_var_sum;
};
+// Comparator for use with sockaddr_in and sockaddr_in6 values. Sorts by
+// address family first, then on the IPv4/6 address, then the port number.
+struct CompareSockaddr {
+ bool operator()(const struct sockaddr_storage &lhs,
+ const struct sockaddr_storage &rhs);
+};
+
+typedef std::map<struct sockaddr_storage, Session, CompareSockaddr>
+ SessionMap;
+
+// Compares the next_send values of each referenced Session, sorting the earlier
+// timestamps first.
+struct CompareNextSend {
+ bool operator()(const SessionMap::iterator &lhs,
+ const SessionMap::iterator &rhs);
+};
+
+struct Sessions {
+ public:
+ Sessions() {}
+
+ // All active sessions, indexed by remote address/port.
+ SessionMap session_map;
+ // A queue of upcoming send times, ordered most recent first, referencing
+ // entries in the session map.
+ std::priority_queue<SessionMap::iterator, std::vector<SessionMap::iterator>,
+ CompareNextSend> next_sends;
+
+ SessionMap::iterator NewSession(uint32_t first_send,
+ uint32_t usec_per_pkt,
+ struct sockaddr_storage *addr,
+ socklen_t addr_len);
+
+ uint32_t next_send_time() {
+ if (next_sends.size() == 0) {
+ return 0;
+ }
+ return next_sends.top()->second.next_send;
+ }
+};
+
// Process the Session's incoming packet, from s->rx.
void handle_packet(struct Session *s, uint32_t now);
// Sets all the elements of s->tx to be ready to be sent to the other side.
void prepare_tx_packet(struct Session *s);
-// Sends a packet to the socket if the appropriate amount of time has passed.
-int maybe_send_packet(struct Session *s, int sock, uint32_t now);
+// Sends a packet to all waiting sessions where the appropriate amount of time
+// has passed.
+int send_waiting_packets(Sessions *s, int sock, uint32_t now);
// Reads a packet from sock and stores it in s->rx. Assumes a packet is
// currently readable.
-int read_incoming_packet(struct Session *s, int sock, uint32_t now);
+int read_incoming_packet(Sessions *s, int sock, uint32_t now, int is_server);
// Parses arguments and runs the main loop. Distinct from main() for unit test
// purposes.
diff --git a/cmds/isoping_test.cc b/cmds/isoping_test.cc
index 572490b..a8da304 100644
--- a/cmds/isoping_test.cc
+++ b/cmds/isoping_test.cc
@@ -15,6 +15,7 @@
*/
#include <arpa/inet.h>
+#include <errno.h>
#include <limits.h>
#include <memory.h>
#include <stdio.h>
@@ -56,10 +57,12 @@
uint32_t cbase = 400 * 1000;
uint32_t sbase = 600 * 1000;
uint32_t real_clockdiff = sbase - cbase;
+ uint32_t usec_per_pkt = 100 * 1000;
// The states of the client and server.
- struct Session c(cbase);
- struct Session s(sbase);
+ struct sockaddr_storage empty_sockaddr;
+ struct Session c(cbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
+ struct Session s(sbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
// One-way latencies: cs_latency is the latency from client to server;
// sc_latency is from server to client.
@@ -72,9 +75,6 @@
// Send the initial packet from client to server. This isn't enough to let us
// draw any useful latency conclusions.
- // TODO(pmccurdy): Setting next_send is duplicating some work done in the main
- // loop / send_packet. Extract that into somewhere testable, then test it.
- c.next_send = cbase;
t = send_next_packet(&c, cbase, &s, sbase, cs_latency);
uint32_t rxtime = sbase + t;
s.next_send = rxtime + 10 * 1000;
@@ -256,10 +256,12 @@
WVTEST_MAIN("isoping clock drift") {
uint32_t cbase = 1400 * 1000;
uint32_t sbase = 1600 * 1000;
+ uint32_t usec_per_pkt = 100 * 1000;
// The states of the client and server.
- struct Session c(cbase);
- struct Session s(sbase);
+ struct sockaddr_storage empty_sockaddr;
+ struct Session c(cbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
+ struct Session s(sbase, usec_per_pkt, empty_sockaddr, sizeof(empty_sockaddr));
// Send packets infrequently, to get new cycles more often.
s.usec_per_pkt = 1 * 1000 * 1000;
c.usec_per_pkt = 1 * 1000 * 1000;
@@ -424,10 +426,6 @@
uint32_t cbase = 1400 * 1000;
uint32_t sbase = 1600 * 1000;
- // The states of the client and server.
- struct Session c(cbase);
- struct Session s(sbase);
-
// Sockets for the client and server.
int ssock, csock;
struct addrinfo hints, *res;
@@ -461,7 +459,7 @@
}
// Figure out the local port we got.
- struct sockaddr_in6 listenaddr;
+ struct sockaddr_storage listenaddr;
socklen_t listenaddr_len = sizeof(listenaddr);
memset(&listenaddr, 0, listenaddr_len);
if (!WVPASS(!getsockname(ssock, (struct sockaddr *)&listenaddr,
@@ -470,7 +468,10 @@
return;
}
- printf("Bound server socket to port=%d\n", listenaddr.sin6_port);
+ printf("Bound server socket to port=%d\n",
+ listenaddr.ss_family == AF_INET
+ ? ntohs(((struct sockaddr_in *)&listenaddr)->sin_port)
+ : ntohs(((struct sockaddr_in6 *)&listenaddr)->sin6_port));
// Connect the client's socket.
if (!WVPASS(
@@ -478,14 +479,30 @@
perror("connect");
return;
}
+ struct sockaddr_in6 caddr;
+ socklen_t caddr_len = sizeof(caddr);
+ memset(&caddr, 0, caddr_len);
+ if (!WVPASS(!getsockname(csock, (struct sockaddr *)&caddr, &caddr_len))) {
+ perror("getsockname");
+ return;
+ }
+ char buf[128];
+ inet_ntop(AF_INET6, (struct sockaddr *)&caddr, buf, sizeof(buf));
+ printf("Created client connection on %s:%d\n", buf, ntohs(caddr.sin6_port));
- c.remoteaddr = (struct sockaddr *)&listenaddr;
- c.remoteaddr_len = listenaddr_len;
+ // All active sessions for the client and server.
+ Sessions c;
+ Sessions s;
+ uint32_t usec_per_pkt = 100 * 1000;
+
+ c.NewSession(cbase + 1, usec_per_pkt, &listenaddr, listenaddr_len);
uint32_t cs_latency = 4000;
uint32_t sc_latency = 5000;
- uint32_t t = c.usec_per_pkt - 1;
- WVPASS(!maybe_send_packet(&c, csock, cbase + t));
+
+ Session &cSession = c.session_map.begin()->second;
+ uint32_t t = cSession.next_send - cbase - 1;
+ WVPASS(!send_waiting_packets(&c, csock, cbase + t));
// Verify we didn't send a packet before its time.
fd_set rfds;
@@ -497,8 +514,8 @@
// Send a packet in each direction.
t += 1;
- WVPASS(!maybe_send_packet(&c, csock, cbase + t));
- WVPASSEQ(c.next_tx_id, 2);
+ WVPASS(!send_waiting_packets(&c, csock, cbase + t));
+ WVPASSEQ(cSession.next_tx_id, 2);
FD_ZERO(&rfds);
FD_SET(ssock, &rfds);
@@ -506,23 +523,26 @@
WVPASSEQ(nfds, 1);
t += cs_latency;
- WVPASS(!read_incoming_packet(&s, ssock, sbase + t));
+ int is_server = 1;
+ int is_client = 0;
+ WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+ WVPASSEQ(s.session_map.size(), 1);
+ WVPASSEQ(s.next_sends.size(), 1);
+ WVPASSEQ(s.next_send_time(), sbase + t + 10 * 1000);
- WVPASS(s.remoteaddr != NULL);
- WVPASS(s.remoteaddr_len > 0);
- WVPASSEQ(s.next_tx_id, 1);
+ WVPASSEQ(s.session_map.size(), 1);
+ Session &sSession = s.session_map.begin()->second;
+ WVPASS(sSession.remoteaddr_len > 0);
+ WVPASSEQ(sSession.next_tx_id, 1);
- handle_packet(&s, sbase + t);
-
- t = s.next_send - sbase;
- WVPASS(!maybe_send_packet(&s, ssock, sbase + t));
- WVPASSEQ(s.next_send, sbase + t + s.usec_per_pkt);
- WVPASSEQ(s.next_tx_id, 2);
+ t = s.next_send_time() - sbase;
+ WVPASS(!send_waiting_packets(&s, ssock, sbase + t));
+ WVPASSEQ(s.next_send_time(), sbase + t + sSession.usec_per_pkt);
+ WVPASSEQ(sSession.next_tx_id, 2);
t += sc_latency;
- WVPASS(!read_incoming_packet(&c, csock, cbase + t));
- handle_packet(&c, cbase + t);
- WVPASSEQ(c.lat_rx_count, 1);
+ WVPASS(!read_incoming_packet(&c, csock, cbase + t, is_client));
+ WVPASSEQ(cSession.lat_rx_count, 1);
// Verify we reject garbage data.
Packet p;
@@ -532,19 +552,18 @@
return;
}
- WVPASSEQ(read_incoming_packet(&s, ssock, sbase + t), EINVAL);
+ WVPASSEQ(read_incoming_packet(&s, ssock, sbase + t, is_server), EINVAL);
- // Make a new client, getting a new source port.
- struct Session c2(cbase);
- c2.usec_per_pkt *= 2;
- c2.remoteaddr = c.remoteaddr;
- c2.remoteaddr_len = c.remoteaddr_len;
+ // Make a new client, who sends more frequently, getting a new source port.
+ Sessions c2;
+ c2.NewSession(cbase, usec_per_pkt/4, &listenaddr, listenaddr_len);
int c2sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (!WVPASS(c2sock > 0)) {
perror("client socket 2");
return;
}
- if (!WVPASS(!connect(c2sock, c.remoteaddr, c.remoteaddr_len))) {
+ if (!WVPASS(
+ !connect(c2sock, (struct sockaddr *)&listenaddr, listenaddr_len))) {
perror("connect");
return;
}
@@ -555,20 +574,22 @@
perror("getsockname");
return;
}
+ inet_ntop(AF_INET6, (struct sockaddr *)&c2addr, buf, sizeof(buf));
+ printf("Created new client connection on %s:%d\n", buf,
+ ntohs(c2addr.sin6_port));
- t = c2.next_send - cbase;
- WVPASS(!maybe_send_packet(&c2, c2sock, cbase + t));
+ Session &c2Session = c2.session_map.begin()->second;
+ t = c2Session.next_send - cbase;
+ WVPASS(!send_waiting_packets(&c2, c2sock, cbase + t));
t += cs_latency;
- // Check that a new client resets some state.
- WVPASS(!read_incoming_packet(&s, ssock, sbase + t));
-
- WVPASSEQ(ntohs(((sockaddr_in6 *)s.remoteaddr)->sin6_port),
- ntohs(c2addr.sin6_port));
- WVPASSEQ(s.next_tx_id, 1);
- WVPASSEQ(s.next_rx_id, 0);
- WVPASSEQ(s.usec_per_pkt, c2.usec_per_pkt);
+ // Check that a new client is added to the server's state, and it will be sent
+ // next.
+ WVPASS(!read_incoming_packet(&s, ssock, sbase + t, is_server));
+ WVPASSEQ(s.session_map.size(), 2);
+ WVPASSEQ(s.next_sends.size(), 2);
+ WVPASSEQ(s.next_send_time(), sbase + t + 10 * 1000);
// Cleanup
close(ssock);