Compare commits

..

No commits in common. "78448d2bde85fda1718c64e6cd8983b4fcb20fe0" and "6b06cada38751648b9365fd27b850aa2b6d250a8" have entirely different histories.

3 changed files with 21 additions and 139 deletions

View file

@ -1,4 +1,5 @@
import logging import logging
import re
from typing import Any, Literal from typing import Any, Literal
import dns.rdata import dns.rdata
@ -90,16 +91,6 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
return absolute_value return absolute_value
def _escape_semicolon(self, value: str) -> str:
fixed = value.replace(";", "\\;")
self.log.debug(f"in='{value}', escaped='{fixed}'")
return fixed
def _unescape_semicolon(self, value: str) -> str:
fixed = value.replace("\\\\", "\\").replace("\\;", ";")
self.log.debug(f"in='{value}', unescaped='{fixed}'")
return fixed
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]: def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
"""get the correct netbox view when requested """get the correct netbox view when requested
@ -205,7 +196,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
} }
case "SPF" | "TXT": case "SPF" | "TXT":
value = self._escape_semicolon(rcd_value) value = re.sub(r"\\*;", "\\;", rcd_value)
case "SRV": case "SRV":
value = { value = {
@ -245,8 +236,8 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
zone_id=nb_zone.id, status="active" zone_id=nb_zone.id, status="active"
) )
for nb_record in nb_records: for nb_record in nb_records:
rcd_name: str = "" if nb_record.name == "@" else nb_record.name rcd_name: str = nb_record.name if nb_record.name != "@" else ""
rcd_value: str = nb_record.zone.name if nb_record.value == "@" else nb_record.value rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name
rcd_type: str = nb_record.type rcd_type: str = nb_record.type
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
if nb_record.type == "NS": if nb_record.type == "NS":
@ -258,10 +249,11 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
"ttl": rcd_ttl, "ttl": rcd_ttl,
"values": [], "values": [],
} }
self.log.debug(f"working on record={rcd_data} -> {rcd_value}")
self.log.debug(f"record data={rcd_data}")
try: try:
rcd_rdata = self._format_rdata(rcd_type, rcd_value) rcd_rdata = self._format_rdata(nb_record, rcd_value)
except NotImplementedError: except NotImplementedError:
continue continue
@ -270,8 +262,6 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
records[(rcd_name, rcd_type)]["values"].append(rcd_rdata) records[(rcd_name, rcd_type)]["values"].append(rcd_rdata)
self.log.debug(f"record data={records[(rcd_name, rcd_type)]}")
return list(records.values()) return list(records.values())
def populate( def populate(
@ -339,11 +329,6 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
return True return True
def _unescape_for_netbox(self, rcd_type: str, value: str) -> str:
if rcd_type not in ["TXT", "SPF"]:
return value
return self._unescape_semicolon(value)
def _apply(self, plan: octodns.provider.plan.Plan) -> None: def _apply(self, plan: octodns.provider.plan.Plan) -> None:
"""apply the changes to the NetBox DNS zone. """apply the changes to the NetBox DNS zone.
@ -362,15 +347,17 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
new_changeset = self._format_changeset(change.new) new_changeset = self._format_changeset(change.new)
for record in new_changeset: for record in new_changeset:
self.log.debug(f"ADD {change.new._type} {rcd_name} {record}") nb_record: pynetbox.core.response.Record = (
self.api.plugins.netbox_dns.records.create( self.api.plugins.netbox_dns.records.create(
zone=nb_zone.id, zone=nb_zone.id,
name=rcd_name, name=rcd_name,
type=change.new._type, type=change.new._type,
ttl=change.new.ttl, ttl=change.new.ttl,
value=self._unescape_for_netbox(change.new._type, record), value=re.sub(r"\\*;", ";", record),
disable_ptr=self.disable_ptr, disable_ptr=self.disable_ptr,
)
) )
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")
case octodns.record.Delete(): case octodns.record.Delete():
nb_records: pynetbox.core.response.RecordSet = ( nb_records: pynetbox.core.response.RecordSet = (
@ -414,19 +401,19 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
) )
nb_record.delete() nb_record.delete()
if nb_record.value in to_update: if nb_record.value in to_update:
nb_record.ttl = change.new.ttl
self.log.debug( self.log.debug(
f"MODIFY {nb_record.type} {nb_record.name} {nb_record.value}" f"MODIFY {nb_record.type} {nb_record.name} {nb_record.value}"
) )
nb_record.ttl = change.new.ttl
nb_record.save() nb_record.save()
for record in to_create: for record in to_create:
self.log.debug(f"ADD {change.new._type} {rcd_name} {record}")
nb_record = self.api.plugins.netbox_dns.records.create( nb_record = self.api.plugins.netbox_dns.records.create(
zone=nb_zone.id, zone=nb_zone.id,
name=rcd_name, name=rcd_name,
type=change.new._type, type=change.new._type,
ttl=change.new.ttl, ttl=change.new.ttl,
value=self._unescape_for_netbox(change.new._type, record), value=re.sub(r"\\*;", ";", record),
disable_ptr=self.disable_ptr, disable_ptr=self.disable_ptr,
) )
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")

View file

@ -1,75 +0,0 @@
from octodns_netbox_dns import NetBoxDNSProvider
DEFAULT_CONFIG = {
"id": 1,
"url": "https://localhost:8000",
"token": "",
"view": False,
"replace_duplicates": False,
"make_absolute": True,
}
def test_escape1():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
value = nbdns._escape_semicolon(rcd_value)
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
def test_escape2():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
value = nbdns._escape_semicolon(rcd_value)
assert value == r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
def test_escape3():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"t=y\;o=~\;"
value = nbdns._escape_semicolon(rcd_value)
assert value == r"t=y\\;o=~\\;"
def test_escape4():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"t=y;o=~;"
value = nbdns._escape_semicolon(rcd_value)
assert value == r"t=y\;o=~\;"
def test_unescape1():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
value = nbdns._unescape_semicolon(rcd_value)
assert value == r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
def test_unescape2():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
value = nbdns._unescape_semicolon(rcd_value)
assert value == r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
def test_unescape3():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"t=y\\;o=~\;"
value = nbdns._unescape_semicolon(rcd_value)
assert value == r"t=y;o=~;"
def test_unescape4():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_value = r"t=y;o=~;"
value = nbdns._unescape_semicolon(rcd_value)
assert value == r"t=y;o=~;"

View file

@ -56,37 +56,7 @@ def test_txt2():
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com" rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
value = nbdns._format_rdata(rcd_type, rcd_value) value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com" assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
def test_txt3():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"v=DKIM1; k=rsa; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert (
value
== r"v=DKIM1\; k=rsa\; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
)
def test_txt4():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"t=y\;o=~\;"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"t=y\\;o=~\\;"
def test_txt5():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"t=y;o=~;"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"t=y\;o=~\;"
def test_srv(): def test_srv():