blob: 0b9bfaeae2556bf04c2af044301775ace6526d4b [file] [log] [blame]
#!/usr/bin/python
# Copyright 2012 Google Inc. All Rights Reserved.
"""This module implements some basic TR069 test cases.
The BasicTR069Test class defined here extends testCase.TestCase.
This file implements some basic TR069 test cases to verify Bruno management
features. In the following test cases, only those that are defined in
'config.cfg' file will be executed.
Basic TR069 test cases include the following:
testID: 11865129 7.6.1 ACS Connection via IPv6
testID: 11721375 7.6.10 Verify there is a way to reset to factory default
if user modifies the local configuration
'/etc/init.d/S05factoryreset later'
testID: 14319146 7.6.19 Default ACS URL
testID: 11721371 7.6.2 ACS URL Pass to Bruno via Moca
testID: 14312009 7.6.21 Restrict the ability to locally configure ACS URL
testID: 14186028 7.6.22 Security of the link between the DHCPv6 server
and the CPE
testID: 14312010 7.6.23 CPE Connection Initiation
testID: 14247012 7.6.24.1 Support of Connection Request notification
mechanism
testID: 14186029 7.6.24.2 InternetGatewayDevice.ManagementServer.
ConnectionRequestURL
testID: 14312011 7.6.24.3 InternetGatewayDevice.ManagementServer.
ConnectionRequestURL format
testID: 14293012 7.6.24.4 No data is conveyed in the Connection
Request HTTP Get notification
testID: 14165037 7.6.24.5 Restrict the number of Connection Request
notifications
testID: 14315010 7.6.24.6 Connection Request Notification does not
terminate current session
testID: 14247013 7.6.25.1 MaxEnvelopes
testID: 14312012 7.6.25.2 Cookie support (optional)
testID: 14293013 7.6.25.4 Authentication
testID: 14311008 7.6.26.1 SOAP namespace identifier
testID: 14293014 7.6.26.2 SOAP Fault element
testID: 14311009 7.6.26.3 SOAP future extensibility
testID: 14186030 7.6.27 Fault Handling
testID: 14319148 7.6.27.1 SOAP requests respond
testID: 14266497 7.6.27.2 Session Termination
testID: 14312013 7.6.28 Correctly handle concurrent downloads in catawampus
testID: 11874098 7.6.3.1 TLS (1.2)/SSL based Transport for TR-069 on IPv6
testID: 14315007 7.6.3.2 Support for encryption algorithms
testID: 14247014 7.6.32 Persist TR-069 config across reboots
testID: 14293015 7.6.33 Send notification when disk is full
testID: 14293016 7.6.34 Support for multiple SSIDs in WLANConfiguration
testID: 11865130 7.6.4 TR069-ACS Validation
testID: 11722370 7.6.5 TR069-Inform Request
testID: 11874103 7.6.7 Digest Authentication
testID: 11843139 7.6.8.1 Support of RPCs: REBOOT
"""
__author__ = 'Lehan Meng (lmeng@google.com)'
import os
import re
import time
from IPy import IP
import device
import ssh
import testCase
class BasicTR069Test(testCase.TestCase):
"""Class for test cases: Bruno basic TR069 features.
A range of basic TR069 test cases are defined in this Class
"""
def Init(self):
  """Prepare the device handle and the SSH session used by the test cases.

  Creates a device.Bruno object from the configured connection parameters,
  attaches the logger to it, creates an ssh.SSH session when none exists yet,
  shares that session with the device object, and stores the delay constants
  used by the test cases.
  """
  # Announce the start of the run in the result log.
  start_info = self.log.CreateResultInfo(
      '---', 'Networking test verification started...')
  self.log.SendLine(self.test_info, start_info)

  # The device object and the SSH session are built from the same four
  # connection parameters.
  conn_args = {}
  for key in ('user', 'addr', 'bruno_prompt', 'addr_ipv6'):
    conn_args[key] = self.params[key]

  self.bruno = device.Bruno(**conn_args)
  self.bruno.SetLogging(self.log)

  # Reuse an already existing SSH session; only create one if needed.
  if not self.p_ssh:
    self.p_ssh = ssh.SSH(**conn_args)
  self.bruno.dev_ssh = self.p_ssh
  self.p_ssh.SetLogging(self.log)

  # Delay constants; VerifyFactoryReset passes the 5 minute one to
  # time.sleep(), so the values are seconds.
  self.__delay_5_sec = 5
  self.__delay_5_min = 300
  self.__delay_15_min = 900
def SshTunnel(self):
  """Establish the SSH connection to the device for the test."""
  # The two values are handed to SshRetry unchanged; presumably a retry
  # count and a delay -- TODO confirm against ssh.SSH.SshRetry.
  retry_args = (5, 15)
  self.p_ssh.SshRetry(*retry_args)
def VerifyDuplicatedAddress(self, ip_addr):
  """Verify the device detects a duplicated IPv6 address in the network.

  Adds ip_addr (with a /64 prefix) to interface br0, waits one second, reads
  the kernel log with 'dmesg', removes the address again and then searches
  the log for the kernel's duplicate address message. The outcome is written
  to the test log and also returned.

  Expected kernel log line, for example:
    [132344.324] br0: IPv6 duplicate address fe80:... detected!

  Args:
    ip_addr: the duplicated address, as text without prefix length

  Returns:
    True: if duplicated address is detected
    False: if duplicated address is not detected
  """
  dev_ssh = self.bruno.dev_ssh
  # Taken before the address is added; handed to FindLineInLog together
  # with the log lines and the pattern.
  time_stamp = self.bruno.GetTimeStamp()
  dev_ssh.SendCmd('ip -6 addr add ' + ip_addr + '/64 dev br0')
  time.sleep(1)
  dev_ssh.SendCmd('dmesg')
  log_list = dev_ssh.GetCmdOutput()
  # Raw strings keep the regex escapes intact; the address is escaped so it
  # is always matched literally.
  pattern_line = (
      r'\[\d*\.\d{3}\]\s*\w*\d:\s* IPv6 duplicate address\s*('
      + re.escape(ip_addr) + r')\s*[/\d+]*\s*detected!')
  # Remove the test address again before evaluating the log.
  dev_ssh.SendCmd('ip -6 addr del ' + ip_addr + '/64 dev br0')

  detected = bool(
      self.bruno.FindLineInLog(log_list, time_stamp, pattern_line))
  if detected:
    info = self.log.CreateResultInfo(
        'Pass', 'Duplicated IPv6 address detected: ' + ip_addr)
  else:
    info = self.log.CreateResultInfo(
        'Failed', 'Duplicated IPv6 address not detected: '
        + ip_addr + '. Verification Failed!')
  self.log.SendLine(self.test_info, info)
  return detected
def VerifyExpectedVersion(self):
  """Exit the test run unless the device runs the expected software version.

  The expected version is read from self.params['expected_version']. On a
  mismatch a critical error is logged and the process exits with status 1.
  """
  expected_version = self.params['expected_version']
  if self.bruno.VerifyCurrentVersion(expected_version):
    return
  error_info = self.log.CreateErrorInfo(
      'critical', 'Device software version is not expected! Exit!')
  self.log.SendLine(self.test_info, error_info)
  os.sys.exit(1)
def VerifyACSWithIPv6(self):
  """Test ID: 11865129 - Verify IPv6 connection to ACS.

  Runs tcpdump on the device, filtered on the IPv6 addresses of the device
  and of the ACS, and inspects the captured packets.

  tcpdump filter:
    1. source address: device address
       destination address: ACS address
    2. source address: ACS address
       destination address: device address

  The test passes when packets in both directions are found; otherwise it
  fails.
  """
  def report_progress(percent, message):
    progress_info = self.log.CreateProgressInfo(percent, message)
    self.log.SendLine(self.test_info, progress_info)

  report_progress('10%', 'Verifying current image version, expected: '
                  + self.params['expected_version'])
  self.VerifyExpectedVersion()

  # Capture 5 packets in total, 100 bytes of each, on the bridge interface.
  packet_to_catch = '5'
  bytes_to_catch = '100'
  interface_to_catch = 'br0'
  device_addr = self.params['addr_ipv6']
  acs_addr = self.params['acs_ipv6']
  tcpdump_command = ''.join([
      'tcpdump -i ', interface_to_catch,
      ' -c ', packet_to_catch,
      ' -s ', bytes_to_catch,
      ' \'(((src ', device_addr,
      ') and (dst ', acs_addr,
      ')) or ((src ', acs_addr,
      ') and (dst ', device_addr,
      ')))\''])

  report_progress(
      '30%', 'Calling tcpdump to capture device traffic to ACS server: '
      'capturing on interface: ' + interface_to_catch
      + ' Catch packets: ' + packet_to_catch)
  self.p_ssh.SendCmd(tcpdump_command, 180)
  report_progress('60%', 'Packets captured. Analyzing...')

  # The command output is reversed in place before it is parsed.
  tcpdump_result = self.p_ssh.GetCmdOutput()
  tcpdump_result.reverse()

  if not self.CheckACSIPv6Connection(tcpdump_result):
    failed_info = self.log.CreateResultInfo(
        'Failed', 'TR069 traffic verified: No IPv6 connection detected.')
    self.log.SendLine(self.test_info, failed_info)
    return

  report_progress(
      '100%', 'TR069 traffic verified: Device connected to ACS via IPv6')
  passed_info = self.log.CreateResultInfo(
      'Pass', 'TR069 traffic verified: Device connected to ACS via IPv6')
  self.log.SendLine(self.test_info, passed_info)
def ParseDumpMsgToPackets(self, tcpdump_result):
  """Split raw tcpdump output lines into one list of lines per packet.

  A packet starts at a line beginning with a time stamp such as
  '14:24:36.887936' and runs until the next such line or the end of the
  output. Lines in front of the first time stamp are ignored.

  tcpdump result (for example):
    14:24:36.887936 00:1a:11:30:64:e6 (oui Unknown) > a8:d0:e5:a1:e3:02  L1
    (oui Unknown), ethertype IPv6 (0x86dd), length 86:                   L2
    0x0000: 6000 0000 0020 0640 2605 a601 fe00 fd18 ................    L3
    0x0010: 021a 11ff fe30 64e6 2001 4860 8005 0000 ................    L4
    14:24:36.912863 a8:d0:e5:a1:e3:02 (oui Unknown) > 00:1a:11:30:64:e6  L5
    (oui Unknown), ethertype IPv6 (0x86dd), length 86:                   L6
    0x0000: 6000 0000 0020 0634 2001 4860 8005 0000 ................    L7
    0x0010: 0000 0000 0000 008d 2605 a601 fe00 fd18 ................    L8

  is parsed into two packets:
    packet_list[0]: line L1 ~ line L4
    packet_list[1]: line L5 ~ line L8

  Args:
    tcpdump_result: raw dump message from tcpdump, as a list of lines

  Returns:
    packet_list: list of packets, each packet being a list of lines
  """
  packet_start = re.compile(r'^\s*\d{2}:\d{2}:\d{2}\.\d+\s*')
  packet_list = []
  packet = None
  for line in tcpdump_result:
    if packet_start.search(line):
      # A new packet begins: store the one collected so far.
      if packet is not None:
        packet_list.append(packet)
      packet = [line]
    elif packet is not None:
      packet.append(line)
  # The final packet has no following time stamp line to close it.
  if packet is not None:
    packet_list.append(packet)
  return packet_list
def FindSrcAddrFromTcpdump(self, packet):
  """Parse source IPv6 address from tcpdump message.

  The address is assembled from the last four 16-bit words of the hex dump
  row '0x0000:' and the first four words of the row that follows it, which
  must be the '0x0010:' row.

  Args:
    packet: tcpdump dump message of one packet, as a list of lines

  Returns:
    The source address as eight colon-separated groups of four hex digits,
    or None if the packet does not contain both complete hex dump rows.
  """
  words = r'\s*'.join([r'([\da-f]{4})'] * 8)
  first_row = re.compile(r'^\s*0x0000:\s*' + words + r'\s*.*')
  second_row = re.compile(r'^\s*0x0010:\s*' + words + r'\s*.*')
  for index, line in enumerate(packet):
    head = first_row.search(line)
    if not head:
      continue
    # Only the first matching row is considered. A dump that is cut off
    # after it yields no address instead of raising.
    if index + 1 >= len(packet):
      return None
    tail = second_row.search(packet[index + 1])
    if not tail:
      return None
    return ':'.join(head.group(5, 6, 7, 8) + tail.group(1, 2, 3, 4))
  return None
