updates to drive most core logic from CoreEmu and Sessions themselves instead of outside code, updated an example to leverage new API, fixed issues testing when executing a script

This commit is contained in:
Blake J. Harnden 2018-04-20 17:00:47 -07:00
parent 93394b042a
commit 424c08c5e0
7 changed files with 484 additions and 305 deletions

View file

@ -1,4 +1,7 @@
import atexit
import os
import signal
import sys
import core.services
from core import logger
@ -14,8 +17,42 @@ from core.xml.xmlparser import core_document_parser
from core.xml.xmlwriter import core_document_writer
def signal_handler(signal_number, _):
"""
Handle signals and force an exit with cleanup.
:param int signal_number: signal number
:param _: ignored
:return: nothing
"""
logger.info("caught signal: %s", signal_number)
sys.exit(signal_number)
signal.signal(signal.SIGHUP, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler)
signal.signal(signal.SIGUSR2, signal_handler)
class InterfaceData(object):
"""
Convenience class for storing interface data.
"""
def __init__(self, _id, name, mac, ip4, ip4_mask, ip6, ip6_mask):
"""
Creates an InterfaceData object.
:param int _id:
:param str name:
:param str mac:
:param str ip4:
:param int ip4_mask:
:param str ip6:
:param int ip6_mask:
"""
self.id = _id
self.name = name
self.mac = mac
@ -49,6 +86,13 @@ class InterfaceData(object):
def get_interfaces(link_data):
"""
Creates interface data objects for the interfaces defined within link data.
:param core.data.LinkData link_data: data to create interface data from
:return: interface one and two data
:rtype: tuple[InterfaceData]
"""
interface_one = InterfaceData(
_id=link_data.interface1_id,
name=link_data.interface1_name,
@ -77,7 +121,7 @@ def create_interface(node, network, interface_data):
:param node: node to create interface for
:param network: network to associate interface with
:param InterfaceData interface_data: interface data
:return:
:return: created interface
"""
node.newnetif(
network,
@ -90,6 +134,16 @@ def create_interface(node, network, interface_data):
def link_config(network, interface, link_data, devname=None, interface_two=None):
"""
Convenience method for configuring a link,
:param network: network to configure link for
:param interface: interface to configure
:param core.data.LinkData link_data: data to configure link with
:param str devname: device name, default is None
:param interface_two: other interface associated, default is None
:return: nothing
"""
config = {
"netif": interface,
"bw": link_data.bandwidth,
@ -130,8 +184,8 @@ def is_core_node(node):
class IdGen(object):
def __init__(self):
self.id = 0
def __init__(self, _id=0):
self.id = _id
def next(self):
self.id += 1
@ -148,9 +202,6 @@ class FutureSession(Session):
def __init__(self, session_id, config=None, persistent=True, mkdir=True):
super(FutureSession, self).__init__(session_id, config, persistent, mkdir)
# set master
self.master = True
# object management
self.node_id_gen = IdGen()
@ -164,6 +215,13 @@ class FutureSession(Session):
}
def link_nodes(self, link_data):
"""
Convenience method for retrieving nodes within link data.
:param core.data.LinkData link_data: data to retrieve nodes from
:return: nodes, network nodes if presetn, and tunnel
:rtype: tuple
"""
logger.info("link message between node1(%s:%s) and node2(%s:%s)",
link_data.node1_id, link_data.interface1_id, link_data.node2_id, link_data.interface2_id)
@ -239,6 +297,14 @@ class FutureSession(Session):
raise ValueError("no common network found for wireless link/unlink")
def link_add(self, link_data):
"""
Add a link between nodes.
:param core.data.LinkData link_data: data to create a link with
:return: nothing
"""
logger.info("link_data: %s", link_data)
# interface data
interface_one_data, interface_two_data = get_interfaces(link_data)
@ -324,6 +390,12 @@ class FutureSession(Session):
node_two.lock.release()
def link_delete(self, link_data):
"""
Delete a link between nodes.
:param core.data.LinkData link_data: data to delete link with
:return: nothing
"""
# interface data
interface_one_data, interface_two_data = get_interfaces(link_data)
@ -375,6 +447,12 @@ class FutureSession(Session):
node_two.lock.release()
def link_update(self, link_data):
"""
Update link information between nodes.
:param core.data.LinkData link_data: data to update link with
:return: nothing
"""
# interface data
interface_one_data, interface_two_data = get_interfaces(link_data)
@ -470,7 +548,10 @@ class FutureSession(Session):
# determine node id
node_id = node_data.id
if not node_id:
node_id = self.node_id_gen.next()
while True:
node_id = self.node_id_gen.next()
if node_id not in self.objects:
break
# generate name if not provided
name = node_data.name
@ -508,6 +589,12 @@ class FutureSession(Session):
return node_id
def node_update(self, node_data):
"""
Update node information.
:param core.data.NodeData node_data: data to update node with
:return: nothing
"""
try:
# get node to update
node = self.get_object(node_data.id)
@ -522,6 +609,12 @@ class FutureSession(Session):
logger.error("failure to update node that does not exist: %s", node_data.id)
def node_delete(self, node_id):
"""
Delete a node from the session and check if session should shutdown, if no nodes are left.
:param int node_id:
:return: True if node deleted, False otherwise
"""
# delete node and check for session shutdown if a node was removed
result = self.custom_delete_object(node_id)
if result:
@ -529,6 +622,13 @@ class FutureSession(Session):
return result
def node_set_position(self, node, node_data):
"""
Set position for a node, use lat/lon/alt if needed.
:param node: node to set position for
:param NodeData node_data: data for node
:return: nothing
"""
# extract location values
x = node_data.x_position
y = node_data.y_position
@ -566,9 +666,20 @@ class FutureSession(Session):
self.broadcast_node(node_data)
def start_mobility(self, node_ids=None):
"""
Start mobility for the provided node ids.
:param list[int] node_ids: nodes to start mobility for
:return: nothing
"""
self.mobility.startup(node_ids)
def shutdown(self):
"""
Shutdown session.
:return: nothing
"""
self.set_state(state=EventTypes.DATACOLLECT_STATE.value, send_event=True)
self.set_state(state=EventTypes.SHUTDOWN_STATE.value, send_event=True)
super(FutureSession, self).shutdown()
@ -589,7 +700,14 @@ class FutureSession(Session):
return result
def is_active(self):
return self.state in {EventTypes.RUNTIME_STATE.value, EventTypes.DATACOLLECT_STATE.value}
"""
Determine if this session is considered to be active. (Runtime or Data collect states)
:return: True if active, False otherwise
"""
result = self.state in {EventTypes.RUNTIME_STATE.value, EventTypes.DATACOLLECT_STATE.value}
logger.info("checking if session is active: %s", result)
return result
def open_xml(self, file_name, start=False):
"""
@ -637,11 +755,31 @@ class FutureSession(Session):
self.set_hook(state, file_name, source_name, data)
def node_service_file(self, node_id, service_name, file_name, source_name, data):
"""
Add a service file for a node.
:param int node_id: node to add service file to
:param str service_name: service file to add
:param str file_name: file name to use
:param str source_name: source file
:param str data: file data to save
:return: nothing
"""
# hack to conform with old logic until updated
service_name = ":%s" % service_name
self.services.setservicefile(node_id, service_name, file_name, source_name, data)
def node_file(self, node_id, source_name, file_name, data):
"""
Add a file to a node.
:param int node_id: node to add file to
:param str source_name: source file name
:param str file_name: file name to add
:param str data: file data
:return: nothing
"""
node = self.get_object(node_id)
if source_name is not None:
@ -650,20 +788,50 @@ class FutureSession(Session):
node.nodefile(file_name, data)
def clear(self):
"""
Clear all CORE session data. (objects, hooks, broker)
:return: nothing
"""
self.delete_objects()
self.del_hooks()
self.broker.reset()
def start_events(self):
"""
Start event loop.
:return: nothing
"""
self.event_loop.run()
def services_event(self, event_data):
"""
Handle a service event.
:param core.data.EventData event_data: event data to handle
:return:
"""
self.services.handleevent(event_data)
def mobility_event(self, event_data):
"""
Handle a mobility event.
:param core.data.EventData event_data: event data to handle
:return: nothing
"""
self.mobility.handleevent(event_data)
def create_node(self, cls, name=None, model=None):
"""
Create a node
:param cls:
:param name:
:param model:
:return:
"""
object_id = self.node_id_gen.next()
if not name:
@ -677,6 +845,12 @@ class FutureSession(Session):
return node
def create_emane_node(self, name=None):
"""
Create an EMANE node for use within an EMANE network.
:param str name: name to five node
:return: CoreNode
"""
return self.create_node(cls=CoreNode, name=name, model="mdr")
def create_emane_network(self, model, geo_reference, geo_scale=None, name=None):
@ -717,25 +891,71 @@ class CoreEmu(object):
"""
def __init__(self, config=None):
"""
Create a CoreEmu object.
:param dict config: configuration options
"""
# configuration
self.config = config
# session management
self.session_id_gen = IdGen()
self.session_id_gen = IdGen(_id=59999)
self.sessions = {}
# load default services
core.services.load()
def create_session(self):
"""
Create a new CORE session.
# catch exit event
atexit.register(self.shutdown)
def shutdown(self):
"""
Shutdown all CORE session.
:return: nothing
"""
logger.info("shutting down all session")
for session in self.sessions.values():
session.shutdown()
self.sessions.clear()
def create_session(self, _id=None, master=False):
"""
Create a new CORE session, set to master if running standalone.
:param int _id: session id for new session
:param bool master: sets session to master
:return: created session
:rtype: FutureSession
"""
session_id = self.session_id_gen.next()
return FutureSession(session_id, config=self.config)
session_id = _id
if not session_id:
while True:
session_id = self.session_id_gen.next()
if session_id not in self.sessions:
break
session = FutureSession(session_id, config=self.config)
logger.info("created session: %s", session_id)
if master:
session.master = True
self.sessions[session_id] = session
return session
def delete_session(self, _id):
"""
Deletes a CORE session.
:param int _id: session id to delete
:return: nothing
"""
logger.info("deleting session: %s", _id)
session = self.sessions.pop(_id, None)
if not session:
logger.error("session to delete did not exist: %s", _id)
def set_wireless_model(self, node, model):
"""

View file

@ -64,6 +64,7 @@ class FutureHandler(SocketServer.BaseRequestHandler):
self.message_queue = Queue.Queue()
self.node_status_request = {}
self._shutdown_lock = threading.Lock()
self._sessions_lock = threading.Lock()
self.handler_threads = []
num_threads = int(server.config["numthreads"])
@ -79,6 +80,9 @@ class FutureHandler(SocketServer.BaseRequestHandler):
self.master = False
self.session = None
# core emulator
self.coreemu = server.coreemu
utils.close_onexec(request.fileno())
SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
@ -98,12 +102,21 @@ class FutureHandler(SocketServer.BaseRequestHandler):
:return: nothing
"""
logger.info("finishing request handler")
self.done = True
logger.info("remaining message queue size: %s", self.message_queue.qsize())
# seconds
timeout = 10.0
# give some time for message queue to deplete
timeout = 10
wait = 0
while not self.message_queue.empty():
logger.info("waiting for message queue to empty: %s seconds", wait)
time.sleep(1)
wait += 1
if wait == timeout:
logger.warn("queue failed to be empty, finishing request handler")
break
logger.info("client disconnected: notifying threads")
self.done = True
for thread in self.handler_threads:
logger.info("waiting for thread: %s", thread.getName())
thread.join(timeout)
@ -112,16 +125,84 @@ class FutureHandler(SocketServer.BaseRequestHandler):
logger.info("connection closed: %s", self.client_address)
if self.session:
self.remove_session_handlers()
# remove client from session broker and shutdown if there are no clients
self.remove_session_handlers()
self.session.broker.session_clients.remove(self)
if not self.session.broker.session_clients:
logger.info("no session clients left, initiating shutdown")
if not self.session.broker.session_clients and not self.session.is_active():
logger.info("no session clients left and not active, initiating shutdown")
self.session.shutdown()
self.coreemu.delete_session(self.session.session_id)
return SocketServer.BaseRequestHandler.finish(self)
def session_message(self, flags=0):
"""
Build CORE API Sessions message based on current session info.
:param int flags: message flags
:return: session message
"""
id_list = []
name_list = []
file_list = []
node_count_list = []
date_list = []
thumb_list = []
num_sessions = 0
logger.info("creating session message: %s", self.coreemu.sessions.keys())
with self._sessions_lock:
for session_id, session in self.coreemu.sessions.iteritems():
num_sessions += 1
id_list.append(str(session_id))
name = session.name
if not name:
name = ""
name_list.append(name)
file = session.file_name
if not file:
file = ""
file_list.append(file)
node_count_list.append(str(session.get_node_count()))
date_list.append(time.ctime(session._state_time))
thumb = session.thumbnail
if not thumb:
thumb = ""
thumb_list.append(thumb)
session_ids = "|".join(id_list)
names = "|".join(name_list)
files = "|".join(file_list)
node_counts = "|".join(node_count_list)
dates = "|".join(date_list)
thumbs = "|".join(thumb_list)
if num_sessions > 0:
tlv_data = ""
if len(session_ids) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NUMBER.value, session_ids)
if len(names) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NAME.value, names)
if len(files) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.FILE.value, files)
if len(node_counts) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NODE_COUNT.value, node_counts)
if len(dates) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.DATE.value, dates)
if len(thumbs) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.THUMB.value, thumbs)
message = coreapi.CoreSessionMessage.pack(flags, tlv_data)
else:
message = None
return message
def handle_broadcast_event(self, event_data):
"""
Callback to handle an event broadcast out from a session.
@ -407,8 +488,11 @@ class FutureHandler(SocketServer.BaseRequestHandler):
:return: nothing
"""
while not self.done:
message = self.message_queue.get()
self.handle_message(message)
try:
message = self.message_queue.get(timeout=1)
self.handle_message(message)
except Queue.Empty:
pass
def handle_message(self, message):
"""
@ -467,6 +551,9 @@ class FutureHandler(SocketServer.BaseRequestHandler):
except IOError:
logger.exception("error dispatching reply")
def session_shutdown(self, session):
self.coreemu.delete_session(session.session_id)
def handle(self):
"""
Handle a new connection request from a client. Dispatch to the
@ -478,8 +565,10 @@ class FutureHandler(SocketServer.BaseRequestHandler):
# use port as session id
port = self.request.getpeername()[1]
logger.info("creating new session for client: %s", port)
self.session = self.server.create_session(session_id=port)
# TODO: add shutdown handler for session
self.session = self.coreemu.create_session(port)
# self.session.shutdown_handlers.append(self.session_shutdown)
logger.info("created new session for client: %s", self.session.session_id)
# TODO: hack to associate this handler with this sessions broker for broadcasting
# TODO: broker needs to be pulled out of session to the server/handler level
@ -735,22 +824,22 @@ class FutureHandler(SocketServer.BaseRequestHandler):
try:
logger.info("executing: %s", execute_server)
if message.flags & MessageFlags.STRING.value:
old_session_ids = set(self.server.get_session_ids())
old_session_ids = set(self.coreemu.sessions.keys())
sys.argv = shlex.split(execute_server)
file_name = sys.argv[0]
if os.path.splitext(file_name)[1].lower() == ".xml":
session = self.server.create_session()
session = self.coreemu.create_session()
try:
session.open_xml(file_name, start=True)
except:
session.shutdown()
self.server.remove_session(session)
self.coreemu.delete_session(session.session_id)
raise
else:
thread = threading.Thread(
target=execfile,
args=(file_name, {"__file__": file_name, "server": self.server})
args=(file_name, {"__file__": file_name, "coreemu": self.coreemu})
)
thread.daemon = True
thread.start()
@ -758,7 +847,7 @@ class FutureHandler(SocketServer.BaseRequestHandler):
time.sleep(0.25)
if message.flags & MessageFlags.STRING.value:
new_session_ids = set(self.server.get_session_ids())
new_session_ids = set(self.coreemu.sessions.keys())
new_sid = new_session_ids.difference(old_session_ids)
try:
sid = new_sid.pop()
@ -767,17 +856,17 @@ class FutureHandler(SocketServer.BaseRequestHandler):
logger.info("executed %s with unknown session ID", execute_server)
return replies
logger.info("checking session %d for RUNTIME state" % sid)
session = self.server.get_session(session_id=sid)
logger.info("checking session %d for RUNTIME state", sid)
session = self.coreemu.sessions.get(sid)
retries = 10
# wait for session to enter RUNTIME state, to prevent GUI from
# connecting while nodes are still being instantiated
while session.state != EventTypes.RUNTIME_STATE.value:
logger.info("waiting for session %d to enter RUNTIME state" % sid)
logger.info("waiting for session %d to enter RUNTIME state", sid)
time.sleep(1)
retries -= 1
if retries <= 0:
logger.info("session %d did not enter RUNTIME state" % sid)
logger.info("session %d did not enter RUNTIME state", sid)
return replies
tlv_data = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, execute_server)
@ -800,17 +889,15 @@ class FutureHandler(SocketServer.BaseRequestHandler):
# register capabilities with the GUI
self.master = True
# TODO: need to replicate functionality?
# self.server.set_session_master(self)
# find the session containing this client and set the session to master
for session in self.server.sessions.itervalues():
for session in self.coreemu.sessions.itervalues():
if self in session.broker.session_clients:
logger.info("setting session to master: %s", session.session_id)
session.master = True
break
replies.append(self.register())
replies.append(self.server.to_session_message())
replies.append(self.session_message())
return replies
@ -1040,7 +1127,7 @@ class FutureHandler(SocketServer.BaseRequestHandler):
if session_id == 0:
session = self.session
else:
session = self.server.get_session(session_id=session_id)
session = self.coreemu.sessions.get(session_id)
if session is None:
logger.info("session %s not found", session_id)
@ -1060,12 +1147,12 @@ class FutureHandler(SocketServer.BaseRequestHandler):
session.set_user(user)
elif message.flags & MessageFlags.STRING.value and not message.flags & MessageFlags.ADD.value:
# status request flag: send list of sessions
return self.server.to_session_message(),
return self.session_message(),
else:
# handle ADD or DEL flags
for session_id in session_ids:
session_id = int(session_id)
session = self.server.get_session(session_id=session_id)
session = self.coreemu.sessions.get(session_id)
if session is None:
logger.info("session %s not found (flags=0x%x)", session_id, message.flags)
@ -1073,12 +1160,14 @@ class FutureHandler(SocketServer.BaseRequestHandler):
if message.flags & MessageFlags.ADD.value:
# connect to the first session that exists
logger.info("request to connect to session %s" % session_id)
logger.info("request to connect to session %s", session_id)
# remove client from session broker and shutdown if needed
self.remove_session_handlers()
self.session.broker.session_clients.remove(self)
if not self.session.broker.session_clients and not self.session.is_active():
self.session.shutdown()
self.coreemu.delete_session(self.session.session_id)
# set session to join
self.session = session

