conman: Fake all subprocesses consistently.
conman unit tests currently fake subprocess calls in a variety of ad
hoc methods.
This adds a fake subprocess module that delegates subprocess calls to
fake Python implementations. This made it possible to rewrite the
conman unit tests to be much more consistent, organized, and bug-free.
This rewrite uncovered some small bugs (all changes to non-test files
comprise the resulting fixes, or related cleanup).
BUG=31772343
Change-Id: Iebfba1c985d877ff2c3849a90300cc3fd83b8dda
diff --git a/conman/Makefile b/conman/Makefile
index 0faf301..6126f84 100644
--- a/conman/Makefile
+++ b/conman/Makefile
@@ -15,7 +15,7 @@
%.test: %_test.py
echo ./$<
- PYTHONPATH=..:./test/fake_wpactrl:$(PYTHONPATH) ./$<
+ PYTHONPATH=..:./test/fake_python:$(PYTHONPATH) ./$<
runtests: \
$(patsubst %_test.py,%.test,$(wildcard *_test.py))
diff --git a/conman/connection_manager.py b/conman/connection_manager.py
index 333e050..b4c3887 100755
--- a/conman/connection_manager.py
+++ b/conman/connection_manager.py
@@ -245,7 +245,8 @@
wpa_control_interface='/var/run/wpa_supplicant',
run_duration_s=1, interface_update_period=5,
wifi_scan_period_s=120, wlan_retry_s=120, associate_wait_s=15,
- dhcp_wait_s=10, acs_start_wait_s=20, acs_finish_wait_s=120,
+ dhcp_wait_s=10, acs_connection_check_wait_s=1,
+ acs_start_wait_s=20, acs_finish_wait_s=120,
bssid_cycle_length_s=30):
self._tmp_dir = tmp_dir
@@ -260,6 +261,7 @@
self._wlan_retry_s = wlan_retry_s
self._associate_wait_s = associate_wait_s
self._dhcp_wait_s = dhcp_wait_s
+ self._acs_connection_check_wait_s = acs_connection_check_wait_s
self._acs_start_wait_s = acs_start_wait_s
self._acs_finish_wait_s = acs_finish_wait_s
self._bssid_cycle_length_s = bssid_cycle_length_s
@@ -366,7 +368,10 @@
ratchet.Condition('trying_open', wifi.connected_to_open,
self._associate_wait_s,
callback=wifi.expire_connection_status_cache),
- ratchet.Condition('waiting_for_dhcp', wifi.gateway, self._dhcp_wait_s,
+ ratchet.Condition('waiting_for_dhcp', wifi.gateway,
+ self._dhcp_wait_s),
+ ratchet.Condition('acs_connection_check', wifi.acs,
+ self._acs_connection_check_wait_s,
callback=self.cwmp_wakeup),
ratchet.FileTouchedCondition('waiting_for_cwmp_wakeup',
os.path.join(CWMP_PATH, 'acscontact'),
@@ -445,16 +450,18 @@
1. Process any changes in watched files.
2. Check interfaces for changed connectivity, if
update_interfaces_and_routes is true.
- 3. Start, stop, or restart access points as appropriate. If running an
+ 3. Try to upload logs, if we just joined a new open network.
+ 4. Start, stop, or restart access points as appropriate. If running an
access point, skip all remaining wifi steps for that band.
- 3. Handle any wpa_supplicant events.
- 4. Periodically, perform a wifi scan.
- 5. If not connected to the WLAN or to the ACS, try to connect to something.
- 6. If connected to the ACS but not the WLAN, and enough time has passed
+ 5. Handle any wpa_supplicant events.
+ 6. Periodically, perform a wifi scan.
+ 7. If not connected to the WLAN or to the ACS, try to connect to something.
+ 8. If connected to the ACS but not the WLAN, and enough time has passed
since connecting that we should expect a current WLAN configuration, try
to join the WLAN again.
- 7. Sleep for the rest of the duration of _run_duration_s.
+ 9. Sleep for the rest of the duration of _run_duration_s.
"""
+
start_time = _gettime()
self.notifier.process_events()
while self.notifier.check_events():
@@ -466,12 +473,22 @@
self._interface_update_counter = 0
self._update_interfaces_and_routes()
+ if self.acs() and self._try_to_upload_logs:
+ self._try_upload_logs()
+ self._try_to_upload_logs = False
+
for wifi in self.wifi:
- continue_wifi = False
if self.currently_provisioning(wifi):
+ logging.debug('Currently provisioning, nothing else to do.')
continue
provisioning_failed = self.provisioning_failed(wifi)
+ if provisioning_failed and (
+ getattr(wifi, 'last_attempted_bss_info', None) ==
+ getattr(wifi, 'last_successful_bss_info', None)):
+ wifi.last_successful_bss_info = None
+
+ continue_wifi = False
# Only one wlan_configuration per interface will have access_point ==
# True. Try 5 GHz first, then 2.4 GHz. If both bands are supported by
@@ -549,9 +566,6 @@
wifi.status.connected_to_wlan = False
if self.acs():
logging.debug('Connected to ACS')
- if self._try_to_upload_logs:
- self._try_upload_logs()
- self._try_to_upload_logs = False
if wifi.acs():
wifi.last_successful_bss_info = getattr(wifi,
@@ -858,7 +872,7 @@
wifi.last_attempted_bss_info = bss_info
return subprocess.call(self.WIFI_SETCLIENT +
['--ssid', bss_info.ssid,
- '--band', wifi.bands[0],
+ '--band', bss_info.band,
'--bssid', bss_info.bssid]) == 0
def _connected_to_wlan(self, wifi):
@@ -876,6 +890,7 @@
band = wlan_configuration.band
current = self._wlan_configuration.get(band, None)
if current is None or wlan_configuration.command != current.command:
+ logging.debug('Received new WLAN configuration for band %s', band)
if current is not None:
wlan_configuration.access_point = current.access_point
else:
diff --git a/conman/connection_manager_test.py b/conman/connection_manager_test.py
index d7d0a40..22cf89f 100755
--- a/conman/connection_manager_test.py
+++ b/conman/connection_manager_test.py
@@ -5,8 +5,10 @@
import logging
import os
import shutil
+import subprocess # Fake subprocess module in test/fake_python.
import tempfile
import time
+import traceback
import connection_manager
import experiment_testutils
@@ -29,92 +31,26 @@
}
"""
-WIFI_SHOW_OUTPUT_MARVELL8897 = """Band: 2.4
-RegDomain: US
-Interface: wlan0 # 2.4 GHz ap
-Channel: 149
-BSSID: f4:f5:e8:81:1b:a0
-AutoChannel: True
-AutoType: NONDFS
-Station List for band: 2.4
-Client Interface: wcli0 # 2.4 GHz client
-Client BSSID: f4:f5:e8:81:1b:a1
-
-Band: 5
-RegDomain: US
-Interface: wlan0 # 5 GHz ap
-Channel: 149
-BSSID: f4:f5:e8:81:1b:a0
-AutoChannel: True
-AutoType: NONDFS
-Station List for band: 5
-
-Client Interface: wcli0 # 5 GHz client
-Client BSSID: f4:f5:e8:81:1b:a1
-"""
-
-WIFI_SHOW_OUTPUT_ATH9K_ATH10K = """Band: 2.4
-RegDomain: US
-Interface: wlan0 # 2.4 GHz ap
-Channel: 149
-BSSID: f4:f5:e8:81:1b:a0
-AutoChannel: True
-AutoType: NONDFS
-Station List for band: 2.4
-
-Client Interface: wcli0 # 2.4 GHz client
-Client BSSID: f4:f5:e8:81:1b:a1
-
-Band: 5
-RegDomain: US
-Interface: wlan1 # 5 GHz ap
-Channel: 149
-BSSID: f4:f5:e8:81:1b:a0
-AutoChannel: True
-AutoType: NONDFS
-Station List for band: 5
-
-Client Interface: wcli1 # 5 GHz client
-Client BSSID: f4:f5:e8:81:1b:a1
-"""
-
+WIFI_SHOW_OUTPUT_MARVELL8897 = (
+ subprocess.wifi.MockInterface(phynum='0', bands=['2.4', '5'],
+ driver='cfg80211'),
+)
+WIFI_SHOW_OUTPUT_ATH9K_ATH10K = (
+ subprocess.wifi.MockInterface(phynum='0', bands=['2.4'], driver='cfg80211'),
+ subprocess.wifi.MockInterface(phynum='1', bands=['5'], driver='cfg80211'),
+)
# See b/27328894.
-WIFI_SHOW_OUTPUT_MARVELL8897_NO_5GHZ = """Band: 2.4
-RegDomain: 00
-Interface: wlan0 # 2.4 GHz ap
-BSSID: 00:50:43:02:fe:01
-AutoChannel: False
-Station List for band: 2.4
-
-Client Interface: wcli0 # 2.4 GHz client
-Client BSSID: 00:50:43:02:fe:02
-
-Band: 5
-RegDomain: 00
-"""
-
-WIFI_SHOW_OUTPUT_ATH9K_FRENZY = """Band: 2.4
-RegDomain: US
-Interface: wlan0 # 2.4 GHz ap
-Channel: 149
-BSSID: f4:f5:e8:81:1b:a0
-AutoChannel: True
-AutoType: NONDFS
-Station List for band: 2.4
-
-Client Interface: wcli0 # 2.4 GHz client
-Client BSSID: f4:f5:e8:81:1b:a1
-
-Band: 5
-RegDomain: 00
-"""
-
-WIFI_SHOW_OUTPUT_FRENZY = """Band: 2.4
-RegDomain: 00
-Band: 5
-RegDomain: 00
-"""
+WIFI_SHOW_OUTPUT_MARVELL8897_NO_5GHZ = (
+ subprocess.wifi.MockInterface(phynum='0', bands=['2.4'], driver='cfg80211'),
+)
+WIFI_SHOW_OUTPUT_ATH9K_FRENZY = (
+ subprocess.wifi.MockInterface(phynum='0', bands=['2.4'], driver='cfg80211'),
+ subprocess.wifi.MockInterface(phynum='1', bands=['5'], driver='frenzy'),
+)
+WIFI_SHOW_OUTPUT_FRENZY = (
+ subprocess.wifi.MockInterface(phynum='0', bands=['5'], driver='frenzy'),
+)
IW_SCAN_DEFAULT_OUTPUT = """BSS 00:11:22:33:44:55(on wcli0)
SSID: s1
@@ -133,22 +69,13 @@
@wvtest.wvtest
def get_client_interfaces_test():
"""Test get_client_interfaces."""
- wifi_show = None
- quantenna_interfaces = None
+ subprocess.reset()
- # pylint: disable=protected-access
- old_wifi_show = connection_manager._wifi_show
- old_get_quantenna_interfaces = connection_manager._get_quantenna_interfaces
- connection_manager._wifi_show = lambda: wifi_show
- connection_manager._get_quantenna_interfaces = lambda: quantenna_interfaces
-
- wifi_show = WIFI_SHOW_OUTPUT_MARVELL8897
- quantenna_interfaces = []
+ subprocess.mock('wifi', 'interfaces', *WIFI_SHOW_OUTPUT_MARVELL8897)
wvtest.WVPASSEQ(connection_manager.get_client_interfaces(),
{'wcli0': {'bands': set(['2.4', '5'])}})
- wifi_show = WIFI_SHOW_OUTPUT_ATH9K_ATH10K
- quantenna_interfaces = []
+ subprocess.mock('wifi', 'interfaces', *WIFI_SHOW_OUTPUT_ATH9K_ATH10K)
wvtest.WVPASSEQ(connection_manager.get_client_interfaces(), {
'wcli0': {'bands': set(['2.4'])},
'wcli1': {'bands': set(['5'])}
@@ -156,113 +83,30 @@
# Test Quantenna devices.
- # 2.4 GHz cfg80211 radio + 5 GHz Frenzy (Optimus Prime).
- wifi_show = WIFI_SHOW_OUTPUT_ATH9K_FRENZY
- quantenna_interfaces = ['wlan1', 'wlan1_portal', 'wcli1']
+ # 2.4 GHz cfg80211 radio + 5 GHz Frenzy (e.g. Optimus Prime).
+ subprocess.mock('wifi', 'interfaces', *WIFI_SHOW_OUTPUT_ATH9K_FRENZY)
wvtest.WVPASSEQ(connection_manager.get_client_interfaces(), {
'wcli0': {'bands': set(['2.4'])},
'wcli1': {'frenzy': True, 'bands': set(['5'])}
})
# Only Frenzy (e.g. Lockdown).
- wifi_show = WIFI_SHOW_OUTPUT_FRENZY
- quantenna_interfaces = ['wlan0', 'wlan0_portal', 'wcli0']
+ subprocess.mock('wifi', 'interfaces', *WIFI_SHOW_OUTPUT_FRENZY)
wvtest.WVPASSEQ(connection_manager.get_client_interfaces(),
{'wcli0': {'frenzy': True, 'bands': set(['5'])}})
- connection_manager._wifi_show = old_wifi_show
- connection_manager._get_quantenna_interfaces = old_get_quantenna_interfaces
-
-
-class WLANConfiguration(connection_manager.WLANConfiguration):
- """WLANConfiguration subclass for testing."""
-
- WIFI_STOPAP = ['echo', 'stopap']
- WIFI_SETCLIENT = ['echo', 'setclient']
- WIFI_STOPCLIENT = ['echo', 'stopclient']
-
- def __init__(self, *args, **kwargs):
- super(WLANConfiguration, self).__init__(*args, **kwargs)
- self.stale = False
-
- def _actually_start_client(self):
- self.client_was_up = self.client_up
- self.was_attached = self.wifi.attached()
- self.wifi._secure_testonly = True
-
- super(WLANConfiguration, self)._actually_start_client()
-
- if not self.client_was_up and not self.was_attached:
- self.wifi._initial_ssid_testonly = self.ssid
- self.wifi.start_wpa_supplicant_testonly(self._wpa_control_interface)
-
- if self.wifi._wpa_control:
- self.wifi._wpa_control.connected = not self.stale
- return not self.stale
-
- def _post_start_client(self):
- if not self.client_was_up:
- self.wifi.set_connection_check_result('succeed')
-
- if self.was_attached:
- self.wifi._wpa_control.ssid_testonly = self.ssid
- self.wifi._wpa_control.secure_testonly = True
- self.wifi.add_connected_event()
-
- # Normally, wpa_supplicant would bring up the client interface, which
- # would trigger ifplugd, which would run ifplugd.action, which would do
- # two things:
- #
- # 1) Write an interface status file.
- # 2) Call run-dhclient, which would call dhclient-script, which would
- # call ipapply, which would write gateway and subnet files.
- #
- # Fake both of these things instead.
- self.write_interface_status_file('1')
- self.write_gateway_file()
- self.write_subnet_file()
-
- def stop_client(self):
- client_was_up = self.client_up
-
- super(WLANConfiguration, self).stop_client()
-
- if client_was_up:
- self.wifi.add_terminating_event()
- self.wifi.set_connection_check_result('fail')
-
- # See comments in start_client.
- self.write_interface_status_file('0')
-
- def write_gateway_file(self):
- gateway_file = os.path.join(self.tmp_dir,
- self.gateway_file_prefix + self.wifi.name)
- with open(gateway_file, 'w') as f:
- # This value doesn't matter to conman, so it's fine to hard code it here.
- f.write('192.168.1.1')
-
- def write_subnet_file(self):
- subnet_file = os.path.join(self.tmp_dir,
- self.subnet_file_prefix + self.wifi.name)
- with open(subnet_file, 'w') as f:
- # This value doesn't matter to conman, so it's fine to hard code it here.
- f.write('192.168.1.0/24')
-
- def write_interface_status_file(self, value):
- status_file = os.path.join(self.interface_status_dir, self.wifi.name)
- with open(status_file, 'w') as f:
- # This value doesn't matter to conman, so it's fine to hard code it here.
- f.write(value)
-
@wvtest.wvtest
def WLANConfigurationParseTest(): # pylint: disable=invalid-name
"""Test WLANConfiguration parsing."""
+ subprocess.reset()
+
cmd = '\n'.join([
'WIFI_PSK=abcdWIFI_PSK=qwer', 'wifi', 'set', '-P', '-b', '5',
'--bridge=br0', '-s', 'my ssid=1', '--interface-suffix', '_suffix',
])
- config = WLANConfiguration('5', interface_test.Wifi('wcli0', 20), cmd, None)
+ config = connection_manager.WLANConfiguration(
+ '5', interface_test.Wifi('wcli0', 20), cmd, None)
wvtest.WVPASSEQ('my ssid=1', config.ssid)
wvtest.WVPASSEQ('abcdWIFI_PSK=qwer', config.passphrase)
@@ -290,150 +134,41 @@
Bridge = interface_test.Bridge
Wifi = Wifi
FrenzyWifi = FrenzyWifi
- WLANConfiguration = WLANConfiguration
-
- WIFI_SETCLIENT = ['echo', 'setclient']
- IFUP = ['echo', 'ifup']
- IFPLUGD_ACTION = ['echo', 'ifplugd.action']
- BINWIFI = ['echo', 'wifi']
- UPLOAD_LOGS_AND_WAIT = ['echo', 'upload-logs-and-wait']
- CWMP_WAKEUP = ['echo', 'cwmp', 'wakeup']
def __init__(self, *args, **kwargs):
self._binwifi_commands = []
- self.interfaces_already_up = kwargs.pop('__test_interfaces_already_up',
- ['eth0'])
+ for interface_name in kwargs.pop('__test_interfaces_already_up', ['eth0']):
+ subprocess.call(['ifup', interface_name])
+ if interface_name.startswith('w'):
+ phynum = interface_name[-1]
+ for band, interface in subprocess.wifi.INTERFACE_FOR_BAND.iteritems():
+ if interface.phynum == phynum:
+ break
+ else:
+ raise ValueError('Could not find matching interface for '
+ '__test_interfaces_already_up')
+ ssid = 'my ssid'
+ psk = 'passphrase'
+ # If band is undefined then a ValueError will be raised above. pylint
+ # isn't smart enough to figure that out.
+ # pylint: disable=undefined-loop-variable
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk, write_now=True)
+ subprocess.mock('wifi', 'remote_ap', band=band, ssid=ssid, psk=psk,
+ bssid='00:00:00:00:00:00')
- self.wifi_interfaces_already_up = [ifc for ifc in self.interfaces_already_up
- if ifc.startswith('w')]
- for wifi in self.wifi_interfaces_already_up:
- # wcli1 is always 5 GHz. wcli0 always *includes* 2.4. wlan* client
- # interfaces are Frenzy interfaces and therefore 5 GHz-only.
- band = '5' if wifi in ('wlan0', 'wlan1', 'wcli1') else '2.4'
- # This will happen in the super function, but in order for
- # write_wlan_config to work we have to do it now. This has to happen
- # before the super function so that the files exist before the inotify
- # registration.
- self._config_dir = kwargs['config_dir']
- self.write_wlan_config(band, 'my ssid', 'passphrase')
-
- # Also create the wpa_supplicant socket to which to attach.
- open(os.path.join(kwargs['wpa_control_interface'], wifi), 'w')
+ # Also create the wpa_supplicant socket to which to attach.
+ open(os.path.join(kwargs['wpa_control_interface'], interface_name),
+ 'w')
super(ConnectionManager, self).__init__(*args, **kwargs)
- self.interface_with_scan_results = None
- self.scan_results_include_hidden = False
- # Should we be able to connect to open network s2?
- self.can_connect_to_s2 = True
- self.can_connect_to_s3 = True
- # Will s2 fail rather than providing ACS access?
- self.s2_fail = False
- # Will s3 fail to acquire a DHCP lease?
- self.dhcp_failure_on_s3 = False
- self.log_upload_count = 0
- self.acs_session_fails = False
- for wifi in self.wifi:
- wifi.bssids_tried_testonly = 0
-
- def create_wifi_interfaces(self):
- super(ConnectionManager, self).create_wifi_interfaces()
- for wifi in self.wifi_interfaces_already_up:
- # pylint: disable=protected-access
- self.interface_by_name(wifi)._initial_ssid_testonly = 'my ssid'
- self.interface_by_name(wifi)._secure_testonly = True
-
- @property
- def IP_LINK(self):
- return ['echo'] + ['%s LOWER_UP' % ifc
- for ifc in self.interfaces_already_up]
-
- def _update_access_point(self, wlan_configuration):
- client_was_up = wlan_configuration.client_up
- super(ConnectionManager, self)._update_access_point(wlan_configuration)
- if wlan_configuration.access_point_up:
- if client_was_up:
- wifi = self.wifi_for_band(wlan_configuration.band)
- wifi.add_terminating_event()
-
- def _try_bssid(self, wifi, bss_info):
- if wifi.wpa_status().get('wpa_state', None) == 'COMPLETED':
- wifi.add_disconnected_event()
- self.last_provisioning_attempt = bss_info
-
- super(ConnectionManager, self)._try_bssid(wifi, bss_info)
-
- wifi.bssids_tried_testonly += 1
-
- def connect(connection_check_result, dhcp_failure=False):
- # pylint: disable=protected-access
- if wifi.attached():
- wifi._wpa_control.ssid_testonly = bss_info.ssid
- wifi._wpa_control.secure_testonly = False
- wifi.add_connected_event()
- else:
- wifi._initial_ssid_testonly = bss_info.ssid
- wifi._secure_testonly = False
- wifi.start_wpa_supplicant_testonly(self._wpa_control_interface)
- wifi.set_connection_check_result(connection_check_result)
- self.ifplugd_action(wifi.name, True, dhcp_failure)
-
- if bss_info and bss_info.ssid == 's1':
- connect('fail')
- return True
-
- if bss_info and bss_info.ssid == 's2' and self.can_connect_to_s2:
- connect('fail' if self.s2_fail else 'succeed')
- return True
-
- if bss_info and bss_info.ssid == 's3' and self.can_connect_to_s3:
- connect('restricted', self.dhcp_failure_on_s3)
- return True
-
- return False
-
- # pylint: disable=unused-argument,protected-access
- def _find_bssids(self, band):
- scan_output = ''
- if (self.interface_with_scan_results and
- band in self.interface_by_name(self.interface_with_scan_results).bands):
- scan_output = IW_SCAN_DEFAULT_OUTPUT
- if self.scan_results_include_hidden:
- scan_output += IW_SCAN_HIDDEN_OUTPUT
- iw._scan = lambda interface: scan_output
- return super(ConnectionManager, self)._find_bssids(band)
-
- def _update_wlan_configuration(self, wlan_configuration):
- wlan_configuration.command.insert(0, 'echo')
- wlan_configuration._wpa_control_interface = self._wpa_control_interface
- wlan_configuration.tmp_dir = self._tmp_dir
- wlan_configuration.interface_status_dir = self._interface_status_dir
- wlan_configuration.gateway_file_prefix = self.GATEWAY_FILE_PREFIX
- wlan_configuration.subnet_file_prefix = self.SUBNET_FILE_PREFIX
-
- super(ConnectionManager, self)._update_wlan_configuration(
- wlan_configuration)
-
# Just looking for last_wifi_scan_time to change doesn't work because the
# tests run too fast.
def _wifi_scan(self, wifi):
super(ConnectionManager, self)._wifi_scan(wifi)
wifi.wifi_scan_counter += 1
- def ifplugd_action(self, interface_name, up, dhcp_failure=False):
- # Typically, when moca comes up, conman calls ifplugd.action, which writes
- # this file. Also, when conman starts, it calls ifplugd.action for eth0.
- self.write_interface_status_file(interface_name, '1' if up else '0')
-
- # ifplugd calls run-dhclient, which results in a gateway file if the link is
- # up (and working).
- if up and not dhcp_failure:
- self.write_gateway_file('br0' if interface_name in ('eth0', 'moca0')
- else interface_name)
- self.write_subnet_file('br0' if interface_name in ('eth0', 'moca0')
- else interface_name)
-
def _binwifi(self, *command):
super(ConnectionManager, self)._binwifi(*command)
self._binwifi_commands.append(command)
@@ -452,50 +187,7 @@
return self._wlan_configuration[band].client_up
- def _try_upload_logs(self):
- self.log_upload_count += 1
- return super(ConnectionManager, self)._try_upload_logs()
-
- # Test methods
-
- def tried_to_upload_logs(self):
- result = getattr(self, 'last_log_upload_count', 0) < self.log_upload_count
- self.last_log_upload_count = self.log_upload_count
- return result
-
- def delete_wlan_config(self, band):
- delete_wlan_config(self._config_dir, band)
-
- def write_wlan_config(self, *args, **kwargs):
- write_wlan_config(self._config_dir, *args, **kwargs)
-
- def enable_access_point(self, band):
- enable_access_point(self._config_dir, band)
-
- def disable_access_point(self, band):
- disable_access_point(self._config_dir, band)
-
- def write_gateway_file(self, interface_name):
- gateway_file = os.path.join(self._tmp_dir,
- self.GATEWAY_FILE_PREFIX + interface_name)
- with open(gateway_file, 'w') as f:
- logging.debug('Writing gateway file %s', gateway_file)
- # This value doesn't matter to conman, so it's fine to hard code it here.
- f.write('192.168.1.1')
-
- def write_subnet_file(self, interface_name):
- subnet_file = os.path.join(self._tmp_dir,
- self.SUBNET_FILE_PREFIX + interface_name)
- with open(subnet_file, 'w') as f:
- logging.debug('Writing subnet file %s', subnet_file)
- # This value doesn't matter to conman, so it's fine to hard code it here.
- f.write('192.168.1.0/24')
-
- def write_interface_status_file(self, interface_name, value):
- status_file = os.path.join(self._interface_status_dir, interface_name)
- with open(status_file, 'w') as f:
- # This value doesn't matter to conman, so it's fine to hard code it here.
- f.write(value)
+ # # Test methods
def set_ethernet(self, up):
self.ifplugd_action('eth0', up)
@@ -514,6 +206,7 @@
self.run_once()
def run_until_scan(self, band):
+ logging.debug('running until scan on band %r', band)
wifi = self.wifi_for_band(band)
wifi_scan_counter = wifi.wifi_scan_counter
while wifi_scan_counter == wifi.wifi_scan_counter:
@@ -529,58 +222,12 @@
def has_status_files(self, files):
return not set(files) - set(os.listdir(self._status_dir))
- def cwmp_wakeup(self):
- super(ConnectionManager, self).cwmp_wakeup()
- self.write_acscontact()
- if self.acs():
- self.write_acsconnected()
-
- def write_acscontact(self):
- open(os.path.join(connection_manager.CWMP_PATH, 'acscontact'), 'w')
-
- def write_acsconnected(self):
- if not self.acs_session_fails:
- open(os.path.join(connection_manager.CWMP_PATH, 'acsconnected'), 'w')
-
-
-def wlan_config_filename(path, band):
- return os.path.join(path, 'command.%s' % band)
-
-
-def access_point_filename(path, band):
- return os.path.join(path, 'access_point.%s' % band)
-
-
-def write_wlan_config(path, band, ssid, psk, atomic=False):
- final_filename = wlan_config_filename(path, band)
- filename = final_filename + ('.tmp' if atomic else '')
- with open(filename, 'w') as f:
- f.write('\n'.join(['env', 'WIFI_PSK=%s' % psk,
- 'wifi', 'set', '-b', band, '--ssid', ssid]))
- if atomic:
- os.rename(filename, final_filename)
-
-
-def delete_wlan_config(path, band):
- os.unlink(wlan_config_filename(path, band))
-
-
-def enable_access_point(path, band):
- open(access_point_filename(path, band), 'w')
-
-
-def disable_access_point(path, band):
- ap_filename = access_point_filename(path, band)
- if os.path.isfile(ap_filename):
- os.unlink(ap_filename)
-
def check_tmp_hosts(expected_contents):
wvtest.WVPASSEQ(open(connection_manager.TMP_HOSTS).read(), expected_contents)
-def connection_manager_test(radio_config, wlan_configs=None,
- quantenna_interfaces=None, **cm_kwargs):
+def connection_manager_test(radio_config, wlan_configs=None, **cm_kwargs):
"""Returns a decorator that does ConnectionManager test boilerplate."""
if wlan_configs is None:
wlan_configs = {}
@@ -589,38 +236,39 @@
"""The actual decorator."""
def actual_test():
"""The actual test function."""
+ subprocess.reset()
+
run_duration_s = .01
interface_update_period = 5
wifi_scan_period = 15
wifi_scan_period_s = run_duration_s * wifi_scan_period
associate_wait_s = 0
dhcp_wait_s = .5
+ acs_cc_wait_s = 0
acs_start_wait_s = 0
- acs_finish_wait_s = 0
+ acs_finish_wait_s = 0.25
- # pylint: disable=protected-access
- old_wifi_show = connection_manager._wifi_show
- connection_manager._wifi_show = lambda: radio_config
-
- old_gqi = connection_manager._get_quantenna_interfaces
- connection_manager._get_quantenna_interfaces = (
- lambda: quantenna_interfaces or [])
+ subprocess.mock('wifi', 'interfaces', *radio_config)
try:
# No initial state.
connection_manager.TMP_HOSTS = tempfile.mktemp()
tmp_dir = tempfile.mkdtemp()
config_dir = tempfile.mkdtemp()
- os.mkdir(os.path.join(tmp_dir, 'interfaces'))
+ interfaces_dir = os.path.join(tmp_dir, 'interfaces')
+ if not os.path.exists(interfaces_dir):
+ os.mkdir(interfaces_dir)
moca_tmp_dir = tempfile.mkdtemp()
wpa_control_interface = tempfile.mkdtemp()
+ subprocess.mock('wifi', 'wpa_path', wpa_control_interface)
FrenzyWifi.WPACtrl.WIFIINFO_PATH = tempfile.mkdtemp()
connection_manager.CWMP_PATH = tempfile.mkdtemp()
+ subprocess.set_conman_paths(tmp_dir, config_dir,
+ connection_manager.CWMP_PATH)
for band, access_point in wlan_configs.iteritems():
- write_wlan_config(config_dir, band, 'initial ssid', 'initial psk')
- if access_point:
- open(os.path.join(config_dir, 'access_point.%s' % band), 'w')
+ subprocess.mock('cwmp', band, ssid='initial ssid', psk='initial psk',
+ access_point=access_point, write_now=True)
# Test that missing directories are created by ConnectionManager.
shutil.rmtree(tmp_dir)
@@ -635,12 +283,17 @@
wifi_scan_period_s=wifi_scan_period_s,
associate_wait_s=associate_wait_s,
dhcp_wait_s=dhcp_wait_s,
+ acs_connection_check_wait_s=acs_cc_wait_s,
acs_start_wait_s=acs_start_wait_s,
acs_finish_wait_s=acs_finish_wait_s,
bssid_cycle_length_s=1,
**cm_kwargs)
f(c)
+ except Exception:
+ logging.error('Uncaught exception!')
+ traceback.print_exc()
+ raise
finally:
if os.path.exists(connection_manager.TMP_HOSTS):
os.unlink(connection_manager.TMP_HOSTS)
@@ -650,9 +303,6 @@
shutil.rmtree(wpa_control_interface)
shutil.rmtree(FrenzyWifi.WPACtrl.WIFIINFO_PATH)
shutil.rmtree(connection_manager.CWMP_PATH)
- # pylint: disable=protected-access
- connection_manager._wifi_show = old_wifi_show
- connection_manager._get_quantenna_interfaces = old_gqi
actual_test.func_name = f.func_name
return actual_test
@@ -660,6 +310,20 @@
return inner
+def _enable_basic_scan_results(band):
+ for bssid, ssid, ccr in (('00:11:22:33:44:55', 's1', 'fail'),
+ ('66:77:88:99:aa:bb', 's1', 'fail'),
+ ('01:23:45:67:89:ab', 's2', 'succeed')):
+ subprocess.mock('wifi', 'remote_ap', bssid=bssid, ssid=ssid,
+ band=band, security=None, connection_check_result=ccr)
+
+
+def _disable_basic_scan_results(band):
+ for bssid in (('00:11:22:33:44:55'), ('66:77:88:99:aa:bb'),
+ ('01:23:45:67:89:ab')):
+ subprocess.mock('wifi', 'remote_ap_remove', bssid=bssid, band=band)
+
+
def connection_manager_test_generic(c, band):
"""Test ConnectionManager for things independent of radio configuration.
@@ -754,16 +418,28 @@
check_tmp_hosts('127.0.0.1 localhost')
# Now there are some scan results.
- c.interface_with_scan_results = c.wifi_for_band(band).name
- # Wait for a scan, plus 3 cycles, so that s2 will have been tried.
+ _enable_basic_scan_results(band)
+
+ # Create a WLAN configuration which should eventually be connected to.
+ ssid = 'wlan'
+ psk = 'password'
+ subprocess.mock('wifi', 'remote_ap',
+ bssid='11:22:33:44:55:66',
+ ssid=ssid, psk=psk, band=band, security='WPA2')
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk, access_point=False)
+
+ wvtest.WVFAIL(subprocess.upload_logs_and_wait.uploaded_logs())
+ # Wait for a scan, then until s2 is tried.
c.run_until_scan(band)
- wvtest.WVPASSEQ(c.log_upload_count, 0)
- c.wifi_for_band(band).ip_testonly = '192.168.1.100'
+ subprocess.call(['ip', 'addr', 'add', '192.168.1.100',
+ 'dev', c.wifi_for_band(band).name])
for _ in range(len(c.wifi_for_band(band).cycler)):
c.run_once()
wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_OPEN]))
+ last_bss_info = c.wifi_for_band(band).last_attempted_bss_info
+ if last_bss_info.ssid == 's2':
+ break
- last_bss_info = c.wifi_for_band(band).last_attempted_bss_info
wvtest.WVPASSEQ(last_bss_info.ssid, 's2')
wvtest.WVPASSEQ(last_bss_info.bssid, '01:23:45:67:89:ab')
@@ -773,17 +449,10 @@
wvtest.WVPASS(c.internet())
wvtest.WVFAIL(c.client_up(band))
wvtest.WVPASS(c.wifi_for_band(band).current_routes())
- wvtest.WVPASS(c.tried_to_upload_logs())
- # Disable scan results again.
- c.interface_with_scan_results = None
+ wvtest.WVPASS(subprocess.upload_logs_and_wait.uploaded_logs())
c.run_until_interface_update()
check_tmp_hosts('192.168.1.100 %s\n127.0.0.1 localhost' % hostname)
- # Now, create a WLAN configuration which should be connected to.
- ssid = 'wlan'
- psk = 'password'
- c.write_wlan_config(band, ssid, psk)
- c.disable_access_point(band)
c.run_once()
wvtest.WVPASS(c.client_up(band))
wvtest.WVPASS(c.wifi_for_band(band).current_routes())
@@ -792,7 +461,7 @@
# Kill wpa_supplicant. conman should restart it.
wvtest.WVPASS(c.client_up(band))
wvtest.WVPASS(c._connected_to_wlan(c.wifi_for_band(band)))
- c.wifi_for_band(band).kill_wpa_supplicant_testonly(c._wpa_control_interface)
+ subprocess.mock('wifi', 'kill_wpa_supplicant', band)
wvtest.WVFAIL(c.client_up(band))
wvtest.WVFAIL(c._connected_to_wlan(c.wifi_for_band(band)))
# Make sure we stay connected to s2, rather than disconnecting and
@@ -806,18 +475,25 @@
# The AP restarts with a new configuration, kicking us off. We should
# reprovision.
- c._wlan_configuration[band].stale = True
- c.wifi_for_band(band).add_disconnected_event()
+ ssid = 'wlan2'
+ psk = 'password2'
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk)
+ subprocess.mock('wifi', 'remote_ap',
+ bssid='11:22:33:44:55:66',
+ ssid=ssid, psk=psk, band=band, security='WPA2')
+ subprocess.mock('wifi', 'disconnected_event', band)
c.run_once()
wvtest.WVFAIL(c.client_up(band))
wvtest.WVPASS(c._connected_to_open(c.wifi_for_band(band)))
- wvtest.WVPASSEQ(c.last_provisioning_attempt.ssid, 's2')
+ wvtest.WVPASSEQ(c.wifi_for_band(band).last_attempted_bss_info.ssid, 's2')
- # Now that we're on the provisioning network, create the new WLAN
- # configuration, which should be connected to.
- ssid = 'wlan2'
- psk = 'password2'
- c.write_wlan_config(band, ssid, psk)
+ # Overwrites previous one due to same BSSID.
+ subprocess.mock('wifi', 'remote_ap',
+ bssid='11:22:33:44:55:66',
+ ssid=ssid, psk=psk, band=band, security='WPA2')
+ # Run once for cwmp wakeup to get called, then once more for the new config to
+ # be received.
+ c.run_once()
c.run_once()
wvtest.WVPASS(c.client_up(band))
wvtest.WVPASS(c.wifi_for_band(band).wpa_status()['ssid'] == ssid)
@@ -829,23 +505,26 @@
# add the user's WLAN to the scan results, and scan again. This time, the
# first SSID tried should be 's3', which is now present in the scan results
# (with its SSID hidden, but included via vendor IE).
- c.delete_wlan_config(band)
- c.can_connect_to_s2 = False
- c.interface_with_scan_results = c.wifi_for_band(band).name
- c.scan_results_include_hidden = True
- c.run_until_interface_update_and_scan(band)
- wvtest.WVFAIL(c.has_status_files([status.P.CONNECTED_TO_WLAN]))
- c.run_until_interface_update()
- wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_OPEN]))
- wvtest.WVPASSEQ(c.last_provisioning_attempt.ssid, 's3')
- wvtest.WVPASSEQ(c.last_provisioning_attempt.bssid, 'ff:ee:dd:cc:bb:aa')
- # The log upload happens on the next main loop after joining s3.
- c.run_once()
- wvtest.WVPASS(c.tried_to_upload_logs())
+ subprocess.mock('cwmp', band, delete_config=True, write_now=True)
+ del c.wifi_for_band(band).cycler
+ _enable_basic_scan_results(band)
+ # Remove s2.
+ subprocess.mock('wifi', 'remote_ap_remove',
+ bssid='01:23:45:67:89:ab', band=band)
+ # Create s3.
+ subprocess.mock('wifi', 'remote_ap', bssid='ff:ee:dd:cc:bb:aa', ssid='s3',
+ band=band, security=None, hidden=True,
+ vendor_ies=(('f4:f5:e8', '01'), ('f4:f5:e8', '03 73 33')))
+ #### Now, recreate the same WLAN configuration, which should be connected to.
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk)
- # Now, recreate the same WLAN configuration, which should be connected to.
- # Also, test that atomic writes/renames work.
- c.write_wlan_config(band, ssid, psk, atomic=True)
+ c.run_until_interface_update_and_scan(band)
+ c.run_until_interface_update()
+ wvtest.WVPASSEQ(c.wifi_for_band(band).last_attempted_bss_info.ssid, 's3')
+ wvtest.WVPASSEQ(c.wifi_for_band(band).last_attempted_bss_info.bssid,
+ 'ff:ee:dd:cc:bb:aa')
+ wvtest.WVPASS(subprocess.upload_logs_and_wait.uploaded_logs())
+
c.run_once()
wvtest.WVPASS(c.client_up(band))
wvtest.WVPASS(c.wifi_for_band(band).current_routes())
@@ -853,7 +532,7 @@
# Now enable the AP. Since we have no wired connection, this should have no
# effect.
- c.enable_access_point(band)
+ subprocess.mock('cwmp', band, access_point=True, write_now=True)
c.run_once()
wvtest.WVPASS(c.client_up(band))
wvtest.WVPASS(c.wifi_for_band(band).current_routes())
@@ -865,7 +544,7 @@
# an AP.
c.set_ethernet(True)
c.bridge.set_connection_check_result('succeed')
- c.bridge.ip_testonly = '192.168.1.101'
+ subprocess.call(['ip', 'addr', 'add', '192.168.1.101', 'dev', c.bridge.name])
c.run_until_interface_update()
wvtest.WVPASS(c.access_point_up(band))
wvtest.WVFAIL(c.client_up(band))
@@ -876,7 +555,7 @@
# Now move (rather than delete) the configuration file. The AP should go
# away, and we should not be able to join the WLAN. Routes should not be
# affected.
- filename = wlan_config_filename(c._config_dir, band)
+ filename = subprocess.cwmp.wlan_config_filename(band)
other_filename = filename + '.bak'
os.rename(filename, other_filename)
c.run_once()
@@ -896,7 +575,7 @@
# Now delete the config and bring down the bridge and make sure we reprovision
# via the last working BSS.
- c.delete_wlan_config(band)
+ subprocess.mock('cwmp', band, delete_config=True, write_now=True)
c.bridge.set_connection_check_result('fail')
scan_count_for_band = c.wifi_for_band(band).wifi_scan_counter
c.run_until_interface_update()
@@ -907,6 +586,7 @@
check_tmp_hosts('192.168.1.101 %s\n127.0.0.1 localhost' % hostname)
# s3 is not what the cycler would suggest trying next.
wvtest.WVPASSNE('s3', c.wifi_for_band(band).cycler.peek())
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk)
# Run only once, so that only one BSS can be tried. It should be the s3 one,
# since that worked previously.
c.run_once()
@@ -914,18 +594,23 @@
# Make sure we didn't scan on `band`.
wvtest.WVPASSEQ(scan_count_for_band, c.wifi_for_band(band).wifi_scan_counter)
c.run_once()
- wvtest.WVPASS(c.tried_to_upload_logs())
+ wvtest.WVPASS(subprocess.upload_logs_and_wait.uploaded_logs())
- # Now re-create the WLAN config, connect to the WLAN, and make sure that s3 is
- # unset as last_successful_bss_info, since it is no longer available.
- c.write_wlan_config(band, ssid, psk)
+ # Now wait for the WLAN config to be created, connect to the WLAN, and make
+ # sure that s3 is unset as last_successful_bss_info, since it is no longer
+ # available.
c.run_once()
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
- c.can_connect_to_s3 = False
- c.scan_results_include_hidden = False
- c.delete_wlan_config(band)
+ # Remove s3.
+ subprocess.mock('wifi', 'remote_ap_remove',
+ bssid='ff:ee:dd:cc:bb:aa', band=band)
+ # Bring s2 back.
+ subprocess.mock('wifi', 'remote_ap', bssid='01:23:45:67:89:ab', ssid='s2',
+ band=band, security=None)
+
+ subprocess.mock('cwmp', band, delete_config=True, write_now=True)
c.run_once()
wvtest.WVPASSEQ(c.wifi_for_band(band).last_successful_bss_info, None)
@@ -939,7 +624,7 @@
# disconnecting.
# 4) Connect to s2 but get no ACS access; see that last_successful_bss_info is
# unset.
- c.write_wlan_config(band, ssid, psk)
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk, write_now=True)
# Connect
c.run_once()
# Process DHCP results
@@ -947,31 +632,37 @@
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
- c.delete_wlan_config(band)
+ subprocess.mock('cwmp', band, delete_config=True, write_now=True)
c.run_once()
wvtest.WVFAIL(c.wifi_for_band(band).acs())
- c.can_connect_to_s2 = True
# Give it time to try all BSSIDs. This means sleeping long enough that
# everything in the cycler is active, then doing n+1 loops (the n+1st loop is
# when we decide that the SSID in the nth loop was successful).
time.sleep(c._bssid_cycle_length_s)
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk)
for _ in range(len(c.wifi_for_band(band).cycler) + 1):
c.run_once()
- s2_bss = iw.BssInfo('01:23:45:67:89:ab', 's2')
+ s2_bss = iw.BssInfo(bssid='01:23:45:67:89:ab', ssid='s2', band=band)
wvtest.WVPASSEQ(c.wifi_for_band(band).last_successful_bss_info, s2_bss)
c.run_once()
- wvtest.WVPASS(c.tried_to_upload_logs())
+ wvtest.WVPASS(subprocess.upload_logs_and_wait.uploaded_logs())
- c.s2_fail = True
- c.write_wlan_config(band, ssid, psk)
+ # Make s2's connection check fail.
+ subprocess.mock('wifi', 'remote_ap', bssid='01:23:45:67:89:ab', ssid='s2',
+ band=band, security=None, connection_check_result='fail')
+
c.run_once()
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
-
wvtest.WVPASSEQ(c.wifi_for_band(band).last_successful_bss_info, s2_bss)
- c._wlan_configuration[band].stale = True
- c.wifi_for_band(band).add_disconnected_event()
+ # Disconnect.
+ ssid = 'wlan3'
+ psk = 'password3'
+ subprocess.mock('wifi', 'remote_ap',
+ bssid='11:22:33:44:55:66',
+ ssid=ssid, psk=psk, band=band, security='WPA2')
+ subprocess.mock('wifi', 'disconnected_event', band)
# Run once so that c will reconnect to s2.
c.run_once()
wvtest.WVPASS(c.wifi_for_band(band).connected_to_open())
@@ -986,16 +677,18 @@
# which lets us force a timeout and proceed to the next AP. Having a stale
# WLAN configuration shouldn't interrupt provisioning.
del c.wifi_for_band(band).cycler
- c.interface_with_scan_results = c.wifi_for_band(band).name
- c.scan_results_include_hidden = True
- c.can_connect_to_s3 = True
- c.dhcp_failure_on_s3 = True
+ subprocess.mock('wifi', 'remote_ap', bssid='ff:ee:dd:cc:bb:aa', ssid='s3',
+ band=band, security=None, hidden=True,
+ vendor_ies=(('f4:f5:e8', '01'), ('f4:f5:e8', '03 73 33')))
+ subprocess.mock('run-dhclient', c.wifi_for_band(band).name, failure=True)
+
# First iteration: check that we try s3.
c.run_until_scan(band)
last_bss_info = c.wifi_for_band(band).last_attempted_bss_info
wvtest.WVPASSEQ(last_bss_info.ssid, 's3')
wvtest.WVPASSEQ(last_bss_info.bssid, 'ff:ee:dd:cc:bb:aa')
- c.write_wlan_config(band, ssid, psk)
+ # Attempt to interrupt provisioning, make sure it doesn't work.
+ c._try_wlan_after[band] = 0
# Second iteration: check that we try s3 again since there's no gateway yet.
c.run_once()
last_bss_info = c.wifi_for_band(band).last_attempted_bss_info
@@ -1011,7 +704,8 @@
wvtest.WVPASSNE(last_bss_info.bssid, 'ff:ee:dd:cc:bb:aa')
# We can delete the stale WLAN config now, to simplify subsequent tests.
- c.delete_wlan_config(band)
+ # c.delete_wlan_config(band)
+ subprocess.mock('cwmp', band, delete_config=True, write_now=True)
# Now repeat the above, but for an ACS session that takes a while. We don't
# necessarily want to leave if it fails (so we don't want the third check),
@@ -1020,12 +714,13 @@
# Unlike DHCP, which we can always simulate working immediately above, it is
# wrong to simulate ACS sessions working for connections without ACS access.
# This means we can either always wait for the ACS session timeout in every
- # test above, making the tests much slower, or we can set that timeout to 0
- # and then be a little gross here and change it. The latter is unfortunately
- # the lesser evil, because slow tests are bad.
+ # test above, making the tests much slower, or we can set that timeout very
+ # low and then be a little gross here and change it. The latter is
+ # unfortunately the lesser evil, because slow tests are bad.
del c.wifi_for_band(band).cycler
- c.dhcp_failure_on_s3 = False
- c.acs_session_fails = True
+ subprocess.mock('run-dhclient', c.wifi_for_band(band).name, failure=False)
+ subprocess.mock('cwmp', band, acs_session_fails=True)
+
# First iteration: check that we try s3.
c.run_until_scan(band)
last_bss_info = c.wifi_for_band(band).last_attempted_bss_info
@@ -1050,8 +745,7 @@
# Finally, test successful provisioning.
del c.wifi_for_band(band).cycler
- c.dhcp_failure_on_s3 = False
- c.acs_session_fails = False
+ subprocess.mock('cwmp', band, acs_session_fails=False)
# First iteration: check that we try s3.
c.run_until_scan(band)
last_bss_info = c.wifi_for_band(band).last_attempted_bss_info
@@ -1092,25 +786,19 @@
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY,
- quantenna_interfaces=['wlan1', 'wlan1_portal', 'wcli1']
- )
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY)
def connection_manager_test_generic_ath9k_frenzy_2g(c):
connection_manager_test_generic(c, '2.4')
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY,
- quantenna_interfaces=['wlan1', 'wlan1_portal', 'wcli1']
- )
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY)
def connection_manager_test_generic_ath9k_frenzy_5g(c):
connection_manager_test_generic(c, '5')
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_FRENZY,
- quantenna_interfaces=['wlan0', 'wlan0_portal', 'wcli0']
- )
+@connection_manager_test(WIFI_SHOW_OUTPUT_FRENZY)
def connection_manager_test_generic_frenzy_5g(c):
connection_manager_test_generic(c, '5')
@@ -1123,24 +811,28 @@
Args:
c: The ConnectionManager set up by @connection_manager_test.
"""
+ ssid = 'my ssid'
+ psk = 'passphrase'
+
wvtest.WVPASSEQ(len(c._binwifi_commands), 2)
+
for band in ['2.4', '5']:
wvtest.WVPASS(('stop', '--band', band, '--persist') in c._binwifi_commands)
+ subprocess.mock('wifi', 'remote_ap',
+ bssid='11:22:33:44:55:66',
+ ssid=ssid, psk=psk, band=band, security='WPA2')
+
# Bring up ethernet, access.
c.set_ethernet(True)
c.run_once()
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
- ssid = 'my ssid'
- psk = 'passphrase'
-
# Bring up both access points.
- c.write_wlan_config('2.4', ssid, psk)
- c.enable_access_point('2.4')
- c.write_wlan_config('5', ssid, psk)
- c.enable_access_point('5')
+ for band in ('2.4', '5'):
+ subprocess.mock('cwmp', band, ssid=ssid, psk=psk, access_point=True,
+ write_now=True)
c.run_once()
wvtest.WVPASS(c.access_point_up('2.4'))
wvtest.WVPASS(c.access_point_up('5'))
@@ -1152,7 +844,7 @@
# Disable the 2.4 GHz AP, make sure the 5 GHz AP stays up. 2.4 GHz should
# join the WLAN.
- c.disable_access_point('2.4')
+ subprocess.mock('cwmp', '2.4', access_point=False, write_now=True)
c.run_until_interface_update()
wvtest.WVFAIL(c.access_point_up('2.4'))
wvtest.WVPASS(c.access_point_up('5'))
@@ -1163,7 +855,7 @@
# Delete the 2.4 GHz WLAN configuration; it should leave the WLAN but nothing
# else should change.
- c.delete_wlan_config('2.4')
+ subprocess.mock('cwmp', '2.4', delete_config=True, write_now=True)
c.run_until_interface_update()
wvtest.WVFAIL(c.access_point_up('2.4'))
wvtest.WVPASS(c.access_point_up('5'))
@@ -1175,7 +867,7 @@
# Disable the wired connection and remove the WLAN configurations. Both
# radios should scan. Wait for 5 GHz to scan, then enable scan results for
# 2.4. This should lead to ACS access.
- c.delete_wlan_config('5')
+ subprocess.mock('cwmp', '5', delete_config=True, write_now=True)
c.set_ethernet(False)
c.run_once()
wvtest.WVFAIL(c.acs())
@@ -1193,7 +885,7 @@
wvtest.WVFAIL(c.wifi_for_band('5').current_routes_normal_testonly())
# The next 2.4 GHz scan will have results.
- c.interface_with_scan_results = c.wifi_for_band('2.4').name
+ _enable_basic_scan_results('2.4')
c.run_until_scan('2.4')
# Now run for enough cycles that s2 will have been tried.
for _ in range(len(c.wifi_for_band('2.4').cycler)):
@@ -1204,7 +896,7 @@
wvtest.WVPASS(c.wifi_for_band('2.4').current_routes())
wvtest.WVFAIL(c.wifi_for_band('5').current_routes_normal_testonly())
c.run_once()
- wvtest.WVPASS(c.tried_to_upload_logs())
+ wvtest.WVPASS(subprocess.upload_logs_and_wait.uploaded_logs())
@wvtest.wvtest
@@ -1214,9 +906,7 @@
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY,
- quantenna_interfaces=['wlan1', 'wlan1_portal', 'wcli1']
- )
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY)
def connection_manager_test_dual_band_two_radios_ath9k_frenzy(c):
connection_manager_test_dual_band_two_radios(c)
@@ -1243,10 +933,10 @@
psk = 'passphrase'
# Enable both access points. Only 5 should be up.
- c.write_wlan_config('2.4', ssid, psk)
- c.enable_access_point('2.4')
- c.write_wlan_config('5', ssid, psk)
- c.enable_access_point('5')
+ subprocess.mock('cwmp', '2.4', ssid=ssid, psk=psk, access_point=True,
+ write_now=True)
+ subprocess.mock('cwmp', '5', ssid=ssid, psk=psk, access_point=True,
+ write_now=True)
c.run_once()
wvtest.WVFAIL(c.access_point_up('2.4'))
wvtest.WVPASS(c.access_point_up('5'))
@@ -1256,7 +946,7 @@
# Disable the 2.4 GHz AP; nothing should change. The 2.4 GHz client should
# not be up because the same radio is being used to run a 5 GHz AP.
- c.disable_access_point('2.4')
+ subprocess.mock('cwmp', '2.4', access_point=False, write_now=True)
c.run_until_interface_update()
wvtest.WVFAIL(c.access_point_up('2.4'))
wvtest.WVPASS(c.access_point_up('5'))
@@ -1266,7 +956,7 @@
wvtest.WVFAIL(c.wifi_for_band('5').current_routes_normal_testonly())
# Delete the 2.4 GHz WLAN configuration; nothing should change.
- c.delete_wlan_config('2.4')
+ subprocess.mock('cwmp', '2.4', delete_config=True, write_now=True)
c.run_once()
wvtest.WVFAIL(c.access_point_up('2.4'))
wvtest.WVPASS(c.access_point_up('5'))
@@ -1279,7 +969,7 @@
# should be a single scan that leads to ACS access. (It doesn't matter which
# band we specify in run_until_scan, since both bands point to the same
# interface.)
- c.delete_wlan_config('5')
+ subprocess.mock('cwmp', '5', delete_config=True, write_now=True)
c.set_ethernet(False)
c.run_once()
wvtest.WVFAIL(c.acs())
@@ -1288,7 +978,7 @@
wvtest.WVFAIL(c.wifi_for_band('5').current_routes_normal_testonly())
# The scan will have results that will lead to ACS access.
- c.interface_with_scan_results = c.wifi_for_band('2.4').name
+ _enable_basic_scan_results('2.4')
c.run_until_scan('5')
for _ in range(len(c.wifi_for_band('2.4').cycler)):
c.run_once()
@@ -1298,7 +988,7 @@
wvtest.WVPASS(c.wifi_for_band('2.4').current_routes())
wvtest.WVPASS(c.wifi_for_band('5').current_routes())
c.run_once()
- wvtest.WVPASSEQ(c.log_upload_count, 1)
+ wvtest.WVPASS(subprocess.upload_logs_and_wait.uploaded_logs())
@wvtest.wvtest
@@ -1320,6 +1010,12 @@
"""
# Make sure we've correctly set up the test; that there is no 5 GHz wifi
# interface.
+ ssid = 'my ssid'
+ psk = 'my psk'
+ subprocess.mock('wifi', 'remote_ap', band='2.4', ssid=ssid, psk=psk,
+ bssid='00:00:00:00:00:00', security='WPA2',
+ connection_check_result='succeed')
+
wvtest.WVPASSEQ(c.wifi_for_band('5'), None)
c.set_ethernet(True)
@@ -1327,17 +1023,17 @@
wvtest.WVPASS(c.internet())
# Make sure this doesn't crash.
- c.write_wlan_config('5', 'my ssid', 'my psk')
+ subprocess.mock('cwmp', '5', ssid=ssid, psk=psk, write_now=True)
c.run_once()
- c.enable_access_point('5')
+ subprocess.mock('cwmp', '5', access_point=True, write_now=True)
c.run_once()
- c.disable_access_point('5')
+ subprocess.mock('cwmp', '5', access_point=False, write_now=True)
c.run_once()
- c.delete_wlan_config('5')
+ subprocess.mock('cwmp', '5', delete_config=True, write_now=True)
c.run_once()
# Make sure 2.4 still works.
- c.write_wlan_config('2.4', 'my ssid', 'my psk')
+ subprocess.mock('cwmp', '2.4', ssid=ssid, psk=psk, write_now=True)
# Connect
c.run_once()
# Process DHCP results
@@ -1394,19 +1090,23 @@
# First, establish that we connect on 2.4 without the experiment, to make sure
# this test doesn't spuriously pass.
- c.write_wlan_config('2.4', 'my ssid', 'my psk')
+ ssid = 'my ssid'
+ psk = 'my psk'
+ subprocess.mock('wifi', 'remote_ap', ssid=ssid, psk=psk, band='2.4',
+ bssid='00:00:00:00:00:00')
+ subprocess.mock('cwmp', '2.4', ssid=ssid, psk=psk, write_now=True)
c.run_once()
wvtest.WVPASS(c.client_up('2.4'))
# Now, force a disconnect by deleting the config.
- c.delete_wlan_config('2.4')
+ subprocess.mock('cwmp', '2.4', delete_config=True, write_now=True)
c.run_once()
wvtest.WVFAIL(c.client_up('2.4'))
# Now enable the experiment, recreate the config, and make sure we don't
# connect.
experiment_testutils.enable('WifiNo2GClient')
- c.write_wlan_config('2.4', 'my ssid', 'my psk')
+ subprocess.mock('cwmp', '2.4', ssid=ssid, psk=psk, write_now=True)
c.run_once()
wvtest.WVFAIL(c.client_up('2.4'))
diff --git a/conman/interface.py b/conman/interface.py
index e71a322..66855dc 100755
--- a/conman/interface.py
+++ b/conman/interface.py
@@ -229,15 +229,12 @@
try:
logging.debug('%s calling ip route %s', self.name, ' '.join(args))
- return self._really_ip_route(*args)
+ return subprocess.check_output(self.IP_ROUTE + list(args))
except subprocess.CalledProcessError as e:
logging.error('Failed to call "ip route" with args %r: %s', args,
e.message)
return ''
- def _really_ip_route(self, *args):
- return subprocess.check_output(self.IP_ROUTE + list(args))
-
def _ip_addr_show(self):
try:
return subprocess.check_output(self.IP_ADDR_SHOW + [self.name])
@@ -546,13 +543,13 @@
def initialize(self):
"""Unset self.initial_ssid, which is only relevant during initialization."""
-
self.initial_ssid = None
super(Wifi, self).initialize()
def connected_to_open(self):
- return (self.wpa_status().get('wpa_state', None) == 'COMPLETED' and
- self.wpa_status().get('key_mgmt', None) == 'NONE')
+ status = self.wpa_status()
+ return (status.get('wpa_state', None) == 'COMPLETED' and
+ status.get('key_mgmt', None) == 'NONE')
# TODO(rofrankel): Remove this if and when the wpactrl failures are fixed.
def wpa_cli_status(self):
@@ -600,6 +597,7 @@
return self._client_mode
def detach(self):
+ self._events = []
raise wpactrl.error('Real WPACtrl always raises this when detaching.')
def pending(self):
diff --git a/conman/interface_test.py b/conman/interface_test.py
index 74363b8..f9c1e6d 100755
--- a/conman/interface_test.py
+++ b/conman/interface_test.py
@@ -5,8 +5,6 @@
import logging
import os
import shutil
-import socket
-import struct
import subprocess
import tempfile
import time
@@ -15,11 +13,6 @@
# pylint: disable=g-import-not-at-top
logging.basicConfig(level=logging.DEBUG)
-# This is in site-packages on the device, but not when running tests, and so
-# raises lint errors.
-# pylint: disable=g-bad-import-order
-import wpactrl
-
import experiment_testutils
import interface
from wvtest import wvtest
@@ -38,87 +31,13 @@
def __init__(self, *args, **kwargs):
super(FakeInterfaceMixin, self).__init__(*args, **kwargs)
self.set_connection_check_result('succeed')
- self.routing_table = {}
- self.ip_testonly = None
-
- def _connection_check(self, *args, **kwargs):
- result = super(FakeInterfaceMixin, self)._connection_check(*args, **kwargs)
- if not self.links:
- return False
- if (self.current_routes().get('default', {}).get('via', None) !=
- self._gateway_ip):
- return False
- return result
+ subprocess.ip.register_testonly(self.name)
def set_connection_check_result(self, result):
if result in ['succeed', 'fail', 'restricted']:
- # pylint: disable=invalid-name
- self.CONNECTION_CHECK = './test/' + result
+ subprocess.mock(self.CONNECTION_CHECK, self.name, result)
else:
- raise ValueError('Invalid fake connection_check script.')
-
- def _really_ip_route(self, *args):
- def can_add_route():
- def ip_to_int(ip):
- return struct.unpack('!I', socket.inet_pton(socket.AF_INET, ip))[0]
-
- if args[1] != 'default':
- return True
-
- via = ip_to_int(args[args.index('via') + 1])
- for (ifc, route, _), _ in self.routing_table.iteritems():
- if ifc != self.name:
- continue
-
- netmask = 0
- if '/' in route:
- route, netmask = route.split('/')
- netmask = 32 - int(netmask)
- route = ip_to_int(route)
-
- if (route >> netmask) == (via >> netmask):
- return True
-
- return False
-
- if not args:
- return '\n'.join(self.routing_table.values() +
- ['1.2.3.4/24 dev fake0 proto kernel scope link',
- # Non-subnet route, e.g. to NFS host.
- '1.2.3.1 dev %s proto kernel scope link' % self.name,
- 'default via 1.2.3.4 dev fake0',
- 'random junk'])
-
- metric = None
- if 'metric' in args:
- metric = args[args.index('metric') + 1]
- if args[0] in ('add', 'del'):
- route = args[1]
- key = (self.name, route, metric)
- if args[0] == 'add' and key not in self.routing_table:
- if not can_add_route():
- raise subprocess.CalledProcessError(
- 'Tried to add default route without subnet route: %r',
- self.routing_table)
- logging.debug('Adding route for %r', key)
- self.routing_table[key] = ' '.join(args[1:])
- elif args[0] == 'del':
- if key in self.routing_table:
- logging.debug('Deleting route for %r', key)
- del self.routing_table[key]
- elif key[2] is None:
- # pylint: disable=g-builtin-op
- for k in self.routing_table.keys():
- if k[:-1] == key[:-1]:
- logging.debug('Deleting route for %r (generalized from %s)', k, key)
- del self.routing_table[k]
- break
-
- def _ip_addr_show(self):
- if self.ip_testonly:
- return _IP_ADDR_SHOW_TPL.format(name=self.name, ip=self.ip_testonly)
-
- return ''
+ raise ValueError('Invalid fake connection_check value.')
def current_routes_normal_testonly(self):
result = self.current_routes()
@@ -129,244 +48,13 @@
pass
-class FakeWPACtrl(object):
- """Fake wpactrl.WPACtrl."""
-
- # pylint: disable=unused-argument
- def __init__(self, wpa_socket):
- self._socket = wpa_socket
- self.events = []
- self.attached = False
- self.connected = False
- self.ssid_testonly = None
- self.secure_testonly = False
- self.request_status_fails = False
-
- def pending(self):
- self.check_socket_exists('pending: socket does not exist')
- return bool(self.events)
-
- def recv(self):
- self.check_socket_exists('recv: socket does not exist')
- return self.events.pop(0)
-
- def attach(self):
- if not os.path.exists(self._socket):
- raise wpactrl.error('wpactrl_attach failed')
- self.attached = True
-
- def detach(self):
- self.attached = False
- self.ssid_testonly = None
- self.secure_testonly = False
- self.connected = False
- self.check_socket_exists('wpactrl_detach failed')
-
- def request(self, request_type):
- if request_type == 'STATUS':
- if self.request_status_fails:
- raise wpactrl.error('test error')
- return self.wpa_cli_status_testonly()
- else:
- raise ValueError('Invalid request_type %s' % request_type)
-
- @property
- def ctrl_iface_path(self):
- return os.path.split(self._socket)[0]
-
- # Below methods are not part of WPACtrl.
-
- def add_event(self, event):
- self.events.append(event)
-
- def add_connected_event(self):
- self.connected = True
- self.add_event(Wifi.CONNECTED_EVENT)
-
- def add_disconnected_event(self):
- self.connected = False
- self.add_event(Wifi.DISCONNECTED_EVENT)
-
- def add_terminating_event(self):
- self.connected = False
- self.add_event(Wifi.TERMINATING_EVENT)
-
- def check_socket_exists(self, msg='Fake socket does not exist'):
- if not os.path.exists(self._socket):
- raise wpactrl.error(msg)
-
- def wpa_cli_status_testonly(self):
- if self.connected:
- return ('foo\nwpa_state=COMPLETED\nssid=%s\nkey_mgmt=%s\nbar' %
- (self.ssid_testonly,
- 'WPA2-PSK' if self.secure_testonly else 'NONE'))
- else:
- return 'wpa_state=SCANNING\naddress=12:34:56:78:90:ab'
-
-
class Wifi(FakeInterfaceMixin, interface.Wifi):
"""Fake Wifi for testing."""
-
- CONNECTED_EVENT = '<2>CTRL-EVENT-CONNECTED'
- DISCONNECTED_EVENT = '<2>CTRL-EVENT-DISCONNECTED'
- TERMINATING_EVENT = '<2>CTRL-EVENT-TERMINATING'
-
- WPACtrl = FakeWPACtrl
-
- def __init__(self, *args, **kwargs):
- super(Wifi, self).__init__(*args, **kwargs)
- self._initial_ssid_testonly = None
- self._secure_testonly = False
-
- def attach_wpa_control(self, path):
- if self._initial_ssid_testonly and self._wpa_control:
- self._wpa_control.connected = True
- super(Wifi, self).attach_wpa_control(path)
-
- def get_wpa_control(self, *args, **kwargs):
- result = super(Wifi, self).get_wpa_control(*args, **kwargs)
- if self._initial_ssid_testonly:
- result.connected = True
- result.ssid_testonly = self._initial_ssid_testonly
- result.secure_testonly = self._secure_testonly
- return result
-
- def add_connected_event(self):
- if self.attached():
- self._wpa_control.add_connected_event()
-
- def add_disconnected_event(self):
- self._initial_ssid_testonly = None
- self._secure_testonly = False
- if self.attached():
- self._wpa_control.add_disconnected_event()
-
- def add_terminating_event(self):
- self._initial_ssid_testonly = None
- self._secure_testonly = False
- if self.attached():
- self._wpa_control.add_terminating_event()
-
- def detach_wpa_control(self):
- self._initial_ssid_testonly = None
- self._secure_testonly = False
- super(Wifi, self).detach_wpa_control()
-
- def wpa_cli_status(self):
- # This is just a convenient way of keeping things dry; the actual wpa_cli
- # status makes a subprocess call which returns the same string.
- return self._wpa_control.wpa_cli_status_testonly()
-
- def start_wpa_supplicant_testonly(self, path):
- wpa_socket = os.path.join(path, self.name)
- logging.debug('Starting fake wpa_supplicant for %s: %s',
- self.name, wpa_socket)
- open(wpa_socket, 'w')
-
- def kill_wpa_supplicant_testonly(self, path):
- logging.debug('Killing fake wpa_supplicant for %s', self.name)
- if self.attached():
- self.detach_wpa_control()
- os.unlink(os.path.join(path, self.name))
- else:
- raise RuntimeError('Trying to kill wpa_supplicant while not attached')
-
-
-class FrenzyWPACtrl(interface.FrenzyWPACtrl):
-
- def __init__(self, *args, **kwargs):
- super(FrenzyWPACtrl, self).__init__(*args, **kwargs)
- self.ssid_testonly = None
- self.secure_testonly = False
- self.request_status_fails = False
-
- def _qcsapi(self, *command):
- return self.fake_qcsapi.get(command[0], None)
-
- def add_connected_event(self):
- self.fake_qcsapi['get_mode'] = 'Station'
- self.fake_qcsapi['get_ssid'] = self.ssid_testonly
- security = 'PSKAuthentication' if self.secure_testonly else 'NONE'
- self.fake_qcsapi['ssid_get_authentication_mode'] = security
-
- def add_disconnected_event(self):
- self.ssid_testonly = None
- self.secure_testonly = False
- self.fake_qcsapi['get_ssid'] = None
- self.fake_qcsapi['ssid_get_authentication_mode'] = 'NONE'
-
- def add_terminating_event(self):
- self.ssid_testonly = None
- self.secure_testonly = False
- self.fake_qcsapi['get_ssid'] = None
- self.fake_qcsapi['get_mode'] = 'AP'
- self.fake_qcsapi['ssid_get_authentication_mode'] = 'NONE'
-
- def detach(self):
- self.add_terminating_event()
- super(FrenzyWPACtrl, self).detach()
+ pass
class FrenzyWifi(FakeInterfaceMixin, interface.FrenzyWifi):
- WPACtrl = FrenzyWPACtrl
-
- def __init__(self, *args, **kwargs):
- super(FrenzyWifi, self).__init__(*args, **kwargs)
- self._initial_ssid_testonly = None
- self._secure_testonly = False
- self.fake_qcsapi = {}
-
- def attach_wpa_control(self, *args, **kwargs):
- super(FrenzyWifi, self).attach_wpa_control(*args, **kwargs)
- if self._wpa_control:
- self._wpa_control.ssid_testonly = self._initial_ssid_testonly
- self._wpa_control.secure_testonly = self._secure_testonly
- if self._initial_ssid_testonly:
- self._wpa_control.add_connected_event()
-
- def get_wpa_control(self, *args, **kwargs):
- result = super(FrenzyWifi, self).get_wpa_control(*args, **kwargs)
- result.fake_qcsapi = self.fake_qcsapi
- if self._initial_ssid_testonly:
- result.fake_qcsapi['get_mode'] = 'Station'
- result.ssid_testonly = self._initial_ssid_testonly
- result.secure_testonly = self._secure_testonly
- result.add_connected_event()
- return result
-
- def add_connected_event(self):
- if self.attached():
- self._wpa_control.add_connected_event()
-
- def add_disconnected_event(self):
- self._initial_ssid_testonly = None
- self._secure_testonly = False
- if self.attached():
- self._wpa_control.add_disconnected_event()
-
- def add_terminating_event(self):
- self._initial_ssid_testonly = None
- self._secure_testonly = False
- if self.attached():
- self._wpa_control.add_terminating_event()
-
- def detach_wpa_control(self):
- self._initial_ssid_testonly = None
- self._secure_testonly = False
- super(FrenzyWifi, self).detach_wpa_control()
-
- def start_wpa_supplicant_testonly(self, unused_path):
- logging.debug('Starting fake wpa_supplicant for %s', self.name)
- self.fake_qcsapi['get_mode'] = 'Station'
-
- def kill_wpa_supplicant_testonly(self, unused_path):
- logging.debug('Killing fake wpa_supplicant for %s', self.name)
- if self.attached():
- # This happens to do what we need.
- self.add_terminating_event()
- self.detach_wpa_control()
- else:
- raise RuntimeError('Trying to kill wpa_supplicant while not attached')
+ pass
@wvtest.wvtest
@@ -443,7 +131,7 @@
wvtest.WVPASS(os.path.exists(autoprov_filepath))
wvtest.WVFAIL(b.get_ip_address())
- b.ip_testonly = '192.168.1.100'
+ subprocess.call(['ip', 'addr', 'add', '192.168.1.100', 'dev', b.name])
wvtest.WVPASSEQ(b.get_ip_address(), '192.168.1.100')
# Get a new gateway/subnet (e.g. due to joining a new network).
@@ -487,38 +175,40 @@
def generic_wifi_test(w, wpa_path):
# Not currently connected.
- w.start_wpa_supplicant_testonly(wpa_path)
+ # w.start_wpa_supplicant_testonly(wpa_path)
+ subprocess.wifi.WPA_PATH = wpa_path
w.attach_wpa_control(wpa_path)
wvtest.WVFAIL(w.wpa_supplicant)
- # pylint: disable=protected-access
- wpa_control = w._wpa_control
-
# wpa_supplicant connects.
- wpa_control.ssid_testonly = 'my=ssid'
- wpa_control.add_connected_event()
+ ssid = 'my=ssid'
+ psk = 'passphrase'
+ subprocess.mock('wifi', 'remote_ap', ssid=ssid, psk=psk, band='5',
+ bssid='00:00:00:00:00:00', connection_check_result='succeed')
+ subprocess.check_call(['wifi', 'setclient', '--ssid', ssid, '--band', '5'],
+ env={'WIFI_CLIENT_PSK': psk})
wvtest.WVFAIL(w.wpa_supplicant)
+ w.attach_wpa_control(wpa_path)
w.handle_wpa_events()
wvtest.WVPASS(w.wpa_supplicant)
w.set_gateway_ip('192.168.1.1')
# wpa_supplicant disconnects.
- wpa_control.add_disconnected_event()
+ subprocess.mock('wifi', 'disconnected_event', '5')
w.handle_wpa_events()
wvtest.WVFAIL(w.wpa_supplicant)
# Now, start over so we can test what happens when wpa_supplicant is already
# connected when we attach.
w.detach_wpa_control()
- # pylint: disable=protected-access
- w._initial_ssid_testonly = 'my=ssid'
w._initialized = False
+ subprocess.check_call(['wifi', 'setclient', '--ssid', ssid, '--band', '5'],
+ env={'WIFI_CLIENT_PSK': psk})
w.attach_wpa_control(wpa_path)
- wpa_control = w._wpa_control
# wpa_supplicant was already connected when we attached.
wvtest.WVPASS(w.wpa_supplicant)
- wvtest.WVPASSEQ(w.initial_ssid, 'my=ssid')
+ wvtest.WVPASSEQ(w.initial_ssid, ssid)
w.initialize()
wvtest.WVPASSEQ(w.initial_ssid, None)
@@ -527,8 +217,7 @@
wvtest.WVPASSNE(w.wpa_status(), {})
# The wpa_supplicant process disconnects and terminates.
- wpa_control.add_disconnected_event()
- wpa_control.add_terminating_event()
+ subprocess.check_call(['wifi', 'stopclient', '--band', '5'])
w.handle_wpa_events()
wvtest.WVFAIL(w.wpa_supplicant)
@@ -537,32 +226,42 @@
def wifi_test():
"""Test Wifi."""
w = Wifi('wcli0', '21')
- w.set_connection_check_result('succeed')
w.initialize()
try:
wpa_path = tempfile.mkdtemp()
+ conman_path = tempfile.mkdtemp()
+ subprocess.set_conman_paths(conman_path, None)
+ subprocess.mock('wifi', 'interfaces',
+ subprocess.wifi.MockInterface(phynum='0', bands=['5'],
+ driver='cfg80211'))
generic_wifi_test(w, wpa_path)
finally:
shutil.rmtree(wpa_path)
+ shutil.rmtree(conman_path)
@wvtest.wvtest
def frenzy_wifi_test():
"""Test FrenzyWifi."""
w = FrenzyWifi('wlan0', '20')
- w.set_connection_check_result('succeed')
w.initialize()
try:
wpa_path = tempfile.mkdtemp()
+ conman_path = tempfile.mkdtemp()
+ subprocess.set_conman_paths(conman_path, None)
+ subprocess.mock('wifi', 'interfaces',
+ subprocess.wifi.MockInterface(phynum='0', bands=['5'],
+ driver='frenzy'))
FrenzyWifi.WPACtrl.WIFIINFO_PATH = wifiinfo_path = tempfile.mkdtemp()
generic_wifi_test(w, wpa_path)
finally:
shutil.rmtree(wpa_path)
+ shutil.rmtree(conman_path)
shutil.rmtree(wifiinfo_path)
diff --git a/conman/iw.py b/conman/iw.py
index f2e15d8..8d80010 100755
--- a/conman/iw.py
+++ b/conman/iw.py
@@ -15,6 +15,7 @@
_BSSID_RE = r'BSS (?P<BSSID>([0-9a-f]{2}:?){6})\(on .*\)'
_SSID_RE = r'SSID: (?P<SSID>.*)'
_RSSI_RE = r'signal: (?P<RSSI>.*) dBm'
+_FREQ_RE = r'freq: (?P<freq>\d+)'
_VENDOR_IE_RE = (r'Vendor specific: OUI (?P<OUI>([0-9a-f]{2}:?){3}), '
'data:(?P<data>( [0-9a-f]{2})+)')
@@ -29,16 +30,17 @@
class BssInfo(object):
"""Contains info about a BSS, parsed from 'iw scan'."""
- def __init__(self, bssid='', ssid='', rssi=-100, security=None,
+ def __init__(self, bssid='', ssid='', rssi=0, band=None, security=None,
vendor_ies=None):
self.bssid = bssid
self.ssid = ssid
self.rssi = rssi
+ self.band = band
self.vendor_ies = vendor_ies or []
self.security = security or []
def __attrs(self):
- return (self.bssid, self.ssid, tuple(sorted(self.vendor_ies)),
+ return (self.bssid, self.ssid, self.band, tuple(sorted(self.vendor_ies)),
tuple(sorted(self.security)), self.rssi)
def __eq__(self, other):
@@ -52,9 +54,9 @@
return hash(self.__attrs())
def __repr__(self):
- return '<BssInfo: SSID=%s BSSID=%s Security=%s Vendor IEs=%s>' % (
- self.ssid, self.bssid, ','.join(self.security),
- ','.join('|'.join(ie) for ie in self.vendor_ies))
+ return ('<BssInfo: SSID=%s BSSID=%s Band=%s Security=%s Vendor IEs=%s>'
+ % (self.ssid, self.bssid, self.band, ','.join(self.security),
+ ','.join('|'.join(ie) for ie in self.vendor_ies)))
# TODO(rofrankel): waveguide also scans. Can we find a way to avoid two programs
@@ -79,6 +81,10 @@
if match:
bss_info.rssi = float(match.group('RSSI'))
continue
+ match = re.match(_FREQ_RE, line)
+ if match:
+ bss_info.band = '2.4' if match.group('freq').startswith('2') else '5'
+ continue
match = re.match(_VENDOR_IE_RE, line)
if match:
bss_info.vendor_ies.append((match.group('OUI'),
diff --git a/conman/iw_test.py b/conman/iw_test.py
index 55b2e7b..202d10c 100755
--- a/conman/iw_test.py
+++ b/conman/iw_test.py
@@ -2,633 +2,48 @@
"""Tests for iw.py."""
+import subprocess
+
import iw
from wvtest import wvtest
-SCAN_OUTPUT = """BSS 00:23:97:57:f4:d8(on wcli0)
- TSF: 1269828266773 usec (14d, 16:43:48)
- freq: 2437
- beacon interval: 100 TUs
- capability: ESS Privacy ShortSlotTime (0x0411)
- signal: -60.00 dBm
- last seen: 2190 ms ago
- Information elements from Probe Response frame:
- Vendor specific: OUI 00:11:22, data: 01 23 45 67
- SSID: short scan result
- Supported rates: 1.0* 2.0* 5.5* 11.0* 18.0 24.0 36.0 54.0
- DS Parameter set: channel 6
- ERP: <no flags>
- ERP D4.0: <no flags>
- Privacy: WEP
- Extended supported rates: 6.0 9.0 12.0 48.0
-BSS 94:b4:0f:f1:02:a0(on wcli0)
- TSF: 16233722683 usec (0d, 04:30:33)
- freq: 2412
- beacon interval: 100 TUs
- capability: ESS Privacy ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1531)
- signal: -54.00 dBm
- last seen: 2490 ms ago
- Information elements from Probe Response frame:
- SSID: Google
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 1
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- RSN: * Version: 1
- * Group cipher: CCMP
- * Pairwise ciphers: CCMP
- * Authentication suites: IEEE 802.1X
- * Capabilities: 4-PTKSA-RC 4-GTKSA-RC (0x0028)
- BSS Load:
- * station count: 0
- * channel utilisation: 33/255
- * available admission capacity: 25625 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 1
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 0
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-BSS 94:b4:0f:f1:35:60(on wcli0)
- TSF: 1739987968 usec (0d, 00:28:59)
- freq: 2462
- beacon interval: 100 TUs
- capability: ESS Privacy ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1531)
- signal: -39.00 dBm
- last seen: 1910 ms ago
- Information elements from Probe Response frame:
- SSID: Google
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 11
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- RSN: * Version: 1
- * Group cipher: CCMP
- * Pairwise ciphers: CCMP
- * Authentication suites: IEEE 802.1X
- * Capabilities: 4-PTKSA-RC 4-GTKSA-RC (0x0028)
- BSS Load:
- * station count: 0
- * channel utilisation: 49/255
- * available admission capacity: 26875 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 11
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-BSS 94:b4:0f:f1:35:61(on wcli0)
- TSF: 1739988134 usec (0d, 00:28:59)
- freq: 2462
- beacon interval: 100 TUs
- capability: ESS ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1521)
- signal: -38.00 dBm
- last seen: 1910 ms ago
- Information elements from Probe Response frame:
- SSID: GoogleGuest
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 11
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- BSS Load:
- * station count: 1
- * channel utilisation: 49/255
- * available admission capacity: 26875 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 11
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-BSS 94:b4:0f:f1:3a:e0(on wcli0)
- TSF: 24578560051 usec (0d, 06:49:38)
- freq: 2437
- beacon interval: 100 TUs
- capability: ESS Privacy ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1531)
- signal: -55.00 dBm
- last seen: 2310 ms ago
- Information elements from Probe Response frame:
- SSID: Google
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 6
- TIM: DTIM Count 0 DTIM Period 1 Bitmap Control 0x0 Bitmap[0] 0x0
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- RSN: * Version: 1
- * Group cipher: CCMP
- * Pairwise ciphers: CCMP
- * Authentication suites: IEEE 802.1X
- * Capabilities: 4-PTKSA-RC 4-GTKSA-RC (0x0028)
- BSS Load:
- * station count: 1
- * channel utilisation: 21/255
- * available admission capacity: 28125 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 6
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-BSS 94:b4:0f:f1:3a:e1(on wcli0)
- TSF: 24578576547 usec (0d, 06:49:38)
- freq: 2437
- beacon interval: 100 TUs
- capability: ESS ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1521)
- signal: -65.00 dBm
- last seen: 80 ms ago
- Information elements from Probe Response frame:
- SSID: GoogleGuest
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 6
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- BSS Load:
- * station count: 2
- * channel utilisation: 21/255
- * available admission capacity: 28125 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 6
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-
-BSS 94:b4:0f:f1:36:41(on wcli0)
- TSF: 12499149351 usec (0d, 03:28:19)
- freq: 2437
- beacon interval: 100 TUs
- capability: ESS ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1521)
- signal: -67.00 dBm
- last seen: 80 ms ago
- Information elements from Probe Response frame:
- SSID: GoogleGuest
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 6
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- BSS Load:
- * station count: 1
- * channel utilisation: 28/255
- * available admission capacity: 27500 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 6
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-BSS 94:b4:0f:f1:36:40(on wcli0)
- TSF: 12499150000 usec (0d, 03:28:19)
- freq: 2437
- beacon interval: 100 TUs
- capability: ESS Privacy ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1531)
- signal: -66.00 dBm
- last seen: 2350 ms ago
- Information elements from Probe Response frame:
- SSID: Google
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 6
- TIM: DTIM Count 0 DTIM Period 1 Bitmap Control 0x0 Bitmap[0] 0x0
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- RSN: * Version: 1
- * Group cipher: CCMP
- * Pairwise ciphers: CCMP
- * Authentication suites: IEEE 802.1X
- * Capabilities: 4-PTKSA-RC 4-GTKSA-RC (0x0028)
- BSS Load:
- * station count: 0
- * channel utilisation: 28/255
- * available admission capacity: 27500 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 6
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-BSS 94:b4:0f:f1:36:42(on wcli0)
- TSF: 12499150000 usec (0d, 03:28:19)
- freq: 2437
- beacon interval: 100 TUs
- capability: ESS Privacy ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1531)
- signal: -66.00 dBm
- last seen: 2350 ms ago
- Information elements from Probe Response frame:
- SSID:
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 6
- TIM: DTIM Count 0 DTIM Period 1 Bitmap Control 0x0 Bitmap[0] 0x0
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- BSS Load:
- * station count: 0
- * channel utilisation: 28/255
- * available admission capacity: 27500 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 6
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
- Vendor specific: OUI 00:11:22, data: 01 23 45 67
- Vendor specific: OUI f4:f5:e8, data: 01
- Vendor specific: OUI f4:f5:e8, data: 03 47 46 69 62 65 72 53 65 74 75 70 41 75 74 6f 6d 61 74 69 6f 6e
-BSS 00:1a:11:f1:36:43(on wcli0)
- TSF: 12499150000 usec (0d, 03:28:19)
- freq: 2437
- beacon interval: 100 TUs
- capability: ESS Privacy ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1531)
- signal: -66.00 dBm
- last seen: 2350 ms ago
- Information elements from Probe Response frame:
- SSID:
- Supported rates: 36.0* 48.0 54.0
- DS Parameter set: channel 6
- TIM: DTIM Count 0 DTIM Period 1 Bitmap Control 0x0 Bitmap[0] 0x0
- Country: US Environment: Indoor/Outdoor
- Channels [1 - 11] @ 36 dBm
- Power constraint: 0 dB
- TPC report: TX power: 3 dBm
- ERP: <no flags>
- BSS Load:
- * station count: 0
- * channel utilisation: 28/255
- * available admission capacity: 27500 [*32us]
- HT capabilities:
- Capabilities: 0x19ad
- RX LDPC
- HT20
- SM Power Save disabled
- RX HT20 SGI
- TX STBC
- RX STBC 1-stream
- Max AMSDU length: 7935 bytes
- DSSS/CCK HT40
- Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
- Minimum RX AMPDU time spacing: 4 usec (0x05)
- HT RX MCS rate indexes supported: 0-23
- HT TX MCS rate indexes are undefined
- HT operation:
- * primary channel: 6
- * secondary channel offset: no secondary
- * STA channel width: 20 MHz
- * RIFS: 1
- * HT protection: nonmember
- * non-GF present: 1
- * OBSS non-GF present: 1
- * dual beacon: 0
- * dual CTS protection: 0
- * STBC beacon: 0
- * L-SIG TXOP Prot: 0
- * PCO active: 0
- * PCO phase: 0
- Overlapping BSS scan params:
- * passive dwell: 20 TUs
- * active dwell: 10 TUs
- * channel width trigger scan interval: 300 s
- * scan passive total per channel: 200 TUs
- * scan active total per channel: 20 TUs
- * BSS width channel transition delay factor: 5
- * OBSS Scan Activity Threshold: 0.25 %
- Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
- WMM: * Parameter version 1
- * u-APSD
- * BE: CW 15-1023, AIFSN 3
- * BK: CW 15-1023, AIFSN 7
- * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
- * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
-"""
-
-
-# pylint: disable=unused-argument,protected-access
-def fake_scan(*args, **kwargs):
- return SCAN_OUTPUT
-iw._scan = fake_scan
+SCAN_RESULTS = (
+ {'rssi': -60, 'ssid': 'short scan result', 'bssid': '00:23:97:57:f4:d8',
+ 'vendor_ies': [('00:11:22', '01 23 45 67')], 'security': 'WEP'},
+ {'rssi': -54, 'ssid': 'Google', 'bssid': '94:b4:0f:f1:02:a0',
+ 'vendor_ies': [], 'security': 'WPA2'},
+ {'rssi': -39, 'ssid': 'Google', 'bssid': '94:b4:0f:f1:35:60',
+ 'vendor_ies': [], 'security': 'WPA2'},
+ {'rssi': -38, 'ssid': 'GoogleGuest', 'bssid': '94:b4:0f:f1:35:61',
+ 'vendor_ies': [], 'security': None},
+ {'rssi': -55, 'ssid': 'Google', 'bssid': '94:b4:0f:f1:3a:e0',
+ 'vendor_ies': [], 'security': 'WPA2'},
+ {'rssi': -65, 'ssid': 'GoogleGuest', 'bssid': '94:b4:0f:f1:3a:e1',
+ 'vendor_ies': [], 'security': None},
+ {'rssi': -67, 'ssid': 'GoogleGuest', 'bssid': '94:b4:0f:f1:36:41',
+ 'vendor_ies': [], 'security': None},
+ {'rssi': -66, 'ssid': 'Google', 'bssid': '94:b4:0f:f1:36:40',
+ 'vendor_ies': [], 'security': 'WPA2'},
+ {'rssi': -66, 'ssid': '', 'bssid': '94:b4:0f:f1:36:42',
+ 'vendor_ies': [('00:11:22', '01 23 45 67'), ('f4:f5:e8', '01'),
+ ('f4:f5:e8', '03 47 46 69 62 65 72 53 65 74 75 70 41 75 74 '
+ '6f 6d 61 74 69 6f 6e')], 'security': None},
+ {'rssi': -66, 'ssid': '', 'bssid': '00:1a:11:f1:36:43',
+ 'vendor_ies': [], 'security': None},
+)
@wvtest.wvtest
def find_bssids_test():
"""Test iw.find_bssids."""
+ subprocess.mock('wifi', 'interfaces',
+ subprocess.wifi.MockInterface(phynum='0', bands=['2.4', '5'],
+ driver='cfg80211'))
+ subprocess.call(['ifup', 'wcli0'])
+ for scan_result in SCAN_RESULTS:
+ subprocess.mock('wifi', 'remote_ap', band='5', **scan_result)
+
test_ie = ('00:11:22', '01 23 45 67')
provisioning_ie = ('f4:f5:e8', '01')
ssid_ie = (
@@ -637,48 +52,51 @@
)
short_scan_result = iw.BssInfo(ssid='short scan result',
bssid='00:23:97:57:f4:d8',
+ band='5',
rssi=-60,
security=['WEP'],
vendor_ies=[test_ie])
provisioning_bss_info = iw.BssInfo(ssid=iw.DEFAULT_GFIBERSETUP_SSID,
bssid='94:b4:0f:f1:36:42',
+ band='5',
rssi=-66,
vendor_ies=[test_ie, provisioning_ie,
ssid_ie])
provisioning_bss_info_frenzy = iw.BssInfo(ssid=iw.DEFAULT_GFIBERSETUP_SSID,
bssid='00:1a:11:f1:36:43',
+ band='5',
rssi=-66)
wvtest.WVPASSEQ(
- set(iw.find_bssids('wcli0', True)),
+ set(iw.find_bssids('2.4', True)),
set([(short_scan_result, 2.4),
(provisioning_bss_info, 5.34),
(provisioning_bss_info_frenzy, 4.34),
- (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:36:41', rssi=-67),
- 2.33),
- (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:3a:e1', rssi=-65),
- 2.35),
- (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:35:61', rssi=-38),
- 2.62),
- (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:36:40', rssi=-66,
- security=['WPA2']), 2.34),
- (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:3a:e0', rssi=-55,
- security=['WPA2']), 2.45),
- (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:35:60', rssi=-39,
- security=['WPA2']), 2.61),
- (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:02:a0', rssi=-54,
- security=['WPA2']), 2.46)]))
+ (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:36:41',
+ band='5', rssi=-67), 2.33),
+ (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:3a:e1',
+ band='5', rssi=-65), 2.35),
+ (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:35:61',
+ band='5', rssi=-38), 2.62),
+ (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:36:40', band='5',
+ rssi=-66, security=['WPA2']), 2.34),
+ (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:3a:e0', band='5',
+ rssi=-55, security=['WPA2']), 2.45),
+ (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:35:60', band='5',
+ rssi=-39, security=['WPA2']), 2.61),
+ (iw.BssInfo(ssid='Google', bssid='94:b4:0f:f1:02:a0', band='5',
+ rssi=-54, security=['WPA2']), 2.46)]))
wvtest.WVPASSEQ(
- set(iw.find_bssids('wcli0', False)),
+ set(iw.find_bssids('2.4', False)),
set([(provisioning_bss_info, 5.34),
(provisioning_bss_info_frenzy, 4.34),
- (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:36:41', rssi=-67),
- 2.33),
- (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:3a:e1', rssi=-65),
- 2.35),
- (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:35:61', rssi=-38),
- 2.62)]))
+ (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:36:41', band='5',
+ rssi=-67), 2.33),
+ (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:3a:e1', band='5',
+ rssi=-65), 2.35),
+ (iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:35:61', band='5',
+ rssi=-38), 2.62)]))
if __name__ == '__main__':
wvtest.wvtest_main()
diff --git a/conman/status.py b/conman/status.py
index 8f8d3a1..c5d4187 100644
--- a/conman/status.py
+++ b/conman/status.py
@@ -34,6 +34,7 @@
WAITING_FOR_PROVISIONING = 'WAITING_FOR_PROVISIONING'
WAITING_FOR_DHCP = 'WAITING_FOR_DHCP'
+ ACS_CONNECTION_CHECK = 'ACS_CONNECTION_CHECK'
WAITING_FOR_CWMP_WAKEUP = 'WAITING_FOR_CWMP_WAKEUP'
WAITING_FOR_ACS_SESSION = 'WAITING_FOR_ACS_SESSION'
PROVISIONING_COMPLETED = 'PROVISIONING_COMPLETED'
@@ -93,13 +94,18 @@
(P.WAITING_FOR_PROVISIONING,),
(P.WAITING_FOR_CWMP_WAKEUP, P.WAITING_FOR_ACS_SESSION),
),
+ P.ACS_CONNECTION_CHECK: (
+ (P.WAITING_FOR_PROVISIONING,),
+ (P.WAITING_FOR_DHCP, P.WAITING_FOR_CWMP_WAKEUP,
+ P.WAITING_FOR_ACS_SESSION),
+ ),
P.WAITING_FOR_CWMP_WAKEUP: (
(P.WAITING_FOR_PROVISIONING,),
- (P.WAITING_FOR_DHCP, P.WAITING_FOR_ACS_SESSION),
+ (P.WAITING_FOR_DHCP, P.ACS_CONNECTION_CHECK, P.WAITING_FOR_ACS_SESSION),
),
P.WAITING_FOR_ACS_SESSION: (
(P.WAITING_FOR_PROVISIONING,),
- (P.WAITING_FOR_DHCP, P.WAITING_FOR_CWMP_WAKEUP),
+ (P.WAITING_FOR_DHCP, P.ACS_CONNECTION_CHECK, P.WAITING_FOR_CWMP_WAKEUP),
),
P.PROVISIONING_COMPLETED: (
(),
diff --git a/conman/status_test.py b/conman/status_test.py
index befebbf..38dda83 100755
--- a/conman/status_test.py
+++ b/conman/status_test.py
@@ -137,8 +137,11 @@
check_exported(True, False, status.P.CONNECTED_TO_OPEN)
check_exported(True, False, status.P.WAITING_FOR_PROVISIONING)
check_exported(True, False, status.P.WAITING_FOR_DHCP)
- s.waiting_for_cwmp_wakeup = True
+ s.acs_connection_check = True
check_exported(False, False, status.P.WAITING_FOR_DHCP)
+ check_exported(True, False, status.P.ACS_CONNECTION_CHECK)
+ s.waiting_for_cwmp_wakeup = True
+ check_exported(False, False, status.P.ACS_CONNECTION_CHECK)
check_exported(True, False, status.P.WAITING_FOR_CWMP_WAKEUP)
s.waiting_for_acs_session = True
check_exported(False, False, status.P.WAITING_FOR_DHCP)
@@ -147,6 +150,7 @@
s.provisioning_completed = True
check_exported(False, False, status.P.WAITING_FOR_PROVISIONING)
check_exported(False, False, status.P.WAITING_FOR_DHCP)
+ check_exported(False, False, status.P.ACS_CONNECTION_CHECK)
check_exported(False, False, status.P.WAITING_FOR_CWMP_WAKEUP)
check_exported(False, False, status.P.WAITING_FOR_CWMP_WAKEUP)
diff --git a/conman/test/fail b/conman/test/fail
deleted file mode 100755
index 2bb8d86..0000000
--- a/conman/test/fail
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/sh
-
-exit 1
diff --git a/conman/test/fake_python/subprocess/__init__.py b/conman/test/fake_python/subprocess/__init__.py
new file mode 100644
index 0000000..3d73d4d
--- /dev/null
+++ b/conman/test/fake_python/subprocess/__init__.py
@@ -0,0 +1,149 @@
+#!/usr/bin/python
+
+"""subprocess replacement that implements specific programs in Python."""
+
+import importlib
+import logging
+import os
+import types
+
+logger = logging.getLogger('subprocess')
+logger.setLevel(logging.DEBUG)
+
+
+# Values are only for when the module name does not match the command name.
+_COMMAND_NAMES = {
+ 'connection_check': None,
+ 'cwmp': None,
+ 'get_quantenna_interfaces': 'get-quantenna-interfaces',
+ 'ifdown': None,
+ 'ifplugd_action': '/etc/ifplugd/ifplugd.action',
+ 'ifup': None,
+ 'ip': None,
+ 'register_experiment': None,
+ 'run_dhclient': 'run-dhclient',
+ 'qcsapi': None,
+ 'upload_logs_and_wait': 'upload-logs-and-wait',
+ 'wifi': None,
+ 'wpa_cli': None,
+}
+_COMMANDS = {v or k: importlib.import_module('.' + k, __name__)
+ for k, v in _COMMAND_NAMES.iteritems()}
+
+STDOUT = 1
+STDERR = 2
+
+
+class CalledProcessError(Exception):
+
+ def __init__(self, returncode, cmd, output):
+ super(CalledProcessError, self).__init__()
+ self.returncode = returncode
+ self.cmd = cmd
+ self.output = output
+
+ def __repr__(self):
+ return ('CalledProcessError: '
+ 'Command "%r" returned non-zero exit status %d: %s'
+ % (self.cmd, self.returncode, self.output))
+
+
+def _call(command, **kwargs):
+ """Fake subprocess call."""
+ if type(command) not in (tuple, list):
+ raise Exception('Fake subprocess.call only supports list/tuple commands, '
+ 'got: %s', command)
+
+ ignored_kwargs = ('stdout', 'stderr')
+ for ignored_kwarg in ignored_kwargs:
+ kwargs.pop(ignored_kwarg, None)
+ extra_env = kwargs.pop('env', {})
+ if kwargs:
+ raise Exception('Fake subprocess.call does not support these kwargs: %s'
+ % kwargs.keys())
+
+ logger.debug('%r%s', command, (', env %r' % extra_env) if extra_env else '')
+
+ command, args = command[0], command[1:]
+
+ if command not in _COMMANDS:
+ raise Exception('Fake subprocess.call does not support %r, supports %r' %
+ (command, _COMMANDS.keys()))
+
+ impl = _COMMANDS[command]
+ if isinstance(impl, types.ModuleType):
+ impl = impl.call
+
+ forwarded_kwargs = {}
+ if extra_env:
+ forwarded_kwargs['env'] = extra_env
+ return impl(*args, **forwarded_kwargs)
+
+
+def call(command, **kwargs):
+ rc, _ = _call(command, **kwargs)
+ return rc
+
+
+def check_call(command, **kwargs):
+ rc, output = _call(command, **kwargs)
+ if rc:
+ raise CalledProcessError(rc, command, output)
+ return True
+
+
+def check_output(command, **kwargs):
+ rc, output = _call(command, **kwargs)
+ if rc != 0:
+ raise CalledProcessError(rc, command, output)
+ return output
+
+
+def mock(command, *args, **kwargs):
+ _COMMANDS[command].mock(*args, **kwargs)
+
+
+def reset():
+ """Reset any module-level state."""
+ for command in _COMMANDS.itervalues():
+ if isinstance(command, types.ModuleType):
+ reload(command)
+
+
+def set_conman_paths(tmp_path=None, config_path=None, cwmp_path=None):
+ for command in ('run-dhclient', '/etc/ifplugd/ifplugd.action'):
+ _COMMANDS[command].CONMAN_PATH = tmp_path
+
+ for command in ('cwmp',):
+ _COMMANDS[command].CONMAN_CONFIG_PATH = config_path
+
+ for command in ('cwmp',):
+ _COMMANDS[command].CWMP_PATH = cwmp_path
+
+ # Make sure <tmp_path>/interfaces exists.
+ tmp_interfaces_path = os.path.join(tmp_path, 'interfaces')
+ if not os.path.exists(tmp_interfaces_path):
+ os.mkdir(tmp_interfaces_path)
+
+
+# Some tiny fake implementations don't need their own file.
+
+
+def echo(*s):
+ return 0, ' '.join(s)
+
+
+def env(extra_env, *command, **kwargs):
+ final_env = kwargs.get('env', {})
+ k, v = extra_env.split('=')
+ final_env[k] = v
+ kwargs['env'] = final_env
+ return _call(command, **kwargs)
+
+
+def timeout(unused_t, *command, **kwargs):
+ """Just a transparent pass-through."""
+ return _call(command, **kwargs)
+
+
+_COMMANDS.update({'echo': echo, 'env': env, 'timeout': timeout,})
diff --git a/conman/test/fake_python/subprocess/connection_check.py b/conman/test/fake_python/subprocess/connection_check.py
new file mode 100644
index 0000000..8c23c50
--- /dev/null
+++ b/conman/test/fake_python/subprocess/connection_check.py
@@ -0,0 +1,19 @@
+#!/usr/bin/python
+
+"""Fake connection_check implementation."""
+
+RESULTS = {}
+
+
+def mock(interface, result):
+ RESULTS[interface] = result
+
+
+def call(*args):
+ interface = args[args.index('-I') + 1]
+ result = RESULTS.get(interface, 'fail')
+
+ if result == 'restricted' and '-a' in args:
+ result = 'succeed'
+
+ return (0 if result == 'succeed' else 1), ''
diff --git a/conman/test/fake_python/subprocess/cwmp.py b/conman/test/fake_python/subprocess/cwmp.py
new file mode 100644
index 0000000..f34cd2f
--- /dev/null
+++ b/conman/test/fake_python/subprocess/cwmp.py
@@ -0,0 +1,148 @@
+#!/usr/bin/python
+
+"""Fake catawampus implementation."""
+
+import logging
+import os
+
+import connection_check
+
+
+logger = logging.getLogger('subprocess.cwmp')
+
+CONMAN_CONFIG_PATH = None
+CWMP_PATH = None
+CONFIG = {}
+ACCESS_POINT = {}
+ACS_SESSION_FAILS = False
+
+
+def call(command, env=None):
+ if command == 'wakeup':
+ if not CONMAN_CONFIG_PATH:
+ raise ValueError('Call subprocess.set_conman_paths before calling '
+ '"cwmp wakeup".')
+
+ write_acscontact()
+
+ if ACS_SESSION_FAILS:
+ return 0, ''
+
+ if ((env and 'write_now_testonly' in env) or
+ [result for result in connection_check.RESULTS.itervalues()
+ if result in ('restricted', 'succeed')]):
+ for band in ('2.4', '5'):
+ if CONFIG.get(band, None):
+ write_wlan_config(band)
+ else:
+ delete_wlan_config(band)
+ disable_access_point(band)
+
+ if ACCESS_POINT.get(band, False):
+ enable_access_point(band)
+ else:
+ disable_access_point(band)
+
+ logger.debug('Fake ACS session completing')
+ write_acsconnected()
+ else:
+ logger.debug('ACS session failed due to no working connections')
+
+ return 0, ''
+
+ raise ValueError('Fake cwmp only supports "wakeup" command.')
+
+
+def wlan_config_filename(band):
+ return os.path.join(CONMAN_CONFIG_PATH, 'command.%s' % band)
+
+
+def access_point_filename(band):
+ return os.path.join(CONMAN_CONFIG_PATH, 'access_point.%s' % band)
+
+
+def write_wlan_config(band):
+ final_filename = wlan_config_filename(band)
+ logger.debug('Writing config for band %s: %s', band, final_filename)
+ # We don't care which writes are atomic, as long as some but not all are.
+ # Making it depend on band achieves this.
+ atomic = band == '2.4'
+ filename = final_filename + ('.tmp' if atomic else '')
+ with open(filename, 'w') as f:
+ f.write('\n'.join(['env', 'WIFI_PSK=%s' % CONFIG[band]['psk'],
+ 'wifi', 'set', '--band', band,
+ '--ssid', CONFIG[band]['ssid']]))
+ logger.debug( 'wrote to filename %s', filename)
+ if atomic:
+ logger.debug( 'moving from %s to %s', filename, final_filename)
+ os.rename(filename, final_filename)
+
+
+def enable_access_point(band):
+ logger.debug('Enabling AP for band %s', band)
+ open(access_point_filename(band), 'w')
+
+
+def delete_wlan_config(band):
+ config_filename = wlan_config_filename(band)
+ if os.path.exists(config_filename):
+ logger.debug('Deleting config for band %s', band)
+ os.unlink(config_filename)
+
+
+def disable_access_point(band):
+ ap_filename = access_point_filename(band)
+ if os.path.isfile(ap_filename):
+ logger.debug('Disabling AP for band %s', band)
+ os.unlink(ap_filename)
+
+
+def write_acscontact():
+ logger.debug('ACS session started')
+ open(os.path.join(CWMP_PATH, 'acscontact'), 'w')
+
+
+def write_acsconnected():
+ logger.debug('ACS session completed')
+ open(os.path.join(CWMP_PATH, 'acsconnected'), 'w')
+
+
+def mock(band, access_point=None, delete_config=False, ssid=None, psk=None,
+ write_now=False, acs_session_fails=None):
+ """Mock the config written by catawampus.
+
+ Args:
+ band: The band for which things are being mocked.
+ access_point: Set to True or False to enable/disable the AP.
+ delete_config: Set to True to delete the config.
+ ssid: If updating config, the ssid to use. psk must also be set.
+ psk: If updating config, the psk to use. ssid must also be set.
+ write_now: If updating config, write it immediately.
+
+ Raises:
+ ValueError: If invalid values are specified.
+ """
+ if acs_session_fails is not None:
+ global ACS_SESSION_FAILS
+ ACS_SESSION_FAILS = acs_session_fails
+
+ if access_point is not None:
+ if access_point not in (True, False):
+ raise ValueError('access_point should only be mocked as True/False')
+ ACCESS_POINT[band] = access_point
+ logger.debug('AP mocked %s', access_point)
+
+ if delete_config:
+ logger.debug('Config mock removed for band %s', band)
+ CONFIG[band] = None
+ elif ssid and psk:
+ logger.debug('Config mock updated for band %s', band)
+ CONFIG[band] = {'ssid': ssid, 'psk': psk}
+ elif ssid or psk:
+ raise ValueError('Cannot set only one of ssid (%r) and psk (%r).',
+ ssid, psk)
+
+ if write_now:
+ call('wakeup', env={'write_now_testonly': True})
+
+
diff --git a/conman/test/fake_python/subprocess/get_quantenna_interfaces.py b/conman/test/fake_python/subprocess/get_quantenna_interfaces.py
new file mode 100644
index 0000000..7316139
--- /dev/null
+++ b/conman/test/fake_python/subprocess/get_quantenna_interfaces.py
@@ -0,0 +1,14 @@
+#!/usr/bin/python -S
+
+"""Fake get-quantenna-interfaces implementation."""
+
+_INTERFACES = []
+
+
+def call(*unused_args, **unused_kwargs):
+ return 0, '\n'.join(_INTERFACES)
+
+
+def mock(interfaces):
+ global _INTERFACES
+ _INTERFACES = list(interfaces)
diff --git a/conman/test/fake_python/subprocess/ifdown.py b/conman/test/fake_python/subprocess/ifdown.py
new file mode 100644
index 0000000..0677e4f
--- /dev/null
+++ b/conman/test/fake_python/subprocess/ifdown.py
@@ -0,0 +1,10 @@
+#!/usr/bin/python
+
+"""Fake ifdown implementation."""
+
+import ifup
+
+
+def call(interface):
+ ifup.INTERFACE_STATE[interface] = False
+ return 0, ''
diff --git a/conman/test/fake_python/subprocess/ifplugd_action.py b/conman/test/fake_python/subprocess/ifplugd_action.py
new file mode 100644
index 0000000..ab6a97a
--- /dev/null
+++ b/conman/test/fake_python/subprocess/ifplugd_action.py
@@ -0,0 +1,27 @@
+#!/usr/bin/python
+
+"""Fake ifplugd.action implementation."""
+
+import os
+
+import run_dhclient
+
+CONMAN_PATH = None
+
+
+def call(interface, state):
+ if CONMAN_PATH is None:
+ raise ValueError('Need to set subprocess.ifplugd_action.CONMAN_PATH')
+
+ if state not in ('up', 'down'):
+ raise ValueError('state should be "up" or "down"')
+
+ status_file = os.path.join(CONMAN_PATH, 'interfaces', interface)
+ with open(status_file, 'w') as f:
+ # This value doesn't matter to conman, so it's fine to hard code it here.
+ f.write('1' if state == 'up' else '0')
+
+ # ifplugd.action calls run-dhclient.
+ run_dhclient.call('br0' if interface in ('eth0', 'moca0') else interface)
+
+ return 0, ''
diff --git a/conman/test/fake_python/subprocess/ifup.py b/conman/test/fake_python/subprocess/ifup.py
new file mode 100644
index 0000000..7669555
--- /dev/null
+++ b/conman/test/fake_python/subprocess/ifup.py
@@ -0,0 +1,10 @@
+#!/usr/bin/python
+
+"""Fake ifup implementation."""
+
+INTERFACE_STATE = {}
+
+
+def call(interface):
+ INTERFACE_STATE[interface] = True
+ return 0, ''
diff --git a/conman/test/fake_python/subprocess/ip.py b/conman/test/fake_python/subprocess/ip.py
new file mode 100644
index 0000000..d8baaf3
--- /dev/null
+++ b/conman/test/fake_python/subprocess/ip.py
@@ -0,0 +1,136 @@
+#!/usr/bin/python
+
+"""Fake ip route implementation."""
+
+import logging
+import socket
+import struct
+
+import ifup
+
+
+_ROUTING_TABLE = {}
+_IP_TABLE = {}
+
+
+def call(subcommand, *args):
+ """Fake ip command."""
+ subcommands = {
+ 'route': _ip_route,
+ 'addr': _ip_addr,
+ 'link': _link,
+ }
+
+ if subcommand not in subcommands:
+ return 1, 'ip subcommand %r not supported' % subcommand
+
+ return subcommands[subcommand](args)
+
+
+def register_testonly(interface):
+ if interface not in _IP_TABLE:
+ _IP_TABLE[interface] = set()
+
+
+def _ip_route(args):
+ def can_add_route(dev):
+ def ip_to_int(ip_addr):
+ return struct.unpack('!I', socket.inet_pton(socket.AF_INET, ip_addr))[0]
+
+ if args[1] != 'default':
+ return True
+
+ via = ip_to_int(args[args.index('via') + 1])
+ for (ifc, route, _), _ in _ROUTING_TABLE.iteritems():
+ if ifc != dev:
+ continue
+
+ netmask = 0
+ if '/' in route:
+ route, netmask = route.split('/')
+ netmask = 32 - int(netmask)
+ route = ip_to_int(route)
+
+ if (route >> netmask) == (via >> netmask):
+ return True
+
+ return False
+
+ if not args:
+ return 0, '\n'.join(_ROUTING_TABLE.values())
+
+ if 'dev' not in args:
+ raise Exception('fake ip route got no dev')
+
+ dev = args[args.index('dev') + 1]
+
+ metric = None
+ if 'metric' in args:
+ metric = args[args.index('metric') + 1]
+ if args[0] in ('add', 'del'):
+ route = args[1]
+ key = (dev, route, metric)
+ if args[0] == 'add' and key not in _ROUTING_TABLE:
+ if not can_add_route(dev):
+ return (1, 'Tried to add default route without subnet route: %r' %
+ _ROUTING_TABLE)
+ logging.debug('Adding route for %r', key)
+ _ROUTING_TABLE[key] = ' '.join(args[1:])
+ elif args[0] == 'del':
+ if key in _ROUTING_TABLE:
+ logging.debug('Deleting route for %r', key)
+ del _ROUTING_TABLE[key]
+ elif key[2] is None:
+ # pylint: disable=g-builtin-op
+ for k in _ROUTING_TABLE.keys():
+ if k[:-1] == key[:-1]:
+ logging.debug('Deleting route for %r (generalized from %s)', k, key)
+ del _ROUTING_TABLE[k]
+ break
+
+ return 0, ''
+
+
+# pylint: disable=line-too-long
+_IP_ADDR_SHOW_TPL = """4: {name}: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
+ link/ether fe:fb:01:80:1b:74 brd ff:ff:ff:ff:ff:ff
+{ips}
+"""
+
+_IP_ADDR_SHOW_IP_TPL = """ inet {ip}/24 brd 100.100.255.255 scope global {name}
+ valid_lft forever preferred_lft forever
+"""
+
+
+def _ip_addr(args):
+ if 'dev' not in args:
+ raise Exception('fake ip addr show got no dev')
+
+ dev = args[args.index('dev') + 1]
+ if dev not in _IP_TABLE:
+ return 255, 'Device "%r" does not exist' % dev
+
+ if 'show' in args:
+ ips = '\n'.join(_IP_ADDR_SHOW_IP_TPL.format(name=dev, ip=addr)
+ for addr in _IP_TABLE[dev])
+ return 0, _IP_ADDR_SHOW_TPL.format(name=dev, ips=ips)
+
+ if 'add' in args:
+ add = args[args.index('add') + 1]
+ _IP_TABLE[dev].add(add)
+ return 0, ''
+
+ if 'del' in args:
+ remove = args[args.index('del') + 1]
+ if remove in _IP_TABLE[dev]:
+ _IP_TABLE[dev].remove(remove)
+ return 0, ''
+ return 254, 'RTNETLINK answers: Cannot assign requested address'
+
+ raise Exception('no recognized ip addr command in %r' % args)
+
+
+def _link(args):
+ return 0, '\n'.join('%s LOWER_UP' % interface
+ for interface, state in ifup.INTERFACE_STATE.iteritems()
+ if state)
diff --git a/conman/test/fake_python/subprocess/qcsapi.py b/conman/test/fake_python/subprocess/qcsapi.py
new file mode 100644
index 0000000..3625772
--- /dev/null
+++ b/conman/test/fake_python/subprocess/qcsapi.py
@@ -0,0 +1,25 @@
+#!/usr/bin/python -S
+
+"""Fake QCSAPI implementation."""
+
+
+STATE = {}
+
+
+def call(*args):
+ if args not in STATE:
+ return 1, 'No mocked value for args %r' % (args,)
+
+ return 0, STATE[args]
+
+
+def mock(*args, **kwargs):
+ import logging
+ if 'value' not in kwargs:
+ raise ValueError('Must specify value for mock qcsapi call %r' % args)
+ value = kwargs['value']
+ logging.debug ('qcsapi %r mocked: %r', args, value)
+ if value is None and args in STATE:
+ del STATE[args]
+ else:
+ STATE[args] = value
diff --git a/conman/test/fake_python/subprocess/register_experiment.py b/conman/test/fake_python/subprocess/register_experiment.py
new file mode 100644
index 0000000..a2dab49
--- /dev/null
+++ b/conman/test/fake_python/subprocess/register_experiment.py
@@ -0,0 +1,11 @@
+#!/usr/bin/python
+
+"""Fake register_experiment implementation."""
+
+
+REGISTERED_EXPERIMENTS = set()
+
+
+def call(experiment):
+ REGISTERED_EXPERIMENTS.add(experiment)
+ return 0, ''
diff --git a/conman/test/fake_python/subprocess/run_dhclient.py b/conman/test/fake_python/subprocess/run_dhclient.py
new file mode 100644
index 0000000..a1bffb3
--- /dev/null
+++ b/conman/test/fake_python/subprocess/run_dhclient.py
@@ -0,0 +1,36 @@
+#!/usr/bin/python
+
+"""Fake run-dhclient implementation."""
+
+import os
+
+
+CONMAN_PATH = None
+_FAILURE = {}
+
+
+def mock(interface, failure=False):
+ _FAILURE[interface] = failure
+
+
+def call(interface):
+ if CONMAN_PATH is None:
+ raise ValueError('Need to set subprocess.ifplugd_action.CONMAN_PATH')
+
+ if not _FAILURE.get(interface, False):
+ _write_subnet_file(interface)
+ _write_gateway_file(interface)
+
+
+def _write_gateway_file(interface):
+ gateway_file = os.path.join(CONMAN_PATH, 'gateway.' + interface)
+ with open(gateway_file, 'w') as f:
+ # This value doesn't matter to conman, so it's fine to hard code it here.
+ f.write('192.168.1.1')
+
+
+def _write_subnet_file(interface):
+ subnet_file = os.path.join(CONMAN_PATH, 'subnet.' + interface)
+ with open(subnet_file, 'w') as f:
+ # This value doesn't matter to conman, so it's fine to hard code it here.
+ f.write('192.168.1.0/24')
diff --git a/conman/test/fake_python/subprocess/upload_logs_and_wait.py b/conman/test/fake_python/subprocess/upload_logs_and_wait.py
new file mode 100644
index 0000000..6c45f87
--- /dev/null
+++ b/conman/test/fake_python/subprocess/upload_logs_and_wait.py
@@ -0,0 +1,18 @@
+#!/usr/bin/python
+
+"""Fake upload-logs-and-wait implementation."""
+
+UPLOADED = False
+
+
+def call():
+ global UPLOADED
+ UPLOADED = True
+ return 0, ''
+
+
+def uploaded_logs():
+ global UPLOADED
+ result = UPLOADED
+ UPLOADED = False
+ return result
diff --git a/conman/test/fake_python/subprocess/wifi.py b/conman/test/fake_python/subprocess/wifi.py
new file mode 100644
index 0000000..13d1be3
--- /dev/null
+++ b/conman/test/fake_python/subprocess/wifi.py
@@ -0,0 +1,323 @@
+#!/usr/bin/python
+
+"""Fake /bin/wifi implementation."""
+
+import collections
+import os
+import random
+
+import connection_check
+import get_quantenna_interfaces
+import ifplugd_action
+import ifup
+import qcsapi
+import wpa_cli
+
+
+MockInterface = collections.namedtuple('MockInterface',
+ ['phynum', 'bands', 'driver'])
+
+
+# A randomly selceted wifi scan result with the interesting stuff templated.
+WIFI_SCAN_TPL = '''BSS {bssid}(on wcli0)
+ TSF: 1269828266773 usec (14d, 16:43:48)
+ freq: {freq}
+ beacon interval: 100 TUs
+ capability: ESS Privacy ShortSlotTime (0x0411)
+ signal: {rssi}
+ last seen: 2190 ms ago
+ Information elements from Probe Response frame:
+ {vendor_ies}
+ SSID: {ssid}
+ Supported rates: 1.0* 2.0* 5.5* 11.0* 18.0 24.0 36.0 54.0
+ DS Parameter set: channel 6
+ ERP: <no flags>
+ ERP D4.0: <no flags>
+ {security}
+ Extended supported rates: 6.0 9.0 12.0 48.0
+'''
+
+VENDOR_IE_TPL = ' Vendor specific: OUI {oui}, data: {data}'
+
+
+WIFI_SHOW_TPL = '''Band: {band}
+RegDomain: US
+Interface: wlan{phynum} # {band} GHz ap
+BSSID: f4:f5:e8:81:1b:a0
+AutoChannel: True
+AutoType: NONDFS
+Station List for band: {band}
+
+Client Interface: wcli{phynum} # {band} GHz client
+Client BSSID: f4:f5:e8:81:1b:a1
+'''
+
+WIFI_SHOW_NO_RADIO_TPL = '''Band: {band}
+RegDomain: 00
+'''
+
+WPA_PATH = None
+REMOTE_ACCESS_POINTS = collections.defaultdict(dict)
+INTERFACE_FOR_BAND = collections.defaultdict(lambda: None)
+INTERFACE_EVENTS = collections.defaultdict(list)
+LOCAL_ACCESS_POINTS = {}
+CLIENT_ASSOCIATIONS = {}
+
+
+class AccessPoint(object):
+
+ def __init__(self, **kwargs):
+ for attr in ('ssid', 'psk', 'band', 'bssid', 'security', 'rssi',
+ 'vendor_ies', 'connection_check_result', 'hidden'):
+ setattr(self, attr, kwargs.get(attr, None))
+
+ def scan_str(self):
+ security_strs = {
+ 'WEP': ' Privacy: WEP',
+ 'WPA': ' WPA:',
+ 'WPA2': ' RSN: * Version: 1',
+ }
+ return WIFI_SCAN_TPL.format(
+ ssid=self.ssid if not self.hidden else '',
+ freq='2437' if self.band == '2.4' else '5160',
+ bssid=self.bssid,
+ vendor_ies='\n'.join(VENDOR_IE_TPL.format(oui=oui, data=data)
+ for oui, data in (self.vendor_ies or [])),
+ rssi='%.2f dBm' % (self.rssi or 0),
+ security=security_strs.get(self.security, ''))
+
+
+def call(*args, **kwargs):
+ wifi_commands = {
+ 'scan': _scan,
+ 'set': _set,
+ 'stopap': _stopap,
+ 'setclient': _setclient,
+ 'stopclient': _stopclient,
+ 'stop': _stop,
+ 'show': _show,
+ }
+
+ if WPA_PATH is None and args[0].endswith('client'):
+ raise ValueError('Set subprocess.wifi.WPA_PATH before calling a fake '
+ '"wifi *client" command')
+
+ if args[0] in wifi_commands:
+ return wifi_commands[args[0]](args[1:], env=kwargs.get('env', {}))
+
+ return 99, 'unrecognized command %s' % args[0]
+
+
+def _set(args, env=None):
+ band = _get_flag(args, ('-b', '--band'))
+ LOCAL_ACCESS_POINTS[band] = args, env
+ return 0, ''
+
+
+def _stopap(args, env=None):
+ bands = _get_flag(args, ('-b', '--band')) or '2.4 5'
+ for band in bands.split():
+ if band in LOCAL_ACCESS_POINTS:
+ del LOCAL_ACCESS_POINTS[band]
+
+ return 0, ''
+
+
+def _setclient(args, env=None):
+ env = env or {}
+
+ band = _get_flag(args, ('-b', '--band'))
+ bssid = _get_flag(args, ('--bssid',))
+ ssid = _get_flag(args, ('S', '--ssid',))
+
+ if band not in INTERFACE_FOR_BAND:
+ raise ValueError('No interface for band %r' % band)
+
+ interface = INTERFACE_FOR_BAND[band]
+ interface_name = 'wcli%s' % interface.phynum
+
+ if bssid:
+ ap = REMOTE_ACCESS_POINTS[band].get(bssid, None)
+ if not ap or ap.ssid != ssid:
+ _setclient_error_not_found(interface_name, ssid, interface.driver)
+ return 1, ('AP with band %r and BSSID %r and ssid %s not found'
+ % (band, bssid, ssid))
+ elif ssid:
+ candidates = [ap for ap in REMOTE_ACCESS_POINTS[band].itervalues()
+ if ap.ssid == ssid]
+ if not candidates:
+ _setclient_error_not_found(interface_name, ssid, interface.driver)
+ return 1, 'AP with SSID %r not found' % ssid
+ ap = random.choice(candidates)
+ else:
+ raise ValueError('Did not specify BSSID or SSID in %r' % args)
+
+ psk = env.get('WIFI_CLIENT_PSK', None)
+ if psk != ap.psk:
+ _setclient_error_auth(interface_name, ssid, interface.driver)
+ return 1, 'Wrong PSK, got %r, expected %r' % (psk, ap.psk)
+
+ _setclient_success(interface_name, ssid, bssid, psk, interface.driver, ap,
+ band)
+
+ return 0, ''
+
+
+def _setclient_error_not_found(interface_name, ssid, driver):
+ if driver == 'cfg80211':
+ wpa_cli.mock(interface_name, wpa_state='SCANNING')
+ elif driver == 'frenzy':
+ qcsapi.mock('get_mode', 'wifi0', value='Station')
+ qcsapi.mock('get_ssid', 'wifi0', value='')
+ qcsapi.mock('ssid_get_authentication_mode', 'wifi0', ssid, value='')
+ qcsapi.mock('get_status', 'wifi0', value='Error')
+
+ CLIENT_ASSOCIATIONS[interface_name] = None
+
+
+def _setclient_error_auth(interface_name, ssid, driver):
+ if driver == 'cfg80211':
+ # This is what our version of wpa_supplicant does for auth failures.
+ INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-SSID-TEMP-DISABLED')
+ wpa_cli.mock(interface_name, wpa_state='SCANNING')
+ elif driver == 'frenzy':
+ qcsapi.mock('get_mode', 'wifi0', value='Station')
+ qcsapi.mock('get_ssid', 'wifi0', value='')
+ qcsapi.mock('ssid_get_authentication_mode', 'wifi0', ssid, value='')
+ qcsapi.mock('get_status', 'wifi0', value='Error')
+
+ CLIENT_ASSOCIATIONS[interface_name] = None
+
+
+def _setclient_success(interface_name, ssid, bssid, psk, driver, ap, band):
+ if CLIENT_ASSOCIATIONS.get(interface_name, None):
+ _disconnected_event(band)
+ if driver == 'cfg80211':
+ # Make sure the wpa_supplicant socket exists.
+ open(os.path.join(WPA_PATH, interface_name), 'w')
+
+ # Tell wpa_cli what to return.
+ key_mgmt = 'WPA2-PSK' if psk else 'NONE'
+ wpa_cli.mock(interface_name, wpa_state='COMPLETED', ssid=ssid, bssid=bssid,
+ key_mgmt=key_mgmt)
+
+ # Send the CONNECTED event.
+ INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-CONNECTED')
+
+ elif driver == 'frenzy':
+ qcsapi.mock('get_mode', 'wifi0', value='Station')
+ qcsapi.mock('get_ssid', 'wifi0', value=ssid)
+ qcsapi.mock('ssid_get_authentication_mode', 'wifi0', ssid,
+ value='PSKAuthentication' if psk else 'NONE')
+ qcsapi.mock('get_status', 'wifi0', value='')
+
+ CLIENT_ASSOCIATIONS[interface_name] = ap
+ connection_check.mock(interface_name, ap.connection_check_result or 'succeed')
+
+ # Call ifplugd.action for the interface coming up (wifi/quantenna.py does this
+ # manually).
+ ifplugd_action.call(interface_name, 'up')
+
+
+def _disconnected_event(band):
+ interface = INTERFACE_FOR_BAND[band]
+ interface_name = 'wcli%s' % interface.phynum
+ if interface.driver == 'cfg80211':
+ INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-DISCONNECTED')
+ wpa_cli.mock(interface_name, wpa_state='SCANNING')
+ else:
+ qcsapi.mock('get_ssid', 'wifi0', value='')
+ qcsapi.mock('get_status', 'wifi0', value='Error')
+
+ CLIENT_ASSOCIATIONS[interface_name] = None
+
+
+def _stopclient(args, env=None):
+ bands = _get_flag(args, ('-b', '--band')) or '2.4 5'
+ for band in bands.split():
+ interface = INTERFACE_FOR_BAND[band]
+ interface_name = 'wcli%s' % interface.phynum
+
+ if interface.driver == 'cfg80211':
+ # Send the DISCONNECTED and TERMINATING events.
+ INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-DISCONNECTED')
+ INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-TERMINATING')
+
+ # Clear the wpa_cli status response.
+ wpa_cli.mock(interface_name)
+
+ # Make sure the wpa_supplicant socket does not.
+ if os.path.exists(os.path.join(WPA_PATH, interface_name)):
+ os.unlink(os.path.join(WPA_PATH, interface_name))
+
+ elif interface.driver == 'frenzy':
+ qcsapi.mock('get_ssid', 'wifi0', value='')
+ qcsapi.mock('get_status', 'wifi0', value='Error')
+
+ CLIENT_ASSOCIATIONS[interface_name] = None
+
+ # Call ifplugd.action for the interface going down (wifi/quantenna.py does this
+ # manually).
+ ifplugd_action.call(interface_name, 'down')
+
+ return 0, ''
+
+
+def _stop(*args, **kwargs):
+ _stopap(*args, **kwargs)
+ _stopclient(*args, **kwargs)
+ return 0, ''
+
+
+def _kill_wpa_supplicant(band):
+ # From conman's perspective, there's no difference between someone running
+ # 'wifi stopclient' and the process dying for some other reason.
+ _stopclient(['--band', band])
+
+
+def _scan(args, **unused_kwargs):
+ band_flag = _get_flag(args, ('-b', '--band'))
+ interface = INTERFACE_FOR_BAND[band_flag]
+ interface_name = 'wcli%s' % interface.phynum
+ if not ifup.INTERFACE_STATE.get(interface_name, False):
+ return 1, 'interface down'
+
+ return 0, '\n'.join(ap.scan_str()
+ for band in interface.bands
+ for ap in REMOTE_ACCESS_POINTS[band].itervalues())
+
+
+def _show(unused_args, **unused_kwargs):
+ return 0, '\n\n'.join(WIFI_SHOW_TPL.format(band=band, **interface._asdict()) if interface
+ else WIFI_SHOW_NO_RADIO_TPL.format(band)
+ for band, interface in INTERFACE_FOR_BAND.iteritems())
+
+
+def _get_flag(args, flags):
+ for flag in flags:
+ if flag in args:
+ return args[args.index(flag) + 1]
+
+
+def mock(command, *args, **kwargs):
+ if command == 'remote_ap':
+ remote_ap = AccessPoint(**kwargs)
+ REMOTE_ACCESS_POINTS[kwargs['band']][kwargs['bssid']] = remote_ap
+ elif command == 'remote_ap_remove':
+ del REMOTE_ACCESS_POINTS[kwargs['band']][kwargs['bssid']]
+ elif command == 'interfaces':
+ INTERFACE_FOR_BAND.clear()
+ for interface in args:
+ for band in interface.bands:
+ INTERFACE_FOR_BAND[band] = interface
+ if interface.driver == 'frenzy':
+ get_quantenna_interfaces.mock(
+ fmt % interface.phynum
+ for fmt in ('wlan%s', 'wlan%s_portal', 'wcli%s'))
+ elif command == 'wpa_path':
+ global WPA_PATH
+ WPA_PATH = args[0]
+ elif command == 'disconnected_event':
+ _disconnected_event(args[0])
+ elif command == 'kill_wpa_supplicant':
+ _kill_wpa_supplicant(args[0])
diff --git a/conman/test/fake_python/subprocess/wpa_cli.py b/conman/test/fake_python/subprocess/wpa_cli.py
new file mode 100644
index 0000000..c1849a9
--- /dev/null
+++ b/conman/test/fake_python/subprocess/wpa_cli.py
@@ -0,0 +1,37 @@
+#!/usr/bin/python
+
+"""Fake wpa_cli implementation. Used by fake WPACtrl too."""
+
+import ifdown
+import ifup
+
+
+_INTERFACE_STATE = {}
+
+
+def call(*args, **unused_kwargs):
+ if 'status' not in args:
+ raise ValueError('Fake wpa_cli can only do status requests.')
+
+ if '-i' not in args:
+ raise ValueError('Must specify interface with -i.')
+
+ interface = args[args.index('-i') + 1]
+
+ # Fails for not present or empty dict.
+ if not _INTERFACE_STATE.get(interface, None):
+ return 1, ('Failed to connect to non-global ctrl_ifname: %r '
+ 'error: No such file or directory' % interface)
+
+ state = _INTERFACE_STATE[interface]
+
+ return 0, '\n'.join('%s=%s' % (k, v) for k, v in state.iteritems())
+
+
+# Pass no kwargs to "kill" wpa_supplicant.
+def mock(interface, **kwargs):
+ _INTERFACE_STATE[interface] = {k: v for k, v in kwargs.iteritems() if v}
+ if kwargs:
+ ifup.call(interface)
+ else:
+ ifdown.call(interface)
diff --git a/conman/test/fake_python/wpactrl.py b/conman/test/fake_python/wpactrl.py
new file mode 100644
index 0000000..3d8e300
--- /dev/null
+++ b/conman/test/fake_python/wpactrl.py
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+
+"""Fake WPACtrl implementation."""
+
+import os
+
+import subprocess
+import subprocess.wifi
+
+
+CONNECTED_EVENT = '<2>CTRL-EVENT-CONNECTED'
+DISCONNECTED_EVENT = '<2>CTRL-EVENT-DISCONNECTED'
+TERMINATING_EVENT = '<2>CTRL-EVENT-TERMINATING'
+
+
+# pylint: disable=invalid-name
+class error(Exception):
+ pass
+
+
+class WPACtrl(object):
+ """Fake wpactrl.WPACtrl."""
+
+ # pylint: disable=unused-argument
+ def __init__(self, wpa_socket):
+ self._socket = wpa_socket
+ self.interface_name = os.path.split(self._socket)[-1]
+ self.attached = False
+ self.connected = False
+ self.request_status_fails = False
+ self._clear_events()
+
+ def pending(self):
+ return bool(subprocess.wifi.INTERFACE_EVENTS[self.interface_name])
+
+ def recv(self):
+ return subprocess.wifi.INTERFACE_EVENTS[self.interface_name].pop(0)
+
+ def attach(self):
+ if not os.path.exists(self._socket):
+ raise error('wpactrl_attach failed')
+ self.attached = True
+
+ def detach(self):
+ self.attached = False
+ self.connected = False
+ self.check_socket_exists('wpactrl_detach failed')
+ self._clear_events()
+
+ def request(self, request_type):
+ if request_type == 'STATUS':
+ if self.request_status_fails:
+ raise error('test error')
+ try:
+ return subprocess.check_output(['wpa_cli', '-i', self.interface_name,
+ 'status'])
+ except subprocess.CalledProcessError as e:
+ raise error(e.output)
+ else:
+ raise ValueError('Invalid request_type %s' % request_type)
+
+ @property
+ def ctrl_iface_path(self):
+ return os.path.split(self._socket)[0]
+
+ # Below methods are not part of WPACtrl.
+
+ def check_socket_exists(self, msg='Socket does not exist'):
+ if not os.path.exists(self._socket):
+ raise error(msg)
+
+ def _clear_events(self):
+ subprocess.wifi.INTERFACE_EVENTS[self.interface_name] = []
diff --git a/conman/test/fake_wpactrl/wpactrl/__init__.py b/conman/test/fake_wpactrl/wpactrl/__init__.py
deleted file mode 100644
index b8ce1fd..0000000
--- a/conman/test/fake_wpactrl/wpactrl/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-class error(Exception):
- pass
-
-class WPACtrl(object):
- pass
diff --git a/conman/test/restricted b/conman/test/restricted
deleted file mode 100755
index 9bd8fc3..0000000
--- a/conman/test/restricted
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/sh
-
-echo "$@" | grep -q -- "-a"
-
diff --git a/conman/test/succeed b/conman/test/succeed
deleted file mode 100755
index c52d3c2..0000000
--- a/conman/test/succeed
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/sh
-
-exit 0