def FindDstAddrFromTcpdump(self, packet):
  """Parse destination IPv6 address from tcpdump message.

  The address is assembled from the last four 16-bit words of the hex dump
  row '0x0010:' and the first four words of the row that follows it, which
  must be the '0x0020:' row.

  Args:
    packet: tcpdump dump message of one packet, as a list of lines

  Returns:
    The destination address as eight colon-separated groups of four hex
    digits, or None if the packet does not contain both complete hex dump
    rows.
  """
  words = r'\s*'.join([r'([\da-f]{4})'] * 8)
  first_row = re.compile(r'^\s*0x0010:\s*' + words + r'\s*.*')
  second_row = re.compile(r'^\s*0x0020:\s*' + words + r'\s*.*')
  for index, line in enumerate(packet):
    head = first_row.search(line)
    if not head:
      continue
    # Only the first matching row is considered. A dump that is cut off
    # after it yields no address instead of raising.
    if index + 1 >= len(packet):
      return None
    tail = second_row.search(packet[index + 1])
    if not tail:
      return None
    return ':'.join(head.group(5, 6, 7, 8) + tail.group(1, 2, 3, 4))
  return None
def CheckACSIPv6Connection(self, tcpdump_result):
  """Examine captured packets, find IPv6 communication between ACS and device.

  The dump is split into packets, the source and destination address of each
  packet are parsed from its hex dump and compared with the configured
  device address (self.params['addr_ipv6']) and ACS address
  (self.params['acs_ipv6']). Packets without a parsable address pair are
  skipped.

  Args:
    tcpdump_result: tcpdump result: captured packets message dump

  Returns:
    True: if bi-directional communication found from packet list.
    False: otherwise
  """
  # Parse the configured addresses once instead of once per packet.
  device_addr = IP(self.params['addr_ipv6'])
  acs_addr = IP(self.params['acs_ipv6'])

  dev_to_acs = False
  acs_to_dev = False
  for packet in self.ParseDumpMsgToPackets(tcpdump_result):
    src_text = self.FindSrcAddrFromTcpdump(packet)
    dst_text = self.FindDstAddrFromTcpdump(packet)
    if not src_text or not dst_text:
      # No usable hex dump in this packet; IP() cannot be built from it.
      continue
    src_addr = IP(src_text)
    dst_addr = IP(dst_text)
    if src_addr == device_addr and dst_addr == acs_addr:
      dev_to_acs = True
    if dst_addr == device_addr and src_addr == acs_addr:
      acs_to_dev = True
    if dev_to_acs and acs_to_dev:
      return True
  return False
def VerifyFactoryReset(self):
  """Test case: 11721375, Verify the factory reset function of device.

  Verify factory reset according to the following conditions:
    - version
    - address
    - cat /etc/ntpd.conf
    - cat /etc/resolv.conf
    - cat /tmp/gvsbhost
    - ls /tmp/gvsb*
    - cat /tmp/cwmp/acs_url
    - ls -l /var/media
    - ls -l /rw/sage

  This test will need to manually reset the device using reset button: the
  method asks for the reset in the log, sleeps for the 5 minute delay,
  reconnects and then runs the checks. Every failed check is logged; the
  overall result is 'Failed' if at least one check failed.
  """
  failures = []
  date_pattern = re.compile(r'.*(\d{4}-\d{2}-\d{2}\s*\d+:\d+).*')

  def report_progress(percent, message):
    """Write one progress line to the test log."""
    info = self.log.CreateProgressInfo(percent, message)
    self.log.SendLine(self.test_info, info)

  def report_error(severity, message, failed=True):
    """Write one error line to the test log; optionally fail the test."""
    if failed:
      failures.append(message)
    info = self.log.CreateErrorInfo(severity, message)
    self.log.SendLine(self.test_info, info)

  report_progress('10%', 'Verifying current image version, expected: '
                  + self.params['expected_version'])
  self.VerifyExpectedVersion()

  # Remember the date of Wiz.bin before the reset, if the file exists.
  wiz_bin_date = None
  self.p_ssh.SendCmd('ls -l /rw/sage')
  for line in self.p_ssh.GetCmdOutput():
    if 'Wiz.bin' in line:
      m = date_pattern.search(line)
      if m:
        wiz_bin_date = m.group(1)

  report_progress('20%', 'Please reset the device. '
                  'Hold down reset button for 5 seconds...')
  report_progress('30%', 'Wait ' + str(self.__delay_5_min)
                  + ' seconds for device to reset and reboot...')
  time.sleep(self.__delay_5_min)
  report_progress('40%', 'Connecting to device...')
  self.SshTunnel()

  # Addresses: a missing IPv4 address fails the test, a missing IPv6
  # address is only reported as a warning.
  addr = self.bruno.GetIPv4Address()
  if not addr:
    report_error('critical', 'Error! No IPv4 address found!')
  else:
    report_progress('50%', 'Checking device IPv4 address succeed: IPv4: '
                    + addr)
  addr_v6 = self.bruno.GetIPv6Address()
  if not addr_v6:
    report_error('warning', 'No IPv6 address found!', failed=False)
  else:
    report_progress('55%', 'Checking device IPv6 address succeed: IPv6: '
                    + addr_v6[0])

  if self.bruno.GetProductClass() == 'GFMS100':
    # fat bruno, check Wiz.bin file:
    found_file = False
    self.p_ssh.SendCmd('ls -l /rw/sage')
    for line in self.p_ssh.GetCmdOutput():
      if 'Wiz.bin' not in line:
        continue
      found_file = True
      m = date_pattern.search(line)
      if not m:
        # Without a date it cannot be decided whether the file is new.
        report_error('warning', 'Wiz.bin found, but its date could not be '
                     'parsed from: ' + line)
      elif m.group(1) == wiz_bin_date:
        # if file is not updated
        report_error('warning', 'Wiz.bin not updated: '
                     'file date before reset: ' + wiz_bin_date
                     + ' new file date: ' + m.group(1))
      else:
        report_progress(
            '60%', 'Checking Wiz.bin file succeed: new file created at: '
            + m.group(1))
      break
    if not found_file:
      report_error('critical', 'No Wiz.bin file found after reset!')

  # verify ntpd.conf
  ts = self.bruno.GetTimeServer()
  if not ts:
    report_error('critical', 'No time server found in file /etc/ntpd.conf!')
  else:
    report_progress(
        '65%', 'Checking time server (ntpd.conf) succeed: Timeserver: ' + ts)

  # verify resolv.conf
  dns = self.bruno.GetDNSServer()
  if not dns:
    report_error('critical', 'No DNS server found in file /etc/resolv.conf!')
  else:
    report_progress(
        '70%', 'Checking DNS server (resolv.conf) succeed: DNS: ' + dns)

  # check gvsb; a missing host value is treated like an empty one.
  gvsb = (self.bruno.GetGVSBHost() or '').strip()
  if not gvsb or gvsb != self.params['gvsb_host']:
    report_error('critical',
                 'GVSB host not found or not matching expected host!')
  else:
    report_progress(
        '75%', 'Checking GVSB server (gvsbhost) succeed: URL: ' + gvsb)

  # check gvsb files; each name is searched independently because a plain
  # 'ls' may print several file names on the same line.
  self.bruno.dev_ssh.SendCmd('ls /tmp/gvsb*')
  cmd_list = self.bruno.dev_ssh.GetCmdOutput()
  gvsb_files_found = True
  for file_name in ('gvsbhost', 'gvsbchannel', 'gvsbkick'):
    if not any(file_name in line for line in cmd_list):
      gvsb_files_found = False
  if not gvsb_files_found:
    report_error('critical', 'GVSB files missing! Check /tmp/for gvsb files')
  else:
    report_progress(
        '80%', 'Checking GVSB files (/tmp/gvsb*) succeed. Files found: '
        'gvsbhost, gvsbchannel, gvsbkick.')

  # verify acs url:
  if self.bruno.GetACSUrl() != self.params['acs_url']:
    report_error('critical', 'ACS URL is not expected! Expected URL: '
                 + self.params['acs_url'])
  else:
    report_progress(
        '85%', 'Checking ACS URL (/tmp/cwmp/acs_url) succeed. URL: '
        + self.params['acs_url'])

  # verify sage properties; the backup name contains the plain name, so it
  # has to be tested first.
  self.bruno.dev_ssh.SendCmd('ls -l /rw/sage')
  sage_prop = False
  sage_prop_backup = False
  for line in self.bruno.dev_ssh.GetCmdOutput():
    if 'Sage.properties.autobackup' in line:
      sage_prop_backup = True
    elif 'Sage.properties' in line:
      sage_prop = True
  if not sage_prop or not sage_prop_backup:
    report_error('critical', 'Missing file in /rw/sage folder.')
  else:
    report_progress(
        '90%', 'Checking sage properties files (/rw/sage/) succeed.')

  if failures:
    info = self.log.CreateResultInfo(
        'Failed', 'Device configuration not '
        'restored completely after factory reset!')
    self.log.SendLine(self.test_info, info)
  else:
    info = self.log.CreateResultInfo(
        'Pass', 'Device configuration '
        'successfully restored after factory reset.')
    self.log.SendLine(self.test_info, info)
    report_progress('100%', 'Device configuration '
                    'successfully restored after factory reset.')
