initial working code for consolidated logic into a future session object, for dealing with nodes/links

This commit is contained in:
Blake J. Harnden 2018-04-19 14:25:45 -07:00
parent 9fe57c6089
commit d8796b377f
4 changed files with 2201 additions and 5 deletions

View file

@ -1,10 +1,133 @@
# import itertools
import os
from core import services
import core.services
from core import logger
from core.coreobj import PyCoreNode, PyCoreNet
from core.data import NodeData
from core.emane.nodes import EmaneNode
from core.enumerations import NodeTypes, EventTypes, LinkTypes
from core.misc import nodeutils
from core.misc.ipaddress import Ipv4Prefix
from core.netns.nodes import CoreNode
from core.session import Session
from core.xml.xmlparser import core_document_parser
from core.xml.xmlwriter import core_document_writer
class InterfaceData(object):
def __init__(self, _id, name, mac, ip4, ip4_mask, ip6, ip6_mask):
self.id = _id
self.name = name
self.mac = mac
self.ip4 = ip4
self.ip4_mask = ip4_mask
self.ip6 = ip6
self.ip6_mask = ip6_mask
def has_ip4(self):
return all([self.ip4, self.ip4_mask])
def has_ip6(self):
return all([self.ip6, self.ip6_mask])
def ip4_address(self):
if self.has_ip4():
return "%s/%s" % (self.ip4, self.ip4_mask)
else:
return None
def ip6_address(self):
if self.has_ip6():
return "%s/%s" % (self.ip6, self.ip6_mask)
else:
return None
def get_addresses(self):
ip4 = self.ip4_address()
ip6 = self.ip6_address()
return [i for i in [ip4, ip6] if i]
def get_interfaces(link_data):
interface_one = InterfaceData(
_id=link_data.interface1_id,
name=link_data.interface1_name,
mac=link_data.interface1_mac,
ip4=link_data.interface1_ip4,
ip4_mask=link_data.interface1_ip4_mask,
ip6=link_data.interface1_ip6,
ip6_mask=link_data.interface1_ip6_mask,
)
interface_two = InterfaceData(
_id=link_data.interface2_id,
name=link_data.interface2_name,
mac=link_data.interface2_mac,
ip4=link_data.interface2_ip4,
ip4_mask=link_data.interface2_ip4_mask,
ip6=link_data.interface2_ip6,
ip6_mask=link_data.interface2_ip6_mask,
)
return interface_one, interface_two
def create_interface(node, network, addresses, interface_data):
"""
Create an interface for a node on a network using provided interface data.
:param node: node to create interface for
:param network: network to associate interface with
:param list[str] addresses:
:param InterfaceData interface_data: interface data
:return:
"""
node.newnetif(
network,
addrlist=addresses,
hwaddr=interface_data.mac,
ifindex=interface_data.id,
ifname=interface_data.name
)
return node.netif(interface_data.id, network)
def link_config(network, interface, link_data, devname=None, interface_two=None):
config = {
"netif": interface,
"bw": link_data.bandwidth,
"delay": link_data.delay,
"loss": link_data.per,
"duplicate": link_data.dup,
"jitter": link_data.jitter,
"netif2": interface_two
}
# hacky check here, because physical and emane nodes do not conform to the same linkconfig interface
if not nodeutils.is_node(network, [NodeTypes.EMANE, NodeTypes.PHYSICAL]):
config["devname"] = devname
network.linkconfig(**config)
def is_net_node(node):
"""
Convenience method for testing if a legacy core node is considered a network node.
:param object node: object to test against
:return: True if object is an instance of a network node, False otherwise
:rtype: bool
"""
return isinstance(node, PyCoreNet)
def is_core_node(node):
"""
Convenience method for testing if a legacy core node is considered a core node.
:param object node: object to test against
:return: True if object is an instance of a core node, False otherwise
:rtype: bool
"""
return isinstance(node, PyCoreNode)
class IdGen(object):
@ -30,7 +153,7 @@ class FutureSession(Session):
self.master = True
# object management
self.object_id_gen = IdGen()
self.node_id_gen = IdGen()
# set default services
self.services.defaultservices = {
@ -41,8 +164,511 @@ class FutureSession(Session):
"host": ("DefaultRoute", "SSH"),
}
def link_nodes(self, link_data):
logger.info("link message between node1(%s:%s) and node2(%s:%s)",
link_data.node1_id, link_data.interface1_id, link_data.node2_id, link_data.interface2_id)
# values to fill
net_one = None
net_two = None
# retrieve node one
n1_id = link_data.node1_id
n2_id = link_data.node2_id
node_one = self.get_object(n1_id)
node_two = self.get_object(n2_id)
# both node ids are provided
tunnel = self.broker.gettunnel(n1_id, n2_id)
logger.info("tunnel between nodes: %s", tunnel)
if nodeutils.is_node(tunnel, NodeTypes.TAP_BRIDGE):
net_one = tunnel
if tunnel.remotenum == n1_id:
node_one = None
else:
node_two = None
# PhysicalNode connected via GreTap tunnel; uses adoptnetif() below
elif tunnel:
if tunnel.remotenum == n1_id:
node_one = None
else:
node_two = None
if is_net_node(node_one):
if not net_one:
net_one = node_one
else:
net_two = node_one
node_one = None
if is_net_node(node_two):
if not net_one:
net_one = node_two
else:
net_two = node_two
node_two = None
logger.info("link node types n1(%s) n2(%s) net1(%s) net2(%s) tunnel(%s)",
node_one, node_two, net_one, net_two, tunnel)
return node_one, node_two, net_one, net_two, tunnel
# TODO: this doesn't appear to ever be used, EMANE or basic wireless range
def _link_wireless(self, objects, connect):
"""
Objects to deal with when connecting/disconnecting wireless links.
:param list objects: possible objects to deal with
:param bool connect: link interfaces if True, unlink otherwise
:return: nothing
"""
objects = [x for x in objects if x]
if len(objects) < 2:
raise ValueError("wireless link failure: %s", objects)
logger.info("handling wireless linking objects(%) connect(%s)", objects, connect)
common_networks = objects[0].commonnets(objects[1])
for common_network, interface_one, interface_two in common_networks:
if not nodeutils.is_node(common_network, [NodeTypes.WIRELESS_LAN, NodeTypes.EMANE]):
logger.info("skipping common network that is not wireless/emane: %s", common_network)
continue
logger.info("wireless linking connect(%s): %s - %s", connect, interface_one, interface_two)
if connect:
common_network.link(interface_one, interface_two)
else:
common_network.unlink(interface_one, interface_two)
else:
raise ValueError("no common network found for wireless link/unlink")
def link_add(self, link_data):
# interface data
interface_one_data, interface_two_data = get_interfaces(link_data)
# get node objects identified by link data
node_one, node_two, net_one, net_two, tunnel = self.link_nodes(link_data)
if node_one:
node_one.lock.acquire()
if node_two:
node_two.lock.acquire()
try:
# wireless link
if link_data.link_type == LinkTypes.WIRELESS.value:
objects = [node_one, node_two, net_one, net_two]
self._link_wireless(objects, connect=True)
# wired link
else:
# 2 nodes being linked, ptp network
if all([node_one, node_two]) and not net_one:
ptp_class = nodeutils.get_node_class(NodeTypes.PEER_TO_PEER)
start = self.state > EventTypes.DEFINITION_STATE.value
net_one = self.add_object(cls=ptp_class, start=start)
# node to network
if node_one and net_one:
addresses = []
addresses.extend(interface_one_data.get_addresses())
addresses.extend(interface_two_data.get_addresses())
interface = create_interface(node_one, net_one, addresses, interface_one_data)
link_config(net_one, interface, link_data)
# network to node
if node_two and net_one:
addresses = []
addresses.extend(interface_one_data.get_addresses())
addresses.extend(interface_two_data.get_addresses())
interface = create_interface(node_two, net_one, addresses, interface_two_data)
if not link_data.unidirectional:
link_config(net_one, interface, link_data)
# network to network
if net_one and net_two:
if nodeutils.is_node(net_two, NodeTypes.RJ45):
interface = net_two.linknet(net_one)
else:
interface = net_one.linknet(net_two)
link_config(net_one, interface, link_data)
if not link_data.unidirectional:
interface.swapparams("_params_up")
link_config(net_two, interface, link_data, devname=interface.name)
interface.swapparams("_params_up")
# a tunnel was found for the nodes
addresses = []
if not node_one and net_one:
addresses.extend(interface_one_data.get_addresses())
if not node_two and net_two:
addresses.extend(interface_two_data.get_addresses())
# tunnel node logic
key = link_data.key
if key and nodeutils.is_node(net_one, NodeTypes.TUNNEL):
net_one.setkey(key)
if addresses:
net_one.addrconfig(addresses)
if key and nodeutils.is_node(net_two, NodeTypes.TUNNEL):
net_two.setkey(key)
if addresses:
net_two.addrconfig(addresses)
if not net_one and not net_two and (not node_one or not node_two):
addresses = []
if node_one and nodeutils.is_node(node_one, NodeTypes.PHYSICAL):
addresses.extend(interface_one_data.get_addresses())
node_one.adoptnetif(tunnel, link_data.interface1_id, link_data.interface1_mac, addresses)
link_config(node_one, tunnel, link_data)
elif node_two and nodeutils.is_node(node_two, NodeTypes.PHYSICAL):
addresses.extend(interface_two_data.get_addresses())
node_two.adoptnetif(tunnel, link_data.interface2_id, link_data.interface2_mac, addresses)
link_config(node_two, tunnel, link_data)
finally:
if node_one:
node_one.lock.release()
if node_two:
node_two.lock.release()
def link_delete(self, link_data):
# interface data
interface_one_data, interface_two_data = get_interfaces(link_data)
# get node objects identified by link data
node_one, node_two, net_one, net_two, tunnel = self.link_nodes(link_data)
if node_one:
node_one.lock.acquire()
if node_two:
node_two.lock.acquire()
try:
# wireless link
if link_data.link_type == LinkTypes.WIRELESS.value:
objects = [node_one, node_two, net_one, net_two]
self._link_wireless(objects, connect=False)
# wired link
else:
if all([node_one, node_two]):
# TODO: fix this for the case where ifindex[1,2] are not specified
# a wired unlink event, delete the connecting bridge
interface_one = node_one.netif(interface_one_data.id)
interface_two = node_two.netif(interface_two_data.id)
# get interfaces from common network, if no network node
# otherwise get interfaces between a node and network
if not interface_one and not interface_two:
common_networks = node_one.commonnets(node_two)
for network, common_interface_one, common_interface_two in common_networks:
if (net_one and network == net_one) or not net_one:
interface_one = common_interface_one
interface_two = common_interface_two
break
if all([interface_one, interface_two]) and any([interface_one.net, interface_two.net]):
if interface_one.net != interface_two.net and all([interface_one.up, interface_two.up]):
raise ValueError("no common network found")
net_one = interface_one.net
interface_one.detachnet()
interface_two.detachnet()
if net_one.numnetif() == 0:
self.delete_object(net_one.objid)
node_one.delnetif(interface_one_data.id)
node_two.delnetif(interface_two_data.id)
finally:
if node_one:
node_one.lock.release()
if node_two:
node_two.lock.release()
def link_update(self, link_data):
# interface data
interface_one_data, interface_two_data = get_interfaces(link_data)
# get node objects identified by link data
node_one, node_two, net_one, net_two, tunnel = self.link_nodes(link_data)
if node_one:
node_one.lock.acquire()
if node_two:
node_two.lock.acquire()
try:
# wireless link
if link_data.link_type == LinkTypes.WIRELESS.value:
raise ValueError("cannot update wireless link")
else:
if not node_one and not node_two:
if net_one and net_two:
# modify link between nets
interface = net_one.getlinknetif(net_two)
upstream = False
if not interface:
upstream = True
interface = net_two.getlinknetif(net_one)
if not interface:
raise ValueError("modify unknown link between nets")
if upstream:
interface.swapparams("_params_up")
link_config(net_one, interface, link_data, devname=interface.name)
interface.swapparams("_params_up")
else:
link_config(net_one, interface, link_data)
if not link_data.unidirectional:
if upstream:
link_config(net_two, interface, link_data)
else:
interface.swapparams("_params_up")
link_config(net_two, interface, link_data, devname=interface.name)
interface.swapparams("_params_up")
else:
raise ValueError("modify link for unknown nodes")
elif not node_one:
# node1 = layer 2node, node2 = layer3 node
interface = node_two.netif(interface_two_data.id, net_one)
link_config(net_one, interface, link_data)
elif not node_two:
# node2 = layer 2node, node1 = layer3 node
interface = node_one.netif(interface_one_data.id, net_one)
link_config(net_one, interface, link_data)
else:
common_networks = node_one.commonnets(node_two)
for net_one, interface_one, interface_two in common_networks:
if interface_one_data.id and interface_one_data.id != node_one.getifindex(interface_one):
continue
link_config(net_one, interface_one, link_data, interface_two=interface_two)
if not link_data.unidirectional:
link_config(net_one, interface_two, link_data, interface_two=interface_one)
else:
raise ValueError("no common network found")
finally:
if node_one:
node_one.lock.release()
if node_two:
node_two.lock.release()
def node_add(self, node_data):
"""
Add a node to the session, based on the provided node data.
:param core.data.NodeData node_data: data to create node with
:return: nothing
"""
# retrieve node class for given node type
try:
node_type = NodeTypes(node_data.node_type)
node_class = nodeutils.get_node_class(node_type)
except KeyError:
logger.error("invalid node type to create: %s", node_data.node_type)
return None
# set node start based on current session state, override and check when rj45
start = self.state > EventTypes.DEFINITION_STATE.value
enable_rj45 = getattr(self.options, "enablerj45", "0") == "1"
if node_type == NodeTypes.RJ45 and not enable_rj45:
start = False
# determine node id
node_id = node_data.id
if not node_id:
node_id = self.node_id_gen.next()
# generate name if not provided
name = node_data.name
if not name:
name = "%s%s" % (node_class.__name__, node_id)
# create node
node = self.add_object(cls=node_class, objid=node_id, name=name, start=start)
# set node attributes
node.type = node_data.model or "router"
node.icon = node_data.icon
node.canvas = node_data.canvas
node.opaque = node_data.opaque
# set node position and broadcast it
self.node_set_position(node, node_data)
# add services to default and physical nodes only
services = node_data.services
if node_type in [NodeTypes.DEFAULT, NodeTypes.PHYSICAL]:
logger.info("setting model (%s) with services (%s)", node.type, services)
self.services.addservicestonode(node, node.type, services)
# boot nodes if created after runtime, LcxNodes, Physical, and RJ45 are all PyCoreNodes
is_boot_node = isinstance(node, PyCoreNode) and not nodeutils.is_node(node, NodeTypes.RJ45)
if self.state == EventTypes.RUNTIME_STATE.value and is_boot_node:
self.write_objects()
self.add_remove_control_interface(node=node, remove=False)
# TODO: common method to both Physical and LxcNodes, but not the common PyCoreNode
node.boot()
# return node id, in case it was generated
return node_id
def node_update(self, node_data):
try:
# get node to update
node = self.get_object(node_data.id)
# set node position and broadcast it
self.node_set_position(node, node_data)
# update attributes
node.canvas = node_data.canvas
node.icon = node_data.icon
except KeyError:
logger.error("failure to update node that does not exist: %s", node_data.id)
def node_delete(self, node_id):
# delete node and check for session shutdown if a node was removed
result = self.custom_delete_object(node_id)
if result:
self.check_shutdown()
return result
def node_set_position(self, node, node_data):
# extract location values
x = node_data.x_position
y = node_data.y_position
lat = node_data.latitude
lon = node_data.longitude
alt = node_data.altitude
# check if we need to generate position from lat/lon/alt
has_empty_position = all(i is None for i in [x, y])
has_lat_lon_alt = all(i is not None for i in [lat, lon, alt])
using_lat_lon_alt = has_empty_position and has_lat_lon_alt
if using_lat_lon_alt:
x, y, _ = self.location.getxyz(lat, lon, alt)
# set position and broadcast
node.setposition(x, y, None)
# broadcast updated location when using lat/lon/alt
if using_lat_lon_alt:
self.broadcast_node_location(node)
def broadcast_node_location(self, node):
"""
Broadcast node location to all listeners.
:param core.netns.nodes.PyCoreObj node: node to broadcast location for
:return: nothing
"""
node_data = NodeData(
message_type=0,
id=node.objid,
x_position=node.position.x,
y_position=node.position.y
)
self.broadcast_node(node_data)
def shutdown(self):
self.set_state(state=EventTypes.DATACOLLECT_STATE.value, send_event=True)
self.set_state(state=EventTypes.SHUTDOWN_STATE.value, send_event=True)
super(FutureSession, self).shutdown()
def custom_delete_object(self, object_id):
"""
Remove an emulation object.
:param int object_id: object id to remove
:return: True if object deleted, False otherwise
"""
result = False
with self._objects_lock:
if object_id in self.objects:
obj = self.objects.pop(object_id)
obj.shutdown()
result = True
return result
def is_active(self):
return self.state in {EventTypes.RUNTIME_STATE.value, EventTypes.DATACOLLECT_STATE.value}
def open_xml(self, file_name, start=False):
"""
Import a session from the EmulationScript XML format.
:param str file_name: xml file to load session from
:param bool start: instantiate session if true, false otherwise
:return: nothing
"""
# clear out existing session
self.clear()
# set default node class when one is not provided
node_class = nodeutils.get_node_class(NodeTypes.DEFAULT)
options = {"start": start, "nodecls": node_class}
core_document_parser(self, file_name, options)
if start:
self.name = os.path.basename(file_name)
self.file_name = file_name
self.instantiate()
def save_xml(self, file_name, version):
"""
Export a session to the EmulationScript XML format.
:param str file_name: file name to write session xml to
:param str version: xml version type
:return: nothing
"""
doc = core_document_writer(self, version)
doc.writexml(file_name)
def hook_add(self, state, file_name, source_name, data):
"""
Store a hook from a received file message.
:param int state: when to run hook
:param str file_name: file name for hook
:param str source_name: source name
:param data: hook data
:return: nothing
"""
# hack to conform with old logic until updated
state = ":%s" % state
self.set_hook(state, file_name, source_name, data)
def node_service_file(self, node_id, service_name, file_name, source_name, data):
# hack to conform with old logic until updated
service_name = ":%s" % service_name
self.services.setservicefile(node_id, service_name, file_name, source_name, data)
def node_file(self, node_id, source_name, file_name, data):
node = self.get_object(node_id)
if source_name is not None:
node.addfile(source_name, file_name)
elif data is not None:
node.nodefile(file_name, data)
def clear(self):
self.delete_objects()
self.del_hooks()
self.broker.reset()
def start_events(self):
self.event_loop.run()
def services_event(self, event_data):
self.services.handleevent(event_data)
def mobility_event(self, event_data):
self.mobility.handleevent(event_data)
def create_node(self, cls, name=None, model=None):
object_id = self.object_id_gen.next()
object_id = self.node_id_gen.next()
if not name:
name = "%s%s" % (cls.__name__, object_id)
@ -103,7 +729,7 @@ class CoreEmu(object):
self.sessions = {}
# load default services
services.load()
core.services.load()
def create_session(self):
"""

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,263 @@
"""
Defines server classes and request handlers for TCP and UDP.
"""
import SocketServer
import threading
import time
from core import logger
from core.api import coreapi
from core.enumerations import EventTypes
from core.enumerations import SessionTlvs
from core.future.coreemu import FutureSession
class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
"""
TCP server class, manages sessions and spawns request handlers for
incoming connections.
"""
daemon_threads = True
allow_reuse_address = True
servers = set()
def __init__(self, server_address, handler_class, config=None):
"""
Server class initialization takes configuration data and calls
the SocketServer constructor
:param tuple[str, int] server_address: server host and port to use
:param class handler_class: request handler
:param dict config: configuration setting
:return:
"""
self.config = config
self.sessions = {}
self.udpserver = None
self.udpthread = None
self._sessions_lock = threading.Lock()
FutureServer.add_server(self)
SocketServer.TCPServer.__init__(self, server_address, handler_class)
@classmethod
def add_server(cls, server):
"""
Add a core server to the known servers set.
:param CoreServer server: server to add
:return: nothing
"""
cls.servers.add(server)
@classmethod
def remove_server(cls, server):
"""
Remove a core server from the known servers set.
:param CoreServer server: server to remove
:return: nothing
"""
if server in cls.servers:
cls.servers.remove(server)
def shutdown(self):
"""
Shutdown the server, all known sessions, and remove server from known servers set.
:return: nothing
"""
# shutdown all known sessions
for session in self.sessions.values():
session.shutdown()
# remove server from server list
FutureServer.remove_server(self)
def add_session(self, session):
"""
Add a session to our dictionary of sessions, ensuring a unique session number.
:param core.session.Session session: session to add
:return: added session
:raise KeyError: when a session with the same id already exists
"""
with self._sessions_lock:
if session.session_id in self.sessions:
raise KeyError("non-unique session id %s for %s" % (session.session_id, session))
self.sessions[session.session_id] = session
return session
def remove_session(self, session):
"""
Remove a session from our dictionary of sessions.
:param core.session.Session session: session to remove
:return: removed session
:rtype: core.session.Session
"""
with self._sessions_lock:
if session.session_id not in self.sessions:
logger.info("session id %s not found (sessions=%s)", session.session_id, self.sessions.keys())
else:
del self.sessions[session.session_id]
return session
def get_session_ids(self):
"""
Return a list of active session numbers.
:return: known session ids
:rtype: list
"""
with self._sessions_lock:
session_ids = self.sessions.keys()
return session_ids
def create_session(self, session_id=None):
"""
Convenience method for creating sessions with the servers config.
:param int session_id: session id for new session
:return: create session
:rtype: FutureSession
"""
# create random id when necessary, seems to be 1 case wanted, based on legacy code
# creating a value so high, typical client side generation schemes hopefully wont collide
if not session_id:
session_id = next(
session_id for session_id in xrange(60000, 65000)
if session_id not in self.sessions
)
# create and add session to local manager
session = FutureSession(session_id, config=self.config)
self.add_session(session)
# add shutdown handler to remove session from manager
session.shutdown_handlers.append(self.session_shutdown)
return session
def get_session(self, session_id=None):
"""
Create a new session or retrieve an existing one from our
dictionary of sessions. When the session_id=0 and the use_existing
flag is set, return on of the existing sessions.
:param int session_id: session id of session to retrieve, defaults to returning random session
:return: session
:rtype: core.session.Session
"""
with self._sessions_lock:
# return specified session or none
if session_id:
return self.sessions.get(session_id)
# retrieving known session
session = None
# find runtime session with highest node count
for known_session in filter(lambda x: x.state == EventTypes.RUNTIME_STATE.value,
self.sessions.itervalues()):
if not session or known_session.get_node_count() > session.get_node_count():
session = known_session
# return first known session otherwise
if not session:
for known_session in self.sessions.itervalues():
session = known_session
break
return session
def session_shutdown(self, session):
"""
Handler method to be used as a callback when a session has shutdown.
:param core.session.Session session: session shutting down
:return: nothing
"""
self.remove_session(session)
def to_session_message(self, flags=0):
"""
Build CORE API Sessions message based on current session info.
:param int flags: message flags
:return: session message
"""
id_list = []
name_list = []
file_list = []
node_count_list = []
date_list = []
thumb_list = []
num_sessions = 0
with self._sessions_lock:
for session_id in self.sessions:
session = self.sessions[session_id]
# debug: session.dumpsession()
num_sessions += 1
id_list.append(str(session_id))
name = session.name
if not name:
name = ""
name_list.append(name)
file = session.file_name
if not file:
file = ""
file_list.append(file)
node_count_list.append(str(session.get_node_count()))
date_list.append(time.ctime(session._state_time))
thumb = session.thumbnail
if not thumb:
thumb = ""
thumb_list.append(thumb)
session_ids = "|".join(id_list)
names = "|".join(name_list)
files = "|".join(file_list)
node_counts = "|".join(node_count_list)
dates = "|".join(date_list)
thumbs = "|".join(thumb_list)
if num_sessions > 0:
tlv_data = ""
if len(session_ids) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NUMBER.value, session_ids)
if len(names) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NAME.value, names)
if len(files) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.FILE.value, files)
if len(node_counts) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NODE_COUNT.value, node_counts)
if len(dates) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.DATE.value, dates)
if len(thumbs) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.THUMB.value, thumbs)
message = coreapi.CoreSessionMessage.pack(flags, tlv_data)
else:
message = None
return message
def dump_sessions(self):
"""
Log currently known session information.
"""
logger.info("sessions:")
with self._sessions_lock:
for session_id in self.sessions:
logger.info(session_id)

180
daemon/scripts/core-future Normal file
View file

@ -0,0 +1,180 @@
#!/usr/bin/env python
"""
core-daemon: the CORE daemon is a server process that receives CORE API
messages and instantiates emulated nodes and networks within the kernel. Various
message handlers are defined and some support for sending messages.
"""
import ConfigParser
import atexit
import optparse
import signal
import sys
import time
from core import constants
from core import coreserver
from core import enumerations
from core import logger
from core import services
from core.future.futurehandler import FutureHandler
from core.future.futureserver import FutureServer
from core.misc import nodeutils
from core.misc.utils import close_onexec
from core.service import ServiceManager
def banner():
"""
Output the program banner printed to the terminal or log file.
:return: nothing
"""
logger.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime())
def cored(cfg=None):
"""
Start the CoreServer object and enter the server loop.
:param dict cfg: core configuration
:return: nothing
"""
host = cfg["listenaddr"]
port = int(cfg["port"])
if host == "" or host is None:
host = "localhost"
try:
server = FutureServer((host, port), FutureHandler, cfg)
except:
logger.exception("error starting main server on: %s:%s", host, port)
sys.exit(1)
close_onexec(server.fileno())
logger.info("main server started, listening on: %s:%s", host, port)
server.serve_forever()
# TODO: should sessions and the main core daemon both catch exit to shutdown independently?
def cleanup():
"""
Runs server shutdown and cleanup when catching an exit signal.
:return: nothing
"""
while coreserver.CoreServer.servers:
server = coreserver.CoreServer.servers.pop()
server.shutdown()
def sighandler(signum, stackframe):
"""
Signal handler when different signals are sent.
:param int signum: singal number sent
:param stackframe: stack frame sent
:return: nothing
"""
logger.error("terminated by signal: %s", signum)
sys.exit(signum)
signal.signal(signal.SIGHUP, sighandler)
signal.signal(signal.SIGINT, sighandler)
signal.signal(signal.SIGTERM, sighandler)
signal.signal(signal.SIGUSR1, sighandler)
signal.signal(signal.SIGUSR2, sighandler)
atexit.register(cleanup)
def get_merged_config(filename):
"""
Return a configuration after merging config file and command-line arguments.
:param str filename: file name to merge configuration settings with
:return: merged configuration
:rtype: dict
"""
# these are the defaults used in the config file
defaults = {
"port": "%d" % enumerations.CORE_API_PORT,
"listenaddr": "localhost",
"xmlfilever": "1.0",
"numthreads": "1",
}
usagestr = "usage: %prog [-h] [options] [args]\n\n" + \
"CORE daemon v.%s instantiates Linux network namespace " \
"nodes." % constants.COREDPY_VERSION
parser = optparse.OptionParser(usage=usagestr)
parser.add_option("-f", "--configfile", dest="configfile", type="string",
help="read config from specified file; default = %s" % filename)
parser.add_option("-p", "--port", dest="port", type=int,
help="port number to listen on; default = %s" % defaults["port"])
parser.add_option("-t", "--numthreads", dest="numthreads", type=int,
help="number of server threads; default = %s" % defaults["numthreads"])
# parse command line options
options, args = parser.parse_args()
# read the config file
if options.configfile is not None:
filename = options.configfile
del options.configfile
cfg = ConfigParser.SafeConfigParser(defaults)
cfg.read(filename)
section = "core-daemon"
if not cfg.has_section(section):
cfg.add_section(section)
# merge command line with config file
for opt in options.__dict__:
val = options.__dict__[opt]
if val is not None:
cfg.set(section, opt, val.__str__())
return dict(cfg.items(section)), args
def main():
"""
Main program startup.
:return: nothing
"""
# get a configuration merged from config file and command-line arguments
cfg, args = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR)
for a in args:
logger.error("ignoring command line argument: %s", a)
# attempt load custom services
service_paths = cfg.get("custom_services_dir")
logger.debug("custom service paths: %s", service_paths)
if service_paths:
for service_path in service_paths.split(','):
service_path = service_path.strip()
ServiceManager.add_services(service_path)
banner()
try:
cored(cfg)
except KeyboardInterrupt:
logger.info("keyboard interrupt, stopping core daemon")
sys.exit(0)
if __name__ == "__main__":
# configure nodes to use
if len(sys.argv) == 2 and sys.argv[1] == "ovs":
from core.netns.openvswitch import OVS_NODES
nodeutils.update_node_map(OVS_NODES)
# load default services
services.load()
main()