| #!/usr/bin/python |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| |
| """This class Extends the base class TestCase.""" |
| |
| __author__ = 'Lin Xue (linxue@google.com)' |
| |
| """This class extends the TestCase Class. |
| This file contains the test cases for Bruno networking features. |
| The networking test cases include the following: |
| |
| testID 11874104 |
| Verify Bruno can obtain IPv4 address via RG's DHCP service |
| |
| testID 11792168 |
| Verify different IPv4 subnet (/24, /27) |
| |
| testID 11724358 |
| Verify DHCP address renewal after its timeout |
| |
| testID 11792169 |
| Verify DNS information |
| |
| testID 11874105 |
| Verify NTP service |
| |
| testID 11721380 |
| Verify TCP/UDP, ICMP Traffic Pass Through |
| |
| testID 11722376 |
| Verify Bruno IPv6 address |
| |
| testID 11724362 |
| Verify IGMP/Multicast traffic Pass Through |
| """ |
| |
| import os |
| import re |
| import time |
| |
| import pexpect |
| import testCase |
| |
| import device |
| import ip |
| |
| |
class NetworkingTest(testCase.TestCase):
    """Test cases for Bruno networking features.

    The checks cover the br0 IP address and netmask, NTP sync, DHCP
    (request, ack and renewal), DNS and TCP/UDP/ICMP pass-through.  Every
    check sends shell commands to the device through the SSH session
    created by ssh_Tunnel().

    Configuration Parameters:
      testID: 11874104/11792168/11724358/11792169
              /11874105/11721380/11722376/11724362/etc.
      addr = Bruno's IP address
      user = root
      pwd = google
      cmd_prompt = gfibertv#
      file = cmdOutput.txt
      title = Networking

    Attributes:
      verify_progress: percentage printed by checkTraffic(); it is bumped
          by 10 after every progress/result line that method logs.
      bruno: device.Bruno handle, set by get_Dev().
      p_ssh: session object returned by bruno.createSSH(), set by
          ssh_Tunnel().

    NOTE(review): self.params, self.log and self.TESTINFO are not defined
    in this class; presumably testCase.TestCase provides them -- confirm.
    """

    verify_progress = 0

    # Regex that must show up in the tcpdump output for each supported
    # protocol.  The key is also used as the tcpdump filter expression.
    _TRAFFIC_PATTERNS = {
        'tcp': 'IP.*ack',
        'udp': 'UDP',
        'icmp': 'ICMP',
    }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _reportProgress(self, percent, message):
        """Send one progress line (percent + message) to the test log."""
        info = self.log.createProgressInfo(percent, message)
        self.log.sendLine(self.TESTINFO, info)

    def _reportWarning(self, message):
        """Send one 'intermediate' error line to the test log."""
        info = self.log.createErrorInfo('intermediate', message)
        self.log.sendLine(self.TESTINFO, info)

    def _reportCritical(self, message):
        """Log a 'critical' error line, then exit with status -1.

        This never returns: it raises SystemExit, so callers can use it
        as a guard clause.
        """
        info = self.log.createErrorInfo('critical', message)
        self.log.sendLine(self.TESTINFO, info)
        # os.sys is the sys module as imported by os; kept so this block
        # does not need a new file-level import.
        os.sys.exit(-1)

    def _firstOutputLine(self, command):
        """Run command on the device and return item 1 of its output.

        Returns '' when the output has fewer than two items, so that the
        callers' regex searches simply find nothing instead of this method
        raising IndexError.

        NOTE(review): item 0 is presumably the echoed command and the
        argument 3 a timeout or line count -- confirm against the session
        class.
        """
        self.p_ssh.sendCmd(command)
        output = self.p_ssh.getCmdOutput(3)
        if output and len(output) > 1:
            return output[1]
        return ''

    @staticmethod
    def _firstDecimal(text):
        """Return the first 'digits.digits' token in text, or None.

        Used to pull the timestamp out of a dmesg line.
        """
        found = re.search(r'[0-9]+\.[0-9]+', text)
        if found is None:
            return None
        return found.group()

    # ------------------------------------------------------------------
    # Set-up
    # ------------------------------------------------------------------

    def get_Dev(self):
        """Create the device.Bruno handle from the configured parameters."""
        settings = self.params
        self.bruno = device.Bruno(user=settings['user'],
                                  addr=settings['addr'],
                                  pwd=settings['pwd'],
                                  cmd_prompt=settings['cmd_prompt'])

    def ssh_Tunnel(self):
        """Create the ssh session used by all the checks."""
        self.p_ssh = self.bruno.createSSH()

    # ------------------------------------------------------------------
    # Test cases
    # ------------------------------------------------------------------

    def verifyIP(self):
        """Test Bruno IP address and netmask on br0.

        testID: 11874104

        Exits the process (status -1) when either the address or the
        netmask cannot be found in the ifconfig output.
        """
        # Use ifconfig to check interface IP address
        line = self._firstOutputLine(r'ifconfig br0')

        # Get Bruno IP address
        address = ip.IPADDR(line).is_like_ipv4_address()
        if address is None:
            self._reportCritical('Error! Bruno does not have IP address!'
                                 ' Exit!')
        ip.IPADDR(address).is_valid_ipv4_address()
        self._reportProgress('50% ',
                             'Verify IP: Bruno has IP address ' + address)

        # Get Bruno netmask.  Accept both the "netmask 255.255.255.0" and
        # the "Mask:255.255.255.0" spellings, and do not call .group() on a
        # failed search (that used to raise AttributeError and made the
        # error branch below unreachable).
        mask_match = re.search(r'mask[: ]\s*[0-9]+(?:\.[0-9]+){3}', line,
                               re.IGNORECASE)
        netmask = None
        if mask_match is not None:
            netmask = ip.IPADDR(mask_match.group()).is_like_ipv4_address()
        if netmask is None:
            self._reportCritical('Error! Bruno does not have netmask!'
                                 ' Exit!')
        ip.IPADDR(netmask).is_valid_ipv4_address()
        self._reportProgress('100% ',
                             'Verify IP: Bruno has netmask: ' + netmask)

    def verifyDHCP(self):
        """Test Bruno communication with RG through DHCP service.

        testID 11724358

        Looks for DHCPREQUEST / DHCPACK lines and the renewal time in
        dmesg, sleeps for the renewal time, then expects a DHCPACK with a
        different timestamp.  Exits the process (status -1) on any failure.
        """
        # check DHCP communications (DHCP REQUEST, DHCP ACK) between Bruno
        # and RG
        dhcp_request = self._firstOutputLine(r'dmesg | grep DHCPREQUEST')
        dhcp_ack = self._firstOutputLine(r'dmesg | grep DHCPACK')

        # check DHCP REQUEST
        request_time = self._firstDecimal(dhcp_request)
        address = ip.IPADDR(dhcp_request).is_like_ipv4_address()
        if address is None:
            self._reportCritical('Error! Bruno did not send DHCP requst!'
                                 ' Exit!')
        ip.IPADDR(address).is_valid_ipv4_address()
        self._reportProgress('20% ',
                             'Verify DHCP: at time '
                             + (request_time or 'unknown')
                             + ', Bruno sent DHCP REQUEST to '
                             + address)

        # check DHCP ACK
        ack_time_before = self._firstDecimal(dhcp_ack)
        address = ip.IPADDR(dhcp_ack).is_like_ipv4_address()
        if address is None:
            self._reportCritical('Error! Bruno did not get DHCP ACK!'
                                 ' Exit!')
        ip.IPADDR(address).is_valid_ipv4_address()
        self._reportProgress('40% ',
                             'Verify DHCP: at time '
                             + (ack_time_before or 'unknown')
                             + ', Bruno got DHCP ACK from '
                             + address)

        # check DHCP renewal time
        line = self._firstOutputLine(r'dmesg | grep renewal')
        renewal = re.search(r'([0-9]+) seconds', line)
        if renewal is None:
            self._reportCritical('Error! Bruno does not have DHCP '
                                 'renewal time! ')
        self._reportProgress('60% ',
                             'Verify DHCP: Bruno DHCP service '
                             'renewal time is '
                             + renewal.group())

        # check if Bruno can renew DHCP after renew time
        renew_seconds = renewal.group(1)
        self._reportProgress('80% ',
                             'Verify DHCP: Wait ' + renew_seconds +
                             ' seconds and see if Bruno can get'
                             ' DHCP renew... ')

        # sleep for renew_seconds seconds
        time.sleep(int(renew_seconds))

        # NOTE(review): this reads the same output position as the first
        # DHCPACK lookup above; if grep lists the oldest match first the
        # timestamp may never change -- confirm what getCmdOutput returns.
        line = self._firstOutputLine(r'dmesg | grep DHCPACK')

        # DHCP ACK time after renewal
        ack_time_after = self._firstDecimal(line)
        address = ip.IPADDR(line).is_like_ipv4_address()

        # if DHCP ACK before renewal and DHCP ACK after renewal are
        # different, then Bruno got DHCP renewed.  Both timestamps must be
        # known for the comparison to mean anything.
        renewed = (address is not None
                   and ack_time_before is not None
                   and ack_time_after is not None
                   and ack_time_after != ack_time_before)
        if not renewed:
            self._reportCritical('Error! Bruno did not get DHCP renewal! '
                                 'Exit!')
        ip.IPADDR(address).is_valid_ipv4_address()
        self._reportProgress('100% ',
                             'Verify DHCP: Bruno got DHCP renewal,'
                             ' at ' + ack_time_after +
                             ' seconds from ' + address)

    def verifyDNS(self):
        """Check Bruno DNS server information.

        testID:11792169

        Reports the first IPv4-looking address found in /etc/resolv.conf;
        exits the process (status -1) when no line contains one.
        """
        # Check DNS server information on Bruno
        self.p_ssh.sendCmd(r'cat /etc/resolv.conf')
        lines = self.p_ssh.getCmdOutput(3)

        for line in lines or []:
            address = ip.IPADDR(line).is_like_ipv4_address()
            if address is None:
                continue
            ip.IPADDR(address).is_valid_ipv4_address()
            self._reportProgress('100% ', 'Bruno has DNS server ' + address)
            return

        # No line of resolv.conf carried an address.
        self._reportCritical('Error! Bruno does not have DNS server! '
                             'Exit!')

    def verifyNTP(self):
        """Check Bruno NTP information.

        testID:11874105

        Only logs a warning (does not exit) when dmesg has no 'synced'
        line.
        """
        # Check NTP sync information on Bruno
        line = self._firstOutputLine(r'dmesg | grep synced')

        # Test the search result itself: calling .group() on a failed
        # search raised AttributeError, so the warning was never logged.
        if re.search(r'synced', line) is not None:
            self._reportProgress('100% ',
                                 'Bruno\'s NTP service is synchronised')
        else:
            self._reportWarning('Warning! Bruno\'s NTP service has not '
                                'been synchronised yet.')

    def checkTraffic(self, protocol='', isMOCA=0):
        """Check TCP/UDP/ICMP traffic on one Bruno interface with tcpdump.

        Called from verifyTCPUDPICMP().

        Args:
          protocol:
            protocol whose traffic is looked for
              tcp: to check tcp traffic
              udp: to check udp traffic
              icmp: to check icmp traffic
            Any other value only logs a warning and returns.
          isMOCA:
            interface to capture on
              0: Ethernet interface eth0
              anything else: MOCA interface eth1
        """
        # expect_pattern is the regex searched for in the tcpdump output;
        # the protocol name itself is the tcpdump filter.
        expect_pattern = self._TRAFFIC_PATTERNS.get(protocol)
        if expect_pattern is None:
            # Now support TCP/UDP/ICMP.  Return here: falling through used
            # to fail on the unset tcpdump filter / pattern variables.
            self._reportWarning('Warning! Protocol not supported ')
            return

        # interface_cmd is the interface string for tcpdump,
        # interface_output is the interface string for the log
        if isMOCA == 0:
            interface_cmd = 'eth0'
            interface_output = 'Ethernet port eth0'
        else:
            interface_cmd = 'eth1'
            interface_output = 'MOCA port eth1'

        # The "generate traffic" line is only logged once per protocol, on
        # the Ethernet pass; progress is bumped after each logged line.
        if isMOCA == 0:
            self._reportProgress(str(self.verify_progress) + '%',
                                 'Verify ' + protocol +
                                 ': Generate ' + protocol +
                                 ' traffic ... now manual, '
                                 'need Spirient traffic generator ')
            self.verify_progress += 10

        session = self.p_ssh.p_ssh
        prompt = self.bruno.devInfo['cmd_prompt']
        went_through = (protocol + ' traffic went through '
                        + interface_output)

        # Send tcpdump command to prompt and wait up to 5 seconds for the
        # prompt to come back.
        session.sendline('tcpdump -i ' + interface_cmd + ' -c 10 '
                         + protocol)
        outcome = session.expect([prompt, pexpect.EOF, pexpect.TIMEOUT], 5)

        if outcome == 0:
            # Got the tcpdump results: look at the last 10 lines only.
            tail = session.before.splitlines()[-10:]
            if any(re.search(expect_pattern, row) for row in tail):
                # Found the specific protocol traffic
                self._reportProgress(str(self.verify_progress) + '%',
                                     'Verify ' + protocol +
                                     ': Bruno got ' + went_through)
            else:
                # Did not find the specific protocol traffic
                self._reportWarning('Warning! packet not found '
                                    'Bruno does not have ' + went_through)
            self.verify_progress += 10
        else:
            # Got TIMEOUT or EOF, no results got from tcpdump
            self._reportWarning('Warning! TIMEOUT or EOF '
                                'Bruno does not have ' + went_through)
            self.verify_progress += 10

            # ctl + c to end the command
            session.sendline('\x03')
            # Get rid of the command prompts for next protocol command
            session.expect(prompt)
            session.expect(prompt)

    def verifyTCPUDPICMP(self):
        """Verify TCP/UDP/ICMP traffic can go through Bruno.

        testID: 11721380

        For each protocol the ethernet port (eth0) is checked first, then
        the MOCA port (eth1).
        """
        for protocol in ('tcp', 'udp', 'icmp'):
            for is_moca in (0, 1):
                self.checkTraffic(protocol, is_moca)

    def run(self):
        """Run the test case: set up the device, then run every check."""
        ####### Add your code here -- BEGIN #######
        print('Test Started...')

        self.get_Dev()
        time.sleep(2)  # time delay between commands issued to Device

        self.ssh_Tunnel()

        print('Get dev info ...')
        self.bruno.getDevInfo()

        self.verifyIP()
        self.verifyNTP()
        self.verifyDHCP()
        self.verifyDNS()
        self.verifyTCPUDPICMP()

        info = self.log.createResultInfo('Pass',
                                         'Finish all networking '
                                         'feature test cases')
        self.log.sendLine(self.TESTINFO, info)
        print('Test Completed...')
        ####### Add your code here -- END #######

    def destructor(self):
        """Destructor hook for the user extended class: close the session.

        Does nothing when ssh_Tunnel() never ran, so a failure during
        set-up is not masked by an AttributeError here.
        """
        session = getattr(self, 'p_ssh', None)
        if session is not None:
            session.close()
| |
if __name__ == "__main__":
    # Script entry point: build the NetworkingTest instance for the
    # combined networking run.
    # NOTE(review): nothing else is called here, so the base-class
    # constructor presumably drives run() -- confirm in testCase.TestCase.
    test = NetworkingTest(testID='11364', key_word='Networking')