Compare commits

..

12 commits

Author SHA1 Message Date
c62453b0ac
Merge pull request #3 from olofvndrhr/dev
All checks were successful
check code / check-code (push) Successful in 28s
merge dev into main
2024-02-29 12:52:28 +01:00
0fb38b9ae5 add tests and move multiple replaces to regex replace
All checks were successful
check code / check-code (push) Successful in 33s
2024-02-29 12:48:50 +01:00
6953643876 Update dependency octodns to v1.5.0
All checks were successful
check code / check-code (push) Successful in 33s
2024-02-29 09:22:49 +01:00
537cd2061b Update lscr.io/linuxserver/netbox Docker tag to v3.7.3 2024-02-29 09:22:49 +01:00
6326903bb3 add typehint for __init__
All checks were successful
check code / check-code (push) Successful in 29s
Signed-off-by: Ivan Schaller <ivan@schaller.sh>
2024-02-28 16:20:46 +01:00
408cf3fdd2 rename class and add new tests
All checks were successful
check code / check-code (push) Successful in 32s
2024-02-28 16:07:32 +01:00
29cf05c888 fix definition
All checks were successful
check code / check-code (push) Successful in 32s
2024-02-28 14:38:08 +01:00
59a9b01fef rename some variables
Some checks failed
check code / check-code (push) Failing after 21s
2024-02-28 14:33:53 +01:00
6612daeee7 fix some errors in record filtering 2024-02-28 14:28:42 +01:00
792b9d5429 simplify some code 2024-02-28 14:07:30 +01:00
e27f8938a5 add dev branch to tests
All checks were successful
check code / check-code (push) Successful in 37s
2024-02-21 16:23:21 +01:00
284f001236 first tests with provider support 2024-02-21 16:21:25 +01:00
11 changed files with 312 additions and 60 deletions

View file

@ -2,10 +2,10 @@ name: check code
on: on:
push: push:
branches: [main, master] branches: [main, master, dev]
pull_request: pull_request:
branches: [main, master] branches: [main, master, dev]
jobs: jobs:
check-code: check-code:

View file

@ -2,10 +2,10 @@ name: check code
on: on:
push: push:
branches: [main, master] branches: [main, master, dev]
pull_request: pull_request:
branches: [main, master] branches: [main, master, dev]
jobs: jobs:
check-code: check-code:

View file

@ -1,4 +1,4 @@
# netbox-plugin-dns source for octodns # netbox-plugin-dns provider for octodns
> works with https://github.com/peteeckel/netbox-plugin-dns > works with https://github.com/peteeckel/netbox-plugin-dns
@ -7,7 +7,7 @@
```yml ```yml
providers: providers:
config: config:
class: octodns_netbox_dns.NetBoxDNSSource class: octodns_netbox_dns.NetBoxDNSProvider
# Netbox url # Netbox url
# [mandatory, default=null] # [mandatory, default=null]
url: "https://some-url" url: "https://some-url"
@ -27,8 +27,19 @@ providers:
# Make CNAME, MX and SRV records absolute if they are missing the trailing "." # Make CNAME, MX and SRV records absolute if they are missing the trailing "."
# [optional, default=false] # [optional, default=false]
make_absolute: false make_absolute: false
# Disable automatic PTR record creating in the NetboxDNS plugin.
# [optional, default=true]
disable_ptr: true
``` ```
## compatibility
> actively tested on the newest netbox-plugin-dns and netbox versions
| provider | [netbox-plugin-dns](https://github.com/peteeckel/netbox-plugin-dns) | [netbox](https://github.com/netbox-community/netbox) |
|-------------|---------------------------------------------------------------------|------------------------------------------------------|
| `>= v0.3.3` | `>=0.21.0` | `>=3.6.0` |
## install ## install
### via pip ### via pip

View file

@ -27,7 +27,7 @@ providers:
enforce_order: true enforce_order: true
populate_should_replace: false populate_should_replace: false
netbox: netbox:
class: octodns_netbox_dns.NetBoxDNSSource class: octodns_netbox_dns.NetBoxDNSProvider
url: http://localhost:8000 url: http://localhost:8000
token: 1ca8f8de1d651b0859052dc5e6a0858fd1e43e3d # change token for netbox token: 1ca8f8de1d651b0859052dc5e6a0858fd1e43e3d # change token for netbox
view: false view: false

View file

@ -1,11 +1,10 @@
--- ---
? '' ? ''
: : ttl: 172800
ttl: 172800
type: NS type: NS
values: values:
- ns1.example.com. - ns1.example.com.
- ns2.example.com. - ns3.example.com.
ns1: ns1:
type: A type: A
value: 192.168.1.1 value: 192.168.1.1
@ -21,3 +20,6 @@ record11:
record12: record12:
type: A type: A
value: 192.168.1.12 value: 192.168.1.12
x._domainkey:
type: TXT
value: v=DKIM1\; k=rsa\; p=MIIBIjANBgkasdasdasIIBCgKCAQEAq7OOAhfjaOKMSiJR8xkG+sadasdasd+OiWdZEZ7T4blBQxuWTNGoaG1CKOFeJSf72JAlqxF++z2CB4ypdUOoRNn96KlcpI2LBmoW1c7ZzFzqPvgCs+EzaOnt5S2FH2njHb15+atdE8cuZ7+MGmHf5HSD/asdasdasdasdasdadadasd+2Chr68t4wc+qPrUIGM3JnOhzKyiK6FfQXVwoDTxwmKTE2EzhPAQjlCHrRLaynDqiGjXjZcqoF9UEB5cBGw+asdasdasdasdas/dwIDAQAB

View file

@ -1,11 +1,13 @@
--- ---
? '' ? ''
: : ttl: 172800
ttl: 172800
type: NS type: NS
values: values:
- ns1.example.com. - ns1.example.com.
- ns2.example.com. - ns2.example.com.
_smtp._tls:
type: TXT
value: v=TLSRPTv1\; rua=mailto:tlsrpt@example.com
record1: record1:
type: CNAME type: CNAME
value: record11.test.example.com. value: record11.test.example.com.

View file

@ -1,11 +1,10 @@
--- ---
? '' ? ''
: : ttl: 172800
ttl: 172800
type: NS type: NS
values: values:
- ns1.example.com. - ns1.example.com.
- ns2.example.com. - ns2.example.com.
record1: record1:
type: CNAME type: CNAME
value: record11.view.example.com. value: record11.view.example.com.
@ -14,4 +13,4 @@ record11:
value: 192.168.1.11 value: 192.168.1.11
record12: record12:
type: A type: A
value: 192.168.1.12 value: 192.168.1.13

View file

@ -1,21 +1,23 @@
import logging import logging
import re
from typing import Any, Literal from typing import Any, Literal
import dns.rdata import dns.rdata
import octodns.provider.base
import octodns.provider.plan
import octodns.record import octodns.record
import octodns.source.base
import octodns.zone import octodns.zone
import pynetbox.core.api import pynetbox.core.api
import pynetbox.core.response import pynetbox.core.response
class NetBoxDNSSource(octodns.source.base.BaseSource): class NetBoxDNSProvider(octodns.provider.base.BaseProvider):
""" """OctoDNS provider for NetboxDNS"""
OctoDNS provider for NetboxDNS
"""
SUPPORTS_GEO = False SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False SUPPORTS_DYNAMIC = False
SUPPORTS_ROOT_NS = True
SUPPORTS_MULTIVALUE_PTR = True
SUPPORTS: set[str] = { # noqa SUPPORTS: set[str] = { # noqa
"A", "A",
"AAAA", "AAAA",
@ -56,23 +58,26 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
ttl=3600, ttl=3600,
replace_duplicates=False, replace_duplicates=False,
make_absolute=False, make_absolute=False,
): disable_ptr=True,
""" *args,
Initialize the NetboxDNSSource **kwargs,
""" ) -> None:
"""initialize the NetboxDNSSource"""
self.log = logging.getLogger(f"NetboxDNSSource[{id}]") self.log = logging.getLogger(f"NetboxDNSSource[{id}]")
self.log.debug(f"__init__: {id=}, {url=}, {view=}, {replace_duplicates=}, {make_absolute=}") self.log.debug(
super().__init__(id) f"__init__: {id=}, {url=}, {view=}, {replace_duplicates=}, {make_absolute=}, {disable_ptr=}, {args=}, {kwargs=}"
)
super().__init__(id, *args, **kwargs)
self.api = pynetbox.core.api.Api(url, token) self.api = pynetbox.core.api.Api(url, token)
self.nb_view = self._get_nb_view(view) self.nb_view = self._get_nb_view(view)
self.ttl = ttl self.ttl = ttl
self.replace_duplicates = replace_duplicates self.replace_duplicates = replace_duplicates
self.make_absolute = make_absolute self.make_absolute = make_absolute
self.disable_ptr = disable_ptr
def _make_absolute(self, value: str) -> str: def _make_absolute(self, value: str) -> str:
""" """return dns name with trailing dot to make it absolute
Return dns name with trailing dot to make it absolute
@param value: dns record value @param value: dns record value
@ -87,8 +92,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
return absolute_value return absolute_value
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]: def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
""" """get the correct netbox view when requested
Get the correct netbox view when requested
@param view: `False` for no view, `None` for zones without a view, else the view name @param view: `False` for no view, `None` for zones without a view, else the view name
@ -110,8 +114,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
return {"view_id": nb_view.id} return {"view_id": nb_view.id}
def _get_nb_zone(self, name: str, view: dict[str, str | int]) -> pynetbox.core.response.Record: def _get_nb_zone(self, name: str, view: dict[str, str | int]) -> pynetbox.core.response.Record:
""" """given a zone name and a view name, look it up in NetBox.
Given a zone name and a view name, look it up in NetBox.
@param name: name of the dns zone @param name: name of the dns zone
@param view: the netbox view id in the api query format @param view: the netbox view id in the api query format
@ -127,15 +130,15 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
return nb_zone return nb_zone
def _format_rdata(self, rdata: dns.rdata.Rdata, raw_value: str) -> str | dict[str, Any]: def _format_rdata(self, rcd_type: str, rcd_value: str) -> str | dict[str, Any]:
""" """format netbox record values to correct octodns record values
Format netbox record values to correct octodns record values
@param rdata: rrdata record value @param rcd_type: record type
@param raw_value: raw record value @param rcd_value: record value
@return: formatted rrdata value @return: formatted rrdata value
""" """
rdata = dns.rdata.from_text("IN", rcd_type, rcd_value)
match rdata.rdtype.name: match rdata.rdtype.name:
case "A" | "AAAA": case "A" | "AAAA":
value = rdata.address value = rdata.address
@ -193,7 +196,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
} }
case "SPF" | "TXT": case "SPF" | "TXT":
value = raw_value.replace(";", r"\;") value = re.sub(r"\\*;", "\\;", rcd_value)
case "SRV": case "SRV":
value = { value = {
@ -216,8 +219,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
return value # type:ignore return value # type:ignore
def _format_nb_records(self, zone: octodns.zone.Zone) -> list[dict[str, Any]]: def _format_nb_records(self, zone: octodns.zone.Zone) -> list[dict[str, Any]]:
""" """format netbox dns records to the octodns format
Format netbox dns records to the octodns format
@param zone: octodns zone @param zone: octodns zone
@ -235,7 +237,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
) )
for nb_record in nb_records: for nb_record in nb_records:
rcd_name: str = nb_record.name if nb_record.name != "@" else "" rcd_name: str = nb_record.name if nb_record.name != "@" else ""
raw_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name
rcd_type: str = nb_record.type rcd_type: str = nb_record.type
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
if nb_record.type == "NS": if nb_record.type == "NS":
@ -250,34 +252,36 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
self.log.debug(f"record data={rcd_data}") self.log.debug(f"record data={rcd_data}")
rdata = dns.rdata.from_text("IN", nb_record.type, raw_value)
try: try:
rcd_value = self._format_rdata(rdata, raw_value) rcd_rdata = self._format_rdata(nb_record, rcd_value)
except NotImplementedError: except NotImplementedError:
continue continue
except Exception as exc:
raise exc
if (rcd_name, rcd_type) not in records: if (rcd_name, rcd_type) not in records:
records[(rcd_name, rcd_type)] = rcd_data records[(rcd_name, rcd_type)] = rcd_data
records[(rcd_name, rcd_type)]["values"].append(rcd_value) records[(rcd_name, rcd_type)]["values"].append(rcd_rdata)
return list(records.values()) return list(records.values())
def populate( def populate(
self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False
) -> None: ) -> bool:
""" """get all the records of a zone from NetBox and add them to the OctoDNS zone
Get all the records of a zone from NetBox and add them to the OctoDNS zone
@param zone: octodns zone @param zone: octodns zone
@param target: when `True`, load the current state of the provider. @param target: when `True`, load the current state of the provider.
@param lenient: when `True`, skip record validation and do a "best effort" load of data. @param lenient: when `True`, skip record validation and do a "best effort" load of data.
@return: true if the zone exists, else false.
""" """
self.log.info(f"populate -> '{zone.name}', target={target}, lenient={lenient}") self.log.info(f"populate -> '{zone.name}', target={target}, lenient={lenient}")
records = self._format_nb_records(zone) try:
records = self._format_nb_records(zone)
except LookupError:
return False
for data in records: for data in records:
if len(data["values"]) == 1: if len(data["values"]) == 1:
data["value"] = data.pop("values")[0] data["value"] = data.pop("values")[0]
@ -291,3 +295,125 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
zone.add_record(record, lenient=lenient, replace=self.replace_duplicates) zone.add_record(record, lenient=lenient, replace=self.replace_duplicates)
self.log.info(f"populate -> found {len(zone.records)} records for zone '{zone.name}'") self.log.info(f"populate -> found {len(zone.records)} records for zone '{zone.name}'")
return True
@staticmethod
def _format_changeset(change: Any) -> set[str]:
"""format the changeset
@param change: the raw changes
@return: the formatted changeset
"""
match change:
case octodns.record.ValueMixin():
changeset = {repr(change.value)[1:-1]}
case octodns.record.ValuesMixin():
changeset = {repr(v)[1:-1] for v in change.values}
case _:
raise ValueError
return changeset
def _include_change(self, change: octodns.record.change.Change) -> bool:
"""filter out record types which the provider can't create in netbox
@param change: the planned change
@return: false if the change should be discarded, true if it should be kept.
"""
if change.record._type in ["SOA", "PTR", "NS"]:
self.log.debug(f"record not supported as provider, ignoring: {change.record}")
return False
return True
def _apply(self, plan: octodns.provider.plan.Plan) -> None:
"""apply the changes to the NetBox DNS zone.
@param plan: the planned changes
@return: none
"""
self.log.debug(f"_apply: zone={plan.desired.name}, changes={len(plan.changes)}")
nb_zone = self._get_nb_zone(plan.desired.name, view=self.nb_view)
for change in plan.changes:
match change:
case octodns.record.Create():
rcd_name = "@" if change.new.name == "" else change.new.name
new_changeset = self._format_changeset(change.new)
for record in new_changeset:
nb_record: pynetbox.core.response.Record = (
self.api.plugins.netbox_dns.records.create(
zone=nb_zone.id,
name=rcd_name,
type=change.new._type,
ttl=change.new.ttl,
value=re.sub(r"\\*;", ";", record),
disable_ptr=self.disable_ptr,
)
)
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")
case octodns.record.Delete():
nb_records: pynetbox.core.response.RecordSet = (
self.api.plugins.netbox_dns.records.filter(
zone_id=nb_zone.id,
name=change.existing.name,
type=change.existing._type,
)
)
existing_changeset = self._format_changeset(change.existing)
for nb_record in nb_records:
for record in existing_changeset:
if nb_record.value != record:
continue
self.log.debug(
f"DELETE {nb_record.type} {nb_record.name} {nb_record.value}"
)
nb_record.delete()
case octodns.record.Update():
rcd_name = "@" if change.existing.name == "" else change.existing.name
nb_records = self.api.plugins.netbox_dns.records.filter(
zone_id=nb_zone.id,
name=rcd_name,
type=change.existing._type,
)
existing_changeset = self._format_changeset(change.existing)
new_changeset = self._format_changeset(change.new)
to_delete = existing_changeset.difference(new_changeset)
to_update = existing_changeset.intersection(new_changeset)
to_create = new_changeset.difference(existing_changeset)
for nb_record in nb_records:
if nb_record.value in to_delete:
self.log.debug(
f"DELETE {nb_record.type} {nb_record.name} {nb_record.value}"
)
nb_record.delete()
if nb_record.value in to_update:
nb_record.ttl = change.new.ttl
self.log.debug(
f"MODIFY {nb_record.type} {nb_record.name} {nb_record.value}"
)
nb_record.save()
for record in to_create:
nb_record = self.api.plugins.netbox_dns.records.create(
zone=nb_zone.id,
name=rcd_name,
type=change.new._type,
ttl=change.new.ttl,
value=re.sub(r"\\*;", ";", record),
disable_ptr=self.disable_ptr,
)
self.log.debug(f"ADD {nb_record.type} {nb_record.name} {nb_record.value}")

View file

@ -0,0 +1,39 @@
from octodns_netbox_dns import NetBoxDNSProvider
class Change:
def __init__(self, rtype: str):
self.record = Record(rtype)
class Record:
def __init__(self, rtype: str):
self._type = rtype
DEFAULT_CONFIG = {
"id": 1,
"url": "https://localhost:8000",
"token": "",
"view": False,
"replace_duplicates": False,
"make_absolute": True,
}
def test1():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
for n in ["SOA", "NS", "PTR"]:
change = Change(n)
include_rcd = nbdns._include_change(change)
assert not include_rcd
def test2():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
for n in ["A", "AAA", "CNAME", "TXT", "MX"]:
change = Change(n)
include_rcd = nbdns._include_change(change)
assert include_rcd

View file

@ -0,0 +1,73 @@
from octodns_netbox_dns import NetBoxDNSProvider
DEFAULT_CONFIG = {
"id": 1,
"url": "https://localhost:8000",
"token": "",
"view": False,
"replace_duplicates": False,
"make_absolute": True,
}
def test_a():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "A"
rcd_value = "127.0.0.1"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == "127.0.0.1"
def test_aaaa():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "AAAA"
rcd_value = "fc07::1"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == "fc07::1"
def test_mx():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "MX"
rcd_value = "10 mx.example.com"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == {
"preference": 10,
"exchange": "mx.example.com.",
}
def test_txt1():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = "v=TLSRPTv1; rua=mailto:tlsrpt@example.com"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
def test_txt2():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "TXT"
rcd_value = r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == r"v=TLSRPTv1\; rua=mailto:tlsrpt@example.com"
def test_srv():
nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd_type = "SRV"
rcd_value = r"0 5 25565 mc.example.com"
value = nbdns._format_rdata(rcd_type, rcd_value)
assert value == {
"priority": 0,
"weight": 5,
"port": 25565,
"target": "mc.example.com.",
}

View file

@ -1,4 +1,4 @@
from octodns_netbox_dns import NetBoxDNSSource from octodns_netbox_dns import NetBoxDNSProvider
DEFAULT_CONFIG = { DEFAULT_CONFIG = {
@ -11,16 +11,16 @@ DEFAULT_CONFIG = {
} }
def test_absolute1(): def test1():
nbdns = NetBoxDNSSource(**DEFAULT_CONFIG) nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd = "example.com" rcd = "example.com"
absolute = nbdns._make_absolute(rcd) absolute = nbdns._make_absolute(rcd)
assert absolute == "example.com." assert absolute == "example.com."
def test_absolute2(): def test2():
nbdns = NetBoxDNSSource(**DEFAULT_CONFIG) nbdns = NetBoxDNSProvider(**DEFAULT_CONFIG)
rcd = "example.com." rcd = "example.com."
absolute = nbdns._make_absolute(rcd) absolute = nbdns._make_absolute(rcd)