| #!/usr/bin/python |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| |
| """This test performs TR069 related DataModel verification. |
| |
| This class extends the TestCase class, it does the following: |
| 1. Configure/Verify Bruno with correct TR69 parameters: |
| - ACS URL |
| - Image Version |
| 2. Verify Bruno has connection to ACS (IPv6/IPv4) |
| 3. Set/Get TR69 data model parameters: |
| - getParameterNames() |
| - getParameterValues() |
| 4. Data mode: |
| - Data Models for GVSB |
| - Data Models for TR098 |
| - Data Models for MoCA |
| - Data Models for TR140 |
| - Data Models for TR181 |
| 5. This file includes the following test cases: |
| (all data models test case are pending on bug: http://b/issue?id=6813492 |
| SetParameterValues does not work currently) |
| - testID: 14165033 7.6.13 Support of Data Models for GVSB (TBD) |
| - testID: 14315008 7.6.14 Support of Data Models for TR098 |
| - testID: 14165035 7.6.15 Support of Data Models for MoCA |
| - testID: 14187024 7.6.16 Support of Data Models for TR140 |
| - testID: 14314007 7.6.17 Support of Data Models for TR181 |
| - testID: 11865133 7.6.9.1 Support of Data Models: TR135 |
| - testID: 14335001 7.6.31 Implement X_CATAWAMPUS-ORG parameters |
| - testID: 14319149 7.6.29 Implement X_GOOGLE-COM data model for Sage |
| information |
| - testID: 14314008 7.6.30 Implement tr-157 PeriodicStatistics |
| |
| - testID: 14315009 7.6.20 Support of modification of |
| InternetGatewayDevice.ManagementServer.URL |
| - testID: 14314009 7.6.35 Monitor temperature |
| - testID: 14266498 7.6.36 X_CATAWAMPUS-ORG_FlashDevice Storage data |
| model (for tracking flash wear) |
| - testID: 14165038 7.6.37 Implement AutoChannelEnable in catawampus |
| - testID: 11722375 7.6.6 TR069 - SetParameter RPC Request |
| - testID: 15643845 7.6.39 NTP-Time zone configuration |
| - testID: 14165034 7.6.14.1 WiFi Test: ACS configuration |
| """ |
| |
| __author__ = 'Lehan Meng (lmeng@google.com)' |
| |
| |
| import os |
| import re |
| import string |
| import time |
| import xml.etree.ElementTree as ET |
| |
| import acs |
| import device |
| import ssh |
| import testCase |
| |
| |
| class DataModelTest(testCase.TestCase): |
| """Class for TR69 DataModel and parameter verification test. |
| |
| Configuration Parameters: |
| TBD |
| testID: 14187024 |
| title =DataModel |
| req_file = 14187024-requirement.req |
| """ |
| |
| def Init(self): |
| """Initialte the test case.""" |
| info = self.log.CreateResultInfo( |
| '---', 'Data model verification test started...') |
| self.log.SendLine(self.test_info, info) |
| self.bruno = device.Bruno(user=self.params['user'], |
| addr=self.params['addr'], |
| pwd=self.params['pwd'], |
| cmd_prompt=self.params['bruno_prompt']) |
| if not self.p_ssh: |
| self.p_ssh = ssh.SSH(user=self.params['user'], |
| addr=self.params['addr'], |
| pwd=self.params['pwd'], |
| bruno_prompt=self.params['bruno_prompt'], |
| addr_ipv6=self.params['addr_ipv6']) |
| self.p_ssh.SetLogging(self.log) |
| self.bruno.SetLogging(self.log) |
| self.bruno.dev_ssh = self.p_ssh |
| |
| self.__short_delay = 30 #short delay: 5 seconds |
| self.__delay = 80 #medium delay: 180 seconds |
| self.__long_delay = 120 #long delay: 600 seconds |
| |
| self.req_file = open(self.params['req_file'], 'r') |
| self.req_tr069_params = [] |
| self.acs_instance = acs.ACS() |
| self.acs_instance.SetLogging(self.log) |
| |
| def SetACSURL(self, url=None): |
| """Set the ACS URL on device.""" |
| if not url: |
| url = self.params['acs_url'] |
| self.p_ssh.SendCmd('echo '+ url + ' > /tmp/cwmp/acs_url') |
| self.p_ssh.GetCmdOutput()[1].rstrip() |
| |
| def GetACSURL(self): |
| """Get ACS URL from Device.""" |
| self.p_ssh.SendCmd(r'cat /tmp/cwmp/acs_url') |
| line = self.p_ssh.GetCmdOutput()[1].strip() |
| m = re.match('http[s]?://([\w-]+\.)+\.com/cwmp', line) |
| if m is None: |
| info = self.log.CreateErrorInfo('Warning', |
| 'Can not get ACS url') |
| self.log.SendLine(None, info) |
| else: |
| info = self.log.CreateProgressInfo('---', |
| 'Get ACS URL from device file: ' |
| + line) |
| self.log.SendLine(None, info) |
| return line |
| |
| def GetCurrentVersion(self): |
| """Get current software version.""" |
| self.p_ssh.SendCmd('more /etc/version') |
| return self.p_ssh.GetCmdOutput()[1].strip() |
| |
| def TestEnvCheck(self): |
| """Verify device configuration meets the test requirement.""" |
| acs_url = self.GetACSURL() |
| if acs_url != self.params['acs_url']: |
| info = self.log.CreateErrorInfo( |
| 'Warning', 'ACS URL incorrect!. Expected: ' |
| + self.params['acs_url'] + ', Current: ' |
| + acs_url) |
| self.log.SendLine(self.test_info, info) |
| #os.sys.exit(1) |
| info = self.log.CreateProgressInfo( |
| '---', 'Reset ACS URL to Expected: ' + self.params['acs_url']) |
| self.log.SendLine(self.test_info, info) |
| self.SetACSURL() |
| |
| current_version = self.GetCurrentVersion() |
| if (not current_version |
| or current_version != self.params['expected_version']): |
| info = self.log.CreateErrorInfo( |
| 'critical', 'Not expected image version. Expected: ' |
| + self.params['expected_version'] + ', Current: ' |
| + current_version) |
| self.log.SendLine(self.test_info, info) |
| else: |
| self.AddConfigParams('current_version', current_version) |
| |
| def CheckRequirement(self, chk_list, req_list): |
| """Check parameter names with parameters in requirement file. |
| |
| This file will compare the parameters from Device and from requirement |
| File. The differences between the two will be written to the result log |
| file |
| Args: |
| chk_list: the list of parameters results returned by query |
| req_list: the list of parameters from requirement |
| Returns: |
| missing_list: the list of parameters that is missing from requirement |
| net_list: the list of parameters that are newly added but not in the |
| requirement |
| """ |
| missing_list = [] |
| new_list = [] |
| #check missing parameters from Requirement |
| for req_line in req_list: |
| verify = False |
| for chk_line in chk_list: |
| if req_line == chk_line: |
| verify = True |
| elif string.find(chk_line, req_line) == 0: |
| verify = True |
| if not verify: |
| missing_list.append(req_line) |
| |
| #check newly added parameters to Requirement |
| for line in chk_list: |
| if req_list.count(line) < 1 and new_list.count(line) < 1: |
| new_list.append(line) |
| return (missing_list, new_list) |
| |
| def GetACSAppFromURL(self): |
| """Parse ACS appID from ACS URL string.""" |
| return re.sub(r'https?://(.*).appspot.com/cwmp', r'\1', |
| self.params['acs_url']) |
| |
| def GetSerialNumber(self): |
| """This function returns Bruno device serial number.""" |
| self.p_ssh.SendCmd('hnvram -r 1st_serial_number') |
| line = self.p_ssh.GetCmdOutput()[1].strip() |
| if line is None: |
| info = self.log.CreateErrorInfo('Warning', |
| 'Can not get serial number') |
| self.log.SendLine(None, info) |
| return False |
| return re.sub(r'1st_serial_number=([0-9A-Z]{12})', r'\1', line) |
| |
| def GetProductClass(self): |
| """Get product class from device system.""" |
| self.p_ssh.SendCmd('cat /etc/platform') |
| return self.p_ssh.GetCmdOutput()[1].strip() |
| |
| def BatchSetParametersValues(self, tree, param): |
| """Set multiple parameter values in a single nbi_client RPC call. |
| |
| Set multiple parameter values |
| Args: |
| tree: the parameter names are in TreeElement structure. It has |
| parameter 'writable' states and test_value to set. |
| Parameter names/paths are sorted in alphabetical order in tree. |
| param: the parameters name list that needs to be set with new value |
| """ |
| nbi_path = self.params['nbi_client_path'] |
| app_id = self.GetACSAppFromURL() |
| product_class = self.GetProductClass() |
| serial_no = self.GetSerialNumber() |
| |
| container = tree.findall('parameter_value_struct') |
| batch_size = 1 |
| counter = 0 |
| parameter_struct = '' |
| |
| for node in container: |
| name = node.findtext('name') |
| |
| if name in param: |
| nbi_type = node.findtext('nbi_type') |
| |
| if not nbi_type: |
| info = self.log.CreateErrorInfo('Warning', |
| 'Can not determin parameter type!') |
| self.log.SendLine(None, info) |
| |
| test_value = node.findtext('test_value') |
| if nbi_type == 'string_value': |
| test_value = '"' + test_value + '"' |
| if (node.findtext('writable') == '1' |
| or node.findtext('writable') == 'true'): |
| parameter_struct += ('parameter_value_struct: <name: "' |
| + name + '" ' + nbi_type + ': ' |
| + test_value + '> ') |
| counter += 1 |
| else: |
| continue |
| |
| if counter >= batch_size: |
| counter = 0 |
| cmd = (nbi_path |
| + ' --server /gns/project/apphosting/stubby/prod-appengine/' |
| '*/' + app_id + '/default/* --service CpeParametersService ' |
| '--method SetParameterValues --request \'' |
| 'cpe_id: <oui: "F88FCA" product_class: "' + product_class |
| + '" serial_no: "' + serial_no |
| + '"> request: <parameter_list < ' |
| + parameter_struct + ' > parameter_key: "myParamKey">\'') |
| print cmd |
| parameter_struct = '' |
| time_stamp = self.GetTimeStamp() |
| call_result = self.acs_instance.StubbyClientCall(cmd) |
| print call_result |
| if not self.acs_instance.CheckCmdCall(call_result): |
| # command not succeed according to nbi_client reply |
| # However, there is possibility that the request is successful, |
| # nbi_client reports a failure only because the timeout 60s. |
| # check device log to see if this request actually fails |
| self.p_ssh.SendCmd('dmesg | grep cwmpd:') |
| # read the last 500 lines from log |
| log_list = self.p_ssh.GetCmdOutput(500) |
| log_list.reverse() |
| reply_msg = self.ParseDeviceLogForError( |
| time_stamp, log_list, name, 'SetParameterValues') |
| if '<soap:Fault>' in reply_msg[0] or '<soap:fault>' in reply_msg[0]: |
| # failed to set parameter values (on nbi_client and device): |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Batch SetParameterValues() failed on Parameter: ' |
| + name + '. Parameter type: ' + nbi_type |
| + '. Set test value: ' + test_value + '. Error message: \n' |
| + ''.join(reply_msg)) |
| self.log.SendLines(self.test_info, info) |
| elif '<cwmp:SetParameterValuesResponse>' in reply_msg[0]: |
| # nbi_reports failure, but success on device |
| info = self.log.CreateResultInfo( |
| '---', 'Batch SetParameterValues() Succeeded on Parameter: ' |
| + name + '. Parameter type: ' + nbi_type |
| + '. Set test value: ' + test_value) |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Batch SetParameterValues() failed on Parameter: ' |
| + name + '. Parameter type: ' + nbi_type |
| + '. Set test value: ' + test_value + '. Error message: \n' |
| + ''.join(reply_msg)) |
| self.log.SendLines(self.test_info, info) |
| else: |
| # Succeed, logging to result file: |
| self.log.SendTestData(call_result) |
| info = self.log.CreateResultInfo( |
| '---', 'Batch SetParameterValues() Succeeded on Parameter: ' |
| + name + '. Parameter type: ' + nbi_type |
| + '. Set test value: ' + test_value) |
| self.log.SendLine(self.test_info, info) |
| |
| def SetParameterValues(self, param='', param_type='', value=''): |
| """Set parameter values on Device. |
| |
| Set parameter values |
| Args: |
| param: the parameter name that needs to be set with new value |
| param_type: type of value: string, boolean, long_int, etc |
| value: parameter value |
| Returns: |
| return setParameterValues if succeeded. |
| return error information if failed |
| """ |
| nbi_path = self.params['nbi_client_path'] |
| app_id = self.GetACSAppFromURL() |
| product_class = self.GetProductClass() |
| serial_no = self.GetSerialNumber() |
| # parse type: |
| if 'string' in param_type: |
| param_type = 'string_value' |
| value = '"' + value + '"' |
| elif 'bool' in param_type: |
| param_type = 'boolean_value' |
| elif 'unsignedInt' in param_type: |
| param_type = 'long_value' |
| else: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Unknown parameter type: ' + param_type) |
| self.log.SendLines(self.test_info, info) |
| os.sys.exit(1) |
| |
| cmd = (nbi_path + ' --server /gns/project/apphosting/stubby/prod-appengine/' |
| '*/' + app_id + '/default/* --service CpeParametersService ' |
| '--method SetParameterValues --request \'' |
| 'cpe_id: <oui: "F88FCA" product_class: "' + product_class |
| + '" serial_no: "' + serial_no + '"> request: <parameter_list < ' |
| 'parameter_value_struct: <name: "' + param + |
| '" ' + param_type + ': ' + value |
| + '> > parameter_key: "myParamKey">\'') |
| print cmd |
| call_result = self.acs_instance.StubbyClientCall(cmd) |
| if 'FAILED' in call_result[0]: |
| info = self.log.CreateErrorInfo( |
| 'Warning', 'ACS failed on SetParameterValues query: ' + cmd) |
| self.log.SendLines(self.test_info, info) |
| return False |
| else: |
| print call_result |
| return call_result |
| |
| def GetParameterNames(self, param='.', next_level=True): |
| """Get next level parameter names of the current queried parameter. |
| |
| The next level parameter names will be output to result file |
| Due to the large number of parameter instances, parameters in the same |
| parameter family will be grouped together into a single nbi_client |
| RPC call (getParameterNames() call, this call will feedback the access |
| state information). In order to minimize the total processing time, as |
| well as the number of RPC calls to ACS appEngine. This will greatly |
| reduce the execution time. |
| Each getParameterNames call will be created as an individual thread. |
| And multiple calls are made in parallel. |
| |
| Args: |
| param: |
| This is the parameter that is queried for its next level names |
| Note that currently GetParameterNames does not support multiple |
| parameter names in a query command. If parameter name is a dot, |
| and the next level is 'False', then device will return all |
| parameters under the queried parameter path |
| next_level: |
| if this parameter is True: |
| only the next level parameter names are returned |
| if this parameter is False: |
| all parameter in the requested parameter path will be returned |
| Returns: |
| return the nbi_client RPC call response. Response can be used to |
| parse for parameters' name, access attribute, etc. |
| """ |
| nbi_path = self.params['nbi_client_path'] |
| app_id = self.GetACSAppFromURL() |
| product_class = self.GetProductClass() |
| serial_no = self.GetSerialNumber() |
| if next_level: |
| next_level = 'true' |
| else: |
| next_level = 'false' |
| cmd = (nbi_path + ' --server /gns/project/apphosting/stubby/prod-appengine/' |
| '*/' + app_id + '/default/* --service CpeParametersService ' |
| '--method GetParameterNames --request \'' |
| 'cpe_id: <oui: "F88FCA" product_class: "' + product_class |
| + '" serial_no: "' + serial_no + '"> request: <parameter_path: "' |
| + param + '" next_level: ' + next_level + '>\'') |
| print cmd |
| call_result = self.acs_instance.StubbyClientCall(cmd) |
| if 'FAILED' in call_result[0]: |
| self.nbi_result = [] |
| info = self.log.CreateErrorInfo( |
| 'Warning', 'ACS failed on GetParameterNames query: ' + cmd) |
| self.log.SendLines(self.test_info, info) |
| return False |
| else: |
| self.nbi_result = call_result |
| print call_result |
| return call_result |
| |
| def GetParameterValues(self, param=''): |
| """Make a Get parameter value(s) query at ACS to request the device. |
| |
| This method get value(s) of the designated parameter(s) |
| Args: |
| param: this is the parameter (path) that need to get value(s) from. |
| 1. If this path ends with a dot '.', all parameters |
| under this path will be queried for values. |
| 2. If this path does not end with a '.', the parameter |
| must be a leaf parameter which has a value. |
| Returns: |
| This method returns a list of parameter values and record them to file. |
| """ |
| nbi_path = self.params['nbi_client_path'] |
| if param == '.': |
| param = '' |
| app_id = self.GetACSAppFromURL() |
| product_class = self.GetProductClass() |
| serial_no = self.GetSerialNumber() |
| cmd = (nbi_path + ' --server /gns/project/apphosting/stubby/prod-appengine/' |
| '*/' + app_id + '/default/* --service CpeParametersService ' |
| '--method GetParameterValues --request \'' |
| 'cpe_id: <oui: "F88FCA" product_class: "' + product_class |
| + '" serial_no: "' + serial_no + '"> request: <parameter_names: "' |
| +param + '">\'') |
| |
| print cmd |
| call_result = self.acs_instance.StubbyClientCall(cmd) |
| if 'FAILED' in call_result[0]: |
| self.nbi_result = [] |
| info = self.log.CreateErrorInfo( |
| 'Warning', 'ACS failed on GetParameterValues query: ' + cmd) |
| self.log.SendLines(self.test_info, info) |
| return False |
| else: |
| self.nbi_result = call_result |
| print call_result |
| return call_result |
| |
| def FindEndofRequest(self, log_list, index, pattern): |
| """Find the end of a request string from device log. |
| |
| Some times the close tag, for example: </cwmp:SetParameterValues> is |
| separated to two consecutive log lines, due to the line length limitation. |
| This method try to search the request closing tag on separate lines |
| |
| Args: |
| log_list: the device log list. |
| index: the index of the line to search for tag |
| pattern: the closing tag that need to be searched in log file |
| Returns: |
| True: if pattern find in log |
| False: if pattern not find in log |
| """ |
| if index >= len(log_list)-1: |
| if pattern in log_list[index]: |
| return True |
| else: |
| return False |
| |
| s = re.sub(r'\[\s*\d*\.\d*\]\s*cwmpd:\s*(.*)', r'\1', log_list[index + 1]) |
| s = log_list[index].strip() + s.strip() |
| |
| if pattern in s: |
| return True |
| return False |
| |
| def MatchParameterName(self, line, param_name, rpc): |
| """Match rpc request logs to a parameter name.""" |
| if rpc == 'GetParameterNames': |
| m = re.search(r'.*<ParameterPath>(.*)</ParameterPath>.*', line) |
| if m is not None and param_name in m.group(1): |
| return True |
| else: |
| return False |
| elif rpc == 'GetParameterValues': |
| m = re.search(r'.*<cwmp:GetParameterValues>.*<ParameterNames.*>(.*)' |
| '</ParameterNames></cwmp:GetParameterValues>.*', line) |
| if m is not None and param_name in m.group(1): |
| return True |
| else: |
| return False |
| elif rpc == 'SetParameterValues': |
| m = re.search(r'.*<Name>(.*)</Name><Value.*', line) |
| if m is not None and param_name in m.group(1): |
| return True |
| else: |
| return False |
| else: |
| return False |
| |
| def ParseDeviceLogForError(self, time_stamp, log_list, param, rpc): |
| """Parse the log information from device, Looking for cwmp errors. |
| |
| Args: |
| time_stamp: looking for log events that happened only after this time |
| log_list: the cwmp event information obtained from device log |
| param: the parameter name to get value from |
| rpc: the rpc function name to search in log |
| Returns: |
| return cwmp error message found in device log. |
| """ |
| |
| reply_msg = [] |
| start_of_rpc_reply = False |
| end_of_rpc_reply = False |
| start_of_rpc_request = False |
| end_of_rpc_request = False |
| param_matching = False |
| current_index = 0 |
| |
| for line in log_list: |
| # check only the getParameter query starts after the specified time: |
| if float(self.GetTimeStamp(line)) > float(time_stamp): |
| if 'CPE RECEIVED (at' in line and not start_of_rpc_request: |
| index = log_list.index(line) |
| # macthing the request query in log messages |
| for i in range(index+1, len(log_list)-1): |
| if i < len(log_list): |
| # match rpc request string |
| if '<cwmp:' + rpc + '>' in log_list[i]: |
| start_of_rpc_request = True |
| # match rpc request string end |
| if (self.FindEndofRequest(log_list, i, '</cwmp:' + rpc + '>') |
| and start_of_rpc_request): |
| if (start_of_rpc_request and end_of_rpc_request |
| and not param in log_list[i]): |
| end_of_rpc_request = True |
| # rpc request scan completed, requested parameter not matched |
| # this request is not request of interest |
| start_of_rpc_request = False |
| end_of_rpc_request = False |
| break |
| # match queried parameter in rpc request |
| if start_of_rpc_request and not end_of_rpc_request: |
| if self.MatchParameterName(log_list[i], param, rpc): |
| param_matching = True |
| current_index = i |
| break |
| |
| # search for end of rpc request command |
| if param_matching and not end_of_rpc_request: |
| for i in range(current_index, len(log_list)-1): |
| if i < len(log_list): |
| # match rpc request string end |
| if self.FindEndofRequest(log_list, i, '</cwmp:' + rpc + '>'): |
| end_of_rpc_request = True |
| current_index = i |
| break |
| |
| # search for rpc request reply message |
| if param_matching and end_of_rpc_request: |
| for i in range(current_index, len(log_list)-1): |
| if i < len(log_list): |
| if ('<soap:Fault>' in log_list[i] or '<soap:fault>' in log_list[i] |
| or '<cwmp:' + rpc + 'Response>' in log_list[i]): |
| start_of_rpc_reply = True |
| if (' </soap:Fault>' in log_list[i] |
| or ' </soap:fault>' in log_list[i] |
| or '</cwmp:' + rpc + 'Response>' in log_list[i]): |
| end_of_rpc_reply = True |
| reply_msg.append(log_list[i].strip()+'\n') |
| if start_of_rpc_reply and not end_of_rpc_reply: |
| reply_msg.append(log_list[i].strip()+'\n') |
| if start_of_rpc_reply and end_of_rpc_reply: |
| return reply_msg |
| return ['No rpc message is available in device log'] |
| |
| def GetParameterListFromDeviceLog(self, time_stamp, param=''): |
| """Parse TR069 data model parameters from Device log information. |
| |
| Args: |
| time_stamp: looking for log events that happened only after this time |
| param: the cwmp parameter to search in device log |
| Returns: |
| return parameter list if succeeded |
| return false otherwise |
| """ |
| if param == '.': |
| param = '' |
| param_list = [] |
| param_list_start = False |
| param_list_end = False |
| self.device_log_result = [] |
| |
| self.p_ssh.SendCmd('dmesg | grep cwmpd:') |
| log_list = self.p_ssh.GetCmdOutput() |
| log_list.reverse() |
| |
| for line in log_list: |
| # check only the getParameter query starts after the last known event: |
| if float(self.GetTimeStamp(line)) > float(time_stamp): |
| if '<cwmp:GetParameterValues>' in line: |
| # check the parameter query from nbi_client has been answered: |
| m = re.search(r'.*<cwmp:GetParameterValues>.*<ParameterNames.*>(.*)' |
| '</ParameterNames></cwmp:GetParameterValues>.*', line) |
| if m is not None and param in m.group(1): |
| index = log_list.index(line) |
| for i in range(index+1, index+50): |
| if '<cwmp:GetParameterValuesResponse>' in log_list[i]: |
| param_list_start = True |
| break |
| continue |
| |
| if '</cwmp:GetParameterValuesResponse>' in line: |
| if param_list_start: |
| param_list_end = True |
| break |
| |
| if param_list_start: |
| self.device_log_result.append(line.strip()) |
| if '<Name>' in line: |
| output = re.sub(r'.*(<Name>)(.*)(</Name>)', r'\2', line) |
| output = output.strip() |
| output = re.sub(r'(\.)[\d]+(\.)', r'\1{i}\2', output) |
| if param_list.count(output) < 1: |
| param_list.append(output) |
| |
| if (not param_list_start) or (not param_list_end): |
| error_msg = self.ParseDeviceLogForError( |
| time_stamp, log_list, param, 'GetParameterValues') |
| if 'fault' in error_msg[0] or 'Fault' in error_msg[0]: |
| # error occurred in rpc request |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Exit! Error while get values: ' + ''.join(error_msg)) |
| self.log.SendLines(self.test_info, info) |
| os.sys.exit(1) |
| else: |
| # no error in request, request still in progress |
| return False |
| return param_list |
| |
| def CreateElement(self, param_name, param_type_device, |
| param_type_nbi, param_value): |
| """Create a new parameter node that contains parameter name and value. |
| |
| Currently try to set the same value as being read from the device. |
| Parameter type may have different representations on different platforms. |
| E.g.: type 'long_value' on nbi_client is 'unsignedInt' on bruno. |
| type 'string_value' on nbi_client is 'string' on bruno |
| Args: |
| param_name: the name of the parameter (Path) |
| param_type_device: the type of the parameter represented by device format |
| param_type_nbi: the type of parameter, represented by nbi_client format |
| param_value: the value of the parameter |
| Returns: |
| return the created element node |
| """ |
| e = ET.Element('parameter_value_struct') |
| name = ET.Element('name') |
| name.text = param_name |
| e.append(name) |
| device_type = ET.Element('device_type') |
| device_type.text = param_type_device |
| e.append(device_type) |
| nbi_type = ET.Element('nbi_type') |
| if not param_type_nbi: |
| if param_type_device == 'unsignedInt': |
| param_type_nbi = 'long_value' |
| elif param_type_device == 'boolean': |
| param_type_nbi = 'boolean_value' |
| elif param_type_device == 'string': |
| param_type_nbi = 'string_value' |
| else: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Exit! Unknown (new) Parameter types found from device:' |
| ': ' + param_type_device + '. Define as string_value') |
| self.log.SendLine(self.test_info, info) |
| param_type_nbi = 'string_value' |
| nbi_type.text = param_type_nbi |
| e.append(nbi_type) |
| value = ET.Element('value') |
| value.text = param_value |
| e.append(value) |
| test_value = ET.Element('test_value') |
| test_value.text = param_value |
| e.append(test_value) |
| access_state = ET.Element('writable') |
| access_state.text = 'true' |
| e.append(access_state) |
| return e |
| |
| def SortElementTree(self, tree): |
| """Sort the parameters in element tree, in alphabetical order of names. |
| |
| Args: |
| tree: the ElementTree data structure that holds the parameter list |
| Returns: |
| return the sorted tree data structure |
| """ |
| data = [] |
| container = tree.findall('parameter_value_struct') |
| for node in container: |
| key = node.findtext('name') |
| data.append((key, node)) |
| |
| root = ET.Element(tree.getroot().tag, tree.getroot().attrib) |
| root.text = tree.getroot().text |
| data.sort() |
| for node in data: |
| root.append(node[1]) |
| return ET.ElementTree(root) |
| |
| def ParseToElementTree(self): |
| """Parse the parameter list to Element Tree structure.""" |
| root = ET.Element('Bruno_Automation_Test', name='Set_Parameter_Value_Test', |
| testCases=self.test_info['testID']) |
| root.text = 'This structure stores all parameters and values for this test' |
| |
| if self.nbi_result: |
| for line in self.nbi_result: |
| if 'name:' in line: |
| param_name = re.sub('.*\"(.*)\".*', r'\1', line).strip() |
| index = self.nbi_result.index(line) |
| param_type = self.nbi_result[index+1].split(':')[0].strip() |
| param_value = self.nbi_result[index+1].split(':')[1].strip() |
| root.append(self.CreateElement(param_name, None, |
| param_type, param_value)) |
| elif self.device_log_result: |
| #self.device_log_result.reverse() |
| for line in self.device_log_result: |
| if '<Name>' in line: |
| param_name = re.sub(r'.*(<Name>)(.*)(</Name>)', r'\2', line).strip() |
| |
| index = self.device_log_result.index(line) + 1 |
| while not 'Value' in self.device_log_result[index]: |
| index += 1 |
| if index >= len(self.device_log_result) or '<Name>' in line: |
| info = self.log.createErrorInfo( |
| 'Critical', 'Mis-matching name-value pair in log file!') |
| self.log.sendLine(self.test_info, info) |
| return False |
| |
| line = self.device_log_result[index] |
| # In case value string spans multiple lines: |
| while '</Value>' not in line: |
| index +=1 |
| if index >= len(self.device_log_result): |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Exit! Cannot find matching </Value> tag in log!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| new_line = self.device_log_result[index] |
| # if <Name> or EOF reached before a matching </Value> is found |
| if '<Name>' in new_line: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Exit! Cannot find matching </Value> tag in log!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| new_line = re.sub('\[\s*\d*\.\d{3}\]\s*cwmpd:\s*(.*)', |
| r'\1', new_line).strip() |
| line += new_line.strip() |
| |
| param_type = re.search('.*type\s*=\s*\"[a-zA-Z]*:([a-zA-Z]*)\">.*', |
| line, re.DOTALL).group(1) |
| param_value = re.search('.*<Value\s*[a-zA-Z]*:type="[a-zA-Z]*:' |
| '[a-zA-Z]*">(.*)</Value>', |
| line, re.DOTALL).group(1) |
| root.append(self.CreateElement(param_name, param_type, |
| None, param_value)) |
| else: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Failed to get parameter list from device!') |
| self.log.SendLine(self.test_info, info) |
| return False |
| |
| tree = ET.ElementTree(root) |
| tree = self.SortElementTree(tree) |
| return tree |
| |
| def ParametersFromXMLFile(self, file_path=None): |
| """Read parameter data from an XML format file. |
| |
| Args: |
| file_path: file path to the xml file |
| Returns: |
| return the parameter list stored in the file, in the format of ElementTree |
| """ |
| f = None |
| try: |
| if not file_path: |
| f = open(self.params['para_tree_file'], 'r') |
| else: |
| f = open(file_path, 'r') |
| tree = ET.parse(f) |
| except IOError, inst: |
| err_string = ('Exit! Unexpected error opening ' |
| + str(f.name) + '. ' + str(inst)) |
| info = self.log.CreateErrorInfo('Critical', err_string) |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| return tree |
| |
| def ParametersToXMLFile(self, tree=None, file_path=None): |
| """Output the available parameter instance tree to XML format file. |
| |
| Parameter instance results will be written to XML formated file, |
| with test value that has been set for the parameter(s) |
| For those parameters that failed to set value, they are recorded in |
| the result file. |
| |
| Args: |
| tree: the parameters structure to parse to file |
| file_path: path to the output file |
| """ |
| if not tree: |
| tree = self.para_tree |
| if not file_path: |
| fout = open(self.params['para_tree_file'], 'w') |
| else: |
| fout = open(file_path, 'w') |
| |
| try: |
| tree.write(fout) |
| except IOError, inst: |
| err_string = ('Exit! Unexpected error writing to file ' |
| + fout.name() + str(inst)) |
| info = self.log.CreateErrorInfo('Critical', err_string) |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| fout.close() |
| |
| def GetRequirementFromFile(self): |
| """Get TR69 data model parameters from requirement file. |
| |
| The format of the requirement file is as follows: |
| 1. Each line represents a data model parameter |
| 2. If a parameter has indexed nodes, index numbers are replaced by {i} |
| ###### Begin of File ###### |
| Device.Bridging. |
| Device.Bridging.Bridge.{i}. |
| Device.Bridging.Bridge.{i}.Port.{i}. |
| Device.Bridging.Bridge.{i}.Port.{i}.Stats. |
| ... ... ... |
| Device.DeviceInfo.Manufacturer |
| Device.DeviceInfo.ManufacturerOUI |
| X_GOOGLE-COM_GVSB.EpgSecondary |
| X_GOOGLE-COM_GVSB.GvsbChannelLineup |
| X_GOOGLE-COM_GVSB.GvsbKick |
| X_GOOGLE-COM_GVSB.GvsbServer |
| ###### End of File ###### |
| Returns: |
| return the requirement parameters in a list |
| """ |
| req_list = [] |
| for line in self.req_file: |
| line = line.strip() |
| if req_list.count(line) < 1: |
| req_list.append(line) |
| return req_list |
| |
| def GetTimeStamp(self, line=None): |
| """Extract the time information of most current event from Device log. |
| |
| i.e., the time that the last event happens since reboot (in seconds) |
| Args: |
| line: a line of device log which contains event time information. |
| If it has value of 'None', then retrieve the last a few lines from |
| device log to extract time information. |
| Returns: |
| return the time stamp string if succeed, return -1 otherwise. |
| """ |
| if line is None: |
| # get time stamp from Device log file |
| self.p_ssh.SendCmd('dmesg | grep cwmpd:') |
| # search the last 30 lines in Device log for time information |
| latest_log_entry = 30 |
| log_list = self.p_ssh.GetCmdOutput(latest_log_entry) |
| |
| for line in log_list: |
| m = re.match('\\[[\s]*(\d*\\.\d{3})\\]', line) |
| if m is not None: |
| # Match, return the time stamp |
| return m.group(1) |
| else: |
| # get time stamp from a Device log line |
| m = re.match('\\[[\s]*(\d*\\.\d{3})\\]', line) |
| if m is not None: |
| # Match, return the time stamp |
| return m.group(1) |
| # No time stamp found |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Cannot get time information from Device log!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| |
| def VerifyAllReqParameters(self): |
| """Check available parameters and compare to requirement list. |
| |
| This method checks all available parameters on current image, and |
| compare the list with requirement list. It will find out: |
| 1. the parameters that are missing from the requirement list. |
| 2. the newly added parameters that are not in the requirement list. |
| Both lists are written to the result file. |
| |
| This method also output all parameter instances on current device to |
| a xml formated file, together with their types and values |
| """ |
| # get parameter name lists, compare with requirement list |
| info = self.log.CreateProgressInfo( |
| '30%', 'Get parameters from nbi_client ...') |
| self.log.SendLine(self.test_info, info) |
| time_stamp = self.GetTimeStamp() |
| chk_list = self.acs_instance.ParseParameterNames(self.GetParameterValues(), |
| True) |
| |
| if not chk_list: |
| info = self.log.CreateProgressInfo( |
| '35%', 'ACS failed to reply, get parameters from device log ...') |
| self.log.SendLine(self.test_info, info) |
| # nbi client may fail because of timeout |
| # if that is the case, then get parameter list from device log |
| count = 0 |
| while not chk_list: |
| time.sleep(self.__delay) |
| chk_list = self.GetParameterListFromDeviceLog(time_stamp) |
| count += 1 |
| if count*self.__delay >= self.__long_delay: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Can not retrieve complete log info from device') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| |
| if not chk_list: |
| info = self.log.CreateProgressInfo( |
| '40%', 'GetParameterValue query in progress, retry after ' |
| + str(self.__delay) + ' seconds.') |
| self.log.SendLine(self.test_info, info) |
| chk_list.sort() |
| |
| info = self.log.CreateProgressInfo( |
| '55%', 'GetParameterValue query completed, verify parameter list ' |
| 'with parameter requirement document.') |
| self.log.SendLine(self.test_info, info) |
| req_list = self.GetRequirementFromFile() |
| (mis_list, new_list) = self.CheckRequirement(chk_list, req_list) |
| mis_list.sort() |
| new_list.sort() |
| self.log.SendTestData(['############# Missing Parameters #############']) |
| self.log.SendTestData(mis_list) |
| |
| self.log.SendTestData(['############ Newly Added Parameters ############']) |
| self.log.SendTestData(new_list) |
| |
| info = self.log.CreateProgressInfo( |
| '75%', 'Verify current parameter list with requirement completed. ' |
| 'Check result file:' + self.log.params['f_error'] + ' for details.') |
| self.log.SendLine(self.test_info, info) |
| |
| # write parameter instance to file |
| self.para_tree = self.ParseToElementTree() |
| self.ParametersToXMLFile() |
| info = self.log.CreateProgressInfo( |
| '100%', 'Update parameter file: ' + self.params['para_tree_file'] |
| + ' completed.') |
| self.log.SendLine(self.test_info, info) |
| info = self.log.CreateResultInfo( |
| 'Pass', 'Verification completed. ' |
| 'Check result file: ' + self.log.params['f_result'] + ' for missing ' |
| 'parameters and new parameters.') |
| self.log.SendLine(self.test_info, info) |
| |
| def GetParameterAccessStateFromDeviceLog(self, time_stamp=None, param='.'): |
| """Get parameter access state from device logs. |
| |
| Args: |
| time_stamp: search events only after this time in log file |
| param: the parameter to search in device log |
| Returns: |
| return dictionary: parameter names as key, and writable status as value |
| """ |
| self.p_ssh.SendCmd('dmesg | grep cwmpd:') |
| log_list = self.p_ssh.GetCmdOutput() |
| log_list.reverse() |
| |
| param_state_dict = {} |
| param_list_start = False |
| param_list_end = False |
| |
| for line in log_list: |
| # check only the getParameter query starts after the last known event: |
| if float(self.GetTimeStamp(line)) > float(time_stamp): |
| if '<cwmp:GetParameterNames>' in line: |
| # check the parameter query from nbi_client has been answered: |
| m = re.search( |
| r'.*<cwmp:GetParameterNames>.*<ParameterPath>(.*)' |
| '</ParameterPath>.*<NextLevel>false</NextLevel>.*' |
| '</cwmp:GetParameterNames>.*', line) |
| if m and param in line: |
| index = log_list.index(line) |
| for i in range(index+1, index+30): |
| if i < len(log_list): |
| if '<cwmp:GetParameterNamesResponse>' in log_list[i]: |
| param_list_start = True |
| break |
| continue |
| |
| if '</cwmp:GetParameterNamesResponse>' in line: |
| if param_list_start: |
| param_list_end = True |
| break |
| |
| if param_list_start: |
| if '<Name>' in line: |
| name = re.search(r'.*<Name>(.*)</Name>', line).group(1).strip() |
| index = log_list.index(line)+1 |
| if '<Writable>' in log_list[index]: |
| access = re.sub(r'.*<Writable>(.*)</Writable>', |
| r'\1', log_list[index]).strip() |
| if access == '1': |
| access = 'true' |
| else: |
| access = 'false' |
| param_state_dict.update({name: access}) |
| |
| if (not param_list_start) or (not param_list_end): |
| error_msg = self.ParseDeviceLogForError( |
| time_stamp, log_list, param, 'GetParameterNames') |
| if 'fault' in error_msg[0] or 'Fault' in error_msg[0]: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Exit! Error while get values: ' + ''.join(error_msg)) |
| self.log.SendLines(self.test_info, info) |
| os.sys.exit(1) |
| else: |
| return False |
| return param_state_dict |
| |
| def UpdateParameterAccessState(self, tree, param_name='.'): |
| """Update parameters' access state (read-only, read-write status). |
| |
| TR69 parameters could be either in a read-only state, or a writable |
| state. The setParameterValues() test only verifies those parameters |
| in the 'writable' state. Try to set them with new values. |
| This method is called to determine the state of each supported parameter |
| |
| Args: |
| tree: |
| the ElementTree data structure that holds the parameter attribute |
| list in XML format. This tree needs to be sorted before passed to |
| this function, according to the alphabetical order of parameter |
| names. |
| param_name: |
| the parameter name to update the access state |
| Returns: |
| returns this tree with updated parameter access state information |
| """ |
| time_stamp = self.GetTimeStamp() |
| cmd_list = self.GetParameterNames(param_name, False) |
| if not self.acs_instance.CheckCmdCall(cmd_list): |
| info = self.log.CreateProgressInfo( |
| '---', 'GetParameterNames() query failed on ACS. Analyzing device' |
| ' logs...') |
| self.log.SendLine(self.test_info, info) |
| |
| param_dict = self.GetParameterAccessStateFromDeviceLog(time_stamp, |
| param_name) |
| while not param_dict: |
| info = self.log.CreateProgressInfo( |
| '---', 'GetParameterNames() analyze in progress, wait for ' |
| + str(self.__short_delay) + ' seconds.') |
| self.log.SendLine(self.test_info, info) |
| time.sleep(self.__short_delay) |
| param_dict = self.GetParameterAccessStateFromDeviceLog(time_stamp, |
| param_name) |
| else: |
| param_dict = self.acs_instance.ParseParameterAccessStates(cmd_list) |
| for key in param_dict.keys(): |
| if param_name.endswith('.') and param_name != '.': |
| param_dict[param_name + key] = param_dict.pop(key) |
| else: |
| param_dict[param_name + '.' + key] = param_dict.pop(key) |
| |
| container = tree.findall('parameter_value_struct') |
| |
| for node in container: |
| name = node.findtext('name') |
| if name in param_dict.keys(): |
| e = node.find('writable') |
| if not e: |
| # Create this tag: |
| e = ET.Element('writable') |
| e.text = param_dict[name] |
| node.append(e) |
| else: |
| e.text = param_dict[name] |
| else: |
| continue |
| return tree |
| |
| def MarkParametersToSet(self, param_list, param): |
| """In the sorted tree, mark all parameters before param to read-only.""" |
| if not param: |
| return param_list |
| |
| ls = list(param_list) |
| for name in param_list: |
| if name != param: |
| ls.remove(name) |
| else: |
| break |
| return ls |
| |
| def VerifySetAllParameterValues(self): |
| """This verifies the setParameterValues() on all supported parameters. |
| |
| This method tests the 'set' capability on all parameters. According |
| to the data type, the value will be set as 'string', 'long_int', 'boolean', |
| etc. Note the current test only verify the 'set' action is supported by |
| each parameter. Therefore, it will try to set the same default value on each |
| parameter (it does not change the default value of each parameter). |
| |
| This test assumes that the parameter list has already been retrieved from |
| device by 'GetparameterValues', and has been written to the parameter XML |
| file. |
| """ |
| tree = self.ParametersFromXMLFile() |
| #tree = self.UpdateParameterAccessState(tree) |
| tree = self.SortElementTree(tree) |
| param_list = [] |
| # Currently Assume all parameters are read-write |
| container = tree.findall('parameter_value_struct') |
| |
| for node in container: |
| #name = node.findtext('name') |
| #test_value = node.findtext('test_value') |
| dev_type = node.findtext('device_type') |
| nbi_type = None |
| if dev_type == 'unsignedInt': |
| nbi_type = 'long_value' |
| elif dev_type == 'boolean': |
| nbi_type = 'boolean_value' |
| elif dev_type == 'string': |
| nbi_type = 'string_value' |
| else: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Exit! Unknown (new) Parameter types found from device:' |
| ': ' + dev_type) |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| node.find('nbi_type').text = nbi_type |
| param_list.append(node.findtext('name')) |
| e = node.find('writable') |
| if not e: |
| access_state = ET.Element('writable') |
| access_state.text = 'true' |
| node.append(access_state) |
| param_list.sort() |
| param_list = self.MarkParametersToSet( |
| param_list, self.params['from_which_param_to_set']) |
| self.BatchSetParametersValues(tree, param_list) |
| |
| def VerifyNTPTimeZoneConfig(self): |
| """Verify the Time zone configuration is received and processed by Device. |
| |
| Check the log of device, to find TR69 parameter: |
| InternetGatewayDevice.Time. |
| is pushed down to device. Also verify the device correctly updated local |
| time information. |
| """ |
| ## Blocked by bug: http://b/issue?id=6804551 |
| ## TODO(lmeng): add set values and verify set values succeeded |
| self.p_ssh.SendCmd('dmesg | grep cwmpd:') |
| log_list = self.p_ssh.GetCmdOutput() |
| time_stamp = self.GetTimeStamp() |
| log_list = self.ParseDeviceLogForError(time_stamp, log_list, |
| 'InternetGatewayDevice.Time', |
| 'SetParameterValues') |
| if 'No rpc message is available in device log' in log_list[0]: |
| info = self.log.CreateResultInfo( |
| '---', 'Failed. Can not find TR69 Time zone information in device log') |
| self.log.SendLine(self.test_info, info) |
| elif 'fault' in log_list[0]: |
| info = self.log.CreateResultInfo( |
| '---', 'Failed to apply Time zone configuration on device.') |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateResultInfo( |
| '---', 'Succeed to setup Time zone on deivce.') |
| self.log.SendLine(self.test_info, info) |
| |
| def VerifyManagementServerURL(self): |
| """Verify set and get Management Server URL from device.""" |
| # This test is blocked by bug: http://b/issue?id=6813492 |
| mngmt_url_param = 'InternetGatewayDevice.ManagementServer.URL' |
| url_from_dev_param = self.acs_instance.ParseManagementURL( |
| self.GetParameterValues(mngmt_url_param)) |
| info = self.log.CreateProgressInfo( |
| '15%', 'Get management url from TR069 parameter: ' |
| + mngmt_url_param + '. Value: ' + url_from_dev_param) |
| self.log.SendLine(self.test_info, info) |
| if not url_from_dev_param: |
| info = self.log.CreateProgressInfo( |
| '30%', 'Can not found Value for: ' |
| 'InternetGatewayDevice.ManagementServer.URL' |
| '. Verify to set default URL: ' + self.params['default_url']) |
| self.log.SendLine(self.test_info, info) |
| url_from_dev_param = self.params['default_url'] |
| |
| info = self.log.CreateProgressInfo( |
| '50%', 'Set parameter value for: ' |
| 'InternetGatewayDevice.ManagementServer.URL' |
| '. Value: ' + url_from_dev_param) |
| self.log.SendLine(self.test_info, info) |
| result = self.SetParameterValues(mngmt_url_param, 'string', |
| url_from_dev_param) |
| if not result: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Failed to set parameter: ' |
| 'InternetGatewayDevice.ManagementServer.URL') |
| self.log.SendLine(self.test_info, info) |
| info = self.log.CreateResultInfo( |
| 'Failed', 'Failed to set parameter: ' |
| 'InternetGatewayDevice.ManagementServer.URL') |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateProgressInfo( |
| '100%', 'Succeeded to set parameter: ' |
| 'InternetGatewayDevice.ManagementServer.URL') |
| self.log.SendLine(self.test_info, info) |
| info = self.log.CreateResultInfo( |
| 'Pass', 'Succeeded to set parameter: ' |
| 'InternetGatewayDevice.ManagementServer.URL') |
| self.log.SendLine(self.test_info, info) |
| |
| def VerifySetParameterValuesRPC(self): |
| """Test case: 11722375. To verify SetParameterValues RPC. |
| |
| To verify SetParametersRPC, this methond also try to set all parameters |
| which have 'write-able' status, with test value. |
| """ |
| # update parameter access state: |
| info = self.log.CreateProgressInfo( |
| '20%', 'Read Parameters read/write status from file.') |
| self.log.SendLine(self.test_info, info) |
| tree = self.ParametersFromXMLFile() |
| info = self.log.CreateProgressInfo( |
| '50%', 'Update parameters read/write status from Device.') |
| self.log.SendLine(self.test_info, info) |
| tree = self.UpdateParameterAccessState(tree, self.params['param_path']) |
| self.ParametersToXMLFile(tree) |
| # set parameter values: |
| self.VerifySetAllParameterValues() |
| info = self.log.CreateProgressInfo( |
| '100%', 'Verify: Set values for all parameters completed.') |
| self.log.SendLine(self.test_info, info) |
| info = self.log.CreateResultInfo( |
| 'Pass', 'Verification: set parameter ' |
| 'values completed. ' |
| 'Check result file: ' + self.log.params['f_result'] + ' for details.') |
| self.log.SendLine(self.test_info, info) |
| |
| def VerifyMonitorTemperature(self): |
| """Test the temperature monitor on device. |
| |
| This feature is not available for first launch. |
| """ |
| info = self.log.CreateResultInfo( |
| 'Block', 'This feature is not availalbe ' |
| 'on the first launch images.') |
| self.log.SendLine(self.test_info, info) |
| |
| def VerifyAutoChannelEnable(self): |
| """Test AutoChannelEnable on device. |
| |
| This feature is not available for first launch. |
| """ |
| info = self.log.CreateResultInfo( |
| 'Block', 'This feature is not availalbe ' |
| 'on the first launch images.') |
| self.log.SendLine(self.test_info, info) |
| |
| def SetServiceConfigs(self, param_dict): |
| """Set service parameter profile on ACS. |
| |
| This method sets the service configurations on ACS, and returns the |
| SetServiceConfigs result. |
| Args: |
| param_dict: |
| it is in this format in config file: |
| "wifi_params":{ |
| "Google.Wifi.SSID":{"string_value":"Home"}, |
| "Google.Wifi.Home.SSID_2GHz":{"string_value":"Home"}, |
| "Google.Wifi.Enable":{"boolean_value":"true"}, |
| "Google.Wifi.BeaconType":{"string_value": "Basic"}, |
| "Google.Wifi.BasicEncryptionModes":{"string_value":"WEPEncryption"} |
| Returns: |
| True: if set succeeded |
| False: otherwise |
| """ |
| app_id = self.GetACSAppFromURL() |
| nbi_path = self.params['nbi_client_path'] |
| account_id = self.params['account_id'] |
| device_label = self.params['device_label'] |
| parameter_value_struct = '' |
| |
| # parameter_value_struct: <name: "Google.Wifi.BasicEncryptionModes" |
| # string_value: "WEPEncryption"> |
| for key in param_dict.iterkeys(): |
| name = key |
| param_type = param_dict[key].keys()[0] |
| value = None |
| if 'string' in param_type: |
| value = '"' + param_dict[key].values()[0] + '"' |
| else: |
| value = param_dict[key].values()[0] |
| parameter_value_struct += ( |
| ' parameter_value_struct: <name: "' |
| + name + '" ' + param_type +': ' + value + '> ') |
| |
| parameter_value_struct = None |
| cmd = (nbi_path |
| + ' --server /gns/project/apphosting/stubby/prod-appengine/' |
| '*/' + app_id + '/default/* --service AccountService ' |
| '--method SetServiceConfigs --request \'' |
| 'account_device_id: <account_id: "' + account_id |
| + '" device_label: "' + device_label + '"> parameter_list: < ' |
| + parameter_value_struct + ' > async_apply_now: false\'') |
| print cmd |
| call_result = self.acs_instance.StubbyClientCall(cmd) |
| if 'FAILED' in call_result[0]: |
| info = self.log.CreateErrorInfo( |
| 'Warning', 'ACS failed on SetServiceConfigs query: ' + cmd) |
| self.log.SendLines(self.test_info, info) |
| return False |
| else: |
| print call_result |
| return call_result |
| |
| def VerifyWiFiConfiguration(self): |
| """Test case: 14165034, WiFi configuration verification. |
| |
| This test case will set WiFi parameters on ACS, and verify |
| that the service profile is correctly pushed to device. |
| Thin Bruno is configured with correct WiFi network. |
| |
| The WiFi parameters and values in specified in config file |
| in JSON format, e.g.: |
| "Google.Wifi.SSID":{"string_value":"Home"}, |
| "Google.Wifi.Home.SSID_2GHz":{"string_value":"Home"}, |
| "Google.Wifi.Enable":{"boolean_value":"true"}, |
| "Google.Wifi.BeaconType":{string_value: "Basic"}, |
| "Google.Wifi.BasicEncryptionModes":{"string_value":"WEPEncryption"} |
| |
| Returns: |
| return False if test failed |
| """ |
| info = self.log.CreateProgressInfo( |
| '10%', 'Setup SetServiceConfigs on ACS...') |
| self.log.SendLine(self.test_info, info) |
| if not self.SetServiceConfigs(self.params['wifi_params']): |
| return False |
| else: |
| info = self.log.CreateProgressInfo( |
| '20%', 'Setup SetServiceConfigs on ACS succeed.') |
| self.log.SendLine(self.test_info, info) |
| |
| # Verify if the service is pushed to device |
| # TODO(lmeng): what parameters to set and to verify |
| |
| #################### for Data Model Test Case #################### |
| |
| def Run(self): |
| """Run the test case.""" |
| ####### Add your code here -- BEGIN ####### |
| print 'Test Started...' |
| self.Init() |
| self.p_ssh.SshRetry(5, 15) |
| info = self.log.CreateProgressInfo( |
| '5%', 'SSH session to device successfully established!') |
| self.log.SendLine(self.test_info, info) |
| |
| #check device profile/image is as erquired management url from ACS |
| self.TestEnvCheck() |
| info = self.log.CreateProgressInfo( |
| '10%', 'Check device test setup completed!') |
| self.log.SendLine(self.test_info, info) |
| |
| if (self.test_info['testID'] == '14187024' |
| or self.test_info['testID'] == '14165033' |
| or self.test_info['testID'] == '14315008' |
| or self.test_info['testID'] == '14165035' |
| or self.test_info['testID'] == '14314007' |
| or self.test_info['testID'] == '11865133' |
| or self.test_info['testID'] == '14335001' |
| or self.test_info['testID'] == '14266498' |
| or self.test_info['testID'] == '14319149' |
| or self.test_info['testID'] == '14314008'): |
| self.VerifyAllReqParameters() |
| elif self.test_info['testID'] == '11722375': |
| self.VerifySetParameterValuesRPC() |
| elif self.test_info['testID'] == '14315009': |
| self.VerifyManagementServerURL() |
| elif self.test_info['testID'] == '14314009': |
| self.VerifyMonitorTemperature() |
| elif self.test_info['testID'] == '14165038': |
| self.VerifyAutoChannelEnable() |
| elif self.test_info['testID'] == '15643845': |
| self.VerifyNTPTimeZoneConfig() |
| elif self.test_info['testID'] == '14165034': |
| self.VerifyWiFiConfiguration() |
| else: |
| return |
| print 'Test Completed...' |
| ####### Add your code here -- END ####### |
| |
| def Destructor(self): |
| self.p_ssh.Close() |
| |
| if __name__ == '__main__': |
| # Test cases defined in this file: |
| test_defined = ['14187024', '14165033', '14315008', '14165035', '14314007', |
| '11865133', '14335001', '14266498', '14319149', '14314008', |
| '11722375', '14315009', '14314009', '14165038', '15643845', |
| '14165034'] |
| # To execute test cases that defined in config file: |
| test = DataModelTest(testID='14165034') |
| test_to_execute = testCase.TestCase.FindTestCaseToExecute('config.cfg') |
| for test_id in test_to_execute: |
| if test_id in test_defined: |
| test = DataModelTest(testID=test_id) |