Skip to content

Commit

Permalink
auth-py tests: test ECS in ALIAS forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
Habbie committed Jan 11, 2024
1 parent 75970d0 commit 93e5793
Showing 1 changed file with 42 additions and 2 deletions.
44 changes: 42 additions & 2 deletions regression-tests.auth-py/test_ALIAS.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import threading
import unittest
import clientsubnetoption

import dns
from twisted.internet.protocol import DatagramProtocol
Expand All @@ -20,6 +21,7 @@ class TestALIAS(AuthTest):
resolver=%s.1:5301
any-to-tcp=no
launch=bind
edns-subnet-processing=yes
"""

_config_params = ['_PREFIX']
Expand All @@ -34,7 +36,8 @@ class TestALIAS(AuthTest):
noerror.example.org. 3600 IN ALIAS noerror.example.com.
nxd.example.org. 3600 IN ALIAS nxd.example.com.
servfail.example.org. 3600 IN ALIAS servfail.example.com
servfail.example.org. 3600 IN ALIAS servfail.example.com.
subnet.example.org. 3600 IN ALIAS subnet.example.com.
""",
}

Expand Down Expand Up @@ -171,6 +174,30 @@ def testServFailTCP(self):
res = self.sendTCPQuery(query)
self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

def testECS(self):
expected_a = [dns.rrset.from_text('subnet.example.org.',
0, dns.rdataclass.IN, 'A',
'192.0.2.1')]
expected_aaaa = [dns.rrset.from_text('subnet.example.org.',
0, dns.rdataclass.IN, 'AAAA',
'2001:DB8::1')]

ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
ecso2 = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_a)
self.assertEqual(res.options[0], ecso2)

ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6:db5::', 64)
ecso2 = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_a)
self.assertEqual(res.options[0], ecso2)


class AliasUDPResponder(DatagramProtocol):
def datagramReceived(self, datagram, address):
Expand All @@ -183,7 +210,12 @@ def datagramReceived(self, datagram, address):
name = question.name
name_text = name.to_text()

if name_text == 'noerror.example.com.':
if name_text in ('noerror.example.com.', 'subnet.example.com.'):

do_ecs = False
if name_text == 'subnet.example.com.':
do_ecs=True

response.set_rcode(dns.rcode.NOERROR)
if question.rdtype in [dns.rdatatype.A,
dns.rdatatype.ANY]:
Expand All @@ -198,6 +230,14 @@ def datagramReceived(self, datagram, address):
dns.rrset.from_text(name,
0, dns.rdataclass.IN, 'AAAA',
'2001:DB8::1'))

if do_ecs:
if request.options[0].family == clientsubnetoption.FAMILY_IPV4:
ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
else:
ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
response.use_edns(edns=True, options=[ecso])

if name_text == 'nxd.example.com.':
response.set_rcode(dns.rcode.NXDOMAIN)
response.authority.append(
Expand Down

0 comments on commit 93e5793

Please sign in to comment.