fix typo in method call and add function for semicolons

This commit is contained in:
Ivan Schaller 2024-02-29 16:22:52 +01:00
parent 6b06cada38
commit afe182628a
2 changed files with 65 additions and 8 deletions

View File

@ -91,6 +91,23 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
return absolute_value return absolute_value
@staticmethod
def _fix_semicolon(value: str, escape: bool) -> str:
"""escape and un-escape semicolons in record values for netbox/octodns
@param value: the record value
@param escape: if set to true, all semicolons get escaped with a backslash.
if false it un-escapes all semicolons.
@return: the modified record value
"""
if escape:
value = re.sub(r"\\*;", "\\;", value)
else:
value = re.sub(r"\\*;", ";", value)
return value
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]: def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
"""get the correct netbox view when requested """get the correct netbox view when requested
@ -196,7 +213,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
} }
case "SPF" | "TXT": case "SPF" | "TXT":
value = re.sub(r"\\*;", "\\;", rcd_value) value = self._fix_semicolon(rcd_value, escape=True)
case "SRV": case "SRV":
value = { value = {
@ -236,8 +253,8 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
zone_id=nb_zone.id, status="active" zone_id=nb_zone.id, status="active"
) )
for nb_record in nb_records: for nb_record in nb_records:
rcd_name: str = nb_record.name if nb_record.name != "@" else "" rcd_name: str = "" if nb_record.name == "@" else nb_record.name
rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name rcd_value: str = nb_record.zone.name if nb_record.value == "@" else nb_record.value
rcd_type: str = nb_record.type rcd_type: str = nb_record.type
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
if nb_record.type == "NS": if nb_record.type == "NS":
@ -249,11 +266,10 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
"ttl": rcd_ttl, "ttl": rcd_ttl,
"values": [], "values": [],
} }
self.log.debug(f"working on record={rcd_data} -> {rcd_value}")
self.log.debug(f"record data={rcd_data}")
try: try:
rcd_rdata = self._format_rdata(nb_record, rcd_value) rcd_rdata = self._format_rdata(rcd_type, rcd_value)
except NotImplementedError: except NotImplementedError:
continue continue
@ -262,6 +278,8 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
records[(rcd_name, rcd_type)]["values"].append(rcd_rdata) records[(rcd_name, rcd_type)]["values"].append(rcd_rdata)
self.log.debug(f"record data={records[(rcd_name, rcd_type)]}")
return list(records.values()) return list(records.values())
def populate( def populate(
@ -353,7 +371,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
name=rcd_name, name=rcd_name,
type=change.new._type, type=change.new._type,
ttl=change.new.ttl, ttl=change.new.ttl,
value=re.sub(r"\\*;", ";", record), value=self._fix_semicolon(record, escape=False),
disable_ptr=self.disable_ptr, disable_ptr=self.disable_ptr,
) )
) )
@ -413,7 +431,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
name=rcd_name, name=rcd_name,
type=change.new._type, type=change.new._type,
ttl=change.new.ttl, ttl=change.new.ttl,
value=re.sub(r"\\*;", ";", record), value=self._fix_semicolon(record, escape=False),
disable_ptr=self.disable_ptr, disable_ptr=self.disable_ptr,
) )
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}") self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")

View File

@ -59,6 +59,45 @@ def test_txt2():
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com" assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
def test_txt3():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"v=DKIM1; k=rsa; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert (
value
== r"v=DKIM1\; k=rsa\; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
)
def test_txt4():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"t=y\;o=~\;"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"t=y\;o=~\;"
def test_txt5():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"t=y;o=~;"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"t=y\;o=~\;"
def test_txt4():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
def test_srv(): def test_srv():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG) nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "SRV" rcd_type = "SRV"