2022-05-02 04:23:21 +02:00
|
|
|
import logging
|
2024-01-25 11:39:22 +01:00
|
|
|
from typing import Any, Literal
|
2022-05-02 04:23:21 +02:00
|
|
|
|
|
|
|
import dns.rdata
|
|
|
|
import octodns.record
|
2024-01-09 22:54:52 +01:00
|
|
|
import octodns.source.base
|
2022-05-02 04:23:21 +02:00
|
|
|
import octodns.zone
|
|
|
|
import pynetbox.core.api
|
2024-01-09 22:54:52 +01:00
|
|
|
import pynetbox.core.response
|
2022-05-02 04:23:21 +02:00
|
|
|
|
|
|
|
|
2024-01-09 22:54:52 +01:00
|
|
|
class NetBoxDNSSource(octodns.source.base.BaseSource):
|
|
|
|
"""
|
|
|
|
OctoDNS source that loads DNS records from the NetBox DNS plugin
|
|
|
|
"""
|
|
|
|
|
|
|
|
# Capability flags: geo and dynamic records are switched off for this source.
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
# Record type names this source declares as supported.
SUPPORTS: set[str] = {  # noqa
    "A",
    "AAAA",
    "AFSDB",
    "APL",
    "CAA",
    "CDNSKEY",
    "CERT",
    "CNAME",
    "DHCID",  # was spelled "DCHID", which is not a DNS record type
    "DNAME",
    "DNSKEY",
    "DS",
    "HIP",
    "IPSECKEY",
    "LOC",
    "MX",
    "NAPTR",
    "NS",
    "NSEC",
    "PTR",
    "RP",
    "RRSIG",
    "SOA",
    "SPF",
    "SRV",
    "SSHFP",
    "TLSA",
    "TXT",
}
|
|
|
|
|
2022-09-10 15:24:58 +02:00
|
|
|
def __init__(
    self,
    id: int,  # noqa
    url: str,
    token: str,
    view: str | None | Literal[False] = False,
    ttl=3600,
    replace_duplicates=False,
    make_absolute=False,
):
    """
    Set up the logger, the netbox api client and the record handling options

    @param id: identifier of this source, also handed to the base class
    @param url: url handed to the pynetbox api client
    @param token: token handed to the pynetbox api client
    @param view: `False` for no view, `None` for zones without a view, else the view name
    @param ttl: stored on the instance as `ttl`
    @param replace_duplicates: used as `replace` flag when records are added to a zone
    @param make_absolute: when `True`, relative names get a trailing dot in `_make_absolute`
    """
    logger_name = f"NetboxDNSSource[{id}]"
    self.log = logging.getLogger(logger_name)
    self.log.debug(f"__init__: {id=}, {url=}, {view=}, {replace_duplicates=}, {make_absolute=}")
    super().__init__(id)

    # plain options, independent of the api client
    self.make_absolute = make_absolute
    self.replace_duplicates = replace_duplicates
    self.ttl = ttl

    # resolving a named view goes through the api, so the client is created first
    self.api = pynetbox.core.api.Api(url, token)
    self.nb_view = self._get_nb_view(view)
|
|
|
|
|
|
|
|
def _make_absolute(self, value: str) -> str:
    """
    Append the trailing dot to a dns name when the source is configured to do so

    @param value: dns record value

    @return: the value with a trailing dot, or the unchanged value when
             `make_absolute` is off or the value already ends with a dot
    """
    needs_dot = self.make_absolute and not value.endswith(".")
    if needs_dot:
        fqdn = f"{value}."
        self.log.debug(f"relative={value}, absolute={fqdn}")
        return fqdn

    return value
