first tests with provider support

This commit is contained in:
Ivan Schaller 2024-02-21 16:21:25 +01:00
parent 09898b7a22
commit 284f001236
1 changed files with 136 additions and 6 deletions

View File

@ -2,20 +2,23 @@ import logging
from typing import Any, Literal from typing import Any, Literal
import dns.rdata import dns.rdata
import octodns.provider.base
import octodns.provider.plan
import octodns.record import octodns.record
import octodns.source.base
import octodns.zone import octodns.zone
import pynetbox.core.api import pynetbox.core.api
import pynetbox.core.response import pynetbox.core.response
class NetBoxDNSSource(octodns.source.base.BaseSource): class NetBoxDNSSource(octodns.provider.base.BaseProvider):
""" """
OctoDNS provider for NetboxDNS OctoDNS provider for NetboxDNS
""" """
SUPPORTS_GEO = False SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False SUPPORTS_DYNAMIC = False
SUPPORTS_ROOT_NS = True
SUPPORTS_MULTIVALUE_PTR = True
SUPPORTS: set[str] = { # noqa SUPPORTS: set[str] = { # noqa
"A", "A",
"AAAA", "AAAA",
@ -56,13 +59,15 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
ttl=3600, ttl=3600,
replace_duplicates=False, replace_duplicates=False,
make_absolute=False, make_absolute=False,
*args,
**kwargs,
): ):
""" """
Initialize the NetboxDNSSource Initialize the NetboxDNSSource
""" """
self.log = logging.getLogger(f"NetboxDNSSource[{id}]") self.log = logging.getLogger(f"NetboxDNSSource[{id}]")
self.log.debug(f"__init__: {id=}, {url=}, {view=}, {replace_duplicates=}, {make_absolute=}") self.log.debug(f"__init__: {id=}, {url=}, {view=}, {replace_duplicates=}, {make_absolute=}")
super().__init__(id) super().__init__(id, *args, **kwargs)
self.api = pynetbox.core.api.Api(url, token) self.api = pynetbox.core.api.Api(url, token)
self.nb_view = self._get_nb_view(view) self.nb_view = self._get_nb_view(view)
@ -193,7 +198,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
} }
case "SPF" | "TXT": case "SPF" | "TXT":
value = raw_value.replace(";", r"\;") value = raw_value.replace(";", "\\;")
case "SRV": case "SRV":
value = { value = {
@ -267,7 +272,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
def populate( def populate(
self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False
) -> None: ) -> bool:
""" """
Get all the records of a zone from NetBox and add them to the OctoDNS zone Get all the records of a zone from NetBox and add them to the OctoDNS zone
@ -277,10 +282,17 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
""" """
self.log.info(f"populate -> '{zone.name}', target={target}, lenient={lenient}") self.log.info(f"populate -> '{zone.name}', target={target}, lenient={lenient}")
records = self._format_nb_records(zone) try:
records = self._format_nb_records(zone)
except LookupError:
return False
for data in records: for data in records:
if len(data["values"]) == 1: if len(data["values"]) == 1:
data["value"] = data.pop("values")[0] data["value"] = data.pop("values")[0]
if target and data["type"] in ["NS", "SOA", "PTR"]:
self.log.debug(f"{data['type']} type not supported in target mode")
continue
record = octodns.record.Record.new( record = octodns.record.Record.new(
zone=zone, zone=zone,
name=data["name"], name=data["name"],
@ -291,3 +303,121 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
zone.add_record(record, lenient=lenient, replace=self.replace_duplicates) zone.add_record(record, lenient=lenient, replace=self.replace_duplicates)
self.log.info(f"populate -> found {len(zone.records)} records for zone '{zone.name}'") self.log.info(f"populate -> found {len(zone.records)} records for zone '{zone.name}'")
return True
def _include_change(self, change: octodns.record.change.Change) -> bool:
"""Filter out record types which the provider can't create in netbox"""
if change.new._type in ["SOA", "PTR", "NS"]:
return False
return True
def _apply(self, plan: octodns.provider.plan.Plan) -> None:
"""Apply the changes to the NetBox DNS zone."""
self.log.debug(f"_apply: zone={plan.desired.name}, len(changes)={len(plan.changes)}")
nb_zone = self._get_nb_zone(plan.desired.name, view=self.nb_view)
for change in plan.changes:
match change:
case octodns.record.Create():
name = change.new.name
if name == "":
name = "@"
match change.new:
case octodns.record.ValueMixin():
new = {repr(change.new.value)[1:-1]}
case octodns.record.ValuesMixin():
new = {repr(v)[1:-1] for v in change.new.values}
case _:
raise ValueError
for value in new:
nb_record = self.api.plugins.netbox_dns.records.create(
zone=nb_zone.id,
name=name,
type=change.new._type,
ttl=change.new.ttl,
value=value.replace("\\\\", "\\").replace("\\;", ";"),
disable_ptr=True,
)
self.log.debug(f"{nb_record!r}")
case octodns.record.Delete():
name = change.existing.name
if name == "":
name = "@"
nb_records = self.api.plugins.netbox_dns.records.filter(
zone_id=nb_zone.id,
name=change.existing.name,
type=change.existing._type,
)
match change.existing:
case octodns.record.ValueMixin():
existing = {repr(change.existing.value)[1:-1]}
case octodns.record.ValuesMixin():
existing = {repr(v)[1:-1] for v in change.existing.values}
case _:
raise ValueError
for nb_record in nb_records:
for value in existing:
if nb_record.value == value:
self.log.debug(
f"{nb_record.id} {nb_record.name} {nb_record.type} {nb_record.value} {value}"
)
self.log.debug(f"{nb_record.url} {nb_record.endpoint.url}")
nb_record.delete()
case octodns.record.Update():
name = change.existing.name
if name == "":
name = "@"
nb_records = self.api.plugins.netbox_dns.records.filter(
zone_id=nb_zone.id,
name=name,
type=change.existing._type,
)
match change.existing:
case octodns.record.ValueMixin():
existing = {repr(change.existing.value)[1:-1]}
case octodns.record.ValuesMixin():
existing = {repr(v)[1:-1] for v in change.existing.values}
case _:
raise ValueError
match change.new:
case octodns.record.ValueMixin():
new = {repr(change.new.value)[1:-1]}
case octodns.record.ValuesMixin():
new = {repr(v)[1:-1] for v in change.new.values}
case _:
raise ValueError
delete = existing.difference(new)
update = existing.intersection(new)
create = new.difference(existing)
for nb_record in nb_records:
if nb_record.value in delete:
nb_record.delete()
if nb_record.value in update:
nb_record.ttl = change.new.ttl
nb_record.save()
for value in create:
nb_record = self.api.plugins.netbox_dns.records.create(
zone=nb_zone.id,
name=name,
type=change.new._type,
ttl=change.new.ttl,
value=value.replace("\\\\", "\\").replace("\\;", ";"),
disable_ptr=True,
)
self.log.debug(f"{nb_record!r}")