moved future core server and handler code to act as the default core-daemon, updated future examples and tests to leverage new api

This commit is contained in:
Blake J. Harnden 2018-04-25 16:33:58 -07:00
parent f431895357
commit 8644e9d61e
24 changed files with 618 additions and 2728 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,263 +0,0 @@
"""
Defines server classes and request handlers for TCP and UDP.
"""
import SocketServer
import threading
import time
from core import logger
from core.api import coreapi
from core.enumerations import EventTypes
from core.enumerations import SessionTlvs
from core.session import Session
class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
"""
TCP server class, manages sessions and spawns request handlers for
incoming connections.
"""
daemon_threads = True
allow_reuse_address = True
servers = set()
def __init__(self, server_address, handler_class, config=None):
"""
Server class initialization takes configuration data and calls
the SocketServer constructor
:param tuple[str, int] server_address: server host and port to use
:param class handler_class: request handler
:param dict config: configuration setting
:return:
"""
self.config = config
self.sessions = {}
self.udpserver = None
self.udpthread = None
self._sessions_lock = threading.Lock()
CoreServer.add_server(self)
SocketServer.TCPServer.__init__(self, server_address, handler_class)
@classmethod
def add_server(cls, server):
"""
Add a core server to the known servers set.
:param CoreServer server: server to add
:return: nothing
"""
cls.servers.add(server)
@classmethod
def remove_server(cls, server):
"""
Remove a core server from the known servers set.
:param CoreServer server: server to remove
:return: nothing
"""
if server in cls.servers:
cls.servers.remove(server)
def shutdown(self):
"""
Shutdown the server, all known sessions, and remove server from known servers set.
:return: nothing
"""
# shutdown all known sessions
for session in self.sessions.values():
session.shutdown()
# remove server from server list
CoreServer.remove_server(self)
def add_session(self, session):
"""
Add a session to our dictionary of sessions, ensuring a unique session number.
:param core.session.Session session: session to add
:return: added session
:raise KeyError: when a session with the same id already exists
"""
with self._sessions_lock:
if session.session_id in self.sessions:
raise KeyError("non-unique session id %s for %s" % (session.session_id, session))
self.sessions[session.session_id] = session
return session
def remove_session(self, session):
"""
Remove a session from our dictionary of sessions.
:param core.session.Session session: session to remove
:return: removed session
:rtype: core.session.Session
"""
with self._sessions_lock:
if session.session_id not in self.sessions:
logger.info("session id %s not found (sessions=%s)", session.session_id, self.sessions.keys())
else:
del self.sessions[session.session_id]
return session
def get_session_ids(self):
"""
Return a list of active session numbers.
:return: known session ids
:rtype: list
"""
with self._sessions_lock:
session_ids = self.sessions.keys()
return session_ids
def create_session(self, session_id=None):
"""
Convenience method for creating sessions with the servers config.
:param int session_id: session id for new session
:return: create session
:rtype: core.session.Session
"""
# create random id when necessary, seems to be 1 case wanted, based on legacy code
# creating a value so high, typical client side generation schemes hopefully wont collide
if not session_id:
session_id = next(
session_id for session_id in xrange(60000, 65000)
if session_id not in self.sessions
)
# create and add session to local manager
session = Session(session_id, config=self.config)
self.add_session(session)
# add shutdown handler to remove session from manager
session.shutdown_handlers.append(self.session_shutdown)
return session
def get_session(self, session_id=None):
"""
Create a new session or retrieve an existing one from our
dictionary of sessions. When the session_id=0 and the use_existing
flag is set, return on of the existing sessions.
:param int session_id: session id of session to retrieve, defaults to returning random session
:return: session
:rtype: core.session.Session
"""
with self._sessions_lock:
# return specified session or none
if session_id:
return self.sessions.get(session_id)
# retrieving known session
session = None
# find runtime session with highest node count
for known_session in filter(lambda x: x.state == EventTypes.RUNTIME_STATE.value,
self.sessions.itervalues()):
if not session or known_session.get_node_count() > session.get_node_count():
session = known_session
# return first known session otherwise
if not session:
for known_session in self.sessions.itervalues():
session = known_session
break
return session
def session_shutdown(self, session):
"""
Handler method to be used as a callback when a session has shutdown.
:param core.session.Session session: session shutting down
:return: nothing
"""
self.remove_session(session)
def to_session_message(self, flags=0):
"""
Build CORE API Sessions message based on current session info.
:param int flags: message flags
:return: session message
"""
id_list = []
name_list = []
file_list = []
node_count_list = []
date_list = []
thumb_list = []
num_sessions = 0
with self._sessions_lock:
for session_id in self.sessions:
session = self.sessions[session_id]
# debug: session.dumpsession()
num_sessions += 1
id_list.append(str(session_id))
name = session.name
if not name:
name = ""
name_list.append(name)
file = session.file_name
if not file:
file = ""
file_list.append(file)
node_count_list.append(str(session.get_node_count()))
date_list.append(time.ctime(session._state_time))
thumb = session.thumbnail
if not thumb:
thumb = ""
thumb_list.append(thumb)
session_ids = "|".join(id_list)
names = "|".join(name_list)
files = "|".join(file_list)
node_counts = "|".join(node_count_list)
dates = "|".join(date_list)
thumbs = "|".join(thumb_list)
if num_sessions > 0:
tlv_data = ""
if len(session_ids) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NUMBER.value, session_ids)
if len(names) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NAME.value, names)
if len(files) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.FILE.value, files)
if len(node_counts) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NODE_COUNT.value, node_counts)
if len(dates) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.DATE.value, dates)
if len(thumbs) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.THUMB.value, thumbs)
message = coreapi.CoreSessionMessage.pack(flags, tlv_data)
else:
message = None
return message
def dump_sessions(self):
"""
Log currently known session information.
"""
logger.info("sessions:")
with self._sessions_lock:
for session_id in self.sessions:
logger.info(session_id)

View file

@ -195,6 +195,9 @@ class FutureSession(Session):
raise ValueError("wireless link failure: %s", objects)
logger.info("handling wireless linking objects(%) connect(%s)", objects, connect)
common_networks = objects[0].commonnets(objects[1])
if not common_networks:
raise ValueError("no common network found for wireless link/unlink")
for common_network, interface_one, interface_two in common_networks:
if not nodeutils.is_node(common_network, [NodeTypes.WIRELESS_LAN, NodeTypes.EMANE]):
logger.info("skipping common network that is not wireless/emane: %s", common_network)
@ -205,8 +208,6 @@ class FutureSession(Session):
common_network.link(interface_one, interface_two)
else:
common_network.unlink(interface_one, interface_two)
else:
raise ValueError("no common network found for wireless link/unlink")
def add_link(self, node_one_id, node_two_id, interface_one=None, interface_two=None, link_options=LinkOptions()):
"""
@ -429,15 +430,17 @@ class FutureSession(Session):
link_config(net_one, interface, link_options)
else:
common_networks = node_one.commonnets(node_two)
if not common_networks:
raise ValueError("no common network found")
for net_one, interface_one, interface_two in common_networks:
if interface_one_id and interface_one_id != node_one.getifindex(interface_one):
if interface_one_id is not None and interface_one_id != node_one.getifindex(interface_one):
continue
link_config(net_one, interface_one, link_options, interface_two=interface_two)
if not link_options.unidirectional:
link_config(net_one, interface_two, link_options, interface_two=interface_one)
else:
raise ValueError("no common network found")
finally:
if node_one:
node_one.lock.release()
@ -610,8 +613,8 @@ class FutureSession(Session):
:return: nothing
"""
self.set_state(state=EventTypes.DATACOLLECT_STATE.value, send_event=True)
self.set_state(state=EventTypes.SHUTDOWN_STATE.value, send_event=True)
self.set_state(EventTypes.DATACOLLECT_STATE, send_event=True)
self.set_state(EventTypes.SHUTDOWN_STATE, send_event=True)
super(FutureSession, self).shutdown()
def custom_delete_object(self, object_id):
@ -753,9 +756,9 @@ class FutureSession(Session):
"""
self.mobility.handleevent(event_data)
def create_emane_node(self, _id=None, node_options=NodeOptions()):
def create_wireless_node(self, _id=None, node_options=NodeOptions()):
"""
Create an EMANE node for use within an EMANE network.
Create a wireless node for use within an wireless/EMANE networks.
:param int _id: int for node, defaults to None and will be generated
:param core.future.futuredata.NodeOptions node_options: options for emane node, model will always be "mdr"
@ -796,6 +799,29 @@ class FutureSession(Session):
values = list(emane_model.getdefaultvalues())
self.emane.setconfig(emane_node.objid, emane_model.name, values)
def set_wireless_model(self, node, model):
"""
Convenience method for setting a wireless model.
:param node: node to set wireless model for
:param core.mobility.WirelessModel model: wireless model to set node to
:return: nothing
"""
values = list(model.getdefaultvalues())
node.setmodel(model, values)
def wireless_link_all(self, network, nodes):
"""
Link all nodes to the provided wireless network.
:param network: wireless network to link nodes to
:param nodes: nodes to link to wireless network
:return: nothing
"""
for node in nodes:
for common_network, interface_one, interface_two in node.commonnets(network):
common_network.link(interface_one, interface_two)
class CoreEmu(object):
"""
@ -876,26 +902,3 @@ class CoreEmu(object):
logger.error("session to delete did not exist: %s", _id)
return result
def set_wireless_model(self, node, model):
"""
Convenience method for setting a wireless model.
:param node: node to set wireless model for
:param core.mobility.WirelessModel model: wireless model to set node to
:return: nothing
"""
values = list(model.getdefaultvalues())
node.setmodel(model, values)
def wireless_link_all(self, network, nodes):
"""
Link all nodes to the provided wireless network.
:param network: wireless network to link nodes to
:param nodes: nodes to link to wireless network
:return: nothing
"""
for node in nodes:
for common_network, interface_one, interface_two in node.commonnets(network):
common_network.link(interface_one, interface_two)

View file

View file

@ -13,29 +13,31 @@ import time
from core import logger
from core.api import coreapi
from core.coreserver import CoreServer
from core.data import ConfigData
from core.data import EventData
from core.enumerations import ConfigTlvs, LinkTypes
from core.enumerations import ConfigTlvs
from core.enumerations import EventTlvs
from core.enumerations import EventTypes
from core.enumerations import ExceptionTlvs
from core.enumerations import ExecuteTlvs
from core.enumerations import FileTlvs
from core.enumerations import LinkTlvs
from core.enumerations import LinkTypes
from core.enumerations import MessageFlags
from core.enumerations import MessageTypes
from core.enumerations import NodeTlvs
from core.enumerations import NodeTypes
from core.enumerations import RegisterTlvs
from core.enumerations import SessionTlvs
from core.future.futuredata import NodeOptions, LinkOptions, InterfaceData
from core.future.futuredata import InterfaceData
from core.future.futuredata import LinkOptions
from core.future.futuredata import NodeOptions
from core.misc import nodeutils
from core.misc import structutils
from core.misc import utils
class FutureHandler(SocketServer.BaseRequestHandler):
class CoreHandler(SocketServer.BaseRequestHandler):
"""
The SocketServer class uses the RequestHandler class for servicing requests.
"""
@ -578,7 +580,7 @@ class FutureHandler(SocketServer.BaseRequestHandler):
self.add_session_handlers()
# set initial session state
self.session.set_state(state=EventTypes.DEFINITION_STATE.value)
self.session.set_state(EventTypes.DEFINITION_STATE)
while True:
try:
@ -1032,13 +1034,13 @@ class FutureHandler(SocketServer.BaseRequestHandler):
session=message.get_tlv(EventTlvs.SESSION.value)
)
event_type = event_data.event_type
if event_type is None:
if event_data.event_type is None:
raise NotImplementedError("Event message missing event type")
event_type = EventTypes(event_data.event_type)
node_id = event_data.node
logger.info("EVENT %d: %s at %s", event_type, EventTypes(event_type).name, time.ctime())
if event_type <= EventTypes.SHUTDOWN_STATE.value:
logger.info("EVENT %s at %s", event_type.name, time.ctime())
if event_type.value <= EventTypes.SHUTDOWN_STATE.value:
if node_id is not None:
try:
node = self.session.get_object(node_id)
@ -1046,19 +1048,18 @@ class FutureHandler(SocketServer.BaseRequestHandler):
raise KeyError("Event message for unknown node %d" % node_id)
# configure mobility models for WLAN added during runtime
if event_type == EventTypes.INSTANTIATION_STATE.value and nodeutils.is_node(node,
NodeTypes.WIRELESS_LAN):
if event_type == EventTypes.INSTANTIATION_STATE and nodeutils.is_node(node, NodeTypes.WIRELESS_LAN):
self.session.start_mobility(node_ids=(node.objid,))
return ()
logger.warn("dropping unhandled Event message with node number")
return ()
self.session.set_state(state=event_type)
self.session.set_state(event_type)
if event_type == EventTypes.DEFINITION_STATE.value:
if event_type == EventTypes.DEFINITION_STATE:
# clear all session objects in order to receive new definitions
self.session.clear()
elif event_type == EventTypes.INSTANTIATION_STATE.value:
elif event_type == EventTypes.INSTANTIATION_STATE:
if len(self.handler_threads) > 1:
# TODO: sync handler threads here before continuing
time.sleep(2.0) # XXX
@ -1068,21 +1069,19 @@ class FutureHandler(SocketServer.BaseRequestHandler):
# after booting nodes attempt to send emulation id for nodes waiting on status
for obj in self.session.objects.itervalues():
self.send_node_emulation_id(obj.objid)
elif event_type == EventTypes.RUNTIME_STATE.value:
elif event_type == EventTypes.RUNTIME_STATE:
if self.session.master:
logger.warn("Unexpected event message: RUNTIME state received at session master")
else:
# master event queue is started in session.checkruntime()
self.session.start_events()
elif event_type == EventTypes.DATACOLLECT_STATE.value:
elif event_type == EventTypes.DATACOLLECT_STATE:
self.session.data_collect()
elif event_type == EventTypes.SHUTDOWN_STATE.value:
elif event_type == EventTypes.SHUTDOWN_STATE:
if self.session.master:
logger.warn("Unexpected event message: SHUTDOWN state received at session master")
elif event_type in (EventTypes.START.value, EventTypes.STOP.value,
EventTypes.RESTART.value,
EventTypes.PAUSE.value,
EventTypes.RECONFIGURE.value):
elif event_type in {EventTypes.START, EventTypes.STOP, EventTypes.RESTART, EventTypes.PAUSE,
EventTypes.RECONFIGURE}:
handled = False
name = event_data.name
if name:
@ -1095,17 +1094,16 @@ class FutureHandler(SocketServer.BaseRequestHandler):
self.session.mobility_event(event_data)
handled = True
if not handled:
logger.warn("Unhandled event message: event type %s (%s)",
event_type, coreapi.state_name(event_type))
elif event_type == EventTypes.FILE_OPEN.value:
logger.warn("Unhandled event message: event type %s ", event_type.name)
elif event_type == EventTypes.FILE_OPEN:
filename = event_data.name
self.session.open_xml(filename, start=False)
self.session.send_objects()
return ()
elif event_type == EventTypes.FILE_SAVE.value:
elif event_type == EventTypes.FILE_SAVE:
filename = event_data.name
self.session.save_xml(filename, self.session.config["xmlfilever"])
elif event_type == EventTypes.SCHEDULED.value:
elif event_type == EventTypes.SCHEDULED:
etime = event_data.time
node = event_data.node
name = event_data.name
@ -1118,7 +1116,7 @@ class FutureHandler(SocketServer.BaseRequestHandler):
else:
raise NotImplementedError
else:
logger.warn("Unhandled event message: event type %d", event_type)
logger.warn("Unhandled event message: event type %s", event_type)
return ()

View file

@ -1,5 +1,5 @@
"""
Defines server classes and request handlers for TCP and UDP.
Defines core server for handling TCP connections.
"""
import SocketServer
@ -7,7 +7,7 @@ import SocketServer
from core.future.coreemu import CoreEmu
class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
"""
TCP server class, manages sessions and spawns request handlers for
incoming connections.
@ -28,13 +28,3 @@ class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
self.coreemu = CoreEmu(config)
self.config = config
SocketServer.TCPServer.__init__(self, server_address, handler_class)
def shutdown(self):
"""
Shutdown the server, all known sessions, and remove server from known servers set.
:return: nothing
"""
# shutdown all known sessions
for session in self.coreemu.sessions.itervalues():
session.shutdown()

