blob: 9fcd5da405f78738eed8c62ca93ea4122d799886 [file] [log] [blame]
#!/usr/bin/python
"""Fake ip route implementation."""
import logging
import socket
import struct
import ifup
logger = logging.getLogger(__name__)
_ROUTING_TABLE = {}
_IP_TABLE = {}
def call(subcommand, *args):
"""Fake ip command."""
subcommands = {
'route': _ip_route,
'addr': _ip_addr,
'link': _link,
}
if subcommand not in subcommands:
return 1, 'ip subcommand %r not supported' % subcommand
return subcommands[subcommand](args)
def register_testonly(interface):
if interface not in _IP_TABLE:
_IP_TABLE[interface] = set()
def _ip_route(args):
def can_add_route(dev):
def ip_to_int(ip_addr):
return struct.unpack('!I', socket.inet_pton(socket.AF_INET, ip_addr))[0]
if args[1] != 'default':
return True
via = ip_to_int(args[args.index('via') + 1])
for (ifc, route, _), _ in _ROUTING_TABLE.iteritems():
if ifc != dev:
continue
netmask = 0
if '/' in route:
route, netmask = route.split('/')
netmask = 32 - int(netmask)
route = ip_to_int(route)
if (route >> netmask) == (via >> netmask):
return True
return False
if not args:
return 0, '\n'.join(_ROUTING_TABLE.values())
if 'dev' not in args:
raise Exception('fake ip route got no dev')
dev = args[args.index('dev') + 1]
metric = None
if 'metric' in args:
metric = args[args.index('metric') + 1]
if args[0] in ('add', 'del'):
route = args[1]
key = (dev, route, metric)
if args[0] == 'add' and key not in _ROUTING_TABLE:
if not can_add_route(dev):
return (1, 'Tried to add default route without subnet route: %r' %
_ROUTING_TABLE)
logger.debug('Adding route for %r', key)
_ROUTING_TABLE[key] = ' '.join(args[1:])
elif args[0] == 'del':
if key in _ROUTING_TABLE:
logger.debug('Deleting route for %r', key)
del _ROUTING_TABLE[key]
elif key[2] is None:
# pylint: disable=g-builtin-op
for k in _ROUTING_TABLE.keys():
if k[:-1] == key[:-1]:
logger.debug('Deleting route for %r (generalized from %s)', k, key)
del _ROUTING_TABLE[k]
break
return 0, ''
# pylint: disable=line-too-long
_IP_ADDR_SHOW_TPL = """4: {name}: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
link/ether fe:fb:01:80:1b:74 brd ff:ff:ff:ff:ff:ff
{ips}
"""
_IP_ADDR_SHOW_IP_TPL = """ inet {ip}/24 brd 100.100.255.255 scope global {name}
valid_lft forever preferred_lft forever
"""
def _ip_addr(args):
if 'dev' not in args:
raise Exception('fake ip addr show got no dev')
dev = args[args.index('dev') + 1]
if dev not in _IP_TABLE:
return 255, 'Device "%r" does not exist' % dev
if 'show' in args:
ips = '\n'.join(_IP_ADDR_SHOW_IP_TPL.format(name=dev, ip=addr)
for addr in _IP_TABLE[dev])
return 0, _IP_ADDR_SHOW_TPL.format(name=dev, ips=ips)
if 'add' in args:
add = args[args.index('add') + 1]
_IP_TABLE[dev].add(add)
return 0, ''
if 'del' in args:
remove = args[args.index('del') + 1]
if remove in _IP_TABLE[dev]:
_IP_TABLE[dev].remove(remove)
return 0, ''
return 254, 'RTNETLINK answers: Cannot assign requested address'
raise Exception('no recognized ip addr command in %r' % args)
def _link(args):
return 0, '\n'.join('%s LOWER_UP' % interface
for interface, state in ifup.INTERFACE_STATE.iteritems()
if state)