blob: ae277a921a1157e73e3b06dc500abe9f1d4974b9 [file] [log] [blame]
"""ifstats: get information about a wireless interface."""
import os
import pipes
import re
class InterfaceStats(object):
"""InterfaceStats collects statistics from a wireless interface."""
def __init__(self, system=None, runner=None, interface=None):
self.system = system
self.run = runner
self.interface = interface
def IwLink(self, devname):
"""Invoke `iw dev <devname> link` and return its output as a string."""
return self.run('iw dev {} link'.format(pipes.quote(devname)))
def IwScan(self, devname):
"""Invoke `iw dev <devname> scan` and return its output as a string."""
devname_quoted = pipes.quote(devname)
text = self.run('iw dev {} scan dump'.format(devname_quoted))
if not text:
text = self.run('iw dev {} scan'.format(devname_quoted))
return text
def IpAddr(self):
"""Invoke `ip addr` in one-line mode and return output as a string."""
return self.run('ip -o -f inet addr')
def AirportI(self):
"""Gather information about the current wireless network from `airport`."""
return self.run('airport -I')
def AirportScan(self):
"""Gather information about other observable networks from `airport`."""
return self.run('airport -s')
def GatherDarwin(self):
"""Gather wireless network information on Darwin (Mac OS X)."""
return {
'airport': self.AirportI(),
'airportscan': self.AirportScan(),
}
def GatherLinux(self):
"""Gather wireless network information on Linux."""
outputs = {'ipaddr': self.IpAddr()}
if self.interface is not None:
outputs.update({
'iwlink': self.IwLink(self.interface),
'iwscan': self.IwScan(self.interface)
})
return outputs
# If no interface was supplied, use the first connected one (like `airport`
# does for you automatically.)
iw_dev_raw = self.run('iw dev')
for line in iw_dev_raw.splitlines():
tokens = line.split()
if len(tokens) >= 2 and tokens[0] == 'Interface':
interface = tokens[1]
try:
outputs.update({
'iwlink': self.IwLink(interface),
'iwscan': self.IwScan(interface)
})
return outputs
except ValueError:
pass
raise ValueError('No connected interfaces found.')
def Gather(self):
"""Gather all the wireless network information we know how to."""
if self.system == 'Darwin':
return self.GatherDarwin()
elif self.system == 'Linux':
return self.GatherLinux()
else:
raise OSError('System {} unsupported for InterfaceStats'.format(
self.system))
def ParseIwLink(text):
ol = text.splitlines()
# BSSID is in the first line, in an idiosyncratic format.
# sample: Connected to d8:c7:c8:d7:72:30 (on wlan0)
m = re.search(r'(\w{2}:){5}\w{2}', ol[0])
if m:
result = {'BSSID': m.group(0)}
else:
raise ValueError('dev was not connected.')
for line in ol[1:]:
try:
key, value = line.split(':', 1)
result[key.strip()] = value.strip()
except ValueError:
continue
return result
def ParseIpAddr(text):
ol = text.splitlines()
result = {}
for line in ol:
_, interface, _, addr, _ = line.split(None, 4)
result[interface] = addr
return result
def ParseAirportI(text):
result = {}
for line in text.splitlines():
try:
key, value = [cell.strip() for cell in line.split(':', 1)]
if key in ['agrCtlRSSI', 'agrCtlNoise']:
result[key] = int(value)
else:
result[key] = value
except ValueError:
continue
return result
def ParseAirportScan(text):
# This is a simple fixed-width format.
header = ['SSID', 'BSSID', 'RSSI', 'CHANNEL', 'HT', 'CC',
'SECURITY (auth/unicast/group)']
result = []
chre = re.compile(r'\d+(?:,\+|-\d+)?')
for line in text.splitlines():
ssid, bssid, rssi, channel, ht, cc, security = (
[cell.strip() for cell in (line[:32], line[33:50], line[51:55],
line[56:63], line[64:66], line[67:69],
line[70:])])
# the scan sometimes includes comment lines. assume that anything that has
# a valid channel isn't a comment line.
if chre.match(channel):
result += [[ssid, bssid, int(rssi), channel, ht, cc, security]]
return [header] + result
def Parse(system, cache):
if system == 'Darwin':
return {
'link': ParseAirportI(cache.get('airport')),
'scan': ParseAirportScan(cache.get('airportscan'))
}
elif system == 'Linux':
return {
'link': ParseIwLink(cache.get('iwlink')),
'addr': ParseIpAddr(cache.get('ipaddr'))
}
else:
raise OSError('Attempt to parse cache from '
'unrecognized system {}'.format(system))
def Restore(report_dir):
"""Restore an InterfaceStats cache from data on the filesystem."""
cache = {}
apath = os.path.join(report_dir, 'airport')
if os.path.exists(apath):
system = 'Darwin'
names = ['airport', 'airportscan']
else:
iwpath = os.path.join(report_dir, 'iwlink')
if os.path.exists(iwpath):
system = 'Linux'
names = ['ipaddr', 'iwlink', 'iwscan']
else:
raise IOError('Could not open report in {}'.format(report_dir))
for name in names:
try:
with open(os.path.join(report_dir, name)) as infile:
cache[name] = infile.read()
except IOError:
cache[name] = ''
return system, cache