conman: Export system status.
This will be useful for at least ledmonitor, possibly also gftest.
If this seems overengineered (which I recognize it very well may),
please see the comment at the top of status.py.
Change-Id: I509e35973d87900b2c84ee7192d307efbf229f38
diff --git a/conman/connection_manager.py b/conman/connection_manager.py
index 708954f..26fe6b6 100755
--- a/conman/connection_manager.py
+++ b/conman/connection_manager.py
@@ -17,6 +17,7 @@
import cycler
import interface
import iw
+import status
GFIBER_OUIS = ['f4:f5:e8']
VENDOR_IE_FEATURE_ID_AUTOPROVISIONING = '01'
@@ -53,7 +54,7 @@
WIFI_SETCLIENT = ['wifi', 'setclient', '--persist']
WIFI_STOPCLIENT = ['wifi', 'stopclient']
- def __init__(self, band, wifi, command_lines):
+ def __init__(self, band, wifi, command_lines, _status):
self.band = band
self.wifi = wifi
self.command = command_lines.splitlines()
@@ -63,6 +64,7 @@
self.passphrase = None
self.interface_suffix = None
self.access_point = None
+ self._status = _status
binwifi_option_attrs = {
'-s': 'ssid',
@@ -136,8 +138,10 @@
if self.passphrase:
env['WIFI_CLIENT_PSK'] = self.passphrase
try:
+ self._status.trying_wlan = True
subprocess.check_output(command, stderr=subprocess.STDOUT, env=env)
self.client_up = True
+ self._status.connected_to_wlan = True
logging.info('Started wifi client on %s GHz', self.band)
except subprocess.CalledProcessError as e:
logging.error('Failed to start wifi client: %s', e.output)
@@ -153,6 +157,8 @@
subprocess.check_output(self.WIFI_STOPCLIENT + ['-b', self.band],
stderr=subprocess.STDOUT)
self.client_up = False
+ # TODO(rofrankel): Make this work for dual-radio devices.
+ self._status.connected_to_wlan = False
logging.debug('Stopped wifi client on %s GHz', self.band)
except subprocess.CalledProcessError as e:
logging.error('Failed to stop wifi client: %s', e.output)
@@ -181,17 +187,18 @@
def __init__(self,
bridge_interface='br0',
- status_dir='/tmp/conman',
+ tmp_dir='/tmp/conman',
config_dir='/config/conman',
- moca_status_dir='/tmp/cwmp/monitoring/moca2',
+ moca_tmp_dir='/tmp/cwmp/monitoring/moca2',
wpa_control_interface='/var/run/wpa_supplicant',
run_duration_s=1, interface_update_period=5,
wifi_scan_period_s=120, wlan_retry_s=15, acs_update_wait_s=10):
- self._status_dir = status_dir
+ self._tmp_dir = tmp_dir
self._config_dir = config_dir
- self._interface_status_dir = os.path.join(status_dir, 'interfaces')
- self._moca_status_dir = moca_status_dir
+ self._interface_status_dir = os.path.join(tmp_dir, 'interfaces')
+ self._status_dir = os.path.join(tmp_dir, 'status')
+ self._moca_tmp_dir = moca_tmp_dir
self._wpa_control_interface = wpa_control_interface
self._run_duration_s = run_duration_s
self._interface_update_period = interface_update_period
@@ -200,7 +207,14 @@
self._acs_update_wait_s = acs_update_wait_s
self._wlan_configuration = {}
- acs_autoprov_filepath = os.path.join(self._status_dir,
+ # Make sure all necessary directories exist.
+ for directory in (self._tmp_dir, self._config_dir, self._moca_tmp_dir,
+ self._interface_status_dir, self._moca_tmp_dir):
+ if not os.path.exists(directory):
+ os.makedirs(directory)
+ logging.info('Created monitored directory: %s', directory)
+
+ acs_autoprov_filepath = os.path.join(self._tmp_dir,
'acs_autoprovisioning')
self.bridge = self.Bridge(
bridge_interface, '10',
@@ -224,22 +238,17 @@
for wifi in self.wifi:
wifi.last_wifi_scan_time = -self._wifi_scan_period_s
- # Make sure all necessary directories exist.
- for directory in (self._status_dir, self._config_dir,
- self._interface_status_dir, self._moca_status_dir):
- if not os.path.exists(directory):
- os.makedirs(directory)
- logging.info('Created monitored directory: %s', directory)
+ self._status = status.Status(self._status_dir)
wm = pyinotify.WatchManager()
wm.add_watch(self._config_dir,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO |
pyinotify.IN_DELETE | pyinotify.IN_MOVED_FROM)
- wm.add_watch(self._status_dir,
+ wm.add_watch(self._tmp_dir,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
wm.add_watch(self._interface_status_dir,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
- wm.add_watch(self._moca_status_dir,
+ wm.add_watch(self._moca_tmp_dir,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
self.notifier = pyinotify.Notifier(wm, FileChangeHandler(self), timeout=0)
@@ -261,9 +270,9 @@
if wifi_up:
wifi.attach_wpa_control(self._wpa_control_interface)
- for path, prefix in ((self._status_dir, self.GATEWAY_FILE_PREFIX),
+ for path, prefix in ((self._tmp_dir, self.GATEWAY_FILE_PREFIX),
(self._interface_status_dir, ''),
- (self._moca_status_dir, self.MOCA_NODE_FILE_PREFIX),
+ (self._moca_tmp_dir, self.MOCA_NODE_FILE_PREFIX),
(self._config_dir, self.COMMAND_FILE_PREFIX)):
for filepath in glob.glob(os.path.join(path, prefix + '*')):
self._process_file(path, os.path.split(filepath)[-1])
@@ -373,6 +382,7 @@
# If this interface is connected to the user's WLAN, there is nothing else
# to do.
if self._connected_to_wlan(wifi):
+ self._status.connected_to_wlan = True
logging.debug('Connected to WLAN on %s, nothing else to do.', wifi.name)
return
@@ -395,10 +405,12 @@
wlan_configuration.start_client()
if self._connected_to_wlan(wifi):
logging.debug('Joined WLAN on %s.', wifi.name)
+ self._status.connected_to_wlan = True
self._try_wlan_after[band] = 0
break
else:
logging.error('Failed to connect to WLAN on %s.', wifi.name)
+ self._status.connected_to_wlan = False
self._try_wlan_after[band] = time.time() + self._wlan_retry_s
else:
# If we are aren't on the WLAN, can ping the ACS, and haven't gotten a
@@ -408,6 +420,7 @@
# 2) cwmpd isn't writing a configuration, possibly because the device
# isn't registered to any accounts.
logging.debug('Unable to join WLAN on %s', wifi.name)
+ self._status.connected_to_wlan = False
if self.acs():
logging.debug('Connected to ACS on %s', wifi.name)
now = time.time()
@@ -437,16 +450,26 @@
time.sleep(max(0, self._run_duration_s - (time.time() - start_time)))
def acs(self):
- return self.bridge.acs() or any(wifi.acs() for wifi in self.wifi)
+ result = self.bridge.acs() or any(wifi.acs() for wifi in self.wifi)
+ self._status.can_reach_acs = result
+ return result
def internet(self):
- return self.bridge.internet() or any(wifi.internet() for wifi in self.wifi)
+ result = self.bridge.internet() or any(wifi.internet()
+ for wifi in self.wifi)
+ self._status.can_reach_internet = result
+ return result
def _update_interfaces_and_routes(self):
self.bridge.update_routes()
for wifi in self.wifi:
wifi.update_routes()
+ # Make sure these get called semi-regularly so that exported status is up-
+ # to-date.
+ self.acs()
+ self.internet()
+
def handle_event(self, path, filename, deleted):
if deleted:
self._handle_deleted_file(path, filename)
@@ -474,6 +497,8 @@
config.stop_client()
config.stop_access_point()
del self._wlan_configuration[band]
+ if not self._wlan_configuration:
+ self._status.have_config = False
def _process_file(self, path, filename):
"""Process or ignore an updated file in a watched directory."""
@@ -507,7 +532,7 @@
wifi = self.wifi_for_band(band)
if wifi:
self._update_wlan_configuration(
- self.WLANConfiguration(band, wifi, contents))
+ self.WLANConfiguration(band, wifi, contents, self._status))
elif filename.startswith(self.ACCESS_POINT_FILE_PREFIX):
match = re.match(self.ACCESS_POINT_FILE_REGEXP, filename)
if match:
@@ -517,7 +542,7 @@
self._wlan_configuration[band].access_point = True
logging.debug('AP enabled for %s GHz', band)
- elif path == self._status_dir:
+ elif path == self._tmp_dir:
if filename.startswith(self.GATEWAY_FILE_PREFIX):
interface_name = filename.split(self.GATEWAY_FILE_PREFIX)[-1]
ifc = self.interface_by_name(interface_name)
@@ -526,7 +551,7 @@
logging.debug('Received gateway %r for interface %s', contents,
ifc.name)
- elif path == self._moca_status_dir:
+ elif path == self._moca_tmp_dir:
match = re.match(r'^%s\d+$' % self.MOCA_NODE_FILE_PREFIX, filename)
if match:
try:
@@ -596,16 +621,25 @@
bss_info = wifi.cycler.next()
if bss_info is not None:
+ self._status.trying_open = True
connected = subprocess.call(self.WIFI_SETCLIENT +
['--ssid', bss_info.ssid,
'--band', wifi.bands[0],
'--bssid', bss_info.bssid]) == 0
if connected:
+ self._status.connected_to_open = True
now = time.time()
wifi.waiting_for_acs_since = now
wifi.complain_about_acs_at = now + 5
logging.info('Attempting to provision via SSID %s', bss_info.ssid)
return connected
+ else:
+ # TODO(rofrankel): There are probably more cases in which this should be
+ # true, e.g. if we keep trying the same few unsuccessful BSSIDs.
+ # Relatedly, once we find ACS access on an open network we may want to
+ # save that SSID/BSSID and that first in future. If we do that then we
+ # can declare that provisioning has failed much more aggressively.
+ self._status.provisioning_failed = True
return False
@@ -628,6 +662,7 @@
if wlan_configuration.interface_suffix else '') + band)
wlan_configuration.access_point = os.path.exists(ap_file)
self._wlan_configuration[band] = wlan_configuration
+ self._status.have_config = True
logging.debug('Updated WLAN configuration for %s GHz', band)
self._update_access_point(wlan_configuration)
diff --git a/conman/connection_manager_test.py b/conman/connection_manager_test.py
index 335f7ee..9e582e7 100755
--- a/conman/connection_manager_test.py
+++ b/conman/connection_manager_test.py
@@ -10,6 +10,7 @@
import connection_manager
import interface_test
import iw
+import status
from wvtest import wvtest
logging.basicConfig(level=logging.DEBUG)
@@ -160,7 +161,7 @@
return os.path.join(self._wpa_control_interface, self.wifi.name)
def write_gateway_file(self):
- gateway_file = os.path.join(self.status_dir,
+ gateway_file = os.path.join(self.tmp_dir,
self.gateway_file_prefix + self.wifi.name)
with open(gateway_file, 'w') as f:
# This value doesn't matter to conman, so it's fine to hard code it here.
@@ -280,7 +281,7 @@
def _update_wlan_configuration(self, wlan_configuration):
wlan_configuration.command.insert(0, 'echo')
wlan_configuration._wpa_control_interface = self._wpa_control_interface
- wlan_configuration.status_dir = self._status_dir
+ wlan_configuration.tmp_dir = self._tmp_dir
wlan_configuration.interface_status_dir = self._interface_status_dir
wlan_configuration.gateway_file_prefix = self.GATEWAY_FILE_PREFIX
@@ -347,7 +348,7 @@
os.unlink(ap_filename)
def write_gateway_file(self, interface_name):
- gateway_file = os.path.join(self._status_dir,
+ gateway_file = os.path.join(self._tmp_dir,
self.GATEWAY_FILE_PREFIX + interface_name)
with open(gateway_file, 'w') as f:
# This value doesn't matter to conman, so it's fine to hard code it here.
@@ -363,7 +364,7 @@
self.ifplugd_action('eth0', up)
def set_moca(self, up):
- moca_node1_file = os.path.join(self._moca_status_dir,
+ moca_node1_file = os.path.join(self._moca_tmp_dir,
self.MOCA_NODE_FILE_PREFIX + '1')
with open(moca_node1_file, 'w') as f:
f.write(FAKE_MOCA_NODE1_FILE if up else
@@ -381,6 +382,9 @@
while wifi_scan_counter == wifi.wifi_scan_counter:
self.run_once()
+ def has_status_files(self, files):
+ return not set(files) - set(os.listdir(self._status_dir))
+
def connection_manager_test(radio_config, **cm_kwargs):
"""Returns a decorator that does ConnectionManager test boilerplate."""
@@ -399,18 +403,18 @@
try:
# No initial state.
- status_dir = tempfile.mkdtemp()
+ tmp_dir = tempfile.mkdtemp()
config_dir = tempfile.mkdtemp()
- os.mkdir(os.path.join(status_dir, 'interfaces'))
- moca_status_dir = tempfile.mkdtemp()
+ os.mkdir(os.path.join(tmp_dir, 'interfaces'))
+ moca_tmp_dir = tempfile.mkdtemp()
wpa_control_interface = tempfile.mkdtemp()
# Test that missing directories are created by ConnectionManager.
- shutil.rmtree(status_dir)
+ shutil.rmtree(tmp_dir)
- c = ConnectionManager(status_dir=status_dir,
+ c = ConnectionManager(tmp_dir=tmp_dir,
config_dir=config_dir,
- moca_status_dir=moca_status_dir,
+ moca_tmp_dir=moca_tmp_dir,
wpa_control_interface=wpa_control_interface,
run_duration_s=run_duration_s,
interface_update_period=interface_update_period,
@@ -421,11 +425,10 @@
c.test_wifi_scan_period = wifi_scan_period
f(c)
-
finally:
- shutil.rmtree(status_dir)
+ shutil.rmtree(tmp_dir)
shutil.rmtree(config_dir)
- shutil.rmtree(moca_status_dir)
+ shutil.rmtree(moca_tmp_dir)
shutil.rmtree(wpa_control_interface)
# pylint: disable=protected-access
connection_manager._wifi_show = original_wifi_show
@@ -450,12 +453,14 @@
# ConnectionManager cares that the file is created *where* expected, but it is
# Bridge's responsbility to make sure its creation and deletion are generally
# correct; more thorough tests are in bridge_test in interface_test.py.
- acs_autoprov_filepath = os.path.join(c._status_dir, 'acs_autoprovisioning')
+ acs_autoprov_filepath = os.path.join(c._tmp_dir, 'acs_autoprovisioning')
# Initially, there is ethernet access (via explicit check of ethernet status,
# rather than the interface status file).
wvtest.WVPASS(c.acs())
wvtest.WVPASS(c.internet())
+ wvtest.WVPASS(c.has_status_files([status.P.CAN_REACH_ACS,
+ status.P.CAN_REACH_INTERNET]))
c.run_once()
wvtest.WVPASS(c.acs())
@@ -464,6 +469,8 @@
wvtest.WVPASS(os.path.exists(acs_autoprov_filepath))
wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
wvtest.WVFAIL(c.wifi_for_band('5').current_route())
+ wvtest.WVFAIL(c.has_status_files([status.P.CONNECTED_TO_WLAN,
+ status.P.HAVE_CONFIG]))
# Take down ethernet, no access.
c.set_ethernet(False)
@@ -472,6 +479,8 @@
wvtest.WVFAIL(c.internet())
wvtest.WVFAIL(c.bridge.current_route())
wvtest.WVFAIL(os.path.exists(acs_autoprov_filepath))
+ wvtest.WVFAIL(c.has_status_files([status.P.CAN_REACH_ACS,
+ status.P.CAN_REACH_INTERNET]))
# Bring up moca, access.
c.set_moca(True)
@@ -522,6 +531,7 @@
c.run_until_scan('2.4')
for _ in range(3):
c.run_once()
+ wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_OPEN]))
wvtest.WVPASSEQ(c.last_provisioning_attempt.ssid, 's2')
wvtest.WVPASSEQ(c.last_provisioning_attempt.bssid, '01:23:45:67:89:ab')
# Wait for the connection to be processed.
@@ -540,6 +550,7 @@
c.run_once()
wvtest.WVPASS(c.client_up('2.4'))
wvtest.WVPASS(c.wifi_for_band('2.4').current_route())
+ wvtest.WVPASS(c.has_status_files([status.P.CONNECTED_TO_WLAN]))
# Now enable the AP. Since we have no wired connection, this should have no
# effect.
@@ -570,6 +581,7 @@
wvtest.WVFAIL(c.client_up('2.4'))
wvtest.WVFAIL(c.wifi_for_band('2.4').current_route())
wvtest.WVPASS(c.bridge.current_route())
+ wvtest.WVFAIL(c.has_status_files([status.P.HAVE_CONFIG]))
# Now move it back, and the AP should come back.
os.rename(other_filename, filename)
@@ -754,7 +766,6 @@
wvtest.WVPASS(c.wifi_for_band('5').current_route())
-
@wvtest.wvtest
@connection_manager_test(WIFI_SHOW_OUTPUT_ONE_RADIO_NO_5GHZ)
def connection_manager_test_one_radio_no_5ghz(c):
diff --git a/conman/main.py b/conman/main.py
index 3b81bfd..b6632a8 100755
--- a/conman/main.py
+++ b/conman/main.py
@@ -8,7 +8,7 @@
import connection_manager
-STATUS_DIR = '/tmp/conman'
+TMP_DIR = '/tmp/conman'
if __name__ == '__main__':
loglevel = logging.INFO
@@ -20,8 +20,8 @@
sys.stdout = os.fdopen(1, 'w', 1) # force line buffering even if redirected
sys.stderr = os.fdopen(2, 'w', 1) # force line buffering even if redirected
- if not os.path.exists(STATUS_DIR):
- os.makedirs(STATUS_DIR)
+ if not os.path.exists(TMP_DIR):
+ os.makedirs(TMP_DIR)
- c = connection_manager.ConnectionManager(status_dir=STATUS_DIR)
+ c = connection_manager.ConnectionManager(tmp_dir=TMP_DIR)
c.run()
diff --git a/conman/status.py b/conman/status.py
new file mode 100644
index 0000000..118bafc
--- /dev/null
+++ b/conman/status.py
@@ -0,0 +1,179 @@
+#!/usr/bin/python
+
+"""Tracks and exports conman status information for e.g. ledmonitor."""
+
+# This may seem over-engineered, but conman has enough loosely-coupled moving
+# parts that it is worth being able to reason formally and separately about the
+# state of the system. Otherwise it would be very easy for new conman code to
+# create subtle bugs in e.g. LED behavior.
+
+import inspect
+import logging
+import os
+
+
+class P(object):
+ """Enumerate propositions about conman status.
+
+ Using class attributes rather than just strings will help prevent typos.
+ """
+
+ TRYING_OPEN = 'TRYING_OPEN'
+ TRYING_WLAN = 'TRYING_WLAN'
+ CONNECTED_TO_OPEN = 'CONNECTED_TO_OPEN'
+ CONNECTED_TO_WLAN = 'CONNECTED_TO_WLAN'
+ HAVE_CONFIG = 'HAVE_CONFIG'
+ HAVE_WORKING_CONFIG = 'HAVE_WORKING_CONFIG'
+ CAN_REACH_ACS = 'CAN_REACH_ACS'
+ # Were we able to connect to the ACS last time we expected to be able to?
+ COULD_REACH_ACS = 'COULD_REACH_ACS'
+ CAN_REACH_INTERNET = 'CAN_REACH_INTERNET'
+ PROVISIONING_FAILED = 'PROVISIONING_FAILED'
+
+
+# Format: { proposition: (implications, counter-implications), ... }
+# If you want to add a new proposition to the Status class, just edit this dict.
+IMPLICATIONS = {
+ P.TRYING_OPEN: (
+ (),
+ (P.CONNECTED_TO_OPEN, P.TRYING_WLAN, P.CONNECTED_TO_WLAN)
+ ),
+ P.TRYING_WLAN: (
+ (),
+ (P.TRYING_OPEN, P.CONNECTED_TO_OPEN, P.CONNECTED_TO_WLAN)
+ ),
+ P.CONNECTED_TO_OPEN: (
+ (),
+ (P.CONNECTED_TO_WLAN, P.TRYING_OPEN, P.TRYING_WLAN)
+ ),
+ P.CONNECTED_TO_WLAN: (
+ (P.HAVE_WORKING_CONFIG,),
+ (P.CONNECTED_TO_OPEN, P.TRYING_OPEN, P.TRYING_WLAN)
+ ),
+ P.CAN_REACH_ACS: (
+ (P.COULD_REACH_ACS,),
+ (P.TRYING_OPEN, P.TRYING_WLAN)
+ ),
+ P.COULD_REACH_ACS: (
+ (),
+ (P.PROVISIONING_FAILED,),
+ ),
+ P.PROVISIONING_FAILED: (
+ (),
+ (P.COULD_REACH_ACS,),
+ ),
+ P.HAVE_WORKING_CONFIG: (
+ (),
+ (P.HAVE_CONFIG,),
+ ),
+}
+
+
+class Proposition(object):
+ """Represents a proposition.
+
+ May imply truth or falsity of other propositions.
+ """
+
+ def __init__(self, name, export_path):
+ self._name = name
+ self._export_path = export_path
+ self._value = False
+ self._implications = set()
+ self._counter_implications = set()
+ self._impliers = set()
+ self._counter_impliers = set()
+
+ def implies(self, implication):
+ self._counter_implications.discard(implication)
+ self._implications.add(implication)
+ # pylint: disable=protected-access
+ implication._implied_by(self)
+
+ def implies_not(self, counter_implication):
+ self._implications.discard(counter_implication)
+ self._counter_implications.add(counter_implication)
+ # pylint: disable=protected-access
+ counter_implication._counter_implied_by(self)
+
+ def _implied_by(self, implier):
+ self._counter_impliers.discard(implier)
+ self._impliers.add(implier)
+
+ def _counter_implied_by(self, counter_implier):
+ self._impliers.discard(counter_implier)
+ self._counter_impliers.add(counter_implier)
+
+ def set(self, value):
+ if value == self._value:
+ return
+
+ self._value = value
+ self.export()
+ logging.debug('%s is now %s', self._name, self._value)
+
+ if value:
+ for implication in self._implications:
+ implication.set(True)
+ for counter_implication in self._counter_implications:
+ counter_implication.set(False)
+ # Contrapositive: (A -> ~B) -> (B -> ~A)
+ for counter_implier in self._counter_impliers:
+ counter_implier.set(False)
+ # Contrapositive: (A -> B) -> (~B -> ~A)
+ else:
+ for implier in self._impliers:
+ implier.set(False)
+
+ def export(self):
+ filepath = os.path.join(self._export_path, self._name)
+ if self._value:
+ if not os.path.exists(filepath):
+ open(filepath, 'w')
+ else:
+ if os.path.exists(filepath):
+ os.unlink(filepath)
+
+
+class Status(object):
+ """Provides a convenient API for conman to describe system status."""
+
+ def __init__(self, export_path):
+ if not os.path.isdir(export_path):
+ os.makedirs(export_path)
+
+ self._export_path = export_path
+
+ self._propositions = {
+ p: Proposition(p, self._export_path)
+ for p in dict(inspect.getmembers(P)) if not p.startswith('_')
+ }
+
+ for p, (implications, counter_implications) in IMPLICATIONS.iteritems():
+ for implication in implications:
+ self._propositions[p].implies(self._propositions[implication])
+ for counter_implication in counter_implications:
+ self._propositions[p].implies_not(
+ self._propositions[counter_implication])
+
+ def _proposition(self, p):
+ return self._propositions[p]
+
+ def __setattr__(self, attr, value):
+ """Allow setting of propositions with attributes.
+
+ If _propositions contains an attribute 'FOO', then `Status().foo = True`
+ will set that Proposition to True. This means that this class doesn't have
+ to be changed when IMPLICATIONS is updated.
+
+ Args:
+ attr: The attribute name.
+ value: The attribute value.
+ """
+ if hasattr(self, '_propositions') and not hasattr(self, attr):
+ if attr.islower():
+ if attr.upper() in self._propositions:
+ self._propositions[attr.upper()].set(value)
+ return
+
+ super(Status, self).__setattr__(attr, value)
diff --git a/conman/status_test.py b/conman/status_test.py
new file mode 100755
index 0000000..03223f8
--- /dev/null
+++ b/conman/status_test.py
@@ -0,0 +1,122 @@
+#!/usr/bin/python
+
+"""Tests for connection_manager.py."""
+
+import logging
+import os
+import shutil
+import tempfile
+
+import status
+from wvtest import wvtest
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+def file_in(path, filename):
+ return os.path.exists(os.path.join(path, filename))
+
+
+@wvtest.wvtest
+def test_proposition():
+ export_path = tempfile.mkdtemp()
+
+ try:
+ rain = status.Proposition('rain', export_path)
+ wet = status.Proposition('wet', export_path)
+ dry = status.Proposition('dry', export_path)
+
+ rain.implies(wet)
+ wet.implies_not(dry)
+
+ # Test basics.
+ rain.set(True)
+ wvtest.WVPASS(file_in(export_path, 'rain'))
+ wvtest.WVPASS(file_in(export_path, 'wet'))
+ wvtest.WVFAIL(file_in(export_path, 'dry'))
+
+ # It may be wet even if it is not raining, but even in that case it is still
+ # not dry.
+ rain.set(False)
+ wvtest.WVFAIL(file_in(export_path, 'rain'))
+ wvtest.WVPASS(file_in(export_path, 'wet'))
+ wvtest.WVFAIL(file_in(export_path, 'dry'))
+
+ # Test contrapositives.
+ dry.set(True)
+ wvtest.WVFAIL(file_in(export_path, 'rain'))
+ wvtest.WVFAIL(file_in(export_path, 'wet'))
+ wvtest.WVPASS(file_in(export_path, 'dry'))
+
+ # Make sure cycles are okay.
+ tautology = status.Proposition('tautology', export_path)
+ tautology.implies(tautology)
+ tautology.set(True)
+ wvtest.WVPASS(file_in(export_path, 'tautology'))
+
+ zig = status.Proposition('zig', export_path)
+ zag = status.Proposition('zag', export_path)
+ zig.implies(zag)
+ zag.implies(zig)
+ zig.set(True)
+ wvtest.WVPASS(file_in(export_path, 'zig'))
+ wvtest.WVPASS(file_in(export_path, 'zag'))
+ zag.set(False)
+ wvtest.WVFAIL(file_in(export_path, 'zig'))
+ wvtest.WVFAIL(file_in(export_path, 'zag'))
+
+ finally:
+ shutil.rmtree(export_path)
+
+
+@wvtest.wvtest
+def test_status():
+ export_path = tempfile.mkdtemp()
+
+ try:
+ s = status.Status(export_path)
+
+ # Sanity check that there are no contradictions.
+ for p, (want_true, want_false) in status.IMPLICATIONS.iteritems():
+ setattr(s, p.lower(), True)
+ wvtest.WVPASS(file_in(export_path, p))
+ for wt in want_true:
+ wvtest.WVPASS(file_in(export_path, wt))
+ for wf in want_false:
+ wvtest.WVFAIL(file_in(export_path, wf))
+
+ s.trying_wlan = True
+ wvtest.WVPASS(file_in(export_path, status.P.TRYING_WLAN))
+ wvtest.WVFAIL(file_in(export_path, status.P.CONNECTED_TO_WLAN))
+
+ s.connected_to_open = True
+ wvtest.WVPASS(file_in(export_path, status.P.CONNECTED_TO_OPEN))
+ wvtest.WVFAIL(file_in(export_path, status.P.CONNECTED_TO_WLAN))
+
+ s.connected_to_wlan = True
+ wvtest.WVPASS(file_in(export_path, status.P.CONNECTED_TO_WLAN))
+ wvtest.WVPASS(file_in(export_path, status.P.HAVE_WORKING_CONFIG))
+ wvtest.WVFAIL(file_in(export_path, status.P.CONNECTED_TO_OPEN))
+ wvtest.WVFAIL(file_in(export_path, status.P.TRYING_WLAN))
+ wvtest.WVFAIL(file_in(export_path, status.P.TRYING_OPEN))
+
+ s.can_reach_acs = True
+ s.can_reach_internet = True
+ wvtest.WVPASS(file_in(export_path, status.P.CAN_REACH_ACS))
+ wvtest.WVPASS(file_in(export_path, status.P.COULD_REACH_ACS))
+ wvtest.WVPASS(file_in(export_path, status.P.CAN_REACH_INTERNET))
+ wvtest.WVFAIL(file_in(export_path, status.P.PROVISIONING_FAILED))
+
+ # These should not have changed
+ wvtest.WVPASS(file_in(export_path, status.P.CONNECTED_TO_WLAN))
+ wvtest.WVPASS(file_in(export_path, status.P.HAVE_WORKING_CONFIG))
+ wvtest.WVFAIL(file_in(export_path, status.P.CONNECTED_TO_OPEN))
+ wvtest.WVFAIL(file_in(export_path, status.P.TRYING_WLAN))
+ wvtest.WVFAIL(file_in(export_path, status.P.TRYING_OPEN))
+
+ finally:
+ shutil.rmtree(export_path)
+
+
+if __name__ == '__main__':
+ wvtest.wvtest_main()