From c95f95fb8b8a2ae79b8f4db05675e98386a723b6 Mon Sep 17 00:00:00 2001 From: olofvndrhr Date: Thu, 25 Jan 2024 11:39:22 +0100 Subject: [PATCH] make populate function smaller --- src/octodns_netbox_dns/__init__.py | 263 ++++++++++++++++------------- 1 file changed, 148 insertions(+), 115 deletions(-) diff --git a/src/octodns_netbox_dns/__init__.py b/src/octodns_netbox_dns/__init__.py index c032433..586fb56 100644 --- a/src/octodns_netbox_dns/__init__.py +++ b/src/octodns_netbox_dns/__init__.py @@ -1,5 +1,5 @@ import logging -from typing import Literal +from typing import Any, Literal import dns.rdata import octodns.record @@ -65,7 +65,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource): super().__init__(id) self.api = pynetbox.core.api.Api(url, token) - self.nb_view = self._get_view(view) + self.nb_view = self._get_nb_view(view) self.ttl = ttl self.replace_duplicates = replace_duplicates self.make_absolute = make_absolute @@ -86,7 +86,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource): return absolute_value - def _get_view(self, view: str | None | Literal[False]) -> dict[str, int | str]: + def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]: """ Get the correct netbox view when requested @@ -101,7 +101,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource): nb_view: pynetbox.core.response.Record = self.api.plugins.netbox_dns.views.get(name=view) if nb_view is None: - msg = f"dns view: '{view}' has not been found" + msg = f"dns view={view}, has not been found" self.log.error(msg) raise ValueError(msg) @@ -127,6 +127,146 @@ class NetBoxDNSSource(octodns.source.base.BaseSource): return nb_zone + def _format_rdata(self, rdata: dns.Rdata, raw_value: str) -> dict[str, Any]: + """ + Format netbox record values to correct octodns record values + + @param rdata: rrdata record value + @param raw_value: raw record value + + @return: formatted rrdata value + """ + match rdata.rdtype.name: + case "A" | "AAAA": + value = rdata.address + + case "CNAME": + value = self._make_absolute(rdata.target.to_text()) + + case "DNAME" | "NS" | "PTR": + value = rdata.target.to_text() + + case "CAA": + value = { + "flags": rdata.flags, + "tag": rdata.tag, + "value": rdata.value, + } + + case "LOC": + value = { + "lat_direction": "N" if rdata.latitude[4] >= 0 else "S", + "lat_degrees": rdata.latitude[0], + "lat_minutes": rdata.latitude[1], + "lat_seconds": rdata.latitude[2] + rdata.latitude[3] / 1000, + "long_direction": "W" if rdata.latitude[4] >= 0 else "E", + "long_degrees": rdata.longitude[0], + "long_minutes": rdata.longitude[1], + "long_seconds": rdata.longitude[2] + rdata.longitude[3] / 1000, + "altitude": rdata.altitude / 100, + "size": rdata.size / 100, + "precision_horz": rdata.horizontal_precision / 100, + "precision_vert": rdata.veritical_precision / 100, + } + + case "MX": + value = { + "preference": rdata.preference, + "exchange": self._make_absolute(rdata.exchange.to_text()), + } + + case "NAPTR": + value = { + "order": rdata.order, + "preference": rdata.preference, + "flags": rdata.flags, + "service": rdata.service, + "regexp": rdata.regexp, + "replacement": rdata.replacement.to_text(), + } + + case "SSHFP": + value = { + "algorithm": rdata.algorithm, + "fingerprint_type": rdata.fp_type, + "fingerprint": rdata.fingerprint, + } + + case "SPF" | "TXT": + value = raw_value.replace(";", r"\;") + + case "SRV": + value = { + "priority": rdata.priority, + "weight": rdata.weight, + "port": rdata.port, + "target": self._make_absolute(rdata.target.to_text()), + } + + case "SOA": + self.log.warning("SOA record type not implemented") + raise NotImplementedError + + case _: + self.log.error("invalid record type") + raise ValueError + + self.log.debug(f"formatted record value={value}") + + return value + + def _format_nb_records(self, zone: octodns.zone.Zone) -> list[dict[str, Any]]: + """ + Format netbox dns records to the octodns format + + @param zone: octodns zone + + @return: a list of octodns compatible record dicts + """ + records: dict[tuple[str, str], dict[str, Any]] = {} + + nb_zone = self._get_nb_zone(zone.name, view=self.nb_view) + if not nb_zone: + self.log.error(f"zone={zone.name}, not found in view={self.nb_view}") + raise LookupError + + nb_records: pynetbox.core.response.RecordSet = self.api.plugins.netbox_dns.records.filter( + zone_id=nb_zone.id + ) + for nb_record in nb_records: + nb_record: pynetbox.core.response.Record + + rcd_name: str = nb_record.name if nb_record.name != "@" else "" + raw_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name + rcd_type: str = nb_record.type + rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl + if nb_record.type == "NS": + rcd_ttl = nb_zone.soa_refresh + + rcd_data = { + "name": rcd_name, + "type": rcd_type, + "ttl": rcd_ttl, + "values": [], + } + + self.log.debug(f"record data={rcd_data}") + + rdata = dns.rdata.from_text("IN", nb_record.type, raw_value) + try: + rcd_value = self._format_rdata(rdata, raw_value) + except NotImplementedError: + continue + except Exception as exc: + raise exc + + if (rcd_name, rcd_type) not in records: + records[(rcd_name, rcd_type)] = rcd_data + + records[(rcd_name, rcd_type)]["values"].append(rcd_value) + + return list(records.values()) + def populate( self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False ) -> None: @@ -137,117 +277,10 @@ class NetBoxDNSSource(octodns.source.base.BaseSource): @param target: when `True`, load the current state of the provider. @param lenient: when `True`, skip record validation and do a "best effort" load of data. """ - self.log.info(f"populate: name={zone.name}, target={target}, lenient={lenient}") + self.log.info(f"populate -> name={zone.name}, target={target}, lenient={lenient}") - records = {} - - nb_zone = self._get_nb_zone(zone.name, view=self.nb_view) - if not nb_zone: - self.log.error(f"Zone '{zone.name[:-1]}' not found in view: '{self.nb_view}'") - raise LookupError - - nb_records = self.api.plugins.netbox_dns.records.filter(zone_id=nb_zone.id) - for nb_record in nb_records: - self.log.debug(f"{nb_record.name!r} {nb_record.type!r} {nb_record.value!r}") - - rcd_name: str = nb_record.name if nb_record.name != "@" else "" - rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name - - if nb_record.ttl: - nb_ttl = nb_record.ttl - elif nb_record.type == "NS": - nb_ttl = nb_zone.soa_refresh - else: - nb_ttl = nb_zone.default_ttl - - data = { - "name": rcd_name, - "type": nb_record.type, - "ttl": nb_ttl, - "values": [], - } - rdata = dns.rdata.from_text("IN", nb_record.type, rcd_value) - match rdata.rdtype.name: - case "A" | "AAAA": - value = rdata.address - - case "CNAME": - value = self._make_absolute(rdata.target.to_text()) - - case "DNAME" | "NS" | "PTR": - value = rdata.target.to_text() - - case "CAA": - value = { - "flags": rdata.flags, - "tag": rdata.tag, - "value": rdata.value, - } - - case "LOC": - value = { - "lat_direction": "N" if rdata.latitude[4] >= 0 else "S", - "lat_degrees": rdata.latitude[0], - "lat_minutes": rdata.latitude[1], - "lat_seconds": rdata.latitude[2] + rdata.latitude[3] / 1000, - "long_direction": "W" if rdata.latitude[4] >= 0 else "E", - "long_degrees": rdata.longitude[0], - "long_minutes": rdata.longitude[1], - "long_seconds": rdata.longitude[2] + rdata.longitude[3] / 1000, - "altitude": rdata.altitude / 100, - "size": rdata.size / 100, - "precision_horz": rdata.horizontal_precision / 100, - "precision_vert": rdata.veritical_precision / 100, - } - - case "MX": - value = { - "preference": rdata.preference, - "exchange": self._make_absolute(rdata.exchange.to_text()), - } - - case "NAPTR": - value = { - "order": rdata.order, - "preference": rdata.preference, - "flags": rdata.flags, - "service": rdata.service, - "regexp": rdata.regexp, - "replacement": rdata.replacement.to_text(), - } - - case "SSHFP": - value = { - "algorithm": rdata.algorithm, - "fingerprint_type": rdata.fp_type, - "fingerprint": rdata.fingerprint, - } - - case "SOA": - self.log.debug("SOA") - continue - - case "SPF" | "TXT": - value = rcd_value.replace(";", r"\;") - - case "SRV": - value = { - "priority": rdata.priority, - "weight": rdata.weight, - "port": rdata.port, - "target": self._make_absolute(rdata.target.to_text()), - } - - case _: - raise ValueError - - if (rcd_name, nb_record.type) not in records: - records[(rcd_name, nb_record.type)] = data - records[(rcd_name, nb_record.type)]["values"].append(value) - - for data in records.values(): - if len(data["values"]) == 1: - data["value"] = data.pop("values")[0] + records = self._format_nb_records(zone) + for data in records: record = octodns.record.Record.new( zone=zone, name=data["name"], @@ -257,4 +290,4 @@ class NetBoxDNSSource(octodns.source.base.BaseSource): ) zone.add_record(record, lenient=lenient, replace=self.replace_duplicates) - self.log.info(f"populate: found {len(zone.records)} records for zone {zone.name}") + self.log.info(f"populate -> found {len(zone.records)} records for zone {zone.name}")