blob: 22897383766a1f534c45f5cc425919a9908ba68c [file] [log] [blame]
#!/usr/bin/python
# Copyright 2012 Google Inc. All Rights Reserved.
"""This class Extends the base class TestCase."""
__author__ = 'Lin Xue (linxue@google.com)'
"""This class extends the TestCase Class.
This file is for test cases of Bruno networking features.
The networking features test cases include following:
testID 11874104
Verify Bruno can obtain IPv4 address via RG's DHCP service
testID 11792168
Verify different IPv4 subnet (/24, /27)
testID 11724358
Verify DHCP address renewal after its timeout
testID 11792169
Verify DNS information
testID 11874105
Verify NTP service
testID 11721380
Verify TCP/UDP, ICMP Traffic Pass Through
testID 11722376
Verify Bruno IPv6 address
testID 11723377
Verify IPv6 SLAAC function over Ethernet/MoCA
testID 11724362
Verify IGMP/Multicast traffic Pass Through
"""
import os
import re
import time
import pexpect
import device
import ip
import ssh
import testCase
class NetworkingTest(testCase.TestCase):
"""Class for test cases of Bruno networking features.
Configuration Parameters:
testID: 11874104/11792168/11724358/11792169
/11874105/11721380/11722376/11724362/etc.
addr = Bruno's IP address
user = root
pwd = google
bruno_prompt = gfibertv#
file = cmdOutput.txt
title = Networking
"""
def Init(self):
"""Initialize the networking feature tests."""
# Get all device information.
self.bruno = device.Bruno(user=self.params['user'],
addr=self.params['addr'],
pwd=self.params['pwd'],
bruno_prompt=self.params['bruno_prompt'],
addr_ipv6=self.params['addr_ipv6'])
# Pass logging instance to device
self.bruno.SetLogging(self.log)
return
def SshTunnel(self):
"""Create ssh tunnel for the test."""
self.p_ssh = ssh.SSH(self.bruno)
if self.p_ssh.Ssh() < 0:
info = self.log.CreateErrorInfo('critical',
'Cannot establish ssh tunnel to Device! '
'Exit!')
self.log.SendLine(None, info)
os.sys.exit(-1)
else:
# Set bruno device ssh instance
self.bruno.dev_ssh = self.p_ssh
info = self.log.CreateProgressInfo('---',
'ssh session to Device '
'successfully established!')
self.log.SendLine(None, info)
return
def VerifyIP(self):
"""Test Bruno IP address.
testID: 11874104
"""
# Use ifconfig to check interface IP address
self.p_ssh.SendCmd(r'ifconfig br0')
line = self.p_ssh.GetCmdOutput(3)[1]
# Get Bruno IP address
address = ip.IPADDR(line).IsLikeIpv4Address()
if address is not None:
ip.IPADDR(address).IsValidIpv4Address()
info = self.log.CreateProgressInfo('50% ',
'Verify IP: Bruno has IP address '
+ address)
self.log.SendLine(self.test_info, info)
else:
info = self.log.CreateErrorInfo('critical',
'Error! Bruno does not have IP address!'
' Exit!')
self.log.SendLine(self.test_info, info)
os.sys.exit(-1)
# Get Bruno netmask
netmask = re.search(r'mask [0-9]+(?:\.[0-9]+){3}', line).group()
address = ip.IPADDR(netmask).IsLikeIpv4Address()
if address is not None:
ip.IPADDR(address).IsValidIpv4Address()
info = self.log.CreateProgressInfo('100% ',
'Verify IP: Bruno has netmask: '
+ address)
self.log.SendLine(self.test_info, info)
else:
info = self.log.CreateErrorInfo('critical',
'Error! Bruno does not have netmask!'
' Exit!')
self.log.SendLine(self.test_info, info)
os.sys.exit(-1)
return
def VerifyIPv6(self):
"""Test Bruno IPv6 address.
testID: 11722376
"""
# Use ip addr to check interface IPv6 address
self.p_ssh.SendCmd(r'ip addr show dev br0')
for line in reversed(self.p_ssh.p_ssh.before.splitlines()[-10:]):
ipv6_addr = re.search('inet6.*dynamic', line)
# Get Bruno IPv6 address
if ipv6_addr is not None:
info = self.log.CreateProgressInfo('100% ',
'Verify IPv6: '
'Bruno has IPv6 address: '
+ ipv6_addr.group())
self.log.SendLine(self.test_info, info)
break
else:
info = self.log.CreateErrorInfo('intermediate',
'Wanring! '
'Bruno did not have IPv6 address')
self.log.SendLine(self.test_info, info)
return
def VerifyDHCP(self):
"""Test Bruno communication with RG through DHCP service.
testID 11724358
"""
# check DHCP communications (DHCP REQUEST, DHCP ACK) between Bruno and RG
self.p_ssh.SendCmd(r'dmesg | grep DHCPREQUEST')
dhcp_request = self.p_ssh.GetCmdOutput(3)[1]
self.p_ssh.SendCmd(r'dmesg | grep DHCPACK')
dhcp_ack = self.p_ssh.GetCmdOutput(3)[1]
# check DHCP REQUEST
dhcp_request_time = re.search(r'[0-9]+.[0-9]+', dhcp_request)
address = ip.IPADDR(dhcp_request).IsLikeIpv4Address()
if address is not None:
ip.IPADDR(address).IsValidIpv4Address()
info = self.log.CreateProgressInfo('20% ',
'Verify DHCP: at time '
+ dhcp_request_time.group() +
', Bruno sent DHCP REQUEST to '
+ address)
self.log.SendLine(self.test_info, info)
else:
info = self.log.CreateErrorInfo('critical',
'Error! Bruno did not send DHCP requst!'
' Exit!')
self.log.SendLine(self.test_info, info)
os.sys.exit(-1)
# check DHCP ACK
dhcp_ack_time = re.search(r'[0-9]+.[0-9]+', dhcp_ack)
address = ip.IPADDR(dhcp_ack).IsLikeIpv4Address()
if address is not None:
ip.IPADDR(address).IsValidIpv4Address()
info = self.log.CreateProgressInfo('40% ',
'Verify DHCP: at time '
+ dhcp_ack_time.group() +
', Bruno got DHCP ACK from '
+ address)
self.log.SendLine(self.test_info, info)
else:
info = self.log.CreateErrorInfo('critical',
'Error! Bruno did not get DHCP ACK!'
' Exit!')
self.log.SendLine(self.test_info, info)
os.sys.exit(-1)
# check DHCP renewal time
self.p_ssh.SendCmd(r'dmesg | grep renewal')
line = self.p_ssh.GetCmdOutput(3)[1]
renewtime = re.search(r'[0-9]+ seconds', line)
if renewtime is not None:
info = self.log.CreateProgressInfo('60% ',
'Verify DHCP: Bruno DHCP service '
'renewal time is '
+ renewtime.group())
self.log.SendLine(self.test_info, info)
else:
info = self.log.CreateErrorInfo('critical',
'Error! Bruno does not have DHCP '
'renewal time! ')
self.log.SendLine(self.test_info, info)
os.sys.exit(-1)
# DHCP ACK time before renewal
dhcp_before_renew = dhcp_ack_time.group()
# check if Bruno can renew DHCP after renew time
renew_time = re.search(r'[0-9]+', renewtime.group()).group()
info = self.log.CreateProgressInfo('80% ',
'Verify DHCP: Wait '
+ renew_time +
' seconds and see if Bruno can get'
' DHCP renew... ')
self.log.SendLine(self.test_info, info)
# sleep for renew_time seconds
time.sleep(int(renew_time))
self.p_ssh.SendCmd(r'dmesg | grep DHCPACK')
line = self.p_ssh.GetCmdOutput(3)[1]
# DHCP ACK time after renewal
dhcp_after_renew = re.search(r'[0-9]+.[0-9]+', line)
if dhcp_after_renew is not None:
dhcp_after_renew = dhcp_after_renew.group()
address = ip.IPADDR(line).IsLikeIpv4Address()
# if DHCP ACK before renewal and DHCP ACK after renewal are different,
# then Bruno got DHCP renewed
if address is not None and dhcp_after_renew != dhcp_before_renew:
ip.IPADDR(address).IsValidIpv4Address()
info = self.log.CreateProgressInfo('100% ',
'Verify DHCP: Bruno got '
'DHCP renewal, at '
+ dhcp_after_renew +
' seconds from'
+ address)
self.log.SendLine(self.test_info, info)
else:
info = self.log.CreateErrorInfo('critical',
'Error! Bruno did not get DHCP renewal! '
'Exit!')
self.log.SendLine(self.test_info, info)
os.sys.exit(-1)
return
def VerifyDNS(self):
"""Check Bruno DNS server information.
This function observes the DNS messages exchanged between RG and Bruno
testID:11792169
"""
# Check DNS server information on Bruno
self.p_ssh.SendCmd(r'dmesg | grep dns')
lines = self.p_ssh.GetCmdOutput(4)
for line in lines:
dns4 = re.search(r'dns4.*', line)
if dns4 is not None:
info = self.log.CreateProgressInfo('100% ',
'Bruno has DNS server '
+ dns4.group())
self.log.SendLine(self.test_info, info)
break
else:
info = self.log.CreateErrorInfo('critical',
'Error! Bruno does not have DNS server! '
'Exit!')
self.log.SendLine(self.test_info, info)
os.sys.exit(-1)
return
def VerifyNTP(self):
"""Check Bruno NTP information.
This function observes the NTP messages exchanged between RG and Bruno
testID:11874105
"""
# Check NTP server information on Bruno
self.p_ssh.SendCmd(r'dmesg | grep ntpd')
for line in reversed(self.p_ssh.p_ssh.before.splitlines()[-10:]):
sync = re.search(r'reply from .*', line)
if sync is not None:
info = self.log.CreateProgressInfo('100% ',
'Bruno NTP is synchronised. '
'NTP info: '
+ sync.group())
self.log.SendLine(self.test_info, info)
break
else:
info = self.log.CreateErrorInfo('intermediate',
'Warning! Bruno\'s NTP service has not '
'been synchronised yet.')
self.log.SendLine(self.test_info, info)
return
def TcpdumpCmd(self, cmd='', pattern='', testcase='', traffic='', timeout=0):
"""This function send TCPDUMP command to Bruno prompt
Then it gets back the TCPDUMP results
If expected traffic is found, send success log
If no expected results, or TIMEOUT, or EOF, send Warning or Error
Args:
cmd:
The TCPDUMP command which will be sent to detect traffic
pattern:
The search pattern in the results
For example, for IPv6 traffic results, it is IPv6
testcase:
The test case we are running now
traffic:
The traffic we are meausuring in one test case
timeout:
Maximun wait time for the traffic
"""
# Send tcpdump command
self.p_ssh.p_ssh.sendline(cmd)
# Wait timeout seconds to see if there is any result
info = self.log.CreateProgressInfo('50%',
'Verify ' + testcase +': Wait for '
+ str(timeout) + ' seconds'
' for ' + traffic + ' traffic...')
self.log.SendLine(self.test_info, info)
i = self.p_ssh.p_ssh.expect([self.bruno.dev_info['bruno_prompt'],
pexpect.EOF, pexpect.TIMEOUT], timeout)
if i == 0:
# Got the tcpdump return results
for result in self.p_ssh.p_ssh.before.splitlines()[-20:]:
ra = re.search(pattern, result)
if ra is not None:
info = self.log.CreateProgressInfo('100%',
'Verify ' + testcase +
': Bruno got '
+ traffic +
' traffic come through!')
self.log.SendLine(self.test_info, info)
break
else:
# Did not find the specific protocol traffic in the tcpdump results
info = self.log.CreateErrorInfo('intermediate',
'Warning! Bruno does not have '
+ traffic + ' traffic come through!')
self.log.SendLine(self.test_info, info)
else:
# TIMEOUT or EOF
info = self.log.CreateErrorInfo('intermediate',
'Warning! TIMEOUT, Bruno does not have '
+ traffic + ' traffic come through!')
self.log.SendLine(self.test_info, info)
# If it is timeout or EOF, ctl + c to end the command
self.p_ssh.p_ssh.sendline('\x03')
# Get rid of the command prompt for next protocol command
self.p_ssh.p_ssh.expect(self.bruno.dev_info['bruno_prompt'])
self.p_ssh.p_ssh.expect(self.bruno.dev_info['bruno_prompt'])
return
def VerifyTCPUDPICMP(self):
"""Verify TCP/UDP/ICMP traffic can go through Bruno
testID: 11721380
thress steps:
1. check if TCP can come through
2. check if UDP can come through
3. check if ICMP can come through
Note: need to make sure traffic is generated first
no need to manually generate traffic
need to add Spirient automation traffic generation
"""
# Check TCP traffic
# Use tcpdump to check on br0 for TCP packets
self.TcpdumpCmd('tcpdump -i br0 -c 1 tcp', 'IP.*ack',
'TCPUDPICMP', 'TCP', 10)
# Check UDP traffic
# Use tcpdump to check on br0 for UDP packets
self.TcpdumpCmd('tcpdump -i br0 -c 1 udp', 'UDP',
'TCPUDPICMP', 'UDP', 10)
# Check ICMP traffic
# Use tcpdump to check on br0 for ICMP packets
self.TcpdumpCmd('tcpdump -i br0 -c 1 icmp', 'ICMP',
'TCPUDPICMP', 'ICMP', 10)
return
def VerifySLAAC(self):
"""Check Bruno IPv6 SLAAC function over Ethernet/MoCA.
testID:11723377
Two steps:
1. check if Router Advertisement can come through
2. check if IPv6 packages can come through
"""
# Check Router Advertisement message
# Router Advertisement message is in Neighbor Discovery ICMPv6 message
# Type Field Value 134
# Send tcpdump command to check Router Advertisement message
self.TcpdumpCmd('tcpdump -s0 -n -c 1 ip6[40+0]==134', 'IPv6',
'SLACC', 'Router Advertisement message', 30)
# Send tcpdump command to check IPv6 traffic
self.TcpdumpCmd('tcpdump -s0 -n -c 1 ip6', 'IPv6',
'SLACC', 'IPv6', 30)
def VerifyIgmpMulticast(self):
"""Check Bruno IGMP/Multicast traffic pass through.
testID:11724362
Two steps:
1. check if IGMP can come through
2. check if Multicast can come through
"""
# Send tcpdump command to check IGMP traffic
self.TcpdumpCmd('tcpdump -c 1 igmp', 'igmp',
'IGMPMULTICAST', 'IGMP', 30)
# Send tcpdump command to check Multicast traffic
# Bruno multicast address is 255.0.0.0/8
self.TcpdumpCmd('tcpdump -c 1 ip net 225.0.0.0/8', 'UDP',
'IGMPMULTICAST', 'Multicast 225.0.0.0/8', 30)
return
def Run(self):
"""Run the test case."""
####### Add your code here -- BEGIN #######
print 'Test Started...'
self.Init()
time.sleep(2) # time delay between commands issued to Device
self.SshTunnel()
print 'Get dev info ...'
self.bruno.GetDevInfo()
self.VerifyIP()
self.VerifyIPv6()
self.VerifyDHCP()
self.VerifyNTP()
self.VerifyDNS()
self.VerifyTCPUDPICMP()
self.VerifySLAAC()
self.VerifyIgmpMulticast()
info = self.log.CreateResultInfo('Pass',
'Finish all networking '
'feature test cases')
self.log.SendLine(self.test_info, info)
print 'Test Completed...'
####### Add your code here -- END #######
def destructor(self):
"""This is the destructor function to call for the user extened class."""
self.p_ssh.close()
if __name__ == '__main__':
"""now do all the networking tests together."""
test = NetworkingTest(testID='11364', key_word='Networking')