View file

@ -3,14 +3,8 @@ Defines server classes and request handlers for TCP and UDP.
"""
import SocketServer
import threading
import time
from core import logger
from core.api import coreapi
from core.enumerations import EventTypes
from core.enumerations import SessionTlvs
from core.future.coreemu import FutureSession
from core.future.coreemu import CoreEmu
class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
@ -20,7 +14,6 @@ class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
"""
daemon_threads = True
allow_reuse_address = True
servers = set()
def __init__(self, server_address, handler_class, config=None):
"""
@ -32,35 +25,10 @@ class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
:param dict config: configuration setting
:return:
"""
self.coreemu = CoreEmu(config)
self.config = config
self.sessions = {}
self.udpserver = None
self.udpthread = None
self._sessions_lock = threading.Lock()
FutureServer.add_server(self)
SocketServer.TCPServer.__init__(self, server_address, handler_class)
@classmethod
def add_server(cls, server):
"""
Add a core server to the known servers set.
:param CoreServer server: server to add
:return: nothing
"""
cls.servers.add(server)
@classmethod
def remove_server(cls, server):
"""
Remove a core server from the known servers set.
:param CoreServer server: server to remove
:return: nothing
"""
if server in cls.servers:
cls.servers.remove(server)
def shutdown(self):
"""
Shutdown the server, all known sessions, and remove server from known servers set.
@ -68,196 +36,5 @@ class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
:return: nothing
"""
# shutdown all known sessions
for session in self.sessions.values():
for session in self.coreemu.sessions.itervalues():
session.shutdown()
# remove server from server list
FutureServer.remove_server(self)
def add_session(self, session):
"""
Add a session to our dictionary of sessions, ensuring a unique session number.
:param core.session.Session session: session to add
:return: added session
:raise KeyError: when a session with the same id already exists
"""
with self._sessions_lock:
if session.session_id in self.sessions:
raise KeyError("non-unique session id %s for %s" % (session.session_id, session))
self.sessions[session.session_id] = session
return session
def remove_session(self, session):
"""
Remove a session from our dictionary of sessions.
:param core.session.Session session: session to remove
:return: removed session
:rtype: core.session.Session
"""
with self._sessions_lock:
if session.session_id not in self.sessions:
logger.info("session id %s not found (sessions=%s)", session.session_id, self.sessions.keys())
else:
del self.sessions[session.session_id]
return session
def get_session_ids(self):
"""
Return a list of active session numbers.
:return: known session ids
:rtype: list
"""
with self._sessions_lock:
session_ids = self.sessions.keys()
return session_ids
def create_session(self, session_id=None):
"""
Convenience method for creating sessions with the servers config.
:param int session_id: session id for new session
:return: create session
:rtype: FutureSession
"""
# create random id when necessary, seems to be 1 case wanted, based on legacy code
# creating a value so high, typical client side generation schemes hopefully wont collide
if not session_id:
session_id = next(
session_id for session_id in xrange(60000, 65000)
if session_id not in self.sessions
)
# create and add session to local manager
session = FutureSession(session_id, config=self.config)
self.add_session(session)
# add shutdown handler to remove session from manager
session.shutdown_handlers.append(self.session_shutdown)
return session
def get_session(self, session_id=None):
"""
Create a new session or retrieve an existing one from our
dictionary of sessions. When the session_id=0 and the use_existing
flag is set, return on of the existing sessions.
:param int session_id: session id of session to retrieve, defaults to returning random session
:return: session
:rtype: core.session.Session
"""
with self._sessions_lock:
# return specified session or none
if session_id:
return self.sessions.get(session_id)
# retrieving known session
session = None
# find runtime session with highest node count
for known_session in filter(lambda x: x.state == EventTypes.RUNTIME_STATE.value,
self.sessions.itervalues()):
if not session or known_session.get_node_count() > session.get_node_count():
session = known_session
# return first known session otherwise
if not session:
for known_session in self.sessions.itervalues():
session = known_session
break
return session
def session_shutdown(self, session):
"""
Handler method to be used as a callback when a session has shutdown.
:param core.session.Session session: session shutting down
:return: nothing
"""
self.remove_session(session)
def to_session_message(self, flags=0):
"""
Build CORE API Sessions message based on current session info.
:param int flags: message flags
:return: session message
"""
id_list = []
name_list = []
file_list = []
node_count_list = []
date_list = []
thumb_list = []
num_sessions = 0
with self._sessions_lock:
for session_id in self.sessions:
session = self.sessions[session_id]
# debug: session.dumpsession()
num_sessions += 1
id_list.append(str(session_id))
name = session.name
if not name:
name = ""
name_list.append(name)
file = session.file_name
if not file:
file = ""
file_list.append(file)
node_count_list.append(str(session.get_node_count()))
date_list.append(time.ctime(session._state_time))
thumb = session.thumbnail
if not thumb:
thumb = ""
thumb_list.append(thumb)
session_ids = "|".join(id_list)
names = "|".join(name_list)
files = "|".join(file_list)
node_counts = "|".join(node_count_list)
dates = "|".join(date_list)
thumbs = "|".join(thumb_list)
if num_sessions > 0:
tlv_data = ""
if len(session_ids) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NUMBER.value, session_ids)
if len(names) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NAME.value, names)
if len(files) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.FILE.value, files)
if len(node_counts) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NODE_COUNT.value, node_counts)
if len(dates) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.DATE.value, dates)
if len(thumbs) > 0:
tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.THUMB.value, thumbs)
message = coreapi.CoreSessionMessage.pack(flags, tlv_data)
else:
message = None
return message
def dump_sessions(self):
"""
Log currently known session information.
"""
logger.info("sessions:")
with self._sessions_lock:
for session_id in self.sessions:
logger.info(session_id)

