Skip to content

Commit

Permalink
chg: usr: Add filtering to certificate retrieval tool.
Browse files Browse the repository at this point in the history
Close #39
  • Loading branch information
ashdwilson committed Mar 2, 2021
1 parent cf7d089 commit 1cd8e05
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 26 deletions.
56 changes: 48 additions & 8 deletions dane_discovery/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,23 +306,63 @@ def process_tlsa(cls, tlsa_record):
retval["public_key_object"] = retval["certificate_object"].public_key()
return retval

def get_all_pkix_cd_certificates(self):
def get_all_certificates(self, filters=[]):
"""Return a dictionary of all PKIX-CD certificates for this identity.
This method uses available methods for validating certificates retrieved
from TLSA records associated with the identity's DNS name.
For DANE-EE, we really just care that it parses and it was delivered with
DNSSEC.
For PKIX-EE, we require delivery to be protected by DNSSEC. In the future,
when the Python cryptography library supports full PKIX validation, we will
also include PKIX validation. https://github.com/pyca/cryptography/issues/2381
For PKIX-CD, we require that the trust chain be represented out-of-band in
accordance with the proposed standard for certificate and trust chain discovery.
Keyword args:
filters (list): List of filters for specific DANE certificate usages.
Valid filters are: "DANE-EE", "PKIX-EE", "PKIX-CD".
Return:
dict: Dictionary key is ``${DNSNAME}-${CERTHASH}``, and the value is the
the PEM-encoded certificate.
"""
retval = {}
# Bail if a bad filter is used.
if filters:
for filter_val in filters:
if filter_val not in ["DANE-EE", "PKIX-EE", "PKIX-CD"]:
raise ValueError("Invalid filter: {}".format(filter_val))
else:
filters = ["DANE-EE", "PKIX-EE", "PKIX-CD"]
# Iterate and authenticate
for cred in self.dane_credentials:
tlsa = cred["tlsa_parsed"]
if (tlsa["certificate_usage"] == 4
and tlsa["selector"] == 0
and tlsa["matching_type"] == 0):
id_name = self.dnsname
cert_obj = cred["certificate_object"]
cert_pem = cert_obj.public_bytes(serialization.Encoding.PEM)
cert_hash = DANE.generate_sha_by_selector(cert_pem, "sha256", 0)
# If it's not a full cert, skip
if not tlsa["matching_type"] == 0:
continue
id_name = self.dnsname
cert_obj = cred["certificate_object"]
cert_pem = cert_obj.public_bytes(serialization.Encoding.PEM)
cert_hash = DANE.generate_sha_by_selector(cert_pem, "sha256", 0)
# Validate for PKIX-CD
if (tlsa["certificate_usage"] == 4 and "PKIX-CD" in filters):
valid, _ = self.validate_pkix_cd(cert_obj, cred)
if not valid:
continue
retval["{}-{}".format(id_name, cert_hash)] = cert_pem
# Validate for DANE-EE (delivered via DNSSEC?)
if (tlsa["certificate_usage"] == 3 and "DANE-EE" in filters):
if not self.dnssec:
continue
retval["{}-{}".format(id_name, cert_hash)] = cert_pem
# Validate for PKIX-EE (delivered via DNSSEC?)
if (tlsa["certificate_usage"] == 1 and "PKIX-EE" in filters):
if not self.dnssec:
continue
retval["{}-{}".format(id_name, cert_hash)] = cert_pem
return retval

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dane_discovery.identity import Identity


parser = argparse.ArgumentParser("Authenticate a local certificate using PKIX-CD")
parser = argparse.ArgumentParser(description="Authenticate a local certificate using PKIX-CD")
parser.add_argument("--certificate_path", dest="cert_path", required=True, help="Path to certificate")
parser.add_argument("--identity_name", dest="dnsname", required=False, help="Identity DNS name")
parser.add_argument("--silent", dest="silent", action="store_true", help="No output, only exit code")
Expand Down
6 changes: 3 additions & 3 deletions dane_discovery/scripts/dane_discovery_get_ca_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@



