#!/usr/bin/env python3
"""
Interactive TUI for browsing and updating unified version.json files.

Features:
- Scans packages/**/version.json and lists all packages
- Per-package view:
  - Choose base or any variant
  - List all sources/components with current ref (tag/rev/url/version) and hash
  - For GitHub sources: fetch candidates (latest release tag, latest tag, latest commit)
  - For Git sources: fetch latest commit (HEAD)
  - For URL sources: recompute hash (url/urlTemplate with rendered variables)
- Actions on a component:
  - Update to one of the candidates (sets tag or rev) and optionally re-hash
  - Recompute hash (prefetch)
  - Edit any field via path=value (e.g., variables.version=2025.07)
- Writes changes back to version.json

Dependencies:
- Standard library + external CLI tools:
  - nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri`
  - nix-prefetch-git
  - git
- Optional: GITHUB_TOKEN env var to increase GitHub API rate limits

Usage: scripts/version_tui.py

Controls:
- Up/Down to navigate lists
- Enter to select
- Backspace to go back
- q to quit
- On component screen:
  Left/Right = switch variant
  r = refresh candidates
  h = recompute hash (prefetch)
  i = show full rendered URL (url sources)
  e = edit arbitrary field (path=value)
  s = save to disk
"""

import curses
import json
import os
import re
import subprocess
import sys
import traceback
import urllib.request
import urllib.error
from urllib.parse import urlparse
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Repo root is one level above the scripts/ directory this file lives in.
ROOT = Path(__file__).resolve().parents[1]
PKGS_DIR = ROOT / "packages"

Json = Dict[str, Any]


# ------------------------------ Utilities ------------------------------

def eprintln(*args, **kwargs):
    """print() to stderr (curses owns stdout)."""
    print(*args, file=sys.stderr, **kwargs)


def load_json(path: Path) -> Json:
    """Load a JSON document from `path`."""
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: Path, data: Json):
    """Atomically write `data` as pretty-printed JSON to `path`.

    Writes to a sibling .tmp file first, then renames over the target so a
    crash mid-write cannot truncate the original file.
    """
    tmp = path.with_suffix(".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
        f.write("\n")
    tmp.replace(path)


def render_templates(value: Any, variables: Dict[str, Any]) -> Any:
    """Recursively substitute ${name} placeholders in strings.

    Unknown variable names are left as-is (the literal ``${name}`` survives),
    so partially-resolvable templates degrade gracefully. Dicts and lists are
    rendered element-wise; other types pass through unchanged.
    """
    if isinstance(value, str):
        def repl(m):
            name = m.group(1)
            return str(variables.get(name, m.group(0)))
        return re.sub(r"\$\{([^}]+)\}", repl, value)
    elif isinstance(value, dict):
        return {k: render_templates(v, variables) for k, v in value.items()}
    elif isinstance(value, list):
        return [render_templates(v, variables) for v in value]
    return value


def deep_set(o: Json, path: List[str], value: Any):
    """Set `o[path[0]][path[1]]...[path[-1]] = value`, creating dicts as needed.

    Non-dict intermediate values are replaced by empty dicts.
    """
    cur = o
    for p in path[:-1]:
        if p not in cur or not isinstance(cur[p], dict):
            cur[p] = {}
        cur = cur[p]
    cur[path[-1]] = value


# ------------------------------ Merge helpers (match lib/versioning.nix) ------------------------------

def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge `b` onto `a`; `b` wins on non-dict conflicts."""
    out = dict(a)
    for k, v in b.items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = deep_merge(out[k], v)
        else:
            out[k] = v
    return out


def merge_sources(base_sources: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge per-source override dicts onto base sources.

    A source present in both is deep-merged when both sides are dicts;
    otherwise the override replaces the base entry wholesale.
    """
    names = set(base_sources.keys()) | set(overrides.keys())
    result: Dict[str, Any] = {}
    for n in names:
        if n in base_sources and n in overrides:
            if isinstance(base_sources[n], dict) and isinstance(overrides[n], dict):
                result[n] = deep_merge(base_sources[n], overrides[n])
            else:
                result[n] = overrides[n]
        elif n in overrides:
            result[n] = overrides[n]
        else:
            result[n] = base_sources[n]
    return result


def merged_view(spec: Json, variant_name: Optional[str]) -> Tuple[Dict[str, Any], Dict[str, Any], Json]:
    """
    Returns (merged_variables, merged_sources, target_dict_to_write)
    merged_* are used for display/prefetch; target_dict_to_write is where
    updates must be written (base or selected variant).

    Raises ValueError when `variant_name` is given but not present in the spec.
    """
    base_vars = spec.get("variables", {}) or {}
    base_sources = spec.get("sources", {}) or {}
    if variant_name:
        vdict = spec.get("variants", {}).get(variant_name)
        if not isinstance(vdict, dict):
            raise ValueError(f"Variant '{variant_name}' not found")
        v_vars = vdict.get("variables", {}) or {}
        v_sources = vdict.get("sources", {}) or {}
        merged_vars = dict(base_vars)
        merged_vars.update(v_vars)
        merged_srcs = merge_sources(base_sources, v_sources)
        return merged_vars, merged_srcs, vdict
    else:
        return dict(base_vars), dict(base_sources), spec


def run_cmd(args: List[str]) -> Tuple[int, str, str]:
    """Run a command; return (returncode, stdout, stderr), never raising."""
    try:
        p = subprocess.run(args, text=True, stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE, check=False)
        return p.returncode, p.stdout.strip(), p.stderr.strip()
    except Exception as e:
        # e.g. FileNotFoundError when the tool is not installed
        return 1, "", str(e)


def run_get_stdout(args: List[str]) -> Optional[str]:
    """Run a command and return stdout, or None (with a stderr note) on failure."""
    code, out, err = run_cmd(args)
    if code != 0:
        eprintln(f"Command failed: {' '.join(args)}\n{err}")
        return None
    return out


def nix_prefetch_url(url: str) -> Optional[str]:
    """Prefetch `url` into the nix store and return its hash in SRI form.

    Tries the legacy `nix-prefetch-url` first, then falls back to the
    flakes-era `nix prefetch-url`; the resulting base32 hash is converted
    to SRI via `nix hash to-sri`.
    """
    out = run_get_stdout(["nix-prefetch-url", "--type", "sha256", url])
    if out is None:
        out = run_get_stdout(["nix", "prefetch-url", url])
    if out is None:
        return None
    sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", out.strip()])
    return sri


def nix_prefetch_git(url: str, rev: str) -> Optional[str]:
    """Prefetch a git checkout at `rev` and return its SRI hash, or None."""
    out = run_get_stdout(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url])
    if out is None:
        return None
    base32 = None
    try:
        # nix-prefetch-git normally emits JSON on stdout
        data = json.loads(out)
        base32 = data.get("sha256") or data.get("hash")
    except Exception:
        # Fallback: some versions print the hash as the last non-empty line
        lines = [l for l in out.splitlines() if l.strip()]
        if lines:
            base32 = lines[-1].strip()
    if not base32:
        return None
    sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", base32])
    return sri


def http_get_json(url: str, token: Optional[str] = None) -> Any:
    """GET `url` and decode the JSON body.

    Sends the GitHub media type and an optional bearer token. A timeout is
    set so a stalled network cannot freeze the TUI indefinitely.
    """
    req = urllib.request.Request(url, headers={"Accept": "application/vnd.github+json"})
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))


def gh_latest_release(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    """Return the tag name of the latest GitHub release, or None on error."""
    try:
        data = http_get_json(f"https://api.github.com/repos/{owner}/{repo}/releases/latest", token)
        return data.get("tag_name")
    except Exception as e:
        eprintln(f"latest_release failed for {owner}/{repo}: {e}")
        return None


def gh_latest_tag(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    """Return the most recent tag name (GitHub API order), or None on error."""
    try:
        data = http_get_json(f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token)
        tags = [t.get("name") for t in data if "name" in t]
        return tags[0] if tags else None
    except Exception as e:
        eprintln(f"latest_tag failed for {owner}/{repo}: {e}")
        return None


def gh_head_commit(owner: str, repo: str) -> Optional[str]:
    """Return the commit hash of HEAD via `git ls-remote`, or None."""
    out = run_get_stdout(["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", "HEAD"])
    if not out:
        return None
    return out.split()[0]


def gh_tarball_url(owner: str, repo: str, ref: str) -> str:
    """Codeload tarball URL for a repo at a ref (tag or commit)."""
    return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}"


def gh_release_tags_api(owner: str, repo: str, token: Optional[str]) -> List[str]:
    """
    Return recent release tag names for a repo using GitHub API.
    """
    try:
        data = http_get_json(f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", token)
        return [r.get("tag_name") for r in data if isinstance(r, dict) and "tag_name" in r]
    except Exception as e:
        eprintln(f"releases list failed for {owner}/{repo}: {e}")
        return []


# ------------------------------ Data scanning ------------------------------

def find_packages() -> List[Tuple[str, Path]]:
    """Scan PKGS_DIR recursively for version.json files.

    Returns a sorted list of (relative package dir, path-to-version.json).
    """
    results = []
    for p in PKGS_DIR.rglob("version.json"):
        # name is directory name under packages (e.g., raspberrypi/linux-rpi => raspberrypi/linux-rpi)
        rel = p.relative_to(PKGS_DIR).parent
        results.append((str(rel), p))
    results.sort()
    return results


# ------------------------------ TUI helpers ------------------------------

class ScreenBase:
    """Common state and status-line drawing for all screens."""

    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.status = ""

    def draw_status(self, height, width):
        """Render the status line (or default key help) on the last row."""
        if self.status:
            self.stdscr.addstr(height - 1, 0, self.status[:max(0, width - 1)])
        else:
            self.stdscr.addstr(height - 1, 0, "q: quit, Backspace: back, Enter: select")

    def set_status(self, text: str):
        self.status = text

    def run(self):
        raise NotImplementedError


def prompt_input(stdscr, prompt: str) -> Optional[str]:
    """Echo-mode line input at the current cursor position."""
    curses.echo()
    stdscr.addstr(prompt)
    stdscr.clrtoeol()
    s = stdscr.getstr().decode("utf-8")
    curses.noecho()
    return s


def show_popup(stdscr, lines: List[str]):
    """Draw a centered modal box with `lines` and wait for any key.

    Lines that don't fit the box are truncated; an empty list is tolerated.
    """
    if not lines:
        lines = [""]  # guard: max() below would raise on an empty sequence
    h, w = stdscr.getmaxyx()
    box_h = min(len(lines) + 4, h - 2)
    box_w = min(max(len(l) for l in lines) + 4, w - 2)
    top = (h - box_h) // 2
    left = (w - box_w) // 2
    win = curses.newwin(box_h, box_w, top, left)
    win.box()
    for i, line in enumerate(lines, start=1):
        if i >= box_h - 1:
            break
        win.addstr(i, 2, line[:box_w - 3])
    win.addstr(box_h - 2, 2, "Press any key")
    win.refresh()
    win.getch()


# ------------------------------ Screens ------------------------------

class PackagesScreen(ScreenBase):
    """Top-level list of all packages that carry a version.json."""

    def __init__(self, stdscr):
        super().__init__(stdscr)
        self.packages = find_packages()
        self.idx = 0

    def run(self):
        while True:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()
            self.stdscr.addstr(0, 0, "Packages (version.json)")
            for i, (name, _path) in enumerate(self.packages[:h - 3], start=0):
                sel = ">" if i == self.idx else " "
                self.stdscr.addstr(1 + i, 0, f"{sel} {name}"[:w - 1])
            self.draw_status(h, w)
            self.stdscr.refresh()
            ch = self.stdscr.getch()
            if ch in (ord('q'), 27):  # q or ESC
                return None
            elif ch in (curses.KEY_UP, ord('k')):
                self.idx = max(0, self.idx - 1)
            elif ch in (curses.KEY_DOWN, ord('j')):
                self.idx = min(len(self.packages) - 1, self.idx + 1)
            elif ch in (curses.KEY_ENTER, 10, 13):
                if not self.packages:
                    continue
                name, path = self.packages[self.idx]
                try:
                    spec = load_json(path)
                except Exception as e:
                    self.set_status(f"Failed to load {path}: {e}")
                    continue
                screen = PackageDetailScreen(self.stdscr, name, path, spec)
                ret = screen.run()
                if ret == "reload":
                    # re-scan on save; clamp the cursor (list may now be shorter or empty)
                    self.packages = find_packages()
                    self.idx = max(0, min(self.idx, len(self.packages) - 1))


class PackageDetailScreen(ScreenBase):
    """Per-package view: variant selector, source list, and update actions."""

    def __init__(self, stdscr, pkg_name: str, path: Path, spec: Json):
        super().__init__(stdscr)
        self.pkg_name = pkg_name
        self.path = path
        self.spec = spec
        # "" (index 0) means the base spec; others are variant names.
        self.variants = [""] + sorted(list(self.spec.get("variants", {}).keys()))
        self.vidx = 0
        self.gh_token = os.environ.get("GITHUB_TOKEN")
        self.candidates: Dict[str, Dict[str, str]] = {}      # name -> {release, tag, commit}
        self.url_candidates: Dict[str, Dict[str, str]] = {}  # name -> {base, release, tag}
        # initialize view
        self.recompute_view()

    def select_variant(self):
        # Recompute merged and target views when variant changes
        self.recompute_view()

    def recompute_view(self):
        """Refresh merged vars/sources and the dict that edits are written to."""
        # Set cursor to base or selected variant dict for manual edits
        if self.vidx == 0:
            self.cursor = self.spec
            variant_name = None
        else:
            variant_name = self.variants[self.vidx]
            self.cursor = self.spec["variants"][variant_name]
        # Compute merged view and target dict for writing
        self.merged_vars, self.merged_srcs, self.target_dict = merged_view(self.spec, variant_name)
        self.snames = sorted(list(self.merged_srcs.keys()))
        self.sidx = 0

    def fetch_candidates_for(self, name: str):
        """Populate self.candidates[name] (and url_candidates for url sources)."""
        comp = self.merged_srcs[name]
        fetcher = comp.get("fetcher", "none")
        c = {"release": "", "tag": "", "commit": ""}
        if fetcher == "github":
            owner = comp.get("owner")
            repo = comp.get("repo")
            if owner and repo:
                r = gh_latest_release(owner, repo, self.gh_token)
                if r:
                    c["release"] = r
                t = gh_latest_tag(owner, repo, self.gh_token)
                if t:
                    c["tag"] = t
                m = gh_head_commit(owner, repo)
                if m:
                    c["commit"] = m
        elif fetcher == "git":
            url = comp.get("url")
            if url:
                out = run_get_stdout(["git", "ls-remote", url, "HEAD"])
                if out:
                    c["commit"] = out.split()[0]
        elif fetcher == "url":
            # Heuristic for GitHub release assets with variables in version.json (e.g., proton-cachyos)
            owner = self.merged_vars.get("owner")
            repo = self.merged_vars.get("repo")
            if owner and repo:
                tags = gh_release_tags_api(str(owner), str(repo), self.gh_token)
                prefix = str(self.merged_vars.get("releasePrefix", ""))
                suffix = str(self.merged_vars.get("releaseSuffix", ""))
                latest = next(
                    (t for t in tags if (t and t.startswith(prefix) and t.endswith(suffix))),
                    None,
                )
                if latest:
                    c["release"] = latest
                    # Strip prefix/suffix, then split "<base>-<release>" on dashes.
                    mid = latest
                    if prefix and mid.startswith(prefix):
                        mid = mid[len(prefix):]
                    if suffix and mid.endswith(suffix):
                        mid = mid[:-len(suffix)]
                    parts = mid.split("-")
                    if len(parts) >= 2:
                        base, rel = parts[0], parts[-1]
                        self.url_candidates[name] = {"base": base, "release": rel, "tag": latest}
        self.candidates[name] = c

    def prefetch_hash_for(self, name: str) -> Optional[str]:
        """Compute the SRI hash for a source's current ref, or None on failure."""
        comp = self.merged_srcs[name]
        fetcher = comp.get("fetcher", "none")
        if fetcher == "github":
            owner = comp.get("owner")
            repo = comp.get("repo")
            rendered = render_templates(comp, self.merged_vars)
            ref = rendered.get("tag") or rendered.get("rev")
            if owner and repo and ref:
                url = gh_tarball_url(owner, repo, ref)
                return nix_prefetch_url(url)
        elif fetcher == "git":
            url = comp.get("url")
            rev = comp.get("rev")
            if url and rev:
                return nix_prefetch_git(url, rev)
        elif fetcher == "url":
            rendered = render_templates(comp, self.merged_vars)
            url = rendered.get("url") or rendered.get("urlTemplate")
            if url:
                return nix_prefetch_url(url)
        return None

    def set_ref(self, name: str, kind: str, value: str):
        """Write a tag or rev into the selected target dict (base or variant).

        Setting a tag removes any rev and vice versa, so the source never
        carries both references at once.
        """
        ts = self.target_dict.setdefault("sources", {})
        comp = ts.setdefault(name, {})
        if kind in ("release", "tag"):
            comp["tag"] = value
            if "rev" in comp:
                del comp["rev"]
        elif kind == "commit":
            comp["rev"] = value
            if "tag" in comp:
                del comp["tag"]

    def save(self):
        """Persist the (possibly modified) spec back to version.json."""
        save_json(self.path, self.spec)

    def _write_hash(self, name: str) -> bool:
        """Prefetch the hash for `name` and store it in the target dict."""
        sri = self.prefetch_hash_for(name)
        if not sri:
            return False
        comp = self.target_dict.setdefault("sources", {}).setdefault(name, {})
        comp["hash"] = sri
        return True

    def _display_ref(self, comp: Json) -> str:
        """Build a concise human-readable ref string for the source list."""
        fetcher = comp.get("fetcher", "none")
        # Render refs so variables resolve; compress long forms for display
        display_ref = comp.get("tag") or comp.get("rev") or comp.get("version") or ""
        if fetcher == "github":
            rendered = render_templates(comp, self.merged_vars)
            tag = rendered.get("tag")
            rev = rendered.get("rev")
            owner = (rendered.get("owner") or self.merged_vars.get("owner") or "")
            repo = (rendered.get("repo") or self.merged_vars.get("repo") or "")
            if tag and owner and repo:
                display_ref = f"{owner}/{repo}@{tag}"
            elif tag:
                display_ref = tag
            elif rev and owner and repo:
                display_ref = f"{owner}/{repo}@{rev[:7]}"
            elif rev:
                display_ref = rev[:12]
        elif fetcher == "url":
            rendered = render_templates(comp, self.merged_vars)
            url = rendered.get("url") or rendered.get("urlTemplate") or ""
            if url:
                # Prefer a concise label like owner/repo@tag · filename
                owner = str(self.merged_vars.get("owner", "") or "")
                repo = str(self.merged_vars.get("repo", "") or "")
                rp = str(self.merged_vars.get("releasePrefix", "") or "")
                rs = str(self.merged_vars.get("releaseSuffix", "") or "")
                base = str(self.merged_vars.get("base", "") or "")
                rel = str(self.merged_vars.get("release", "") or "")
                tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
                parsed = urlparse(url)
                filename = os.path.basename(parsed.path) if parsed.path else ""
                if owner and repo and tag and filename:
                    # BUGFIX: previously printed the literal "(unknown)" here
                    # even though the filename had just been computed.
                    display_ref = f"{owner}/{repo}@{tag} · {filename}"
                elif filename:
                    display_ref = filename
                else:
                    display_ref = url
            else:
                display_ref = ""
        return str(display_ref)

    def _actions_vcs(self, name: str, fetcher: str):
        """Enter-menu for github/git sources: adopt a candidate ref and re-hash."""
        # Ensure candidates loaded
        if name not in self.candidates:
            self.fetch_candidates_for(name)
        cand = self.candidates.get(name, {})
        if fetcher == "github":
            items = [
                ("Use latest release (tag)", ("release", cand.get("release"))),
                ("Use latest tag", ("tag", cand.get("tag"))),
                ("Use latest commit (rev)", ("commit", cand.get("commit"))),
                ("Recompute hash", ("hash", None)),
                ("Cancel", ("cancel", None)),
            ]
        else:
            items = [
                ("Use latest commit (rev)", ("commit", cand.get("commit"))),
                ("Recompute hash", ("hash", None)),
                ("Cancel", ("cancel", None)),
            ]
        choice = select_menu(self.stdscr, f"Actions for {name}", [label for label, _ in items])
        if choice is None:
            return
        kind, val = items[choice][1]
        if kind in ("release", "tag", "commit"):
            if val:
                self.set_ref(name, kind, val)
                # update hash to match the new ref
                if self._write_hash(name):
                    self.set_status(f"{name}: set {kind} and updated hash")
            else:
                self.set_status(f"No candidate {kind}")
        elif kind == "hash":
            if self._write_hash(name):
                self.set_status(f"{name}: updated hash")
            else:
                self.set_status("hash prefetch failed")

    def _actions_url(self, name: str):
        """Enter-menu for url sources: update release variables and/or re-hash."""
        # Offer latest release update (for proton-cachyos-like schemas) and/or hash recompute
        cand = self.url_candidates.get(name)
        menu_items: List[Tuple[str, Tuple[str, Optional[Dict[str, str]]]]] = []
        if cand and cand.get("base") and cand.get("release"):
            menu_items.append(
                ("Use latest release (update variables.base/release)", ("update_vars", cand))
            )
        menu_items.append(("Recompute hash (prefetch)", ("hash", None)))
        menu_items.append(("Cancel", ("cancel", None)))
        choice = select_menu(self.stdscr, f"Actions for {name}", [label for label, _ in menu_items])
        if choice is None:
            return
        kind, payload = menu_items[choice][1]
        if kind == "update_vars" and isinstance(payload, dict):
            # Write variables into selected base/variant dict
            vars_dict = self.target_dict.setdefault("variables", {})
            vars_dict["base"] = payload["base"]
            vars_dict["release"] = payload["release"]
            # Recompute merged view to reflect new variables
            self.recompute_view()
            # Prefetch and update hash
            if self._write_hash(name):
                self.set_status(
                    f"{name}: updated to {payload['base']}.{payload['release']} and refreshed hash"
                )
            else:
                self.set_status("hash prefetch failed after variable update")
        elif kind == "hash":
            if self._write_hash(name):
                self.set_status(f"{name}: updated hash")
            else:
                self.set_status("hash prefetch failed")

    def run(self):
        while True:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()
            title = f"{self.pkg_name} [{self.path}]"
            self.stdscr.addstr(0, 0, title[:w - 1])
            # Variant line: the selected variant is bracketed
            vline = "Variants: " + " | ".join(
                [f"[{v}]" if i == self.vidx else v for i, v in enumerate(self.variants)]
            )
            self.stdscr.addstr(1, 0, vline[:w - 1])
            self.stdscr.addstr(2, 0, "Sources:")
            # List sources
            for i, name in enumerate(self.snames[:h - 8], start=0):
                comp = self.merged_srcs[name]
                fetcher = comp.get("fetcher", "none")
                display_ref = self._display_ref(comp)
                ref_short = display_ref[:60] + ("..." if len(display_ref) > 60 else "")
                sel = ">" if i == self.sidx else " "
                self.stdscr.addstr(3 + i, 0, f"{sel} {name:<20} {fetcher:<7} ref={ref_short}"[:w - 1])
            # Footer instructions
            self.stdscr.addstr(
                h - 4, 0,
                "Enter: component actions | r: refresh candidates | h: prefetch hash | "
                "i: show URL | e: edit field | s: save | Backspace: back | q: quit",
            )
            self.draw_status(h, w)
            self.stdscr.refresh()
            ch = self.stdscr.getch()
            if ch in (ord('q'), 27):
                return None
            elif ch == curses.KEY_BACKSPACE or ch == 127:
                return "reload"
            # BUGFIX: variant switching is arrow-keys only. Binding 'h'/'l'
            # here shadowed the documented 'h' (prefetch hash) action below,
            # which was previously unreachable.
            elif ch == curses.KEY_LEFT:
                self.vidx = max(0, self.vidx - 1)
                self.select_variant()
            elif ch == curses.KEY_RIGHT:
                self.vidx = min(len(self.variants) - 1, self.vidx + 1)
                self.select_variant()
            elif ch in (curses.KEY_UP, ord('k')):
                self.sidx = max(0, self.sidx - 1)
            elif ch in (curses.KEY_DOWN, ord('j')):
                self.sidx = min(len(self.snames) - 1, self.sidx + 1)
            elif ch == ord('r'):
                if self.snames:
                    name = self.snames[self.sidx]
                    self.fetch_candidates_for(name)
                    cand = self.candidates.get(name, {})
                    lines = [
                        f"Candidates for {name}:",
                        f"  latest release: {cand.get('release') or '-'}",
                        f"  latest tag    : {cand.get('tag') or '-'}",
                        f"  latest commit : {cand.get('commit') or '-'}",
                    ]
                    show_popup(self.stdscr, lines)
            elif ch == ord('i'):
                # Show full rendered URL for URL-based sources
                if self.snames:
                    name = self.snames[self.sidx]
                    comp = self.merged_srcs[name]
                    if comp.get("fetcher", "none") == "url":
                        rendered = render_templates(comp, self.merged_vars)
                        url = rendered.get("url") or rendered.get("urlTemplate") or ""
                        if url:
                            show_popup(self.stdscr, ["Full URL:", url])
                        else:
                            self.set_status("No URL available")
            elif ch == ord('h'):
                if self.snames:
                    name = self.snames[self.sidx]
                    if self._write_hash(name):
                        self.set_status(f"{name}: updated hash")
                    else:
                        self.set_status(f"{name}: hash prefetch failed")
            elif ch == ord('e'):
                s = prompt_input(self.stdscr, "Edit path=value (relative to selected base/variant): ")
                if s:
                    if "=" not in s:
                        self.set_status("Invalid input, expected key.path=value")
                    else:
                        k, v = s.split("=", 1)
                        path = [p for p in k.split(".") if p]
                        deep_set(self.cursor, path, v)
                        self.set_status(f"Set {k}={v}")
            elif ch == ord('s'):
                try:
                    self.save()
                    self.set_status("Saved.")
                except Exception as e:
                    self.set_status(f"Save failed: {e}")
            elif ch in (curses.KEY_ENTER, 10, 13):
                if not self.snames:
                    continue
                name = self.snames[self.sidx]
                fetcher = self.merged_srcs[name].get("fetcher", "none")
                if fetcher in ("github", "git"):
                    self._actions_vcs(name, fetcher)
                elif fetcher == "url":
                    self._actions_url(name)
                else:
                    show_popup(self.stdscr, [f"{name}: fetcher={fetcher}",
                                             "Use 'e' to edit fields manually."])


def select_menu(stdscr, title: str, options: List[str]) -> Optional[int]:
    """Full-screen single-choice menu; returns the index or None on cancel."""
    idx = 0
    while True:
        stdscr.clear()
        h, w = stdscr.getmaxyx()
        stdscr.addstr(0, 0, title[:w - 1])
        for i, opt in enumerate(options[:h - 3], start=0):
            sel = ">" if i == idx else " "
            stdscr.addstr(2 + i, 0, f"{sel} {opt}"[:w - 1])
        stdscr.addstr(h - 1, 0, "Enter: select | Backspace: cancel")
        stdscr.refresh()
        ch = stdscr.getch()
        if ch in (curses.KEY_UP, ord('k')):
            idx = max(0, idx - 1)
        elif ch in (curses.KEY_DOWN, ord('j')):
            idx = min(len(options) - 1, idx + 1)
        elif ch in (curses.KEY_ENTER, 10, 13):
            return idx
        elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 27:
            return None


# ------------------------------ main ------------------------------

def main(stdscr):
    """curses.wrapper entry point: run the package browser."""
    try:
        # Some terminals can't hide the cursor; that's not fatal.
        curses.curs_set(0)
    except curses.error:
        pass
    stdscr.nodelay(False)
    try:
        screen = PackagesScreen(stdscr)
        screen.run()
    except Exception:
        curses.endwin()
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    curses.wrapper(main)