| /* |
| * |
| * (C) 2012 - Luca Deri <deri@ntop.org> |
| * Alfredo Cardigliano <cardigliano@ntop.org> |
| * |
| * |
| * This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation; either version 2 of the License, or |
| * (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program; if not, write to the Free Software Foundation, |
| * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
| * |
| */ |
| |
| #ifndef _GNU_SOURCE |
| #define _GNU_SOURCE |
| #endif |
| #include <signal.h> |
| #include <sched.h> |
| #include <sys/types.h> |
| #include <sys/stat.h> |
| #include <fcntl.h> |
| #include <string.h> |
| #include <unistd.h> |
| #include <sys/mman.h> |
| #include <errno.h> |
| #include <sys/poll.h> |
| #include <netinet/in_systm.h> |
| #include <netinet/in.h> |
| #include <netinet/ip.h> |
| #include <netinet/ip6.h> |
| #include <net/ethernet.h> |
| #include <sys/time.h> |
| #include <time.h> |
| #include <sys/socket.h> |
| #include <arpa/inet.h> |
| |
| #include "pfring.h" |
| #include "pfutils.c" |
| |
| #define ALARM_SLEEP 1 |
| #define MAX_NUM_THREADS DNA_CLUSTER_MAX_NUM_SLAVES |
| #define DEFAULT_DEVICE "dna0" |
| |
| u_int numCPU; |
| static struct timeval startTime; |
| u_int8_t wait_for_packet = 1, print_interface_stats = 0, do_shutdown = 0; |
| int rx_bind_core = 1; /* core 0 free if possible */ |
| int hashing_mode = 0; |
| int bridge_interfaces = 0, use_hugepages = 0, low_latency = 0; |
| int cluster_id = -1; |
| pfring_dna_cluster *dna_cluster_handle; |
| |
| #define DIRECTIONS 2 |
| #define DIR_1 0 |
| #define DIR_2 1 |
| pfring *rss_rings[DIRECTIONS][MAX_NUM_THREADS]; |
| |
| int num_dev = 0; |
| pfring *pd[DIRECTIONS]; |
| int if_indexes[DIRECTIONS]; |
| |
| int num_threads = 1; |
| struct thread_info { |
| pfring *ring; |
| int thread_core_affinity; |
| pthread_t pd_thread __attribute__((__aligned__(64))); |
| u_int64_t numPkts; |
| u_int64_t numBytes __attribute__((__aligned__(64))); |
| }; |
| struct thread_info thread_stats[MAX_NUM_THREADS]; |
| |
| /* ******************************** */ |
| |
| void print_stats() { |
| static u_int64_t lastPkts[MAX_NUM_THREADS] = { 0 }; |
| static u_int64_t lastRXPkts = 0, lastTXPkts = 0, lastRXProcPkts = 0; |
| static struct timeval lastTime; |
| pfring_stat pfringStat; |
| struct timeval endTime; |
| double delta, deltaABS; |
| u_int64_t diff; |
| u_int64_t RXdiff, TXdiff, RXProcdiff; |
| pfring_dna_cluster_stat cluster_stats; |
| char buf1[32], buf2[32], buf3[32]; |
| int i; |
| |
| if(startTime.tv_sec == 0) { |
| gettimeofday(&startTime, NULL); |
| return; |
| } |
| |
| gettimeofday(&endTime, NULL); |
| deltaABS = delta_time(&endTime, &startTime); |
| |
| delta = delta_time(&endTime, &lastTime); |
| |
| for(i=0; i < num_threads; i++) { |
| if(pfring_stats(thread_stats[i].ring, &pfringStat) >= 0) { |
| double thpt = ((double)8*thread_stats[i].numBytes)/(deltaABS*1000); |
| |
| fprintf(stderr, "=========================\n" |
| "Thread %d\n" |
| "Absolute Stats: [%u pkts rcvd][%lu bytes rcvd]\n" |
| " [%u total pkts][%u pkts dropped (%.1f %%)]\n" |
| " [%s pkt/sec][%.2f Mbit/sec]\n", i, |
| (unsigned int) thread_stats[i].numPkts, |
| (long unsigned int)thread_stats[i].numBytes, |
| (unsigned int) (thread_stats[i].numPkts+pfringStat.drop), |
| (unsigned int) pfringStat.drop, |
| thread_stats[i].numPkts == 0 ? 0 : (double)(pfringStat.drop*100)/(double)(thread_stats[i].numPkts+pfringStat.drop), |
| pfring_format_numbers(((double)(thread_stats[i].numPkts*1000)/deltaABS), buf1, sizeof(buf1), 1), |
| thpt); |
| |
| if(lastTime.tv_sec > 0) { |
| // double pps; |
| |
| diff = thread_stats[i].numPkts-lastPkts[i]; |
| // pps = ((double)diff/(double)(delta/1000)); |
| fprintf(stderr, "Actual Stats: [%llu pkts][%.1f ms][%s pkt/sec]\n", |
| (long long unsigned int) diff, |
| delta, |
| pfring_format_numbers(((double)diff/(double)(delta/1000)), buf1, sizeof(buf1), 1)); |
| } |
| |
| lastPkts[i] = thread_stats[i].numPkts; |
| } |
| } |
| |
| if(dna_cluster_stats(dna_cluster_handle, &cluster_stats) == 0) { |
| if(lastTime.tv_sec > 0) { |
| RXdiff = cluster_stats.tot_rx_packets - lastRXPkts; |
| RXProcdiff = cluster_stats.tot_rx_processed - lastRXProcPkts; |
| TXdiff = cluster_stats.tot_tx_packets - lastTXPkts; |
| |
| fprintf(stderr, "=========================\n" |
| "Aggregate Actual Stats: [Captured %s pkt/sec][Processed %s pkt/sec][Sent %s pkt/sec]\n", |
| pfring_format_numbers(((double)RXdiff/(double)(delta/1000)), buf1, sizeof(buf1), 1), |
| pfring_format_numbers(((double)RXProcdiff/(double)(delta/1000)), buf2, sizeof(buf2), 1), |
| pfring_format_numbers(((double)TXdiff/(double)(delta/1000)), buf3, sizeof(buf3), 1)); |
| } |
| |
| lastRXPkts = cluster_stats.tot_rx_packets; |
| lastRXProcPkts = cluster_stats.tot_rx_processed; |
| lastTXPkts = cluster_stats.tot_tx_packets; |
| } |
| |
| if (print_interface_stats) { |
| pfring_stat if_stats; |
| fprintf(stderr, "=========================\nInterface Absolute Stats\n"); |
| for (i = 0; i < num_dev; i++) |
| if (pfring_stats(pd[i], &if_stats) >= 0) |
| fprintf(stderr, "%s RX [%" PRIu64 " pkts rcvd][%" PRIu64 " pkts dropped (%.1f %%)]\n", |
| pd[i]->device_name, if_stats.recv, if_stats.drop, |
| if_stats.recv == 0 ? 0 : ((double)(if_stats.drop*100)/(double)(if_stats.recv + if_stats.drop))); |
| } |
| |
| |
| fprintf(stderr, "=========================\n\n"); |
| |
| lastTime.tv_sec = endTime.tv_sec, lastTime.tv_usec = endTime.tv_usec; |
| } |
| |
| /* ******************************** */ |
| |
| void sigproc(int sig) { |
| static int called = 0; |
| int i; |
| |
| fprintf(stderr, "Leaving...\n"); |
| |
| if(called) return; |
| else called = 1; |
| |
| dna_cluster_disable(dna_cluster_handle); |
| |
| print_stats(); |
| |
| for(i=0; i<num_threads; i++) |
| pfring_shutdown(thread_stats[i].ring); |
| |
| do_shutdown = 1; |
| } |
| |
| /* ******************************** */ |
| |
| void my_sigalarm(int sig) { |
| if (do_shutdown) |
| return; |
| |
| print_stats(); |
| alarm(ALARM_SLEEP); |
| signal(SIGALRM, my_sigalarm); |
| } |
| |
| /* *************************************** */ |
| |
/*
 * Print the command line usage on stdout and terminate the process with
 * exit status 0. This function never returns.
 */
