#!/usr/bin/env python3 """ Interactive TUI for browsing and updating unified version.json files. Features: - Scans packages/**/version.json and lists all packages - Per-package view: - Choose base or any variant - List all sources/components with current ref (tag/rev/url/version) and hash - For GitHub sources: fetch candidates (latest release tag, latest tag, latest commit) - For Git sources: fetch latest commit (HEAD) - For URL sources: recompute hash (url/urlTemplate with rendered variables) - Actions on a component: - Update to one of the candidates (sets tag or rev) and optionally re-hash - Recompute hash (prefetch) - Edit any field via path=value (e.g., variables.version=2025.07) - Writes changes back to version.json Dependencies: - Standard library + external CLI tools: - nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri` - nix-prefetch-git - git - Optional: GITHUB_TOKEN env var to increase GitHub API rate limits Usage: scripts/version_tui.py Controls: - Up/Down to navigate lists - Enter to select - Backspace to go back - q to quit - On component screen: r = refresh candidates h = recompute hash (prefetch) e = edit arbitrary field (path=value) s = save to disk """ import curses import json import os import re import subprocess import sys import traceback import urllib.request import urllib.error import urllib.parse from urllib.parse import urlparse from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union ROOT = Path(__file__).resolve().parents[1] PKGS_DIR = ROOT / "packages" Json = Dict[str, Any] # ------------------------------ Utilities ------------------------------ def eprintln(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) def load_json(path: Path) -> Json: with path.open("r", encoding="utf-8") as f: return json.load(f) def save_json(path: Path, data: Json): tmp = path.with_suffix(".tmp") with tmp.open("w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) f.write("\n") tmp.replace(path) def render_templates(value: Any, variables: Dict[str, Any]) -> Any: if isinstance(value, str): def repl(m): name = m.group(1) return str(variables.get(name, m.group(0))) return re.sub(r"\$\{([^}]+)\}", repl, value) elif isinstance(value, dict): return {k: render_templates(v, variables) for k, v in value.items()} elif isinstance(value, list): return [render_templates(v, variables) for v in value] return value def deep_set(o: Json, path: List[str], value: Any): cur = o for p in path[:-1]: if p not in cur or not isinstance(cur[p], dict): cur[p] = {} cur = cur[p] cur[path[-1]] = value # ------------------------------ Merge helpers (match lib/versioning.nix) ------------------------------ def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: out = dict(a) for k, v in b.items(): if k in out and isinstance(out[k], dict) and isinstance(v, dict): out[k] = deep_merge(out[k], v) else: out[k] = v return out def merge_sources( base_sources: Dict[str, Any], overrides: Dict[str, Any] ) -> Dict[str, Any]: names = set(base_sources.keys()) | set(overrides.keys()) result: Dict[str, Any] = {} for n in names: if n in base_sources and n in overrides: if isinstance(base_sources[n], dict) and isinstance(overrides[n], dict): result[n] = deep_merge(base_sources[n], overrides[n]) else: result[n] = overrides[n] elif n in overrides: result[n] = overrides[n] else: result[n] = base_sources[n] return result def merged_view( spec: Json, variant_name: Optional[str] ) -> Tuple[Dict[str, Any], Dict[str, Any], Json]: """ Returns (merged_variables, merged_sources, target_dict_to_write) merged_* are used for display/prefetch; target_dict_to_write is where updates must be written (base or selected variant). """ base_vars = spec.get("variables", {}) or {} base_sources = spec.get("sources", {}) or {} if variant_name: vdict = spec.get("variants", {}).get(variant_name) if not isinstance(vdict, dict): raise ValueError(f"Variant '{variant_name}' not found") v_vars = vdict.get("variables", {}) or {} v_sources = vdict.get("sources", {}) or {} merged_vars = dict(base_vars) merged_vars.update(v_vars) merged_srcs = merge_sources(base_sources, v_sources) return merged_vars, merged_srcs, vdict else: return dict(base_vars), dict(base_sources), spec def run_cmd(args: List[str]) -> Tuple[int, str, str]: try: p = subprocess.run( args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False ) return p.returncode, p.stdout.strip(), p.stderr.strip() except Exception as e: return 1, "", str(e) def run_get_stdout(args: List[str]) -> Optional[str]: code, out, err = run_cmd(args) if code != 0: eprintln(f"Command failed: {' '.join(args)}\n{err}") return None return out def _nix_fakeHash_build(expr: str) -> Optional[str]: """ Run `nix build --impure --expr expr` with lib.fakeHash and parse the correct hash from the 'got:' line in nix's error output. Returns the SRI hash string, or None on failure. """ p = subprocess.run( ["nix", "build", "--impure", "--expr", expr], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) m = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", p.stderr) if m: return m.group(1) eprintln(f"nix fakeHash build failed:\n{p.stderr[-600:]}") return None def nix_prefetch_github( owner: str, repo: str, rev: str, submodules: bool = False ) -> Optional[str]: """ Compute the hash that pkgs.fetchFromGitHub expects for the given revision. Uses nix build with lib.fakeHash to get the exact NAR hash of the unpacked tarball, which is what Nix stores and validates against. """ sub = "true" if submodules else "false" expr = ( f"let pkgs = import {{}};\n" f"in pkgs.fetchFromGitHub {{\n" f' owner = "{owner}";\n' f' repo = "{repo}";\n' f' rev = "{rev}";\n' f" fetchSubmodules = {sub};\n" f" hash = pkgs.lib.fakeHash;\n" f"}}" ) return _nix_fakeHash_build(expr) def nix_prefetch_url(url: str) -> Optional[str]: """ Compute the flat (non-unpacked) hash for a fetchurl source. Uses nix store prefetch-file which gives the same hash as pkgs.fetchurl. Do NOT use this for fetchFromGitHub or fetchzip — those need the NAR hash of the unpacked content (use nix_prefetch_github or nix_prefetch_fetchzip). """ out = run_get_stdout( ["nix", "store", "prefetch-file", "--hash-type", "sha256", "--json", url] ) if out is not None and out.strip(): try: data = json.loads(out) if "hash" in data: return data["hash"] except Exception: pass # Fallback to legacy nix-prefetch-url out = run_get_stdout(["nix-prefetch-url", "--type", "sha256", url]) if out is None: out = run_get_stdout(["nix-prefetch-url", url]) if out is None: return None sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", out.strip()]) return sri def nix_prefetch_fetchzip(url: str, strip_root: bool = True) -> Optional[str]: """ Compute the hash that pkgs.fetchzip expects for the given URL. fetchzip hashes the NAR of the UNPACKED archive, which differs from the flat hash of the zip file itself. Uses nix build with lib.fakeHash. """ strip_attr = "true" if strip_root else "false" expr = ( f"let pkgs = import {{}};\n" f"in pkgs.fetchzip {{\n" f' url = "{url}";\n' f" stripRoot = {strip_attr};\n" f" hash = pkgs.lib.fakeHash;\n" f"}}" ) return _nix_fakeHash_build(expr) def nix_prefetch_git(url: str, rev: str) -> Optional[str]: """ Compute the hash that pkgs.fetchgit expects. Uses nix-prefetch-git as the primary method (reliable for both commit SHAs and tag names). Falls back to builtins.fetchGit + nix hash path for commit SHAs when nix-prefetch-git is unavailable. """ # Primary: nix-prefetch-git (works for both commit SHAs and tag names) out = run_get_stdout(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url]) if out is not None: base32 = None try: data = json.loads(out) base32 = data.get("sha256") or data.get("hash") except Exception: lines = [l for l in out.splitlines() if l.strip()] if lines: base32 = lines[-1].strip() if base32: sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", base32]) if sri: return sri # Fallback: builtins.fetchGit + nix hash path (commit SHA only — tag names fail) # Only attempt if rev looks like a commit SHA (40 hex chars) if re.match(r"^[0-9a-f]{40}$", rev): expr = f'builtins.fetchGit {{ url = "{url}"; rev = "{rev}"; }}' store_path = run_get_stdout(["nix", "eval", "--raw", "--expr", expr]) if store_path is not None and store_path.strip(): hash_out = run_get_stdout( ["nix", "hash", "path", "--type", "sha256", store_path.strip()] ) if hash_out is not None and hash_out.strip(): return hash_out.strip() return None def nix_prefetch_cargo_vendor( fetcher: str, src_hash: str, *, url: str = "", owner: str = "", repo: str = "", rev: str = "", subdir: str = "", ) -> Optional[str]: """ Compute the cargo vendor hash for a Rust source using nix build + fakeHash. Builds rustPlatform.fetchCargoVendor with lib.fakeHash, parses the correct hash from the 'got:' line in nix's error output. Args: fetcher: "github" or "git" src_hash: SRI hash of the source (already known) url: git URL (for "git" fetcher) owner/repo: GitHub owner and repo (for "github" fetcher) rev: tag or commit rev subdir: optional subdirectory within the source that contains Cargo.lock Returns: SRI hash string, or None on failure. """ if fetcher == "github" and owner and repo and rev and src_hash: src_expr = ( f'pkgs.fetchFromGitHub {{ owner = "{owner}"; repo = "{repo}";' f' rev = "{rev}"; hash = "{src_hash}"; }}' ) elif fetcher == "git" and url and rev and src_hash: # For GitHub git URLs, fetchFromGitHub is more reliable than fetchgit parsed = urlparse(url) parts = [p for p in parsed.path.split("/") if p] if parsed.hostname in ("github.com",) and len(parts) >= 2: gh_owner, gh_repo = parts[0], parts[1] src_expr = ( f'pkgs.fetchFromGitHub {{ owner = "{gh_owner}"; repo = "{gh_repo}";' f' rev = "{rev}"; hash = "{src_hash}"; }}' ) else: src_expr = f'pkgs.fetchgit {{ url = "{url}"; rev = "{rev}"; hash = "{src_hash}"; }}' else: return None subdir_attr = f'sourceRoot = "${{src.name}}/{subdir}";' if subdir else "" expr = ( f"let pkgs = import {{}};\n" f" src = {src_expr};\n" f"in pkgs.rustPlatform.fetchCargoVendor {{\n" f" inherit src;\n" f" {subdir_attr}\n" f" hash = pkgs.lib.fakeHash;\n" f"}}" ) p = subprocess.run( ["nix", "build", "--impure", "--expr", expr], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) m = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", p.stderr) if m: return m.group(1) eprintln(f"nix_prefetch_cargo_vendor failed:\n{p.stderr[-600:]}") return None def http_get_json(url: str, token: Optional[str] = None) -> Any: try: req = urllib.request.Request( url, headers={"Accept": "application/vnd.github+json"} ) if token: req.add_header("Authorization", f"Bearer {token}") with urllib.request.urlopen(req, timeout=10) as resp: if resp.status != 200: return None return json.loads(resp.read().decode("utf-8")) except urllib.error.HTTPError as e: eprintln(f"HTTP error for {url}: {e.code} {e.reason}") return None except Exception as e: eprintln(f"Request failed for {url}: {e}") return None def http_get_text(url: str) -> Optional[str]: try: # Provide a basic User-Agent to avoid some hosts rejecting the request req = urllib.request.Request(url, headers={"User-Agent": "version-tui/1.0"}) with urllib.request.urlopen(req, timeout=10) as resp: if resp.status != 200: return None return resp.read().decode("utf-8") except urllib.error.HTTPError as e: eprintln(f"HTTP error for {url}: {e.code} {e.reason}") return None except Exception as e: eprintln(f"Request failed for {url}: {e}") return None def gh_latest_release(owner: str, repo: str, token: Optional[str]) -> Optional[str]: try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases/latest", token ) if not data: return None return data.get("tag_name") except Exception as e: eprintln(f"latest_release failed for {owner}/{repo}: {e}") return None def gh_latest_tag(owner: str, repo: str, token: Optional[str]) -> Optional[str]: try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token ) if not isinstance(data, list): return None tags = [ t.get("name") for t in data if isinstance(t, dict) and "name" in t and t.get("name") is not None ] return tags[0] if tags else None except Exception as e: eprintln(f"latest_tag failed for {owner}/{repo}: {e}") return None def gh_list_tags(owner: str, repo: str, token: Optional[str]) -> List[str]: try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token ) return [ str(t.get("name")) for t in data if isinstance(t, dict) and "name" in t and t.get("name") is not None ] except Exception as e: eprintln(f"list_tags failed for {owner}/{repo}: {e}") return [] def gh_head_commit( owner: str, repo: str, branch: Optional[str] = None ) -> Optional[str]: """Return the latest commit SHA for a GitHub repo, optionally restricted to a branch.""" try: ref = f"refs/heads/{branch}" if branch else "HEAD" out = run_get_stdout( ["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", ref] ) if not out: return None # ls-remote can return multiple lines; take the first match for line in out.splitlines(): parts = line.split() if parts: return parts[0] return None except Exception as e: eprintln(f"head_commit failed for {owner}/{repo} (branch={branch}): {e}") return None def _iso_to_date(iso: str) -> str: """Convert an ISO-8601 timestamp like '2026-03-02T13:32:38Z' to 'YYYY-MM-DD'.""" if iso and len(iso) >= 10: return iso[:10] return "" def gh_ref_date(owner: str, repo: str, ref: str, token: Optional[str]) -> str: """ Return the committer date (YYYY-MM-DD) for any ref on a GitHub repo. Works for commit SHAs, tag names, and branch names. Returns empty string on failure. """ try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/commits/{urllib.parse.quote(ref, safe='')}", token, ) if not isinstance(data, dict): return "" iso = ( data.get("commit", {}).get("committer", {}).get("date") or data.get("commit", {}).get("author", {}).get("date") or "" ) return _iso_to_date(iso) except Exception: return "" def gh_release_date(owner: str, repo: str, tag: str, token: Optional[str]) -> str: """ Return the published date (YYYY-MM-DD) for a GitHub release by tag name. Falls back to gh_ref_date if the release is not found. """ try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases/tags/{urllib.parse.quote(tag, safe='')}", token, ) if isinstance(data, dict): iso = data.get("published_at") or data.get("created_at") or "" if iso: return _iso_to_date(iso) except Exception: pass return gh_ref_date(owner, repo, tag, token) def git_commit_date(url: str, sha: str) -> str: """ Return the committer date (YYYY-MM-DD) for a commit SHA on a plain git repo. Only works for GitHub URLs (uses the REST API). Returns '' for others. """ try: parsed = urlparse(url) if parsed.hostname != "github.com": return "" parts = [p for p in parsed.path.split("/") if p] if len(parts) < 2: return "" owner = parts[0] repo = parts[1].removesuffix(".git") # strip .git suffix if present return gh_ref_date(owner, repo, sha, None) except Exception: return "" def git_branch_commit(url: str, branch: Optional[str] = None) -> Optional[str]: """Return the latest commit SHA for a git URL, optionally restricted to a branch.""" try: ref = f"refs/heads/{branch}" if branch else "HEAD" out = run_get_stdout(["git", "ls-remote", url, ref]) if not out: return None for line in out.splitlines(): parts = line.split() if parts: return parts[0] return None except Exception as e: eprintln(f"git_branch_commit failed for {url} (branch={branch}): {e}") return None def gh_tarball_url(owner: str, repo: str, ref: str) -> str: return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}" def gh_release_tags_api(owner: str, repo: str, token: Optional[str]) -> List[str]: """ Return recent release tag names for a repo using GitHub API. """ try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", token ) return [ str(r.get("tag_name")) for r in data if isinstance(r, dict) and "tag_name" in r and r.get("tag_name") is not None ] except Exception as e: eprintln(f"releases list failed for {owner}/{repo}: {e}") return [] # ------------------------------ Data scanning ------------------------------ def find_packages() -> List[Tuple[str, Path, bool, bool]]: results = [] # Find regular packages with version.json for p in PKGS_DIR.rglob("version.json"): # name is directory name under packages (e.g., raspberrypi/linux-rpi => raspberrypi/linux-rpi) rel = p.relative_to(PKGS_DIR).parent results.append( (str(rel), p, False, False) ) # (name, path, is_python, is_homeassistant) # Find Python packages with default.nix python_dir = PKGS_DIR / "python" if python_dir.exists(): for pkg_dir in python_dir.iterdir(): if pkg_dir.is_dir(): nix_file = pkg_dir / "default.nix" if nix_file.exists(): # name is python/package-name rel = pkg_dir.relative_to(PKGS_DIR) results.append( (str(rel), nix_file, True, False) ) # (name, path, is_python, is_homeassistant) # Find Home Assistant components with default.nix homeassistant_dir = PKGS_DIR / "homeassistant" if homeassistant_dir.exists(): for pkg_dir in homeassistant_dir.iterdir(): if pkg_dir.is_dir(): nix_file = pkg_dir / "default.nix" if nix_file.exists(): # Only treat as an HA component if it uses buildHomeAssistantComponent; # otherwise fall through to Python package handling. try: nix_content = nix_file.read_text(encoding="utf-8") except Exception: nix_content = "" rel = pkg_dir.relative_to(PKGS_DIR) if "buildHomeAssistantComponent" in nix_content: results.append( (str(rel), nix_file, False, True) ) # (name, path, is_python, is_homeassistant) else: # Treat as a Python package instead results.append((str(rel), nix_file, True, False)) results.sort() return results def _extract_brace_block(content: str, keyword: str) -> Optional[str]: """ Find 'keyword {' in content and return the text between the matching braces, handling nested braces (e.g. ${var} inside strings). Returns None if not found. """ idx = content.find(keyword + " {") if idx == -1: idx = content.find(keyword + "{") if idx == -1: return None start = content.find("{", idx + len(keyword)) if start == -1: return None depth = 0 for i in range(start, len(content)): c = content[i] if c == "{": depth += 1 elif c == "}": depth -= 1 if depth == 0: return content[start + 1 : i] return None def _resolve_nix_str(value: str, pname: str, version: str) -> str: """Resolve simple Nix string interpolations like ${pname} and ${version}.""" value = re.sub(r"\$\{pname\}", pname, value) value = re.sub(r"\$\{version\}", version, value) return value def _extract_nix_attr(block: str, attr: str) -> str: """ Extract attribute value from a Nix attribute set block. Handles: attr = "quoted string"; attr = unquoted_ident; Returns empty string if not found. """ # Quoted string value m = re.search(rf'\b{attr}\s*=\s*"([^"]*)"', block) if m: return m.group(1) # Unquoted identifier (e.g. repo = pname;) m = re.search(rf"\b{attr}\s*=\s*([A-Za-z_][A-Za-z0-9_-]*)\s*;", block) if m: return m.group(1) return "" def parse_python_package(path: Path) -> Dict[str, Any]: """Parse a Python package's default.nix file to extract version and source information.""" with path.open("r", encoding="utf-8") as f: content = f.read() # Extract version version_match = re.search(r'version\s*=\s*"([^"]+)"', content) version = version_match.group(1) if version_match else "" # Extract pname (package name) pname_match = re.search(r'pname\s*=\s*"([^"]+)"', content) pname = pname_match.group(1) if pname_match else "" # Create a structure similar to version.json for compatibility result: Dict[str, Any] = {"variables": {}, "sources": {}} # Only add non-empty values to variables if version: result["variables"]["version"] = version # Determine source name - use pname or derive from path source_name = pname.lower() if pname else path.parent.name.lower() # Try to extract brace-balanced fetchFromGitHub block (handles ${var} inside strings) fetch_block = _extract_brace_block(content, "fetchFromGitHub") # Check for fetchPypi pattern (simple [^}]+ is fine here as PyPI blocks lack ${}) fetch_pypi_match = re.search( r"src\s*=\s*.*fetchPypi\s*\{([^}]+)\}", content, re.DOTALL ) if fetch_block is not None: owner_raw = _extract_nix_attr(fetch_block, "owner") repo_raw = _extract_nix_attr(fetch_block, "repo") rev_raw = _extract_nix_attr(fetch_block, "rev") hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block) hash_value = hash_match.group(2) if hash_match else "" def _resolve_nix_ident(raw: str) -> str: """Resolve unquoted Nix identifier or string-with-interpolation to its value.""" if raw == "pname": return pname if raw == "version": return version return _resolve_nix_str(raw, pname, version) owner = _resolve_nix_ident(owner_raw) repo = _resolve_nix_ident(repo_raw) rev = _resolve_nix_ident(rev_raw) # Create source entry result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo, "hash": hash_value, } # Classify rev as tag or commit ref if rev: if rev.startswith("v") or "${version}" in rev_raw: result["sources"][source_name]["tag"] = rev elif rev in ("master", "main"): result["sources"][source_name]["rev"] = rev else: result["sources"][source_name]["rev"] = rev elif fetch_pypi_match: fetch_block_pypi = fetch_pypi_match.group(1) hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block_pypi) hash_value = hash_match.group(2) if hash_match else "" # Look for GitHub info in meta section homepage_match = re.search( r'homepage\s*=\s*"https://github\.com/([^/]+)/([^"]+)"', content ) if homepage_match: owner = homepage_match.group(1) repo = homepage_match.group(2) result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo, "hash": hash_value, "pypi": True, } if version: result["sources"][source_name]["tag"] = f"v{version}" else: result["sources"][source_name] = { "fetcher": "pypi", "pname": pname, "version": version, "hash": hash_value, } else: # Fallback: scan whole file for GitHub or URL info owner_match = re.search(r'owner\s*=\s*"([^"]+)"', content) repo_match = re.search(r'repo\s*=\s*"([^"]+)"', content) rev_match = re.search(r'rev\s*=\s*"([^"]+)"', content) tag_match = re.search(r'tag\s*=\s*"([^"]+)"', content) hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', content) url_match = re.search(r'url\s*=\s*"([^"]+)"', content) homepage_match = re.search( r'homepage\s*=\s*"https://github\.com/([^/]+)/([^"]+)"', content ) owner = owner_match.group(1) if owner_match else "" repo = repo_match.group(1) if repo_match else "" rev = rev_match.group(1) if rev_match else "" tag = tag_match.group(1) if tag_match else "" hash_value = hash_match.group(2) if hash_match else "" url = url_match.group(1) if url_match else "" if homepage_match and not (owner and repo): owner = homepage_match.group(1) repo = homepage_match.group(2) if owner and repo: result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo, "hash": hash_value, } if tag: result["sources"][source_name]["tag"] = tag elif rev: result["sources"][source_name]["rev"] = rev elif url: result["sources"][source_name] = { "fetcher": "url", "url": url, "hash": hash_value, } else: result["sources"][source_name] = {"fetcher": "unknown", "hash": hash_value} return result def update_python_package( path: Path, source_name: str, updates: Dict[str, Any] ) -> bool: """Update a Python package's default.nix file with new version and/or hash.""" with path.open("r", encoding="utf-8") as f: content = f.read() modified = False # Update version if provided if "version" in updates: new_version = updates["version"] content, version_count = re.subn( r'(version\s*=\s*)"([^"]+)"', f'\\1"{new_version}"', content ) if version_count > 0: modified = True # Update hash if provided if "hash" in updates: new_hash = updates["hash"] # Match both sha256 and hash attributes content, hash_count = re.subn( r'(sha256|hash)\s*=\s*"([^"]+)"', f'\\1 = "{new_hash}"', content ) if hash_count > 0: modified = True # Update tag if provided if "tag" in updates: new_tag = updates["tag"] content, tag_count = re.subn( r'(tag\s*=\s*)"([^"]+)"', f'\\1"{new_tag}"', content ) if tag_count > 0: modified = True # Update rev if provided if "rev" in updates: new_rev = updates["rev"] content, rev_count = re.subn( r'(rev\s*=\s*)"([^"]+)"', f'\\1"{new_rev}"', content ) if rev_count > 0: modified = True if modified: with path.open("w", encoding="utf-8") as f: f.write(content) return modified def parse_homeassistant_component(path: Path) -> Dict[str, Any]: """Parse a Home Assistant component's default.nix file to extract version and source information.""" with path.open("r", encoding="utf-8") as f: content = f.read() # Extract domain, version, and owner domain_match = re.search(r'domain\s*=\s*"([^"]+)"', content) version_match = re.search(r'version\s*=\s*"([^"]+)"', content) owner_match = re.search(r'owner\s*=\s*"([^"]+)"', content) domain = domain_match.group(1) if domain_match else "" version = version_match.group(1) if version_match else "" owner = owner_match.group(1) if owner_match else "" # Extract GitHub repo info repo_match = re.search(r'repo\s*=\s*"([^"]+)"', content) rev_match = re.search(r'rev\s*=\s*"([^"]+)"', content) tag_match = re.search(r'tag\s*=\s*"([^"]+)"', content) hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', content) repo = repo_match.group(1) if repo_match else "" rev = rev_match.group(1) if rev_match else "" tag = tag_match.group(1) if tag_match else "" hash_value = hash_match.group(2) if hash_match else "" # Create a structure similar to version.json for compatibility result = {"variables": {}, "sources": {}} # Only add non-empty values to variables if version: result["variables"]["version"] = version if domain: result["variables"]["domain"] = domain # Determine source name - use domain or directory name source_name = domain if domain else path.parent.name.lower() # Handle GitHub sources if owner: repo_name = repo if repo else source_name result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo_name, } # Only add non-empty values if hash_value: result["sources"][source_name]["hash"] = hash_value # Handle tag or rev; resolve ${version} references if tag: result["sources"][source_name]["tag"] = _resolve_nix_str(tag, "", version) elif rev: rev_resolved = _resolve_nix_str(rev, "", version) # If rev was a ${version} template or equals version, treat as tag if "${version}" in rev or rev_resolved == version: result["sources"][source_name]["tag"] = rev_resolved else: result["sources"][source_name]["rev"] = rev_resolved elif version: # fallback: use version as tag result["sources"][source_name]["tag"] = version else: # Fallback for components with no clear source info result["sources"][source_name] = {"fetcher": "unknown"} if hash_value: result["sources"][source_name]["hash"] = hash_value return result def update_homeassistant_component( path: Path, source_name: str, updates: Dict[str, Any] ) -> bool: """Update a Home Assistant component's default.nix file with new version and/or hash.""" with path.open("r", encoding="utf-8") as f: content = f.read() modified = False # Update version if provided if "version" in updates: new_version = updates["version"] content, version_count = re.subn( r'(version\s*=\s*)"([^"]+)"', f'\\1"{new_version}"', content ) if version_count > 0: modified = True # Update hash if provided if "hash" in updates: new_hash = updates["hash"] # Match both sha256 and hash attributes in src = fetchFromGitHub { ... } content, hash_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(sha256|hash)\s*=\s*"([^"]+)"([^}]*\})', f'\\1\\2 = "{new_hash}"\\4', content, ) if hash_count > 0: modified = True # Update tag if provided if "tag" in updates: new_tag = updates["tag"] content, tag_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(tag|rev)\s*=\s*"([^"]+)"([^}]*\})', f'\\1\\2 = "{new_tag}"\\4', content, ) if tag_count == 0: # If no tag/rev found, try to add it content, tag_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})', f'\\1\\2;\n tag = "{new_tag}"\\3', content, ) if tag_count > 0: modified = True # Update rev if provided if "rev" in updates: new_rev = updates["rev"] content, rev_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(rev|tag)\s*=\s*"([^"]+)"([^}]*\})', f'\\1\\2 = "{new_rev}"\\4', content, ) if rev_count == 0: # If no rev/tag found, try to add it content, rev_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})', f'\\1\\2;\n rev = "{new_rev}"\\3', content, ) if rev_count > 0: modified = True if modified: with path.open("w", encoding="utf-8") as f: f.write(content) return modified # ------------------------------ Display helpers ------------------------------ def source_display_ref(comp: Dict[str, Any], merged_vars: Dict[str, Any]) -> str: """ Build a concise human-readable reference string for a source component. Rules per fetcher: github -> owner/repo@tag or owner/repo@rev[:7] (fully rendered) git -> tag-or-rev[:12] (commit SHAs truncated) url -> owner/repo@release-tag · filename (when vars are resolved) filename only (when no release tag) urlTemplate pattern (when vars still unresolved) none -> version field or empty """ fetcher = comp.get("fetcher", "none") rendered = render_templates(comp, merged_vars) if fetcher == "github": tag = rendered.get("tag") or "" rev = rendered.get("rev") or "" owner = rendered.get("owner") or str(merged_vars.get("owner") or "") repo = rendered.get("repo") or str(merged_vars.get("repo") or "") if tag and owner and repo: return f"{owner}/{repo}@{tag}" if tag: return tag if rev and owner and repo: return f"{owner}/{repo}@{rev[:7]}" if rev: return rev[:12] return "" if fetcher == "git": ref = rendered.get("tag") or rendered.get("rev") or comp.get("version") or "" # Truncate bare commit SHAs to 12 chars; keep short tags intact if len(ref) == 40 and all(c in "0123456789abcdef" for c in ref): return ref[:12] return ref if fetcher == "url": url = rendered.get("url") or rendered.get("urlTemplate") or "" if not url: return "" # If the rendered URL still contains unresolved ${…} templates, show the # filename portion with remaining placeholders rendered as so the # user sees a meaningful pattern rather than literal '${base}' strings. if "${" in url: tmpl = comp.get("urlTemplate") or comp.get("url") or url filename = os.path.basename(urlparse(tmpl).path) if tmpl else tmpl # Replace remaining ${var} with for readability filename = re.sub(r"\$\{([^}]+)\}", r"<\1>", filename) return filename owner = str(merged_vars.get("owner") or "") repo = str(merged_vars.get("repo") or "") rp = str(merged_vars.get("releasePrefix") or "") rs = str(merged_vars.get("releaseSuffix") or "") base = str(merged_vars.get("base") or "") rel = str(merged_vars.get("release") or "") tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else "" filename = os.path.basename(urlparse(url).path) if url else "" if owner and repo and tag and filename: return f"{owner}/{repo}@{tag} · {filename}" if filename: return filename return url # none / pypi / unknown – fall back to version or empty return str(comp.get("version") or comp.get("tag") or comp.get("rev") or "") # ------------------------------ TUI helpers ------------------------------ # Define color pairs COLOR_NORMAL = 1 COLOR_HIGHLIGHT = 2 COLOR_HEADER = 3 COLOR_STATUS = 4 COLOR_ERROR = 5 COLOR_SUCCESS = 6 COLOR_BORDER = 7 COLOR_TITLE = 8 COLOR_DIM = 9 # muted text used for dates / secondary info def init_colors(): """Initialize color pairs for the TUI.""" curses.start_color() curses.use_default_colors() # Define color pairs curses.init_pair(COLOR_NORMAL, curses.COLOR_WHITE, -1) curses.init_pair(COLOR_HIGHLIGHT, curses.COLOR_BLACK, curses.COLOR_CYAN) curses.init_pair(COLOR_HEADER, curses.COLOR_CYAN, -1) curses.init_pair(COLOR_STATUS, curses.COLOR_YELLOW, -1) curses.init_pair(COLOR_ERROR, curses.COLOR_RED, -1) curses.init_pair(COLOR_SUCCESS, curses.COLOR_GREEN, -1) curses.init_pair(COLOR_BORDER, curses.COLOR_BLUE, -1) curses.init_pair(COLOR_TITLE, curses.COLOR_MAGENTA, -1) # Dim colour for secondary info like dates (white + A_DIM applied at render time) curses.init_pair(COLOR_DIM, curses.COLOR_WHITE, -1) def draw_border(win, y, x, h, w): """Draw a border around a region of the window.""" # Draw corners win.addch(y, x, curses.ACS_ULCORNER, curses.color_pair(COLOR_BORDER)) win.addch(y, x + w - 1, curses.ACS_URCORNER, curses.color_pair(COLOR_BORDER)) win.addch(y + h - 1, x, curses.ACS_LLCORNER, curses.color_pair(COLOR_BORDER)) # Draw bottom-right corner safely try: win.addch( y + h - 1, x + w - 1, curses.ACS_LRCORNER, curses.color_pair(COLOR_BORDER) ) except curses.error: # This is expected when trying to write to the bottom-right corner pass # Draw horizontal lines for i in range(1, w - 1): win.addch(y, x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)) win.addch(y + h - 1, x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)) # Draw vertical lines for i in range(1, h - 1): win.addch(y + i, x, curses.ACS_VLINE, curses.color_pair(COLOR_BORDER)) win.addch(y + i, x + w - 1, curses.ACS_VLINE, curses.color_pair(COLOR_BORDER)) class ScreenBase: def __init__(self, stdscr): self.stdscr = stdscr self.status = "" self.status_type = "normal" # "normal", "error", "success" def draw_status(self, height, width): if self.status: color = COLOR_STATUS if self.status_type == "error": color = COLOR_ERROR elif self.status_type == "success": color = COLOR_SUCCESS self.stdscr.addstr( height - 1, 0, self.status[: max(0, width - 1)], curses.color_pair(color), ) # else: leave the bottom line blank when no status message def set_status(self, text: str, status_type="normal"): self.status = text self.status_type = status_type def run(self): raise NotImplementedError def prompt_input(stdscr, prompt: str) -> Optional[str]: curses.echo() stdscr.addstr(prompt, curses.color_pair(COLOR_HEADER)) stdscr.clrtoeol() s = stdscr.getstr().decode("utf-8") curses.noecho() return s def show_popup(stdscr, lines: List[str], title: str = ""): h, w = stdscr.getmaxyx() box_h = min(len(lines) + 4, h - 2) box_w = min(max(max(len(l) for l in lines), len(title)) + 6, w - 2) top = (h - box_h) // 2 left = (w - box_w) // 2 win = curses.newwin(box_h, box_w, top, left) # Draw fancy border draw_border(win, 0, 0, box_h, box_w) # Add title if provided if title: title_x = (box_w - len(title)) // 2 win.addstr(0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE)) # Add content for i, line in enumerate(lines, start=1): if i >= box_h - 1: break win.addstr(i, 2, line[: box_w - 4], curses.color_pair(COLOR_NORMAL)) # Add footer footer = "Press any key to continue" footer_x = (box_w - len(footer)) // 2 win.addstr(box_h - 1, footer_x, footer, curses.color_pair(COLOR_STATUS)) win.refresh() win.getch() # ------------------------------ Screens ------------------------------ class PackagesScreen(ScreenBase): def __init__(self, stdscr): super().__init__(stdscr) self.packages = find_packages() self.idx = 0 self.filter_mode = "all" # "all", "regular", "python" self.scroll_offset = 0 # Add scroll offset to handle long lists def run(self): while True: self.stdscr.clear() h, w = self.stdscr.getmaxyx() # Determine split layout left_w = max(30, min(60, w // 3)) right_x = left_w + 1 right_w = max(0, w - right_x) # Draw borders for left and right panes draw_border(self.stdscr, 0, 0, h - 1, left_w) if right_w >= 20: draw_border(self.stdscr, 0, right_x, h - 1, right_w) # Filter packages based on mode if self.filter_mode == "regular": filtered_packages = [p for p in self.packages if not p[2] and not p[3]] elif self.filter_mode == "python": filtered_packages = [p for p in self.packages if p[2]] elif self.filter_mode == "homeassistant": filtered_packages = [p for p in self.packages if p[3]] else: filtered_packages = self.packages # Left pane title with count and active filter count = len(filtered_packages) if self.filter_mode == "regular": title = f"Packages [{count}] f:filter" elif self.filter_mode == "python": title = f"Python [{count}] f:filter" elif self.filter_mode == "homeassistant": title = f"Home Assistant [{count}] f:filter" else: title = f"All Packages [{count}] f:filter" title_x = max(1, (left_w - len(title)) // 2) self.stdscr.addstr( 0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD ) # Implement scrolling for long lists max_rows = h - 3 total_packages = len(filtered_packages) # Adjust scroll offset if needed if self.idx >= self.scroll_offset + max_rows: self.scroll_offset = self.idx - max_rows + 1 elif self.idx < self.scroll_offset: self.scroll_offset = self.idx # Display visible packages with scroll offset visible_packages = filtered_packages[ self.scroll_offset : self.scroll_offset + max_rows ] # Show scroll indicators if needed if self.scroll_offset > 0: self.stdscr.addstr(1, left_w - 3, "↑", curses.color_pair(COLOR_STATUS)) if self.scroll_offset + max_rows < total_packages: self.stdscr.addstr( min(1 + len(visible_packages), h - 2), left_w - 3, "↓", curses.color_pair(COLOR_STATUS), ) for i, (name, _path, is_python, is_homeassistant) in enumerate( visible_packages, start=0 ): # Use consistent display style for all packages pkg_type = "" # Remove the [Py] prefix for consistent display # Highlight the selected item if i + self.scroll_offset == self.idx: attr = curses.color_pair(COLOR_HIGHLIGHT) sel = "►" # Use a fancier selector else: attr = curses.color_pair(COLOR_NORMAL) sel = " " # Type badge: [py] [ha] shown in a fixed column before name if is_python: badge = "[py]" elif is_homeassistant: badge = "[ha]" else: badge = " " name_col_w = max(0, left_w - 9) self.stdscr.addstr(1 + i, 2, f"{sel} {badge} {name[:name_col_w]}", attr) # Right pane: preview of selected package (non-interactive summary) if right_w >= 20 and filtered_packages: try: name, path, is_python, is_homeassistant = filtered_packages[ self.idx ] # Right pane header: package name centred type_badge = ( " [py]" if is_python else (" [ha]" if is_homeassistant else "") ) hdr = f" {name}{type_badge} " title_x = right_x + max(1, (right_w - len(hdr)) // 2) self.stdscr.addstr( 0, title_x, hdr[: max(0, right_w - 2)], curses.color_pair(COLOR_TITLE) | curses.A_BOLD, ) # Show path relative to /etc/nixos try: rel_path = str(path.relative_to(Path("/etc/nixos"))) except ValueError: rel_path = str(path) self.stdscr.addstr( 1, right_x + 2, rel_path[: max(0, right_w - 3)], curses.color_pair(COLOR_NORMAL) | curses.A_DIM, ) # Sources header self.stdscr.addstr( 2, right_x + 2, "Sources:", curses.color_pair(COLOR_HEADER) ) if is_python: spec = parse_python_package(path) elif is_homeassistant: spec = parse_homeassistant_component(path) else: spec = load_json(path) merged_vars, merged_srcs, _ = merged_view(spec, None) snames = sorted(list(merged_srcs.keys())) max_src_rows = max(0, h - 6) for i2, sname in enumerate(snames[:max_src_rows], start=0): comp = merged_srcs[sname] fetcher = comp.get("fetcher", "none") display_ref = source_display_ref(comp, merged_vars) # Column layout: name(16) fetcher(7) ref(rest) NAME_W, FETCH_W = 16, 7 ref_col = right_x + 2 + NAME_W + 1 + FETCH_W + 1 max_ref = max(0, right_w - (NAME_W + 1 + FETCH_W + 1) - 3) ref_short = display_ref[:max_ref] + ( "..." if len(display_ref) > max_ref else "" ) fetcher_color = COLOR_NORMAL if fetcher == "github": fetcher_color = COLOR_SUCCESS elif fetcher == "url": fetcher_color = COLOR_STATUS elif fetcher == "git": fetcher_color = COLOR_HEADER self.stdscr.addstr( 3 + i2, right_x + 2, f"{sname[:NAME_W]:<{NAME_W}}", curses.color_pair(COLOR_NORMAL), ) self.stdscr.addstr( 3 + i2, right_x + 2 + NAME_W + 1, f"{fetcher[:FETCH_W]:<{FETCH_W}}", curses.color_pair(fetcher_color), ) self.stdscr.addstr( 3 + i2, ref_col, ref_short[: max(0, right_w - (ref_col - right_x) - 1)], curses.color_pair(COLOR_NORMAL), ) # Hint line just above the bottom border hint = "Enter: details k/j: move f: filter q: quit" if h >= 4: hint_x = right_x + max(1, (right_w - len(hint)) // 2) self.stdscr.addstr( h - 2, hint_x, hint[: max(0, right_w - 2)], curses.color_pair(COLOR_STATUS), ) except Exception as e: self.stdscr.addstr( 2, right_x + 2, "Error:", curses.color_pair(COLOR_ERROR) ) self.stdscr.addstr( 2, right_x + 9, f"{e}"[: max(0, right_w - 11)], curses.color_pair(COLOR_ERROR), ) self.draw_status(h, w) self.stdscr.refresh() ch = self.stdscr.getch() if ch in (ord("q"), 27): # q or ESC return None elif ch in (curses.KEY_UP, ord("k")): self.idx = max(0, self.idx - 1) elif ch in (curses.KEY_DOWN, ord("j")): self.idx = min(len(filtered_packages) - 1, self.idx + 1) elif ch == curses.KEY_PPAGE: # Page Up self.idx = max(0, self.idx - (h - 4)) elif ch == curses.KEY_NPAGE: # Page Down self.idx = min(len(filtered_packages) - 1, self.idx + (h - 4)) elif ch == ord("g"): # Go to top self.idx = 0 elif ch == ord("G"): # Go to bottom self.idx = max(0, len(filtered_packages) - 1) elif ch == ord("f"): # Cycle: all -> regular -> python -> homeassistant -> all modes = ["all", "regular", "python", "homeassistant"] self.filter_mode = modes[ (modes.index(self.filter_mode) + 1) % len(modes) ] self.idx = 0 self.scroll_offset = 0 elif ch in (curses.KEY_ENTER, 10, 13): filtered_packages = self.packages if self.filter_mode == "regular": filtered_packages = [p for p in self.packages if not p[2]] elif self.filter_mode == "python": filtered_packages = [p for p in self.packages if p[2]] if not filtered_packages: continue name, path, is_python, is_homeassistant = filtered_packages[self.idx] try: if is_python: spec = parse_python_package(path) elif is_homeassistant: spec = parse_homeassistant_component(path) else: spec = load_json(path) except Exception as e: self.set_status(f"Failed to load {path}: {e}") continue screen = PackageDetailScreen( self.stdscr, name, path, spec, is_python, is_homeassistant ) ret = screen.run() if ret == "reload": # re-scan on save self.packages = find_packages() self.idx = min(self.idx, len(self.packages) - 1) else: pass class PackageDetailScreen(ScreenBase): def __init__( self, stdscr, pkg_name: str, path: Path, spec: Json, is_python: bool = False, is_homeassistant: bool = False, ): super().__init__(stdscr) self.pkg_name = pkg_name self.path = path self.spec = spec self.is_python = is_python self.is_homeassistant = is_homeassistant # Preserve JSON insertion order for variants (do not sort alphabetically) self.variants = [""] + list(self.spec.get("variants", {}).keys()) # Honour the spec's defaultVariant — pre-select it so the user lands on a # meaningful view immediately (e.g. proton-cachyos opens at cachyos-v4, not # the uninformative which has no base/release variables populated). default = self.spec.get("defaultVariant") if default and default in self.variants: self.vidx = self.variants.index(default) else: self.vidx = 0 self.gh_token = os.environ.get("GITHUB_TOKEN") self.candidates: Dict[ str, Dict[str, str] ] = {} # name -> {release, tag, commit} self.url_candidates: Dict[ str, Dict[str, str] ] = {} # name -> {base, release, tag} # initialize view self.recompute_view() def select_variant(self): # Recompute merged and target views when variant changes self.recompute_view() def recompute_view(self): # Set cursor to base or selected variant dict for manual edits if self.vidx == 0: self.cursor = self.spec variant_name = None else: variant_name = self.variants[self.vidx] self.cursor = self.spec["variants"][variant_name] # Compute merged view and target dict for writing self.merged_vars, self.merged_srcs, self.target_dict = merged_view( self.spec, variant_name ) self.snames = sorted(list(self.merged_srcs.keys())) self.sidx = 0 def fetch_candidates_for(self, name: str): comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") branch = comp.get("branch") or None # optional branch override c: Dict[str, str] = { "release": "", "tag": "", "commit": "", "release_date": "", "tag_date": "", "commit_date": "", } if fetcher == "github": owner = comp.get("owner") repo = comp.get("repo") if owner and repo: # Only fetch release/tag candidates when not locked to a specific branch if not branch: r = gh_latest_release(owner, repo, self.gh_token) if r: c["release"] = r c["release_date"] = gh_release_date( owner, repo, r, self.gh_token ) t = gh_latest_tag(owner, repo, self.gh_token) if t: c["tag"] = t c["tag_date"] = gh_ref_date(owner, repo, t, self.gh_token) m = gh_head_commit(owner, repo, branch) if m: c["commit"] = m c["commit_date"] = gh_ref_date(owner, repo, m, self.gh_token) # Special-case raspberrypi/linux: prefer latest stable_* tag or series-specific tags # (only when not branch-locked, as branch-locked tracks a rolling branch via commit) if not branch: try: if owner == "raspberrypi" and repo == "linux": tags_all = gh_list_tags(owner, repo, self.gh_token) rendered = render_templates(comp, self.merged_vars) cur_tag = str(rendered.get("tag") or "") # If current tag uses stable_YYYYMMDD scheme, pick latest stable_* tag if cur_tag.startswith("stable_"): stable_tags = sorted( [ x for x in tags_all if re.match(r"^stable_\d{8}$", x) ], reverse=True, ) if stable_tags: new_tag = stable_tags[0] if new_tag != c["tag"]: c["tag"] = new_tag c["tag_date"] = gh_ref_date( owner, repo, new_tag, self.gh_token ) else: # Try to pick a tag matching the current major.minor series if available mm = str(self.merged_vars.get("modDirVersion") or "") m2 = re.match(r"^(\d+)\.(\d+)", mm) if m2: base = f"rpi-{m2.group(1)}.{m2.group(2)}" series_tags = [ x for x in tags_all if ( x == f"{base}.y" or x.startswith(f"{base}.y") or x.startswith(f"{base}.") ) ] series_tags.sort(reverse=True) if series_tags: new_tag = series_tags[0] if new_tag != c["tag"]: c["tag"] = new_tag c["tag_date"] = gh_ref_date( owner, repo, new_tag, self.gh_token ) except Exception as _e: # Fallback to previously computed values pass elif fetcher == "git": url = comp.get("url") if url: # Special-case: CachyOS ZFS — read commit from PKGBUILD rather than # tracking HEAD of the repo (the repo has many branches and HEAD is # not necessarily what the kernel package uses). if ( self.pkg_name == "linux-cachyos" and name == "zfs" and "cachyos/zfs" in url ): pkgbuild_commit = self.fetch_cachyos_zfs_commit() if pkgbuild_commit: c["commit"] = pkgbuild_commit c["commit_date"] = git_commit_date(url, pkgbuild_commit) else: commit = git_branch_commit(url, branch) if commit: c["commit"] = commit c["commit_date"] = git_commit_date(url, commit) elif fetcher == "url": # Heuristic for GitHub release assets with variables in version.json (e.g., proton-cachyos) owner = self.merged_vars.get("owner") repo = self.merged_vars.get("repo") if owner and repo: tags = gh_release_tags_api(str(owner), str(repo), self.gh_token) prefix = str(self.merged_vars.get("releasePrefix", "")) suffix = str(self.merged_vars.get("releaseSuffix", "")) latest = next( ( t for t in tags if (t and t.startswith(prefix) and t.endswith(suffix)) ), None, ) if latest: c["release"] = latest c["release_date"] = gh_release_date( str(owner), str(repo), latest, self.gh_token ) mid = latest if prefix and mid.startswith(prefix): mid = mid[len(prefix) :] if suffix and mid.endswith(suffix): mid = mid[: -len(suffix)] parts = mid.split("-") if len(parts) >= 2: base, rel = parts[0], parts[-1] self.url_candidates[name] = { "base": base, "release": rel, "tag": latest, } self.candidates[name] = c def prefetch_hash_for(self, name: str) -> Optional[str]: comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") if fetcher == "github": owner = comp.get("owner") repo = comp.get("repo") rendered = render_templates(comp, self.merged_vars) ref = rendered.get("tag") or rendered.get("rev") submodules = bool(comp.get("submodules", False)) if owner and repo and ref: # fetchFromGitHub hashes the NAR of the unpacked tarball, not the # raw tarball file — must use nix_prefetch_github (fakeHash method). return nix_prefetch_github(owner, repo, ref, submodules=submodules) elif fetcher == "git": url = comp.get("url") rev = comp.get("rev") if url and rev: return nix_prefetch_git(url, rev) elif fetcher == "url": rendered = render_templates(comp, self.merged_vars) url = rendered.get("url") or rendered.get("urlTemplate") if url: extra = comp.get("extra") or {} if extra.get("unpack") == "zip": # fetchzip hashes the NAR of extracted content, not the zip file. strip_root = extra.get("stripRoot", True) return nix_prefetch_fetchzip(url, strip_root=strip_root) else: return nix_prefetch_url(url) return None def prefetch_cargo_hash_for(self, name: str) -> Optional[str]: """ Compute the cargo vendor hash for a source component that carries a 'cargoHash' field (or a linked 'cargoDeps' sibling source). Uses nix_prefetch_cargo_vendor() which builds fetchCargoVendor with lib.fakeHash and parses the correct hash from the error output. """ comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") src_hash = comp.get("hash", "") subdir = comp.get("cargoSubdir", "") rendered = render_templates(comp, self.merged_vars) if fetcher == "github": owner = comp.get("owner", "") repo = comp.get("repo", "") ref = rendered.get("tag") or rendered.get("rev") or "" if owner and repo and ref and src_hash: return nix_prefetch_cargo_vendor( "github", src_hash, owner=owner, repo=repo, rev=ref, subdir=subdir, ) elif fetcher == "git": url = comp.get("url", "") rev = rendered.get("rev") or rendered.get("tag") or "" if url and rev and src_hash: return nix_prefetch_cargo_vendor( "git", src_hash, url=url, rev=rev, subdir=subdir, ) return None def _source_has_cargo(self, name: str) -> bool: """Return True if this source carries a cargoHash field.""" comp = self.merged_srcs.get(name, {}) return "cargoHash" in comp def _apply_cargo_hash_to_sibling(self, name: str, cargo_hash: str): """ Propagate a freshly-computed cargo hash to any sibling source that mirrors the cargoDeps pattern — a source whose only meaningful field is "hash" and which is meant to stay in sync with the main source's cargoHash. Detection heuristic (any match triggers update): - Sibling is literally named "cargoDeps", OR - Sibling has no fetcher and its only field is "hash" (pure hash mirror) """ ts = self.target_dict.setdefault("sources", {}) for sibling_name, sibling in list(self.merged_srcs.items()): if sibling_name == name: continue has_fetcher = bool(sibling.get("fetcher")) non_fetcher_keys = [k for k in sibling.keys() if k != "fetcher"] is_cargo_deps = sibling_name == "cargoDeps" is_hash_only = not has_fetcher and non_fetcher_keys == ["hash"] if is_cargo_deps or is_hash_only: sw = ts.setdefault(sibling_name, {}) sw["hash"] = cargo_hash def cachyos_suffix(self) -> str: if self.vidx == 0: return "" v = self.variants[self.vidx] mapping = {"rc": "-rc", "hardened": "-hardened", "lts": "-lts"} return mapping.get(v, "") def fetch_cachyos_linux_latest(self, suffix: str) -> Optional[str]: """ Try to determine latest linux version from upstream: - Prefer .SRCINFO (preprocessed) - Fallback to PKGBUILD (parse pkgver= line) Tries both 'CachyOS' and 'cachyos' org casing just in case. """ bases = [ "https://raw.githubusercontent.com/CachyOS/linux-cachyos/master", "https://raw.githubusercontent.com/cachyos/linux-cachyos/master", ] paths = [ f"linux-cachyos{suffix}/.SRCINFO", f"linux-cachyos{suffix}/PKGBUILD", ] def parse_srcinfo(text: str) -> Optional[str]: m = re.search(r"^\s*pkgver\s*=\s*([^\s#]+)\s*$", text, re.MULTILINE) if not m: return None v = m.group(1).strip() return v def parse_pkgbuild(text: str) -> Optional[str]: # Parse assignments and expand variables in pkgver # Build a simple env map from VAR=value lines env: Dict[str, str] = {} for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue m_assign = re.match(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", line) if m_assign: key = m_assign.group(1) val = m_assign.group(2).strip() # Remove trailing comments val = re.sub(r"\s+#.*$", "", val).strip() # Strip surrounding quotes if (val.startswith('"') and val.endswith('"')) or ( val.startswith("'") and val.endswith("'") ): val = val[1:-1] env[key] = val m = re.search(r"^\s*pkgver\s*=\s*(.+)$", text, re.MULTILINE) if not m: return None raw = m.group(1).strip() # Strip quotes if (raw.startswith('"') and raw.endswith('"')) or ( raw.startswith("'") and raw.endswith("'") ): raw = raw[1:-1] def expand_vars(s: str) -> str: def repl_braced(mb): key = mb.group(1) return env.get(key, mb.group(0)) or mb.group(0) def repl_unbraced(mu): key = mu.group(1) return env.get(key, mu.group(0)) or mu.group(0) # Expand ${var} then $var s = re.sub(r"\$\{([^}]+)\}", repl_braced, s) s = re.sub(r"\$([A-Za-z_][A-Za-z0-9_]*)", repl_unbraced, s) return s v = expand_vars(raw).strip() # normalize rc form like 6.19.rc6 -> 6.19-rc6 v = v.replace(".rc", "-rc") return v # Try .SRCINFO first, then PKGBUILD for base in bases: # .SRCINFO url = f"{base}/{paths[0]}" text = http_get_text(url) if text: ver = parse_srcinfo(text) if ver: return ver.replace(".rc", "-rc") # PKGBUILD fallback url = f"{base}/{paths[1]}" text = http_get_text(url) if text: ver = parse_pkgbuild(text) if ver: return ver.replace(".rc", "-rc") return None def linux_tarball_url_for_version(self, version: str) -> str: # Use torvalds snapshot for -rc, stable releases from CDN if "-rc" in version: return f"https://git.kernel.org/torvalds/t/linux-{version}.tar.gz" parts = version.split(".") major = parts[0] if parts else "6" major_minor = ".".join(parts[:2]) if len(parts) >= 2 else version ver_for_tar = major_minor if version.endswith(".0") else version return f"https://cdn.kernel.org/pub/linux/kernel/v{major}.x/linux-{ver_for_tar}.tar.xz" def fetch_cachyos_zfs_commit(self) -> Optional[str]: """ Read the CachyOS PKGBUILD for the current variant and extract the ZFS commit SHA from the source line: git+https://github.com/cachyos/zfs.git#commit= Returns the commit SHA string, or None on failure. """ suffix = self.cachyos_suffix() bases = [ "https://raw.githubusercontent.com/CachyOS/linux-cachyos/master", "https://raw.githubusercontent.com/cachyos/linux-cachyos/master", ] for base in bases: url = f"{base}/linux-cachyos{suffix}/PKGBUILD" text = http_get_text(url) if not text: continue m = re.search( r"git\+https://github\.com/cachyos/zfs\.git#commit=([0-9a-f]+)", text, ) if m: return m.group(1) return None def update_linux_from_pkgbuild(self, name: str): suffix = self.cachyos_suffix() latest = self.fetch_cachyos_linux_latest(suffix) if not latest: self.set_status("linux: failed to get version from PKGBUILD") return url = self.linux_tarball_url_for_version(latest) sri = nix_prefetch_url(url) if not sri: self.set_status("linux: prefetch failed") return ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["version"] = latest compw["hash"] = sri self._refresh_merged() self.set_status(f"{name}: updated version to {latest} and refreshed hash") def _cachyos_config_nix_dir(self) -> Optional[Path]: """Return the config-nix/x86_64-linux dir relative to this package.""" d = self.path.parent / "config-nix" / "x86_64-linux" return d if d.is_dir() else None def _cachyos_regen_flavors(self) -> List[str]: """ Parse regen-config.sh to extract the flavor list, so TUI stays in sync with whatever the script defines. Falls back to a hard-coded default. """ script = self.path.parent / "regen-config.sh" if script.exists(): text = script.read_text() # Match: for flavor in cachyos{-a,-b,...}; do OR for flavor in a b c; do m = re.search(r"for\s+flavor\s+in\s+(cachyos\{[^}]+\}|[^\n;]+?)\s*;", text) if m: raw = m.group(1).strip() # Brace expansion: cachyos{-gcc,-lto} → ["cachyos-gcc", "cachyos-lto"] bm = re.match(r"^(\w+)\{([^}]+)\}$", raw) if bm: prefix = bm.group(1) suffixes = [s.strip() for s in bm.group(2).split(",")] return [f"{prefix}{s}" for s in suffixes] # Plain space-separated list return raw.split() # Hard-coded fallback matching regen-config.sh return [ "cachyos-gcc", "cachyos-lto", "cachyos-lto-full", "cachyos-server", "cachyos-lts", "cachyos-hardened", "cachyos-server-lto", "cachyos-lts-lto", "cachyos-hardened-lto", ] def _flake_root(self) -> Optional[Path]: """Walk up from self.path to find the directory containing flake.nix.""" d = self.path.parent for _ in range(10): if (d / "flake.nix").exists(): return d parent = d.parent if parent == d: break d = parent return None def regen_config_nix(self): """ For each flavor in regen-config.sh, run: nix build .#nixosConfigurations.jallen-nas.pkgs.mjallen.linuxPackages_.kernel.kconfigToNix --no-link --print-out-paths then copy the output store path into config-nix/x86_64-linux/.x86_64-linux.nix. Shows live progress in the status bar. """ config_dir = self._cachyos_config_nix_dir() if config_dir is None: self.set_status("regen: config-nix/x86_64-linux/ not found") return flake_root = self._flake_root() if flake_root is None: self.set_status("regen: could not find flake.nix root") return flavors = self._cachyos_regen_flavors() n = len(flavors) errors: List[str] = [] for i, flavor in enumerate(flavors): self.set_status(f"regen [{i + 1}/{n}] building {flavor}...") self.stdscr.refresh() attr = f".#nixosConfigurations.jallen-nas.pkgs.mjallen.linuxPackages_{flavor}.kernel.kconfigToNix" code, out, err = run_cmd( ["nix", "build", attr, "--no-link", "--print-out-paths"], ) if code != 0 or not out: errors.append(flavor) eprintln(f"regen {flavor} failed:\n{err[-400:]}") continue store_path = out.strip().splitlines()[0].strip() try: content = Path(store_path).read_text() except Exception as e: errors.append(flavor) eprintln(f"regen {flavor}: read {store_path} failed: {e}") continue dest = config_dir / f"{flavor}.x86_64-linux.nix" try: dest.write_text(content) except Exception as e: errors.append(flavor) eprintln(f"regen {flavor}: write {dest} failed: {e}") continue if errors: self.set_status(f"regen: done with errors on: {', '.join(errors)}") else: self.set_status(f"regen: updated {n} config.nix files") def _refresh_merged(self): """Re-compute merged_vars/merged_srcs/target_dict without resetting sidx.""" variant_name = None if self.vidx == 0 else self.variants[self.vidx] self.merged_vars, self.merged_srcs, self.target_dict = merged_view( self.spec, variant_name ) self.snames = sorted(list(self.merged_srcs.keys())) # Clamp sidx in case source list changed if self.snames: self.sidx = min(self.sidx, len(self.snames) - 1) def set_ref(self, name: str, kind: str, value: str): # Write to selected target dict (base or variant override) ts = self.target_dict.setdefault("sources", {}) comp = ts.setdefault(name, {}) if kind in ("release", "tag"): comp["tag"] = value if "rev" in comp: del comp["rev"] elif kind == "commit": comp["rev"] = value if "tag" in comp: del comp["tag"] # Refresh merged_srcs so prefetch_hash_for sees the updated ref self._refresh_merged() def save(self): if self.is_python: # For Python packages, update the default.nix file for name in self.snames: source = self.merged_srcs[name] updates = {} # Get version from variables if "version" in self.merged_vars: updates["version"] = self.merged_vars["version"] # Get hash from source if "hash" in source: updates["hash"] = source["hash"] # Get tag from source if "tag" in source: updates["tag"] = source["tag"] # Get rev from source if "rev" in source: updates["rev"] = source["rev"] if updates: update_python_package(self.path, name, updates) return True elif self.is_homeassistant: # For Home Assistant components, update the default.nix file for name in self.snames: source = self.merged_srcs[name] updates = {} # Get version from variables if "version" in self.merged_vars: updates["version"] = self.merged_vars["version"] # Get hash from source if "hash" in source: updates["hash"] = source["hash"] # Get tag from source if "tag" in source: updates["tag"] = source["tag"] # Get rev from source if "rev" in source: updates["rev"] = source["rev"] if updates: update_homeassistant_component(self.path, name, updates) return True else: # For regular packages, save to version.json save_json(self.path, self.spec) return True def run(self): while True: self.stdscr.clear() h, w = self.stdscr.getmaxyx() # Draw main border around the entire screen draw_border(self.stdscr, 0, 0, h - 1, w) # Row 0: package name + type badge centred in border if self.is_python: type_tag = " [py]" elif self.is_homeassistant: type_tag = " [ha]" else: type_tag = "" title = f" {self.pkg_name}{type_tag} " title_x = max(1, (w - len(title)) // 2) self.stdscr.addstr( 0, title_x, title[: w - 2], curses.color_pair(COLOR_TITLE) | curses.A_BOLD, ) # Row 1 left: relative path (dim) try: rel_path = str(self.path.relative_to(Path("/etc/nixos"))) except ValueError: rel_path = str(self.path) self.stdscr.addstr( 1, 2, rel_path[: w - 4], curses.color_pair(COLOR_NORMAL) | curses.A_DIM ) # Row 2: Variants or Version DETAIL_HDR_ROW = 2 # variant/version row DETAIL_SEP_ROW = 3 # separator DETAIL_SRC_ROW = 4 # first source row if not self.is_python: self.stdscr.addstr( DETAIL_HDR_ROW, 2, "Variants:", curses.color_pair(COLOR_HEADER) ) x_pos = 12 for i, v in enumerate(self.variants): if x_pos >= w - 4: break if i > 0: self.stdscr.addstr( DETAIL_HDR_ROW, x_pos, " | ", curses.color_pair(COLOR_NORMAL), ) x_pos += 3 if i == self.vidx: self.stdscr.addstr( DETAIL_HDR_ROW, x_pos, f"[{v}]", curses.color_pair(COLOR_HIGHLIGHT), ) else: self.stdscr.addstr( DETAIL_HDR_ROW, x_pos, v, curses.color_pair(COLOR_NORMAL) ) x_pos += len(v) + (2 if i == self.vidx else 0) else: version = self.merged_vars.get("version", "") self.stdscr.addstr( DETAIL_HDR_ROW, 2, "Version:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( DETAIL_HDR_ROW, 11, version, curses.color_pair(COLOR_SUCCESS) ) # Separator + Sources header for i in range(1, w - 1): self.stdscr.addch( DETAIL_SEP_ROW, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) self.stdscr.addstr( DETAIL_SEP_ROW, 2, " Sources ", curses.color_pair(COLOR_HEADER) | curses.A_BOLD, ) # footer occupies h-4 (sep), h-3, h-2, h-1 (status) # latest section: separator + up to 3 content rows → y_latest = h-9 y_latest = h - 9 # Source rows — columns: sel+name(20) | fetcher(6) | ref SRC_NAME_W = 20 SRC_FETCH_W = 6 SRC_REF_COL = 2 + SRC_NAME_W + 1 + SRC_FETCH_W + 1 # col 30 # Source rows fit between DETAIL_SRC_ROW and y_latest-1 _max_src_rows = max(0, y_latest - DETAIL_SRC_ROW) for i, name in enumerate(self.snames[:_max_src_rows], start=0): comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") display_ref = source_display_ref(comp, self.merged_vars) branch = comp.get("branch") or "" has_cargo = "cargoHash" in comp # Append badge tokens to the ref string badges = "" if branch: badges += f" [{branch}]" if has_cargo: badges += " [cargo]" ref_text = display_ref + badges ref_short = ref_text[: w - SRC_REF_COL - 2] + ( "…" if len(ref_text) > w - SRC_REF_COL - 2 else "" ) if i == self.sidx: row_attr = curses.color_pair(COLOR_HIGHLIGHT) sel = "►" else: row_attr = curses.color_pair(COLOR_NORMAL) sel = " " fetcher_color = COLOR_NORMAL if fetcher == "github": fetcher_color = COLOR_SUCCESS elif fetcher == "url": fetcher_color = COLOR_STATUS elif fetcher == "git": fetcher_color = COLOR_HEADER row = DETAIL_SRC_ROW + i self.stdscr.addstr( row, 2, f"{sel} {name[: SRC_NAME_W - 2]:<{SRC_NAME_W - 2}}", row_attr, ) self.stdscr.addstr( row, 2 + SRC_NAME_W, f"{fetcher[:SRC_FETCH_W]:<{SRC_FETCH_W}}", curses.color_pair(fetcher_color), ) self.stdscr.addstr( row, SRC_REF_COL, ref_short, curses.color_pair(COLOR_NORMAL) ) # Draw a separator line before the latest candidates section for i in range(1, w - 1): self.stdscr.addch( y_latest, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) # Latest candidates section for selected component (auto-fetched) if self.snames: _sel_name = self.snames[self.sidx] _comp = self.merged_srcs[_sel_name] _fetcher = _comp.get("fetcher", "none") # Preload candidates lazily for selected item if ( _fetcher in ("github", "git", "url") and _sel_name not in self.candidates ): self.fetch_candidates_for(_sel_name) # Latest header with decoration — show branch if locked _branch = _comp.get("branch") or "" _latest_hdr = ( f"Latest Versions: (branch: {_branch})" if _branch else "Latest Versions:" ) self.stdscr.addstr( y_latest + 1, 2, _latest_hdr[: w - 4], curses.color_pair(COLOR_HEADER) | curses.A_BOLD, ) if _fetcher in ("github", "git"): _cand = self.candidates.get(_sel_name, {}) _dim = curses.color_pair(COLOR_DIM) | curses.A_DIM def _put_cand( row: int, label: str, value: str, date: str, val_color: int ): """Write one candidate row: Label value date""" lbl_w = 9 # "Release: " etc self.stdscr.addstr( row, 4, f"{label:<{lbl_w}}", curses.color_pair(COLOR_HEADER) ) val_end = 4 + lbl_w + len(value) self.stdscr.addstr( row, 4 + lbl_w, value[: w - 4 - lbl_w - 2], curses.color_pair(val_color), ) if date and val_end + 2 < w - 2: self.stdscr.addstr(row, val_end + 1, date, _dim) _row = y_latest + 2 if _cand.get("release"): _put_cand( _row, "Release:", _cand["release"], _cand.get("release_date", ""), COLOR_SUCCESS, ) _row += 1 if _cand.get("tag"): _put_cand( _row, "Tag:", _cand["tag"], _cand.get("tag_date", ""), COLOR_SUCCESS, ) _row += 1 if _cand.get("commit"): _put_cand( _row, "Commit:", (_cand["commit"] or "")[:12], _cand.get("commit_date", ""), COLOR_NORMAL, ) elif _fetcher == "url": _cand_u = self.url_candidates.get(_sel_name, {}) or {} _cand_r = self.candidates.get(_sel_name, {}) _dim = curses.color_pair(COLOR_DIM) | curses.A_DIM _url_date = _cand_r.get("release_date", "") _urow = y_latest + 2 _tag = _cand_u.get("tag") or (_cand_r.get("release") or "") if _tag: self.stdscr.addstr( _urow, 4, "Tag: ", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( _urow, 13, _tag[: w - 16], curses.color_pair(COLOR_SUCCESS) ) if _url_date and 13 + len(_tag) + 2 < w - 2: self.stdscr.addstr( _urow, 13 + len(_tag) + 1, _url_date, _dim ) _urow += 1 if _cand_u.get("base") and _cand_u.get("release"): _b = _cand_u["base"] _r = _cand_u["release"] self.stdscr.addstr( _urow, 4, f"base={_b} release={_r}"[: w - 6], curses.color_pair(COLOR_NORMAL), ) else: if self.pkg_name == "linux-cachyos" and _sel_name == "linux": _suffix = self.cachyos_suffix() _latest = self.fetch_cachyos_linux_latest(_suffix) self.stdscr.addstr( y_latest + 2, 4, "PKGBUILD version:", curses.color_pair(COLOR_HEADER), ) self.stdscr.addstr( y_latest + 2, 21, _latest or "-", curses.color_pair( COLOR_SUCCESS if _latest else COLOR_NORMAL ), ) elif self.pkg_name == "linux-cachyos" and _sel_name == "zfs": _pkgb_commit = self.fetch_cachyos_zfs_commit() _cur_rev = self.merged_srcs.get("zfs", {}).get("rev", "") _dim = curses.color_pair(COLOR_DIM) | curses.A_DIM self.stdscr.addstr( y_latest + 2, 4, "PKGBUILD commit:", curses.color_pair(COLOR_HEADER), ) if _pkgb_commit: _same = _pkgb_commit == _cur_rev _col = COLOR_NORMAL if _same else COLOR_SUCCESS self.stdscr.addstr( y_latest + 2, 21, _pkgb_commit[:12], curses.color_pair(_col), ) if _same: self.stdscr.addstr( y_latest + 2, 34, "(up to date)", _dim ) else: self.stdscr.addstr( y_latest + 2, 21, "-", curses.color_pair(COLOR_NORMAL) ) else: self.stdscr.addstr( y_latest + 2, 4, "No candidates available", curses.color_pair(COLOR_NORMAL), ) # Separator before footer for i in range(1, w - 1): self.stdscr.addch( h - 4, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) # Footer: two concise lines footer1 = "Enter:actions r:refresh h:hash i:url e:edit s:save" footer2 = "←/→:variant k/j:source Bksp:back q:quit" f1x = max(1, (w - len(footer1)) // 2) f2x = max(1, (w - len(footer2)) // 2) self.stdscr.addstr( h - 3, f1x, footer1[: w - 2], curses.color_pair(COLOR_STATUS) ) self.stdscr.addstr( h - 2, f2x, footer2[: w - 2], curses.color_pair(COLOR_STATUS) ) # Draw status at the bottom self.draw_status(h, w) self.stdscr.refresh() ch = self.stdscr.getch() if ch in (ord("q"), 27): return None elif ch == curses.KEY_BACKSPACE or ch == 127: return "reload" elif ch in (curses.KEY_LEFT,): self.vidx = max(0, self.vidx - 1) self.select_variant() elif ch in (curses.KEY_RIGHT,): self.vidx = min(len(self.variants) - 1, self.vidx + 1) self.select_variant() elif ch in (curses.KEY_UP, ord("k")): self.sidx = max(0, self.sidx - 1) elif ch in (curses.KEY_DOWN, ord("j")): self.sidx = min(len(self.snames) - 1, self.sidx + 1) elif ch in (ord("r"),): if self.snames: name = self.snames[self.sidx] comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") if self.pkg_name == "linux-cachyos" and name == "linux": # Show available linux version from upstream PKGBUILD (.SRCINFO) suffix = self.cachyos_suffix() latest = self.fetch_cachyos_linux_latest(suffix) rendered = render_templates(comp, self.merged_vars) cur_version = str(rendered.get("version") or "") url_hint = ( self.linux_tarball_url_for_version(latest) if latest else "-" ) lines = [ f"linux-cachyos ({'base' if self.vidx == 0 else self.variants[self.vidx]}):", f" current : {cur_version or '-'}", f" available: {latest or '-'}", f" tarball : {url_hint}", ] show_popup(self.stdscr, lines) elif self.pkg_name == "linux-cachyos" and name == "zfs": pkgbuild_commit = self.fetch_cachyos_zfs_commit() cur_rev = comp.get("rev", "") up_to_date = pkgbuild_commit and pkgbuild_commit == cur_rev lines = [ f"linux-cachyos/zfs ({'base' if self.vidx == 0 else self.variants[self.vidx]}):", f" current : {cur_rev[:12] or '-'}", f" PKGBUILD : {pkgbuild_commit[:12] if pkgbuild_commit else '-'}", f" status : {'up to date' if up_to_date else 'update available' if pkgbuild_commit else 'unknown'}", ] show_popup(self.stdscr, lines) else: self.fetch_candidates_for(name) cand = self.candidates.get(name, {}) branch = comp.get("branch") or "" def _fmt(val: str, date: str) -> str: return f"{val} {date}" if val and date else (val or "-") lines = [ f"Candidates for {name}:" + (f" (branch: {branch})" if branch else ""), f" latest release: {_fmt(cand.get('release', ''), cand.get('release_date', ''))}", f" latest tag : {_fmt(cand.get('tag', ''), cand.get('tag_date', ''))}", f" latest commit : {_fmt(cand.get('commit', '')[:12] if cand.get('commit') else '', cand.get('commit_date', ''))}", ] show_popup(self.stdscr, lines) elif ch in (ord("i"),): # Show full rendered URL for URL-based sources if self.snames: name = self.snames[self.sidx] comp = self.merged_srcs[name] if comp.get("fetcher", "none") == "url": rendered = render_templates(comp, self.merged_vars) url = rendered.get("url") or rendered.get("urlTemplate") or "" if url: show_popup(self.stdscr, ["Full URL:", url]) else: self.set_status("No URL available") elif ch in (ord("h"),): if self.snames: name = self.snames[self.sidx] sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self._refresh_merged() # If this source also has a cargoHash, recompute it now if self._source_has_cargo(name): self.set_status( f"{name}: updated hash; computing cargo hash..." ) self.stdscr.refresh() cargo_sri = self.prefetch_cargo_hash_for(name) if cargo_sri: compw["cargoHash"] = cargo_sri self._apply_cargo_hash_to_sibling(name, cargo_sri) self._refresh_merged() self.set_status(f"{name}: updated hash + cargo hash") else: self.set_status( f"{name}: updated hash; cargo hash failed" ) else: self.set_status(f"{name}: updated hash") else: self.set_status(f"{name}: hash prefetch failed") elif ch in (ord("e"),): s = prompt_input( self.stdscr, "Edit path=value (relative to selected base/variant): " ) if s: if "=" not in s: self.set_status("Invalid input, expected key.path=value") else: k, v = s.split("=", 1) path = [p for p in k.split(".") if p] deep_set(self.cursor, path, v) self.set_status(f"Set {k}={v}") elif ch in (ord("s"),): try: self.save() self.set_status("Saved.") except Exception as e: self.set_status(f"Save failed: {e}") elif ch in (curses.KEY_ENTER, 10, 13): if not self.snames: continue name = self.snames[self.sidx] comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") if fetcher in ("github", "git"): # Ensure candidates loaded if name not in self.candidates: self.fetch_candidates_for(name) cand = self.candidates.get(name, {}) branch = comp.get("branch") or "" # Present small menu items = [] if fetcher == "github": # When branch-locked, only offer latest commit (tags are irrelevant) if branch: items = [ ( "Use latest commit (rev)", ("commit", cand.get("commit")), ), ("Recompute hash", ("hash", None)), ("Cancel", ("cancel", None)), ] else: items = [ ( "Use latest release (tag)", ("release", cand.get("release")), ), ("Use latest tag", ("tag", cand.get("tag"))), ( "Use latest commit (rev)", ("commit", cand.get("commit")), ), ("Recompute hash", ("hash", None)), ("Cancel", ("cancel", None)), ] else: items = [ ("Use latest commit (rev)", ("commit", cand.get("commit"))), ("Recompute hash", ("hash", None)), ("Cancel", ("cancel", None)), ] # Inject cargo hash option before Cancel when applicable has_cargo = self._source_has_cargo(name) if has_cargo: items = [item for item in items if item[1][0] != "cancel"] + [ ("Recompute cargo hash", ("cargo_hash", None)), ("Cancel", ("cancel", None)), ] # Build header with current and available refs rendered = render_templates(comp, self.merged_vars) cur_tag = rendered.get("tag") or "" cur_rev = rendered.get("rev") or "" cur_version = rendered.get("version") or "" if cur_tag: current_str = f"current: tag={cur_tag}" elif cur_rev: current_str = f"current: rev={cur_rev[:12]}" elif cur_version: current_str = f"current: version={cur_version}" else: current_str = "current: -" if branch: current_str += f" (branch: {branch})" cur_cargo = comp.get("cargoHash", "") def _av(val: str, date: str) -> str: v = val or "-" return f"{v} {date}" if val and date else v header_lines = [ current_str, f"available:", f" release : {_av(cand.get('release', ''), cand.get('release_date', ''))}", f" tag : {_av(cand.get('tag', ''), cand.get('tag_date', ''))}", f" commit : {_av((cand.get('commit') or '')[:12], cand.get('commit_date', ''))}", ] if has_cargo: header_lines.append( f"cargoHash: {cur_cargo[:32] + '...' if len(cur_cargo) > 32 else cur_cargo or '-'}" ) choice = select_menu( self.stdscr, f"Actions for {name}", [label for label, _ in items], header=header_lines, ) if choice is not None: kind, val = items[choice][1] if kind in ("release", "tag", "commit"): if val: self.set_ref(name, kind, val) # update src hash sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self._refresh_merged() # also update cargo hash if applicable if has_cargo: self.set_status( f"{name}: set {kind}, hashing (src)..." ) cargo_sri = self.prefetch_cargo_hash_for(name) if cargo_sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["cargoHash"] = cargo_sri self._apply_cargo_hash_to_sibling( name, cargo_sri ) self._refresh_merged() self.set_status( f"{name}: set {kind}, updated src + cargo hash" ) else: self.set_status( f"{name}: set {kind}, updated src hash; cargo hash failed" ) else: self.set_status( f"{name}: set {kind} and updated hash" ) else: self.set_status(f"No candidate {kind}") elif kind == "hash": sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self._refresh_merged() self.set_status(f"{name}: updated hash") else: self.set_status("hash prefetch failed") elif kind == "cargo_hash": self.set_status(f"{name}: computing cargo hash...") self.stdscr.refresh() cargo_sri = self.prefetch_cargo_hash_for(name) if cargo_sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["cargoHash"] = cargo_sri self._apply_cargo_hash_to_sibling(name, cargo_sri) self._refresh_merged() self.set_status(f"{name}: updated cargo hash") else: self.set_status( f"{name}: cargo hash computation failed" ) else: pass elif fetcher == "url": # Offer latest release update (for proton-cachyos-like schemas) and/or hash recompute cand = self.url_candidates.get(name) menu_items: List[ Tuple[str, Tuple[str, Optional[Dict[str, str]]]] ] = [] if cand and cand.get("base") and cand.get("release"): menu_items.append( ( "Use latest release (update variables.base/release)", ("update_vars", cand), ) ) menu_items.append(("Recompute hash (prefetch)", ("hash", None))) menu_items.append(("Cancel", ("cancel", None))) # Build header with current and available release info base = str(self.merged_vars.get("base") or "") rel = str(self.merged_vars.get("release") or "") rp = str(self.merged_vars.get("releasePrefix") or "") rs = str(self.merged_vars.get("releaseSuffix") or "") current_tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else "" if current_tag: current_str = f"current: {current_tag}" elif base or rel: current_str = ( f"current: base={base or '-'} release={rel or '-'}" ) else: current_str = "current: -" header_lines = [ current_str, f"available: tag={(cand.get('tag') or '-') if cand else '-'} base={(cand.get('base') or '-') if cand else '-'} release={(cand.get('release') or '-') if cand else '-'}", ] choice = select_menu( self.stdscr, f"Actions for {name}", [label for label, _ in menu_items], header=header_lines, ) if choice is not None: kind, payload = menu_items[choice][1] if kind == "update_vars" and isinstance(payload, dict): # Write variables into selected base/variant dict vars_dict = self.target_dict.setdefault("variables", {}) vars_dict["base"] = payload["base"] vars_dict["release"] = payload["release"] # Recompute merged view to reflect new variables self.recompute_view() # Prefetch and update hash sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self.set_status( f"{name}: updated to {payload['base']}.{payload['release']} and refreshed hash" ) else: self.set_status( "hash prefetch failed after variable update" ) elif kind == "hash": sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self.set_status(f"{name}: updated hash") else: self.set_status("hash prefetch failed") else: pass else: if self.pkg_name == "linux-cachyos" and name == "linux": # Offer update of linux version from upstream PKGBUILD (.SRCINFO) suffix = self.cachyos_suffix() latest = self.fetch_cachyos_linux_latest(suffix) rendered = render_templates(comp, self.merged_vars) cur_version = str(rendered.get("version") or "") regen_flavors = self._cachyos_regen_flavors() header_lines = [ f"current: version={cur_version or '-'}", f"available: version={latest or '-'}", f"regen flavors ({len(regen_flavors)}): {', '.join(regen_flavors[:4])}{'...' if len(regen_flavors) > 4 else ''}", ] opts = [] if latest: opts.append( f"Update linux version to {latest} from PKGBUILD (.SRCINFO)" ) else: opts.append("Update linux version from PKGBUILD (.SRCINFO)") opts.append( f"Regen all config.nix files ({len(regen_flavors)} flavors)" ) opts.append("Cancel") choice = select_menu( self.stdscr, f"Actions for {name}", opts, header=header_lines, ) if choice is not None: chosen = opts[choice] if chosen.startswith("Update linux version") and latest: self.update_linux_from_pkgbuild(name) elif chosen.startswith("Regen all config.nix"): self.regen_config_nix() elif self.pkg_name == "linux-cachyos" and name == "zfs": # ZFS commit is pinned in the PKGBUILD — read it from there pkgbuild_commit = self.fetch_cachyos_zfs_commit() rendered = render_templates(comp, self.merged_vars) cur_rev = str(comp.get("rev") or "") header_lines = [ f"current: {cur_rev[:12] or '-'}", f"PKGBUILD: {pkgbuild_commit[:12] if pkgbuild_commit else '-'}", ] opts = [] if pkgbuild_commit: opts.append( f"Update to PKGBUILD commit ({pkgbuild_commit[:12]})" ) opts.append("Recompute hash") opts.append("Cancel") choice = select_menu( self.stdscr, f"Actions for {name}", opts, header=header_lines, ) if choice is not None: chosen = opts[choice] if ( chosen.startswith("Update to PKGBUILD") and pkgbuild_commit ): self.set_status("zfs: fetching commit and hash...") self.stdscr.refresh() ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["rev"] = pkgbuild_commit self._refresh_merged() sri = self.prefetch_hash_for(name) if sri: compw["hash"] = sri self._refresh_merged() self.set_status( f"zfs: updated to {pkgbuild_commit[:12]} and refreshed hash" ) else: self.set_status( "zfs: updated rev but hash prefetch failed" ) elif chosen == "Recompute hash": self.set_status("zfs: recomputing hash...") self.stdscr.refresh() sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self._refresh_merged() self.set_status("zfs: updated hash") else: self.set_status("zfs: hash prefetch failed") else: show_popup( self.stdscr, [ f"{name}: fetcher={fetcher}", "Use 'e' to edit fields manually.", ], ) else: pass def select_menu( stdscr, title: str, options: List[str], header: Optional[List[str]] = None ) -> Optional[int]: idx = 0 while True: stdscr.clear() h, w = stdscr.getmaxyx() # Calculate menu dimensions — account for title, header lines, and options max_opt_len = max((len(opt) + 4 for opt in options), default=0) max_hdr_len = max((len(str(l)) + 4 for l in header), default=0) if header else 0 title_len = len(title) + 4 menu_width = min(w - 4, max(44, title_len, max_opt_len, max_hdr_len)) menu_height = min(h - 4, len(options) + (len(header) + 1 if header else 0) + 4) # Calculate position for centered menu start_x = (w - menu_width) // 2 start_y = (h - menu_height) // 2 # Draw border around menu draw_border(stdscr, start_y, start_x, menu_height, menu_width) # Draw title title_x = start_x + (menu_width - len(title)) // 2 stdscr.addstr( start_y, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD, ) # Draw header if provided y = start_y + 1 if header: for line in header: if y >= start_y + menu_height - 2: break # Ensure we don't write beyond menu width line_str = str(line) if len(line_str) > menu_width - 4: line_str = line_str[: menu_width - 7] + "..." stdscr.addstr( y, start_x + 2, line_str, curses.color_pair(COLOR_HEADER), ) y += 1 # Add separator line after header for i in range(1, menu_width - 1): stdscr.addch( y, start_x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) y += 1 # Draw options options_start_y = y max_visible_options = max(1, start_y + menu_height - options_start_y - 1) visible_options = min(len(options), max_visible_options) for i, opt in enumerate(options[:visible_options], start=0): # Highlight selected option if i == idx: attr = curses.color_pair(COLOR_HIGHLIGHT) sel = "►" # Use a fancier selector else: attr = curses.color_pair(COLOR_NORMAL) sel = " " # Truncate long options to fit in menu opt_text = f"{sel} {opt}" if len(opt_text) > menu_width - 4: opt_text = opt_text[: menu_width - 7] + "..." stdscr.addstr(options_start_y + i, start_x + 2, opt_text, attr) # Draw footer footer = "Enter: select | Backspace: cancel" footer_x = start_x + (menu_width - len(footer)) // 2 stdscr.addstr( start_y + menu_height - 1, footer_x, footer, curses.color_pair(COLOR_STATUS) ) stdscr.refresh() ch = stdscr.getch() if ch in (curses.KEY_UP, ord("k")): idx = max(0, idx - 1) elif ch in (curses.KEY_DOWN, ord("j")): idx = min(len(options) - 1, idx + 1) elif ch in (curses.KEY_ENTER, 10, 13): return idx elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 27: return None # ------------------------------ main ------------------------------ def main(stdscr): curses.curs_set(0) # Hide cursor stdscr.nodelay(False) # Blocking input # Initialize colors if curses.has_colors(): init_colors() try: screen = PackagesScreen(stdscr) screen.run() except Exception: curses.endwin() traceback.print_exc() sys.exit(1) if __name__ == "__main__": curses.wrapper(main)