blob: ce40ae105dac81acc25bd592b0c4c48f0caa7bbc [file] [log] [blame]
#!/usr/bin/python
# Copyright 2012 Google Inc. All Rights Reserved.
"""This class Extends the base class TestCase."""
__author__ = 'Lin Xue (linxue@google.com)'
"""This class extends the TestCase Class.
This file is for test cases of Bruno networking features.
The networking features test cases include following:
testID 11874104
Verify Bruno can obtain IPv4 address via RG's DHCP service
testID 11792168
Verify different IPv4 subnet (/24, /27)
testID 11724358
Verify DHCP address renewal after its timeout
testID 11792169
Verify DNS information
testID 11874105
Verify NTP service
testID 11721380
Verify TCP/UDP, ICMP Traffic Pass Through
testID 11722376
Verify Bruno IPv6 address
testID 11724362
Verify IGMP/Multicast traffic Pass Through
"""
import os
import re
import time
import pexpect
import testCase
import device
import ip
class NetworkingTest(testCase.TestCase):
""" Class for test cases of Bruno networking features
Configuration Parameters:
testID: 11874104/11792168/11724358/11792169
/11874105/11721380/11722376/11724362/etc.
addr = Bruno's IP address
user = root
pwd = google
cmd_prompt = gfibertv#
file = cmdOutput.txt
title = Networking
"""
verify_progress = 0
def get_Dev(self):
"""Get all device information."""
self.bruno = device.Bruno(user=self.params['user'],
addr=self.params['addr'],
pwd=self.params['pwd'],
cmd_prompt=self.params['cmd_prompt'])
def ssh_Tunnel(self):
"""Create ssh tunnel for the test."""
self.p_ssh = self.bruno.createSSH()
def verifyIP(self):
"""Test Bruno IP address
testID: 11874104
"""
# Use ifconfig to check interface IP address
self.p_ssh.sendCmd(r'ifconfig br0')
line = self.p_ssh.getCmdOutput(3)[1]
# Get Bruno IP address
address = ip.IPADDR(line).is_like_ipv4_address()
if address is not None:
ip.IPADDR(address).is_valid_ipv4_address()
info = self.log.createProgressInfo('50% ',
'Verify IP: Bruno has IP address '
+ address)
self.log.sendLine(self.TESTINFO, info)
else:
info = self.log.createErrorInfo('critical',
'Error! Bruno does not have IP address!'
' Exit!')
self.log.sendLine(self.TESTINFO, info)
os.sys.exit(-1)
# Get Bruno netmask
netmask = re.search(r'mask [0-9]+(?:\.[0-9]+){3}', line).group()
address = ip.IPADDR(netmask).is_like_ipv4_address()
if address is not None:
ip.IPADDR(address).is_valid_ipv4_address()
info = self.log.createProgressInfo('100% ',
'Verify IP: Bruno has netmask: '
+ address)
self.log.sendLine(self.TESTINFO, info)
else:
info = self.log.createErrorInfo('critical',
'Error! Bruno does not have netmask!'
' Exit!')
self.log.sendLine(self.TESTINFO, info)
os.sys.exit(-1)
return
def verifyDHCP(self):
"""Test Bruno communication with RG through DHCP service
testID 11724358
"""
# check DHCP communications (DHCP REQUEST, DHCP ACK) between Bruno and RG
self.p_ssh.sendCmd(r'dmesg | grep DHCPREQUEST')
dhcp_request = self.p_ssh.getCmdOutput(3)[1]
self.p_ssh.sendCmd(r'dmesg | grep DHCPACK')
dhcp_ack = self.p_ssh.getCmdOutput(3)[1]
# check DHCP REQUEST
dhcp_request_time = re.search(r'[0-9]+.[0-9]+', dhcp_request)
address = ip.IPADDR(dhcp_request).is_like_ipv4_address()
if address is not None:
ip.IPADDR(address).is_valid_ipv4_address()
info = self.log.createProgressInfo('20% ',
'Verify DHCP: at time '
+ dhcp_request_time.group()
+', Bruno sent DHCP REQUEST to '
+ address)
self.log.sendLine(self.TESTINFO, info)
else:
info = self.log.createErrorInfo('critical',
'Error! Bruno did not send DHCP requst!'
' Exit!')
self.log.sendLine(self.TESTINFO, info)
os.sys.exit(-1)
# check DHCP ACK
dhcp_ack_time = re.search(r'[0-9]+.[0-9]+', dhcp_ack)
address = ip.IPADDR(dhcp_ack).is_like_ipv4_address()
if address is not None:
ip.IPADDR(address).is_valid_ipv4_address()
info = self.log.createProgressInfo('40% ',
'Verify DHCP: at time '
+ dhcp_ack_time.group()
+', Bruno got DHCP ACK from '
+ address)
self.log.sendLine(self.TESTINFO, info)
else:
info = self.log.createErrorInfo('critical',
'Error! Bruno did not get DHCP ACK!'
' Exit!')
self.log.sendLine(self.TESTINFO, info)
os.sys.exit(-1)
# check DHCP renewal time
self.p_ssh.sendCmd(r'dmesg | grep renewal')
line = self.p_ssh.getCmdOutput(3)[1]
renewtime = re.search(r'[0-9]+ seconds', line)
if renewtime is not None:
info = self.log.createProgressInfo('60% ',
'Verify DHCP: Bruno DHCP service '
'renewal time is '
+ renewtime.group())
self.log.sendLine(self.TESTINFO, info)
else:
info = self.log.createErrorInfo('critical',
'Error! Bruno does not have DHCP '
'renewal time! ')
self.log.sendLine(self.TESTINFO, info)
os.sys.exit(-1)
# DHCP ACK time before renewal
dhcp_before_renew = dhcp_ack_time.group()
# check if Bruno can renew DHCP after renew time
renew_time = re.search(r'[0-9]+', renewtime.group()).group()
info = self.log.createProgressInfo('80% ',
'Verify DHCP: Wait ' + renew_time +
' seconds and see if Bruno can get'
' DHCP renew... ')
self.log.sendLine(self.TESTINFO, info)
# sleep for renew_time seconds
time.sleep(int(renew_time))
self.p_ssh.sendCmd(r'dmesg | grep DHCPACK')
line = self.p_ssh.getCmdOutput(3)[1]
# DHCP ACK time after renewal
dhcp_after_renew = re.search(r'[0-9]+.[0-9]+', line).group()
address = ip.IPADDR(line).is_like_ipv4_address()
# if DHCP ACK before renewal and DHCP ACK after renewal are different,
# then Bruno got DHCP renewed
if address is not None and dhcp_after_renew != dhcp_before_renew:
ip.IPADDR(address).is_valid_ipv4_address()
info = self.log.createProgressInfo('100% ',
'Verify DHCP: Bruno got DHCP renewal,'
' at ' + dhcp_after_renew +
' seconds from ' + address)
self.log.sendLine(self.TESTINFO, info)
else:
info = self.log.createErrorInfo('critical',
'Error! Bruno did not get DHCP renewal! '
'Exit!')
self.log.sendLine(self.TESTINFO, info)
os.sys.exit(-1)
return
def verifyDNS(self):
"""Check Bruno DNS server information
testID:11792169
"""
# Check DNS server information on Bruno
self.p_ssh.sendCmd(r'cat /etc/resolv.conf')
lines = self.p_ssh.getCmdOutput(3)
for line in lines:
address = ip.IPADDR(line).is_like_ipv4_address()
if address is not None:
ip.IPADDR(address).is_valid_ipv4_address()
info = self.log.createProgressInfo('100% ',
'Bruno has DNS server ' + address)
self.log.sendLine(self.TESTINFO, info)
break
else:
info = self.log.createErrorInfo('critical',
'Error! Bruno does not have DNS server! '
'Exit!')
self.log.sendLine(self.TESTINFO, info)
os.sys.exit(-1)
return
def verifyNTP(self):
"""Check Bruno NTP information
testID:11874105
"""
# Check DNS server information on Bruno
self.p_ssh.sendCmd(r'dmesg | grep synced')
line = self.p_ssh.getCmdOutput(3)[1]
sync = re.search(r'synced', line)
if sync.group() is not None:
info = self.log.createProgressInfo('100% ',
'Bruno\'s NTP service is synchronised')
self.log.sendLine(self.TESTINFO, info)
else:
info = self.log.createErrorInfo('intermediate',
'Warning! Bruno\'s NTP service has not '
'been synchronised yet.')
self.log.sendLine(self.TESTINFO, info)
return
def checkTraffic(self, protocol='', isMOCA=0):
"""This function checks TCP/UDP/ICMP traffic on Bruno interface
will be called from function verifyTCPUDPICMP()
Args:
protocol:
to check traffic from specific protocol
tcp: to check tcp traffic
udp: to check udp traffic
icmp: to check icmp traffic
isMOCA:
to check specific interface on Bruno
0: Ethernet interface eth0
1: MOCA interface eth1
Bruno interfaces:
eth0: ethernet port
eth1: MOCA port
"""
# Set the protocol to be checked this time
# protocol_cmd is the specific string in tcpdump for each protocol
# expect_pattern is the specific regex pattern to be searched
# in the expect return string
if protocol == 'tcp':
protocol_cmd = 'tcp'
expect_pattern = 'IP.*ack'
elif protocol == 'udp':
protocol_cmd = 'udp'
expect_pattern = 'UDP'
elif protocol == 'icmp':
protocol_cmd = 'icmp'
expect_pattern = 'ICMP'
else:
# Now support TCP/UDP/ICMP
info = self.log.createErrorInfo('intermediate',
'Warning! Protocol not supported ')
self.log.sendLine(self.TESTINFO, info)
# Set interface to be checked this time
# interface_cmd is the interface string for tcpdump
# interface_output is the interface string for output
if isMOCA == 0:
interface_cmd = 'eth0'
interface_output = 'Ethernet port eth0'
else:
interface_cmd = 'eth1'
interface_output = 'MOCA port eth1'
# Set the progress output for generate traffic
# after each output, verify progress need to be increased
if isMOCA == 0:
info = self.log.createProgressInfo(str(self.verify_progress) + '%',
'Verify ' + protocol +
': Generate ' + protocol +
' traffic ... now manual, '
'need Spirient traffic generator ')
self.log.sendLine(self.TESTINFO, info)
self.verify_progress += 10
# Send tcpdump command to prompt
self.p_ssh.p_ssh.sendline('tcpdump -i ' + interface_cmd + ' -c 10 ' + protocol_cmd)
i = self.p_ssh.p_ssh.expect([self.bruno.devInfo['cmd_prompt'], pexpect.EOF, pexpect.TIMEOUT], 5)
if i == 0:
# Got the tcpdump return results
for result in reversed(self.p_ssh.p_ssh.before.splitlines()[-10:]):
protocol_flag = re.search(expect_pattern, result)
if protocol_flag is not None:
# Found the specific protocol traffic in the tcpdump results
info = self.log.createProgressInfo(str(self.verify_progress) + '%',
'Verify ' + protocol +
': Bruno got ' + protocol +
' traffic went through ' +
interface_output)
self.log.sendLine(self.TESTINFO, info)
self.verify_progress += 10
break
else:
# Did not find the specific protocol traffic in the tcpdump results
info = self.log.createErrorInfo('intermediate',
'Warning! packet not found '
'Bruno does not have ' + protocol +
' traffic went through ' + interface_output)
self.log.sendLine(self.TESTINFO, info)
self.verify_progress += 10
else:
# Got TIMEOUT or EOF, no results got from tcpdump
info = self.log.createErrorInfo('intermediate',
'Warning! TIMEOUT or EOF '
'Bruno does not have ' + protocol +
' traffic went through ' + interface_output)
self.log.sendLine(self.TESTINFO, info)
self.verify_progress += 10
# If it is timeout or EOF, ctl + c to end the command
self.p_ssh.p_ssh.sendline('\x03')
# Get rid of the command prompt for next protocol command
self.p_ssh.p_ssh.expect(self.bruno.devInfo['cmd_prompt'])
self.p_ssh.p_ssh.expect(self.bruno.devInfo['cmd_prompt'])
def verifyTCPUDPICMP(self):
"""Verify TCP/UDP/ICMP traffic can go through Bruno
testID: 11721380
"""
# Check TCP traffic
# TCP: first check ethernet port (eth0) on Bruno
self.checkTraffic('tcp', 0)
# TCP: then check MOCA port (eth1) on Bruno
self.checkTraffic('tcp', 1)
# Check UDP traffic
# UDP: first check ethernet port (eth0) on Bruno
self.checkTraffic('udp', 0)
# UDP: then check MOCA port (eth1) on Bruno
self.checkTraffic('udp', 1)
# Check ICMP traffic
# ICMP: first check ethernet port (eth0) on Bruno
self.checkTraffic('icmp', 0)
# ICMP: then check MOCA port (eth1) on Bruno
self.checkTraffic('icmp', 1)
return
def run(self):
"""Run the test case."""
####### Add your code here -- BEGIN #######
print "Test Started..."
self.get_Dev()
time.sleep(2) # time delay between commands issued to Device
self.ssh_Tunnel()
print "Get dev info ..."
self.bruno.getDevInfo()
self.verifyIP()
self.verifyNTP()
self.verifyDHCP()
self.verifyDNS()
self.verifyTCPUDPICMP()
info = self.log.createResultInfo('Pass',
'Finish all networking '
'feature test cases')
self.log.sendLine(self.TESTINFO, info)
print "Test Completed..."
####### Add your code here -- END #######
def destructor(self):
"""This is the destructor function to call for the user extened class."""
self.p_ssh.close()
if __name__ == "__main__":
"""now do all the networking tests together."""
test = NetworkingTest(testID='11364', key_word='Networking')