make populate function smaller
This commit is contained in:
parent
f7f54f8eb7
commit
c95f95fb8b
|
@ -1,5 +1,5 @@
|
|||
import logging
|
||||
from typing import Literal
|
||||
from typing import Any, Literal
|
||||
|
||||
import dns.rdata
|
||||
import octodns.record
|
||||
|
@ -65,7 +65,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
|
|||
super().__init__(id)
|
||||
|
||||
self.api = pynetbox.core.api.Api(url, token)
|
||||
self.nb_view = self._get_view(view)
|
||||
self.nb_view = self._get_nb_view(view)
|
||||
self.ttl = ttl
|
||||
self.replace_duplicates = replace_duplicates
|
||||
self.make_absolute = make_absolute
|
||||
|
@ -86,7 +86,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
|
|||
|
||||
return absolute_value
|
||||
|
||||
def _get_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
|
||||
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
|
||||
"""
|
||||
Get the correct netbox view when requested
|
||||
|
||||
|
@ -101,7 +101,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
|
|||
|
||||
nb_view: pynetbox.core.response.Record = self.api.plugins.netbox_dns.views.get(name=view)
|
||||
if nb_view is None:
|
||||
msg = f"dns view: '{view}' has not been found"
|
||||
msg = f"dns view={view}, has not been found"
|
||||
self.log.error(msg)
|
||||
raise ValueError(msg)
|
||||
|
||||
|
@ -127,6 +127,146 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
|
|||
|
||||
return nb_zone
|
||||
|
||||
def _format_rdata(self, rdata: dns.Rdata, raw_value: str) -> dict[str, Any]:
|
||||
"""
|
||||
Format netbox record values to correct octodns record values
|
||||
|
||||
@param rdata: rrdata record value
|
||||
@param raw_value: raw record value
|
||||
|
||||
@return: formatted rrdata value
|
||||
"""
|
||||
match rdata.rdtype.name:
|
||||
case "A" | "AAAA":
|
||||
value = rdata.address
|
||||
|
||||
case "CNAME":
|
||||
value = self._make_absolute(rdata.target.to_text())
|
||||
|
||||
case "DNAME" | "NS" | "PTR":
|
||||
value = rdata.target.to_text()
|
||||
|
||||
case "CAA":
|
||||
value = {
|
||||
"flags": rdata.flags,
|
||||
"tag": rdata.tag,
|
||||
"value": rdata.value,
|
||||
}
|
||||
|
||||
case "LOC":
|
||||
value = {
|
||||
"lat_direction": "N" if rdata.latitude[4] >= 0 else "S",
|
||||
"lat_degrees": rdata.latitude[0],
|
||||
"lat_minutes": rdata.latitude[1],
|
||||
"lat_seconds": rdata.latitude[2] + rdata.latitude[3] / 1000,
|
||||
"long_direction": "W" if rdata.latitude[4] >= 0 else "E",
|
||||
"long_degrees": rdata.longitude[0],
|
||||
"long_minutes": rdata.longitude[1],
|
||||
"long_seconds": rdata.longitude[2] + rdata.longitude[3] / 1000,
|
||||
"altitude": rdata.altitude / 100,
|
||||
"size": rdata.size / 100,
|
||||
"precision_horz": rdata.horizontal_precision / 100,
|
||||
"precision_vert": rdata.veritical_precision / 100,
|
||||
}
|
||||
|
||||
case "MX":
|
||||
value = {
|
||||
"preference": rdata.preference,
|
||||
"exchange": self._make_absolute(rdata.exchange.to_text()),
|
||||
}
|
||||
|
||||
case "NAPTR":
|
||||
value = {
|
||||
"order": rdata.order,
|
||||
"preference": rdata.preference,
|
||||
"flags": rdata.flags,
|
||||
"service": rdata.service,
|
||||
"regexp": rdata.regexp,
|
||||
"replacement": rdata.replacement.to_text(),
|
||||
}
|
||||
|
||||
case "SSHFP":
|
||||
value = {
|
||||
"algorithm": rdata.algorithm,
|
||||
"fingerprint_type": rdata.fp_type,
|
||||
"fingerprint": rdata.fingerprint,
|
||||
}
|
||||
|
||||
case "SPF" | "TXT":
|
||||
value = raw_value.replace(";", r"\;")
|
||||
|
||||
case "SRV":
|
||||
value = {
|
||||
"priority": rdata.priority,
|
||||
"weight": rdata.weight,
|
||||
"port": rdata.port,
|
||||
"target": self._make_absolute(rdata.target.to_text()),
|
||||
}
|
||||
|
||||
case "SOA":
|
||||
self.log.warning("SOA record type not implemented")
|
||||
raise NotImplementedError
|
||||
|
||||
case _:
|
||||
self.log.error("invalid record type")
|
||||
raise ValueError
|
||||
|
||||
self.log.debug(f"formatted record value={value}")
|
||||
|
||||
return value
|
||||
|
||||
def _format_nb_records(self, zone: octodns.zone.Zone) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Format netbox dns records to the octodns format
|
||||
|
||||
@param zone: octodns zone
|
||||
|
||||
@return: a list of octodns compatible record dicts
|
||||
"""
|
||||
records: dict[tuple[str, str], dict[str, Any]] = {}
|
||||
|
||||
nb_zone = self._get_nb_zone(zone.name, view=self.nb_view)
|
||||
if not nb_zone:
|
||||
self.log.error(f"zone={zone.name}, not found in view={self.nb_view}")
|
||||
raise LookupError
|
||||
|
||||
nb_records: pynetbox.core.response.RecordSet = self.api.plugins.netbox_dns.records.filter(
|
||||
zone_id=nb_zone.id
|
||||
)
|
||||
for nb_record in nb_records:
|
||||
nb_record: pynetbox.core.response.Record
|
||||
|
||||
rcd_name: str = nb_record.name if nb_record.name != "@" else ""
|
||||
raw_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name
|
||||
rcd_type: str = nb_record.type
|
||||
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
|
||||
if nb_record.type == "NS":
|
||||
rcd_ttl = nb_zone.soa_refresh
|
||||
|
||||
rcd_data = {
|
||||
"name": rcd_name,
|
||||
"type": rcd_type,
|
||||
"ttl": rcd_ttl,
|
||||
"values": [],
|
||||
}
|
||||
|
||||
self.log.debug(f"record data={rcd_data}")
|
||||
|
||||
rdata = dns.rdata.from_text("IN", nb_record.type, raw_value)
|
||||
try:
|
||||
rcd_value = self._format_rdata(rdata, raw_value)
|
||||
except NotImplementedError:
|
||||
continue
|
||||
except Exception as exc:
|
||||
raise exc
|
||||
|
||||
if (rcd_name, rcd_type) not in records:
|
||||
records[(rcd_name, rcd_type)] = rcd_data
|
||||
|
||||
records[(rcd_name, rcd_type)]["values"].append(rcd_value)
|
||||
|
||||
return list(records.values())
|
||||
|
||||
def populate(
|
||||
self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False
|
||||
) -> None:
|
||||
|
@ -137,117 +277,10 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
|
|||
@param target: when `True`, load the current state of the provider.
|
||||
@param lenient: when `True`, skip record validation and do a "best effort" load of data.
|
||||
"""
|
||||
self.log.info(f"populate: name={zone.name}, target={target}, lenient={lenient}")
|
||||
self.log.info(f"populate -> name={zone.name}, target={target}, lenient={lenient}")
|
||||
|
||||
records = {}
|
||||
|
||||
nb_zone = self._get_nb_zone(zone.name, view=self.nb_view)
|
||||
if not nb_zone:
|
||||
self.log.error(f"Zone '{zone.name[:-1]}' not found in view: '{self.nb_view}'")
|
||||
raise LookupError
|
||||
|
||||
nb_records = self.api.plugins.netbox_dns.records.filter(zone_id=nb_zone.id)
|
||||
for nb_record in nb_records:
|
||||
self.log.debug(f"{nb_record.name!r} {nb_record.type!r} {nb_record.value!r}")
|
||||
|
||||
rcd_name: str = nb_record.name if nb_record.name != "@" else ""
|
||||
rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name
|
||||
|
||||
if nb_record.ttl:
|
||||
nb_ttl = nb_record.ttl
|
||||
elif nb_record.type == "NS":
|
||||
nb_ttl = nb_zone.soa_refresh
|
||||
else:
|
||||
nb_ttl = nb_zone.default_ttl
|
||||
|
||||
data = {
|
||||
"name": rcd_name,
|
||||
"type": nb_record.type,
|
||||
"ttl": nb_ttl,
|
||||
"values": [],
|
||||
}
|
||||
rdata = dns.rdata.from_text("IN", nb_record.type, rcd_value)
|
||||
match rdata.rdtype.name:
|
||||
case "A" | "AAAA":
|
||||
value = rdata.address
|
||||
|
||||
case "CNAME":
|
||||
value = self._make_absolute(rdata.target.to_text())
|
||||
|
||||
case "DNAME" | "NS" | "PTR":
|
||||
value = rdata.target.to_text()
|
||||
|
||||
case "CAA":
|
||||
value = {
|
||||
"flags": rdata.flags,
|
||||
"tag": rdata.tag,
|
||||
"value": rdata.value,
|
||||
}
|
||||
|
||||
case "LOC":
|
||||
value = {
|
||||
"lat_direction": "N" if rdata.latitude[4] >= 0 else "S",
|
||||
"lat_degrees": rdata.latitude[0],
|
||||
"lat_minutes": rdata.latitude[1],
|
||||
"lat_seconds": rdata.latitude[2] + rdata.latitude[3] / 1000,
|
||||
"long_direction": "W" if rdata.latitude[4] >= 0 else "E",
|
||||
"long_degrees": rdata.longitude[0],
|
||||
"long_minutes": rdata.longitude[1],
|
||||
"long_seconds": rdata.longitude[2] + rdata.longitude[3] / 1000,
|
||||
"altitude": rdata.altitude / 100,
|
||||
"size": rdata.size / 100,
|
||||
"precision_horz": rdata.horizontal_precision / 100,
|
||||
"precision_vert": rdata.veritical_precision / 100,
|
||||
}
|
||||
|
||||
case "MX":
|
||||
value = {
|
||||
"preference": rdata.preference,
|
||||
"exchange": self._make_absolute(rdata.exchange.to_text()),
|
||||
}
|
||||
|
||||
case "NAPTR":
|
||||
value = {
|
||||
"order": rdata.order,
|
||||
"preference": rdata.preference,
|
||||
"flags": rdata.flags,
|
||||
"service": rdata.service,
|
||||
"regexp": rdata.regexp,
|
||||
"replacement": rdata.replacement.to_text(),
|
||||
}
|
||||
|
||||
case "SSHFP":
|
||||
value = {
|
||||
"algorithm": rdata.algorithm,
|
||||
"fingerprint_type": rdata.fp_type,
|
||||
"fingerprint": rdata.fingerprint,
|
||||
}
|
||||
|
||||
case "SOA":
|
||||
self.log.debug("SOA")
|
||||
continue
|
||||
|
||||
case "SPF" | "TXT":
|
||||
value = rcd_value.replace(";", r"\;")
|
||||
|
||||
case "SRV":
|
||||
value = {
|
||||
"priority": rdata.priority,
|
||||
"weight": rdata.weight,
|
||||
"port": rdata.port,
|
||||
"target": self._make_absolute(rdata.target.to_text()),
|
||||
}
|
||||
|
||||
case _:
|
||||
raise ValueError
|
||||
|
||||
if (rcd_name, nb_record.type) not in records:
|
||||
records[(rcd_name, nb_record.type)] = data
|
||||
records[(rcd_name, nb_record.type)]["values"].append(value)
|
||||
|
||||
for data in records.values():
|
||||
if len(data["values"]) == 1:
|
||||
data["value"] = data.pop("values")[0]
|
||||
records = self._format_nb_records(zone)
|
||||
for data in records:
|
||||
record = octodns.record.Record.new(
|
||||
zone=zone,
|
||||
name=data["name"],
|
||||
|
@ -257,4 +290,4 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
|
|||
)
|
||||
zone.add_record(record, lenient=lenient, replace=self.replace_duplicates)
|
||||
|
||||
self.log.info(f"populate: found {len(zone.records)} records for zone {zone.name}")
|
||||
self.log.info(f"populate -> found {len(zone.records)} records for zone {zone.name}")
|
||||
|
|
Loading…
Reference in New Issue