#!/usr/bin/env python3 """ Interactive TUI for browsing and updating unified version.json files. Features: - Scans packages/**/version.json and lists all packages - Per-package view: - Choose base or any variant - List all sources/components with current ref (tag/rev/url/version) and hash - For GitHub sources: fetch candidates (latest release tag, latest tag, latest commit) - For Git sources: fetch latest commit (HEAD) - For URL sources: recompute hash (url/urlTemplate with rendered variables) - Actions on a component: - Update to one of the candidates (sets tag or rev) and optionally re-hash - Recompute hash (prefetch) - Edit any field via path=value (e.g., variables.version=2025.07) - Writes changes back to version.json Dependencies: - Standard library + external CLI tools: - nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri` - nix-prefetch-git - git - Optional: GITHUB_TOKEN env var to increase GitHub API rate limits Usage: scripts/version_tui.py Controls: - Up/Down to navigate lists - Enter to select - Backspace to go back - q to quit - On component screen: r = refresh candidates h = recompute hash (prefetch) e = edit arbitrary field (path=value) s = save to disk """ import curses import json import os import re import subprocess import sys import traceback import urllib.request import urllib.error from urllib.parse import urlparse from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union ROOT = Path(__file__).resolve().parents[1] PKGS_DIR = ROOT / "packages" Json = Dict[str, Any] # ------------------------------ Utilities ------------------------------ def eprintln(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) def load_json(path: Path) -> Json: with path.open("r", encoding="utf-8") as f: return json.load(f) def save_json(path: Path, data: Json): tmp = path.with_suffix(".tmp") with tmp.open("w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) f.write("\n") tmp.replace(path) def render_templates(value: Any, variables: Dict[str, Any]) -> Any: if isinstance(value, str): def repl(m): name = m.group(1) return str(variables.get(name, m.group(0))) return re.sub(r"\$\{([^}]+)\}", repl, value) elif isinstance(value, dict): return {k: render_templates(v, variables) for k, v in value.items()} elif isinstance(value, list): return [render_templates(v, variables) for v in value] return value def deep_set(o: Json, path: List[str], value: Any): cur = o for p in path[:-1]: if p not in cur or not isinstance(cur[p], dict): cur[p] = {} cur = cur[p] cur[path[-1]] = value # ------------------------------ Merge helpers (match lib/versioning.nix) ------------------------------ def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: out = dict(a) for k, v in b.items(): if k in out and isinstance(out[k], dict) and isinstance(v, dict): out[k] = deep_merge(out[k], v) else: out[k] = v return out def merge_sources( base_sources: Dict[str, Any], overrides: Dict[str, Any] ) -> Dict[str, Any]: names = set(base_sources.keys()) | set(overrides.keys()) result: Dict[str, Any] = {} for n in names: if n in base_sources and n in overrides: if isinstance(base_sources[n], dict) and isinstance(overrides[n], dict): result[n] = deep_merge(base_sources[n], overrides[n]) else: result[n] = overrides[n] elif n in overrides: result[n] = overrides[n] else: result[n] = base_sources[n] return result def merged_view( spec: Json, variant_name: Optional[str] ) -> Tuple[Dict[str, Any], Dict[str, Any], Json]: """ Returns (merged_variables, merged_sources, target_dict_to_write) merged_* are used for display/prefetch; target_dict_to_write is where updates must be written (base or selected variant). """ base_vars = spec.get("variables", {}) or {} base_sources = spec.get("sources", {}) or {} if variant_name: vdict = spec.get("variants", {}).get(variant_name) if not isinstance(vdict, dict): raise ValueError(f"Variant '{variant_name}' not found") v_vars = vdict.get("variables", {}) or {} v_sources = vdict.get("sources", {}) or {} merged_vars = dict(base_vars) merged_vars.update(v_vars) merged_srcs = merge_sources(base_sources, v_sources) return merged_vars, merged_srcs, vdict else: return dict(base_vars), dict(base_sources), spec def run_cmd(args: List[str]) -> Tuple[int, str, str]: try: p = subprocess.run( args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False ) return p.returncode, p.stdout.strip(), p.stderr.strip() except Exception as e: return 1, "", str(e) def run_get_stdout(args: List[str]) -> Optional[str]: code, out, err = run_cmd(args) if code != 0: eprintln(f"Command failed: {' '.join(args)}\n{err}") return None return out def nix_prefetch_url(url: str) -> Optional[str]: # returns SRI # Try modern nix store prefetch-file first (it returns SRI format directly) out = run_get_stdout( ["nix", "store", "prefetch-file", "--hash-type", "sha256", "--json", url] ) if out is not None and out.strip(): try: data = json.loads(out) if "hash" in data: return data["hash"] except Exception: pass # Fallback to legacy nix-prefetch-url out = run_get_stdout(["nix-prefetch-url", "--type", "sha256", url]) if out is None: out = run_get_stdout(["nix-prefetch-url", url]) if out is None: return None # Convert to SRI sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", out.strip()]) return sri def nix_prefetch_git(url: str, rev: str) -> Optional[str]: # Try two-step approach: fetchGit then hash path expr = f'builtins.fetchGit {{ url = "{url}"; rev = "{rev}"; }}' out = run_get_stdout(["nix", "eval", "--raw", "--expr", expr]) if out is not None and out.strip(): # Now hash the fetched path hash_out = run_get_stdout( ["nix", "hash", "path", "--type", "sha256", out.strip()] ) if hash_out is not None and hash_out.strip(): return hash_out.strip() # Fallback to nix-prefetch-git out = run_get_stdout(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url]) if out is None: return None base32 = None try: data = json.loads(out) base32 = data.get("sha256") or data.get("hash") except Exception: lines = [l for l in out.splitlines() if l.strip()] if lines: base32 = lines[-1].strip() if not base32: return None # Convert to SRI sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", base32]) return sri def http_get_json(url: str, token: Optional[str] = None) -> Any: try: req = urllib.request.Request( url, headers={"Accept": "application/vnd.github+json"} ) if token: req.add_header("Authorization", f"Bearer {token}") with urllib.request.urlopen(req, timeout=10) as resp: if resp.status != 200: return None return json.loads(resp.read().decode("utf-8")) except urllib.error.HTTPError as e: eprintln(f"HTTP error for {url}: {e.code} {e.reason}") return None except Exception as e: eprintln(f"Request failed for {url}: {e}") return None def http_get_text(url: str) -> Optional[str]: try: # Provide a basic User-Agent to avoid some hosts rejecting the request req = urllib.request.Request(url, headers={"User-Agent": "version-tui/1.0"}) with urllib.request.urlopen(req, timeout=10) as resp: if resp.status != 200: return None return resp.read().decode("utf-8") except urllib.error.HTTPError as e: eprintln(f"HTTP error for {url}: {e.code} {e.reason}") return None except Exception as e: eprintln(f"Request failed for {url}: {e}") return None def gh_latest_release(owner: str, repo: str, token: Optional[str]) -> Optional[str]: try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases/latest", token ) if not data: return None return data.get("tag_name") except Exception as e: eprintln(f"latest_release failed for {owner}/{repo}: {e}") return None def gh_latest_tag(owner: str, repo: str, token: Optional[str]) -> Optional[str]: try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token ) if not isinstance(data, list): return None tags = [ t.get("name") for t in data if isinstance(t, dict) and "name" in t and t.get("name") is not None ] return tags[0] if tags else None except Exception as e: eprintln(f"latest_tag failed for {owner}/{repo}: {e}") return None def gh_list_tags(owner: str, repo: str, token: Optional[str]) -> List[str]: try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token ) return [ str(t.get("name")) for t in data if isinstance(t, dict) and "name" in t and t.get("name") is not None ] except Exception as e: eprintln(f"list_tags failed for {owner}/{repo}: {e}") return [] def gh_head_commit(owner: str, repo: str) -> Optional[str]: try: out = run_get_stdout( ["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", "HEAD"] ) if not out: return None parts = out.split() return parts[0] if parts else None except Exception as e: eprintln(f"head_commit failed for {owner}/{repo}: {e}") return None def gh_tarball_url(owner: str, repo: str, ref: str) -> str: return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}" def gh_release_tags_api(owner: str, repo: str, token: Optional[str]) -> List[str]: """ Return recent release tag names for a repo using GitHub API. """ try: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", token ) return [ str(r.get("tag_name")) for r in data if isinstance(r, dict) and "tag_name" in r and r.get("tag_name") is not None ] except Exception as e: eprintln(f"releases list failed for {owner}/{repo}: {e}") return [] # ------------------------------ Data scanning ------------------------------ def find_packages() -> List[Tuple[str, Path, bool, bool]]: results = [] # Find regular packages with version.json for p in PKGS_DIR.rglob("version.json"): # name is directory name under packages (e.g., raspberrypi/linux-rpi => raspberrypi/linux-rpi) rel = p.relative_to(PKGS_DIR).parent results.append( (str(rel), p, False, False) ) # (name, path, is_python, is_homeassistant) # Find Python packages with default.nix python_dir = PKGS_DIR / "python" if python_dir.exists(): for pkg_dir in python_dir.iterdir(): if pkg_dir.is_dir(): nix_file = pkg_dir / "default.nix" if nix_file.exists(): # name is python/package-name rel = pkg_dir.relative_to(PKGS_DIR) results.append( (str(rel), nix_file, True, False) ) # (name, path, is_python, is_homeassistant) # Find Home Assistant components with default.nix homeassistant_dir = PKGS_DIR / "homeassistant" if homeassistant_dir.exists(): for pkg_dir in homeassistant_dir.iterdir(): if pkg_dir.is_dir(): nix_file = pkg_dir / "default.nix" if nix_file.exists(): # name is homeassistant/component-name rel = pkg_dir.relative_to(PKGS_DIR) results.append( (str(rel), nix_file, False, True) ) # (name, path, is_python, is_homeassistant) results.sort() return results def parse_python_package(path: Path) -> Dict[str, Any]: """Parse a Python package's default.nix file to extract version and source information.""" with path.open("r", encoding="utf-8") as f: content = f.read() # Extract version version_match = re.search(r'version\s*=\s*"([^"]+)"', content) version = version_match.group(1) if version_match else "" # Extract pname (package name) pname_match = re.search(r'pname\s*=\s*"([^"]+)"', content) pname = pname_match.group(1) if pname_match else "" # Check for fetchFromGitHub pattern fetch_github_match = re.search( r"src\s*=\s*fetchFromGitHub\s*\{([^}]+)\}", content, re.DOTALL ) # Check for fetchPypi pattern fetch_pypi_match = re.search( r"src\s*=\s*.*fetchPypi\s*\{([^}]+)\}", content, re.DOTALL ) # Create a structure similar to version.json for compatibility result = {"variables": {}, "sources": {}} # Only add non-empty values to variables if version: result["variables"]["version"] = version # Determine source name - use pname, repo name, or derive from path source_name = "" if pname: source_name = pname.lower() else: # Use directory name as source name source_name = path.parent.name.lower() # Handle fetchFromGitHub pattern if fetch_github_match: fetch_block = fetch_github_match.group(1) # Extract GitHub info from the fetchFromGitHub block owner_match = re.search(r'owner\s*=\s*"([^"]+)"', fetch_block) repo_match = re.search(r'repo\s*=\s*"([^"]+)"', fetch_block) rev_match = re.search(r'rev\s*=\s*"([^"]+)"', fetch_block) hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block) owner = owner_match.group(1) if owner_match else "" repo = repo_match.group(1) if repo_match else "" rev = rev_match.group(1) if rev_match else "" hash_value = hash_match.group(2) if hash_match else "" # Create source entry result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo, "hash": hash_value, } # Handle rev field which might contain a tag or version reference if rev: # Check if it's a tag reference (starts with v) if rev.startswith("v"): result["sources"][source_name]["tag"] = rev # Check if it contains ${version} variable elif "${version}" in rev: result["sources"][source_name]["tag"] = rev # Check if it's "master" or a specific branch elif rev in ["master", "main"]: result["sources"][source_name]["rev"] = rev # Otherwise treat as a regular revision else: result["sources"][source_name]["rev"] = rev # Handle fetchPypi pattern elif fetch_pypi_match: fetch_block = fetch_pypi_match.group(1) # Extract PyPI info hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block) hash_value = hash_match.group(2) if hash_match else "" # Look for GitHub info in meta section homepage_match = re.search( r'homepage\s*=\s*"https://github.com/([^/]+)/([^"]+)"', content ) if homepage_match: owner = homepage_match.group(1) repo = homepage_match.group(2) # Create source entry with GitHub info result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo, "hash": hash_value, "pypi": True, # Mark as PyPI source } # Add version as tag if available if version: result["sources"][source_name]["tag"] = f"v{version}" else: # Create PyPI source entry result["sources"][source_name] = { "fetcher": "pypi", "pname": pname, "version": version, "hash": hash_value, } else: # Try to extract standalone GitHub info if present owner_match = re.search(r'owner\s*=\s*"([^"]+)"', content) repo_match = re.search(r'repo\s*=\s*"([^"]+)"', content) rev_match = re.search(r'rev\s*=\s*"([^"]+)"', content) tag_match = re.search(r'tag\s*=\s*"([^"]+)"', content) hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', content) owner = owner_match.group(1) if owner_match else "" repo = repo_match.group(1) if repo_match else "" rev = rev_match.group(1) if rev_match else "" tag = tag_match.group(1) if tag_match else "" hash_value = hash_match.group(2) if hash_match else "" # Try to extract URL if GitHub info is not present url_match = re.search(r'url\s*=\s*"([^"]+)"', content) url = url_match.group(1) if url_match else "" # Check for GitHub homepage in meta section homepage_match = re.search( r'homepage\s*=\s*"https://github.com/([^/]+)/([^"]+)"', content ) if homepage_match and not (owner and repo): owner = homepage_match.group(1) repo = homepage_match.group(2) # Handle GitHub sources if owner and repo: result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo, "hash": hash_value, } # Handle tag if tag: result["sources"][source_name]["tag"] = tag # Handle rev elif rev: result["sources"][source_name]["rev"] = rev # Handle URL sources elif url: result["sources"][source_name] = { "fetcher": "url", "url": url, "hash": hash_value, } # Fallback for packages with no clear source info else: # Create a minimal source entry so the package shows up in the UI result["sources"][source_name] = {"fetcher": "unknown", "hash": hash_value} return result def update_python_package( path: Path, source_name: str, updates: Dict[str, Any] ) -> bool: """Update a Python package's default.nix file with new version and/or hash.""" with path.open("r", encoding="utf-8") as f: content = f.read() modified = False # Update version if provided if "version" in updates: new_version = updates["version"] content, version_count = re.subn( r'(version\s*=\s*)"([^"]+)"', f'\\1"{new_version}"', content ) if version_count > 0: modified = True # Update hash if provided if "hash" in updates: new_hash = updates["hash"] # Match both sha256 and hash attributes content, hash_count = re.subn( r'(sha256|hash)\s*=\s*"([^"]+)"', f'\\1 = "{new_hash}"', content ) if hash_count > 0: modified = True # Update tag if provided if "tag" in updates: new_tag = updates["tag"] content, tag_count = re.subn( r'(tag\s*=\s*)"([^"]+)"', f'\\1"{new_tag}"', content ) if tag_count > 0: modified = True # Update rev if provided if "rev" in updates: new_rev = updates["rev"] content, rev_count = re.subn( r'(rev\s*=\s*)"([^"]+)"', f'\\1"{new_rev}"', content ) if rev_count > 0: modified = True if modified: with path.open("w", encoding="utf-8") as f: f.write(content) return modified def parse_homeassistant_component(path: Path) -> Dict[str, Any]: """Parse a Home Assistant component's default.nix file to extract version and source information.""" with path.open("r", encoding="utf-8") as f: content = f.read() # Extract domain, version, and owner domain_match = re.search(r'domain\s*=\s*"([^"]+)"', content) version_match = re.search(r'version\s*=\s*"([^"]+)"', content) owner_match = re.search(r'owner\s*=\s*"([^"]+)"', content) domain = domain_match.group(1) if domain_match else "" version = version_match.group(1) if version_match else "" owner = owner_match.group(1) if owner_match else "" # Extract GitHub repo info repo_match = re.search(r'repo\s*=\s*"([^"]+)"', content) rev_match = re.search(r'rev\s*=\s*"([^"]+)"', content) tag_match = re.search(r'tag\s*=\s*"([^"]+)"', content) hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', content) repo = repo_match.group(1) if repo_match else "" rev = rev_match.group(1) if rev_match else "" tag = tag_match.group(1) if tag_match else "" hash_value = hash_match.group(2) if hash_match else "" # Create a structure similar to version.json for compatibility result = {"variables": {}, "sources": {}} # Only add non-empty values to variables if version: result["variables"]["version"] = version if domain: result["variables"]["domain"] = domain # Determine source name - use domain or directory name source_name = domain if domain else path.parent.name.lower() # Handle GitHub sources if owner: repo_name = repo if repo else source_name result["sources"][source_name] = { "fetcher": "github", "owner": owner, "repo": repo_name, } # Only add non-empty values if hash_value: result["sources"][source_name]["hash"] = hash_value # Handle tag or rev if tag: result["sources"][source_name]["tag"] = tag elif rev: result["sources"][source_name]["rev"] = rev elif ( version ): # If no tag or rev specified, but version exists, use version as tag result["sources"][source_name]["tag"] = version else: # Fallback for components with no clear source info result["sources"][source_name] = {"fetcher": "unknown"} if hash_value: result["sources"][source_name]["hash"] = hash_value return result def update_homeassistant_component( path: Path, source_name: str, updates: Dict[str, Any] ) -> bool: """Update a Home Assistant component's default.nix file with new version and/or hash.""" with path.open("r", encoding="utf-8") as f: content = f.read() modified = False # Update version if provided if "version" in updates: new_version = updates["version"] content, version_count = re.subn( r'(version\s*=\s*)"([^"]+)"', f'\\1"{new_version}"', content ) if version_count > 0: modified = True # Update hash if provided if "hash" in updates: new_hash = updates["hash"] # Match both sha256 and hash attributes in src = fetchFromGitHub { ... } content, hash_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(sha256|hash)\s*=\s*"([^"]+)"([^}]*\})', f'\\1\\2 = "{new_hash}"\\4', content, ) if hash_count > 0: modified = True # Update tag if provided if "tag" in updates: new_tag = updates["tag"] content, tag_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(tag|rev)\s*=\s*"([^"]+)"([^}]*\})', f'\\1\\2 = "{new_tag}"\\4', content, ) if tag_count == 0: # If no tag/rev found, try to add it content, tag_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})', f'\\1\\2;\n tag = "{new_tag}"\\3', content, ) if tag_count > 0: modified = True # Update rev if provided if "rev" in updates: new_rev = updates["rev"] content, rev_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(rev|tag)\s*=\s*"([^"]+)"([^}]*\})', f'\\1\\2 = "{new_rev}"\\4', content, ) if rev_count == 0: # If no rev/tag found, try to add it content, rev_count = re.subn( r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})', f'\\1\\2;\n rev = "{new_rev}"\\3', content, ) if rev_count > 0: modified = True if modified: with path.open("w", encoding="utf-8") as f: f.write(content) return modified # ------------------------------ TUI helpers ------------------------------ # Define color pairs COLOR_NORMAL = 1 COLOR_HIGHLIGHT = 2 COLOR_HEADER = 3 COLOR_STATUS = 4 COLOR_ERROR = 5 COLOR_SUCCESS = 6 COLOR_BORDER = 7 COLOR_TITLE = 8 def init_colors(): """Initialize color pairs for the TUI.""" curses.start_color() curses.use_default_colors() # Define color pairs curses.init_pair(COLOR_NORMAL, curses.COLOR_WHITE, -1) curses.init_pair(COLOR_HIGHLIGHT, curses.COLOR_BLACK, curses.COLOR_CYAN) curses.init_pair(COLOR_HEADER, curses.COLOR_CYAN, -1) curses.init_pair(COLOR_STATUS, curses.COLOR_YELLOW, -1) curses.init_pair(COLOR_ERROR, curses.COLOR_RED, -1) curses.init_pair(COLOR_SUCCESS, curses.COLOR_GREEN, -1) curses.init_pair(COLOR_BORDER, curses.COLOR_BLUE, -1) curses.init_pair(COLOR_TITLE, curses.COLOR_MAGENTA, -1) def draw_border(win, y, x, h, w): """Draw a border around a region of the window.""" # Draw corners win.addch(y, x, curses.ACS_ULCORNER, curses.color_pair(COLOR_BORDER)) win.addch(y, x + w - 1, curses.ACS_URCORNER, curses.color_pair(COLOR_BORDER)) win.addch(y + h - 1, x, curses.ACS_LLCORNER, curses.color_pair(COLOR_BORDER)) # Draw bottom-right corner safely try: win.addch( y + h - 1, x + w - 1, curses.ACS_LRCORNER, curses.color_pair(COLOR_BORDER) ) except curses.error: # This is expected when trying to write to the bottom-right corner pass # Draw horizontal lines for i in range(1, w - 1): win.addch(y, x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)) win.addch(y + h - 1, x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)) # Draw vertical lines for i in range(1, h - 1): win.addch(y + i, x, curses.ACS_VLINE, curses.color_pair(COLOR_BORDER)) win.addch(y + i, x + w - 1, curses.ACS_VLINE, curses.color_pair(COLOR_BORDER)) class ScreenBase: def __init__(self, stdscr): self.stdscr = stdscr self.status = "" self.status_type = "normal" # "normal", "error", "success" def draw_status(self, height, width): if self.status: color = COLOR_STATUS if self.status_type == "error": color = COLOR_ERROR elif self.status_type == "success": color = COLOR_SUCCESS self.stdscr.addstr( height - 1, 0, self.status[: max(0, width - 1)], curses.color_pair(color), ) else: self.stdscr.addstr( height - 1, 0, "q: quit, Backspace: back, Enter: select", curses.color_pair(COLOR_STATUS), ) def set_status(self, text: str, status_type="normal"): self.status = text self.status_type = status_type def run(self): raise NotImplementedError def prompt_input(stdscr, prompt: str) -> Optional[str]: curses.echo() stdscr.addstr(prompt, curses.color_pair(COLOR_HEADER)) stdscr.clrtoeol() s = stdscr.getstr().decode("utf-8") curses.noecho() return s def show_popup(stdscr, lines: List[str], title: str = ""): h, w = stdscr.getmaxyx() box_h = min(len(lines) + 4, h - 2) box_w = min(max(max(len(l) for l in lines), len(title)) + 6, w - 2) top = (h - box_h) // 2 left = (w - box_w) // 2 win = curses.newwin(box_h, box_w, top, left) # Draw fancy border draw_border(win, 0, 0, box_h, box_w) # Add title if provided if title: title_x = (box_w - len(title)) // 2 win.addstr(0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE)) # Add content for i, line in enumerate(lines, start=1): if i >= box_h - 1: break win.addstr(i, 2, line[: box_w - 4], curses.color_pair(COLOR_NORMAL)) # Add footer footer = "Press any key to continue" footer_x = (box_w - len(footer)) // 2 win.addstr(box_h - 1, footer_x, footer, curses.color_pair(COLOR_STATUS)) win.refresh() win.getch() # ------------------------------ Screens ------------------------------ class PackagesScreen(ScreenBase): def __init__(self, stdscr): super().__init__(stdscr) self.packages = find_packages() self.idx = 0 self.filter_mode = "all" # "all", "regular", "python" self.scroll_offset = 0 # Add scroll offset to handle long lists def run(self): while True: self.stdscr.clear() h, w = self.stdscr.getmaxyx() # Determine split layout left_w = max(30, min(60, w // 3)) right_x = left_w + 1 right_w = max(0, w - right_x) # Draw borders for left and right panes draw_border(self.stdscr, 0, 0, h - 1, left_w) if right_w >= 20: draw_border(self.stdscr, 0, right_x, h - 1, right_w) # Left pane: package list title = "Packages" if self.filter_mode == "regular": title = "Packages (version.json)" elif self.filter_mode == "python": title = "Python Packages" else: title = "All Packages [f to filter]" # Center the title in the left pane title_x = (left_w - len(title)) // 2 self.stdscr.addstr( 0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD ) # Filter packages based on mode filtered_packages = self.packages if self.filter_mode == "regular": filtered_packages = [ p for p in self.packages if not p[2] ] # Not Python packages elif self.filter_mode == "python": filtered_packages = [ p for p in self.packages if p[2] ] # Only Python packages # Implement scrolling for long lists max_rows = h - 3 total_packages = len(filtered_packages) # Adjust scroll offset if needed if self.idx >= self.scroll_offset + max_rows: self.scroll_offset = self.idx - max_rows + 1 elif self.idx < self.scroll_offset: self.scroll_offset = self.idx # Display visible packages with scroll offset visible_packages = filtered_packages[ self.scroll_offset : self.scroll_offset + max_rows ] # Show scroll indicators if needed if self.scroll_offset > 0: self.stdscr.addstr(1, left_w - 3, "โ†‘", curses.color_pair(COLOR_STATUS)) if self.scroll_offset + max_rows < total_packages: self.stdscr.addstr( min(1 + len(visible_packages), h - 2), left_w - 3, "โ†“", curses.color_pair(COLOR_STATUS), ) for i, (name, _path, is_python, is_homeassistant) in enumerate( visible_packages, start=0 ): # Use consistent display style for all packages pkg_type = "" # Remove the [Py] prefix for consistent display # Highlight the selected item if i + self.scroll_offset == self.idx: attr = curses.color_pair(COLOR_HIGHLIGHT) sel = "โ–บ" # Use a fancier selector else: attr = curses.color_pair(COLOR_NORMAL) sel = " " # Add a small icon for Python packages or Home Assistant components if is_python: pkg_type = "๐Ÿ " # Python icon elif is_homeassistant: pkg_type = "๐Ÿ  " # Home Assistant icon self.stdscr.addstr( 1 + i, 2, f"{sel} {pkg_type}{name}"[: max(0, left_w - 5)], attr ) # Right pane: preview of selected package (non-interactive summary) if right_w >= 20 and filtered_packages: try: name, path, is_python, is_homeassistant = filtered_packages[ self.idx ] # Center the package name in the right pane header title_x = right_x + (right_w - len(name)) // 2 self.stdscr.addstr( 0, title_x, f" {name} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD, ) # Path with a nice label self.stdscr.addstr( 1, right_x + 2, "Path:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( 1, right_x + 8, f"{path}"[: max(0, right_w - 10)], curses.color_pair(COLOR_NORMAL), ) # Sources header self.stdscr.addstr( 2, right_x + 2, "Sources:", curses.color_pair(COLOR_HEADER) ) if is_python: spec = parse_python_package(path) elif is_homeassistant: spec = parse_homeassistant_component(path) else: spec = load_json(path) merged_vars, merged_srcs, _ = merged_view(spec, None) snames = sorted(list(merged_srcs.keys())) max_src_rows = max(0, h - 6) for i2, sname in enumerate(snames[:max_src_rows], start=0): comp = merged_srcs[sname] fetcher = comp.get("fetcher", "none") # Construct concise reference similar to detail view display_ref = ( comp.get("tag") or comp.get("rev") or comp.get("version") or "" ) if fetcher == "github": rendered = render_templates(comp, merged_vars) tag = rendered.get("tag") rev = rendered.get("rev") owner = ( rendered.get("owner") or merged_vars.get("owner") or "" ) repo = rendered.get("repo") or merged_vars.get("repo") or "" if tag and owner and repo: display_ref = f"{owner}/{repo}@{tag}" elif tag: display_ref = tag elif rev and owner and repo: display_ref = f"{owner}/{repo}@{rev[:7]}" elif rev: display_ref = rev[:12] elif fetcher == "url": rendered = render_templates(comp, merged_vars) url = ( rendered.get("url") or rendered.get("urlTemplate") or "" ) if url: owner = str(merged_vars.get("owner", "") or "") repo = str(merged_vars.get("repo", "") or "") rp = str(merged_vars.get("releasePrefix", "") or "") rs = str(merged_vars.get("releaseSuffix", "") or "") base = str(merged_vars.get("base", "") or "") rel = str(merged_vars.get("release", "") or "") tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else "" parsed = urlparse(url) filename = ( os.path.basename(parsed.path) if parsed and parsed.path else "" ) if owner and repo and tag and filename: display_ref = f"{owner}/{repo}@{tag} ยท {filename}" elif filename: display_ref = filename else: display_ref = url else: display_ref = "" # Truncate reference to fit right pane if isinstance(display_ref, str): max_ref = max(0, right_w - 30) ref_short = display_ref[:max_ref] + ( "..." if len(display_ref) > max_ref else "" ) else: ref_short = display_ref # Color-code the fetcher type fetcher_color = COLOR_NORMAL if fetcher == "github": fetcher_color = COLOR_SUCCESS elif fetcher == "url": fetcher_color = COLOR_STATUS elif fetcher == "git": fetcher_color = COLOR_HEADER # Display source name self.stdscr.addstr( 3 + i2, right_x + 2, f"{sname:<18}", curses.color_pair(COLOR_NORMAL), ) # Display fetcher with color self.stdscr.addstr( 3 + i2, right_x + 21, f"{fetcher:<7}", curses.color_pair(fetcher_color), ) # Display reference self.stdscr.addstr( 3 + i2, right_x + 29, f"{ref_short}"[: max(0, right_w - 31)], curses.color_pair(COLOR_NORMAL), ) # Hint line for workflow hint = "Enter: open details | k/j: move | q: quit" if h >= 5: hint_x = right_x + (right_w - len(hint)) // 2 self.stdscr.addstr( h - 5, hint_x, hint[: max(0, right_w - 1)], curses.color_pair(COLOR_STATUS), ) except Exception as e: self.stdscr.addstr( 2, right_x + 2, "Error:", curses.color_pair(COLOR_ERROR) ) self.stdscr.addstr( 2, right_x + 9, f"{e}"[: max(0, right_w - 11)], curses.color_pair(COLOR_ERROR), ) self.draw_status(h, w) self.stdscr.refresh() ch = self.stdscr.getch() if ch in (ord("q"), 27): # q or ESC return None elif ch in (curses.KEY_UP, ord("k")): self.idx = max(0, self.idx - 1) elif ch in (curses.KEY_DOWN, ord("j")): self.idx = min(len(self.packages) - 1, self.idx + 1) elif ch == curses.KEY_PPAGE: # Page Up self.idx = max(0, self.idx - (h - 4)) elif ch == curses.KEY_NPAGE: # Page Down self.idx = min(len(self.packages) - 1, self.idx + (h - 4)) elif ch == ord("g"): # Go to top self.idx = 0 elif ch == ord("G"): # Go to bottom self.idx = len(self.packages) - 1 elif ch == ord("f"): # Cycle through filter modes if self.filter_mode == "all": self.filter_mode = "regular" elif self.filter_mode == "regular": self.filter_mode = "python" else: self.filter_mode = "all" self.idx = 0 # Reset selection when changing filters elif ch in (curses.KEY_ENTER, 10, 13): filtered_packages = self.packages if self.filter_mode == "regular": filtered_packages = [p for p in self.packages if not p[2]] elif self.filter_mode == "python": filtered_packages = [p for p in self.packages if p[2]] if not filtered_packages: continue name, path, is_python, is_homeassistant = filtered_packages[self.idx] try: if is_python: spec = parse_python_package(path) elif is_homeassistant: spec = parse_homeassistant_component(path) else: spec = load_json(path) except Exception as e: self.set_status(f"Failed to load {path}: {e}") continue screen = PackageDetailScreen( self.stdscr, name, path, spec, is_python, is_homeassistant ) ret = screen.run() if ret == "reload": # re-scan on save self.packages = find_packages() self.idx = min(self.idx, len(self.packages) - 1) else: pass class PackageDetailScreen(ScreenBase): def __init__( self, stdscr, pkg_name: str, path: Path, spec: Json, is_python: bool = False, is_homeassistant: bool = False, ): super().__init__(stdscr) self.pkg_name = pkg_name self.path = path self.spec = spec self.is_python = is_python self.is_homeassistant = is_homeassistant self.variants = [""] + sorted(list(self.spec.get("variants", {}).keys())) self.vidx = 0 self.gh_token = os.environ.get("GITHUB_TOKEN") self.candidates: Dict[ str, Dict[str, str] ] = {} # name -> {release, tag, commit} self.url_candidates: Dict[ str, Dict[str, str] ] = {} # name -> {base, release, tag} # initialize view self.recompute_view() def select_variant(self): # Recompute merged and target views when variant changes self.recompute_view() def recompute_view(self): # Set cursor to base or selected variant dict for manual edits if self.vidx == 0: self.cursor = self.spec variant_name = None else: variant_name = self.variants[self.vidx] self.cursor = self.spec["variants"][variant_name] # Compute merged view and target dict for writing self.merged_vars, self.merged_srcs, self.target_dict = merged_view( self.spec, variant_name ) self.snames = sorted(list(self.merged_srcs.keys())) self.sidx = 0 def fetch_candidates_for(self, name: str): comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") c = {"release": "", "tag": "", "commit": ""} if fetcher == "github": owner = comp.get("owner") repo = comp.get("repo") if owner and repo: r = gh_latest_release(owner, repo, self.gh_token) if r: c["release"] = r t = gh_latest_tag(owner, repo, self.gh_token) if t: c["tag"] = t m = gh_head_commit(owner, repo) if m: c["commit"] = m # Special-case raspberrypi/linux: prefer latest stable_* tag or series-specific tags try: if owner == "raspberrypi" and repo == "linux": tags_all = gh_list_tags(owner, repo, self.gh_token) rendered = render_templates(comp, self.merged_vars) cur_tag = str(rendered.get("tag") or "") # If current tag uses stable_YYYYMMDD scheme, pick latest stable_* tag if cur_tag.startswith("stable_"): stable_tags = sorted( [x for x in tags_all if re.match(r"^stable_\d{8}$", x)], reverse=True, ) if stable_tags: c["tag"] = stable_tags[0] else: # Try to pick a tag matching the current major.minor series if available mm = str(self.merged_vars.get("modDirVersion") or "") m2 = re.match(r"^(\d+)\.(\d+)", mm) if m2: base = f"rpi-{m2.group(1)}.{m2.group(2)}" series_tags = [ x for x in tags_all if ( x == f"{base}.y" or x.startswith(f"{base}.y") or x.startswith(f"{base}.") ) ] series_tags.sort(reverse=True) if series_tags: c["tag"] = series_tags[0] except Exception as _e: # Fallback to previously computed values pass elif fetcher == "git": url = comp.get("url") if url: out = run_get_stdout(["git", "ls-remote", url, "HEAD"]) if out: c["commit"] = out.split()[0] elif fetcher == "url": # Heuristic for GitHub release assets with variables in version.json (e.g., proton-cachyos) owner = self.merged_vars.get("owner") repo = self.merged_vars.get("repo") if owner and repo: tags = gh_release_tags_api(str(owner), str(repo), self.gh_token) prefix = str(self.merged_vars.get("releasePrefix", "")) suffix = str(self.merged_vars.get("releaseSuffix", "")) latest = next( ( t for t in tags if (t and t.startswith(prefix) and t.endswith(suffix)) ), None, ) if latest: c["release"] = latest mid = latest if prefix and mid.startswith(prefix): mid = mid[len(prefix) :] if suffix and mid.endswith(suffix): mid = mid[: -len(suffix)] parts = mid.split("-") if len(parts) >= 2: base, rel = parts[0], parts[-1] self.url_candidates[name] = { "base": base, "release": rel, "tag": latest, } self.candidates[name] = c def prefetch_hash_for(self, name: str) -> Optional[str]: comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") if fetcher == "github": owner = comp.get("owner") repo = comp.get("repo") rendered = render_templates(comp, self.merged_vars) ref = rendered.get("tag") or rendered.get("rev") if owner and repo and ref: url = gh_tarball_url(owner, repo, ref) return nix_prefetch_url(url) elif fetcher == "git": url = comp.get("url") rev = comp.get("rev") if url and rev: return nix_prefetch_git(url, rev) elif fetcher == "url": rendered = render_templates(comp, self.merged_vars) url = rendered.get("url") or rendered.get("urlTemplate") if url: return nix_prefetch_url(url) return None def cachyos_suffix(self) -> str: if self.vidx == 0: return "" v = self.variants[self.vidx] mapping = {"rc": "-rc", "hardened": "-hardened", "lts": "-lts"} return mapping.get(v, "") def fetch_cachyos_linux_latest(self, suffix: str) -> Optional[str]: """ Try to determine latest linux version from upstream: - Prefer .SRCINFO (preprocessed) - Fallback to PKGBUILD (parse pkgver= line) Tries both 'CachyOS' and 'cachyos' org casing just in case. """ bases = [ "https://raw.githubusercontent.com/CachyOS/linux-cachyos/master", "https://raw.githubusercontent.com/cachyos/linux-cachyos/master", ] paths = [ f"linux-cachyos{suffix}/.SRCINFO", f"linux-cachyos{suffix}/PKGBUILD", ] def parse_srcinfo(text: str) -> Optional[str]: m = re.search(r"^\s*pkgver\s*=\s*([^\s#]+)\s*$", text, re.MULTILINE) if not m: return None v = m.group(1).strip() return v def parse_pkgbuild(text: str) -> Optional[str]: # Parse assignments and expand variables in pkgver # Build a simple env map from VAR=value lines env: Dict[str, str] = {} for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue m_assign = re.match(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", line) if m_assign: key = m_assign.group(1) val = m_assign.group(2).strip() # Remove trailing comments val = re.sub(r"\s+#.*$", "", val).strip() # Strip surrounding quotes if (val.startswith('"') and val.endswith('"')) or ( val.startswith("'") and val.endswith("'") ): val = val[1:-1] env[key] = val m = re.search(r"^\s*pkgver\s*=\s*(.+)$", text, re.MULTILINE) if not m: return None raw = m.group(1).strip() # Strip quotes if (raw.startswith('"') and raw.endswith('"')) or ( raw.startswith("'") and raw.endswith("'") ): raw = raw[1:-1] def expand_vars(s: str) -> str: def repl_braced(mb): key = mb.group(1) return env.get(key, mb.group(0)) or mb.group(0) def repl_unbraced(mu): key = mu.group(1) return env.get(key, mu.group(0)) or mu.group(0) # Expand ${var} then $var s = re.sub(r"\$\{([^}]+)\}", repl_braced, s) s = re.sub(r"\$([A-Za-z_][A-Za-z0-9_]*)", repl_unbraced, s) return s v = expand_vars(raw).strip() # normalize rc form like 6.19.rc6 -> 6.19-rc6 v = v.replace(".rc", "-rc") return v # Try .SRCINFO first, then PKGBUILD for base in bases: # .SRCINFO url = f"{base}/{paths[0]}" text = http_get_text(url) if text: ver = parse_srcinfo(text) if ver: return ver.replace(".rc", "-rc") # PKGBUILD fallback url = f"{base}/{paths[1]}" text = http_get_text(url) if text: ver = parse_pkgbuild(text) if ver: return ver.replace(".rc", "-rc") return None def linux_tarball_url_for_version(self, version: str) -> str: # Use torvalds snapshot for -rc, stable releases from CDN if "-rc" in version: return f"https://git.kernel.org/torvalds/t/linux-{version}.tar.gz" parts = version.split(".") major = parts[0] if parts else "6" major_minor = ".".join(parts[:2]) if len(parts) >= 2 else version ver_for_tar = major_minor if version.endswith(".0") else version return f"https://cdn.kernel.org/pub/linux/kernel/v{major}.x/linux-{ver_for_tar}.tar.xz" def update_linux_from_pkgbuild(self, name: str): suffix = self.cachyos_suffix() latest = self.fetch_cachyos_linux_latest(suffix) if not latest: self.set_status("linux: failed to get version from PKGBUILD") return url = self.linux_tarball_url_for_version(latest) sri = nix_prefetch_url(url) if not sri: self.set_status("linux: prefetch failed") return ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["version"] = latest compw["hash"] = sri self.set_status(f"{name}: updated version to {latest} and refreshed hash") def set_ref(self, name: str, kind: str, value: str): # Write to selected target dict (base or variant override) ts = self.target_dict.setdefault("sources", {}) comp = ts.setdefault(name, {}) if kind in ("release", "tag"): comp["tag"] = value if "rev" in comp: del comp["rev"] elif kind == "commit": comp["rev"] = value if "tag" in comp: del comp["tag"] def save(self): if self.is_python: # For Python packages, update the default.nix file for name in self.snames: source = self.merged_srcs[name] updates = {} # Get version from variables if "version" in self.merged_vars: updates["version"] = self.merged_vars["version"] # Get hash from source if "hash" in source: updates["hash"] = source["hash"] # Get tag from source if "tag" in source: updates["tag"] = source["tag"] # Get rev from source if "rev" in source: updates["rev"] = source["rev"] if updates: update_python_package(self.path, name, updates) return True elif self.is_homeassistant: # For Home Assistant components, update the default.nix file for name in self.snames: source = self.merged_srcs[name] updates = {} # Get version from variables if "version" in self.merged_vars: updates["version"] = self.merged_vars["version"] # Get hash from source if "hash" in source: updates["hash"] = source["hash"] # Get tag from source if "tag" in source: updates["tag"] = source["tag"] # Get rev from source if "rev" in source: updates["rev"] = source["rev"] if updates: update_homeassistant_component(self.path, name, updates) return True else: # For regular packages, save to version.json save_json(self.path, self.spec) return True def run(self): while True: self.stdscr.clear() h, w = self.stdscr.getmaxyx() # Draw main border around the entire screen draw_border(self.stdscr, 0, 0, h - 1, w) # Title with package name and path title = f"{self.pkg_name} [{self.path}]" if self.is_python: title += " [Python Package]" # Center the title title_x = (w - len(title)) // 2 self.stdscr.addstr( 0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD ) # Variant line with highlighting for selected variant if not self.is_python: vline_parts = [] for i, v in enumerate(self.variants): if i == self.vidx: vline_parts.append(f"[{v}]") else: vline_parts.append(v) vline = "Variants: " + " | ".join(vline_parts) self.stdscr.addstr(1, 2, "Variants:", curses.color_pair(COLOR_HEADER)) # Display each variant with appropriate highlighting x_pos = 12 # Position after "Variants: " for i, v in enumerate(self.variants): if i > 0: self.stdscr.addstr( 1, x_pos, " | ", curses.color_pair(COLOR_NORMAL) ) x_pos += 3 if i == self.vidx: self.stdscr.addstr( 1, x_pos, f"[{v}]", curses.color_pair(COLOR_HIGHLIGHT) ) x_pos += len(f"[{v}]") else: self.stdscr.addstr(1, x_pos, v, curses.color_pair(COLOR_NORMAL)) x_pos += len(v) else: # For Python packages, show version instead of variants version = self.merged_vars.get("version", "") self.stdscr.addstr(1, 2, "Version:", curses.color_pair(COLOR_HEADER)) self.stdscr.addstr(1, 11, version, curses.color_pair(COLOR_SUCCESS)) # Sources header with decoration self.stdscr.addstr( 2, 2, "Sources:", curses.color_pair(COLOR_HEADER) | curses.A_BOLD ) # Draw a separator line under the header for i in range(1, w - 1): self.stdscr.addch( 3, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) # List sources for i, name in enumerate(self.snames[: h - 10], start=0): comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") # Render refs so variables resolve; compress long forms for display display_ref = ( comp.get("tag") or comp.get("rev") or comp.get("version") or "" ) if fetcher == "github": rendered = render_templates(comp, self.merged_vars) tag = rendered.get("tag") rev = rendered.get("rev") owner = rendered.get("owner") or self.merged_vars.get("owner") or "" repo = rendered.get("repo") or self.merged_vars.get("repo") or "" if tag and owner and repo: display_ref = f"{owner}/{repo}@{tag}" elif tag: display_ref = tag elif rev and owner and repo: display_ref = f"{owner}/{repo}@{rev[:7]}" elif rev: display_ref = rev[:12] elif fetcher == "url": rendered = render_templates(comp, self.merged_vars) url = rendered.get("url") or rendered.get("urlTemplate") or "" if url: # Prefer a concise label like owner/repo@tag ยท filename owner = str(self.merged_vars.get("owner", "") or "") repo = str(self.merged_vars.get("repo", "") or "") rp = str(self.merged_vars.get("releasePrefix", "") or "") rs = str(self.merged_vars.get("releaseSuffix", "") or "") base = str(self.merged_vars.get("base", "") or "") rel = str(self.merged_vars.get("release", "") or "") tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else "" parsed = urlparse(url) filename = ( os.path.basename(parsed.path) if parsed and parsed.path else "" ) if owner and repo and tag and filename: display_ref = f"{owner}/{repo}@{tag} ยท {filename}" elif filename: display_ref = filename else: display_ref = url else: display_ref = "" ref_short = ( display_ref if not isinstance(display_ref, str) else (display_ref[:60] + ("..." if len(display_ref) > 60 else "")) ) # Determine colors and styles based on selection and fetcher type if i == self.sidx: # Selected item attr = curses.color_pair(COLOR_HIGHLIGHT) sel = "โ–บ" # Use a fancier selector else: # Non-selected item attr = curses.color_pair(COLOR_NORMAL) sel = " " # Determine fetcher color fetcher_color = COLOR_NORMAL if fetcher == "github": fetcher_color = COLOR_SUCCESS elif fetcher == "url": fetcher_color = COLOR_STATUS elif fetcher == "git": fetcher_color = COLOR_HEADER # Display source name with selection indicator self.stdscr.addstr(4 + i, 2, f"{sel} {name:<20}", attr) # Display fetcher with appropriate color self.stdscr.addstr(4 + i, 24, fetcher, curses.color_pair(fetcher_color)) # Display reference self.stdscr.addstr( 4 + i, 32, f"ref={ref_short}"[: w - 34], curses.color_pair(COLOR_NORMAL), ) # Draw a separator line before the latest candidates section y_latest = h - 8 for i in range(1, w - 1): self.stdscr.addch( y_latest, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) # Latest candidates section for selected component (auto-fetched) if self.snames: _sel_name = self.snames[self.sidx] _comp = self.merged_srcs[_sel_name] _fetcher = _comp.get("fetcher", "none") # Preload candidates lazily for selected item if ( _fetcher in ("github", "git", "url") and _sel_name not in self.candidates ): self.fetch_candidates_for(_sel_name) # Latest header with decoration self.stdscr.addstr( y_latest + 1, 2, "Latest Versions:", curses.color_pair(COLOR_HEADER) | curses.A_BOLD, ) if _fetcher in ("github", "git"): _cand = self.candidates.get(_sel_name, {}) # Display each candidate with appropriate color if _cand.get("release"): self.stdscr.addstr( y_latest + 2, 4, "Release:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( y_latest + 2, 13, _cand.get("release"), curses.color_pair(COLOR_SUCCESS), ) if _cand.get("tag"): self.stdscr.addstr( y_latest + 2, 30, "Tag:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( y_latest + 2, 35, _cand.get("tag"), curses.color_pair(COLOR_SUCCESS), ) if _cand.get("commit"): self.stdscr.addstr( y_latest + 3, 4, "Commit:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( y_latest + 3, 12, (_cand.get("commit") or "")[:12], curses.color_pair(COLOR_NORMAL), ) elif _fetcher == "url": _cand_u = self.url_candidates.get(_sel_name, {}) or {} _tag = _cand_u.get("tag") or ( self.candidates.get(_sel_name, {}).get("release") or "-" ) if _tag != "-": self.stdscr.addstr( y_latest + 2, 4, "Tag:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( y_latest + 2, 9, _tag, curses.color_pair(COLOR_SUCCESS) ) if _cand_u.get("base"): self.stdscr.addstr( y_latest + 2, 30, "Base:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( y_latest + 2, 36, _cand_u.get("base"), curses.color_pair(COLOR_NORMAL), ) if _cand_u.get("release"): self.stdscr.addstr( y_latest + 3, 4, "Release:", curses.color_pair(COLOR_HEADER) ) self.stdscr.addstr( y_latest + 3, 13, _cand_u.get("release"), curses.color_pair(COLOR_NORMAL), ) else: if self.pkg_name == "linux-cachyos" and _sel_name == "linux": _suffix = self.cachyos_suffix() _latest = self.fetch_cachyos_linux_latest(_suffix) self.stdscr.addstr( y_latest + 2, 4, "Linux from PKGBUILD:", curses.color_pair(COLOR_HEADER), ) if _latest: self.stdscr.addstr( y_latest + 2, 24, _latest, curses.color_pair(COLOR_SUCCESS), ) else: self.stdscr.addstr( y_latest + 2, 24, "-", curses.color_pair(COLOR_NORMAL) ) else: self.stdscr.addstr( y_latest + 2, 4, "No candidates available", curses.color_pair(COLOR_NORMAL), ) # Draw a separator line before the footer for i in range(1, w - 1): self.stdscr.addch( h - 5, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) # Footer instructions with better formatting footer = "Enter: component actions | r: refresh | h: hash | e: edit | s: save | Backspace: back | q: quit" footer_x = (w - len(footer)) // 2 self.stdscr.addstr(h - 4, footer_x, footer, curses.color_pair(COLOR_STATUS)) # Draw status at the bottom self.draw_status(h, w) self.stdscr.refresh() ch = self.stdscr.getch() if ch in (ord("q"), 27): return None elif ch == curses.KEY_BACKSPACE or ch == 127: return "reload" elif ch in (curses.KEY_LEFT, ord("h")): self.vidx = max(0, self.vidx - 1) self.select_variant() elif ch in (curses.KEY_RIGHT, ord("l")): self.vidx = min(len(self.variants) - 1, self.vidx + 1) self.select_variant() elif ch in (curses.KEY_UP, ord("k")): self.sidx = max(0, self.sidx - 1) elif ch in (curses.KEY_DOWN, ord("j")): self.sidx = min(len(self.snames) - 1, self.sidx + 1) elif ch in (ord("r"),): if self.snames: name = self.snames[self.sidx] comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") if self.pkg_name == "linux-cachyos" and name == "linux": # Show available linux version from upstream PKGBUILD (.SRCINFO) suffix = self.cachyos_suffix() latest = self.fetch_cachyos_linux_latest(suffix) rendered = render_templates(comp, self.merged_vars) cur_version = str(rendered.get("version") or "") url_hint = ( self.linux_tarball_url_for_version(latest) if latest else "-" ) lines = [ f"linux-cachyos ({'base' if self.vidx == 0 else self.variants[self.vidx]}):", f" current : {cur_version or '-'}", f" available: {latest or '-'}", f" tarball : {url_hint}", ] show_popup(self.stdscr, lines) else: self.fetch_candidates_for(name) cand = self.candidates.get(name, {}) lines = [ f"Candidates for {name}:", f" latest release: {cand.get('release') or '-'}", f" latest tag : {cand.get('tag') or '-'}", f" latest commit : {cand.get('commit') or '-'}", ] show_popup(self.stdscr, lines) elif ch in (ord("i"),): # Show full rendered URL for URL-based sources if self.snames: name = self.snames[self.sidx] comp = self.merged_srcs[name] if comp.get("fetcher", "none") == "url": rendered = render_templates(comp, self.merged_vars) url = rendered.get("url") or rendered.get("urlTemplate") or "" if url: show_popup(self.stdscr, ["Full URL:", url]) else: self.set_status("No URL available") elif ch in (ord("h"),): if self.snames: name = self.snames[self.sidx] sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self.set_status(f"{name}: updated hash") else: self.set_status(f"{name}: hash prefetch failed") elif ch in (ord("e"),): s = prompt_input( self.stdscr, "Edit path=value (relative to selected base/variant): " ) if s: if "=" not in s: self.set_status("Invalid input, expected key.path=value") else: k, v = s.split("=", 1) path = [p for p in k.split(".") if p] deep_set(self.cursor, path, v) self.set_status(f"Set {k}={v}") elif ch in (ord("s"),): try: self.save() self.set_status("Saved.") except Exception as e: self.set_status(f"Save failed: {e}") elif ch in (curses.KEY_ENTER, 10, 13): if not self.snames: continue name = self.snames[self.sidx] comp = self.merged_srcs[name] fetcher = comp.get("fetcher", "none") if fetcher in ("github", "git"): # Ensure candidates loaded if name not in self.candidates: self.fetch_candidates_for(name) cand = self.candidates.get(name, {}) # Present small menu items = [] if fetcher == "github": items = [ ( "Use latest release (tag)", ("release", cand.get("release")), ), ("Use latest tag", ("tag", cand.get("tag"))), ("Use latest commit (rev)", ("commit", cand.get("commit"))), ("Recompute hash", ("hash", None)), ("Cancel", ("cancel", None)), ] else: items = [ ("Use latest commit (rev)", ("commit", cand.get("commit"))), ("Recompute hash", ("hash", None)), ("Cancel", ("cancel", None)), ] # Build header with current and available refs rendered = render_templates(comp, self.merged_vars) cur_tag = rendered.get("tag") or "" cur_rev = rendered.get("rev") or "" cur_version = rendered.get("version") or "" if cur_tag: current_str = f"current: tag={cur_tag}" elif cur_rev: current_str = f"current: rev={cur_rev[:12]}" elif cur_version: current_str = f"current: version={cur_version}" else: current_str = "current: -" header_lines = [ current_str, f"available: release={cand.get('release') or '-'} tag={cand.get('tag') or '-'} commit={(cand.get('commit') or '')[:12] or '-'}", ] choice = select_menu( self.stdscr, f"Actions for {name}", [label for label, _ in items], header=header_lines, ) if choice is not None: kind, val = items[choice][1] if kind in ("release", "tag", "commit"): if val: self.set_ref(name, kind, val) # update hash sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self.set_status(f"{name}: set {kind} and updated hash") else: self.set_status(f"No candidate {kind}") elif kind == "hash": sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self.set_status(f"{name}: updated hash") else: self.set_status("hash prefetch failed") else: pass elif fetcher == "url": # Offer latest release update (for proton-cachyos-like schemas) and/or hash recompute cand = self.url_candidates.get(name) menu_items: List[ Tuple[str, Tuple[str, Optional[Dict[str, str]]]] ] = [] if cand and cand.get("base") and cand.get("release"): menu_items.append( ( "Use latest release (update variables.base/release)", ("update_vars", cand), ) ) menu_items.append(("Recompute hash (prefetch)", ("hash", None))) menu_items.append(("Cancel", ("cancel", None))) # Build header with current and available release info base = str(self.merged_vars.get("base") or "") rel = str(self.merged_vars.get("release") or "") rp = str(self.merged_vars.get("releasePrefix") or "") rs = str(self.merged_vars.get("releaseSuffix") or "") current_tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else "" if current_tag: current_str = f"current: {current_tag}" elif base or rel: current_str = ( f"current: base={base or '-'} release={rel or '-'}" ) else: current_str = "current: -" header_lines = [ current_str, f"available: tag={(cand.get('tag') or '-') if cand else '-'} base={(cand.get('base') or '-') if cand else '-'} release={(cand.get('release') or '-') if cand else '-'}", ] choice = select_menu( self.stdscr, f"Actions for {name}", [label for label, _ in menu_items], header=header_lines, ) if choice is not None: kind, payload = menu_items[choice][1] if kind == "update_vars" and isinstance(payload, dict): # Write variables into selected base/variant dict vars_dict = self.target_dict.setdefault("variables", {}) vars_dict["base"] = payload["base"] vars_dict["release"] = payload["release"] # Recompute merged view to reflect new variables self.recompute_view() # Prefetch and update hash sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self.set_status( f"{name}: updated to {payload['base']}.{payload['release']} and refreshed hash" ) else: self.set_status( "hash prefetch failed after variable update" ) elif kind == "hash": sri = self.prefetch_hash_for(name) if sri: ts = self.target_dict.setdefault("sources", {}) compw = ts.setdefault(name, {}) compw["hash"] = sri self.set_status(f"{name}: updated hash") else: self.set_status("hash prefetch failed") else: pass else: if self.pkg_name == "linux-cachyos" and name == "linux": # Offer update of linux version from upstream PKGBUILD (.SRCINFO) suffix = self.cachyos_suffix() latest = self.fetch_cachyos_linux_latest(suffix) rendered = render_templates(comp, self.merged_vars) cur_version = str(rendered.get("version") or "") header_lines = [ f"current: version={cur_version or '-'}", f"available: version={latest or '-'}", ] opts = [] if latest: opts.append( f"Update linux version to {latest} from PKGBUILD (.SRCINFO)" ) else: opts.append("Update linux version from PKGBUILD (.SRCINFO)") opts.append("Cancel") choice = select_menu( self.stdscr, f"Actions for {name}", opts, header=header_lines, ) if choice == 0 and latest: self.update_linux_from_pkgbuild(name) else: pass else: show_popup( self.stdscr, [ f"{name}: fetcher={fetcher}", "Use 'e' to edit fields manually.", ], ) else: pass def select_menu( stdscr, title: str, options: List[str], header: Optional[List[str]] = None ) -> Optional[int]: idx = 0 while True: stdscr.clear() h, w = stdscr.getmaxyx() # Calculate menu dimensions with better bounds checking max_opt_len = max((len(opt) + 4 for opt in options), default=0) title_len = len(title) + 4 menu_width = min(w - 4, max(40, max(title_len, max_opt_len))) menu_height = min(h - 4, len(options) + (len(header) if header else 0) + 4) # Calculate position for centered menu start_x = (w - menu_width) // 2 start_y = (h - menu_height) // 2 # Draw border around menu draw_border(stdscr, start_y, start_x, menu_height, menu_width) # Draw title title_x = start_x + (menu_width - len(title)) // 2 stdscr.addstr( start_y, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD, ) # Draw header if provided y = start_y + 1 if header: for line in header: if y >= start_y + menu_height - 2: break # Ensure we don't write beyond menu width line_str = str(line) if len(line_str) > menu_width - 4: line_str = line_str[: menu_width - 7] + "..." stdscr.addstr( y, start_x + 2, line_str, curses.color_pair(COLOR_HEADER), ) y += 1 # Add separator line after header for i in range(1, menu_width - 1): stdscr.addch( y, start_x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER) ) y += 1 # Draw options options_start_y = y max_visible_options = max(1, start_y + menu_height - options_start_y - 1) visible_options = min(len(options), max_visible_options) for i, opt in enumerate(options[:visible_options], start=0): # Highlight selected option if i == idx: attr = curses.color_pair(COLOR_HIGHLIGHT) sel = "โ–บ" # Use a fancier selector else: attr = curses.color_pair(COLOR_NORMAL) sel = " " # Truncate long options to fit in menu opt_text = f"{sel} {opt}" if len(opt_text) > menu_width - 4: opt_text = opt_text[: menu_width - 7] + "..." stdscr.addstr(options_start_y + i, start_x + 2, opt_text, attr) # Draw footer footer = "Enter: select | Backspace: cancel" footer_x = start_x + (menu_width - len(footer)) // 2 stdscr.addstr( start_y + menu_height - 1, footer_x, footer, curses.color_pair(COLOR_STATUS) ) stdscr.refresh() ch = stdscr.getch() if ch in (curses.KEY_UP, ord("k")): idx = max(0, idx - 1) elif ch in (curses.KEY_DOWN, ord("j")): idx = min(len(options) - 1, idx + 1) elif ch in (curses.KEY_ENTER, 10, 13): return idx elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 27: return None # ------------------------------ main ------------------------------ def main(stdscr): curses.curs_set(0) # Hide cursor stdscr.nodelay(False) # Blocking input # Initialize colors if curses.has_colors(): init_colors() try: # Display welcome screen h, w = stdscr.getmaxyx() welcome_lines = [ "โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—", "โ•‘ โ•‘", "โ•‘ NixOS Package Version Manager โ•‘", "โ•‘ โ•‘", "โ•‘ Browse and update package versions with โ•‘", "โ•‘ this interactive TUI. Navigate using โ•‘", "โ•‘ arrow keys and Enter to select. โ•‘", "โ•‘ โ•‘", "โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•", "", "Loading packages...", ] # Center the welcome message start_y = (h - len(welcome_lines)) // 2 for i, line in enumerate(welcome_lines): if start_y + i < h: start_x = (w - len(line)) // 2 if "NixOS Package Version Manager" in line: stdscr.addstr( start_y + i, start_x, line, curses.color_pair(COLOR_TITLE) | curses.A_BOLD, ) elif "Loading packages..." in line: stdscr.addstr( start_y + i, start_x, line, curses.color_pair(COLOR_STATUS) ) else: stdscr.addstr( start_y + i, start_x, line, curses.color_pair(COLOR_NORMAL) ) stdscr.refresh() # Start the main screen after a short delay screen = PackagesScreen(stdscr) screen.run() except Exception: curses.endwin() traceback.print_exc() sys.exit(1) if __name__ == "__main__": curses.wrapper(main)