attempt to fix missing updates for sdt, moved broker specific code to broker

This commit is contained in:
Blake J. Harnden 2017-08-07 15:37:41 -07:00
parent 8612c73d49
commit 73eea80f51
11 changed files with 146 additions and 139 deletions

View file

@ -34,11 +34,11 @@ from core.netns.vnet import GreTapBridge
from core.phys.pnodes import PhysicalNode
# TODO: name conflict with main core server, probably should rename
class CoreServer(object):
class CoreDistributedServer(object):
"""
Represents CORE daemon servers for communication.
"""
def __init__(self, name, host, port):
"""
Creates a CoreServer instance.
@ -121,6 +121,7 @@ class CoreBroker(ConfigurableManager):
self.physical_nodes = set()
# allows for other message handlers to process API messages (e.g. EMANE)
self.handlers = set()
self.handlers.add(self.handle_distributed)
# dict with tunnel key to tunnel device mapping
self.tunnels = {}
self.dorecvloop = False
@ -223,7 +224,7 @@ class CoreBroker(ConfigurableManager):
and forwarded. Return value of zero indicates the socket has closed
and should be removed from the self.servers dict.
:param CoreServer server: server to receive from
:param CoreDistributedServer server: server to receive from
:return: message length
:rtype: int
"""
@ -302,7 +303,7 @@ class CoreBroker(ConfigurableManager):
del self.servers[name]
logger.info("adding server: %s @ %s:%s" % (name, host, port))
server = CoreServer(name, host, port)
server = CoreDistributedServer(name, host, port)
if host is not None and port is not None:
try:
server.connect()
@ -316,7 +317,7 @@ class CoreBroker(ConfigurableManager):
"""
Remove a server and hang up any connection.
:param CoreServer server: server to delete
:param CoreDistributedServer server: server to delete
:return: nothing
"""
with self.servers_lock:
@ -336,7 +337,7 @@ class CoreBroker(ConfigurableManager):
:param str name: name of server to retrieve
:return: server for given name
:rtype: CoreServer
:rtype: CoreDistributedServer
"""
with self.servers_lock:
return self.servers.get(name)
@ -347,7 +348,7 @@ class CoreBroker(ConfigurableManager):
:param sock: socket associated with a server
:return: core server associated wit the socket
:rtype: CoreServer
:rtype: CoreDistributedServer
"""
with self.servers_lock:
for server in self.servers.itervalues():
@ -540,7 +541,7 @@ class CoreBroker(ConfigurableManager):
"""
Record a node number to emulation server mapping.
:param CoreServer server: core server to associate node with
:param CoreDistributedServer server: core server to associate node with
:param int nodenum: node id
:return: nothing
"""
@ -562,7 +563,7 @@ class CoreBroker(ConfigurableManager):
Remove a node number to emulation server mapping.
Return the number of nodes left on this server.
:param CoreServer server: server to remove from node map
:param CoreDistributedServer server: server to remove from node map
:param int nodenum: node id
:return: number of nodes left on server
:rtype: int
@ -1056,7 +1057,7 @@ class CoreBroker(ConfigurableManager):
VnodeClient class.
:param str nodestr: node string
:param CoreServer server: core server
:param CoreDistributedServer server: core server
:return: nothing
"""
serverstr = "%s %s %s" % (server.name, server.host, server.port)
@ -1107,3 +1108,62 @@ class CoreBroker(ConfigurableManager):
if not server.instantiation_complete:
return False
return True
def handle_distributed(self, message):
"""
Handle the session options config message as it has reached the
broker. Options requiring modification for distributed operation should
be handled here.
:param message: message to handle
:return: nothing
"""
if not self.session.master:
return
if message.message_type != MessageTypes.CONFIG.value or message.get_tlv(ConfigTlvs.OBJECT.value) != "session":
return
values_str = message.get_tlv(ConfigTlvs.VALUES.value)
if values_str is None:
return
value_strings = values_str.split('|')
for value_string in value_strings:
key, value = value_string.split('=', 1)
if key == "controlnet":
self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string))
def handle_distributed_control_net(self, message, values, index):
"""
Modify Config Message if multiple control network prefixes are
defined. Map server names to prefixes and repack the message before
it is forwarded to slave servers.
:param message: message to handle
:param list values: values to handle
:param int index: index ti get key value from
:return: nothing
"""
key_value = values[index]
key, value = key_value.split('=', 1)
control_nets = value.split()
if len(control_nets) < 2:
logger.warn("multiple controlnet prefixes do not exist")
return
servers = self.session.broker.getservernames()
if len(servers) < 2:
logger.warn("not distributed")
return
servers.remove("localhost")
# master always gets first prefix
servers.insert(0, "localhost")
# create list of "server1:ctrlnet1 server2:ctrlnet2 ..."
control_nets = map(lambda x: "%s:%s" % (x[0], x[1]), zip(servers, control_nets))
values[index] = "controlnet=%s" % (" ".join(control_nets))
values_str = "|".join(values)
message.tlvdata[ConfigTlvs.VALUES.value] = values_str
message.repack()

View file

@ -301,7 +301,6 @@ class Configurable(object):
:return: tuple of default values
:rtype: tuple
"""
# TODO: why the need for a tuple?
return tuple(map(lambda x: x[2], cls.config_matrix))
@classmethod
@ -312,7 +311,6 @@ class Configurable(object):
:return: tuple of name values
:rtype: tuple
"""
# TODO: why the need for a tuple?
return tuple(map(lambda x: x[0], cls.config_matrix))
@classmethod

View file

@ -945,8 +945,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
Remove a link.
"""
if node1 and node2:
# TODO: fix this for the case where ifindex[1,2] are
# not specified
# TODO: fix this for the case where ifindex[1,2] are not specified
# a wired unlink event, delete the connecting bridge
netif1 = node1.netif(interface_index1)
netif2 = node2.netif(interface_index2)
@ -1211,7 +1210,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
# find the session containing this client and set the session to master
for session in self.server.sessions.itervalues():
if self in session.broker.session_clients:
logger.info("SESSION SET TO MASTER!: %s", session.session_id)
logger.info("setting session to master: %s", session.session_id)
session.master = True
break
@ -1746,8 +1745,6 @@ class BaseAuxRequestHandler(CoreRequestHandler):
messages = self.receive_message()
if messages:
for message in messages:
# TODO: do we really want to broadcast node and link messages from a client to other clients?
# self.session.broadcast(self, message)
self.handle_message(message)
except EOFError:
break

