further removal and refactoring of methods used within misc/utils.py

This commit is contained in:
Blake J. Harnden 2018-03-02 09:15:52 -08:00
parent 6211b09585
commit 00b3c97448
23 changed files with 181 additions and 293 deletions

View file

@ -136,7 +136,7 @@ router ospf6
:param routerid: router id
:param str redistribute: redistribute value
"""
ospf6ifs = utils.maketuple(ospf6ifs)
ospf6ifs = utils.make_tuple(ospf6ifs)
interfaces = "\n!\n".join(map(str, ospf6ifs))
ospfifs = "\n ".join(map(lambda x: "interface %s area %s" % (x.name(), area), ospf6ifs))
Conf.__init__(self, interfaces=interfaces, routerid=routerid, ospfifs=ospfifs, redistribute=redistribute)
@ -163,9 +163,9 @@ $forwarding
:param str logfile: log file name
:param debugs: debug options
"""
routers = "\n!\n".join(map(str, utils.maketuple(routers)))
routers = "\n!\n".join(map(str, utils.make_tuple(routers)))
if debugs:
debugs = "\n".join(utils.maketuple(debugs))
debugs = "\n".join(utils.make_tuple(debugs))
else:
debugs = "! no debugs"
forwarding = "ip forwarding\nipv6 forwarding"

View file

