| #!/usr/bin/python |
| |
| """Tests for connection_manager.py.""" |
| |
| import logging |
| import os |
| import shutil |
| import socket |
| import struct |
| import subprocess |
| import tempfile |
| import time |
| |
| # This has to be called before another module calls it with a higher log level. |
| # pylint: disable=g-import-not-at-top |
| logging.basicConfig(level=logging.DEBUG) |
| |
| # This is in site-packages on the device, but not when running tests, and so |
| # raises lint errors. |
| # pylint: disable=g-bad-import-order |
| import wpactrl |
| |
| import experiment_testutils |
| import interface |
| from wvtest import wvtest |
| |
| |
| # pylint: disable=line-too-long |
| _IP_ADDR_SHOW_TPL = """4: {name}: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000 |
| inet {ip}/21 brd 100.100.55.255 scope global {name} |
| valid_lft forever preferred_lft forever |
| """ |
| |
| |
| class FakeInterfaceMixin(object): |
| """Replace Interface methods which interact with the system.""" |
| |
| def __init__(self, *args, **kwargs): |
| super(FakeInterfaceMixin, self).__init__(*args, **kwargs) |
| self.set_connection_check_result('succeed') |
| self.routing_table = {} |
| self.ip_testonly = None |
| |
| def _connection_check(self, *args, **kwargs): |
| result = super(FakeInterfaceMixin, self)._connection_check(*args, **kwargs) |
| if (self.current_routes().get('default', {}).get('via', None) != |
| self._gateway_ip): |
| return False |
| return result |
| |
| def set_connection_check_result(self, result): |
| if result in ['succeed', 'fail', 'restricted']: |
| # pylint: disable=invalid-name |
| self.CONNECTION_CHECK = './test/' + result |
| else: |
| raise ValueError('Invalid fake connection_check script.') |
| |
| def _really_ip_route(self, *args): |
| def can_add_route(): |
| def ip_to_int(ip): |
| return struct.unpack('!I', socket.inet_pton(socket.AF_INET, ip))[0] |
| |
| if args[1] != 'default': |
| return True |
| |
| via = ip_to_int(args[args.index('via') + 1]) |
| for (ifc, route, _), _ in self.routing_table.iteritems(): |
| if ifc != self.name: |
| continue |
| |
| netmask = 0 |
| if '/' in route: |
| route, netmask = route.split('/') |
| netmask = 32 - int(netmask) |
| route = ip_to_int(route) |
| |
| if (route >> netmask) == (via >> netmask): |
| return True |
| |
| return False |
| |
| if not args: |
| return '\n'.join(self.routing_table.values() + |
| ['1.2.3.4/24 dev fake0 proto kernel scope link', |
| # Non-subnet route, e.g. to NFS host. |
| '1.2.3.1 dev %s proto kernel scope link' % self.name, |
| 'default via 1.2.3.4 dev fake0', |
| 'random junk']) |
| |
| metric = None |
| if 'metric' in args: |
| metric = args[args.index('metric') + 1] |
| if args[0] in ('add', 'del'): |
| route = args[1] |
| key = (self.name, route, metric) |
| if args[0] == 'add' and key not in self.routing_table: |
| if not can_add_route(): |
| raise subprocess.CalledProcessError( |
| 'Tried to add default route without subnet route: %r', |
| self.routing_table) |
| logging.debug('Adding route for %r', key) |
| self.routing_table[key] = ' '.join(args[1:]) |
| elif args[0] == 'del': |
| if key in self.routing_table: |
| logging.debug('Deleting route for %r', key) |
| del self.routing_table[key] |
| elif key[2] is None: |
| # pylint: disable=g-builtin-op |
| for k in self.routing_table.keys(): |
| if k[:-1] == key[:-1]: |
| logging.debug('Deleting route for %r (generalized from %s)', k, key) |
| del self.routing_table[k] |
| break |
| |
| def _ip_addr_show(self): |
| if self.ip_testonly: |
| return _IP_ADDR_SHOW_TPL.format(name=self.name, ip=self.ip_testonly) |
| |
| return '' |
| |
| def current_routes_normal_testonly(self): |
| result = self.current_routes() |
| return {k: v for k, v in result.iteritems() if int(v.get('metric', 0)) < 50} |
| |
| |
| class Bridge(FakeInterfaceMixin, interface.Bridge): |
| pass |
| |
| |
| class FakeWPACtrl(object): |
| """Fake wpactrl.WPACtrl.""" |
| |
| # pylint: disable=unused-argument |
| def __init__(self, wpa_socket): |
| self._socket = wpa_socket |
| self.events = [] |
| self.attached = False |
| self.connected = False |
| self.ssid_testonly = None |
| self.secure_testonly = False |
| self.request_status_fails = False |
| |
| def pending(self): |
| self.check_socket_exists('pending: socket does not exist') |
| return bool(self.events) |
| |
| def recv(self): |
| self.check_socket_exists('recv: socket does not exist') |
| return self.events.pop(0) |
| |
| def attach(self): |
| if not os.path.exists(self._socket): |
| raise wpactrl.error('wpactrl_attach failed') |
| self.attached = True |
| |
| def detach(self): |
| self.attached = False |
| self.ssid_testonly = None |
| self.secure_testonly = False |
| self.connected = False |
| self.check_socket_exists('wpactrl_detach failed') |
| |
| def request(self, request_type): |
| if request_type == 'STATUS': |
| if self.request_status_fails: |
| raise wpactrl.error('test error') |
| if self.connected: |
| return ('foo\nwpa_state=COMPLETED\nssid=%s\nkey_mgmt=%s\nbar' % |
| (self.ssid_testonly, |
| 'WPA2-PSK' if self.secure_testonly else 'NONE')) |
| else: |
| return 'wpa_state=SCANNING\naddress=12:34:56:78:90:ab' |
| else: |
| raise ValueError('Invalid request_type %s' % request_type) |
| |
| @property |
| def ctrl_iface_path(self): |
| return os.path.split(self._socket)[0] |
| |
| # Below methods are not part of WPACtrl. |
| |
| def add_event(self, event): |
| self.events.append(event) |
| |
| def add_connected_event(self): |
| self.connected = True |
| self.add_event(Wifi.CONNECTED_EVENT) |
| |
| def add_disconnected_event(self): |
| self.connected = False |
| self.add_event(Wifi.DISCONNECTED_EVENT) |
| |
| def add_terminating_event(self): |
| self.connected = False |
| self.add_event(Wifi.TERMINATING_EVENT) |
| |
| def check_socket_exists(self, msg='Fake socket does not exist'): |
| if not os.path.exists(self._socket): |
| raise wpactrl.error(msg) |
| |
| |
| class Wifi(FakeInterfaceMixin, interface.Wifi): |
| """Fake Wifi for testing.""" |
| |
| CONNECTED_EVENT = '<2>CTRL-EVENT-CONNECTED' |
| DISCONNECTED_EVENT = '<2>CTRL-EVENT-DISCONNECTED' |
| TERMINATING_EVENT = '<2>CTRL-EVENT-TERMINATING' |
| |
| WPACtrl = FakeWPACtrl |
| |
| def __init__(self, *args, **kwargs): |
| super(Wifi, self).__init__(*args, **kwargs) |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| |
| def attach_wpa_control(self, path): |
| if self._initial_ssid_testonly and self._wpa_control: |
| self._wpa_control.connected = True |
| super(Wifi, self).attach_wpa_control(path) |
| |
| def get_wpa_control(self, *args, **kwargs): |
| result = super(Wifi, self).get_wpa_control(*args, **kwargs) |
| if self._initial_ssid_testonly: |
| result.connected = True |
| result.ssid_testonly = self._initial_ssid_testonly |
| result.secure_testonly = self._secure_testonly |
| return result |
| |
| def add_connected_event(self): |
| if self.attached(): |
| self._wpa_control.add_connected_event() |
| |
| def add_disconnected_event(self): |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| if self.attached(): |
| self._wpa_control.add_disconnected_event() |
| |
| def add_terminating_event(self): |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| if self.attached(): |
| self._wpa_control.add_terminating_event() |
| |
| def detach_wpa_control(self): |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| super(Wifi, self).detach_wpa_control() |
| |
| def start_wpa_supplicant_testonly(self, path): |
| wpa_socket = os.path.join(path, self.name) |
| logging.debug('Starting fake wpa_supplicant for %s: %s', |
| self.name, wpa_socket) |
| open(wpa_socket, 'w') |
| |
| def kill_wpa_supplicant_testonly(self, path): |
| logging.debug('Killing fake wpa_supplicant for %s', self.name) |
| if self.attached(): |
| self.detach_wpa_control() |
| os.unlink(os.path.join(path, self.name)) |
| else: |
| raise RuntimeError('Trying to kill wpa_supplicant while not attached') |
| |
| |
| class FrenzyWPACtrl(interface.FrenzyWPACtrl): |
| |
| def __init__(self, *args, **kwargs): |
| super(FrenzyWPACtrl, self).__init__(*args, **kwargs) |
| self.ssid_testonly = None |
| self.secure_testonly = False |
| self.request_status_fails = False |
| |
| def _qcsapi(self, *command): |
| return self.fake_qcsapi.get(command[0], None) |
| |
| def add_connected_event(self): |
| self.fake_qcsapi['get_mode'] = 'Station' |
| self.fake_qcsapi['get_ssid'] = self.ssid_testonly |
| security = 'PSKAuthentication' if self.secure_testonly else 'NONE' |
| self.fake_qcsapi['ssid_get_authentication_mode'] = security |
| |
| def add_disconnected_event(self): |
| self.ssid_testonly = None |
| self.secure_testonly = False |
| self.fake_qcsapi['get_ssid'] = None |
| self.fake_qcsapi['ssid_get_authentication_mode'] = 'NONE' |
| |
| def add_terminating_event(self): |
| self.ssid_testonly = None |
| self.secure_testonly = False |
| self.fake_qcsapi['get_ssid'] = None |
| self.fake_qcsapi['get_mode'] = 'AP' |
| self.fake_qcsapi['ssid_get_authentication_mode'] = 'NONE' |
| |
| def detach(self): |
| self.add_terminating_event() |
| super(FrenzyWPACtrl, self).detach() |
| |
| def request(self, request_type): |
| if request_type == 'STATUS' and self.request_status_fails: |
| raise wpactrl.error('test error') |
| |
| return super(FrenzyWPACtrl, self).request(request_type) |
| |
| |
| class FrenzyWifi(FakeInterfaceMixin, interface.FrenzyWifi): |
| WPACtrl = FrenzyWPACtrl |
| |
| def __init__(self, *args, **kwargs): |
| super(FrenzyWifi, self).__init__(*args, **kwargs) |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| self.fake_qcsapi = {} |
| |
| def attach_wpa_control(self, *args, **kwargs): |
| super(FrenzyWifi, self).attach_wpa_control(*args, **kwargs) |
| if self._wpa_control: |
| self._wpa_control.ssid_testonly = self._initial_ssid_testonly |
| self._wpa_control.secure_testonly = self._secure_testonly |
| if self._initial_ssid_testonly: |
| self._wpa_control.add_connected_event() |
| |
| def get_wpa_control(self, *args, **kwargs): |
| result = super(FrenzyWifi, self).get_wpa_control(*args, **kwargs) |
| result.fake_qcsapi = self.fake_qcsapi |
| if self._initial_ssid_testonly: |
| result.fake_qcsapi['get_mode'] = 'Station' |
| result.ssid_testonly = self._initial_ssid_testonly |
| result.secure_testonly = self._secure_testonly |
| result.add_connected_event() |
| return result |
| |
| def add_connected_event(self): |
| if self.attached(): |
| self._wpa_control.add_connected_event() |
| |
| def add_disconnected_event(self): |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| if self.attached(): |
| self._wpa_control.add_disconnected_event() |
| |
| def add_terminating_event(self): |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| if self.attached(): |
| self._wpa_control.add_terminating_event() |
| |
| def detach_wpa_control(self): |
| self._initial_ssid_testonly = None |
| self._secure_testonly = False |
| super(FrenzyWifi, self).detach_wpa_control() |
| |
| def start_wpa_supplicant_testonly(self, unused_path): |
| logging.debug('Starting fake wpa_supplicant for %s', self.name) |
| self.fake_qcsapi['get_mode'] = 'Station' |
| |
| def kill_wpa_supplicant_testonly(self, unused_path): |
| logging.debug('Killing fake wpa_supplicant for %s', self.name) |
| if self.attached(): |
| # This happens to do what we need. |
| self.add_terminating_event() |
| self.detach_wpa_control() |
| else: |
| raise RuntimeError('Trying to kill wpa_supplicant while not attached') |
| |
| |
| @wvtest.wvtest |
| def bridge_test(): |
| """Test Interface and Bridge.""" |
| tmp_dir = tempfile.mkdtemp() |
| |
| try: |
| autoprov_filepath = os.path.join(tmp_dir, 'autoprov') |
| b = Bridge('br0', '10', acs_autoprovisioning_filepath=autoprov_filepath) |
| b.set_connection_check_result('succeed') |
| |
| wvtest.WVFAIL(b.acs()) |
| wvtest.WVFAIL(b.internet()) |
| wvtest.WVFAIL(b.current_routes()) |
| wvtest.WVFAIL(b.current_routes_normal_testonly()) |
| wvtest.WVFAIL(os.path.exists(autoprov_filepath)) |
| |
| b.add_moca_station(0) |
| wvtest.WVFAIL(os.path.exists(autoprov_filepath)) |
| b.set_subnet('192.168.1.0/24') |
| b.set_gateway_ip('192.168.1.1') |
| wvtest.WVFAIL(os.path.exists(autoprov_filepath)) |
| # Everything should fail because the interface is not initialized. |
| wvtest.WVFAIL(b.acs()) |
| wvtest.WVFAIL(b.internet()) |
| wvtest.WVFAIL(b.current_routes_normal_testonly()) |
| wvtest.WVFAIL(os.path.exists(autoprov_filepath)) |
| b.initialize() |
| wvtest.WVPASS(b.acs()) |
| wvtest.WVPASS(b.internet()) |
| current_routes = b.current_routes() |
| wvtest.WVPASSEQ(len(current_routes), 3) |
| wvtest.WVPASS('default' in current_routes) |
| wvtest.WVPASS('subnet' in current_routes) |
| wvtest.WVPASS('multicast' in current_routes) |
| wvtest.WVPASS(os.path.exists(autoprov_filepath)) |
| |
| b.add_moca_station(1) |
| wvtest.WVPASS(b.acs()) |
| wvtest.WVPASS(b.internet()) |
| wvtest.WVPASSEQ(len(b.current_routes()), 3) |
| wvtest.WVPASS(os.path.exists(autoprov_filepath)) |
| |
| b.remove_moca_station(0) |
| b.remove_moca_station(1) |
| wvtest.WVFAIL(b.acs()) |
| wvtest.WVFAIL(b.internet()) |
| # We have no links, so should have no routes. |
| wvtest.WVFAIL(b.current_routes()) |
| wvtest.WVFAIL(os.path.exists(autoprov_filepath)) |
| |
| b.add_moca_station(2) |
| wvtest.WVPASS(b.acs()) |
| wvtest.WVPASS(b.internet()) |
| wvtest.WVPASSEQ(len(b.current_routes()), 3) |
| wvtest.WVPASS(os.path.exists(autoprov_filepath)) |
| |
| b.set_connection_check_result('fail') |
| b.update_routes() |
| wvtest.WVFAIL(b.acs()) |
| wvtest.WVFAIL(b.internet()) |
| # We have links but the connection check failed, so we should only have a |
| # low priority route, i.e. metric at least 50. |
| wvtest.WVPASSEQ(len(b.current_routes()), 3) |
| wvtest.WVFAIL(b.current_routes_normal_testonly()) |
| wvtest.WVFAIL(os.path.exists(autoprov_filepath)) |
| |
| b.set_connection_check_result('restricted') |
| b.update_routes() |
| wvtest.WVPASS(b.acs()) |
| wvtest.WVFAIL(b.internet()) |
| wvtest.WVPASSEQ(len(b.current_routes()), 3) |
| wvtest.WVPASS(os.path.exists(autoprov_filepath)) |
| |
| wvtest.WVFAIL(b.get_ip_address()) |
| b.ip_testonly = '192.168.1.100' |
| wvtest.WVPASSEQ(b.get_ip_address(), '192.168.1.100') |
| |
| # Get a new gateway/subnet (e.g. due to joining a new network). |
| # Not on the subnet; adding IP should fail. |
| b.set_gateway_ip('192.168.2.1') |
| wvtest.WVFAIL('default' in b.current_routes()) |
| wvtest.WVPASS('subnet' in b.current_routes()) |
| # Without a default route, the connection check should fail. |
| wvtest.WVFAIL(b.acs()) |
| |
| # Now we get the subnet and should add updated subnet and gateway routes. |
| b.set_subnet('192.168.2.0/24') |
| wvtest.WVPASSEQ(b.current_routes()['default']['via'], '192.168.2.1') |
| wvtest.WVPASSLE(int(b.current_routes()['default']['metric']), 50) |
| wvtest.WVPASSEQ(b.current_routes()['subnet']['route'], '192.168.2.0/24') |
| wvtest.WVPASSLE(int(b.current_routes()['subnet']['metric']), 50) |
| wvtest.WVPASS(b.acs()) |
| |
| # If we have no subnet, make sure that both subnet and default routes are |
| # removed. |
| b.set_subnet(None) |
| wvtest.WVFAIL('subnet' in b.current_routes()) |
| wvtest.WVFAIL('default' in b.current_routes()) |
| |
| # Now repeat the new-network test, but with a faulty connection. Make sure |
| # the metrics are set appropriately. |
| b.set_connection_check_result('fail') |
| b.set_subnet('192.168.3.0/24') |
| b.set_gateway_ip('192.168.3.1') |
| wvtest.WVPASSGE(int(b.current_routes()['default']['metric']), 50) |
| wvtest.WVPASSGE(int(b.current_routes()['subnet']['metric']), 50) |
| |
| # Now test deleting only the gateway IP. |
| b.set_gateway_ip(None) |
| wvtest.WVPASS('subnet' in b.current_routes()) |
| wvtest.WVFAIL('default' in b.current_routes()) |
| |
| finally: |
| shutil.rmtree(tmp_dir) |
| |
| |
| def generic_wifi_test(w, wpa_path): |
| # Not currently connected. |
| w.start_wpa_supplicant_testonly(wpa_path) |
| w.attach_wpa_control(wpa_path) |
| wvtest.WVFAIL(w.wpa_supplicant) |
| |
| # pylint: disable=protected-access |
| wpa_control = w._wpa_control |
| |
| # wpa_supplicant connects. |
| wpa_control.ssid_testonly = 'my=ssid' |
| wpa_control.add_connected_event() |
| wvtest.WVFAIL(w.wpa_supplicant) |
| w.handle_wpa_events() |
| wvtest.WVPASS(w.wpa_supplicant) |
| w.set_gateway_ip('192.168.1.1') |
| |
| # wpa_supplicant disconnects. |
| wpa_control.add_disconnected_event() |
| w.handle_wpa_events() |
| wvtest.WVFAIL(w.wpa_supplicant) |
| |
| # Now, start over so we can test what happens when wpa_supplicant is already |
| # connected when we attach. |
| w.detach_wpa_control() |
| # pylint: disable=protected-access |
| w._initial_ssid_testonly = 'my=ssid' |
| w._initialized = False |
| w.attach_wpa_control(wpa_path) |
| wpa_control = w._wpa_control |
| |
| # wpa_supplicant was already connected when we attached. |
| wvtest.WVPASS(w.wpa_supplicant) |
| wvtest.WVPASSEQ(w.initial_ssid, 'my=ssid') |
| w.initialize() |
| wvtest.WVPASSEQ(w.initial_ssid, None) |
| |
| wvtest.WVPASSNE(w.wpa_status(), {}) |
| w._wpa_control.request_status_fails = True |
| wvtest.WVPASSEQ(w.wpa_status(), {}) |
| |
| # The wpa_supplicant process disconnects and terminates. |
| wpa_control.add_disconnected_event() |
| wpa_control.add_terminating_event() |
| w.handle_wpa_events() |
| wvtest.WVFAIL(w.wpa_supplicant) |
| |
| |
| @wvtest.wvtest |
| def wifi_test(): |
| """Test Wifi.""" |
| w = Wifi('wcli0', '21') |
| w.set_connection_check_result('succeed') |
| w.initialize() |
| |
| try: |
| wpa_path = tempfile.mkdtemp() |
| generic_wifi_test(w, wpa_path) |
| |
| finally: |
| shutil.rmtree(wpa_path) |
| |
| |
| @wvtest.wvtest |
| def frenzy_wifi_test(): |
| """Test FrenzyWifi.""" |
| w = FrenzyWifi('wlan0', '20') |
| w.set_connection_check_result('succeed') |
| w.initialize() |
| |
| try: |
| wpa_path = tempfile.mkdtemp() |
| FrenzyWifi.WPACtrl.WIFIINFO_PATH = wifiinfo_path = tempfile.mkdtemp() |
| |
| generic_wifi_test(w, wpa_path) |
| |
| finally: |
| shutil.rmtree(wpa_path) |
| shutil.rmtree(wifiinfo_path) |
| |
| |
| @wvtest.wvtest |
| def simulate_wireless_test(): |
| """Test the WifiSimulateWireless experiment.""" |
| unused_raii = experiment_testutils.MakeExperimentDirs() |
| |
| tmp_dir = tempfile.mkdtemp() |
| interface.CWMP_PATH = tempfile.mkdtemp() |
| interface.MAX_ACS_FAILURE_S = 1 |
| |
| contact = os.path.join(interface.CWMP_PATH, 'acscontact') |
| connected = os.path.join(interface.CWMP_PATH, 'acsconnected') |
| |
| try: |
| autoprov_filepath = os.path.join(tmp_dir, 'autoprov') |
| b = Bridge('br0', '10', acs_autoprovisioning_filepath=autoprov_filepath) |
| b.add_moca_station(0) |
| b.set_gateway_ip('192.168.1.1') |
| b.set_subnet('192.168.1.0/24') |
| b.set_connection_check_result('succeed') |
| b.initialize() |
| |
| # Initially, the connection check passes. |
| wvtest.WVPASS(b.internet()) |
| |
| # Enable the experiment. |
| experiment_testutils.enable('WifiSimulateWireless') |
| # Calling update_routes overwrites the connection status cache, which we |
| # need in order to see the effects we are looking for immediately |
| # (ConnectionManager calls this every few seconds). |
| b.update_routes() |
| wvtest.WVFAIL(b.internet()) |
| |
| # Create an ACS connection attempt. |
| open(contact, 'w') |
| b.update_routes() |
| wvtest.WVFAIL(b.internet()) |
| |
| # Record success. |
| open(connected, 'w') |
| b.update_routes() |
| wvtest.WVFAIL(b.internet()) |
| |
| # Disable the experiment and the connection check should pass again. |
| experiment_testutils.disable('WifiSimulateWireless') |
| b.update_routes() |
| wvtest.WVPASS(b.internet()) |
| |
| # Reenable the experiment and the connection check should fail again. |
| experiment_testutils.enable('WifiSimulateWireless') |
| b.update_routes() |
| wvtest.WVFAIL(b.internet()) |
| |
| # Wait until we've failed for long enough for the experiment to "expire", |
| # then log another attempt without success. Make sure the connection check |
| # passes. |
| time.sleep(interface.MAX_ACS_FAILURE_S) |
| open(contact, 'w') |
| b.update_routes() |
| wvtest.WVPASS(b.internet()) |
| |
| finally: |
| shutil.rmtree(tmp_dir) |
| shutil.rmtree(interface.CWMP_PATH) |
| |
| |
| if __name__ == '__main__': |
| wvtest.wvtest_main() |