| """ifstats: get information about a wireless interface.""" |
| |
| import os |
| import pipes |
| import re |
| |
| |
class InterfaceStats(object):
    """InterfaceStats collects statistics from a wireless interface.

    All data is obtained by handing shell command strings to the injected
    runner and keeping the text it returns; nothing is parsed here.
    """

    def __init__(self, system=None, runner=None, interface=None):
        """Remember how and where to gather statistics.

        Args:
          system: platform name; Gather() recognizes 'Darwin' and 'Linux'.
          runner: callable that takes a shell command string and returns the
            command's output.
          interface: optional wireless device name used on Linux.  When None,
            GatherLinux() picks a device from the output of `iw dev`.
        """
        self.system = system
        self.run = runner
        self.interface = interface

    @staticmethod
    def _Quote(devname):
        """Return devname quoted for safe inclusion in a shell command."""
        # The `pipes` module was removed in Python 3.13.  On Python 3,
        # `pipes.quote` was just `shlex.quote`, so prefer that and only fall
        # back to `pipes` on interpreters that predate `shlex.quote`.
        try:
            from shlex import quote
        except ImportError:
            from pipes import quote
        return quote(devname)

    def IwLink(self, devname):
        """Invoke `iw dev <devname> link` and return its output as a string."""
        return self.run('iw dev {} link'.format(self._Quote(devname)))

    def IwScan(self, devname):
        """Return scan results for <devname> from `iw` as a string.

        `iw dev <devname> scan dump` is tried first; `iw dev <devname> scan`
        is only invoked when the dump produced no output.
        """
        devname_quoted = self._Quote(devname)
        text = self.run('iw dev {} scan dump'.format(devname_quoted))
        if not text:
            text = self.run('iw dev {} scan'.format(devname_quoted))
        return text

    def IpAddr(self):
        """Invoke `ip addr` in one-line mode and return output as a string."""
        return self.run('ip -o -f inet addr')

    def AirportI(self):
        """Gather information about the current wireless network from `airport`."""
        return self.run('airport -I')

    def AirportScan(self):
        """Gather information about other observable networks from `airport`."""
        return self.run('airport -s')

    def SystemProfiler(self):
        """Gather information about wireless capabilities from `system_profiler`.

        On a Mac with 802.11ac wireless, this will also contain a wireless
        network scan result with more information than `airport -s` provides.
        """
        return self.run('system_profiler -xml SPAirPortDataType')

    def GatherDarwin(self):
        """Gather wireless network information on Darwin (Mac OS X).

        Returns:
          A dict with the raw output of each command under the keys 'airport',
          'airportscan' and 'systemprofiler'.
        """
        return {
            'airport': self.AirportI(),
            'airportscan': self.AirportScan(),
            'systemprofiler': self.SystemProfiler(),
        }

    def _GatherInterface(self, interface):
        """Return the raw `iw` link and scan output for one interface."""
        return {
            'iwlink': self.IwLink(interface),
            'iwscan': self.IwScan(interface),
        }

    def GatherLinux(self):
        """Gather wireless network information on Linux.

        Returns:
          A dict with the raw command output under the keys 'ipaddr', 'iwlink'
          and 'iwscan'.

        Raises:
          ValueError: if no interface was supplied and none of the interfaces
            listed by `iw dev` could be queried.
        """
        outputs = {'ipaddr': self.IpAddr()}
        if self.interface is not None:
            outputs.update(self._GatherInterface(self.interface))
            return outputs

        # If no interface was supplied, use the first one listed by `iw dev`
        # that can be queried (like `airport` does for you automatically).
        # A runner that returns nothing is treated as "no interfaces" rather
        # than crashing on None.
        iw_dev_raw = self.run('iw dev') or ''
        for line in iw_dev_raw.splitlines():
            tokens = line.split()
            if len(tokens) < 2 or tokens[0] != 'Interface':
                continue
            try:
                found = self._GatherInterface(tokens[1])
            except ValueError:
                # A ValueError while querying this candidate means "try the
                # next interface", not "give up".
                continue
            outputs.update(found)
            return outputs

        raise ValueError('No connected interfaces found.')

    def Gather(self):
        """Gather all the wireless network information we know how to.

        Raises:
          OSError: if self.system is neither 'Darwin' nor 'Linux'.
        """
        if self.system == 'Darwin':
            return self.GatherDarwin()
        elif self.system == 'Linux':
            return self.GatherLinux()
        else:
            raise OSError('System {} unsupported for InterfaceStats'.format(
                self.system))
| |
| |
def ParseIwLink(text):
    """Parse the output of `iw dev <devname> link` into a dict.

    Args:
      text: raw command output as a string.

    Returns:
      A dict holding the BSSID found on the first line under 'BSSID', plus one
      entry per following 'key: value' line (both sides stripped).  Lines
      without a colon are ignored.

    Raises:
      ValueError: if the text is empty or its first line contains no BSSID,
        which is how an unconnected device shows up.
    """
    lines = text.splitlines() if text else []
    if not lines:
        # An empty capture has no first line to inspect; report it the same
        # way as any other output that lacks a BSSID.
        raise ValueError('dev was not connected.')

    # BSSID is in the first line, in an idiosyncratic format.
    # sample: Connected to d8:c7:c8:d7:72:30 (on wlan0)
    m = re.search(r'(\w{2}:){5}\w{2}', lines[0])
    if not m:
        raise ValueError('dev was not connected.')
    result = {'BSSID': m.group(0)}

    for line in lines[1:]:
        # Split on the first colon only; values may contain colons themselves.
        key, sep, value = line.partition(':')
        if not sep:
            continue
        result[key.strip()] = value.strip()

    return result
