blob: 8725e029eb7936053f62cd0fab5494e3ead526c4 [file] [log] [blame]
#!/usr/bin/python
# Copyright 2012 Google Inc. All Rights Reserved.
"""This File executes create/modify/delete operations on ACS datastore.
This module is used to interact with scripts that directly modify the
ACS datastore configuration. These ACS-related actions may require
google3 dependencies.
"""
__author__ = 'Lehan Meng (lmeng@google.com)'
import os
import re
class ACS(object):
  """Executes operations that change Device profiles on ACS.

  The instance keeps its settings in the ``acs_info`` dictionary:
    url: ACS URL (None unless supplied through keyword arguments).
    imageVersion: expected image version ID on ACS.
    acs_path: directory that SaveConfiguration() changes into before it
        runs the ``blaze test`` command.
    outfile: name of the command output file ('cmdOutput.txt' by default).

  Errors are reported through the object passed to SetLogging(), which must
  offer CreateErrorInfo(severity, message) and SendLine(dest, info). When no
  log object has been set, errors are only signalled by return values.
  """

  def __init__(self, expected_version=None, **kwargs):
    """Constructor for this Class.

    Args:
      expected_version: initial value of acs_info['imageVersion'].
      **kwargs: extra entries merged into acs_info; they override the
          defaults when the keys collide.
    """
    # Defined up front so that reporting an error before SetLogging() was
    # called does not raise AttributeError.
    self.log = None
    self.acs_info = {
        'url': None,
        # expected image version ID on ACS
        'imageVersion': expected_version,
        'acs_path': None,
        'outfile': 'cmdOutput.txt'}
    self.acs_info.update(kwargs)

  def SetLogging(self, log):
    """Setup the logging file(s).

    Args:
      log: object providing CreateErrorInfo() and SendLine().
    """
    self.log = log

  def _ReportError(self, severity, message):
    """Send an error record to the log object, if one has been set."""
    log = getattr(self, 'log', None)
    if log is None:
      return
    log.SendLine(None, log.CreateErrorInfo(severity, message))

  def SetImageVersion(self, expected_version):
    """Set the expected image version that needs to be configured at ACS.

    The value is accepted only when it starts with at least one digit;
    otherwise a 'critical' error is reported and the stored version is left
    untouched.

    Args:
      expected_version: the version string of the expected image.
    """
    if re.match(r'\d+', expected_version) is None:
      self._ReportError(
          'critical',
          'Error! Please use software id as expected image version value')
      return
    self.acs_info['imageVersion'] = expected_version

  def ParseAppIDFromACSURL(self, url='https://acs.e2e.gfsvc.com/cwmp'):
    """Parse app_id from ACS instance host name.

    Not implemented: this is currently a no-op that returns None.

    Args:
      url: the ACS URL (unused for now).
    """
    pass

  def SaveConfiguration(self, profile_id, version_id, app_id,
                        host='https://acs.e2e.gfsvc.com/cwmp'):
    """Run the ACS configuration script from Google3 location.

    Changes into acs_info['acs_path'], runs the ``blaze test`` command for
    the ``:upgrade_test`` target there, and then restores the previous
    working directory (also when running the command raises).

    Args:
      profile_id: value passed as --profile_id.
      version_id: value passed as --imageVersion; when empty or None,
          acs_info['imageVersion'] is used instead.
      app_id: value passed as --app_id.
      host: value passed as --host.

    Returns:
      The exit status reported by os.system() for the command.
    """
    v_id = version_id or self.acs_info['imageVersion']
    # NOTE(review): the arguments are concatenated into a shell command
    # unquoted; callers must only pass trusted values.
    command = (
        'blaze test --notest_loasd --test_arg=--imageVersion='
        + v_id + ' --test_arg=--profile_id='
        + profile_id + ' --test_arg=--app_id=' + app_id
        + ' --test_arg=--host=' + host
        + ' --test_strategy=local :upgrade_test ')
    cur_dir = os.getcwd()
    print('============' + cur_dir)
    # the acs_path is the google3 repository where upgrade_test.py
    # locates. This test depends on it to modify ACS datastore.
    os.chdir(self.acs_info['acs_path'])
    try:
      print('============' + os.getcwd())
      status = os.system(command)
    finally:
      # Never leave the process in the google3 directory.
      os.chdir(cur_dir)
    print('============' + os.getcwd())
    return status

  def StubbyClientCall(self, cmd=None):
    """This is the nbi_client stubby call to get parameter names and values.

    Args:
      cmd: the shell command line to run.

    Returns:
      The command's standard output as a list of lines, each one stripped
      of leading and trailing whitespace.
    """
    pipe = os.popen(cmd, 'r')
    try:
      return [line.strip() for line in pipe]
    finally:
      # Close the pipe so the child process is reaped.
      pipe.close()

  def ParseParameterAccessStates(self, cmd_list):
    """Find out if a parameter is 'writable' or 'read-write'.

    This method processes the nbi_client getParameterNames() call result
    to find out the access state of the parameters. Every line containing
    'name:' is paired with the line that directly follows it, which is
    expected to carry the 'writable:' field.

    Args:
      cmd_list: the getParameterNames() call returned result list

    Returns:
      a dictionary: parameter name as index, and its access state
      as value. False if the call result does not report success.
    """
    if not self.CheckCmdCall(cmd_list):
      return False
    para_dict = {}
    # Walk (line, following line) pairs; a trailing 'name:' line without a
    # successor is simply not visited.
    for line, next_line in zip(cmd_list, cmd_list[1:]):
      if 'name:' not in line:
        continue
      name = re.sub(r'.*name:\s*"(.*)".*', r'\1', line)
      access = re.sub(r'.*writable:\s*([a-zA-Z]+)', r'\1', next_line)
      para_dict[name] = access
    return para_dict

  def CheckCmdCall(self, cmd_list):
    """Check if the RPC call is a success or failure.

    Only the first line of the reply is inspected: it must contain
    'SUCCESS' and must not contain 'FAILED'. An empty reply counts as a
    failure. Failures are reported with 'Critical' severity.

    Args:
      cmd_list: the command reply message from nbi_client, as a list of
          lines.

    Returns:
      True if the call succeeded, False otherwise.
    """
    status = cmd_list[0] if cmd_list else ''
    if 'FAILED' in status or 'SUCCESS' not in status:
      self._ReportError(
          'Critical', 'nbi_client call failed. ' + str(status))
      return False
    return True

  def ParseManagementURL(self, cmd_list):
    """Parse Management URL from GetParameterValues query reply message.

    Reply message is in the following format:
    #########################################################
    SUCCESS! Response: parameter_value_list {
      parameter_value_struct {
        name: "InternetGatewayDevice.ManagementServer.URL"
        string_value: "https://acs.e2e.gfsvc.com/cwmp"
      }
    }
    #########################################################

    Only the first line naming the parameter is considered; the URL is
    searched for in the line that directly follows it.

    Args:
      cmd_list: the ACS url information obtained from device log or file

    Returns:
      returns the management url found, or return false if url not found
    """
    if not cmd_list:
      return False
    for line, next_line in zip(cmd_list, cmd_list[1:]):
      if ('name:' not in line or
          'InternetGatewayDevice.ManagementServer.URL' not in line):
        continue
      match = re.search(r'http[s]?://(\w*\.)+com/cwmp', next_line)
      if match:
        return match.group(0)
      self._ReportError(
          'Warning', 'No management server URL found in GetParameterValues '
          'query reply.')
      return False
    # The parameter is not part of the reply at all.
    return False

  def ParseParameterNames(self, cmd_list, replace_index=False):
    """Parse the get values RPC call response, retrieve parameter name list.

    Args:
      cmd_list: the command reply message from nbi_client
      replace_index: if True, the index digits in parameter names will be
        replaced with '.{i}.'

    Returns:
      return the parsed parameter name list, without duplicates and in
      order of first appearance. False if the call result does not report
      success.
    """
    if not self.CheckCmdCall(cmd_list):
      return False
    chk_list = []
    seen = set()  # constant-time duplicate check; chk_list keeps the order
    for line in cmd_list:
      if 'name:' not in line:
        continue
      output = re.sub(r'.*"(.*)".*', r'\1', line)
      if replace_index:
        output = re.sub(r'(\.)[\d]+(\.)', r'\1{i}\2', output)
      if output not in seen:
        seen.add(output)
        chk_list.append(output)
    return chk_list
# No standalone behaviour: running this file as a script does nothing.
if __name__ == '__main__':
  pass