blob: 361561ae1b7bdc06da9f6aaf862c7295508b0373 [file] [log] [blame]
/*
*
* (C) 2011-13 - ntop.org
*
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lessed General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
*
*/
#include "pfring.h"
#include "pfring_mod.h"
#include "pfring_mod_usring.h"
#define gcc_mb() __asm__ __volatile__("": : :"memory");
#define ALIGN(a,b) (((a) + ((b)-1) ) & ~((b)-1))
#define USE_WATERMARK
//#define PROFILING
#ifdef PROFILING
typedef unsigned long long ticks;
ticks tick_delta_tot = 0;
u_int32_t n_call_tot = 0;
static __inline__ ticks getticks(void)
{
unsigned a, d;
/* asm("cpuid"); */
asm volatile("rdtsc" : "=a" (a), "=d" (d));
return (((ticks)a) | (((ticks)d) << 32));
}
#endif
/* ******************************* */
int pfring_mod_usring_open(pfring *ring) {
int rc;
int tot_ring_mem;
socklen_t s_len;
ring->close = pfring_mod_usring_close;
ring->send = pfring_mod_usring_enqueue;
ring->send_parsed = pfring_mod_usring_enqueue_parsed;
ring->enable_ring = pfring_mod_enable_ring;
#ifdef USE_WATERMARK
ring->dna.dna_rx_sync_watermark = ring->dna.dna_tx_sync_watermark = DEFAULT_MIN_PKT_QUEUED;
#endif
if(strncmp(ring->device_name, "usr", 3))
return -1; /* Device name must me usrX */
ring->fd = socket(PF_RING, SOCK_RAW, htons(ETH_P_ALL));
if(ring->fd < 0)
return -1;
rc = setsockopt(ring->fd, 0, SO_ATTACH_USERSPACE_RING, ring->device_name, sizeof(ring->device_name) + 1);
if (rc < 0) {
close(ring->fd);
return -1;
}
ring->buffer = (char *) mmap(NULL, PAGE_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, ring->fd, 0);
if(ring->buffer == MAP_FAILED) {
close(ring->fd);
return -1;
}
ring->slots_info = (FlowSlotInfo *) ring->buffer;
if(ring->slots_info->version != RING_FLOWSLOT_VERSION) {
printf("Wrong RING version: "
"kernel is %i, libpfring was compiled with %i\n",
ring->slots_info->version, RING_FLOWSLOT_VERSION);
close(ring->fd);
return -1;
}
tot_ring_mem = ring->slots_info->tot_mem;
munmap(ring->buffer, PAGE_SIZE);
ring->buffer = (char *) mmap(NULL, tot_ring_mem, PROT_READ|PROT_WRITE, MAP_SHARED, ring->fd, 0);
if(ring->buffer == MAP_FAILED) {
close(ring->fd);
return -1;
}
ring->slots_info = (FlowSlotInfo *) ring->buffer;
ring->slots = (char *) (ring->buffer + sizeof(FlowSlotInfo));
s_len = sizeof(ring->slot_header_len);
rc = getsockopt(ring->fd, 0, SO_GET_PKT_HEADER_LEN, &ring->slot_header_len, &s_len);
if (rc < 0) {
pfring_close(ring);
return -1;
}
s_len = sizeof(ring->caplen);
rc = getsockopt(ring->fd, 0, SO_GET_BUCKET_LEN, &ring->caplen, &s_len);
if (rc < 0) {
pfring_close(ring);
return -1;
}
return 0;
}
/* ******************************* */
void pfring_mod_usring_close(pfring *ring) {
if(ring->buffer != NULL)
munmap(ring->buffer, ring->slots_info->tot_mem);
close(ring->fd);
#ifdef PROFILING
printf("[PROFILING] num_sendto_calls: %lu avg_sendto_ticks:%llu\n",
n_call_tot, tick_delta_tot, tick_delta_tot/n_call_tot);
#endif
}
/* ******************************* */
static inline char* get_slot(pfring *ring, u_int32_t off) {
return &(ring->slots[off]);
}
/* ******************************* */
static inline int get_next_slot_offset(pfring *ring, u_int32_t off)
{
struct pfring_pkthdr *hdr;
u_int32_t real_slot_size;
hdr = (struct pfring_pkthdr *) get_slot(ring, off);
real_slot_size = ring->slot_header_len + hdr->caplen;
/* padding at the end of the packet (magic number added on insert) */
real_slot_size += sizeof(u_int16_t); /* RING_MAGIC_VALUE */
/* Align slot size to 64 bit */
real_slot_size = ALIGN(real_slot_size, sizeof(u_int64_t));
//TODO extended_hdr.parsed_header
//if(ring->slot_header_len == sizeof(struct pfring_pkthdr)) /* !quick_mode */
// real_slot_size += hdr->extended_hdr.parsed_header_len;
if((off + real_slot_size + ring->slots_info->slot_len) > (ring->slots_info->tot_mem - sizeof(FlowSlotInfo)))
return 0;
return (off + real_slot_size);
}
/* ******************************* */
static inline u_int32_t num_queued_pkts(pfring *ring)
{
u_int32_t tot_insert = ring->slots_info->tot_insert;
u_int32_t tot_read = ring->slots_info->tot_read;
if(tot_insert >= tot_read)
return (tot_insert - tot_read);
else
return(((u_int32_t) - 1) + tot_insert - tot_read);
}
/* ******************************* */
static inline int check_free_ring_slot(pfring *ring, int off)
{
if(ring->slots_info->insert_off == ring->slots_info->remove_off) {
if(num_queued_pkts(ring) >= ring->slots_info->min_num_slots)
return 0;
} else {
if(ring->slots_info->insert_off < ring->slots_info->remove_off) {
if((ring->slots_info->remove_off - ring->slots_info->insert_off) < (2 * ring->slots_info->slot_len))
return 0;
} else {
if ((ring->slots_info->tot_mem - sizeof(FlowSlotInfo) - ring->slots_info->insert_off) < (2 * ring->slots_info->slot_len) &&
ring->slots_info->remove_off == 0)
return 0;
}
}
return 1;
}
/* ******************************* */
static inline int copy_data_to_ring(pfring *ring, struct pfring_pkthdr *pkt_hdr, void *pkt, uint pkt_len) {
struct pfring_pkthdr *hdr;
char *ring_bucket;
u_int32_t off;
off = ring->slots_info->insert_off;
ring->slots_info->tot_pkts++;
if(!check_free_ring_slot(ring, off)) {
ring->slots_info->tot_lost++;
return -1;
}
ring_bucket = get_slot(ring, off);
hdr = (struct pfring_pkthdr *) ring_bucket;
if (pkt_hdr != NULL) {
memcpy(hdr, pkt_hdr, ring->slot_header_len);
//TODO extended_hdr.parsed_header
} else {
memset(hdr, 0, ring->slot_header_len);
//TODO
//gettimeofday(&hdr->ts, NULL);
//if (ring->slot_header_len == sizeof(struct pfring_pkthdr)) /* !quick_mode */
// hdr->extended_hdr.if_index = FAKE_PACKET;
}
hdr->len = pkt_len;
hdr->caplen = min_val(pkt_len, ring->caplen);
memcpy(&ring_bucket[ring->slot_header_len], pkt, hdr->caplen);
ring->slots_info->insert_off = get_next_slot_offset(ring, off);
/*
NOTE: smp_* barriers are _compiler_ barriers on UP, mandatory barriers on SMP
a consumer _must_ see the new value of tot_insert only after the buffer update completes
*/
//smp_mb();
gcc_mb();
ring->slots_info->tot_insert++;
return 1;
}
/* ******************************* */
static inline void pfring_mod_usring_signal(pfring *ring, u_int8_t flush_packet) {
if (!(ring->slots_info->userspace_ring_flags & USERSPACE_RING_NO_INTERRUPT)) {
#ifdef USE_WATERMARK
if(!flush_packet && ring->dna.num_tx_pkts_before_dna_sync < ring->dna.dna_tx_sync_watermark)
ring->dna.num_tx_pkts_before_dna_sync++;
else {
ring->dna.num_tx_pkts_before_dna_sync = 0;
#endif
#ifdef PROFILING
ticks tick_start = getticks();
#endif
sendto(ring->fd, NULL, 0, 0, NULL, 0);
#ifdef PROFILING
tick_delta_tot += (getticks() - tick_start);
n_call_tot++;
#endif
#ifdef USE_WATERMARK
}
#endif
}
}
/* ******************************* */
int pfring_mod_usring_enqueue(pfring *ring, char *pkt, u_int pkt_len, u_int8_t flush_packet) {
int rc;
rc = copy_data_to_ring(ring, NULL, pkt, pkt_len);
if (rc == 1)
pfring_mod_usring_signal(ring, flush_packet);
return rc;
}
/* ******************************* */
int pfring_mod_usring_enqueue_parsed(pfring *ring, char *pkt, struct pfring_pkthdr *hdr, u_int8_t flush_packet) {
int rc;
rc = copy_data_to_ring(ring, hdr, pkt, hdr->len);
if (rc == 1)
pfring_mod_usring_signal(ring, flush_packet);
return rc;
}