updated grpc throughputs to only check a specific session and verify the data being collected and sent is for that session, fixed data from throughputs being in hex getting converted to int, updated coretk to only run throughputs when enabled, updated grpc streams to return the stream to allow it being canceled

This commit is contained in:
Blake Harnden 2019-12-16 11:14:05 -08:00
parent 85521e8c8f
commit 424f69bb15
6 changed files with 47 additions and 23 deletions

View file

@ -84,7 +84,7 @@ class CoreClient:
self.service_configs = {}
self.file_configs = {}
self.mobility_players = {}
self.throughput = False
self.handling_throughputs = None
def reset(self):
# helpers
@ -176,8 +176,19 @@ class CoreClient:
canvas_node = self.canvas_nodes[node_id]
canvas_node.move(x, y)
def enable_throughputs(self):
self.handling_throughputs = self.client.throughputs(
self.session_id, self.handle_throughputs
)
def cancel_throughputs(self):
self.handling_throughputs.cancel()
self.handling_throughputs = None
def handle_throughputs(self, event):
if self.throughput:
if event.session_id != self.session_id:
return
logging.info("handling throughputs event: %s", event)
self.app.canvas.throughput_draw.process_grpc_throughput_event(
event.interface_throughputs
)
@ -200,7 +211,6 @@ class CoreClient:
session = response.session
self.state = session.state
self.client.events(self.session_id, self.handle_events)
self.client.throughputs(self.handle_throughputs)
# get location
if query_location:

View file

@ -151,8 +151,7 @@ class MenuAction:
dialog.show()
def throughput(self):
throughput = self.app.core.throughput
if throughput:
self.app.core.throughput = False
if not self.app.core.handling_throughputs:
self.app.core.enable_throughputs()
else:
self.app.core.throughput = True
self.app.core.cancel_throughputs()

View file

@ -385,23 +385,27 @@ class CoreGrpcClient:
:param int session_id: id of session
:param handler: handler for received events
:param list events: events to listen to, defaults to all
:return: nothing
:return: stream processing events, can be used to cancel stream
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.EventsRequest(session_id=session_id, events=events)
stream = self.stub.Events(request)
start_streamer(stream, handler)
return stream
def throughputs(self, handler):
def throughputs(self, session_id, handler):
"""
Listen for throughput events with information for interfaces and bridges.
:param int session_id: session id
:param handler: handler for every event
:return: nothing
:return: stream processing events, can be used to cancel stream
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.ThroughputsRequest()
request = core_pb2.ThroughputsRequest(session_id=session_id)
stream = self.stub.Throughputs(request)
start_streamer(stream, handler)
return stream
def add_node(self, session_id, node):
"""

View file

@ -27,7 +27,7 @@ from core.nodes.lxd import LxcNode
from core.services.coreservices import ServiceManager
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_INTERFACE_REGEX = re.compile(r"\d+")
_INTERFACE_REGEX = re.compile(r"[0-9a-fA-F]+")
class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
@ -452,9 +452,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
:param grpc.SrevicerContext context: context object
:return: nothing
"""
session = self.get_session(request.session_id, context)
delay = 3
last_check = None
last_stats = None
while self._is_running(context):
now = time.monotonic()
stats = get_net_stats()
@ -462,7 +464,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
# calculate average
if last_check is not None:
interval = now - last_check
throughputs_event = core_pb2.ThroughputsEvent()
throughputs_event = core_pb2.ThroughputsEvent(session_id=session.id)
for key in stats:
current_rxtx = stats[key]
previous_rxtx = last_stats.get(key)
@ -477,8 +479,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
throughput = rx_kbps + tx_kbps
if key.startswith("veth"):
key = key.split(".")
node_id = int(_INTERFACE_REGEX.search(key[0]).group())
interface_id = int(key[1])
node_id = int(_INTERFACE_REGEX.search(key[0]).group(), base=16)
interface_id = int(key[1], base=16)
session_id = int(key[2], base=16)
if session.id != session_id:
continue
interface_throughput = (
throughputs_event.interface_throughputs.add()
)
@ -487,7 +492,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
interface_throughput.throughput = throughput
elif key.startswith("b."):
try:
node_id = int(key.split(".")[1])
key = key.split(".")
node_id = int(key[1], base=16)
session_id = int(key[2], base=16)
if session.id != session_id:
continue
bridge_throughput = (
throughputs_event.bridge_throughputs.add()
)

View file

@ -272,11 +272,13 @@ message EventsRequest {
}
message ThroughputsRequest {
int32 session_id = 1;
}
message ThroughputsEvent {
repeated BridgeThroughput bridge_throughputs = 1;
repeated InterfaceThroughput interface_throughputs = 2;
int32 session_id = 1;
repeated BridgeThroughput bridge_throughputs = 2;
repeated InterfaceThroughput interface_throughputs = 3;
}
message InterfaceThroughput {

View file

@ -1032,7 +1032,7 @@ class TestGrpc:
# given
client = CoreGrpcClient()
grpc_server.coreemu.create_session()
session = grpc_server.coreemu.create_session()
queue = Queue()
def handle_event(event_data):
@ -1040,7 +1040,7 @@ class TestGrpc:
# then
with client.context_connect():
client.throughputs(handle_event)
client.throughputs(session.id, handle_event)
time.sleep(0.1)
# then