| #!/usr/bin/python |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| |
| """This test performs TR069 related DataModel verification. |
| |
| This class extends the TestCase class, it does the following: |
| 1. Configure/Verify Bruno with correct TR69 parameters: |
| - ACS URL |
| - Image Version |
| 2. Verify Bruno has connection to ACS (IPv6/IPv4) |
| 3. Set/Get TR69 data model parameters: |
| - getParameterNames() |
| - getParameterValues() |
| 4. Data mode: |
| - Data Models for GVSB |
| - Data Models for TR098 |
| - Data Models for MoCA |
| - Data Models for TR140 |
| - Data Models for TR181 |
| """ |
| |
| __author__ = 'Lehan Meng (lmeng@google.com)' |
| |
| |
| import os |
| import re |
| import string |
| import time |
| import xml.etree.ElementTree as ET |
| |
| import acs |
| import device |
| import ssh |
| import testCase |
| |
| |
| class DataModelTest(testCase.TestCase): |
| """Class for TR69 DataModel and parameter verification test. |
| |
| Configuration Parameters: |
| TBD |
| testID: 14187024 |
| title =DataModel |
| req_file = 14187024-requirement.req |
| """ |
| |
| def Init(self): |
| """Initialte the test case.""" |
| info = self.log.CreateResultInfo( |
| '---', 'Data model verification test started...') |
| self.log.SendLine(self.test_info, info) |
| |
| self.bruno = device.Bruno(user=self.params['user'], |
| addr=self.params['addr'], |
| pwd=self.params['pwd'], |
| cmd_prompt=self.params['bruno_prompt']) |
| self.p_ssh = ssh.SSH(user=self.params['user'], |
| addr=self.params['addr'], |
| pwd=self.params['pwd'], |
| bruno_prompt=self.params['bruno_prompt'], |
| addr_ipv6=self.params['addr_ipv6'], |
| athena_user=self.params['athena_user'], |
| athena_pwd=self.params['athena_pwd'], |
| jump_server=self.params['jump_server']) |
| self.p_ssh.Setlogging(self.log) |
| self.bruno.Setlogging(self.log) |
| self.bruno.dev_ssh = self.p_ssh |
| |
| self.__short_delay = 30 #short delay: 5 seconds |
| self.__delay = 80 #medium delay: 180 seconds |
| self.__long_delay = 120 #long delay: 600 seconds |
| |
| self.req_file = open(self.params['req_file'], 'r') |
| self.req_tr069_params = [] |
| |
| def TestEnvCheck(self): |
| """Verify device configuration meets the test requirement.""" |
| acs_url = self.bruno.GetACSUrl() |
| self.AddConfigParams('acs_url', acs_url) |
| |
| current_version = self.bruno.getCurrentVersion() |
| if (not current_version |
| or current_version != self.params['expected_version']): |
| info = self.log.CreateErrorInfo( |
| 'critical', 'Not expected image version. Expected: ' |
| + self.params['expected_version'] + ', Current: ' |
| + current_version) |
| self.log.SendLine(self.test_info, info) |
| else: |
| self.AddConfigParams('current_version', current_version) |
| |
| def CheckRequirement(self, chk_list, req_list): |
| """Check parameter names with parameters in requirement file. |
| |
| This file will compare the parameters from Device and from requirement |
| File. The differences between the two will be written to the result log |
| file |
| Args: |
| chk_list: the list of parameters results returned by query |
| req_list: the list of parameters from requirement |
| Returns: |
| missing_list: the list of parameters that is missing from requirement |
| net_list: the list of parameters that are newly added but not in the |
| requirement |
| """ |
| missing_list = [] |
| new_list = [] |
| #check missing parameters from Requirement |
| for req_line in req_list: |
| verify = False |
| for chk_line in chk_list: |
| if req_line == chk_line: |
| verify = True |
| elif string.find(chk_line, req_line) == 0: |
| verify = True |
| if not verify: |
| missing_list.append(req_line) |
| |
| #check newly added parameters to Requirement |
| for line in chk_list: |
| if req_list.count(line) < 1 and new_list.count(line) < 1: |
| new_list.append(line) |
| return (missing_list, new_list) |
| |
| def SetParameterValues(self, param='', value=''): |
| """Set parameter values on Device.""" |
| nbi_path = self.params['nbi_client_path'] |
| cmd = (nbi_path + ' --server /gns/project/apphosting/stubby/prod-appengine/' |
| '*/gfiber-acs-staging/default/* --service CpeParametersService ' |
| '--method SetParameterValues --request \'' |
| 'cpe_id: <oui: "F88FCA" product_class: "GFMS100" serial_no: ' |
| '"110120906BDE"> request: <parameter_list < ' |
| 'parameter_value_struct: <name: "' |
| + param + |
| '" string_value: "' + value + '"> > parameter_key: "myParamKey">\'') |
| acs_instance = acs.ACS() |
| acs_instance.SetLogging(self.log) |
| call_result = acs_instance.StubbyClientCall(cmd) |
| if 'FAILED' in call_result[0]: |
| self.nbi_result = [] |
| else: |
| self.nbi_result = call_result |
| print call_result |
| |
| def GetParameterValues(self, param=''): |
| """Get parameter value(s) from the data model tree. |
| |
| This method get value(s) of the designated parameter(s) |
| Args: |
| param: this is the parameter (path) that need to get value(s) from. |
| 1. If this path ends with a dot '.', all parameters |
| under this path will be queried for values. |
| 2. If this path does not end with a '.', the parameter |
| must be a leaf parameter which has a value. |
| Returns: |
| This method returns a list of parameter values and record them to file. |
| """ |
| nbi_path = self.params['nbi_client_path'] |
| cmd = (nbi_path + ' --server /gns/project/apphosting/stubby/prod-appengine/' |
| '*/gfiber-acs-staging/default/* --service CpeParametersService ' |
| '--method GetParameterValues --request \'' |
| 'cpe_id: <oui: "F88FCA" product_class: "GFMS100" serial_no: ' |
| '"110120906BDE"> request: <parameter_names: "' +param + '">\'') |
| acs_instance = acs.ACS() |
| acs_instance.SetLogging(self.log) |
| call_result = acs_instance.StubbyClientCall(cmd) |
| if 'FAILED' in call_result[0]: |
| self.nbi_result = [] |
| else: |
| self.nbi_result = call_result |
| print call_result |
| return acs_instance.ParseParameterNames(call_result) |
| |
| def GetParameterNamesFromDeviceLog(self, time_stamp, param=''): |
| """Get TR069 data model parameters that are supported by current image.""" |
| param_list = [] |
| param_list_start = False |
| param_list_end = False |
| self.device_log_result = [] |
| num_of_params = 0 |
| |
| self.p_ssh.SendCmd('dmesg | grep cwmpd:') |
| log_list = self.p_ssh.GetCmdOutput() |
| log_list.reverse() |
| |
| for line in log_list: |
| # check only the getParameter query starts after the last known event: |
| if float(self.GetTimeStamp(line)) > float(time_stamp): |
| if '<cwmp:GetParameterValues>' in line: |
| # check the parameter query from nbi_client has been answered: |
| m = re.search(r'.*<cwmp:GetParameterValues>.*<ParameterNames.*/>(.*)' |
| '</ParameterNames></cwmp:GetParameterValues>.*', line) |
| if m is not None: |
| output = re.sub(r'.*<cwmp:GetParameterValues>.*<ParameterNames.*/>' |
| '(.*)</ParameterNames></cwmp:GetParameterValues>.*', |
| r'\1', line) |
| if output.strip() == param: |
| param_list_start = True |
| continue |
| |
| if '</cwmp:GetParameterValuesResponse>' in line: |
| if param_list_start: |
| param_list_end = True |
| break |
| |
| if param_list_start: |
| self.device_log_result.append(line.strip()) |
| if '<Name>' in line: |
| num_of_params += 1 |
| output = re.sub(r'.*(<Name>)(.*)(</Name>)', r'\2', line) |
| output = output.strip() |
| output = re.sub(r'(\.)[\d]+(\.)', r'\1{i}\2', output) |
| if param_list.count(output) < 1: |
| param_list.append(output) |
| |
| if (not param_list_start) or (not param_list_end): |
| return False |
| return param_list |
| |
| def CreateElement(self, param_name, param_type, param_value): |
| """Create a new parameter node that contains parameter name and value. |
| |
| Currently try to set the same value as being read from the device |
| Args: |
| param_name: the name of the parameter (Path) |
| param_type: the type of the parameter |
| param_value: the value of the parameter |
| Returns: |
| return the created element node |
| """ |
| e = ET.Element('parameter_value_struct') |
| name = ET.Element('name') |
| name.text = param_name |
| e.append(name) |
| value = ET.Element('value', type=param_type) |
| value.text = param_value |
| e.append(value) |
| test_value = ET.Element('test_value', type=param_type) |
| test_value.text = param_value |
| e.append(test_value) |
| return e |
| |
| def ParseToElementTree(self): |
| """Parse the parameter list to Element Tree structure.""" |
| root = ET.Element('Bruno_Automation_Test', name='Set_Parameter_Value_Test', |
| testCases=self.test_info['testID']) |
| root.text = 'This structure stores all parameters and values for this test' |
| |
| if self.nbi_result != []: |
| for line in self.nbi_result: |
| if 'name:' in line: |
| param_name = re.sub('.*\"(.*)\".*', r'\1', line).strip() |
| index = self.nbi_result.index(line) |
| param_type = self.nbi_result[index+1].split(':')[0].strip() |
| param_value = self.nbi_result[index+1].split(':')[1].strip() |
| root.append(self.CreateElement(param_name, param_type, param_value)) |
| elif self.device_log_result != []: |
| #self.device_log_result.reverse() |
| for line in self.device_log_result: |
| if '<Name>' in line: |
| param_name = re.sub(r'.*(<Name>)(.*)(</Name>)', r'\2', line).strip() |
| index = self.device_log_result.index(line) + 1 |
| while not 'Value' in self.device_log_result[index]: |
| index += 1 |
| if index >= len(self.device_log_result) or '<Name>' in line: |
| info = self.log.createErrorInfo( |
| 'Critical', 'Mis-matching name-value pair in log file!') |
| self.log.sendLine(self.test_info, info) |
| return False |
| |
| line = self.device_log_result[index] |
| param_type = re.sub('.*type\w*=\w*\".*:(.*)\".*', r'\1', line).strip() |
| param_value = re.sub('.*<.*type.*>(.*)</Value>', r'\1', line).strip() |
| root.append(self.CreateElement(param_name, param_type, param_value)) |
| else: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Failed to get parameter list from device!') |
| self.log.SendLine(self.test_info, info) |
| return False |
| tree = ET.ElementTree(root) |
| fout = open(self.params['para_tree_file'], 'w') |
| tree.write(fout) |
| fout.close() |
| return tree |
| |
| def VerifyParameterValues(self, param=None): |
| """Verify the designated parameter can be set with expected value. |
| |
| Test results will be written to XML formated file, with test value |
| that has been set for the parameter(s) |
| For those parameters that failed to set value, they are recorded in |
| the result file specified by self.log instance. |
| Args: |
| param: |
| 1. if parameter is given, this method will set the value of this |
| parameter |
| 2. if the parameter is not given, this method will set all parameters |
| that the device currently supports |
| """ |
| self.para_tree = self.ParseToElementTree() |
| self.SetParameterValues(param, 'none') |
| |
| def GetRequirementFromFile(self): |
| """Get TR69 data model parameters from requirement file. |
| |
| The format of the requirement file is as follows: |
| 1. Each line represents a data model parameter |
| 2. If a parameter has indexed nodes, index numbers are replaced by {i} |
| ###### Begin of File ###### |
| Device.Bridging. |
| Device.Bridging.Bridge.{i}. |
| Device.Bridging.Bridge.{i}.Port.{i}. |
| Device.Bridging.Bridge.{i}.Port.{i}.Stats. |
| ... ... ... |
| Device.DeviceInfo.Manufacturer |
| Device.DeviceInfo.ManufacturerOUI |
| X_GOOGLE-COM_GVSB.EpgSecondary |
| X_GOOGLE-COM_GVSB.GvsbChannelLineup |
| X_GOOGLE-COM_GVSB.GvsbKick |
| X_GOOGLE-COM_GVSB.GvsbServer |
| ###### End of File ###### |
| Returns: |
| return the requirement parameters in a list |
| """ |
| req_list = [] |
| for line in self.req_file: |
| line = line.strip() |
| if req_list.count(line) < 1: |
| req_list.append(line) |
| return req_list |
| |
| def GetTimeStamp(self, line=None): |
| """Extract the time information of most current event from Device log. |
| |
| i.e., the time that the last event happens since reboot (in seconds) |
| Args: |
| line: a line of device log which contains event time information. |
| If it has value of 'None', then retrieve the last a few lines from |
| device log to extract time information. |
| Returns: |
| return the time stamp string if succeed, return -1 otherwise. |
| """ |
| if line is None: |
| # get time stamp from Device log file |
| self.p_ssh.SendCmd('dmesg | grep cwmpd:') |
| # search the last 30 lines in Device log for time information |
| latest_log_entry = 30 |
| log_list = self.p_ssh.GetCmdOutput(latest_log_entry) |
| |
| for line in log_list: |
| m = re.match('\\[[\s]*(\d*\\.\d{3})\\]', line) |
| if m is not None: |
| # Match, return the time stamp |
| return m.group(1) |
| else: |
| # get time stamp from a Device log line |
| m = re.match('\\[[\s]*(\d*\\.\d{3})\\]', line) |
| if m is not None: |
| # Match, return the time stamp |
| return m.group(1) |
| # No time stamp found |
| return -1 |
| |
| #################### for Data Model Test Case #################### |
| def Run(self): |
| """Run the test case.""" |
| ####### Add your code here -- BEGIN ####### |
| print 'Test Started...' |
| self.Init() |
| self.p_ssh.SshRetry(5, 15) |
| #self.p_ssh.SshToAthena() |
| info = self.log.CreateProgressInfo( |
| '5%', 'SSH session to Athena server successfully established!') |
| self.log.SendLine(self.test_info, info) |
| |
| #self.p_ssh.SshRetryFromAthena() |
| #info = self.log.CreateProgressInfo( |
| # '15%', 'SSH session to Device successfully established!') |
| #self.log.SendLine(self.test_info, info) |
| |
| #check device profile/image is as erquired management url from ACS |
| self.TestEnvCheck() |
| info = self.log.CreateProgressInfo( |
| '25%', 'Check device test setup completed!') |
| self.log.SendLine(self.test_info, info) |
| |
| # get parameter name lists, compare with requirement |
| info = self.log.CreateProgressInfo( |
| '35%', 'Get parameters from nbi_client ...') |
| self.log.SendLine(self.test_info, info) |
| time_stamp = self.GetTimeStamp() |
| if time_stamp == -1: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Cannot get time information from Device log!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| chk_list = self.GetParameterValues() |
| |
| if not chk_list: |
| info = self.log.CreateProgressInfo( |
| '40%', 'Get parameters from device log ...') |
| self.log.SendLine(self.test_info, info) |
| # nbi client may fail because of timeout |
| # then get parameter list from device log |
| while not chk_list: |
| time.sleep(self.__delay) |
| chk_list = self.GetParameterNamesFromDeviceLog(time_stamp) |
| chk_list.sort() |
| |
| if not chk_list: |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'Cannot find parameter list requirement from log file!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(1) |
| |
| req_list = self.GetRequirementFromFile() |
| (mis_list, new_list) = self.CheckRequirement(chk_list, req_list) |
| mis_list.sort() |
| new_list.sort() |
| self.log.SendTestData(['############# Missing Parameters #############']) |
| self.log.SendTestData(mis_list) |
| |
| self.log.SendTestData(['############ Newly Added Parameters ############']) |
| self.log.SendTestData(new_list) |
| info = self.log.CreateProgressInfo( |
| '50%', 'Get parameter names completed. Check ' |
| + self.log.params['f_result'] + ' file for missing parameters.') |
| self.log.SendLine(self.test_info, info) |
| |
| # TODO(lmeng): |
| #set parameter values: |
| #self.VerifyParameterValues('X_GOOGLE-COM_GVSB.GvsbKick') |
| #info = self.log.CreateProgressInfo( |
| # '60%', 'Set parameter values completed. Check ' |
| # + self.log.params['f_result'] + ' file for missing parameters.') |
| #self.log.SendLine(self.test_info, info) |
| |
| #self.SetParameterValues('X_GOOGLE-COM_GVSB.GvsbKick', 'arp') |
| #self.GetParameterValues('X_GOOGLE-COM_GVSB.GvsbKick') |
| #self.SetParameterValues('X_GOOGLE-COM_GVSB.GvsbKick', 'none') |
| #self.GetParameterValues('X_GOOGLE-COM_GVSB.GvsbKick') |
| #self.p_ssh.ExitToAthena() |
| print 'Test Completed...' |
| ####### Add your code here -- END ####### |
| |
| def Destructor(self): |
| self.p_ssh.Close() |
| |
| if __name__ == '__main__': |
| test = DataModelTest(testID='14187024', key_word='DataModel') |