| #! /usr/bin/python |
| # Copyright 2016 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Chimera craft UI. Code lifted from catawampus diag and tech UI.""" |
| |
| __author__ = 'edjames@google.com (Ed James)' |
| |
| import base64 |
| import getopt |
| import json |
| import os |
| import re |
| import StringIO |
| import subprocess |
| import sys |
| import urllib2 |
| import digest |
| import png |
| import tornado.httpserver |
| import tornado.ioloop |
| import tornado.web |
| |
| |
class ConfigError(Exception):
  """Configuration problem whose message is passed back to the browser."""

  def __init__(self, message):
    # Hand the message straight to Exception so str(error) returns it.
    Exception.__init__(self, message)
| |
| |
class Validator(object):
  """Validate the user value and convert to safe config value.

  Subclasses override `pattern` (and `example`, which is quoted in the error
  message) or extend Validate() with further checks.

  Attributes:
    pattern: regular expression the value must match; group 1 becomes the
      config value.
    example: sample of an acceptable value, shown in error messages.
    value: the raw value most recently passed to Validate().
    fields: tuple of the groups captured by the last successful match.
    config: the validated value to hand to the configuration backend.
  """
  pattern = r'^(.*)$'
  example = 'any string'

  def __init__(self):
    self.Reset()

  def Reset(self):
    """Forget the results of any previous validation."""
    self.fields = ()
    self.config = ''

  def Validate(self, value):
    """Match value against `pattern` and record the captured groups.

    Args:
      value: the string supplied by the user.

    Raises:
      ConfigError: if value is not a string or does not match `pattern`.
    """
    self.Reset()
    self.value = value
    try:
      m = re.search(self.pattern, value)
    except TypeError:
      # Values decoded from json may be numbers, lists, null, ...; report them
      # as a configuration error instead of letting TypeError escape.
      raise ConfigError('value "%s" must be a string, eg: "%s"' %
                        (value, self.example))
    if not m:
      raise ConfigError('value "%s" does not match pattern "%s", eg: "%s"' %
                        (value, self.pattern, self.example))
    self.fields = m.groups()
    self.config = self.fields[0]
| |
| |
class VInt(Validator):
  """Validator that accepts only a run of decimal digits (no sign)."""
  example = '123'
  pattern = r'^(\d+)$'
| |
| |
class VRange(VInt):
  """Validator for an integer inside the inclusive range low..high."""

  def __init__(self, low, high):
    self.low = low
    self.high = high
    super(VRange, self).__init__()

  def Validate(self, value):
    """Validate value as an integer, then check it against the range."""
    super(VRange, self).Validate(value)
    number = int(self.config)
    self.CheckInRange(number, self.low, self.high)

  @staticmethod
  def CheckInRange(num, low, high):
    """Raise ConfigError unless low <= num <= high."""
    if low <= num <= high:
      return
    raise ConfigError('number %d is out of range %d-%d' % (num, low, high))
| |
| |
class VSlash(Validator):
  """Validate as slash notation (eg 192.168.1.1/24).

  After a successful match `fields` holds the whole value, the four octets
  and the prefix length, in that order.
  """
  # The dots are escaped so that only a literal '.' separates the octets.
  pattern = r'^((\d+)\.(\d+)\.(\d+)\.(\d+)/(\d+))$'
  example = '192.168.1.1/24'

  def Validate(self, value):
    """Check the syntax, the prefix length (0-32) and every octet (0-255).

    Args:
      value: the address string supplied by the user.

    Raises:
      ConfigError: if the value is malformed or a number is out of range.
    """
    super(VSlash, self).Validate(value)
    prefix_len = int(self.fields[5])
    VRange.CheckInRange(prefix_len, 0, 32)
    # fields[1] through fields[4] are the four octets; check all of them.
    for dotted_quad_part in self.fields[1:5]:
      VRange.CheckInRange(int(dotted_quad_part), 0, 255)
| |
| |
class VVlan(VRange):
  """Validator for a vlan id (0-4095)."""

  def __init__(self):
    super(VVlan, self).__init__(low=0, high=4095)
| |
| |
class VFreqHi(VRange):
  """Validator for a Hi E-Band frequency (82000000-85000000)."""

  def __init__(self):
    super(VFreqHi, self).__init__(low=82000000, high=85000000)
| |
| |
class VFreqLo(VRange):
  """Validator for a Low E-Band frequency (72000000-75000000)."""

  def __init__(self):
    super(VFreqLo, self).__init__(low=72000000, high=75000000)
| |
| |
class VPower(VRange):
  """Validator for a PA power level (0-2000)."""

  def __init__(self):
    super(VPower, self).__init__(low=0, high=2000)
| |
| |
class VGain(VRange):
  """Validator for a gain level (0-63)."""

  def __init__(self):
    super(VGain, self).__init__(low=0, high=63)
