# blob: 53a3c628748b2a7f3de2517f44f934a7bcc0f6ad [file] [log] [blame]
#!/usr/bin/env python
"""sample: measure wireless performance and write a report to the filesystem."""
import multiprocessing
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
import time
import options
import report
# Option specification handed to options.Options() in main().  The {0} and
# {1} placeholders are filled in via str.format() with the platform's default
# client and monitor interface names before the spec is parsed.
# NOTE(review): the '--' line is included on the assumption that the local
# `options` module is a bup-style parser, which uses '--' to separate the
# usage synopsis from the option list -- confirm against that module.
optspec = """
sample [options...]
--
d,destination= host to run tests against []
s,steps= number of steps test was run from [10]
j,journal= append to journal tracking a series of test runs
i,interface= wireless interface to use for outgoing connections [{0}]
m,monitor= wireless monitor interface to use [{1}]
"""
def Iperf(host, udp=False, bandwidth=20, bind=None):
    """Run the iperf client against `host` and capture what it prints.

    Args:
      host: value passed to iperf's -c option.
      udp: when true, add -u and a -b rate to the command line.
      bandwidth: multiplied by 1000000 to form the -b argument; only used
        when `udp` is true (presumably megabits per second -- verify).
      bind: when set, passed to iperf's -B option.

    Returns:
      The NamedTemporaryFile that received iperf's standard output.  Its name
      starts with 'iperfu' for UDP runs and 'iperf' otherwise.

    Raises:
      subprocess.CalledProcessError: if iperf exits with a non-zero status.
    """
    command = ['iperf', '-c', host]
    if udp:
        command.extend(['-u', '-b', str(bandwidth * 1000000)])
    if bind:
        command.extend(['-B', bind])
    result = tempfile.NamedTemporaryFile(prefix='iperfu' if udp else 'iperf')
    subprocess.check_call(command, stdout=result)
    return result
def MCSBackground(tcpdump, out):
    """Continually extract wireless MCS from a running `tcpdump` process.

    This function will not return as long as tcpdump is running. You probably
    want to run it in a background activity using multiprocessing.

    One token is written per line read from tcpdump: the MCS index when the
    line contains 'MCS <number>', or a ' . ' placeholder otherwise.  Tokens on
    the same output line are separated by a single space, and a newline is
    written after every 25th token.

    Args:
      tcpdump: a tcpdump process that's monitoring a wireless interface
        and writing textual output to its stdout stream.
      out: Python file-like object to write MCS information to.
    """
    mcs = re.compile(r'MCS (\d+)')
    # Explicit separator handling gives the same spacing the Python 2 print
    # statement with a trailing comma would: a space between items on a line
    # and none at the start of a line.
    separator = ''
    for count, row in enumerate(iter(tcpdump.stdout.readline, b''), 1):
        # search(), not match(): the marker is looked for anywhere in the row.
        match = mcs.search(row)
        token = ('%2d ' % int(match.group(1))) if match else ' . '
        out.write(separator + token)
        separator = ' '
        if count % 25 == 0:
            out.write('\n')
            separator = ''
    # Push buffered tokens to the underlying file before returning, so they are
    # not lost if the hosting process exits without closing `out`.
    out.flush()
def MCS(bssid, interface):
    """Runs tcpdump in the background to gather wireless MCS.

    Starts `sudo tcpdump` on `interface` with a filter that drops beacon and
    ack frames and keeps frames whose addr1, addr2 or addr3 equals `bssid`,
    then starts a background process running MCSBackground on its output.

    Args:
      bssid: address substituted into the tcpdump capture filter.
      interface: interface name passed to tcpdump's -i option.

    Returns:
      A (sudo_tcpdump, out, err) tuple: the tcpdump Popen object, the temp
      file MCSBackground writes to, and the temp file receiving tcpdump's
      stderr.
    """
    # Single-argument print() behaves the same as the print statement.
    print('Please enter password for `sudo` if prompted.')
    # Refresh sudo credentials up front so the tcpdump launched below does not
    # have to prompt.  The exit status is not checked here.
    # NOTE(review): confirm whether a failure should abort the run instead.
    subprocess.call(['sudo', '-v'])
    out = tempfile.NamedTemporaryFile(prefix='mcs')
    err = tempfile.NamedTemporaryFile(prefix='mcserr')
    filt = ('(not subtype beacon and not subtype ack) and '
            '(wlan addr1 {0} or wlan addr2 {0} or wlan addr3 {0})'.format(
                bssid))
    sudo_tcpdump = subprocess.Popen(['sudo', 'tcpdump', '-Z', os.getlogin(),
                                     '-Ilnei', interface, filt],
                                    stdout=subprocess.PIPE, stderr=err)
    proc = multiprocessing.Process(target=MCSBackground,
                                   args=(sudo_tcpdump, out))
    # Without start() the worker never runs and `out` stays empty.
    proc.start()
    return sudo_tcpdump, out, err