@ -17,7 +17,74 @@ from core import logger
DEVNULL = open(os.devnull, "wb")
def closeonexec(fd):
def _detach_init():
"""
Fork a child process and exit.
:return: nothing
"""
if os.fork():
# parent exits
os._exit(0)
os.setsid()
def _valid_module(path, file_name):
"""
Check if file is a valid python module.
:param str path: path to file
:param str file_name: file name to check
:return: True if a valid python module file, False otherwise
:rtype: bool
"""
file_path = os.path.join(path, file_name)
if not os.path.isfile(file_path):
return False
if file_name.startswith("_"):
return False
if not file_name.endswith(".py"):
return False
return True
def _is_class(module, member, clazz):
"""
Validates if a module member is a class and an instance of a CoreService.
:param module: module to validate for service
:param member: member to validate for service
:param clazz: clazz type to check for validation
:return: True if a valid service, False otherwise
:rtype: bool
"""
if not inspect.isclass(member):
return False
if not issubclass(member, clazz):
return False
if member.__module__ != module.__name__:
return False
return True
def _is_exe(file_path):
"""
Check if a given file path exists and is an executable file.
:param str file_path: file path to check
:return: True if the file is considered and executable file, False otherwise
:rtype: bool
"""
return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
def close_onexec(fd):
"""
Close on execution of a shell process.
@ -37,56 +104,11 @@ def check_executables(executables):
:raises EnvironmentError: when an executable doesn't exist or is not executable
"""
for executable in executables:
if not is_exe(executable):
if not _is_exe(executable):
raise EnvironmentError("executable not found: %s" % executable)
def is_exe(file_path):
"""
Check if a given file path exists and is an executable file.
:param str file_path: file path to check
:return: True if the file is considered and executable file, False otherwise
:rtype: bool
"""
return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
def which(program):
"""
From: http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
:param str program: program to check for
:return: path if it exists, none otherwise
"""
fpath, fname = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip("\"")
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def ensurepath(pathlist):
"""
Checks a list of paths are contained within the environment path, if not add it to the path.
:param list[str] pathlist: list of paths to check
:return: nothing
"""
searchpath = os.environ["PATH"].split(":")
for p in set(pathlist):
if p not in searchpath:
os.environ["PATH"] += ":" + p
def maketuple(obj):
def make_tuple(obj):
"""
Create a tuple from an object, or return the object itself.
@ -100,7 +122,7 @@ def maketuple(obj):
return obj,
def maketuplefromstr(s, value_type):
def make_tuple_fromstr(s, value_type):
"""
Create a tuple from a string.
@ -122,101 +144,22 @@ def split_args(args):
:return: shell-like syntax list
:rtype: list
"""
# split shell string to shell array for convenience
if type(args) == str:
args = shlex.split(args)
return args
def mutecall(*args, **kwargs):
"""
Run a muted call command.
:param list args: arguments for the command
:param dict kwargs: keyword arguments for the command
:return: command result
:rtype: int
"""
kwargs["stdout"] = DEVNULL
kwargs["stderr"] = subprocess.STDOUT
return subprocess.call(*args, **kwargs)
def mutecheck_call(*args, **kwargs):
"""
Run a muted check call command.
:param list args: arguments for the command
:param dict kwargs: keyword arguments for the command
:return: command result
:rtype: int
"""
kwargs["stdout"] = DEVNULL
kwargs["stderr"] = subprocess.STDOUT
return subprocess.check_call(*args, **kwargs)
def spawn(*args, **kwargs):
"""
Wrapper for running a spawn command and returning the process id.
:param list args: arguments for the command
:param dict kwargs: keyword arguments for the command
:return: process id of the command
:rtype: int
"""
return subprocess.Popen(*args, **kwargs).pid
def mutespawn(*args, **kwargs):
"""
Wrapper for running a muted spawned command.
:param list args: arguments for the command
:param dict kwargs: keyword arguments for the command
:return: process id of the command
:rtype: int
"""
kwargs["stdout"] = DEVNULL
kwargs["stderr"] = subprocess.STDOUT
return subprocess.Popen(*args, **kwargs).pid
def detachinit():
"""
Fork a child process and exit.
:return: nothing
"""
if os.fork():
# parent exits
os._exit(0)
os.setsid()
def detach(*args, **kwargs):
"""
Run a detached process by forking it.
:param list args: arguments for the command
:param dict kwargs: keyword arguments for the command
:return: process id of the command
:rtype: int
"""
kwargs["preexec_fn"] = detachinit
return subprocess.Popen(*args, **kwargs).pid
def mutedetach(*args, **kwargs):
def mute_detach(args, **kwargs):
"""
Run a muted detached process by forking it.
:param list args: arguments for the command
:param list[str]|str args: arguments for the command
:param dict kwargs: keyword arguments for the command
:return: process id of the command
:rtype: int
"""
kwargs["preexec_fn"] = detachinit
args = split_args(args)
kwargs["preexec_fn"] = _detach_init
kwargs["stdout"] = DEVNULL
kwargs["stderr"] = subprocess.STDOUT
return subprocess.Popen(*args, **kwargs).pid
@ -286,7 +229,7 @@ def check_cmd(args, **kwargs):
raise subprocess.CalledProcessError(-1, args)
def hexdump(s, bytes_per_word=2, words_per_line=8):
def hex_dump(s, bytes_per_word=2, words_per_line=8):
"""
Hex dump of a string.
@ -297,10 +240,11 @@ def hexdump(s, bytes_per_word=2, words_per_line=8):
"""
dump = ""
count = 0
bytes = bytes_per_word * words_per_line
total_bytes = bytes_per_word * words_per_line
while s:
line = s[:bytes]
s = s[bytes:]
line = s[:total_bytes]
s = s[total_bytes:]
tmp = map(lambda x: ("%02x" * bytes_per_word) % x,
zip(*[iter(map(ord, line))] * bytes_per_word))
if len(line) % 2:
@ -310,7 +254,7 @@ def hexdump(s, bytes_per_word=2, words_per_line=8):
return dump[:-1]
def filemunge(pathname, header, text):
def file_munge(pathname, header, text):
"""
Insert text at the end of a file, surrounded by header comments.
@ -320,15 +264,15 @@ def filemunge(pathname, header, text):
:return: nothing
"""
# prevent duplicates
filedemunge(pathname, header)
f = open(pathname, "a")
f.write("# BEGIN %s\n" % header)
f.write(text)
f.write("# END %s\n" % header)
f.close()
file_demunge(pathname, header)
with open(pathname, "a") as append_file:
append_file.write("# BEGIN %s\n" % header)
append_file.write(text)
append_file.write("# END %s\n" % header)
def filedemunge(pathname, header):
def file_demunge(pathname, header):
"""
Remove text that was inserted in a file surrounded by header comments.
@ -336,25 +280,27 @@ def filedemunge(pathname, header):
:param str header: header text to target for removal
:return: nothing
"""
f = open(pathname, "r")
lines = f.readlines()
f.close()
with open(pathname, "r") as read_file:
lines = read_file.readlines()
start = None
end = None
for i in range(len(lines)):
if lines[i] == "# BEGIN %s\n" % header:
start = i
elif lines[i] == "# END %s\n" % header:
end = i + 1
if start is None or end is None:
return
f = open(pathname, "w")
lines = lines[:start] + lines[end:]
f.write("".join(lines))
f.close()
with open(pathname, "w") as write_file:
lines = lines[:start] + lines[end:]
write_file.write("".join(lines))
def expandcorepath(pathname, session=None, node=None):
def expand_corepath(pathname, session=None, node=None):
"""
Expand a file path given session information.
@ -369,13 +315,15 @@ def expandcorepath(pathname, session=None, node=None):
pathname = pathname.replace("%SESSION%", str(session.session_id))
pathname = pathname.replace("%SESSION_DIR%", session.session_dir)
pathname = pathname.replace("%SESSION_USER%", session.user)
if node is not None:
pathname = pathname.replace("%NODE%", str(node.objid))
pathname = pathname.replace("%NODENAME%", node.name)
return pathname
def sysctldevname(devname):
def sysctl_devname(devname):
"""
Translate a device name to the name used with sysctl.
@ -477,7 +425,7 @@ def daemonize(rootdir="/", umask=0, close_fds=False, dontclose=(),
logger.exception("error closing file descriptor")
def readfileintodict(filename, d):
def load_config(filename, d):
"""
Read key=value pairs from a file, into a dict. Skip comments; strip newline characters and spacing.
@ -487,78 +435,18 @@ def readfileintodict(filename, d):
"""
with open(filename, "r") as f:
lines = f.readlines()
for l in lines:
if l[:1] == "#":
for line in lines:
if line[:1] == "#":
continue
try:
key, value = l.split("=", 1)
key, value = line.split("=", 1)
d[key] = value.strip()
except ValueError:
logger.exception("error reading file to dict: %s", filename)
def checkforkernelmodule(name):
"""
Return a string if a Linux kernel module is loaded, None otherwise.
The string is the line from /proc/modules containing the module name,
memory size (bytes), number of loaded instances, dependencies, state,
and kernel memory offset.
:param str name: name of kernel module to check for
:return: kernel module line, None otherwise
:rtype: str
"""
with open("/proc/modules", "r") as f:
for line in f:
if line.startswith(name + " "):
return line.rstrip()
return None
def _valid_module(path, file_name):
"""
Check if file is a valid python module.
:param str path: path to file
:param str file_name: file name to check
:return: True if a valid python module file, False otherwise
:rtype: bool
"""
file_path = os.path.join(path, file_name)
if not os.path.isfile(file_path):
return False
if file_name.startswith("_"):
return False
if not file_name.endswith(".py"):
return False
return True
def _is_class(module, member, clazz):
"""
Validates if a module member is a class and an instance of a CoreService.
:param module: module to validate for service
:param member: member to validate for service
:param clazz: clazz type to check for validation
:return: True if a valid service, False otherwise
:rtype: bool
"""
if not inspect.isclass(member):
return False
if not issubclass(member, clazz):
return False
if member.__module__ != module.__name__:
return False
return True
def load_classes(path, clazz):
"""
Dynamically load classes for use within CORE.