| |
| |
class VGainIndex(VRange):
  """Validator for a gain index (1-5)."""

  def __init__(self):
    super(VGainIndex, self).__init__(low=1, high=5)
| |
| |
class VDict(Validator):
  """Validator accepting only the keys of the class-level `dict` mapping.

  The config value becomes the entry that the user's value maps to.
  """
  dict = {}

  def Validate(self, value):
    """Look value up in `dict`; raise ConfigError if it is not a key."""
    super(VDict, self).Validate(value)
    try:
      self.config = self.dict[value]
    except KeyError:
      keys = self.dict.keys()
      raise ConfigError('value "%s" must be one of "%s"' % (value, keys))
| |
| |
class VTx(VDict):
  """Validator accepting 'tx' or 'rx'; each maps to itself."""
  dict = {mode: mode for mode in ('tx', 'rx')}
| |
| |
class VTrueFalse(VDict):
  """Validator accepting 'true' or 'false'; each maps to itself."""
  dict = {flag: flag for flag in ('true', 'false')}
| |
| |
class VPassword(Validator):
  """Validate as base64 encoded and reasonable length.

  The value is a dict { admin: admin_pw, new: new_pw, confirm: confirm_pw }
  whose passwords are base64 encoded.  The `admin` attribute (the stored
  admin password, base64 encoded) must be assigned before Validate() is
  called.  On success `config` is the base64 encoded new password.
  """
  example = '******'

  def Validate(self, value):
    """Check the admin password, the confirmation and the password length.

    Args:
      value: dict with the base64 encoded 'admin', 'new' and 'confirm'
        passwords.

    Raises:
      ConfigError: if the request is malformed, the admin password is wrong,
        the confirmation differs or the new password is not 5-16 characters.
    """
    # Reject malformed requests as ConfigError rather than letting
    # TypeError/KeyError escape to the web server.
    if not isinstance(value, dict):
      raise ConfigError('password value must contain admin, new and confirm')
    for field in ('admin', 'new', 'confirm'):
      if field not in value:
        raise ConfigError('missing password field "%s"' % field)
    super(VPassword, self).Validate(value['new'])

    stored = getattr(self, 'admin', None)
    if stored is None:
      raise ConfigError('admin password is not available')

    # TODO(edjames) ascii decodes legally; how to check it's really base64?
    try:
      current = base64.b64decode(stored)
      admin_pw = base64.b64decode(value['admin'])
      new_pw = base64.b64decode(value['new'])
    except (TypeError, ValueError):
      # TypeError: bad padding or non-string input (python 2).
      # ValueError: covers UnicodeEncodeError for non-ascii text.
      raise ConfigError('passwords must be base64 encoded')

    # verify correct admin pw is passed, confirm matches
    if current != admin_pw:
      raise ConfigError('admin password is incorrect')
    if value['new'] != value['confirm']:
      raise ConfigError('new password does not match confirm password')
    if len(new_pw) < 5 or len(new_pw) > 16:
      raise ConfigError('passwords should be 5-16 characters')
| |
| |
class Config(object):
  """Configure the device after validation.

  Attributes:
    validator: instance created from the validator class given to __init__.
    ui: object assigned by SetUI(); None until then.
  """

  def __init__(self, validator):
    """Create the validator.

    Args:
      validator: Validator class (any zero-argument callable) to instantiate.
    """
    self.validator = validator()
    self.ui = None

  def Validate(self, value):
    """Validate value; the validator raises ConfigError if it is rejected."""
    self.validator.Validate(value)

  def Configure(self):
    """Apply the validated value; subclasses must override this."""
    raise NotImplementedError('override Config.Configure')

  def SetUI(self, ui):
    """Remember the ui object for handlers that need to read from it."""
    self.ui = ui

  @staticmethod
  def Run(command):
    """Run a command.

    Args:
      command: argument list passed to subprocess.check_output().

    Raises:
      ConfigError: if the command exits non-zero or cannot be started.
    """
    print('running: %s' % (command,))
    try:
      subprocess.check_output(command)
    except subprocess.CalledProcessError as e:
      print('Run:  %s' % e)
      raise ConfigError('command failed with %d' % e.returncode)
    except OSError as e:
      # eg the executable is missing; report it like any other failed command
      # instead of letting OSError escape.
      print('Run:  %s' % e)
      raise ConfigError('command could not be run: %s' % e.strerror)
| |
| |
class PtpConfig(Config):
  """Store a validated setting by running `ptp-config -s <key> <value>`."""

  def __init__(self, validator, key):
    self.key = key
    super(PtpConfig, self).__init__(validator)

  def Configure(self):
    """Run ptp-config with the key and the validated config value."""
    command = ['ptp-config', '-s', self.key, self.validator.config]
    Config.Run(command)
