daemon: updated core.services to avoid using deprecated type hinting, also updated string formatting to f strings

This commit is contained in:
Blake Harnden 2023-04-13 15:39:40 -07:00
parent 7f58224f43
commit 921bfdf527
11 changed files with 435 additions and 500 deletions

View file

@ -1,7 +1,7 @@
"""
utility.py: defines miscellaneous utility services.
"""
from typing import Optional, Tuple
from typing import Optional
import netaddr
@ -27,8 +27,8 @@ class UtilService(CoreService):
class IPForwardService(UtilService):
name: str = "IPForward"
configs: Tuple[str, ...] = ("ipforward.sh",)
startup: Tuple[str, ...] = ("bash ipforward.sh",)
configs: tuple[str, ...] = ("ipforward.sh",)
startup: tuple[str, ...] = ("bash ipforward.sh",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str:
@ -36,32 +36,30 @@ class IPForwardService(UtilService):
@classmethod
def generateconfiglinux(cls, node: CoreNode, filename: str) -> str:
cfg = """\
cfg = f"""\
#!/bin/sh
# auto-generated by IPForward service (utility.py)
%(sysctl)s -w net.ipv4.conf.all.forwarding=1
%(sysctl)s -w net.ipv4.conf.default.forwarding=1
%(sysctl)s -w net.ipv6.conf.all.forwarding=1
%(sysctl)s -w net.ipv6.conf.default.forwarding=1
%(sysctl)s -w net.ipv4.conf.all.send_redirects=0
%(sysctl)s -w net.ipv4.conf.default.send_redirects=0
%(sysctl)s -w net.ipv4.conf.all.rp_filter=0
%(sysctl)s -w net.ipv4.conf.default.rp_filter=0
""" % {
"sysctl": SYSCTL
}
{SYSCTL} -w net.ipv4.conf.all.forwarding=1
{SYSCTL} -w net.ipv4.conf.default.forwarding=1
{SYSCTL} -w net.ipv6.conf.all.forwarding=1
{SYSCTL} -w net.ipv6.conf.default.forwarding=1
{SYSCTL} -w net.ipv4.conf.all.send_redirects=0
{SYSCTL} -w net.ipv4.conf.default.send_redirects=0
{SYSCTL} -w net.ipv4.conf.all.rp_filter=0
{SYSCTL} -w net.ipv4.conf.default.rp_filter=0
"""
for iface in node.get_ifaces():
name = utils.sysctl_devname(iface.name)
cfg += "%s -w net.ipv4.conf.%s.forwarding=1\n" % (SYSCTL, name)
cfg += "%s -w net.ipv4.conf.%s.send_redirects=0\n" % (SYSCTL, name)
cfg += "%s -w net.ipv4.conf.%s.rp_filter=0\n" % (SYSCTL, name)
cfg += f"{SYSCTL} -w net.ipv4.conf.{name}.forwarding=1\n"
cfg += f"{SYSCTL} -w net.ipv4.conf.{name}.send_redirects=0\n"
cfg += f"{SYSCTL} -w net.ipv4.conf.{name}.rp_filter=0\n"
return cfg
class DefaultRouteService(UtilService):
name: str = "DefaultRoute"
configs: Tuple[str, ...] = ("defaultroute.sh",)
startup: Tuple[str, ...] = ("bash defaultroute.sh",)
configs: tuple[str, ...] = ("defaultroute.sh",)
startup: tuple[str, ...] = ("bash defaultroute.sh",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str:
@ -83,8 +81,8 @@ class DefaultRouteService(UtilService):
class DefaultMulticastRouteService(UtilService):
name: str = "DefaultMulticastRoute"
configs: Tuple[str, ...] = ("defaultmroute.sh",)
startup: Tuple[str, ...] = ("bash defaultmroute.sh",)
configs: tuple[str, ...] = ("defaultmroute.sh",)
startup: tuple[str, ...] = ("bash defaultmroute.sh",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str:
@ -94,7 +92,7 @@ class DefaultMulticastRouteService(UtilService):
cfg += "as needed\n"
for iface in node.get_ifaces(control=False):
rtcmd = "ip route add 224.0.0.0/4 dev"
cfg += "%s %s\n" % (rtcmd, iface.name)
cfg += f"{rtcmd} {iface.name}\n"
cfg += "\n"
break
return cfg
@ -102,8 +100,8 @@ class DefaultMulticastRouteService(UtilService):
class StaticRouteService(UtilService):
name: str = "StaticRoute"
configs: Tuple[str, ...] = ("staticroute.sh",)
startup: Tuple[str, ...] = ("bash staticroute.sh",)
configs: tuple[str, ...] = ("staticroute.sh",)
startup: tuple[str, ...] = ("bash staticroute.sh",)
custom_needed: bool = True
@classmethod
@ -127,16 +125,16 @@ class StaticRouteService(UtilService):
if ip[-2] == ip[1]:
return ""
else:
rtcmd = "#/sbin/ip route add %s via" % dst
return "%s %s" % (rtcmd, ip[1])
rtcmd = f"#/sbin/ip route add {dst} via"
return f"{rtcmd} {ip[1]}"
class SshService(UtilService):
name: str = "SSH"
configs: Tuple[str, ...] = ("startsshd.sh", "/etc/ssh/sshd_config")
dirs: Tuple[str, ...] = ("/etc/ssh", "/var/run/sshd")
startup: Tuple[str, ...] = ("bash startsshd.sh",)
shutdown: Tuple[str, ...] = ("killall sshd",)
configs: tuple[str, ...] = ("startsshd.sh", "/etc/ssh/sshd_config")
dirs: tuple[str, ...] = ("/etc/ssh", "/var/run/sshd")
startup: tuple[str, ...] = ("bash startsshd.sh",)
shutdown: tuple[str, ...] = ("killall sshd",)
validation_mode: ServiceMode = ServiceMode.BLOCKING
@classmethod
@ -149,26 +147,22 @@ class SshService(UtilService):
sshstatedir = cls.dirs[1]
sshlibdir = "/usr/lib/openssh"
if filename == "startsshd.sh":
return """\
return f"""\
#!/bin/sh
# auto-generated by SSH service (utility.py)
ssh-keygen -q -t rsa -N "" -f %s/ssh_host_rsa_key
chmod 655 %s
ssh-keygen -q -t rsa -N "" -f {sshcfgdir}/ssh_host_rsa_key
chmod 655 {sshstatedir}
# wait until RSA host key has been generated to launch sshd
/usr/sbin/sshd -f %s/sshd_config
""" % (
sshcfgdir,
sshstatedir,
sshcfgdir,
)
/usr/sbin/sshd -f {sshcfgdir}/sshd_config
"""
else:
return """\
return f"""\
# auto-generated by SSH service (utility.py)
Port 22
Protocol 2
HostKey %s/ssh_host_rsa_key
HostKey {sshcfgdir}/ssh_host_rsa_key
UsePrivilegeSeparation yes
PidFile %s/sshd.pid
PidFile {sshstatedir}/sshd.pid
KeyRegenerationInterval 3600
ServerKeyBits 768
@ -197,23 +191,19 @@ PrintLastLog yes
TCPKeepAlive yes
AcceptEnv LANG LC_*
Subsystem sftp %s/sftp-server
Subsystem sftp {sshlibdir}/sftp-server
UsePAM yes
UseDNS no
""" % (
sshcfgdir,
sshstatedir,
sshlibdir,
)
"""
class DhcpService(UtilService):
name: str = "DHCP"
configs: Tuple[str, ...] = ("/etc/dhcp/dhcpd.conf",)
dirs: Tuple[str, ...] = ("/etc/dhcp", "/var/lib/dhcp")
startup: Tuple[str, ...] = ("touch /var/lib/dhcp/dhcpd.leases", "dhcpd")
shutdown: Tuple[str, ...] = ("killall dhcpd",)
validate: Tuple[str, ...] = ("pidof dhcpd",)
configs: tuple[str, ...] = ("/etc/dhcp/dhcpd.conf",)
dirs: tuple[str, ...] = ("/etc/dhcp", "/var/lib/dhcp")
startup: tuple[str, ...] = ("touch /var/lib/dhcp/dhcpd.leases", "dhcpd")
shutdown: tuple[str, ...] = ("killall dhcpd",)
validate: tuple[str, ...] = ("pidof dhcpd",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str:
@ -252,21 +242,15 @@ ddns-update-style none;
index = (ip.size - 2) / 2
rangelow = ip[index]
rangehigh = ip[-2]
return """
subnet %s netmask %s {
pool {
range %s %s;
return f"""
subnet {ip.cidr.ip} netmask {ip.netmask} {{
pool {{
range {rangelow} {rangehigh};
default-lease-time 600;
option routers %s;
}
}
""" % (
ip.cidr.ip,
ip.netmask,
rangelow,
rangehigh,
ip.ip,
)
option routers {ip.ip};
}}
}}
"""
class DhcpClientService(UtilService):
@ -275,10 +259,10 @@ class DhcpClientService(UtilService):
"""
name: str = "DHCPClient"
configs: Tuple[str, ...] = ("startdhcpclient.sh",)
startup: Tuple[str, ...] = ("bash startdhcpclient.sh",)
shutdown: Tuple[str, ...] = ("killall dhclient",)
validate: Tuple[str, ...] = ("pidof dhclient",)
configs: tuple[str, ...] = ("startdhcpclient.sh",)
startup: tuple[str, ...] = ("bash startdhcpclient.sh",)
shutdown: tuple[str, ...] = ("killall dhclient",)
validate: tuple[str, ...] = ("pidof dhclient",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str:
@ -291,10 +275,10 @@ class DhcpClientService(UtilService):
cfg += "side DNS\n# resolution based on the DHCP server response.\n"
cfg += "#mkdir -p /var/run/resolvconf/interface\n"
for iface in node.get_ifaces(control=False):
cfg += "#ln -s /var/run/resolvconf/interface/%s.dhclient" % iface.name
cfg += f"#ln -s /var/run/resolvconf/interface/{iface.name}.dhclient"
cfg += " /var/run/resolvconf/resolv.conf\n"
cfg += "/sbin/dhclient -nw -pf /var/run/dhclient-%s.pid" % iface.name
cfg += " -lf /var/run/dhclient-%s.lease %s\n" % (iface.name, iface.name)
cfg += f"/sbin/dhclient -nw -pf /var/run/dhclient-{iface.name}.pid"
cfg += f" -lf /var/run/dhclient-{iface.name}.lease {iface.name}\n"
return cfg
@ -304,11 +288,11 @@ class FtpService(UtilService):
"""
name: str = "FTP"
configs: Tuple[str, ...] = ("vsftpd.conf",)
dirs: Tuple[str, ...] = ("/var/run/vsftpd/empty", "/var/ftp")
startup: Tuple[str, ...] = ("vsftpd ./vsftpd.conf",)
shutdown: Tuple[str, ...] = ("killall vsftpd",)
validate: Tuple[str, ...] = ("pidof vsftpd",)
configs: tuple[str, ...] = ("vsftpd.conf",)
dirs: tuple[str, ...] = ("/var/run/vsftpd/empty", "/var/ftp")
startup: tuple[str, ...] = ("vsftpd ./vsftpd.conf",)
shutdown: tuple[str, ...] = ("killall vsftpd",)
validate: tuple[str, ...] = ("pidof vsftpd",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str:
@ -337,12 +321,12 @@ class HttpService(UtilService):
"""
name: str = "HTTP"
configs: Tuple[str, ...] = (
configs: tuple[str, ...] = (
"/etc/apache2/apache2.conf",
"/etc/apache2/envvars",
"/var/www/index.html",
)
dirs: Tuple[str, ...] = (
dirs: tuple[str, ...] = (
"/etc/apache2",
"/var/run/apache2",
"/var/log/apache2",
@ -350,9 +334,9 @@ class HttpService(UtilService):
"/var/lock/apache2",
"/var/www",
)
startup: Tuple[str, ...] = ("chown www-data /var/lock/apache2", "apache2ctl start")
shutdown: Tuple[str, ...] = ("apache2ctl stop",)
validate: Tuple[str, ...] = ("pidof apache2",)
startup: tuple[str, ...] = ("chown www-data /var/lock/apache2", "apache2ctl start")
shutdown: tuple[str, ...] = ("apache2ctl stop",)
validate: tuple[str, ...] = ("pidof apache2",)
APACHEVER22: int = 22
APACHEVER24: int = 24
@ -538,18 +522,15 @@ export LANG
@classmethod
def generatehtml(cls, node: CoreNode, filename: str) -> str:
body = (
"""\
body = f"""\
<!-- generated by utility.py:HttpService -->
<h1>%s web server</h1>
<h1>{node.name} web server</h1>
<p>This is the default web page for this server.</p>
<p>The web server software is running but no content has been added, yet.</p>
"""
% node.name
)
for iface in node.get_ifaces(control=False):
body += "<li>%s - %s</li>\n" % (iface.name, [str(x) for x in iface.ips()])
return "<html><body>%s</body></html>" % body
body += f"<li>{iface.name} - {[str(x) for x in iface.ips()]}</li>\n"
return f"<html><body>{body}</body></html>"
class PcapService(UtilService):
@ -558,10 +539,10 @@ class PcapService(UtilService):
"""
name: str = "pcap"
configs: Tuple[str, ...] = ("pcap.sh",)
startup: Tuple[str, ...] = ("bash pcap.sh start",)
shutdown: Tuple[str, ...] = ("bash pcap.sh stop",)
validate: Tuple[str, ...] = ("pidof tcpdump",)
configs: tuple[str, ...] = ("pcap.sh",)
startup: tuple[str, ...] = ("bash pcap.sh start",)
shutdown: tuple[str, ...] = ("bash pcap.sh stop",)
validate: tuple[str, ...] = ("pidof tcpdump",)
meta: str = "logs network traffic to pcap packet capture files"
@classmethod
@ -582,11 +563,9 @@ if [ "x$1" = "xstart" ]; then
if iface.control:
cfg += "# "
redir = "< /dev/null"
cfg += "tcpdump ${DUMPOPTS} -w %s.%s.pcap -i %s %s &\n" % (
node.name,
iface.name,
iface.name,
redir,
cfg += (
f"tcpdump ${{DUMPOPTS}} -w {node.name}.{iface.name}.pcap "
f"-i {iface.name} {redir} &\n"
)
cfg += """
@ -600,13 +579,13 @@ fi;
class RadvdService(UtilService):
name: str = "radvd"
configs: Tuple[str, ...] = ("/etc/radvd/radvd.conf",)
dirs: Tuple[str, ...] = ("/etc/radvd", "/var/run/radvd")
startup: Tuple[str, ...] = (
configs: tuple[str, ...] = ("/etc/radvd/radvd.conf",)
dirs: tuple[str, ...] = ("/etc/radvd", "/var/run/radvd")
startup: tuple[str, ...] = (
"radvd -C /etc/radvd/radvd.conf -m logfile -l /var/log/radvd.log",
)
shutdown: Tuple[str, ...] = ("pkill radvd",)
validate: Tuple[str, ...] = ("pidof radvd",)
shutdown: tuple[str, ...] = ("pkill radvd",)
validate: tuple[str, ...] = ("pidof radvd",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str:
@ -619,32 +598,26 @@ class RadvdService(UtilService):
prefixes = list(map(cls.subnetentry, iface.ips()))
if len(prefixes) < 1:
continue
cfg += (
"""\
interface %s
{
cfg += f"""\
interface {iface.name}
{{
AdvSendAdvert on;
MinRtrAdvInterval 3;
MaxRtrAdvInterval 10;
AdvDefaultPreference low;
AdvHomeAgentFlag off;
"""
% iface.name
)
for prefix in prefixes:
if prefix == "":
continue
cfg += (
"""\
prefix %s
{
cfg += f"""\
prefix {prefix}
{{
AdvOnLink on;
AdvAutonomous on;
AdvRouterAddr on;
};
}};
"""
% prefix
)
cfg += "};\n"
return cfg
@ -667,10 +640,10 @@ class AtdService(UtilService):
"""
name: str = "atd"
configs: Tuple[str, ...] = ("startatd.sh",)
dirs: Tuple[str, ...] = ("/var/spool/cron/atjobs", "/var/spool/cron/atspool")
startup: Tuple[str, ...] = ("bash startatd.sh",)
shutdown: Tuple[str, ...] = ("pkill atd",)
configs: tuple[str, ...] = ("startatd.sh",)
dirs: tuple[str, ...] = ("/var/spool/cron/atjobs", "/var/spool/cron/atspool")
startup: tuple[str, ...] = ("bash startatd.sh",)
shutdown: tuple[str, ...] = ("pkill atd",)
@classmethod
def generate_config(cls, node: CoreNode, filename: str) -> str: