2022-11-24 18:30:37 +01:00
|
|
|
import json
|
|
|
|
import sys
|
2022-11-27 17:06:17 +01:00
|
|
|
from pathlib import Path
|
2024-09-03 09:04:47 +02:00
|
|
|
from typing import Any, Iterator
|
2024-09-03 08:57:21 +02:00
|
|
|
from dataclasses import dataclass
|
2022-11-24 18:30:37 +01:00
|
|
|
|
|
|
|
|
2024-09-03 09:25:50 +02:00
|
|
|
@dataclass
class Address:
    """A single IP address found on a network interface.

    Both lifetime fields fall back to 0 when the caller does not supply them.
    """

    # The bare IP address; the prefix length is kept separately in `prefixlen`.
    address: str
    # Address family string, e.g. "inet" or "inet6".
    family: str
    # Length of the network prefix that belongs to `address`.
    prefixlen: int
    # Lifetimes as reported by the data source (presumably seconds -- confirm).
    preferred_life_time: int = 0
    valid_life_time: int = 0
|
|
|
|
|
|
|
|
|
2024-09-03 08:57:21 +02:00
|
|
|
@dataclass
class Interface:
    """A network interface plus the addresses and routes collected for it."""

    # Label used in the generated unit file name.
    name: str
    # Kernel interface name, or None when the source data has none; routes
    # are matched against it via their "dev" entry.
    ifname: str | None
    # Hardware address used in the [Match] section of the generated unit.
    mac_address: str
    # Addresses flagged as dynamic in the source data.
    dynamic_addresses: list[Address]
    # Addresses not flagged as dynamic; written out as `Address =` lines.
    static_addresses: list[Address]
    # Per-interface route entries as raw dictionaries.
    static_routes: list[dict[str, Any]]
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_addr_info(info: dict[str, Any]) -> tuple[Address, bool] | None:
    """Convert one ``addr_info`` entry into an ``Address`` and a dynamic flag.

    Returns ``None`` when the entry is link-scoped, has an unsupported
    family, or lacks one of the required fields.
    """
    # no link-local ipv4/ipv6
    if info.get("scope") == "link":
        return None

    required_keys = ("preferred_life_time", "valid_life_time", "prefixlen", "local")
    if any(info.get(key) is None for key in required_keys):
        return None

    family = info.get("family")
    if family not in ("inet", "inet6"):
        return None

    # A missing "dynamic" key counts as static; only an explicit null is rejected.
    is_dynamic = info.get("dynamic", False)
    if is_dynamic is None:
        return None

    parsed = Address(
        address=info["local"],
        family=family,
        prefixlen=info["prefixlen"],
        preferred_life_time=info["preferred_life_time"],
        valid_life_time=info["valid_life_time"],
    )
    return parsed, bool(is_dynamic)


def filter_interfaces(network: list[dict[str, Any]]) -> list[Interface]:
    """Build ``Interface`` objects for every usable entry in *network*.

    Loopback links and links without a MAC address are skipped. The usable
    addresses of each remaining link are split into dynamic and static ones.

    Args:
        network: Decoded link/address dictionaries (``link_type``,
            ``address``, ``ifname`` and ``addr_info`` keys are read).

    Returns:
        One ``Interface`` per accepted link, in input order, each with an
        empty ``static_routes`` list.
    """
    result: list[Interface] = []
    for link in network:
        if link.get("link_type") == "loopback":
            continue

        mac_address = link.get("address")
        if not mac_address:
            # We need a MAC address to match devices reliably
            continue

        dynamic: list[Address] = []
        static: list[Address] = []
        for info in link.get("addr_info", []):
            parsed = _parse_addr_info(info)
            if parsed is None:
                continue
            address, is_dynamic = parsed
            (dynamic if is_dynamic else static).append(address)

        # Fall back to a MAC-derived name only when the "ifname" key is absent.
        fallback_name = mac_address.replace(":", "-")
        result.append(
            Interface(
                name=link.get("ifname", fallback_name),
                ifname=link.get("ifname"),
                mac_address=mac_address,
                dynamic_addresses=dynamic,
                static_addresses=static,
                static_routes=[],
            )
        )

    return result
