/*
** Copyright (C) 2010-12 ntop.org
**
** Authors: Luca Deri <deri@ntop.org>
** Alfredo Cardigliano <cardigliano@ntop.org>
**
** Copyright(C) 2010 Sourcefire, Inc.
** Author: Michael R. Altizer <maltizer@sourcefire.com>
** Will Metcalf <william.metcalf@gmail.com>
**
** Contributors: Tim Covel <tcovel@metaflows.com>
** Hong Zhu <hongzhu.ca@gmail.com>
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License Version 2 as
** published by the Free Software Foundation. You may not use, modify or
** distribute this program under any other version of the GNU General
** Public License.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <sys/types.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/sysinfo.h> /* get_nprocs(void) */
#include <unistd.h>
#include <signal.h>
#include <arpa/inet.h>
#include <stdio.h> /* printf() */
#include <errno.h> /* errno in the poll() error path */
#include <poll.h> /* poll(), struct pollfd */
#include <sched.h> /* cpu_set_t, CPU_SET(), sched_setaffinity() for bindcpu */
#include <sys/socket.h> /* setsockopt(), SO_ATTACH_FILTER */
#include "pfring.h"
#include "sfbpf.h"
#include "daq_api.h"
#ifdef HAVE_REDIS
#include "hiredis/hiredis.h"
#endif
#define DAQ_PF_RING_VERSION 1
#define DAQ_PF_RING_DEFAULT_WATERMARK 128
#define DAQ_PF_RING_DEFAULT_IDLE_RULES_TIMEOUT 300 /* 5 minutes */
#define DAQ_PF_RING_MAX_NUM_DEVICES 16
#define DAQ_PF_RING_PASSIVE_DEV_IDX 0
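/*
** Per-instance DAQ state. One slot per device (up to DAQ_PF_RING_MAX_NUM_DEVICES)
** holds the ring handle, interface index, cluster id and hardware-stats baseline,
** so that inline device pairs and multi-device passive captures share one context.
*/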
typedef struct _pfring_context
{
DAQ_Mode mode;
char *devices[DAQ_PF_RING_MAX_NUM_DEVICES];
int ifindexes[DAQ_PF_RING_MAX_NUM_DEVICES];
pfring *ring_handles[DAQ_PF_RING_MAX_NUM_DEVICES];
int num_devices;
int snaplen;
char *filter_string;
char errbuf[1024];
u_char *pkt_buffer;
u_int breakloop;
int promisc_flag;
int timeout;
int watermark;
u_int16_t filter_count;
DAQ_Analysis_Func_t analysis_func;
uint32_t netmask;
DAQ_Stats_t stats;
u_int clusterids[DAQ_PF_RING_MAX_NUM_DEVICES];
int num_reflector_devices;
char *reflector_devices[DAQ_PF_RING_MAX_NUM_DEVICES];
u_int8_t use_kernel_filters;
int idle_rules_timeout;
u_int8_t use_fast_tx;
cluster_type cluster_mode;
u_int bindcpu;
uint32_t base_recv[DAQ_PF_RING_MAX_NUM_DEVICES];
uint32_t base_drop[DAQ_PF_RING_MAX_NUM_DEVICES];
DAQ_State state;
#ifdef HAVE_REDIS
redisContext *redis_ctx;
char *redis_ip;
int redis_port;
#endif
} Pfring_Context_t;
static void pfring_daq_reset_stats(void *handle);
static int pfring_daq_set_filter(void *handle, const char *filter);
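/*
** Open one PF_RING socket for context->devices[id]: applies the promiscuous and
** fast-tx flags, forces software-only filtering, configures the direction and
** (in passive mode with lowlevelbridge) a kernel reflector rule, joins the
** configured cluster and sets the poll watermark. Returns 0 on success, -1 on
** error with context->errbuf filled in.
*/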
static int pfring_daq_open(Pfring_Context_t *context, int id) {
uint32_t default_net = 0xFFFFFF00;
char *device = context->devices[id];
int pfring_rc;
pfring *ring_handle;
char buf[32];
if(!device) {
DPE(context->errbuf, "%s", "PF_RING: a device must be specified");
return -1;
}
if(strncmp(device, "dna", 3) == 0) {
DPE(context->errbuf, "%s", "DNA is not supported by daq_pfring. Please get daq_pfring_dna from http://shop.ntop.org");
return -1;
}
context->pkt_buffer = NULL;
ring_handle = pfring_open(device, context->snaplen,
PF_RING_LONG_HEADER
| (context->promisc_flag ? PF_RING_PROMISC : 0)
| (context->use_fast_tx ? PF_RING_RX_PACKET_BOUNCE : 0)
);
if(!ring_handle) {
DPE(context->errbuf, "pfring_open(): unable to open device '%s'. Please use -i <device>", device);
return -1;
}
pfring_get_bound_device_ifindex(ring_handle, &context->ifindexes[id]);
/* TODO: force software-only filtering for now, since idle-rule purging is not yet available with hardware rules */
pfring_set_filtering_mode(ring_handle, software_only);
if (context->mode == DAQ_MODE_INLINE) {
/* Default mode: recv_and_send_mode */
pfring_set_direction(ring_handle, rx_only_direction);
} else if (context->mode == DAQ_MODE_PASSIVE) {
/* Default direction: rx_and_tx_direction */
if(context->num_reflector_devices > id) { /* lowlevelbridge ON */
filtering_rule rule;
memset(&rule, 0, sizeof(rule));
rule.rule_id = 1;
rule.rule_action = reflect_packet_and_continue_rule_evaluation;
snprintf(rule.reflector_device_name, REFLECTOR_NAME_LEN, "%s", context->reflector_devices[id]);
if(pfring_add_filtering_rule(ring_handle, &rule) < 0) {
DPE(context->errbuf, "unable to set the low level packet reflector %s -> %s", device, rule.reflector_device_name);
pfring_close(ring_handle);
return -1;
} else
printf("%s -> %s\n", context->devices[id], context->reflector_devices[id]);
pfring_set_direction(ring_handle, rx_only_direction);
}
pfring_set_socket_mode(ring_handle, recv_only_mode);
}
if(context->clusterids[id] > 0) {
pfring_rc = pfring_set_cluster(ring_handle, context->clusterids[id], context->cluster_mode);
if(pfring_rc != 0) {
DPE(context->errbuf, "pfring_set_cluster returned %d", pfring_rc);
pfring_close(ring_handle);
return -1;
}
snprintf(buf, sizeof(buf), "snort-cluster-%d-socket-%d", context->clusterids[id], id);
pfring_set_application_name(ring_handle, buf);
} else {
snprintf(buf, sizeof(buf), "snort-socket-%d", id);
pfring_set_application_name(ring_handle, buf);
}
pfring_set_poll_watermark(ring_handle, context->watermark);
context->netmask = htonl(default_net);
context->ring_handles[id] = ring_handle;
return(0);
}
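/*
** Refresh context->stats.hw_packets_received/dropped by summing each ring's
** kernel counters relative to the baselines captured in pfring_daq_reset_stats().
** If any ring has already been closed, the last computed values are kept.
*/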
static int update_hw_stats(Pfring_Context_t *context) {
pfring_stat ps;
int i;
for (i = 0; i < context->num_devices; i++)
if (context->ring_handles[i] == NULL)
/* daq stopped - using last available stats */
return DAQ_SUCCESS;
context->stats.hw_packets_received = 0;
context->stats.hw_packets_dropped = 0;
for (i = 0; i < context->num_devices; i++) {
memset(&ps, 0, sizeof(pfring_stat));
if(pfring_stats(context->ring_handles[i], &ps) < 0) {
DPE(context->errbuf, "%s: pfring_stats error [ring_idx = %d]", __FUNCTION__, i);
return DAQ_ERROR;
}
context->stats.hw_packets_received += (ps.recv - context->base_recv[i]);
context->stats.hw_packets_dropped += (ps.drop - context->base_drop[i]);
}
return DAQ_SUCCESS;
}
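/*
** SIGHUP handling: Snort re-reads its rules on SIGHUP, so we chain to the
** previously installed handler and additionally purge all idle kernel hash
** rules, preventing stale blacklist entries from surviving a reload.
*/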
static sighandler_t default_sig_reload_handler = NULL;
static u_int8_t pfring_daq_reload_requested = 0;
static void pfring_daq_sig_reload(int sig) {
if(default_sig_reload_handler != NULL)
default_sig_reload_handler(sig);
pfring_daq_reload_requested = 1;
}
static void pfring_daq_reload(Pfring_Context_t *context) {
int i;
pfring_daq_reload_requested = 0;
if (context->use_kernel_filters) {
for (i = 0; i < context->num_devices; i++) {
if(context->ring_handles[i] != NULL) {
pfring_purge_idle_hash_rules(context->ring_handles[i], 0 /* all */);
}
}
}
}
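/*
** Parse the DAQ configuration: the interface string ("ethX,ethY" in passive mode,
** "ethX:ethY,ethZ:ethJ" pairs in inline mode) plus the module variables
** (clusterid, clustermode, watermark, timeout, bindcpu, fast-tx, lowlevelbridge,
** no-kernel-filters, kernel-filters-idle-timeout and, when built with HAVE_REDIS, redis).
**
** Illustrative invocations (device names, ids and paths are placeholders, not
** taken from this file):
**
**   snort --daq-dir /usr/local/lib/daq --daq pfring --daq-mode passive \
**         -i eth1,eth2 --daq-var clusterid=10,11 --daq-var bindcpu=2
**
**   snort --daq pfring --daq-mode inline -i eth1:eth2 --daq-var fast-tx=1
*/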
static int pfring_daq_initialize(const DAQ_Config_t *config,
void **ctxt_ptr, char *errbuf, size_t len) {
Pfring_Context_t *context;
DAQ_Dict* entry;
int i;
/* taken from pfcount example */
u_int numCPU = get_nprocs();
context = calloc(1, sizeof(Pfring_Context_t));
if(!context) {
snprintf(errbuf, len, "%s: Couldn't allocate memory for the new PF_RING context!", __FUNCTION__);
return DAQ_ERROR_NOMEM;
}
context->mode = config->mode;
context->snaplen = config->snaplen;
context->promisc_flag =(config->flags & DAQ_CFG_PROMISC);
context->timeout = (config->timeout > 0) ? (int) config->timeout : -1;
context->watermark = DAQ_PF_RING_DEFAULT_WATERMARK;
context->filter_count = 0;
context->use_kernel_filters = 1;
context->idle_rules_timeout = DAQ_PF_RING_DEFAULT_IDLE_RULES_TIMEOUT;
context->use_fast_tx = 0;
context->devices[DAQ_PF_RING_PASSIVE_DEV_IDX] = strdup(config->name);
context->num_devices = 1;
context->cluster_mode = cluster_per_flow_2_tuple;
#ifdef HAVE_REDIS
context->redis_port = -1;
#endif
if(!context->devices[DAQ_PF_RING_PASSIVE_DEV_IDX]) {
snprintf(errbuf, len, "%s: Couldn't allocate memory for the device string!", __FUNCTION__);
free(context);
return DAQ_ERROR_NOMEM;
}
if(context->mode == DAQ_MODE_READ_FILE) {
snprintf(errbuf, len, "%s: function not supported on PF_RING", __FUNCTION__);
free(context);
return DAQ_ERROR;
} else if(context->mode == DAQ_MODE_INLINE) {
/* ethX:ethY;ethZ:ethJ */
char *twins, *twins_pos = NULL;
context->num_devices = 0;
twins = strtok_r(context->devices[DAQ_PF_RING_PASSIVE_DEV_IDX], ",", &twins_pos);
while(twins != NULL) {
char *dev, *dev_pos = NULL;
int last_twin = 0;
dev = strtok_r(twins, ":", &dev_pos);
while(dev != NULL) {
last_twin = context->num_devices;
context->devices[context->num_devices++] = dev;
dev = strtok_r(NULL, ":", &dev_pos);
}
if (context->num_devices & 0x1) {
snprintf(errbuf, len, "%s: Wrong format: inline mode requires pairs of devices", __FUNCTION__);
free(context);
return DAQ_ERROR;
}
if (last_twin > 0) /* new dev pair */
printf("%s <-> %s\n", context->devices[last_twin - 1], context->devices[last_twin]);
twins = strtok_r(NULL, ",", &twins_pos);
}
} else if(context->mode == DAQ_MODE_PASSIVE) {
/* ethX,ethY */
char *dev, *dev_pos = NULL;
context->num_devices = 0;
dev = strtok_r(context->devices[DAQ_PF_RING_PASSIVE_DEV_IDX], ",", &dev_pos);
while(dev != NULL) {
context->devices[context->num_devices++] = dev;
dev = strtok_r(NULL, ",", &dev_pos);
}
}
for(entry = config->values; entry; entry = entry->next) {
if(!entry->value || !*entry->value) {
snprintf(errbuf, len,
"%s: variable needs value(%s)\n", __FUNCTION__, entry->key);
return DAQ_ERROR;
} else if(!strcmp(entry->key, "clusterid")) {
char *clusters = strdup(entry->value);
if (clusters != NULL) {
char *clusterid, *clusterid_pos = NULL;
char* end;
clusterid = strtok_r(clusters, ",", &clusterid_pos);
for (i = 0; i < context->num_devices; i++) {
if (clusterid == NULL) {
snprintf(errbuf, len, "%s: not enough cluster ids (%d)\n", __FUNCTION__, i);
free(clusters);
return DAQ_ERROR;
}
end = clusterid;
context->clusterids[i] = (int) strtol(clusterid, &end, 0);
if(*end
|| (context->clusterids[i] <= 0)
|| (context->clusterids[i] > 65535)) {
snprintf(errbuf, len, "%s: bad clusterid(%s)\n",
__FUNCTION__, clusterid);
free(clusters);
return DAQ_ERROR;
}
clusterid = strtok_r(NULL, ",", &clusterid_pos);
}
free(clusters);
}
} else if(!strcmp(entry->key, "no-kernel-filters")) {
context->use_kernel_filters = 0;
} else if(!strcmp(entry->key, "kernel-filters-idle-timeout")) {
char* end = entry->value;
context->idle_rules_timeout = (int) strtol(entry->value, &end, 0);
if(*end || (context->idle_rules_timeout < 0)) {
snprintf(errbuf, len, "%s: bad kernel filters idle timeout(%s)\n",
__FUNCTION__, entry->value);
return DAQ_ERROR;
}
} else if(!strcmp(entry->key, "fast-tx")) {
context->use_fast_tx = 1;
} else if(!strcmp(entry->key, "bindcpu")) {
char* end = entry->value;
context->bindcpu =(int)strtol(entry->value, &end, 0);
if(*end
|| (context->bindcpu >= numCPU)) {
snprintf(errbuf, len, "%s: bad bindcpu(%s)\n",
__FUNCTION__, entry->value);
return DAQ_ERROR;
} else {
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET((int)context->bindcpu, &mask);
if(sched_setaffinity(0, sizeof(mask), &mask) < 0) {
snprintf(errbuf, len, "%s:failed to set bindcpu(%u) on pid %i\n",
__FUNCTION__, context->bindcpu, getpid());
return DAQ_ERROR;
}
}
} else if(!strcmp(entry->key, "timeout")) {
char* end = entry->value;
context->timeout = (int) strtol(entry->value, &end, 0);
if(*end || (context->timeout < 0)) {
snprintf(errbuf, len, "%s: bad timeout(%s)\n",
__FUNCTION__, entry->value);
return DAQ_ERROR;
}
} else if(!strcmp(entry->key, "watermark")) {
char* end = entry->value;
context->watermark = (int) strtol(entry->value, &end, 0);
if(*end || (context->watermark < 0)) {
snprintf(errbuf, len, "%s: bad watermark(%s)\n",
__FUNCTION__, entry->value);
return DAQ_ERROR;
}
} else if(!strcmp(entry->key, "clustermode")) {
char* end = entry->value;
int cmode = (int) strtol(entry->value, &end, 0);
if(*end || (cmode != 2 && cmode != 4 && cmode != 5 && cmode != 6)) {
snprintf(errbuf, len, "%s: bad cluster mode(%s)\n",
__FUNCTION__, entry->value);
return DAQ_ERROR;
} else {
switch (cmode) {
case 2: context->cluster_mode = cluster_per_flow_2_tuple; break;
case 4: context->cluster_mode = cluster_per_flow_4_tuple; break;
case 5: context->cluster_mode = cluster_per_flow_5_tuple; break;
case 6: context->cluster_mode = cluster_per_flow; break;
default: break;
}
}
} else if(!strcmp(entry->key, "lowlevelbridge")) {
if (context->mode == DAQ_MODE_PASSIVE) {
char *reflector_devices = strdup(entry->value);
context->num_reflector_devices = 0;
if (reflector_devices != NULL) {
/* ethX,ethY */
char *dev, *dev_pos = NULL;
dev = strtok_r(reflector_devices, ",", &dev_pos);
while(dev != NULL) {
context->reflector_devices[context->num_reflector_devices++] = dev;
dev = strtok_r(NULL, ",", &dev_pos);
}
if (context->num_reflector_devices != context->num_devices) {
snprintf(errbuf, len, "%s: not enough reflector devices (%d)\n",
__FUNCTION__, context->num_reflector_devices);
return DAQ_ERROR;
}
}
} else {
snprintf(errbuf, len, "%s: lowlevelbridge is for passive mode only\n",
__FUNCTION__);
return DAQ_ERROR;
}
}
#ifdef HAVE_REDIS
else if (!strcmp(entry->key, "redis")) {
char *ipPort = strdup(entry->value);
if (ipPort != NULL) {
int i = 0;
char *temp, *temp2 = NULL;
temp = strtok_r(ipPort, ":", &temp2);
while (temp != NULL && i < 2) {
if (i == 0)
context->redis_ip = strdup(temp);
else
context->redis_port = atoi(temp);
temp = strtok_r(NULL, ":", &temp2);
i++;
}
if (temp != NULL || i != 2) { /* extra tokens, or port missing */
snprintf(errbuf, len, "%s: Incorrect format for <redis ip>:<redis port>\n", __FUNCTION__);
free(ipPort);
return DAQ_ERROR;
}
free(ipPort); /* context->redis_ip holds its own copy */
}
}
#endif
else {
snprintf(errbuf, len,
"%s: unsupported variable(%s=%s)\n",
__FUNCTION__, entry->key, entry->value);
return DAQ_ERROR;
}
}
/* Catch SIGHUP (Snort's reload signal), chaining to the previously installed handler */
if ((default_sig_reload_handler = signal(SIGHUP, pfring_daq_sig_reload)) == SIG_ERR)
default_sig_reload_handler = NULL;
for (i = 0; i < context->num_devices; i++) {
if(context->ring_handles[i] == NULL) {
if (pfring_daq_open(context, i) == -1)
return DAQ_ERROR;
}
}
#ifdef HAVE_REDIS
if (context->redis_ip != NULL && context->redis_port != -1) {
if ((context->redis_ctx = redisConnect(context->redis_ip, context->redis_port)) == NULL || context->redis_ctx->err) {
snprintf(errbuf, len, "redis connection error: %s",
context->redis_ctx ? context->redis_ctx->errstr : "can't allocate redis context");
return DAQ_ERROR;
}
}
#endif
context->state = DAQ_STATE_INITIALIZED;
*ctxt_ptr = context;
return DAQ_SUCCESS;
}
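/*
** Apply a BPF filter. When the rings are already open, the compiled program is
** attached in-kernel via setsockopt(SO_ATTACH_FILTER) on each ring's descriptor;
** otherwise the filter is only validated and stored, to be re-applied at start time.
*/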
static int pfring_daq_set_filter(void *handle, const char *filter) {
Pfring_Context_t *context = (Pfring_Context_t *) handle;
int ret, i;
struct sfbpf_program fcode;
if(context->ring_handles[DAQ_PF_RING_PASSIVE_DEV_IDX]) {
if(sfbpf_compile(context->snaplen, DLT_EN10MB, &fcode,
filter, 0 /* 1: optimize */, htonl(context->netmask)) < 0) {
DPE(context->errbuf, "%s: BPF state machine compilation failed!", __FUNCTION__);
return DAQ_ERROR;
}
ret = DAQ_SUCCESS;
for (i = 0; i < context->num_devices; i++) {
if(setsockopt(pfring_get_selectable_fd(context->ring_handles[i]), 0,
SO_ATTACH_FILTER, &fcode, sizeof(fcode)) != 0) {
ret = DAQ_ERROR;
}
}
sfbpf_freecode(&fcode);
} else {
/* Just check if the filter is valid */
if(sfbpf_compile(context->snaplen, DLT_EN10MB, &fcode,
filter, 0 /* 1: optimize */, 0 /* netmask */) < 0) {
DPE(context->errbuf, "%s: BPF state machine compilation failed!", __FUNCTION__);
return DAQ_ERROR;
}
ret = DAQ_SUCCESS;
if(context->filter_string)
free(context->filter_string);
context->filter_string = strdup(filter);
if(!context->filter_string) {
DPE(context->errbuf, "%s: Couldn't allocate memory for the filter string!",
__FUNCTION__);
ret = DAQ_ERROR;
}
sfbpf_freecode(&fcode);
}
return ret;
}
static int pfring_daq_start(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
if(context->filter_string) {
if(pfring_daq_set_filter(context, context->filter_string))
return DAQ_ERROR;
}
pfring_daq_reset_stats(context);
context->state = DAQ_STATE_STARTED;
return DAQ_SUCCESS;
}
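/*
** Forward a packet in inline mode. With fast-tx, the kernel bounces the last
** received packet straight to the egress interface (pfring_send_last_rx_packet());
** otherwise the payload is copied and transmitted with pfring_send() on the twin ring.
*/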
static int pfring_daq_send_packet(Pfring_Context_t *context, pfring *send_ring,
u_int pkt_len, pfring *recv_ring, int send_ifindex)
{
int rc;
if(( !context->use_fast_tx && send_ring == NULL)
||(context->use_fast_tx && recv_ring == NULL))
return(DAQ_SUCCESS);
if(context->use_fast_tx)
rc = pfring_send_last_rx_packet(recv_ring, send_ifindex);
else
rc = pfring_send(send_ring, (char *) context->pkt_buffer, pkt_len, 1 /* flush packet */);
if (rc < 0) {
DPE(context->errbuf, "%s", "pfring_send() error");
return DAQ_ERROR;
}
context->stats.packets_injected++;
return(DAQ_SUCCESS);
}
#ifdef HAVE_REDIS
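/*
** Record a blacklisted address in Redis: add the IP to the given set, bump a
** per-IP counter and give that counter a one-hour TTL. Any failed command
** aborts with DAQ_ERROR.
*/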
int pfring_daq_redis_insert_to_set(redisContext *redis_ctx, const char *set_name, char *ip) {
redisReply *r = NULL;
const int TTL = 3600;
#if 0
if ((r = redisCommand(redis_ctx, "SISMEMBER %s %s", set_name, ip)) != NULL) {
if (r->integer != 0) {
freeReplyObject(r);
return DAQ_ERROR;
}
} else {
freeReplyObject(r);
return DAQ_ERROR;
}
freeReplyObject(r);
#endif
if ((r = redisCommand(redis_ctx, "SADD %s %s", set_name, ip)) != NULL) {
//printf("[DEBUG] Entry added to %s SET: %s\n",set_name,ip);
freeReplyObject(r);
} else{
freeReplyObject(r);
return DAQ_ERROR;
}
if((r = redisCommand(redis_ctx, "INCR %s", ip)) != NULL) {
//printf("[DEBUG] Incrementing the entry added to %s SET: %s\n",set_name,ip);
freeReplyObject(r);
} else {
freeReplyObject(r);
return DAQ_ERROR;
}
if((r = redisCommand(redis_ctx, "EXPIRE %s %d", ip, TTL)) != NULL){
//printf("[DEBUG] Setting the expire time of %d sec to the entry added to %s SET: %s\n",TTL,set_name,ip);
freeReplyObject(r);
} else {
freeReplyObject(r);
return DAQ_ERROR;
}
return DAQ_SUCCESS;
}
#endif
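/*
** Main capture loop: rings are serviced round-robin without blocking; when all
** are idle we poll() on their descriptors for up to context->timeout. Each packet
** is handed to the analysis callback and the verdict is enforced here: BLACKLIST
** installs a kernel hash rule (drop, or reflect when lowlevelbridge is active)
** and optionally logs attacker/target IPs to Redis; PASS/REPLACE/WHITELIST/IGNORE
** packets are forwarded to the twin device in inline mode; ARP frames are always passed.
*/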
static int pfring_daq_acquire(void *handle, int cnt, DAQ_Analysis_Func_t callback,
#if (DAQ_API_VERSION >= 0x00010002)
DAQ_Meta_Func_t metaback,
#endif
void *user) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
int ret = 0, i, current_ring_idx = context->num_devices - 1, rx_ring_idx;
struct pollfd pfd[DAQ_PF_RING_MAX_NUM_DEVICES];
hash_filtering_rule hash_rule;
memset(&hash_rule, 0, sizeof(hash_rule));
context->analysis_func = callback;
context->breakloop = 0;
for (i = 0; i < context->num_devices; i++)
pfring_enable_ring(context->ring_handles[i]);
while((!context->breakloop) && ((cnt == -1) || (cnt > 0))) {
struct pfring_pkthdr phdr;
DAQ_PktHdr_t hdr;
DAQ_Verdict verdict;
memset(&phdr, 0, sizeof(phdr));
if(pfring_daq_reload_requested)
pfring_daq_reload(context);
for (i = 0; i < context->num_devices; i++) {
current_ring_idx = (current_ring_idx + 1) % context->num_devices;
ret = pfring_recv(context->ring_handles[current_ring_idx], &context->pkt_buffer, 0, &phdr, 0 /* Don't wait */);
if (ret > 0) break;
}
if(ret <= 0) {
/* No packet to read: let's poll */
int rc;
for (i = 0; i < context->num_devices; i++) {
pfd[i].fd = pfring_get_selectable_fd(context->ring_handles[i]);
pfd[i].events = POLLIN;
pfd[i].revents = 0;
}
rc = poll(pfd, context->num_devices, context->timeout);
if(rc < 0) {
if(errno == EINTR)
break;
DPE(context->errbuf, "%s: Poll failed: %s(%d)", __FUNCTION__, strerror(errno), errno);
return DAQ_ERROR;
}
} else {
hdr.caplen = phdr.caplen;
hdr.pktlen = phdr.len;
hdr.ts = phdr.ts;
#if (DAQ_API_VERSION >= 0x00010002)
hdr.ingress_index = phdr.extended_hdr.if_index;
hdr.egress_index = -1;
hdr.ingress_group = -1;
hdr.egress_group = -1;
#else
hdr.device_index = phdr.extended_hdr.if_index;
#endif
hdr.flags = 0;
rx_ring_idx = current_ring_idx;
context->stats.packets_received++;
verdict = context->analysis_func(user, &hdr,(u_char*)context->pkt_buffer);
#if 0
printf("[DEBUG] %d.%d.%d.%d:%d -> %d.%d.%d.%d:%d Verdict=%d\n",
phdr.extended_hdr.parsed_pkt.ipv4_src >> 24 & 0xFF, phdr.extended_hdr.parsed_pkt.ipv4_src >> 16 & 0xFF,
phdr.extended_hdr.parsed_pkt.ipv4_src >> 8 & 0xFF, phdr.extended_hdr.parsed_pkt.ipv4_src >> 0 & 0xFF,
phdr.extended_hdr.parsed_pkt.l4_src_port & 0xFFFF,
phdr.extended_hdr.parsed_pkt.ipv4_dst >> 24 & 0xFF, phdr.extended_hdr.parsed_pkt.ipv4_dst >> 16 & 0xFF,
phdr.extended_hdr.parsed_pkt.ipv4_dst >> 8 & 0xFF, phdr.extended_hdr.parsed_pkt.ipv4_dst >> 0 & 0xFF,
phdr.extended_hdr.parsed_pkt.l4_dst_port & 0xFFFF,
verdict);
#endif
if(verdict >= MAX_DAQ_VERDICT)
verdict = DAQ_VERDICT_PASS;
if (phdr.extended_hdr.parsed_pkt.eth_type == 0x0806 /* ARP */ )
verdict = DAQ_VERDICT_PASS;
switch(verdict) {
case DAQ_VERDICT_BLACKLIST: /* Block the packet and block all future packets in the same flow systemwide. */
if (context->use_kernel_filters) {
pfring_parse_pkt(context->pkt_buffer, &phdr, 4, 0, 0);
/* or use pfring_recv_parsed() to force parsing. */
hash_rule.rule_id = context->filter_count++;
hash_rule.vlan_id = phdr.extended_hdr.parsed_pkt.vlan_id;
hash_rule.proto = phdr.extended_hdr.parsed_pkt.l3_proto;
memcpy(&hash_rule.host_peer_a, &phdr.extended_hdr.parsed_pkt.ipv4_src, sizeof(ip_addr));
memcpy(&hash_rule.host_peer_b, &phdr.extended_hdr.parsed_pkt.ipv4_dst, sizeof(ip_addr));
hash_rule.port_peer_a = phdr.extended_hdr.parsed_pkt.l4_src_port;
hash_rule.port_peer_b = phdr.extended_hdr.parsed_pkt.l4_dst_port;
hash_rule.plugin_action.plugin_id = NO_PLUGIN_ID;
if (context->mode == DAQ_MODE_PASSIVE && context->num_reflector_devices > rx_ring_idx) { /* lowlevelbridge ON */
hash_rule.rule_action = reflect_packet_and_stop_rule_evaluation;
snprintf(hash_rule.reflector_device_name, REFLECTOR_NAME_LEN, "%s", context->reflector_devices[rx_ring_idx]);
} else {
hash_rule.rule_action = dont_forward_packet_and_stop_rule_evaluation;
}
pfring_handle_hash_filtering_rule(context->ring_handles[rx_ring_idx], &hash_rule, 1 /* add_rule */);
/* Purge rules that have been idle (no matching packets) for more than idle_rules_timeout seconds */
pfring_purge_idle_hash_rules(context->ring_handles[rx_ring_idx], context->idle_rules_timeout);
#if DEBUG
printf("[DEBUG] %d.%d.%d.%d:%d -> %d.%d.%d.%d:%d Verdict=%d Action=%d\n",
hash_rule.host_peer_a.v4 >> 24 & 0xFF, hash_rule.host_peer_a.v4 >> 16 & 0xFF,
hash_rule.host_peer_a.v4 >> 8 & 0xFF, hash_rule.host_peer_a.v4 >> 0 & 0xFF,
hash_rule.port_peer_a & 0xFFFF,
hash_rule.host_peer_b.v4 >> 24 & 0xFF, hash_rule.host_peer_b.v4 >> 16 & 0xFF,
hash_rule.host_peer_b.v4 >> 8 & 0xFF, hash_rule.host_peer_b.v4 >> 0 & 0xFF,
hash_rule.port_peer_b & 0xFFFF,
verdict,
hash_rule.rule_action);
#endif
}
#ifdef HAVE_REDIS
if (context->redis_ctx != NULL) {
char ipAttacker[INET_ADDRSTRLEN];
char ipTarget[INET_ADDRSTRLEN];
pfring_parse_pkt(context->pkt_buffer, &phdr, 4, 0, 0);
/* Attacker */
if (inet_ntop(AF_INET, (const void *) &phdr.extended_hdr.parsed_pkt.ipv4_src, ipAttacker, INET_ADDRSTRLEN) != NULL) {
if (pfring_daq_redis_insert_to_set(context->redis_ctx, "Attackers", ipAttacker) != DAQ_SUCCESS) {
DPE(context->errbuf, "%s: Insert into Attackers Set failed: %s", __FUNCTION__, ipAttacker);
return DAQ_ERROR;
}
}
/* target */
if (inet_ntop(AF_INET,(const void *) &phdr.extended_hdr.parsed_pkt.ipv4_dst, ipTarget, INET_ADDRSTRLEN) != NULL) {
if (pfring_daq_redis_insert_to_set(context->redis_ctx, "Targets", ipTarget) != DAQ_SUCCESS) {
DPE(context->errbuf, "%s: Insert into Targets Set failed: %s", __FUNCTION__, ipTarget);
return DAQ_ERROR;
}
}
}
#endif
break;
case DAQ_VERDICT_WHITELIST: /* Pass the packet and fastpath all future packets in the same flow systemwide. */
case DAQ_VERDICT_IGNORE: /* Pass the packet and fastpath all future packets in the same flow for this application. */
/* Setting a rule to reflect packets when lowlevelbridge is ON could be an optimization here,
* but we can't use "forward" (the reflector would not work) or "reflect" (packets would be reflected twice) hash rules */
case DAQ_VERDICT_PASS: /* Pass the packet */
case DAQ_VERDICT_REPLACE: /* Pass a packet that has been modified in-place.(No resizing allowed!) */
if (context->mode == DAQ_MODE_INLINE) {
pfring_daq_send_packet(context, context->ring_handles[rx_ring_idx ^ 0x1], hdr.caplen,
context->ring_handles[rx_ring_idx], context->ifindexes[rx_ring_idx ^ 0x1]);
}
break;
case DAQ_VERDICT_BLOCK: /* Block the packet. */
/* Nothing to do really */
break;
case MAX_DAQ_VERDICT:
/* No way we can reach this point */
break;
}
context->stats.verdicts[verdict]++;
if(cnt > 0) cnt--;
}
}
return 0;
}
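/*
** Inject a raw packet. In inline mode the egress ring is the twin of the device
** the original packet arrived on (index XOR 1); in passive mode the first ring is used.
*/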
static int pfring_daq_inject(void *handle, const DAQ_PktHdr_t *hdr,
const uint8_t *packet_data, uint32_t len, int reverse) {
Pfring_Context_t *context = (Pfring_Context_t *) handle;
int i, tx_ring_idx = DAQ_PF_RING_PASSIVE_DEV_IDX;
if (context->mode == DAQ_MODE_INLINE) { /* looking for the device idx */
for (i = 0; i < context->num_devices; i++)
#if (DAQ_API_VERSION >= 0x00010002)
if (context->ifindexes[i] == hdr->ingress_index) {
#else
if (context->ifindexes[i] == hdr->device_index) {
#endif
tx_ring_idx = i ^ 0x1;
break;
}
}
if(pfring_send(context->ring_handles[tx_ring_idx],
(char *) packet_data, len, 1 /* flush packet */) < 0) {
DPE(context->errbuf, "%s", "pfring_send() error");
return DAQ_ERROR;
}
context->stats.packets_injected++;
return DAQ_SUCCESS;
}
static int pfring_daq_breakloop(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
if(!context->ring_handles[DAQ_PF_RING_PASSIVE_DEV_IDX])
return DAQ_ERROR;
context->breakloop = 1;
return DAQ_SUCCESS;
}
static int pfring_daq_stop(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
int i;
/* Snapshot the hardware stats before closing the rings, so post-stop stat calls still work */
update_hw_stats(context);
for (i = 0; i < context->num_devices; i++) {
if(context->ring_handles[i]) {
pfring_close(context->ring_handles[i]);
context->ring_handles[i] = NULL;
}
}
context->state = DAQ_STATE_STOPPED;
return DAQ_SUCCESS;
}
static void pfring_daq_shutdown(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
int i;
for (i = 0; i < context->num_devices; i++)
if(context->ring_handles[i])
pfring_close(context->ring_handles[i]);
if(context->devices[DAQ_PF_RING_PASSIVE_DEV_IDX])
free(context->devices[DAQ_PF_RING_PASSIVE_DEV_IDX]);
if(context->reflector_devices[DAQ_PF_RING_PASSIVE_DEV_IDX])
free(context->reflector_devices[DAQ_PF_RING_PASSIVE_DEV_IDX]);
if(context->filter_string)
free(context->filter_string);
#ifdef HAVE_REDIS
if(context->redis_ctx != NULL)
redisFree(context->redis_ctx);
#endif
free(context);
}
static DAQ_State pfring_daq_check_status(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
return context->state;
}
static int pfring_daq_get_stats(void *handle, DAQ_Stats_t *stats) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
update_hw_stats(context);
memcpy(stats, &context->stats, sizeof(DAQ_Stats_t));
return DAQ_SUCCESS;
}
static void pfring_daq_reset_stats(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
pfring_stat ps;
int i;
memset(&context->stats, 0, sizeof(DAQ_Stats_t));
memset(&ps, 0, sizeof(pfring_stat));
for (i = 0; i < context->num_devices; i++)
if(context->ring_handles[i]
&& pfring_stats(context->ring_handles[i], &ps) == 0) {
context->base_recv[i] = ps.recv;
context->base_drop[i] = ps.drop;
}
}
static int pfring_daq_get_snaplen(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
if(!context->ring_handles[DAQ_PF_RING_PASSIVE_DEV_IDX])
return DAQ_ERROR;
else
return context->snaplen;
}
static uint32_t pfring_daq_get_capabilities(void *handle) {
return DAQ_CAPA_BLOCK | DAQ_CAPA_REPLACE | DAQ_CAPA_INJECT |
DAQ_CAPA_INJECT_RAW | DAQ_CAPA_BREAKLOOP | DAQ_CAPA_UNPRIV_START | DAQ_CAPA_BPF;
}
static int pfring_daq_get_datalink_type(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
if(!context)
return DAQ_ERROR;
else
return DLT_EN10MB;
}
static const char *pfring_daq_get_errbuf(void *handle) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
return context->errbuf;
}
static void pfring_daq_set_errbuf(void *handle, const char *string) {
Pfring_Context_t *context =(Pfring_Context_t *) handle;
if(!string)
return;
DPE(context->errbuf, "%s", string);
}
static int pfring_daq_get_device_index(void *handle, const char *device) {
return DAQ_ERROR_NOTSUP;
}
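/*
** DAQ module descriptor: this is what libdaq (and therefore Snort) looks up,
** either as the DAQ_MODULE_DATA entry point of the shared object or as the
** statically linked pfring_daq_module_data symbol.
*/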
#ifdef BUILDING_SO
DAQ_SO_PUBLIC const DAQ_Module_t DAQ_MODULE_DATA =
#else
const DAQ_Module_t pfring_daq_module_data =
#endif
{
.api_version = DAQ_API_VERSION,
.module_version = DAQ_PF_RING_VERSION,
.name = "pfring",
.type = DAQ_TYPE_INLINE_CAPABLE | DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_MULTI_INSTANCE,
.initialize = pfring_daq_initialize,
.set_filter = pfring_daq_set_filter,
.start = pfring_daq_start,
.acquire = pfring_daq_acquire,
.inject = pfring_daq_inject,
.breakloop = pfring_daq_breakloop,
.stop = pfring_daq_stop,
.shutdown = pfring_daq_shutdown,
.check_status = pfring_daq_check_status,
.get_stats = pfring_daq_get_stats,
.reset_stats = pfring_daq_reset_stats,
.get_snaplen = pfring_daq_get_snaplen,
.get_capabilities = pfring_daq_get_capabilities,
.get_datalink_type = pfring_daq_get_datalink_type,
.get_errbuf = pfring_daq_get_errbuf,
.set_errbuf = pfring_daq_set_errbuf,
.get_device_index = pfring_daq_get_device_index
};