Merge pull request #481 from coreemu/enhancement/broadcast-links

Enhancement/broadcast links
This commit is contained in:
bharnden 2020-06-26 15:33:52 -07:00 committed by GitHub
commit adf28f6b55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 223 additions and 109 deletions

View file

@ -509,7 +509,7 @@ class CoreGrpcClient:
:param node_id: node id
:param position: position to set node to
:param icon: path to icon for gui to use for node
:param source: application source editing node
:param source: application source
:param geo: lon,lat,alt location for node
:return: response with result of success or failure
:raises grpc.RpcError: when session or node doesn't exist
@ -614,6 +614,7 @@ class CoreGrpcClient:
iface1: core_pb2.Interface = None,
iface2: core_pb2.Interface = None,
options: core_pb2.LinkOptions = None,
source: str = None,
) -> core_pb2.AddLinkResponse:
"""
Add a link between nodes.
@ -624,6 +625,7 @@ class CoreGrpcClient:
:param iface1: node one interface data
:param iface2: node two interface data
:param options: options for link (jitter, bandwidth, etc)
:param source: application source
:return: response with result of success or failure
:raises grpc.RpcError: when session or one of the nodes don't exist
"""
@ -635,7 +637,9 @@ class CoreGrpcClient:
iface2=iface2,
options=options,
)
request = core_pb2.AddLinkRequest(session_id=session_id, link=link)
request = core_pb2.AddLinkRequest(
session_id=session_id, link=link, source=source
)
return self.stub.AddLink(request)
def edit_link(
@ -646,6 +650,7 @@ class CoreGrpcClient:
options: core_pb2.LinkOptions,
iface1_id: int = None,
iface2_id: int = None,
source: str = None,
) -> core_pb2.EditLinkResponse:
"""
Edit a link between nodes.
@ -656,6 +661,7 @@ class CoreGrpcClient:
:param options: options for link (jitter, bandwidth, etc)
:param iface1_id: node one interface id
:param iface2_id: node two interface id
:param source: application source
:return: response with result of success or failure
:raises grpc.RpcError: when session or one of the nodes don't exist
"""
@ -666,6 +672,7 @@ class CoreGrpcClient:
options=options,
iface1_id=iface1_id,
iface2_id=iface2_id,
source=source,
)
return self.stub.EditLink(request)
@ -676,6 +683,7 @@ class CoreGrpcClient:
node2_id: int,
iface1_id: int = None,
iface2_id: int = None,
source: str = None,
) -> core_pb2.DeleteLinkResponse:
"""
Delete a link between nodes.
@ -685,6 +693,7 @@ class CoreGrpcClient:
:param node2_id: node two id
:param iface1_id: node one interface id
:param iface2_id: node two interface id
:param source: application source
:return: response with result of success or failure
:raises grpc.RpcError: when session doesn't exist
"""
@ -694,6 +703,7 @@ class CoreGrpcClient:
node2_id=node2_id,
iface1_id=iface1_id,
iface2_id=iface2_id,
source=source,
)
return self.stub.DeleteLink(request)

View file

@ -1,6 +1,6 @@
import logging
from queue import Empty, Queue
from typing import Iterable
from typing import Iterable, Optional
from core.api.grpc import core_pb2
from core.api.grpc.grpcutils import convert_link
@ -15,7 +15,7 @@ from core.emulator.data import (
from core.emulator.session import Session
def handle_node_event(node_data: NodeData) -> core_pb2.NodeEvent:
def handle_node_event(node_data: NodeData) -> core_pb2.Event:
"""
Handle node event when there is a node event
@ -36,98 +36,105 @@ def handle_node_event(node_data: NodeData) -> core_pb2.NodeEvent:
geo=geo,
services=services,
)
return core_pb2.NodeEvent(node=node_proto, source=node_data.source)
node_event = core_pb2.NodeEvent(node=node_proto)
return core_pb2.Event(node_event=node_event, source=node_data.source)
def handle_link_event(event: LinkData) -> core_pb2.LinkEvent:
def handle_link_event(link_data: LinkData) -> core_pb2.Event:
"""
Handle link event when there is a link event
:param event: link data
:param link_data: link data
:return: link event that has message type and link information
"""
link = convert_link(event)
return core_pb2.LinkEvent(message_type=event.message_type.value, link=link)
link = convert_link(link_data)
message_type = link_data.message_type.value
link_event = core_pb2.LinkEvent(message_type=message_type, link=link)
return core_pb2.Event(link_event=link_event, source=link_data.source)
def handle_session_event(event: EventData) -> core_pb2.SessionEvent:
def handle_session_event(event_data: EventData) -> core_pb2.Event:
"""
Handle session event when there is a session event
:param event: event data
:param event_data: event data
:return: session event
"""
event_time = event.time
event_time = event_data.time
if event_time is not None:
event_time = float(event_time)
return core_pb2.SessionEvent(
node_id=event.node,
event=event.event_type.value,
name=event.name,
data=event.data,
session_event = core_pb2.SessionEvent(
node_id=event_data.node,
event=event_data.event_type.value,
name=event_data.name,
data=event_data.data,
time=event_time,
)
return core_pb2.Event(session_event=session_event)
def handle_config_event(event: ConfigData) -> core_pb2.ConfigEvent:
def handle_config_event(config_data: ConfigData) -> core_pb2.Event:
"""
Handle configuration event when there is configuration event
:param event: configuration data
:param config_data: configuration data
:return: configuration event
"""
return core_pb2.ConfigEvent(
message_type=event.message_type,
node_id=event.node,
object=event.object,
type=event.type,
captions=event.captions,
bitmap=event.bitmap,
data_values=event.data_values,
possible_values=event.possible_values,
groups=event.groups,
iface_id=event.iface_id,
network_id=event.network_id,
opaque=event.opaque,
data_types=event.data_types,
config_event = core_pb2.ConfigEvent(
message_type=config_data.message_type,
node_id=config_data.node,
object=config_data.object,
type=config_data.type,
captions=config_data.captions,
bitmap=config_data.bitmap,
data_values=config_data.data_values,
possible_values=config_data.possible_values,
groups=config_data.groups,
iface_id=config_data.iface_id,
network_id=config_data.network_id,
opaque=config_data.opaque,
data_types=config_data.data_types,
)
return core_pb2.Event(config_event=config_event)
def handle_exception_event(event: ExceptionData) -> core_pb2.ExceptionEvent:
def handle_exception_event(exception_data: ExceptionData) -> core_pb2.Event:
"""
Handle exception event when there is exception event
:param event: exception data
:param exception_data: exception data
:return: exception event
"""
return core_pb2.ExceptionEvent(
node_id=event.node,
level=event.level.value,
source=event.source,
date=event.date,
text=event.text,
opaque=event.opaque,
exception_event = core_pb2.ExceptionEvent(
node_id=exception_data.node,
level=exception_data.level.value,
source=exception_data.source,
date=exception_data.date,
text=exception_data.text,
opaque=exception_data.opaque,
)
return core_pb2.Event(exception_event=exception_event)
def handle_file_event(event: FileData) -> core_pb2.FileEvent:
def handle_file_event(file_data: FileData) -> core_pb2.Event:
"""
Handle file event
:param event: file data
:param file_data: file data
:return: file event
"""
return core_pb2.FileEvent(
message_type=event.message_type.value,
node_id=event.node,
name=event.name,
mode=event.mode,
number=event.number,
type=event.type,
source=event.source,
data=event.data,
compressed_data=event.compressed_data,
file_event = core_pb2.FileEvent(
message_type=file_data.message_type.value,
node_id=file_data.node,
name=file_data.name,
mode=file_data.mode,
number=file_data.number,
type=file_data.type,
source=file_data.source,
data=file_data.data,
compressed_data=file_data.compressed_data,
)
return core_pb2.Event(file_event=file_event)
class EventStreamer:
@ -168,32 +175,33 @@ class EventStreamer:
if core_pb2.EventType.SESSION in self.event_types:
self.session.event_handlers.append(self.queue.put)
def process(self) -> core_pb2.Event:
def process(self) -> Optional[core_pb2.Event]:
"""
Process the next event in the queue.
:return: grpc event, or None when invalid event or queue timeout
"""
event = core_pb2.Event(session_id=self.session.id)
event = None
try:
data = self.queue.get(timeout=1)
if isinstance(data, NodeData):
event.node_event.CopyFrom(handle_node_event(data))
event = handle_node_event(data)
elif isinstance(data, LinkData):
event.link_event.CopyFrom(handle_link_event(data))
event = handle_link_event(data)
elif isinstance(data, EventData):
event.session_event.CopyFrom(handle_session_event(data))
event = handle_session_event(data)
elif isinstance(data, ConfigData):
event.config_event.CopyFrom(handle_config_event(data))
event = handle_config_event(data)
elif isinstance(data, ExceptionData):
event.exception_event.CopyFrom(handle_exception_event(data))
event = handle_exception_event(data)
elif isinstance(data, FileData):
event.file_event.CopyFrom(handle_file_event(data))
event = handle_file_event(data)
else:
logging.error("unknown event: %s", data)
event = None
except Empty:
event = None
pass
if event:
event.session_id = self.session.id
return event
def remove_handlers(self) -> None:

View file

@ -435,6 +435,24 @@ def get_service_configuration(service: CoreService) -> NodeServiceData:
)
def iface_to_data(iface: CoreInterface) -> InterfaceData:
ip4 = iface.get_ip4()
ip4_addr = str(ip4.ip) if ip4 else None
ip4_mask = ip4.prefixlen if ip4 else None
ip6 = iface.get_ip6()
ip6_addr = str(ip6.ip) if ip6 else None
ip6_mask = ip6.prefixlen if ip6 else None
return InterfaceData(
id=iface.node_id,
name=iface.name,
mac=str(iface.mac),
ip4=ip4_addr,
ip4_mask=ip4_mask,
ip6=ip6_addr,
ip6_mask=ip6_mask,
)
def iface_to_proto(iface: CoreInterface) -> core_pb2.Interface:
"""
Convenience for converting a core interface to the protobuf representation.

View file

@ -108,7 +108,7 @@ from core.api.grpc.wlan_pb2 import (
WlanLinkResponse,
)
from core.emulator.coreemu import CoreEmu
from core.emulator.data import LinkData, LinkOptions, NodeOptions
from core.emulator.data import InterfaceData, LinkData, LinkOptions, NodeOptions
from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags
from core.emulator.session import NT, Session
from core.errors import CoreCommandError, CoreError
@ -853,6 +853,23 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
node1_iface, node2_iface = session.add_link(
node1_id, node2_id, iface1_data, iface2_data, options, link_type
)
iface1_data = None
if node1_iface:
iface1_data = grpcutils.iface_to_data(node1_iface)
iface2_data = None
if node2_iface:
iface2_data = grpcutils.iface_to_data(node2_iface)
source = request.source if request.source else None
link_data = LinkData(
message_type=MessageFlags.ADD,
node1_id=node1_id,
node2_id=node2_id,
iface1=iface1_data,
iface2=iface2_data,
options=options,
source=source,
)
session.broadcast_link(link_data)
iface1_proto = None
iface2_proto = None
if node1_iface:
@ -893,6 +910,19 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
key=options_proto.key,
)
session.update_link(node1_id, node2_id, iface1_id, iface2_id, options)
iface1 = InterfaceData(id=iface1_id)
iface2 = InterfaceData(id=iface2_id)
source = request.source if request.source else None
link_data = LinkData(
message_type=MessageFlags.NONE,
node1_id=node1_id,
node2_id=node2_id,
iface1=iface1,
iface2=iface2,
options=options,
source=source,
)
session.broadcast_link(link_data)
return core_pb2.EditLinkResponse(result=True)
def DeleteLink(
@ -912,6 +942,18 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
iface1_id = request.iface1_id
iface2_id = request.iface2_id
session.delete_link(node1_id, node2_id, iface1_id, iface2_id)
iface1 = InterfaceData(id=iface1_id)
iface2 = InterfaceData(id=iface2_id)
source = request.source if request.source else None
link_data = LinkData(
message_type=MessageFlags.DELETE,
node1_id=node1_id,
node2_id=node2_id,
iface1=iface1,
iface2=iface2,
source=source,
)
session.broadcast_link(link_data)
return core_pb2.DeleteLinkResponse(result=True)
def GetHooks(

View file

@ -190,6 +190,7 @@ class LinkData:
iface2: InterfaceData = None
options: LinkOptions = LinkOptions()
color: str = None
source: str = None
class IpPrefixes:

View file

@ -148,6 +148,8 @@ class CoreClient:
self.custom_observers[observer.name] = observer
def handle_events(self, event: Event) -> None:
if event.source == GUI_SOURCE:
return
if event.session_id != self.session_id:
logging.warning(
"ignoring event session(%s) current(%s)",
@ -193,19 +195,36 @@ class CoreClient:
return
canvas_node1 = self.canvas_nodes[node1_id]
canvas_node2 = self.canvas_nodes[node2_id]
if event.message_type == MessageType.ADD:
self.app.canvas.add_wireless_edge(canvas_node1, canvas_node2, event.link)
elif event.message_type == MessageType.DELETE:
self.app.canvas.delete_wireless_edge(canvas_node1, canvas_node2, event.link)
elif event.message_type == MessageType.NONE:
self.app.canvas.update_wireless_edge(canvas_node1, canvas_node2, event.link)
if event.link.type == LinkType.WIRELESS:
if event.message_type == MessageType.ADD:
self.app.canvas.add_wireless_edge(
canvas_node1, canvas_node2, event.link
)
elif event.message_type == MessageType.DELETE:
self.app.canvas.delete_wireless_edge(
canvas_node1, canvas_node2, event.link
)
elif event.message_type == MessageType.NONE:
self.app.canvas.update_wireless_edge(
canvas_node1, canvas_node2, event.link
)
else:
logging.warning("unknown link event: %s", event)
else:
logging.warning("unknown link event: %s", event)
if event.message_type == MessageType.ADD:
self.app.canvas.add_wired_edge(canvas_node1, canvas_node2, event.link)
self.app.canvas.organize()
elif event.message_type == MessageType.DELETE:
self.app.canvas.delete_wired_edge(canvas_node1, canvas_node2)
elif event.message_type == MessageType.NONE:
self.app.canvas.update_wired_edge(
canvas_node1, canvas_node2, event.link
)
else:
logging.warning("unknown link event: %s", event)
def handle_node_event(self, event: NodeEvent) -> None:
logging.debug("node event: %s", event)
if event.source == GUI_SOURCE:
return
node_id = event.node.id
x = event.node.position.x
y = event.node.position.y

View file

@ -225,6 +225,50 @@ class CanvasGraph(tk.Canvas):
self.tag_lower(tags.GRIDLINE)
self.tag_lower(self.rect)
def add_wired_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None:
token = create_edge_token(src.id, dst.id)
if token in self.edges and link.options.unidirectional:
edge = self.edges[token]
edge.asymmetric_link = link
elif token not in self.edges:
node1 = src.core_node
node2 = dst.core_node
src_pos = (node1.position.x, node1.position.y)
dst_pos = (node2.position.x, node2.position.y)
edge = CanvasEdge(self, src.id, src_pos, dst_pos)
edge.token = token
edge.dst = dst.id
edge.set_link(link)
edge.check_wireless()
src.edges.add(edge)
dst.edges.add(edge)
self.edges[edge.token] = edge
self.core.links[edge.token] = edge
if link.HasField("iface1"):
iface1 = link.iface1
self.core.iface_to_edge[(node1.id, iface1.id)] = token
src.ifaces[iface1.id] = iface1
edge.src_iface = iface1
if link.HasField("iface2"):
iface2 = link.iface2
self.core.iface_to_edge[(node2.id, iface2.id)] = edge.token
dst.ifaces[iface2.id] = iface2
edge.dst_iface = iface2
def delete_wired_edge(self, src: CanvasNode, dst: CanvasNode) -> None:
token = create_edge_token(src.id, dst.id)
edge = self.edges.get(token)
if not edge:
return
self.delete_edge(edge)
def update_wired_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None:
token = create_edge_token(src.id, dst.id)
edge = self.edges.get(token)
if not edge:
return
edge.link.options.CopyFrom(link.options)
def add_wireless_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None:
network_id = link.network_id if link.network_id else None
token = create_edge_token(src.id, dst.id, network_id)
@ -297,41 +341,11 @@ class CanvasGraph(tk.Canvas):
for link in session.links:
logging.debug("drawing link: %s", link)
canvas_node1 = self.core.canvas_nodes[link.node1_id]
node1 = canvas_node1.core_node
canvas_node2 = self.core.canvas_nodes[link.node2_id]
node2 = canvas_node2.core_node
token = create_edge_token(canvas_node1.id, canvas_node2.id)
if link.type == LinkType.WIRELESS:
self.add_wireless_edge(canvas_node1, canvas_node2, link)
else:
if token not in self.edges:
src_pos = (node1.position.x, node1.position.y)
dst_pos = (node2.position.x, node2.position.y)
edge = CanvasEdge(self, canvas_node1.id, src_pos, dst_pos)
edge.token = token
edge.dst = canvas_node2.id
edge.set_link(link)
edge.check_wireless()
canvas_node1.edges.add(edge)
canvas_node2.edges.add(edge)
self.edges[edge.token] = edge
self.core.links[edge.token] = edge
if link.HasField("iface1"):
iface1 = link.iface1
self.core.iface_to_edge[(node1.id, iface1.id)] = token
canvas_node1.ifaces[iface1.id] = iface1
edge.src_iface = iface1
if link.HasField("iface2"):
iface2 = link.iface2
self.core.iface_to_edge[(node2.id, iface2.id)] = edge.token
canvas_node2.ifaces[iface2.id] = iface2
edge.dst_iface = iface2
elif link.options.unidirectional:
edge = self.edges[token]
edge.asymmetric_link = link
else:
logging.error("duplicate link received: %s", link)
self.add_wired_edge(canvas_node1, canvas_node2, link)
def stopped_session(self) -> None:
# clear wireless edges
@ -343,9 +357,8 @@ class CanvasGraph(tk.Canvas):
dst_node.wireless_edges.remove(edge)
self.wireless_edges.clear()
# clear all middle edge labels
for edge in self.edges.values():
edge.reset()
# clear throughputs
self.clear_throughputs()
def canvas_xy(self, event: tk.Event) -> Tuple[float, float]:
"""

View file

@ -343,11 +343,11 @@ message Event {
FileEvent file_event = 6;
}
int32 session_id = 7;
string source = 8;
}
message NodeEvent {
Node node = 1;
string source = 2;
}
message LinkEvent {
@ -488,6 +488,7 @@ message GetNodeLinksResponse {
message AddLinkRequest {
int32 session_id = 1;
Link link = 2;
string source = 3;
}
message AddLinkResponse {
@ -503,6 +504,7 @@ message EditLinkRequest {
int32 iface1_id = 4;
int32 iface2_id = 5;
LinkOptions options = 6;
string source = 7;
}
message EditLinkResponse {
@ -515,6 +517,7 @@ message DeleteLinkRequest {
int32 node2_id = 3;
int32 iface1_id = 4;
int32 iface2_id = 5;
string source = 6;
}
message DeleteLinkResponse {