Finalized sig_fich implementation
This commit is contained in:
parent
b8b32fe189
commit
b7918a2e30
1 changed files with 135 additions and 22 deletions
|
@ -1,6 +1,28 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import padding
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
|
from cryptography.x509.oid import NameOID
|
||||||
|
from cryptography.exceptions import InvalidSignature
|
||||||
|
import argparse
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
|
def mkpair(x, y):
|
||||||
|
""" produz uma byte-string contendo o tuplo '(x,y)' ('x' e 'y' são byte-strings) """
|
||||||
|
len_x = len(x)
|
||||||
|
len_x_bytes = len_x.to_bytes(2, 'little')
|
||||||
|
return len_x_bytes + x + y
|
||||||
|
|
||||||
|
def unpair(xy):
|
||||||
|
""" extrai componentes de um par codificado com 'mkpair' """
|
||||||
|
len_x = int.from_bytes(xy[:2], 'little')
|
||||||
|
x = xy[2:len_x+2]
|
||||||
|
y = xy[len_x+2:]
|
||||||
|
return x, y
|
||||||
|
|
||||||
def cert_load(fname):
|
def cert_load(fname):
|
||||||
""" lê certificado de ficheiro """
|
""" lê certificado de ficheiro """
|
||||||
with open(fname, "rb") as fcert:
|
with open(fname, "rb") as fcert:
|
||||||
|
@ -8,43 +30,134 @@ def cert_load(fname):
|
||||||
return cert
|
return cert
|
||||||
|
|
||||||
def cert_validtime(cert, now=None):
|
def cert_validtime(cert, now=None):
|
||||||
""" valida que 'now' se encontra no período
|
""" valida que 'now' se encontra no período
|
||||||
de validade do certificado. """
|
de validade do certificado. """
|
||||||
if now is None:
|
if now is None:
|
||||||
now = datetime.datetime.now(tz=datetime.timezone.utc)
|
now = datetime.datetime.now(tz=datetime.timezone.utc)
|
||||||
if now < cert.not_valid_before_utc or now > cert.not_valid_after_utc:
|
if now < cert.not_valid_before_utc or now > cert.not_valid_after_utc:
|
||||||
raise x509.verification.VerificationError("Certificate is not valid at this time")
|
raise x509.verification.VerificationError("Certificate is not valid at this time")
|
||||||
|
|
||||||
def cert_validsubject(cert, attrs=[]):
|
def cert_validsubject(cert, attrs=[]):
|
||||||
""" verifica atributos do campo 'subject'. 'attrs'
|
""" verifica atributos do campo 'subject'. 'attrs'
|
||||||
é uma lista de pares '(attr,value)' que condiciona
|
é uma lista de pares '(attr,value)' que condiciona
|
||||||
os valores de 'attr' a 'value'. """
|
os valores de 'attr' a 'value'. """
|
||||||
print(cert.subject)
|
print(cert.subject)
|
||||||
for attr in attrs:
|
for attr in attrs:
|
||||||
if cert.subject.get_attributes_for_oid(attr[0])[0].value != attr[1]:
|
if cert.subject.get_attributes_for_oid(attr[0])[0].value != attr[1]:
|
||||||
raise x509.verification.VerificationError("Certificate subject does not match expected value")
|
raise x509.verification.VerificationError("Certificate subject does not match expected value")
|
||||||
|
|
||||||
def cert_validexts(cert, policy=[]):
|
def cert_validexts(cert, policy=[]):
|
||||||
""" valida extensões do certificado. 'policy' é uma lista de pares '(ext,pred)' onde 'ext' é o OID de uma extensão e 'pred'
|
""" valida extensões do certificado. 'policy' é uma lista de pares '(ext,pred)' onde 'ext' é o OID de uma extensão e 'pred'
|
||||||
o predicado responsável por verificar o conteúdo dessa extensão. """
|
o predicado responsável por verificar o conteúdo dessa extensão. """
|
||||||
for check in policy:
|
for check in policy:
|
||||||
ext = cert.extensions.get_extension_for_oid(check[0]).value
|
ext = cert.extensions.get_extension_for_oid(check[0]).value
|
||||||
if not check[1](ext):
|
if not check[1](ext):
|
||||||
raise x509.verification.VerificationError("Certificate extensions does not match expected value")
|
raise x509.verification.VerificationError("Certificate extensions does not match expected value")
|
||||||
|
|
||||||
def valida_certALICE(ca_cert):
|
def valida_cert(cert, ca_cert, user):
|
||||||
try:
|
try:
|
||||||
cert = cert_load("ALICE.crt")
|
ca_cert.public_key().verify(
|
||||||
# obs: pressupõe que a cadeia de certifica só contém 2 níveis
|
cert.signature,
|
||||||
cert.verify_directly_issued_by(ca_cert)
|
cert.tbs_certificate_bytes,
|
||||||
# verificar período de validade...
|
padding.PKCS1v15(),
|
||||||
cert_validtime(cert)
|
cert.signature_hash_algorithm
|
||||||
# verificar identidade... (e.g.)
|
)
|
||||||
cert_validsubject(cert, [(x509.NameOID.COMMON_NAME, "ALICE")])
|
except InvalidSignature:
|
||||||
# verificar aplicabilidade... (e.g.)
|
raise x509.verification.VerificationError("Certificate signature is invalid")
|
||||||
#cert_validexts(cert, [(x509.ExtensionOID.EXTENDED_KEY_USAGE, lambda e: x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in e)])
|
|
||||||
#print("Certificate is valid!")
|
cert_validtime(cert)
|
||||||
return True
|
|
||||||
except:
|
cert_validsubject(cert, [(NameOID.COMMON_NAME, user)])
|
||||||
#print("Certificate is invalid!")
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
sign <user> <fich> -- em que assina o conteúdo de <fich> usando a chave privada armazenada em <user>.key.
|
||||||
|
Deve produzir o ficheiro <fich>.sig contendo o par composto pela assinatura e certificado do assinante;
|
||||||
|
"""
|
||||||
|
def sign(user, filename):
|
||||||
|
|
||||||
|
with open(user + ".key", "rb") as key_file:
|
||||||
|
private_key = serialization.load_pem_private_key(key_file.read(), password=b'1234', backend=default_backend())
|
||||||
|
|
||||||
|
user_cert = cert_load(user + ".crt")
|
||||||
|
|
||||||
|
with open(filename, "rb") as file:
|
||||||
|
data_to_sign = file.read()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
signature = private_key.sign(data_to_sign,
|
||||||
|
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
|
||||||
|
salt_length=padding.PSS.MAX_LENGTH),
|
||||||
|
hashes.SHA256())
|
||||||
|
|
||||||
|
|
||||||
|
signature_and_cert = mkpair(signature, user_cert.public_bytes(serialization.Encoding.PEM))
|
||||||
|
|
||||||
|
with open(filename + ".sig", "wb") as sig_file:
|
||||||
|
sig_file.write(signature_and_cert)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
verify <fich>-- verifica a assinatura contida em <fich>.sig usando a
|
||||||
|
informação do signatário contida no certificado (também incluído em <fich>.sig).
|
||||||
|
Deve apresentar o status de validade da assinatura (Válida/Inválida) e,
|
||||||
|
no caso de ser válida, ainda os dados do signatário.
|
||||||
|
"""
|
||||||
|
def verify(filename, user):
|
||||||
|
|
||||||
|
with open(filename + ".sig", "rb") as sig_file:
|
||||||
|
signature_and_cert = sig_file.read()
|
||||||
|
|
||||||
|
signature, cert_bytes = unpair(signature_and_cert)
|
||||||
|
|
||||||
|
cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())
|
||||||
|
|
||||||
|
ca_cert = cert_load("EC.crt")
|
||||||
|
|
||||||
|
try:
|
||||||
|
valida_cert(cert, ca_cert, user)
|
||||||
|
|
||||||
|
with open(filename, "rb") as file:
|
||||||
|
data_to_verify = file.read()
|
||||||
|
|
||||||
|
cert.public_key().verify(signature, data_to_verify,
|
||||||
|
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
|
||||||
|
salt_length=padding.PSS.MAX_LENGTH),
|
||||||
|
hashes.SHA256())
|
||||||
|
|
||||||
|
print("Valid signature")
|
||||||
|
print(cert.subject)
|
||||||
|
|
||||||
|
except x509.verification.VerificationError as e:
|
||||||
|
print("Invalid signature: " + str(e))
|
||||||
|
return
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Sign or verify files using X.509 certificates.")
|
||||||
|
parser.add_argument("command", choices=["sign", "verify"], help="Command to execute: 'sign' or 'verify'")
|
||||||
|
parser.add_argument("user", help="Username")
|
||||||
|
parser.add_argument("filename", help="File name")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.command == "sign":
|
||||||
|
if os.path.exists(args.user + ".crt") and os.path.exists(args.user + ".key"):
|
||||||
|
sign(args.user, args.filename)
|
||||||
|
print("File signed successfully.")
|
||||||
|
else:
|
||||||
|
print("User certificate or private key not found.")
|
||||||
|
elif args.command == "verify":
|
||||||
|
if os.path.exists(args.filename + ".sig"):
|
||||||
|
verify(args.filename, args.user)
|
||||||
|
else:
|
||||||
|
print("Signature file not found.")
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue