#!/usr/bin/env python3 """ Unified version.json updater (TUI-friendly core logic). Improvements: - Correctly merges base + variant variables and sources (component-wise deep merge) - Updates are written back into the correct dictionary: - Base: top-level spec["sources"][name] - Variant: spec["variants"][variant]["sources"][name] (created if missing) - Hash prefetch uses the merged view with rendered variables Supports: - Updating GitHub components to latest release tag, latest tag, or latest commit - Updating Git (fetchgit) components to latest commit on default branch - Recomputing SRI hash for url/urlTemplate, github tarballs, and fetchgit sources - Setting arbitrary fields (variables.* or sources.*.*) via --set path=value - Operating on a specific variant or the base (top-level) of a version.json Requirements: - nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri` for URL hashing - nix-prefetch-git + `nix hash to-sri` for Git fetchers - Network access for GitHub API (optional GITHUB_TOKEN env var) Examples: scripts/update_versions.py --file packages/edk2/version.json --github-latest-release --prefetch scripts/update_versions.py --file packages/edk2/version.json --component edk2 --github-latest-commit --prefetch scripts/update_versions.py --file packages/uboot/version.json --url-prefetch scripts/update_versions.py --file packages/proton-cachyos/version.json --variant cachyos-v4 --set variables.base=10.0 scripts/update_versions.py --file packages/linux-cachyos/version.json --component zfs --git-latest --prefetch """ import argparse import json import os import re import subprocess import sys import urllib.request import urllib.error from typing import Any, Dict, List, Optional, Tuple Json = Dict[str, Any] def eprintln(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) def load_json(path: str) -> Json: with open(path, "r", encoding="utf-8") as f: return json.load(f) def save_json(path: str, data: Json): with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) f.write("\n") def deep_get(o: Json, path: List[str], default=None): cur = o for p in path: if isinstance(cur, dict) and p in cur: cur = cur[p] else: return default return cur def deep_set(o: Json, path: List[str], value: Any): cur = o for p in path[:-1]: if p not in cur or not isinstance(cur[p], dict): cur[p] = {} cur = cur[p] cur[path[-1]] = value def parse_set_pair(pair: str) -> Tuple[List[str], str]: if "=" not in pair: raise ValueError(f"--set requires KEY=VALUE, got: {pair}") key, val = pair.split("=", 1) path = key.strip().split(".") return path, val def render_templates(value: Any, variables: Dict[str, Any]) -> Any: # Simple ${var} string replacement across strings/structures if isinstance(value, str): def repl(m): name = m.group(1) return str(variables.get(name, m.group(0))) return re.sub(r"\$\{([^}]+)\}", repl, value) elif isinstance(value, dict): return {k: render_templates(v, variables) for k, v in value.items()} elif isinstance(value, list): return [render_templates(v, variables) for v in value] return value def http_get_json(url: str, token: Optional[str] = None) -> Any: req = urllib.request.Request(url, headers={"Accept": "application/vnd.github+json"}) if token: req.add_header("Authorization", f"Bearer {token}") with urllib.request.urlopen(req) as resp: return json.loads(resp.read().decode("utf-8")) def github_latest_release_tag(owner: str, repo: str, token: Optional[str] = None) -> Optional[str]: url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest" try: data = http_get_json(url, token) tag = data.get("tag_name") return tag except urllib.error.HTTPError as e: eprintln(f"GitHub latest release failed: {e}") return None def github_latest_tag(owner: str, repo: str, token: Optional[str] = None, tag_regex: Optional[str] = None) -> Optional[str]: url = f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100" try: data = http_get_json(url, token) tags = [t.get("name") for t in data if "name" in t] if tag_regex: rx = re.compile(tag_regex) tags = [t for t in tags if rx.search(t)] return tags[0] if tags else None except urllib.error.HTTPError as e: eprintln(f"GitHub tags failed: {e}") return None def github_head_commit(owner: str, repo: str, token: Optional[str] = None) -> Optional[str]: # Prefer git ls-remote to avoid API limits url = f"https://github.com/{owner}/{repo}.git" try: out = subprocess.check_output(["git", "ls-remote", url, "HEAD"], text=True).strip() if out: sha = out.split()[0] return sha except Exception as e: eprintln(f"git ls-remote failed for {url}: {e}") return None def run_cmd_get_output(args: List[str]) -> str: eprintln(f"Running: {' '.join(args)}") return subprocess.check_output(args, text=True).strip() def nix_prefetch_url(url: str) -> Optional[str]: # returns SRI (sha256-...) base32 = None try: base32 = run_cmd_get_output(["nix-prefetch-url", "--type", "sha256", url]) except Exception: try: base32 = run_cmd_get_output(["nix", "prefetch-url", url]) except Exception as e: eprintln(f"Failed to prefetch url: {url}: {e}") return None try: sri = run_cmd_get_output(["nix", "hash", "to-sri", "--type", "sha256", base32]) return sri except Exception as e: eprintln(f"Failed to convert base32 to SRI: {e}") return None def github_tarball_url(owner: str, repo: str, ref: str) -> str: # codeload is stable for tarball return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}" def nix_prefetch_github_tarball(owner: str, repo: str, ref: str) -> Optional[str]: url = github_tarball_url(owner, repo, ref) return nix_prefetch_url(url) def nix_prefetch_git(url: str, rev: str) -> Optional[str]: # returns SRI try: out = run_cmd_get_output(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url]) try: data = json.loads(out) base32 = data.get("sha256") or data.get("hash") except Exception: base32 = out.splitlines()[-1].strip() if not base32: eprintln(f"Could not parse nix-prefetch-git output for {url}@{rev}") return None sri = run_cmd_get_output(["nix", "hash", "to-sri", "--type", "sha256", base32]) return sri except Exception as e: eprintln(f"nix-prefetch-git failed for {url}@{rev}: {e}") return None # -------------------- Merging logic (match lib/versioning.nix) -------------------- def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: out = dict(a) for k, v in b.items(): if k in out and isinstance(out[k], dict) and isinstance(v, dict): out[k] = deep_merge(out[k], v) else: out[k] = v return out def merge_sources(base_sources: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]: names = set(base_sources.keys()) | set(overrides.keys()) result: Dict[str, Any] = {} for n in names: if n in base_sources and n in overrides: if isinstance(base_sources[n], dict) and isinstance(overrides[n], dict): result[n] = deep_merge(base_sources[n], overrides[n]) else: result[n] = overrides[n] elif n in overrides: result[n] = overrides[n] else: result[n] = base_sources[n] return result def merged_view(spec: Json, variant: Optional[str]) -> Tuple[Dict[str, Any], Dict[str, Any], Json, List[str]]: """ Returns (merged_variables, merged_sources, target_dict_to_write, base_path) - merged_*: what to display/prefetch with - target_dict_to_write: where to write changes (base or variants[variant]) """ base_vars = spec.get("variables", {}) or {} base_sources = spec.get("sources", {}) or {} if variant: vdict = spec.get("variants", {}).get(variant) if not isinstance(vdict, dict): raise ValueError(f"Variant '{variant}' not found") v_vars = vdict.get("variables", {}) or {} v_sources = vdict.get("sources", {}) or {} merged_vars = dict(base_vars) merged_vars.update(v_vars) merged_srcs = merge_sources(base_sources, v_sources) return merged_vars, merged_srcs, vdict, ["variants", variant] else: return dict(base_vars), dict(base_sources), spec, [] # -------------------- Update operations -------------------- def update_components(spec: Json, variant: Optional[str], components: Optional[List[str]], args: argparse.Namespace) -> bool: changed = False gh_token = os.environ.get("GITHUB_TOKEN") merged_vars, merged_srcs, target_dict, base_path = merged_view(spec, variant) src_names = list(merged_srcs.keys()) if not components else [c for c in components if c in merged_srcs] # Ensure target_dict has a sources dict to write into target_sources = target_dict.setdefault("sources", {}) for name in src_names: view_comp = merged_srcs[name] fetcher = view_comp.get("fetcher", "none") # Ensure a writable component entry exists (always write to the selected target: base or variant override) comp = target_sources.setdefault(name, {}) if not isinstance(comp, dict): comp = target_sources[name] = {} if fetcher == "github": owner = view_comp.get("owner") repo = view_comp.get("repo") if not owner or not repo: eprintln(f"Component {name}: missing owner/repo for github fetcher") continue new_ref = None ref_kind = None if args.github_latest_release: tag = github_latest_release_tag(owner, repo, gh_token) if tag: new_ref = tag ref_kind = "tag" elif args.github_latest_tag: tag = github_latest_tag(owner, repo, gh_token, args.tag_regex) if tag: new_ref = tag ref_kind = "tag" elif args.github_latest_commit: rev = github_head_commit(owner, repo, gh_token) if rev: new_ref = rev ref_kind = "rev" if new_ref: if ref_kind == "tag": comp["tag"] = new_ref if "rev" in comp: del comp["rev"] else: comp["rev"] = new_ref if "tag" in comp: del comp["tag"] eprintln(f"Component {name}: set {ref_kind}={new_ref}") changed = True if args.prefetch: ref = comp.get("tag") or comp.get("rev") if not ref: # fallback to merged view if not in override ref = view_comp.get("tag") or view_comp.get("rev") if ref: sri = nix_prefetch_github_tarball(owner, repo, ref) if sri: comp["hash"] = sri eprintln(f"Component {name}: updated hash={sri}") changed = True elif fetcher == "git": url = view_comp.get("url") if not url: eprintln(f"Component {name}: missing url for git fetcher") continue if args.git_latest: rev = github_head_commit(owner="", repo="", token=None) # placeholder; we will ls-remote below try: out = subprocess.check_output(["git", "ls-remote", url, "HEAD"], text=True).strip() if out: new_rev = out.split()[0] comp["rev"] = new_rev eprintln(f"Component {name}: set rev={new_rev}") changed = True if args.prefetch: sri = nix_prefetch_git(url, new_rev) if sri: comp["hash"] = sri eprintln(f"Component {name}: updated hash={sri}") changed = True except Exception as e: eprintln(f"git ls-remote failed for {name}: {e}") elif fetcher == "url": if args.url_prefetch or args.prefetch: rendered_comp = render_templates(view_comp, merged_vars) url = rendered_comp.get("url") or rendered_comp.get("urlTemplate") if not url: eprintln(f"Component {name}: missing url/urlTemplate for url fetcher") else: sri = nix_prefetch_url(url) if sri: comp["hash"] = sri eprintln(f"Component {name}: updated hash={sri}") changed = True elif fetcher == "pypi": if args.prefetch: eprintln(f"Component {name} (pypi): prefetch not implemented; use nix-prefetch-pypi or set hash manually.") else: # fetcher == "none" or other: no-op unless user --set a value pass return changed # -------------------- Main -------------------- def main(): ap = argparse.ArgumentParser(description="Update unified version.json files") ap.add_argument("--file", required=True, help="Path to version.json") ap.add_argument("--variant", help="Variant name to update (default: base/top-level)") ap.add_argument("--component", dest="components", action="append", help="Limit to specific component(s); can be repeated") ap.add_argument("--github-latest-release", action="store_true", help="Update GitHub components to latest release tag") ap.add_argument("--github-latest-tag", action="store_true", help="Update GitHub components to latest tag") ap.add_argument("--github-latest-commit", action="store_true", help="Update GitHub components to HEAD commit") ap.add_argument("--tag-regex", help="Regex to filter tags for --github-latest-tag") ap.add_argument("--git-latest", action="store_true", help="Update fetchgit components to latest commit (HEAD)") ap.add_argument("--url-prefetch", action="store_true", help="Recompute hash for url/urlTemplate components") ap.add_argument("--prefetch", action="store_true", help="After changing refs, recompute hash as needed") ap.add_argument("--set", dest="sets", action="append", default=[], help="Set a field: KEY=VALUE (dot path), relative to variant/base. Value is treated as string.") ap.add_argument("--dry-run", action="store_true", help="Do not write changes") ap.add_argument("--print", dest="do_print", action="store_true", help="Print result JSON to stdout") args = ap.parse_args() path = args.file spec = load_json(path) # Apply --set mutations (relative to base or selected variant) target = spec if not args.variant else spec.setdefault("variants", {}).setdefault(args.variant, {}) changed = False for pair in args.sets: path_tokens, value = parse_set_pair(pair) deep_set(target, path_tokens, value) eprintln(f"Set {'.'.join((['variants', args.variant] if args.variant else []) + path_tokens)} = {value}") changed = True # Update refs/hashes based on fetcher type and flags with merged view changed = update_components(spec, args.variant, args.components, args) or changed if changed and not args.dry_run: save_json(path, spec) eprintln(f"Wrote changes to {path}") else: eprintln("No changes made.") if args.do_print: print(json.dumps(spec, indent=2, ensure_ascii=False)) if __name__ == "__main__": try: main() except KeyboardInterrupt: sys.exit(130)