def IwLink(devname):
    """Capture the output of `iw dev <devname> link`.

    Args:
      devname: interface name placed on the iw command line.

    Returns:
      The NamedTemporaryFile (name prefix 'iwlink') holding iw's stdout.

    Raises:
      subprocess.CalledProcessError: if iw exits with a non-zero status.
    """
    command = ['iw', 'dev', devname, 'link']
    result = tempfile.NamedTemporaryFile(prefix='iwlink')
    subprocess.check_call(command, stdout=result)
    return result
def IwScan(devname):
    """Capture wireless scan results for `devname` using `iw`.

    First runs `iw dev <devname> scan dump`; if that wrote nothing to the
    output file, runs `iw dev <devname> scan` into the same file.

    Args:
      devname: interface name placed on the iw command line.

    Returns:
      The NamedTemporaryFile holding iw's stdout.

    Raises:
      subprocess.CalledProcessError: if an iw invocation exits non-zero.
    """
    # Use a prefix of its own rather than reusing IwLink's 'iwlink', matching
    # the one-prefix-per-helper convention of the other collectors, so the two
    # captures cannot be mistaken for each other by their file names.
    # NOTE(review): confirm nothing downstream looks for the scan under the
    # old 'iwlink' name.
    out = tempfile.NamedTemporaryFile(prefix='iwscan')
    subprocess.check_call(['iw', 'dev', devname, 'scan', 'dump'], stdout=out)
    # The child writes straight to the file descriptor, so the size on disk
    # tells us whether the dump produced any output.
    if not os.fstat(out.fileno()).st_size:
        subprocess.check_call(['iw', 'dev', devname, 'scan'], stdout=out)
    return out
def IpAddr():
    """Capture the output of `ip -o -f inet addr`.

    Returns:
      The NamedTemporaryFile (name prefix 'ipaddr') holding the command's
      stdout.

    Raises:
      subprocess.CalledProcessError: if ip exits with a non-zero status.
    """
    command = ['ip', '-o', '-f', 'inet', 'addr']
    result = tempfile.NamedTemporaryFile(prefix='ipaddr')
    subprocess.check_call(command, stdout=result)
    return result
def AirportI():
    """Collect `airport -I` output describing the current wireless network.

    Returns:
      The NamedTemporaryFile (name prefix 'airport') holding the command's
      stdout.

    Raises:
      subprocess.CalledProcessError: if airport exits with a non-zero status.
    """
    command = ['airport', '-I']
    result = tempfile.NamedTemporaryFile(prefix='airport')
    subprocess.check_call(command, stdout=result)
    return result
def AirportScan():
    """Collect `airport -s` output listing other observable networks.

    Returns:
      The NamedTemporaryFile (name prefix 'airportscan') holding the
      command's stdout.

    Raises:
      subprocess.CalledProcessError: if airport exits with a non-zero status.
    """
    command = ['airport', '-s']
    result = tempfile.NamedTemporaryFile(prefix='airportscan')
    subprocess.check_call(command, stdout=result)
    return result
