refactor: bash+jq instead of python closure

This commit is contained in:
David 2024-05-10 02:52:31 +02:00
parent 8c9cab8c44
commit 2881477e1b
No known key found for this signature in database
GPG key ID: AB15A6AF1101390D
3 changed files with 124 additions and 120 deletions

View file

@ -1,7 +1,7 @@
{ config, lib, modulesPath, pkgs, ... }: { config, lib, modulesPath, pkgs, ... }:
let let
restore-network = pkgs.writers.writePython3 "restore-network" { flakeIgnore = [ "E501" ]; }
./restore_routes.py; restore-network = pkgs.writers.writeBash "restore-network" ./restore_routes.sh;
# does not link with iptables enabled # does not link with iptables enabled
iprouteStatic = pkgs.pkgsStatic.iproute2.override { iptables = null; }; iprouteStatic = pkgs.pkgsStatic.iproute2.override { iptables = null; };
@ -56,6 +56,7 @@ in
environment.etc.is_kexec.text = "true"; environment.etc.is_kexec.text = "true";
systemd.services.restore-network = { systemd.services.restore-network = {
path = [pkgs.jq];
before = [ "network-pre.target" ]; before = [ "network-pre.target" ];
wants = [ "network-pre.target" ]; wants = [ "network-pre.target" ];
wantedBy = [ "multi-user.target" ]; wantedBy = [ "multi-user.target" ];

View file

@ -1,118 +0,0 @@
import json
import sys
from pathlib import Path
from typing import Any
def filter_interfaces(network: list[dict[str, Any]]) -> list[dict[str, Any]]:
output = []
for net in network:
if net.get("link_type") == "loopback":
continue
if not net.get("address"):
# We need a mac address to match devices reliable
continue
addr_info = []
has_dynamic_address = False
for addr in net.get("addr_info", []):
# no link-local ipv4/ipv6
if addr.get("scope") == "link":
continue
# do not explicitly configure addresses from dhcp or router advertisement
if addr.get("dynamic", False):
has_dynamic_address = True
continue
else:
addr_info.append(addr)
if addr_info != [] or has_dynamic_address:
net["addr_info"] = addr_info
output.append(net)
return output
def filter_routes(routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
filtered = []
for route in routes:
# Filter out routes set by addresses with subnets, dhcp and router advertisement
if route.get("protocol") in ["dhcp", "kernel", "ra"]:
continue
filtered.append(route)
return filtered
def generate_networkd_units(
interfaces: list[dict[str, Any]], routes: list[dict[str, Any]], directory: Path
) -> None:
directory.mkdir(exist_ok=True)
for interface in interfaces:
name = f"00-{interface['ifname']}.network"
addresses = [
f"Address = {addr['local']}/{addr['prefixlen']}"
for addr in interface.get("addr_info", [])
]
route_sections = []
for route in routes:
if route.get("dev", "nodev") != interface.get("ifname", "noif"):
continue
route_section = "[Route]\n"
if route.get("dst") != "default":
# can be skipped for default routes
route_section += f"Destination = {route['dst']}\n"
gateway = route.get("gateway")
if gateway:
route_section += f"Gateway = {gateway}\n"
# we may ignore on-link default routes here, but I don't see how
# they would be useful for internet connectivity anyway
route_sections.append(route_section)
# FIXME in some networks we might not want to trust dhcp or router advertisements
unit = f"""
[Match]
MACAddress = {interface["address"]}
[Network]
# both ipv4 and ipv6
DHCP = yes
# lets us discover the switch port we're connected to
LLDP = yes
# ipv6 router advertisements
IPv6AcceptRA = yes
# allows us to ping "nixos.local"
MulticastDNS = yes
"""
unit += "\n".join(addresses)
unit += "\n" + "\n".join(route_sections)
(directory / name).write_text(unit)
def main() -> None:
if len(sys.argv) < 5:
print(
f"USAGE: {sys.argv[0]} addresses routes-v4 routes-v6 networkd-directory",
file=sys.stderr,
)
sys.exit(1)
with open(sys.argv[1]) as f:
addresses = json.load(f)
with open(sys.argv[2]) as f:
v4_routes = json.load(f)
with open(sys.argv[3]) as f:
v6_routes = json.load(f)
networkd_directory = Path(sys.argv[4])
relevant_interfaces = filter_interfaces(addresses)
relevant_routes = filter_routes(v4_routes) + filter_routes(v6_routes)
generate_networkd_units(relevant_interfaces, relevant_routes, networkd_directory)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,121 @@
#!/usr/bin/env bash
# filter_interfaces function
filter_interfaces() {
# This function takes a list of network interfaces as input and filters
# out loopback interfaces, interfaces without a MAC address, and addresses
# with a "link" scope or marked as dynamic (from DHCP or router
# advertisements). The filtered interfaces are returned as an array.
local network=("$@")
for net in "${network[@]}"; do
local link_type="$(jq -r '.link_type' <<< "$net")"
local address="$(jq -r '.address // ""' <<< "$net")"
local addr_info="$(jq -r '.addr_info | map(select(.scope != "link" and (.dynamic | not)))' <<< "$net")"
local has_dynamic_address=$(jq -r '.addr_info | any(.dynamic)' <<< "$net")
# echo "Link Type: $link_type -- Address: $address -- Has Dynamic Address: $has_dynamic_address -- Addr Info: $addr_info"
if [[ "$link_type" != "loopback" && -n "$address" && ("$addr_info" != "[]" || "$has_dynamic_address" == "true") ]]; then
net=$(jq -c --argjson addr_info "$addr_info" '.addr_info = $addr_info' <<< "$net")
echo "$net" # "return"
fi
done
}
# filter_routes function
filter_routes() {
# This function takes a list of routes as input and filters out routes
# with protocols "dhcp", "kernel", or "ra". The filtered routes are
# returned as an array.
local routes=("$@")
for route in "${routes[@]}"; do
local protocol=$(jq -r '.protocol' <<< "$route")
if [[ $protocol != "dhcp" && $protocol != "kernel" && $protocol != "ra" ]]; then
echo "$route" # "return"
fi
done
}
# generate_networkd_units function
generate_networkd_units() {
# This function takes the filtered interfaces and routes, along with a
# directory path. It generates systemd-networkd unit files for each interface,
# including the configured addresses and routes. The unit files are written
# to the specified directory with the naming convention 00-<ifname>.network.
local -n interfaces=$1
local -n routes=$2
local directory="$3"
mkdir -p "$directory"
for interface in "${interfaces[@]}"; do
local ifname=$(jq -r '.ifname' <<< "$interface")
local address=$(jq -r '.address' <<< "$interface")
local addresses=$(jq -r '.addr_info | map("Address = \(.local)/\(.prefixlen)") | join("\n")' <<< "$interface")
local route_sections=()
for route in "${routes[@]}"; do
local dev=$(jq -r '.dev' <<< "$route")
if [[ $dev == $ifname ]]; then
local route_section="[Route]"
local dst=$(jq -r '.dst' <<< "$route")
if [[ $dst != "default" ]]; then
route_section+="\nDestination = $dst"
fi
local gateway=$(jq -r '.gateway // ""' <<< "$route")
if [[ -n $gateway ]]; then
route_section+="\nGateway = $gateway"
fi
route_sections+=("$route_section")
fi
done
local unit=$(cat <<-EOF
[Match]
MACAddress = $address
[Network]
DHCP = yes
LLDP = yes
IPv6AcceptRA = yes
MulticastDNS = yes
$addresses
$(printf '%s\n' "${route_sections[@]}")
EOF
)
echo -e "$unit" > "$directory/00-$ifname.network"
done
}
# main function
main() {
if [[ $# -lt 4 ]]; then
echo "USAGE: $0 addresses routes-v4 routes-v6 networkd-directory" >&2
# exit 1
return 1
fi
local addresses
readarray -t addresses < <(jq -c '.[]' "$1") # Read JSON data into array
local v4_routes
readarray -t v4_routes < <(jq -c '.[]' "$2")
local v6_routes
readarray -t v6_routes < <(jq -c '.[]' "$3")
local networkd_directory="$4"
local relevant_interfaces
readarray -t relevant_interfaces < <(filter_interfaces "${addresses[@]}")
local relevant_routes
readarray -t relevant_routes < <(filter_routes "${v4_routes[@]}" "${v6_routes[@]}")
generate_networkd_units relevant_interfaces relevant_routes "$networkd_directory"
}
main "$@"