blob: e8f0876a07fe3f76fa0a05cca5e592d5af050fdb [file] [log] [blame]
#!/usr/bin/python
"""Tests for connection_manager.py."""
import logging
import os
import shutil
import socket
import struct
import subprocess
import tempfile
import time
# This has to be called before another module calls it with a higher log level.
# pylint: disable=g-import-not-at-top
logging.basicConfig(level=logging.DEBUG)
# This is in site-packages on the device, but not when running tests, and so
# raises lint errors.
# pylint: disable=g-bad-import-order
import wpactrl
import experiment_testutils
import interface
from wvtest import wvtest
# pylint: disable=line-too-long
# Canned text returned by FakeInterfaceMixin._ip_addr_show when a test has
# assigned an address; {name} is the interface name and {ip} its address.
_IP_ADDR_SHOW_TPL = """4: {name}: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
inet {ip}/21 brd 100.100.55.255 scope global {name}
valid_lft forever preferred_lft forever
"""
class FakeInterfaceMixin(object):
  """Replace Interface methods which interact with the system.

  Intended to be listed ahead of a real interface class in the bases of a
  test class.  Routes are kept in an in-memory table, the interface address
  comes from an attribute that tests assign directly, and the connection
  check is pointed at one of the scripts under ./test/.
  """

  def __init__(self, *args, **kwargs):
    super(FakeInterfaceMixin, self).__init__(*args, **kwargs)
    self.set_connection_check_result('succeed')
    # Maps (interface name, route, metric) to the argument string that
    # followed 'add'; metric is None when none was given.
    self.routing_table = {}
    # Address reported by _ip_addr_show; falsy means "no address".
    self.ip_testonly = None

  def _connection_check(self, *args, **kwargs):
    """Run the inherited check, failing if the default route is not ours.

    Returns:
      False when the current default route does not go via self._gateway_ip;
      otherwise whatever the inherited _connection_check returned.
    """
    result = super(FakeInterfaceMixin, self)._connection_check(*args, **kwargs)
    if (self.current_routes().get('default', {}).get('via', None) !=
        self._gateway_ip):
      return False
    return result

  def set_connection_check_result(self, result):
    """Select which fake connection check script is used.

    Args:
      result: One of 'succeed', 'fail' or 'restricted'.

    Raises:
      ValueError: if result is not one of the values above.
    """
    if result in ['succeed', 'fail', 'restricted']:
      # pylint: disable=invalid-name
      self.CONNECTION_CHECK = './test/' + result
    else:
      raise ValueError('Invalid fake connection_check script.')

  def _really_ip_route(self, *args):
    """Fake 'ip route' which reads and updates self.routing_table.

    Args:
      *args: The 'ip route' arguments.  With no arguments the table is
        listed; ('add', route, ...) and ('del', route, ...) modify it.  An
        optional 'metric' argument is followed by the metric value.

    Returns:
      The route listing (one route per line) when called without arguments,
      otherwise None.

    Raises:
      subprocess.CalledProcessError: when adding a default route whose
        gateway is not covered by a route already present for this interface.
    """

    def ip_to_int(ip):
      return struct.unpack('!I', socket.inet_pton(socket.AF_INET, ip))[0]

    def can_add_route():
      """Only default routes are restricted: the gateway must be reachable."""
      if args[1] != 'default':
        return True

      via = ip_to_int(args[args.index('via') + 1])
      for ifc, route, _ in self.routing_table:
        if ifc != self.name:
          continue
        # An existing default route (e.g. with another metric) is not an
        # address and says nothing about whether the gateway is reachable.
        if route == 'default':
          continue

        # Number of low-order bits to ignore when comparing; a route without
        # a prefix length has to match the gateway exactly.
        host_bits = 0
        if '/' in route:
          route, prefix_len = route.split('/')
          host_bits = 32 - int(prefix_len)
        if (ip_to_int(route) >> host_bits) == (via >> host_bits):
          return True

      return False

    if not args:
      # list() keeps this working where dict.values() returns a view.
      return '\n'.join(list(self.routing_table.values()) +
                       ['1.2.3.4/24 dev fake0 proto kernel scope link',
                        # Non-subnet route, e.g. to NFS host.
                        '1.2.3.1 dev %s proto kernel scope link' % self.name,
                        'default via 1.2.3.4 dev fake0',
                        'random junk'])

    metric = None
    if 'metric' in args:
      metric = args[args.index('metric') + 1]

    if args[0] in ('add', 'del'):
      route = args[1]
      key = (self.name, route, metric)
      if args[0] == 'add' and key not in self.routing_table:
        if not can_add_route():
          # CalledProcessError takes (returncode, cmd, output); put the
          # explanation in output rather than in the returncode slot.
          raise subprocess.CalledProcessError(
              2, ('ip', 'route') + args,
              'Tried to add default route without subnet route: %r'
              % (self.routing_table,))
        logging.debug('Adding route for %r', key)
        self.routing_table[key] = ' '.join(args[1:])
      elif args[0] == 'del':
        if key in self.routing_table:
          logging.debug('Deleting route for %r', key)
          del self.routing_table[key]
        elif key[2] is None:
          # No metric was given, so remove the first route for this interface
          # and destination whatever its metric.  Iterate over a copy of the
          # keys because the table is modified.
          for k in list(self.routing_table):
            if k[:-1] == key[:-1]:
              logging.debug('Deleting route for %r (generalized from %s)',
                            k, key)
              del self.routing_table[k]
              break

  def _ip_addr_show(self):
    """Return fake 'ip addr show' text, or '' if no address is assigned."""
    if self.ip_testonly:
      return _IP_ADDR_SHOW_TPL.format(name=self.name, ip=self.ip_testonly)
    return ''

  def current_routes_normal_testonly(self):
    """Return the entries of current_routes() whose metric is below 50.

    Routes without a metric are treated as metric 0.
    """
    result = self.current_routes()
    return {k: v for k, v in result.items() if int(v.get('metric', 0)) < 50}
