# blob: 56d3f987634b166924e55b3b081e5dc9d595de9d [file] [log] [blame]
#!/usr/bin/python
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TR-069 has mandatory attribute names that don't comply with policy
# pylint: disable=invalid-name
"""Implementation of tr-98 WLAN objects for /bin/wifi.
The platform code is expected to set the BSSID (which is really a MAC address).
"""
__author__ = 'dgentry@google.com (Denton Gentry)'
import errno
import json
import os
import subprocess
import netdev
import tr.cwmpbool
import tr.cwmptypes
import tr.experiment
import tr.helpers
import tr.mainloop
import tr.session
import tr.tr098_v1_6
import tr.x_catawampus_tr098_1_0
# Shorthand aliases for the TR-098 schema classes that the implementations
# in this file subclass.
BASE98IGD = tr.tr098_v1_6.InternetGatewayDevice_v1_12.InternetGatewayDevice
BASE98WIFI = BASE98IGD.LANDevice.WLANConfiguration
CATA98 = tr.x_catawampus_tr098_1_0.X_CATAWAMPUS_ORG_InternetGatewayDevice_v1_0
CATA98WIFI = CATA98.InternetGatewayDevice.LANDevice.WLANConfiguration
# Parameter-name prefix used when yielding Isostream settings.
ISOSTREAM_KEY = 'Device.X_CATAWAMPUS-ORG.Isostream.'
# Unit tests can override these.
# Each value is a list.  The directory names are read as NAME[0]; BINWIFI is
# concatenated in front of the option list when the wifi command is built.
BINWIFI = ['wifi']
# Directory for command/access_point files of interfaces without a suffix.
CONMAN_DIR = ['/config/conman']
# Directory for command/access_point files of interfaces with a suffix.
CONMAN_TMP_DIR = ['/tmp/conman']
# Directory holding one JSON file per wifi station, named by its address.
STATIONS_DIR = ['/tmp/stations']
# Directory holding the per-interface waveguide files (*.disabled, *.autochan_*).
TMPWAVEGUIDE = ['/tmp/waveguide']
# Directory holding one JSON wifi-info file per interface name.
WIFIINFO_DIR = ['/tmp/wifi/wifiinfo']
class WifiConfig(object):
  """Plain attribute container for pending wifi configuration values.

  Instances start out empty; users attach settings as ordinary attributes.
  """
def _WifiConfigs(roothandle):
  """Yield every WLANConfiguration object together with its parameter prefix.

  Args:
    roothandle: an object whose .obj attribute is the root of the data model.

  Yields:
    (prefix, wlan) tuples, where prefix has the form
    'InternetGatewayDevice.LANDevice.{lan}.WLANConfiguration.{wlan}.'
    (including the trailing dot) and wlan is the matching object.
    Nothing is yielded if the model has no LANDeviceList.
  """
  try:
    landevices = roothandle.obj.InternetGatewayDevice.LANDeviceList
  except AttributeError as e:
    print('_WifiConfigs: %r' % e)
    return
  for lann, lan in landevices.iteritems():
    try:
      wlanconfigs = lan.WLANConfigurationList
    except AttributeError:
      # This LANDevice has no wifi.  Skip it; falling through would iterate
      # an unbound name or the previous LANDevice's configurations.
      continue
    for wlann, wlan in wlanconfigs.iteritems():
      yield ('InternetGatewayDevice.LANDevice.%s.WLANConfiguration.%s.'
             % (lann, wlann)), wlan
@tr.experiment.Experiment
def AutoDisableWifi(roothandle):
  """Yield AllowAutoDisable=True for each wifi config that has the attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'X_CATAWAMPUS_ORG_AllowAutoDisable'):
      continue
    yield prefix + 'X_CATAWAMPUS-ORG_AllowAutoDisable', True
@tr.experiment.Experiment
def AlwaysEnableSetupNetwork(roothandle):
  """Yield settings enabling a hidden setup SSID on LANDevice 2 wifi configs."""
  for prefix, _ in _WifiConfigs(roothandle):
    # The prefix is 'InternetGatewayDevice.LANDevice.{lan}. ...', so the
    # third dotted component is the LANDevice index.
    lan_index = int(prefix.split('.')[2])
    if lan_index != 2:
      continue
    yield prefix + 'Enable', True
    yield prefix + 'SSIDAdvertisementEnabled', False
    yield prefix + 'X_CATAWAMPUS-ORG_OverrideSSID', 'GFiberSetupAutomation'
@tr.experiment.Experiment
def AlwaysEnableFiberManagedWifi(roothandle):
  """Enable a filtered guest network for early access users.

  Yields (parameter name, value) pairs: an advertised 'Google Fiber Wi-Fi'
  SSID on each LANDevice 2 wifi config, followed by the captive portal
  settings.
  """
  for prefix, _ in _WifiConfigs(roothandle):
    lan_index = int(prefix.split('.')[2])
    if lan_index != 2:
      continue
    yield prefix + 'Enable', True
    yield prefix + 'SSIDAdvertisementEnabled', True
    yield prefix + 'X_CATAWAMPUS-ORG_OverrideSSID', 'Google Fiber Wi-Fi'

  portal = 'Device.CaptivePortal.'
  yield (portal + 'URL',
         'https://fiber-managed-wifi-tos.appspot.com/?id=%(mac)s')
  yield (portal + 'X_CATAWAMPUS-ORG_AuthorizerURL',
         'https://fiber-managed-wifi-tos.appspot.com/tos-accepted?id=%(mac)s')
  yield (portal + 'X_CATAWAMPUS-ORG_ExtraTLSHosts',
         '*.gfsvc.com fonts.googleapis.com fonts.gstatic.com')
  yield portal + 'Enable', True
@tr.experiment.Experiment
def WaveguideInitialChannel(roothandle):
  """Yield autochannel=True and, where supported, the INITIAL algorithm."""
  for prefix, config in _WifiConfigs(roothandle):
    yield prefix + 'AutoChannelEnable', True
    supports_algorithm = hasattr(config,
                                 'X_CATAWAMPUS_ORG_AutoChannelAlgorithm')
    if supports_algorithm:
      yield prefix + 'X_CATAWAMPUS-ORG_AutoChannelAlgorithm', 'INITIAL'
@tr.experiment.Experiment
def WaveguideDynamicChannel(roothandle):
  """Yield autochannel=True and, where supported, the DYNAMIC algorithm."""
  for prefix, config in _WifiConfigs(roothandle):
    yield prefix + 'AutoChannelEnable', True
    supports_algorithm = hasattr(config,
                                 'X_CATAWAMPUS_ORG_AutoChannelAlgorithm')
    if supports_algorithm:
      yield prefix + 'X_CATAWAMPUS-ORG_AutoChannelAlgorithm', 'DYNAMIC'
@tr.experiment.Experiment
def Wifi24GForce40M(roothandle):
  """Yield Width24G='40' for each wifi config that has a Width24G attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'X_CATAWAMPUS_ORG_Width24G'):
      continue
    yield prefix + 'X_CATAWAMPUS-ORG_Width24G', '40'
@tr.experiment.Experiment
def Wifi5GForce20M(roothandle):
  """Yield Width5G='20' for each wifi config that has a Width5G attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'X_CATAWAMPUS_ORG_Width5G'):
      continue
    yield prefix + 'X_CATAWAMPUS-ORG_Width5G', '20'
@tr.experiment.Experiment
def Wifi5GForce40M(roothandle):
  """Yield Width5G='40' for each wifi config that has a Width5G attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'X_CATAWAMPUS_ORG_Width5G'):
      continue
    yield prefix + 'X_CATAWAMPUS-ORG_Width5G', '40'
@tr.experiment.Experiment
def Wifi5GDisable(roothandle):
  """Yield Enable=False for each wifi config whose _fixed_band is '5'."""
  for prefix, config in _WifiConfigs(roothandle):
    fixed_band = getattr(config, '_fixed_band', '')
    if fixed_band == '5':
      yield prefix + 'Enable', False
@tr.experiment.Experiment
def WifiShortGuardInterval(roothandle):
  """Yield GuardInterval='400nsec' for each wifi config that supports it."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'GuardInterval'):
      continue
    yield prefix + 'GuardInterval', '400nsec'
@tr.experiment.Experiment
def WifiDisableWMM(roothandle):
  """Yield WMMEnable=False for each wifi config that has the attribute.

  Note: only applies if not using 802.11n or later.
  """
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'WMMEnable'):
      continue
    yield prefix + 'WMMEnable', False
@tr.experiment.Experiment
def WifiRekeyNever(roothandle):
  """Yield RekeyingInterval=0 for each wifi config that has the attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'RekeyingInterval'):
      continue
    yield prefix + 'RekeyingInterval', 0
@tr.experiment.Experiment
def WifiRekeyPTK(roothandle):
  """Yield RekeyingInterval=1 for each wifi config that has the attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'RekeyingInterval'):
      continue
    yield prefix + 'RekeyingInterval', 1
@tr.experiment.Experiment
def WifiRekeyOften(roothandle):
  """Yield RekeyingInterval=10 for each wifi config that has the attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'RekeyingInterval'):
      continue
    yield prefix + 'RekeyingInterval', 10
@tr.experiment.Experiment
def Wifi80211g(roothandle):
  """Yield 20 MHz widths (where available) and Standard='g' for every config."""
  width_attrs = (('X_CATAWAMPUS_ORG_Width24G', 'X_CATAWAMPUS-ORG_Width24G'),
                 ('X_CATAWAMPUS_ORG_Width5G', 'X_CATAWAMPUS-ORG_Width5G'))
  for prefix, config in _WifiConfigs(roothandle):
    for attr_name, param_name in width_attrs:
      if hasattr(config, attr_name):
        yield prefix + param_name, '20'
    yield prefix + 'Standard', 'g'
@tr.experiment.Experiment
def Wifi24G80211g(roothandle):
  """Yield a 20 MHz 2.4 GHz width, and Standard='g' on 2.4-only interfaces."""
  for prefix, config in _WifiConfigs(roothandle):
    # Note: this intentionally doesn't match dual-band interfaces, because
    # there's no clean way to switch this back in 5 GHz mode on those.
    has_width = hasattr(config, 'X_CATAWAMPUS_ORG_Width24G')
    if has_width:
      yield prefix + 'X_CATAWAMPUS-ORG_Width24G', '20'
    fixed_band = getattr(config, '_fixed_band', '')
    if fixed_band == '2.4':
      yield prefix + 'Standard', 'g'
@tr.experiment.Experiment
def ForceTvBoxWifi(roothandle):
  """Yield Enable/RadioEnabled=True for every wifi config on a GFHD model.

  Yields nothing (after printing a message) when the model name cannot be
  read or does not start with 'GFHD'.
  """
  try:
    model = roothandle.obj.Device.DeviceInfo.ModelName
  except AttributeError as e:
    print('ForceTvBoxWifi: %r' % e)
    return

  is_tv_box = model.startswith('GFHD')
  if not is_tv_box:
    print('ForceTvBoxWifi: %r is not a TV box. skipping.' % model)
    return

  for prefix, _ in _WifiConfigs(roothandle):
    yield prefix + 'Enable', True
    yield prefix + 'RadioEnabled', True
@tr.experiment.Experiment
def ForceNoTvBoxWifi(roothandle):
  """Yield Enable/RadioEnabled=False for every wifi config on a GFHD model.

  Yields nothing (after printing a message) when the model name cannot be
  read or does not start with 'GFHD'.
  """
  try:
    model = roothandle.obj.Device.DeviceInfo.ModelName
  except AttributeError as e:
    print('ForceNoTvBoxWifi: %r' % e)
    return

  is_tv_box = model.startswith('GFHD')
  if not is_tv_box:
    print('ForceNoTvBoxWifi: %r is not a TV box. skipping.' % model)
    return

  for prefix, _ in _WifiConfigs(roothandle):
    yield prefix + 'Enable', False
    yield prefix + 'RadioEnabled', False
@tr.experiment.Experiment
def ForceTvBoxWifiClient(roothandle):
  """Force join the wireless network provided by the InternetGatewayDevice.

  Only the first wifi config that has a ClientEnable attribute is used; if
  none has one, a message is printed and nothing is yielded.
  """
  try:
    model = roothandle.obj.Device.DeviceInfo.ModelName
  except AttributeError as e:
    print('ForceTvBoxWifiClient: %r' % e)
    return

  is_tv_box = model.startswith('GFHD')
  if not is_tv_box:
    print('ForceTvBoxWifiClient: %r is not a TV box. Skipping.' % model)
    return

  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'ClientEnable'):
      continue
    yield prefix + 'RadioEnabled', True
    yield prefix + 'Enable', False
    yield prefix + 'ClientEnable', True
    yield ISOSTREAM_KEY + 'ClientInterface', 'wcli0'
    yield ISOSTREAM_KEY + 'ClientDisableIfPortActive', 0
    return  # ensure we only try to associate with one WLAN

  print('ForceTvBoxWifiClient: %r does not support .ClientEnable' % model)
@tr.experiment.Experiment
def Wifi24GLegacySuffix(roothandle):
  """Yield Suffix24G=' (Legacy)' for each wifi config that has the attribute."""
  for prefix, config in _WifiConfigs(roothandle):
    if not hasattr(config, 'X_CATAWAMPUS_ORG_Suffix24G'):
      continue
    yield prefix + 'X_CATAWAMPUS-ORG_Suffix24G', ' (Legacy)'
def _FreqToChan(mhz):
  """Convert a wifi center frequency in MHz to a channel number.

  Uses explicit floor division so the result is an integer channel number
  whether or not true division is in effect.

  Args:
    mhz: frequency in MHz, e.g. 2437 or 5180.

  Returns:
    The channel number: 1 + (mhz - 2412) // 5 for 24xx MHz frequencies,
    36 + (mhz - 5180) // 5 for 5xxx MHz frequencies.  Any other value prints
    a message and returns 0.  Note the 2.4 GHz formula is linear, so it does
    not special-case 2484 MHz.
  """
  if mhz // 100 == 24:
    return 1 + (mhz - 2412) // 5
  elif mhz // 1000 == 5:
    return 36 + (mhz - 5180) // 5
  else:
    print('invalid wifi frequency: %r' % (mhz,))
    return 0
class _SoftInt(tr.cwmptypes.Int):
  """An Int type that coerces values int() cannot parse to zero."""

  def validate(self, obj, value):
    """Validate int(value), substituting 0 when the conversion fails."""
    try:
      coerced = int(value)
    except (TypeError, ValueError):
      coerced = 0
    return super(_SoftInt, self).validate(obj, coerced)
class _SoftBool(tr.cwmptypes.Bool):
  """A Bool type for which every value except None (even empty) is true."""

  def validate(self, obj, value):
    """Return False only for None; anything else, including '', is True."""
    # A file which exists but is empty is true.
    if value is None:
      return False
    return True
class WlanConfiguration(CATA98WIFI):
  """An implementation of tr98 WLANConfiguration for /bin/wifi.

  Current state is read from a per-interface JSON file under WIFIINFO_DIR.
  Parameter changes call Triggered(), which (once __init__ has finished)
  exports a /bin/wifi command file and an access_point marker file; see
  ExportWifiPolicy.
  """

  encryption_modes = ['X_CATAWAMPUS-ORG_None', 'None', 'WEPEncryption',
                      'TKIPEncryption', 'WEPandTKIPEncryption', 'AESEncryption',
                      'WEPandAESEncryption', 'TKIPandAESEncryption',
                      'WEPandTKIPandAESEncryption']

  BasicAuthenticationMode = tr.cwmptypes.TriggerEnum(
      ['None', 'SharedAuthentication'], init='SharedAuthentication')
  BasicEncryptionModes = tr.cwmptypes.TriggerEnum(
      ['None', 'WEPEncryption'], init='None')
  BeaconAdvertisementEnabled = tr.cwmptypes.ReadOnlyBool(True)
  BeaconType = tr.cwmptypes.TriggerEnum(
      ['None', 'Basic', 'WPA', '11i', 'BasicandWPA', 'Basicand11i', 'WPAand11i',
       'BasicandWPAand11i'], init='11i')
  ClientEnable = tr.cwmptypes.TriggerBool(False)
  DeviceOperationMode = tr.cwmptypes.ReadOnlyString('InfrastructureAccessPoint')
  Enable = tr.cwmptypes.TriggerBool(False)
  GuardInterval = tr.cwmptypes.TriggerEnum(
      ['400nsec', '800nsec', 'Auto'], init='Auto')
  IEEE11iAuthenticationMode = tr.cwmptypes.TriggerEnum(
      ['PSKAuthentication'], init='PSKAuthentication')
  IEEE11iEncryptionModes = tr.cwmptypes.TriggerEnum(
      encryption_modes, init='AESEncryption')
  LocationDescription = tr.cwmptypes.String()
  Name = tr.cwmptypes.ReadOnlyString()
  RadioEnabled = tr.cwmptypes.TriggerBool(False)
  RekeyingInterval = tr.cwmptypes.TriggerUnsigned(3600)
  SSIDAdvertisementEnabled = tr.cwmptypes.TriggerBool(True)
  SupportedStandards = tr.cwmptypes.ReadOnlyString()
  OperatingStandards = tr.cwmptypes.TriggerString()
  SupportedFrequencyBands = tr.cwmptypes.ReadOnlyString('2.4GHz,5GHz')
  TransmitPowerSupported = tr.cwmptypes.ReadOnlyString('0,20,40,60,80,100')
  UAPSDSupported = tr.cwmptypes.ReadOnlyBool(False)
  WEPEncryptionLevel = tr.cwmptypes.ReadOnlyString('Disabled,40-bit,104-bit')
  WEPKeyIndex = tr.cwmptypes.Unsigned(1)
  WMMSupported = tr.cwmptypes.ReadOnlyBool(True)
  WMMEnable = tr.cwmptypes.TriggerBool(True)
  WPAAuthenticationMode = tr.cwmptypes.TriggerEnum(
      ['PSKAuthentication'], init='PSKAuthentication')
  WPAEncryptionModes = tr.cwmptypes.TriggerEnum(
      encryption_modes, init='AESEncryption')
  SignalsStr = tr.cwmptypes.ReadOnlyString()

  def __init__(self, ifname, if_suffix, bridge, band=None, standard='n',
               width_2_4g=0, width_5g=0, autochan=None):
    """Set up one WLANConfiguration object.

    Args:
      ifname: base interface name; if_suffix is appended to form Name.
      if_suffix: interface suffix string (may be empty).
      bridge: bridge name passed to /bin/wifi via --bridge.
      band: '2.4' or '5' to pin the band.  A falsy value leaves the band
        changeable, starting at '5'.
      standard: 'n' or 'ac', selecting the SupportedStandards list.
      width_2_4g: initial 2.4 GHz channel width; falsy means unset.
      width_5g: initial 5 GHz channel width; falsy means unset.
      autochan: if truthy, used as the autochannel type in StartTransaction
        instead of the value reported for the interface.

    Raises:
      ValueError: if standard is neither 'n' nor 'ac'.
      OSError: if a required directory cannot be created.
    """
    super(WlanConfiguration, self).__init__()
    # Triggered() does nothing until this becomes True.
    self._initialized = False
    self._if_suffix = if_suffix
    self._ifname = ifname + if_suffix
    type(self).Name.Set(self, self._ifname)
    self._band = band if band else '5'
    self._bridge = bridge
    self._fixed_band = band

    if standard == 'ac':
      type(self).SupportedStandards.Set(self, 'a,b,g,n,ac')
    elif standard == 'n':
      type(self).SupportedStandards.Set(self, 'a,b,g,n')
    else:
      raise ValueError('unsupported wifi standards level: %r' % (standard,))
    self.Standard = standard

    self.X_CATAWAMPUS_ORG_Width24G = str(width_2_4g) if width_2_4g else ''
    self.X_CATAWAMPUS_ORG_Width5G = str(width_5g) if width_5g else ''
    self._autochan = autochan
    self.new_config = None
    self._Stats = WlanConfigurationStats(ifname=self._ifname)
    self._initialized = True
    self._sig_dict = {}

    # Need to be implemented, but not done yet.
    self.Unexport(['BasicDataTransmitRates', 'AutoRateFallBackEnabled',
                   'OperationalDataTransmitRates',
                   'PossibleChannels', 'TransmitPower'])

    # Unimplemented, but not yet evaluated
    self.Unexport(['Alias', 'ChannelsInUse', 'MaxBitRate',
                   'PossibleDataTransmitRates',
                   'TotalIntegrityFailures', 'TotalPSKFailures'])

    self.PreSharedKeyList = {}
    for i in range(1, 11):
      self.PreSharedKeyList[str(i)] = PreSharedKey(parent=self)

    self.WEPKeyList = {}
    for i in range(1, 5):
      self.WEPKeyList[str(i)] = WEPKey(parent=self)

    # No RADIUS support, could be added later.
    self.Unexport(['AuthenticationServiceMode'])

    # Local settings, currently unimplemented. Will require more
    # coordination with the underlying platform support.
    self.Unexport(['InsecureOOBAccessEnabled'])

    # MAC Access controls, currently unimplemented but could be supported.
    self.Unexport(['MACAddressControlEnabled'])

    # Wifi Protected Setup, currently unimplemented and not recommended.
    self.Unexport(objects=['WPS'])

    # Wifi MultiMedia, currently unimplemented but could be supported.
    # "wl wme_*" commands
    self.Unexport(lists=['APWMMParameter', 'STAWMMParameter'])
    self.Unexport(['UAPSDEnable'])

    # WDS, currently unimplemented but could be supported at some point.
    self.Unexport(['PeerBSSID', 'DistanceFromRoot'])

    # Waveguide interface
    try:
      os.makedirs(TMPWAVEGUIDE[0], 0o755)
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise

    # Point each file-backed attribute at this interface's own file.
    # pylint:disable=protected-access
    type(self).X_CATAWAMPUS_ORG_AutoDisableRecommended.attr.attr.SetFileName(
        self, TMPWAVEGUIDE[0] + '/%s.disabled' % self.Name)
    type(self)._RecommendedChannel_2G.attr.attr.SetFileName(
        self, TMPWAVEGUIDE[0] + '/%s.autochan_2g' % self.Name)
    type(self)._RecommendedChannel_5G.attr.attr.SetFileName(
        self, TMPWAVEGUIDE[0] + '/%s.autochan_5g' % self.Name)
    type(self)._RecommendedChannel_Free.attr.attr.SetFileName(
        self, TMPWAVEGUIDE[0] + '/%s.autochan_free' % self.Name)
    type(self)._InitiallyRecommendedChannel_2G.attr.attr.SetFileName(
        self, TMPWAVEGUIDE[0] + '/%s.autochan_2g.init' % self.Name)
    type(self)._InitiallyRecommendedChannel_5G.attr.attr.SetFileName(
        self, TMPWAVEGUIDE[0] + '/%s.autochan_5g.init' % self.Name)
    type(self)._InitiallyRecommendedChannel_Free.attr.attr.SetFileName(
        self, TMPWAVEGUIDE[0] + '/%s.autochan_free.init' % self.Name)

    # Create the stations directory without a check-then-create race: an
    # already existing directory is fine, anything else is an error.
    try:
      os.mkdir(STATIONS_DIR[0])
    except OSError as e:
      if e.errno != errno.EEXIST or not os.path.isdir(STATIONS_DIR[0]):
        raise

  def release(self):
    """Remove this interface's command and access_point files."""
    tr.helpers.Unlink(self.WifiCommandFileName())
    tr.helpers.Unlink(self.APEnabledFileName())

  @tr.session.cache
  def _BinwifiShow(self):
    """Load this interface's wifi info file.

    Returns:
      The parsed JSON content with 'Band' set to the current band, or {} if
      the file cannot be read or does not contain a JSON object.
    """
    wifiinfo_filename = os.path.join(WIFIINFO_DIR[0], self._ifname)
    try:
      with open(wifiinfo_filename) as f:
        wifi_data = json.load(f)
      wifi_data['Band'] = self._band
    except (IOError, OSError, ValueError, TypeError,
            subprocess.CalledProcessError) as e:
      # ValueError: malformed JSON.  TypeError: JSON that is not an object.
      print('Unable to load wifi info from %s: %s' % (wifiinfo_filename, e))
      return {}
    return wifi_data

  def StartTransaction(self):
    """Create the pending configuration object if there is none yet.

    The new WifiConfig is seeded with the current autochannel type,
    AutoChannelEnable, Channel and SSID.  Returns nothing.
    """
    if self.new_config is None:
      self.new_config = WifiConfig()
      atype = self.X_CATAWAMPUS_ORG_AutoChanType
      self.new_config.AutoChannelType = self._autochan or atype
      self.new_config.AutoChannelEnable = self.AutoChannelEnable
      self.new_config.Channel = self.Channel
      self.new_config.SSID = self.SSID

  @property
  def signals(self):
    """Dictionary of signal strengths keyed by MAC address."""
    return self._sig_dict

  @signals.setter
  def signals(self, new_dict):
    self._sig_dict = new_dict
    type(self).SignalsStr.Set(self, str(self._sig_dict))

  @property
  def AssociatedDeviceList(self):
    """Build the table of stations associated with this interface.

    Reads every regular file in STATIONS_DIR whose name does not end in
    '.new', keeps the stations that belong to this interface and are not
    marked unauthorized or unauthenticated, and refreshes self.signals.

    Returns:
      A dict mapping '1', '2', ... to AssociatedDevice objects.  It is empty
      when the stations directory does not exist.
    """
    directory = STATIONS_DIR[0]
    if not os.path.isdir(directory):
      # Return an empty table, not None, so len() and iteration keep working.
      return {}

    stations = []
    for dirfile in os.listdir(directory):
      path = os.path.join(directory, dirfile)
      if not os.path.isfile(path) or dirfile.endswith('.new'):
        continue
      try:
        with open(path) as f:
          station = json.load(f)
      except ValueError:
        station = {}
      except (IOError, OSError):
        # The file disappeared between listdir() and open(); skip it.
        continue
      station['PhysAddr'] = dirfile
      stations.append(station)

    # A missing 'authorized'/'authenticated' key counts as acceptable.
    stations = [s for s in stations
                if s.get('authorized', 'yes') == 'yes'
                and s.get('authenticated', 'yes') == 'yes'
                and s.get('ifname') == self._ifname]

    self.CollectSignalStrengths(stations)

    associated_device_list = {}
    for idx, device in enumerate(sorted(stations), start=1):
      filename = os.path.join(directory, device['PhysAddr'])
      associated_device_list[str(idx)] = AssociatedDevice(device, filename)
    return associated_device_list

  def CollectSignalStrengths(self, stations):
    """Populate self.signals from a list of station dictionaries.

    self.signals is a dictionary of signal strengths by MAC address,
    used in the TechUI graphs of RSSI.

    Args:
      stations: a list of dicts, each loaded from one station's JSON file.
    """
    new_sig_dict = dict()
    for station in stations:
      mac_addr = station.get('PhysAddr', '00:00:00:00:00:00')
      new_sig_dict[mac_addr] = station.get('signal_avg', 0)
    self.signals = new_sig_dict

  def GetAutoChannelEnable(self):
    """Return whether autochannel is enabled."""
    acalg = self.X_CATAWAMPUS_ORG_AutoChannelAlgorithm
    if acalg == 'LEGACY':
      d = self._BinwifiShow()
      return d.get('AutoChannel', 'TRUE') == 'TRUE'
    else:
      # In non-legacy modes, we're passing an explicit channel to
      # /bin/wifi, so in its mind, we are never using autochannel.
      # Report back the real autochannel setting.
      return self.new_config.AutoChannelEnable

  def SetAutoChannelEnable(self, value):
    """Store the parsed boolean in the pending config and export it."""
    b = tr.cwmpbool.parse(value)
    self.new_config.AutoChannelEnable = b
    self.Triggered()

  AutoChannelEnable = property(GetAutoChannelEnable, SetAutoChannelEnable, None,
                               'WLANConfiguration.AutoChannelEnable')

  @property
  def BSSID(self):
    d = self._BinwifiShow()
    return d.get('BSSID', '')

  def GetChannel(self):
    """Return the reported channel as an int, or 0 if it is not numeric."""
    d = self._BinwifiShow()
    try:
      return int(d.get('Channel', 0))
    except ValueError:
      print('Failed to convert wifi channel to integer: %s' % (d,))
      return 0

  def ValidateChannel(self, value):
    """Check for a valid Wifi channel number."""
    # TODO(dgentry) can /bin/wifi do this?  It knows the regulatory domain.
    if value in range(1, 14):
      return True  # 2.4 GHz. US allows 1-11, Japan allows 1-13.
    if value in range(36, 144, 4):
      return True  # 5 GHz lower bands
    if value in range(149, 169, 4):
      return True  # 5 GHz upper bands
    return False

  def SetChannel(self, value):
    """Set an explicit channel, which also turns autochannel off.

    Raises:
      ValueError: if value is not an integer or not a valid channel.
    """
    ivalue = int(value)
    if not self.ValidateChannel(ivalue):
      raise ValueError('Invalid Channel: %d' % ivalue)
    self.new_config.Channel = ivalue
    self.AutoChannelEnable = 'False'
    self.Triggered()

  Channel = property(GetChannel, SetChannel, None, 'WLANConfiguration.Channel')

  def GetKeyPassphrase(self):
    psk = self.PreSharedKeyList['1']
    return psk.KeyPassphrase

  def SetKeyPassphrase(self, value):
    psk = self.PreSharedKeyList['1']
    psk.KeyPassphrase = value
    # TODO(dgentry) need to set WEPKeys, but this is fraught with peril.
    # If KeyPassphrase is not exactly 5 or 13 bytes it must be padded.
    # Apple uses different padding than Windows (and others).
    # http://support.apple.com/kb/HT1344

  KeyPassphrase = property(GetKeyPassphrase, SetKeyPassphrase, None,
                           'WLANConfiguration.KeyPassphrase')

  def _InternalBandToExternal(self, internal):
    return '2.4GHz' if internal == '2.4' else '5GHz'

  def _ExternalBandToInternal(self, external):
    return '2.4' if external == '2.4GHz' else '5'

  def GetOperatingFrequencyBand(self):
    return self._InternalBandToExternal(self._band)

  def SetOperatingFrequencyBand(self, value):
    """Switch bands, unless the band was pinned at construction time.

    Raises:
      ValueError: if value is not '2.4GHz', '5GHz' or ''.
      AttributeError: if the band is fixed and value names the other band.
    """
    if str(value) not in ['2.4GHz', '5GHz', '']:
      raise ValueError('Invalid band')
    internal = self._ExternalBandToInternal(str(value))
    if self._fixed_band and self._fixed_band != internal:
      raise AttributeError("can't set read-only attribute")
    else:
      self._band = internal
      self.Triggered()

  OperatingFrequencyBand = property(GetOperatingFrequencyBand,
                                    SetOperatingFrequencyBand, None,
                                    'WLANConfiguration.OperatingFrequencyBand')

  @property
  def Standard(self):
    """The highest standard listed in OperatingStandards: ac, n, g or b."""
    l = self.OperatingStandards.split(',')
    if 'ac' in l:
      return 'ac'
    elif 'n' in l:
      return 'n'
    elif 'g' in l:
      return 'g'
    else:
      return 'b'

  @Standard.setter
  def Standard(self, v):
    if v == 'ac':
      self.OperatingStandards = 'a,b,g,n,ac'
    elif v == 'n':
      self.OperatingStandards = 'a,b,g,n'
    elif v == 'g':
      self.OperatingStandards = 'a,b,g'
    elif v == 'a' or v == 'b':
      self.OperatingStandards = 'a,b'
    else:
      raise ValueError('unknown wifi standard %r' % (v,))

  @property
  def RegulatoryDomain(self):
    return self._BinwifiShow().get('RegDomain', '').strip()

  def GetSSID(self):
    return self._BinwifiShow().get('SSID', '').strip()

  def ValidateSSID(self, value):
    """Return value unchanged; raise ValueError if it exceeds 32 characters."""
    if len(value) > 32:
      raise ValueError('SSID must be <= 32 characters')
    return value

  def SetSSID(self, value):
    """Store a new SSID in the pending config and export it.

    Raises:
      ValueError: if the SSID is empty or longer than 32 characters.
    """
    value = str(value)
    # ValidateSSID returns the value itself, so an empty SSID is rejected here.
    if not self.ValidateSSID(value):
      raise ValueError('Invalid SSID: %s' % value)
    self.new_config.SSID = value
    self.Triggered()

  SSID = property(GetSSID, SetSSID, None, 'WLANConfiguration.SSID')

  @property
  def Stats(self):
    return self._Stats

  X_CATAWAMPUS_ORG_Width24G = tr.cwmptypes.TriggerEnum(
      ['', '20', '40'], '')
  X_CATAWAMPUS_ORG_Width5G = tr.cwmptypes.TriggerEnum(
      ['', '20', '40', '80'], '')
  X_CATAWAMPUS_ORG_AllowAutoDisable = tr.cwmptypes.TriggerBool(False)
  X_CATAWAMPUS_ORG_AutoDisableRecommended = tr.cwmptypes.Trigger(
      tr.cwmptypes.ReadOnly(
          tr.cwmptypes.FileBacked(
              TMPWAVEGUIDE[0] + '/disabled',  # filename varies
              _SoftBool())))
  X_CATAWAMPUS_ORG_AutoChannelAlgorithm = tr.cwmptypes.TriggerEnum(
      ['LEGACY', 'INITIAL', 'DYNAMIC'], 'LEGACY')

  _RecommendedChannel_2G = tr.cwmptypes.Trigger(
      tr.cwmptypes.ReadOnly(
          tr.cwmptypes.FileBacked(
              TMPWAVEGUIDE[0] + '/autochan_2g',  # filename varies
              _SoftInt())))
  _RecommendedChannel_5G = tr.cwmptypes.Trigger(
      tr.cwmptypes.ReadOnly(
          tr.cwmptypes.FileBacked(
              TMPWAVEGUIDE[0] + '/autochan_5g',  # filename varies
              _SoftInt())))
  _RecommendedChannel_Free = tr.cwmptypes.Trigger(
      tr.cwmptypes.ReadOnly(
          tr.cwmptypes.FileBacked(
              TMPWAVEGUIDE[0] + '/autochan_free',  # filename varies
              _SoftInt())))

  @property
  def X_CATAWAMPUS_ORG_RecommendedChannel(self):
    # TODO(apenwarr): Add some way to express "choose best band automatically".
    #  Waveguide (coordinating with other APs on the LAN) can probably make
    #  better decisions than ACS.  Then we'd use _RecommendedChannel_Free.
    #  For now, we'll just take the ACS's recommendation.
    if self.OperatingFrequencyBand == '2.4GHz':
      return self._RecommendedChannel_2G
    elif self.OperatingFrequencyBand == '5GHz':
      return self._RecommendedChannel_5G

  _InitiallyRecommendedChannel_2G = tr.cwmptypes.Trigger(
      tr.cwmptypes.ReadOnly(
          tr.cwmptypes.FileBacked(
              TMPWAVEGUIDE[0] + '/autochan_2g.init',  # filename varies
              _SoftInt())))
  _InitiallyRecommendedChannel_5G = tr.cwmptypes.Trigger(
      tr.cwmptypes.ReadOnly(
          tr.cwmptypes.FileBacked(
              TMPWAVEGUIDE[0] + '/autochan_5g.init',  # filename varies
              _SoftInt())))
  _InitiallyRecommendedChannel_Free = tr.cwmptypes.Trigger(
      tr.cwmptypes.ReadOnly(
          tr.cwmptypes.FileBacked(
              TMPWAVEGUIDE[0] + '/autochan_free.init',  # filename varies
              _SoftInt())))

  @property
  def X_CATAWAMPUS_ORG_InitiallyRecommendedChannel(self):
    # TODO(apenwarr): Add some way to express "choose best band automatically".
    #  See note in RecommendedChannel above.
    if self.OperatingFrequencyBand == '2.4GHz':
      return self._InitiallyRecommendedChannel_2G
    elif self.OperatingFrequencyBand == '5GHz':
      return self._InitiallyRecommendedChannel_5G

  def _ReallyWantWifi(self):
    """True if enabled, radio on, and not auto-disabled with permission."""
    return (self.Enable and
            self.RadioEnabled and
            not (self.X_CATAWAMPUS_ORG_AllowAutoDisable and
                 self.X_CATAWAMPUS_ORG_AutoDisableRecommended))

  @property
  def Status(self):
    """WLANConfiguration.Status."""
    if not self._ReallyWantWifi():
      return 'Down'
    if not self.SSID:
      return 'Error'
    # TODO(dgentry): get /bin/wifi to return a status.
    return 'Up'

  @property
  def TotalAssociations(self):
    return len(self.AssociatedDeviceList)

  @property
  def TotalBytesReceived(self):
    return self.Stats.BytesReceived

  @property
  def TotalBytesSent(self):
    return self.Stats.BytesSent

  @property
  def TotalPacketsReceived(self):
    return self.Stats.PacketsReceived

  @property
  def TotalPacketsSent(self):
    return self.Stats.PacketsSent

  @WEPKeyIndex.validator
  def ValidateWEPKeyIndex(self, value):
    """Return value as an int; raise ValueError unless it is in 1-4."""
    ivalue = int(value)
    if ivalue < 1 or ivalue > 4:
      raise ValueError('WEPKeyIndex must be in the range 1-4')
    return ivalue

  def GetAutoChanType(self):
    return self._BinwifiShow().get('AutoType', '').strip()

  def SetAutoChanType(self, value):
    self.new_config.AutoChannelType = str(value)

  X_CATAWAMPUS_ORG_AutoChanType = property(
      GetAutoChanType, SetAutoChanType, None,
      'WLANConfiguration.X_CATAWAMPUS-ORG_AutoChanType')

  X_CATAWAMPUS_ORG_Suffix24G = tr.cwmptypes.TriggerString('')
  X_CATAWAMPUS_ORG_OverrideSSID = tr.cwmptypes.TriggerString('')

  @X_CATAWAMPUS_ORG_Suffix24G.validator
  def ValidateSuffix24G(self, value):
    """Return value unchanged; raise ValueError if it exceeds 32 characters."""
    if len(value) > 32:
      raise ValueError('Suffix24G must be <= 32 characters')
    return value

  def Triggered(self):
    """Called when a parameter is modified."""
    if self._initialized:
      self.ExportWifiPolicy()

  def _IsConfigComplete(self):
    """Returns true if configuration is ready to be applied."""
    if not self.new_config.SSID:
      return False
    return True

  def _GetEncryptionMode(self):
    """Return /bin/wifi -e argument.

    Returns:
      The -e argument to pass to /bin/wifi.
    """
    if 'Basic' in self.BeaconType:
      return 'WEP' if 'WEP' in self.BasicEncryptionModes else 'NONE'

    if 'WPAand11i' in self.BeaconType:
      auth = 'WPA12'
      encryption = self.WPAEncryptionModes
    elif '11i' in self.BeaconType:
      auth = 'WPA2'
      encryption = self.IEEE11iEncryptionModes
    elif 'WPA' in self.BeaconType:
      auth = 'WPA'
      encryption = self.WPAEncryptionModes
    else:
      print('Invalid BeaconType %s, using WPA2' % self.BeaconType)
      auth = 'WPA2'
      encryption = self.IEEE11iEncryptionModes

    if 'None' in encryption:
      return 'NONE'

    if 'AES' in encryption:
      crypto = '_PSK_AES'
    elif 'TKIP' in encryption:
      crypto = '_PSK_TKIP'
    else:
      print('Invalid EncryptionMode %s, using AES' % encryption)
      crypto = '_PSK_AES'
    return auth + crypto

  def _MakeBinWifiCommand(self):
    """Return command to run /bin/wifi.

    Returns:
      The command as newline-separated tokens, optionally preceded by an
      'env WIFI_PSK=...' prefix, or '' if there is no pending configuration
      or it has no SSID.

    Raises:
      ValueError: if any token or the key contains NUL, newline or carriage
        return, which would corrupt the newline-separated format.
    """
    if self.new_config is None:
      print('_MakeBinWifiCommand: WiFi configuration not yet received.')
      return ''
    if not self.new_config.SSID:
      print('_MakeBinWifiCommand: No SSID; WiFi not configured.')
      return ''

    cmd = ['set', '-P', '-b', self._band, '-e', self._GetEncryptionMode()]
    if self._if_suffix:
      cmd += ['--interface-suffix=%s' % self._if_suffix]
    # Parenthesized on purpose: '%' binds tighter than 'or', so without the
    # parentheses a bridge of None would be rendered as '--bridge=None'.
    cmd += ['--bridge=%s' % (self._bridge or '')]

    if not self.SSIDAdvertisementEnabled:
      cmd += ['-H']

    if self.new_config.AutoChannelEnable:
      acalg = self.X_CATAWAMPUS_ORG_AutoChannelAlgorithm
      if acalg == 'INITIAL':
        ch = _FreqToChan(self.X_CATAWAMPUS_ORG_InitiallyRecommendedChannel)
      elif acalg == 'DYNAMIC':
        ch = _FreqToChan(self.X_CATAWAMPUS_ORG_RecommendedChannel)
      else:  # LEGACY
        ch = 'auto'
    else:
      ch = self.new_config.Channel
    if ch:
      cmd += ['-c', str(ch)]

    ssid = self.new_config.SSID
    if ssid:
      if self.X_CATAWAMPUS_ORG_OverrideSSID:
        ssid = self.X_CATAWAMPUS_ORG_OverrideSSID
      if self.OperatingFrequencyBand == '2.4GHz':
        # Truncate so that SSID plus suffix still fits in 32 characters.
        ssid = (ssid[:32 - len(self.X_CATAWAMPUS_ORG_Suffix24G)] +
                self.X_CATAWAMPUS_ORG_Suffix24G)
      cmd += ['-s', ssid]

    autotype = self.new_config.AutoChannelType
    if autotype:
      cmd += ['-a', autotype]

    cw24 = self.X_CATAWAMPUS_ORG_Width24G
    cw5 = self.X_CATAWAMPUS_ORG_Width5G
    if self._band == '2.4' and cw24:
      cmd += ['-w', cw24]
    elif self._band == '5' and cw5:
      cmd += ['-w', cw5]

    cmd += ['-p', '/'.join(self.OperatingStandards.split(',')).encode('utf8')]

    if self.Standard in ('n', 'ac') or self.WMMEnable:
      cmd += ['-M']

    if self.RekeyingInterval == 1:
      cmd += ['-X']  # normal rekeying but add PTK rekeying
    elif self.RekeyingInterval == 0:
      cmd += ['-Y']  # disable all rekeying
    elif self.RekeyingInterval <= 300:
      cmd += ['-XX']  # very fast rekeying

    if self.GuardInterval == '400nsec':
      cmd += ['-G']

    def validate(s):
      if '\0' in s:
        raise ValueError('string %r contains a NUL character' % s)
      if '\n' in s:
        raise ValueError('string %r contains a newline character' % s)
      if '\r' in s:
        raise ValueError('string %r contains a carriage return character' % s)
      return s

    # Use the first non-empty key, in numeric order of the list index.
    wifi_psk = []
    sl = sorted(self.PreSharedKeyList.iteritems(), key=lambda x: int(x[0]))
    for (_, psk) in sl:
      key = psk.GetKey()
      if key:
        wifi_psk = ['env', 'WIFI_PSK=%s' % validate(key)]
        break

    cmd = [validate(token) for token in cmd]
    print('/bin/wifi options: %s' % (cmd,))
    return '\n'.join(wifi_psk + BINWIFI + cmd)

  def _ConmanFileName(self, prefix):
    """Return the conman file path for prefix, this suffix and this band."""
    if self._if_suffix:
      return os.path.join(CONMAN_TMP_DIR[0],
                          '%s.%s.%s' % (prefix, self._if_suffix, self._band))
    else:
      return os.path.join(CONMAN_DIR[0], '%s.%s' % (prefix, self._band))

  def WifiCommandFileName(self):
    return self._ConmanFileName('command')

  def APEnabledFileName(self):
    return self._ConmanFileName('access_point')

  @tr.mainloop.WaitUntilIdle
  def ExportWifiPolicy(self):
    """Export /bin/wifi command and wifi policy.

    Writes the command file and creates or removes the access_point marker
    file according to _ReallyWantWifi().  If no command can be built, both
    files are removed instead.
    """
    if not os.path.exists(CONMAN_TMP_DIR[0]):
      os.makedirs(CONMAN_TMP_DIR[0])
    if not os.path.exists(CONMAN_DIR[0]):
      os.makedirs(CONMAN_DIR[0])

    binwifi_command = self._MakeBinWifiCommand()
    if not binwifi_command:
      tr.helpers.Unlink(self.WifiCommandFileName())
      tr.helpers.Unlink(self.APEnabledFileName())
      return

    tr.helpers.WriteFileAtomic(self.WifiCommandFileName(), binwifi_command)

    ap_enabled_filename = self.APEnabledFileName()
    if self._ReallyWantWifi():
      # The file's existence is the signal; its content stays empty.
      with open(ap_enabled_filename, 'w'):
        pass
    else:
      tr.helpers.Unlink(ap_enabled_filename)
class PreSharedKey(BASE98WIFI.PreSharedKey):
  """InternetGatewayDevice.WLANConfiguration.{i}.PreSharedKey.{i}.

  Holds one passphrase and notifies its parent whenever it is set.
  """

  def __init__(self, parent):
    """Create an empty key whose changes are reported to parent.Triggered()."""
    super(PreSharedKey, self).__init__()
    self.Unexport(['Alias', 'PreSharedKey', 'AssociatedDeviceMACAddress'])
    self.passphrase = ''
    self.parent = parent

  def Triggered(self):
    """Forward the change notification to the parent object."""
    self.parent.Triggered()

  def GetKey(self):
    """Return the key to program into the Wifi chipset."""
    return self.passphrase

  def GetKeyPassphrase(self):
    """tr98 says KeyPassphrase always reads back an empty value."""
    return ''

  def SetKeyPassphrase(self, value):
    """Store str(value) as the passphrase and notify the parent."""
    self.passphrase = str(value)
    self.Triggered()

  KeyPassphrase = property(
      GetKeyPassphrase, SetKeyPassphrase, None,
      'WLANConfiguration.{i}.PreSharedKey.{i}.KeyPassphrase')
class WEPKey(BASE98WIFI.WEPKey):
  """InternetGatewayDevice.WLANConfiguration.{i}.WEPKey.{i}.

  Changes to the key are reported to the parent via Triggered().
  """

  WEPKey = tr.cwmptypes.TriggerString('')

  def __init__(self, parent):
    """Create an empty WEP key owned by parent."""
    super(WEPKey, self).__init__()
    self.Unexport(['Alias'])
    self.parent = parent

  def Triggered(self):
    """Forward the change notification to the parent object."""
    self.parent.Triggered()
class WlanConfigurationStats(netdev.NetdevStatsLinux26, BASE98WIFI.Stats):
  """tr98 InternetGatewayDevice.LANDevice.WLANConfiguration.Stats.

  Combines the netdev statistics reader for one interface with the tr98
  Stats schema object.
  """

  def __init__(self, ifname):
    """Initialize both base classes explicitly for interface ifname."""
    # Each base takes different constructor arguments, so they are called
    # one by one rather than through super().
    netdev.NetdevStatsLinux26.__init__(self, ifname)
    BASE98WIFI.Stats.__init__(self)
class AssociatedDevice(CATA98WIFI.AssociatedDevice):
  """InternetGatewayDevice.LANDevice.WLANConfiguration.AssociatedDevice.

  A read-only view over the dictionary of information about one station.
  """

  AssociatedDeviceMACAddress = tr.cwmptypes.ReadOnlyMacAddr()

  def __init__(self, params, filename):
    """Constructor.

    Args:
      params: A dictionary containing information about the associated device.
      filename: The path to the associated device information file, typically
        in /tmp/stations.  Accepted for interface compatibility; not read
        here.
    """
    super(AssociatedDevice, self).__init__()
    self._params = params
    type(self).AssociatedDeviceMACAddress.Set(self, params.get('PhysAddr', ''))
    self.Unexport(['AssociatedDeviceIPAddress', 'LastPMKId',
                   'LastRequestedUnicastCipher',
                   'LastRequestedMulticastCipher'])

  @property
  def AssociatedDeviceAuthenticationState(self):
    """True if 'authenticated' is 'yes' or 'true', otherwise False."""
    # Always return a real boolean; falling off the end would give None.
    return self._params.get('authenticated', 'no') in ('yes', 'true')

  @property
  def LastDataTransmitRate(self):
    # tr-098-1-6 defines LastDataTransmitRate as a string(4). Bizarre.
    return str(int(self._params.get('rx bitrate', 0.0)))

  @property
  def X_CATAWAMPUS_ORG_Active(self):
    return bool(self._params.get('active', False))

  @property
  def X_CATAWAMPUS_ORG_LastDataDownlinkRate(self):
    """The 'rx bitrate' value multiplied by 1000, as an int."""
    return int(self._params.get('rx bitrate', 0.0) * 1000.0)

  @property
  def X_CATAWAMPUS_ORG_LastDataUplinkRate(self):
    """The 'tx bitrate' value multiplied by 1000, as an int."""
    return int(self._params.get('tx bitrate', 0.0) * 1000.0)

  @property
  def X_CATAWAMPUS_ORG_SignalStrength(self):
    return int(self._params.get('signal', 0))

  @property
  def X_CATAWAMPUS_ORG_SignalStrengthAverage(self):
    return int(self._params.get('signal_avg', 0))

  @property
  def X_CATAWAMPUS_ORG_StationInfo(self):
    """The whole station dictionary rendered with str()."""
    return str(self._params)