Bruno automation test: Image download test case script code upload
Change-Id: I8cf59d6daab97062f824978407aab0498b980acc
diff --git a/config.cfg b/config.cfg
new file mode 100644
index 0000000..6659a21
--- /dev/null
+++ b/config.cfg
@@ -0,0 +1,39 @@
+##############################################
+# This is the config file for the test cases #
+# #
+# All test cases share the same config file. #
+# Each test case configuration starts with #
+# the testID of that test case. #
+# #
+# comment line starts with '#' and will not #
+# be parsed as configuration information #
+##############################################
+testID: 11843140
+user = root
+addr = 192.168.1.4
+addr_ipv6 = 2605:a601:15:9001:21a:11ff:fe30:64d5
+pwd = google
+bruno_prompt = gfibertv#
+title =Image_Download
+expected_version_id=481109
+expected_version=bruno-jaguar-3
+downgrade_version=bruno-koala-2
+downgrade_version_id=662005
+ACS_URL = https://gfiber-acs-staging.appspot.com/cwmp
+# if authentication server is required to access the device
+jump_server = jmp.googlefiber.net
+# if athena server is required to access the device:
+athena_user = lmeng
+athena_pwd = changeme
+
+
+
+#testID: 14187024
+#title =DataModel
+#rq_file = 14187024-requirement.req
+#device_mURL = http://[2607:f8b0:400b:800::1011]/
+#device_IP = 2607:f8b0:400b:800::1011
+#serial_NO =
+
+
+
diff --git a/download.py b/download.py
new file mode 100755
index 0000000..475b287
--- /dev/null
+++ b/download.py
@@ -0,0 +1,368 @@
+#!/usr/bin/python
+# Copyright 2012 Google Inc. All Rights Reserved.
+
+"""This class defines the test case "image download" from ACS.
+
+This class extends the TestCase Class.
+testID: 11843140
+This test does the following tasks:
+ - check current image version
+ - if the current version is the same as expected version,
+ this test will first to try to downgrade it to the downgrade_version.
+ - Then, it will Run image download test
+ - Fianlly, it will downgrade to downgrade_version if this has not been
+ done. This is to test the download handling capabilities of the
+ expected_version
+ - configure ACS instance with these software versions
+ - verify image download and installation to be complete and successful, by
+ observing a successfully reboot after installation
+"""
+
+__author__ = 'Lehan Meng (lmeng@google.com)'
+
+
+import os
+import re
+import time
+
+import acs
+import ssh
+import testCase
+
+
+class DownloadTest(testCase.TestCase):
+ """Verify Bruno image download from ACS (TestID: 11843140).
+
+ Configuration Parameters:
+ testID: 11843140
+ user = root
+ addr = 10.0.28.191
+ pwd = google
+ bruno_prompt = (none)#
+ file=cmdOutput.txt
+ title =Image_Download
+ current_version=None
+ expected_version=bruno-iguana-2-dg
+ ACS_URL = https://gfiber-acs-staging.appspot.com/cwmp
+ """
+
+ def Init(self):
+ """Initialte the DownloadTest instance."""
+ self.p_ssh = ssh.SSH(user=self.params['user'],
+ addr=self.params['addr'],
+ pwd=self.params['pwd'],
+ bruno_prompt=self.params['bruno_prompt'],
+ addr_ipv6=self.params['addr_ipv6'],
+ athena_user=self.params['athena_user'],
+ athena_pwd=self.params['athena_pwd'],
+ jump_server=self.params['jump_server'])
+ self.p_ssh.Setlogging(self.log)
+ self.__debug = False
+ self.__short_delay = 5 #short delay: 5 seconds
+ self.__delay = 180 #medium delay: 180 seconds
+ self.__long_delay = 600 #long delay: 600 seconds
+
+ def CheckACSUrl(self):
+ """Check current ACS URL of the device.
+
+ Set it to a default URL if the current URL is not recognized
+ """
+ self.p_ssh.SendCmd('cat /tmp/cwmp/acs_url')
+ line = self.p_ssh.GetCmdOutput()[1]
+ m_1 = re.match(self.params['ACS_URL'], line)
+ if m_1 is None:
+ info = self.log.CreateErrorInfo(
+ 'intermediate',
+ 'ACS URL unknown! Update ACS URL manually')
+ self.log.SendLine(self.test_info, info)
+
+ self.p_ssh.SendCmd('echo ' + self.params['ACS_URL']
+ + ' > /tmp/cwmp/acs_url')
+ self.AddConfigParams('current_ACS_url', self.params['ACS_URL'])
+ else:
+ self.AddConfigParams('current_ACS_url', self.params['ACS_URL'])
+
+ def SetACSUrl(self, url='http://acs-sandbox.appspot.com/cwmp'):
+ """Set ACS URL on the device."""
+ self.p_ssh.SendCmd('echo ' + url + ' > /tmp/cwmp/acs_url')
+ self.p_ssh.SendCmd('cat /tmp/cwmp/acs_url')
+ info = self.log.CreateProgressInfo('---', 'Set ACS url to: ' + url)
+ self.log.SendLine(self.test_info, info)
+
+ def CheckVersion(self):
+ """Check current software version."""
+ self.p_ssh.SendCmd('more /etc/version')
+ current_version = self.p_ssh.GetCmdOutput(2)[1].strip()
+ self.params['current_version'] = current_version
+
+ def GetVersion(self):
+ """Get current software version."""
+ self.p_ssh.SendCmd('more /etc/version')
+ return self.p_ssh.GetCmdOutput(2)[1].strip()
+
+ def GetTimeStamp(self, line=None):
+ """Extract the time information of most current event from Device log.
+
+ i.e., the time that the last event happens since reboot (in seconds)
+ Args:
+ line: a line of device log which contains event time information.
+ If it has value of 'None', then retrieve the last a few lines from
+ device log to extract time information.
+ Returns:
+ return the time stamp string if succeed, return -1 otherwise.
+ """
+ if line is None:
+ # get time stamp from Device log file
+ self.p_ssh.SendCmd('dmesg | grep cwmpd:')
+ # search the last 30 lines in Device log for time information
+ latest_log_entry = 30
+ log_list = self.p_ssh.GetCmdOutput(latest_log_entry)
+
+ for line in log_list:
+ m = re.match('\\[[\s]*(\d*\\.\d{3})\\]', line)
+ if m is not None:
+ # Match, return the time stamp
+ return m.group(1)
+ else:
+ # get time stamp from a Device log line
+ m = re.match('\\[[\s]*(\d*\\.\d{3})\\]', line)
+ if m is not None:
+ # Match, return the time stamp
+ return m.group(1)
+
+ # No time stamp found
+ return -1
+
+ def ConfigACS(self):
+ """Setup the ACS device profile with expected image.
+
+ Then restart the cwmpd process on device to
+ initiate the inform/download process
+ Returns:
+ return the time stamp string if succeed, return -1 otherwise.
+ """
+ #configure ACS:
+ acs_instance = acs.ACS(self.params['expected_version_id'])
+ acs_instance.SetLogging(self.log)
+ acs_instance.SaveConfiguration()
+
+ #self.p_ssh.SendCmd('/etc/Init.d/S85catawampus restart')
+ # To setup any device service parameter using nbi_client will trigger
+ # ACS to make a remote connection request to the device.
+ # However, currently ACS does not provide separate stubby call to
+ # only request a remote connection.
+ # As walk around:
+ # 1. call remote management URL locally to initiate Inform immediately
+ # 2. wait for the next periodic inform to initiate download
+ # Here go for the 2nd option.
+ time_stamp = self.GetTimeStamp()
+ return time_stamp
+
+ def GetRemoteConnectionURL(self):
+ """Get remote connection URL from the cwmp log of the device."""
+ self.p_ssh.SendCmd('dmesg | grep cwmpd:')
+ log_list = self.p_ssh.GetCmdOutput()
+ for line in log_list:
+ if 'InternetGatewayDevice.ManagementServer.ConnectionRequestURL' in line:
+ index = log_list.index(line)
+ index -= 1
+ if index < 0:
+ # End of long while download not started, wait 281
+ info = self.log.createErrorInfo(
+ 'low', 'Can not find ConnectionRequestURL from device log ...')
+ self.log.sendLine(self.test_info, info)
+ return None
+
+ log_str = log_list[index]
+ m = re.search('(http://[\d\l:]+)</Value>', log_str)
+ if m is not None:
+ # match successful:
+ return m.group(1)
+ # No ConnectionRequestURL find:
+ info = self.log.createErrorInfo(
+ 'low', 'Can not find ConnectionRequestURL from device log ...')
+ self.log.sendLine(self.test_info, info)
+ return None
+
+ def CheckDownloadFileExists(self):
+ """check if a downloaded file is still in the folder.
+
+ A file in the folder '/rw/tr69/dnld/' indicates either of the
+ following:
+ 1. download is in progress
+ 2. download completed, installation is in progress
+ 3. download completed, installation completed, waiting for reboot
+
+ the upgrade is not considered as complete until a reboot is done
+
+ Returns:
+ return True if file exists, return False otherwise.
+ """
+ self.p_ssh.SendCmd('ls -l /rw/tr69/dnld/')
+ latest_log_entry = 4
+ log_list = self.p_ssh.GetCmdOutput(latest_log_entry)
+ for line in log_list:
+ m = re.search(r'[\s]+([\w,\d]{9})', line)
+ if m is not None:
+ # download file found
+ return True
+ return False
+
+ def IsRebooted(self, time_stamp):
+ """check if the device rebooted after the software installation.
+
+ This method checks device log file, if any event found with smaller time
+ stamp value than the last known device event, it indicates that the device
+ has been rebooted since then
+ Args:
+ time_stamp: time stamp that the last event recorded on device log
+ Returns:
+ return True if rebooted
+ return False otherwise
+ """
+ s = self.GetTimeStamp()
+ if float(s) < float(time_stamp):
+ return True
+ return False
+
+ def CheckImageInstallation(self, time_stamp):
+ """Check if the Image installation and Device reboot is successful.
+
+ An upgrade is considered complete only after the device is rebooted
+ after image installation
+ Args:
+ time_stamp: check if a reboot is done after this time
+ Returns:
+ return True if installation succeeded
+ return False otherwise
+ """
+ self.p_ssh.ExitToAthena()
+ time.sleep(3*self.__short_delay)
+ self.p_ssh.SshRetryFromAthena()
+ count = 0
+ ts = time_stamp
+ while not self.IsRebooted(ts):
+ count +=1
+ if count*self.__short_delay*3 > self.__long_delay:
+ info = self.log.CreateErrorInfo(
+ 'critical',
+ 'Error! File download longer than ' + str(self.__long_delay)
+ + ' seconds, check network. Exit!')
+ self.log.SendLine(self.test_info, info)
+ os.sys.exit(1)
+
+ t = self.GetTimeStamp()
+ if t > ts:
+ ts = t
+ self.p_ssh.ExitToAthena()
+ info = self.log.CreateProgressInfo(
+ '90%', 'Wait ' + str(3*self.__short_delay)
+ + ' seconds for image installation and device reboot...')
+ self.log.SendLine(self.test_info, info)
+ time.sleep(3*self.__short_delay)
+ self.p_ssh.SshRetryFromAthena()
+
+ time.sleep(2*self.__short_delay)
+ self.p_ssh.SendCmd('more /etc/version')
+ line = self.p_ssh.GetCmdOutput()[1]
+ crt_version = line.strip()
+ print crt_version + '########' + self.params['expected_version']
+ if crt_version == self.params['expected_version']:
+ return True
+ else:
+ return False
+
+ def Run(self):
+ """Run the test case."""
+ ####### Add your code here -- BEGIN #######
+ print 'Test Started...'
+ self.Init()
+ self.p_ssh.SshToAthena()
+ info = self.log.CreateProgressInfo(
+ '5%', 'SSH session to Athena server successfully established!')
+ self.log.SendLine(self.test_info, info)
+
+ max_test_idx = 2
+ current_test_idx = 1
+
+ while current_test_idx < max_test_idx:
+ self.p_ssh.SshRetryFromAthena()
+ info = self.log.CreateResultInfo(
+ '---', 'Test Sequence Number: ' + str(current_test_idx))
+ self.log.SendLine(self.test_info, info)
+ while self.CheckDownloadFileExists():
+ # current in download
+ info = self.log.CreateProgressInfo(
+ '10%', 'Another downloading in progress, retry after '
+ + str(self.__short_delay) + 'seconds...')
+ self.log.SendLine(self.test_info, info)
+ self.p_ssh.ExitToAthena()
+ time.sleep(self.__short_delay)
+ self.p_ssh.SshRetryFromAthena()
+
+ info = self.log.CreateProgressInfo(
+ '15%', 'SSH session to device successfully established!')
+ self.log.SendLine(self.test_info, info)
+
+ self.CheckACSUrl()
+ info = self.log.CreateProgressInfo('20%', 'Current ACS url: '
+ + self.params['ACS_URL'])
+ self.log.SendLine(self.test_info, info)
+
+ self.CheckVersion()
+ info = self.log.CreateProgressInfo('30%', 'Current Version: '
+ + self.params['current_version'])
+ self.log.SendLine(self.test_info, info)
+
+ # Need to do:
+ # check if the currently version is already expected version,
+ # if so, downgrade first, then Run this test
+ #if self.params['current_version'] == self.params['expected_version']:
+ # info = self.log.createErrorInfo(
+ # 'low', 'Currently image is expected version, '
+ # + 'please try other images.')
+ # self.log.sendLine(self.test_info, info)
+
+ self.time_stamp = self.ConfigACS()
+ info = self.log.CreateProgressInfo('40%', 'Expected Version: '
+ + self.params['expected_version'])
+ self.log.SendLine(self.test_info, info)
+ info = self.log.CreateProgressInfo(
+ '40%', 'Successfully configured ACS for expected image version')
+ self.log.SendLine(self.test_info, info)
+
+ time.sleep(self.__short_delay) # time delay for log update
+ if self.CheckImageInstallation(self.time_stamp):
+ info = self.log.CreateProgressInfo(
+ '100%', 'Image successfully upgraded to expected version: '
+ 'from: ' + self.params['current_version']
+ + ' to: ' + self.params['expected_version'])
+ self.log.SendLine(self.test_info, info)
+ info = self.log.CreateResultInfo(
+ 'Pass', 'Image successfully upgraded to expected version: '
+ 'from: ' + self.params['current_version']
+ + ' to: ' + self.params['expected_version'])
+ self.log.SendLine(self.test_info, info)
+ else:
+ info = self.log.CreateErrorInfo(
+ 'critical',
+ 'Error! Image version does not match expected version!')
+ self.log.SendLine(self.test_info, info)
+
+ current_test_idx += 1
+ self.p_ssh.ExitToAthena()
+ # Need to do:
+ # downgrade from expected_version
+ # To verify the capability of the download handling on expected version
+ # image
+ print 'Test Completed...'
+ ####### Add your code here -- END #######
+
+ def Destructor(self):
+ """This is the Destructor function to call for the user extened class."""
+ # exit the ssh tunnel
+ self.p_ssh.Close()
+
+
+if __name__ == '__main__':
+ DownloadTest(testID='11843140', key_word='Download')
\ No newline at end of file
diff --git a/logging.py b/logging.py
new file mode 100644
index 0000000..ade9f50
--- /dev/null
+++ b/logging.py
@@ -0,0 +1,245 @@
+#!/usr/bin/python
+# Copyright 2012 Google Inc. All Rights Reserved.
+
+"""This Class writes test progress/error/result to files.
+
+This file defines the logging function of the automation test. It
+logs test progress, results, error/warning to different files:
+
+ results.txt: this is the file to log the test output
+ error.txt: all test errors are logged into this file
+ prompt.txt: a copy of terminal output, which hints the progress
+ of the current test.
+"""
+
+__author__ = 'Lehan Meng (lmeng@google.com)'
+
+
+import datetime
+
+
+class Logging(object):
+ """This Class collects and writes test information to log files."""
+
+ params = {'std_out': '1',
+ 'f_result': 'result.log',
+ 'f_error': 'error.log',
+ 'f_progress': 'progress.log',
+ 'delimiter': '\t'}
+
+ def __init__(self, **kwargs):
+ """initiate the logging instance.
+
+ Args:
+ kwargs['f_result']: file name for test result
+ kwargs['f_error']: file name for test errors
+ kwargs['f_progres']: file name for test progres
+ kwargs['std_out']: by default, send a copy of the log info to stdout
+ """
+ for s in ('f_result', 'f_error', 'f_progress', 'std_out'):
+ if s in kwargs:
+ self.params[s] = kwargs[s]
+
+ self.rst_file = open(self.params['f_result'], 'a')
+ self.err_file = open(self.params['f_error'], 'a')
+ self.prg_file = open(self.params['f_progress'], 'a')
+ self.NewTestStart()
+
+ def SendLineToResult(self, test_info, result_info):
+ """Send test result (one-line) information to the result file.
+
+ Args:
+ test_info: test case related info: testID, start_time, key_word, etc
+ result_info: test results {Pass_Fail: pass, note: Extra_Information}
+ Returns:
+ return the status of file write operation
+ """
+ now = datetime.datetime.now()
+ if test_info is not None:
+ line = (now.strftime('%Y-%m-%d %H:%M') + self.params['delimiter']
+ + 'testID:' + test_info['testID']
+ + self.params['delimiter'] + 'TestStartAt:'
+ + test_info['start_time']
+ + self.params['delimiter'] + 'Keyword:' + test_info['key_word']
+ + self.params['delimiter'] + result_info['Pass_Fail']
+ + self.params['delimiter'] + result_info['note'] + '\n')
+ else:
+ line = ('Result log information should have testInfo available! '
+ + self.params['delimiter'] + 'No logging performed')
+
+ if self.params['std_out'] > 0:
+ print line
+ return self.rst_file.write(line)
+
+ def NewTestStart(self):
+ """Mark the start of a new test in the file."""
+ self.rst_file.write(
+ '################### NEW TEST STARTED ###################\n')
+ self.err_file.write(
+ '################### NEW TEST STARTED ###################\n')
+ self.prg_file.write(
+ '################### NEW TEST STARTED ###################\n')
+
+ def SendLineToError(self, test_info, error_info):
+ """Send test error (one-line) information to the error file.
+
+ Args:
+ test_info: test case related info: testID, start_time, key_word, etc
+ error_info: test errors, severity: critical/intermediate/low, and info
+ Returns:
+ returns the status of file write operation
+ """
+ now = datetime.datetime.now()
+ if test_info is not None:
+ line = (now.strftime('%Y-%m-%d %H:%M') + self.params['delimiter']
+ + 'testID:' + test_info['testID'] + self.params['delimiter']
+ + 'TestStartAt:' + test_info['start_time']
+ + self.params['delimiter'] + 'Keyword:'
+ + test_info['key_word'] + self.params['delimiter']
+ + error_info['severity'] + self.params['delimiter']
+ + error_info['note'] + '\n')
+ else:
+ line = (now.strftime('%Y-%m-%d %H:%M') + self.params['delimiter'] +
+ error_info['severity'] + self.params['delimiter']
+ + error_info['note'] + '\n')
+
+ if self.params['std_out'] > 0:
+ print line
+ return self.err_file.write(line)
+
+ def SendLineToProgress(self, test_info, progress_info):
+ """Send test progress (one-line) information to the progress file.
+
+ Args:
+ test_info: test case related info: testID, start_time, key_word, etc
+ progress_info: test progress indication
+ {percent: [0-100]%, note: Information}
+ Returns:
+ returns the status of file write operation
+ """
+ now = datetime.datetime.now()
+ if test_info is not None:
+ line = (now.strftime('%Y-%m-%d %H:%M') + self.params['delimiter']
+ + 'testID:' + test_info['testID'] + self.params['delimiter']
+ + 'TestStartAt:'+ test_info['start_time']
+ + self.params['delimiter'] + 'Keyword:' + test_info['key_word']
+ + self.params['delimiter'] + progress_info['percent']
+ + self.params['delimiter'] + progress_info['note'] + '\n')
+ else:
+ line = (now.strftime('%Y-%m-%d %H:%M') + self.params['delimiter']
+ + progress_info['percent'] + self.params['delimiter']
+ + progress_info['note'] + '\n')
+
+ if self.params['std_out'] > 0:
+ print line
+ return self.prg_file.write(line)
+
+ def SendLine(self, test_info, info):
+ """Send test (one-line) information to the corresponding log file.
+
+ Args:
+ test_info: test case related info: testID, start_time, key_word, etc
+ info: test results/error/progress information (single line)
+ test_info could be "None" for class other than TestCase
+ Returns:
+ return the status of file write operation
+ """
+ if info['type'] == 'result':
+ return self.SendLineToResult(test_info, info)
+ elif info['type'] == 'error':
+ return self.SendLineToError(test_info, info)
+ elif info['type'] == 'progress':
+ return self.SendLineToProgress(test_info, info)
+
+ def SendLines(self, test_info, info):
+ """Send test output (multiple lines) to the corresponding log file.
+
+ Args:
+ test_info: test case related info: testID, start_time, key_word, etc
+ info: test results/error/progress information (single line)
+ test_info could be "None" for class other than TestCase
+ Returns:
+ return 1 when succeed
+ """
+ if info['type'] == 'result':
+ for line in info['note'].split('\n'):
+ if line == info['note'].split('\n')[0]:
+ dup_info = info.copy()
+ dup_info['note'] = line
+ self.SendLineToResult(test_info, dup_info)
+ else:
+ self.rst_file.write(line + '\n')
+ if self.params['std_out']: print line
+ self.rst_file.flush()
+ elif info['type'] == 'error':
+ for line in info['note'].split('\n'):
+ if line == info['note'].split('\n')[0]:
+ dup_info = info.copy()
+ dup_info['note'] = line
+ self.SendLineToError(test_info, dup_info)
+ else:
+ self.err_file.write(line + '\n')
+ if self.params['std_out']: print line
+ self.err_file.flush()
+ elif info['type'] == 'progress':
+ for line in info['note'].split('\n'):
+ if line == info['note'].split('\n')[0]:
+ dup_info = info.copy()
+ dup_info['note'] = line
+ self.SendLineToProgress(test_info, dup_info)
+ else:
+ self.prg_file.write(line + '\n')
+ if self.params['std_out']: print line
+ self.prg_file.flush()
+ return 1
+
+ def CreateResultInfo(self, status='Pass', msg=''):
+ """Create the result Info structure for logging.
+
+ Args:
+ status: has value 'Pass' or 'Fail'
+ msg: test results information line(s)
+ Returns:
+ returns the infomation structure that should be logged to file
+ """
+ d = {}
+ d.setdefault('type', 'result')
+ d.setdefault('Pass_Fail', status)
+ d.setdefault('note', msg)
+ return d
+
+ def CreateErrorInfo(self, severity='intermediate', msg=''):
+ """Create the error Info structure for logging.
+
+ Args:
+ severity: has value 'low', 'intermediate' or 'critical'
+ msg: test error information line(s)
+ Returns:
+ returns the infomation structure that should be logged to file
+ """
+ d = {}
+ d.setdefault('type', 'error')
+ d.setdefault('severity', severity)
+ d.setdefault('note', msg)
+ return d
+
+ def CreateProgressInfo(self, percent='0%', msg=''):
+ """Create the progress Info structure for logging.
+
+ Args:
+ percent: completeness of test progress, in %
+ msg: test progress information line(s)
+ Returns:
+ returns the infomation structure that should be logged to file
+ """
+ d = {}
+ d.setdefault('percent', percent)
+ d.setdefault('type', 'progress')
+ d.setdefault('note', msg)
+ return d
+
+ def __del__(self):
+ """Close the log files."""
+ self.rst_file.close()
+ self.err_file.close()
+ self.prg_file.close()
\ No newline at end of file
diff --git a/ssh.py b/ssh.py
new file mode 100644
index 0000000..8b9f2e3
--- /dev/null
+++ b/ssh.py
@@ -0,0 +1,406 @@
+#!/usr/bin/python
+# Copyright 2012 Google Inc. All Rights Reserved.
+
+"""This class creates SSH session to Device.
+
+This file defines the command/console interaction between the test
+instance and the CPE device. This includes ssh tunnel establishment,
+shell command passing, and device log collection, etc.
+
+"""
+
+__author__ = 'Lehan Meng (lmeng@google.com)'
+
+
+import os
+import tempfile
+import time
+
+import pexpect
+
+
+class SSH(object):
+ """SSH class that create a ssh tunnel to the CPE device.
+
+ ssh tunnel can be established from various locations:
+ - from PC that on the same LAN of the device
+ - from PC that must ssh to device over authentication server (e.g., jump)
+ - from PC that must ssh to device over lab server (e.g., athenasrv3)
+
+ """
+
+ def __init__(self, dev=None, **kwargs):
+ """Initiated the SSH class.
+
+ If a device is used to initiate the SSH Class, try to create an ssh tunnel
+ using parameters provided by this device, otherwise use default parameter
+ values.
+
+ Args:
+ dev: a device instance, which has necessary parameters to create a
+ tunnel
+ kwargs['user']: ssh user name
+ kwargs['addr']: device ip address
+ kwargs['pwd']: ssh password
+ kwargs['bruno_prompt']: device shell command prompt
+ etc.
+ """
+
+ self.params = {
+ 'user': 'root',
+ 'addr': None,
+ 'addr_ipv6': None,
+ 'pwd': 'google',
+ 'bruno_prompt': '\(none\)#',
+ 'jump_server': 'jmp.googlefiber.net',
+ 'jump_prompt': None,
+ 'athena_user': None,
+ 'athena_pwd': None,
+ 'athena_prompt': None}
+
+ self.tmp_file = tempfile.TemporaryFile(mode='w+t')
+
+ self.__short_delay = 5 # 5 seconds delay
+ self.__delay = 10 # medium delay
+ self.__long_delay = 30 #long delay
+
+ for s in kwargs:
+ self.params[s] = kwargs[s]
+
+ #populate prompt on jump and athena server:
+ if self.params['athena_user'] is not 'None':
+ self.params['jump_prompt'] = self.params['athena_user'] + '@jmp101.nuq1:'
+ self.params['athena_prompt'] = ('[' + self.params['athena_user']
+ + '@athenasrv3 ~]$')
+
+ if dev is not None:
+ # parameters used to open an ssh tunnel to this device
+ for s in ('user', 'addr', 'pwd', 'bruno_prompt', 'addr_ipv6'):
+ self.params[s] = kwargs[s]
+
+ def Setlogging(self, logging):
+ """Setup the logging file(s)."""
+ self.log = logging
+
+ def Ssh(self):
+ """Setup an ssh tunnel to device, return immediately upon failure.
+
+ This method create an ssh tunnel from PC that on the same network of device
+ Returns:
+ return True when succeed, return False otherwise
+ """
+ # clear the temp file
+ self.tmp_file.close()
+ self.tmp_file = tempfile.TemporaryFile(mode='w+t')
+ ssh_newkey = 'Are you sure you want to continue connecting'
+
+ if self.params['addr_ipv6']:
+ addr = self.params['addr_ipv6']
+ else:
+ addr = self.params['addr']
+
+ p_ssh = pexpect.spawn('ssh '+self.params['user']+'@'+addr)
+ i = p_ssh.expect(
+ [ssh_newkey, 'password:', self.params['bruno_prompt'], pexpect.EOF,
+ '\(none\)#', 'gfibertv#', pexpect.TIMEOUT])
+
+ while 1:
+ if i == 0:
+ print 'choose Yes'
+ p_ssh.sendline('yes')
+ i = p_ssh.expect(
+ [ssh_newkey, 'password:', self.params['bruno_prompt'], pexpect.EOF])
+ if i == 1:
+ print 'need password'
+ p_ssh.sendline(self.param['pwd'])
+ i = p_ssh.expect(
+ [ssh_newkey, 'password:', self.params['bruno_prompt'], pexpect.EOF])
+ if i == 2 or i == 4 or i == 5:
+ print 'Login OK'
+ p_ssh.logfile = self.tmp_file
+ self.p_ssh = p_ssh
+ if i == 4: self.params['bruno_prompt'] = '\(none\)#'
+ if i == 5: self.params['bruno_prompt'] = 'gfibertv#'
+ return True
+ if i == 3 or i == 6:
+ print 'Key or connection timeout'
+ return False
+ if i < 0 or i > 6:
+ print 'Error, quit'
+ return False
+
+ def SshFromAthena(self):
+ """Initiate an ssh tunnel from the athenasrv3 to the device.
+
+ Returns:
+ return True when succeed, return False otherwise
+ """
+ # clear the temp file
+ self.tmp_file.close()
+ self.tmp_file = tempfile.TemporaryFile(mode='w+t')
+ self.p_ssh.logfile = self.tmp_file
+ #Const
+ ssh_newkey = 'Are you sure you want to continue connecting'
+
+ self.p_ssh.sendline('ssh-agent /bin/bash')
+ self.p_ssh.expect(self.params['athena_prompt'])
+ #p_ssh.expect(self.params['athena_prompt'])
+ #print self.p_ssh.before, self.p_ssh.after
+ print '========================'
+ print 'add bruno key file:'
+ self.p_ssh.sendline('ssh-add /home/' + self.params['athena_user']
+ + '/.ssh/bruno-sshkey')
+ self.p_ssh.expect(self.params['athena_prompt'])
+ #print self.p_ssh.before
+
+ if self.params['addr_ipv6']:
+ addr = self.params['addr_ipv6']
+ else:
+ addr = self.params['addr']
+
+ self.p_ssh.sendline('ssh '+self.params['user']+'@'+addr)
+ i = self.p_ssh.expect(
+ [ssh_newkey, 'password:', self.params['bruno_prompt'], pexpect.EOF,
+ '\(none\)#', 'gfibertv#', pexpect.TIMEOUT])
+
+ while 1:
+ if i == 0:
+ print 'choose Yes'
+ self.p_ssh.sendline('yes')
+ i = self.p_ssh.expect(
+ [ssh_newkey, 'password:', self.params['bruno_prompt'], pexpect.EOF])
+ if i == 1:
+ print 'need password'
+ self.p_ssh.sendline(self.params['pwd'])
+ i = self.p_ssh.expect(
+ [ssh_newkey, 'password:', self.params['bruno_prompt'], pexpect.EOF])
+ if i == 2 or i == 4 or i == 5:
+ print 'Login OK'
+ self.p_ssh.logfile = self.tmp_file
+ if i == 4: self.params['bruno_prompt'] = '\(none\)#'
+ if i == 5: self.params['bruno_prompt'] = 'gfibertv#'
+ return True
+ if i == 3 or i == 6:
+ print 'Key or connection timeout'
+ return False
+ if i < 0 or i > 6:
+ print 'Error, quit'
+ return False
+
+ def ExitToAthena(self):
+ """Terminate the ssh tunnel to device, and exit to athenasrv3."""
+ self.p_ssh.sendline('exit')
+ self.p_ssh.expect(self.params['athena_prompt'])
+ self.p_ssh.sendline('exit')
+ self.p_ssh.expect(self.params['athena_prompt'])
+
+ def SshToAthena(self):
+ """ssh to athenasrv3, over jump server from a Google PC.
+
+ This is for the reason that some test environment can only be accessed via
+ corp PC, authentication server (e.g., jump) and lab server
+ (e.g., athenasrv3)
+ Returns:
+ return True upon success
+ """
+ p_ssh = pexpect.spawn('ssh -a ' + self.params['jump_server'])
+ print 'ssh -a jmp.googlefiber.net ...'
+ #p_ssh.expect('lmeng@jmp101.nuq1:.*[\\$]')
+ i = p_ssh.expect(
+ [self.params['jump_prompt'], pexpect.TIMEOUT, pexpect.EOF],
+ self.__short_delay)
+ if i == 1 or i == 2:
+ info = self.log.CreateErrorInfo(
+ 'critical', 'Cannot establish SSH tunnel to jump server.'
+ 'Timeout or incorrect prompt!')
+ self.log.SendLine(None, info)
+
+ print p_ssh.before, p_ssh.after
+ print '========================'
+ p_ssh.sendline('ssh -a ' + self.params['athena_user'] + '@10.1.16.250')
+ print 'ssh -a ' + self.params['athena_user'] + '@10.1.16.250 ...'
+ i = p_ssh.expect([self.params['athena_user'] + '@10.1.16.250\'s password:',
+ pexpect.TIMEOUT, pexpect.EOF], self.__short_delay)
+ if i == 1 or i == 2:
+ info = self.log.CreateErrorInfo(
+ 'critical', 'Cannot establish SSH tunnel to athena server.'
+ 'Timeout or incorrect prompt!')
+ self.log.SendLine(None, info)
+
+ print '========================'
+ print self.params['athena_user'] + '@10.1.16.250\'s password:'
+ p_ssh.sendline(self.params['athena_pwd'])
+ p_ssh.expect(self.params['athena_prompt'])
+ p_ssh.expect(self.params['athena_prompt'])
+ print p_ssh.before
+ print '========================'
+ print 'login athena3'
+ self.p_ssh = p_ssh
+
+ def SshRetry(self, max_retry=20, retry_delay=15):
+ """Establish ssh connection to Device, retry upon failure.
+
+ Args:
+ max_retry: the total number of retry
+ retry_delay: delay between each ssh try
+ """
+ tunnel = self.Ssh()
+ if not tunnel:
+ retry = 0
+ while not tunnel and retry < max_retry:
+ delay = retry_delay
+ info = self.log.CreateErrorInfo(
+ 'Warning', 'Failed to create ssh tunnel, retry after '
+ + str(delay) + ' seconds')
+ self.log.SendLine(None, info)
+ time.sleep(delay)
+ tunnel = self.p_ssh.Ssh()
+ retry += 1
+
+ if retry >= max_retry:
+ info = self.log.CreateErrorInfo(
+ 'critical', 'Cannot establish ssh tunnel to Device within '
+ + str(max_retry*delay)
+ + ' seconds! Timeout or incorrect command prompt!')
+ self.log.SendLine(None, info)
+ os.sys.exit(1)
+
+ info = self.log.CreateProgressInfo(
+ '---', 'ssh session to Device successfully established!')
+ self.log.SendLine(None, info)
+
+ def SshRetryFromAthena(self, max_retry=5, retry_delay=15):
+ """Establish ssh connection to Device from athenasrv3, retry upon failure.
+
+ Args:
+ max_retry: the total number of retry
+ retry_delay: delay between each ssh try
+ """
+ tunnel = self.SshFromAthena()
+ if not tunnel:
+ retry = 0
+ while not tunnel and retry < max_retry:
+ delay = retry_delay
+ info = self.log.CreateErrorInfo(
+ 'Warning', 'Failed to create ssh tunnel, retry after '
+ + str(delay) + ' seconds')
+ self.log.SendLine(None, info)
+ time.sleep(delay)
+ tunnel = self.SshFromAthena()
+ retry += 1
+
+ if retry >= max_retry:
+ info = self.log.CreateErrorInfo(
+ 'critical', 'Cannot establish ssh tunnel to Device within '
+ + str(max_retry*delay)
+ + ' seconds! Timeout or incorrect command prompt!')
+ self.log.SendLine(None, info)
+ os.sys.exit(1)
+
+ info = self.log.CreateProgressInfo(
+ '---', 'ssh session to Device successfully established!')
+ self.log.SendLine(None, info)
+
+ def SendCmd(self, cmd):
+ """Send a command to the Device over ssh tunnel."""
+ self.p_ssh.sendline(cmd)
+ i = self.p_ssh.expect(
+ [self.params['bruno_prompt'], pexpect.EOF, pexpect.TIMEOUT], 10)
+
+ while 1:
+ if i == 0:
+ # send command successfully
+ return True
+ if i == 1 or i == 2:
+ # prompt not returned
+ info = self.log.CreateErrorInfo(
+ 'Warning', 'Device not responding to shell command or timeout.'
+ ' Connect after 3 seconds ...')
+ self.log.SendLine(None, info)
+ time.sleep(3)
+ self.SshRetryFromAthena()
+ self.p_ssh.sendline(cmd)
+ i = self.p_ssh.expect(
+ [self.params['bruno_prompt'], pexpect.EOF, pexpect.TIMEOUT], 5)
+
+ def GetCmdOutput(self, buff_size=0):
+ """Get lines from the command output of the device, in a bottom up manner.
+
+ Args:
+ buff_size: if buff_size<=0, return all output of a command
+ if buff_size>0, return 'buff_size' from command output
+ Returns:
+ f_list: The most recent output lines comes at the begging of the f_list
+ """
+ self.tmp_file.seek(0)
+ if buff_size > 0:
+ f_list = reversed(self.tmp_file.readlines()[-buff_size:])
+ else:
+ f_list = reversed(self.tmp_file.readlines())
+ f_list = list(f_list)
+ return f_list
+
+ def IsClosed(self):
+ """Verify if the ssh tunnel is closed.
+
+ Returns:
+ return the status of ssh tunnel
+ """
+ return self.p_ssh.closed
+
+ def ExitStatus(self):
+ """Exit status of the ssh process.
+
+ Returns:
+ return the exit status of ssh process
+ """
+ return self.p_ssh.ExitStatus
+
+ def SignalStatus(self):
+ """Signal status of the ssh process.
+
+ Returns:
+ return the signal status of ssh process
+ """
+ return self.p_ssh.SignalStatus
+
+ def Close(self):
+ # exit the ssh tunnel
+ self.p_ssh.sendline('exit')
+ i = self.expect([self.p_ssh.params['athena_prompt'],
+ self.p_ssh.params['jump_prompt'],
+ self.p_ssh.params['bruno_prompt'],
+ pexpect.EOF, pexpect.TIMEOUT])
+ while 1:
+ if i == 0:
+ # now at athena server:
+ self.p_ssh.sendline('exit')
+ i = self.expect([self.p_ssh.params['athena_prompt'],
+ self.p_ssh.params['jump_prompt'],
+ self.p_ssh.params['bruno_prompt'],
+ pexpect.EOF, pexpect.TIMEOUT])
+ elif i == 1:
+ # now at jump server:
+ self.p_ssh.sendline('exit')
+ break
+ elif i == 2:
+ # now at bruno:
+ self.p_ssh.sendline('exit')
+ i = self.expect([self.p_ssh.params['athena_prompt'],
+ self.p_ssh.params['jump_prompt'],
+ self.p_ssh.params['bruno_prompt'],
+ pexpect.EOF, pexpect.TIMEOUT])
+ elif i == 3 or i == 4:
+ # time out:
+ print 'Timeout when closing the ssh tunnel!'
+ break
+ else:
+ print 'Error'
+ break
+
+ def __del__(self):
+ self.tempFile.Close()
+ self.p_ssh.Close()
+
+# End of SSH class
+
diff --git a/testCase.py b/testCase.py
new file mode 100644
index 0000000..81e289d
--- /dev/null
+++ b/testCase.py
@@ -0,0 +1,162 @@
+#!/usr/bin/python
+# Copyright 2012 Google Inc. All Rights Reserved.
+
+"""This class defines the base class for automated Bruno test cases.
+
+This file also defines some basic functions that is required for
+most test cases. Users can define their own functions in the extended
+class, the Run() method should be use as the start point of test case.
+"""
+
+__author__ = 'Lehan Meng (lmeng@google.com)'
+
+import datetime
+import logging
+import re
+
+
+class TestCase(object):
+ """Base class for all test cases.
+
+ This base class provides basic functions for the test case:
+ - configure file parsing
+ - logging
+ - test information creation
+ - etc.
+
+ Test case starts in the Run() method, test case specific code can be
+ placed in here.
+ """
+
+ def __init__(self, **kwargs):
+ """Initiate a test class instance.
+
+ Args:
+ kwargs['testID']: The assigned test ID on TCM
+ kwargs['title']: Title (section number) of the test case
+ kwargs['start_time']: Start time of the test case (for statistic purpose)
+ kwargs['key_word']: the label of the test case
+ kwargs['description']: description if required
+ """
+ self.test_info = {
+ 'testID': '0',
+ 'title': 'None',
+ 'start_time': str(datetime.datetime.now()
+ .strftime('%Y-%m-%d %H:%M')),
+ 'key_word': 'None',
+ 'description': 'None',
+ 'configFile': 'config.cfg'}
+ self.params = {}
+
+ for s in ('testID', 'title', 'start_time', 'key_word',
+ 'description', 'configFile'):
+ if s in kwargs:
+ self.test_info[s] = kwargs[s]
+
+ self.log = logging.Logging()
+ self.ParseConfig()
+ self.Run()
+
+ def ParseConfig(self):
+ """Parse the configuration file for test case.
+
+ The structure of the configuration file:
+ Each test case configuration begins with its testID in a sigle line,
+ the following lines have the format:
+ ############# configuration for Test 11362 ########################
+ testID: 11362
+ parameter_1 = value_1
+ parameter_2 = value_2
+ parameter_3 = value_3
+ ...
+
+ ############# configuration for Test 11363 ########################
+ testID: 11363
+ parameter_1 = value_1
+ parameter_2 = value_2
+ parameter_3 = value_3
+ ...
+ ############# comment line start with '#' #########################
+
+ Returns:
+ reuturn 1 if succeed
+ """
+ # dictionary for params
+ self.cfg_file = open(self.test_info['configFile'], 'r')
+
+ cfg_list = self.cfg_file.readlines()
+ test_id_matched = False
+
+ for line in cfg_list:
+ s = line.lstrip().strip().strip('\n')
+ m = re.match('testID: ' + str(self.test_info['testID']), s)
+ if (m is not None) and not s.startswith('#'):
+ # test case matching succeed
+ test_id_matched = True
+ # get next line index:
+ index = cfg_list.index(line)+1
+
+ if index >= len(cfg_list):
+ # end of file reached
+ info = self.log.createErrorInfo(
+ 'Low',
+ 'Warning: No configuration found for this test case in file: '
+ + self.test_info['configFile'])
+ self.log.sendLine(self.test_info, info)
+
+ s = cfg_list[index]
+ s = s.lstrip().rstrip()
+ s = s.split('#', 2)[0]
+
+ m = re.match('testID: ', s)
+ while m is None:
+ # not reaching configuration for next test case,
+ # read and create current test case configuration data as dictionary
+ # format: parameter = value
+ if s is not '' and not s.startswith('#'):
+ name = s.split('=')[0].strip().lstrip()
+ value = s.split('=')[1].strip().lstrip()
+ if not value or value == 'None':
+ self.params[name] = None
+ else:
+ self.params[name] = value
+
+ index += 1
+ if index == len(cfg_list): break
+ s = cfg_list[index]
+ s = s.lstrip().rstrip()
+ m = re.match('testID: ', s)
+
+ # all configuration parsed
+ self.cfg_file.close()
+ if not test_id_matched:
+ # end of file reached
+ info = self.log.createErrorInfo(
+ 'Low', 'Warning: No configuration found for this test case in file: '
+ + self.test_info['configFile'])
+ self.log.sendLine(self.test_info, info)
+ return True
+
+ def AddConfigParams(self, par='par', value='value'):
+ """Add new parameter to the params list.
+
+ Args:
+ par: parameter name
+ value: parameter value
+ """
+ self.params.setdefault(par.strip().lstrip(), value.strip().lstrip())
+
+ def __del__(self):
+ pass
+
+ #################### define your own functions here #######################
+ def Run(self):
+ """Starts to Run the test case."""
+ ##### Add your code here -- BEGIN #######
+ print 'Test Started...'
+
+ print 'Test Completed...'
+ ##### Add your code here -- END #######
+
+ def Destructor(self):
+ pass
\ No newline at end of file