|
2022-05-02 04:23:21 +02:00
|
|
|
|
2024-01-25 11:39:22 +01:00
|
|
|
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
|
2024-01-25 10:51:42 +01:00
|
|
|
"""
|
|
|
|
Get the correct netbox view when requested
|
|
|
|
|
|
|
|
@param view: `False` for no view, `None` for zones without a view, else the view name
|
|
|
|
|
|
|
|
@return: the netbox view id in the netbox query format
|
|
|
|
"""
|
2024-01-09 23:07:04 +01:00
|
|
|
if view is False:
|
|
|
|
return {}
|
2023-11-09 14:17:36 +01:00
|
|
|
if view is None:
|
|
|
|
return {"view": "null"}
|
|
|
|
|
2024-01-25 10:51:42 +01:00
|
|
|
nb_view: pynetbox.core.response.Record = self.api.plugins.netbox_dns.views.get(name=view)
|
2023-11-09 13:08:03 +01:00
|
|
|
if nb_view is None:
|
2024-01-25 11:39:22 +01:00
|
|
|
msg = f"dns view={view}, has not been found"
|
2024-01-09 22:54:52 +01:00
|
|
|
self.log.error(msg)
|
|
|
|
raise ValueError(msg)
|
2024-01-25 10:51:42 +01:00
|
|
|
|
|
|
|
self.log.debug(f"found view={nb_view.name}, id={nb_view.id}")
|
2023-11-09 13:08:03 +01:00
|
|
|
|
2023-11-09 14:17:36 +01:00
|
|
|
return {"view_id": nb_view.id}
|
2023-11-09 13:08:03 +01:00
|
|
|
|
2024-01-09 22:54:52 +01:00
|
|
|
def _get_nb_zone(self, name: str, view: dict[str, str | int]) -> pynetbox.core.response.Record:
|
|
|
|
"""
|
|
|
|
Given a zone name and a view name, look it up in NetBox.
|
|
|
|
|
2024-01-25 10:51:42 +01:00
|
|
|
@param name: name of the dns zone
|
|
|
|
@param view: the netbox view id in the api query format
|
|
|
|
|
2024-01-09 22:54:52 +01:00
|
|
|
@raise pynetbox.RequestError: if declared view is not existent
|
2024-01-25 10:51:42 +01:00
|
|
|
|
|
|
|
@return: the netbox dns zone object
|
2024-01-09 22:54:52 +01:00
|
|
|
"""
|
2023-11-09 14:33:45 +01:00
|
|
|
query_params = {"name": name[:-1], **view}
|
2024-01-25 10:51:42 +01:00
|
|
|
nb_zone = self.api.plugins.netbox_dns.zones.get(**query_params)
|
|
|
|
|
|
|
|
self.log.debug(f"found zone={nb_zone.name}, id={nb_zone.id}")
|
2023-06-07 18:18:16 +02:00
|
|
|
|
2022-10-11 15:43:43 +02:00
|
|
|
return nb_zone
|
2022-05-02 04:23:21 +02:00
|
|
|
|
2024-01-25 11:45:28 +01:00
|
|
|
def _format_rdata(self, rdata: dns.rdata.Rdata, raw_value: str) -> str | dict[str, Any]:
|
2024-01-09 22:54:52 +01:00
|
|
|
"""
|
2024-01-25 11:39:22 +01:00
|
|
|
Format netbox record values to correct octodns record values
|
2024-01-25 10:51:42 +01:00
|
|
|
|
2024-01-25 11:39:22 +01:00
|
|
|
@param rdata: rrdata record value
|
|
|
|
@param raw_value: raw record value
|
|
|
|
|
|
|
|
@return: formatted rrdata value
|
2024-01-09 22:54:52 +01:00
|
|
|
"""
|
2024-01-25 11:39:22 +01:00
|
|
|
match rdata.rdtype.name:
|
|
|
|
case "A" | "AAAA":
|
|
|
|
value = rdata.address
|
|
|
|
|
|
|
|
case "CNAME":
|
|
|
|
value = self._make_absolute(rdata.target.to_text())
|
|
|
|
|
|
|
|
case "DNAME" | "NS" | "PTR":
|
|
|
|
value = rdata.target.to_text()
|
|
|
|
|
|
|
|
case "CAA":
|
|
|
|
value = {
|
|
|
|
"flags": rdata.flags,
|
|
|
|
"tag": rdata.tag,
|
|
|
|
"value": rdata.value,
|
|
|
|
}
|
|
|
|
|
|
|
|
case "LOC":
|
|
|
|
value = {
|
|
|
|
"lat_direction": "N" if rdata.latitude[4] >= 0 else "S",
|
|
|
|
"lat_degrees": rdata.latitude[0],
|
|
|
|
"lat_minutes": rdata.latitude[1],
|
|
|
|
"lat_seconds": rdata.latitude[2] + rdata.latitude[3] / 1000,
|
|
|
|
"long_direction": "W" if rdata.latitude[4] >= 0 else "E",
|
|
|
|
"long_degrees": rdata.longitude[0],
|
|
|
|
"long_minutes": rdata.longitude[1],
|
|
|
|
"long_seconds": rdata.longitude[2] + rdata.longitude[3] / 1000,
|
|
|
|
"altitude": rdata.altitude / 100,
|
|
|
|
"size": rdata.size / 100,
|
|
|
|
"precision_horz": rdata.horizontal_precision / 100,
|
|
|
|
"precision_vert": rdata.veritical_precision / 100,
|
|
|
|
}
|
|
|
|
|
|
|
|
case "MX":
|
|
|
|
value = {
|
|
|
|
"preference": rdata.preference,
|
|
|
|
"exchange": self._make_absolute(rdata.exchange.to_text()),
|
|
|
|
}
|
|
|
|
|
|
|
|
case "NAPTR":
|
|
|
|
value = {
|
|
|
|
"order": rdata.order,
|
|
|
|
"preference": rdata.preference,
|
|
|
|
"flags": rdata.flags,
|
|
|
|
"service": rdata.service,
|
|
|
|
"regexp": rdata.regexp,
|
|
|
|
"replacement": rdata.replacement.to_text(),
|
|
|
|
}
|
|
|
|
|
|
|
|
case "SSHFP":
|
|
|
|
value = {
|
|
|
|
"algorithm": rdata.algorithm,
|
|
|
|
"fingerprint_type": rdata.fp_type,
|
|
|
|
"fingerprint": rdata.fingerprint,
|
|
|
|
}
|
|
|
|
|
|
|
|
case "SPF" | "TXT":
|
|
|
|
value = raw_value.replace(";", r"\;")
|
|
|
|
|
|
|
|
case "SRV":
|
|
|
|
value = {
|
|
|
|
"priority": rdata.priority,
|
|
|
|
"weight": rdata.weight,
|
|
|
|
"port": rdata.port,
|
|
|
|
"target": self._make_absolute(rdata.target.to_text()),
|
|
|
|
}
|
|
|
|
|
|
|
|
case "SOA":
|
2024-01-25 11:51:10 +01:00
|
|
|
self.log.debug("SOA record type not implemented")
|
2024-01-25 11:39:22 +01:00
|
|
|
raise NotImplementedError
|
|
|
|
|
|
|
|
case _:
|
|
|
|
self.log.error("invalid record type")
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
self.log.debug(f"formatted record value={value}")
|
|
|
|
|
2024-01-25 11:43:20 +01:00
|
|
|
return value # type:ignore
|
2024-01-25 11:39:22 +01:00
|
|
|
|
|
|
|
def _format_nb_records(self, zone: "octodns.zone.Zone") -> list[dict[str, Any]]:
    """
    Collect the netbox records of a zone and group them into octodns record dicts

    Records sharing name and type end up in one dict, their values are gathered
    under `values`. A record whose type makes `_format_rdata` raise
    `NotImplementedError` is left out.

    @param zone: octodns zone

    @raise LookupError: if the zone lookup yields nothing

    @return: a list of octodns compatible record dicts
    """
    nb_zone = self._get_nb_zone(zone.name, view=self.nb_view)
    if not nb_zone:
        self.log.error(f"zone={zone.name}, not found in view={self.nb_view}")
        raise LookupError

    grouped: dict[tuple[str, str], dict[str, Any]] = {}

    for entry in self.api.plugins.netbox_dns.records.filter(zone_id=nb_zone.id):
        # "@" as name becomes the empty name, "@" as value becomes the zone name
        name: str = "" if entry.name == "@" else entry.name
        raw_value: str = entry.zone.name if entry.value == "@" else entry.value
        rtype: str = entry.type

        # record ttl, else the zone default; NS records take the zone's soa_refresh
        ttl: int = entry.ttl or nb_zone.default_ttl
        if entry.type == "NS":
            ttl = nb_zone.soa_refresh

        candidate = {
            "name": name,
            "type": rtype,
            "ttl": ttl,
            "values": [],
        }
        self.log.debug(f"record data={candidate}")

        rdata = dns.rdata.from_text("IN", entry.type, raw_value)
        try:
            formatted = self._format_rdata(rdata, raw_value)
        except NotImplementedError:
            # record type without a conversion: skip it, any other error propagates
            continue

        # the first record of a (name, type) pair provides the dict, later ones only add values
        grouped.setdefault((name, rtype), candidate)["values"].append(formatted)

    return list(grouped.values())
|
|
|
|
|
|
|
|
def populate(
    self, zone: "octodns.zone.Zone", target: bool = False, lenient: bool = False
) -> None:
    """
    Load every record of the zone from NetBox and add it to the OctoDNS zone

    @param zone: octodns zone
    @param target: only logged by this method
    @param lenient: passed on to record creation and to `zone.add_record`
    """
    self.log.info(f"populate -> '{zone.name}', target={target}, lenient={lenient}")

    for rcd in self._format_nb_records(zone):
        values = rcd["values"]
        if len(values) == 1:
            # a lone value moves from the "values" list to the singular "value" key
            del rcd["values"]
            rcd["value"] = values[0]

        new_record = octodns.record.Record.new(
            zone=zone,
            name=rcd["name"],
            data=rcd,
            source=self,
            lenient=lenient,
        )
        zone.add_record(new_record, lenient=lenient, replace=self.replace_duplicates)

    self.log.info(f"populate -> found {len(zone.records)} records for zone '{zone.name}'")
|