Merge branch 'coretk' into coretk-color

This commit is contained in:
Huy Pham 2019-12-16 11:43:53 -08:00
commit 3ceb5cde3e
6 changed files with 58 additions and 24 deletions

View file

@ -84,7 +84,8 @@ class CoreClient:
self.service_configs = {} self.service_configs = {}
self.file_configs = {} self.file_configs = {}
self.mobility_players = {} self.mobility_players = {}
self.throughput = False self.handling_throughputs = None
self.handling_events = None
def reset(self): def reset(self):
# helpers # helpers
@ -101,6 +102,13 @@ class CoreClient:
self.service_configs.clear() self.service_configs.clear()
self.file_configs.clear() self.file_configs.clear()
self.mobility_players.clear() self.mobility_players.clear()
# clear streams
if self.handling_throughputs:
self.handling_throughputs.cancel()
self.handling_throughputs = None
if self.handling_events:
self.handling_events.cancel()
self.handling_events = None
def set_observer(self, value): def set_observer(self, value):
self.observer = value self.observer = value
@ -176,11 +184,22 @@ class CoreClient:
canvas_node = self.canvas_nodes[node_id] canvas_node = self.canvas_nodes[node_id]
canvas_node.move(x, y) canvas_node.move(x, y)
def enable_throughputs(self):
self.handling_throughputs = self.client.throughputs(
self.session_id, self.handle_throughputs
)
def cancel_throughputs(self):
self.handling_throughputs.cancel()
self.handling_throughputs = None
def handle_throughputs(self, event): def handle_throughputs(self, event):
if self.throughput: if event.session_id != self.session_id:
self.app.canvas.throughput_draw.process_grpc_throughput_event( return
event.interface_throughputs logging.info("handling throughputs event: %s", event)
) self.app.canvas.throughput_draw.process_grpc_throughput_event(
event.interface_throughputs
)
def handle_exception_event(self, event): def handle_exception_event(self, event):
logging.info("exception event: %s", event) logging.info("exception event: %s", event)
@ -199,8 +218,9 @@ class CoreClient:
response = self.client.get_session(self.session_id) response = self.client.get_session(self.session_id)
session = response.session session = response.session
self.state = session.state self.state = session.state
self.client.events(self.session_id, self.handle_events) self.handling_events = self.client.events(
self.client.throughputs(self.handle_throughputs) self.session_id, self.handle_events
)
# get location # get location
if query_location: if query_location:

View file

@ -151,8 +151,7 @@ class MenuAction:
dialog.show() dialog.show()
def throughput(self): def throughput(self):
throughput = self.app.core.throughput if not self.app.core.handling_throughputs:
if throughput: self.app.core.enable_throughputs()
self.app.core.throughput = False
else: else:
self.app.core.throughput = True self.app.core.cancel_throughputs()

View file

@ -385,23 +385,27 @@ class CoreGrpcClient:
:param int session_id: id of session :param int session_id: id of session
:param handler: handler for received events :param handler: handler for received events
:param list events: events to listen to, defaults to all :param list events: events to listen to, defaults to all
:return: nothing :return: stream processing events, can be used to cancel stream
:raises grpc.RpcError: when session doesn't exist :raises grpc.RpcError: when session doesn't exist
""" """
request = core_pb2.EventsRequest(session_id=session_id, events=events) request = core_pb2.EventsRequest(session_id=session_id, events=events)
stream = self.stub.Events(request) stream = self.stub.Events(request)
start_streamer(stream, handler) start_streamer(stream, handler)
return stream
def throughputs(self, handler): def throughputs(self, session_id, handler):
""" """
Listen for throughput events with information for interfaces and bridges. Listen for throughput events with information for interfaces and bridges.
:param int session_id: session id
:param handler: handler for every event :param handler: handler for every event
:return: nothing :return: stream processing events, can be used to cancel stream
:raises grpc.RpcError: when session doesn't exist
""" """
request = core_pb2.ThroughputsRequest() request = core_pb2.ThroughputsRequest(session_id=session_id)
stream = self.stub.Throughputs(request) stream = self.stub.Throughputs(request)
start_streamer(stream, handler) start_streamer(stream, handler)
return stream
def add_node(self, session_id, node): def add_node(self, session_id, node):
""" """