class Bridge(FakeInterfaceMixin, interface.Bridge):
  """interface.Bridge with the FakeInterfaceMixin overrides applied."""
class FakeWPACtrl(object):
  """Fake wpactrl.WPACtrl.

  The control socket is represented by a path on disk: operations which talk
  to the socket raise wpactrl.error when that path does not exist.  Events are
  queued by the test through the add_*_event helpers and read back with
  pending() and recv().
  """

  # pylint: disable=unused-argument
  def __init__(self, wpa_socket):
    # Path standing in for the control socket.
    self._socket = wpa_socket
    # Queued event strings, oldest first.
    self.events = []
    self.attached = False
    self.connected = False
    # State reported by wpa_cli_status_testonly while connected.
    self.ssid_testonly = None
    self.secure_testonly = False
    # When true, a STATUS request raises wpactrl.error.
    self.request_status_fails = False

  def pending(self):
    """Return True if at least one event is queued."""
    self.check_socket_exists('pending: socket does not exist')
    return bool(self.events)

  def recv(self):
    """Remove and return the oldest queued event."""
    self.check_socket_exists('recv: socket does not exist')
    return self.events.pop(0)

  def attach(self):
    """Mark this control as attached; the socket path must exist."""
    self.check_socket_exists('wpactrl_attach failed')
    self.attached = True

  def detach(self):
    """Reset all connection state, then verify the socket path exists."""
    self.connected = False
    self.secure_testonly = False
    self.ssid_testonly = None
    self.attached = False
    # Deliberately last: state is cleared even when this raises.
    self.check_socket_exists('wpactrl_detach failed')

  def request(self, request_type):
    """Answer a control request; only 'STATUS' is supported.

    Raises:
      ValueError: for any request_type other than 'STATUS'.
      wpactrl.error: if request_status_fails is set.
    """
    if request_type != 'STATUS':
      raise ValueError('Invalid request_type %s' % request_type)
    if self.request_status_fails:
      raise wpactrl.error('test error')
    return self.wpa_cli_status_testonly()

  @property
  def ctrl_iface_path(self):
    """Directory portion of the socket path."""
    return os.path.dirname(self._socket)

  # Below methods are not part of WPACtrl.

  def add_event(self, event):
    """Queue an event string for a later recv()."""
    self.events.append(event)

  def add_connected_event(self):
    """Become connected and queue Wifi.CONNECTED_EVENT."""
    self.connected = True
    self.add_event(Wifi.CONNECTED_EVENT)

  def add_disconnected_event(self):
    """Become disconnected and queue Wifi.DISCONNECTED_EVENT."""
    self.connected = False
    self.add_event(Wifi.DISCONNECTED_EVENT)

  def add_terminating_event(self):
    """Become disconnected and queue Wifi.TERMINATING_EVENT."""
    self.connected = False
    self.add_event(Wifi.TERMINATING_EVENT)

  def check_socket_exists(self, msg='Fake socket does not exist'):
    """Raise wpactrl.error(msg) unless the socket path exists."""
    if os.path.exists(self._socket):
      return
    raise wpactrl.error(msg)

  def wpa_cli_status_testonly(self):
    """Return status text in the style of 'wpa_cli status'."""
    if not self.connected:
      return 'wpa_state=SCANNING\naddress=12:34:56:78:90:ab'

    key_mgmt = 'WPA2-PSK' if self.secure_testonly else 'NONE'
    return ('foo\nwpa_state=COMPLETED\nssid=%s\nkey_mgmt=%s\nbar' %
            (self.ssid_testonly, key_mgmt))
class Wifi(FakeInterfaceMixin, interface.Wifi):
  """Fake Wifi for testing.

  Uses FakeWPACtrl as its control class.  A test can simulate a
  wpa_supplicant which was already connected at attach time by assigning
  _initial_ssid_testonly (and optionally _secure_testonly) beforehand.
  """

  CONNECTED_EVENT = '<2>CTRL-EVENT-CONNECTED'
  DISCONNECTED_EVENT = '<2>CTRL-EVENT-DISCONNECTED'
  TERMINATING_EVENT = '<2>CTRL-EVENT-TERMINATING'

  WPACtrl = FakeWPACtrl

  def __init__(self, *args, **kwargs):
    super(Wifi, self).__init__(*args, **kwargs)
    # SSID the fake wpa_supplicant is already connected to when a control is
    # created, or None if it is not connected.
    self._initial_ssid_testonly = None
    self._secure_testonly = False

  def attach_wpa_control(self, path):
    """Attach, first marking any existing control as connected if needed."""
    if self._initial_ssid_testonly and self._wpa_control:
      self._wpa_control.connected = True
    super(Wifi, self).attach_wpa_control(path)

  def get_wpa_control(self, *args, **kwargs):
    """Return the inherited control, pre-seeded with the initial SSID."""
    result = super(Wifi, self).get_wpa_control(*args, **kwargs)
    if self._initial_ssid_testonly:
      result.connected = True
      result.ssid_testonly = self._initial_ssid_testonly
      result.secure_testonly = self._secure_testonly
    return result

  def add_connected_event(self):
    """Queue a connected event if a control is attached."""
    if self.attached():
      self._wpa_control.add_connected_event()

  def add_disconnected_event(self):
    """Forget the initial SSID; queue a disconnected event if attached."""
    self._initial_ssid_testonly = None
    self._secure_testonly = False
    if self.attached():
      self._wpa_control.add_disconnected_event()

  def add_terminating_event(self):
    """Forget the initial SSID; queue a terminating event if attached."""
    self._initial_ssid_testonly = None
    self._secure_testonly = False
    if self.attached():
      self._wpa_control.add_terminating_event()

  def detach_wpa_control(self):
    """Forget the initial SSID, then detach as usual."""
    self._initial_ssid_testonly = None
    self._secure_testonly = False
    super(Wifi, self).detach_wpa_control()

  def wpa_cli_status(self):
    # This is just a convenient way of keeping things dry; the actual wpa_cli
    # status makes a subprocess call which returns the same string.
    return self._wpa_control.wpa_cli_status_testonly()

  def start_wpa_supplicant_testonly(self, path):
    """Create the file which stands in for the control socket.

    Args:
      path: Directory in which to create a file named after this interface.
    """
    wpa_socket = os.path.join(path, self.name)
    logging.debug('Starting fake wpa_supplicant for %s: %s',
                  self.name, wpa_socket)
    # Only the existence of the file matters; close the handle immediately
    # rather than leaving it for the garbage collector.
    with open(wpa_socket, 'w'):
      pass

  def kill_wpa_supplicant_testonly(self, path):
    """Detach and remove the fake control socket file.

    Args:
      path: Directory containing the file named after this interface.

    Raises:
      RuntimeError: if no control is currently attached.
    """
    logging.debug('Killing fake wpa_supplicant for %s', self.name)
    if self.attached():
      self.detach_wpa_control()
      os.unlink(os.path.join(path, self.name))
    else:
      raise RuntimeError('Trying to kill wpa_supplicant while not attached')
class FrenzyWPACtrl(interface.FrenzyWPACtrl):
  """interface.FrenzyWPACtrl whose qcsapi answers come from a dict.

  The fake_qcsapi mapping (qcsapi command name -> canned answer) is not
  created here; it is assigned from outside before the add_*_event helpers or
  _qcsapi are used.
  """

  def __init__(self, *args, **kwargs):
    super(FrenzyWPACtrl, self).__init__(*args, **kwargs)
    self.request_status_fails = False
    self.secure_testonly = False
    self.ssid_testonly = None

  def _qcsapi(self, *command):
    """Look up the canned answer for command[0]; None if there is none."""
    return self.fake_qcsapi.get(command[0])

  def add_connected_event(self):
    """Make the fake qcsapi report a station joined to ssid_testonly."""
    if self.secure_testonly:
      auth_mode = 'PSKAuthentication'
    else:
      auth_mode = 'NONE'
    self.fake_qcsapi['get_mode'] = 'Station'
    self.fake_qcsapi['get_ssid'] = self.ssid_testonly
    self.fake_qcsapi['ssid_get_authentication_mode'] = auth_mode

  def add_disconnected_event(self):
    """Clear the SSID and authentication mode; the mode is left alone."""
    self.secure_testonly = False
    self.ssid_testonly = None
    self.fake_qcsapi['get_ssid'] = None
    self.fake_qcsapi['ssid_get_authentication_mode'] = 'NONE'

  def add_terminating_event(self):
    """Clear the SSID and authentication mode and report mode 'AP'."""
    self.secure_testonly = False
    self.ssid_testonly = None
    self.fake_qcsapi['get_ssid'] = None
    self.fake_qcsapi['get_mode'] = 'AP'
    self.fake_qcsapi['ssid_get_authentication_mode'] = 'NONE'

  def detach(self):
    """Simulate termination before performing the inherited detach."""
    self.add_terminating_event()
    super(FrenzyWPACtrl, self).detach()
class FrenzyWifi(FakeInterfaceMixin, interface.FrenzyWifi):
  """Fake FrenzyWifi for testing.

  Owns the fake_qcsapi dict which is shared with every control object handed
  out by get_wpa_control.
  """

  WPACtrl = FrenzyWPACtrl

  def __init__(self, *args, **kwargs):
    super(FrenzyWifi, self).__init__(*args, **kwargs)
    # qcsapi command name -> canned answer.
    self.fake_qcsapi = {}
    # SSID to report as already connected when a control is created or
    # attached; None means not connected.
    self._initial_ssid_testonly = None
    self._secure_testonly = False

  def attach_wpa_control(self, *args, **kwargs):
    """Attach, then copy the initial connection state onto the control."""
    super(FrenzyWifi, self).attach_wpa_control(*args, **kwargs)
    ctrl = self._wpa_control
    if not ctrl:
      return

    ctrl.ssid_testonly = self._initial_ssid_testonly
    ctrl.secure_testonly = self._secure_testonly
    if self._initial_ssid_testonly:
      ctrl.add_connected_event()

  def get_wpa_control(self, *args, **kwargs):
    """Return the inherited control wired up to this object's fake qcsapi."""
    ctrl = super(FrenzyWifi, self).get_wpa_control(*args, **kwargs)
    ctrl.fake_qcsapi = self.fake_qcsapi
    if not self._initial_ssid_testonly:
      return ctrl

    ctrl.fake_qcsapi['get_mode'] = 'Station'
    ctrl.ssid_testonly = self._initial_ssid_testonly
    ctrl.secure_testonly = self._secure_testonly
    ctrl.add_connected_event()
    return ctrl

  def add_connected_event(self):
    """Forward a connected event to the control, if one is attached."""
    if not self.attached():
      return
    self._wpa_control.add_connected_event()

  def add_disconnected_event(self):
    """Forget the initial SSID; forward a disconnected event if attached."""
    self._secure_testonly = False
    self._initial_ssid_testonly = None
    if not self.attached():
      return
    self._wpa_control.add_disconnected_event()

  def add_terminating_event(self):
    """Forget the initial SSID; forward a terminating event if attached."""
    self._secure_testonly = False
    self._initial_ssid_testonly = None
    if not self.attached():
      return
    self._wpa_control.add_terminating_event()

  def detach_wpa_control(self):
    """Forget the initial SSID, then detach as usual."""
    self._secure_testonly = False
    self._initial_ssid_testonly = None
    super(FrenzyWifi, self).detach_wpa_control()

  def start_wpa_supplicant_testonly(self, unused_path):
    """Put the fake qcsapi into 'Station' mode; the path is ignored."""
    logging.debug('Starting fake wpa_supplicant for %s', self.name)
    self.fake_qcsapi['get_mode'] = 'Station'

  def kill_wpa_supplicant_testonly(self, unused_path):
    """Simulate termination and detach; the path is ignored.

    Raises:
      RuntimeError: if no control is currently attached.
    """
    logging.debug('Killing fake wpa_supplicant for %s', self.name)
    if not self.attached():
      raise RuntimeError('Trying to kill wpa_supplicant while not attached')

    # This happens to do what we need.
    self.add_terminating_event()
    self.detach_wpa_control()
@wvtest.wvtest
def bridge_test():
  """Test Interface and Bridge.

  Walks a fake Bridge through: no links, links before and after
  initialize(), each connection check result ('succeed', 'fail',
  'restricted'), IP address reporting, and changes of gateway and subnet,
  checking acs(), internet(), the routes and the existence of the
  autoprovisioning file at each step.
  """
  # Scratch directory for the autoprovisioning file; removed in the finally.
  tmp_dir = tempfile.mkdtemp()
  try:
    autoprov_filepath = os.path.join(tmp_dir, 'autoprov')
    b = Bridge('br0', '10', acs_autoprovisioning_filepath=autoprov_filepath)
    b.set_connection_check_result('succeed')
    # Nothing has been configured yet, so nothing should be reported.
    wvtest.WVFAIL(b.acs())
    wvtest.WVFAIL(b.internet())
    wvtest.WVFAIL(b.current_routes())
    wvtest.WVFAIL(b.current_routes_normal_testonly())
    wvtest.WVFAIL(os.path.exists(autoprov_filepath))
    b.add_moca_station(0)
    wvtest.WVFAIL(os.path.exists(autoprov_filepath))
    b.set_subnet('192.168.1.0/24')
    b.set_gateway_ip('192.168.1.1')
    wvtest.WVFAIL(os.path.exists(autoprov_filepath))
    # Everything should fail because the interface is not initialized.
    wvtest.WVFAIL(b.acs())
    wvtest.WVFAIL(b.internet())
    wvtest.WVFAIL(b.current_routes_normal_testonly())
    wvtest.WVFAIL(os.path.exists(autoprov_filepath))
    b.initialize()
    wvtest.WVPASS(b.acs())
    wvtest.WVPASS(b.internet())
    current_routes = b.current_routes()
    wvtest.WVPASSEQ(len(current_routes), 3)
    wvtest.WVPASS('default' in current_routes)
    wvtest.WVPASS('subnet' in current_routes)
    wvtest.WVPASS('multicast' in current_routes)
    wvtest.WVPASS(os.path.exists(autoprov_filepath))
    # A second station should change nothing.
    b.add_moca_station(1)
    wvtest.WVPASS(b.acs())
    wvtest.WVPASS(b.internet())
    wvtest.WVPASSEQ(len(b.current_routes()), 3)
    wvtest.WVPASS(os.path.exists(autoprov_filepath))
    b.remove_moca_station(0)
    b.remove_moca_station(1)
    wvtest.WVFAIL(b.acs())
    wvtest.WVFAIL(b.internet())
    # We have no links, so should have no routes.
    wvtest.WVFAIL(b.current_routes())
    wvtest.WVFAIL(os.path.exists(autoprov_filepath))
    # Regaining a link should restore everything.
    b.add_moca_station(2)
    wvtest.WVPASS(b.acs())
    wvtest.WVPASS(b.internet())
    wvtest.WVPASSEQ(len(b.current_routes()), 3)
    wvtest.WVPASS(os.path.exists(autoprov_filepath))
    b.set_connection_check_result('fail')
    b.update_routes()
    wvtest.WVFAIL(b.acs())
    wvtest.WVFAIL(b.internet())
    # We have links but the connection check failed, so we should only have a
    # low priority route, i.e. metric at least 50.
    wvtest.WVPASSEQ(len(b.current_routes()), 3)
    wvtest.WVFAIL(b.current_routes_normal_testonly())
    wvtest.WVFAIL(os.path.exists(autoprov_filepath))
    # 'restricted' is expected to give ACS access but no internet access.
    b.set_connection_check_result('restricted')
    b.update_routes()
    wvtest.WVPASS(b.acs())
    wvtest.WVFAIL(b.internet())
    wvtest.WVPASSEQ(len(b.current_routes()), 3)
    wvtest.WVPASS(os.path.exists(autoprov_filepath))
    # The fake reports no address until ip_testonly is assigned.
    wvtest.WVFAIL(b.get_ip_address())
    b.ip_testonly = '192.168.1.100'
    wvtest.WVPASSEQ(b.get_ip_address(), '192.168.1.100')
    # Get a new gateway/subnet (e.g. due to joining a new network).
    # Not on the subnet; adding IP should fail.
    b.set_gateway_ip('192.168.2.1')
    wvtest.WVFAIL('default' in b.current_routes())
    wvtest.WVPASS('subnet' in b.current_routes())
    # Without a default route, the connection check should fail.
    wvtest.WVFAIL(b.acs())
    # Now we get the subnet and should add updated subnet and gateway routes.
    b.set_subnet('192.168.2.0/24')
    wvtest.WVPASSEQ(b.current_routes()['default']['via'], '192.168.2.1')
    wvtest.WVPASSLE(int(b.current_routes()['default']['metric']), 50)
    wvtest.WVPASSEQ(b.current_routes()['subnet']['route'], '192.168.2.0/24')
    wvtest.WVPASSLE(int(b.current_routes()['subnet']['metric']), 50)
    wvtest.WVPASS(b.acs())
    # If we have no subnet, make sure that both subnet and default routes are
    # removed.
    b.set_subnet(None)
    wvtest.WVFAIL('subnet' in b.current_routes())
    wvtest.WVFAIL('default' in b.current_routes())
    # Now repeat the new-network test, but with a faulty connection. Make sure
    # the metrics are set appropriately.
    b.set_connection_check_result('fail')
    b.set_subnet('192.168.3.0/24')
    b.set_gateway_ip('192.168.3.1')
    wvtest.WVPASSGE(int(b.current_routes()['default']['metric']), 50)
    wvtest.WVPASSGE(int(b.current_routes()['subnet']['metric']), 50)
    # Now test deleting only the gateway IP.
    b.set_gateway_ip(None)
    wvtest.WVPASS('subnet' in b.current_routes())
    wvtest.WVFAIL('default' in b.current_routes())
  finally:
    shutil.rmtree(tmp_dir)
