# nixos-images/nix/kexec-installer/restore_routes.py
import json
import sys
from pathlib import Path
from typing import Any, Iterator
from dataclasses import dataclass
@dataclass
class Address:
    """A single IP address configured on a network interface.

    Attributes:
        address: The IP address itself, without the prefix length.
        family: Address family; ``filter_interfaces`` only ever stores
            ``"inet"`` or ``"inet6"`` here.
        prefixlen: Length of the network prefix in bits.
        preferred_life_time: Preferred lifetime as reported in the input
            data (presumably seconds -- confirm against the producer).
        valid_life_time: Valid lifetime as reported in the input data
            (presumably seconds -- confirm against the producer).
    """

    address: str
    family: str
    prefixlen: int
    preferred_life_time: int = 0
    valid_life_time: int = 0
@dataclass
class Interface:
    """A network interface together with the addresses found on it.

    Attributes:
        name: Identifier used to build the unit file name
            (``00-<name>.network``).
        ifname: Kernel interface name, or ``None`` when the input record
            did not carry one. Routes are matched against this value.
        mac_address: Hardware address used in the unit's ``[Match]`` section.
        dynamic_addresses: Addresses flagged as dynamic in the input.
        static_addresses: Addresses not flagged as dynamic.
        static_routes: Route records attached to this interface.
    """

    name: str
    ifname: str | None
    mac_address: str
    dynamic_addresses: list[Address]
    static_addresses: list[Address]
    static_routes: list[dict[str, Any]]
def filter_interfaces(network: list[dict[str, Any]]) -> list[Interface]:
    """Turn raw interface records into ``Interface`` objects.

    Loopback interfaces and interfaces without a MAC address are dropped.
    For every remaining interface, each ``addr_info`` entry is converted to
    an ``Address`` and sorted into the dynamic or static list depending on
    its ``dynamic`` flag. Entries that are link-scoped, lack a required
    field, or have a family other than ``inet``/``inet6`` are ignored.

    Args:
        network: Parsed JSON list of interface records (the keys used here
            look like the output of ``ip --json addr`` -- confirm against
            the caller).

    Returns:
        One ``Interface`` per usable record, in input order.
    """
    interfaces = []
    for net in network:
        if net.get("link_type") == "loopback":
            continue
        if not (mac_address := net.get("address")):
            # We need a MAC address to match devices reliably.
            continue

        static_addresses = []
        dynamic_addresses = []
        for info in net.get("addr_info", []):
            # no link-local ipv4/ipv6
            if info.get("scope") == "link":
                continue
            if (preferred_life_time := info.get("preferred_life_time")) is None:
                continue
            if (valid_life_time := info.get("valid_life_time")) is None:
                continue
            if (prefixlen := info.get("prefixlen")) is None:
                continue
            if (family := info.get("family")) not in ["inet", "inet6"]:
                continue
            if (local := info.get("local")) is None:
                continue
            # A missing flag means "static"; only an explicit null is skipped.
            if (dynamic := info.get("dynamic", False)) is None:
                continue

            address = Address(
                address=local,
                family=family,
                prefixlen=prefixlen,
                preferred_life_time=preferred_life_time,
                valid_life_time=valid_life_time,
            )
            if dynamic:
                dynamic_addresses.append(address)
            else:
                static_addresses.append(address)

        interfaces.append(
            Interface(
                # Fall back to a file-name-friendly form of the MAC address
                # when the record has no interface name.
                name=net.get("ifname", mac_address.replace(":", "-")),
                ifname=net.get("ifname"),
                mac_address=mac_address,
                dynamic_addresses=dynamic_addresses,
                static_addresses=static_addresses,
                static_routes=[],
            )
        )

    return interfaces