| |
| |
class PtpPassword(PtpConfig):
  """Configure a password (need password_admin setting)."""

  def Validate(self, value):
    """Load the stored admin password into the validator, then validate."""
    admin_path = '%s/config/settings/password_admin' % self.ui.sim
    self.validator.admin = self.ui.ReadFile(admin_path)
    super(PtpPassword, self).Validate(value)
| |
| |
class PtpActivate(Config):
  """Run `ptp-config -i <key>` when the validated value is 'true'.

  NOTE(review): the handler keys using this class end in `_activate`, so
  `-i` presumably activates the stored setting; confirm against ptp-config.
  """

  def __init__(self, validator, key):
    super(PtpActivate, self).__init__(validator)
    self.key = key

  def Configure(self):
    """Run ptp-config for the key unless the request said 'false'."""
    # Like Reboot and FactoryReset, only act on an explicit 'true'; a 'false'
    # request must not trigger the action.
    if self.validator.config != 'true':
      return
    Config.Run(['ptp-config', '-i', self.key])
| |
| |
| class Glaukus(Config): |
| """Configure using glaukus json api.""" |
| baseurl = 'http://localhost:8080' |
| |
| def __init__(self, validator, api, fmt): |
| super(Glaukus, self).__init__(validator) |
| self.api = api |
| self.fmt = fmt |
| |
| def CallJson(self, url, payload): |
| """Handle a JSON request to glaukusd.""" |
| print 'Glaukus: ', url, payload |
| |
| try: |
| fd = urllib2.urlopen(url, payload, timeout=2) |
| except urllib2.URLError as ex: |
| print 'Connection to %s failed: %s' % (url, ex.reason) |
| raise ConfigError('failed to contact glaukus') |
| response = fd.read() |
| j = json.loads(response) |
| print j |
| if j['code'] != 'SUCCESS': |
| if j['message']: |
| raise ConfigError(j.message) |
| raise ConfigError('failed to configure glaukus') |
| |
| def Configure(self): |
| url = self.baseurl + self.api |
| payload = self.fmt % self.validator.config |
| self.CallJson(url, payload) |
| |
| |
class GlaukusACM(Glaukus):
  """Configure glaukus ACM."""

  def __init__(self, validator):
    # The api path and format of the base class are not used; Configure()
    # below builds its own urls and payloads.
    super(GlaukusACM, self).__init__(validator, '/unused', 'unused')

  def Configure(self):
    """Enable or disable ACM according to the validated 'true'/'false'."""
    acm_url = self.baseurl + '/api/modem/acm'
    # Compare by value: `is` only tests object identity and works for strings
    # purely by accident of interning.
    if self.validator.config == 'true':
      payload = '{"rxSensorsEnabled":true,"txSwitchEnabled":true}'
      self.CallJson(acm_url, payload)
      return

    payload = '{"rxSensorsEnabled":false,"txSwitchEnabled":false}'
    self.CallJson(acm_url, payload)

    # NOTE(review): the profile is selected only when disabling; confirm that
    # this call was not meant to run after both branches.
    profile_url = self.baseurl + '/api/modem/acm/profile'
    payload = '{"profileIndex":0,"isLocal":true}'
    self.CallJson(profile_url, payload)
| |
| |
class Reboot(Config):
  """Reboot the device when the validated value is 'true'."""

  def Configure(self):
    """Schedule a reboot in a background shell after a 5 second sleep."""
    if self.validator.config != 'true':
      return
    os.system('(sleep 5; reboot)&')
| |
| |
class FactoryReset(Config):
  """Factory reset the device when the validated value is 'true'."""

  def Configure(self):
    """Run zap and, if it succeeds, reboot in the background after 5s."""
    if self.validator.config != 'true':
      return
    os.system('zap --i-really-mean-it --erase-backups && ((sleep 5; reboot) &)')
| |
| |
| class CraftUI(object): |
| """A web server that configures and displays Chimera data.""" |
| |
| handlers = { |
| 'password_admin': PtpPassword(VPassword, 'password_admin'), |
| 'password_guest': PtpPassword(VPassword, 'password_guest'), |
| |
| 'craft_ipaddr': PtpConfig(VSlash, 'craft_ipaddr'), |
| 'link_ipaddr': PtpConfig(VSlash, 'local_ipaddr'), |
| 'peer_ipaddr': PtpConfig(VSlash, 'peer_ipaddr'), |
| |
| 'vlan_inband': PtpConfig(VVlan, 'vlan_inband'), |
| 'vlan_ooband': PtpConfig(VVlan, 'vlan_ooband'), |
| 'vlan_peer': PtpConfig(VVlan, 'vlan_peer'), |
| |
| 'craft_ipaddr_activate': PtpActivate(VTrueFalse, 'craft_ipaddr'), |
| 'link_ipaddr_activate': PtpActivate(VTrueFalse, 'local_ipaddr'), |
| 'peer_ipaddr_activate': PtpActivate(VTrueFalse, 'peer_ipaddr'), |
| |
| 'vlan_inband_activate': PtpActivate(VTrueFalse, 'vlan_inband'), |
| 'vlan_ooband_activate': PtpActivate(VTrueFalse, 'vlan_ooband'), |
| 'vlan_peer_activate': PtpActivate(VTrueFalse, 'vlan_peer'), |
| |
| 'freq_hi': Glaukus(VFreqHi, '/api/radio/frequency', '{"hiFrequency":%s}'), |
| 'freq_lo': Glaukus(VFreqLo, '/api/radio/frequency', '{"loFrequency":%s}'), |
| 'mode_hi': Glaukus(VTx, '/api/radio/hiTransceiver/mode', '%s'), |
| 'tx_powerlevel': Glaukus(VPower, '/api/radio/tx/paPowerSet', '%s'), |
| 'tx_gain': Glaukus(VGain, '/api/radio/tx/vgaGain', '%s'), |
| 'rx_gainindex': Glaukus(VGainIndex, '/api/radio/rx/agcDigitalGainIndex', |
| '%s'), |
| 'palna_on': Glaukus(VTrueFalse, '/api/radio/paLnaPowerEnabled', '%s'), |
| |
| 'acm_on': GlaukusACM(VTrueFalse), |
| |
| 'reboot': Reboot(VTrueFalse), |
| 'factory_reset': FactoryReset(VTrueFalse) |
| } |
| ifmap = { |
| 'craft0': 'craft', |
| 'br0': 'bridge', |
| 'sw0.ooband': 'ooband', |
| 'sw0.inband': 'inband', |
| 'sw0.peer': 'link', |
| } |
| ifvlan = [ |
| 'sw0.ooband', |
| 'sw0.inband', |
| 'sw0.peer' |
| ] |
| stats = [ |
| 'multicast', |
| 'collisions', |
| 'rx_bytes', |
| 'rx_packets', |
| 'rx_errors', |
| 'rx_dropped', |
| 'tx_bytes', |
| 'tx_packets', |
| 'tx_errors', |
| 'tx_dropped' |
| ] |
| |
| def __init__(self, wwwroot, http_port, https_port, sim): |
| """Initialize.""" |
| self.wwwroot = wwwroot |
| self.http_port = http_port |
| self.https_port = https_port |
| self.sim = sim |
| self.data = {} |
| self.data['refreshCount'] = 0 |
| platform = self.ReadFile(sim + '/etc/platform') |
| serial = self.ReadFile(sim + '/etc/serial') |
| self.realm = '%s-%s' % (platform, serial) |
| |
| def ApplyChanges(self, changes): |
| """Apply changes to system.""" |
| if 'config' not in changes: |
| raise ConfigError('missing required config array') |
| conf = changes['config'] |
| try: |
| # dry run to validate all |
| for c in conf: |
| for k, v in c.items(): |
| if k not in self.handlers: |
| raise ConfigError('unknown key "%s"' % k) |
| h = self.handlers[k] |
| h.SetUI(self) |
| h.Validate(v) |
| # do it again for real |
| for c in conf: |
| for k, v in c.items(): |
| h = self.handlers[k] |
| h.Validate(v) |
| h.Configure() |
| except ConfigError as e: |
| raise ConfigError('key "%s": %s' % (k, e)) |
| |
| def ReadFile(self, filepath): |
| """cat file.""" |
| text = '' |
| try: |
| with open(filepath) as fd: |
| text = fd.read().rstrip() |
| except IOError as e: |
| text = 'ReadFile failed: %s: %s' % (filepath, e.strerror) |
| |
| return text |
| |
| def GetValue(self, data, path): |
| """Walk down the dicts to get a value.""" |
| keys = path.split('/') |
| v = data |
| for key in keys: |
| if v: |
| v = v.get(key, None) |
| return v |
| |
| def AddLeds(self, data): |
| """Add status leds to data.""" |
| red = 'red.gif' |
| green = 'green.gif' |
| leds = { |
| 'ACS': red, |
| 'Switch': red, |
| 'Modem': red, |
| 'Radio': red, |
| 'RSSI': red, |
| 'MSE': red, |
| 'Peer': red |
| } |
| if self.GetValue(data, 'platform/ledstate') is 'ACSCONTACT': |
| leds['ACS'] = green |
| if self.GetValue(data, 'platform/cpss_ready'): |
| leds['Switch'] = green |
| if self.GetValue(data, 'modem/status/acquireStatus') == 1: |
| leds['Modem'] = green |
| if self.GetValue(data, 'radio/paLnaPowerEnabled'): |
| leds['Radio'] = green |
| rssi = self.GetValue(data, 'radio/rx/rssi') |
| if rssi >= 1500 and rssi <= 2000: |
| leds['RSSI'] = green |
| mse = self.GetValue(data, 'modem/status/normalizedMse') |
| if mse is not None and mse <= -180: |
| leds['MSE'] = green |
| if self.GetValue(data, 'platform/peer_up'): |
| leds['Peer'] = green |
| data['leds'] = leds |
| |
| def GetData(self): |
| """Get system data, return a json string.""" |
| data = {} |
| data['platform'] = self.GetPlatformData() |
| data['modem'] = self.GetModemData() |
| data['radio'] = self.GetRadioData() |
| self.AddLeds(data) |
| js = json.dumps(data) |
| return js |
| |
| def AddIpAddr(self, data): |
| """Run ip addr and parse results.""" |
| ipaddr = '' |
| try: |
| ipaddr = subprocess.check_output(['ip', '-o', 'addr']) |
| except subprocess.CalledProcessError as e: |
| print 'warning: "ip -o addr" failed: ', e |
| v = {} |
| for line in ipaddr.splitlines(): |
| f = line.split() |
| ifname = re.sub(r'[@:].*', '', f[1]) |
| m = re.search(r'scope (global|link)', line) |
| scope = m.group(1) if m else 'noscope' |
| v[ifname + ':' + f[2] + ':' + scope] = f[3] |
| m = re.search(r'link/ether (\S+)', line) |
| if m: |
| mac = m.group(1) |
| v[ifname + ':' + 'mac'] = mac |
| for ifname, uiname in self.ifmap.items(): |
| mac = v.get(ifname + ':mac') |
| data[uiname + '_mac'] = mac if mac else 'unknown' |
| for inet in ('inet', 'inet6'): |
| kglobal = ifname + ':' + inet + ':' + 'global' |
| vdata = v.get(kglobal, 'unknown') |
| kdata = 'active_' + uiname + '_' + inet |
| data[kdata] = vdata |
| |
| def AddInterfaceStats(self, data): |
| """Get if stats.""" |
| for ifname, uiname in self.ifmap.items(): |
| d = self.sim + '/sys/class/net/' + ifname + '/statistics/' |
| for stat in self.stats: |
| k = uiname + '_' + stat |
| data[k] = self.ReadFile(d + stat) |
| |
| def AddSwitchStats(self, data): |
| """Run presterastats and send json.""" |
| stats = '' |
| try: |
| stats = subprocess.check_output(['presterastats']) |
| except subprocess.CalledProcessError as e: |
| print 'warning: "presterastats" failed: ', e |
| try: |
| data['switch'] = json.loads(stats)['port-interface-statistics'] |
| except ValueError as e: |
| print 'warning: "presterastats" json parse failed: ', e |
| |
| def AddVlans(self, data): |
| """Run ip -d link and parse results for vlans.""" |
| iplink = '' |
| try: |
| iplink = subprocess.check_output(['ip', '-o', '-d', 'link']) |
| except subprocess.CalledProcessError as e: |
| print 'warning: "ip -o -d link" failed: ', e |
| v = {} |
| for line in iplink.splitlines(): |
| m = re.search(r'^\d+: ([\w\.]+)@\w+: .* vlan id (\w+)', line) |
| if m: |
| v[m.group(1)] = m.group(2) |
| for ifname in self.ifvlan: |
| uiname = self.ifmap[ifname] |
| vdata = v.get(ifname, 'unknown') |
| kdata = 'active_' + uiname + '_vlan' |
| data[kdata] = vdata |
| |
| def GetPlatformData(self): |
| """Get platform data.""" |
| data = self.data |
| sim = self.sim |
| |
| if data['refreshCount'] == 0: |
| data['serialno'] = self.ReadFile(sim + '/etc/serial') |
| data['version'] = self.ReadFile(sim + '/etc/version') |
| data['platform'] = self.ReadFile(sim + '/etc/platform') |
| data['softwaredate'] = self.ReadFile(sim + '/etc/softwaredate') |
| data['refreshCount'] += 1 |
| data['uptime'] = self.ReadFile(sim + '/proc/uptime') |
| data['ledstate'] = self.ReadFile(sim + '/tmp/gpio/ledstate') |
| data['cpu_temperature'] = self.ReadFile(sim + '/tmp/gpio/cpu_temperature') |
| data['peer_up'] = os.path.exists(sim + '/tmp/peer-up') |
| data['cpss_ready'] = os.path.exists(sim + '/tmp/cpss_ready') |
| cs = '/config/settings/' |
| data['craft_ipaddr'] = self.ReadFile(sim + cs + 'craft_ipaddr') |
| data['link_ipaddr'] = self.ReadFile(sim + cs + 'local_ipaddr') |
| data['peer_ipaddr'] = self.ReadFile(sim + cs + 'peer_ipaddr') |
| data['vlan_inband'] = self.ReadFile(sim + cs + 'vlan_inband') |
| data['vlan_ooband'] = self.ReadFile(sim + cs + 'vlan_ooband') |
| data['vlan_link'] = self.ReadFile(sim + cs + 'vlan_peer') |
| self.AddIpAddr(data) |
| self.AddInterfaceStats(data) |
| self.AddSwitchStats(data) |
| self.AddVlans(data) |
| return data |
| |
| def GetModemData(self): |
| """Get modem data.""" |
| data = {} |
| response = '{}' |
| if self.sim: |
| response = self.ReadFile(self.sim + '/tmp/glaukus/modem.json') |
| else: |
| try: |
| url = 'http://localhost:8080/api/modem' |
| handle = urllib2.urlopen(url, timeout=2) |
| response = handle.read() |
| except urllib2.URLError as ex: |
| print 'Connection to %s failed: %s' % (url, ex.reason) |
| try: |
| data = json.loads(response) |
| except ValueError as e: |
| print 'json format error: %s' % e |
| return data |
| |
| def GetRadioData(self): |
| """Get radio data, return a json string.""" |
| data = {} |
| response = '{}' |
| if self.sim: |
| response = self.ReadFile(self.sim + '/tmp/glaukus/radio.json') |
| else: |
| try: |
| url = 'http://localhost:8080/api/radio' |
| handle = urllib2.urlopen(url, timeout=2) |
| response = handle.read() |
| except urllib2.URLError as ex: |
| print 'Connection to %s failed: %s' % (url, ex.reason) |
| try: |
| data = json.loads(response) |
| except ValueError as e: |
| print 'json format error: %s' % e |
| return data |
| |
| def GetIQPNG(self, path): |
| """Get IQ points and render as PNG.""" |
| response = '[0,0]' |
| if self.sim: |
| response = self.ReadFile(self.sim + '/tmp/glaukus/' + path + '.json') |
| else: |
| try: |
| url = 'http://localhost:8080/api/modem/iq/' + path |
| handle = urllib2.urlopen(url, timeout=2) |
| response = handle.read() |
| except urllib2.URLError as ex: |
| print 'Connection to %s failed: %s' % (url, ex.reason) |
| |
| coords = [0, 0] |
| try: |
| coords = json.loads(response) |
| except ValueError as e: |
| print 'json format error: %s' % e |
| |
| # owh is original width/height of data (-1200 to 1200) |
| owh = (2400, 2400) |
| # wh is display size (400x400) |
| wh = (400, 400) |
| |
| w = png.Writer(size=wh, greyscale=True, bitdepth=1) |
| scanline = int((wh[0] + 7) / 8) |
| rows = [scanline*[0] for i in xrange(0, wh[1])] |
| for i in xrange(0, len(coords) / 2): |
| # data is a series of x,y,x,y,x,y... |
| xy = (coords[i*2], coords[i*2+1]) |
| # transform and scale data to display |
| sxy = (int((xy[0] + owh[0]/2 + .5) * wh[0] / owh[0]), |
| int((xy[1] + owh[1]/2 + .5) * wh[1] / owh[1])) |
| if sxy[0] < 0 or sxy[0] >= wh[0] or sxy[1] < 0 or sxy[1] >= wh[1]: |
| continue |
| # set a pixel in the PNG |
| pos = int(sxy[0] / 8) |
| shift = sxy[0] % 8 |
| rows[sxy[1]][pos] |= 1 << (7 - shift) |
| f = StringIO.StringIO() |
| w.write_packed(f, rows) |
| image = f.getvalue() |
| f.close() |
| return image |
| |
| def GetUserCreds(self, user): |
| """Create a dict with the requested password.""" |
| if user not in ('admin', 'guest'): |
| return None |
| b64 = self.ReadFile('%s/config/settings/password_%s' % (self.sim, user)) |
| pw = base64.b64decode(b64) |
| return {'auth_username': user, 'auth_password': pw} |
| |
| def GetAdminCreds(self, user): |
| if user != 'admin': |
| return None |
| return self.GetUserCreds(user) |
| |
| def Authenticate(self, request): |
| """Check if user is authenticated (sends challenge if not).""" |
| if not request.get_authenticated_user(self.GetUserCreds, self.realm): |
| return False |
| return True |
| |
| def AuthenticateAdmin(self, request): |
| """Check if user is authenticated (sends challenge if not).""" |
| if not request.get_authenticated_user(self.GetAdminCreds, self.realm): |
| return False |
| return True |
| |
| class CraftHandler(digest.DigestAuthMixin, tornado.web.RequestHandler): |
| """Common class to add args to html template.""" |
| auth = 'unset' |
| |
| def IsProxy(self): |
| """Check if this request was proxied, (ie, we are the peer).""" |
| return self.request.headers.get('craftui-proxy', 0) == '1' |
| |
| def IsPeer(self): |
| """Check args to see if this is a request for the peer.""" |
| return self.get_argument('peer', default='0') == '1' |
| |
| def IsHttps(self): |
| """See if https:// was used.""" |
| return (self.request.protocol == 'https' or |
| self.request.headers.get('craftui-https', 0) == '1') |
| |
| def TemplateArgs(self): |
| """Build template args to dynamically adjust html file.""" |
| is_https = self.IsHttps() |
| is_proxy = self.IsProxy() |
| |
| peer_arg = '?peer=1' |
| |
| args = {} |
| args['hidden_on_https'] = 'hidden' if is_https else '' |
| args['hidden_on_peer'] = 'hidden' if is_proxy else '' |
| args['shown_on_peer'] = 'hidden' if not is_proxy else '' |
| args['peer_arg'] = peer_arg |
| args['peer_arg_on_peer'] = peer_arg if is_proxy else '' |
| return args |
| |
| def TryProxy(self): |
| """Check if we should proxy this request to the peer.""" |
| if not self.IsPeer() or self.IsProxy(): |
| return False |
| self.Proxy() |
| return True |
| |
| class ErrorHandler(urllib2.HTTPDefaultErrorHandler): |
| """Catch the error, don't raise exception.""" |
| error = {} |
| |
| def http_error_default(self, req, fd, code, msg, hdrs): |
| self.error = { |
| 'request': req, |
| 'fd': fd, |
| 'code': code, |
| 'msg': msg, |
| 'hdrs': hdrs |
| } |
| |
| def Proxy(self): |
| """Proxy to the peer.""" |
| ui = self.settings['ui'] |
| r = self.request |
| cs = '/config/settings/' |
| peer_ipaddr = ui.ReadFile(ui.sim + cs + 'peer_ipaddr') |
| peer_ipaddr = re.sub(r'/\d+$', '', peer_ipaddr) |
| if ui.sim: |
| peer_ipaddr = 'localhost:8890' |
| url = 'http://' + peer_ipaddr + r.uri |
| print 'proxy: ', url |
| |
| eh = self.ErrorHandler() |
| opener = urllib2.build_opener(eh) |
| |
| body = None |
| if r.method == 'POST': |
| body = '' if r.body is None else r.body |
| req = urllib2.Request(url, body, r.headers) |
| req.add_header('CraftUI-Proxy', 1) |
| req.add_header('CraftUI-Https', int(self.IsHttps())) |
| fd = opener.open(req, timeout=2) |
| if eh.error: |
| fd = eh.error['fd'] |
| self.set_status(eh.error['code']) |
| hdrs = eh.error['hdrs'] |
| for h in hdrs: |
| v = hdrs.get(h) |
| self.set_header(h, v) |
| |
| response = fd.read() |
| if response: |
| self.write(response) |
| self.finish() |
| |
| def Authenticated(self): |
| """Authenticate the user per the required auth type.""" |
| ui = self.settings['ui'] |
| if self.auth == 'any': |
| if not ui.Authenticate(self): |
| return False |
| elif self.auth == 'admin': |
| if not ui.AuthenticateAdmin(self): |
| return False |
| elif self.auth != 'none': |
| raise Exception('unknown authentication type "%s"' % self.auth) |
| return True |
| |
| def get(self): |
| if self.TryProxy(): |
| return |
| if not self.Authenticated(): |
| return |
| ui = self.settings['ui'] |
| path = ui.wwwroot + '/' + self.page + '.thtml' |
| print '%s %s page (%s)' % (self.request.method, self.page, ui.sim) |
| self.render(path, **self.TemplateArgs()) |
| |
| class WelcomeHandler(CraftHandler): |
| page = 'welcome' |
| auth = 'none' |
| |
| class StatusHandler(CraftHandler): |
| page = 'status' |
| auth = 'any' |
| |
| class ConfigHandler(CraftHandler): |
| page = 'config' |
| auth = 'admin' |
| |
| class JsonHandler(CraftHandler): |
| """Provides JSON-formatted content to be displayed in the UI.""" |
| page = 'json' |
| |
| def get(self): |
| if self.TryProxy(): |
| return |
| self.auth = 'any' |
| if not self.Authenticated(): |
| return |
| ui = self.settings['ui'] |
| print '%s %s page (%s)' % (self.request.method, self.page, ui.sim) |
| jsonstring = ui.GetData() |
| self.set_header('Content-Type', 'application/json') |
| self.write(jsonstring) |
| self.finish() |
| |
| def post(self): |
| if self.TryProxy(): |
| return |
| self.auth = 'admin' |
| if not self.Authenticated(): |
| return |
| ui = self.settings['ui'] |
| print '%s %s page (%s)' % (self.request.method, self.page, ui.sim) |
| request = self.request.body |
| result = {} |
| result['error'] = 0 |
| result['errorstring'] = '' |
| try: |
| try: |
| json_args = json.loads(request) |
| request = json.dumps(json_args) |
| except ValueError as e: |
| print e |
| raise ConfigError('json format error') |
| ui.ApplyChanges(json_args) |
| except ConfigError as e: |
| print e |
| result['error'] += 1 |
| result['errorstring'] += str(e) |
| |
| response = json.dumps(result) |
| print 'request: ', request |
| print 'response: ', response |
| self.set_header('Content-Type', 'application/json') |
| self.write(response) |
| self.finish() |
| |
| class PNGHandler(CraftHandler): |
| """Returns a PNG showing plotted IQ values.""" |
| baseurl = 'http://localhost:8080/api/modem/iq/' |
| auth = 'any' |
| page = 'IQ' |
| path = None |
| |
| def get(self): |
| if self.TryProxy(): |
| return |
| if not self.Authenticated(): |
| return |
| ui = self.settings['ui'] |
| print '%s %s page (%s)' % (self.request.method, self.page, ui.sim) |
| |
| image = ui.GetIQPNG(self.path) |
| self.set_header('Content-Type', 'image/png') |
| self.write(image) |
| self.finish() |
| |
| class RXSlicerPNGHandler(PNGHandler): |
| path = 'rxslicer' |
| |
| def RunUI(self): |
| """Create the http redirect and https web server and run forever.""" |
| sim = self.sim |
| |
| craftui_handlers = [ |
| (r'^/$', self.WelcomeHandler), |
| (r'^/status/?$', self.StatusHandler), |
| (r'^/config/?$', self.ConfigHandler), |
| (r'^/content.json/?$', self.JsonHandler), |
| (r'^/rxslicer.png$', self.RXSlicerPNGHandler), |
| (r'^/static/([^/]*)$', tornado.web.StaticFileHandler, |
| {'path': self.wwwroot + '/static'}), |
| ] |
| |
| http_app = tornado.web.Application(craftui_handlers) |
| http_app.settings['ui'] = self |
| http_app.listen(self.http_port) |
| |
| certfile = sim + '/tmp/ssl/certs/craftui.pem' |
| keyfile = sim + '/tmp/ssl/private/craftui.key' |
| # use the device cert if signed one is not available |
| if not os.path.exists(certfile) or not os.path.exists(keyfile): |
| certfile = sim + '/tmp/ssl/certs/device.pem' |
| keyfile = sim + '/tmp/ssl/private/device.key' |
| |
| print 'certfile=', certfile |
| print 'keyfile=', keyfile |
| |
| https_app = tornado.web.Application(craftui_handlers) |
| https_app.settings['ui'] = self |
| https_server = tornado.httpserver.HTTPServer(https_app, ssl_options={ |
| 'certfile': certfile, 'keyfile': keyfile}) |
| https_server.listen(self.https_port) |
| |
| ioloop = tornado.ioloop.IOLoop.instance() |
| ioloop.start() |
| |
| |
def Usage():
  """Show usage: the program name and the options that main() accepts."""
  print('Usage: %s [-p|--http-port 80] [-P|--https-port 443] '
        '[-w|--www /usr/craftui/www] [-s|--sim top]' % sys.argv[0])
  print('\tUse -s to provide an alternate rootfs')
