# blob: a148f72ba9e62105ac39ac06e739d6f65923dce8 [file] [log] [blame]
"""ifstats: get information about a network interface."""
import os
import pipes
import re
class InterfaceStats(object):
  """Collects raw statistics about a network interface.

  The collection is geared towards wireless interfaces on OS X and Linux, but
  wired interfaces are handled as well, which helps when the router under test
  is reached over Ethernet.

  Every command is executed through the `runner` callable handed to the
  constructor; this class only decides which commands to issue and how to
  label their output.
  """

  def __init__(self, system=None, runner=None, interface=None):
    """Remember how and where to collect statistics.

    Args:
      system: platform name; `Gather` recognizes 'Darwin' and 'Linux'.
      runner: callable taking a command line (string) and returning its output.
      interface: optional wireless device name to query on Linux.  When it is
        None, `GatherLinux` looks for one in the output of `iw dev`.
    """
    self.system = system
    self.run = runner
    self.interface = interface

  def IpAddr(self):
    """Run `ip addr` in one-line mode (IPv4 only) and return its output."""
    return self.run('ip -o -f inet addr')

  def IwLink(self, devname):
    """Run `iw dev <devname> link` and return its output."""
    command = 'iw dev {} link'.format(pipes.quote(devname))
    return self.run(command)

  def IwScan(self, devname):
    """Return scan results for <devname>.

    `iw dev <devname> scan dump` is tried first; only when that yields nothing
    is `iw dev <devname> scan` run.
    """
    quoted = pipes.quote(devname)
    dumped = self.run('iw dev {} scan dump'.format(quoted))
    if dumped:
      return dumped
    return self.run('iw dev {} scan'.format(quoted))

  def Ifconfig(self):
    """Run `ifconfig` and return its description of the known interfaces."""
    return self.run('ifconfig')

  def AirportI(self):
    """Run `airport -I` and return its report on the current wireless network."""
    return self.run('airport -I')

  def AirportScan(self):
    """Run `airport -s` and return its list of other observable networks."""
    return self.run('airport -s')

  def SystemProfiler(self):
    """Run `system_profiler` to learn about wireless capabilities.

    On a Mac with 802.11ac wireless, this will also contain a wireless network
    scan result with more information than `airport -s` provides.

    Returns:
      A string containing the output of `system_profiler`, and attributes as
      described in the Fabric documentation.
    """
    return self.run('system_profiler -xml SPAirPortDataType')

  def GatherDarwin(self):
    """Collect wireless network information on Darwin (Mac OS X).

    Returns:
      A dict with the raw output of each command, keyed by 'ifconfig',
      'airport', 'airportscan' and 'systemprofiler'.
    """
    gathered = {}
    # The commands are issued in this fixed order.
    gathered['ifconfig'] = self.Ifconfig()
    gathered['airport'] = self.AirportI()
    gathered['airportscan'] = self.AirportScan()
    gathered['systemprofiler'] = self.SystemProfiler()
    return gathered

  def GatherLinux(self):
    """Collect wireless network information on Linux.

    Returns:
      A dict holding 'ipaddr' and, when a wireless interface could be queried,
      'iwlink' and 'iwscan'.
    """
    gathered = {'ipaddr': self.IpAddr()}

    if self.interface is not None:
      # An explicitly requested interface is queried without any fallback, so
      # errors raised by the runner propagate to the caller.
      link = self.IwLink(self.interface)
      scan = self.IwScan(self.interface)
      gathered['iwlink'] = link
      gathered['iwscan'] = scan
      return gathered

    # If no interface was supplied, try to use the first connected one (like
    # `airport` does for you automatically.)
    listing = self.run('iw dev')
    split_rows = (row.split() for row in listing.splitlines())
    candidates = [fields[1] for fields in split_rows
                  if len(fields) >= 2 and fields[0] == 'Interface']

    for devname in candidates:
      try:
        link = self.IwLink(devname)
        scan = self.IwScan(devname)
      except ValueError:
        # Querying this device failed; move on to the next one.
        continue
      gathered['iwlink'] = link
      gathered['iwscan'] = scan
      break
    return gathered

  def Gather(self):
    """Collect everything we know how to for the configured system.

    Returns:
      The dict produced by `GatherDarwin` or `GatherLinux`.

    Raises:
      OSError: if `self.system` is neither 'Darwin' nor 'Linux'.
    """
    if self.system == 'Darwin':
      return self.GatherDarwin()
    if self.system == 'Linux':
      return self.GatherLinux()
    message = 'System {} unsupported for InterfaceStats'.format(self.system)
    raise OSError(message)
def ParseIwLink(text):
  """Turn the output of `iw dev <devname> link` into a dict.

  Args:
    text: a string containing the output of `iw dev <devname> link`.

  Returns:
    A dict with a 'BSSID' entry taken from the first line, plus one entry for
    every following line of the form "key: value".  The dict is empty when the
    text is empty or its first line holds no BSSID.
  """
  lines = text.splitlines()

  # BSSID is in the first line, in an idiosyncratic format.
  # sample: Connected to d8:c7:c8:d7:72:30 (on wlan0)
  bssid = re.search(r'(\w{2}:){5}\w{2}', lines[0]) if lines else None
  if bssid is None:
    return {}

  parsed = {'BSSID': bssid.group(0)}
  for row in lines[1:]:
    name, colon, rest = row.partition(':')
    if colon:  # rows without a colon carry no key/value pair
      parsed[name.strip()] = rest.strip()
  return parsed
def ParseIpAddr(text):
  """Parse the text output by `ip -o -f inet addr` into a dict.

  Each usable line is split on whitespace; the second field is taken as the
  interface name and the fourth as its address, exactly as printed.

  Args:
    text: a string containing the output of `ip -o -f inet addr`.

  Returns:
    A dict mapping interface name to address.  If an interface appears on
    several lines, the last one wins.  Lines with fewer than five
    whitespace-separated fields (blank lines, for instance) are skipped.
  """
  result = {}
  for line in text.splitlines():
    try:
      _, interface, _, addr, _ = line.split(None, 4)
    except ValueError:
      # Not enough fields to unpack: skip the line rather than abort the
      # whole parse, like the other parsers in this module do.
      continue
    result[interface] = addr
  return result
def ParseAirportI(text):
  """Turn the output of `airport -I` into a dict.

  Args:
    text: a string containing the output of `airport -I`.

  Returns:
    A dict with one entry per "key: value" line.  Values are strings, except
    for 'agrCtlRSSI' and 'agrCtlNoise', which are converted to integers.
    Lines without a colon, and numeric fields that fail to convert, are left
    out.
  """
  numeric_keys = ('agrCtlRSSI', 'agrCtlNoise')
  parsed = {}
  for row in text.splitlines():
    name, colon, rest = row.partition(':')
    if not colon:
      continue
    name = name.strip()
    rest = rest.strip()
    if name not in numeric_keys:
      parsed[name] = rest
      continue
    try:
      parsed[name] = int(rest)
    except ValueError:
      # An unparsable number drops the field instead of failing the parse.
      continue
  return parsed
def ParseAirportScan(text):
  """Parse the text output by `airport -s` into a table.

  The table is similar to what would be returned by a csv.reader, and suitable
  for output by a csv.writer.

  Arguments:
    text: a string containing the text output of `airport -s`

  Returns:
    a list of lists, where the first list is a header row (all strings)
    and subsequent rows are mixed strings and integers (RSSI is the integer).
    Lines whose channel column does not start with a channel number, or whose
    RSSI column is not an integer, are skipped.
  """
  # This is a simple fixed-width format.
  header = ['SSID', 'BSSID', 'RSSI', 'CHANNEL', 'HT', 'CC',
            'SECURITY (auth/unicast/group)']
  result = []
  # Digits, optionally followed by ",+" or by "-<digits>"; only the start of
  # the channel column has to match.
  chre = re.compile(r'\d+(?:,\+|-\d+)?')
  for line in text.splitlines():
    ssid, bssid, rssi, channel, ht, cc, security = (
        [cell.strip() for cell in (line[:32], line[33:50], line[51:55],
                                   line[56:63], line[64:66], line[67:69],
                                   line[70:])])
    # the scan sometimes includes comment lines. assume that anything that has
    # a valid channel isn't a comment line.
    if not chre.match(channel):
      continue
    try:
      rssi = int(rssi)
    except ValueError:
      # A stray line can have a digit in the channel column without having a
      # numeric RSSI; treat it as a comment instead of crashing.
      continue
    result.append([ssid, bssid, rssi, channel, ht, cc, security])
  return [header] + result
def Parse(system, cache):
  """Parse the raw command output gathered on `system`.

  Args:
    system: 'Darwin' or 'Linux'.
    cache: a dict of raw command output keyed by name; missing entries are
      treated as empty output.

  Returns:
    For 'Darwin', a dict with 'link' (parsed 'airport' entry) and 'scan'
    (parsed 'airportscan' entry).  For 'Linux', a dict with 'link' (parsed
    'iwlink' entry) and 'addr' (parsed 'ipaddr' entry).

  Raises:
    OSError: if `system` is neither 'Darwin' nor 'Linux'.
  """
  if system == 'Darwin':
    link = ParseAirportI(cache.get('airport', ''))
    scan = ParseAirportScan(cache.get('airportscan', ''))
    return {'link': link, 'scan': scan}

  if system == 'Linux':
    link = ParseIwLink(cache.get('iwlink', ''))
    addr = ParseIpAddr(cache.get('ipaddr', ''))
    return {'link': link, 'addr': addr}

  raise OSError('Attempt to parse cache from '
                'unrecognized system {}'.format(system))
def Restore(report_dir):
  """Restore an InterfaceStats cache from data on the filesystem.

  Each supported system is probed in turn by trying to read the files named
  after its cache entries from `report_dir`.  The first system with at least
  one readable file wins.

  Args:
    report_dir: directory holding one file per cache entry.

  Returns:
    A (system, cache) tuple.  `system` is 'Darwin' or 'Linux'; `cache` maps
    every entry name of that system to the file's contents, or to '' when the
    file could not be read.

  Raises:
    IOError: if no file for any supported system could be read.
  """
  # A sequence rather than a dict, so the probing order is fixed.
  system_results = (
      ('Darwin', ('ifconfig', 'airport', 'airportscan', 'systemprofiler')),
      ('Linux', ('ipaddr', 'iwlink', 'iwscan')),
  )
  for system, names in system_results:
    # Start afresh for each system so the returned cache does not carry empty
    # placeholders for a system that was probed earlier and did not match.
    cache = {}
    found = False
    for name in names:
      try:
        with open(os.path.join(report_dir, name)) as infile:
          cache[name] = infile.read()
      except IOError:
        cache[name] = ''
      else:
        found = True
    if found:
      return system, cache
  raise IOError('Could not open report in {}'.format(report_dir))