def filter_routes(routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the routes that were not installed automatically.

    Routes whose ``protocol`` is ``dhcp``, ``kernel`` or ``ra`` are dropped:
    these are set by addresses with subnets, DHCP and router advertisements.
    Routes without a ``protocol`` key are kept.

    Args:
        routes: Parsed JSON list of route records.

    Returns:
        A new list with the remaining route dicts, in input order. The dicts
        themselves are not copied.
    """
    automatic_protocols = ("dhcp", "kernel", "ra")
    return [
        route for route in routes if route.get("protocol") not in automatic_protocols
    ]
def find_most_recent_v4_lease(addresses: list[Address]) -> Address | None:
    """Pick the non-IPv6 address with the longest remaining lifetime.

    The lifetime of an address is the larger of its preferred and valid
    lifetimes. On a tie the earliest address in ``addresses`` wins.

    Args:
        addresses: Candidate addresses; ``inet6`` entries are ignored.

    Returns:
        The selected address, or ``None`` when there is no candidate with a
        lifetime greater than ``-1``.
    """
    most_recent_address = None
    most_recent_lifetime = -1
    for addr in addresses:
        if addr.family == "inet6":
            continue
        lifetime = max(addr.preferred_life_time, addr.valid_life_time)
        # Strict comparison keeps the first address when lifetimes are equal.
        if lifetime > most_recent_lifetime:
            most_recent_lifetime = lifetime
            most_recent_address = addr
    return most_recent_address
def generate_routes(
    interface: Interface, routes: list[dict[str, Any]]
) -> Iterator[str]:
    """Yield systemd-networkd ``[Route]`` section lines for one interface.

    Only routes whose ``dev`` equals ``interface.ifname`` are emitted; an
    interface without an ``ifname`` yields nothing.

    Args:
        interface: Interface the routes are generated for.
        routes: Route records to pick from.

    Yields:
        ``[Route]`` followed by its ``Destination``/``Gateway`` settings,
        one line per item, for every matching route.

    Raises:
        KeyError: If a matching route has no ``dst`` key.
    """
    if interface.ifname is None:
        return

    for route in routes:
        if route.get("dev") != interface.ifname:
            continue

        # we may ignore on-link default routes here, but I don't see how
        # they would be useful for internet connectivity anyway
        yield "[Route]"
        if route.get("dst") != "default":
            # can be skipped for default routes
            yield f"Destination = {route['dst']}"

        gateway = route.get("gateway")
        # route v4 via v6
        route_via = route.get("via")
        if route_via and route_via.get("family") == "inet6":
            gateway = route_via.get("host")
            # With an IPv6 gateway the IPv4 default destination has to be
            # spelled out, since it was skipped above.
            if route.get("dst") == "default":
                yield "Destination = 0.0.0.0/0"

        if gateway:
            yield f"Gateway = {gateway}"
def generate_networkd_units(
    interfaces: list[Interface], routes: list[dict[str, Any]], directory: Path
) -> None:
    """Write one systemd-networkd ``.network`` unit per interface.

    Each unit matches the interface by MAC address, enables DHCP, LLDP,
    IPv6 router advertisements and multicast DNS, and then lists the
    interface's static addresses and its routes. If the interface had a
    dynamic IPv4 address, a ``[DHCPv4]`` section requests that address again.

    Args:
        interfaces: Interfaces to generate units for.
        routes: Route records; each interface picks the ones for its device.
        directory: Output directory; created (with parents) if missing.
            Files are named ``00-<interface.name>.network``.
    """
    directory.mkdir(parents=True, exist_ok=True)
    for interface in interfaces:
        # FIXME in some networks we might not want to trust dhcp or router advertisements
        unit_sections = [
            f"""
[Match]
MACAddress = {interface.mac_address}
[Network]
# both ipv4 and ipv6
DHCP = yes
# lets us discover the switch port we're connected to
LLDP = yes
# ipv6 router advertisements
IPv6AcceptRA = yes
# allows us to ping "nixos.local"
MulticastDNS = yes"""
        ]
        # Address lines directly follow the template, so they land in [Network].
        unit_sections.extend(
            f"Address = {addr.address}/{addr.prefixlen}"
            for addr in interface.static_addresses
        )
        unit_sections.extend(generate_routes(interface, routes))

        most_recent_v4_lease = find_most_recent_v4_lease(interface.dynamic_addresses)
        if most_recent_v4_lease:
            unit_sections.append("[DHCPv4]")
            unit_sections.append(f"RequestAddress = {most_recent_v4_lease.address}")

        # trailing newline at the end
        unit_sections.append("")

        (directory / f"00-{interface.name}.network").write_text(
            "\n".join(unit_sections)
        )
def main() -> None:
    """Command-line entry point.

    Expects four arguments: the JSON file with interface addresses, the
    JSON files with IPv4 and IPv6 routes, and the directory the networkd
    units are written to. Prints a usage message to stderr and exits with
    status 1 when fewer arguments are given.
    """
    if len(sys.argv) < 5:
        print(
            f"USAGE: {sys.argv[0]} addresses routes-v4 routes-v6 networkd-directory",
            file=sys.stderr,
        )
        sys.exit(1)

    # JSON is read as UTF-8 regardless of the locale of the environment.
    with open(sys.argv[1], encoding="utf-8") as f:
        addresses = json.load(f)
    with open(sys.argv[2], encoding="utf-8") as f:
        v4_routes = json.load(f)
    with open(sys.argv[3], encoding="utf-8") as f:
        v6_routes = json.load(f)
    networkd_directory = Path(sys.argv[4])

    relevant_interfaces = filter_interfaces(addresses)
    relevant_routes = filter_routes(v4_routes) + filter_routes(v6_routes)
    generate_networkd_units(relevant_interfaces, relevant_routes, networkd_directory)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()