View file

@ -27,7 +27,7 @@ from core.nodes.lxd import LxcNode
from core.services.coreservices import ServiceManager from core.services.coreservices import ServiceManager
_ONE_DAY_IN_SECONDS = 60 * 60 * 24 _ONE_DAY_IN_SECONDS = 60 * 60 * 24
_INTERFACE_REGEX = re.compile(r"\d+") _INTERFACE_REGEX = re.compile(r"[0-9a-fA-F]+")
class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
@ -452,9 +452,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
:param grpc.SrevicerContext context: context object :param grpc.SrevicerContext context: context object
:return: nothing :return: nothing
""" """
session = self.get_session(request.session_id, context)
delay = 3 delay = 3
last_check = None last_check = None
last_stats = None last_stats = None
while self._is_running(context): while self._is_running(context):
now = time.monotonic() now = time.monotonic()
stats = get_net_stats() stats = get_net_stats()
@ -462,7 +464,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
# calculate average # calculate average
if last_check is not None: if last_check is not None:
interval = now - last_check interval = now - last_check
throughputs_event = core_pb2.ThroughputsEvent() throughputs_event = core_pb2.ThroughputsEvent(session_id=session.id)
for key in stats: for key in stats:
current_rxtx = stats[key] current_rxtx = stats[key]
previous_rxtx = last_stats.get(key) previous_rxtx = last_stats.get(key)
@ -477,8 +479,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
throughput = rx_kbps + tx_kbps throughput = rx_kbps + tx_kbps
if key.startswith("veth"): if key.startswith("veth"):
key = key.split(".") key = key.split(".")
node_id = int(_INTERFACE_REGEX.search(key[0]).group()) node_id = int(_INTERFACE_REGEX.search(key[0]).group(), base=16)
interface_id = int(key[1]) interface_id = int(key[1], base=16)
session_id = int(key[2], base=16)
if session.id != session_id:
continue
interface_throughput = ( interface_throughput = (
throughputs_event.interface_throughputs.add() throughputs_event.interface_throughputs.add()
) )
@ -487,7 +492,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
interface_throughput.throughput = throughput interface_throughput.throughput = throughput
elif key.startswith("b."): elif key.startswith("b."):
try: try:
node_id = int(key.split(".")[1]) key = key.split(".")
node_id = int(key[1], base=16)
session_id = int(key[2], base=16)
if session.id != session_id:
continue
bridge_throughput = ( bridge_throughput = (
throughputs_event.bridge_throughputs.add() throughputs_event.bridge_throughputs.add()
) )

View file

@ -272,11 +272,13 @@ message EventsRequest {
} }
message ThroughputsRequest { message ThroughputsRequest {
int32 session_id = 1;
} }
message ThroughputsEvent { message ThroughputsEvent {
repeated BridgeThroughput bridge_throughputs = 1; int32 session_id = 1;
repeated InterfaceThroughput interface_throughputs = 2; repeated BridgeThroughput bridge_throughputs = 2;
repeated InterfaceThroughput interface_throughputs = 3;
} }
message InterfaceThroughput { message InterfaceThroughput {

View file

@ -1032,7 +1032,7 @@ class TestGrpc:
# given # given
client = CoreGrpcClient() client = CoreGrpcClient()
grpc_server.coreemu.create_session() session = grpc_server.coreemu.create_session()
queue = Queue() queue = Queue()
def handle_event(event_data): def handle_event(event_data):
@ -1040,7 +1040,7 @@ class TestGrpc:
# then # then
with client.context_connect(): with client.context_connect():
client.throughputs(handle_event) client.throughputs(session.id, handle_event)
time.sleep(0.1) time.sleep(0.1)
# then # then