Compare commits

...

4 commits

Author SHA1 Message Date
c692ff18d3 Bump version 0.2.1 → 0.3.0
Some checks failed
check code / check-code (push) Failing after 19s
update changelog / update-changelog (push) Failing after 4s
build and publish / build-pypackage (push) Has been cancelled
2024-01-25 11:39:30 +01:00
c95f95fb8b make populate function smaller 2024-01-25 11:39:22 +01:00
f7f54f8eb7 add docstrings and refactor make absolute function 2024-01-25 10:51:42 +01:00
ff3dd03ce5 add new workflows 2024-01-25 10:25:37 +01:00
7 changed files with 213 additions and 125 deletions

View file

@ -0,0 +1,16 @@
name: build and publish
on:
push:
tags:
- "v*.*.*"
pull_request:
branches: [main, master]
jobs:
build-pypackage:
uses: actions/workflows/.gitea/workflows/build_pypackage.yml@master
secrets:
username: __token__
token: ${{ secrets.PACKAGE_TOKEN }}

View file

@ -0,0 +1,10 @@
name: update changelog
on:
push:
tags:
- "v*.*.*"
jobs:
update-changelog:
uses: actions/workflows/.gitea/workflows/update_changelog.yml@master

View file

@ -1,4 +1,4 @@
name: check/lint python code with hatch name: check code
on: on:
push: push:
@ -8,17 +8,7 @@ on:
branches: [main, master] branches: [main, master]
jobs: jobs:
check-python-hatch: check-code:
runs-on: python311 uses: actions/workflows/.gitea/workflows/check_python_hatch.yml@master
steps: with:
- name: checkout code run-tests: false
uses: actions/checkout@v3
- name: install hatch
run: pip install -U hatch
- name: test codestyle
run: hatch run lint:style
- name: test typing
run: hatch run lint:typing

View file

@ -31,6 +31,12 @@ providers:
## install ## install
### via pip
```bash
pip install octodns-netbox-dns
```
### via pip + git ### via pip + git
```bash ```bash

View file

