#!/usr/bin/env python3
"""
Interactive TUI for browsing and updating unified version.json files.

Features:
- Scans packages/**/version.json and lists all packages
- Also discovers packages/python/*/default.nix and packages/homeassistant/*/default.nix
- Per-package view:
  - Choose base or any variant
  - List all sources/components with current ref (tag/rev/url/version) and hash
  - For GitHub sources: fetch candidates (latest release tag, latest tag, latest commit)
  - For Git sources: fetch latest commit (HEAD)
  - For URL sources: recompute hash (url/urlTemplate with rendered variables)
- Actions on a component:
  - Update to one of the candidates (sets tag or rev) and optionally re-hash
  - Recompute hash (prefetch)
  - Edit any field via path=value (e.g., variables.version=2025.07)
- Writes changes back to version.json

Dependencies:
- Standard library + external CLI tools:
  - nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri`
  - nix-prefetch-git
  - git
- Optional: GITHUB_TOKEN env var to increase GitHub API rate limits

Usage:
  scripts/version_tui.py

Controls:
- Up/Down to navigate lists
- Enter to select
- Backspace to go back
- q to quit
- On component screen:
  r = refresh candidates
  h = recompute hash (prefetch)
  e = edit arbitrary field (path=value)
  s = save to disk
"""

import curses
import json
import os
import re
import subprocess
import sys
import traceback
import urllib.request
import urllib.error
import urllib.parse
from urllib.parse import urlparse
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

ROOT = Path(__file__).resolve().parents[1]
PKGS_DIR = ROOT / "packages"

Json = Dict[str, Any]

# ------------------------------ Utilities ------------------------------


def eprintln(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def load_json(path: Path) -> Json:
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: Path, data: Json):
    tmp = path.with_suffix(".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
        f.write("\n")
    tmp.replace(path)


def render_templates(value: Any, variables: Dict[str, Any]) -> Any:
    if isinstance(value, str):

        def repl(m):
            name = m.group(1)
            return str(variables.get(name, m.group(0)))

        return re.sub(r"\$\{([^}]+)\}", repl, value)
    elif isinstance(value, dict):
        return {k: render_templates(v, variables) for k, v in value.items()}
    elif isinstance(value, list):
        return [render_templates(v, variables) for v in value]
    return value


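# Illustration (not executed; values are made up): unknown variables are left
# intact, so callers can detect partially-resolved templates by checking for
# "${" in the result:
#   render_templates("v${version}-${flavor}", {"version": "1.2"})
#   -> "v1.2-${flavor}"

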
def deep_set(o: Json, path: List[str], value: Any):
    cur = o
    for p in path[:-1]:
        if p not in cur or not isinstance(cur[p], dict):
            cur[p] = {}
        cur = cur[p]
    cur[path[-1]] = value


# ------------------------------ Merge helpers (match lib/versioning.nix) ------------------------------


def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    out = dict(a)
    for k, v in b.items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = deep_merge(out[k], v)
        else:
            out[k] = v
    return out


def merge_sources(
    base_sources: Dict[str, Any], overrides: Dict[str, Any]
) -> Dict[str, Any]:
    names = set(base_sources.keys()) | set(overrides.keys())
    result: Dict[str, Any] = {}
    for n in names:
        if n in base_sources and n in overrides:
            if isinstance(base_sources[n], dict) and isinstance(overrides[n], dict):
                result[n] = deep_merge(base_sources[n], overrides[n])
            else:
                result[n] = overrides[n]
        elif n in overrides:
            result[n] = overrides[n]
        else:
            result[n] = base_sources[n]
    return result


def merged_view(
    spec: Json, variant_name: Optional[str]
) -> Tuple[Dict[str, Any], Dict[str, Any], Json]:
    """
    Returns (merged_variables, merged_sources, target_dict_to_write).
    merged_* are used for display/prefetch; target_dict_to_write is where
    updates must be written (base or selected variant).
    """
    base_vars = spec.get("variables", {}) or {}
    base_sources = spec.get("sources", {}) or {}
    if variant_name:
        vdict = spec.get("variants", {}).get(variant_name)
        if not isinstance(vdict, dict):
            raise ValueError(f"Variant '{variant_name}' not found")
        v_vars = vdict.get("variables", {}) or {}
        v_sources = vdict.get("sources", {}) or {}
        merged_vars = dict(base_vars)
        merged_vars.update(v_vars)
        merged_srcs = merge_sources(base_sources, v_sources)
        return merged_vars, merged_srcs, vdict
    else:
        return dict(base_vars), dict(base_sources), spec


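# Illustration (hypothetical spec): for
#   spec = {"variables": {"version": "1.0"},
#           "sources": {"src": {"fetcher": "github", "tag": "v${version}"}},
#           "variants": {"beta": {"variables": {"version": "2.0b1"}}}}
# merged_view(spec, "beta") returns variables {"version": "2.0b1"}, the merged
# sources, and the "beta" dict as the write target, so edits land in the
# variant rather than the base.

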
def run_cmd(args: List[str]) -> Tuple[int, str, str]:
    try:
        p = subprocess.run(
            args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False
        )
        return p.returncode, p.stdout.strip(), p.stderr.strip()
    except Exception as e:
        return 1, "", str(e)


def run_get_stdout(args: List[str]) -> Optional[str]:
    code, out, err = run_cmd(args)
    if code != 0:
        eprintln(f"Command failed: {' '.join(args)}\n{err}")
        return None
    return out


def _nix_fakeHash_build(expr: str) -> Optional[str]:
    """
    Run `nix build --impure --expr expr` with lib.fakeHash and parse the correct
    hash from the 'got:' line in nix's error output.
    Returns the SRI hash string, or None on failure.
    """
    p = subprocess.run(
        ["nix", "build", "--impure", "--expr", expr],
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    m = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", p.stderr)
    if m:
        return m.group(1)
    eprintln(f"nix fakeHash build failed:\n{p.stderr[-600:]}")
    return None


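# The fakeHash trick relies on nix's hash-mismatch diagnostics, e.g. (abridged):
#   error: hash mismatch in fixed-output derivation '/nix/store/...':
#            specified: sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
#               got:    sha256-<real hash>
# The regex above captures the 'got:' value, which is the correct SRI hash.

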
def nix_prefetch_github(
    owner: str, repo: str, rev: str, submodules: bool = False
) -> Optional[str]:
    """
    Compute the hash that pkgs.fetchFromGitHub expects for the given revision.
    Uses nix build with lib.fakeHash to get the exact NAR hash of the unpacked
    tarball, which is what Nix stores and validates against.
    """
    sub = "true" if submodules else "false"
    expr = (
        f"let pkgs = import <nixpkgs> {{}};\n"
        f"in pkgs.fetchFromGitHub {{\n"
        f'  owner = "{owner}";\n'
        f'  repo = "{repo}";\n'
        f'  rev = "{rev}";\n'
        f"  fetchSubmodules = {sub};\n"
        f"  hash = pkgs.lib.fakeHash;\n"
        f"}}"
    )
    return _nix_fakeHash_build(expr)


def nix_prefetch_url(url: str) -> Optional[str]:
    """
    Compute the flat (non-unpacked) hash for a fetchurl source.
    Uses nix store prefetch-file which gives the same hash as pkgs.fetchurl.
    Do NOT use this for fetchFromGitHub or fetchzip — those need the NAR hash
    of the unpacked content (use nix_prefetch_github or nix_prefetch_fetchzip).
    """
    out = run_get_stdout(
        ["nix", "store", "prefetch-file", "--hash-type", "sha256", "--json", url]
    )
    if out is not None and out.strip():
        try:
            data = json.loads(out)
            if "hash" in data:
                return data["hash"]
        except Exception:
            pass

    # Fallback to legacy nix-prefetch-url
    out = run_get_stdout(["nix-prefetch-url", "--type", "sha256", url])
    if out is None:
        out = run_get_stdout(["nix-prefetch-url", url])
    if out is None:
        return None

    sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", out.strip()])
    return sri


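# Note the flat-vs-NAR distinction (hypothetical URL): for the same tarball,
#   nix store prefetch-file https://example.com/foo-1.0.tar.gz
# yields the flat file hash that pkgs.fetchurl checks, while fetchFromGitHub
# and fetchzip check the NAR hash of the unpacked tree; the two differ, hence
# the separate fakeHash-based helpers above and below.

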
def nix_prefetch_fetchzip(url: str, strip_root: bool = True) -> Optional[str]:
    """
    Compute the hash that pkgs.fetchzip expects for the given URL.
    fetchzip hashes the NAR of the UNPACKED archive, which differs from the
    flat hash of the zip file itself. Uses nix build with lib.fakeHash.
    """
    strip_attr = "true" if strip_root else "false"
    expr = (
        f"let pkgs = import <nixpkgs> {{}};\n"
        f"in pkgs.fetchzip {{\n"
        f'  url = "{url}";\n'
        f"  stripRoot = {strip_attr};\n"
        f"  hash = pkgs.lib.fakeHash;\n"
        f"}}"
    )
    return _nix_fakeHash_build(expr)


def nix_prefetch_git(url: str, rev: str) -> Optional[str]:
    """
    Compute the hash that pkgs.fetchgit expects.
    Uses nix-prefetch-git as the primary method (reliable for both commit SHAs
    and tag names). Falls back to builtins.fetchGit + nix hash path for commit
    SHAs when nix-prefetch-git is unavailable.
    """
    # Primary: nix-prefetch-git (works for both commit SHAs and tag names)
    out = run_get_stdout(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url])
    if out is not None:
        base32 = None
        try:
            data = json.loads(out)
            base32 = data.get("sha256") or data.get("hash")
        except Exception:
            lines = [l for l in out.splitlines() if l.strip()]
            if lines:
                base32 = lines[-1].strip()
        if base32:
            sri = run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", base32])
            if sri:
                return sri

    # Fallback: builtins.fetchGit + nix hash path (commit SHA only — tag names fail)
    # Only attempt if rev looks like a commit SHA (40 hex chars)
    if re.match(r"^[0-9a-f]{40}$", rev):
        expr = f'builtins.fetchGit {{ url = "{url}"; rev = "{rev}"; }}'
        store_path = run_get_stdout(["nix", "eval", "--raw", "--expr", expr])
        if store_path is not None and store_path.strip():
            hash_out = run_get_stdout(
                ["nix", "hash", "path", "--type", "sha256", store_path.strip()]
            )
            if hash_out is not None and hash_out.strip():
                return hash_out.strip()

    return None


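# nix-prefetch-git normally emits JSON (fields abridged and version-dependent):
#   {"url": "...", "rev": "<sha>", "sha256": "<base32>", ...}
# The parser above prefers the JSON 'sha256'/'hash' field and otherwise falls
# back to treating the last non-empty stdout line as a bare base32 hash.

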
def nix_prefetch_cargo_vendor(
    fetcher: str,
    src_hash: str,
    *,
    url: str = "",
    owner: str = "",
    repo: str = "",
    rev: str = "",
    subdir: str = "",
) -> Optional[str]:
    """
    Compute the cargo vendor hash for a Rust source using nix build + fakeHash.

    Builds rustPlatform.fetchCargoVendor with lib.fakeHash, parses the correct
    hash from the 'got:' line in nix's error output.

    Args:
        fetcher: "github" or "git"
        src_hash: SRI hash of the source (already known)
        url: git URL (for "git" fetcher)
        owner/repo: GitHub owner and repo (for "github" fetcher)
        rev: tag or commit rev
        subdir: optional subdirectory within the source that contains Cargo.lock

    Returns:
        SRI hash string, or None on failure.
    """
    if fetcher == "github" and owner and repo and rev and src_hash:
        src_expr = (
            f'pkgs.fetchFromGitHub {{ owner = "{owner}"; repo = "{repo}";'
            f' rev = "{rev}"; hash = "{src_hash}"; }}'
        )
    elif fetcher == "git" and url and rev and src_hash:
        # For GitHub git URLs, fetchFromGitHub is more reliable than fetchgit
        parsed = urlparse(url)
        parts = [p for p in parsed.path.split("/") if p]
        if parsed.hostname in ("github.com",) and len(parts) >= 2:
            gh_owner, gh_repo = parts[0], parts[1]
            src_expr = (
                f'pkgs.fetchFromGitHub {{ owner = "{gh_owner}"; repo = "{gh_repo}";'
                f' rev = "{rev}"; hash = "{src_hash}"; }}'
            )
        else:
            src_expr = f'pkgs.fetchgit {{ url = "{url}"; rev = "{rev}"; hash = "{src_hash}"; }}'
    else:
        return None

    subdir_attr = f'sourceRoot = "${{src.name}}/{subdir}";' if subdir else ""

    expr = (
        f"let pkgs = import <nixpkgs> {{}};\n"
        f"  src = {src_expr};\n"
        f"in pkgs.rustPlatform.fetchCargoVendor {{\n"
        f"  inherit src;\n"
        f"  {subdir_attr}\n"
        f"  hash = pkgs.lib.fakeHash;\n"
        f"}}"
    )

    p = subprocess.run(
        ["nix", "build", "--impure", "--expr", expr],
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    m = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", p.stderr)
    if m:
        return m.group(1)
    eprintln(f"nix_prefetch_cargo_vendor failed:\n{p.stderr[-600:]}")
    return None


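# Typical call (hypothetical values) for a GitHub-hosted Rust source whose
# source hash is already known:
#   nix_prefetch_cargo_vendor("github", "sha256-...", owner="acme",
#                             repo="tool", rev="v1.2.3")
# The returned SRI hash is what belongs in the source's "cargoHash" field.

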
def http_get_json(url: str, token: Optional[str] = None) -> Any:
    try:
        req = urllib.request.Request(
            url, headers={"Accept": "application/vnd.github+json"}
        )
        if token:
            req.add_header("Authorization", f"Bearer {token}")
        with urllib.request.urlopen(req, timeout=10) as resp:
            if resp.status != 200:
                return None
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        eprintln(f"HTTP error for {url}: {e.code} {e.reason}")
        return None
    except Exception as e:
        eprintln(f"Request failed for {url}: {e}")
        return None


def http_get_text(url: str) -> Optional[str]:
    try:
        # Provide a basic User-Agent to avoid some hosts rejecting the request
        req = urllib.request.Request(url, headers={"User-Agent": "version-tui/1.0"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            if resp.status != 200:
                return None
            return resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        eprintln(f"HTTP error for {url}: {e.code} {e.reason}")
        return None
    except Exception as e:
        eprintln(f"Request failed for {url}: {e}")
        return None


def gh_latest_release(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/releases/latest", token
        )
        if not data:
            return None
        return data.get("tag_name")
    except Exception as e:
        eprintln(f"latest_release failed for {owner}/{repo}: {e}")
        return None


def gh_latest_tag(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token
        )
        if not isinstance(data, list):
            return None
        tags = [
            t.get("name")
            for t in data
            if isinstance(t, dict) and "name" in t and t.get("name") is not None
        ]
        return tags[0] if tags else None
    except Exception as e:
        eprintln(f"latest_tag failed for {owner}/{repo}: {e}")
        return None


def gh_list_tags(owner: str, repo: str, token: Optional[str]) -> List[str]:
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token
        )
        return [
            str(t.get("name"))
            for t in data
            if isinstance(t, dict) and "name" in t and t.get("name") is not None
        ]
    except Exception as e:
        eprintln(f"list_tags failed for {owner}/{repo}: {e}")
        return []


def gh_head_commit(
    owner: str, repo: str, branch: Optional[str] = None
) -> Optional[str]:
    """Return the latest commit SHA for a GitHub repo, optionally restricted to a branch."""
    try:
        ref = f"refs/heads/{branch}" if branch else "HEAD"
        out = run_get_stdout(
            ["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", ref]
        )
        if not out:
            return None
        # ls-remote can return multiple lines; take the first match
        for line in out.splitlines():
            parts = line.split()
            if parts:
                return parts[0]
        return None
    except Exception as e:
        eprintln(f"head_commit failed for {owner}/{repo} (branch={branch}): {e}")
        return None


def _iso_to_date(iso: str) -> str:
    """Convert an ISO-8601 timestamp like '2026-03-02T13:32:38Z' to 'YYYY-MM-DD'."""
    if iso and len(iso) >= 10:
        return iso[:10]
    return ""


def gh_ref_date(owner: str, repo: str, ref: str, token: Optional[str]) -> str:
    """
    Return the committer date (YYYY-MM-DD) for any ref on a GitHub repo.
    Works for commit SHAs, tag names, and branch names.
    Returns empty string on failure.
    """
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/commits/{urllib.parse.quote(ref, safe='')}",
            token,
        )
        if not isinstance(data, dict):
            return ""
        iso = (
            data.get("commit", {}).get("committer", {}).get("date")
            or data.get("commit", {}).get("author", {}).get("date")
            or ""
        )
        return _iso_to_date(iso)
    except Exception:
        return ""


def gh_release_date(owner: str, repo: str, tag: str, token: Optional[str]) -> str:
    """
    Return the published date (YYYY-MM-DD) for a GitHub release by tag name.
    Falls back to gh_ref_date if the release is not found.
    """
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/releases/tags/{urllib.parse.quote(tag, safe='')}",
            token,
        )
        if isinstance(data, dict):
            iso = data.get("published_at") or data.get("created_at") or ""
            if iso:
                return _iso_to_date(iso)
    except Exception:
        pass
    return gh_ref_date(owner, repo, tag, token)


def git_commit_date(url: str, sha: str) -> str:
    """
    Return the committer date (YYYY-MM-DD) for a commit SHA on a plain git repo.
    Only works for GitHub URLs (uses the REST API). Returns '' for others.
    """
    try:
        parsed = urlparse(url)
        if parsed.hostname != "github.com":
            return ""
        parts = [p for p in parsed.path.split("/") if p]
        if len(parts) < 2:
            return ""
        owner = parts[0]
        repo = parts[1].removesuffix(".git")  # strip .git suffix if present
        return gh_ref_date(owner, repo, sha, None)
    except Exception:
        return ""


def git_branch_commit(url: str, branch: Optional[str] = None) -> Optional[str]:
    """Return the latest commit SHA for a git URL, optionally restricted to a branch."""
    try:
        ref = f"refs/heads/{branch}" if branch else "HEAD"
        out = run_get_stdout(["git", "ls-remote", url, ref])
        if not out:
            return None
        for line in out.splitlines():
            parts = line.split()
            if parts:
                return parts[0]
        return None
    except Exception as e:
        eprintln(f"git_branch_commit failed for {url} (branch={branch}): {e}")
        return None


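# For reference, `git ls-remote <url> HEAD` prints lines like
#   <40-hex-sha>\tHEAD
# and git_branch_commit()/gh_head_commit() take the SHA column of the first
# matching line.

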
def gh_tarball_url(owner: str, repo: str, ref: str) -> str:
    return f"https://codeload.github.com/{owner}/{repo}/tar.gz/{ref}"


def gh_release_tags_api(owner: str, repo: str, token: Optional[str]) -> List[str]:
    """
    Return recent release tag names for a repo using the GitHub API.
    """
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", token
        )
        return [
            str(r.get("tag_name"))
            for r in data
            if isinstance(r, dict) and "tag_name" in r and r.get("tag_name") is not None
        ]
    except Exception as e:
        eprintln(f"releases list failed for {owner}/{repo}: {e}")
        return []


# ------------------------------ Data scanning ------------------------------


def find_packages() -> List[Tuple[str, Path, bool, bool]]:
    results = []
    # Find regular packages with version.json
    for p in PKGS_DIR.rglob("version.json"):
        # name is the directory path under packages/ (e.g., raspberrypi/linux-rpi)
        rel = p.relative_to(PKGS_DIR).parent
        results.append(
            (str(rel), p, False, False)
        )  # (name, path, is_python, is_homeassistant)

    # Find Python packages with default.nix
    python_dir = PKGS_DIR / "python"
    if python_dir.exists():
        for pkg_dir in python_dir.iterdir():
            if pkg_dir.is_dir():
                nix_file = pkg_dir / "default.nix"
                if nix_file.exists():
                    # name is python/package-name
                    rel = pkg_dir.relative_to(PKGS_DIR)
                    results.append(
                        (str(rel), nix_file, True, False)
                    )  # (name, path, is_python, is_homeassistant)

    # Find Home Assistant components with default.nix
    homeassistant_dir = PKGS_DIR / "homeassistant"
    if homeassistant_dir.exists():
        for pkg_dir in homeassistant_dir.iterdir():
            if pkg_dir.is_dir():
                nix_file = pkg_dir / "default.nix"
                if nix_file.exists():
                    # Only treat as an HA component if it uses buildHomeAssistantComponent;
                    # otherwise fall through to Python package handling.
                    try:
                        nix_content = nix_file.read_text(encoding="utf-8")
                    except Exception:
                        nix_content = ""
                    rel = pkg_dir.relative_to(PKGS_DIR)
                    if "buildHomeAssistantComponent" in nix_content:
                        results.append(
                            (str(rel), nix_file, False, True)
                        )  # (name, path, is_python, is_homeassistant)
                    else:
                        # Treat as a Python package instead
                        results.append((str(rel), nix_file, True, False))

    results.sort()
    return results


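# Each entry is (name, path, is_python, is_homeassistant); illustrative rows:
#   ("raspberrypi/linux-rpi", <...>/version.json, False, False)
#   ("python/some-lib", <...>/default.nix, True, False)  # hypothetical name

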
def _extract_brace_block(content: str, keyword: str) -> Optional[str]:
    """
    Find 'keyword {' in content and return the text between the matching braces,
    handling nested braces (e.g. ${var} inside strings).
    Returns None if not found.
    """
    idx = content.find(keyword + " {")
    if idx == -1:
        idx = content.find(keyword + "{")
    if idx == -1:
        return None
    start = content.find("{", idx + len(keyword))
    if start == -1:
        return None
    depth = 0
    for i in range(start, len(content)):
        c = content[i]
        if c == "{":
            depth += 1
        elif c == "}":
            depth -= 1
            if depth == 0:
                return content[start + 1 : i]
    return None


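# Illustration: for content 'src = fetchFromGitHub { rev = "v${version}"; };'
#   _extract_brace_block(content, "fetchFromGitHub")
# returns ' rev = "v${version}"; ' with the ${version} braces balanced, where
# a naive [^}]+ regex would stop at the first '}'.

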
def _resolve_nix_str(value: str, pname: str, version: str) -> str:
    """Resolve simple Nix string interpolations like ${pname} and ${version}."""
    value = re.sub(r"\$\{pname\}", pname, value)
    value = re.sub(r"\$\{version\}", version, value)
    return value


def _extract_nix_attr(block: str, attr: str) -> str:
    """
    Extract attribute value from a Nix attribute set block.
    Handles:
        attr = "quoted string";
        attr = unquoted_ident;
    Returns empty string if not found.
    """
    # Quoted string value
    m = re.search(rf'\b{attr}\s*=\s*"([^"]*)"', block)
    if m:
        return m.group(1)
    # Unquoted identifier (e.g. repo = pname;)
    m = re.search(rf"\b{attr}\s*=\s*([A-Za-z_][A-Za-z0-9_-]*)\s*;", block)
    if m:
        return m.group(1)
    return ""


def parse_python_package(path: Path) -> Dict[str, Any]:
    """Parse a Python package's default.nix file to extract version and source information."""
    with path.open("r", encoding="utf-8") as f:
        content = f.read()

    # Extract version
    version_match = re.search(r'version\s*=\s*"([^"]+)"', content)
    version = version_match.group(1) if version_match else ""

    # Extract pname (package name)
    pname_match = re.search(r'pname\s*=\s*"([^"]+)"', content)
    pname = pname_match.group(1) if pname_match else ""

    # Create a structure similar to version.json for compatibility
    result: Dict[str, Any] = {"variables": {}, "sources": {}}

    # Only add non-empty values to variables
    if version:
        result["variables"]["version"] = version

    # Determine source name - use pname or derive from path
    source_name = pname.lower() if pname else path.parent.name.lower()

    # Try to extract brace-balanced fetchFromGitHub block (handles ${var} inside strings)
    fetch_block = _extract_brace_block(content, "fetchFromGitHub")

    # Check for fetchPypi pattern (simple [^}]+ is fine here as PyPI blocks lack ${})
    fetch_pypi_match = re.search(
        r"src\s*=\s*.*fetchPypi\s*\{([^}]+)\}", content, re.DOTALL
    )

    if fetch_block is not None:
        owner_raw = _extract_nix_attr(fetch_block, "owner")
        repo_raw = _extract_nix_attr(fetch_block, "repo")
        rev_raw = _extract_nix_attr(fetch_block, "rev")
        hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block)
        hash_value = hash_match.group(2) if hash_match else ""

        def _resolve_nix_ident(raw: str) -> str:
            """Resolve unquoted Nix identifier or string-with-interpolation to its value."""
            if raw == "pname":
                return pname
            if raw == "version":
                return version
            return _resolve_nix_str(raw, pname, version)

        owner = _resolve_nix_ident(owner_raw)
        repo = _resolve_nix_ident(repo_raw)
        rev = _resolve_nix_ident(rev_raw)

        # Create source entry
        result["sources"][source_name] = {
            "fetcher": "github",
            "owner": owner,
            "repo": repo,
            "hash": hash_value,
        }

        # Classify rev as tag or commit ref
        if rev:
            if rev.startswith("v") or "${version}" in rev_raw:
                result["sources"][source_name]["tag"] = rev
            else:
                # Branch names (master/main) and commit SHAs both go to rev
                result["sources"][source_name]["rev"] = rev

    elif fetch_pypi_match:
        fetch_block_pypi = fetch_pypi_match.group(1)

        hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block_pypi)
        hash_value = hash_match.group(2) if hash_match else ""

        # Look for GitHub info in meta section
        homepage_match = re.search(
            r'homepage\s*=\s*"https://github\.com/([^/]+)/([^"]+)"', content
        )

        if homepage_match:
            owner = homepage_match.group(1)
            repo = homepage_match.group(2)
            result["sources"][source_name] = {
                "fetcher": "github",
                "owner": owner,
                "repo": repo,
                "hash": hash_value,
                "pypi": True,
            }
            if version:
                result["sources"][source_name]["tag"] = f"v{version}"
        else:
            result["sources"][source_name] = {
                "fetcher": "pypi",
                "pname": pname,
                "version": version,
                "hash": hash_value,
            }
    else:
        # Fallback: scan whole file for GitHub or URL info
        owner_match = re.search(r'owner\s*=\s*"([^"]+)"', content)
        repo_match = re.search(r'repo\s*=\s*"([^"]+)"', content)
        rev_match = re.search(r'rev\s*=\s*"([^"]+)"', content)
        tag_match = re.search(r'tag\s*=\s*"([^"]+)"', content)
        hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', content)
        url_match = re.search(r'url\s*=\s*"([^"]+)"', content)
        homepage_match = re.search(
            r'homepage\s*=\s*"https://github\.com/([^/]+)/([^"]+)"', content
        )

        owner = owner_match.group(1) if owner_match else ""
        repo = repo_match.group(1) if repo_match else ""
        rev = rev_match.group(1) if rev_match else ""
        tag = tag_match.group(1) if tag_match else ""
        hash_value = hash_match.group(2) if hash_match else ""
        url = url_match.group(1) if url_match else ""

        if homepage_match and not (owner and repo):
            owner = homepage_match.group(1)
            repo = homepage_match.group(2)

        if owner and repo:
            result["sources"][source_name] = {
                "fetcher": "github",
                "owner": owner,
                "repo": repo,
                "hash": hash_value,
            }
            if tag:
                result["sources"][source_name]["tag"] = tag
            elif rev:
                result["sources"][source_name]["rev"] = rev
        elif url:
            result["sources"][source_name] = {
                "fetcher": "url",
                "url": url,
                "hash": hash_value,
            }
        else:
            result["sources"][source_name] = {"fetcher": "unknown", "hash": hash_value}

    return result


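# Illustrative result (hypothetical package) for a default.nix that uses
# fetchFromGitHub with pname = "foo", version = "1.2", rev = "v${version}":
#   {"variables": {"version": "1.2"},
#    "sources": {"foo": {"fetcher": "github", "owner": "...", "repo": "...",
#                        "hash": "sha256-...", "tag": "v1.2"}}}

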
def update_python_package(
    path: Path, source_name: str, updates: Dict[str, Any]
) -> bool:
    """Update a Python package's default.nix file with new version and/or hash."""
    with path.open("r", encoding="utf-8") as f:
        content = f.read()

    modified = False

    # Update version if provided
    if "version" in updates:
        new_version = updates["version"]
        content, version_count = re.subn(
            r'(version\s*=\s*)"([^"]+)"', f'\\1"{new_version}"', content
        )
        if version_count > 0:
            modified = True

    # Update hash if provided
    if "hash" in updates:
        new_hash = updates["hash"]
        # Match both sha256 and hash attributes
        content, hash_count = re.subn(
            r'(sha256|hash)\s*=\s*"([^"]+)"', f'\\1 = "{new_hash}"', content
        )
        if hash_count > 0:
            modified = True

    # Update tag if provided
    if "tag" in updates:
        new_tag = updates["tag"]
        content, tag_count = re.subn(
            r'(tag\s*=\s*)"([^"]+)"', f'\\1"{new_tag}"', content
        )
        if tag_count > 0:
            modified = True

    # Update rev if provided
    if "rev" in updates:
        new_rev = updates["rev"]
        content, rev_count = re.subn(
            r'(rev\s*=\s*)"([^"]+)"', f'\\1"{new_rev}"', content
        )
        if rev_count > 0:
            modified = True

    if modified:
        with path.open("w", encoding="utf-8") as f:
            f.write(content)

    return modified


def parse_homeassistant_component(path: Path) -> Dict[str, Any]:
    """Parse a Home Assistant component's default.nix file to extract version and source information."""
    with path.open("r", encoding="utf-8") as f:
        content = f.read()

    # Extract domain, version, and owner
    domain_match = re.search(r'domain\s*=\s*"([^"]+)"', content)
    version_match = re.search(r'version\s*=\s*"([^"]+)"', content)
    owner_match = re.search(r'owner\s*=\s*"([^"]+)"', content)

    domain = domain_match.group(1) if domain_match else ""
    version = version_match.group(1) if version_match else ""
    owner = owner_match.group(1) if owner_match else ""

    # Extract GitHub repo info
    repo_match = re.search(r'repo\s*=\s*"([^"]+)"', content)
    rev_match = re.search(r'rev\s*=\s*"([^"]+)"', content)
    tag_match = re.search(r'tag\s*=\s*"([^"]+)"', content)
    hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', content)

    repo = repo_match.group(1) if repo_match else ""
    rev = rev_match.group(1) if rev_match else ""
    tag = tag_match.group(1) if tag_match else ""
    hash_value = hash_match.group(2) if hash_match else ""

    # Create a structure similar to version.json for compatibility
    result: Dict[str, Any] = {"variables": {}, "sources": {}}

    # Only add non-empty values to variables
    if version:
        result["variables"]["version"] = version
    if domain:
        result["variables"]["domain"] = domain

    # Determine source name - use domain or directory name
    source_name = domain if domain else path.parent.name.lower()

    # Handle GitHub sources
    if owner:
        repo_name = repo if repo else source_name
        result["sources"][source_name] = {
            "fetcher": "github",
            "owner": owner,
            "repo": repo_name,
        }

        # Only add non-empty values
        if hash_value:
            result["sources"][source_name]["hash"] = hash_value

        # Handle tag or rev; resolve ${version} references
        if tag:
            result["sources"][source_name]["tag"] = _resolve_nix_str(tag, "", version)
        elif rev:
            rev_resolved = _resolve_nix_str(rev, "", version)
            # If rev was a ${version} template or equals version, treat as tag
            if "${version}" in rev or rev_resolved == version:
                result["sources"][source_name]["tag"] = rev_resolved
            else:
                result["sources"][source_name]["rev"] = rev_resolved
        elif version:  # fallback: use version as tag
            result["sources"][source_name]["tag"] = version
    else:
        # Fallback for components with no clear source info
        result["sources"][source_name] = {"fetcher": "unknown"}
        if hash_value:
            result["sources"][source_name]["hash"] = hash_value

    return result


def update_homeassistant_component(
    path: Path, source_name: str, updates: Dict[str, Any]
) -> bool:
    """Update a Home Assistant component's default.nix file with new version and/or hash."""
    with path.open("r", encoding="utf-8") as f:
        content = f.read()

    modified = False

    # Update version if provided
    if "version" in updates:
        new_version = updates["version"]
        content, version_count = re.subn(
            r'(version\s*=\s*)"([^"]+)"', f'\\1"{new_version}"', content
        )
        if version_count > 0:
            modified = True

    # Update hash if provided
    if "hash" in updates:
        new_hash = updates["hash"]
        # Match both sha256 and hash attributes in src = fetchFromGitHub { ... }
        content, hash_count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(sha256|hash)\s*=\s*"([^"]+)"([^}]*\})',
            f'\\1\\2 = "{new_hash}"\\4',
            content,
        )
        if hash_count > 0:
            modified = True

    # Update tag if provided
    if "tag" in updates:
        new_tag = updates["tag"]
        content, tag_count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(tag|rev)\s*=\s*"([^"]+)"([^}]*\})',
            f'\\1\\2 = "{new_tag}"\\4',
            content,
        )
        if tag_count == 0:  # If no tag/rev found, try to add it
            content, tag_count = re.subn(
                r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})',
                f'\\1\\2;\n    tag = "{new_tag}"\\3',
                content,
            )
        if tag_count > 0:
            modified = True

    # Update rev if provided
    if "rev" in updates:
        new_rev = updates["rev"]
        content, rev_count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(rev|tag)\s*=\s*"([^"]+)"([^}]*\})',
            f'\\1\\2 = "{new_rev}"\\4',
            content,
        )
        if rev_count == 0:  # If no rev/tag found, try to add it
            content, rev_count = re.subn(
                r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})',
                f'\\1\\2;\n    rev = "{new_rev}"\\3',
                content,
            )
        if rev_count > 0:
            modified = True

    if modified:
        with path.open("w", encoding="utf-8") as f:
            f.write(content)

    return modified


# ------------------------------ Display helpers ------------------------------


def source_display_ref(comp: Dict[str, Any], merged_vars: Dict[str, Any]) -> str:
    """
    Build a concise human-readable reference string for a source component.

    Rules per fetcher:
      github -> owner/repo@tag or owner/repo@rev[:7] (fully rendered)
      git    -> tag-or-rev[:12] (commit SHAs truncated)
      url    -> owner/repo@release-tag · filename (when vars are resolved)
                filename only (when no release tag)
                urlTemplate pattern (when vars still unresolved)
      none   -> version field or empty
    """
    fetcher = comp.get("fetcher", "none")
    rendered = render_templates(comp, merged_vars)

    if fetcher == "github":
        tag = rendered.get("tag") or ""
        rev = rendered.get("rev") or ""
        owner = rendered.get("owner") or str(merged_vars.get("owner") or "")
        repo = rendered.get("repo") or str(merged_vars.get("repo") or "")
        if tag and owner and repo:
            return f"{owner}/{repo}@{tag}"
        if tag:
            return tag
        if rev and owner and repo:
            return f"{owner}/{repo}@{rev[:7]}"
        if rev:
            return rev[:12]
        return ""

    if fetcher == "git":
        ref = rendered.get("tag") or rendered.get("rev") or comp.get("version") or ""
        # Truncate bare commit SHAs to 12 chars; keep short tags intact
        if len(ref) == 40 and all(c in "0123456789abcdef" for c in ref):
            return ref[:12]
        return ref

    if fetcher == "url":
        url = rendered.get("url") or rendered.get("urlTemplate") or ""
        if not url:
            return ""
        # If the rendered URL still contains unresolved ${…} templates, show the
        # filename portion with remaining placeholders rendered as <varname> so the
        # user sees a meaningful pattern rather than literal '${base}' strings.
        if "${" in url:
            tmpl = comp.get("urlTemplate") or comp.get("url") or url
            filename = os.path.basename(urlparse(tmpl).path) if tmpl else tmpl
            # Replace remaining ${var} with <var> for readability
            filename = re.sub(r"\$\{([^}]+)\}", r"<\1>", filename)
            return filename
        owner = str(merged_vars.get("owner") or "")
        repo = str(merged_vars.get("repo") or "")
        rp = str(merged_vars.get("releasePrefix") or "")
        rs = str(merged_vars.get("releaseSuffix") or "")
        base = str(merged_vars.get("base") or "")
        rel = str(merged_vars.get("release") or "")
        tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
        filename = os.path.basename(urlparse(url).path) if url else ""
        if owner and repo and tag and filename:
            return f"{owner}/{repo}@{tag} · {filename}"
        if filename:
            return filename
        return url

    # none / pypi / unknown – fall back to version or empty
    return str(comp.get("version") or comp.get("tag") or comp.get("rev") or "")


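# Illustrative outputs (hypothetical inputs):
#   github with tag        -> "acme/tool@v1.2.3"
#   github with 40-hex rev -> "acme/tool@1a2b3c4"
#   git with 40-hex rev    -> "1a2b3c4d5e6f"
#   url, vars resolved     -> "acme/proton@cachyos-9.0-20250101 · proton.tar.xz"

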
# ------------------------------ TUI helpers ------------------------------

# Define color pairs
COLOR_NORMAL = 1
COLOR_HIGHLIGHT = 2
COLOR_HEADER = 3
COLOR_STATUS = 4
COLOR_ERROR = 5
COLOR_SUCCESS = 6
COLOR_BORDER = 7
COLOR_TITLE = 8
COLOR_DIM = 9  # muted text used for dates / secondary info


def init_colors():
    """Initialize color pairs for the TUI."""
    curses.start_color()
    curses.use_default_colors()

    # Define color pairs
    curses.init_pair(COLOR_NORMAL, curses.COLOR_WHITE, -1)
    curses.init_pair(COLOR_HIGHLIGHT, curses.COLOR_BLACK, curses.COLOR_CYAN)
    curses.init_pair(COLOR_HEADER, curses.COLOR_CYAN, -1)
    curses.init_pair(COLOR_STATUS, curses.COLOR_YELLOW, -1)
    curses.init_pair(COLOR_ERROR, curses.COLOR_RED, -1)
    curses.init_pair(COLOR_SUCCESS, curses.COLOR_GREEN, -1)
    curses.init_pair(COLOR_BORDER, curses.COLOR_BLUE, -1)
    curses.init_pair(COLOR_TITLE, curses.COLOR_MAGENTA, -1)
    # Dim colour for secondary info like dates (white + A_DIM applied at render time)
    curses.init_pair(COLOR_DIM, curses.COLOR_WHITE, -1)


def draw_border(win, y, x, h, w):
    """Draw a border around a region of the window."""
    # Draw corners
    win.addch(y, x, curses.ACS_ULCORNER, curses.color_pair(COLOR_BORDER))
    win.addch(y, x + w - 1, curses.ACS_URCORNER, curses.color_pair(COLOR_BORDER))
    win.addch(y + h - 1, x, curses.ACS_LLCORNER, curses.color_pair(COLOR_BORDER))

    # Draw bottom-right corner safely
    try:
        win.addch(
            y + h - 1, x + w - 1, curses.ACS_LRCORNER, curses.color_pair(COLOR_BORDER)
        )
    except curses.error:
        # This is expected when trying to write to the bottom-right corner
        pass

    # Draw horizontal lines
    for i in range(1, w - 1):
        win.addch(y, x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER))
        win.addch(y + h - 1, x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER))

    # Draw vertical lines
    for i in range(1, h - 1):
        win.addch(y + i, x, curses.ACS_VLINE, curses.color_pair(COLOR_BORDER))
        win.addch(y + i, x + w - 1, curses.ACS_VLINE, curses.color_pair(COLOR_BORDER))


class ScreenBase:
    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.status = ""
        self.status_type = "normal"  # "normal", "error", "success"

    def draw_status(self, height, width):
        if self.status:
            color = COLOR_STATUS
            if self.status_type == "error":
                color = COLOR_ERROR
            elif self.status_type == "success":
                color = COLOR_SUCCESS
            self.stdscr.addstr(
                height - 1,
                0,
                self.status[: max(0, width - 1)],
                curses.color_pair(color),
            )
        # else: leave the bottom line blank when no status message

    def set_status(self, text: str, status_type="normal"):
        self.status = text
        self.status_type = status_type

    def run(self):
        raise NotImplementedError


def prompt_input(stdscr, prompt: str) -> Optional[str]:
    curses.echo()
    stdscr.addstr(prompt, curses.color_pair(COLOR_HEADER))
    stdscr.clrtoeol()
    s = stdscr.getstr().decode("utf-8")
    curses.noecho()
    return s


def show_popup(stdscr, lines: List[str], title: str = ""):
    h, w = stdscr.getmaxyx()
    box_h = min(len(lines) + 4, h - 2)
    box_w = min(max(max(len(l) for l in lines), len(title)) + 6, w - 2)
    top = (h - box_h) // 2
    left = (w - box_w) // 2
    win = curses.newwin(box_h, box_w, top, left)

    # Draw fancy border
    draw_border(win, 0, 0, box_h, box_w)

    # Add title if provided
    if title:
        title_x = (box_w - len(title)) // 2
        win.addstr(0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE))

    # Add content
    for i, line in enumerate(lines, start=1):
        if i >= box_h - 1:
            break
        win.addstr(i, 2, line[: box_w - 4], curses.color_pair(COLOR_NORMAL))

    # Add footer
    footer = "Press any key to continue"
    footer_x = (box_w - len(footer)) // 2
    win.addstr(box_h - 1, footer_x, footer, curses.color_pair(COLOR_STATUS))

    win.refresh()
    win.getch()


# ------------------------------ Screens ------------------------------


class PackagesScreen(ScreenBase):
    def __init__(self, stdscr):
        super().__init__(stdscr)
        self.packages = find_packages()
        self.idx = 0
        self.filter_mode = "all"  # "all", "regular", "python", "homeassistant"
        self.scroll_offset = 0  # Scroll offset for lists taller than the pane

    def run(self):
        while True:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()

            # Determine split layout
            left_w = max(30, min(60, w // 3))
            right_x = left_w + 1
            right_w = max(0, w - right_x)

            # Draw borders for left and right panes
            draw_border(self.stdscr, 0, 0, h - 1, left_w)
            if right_w >= 20:
                draw_border(self.stdscr, 0, right_x, h - 1, right_w)

            # Filter packages based on mode
            if self.filter_mode == "regular":
                filtered_packages = [p for p in self.packages if not p[2] and not p[3]]
            elif self.filter_mode == "python":
                filtered_packages = [p for p in self.packages if p[2]]
            elif self.filter_mode == "homeassistant":
                filtered_packages = [p for p in self.packages if p[3]]
            else:
                filtered_packages = self.packages

            # Left pane title with count and active filter
            count = len(filtered_packages)
            if self.filter_mode == "regular":
                title = f"Packages [{count}] f:filter"
            elif self.filter_mode == "python":
                title = f"Python [{count}] f:filter"
            elif self.filter_mode == "homeassistant":
                title = f"Home Assistant [{count}] f:filter"
            else:
                title = f"All Packages [{count}] f:filter"

            title_x = max(1, (left_w - len(title)) // 2)
            self.stdscr.addstr(
                0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD
            )

            # Implement scrolling for long lists
            max_rows = h - 3
            total_packages = len(filtered_packages)

            # Adjust scroll offset if needed
            if self.idx >= self.scroll_offset + max_rows:
                self.scroll_offset = self.idx - max_rows + 1
            elif self.idx < self.scroll_offset:
                self.scroll_offset = self.idx

            # Display visible packages with scroll offset
            visible_packages = filtered_packages[
                self.scroll_offset : self.scroll_offset + max_rows
            ]

            # Show scroll indicators if needed
            if self.scroll_offset > 0:
                self.stdscr.addstr(1, left_w - 3, "↑", curses.color_pair(COLOR_STATUS))
            if self.scroll_offset + max_rows < total_packages:
                self.stdscr.addstr(
                    min(1 + len(visible_packages), h - 2),
                    left_w - 3,
                    "↓",
                    curses.color_pair(COLOR_STATUS),
                )

            for i, (name, _path, is_python, is_homeassistant) in enumerate(
                visible_packages, start=0
            ):
                # Highlight the selected item
                if i + self.scroll_offset == self.idx:
                    attr = curses.color_pair(COLOR_HIGHLIGHT)
                    sel = "►"  # Use a fancier selector
                else:
                    attr = curses.color_pair(COLOR_NORMAL)
                    sel = " "

                # Type badge: [py] [ha] shown in a fixed column before name
                if is_python:
                    badge = "[py]"
                elif is_homeassistant:
                    badge = "[ha]"
                else:
                    badge = "    "

                name_col_w = max(0, left_w - 9)
                self.stdscr.addstr(1 + i, 2, f"{sel} {badge} {name[:name_col_w]}", attr)

            # Right pane: preview of selected package (non-interactive summary)
            if right_w >= 20 and filtered_packages:
                try:
                    name, path, is_python, is_homeassistant = filtered_packages[
                        self.idx
                    ]

                    # Right pane header: package name centred
                    type_badge = (
                        " [py]" if is_python else (" [ha]" if is_homeassistant else "")
                    )
                    hdr = f" {name}{type_badge} "
                    title_x = right_x + max(1, (right_w - len(hdr)) // 2)
                    self.stdscr.addstr(
                        0,
                        title_x,
                        hdr[: max(0, right_w - 2)],
                        curses.color_pair(COLOR_TITLE) | curses.A_BOLD,
                    )

                    # Show path relative to /etc/nixos
                    try:
                        rel_path = str(path.relative_to(Path("/etc/nixos")))
                    except ValueError:
                        rel_path = str(path)
                    self.stdscr.addstr(
                        1,
                        right_x + 2,
                        rel_path[: max(0, right_w - 3)],
                        curses.color_pair(COLOR_NORMAL) | curses.A_DIM,
                    )

                    # Sources header
                    self.stdscr.addstr(
                        2, right_x + 2, "Sources:", curses.color_pair(COLOR_HEADER)
                    )

                    if is_python:
                        spec = parse_python_package(path)
                    elif is_homeassistant:
                        spec = parse_homeassistant_component(path)
                    else:
                        spec = load_json(path)
                    merged_vars, merged_srcs, _ = merged_view(spec, None)
                    snames = sorted(list(merged_srcs.keys()))
                    max_src_rows = max(0, h - 6)
                    for i2, sname in enumerate(snames[:max_src_rows], start=0):
                        comp = merged_srcs[sname]
                        fetcher = comp.get("fetcher", "none")
                        display_ref = source_display_ref(comp, merged_vars)

                        # Column layout: name(16) fetcher(7) ref(rest)
                        NAME_W, FETCH_W = 16, 7
                        ref_col = right_x + 2 + NAME_W + 1 + FETCH_W + 1
                        max_ref = max(0, right_w - (NAME_W + 1 + FETCH_W + 1) - 3)
                        ref_short = display_ref[:max_ref] + (
                            "..." if len(display_ref) > max_ref else ""
                        )

                        fetcher_color = COLOR_NORMAL
                        if fetcher == "github":
                            fetcher_color = COLOR_SUCCESS
                        elif fetcher == "url":
                            fetcher_color = COLOR_STATUS
                        elif fetcher == "git":
                            fetcher_color = COLOR_HEADER

                        self.stdscr.addstr(
                            3 + i2,
                            right_x + 2,
                            f"{sname[:NAME_W]:<{NAME_W}}",
                            curses.color_pair(COLOR_NORMAL),
                        )
                        self.stdscr.addstr(
                            3 + i2,
                            right_x + 2 + NAME_W + 1,
                            f"{fetcher[:FETCH_W]:<{FETCH_W}}",
                            curses.color_pair(fetcher_color),
                        )
                        self.stdscr.addstr(
                            3 + i2,
                            ref_col,
                            ref_short[: max(0, right_w - (ref_col - right_x) - 1)],
                            curses.color_pair(COLOR_NORMAL),
                        )

                    # Hint line just above the bottom border
                    hint = "Enter: details k/j: move f: filter q: quit"
                    if h >= 4:
                        hint_x = right_x + max(1, (right_w - len(hint)) // 2)
                        self.stdscr.addstr(
                            h - 2,
                            hint_x,
                            hint[: max(0, right_w - 2)],
                            curses.color_pair(COLOR_STATUS),
                        )
                except Exception as e:
                    self.stdscr.addstr(
                        2, right_x + 2, "Error:", curses.color_pair(COLOR_ERROR)
                    )
                    self.stdscr.addstr(
                        2,
                        right_x + 9,
                        f"{e}"[: max(0, right_w - 11)],
                        curses.color_pair(COLOR_ERROR),
                    )

            self.draw_status(h, w)
            self.stdscr.refresh()
            ch = self.stdscr.getch()
            if ch in (ord("q"), 27):  # q or ESC
                return None
            elif ch in (curses.KEY_UP, ord("k")):
                self.idx = max(0, self.idx - 1)
            elif ch in (curses.KEY_DOWN, ord("j")):
                self.idx = min(len(filtered_packages) - 1, self.idx + 1)
            elif ch == curses.KEY_PPAGE:  # Page Up
                self.idx = max(0, self.idx - (h - 4))
            elif ch == curses.KEY_NPAGE:  # Page Down
                self.idx = min(len(filtered_packages) - 1, self.idx + (h - 4))
            elif ch == ord("g"):  # Go to top
                self.idx = 0
            elif ch == ord("G"):  # Go to bottom
                self.idx = max(0, len(filtered_packages) - 1)
            elif ch == ord("f"):
                # Cycle: all -> regular -> python -> homeassistant -> all
                modes = ["all", "regular", "python", "homeassistant"]
                self.filter_mode = modes[
                    (modes.index(self.filter_mode) + 1) % len(modes)
                ]
                self.idx = 0
                self.scroll_offset = 0
            elif ch in (curses.KEY_ENTER, 10, 13):
                # filtered_packages computed for this frame already reflects the
                # active filter_mode (the old re-filter here ignored the
                # homeassistant mode and mis-handled "regular").
                if not filtered_packages:
                    continue

                name, path, is_python, is_homeassistant = filtered_packages[self.idx]
                try:
                    if is_python:
                        spec = parse_python_package(path)
                    elif is_homeassistant:
                        spec = parse_homeassistant_component(path)
                    else:
                        spec = load_json(path)
                except Exception as e:
                    self.set_status(f"Failed to load {path}: {e}")
                    continue
                screen = PackageDetailScreen(
                    self.stdscr, name, path, spec, is_python, is_homeassistant
                )
                ret = screen.run()
                if ret == "reload":
                    # re-scan on save
                    self.packages = find_packages()
                    self.idx = min(self.idx, len(self.packages) - 1)


class PackageDetailScreen(ScreenBase):
    def __init__(
        self,
        stdscr,
        pkg_name: str,
        path: Path,
        spec: Json,
        is_python: bool = False,
        is_homeassistant: bool = False,
    ):
        super().__init__(stdscr)
        self.pkg_name = pkg_name
        self.path = path
        self.spec = spec
        self.is_python = is_python
        self.is_homeassistant = is_homeassistant
        # Preserve JSON insertion order for variants (do not sort alphabetically)
        self.variants = ["<base>"] + list(self.spec.get("variants", {}).keys())
        # Honour the spec's defaultVariant — pre-select it so the user lands on a
        # meaningful view immediately (e.g. proton-cachyos opens at cachyos-v4, not
        # the uninformative <base> which has no base/release variables populated).
        default = self.spec.get("defaultVariant")
        if default and default in self.variants:
            self.vidx = self.variants.index(default)
        else:
            self.vidx = 0
        self.gh_token = os.environ.get("GITHUB_TOKEN")
        self.candidates: Dict[
            str, Dict[str, str]
        ] = {}  # name -> {release, tag, commit}
        self.url_candidates: Dict[
            str, Dict[str, str]
        ] = {}  # name -> {base, release, tag}
        # initialize view
        self.recompute_view()

    def select_variant(self):
        # Recompute merged and target views when variant changes
        self.recompute_view()

    def recompute_view(self):
        # Set cursor to base or selected variant dict for manual edits
        if self.vidx == 0:
            self.cursor = self.spec
            variant_name = None
        else:
            variant_name = self.variants[self.vidx]
            self.cursor = self.spec["variants"][variant_name]
        # Compute merged view and target dict for writing
        self.merged_vars, self.merged_srcs, self.target_dict = merged_view(
            self.spec, variant_name
        )
        self.snames = sorted(list(self.merged_srcs.keys()))
        self.sidx = 0

    def fetch_candidates_for(self, name: str):
        comp = self.merged_srcs[name]
        fetcher = comp.get("fetcher", "none")
        branch = comp.get("branch") or None  # optional branch override
        c: Dict[str, str] = {
            "release": "",
            "tag": "",
            "commit": "",
            "release_date": "",
            "tag_date": "",
            "commit_date": "",
        }
        if fetcher == "github":
            owner = comp.get("owner")
            repo = comp.get("repo")
            if owner and repo:
                # Only fetch release/tag candidates when not locked to a specific branch
                if not branch:
                    r = gh_latest_release(owner, repo, self.gh_token)
                    if r:
                        c["release"] = r
                        c["release_date"] = gh_release_date(
                            owner, repo, r, self.gh_token
                        )
                    t = gh_latest_tag(owner, repo, self.gh_token)
                    if t:
                        c["tag"] = t
                        c["tag_date"] = gh_ref_date(owner, repo, t, self.gh_token)

                m = gh_head_commit(owner, repo, branch)
                if m:
                    c["commit"] = m
                    c["commit_date"] = gh_ref_date(owner, repo, m, self.gh_token)

                # Special-case raspberrypi/linux: prefer latest stable_* tag or series-specific tags
                # (only when not branch-locked, as branch-locked tracks a rolling branch via commit)
                if not branch:
                    try:
                        if owner == "raspberrypi" and repo == "linux":
                            tags_all = gh_list_tags(owner, repo, self.gh_token)
                            rendered = render_templates(comp, self.merged_vars)
                            cur_tag = str(rendered.get("tag") or "")
                            # If current tag uses stable_YYYYMMDD scheme, pick latest stable_* tag
                            if cur_tag.startswith("stable_"):
                                stable_tags = sorted(
                                    [
                                        x
                                        for x in tags_all
                                        if re.match(r"^stable_\d{8}$", x)
                                    ],
                                    reverse=True,
                                )
                                if stable_tags:
                                    new_tag = stable_tags[0]
                                    if new_tag != c["tag"]:
                                        c["tag"] = new_tag
                                        c["tag_date"] = gh_ref_date(
                                            owner, repo, new_tag, self.gh_token
                                        )
                            else:
                                # Try to pick a tag matching the current major.minor series if available
                                mm = str(self.merged_vars.get("modDirVersion") or "")
                                m2 = re.match(r"^(\d+)\.(\d+)", mm)
                                if m2:
                                    base = f"rpi-{m2.group(1)}.{m2.group(2)}"
                                    series_tags = [
                                        x
                                        for x in tags_all
                                        if (
                                            x == f"{base}.y"
                                            or x.startswith(f"{base}.y")
                                            or x.startswith(f"{base}.")
                                        )
                                    ]
                                    series_tags.sort(reverse=True)
                                    if series_tags:
                                        new_tag = series_tags[0]
                                        if new_tag != c["tag"]:
                                            c["tag"] = new_tag
                                            c["tag_date"] = gh_ref_date(
                                                owner, repo, new_tag, self.gh_token
                                            )
                    except Exception as _e:
                        # Fallback to previously computed values
                        pass
        elif fetcher == "git":
            url = comp.get("url")
            if url:
                # Special-case: CachyOS ZFS — read commit from PKGBUILD rather than
                # tracking HEAD of the repo (the repo has many branches and HEAD is
                # not necessarily what the kernel package uses).
                if (
                    self.pkg_name == "linux-cachyos"
                    and name == "zfs"
                    and "cachyos/zfs" in url
                ):
                    pkgbuild_commit = self.fetch_cachyos_zfs_commit()
                    if pkgbuild_commit:
                        c["commit"] = pkgbuild_commit
                        c["commit_date"] = git_commit_date(url, pkgbuild_commit)
                else:
                    commit = git_branch_commit(url, branch)
                    if commit:
                        c["commit"] = commit
                        c["commit_date"] = git_commit_date(url, commit)
        elif fetcher == "url":
            # Heuristic for GitHub release assets with variables in version.json (e.g., proton-cachyos)
            owner = self.merged_vars.get("owner")
            repo = self.merged_vars.get("repo")
            if owner and repo:
                tags = gh_release_tags_api(str(owner), str(repo), self.gh_token)
                prefix = str(self.merged_vars.get("releasePrefix", ""))
                suffix = str(self.merged_vars.get("releaseSuffix", ""))
                latest = next(
                    (
                        t
                        for t in tags
                        if (t and t.startswith(prefix) and t.endswith(suffix))
                    ),
                    None,
                )
                if latest:
                    c["release"] = latest
                    c["release_date"] = gh_release_date(
                        str(owner), str(repo), latest, self.gh_token
                    )
                    mid = latest
                    if prefix and mid.startswith(prefix):
                        mid = mid[len(prefix) :]
                    if suffix and mid.endswith(suffix):
                        mid = mid[: -len(suffix)]
                    parts = mid.split("-")
                    if len(parts) >= 2:
                        base, rel = parts[0], parts[-1]
                        self.url_candidates[name] = {
                            "base": base,
                            "release": rel,
                            "tag": latest,
                        }
        self.candidates[name] = c

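    # Illustration of the URL heuristic (hypothetical tag): with
    # releasePrefix="cachyos-" and releaseSuffix="", a release tag
    # "cachyos-9.0-20250101" is stripped to mid="9.0-20250101" and split into
    # base="9.0" (first part) and release="20250101" (last part).
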
    def prefetch_hash_for(self, name: str) -> Optional[str]:
        comp = self.merged_srcs[name]
        fetcher = comp.get("fetcher", "none")
        if fetcher == "github":
            owner = comp.get("owner")
            repo = comp.get("repo")
            rendered = render_templates(comp, self.merged_vars)
            ref = rendered.get("tag") or rendered.get("rev")
            submodules = bool(comp.get("submodules", False))
            if owner and repo and ref:
                # fetchFromGitHub hashes the NAR of the unpacked tarball, not the
                # raw tarball file — must use nix_prefetch_github (fakeHash method).
                return nix_prefetch_github(owner, repo, ref, submodules=submodules)
        elif fetcher == "git":
            url = comp.get("url")
            rev = comp.get("rev")
            if url and rev:
                return nix_prefetch_git(url, rev)
        elif fetcher == "url":
            rendered = render_templates(comp, self.merged_vars)
            url = rendered.get("url") or rendered.get("urlTemplate")
            if url:
                extra = comp.get("extra") or {}
                if extra.get("unpack") == "zip":
                    # fetchzip hashes the NAR of extracted content, not the zip file.
                    strip_root = extra.get("stripRoot", True)
                    return nix_prefetch_fetchzip(url, strip_root=strip_root)
                else:
                    return nix_prefetch_url(url)
        return None

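    # All nix_prefetch_* helpers used above are expected to return an SRI
    # string (e.g. "sha256-..."), which is stored verbatim in the component's
    # "hash" field; a None return signals that prefetching failed.
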
    def prefetch_cargo_hash_for(self, name: str) -> Optional[str]:
        """
        Compute the cargo vendor hash for a source component that carries a
        'cargoHash' field (or a linked 'cargoDeps' sibling source).

        Uses nix_prefetch_cargo_vendor() which builds fetchCargoVendor with
        lib.fakeHash and parses the correct hash from the error output.
        """
        comp = self.merged_srcs[name]
        fetcher = comp.get("fetcher", "none")
        src_hash = comp.get("hash", "")
        subdir = comp.get("cargoSubdir", "")

        rendered = render_templates(comp, self.merged_vars)

        if fetcher == "github":
            owner = comp.get("owner", "")
            repo = comp.get("repo", "")
            ref = rendered.get("tag") or rendered.get("rev") or ""
            if owner and repo and ref and src_hash:
                return nix_prefetch_cargo_vendor(
                    "github",
                    src_hash,
                    owner=owner,
                    repo=repo,
                    rev=ref,
                    subdir=subdir,
                )
        elif fetcher == "git":
            url = comp.get("url", "")
            rev = rendered.get("rev") or rendered.get("tag") or ""
            if url and rev and src_hash:
                return nix_prefetch_cargo_vendor(
                    "git",
                    src_hash,
                    url=url,
                    rev=rev,
                    subdir=subdir,
                )
        return None

    def _source_has_cargo(self, name: str) -> bool:
        """Return True if this source carries a cargoHash field."""
        comp = self.merged_srcs.get(name, {})
        return "cargoHash" in comp

    def _apply_cargo_hash_to_sibling(self, name: str, cargo_hash: str):
        """
        Propagate a freshly computed cargo hash to any sibling source that
        mirrors the cargoDeps pattern — a source whose only meaningful field is
        "hash" and which is meant to stay in sync with the main source's
        cargoHash.

        Detection heuristic (any match triggers the update):
        - Sibling is literally named "cargoDeps", OR
        - Sibling has no fetcher and its only field is "hash" (pure hash mirror)
        """
        ts = self.target_dict.setdefault("sources", {})
        for sibling_name, sibling in list(self.merged_srcs.items()):
            if sibling_name == name:
                continue
            has_fetcher = bool(sibling.get("fetcher"))
            non_fetcher_keys = [k for k in sibling.keys() if k != "fetcher"]
            is_cargo_deps = sibling_name == "cargoDeps"
            is_hash_only = not has_fetcher and non_fetcher_keys == ["hash"]
            if is_cargo_deps or is_hash_only:
                sw = ts.setdefault(sibling_name, {})
                sw["hash"] = cargo_hash

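    # Illustrative version.json shape this helper targets (field values are
    # hypothetical):
    #   "sources": {
    #     "app":       {"fetcher": "github", "cargoHash": "sha256-...", ...},
    #     "cargoDeps": {"hash": "sha256-..."}
    #   }
    # Updating app's cargoHash also rewrites cargoDeps.hash to match.
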
    def cachyos_suffix(self) -> str:
        if self.vidx == 0:
            return ""
        v = self.variants[self.vidx]
        mapping = {"rc": "-rc", "hardened": "-hardened", "lts": "-lts"}
        return mapping.get(v, "")

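    # For example (variant names illustrative): selecting an "rc" variant
    # yields suffix "-rc", which maps onto the upstream directory name
    # "linux-cachyos-rc" used by fetch_cachyos_linux_latest() and
    # fetch_cachyos_zfs_commit(); the base variant (vidx == 0) uses "".
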
    def fetch_cachyos_linux_latest(self, suffix: str) -> Optional[str]:
        """
        Try to determine the latest linux version from upstream:
        - Prefer .SRCINFO (preprocessed)
        - Fall back to PKGBUILD (parse the pkgver= line)
        Tries both 'CachyOS' and 'cachyos' org casing just in case.
        """
        bases = [
            "https://raw.githubusercontent.com/CachyOS/linux-cachyos/master",
            "https://raw.githubusercontent.com/cachyos/linux-cachyos/master",
        ]
        paths = [
            f"linux-cachyos{suffix}/.SRCINFO",
            f"linux-cachyos{suffix}/PKGBUILD",
        ]

        def parse_srcinfo(text: str) -> Optional[str]:
            m = re.search(r"^\s*pkgver\s*=\s*([^\s#]+)\s*$", text, re.MULTILINE)
            if not m:
                return None
            return m.group(1).strip()

        def parse_pkgbuild(text: str) -> Optional[str]:
            # Parse assignments and expand variables in pkgver:
            # build a simple env map from VAR=value lines.
            env: Dict[str, str] = {}
            for line in text.splitlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                m_assign = re.match(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", line)
                if m_assign:
                    key = m_assign.group(1)
                    val = m_assign.group(2).strip()
                    # Remove trailing comments
                    val = re.sub(r"\s+#.*$", "", val).strip()
                    # Strip surrounding quotes
                    if (val.startswith('"') and val.endswith('"')) or (
                        val.startswith("'") and val.endswith("'")
                    ):
                        val = val[1:-1]
                    env[key] = val

            m = re.search(r"^\s*pkgver\s*=\s*(.+)$", text, re.MULTILINE)
            if not m:
                return None
            raw = m.group(1).strip()
            # Strip quotes
            if (raw.startswith('"') and raw.endswith('"')) or (
                raw.startswith("'") and raw.endswith("'")
            ):
                raw = raw[1:-1]

            def expand_vars(s: str) -> str:
                def repl_braced(mb):
                    key = mb.group(1)
                    return env.get(key, mb.group(0)) or mb.group(0)

                def repl_unbraced(mu):
                    key = mu.group(1)
                    return env.get(key, mu.group(0)) or mu.group(0)

                # Expand ${var} first, then $var
                s = re.sub(r"\$\{([^}]+)\}", repl_braced, s)
                s = re.sub(r"\$([A-Za-z_][A-Za-z0-9_]*)", repl_unbraced, s)
                return s

            v = expand_vars(raw).strip()
            # Normalize rc form like 6.19.rc6 -> 6.19-rc6
            v = v.replace(".rc", "-rc")
            return v

        # Try .SRCINFO first, then PKGBUILD
        for base in bases:
            # .SRCINFO
            url = f"{base}/{paths[0]}"
            text = http_get_text(url)
            if text:
                ver = parse_srcinfo(text)
                if ver:
                    return ver.replace(".rc", "-rc")
            # PKGBUILD fallback
            url = f"{base}/{paths[1]}"
            text = http_get_text(url)
            if text:
                ver = parse_pkgbuild(text)
                if ver:
                    return ver.replace(".rc", "-rc")

        return None

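    # Illustrative input for the parsers above (values are hypothetical):
    #   .SRCINFO:  "pkgver = 6.12.4"          -> "6.12.4"
    #   PKGBUILD:  "_basever=6.13"            (collected into env)
    #              "pkgver=${_basever}.rc2"   -> expands to "6.13.rc2",
    #                                            normalized to "6.13-rc2"
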
    def linux_tarball_url_for_version(self, version: str) -> str:
        # Use the torvalds snapshot for -rc versions, stable releases from the CDN
        if "-rc" in version:
            return f"https://git.kernel.org/torvalds/t/linux-{version}.tar.gz"
        parts = version.split(".")
        major = parts[0] if parts else "6"
        major_minor = ".".join(parts[:2]) if len(parts) >= 2 else version
        ver_for_tar = major_minor if version.endswith(".0") else version
        return f"https://cdn.kernel.org/pub/linux/kernel/v{major}.x/linux-{ver_for_tar}.tar.xz"

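    # Examples of the mapping above (derived from the rules, not fetched):
    #   "6.13-rc3" -> https://git.kernel.org/torvalds/t/linux-6.13-rc3.tar.gz
    #   "6.12.4"   -> https://cdn.kernel.org/pub/linux/kernel/v6.x/linux-6.12.4.tar.xz
    #   "6.12.0"   -> https://cdn.kernel.org/pub/linux/kernel/v6.x/linux-6.12.tar.xz
    #                 (X.Y.0 releases are published as plain X.Y tarballs)
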
    def fetch_cachyos_zfs_commit(self) -> Optional[str]:
        """
        Read the CachyOS PKGBUILD for the current variant and extract the ZFS
        commit SHA from the source line:
            git+https://github.com/cachyos/zfs.git#commit=<SHA>
        Returns the commit SHA string, or None on failure.
        """
        suffix = self.cachyos_suffix()
        bases = [
            "https://raw.githubusercontent.com/CachyOS/linux-cachyos/master",
            "https://raw.githubusercontent.com/cachyos/linux-cachyos/master",
        ]
        for base in bases:
            url = f"{base}/linux-cachyos{suffix}/PKGBUILD"
            text = http_get_text(url)
            if not text:
                continue
            m = re.search(
                r"git\+https://github\.com/cachyos/zfs\.git#commit=([0-9a-f]+)",
                text,
            )
            if m:
                return m.group(1)
        return None

    def update_linux_from_pkgbuild(self, name: str):
        suffix = self.cachyos_suffix()
        latest = self.fetch_cachyos_linux_latest(suffix)
        if not latest:
            self.set_status("linux: failed to get version from PKGBUILD")
            return
        url = self.linux_tarball_url_for_version(latest)
        sri = nix_prefetch_url(url)
        if not sri:
            self.set_status("linux: prefetch failed")
            return
        ts = self.target_dict.setdefault("sources", {})
        compw = ts.setdefault(name, {})
        compw["version"] = latest
        compw["hash"] = sri
        self._refresh_merged()
        self.set_status(f"{name}: updated version to {latest} and refreshed hash")

    def _cachyos_config_nix_dir(self) -> Optional[Path]:
        """Return the config-nix/x86_64-linux dir relative to this package."""
        d = self.path.parent / "config-nix" / "x86_64-linux"
        return d if d.is_dir() else None

    def _cachyos_regen_flavors(self) -> List[str]:
        """
        Parse regen-config.sh to extract the flavor list, so the TUI stays in
        sync with whatever the script defines. Falls back to a hard-coded
        default.
        """
        script = self.path.parent / "regen-config.sh"
        if script.exists():
            text = script.read_text()
            # Match: for flavor in cachyos{-a,-b,...}; do  OR  for flavor in a b c; do
            m = re.search(r"for\s+flavor\s+in\s+(cachyos\{[^}]+\}|[^\n;]+?)\s*;", text)
            if m:
                raw = m.group(1).strip()
                # Brace expansion: cachyos{-gcc,-lto} → ["cachyos-gcc", "cachyos-lto"]
                bm = re.match(r"^(\w+)\{([^}]+)\}$", raw)
                if bm:
                    prefix = bm.group(1)
                    suffixes = [s.strip() for s in bm.group(2).split(",")]
                    return [f"{prefix}{s}" for s in suffixes]
                # Plain space-separated list
                return raw.split()
        # Hard-coded fallback matching regen-config.sh
        return [
            "cachyos-gcc",
            "cachyos-lto",
            "cachyos-lto-full",
            "cachyos-server",
            "cachyos-lts",
            "cachyos-hardened",
            "cachyos-server-lto",
            "cachyos-lts-lto",
            "cachyos-hardened-lto",
        ]

    def _flake_root(self) -> Optional[Path]:
        """Walk up from self.path to find the directory containing flake.nix."""
        d = self.path.parent
        for _ in range(10):
            if (d / "flake.nix").exists():
                return d
            parent = d.parent
            if parent == d:
                break
            d = parent
        return None

    def regen_config_nix(self):
        """
        For each flavor in regen-config.sh, run:
            nix build .#nixosConfigurations.jallen-nas.pkgs.mjallen.linuxPackages_<flavor>.kernel.kconfigToNix
                --no-link --print-out-paths
        then copy the output store path into config-nix/x86_64-linux/<flavor>.x86_64-linux.nix.
        Shows live progress in the status bar.
        """
        config_dir = self._cachyos_config_nix_dir()
        if config_dir is None:
            self.set_status("regen: config-nix/x86_64-linux/ not found")
            return
        flake_root = self._flake_root()
        if flake_root is None:
            self.set_status("regen: could not find flake.nix root")
            return
        flavors = self._cachyos_regen_flavors()
        n = len(flavors)
        errors: List[str] = []
        for i, flavor in enumerate(flavors):
            self.set_status(f"regen [{i + 1}/{n}] building {flavor}...")
            self.stdscr.refresh()
            attr = f".#nixosConfigurations.jallen-nas.pkgs.mjallen.linuxPackages_{flavor}.kernel.kconfigToNix"
            code, out, err = run_cmd(
                ["nix", "build", attr, "--no-link", "--print-out-paths"],
            )
            if code != 0 or not out:
                errors.append(flavor)
                eprintln(f"regen {flavor} failed:\n{err[-400:]}")
                continue
            store_path = out.strip().splitlines()[0].strip()
            try:
                content = Path(store_path).read_text()
            except Exception as e:
                errors.append(flavor)
                eprintln(f"regen {flavor}: read {store_path} failed: {e}")
                continue
            dest = config_dir / f"{flavor}.x86_64-linux.nix"
            try:
                dest.write_text(content)
            except Exception as e:
                errors.append(flavor)
                eprintln(f"regen {flavor}: write {dest} failed: {e}")
                continue
        if errors:
            self.set_status(f"regen: done with errors on: {', '.join(errors)}")
        else:
            self.set_status(f"regen: updated {n} config.nix files")

    def _refresh_merged(self):
        """Re-compute merged_vars/merged_srcs/target_dict without resetting sidx."""
        variant_name = None if self.vidx == 0 else self.variants[self.vidx]
        self.merged_vars, self.merged_srcs, self.target_dict = merged_view(
            self.spec, variant_name
        )
        self.snames = sorted(self.merged_srcs.keys())
        # Clamp sidx in case the source list changed
        if self.snames:
            self.sidx = min(self.sidx, len(self.snames) - 1)

    def set_ref(self, name: str, kind: str, value: str):
        # Write to the selected target dict (base or variant override).
        # tag and rev are mutually exclusive, so setting one clears the other.
        ts = self.target_dict.setdefault("sources", {})
        comp = ts.setdefault(name, {})
        if kind in ("release", "tag"):
            comp["tag"] = value
            if "rev" in comp:
                del comp["rev"]
        elif kind == "commit":
            comp["rev"] = value
            if "tag" in comp:
                del comp["tag"]
        # Refresh merged_srcs so prefetch_hash_for sees the updated ref
        self._refresh_merged()

    def save(self):
        if self.is_python or self.is_homeassistant:
            # Python packages and Home Assistant components keep their version
            # metadata in default.nix, so write the collected fields back there.
            update_fn = (
                update_python_package
                if self.is_python
                else update_homeassistant_component
            )
            for name in self.snames:
                source = self.merged_srcs[name]
                updates = {}

                # Version comes from variables; hash/tag/rev from the source
                if "version" in self.merged_vars:
                    updates["version"] = self.merged_vars["version"]
                for key in ("hash", "tag", "rev"):
                    if key in source:
                        updates[key] = source[key]

                if updates:
                    update_fn(self.path, name, updates)
            return True
        else:
            # For regular packages, save to version.json
            save_json(self.path, self.spec)
            return True

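    # Shape of the updates dict handed to update_python_package() /
    # update_homeassistant_component() by save() above (illustrative values):
    #   {"version": "1.2.3", "hash": "sha256-...", "tag": "v1.2.3"}
    # Only keys actually present on the source/variables are included.
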
    def run(self):
        while True:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()

            # Draw main border around the entire screen
            draw_border(self.stdscr, 0, 0, h - 1, w)

            # Row 0: package name + type badge centred in border
            if self.is_python:
                type_tag = " [py]"
            elif self.is_homeassistant:
                type_tag = " [ha]"
            else:
                type_tag = ""
            title = f" {self.pkg_name}{type_tag} "
            title_x = max(1, (w - len(title)) // 2)
            self.stdscr.addstr(
                0,
                title_x,
                title[: w - 2],
                curses.color_pair(COLOR_TITLE) | curses.A_BOLD,
            )

            # Row 1 left: path relative to the repo root (dim)
            try:
                rel_path = str(self.path.relative_to(ROOT))
            except ValueError:
                rel_path = str(self.path)
            self.stdscr.addstr(
                1, 2, rel_path[: w - 4], curses.color_pair(COLOR_NORMAL) | curses.A_DIM
            )

            # Row 2: Variants or Version
            DETAIL_HDR_ROW = 2  # variant/version row
            DETAIL_SEP_ROW = 3  # separator
            DETAIL_SRC_ROW = 4  # first source row

            if not self.is_python:
                self.stdscr.addstr(
                    DETAIL_HDR_ROW, 2, "Variants:", curses.color_pair(COLOR_HEADER)
                )
                x_pos = 12
                for i, v in enumerate(self.variants):
                    if x_pos >= w - 4:
                        break
                    if i > 0:
                        self.stdscr.addstr(
                            DETAIL_HDR_ROW,
                            x_pos,
                            " | ",
                            curses.color_pair(COLOR_NORMAL),
                        )
                        x_pos += 3
                    if i == self.vidx:
                        self.stdscr.addstr(
                            DETAIL_HDR_ROW,
                            x_pos,
                            f"[{v}]",
                            curses.color_pair(COLOR_HIGHLIGHT),
                        )
                    else:
                        self.stdscr.addstr(
                            DETAIL_HDR_ROW, x_pos, v, curses.color_pair(COLOR_NORMAL)
                        )
                    x_pos += len(v) + (2 if i == self.vidx else 0)
            else:
                version = self.merged_vars.get("version", "")
                self.stdscr.addstr(
                    DETAIL_HDR_ROW, 2, "Version:", curses.color_pair(COLOR_HEADER)
                )
                self.stdscr.addstr(
                    DETAIL_HDR_ROW, 11, version, curses.color_pair(COLOR_SUCCESS)
                )

            # Separator + Sources header
            for i in range(1, w - 1):
                self.stdscr.addch(
                    DETAIL_SEP_ROW, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)
                )
            self.stdscr.addstr(
                DETAIL_SEP_ROW,
                2,
                " Sources ",
                curses.color_pair(COLOR_HEADER) | curses.A_BOLD,
            )

            # Footer occupies h-4 (sep), h-3, h-2, h-1 (status);
            # latest section: separator + up to 3 content rows → y_latest = h-9
            y_latest = h - 9

            # Source rows — columns: sel+name(20) | fetcher(6) | ref
            SRC_NAME_W = 20
            SRC_FETCH_W = 6
            SRC_REF_COL = 2 + SRC_NAME_W + 1 + SRC_FETCH_W + 1  # col 30

            # Source rows fit between DETAIL_SRC_ROW and y_latest-1
            _max_src_rows = max(0, y_latest - DETAIL_SRC_ROW)
            for i, name in enumerate(self.snames[:_max_src_rows]):
                comp = self.merged_srcs[name]
                fetcher = comp.get("fetcher", "none")
                display_ref = source_display_ref(comp, self.merged_vars)
                branch = comp.get("branch") or ""
                has_cargo = "cargoHash" in comp

                # Append badge tokens to the ref string
                badges = ""
                if branch:
                    badges += f" [{branch}]"
                if has_cargo:
                    badges += " [cargo]"
                ref_text = display_ref + badges
                ref_short = ref_text[: w - SRC_REF_COL - 2] + (
                    "…" if len(ref_text) > w - SRC_REF_COL - 2 else ""
                )

                if i == self.sidx:
                    row_attr = curses.color_pair(COLOR_HIGHLIGHT)
                    sel = "►"
                else:
                    row_attr = curses.color_pair(COLOR_NORMAL)
                    sel = " "

                fetcher_color = COLOR_NORMAL
                if fetcher == "github":
                    fetcher_color = COLOR_SUCCESS
                elif fetcher == "url":
                    fetcher_color = COLOR_STATUS
                elif fetcher == "git":
                    fetcher_color = COLOR_HEADER

                row = DETAIL_SRC_ROW + i
                self.stdscr.addstr(
                    row,
                    2,
                    f"{sel} {name[: SRC_NAME_W - 2]:<{SRC_NAME_W - 2}}",
                    row_attr,
                )
                self.stdscr.addstr(
                    row,
                    2 + SRC_NAME_W,
                    f"{fetcher[:SRC_FETCH_W]:<{SRC_FETCH_W}}",
                    curses.color_pair(fetcher_color),
                )
                self.stdscr.addstr(
                    row, SRC_REF_COL, ref_short, curses.color_pair(COLOR_NORMAL)
                )

            # Draw a separator line before the latest candidates section
            for i in range(1, w - 1):
                self.stdscr.addch(
                    y_latest, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)
                )

            # Latest candidates section for the selected component (auto-fetched)
            if self.snames:
                _sel_name = self.snames[self.sidx]
                _comp = self.merged_srcs[_sel_name]
                _fetcher = _comp.get("fetcher", "none")
                # Preload candidates lazily for the selected item
                if (
                    _fetcher in ("github", "git", "url")
                    and _sel_name not in self.candidates
                ):
                    self.fetch_candidates_for(_sel_name)

                # Latest header with decoration — show branch if locked
                _branch = _comp.get("branch") or ""
                _latest_hdr = (
                    f"Latest Versions: (branch: {_branch})"
                    if _branch
                    else "Latest Versions:"
                )
                self.stdscr.addstr(
                    y_latest + 1,
                    2,
                    _latest_hdr[: w - 4],
                    curses.color_pair(COLOR_HEADER) | curses.A_BOLD,
                )

                if _fetcher in ("github", "git"):
                    _cand = self.candidates.get(_sel_name, {})
                    _dim = curses.color_pair(COLOR_DIM) | curses.A_DIM

                    def _put_cand(
                        row: int, label: str, value: str, date: str, val_color: int
                    ):
                        """Write one candidate row: Label value date"""
                        lbl_w = 9  # "Release: " etc.
                        self.stdscr.addstr(
                            row, 4, f"{label:<{lbl_w}}", curses.color_pair(COLOR_HEADER)
                        )
                        val_end = 4 + lbl_w + len(value)
                        self.stdscr.addstr(
                            row,
                            4 + lbl_w,
                            value[: w - 4 - lbl_w - 2],
                            curses.color_pair(val_color),
                        )
                        if date and val_end + 2 < w - 2:
                            self.stdscr.addstr(row, val_end + 1, date, _dim)

                    _row = y_latest + 2
                    if _cand.get("release"):
                        _put_cand(
                            _row,
                            "Release:",
                            _cand["release"],
                            _cand.get("release_date", ""),
                            COLOR_SUCCESS,
                        )
                        _row += 1
                    if _cand.get("tag"):
                        _put_cand(
                            _row,
                            "Tag:",
                            _cand["tag"],
                            _cand.get("tag_date", ""),
                            COLOR_SUCCESS,
                        )
                        _row += 1
                    if _cand.get("commit"):
                        _put_cand(
                            _row,
                            "Commit:",
                            (_cand["commit"] or "")[:12],
                            _cand.get("commit_date", ""),
                            COLOR_NORMAL,
                        )

                elif _fetcher == "url":
                    _cand_u = self.url_candidates.get(_sel_name, {}) or {}
                    _cand_r = self.candidates.get(_sel_name, {})
                    _dim = curses.color_pair(COLOR_DIM) | curses.A_DIM
                    _url_date = _cand_r.get("release_date", "")
                    _urow = y_latest + 2

                    _tag = _cand_u.get("tag") or (_cand_r.get("release") or "")
                    if _tag:
                        self.stdscr.addstr(
                            _urow, 4, "Tag: ", curses.color_pair(COLOR_HEADER)
                        )
                        self.stdscr.addstr(
                            _urow, 13, _tag[: w - 16], curses.color_pair(COLOR_SUCCESS)
                        )
                        if _url_date and 13 + len(_tag) + 2 < w - 2:
                            self.stdscr.addstr(
                                _urow, 13 + len(_tag) + 1, _url_date, _dim
                            )
                        _urow += 1

                    if _cand_u.get("base") and _cand_u.get("release"):
                        _b = _cand_u["base"]
                        _r = _cand_u["release"]
                        self.stdscr.addstr(
                            _urow,
                            4,
                            f"base={_b} release={_r}"[: w - 6],
                            curses.color_pair(COLOR_NORMAL),
                        )

                else:
                    if self.pkg_name == "linux-cachyos" and _sel_name == "linux":
                        _suffix = self.cachyos_suffix()
                        _latest = self.fetch_cachyos_linux_latest(_suffix)
                        self.stdscr.addstr(
                            y_latest + 2,
                            4,
                            "PKGBUILD version:",
                            curses.color_pair(COLOR_HEADER),
                        )
                        self.stdscr.addstr(
                            y_latest + 2,
                            21,
                            _latest or "-",
                            curses.color_pair(
                                COLOR_SUCCESS if _latest else COLOR_NORMAL
                            ),
                        )
                    elif self.pkg_name == "linux-cachyos" and _sel_name == "zfs":
                        _pkgb_commit = self.fetch_cachyos_zfs_commit()
                        _cur_rev = self.merged_srcs.get("zfs", {}).get("rev", "")
                        _dim = curses.color_pair(COLOR_DIM) | curses.A_DIM
                        self.stdscr.addstr(
                            y_latest + 2,
                            4,
                            "PKGBUILD commit:",
                            curses.color_pair(COLOR_HEADER),
                        )
                        if _pkgb_commit:
                            _same = _pkgb_commit == _cur_rev
                            _col = COLOR_NORMAL if _same else COLOR_SUCCESS
                            self.stdscr.addstr(
                                y_latest + 2,
                                21,
                                _pkgb_commit[:12],
                                curses.color_pair(_col),
                            )
                            if _same:
                                self.stdscr.addstr(
                                    y_latest + 2, 34, "(up to date)", _dim
                                )
                        else:
                            self.stdscr.addstr(
                                y_latest + 2, 21, "-", curses.color_pair(COLOR_NORMAL)
                            )
                    else:
                        self.stdscr.addstr(
                            y_latest + 2,
                            4,
                            "No candidates available",
                            curses.color_pair(COLOR_NORMAL),
                        )

            # Separator before footer
            for i in range(1, w - 1):
                self.stdscr.addch(
                    h - 4, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)
                )

            # Footer: two concise lines
            footer1 = "Enter:actions r:refresh h:hash i:url e:edit s:save"
            footer2 = "←/→:variant k/j:source Bksp:back q:quit"
            f1x = max(1, (w - len(footer1)) // 2)
            f2x = max(1, (w - len(footer2)) // 2)
            self.stdscr.addstr(
                h - 3, f1x, footer1[: w - 2], curses.color_pair(COLOR_STATUS)
            )
            self.stdscr.addstr(
                h - 2, f2x, footer2[: w - 2], curses.color_pair(COLOR_STATUS)
            )

            # Draw status at the bottom
            self.draw_status(h, w)
            self.stdscr.refresh()

            ch = self.stdscr.getch()
            if ch in (ord("q"), 27):
                return None
            elif ch == curses.KEY_BACKSPACE or ch == 127:
                return "reload"
            elif ch in (curses.KEY_LEFT,):
                self.vidx = max(0, self.vidx - 1)
                self.select_variant()
            elif ch in (curses.KEY_RIGHT,):
                self.vidx = min(len(self.variants) - 1, self.vidx + 1)
                self.select_variant()
            elif ch in (curses.KEY_UP, ord("k")):
                self.sidx = max(0, self.sidx - 1)
            elif ch in (curses.KEY_DOWN, ord("j")):
                self.sidx = min(len(self.snames) - 1, self.sidx + 1)
            elif ch in (ord("r"),):
                if self.snames:
                    name = self.snames[self.sidx]
                    comp = self.merged_srcs[name]
                    fetcher = comp.get("fetcher", "none")
                    if self.pkg_name == "linux-cachyos" and name == "linux":
                        # Show available linux version from upstream PKGBUILD (.SRCINFO)
                        suffix = self.cachyos_suffix()
                        latest = self.fetch_cachyos_linux_latest(suffix)
                        rendered = render_templates(comp, self.merged_vars)
                        cur_version = str(rendered.get("version") or "")
                        url_hint = (
                            self.linux_tarball_url_for_version(latest)
                            if latest
                            else "-"
                        )
                        lines = [
                            f"linux-cachyos ({'base' if self.vidx == 0 else self.variants[self.vidx]}):",
                            f"  current  : {cur_version or '-'}",
                            f"  available: {latest or '-'}",
                            f"  tarball  : {url_hint}",
                        ]
                        show_popup(self.stdscr, lines)
                    elif self.pkg_name == "linux-cachyos" and name == "zfs":
                        pkgbuild_commit = self.fetch_cachyos_zfs_commit()
                        cur_rev = comp.get("rev", "")
                        up_to_date = pkgbuild_commit and pkgbuild_commit == cur_rev
                        lines = [
                            f"linux-cachyos/zfs ({'base' if self.vidx == 0 else self.variants[self.vidx]}):",
                            f"  current  : {cur_rev[:12] or '-'}",
                            f"  PKGBUILD : {pkgbuild_commit[:12] if pkgbuild_commit else '-'}",
                            f"  status   : {'up to date' if up_to_date else 'update available' if pkgbuild_commit else 'unknown'}",
                        ]
                        show_popup(self.stdscr, lines)
                    else:
                        self.fetch_candidates_for(name)
                        cand = self.candidates.get(name, {})
                        branch = comp.get("branch") or ""

                        def _fmt(val: str, date: str) -> str:
                            return f"{val} {date}" if val and date else (val or "-")

                        lines = [
                            f"Candidates for {name}:"
                            + (f" (branch: {branch})" if branch else ""),
                            f"  latest release: {_fmt(cand.get('release', ''), cand.get('release_date', ''))}",
                            f"  latest tag    : {_fmt(cand.get('tag', ''), cand.get('tag_date', ''))}",
                            f"  latest commit : {_fmt(cand.get('commit', '')[:12] if cand.get('commit') else '', cand.get('commit_date', ''))}",
                        ]
                        show_popup(self.stdscr, lines)
elif ch in (ord("i"),):
|
||
# Show full rendered URL for URL-based sources
|
||
if self.snames:
|
||
name = self.snames[self.sidx]
|
||
comp = self.merged_srcs[name]
|
||
if comp.get("fetcher", "none") == "url":
|
||
rendered = render_templates(comp, self.merged_vars)
|
||
url = rendered.get("url") or rendered.get("urlTemplate") or ""
|
||
if url:
|
||
show_popup(self.stdscr, ["Full URL:", url])
|
||
else:
|
||
self.set_status("No URL available")
|
||
elif ch in (ord("h"),):
|
||
if self.snames:
|
||
name = self.snames[self.sidx]
|
||
sri = self.prefetch_hash_for(name)
|
||
if sri:
|
||
ts = self.target_dict.setdefault("sources", {})
|
||
compw = ts.setdefault(name, {})
|
||
compw["hash"] = sri
|
||
self._refresh_merged()
|
||
# If this source also has a cargoHash, recompute it now
|
||
if self._source_has_cargo(name):
|
||
self.set_status(
|
||
f"{name}: updated hash; computing cargo hash..."
|
||
)
|
||
self.stdscr.refresh()
|
||
cargo_sri = self.prefetch_cargo_hash_for(name)
|
||
if cargo_sri:
|
||
compw["cargoHash"] = cargo_sri
|
||
self._apply_cargo_hash_to_sibling(name, cargo_sri)
|
||
self._refresh_merged()
|
||
self.set_status(f"{name}: updated hash + cargo hash")
|
||
else:
|
||
self.set_status(
|
||
f"{name}: updated hash; cargo hash failed"
|
||
)
|
||
else:
|
||
self.set_status(f"{name}: updated hash")
|
||
else:
|
||
self.set_status(f"{name}: hash prefetch failed")
|
||
elif ch in (ord("e"),):
|
||
s = prompt_input(
|
||
self.stdscr, "Edit path=value (relative to selected base/variant): "
|
||
)
|
||
if s:
|
||
if "=" not in s:
|
||
self.set_status("Invalid input, expected key.path=value")
|
||
else:
|
||
k, v = s.split("=", 1)
|
||
path = [p for p in k.split(".") if p]
|
||
deep_set(self.cursor, path, v)
|
||
self.set_status(f"Set {k}={v}")
|
||
elif ch in (ord("s"),):
|
||
try:
|
||
self.save()
|
||
self.set_status("Saved.")
|
||
except Exception as e:
|
||
self.set_status(f"Save failed: {e}")
|
||
            elif ch in (curses.KEY_ENTER, 10, 13):
                if not self.snames:
                    continue
                name = self.snames[self.sidx]
                comp = self.merged_srcs[name]
                fetcher = comp.get("fetcher", "none")
                if fetcher in ("github", "git"):
                    # Ensure candidates are loaded
                    if name not in self.candidates:
                        self.fetch_candidates_for(name)
                    cand = self.candidates.get(name, {})
                    branch = comp.get("branch") or ""
                    # Present a small menu
                    items = []
                    if fetcher == "github":
                        # When branch-locked, only offer the latest commit (tags are irrelevant)
                        if branch:
                            items = [
                                (
                                    "Use latest commit (rev)",
                                    ("commit", cand.get("commit")),
                                ),
                                ("Recompute hash", ("hash", None)),
                                ("Cancel", ("cancel", None)),
                            ]
                        else:
                            items = [
                                (
                                    "Use latest release (tag)",
                                    ("release", cand.get("release")),
                                ),
                                ("Use latest tag", ("tag", cand.get("tag"))),
                                (
                                    "Use latest commit (rev)",
                                    ("commit", cand.get("commit")),
                                ),
                                ("Recompute hash", ("hash", None)),
                                ("Cancel", ("cancel", None)),
                            ]
                    else:
                        items = [
                            ("Use latest commit (rev)", ("commit", cand.get("commit"))),
                            ("Recompute hash", ("hash", None)),
                            ("Cancel", ("cancel", None)),
                        ]
                    # Inject the cargo hash option before Cancel when applicable
                    has_cargo = self._source_has_cargo(name)
                    if has_cargo:
                        items = [item for item in items if item[1][0] != "cancel"] + [
                            ("Recompute cargo hash", ("cargo_hash", None)),
                            ("Cancel", ("cancel", None)),
                        ]
                    # Build the header with current and available refs
                    rendered = render_templates(comp, self.merged_vars)
                    cur_tag = rendered.get("tag") or ""
                    cur_rev = rendered.get("rev") or ""
                    cur_version = rendered.get("version") or ""
                    if cur_tag:
                        current_str = f"current: tag={cur_tag}"
                    elif cur_rev:
                        current_str = f"current: rev={cur_rev[:12]}"
                    elif cur_version:
                        current_str = f"current: version={cur_version}"
                    else:
                        current_str = "current: -"
                    if branch:
                        current_str += f" (branch: {branch})"
                    cur_cargo = comp.get("cargoHash", "")

                    def _av(val: str, date: str) -> str:
                        v = val or "-"
                        return f"{v} {date}" if val and date else v

                    header_lines = [
                        current_str,
                        "available:",
                        f"  release : {_av(cand.get('release', ''), cand.get('release_date', ''))}",
                        f"  tag     : {_av(cand.get('tag', ''), cand.get('tag_date', ''))}",
                        f"  commit  : {_av((cand.get('commit') or '')[:12], cand.get('commit_date', ''))}",
                    ]
                    if has_cargo:
                        header_lines.append(
                            f"cargoHash: {cur_cargo[:32] + '...' if len(cur_cargo) > 32 else cur_cargo or '-'}"
                        )
                    choice = select_menu(
                        self.stdscr,
                        f"Actions for {name}",
                        [label for label, _ in items],
                        header=header_lines,
                    )
                    if choice is not None:
                        kind, val = items[choice][1]
                        if kind in ("release", "tag", "commit"):
                            if val:
                                self.set_ref(name, kind, val)
                                # Update the source hash
                                sri = self.prefetch_hash_for(name)
                                if sri:
                                    ts = self.target_dict.setdefault("sources", {})
                                    compw = ts.setdefault(name, {})
                                    compw["hash"] = sri
                                    self._refresh_merged()
                                    # Also update the cargo hash if applicable
                                    if has_cargo:
                                        self.set_status(
                                            f"{name}: set {kind}, hashing (src)..."
                                        )
                                        self.stdscr.refresh()
                                        cargo_sri = self.prefetch_cargo_hash_for(name)
                                        if cargo_sri:
                                            ts = self.target_dict.setdefault(
                                                "sources", {}
                                            )
                                            compw = ts.setdefault(name, {})
                                            compw["cargoHash"] = cargo_sri
                                            self._apply_cargo_hash_to_sibling(
                                                name, cargo_sri
                                            )
                                            self._refresh_merged()
                                            self.set_status(
                                                f"{name}: set {kind}, updated src + cargo hash"
                                            )
                                        else:
                                            self.set_status(
                                                f"{name}: set {kind}, updated src hash; cargo hash failed"
                                            )
                                    else:
                                        self.set_status(
                                            f"{name}: set {kind} and updated hash"
                                        )
                                else:
                                    self.set_status(f"{name}: hash prefetch failed")
                            else:
                                self.set_status(f"No candidate {kind}")
                        elif kind == "hash":
                            sri = self.prefetch_hash_for(name)
                            if sri:
                                ts = self.target_dict.setdefault("sources", {})
                                compw = ts.setdefault(name, {})
                                compw["hash"] = sri
                                self._refresh_merged()
                                self.set_status(f"{name}: updated hash")
                            else:
                                self.set_status("hash prefetch failed")
                        elif kind == "cargo_hash":
                            self.set_status(f"{name}: computing cargo hash...")
                            self.stdscr.refresh()
                            cargo_sri = self.prefetch_cargo_hash_for(name)
                            if cargo_sri:
                                ts = self.target_dict.setdefault("sources", {})
                                compw = ts.setdefault(name, {})
                                compw["cargoHash"] = cargo_sri
                                self._apply_cargo_hash_to_sibling(name, cargo_sri)
                                self._refresh_merged()
                                self.set_status(f"{name}: updated cargo hash")
                            else:
                                self.set_status(
                                    f"{name}: cargo hash computation failed"
                                )
                        else:
                            pass
elif fetcher == "url":
|
||
# Offer latest release update (for proton-cachyos-like schemas) and/or hash recompute
|
||
cand = self.url_candidates.get(name)
|
||
menu_items: List[
|
||
Tuple[str, Tuple[str, Optional[Dict[str, str]]]]
|
||
] = []
|
||
if cand and cand.get("base") and cand.get("release"):
|
||
menu_items.append(
|
||
(
|
||
"Use latest release (update variables.base/release)",
|
||
("update_vars", cand),
|
||
)
|
||
)
|
||
menu_items.append(("Recompute hash (prefetch)", ("hash", None)))
|
||
menu_items.append(("Cancel", ("cancel", None)))
|
||
|
||
# Build header with current and available release info
|
||
base = str(self.merged_vars.get("base") or "")
|
||
rel = str(self.merged_vars.get("release") or "")
|
||
rp = str(self.merged_vars.get("releasePrefix") or "")
|
||
rs = str(self.merged_vars.get("releaseSuffix") or "")
|
||
current_tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
|
||
if current_tag:
|
||
current_str = f"current: {current_tag}"
|
||
elif base or rel:
|
||
current_str = (
|
||
f"current: base={base or '-'} release={rel or '-'}"
|
||
)
|
||
else:
|
||
current_str = "current: -"
|
||
header_lines = [
|
||
current_str,
|
||
f"available: tag={(cand.get('tag') or '-') if cand else '-'} base={(cand.get('base') or '-') if cand else '-'} release={(cand.get('release') or '-') if cand else '-'}",
|
||
]
|
||
choice = select_menu(
|
||
self.stdscr,
|
||
f"Actions for {name}",
|
||
[label for label, _ in menu_items],
|
||
header=header_lines,
|
||
)
|
||
if choice is not None:
|
||
kind, payload = menu_items[choice][1]
|
||
if kind == "update_vars" and isinstance(payload, dict):
|
||
# Write variables into selected base/variant dict
|
||
vars_dict = self.target_dict.setdefault("variables", {})
|
||
vars_dict["base"] = payload["base"]
|
||
vars_dict["release"] = payload["release"]
|
||
# Recompute merged view to reflect new variables
|
||
self.recompute_view()
|
||
# Prefetch and update hash
|
||
sri = self.prefetch_hash_for(name)
|
||
if sri:
|
||
ts = self.target_dict.setdefault("sources", {})
|
||
compw = ts.setdefault(name, {})
|
||
compw["hash"] = sri
|
||
self.set_status(
|
||
f"{name}: updated to {payload['base']}.{payload['release']} and refreshed hash"
|
||
)
|
||
else:
|
||
self.set_status(
|
||
"hash prefetch failed after variable update"
|
||
)
|
||
elif kind == "hash":
|
||
sri = self.prefetch_hash_for(name)
|
||
if sri:
|
||
ts = self.target_dict.setdefault("sources", {})
|
||
compw = ts.setdefault(name, {})
|
||
compw["hash"] = sri
|
||
self.set_status(f"{name}: updated hash")
|
||
else:
|
||
self.set_status("hash prefetch failed")
|
||
else:
|
||
pass
|
||
                else:
                    if self.pkg_name == "linux-cachyos" and name == "linux":
                        # Offer update of the linux version from upstream PKGBUILD (.SRCINFO)
                        suffix = self.cachyos_suffix()
                        latest = self.fetch_cachyos_linux_latest(suffix)
                        rendered = render_templates(comp, self.merged_vars)
                        cur_version = str(rendered.get("version") or "")
                        regen_flavors = self._cachyos_regen_flavors()
                        header_lines = [
                            f"current: version={cur_version or '-'}",
                            f"available: version={latest or '-'}",
                            f"regen flavors ({len(regen_flavors)}): {', '.join(regen_flavors[:4])}{'...' if len(regen_flavors) > 4 else ''}",
                        ]
                        opts = []
                        if latest:
                            opts.append(
                                f"Update linux version to {latest} from PKGBUILD (.SRCINFO)"
                            )
                        else:
                            opts.append("Update linux version from PKGBUILD (.SRCINFO)")
                        opts.append(
                            f"Regen all config.nix files ({len(regen_flavors)} flavors)"
                        )
                        opts.append("Cancel")
                        choice = select_menu(
                            self.stdscr,
                            f"Actions for {name}",
                            opts,
                            header=header_lines,
                        )
                        if choice is not None:
                            chosen = opts[choice]
                            if chosen.startswith("Update linux version") and latest:
                                self.update_linux_from_pkgbuild(name)
                            elif chosen.startswith("Regen all config.nix"):
                                self.regen_config_nix()
                    elif self.pkg_name == "linux-cachyos" and name == "zfs":
                        # The ZFS commit is pinned in the PKGBUILD — read it from there
                        pkgbuild_commit = self.fetch_cachyos_zfs_commit()
                        cur_rev = str(comp.get("rev") or "")
                        header_lines = [
                            f"current : {cur_rev[:12] or '-'}",
                            f"PKGBUILD: {pkgbuild_commit[:12] if pkgbuild_commit else '-'}",
                        ]
                        opts = []
                        if pkgbuild_commit:
                            opts.append(
                                f"Update to PKGBUILD commit ({pkgbuild_commit[:12]})"
                            )
                        opts.append("Recompute hash")
                        opts.append("Cancel")
                        choice = select_menu(
                            self.stdscr,
                            f"Actions for {name}",
                            opts,
                            header=header_lines,
                        )
                        if choice is not None:
                            chosen = opts[choice]
                            if (
                                chosen.startswith("Update to PKGBUILD")
                                and pkgbuild_commit
                            ):
                                self.set_status("zfs: fetching commit and hash...")
                                self.stdscr.refresh()
                                ts = self.target_dict.setdefault("sources", {})
                                compw = ts.setdefault(name, {})
                                compw["rev"] = pkgbuild_commit
                                self._refresh_merged()
                                sri = self.prefetch_hash_for(name)
                                if sri:
                                    compw["hash"] = sri
                                    self._refresh_merged()
                                    self.set_status(
                                        f"zfs: updated to {pkgbuild_commit[:12]} and refreshed hash"
                                    )
                                else:
                                    self.set_status(
                                        "zfs: updated rev but hash prefetch failed"
                                    )
                            elif chosen == "Recompute hash":
                                self.set_status("zfs: recomputing hash...")
                                self.stdscr.refresh()
                                sri = self.prefetch_hash_for(name)
                                if sri:
                                    ts = self.target_dict.setdefault("sources", {})
                                    compw = ts.setdefault(name, {})
                                    compw["hash"] = sri
                                    self._refresh_merged()
                                    self.set_status("zfs: updated hash")
                                else:
                                    self.set_status("zfs: hash prefetch failed")
                    else:
                        show_popup(
                            self.stdscr,
                            [
                                f"{name}: fetcher={fetcher}",
                                "Use 'e' to edit fields manually.",
                            ],
                        )
            else:
                pass


def select_menu(
    stdscr, title: str, options: List[str], header: Optional[List[str]] = None
) -> Optional[int]:
    idx = 0
    while True:
        stdscr.clear()
        h, w = stdscr.getmaxyx()

        # Calculate menu dimensions — account for title, header lines, and options
        max_opt_len = max((len(opt) + 4 for opt in options), default=0)
        max_hdr_len = (
            max((len(str(line)) + 4 for line in header), default=0) if header else 0
        )
        title_len = len(title) + 4
        menu_width = min(w - 4, max(44, title_len, max_opt_len, max_hdr_len))
        menu_height = min(h - 4, len(options) + (len(header) + 1 if header else 0) + 4)

        # Calculate the position for a centered menu
        start_x = (w - menu_width) // 2
        start_y = (h - menu_height) // 2

        # Draw border around the menu
        draw_border(stdscr, start_y, start_x, menu_height, menu_width)

        # Draw title
        title_x = start_x + (menu_width - len(title)) // 2
        stdscr.addstr(
            start_y,
            title_x,
            f" {title} ",
            curses.color_pair(COLOR_TITLE) | curses.A_BOLD,
        )

        # Draw header if provided
        y = start_y + 1
        if header:
            for line in header:
                if y >= start_y + menu_height - 2:
                    break
                # Ensure we don't write beyond the menu width
                line_str = str(line)
                if len(line_str) > menu_width - 4:
                    line_str = line_str[: menu_width - 7] + "..."
                stdscr.addstr(
                    y,
                    start_x + 2,
                    line_str,
                    curses.color_pair(COLOR_HEADER),
                )
                y += 1

            # Add a separator line after the header
            for i in range(1, menu_width - 1):
                stdscr.addch(
                    y, start_x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)
                )
            y += 1

        # Draw options
        options_start_y = y
        max_visible_options = max(1, start_y + menu_height - options_start_y - 1)
        visible_options = min(len(options), max_visible_options)

        for i, opt in enumerate(options[:visible_options]):
            # Highlight the selected option
            if i == idx:
                attr = curses.color_pair(COLOR_HIGHLIGHT)
                sel = "►"  # Use a fancier selector
            else:
                attr = curses.color_pair(COLOR_NORMAL)
                sel = " "

            # Truncate long options to fit in the menu
            opt_text = f"{sel} {opt}"
            if len(opt_text) > menu_width - 4:
                opt_text = opt_text[: menu_width - 7] + "..."
            stdscr.addstr(options_start_y + i, start_x + 2, opt_text, attr)

        # Draw footer
        footer = "Enter: select | Backspace/Esc: cancel"
        footer_x = start_x + (menu_width - len(footer)) // 2
        stdscr.addstr(
            start_y + menu_height - 1, footer_x, footer, curses.color_pair(COLOR_STATUS)
        )

        stdscr.refresh()
        ch = stdscr.getch()
        if ch in (curses.KEY_UP, ord("k")):
            idx = max(0, idx - 1)
        elif ch in (curses.KEY_DOWN, ord("j")):
            idx = min(len(options) - 1, idx + 1)
        elif ch in (curses.KEY_ENTER, 10, 13):
            return idx
        elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 27:
            return None


# ------------------------------ main ------------------------------


def main(stdscr):
    curses.curs_set(0)  # Hide cursor
    stdscr.nodelay(False)  # Blocking input

    # Initialize colors
    if curses.has_colors():
        init_colors()

    try:
        screen = PackagesScreen(stdscr)
        screen.run()
    except Exception:
        curses.endwin()
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    curses.wrapper(main)