| """ifstats: get information about a network interface.""" |
| |
| import os |
| import pipes |
| import re |
| |
| |
class InterfaceStats(object):
    """Collect raw statistics about a network interface by running commands.

    It's specialized for wireless interfaces on OS X and Linux, but will handle
    wired interfaces too. This is useful when testing a router that you have
    connected to over Ethernet.

    Every command goes through the `runner` callable supplied at construction
    time; this class only builds the command strings and hands back whatever
    the runner returns.

    Attributes:
      system: platform name; `Gather` understands 'Darwin' and 'Linux'.
      run: callable taking a shell command string and returning its output.
      interface: wireless device name to inspect on Linux, or None to pick
        one from the output of `iw dev`.
    """

    def __init__(self, system=None, runner=None, interface=None):
        self.system = system
        self.run = runner
        self.interface = interface

    @staticmethod
    def _ShellQuote(arg):
        """Return `arg` escaped so the shell treats it as a single word.

        `pipes.quote` is an undocumented alias that disappeared together with
        the `pipes` module in Python 3.13, so prefer `shlex.quote` and only
        fall back to `pipes` on interpreters that predate it (Python 2).
        """
        try:
            from shlex import quote
        except ImportError:
            from pipes import quote
        return quote(arg)

    def IpAddr(self):
        """Invoke `ip addr` in one-line mode and return output as a string."""
        return self.run('ip -o -f inet addr')

    def IwLink(self, devname):
        """Invoke `iw dev <devname> link` and return its output as a string."""
        return self.run('iw dev {} link'.format(self._ShellQuote(devname)))

    def IwScan(self, devname):
        """Return scan results for `devname` as a string.

        `iw dev <devname> scan dump` is tried first; only when that produces
        no output is `iw dev <devname> scan` invoked.
        """
        devname_quoted = self._ShellQuote(devname)
        text = self.run('iw dev {} scan dump'.format(devname_quoted))
        if not text:
            text = self.run('iw dev {} scan'.format(devname_quoted))
        return text

    def Ifconfig(self):
        """Gather information about known network interfaces from `ifconfig`."""
        return self.run('ifconfig')

    def AirportI(self):
        """Gather information about the current wireless network (`airport -I`)."""
        return self.run('airport -I')

    def AirportScan(self):
        """Gather information about other observable networks (`airport -s`)."""
        return self.run('airport -s')

    def SystemProfiler(self):
        """Gather information about wireless capabilities from `system_profiler`.

        On a Mac with 802.11ac wireless, this will also contain a wireless
        network scan result with more information than `airport -s` provides.

        Returns:
          Whatever the runner returns for the command: a string containing the
          output of `system_profiler`. NOTE(review): the original author adds
          that it carries attributes "as described in the Fabric
          documentation", presumably when the runner is Fabric's -- confirm.
        """
        return self.run('system_profiler -xml SPAirPortDataType')

    def GatherDarwin(self):
        """Gather wireless network information on Darwin (Mac OS X).

        Returns:
          a dict with keys 'ifconfig', 'airport', 'airportscan' and
          'systemprofiler', each holding the matching command's output.
        """
        return {
            'ifconfig': self.Ifconfig(),
            'airport': self.AirportI(),
            'airportscan': self.AirportScan(),
            'systemprofiler': self.SystemProfiler(),
        }

    def GatherLinux(self):
        """Gather wireless network information on Linux.

        Returns:
          a dict that always has 'ipaddr', plus 'iwlink' and 'iwscan' when an
          interface was given or one could be picked from `iw dev`.
        """
        outputs = {'ipaddr': self.IpAddr()}

        if self.interface is not None:
            outputs.update({
                'iwlink': self.IwLink(self.interface),
                'iwscan': self.IwScan(self.interface),
            })
            return outputs

        # No interface was supplied: use the first one listed by `iw dev` that
        # can be queried (`airport` picks one for you automatically).
        for line in self.run('iw dev').splitlines():
            tokens = line.split()
            if len(tokens) >= 2 and tokens[0] == 'Interface':
                interface = tokens[1]
                try:
                    outputs.update({
                        'iwlink': self.IwLink(interface),
                        'iwscan': self.IwScan(interface),
                    })
                    break
                except ValueError:
                    # NOTE(review): presumably raised by the runner when the
                    # command fails for this device -- confirm. Either way,
                    # move on to the next listed interface.
                    continue

        return outputs

    def Gather(self):
        """Gather all the wireless network information we know how to.

        Returns:
          the dict produced by `GatherDarwin` or `GatherLinux`.
        Raises:
          OSError: if `self.system` is neither 'Darwin' nor 'Linux'.
        """
        if self.system == 'Darwin':
            return self.GatherDarwin()
        if self.system == 'Linux':
            return self.GatherLinux()
        raise OSError('System {} unsupported for InterfaceStats'.format(
            self.system))
| |
| |
def ParseIwLink(text):
    """Turn the output of `iw dev <devname> link` into a dict of strings.

    Arguments:
      text: a string containing the command's output.
    Returns:
      a dict with a 'BSSID' entry taken from the first line plus one entry per
      following `key: value` line; an empty dict when `text` is empty or its
      first line holds no MAC-style address.
    """
    lines = text.splitlines()
    if not lines:
        return {}

    # The first line carries the BSSID in an idiosyncratic format, e.g.
    #   Connected to d8:c7:c8:d7:72:30 (on wlan0)
    bssid_match = re.search(r'(\w{2}:){5}\w{2}', lines[0])
    if not bssid_match:
        return {}

    parsed = {'BSSID': bssid_match.group(0)}
    for entry in lines[1:]:
        name, colon, value = entry.partition(':')
        if not colon:
            # Not a `key: value` line; nothing to record.
            continue
        parsed[name.strip()] = value.strip()

    return parsed
