updated events streamed from sessions to include session id for easy identification
This commit is contained in:
parent
d248bc09b5
commit
44df926fb9
4 changed files with 35 additions and 26 deletions
|
@ -20,6 +20,7 @@ from coretk.graph.shapeutils import ShapeType
|
||||||
from coretk.interface import InterfaceManager
|
from coretk.interface import InterfaceManager
|
||||||
from coretk.nodeutils import NodeDraw, NodeUtils
|
from coretk.nodeutils import NodeDraw, NodeUtils
|
||||||
|
|
||||||
|
GUI_SOURCE = "gui"
|
||||||
OBSERVERS = {
|
OBSERVERS = {
|
||||||
"processes": "ps",
|
"processes": "ps",
|
||||||
"ifconfig": "ifconfig",
|
"ifconfig": "ifconfig",
|
||||||
|
@ -133,6 +134,14 @@ class CoreClient:
|
||||||
self.custom_observers[observer.name] = observer
|
self.custom_observers[observer.name] = observer
|
||||||
|
|
||||||
def handle_events(self, event):
|
def handle_events(self, event):
|
||||||
|
if event.session_id != self.session_id:
|
||||||
|
logging.warn(
|
||||||
|
"ignoring event session(%s) current(%s)",
|
||||||
|
event.session_id,
|
||||||
|
self.session_id,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
if event.HasField("link_event"):
|
if event.HasField("link_event"):
|
||||||
logging.info("link event: %s", event)
|
logging.info("link event: %s", event)
|
||||||
self.handle_link_event(event.link_event)
|
self.handle_link_event(event.link_event)
|
||||||
|
@ -176,7 +185,7 @@ class CoreClient:
|
||||||
logging.warning("unknown link event: %s", event.message_type)
|
logging.warning("unknown link event: %s", event.message_type)
|
||||||
|
|
||||||
def handle_node_event(self, event):
|
def handle_node_event(self, event):
|
||||||
if event.source == "gui":
|
if event.source == GUI_SOURCE:
|
||||||
return
|
return
|
||||||
node_id = event.node.id
|
node_id = event.node.id
|
||||||
x = event.node.position.x
|
x = event.node.position.x
|
||||||
|
@ -195,6 +204,11 @@ class CoreClient:
|
||||||
|
|
||||||
def handle_throughputs(self, event):
|
def handle_throughputs(self, event):
|
||||||
if event.session_id != self.session_id:
|
if event.session_id != self.session_id:
|
||||||
|
logging.warn(
|
||||||
|
"ignoring throughput event session(%s) current(%s)",
|
||||||
|
event.session_id,
|
||||||
|
self.session_id,
|
||||||
|
)
|
||||||
return
|
return
|
||||||
logging.info("handling throughputs event: %s", event)
|
logging.info("handling throughputs event: %s", event)
|
||||||
self.app.canvas.throughput_draw.process_grpc_throughput_event(
|
self.app.canvas.throughput_draw.process_grpc_throughput_event(
|
||||||
|
@ -424,7 +438,7 @@ class CoreClient:
|
||||||
def edit_node(self, core_node):
|
def edit_node(self, core_node):
|
||||||
try:
|
try:
|
||||||
self.client.edit_node(
|
self.client.edit_node(
|
||||||
self.session_id, core_node.id, core_node.position, source="gui"
|
self.session_id, core_node.id, core_node.position, source=GUI_SOURCE
|
||||||
)
|
)
|
||||||
except grpc.RpcError as e:
|
except grpc.RpcError as e:
|
||||||
show_grpc_error(e)
|
show_grpc_error(e)
|
||||||
|
|
|
@ -107,7 +107,6 @@ def handle_session_event(event):
|
||||||
name=event.name,
|
name=event.name,
|
||||||
data=event.data,
|
data=event.data,
|
||||||
time=event_time,
|
time=event_time,
|
||||||
session_id=event.session,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -119,9 +118,6 @@ def handle_config_event(event):
|
||||||
:return: configuration event
|
:return: configuration event
|
||||||
:rtype: core.api.grpc.core_pb2.ConfigEvent
|
:rtype: core.api.grpc.core_pb2.ConfigEvent
|
||||||
"""
|
"""
|
||||||
session_id = None
|
|
||||||
if event.session is not None:
|
|
||||||
session_id = int(event.session)
|
|
||||||
return core_pb2.ConfigEvent(
|
return core_pb2.ConfigEvent(
|
||||||
message_type=event.message_type,
|
message_type=event.message_type,
|
||||||
node_id=event.node,
|
node_id=event.node,
|
||||||
|
@ -132,7 +128,6 @@ def handle_config_event(event):
|
||||||
data_values=event.data_values,
|
data_values=event.data_values,
|
||||||
possible_values=event.possible_values,
|
possible_values=event.possible_values,
|
||||||
groups=event.groups,
|
groups=event.groups,
|
||||||
session_id=session_id,
|
|
||||||
interface=event.interface_number,
|
interface=event.interface_number,
|
||||||
network_id=event.network_id,
|
network_id=event.network_id,
|
||||||
opaque=event.opaque,
|
opaque=event.opaque,
|
||||||
|
@ -150,7 +145,6 @@ def handle_exception_event(event):
|
||||||
"""
|
"""
|
||||||
return core_pb2.ExceptionEvent(
|
return core_pb2.ExceptionEvent(
|
||||||
node_id=event.node,
|
node_id=event.node,
|
||||||
session_id=int(event.session),
|
|
||||||
level=event.level,
|
level=event.level,
|
||||||
source=event.source,
|
source=event.source,
|
||||||
date=event.date,
|
date=event.date,
|
||||||
|
@ -175,7 +169,6 @@ def handle_file_event(event):
|
||||||
number=event.number,
|
number=event.number,
|
||||||
type=event.type,
|
type=event.type,
|
||||||
source=event.source,
|
source=event.source,
|
||||||
session_id=event.session,
|
|
||||||
data=event.data,
|
data=event.data,
|
||||||
compressed_data=event.compressed_data,
|
compressed_data=event.compressed_data,
|
||||||
)
|
)
|
||||||
|
@ -224,7 +217,7 @@ class EventStreamer:
|
||||||
:return: grpc event, or None when invalid event or queue timeout
|
:return: grpc event, or None when invalid event or queue timeout
|
||||||
:rtype: core.api.grpc.core_pb2.Event
|
:rtype: core.api.grpc.core_pb2.Event
|
||||||
"""
|
"""
|
||||||
event = core_pb2.Event()
|
event = core_pb2.Event(session_id=self.session.id)
|
||||||
try:
|
try:
|
||||||
data = self.queue.get(timeout=1)
|
data = self.queue.get(timeout=1)
|
||||||
if isinstance(data, NodeData):
|
if isinstance(data, NodeData):
|
||||||
|
@ -235,8 +228,6 @@ class EventStreamer:
|
||||||
event.session_event.CopyFrom(handle_session_event(data))
|
event.session_event.CopyFrom(handle_session_event(data))
|
||||||
elif isinstance(data, ConfigData):
|
elif isinstance(data, ConfigData):
|
||||||
event.config_event.CopyFrom(handle_config_event(data))
|
event.config_event.CopyFrom(handle_config_event(data))
|
||||||
# TODO: remove when config events are fixed
|
|
||||||
event.config_event.session_id = self.session.id
|
|
||||||
elif isinstance(data, ExceptionData):
|
elif isinstance(data, ExceptionData):
|
||||||
event.exception_event.CopyFrom(handle_exception_event(data))
|
event.exception_event.CopyFrom(handle_exception_event(data))
|
||||||
elif isinstance(data, FileData):
|
elif isinstance(data, FileData):
|
||||||
|
|
|
@ -301,6 +301,7 @@ message Event {
|
||||||
ExceptionEvent exception_event = 5;
|
ExceptionEvent exception_event = 5;
|
||||||
FileEvent file_event = 6;
|
FileEvent file_event = 6;
|
||||||
}
|
}
|
||||||
|
int32 session_id = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeEvent {
|
message NodeEvent {
|
||||||
|
@ -319,7 +320,6 @@ message SessionEvent {
|
||||||
string name = 3;
|
string name = 3;
|
||||||
string data = 4;
|
string data = 4;
|
||||||
float time = 5;
|
float time = 5;
|
||||||
int32 session_id = 6;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message ConfigEvent {
|
message ConfigEvent {
|
||||||
|
@ -333,20 +333,18 @@ message ConfigEvent {
|
||||||
string bitmap = 8;
|
string bitmap = 8;
|
||||||
string possible_values = 9;
|
string possible_values = 9;
|
||||||
string groups = 10;
|
string groups = 10;
|
||||||
int32 session_id = 11;
|
int32 interface = 11;
|
||||||
int32 interface = 12;
|
int32 network_id = 12;
|
||||||
int32 network_id = 13;
|
string opaque = 13;
|
||||||
string opaque = 14;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message ExceptionEvent {
|
message ExceptionEvent {
|
||||||
int32 node_id = 1;
|
int32 node_id = 1;
|
||||||
int32 session_id = 2;
|
ExceptionLevel.Enum level = 2;
|
||||||
ExceptionLevel.Enum level = 3;
|
string source = 3;
|
||||||
string source = 4;
|
string date = 4;
|
||||||
string date = 5;
|
string text = 5;
|
||||||
string text = 6;
|
string opaque = 6;
|
||||||
string opaque = 7;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message FileEvent {
|
message FileEvent {
|
||||||
|
@ -357,9 +355,8 @@ message FileEvent {
|
||||||
int32 number = 5;
|
int32 number = 5;
|
||||||
string type = 6;
|
string type = 6;
|
||||||
string source = 7;
|
string source = 7;
|
||||||
int32 session_id = 8;
|
string data = 8;
|
||||||
string data = 9;
|
string compressed_data = 9;
|
||||||
string compressed_data = 10;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message AddNodeRequest {
|
message AddNodeRequest {
|
||||||
|
|
|
@ -990,6 +990,7 @@ class TestGrpc:
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
def handle_event(event_data):
|
def handle_event(event_data):
|
||||||
|
assert event_data.session_id == session.id
|
||||||
assert event_data.HasField("node_event")
|
assert event_data.HasField("node_event")
|
||||||
queue.put(event_data)
|
queue.put(event_data)
|
||||||
|
|
||||||
|
@ -1014,6 +1015,7 @@ class TestGrpc:
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
def handle_event(event_data):
|
def handle_event(event_data):
|
||||||
|
assert event_data.session_id == session.id
|
||||||
assert event_data.HasField("link_event")
|
assert event_data.HasField("link_event")
|
||||||
queue.put(event_data)
|
queue.put(event_data)
|
||||||
|
|
||||||
|
@ -1036,6 +1038,7 @@ class TestGrpc:
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
def handle_event(event_data):
|
def handle_event(event_data):
|
||||||
|
assert event_data.session_id == session.id
|
||||||
queue.put(event_data)
|
queue.put(event_data)
|
||||||
|
|
||||||
# then
|
# then
|
||||||
|
@ -1053,6 +1056,7 @@ class TestGrpc:
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
def handle_event(event_data):
|
def handle_event(event_data):
|
||||||
|
assert event_data.session_id == session.id
|
||||||
assert event_data.HasField("session_event")
|
assert event_data.HasField("session_event")
|
||||||
queue.put(event_data)
|
queue.put(event_data)
|
||||||
|
|
||||||
|
@ -1075,6 +1079,7 @@ class TestGrpc:
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
def handle_event(event_data):
|
def handle_event(event_data):
|
||||||
|
assert event_data.session_id == session.id
|
||||||
assert event_data.HasField("config_event")
|
assert event_data.HasField("config_event")
|
||||||
queue.put(event_data)
|
queue.put(event_data)
|
||||||
|
|
||||||
|
@ -1098,6 +1103,7 @@ class TestGrpc:
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
def handle_event(event_data):
|
def handle_event(event_data):
|
||||||
|
assert event_data.session_id == session.id
|
||||||
assert event_data.HasField("exception_event")
|
assert event_data.HasField("exception_event")
|
||||||
queue.put(event_data)
|
queue.put(event_data)
|
||||||
|
|
||||||
|
@ -1120,6 +1126,7 @@ class TestGrpc:
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|
||||||
def handle_event(event_data):
|
def handle_event(event_data):
|
||||||
|
assert event_data.session_id == session.id
|
||||||
assert event_data.HasField("file_event")
|
assert event_data.HasField("file_event")
|
||||||
queue.put(event_data)
|
queue.put(event_data)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue