| #!/usr/bin/python |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| |
| """This class Extends the base class TestCase.""" |
| |
| __author__ = 'Lin Xue (linxue@google.com)' |
| |
| """This class extends the TestCase Class. |
| This file is for test cases of Bruno networking features. |
| The networking features test cases include following: |
| |
| testID 11874104 |
| Verify Bruno can obtain IPv4 address via RG's DHCP service |
| |
| testID 11792168 |
| Verify different IPv4 subnet (/24, /27) |
| |
| testID 11724358 |
| Verify DHCP address renewal after its timeout |
| |
| testID 11792169 |
| Verify DNS information |
| |
| testID 11874105 |
| Verify NTP service |
| |
| testID 11721380 |
| Verify TCP/UDP, ICMP Traffic Pass Through |
| |
| testID 11722376 |
| Verify Bruno IPv6 address |
| |
| testID 11723377 |
| Verify IPv6 SLAAC function over Ethernet/MoCA |
| |
| testID 11724362 |
| Verify IGMP/Multicast traffic Pass Through |
| |
| """ |
| |
| import os |
| import re |
| import time |
| |
| import pexpect |
| |
| import device |
| import ip |
| import ssh |
| import testCase |
| |
| |
| class NetworkingTest(testCase.TestCase): |
| """Class for test cases of Bruno networking features. |
| |
| Configuration Parameters: |
| testID: 11874104/11792168/11724358/11792169 |
| /11874105/11721380/11722376/11724362/etc. |
| addr = Bruno's IP address |
| user = root |
| pwd = google |
| bruno_prompt = gfibertv# |
| file = cmdOutput.txt |
| title = Networking |
| """ |
| |
| def Init(self): |
| """Initialize the networking feature tests.""" |
| # Get all device information. |
| self.bruno = device.Bruno(user=self.params['user'], |
| addr=self.params['addr'], |
| pwd=self.params['pwd'], |
| bruno_prompt=self.params['bruno_prompt'], |
| addr_ipv6=self.params['addr_ipv6']) |
| |
| # Pass logging instance to device |
| self.bruno.SetLogging(self.log) |
| |
| return |
| |
| def SshTunnel(self): |
| """Create ssh tunnel for the test.""" |
| self.p_ssh = ssh.SSH(self.bruno) |
| |
| if self.p_ssh.Ssh() < 0: |
| info = self.log.CreateErrorInfo('critical', |
| 'Cannot establish ssh tunnel to Device! ' |
| 'Exit!') |
| self.log.SendLine(None, info) |
| os.sys.exit(-1) |
| |
| else: |
| # Set bruno device ssh instance |
| self.bruno.dev_ssh = self.p_ssh |
| |
| info = self.log.CreateProgressInfo('---', |
| 'ssh session to Device ' |
| 'successfully established!') |
| self.log.SendLine(None, info) |
| |
| return |
| |
| def VerifyIP(self): |
| """Test Bruno IP address. |
| testID: 11874104 |
| """ |
| # Use ifconfig to check interface IP address |
| self.p_ssh.SendCmd(r'ifconfig br0') |
| line = self.p_ssh.GetCmdOutput(3)[1] |
| |
| # Get Bruno IP address |
| address = ip.IPADDR(line).IsLikeIpv4Address() |
| if address is not None: |
| ip.IPADDR(address).IsValidIpv4Address() |
| info = self.log.CreateProgressInfo('50% ', |
| 'Verify IP: Bruno has IP address ' |
| + address) |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateErrorInfo('critical', |
| 'Error! Bruno does not have IP address!' |
| ' Exit!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(-1) |
| |
| # Get Bruno netmask |
| netmask = re.search(r'mask [0-9]+(?:\.[0-9]+){3}', line).group() |
| address = ip.IPADDR(netmask).IsLikeIpv4Address() |
| if address is not None: |
| ip.IPADDR(address).IsValidIpv4Address() |
| info = self.log.CreateProgressInfo('100% ', |
| 'Verify IP: Bruno has netmask: ' |
| + address) |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateErrorInfo('critical', |
| 'Error! Bruno does not have netmask!' |
| ' Exit!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(-1) |
| |
| return |
| |
| def VerifyIPv6(self): |
| """Test Bruno IPv6 address. |
| testID: 11722376 |
| """ |
| # Use ip addr to check interface IPv6 address |
| self.p_ssh.SendCmd(r'ip addr show dev br0') |
| |
| for line in reversed(self.p_ssh.p_ssh.before.splitlines()[-10:]): |
| ipv6_addr = re.search('inet6.*dynamic', line) |
| # Get Bruno IPv6 address |
| if ipv6_addr is not None: |
| info = self.log.CreateProgressInfo('100% ', |
| 'Verify IPv6: ' |
| 'Bruno has IPv6 address: ' |
| + ipv6_addr.group()) |
| self.log.SendLine(self.test_info, info) |
| break |
| else: |
| info = self.log.CreateErrorInfo('intermediate', |
| 'Wanring! ' |
| 'Bruno did not have IPv6 address') |
| self.log.SendLine(self.test_info, info) |
| |
| return |
| |
| def VerifyDHCP(self): |
| """Test Bruno communication with RG through DHCP service. |
| testID 11724358 |
| """ |
| # check DHCP communications (DHCP REQUEST, DHCP ACK) between Bruno and RG |
| self.p_ssh.SendCmd(r'dmesg | grep DHCPREQUEST') |
| dhcp_request = self.p_ssh.GetCmdOutput(3)[1] |
| self.p_ssh.SendCmd(r'dmesg | grep DHCPACK') |
| dhcp_ack = self.p_ssh.GetCmdOutput(3)[1] |
| |
| # check DHCP REQUEST |
| dhcp_request_time = re.search(r'[0-9]+.[0-9]+', dhcp_request) |
| address = ip.IPADDR(dhcp_request).IsLikeIpv4Address() |
| if address is not None: |
| ip.IPADDR(address).IsValidIpv4Address() |
| info = self.log.CreateProgressInfo('20% ', |
| 'Verify DHCP: at time ' |
| + dhcp_request_time.group() + |
| ', Bruno sent DHCP REQUEST to ' |
| + address) |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateErrorInfo('critical', |
| 'Error! Bruno did not send DHCP requst!' |
| ' Exit!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(-1) |
| |
| # check DHCP ACK |
| dhcp_ack_time = re.search(r'[0-9]+.[0-9]+', dhcp_ack) |
| address = ip.IPADDR(dhcp_ack).IsLikeIpv4Address() |
| if address is not None: |
| ip.IPADDR(address).IsValidIpv4Address() |
| info = self.log.CreateProgressInfo('40% ', |
| 'Verify DHCP: at time ' |
| + dhcp_ack_time.group() + |
| ', Bruno got DHCP ACK from ' |
| + address) |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateErrorInfo('critical', |
| 'Error! Bruno did not get DHCP ACK!' |
| ' Exit!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(-1) |
| |
| # check DHCP renewal time |
| self.p_ssh.SendCmd(r'dmesg | grep renewal') |
| line = self.p_ssh.GetCmdOutput(3)[1] |
| renewtime = re.search(r'[0-9]+ seconds', line) |
| if renewtime is not None: |
| info = self.log.CreateProgressInfo('60% ', |
| 'Verify DHCP: Bruno DHCP service ' |
| 'renewal time is ' |
| + renewtime.group()) |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateErrorInfo('critical', |
| 'Error! Bruno does not have DHCP ' |
| 'renewal time! ') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(-1) |
| |
| # DHCP ACK time before renewal |
| dhcp_before_renew = dhcp_ack_time.group() |
| |
| # check if Bruno can renew DHCP after renew time |
| renew_time = re.search(r'[0-9]+', renewtime.group()).group() |
| info = self.log.CreateProgressInfo('80% ', |
| 'Verify DHCP: Wait ' |
| + renew_time + |
| ' seconds and see if Bruno can get' |
| ' DHCP renew... ') |
| self.log.SendLine(self.test_info, info) |
| |
| # sleep for renew_time seconds |
| time.sleep(int(renew_time)) |
| |
| self.p_ssh.SendCmd(r'dmesg | grep DHCPACK') |
| line = self.p_ssh.GetCmdOutput(3)[1] |
| |
| # DHCP ACK time after renewal |
| dhcp_after_renew = re.search(r'[0-9]+.[0-9]+', line) |
| if dhcp_after_renew is not None: |
| dhcp_after_renew = dhcp_after_renew.group() |
| |
| address = ip.IPADDR(line).IsLikeIpv4Address() |
| # if DHCP ACK before renewal and DHCP ACK after renewal are different, |
| # then Bruno got DHCP renewed |
| if address is not None and dhcp_after_renew != dhcp_before_renew: |
| ip.IPADDR(address).IsValidIpv4Address() |
| info = self.log.CreateProgressInfo('100% ', |
| 'Verify DHCP: Bruno got ' |
| 'DHCP renewal, at ' |
| + dhcp_after_renew + |
| ' seconds from' |
| + address) |
| self.log.SendLine(self.test_info, info) |
| else: |
| info = self.log.CreateErrorInfo('critical', |
| 'Error! Bruno did not get DHCP renewal! ' |
| 'Exit!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(-1) |
| |
| return |
| |
| def VerifyDNS(self): |
| """Check Bruno DNS server information. |
| This function observes the DNS messages exchanged between RG and Bruno |
| testID:11792169 |
| """ |
| # Check DNS server information on Bruno |
| self.p_ssh.SendCmd(r'dmesg | grep dns') |
| lines = self.p_ssh.GetCmdOutput(4) |
| |
| for line in lines: |
| dns4 = re.search(r'dns4.*', line) |
| if dns4 is not None: |
| info = self.log.CreateProgressInfo('100% ', |
| 'Bruno has DNS server ' |
| + dns4.group()) |
| self.log.SendLine(self.test_info, info) |
| break |
| else: |
| info = self.log.CreateErrorInfo('critical', |
| 'Error! Bruno does not have DNS server! ' |
| 'Exit!') |
| self.log.SendLine(self.test_info, info) |
| os.sys.exit(-1) |
| |
| return |
| |
| def VerifyNTP(self): |
| """Check Bruno NTP information. |
| This function observes the NTP messages exchanged between RG and Bruno |
| testID:11874105 |
| """ |
| # Check NTP server information on Bruno |
| self.p_ssh.SendCmd(r'dmesg | grep ntpd') |
| |
| for line in reversed(self.p_ssh.p_ssh.before.splitlines()[-10:]): |
| sync = re.search(r'reply from .*', line) |
| if sync is not None: |
| info = self.log.CreateProgressInfo('100% ', |
| 'Bruno NTP is synchronised. ' |
| 'NTP info: ' |
| + sync.group()) |
| self.log.SendLine(self.test_info, info) |
| break |
| else: |
| info = self.log.CreateErrorInfo('intermediate', |
| 'Warning! Bruno\'s NTP service has not ' |
| 'been synchronised yet.') |
| self.log.SendLine(self.test_info, info) |
| |
| return |
| |
| def TcpdumpCmd(self, cmd='', pattern='', testcase='', traffic='', timeout=0): |
| """This function send TCPDUMP command to Bruno prompt |
| Then it gets back the TCPDUMP results |
| If expected traffic is found, send success log |
| If no expected results, or TIMEOUT, or EOF, send Warning or Error |
| |
| Args: |
| cmd: |
| The TCPDUMP command which will be sent to detect traffic |
| pattern: |
| The search pattern in the results |
| For example, for IPv6 traffic results, it is IPv6 |
| testcase: |
| The test case we are running now |
| traffic: |
| The traffic we are meausuring in one test case |
| timeout: |
| Maximun wait time for the traffic |
| """ |
| |
| # Send tcpdump command |
| self.p_ssh.p_ssh.sendline(cmd) |
| |
| # Wait timeout seconds to see if there is any result |
| info = self.log.CreateProgressInfo('50%', |
| 'Verify ' + testcase +': Wait for ' |
| + str(timeout) + ' seconds' |
| ' for ' + traffic + ' traffic...') |
| self.log.SendLine(self.test_info, info) |
| |
| i = self.p_ssh.p_ssh.expect([self.bruno.dev_info['bruno_prompt'], |
| pexpect.EOF, pexpect.TIMEOUT], timeout) |
| |
| if i == 0: |
| # Got the tcpdump return results |
| for result in self.p_ssh.p_ssh.before.splitlines()[-20:]: |
| ra = re.search(pattern, result) |
| if ra is not None: |
| info = self.log.CreateProgressInfo('100%', |
| 'Verify ' + testcase + |
| ': Bruno got ' |
| + traffic + |
| ' traffic come through!') |
| self.log.SendLine(self.test_info, info) |
| break |
| else: |
| # Did not find the specific protocol traffic in the tcpdump results |
| info = self.log.CreateErrorInfo('intermediate', |
| 'Warning! Bruno does not have ' |
| + traffic + ' traffic come through!') |
| self.log.SendLine(self.test_info, info) |
| |
| else: |
| # TIMEOUT or EOF |
| info = self.log.CreateErrorInfo('intermediate', |
| 'Warning! TIMEOUT, Bruno does not have ' |
| + traffic + ' traffic come through!') |
| self.log.SendLine(self.test_info, info) |
| |
| # If it is timeout or EOF, ctl + c to end the command |
| self.p_ssh.p_ssh.sendline('\x03') |
| # Get rid of the command prompt for next protocol command |
| self.p_ssh.p_ssh.expect(self.bruno.dev_info['bruno_prompt']) |
| self.p_ssh.p_ssh.expect(self.bruno.dev_info['bruno_prompt']) |
| |
| return |
| |
| def VerifyTCPUDPICMP(self): |
| """Verify TCP/UDP/ICMP traffic can go through Bruno |
| testID: 11721380 |
| thress steps: |
| 1. check if TCP can come through |
| 2. check if UDP can come through |
| 3. check if ICMP can come through |
| |
| Note: need to make sure traffic is generated first |
| no need to manually generate traffic |
| need to add Spirient automation traffic generation |
| """ |
| |
| # Check TCP traffic |
| # Use tcpdump to check on br0 for TCP packets |
| self.TcpdumpCmd('tcpdump -i br0 -c 1 tcp', 'IP.*ack', |
| 'TCPUDPICMP', 'TCP', 10) |
| |
| # Check UDP traffic |
| # Use tcpdump to check on br0 for UDP packets |
| self.TcpdumpCmd('tcpdump -i br0 -c 1 udp', 'UDP', |
| 'TCPUDPICMP', 'UDP', 10) |
| |
| # Check ICMP traffic |
| # Use tcpdump to check on br0 for ICMP packets |
| self.TcpdumpCmd('tcpdump -i br0 -c 1 icmp', 'ICMP', |
| 'TCPUDPICMP', 'ICMP', 10) |
| |
| return |
| |
| def VerifySLAAC(self): |
| """Check Bruno IPv6 SLAAC function over Ethernet/MoCA. |
| testID:11723377 |
| Two steps: |
| 1. check if Router Advertisement can come through |
| 2. check if IPv6 packages can come through |
| """ |
| |
| # Check Router Advertisement message |
| # Router Advertisement message is in Neighbor Discovery ICMPv6 message |
| # Type Field Value 134 |
| # Send tcpdump command to check Router Advertisement message |
| self.TcpdumpCmd('tcpdump -s0 -n -c 1 ip6[40+0]==134', 'IPv6', |
| 'SLACC', 'Router Advertisement message', 30) |
| |
| # Send tcpdump command to check IPv6 traffic |
| self.TcpdumpCmd('tcpdump -s0 -n -c 1 ip6', 'IPv6', |
| 'SLACC', 'IPv6', 30) |
| |
| def VerifyIgmpMulticast(self): |
| """Check Bruno IGMP/Multicast traffic pass through. |
| testID:11724362 |
| Two steps: |
| 1. check if IGMP can come through |
| 2. check if Multicast can come through |
| """ |
| |
| # Send tcpdump command to check IGMP traffic |
| self.TcpdumpCmd('tcpdump -c 1 igmp', 'igmp', |
| 'IGMPMULTICAST', 'IGMP', 30) |
| |
| # Send tcpdump command to check Multicast traffic |
| # Bruno multicast address is 255.0.0.0/8 |
| self.TcpdumpCmd('tcpdump -c 1 ip net 225.0.0.0/8', 'UDP', |
| 'IGMPMULTICAST', 'Multicast 225.0.0.0/8', 30) |
| |
| return |
| |
| def Run(self): |
| """Run the test case.""" |
| ####### Add your code here -- BEGIN ####### |
| print 'Test Started...' |
| |
| self.Init() |
| time.sleep(2) # time delay between commands issued to Device |
| self.SshTunnel() |
| |
| print 'Get dev info ...' |
| self.bruno.GetDevInfo() |
| |
| self.VerifyIP() |
| self.VerifyIPv6() |
| self.VerifyDHCP() |
| self.VerifyNTP() |
| self.VerifyDNS() |
| self.VerifyTCPUDPICMP() |
| self.VerifySLAAC() |
| self.VerifyIgmpMulticast() |
| |
| info = self.log.CreateResultInfo('Pass', |
| 'Finish all networking ' |
| 'feature test cases') |
| self.log.SendLine(self.test_info, info) |
| print 'Test Completed...' |
| ####### Add your code here -- END ####### |
| |
| def destructor(self): |
| """This is the destructor function to call for the user extened class.""" |
| self.p_ssh.close() |
| |
| if __name__ == '__main__': |
| """now do all the networking tests together.""" |
| test = NetworkingTest(testID='11364', key_word='Networking') |