def generic_wifi_test(w, wpa_path):
  """Scenario shared by wifi_test and frenzy_wifi_test.

  Covers a connect/disconnect cycle seen through events, then the case where
  wpa_supplicant is already connected at attach time, and finally a
  disconnect followed by termination.

  Args:
    w: The fake wifi interface under test; it must provide the *_testonly
      helpers and the _initial_ssid_testonly attribute used below.
    wpa_path: Directory passed to start_wpa_supplicant_testonly and
      attach_wpa_control.
  """
  # Not currently connected.
  w.start_wpa_supplicant_testonly(wpa_path)
  w.attach_wpa_control(wpa_path)
  wvtest.WVFAIL(w.wpa_supplicant)
  # pylint: disable=protected-access
  wpa_control = w._wpa_control
  # wpa_supplicant connects.
  wpa_control.ssid_testonly = 'my=ssid'
  wpa_control.add_connected_event()
  # The connection is only noticed once the events have been handled.
  wvtest.WVFAIL(w.wpa_supplicant)
  w.handle_wpa_events()
  wvtest.WVPASS(w.wpa_supplicant)
  w.set_gateway_ip('192.168.1.1')
  # wpa_supplicant disconnects.
  wpa_control.add_disconnected_event()
  w.handle_wpa_events()
  wvtest.WVFAIL(w.wpa_supplicant)
  # Now, start over so we can test what happens when wpa_supplicant is already
  # connected when we attach.
  w.detach_wpa_control()
  # pylint: disable=protected-access
  w._initial_ssid_testonly = 'my=ssid'
  w._initialized = False
  w.attach_wpa_control(wpa_path)
  wpa_control = w._wpa_control
  # wpa_supplicant was already connected when we attached.
  wvtest.WVPASS(w.wpa_supplicant)
  wvtest.WVPASSEQ(w.initial_ssid, 'my=ssid')
  # initial_ssid is expected to be cleared by initialize().
  w.initialize()
  wvtest.WVPASSEQ(w.initial_ssid, None)
  wvtest.WVPASSNE(w.wpa_status(), {})
  # wpa_status() must still return something with request_status_fails set.
  w._wpa_control.request_status_fails = True
  wvtest.WVPASSNE(w.wpa_status(), {})
  # The wpa_supplicant process disconnects and terminates.
  wpa_control.add_disconnected_event()
  wpa_control.add_terminating_event()
  w.handle_wpa_events()
  wvtest.WVFAIL(w.wpa_supplicant)