| |
def ParseIpAddr(text):
    """Parse the output of `ip -o -f inet addr` into a dict.

    Each line is split on whitespace; the second field is taken as the
    interface name and the fourth as its address.

    Args:
      text: raw command output as a string.

    Returns:
      A dict mapping interface name to address.  If an interface appears on
      several lines, the last one wins.  Lines with fewer than four fields
      (for example blank lines) are skipped.
    """
    result = {}
    for line in (text or '').splitlines():
        fields = line.split(None, 4)
        if len(fields) < 4:
            # Not an address record; skip it instead of aborting the parse.
            continue
        result[fields[1]] = fields[3]

    return result
| |
| |
def ParseAirportI(text):
    """Parse the output of `airport -I` into a dict.

    Every line is split at its first colon into a stripped key and value.
    The values of 'agrCtlRSSI' and 'agrCtlNoise' are converted to int; all
    other values stay strings.  Lines without a colon, and numeric fields
    whose value is not an integer, are ignored.
    """
    numeric_keys = ('agrCtlRSSI', 'agrCtlNoise')
    parsed = {}

    for raw_line in text.splitlines():
        name, colon, remainder = raw_line.partition(':')
        if not colon:
            continue
        name = name.strip()
        remainder = remainder.strip()

        if name not in numeric_keys:
            parsed[name] = remainder
            continue

        try:
            parsed[name] = int(remainder)
        except ValueError:
            # Leave the key untouched when the number cannot be parsed.
            pass

    return parsed
| |
def ParseAirportScan(text):
    """Parse the output of `airport -s` into a table.

    Args:
      text: raw command output as a string.

    Returns:
      A list of rows.  The first row is always the fixed column header; each
      following row is [ssid, bssid, rssi, channel, ht, cc, security] with
      rssi converted to int and every other cell a stripped string.  Lines
      that do not look like a network entry are left out.
    """
    # This is a simple fixed-width format.
    header = ['SSID', 'BSSID', 'RSSI', 'CHANNEL', 'HT', 'CC',
              'SECURITY (auth/unicast/group)']
    rows = []

    # Used with match(), so a cell only needs to start with digits.
    chre = re.compile(r'\d+(?:,\+|-\d+)?')
    for line in (text or '').splitlines():
        ssid, bssid, rssi, channel, ht, cc, security = [
            cell.strip()
            for cell in (line[:32], line[33:50], line[51:55], line[56:63],
                         line[64:66], line[67:69], line[70:])
        ]

        # the scan sometimes includes comment lines. assume that anything that
        # has a valid channel isn't a comment line.
        if not chre.match(channel):
            continue

        # A comment line can still have a digit in the channel column; if its
        # RSSI column is not a number, skip the line instead of aborting.
        try:
            rssi_value = int(rssi)
        except ValueError:
            continue

        rows.append([ssid, bssid, rssi_value, channel, ht, cc, security])

    return [header] + rows
| |
def Parse(system, cache):
    """Parse a cache of raw command output for the given system.

    Args:
      system: 'Darwin' or 'Linux'.
      cache: dict of raw outputs; 'airport' and 'airportscan' are read for
        Darwin, 'iwlink' and 'ipaddr' for Linux.

    Returns:
      For Darwin a dict with 'link' and 'scan'; for Linux a dict with 'link'
      and 'addr'.

    Raises:
      OSError: if system is neither 'Darwin' nor 'Linux'.
    """
    if system == 'Darwin':
        link_info = ParseAirportI(cache.get('airport'))
        scan_table = ParseAirportScan(cache.get('airportscan'))
        return {'link': link_info, 'scan': scan_table}

    if system == 'Linux':
        link_info = ParseIwLink(cache.get('iwlink'))
        addresses = ParseIpAddr(cache.get('ipaddr'))
        return {'link': link_info, 'addr': addresses}

    message = ('Attempt to parse cache from '
               'unrecognized system {}'.format(system))
    raise OSError(message)
| |
def Restore(report_dir):
    """Restore an InterfaceStats cache from data on the filesystem.

    The system is inferred from which marker file exists in report_dir:
    'airport' means Darwin, otherwise 'iwlink' means Linux.

    Args:
      report_dir: directory holding one file per cache entry, named after the
        cache key.

    Returns:
      A (system, cache) tuple, where cache maps each expected name to the
      contents of its file, or to '' when that file cannot be read.

    Raises:
      IOError: if report_dir contains neither marker file.
    """
    if os.path.exists(os.path.join(report_dir, 'airport')):
        system = 'Darwin'
        # Restore every output GatherDarwin() collects, including the
        # system_profiler dump, just as the Linux branch restores all of its
        # gathered outputs.
        names = ['airport', 'airportscan', 'systemprofiler']
    elif os.path.exists(os.path.join(report_dir, 'iwlink')):
        system = 'Linux'
        names = ['ipaddr', 'iwlink', 'iwscan']
    else:
        raise IOError('Could not open report in {}'.format(report_dir))

    cache = {}
    for name in names:
        try:
            with open(os.path.join(report_dir, name)) as infile:
                cache[name] = infile.read()
        except IOError:
            # A missing or unreadable file is restored as empty output.
            cache[name] = ''

    return system, cache
| |