daemon: added type hinting throughout all services and made small tweaks/fixes that were ran across
This commit is contained in:
parent
250bc6e1f5
commit
cd74a44558
11 changed files with 560 additions and 636 deletions
|
@ -4,78 +4,79 @@ firewall)
|
|||
"""
|
||||
|
||||
import logging
|
||||
from typing import Tuple
|
||||
|
||||
from core import constants
|
||||
from core.nodes.base import CoreNode
|
||||
from core.nodes.interface import CoreInterface
|
||||
from core.services.coreservices import CoreService
|
||||
|
||||
|
||||
class VPNClient(CoreService):
|
||||
name = "VPNClient"
|
||||
group = "Security"
|
||||
configs = ("vpnclient.sh",)
|
||||
startup = ("sh vpnclient.sh",)
|
||||
shutdown = ("killall openvpn",)
|
||||
validate = ("pidof openvpn",)
|
||||
custom_needed = True
|
||||
name: str = "VPNClient"
|
||||
group: str = "Security"
|
||||
configs: Tuple[str, ...] = ("vpnclient.sh",)
|
||||
startup: Tuple[str, ...] = ("sh vpnclient.sh",)
|
||||
shutdown: Tuple[str, ...] = ("killall openvpn",)
|
||||
validate: Tuple[str, ...] = ("pidof openvpn",)
|
||||
custom_needed: bool = True
|
||||
|
||||
@classmethod
|
||||
def generate_config(cls, node, filename):
|
||||
def generate_config(cls, node: CoreNode, filename: str) -> str:
|
||||
"""
|
||||
Return the client.conf and vpnclient.sh file contents to
|
||||
"""
|
||||
cfg = "#!/bin/sh\n"
|
||||
cfg += "# custom VPN Client configuration for service (security.py)\n"
|
||||
fname = "%s/examples/services/sampleVPNClient" % constants.CORE_DATA_DIR
|
||||
|
||||
fname = f"{constants.CORE_DATA_DIR}/examples/services/sampleVPNClient"
|
||||
try:
|
||||
cfg += open(fname, "rb").read()
|
||||
with open(fname, "r") as f:
|
||||
cfg += f.read()
|
||||
except IOError:
|
||||
logging.exception(
|
||||
"Error opening VPN client configuration template (%s)", fname
|
||||
"error opening VPN client configuration template (%s)", fname
|
||||
)
|
||||
|
||||
return cfg
|
||||
|
||||
|
||||
class VPNServer(CoreService):
|
||||
name = "VPNServer"
|
||||
group = "Security"
|
||||
configs = ("vpnserver.sh",)
|
||||
startup = ("sh vpnserver.sh",)
|
||||
shutdown = ("killall openvpn",)
|
||||
validate = ("pidof openvpn",)
|
||||
custom_needed = True
|
||||
name: str = "VPNServer"
|
||||
group: str = "Security"
|
||||
configs: Tuple[str, ...] = ("vpnserver.sh",)
|
||||
startup: Tuple[str, ...] = ("sh vpnserver.sh",)
|
||||
shutdown: Tuple[str, ...] = ("killall openvpn",)
|
||||
validate: Tuple[str, ...] = ("pidof openvpn",)
|
||||
custom_needed: bool = True
|
||||
|
||||
@classmethod
|
||||
def generate_config(cls, node, filename):
|
||||
def generate_config(cls, node: CoreNode, filename: str) -> str:
|
||||
"""
|
||||
Return the sample server.conf and vpnserver.sh file contents to
|
||||
GUI for user customization.
|
||||
"""
|
||||
cfg = "#!/bin/sh\n"
|
||||
cfg += "# custom VPN Server Configuration for service (security.py)\n"
|
||||
fname = "%s/examples/services/sampleVPNServer" % constants.CORE_DATA_DIR
|
||||
|
||||
fname = f"{constants.CORE_DATA_DIR}/examples/services/sampleVPNServer"
|
||||
try:
|
||||
cfg += open(fname, "rb").read()
|
||||
with open(fname, "r") as f:
|
||||
cfg += f.read()
|
||||
except IOError:
|
||||
logging.exception(
|
||||
"Error opening VPN server configuration template (%s)", fname
|
||||
)
|
||||
|
||||
return cfg
|
||||
|
||||
|
||||
class IPsec(CoreService):
|
||||
name = "IPsec"
|
||||
group = "Security"
|
||||
configs = ("ipsec.sh",)
|
||||
startup = ("sh ipsec.sh",)
|
||||
shutdown = ("killall racoon",)
|
||||
custom_needed = True
|
||||
name: str = "IPsec"
|
||||
group: str = "Security"
|
||||
configs: Tuple[str, ...] = ("ipsec.sh",)
|
||||
startup: Tuple[str, ...] = ("sh ipsec.sh",)
|
||||
shutdown: Tuple[str, ...] = ("killall racoon",)
|
||||
custom_needed: bool = True
|
||||
|
||||
@classmethod
|
||||
def generate_config(cls, node, filename):
|
||||
def generate_config(cls, node: CoreNode, filename: str) -> str:
|
||||
"""
|
||||
Return the ipsec.conf and racoon.conf file contents to
|
||||
GUI for user customization.
|
||||
|
@ -83,7 +84,7 @@ class IPsec(CoreService):
|
|||
cfg = "#!/bin/sh\n"
|
||||
cfg += "# set up static tunnel mode security assocation for service "
|
||||
cfg += "(security.py)\n"
|
||||
fname = "%s/examples/services/sampleIPsec" % constants.CORE_DATA_DIR
|
||||
fname = f"{constants.CORE_DATA_DIR}/examples/services/sampleIPsec"
|
||||
try:
|
||||
with open(fname, "r") as f:
|
||||
cfg += f.read()
|
||||
|
@ -93,28 +94,27 @@ class IPsec(CoreService):
|
|||
|
||||
|
||||
class Firewall(CoreService):
|
||||
name = "Firewall"
|
||||
group = "Security"
|
||||
configs = ("firewall.sh",)
|
||||
startup = ("sh firewall.sh",)
|
||||
custom_needed = True
|
||||
name: str = "Firewall"
|
||||
group: str = "Security"
|
||||
configs: Tuple[str, ...] = ("firewall.sh",)
|
||||
startup: Tuple[str, ...] = ("sh firewall.sh",)
|
||||
custom_needed: bool = True
|
||||
|
||||
@classmethod
|
||||
def generate_config(cls, node, filename):
|
||||
def generate_config(cls, node: CoreNode, filename: str) -> str:
|
||||
"""
|
||||
Return the firewall rule examples to GUI for user customization.
|
||||
"""
|
||||
cfg = "#!/bin/sh\n"
|
||||
cfg += "# custom node firewall rules for service (security.py)\n"
|
||||
fname = "%s/examples/services/sampleFirewall" % constants.CORE_DATA_DIR
|
||||
|
||||
fname = f"{constants.CORE_DATA_DIR}/examples/services/sampleFirewall"
|
||||
try:
|
||||
cfg += open(fname, "rb").read()
|
||||
with open(fname, "r") as f:
|
||||
cfg += f.read()
|
||||
except IOError:
|
||||
logging.exception(
|
||||
"Error opening Firewall configuration template (%s)", fname
|
||||
)
|
||||
|
||||
return cfg
|
||||
|
||||
|
||||
|
@ -123,30 +123,28 @@ class Nat(CoreService):
|
|||
IPv4 source NAT service.
|
||||
"""
|
||||
|
||||
name = "NAT"
|
||||
executables = ("iptables",)
|
||||
group = "Security"
|
||||
configs = ("nat.sh",)
|
||||
startup = ("sh nat.sh",)
|
||||
custom_needed = False
|
||||
name: str = "NAT"
|
||||
group: str = "Security"
|
||||
executables: Tuple[str, ...] = ("iptables",)
|
||||
configs: Tuple[str, ...] = ("nat.sh",)
|
||||
startup: Tuple[str, ...] = ("sh nat.sh",)
|
||||
custom_needed: bool = False
|
||||
|
||||
@classmethod
|
||||
def generate_iface_nat_rule(cls, iface, line_prefix=""):
|
||||
def generate_iface_nat_rule(cls, iface: CoreInterface, prefix: str = "") -> str:
|
||||
"""
|
||||
Generate a NAT line for one interface.
|
||||
"""
|
||||
cfg = line_prefix + "iptables -t nat -A POSTROUTING -o "
|
||||
cfg = prefix + "iptables -t nat -A POSTROUTING -o "
|
||||
cfg += iface.name + " -j MASQUERADE\n"
|
||||
|
||||
cfg += line_prefix + "iptables -A FORWARD -i " + iface.name
|
||||
cfg += prefix + "iptables -A FORWARD -i " + iface.name
|
||||
cfg += " -m state --state RELATED,ESTABLISHED -j ACCEPT\n"
|
||||
|
||||
cfg += line_prefix + "iptables -A FORWARD -i "
|
||||
cfg += prefix + "iptables -A FORWARD -i "
|
||||
cfg += iface.name + " -j DROP\n"
|
||||
return cfg
|
||||
|
||||
@classmethod
|
||||
def generate_config(cls, node, filename):
|
||||
def generate_config(cls, node: CoreNode, filename: str) -> str:
|
||||
"""
|
||||
NAT out the first interface
|
||||
"""
|
||||
|
@ -156,7 +154,7 @@ class Nat(CoreService):
|
|||
have_nat = False
|
||||
for iface in node.get_ifaces(control=False):
|
||||
if have_nat:
|
||||
cfg += cls.generate_iface_nat_rule(iface, line_prefix="#")
|
||||
cfg += cls.generate_iface_nat_rule(iface, prefix="#")
|
||||
else:
|
||||
have_nat = True
|
||||
cfg += "# NAT out the " + iface.name + " interface\n"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue