Compare commits
7 commits
6b06cada38
...
78448d2bde
Author | SHA1 | Date | |
---|---|---|---|
78448d2bde | |||
ff1a47cf8c | |||
91ba03af7f | |||
f38a1036b7 | |||
6e624a79a7 | |||
a8ecfab930 | |||
afe182628a |
3 changed files with 139 additions and 21 deletions
|
@ -1,5 +1,4 @@
|
|||
import logging
|
||||
import re
|
||||
from typing import Any, Literal
|
||||
|
||||
import dns.rdata
|
||||
|
@ -91,6 +90,16 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
|
||||
return absolute_value
|
||||
|
||||
def _escape_semicolon(self, value: str) -> str:
|
||||
fixed = value.replace(";", "\\;")
|
||||
self.log.debug(f"in='{value}', escaped='{fixed}'")
|
||||
return fixed
|
||||
|
||||
def _unescape_semicolon(self, value: str) -> str:
|
||||
fixed = value.replace("\\\\", "\\").replace("\\;", ";")
|
||||
self.log.debug(f"in='{value}', unescaped='{fixed}'")
|
||||
return fixed
|
||||
|
||||
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
|
||||
"""get the correct netbox view when requested
|
||||
|
||||
|
@ -196,7 +205,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
}
|
||||
|
||||
case "SPF" | "TXT":
|
||||
value = re.sub(r"\\*;", "\\;", rcd_value)
|
||||
value = self._escape_semicolon(rcd_value)
|
||||
|
||||
case "SRV":
|
||||
value = {
|
||||
|
@ -236,8 +245,8 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
zone_id=nb_zone.id, status="active"
|
||||
)
|
||||
for nb_record in nb_records:
|
||||
rcd_name: str = nb_record.name if nb_record.name != "@" else ""
|
||||
rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name
|
||||
rcd_name: str = "" if nb_record.name == "@" else nb_record.name
|
||||
rcd_value: str = nb_record.zone.name if nb_record.value == "@" else nb_record.value
|
||||
rcd_type: str = nb_record.type
|
||||
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
|
||||
if nb_record.type == "NS":
|
||||
|
@ -249,11 +258,10 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
"ttl": rcd_ttl,
|
||||
"values": [],
|
||||
}
|
||||
|
||||
self.log.debug(f"record data={rcd_data}")
|
||||
self.log.debug(f"working on record={rcd_data} -> {rcd_value}")
|
||||
|
||||
try:
|
||||
rcd_rdata = self._format_rdata(nb_record, rcd_value)
|
||||
rcd_rdata = self._format_rdata(rcd_type, rcd_value)
|
||||
except NotImplementedError:
|
||||
continue
|
||||
|
||||
|
@ -262,6 +270,8 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
|
||||
records[(rcd_name, rcd_type)]["values"].append(rcd_rdata)
|
||||
|
||||
self.log.debug(f"record data={records[(rcd_name, rcd_type)]}")
|
||||
|
||||
return list(records.values())
|
||||
|
||||
def populate(
|
||||
|
@ -329,6 +339,11 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
|
||||
return True
|
||||
|
||||
def _unescape_for_netbox(self, rcd_type: str, value: str) -> str:
|
||||
if rcd_type not in ["TXT", "SPF"]:
|
||||
return value
|
||||
return self._unescape_semicolon(value)
|
||||
|
||||
def _apply(self, plan: octodns.provider.plan.Plan) -> None:
|
||||
"""apply the changes to the NetBox DNS zone.
|
||||
|
||||
|
@ -347,17 +362,15 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
|
||||
new_changeset = self._format_changeset(change.new)
|
||||
for record in new_changeset:
|
||||
nb_record: pynetbox.core.response.Record = (
|
||||
self.log.debug(f"ADD {change.new._type} {rcd_name} {record}")
|
||||
self.api.plugins.netbox_dns.records.create(
|
||||
zone=nb_zone.id,
|
||||
name=rcd_name,
|
||||
type=change.new._type,
|
||||
ttl=change.new.ttl,
|
||||
value=re.sub(r"\\*;", ";", record),
|
||||
value=self._unescape_for_netbox(change.new._type, record),
|
||||
disable_ptr=self.disable_ptr,
|
||||
)
|
||||
)
|
||||
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")
|
||||
|
||||
case octodns.record.Delete():
|
||||
nb_records: pynetbox.core.response.RecordSet = (
|
||||
|
@ -401,19 +414,19 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
|||
)
|
||||
nb_record.delete()
|
||||
if nb_record.value in to_update:
|
||||
nb_record.ttl = change.new.ttl
|
||||
self.log.debug(
|
||||
f"MODIFY {nb_record.type} {nb_record.name} {nb_record.value}"
|
||||
)
|
||||
nb_record.ttl = change.new.ttl
|
||||
nb_record.save()
|
||||
|
||||
for record in to_create:
|
||||
self.log.debug(f"ADD {change.new._type} {rcd_name} {record}")
|
||||
nb_record = self.api.plugins.netbox_dns.records.create(
|
||||
zone=nb_zone.id,
|
||||
name=rcd_name,
|
||||
type=change.new._type,
|
||||
ttl=change.new.ttl,
|
||||
value=re.sub(r"\\*;", ";", record),
|
||||
value=self._unescape_for_netbox(change.new._type, record),
|
||||
disable_ptr=self.disable_ptr,
|
||||
)
|
||||
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")
|
||||
|
|
75
tests/test_escaple_semicolon.py
Normal file
75
tests/test_escaple_semicolon.py
Normal file
|
@ -0,0 +1,75 @@
|
|||
from octodns_netbox_dns import NetBoxDNSProvider
|
||||
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"id": 1,
|
||||
"url": "https://localhost:8000",
|
||||
"token": "",
|
||||
"view": False,
|
||||
"replace_duplicates": False,
|
||||
"make_absolute": True,
|
||||
}
|
||||
|
||||
|
||||
def test_escape1():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
|
||||
value = nbdns._escape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||
|
||||
|
||||
def test_escape2():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||
value = nbdns._escape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
|
||||
|
||||
|
||||
def test_escape3():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"t=y\;o=~\;"
|
||||
value = nbdns._escape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"t=y\\;o=~\\;"
|
||||
|
||||
|
||||
def test_escape4():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"t=y;o=~;"
|
||||
value = nbdns._escape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"t=y\;o=~\;"
|
||||
|
||||
|
||||
def test_unescape1():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||
value = nbdns._unescape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
|
||||
|
||||
|
||||
def test_unescape2():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
|
||||
value = nbdns._unescape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
|
||||
|
||||
|
||||
def test_unescape3():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"t=y\\;o=~\;"
|
||||
value = nbdns._unescape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"t=y;o=~;"
|
||||
|
||||
|
||||
def test_unescape4():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_value = r"t=y;o=~;"
|
||||
value = nbdns._unescape_semicolon(rcd_value)
|
||||
|
||||
assert value == r"t=y;o=~;"
|
|
@ -56,7 +56,37 @@ def test_txt2():
|
|||
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||
|
||||
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||
assert value == r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
|
||||
|
||||
|
||||
def test_txt3():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_type = "TXT"
|
||||
rcd_value = r"v=DKIM1; k=rsa; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
|
||||
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||
|
||||
assert (
|
||||
value
|
||||
== r"v=DKIM1\; k=rsa\; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
|
||||
)
|
||||
|
||||
|
||||
def test_txt4():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_type = "TXT"
|
||||
rcd_value = r"t=y\;o=~\;"
|
||||
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||
|
||||
assert value == r"t=y\\;o=~\\;"
|
||||
|
||||
|
||||
def test_txt5():
|
||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||
rcd_type = "TXT"
|
||||
rcd_value = r"t=y;o=~;"
|
||||
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||
|
||||
assert value == r"t=y\;o=~\;"
|
||||
|
||||
|
||||
def test_srv():
|
||||
|
|
Loading…
Reference in a new issue