View file

@ -190,18 +190,21 @@ class PyCoreObj(object):
self.ifindex += 1
return ifindex
def data(self, message_type):
def data(self, message_type, lat=None, lon=None, alt=None):
"""
Build a data object for this node.
:param message_type: purpose for the data object we are creating
:param float lat: latitude
:param float lon: longitude
:param float alt: altitude
:return: node data object
:rtype: core.data.NodeData
"""
if self.apitype is None:
return None
x, y, z = self.getposition()
x, y, _ = self.getposition()
model = None
if hasattr(self, "type"):
@ -229,6 +232,9 @@ class PyCoreObj(object):
opaque=self.opaque,
x_position=x,
y_position=y,
latitude=lat,
longitude=lon,
altitude=alt,
model=model,
emulation_server=emulation_server,
services=services
@ -254,7 +260,6 @@ class PyCoreNode(PyCoreObj):
Base class for CORE nodes.
"""
# TODO: start seems like it should go away
def __init__(self, session, objid=None, name=None, start=True):
"""
Create a PyCoreNode instance.
@ -414,7 +419,6 @@ class PyCoreNet(PyCoreObj):
"""
linktype = LinkTypes.WIRED.value
# TODO: remove start if appropriate
def __init__(self, session, objid, name, start=True):
"""
Create a PyCoreNet instance.
@ -502,8 +506,6 @@ class PyCoreNet(PyCoreObj):
interface2_ip6 = ipaddress.IpAddress(af=family, address=ipl)
interface2_ip6_mask = mask
# TODO: not currently used
# loss = netif.getparam('loss')
link_data = LinkData(
message_type=flags,
node1_id=self.objid,

View file

@ -1094,25 +1094,25 @@ class EmaneManager(ConfigurableManager):
alt = attrs["altitude"]
self.handlelocationeventtoxyz(txnemid, lat, long, alt)
def handlelocationeventtoxyz(self, nemid, lat, long, alt):
def handlelocationeventtoxyz(self, nemid, lat, lon, alt):
"""
Convert the (NEM ID, lat, long, alt) from a received location event
into a node and x,y,z coordinate values, sending a Node Message.
Returns True if successfully parsed and a Node Message was sent.
"""
# convert nemid to node number
(emanenode, netif) = self.nemlookup(nemid)
emanenode, netif = self.nemlookup(nemid)
if netif is None:
logger.info("location event for unknown NEM %s" % nemid)
logger.info("location event for unknown NEM %s", nemid)
return False
n = netif.node.objid
# convert from lat/long/alt to x,y,z coordinates
x, y, z = self.session.location.getxyz(lat, long, alt)
x, y, z = self.session.location.getxyz(lat, lon, alt)
x = int(x)
y = int(y)
z = int(z)
logger.info("location event NEM %s (%s, %s, %s) -> (%s, %s, %s)",
nemid, lat, long, alt, x, y, z)
nemid, lat, lon, alt, x, y, z)
try:
if (x.bit_length() > 16) or (y.bit_length() > 16) or \
(z.bit_length() > 16) or (x < 0) or (y < 0) or (z < 0):
@ -1123,7 +1123,7 @@ class EmaneManager(ConfigurableManager):
return False
except AttributeError:
# int.bit_length() not present on Python 2.6
logger.exception("error wusing bit_length")
logger.exception("error using bit_length")
# generate a node message for this location update
try:
@ -1134,12 +1134,9 @@ class EmaneManager(ConfigurableManager):
# don"t use node.setposition(x,y,z) which generates an event
node.position.set(x, y, z)
node_data = node.data(message_type=0)
node_data = node.data(message_type=0, lat=lat, lon=lon, alt=alt)
self.session.broadcast_node(node_data)
# TODO: determinehow to add SDT handlers
# self.session.sdt.updatenodegeo(node.objid, lat, long, alt)
return True
def emanerunning(self, node):

View file

@ -607,9 +607,6 @@ class BasicRangeModel(WirelessModel):
link_data = self.create_link_data(netif, netif2, message_type)
self.session.broadcast_link(link_data)
# TODO: account for SDT wanting to listen as well
# self.session.sdt.updatelink(netif.node.objid, netif2.node.objid, flags, wireless=True)
def all_link_data(self, flags):
"""
Return a list of wireless link messages for when the GUI reconnects.
@ -913,9 +910,6 @@ class WayPointMobility(WirelessModel):
node_data = node.data(message_type=0)
self.session.broadcast_node(node_data)
# TODO: determine how to add handler for SDT
# self.session.sdt.updatenode(node.objid, flags=0, x=x, y=y, z=z)
def setendtime(self):
"""
Set self.endtime to the time of the last waypoint in the queue of

View file

@ -249,8 +249,6 @@ class PtpNet(LxBrNet):
interface2_ip6 = ipaddress.IpAddress(af=family, address=ipl)
interface2_ip6_mask = mask
# TODO: not currently used
# loss=netif.getparam("loss")
link_data = LinkData(
message_type=flags,
node1_id=if1.node.objid,
@ -510,7 +508,6 @@ class RJ45Node(PyCoreNode, PyCoreNetIf):
"""
PyCoreNetIf.detachnet(self)
# TODO: parameters are not used
def newnetif(self, net=None, addrlist=None, hwaddr=None, ifindex=None, ifname=None):
"""
This is called when linking with another node. Since this node

View file

@ -71,6 +71,42 @@ class Sdt(object):
self.remotes = {}
session.broker.handlers.add(self.handle_distributed)
# add handler for node updates
self.session.node_handlers.append(self.handle_node_update)
# add handler for link updates
self.session.link_handlers.append(self.handle_link_update)
def handle_node_update(self, node_data):
"""
Handler for node updates, specifically for updating their location.
:param core.data.NodeData node_data: node data being updated
:return: nothing
"""
x = node_data.x_position
y = node_data.y_position
lat = node_data.latitude
lon = node_data.longitude
alt = node_data.altitude
if all([lat, lon, alt]):
self.updatenodegeo(node_data.id, node_data.latitude, node_data.longitude, node_data.altitude)
if node_data.message_type == 0:
# TODO: z is not currently supported by node messages
self.updatenode(node_data.id, 0, x, y, 0)
def handle_link_update(self, link_data):
"""
Handler for link updates, checking for wireless link/unlink messages.
:param core.data.LinkData link_data: link data being updated
:return: nothing
"""
if link_data.link_type == LinkTypes.WIRELESS.value:
self.updatelink(link_data.node1_id, link_data.node2_id, link_data.message_type, wireless=True)
def is_enabled(self):
"""
Check for "enablesdt" session option. Return False by default if
@ -180,6 +216,7 @@ class Sdt(object):
:return: nothing
"""
logger.info("SDT shutdown!")
self.cmd("clear all")
self.disconnect()
self.showerror = True
@ -227,8 +264,8 @@ class Sdt(object):
return
if x is None or y is None:
return
lat, long, alt = self.session.location.getgeo(x, y, z)
pos = "pos %.6f,%.6f,%.6f" % (long, lat, alt)
lat, lon, alt = self.session.location.getgeo(x, y, z)
pos = "pos %.6f,%.6f,%.6f" % (lon, lat, alt)
if flags & MessageFlags.ADD.value:
if icon is not None:
type = name
@ -327,7 +364,6 @@ class Sdt(object):
for n2num, wl in r.links:
self.updatelink(n1num, n2num, MessageFlags.ADD.value, wl)
# TODO: remove the need for this
def handle_distributed(self, message):
"""
Broker handler for processing CORE API messages as they are
@ -342,7 +378,6 @@ class Sdt(object):
elif message.message_type == MessageTypes.NODE.value:
return self.handlenodemsg(message)
# TODO: remove the need for this
def handlenodemsg(self, msg):
"""
Process a Node Message to add/delete or move a node on
@ -403,7 +438,6 @@ class Sdt(object):
remote.pos = (x, y, z)
self.updatenode(nodenum, msg.flags, x, y, z, name, type, icon)
# TODO: remove the need for this
def handlelinkmsg(self, msg):
"""
Process a Link Message to add/remove links on the SDT display.

View file

@ -867,7 +867,8 @@ class CoreServices(ConfigurableManager):
if len(cfgfiles) > 0:
for filename in cfgfiles:
if filename[:7] == "file:///":
raise NotImplementedError # TODO
# TODO: implement this
raise NotImplementedError
cfg = self.getservicefiledata(s, filename)
if cfg is None:
cfg = s.generateconfig(node, filename, services)

View file

@ -27,13 +27,11 @@ from core.data import EventData
from core.data import ExceptionData
from core.data import FileData
from core.emane.emanemanager import EmaneManager
from core.enumerations import ConfigDataTypes, EventTlvs
from core.enumerations import ConfigDataTypes
from core.enumerations import ConfigFlags
from core.enumerations import ConfigTlvs
from core.enumerations import EventTypes
from core.enumerations import ExceptionLevels
from core.enumerations import MessageFlags
from core.enumerations import MessageTypes
from core.enumerations import NodeTypes
from core.enumerations import RegisterTlvs
from core.location import CoreLocation
@ -156,6 +154,15 @@ class Session(object):
self.master = False
# handlers for broadcasting information
self.event_handlers = []
self.exception_handlers = []
self.node_handlers = []
self.link_handlers = []
self.file_handlers = []
self.config_handlers = []
self.shutdown_handlers = []
# setup broker
self.broker = CoreBroker(session=self)
self.add_config_object(CoreBroker.name, CoreBroker.config_type, self.broker.configure)
@ -192,15 +199,6 @@ class Session(object):
self.metadata = SessionMetaData()
self.add_config_object(SessionMetaData.name, SessionMetaData.config_type, self.metadata.configure)
# handlers for broadcasting information
self.event_handlers = []
self.exception_handlers = []
self.node_handlers = []
self.link_handlers = []
self.file_handlers = []
self.config_handlers = []
self.shutdown_handlers = []
def shutdown(self):
"""
Shutdown all emulation objects and remove the session directory.
@ -326,10 +324,6 @@ class Session(object):
event_data = EventData(event_type=state, time="%s" % time.time())
self.broadcast_event(event_data)
# also inform slave servers
# TODO: deal with broker, potentially broker should really live within the core server/handlers
# self.broker.handlerawmsg(message)
def write_state(self, state):
"""
Write the current state to a state file in the session dir.
@ -915,8 +909,7 @@ class Session(object):
if node_count == 0:
shutdown = True
self.set_state(state=EventTypes.SHUTDOWN_STATE.value)
# TODO: this seems redundant as it occurs during shutdown as well
self.sdt.shutdown()
return shutdown
def short_session_id(self):
@ -935,16 +928,13 @@ class Session(object):
"""
with self._objects_lock:
for obj in self.objects.itervalues():
# TODO: determine instance type we need to check, due to method issue below
# TODO: PyCoreNode is not the type to check, but there are two types, due to bsd and netns
if isinstance(obj, nodes.PyCoreNode) and not nodeutils.is_node(obj, NodeTypes.RJ45):
# add a control interface if configured
logger.info("booting node: %s - %s", obj.objid, obj.name)
self.add_remove_control_interface(node=obj, remove=False)
obj.boot()
# TODO(blake): send node emu ids back
# self.sendnodeemuid(obj.objid)
self.update_control_interface_hosts()
def validate_nodes(self):
@ -955,7 +945,7 @@ class Session(object):
"""
with self._objects_lock:
for obj in self.objects.itervalues():
# TODO: this can be extended to validate everything, bad node check here as well
# TODO: issues with checking PyCoreNode alone, validate is not a method
# such as vnoded process, bridges, etc.
if not isinstance(obj, nodes.PyCoreNode):
continue
@ -1015,7 +1005,7 @@ class Session(object):
return -1
def get_control_net_object(self, net_index):
# TODO: all nodes use an integer id and now this wants to use a string =(
# TODO: all nodes use an integer id and now this wants to use a string
object_id = "ctrl%dnet" % net_index
return self.get_object(object_id)
@ -1123,7 +1113,8 @@ class Session(object):
updown_script=updown_script, serverintf=server_interface)
# tunnels between controlnets will be built with Broker.addnettunnels()
# TODO: potentialy remove documentation saying object ids are ints
# TODO: potentially remove documentation saying object ids are ints
# TODO: need to move broker code out of the session object
self.broker.addnet(object_id)
for server in self.broker.getservers():
self.broker.addnodemap(server, object_id)
@ -1377,7 +1368,6 @@ class SessionConfig(ConfigurableManager, Configurable):
"""
ConfigurableManager.__init__(self)
self.session = session
self.session.broker.handlers.add(self.handle_distributed)
self.reset()
def reset(self):
@ -1423,70 +1413,6 @@ class SessionConfig(ConfigurableManager, Configurable):
return self.config_data(0, node_id, type_flags, values)
# TODO: update logic to not be tied to old style messages
def handle_distributed(self, message):
"""
Handle the session options config message as it has reached the
broker. Options requiring modification for distributed operation should
be handled here.
:param message: message to handle
:return: nothing
"""
if not self.session.master:
return
if message.message_type != MessageTypes.CONFIG.value or message.get_tlv(ConfigTlvs.OBJECT.value) != "session":
return
values_str = message.get_tlv(ConfigTlvs.VALUES.value)
if values_str is None:
return
value_strings = values_str.split('|')
if not self.haskeyvalues(value_strings):
return
for value_string in value_strings:
key, value = value_string.split('=', 1)
if key == "controlnet":
self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string))
# TODO: update logic to not be tied to old style messages
def handle_distributed_control_net(self, message, values, index):
"""
Modify Config Message if multiple control network prefixes are
defined. Map server names to prefixes and repack the message before
it is forwarded to slave servers.
:param message: message to handle
:param list values: values to handle
:param int index: index ti get key value from
:return: nothing
"""
key_value = values[index]
key, value = key_value.split('=', 1)
control_nets = value.split()
if len(control_nets) < 2:
logger.warn("multiple controlnet prefixes do not exist")
return
servers = self.session.broker.getservernames()
if len(servers) < 2:
logger.warn("not distributed")
return
servers.remove("localhost")
# master always gets first prefix
servers.insert(0, "localhost")
# create list of "server1:ctrlnet1 server2:ctrlnet2 ..."
control_nets = map(lambda x: "%s:%s" % (x[0], x[1]), zip(servers, control_nets))
values[index] = "controlnet=%s" % (" ".join(control_nets))
values_str = "|".join(values)
message.tlvdata[ConfigTlvs.VALUES.value] = values_str
message.repack()
class SessionMetaData(ConfigurableManager):
"""

View file

@ -296,7 +296,8 @@ def exec_file(cfg):
tlvdata = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, filename)
msg = coreapi.CoreRegMessage.pack(MessageFlags.ADD.value, tlvdata)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(("localhost", int(cfg["port"]))) # TODO: connect address option
# TODO: connect address option
sock.connect(("localhost", int(cfg["port"])))
sock.sendall(msg)
return 0