daemon: refactoring to remove usage of os.path where possible and pathlib.Path instead
This commit is contained in:
parent
d0a55dd471
commit
1c970bbe00
38 changed files with 520 additions and 606 deletions
|
@ -271,11 +271,11 @@ def get_node_proto(session: Session, node: NodeBase) -> core_pb2.Node:
|
|||
node_dir = None
|
||||
config_services = []
|
||||
if isinstance(node, CoreNodeBase):
|
||||
node_dir = node.nodedir
|
||||
node_dir = str(node.nodedir)
|
||||
config_services = [x for x in node.config_services]
|
||||
channel = None
|
||||
if isinstance(node, CoreNode):
|
||||
channel = node.ctrlchnlname
|
||||
channel = str(node.ctrlchnlname)
|
||||
emane_model = None
|
||||
if isinstance(node, EmaneNet):
|
||||
emane_model = node.model.name
|
||||
|
|
|
@ -6,6 +6,7 @@ import tempfile
|
|||
import threading
|
||||
import time
|
||||
from concurrent import futures
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Optional, Pattern, Type
|
||||
|
||||
import grpc
|
||||
|
@ -221,8 +222,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
|
||||
# clear previous state and setup for creation
|
||||
session.clear()
|
||||
if not os.path.exists(session.session_dir):
|
||||
os.mkdir(session.session_dir)
|
||||
session.session_dir.mkdir(exist_ok=True)
|
||||
session.set_state(EventTypes.CONFIGURATION_STATE)
|
||||
|
||||
# location
|
||||
|
@ -366,12 +366,13 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
sessions = []
|
||||
for session_id in self.coreemu.sessions:
|
||||
session = self.coreemu.sessions[session_id]
|
||||
session_file = str(session.file_path) if session.file_path else None
|
||||
session_summary = core_pb2.SessionSummary(
|
||||
id=session_id,
|
||||
state=session.state.value,
|
||||
nodes=session.get_node_count(),
|
||||
file=session.file_name,
|
||||
dir=session.session_dir,
|
||||
file=session_file,
|
||||
dir=str(session.session_dir),
|
||||
)
|
||||
sessions.append(session_summary)
|
||||
return core_pb2.GetSessionsResponse(sessions=sessions)
|
||||
|
@ -423,14 +424,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
"""
|
||||
logging.debug("set session state: %s", request)
|
||||
session = self.get_session(request.session_id, context)
|
||||
|
||||
try:
|
||||
state = EventTypes(request.state)
|
||||
session.set_state(state)
|
||||
|
||||
if state == EventTypes.INSTANTIATION_STATE:
|
||||
if not os.path.exists(session.session_dir):
|
||||
os.mkdir(session.session_dir)
|
||||
session.session_dir.mkdir(exist_ok=True)
|
||||
session.instantiate()
|
||||
elif state == EventTypes.SHUTDOWN_STATE:
|
||||
session.shutdown()
|
||||
|
@ -438,11 +436,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
session.data_collect()
|
||||
elif state == EventTypes.DEFINITION_STATE:
|
||||
session.clear()
|
||||
|
||||
result = True
|
||||
except KeyError:
|
||||
result = False
|
||||
|
||||
return core_pb2.SetSessionStateResponse(result=result)
|
||||
|
||||
def SetSessionUser(
|
||||
|
@ -573,12 +569,13 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
mobility_configs = grpcutils.get_mobility_configs(session)
|
||||
service_configs = grpcutils.get_node_service_configs(session)
|
||||
config_service_configs = grpcutils.get_node_config_service_configs(session)
|
||||
session_file = str(session.file_path) if session.file_path else None
|
||||
session_proto = core_pb2.Session(
|
||||
id=session.id,
|
||||
state=session.state.value,
|
||||
nodes=nodes,
|
||||
links=links,
|
||||
dir=session.session_dir,
|
||||
dir=str(session.session_dir),
|
||||
user=session.user,
|
||||
default_services=default_services,
|
||||
location=location,
|
||||
|
@ -591,7 +588,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
config_service_configs=config_service_configs,
|
||||
mobility_configs=mobility_configs,
|
||||
metadata=session.metadata,
|
||||
file=session.file_name,
|
||||
file=session_file,
|
||||
)
|
||||
return core_pb2.GetSessionResponse(session=session_proto)
|
||||
|
||||
|
@ -1508,15 +1505,15 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
"""
|
||||
logging.debug("open xml: %s", request)
|
||||
session = self.coreemu.create_session()
|
||||
|
||||
temp = tempfile.NamedTemporaryFile(delete=False)
|
||||
temp.write(request.data.encode("utf-8"))
|
||||
temp.close()
|
||||
|
||||
temp_path = Path(temp.name)
|
||||
file_path = Path(request.file)
|
||||
try:
|
||||
session.open_xml(temp.name, request.start)
|
||||
session.name = os.path.basename(request.file)
|
||||
session.file_name = request.file
|
||||
session.open_xml(temp_path, request.start)
|
||||
session.name = file_path.name
|
||||
session.file_path = file_path
|
||||
return core_pb2.OpenXmlResponse(session_id=session.id, result=True)
|
||||
except IOError:
|
||||
logging.exception("error opening session file")
|
||||
|
@ -1733,12 +1730,10 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
|
|||
|
||||
def ExecuteScript(self, request, context):
|
||||
existing_sessions = set(self.coreemu.sessions.keys())
|
||||
file_path = Path(request.script)
|
||||
thread = threading.Thread(
|
||||
target=utils.execute_file,
|
||||
args=(
|
||||
request.script,
|
||||
{"__file__": request.script, "coreemu": self.coreemu},
|
||||
),
|
||||
args=(file_path, {"coreemu": self.coreemu}),
|
||||
daemon=True,
|
||||
)
|
||||
thread.start()
|
||||
|
|
|
@ -3,7 +3,6 @@ socket server request handlers leveraged by core servers.
|
|||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import socketserver
|
||||
|
@ -11,6 +10,7 @@ import sys
|
|||
import threading
|
||||
import time
|
||||
from itertools import repeat
|
||||
from pathlib import Path
|
||||
from queue import Empty, Queue
|
||||
from typing import Optional
|
||||
|
||||
|
@ -167,39 +167,27 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
date_list = []
|
||||
thumb_list = []
|
||||
num_sessions = 0
|
||||
|
||||
with self._sessions_lock:
|
||||
for _id in self.coreemu.sessions:
|
||||
session = self.coreemu.sessions[_id]
|
||||
num_sessions += 1
|
||||
id_list.append(str(_id))
|
||||
|
||||
name = session.name
|
||||
if not name:
|
||||
name = ""
|
||||
name_list.append(name)
|
||||
|
||||
file_name = session.file_name
|
||||
if not file_name:
|
||||
file_name = ""
|
||||
file_list.append(file_name)
|
||||
|
||||
file_name = str(session.file_path) if session.file_path else ""
|
||||
file_list.append(str(file_name))
|
||||
node_count_list.append(str(session.get_node_count()))
|
||||
|
||||
date_list.append(time.ctime(session.state_time))
|
||||
|
||||
thumb = session.thumbnail
|
||||
if not thumb:
|
||||
thumb = ""
|
||||
thumb = str(session.thumbnail) if session.thumbnail else ""
|
||||
thumb_list.append(thumb)
|
||||
|
||||
session_ids = "|".join(id_list)
|
||||
names = "|".join(name_list)
|
||||
files = "|".join(file_list)
|
||||
node_counts = "|".join(node_count_list)
|
||||
dates = "|".join(date_list)
|
||||
thumbs = "|".join(thumb_list)
|
||||
|
||||
if num_sessions > 0:
|
||||
tlv_data = b""
|
||||
if len(session_ids) > 0:
|
||||
|
@ -221,7 +209,6 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
message = coreapi.CoreSessionMessage.pack(flags, tlv_data)
|
||||
else:
|
||||
message = None
|
||||
|
||||
return message
|
||||
|
||||
def handle_broadcast_event(self, event_data):
|
||||
|
@ -931,22 +918,18 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
if message.flags & MessageFlags.STRING.value:
|
||||
old_session_ids = set(self.coreemu.sessions.keys())
|
||||
sys.argv = shlex.split(execute_server)
|
||||
file_name = sys.argv[0]
|
||||
|
||||
if os.path.splitext(file_name)[1].lower() == ".xml":
|
||||
file_path = Path(sys.argv[0])
|
||||
if file_path.suffix == ".xml":
|
||||
session = self.coreemu.create_session()
|
||||
try:
|
||||
session.open_xml(file_name)
|
||||
session.open_xml(file_path)
|
||||
except Exception:
|
||||
self.coreemu.delete_session(session.id)
|
||||
raise
|
||||
else:
|
||||
thread = threading.Thread(
|
||||
target=utils.execute_file,
|
||||
args=(
|
||||
file_name,
|
||||
{"__file__": file_name, "coreemu": self.coreemu},
|
||||
),
|
||||
args=(file_path, {"coreemu": self.coreemu}),
|
||||
daemon=True,
|
||||
)
|
||||
thread.start()
|
||||
|
@ -1465,10 +1448,12 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
:return: reply messages
|
||||
"""
|
||||
if message.flags & MessageFlags.ADD.value:
|
||||
node_num = message.get_tlv(FileTlvs.NODE.value)
|
||||
node_id = message.get_tlv(FileTlvs.NODE.value)
|
||||
file_name = message.get_tlv(FileTlvs.NAME.value)
|
||||
file_type = message.get_tlv(FileTlvs.TYPE.value)
|
||||
source_name = message.get_tlv(FileTlvs.SOURCE_NAME.value)
|
||||
src_path = message.get_tlv(FileTlvs.SOURCE_NAME.value)
|
||||
if src_path:
|
||||
src_path = Path(src_path)
|
||||
data = message.get_tlv(FileTlvs.DATA.value)
|
||||
compressed_data = message.get_tlv(FileTlvs.COMPRESSED_DATA.value)
|
||||
|
||||
|
@ -1478,7 +1463,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
)
|
||||
return ()
|
||||
|
||||
if source_name and data:
|
||||
if src_path and data:
|
||||
logging.warning(
|
||||
"ignoring invalid File message: source and data TLVs are both present"
|
||||
)
|
||||
|
@ -1490,7 +1475,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
if file_type.startswith("service:"):
|
||||
_, service_name = file_type.split(":")[:2]
|
||||
self.session.services.set_service_file(
|
||||
node_num, service_name, file_name, data
|
||||
node_id, service_name, file_name, data
|
||||
)
|
||||
return ()
|
||||
elif file_type.startswith("hook:"):
|
||||
|
@ -1500,19 +1485,20 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
return ()
|
||||
state = int(state)
|
||||
state = EventTypes(state)
|
||||
self.session.add_hook(state, file_name, data, source_name)
|
||||
self.session.add_hook(state, file_name, data, src_path)
|
||||
return ()
|
||||
|
||||
# writing a file to the host
|
||||
if node_num is None:
|
||||
if source_name is not None:
|
||||
shutil.copy2(source_name, file_name)
|
||||
if node_id is None:
|
||||
if src_path is not None:
|
||||
shutil.copy2(src_path, file_name)
|
||||
else:
|
||||
with open(file_name, "w") as open_file:
|
||||
open_file.write(data)
|
||||
with file_name.open("w") as f:
|
||||
f.write(data)
|
||||
return ()
|
||||
|
||||
self.session.add_node_file(node_num, source_name, file_name, data)
|
||||
file_path = Path(file_name)
|
||||
self.session.add_node_file(node_id, src_path, file_path, data)
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
|
@ -1567,26 +1553,32 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
"dropping unhandled event message for node: %s", node.name
|
||||
)
|
||||
return ()
|
||||
self.session.set_state(event_type)
|
||||
|
||||
if event_type == EventTypes.DEFINITION_STATE:
|
||||
self.session.set_state(event_type)
|
||||
# clear all session objects in order to receive new definitions
|
||||
self.session.clear()
|
||||
elif event_type == EventTypes.CONFIGURATION_STATE:
|
||||
self.session.set_state(event_type)
|
||||
elif event_type == EventTypes.INSTANTIATION_STATE:
|
||||
self.session.set_state(event_type)
|
||||
if len(self.handler_threads) > 1:
|
||||
# TODO: sync handler threads here before continuing
|
||||
time.sleep(2.0) # XXX
|
||||
# done receiving node/link configuration, ready to instantiate
|
||||
self.session.instantiate()
|
||||
|
||||
# after booting nodes attempt to send emulation id for nodes waiting on status
|
||||
# after booting nodes attempt to send emulation id for nodes
|
||||
# waiting on status
|
||||
for _id in self.session.nodes:
|
||||
self.send_node_emulation_id(_id)
|
||||
elif event_type == EventTypes.RUNTIME_STATE:
|
||||
self.session.set_state(event_type)
|
||||
logging.warning("Unexpected event message: RUNTIME state received")
|
||||
elif event_type == EventTypes.DATACOLLECT_STATE:
|
||||
self.session.data_collect()
|
||||
elif event_type == EventTypes.SHUTDOWN_STATE:
|
||||
self.session.set_state(event_type)
|
||||
logging.warning("Unexpected event message: SHUTDOWN state received")
|
||||
elif event_type in {
|
||||
EventTypes.START,
|
||||
|
@ -1613,13 +1605,13 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
name,
|
||||
)
|
||||
elif event_type == EventTypes.FILE_OPEN:
|
||||
filename = event_data.name
|
||||
self.session.open_xml(filename, start=False)
|
||||
file_path = Path(event_data.name)
|
||||
self.session.open_xml(file_path, start=False)
|
||||
self.send_objects()
|
||||
return ()
|
||||
elif event_type == EventTypes.FILE_SAVE:
|
||||
filename = event_data.name
|
||||
self.session.save_xml(filename)
|
||||
file_path = Path(event_data.name)
|
||||
self.session.save_xml(file_path)
|
||||
elif event_type == EventTypes.SCHEDULED:
|
||||
etime = event_data.time
|
||||
node_id = event_data.node
|
||||
|
@ -1733,20 +1725,16 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
session = self.session
|
||||
else:
|
||||
session = self.coreemu.sessions.get(session_id)
|
||||
|
||||
if session is None:
|
||||
logging.warning("session %s not found", session_id)
|
||||
continue
|
||||
|
||||
if names is not None:
|
||||
session.name = names[index]
|
||||
|
||||
if files is not None:
|
||||
session.file_name = files[index]
|
||||
|
||||
session.file_path = Path(files[index])
|
||||
if thumb:
|
||||
thumb = Path(thumb)
|
||||
session.set_thumbnail(thumb)
|
||||
|
||||
if user:
|
||||
session.set_user(user)
|
||||
elif (
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue