| #!/usr/bin/python |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| |
| """This File executes create/modify/delete operations on ACS datastore. |
| |
| This class is used to interact with scripts that directly modifies |
| ACS datastore configuration. These ACS related actions may require |
| google3 dependencies |
| """ |
| |
| __author__ = 'Lehan Meng (lmeng@google.com)' |
| |
| |
| import os |
| import re |
| |
| |
| class ACS(object): |
| """This class executes operations that change Device profiles on ACS.""" |
| |
| def __init__(self, expected_version=None, **kwargs): |
| """Constructor for this Class.""" |
| self.acs_info = { |
| 'url': None, |
| # expected image version ID on ACS |
| 'imageVersion': expected_version, |
| 'acs_path': None, |
| 'outfile': 'cmdOutput.txt'} |
| for s in kwargs: |
| self.acs_info[s] = kwargs[s] |
| |
| def SetLogging(self, log): |
| """Setup the logging file(s).""" |
| self.log = log |
| |
| def SetImageVersion(self, expected_version): |
| """Set the expected image version that needs to be configured at ACS. |
| |
| Args: |
| expected_version: the version string of the expected image. |
| """ |
| m = re.match('\d+', expected_version) |
| if m is None: |
| info = self.log.CreateErrorInfo( |
| 'critical', |
| 'Error! Please use software id as expected image version value') |
| self.log.SendLine(None, info) |
| else: |
| self.acs_info['imageVersion'] = expected_version |
| |
| def ParseAppIDFromACSURL(self, url='https://acs.e2e.gfsvc.com/cwmp'): |
| """Parse app_id from ACS instance host name.""" |
| pass |
| |
| def SaveConfiguration(self, profile_id, version_id, app_id, |
| host='https://acs.e2e.gfsvc.com/cwmp'): |
| """Run the ACS configuration script from Google3 location.""" |
| if not version_id: |
| v_id = self.acs_info['imageVersion'] |
| else: |
| v_id = version_id |
| |
| cur_dir = os.getcwd() |
| print '============' + cur_dir |
| # the acs_path is the google3 repository where upgrade_test.py |
| # locates. This test depends on it to modify ACS datastore. |
| os.chdir(self.acs_info['acs_path']) |
| print '============' + os.getcwd() |
| os.system( |
| 'blaze test --notest_loasd --test_arg=--imageVersion=' |
| + v_id + ' --test_arg=--profile_id=' |
| + profile_id + ' --test_arg=--app_id=' + app_id |
| + ' --test_arg=--host=' + host |
| + ' --test_strategy=local :upgrade_test ') |
| os.chdir(cur_dir) |
| print '============' + os.getcwd() |
| |
| def StubbyClientCall(self, cmd=None): |
| """This is the nbi_client stubby call to get parameter names and values.""" |
| p = os.popen(cmd, 'r') |
| log_list = [] |
| while 1: |
| line = p.readline() |
| if not line: break |
| log_list.append(line.strip()) |
| return log_list |
| |
| def ParseParameterAccessStates(self, cmd_list): |
| """Find out if a parameter is 'writable' or 'read-write'. |
| |
| This method process the nbi_client getParameterNames() call result |
| to find out the access state of the parameters. |
| Args: |
| cmd_list: the getParameterNames() call returned result list |
| Returns: |
| a dictionary: parameter name as index, and its access state |
| as value. |
| """ |
| if ('FAILED' in cmd_list[0]) or (not 'SUCCESS' in cmd_list[0]): |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'nbi_clienet call failed. ' + str(cmd_list[0])) |
| self.log.SendLine(None, info) |
| return False |
| |
| para_dict = {} |
| for line in cmd_list: |
| if 'name:' in line: |
| name = re.sub('.*name:\s*\"(.*)\".*', r'\1', line) |
| index = cmd_list.index(line)+1 |
| access = re.sub('.*writable:\s*([a-zA-Z]+)', r'\1', cmd_list[index]) |
| para_dict.update({name: access}) |
| return para_dict |
| |
| def CheckCmdCall(self, cmd_list): |
| """Check if the RPC call is a success or failure.""" |
| if ('FAILED' in cmd_list[0]) or (not 'SUCCESS' in cmd_list[0]): |
| info = self.log.CreateErrorInfo( |
| 'Critical', 'nbi_client call failed. ' + str(cmd_list[0])) |
| self.log.SendLine(None, info) |
| return False |
| else: |
| return True |
| |
| def ParseManagementURL(self, cmd_list): |
| """Parse Management URL from GetParameterValues query reply message. |
| |
| Reply message is in the following format: |
| ######################################################### |
| SUCCESS! Response: parameter_value_list { |
| parameter_value_struct { |
| name: "InternetGatewayDevice.ManagementServer.URL" |
| string_value: "https://acs.e2e.gfsvc.com/cwmp" |
| } |
| } |
| ######################################################### |
| |
| Args: |
| cmd_list: the ACS url information obtained from device log or file |
| Returns: |
| returns the management url found, or return false if url not found |
| """ |
| if not cmd_list: |
| return False |
| for line in cmd_list: |
| if ('name:' in line and |
| 'InternetGatewayDevice.ManagementServer.URL' in line): |
| s = cmd_list[cmd_list.index(line)+1] |
| m = re.search('http[s]?://(\w*\.)+com/cwmp', s) |
| if not m: |
| info = self.log.CreateErrorInfo( |
| 'Warning', 'No management server URL found in GetParameterValues ' |
| 'query reply.') |
| self.log.SendLine(None, info) |
| return False |
| else: |
| return m.group(0) |
| |
| def ParseParameterNames(self, cmd_list, replace_index=False): |
| """Parse the get values RPC call response, retrieve parameter name list. |
| |
| Args: |
| cmd_list: the command reply message from nbi_client |
| replace_index: if True, the index digits in parameter names will be |
| replaced with '.{i}.' |
| Returns: |
| return the parsed parameter name list |
| """ |
| if not self.CheckCmdCall(cmd_list): |
| return False |
| |
| chk_list = [] |
| for line in cmd_list: |
| if 'name:' in line: |
| output = re.sub('.*\"(.*)\".*', r'\1', line) |
| if replace_index: |
| output = re.sub(r'(\.)[\d]+(\.)', r'\1{i}\2', output) |
| if chk_list.count(output) < 1: |
| chk_list.append(output) |
| return chk_list |
| |
| |
| if __name__ == '__main__': |
| pass |