@wvtest.wvtest
def wifi_test():
  """Test Wifi by running the shared scenario against the fake Wifi."""
  w = Wifi('wcli0', '21')
  w.set_connection_check_result('succeed')
  w.initialize()

  # Create the directory before entering the try block: if mkdtemp itself
  # fails there is nothing to clean up, and the finally clause must not trip
  # over an unbound name and hide the real error.
  wpa_path = tempfile.mkdtemp()
  try:
    generic_wifi_test(w, wpa_path)
  finally:
    shutil.rmtree(wpa_path)
@wvtest.wvtest
def frenzy_wifi_test():
  """Test FrenzyWifi by running the shared scenario against the fake."""
  w = FrenzyWifi('wlan0', '20')
  w.set_connection_check_result('succeed')
  w.initialize()

  # Each directory is created immediately before the try block which removes
  # it, so a failing mkdtemp can neither leak the other directory nor cause a
  # NameError during cleanup, and one failing rmtree does not skip the other.
  wpa_path = tempfile.mkdtemp()
  try:
    wifiinfo_path = tempfile.mkdtemp()
    try:
      FrenzyWifi.WPACtrl.WIFIINFO_PATH = wifiinfo_path
      generic_wifi_test(w, wpa_path)
    finally:
      shutil.rmtree(wifiinfo_path)
  finally:
    shutil.rmtree(wpa_path)
@wvtest.wvtest
def simulate_wireless_test():
  """Test the WifiSimulateWireless experiment.

  Note that this overwrites interface.CWMP_PATH and
  interface.MAX_ACS_FAILURE_S and does not restore them.
  """

  def touch(path):
    """Create or truncate path, closing the file handle straight away."""
    with open(path, 'w'):
      pass

  unused_raii = experiment_testutils.MakeExperimentDirs()

  tmp_dir = tempfile.mkdtemp()
  interface.CWMP_PATH = tempfile.mkdtemp()
  interface.MAX_ACS_FAILURE_S = 1

  contact = os.path.join(interface.CWMP_PATH, 'acscontact')
  connected = os.path.join(interface.CWMP_PATH, 'acsconnected')

  try:
    autoprov_filepath = os.path.join(tmp_dir, 'autoprov')
    b = Bridge('br0', '10', acs_autoprovisioning_filepath=autoprov_filepath)
    b.add_moca_station(0)
    b.set_gateway_ip('192.168.1.1')
    b.set_subnet('192.168.1.0/24')
    b.set_connection_check_result('succeed')
    b.initialize()

    # Initially, the connection check passes.
    wvtest.WVPASS(b.internet())

    # Enable the experiment.
    experiment_testutils.enable('WifiSimulateWireless')
    # Calling update_routes overwrites the connection status cache, which we
    # need in order to see the effects we are looking for immediately
    # (ConnectionManager calls this every few seconds).
    b.update_routes()
    wvtest.WVFAIL(b.internet())

    # Create an ACS connection attempt.
    touch(contact)
    b.update_routes()
    wvtest.WVFAIL(b.internet())

    # Record success.
    touch(connected)
    b.update_routes()
    wvtest.WVFAIL(b.internet())

    # Disable the experiment and the connection check should pass again.
    experiment_testutils.disable('WifiSimulateWireless')
    b.update_routes()
    wvtest.WVPASS(b.internet())

    # Reenable the experiment and the connection check should fail again.
    experiment_testutils.enable('WifiSimulateWireless')
    b.update_routes()
    wvtest.WVFAIL(b.internet())

    # Wait until we've failed for long enough for the experiment to "expire",
    # then log another attempt without success. Make sure the connection check
    # passes.
    time.sleep(interface.MAX_ACS_FAILURE_S)
    touch(contact)
    b.update_routes()
    wvtest.WVPASS(b.internet())
  finally:
    shutil.rmtree(tmp_dir)
    shutil.rmtree(interface.CWMP_PATH)
# When executed as a script, hand control to wvtest's main entry point.
if __name__ == '__main__':
  wvtest.wvtest_main()