@ -3,7 +3,7 @@ requires = ["hatchling>=1.18", "hatch-regex-commit>=0.0.3"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
[project] [project]
name = "octodns_netbox_dns" name = "octodns-netbox-dns"
description = "octodns netbox-dns provider" description = "octodns netbox-dns provider"
readme = "README.md" readme = "README.md"
license = "MIT" license = "MIT"

View file

@ -1 +1 @@
__version__ = "0.2.1" __version__ = "0.3.0"

View file

@ -1,5 +1,5 @@
import logging import logging
from typing import Literal from typing import Any, Literal
import dns.rdata import dns.rdata
import octodns.record import octodns.record
@ -16,7 +16,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
SUPPORTS_GEO = False SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False SUPPORTS_DYNAMIC = False
SUPPORTS: set[str] = { SUPPORTS: set[str] = { # noqa
"A", "A",
"AAAA", "AAAA",
"AFSDB", "AFSDB",
@ -49,7 +49,7 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
def __init__( def __init__(
self, self,
id: int, id: int, # noqa
url: str, url: str,
token: str, token: str,
view: str | None | Literal[False] = False, view: str | None | Literal[False] = False,
@ -63,29 +63,49 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
self.log = logging.getLogger(f"NetboxDNSSource[{id}]") self.log = logging.getLogger(f"NetboxDNSSource[{id}]")
self.log.debug(f"__init__: {id=}, {url=}, {view=}, {replace_duplicates=}, {make_absolute=}") self.log.debug(f"__init__: {id=}, {url=}, {view=}, {replace_duplicates=}, {make_absolute=}")
super().__init__(id) super().__init__(id)
self._api = pynetbox.core.api.Api(url, token)
self._nb_view = self._get_view(view) self.api = pynetbox.core.api.Api(url, token)
self._ttl = ttl self.nb_view = self._get_nb_view(view)
self.ttl = ttl
self.replace_duplicates = replace_duplicates self.replace_duplicates = replace_duplicates
self.make_absolute = make_absolute self.make_absolute = make_absolute
def _make_absolute(self, value: str) -> str: def _make_absolute(self, value: str) -> str:
if not self.make_absolute or value[-1] == ".": """
return value Return dns name with trailing dot to make it absolute
return value + "."
def _get_view(self, view: str | None | Literal[False]) -> dict[str, int | str]: @param value: dns record value
@return: absolute dns record value
"""
if not self.make_absolute or value.endswith("."):
return value
absolute_value = value + "."
self.log.debug(f"relative={value}, absolute={absolute_value}")
return absolute_value
def _get_nb_view(self, view: str | None | Literal[False]) -> dict[str, int | str]:
"""
Get the correct netbox view when requested
@param view: `False` for no view, `None` for zones without a view, else the view name
@return: the netbox view id in the netbox query format
"""
if view is False: if view is False:
return {} return {}
if view is None: if view is None:
return {"view": "null"} return {"view": "null"}
nb_view: pynetbox.core.response.Record = self._api.plugins.netbox_dns.views.get(name=view) nb_view: pynetbox.core.response.Record = self.api.plugins.netbox_dns.views.get(name=view)
if nb_view is None: if nb_view is None:
msg = f"dns view: '{view}' has not been found" msg = f"dns view={view}, has not been found"
self.log.error(msg) self.log.error(msg)
raise ValueError(msg) raise ValueError(msg)
self.log.debug(f"found {nb_view.name} {nb_view.id}")
self.log.debug(f"found view={nb_view.name}, id={nb_view.id}")
return {"view_id": nb_view.id} return {"view_id": nb_view.id}
@ -93,128 +113,174 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
""" """
Given a zone name and a view name, look it up in NetBox. Given a zone name and a view name, look it up in NetBox.
@param name: name of the dns zone
@param view: the netbox view id in the api query format
@raise pynetbox.RequestError: if declared view is not existent @raise pynetbox.RequestError: if declared view is not existent
@return: the netbox dns zone object
""" """
query_params = {"name": name[:-1], **view} query_params = {"name": name[:-1], **view}
nb_zone = self._api.plugins.netbox_dns.zones.get(**query_params) nb_zone = self.api.plugins.netbox_dns.zones.get(**query_params)
self.log.debug(f"found zone={nb_zone.name}, id={nb_zone.id}")
return nb_zone return nb_zone
def populate(self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False): def _format_rdata(self, rdata: dns.Rdata, raw_value: str) -> dict[str, Any]:
""" """
Get all the records of a zone from NetBox and add them to the OctoDNS zone. Format netbox record values to correct octodns record values
@param rdata: rrdata record value
@param raw_value: raw record value
@return: formatted rrdata value
""" """
self.log.debug(f"populate: name={zone.name}, target={target}, lenient={lenient}") match rdata.rdtype.name:
case "A" | "AAAA":
value = rdata.address
records = {} case "CNAME":
value = self._make_absolute(rdata.target.to_text())
nb_zone = self._get_nb_zone(zone.name, view=self._nb_view) case "DNAME" | "NS" | "PTR":
value = rdata.target.to_text()
case "CAA":
value = {
"flags": rdata.flags,
"tag": rdata.tag,
"value": rdata.value,
}
case "LOC":
value = {
"lat_direction": "N" if rdata.latitude[4] >= 0 else "S",
"lat_degrees": rdata.latitude[0],
"lat_minutes": rdata.latitude[1],
"lat_seconds": rdata.latitude[2] + rdata.latitude[3] / 1000,
"long_direction": "W" if rdata.latitude[4] >= 0 else "E",
"long_degrees": rdata.longitude[0],
"long_minutes": rdata.longitude[1],
"long_seconds": rdata.longitude[2] + rdata.longitude[3] / 1000,
"altitude": rdata.altitude / 100,
"size": rdata.size / 100,
"precision_horz": rdata.horizontal_precision / 100,
"precision_vert": rdata.veritical_precision / 100,
}
case "MX":
value = {
"preference": rdata.preference,
"exchange": self._make_absolute(rdata.exchange.to_text()),
}
case "NAPTR":
value = {
"order": rdata.order,
"preference": rdata.preference,
"flags": rdata.flags,
"service": rdata.service,
"regexp": rdata.regexp,
"replacement": rdata.replacement.to_text(),
}
case "SSHFP":
value = {
"algorithm": rdata.algorithm,
"fingerprint_type": rdata.fp_type,
"fingerprint": rdata.fingerprint,
}
case "SPF" | "TXT":
value = raw_value.replace(";", r"\;")
case "SRV":
value = {
"priority": rdata.priority,
"weight": rdata.weight,
"port": rdata.port,
"target": self._make_absolute(rdata.target.to_text()),
}
case "SOA":
self.log.warning("SOA record type not implemented")
raise NotImplementedError
case _:
self.log.error("invalid record type")
raise ValueError
self.log.debug(f"formatted record value={value}")
return value
def _format_nb_records(self, zone: octodns.zone.Zone) -> list[dict[str, Any]]:
"""
Format netbox dns records to the octodns format
@param zone: octodns zone
@return: a list of octodns compatible record dicts
"""
records: dict[tuple[str, str], dict[str, Any]] = {}
nb_zone = self._get_nb_zone(zone.name, view=self.nb_view)
if not nb_zone: if not nb_zone:
self.log.error(f"Zone '{zone.name[:-1]}' not found in view: '{self._nb_view}'") self.log.error(f"zone={zone.name}, not found in view={self.nb_view}")
raise LookupError raise LookupError
nb_records = self._api.plugins.netbox_dns.records.filter(zone_id=nb_zone.id) nb_records: pynetbox.core.response.RecordSet = self.api.plugins.netbox_dns.records.filter(
zone_id=nb_zone.id
)
for nb_record in nb_records: for nb_record in nb_records:
self.log.debug(f"{nb_record.name!r} {nb_record.type!r} {nb_record.value!r}") nb_record: pynetbox.core.response.Record
rcd_name: str = nb_record.name if nb_record.name != "@" else "" rcd_name: str = nb_record.name if nb_record.name != "@" else ""
rcd_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name raw_value: str = nb_record.value if nb_record.value != "@" else nb_record.zone.name
rcd_type: str = nb_record.type
rcd_ttl: int = nb_record.ttl or nb_zone.default_ttl
if nb_record.type == "NS":
rcd_ttl = nb_zone.soa_refresh
if nb_record.ttl: rcd_data = {
nb_ttl = nb_record.ttl
elif nb_record.type == "NS":
nb_ttl = nb_zone.soa_refresh
else:
nb_ttl = nb_zone.default_ttl
data = {
"name": rcd_name, "name": rcd_name,
"type": nb_record.type, "type": rcd_type,
"ttl": nb_ttl, "ttl": rcd_ttl,
"values": [], "values": [],
} }
rdata = dns.rdata.from_text("IN", nb_record.type, rcd_value)
match rdata.rdtype.name:
case "A" | "AAAA":
value = rdata.address
case "CNAME": self.log.debug(f"record data={rcd_data}")
value = self._make_absolute(rdata.target.to_text())
case "DNAME" | "NS" | "PTR": rdata = dns.rdata.from_text("IN", nb_record.type, raw_value)
value = rdata.target.to_text() try:
rcd_value = self._format_rdata(rdata, raw_value)
except NotImplementedError:
continue
except Exception as exc:
raise exc
case "CAA": if (rcd_name, rcd_type) not in records:
value = { records[(rcd_name, rcd_type)] = rcd_data
"flags": rdata.flags,
"tag": rdata.tag,
"value": rdata.value,
}
case "LOC": records[(rcd_name, rcd_type)]["values"].append(rcd_value)
value = {
"lat_direction": "N" if rdata.latitude[4] >= 0 else "S",
"lat_degrees": rdata.latitude[0],
"lat_minutes": rdata.latitude[1],
"lat_seconds": rdata.latitude[2] + rdata.latitude[3] / 1000,
"long_direction": "W" if rdata.latitude[4] >= 0 else "E",
"long_degrees": rdata.longitude[0],
"long_minutes": rdata.longitude[1],
"long_seconds": rdata.longitude[2] + rdata.longitude[3] / 1000,
"altitude": rdata.altitude / 100,
"size": rdata.size / 100,
"precision_horz": rdata.horizontal_precision / 100,
"precision_vert": rdata.veritical_precision / 100,
}
case "MX": return list(records.values())
value = {
"preference": rdata.preference,
"exchange": self._make_absolute(rdata.exchange.to_text()),
}
case "NAPTR": def populate(
value = { self, zone: octodns.zone.Zone, target: bool = False, lenient: bool = False
"order": rdata.order, ) -> None:
"preference": rdata.preference, """
"flags": rdata.flags, Get all the records of a zone from NetBox and add them to the OctoDNS zone
"service": rdata.service,
"regexp": rdata.regexp,
"replacement": rdata.replacement.to_text(),
}
case "SSHFP": @param zone: octodns zone
value = { @param target: when `True`, load the current state of the provider.
"algorithm": rdata.algorithm, @param lenient: when `True`, skip record validation and do a "best effort" load of data.
"fingerprint_type": rdata.fp_type, """
"fingerprint": rdata.fingerprint, self.log.info(f"populate -> name={zone.name}, target={target}, lenient={lenient}")
}
case "SOA": records = self._format_nb_records(zone)
self.log.debug("SOA") for data in records:
continue
case "SPF" | "TXT":
value = rcd_value.replace(";", r"\;")
case "SRV":
value = {
"priority": rdata.priority,
"weight": rdata.weight,
"port": rdata.port,
"target": self._make_absolute(rdata.target.to_text()),
}
case _:
raise ValueError
if (rcd_name, nb_record.type) not in records:
records[(rcd_name, nb_record.type)] = data
records[(rcd_name, nb_record.type)]["values"].append(value)
for data in records.values():
if len(data["values"]) == 1:
data["value"] = data.pop("values")[0]
record = octodns.record.Record.new( record = octodns.record.Record.new(
zone=zone, zone=zone,
name=data["name"], name=data["name"],
@ -224,4 +290,4 @@ class NetBoxDNSSource(octodns.source.base.BaseSource):
) )
zone.add_record(record, lenient=lenient, replace=self.replace_duplicates) zone.add_record(record, lenient=lenient, replace=self.replace_duplicates)
self.log.info(f"populate: found {len(zone.records)} records for zone {zone.name}") self.log.info(f"populate -> found {len(zone.records)} records for zone {zone.name}")