conman: Fake WPACtrl support for Frenzy.
This creates a FrenzyWPACtrl class that uses wifi_files and QCSAPI to
emulate WPACtrl.
Refactored interface.Wifi tests a bit so that the same test code could
be used for interface.Wifi and interface.FrenzyWifi.
Change-Id: I5d2befa387ede9a62fc1b70f69c8f655ae4b9dd2
diff --git a/conman/connection_manager_test.py b/conman/connection_manager_test.py
index e84cf26..76ee910 100755
--- a/conman/connection_manager_test.py
+++ b/conman/connection_manager_test.py
@@ -197,7 +197,6 @@
def stop_client(self):
if self.client_up:
self.wifi.add_terminating_event()
- os.unlink(self._socket())
self.wifi.set_connection_check_result('fail')
# See comments in start_client.
diff --git a/conman/interface.py b/conman/interface.py
index 0d81b65..3acdf24 100755
--- a/conman/interface.py
+++ b/conman/interface.py
@@ -2,6 +2,7 @@
"""Models wired and wireless interfaces."""
+import json
import logging
import os
import re
@@ -311,9 +312,11 @@
class Wifi(Interface):
- """Represents the wireless interface."""
+ """Represents a wireless interface."""
WPA_EVENT_RE = re.compile(r'<\d+>CTRL-EVENT-(?P<event>[A-Z\-]+).*')
+ # pylint: disable=invalid-name
+ WPACtrl = wpactrl.WPACtrl
def __init__(self, *args, **kwargs):
self.bands = kwargs.pop('bands', [])
@@ -345,30 +348,26 @@
return True
socket = os.path.join(path, self.name)
- if os.path.exists(socket):
- try:
- self._wpa_control = self.get_wpa_control(socket)
- self._wpa_control.attach()
- except wpactrl.error as e:
- logging.error('Error attaching to wpa_supplicant: %s', e)
- return False
-
- for line in self._wpa_control.request('STATUS').splitlines():
- if '=' not in line:
- continue
- key, value = line.split('=', 1)
- if key == 'wpa_state':
- self.wpa_supplicant = value == 'COMPLETED'
- elif key == 'ssid' and not self._initialized:
- self.initial_ssid = value
- else:
- logging.error('wpa control socket does not exist: %s', socket)
+ try:
+ self._wpa_control = self.get_wpa_control(socket)
+ self._wpa_control.attach()
+ except wpactrl.error as e:
+ logging.error('Error attaching to wpa_supplicant: %s', e)
return False
+ for line in self._wpa_control.request('STATUS').splitlines():
+ if '=' not in line:
+ continue
+ key, value = line.split('=', 1)
+ if key == 'wpa_state':
+ self.wpa_supplicant = value == 'COMPLETED'
+ elif key == 'ssid' and not self._initialized:
+ self.initial_ssid = value
+
return True
def get_wpa_control(self, socket):
- return wpactrl.WPACtrl(socket)
+ return self.WPACtrl(socket)
def detach_wpa_control(self):
if self.attached():
@@ -407,3 +406,107 @@
self.initial_ssid = None
super(Wifi, self).initialize()
+
+class FrenzyWPACtrl(object):
+ """A fake WPACtrl for Frenzy devices.
+
+ Implements the same functions used on the normal WPACtrl, using a combination
+ of the QCSAPI and wifi_files. Keeps state in order to generate fake events by
+ diffing saved state with current system state.
+ """
+
+ WIFIINFO_PATH = '/tmp/wifi/wifiinfo'
+
+ def __init__(self, socket):
+ self._interface = os.path.split(socket)[-1]
+
+ # State from QCSAPI and wifi_files.
+ self._client_mode = False
+ self._ssid = None
+ self._status = None
+
+ self._events = []
+
+ def _qcsapi(self, *command):
+ try:
+ return subprocess.check_output(['qcsapi'] + list(command)).strip()
+ except subprocess.CalledProcessError:
+ return None
+
+ def _wifiinfo_filename(self):
+ return os.path.join(self.WIFIINFO_PATH, self._interface)
+
+ def _get_wifiinfo(self):
+ try:
+ return json.load(open(self._wifiinfo_filename()))
+ except IOError:
+ return None
+
+ def _get_ssid(self):
+ wifiinfo = self._get_wifiinfo()
+ if wifiinfo:
+ return wifiinfo.get('SSID')
+
+ def _check_client_mode(self):
+ return self._qcsapi('get_mode', 'wifi0') == 'Station'
+
+ def attach(self):
+ self._update()
+
+ def attached(self):
+ return self._client_mode and self._ssid
+
+ def detach(self):
+ raise wpactrl.error('Real WPACtrl always raises this when detaching.')
+
+ def pending(self):
+ self._update()
+ return bool(self._events)
+
+ def _update(self):
+ """Generate and cache events, update state."""
+ client_mode = self._check_client_mode()
+ ssid = self._get_ssid()
+ status = self._qcsapi('get_status', 'wifi0')
+
+ # If we have an SSID and are in client mode, and at least one of those is
+ # new, then we have just connected.
+ if client_mode and ssid and (not self._client_mode or ssid != self._ssid):
+ self._events.append('<2>CTRL-EVENT-CONNECTED')
+
+ # If we are in client mode but lost SSID, we disconnected.
+ if client_mode and self._ssid and not ssid:
+ self._events.append('<2>CTRL-EVENT-DISCONNECTED')
+
+ # If there is an auth/assoc failure, then status (above) is 'Error'. We
+ # really want the converse of this implication (i.e. that 'Error' implies an
+ # auth/assoc failure), but due to limited documentation this will have to
+ # do. It should be good enough: if something else causes get_status to
+ # return 'Error', we are probably not connected, and we don't do anything
+ # special with auth/assoc failures specifically.
+ if client_mode and status == 'Error' and self._status != 'Error':
+ self._events.append('<2>CTRL-EVENT-AUTH-REJECT')
+
+ # If we left client mode, wpa_supplicant has terminated.
+ if self._client_mode and not client_mode:
+ self._events.append('<2>CTRL-EVENT-TERMINATING')
+
+ self._client_mode = client_mode
+ self._ssid = ssid
+ self._status = status
+
+ def recv(self):
+ return self._events.pop(0)
+
+ def request(self, request_type):
+ if request_type == 'STATUS' and self.attached():
+ return 'wpa_state=COMPLETED\nssid=%s\n' % self._ssid
+
+ return ''
+
+
+class FrenzyWifi(Wifi):
+ """Represents a Frenzy wireless interface."""
+
+ # pylint: disable=invalid-name
+ WPACtrl = FrenzyWPACtrl
diff --git a/conman/interface_test.py b/conman/interface_test.py
index bea0d24..20579e3 100755
--- a/conman/interface_test.py
+++ b/conman/interface_test.py
@@ -2,6 +2,7 @@
"""Tests for connection_manager.py."""
+import json
import logging
import os
import shutil
@@ -102,6 +103,16 @@
def add_event(self, event):
self.events.append(event)
+ def add_connected_event(self):
+ self.add_event(Wifi.CONNECTED_EVENT)
+
+ def add_disconnected_event(self):
+ self.add_event(Wifi.DISCONNECTED_EVENT)
+
+ def add_terminating_event(self):
+ os.unlink(self._socket)
+ self.add_event(Wifi.TERMINATING_EVENT)
+
class Wifi(FakeInterfaceMixin, interface.Wifi):
"""Fake Wifi for testing."""
@@ -110,31 +121,85 @@
DISCONNECTED_EVENT = '<2>CTRL-EVENT-DISCONNECTED'
TERMINATING_EVENT = '<2>CTRL-EVENT-TERMINATING'
+ WPACtrl = FakeWPACtrl
+
def __init__(self, *args, **kwargs):
super(Wifi, self).__init__(*args, **kwargs)
self._initially_connected = False
- def attach_wpa_control(self, *args, **kwargs):
+ def attach_wpa_control(self, path):
+ open(os.path.join(path, self.name), 'w')
if self._initially_connected and self._wpa_control:
self._wpa_control.connected = True
- super(Wifi, self).attach_wpa_control(*args, **kwargs)
+ super(Wifi, self).attach_wpa_control(path)
- def get_wpa_control(self, socket):
- result = FakeWPACtrl(socket)
+ def get_wpa_control(self, *args, **kwargs):
+ result = super(Wifi, self).get_wpa_control(*args, **kwargs)
result.connected = self._initially_connected
return result
def add_connected_event(self):
if self.attached():
- self._wpa_control.add_event(self.CONNECTED_EVENT)
+ self._wpa_control.add_connected_event()
def add_disconnected_event(self):
if self.attached():
- self._wpa_control.add_event(self.DISCONNECTED_EVENT)
+ self._wpa_control.add_disconnected_event()
def add_terminating_event(self):
if self.attached():
- self._wpa_control.add_event(self.TERMINATING_EVENT)
+ self._wpa_control.add_terminating_event()
+
+
+class FrenzyWPACtrl(interface.FrenzyWPACtrl):
+
+ def __init__(self, *args, **kwargs):
+ super(FrenzyWPACtrl, self).__init__(*args, **kwargs)
+ self.fake_qcsapi = {}
+
+ def _qcsapi(self, *command):
+ return self.fake_qcsapi.get(command[0], None)
+
+ def add_connected_event(self):
+ json.dump({'SSID': 'my ssid'}, open(self._wifiinfo_filename(), 'w'))
+
+ def add_disconnected_event(self):
+ json.dump({'SSID': ''}, open(self._wifiinfo_filename(), 'w'))
+
+ def add_terminating_event(self):
+ self.fake_qcsapi['get_mode'] = 'AP'
+
+
+class FrenzyWifi(FakeInterfaceMixin, interface.FrenzyWifi):
+ WPACtrl = FrenzyWPACtrl
+
+ def __init__(self, *args, **kwargs):
+ super(FrenzyWifi, self).__init__(*args, **kwargs)
+ self._initially_connected = False
+
+ def attach_wpa_control(self, *args, **kwargs):
+ super(FrenzyWifi, self).attach_wpa_control(*args, **kwargs)
+ if self._wpa_control:
+ self._wpa_control.fake_qcsapi['get_mode'] = 'Station'
+
+ def get_wpa_control(self, *args, **kwargs):
+ result = super(FrenzyWifi, self).get_wpa_control(*args, **kwargs)
+ if self._initially_connected:
+ result.fake_qcsapi['get_mode'] = 'Station'
+ result.add_connected_event()
+ return result
+
+ def add_connected_event(self):
+ if self.attached():
+ self._wpa_control.add_connected_event()
+
+ def add_disconnected_event(self):
+ if self.attached():
+ self._wpa_control.add_disconnected_event()
+
+ def add_terminating_event(self):
+ if self.attached():
+ self._wpa_control.add_terminating_event()
@wvtest.wvtest
@@ -202,6 +267,48 @@
shutil.rmtree(tmp_dir)
+def generic_wifi_test(w, wpa_path):
+ # Not currently connected.
+ w.attach_wpa_control(wpa_path)
+ wvtest.WVFAIL(w.wpa_supplicant)
+
+ # pylint: disable=protected-access
+ wpa_control = w._wpa_control
+
+ # wpa_supplicant connects.
+ wpa_control.add_connected_event()
+ wvtest.WVFAIL(w.wpa_supplicant)
+ w.handle_wpa_events()
+ wvtest.WVPASS(w.wpa_supplicant)
+ w.set_gateway_ip('192.168.1.1')
+
+ # wpa_supplicant disconnects.
+ wpa_control.add_disconnected_event()
+ w.handle_wpa_events()
+ wvtest.WVFAIL(w.wpa_supplicant)
+
+ # Now, start over so we can test what happens when wpa_supplicant is already
+ # connected when we attach.
+ w.detach_wpa_control()
+ # pylint: disable=protected-access
+ w._initially_connected = True
+ w._initialized = False
+ w.attach_wpa_control(wpa_path)
+ wpa_control = w._wpa_control
+
+ # wpa_supplicant was already connected when we attached.
+ wvtest.WVPASS(w.wpa_supplicant)
+ wvtest.WVPASSEQ(w.initial_ssid, 'my ssid')
+ w.initialize()
+ wvtest.WVPASSEQ(w.initial_ssid, None)
+
+ # The wpa_supplicant process disconnects and terminates.
+ wpa_control.add_disconnected_event()
+ wpa_control.add_terminating_event()
+ w.handle_wpa_events()
+ wvtest.WVFAIL(w.wpa_supplicant)
+
+
@wvtest.wvtest
def wifi_test():
"""Test Wifi."""
@@ -211,53 +318,29 @@
try:
wpa_path = tempfile.mkdtemp()
- socket = os.path.join(wpa_path, w.name)
- open(socket, 'w')
-
- # Not currently connected.
- w.attach_wpa_control(wpa_path)
- wvtest.WVFAIL(w.wpa_supplicant)
-
- # pylint: disable=protected-access
- wpa_control = w._wpa_control
-
- # wpa_supplicant connects.
- wpa_control.add_event(Wifi.CONNECTED_EVENT)
- wvtest.WVFAIL(w.wpa_supplicant)
- w.handle_wpa_events()
- wvtest.WVPASS(w.wpa_supplicant)
- w.set_gateway_ip('192.168.1.1')
-
- # wpa_supplicant disconnects.
- wpa_control.add_event(Wifi.DISCONNECTED_EVENT)
- w.handle_wpa_events()
- wvtest.WVFAIL(w.wpa_supplicant)
-
- # Now, start over so we can test what happens when wpa_supplicant is already
- # connected when we attach.
- w.detach_wpa_control()
- # pylint: disable=protected-access
- w._initially_connected = True
- w._initialized = False
- w.attach_wpa_control(wpa_path)
- wpa_control = w._wpa_control
-
- # wpa_supplicant was already connected when we attached.
- wvtest.WVPASS(w.wpa_supplicant)
- wvtest.WVPASSEQ(w.initial_ssid, 'my ssid')
- w.initialize()
- wvtest.WVPASSEQ(w.initial_ssid, None)
-
- # The wpa_supplicant process disconnects and terminates.
- wpa_control.add_event(Wifi.DISCONNECTED_EVENT)
- wpa_control.add_event(Wifi.TERMINATING_EVENT)
- os.unlink(socket)
- w.handle_wpa_events()
- wvtest.WVFAIL(w.wpa_supplicant)
+ generic_wifi_test(w, wpa_path)
finally:
shutil.rmtree(wpa_path)
+@wvtest.wvtest
+def frenzy_wifi_test():
+ """Test FrenzyWifi."""
+ w = FrenzyWifi('wlan0', '20')
+ w.set_connection_check_result('succeed')
+ w.initialize()
+
+ try:
+ wpa_path = tempfile.mkdtemp()
+ FrenzyWifi.WPACtrl.WIFIINFO_PATH = wifiinfo_path = tempfile.mkdtemp()
+
+ generic_wifi_test(w, wpa_path)
+
+ finally:
+ shutil.rmtree(wpa_path)
+ shutil.rmtree(wifiinfo_path)
+
+
if __name__ == '__main__':
wvtest.wvtest_main()