From b1726e0c05b1fcbdf9e8fd65cd75fceae135acaa Mon Sep 17 00:00:00 2001 From: Chris Hofstaedtler Date: Sun, 8 Sep 2024 00:45:42 +0200 Subject: [PATCH] auth ALIAS: add DNSSEC tests --- regression-tests.auth-py/authtests.py | 80 +++++++++++++++++++ regression-tests.auth-py/requirements.txt | 1 + regression-tests.auth-py/test_ALIAS.py | 34 +++++--- regression-tests.auth-py/test_GSSTSIG.py | 3 + .../test_XFRIncomplete.py | 2 + 5 files changed, 108 insertions(+), 12 deletions(-) diff --git a/regression-tests.auth-py/authtests.py b/regression-tests.auth-py/authtests.py index 5eb6534df8fe..89e2daa51363 100644 --- a/regression-tests.auth-py/authtests.py +++ b/regression-tests.auth-py/authtests.py @@ -137,6 +137,68 @@ def generateAuthConfig(cls, confdir): except subprocess.CalledProcessError as e: raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output)) + @classmethod + def listKeys(cls, confdir: str, zonename: str): + zone = '.' if zonename == 'ROOT' else zonename + pdnsutilCmd = [os.environ['PDNSUTIL'], + '--config-dir=%s' % confdir, + 'list-keys', + zone] + + print(' '.join(pdnsutilCmd)) + try: + lines = subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output)) + + keys = [] + found_header = False + for line in lines.splitlines(): + if not found_header: + if line.startswith(b"----"): + found_header = True + continue + line = line.decode().split() + if not (len(line) >= 8 and line[1] in ("CSK", "KSK", "ZSK")): + continue + print(line) + keys.append( + { + "zone": line[0], + "type": line[1], + "act": line[2], + "pub": line[3], + "size": int(line[4]), + "algorithm": line[5], + "id": int(line[6]), + "location": line[7], + "keytag": int(line[8]), + }) + return keys + + @classmethod + def exportZoneDnsKey(cls, confdir: str, zonename: str, keyid: int): + zone = '.' if zonename == 'ROOT' else zonename + pdnsutilCmd = [os.environ['PDNSUTIL'], + '--config-dir=%s' % confdir, + 'export-zone-dnskey', + zone, + str(keyid)] + + print(' '.join(pdnsutilCmd)) + try: + lines = subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output)) + + for line in lines.splitlines(): + line = line.strip().decode().split(maxsplit=3) + print(line) + if line[2] == "DNSKEY": + _, rdclass, rdtype, key = line + return dns.rrset.from_text(dns.name.from_text(zone), 3600, rdclass, rdtype, key) + return None + @classmethod def secureZone(cls, confdir, zonename, key=None): zone = '.' if zonename == 'ROOT' else zonename @@ -168,6 +230,7 @@ def secureZone(cls, confdir, zonename, key=None): def generateAllAuthConfig(cls, confdir): cls.generateAuthConfig(confdir) cls.generateAuthNamedConf(confdir, cls._zones.keys()) + cls._zone_dnskeys = {} for zonename, zonecontent in cls._zones.items(): cls.generateAuthZone(confdir, @@ -175,6 +238,10 @@ def generateAllAuthConfig(cls, confdir): zonecontent) if cls._zone_keys.get(zonename, None): cls.secureZone(confdir, zonename, cls._zone_keys.get(zonename)) + zone_keys = cls.listKeys(confdir, zonename) + cls._zone_dnskeys[dns.name.from_text(zonename)] = cls.exportZoneDnsKey( + confdir, zonename, zone_keys[-1]["id"] + ) @classmethod def waitForTCPSocket(cls, ipaddress, port): @@ -590,3 +657,16 @@ def assertAuthorityHasSOA(self, msg): if not found: raise AssertionError("No SOA record found in the authority section:\n%s" % msg.to_text()) + + def assertSigned(self, msg: dns.message.Message, name: str | dns.name.Name, rdatatype: str | int): + if not isinstance(msg, dns.message.Message): + raise TypeError("msg is not a dns.message.Message but a %s" % type(msg)) + + name = dns.name.from_text(name) + + if not isinstance(rdatatype, int): + rdatatype = dns.rdatatype.from_text(rdatatype) + + rrset = msg.find_rrset(msg.answer, name, dns.rdataclass.IN, rdatatype) + rrsig = msg.find_rrset(msg.answer, name, dns.rdataclass.IN, dns.rdatatype.RRSIG, covers=rdatatype) + dns.dnssec.validate(rrset, rrsig, self._zone_dnskeys) diff --git a/regression-tests.auth-py/requirements.txt b/regression-tests.auth-py/requirements.txt index f1236ed89f9b..aaa56746ff34 100644 --- a/regression-tests.auth-py/requirements.txt +++ b/regression-tests.auth-py/requirements.txt @@ -1,3 +1,4 @@ +cryptography dnspython==2.1.0 pytest Twisted>0.15.0 diff --git a/regression-tests.auth-py/test_ALIAS.py b/regression-tests.auth-py/test_ALIAS.py index 8cd7f4f3deb7..8bb8088e7865 100644 --- a/regression-tests.auth-py/test_ALIAS.py +++ b/regression-tests.auth-py/test_ALIAS.py @@ -62,32 +62,37 @@ def startResponders(cls): cls._ALIASResponder.start() def testNoError(self): - expected_a = [dns.rrset.from_text('noerror.example.org.', + name = 'noerror.example.org.' + expected_a = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')] - expected_aaaa = [dns.rrset.from_text('noerror.example.org.', + expected_aaaa = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'AAAA', '2001:DB8::1')] - query = dns.message.make_query('noerror.example.org', 'A') + query = dns.message.make_query(name, 'A', want_dnssec=True) res = self.sendUDPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertAnyRRsetInAnswer(res, expected_a) self.assertEqual(len(res.options), 0) # this checks that we don't invent ECS on non-ECS queries + self.assertSigned(res, name, dns.rdatatype.A) - query = dns.message.make_query('noerror.example.org', 'AAAA') + query = dns.message.make_query(name, 'AAAA', want_dnssec=True) res = self.sendUDPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertAnyRRsetInAnswer(res, expected_aaaa) + self.assertSigned(res, name, dns.rdatatype.AAAA) - query = dns.message.make_query('noerror.example.org', 'ANY') + query = dns.message.make_query(name, 'ANY', want_dnssec=True) res = self.sendUDPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertAnyRRsetInAnswer(res, expected_a) self.assertAnyRRsetInAnswer(res, expected_aaaa) + self.assertSigned(res, name, dns.rdatatype.A) + self.assertSigned(res, name, dns.rdatatype.AAAA) # NODATA - query = dns.message.make_query('noerror.example.org', 'MX') + query = dns.message.make_query(name, 'MX') res = self.sendUDPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertEqual(len(res.answer), 0) @@ -121,31 +126,36 @@ def testServFail(self): self.assertRcodeEqual(res, dns.rcode.SERVFAIL) def testNoErrorTCP(self): - expected_a = [dns.rrset.from_text('noerror.example.org.', + name = 'noerror.example.org.' + expected_a = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')] - expected_aaaa = [dns.rrset.from_text('noerror.example.org.', + expected_aaaa = [dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'AAAA', '2001:DB8::1')] - query = dns.message.make_query('noerror.example.org', 'A') + query = dns.message.make_query(name, 'A', want_dnssec=True) res = self.sendTCPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertAnyRRsetInAnswer(res, expected_a) + self.assertSigned(res, name, dns.rdatatype.A) - query = dns.message.make_query('noerror.example.org', 'AAAA') + query = dns.message.make_query(name, 'AAAA', want_dnssec=True) res = self.sendTCPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertAnyRRsetInAnswer(res, expected_aaaa) + self.assertSigned(res, name, dns.rdatatype.AAAA) - query = dns.message.make_query('noerror.example.org', 'ANY') + query = dns.message.make_query(name, 'ANY', want_dnssec=True) res = self.sendTCPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertAnyRRsetInAnswer(res, expected_a) self.assertAnyRRsetInAnswer(res, expected_aaaa) + self.assertSigned(res, name, dns.rdatatype.A) + self.assertSigned(res, name, dns.rdatatype.AAAA) # NODATA - query = dns.message.make_query('noerror.example.org', 'MX') + query = dns.message.make_query(name, 'MX', want_dnssec=True) res = self.sendTCPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertEqual(len(res.answer), 0) diff --git a/regression-tests.auth-py/test_GSSTSIG.py b/regression-tests.auth-py/test_GSSTSIG.py index 2297f46c2a98..ec30f2089c3f 100644 --- a/regression-tests.auth-py/test_GSSTSIG.py +++ b/regression-tests.auth-py/test_GSSTSIG.py @@ -33,6 +33,9 @@ class GSSTSIGBase(AuthTest): 'KRB5_KTNAME' : './kerberos-client/kt.keytab' } + # zones will be created by our own setUpClass() code + _zones = {} + @classmethod def setUpClass(cls): super(GSSTSIGBase, cls).setUpClass() diff --git a/regression-tests.auth-py/test_XFRIncomplete.py b/regression-tests.auth-py/test_XFRIncomplete.py index 3db145e293f1..effbf19f3653 100644 --- a/regression-tests.auth-py/test_XFRIncomplete.py +++ b/regression-tests.auth-py/test_XFRIncomplete.py @@ -154,6 +154,8 @@ class XFRIncompleteAuthTest(AuthTest): #axfr-fetch-timeout=20 """ + _zones = {} # zone setup happens in setUpClass + @classmethod def setUpClass(cls): super(XFRIncompleteAuthTest, cls).setUpClass()