View file

@ -300,27 +300,28 @@ class Session(object):
"""
Set the session's current state.
:param int state: state to set to
:param core.enumerations.EventTypes state: state to set to
:param send_event: if true, generate core API event messages
:return: nothing
"""
state_name = coreapi.state_name(state)
state_value = state.value
state_name = state.name
if self.state == state:
if self.state == state_value:
logger.info("session is already in state: %s, skipping change", state_name)
return
self.state = state
self.state = state_value
self._state_time = time.time()
logger.info("changing session %s to state %s(%s) at %s",
self.session_id, state, state_name, self._state_time)
self.session_id, state_value, state_name, self._state_time)
self.write_state(state)
self.run_hooks(state)
self.run_state_hooks(state)
self.write_state(state_value)
self.run_hooks(state_value)
self.run_state_hooks(state_value)
if send_event:
event_data = EventData(event_type=state, time="%s" % time.time())
event_data = EventData(event_type=state_value, time="%s" % time.time())
self.broadcast_event(event_data)
def write_state(self, state):
@ -868,7 +869,7 @@ class Session(object):
# start event loop and set to runtime
self.event_loop.run()
self.set_state(EventTypes.RUNTIME_STATE.value, send_event=True)
self.set_state(EventTypes.RUNTIME_STATE, send_event=True)
def data_collect(self):
"""
@ -908,7 +909,7 @@ class Session(object):
shutdown = False
if node_count == 0:
shutdown = True
self.set_state(state=EventTypes.SHUTDOWN_STATE.value)
self.set_state(EventTypes.SHUTDOWN_STATE)
return shutdown

View file

@ -250,7 +250,7 @@ class CoreDocumentParser0(object):
geo.append(a)
location.setrefgeo(geo[0], geo[1], geo[2])
scale = origin.getAttribute("scale100")
if scale is not None:
if scale is not None and scale:
location.refscale = float(scale)
point = xmlutils.get_one_element(origin, "point")
if point is not None and point.firstChild is not None: