blob: 3f5cdc9c38b854f3b2cd55274590b25a4f65dc24 [file] [log] [blame]
#!/usr/bin/python
"""Fake /bin/wifi implementation."""
import collections
import os
import random
import connection_check
import get_quantenna_interfaces
import hotplug
import ifplugd_action
import ifup
import qcsapi
import wpa_cli
MockInterface = collections.namedtuple('MockInterface',
['phynum', 'bands', 'driver'])
# A randomly selceted wifi scan result with the interesting stuff templated.
WIFI_SCAN_TPL = '''BSS {bssid}(on wcli0)
TSF: 1269828266773 usec (14d, 16:43:48)
freq: {freq}
beacon interval: 100 TUs
capability: ESS Privacy ShortSlotTime (0x0411)
signal: {rssi}
last seen: 2190 ms ago
Information elements from Probe Response frame:
{vendor_ies}
SSID: {ssid}
Supported rates: 1.0* 2.0* 5.5* 11.0* 18.0 24.0 36.0 54.0
DS Parameter set: channel 6
ERP: <no flags>
ERP D4.0: <no flags>
{security}
Extended supported rates: 6.0 9.0 12.0 48.0
'''
VENDOR_IE_TPL = ' Vendor specific: OUI {oui}, data: {data}'
WIFI_SHOW_TPL = '''Band: {band}
RegDomain: US
Interface: wlan{phynum} # {band} GHz ap
BSSID: f4:f5:e8:81:1b:a0
AutoChannel: True
AutoType: NONDFS
Station List for band: {band}
Client Interface: wcli{phynum} # {band} GHz client
Client BSSID: f4:f5:e8:81:1b:a1
'''
WIFI_SHOW_NO_RADIO_TPL = '''Band: {band}
RegDomain: 00
'''
WPA_PATH = None
REMOTE_ACCESS_POINTS = collections.defaultdict(dict)
INTERFACE_FOR_BAND = collections.defaultdict(lambda: None)
INTERFACE_EVENTS = collections.defaultdict(list)
LOCAL_ACCESS_POINTS = {}
CLIENT_ASSOCIATIONS = {}
class AccessPoint(object):
def __init__(self, **kwargs):
self._attrs = ('ssid', 'psk', 'band', 'bssid', 'security', 'rssi',
'vendor_ies', 'connection_check_result', 'hidden')
for attr in self._attrs:
setattr(self, attr, kwargs.get(attr, None))
self.stuck = False
def scan_str(self):
security_strs = {
'WEP': ' Privacy: WEP',
'WPA': ' WPA:',
'WPA2': ' RSN: * Version: 1',
}
return WIFI_SCAN_TPL.format(
ssid=self.ssid if not self.hidden else '',
freq='2437' if self.band == '2.4' else '5160',
bssid=self.bssid,
vendor_ies='\n'.join(VENDOR_IE_TPL.format(oui=oui, data=data)
for oui, data in (self.vendor_ies or [])),
rssi='%.2f dBm' % (self.rssi or 0),
security=security_strs.get(self.security, ''))
def __str__(self):
return 'AccessPoint<%s>' % ' '.join('%s=%s' % (attr, getattr(self, attr))
for attr in self._attrs)
def __repr__(self):
return str(self)
def call(*args, **kwargs):
wifi_commands = {
'scan': _scan,
'set': _set,
'stopap': _stopap,
'setclient': _setclient,
'stopclient': _stopclient,
'stop': _stop,
'show': _show,
}
if WPA_PATH is None and args[0].endswith('client'):
raise ValueError('Set subprocess.wifi.WPA_PATH before calling a fake '
'"wifi *client" command')
if args[0] in wifi_commands:
return wifi_commands[args[0]](args[1:], env=kwargs.get('env', {}))
return 99, 'unrecognized command %s' % args[0]
def _set(args, env=None):
band = _get_flag(args, ('-b', '--band'))
LOCAL_ACCESS_POINTS[band] = args, env
return 0, ''
def _stopap(args, env=None):
bands = _get_flag(args, ('-b', '--band')) or '2.4 5'
for band in bands.split():
if band in LOCAL_ACCESS_POINTS:
del LOCAL_ACCESS_POINTS[band]
return 0, ''
def _setclient(args, env=None):
env = env or {}
band = _get_flag(args, ('-b', '--band'))
bssid = _get_flag(args, ('--bssid',))
ssid = _get_flag(args, ('-S', '--ssid',))
force_restart = '--force-restart' in args
if band not in INTERFACE_FOR_BAND:
raise ValueError('No interface for band %r' % band)
interface = INTERFACE_FOR_BAND[band]
interface_name = 'wcli%s' % interface.phynum
if bssid:
ap = REMOTE_ACCESS_POINTS[band].get(bssid, None)
if not ap or ap.ssid != ssid:
_setclient_error_not_found(interface_name, ssid, interface.driver)
return 1, ('AP with band %r and BSSID %r and ssid %s not found: %s'
% (band, bssid, ssid, REMOTE_ACCESS_POINTS))
elif ssid:
candidates = [ap for ap in REMOTE_ACCESS_POINTS[band].itervalues()
if ap.ssid == ssid]
if not candidates:
_setclient_error_not_found(interface_name, ssid, interface.driver)
return 1, 'AP with SSID %r not found: %s' % (ssid, REMOTE_ACCESS_POINTS)
ap = random.choice(candidates)
else:
raise ValueError('Did not specify BSSID or SSID in %r' % args)
psk = env.get('WIFI_CLIENT_PSK', None)
if psk != ap.psk:
_setclient_error_auth(interface_name, ssid, interface.driver)
return 1, 'Wrong PSK, got %r, expected %r' % (psk, ap.psk)
_setclient_success(interface_name, ssid, bssid, psk, interface.driver, ap,
band, force_restart)
return 0, ''
def _setclient_error_not_found(interface_name, ssid, driver):
if driver == 'cfg80211':
wpa_cli.mock(interface_name, wpa_state='SCANNING')
elif driver == 'frenzy':
qcsapi.mock('get_mode', 'wifi0', value='Station')
qcsapi.mock('get_ssid', 'wifi0', value='')
qcsapi.mock('ssid_get_authentication_mode', 'wifi0', ssid, value='')
qcsapi.mock('get_status', 'wifi0', value='Error')
CLIENT_ASSOCIATIONS[interface_name] = None
def _setclient_error_auth(interface_name, ssid, driver):
if driver == 'cfg80211':
# This is what our version of wpa_supplicant does for auth failures.
INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-SSID-TEMP-DISABLED')
wpa_cli.mock(interface_name, wpa_state='SCANNING')
elif driver == 'frenzy':
qcsapi.mock('get_mode', 'wifi0', value='Station')
qcsapi.mock('get_ssid', 'wifi0', value='')
qcsapi.mock('ssid_get_authentication_mode', 'wifi0', ssid, value='')
qcsapi.mock('get_status', 'wifi0', value='Error')
CLIENT_ASSOCIATIONS[interface_name] = None
def _setclient_success(interface_name, ssid, bssid, psk, driver, ap, band,
force_restart):
same_ap = False
current_association = CLIENT_ASSOCIATIONS.get(interface_name, None)
if current_association:
same_ap = current_association == ap
else:
_disconnected_event(band)
if driver == 'cfg80211':
if same_ap and not force_restart:
return
# Make sure the association is unstuck after a firmware reset.
if current_association:
current_association.stuck = False
# Make sure the wpa_supplicant socket exists.
open(os.path.join(WPA_PATH, interface_name), 'w')
# Tell wpa_cli what to return.
key_mgmt = 'WPA2-PSK' if psk else 'NONE'
wpa_cli.mock(interface_name, wpa_state='COMPLETED', ssid=ssid, bssid=bssid,
key_mgmt=key_mgmt)
# Send the CONNECTED event.
INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-CONNECTED')
elif driver == 'frenzy':
qcsapi.mock('get_mode', 'wifi0', value='Station')
qcsapi.mock('get_ssid', 'wifi0', value=ssid)
qcsapi.mock('ssid_get_authentication_mode', 'wifi0', ssid,
value='PSKAuthentication' if psk else 'NONE')
qcsapi.mock('get_status', 'wifi0', value='')
CLIENT_ASSOCIATIONS[interface_name] = ap
connection_check.mock(interface_name, ap.connection_check_result or 'succeed')
# Call ifplugd.action for the interface coming up (wifi/quantenna.py does this
# manually).
ifplugd_action.call(interface_name, 'up')
def _disconnected_event(band):
interface = INTERFACE_FOR_BAND[band]
interface_name = 'wcli%s' % interface.phynum
if interface.driver == 'cfg80211':
INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-DISCONNECTED')
wpa_cli.mock(interface_name, wpa_state='SCANNING')
else:
qcsapi.mock('get_ssid', 'wifi0', value='')
qcsapi.mock('get_status', 'wifi0', value='Error')
CLIENT_ASSOCIATIONS[interface_name] = None
def _stopclient(args, env=None):
bands = _get_flag(args, ('-b', '--band')) or '2.4 5'
for band in bands.split():
interface = INTERFACE_FOR_BAND[band]
interface_name = 'wcli%s' % interface.phynum
if interface.driver == 'cfg80211':
# Send the DISCONNECTED and TERMINATING events.
INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-DISCONNECTED')
INTERFACE_EVENTS[interface_name].append('<2>CTRL-EVENT-TERMINATING')
# Clear the wpa_cli status response.
wpa_cli.mock(interface_name)
# Make sure the wpa_supplicant socket does not.
if os.path.exists(os.path.join(WPA_PATH, interface_name)):
os.unlink(os.path.join(WPA_PATH, interface_name))
elif interface.driver == 'frenzy':
qcsapi.mock('get_ssid', 'wifi0', value='')
qcsapi.mock('get_status', 'wifi0', value='Error')
CLIENT_ASSOCIATIONS[interface_name] = None
# Call ifplugd.action for the interface going down (wifi/quantenna.py does
# this manually).
ifplugd_action.call(interface_name, 'down')
return 0, ''
def _stop(*args, **kwargs):
_stopap(*args, **kwargs)
_stopclient(*args, **kwargs)
return 0, ''
def _kill_wpa_supplicant(band):
# From conman's perspective, there's no difference between someone running
# 'wifi stopclient' and the process dying for some other reason.
_stopclient(['--band', band])
def _scan(args, **unused_kwargs):
band_flag = _get_flag(args, ('-b', '--band'))
interface = INTERFACE_FOR_BAND[band_flag]
interface_name = 'wcli%s' % interface.phynum
if not ifup.INTERFACE_STATE.get(interface_name, False):
return 1, 'interface down'
return 0, '\n'.join(ap.scan_str()
for band in interface.bands
for ap in REMOTE_ACCESS_POINTS[band].itervalues())
def _show(unused_args, **unused_kwargs):
return 0, '\n\n'.join(WIFI_SHOW_TPL.format(band=band, **interface._asdict())
if interface
else WIFI_SHOW_NO_RADIO_TPL.format(band)
for band, interface in INTERFACE_FOR_BAND.iteritems())
def _get_flag(args, flags):
for flag in flags:
if flag in args:
return args[args.index(flag) + 1]
def mock(command, *args, **kwargs):
if command == 'remote_ap':
remote_ap = AccessPoint(**kwargs)
REMOTE_ACCESS_POINTS[kwargs['band']][kwargs['bssid']] = remote_ap
elif command == 'remote_ap_remove':
del REMOTE_ACCESS_POINTS[kwargs['band']][kwargs['bssid']]
elif command == 'interfaces':
INTERFACE_FOR_BAND.clear()
for interface in args:
for band in interface.bands:
INTERFACE_FOR_BAND[band] = interface
if interface.driver == 'frenzy':
get_quantenna_interfaces.mock(
fmt % interface.phynum
for fmt in ('wlan%s', 'wlan%s_portal', 'wcli%s'))
elif command == 'wpa_path':
global WPA_PATH
WPA_PATH = args[0]
elif command == 'disconnected_event':
_disconnected_event(args[0])
elif command == 'kill_wpa_supplicant':
_kill_wpa_supplicant(args[0])
elif command == 'mwifiex_reset':
interface_name, band = args[0:2]
print CLIENT_ASSOCIATIONS
association = CLIENT_ASSOCIATIONS.get(interface_name, None)
association.stuck = True
wpa_cli.mock(interface_name, wpa_state='SCANNING')
hotplug.call([], env={'SUBSYSTEM': 'net', 'ACTION': 'add',
'INTERFACE': args[0]})