restore-routes: introduce more type-safety and split up networkd unit generation

This commit is contained in:
Jörg Thalheim 2024-09-03 08:57:21 +02:00
parent d39dd6fb61
commit 73910674dc

View file

@ -2,33 +2,49 @@ import json
import sys
from pathlib import Path
from typing import Any
from dataclasses import dataclass
def filter_interfaces(network: list[dict[str, Any]]) -> list[dict[str, Any]]:
output = []
@dataclass
class Interface:
name: str
ifname: str | None
mac_address: str
dynamic_addresses: list[str]
static_addresses: list[dict[str, Any]]
static_routes: list[dict[str, Any]]
def filter_interfaces(network: list[dict[str, Any]]) -> list[Interface]:
interfaces = []
for net in network:
if net.get("link_type") == "loopback":
continue
if not net.get("address"):
if not (mac_address := net.get("address")):
# We need a mac address to match devices reliable
continue
addr_info = []
has_dynamic_address = False
static_addresses = []
dynamic_addresses = []
for addr in net.get("addr_info", []):
# no link-local ipv4/ipv6
if addr.get("scope") == "link":
continue
# do not explicitly configure addresses from dhcp or router advertisement
if addr.get("dynamic", False):
has_dynamic_address = True
continue
dynamic_addresses.append(addr["local"])
else:
addr_info.append(addr)
if addr_info != [] or has_dynamic_address:
net["addr_info"] = addr_info
output.append(net)
static_addresses.append(addr)
interfaces.append(
Interface(
name=net.get("ifname", mac_address.replace(":", "-")),
ifname=net.get("ifname"),
mac_address=mac_address,
dynamic_addresses=dynamic_addresses,
static_addresses=static_addresses,
static_routes=[],
)
)
return output
return interfaces
def filter_routes(routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
@ -42,44 +58,43 @@ def filter_routes(routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
return filtered
def generate_routes(interface: Interface, routes: list[dict[str, Any]]) -> list[str]:
route_sections = []
for route in routes:
if interface.ifname is None or route.get("dev") != interface.ifname:
continue
route_section = "[Route]\n"
if route.get("dst") != "default":
# can be skipped for default routes
route_section += f"Destination = {route['dst']}\n"
gateway = route.get("gateway")
# route v4 via v6
route_via = route.get("via")
if route_via and route_via.get("family") == "inet6":
gateway = route_via.get("host")
if route.get("dst") == "default":
route_section += "Destination = 0.0.0.0/0\n"
if gateway:
route_section += f"Gateway = {gateway}\n"
# we may ignore on-link default routes here, but I don't see how
# they would be useful for internet connectivity anyway
route_sections.append(route_section)
return route_sections
def generate_networkd_units(
interfaces: list[dict[str, Any]], routes: list[dict[str, Any]], directory: Path
interfaces: list[Interface], routes: list[dict[str, Any]], directory: Path
) -> None:
directory.mkdir(exist_ok=True)
for interface in interfaces:
name = f"00-{interface['ifname']}.network"
addresses = [
f"Address = {addr['local']}/{addr['prefixlen']}"
for addr in interface.get("addr_info", [])
]
route_sections = []
for route in routes:
if route.get("dev", "nodev") != interface.get("ifname", "noif"):
continue
route_section = "[Route]\n"
if route.get("dst") != "default":
# can be skipped for default routes
route_section += f"Destination = {route['dst']}\n"
gateway = route.get("gateway")
# route v4 via v6
route_via = route.get("via")
if route_via and route_via.get("family") == "inet6":
gateway = route_via.get("host")
if route.get("dst") == "default":
route_section += "Destination = 0.0.0.0/0\n"
if gateway:
route_section += f"Gateway = {gateway}\n"
# we may ignore on-link default routes here, but I don't see how
# they would be useful for internet connectivity anyway
route_sections.append(route_section)
name = f"00-{interface.name}.network"
# FIXME in some networks we might not want to trust dhcp or router advertisements
unit = f"""
[Match]
MACAddress = {interface["address"]}
MACAddress = {interface.mac_address}
[Network]
# both ipv4 and ipv6
@ -92,8 +107,13 @@ IPv6AcceptRA = yes
MulticastDNS = yes
"""
unit += "\n".join(addresses)
unit += "\n" + "\n".join(route_sections)
unit += "\n".join(
[
f"Address = {addr['local']}/{addr['prefixlen']}"
for addr in interface.static_addresses
]
)
unit += "\n" + "\n".join(generate_routes(interface, routes))
(directory / name).write_text(unit)