grpc/pygui: shifted source field in node events to base event message to apply to all events, updated add_link/delete_link rpc calls to broadcast events, updated pygui to handle these events

This commit is contained in:
Blake Harnden 2020-06-26 14:39:12 -07:00
parent eac941ce72
commit aebbff8c22
8 changed files with 185 additions and 105 deletions

View file

@ -1,6 +1,6 @@
import logging
from queue import Empty, Queue
from typing import Iterable
from typing import Iterable, Optional
from core.api.grpc import core_pb2
from core.api.grpc.grpcutils import convert_link
@ -15,7 +15,7 @@ from core.emulator.data import (
from core.emulator.session import Session
def handle_node_event(node_data: NodeData) -> core_pb2.NodeEvent:
def handle_node_event(node_data: NodeData) -> core_pb2.Event:
"""
Handle node event when there is a node event
@ -36,98 +36,105 @@ def handle_node_event(node_data: NodeData) -> core_pb2.NodeEvent:
geo=geo,
services=services,
)
return core_pb2.NodeEvent(node=node_proto, source=node_data.source)
node_event = core_pb2.NodeEvent(node=node_proto)
return core_pb2.Event(node_event=node_event, source=node_data.source)
def handle_link_event(event: LinkData) -> core_pb2.LinkEvent:
def handle_link_event(link_data: LinkData) -> core_pb2.Event:
"""
Handle link event when there is a link event
:param event: link data
:param link_data: link data
:return: link event that has message type and link information
"""
link = convert_link(event)
return core_pb2.LinkEvent(message_type=event.message_type.value, link=link)
link = convert_link(link_data)
message_type = link_data.message_type.value
link_event = core_pb2.LinkEvent(message_type=message_type, link=link)
return core_pb2.Event(link_event=link_event, source=link_data.source)
def handle_session_event(event: EventData) -> core_pb2.SessionEvent:
def handle_session_event(event_data: EventData) -> core_pb2.Event:
"""
Handle session event when there is a session event
:param event: event data
:param event_data: event data
:return: session event
"""
event_time = event.time
event_time = event_data.time
if event_time is not None:
event_time = float(event_time)
return core_pb2.SessionEvent(
node_id=event.node,
event=event.event_type.value,
name=event.name,
data=event.data,
session_event = core_pb2.SessionEvent(
node_id=event_data.node,
event=event_data.event_type.value,
name=event_data.name,
data=event_data.data,
time=event_time,
)
return core_pb2.Event(session_event=session_event)
def handle_config_event(event: ConfigData) -> core_pb2.ConfigEvent:
def handle_config_event(config_data: ConfigData) -> core_pb2.Event:
"""
Handle configuration event when there is configuration event
:param event: configuration data
:param config_data: configuration data
:return: configuration event
"""
return core_pb2.ConfigEvent(
message_type=event.message_type,
node_id=event.node,
object=event.object,
type=event.type,
captions=event.captions,
bitmap=event.bitmap,
data_values=event.data_values,
possible_values=event.possible_values,
groups=event.groups,
iface_id=event.iface_id,
network_id=event.network_id,
opaque=event.opaque,
data_types=event.data_types,
config_event = core_pb2.ConfigEvent(
message_type=config_data.message_type,
node_id=config_data.node,
object=config_data.object,
type=config_data.type,
captions=config_data.captions,
bitmap=config_data.bitmap,
data_values=config_data.data_values,
possible_values=config_data.possible_values,
groups=config_data.groups,
iface_id=config_data.iface_id,
network_id=config_data.network_id,
opaque=config_data.opaque,
data_types=config_data.data_types,
)
return core_pb2.Event(config_event=config_event)
def handle_exception_event(event: ExceptionData) -> core_pb2.ExceptionEvent:
def handle_exception_event(exception_data: ExceptionData) -> core_pb2.Event:
"""
Handle exception event when there is exception event
:param event: exception data
:param exception_data: exception data
:return: exception event
"""
return core_pb2.ExceptionEvent(
node_id=event.node,
level=event.level.value,
source=event.source,
date=event.date,
text=event.text,
opaque=event.opaque,
exception_event = core_pb2.ExceptionEvent(
node_id=exception_data.node,
level=exception_data.level.value,
source=exception_data.source,
date=exception_data.date,
text=exception_data.text,
opaque=exception_data.opaque,
)
return core_pb2.Event(exception_event=exception_event)
def handle_file_event(event: FileData) -> core_pb2.FileEvent:
def handle_file_event(file_data: FileData) -> core_pb2.Event:
"""
Handle file event
:param event: file data
:param file_data: file data
:return: file event
"""
return core_pb2.FileEvent(
message_type=event.message_type.value,
node_id=event.node,
name=event.name,
mode=event.mode,
number=event.number,
type=event.type,
source=event.source,
data=event.data,
compressed_data=event.compressed_data,
file_event = core_pb2.FileEvent(
message_type=file_data.message_type.value,
node_id=file_data.node,
name=file_data.name,
mode=file_data.mode,
number=file_data.number,
type=file_data.type,
source=file_data.source,
data=file_data.data,
compressed_data=file_data.compressed_data,
)
return core_pb2.Event(file_event=file_event)
class EventStreamer:
@ -168,32 +175,33 @@ class EventStreamer:
if core_pb2.EventType.SESSION in self.event_types:
self.session.event_handlers.append(self.queue.put)
def process(self) -> core_pb2.Event:
def process(self) -> Optional[core_pb2.Event]:
"""
Process the next event in the queue.
:return: grpc event, or None when invalid event or queue timeout
"""
event = core_pb2.Event(session_id=self.session.id)
event = None
try:
data = self.queue.get(timeout=1)
if isinstance(data, NodeData):
event.node_event.CopyFrom(handle_node_event(data))
event = handle_node_event(data)
elif isinstance(data, LinkData):
event.link_event.CopyFrom(handle_link_event(data))
event = handle_link_event(data)
elif isinstance(data, EventData):
event.session_event.CopyFrom(handle_session_event(data))
event = handle_session_event(data)
elif isinstance(data, ConfigData):
event.config_event.CopyFrom(handle_config_event(data))
event = handle_config_event(data)
elif isinstance(data, ExceptionData):
event.exception_event.CopyFrom(handle_exception_event(data))
event = handle_exception_event(data)
elif isinstance(data, FileData):
event.file_event.CopyFrom(handle_file_event(data))
event = handle_file_event(data)
else:
logging.error("unknown event: %s", data)
event = None
except Empty:
event = None
pass
if event:
event.session_id = self.session.id
return event
def remove_handlers(self) -> None:

View file

@ -435,6 +435,24 @@ def get_service_configuration(service: CoreService) -> NodeServiceData:
)
def iface_to_data(iface: CoreInterface) -> InterfaceData:
ip4 = iface.get_ip4()
ip4_addr = str(ip4.ip) if ip4 else None
ip4_mask = ip4.prefixlen if ip4 else None
ip6 = iface.get_ip6()
ip6_addr = str(ip6.ip) if ip6 else None
ip6_mask = ip6.prefixlen if ip6 else None
return InterfaceData(
id=iface.node_id,
name=iface.name,
mac=str(iface.mac),
ip4=ip4_addr,
ip4_mask=ip4_mask,
ip6=ip6_addr,
ip6_mask=ip6_mask,
)
def iface_to_proto(iface: CoreInterface) -> core_pb2.Interface:
"""
Convenience for converting a core interface to the protobuf representation.

View file

@ -108,7 +108,7 @@ from core.api.grpc.wlan_pb2 import (
WlanLinkResponse,
)
from core.emulator.coreemu import CoreEmu
from core.emulator.data import LinkData, LinkOptions, NodeOptions
from core.emulator.data import InterfaceData, LinkData, LinkOptions, NodeOptions
from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags
from core.emulator.session import NT, Session
from core.errors import CoreCommandError, CoreError
@ -853,6 +853,22 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
node1_iface, node2_iface = session.add_link(
node1_id, node2_id, iface1_data, iface2_data, options, link_type
)
iface1_data = None
if node1_iface:
iface1_data = grpcutils.iface_to_data(node1_iface)
iface2_data = None
if node2_iface:
iface2_data = grpcutils.iface_to_data(node2_iface)
source = request.source if request.source else None
link_data = LinkData(
message_type=MessageFlags.ADD,
node1_id=node1_id,
node2_id=node2_id,
iface1=iface1_data,
iface2=iface2_data,
source=source,
)
session.broadcast_link(link_data)
iface1_proto = None
iface2_proto = None
if node1_iface:
@ -912,6 +928,18 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
iface1_id = request.iface1_id
iface2_id = request.iface2_id
session.delete_link(node1_id, node2_id, iface1_id, iface2_id)
iface1 = InterfaceData(id=iface1_id)
iface2 = InterfaceData(id=iface2_id)
source = request.source if request.source else None
link_data = LinkData(
message_type=MessageFlags.DELETE,
node1_id=node1_id,
node2_id=node2_id,
iface1=iface1,
iface2=iface2,
source=source,
)
session.broadcast_link(link_data)
return core_pb2.DeleteLinkResponse(result=True)
def GetHooks(

View file

@ -190,6 +190,7 @@ class LinkData:
iface2: InterfaceData = None
options: LinkOptions = LinkOptions()
color: str = None
source: str = None
class IpPrefixes:

View file

@ -833,11 +833,12 @@ class Session:
for handler in self.config_handlers:
handler(config_data)
def broadcast_link(self, link_data: LinkData) -> None:
def broadcast_link(self, link_data: LinkData, source: str = None) -> None:
"""
Handle link data that should be provided to link handlers.
:param link_data: link data to send out
:param source: source of broadcast, None by default
:return: nothing
"""
for handler in self.link_handlers:

View file

@ -148,6 +148,8 @@ class CoreClient:
self.custom_observers[observer.name] = observer
def handle_events(self, event: Event) -> None:
if event.source == GUI_SOURCE:
return
if event.session_id != self.session_id:
logging.warning(
"ignoring event session(%s) current(%s)",
@ -193,19 +195,32 @@ class CoreClient:
return
canvas_node1 = self.canvas_nodes[node1_id]
canvas_node2 = self.canvas_nodes[node2_id]
if event.message_type == MessageType.ADD:
self.app.canvas.add_wireless_edge(canvas_node1, canvas_node2, event.link)
elif event.message_type == MessageType.DELETE:
self.app.canvas.delete_wireless_edge(canvas_node1, canvas_node2, event.link)
elif event.message_type == MessageType.NONE:
self.app.canvas.update_wireless_edge(canvas_node1, canvas_node2, event.link)
if event.link.type == LinkType.WIRELESS:
if event.message_type == MessageType.ADD:
self.app.canvas.add_wireless_edge(
canvas_node1, canvas_node2, event.link
)
elif event.message_type == MessageType.DELETE:
self.app.canvas.delete_wireless_edge(
canvas_node1, canvas_node2, event.link
)
elif event.message_type == MessageType.NONE:
self.app.canvas.update_wireless_edge(
canvas_node1, canvas_node2, event.link
)
else:
logging.warning("unknown link event: %s", event)
else:
logging.warning("unknown link event: %s", event)
if event.message_type == MessageType.ADD:
self.app.canvas.add_wired_edge(canvas_node1, canvas_node2, event.link)
self.app.canvas.organize()
elif event.message_type == MessageType.DELETE:
self.app.canvas.delete_wired_edge(canvas_node1, canvas_node2)
else:
logging.warning("unknown link event: %s", event)
def handle_node_event(self, event: NodeEvent) -> None:
logging.debug("node event: %s", event)
if event.source == GUI_SOURCE:
return
node_id = event.node.id
x = event.node.position.x
y = event.node.position.y

View file

@ -225,6 +225,43 @@ class CanvasGraph(tk.Canvas):
self.tag_lower(tags.GRIDLINE)
self.tag_lower(self.rect)
def add_wired_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None:
token = create_edge_token(src.id, dst.id)
if token in self.edges and link.options.unidirectional:
edge = self.edges[token]
edge.asymmetric_link = link
elif token not in self.edges:
node1 = src.core_node
node2 = dst.core_node
src_pos = (node1.position.x, node1.position.y)
dst_pos = (node2.position.x, node2.position.y)
edge = CanvasEdge(self, src.id, src_pos, dst_pos)
edge.token = token
edge.dst = dst.id
edge.set_link(link)
edge.check_wireless()
src.edges.add(edge)
dst.edges.add(edge)
self.edges[edge.token] = edge
self.core.links[edge.token] = edge
if link.HasField("iface1"):
iface1 = link.iface1
self.core.iface_to_edge[(node1.id, iface1.id)] = token
src.ifaces[iface1.id] = iface1
edge.src_iface = iface1
if link.HasField("iface2"):
iface2 = link.iface2
self.core.iface_to_edge[(node2.id, iface2.id)] = edge.token
dst.ifaces[iface2.id] = iface2
edge.dst_iface = iface2
def delete_wired_edge(self, src: CanvasNode, dst: CanvasNode) -> None:
token = create_edge_token(src.id, dst.id)
edge = self.edges.get(token)
if not edge:
return
self.delete_edge(edge)
def add_wireless_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None:
network_id = link.network_id if link.network_id else None
token = create_edge_token(src.id, dst.id, network_id)
@ -297,41 +334,11 @@ class CanvasGraph(tk.Canvas):
for link in session.links:
logging.debug("drawing link: %s", link)
canvas_node1 = self.core.canvas_nodes[link.node1_id]
node1 = canvas_node1.core_node
canvas_node2 = self.core.canvas_nodes[link.node2_id]
node2 = canvas_node2.core_node
token = create_edge_token(canvas_node1.id, canvas_node2.id)
if link.type == LinkType.WIRELESS:
self.add_wireless_edge(canvas_node1, canvas_node2, link)
else:
if token not in self.edges:
src_pos = (node1.position.x, node1.position.y)
dst_pos = (node2.position.x, node2.position.y)
edge = CanvasEdge(self, canvas_node1.id, src_pos, dst_pos)
edge.token = token
edge.dst = canvas_node2.id
edge.set_link(link)
edge.check_wireless()
canvas_node1.edges.add(edge)
canvas_node2.edges.add(edge)
self.edges[edge.token] = edge
self.core.links[edge.token] = edge
if link.HasField("iface1"):
iface1 = link.iface1
self.core.iface_to_edge[(node1.id, iface1.id)] = token
canvas_node1.ifaces[iface1.id] = iface1
edge.src_iface = iface1
if link.HasField("iface2"):
iface2 = link.iface2
self.core.iface_to_edge[(node2.id, iface2.id)] = edge.token
canvas_node2.ifaces[iface2.id] = iface2
edge.dst_iface = iface2
elif link.options.unidirectional:
edge = self.edges[token]
edge.asymmetric_link = link
else:
logging.error("duplicate link received: %s", link)
self.add_wired_edge(canvas_node1, canvas_node2, link)
def stopped_session(self) -> None:
# clear wireless edges

View file

@ -343,11 +343,11 @@ message Event {
FileEvent file_event = 6;
}
int32 session_id = 7;
string source = 8;
}
message NodeEvent {
Node node = 1;
string source = 2;
}
message LinkEvent {
@ -488,6 +488,7 @@ message GetNodeLinksResponse {
message AddLinkRequest {
int32 session_id = 1;
Link link = 2;
string source = 3;
}
message AddLinkResponse {
@ -515,6 +516,7 @@ message DeleteLinkRequest {
int32 node2_id = 3;
int32 iface1_id = 4;
int32 iface2_id = 5;
string source = 6;
}
message DeleteLinkResponse {