def main():
# Entry point: picks per-platform interface defaults, parses command-line
# options, runs the platform's wireless diagnostics plus TCP and UDP iperf
# runs (optionally with a background tcpdump), and gathers the output files
# into a report directory whose name is printed at the end.
# NOTE(review): this function has lost its indentation and several
# expressions are visibly truncated; each spot is flagged below and must be
# restored from the original source before this can run.
system = platform.system()
# Maps platform.system() to [client interface, monitor interface]; the pair
# is substituted into optspec's {0} and {1} placeholders below.
defaults = {
# on a modern MacBook, en0 is the AirPort and can monitor and send at once
'Darwin': ['en0', 'en0'],
# on Linux, separate client and monitor interfaces are needed.
# these defaults are for a TV box, other platforms will be different.
'Linux': ['wcli0', ''],
# NOTE(review): the closing brace of `defaults` is not present here.
if not defaults.get(system):
raise OSError('Running on unsupported system {0}; '
'supported systems are {1}'.format(system,
' '.join(defaults.keys())))
o = options.Options(optspec.format(*defaults[system]))
(opt, _, extra) = o.parse(sys.argv[1:])
if extra:
o.fatal('did not understand supplied extra arguments.')
# we run diagnostics, write their output to files, and gather the files into
# a report that we present at the end of the run.
outputs = []
# Address handed to Iperf(bind=...); stays empty unless the Linux branch
# finds one for opt.interface.
addr = ''
if system == 'Darwin':
ai = AirportI()
# NOTE(review): the argument to report.ParseAirportI is missing (the call is
# truncated); presumably it is derived from `ai` -- confirm.
bssid = report.ParseAirportI(['BSSID']
outputs += [ai, AirportScan()]
elif system == 'Linux':
# It's really likely we're running on a device with more than one interface.
# Be sure we're using the one that we're trying to test.
ip = IpAddr()
# NOTE(review): the argument to report.ParseIpAddr is missing (truncated call).
addrmap = report.ParseIpAddr(
addr = addrmap.get(opt.interface)
if not addr:
# NOTE(review): the .format( call below is truncated; its argument and
# closing parentheses are missing.
raise ValueError('Interface {0} does not have an IPv4 address.'.format(
# because ip addr usually includes a subnet mask, which will prevent iperf
# from binding to the address
mask = addr.find('/')
if mask > -1:
addr = addr[:mask]
il = IwLink(opt.interface)
# NOTE(review): the argument to report.ParseIwLink is missing (truncated call).
bssid = report.ParseIwLink(['BSSID']
ic = IwScan(opt.interface)
outputs += [ip, il, ic]
# NOTE(review): no `else:` is visible before this raise; it presumably belongs
# to an else branch of the platform check above -- confirm.
raise OSError('This script requires Mac OS X or Linux.')
if opt.monitor:
sudo_tcpdump, mcs_out, mcs_err = MCS(bssid, opt.monitor)
# NOTE(review): the value printed after the trailing comma is missing.
print 'Gathering tcpdump in background as',
outputs += [mcs_out, mcs_err]
# TCP run first; its parsed bandwidth is used to size the UDP run below.
it = Iperf(opt.destination, bind=addr)
# Empirically about 1.25x more packets make it through in UDP than TCP.
# Try to saturate the channel by sending a bit more than that over UDP.
# NOTE(review): the argument to report.ParseIperfTCP is missing (truncated call).
it_iperf = report.ParseIperfTCP(
bandwidth = it_iperf.get('bandwidth', 0.01)
# NOTE(review): the Iperf(...) call and the list literal below are unterminated;
# the remaining arguments are missing.
outputs += [it, Iperf(opt.destination, udp=True, bandwidth=bandwidth * 1.5,
if opt.monitor:
# NOTE(review): the argument to str( is missing; presumably the pid of
# sudo_tcpdump -- confirm.
subprocess.check_call(['sudo', 'kill', str(])
report_name = 'wifi-{}-{:04}'.format(time.time(), opt.steps)
if opt.journal:
# Record this run's report name in the journal and place the report directory
# next to the journal file.
with open(opt.journal, 'a') as journal:
print >> journal, report_name
report_dir = os.path.join(os.path.dirname(opt.journal), report_name)
# NOTE(review): no `else:` is visible before the next assignment; as written it
# overwrites the journal-relative path computed above -- confirm.
report_dir = report_name
for page in outputs:
# NOTE(review): the statement below is truncated: the call that uses this
# destination path and the expression being sliced with [:-6] are missing, and
# no statement creating report_dir is visible.
os.path.join(report_dir, os.path.basename([:-6])))
print 'Report written to', report_dir
if __name__ == '__main__':