| |
| |
def ParseIpAddr(text):
    """Parse one-line-per-address `ip addr` output into a dict.

    Each record is split on whitespace; the second field is used as the
    interface name and the fourth as its address, e.g.
      2: eth0    inet 192.168.1.2/24 brd 192.168.1.255 scope global eth0

    Arguments:
      text: a string containing the output of `ip -o -f inet addr`.
    Returns:
      a dict mapping interface name to address string. If an interface shows
      up on several lines the last one wins. Lines with fewer than four
      fields (blank or truncated) are skipped rather than raising.
    """
    result = {}
    for line in text.splitlines():
        # Split at most four times: everything after the address is unused.
        fields = line.split(None, 4)
        if len(fields) < 4:
            continue
        result[fields[1]] = fields[3]

    return result
| |
| |
def ParseAirportI(text):
    """Turn the `key: value` lines printed by `airport -I` into a dict.

    Arguments:
      text: a string containing the command's output.
    Returns:
      a dict of stripped keys to stripped string values, except that
      'agrCtlRSSI' and 'agrCtlNoise' are converted to int. Lines without a
      colon, or whose RSSI/noise value is not an integer, are ignored.
    """
    parsed = {}
    for row in text.splitlines():
        name, colon, value = row.partition(':')
        if not colon:
            continue
        name = name.strip()
        value = value.strip()

        if name in ['agrCtlRSSI', 'agrCtlNoise']:
            try:
                parsed[name] = int(value)
            except ValueError:
                # Leave any previously parsed value in place.
                continue
        else:
            parsed[name] = value

    return parsed
| |
| |
def ParseAirportScan(text):
    """Parse the text output by `airport -s` into a list of dicts.

    The output is a simple fixed-width table, so each line is cut at fixed
    column offsets rather than split on whitespace.

    Arguments:
      text: a string containing the text output of `airport -s`
    Returns:
      a list with one dict per scanned network, keyed by 'SSID', 'BSSID',
      'RSSI', 'CHANNEL', 'HT', 'CC' and 'SECURITY (auth/unicast/group)'.
      'RSSI' is an int; every other value is a stripped string. Lines whose
      channel column does not start with a channel number (the header row,
      comment lines) or whose RSSI column is not an integer are skipped.
    """
    header = ['SSID', 'BSSID', 'RSSI', 'CHANNEL', 'HT', 'CC',
              'SECURITY (auth/unicast/group)']
    channel_re = re.compile(r'\d+(?:,\+|-\d+)?')
    result = []

    for line in text.splitlines():
        ssid, bssid, rssi, channel, ht, cc, security = (
            [cell.strip() for cell in (line[:32], line[33:50], line[51:55],
                                       line[56:63], line[64:66], line[67:69],
                                       line[70:])])

        # The scan sometimes includes comment lines. Assume that anything
        # that has a valid channel isn't a comment line.
        if not channel_re.match(channel):
            continue

        # A stray line can still look like it has a channel; don't let a
        # non-numeric RSSI column abort the whole parse.
        try:
            rssi = int(rssi)
        except ValueError:
            continue

        result.append(dict(zip(
            header, [ssid, bssid, rssi, channel, ht, cc, security])))

    return result
| |
| |
def Parse(system, cache):
    """Parse the raw command outputs in `cache` for the given platform.

    Arguments:
      system: 'Darwin' or 'Linux'.
      cache: a dict of raw command outputs keyed by name; missing entries are
        treated as empty strings.
    Returns:
      for 'Darwin', a dict with 'link' (from the 'airport' entry) and 'scan'
      (from 'airportscan'); for 'Linux', a dict with 'link' (from 'iwlink')
      and 'addr' (from 'ipaddr').
    Raises:
      OSError: if `system` is neither 'Darwin' nor 'Linux'.
    """
    if system == 'Darwin':
        link = ParseAirportI(cache.get('airport', ''))
        scan = ParseAirportScan(cache.get('airportscan', ''))
        return {'link': link, 'scan': scan}

    if system == 'Linux':
        link = ParseIwLink(cache.get('iwlink', ''))
        addr = ParseIpAddr(cache.get('ipaddr', ''))
        return {'link': link, 'addr': addr}

    message = ('Attempt to parse cache from '
               'unrecognized system {}').format(system)
    raise OSError(message)
| |
| |
def Restore(report_dir):
    """Rebuild an InterfaceStats cache from files saved under `report_dir`.

    Each known command output is looked up as a file of the same name directly
    inside `report_dir`.

    Arguments:
      report_dir: path of the directory holding the saved outputs.
    Returns:
      a tuple `(systems, cache)`: `systems` is the set of platform names
      ('Darwin', 'Linux') for which at least one file was read, and `cache`
      maps every known output name to that file's contents, or to '' when
      the file could not be read.
    Raises:
      IOError: if none of the expected files could be read.
    """
    expected_files = {
        'Darwin': ['ifconfig', 'airport', 'airportscan', 'systemprofiler'],
        'Linux': ['ipaddr', 'iwlink', 'iwscan'],
    }

    cache = {}
    systems_found = set()
    for system in expected_files:
        for name in expected_files[system]:
            try:
                with open(os.path.join(report_dir, name)) as infile:
                    cache[name] = infile.read()
                    systems_found.add(system)
            except IOError:
                # A missing or unreadable file still gets a placeholder entry.
                cache[name] = ''

    if not systems_found:
        raise IOError('Could not open report in {}'.format(report_dir))

    return systems_found, cache
| |