diff --git a/scripts/update_versions.py b/scripts/update_versions.py new file mode 100755 index 0000000..e06b0bd --- /dev/null +++ b/scripts/update_versions.py @@ -0,0 +1,416 @@ +#!/usr/bin/env python3 +""" +Unified version.json updater (TUI-friendly core logic). + +Improvements: +- Correctly merges base + variant variables and sources (component-wise deep merge) +- Updates are written back into the correct dictionary: + - Base: top-level spec["sources"][name] + - Variant: spec["variants"][variant]["sources"][name] (created if missing) +- Hash prefetch uses the merged view with rendered variables + +Supports: +- Updating GitHub components to latest release tag, latest tag, or latest commit +- Updating Git (fetchgit) components to latest commit on default branch +- Recomputing SRI hash for url/urlTemplate, github tarballs, and fetchgit sources +- Setting arbitrary fields (variables.* or sources.*.*) via --set path=value +- Operating on a specific variant or the base (top-level) of a version.json + +Requirements: +- nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri` for URL hashing +- nix-prefetch-git + `nix hash to-sri` for Git fetchers +- Network access for GitHub API (optional GITHUB_TOKEN env var) + +Examples: + scripts/update_versions.py --file packages/edk2/version.json --github-latest-release --prefetch + scripts/update_versions.py --file packages/edk2/version.json --component edk2 --github-latest-commit --prefetch + scripts/update_versions.py --file packages/uboot/version.json --url-prefetch + scripts/update_versions.py --file packages/proton-cachyos/version.json --variant cachyos-v4 --set variables.base=10.0 + scripts/update_versions.py --file packages/linux-cachyos/version.json --component zfs --git-latest --prefetch +""" +import argparse +import json +import os +import re +import subprocess +import sys +import urllib.request +import urllib.error +from typing import Any, Dict, List, Optional, Tuple + +Json = Dict[str, Any] + + +def eprintln(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + + +def load_json(path: str) -> Json: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def save_json(path: str, data: Json): + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + f.write("\n") + + +def deep_get(o: Json, path: List[str], default=None): + cur = o + for p in path: + if isinstance(cur, dict) and p in cur: + cur = cur[p] + else: + return default + return cur + + +def deep_set(o: Json, path: List[str], value: Any): + cur = o + for p in path[:-1]: + if p not in cur or not isinstance(cur[p], dict): + cur[p] = {} + cur = cur[p] + cur[path[-1]] = value + + +def parse_set_pair(pair: str) -> Tuple[List[str], str]: + if "=" not in pair: + raise ValueError(f"--set requires KEY=VALUE, got: {pair}") + key, val = pair.split("=", 1) + path = key.strip().split(".") + return path, val + + +def render_templates(value: Any, variables: Dict[str, Any]) -> Any: + # Simple ${var} string replacement across strings/structures + if isinstance(value, str): + def repl(m): + name = m.group(1) + return str(variables.get(name, m.group(0))) + return re.sub(r"\$\{([^}]+)\}", repl, value) + elif isinstance(value, dict): + return {k: render_templates(v, variables) for k, v in value.items()} + elif isinstance(value, list): + return [render_templates(v, variables) for v in value] + return value + + +def http_get_json(url: str, token: Optional[str] = None) -> Any: + req = urllib.request.Request(url, headers={"Accept": "application/vnd.github+json"}) + if token: + req.add_header("Authorization", f"Bearer {token}") + with urllib.request.urlopen(req) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def github_latest_release_tag(owner: str, repo: str, token: Optional[str] = None) -> Optional[str]: + url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest" + try: + data = http_get_json(url, token) + tag = data.get("tag_name") + return tag + except urllib.error.HTTPError as e: + eprintln(f"GitHub latest release failed: {e}") + return None + + +def github_latest_tag(owner: str, repo: str, token: Optional[str] = None, tag_regex: Optional[str] = None) -> Optional[str]: + url = f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100" + try: + data = http_get_json(url, token) + tags = [t.get("name") for t in data if "name" in t] + if tag_regex: + rx = re.compile(tag_regex) + tags = [t for t in tags if rx.search(t)] + return tags[0] if tags else None + except urllib.error.HTTPError as e: + eprintln(f"GitHub tags failed: {e}") + return None + + +def github_head_commit(owner: str, repo: str, token: Optional[str] = None) -> Optional[str]: + # Prefer git ls-remote to avoid API limits + url = f"https://github.com/{owner}/{repo}.git" + try: + out = subprocess.check_output(["git", "ls-remote", url, "HEAD"], text=True).strip() + if out: + sha = out.split()[0] + return sha + except Exception as e: + eprintln(f"git ls-remote failed for {url}: {e}") + return None + + +def run_cmd_get_output(args: List[str]) -> str: + eprintln(f"Running: {' '.join(args)}") + return subprocess.check_output(args, text=True).strip() + + +def nix_prefetch_url(url: str) -> Optional[str]: + # returns SRI (sha256-...) + base32 = None + try: + base32 = run_cmd_get_output(["nix-prefetch-url", "--type", "sha256", url]) + except Exception: + try: + base32 = run_cmd_get_output(["nix", "prefetch-url", url]) + except Exception as e: + eprintln(f"Failed to prefetch url: {url}: {e}") + return None + try: + sri = run_cmd_get_output(["nix", "hash", "to-sri", "--type", "sha256", base32]) + return sri + except Exception as e: + eprintln(f"Failed to convert base32 to SRI: {e}") + return None + + +def github_tarball_url(owner: str, repo: str, ref: str) -> str: + # codeload is stable for tarball + return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}" + + +def nix_prefetch_github_tarball(owner: str, repo: str, ref: str) -> Optional[str]: + url = github_tarball_url(owner, repo, ref) + return nix_prefetch_url(url) + + +def nix_prefetch_git(url: str, rev: str) -> Optional[str]: + # returns SRI + try: + out = run_cmd_get_output(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url]) + try: + data = json.loads(out) + base32 = data.get("sha256") or data.get("hash") + except Exception: + base32 = out.splitlines()[-1].strip() + if not base32: + eprintln(f"Could not parse nix-prefetch-git output for {url}@{rev}") + return None + sri = run_cmd_get_output(["nix", "hash", "to-sri", "--type", "sha256", base32]) + return sri + except Exception as e: + eprintln(f"nix-prefetch-git failed for {url}@{rev}: {e}") + return None + + +# -------------------- Merging logic (match lib/versioning.nix) -------------------- + +def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: + out = dict(a) + for k, v in b.items(): + if k in out and isinstance(out[k], dict) and isinstance(v, dict): + out[k] = deep_merge(out[k], v) + else: + out[k] = v + return out + + +def merge_sources(base_sources: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]: + names = set(base_sources.keys()) | set(overrides.keys()) + result: Dict[str, Any] = {} + for n in names: + if n in base_sources and n in overrides: + if isinstance(base_sources[n], dict) and isinstance(overrides[n], dict): + result[n] = deep_merge(base_sources[n], overrides[n]) + else: + result[n] = overrides[n] + elif n in overrides: + result[n] = overrides[n] + else: + result[n] = base_sources[n] + return result + + +def merged_view(spec: Json, variant: Optional[str]) -> Tuple[Dict[str, Any], Dict[str, Any], Json, List[str]]: + """ + Returns (merged_variables, merged_sources, target_dict_to_write, base_path) + - merged_*: what to display/prefetch with + - target_dict_to_write: where to write changes (base or variants[variant]) + """ + base_vars = spec.get("variables", {}) or {} + base_sources = spec.get("sources", {}) or {} + if variant: + vdict = spec.get("variants", {}).get(variant) + if not isinstance(vdict, dict): + raise ValueError(f"Variant '{variant}' not found") + v_vars = vdict.get("variables", {}) or {} + v_sources = vdict.get("sources", {}) or {} + merged_vars = dict(base_vars) + merged_vars.update(v_vars) + merged_srcs = merge_sources(base_sources, v_sources) + return merged_vars, merged_srcs, vdict, ["variants", variant] + else: + return dict(base_vars), dict(base_sources), spec, [] + + +# -------------------- Update operations -------------------- + +def update_components(spec: Json, + variant: Optional[str], + components: Optional[List[str]], + args: argparse.Namespace) -> bool: + changed = False + gh_token = os.environ.get("GITHUB_TOKEN") + + merged_vars, merged_srcs, target_dict, base_path = merged_view(spec, variant) + src_names = list(merged_srcs.keys()) if not components else [c for c in components if c in merged_srcs] + + # Ensure target_dict has a sources dict to write into + target_sources = target_dict.setdefault("sources", {}) + + for name in src_names: + view_comp = merged_srcs[name] + fetcher = view_comp.get("fetcher", "none") + + # Ensure a writable component entry exists (always write to the selected target: base or variant override) + comp = target_sources.setdefault(name, {}) + if not isinstance(comp, dict): + comp = target_sources[name] = {} + + if fetcher == "github": + owner = view_comp.get("owner") + repo = view_comp.get("repo") + if not owner or not repo: + eprintln(f"Component {name}: missing owner/repo for github fetcher") + continue + + new_ref = None + ref_kind = None + if args.github_latest_release: + tag = github_latest_release_tag(owner, repo, gh_token) + if tag: + new_ref = tag + ref_kind = "tag" + elif args.github_latest_tag: + tag = github_latest_tag(owner, repo, gh_token, args.tag_regex) + if tag: + new_ref = tag + ref_kind = "tag" + elif args.github_latest_commit: + rev = github_head_commit(owner, repo, gh_token) + if rev: + new_ref = rev + ref_kind = "rev" + + if new_ref: + if ref_kind == "tag": + comp["tag"] = new_ref + if "rev" in comp: + del comp["rev"] + else: + comp["rev"] = new_ref + if "tag" in comp: + del comp["tag"] + eprintln(f"Component {name}: set {ref_kind}={new_ref}") + changed = True + + if args.prefetch: + ref = comp.get("tag") or comp.get("rev") + if not ref: + # fallback to merged view if not in override + ref = view_comp.get("tag") or view_comp.get("rev") + if ref: + sri = nix_prefetch_github_tarball(owner, repo, ref) + if sri: + comp["hash"] = sri + eprintln(f"Component {name}: updated hash={sri}") + changed = True + + elif fetcher == "git": + url = view_comp.get("url") + if not url: + eprintln(f"Component {name}: missing url for git fetcher") + continue + if args.git_latest: + rev = github_head_commit(owner="", repo="", token=None) # placeholder; we will ls-remote below + try: + out = subprocess.check_output(["git", "ls-remote", url, "HEAD"], text=True).strip() + if out: + new_rev = out.split()[0] + comp["rev"] = new_rev + eprintln(f"Component {name}: set rev={new_rev}") + changed = True + if args.prefetch: + sri = nix_prefetch_git(url, new_rev) + if sri: + comp["hash"] = sri + eprintln(f"Component {name}: updated hash={sri}") + changed = True + except Exception as e: + eprintln(f"git ls-remote failed for {name}: {e}") + + elif fetcher == "url": + if args.url_prefetch or args.prefetch: + rendered_comp = render_templates(view_comp, merged_vars) + url = rendered_comp.get("url") or rendered_comp.get("urlTemplate") + if not url: + eprintln(f"Component {name}: missing url/urlTemplate for url fetcher") + else: + sri = nix_prefetch_url(url) + if sri: + comp["hash"] = sri + eprintln(f"Component {name}: updated hash={sri}") + changed = True + + elif fetcher == "pypi": + if args.prefetch: + eprintln(f"Component {name} (pypi): prefetch not implemented; use nix-prefetch-pypi or set hash manually.") + else: + # fetcher == "none" or other: no-op unless user --set a value + pass + + return changed + + +# -------------------- Main -------------------- + +def main(): + ap = argparse.ArgumentParser(description="Update unified version.json files") + ap.add_argument("--file", required=True, help="Path to version.json") + ap.add_argument("--variant", help="Variant name to update (default: base/top-level)") + ap.add_argument("--component", dest="components", action="append", help="Limit to specific component(s); can be repeated") + ap.add_argument("--github-latest-release", action="store_true", help="Update GitHub components to latest release tag") + ap.add_argument("--github-latest-tag", action="store_true", help="Update GitHub components to latest tag") + ap.add_argument("--github-latest-commit", action="store_true", help="Update GitHub components to HEAD commit") + ap.add_argument("--tag-regex", help="Regex to filter tags for --github-latest-tag") + ap.add_argument("--git-latest", action="store_true", help="Update fetchgit components to latest commit (HEAD)") + ap.add_argument("--url-prefetch", action="store_true", help="Recompute hash for url/urlTemplate components") + ap.add_argument("--prefetch", action="store_true", help="After changing refs, recompute hash as needed") + ap.add_argument("--set", dest="sets", action="append", default=[], help="Set a field: KEY=VALUE (dot path), relative to variant/base. Value is treated as string.") + ap.add_argument("--dry-run", action="store_true", help="Do not write changes") + ap.add_argument("--print", dest="do_print", action="store_true", help="Print result JSON to stdout") + args = ap.parse_args() + + path = args.file + spec = load_json(path) + + # Apply --set mutations (relative to base or selected variant) + target = spec if not args.variant else spec.setdefault("variants", {}).setdefault(args.variant, {}) + changed = False + for pair in args.sets: + path_tokens, value = parse_set_pair(pair) + deep_set(target, path_tokens, value) + eprintln(f"Set {'.'.join((['variants', args.variant] if args.variant else []) + path_tokens)} = {value}") + changed = True + + # Update refs/hashes based on fetcher type and flags with merged view + changed = update_components(spec, args.variant, args.components, args) or changed + + if changed and not args.dry_run: + save_json(path, spec) + eprintln(f"Wrote changes to {path}") + else: + eprintln("No changes made.") + + if args.do_print: + print(json.dumps(spec, indent=2, ensure_ascii=False)) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + sys.exit(130) diff --git a/scripts/version_tui.py b/scripts/version_tui.py new file mode 100644 index 0000000..af4c21b --- /dev/null +++ b/scripts/version_tui.py @@ -0,0 +1,694 @@ +#!/usr/bin/env python3 +""" +Interactive TUI for browsing and updating unified version.json files. + +Features: +- Scans packages/**/version.json and lists all packages +- Per-package view: + - Choose base or any variant + - List all sources/components with current ref (tag/rev/url/version) and hash + - For GitHub sources: fetch candidates (latest release tag, latest tag, latest commit) + - For Git sources: fetch latest commit (HEAD) + - For URL sources: recompute hash (url/urlTemplate with rendered variables) +- Actions on a component: + - Update to one of the candidates (sets tag or rev) and optionally re-hash + - Recompute hash (prefetch) + - Edit any field via path=value (e.g., variables.version=2025.07) +- Writes changes back to version.json + +Dependencies: +- Standard library + external CLI tools: + - nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri` + - nix-prefetch-git + - git +- Optional: GITHUB_TOKEN env var to increase GitHub API rate limits + +Usage: + scripts/version_tui.py +Controls: + - Up/Down to navigate lists + - Enter to select + - Backspace to go back + - q to quit + - On component screen: + r = refresh candidates + h = recompute hash (prefetch) + e = edit arbitrary field (path=value) + s = save to disk +""" + +import curses +import json +import os +import re +import subprocess +import sys +import traceback +import urllib.request +import urllib.error +from urllib.parse import urlparse +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +ROOT = Path(__file__).resolve().parents[1] +PKGS_DIR = ROOT / "packages" + +Json = Dict[str, Any] + +# ------------------------------ Utilities ------------------------------ + +def eprintln(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + +def load_json(path: Path) -> Json: + with path.open("r", encoding="utf-8") as f: + return json.load(f) + +def save_json(path: Path, data: Json): + tmp = path.with_suffix(".tmp") + with tmp.open("w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + f.write("\n") + tmp.replace(path) + +def render_templates(value: Any, variables: Dict[str, Any]) -> Any: + if isinstance(value, str): + def repl(m): + name = m.group(1) + return str(variables.get(name, m.group(0))) + return re.sub(r"\$\{([^}]+)\}", repl, value) + elif isinstance(value, dict): + return {k: render_templates(v, variables) for k, v in value.items()} + elif isinstance(value, list): + return [render_templates(v, variables) for v in value] + return value + +def deep_set(o: Json, path: List[str], value: Any): + cur = o + for p in path[:-1]: + if p not in cur or not isinstance(cur[p], dict): + cur[p] = {} + cur = cur[p] + cur[path[-1]] = value + +# ------------------------------ Merge helpers (match lib/versioning.nix) ------------------------------ + +def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: + out = dict(a) + for k, v in b.items(): + if k in out and isinstance(out[k], dict) and isinstance(v, dict): + out[k] = deep_merge(out[k], v) + else: + out[k] = v + return out + +def merge_sources(base_sources: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]: + names = set(base_sources.keys()) | set(overrides.keys()) + result: Dict[str, Any] = {} + for n in names: + if n in base_sources and n in overrides: + if isinstance(base_sources[n], dict) and isinstance(overrides[n], dict): + result[n] = deep_merge(base_sources[n], overrides[n]) + else: + result[n] = overrides[n] + elif n in overrides: + result[n] = overrides[n] + else: + result[n] = base_sources[n] + return result + +def merged_view(spec: Json, variant_name: Optional[str]) -> Tuple[Dict[str, Any], Dict[str, Any], Json]: + """ + Returns (merged_variables, merged_sources, target_dict_to_write) + merged_* are used for display/prefetch; target_dict_to_write is where updates must be written (base or selected variant). + """ + base_vars = spec.get("variables", {}) or {} + base_sources = spec.get("sources", {}) or {} + if variant_name: + vdict = spec.get("variants", {}).get(variant_name) + if not isinstance(vdict, dict): + raise ValueError(f"Variant '{variant_name}' not found") + v_vars = vdict.get("variables", {}) or {} + v_sources = vdict.get("sources", {}) or {} + merged_vars = dict(base_vars); merged_vars.update(v_vars) + merged_srcs = merge_sources(base_sources, v_sources) + return merged_vars, merged_srcs, vdict + else: + return dict(base_vars), dict(base_sources), spec + +def run_cmd(args: List[str]) -> Tuple[int, str, str]: + try: + p = subprocess.run(args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False) + return p.returncode, p.stdout.strip(), p.stderr.strip() + except Exception as e: + return 1, "", str(e) + +def run_get_stdout(args: List[str]) -> Optional[str]: + code, out, err = run_cmd(args) + if code != 0: + eprintln(f"Command failed: {' '.join(args)}\n{err}") + return None + return out + +def nix_prefetch_url(url: str) -> Optional[str]: + # returns SRI + out = run_get_stdout(["nix-prefetch-url", "--type", "sha256", url]) + if out is None: + out = run_get_stdout(["nix", "prefetch-url", url]) + if out is None: + return None + sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", out.strip()]) + return sri + +def nix_prefetch_git(url: str, rev: str) -> Optional[str]: + out = run_get_stdout(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url]) + if out is None: + return None + base32 = None + try: + data = json.loads(out) + base32 = data.get("sha256") or data.get("hash") + except Exception: + lines = [l for l in out.splitlines() if l.strip()] + if lines: + base32 = lines[-1].strip() + if not base32: + return None + sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", base32]) + return sri + +def http_get_json(url: str, token: Optional[str] = None) -> Any: + req = urllib.request.Request(url, headers={"Accept": "application/vnd.github+json"}) + if token: + req.add_header("Authorization", f"Bearer {token}") + with urllib.request.urlopen(req) as resp: + return json.loads(resp.read().decode("utf-8")) + +def gh_latest_release(owner: str, repo: str, token: Optional[str]) -> Optional[str]: + try: + data = http_get_json(f"https://api.github.com/repos/{owner}/{repo}/releases/latest", token) + return data.get("tag_name") + except Exception as e: + eprintln(f"latest_release failed for {owner}/{repo}: {e}") + return None + +def gh_latest_tag(owner: str, repo: str, token: Optional[str]) -> Optional[str]: + try: + data = http_get_json(f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token) + tags = [t.get("name") for t in data if "name" in t] + return tags[0] if tags else None + except Exception as e: + eprintln(f"latest_tag failed for {owner}/{repo}: {e}") + return None + +def gh_head_commit(owner: str, repo: str) -> Optional[str]: + out = run_get_stdout(["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", "HEAD"]) + if not out: + return None + return out.split()[0] + +def gh_tarball_url(owner: str, repo: str, ref: str) -> str: + return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}" + +def gh_release_tags_api(owner: str, repo: str, token: Optional[str]) -> List[str]: + """ + Return recent release tag names for a repo using GitHub API. + """ + try: + data = http_get_json(f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", token) + return [r.get("tag_name") for r in data if isinstance(r, dict) and "tag_name" in r] + except Exception as e: + eprintln(f"releases list failed for {owner}/{repo}: {e}") + return [] + +# ------------------------------ Data scanning ------------------------------ + +def find_packages() -> List[Tuple[str, Path]]: + results = [] + for p in PKGS_DIR.rglob("version.json"): + # name is directory name under packages (e.g., raspberrypi/linux-rpi => raspberrypi/linux-rpi) + rel = p.relative_to(PKGS_DIR).parent + results.append((str(rel), p)) + results.sort() + return results + +# ------------------------------ TUI helpers ------------------------------ + +class ScreenBase: + def __init__(self, stdscr): + self.stdscr = stdscr + self.status = "" + + def draw_status(self, height, width): + if self.status: + self.stdscr.addstr(height-1, 0, self.status[:max(0, width-1)]) + else: + self.stdscr.addstr(height-1, 0, "q: quit, Backspace: back, Enter: select") + + def set_status(self, text: str): + self.status = text + + def run(self): + raise NotImplementedError + +def prompt_input(stdscr, prompt: str) -> Optional[str]: + curses.echo() + stdscr.addstr(prompt) + stdscr.clrtoeol() + s = stdscr.getstr().decode("utf-8") + curses.noecho() + return s + +def show_popup(stdscr, lines: List[str]): + h, w = stdscr.getmaxyx() + box_h = min(len(lines)+4, h-2) + box_w = min(max(len(l) for l in lines)+4, w-2) + top = (h - box_h)//2 + left = (w - box_w)//2 + win = curses.newwin(box_h, box_w, top, left) + win.box() + for i, line in enumerate(lines, start=1): + if i >= box_h-1: + break + win.addstr(i, 2, line[:box_w-3]) + win.addstr(box_h-2, 2, "Press any key") + win.refresh() + win.getch() + +# ------------------------------ Screens ------------------------------ + +class PackagesScreen(ScreenBase): + def __init__(self, stdscr): + super().__init__(stdscr) + self.packages = find_packages() + self.idx = 0 + + def run(self): + while True: + self.stdscr.clear() + h, w = self.stdscr.getmaxyx() + self.stdscr.addstr(0, 0, "Packages (version.json)") + for i, (name, _path) in enumerate(self.packages[:h-3], start=0): + sel = ">" if i == self.idx else " " + self.stdscr.addstr(1+i, 0, f"{sel} {name}"[:w-1]) + self.draw_status(h, w) + self.stdscr.refresh() + ch = self.stdscr.getch() + if ch in (ord('q'), 27): # q or ESC + return None + elif ch in (curses.KEY_UP, ord('k')): + self.idx = max(0, self.idx-1) + elif ch in (curses.KEY_DOWN, ord('j')): + self.idx = min(len(self.packages)-1, self.idx+1) + elif ch in (curses.KEY_ENTER, 10, 13): + if not self.packages: + continue + name, path = self.packages[self.idx] + try: + spec = load_json(path) + except Exception as e: + self.set_status(f"Failed to load {path}: {e}") + continue + screen = PackageDetailScreen(self.stdscr, name, path, spec) + ret = screen.run() + if ret == "reload": + # re-scan on save + self.packages = find_packages() + self.idx = min(self.idx, len(self.packages)-1) + else: + pass + +class PackageDetailScreen(ScreenBase): + def __init__(self, stdscr, pkg_name: str, path: Path, spec: Json): + super().__init__(stdscr) + self.pkg_name = pkg_name + self.path = path + self.spec = spec + self.variants = [""] + sorted(list(self.spec.get("variants", {}).keys())) + self.vidx = 0 + self.gh_token = os.environ.get("GITHUB_TOKEN") + self.candidates: Dict[str, Dict[str, str]] = {} # name -> {release, tag, commit} + self.url_candidates: Dict[str, Dict[str, str]] = {} # name -> {base, release, tag} + # initialize view + self.recompute_view() + + def select_variant(self): + # Recompute merged and target views when variant changes + self.recompute_view() + + def recompute_view(self): + # Set cursor to base or selected variant dict for manual edits + if self.vidx == 0: + self.cursor = self.spec + variant_name = None + else: + variant_name = self.variants[self.vidx] + self.cursor = self.spec["variants"][variant_name] + # Compute merged view and target dict for writing + self.merged_vars, self.merged_srcs, self.target_dict = merged_view(self.spec, variant_name) + self.snames = sorted(list(self.merged_srcs.keys())) + self.sidx = 0 + + def fetch_candidates_for(self, name: str): + comp = self.merged_srcs[name] + fetcher = comp.get("fetcher", "none") + c = {"release": "", "tag": "", "commit": ""} + if fetcher == "github": + owner = comp.get("owner") + repo = comp.get("repo") + if owner and repo: + r = gh_latest_release(owner, repo, self.gh_token) + if r: c["release"] = r + t = gh_latest_tag(owner, repo, self.gh_token) + if t: c["tag"] = t + m = gh_head_commit(owner, repo) + if m: c["commit"] = m + elif fetcher == "git": + url = comp.get("url") + if url: + out = run_get_stdout(["git", "ls-remote", url, "HEAD"]) + if out: + c["commit"] = out.split()[0] + elif fetcher == "url": + # Heuristic for GitHub release assets with variables in version.json (e.g., proton-cachyos) + owner = self.merged_vars.get("owner") + repo = self.merged_vars.get("repo") + if owner and repo: + tags = gh_release_tags_api(str(owner), str(repo), self.gh_token) + prefix = str(self.merged_vars.get("releasePrefix", "")) + suffix = str(self.merged_vars.get("releaseSuffix", "")) + latest = next((t for t in tags if (t and t.startswith(prefix) and t.endswith(suffix))), None) + if latest: + c["release"] = latest + mid = latest + if prefix and mid.startswith(prefix): + mid = mid[len(prefix):] + if suffix and mid.endswith(suffix): + mid = mid[:-len(suffix)] + parts = mid.split("-") + if len(parts) >= 2: + base, rel = parts[0], parts[-1] + self.url_candidates[name] = {"base": base, "release": rel, "tag": latest} + self.candidates[name] = c + + def prefetch_hash_for(self, name: str) -> Optional[str]: + comp = self.merged_srcs[name] + fetcher = comp.get("fetcher", "none") + if fetcher == "github": + owner = comp.get("owner") + repo = comp.get("repo") + rendered = render_templates(comp, self.merged_vars) + ref = rendered.get("tag") or rendered.get("rev") + if owner and repo and ref: + url = gh_tarball_url(owner, repo, ref) + return nix_prefetch_url(url) + elif fetcher == "git": + url = comp.get("url") + rev = comp.get("rev") + if url and rev: + return nix_prefetch_git(url, rev) + elif fetcher == "url": + rendered = render_templates(comp, self.merged_vars) + url = rendered.get("url") or rendered.get("urlTemplate") + if url: + return nix_prefetch_url(url) + return None + + def set_ref(self, name: str, kind: str, value: str): + # Write to selected target dict (base or variant override) + ts = self.target_dict.setdefault("sources", {}) + comp = ts.setdefault(name, {}) + if kind in ("release", "tag"): + comp["tag"] = value + if "rev" in comp: + del comp["rev"] + elif kind == "commit": + comp["rev"] = value + if "tag" in comp: + del comp["tag"] + + def save(self): + save_json(self.path, self.spec) + + def run(self): + while True: + self.stdscr.clear() + h, w = self.stdscr.getmaxyx() + title = f"{self.pkg_name} [{self.path}]" + self.stdscr.addstr(0, 0, title[:w-1]) + # Variant line + vline = "Variants: " + " | ".join( + [f"[{v}]" if i == self.vidx else v for i, v in enumerate(self.variants)] + ) + self.stdscr.addstr(1, 0, vline[:w-1]) + # Sources header + self.stdscr.addstr(2, 0, "Sources:") + # List sources + for i, name in enumerate(self.snames[:h-8], start=0): + comp = self.merged_srcs[name] + fetcher = comp.get("fetcher", "none") + # Render refs so variables resolve; compress long forms for display + display_ref = comp.get("tag") or comp.get("rev") or comp.get("version") or "" + if fetcher == "github": + rendered = render_templates(comp, self.merged_vars) + tag = rendered.get("tag") + rev = rendered.get("rev") + owner = (rendered.get("owner") or self.merged_vars.get("owner") or "") + repo = (rendered.get("repo") or self.merged_vars.get("repo") or "") + if tag and owner and repo: + display_ref = f"{owner}/{repo}@{tag}" + elif tag: + display_ref = tag + elif rev and owner and repo: + display_ref = f"{owner}/{repo}@{rev[:7]}" + elif rev: + display_ref = rev[:12] + elif fetcher == "url": + rendered = render_templates(comp, self.merged_vars) + url = rendered.get("url") or rendered.get("urlTemplate") or "" + if url: + # Prefer a concise label like owner/repo@tag · filename + owner = str(self.merged_vars.get("owner", "") or "") + repo = str(self.merged_vars.get("repo", "") or "") + rp = str(self.merged_vars.get("releasePrefix", "") or "") + rs = str(self.merged_vars.get("releaseSuffix", "") or "") + base = str(self.merged_vars.get("base", "") or "") + rel = str(self.merged_vars.get("release", "") or "") + tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else "" + parsed = urlparse(url) + filename = os.path.basename(parsed.path) if parsed and parsed.path else "" + if owner and repo and tag and filename: + display_ref = f"{owner}/{repo}@{tag} · {filename}" + elif filename: + display_ref = filename + else: + display_ref = url + else: + display_ref = "" + ref_short = display_ref if not isinstance(display_ref, str) else (display_ref[:60] + ("..." if len(display_ref) > 60 else "")) + sel = ">" if i == self.sidx else " " + self.stdscr.addstr(3+i, 0, f"{sel} {name:<20} {fetcher:<7} ref={ref_short}"[:w-1]) + # Footer instructions + self.stdscr.addstr(h-4, 0, "Enter: component actions | r: refresh candidates | h: prefetch hash | e: edit field | s: save | Backspace: back | q: quit") + self.draw_status(h, w) + self.stdscr.refresh() + + ch = self.stdscr.getch() + if ch in (ord('q'), 27): + return None + elif ch == curses.KEY_BACKSPACE or ch == 127: + return "reload" + elif ch in (curses.KEY_LEFT, ord('h')): + self.vidx = max(0, self.vidx-1) + self.select_variant() + elif ch in (curses.KEY_RIGHT, ord('l')): + self.vidx = min(len(self.variants)-1, self.vidx+1) + self.select_variant() + elif ch in (curses.KEY_UP, ord('k')): + self.sidx = max(0, self.sidx-1) + elif ch in (curses.KEY_DOWN, ord('j')): + self.sidx = min(len(self.snames)-1, self.sidx+1) + elif ch in (ord('r'),): + if self.snames: + name = self.snames[self.sidx] + self.fetch_candidates_for(name) + cand = self.candidates.get(name, {}) + lines = [ + f"Candidates for {name}:", + f" latest release: {cand.get('release') or '-'}", + f" latest tag : {cand.get('tag') or '-'}", + f" latest commit : {cand.get('commit') or '-'}", + ] + show_popup(self.stdscr, lines) + elif ch in (ord('i'),): + # Show full rendered URL for URL-based sources + if self.snames: + name = self.snames[self.sidx] + comp = self.merged_srcs[name] + if comp.get("fetcher", "none") == "url": + rendered = render_templates(comp, self.merged_vars) + url = rendered.get("url") or rendered.get("urlTemplate") or "" + if url: + show_popup(self.stdscr, ["Full URL:", url]) + else: + self.set_status("No URL available") + elif ch in (ord('h'),): + if self.snames: + name = self.snames[self.sidx] + sri = self.prefetch_hash_for(name) + if sri: + ts = self.target_dict.setdefault("sources", {}) + compw = ts.setdefault(name, {}) + compw["hash"] = sri + self.set_status(f"{name}: updated hash") + else: + self.set_status(f"{name}: hash prefetch failed") + elif ch in (ord('e'),): + s = prompt_input(self.stdscr, "Edit path=value (relative to selected base/variant): ") + if s: + if "=" not in s: + self.set_status("Invalid input, expected key.path=value") + else: + k, v = s.split("=", 1) + path = [p for p in k.split(".") if p] + deep_set(self.cursor, path, v) + self.set_status(f"Set {k}={v}") + elif ch in (ord('s'),): + try: + self.save() + self.set_status("Saved.") + except Exception as e: + self.set_status(f"Save failed: {e}") + elif ch in (curses.KEY_ENTER, 10, 13): + if not self.snames: + continue + name = self.snames[self.sidx] + comp = self.merged_srcs[name] + fetcher = comp.get("fetcher", "none") + if fetcher in ("github", "git"): + # Ensure candidates loaded + if name not in self.candidates: + self.fetch_candidates_for(name) + cand = self.candidates.get(name, {}) + # Present small menu + items = [] + if fetcher == "github": + items = [ + ("Use latest release (tag)", ("release", cand.get("release"))), + ("Use latest tag", ("tag", cand.get("tag"))), + ("Use latest commit (rev)", ("commit", cand.get("commit"))), + ("Recompute hash", ("hash", None)), + ("Cancel", ("cancel", None)), + ] + else: + items = [ + ("Use latest commit (rev)", ("commit", cand.get("commit"))), + ("Recompute hash", ("hash", None)), + ("Cancel", ("cancel", None)), + ] + choice = select_menu(self.stdscr, f"Actions for {name}", [label for label, _ in items]) + if choice is not None: + kind, val = items[choice][1] + if kind in ("release", "tag", "commit"): + if val: + self.set_ref(name, kind, val) + # update hash + sri = self.prefetch_hash_for(name) + if sri: + ts = self.target_dict.setdefault("sources", {}) + compw = ts.setdefault(name, {}) + compw["hash"] = sri + self.set_status(f"{name}: set {kind} and updated hash") + else: + self.set_status(f"No candidate {kind}") + elif kind == "hash": + sri = self.prefetch_hash_for(name) + if sri: + ts = self.target_dict.setdefault("sources", {}) + compw = ts.setdefault(name, {}) + compw["hash"] = sri + self.set_status(f"{name}: updated hash") + else: + self.set_status("hash prefetch failed") + else: + pass + elif fetcher == "url": + # Offer latest release update (for proton-cachyos-like schemas) and/or hash recompute + cand = self.url_candidates.get(name) + menu_items: List[Tuple[str, Tuple[str, Optional[Dict[str, str]]]]] = [] + if cand and cand.get("base") and cand.get("release"): + menu_items.append(("Use latest release (update variables.base/release)", ("update_vars", cand))) + menu_items.append(("Recompute hash (prefetch)", ("hash", None))) + menu_items.append(("Cancel", ("cancel", None))) + + choice = select_menu(self.stdscr, f"Actions for {name}", [label for label, _ in menu_items]) + if choice is not None: + kind, payload = menu_items[choice][1] + if kind == "update_vars" and isinstance(payload, dict): + # Write variables into selected base/variant dict + vars_dict = self.target_dict.setdefault("variables", {}) + vars_dict["base"] = payload["base"] + vars_dict["release"] = payload["release"] + # Recompute merged view to reflect new variables + self.recompute_view() + # Prefetch and update hash + sri = self.prefetch_hash_for(name) + if sri: + ts = self.target_dict.setdefault("sources", {}) + compw = ts.setdefault(name, {}) + compw["hash"] = sri + self.set_status(f"{name}: updated to {payload['base']}.{payload['release']} and refreshed hash") + else: + self.set_status("hash prefetch failed after variable update") + elif kind == "hash": + sri = self.prefetch_hash_for(name) + if sri: + ts = self.target_dict.setdefault("sources", {}) + compw = ts.setdefault(name, {}) + compw["hash"] = sri + self.set_status(f"{name}: updated hash") + else: + self.set_status("hash prefetch failed") + else: + pass + else: + show_popup(self.stdscr, [f"{name}: fetcher={fetcher}", "Use 'e' to edit fields manually."]) + else: + pass + +def select_menu(stdscr, title: str, options: List[str]) -> Optional[int]: + idx = 0 + while True: + stdscr.clear() + h, w = stdscr.getmaxyx() + stdscr.addstr(0, 0, title[:w-1]) + for i, opt in enumerate(options[:h-3], start=0): + sel = ">" if i == idx else " " + stdscr.addstr(2+i, 0, f"{sel} {opt}"[:w-1]) + stdscr.addstr(h-1, 0, "Enter: select | Backspace: cancel") + stdscr.refresh() + ch = stdscr.getch() + if ch in (curses.KEY_UP, ord('k')): + idx = max(0, idx-1) + elif ch in (curses.KEY_DOWN, ord('j')): + idx = min(len(options)-1, idx+1) + elif ch in (curses.KEY_ENTER, 10, 13): + return idx + elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 27: + return None + +# ------------------------------ main ------------------------------ + +def main(stdscr): + curses.curs_set(0) + stdscr.nodelay(False) + try: + screen = PackagesScreen(stdscr) + screen.run() + except Exception: + curses.endwin() + traceback.print_exc() + sys.exit(1) + +if __name__ == "__main__": + curses.wrapper(main)