| #!/usr/bin/python |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| |
| """This class implements some basic TR069 test cases. |
| |
| This class extends the TestCase Class. |
| This file implements some basic TR069 test cases to verify Bruno management |
| features. In the following test cases, only those that are defined in |
| 'config.cfg' file will be executed. |
| |
| Basic TR069 test cases include the following: |
| testID: 11865129 7.6.1 ACS Connection via IPv6 |
| testID: 11721375 7.6.10 Verify there is a way to reset to factory default |
| if user modifies the local configuration |
| '/etc/init.d/S05factoryreset later' |
| testID: 14319146 7.6.19 Default ACS URL |
| testID: 11721371 7.6.2 ACS URL Pass to Bruno via Moca |
| testID: 14312009 7.6.21 Restrict the ability to locally configure ACS URL |
| testID: 14186028 7.6.22 Security of the link between the DHCPv6 server |
| and the CPE |
| testID: 14312010 7.6.23 CPE Connection Initiation |
| testID: 14247012 7.6.24.1 Support of Connection Request notification |
| mechanism |
| testID: 14186029 7.6.24.2 InternetGatewayDevice.ManagementServer. |
| ConnectionRequestURL |
| testID: 14312011 7.6.24.3 InternetGatewayDevice.ManagementServer. |
| ConnectionRequestURL format |
| testID: 14293012 7.6.24.4 No data is conveyed in the Connection |
| Request HTTP Get notification |
| testID: 14165037 7.6.24.5 Restrict the number of Connection Request |
| notifications |
| testID: 14315010 7.6.24.6 Connection Request Notification does not |
| terminate current session |
| testID: 14247013 7.6.25.1 MaxEnvelopes |
| testID: 14312012 7.6.25.2 Cookie support (optional) |
| testID: 14293013 7.6.25.4 Authentication |
| testID: 14311008 7.6.26.1 SOAP namespace identifier |
| testID: 14293014 7.6.26.2 SOAP Fault element |
| testID: 14311009 7.6.26.3 SOAP future extensibility |
| testID: 14186030 7.6.27 Fault Handling |
| testID: 14319148 7.6.27.1 SOAP requests respond |
| testID: 14266497 7.6.27.2 Session Termination |
| testID: 14312013 7.6.28 Correctly handle concurrent downloads in catawampus |
| testID: 11874098 7.6.3.1 TLS (1.2)/SSL based Transport for TR-069 on IPv6 |
| testID: 14315007 7.6.3.2 Support for encryption algorithms |
| testID: 14247014 7.6.32 Persist TR-069 config across reboots |
| testID: 14293015 7.6.33 Send notification when disk is full |
| testID: 14293016 7.6.34 Support for multiple SSIDs in WLANConfiguration |
| testID: 11865130 7.6.4 TR069-ACS Validation |
| testID: 11722370 7.6.5 TR069-Inform Request |
| testID: 11874103 7.6.7 Digest Authentication |
| testID: 11843139 7.6.8.1 Support of RPCs: REBOOT |
| """ |
| |
| __author__ = 'Lehan Meng (lmeng@google.com)' |
| |
| |
| import os |
| import re |
| import time |
| |
| from IPy import IP |
| |
| import device |
| import ssh |
| import testCase |
| |
| |
class BasicTR069Test(testCase.TestCase):
  """Test cases covering the basic TR069 features of a Bruno device.

  Run() looks at self.test_info['testID'] and calls the matching Verify*
  method. Only three test IDs have an implementation in this class; every
  other recognized ID just prints a placeholder message.
  """

  # Test IDs with a real implementation, mapped to the method that runs them.
  _IMPLEMENTED_TESTS = {
      '11865129': 'VerifyACSWithIPv6',
      '11721375': 'VerifyFactoryReset',
      '14319146': 'VerifyDefaultACSURL',
  }

  # Test IDs that are recognized but not implemented yet.
  _PLACEHOLDER_TESTS = frozenset([
      '11721371', '14312009', '14186028', '14312010', '14247012', '14186029',
      '14312011', '14293012', '14165037', '14315010', '14247013', '14312012',
      '14293013', '14311008', '14293014', '14311009', '14186030', '14319148',
      '14266497', '14312013', '11874098', '14315007', '14247014', '14293015',
      '14293016', '11865130', '11722370', '11874103', '11843139',
  ])

  # Time stamp that starts the first line of each packet in the tcpdump
  # output, e.g. '14:24:36.887936 '.
  _PACKET_START_RE = re.compile(r'^\s*\d{2}:\d{2}:\d{2}\.\d+\s*')

  # Eight whitespace-separated groups of four hex digits: one hex-dump row.
  _HEX_GROUPS = r'\s*'.join([r'([\da-f]{4})'] * 8)

  # Date and time in the form 'YYYY-MM-DD HH:MM', as searched for in the
  # 'ls -l' listing of /rw/sage.
  _LS_DATE_RE = re.compile(r'.*(\d{4}-\d{2}-\d{2}\s*\d+:\d+).*')

  def _SendProgress(self, percent, message):
    """Log a progress line for the current test."""
    self.log.SendLine(self.test_info,
                      self.log.CreateProgressInfo(percent, message))

  def _SendResult(self, result, message):
    """Log a result line (e.g. 'Pass' / 'Failed') for the current test."""
    self.log.SendLine(self.test_info,
                      self.log.CreateResultInfo(result, message))

  def _SendError(self, level, message):
    """Log an error line (e.g. 'critical' / 'warning') for the current test."""
    self.log.SendLine(self.test_info,
                      self.log.CreateErrorInfo(level, message))

  def Init(self):
    """Create the device and ssh objects used by the TR069 tests.

    Reads 'user', 'addr', 'bruno_prompt' and 'addr_ipv6' from self.params.
    An existing self.p_ssh is reused; otherwise a new ssh.SSH is created.
    """
    self._SendResult('---', 'Networking test verification started...')
    self.bruno = device.Bruno(user=self.params['user'],
                              addr=self.params['addr'],
                              bruno_prompt=self.params['bruno_prompt'],
                              addr_ipv6=self.params['addr_ipv6'])
    self.bruno.SetLogging(self.log)
    if not self.p_ssh:
      self.p_ssh = ssh.SSH(user=self.params['user'],
                           addr=self.params['addr'],
                           bruno_prompt=self.params['bruno_prompt'],
                           addr_ipv6=self.params['addr_ipv6'])
    # The device object sends its commands through the same ssh session.
    self.bruno.dev_ssh = self.p_ssh
    self.p_ssh.SetLogging(self.log)

    # Delays, in seconds (they are passed to time.sleep()).
    self.__delay_5_sec = 5
    self.__delay_5_min = 300
    self.__delay_15_min = 900

  def SshTunnel(self):
    """Create ssh tunnel for the test."""
    # NOTE(review): presumably (retry count, delay in seconds) -- confirm
    # against ssh.SSH.SshRetry.
    self.p_ssh.SshRetry(5, 15)

  def VerifyDuplicatedAddress(self, ip_addr):
    """Verify device can detect duplicated ipv6 address existing in network.

    Adds ip_addr to br0 on the device, reads dmesg, removes the address
    again and then looks for the duplicate-address message in the log lines
    newer than the time stamp taken before the address was added.

    Args:
      ip_addr: the duplicated address
    Returns:
      True: if duplicated detected
      False: if duplicated address is not detected
    """
    time_stamp = self.bruno.GetTimeStamp()
    self.bruno.dev_ssh.SendCmd('ip -6 addr add ' + ip_addr + '/64 dev br0')
    time.sleep(1)
    self.bruno.dev_ssh.SendCmd('dmesg')
    log_list = self.bruno.dev_ssh.GetCmdOutput()

    # Expected kernel message, for example:
    # [132344.324] br0: IPv6 duplicate address fe80:... detected!
    # The address is escaped so it is always matched literally.
    pattern_line = (
        r'\[\d*\.\d{3}\]\s*\w*\d:\s* IPv6 duplicate address\s*('
        + re.escape(ip_addr) + r')\s*[/\d+]*\s*detected!')
    self.bruno.dev_ssh.SendCmd('ip -6 addr del ' + ip_addr + '/64 dev br0')

    detected = bool(
        self.bruno.FindLineInLog(log_list, time_stamp, pattern_line))
    if detected:
      self._SendResult('Pass', 'Duplicated IPv6 address '
                       'detected: ' + ip_addr)
    else:
      self._SendResult('Failed',
                       'Duplicated IPv6 address not detected: '
                       + ip_addr + '. Verification Failed!')
    return detected

  def VerifyExpectedVersion(self):
    """Verify if device is running on expected software version.

    Raises:
      SystemExit: (exit status 1) if the device does not run
        self.params['expected_version'].
    """
    if not self.bruno.VerifyCurrentVersion(self.params['expected_version']):
      self._SendError('critical',
                      'Device software version is not expected!'
                      ' Exit!')
      raise SystemExit(1)

  def VerifyACSWithIPv6(self):
    """Test ID: 11865129 - Verify IPv6 connection to ACS.

    To verify device connects to ACS via IPv6, use tcpdump to capture
    network packets, filtered with source IPv6 address, and destination
    IPv6 address of Bruno and ACS.

    tcpdump filter:
      1. source address: device address
         destination address: ACS address
      2. source address: ACS address
         destination address: device address

    if packets on both directions are found, test succeeds.
    Otherwise, test failed.
    """
    self._SendProgress(
        '10%', 'Verifying current image version, expected: '
        + self.params['expected_version'])
    self.VerifyExpectedVersion()

    # Capture 5 packets in total (either direction), 100 bytes of each.
    packet_to_catch = '5'
    bytes_to_catch = '100'
    interface_to_catch = 'br0'
    device_addr = self.params['addr_ipv6']
    acs_addr = self.params['acs_ipv6']
    # NOTE(review): the parsing below needs hex-dump rows ('0x0000: ...') in
    # the output, but no hex option is passed here -- confirm the tcpdump on
    # the device prints them by default.
    tcpdump_command = (
        'tcpdump -i %s -c %s -s %s '
        '\'(((src %s) and (dst %s)) or ((src %s) and (dst %s)))\''
        % (interface_to_catch, packet_to_catch, bytes_to_catch,
           device_addr, acs_addr, acs_addr, device_addr))
    self._SendProgress(
        '30%', 'Calling tcpdump to capture device traffic to ACS server: '
        'capturing on interface: ' + interface_to_catch
        + ' Catch packets: ' + packet_to_catch)
    # NOTE(review): 180 is presumably the command timeout in seconds.
    self.p_ssh.SendCmd(tcpdump_command, 180)
    self._SendProgress('60%', 'Packets captured. Analyzing...')
    tcpdump_result = self.p_ssh.GetCmdOutput()
    # NOTE(review): presumably GetCmdOutput() returns the newest line first,
    # so reversing restores the order tcpdump printed -- confirm.
    tcpdump_result.reverse()
    if self.CheckACSIPv6Connection(tcpdump_result):
      self._SendProgress(
          '100%', 'TR069 traffic verified: Device connected to ACS via IPv6')
      self._SendResult(
          'Pass', 'TR069 traffic verified: Device connected to ACS via IPv6')
    else:
      self._SendResult(
          'Failed', 'TR069 traffic verified: No IPv6 connection detected.')

  def ParseDumpMsgToPackets(self, tcpdump_result):
    """Parse tcpdump dump messages information to packets.

    tcpdump result (for example, ASCII column omitted):
      14:24:36.887936 00:1a:11:30:64:e6 (oui Unknown) > a8:d0:e5:a1:e3:02
        (oui Unknown), ethertype IPv6 (0x86dd), length 86:
        0x0000:  6000 0000 0020 0640 2605 a601 fe00 fd18
        0x0010:  021a 11ff fe30 64e6 2001 4860 8005 0000
        0x0020:  0000 0000 0000 008d e2e8 01bb 73e4 0dc8
      14:24:36.912863 a8:d0:e5:a1:e3:02 (oui Unknown) > 00:1a:11:30:64:e6
        (oui Unknown), ethertype IPv6 (0x86dd), length 86:
        0x0000:  6000 0000 0020 0634 2001 4860 8005 0000
        0x0010:  0000 0000 0000 008d 2605 a601 fe00 fd18
        0x0020:  021a 11ff fe30 64e6 01bb e2e8 a1bf d109

    A line that begins with a time stamp starts a new packet; every
    following line up to the next time stamp belongs to that packet. Lines
    in front of the first time stamp are ignored. The last packet of the
    dump is included, so the example above yields two packets.

    Args:
      tcpdump_result: raw dump message from tcpdump (list of lines)
    Returns:
      packet_list: list of packets, each one a list of lines
    """
    packet_list = []
    packet = None
    for line in tcpdump_result:
      if self._PACKET_START_RE.search(line):
        # A time stamp closes the packet collected so far, if any.
        if packet is not None:
          packet_list.append(packet)
        packet = [line]
      elif packet is not None:
        packet.append(line)
    if packet is not None:
      packet_list.append(packet)
    return packet_list

  def _HexRowGroups(self, line, offset):
    """Return the eight hex groups of the dump row labelled offset, or None."""
    m = re.search(
        r'^\s*' + offset + r':\s*' + self._HEX_GROUPS + r'\s*.*', line)
    if m:
      return m.groups()
    return None

  def _FindAddrFromTcpdump(self, packet, first_offset, second_offset):
    """Rebuild an IPv6 address that spans two consecutive hex-dump rows.

    The address consists of the last four groups of the first row labelled
    first_offset followed by the first four groups of the line right after
    it, which has to be the row labelled second_offset.

    Args:
      packet: list of lines of one packet
      first_offset: row label holding the first half, e.g. '0x0000'
      second_offset: row label holding the second half, e.g. '0x0010'
    Returns:
      the address as eight colon-separated groups, or None if the two rows
      are not both present.
    """
    for index, line in enumerate(packet):
      head = self._HexRowGroups(line, first_offset)
      if head is None:
        continue
      if index + 1 >= len(packet):
        return None
      tail = self._HexRowGroups(packet[index + 1], second_offset)
      if tail is None:
        return None
      return ':'.join(head[4:] + tail[:4])
    return None

  def FindSrcAddrFromTcpdump(self, packet):
    """Parse source IPv6 address from tcpdump message.

    Args:
      packet: tcpdump dump message (list of lines of one packet)
    Returns:
      source address in packet dump message, or None if the rows 0x0000 and
      0x0010 were not found.
    """
    return self._FindAddrFromTcpdump(packet, '0x0000', '0x0010')

  def FindDstAddrFromTcpdump(self, packet):
    """Parse destination IPv6 address from tcpdump message.

    Args:
      packet: tcpdump dump message (list of lines of one packet)
    Returns:
      destination address in packet dump message, or None if the rows 0x0010
      and 0x0020 were not found.
    """
    return self._FindAddrFromTcpdump(packet, '0x0010', '0x0020')

  def CheckACSIPv6Connection(self, tcpdump_result):
    """Examine captured packets, find IPv6 communication between ACS and device.

    Packets whose source or destination address cannot be parsed are skipped.

    Args:
      tcpdump_result: tcpdump result: captured packets message dump
    Returns:
      True: if bi-directional communication found from packet list.
      False: otherwise
    """
    device_addr = IP(self.params['addr_ipv6'])
    acs_addr = IP(self.params['acs_ipv6'])
    dev_to_acs = False
    acs_to_dev = False
    for packet in self.ParseDumpMsgToPackets(tcpdump_result):
      src_text = self.FindSrcAddrFromTcpdump(packet)
      dst_text = self.FindDstAddrFromTcpdump(packet)
      if src_text is None or dst_text is None:
        continue
      src_addr = IP(src_text)
      dst_addr = IP(dst_text)
      if src_addr == device_addr and dst_addr == acs_addr:
        dev_to_acs = True
      if dst_addr == device_addr and src_addr == acs_addr:
        acs_to_dev = True
    return dev_to_acs and acs_to_dev

  def _ParseLsDate(self, line):
    """Return the 'YYYY-MM-DD HH:MM' part of an 'ls -l' line, or None."""
    m = self._LS_DATE_RE.search(line)
    if m:
      return m.group(1)
    return None

  def VerifyFactoryReset(self):
    """Test case: 11721375, Verify the factory reset function of device.

    Verify factory reset according to the following conditions:
      - version
      - address
      - cat /etc/ntpd.conf
      - cat /etc/resolv.conf
      - cat /tmp/gvsbhost
      - ls /tmp/gvsb*
      - cat /tmp/cwmp/acs_url
      - ls -l /var/media
      - ls -l /rw/sage

    This test will need to manually reset the device using reset button.
    """
    test_failed = False
    self._SendProgress(
        '10%', 'Verifying current image version, expected: '
        + self.params['expected_version'])
    self.VerifyExpectedVersion()

    # Remember the date of Wiz.bin before the reset so that it can be
    # compared with the date of the file found after the reset.
    wiz_bin_date = None
    self.p_ssh.SendCmd('ls -l /rw/sage')
    for line in self.p_ssh.GetCmdOutput():
      if 'Wiz.bin' in line:
        parsed_date = self._ParseLsDate(line)
        if parsed_date is not None:
          wiz_bin_date = parsed_date

    self._SendProgress(
        '20%', 'Please reset the device. '
        'Hold down reset button for 5 seconds...')
    self._SendProgress(
        '30%', 'Wait ' + str(self.__delay_5_min)
        + ' for device to reset and reboot...')
    time.sleep(self.__delay_5_min)

    self._SendProgress('40%', 'Connecting to device...')
    self.SshTunnel()

    addr = self.bruno.GetIPv4Address()
    if not addr:
      self._SendError('critical', 'Error! No IPv4 address found!')
      test_failed = True
    else:
      self._SendProgress(
          '50%', 'Checking device IPv4 address succeed: IPv4: ' + addr)

    # A missing IPv6 address is only a warning; it does not fail the test.
    addr_v6 = self.bruno.GetIPv6Address()
    if not addr_v6:
      self._SendError('warning', 'No IPv6 address found!')
    else:
      self._SendProgress(
          '55%', 'Checking device IPv6 address succeed: IPv6: ' + addr_v6[0])

    if self.bruno.GetProductClass() == 'GFMS100':
      # fat bruno, check Wiz.bin file:
      found_file = False
      self.p_ssh.SendCmd('ls -l /rw/sage')
      for line in self.p_ssh.GetCmdOutput():
        if 'Wiz.bin' not in line:
          continue
        found_file = True
        new_date = self._ParseLsDate(line)
        if new_date is None:
          # Without a date the file cannot be shown to be a new one.
          test_failed = True
          self._SendError(
              'warning', 'Wiz.bin found but its date could not be parsed: '
              + line)
        elif new_date == wiz_bin_date:
          # if file is not updated
          test_failed = True
          self._SendError(
              'warning', 'Wiz.bin not updated: '
              'file date before reset: ' + wiz_bin_date
              + ' new file date: ' + new_date)
        else:
          self._SendProgress(
              '60%', 'Checking Wiz.bin file succeed: new file created at: '
              + new_date)
        break
      if not found_file:
        test_failed = True
        self._SendError('critical', 'No Wiz.bin file found after reset!')

    # verify ntpd.conf
    ts = self.bruno.GetTimeServer()
    if not ts:
      test_failed = True
      self._SendError(
          'critical', 'No time server found in file /etc/ntpd.conf!')
    else:
      self._SendProgress(
          '65%', 'Checking time server (ntpd.conf) succeed: Timeserver: '
          + ts)

    # verify resolv.conf
    dns = self.bruno.GetDNSServer()
    if not dns:
      test_failed = True
      self._SendError(
          'critical', 'No DNS server found in file /etc/resolv.conf!')
    else:
      self._SendProgress(
          '70%', 'Checking DNS server (resolv.conf) succeed: DNS: ' + dns)

    # check gvsb (a missing host is reported as a failure, not a crash):
    gvsb = (self.bruno.GetGVSBHost() or '').strip()
    if not gvsb or (gvsb != self.params['gvsb_host']):
      test_failed = True
      self._SendError(
          'critical', 'GVSB host not found or not matching expected host!')
    else:
      self._SendProgress(
          '75%', 'Checking GVSB server (gvsbhost) succeed: URL: ' + gvsb)

    # check gvsb files:
    self.bruno.dev_ssh.SendCmd('ls /tmp/gvsb*')
    gvsb_channel = False
    gvsb_kick = False
    gvsb_host = False
    for line in self.bruno.dev_ssh.GetCmdOutput():
      if 'gvsbhost' in line:
        gvsb_host = True
      elif 'gvsbchannel' in line:
        gvsb_channel = True
      elif 'gvsbkick' in line:
        gvsb_kick = True
    if not (gvsb_channel and gvsb_kick and gvsb_host):
      test_failed = True
      self._SendError(
          'critical', 'GVSB files missing! Check /tmp/for gvsb files')
    else:
      self._SendProgress(
          '80%', 'Checking GVSB files (/tmp/gvsb*) succeed. Files found: '
          'gvsbhost, gvsbchannel, gvsbkick.')

    # verify acs url:
    if self.bruno.GetACSUrl() != self.params['acs_url']:
      test_failed = True
      self._SendError(
          'critical', 'ACS URL is not expected! Expected URL: '
          + self.params['acs_url'])
    else:
      self._SendProgress(
          '85%', 'Checking ACS URL (/tmp/cwmp/acs_url) succeed. URL: '
          + self.params['acs_url'])

    # verify sage properties
    self.bruno.dev_ssh.SendCmd('ls -l /rw/sage')
    sage_prop = False
    sage_prop_backup = False
    for line in self.bruno.dev_ssh.GetCmdOutput():
      # The backup name contains the plain name, so it is tested first.
      if 'Sage.properties.autobackup' in line:
        sage_prop_backup = True
      elif 'Sage.properties' in line:
        sage_prop = True
    if not (sage_prop and sage_prop_backup):
      test_failed = True
      self._SendError('critical', 'Missing file in /rw/sage folder.')
    else:
      self._SendProgress(
          '90%', 'Checking sage properties files (/rw/sage/) succeed.')

    if test_failed:
      self._SendResult(
          'Failed', 'Device configuration not '
          'restored completely after factory reset!')
    else:
      self._SendResult(
          'Pass', 'Device configuration '
          'successfully restored after factory reset.')
      self._SendProgress(
          '100%', 'Device configuration '
          'successfully restored after factory reset.')

  def VerifyDefaultACSURL(self):
    """Test case: 14319146. This method verifies default ACS URL on device."""
    # this test needs to disable the Ethernet port, therefore can not
    # be automated via ssh tunnel
    print('pass this test case')

  def Run(self):
    """Run the test case selected by self.test_info['testID'].

    Initializes the device objects, opens the ssh session and then calls the
    method registered for the test ID. Recognized IDs without an
    implementation and unknown IDs only print a message.
    """
    ####### Add your code here -- BEGIN #######
    print('Test Started...')

    self.Init()
    time.sleep(2)  # time delay between commands issued to Device
    self.SshTunnel()
    self._SendProgress(
        '5%', 'SSH session to device successfully established!')

    test_id = self.test_info['testID']
    if test_id in self._IMPLEMENTED_TESTS:
      getattr(self, self._IMPLEMENTED_TESTS[test_id])()
    elif test_id in self._PLACEHOLDER_TESTS:
      print('run test')
    else:
      print('do nothing...')

    print('Test Completed...')
    ####### Add your code here -- END #######

  def Destructor(self):
    """This is the destructor function to call for the user extended class."""
    # Nothing to close when the ssh session was never created.
    if self.p_ssh:
      self.p_ssh.close()
| |
if __name__ == '__main__':
  # IDs of all test cases this file knows about.
  test_defined = [
      '11865129', '11721375', '14319146', '11721371',
      '14312009', '14186028', '14312010', '14247012', '14186029',
      '14312011', '14293012', '14165037', '14315010', '14247013',
      '14312012', '14293013', '14311008', '14293014', '14311009',
      '14186030', '14319148', '14266497', '14312013', '11874098',
      '14315007', '14247014', '14293015', '14293016', '11865130',
      '11722370', '11874103', '11843139',
  ]
  # Only the test cases selected in the config file are executed; IDs that
  # are not defined in this file are skipped.
  test_to_execute = testCase.TestCase.FindTestCaseToExecute('config.cfg')
  for test_id in test_to_execute:
    if test_id not in test_defined:
      continue
    # NOTE(review): presumably constructing the test case also runs it --
    # confirm against testCase.TestCase.
    test = BasicTR069Test(testID=test_id)