# blob: f29157f49c6e512364976d448bef9b99f2387ab5 [file] [log] [blame]
"""tcpdump: helper module that gathers wireless network MCS using tcpdump."""
import logging
import multiprocessing
import os
import re
import subprocess
logger = logging.getLogger('tcpdump')
def MCSBackground(tcpdump_proc, out):
  """Continually extract wireless MCS from `tcpdump` output.

  Reads `tcpdump_proc.stdout` line by line until it reports end-of-file. For
  every line, one fixed-width entry is written to `out`: the zero-padded MCS
  number found in the line (e.g. `07 `), or ` . ` when the line contains no
  `MCS <n>` field. A newline is written after every 25 entries.

  If tcpdump is actively capturing packets, this function will not return as
  long as tcpdump is running. You probably want to run it in a background
  activity using multiprocessing.

  Args:
    tcpdump_proc: A tcpdump process that's writing text output with Radiotap
        headers to its stdout stream. The stream is expected to yield byte
        strings, as a `subprocess.PIPE` stream does by default.
    out: A Python file-like object to write MCS information to. If it has a
        `flush` method, it is called after each completed row and once more
        when reading stops.
  """
  # Bytes pattern, to agree with the b'' end-of-file sentinel used below.
  mcsre = re.compile(br'MCS (\d+)')
  # Tolerate write-only objects: flushing is best-effort.
  flush = getattr(out, 'flush', None)
  try:
    rows = iter(tcpdump_proc.stdout.readline, b'')
    for count, row in enumerate(rows, 1):
      match = mcsre.search(row)
      if match:
        out.write('{:02} '.format(int(match.group(1))))
      else:
        out.write(' . ')
      if count % 25 == 0:
        out.write('\n')
        # Push each completed row out so readers of the file see progress.
        if flush is not None:
          flush()
  finally:
    # A multiprocessing child exits through os._exit(), which does not flush
    # buffered file data; flush explicitly so the last partial row is kept.
    if flush is not None:
      flush()
def MCS(bssid, interface, report_dir=''):
  """Runs tcpdump in the background to gather wireless MCS.

  Refreshes `sudo` credentials, starts `sudo tcpdump` on `interface` with a
  filter that excludes beacon and ack frames and keeps frames whose address
  1, 2 or 3 equals `bssid`, and starts a `multiprocessing.Process` running
  MCSBackground to turn tcpdump's stdout into MCS entries in the `mcs` file.

  Args:
    bssid: BSSID substituted into the `wlan addr1/addr2/addr3` filter terms.
    interface: Interface name handed to tcpdump's `-i` option.
    report_dir: Directory in which the `mcs` and `mcserr` files are created.

  Returns:
    A tuple `(sudo_tcpdump, out, err)`: the `subprocess.Popen` object for
    `sudo tcpdump`, the open `mcs` file that MCSBackground writes to, and the
    open `mcserr` file that receives tcpdump's stderr. The caller owns all
    three.

  Raises:
    Whatever `open`, `os.getlogin`, `subprocess.Popen` or starting the
    background process raises; any files opened here are closed first.
  """
  print('Please enter password for `sudo` if prompted.')
  subprocess.call(['sudo', '-v'])
  filt = ('(not subtype beacon and not subtype ack) and '
          '(wlan addr1 {0} or wlan addr2 {0} or wlan addr3 {0})'.format(bssid))
  opened = []
  try:
    out = open(os.path.join(report_dir, 'mcs'), 'w+b')
    opened.append(out)
    err = open(os.path.join(report_dir, 'mcserr'), 'w+b')
    opened.append(err)
    # `-Z <login>` asks tcpdump to drop from root to the invoking user.
    sudo_tcpdump = subprocess.Popen(
        ['sudo', 'tcpdump', '-Z', os.getlogin(), '-Ilnei', interface, filt],
        stdout=subprocess.PIPE,
        stderr=err)
    proc = multiprocessing.Process(target=MCSBackground,
                                   args=(sudo_tcpdump, out))
    proc.start()
  except Exception:
    # Do not leak the report files when the capture could not be started.
    for handle in opened:
      handle.close()
    raise
  return sudo_tcpdump, out, err
def Tcpdump(interface, bssid='', report_dir='', filtered=True):
  """Runs tcpdump in the background, saving packets to a pcap file.

  The capture is written (tcpdump `-w`) to `testnetwork.pcap` inside
  `report_dir`. When the login name is not `root`, `sudo` credentials are
  refreshed and tcpdump is run through `sudo` with `-Z <login>`.

  Args:
    interface: Interface name handed to tcpdump's `-i` option.
    bssid: Optional BSSID. When given together with `filtered`, the filter
        also requires address 1, 2 or 3 of a frame to equal it.
    report_dir: Directory in which `testnetwork.pcap` and `mcserr` are
        created.
    filtered: If true, pass a filter that excludes beacon and ack frames. A
        warning is logged when filtering is requested without a `bssid`.

  Returns:
    A tuple `(tcpdump_proc, err)`: the `subprocess.Popen` object for tcpdump
    and the open `mcserr` file that receives its stderr. The caller owns
    both.

  Raises:
    Whatever `os.getlogin`, `open` or `subprocess.Popen` raises; the stderr
    file is closed first if tcpdump could not be started.
  """
  cap = os.path.join(report_dir, 'testnetwork.pcap')
  tcpdump_args = ['tcpdump', '-IUnei', interface, '-w', cap]
  login = os.getlogin()
  if login != 'root':
    print('Please enter password for `sudo` if prompted.')
    subprocess.call(['sudo', '-v'])
    tcpdump_args = ['sudo'] + tcpdump_args + ['-Z', login]
  if filtered:
    filt = '(not subtype beacon and not subtype ack)'
    if bssid:
      filt += (' and (wlan addr1 {0} or wlan addr2 {0} '
               'or wlan addr3 {0})').format(bssid)
    else:
      logger.warning('Requested filtered tcpdump, but network BSSID not known.')
    tcpdump_args += [filt]
  err = open(os.path.join(report_dir, 'mcserr'), 'w+b')
  try:
    tcpdump_proc = subprocess.Popen(tcpdump_args, stderr=err)
  except Exception:
    # Do not leak the stderr file when tcpdump could not be launched.
    err.close()
    raise
  return tcpdump_proc, err