void printHelp(void) {
  printf("pfdnacluster_mt_rss_frwd - (C) 2012 ntop.org\n\n");
  printf("Forward packets received from a DNA Cluster in zero-copy to a DNA device\n"
	 "using a per-thread RSS channel. The \"all rx packets to queue 0\" RSS mode\n"
	 "is applied to the devices involved in order to capture from a single queue.\n\n");
  printf("-h Print this help\n");
  printf("-i <device> Ingress device name\n");
  printf("-o <device> Egress device name\n");
  printf("-b Bridge mode (bidirectional)\n");
  printf("-c <id> DNA Cluster ID\n");
  printf("-n <num> Number of consumer threads (<= num rss queues)\n");
  printf("-r <core> Bind the RX thread to a core\n");
  /* "coreId" (was misspelled "codeId") and the missing word "thread" fixed */
  printf("-g <id:id...> Specifies the thread affinity mask (consumers). Each <id> represents\n"
	 " the coreId where the i-th thread will bind. Example: -g 7:6:5:4 binds thread\n"
	 " 0 on coreId 7, 1 on coreId 6 and so on.\n");
  printf("-m <hash mode> Hashing modes:\n"
	 " 0 - IP hash (default)\n"
	 " 1 - MAC Address hash\n"
	 " 2 - IP protocol hash\n");
  printf("-a Active packet wait\n");
  printf("-l Low latency (worse throughput)\n");
  printf("-u <mountpoint> Use hugepages for packet memory allocation\n");
  printf("-p Print per-interface absolute stats\n");
  exit(0);
}
| |
| /* *************************************** */ |
| |
| static inline u_int32_t master_custom_hash_function(const u_char *buffer, const u_int16_t buffer_len) { |
| u_int32_t l3_offset = sizeof(struct compact_eth_hdr); |
| u_int16_t eth_type; |
| |
| if(hashing_mode == 1 /* MAC hash */) |
| return(buffer[3] + buffer[4] + buffer[5] + buffer[9] + buffer[10] + buffer[11]); |
| |
| eth_type = (buffer[12] << 8) + buffer[13]; |
| |
| while (eth_type == 0x8100 /* VLAN */) { |
| l3_offset += 4; |
| eth_type = (buffer[l3_offset - 2] << 8) + buffer[l3_offset - 1]; |
| } |
| |
| switch (eth_type) { |
| case 0x0800: |
| { |
| /* IPv4 */ |
| struct compact_ip_hdr *iph; |
| |
| if (unlikely(buffer_len < l3_offset + sizeof(struct compact_ip_hdr))) |
| return 0; |
| |
| iph = (struct compact_ip_hdr *) &buffer[l3_offset]; |
| |
| if(hashing_mode == 0 /* IP hash */) |
| return ntohl(iph->saddr) + ntohl(iph->daddr); /* this can be optimized by avoiding calls to ntohl(), but it can lead to balancing issues */ |
| else /* IP protocol hash */ |
| return iph->protocol; |
| } |
| break; |
| case 0x86DD: |
| { |
| /* IPv6 */ |
| struct compact_ipv6_hdr *ipv6h; |
| u_int32_t *s, *d; |
| |
| if (unlikely(buffer_len < l3_offset + sizeof(struct compact_ipv6_hdr))) |
| return 0; |
| |
| ipv6h = (struct compact_ipv6_hdr *) &buffer[l3_offset]; |
| |
| if(hashing_mode == 0 /* IP hash */) { |
| s = (u_int32_t *) &ipv6h->saddr, d = (u_int32_t *) &ipv6h->daddr; |
| return(s[0] + s[1] + s[2] + s[3] + d[0] + d[1] + d[2] + d[3]); |
| } else |
| return(ipv6h->nexthdr); |
| } |
| break; |
| default: |
| return 0; /* Unknown protocol */ |
| } |
| } |
| |
| /* ******************************* */ |
| |
| static int master_distribution_function(const u_char *buffer, const u_int16_t buffer_len, const pfring_dna_cluster_slaves_info *slaves_info, u_int32_t *id_mask, u_int32_t *hash) { |
| u_int32_t slave_idx; |
| |
| /* computing a bidirectional software hash */ |
| *hash = master_custom_hash_function(buffer, buffer_len); |
| |
| /* balancing on hash */ |
| slave_idx = (*hash) % slaves_info->num_slaves; |
| *id_mask = (1 << slave_idx); |
| |
| return DNA_CLUSTER_PASS; |
| } |
| |
| /* *************************************** */ |
| |
| void* packet_consumer_thread(void *_id) { |
| int rc; |
| long thread_id = (long)_id; |
| pfring_pkt_buff *pkt_handle = NULL; |
| struct pfring_pkthdr hdr; |
| |
| if (numCPU > 1) { |
| #ifdef HAVE_PTHREAD_SETAFFINITY_NP |
| /* Bind this thread to a specific core */ |
| cpu_set_t cpuset; |
| u_long core_id; |
| int s; |
| |
| if (thread_stats[thread_id].thread_core_affinity != -1) |
| core_id = thread_stats[thread_id].thread_core_affinity % numCPU; |
| else |
| core_id = (rx_bind_core + 1 + thread_id) % numCPU; |
| |
| CPU_ZERO(&cpuset); |
| CPU_SET(core_id, &cpuset); |
| if ((s = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset)) != 0) |
| fprintf(stderr, "Error while binding thread %ld to core %ld: errno=%i\n", |
| thread_id, core_id, s); |
| else { |
| printf("Set thread %lu on core %lu/%u\n", thread_id, core_id, numCPU); |
| } |
| #endif |
| } |
| |
| memset(&hdr, 0, sizeof(hdr)); |
| |
| if ((pkt_handle = pfring_alloc_pkt_buff(thread_stats[thread_id].ring)) == NULL) { |
| printf("Error allocating pkt buff\n"); |
| return NULL; |
| } |
| |
| while (!do_shutdown) { |
| /* tx is not enabled in the cluster, but we will send |
| * received packets to a tx ring using zero-copy anyway */ |
| rc = pfring_recv_pkt_buff(thread_stats[thread_id].ring, pkt_handle, &hdr, wait_for_packet); |
| |
| if (rc > 0) { |
| if (bridge_interfaces && hdr.extended_hdr.if_index == if_indexes[DIR_2]) |
| pfring_send_pkt_buff(rss_rings[DIR_2][thread_id], pkt_handle, low_latency /* flush */); |
| else |
| pfring_send_pkt_buff(rss_rings[DIR_1][thread_id], pkt_handle, low_latency /* flush */); |
| |
| thread_stats[thread_id].numPkts++; |
| thread_stats[thread_id].numBytes += hdr.len + 24 /* 8 Preamble + 4 CRC + 12 IFG */; |
| } else { |
| if (!wait_for_packet) |
| usleep(1); //sched_yield(); |
| } |
| } |
| |
| return(NULL); |
| } |
| |
| /* *************************************** */ |
| |
| int open_rss_rings(char *rss_device, pfring **rss_rings) { |
| long i; |
| int rc, num_channels; |
| |
| num_channels = pfring_open_multichannel(rss_device, 1500 /* snaplen */, PF_RING_PROMISC | PF_RING_DNA_FIXED_RSS_Q_0, rss_rings); |
| |
| if(num_channels < num_threads) { |
| fprintf(stderr, "Error: interface %s has %u channels available, you need at least %u as the number of slave threads\n", |
| rss_device, num_channels, num_threads); |
| return -1; |
| } |
| |
| for(i=0; i<num_channels; i++) { |
| char buf[32]; |
| snprintf(buf, sizeof(buf), "pfdnacluster-%s-channel-%ld", rss_device, i); |
| pfring_set_application_name(rss_rings[i], buf); |
| |
| if((rc = pfring_set_socket_mode(rss_rings[i], send_only_mode)) != 0) |
| fprintf(stderr, "pfring_set_socket_mode returned [rc=%d]\n", rc); |
| } |
| |
| return 0; |
| } |
| |
| /* *************************************** */ |
| |
| int open_rx_ring(char *rx_device) { |
| u_int32_t version; |
| char buf[32]; |
| |
| pd[num_dev] = pfring_open(rx_device, 1500 /* snaplen */, PF_RING_PROMISC | PF_RING_DNA_FIXED_RSS_Q_0 /* if RSS is enabled, steer all to queue 0 */); |
| if(pd[num_dev] == NULL) { |
| printf("pfring_open %s error [%s]\n", rx_device, strerror(errno)); |
| return(-1); |
| } |
| |
| if (num_dev == 0) { |
| pfring_version(pd[num_dev], &version); |
| printf("Using PF_RING v.%d.%d.%d\n", (version & 0xFFFF0000) >> 16, |
| (version & 0x0000FF00) >> 8, version & 0x000000FF); |
| } |
| |
| snprintf(buf, sizeof(buf), "pfdnacluster-%u-%s", cluster_id, rx_device); |
| pfring_set_application_name(pd[num_dev], buf); |
| |
| if (bridge_interfaces && pfring_get_bound_device_ifindex(pd[num_dev], &if_indexes[num_dev]) < 0) { |
| fprintf(stderr, "Error reading interface id\n"); |
| dna_cluster_destroy(dna_cluster_handle); |
| return -1; |
| } |
| |
| if (bridge_interfaces && num_dev > 0) |
| printf("Bridging interfaces %d <-> %d\n", if_indexes[0], if_indexes[1]); |
| |
| /* Add the ring we created to the cluster */ |
| if (dna_cluster_register_ring(dna_cluster_handle, pd[num_dev]) < 0) { |
| fprintf(stderr, "Error registering rx socket\n"); |
| dna_cluster_destroy(dna_cluster_handle); |
| return -1; |
| } |
| |
| num_dev++; |
| return 0; |
| } |
| |
| /* *************************************** */ |
| |
| int main(int argc, char* argv[]) { |
| char c; |
| char buf[32]; |
| char *bind_mask = NULL; |
| char *in_device = NULL, *out_device = NULL, *hugepages_mountpoint = NULL; |
| socket_mode mode = recv_only_mode; |
| int rc; |
| long i; |
| |
| memset(rss_rings, 0, sizeof(rss_rings)); |
| |
| memset(thread_stats, 0, sizeof(thread_stats)); |
| for (i = 0; i < MAX_NUM_THREADS; i++) |
| thread_stats[i].thread_core_affinity = -1; |
| |
| numCPU = sysconf( _SC_NPROCESSORS_ONLN ); |
| startTime.tv_sec = 0; |
| |
| while ((c = getopt(argc,argv,"ahi:bc:n:m:r:g:pu:lo:")) != -1) { |
| switch (c) { |
| case 'a': |
| wait_for_packet = 0; |
| break; |
| case 'r': |
| rx_bind_core = atoi(optarg); |
| break; |
| case 'g': |
| bind_mask = strdup(optarg); |
| break; |
| case 'h': |
| printHelp(); |
| break; |
| case 'i': |
| in_device = strdup(optarg); |
| break; |
| case 'o': |
| out_device = strdup(optarg); |
| break; |
| case 'l': |
| low_latency = 1; |
| break; |
| case 'c': |
| cluster_id = atoi(optarg); |
| break; |
| case 'n': |
| num_threads = atoi(optarg); |
| break; |
| case 'm': |
| hashing_mode = atoi(optarg); |
| break; |
| case 'b': |
| bridge_interfaces = 1; |
| break; |
| case 'p': |
| print_interface_stats = 1; |
| break; |
| case 'u': |
| use_hugepages = 1; |
| hugepages_mountpoint = strdup(optarg); |
| break; |
| } |
| } |
| |
| if (out_device == NULL || cluster_id < 0 || num_threads < 1 || hashing_mode < 0 || hashing_mode > 2) |
| printHelp(); |
| |
| if (num_threads > MAX_NUM_THREADS) { |
| printf("WARNING: You cannot instantiate more than %u slave threads\n", MAX_NUM_THREADS); |
| num_threads = MAX_NUM_THREADS; |
| } |
| |
| if (in_device == NULL) in_device = strdup(DEFAULT_DEVICE); |
| |
| if(bind_mask != NULL) { |
| char *id = strtok(bind_mask, ":"); |
| int idx = 0; |
| |
| while(id != NULL) { |
| thread_stats[idx++].thread_core_affinity = atoi(id) % numCPU; |
| if(idx >= num_threads) break; |
| id = strtok(NULL, ":"); |
| } |
| } |
| |
| if (open_rss_rings(out_device, rss_rings[DIR_1]) < 0) |
| return -1; |
| |
| if (bridge_interfaces) { |
| if (open_rss_rings(in_device, rss_rings[DIR_2]) < 0) |
| return -1; |
| } |
| |
| printf("Capturing from %s", in_device); |
| if (bridge_interfaces) |
| printf(" and %s", out_device); |
| printf("\n"); |
| |
| /* Create the DNA cluster */ |
| if ((dna_cluster_handle = dna_cluster_create(cluster_id, |
| num_threads, |
| 0 |
| /* | DNA_CLUSTER_DCA */ |
| | (use_hugepages ? DNA_CLUSTER_HUGEPAGES : 0) |
| )) == NULL) { |
| fprintf(stderr, "Error creating DNA Cluster\n"); |
| return(-1); |
| } |
| |
| /* Changing the default settings (experts only) */ |
| dna_cluster_low_level_settings(dna_cluster_handle, |
| 8192, // slave rx queue slots |
| 0, // slave tx queue slots |
| // slave additional buffers (available with alloc/release) |
| 1 + rss_rings[DIR_1][0]->dna.dna_dev.mem_info.tx.packet_memory_num_slots + |
| (bridge_interfaces ? rss_rings[DIR_2][0]->dna.dna_dev.mem_info.tx.packet_memory_num_slots : 0) |
| ); |
| |
| if (use_hugepages) { |
| if (dna_cluster_set_hugepages_mountpoint(dna_cluster_handle, hugepages_mountpoint) < 0) { |
| fprintf(stderr, "Error setting the hugepages mountpoint: did you mount it?\n"); |
| return(-1); |
| } |
| } |
| |
| if (dna_cluster_set_mode(dna_cluster_handle, mode) < 0) { |
| printf("dna_cluster_set_mode error\n"); |
| return(-1); |
| } |
| |
| if (open_rx_ring(in_device) < 0) |
| return -1; |
| |
| if (bridge_interfaces) { |
| if (open_rx_ring(out_device) < 0) |
| return -1; |
| } |
| |
| /* Setting up important details... */ |
| dna_cluster_set_wait_mode(dna_cluster_handle, !wait_for_packet /* active_wait */); |
| dna_cluster_set_cpu_affinity(dna_cluster_handle, rx_bind_core, -1); |
| |
| /* The default distribution function allows to balance per IP |
| in a coherent mode (not like RSS that does not do that) */ |
| if (hashing_mode > 0) { |
| dna_cluster_set_distribution_function(dna_cluster_handle, master_distribution_function); |
| } |
| |
| switch(hashing_mode) { |
| case 0: |
| printf("Hashing packets per-IP Address\n"); |
| break; |
| case 1: |
| printf("Hashing packets per-MAC Address\n"); |
| break; |
| case 2: |
| printf("Hashing packets per-IP protocol (TCP, UDP, ICMP...)\n"); |
| break; |
| } |
| |
| /* Now enable the cluster */ |
| if (dna_cluster_enable(dna_cluster_handle) < 0) { |
| fprintf(stderr, "Error enabling the engine; dna NICs already in use?\n"); |
| dna_cluster_destroy(dna_cluster_handle); |
| return -1; |
| } |
| |
| printf("The DNA cluster [id: %u][num consumer threads: %u] is running...\n", |
| cluster_id, num_threads); |
| |
| for (i = 0; i < num_threads; i++) { |
| snprintf(buf, sizeof(buf), "dnacluster:%d@%ld", cluster_id, i); |
| thread_stats[i].ring = pfring_open(buf, 1500 /* snaplen */, PF_RING_PROMISC); |
| if (thread_stats[i].ring == NULL) { |
| printf("pfring_open %s error [%s]\n", in_device, strerror(errno)); |
| return(-1); |
| } |
| |
| snprintf(buf, sizeof(buf), "pfdnacluster_multithread-cluster-%d-thread-%ld", cluster_id, i); |
| pfring_set_application_name(thread_stats[i].ring, buf); |
| |
| if ((rc = pfring_set_socket_mode(thread_stats[i].ring, mode)) != 0) |
| fprintf(stderr, "pfring_set_socket_mode returned [rc=%d]\n", rc); |
| |
| /* this call will do the magic making rss_rings[dir][i] a zero-copy ring */ |
| if ((rc = pfring_register_zerocopy_tx_ring(thread_stats[i].ring, rss_rings[DIR_1][i])) != 0) { |
| printf("pfring_register_zerocopy_tx_ring error: %d\n", rc); |
| return(-1); |
| } |
| |
| if (bridge_interfaces) { |
| if ((rc = pfring_register_zerocopy_tx_ring(thread_stats[i].ring, rss_rings[DIR_2][i])) != 0) { |
| printf("pfring_register_zerocopy_tx_ring error: %d\n", rc); |
| return(-1); |
| } |
| } |
| |
| pfring_enable_ring(thread_stats[i].ring); |
| |
| pthread_create(&thread_stats[i].pd_thread, NULL, packet_consumer_thread, (void *) i); |
| |
| printf("Consumer thread #%ld is running...\n", i); |
| } |
| |
| signal(SIGINT, sigproc); |
| signal(SIGTERM, sigproc); |
| signal(SIGINT, sigproc); |
| |
| signal(SIGALRM, my_sigalarm); |
| alarm(ALARM_SLEEP); |
| |
| for(i = 0; i < num_threads; i++) { |
| pthread_join(thread_stats[i].pd_thread, NULL); |
| pfring_close(thread_stats[i].ring); |
| } |
| |
| dna_cluster_destroy(dna_cluster_handle); |
| |
| return(0); |
| } |
| |