parser = argparse.ArgumentParser("Retrieve and store all CA certificates required for PKIX-CD authentication of an identity.")
parser = argparse.ArgumentParser(description="Retrieve and store all CA certificates required for PKIX-CD authentication of an identity.")
parser.add_argument("--output_path", dest="out_path", required=True, help="Output path for certificate bundle")
parser.add_argument("--separate_files", dest="separate_files", required=False,
action="store_true", help=("This will use --output_path as"
Expand All @@ -18,12 +18,12 @@
parser.set_defaults(separate_files=False)

def main():
"""Wrap functionality provided by Identity.get_all_pkix_cd_certificates()"""
"""Wrap functionality provided by Identity.get_all_certificates()"""
# Parse args
args = parser.parse_args()
# Get PKIX-CD certs from DNS
identity = Identity(args.dnsname)
ee_certs = identity.get_all_pkix_cd_certificates()
ee_certs = identity.get_all_certificates(filters=["PKIX-CD"])
# Get the CA certificates for the EE certs
certs = {}
for _, ee_cert_pem in ee_certs.items():
Expand Down
21 changes: 17 additions & 4 deletions dane_discovery/scripts/dane_discovery_get_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,34 @@



parser = argparse.ArgumentParser("Retrieve and store all PKIX-CD certificates for an identity.")
parser = argparse.ArgumentParser(description=("Retrieve, authenticate, and store all certificates for a DANE identity.\n"
"Default behavior retrieves and authenticates all available entity certificates."
"Adding filters for specific types (--pkix-cd, for instance) limits output to those types."))
parser.add_argument("--output_path", dest="out_path", required=True, help="Output path for certificate bundle")
parser.add_argument("--separate_files", dest="separate_files", required=False,
action="store_true", help=("This will use --output_path as"
"a directory for writing individual certificate files."))
parser.add_argument("--identity_name", dest="dnsname", required=True, help="Identity DNS name")
parser.set_defaults(separate_files=False)
# Filters by type
parser.add_argument("--dane_ee", dest="filter_dane_ee", required=False, action="store_true", help="Include DANE-EE.")
parser.add_argument("--pkix_ee", dest="filter_pkix_ee", required=False, action="store_true", help="Include PKIX-EE.")
parser.add_argument("--pkix_cd", dest="filter_pkix_cd", required=False, action="store_true", help="Include PKIX-CD.")
parser.set_defaults(separate_files=False, filter_dane_ee=False, filter_pkix_ee=False, filter_pkix_cd=False)

def main():
"""Wrap functionality provided by Identity.get_all_pkix_cd_certificates()"""
"""Wrap functionality provided by Identity.get_all_certificates()"""
# Parse args
args = parser.parse_args()
filters = []
if args.filter_dane_ee:
filters.append("DANE-EE")
if args.filter_pkix_ee:
filters.append("PKIX-EE")
if args.filter_pkix_cd:
filters.append("PKIX-CD")
# Get PKIX-CD certs from DNS
identity = Identity(args.dnsname)
certs = identity.get_all_pkix_cd_certificates()
certs = identity.get_all_certificates(filters=filters)
# Write out files
if args.separate_files:
write_individual_certs(certs, args.out_path)
Expand Down
7 changes: 5 additions & 2 deletions docs/source/dane_pkix_cd_authenticate_certificate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ Local Certificate Authentication

::

dane_pkix_cd_authenticate_certificate -h
usage: Authenticate a local certificate using PKIX-CD [-h] --certificate_path CERT_PATH [--identity_name DNSNAME] [--silent]
dane_discovery_authenticate_certificate -h
usage: dane_discovery_authenticate_certificate [-h] --certificate_path CERT_PATH [--identity_name DNSNAME] [--silent]

Authenticate a local certificate using PKIX-CD

optional arguments:
-h, --help show this help message and exit
Expand All @@ -16,3 +18,4 @@ Local Certificate Authentication
Identity DNS name
--silent No output, only exit code


9 changes: 6 additions & 3 deletions docs/source/dane_pkix_cd_get_ca_certificates.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ Download all CA certificates for an identity

::

dane_pkix_cd_get_ca_certificates -h
usage: Retrieve and store all CA certificates required for PKIX-CD authentication of an identity. [-h] --output_path OUT_PATH [--separate_files] --identity_name DNSNAME
dane_discovery_get_ca_certificates -h
usage: dane_discovery_get_ca_certificates [-h] --output_path OUT_PATH [--separate_files] --identity_name DNSNAME

Retrieve and store all CA certificates required for PKIX-CD authentication of an identity.

optional arguments:
-h, --help show this help message and exit
--output_path OUT_PATH
Output path for certificate bundle
--separate_files This will use --output_path asa directory for writing individual certificate files.
--separate_files This will use --output_path asa directory for writing individual certificate files. Individual CA certificate files will be named
AUTHORITY_HOSTNAME-CA-subjectKeyID.crt.pem
--identity_name DNSNAME
Identity DNS name
11 changes: 9 additions & 2 deletions docs/source/dane_pkix_cd_get_certificates.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ Retrieve certificates from DNS

::

dane_pkix_cd_get_certificates -h
usage: Retrieve and store all PKIX-CD certificates for an identity. [-h] --output_path OUT_PATH [--separate_files] --identity_name DNSNAME
dane_discovery_get_certificates -h
usage: dane_discovery_get_certificates [-h] --output_path OUT_PATH [--separate_files] --identity_name DNSNAME [--dane_ee] [--pkix_ee] [--pkix_cd]

Retrieve, authenticate, and store all certificates for a DANE identity. Default behavior retrieves and authenticates all available entity certificates. Adding
filters for specific types (--pkix-cd, for instance) limits output to those types.

optional arguments:
-h, --help show this help message and exit
Expand All @@ -15,3 +18,7 @@ Retrieve certificates from DNS
--separate_files This will use --output_path asa directory for writing individual certificate files.
--identity_name DNSNAME
Identity DNS name
--dane_ee Include DANE-EE.
--pkix_ee Include PKIX-EE.
--pkix_cd Include PKIX-CD.

41 changes: 38 additions & 3 deletions tests/integration/test_integration_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,55 @@ def test_integration_identity_validate_certificate_pkix_cd_dnssec_fail(self, req
content=ca_certificate)
assert identity.validate_certificate(certificate)

def test_integration_identity_get_all_pkixcd_for_identity(self, requests_mock):
def test_integration_identity_get_all_certs_for_identity(self, requests_mock):
"""Test retrieval of all PKIX-CD certs for an identity."""
identity_name1 = ecc_identity_name
identity_name2 = rsa_identity_name
identity = Identity(identity_name1)
tlsa_dict1 = DANE.process_response(self.tlsa_for_cert(identity_name1, 4, 0, 0))
tlsa_dict2 = DANE.process_response(self.tlsa_for_cert(identity_name2, 4, 0, 0))
tlsa_dict3 = DANE.process_response(self.tlsa_for_cert(identity_name1, 3, 0, 0))
tlsa_dict4 = DANE.process_response(self.tlsa_for_cert(identity_name1, 1, 0, 0))
identity.dane_credentials = [identity.process_tlsa(record) for record
in [tlsa_dict1, tlsa_dict2]]
in [tlsa_dict1, tlsa_dict2, tlsa_dict3, tlsa_dict4]]
identity.tls = True
identity.tcp = True
identity.dnssec = True
certs = identity.get_all_pkix_cd_certificates()
ca_certificate = self.get_dyn_asset(ca_certificate_name)
certificate_path = self.get_path_for_dyn_asset("{}.cert.pem".format(identity_name1))
certificate = self.get_dyn_asset(certificate_path)
# Both identities have the same CA.
aki = DANE.get_authority_key_id_from_certificate(certificate)
requests_mock.get("https://authority.device.example.net/ca/{}.pem".format(aki),
content=ca_certificate)
certs = identity.get_all_certificates()
# We only have two UNIQUE certs, across four TLSA records.
assert len(certs) == 2
cert_list = [y for x, y in certs.items()]
assert cert_list[0] != cert_list[1]

def test_integration_identity_get_all_certs_for_identity_filtered(self, requests_mock):
"""Test retrieval of all PKIX-CD certs for an identity."""
identity_name1 = ecc_identity_name
identity_name2 = rsa_identity_name
identity = Identity(identity_name1)
tlsa_dict1 = DANE.process_response(self.tlsa_for_cert(identity_name1, 4, 0, 0))
tlsa_dict2 = DANE.process_response(self.tlsa_for_cert(identity_name2, 4, 0, 0))
tlsa_dict3 = DANE.process_response(self.tlsa_for_cert(identity_name1, 3, 0, 0))
tlsa_dict4 = DANE.process_response(self.tlsa_for_cert(identity_name1, 1, 0, 0))
identity.dane_credentials = [identity.process_tlsa(record) for record
in [tlsa_dict1, tlsa_dict2, tlsa_dict3, tlsa_dict4]]
identity.tls = True
identity.tcp = True
identity.dnssec = True
ca_certificate = self.get_dyn_asset(ca_certificate_name)
certificate_path = self.get_path_for_dyn_asset("{}.cert.pem".format(identity_name1))
certificate = self.get_dyn_asset(certificate_path)
# Both identities have the same CA.
aki = DANE.get_authority_key_id_from_certificate(certificate)
requests_mock.get("https://authority.device.example.net/ca/{}.pem".format(aki),
content=ca_certificate)
certs = identity.get_all_certificates(filters=["PKIX-EE"])
# We only have one PKIX-EE cert.
assert len(certs) == 1

0 comments on commit 1cd8e05

Please sign in to comment.