View file

@ -17,6 +17,7 @@ from core.coreobj import PyCoreNode
from core.enumerations import NodeTypes
from core.misc import nodeutils
from core.misc import utils
from core.misc.ipaddress import MacAddress
from core.netns import vnodeclient
from core.netns.vif import TunTap
from core.netns.vif import VEth
@ -290,7 +291,7 @@ class SimpleLxcNode(PyCoreNode):
output = output.split("\n")
veth.flow_id = int(output[0].strip().split(":")[0]) + 1
logger.info("interface flow index: %s - %s", veth.name, veth.flow_id)
veth.hwaddr = output[1].strip().split()[1]
veth.hwaddr = MacAddress.from_string(output[1].strip().split()[1])
logger.info("interface mac: %s - %s", veth.name, veth.hwaddr)
try:

View file

@ -0,0 +1,75 @@
#!/usr/bin/python
#
# run iperf to measure the effective throughput between two nodes when
# n nodes are connected to a virtual wlan; run test for testsec
# and repeat for minnodes <= n <= maxnodes with a step size of
# nodestep
import datetime
import parser
from core.data import NodeData, LinkData
from core.enumerations import NodeTypes, EventTypes
from core.future.coreemu import FutureIpv4Prefix, CoreEmu
def example(options):
# ip generator for example
prefix = FutureIpv4Prefix("10.83.0.0/16")
# create emulator instance for creating sessions and utility methods
coreemu = CoreEmu()
session = coreemu.create_session(master=True)
# must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE.value)
# create switch network node
node_data = NodeData(node_type=NodeTypes.SWITCH.value)
switch_id = session.node_add(node_data)
# create nodes
for _ in xrange(options.nodes):
node_data = NodeData(node_type=NodeTypes.DEFAULT.value)
node_id = session.node_add(node_data)
node = session.get_object(node_id)
inteface_index = node.newifindex()
address = prefix.addr(node_id)
link_data = LinkData(
node1_id=node_id,
node2_id=switch_id,
interface1_id=inteface_index,
interface1_ip4=str(address),
interface1_ip4_mask=prefix.prefixlen,
)
session.link_add(link_data)
# instantiate session
session.instantiate()
# get nodes to run example
first_node = session.get_object(2)
last_node = session.get_object(options.nodes + 1)
print "starting iperf server on node: %s" % first_node.name
first_node.cmd(["iperf", "-s", "-D"])
address = str(prefix.addr(first_node.objid))
print "node %s connecting to %s" % (last_node.name, address)
last_node.client.icmd(["iperf", "-t", str(options.time), "-c", address])
first_node.cmd(["killall", "-9", "iperf"])
# shutdown session
coreemu.shutdown()
def main():
options = parser.parse_options("switch")
start = datetime.datetime.now()
print "running switch example: nodes(%s) time(%s)" % (options.nodes, options.time)
example(options)
print "elapsed time: %s" % (datetime.datetime.now() - start)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,52 @@
#!/usr/bin/python
#
# run iperf to measure the effective throughput between two nodes when
# n nodes are connected to a virtual wlan; run test for testsec
# and repeat for minnodes <= n <= maxnodes with a step size of
# nodestep
import datetime
import parser
from core.data import NodeData, LinkData
from core.enumerations import NodeTypes, EventTypes
from core.future.coreemu import FutureIpv4Prefix, CoreEmu
def example(nodes):
# ip generator for example
prefix = FutureIpv4Prefix("10.83.0.0/16")
# create emulator instance for creating sessions and utility methods
coreemu = globals()["coreemu"]
session = coreemu.create_session(master=True)
# must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE.value)
# create switch network node
node_data = NodeData(node_type=NodeTypes.SWITCH.value)
switch_id = session.node_add(node_data)
# create nodes
for _ in xrange(nodes):
node_data = NodeData(node_type=NodeTypes.DEFAULT.value)
node_id = session.node_add(node_data)
node = session.get_object(node_id)
inteface_index = node.newifindex()
address = prefix.addr(node_id)
link_data = LinkData(
node1_id=node_id,
node2_id=switch_id,
interface1_id=inteface_index,
interface1_ip4=str(address),
interface1_ip4_mask=prefix.prefixlen,
)
session.link_add(link_data)
# instantiate session
session.instantiate()
if __name__ in {"__main__", "__builtin__"}:
example(2)

View file

@ -6,14 +6,11 @@ message handlers are defined and some support for sending messages.
"""
import ConfigParser
import atexit
import optparse
import signal
import sys
import time
from core import constants
from core import coreserver
from core import enumerations
from core import logger
from core import services
@ -56,38 +53,6 @@ def cored(cfg=None):
server.serve_forever()
# TODO: should sessions and the main core daemon both catch exit to shutdown independently?
def cleanup():
"""
Runs server shutdown and cleanup when catching an exit signal.
:return: nothing
"""
while coreserver.CoreServer.servers:
server = coreserver.CoreServer.servers.pop()
server.shutdown()
def sighandler(signum, stackframe):
"""
Signal handler when different signals are sent.
:param int signum: singal number sent
:param stackframe: stack frame sent
:return: nothing
"""
logger.error("terminated by signal: %s", signum)
sys.exit(signum)
signal.signal(signal.SIGHUP, sighandler)
signal.signal(signal.SIGINT, sighandler)
signal.signal(signal.SIGTERM, sighandler)
signal.signal(signal.SIGUSR1, sighandler)
signal.signal(signal.SIGUSR2, sighandler)
atexit.register(cleanup)
def get_merged_config(filename):
"""
Return a configuration after merging config file and command-line arguments.