Compare commits
2 commits
6b06cada38
...
a8ecfab930
Author | SHA1 | Date | |
---|---|---|---|
a8ecfab930 | |||
afe182628a |
3 changed files with 140 additions and 8 deletions
|
@ -91,6 +91,23 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
||||||
|
|
||||||
return absolute_value
|
return absolute_value
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _fix_semicolon(value: str, escape: bool) -> str:
|
||||||
|
"""escape and un-escape semicolons in record values for netbox/octodns
|
||||||
|
|
||||||
|
@param value: the record value
|
||||||
|
@param escape: if set to true, all semicolons get escaped with a backslash.
|
||||||
|
if false it un-escapes all semicolons.
|
||||||
|
|
||||||
|
@return: the modified record value
|
||||||
|
"""
|
||||||
|
if escape:
|
||||||
|
value = re.sub(r"\\*;", "\\;", value)
|
||||||
|
else:
|
||||||
|
value = re.sub(r"\\*;", ";", value)
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
|
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
|
||||||
"""get the correct netbox view when requested
|
"""get the correct netbox view when requested
|
||||||
|
|
||||||
|
@ -196,7 +213,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
||||||
}
|
}
|
||||||
|
|
||||||
case "SPF" | "TXT":
|
case "SPF" | "TXT":
|
||||||
value = re.sub(r"\\*;", "\\;", rcd_value)
|
value = self._fix_semicolon(rcd_value, escape=True)
|
||||||
|
|
||||||
case "SRV":
|
case "SRV":
|
||||||
value = {
|
value = {
|
||||||
|
@ -236,8 +253,8 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
||||||
zone_id=nb_zone.id, status="active"
|
zone_id=nb_zone.id, status="active"
|
||||||
)
|
)
|
||||||
for nb_record in nb_records:
|
for nb_record in nb_records:
|
||||||
rcd_name: str = nb_record.name if nb_record.name != "@" else ""
|
rcd_name: str = "" if nb_record.name == "@" else nb_record.name
|
||||||
rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name
|
rcd_value: str = nb_record.zone.name if nb_record.value == "@" else nb_record.value
|
||||||
rcd_type: str = nb_record.type
|
rcd_type: str = nb_record.type
|
||||||
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
|
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
|
||||||
if nb_record.type == "NS":
|
if nb_record.type == "NS":
|
||||||
|
@ -249,11 +266,10 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
||||||
"ttl": rcd_ttl,
|
"ttl": rcd_ttl,
|
||||||
"values": [],
|
"values": [],
|
||||||
}
|
}
|
||||||
|
self.log.debug(f"working on record={rcd_data} -> {rcd_value}")
|
||||||
self.log.debug(f"record data={rcd_data}")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
rcd_rdata = self._format_rdata(nb_record, rcd_value)
|
rcd_rdata = self._format_rdata(rcd_type, rcd_value)
|
||||||
except NotImplementedError:
|
except NotImplementedError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -262,6 +278,8 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
||||||
|
|
||||||
records[(rcd_name, rcd_type)]["values"].append(rcd_rdata)
|
records[(rcd_name, rcd_type)]["values"].append(rcd_rdata)
|
||||||
|
|
||||||
|
self.log.debug(f"record data={records[(rcd_name, rcd_type)]}")
|
||||||
|
|
||||||
return list(records.values())
|
return list(records.values())
|
||||||
|
|
||||||
def populate(
|
def populate(
|
||||||
|
@ -353,7 +371,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
||||||
name=rcd_name,
|
name=rcd_name,
|
||||||
type=change.new._type,
|
type=change.new._type,
|
||||||
ttl=change.new.ttl,
|
ttl=change.new.ttl,
|
||||||
value=re.sub(r"\\*;", ";", record),
|
value=self._fix_semicolon(record, escape=False),
|
||||||
disable_ptr=self.disable_ptr,
|
disable_ptr=self.disable_ptr,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -413,7 +431,7 @@ class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
|
||||||
name=rcd_name,
|
name=rcd_name,
|
||||||
type=change.new._type,
|
type=change.new._type,
|
||||||
ttl=change.new.ttl,
|
ttl=change.new.ttl,
|
||||||
value=re.sub(r"\\*;", ";", record),
|
value=self._fix_semicolon(record, escape=False),
|
||||||
disable_ptr=self.disable_ptr,
|
disable_ptr=self.disable_ptr,
|
||||||
)
|
)
|
||||||
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")
|
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")
|
||||||
|
|
75
tests/test_escaple_semicolon.py
Normal file
75
tests/test_escaple_semicolon.py
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
from octodns_netbox_dns import NetBoxDNSProvider
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_CONFIG = {
|
||||||
|
"id": 1,
|
||||||
|
"url": "https://localhost:8000",
|
||||||
|
"token": "",
|
||||||
|
"view": False,
|
||||||
|
"replace_duplicates": False,
|
||||||
|
"make_absolute": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_escape1():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=True)
|
||||||
|
|
||||||
|
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||||
|
|
||||||
|
|
||||||
|
def test_escape2():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=True)
|
||||||
|
|
||||||
|
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||||
|
|
||||||
|
|
||||||
|
def test_escape3():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"t=y\;o=~\;"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=True)
|
||||||
|
|
||||||
|
assert value == r"t=y\;o=~\;"
|
||||||
|
|
||||||
|
|
||||||
|
def test_escape4():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"t=y;o=~;"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=True)
|
||||||
|
|
||||||
|
assert value == r"t=y\;o=~\;"
|
||||||
|
|
||||||
|
|
||||||
|
def test_unescape1():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=False)
|
||||||
|
|
||||||
|
assert value == r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
|
||||||
|
|
||||||
|
|
||||||
|
def test_unescape2():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=False)
|
||||||
|
|
||||||
|
assert value == r"v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
|
||||||
|
|
||||||
|
|
||||||
|
def test_unescape3():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"t=y\\;o=~\;"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=False)
|
||||||
|
|
||||||
|
assert value == r"t=y;o=~;"
|
||||||
|
|
||||||
|
|
||||||
|
def test_unescape4():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_value = r"t=y;o=~;"
|
||||||
|
value = nbdns._fix_semicolon(rcd_value, escape=False)
|
||||||
|
|
||||||
|
assert value == r"t=y;o=~;"
|
|
@ -59,6 +59,45 @@ def test_txt2():
|
||||||
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||||
|
|
||||||
|
|
||||||
|
def test_txt3():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_type = "TXT"
|
||||||
|
rcd_value = r"v=DKIM1; k=rsa; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
|
||||||
|
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
value
|
||||||
|
== r"v=DKIM1\; k=rsa\; p=/0f+sikE+k9ZKbn1BJu0/soWht/+Zd/nc/+Gy//mQ1B5sCKYKgAmYTSWkxRjFzkc6KAQhi+/IzaFogEV050wcscdC8Rc8lAQzDUFrMs2ZZK1vFtkwIDAQAB"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_txt4():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_type = "TXT"
|
||||||
|
rcd_value = r"t=y\;o=~\;"
|
||||||
|
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||||
|
|
||||||
|
assert value == r"t=y\;o=~\;"
|
||||||
|
|
||||||
|
|
||||||
|
def test_txt5():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_type = "TXT"
|
||||||
|
rcd_value = r"t=y;o=~;"
|
||||||
|
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||||
|
|
||||||
|
assert value == r"t=y\;o=~\;"
|
||||||
|
|
||||||
|
|
||||||
|
def test_txt4():
|
||||||
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
|
rcd_type = "TXT"
|
||||||
|
rcd_value = r"v=TLSRPTv1\\; rua=mailto:tlsrpt@example.com"
|
||||||
|
value = nbdns._format_rdata(rcd_type, rcd_value)
|
||||||
|
|
||||||
|
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
|
||||||
|
|
||||||
|
|
||||||
def test_srv():
|
def test_srv():
|
||||||
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
|
||||||
rcd_type = "SRV"
|
rcd_type = "SRV"
|
||||||
|
|
Loading…
Reference in a new issue