| """tcpdump: helper module that gathers wireless network MCS using tcpdump.""" |
| |
| import logging |
| import multiprocessing |
| import os |
| import re |
| import subprocess |
| |
| |
| logger = logging.getLogger('tcpdump') |
| |
| |
def MCSBackground(tcpdump_proc, out):
    """Continually extract wireless MCS from `tcpdump` output.

    Reads `tcpdump_proc.stdout` line by line until end-of-file. For each line,
    writes the zero-padded MCS index followed by a space if the line contains
    'MCS <number>', or a ' . ' placeholder if it does not. A newline is written
    after every 25 lines so the output forms a grid.

    If tcpdump is actively capturing packets, this function will not return as
    long as tcpdump is running. You probably want to run it in a background
    activity using multiprocessing.

    Args:
      tcpdump_proc: A tcpdump process that's writing text output with Radiotap
          headers to its stdout stream. The stream must yield byte strings
          (the default for a `subprocess.PIPE`).
      out: A Python file-like object to write MCS information to. It receives
          byte strings, so it should be opened in binary mode, and it must
          provide `flush()`.
    """
    # A bytes pattern matches what a subprocess pipe yields. On Python 2 this
    # is the same as a plain str pattern.
    mcs_pattern = re.compile(br'MCS (\d+)')
    rows = iter(tcpdump_proc.stdout.readline, b'')

    for count, row in enumerate(rows, 1):
        match = mcs_pattern.search(row)
        if match:
            out.write(b'%02d ' % int(match.group(1)))
        else:
            out.write(b' . ')

        if count % 25 == 0:
            out.write(b'\n')
            # Push each completed line out so the report can be followed
            # while the capture is still running.
            out.flush()

    # This usually runs in a background process; flush explicitly rather than
    # relying on buffers being written when that process exits.
    out.flush()
| |
| |
def MCS(bssid, interface, report_dir=''):
    """Runs tcpdump in the background to gather wireless MCS.

    Refreshes `sudo` credentials, starts `sudo tcpdump` on `interface` with a
    filter that drops beacon and ack frames and keeps only frames whose
    address 1, 2 or 3 is `bssid`, and starts a background process that runs
    MCSBackground on tcpdump's stdout.

    Args:
      bssid: BSSID of the network to filter on, as accepted by tcpdump's
          `wlan addrN` filter primitives.
      interface: Name of the wireless interface to capture on.
      report_dir: Directory in which the 'mcs' (extracted MCS values) and
          'mcserr' (tcpdump's stderr) files are created. Defaults to the
          current directory.

    Returns:
      A tuple (tcpdump process, 'mcs' file object, 'mcserr' file object). The
      caller is responsible for stopping tcpdump and closing both files.

    Raises:
      OSError: If the report files cannot be opened, the login name cannot be
          determined, or tcpdump cannot be started. Files opened here are
          closed again before the exception propagates.
    """
    print('Please enter password for `sudo` if prompted.')
    subprocess.call(['sudo', '-v'])

    out = open(os.path.join(report_dir, 'mcs'), 'w+b')
    err = None
    try:
        err = open(os.path.join(report_dir, 'mcserr'), 'w+b')

        filt = ('(not subtype beacon and not subtype ack) and '
                '(wlan addr1 {0} or wlan addr2 {0} or wlan addr3 {0})'.format(
                    bssid))

        # -Z makes tcpdump drop root privileges to the invoking user after
        # the capture device has been opened.
        sudo_tcpdump = subprocess.Popen(['sudo', 'tcpdump', '-Z', os.getlogin(),
                                         '-Ilnei', interface, filt],
                                        stdout=subprocess.PIPE,
                                        stderr=err)
        proc = multiprocessing.Process(target=MCSBackground,
                                       args=(sudo_tcpdump, out))
        proc.start()
    except Exception:
        # Don't leak the report files when startup fails part-way through.
        out.close()
        if err is not None:
            err.close()
        raise

    return sudo_tcpdump, out, err
| |
| |
def Tcpdump(interface, bssid='', report_dir='', filtered=True):
    """Runs tcpdump in the background to record a packet capture file.

    The capture is written to 'testnetwork.pcap' in `report_dir`. Unless the
    login name is 'root', tcpdump is run through `sudo` (after refreshing
    credentials) and told via -Z to drop privileges to the login user.

    Args:
      interface: Name of the wireless interface to capture on.
      bssid: Optional BSSID. When given and `filtered` is true, only frames
          whose address 1, 2 or 3 matches it are captured.
      report_dir: Directory in which 'testnetwork.pcap' and 'mcserr'
          (tcpdump's stderr) are created. Defaults to the current directory.
      filtered: If true, beacon and ack frames are excluded from the capture.
          A warning is logged if filtering is requested without a BSSID.

    Returns:
      A tuple (tcpdump process, 'mcserr' file object). The caller is
      responsible for stopping tcpdump and closing the file.

    Raises:
      OSError: If the login name cannot be determined, the error file cannot
          be opened, or tcpdump cannot be started. The error file is closed
          again if starting tcpdump fails.
    """
    cap = os.path.join(report_dir, 'testnetwork.pcap')
    tcpdump_args = ['tcpdump', '-IUnei', interface, '-w', cap]

    login = os.getlogin()
    if login != 'root':
        print('Please enter password for `sudo` if prompted.')
        subprocess.call(['sudo', '-v'])
        tcpdump_args = ['sudo'] + tcpdump_args + ['-Z', login]

    if filtered:
        filt = '(not subtype beacon and not subtype ack)'
        if bssid:
            filt += (' and (wlan addr1 {0} or wlan addr2 {0} or '
                     'wlan addr3 {0})'.format(bssid))
        else:
            logger.warning(
                'Requested filtered tcpdump, but network BSSID not known.')
        # The filter expression must come last on the command line.
        tcpdump_args.append(filt)

    err = open(os.path.join(report_dir, 'mcserr'), 'w+b')
    try:
        tcpdump_proc = subprocess.Popen(tcpdump_args, stderr=err)
    except Exception:
        # Don't leak the error file if tcpdump could not be launched.
        err.close()
        raise

    return tcpdump_proc, err
| |