Merge branch 'rel/5.1'

This commit is contained in:
bharnden 2018-05-22 20:44:26 -07:00
commit c3d0b01b7f
293 changed files with 6907 additions and 34130 deletions

View file

@ -4,11 +4,11 @@ implementing specific node types.
"""
import socket
import subprocess
import threading
from socket import AF_INET
from socket import AF_INET6
from core import CoreCommandError
from core import constants
from core import logger
from core.coreobj import PyCoreNetIf
@ -68,32 +68,35 @@ class CtrlNet(LxBrNet):
Startup functionality for the control network.
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
if self.detectoldbridge():
return
LxBrNet.startup(self)
if self.hostid:
addr = self.prefix.addr(self.hostid)
else:
addr = self.prefix.max_addr()
msg = "Added control network bridge: %s %s" % (self.brname, self.prefix)
addrlist = ["%s/%s" % (addr, self.prefix.prefixlen)]
logger.info("added control network bridge: %s %s", self.brname, self.prefix)
if self.assign_address:
addrlist = ["%s/%s" % (addr, self.prefix.prefixlen)]
self.addrconfig(addrlist=addrlist)
msg += " address %s" % addr
logger.info(msg)
if self.updown_script is not None:
logger.info("interface %s updown script (%s startup) called",
self.brname, self.updown_script)
subprocess.check_call([self.updown_script, self.brname, "startup"])
if self.serverintf is not None:
try:
subprocess.check_call([constants.BRCTL_BIN, "addif", self.brname, self.serverintf])
subprocess.check_call([constants.IP_BIN, "link", "set", self.serverintf, "up"])
except subprocess.CalledProcessError:
logger.exception("Error joining server interface %s to controlnet bridge %s",
self.serverintf, self.brname)
logger.info("address %s", addr)
if self.updown_script:
logger.info("interface %s updown script (%s startup) called", self.brname, self.updown_script)
utils.check_cmd([self.updown_script, self.brname, "startup"])
if self.serverintf:
# sets the interface as a port of the bridge
utils.check_cmd([constants.BRCTL_BIN, "addif", self.brname, self.serverintf])
# bring interface up
utils.check_cmd([constants.IP_BIN, "link", "set", self.serverintf, "up"])
def detectoldbridge(self):
"""
@ -103,32 +106,23 @@ class CtrlNet(LxBrNet):
:return: True if an old bridge was detected, False otherwise
:rtype: bool
"""
retstat, retstr = utils.cmdresult([constants.BRCTL_BIN, "show"])
if retstat != 0:
status, output = utils.cmd_output([constants.BRCTL_BIN, "show"])
if status != 0:
logger.error("Unable to retrieve list of installed bridges")
lines = retstr.split("\n")
for line in lines[1:]:
cols = line.split("\t")
oldbr = cols[0]
flds = cols[0].split(".")
if len(flds) == 3:
if flds[0] == "b" and flds[1] == self.objid:
logger.error(
"Error: An active control net bridge (%s) found. " \
"An older session might still be running. " \
"Stop all sessions and, if needed, delete %s to continue." % \
(oldbr, oldbr)
)
return True
"""
# Do this if we want to delete the old bridge
logger.warn("Warning: Old %s bridge found: %s" % (self.objid, oldbr))
try:
check_call([BRCTL_BIN, "delbr", oldbr])
except subprocess.CalledProcessError as e:
logger.exception("Error deleting old bridge %s", oldbr, e)
logger.info("Deleted %s", oldbr)
"""
else:
lines = output.split("\n")
for line in lines[1:]:
cols = line.split("\t")
oldbr = cols[0]
flds = cols[0].split(".")
if len(flds) == 3:
if flds[0] == "b" and flds[1] == self.objid:
logger.error(
"error: An active control net bridge (%s) found. "
"An older session might still be running. "
"Stop all sessions and, if needed, delete %s to continue.", oldbr, oldbr
)
return True
return False
def shutdown(self):
@ -139,21 +133,26 @@ class CtrlNet(LxBrNet):
"""
if self.serverintf is not None:
try:
subprocess.check_call([constants.BRCTL_BIN, "delif", self.brname, self.serverintf])
except subprocess.CalledProcessError:
logger.exception("Error deleting server interface %s to controlnet bridge %s",
self.serverintf, self.brname)
utils.check_cmd([constants.BRCTL_BIN, "delif", self.brname, self.serverintf])
except CoreCommandError:
logger.exception("error deleting server interface %s from bridge %s", self.serverintf, self.brname)
if self.updown_script is not None:
logger.info("interface %s updown script (%s shutdown) called" % (self.brname, self.updown_script))
subprocess.check_call([self.updown_script, self.brname, "shutdown"])
try:
logger.info("interface %s updown script (%s shutdown) called", self.brname, self.updown_script)
utils.check_cmd([self.updown_script, self.brname, "shutdown"])
except CoreCommandError:
logger.exception("error issuing shutdown script shutdown")
LxBrNet.shutdown(self)
def all_link_data(self, flags):
"""
Do not include CtrlNet in link messages describing this session.
:return: nothing
:param flags: message flags
:return: list of link data
:rtype: list[core.data.LinkData]
"""
return []
@ -175,29 +174,36 @@ class PtpNet(LxBrNet):
"""
Attach a network interface, but limit attachment to two interfaces.
:param core.coreobj.PyCoreNetIf netif: network interface
:param core.netns.vif.VEth netif: network interface
:return: nothing
"""
if len(self._netif) >= 2:
raise ValueError("Point-to-point links support at most 2 network interfaces")
LxBrNet.attach(self, netif)
def data(self, message_type):
def data(self, message_type, lat=None, lon=None, alt=None):
"""
Do not generate a Node Message for point-to-point links. They are
built using a link message instead.
:return: nothing
:param message_type: purpose for the data object we are creating
:param float lat: latitude
:param float lon: longitude
:param float alt: altitude
:return: node data object
:rtype: core.data.NodeData
"""
pass
return None
def all_link_data(self, flags):
"""
Build CORE API TLVs for a point-to-point link. One Link message
describes this network.
:return: all link data
:rtype: list[LinkData]
:param flags: message flags
:return: list of link data
:rtype: list[core.data.LinkData]
"""
all_links = []
@ -321,10 +327,13 @@ class HubNode(LxBrNet):
:param int objid: node id
:param str name: node namee
:param bool start: start flag
:raises CoreCommandError: when there is a command exception
"""
LxBrNet.__init__(self, session, objid, name, start)
# TODO: move to startup method
if start:
subprocess.check_call([constants.BRCTL_BIN, "setageing", self.brname, "0"])
utils.check_cmd([constants.BRCTL_BIN, "setageing", self.brname, "0"])
class WlanNode(LxBrNet):
@ -356,7 +365,7 @@ class WlanNode(LxBrNet):
"""
Attach a network interface.
:param core.coreobj.PyCoreNetIf netif: network interface
:param core.netns.vif.VEth netif: network interface
:return: nothing
"""
LxBrNet.attach(self, netif)
@ -367,7 +376,6 @@ class WlanNode(LxBrNet):
x, y, z = netif.node.position.get()
# invokes any netif.poshook
netif.setposition(x, y, z)
# self.model.setlinkparams()
def setmodel(self, model, config):
"""
@ -401,25 +409,28 @@ class WlanNode(LxBrNet):
logger.info("updating model %s" % model_name)
if self.model is None or self.model.name != model_name:
return
model = self.model
if model.config_type == RegisterTlvs.WIRELESS.value:
if not model.updateconfig(values):
return
if self.model.position_callback:
for netif in self.netifs():
netif.poshook = self.model.position_callback
if netif.node is not None:
(x, y, z) = netif.node.position.get()
netif.poshook(netif, x, y, z)
self.model.setlinkparams()
def all_link_data(self, flags):
"""
Retrieve all link data.
:param flags: link flags
:return: all link data
:rtype: list[LinkData]
:param flags: message flags
:return: list of link data
:rtype: list[core.data.LinkData]
"""
all_links = LxBrNet.all_link_data(self, flags)
@ -449,7 +460,6 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
:return:
"""
PyCoreNode.__init__(self, session, objid, name, start=start)
# this initializes net, params, poshook
PyCoreNetIf.__init__(self, node=self, name=name, mtu=mtu)
self.up = False
self.lock = threading.RLock()
@ -457,6 +467,9 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
# the following are PyCoreNetIf attributes
self.transport_type = "raw"
self.localname = name
self.old_up = False
self.old_addrs = []
if start:
self.startup()
@ -465,15 +478,12 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
Set the interface in the up state.
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
# interface will also be marked up during net.attach()
self.savestate()
try:
subprocess.check_call([constants.IP_BIN, "link", "set", self.localname, "up"])
self.up = True
except subprocess.CalledProcessError:
logger.exception("failed to run command: %s link set %s up", constants.IP_BIN, self.localname)
utils.check_cmd([constants.IP_BIN, "link", "set", self.localname, "up"])
self.up = True
def shutdown(self):
"""
@ -484,9 +494,14 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
"""
if not self.up:
return
subprocess.check_call([constants.IP_BIN, "link", "set", self.localname, "down"])
subprocess.check_call([constants.IP_BIN, "addr", "flush", "dev", self.localname])
utils.mutecall([constants.TC_BIN, "qdisc", "del", "dev", self.localname, "root"])
try:
utils.check_cmd([constants.IP_BIN, "link", "set", self.localname, "down"])
utils.check_cmd([constants.IP_BIN, "addr", "flush", "dev", self.localname])
utils.check_cmd([constants.TC_BIN, "qdisc", "del", "dev", self.localname, "root"])
except CoreCommandError:
logger.exception("error shutting down")
self.up = False
self.restorestate()
@ -500,6 +515,7 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
"""
PyCoreNetIf.attachnet(self, net)
# TODO: issue in that both classes inherited from provide the same method with different signatures
def detachnet(self):
"""
Detach a network.
@ -519,7 +535,9 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
:param str hwaddr: hardware address
:param int ifindex: interface index
:param str ifname: interface name
:return:
:return: interface index
:rtype: int
:raises ValueError: when an interface has already been created, one max
"""
with self.lock:
if ifindex is None:
@ -537,7 +555,7 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
self.attachnet(net)
if addrlist:
for addr in utils.maketuple(addrlist):
for addr in utils.make_tuple(addrlist):
self.addaddr(addr)
return ifindex
@ -552,14 +570,12 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
if ifindex is None:
ifindex = 0
if ifindex not in self._netif:
raise ValueError, "ifindex %s does not exist" % ifindex
self._netif.pop(ifindex)
if ifindex == self.ifindex:
self.shutdown()
else:
raise ValueError, "ifindex %s does not exist" % ifindex
raise ValueError("ifindex %s does not exist" % ifindex)
def netif(self, ifindex, net=None):
"""
@ -602,9 +618,11 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
:param str addr: address to add
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
if self.up:
subprocess.check_call([constants.IP_BIN, "addr", "add", str(addr), "dev", self.name])
utils.check_cmd([constants.IP_BIN, "addr", "add", str(addr), "dev", self.name])
PyCoreNetIf.addaddr(self, addr)
def deladdr(self, addr):
@ -613,9 +631,11 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
:param str addr: address to delete
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
if self.up:
subprocess.check_call([constants.IP_BIN, "addr", "del", str(addr), "dev", self.name])
utils.check_cmd([constants.IP_BIN, "addr", "del", str(addr), "dev", self.name])
PyCoreNetIf.deladdr(self, addr)
def savestate(self):
@ -624,23 +644,17 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
interface for emulation purposes. TODO: save/restore the PROMISC flag
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
self.old_up = False
self.old_addrs = []
cmd = [constants.IP_BIN, "addr", "show", "dev", self.localname]
try:
tmp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
except OSError:
logger.exception("Failed to run %s command: %s", constants.IP_BIN, cmd)
if tmp.wait():
logger.warn("Command failed: %s", cmd)
return
lines = tmp.stdout.read()
tmp.stdout.close()
for l in lines.split("\n"):
items = l.split()
args = [constants.IP_BIN, "addr", "show", "dev", self.localname]
output = utils.check_cmd(args)
for line in output.split("\n"):
items = line.split()
if len(items) < 2:
continue
if items[1] == "%s:" % self.localname:
flags = items[2][1:-1].split(",")
if "UP" in flags:
@ -657,24 +671,71 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
Restore the addresses and other interface state after using it.
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
for addr in self.old_addrs:
if addr[1] is None:
subprocess.check_call([constants.IP_BIN, "addr", "add", addr[0], "dev", self.localname])
utils.check_cmd([constants.IP_BIN, "addr", "add", addr[0], "dev", self.localname])
else:
subprocess.check_call([constants.IP_BIN, "addr", "add", addr[0], "brd", addr[1], "dev", self.localname])
utils.check_cmd([constants.IP_BIN, "addr", "add", addr[0], "brd", addr[1], "dev", self.localname])
if self.old_up:
subprocess.check_call([constants.IP_BIN, "link", "set", self.localname, "up"])
utils.check_cmd([constants.IP_BIN, "link", "set", self.localname, "up"])
def setposition(self, x=None, y=None, z=None):
"""
Use setposition() from both parent classes.
Uses setposition from both parent classes.
:return: nothing
:param float x: x position
:param float y: y position
:param float z: z position
:return: True if position changed, False otherwise
:rtype: bool
"""
PyCoreObj.setposition(self, x, y, z)
# invoke any poshook
result = PyCoreObj.setposition(self, x, y, z)
PyCoreNetIf.setposition(self, x, y, z)
return result
def check_cmd(self, args):
"""
Runs shell command on node.
:param list[str]|str args: command to run
:return: exist status and combined stdout and stderr
:rtype: tuple[int, str]
:raises CoreCommandError: when a non-zero exit status occurs
"""
raise NotImplementedError
def cmd(self, args, wait=True):
"""
Runs shell command on node, with option to not wait for a result.
:param list[str]|str args: command to run
:param bool wait: wait for command to exit, defaults to True
:return: exit status for command
:rtype: int
"""
raise NotImplementedError
def cmd_output(self, args):
"""
Runs shell command on node and get exit status and output.
:param list[str]|str args: command to run
:return: exit status and combined stdout and stderr
:rtype: tuple[int, str]
"""
raise NotImplementedError
def termcmdstring(self, sh):
"""
Create a terminal command string.
:param str sh: shell to execute command in
:return: str
"""
raise NotImplementedError
class TunnelNode(GreTapBridge):

View file

@ -3,11 +3,11 @@ TODO: probably goes away, or implement the usage of "unshare", or docker formal.
"""
import socket
import subprocess
import threading
from socket import AF_INET
from socket import AF_INET6
from core import CoreCommandError
from core import constants
from core import logger
from core.coreobj import PyCoreNet
@ -36,12 +36,9 @@ utils.check_executables([
def ebtables_commands(call, commands):
ebtables_lock.acquire()
try:
with ebtables_lock:
for command in commands:
call(command)
finally:
ebtables_lock.release()
class OvsNet(PyCoreNet):
@ -81,24 +78,23 @@ class OvsNet(PyCoreNet):
ebtables_queue.startupdateloop(self)
def startup(self):
try:
subprocess.check_call([constants.OVS_BIN, "add-br", self.bridge_name])
except subprocess.CalledProcessError:
logger.exception("error adding bridge")
"""
try:
# turn off spanning tree protocol and forwarding delay
# TODO: appears stp and rstp are off by default, make sure this always holds true
# TODO: apears ovs only supports rstp forward delay and again it"s off by default
subprocess.check_call([constants.IP_BIN, "link", "set", self.bridge_name, "up"])
:return:
:raises CoreCommandError: when there is a command exception
"""
utils.check_cmd([constants.OVS_BIN, "add-br", self.bridge_name])
# create a new ebtables chain for this bridge
ebtables_commands(subprocess.check_call, [
[constants.EBTABLES_BIN, "-N", self.bridge_name, "-P", self.policy],
[constants.EBTABLES_BIN, "-A", "FORWARD", "--logical-in", self.bridge_name, "-j", self.bridge_name]
])
except subprocess.CalledProcessError:
logger.exception("Error setting bridge parameters")
# turn off spanning tree protocol and forwarding delay
# TODO: appears stp and rstp are off by default, make sure this always holds true
# TODO: apears ovs only supports rstp forward delay and again it's off by default
utils.check_cmd([constants.IP_BIN, "link", "set", self.bridge_name, "up"])
# create a new ebtables chain for this bridge
ebtables_commands(utils.check_cmd, [
[constants.EBTABLES_BIN, "-N", self.bridge_name, "-P", self.policy],
[constants.EBTABLES_BIN, "-A", "FORWARD", "--logical-in", self.bridge_name, "-j", self.bridge_name]
])
self.up = True
@ -109,16 +105,18 @@ class OvsNet(PyCoreNet):
ebtables_queue.stopupdateloop(self)
utils.mutecall([constants.IP_BIN, "link", "set", self.bridge_name, "down"])
utils.mutecall([constants.OVS_BIN, "del-br", self.bridge_name])
ebtables_commands(utils.mutecall, [
[constants.EBTABLES_BIN, "-D", "FORWARD", "--logical-in", self.bridge_name, "-j", self.bridge_name],
[constants.EBTABLES_BIN, "-X", self.bridge_name]
])
try:
utils.check_cmd([constants.IP_BIN, "link", "set", self.bridge_name, "down"])
utils.check_cmd([constants.OVS_BIN, "del-br", self.bridge_name])
ebtables_commands(utils.check_cmd, [
[constants.EBTABLES_BIN, "-D", "FORWARD", "--logical-in", self.bridge_name, "-j", self.bridge_name],
[constants.EBTABLES_BIN, "-X", self.bridge_name]
])
except CoreCommandError:
logger.exception("error bringing bridge down and removing it")
# removes veth pairs used for bridge-to-bridge connections
for interface in self.netifs():
# removes veth pairs used for bridge-to-bridge connections
interface.shutdown()
self._netif.clear()
@ -128,22 +126,14 @@ class OvsNet(PyCoreNet):
def attach(self, interface):
if self.up:
try:
subprocess.check_call([constants.OVS_BIN, "add-port", self.bridge_name, interface.localname])
subprocess.check_call([constants.IP_BIN, "link", "set", interface.localname, "up"])
except subprocess.CalledProcessError:
logger.exception("error joining interface %s to bridge %s", interface.localname, self.bridge_name)
return
utils.check_cmd([constants.OVS_BIN, "add-port", self.bridge_name, interface.localname])
utils.check_cmd([constants.IP_BIN, "link", "set", interface.localname, "up"])
PyCoreNet.attach(self, interface)
def detach(self, interface):
if self.up:
try:
subprocess.check_call([constants.OVS_BIN, "del-port", self.bridge_name, interface.localname])
except subprocess.CalledProcessError:
logger.exception("error removing interface %s from bridge %s", interface.localname, self.bridge_name)
return
utils.check_cmd([constants.OVS_BIN, "del-port", self.bridge_name, interface.localname])
PyCoreNet.detach(self, interface)
@ -217,14 +207,14 @@ class OvsNet(PyCoreNet):
limit = 0xffff # max IP payload
tbf = ["tbf", "rate", str(bw), "burst", str(burst), "limit", str(limit)]
logger.info("linkconfig: %s" % [tc + parent + ["handle", "1:"] + tbf])
subprocess.check_call(tc + parent + ["handle", "1:"] + tbf)
utils.check_cmd(tc + parent + ["handle", "1:"] + tbf)
interface.setparam("has_tbf", True)
elif interface.getparam("has_tbf") and bw <= 0:
tcd = [] + tc
tcd[2] = "delete"
if self.up:
subprocess.check_call(tcd + parent)
utils.check_cmd(tcd + parent)
interface.setparam("has_tbf", False)
# removing the parent removes the child
@ -258,10 +248,10 @@ class OvsNet(PyCoreNet):
if jitter is not None:
netem += ["%sus" % jitter, "25%"]
if loss is not None:
if loss is not None and loss > 0:
netem += ["loss", "%s%%" % min(loss, 100)]
if duplicate is not None:
if duplicate is not None and duplicate > 0:
netem += ["duplicate", "%s%%" % min(duplicate, 100)]
if delay <= 0 and jitter <= 0 and loss <= 0 and duplicate <= 0:
@ -273,12 +263,12 @@ class OvsNet(PyCoreNet):
if self.up:
logger.info("linkconfig: %s" % ([tc + parent + ["handle", "10:"]],))
subprocess.check_call(tc + parent + ["handle", "10:"])
utils.check_cmd(tc + parent + ["handle", "10:"])
interface.setparam("has_netem", False)
elif len(netem) > 1:
if self.up:
logger.info("linkconfig: %s" % ([tc + parent + ["handle", "10:"] + netem],))
subprocess.check_call(tc + parent + ["handle", "10:"] + netem)
utils.check_cmd(tc + parent + ["handle", "10:"] + netem)
interface.setparam("has_netem", True)
def linknet(self, network):
@ -312,8 +302,8 @@ class OvsNet(PyCoreNet):
if network.up:
# this is similar to net.attach() but uses netif.name instead
# of localname
subprocess.check_call([constants.OVS_BIN, "add-port", network.bridge_name, interface.name])
subprocess.check_call([constants.IP_BIN, "link", "set", interface.name, "up"])
utils.check_cmd([constants.OVS_BIN, "add-port", network.bridge_name, interface.name])
utils.check_cmd([constants.IP_BIN, "link", "set", interface.name, "up"])
# TODO: is there a native method for this? see if this causes issues
# i = network.newifindex()
@ -346,10 +336,7 @@ class OvsNet(PyCoreNet):
return
for address in addresses:
try:
subprocess.check_call([constants.IP_BIN, "addr", "add", str(address), "dev", self.bridge_name])
except subprocess.CalledProcessError:
logger.exception("error adding IP address")
utils.check_cmd([constants.IP_BIN, "addr", "add", str(address), "dev", self.bridge_name])
class OvsCtrlNet(OvsNet):
@ -390,23 +377,19 @@ class OvsCtrlNet(OvsNet):
if self.updown_script:
logger.info("interface %s updown script %s startup called" % (self.bridge_name, self.updown_script))
subprocess.check_call([self.updown_script, self.bridge_name, "startup"])
utils.check_cmd([self.updown_script, self.bridge_name, "startup"])
if self.serverintf:
try:
subprocess.check_call([constants.OVS_BIN, "add-port", self.bridge_name, self.serverintf])
subprocess.check_call([constants.IP_BIN, "link", "set", self.serverintf, "up"])
except subprocess.CalledProcessError:
logger.exception("error joining server interface %s to controlnet bridge %s",
self.serverintf, self.bridge_name)
utils.check_cmd([constants.OVS_BIN, "add-port", self.bridge_name, self.serverintf])
utils.check_cmd([constants.IP_BIN, "link", "set", self.serverintf, "up"])
def detectoldbridge(self):
"""
Occassionally, control net bridges from previously closed sessions are not cleaned up.
Occasionally, control net bridges from previously closed sessions are not cleaned up.
Check if there are old control net bridges and delete them
"""
status, output = utils.cmdresult([constants.OVS_BIN, "list-br"])
output = utils.check_cmd([constants.OVS_BIN, "list-br"])
output = output.strip()
if output:
for line in output.split("\n"):
@ -420,14 +403,17 @@ class OvsCtrlNet(OvsNet):
def shutdown(self):
if self.serverintf:
try:
subprocess.check_call([constants.OVS_BIN, "del-port", self.bridge_name, self.serverintf])
except subprocess.CalledProcessError:
logger.exception("Error deleting server interface %s to controlnet bridge %s",
utils.check_cmd([constants.OVS_BIN, "del-port", self.bridge_name, self.serverintf])
except CoreCommandError:
logger.exception("error deleting server interface %s to controlnet bridge %s",
self.serverintf, self.bridge_name)
if self.updown_script:
logger.info("interface %s updown script (%s shutdown) called", self.bridge_name, self.updown_script)
subprocess.check_call([self.updown_script, self.bridge_name, "shutdown"])
try:
logger.info("interface %s updown script (%s shutdown) called", self.bridge_name, self.updown_script)
utils.check_cmd([self.updown_script, self.bridge_name, "shutdown"])
except CoreCommandError:
logger.exception("error during updown script shutdown")
OvsNet.shutdown(self)
@ -446,12 +432,12 @@ class OvsPtpNet(OvsNet):
raise ValueError("point-to-point links support at most 2 network interfaces")
OvsNet.attach(self, interface)
def data(self, message_type):
def data(self, message_type, lat=None, lon=None, alt=None):
"""
Do not generate a Node Message for point-to-point links. They are
built using a link message instead.
"""
pass
return None
def all_link_data(self, flags):
"""
@ -576,7 +562,7 @@ class OvsHubNode(OvsNet):
if start:
# TODO: verify that the below flow accomplishes what is desired for a "HUB"
# TODO: replace "brctl setageing 0"
subprocess.check_call([constants.OVS_FLOW_BIN, "add-flow", self.bridge_name, "action=flood"])
utils.check_cmd([constants.OVS_FLOW_BIN, "add-flow", self.bridge_name, "action=flood"])
class OvsWlanNode(OvsNet):
@ -686,8 +672,8 @@ class OvsGreTapBridge(OvsNet):
if remoteip is None:
self.gretap = None
else:
self.gretap = GreTap(node=self, name=None, session=session, remoteip=remoteip,
objid=None, localip=localip, ttl=ttl, key=self.grekey)
self.gretap = GreTap(node=self, session=session, remoteip=remoteip,
localip=localip, ttl=ttl, key=self.grekey)
if start:
self.startup()
@ -727,7 +713,7 @@ class OvsGreTapBridge(OvsNet):
if len(addresses) > 1:
localip = addresses[1].split("/")[0]
self.gretap = GreTap(session=self.session, remoteip=remoteip, objid=None, name=None,
self.gretap = GreTap(session=self.session, remoteip=remoteip,
localip=localip, ttl=self.ttl, key=self.grekey)
self.attach(self.gretap)

View file

@ -2,9 +2,9 @@
virtual ethernet classes that implement the interfaces available under Linux.
"""
import subprocess
import time
from core import CoreCommandError
from core import constants
from core import logger
from core.coreobj import PyCoreNetIf
@ -25,13 +25,13 @@ class VEth(PyCoreNetIf):
"""
Creates a VEth instance.
:param core.netns.nodes.CoreNode node: related core node
:param core.netns.vnode.SimpleLxcNode node: related core node
:param str name: interface name
:param str localname: interface local name
:param mtu: interface mtu
:param net: network
:param bool start: start flag
:return:
:raises CoreCommandError: when there is a command exception
"""
# note that net arg is ignored
PyCoreNetIf.__init__(self, node=node, name=name, mtu=mtu)
@ -45,10 +45,11 @@ class VEth(PyCoreNetIf):
Interface startup logic.
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
subprocess.check_call([constants.IP_BIN, "link", "add", "name", self.localname,
"type", "veth", "peer", "name", self.name])
subprocess.check_call([constants.IP_BIN, "link", "set", self.localname, "up"])
utils.check_cmd([constants.IP_BIN, "link", "add", "name", self.localname,
"type", "veth", "peer", "name", self.name])
utils.check_cmd([constants.IP_BIN, "link", "set", self.localname, "up"])
self.up = True
def shutdown(self):
@ -59,10 +60,19 @@ class VEth(PyCoreNetIf):
"""
if not self.up:
return
if self.node:
self.node.cmd([constants.IP_BIN, "-6", "addr", "flush", "dev", self.name])
try:
self.node.check_cmd([constants.IP_BIN, "-6", "addr", "flush", "dev", self.name])
except CoreCommandError:
logger.exception("error shutting down interface")
if self.localname:
utils.mutedetach([constants.IP_BIN, "link", "delete", self.localname])
try:
utils.check_cmd([constants.IP_BIN, "link", "delete", self.localname])
except CoreCommandError:
logger.exception("error deleting link")
self.up = False
@ -76,7 +86,7 @@ class TunTap(PyCoreNetIf):
"""
Create a TunTap instance.
:param core.netns.nodes.CoreNode node: related core node
:param core.netns.vnode.SimpleLxcNode node: related core node
:param str name: interface name
:param str localname: local interface name
:param mtu: interface mtu
@ -98,10 +108,10 @@ class TunTap(PyCoreNetIf):
"""
# TODO: more sophisticated TAP creation here
# Debian does not support -p (tap) option, RedHat does.
# For now, this is disabled to allow the TAP to be created by another
# system (e.g. EMANE"s emanetransportd)
# check_call(["tunctl", "-t", self.name])
# self.install()
# For now, this is disabled to allow the TAP to be created by another
# system (e.g. EMANE"s emanetransportd)
# check_call(["tunctl", "-t", self.name])
# self.install()
self.up = True
def shutdown(self):
@ -112,9 +122,12 @@ class TunTap(PyCoreNetIf):
"""
if not self.up:
return
self.node.cmd([constants.IP_BIN, "-6", "addr", "flush", "dev", self.name])
# if self.name:
# mutedetach(["tunctl", "-d", self.localname])
try:
self.node.check_cmd([constants.IP_BIN, "-6", "addr", "flush", "dev", self.name])
except CoreCommandError:
logger.exception("error shutting down tunnel tap")
self.up = False
def waitfor(self, func, attempts=10, maxretrydelay=0.25):
@ -124,26 +137,29 @@ class TunTap(PyCoreNetIf):
:param func: function to wait for a result of zero
:param int attempts: number of attempts to wait for a zero result
:param float maxretrydelay: maximum retry delay
:return: nothing
:return: True if wait succeeded, False otherwise
:rtype: bool
"""
delay = 0.01
result = False
for i in xrange(1, attempts + 1):
r = func()
if r == 0:
return
result = True
break
msg = "attempt %s failed with nonzero exit status %s" % (i, r)
if i < attempts + 1:
msg += ", retrying..."
logger.info(msg)
time.sleep(delay)
delay = delay + delay
delay += delay
if delay > maxretrydelay:
delay = maxretrydelay
else:
msg += ", giving up"
logger.info(msg)
raise RuntimeError("command failed after %s attempts" % attempts)
return result
def waitfordevicelocal(self):
"""
@ -155,8 +171,8 @@ class TunTap(PyCoreNetIf):
"""
def localdevexists():
cmd = (constants.IP_BIN, "link", "show", self.localname)
return utils.mutecall(cmd)
args = [constants.IP_BIN, "link", "show", self.localname]
return utils.cmd(args)
self.waitfor(localdevexists)
@ -168,23 +184,25 @@ class TunTap(PyCoreNetIf):
"""
def nodedevexists():
cmd = (constants.IP_BIN, "link", "show", self.name)
return self.node.cmd(cmd)
args = [constants.IP_BIN, "link", "show", self.name]
return self.node.cmd(args)
count = 0
while True:
try:
self.waitfor(nodedevexists)
result = self.waitfor(nodedevexists)
if result:
break
except RuntimeError as e:
# check if this is an EMANE interface; if so, continue
# waiting if EMANE is still running
# TODO: remove emane code
if count < 5 and nodeutils.is_node(self.net, NodeTypes.EMANE) and \
self.node.session.emane.emanerunning(self.node):
count += 1
else:
raise e
# check if this is an EMANE interface; if so, continue
# waiting if EMANE is still running
# TODO: remove emane code
should_retry = count < 5
is_emane_node = nodeutils.is_node(self.net, NodeTypes.EMANE)
is_emane_running = self.node.session.emane.emanerunning(self.node)
if all([should_retry, is_emane_node, is_emane_running]):
count += 1
else:
raise RuntimeError("node device failed to exist")
def install(self):
"""
@ -194,20 +212,13 @@ class TunTap(PyCoreNetIf):
end of the TAP.
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
self.waitfordevicelocal()
netns = str(self.node.pid)
try:
subprocess.check_call([constants.IP_BIN, "link", "set", self.localname, "netns", netns])
except subprocess.CalledProcessError:
msg = "error installing TAP interface %s, command:" % self.localname
msg += "ip link set %s netns %s" % (self.localname, netns)
logger.exception(msg)
return
self.node.cmd([constants.IP_BIN, "link", "set", self.localname, "name", self.name])
self.node.cmd([constants.IP_BIN, "link", "set", self.name, "up"])
utils.check_cmd([constants.IP_BIN, "link", "set", self.localname, "netns", netns])
self.node.check_cmd([constants.IP_BIN, "link", "set", self.localname, "name", self.name])
self.node.check_cmd([constants.IP_BIN, "link", "set", self.name, "up"])
def setaddrs(self):
"""
@ -217,7 +228,7 @@ class TunTap(PyCoreNetIf):
"""
self.waitfordevicenode()
for addr in self.addrlist:
self.node.cmd([constants.IP_BIN, "addr", "add", str(addr), "dev", self.name])
self.node.check_cmd([constants.IP_BIN, "addr", "add", str(addr), "dev", self.name])
class GreTap(PyCoreNetIf):
@ -233,7 +244,7 @@ class GreTap(PyCoreNetIf):
"""
Creates a GreTap instance.
:param core.netns.nodes.CoreNode node: related core node
:param core.netns.vnode.SimpleLxcNode node: related core node
:param str name: interface name
:param core.session.Session session: core session instance
:param mtu: interface mtu
@ -243,6 +254,7 @@ class GreTap(PyCoreNetIf):
:param ttl: ttl value
:param key: gre tap key
:param bool start: start flag
:raises CoreCommandError: when there is a command exception
"""
PyCoreNetIf.__init__(self, node=node, name=name, mtu=mtu)
self.session = session
@ -260,17 +272,17 @@ class GreTap(PyCoreNetIf):
if remoteip is None:
raise ValueError, "missing remote IP required for GRE TAP device"
cmd = ("ip", "link", "add", self.localname, "type", "gretap",
"remote", str(remoteip))
args = ["ip", "link", "add", self.localname, "type", "gretap",
"remote", str(remoteip)]
if localip:
cmd += ("local", str(localip))
args += ["local", str(localip)]
if ttl:
cmd += ("ttl", str(ttl))
args += ["ttl", str(ttl)]
if key:
cmd += ("key", str(key))
subprocess.check_call(cmd)
cmd = ("ip", "link", "set", self.localname, "up")
subprocess.check_call(cmd)
args += ["key", str(key)]
utils.check_cmd(args)
args = ["ip", "link", "set", self.localname, "up"]
utils.check_cmd(args)
self.up = True
def shutdown(self):
@ -280,10 +292,14 @@ class GreTap(PyCoreNetIf):
:return: nothing
"""
if self.localname:
cmd = ("ip", "link", "set", self.localname, "down")
subprocess.check_call(cmd)
cmd = ("ip", "link", "del", self.localname)
subprocess.check_call(cmd)
try:
args = ["ip", "link", "set", self.localname, "down"]
utils.check_cmd(args)
args = ["ip", "link", "del", self.localname]
utils.check_cmd(args)
except CoreCommandError:
logger.exception("error during shutdown")
self.localname = None
def data(self, message_type):

View file

@ -4,10 +4,10 @@ Linux Ethernet bridging and ebtables rules.
"""
import os
import subprocess
import threading
import time
from core import CoreCommandError
from core import constants
from core import logger
from core.coreobj import PyCoreNet
@ -59,11 +59,12 @@ class EbtablesQueue(object):
:return: nothing
"""
self.updatelock.acquire()
self.last_update_time[wlan] = time.time()
self.updatelock.release()
with self.updatelock:
self.last_update_time[wlan] = time.time()
if self.doupdateloop:
return
self.doupdateloop = True
self.updatethread = threading.Thread(target=self.updateloop)
self.updatethread.daemon = True
@ -75,15 +76,15 @@ class EbtablesQueue(object):
:return: nothing
"""
self.updatelock.acquire()
try:
del self.last_update_time[wlan]
except KeyError:
logger.exception("error deleting last update time for wlan, ignored before: %s", wlan)
with self.updatelock:
try:
del self.last_update_time[wlan]
except KeyError:
logger.exception("error deleting last update time for wlan, ignored before: %s", wlan)
self.updatelock.release()
if len(self.last_update_time) > 0:
return
self.doupdateloop = False
if self.updatethread:
self.updatethread.join()
@ -137,25 +138,26 @@ class EbtablesQueue(object):
:return: nothing
"""
while self.doupdateloop:
self.updatelock.acquire()
for wlan in self.updates:
"""
Check if wlan is from a previously closed session. Because of the
rate limiting scheme employed here, this may happen if a new session
is started soon after closing a previous session.
"""
try:
wlan.session
except:
# Just mark as updated to remove from self.updates.
self.updated(wlan)
continue
if self.lastupdate(wlan) > self.rate:
self.buildcmds(wlan)
# print "ebtables commit %d rules" % len(self.cmds)
self.ebcommit(wlan)
self.updated(wlan)
self.updatelock.release()
with self.updatelock:
for wlan in self.updates:
"""
Check if wlan is from a previously closed session. Because of the
rate limiting scheme employed here, this may happen if a new session
is started soon after closing a previous session.
"""
# TODO: if these are WlanNodes, this will never throw an exception
try:
wlan.session
except:
# Just mark as updated to remove from self.updates.
self.updated(wlan)
continue
if self.lastupdate(wlan) > self.rate:
self.buildcmds(wlan)
self.ebcommit(wlan)
self.updated(wlan)
time.sleep(self.rate)
def ebcommit(self, wlan):
@ -165,30 +167,23 @@ class EbtablesQueue(object):
:return: nothing
"""
# save kernel ebtables snapshot to a file
cmd = self.ebatomiccmd(["--atomic-save", ])
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
logger.exception("atomic-save (%s)", cmd)
# no atomic file, exit
return
args = self.ebatomiccmd(["--atomic-save", ])
utils.check_cmd(args)
# modify the table file using queued ebtables commands
for c in self.cmds:
cmd = self.ebatomiccmd(c)
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
logger.exception("cmd=%s", cmd)
args = self.ebatomiccmd(c)
utils.check_cmd(args)
self.cmds = []
# commit the table file to the kernel
cmd = self.ebatomiccmd(["--atomic-commit", ])
args = self.ebatomiccmd(["--atomic-commit", ])
utils.check_cmd(args)
try:
subprocess.check_call(cmd)
os.unlink(self.atomic_file)
except OSError:
logger.exception("atomic-commit (%s)", cmd)
logger.exception("error removing atomic file: %s", self.atomic_file)
def ebchange(self, wlan):
"""
@ -197,10 +192,9 @@ class EbtablesQueue(object):
:return: nothing
"""
self.updatelock.acquire()
if wlan not in self.updates:
self.updates.append(wlan)
self.updatelock.release()
with self.updatelock:
if wlan not in self.updates:
self.updates.append(wlan)
def buildcmds(self, wlan):
"""
@ -208,23 +202,22 @@ class EbtablesQueue(object):
:return: nothing
"""
wlan._linked_lock.acquire()
# flush the chain
self.cmds.extend([["-F", wlan.brname], ])
# rebuild the chain
for netif1, v in wlan._linked.items():
for netif2, linked in v.items():
if wlan.policy == "DROP" and linked:
self.cmds.extend([["-A", wlan.brname, "-i", netif1.localname,
"-o", netif2.localname, "-j", "ACCEPT"],
["-A", wlan.brname, "-o", netif1.localname,
"-i", netif2.localname, "-j", "ACCEPT"]])
elif wlan.policy == "ACCEPT" and not linked:
self.cmds.extend([["-A", wlan.brname, "-i", netif1.localname,
"-o", netif2.localname, "-j", "DROP"],
["-A", wlan.brname, "-o", netif1.localname,
"-i", netif2.localname, "-j", "DROP"]])
wlan._linked_lock.release()
with wlan._linked_lock:
# flush the chain
self.cmds.extend([["-F", wlan.brname], ])
# rebuild the chain
for netif1, v in wlan._linked.items():
for netif2, linked in v.items():
if wlan.policy == "DROP" and linked:
self.cmds.extend([["-A", wlan.brname, "-i", netif1.localname,
"-o", netif2.localname, "-j", "ACCEPT"],
["-A", wlan.brname, "-o", netif1.localname,
"-i", netif2.localname, "-j", "ACCEPT"]])
elif wlan.policy == "ACCEPT" and not linked:
self.cmds.extend([["-A", wlan.brname, "-i", netif1.localname,
"-o", netif2.localname, "-j", "DROP"],
["-A", wlan.brname, "-o", netif1.localname,
"-i", netif2.localname, "-j", "DROP"]])
# a global object because all WLANs share the same queue
@ -241,8 +234,8 @@ def ebtablescmds(call, cmds):
:return: nothing
"""
with ebtables_lock:
for cmd in cmds:
call(cmd)
for args in cmds:
call(args)
class LxBrNet(PyCoreNet):
@ -279,28 +272,24 @@ class LxBrNet(PyCoreNet):
Linux bridge starup logic.
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
try:
subprocess.check_call([constants.BRCTL_BIN, "addbr", self.brname])
except subprocess.CalledProcessError:
logger.exception("Error adding bridge")
utils.check_cmd([constants.BRCTL_BIN, "addbr", self.brname])
try:
# turn off spanning tree protocol and forwarding delay
subprocess.check_call([constants.BRCTL_BIN, "stp", self.brname, "off"])
subprocess.check_call([constants.BRCTL_BIN, "setfd", self.brname, "0"])
subprocess.check_call([constants.IP_BIN, "link", "set", self.brname, "up"])
# create a new ebtables chain for this bridge
ebtablescmds(subprocess.check_call, [
[constants.EBTABLES_BIN, "-N", self.brname, "-P", self.policy],
[constants.EBTABLES_BIN, "-A", "FORWARD", "--logical-in", self.brname, "-j", self.brname]
])
# turn off multicast snooping so mcast forwarding occurs w/o IGMP joins
snoop = "/sys/devices/virtual/net/%s/bridge/multicast_snooping" % self.brname
if os.path.exists(snoop):
open(snoop, "w").write("0")
except subprocess.CalledProcessError:
logger.exception("Error setting bridge parameters")
# turn off spanning tree protocol and forwarding delay
utils.check_cmd([constants.BRCTL_BIN, "stp", self.brname, "off"])
utils.check_cmd([constants.BRCTL_BIN, "setfd", self.brname, "0"])
utils.check_cmd([constants.IP_BIN, "link", "set", self.brname, "up"])
# create a new ebtables chain for this bridge
ebtablescmds(utils.check_cmd, [
[constants.EBTABLES_BIN, "-N", self.brname, "-P", self.policy],
[constants.EBTABLES_BIN, "-A", "FORWARD", "--logical-in", self.brname, "-j", self.brname]
])
# turn off multicast snooping so mcast forwarding occurs w/o IGMP joins
snoop = "/sys/devices/virtual/net/%s/bridge/multicast_snooping" % self.brname
if os.path.exists(snoop):
with open(snoop, "w") as snoop_file:
snoop_file.write("0")
self.up = True
@ -312,35 +301,40 @@ class LxBrNet(PyCoreNet):
"""
if not self.up:
return
ebq.stopupdateloop(self)
utils.mutecall([constants.IP_BIN, "link", "set", self.brname, "down"])
utils.mutecall([constants.BRCTL_BIN, "delbr", self.brname])
ebtablescmds(utils.mutecall, [
[constants.EBTABLES_BIN, "-D", "FORWARD",
"--logical-in", self.brname, "-j", self.brname],
[constants.EBTABLES_BIN, "-X", self.brname]])
try:
utils.check_cmd([constants.IP_BIN, "link", "set", self.brname, "down"])
utils.check_cmd([constants.BRCTL_BIN, "delbr", self.brname])
ebtablescmds(utils.check_cmd, [
[constants.EBTABLES_BIN, "-D", "FORWARD", "--logical-in", self.brname, "-j", self.brname],
[constants.EBTABLES_BIN, "-X", self.brname]
])
except CoreCommandError:
logger.exception("error during shutdown")
# removes veth pairs used for bridge-to-bridge connections
for netif in self.netifs():
# removes veth pairs used for bridge-to-bridge connections
netif.shutdown()
self._netif.clear()
self._linked.clear()
del self.session
self.up = False
# TODO: this depends on a subtype with localname defined, seems like the wrong place for this to live
def attach(self, netif):
"""
Attach a network interface.
:param core.netns.vif.VEth netif: network interface to attach
:param core.netns.vnode.VEth netif: network interface to attach
:return: nothing
"""
if self.up:
try:
subprocess.check_call([constants.BRCTL_BIN, "addif", self.brname, netif.localname])
subprocess.check_call([constants.IP_BIN, "link", "set", netif.localname, "up"])
except subprocess.CalledProcessError:
logger.exception("Error joining interface %s to bridge %s", netif.localname, self.brname)
return
utils.check_cmd([constants.BRCTL_BIN, "addif", self.brname, netif.localname])
utils.check_cmd([constants.IP_BIN, "link", "set", netif.localname, "up"])
PyCoreNet.attach(self, netif)
def detach(self, netif):
@ -351,11 +345,8 @@ class LxBrNet(PyCoreNet):
:return: nothing
"""
if self.up:
try:
subprocess.check_call([constants.BRCTL_BIN, "delif", self.brname, netif.localname])
except subprocess.CalledProcessError:
logger.exception("Error removing interface %s from bridge %s", netif.localname, self.brname)
return
utils.check_cmd([constants.BRCTL_BIN, "delif", self.brname, netif.localname])
PyCoreNet.detach(self, netif)
def linked(self, netif1, netif2):
@ -396,12 +387,11 @@ class LxBrNet(PyCoreNet):
:param core.netns.vif.Veth netif2: interface two
:return: nothing
"""
self._linked_lock.acquire()
if not self.linked(netif1, netif2):
self._linked_lock.release()
return
self._linked[netif1][netif2] = False
self._linked_lock.release()
with self._linked_lock:
if not self.linked(netif1, netif2):
return
self._linked[netif1][netif2] = False
ebq.ebchange(self)
def link(self, netif1, netif2):
@ -413,12 +403,11 @@ class LxBrNet(PyCoreNet):
:param core.netns.vif.Veth netif2: interface two
:return: nothing
"""
self._linked_lock.acquire()
if self.linked(netif1, netif2):
self._linked_lock.release()
return
self._linked[netif1][netif2] = True
self._linked_lock.release()
with self._linked_lock:
if self.linked(netif1, netif2):
return
self._linked[netif1][netif2] = True
ebq.ebchange(self)
def linkconfig(self, netif, bw=None, delay=None, loss=None, duplicate=None,
@ -452,14 +441,14 @@ class LxBrNet(PyCoreNet):
if bw > 0:
if self.up:
logger.info("linkconfig: %s" % ([tc + parent + ["handle", "1:"] + tbf],))
subprocess.check_call(tc + parent + ["handle", "1:"] + tbf)
utils.check_cmd(tc + parent + ["handle", "1:"] + tbf)
netif.setparam("has_tbf", True)
changed = True
elif netif.getparam("has_tbf") and bw <= 0:
tcd = [] + tc
tcd[2] = "delete"
if self.up:
subprocess.check_call(tcd + parent)
utils.check_cmd(tcd + parent)
netif.setparam("has_tbf", False)
# removing the parent removes the child
netif.setparam("has_netem", False)
@ -486,9 +475,9 @@ class LxBrNet(PyCoreNet):
else:
netem += ["%sus" % jitter, "25%"]
if loss is not None:
if loss is not None and loss > 0:
netem += ["loss", "%s%%" % min(loss, 100)]
if duplicate is not None:
if duplicate is not None and duplicate > 0:
netem += ["duplicate", "%s%%" % min(duplicate, 100)]
if delay <= 0 and jitter <= 0 and loss <= 0 and duplicate <= 0:
# possibly remove netem if it exists and parent queue wasn't removed
@ -497,12 +486,12 @@ class LxBrNet(PyCoreNet):
tc[2] = "delete"
if self.up:
logger.info("linkconfig: %s" % ([tc + parent + ["handle", "10:"]],))
subprocess.check_call(tc + parent + ["handle", "10:"])
utils.check_cmd(tc + parent + ["handle", "10:"])
netif.setparam("has_netem", False)
elif len(netem) > 1:
if self.up:
logger.info("linkconfig: %s" % ([tc + parent + ["handle", "10:"] + netem],))
subprocess.check_call(tc + parent + ["handle", "10:"] + netem)
utils.check_cmd(tc + parent + ["handle", "10:"] + netem)
netif.setparam("has_netem", True)
def linknet(self, net):
@ -519,24 +508,27 @@ class LxBrNet(PyCoreNet):
self_objid = "%x" % self.objid
except TypeError:
self_objid = "%s" % self.objid
try:
net_objid = "%x" % net.objid
except TypeError:
net_objid = "%s" % net.objid
localname = "veth%s.%s.%s" % (self_objid, net_objid, sessionid)
if len(localname) >= 16:
raise ValueError("interface local name %s too long" % localname)
name = "veth%s.%s.%s" % (net_objid, self_objid, sessionid)
if len(name) >= 16:
raise ValueError("interface name %s too long" % name)
netif = VEth(node=None, name=name, localname=localname,
mtu=1500, net=self, start=self.up)
netif = VEth(node=None, name=name, localname=localname, mtu=1500, net=self, start=self.up)
self.attach(netif)
if net.up:
# this is similar to net.attach() but uses netif.name instead
# of localname
subprocess.check_call([constants.BRCTL_BIN, "addif", net.brname, netif.name])
subprocess.check_call([constants.IP_BIN, "link", "set", netif.name, "up"])
utils.check_cmd([constants.BRCTL_BIN, "addif", net.brname, netif.name])
utils.check_cmd([constants.IP_BIN, "link", "set", netif.name, "up"])
i = net.newifindex()
net._netif[i] = netif
with net._linked_lock:
@ -557,6 +549,7 @@ class LxBrNet(PyCoreNet):
for netif in self.netifs():
if hasattr(netif, "othernet") and netif.othernet == net:
return netif
return None
def addrconfig(self, addrlist):
@ -568,11 +561,9 @@ class LxBrNet(PyCoreNet):
"""
if not self.up:
return
for addr in addrlist:
try:
subprocess.check_call([constants.IP_BIN, "addr", "add", str(addr), "dev", self.brname])
except subprocess.CalledProcessError:
logger.exception("Error adding IP address")
utils.check_cmd([constants.IP_BIN, "addr", "add", str(addr), "dev", self.brname])
class GreTapBridge(LxBrNet):
@ -609,8 +600,8 @@ class GreTapBridge(LxBrNet):
if remoteip is None:
self.gretap = None
else:
self.gretap = GreTap(node=self, name=None, session=session, remoteip=remoteip,
objid=None, localip=localip, ttl=ttl, key=self.grekey)
self.gretap = GreTap(node=self, session=session, remoteip=remoteip,
localip=localip, ttl=ttl, key=self.grekey)
if start:
self.startup()
@ -652,7 +643,7 @@ class GreTapBridge(LxBrNet):
localip = None
if len(addrlist) > 1:
localip = addrlist[1].split("/")[0]
self.gretap = GreTap(session=self.session, remoteip=remoteip, objid=None, name=None,
self.gretap = GreTap(session=self.session, remoteip=remoteip,
localip=localip, ttl=self.ttl, key=self.grekey)
self.attach(self.gretap)

View file

@ -2,14 +2,15 @@
PyCoreNode and LxcNode classes that implement the network namespac virtual node.
"""
import errno
import os
import random
import shutil
import signal
import string
import subprocess
import threading
from core import CoreCommandError
from core import constants
from core import logger
from core.coreobj import PyCoreNetIf
@ -17,18 +18,29 @@ from core.coreobj import PyCoreNode
from core.enumerations import NodeTypes
from core.misc import nodeutils
from core.misc import utils
from core.misc.ipaddress import MacAddress
from core.netns import vnodeclient
from core.netns.vif import TunTap
from core.netns.vif import VEth
_DEFAULT_MTU = 1500
utils.check_executables([constants.IP_BIN])
class SimpleLxcNode(PyCoreNode):
"""
Provides simple lxc functionality for core nodes.
:var nodedir: str
:var ctrlchnlname: str
:var client: core.netns.vnodeclient.VnodeClient
:var pid: int
:var up: bool
:var lock: threading.RLock
:var _mounts: list[tuple[str, str]]
"""
valid_deladdrtype = ("inet", "inet6", "inet6link")
valid_address_types = {"inet", "inet6", "inet6link"}
def __init__(self, session, objid=None, name=None, nodedir=None, start=True):
"""
@ -43,7 +55,7 @@ class SimpleLxcNode(PyCoreNode):
PyCoreNode.__init__(self, session, objid, name, start=start)
self.nodedir = nodedir
self.ctrlchnlname = os.path.abspath(os.path.join(self.session.session_dir, self.name))
self.vnodeclient = None
self.client = None
self.pid = None
self.up = False
self.lock = threading.RLock()
@ -72,39 +84,37 @@ class SimpleLxcNode(PyCoreNode):
:return: nothing
"""
if self.up:
raise Exception("already up")
vnoded = ["%s/vnoded" % constants.CORE_SBIN_DIR, "-v", "-c", self.ctrlchnlname,
"-l", self.ctrlchnlname + ".log",
"-p", self.ctrlchnlname + ".pid"]
raise ValueError("starting a node that is already up")
# create a new namespace for this node using vnoded
vnoded = [
constants.VNODED_BIN,
"-v",
"-c", self.ctrlchnlname,
"-l", self.ctrlchnlname + ".log",
"-p", self.ctrlchnlname + ".pid"
]
if self.nodedir:
vnoded += ["-C", self.nodedir]
env = self.session.get_environment(state=False)
env["NODE_NUMBER"] = str(self.objid)
env["NODE_NAME"] = str(self.name)
try:
tmp = subprocess.Popen(vnoded, stdout=subprocess.PIPE, env=env)
except OSError:
msg = "error running vnoded command: %s" % vnoded
logger.exception("SimpleLxcNode.startup(): %s", msg)
raise Exception(msg)
output = utils.check_cmd(vnoded, env=env)
self.pid = int(output)
try:
self.pid = int(tmp.stdout.read())
tmp.stdout.close()
except ValueError:
msg = "vnoded failed to create a namespace; "
msg += "check kernel support and user priveleges"
logger.exception("SimpleLxcNode.startup(): %s", msg)
# create vnode client
self.client = vnodeclient.VnodeClient(self.name, self.ctrlchnlname)
if tmp.wait():
raise Exception("command failed: %s" % vnoded)
# bring up the loopback interface
logger.debug("bringing up loopback interface")
self.check_cmd([constants.IP_BIN, "link", "set", "lo", "up"])
self.vnodeclient = vnodeclient.VnodeClient(self.name, self.ctrlchnlname)
logger.info("bringing up loopback interface")
self.cmd([constants.IP_BIN, "link", "set", "lo", "up"])
logger.info("setting hostname: %s" % self.name)
self.cmd(["hostname", self.name])
# set hostname for node
logger.debug("setting hostname: %s", self.name)
self.check_cmd(["hostname", self.name])
# mark node as up
self.up = True
def shutdown(self):
@ -129,107 +139,71 @@ class SimpleLxcNode(PyCoreNode):
try:
os.kill(self.pid, signal.SIGTERM)
os.waitpid(self.pid, 0)
except OSError:
logger.exception("error killing process")
except OSError as e:
if e.errno != 10:
logger.exception("error killing process")
# remove node directory if present
try:
if os.path.exists(self.ctrlchnlname):
os.unlink(self.ctrlchnlname)
except OSError:
logger.exception("error removing file")
os.unlink(self.ctrlchnlname)
except OSError as e:
# no such file or directory
if e.errno != errno.ENOENT:
logger.exception("error removing node directory")
# clear interface data, close client, and mark self and not up
self._netif.clear()
self.vnodeclient.close()
self.client.close()
self.up = False
# TODO: potentially remove all these wrapper methods, just make use of object itself.
def cmd(self, args, wait=True):
"""
Wrapper around vnodeclient cmd.
:param args: arguments for ocmmand
:param wait: wait or not
:return:
"""
return self.vnodeclient.cmd(args, wait)
def cmdresult(self, args):
"""
Wrapper around vnodeclient cmdresult.
:param args: arguments for ocmmand
:return:
"""
return self.vnodeclient.cmdresult(args)
def popen(self, args):
"""
Wrapper around vnodeclient popen.
:param args: arguments for ocmmand
:return:
"""
return self.vnodeclient.popen(args)
def icmd(self, args):
"""
Wrapper around vnodeclient icmd.
:param args: arguments for ocmmand
:return:
"""
return self.vnodeclient.icmd(args)
def redircmd(self, infd, outfd, errfd, args, wait=True):
"""
Wrapper around vnodeclient redircmd.
:param infd: input file descriptor
:param outfd: output file descriptor
:param errfd: err file descriptor
:param args: command arguments
:param wait: wait or not
:return:
"""
return self.vnodeclient.redircmd(infd, outfd, errfd, args, wait)
def term(self, sh="/bin/sh"):
"""
Wrapper around vnodeclient term.
:param sh: shell to create terminal for
:return:
"""
return self.vnodeclient.term(sh=sh)
def termcmdstring(self, sh="/bin/sh"):
"""
Wrapper around vnodeclient termcmdstring.
:param sh: shell to run command in
:return:
"""
return self.vnodeclient.termcmdstring(sh=sh)
def shcmd(self, cmdstr, sh="/bin/sh"):
"""
Wrapper around vnodeclient shcmd.
:param str cmdstr: command string
:param sh: shell to run command in
:return:
"""
return self.vnodeclient.shcmd(cmdstr, sh=sh)
def boot(self):
"""
Boot logic.
:return: nothing
"""
pass
return None
def cmd(self, args, wait=True):
"""
Runs shell command on node, with option to not wait for a result.
:param list[str]|str args: command to run
:param bool wait: wait for command to exit, defaults to True
:return: exit status for command
:rtype: int
"""
return self.client.cmd(args, wait)
def cmd_output(self, args):
"""
Runs shell command on node and get exit status and output.
:param list[str]|str args: command to run
:return: exit status and combined stdout and stderr
:rtype: tuple[int, str]
"""
return self.client.cmd_output(args)
def check_cmd(self, args):
"""
Runs shell command on node.
:param list[str]|str args: command to run
:return: combined stdout and stderr
:rtype: str
:raises CoreCommandError: when a non-zero exit status occurs
"""
return self.client.check_cmd(args)
def termcmdstring(self, sh="/bin/sh"):
"""
Create a terminal command string.
:param str sh: shell to execute command in
:return: str
"""
return self.client.termcmdstring(sh)
def mount(self, source, target):
"""
@ -238,16 +212,16 @@ class SimpleLxcNode(PyCoreNode):
:param str source: source directory to mount
:param str target: target directory to create
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
source = os.path.abspath(source)
logger.info("mounting %s at %s" % (source, target))
try:
shcmd = 'mkdir -p "%s" && %s -n --bind "%s" "%s"' % (
target, constants.MOUNT_BIN, source, target)
self.shcmd(shcmd)
self._mounts.append((source, target))
except IOError:
logger.exception("mounting failed for %s at %s", source, target)
logger.info("node(%s) mounting: %s at %s", self.name, source, target)
cmd = 'mkdir -p "%s" && %s -n --bind "%s" "%s"' % (target, constants.MOUNT_BIN, source, target)
status, output = self.client.shcmd_result(cmd)
if status:
raise CoreCommandError(status, cmd, output)
self._mounts.append((source, target))
def newifindex(self):
"""
@ -268,8 +242,7 @@ class SimpleLxcNode(PyCoreNode):
:param net: network to associate interface with
:return: nothing
"""
self.lock.acquire()
try:
with self.lock:
if ifindex is None:
ifindex = self.newifindex()
@ -286,36 +259,38 @@ class SimpleLxcNode(PyCoreNode):
localname = "veth" + suffix
if len(localname) >= 16:
raise ValueError("interface local name (%s) too long" % localname)
name = localname + "p"
if len(name) >= 16:
raise ValueError("interface name (%s) too long" % name)
veth = VEth(node=self, name=name, localname=localname, mtu=1500, net=net, start=self.up)
veth = VEth(node=self, name=name, localname=localname, net=net, start=self.up)
if self.up:
subprocess.check_call([constants.IP_BIN, "link", "set", veth.name, "netns", str(self.pid)])
self.cmd([constants.IP_BIN, "link", "set", veth.name, "name", ifname])
utils.check_cmd([constants.IP_BIN, "link", "set", veth.name, "netns", str(self.pid)])
self.check_cmd([constants.IP_BIN, "link", "set", veth.name, "name", ifname])
veth.name = ifname
# retrieve interface information
result, output = self.cmdresult(["ip", "link", "show", veth.name])
logger.info("interface command output: %s", output)
output = output.split("\n")
veth.flow_id = int(output[0].strip().split(":")[0]) + 1
logger.info("interface flow index: %s - %s", veth.name, veth.flow_id)
veth.hwaddr = output[1].strip().split()[1]
logger.info("interface mac: %s - %s", veth.name, veth.hwaddr)
if self.up:
# TODO: potentially find better way to query interface ID
# retrieve interface information
output = self.check_cmd(["ip", "link", "show", veth.name])
logger.debug("interface command output: %s", output)
output = output.split("\n")
veth.flow_id = int(output[0].strip().split(":")[0]) + 1
logger.debug("interface flow index: %s - %s", veth.name, veth.flow_id)
veth.hwaddr = MacAddress.from_string(output[1].strip().split()[1])
logger.debug("interface mac: %s - %s", veth.name, veth.hwaddr)
try:
self.addnetif(veth, ifindex)
except:
except ValueError as e:
veth.shutdown()
del veth
raise
raise e
return ifindex
finally:
self.lock.release()
def newtuntap(self, ifindex=None, ifname=None, net=None):
"""
@ -327,27 +302,26 @@ class SimpleLxcNode(PyCoreNode):
:return: interface index
:rtype: int
"""
self.lock.acquire()
try:
with self.lock:
if ifindex is None:
ifindex = self.newifindex()
if ifname is None:
ifname = "eth%d" % ifindex
sessionid = self.session.short_session_id()
localname = "tap%s.%s.%s" % (self.objid, ifindex, sessionid)
name = ifname
ifclass = TunTap
tuntap = ifclass(node=self, name=name, localname=localname,
mtu=1500, net=net, start=self.up)
tuntap = TunTap(node=self, name=name, localname=localname, net=net, start=self.up)
try:
self.addnetif(tuntap, ifindex)
except Exception as e:
except ValueError as e:
tuntap.shutdown()
del tuntap
raise e
return ifindex
finally:
self.lock.release()
def sethwaddr(self, ifindex, addr):
"""
@ -355,14 +329,13 @@ class SimpleLxcNode(PyCoreNode):
:param int ifindex: index of interface to set hardware address for
:param core.misc.ipaddress.MacAddress addr: hardware address to set
:return: mothing
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
self._netif[ifindex].sethwaddr(addr)
if self.up:
(status, result) = self.cmdresult([constants.IP_BIN, "link", "set", "dev",
self.ifname(ifindex), "address", str(addr)])
if status:
logger.error("error setting MAC address %s", str(addr))
args = [constants.IP_BIN, "link", "set", "dev", self.ifname(ifindex), "address", str(addr)]
self.check_cmd(args)
def addaddr(self, ifindex, addr):
"""
@ -373,12 +346,14 @@ class SimpleLxcNode(PyCoreNode):
:return: nothing
"""
if self.up:
if ":" in str(addr): # check if addr is ipv6
self.cmd([constants.IP_BIN, "addr", "add", str(addr),
"dev", self.ifname(ifindex)])
# check if addr is ipv6
if ":" in str(addr):
args = [constants.IP_BIN, "addr", "add", str(addr), "dev", self.ifname(ifindex)]
self.check_cmd(args)
else:
self.cmd([constants.IP_BIN, "addr", "add", str(addr), "broadcast", "+",
"dev", self.ifname(ifindex)])
args = [constants.IP_BIN, "addr", "add", str(addr), "broadcast", "+", "dev", self.ifname(ifindex)]
self.check_cmd(args)
self._netif[ifindex].addaddr(addr)
def deladdr(self, ifindex, addr):
@ -388,6 +363,7 @@ class SimpleLxcNode(PyCoreNode):
:param int ifindex: index of interface to delete address from
:param str addr: address to delete from interface
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
try:
self._netif[ifindex].deladdr(addr)
@ -395,24 +371,28 @@ class SimpleLxcNode(PyCoreNode):
logger.exception("trying to delete unknown address: %s" % addr)
if self.up:
self.cmd([constants.IP_BIN, "addr", "del", str(addr), "dev", self.ifname(ifindex)])
self.check_cmd([constants.IP_BIN, "addr", "del", str(addr), "dev", self.ifname(ifindex)])
def delalladdr(self, ifindex, addrtypes=valid_deladdrtype):
def delalladdr(self, ifindex, address_types=valid_address_types):
"""
Delete all addresses from an interface.
:param int ifindex: index of interface to delete all addresses from
:param tuple addrtypes: address types to delete
:param int ifindex: index of interface to delete address types from
:param tuple[str] address_types: address types to delete
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
addr = self.getaddr(self.ifname(ifindex), rescan=True)
for t in addrtypes:
if t not in self.valid_deladdrtype:
raise ValueError("addr type must be in: " + " ".join(self.valid_deladdrtype))
for a in addr[t]:
self.deladdr(ifindex, a)
interface_name = self.ifname(ifindex)
addresses = self.client.getaddr(interface_name, rescan=True)
for address_type in address_types:
if address_type not in self.valid_address_types:
raise ValueError("addr type must be in: %s" % " ".join(self.valid_address_types))
for address in addresses[address_type]:
self.deladdr(ifindex, address)
# update cached information
self.getaddr(self.ifname(ifindex), rescan=True)
self.client.getaddr(interface_name, rescan=True)
def ifup(self, ifindex):
"""
@ -422,7 +402,7 @@ class SimpleLxcNode(PyCoreNode):
:return: nothing
"""
if self.up:
self.cmd([constants.IP_BIN, "link", "set", self.ifname(ifindex), "up"])
self.check_cmd([constants.IP_BIN, "link", "set", self.ifname(ifindex), "up"])
def newnetif(self, net=None, addrlist=None, hwaddr=None, ifindex=None, ifname=None):
"""
@ -436,8 +416,10 @@ class SimpleLxcNode(PyCoreNode):
:return: interface index
:rtype: int
"""
self.lock.acquire()
try:
if not addrlist:
addrlist = []
with self.lock:
# TODO: see if you can move this to emane specific code
if nodeutils.is_node(net, NodeTypes.EMANE):
ifindex = self.newtuntap(ifindex=ifindex, ifname=ifname, net=net)
@ -448,8 +430,8 @@ class SimpleLxcNode(PyCoreNode):
self.attachnet(ifindex, net)
netif = self.netif(ifindex)
netif.sethwaddr(hwaddr)
for addr in utils.maketuple(addrlist):
netif.addaddr(addr)
for address in utils.make_tuple(addrlist):
netif.addaddr(address)
return ifindex
else:
ifindex = self.newveth(ifindex=ifindex, ifname=ifname, net=net)
@ -460,14 +442,11 @@ class SimpleLxcNode(PyCoreNode):
if hwaddr:
self.sethwaddr(ifindex, hwaddr)
if addrlist:
for addr in utils.maketuple(addrlist):
self.addaddr(ifindex, addr)
for address in utils.make_tuple(addrlist):
self.addaddr(ifindex, address)
self.ifup(ifindex)
return ifindex
finally:
self.lock.release()
def connectnode(self, ifname, othernode, otherifname):
"""
@ -479,21 +458,19 @@ class SimpleLxcNode(PyCoreNode):
:return: nothing
"""
tmplen = 8
tmp1 = "tmp." + "".join([random.choice(string.ascii_lowercase)
for x in xrange(tmplen)])
tmp2 = "tmp." + "".join([random.choice(string.ascii_lowercase)
for x in xrange(tmplen)])
subprocess.check_call([constants.IP_BIN, "link", "add", "name", tmp1,
"type", "veth", "peer", "name", tmp2])
tmp1 = "tmp." + "".join([random.choice(string.ascii_lowercase) for _ in xrange(tmplen)])
tmp2 = "tmp." + "".join([random.choice(string.ascii_lowercase) for _ in xrange(tmplen)])
utils.check_cmd([constants.IP_BIN, "link", "add", "name", tmp1, "type", "veth", "peer", "name", tmp2])
subprocess.call([constants.IP_BIN, "link", "set", tmp1, "netns", str(self.pid)])
self.cmd([constants.IP_BIN, "link", "set", tmp1, "name", ifname])
self.addnetif(PyCoreNetIf(self, ifname), self.newifindex())
utils.check_cmd([constants.IP_BIN, "link", "set", tmp1, "netns", str(self.pid)])
self.check_cmd([constants.IP_BIN, "link", "set", tmp1, "name", ifname])
interface = PyCoreNetIf(node=self, name=ifname, mtu=_DEFAULT_MTU)
self.addnetif(interface, self.newifindex())
subprocess.check_call([constants.IP_BIN, "link", "set", tmp2, "netns", str(othernode.pid)])
othernode.cmd([constants.IP_BIN, "link", "set", tmp2, "name", otherifname])
othernode.addnetif(PyCoreNetIf(othernode, otherifname),
othernode.newifindex())
utils.check_cmd([constants.IP_BIN, "link", "set", tmp2, "netns", str(othernode.pid)])
othernode.check_cmd([constants.IP_BIN, "link", "set", tmp2, "name", otherifname])
other_interface = PyCoreNetIf(node=othernode, name=otherifname, mtu=_DEFAULT_MTU)
othernode.addnetif(other_interface, othernode.newifindex())
def addfile(self, srcname, filename):
"""
@ -502,28 +479,15 @@ class SimpleLxcNode(PyCoreNode):
:param str srcname: source file name
:param str filename: file name to add
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
shcmd = 'mkdir -p $(dirname "%s") && mv "%s" "%s" && sync' % (filename, srcname, filename)
self.shcmd(shcmd)
logger.info("adding file from %s to %s", srcname, filename)
directory = os.path.dirname(filename)
def getaddr(self, ifname, rescan=False):
"""
Wrapper around vnodeclient getaddr.
:param str ifname: interface name to get address for
:param bool rescan: rescan flag
:return:
"""
return self.vnodeclient.getaddr(ifname=ifname, rescan=rescan)
def netifstats(self, ifname=None):
"""
Wrapper around vnodeclient netifstate.
:param str ifname: interface name to get state for
:return:
"""
return self.vnodeclient.netifstats(ifname=ifname)
cmd = 'mkdir -p "%s" && mv "%s" "%s" && sync' % (directory, srcname, filename)
status, output = self.client.shcmd_result(cmd)
if status:
raise CoreCommandError(status, cmd, output)
class LxcNode(SimpleLxcNode):
@ -531,8 +495,7 @@ class LxcNode(SimpleLxcNode):
Provides lcx node functionality for core nodes.
"""
def __init__(self, session, objid=None, name=None,
nodedir=None, bootsh="boot.sh", start=True):
def __init__(self, session, objid=None, name=None, nodedir=None, bootsh="boot.sh", start=True):
"""
Create a LxcNode instance.
@ -543,8 +506,7 @@ class LxcNode(SimpleLxcNode):
:param bootsh: boot shell
:param bool start: start flag
"""
super(LxcNode, self).__init__(session=session, objid=objid,
name=name, nodedir=nodedir, start=start)
super(LxcNode, self).__init__(session=session, objid=objid, name=name, nodedir=nodedir, start=start)
self.bootsh = bootsh
if start:
self.startup()
@ -571,16 +533,11 @@ class LxcNode(SimpleLxcNode):
:return: nothing
"""
self.lock.acquire()
try:
with self.lock:
self.makenodedir()
super(LxcNode, self).startup()
self.privatedir("/var/run")
self.privatedir("/var/log")
except OSError:
logger.exception("error during LxcNode.startup()")
finally:
self.lock.release()
def shutdown(self):
"""
@ -590,16 +547,14 @@ class LxcNode(SimpleLxcNode):
"""
if not self.up:
return
self.lock.acquire()
# services are instead stopped when session enters datacollect state
# self.session.services.stopnodeservices(self)
try:
super(LxcNode, self).shutdown()
except:
logger.exception("error during shutdown")
finally:
self.rmnodedir()
self.lock.release()
with self.lock:
try:
super(LxcNode, self).shutdown()
except OSError:
logger.exception("error during shutdown")
finally:
self.rmnodedir()
def privatedir(self, path):
"""
@ -611,12 +566,7 @@ class LxcNode(SimpleLxcNode):
if path[0] != "/":
raise ValueError("path not fully qualified: %s" % path)
hostpath = os.path.join(self.nodedir, os.path.normpath(path).strip("/").replace("/", "."))
try:
os.mkdir(hostpath)
except OSError:
logger.exception("error creating directory: %s", hostpath)
os.mkdir(hostpath)
self.mount(hostpath, path)
def hostfilename(self, filename):
@ -628,7 +578,7 @@ class LxcNode(SimpleLxcNode):
"""
dirname, basename = os.path.split(filename)
if not basename:
raise ValueError("no basename for filename: " + filename)
raise ValueError("no basename for filename: %s" % filename)
if dirname and dirname[0] == "/":
dirname = dirname[1:]
dirname = dirname.replace("/", ".")
@ -659,11 +609,10 @@ class LxcNode(SimpleLxcNode):
:param int mode: mode for file
:return: nothing
"""
f = self.opennodefile(filename, "w")
f.write(contents)
os.chmod(f.name, mode)
f.close()
logger.info("created nodefile: %s; mode: 0%o", f.name, mode)
with self.opennodefile(filename, "w") as open_file:
open_file.write(contents)
os.chmod(open_file.name, mode)
logger.info("node(%s) added file: %s; mode: 0%o", self.name, open_file.name, mode)
def nodefilecopy(self, filename, srcfilename, mode=None):
"""
@ -679,4 +628,4 @@ class LxcNode(SimpleLxcNode):
shutil.copy2(srcfilename, hostfilename)
if mode is not None:
os.chmod(hostfilename, mode)
logger.info("copied nodefile: %s; mode: %s", hostfilename, mode)
logger.info("node(%s) copied file: %s; mode: %s", self.name, hostfilename, mode)

View file

@ -6,18 +6,13 @@ by invoking the vcmd shell command.
"""
import os
import stat
import subprocess
import vcmd
from core import CoreCommandError
from core import constants
from core import logger
USE_VCMD_MODULE = True
if USE_VCMD_MODULE:
import vcmd
VCMD = os.path.join(constants.CORE_SBIN_DIR, "vcmd")
from core.misc import utils
class VnodeClient(object):
@ -34,12 +29,19 @@ class VnodeClient(object):
"""
self.name = name
self.ctrlchnlname = ctrlchnlname
if USE_VCMD_MODULE:
self.cmdchnl = vcmd.VCmd(self.ctrlchnlname)
else:
self.cmdchnl = None
self.cmdchnl = vcmd.VCmd(self.ctrlchnlname)
self._addr = {}
def _verify_connection(self):
"""
Checks that the vcmd client is properly connected.
:return: nothing
:raises IOError: when not connected
"""
if not self.connected():
raise IOError("vcmd not connected")
def connected(self):
"""
Check if node is connected or not.
@ -47,10 +49,7 @@ class VnodeClient(object):
:return: True if connected, False otherwise
:rtype: bool
"""
if USE_VCMD_MODULE:
return self.cmdchnl.connected()
else:
return True
return self.cmdchnl.connected()
def close(self):
"""
@ -58,85 +57,82 @@ class VnodeClient(object):
:return: nothing
"""
if USE_VCMD_MODULE:
self.cmdchnl.close()
self.cmdchnl.close()
def cmd(self, args, wait=True):
"""
Execute a command on a node and return the status (return code).
:param list args: command arguments
:param list[str]|str args: command arguments
:param bool wait: wait for command to end or not
:return: command status
:rtype: int
"""
if USE_VCMD_MODULE:
if not self.cmdchnl.connected():
raise ValueError("self.cmdchnl not connected")
tmp = self.cmdchnl.qcmd(args)
if not wait:
return tmp
tmp = tmp.wait()
else:
if wait:
mode = os.P_WAIT
else:
mode = os.P_NOWAIT
tmp = os.spawnlp(mode, VCMD, VCMD, "-c", self.ctrlchnlname, "-q", "--", *args)
if not wait:
return tmp
self._verify_connection()
args = utils.split_args(args)
if tmp:
logger.warn("cmd exited with status %s: %s" % (tmp, str(args)))
# run command, return process when not waiting
p = self.cmdchnl.qcmd(args)
if not wait:
return 0
return tmp
# wait for and return exit status
return p.wait()
def cmdresult(self, args):
def cmd_output(self, args):
"""
Execute a command on a node and return a tuple containing the
exit status and result string. stderr output
is folded into the stdout result string.
:param list args: command arguments
:param list[str]|str args: command to run
:return: command status and combined stdout and stderr output
:rtype: tuple[int, str]
"""
cmdid, cmdin, cmdout, cmderr = self.popen(args)
result = cmdout.read()
result += cmderr.read()
cmdin.close()
cmdout.close()
cmderr.close()
status = cmdid.wait()
return status, result
p, stdin, stdout, stderr = self.popen(args)
stdin.close()
output = stdout.read() + stderr.read()
stdout.close()
stderr.close()
status = p.wait()
return status, output.strip()
def check_cmd(self, args):
"""
Run command and return exit status and combined stdout and stderr.
:param list[str]|str args: command to run
:return: combined stdout and stderr
:rtype: str
:raises core.CoreCommandError: when there is a non-zero exit status
"""
status, output = self.cmd_output(args)
if status != 0:
raise CoreCommandError(status, args, output)
return output.strip()
def popen(self, args):
"""
Execute a popen command against the node.
:param list args: command arguments
:param list[str]|str args: command arguments
:return: popen object, stdin, stdout, and stderr
:rtype: tuple
"""
if USE_VCMD_MODULE:
if not self.cmdchnl.connected():
raise ValueError("self.cmdchnl not connected")
return self.cmdchnl.popen(args)
else:
cmd = [VCMD, "-c", self.ctrlchnlname, "--"]
cmd.extend(args)
tmp = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return tmp, tmp.stdin, tmp.stdout, tmp.stderr
self._verify_connection()
args = utils.split_args(args)
return self.cmdchnl.popen(args)
def icmd(self, args):
"""
Execute an icmd against a node.
:param list args: command arguments
:param list[str]|str args: command arguments
:return: command result
:rtype: int
"""
return os.spawnlp(os.P_WAIT, VCMD, VCMD, "-c", self.ctrlchnlname, "--", *args)
args = utils.split_args(args)
return os.spawnlp(os.P_WAIT, constants.VCMD_BIN, constants.VCMD_BIN, "-c", self.ctrlchnlname, "--", *args)
def redircmd(self, infd, outfd, errfd, args, wait=True):
"""
@ -146,22 +142,24 @@ class VnodeClient(object):
:param infd: stdin file descriptor
:param outfd: stdout file descriptor
:param errfd: stderr file descriptor
:param list args: command arguments
:param list[str]|str args: command arguments
:param bool wait: wait flag
:return: command status
:rtype: int
"""
if not USE_VCMD_MODULE:
raise NotImplementedError
if not self.cmdchnl.connected():
raise ValueError("self.cmdchnl not connected")
tmp = self.cmdchnl.redircmd(infd, outfd, errfd, args)
self._verify_connection()
# run command, return process when not waiting
args = utils.split_args(args)
p = self.cmdchnl.redircmd(infd, outfd, errfd, args)
if not wait:
return tmp
tmp = tmp.wait()
if tmp:
logger.warn("cmd exited with status %s: %s" % (tmp, str(args)))
return tmp
return p
# wait for and return exit status
status = p.wait()
if status:
logger.warn("cmd exited with status %s: %s", status, args)
return status
def term(self, sh="/bin/sh"):
"""
@ -171,13 +169,12 @@ class VnodeClient(object):
:return: terminal command result
:rtype: int
"""
cmd = ("xterm", "-ut", "-title", self.name, "-e",
VCMD, "-c", self.ctrlchnlname, "--", sh)
args = ("xterm", "-ut", "-title", self.name, "-e", constants.VCMD_BIN, "-c", self.ctrlchnlname, "--", sh)
if "SUDO_USER" in os.environ:
cmd = ("su", "-s", "/bin/sh", "-c",
"exec " + " ".join(map(lambda x: "'%s'" % x, cmd)),
os.environ["SUDO_USER"])
return os.spawnvp(os.P_NOWAIT, cmd[0], cmd)
args = ("su", "-s", "/bin/sh", "-c",
"exec " + " ".join(map(lambda x: "'%s'" % x, args)),
os.environ["SUDO_USER"])
return os.spawnvp(os.P_NOWAIT, args[0], args)
def termcmdstring(self, sh="/bin/sh"):
"""
@ -186,18 +183,29 @@ class VnodeClient(object):
:param str sh: shell to execute command in
:return: str
"""
return "%s -c %s -- %s" % (VCMD, self.ctrlchnlname, sh)
return "%s -c %s -- %s" % (constants.VCMD_BIN, self.ctrlchnlname, sh)
def shcmd(self, cmdstr, sh="/bin/sh"):
def shcmd(self, cmd, sh="/bin/sh"):
"""
Execute a shell command.
:param str cmdstr: command string
:param str cmd: command string
:param str sh: shell to run command in
:return: command result
:rtype: int
"""
return self.cmd([sh, "-c", cmdstr])
return self.cmd([sh, "-c", cmd])
def shcmd_result(self, cmd, sh="/bin/sh"):
"""
Execute a shell command and return the exist status and combined output.
:param str cmd: shell command to run
:param str sh: shell to run command in
:return: exist status and combined output
:rtype: tuple[int, str]
"""
return self.cmd_output([sh, "-c", cmd])
def getaddr(self, ifname, rescan=False):
"""
@ -210,35 +218,36 @@ class VnodeClient(object):
"""
if ifname in self._addr and not rescan:
return self._addr[ifname]
tmp = {"ether": [], "inet": [], "inet6": [], "inet6link": []}
cmd = [constants.IP_BIN, "addr", "show", "dev", ifname]
cmdid, cmdin, cmdout, cmderr = self.popen(cmd)
cmdin.close()
for line in cmdout:
interface = {"ether": [], "inet": [], "inet6": [], "inet6link": []}
args = [constants.IP_BIN, "addr", "show", "dev", ifname]
p, stdin, stdout, stderr = self.popen(args)
stdin.close()
for line in stdout:
line = line.strip().split()
if line[0] == "link/ether":
tmp["ether"].append(line[1])
interface["ether"].append(line[1])
elif line[0] == "inet":
tmp["inet"].append(line[1])
interface["inet"].append(line[1])
elif line[0] == "inet6":
if line[3] == "global":
tmp["inet6"].append(line[1])
interface["inet6"].append(line[1])
elif line[3] == "link":
tmp["inet6link"].append(line[1])
interface["inet6link"].append(line[1])
else:
logger.warn("unknown scope: %s" % line[3])
err = cmderr.read()
cmdout.close()
cmderr.close()
status = cmdid.wait()
err = stderr.read()
stdout.close()
stderr.close()
status = p.wait()
if status:
logger.warn("nonzero exist status (%s) for cmd: %s" % (status, cmd))
logger.warn("nonzero exist status (%s) for cmd: %s", status, args)
if err:
logger.warn("error output: %s" % err)
self._addr[ifname] = tmp
return tmp
logger.warn("error output: %s", err)
self._addr[ifname] = interface
return interface
def netifstats(self, ifname=None):
"""
@ -249,16 +258,16 @@ class VnodeClient(object):
:rtype: dict
"""
stats = {}
cmd = ["cat", "/proc/net/dev"]
cmdid, cmdin, cmdout, cmderr = self.popen(cmd)
cmdin.close()
args = ["cat", "/proc/net/dev"]
p, stdin, stdout, stderr = self.popen(args)
stdin.close()
# ignore first line
cmdout.readline()
stdout.readline()
# second line has count names
tmp = cmdout.readline().strip().split("|")
tmp = stdout.readline().strip().split("|")
rxkeys = tmp[1].split()
txkeys = tmp[2].split()
for line in cmdout:
for line in stdout:
line = line.strip().split()
devname, tmp = line[0].split(":")
if tmp:
@ -271,53 +280,15 @@ class VnodeClient(object):
for count in txkeys:
stats[devname]["tx"][count] = int(line[field])
field += 1
err = cmderr.read()
cmdout.close()
cmderr.close()
status = cmdid.wait()
err = stderr.read()
stdout.close()
stderr.close()
status = p.wait()
if status:
logger.warn("nonzero exist status (%s) for cmd: %s" % (status, cmd))
logger.warn("nonzero exist status (%s) for cmd: %s", status, args)
if err:
logger.warn("error output: %s" % err)
logger.warn("error output: %s", err)
if ifname is not None:
return stats[ifname]
else:
return stats
def createclients(sessiondir, clientcls=VnodeClient, cmdchnlfilterfunc=None):
"""
Create clients
:param str sessiondir: session directory to create clients
:param class clientcls: class to create clients from
:param func cmdchnlfilterfunc: command channel filter function
:return: list of created clients
:rtype: list
"""
direntries = map(lambda x: os.path.join(sessiondir, x), os.listdir(sessiondir))
cmdchnls = filter(lambda x: stat.S_ISSOCK(os.stat(x).st_mode), direntries)
if cmdchnlfilterfunc:
cmdchnls = filter(cmdchnlfilterfunc, cmdchnls)
cmdchnls.sort()
return map(lambda x: clientcls(os.path.basename(x), x), cmdchnls)
def createremoteclients(sessiondir, clientcls=VnodeClient, filterfunc=None):
"""
Creates remote VnodeClients, for nodes emulated on other machines. The
session.Broker writes a n1.conf/server file having the server's info.
:param str sessiondir: session directory to create clients
:param class clientcls: class to create clients from
:param func filterfunc: filter function
:return: list of remove clients
:rtype: list
"""
direntries = map(lambda x: os.path.join(sessiondir, x), os.listdir(sessiondir))
nodedirs = filter(lambda x: stat.S_ISDIR(os.stat(x).st_mode), direntries)
nodedirs = filter(lambda x: os.path.exists(os.path.join(x, "server")), nodedirs)
if filterfunc:
nodedirs = filter(filterfunc, nodedirs)
nodedirs.sort()
return map(lambda x: clientcls(x), nodedirs)