def VerifyDefaultACSURL(self):
  """Test case: 14319146. Placeholder for the default ACS URL verification.

  The verification needs the Ethernet port to be disabled, so it cannot be
  automated through the ssh tunnel; the method only prints a notice.
  """
  notice = 'pass this test case'
  print(notice)
def Run(self):
  """Run the test case selected by self.test_info['testID'].

  Initializes the test, opens the ssh session and then dispatches on the
  test ID: three IDs have an implementation, the remaining IDs defined for
  this class only print a placeholder line, and unknown IDs do nothing.
  """
  ####### Add your code here -- BEGIN #######
  print('Test Started...')
  self.Init()
  time.sleep(2)  # time delay between commands issued to Device
  self.SshTunnel()
  ready_info = self.log.CreateProgressInfo(
      '5%', 'SSH session to device successfully established!')
  self.log.SendLine(self.test_info, ready_info)

  # Test IDs that have a real implementation in this class.
  implemented = (
      ('11865129', self.VerifyACSWithIPv6),
      ('11721375', self.VerifyFactoryReset),
      ('14319146', self.VerifyDefaultACSURL),
  )
  # Test IDs that are defined but not implemented yet.
  placeholders = (
      '11721371', '14312009', '14186028', '14312010', '14247012',
      '14186029', '14312011', '14293012', '14165037', '14315010',
      '14247013', '14312012', '14293013', '14311008', '14293014',
      '14311009', '14186030', '14319148', '14266497', '14312013',
      '11874098', '14315007', '14247014', '14293015', '14293016',
      '11865130', '11722370', '11874103', '11843139',
  )

  test_id = self.test_info['testID']
  for known_id, handler in implemented:
    if test_id == known_id:
      handler()
      break
  else:
    if test_id in placeholders:
      print('run test')
    else:
      print('do nothing...')
  print('Test Completed...')
  ####### Add your code here -- END #######
def Destructor(self):
  """Clean-up hook of the user extended class: close the ssh session."""
  session = self.p_ssh
  session.close()
if __name__ == '__main__':
# Test cases defined in this file:
test_defined = ['11865129', '11721375', '14319146', '11721371',
'14312009', '14186028', '14312010', '14247012', '14186029',
'14312011', '14293012', '14165037', '14315010', '14247013',
'14312012', '14293013', '14311008', '14293014', '14311009',
'14186030', '14319148', '14266497', '14312013', '11874098',
'14315007', '14247014', '14293015', '14293016', '11865130',
'11722370', '11874103', '11843139']
# To execute test cases that defined in config file:
test_to_execute = testCase.TestCase.FindTestCaseToExecute('config.cfg')
for test_id in test_to_execute:
if test_id in test_defined:
test = BasicTR069Test(testID=test_id)