|
2022-11-24 18:30:37 +01:00
|
|
|
|
|
|
|
|
2022-11-27 17:06:17 +01:00
|
|
|
def filter_routes(routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the routes whose ``protocol`` is not ``dhcp``, ``kernel`` or ``ra``.

    Routes without a ``protocol`` key are kept. Input order is preserved and
    the input list is not modified.
    """
    # Filter out routes set by addresses with subnets, dhcp and router advertisement
    automatic_protocols = ("dhcp", "kernel", "ra")
    return [
        entry for entry in routes if entry.get("protocol") not in automatic_protocols
    ]
|
|
|
|
|
|
|
|
|
2024-09-03 09:25:50 +02:00
|
|
|
def find_most_recent_v4_lease(addresses: list[Address]) -> Address | None:
    """Return the non-IPv6 address with the largest lifetime, if any.

    An address's lifetime is the larger of its preferred and valid lifetime.
    On a tie the earliest address in *addresses* wins. ``None`` is returned
    when there is no non-IPv6 address or the best lifetime is negative.
    """

    def lifetime(candidate: Address) -> int:
        return max(candidate.preferred_life_time, candidate.valid_life_time)

    non_v6 = [entry for entry in addresses if entry.family != "inet6"]
    # max() returns the first maximal element, so ties go to the earliest entry.
    best = max(non_v6, key=lifetime, default=None)
    if best is None or lifetime(best) <= -1:
        return None
    return best
|
|
|
|
|
|
|
|
|
2024-09-03 09:04:47 +02:00
|
|
|
def generate_routes(
    interface: Interface, routes: list[dict[str, Any]]
) -> Iterator[str]:
    """Yield the lines of one ``[Route]`` section per route of *interface*.

    A route belongs to the interface when its ``dev`` equals
    ``interface.ifname``; nothing is yielded for an interface without an
    ``ifname``.

    Args:
        interface: The interface the routes are generated for.
        routes: Decoded route dictionaries (``dev``, ``dst``, ``gateway``,
            ``via`` and ``flags`` keys are read).

    Yields:
        Unit-file lines: ``[Route]``, then optional ``Destination``,
        ``Gateway`` and ``GatewayOnLink`` settings.

    Raises:
        KeyError: If a matching route has no ``dst`` key.
    """
    if interface.ifname is None:
        # Routes are matched by device name; without one nothing can match.
        return

    for route in routes:
        if route.get("dev") != interface.ifname:
            continue

        # we may ignore on-link default routes here, but I don't see how
        # they would be useful for internet connectivity anyway

        yield "[Route]"
        is_default = route.get("dst") == "default"
        if not is_default:
            # can be skipped for default routes
            yield f"Destination = {route['dst']}"

        gateway = route.get("gateway")
        # route v4 via v6
        route_via = route.get("via")
        if route_via and route_via.get("family") == "inet6":
            gateway = route_via.get("host")
            if is_default:
                # Spell out the IPv4 default destination explicitly --
                # presumably because it cannot be derived from an IPv6 gateway.
                yield "Destination = 0.0.0.0/0"

        if gateway:
            yield f"Gateway = {gateway}"
            # Keep the "onlink" flag, otherwise a gateway outside every
            # configured subnet cannot be installed.
            if "onlink" in (route.get("flags") or []):
                yield "GatewayOnLink = yes"
|
2024-09-03 08:57:21 +02:00
|
|
|
|
|
|
|
|
2022-11-27 17:06:17 +01:00
|
|
|
def generate_networkd_units(
    interfaces: list[Interface], routes: list[dict[str, Any]], directory: Path
) -> None:
    """Write one ``.network`` unit file per interface into *directory*.

    Each unit matches the interface by MAC address, enables DHCP, LLDP, IPv6
    router advertisements and multicast DNS, lists the static addresses, adds
    the ``[Route]`` sections from ``generate_routes`` and, when a dynamic
    non-IPv6 address is known, a ``[DHCPv4]`` section requesting it again.

    Args:
        interfaces: Interfaces to generate units for.
        routes: Route dictionaries passed on to ``generate_routes``.
        directory: Output directory; created (with parents) if missing.
    """
    # parents=True: the output directory may sit below a path that does not
    # exist yet, which previously raised FileNotFoundError.
    directory.mkdir(parents=True, exist_ok=True)

    for interface in interfaces:
        # FIXME in some networks we might not want to trust dhcp or router advertisements
        unit_sections = [
            f"""
[Match]
MACAddress = {interface.mac_address}

[Network]
# both ipv4 and ipv6
DHCP = yes
# lets us discover the switch port we're connected to
LLDP = yes
# ipv6 router advertisements
IPv6AcceptRA = yes
# allows us to ping "nixos.local"
MulticastDNS = yes"""
        ]
        # Static addresses directly follow the [Network] settings above.
        unit_sections.extend(
            f"Address = {addr.address}/{addr.prefixlen}"
            for addr in interface.static_addresses
        )
        unit_sections.extend(generate_routes(interface, routes))

        most_recent_v4_lease = find_most_recent_v4_lease(interface.dynamic_addresses)
        if most_recent_v4_lease:
            unit_sections.append("[DHCPv4]")
            unit_sections.append(f"RequestAddress = {most_recent_v4_lease.address}")

        # trailing newline at the end
        unit_sections.append("")

        unit_path = directory / f"00-{interface.name}.network"
        unit_path.write_text("\n".join(unit_sections))
|
2022-11-27 17:06:17 +01:00
|
|
|
|
|
|
|
|
2022-11-25 09:50:03 +01:00
|
|
|
def main() -> None:
    """Command-line entry point.

    Expects four arguments: the addresses JSON file, the IPv4 routes JSON
    file, the IPv6 routes JSON file and the output directory for the
    generated networkd units. Prints a usage message to stderr and exits
    with status 1 when fewer arguments are given.
    """
    if len(sys.argv) < 5:
        usage = f"USAGE: {sys.argv[0]} addresses routes-v4 routes-v6 networkd-directory"
        print(usage, file=sys.stderr)
        sys.exit(1)

    # Arguments beyond the fourth are ignored.
    addresses_file, v4_file, v6_file, target = sys.argv[1:5]

    documents = []
    for source in (addresses_file, v4_file, v6_file):
        with open(source) as handle:
            documents.append(json.load(handle))
    addresses, v4_routes, v6_routes = documents

    networkd_directory = Path(target)

    relevant_interfaces = filter_interfaces(addresses)
    relevant_routes = filter_routes(v4_routes) + filter_routes(v6_routes)

    generate_networkd_units(relevant_interfaces, relevant_routes, networkd_directory)
|
2022-11-24 18:30:37 +01:00
|
|
|
|
|
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
|