| |
| |
def main():
  """Parse the command line, then create the CraftUI server and run it.

  Prints the problem and the usage, then exits with status 1, if the command
  line is not valid.
  """
  www = '/usr/craftui/www'
  http_port = 80
  https_port = 443
  sim = ''
  try:
    # '-S' is accepted by the option string but has no handler, so it is
    # rejected as an unhandled option below.
    opts, args = getopt.getopt(sys.argv[1:], 's:p:P:w:S',
                               ['sim=', 'http-port=', 'https-port=', 'www='])
  except getopt.GetoptError as err:
    # print help information and exit:
    print(str(err))
    Usage()
    sys.exit(1)
  for o, a in opts:
    if o in ('-s', '--sim'):
      sim = a
    elif o in ('-p', '--http-port', '-P', '--https-port'):
      try:
        port = int(a)
      except ValueError:
        print('invalid port "%s" for option %s' % (a, o))
        Usage()
        sys.exit(1)
      if o in ('-p', '--http-port'):
        http_port = port
      else:
        https_port = port
    elif o in ('-w', '--www'):
      www = a
    else:
      # Not an assert: asserts vanish under -O, and the usage/exit that
      # followed the assert could never run.
      print('unhandled option %s' % o)
      Usage()
      sys.exit(1)
  if args:
    print('extra args: %s' % ' '.join(args))
    Usage()
    sys.exit(1)
  craftui = CraftUI(www, http_port, https_port, sim)
  craftui.RunUI()
| |
| |
# Start the server only when run as a script, not when imported as a module.
if __name__ == '__main__':
  main()