Merge wifitv into master
Change-Id: Ia4edb3026ce2bb2a5beca30781a9d69786920113
diff --git a/conman/connection_manager.py b/conman/connection_manager.py
index a2de7d6..aa1b320 100755
--- a/conman/connection_manager.py
+++ b/conman/connection_manager.py
@@ -12,6 +12,9 @@
import subprocess
import time
+# This is in site-packages on the device, but not when running tests, and so
+# raises lint errors.
+# pylint: disable=g-bad-import-order
import pyinotify
import cycler
@@ -50,21 +53,21 @@
class WLANConfiguration(object):
"""Represents a WLAN configuration from cwmpd."""
- WIFI_STOPAP = ['wifi', 'stopap']
+ WIFI_STOPAP = ['wifi', 'stopap', '--persist']
WIFI_SETCLIENT = ['wifi', 'setclient', '--persist']
- WIFI_STOPCLIENT = ['wifi', 'stopclient']
+ WIFI_STOPCLIENT = ['wifi', 'stopclient', '--persist']
- def __init__(self, band, wifi, command_lines, _status):
+ def __init__(self, band, wifi, command_lines, _status, wpa_control_interface):
self.band = band
self.wifi = wifi
self.command = command_lines.splitlines()
self.access_point_up = False
- self.client_up = False
self.ssid = None
self.passphrase = None
self.interface_suffix = None
self.access_point = None
self._status = _status
+ self._wpa_control_interface = wpa_control_interface
binwifi_option_attrs = {
'-s': 'ssid',
@@ -89,7 +92,12 @@
if self.wifi.initial_ssid == self.ssid:
logging.debug('Connected to WLAN at startup')
- self.client_up = True
+
+ @property
+ def client_up(self):
+ wpa_cli_status = self.wifi.wpa_cli_status()
+ return (wpa_cli_status.get('wpa_state') == 'COMPLETED'
+ and wpa_cli_status.get('ssid') == self.ssid)
def start_access_point(self):
"""Start an access point."""
@@ -127,12 +135,11 @@
def start_client(self):
"""Join the WLAN as a client."""
- if self.client_up:
+ up = self.client_up
+ if up:
logging.debug('Wifi client already started on %s GHz', self.band)
return
- self.wifi.detach_wpa_control()
-
command = self.WIFI_SETCLIENT + ['--ssid', self.ssid, '--band', self.band]
env = dict(os.environ)
if self.passphrase:
@@ -140,11 +147,13 @@
try:
self._status.trying_wlan = True
subprocess.check_output(command, stderr=subprocess.STDOUT, env=env)
- self.client_up = True
- self._status.connected_to_wlan = True
- logging.info('Started wifi client on %s GHz', self.band)
except subprocess.CalledProcessError as e:
logging.error('Failed to start wifi client: %s', e.output)
+ return
+
+ self._status.connected_to_wlan = True
+ logging.info('Started wifi client on %s GHz', self.band)
+ self.wifi.attach_wpa_control(self._wpa_control_interface)
def stop_client(self):
if not self.client_up:
@@ -156,7 +165,6 @@
try:
subprocess.check_output(self.WIFI_STOPCLIENT + ['-b', self.band],
stderr=subprocess.STDOUT)
- self.client_up = False
# TODO(rofrankel): Make this work for dual-radio devices.
self._status.connected_to_wlan = False
logging.debug('Stopped wifi client on %s GHz', self.band)
@@ -170,6 +178,7 @@
# pylint: disable=invalid-name
Bridge = interface.Bridge
Wifi = interface.Wifi
+ FrenzyWifi = interface.FrenzyWifi
WLANConfiguration = WLANConfiguration
ETHERNET_STATUS_FILE = 'eth0'
@@ -184,6 +193,7 @@
IFUP = ['ifup']
IP_LINK = ['ip', 'link']
IFPLUGD_ACTION = ['/etc/ifplugd/ifplugd.action']
+ BINWIFI = ['wifi']
def __init__(self,
bridge_interface='br0',
@@ -220,23 +230,7 @@
bridge_interface, '10',
acs_autoprovisioning_filepath=acs_autoprov_filepath)
- # If we have multiple wcli interfaces, 5 GHz-only < both < 2.4 GHz-only.
- def metric_for_bands(bands):
- if '5' in bands:
- if '2.4' in bands:
- return interface.METRIC_24GHZ_5GHZ
- return interface.METRIC_5GHZ
- return interface.METRIC_24GHZ
-
- self.wifi = sorted([self.Wifi(interface_name, metric_for_bands(bands),
- # Prioritize 5 GHz over 2.4.
- bands=sorted(bands, reverse=True))
- for interface_name, bands
- in get_client_interfaces().iteritems()],
- key=lambda w: w.metric)
-
- for wifi in self.wifi:
- wifi.last_wifi_scan_time = -self._wifi_scan_period_s
+ self.create_wifi_interfaces()
self._status = status.Status(self._status_dir)
@@ -268,7 +262,8 @@
wifi_up = self.is_interface_up(wifi.name)
self.ifplugd_action(wifi.name, wifi_up)
if wifi_up:
- wifi.attach_wpa_control(self._wpa_control_interface)
+ self._status.attached_to_wpa_supplicant = wifi.attach_wpa_control(
+ self._wpa_control_interface)
for path, prefix in ((self._tmp_dir, self.GATEWAY_FILE_PREFIX),
(self._interface_status_dir, ''),
@@ -277,7 +272,25 @@
for filepath in glob.glob(os.path.join(path, prefix + '*')):
self._process_file(path, os.path.split(filepath)[-1])
- # Now that we've ready any existing state, it's okay to let interfaces touch
+ # Make sure no unwanted APs or clients are running.
+ for wifi in self.wifi:
+ for band in wifi.bands:
+ config = self._wlan_configuration.get(band, None)
+ if config:
+ if config.access_point:
+ # If we have a config and want an AP, we don't want a client.
+ self._stop_wifi(band, False, True)
+ else:
+ # If we have a config but don't want an AP, make sure we aren't
+ # running one.
+ self._stop_wifi(band, True, False)
+ break
+ else:
+ # If we have no config for this radio, neither a client nor an AP should
+ # be running.
+ self._stop_wifi(wifi.bands[0], True, True)
+
+ # Now that we've read any existing state, it's okay to let interfaces touch
# the routing table.
for ifc in [self.bridge] + self.wifi:
ifc.initialize()
@@ -286,6 +299,32 @@
self._interface_update_counter = 0
self._try_wlan_after = {'5': 0, '2.4': 0}
+ def create_wifi_interfaces(self):
+ """Create Wifi interfaces."""
+
+ # If we have multiple client interfaces, 5 GHz-only < both < 2.4 GHz-only.
+ def metric_for_bands(bands):
+ if '5' in bands:
+ if '2.4' in bands:
+ return interface.METRIC_24GHZ_5GHZ
+ return interface.METRIC_5GHZ
+ return interface.METRIC_24GHZ
+
+ def wifi_class(attrs):
+ return self.FrenzyWifi if 'frenzy' in attrs else self.Wifi
+
+ self.wifi = sorted([
+ wifi_class(attrs)(interface_name,
+ metric_for_bands(attrs['bands']),
+ # Prioritize 5 GHz over 2.4.
+ bands=sorted(attrs['bands'], reverse=True))
+ for interface_name, attrs
+ in get_client_interfaces().iteritems()
+ ], key=lambda w: w.metric)
+
+ for wifi in self.wifi:
+ wifi.last_wifi_scan_time = -self._wifi_scan_period_s
+
def is_interface_up(self, interface_name):
"""Explicitly check whether an interface is up.
@@ -349,11 +388,6 @@
for wifi in self.wifi:
continue_wifi = False
- if not wifi.attached():
- logging.debug('Attempting to attach to wpa control interface for %s',
- wifi.name)
- wifi.attach_wpa_control(self._wpa_control_interface)
- wifi.handle_wpa_events()
# Only one wlan_configuration per interface will have access_point ==
# True. Try 5 GHz first, then 2.4 GHz. If both bands are supported by
@@ -375,6 +409,13 @@
if wlan_configuration.access_point_up:
continue_wifi = True
+ if not wifi.attached():
+ logging.debug('Attempting to attach to wpa control interface for %s',
+ wifi.name)
+ self._status.attached_to_wpa_supplicant = wifi.attach_wpa_control(
+ self._wpa_control_interface)
+ wifi.handle_wpa_events()
+
if continue_wifi:
logging.debug('Running AP on %s, nothing else to do.', wifi.name)
continue
@@ -550,7 +591,8 @@
wifi = self.wifi_for_band(band)
if wifi:
self._update_wlan_configuration(
- self.WLANConfiguration(band, wifi, contents, self._status))
+ self.WLANConfiguration(band, wifi, contents, self._status,
+ self._wpa_control_interface))
elif filename.startswith(self.ACCESS_POINT_FILE_PREFIX):
match = re.match(self.ACCESS_POINT_FILE_REGEXP, filename)
if match:
@@ -610,20 +652,22 @@
logging.info('Scanning on %s...', wifi.name)
wifi.last_wifi_scan_time = time.time()
subprocess.call(self.IFUP + [wifi.name])
- with_ie, without_ie = self._find_bssids(wifi.name)
+ # /bin/wifi takes a --band option but then finds the right interface for it,
+ # so it's okay to just pick the first band here.
+ with_ie, without_ie = self._find_bssids(wifi.bands[0])
logging.info('Done scanning on %s', wifi.name)
items = [(bss_info, 3) for bss_info in with_ie]
items += [(bss_info, 1) for bss_info in without_ie]
wifi.cycler = cycler.AgingPriorityCycler(cycle_length_s=30, items=items)
- def _find_bssids(self, wcli):
+ def _find_bssids(self, band):
def supports_autoprovisioning(oui, vendor_ie):
if oui not in GFIBER_OUIS:
return False
return vendor_ie.startswith(VENDOR_IE_FEATURE_ID_AUTOPROVISIONING)
- return iw.find_bssids(wcli, supports_autoprovisioning, False)
+ return iw.find_bssids(band, supports_autoprovisioning, False)
def _try_next_bssid(self, wifi):
"""Attempt to connect to the next BSSID in wifi's BSSID cycler.
@@ -701,22 +745,86 @@
wlan_configuration.stop_access_point()
wlan_configuration.start_client()
+ def _stop_wifi(self, band, stopap, stopclient):
+ """Stop running wifi processes.
+
+ At least one of [stopap, stopclient] must be True.
+
+ Args:
+ band: The band on which to stop wifi.
+ stopap: Whether to stop access points.
+ stopclient: Whether to stop wifi clients.
+
+ Raises:
+ ValueError: If neither stopap nor stopclient is True.
+ """
+ if stopap and stopclient:
+ command = 'stop'
+ elif stopap:
+ command = 'stopap'
+ elif stopclient:
+ command = 'stopclient'
+ else:
+ raise ValueError('Called _stop_wifi without specifying AP or client.')
+
+ full_command = [command, '--band', band, '--persist']
+
+ try:
+ self._binwifi(*full_command)
+ except subprocess.CalledProcessError as e:
+ logging.error('wifi %s failed: "%s"', ' '.join(full_command), e.output)
+
+ def _binwifi(self, *command):
+ """Test seam for calls to /bin/wifi.
+
+ Only used by _stop_wifi, and probably shouldn't be used by anything else.
+
+ Args:
+ *command: A command for /bin/wifi
+
+ Raises:
+ subprocess.CalledProcessError: If the command fails. Deliberately not
+ handled here to make future authors think twice before using this.
+ """
+ subprocess.check_output(self.BINWIFI + list(command),
+ stderr=subprocess.STDOUT)
+
def _wifi_show():
try:
return subprocess.check_output(['wifi', 'show'])
except subprocess.CalledProcessError as e:
logging.error('Failed to call "wifi show": %s', e)
+ return ''
+
+
+def _get_quantenna_interface():
+ try:
+ return subprocess.check_output(['get-quantenna-interface']).strip()
+ except subprocess.CalledProcessError:
+ logging.fatal('Failed to call get-quantenna-interface')
+ raise
def get_client_interfaces():
+ """Find all client interfaces on the device.
+
+ Returns:
+ A dict mapping wireless client interfaces to their supported bands.
+ """
+
current_band = None
- result = collections.defaultdict(set)
+ result = collections.defaultdict(lambda: collections.defaultdict(set))
for line in _wifi_show().splitlines():
if line.startswith('Band:'):
current_band = line.split()[1]
elif line.startswith('Client Interface:'):
- result[line.split()[2]].add(current_band)
+ result[line.split()[2]]['bands'].add(current_band)
+
+ # TODO(rofrankel): Make 'wifi show' (or wifi_files) include this information
+ # so we don't need a subprocess call to check.
+ quantenna_interface = _get_quantenna_interface()
+ if quantenna_interface in result:
+ result[quantenna_interface]['frenzy'] = True
return result
-
diff --git a/conman/connection_manager_test.py b/conman/connection_manager_test.py
index 6b1142d..d96022e 100755
--- a/conman/connection_manager_test.py
+++ b/conman/connection_manager_test.py
@@ -27,7 +27,7 @@
}
"""
-WIFI_SHOW_OUTPUT_ONE_RADIO = """Band: 2.4
+WIFI_SHOW_OUTPUT_MARVELL8897 = """Band: 2.4
RegDomain: US
Interface: wlan0 # 2.4 GHz ap
Channel: 149
@@ -52,7 +52,7 @@
Client BSSID: f4:f5:e8:81:1b:a1
"""
-WIFI_SHOW_OUTPUT_TWO_RADIOS = """Band: 2.4
+WIFI_SHOW_OUTPUT_ATH9K_ATH10K = """Band: 2.4
RegDomain: US
Interface: wlan0 # 2.4 GHz ap
Channel: 149
@@ -78,7 +78,7 @@
"""
# See b/27328894.
-WIFI_SHOW_OUTPUT_ONE_RADIO_NO_5GHZ = """Band: 2.4
+WIFI_SHOW_OUTPUT_MARVELL8897_NO_5GHZ = """Band: 2.4
RegDomain: 00
Interface: wlan0 # 2.4 GHz ap
BSSID: 00:50:43:02:fe:01
@@ -92,29 +92,86 @@
RegDomain: 00
"""
-IW_SCAN_OUTPUT = """BSS 00:11:22:33:44:55(on wcli0)
+WIFI_SHOW_OUTPUT_ATH9K_FRENZY = """Band: 2.4
+RegDomain: US
+Interface: wlan0 # 2.4 GHz ap
+Channel: 149
+BSSID: f4:f5:e8:81:1b:a0
+AutoChannel: True
+AutoType: NONDFS
+Station List for band: 2.4
+
+Client Interface: wcli0 # 2.4 GHz client
+Client BSSID: f4:f5:e8:81:1b:a1
+
+Band: 5
+RegDomain: 00
+Interface: wlan0 # 5 GHz ap
+AutoChannel: False
+Station List for band: 5
+
+Client Interface: wlan1 # 5 GHz client
+"""
+
+WIFI_SHOW_OUTPUT_FRENZY = """Band: 2.4
+RegDomain: 00
+Band: 5
+RegDomain: 00
+Interface: wlan0 # 5 GHz ap
+AutoChannel: False
+Station List for band: 5
+
+Client Interface: wlan0 # 5 GHz client
+"""
+
+IW_SCAN_DEFAULT_OUTPUT = """BSS 00:11:22:33:44:55(on wcli0)
SSID: s1
- Vendor specific: OUI f4:f5:e8, data: 01
BSS 66:77:88:99:aa:bb(on wcli0)
SSID: s1
- Vendor specific: OUI f4:f5:e8, data: 01
BSS 01:23:45:67:89:ab(on wcli0)
SSID: s2
"""
+IW_SCAN_HIDDEN_OUTPUT = """BSS ff:ee:dd:cc:bb:aa(on wcli0)
+ Vendor specific: OUI f4:f5:e8, data: 01
+ Vendor specific: OUI f4:f5:e8, data: 03 73 33
+"""
+
@wvtest.wvtest
def get_client_interfaces_test():
"""Test get_client_interfaces."""
# pylint: disable=protected-access
original_wifi_show = connection_manager._wifi_show
- connection_manager._wifi_show = lambda: WIFI_SHOW_OUTPUT_ONE_RADIO
+ original_get_quantenna_interface = connection_manager._get_quantenna_interface
+ connection_manager._get_quantenna_interface = lambda: ''
+ connection_manager._wifi_show = lambda: WIFI_SHOW_OUTPUT_MARVELL8897
wvtest.WVPASSEQ(connection_manager.get_client_interfaces(),
- {'wcli0': set(['2.4', '5'])})
- connection_manager._wifi_show = lambda: WIFI_SHOW_OUTPUT_TWO_RADIOS
+ {'wcli0': {'bands': set(['2.4', '5'])}})
+ connection_manager._wifi_show = lambda: WIFI_SHOW_OUTPUT_ATH9K_ATH10K
+ wvtest.WVPASSEQ(connection_manager.get_client_interfaces(), {
+ 'wcli0': {'bands': set(['2.4'])},
+ 'wcli1': {'bands': set(['5'])}
+ })
+
+ # Test Quantenna devices.
+
+ # 2.4 GHz cfg80211 radio + 5 GHz Frenzy (Optimus Prime).
+ connection_manager._wifi_show = lambda: WIFI_SHOW_OUTPUT_ATH9K_FRENZY
+ connection_manager._get_quantenna_interface = lambda: 'wlan1'
+ wvtest.WVPASSEQ(connection_manager.get_client_interfaces(), {
+ 'wcli0': {'bands': set(['2.4'])},
+ 'wlan1': {'frenzy': True, 'bands': set(['5'])}
+ })
+
+ # Only Frenzy (e.g. Lockdown).
+ connection_manager._wifi_show = lambda: WIFI_SHOW_OUTPUT_FRENZY
+ connection_manager._get_quantenna_interface = lambda: 'wlan0'
wvtest.WVPASSEQ(connection_manager.get_client_interfaces(),
- {'wcli0': set(['2.4']), 'wcli1': set(['5'])})
+ {'wlan0': {'frenzy': True, 'bands': set(['5'])}})
+
connection_manager._wifi_show = original_wifi_show
+ connection_manager._get_quantenna_interface = original_get_quantenna_interface
class WLANConfiguration(connection_manager.WLANConfiguration):
@@ -125,16 +182,26 @@
WIFI_STOPCLIENT = ['echo', 'stopclient']
def start_client(self):
- if not self.client_up:
+ client_was_up = self.client_up
+ was_attached = self.wifi.attached()
+ # Do this before calling the super method so that the attach call at the end
+ # succeeds.
+ if not client_was_up and not was_attached:
+ self.wifi._initial_ssid_testonly = self.ssid
+ self.wifi.start_wpa_supplicant_testonly(self._wpa_control_interface)
+
+ super(WLANConfiguration, self).start_client()
+
+ if not client_was_up:
self.wifi.set_connection_check_result('succeed')
- if self.wifi.attached():
+ if was_attached:
+ self.wifi._wpa_control.ssid_testonly = self.ssid
self.wifi.add_connected_event()
- else:
- open(self._socket(), 'w')
- # Normally, wpa_supplicant would bring up wcli*, which would trigger
- # ifplugd, which would run ifplugd.action, which would do two things:
+ # Normally, wpa_supplicant would bring up the client interface, which
+ # would trigger ifplugd, which would run ifplugd.action, which would do
+ # two things:
#
# 1) Write an interface status file.
# 2) Call run-dhclient, which would call dhclient-script, which would
@@ -144,22 +211,18 @@
self.write_interface_status_file('1')
self.write_gateway_file()
- super(WLANConfiguration, self).start_client()
-
def stop_client(self):
- if self.client_up:
+ client_was_up = self.client_up
+
+ super(WLANConfiguration, self).stop_client()
+
+ if client_was_up:
self.wifi.add_terminating_event()
- os.unlink(self._socket())
self.wifi.set_connection_check_result('fail')
# See comments in start_client.
self.write_interface_status_file('0')
- super(WLANConfiguration, self).stop_client()
-
- def _socket(self):
- return os.path.join(self._wpa_control_interface, self.wifi.name)
-
def write_gateway_file(self):
gateway_file = os.path.join(self.tmp_dir,
self.gateway_file_prefix + self.wifi.name)
@@ -178,8 +241,13 @@
def __init__(self, *args, **kwargs):
super(Wifi, self).__init__(*args, **kwargs)
- # Whether wpa_supplicant is connected to a network.
- self._initially_connected = True
+ self.wifi_scan_counter = 0
+
+
+class FrenzyWifi(interface_test.FrenzyWifi):
+
+ def __init__(self, *args, **kwargs):
+ super(FrenzyWifi, self).__init__(*args, **kwargs)
self.wifi_scan_counter = 0
@@ -189,21 +257,26 @@
# pylint: disable=invalid-name
Bridge = interface_test.Bridge
Wifi = Wifi
+ FrenzyWifi = FrenzyWifi
WLANConfiguration = WLANConfiguration
WIFI_SETCLIENT = ['echo', 'setclient']
IFUP = ['echo', 'ifup']
IFPLUGD_ACTION = ['echo', 'ifplugd.action']
+ BINWIFI = ['echo', 'wifi']
def __init__(self, *args, **kwargs):
+ self._binwifi_commands = []
+
self.interfaces_already_up = kwargs.pop('__test_interfaces_already_up',
['eth0'])
- wifi_interfaces_already_up = [ifc for ifc in self.interfaces_already_up
- if ifc.startswith('wcli')]
- for wifi in wifi_interfaces_already_up:
- # wcli1 is always 5 GHz. wcli0 always *includes* 2.4.
- band = '5' if wifi == 'wcli1' else '2.4'
+ self.wifi_interfaces_already_up = [ifc for ifc in self.interfaces_already_up
+ if ifc.startswith('w')]
+ for wifi in self.wifi_interfaces_already_up:
+ # wcli1 is always 5 GHz. wcli0 always *includes* 2.4. wlan* client
+ # interfaces are Frenzy interfaces and therefore 5 GHz-only.
+ band = '5' if wifi in ('wlan0', 'wlan1', 'wcli1') else '2.4'
# This will happen in the super function, but in order for
# write_wlan_config to work we have to do it now. This has to happen
# before the super function so that the files exist before the inotify
@@ -216,16 +289,20 @@
super(ConnectionManager, self).__init__(*args, **kwargs)
- for wifi in wifi_interfaces_already_up:
- # pylint: disable=protected-access
- self.interface_by_name(wifi)._initially_connected = True
-
- self.scan_has_results = False
+ self.interface_with_scan_results = None
+ self.scan_results_include_hidden = False
# Should we be able to connect to open network s2?
- self.s2_connect = True
+ self.can_connect_to_s2 = True
+ self.can_connect_to_s3 = True
# Will s2 fail rather than providing ACS access?
self.s2_fail = False
+ def create_wifi_interfaces(self):
+ super(ConnectionManager, self).create_wifi_interfaces()
+ for wifi in self.wifi_interfaces_already_up:
+ # pylint: disable=protected-access
+ self.interface_by_name(wifi)._initial_ssid_testonly = 'my ssid'
+
@property
def IP_LINK(self):
return ['echo'] + ['%s LOWER_UP' % ifc
@@ -240,50 +317,45 @@
wifi.add_terminating_event()
def _try_bssid(self, wifi, bss_info):
+ self.last_provisioning_attempt = bss_info
+
super(ConnectionManager, self)._try_bssid(wifi, bss_info)
- socket = os.path.join(self._wpa_control_interface, wifi.name)
-
- if bss_info and bss_info.ssid == 's1':
+ def connect(connection_check_result):
+ # pylint: disable=protected-access
if wifi.attached():
+ wifi._wpa_control._ssid_testonly = bss_info.ssid
wifi.add_connected_event()
else:
- open(socket, 'w')
- wifi.set_connection_check_result('fail')
- self.write_interface_status_file(wifi.name, '1')
+ wifi._initial_ssid_testonly = bss_info.ssid
+ wifi.start_wpa_supplicant_testonly(self._wpa_control_interface)
+ wifi.set_connection_check_result(connection_check_result)
+ self.ifplugd_action(wifi.name, True)
+
+ if bss_info and bss_info.ssid == 's1':
+ connect('fail')
return True
- if bss_info and bss_info.ssid == 's2':
- if self.s2_connect:
- if wifi.attached():
- wifi.add_connected_event()
- else:
- open(socket, 'w')
- if self.s2_fail:
- connection_check_result = 'fail'
- logging.debug('s2 configured to have no ACS access')
- else:
- connection_check_result = 'restricted'
- wifi.set_connection_check_result(connection_check_result)
- self.ifplugd_action(wifi.name, True)
- return True
- else:
- logging.debug('s2 configured not to connect')
+ if bss_info and bss_info.ssid == 's2' and self.can_connect_to_s2:
+ connect('fail' if self.s2_fail else 'succeed')
+ return True
+
+ if bss_info and bss_info.ssid == 's3' and self.can_connect_to_s3:
+ connect('restricted')
+ return True
return False
- def _wifi_stopclient(self, band):
- super(ConnectionManager, self)._wifi_stopclient(band)
- self.wifi_for_band(band).add_terminating_event()
-
# pylint: disable=unused-argument,protected-access
- def _find_bssids(self, wcli):
- # Only the 5 GHz scan finds anything.
- if wcli == 'wcli0' and self.scan_has_results:
- iw._scan = lambda interface: IW_SCAN_OUTPUT
- else:
- iw._scan = lambda interface: ''
- return super(ConnectionManager, self)._find_bssids(wcli)
+ def _find_bssids(self, band):
+ scan_output = ''
+ if (self.interface_with_scan_results and
+ band in self.interface_by_name(self.interface_with_scan_results).bands):
+ scan_output = IW_SCAN_DEFAULT_OUTPUT
+ if self.scan_results_include_hidden:
+ scan_output += IW_SCAN_HIDDEN_OUTPUT
+ iw._scan = lambda interface: scan_output
+ return super(ConnectionManager, self)._find_bssids(band)
def _update_wlan_configuration(self, wlan_configuration):
wlan_configuration.command.insert(0, 'echo')
@@ -312,6 +384,10 @@
self.write_gateway_file('br0' if interface_name in ('eth0', 'moca0')
else interface_name)
+ def _binwifi(self, *command):
+ super(ConnectionManager, self)._binwifi(*command)
+ self._binwifi_commands.append(command)
+
# Non-overrides
def access_point_up(self, band):
@@ -328,31 +404,17 @@
# Test methods
- def wlan_config_filename(self, band):
- return os.path.join(self._config_dir, 'command.%s' % band)
-
- def access_point_filename(self, band):
- return os.path.join(self._config_dir, 'access_point.%s' % band)
-
def delete_wlan_config(self, band):
- os.unlink(self.wlan_config_filename(band))
+ delete_wlan_config(self._config_dir, band)
- def write_wlan_config(self, band, ssid, psk, atomic=False):
- final_filename = self.wlan_config_filename(band)
- filename = final_filename + ('.tmp' if atomic else '')
- with open(filename, 'w') as f:
- f.write('\n'.join(['env', 'WIFI_PSK=%s' % psk,
- 'wifi', 'set', '-b', band, '--ssid', ssid]))
- if atomic:
- os.rename(filename, final_filename)
+ def write_wlan_config(self, *args, **kwargs):
+ write_wlan_config(self._config_dir, *args, **kwargs)
def enable_access_point(self, band):
- open(self.access_point_filename(band), 'w')
+ enable_access_point(self._config_dir, band)
def disable_access_point(self, band):
- ap_filename = self.access_point_filename(band)
- if os.path.isfile(ap_filename):
- os.unlink(ap_filename)
+ disable_access_point(self._config_dir, band)
def write_gateway_file(self, interface_name):
gateway_file = os.path.join(self._tmp_dir,
@@ -389,25 +451,71 @@
while wifi_scan_counter == wifi.wifi_scan_counter:
self.run_once()
+ def run_until_interface_update_and_scan(self, band):
+ wifi = self.wifi_for_band(band)
+ wifi_scan_counter = wifi.wifi_scan_counter
+ self.run_until_interface_update()
+ while wifi_scan_counter == wifi.wifi_scan_counter:
+ self.run_once()
+
def has_status_files(self, files):
return not set(files) - set(os.listdir(self._status_dir))
-def connection_manager_test(radio_config, **cm_kwargs):
+def wlan_config_filename(path, band):
+ return os.path.join(path, 'command.%s' % band)
+
+
+def access_point_filename(path, band):
+ return os.path.join(path, 'access_point.%s' % band)
+
+
+def write_wlan_config(path, band, ssid, psk, atomic=False):
+ final_filename = wlan_config_filename(path, band)
+ filename = final_filename + ('.tmp' if atomic else '')
+ with open(filename, 'w') as f:
+ f.write('\n'.join(['env', 'WIFI_PSK=%s' % psk,
+ 'wifi', 'set', '-b', band, '--ssid', ssid]))
+ if atomic:
+ os.rename(filename, final_filename)
+
+
+def delete_wlan_config(path, band):
+ os.unlink(wlan_config_filename(path, band))
+
+
+def enable_access_point(path, band):
+ open(access_point_filename(path, band), 'w')
+
+
+def disable_access_point(path, band):
+ ap_filename = access_point_filename(path, band)
+ if os.path.isfile(ap_filename):
+ os.unlink(ap_filename)
+
+
+def connection_manager_test(radio_config, wlan_configs=None,
+ quantenna_interface='', **cm_kwargs):
"""Returns a decorator that does ConnectionManager test boilerplate."""
+ if wlan_configs is None:
+ wlan_configs = {}
+
def inner(f):
"""The actual decorator."""
def actual_test():
"""The actual test function."""
run_duration_s = .01
interface_update_period = 5
- wifi_scan_period = 5
+ wifi_scan_period = 15
wifi_scan_period_s = run_duration_s * wifi_scan_period
# pylint: disable=protected-access
original_wifi_show = connection_manager._wifi_show
connection_manager._wifi_show = lambda: radio_config
+ original_gqi = connection_manager._get_quantenna_interface
+ connection_manager._get_quantenna_interface = lambda: quantenna_interface
+
try:
# No initial state.
tmp_dir = tempfile.mkdtemp()
@@ -415,6 +523,12 @@
os.mkdir(os.path.join(tmp_dir, 'interfaces'))
moca_tmp_dir = tempfile.mkdtemp()
wpa_control_interface = tempfile.mkdtemp()
+ FrenzyWifi.WPACtrl.WIFIINFO_PATH = tempfile.mkdtemp()
+
+ for band, access_point in wlan_configs.iteritems():
+ write_wlan_config(config_dir, band, 'initial ssid', 'initial psk')
+ if access_point:
+ open(os.path.join(config_dir, 'access_point.%s' % band), 'w')
# Test that missing directories are created by ConnectionManager.
shutil.rmtree(tmp_dir)
@@ -437,8 +551,10 @@
shutil.rmtree(config_dir)
shutil.rmtree(moca_tmp_dir)
shutil.rmtree(wpa_control_interface)
+ shutil.rmtree(FrenzyWifi.WPACtrl.WIFIINFO_PATH)
# pylint: disable=protected-access
connection_manager._wifi_show = original_wifi_show
+ connection_manager._get_quantenna_interface = original_gqi
actual_test.func_name = f.func_name
return actual_test
@@ -446,15 +562,15 @@
return inner
-def connection_manager_test_radio_independent(c):
+def connection_manager_test_generic(c, band):
"""Test ConnectionManager for things independent of radio configuration.
- To verify that these things are both independent, this function is called
- twice below, once with each radio configuration. Those wrappers have the
- relevant test decorators.
+ To verify that these things are both independent, this function is called once
+ below with each radio configuration.
Args:
- c: A ConnectionManager set up by @connection_manager_test.
+ c: The ConnectionManager set up by @connection_manager_test.
+ band: The band to test.
"""
# This test only checks that this file gets created and deleted once each.
# ConnectionManager cares that the file is created *where* expected, but it is
@@ -474,8 +590,8 @@
wvtest.WVPASS(c.internet())
wvtest.WVPASS(c.bridge.current_route())
wvtest.WVPASS(os.path.exists(acs_autoprov_filepath))
- wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
- wvtest.WVFAIL(c.wifi_for_band('5').current_route())
+ for wifi in c.wifi:
+ wvtest.WVFAIL(wifi.current_route())
wvtest.WVFAIL(c.has_status_files([status.P.CONNECTED_TO_WLAN,
status.P.HAVE_CONFIG]))
@@ -533,41 +649,80 @@
wvtest.WVFAIL(c.bridge.current_route())
# Now there are some scan results.
- c.scan_has_results = True
+ c.interface_with_scan_results = c.wifi_for_band(band).name
# Wait for a scan, plus 3 cycles, so that s2 will have been tried.
- c.run_until_scan('2.4')
+ c.run_until_scan(band)
for _ in range(3):
c.run_once()
wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_OPEN]))
- last_bss_info = c.wifi_for_band('2.4').last_attempted_bss_info
+ last_bss_info = c.wifi_for_band(band).last_attempted_bss_info
wvtest.WVPASSEQ(last_bss_info.ssid, 's2')
wvtest.WVPASSEQ(last_bss_info.bssid, '01:23:45:67:89:ab')
# Wait for the connection to be processed.
c.run_once()
wvtest.WVPASS(c.acs())
- wvtest.WVFAIL(c.internet())
- wvtest.WVFAIL(c.client_up('2.4'))
- wvtest.WVPASS(c.wifi_for_band('2.4').current_route())
+ wvtest.WVPASS(c.internet())
+ wvtest.WVFAIL(c.client_up(band))
+ wvtest.WVPASS(c.wifi_for_band(band).current_route())
+ # Disable scan results again.
+ c.interface_with_scan_results = None
- # Now, create a WLAN configuration which should be connected to. Also, test
- # that atomic writes/renames work.
+ # Now, create a WLAN configuration which should be connected to.
ssid = 'wlan'
psk = 'password'
- c.write_wlan_config('2.4', ssid, psk, atomic=True)
- c.disable_access_point('2.4')
+ c.write_wlan_config(band, ssid, psk)
+ c.disable_access_point(band)
c.run_once()
- wvtest.WVPASS(c.client_up('2.4'))
- wvtest.WVPASS(c.wifi_for_band('2.4').current_route())
+ wvtest.WVPASS(c.client_up(band))
+ wvtest.WVPASS(c.wifi_for_band(band).current_route())
+ wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_WLAN]))
+
+ # Kill wpa_supplicant. conman should restart it.
+ wvtest.WVPASS(c.client_up(band))
+ wvtest.WVPASS(c._connected_to_wlan(c.wifi_for_band(band)))
+ c.wifi_for_band(band).kill_wpa_supplicant_testonly(c._wpa_control_interface)
+ wvtest.WVFAIL(c.client_up(band))
+ wvtest.WVFAIL(c._connected_to_wlan(c.wifi_for_band(band)))
+ c.run_once()
+ wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_WLAN]))
+ wvtest.WVPASS(c.client_up(band))
+ wvtest.WVPASS(c._connected_to_wlan(c.wifi_for_band(band)))
+
+ # Now, remove the WLAN configuration and make sure we are disconnected. Then
+ # disable the previously used ACS connection via s2, re-enable scan results,
+ # add the user's WLAN to the scan results, and scan again. This time, the
+ # first SSID tried should be 's3', which is now present in the scan results
+ # (with its SSID hidden, but included via vendor IE).
+ c.delete_wlan_config(band)
+ c.can_connect_to_s2 = False
+ c.interface_with_scan_results = c.wifi_for_band(band).name
+ c.scan_results_include_hidden = True
+ c.run_until_interface_update_and_scan(band)
+ wvtest.WVFAIL(c.has_status_files([status.P.CONNECTED_TO_WLAN]))
+ c.run_until_interface_update()
+ wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_OPEN]))
+ wvtest.WVPASSEQ(c.last_provisioning_attempt.ssid, 's3')
+ wvtest.WVPASSEQ(c.last_provisioning_attempt.bssid, 'ff:ee:dd:cc:bb:aa')
+
+ # Now, recreate the same WLAN configuration, which should be connected to.
+ # Also, test that atomic writes/renames work.
+ ssid = 'wlan'
+ psk = 'password'
+ c.write_wlan_config(band, ssid, psk, atomic=True)
+ c.disable_access_point(band)
+ c.run_once()
+ wvtest.WVPASS(c.client_up(band))
+ wvtest.WVPASS(c.wifi_for_band(band).current_route())
wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_WLAN]))
# Now enable the AP. Since we have no wired connection, this should have no
# effect.
- c.enable_access_point('2.4')
+ c.enable_access_point(band)
c.run_once()
- wvtest.WVPASS(c.client_up('2.4'))
- wvtest.WVPASS(c.wifi_for_band('2.4').current_route())
+ wvtest.WVPASS(c.client_up(band))
+ wvtest.WVPASS(c.wifi_for_band(band).current_route())
wvtest.WVFAIL(c.bridge.current_route())
# Now bring up the bridge. We should remove the wifi connection and start
@@ -575,60 +730,61 @@
c.set_ethernet(True)
c.bridge.set_connection_check_result('succeed')
c.run_until_interface_update()
- wvtest.WVPASS(c.access_point_up('2.4'))
- wvtest.WVFAIL(c.client_up('2.4'))
- wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
+ wvtest.WVPASS(c.access_point_up(band))
+ wvtest.WVFAIL(c.client_up(band))
+ wvtest.WVFAIL(c.wifi_for_band(band).current_route())
wvtest.WVPASS(c.bridge.current_route())
# Now move (rather than delete) the configuration file. The AP should go
# away, and we should not be able to join the WLAN. Routes should not be
# affected.
- filename = c.wlan_config_filename('2.4')
+ filename = wlan_config_filename(c._config_dir, band)
other_filename = filename + '.bak'
os.rename(filename, other_filename)
c.run_once()
- wvtest.WVFAIL(c.access_point_up('2.4'))
- wvtest.WVFAIL(c.client_up('2.4'))
- wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
+ wvtest.WVFAIL(c.access_point_up(band))
+ wvtest.WVFAIL(c.client_up(band))
+ wvtest.WVFAIL(c.wifi_for_band(band).current_route())
wvtest.WVPASS(c.bridge.current_route())
wvtest.WVFAIL(c.has_status_files([status.P.HAVE_CONFIG]))
# Now move it back, and the AP should come back.
os.rename(other_filename, filename)
c.run_once()
- wvtest.WVPASS(c.access_point_up('2.4'))
- wvtest.WVFAIL(c.client_up('2.4'))
- wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
+ wvtest.WVPASS(c.access_point_up(band))
+ wvtest.WVFAIL(c.client_up(band))
+ wvtest.WVFAIL(c.wifi_for_band(band).current_route())
wvtest.WVPASS(c.bridge.current_route())
# Now delete the config and bring down the bridge and make sure we reprovision
# via the last working BSS.
- c.delete_wlan_config('2.4')
+ c.delete_wlan_config(band)
c.bridge.set_connection_check_result('fail')
- scan_count_2_4 = c.wifi_for_band('2.4').wifi_scan_counter
+ scan_count_for_band = c.wifi_for_band(band).wifi_scan_counter
c.run_until_interface_update()
wvtest.WVFAIL(c.acs())
wvtest.WVFAIL(c.internet())
- # s2 is not what the cycler would suggest trying next.
- wvtest.WVPASSNE('s2', c.wifi_for_band('2.4').cycler.peek())
- # Run only once, so that only one BSS can be tried. It should be the s2 one,
+ # s3 is not what the cycler would suggest trying next.
+ wvtest.WVPASSNE('s3', c.wifi_for_band(band).cycler.peek())
+ # Run only once, so that only one BSS can be tried. It should be the s3 one,
# since that worked previously.
c.run_once()
wvtest.WVPASS(c.acs())
- # Make sure we didn't scan on 2.4.
- wvtest.WVPASSEQ(scan_count_2_4, c.wifi_for_band('2.4').wifi_scan_counter)
+ # Make sure we didn't scan on `band`.
+ wvtest.WVPASSEQ(scan_count_for_band, c.wifi_for_band(band).wifi_scan_counter)
- # Now re-create the WLAN config, connect to the WLAN, and make sure that s2 is
- # unset as last_successful_bss_info if it is no longer available.
- c.write_wlan_config('2.4', ssid, psk)
+ # Now re-create the WLAN config, connect to the WLAN, and make sure that s3 is
+ # unset as last_successful_bss_info, since it is no longer available.
+ c.write_wlan_config(band, ssid, psk)
c.run_once()
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
- c.s2_connect = False
- c.delete_wlan_config('2.4')
+ c.can_connect_to_s3 = False
+ c.scan_results_include_hidden = False
+ c.delete_wlan_config(band)
c.run_once()
- wvtest.WVPASSEQ(c.wifi_for_band('2.4').last_successful_bss_info, None)
+ wvtest.WVPASSEQ(c.wifi_for_band(band).last_successful_bss_info, None)
# Now do the same, except this time s2 is connected to but doesn't provide ACS
# access. This requires first re-establishing s2 as successful, so there are
@@ -640,52 +796,83 @@
# disconnecting.
# 4) Connect to s2 but get no ACS access; see that last_successful_bss_info is
# unset.
- c.write_wlan_config('2.4', ssid, psk)
+ c.write_wlan_config(band, ssid, psk)
c.run_once()
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
- c.delete_wlan_config('2.4')
+ c.delete_wlan_config(band)
c.run_once()
- wvtest.WVFAIL(c.wifi_for_band('2.4').acs())
+ wvtest.WVFAIL(c.wifi_for_band(band).acs())
- c.s2_connect = True
+ c.can_connect_to_s2 = True
# Give it time to try all BSSIDs.
for _ in range(3):
c.run_once()
s2_bss = iw.BssInfo('01:23:45:67:89:ab', 's2')
- wvtest.WVPASSEQ(c.wifi_for_band('2.4').last_successful_bss_info, s2_bss)
+ wvtest.WVPASSEQ(c.wifi_for_band(band).last_successful_bss_info, s2_bss)
c.s2_fail = True
- c.write_wlan_config('2.4', ssid, psk)
+ c.write_wlan_config(band, ssid, psk)
c.run_once()
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
- wvtest.WVPASSEQ(c.wifi_for_band('2.4').last_successful_bss_info, s2_bss)
- c.delete_wlan_config('2.4')
+ wvtest.WVPASSEQ(c.wifi_for_band(band).last_successful_bss_info, s2_bss)
+ c.delete_wlan_config(band)
# Run once so that c will reconnect to s2.
c.run_once()
# Now run until it sees the lack of ACS access.
c.run_until_interface_update()
- wvtest.WVPASSEQ(c.wifi_for_band('2.4').last_successful_bss_info, None)
+ wvtest.WVPASSEQ(c.wifi_for_band(band).last_successful_bss_info, None)
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_ONE_RADIO)
-def connection_manager_test_radio_independent_one_radio(c):
- connection_manager_test_radio_independent(c)
+@connection_manager_test(WIFI_SHOW_OUTPUT_MARVELL8897)
+def connection_manager_test_generic_marvell8897_2g(c):
+ connection_manager_test_generic(c, '2.4')
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_TWO_RADIOS)
-def connection_manager_test_radio_independent_two_radios(c):
- connection_manager_test_radio_independent(c)
+@connection_manager_test(WIFI_SHOW_OUTPUT_MARVELL8897)
+def connection_manager_test_generic_marvell8897_5g(c):
+ connection_manager_test_generic(c, '5')
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_TWO_RADIOS)
-def connection_manager_test_two_radios(c):
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_ATH10K)
+def connection_manager_test_generic_ath9k_ath10k_2g(c):
+ connection_manager_test_generic(c, '2.4')
+
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_ATH10K)
+def connection_manager_test_generic_ath9k_ath10k_5g(c):
+ connection_manager_test_generic(c, '5')
+
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY,
+ quantenna_interface='wlan1')
+def connection_manager_test_generic_ath9k_frenzy_2g(c):
+ connection_manager_test_generic(c, '2.4')
+
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY,
+ quantenna_interface='wlan1')
+def connection_manager_test_generic_ath9k_frenzy_5g(c):
+ connection_manager_test_generic(c, '5')
+
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_FRENZY,
+ quantenna_interface='wlan0')
+def connection_manager_test_generic_frenzy_5g(c):
+ connection_manager_test_generic(c, '5')
+
+
+def connection_manager_test_dual_band_two_radios(c):
"""Test ConnectionManager for devices with two radios.
This test should be kept roughly parallel to the one-radio test.
@@ -693,6 +880,10 @@
Args:
c: The ConnectionManager set up by @connection_manager_test.
"""
+ wvtest.WVPASSEQ(len(c._binwifi_commands), 2)
+ for band in ['2.4', '5']:
+ wvtest.WVPASS(('stop', '--band', band, '--persist') in c._binwifi_commands)
+
# Bring up ethernet, access.
c.set_ethernet(True)
c.run_once()
@@ -711,6 +902,8 @@
wvtest.WVPASS(c.access_point_up('2.4'))
wvtest.WVPASS(c.access_point_up('5'))
wvtest.WVPASS(c.bridge.current_route())
+ wvtest.WVFAIL(c.client_up('2.4'))
+ wvtest.WVFAIL(c.client_up('5'))
wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
wvtest.WVFAIL(c.wifi_for_band('5').current_route())
@@ -757,7 +950,7 @@
wvtest.WVFAIL(c.wifi_for_band('5').current_route())
# The next 2.4 GHz scan will have results.
- c.scan_has_results = True
+ c.interface_with_scan_results = c.wifi_for_band('2.4').name
c.run_until_scan('2.4')
# Now run 3 cycles, so that s2 will have been tried.
for _ in range(3):
@@ -770,15 +963,30 @@
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_ONE_RADIO)
-def connection_manager_test_one_radio(c):
- """Test ConnectionManager for devices with one radio.
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_ATH10K)
+def connection_manager_test_dual_band_two_radios_ath9k_ath10k(c):
+ connection_manager_test_dual_band_two_radios(c)
- This test should be kept roughly parallel to the two-radio test.
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_FRENZY,
+ quantenna_interface='wlan1')
+def connection_manager_test_dual_band_two_radios_ath9k_frenzy(c):
+ connection_manager_test_dual_band_two_radios(c)
+
+
+def connection_manager_test_dual_band_one_radio(c):
+ """Test ConnectionManager for devices with one dual-band radio.
+
+ This test should be kept roughly parallel to
+ connection_manager_test_dual_band_two_radios.
Args:
c: The ConnectionManager set up by @connection_manager_test.
"""
+ wvtest.WVPASSEQ(len(c._binwifi_commands), 1)
+ wvtest.WVPASSEQ(('stop', '--band', '5', '--persist'), c._binwifi_commands[0])
+
# Bring up ethernet, access.
c.set_ethernet(True)
c.run_once()
@@ -833,8 +1041,8 @@
wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
wvtest.WVFAIL(c.wifi_for_band('5').current_route())
- # The wcli0 scan will have results that will lead to ACS access.
- c.scan_has_results = True
+ # The 2.4 GHz scan will have results that will lead to ACS access.
+ c.interface_with_scan_results = c.wifi_for_band('2.4').name
c.run_until_scan('5')
for _ in range(3):
c.run_once()
@@ -846,8 +1054,14 @@
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_ONE_RADIO_NO_5GHZ)
-def connection_manager_test_one_radio_no_5ghz(c):
+@connection_manager_test(WIFI_SHOW_OUTPUT_MARVELL8897)
+def connection_manager_test_dual_band_one_radio_marvell8897(c):
+ connection_manager_test_dual_band_one_radio(c)
+
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_MARVELL8897_NO_5GHZ)
+def connection_manager_test_marvell8897_no_5ghz(c):
"""Test ConnectionManager for the case documented in b/27328894.
conman should be able to handle the lack of 5 GHz without actually
@@ -883,7 +1097,7 @@
@wvtest.wvtest
-@connection_manager_test(WIFI_SHOW_OUTPUT_ONE_RADIO,
+@connection_manager_test(WIFI_SHOW_OUTPUT_MARVELL8897,
__test_interfaces_already_up=['eth0', 'wcli0'])
def connection_manager_test_wifi_already_up(c):
"""Test ConnectionManager when wifi is already up.
@@ -895,5 +1109,32 @@
wvtest.WVPASS(c.wifi_for_band('2.4').current_route)
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_MARVELL8897, wlan_configs={'5': True})
+def connection_manager_one_radio_marvell8897_existing_config_5g_ap(c):
+ wvtest.WVPASSEQ(len(c._binwifi_commands), 1)
+ wvtest.WVPASSEQ(('stopclient', '--band', '5', '--persist'),
+ c._binwifi_commands[0])
+
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_MARVELL8897,
+ wlan_configs={'5': False})
+def connection_manager_one_radio_marvell8897_existing_config_5g_no_ap(c):
+ wvtest.WVPASSEQ(len(c._binwifi_commands), 1)
+ wvtest.WVPASSEQ(('stopap', '--band', '5', '--persist'),
+ c._binwifi_commands[0])
+
+
+@wvtest.wvtest
+@connection_manager_test(WIFI_SHOW_OUTPUT_ATH9K_ATH10K,
+ wlan_configs={'5': True})
+def connection_manager_two_radios_ath9k_ath10k_existing_config_5g_ap(c):
+ wvtest.WVPASSEQ(len(c._binwifi_commands), 2)
+ wvtest.WVPASS(('stop', '--band', '2.4', '--persist') in c._binwifi_commands)
+ wvtest.WVPASS(('stopclient', '--band', '5', '--persist')
+ in c._binwifi_commands)
+
+
if __name__ == '__main__':
wvtest.wvtest_main()
diff --git a/conman/interface.py b/conman/interface.py
index 0f42e20..d45f42e 100755
--- a/conman/interface.py
+++ b/conman/interface.py
@@ -2,6 +2,7 @@
"""Models wired and wireless interfaces."""
+import json
import logging
import os
import re
@@ -311,9 +312,11 @@
class Wifi(Interface):
- """Represents the wireless interface."""
+ """Represents a wireless interface."""
WPA_EVENT_RE = re.compile(r'<\d+>CTRL-EVENT-(?P<event>[A-Z\-]+).*')
+ # pylint: disable=invalid-name
+ WPACtrl = wpactrl.WPACtrl
def __init__(self, *args, **kwargs):
self.bands = kwargs.pop('bands', [])
@@ -333,29 +336,53 @@
return self._wpa_control and self._wpa_control.attached
def attach_wpa_control(self, path):
+ """Attach to the wpa_supplicant control interface.
+
+ Args:
+ path: The path containing the wpa_supplicant control interface socket.
+
+ Returns:
+ Whether attaching was successful.
+ """
if self.attached():
- return
+ return True
socket = os.path.join(path, self.name)
- if os.path.exists(socket):
- try:
- self._wpa_control = self.get_wpa_control(socket)
- self._wpa_control.attach()
- except wpactrl.error as e:
- logging.error('Error attaching to wpa_supplicant: %s', e)
- return
+ try:
+ self._wpa_control = self.get_wpa_control(socket)
+ self._wpa_control.attach()
+ except wpactrl.error as e:
+ logging.error('Error attaching to wpa_supplicant: %s', e)
+ return False
- for line in self._wpa_control.request('STATUS').splitlines():
+ status = self.wpa_cli_status()
+ self.wpa_supplicant = status.get('wpa_state') == 'COMPLETED'
+ if not self._initialized:
+ self.initial_ssid = status.get('ssid')
+
+ return True
+
+ def wpa_cli_status(self):
+ """Parse the STATUS response from the wpa_supplicant CLI.
+
+ Returns:
+ A dict containing the parsed results, where key and value are separated by
+ '=' on each line.
+ """
+ status = {}
+
+ if self._wpa_control:
+ lines = self._wpa_control.request('STATUS').splitlines()
+ for line in lines:
if '=' not in line:
continue
- key, value = line.split('=', 1)
- if key == 'wpa_state':
- self.wpa_supplicant = value == 'COMPLETED'
- elif key == 'ssid' and not self._initialized:
- self.initial_ssid = value
+ k, v = line.strip().split('=', 1)
+ status[k] = v
+
+ return status
def get_wpa_control(self, socket):
- return wpactrl.WPACtrl(socket)
+ return self.WPACtrl(socket)
def detach_wpa_control(self):
if self.attached():
@@ -394,3 +421,115 @@
self.initial_ssid = None
super(Wifi, self).initialize()
+
+class FrenzyWPACtrl(object):
+ """A WPACtrl for Frenzy devices.
+
+ Implements the same functions used on the normal WPACtrl, using a combination
+ of the QCSAPI and wifi_files. Keeps state in order to generate events by
+ diffing saved state with current system state.
+ """
+
+ WIFIINFO_PATH = '/tmp/wifi/wifiinfo'
+
+ def __init__(self, socket):
+ self._interface = os.path.split(socket)[-1]
+
+ # State from QCSAPI and wifi_files.
+ self._client_mode = False
+ self._ssid = None
+ self._status = None
+
+ self._events = []
+
+ def _qcsapi(self, *command):
+ try:
+ return subprocess.check_output(['qcsapi'] + list(command)).strip()
+ except subprocess.CalledProcessError:
+ return None
+
+ def _wifiinfo_filename(self):
+ return os.path.join(self.WIFIINFO_PATH, self._interface)
+
+ def _get_wifiinfo(self):
+ try:
+ return json.load(open(self._wifiinfo_filename()))
+ except IOError:
+ return None
+
+ def _get_ssid(self):
+ wifiinfo = self._get_wifiinfo()
+ if wifiinfo:
+ return wifiinfo.get('SSID')
+
+ def _check_client_mode(self):
+ return self._qcsapi('get_mode', 'wifi0') == 'Station'
+
+ def attach(self):
+ self._update()
+
+ @property
+ def attached(self):
+ return self._client_mode
+
+ def detach(self):
+ raise wpactrl.error('Real WPACtrl always raises this when detaching.')
+
+ def pending(self):
+ self._update()
+ return bool(self._events)
+
+ def _update(self):
+ """Generate and cache events, update state."""
+ client_mode = self._check_client_mode()
+ ssid = self._get_ssid()
+ status = self._qcsapi('get_status', 'wifi0')
+
+ # If we have an SSID and are in client mode, and at least one of those is
+ # new, then we have just connected.
+ if client_mode and ssid and (not self._client_mode or ssid != self._ssid):
+ self._events.append('<2>CTRL-EVENT-CONNECTED')
+
+ # If we are in client mode but lost SSID, we disconnected.
+ if client_mode and self._ssid and not ssid:
+ self._events.append('<2>CTRL-EVENT-DISCONNECTED')
+
+ # If there is an auth/assoc failure, then status (above) is 'Error'. We
+ # really want the converse of this implication (i.e. that 'Error' implies an
+ # auth/assoc failure), but due to limited documentation this will have to
+ # do. It should be good enough: if something else causes get_status to
+ # return 'Error', we are probably not connected, and we don't do anything
+ # special with auth/assoc failures specifically.
+ if client_mode and status == 'Error' and self._status != 'Error':
+ self._events.append('<2>CTRL-EVENT-AUTH-REJECT')
+
+ # If we left client mode, wpa_supplicant has terminated.
+ if self._client_mode and not client_mode:
+ self._events.append('<2>CTRL-EVENT-TERMINATING')
+
+ self._client_mode = client_mode
+ self._ssid = ssid
+ self._status = status
+
+ def recv(self):
+ return self._events.pop(0)
+
+ def request(self, request_type):
+ """Partial implementation of WPACtrl.request."""
+
+ if request_type != 'STATUS':
+ return ''
+
+ self._update()
+
+ if not self._client_mode or not self._ssid:
+ return ''
+
+ return 'wpa_state=COMPLETED\nssid=%s' % self._ssid
+
+
+class FrenzyWifi(Wifi):
+ """Represents a Frenzy wireless interface."""
+
+ # pylint: disable=invalid-name
+ WPACtrl = FrenzyWPACtrl
diff --git a/conman/interface_test.py b/conman/interface_test.py
index f6e03d2..8a376a6 100755
--- a/conman/interface_test.py
+++ b/conman/interface_test.py
@@ -2,11 +2,15 @@
"""Tests for connection_manager.py."""
+import json
import logging
import os
import shutil
import tempfile
+# This is in site-packages on the device, but not when running tests, and so
+# raises lint errors.
+# pylint: disable=g-bad-import-order
import wpactrl
import interface
@@ -70,11 +74,14 @@
self.events = []
self.attached = False
self.connected = False
+ self.ssid_testonly = None
def pending(self):
+ self.check_socket_exists('pending: socket does not exist')
return bool(self.events)
def recv(self):
+ self.check_socket_exists('recv: socket does not exist')
return self.events.pop(0)
def attach(self):
@@ -83,14 +90,15 @@
self.attached = True
def detach(self):
- if not os.path.exists(self._socket):
- raise wpactrl.error('wpactrl_detach failed')
self.attached = False
+ self.ssid_testonly = None
+ self.connected = False
+ self.check_socket_exists('wpactrl_detach failed')
def request(self, request_type):
if request_type == 'STATUS':
- return ('foo\nwpa_state=COMPLETED\nssid=my ssid\nbar' if self.connected
- else 'foo')
+ return ('foo\nwpa_state=COMPLETED\nssid=%s\nbar' % self.ssid_testonly
+ if self.connected else 'foo')
else:
raise ValueError('Invalid request_type %s' % request_type)
@@ -99,6 +107,22 @@
def add_event(self, event):
self.events.append(event)
+ def add_connected_event(self):
+ self.connected = True
+ self.add_event(Wifi.CONNECTED_EVENT)
+
+ def add_disconnected_event(self):
+ self.connected = False
+ self.add_event(Wifi.DISCONNECTED_EVENT)
+
+ def add_terminating_event(self):
+ self.connected = False
+ self.add_event(Wifi.TERMINATING_EVENT)
+
+ def check_socket_exists(self, msg='Fake socket does not exist'):
+ if not os.path.exists(self._socket):
+ raise wpactrl.error(msg)
+
class Wifi(FakeInterfaceMixin, interface.Wifi):
"""Fake Wifi for testing."""
@@ -107,31 +131,137 @@
DISCONNECTED_EVENT = '<2>CTRL-EVENT-DISCONNECTED'
TERMINATING_EVENT = '<2>CTRL-EVENT-TERMINATING'
+ WPACtrl = FakeWPACtrl
+
def __init__(self, *args, **kwargs):
super(Wifi, self).__init__(*args, **kwargs)
- self._initially_connected = False
+ self._initial_ssid_testonly = None
- def attach_wpa_control(self, *args, **kwargs):
- if self._initially_connected and self._wpa_control:
+ def attach_wpa_control(self, path):
+ if self._initial_ssid_testonly and self._wpa_control:
self._wpa_control.connected = True
- super(Wifi, self).attach_wpa_control(*args, **kwargs)
+ super(Wifi, self).attach_wpa_control(path)
- def get_wpa_control(self, socket):
- result = FakeWPACtrl(socket)
- result.connected = self._initially_connected
+ def get_wpa_control(self, *args, **kwargs):
+ result = super(Wifi, self).get_wpa_control(*args, **kwargs)
+ if self._initial_ssid_testonly:
+ result.connected = True
+ result.ssid_testonly = self._initial_ssid_testonly
return result
def add_connected_event(self):
if self.attached():
- self._wpa_control.add_event(self.CONNECTED_EVENT)
+ self._wpa_control.add_connected_event()
def add_disconnected_event(self):
+ self._initial_ssid_testonly = None
if self.attached():
- self._wpa_control.add_event(self.DISCONNECTED_EVENT)
+ self._wpa_control.add_disconnected_event()
def add_terminating_event(self):
+ self._initial_ssid_testonly = None
if self.attached():
- self._wpa_control.add_event(self.TERMINATING_EVENT)
+ self._wpa_control.add_terminating_event()
+
+ def detach_wpa_control(self):
+ self._initial_ssid_testonly = None
+ super(Wifi, self).detach_wpa_control()
+
+ def start_wpa_supplicant_testonly(self, path):
+ logging.debug('Starting fake wpa_supplicant for %s', self.name)
+ open(os.path.join(path, self.name), 'w')
+
+ def kill_wpa_supplicant_testonly(self, path):
+ logging.debug('Killing fake wpa_supplicant for %s', self.name)
+ if self.attached():
+ self.detach_wpa_control()
+ os.unlink(os.path.join(path, self.name))
+ else:
+ raise RuntimeError('Trying to kill wpa_supplicant while not attached')
+
+
+class FrenzyWPACtrl(interface.FrenzyWPACtrl):
+
+ def __init__(self, *args, **kwargs):
+ super(FrenzyWPACtrl, self).__init__(*args, **kwargs)
+ self.ssid_testonly = None
+
+ def _qcsapi(self, *command):
+ return self.fake_qcsapi.get(command[0], None)
+
+ def add_connected_event(self):
+ self.fake_qcsapi['get_mode'] = 'Station'
+ json.dump({'SSID': self.ssid_testonly},
+ open(self._wifiinfo_filename(), 'w'))
+
+ def add_disconnected_event(self):
+ self.ssid_testonly = None
+ json.dump({'SSID': ''}, open(self._wifiinfo_filename(), 'w'))
+
+ def add_terminating_event(self):
+ self.ssid_testonly = None
+ json.dump({'SSID': ''}, open(self._wifiinfo_filename(), 'w'))
+ self.fake_qcsapi['get_mode'] = 'AP'
+
+ def detach(self):
+ self.add_terminating_event()
+ super(FrenzyWPACtrl, self).detach()
+
+
+class FrenzyWifi(FakeInterfaceMixin, interface.FrenzyWifi):
+ WPACtrl = FrenzyWPACtrl
+
+ def __init__(self, *args, **kwargs):
+ super(FrenzyWifi, self).__init__(*args, **kwargs)
+ self._initial_ssid_testonly = None
+ self.fake_qcsapi = {}
+
+ def attach_wpa_control(self, *args, **kwargs):
+ super(FrenzyWifi, self).attach_wpa_control(*args, **kwargs)
+ if self._wpa_control:
+ self._wpa_control.ssid_testonly = self._initial_ssid_testonly
+ if self._initial_ssid_testonly:
+ self._wpa_control.add_connected_event()
+
+ def get_wpa_control(self, *args, **kwargs):
+ result = super(FrenzyWifi, self).get_wpa_control(*args, **kwargs)
+ result.fake_qcsapi = self.fake_qcsapi
+ if self._initial_ssid_testonly:
+ result.fake_qcsapi['get_mode'] = 'Station'
+ result.ssid_testonly = self._initial_ssid_testonly
+ result.add_connected_event()
+ return result
+
+ def add_connected_event(self):
+ if self.attached():
+ self._wpa_control.add_connected_event()
+
+ def add_disconnected_event(self):
+ self._initial_ssid_testonly = None
+ if self.attached():
+ self._wpa_control.add_disconnected_event()
+
+ def add_terminating_event(self):
+ self._initial_ssid_testonly = None
+ if self.attached():
+ self._wpa_control.add_terminating_event()
+
+ def detach_wpa_control(self):
+ self._initial_ssid_testonly = None
+ super(FrenzyWifi, self).detach_wpa_control()
+
+ def start_wpa_supplicant_testonly(self, unused_path):
+ logging.debug('Starting fake wpa_supplicant for %s', self.name)
+ self.fake_qcsapi['get_mode'] = 'Station'
+
+ def kill_wpa_supplicant_testonly(self, unused_path):
+ logging.debug('Killing fake wpa_supplicant for %s', self.name)
+ if self.attached():
+ # This happens to do what we need.
+ self.add_terminating_event()
+ self.detach_wpa_control()
+ else:
+ raise RuntimeError('Trying to kill wpa_supplicant while not attached')
@wvtest.wvtest
@@ -199,6 +329,50 @@
shutil.rmtree(tmp_dir)
+def generic_wifi_test(w, wpa_path):
+ # Not currently connected.
+ w.start_wpa_supplicant_testonly(wpa_path)
+ w.attach_wpa_control(wpa_path)
+ wvtest.WVFAIL(w.wpa_supplicant)
+
+ # pylint: disable=protected-access
+ wpa_control = w._wpa_control
+
+ # wpa_supplicant connects.
+ wpa_control.ssid_testonly = 'my=ssid'
+ wpa_control.add_connected_event()
+ wvtest.WVFAIL(w.wpa_supplicant)
+ w.handle_wpa_events()
+ wvtest.WVPASS(w.wpa_supplicant)
+ w.set_gateway_ip('192.168.1.1')
+
+ # wpa_supplicant disconnects.
+ wpa_control.add_disconnected_event()
+ w.handle_wpa_events()
+ wvtest.WVFAIL(w.wpa_supplicant)
+
+ # Now, start over so we can test what happens when wpa_supplicant is already
+ # connected when we attach.
+ w.detach_wpa_control()
+ # pylint: disable=protected-access
+ w._initial_ssid_testonly = 'my=ssid'
+ w._initialized = False
+ w.attach_wpa_control(wpa_path)
+ wpa_control = w._wpa_control
+
+ # wpa_supplicant was already connected when we attached.
+ wvtest.WVPASS(w.wpa_supplicant)
+ wvtest.WVPASSEQ(w.initial_ssid, 'my=ssid')
+ w.initialize()
+ wvtest.WVPASSEQ(w.initial_ssid, None)
+
+ # The wpa_supplicant process disconnects and terminates.
+ wpa_control.add_disconnected_event()
+ wpa_control.add_terminating_event()
+ w.handle_wpa_events()
+ wvtest.WVFAIL(w.wpa_supplicant)
+
+
@wvtest.wvtest
def wifi_test():
"""Test Wifi."""
@@ -208,53 +382,29 @@
try:
wpa_path = tempfile.mkdtemp()
- socket = os.path.join(wpa_path, w.name)
- open(socket, 'w')
-
- # Not currently connected.
- w.attach_wpa_control(wpa_path)
- wvtest.WVFAIL(w.wpa_supplicant)
-
- # pylint: disable=protected-access
- wpa_control = w._wpa_control
-
- # wpa_supplicant connects.
- wpa_control.add_event(Wifi.CONNECTED_EVENT)
- wvtest.WVFAIL(w.wpa_supplicant)
- w.handle_wpa_events()
- wvtest.WVPASS(w.wpa_supplicant)
- w.set_gateway_ip('192.168.1.1')
-
- # wpa_supplicant disconnects.
- wpa_control.add_event(Wifi.DISCONNECTED_EVENT)
- w.handle_wpa_events()
- wvtest.WVFAIL(w.wpa_supplicant)
-
- # Now, start over so we can test what happens when wpa_supplicant is already
- # connected when we attach.
- w.detach_wpa_control()
- # pylint: disable=protected-access
- w._initially_connected = True
- w._initialized = False
- w.attach_wpa_control(wpa_path)
- wpa_control = w._wpa_control
-
- # wpa_supplicant was already connected when we attached.
- wvtest.WVPASS(w.wpa_supplicant)
- wvtest.WVPASSEQ(w.initial_ssid, 'my ssid')
- w.initialize()
- wvtest.WVPASSEQ(w.initial_ssid, None)
-
- # The wpa_supplicant process disconnects and terminates.
- wpa_control.add_event(Wifi.DISCONNECTED_EVENT)
- wpa_control.add_event(Wifi.TERMINATING_EVENT)
- os.unlink(socket)
- w.handle_wpa_events()
- wvtest.WVFAIL(w.wpa_supplicant)
+ generic_wifi_test(w, wpa_path)
finally:
shutil.rmtree(wpa_path)
+@wvtest.wvtest
+def frenzy_wifi_test():
+ """Test FrenzyWifi."""
+ w = FrenzyWifi('wlan0', '20')
+ w.set_connection_check_result('succeed')
+ w.initialize()
+
+ try:
+ wpa_path = tempfile.mkdtemp()
+ FrenzyWifi.WPACtrl.WIFIINFO_PATH = wifiinfo_path = tempfile.mkdtemp()
+
+ generic_wifi_test(w, wpa_path)
+
+ finally:
+ shutil.rmtree(wpa_path)
+ shutil.rmtree(wifiinfo_path)
+
+
if __name__ == '__main__':
wvtest.wvtest_main()
diff --git a/conman/iw.py b/conman/iw.py
index 973d653..f4932f1 100755
--- a/conman/iw.py
+++ b/conman/iw.py
@@ -6,9 +6,12 @@
import subprocess
-def _scan(interface, **kwargs):
+FIBER_OUI = 'f4:f5:e8'
+
+
+def _scan(band, **kwargs):
try:
- return subprocess.check_output(('iw', 'dev', interface, 'scan'), **kwargs)
+ return subprocess.check_output(('wifi', 'scan', '-b', band), **kwargs)
except subprocess.CalledProcessError:
return ''
@@ -36,6 +39,9 @@
# pylint: disable=protected-access
return isinstance(other, BssInfo) and self.__attrs() == other.__attrs()
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
def __hash__(self):
return hash(self.__attrs())
@@ -47,11 +53,11 @@
# TODO(rofrankel): waveguide also scans. Can we find a way to avoid two programs
# scanning in parallel?
-def scan_parsed(interface, **kwargs):
+def scan_parsed(band, **kwargs):
"""Return the parsed results of 'iw scan'."""
result = []
bss_info = None
- for line in _scan(interface, **kwargs).splitlines():
+ for line in _scan(band, **kwargs).splitlines():
line = line.strip()
match = re.match(_BSSID_RE, line)
if match:
@@ -81,11 +87,11 @@
return result
-def find_bssids(interface, vendor_ie_function, include_secure):
+def find_bssids(band, vendor_ie_function, include_secure):
"""Return information about interesting access points.
Args:
- interface: The wireless interface with which to scan.
+ band: The band on which to scan.
vendor_ie_function: A function that takes a vendor IE and returns a bool.
include_secure: Whether to exclude secure networks.
@@ -94,13 +100,21 @@
BSSIDs which have a vendor IE accepted by vendor_ie_function, and the second
list has those which don't.
"""
- parsed = scan_parsed(interface)
+ parsed = scan_parsed(band)
result_with_ie = set()
result_without_ie = set()
for bss_info in parsed:
if bss_info.security and not include_secure:
continue
+
+ for oui, data in bss_info.vendor_ies:
+ if oui == FIBER_OUI:
+ octets = data.split()
+ if octets[0] == '03' and not bss_info.ssid:
+ bss_info.ssid = ''.join(octets[1:]).decode('hex')
+ continue
+
for oui, data in bss_info.vendor_ies:
if vendor_ie_function(oui, data):
result_with_ie.add(bss_info)
diff --git a/conman/iw_test.py b/conman/iw_test.py
index c069c91..9c259e8 100755
--- a/conman/iw_test.py
+++ b/conman/iw_test.py
@@ -486,6 +486,72 @@
* BK: CW 15-1023, AIFSN 7
* VI: CW 7-15, AIFSN 2, TXOP 3008 usec
* VO: CW 3-7, AIFSN 2, TXOP 1504 usec
+BSS 94:b4:0f:f1:36:41(on wcli0)
+ TSF: 12499150000 usec (0d, 03:28:19)
+ freq: 2437
+ beacon interval: 100 TUs
+ capability: ESS Privacy ShortPreamble SpectrumMgmt ShortSlotTime RadioMeasure (0x1531)
+ signal: -66.00 dBm
+ last seen: 2350 ms ago
+ Information elements from Probe Response frame:
+ SSID:
+ Supported rates: 36.0* 48.0 54.0
+ DS Parameter set: channel 6
+ TIM: DTIM Count 0 DTIM Period 1 Bitmap Control 0x0 Bitmap[0] 0x0
+ Country: US Environment: Indoor/Outdoor
+ Channels [1 - 11] @ 36 dBm
+ Power constraint: 0 dB
+ TPC report: TX power: 3 dBm
+ ERP: <no flags>
+ BSS Load:
+ * station count: 0
+ * channel utilisation: 28/255
+ * available admission capacity: 27500 [*32us]
+ HT capabilities:
+ Capabilities: 0x19ad
+ RX LDPC
+ HT20
+ SM Power Save disabled
+ RX HT20 SGI
+ TX STBC
+ RX STBC 1-stream
+ Max AMSDU length: 7935 bytes
+ DSSS/CCK HT40
+ Maximum RX AMPDU length 65535 bytes (exponent: 0x003)
+ Minimum RX AMPDU time spacing: 4 usec (0x05)
+ HT RX MCS rate indexes supported: 0-23
+ HT TX MCS rate indexes are undefined
+ HT operation:
+ * primary channel: 6
+ * secondary channel offset: no secondary
+ * STA channel width: 20 MHz
+ * RIFS: 1
+ * HT protection: nonmember
+ * non-GF present: 1
+ * OBSS non-GF present: 1
+ * dual beacon: 0
+ * dual CTS protection: 0
+ * STBC beacon: 0
+ * L-SIG TXOP Prot: 0
+ * PCO active: 0
+ * PCO phase: 0
+ Overlapping BSS scan params:
+ * passive dwell: 20 TUs
+ * active dwell: 10 TUs
+ * channel width trigger scan interval: 300 s
+ * scan passive total per channel: 200 TUs
+ * scan active total per channel: 20 TUs
+ * BSS width channel transition delay factor: 5
+ * OBSS Scan Activity Threshold: 0.25 %
+ Extended capabilities: HT Information Exchange Supported, Extended Channel Switching, BSS Transition, 6
+ WMM: * Parameter version 1
+ * u-APSD
+ * BE: CW 15-1023, AIFSN 3
+ * BK: CW 15-1023, AIFSN 7
+ * VI: CW 7-15, AIFSN 2, TXOP 3008 usec
+ * VO: CW 3-7, AIFSN 2, TXOP 1504 usec
+ Vendor specific: OUI 00:11:22, data: 01 23 45 67
+ Vendor specific: OUI f4:f5:e8, data: 03 47 46 69 62 65 72 53 65 74 75 70 41 75 74 6f 6d 61 74 69 6f 6e
"""
@@ -498,15 +564,23 @@
@wvtest.wvtest
def find_bssids_test():
"""Test iw.find_bssids."""
+ test_ie = ('00:11:22', '01 23 45 67')
+ ssid_ie = (
+ 'f4:f5:e8',
+ '03 47 46 69 62 65 72 53 65 74 75 70 41 75 74 6f 6d 61 74 69 6f 6e',
+ )
short_scan_result = iw.BssInfo(ssid='short scan result',
bssid='00:23:97:57:f4:d8',
security=['WEP'],
- vendor_ies=[('00:11:22', '01 23 45 67')])
+ vendor_ies=[test_ie])
+ provisioning_bss_info = iw.BssInfo(ssid='GFiberSetupAutomation',
+ bssid='94:b4:0f:f1:36:41',
+ vendor_ies=[test_ie, ssid_ie])
with_ie, without_ie = iw.find_bssids('wcli0', lambda o, d: o == '00:11:22',
True)
- wvtest.WVPASSEQ(with_ie, set([short_scan_result]))
+ wvtest.WVPASSEQ(with_ie, set([short_scan_result, provisioning_bss_info]))
wvtest.WVPASSEQ(
without_ie,
@@ -524,7 +598,7 @@
with_ie, without_ie = iw.find_bssids('wcli0', lambda o, d: o == '00:11:22',
False)
- wvtest.WVPASSEQ(with_ie, set())
+ wvtest.WVPASSEQ(with_ie, set([provisioning_bss_info]))
wvtest.WVPASSEQ(
without_ie,
set([iw.BssInfo(ssid='GoogleGuest', bssid='94:b4:0f:f1:36:41'),
diff --git a/conman/status.py b/conman/status.py
index 118bafc..e21dc01 100644
--- a/conman/status.py
+++ b/conman/status.py
@@ -29,6 +29,7 @@
COULD_REACH_ACS = 'COULD_REACH_ACS'
CAN_REACH_INTERNET = 'CAN_REACH_INTERNET'
PROVISIONING_FAILED = 'PROVISIONING_FAILED'
+ ATTACHED_TO_WPA_SUPPLICANT = 'ATTACHED_TO_WPA_SUPPLICANT'
# Format: { proposition: (implications, counter-implications), ... }
@@ -63,9 +64,13 @@
(P.COULD_REACH_ACS,),
),
P.HAVE_WORKING_CONFIG: (
- (),
(P.HAVE_CONFIG,),
+ (),
),
+ P.ATTACHED_TO_WPA_SUPPLICANT: (
+ (),
+ (),
+ )
}
@@ -78,7 +83,7 @@
def __init__(self, name, export_path):
self._name = name
self._export_path = export_path
- self._value = False
+ self._value = None
self._implications = set()
self._counter_implications = set()
self._impliers = set()
diff --git a/wifi/configs.py b/wifi/configs.py
index 11867b2..676182a 100644
--- a/wifi/configs.py
+++ b/wifi/configs.py
@@ -23,6 +23,14 @@
experiment.register(_i)
+# From http://go/alphabet-ie-registry, OUI f4f5e8.
+# The properties of this class are hex string representations of varints.
+# pylint: disable=invalid-name
+class VENDOR_IE_FEATURE_ID(object):
+ SUPPORTS_PROVISIONING = '01'
+ PROVISIONING_SSID = '03'
+
+
# Recommended HT40/VHT80 settings for given primary channels.
# HT40 channels can fall back to 20 MHz, and VHT80 can fall back to 40 or 20.
# So we configure using a "primary" 20 MHz channel, then allow wider
@@ -78,6 +86,7 @@
{require_vht}
{hidden}
{ap_isolate}
+{vendor_elements}
ht_capab={ht20}{ht40}{guard_interval}{ht_rxstbc}
{vht_settings}
@@ -285,7 +294,8 @@
ht_rxstbc=ht_rxstbc, vht_settings=vht_settings,
guard_interval=guard_interval, enable_wmm=enable_wmm, hidden=hidden,
ap_isolate=ap_isolate, auth_algs=auth_algs, bridge=bridge,
- ssid=utils.sanitize_ssid(opt.ssid))]
+ ssid=utils.sanitize_ssid(opt.ssid),
+ vendor_elements=get_vendor_elements(opt))]
if opt.encryption != 'NONE':
hostapd_conf_parts.append(_HOSTCONF_WPA_TPL.format(
@@ -342,3 +352,47 @@
]
return '\n'.join(lines)
+
+def create_vendor_ie(feature_id, payload=''):
+ """Create a vendor IE in hostapd config format.
+
+ Args:
+ feature_id: The go/alphabet-ie-registry feature ID for OUI f4f5e8.
+ payload: A string payload (must be ASCII), or none.
+
+ Returns:
+ The vendor IE, as a string.
+ """
+ length = '%02x' % (3 + (len(feature_id)/2) + len(payload))
+ oui = 'f4f5e8'
+ return 'dd%s%s%s%s' % (length, oui, feature_id, payload.encode('hex'))
+
+
+def get_vendor_elements(opt):
+ """Get vendor_elements value hostapd config.
+
+ The way to specify multiple vendor IEs in hostapd is to concatenate them, e.g.
+
+ vendor_elements=dd0411223301dd051122330203
+
+ Args:
+ opt: The optdict containing user-specified options.
+
+ Returns:
+ The vendor_elements string (including that prefix, or empty if there are no
+ vendor IEs.)
+ """
+ vendor_ies = []
+
+ if opt.supports_provisioning:
+ vendor_ies.append(
+ create_vendor_ie(VENDOR_IE_FEATURE_ID.SUPPORTS_PROVISIONING))
+
+ if opt.hidden_mode:
+ vendor_ies.append(
+ create_vendor_ie(VENDOR_IE_FEATURE_ID.PROVISIONING_SSID, opt.ssid))
+
+ if vendor_ies:
+ return 'vendor_elements=%s' % ''.join(vendor_ies)
+
+ return ''
diff --git a/wifi/configs_test.py b/wifi/configs_test.py
index 77773ce..ece4ca9 100755
--- a/wifi/configs_test.py
+++ b/wifi/configs_test.py
@@ -310,6 +310,7 @@
+
ht_capab=[HT20][RX-STBC1]
"""
@@ -333,6 +334,31 @@
+
+ht_capab=[HT20][RX-STBC1]
+
+"""
+
+_HOSTAPD_CONFIG_PROVISION_VIA = """ctrl_interface=/var/run/hostapd
+interface=wlan0
+
+ssid=TEST_SSID
+utf8_ssid=1
+auth_algs=1
+hw_mode=g
+channel=1
+country_code=US
+ieee80211d=1
+ieee80211h=1
+ieee80211n=1
+
+
+
+
+ignore_broadcast_ssid=1
+
+vendor_elements=dd04f4f5e801dd0df4f5e803544553545f53534944
+
ht_capab=[HT20][RX-STBC1]
"""
@@ -366,6 +392,7 @@
self.persist = False
self.interface_suffix = ''
self.client_isolation = False
+ self.supports_provisioning = False
# pylint: disable=protected-access
@@ -393,6 +420,20 @@
config)
opt.bridge = default_bridge
+ # Test provisioning IEs.
+ default_hidden_mode, opt.hidden_mode = opt.hidden_mode, True
+ default_supports_provisioning, opt.supports_provisioning = (
+ opt.supports_provisioning, True)
+ config = configs.generate_hostapd_config(
+ _PHY_INFO, 'wlan0', '2.4', '1', '20', set(('a', 'b', 'g', 'n', 'ac')),
+ 'asdfqwer', opt)
+ wvtest.WVPASSEQ('\n'.join((_HOSTAPD_CONFIG_PROVISION_VIA,
+ _HOSTAPD_CONFIG_WPA,
+ '# Experiments: ()\n')),
+ config)
+ opt.hidden_mode = default_hidden_mode
+ opt.supports_provisioning = default_supports_provisioning
+
# Test with no encryption.
default_encryption, opt.encryption = opt.encryption, 'NONE'
config = configs.generate_hostapd_config(
@@ -444,5 +485,12 @@
wvtest.WVPASSEQ(new_config, config)
+@wvtest.wvtest
+def create_vendor_ie_test():
+ wvtest.WVPASSEQ(configs.create_vendor_ie('01'), 'dd04f4f5e801')
+ wvtest.WVPASSEQ(configs.create_vendor_ie('03', 'GFiberSetupAutomation'),
+ 'dd19f4f5e80347466962657253657475704175746f6d6174696f6e')
+
+
if __name__ == '__main__':
wvtest.wvtest_main()
diff --git a/wifi/iw.py b/wifi/iw.py
index 647e56c..ae2a8b6 100644
--- a/wifi/iw.py
+++ b/wifi/iw.py
@@ -54,8 +54,8 @@
def _scan(interface, scan_args, **kwargs):
- return subprocess.check_output(['iw', 'dev', interface, 'scan'] + scan_args,
- **kwargs)
+ return subprocess.check_output(
+ ['iw', 'dev', interface, 'scan', '-u'] + scan_args, **kwargs)
_WIPHY_RE = re.compile(r'Wiphy (?P<phy>\S+)')
diff --git a/wifi/quantenna.py b/wifi/quantenna.py
index 12ae6f4..e0f5a28 100755
--- a/wifi/quantenna.py
+++ b/wifi/quantenna.py
@@ -2,6 +2,7 @@
"""Wifi commands for Quantenna using QCSAPI."""
+import json
import os
import subprocess
import time
@@ -9,6 +10,9 @@
import utils
+WIFIINFO_PATH = '/tmp/wifi/wifiinfo'
+
+
ALREADY_MEMBER_FMT = ('device %s is already a member of a bridge; '
"can't enslave it to bridge %s.")
NOT_MEMBER_FMT = 'device %s is not a slave of %s'
@@ -35,6 +39,23 @@
stderr=subprocess.STDOUT).strip()
+def _ifplugd_action(*args):
+ return subprocess.check_output(['/etc/ifplugd/ifplugd.action'] + list(args),
+ stderr=subprocess.STDOUT).strip()
+
+
+def info_parsed(interface):
+ """Fake version of iw.info_parsed."""
+ wifiinfo_filename = os.path.join(WIFIINFO_PATH, interface)
+
+ if not os.path.exists(wifiinfo_filename):
+ return {}
+
+ wifiinfo = json.load(open(wifiinfo_filename))
+ return {'addr' if k == 'BSSID' else k.lower(): v
+ for k, v in wifiinfo.iteritems()}
+
+
def _set_interface_in_bridge(bridge, interface, want_in_bridge):
"""Add/remove Quantenna interface from/to the bridge."""
if want_in_bridge:
@@ -106,6 +127,12 @@
else:
raise utils.BinWifiException('wpa_supplicant failed to connect')
+ try:
+ _ifplugd_action(interface, 'up')
+ except subprocess.CalledProcessError:
+ utils.log('Failed to call ifplugd.action. %s may not get an IP address.'
+ % interface)
+
return True
diff --git a/wifi/quantenna_test.py b/wifi/quantenna_test.py
index 19aa0a2..ad6db09 100755
--- a/wifi/quantenna_test.py
+++ b/wifi/quantenna_test.py
@@ -2,8 +2,11 @@
"""Tests for quantenna.py."""
+import json
import os
+import shutil
from subprocess import CalledProcessError
+import tempfile
from configs_test import FakeOptDict
import quantenna
@@ -11,6 +14,7 @@
calls = []
+ifplugd_action_calls = []
def fake_qcsapi(*args):
@@ -51,6 +55,7 @@
def set_fakes(interface='wlan1'):
del calls[:]
+ del ifplugd_action_calls[:]
bridge_interfaces.clear()
os.environ['WIFI_PSK'] = 'wifi_psk'
os.environ['WIFI_CLIENT_PSK'] = 'wifi_client_psk'
@@ -58,6 +63,7 @@
quantenna._get_mac_address = lambda _: '00:11:22:33:44:55'
quantenna._qcsapi = fake_qcsapi
quantenna._brctl = fake_brctl
+ quantenna._ifplugd_action = lambda *args: ifplugd_action_calls.append(args)
def matching_calls_indices(accept):
@@ -132,6 +138,9 @@
wvtest.WVPASSLT(sp, i[0])
wvtest.WVPASSLT(i[-1], calls.index(['rfenable', '1']))
+ # We shouldn't touch ifplugd in AP mode.
+ wvtest.WVPASSEQ(len(ifplugd_action_calls), 0)
+
# Run set_wifi again in client mode with new options.
opt.channel = '147'
opt.ssid = 'TEST_SSID2'
@@ -178,6 +187,10 @@
wvtest.WVPASSLT(rim, i[0])
wvtest.WVPASSLT(i[-1], calls.index(['apply_security_config', 'wifi0']))
+ # We should have called ipflugd.action after setclient.
+ wvtest.WVPASSEQ(len(ifplugd_action_calls), 1)
+ wvtest.WVPASSEQ(ifplugd_action_calls[0], ('wlan1', 'up'))
+
# Make sure subsequent equivalent calls don't fail despite the redundant
# bridge changes.
wvtest.WVPASS(quantenna.set_client_wifi(opt))
@@ -200,5 +213,25 @@
wvtest.WVPASS(['rfenable', '0'] not in calls[new_calls_start:])
+@wvtest.wvtest
+def info_parsed_test():
+ set_fakes()
+
+ try:
+ quantenna.WIFIINFO_PATH = tempfile.mkdtemp()
+ json.dump({
+ 'Channel': '64',
+ 'SSID': 'my ssid',
+ 'BSSID': '00:00:00:00:00:00',
+ }, open(os.path.join(quantenna.WIFIINFO_PATH, 'wlan0'), 'w'))
+
+ wvtest.WVPASSEQ(quantenna.info_parsed('wlan0'), {
+ 'ssid': 'my ssid',
+ 'addr': '00:00:00:00:00:00',
+ 'channel': '64',
+ })
+ finally:
+ shutil.rmtree(quantenna.WIFIINFO_PATH)
+
if __name__ == '__main__':
wvtest.wvtest_main()
diff --git a/wifi/wifi.py b/wifi/wifi.py
index 9ba8b31..01bba84 100755
--- a/wifi/wifi.py
+++ b/wifi/wifi.py
@@ -55,6 +55,7 @@
scan-ap-force (Scan only) scan when in AP mode
scan-passive (Scan only) do not probe, scan passively
scan-freq= (Scan only) limit scan to specific frequencies.
+supports-provisioning Indicate via vendor IE that this AP supports provisioning. Corresponds to feature ID 01 of OUI f4f5e8 at go/alphabet-ie-registry.
"""
_FINGERPRINTS_DIRECTORY = '/tmp/wifi/fingerprints'
@@ -458,6 +459,7 @@
True.
"""
for band in opt.band.split():
+ frenzy = False
print('Band: %s' % band)
for tokens in utils.subprocess_line_tokens(('iw', 'reg', 'get')):
if len(tokens) >= 2 and tokens[0] == 'country':
@@ -469,11 +471,20 @@
interface = iw.find_interface_from_band(
band, interface_type, opt.interface_suffix)
if interface is None:
- continue
- print('%sInterface: %s # %s GHz %s' %
- (prefix, interface, band, 'client' if 'cli' in interface else 'ap'))
+ if band == '5':
+ interface = _get_quantenna_interface()
+ if interface:
+ frenzy = True
+ if not interface:
+ continue
- info_parsed = iw.info_parsed(interface)
+ print('%sInterface: %s # %s GHz %s' %
+ (prefix, interface, band, prefix.lower() or 'ap'))
+
+ if frenzy:
+ info_parsed = quantenna.info_parsed(interface)
+ else:
+ info_parsed = iw.info_parsed(interface)
for k, label in (('channel', 'Channel'),
('ssid', 'SSID'),
('addr', 'BSSID')):
@@ -500,6 +511,13 @@
return True
+def _get_quantenna_interface():
+ try:
+ return subprocess.check_output(['get-quantenna-interface']).strip()
+ except subprocess.CalledProcessError as e:
+ utils.log('Failed to call get-quantenna-interface: %s', e)
+
+
@iw.requires_iw
def scan_wifi(opt):
"""Prints 'iw scan' results.