Files
nix-config/scripts/version_tui.py
mjallen18 d17d096a97 versions
2026-03-04 13:43:18 -06:00

2621 lines
103 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Interactive TUI for browsing and updating unified version.json files.
Features:
- Scans packages/**/version.json and lists all packages
- Per-package view:
- Choose base or any variant
- List all sources/components with current ref (tag/rev/url/version) and hash
- For GitHub sources: fetch candidates (latest release tag, latest tag, latest commit)
- For Git sources: fetch latest commit (HEAD)
- For URL sources: recompute hash (url/urlTemplate with rendered variables)
- Actions on a component:
- Update to one of the candidates (sets tag or rev) and optionally re-hash
- Recompute hash (prefetch)
- Edit any field via path=value (e.g., variables.version=2025.07)
- Writes changes back to version.json
Dependencies:
- Standard library + external CLI tools:
- nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri`
- nix-prefetch-git
- git
- Optional: GITHUB_TOKEN env var to increase GitHub API rate limits
Usage:
scripts/version_tui.py
Controls:
- Up/Down to navigate lists
- Enter to select
- Backspace to go back
- q to quit
- On component screen:
r = refresh candidates
h = recompute hash (prefetch)
e = edit arbitrary field (path=value)
s = save to disk
"""
import curses
import json
import os
import re
import subprocess
import sys
import traceback
import urllib.request
import urllib.error
from urllib.parse import urlparse
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Directory that find_packages() scans for version.json / default.nix files.
PKGS_DIR = ROOT / "packages"
# Loose alias for a decoded JSON object (e.g. the contents of a version.json).
Json = Dict[str, Any]
# ------------------------------ Utilities ------------------------------
def eprintln(*args, **kwargs):
    """Print ``args`` to standard error, forwarding ``kwargs`` to ``print``."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def load_json(path: Path) -> "Json":
    """Read ``path`` as UTF-8 text and return the decoded JSON document."""
    text = path.read_text(encoding="utf-8")
    return json.loads(text)
def save_json(path: Path, data: "Json"):
    """
    Serialize ``data`` as indented JSON (plus a trailing newline) into ``path``.

    The document is written to a sibling file with a ``.tmp`` suffix first and
    then moved over ``path`` with ``Path.replace``, so ``path`` never holds a
    partially written document. If serialization, writing, or the final move
    fails, the temporary file is removed and the original exception is re-raised.
    """
    tmp = path.with_suffix(".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            f.write("\n")
        tmp.replace(path)
    except BaseException:
        # Do not leave a stale, half-written temp file next to the real one.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
def render_templates(value: Any, variables: Dict[str, Any]) -> Any:
    """
    Recursively expand ``${name}`` placeholders found in ``value``.

    Strings have every ``${name}`` replaced by ``str(variables[name])``;
    placeholders whose name is not in ``variables`` are left as written.
    Dicts and lists are rebuilt with their values/items rendered the same way.
    Any other value is returned unchanged.
    """
    if isinstance(value, dict):
        return {key: render_templates(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [render_templates(item, variables) for item in value]
    if not isinstance(value, str):
        return value

    def substitute(match):
        # Fall back to the untouched placeholder text for unknown names.
        placeholder = match.group(0)
        return str(variables.get(match.group(1), placeholder))

    return re.sub(r"\$\{([^}]+)\}", substitute, value)
def deep_set(o: "Json", path: List[str], value: Any):
    """
    Assign ``value`` at the nested key ``path`` inside ``o``, in place.

    Every intermediate key that is missing, or that currently holds something
    other than a dict, is replaced by a fresh empty dict on the way down.
    """
    node = o
    for key in path[:-1]:
        child = node.get(key)
        if not isinstance(child, dict):
            child = {}
            node[key] = child
        node = child
    node[path[-1]] = value
# ------------------------------ Merge helpers (match lib/versioning.nix) ------------------------------
def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a new dict holding ``a`` overlaid with ``b``.

    When both sides hold a dict under the same key the two are merged
    recursively; in every other case the value from ``b`` wins. ``a`` itself
    is not modified.
    """
    merged = {**a}
    for key, incoming in b.items():
        existing = merged.get(key)
        if isinstance(existing, dict) and isinstance(incoming, dict):
            merged[key] = deep_merge(existing, incoming)
        else:
            merged[key] = incoming
    return merged
def merge_sources(
    base_sources: Dict[str, Any], overrides: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Combine the base ``sources`` mapping with a variant's overrides.

    For a source name present on both sides, two dicts are combined with
    ``deep_merge``; otherwise the override replaces the base entry. Names that
    exist on only one side are copied through unchanged.

    The result keeps a deterministic order: base sources in their original
    order, followed by sources that only the overrides define. (Iterating a
    ``set`` of names would make the order vary from run to run.)
    """
    result: Dict[str, Any] = dict(base_sources)
    for name, override in overrides.items():
        base = result.get(name)
        if isinstance(base, dict) and isinstance(override, dict):
            result[name] = deep_merge(base, override)
        else:
            result[name] = override
    return result
def merged_view(
    spec: "Json", variant_name: Optional[str]
) -> Tuple[Dict[str, Any], Dict[str, Any], "Json"]:
    """
    Returns (merged_variables, merged_sources, target_dict_to_write)

    merged_* are used for display/prefetch; target_dict_to_write is where
    updates must be written (base or selected variant).

    With no ``variant_name`` the base variables/sources are returned as shallow
    copies and the target is ``spec`` itself. With a variant, its variables
    override the base variables key by key and its sources are combined with
    ``merge_sources``; the target is the variant's own dict.

    Missing or null ``variables`` / ``sources`` / ``variants`` entries are
    treated as empty.

    Raises:
        ValueError: if ``variant_name`` does not name a dict under ``variants``.
    """
    base_vars = spec.get("variables") or {}
    base_sources = spec.get("sources") or {}
    if not variant_name:
        return dict(base_vars), dict(base_sources), spec

    # "variants": null must behave like a missing key, not crash on .get().
    variants = spec.get("variants") or {}
    vdict = variants.get(variant_name) if isinstance(variants, dict) else None
    if not isinstance(vdict, dict):
        raise ValueError(f"Variant '{variant_name}' not found")

    v_vars = vdict.get("variables") or {}
    v_sources = vdict.get("sources") or {}
    merged_vars = {**base_vars, **v_vars}
    merged_srcs = merge_sources(base_sources, v_sources)
    return merged_vars, merged_srcs, vdict
def run_cmd(args: List[str]) -> Tuple[int, str, str]:
    """
    Run ``args`` as a subprocess and capture its output as text.

    Returns ``(returncode, stdout, stderr)`` with surrounding whitespace
    stripped from both streams. If the process cannot be run at all, returns
    ``(1, "", <exception text>)`` instead of raising.
    """
    try:
        completed = subprocess.run(
            args, capture_output=True, text=True, check=False
        )
        stdout, stderr = completed.stdout.strip(), completed.stderr.strip()
        return completed.returncode, stdout, stderr
    except Exception as exc:
        return 1, "", str(exc)
def run_get_stdout(args: List[str]) -> Optional[str]:
    """
    Run ``args`` via ``run_cmd`` and return its stdout.

    On a non-zero exit status the command line and its stderr are reported
    through ``eprintln`` and ``None`` is returned.
    """
    status, stdout, stderr = run_cmd(args)
    if status == 0:
        return stdout
    eprintln(f"Command failed: {' '.join(args)}\n{stderr}")
    return None
def nix_prefetch_url(url: str) -> Optional[str]:
    """
    Prefetch ``url`` with Nix and return its sha256 hash in SRI form.

    Tries ``nix store prefetch-file --json`` first and returns its ``hash``
    field. If that produces nothing usable, falls back to ``nix-prefetch-url``
    (with and then without ``--type sha256``) and converts the printed hash
    with ``nix hash to-sri``. Returns ``None`` when every attempt fails.
    """
    modern = run_get_stdout(
        ["nix", "store", "prefetch-file", "--hash-type", "sha256", "--json", url]
    )
    if (modern or "").strip():
        try:
            parsed = json.loads(modern)
            if "hash" in parsed:
                return parsed["hash"]
        except Exception:
            # Unparseable output: fall through to the legacy tool.
            pass

    legacy = None
    for command in (
        ["nix-prefetch-url", "--type", "sha256", url],
        ["nix-prefetch-url", url],
    ):
        legacy = run_get_stdout(command)
        if legacy is not None:
            break
    if legacy is None:
        return None

    # The legacy tool prints a bare hash; convert it to SRI.
    return run_get_stdout(
        ["nix", "hash", "to-sri", "--type", "sha256", legacy.strip()]
    )
def nix_prefetch_git(url: str, rev: str) -> Optional[str]:
    """
    Return the sha256 hash for ``rev`` of the git repository at ``url``.

    First evaluates ``builtins.fetchGit`` with ``nix eval`` and hashes the
    resulting path with ``nix hash path``. If that yields nothing, falls back
    to ``nix-prefetch-git``: its output is read as JSON (``sha256`` or
    ``hash`` key) or, failing that, its last non-blank line is used, and the
    value is converted with ``nix hash to-sri``. Returns ``None`` on failure.
    """
    expr = f'builtins.fetchGit {{ url = "{url}"; rev = "{rev}"; }}'
    store_path = run_get_stdout(["nix", "eval", "--raw", "--expr", expr])
    if (store_path or "").strip():
        path_hash = run_get_stdout(
            ["nix", "hash", "path", "--type", "sha256", store_path.strip()]
        )
        if (path_hash or "").strip():
            return path_hash.strip()

    # Fallback to nix-prefetch-git
    raw = run_get_stdout(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url])
    if raw is None:
        return None

    digest = None
    try:
        info = json.loads(raw)
        digest = info.get("sha256") or info.get("hash")
    except Exception:
        # Not a JSON object: take the last non-blank output line instead.
        nonblank = [ln for ln in raw.splitlines() if ln.strip()]
        if nonblank:
            digest = nonblank[-1].strip()
    if not digest:
        return None

    return run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", digest])
def _fetch_from_github_expr(owner: str, repo: str, rev: str, src_hash: str) -> str:
    """Render the Nix ``pkgs.fetchFromGitHub`` call for one pinned source."""
    return (
        f'pkgs.fetchFromGitHub {{ owner = "{owner}"; repo = "{repo}";'
        f' rev = "{rev}"; hash = "{src_hash}"; }}'
    )


def nix_prefetch_cargo_vendor(
    fetcher: str,
    src_hash: str,
    *,
    url: str = "",
    owner: str = "",
    repo: str = "",
    rev: str = "",
    subdir: str = "",
) -> Optional[str]:
    """
    Compute the cargo vendor hash for a Rust source using nix build + fakeHash.

    Builds rustPlatform.fetchCargoVendor with lib.fakeHash, parses the correct
    hash from the 'got:' line in nix's error output.

    The build is run through ``run_cmd`` so that a missing or unrunnable
    ``nix`` binary yields ``None`` (with a message on stderr) rather than an
    exception, matching the other prefetch helpers.

    Args:
        fetcher: "github" or "git"
        src_hash: SRI hash of the source (already known)
        url: git URL (for "git" fetcher)
        owner/repo: GitHub owner and repo (for "github" fetcher)
        rev: tag or commit rev
        subdir: optional subdirectory within the source that contains Cargo.lock

    Returns:
        SRI hash string, or None on failure (including missing arguments or
        an unsupported ``fetcher``).
    """
    if not (rev and src_hash):
        return None

    if fetcher == "github" and owner and repo:
        src_expr = _fetch_from_github_expr(owner, repo, rev, src_hash)
    elif fetcher == "git" and url:
        # For GitHub git URLs, fetchFromGitHub is more reliable than fetchgit
        parsed = urlparse(url)
        parts = [p for p in parsed.path.split("/") if p]
        if parsed.hostname in ("github.com",) and len(parts) >= 2:
            src_expr = _fetch_from_github_expr(parts[0], parts[1], rev, src_hash)
        else:
            src_expr = f'pkgs.fetchgit {{ url = "{url}"; rev = "{rev}"; hash = "{src_hash}"; }}'
    else:
        return None

    subdir_attr = f'sourceRoot = "${{src.name}}/{subdir}";' if subdir else ""
    expr = (
        f"let pkgs = import <nixpkgs> {{}};\n"
        f" src = {src_expr};\n"
        f"in pkgs.rustPlatform.fetchCargoVendor {{\n"
        f" inherit src;\n"
        f" {subdir_attr}\n"
        f" hash = pkgs.lib.fakeHash;\n"
        f"}}"
    )

    # The build is expected to fail on the fake hash; the real one is in stderr.
    _code, _out, err = run_cmd(["nix", "build", "--impure", "--expr", expr])
    m = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", err)
    if m:
        return m.group(1)
    eprintln(f"nix_prefetch_cargo_vendor failed:\n{err[-600:]}")
    return None
def http_get_json(url: str, token: Optional[str] = None) -> Any:
    """
    GET ``url`` and return the decoded JSON body.

    The request carries the GitHub JSON ``Accept`` header and, when ``token``
    is given, a ``Bearer`` authorization header. Returns ``None`` for a
    non-200 status or on any error; errors are reported through ``eprintln``.
    """
    headers = {"Accept": "application/vnd.github+json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    try:
        request = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(request, timeout=10) as response:
            if response.status == 200:
                return json.loads(response.read().decode("utf-8"))
            return None
    except urllib.error.HTTPError as e:
        eprintln(f"HTTP error for {url}: {e.code} {e.reason}")
        return None
    except Exception as e:
        eprintln(f"Request failed for {url}: {e}")
        return None
def http_get_text(url: str) -> Optional[str]:
    """
    GET ``url`` and return the body decoded as UTF-8.

    Returns ``None`` for a non-200 status or on any error; errors are
    reported through ``eprintln``.
    """
    # Some hosts reject requests that carry no User-Agent at all.
    headers = {"User-Agent": "version-tui/1.0"}
    try:
        request = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(request, timeout=10) as response:
            if response.status == 200:
                return response.read().decode("utf-8")
            return None
    except urllib.error.HTTPError as e:
        eprintln(f"HTTP error for {url}: {e.code} {e.reason}")
        return None
    except Exception as e:
        eprintln(f"Request failed for {url}: {e}")
        return None
def gh_latest_release(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    """
    Return the ``tag_name`` of the repository's latest GitHub release.

    Returns ``None`` when the API call yields nothing or fails.
    """
    try:
        payload = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/releases/latest", token
        )
        return payload.get("tag_name") if payload else None
    except Exception as e:
        eprintln(f"latest_release failed for {owner}/{repo}: {e}")
        return None
def gh_latest_tag(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    """
    Return the first tag name listed by the GitHub tags API for the repo.

    Only the first page (up to 100 tags) is consulted. Returns ``None`` when
    the response is not a list, contains no named tag, or the call fails.
    """
    try:
        payload = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token
        )
        if not isinstance(payload, list):
            return None
        for entry in payload:
            if isinstance(entry, dict) and entry.get("name") is not None:
                return entry.get("name")
        return None
    except Exception as e:
        eprintln(f"latest_tag failed for {owner}/{repo}: {e}")
        return None
def gh_list_tags(owner: str, repo: str, token: Optional[str]) -> List[str]:
    """
    Return the tag names listed by the GitHub tags API for the repo.

    Only the first page (up to 100 tags) is requested. Returns an empty list
    when the request fails or the response is not a JSON list; in that case
    ``http_get_json`` has already reported the problem, so no second error
    is logged here.
    """
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token
        )
        # A failed request yields None; do not try to iterate it.
        if not isinstance(data, list):
            return []
        return [
            str(t.get("name"))
            for t in data
            if isinstance(t, dict) and t.get("name") is not None
        ]
    except Exception as e:
        eprintln(f"list_tags failed for {owner}/{repo}: {e}")
        return []
def gh_head_commit(
    owner: str, repo: str, branch: Optional[str] = None
) -> Optional[str]:
    """
    Return the latest commit SHA for a GitHub repo via ``git ls-remote``.

    Queries ``refs/heads/<branch>`` when ``branch`` is given, ``HEAD``
    otherwise. Returns ``None`` if the command fails or prints nothing.
    """
    try:
        ref = "HEAD" if not branch else f"refs/heads/{branch}"
        listing = run_get_stdout(
            ["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", ref]
        )
        if not listing:
            return None
        # ls-remote may print several lines; the SHA is the first field of
        # the first non-blank one.
        rows = (line.split() for line in listing.splitlines())
        return next((fields[0] for fields in rows if fields), None)
    except Exception as e:
        eprintln(f"head_commit failed for {owner}/{repo} (branch={branch}): {e}")
        return None
def git_branch_commit(url: str, branch: Optional[str] = None) -> Optional[str]:
    """
    Return the latest commit SHA for a git URL via ``git ls-remote``.

    Queries ``refs/heads/<branch>`` when ``branch`` is given, ``HEAD``
    otherwise. Returns ``None`` if the command fails or prints nothing.
    """
    try:
        ref = "HEAD" if not branch else f"refs/heads/{branch}"
        listing = run_get_stdout(["git", "ls-remote", url, ref])
        if not listing:
            return None
        # The SHA is the first field of the first non-blank output line.
        rows = (line.split() for line in listing.splitlines())
        return next((fields[0] for fields in rows if fields), None)
    except Exception as e:
        eprintln(f"git_branch_commit failed for {url} (branch={branch}): {e}")
        return None
def gh_tarball_url(owner: str, repo: str, ref: str) -> str:
    """Build the codeload.github.com ``tar.gz`` URL for ``ref`` of ``owner/repo``."""
    template = "https://codeload.github.com/{}/{}/tar.gz/{}"
    return template.format(owner, repo, ref)
def gh_release_tags_api(owner: str, repo: str, token: Optional[str]) -> List[str]:
    """
    Return recent release tag names for a repo using GitHub API.

    Only the first page (up to 50 releases) is requested. Returns an empty
    list when the request fails or the response is not a JSON list; in that
    case ``http_get_json`` has already reported the problem, so no second
    error is logged here.
    """
    try:
        data = http_get_json(
            f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", token
        )
        # A failed request yields None; do not try to iterate it.
        if not isinstance(data, list):
            return []
        return [
            str(r.get("tag_name"))
            for r in data
            if isinstance(r, dict) and r.get("tag_name") is not None
        ]
    except Exception as e:
        eprintln(f"releases list failed for {owner}/{repo}: {e}")
        return []
# ------------------------------ Data scanning ------------------------------
def find_packages() -> List[Tuple[str, Path, bool, bool]]:
    """
    Scan ``PKGS_DIR`` and return the sorted list of known packages.

    Each entry is ``(name, path, is_python, is_homeassistant)``:

    - every ``version.json`` below ``PKGS_DIR``; ``name`` is its directory
      relative to ``PKGS_DIR`` and both flags are False;
    - every ``python/<pkg>/default.nix``, flagged as a Python package;
    - every ``homeassistant/<pkg>/default.nix``, flagged as a Home Assistant
      component when the file mentions ``buildHomeAssistantComponent`` and as
      a Python package otherwise.
    """
    found: List[Tuple[str, Path, bool, bool]] = []

    # Regular packages described by a version.json.
    for version_file in PKGS_DIR.rglob("version.json"):
        name = str(version_file.relative_to(PKGS_DIR).parent)
        found.append((name, version_file, False, False))

    # Python packages described by a default.nix.
    python_dir = PKGS_DIR / "python"
    if python_dir.exists():
        for pkg_dir in python_dir.iterdir():
            if not pkg_dir.is_dir():
                continue
            nix_file = pkg_dir / "default.nix"
            if nix_file.exists():
                found.append((str(pkg_dir.relative_to(PKGS_DIR)), nix_file, True, False))

    # Home Assistant components described by a default.nix.
    homeassistant_dir = PKGS_DIR / "homeassistant"
    if homeassistant_dir.exists():
        for pkg_dir in homeassistant_dir.iterdir():
            if not pkg_dir.is_dir():
                continue
            nix_file = pkg_dir / "default.nix"
            if not nix_file.exists():
                continue
            try:
                nix_text = nix_file.read_text(encoding="utf-8")
            except Exception:
                # Unreadable file: classify it as a plain Python package.
                nix_text = ""
            is_component = "buildHomeAssistantComponent" in nix_text
            found.append(
                (str(pkg_dir.relative_to(PKGS_DIR)), nix_file, not is_component, is_component)
            )

    found.sort()
    return found
def _extract_brace_block(content: str, keyword: str) -> Optional[str]:
"""
Find 'keyword {' in content and return the text between the matching braces,
handling nested braces (e.g. ${var} inside strings).
Returns None if not found.
"""
idx = content.find(keyword + " {")
if idx == -1:
idx = content.find(keyword + "{")
if idx == -1:
return None
start = content.find("{", idx + len(keyword))
if start == -1:
return None
depth = 0
for i in range(start, len(content)):
c = content[i]
if c == "{":
depth += 1
elif c == "}":
depth -= 1
if depth == 0:
return content[start + 1 : i]
return None
def _resolve_nix_str(value: str, pname: str, version: str) -> str:
"""Resolve simple Nix string interpolations like ${pname} and ${version}."""
value = re.sub(r"\$\{pname\}", pname, value)
value = re.sub(r"\$\{version\}", version, value)
return value
def _extract_nix_attr(block: str, attr: str) -> str:
"""
Extract attribute value from a Nix attribute set block.
Handles:
attr = "quoted string";
attr = unquoted_ident;
Returns empty string if not found.
"""
# Quoted string value
m = re.search(rf'\b{attr}\s*=\s*"([^"]*)"', block)
if m:
return m.group(1)
# Unquoted identifier (e.g. repo = pname;)
m = re.search(rf"\b{attr}\s*=\s*([A-Za-z_][A-Za-z0-9_-]*)\s*;", block)
if m:
return m.group(1)
return ""
def _github_repo_from_homepage(content: str) -> Optional[Tuple[str, str]]:
    """
    Return ``(owner, repo)`` from a ``homepage = "https://github.com/..."`` line.

    Only the first two path segments are taken, so a trailing slash or a
    deeper path (``/tree/main`` etc.) does not end up in the repo name.
    Returns None when no such homepage is present.
    """
    m = re.search(
        r'homepage\s*=\s*"https://github\.com/([^/"]+)/([^/"]+)', content
    )
    return (m.group(1), m.group(2)) if m else None


def parse_python_package(path: Path) -> Dict[str, Any]:
    """
    Parse a Python package's default.nix file to extract version and source information.

    Returns a dict shaped like a version.json document: ``variables`` (with
    ``version`` when one is found) and ``sources`` holding a single entry
    keyed by the lower-cased ``pname`` (or the directory name). The entry is
    built from, in order of preference:

    1. a ``fetchFromGitHub { ... }`` block (owner/repo/rev/hash, with
       ``${pname}`` / ``${version}`` resolved);
    2. a ``fetchPypi { ... }`` block, reported as a GitHub source when the
       file has a GitHub ``homepage``, otherwise as fetcher ``pypi``;
    3. a whole-file scan for owner/repo/rev/tag/hash/url attributes, ending
       in fetcher ``unknown`` when nothing usable is found.
    """
    content = path.read_text(encoding="utf-8")

    version_match = re.search(r'version\s*=\s*"([^"]+)"', content)
    version = version_match.group(1) if version_match else ""
    pname_match = re.search(r'pname\s*=\s*"([^"]+)"', content)
    pname = pname_match.group(1) if pname_match else ""

    # Create a structure similar to version.json for compatibility
    result: Dict[str, Any] = {"variables": {}, "sources": {}}
    if version:
        result["variables"]["version"] = version

    # Determine source name - use pname or derive from path
    source_name = pname.lower() if pname else path.parent.name.lower()

    # Brace-balanced fetchFromGitHub block (handles ${var} inside strings)
    fetch_block = _extract_brace_block(content, "fetchFromGitHub")
    # fetchPypi blocks lack ${}, so a simple [^}]+ match is sufficient here
    fetch_pypi_match = re.search(
        r"src\s*=\s*.*fetchPypi\s*\{([^}]+)\}", content, re.DOTALL
    )

    source: Dict[str, Any]
    if fetch_block is not None:

        def _resolve_nix_ident(raw: str) -> str:
            """Resolve unquoted Nix identifier or string-with-interpolation to its value."""
            if raw == "pname":
                return pname
            if raw == "version":
                return version
            return _resolve_nix_str(raw, pname, version)

        rev_raw = _extract_nix_attr(fetch_block, "rev")
        rev = _resolve_nix_ident(rev_raw)
        hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block)
        source = {
            "fetcher": "github",
            "owner": _resolve_nix_ident(_extract_nix_attr(fetch_block, "owner")),
            "repo": _resolve_nix_ident(_extract_nix_attr(fetch_block, "repo")),
            "hash": hash_match.group(2) if hash_match else "",
        }
        if rev:
            # A "v"-prefixed or version-templated rev is a tag; anything else
            # (branch name or commit SHA) is stored as a rev.
            is_tag = rev.startswith("v") or "${version}" in rev_raw
            source["tag" if is_tag else "rev"] = rev
    elif fetch_pypi_match:
        hash_match = re.search(
            r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_pypi_match.group(1)
        )
        hash_value = hash_match.group(2) if hash_match else ""
        # Look for GitHub info in meta section
        homepage = _github_repo_from_homepage(content)
        if homepage:
            source = {
                "fetcher": "github",
                "owner": homepage[0],
                "repo": homepage[1],
                "hash": hash_value,
                "pypi": True,
            }
            if version:
                source["tag"] = f"v{version}"
        else:
            source = {
                "fetcher": "pypi",
                "pname": pname,
                "version": version,
                "hash": hash_value,
            }
    else:
        # Fallback: scan whole file for GitHub or URL info
        def _first(pattern: str, group: int = 1) -> str:
            found = re.search(pattern, content)
            return found.group(group) if found else ""

        owner = _first(r'owner\s*=\s*"([^"]+)"')
        repo = _first(r'repo\s*=\s*"([^"]+)"')
        rev = _first(r'rev\s*=\s*"([^"]+)"')
        tag = _first(r'tag\s*=\s*"([^"]+)"')
        hash_value = _first(r'(sha256|hash)\s*=\s*"([^"]+)"', 2)
        url = _first(r'url\s*=\s*"([^"]+)"')

        homepage = _github_repo_from_homepage(content)
        if homepage and not (owner and repo):
            owner, repo = homepage

        if owner and repo:
            source = {
                "fetcher": "github",
                "owner": owner,
                "repo": repo,
                "hash": hash_value,
            }
            if tag:
                source["tag"] = tag
            elif rev:
                source["rev"] = rev
        elif url:
            source = {"fetcher": "url", "url": url, "hash": hash_value}
        else:
            source = {"fetcher": "unknown", "hash": hash_value}

    result["sources"][source_name] = source
    return result
def update_python_package(
    path: Path, source_name: str, updates: Dict[str, Any]
) -> bool:
    """
    Update a Python package's default.nix file with new version and/or hash.

    Recognised keys in ``updates`` are ``version``, ``hash``, ``tag`` and
    ``rev``. For each one present, every matching ``attr = "..."`` assignment
    in the file is rewritten with the new value (``hash`` also rewrites
    ``sha256 = "..."``). New values are inserted literally: backslashes and
    ``\\g<...>`` sequences in them are not treated as regex replacement
    escapes.

    ``source_name`` is accepted for signature parity with the other update
    helpers and is not used.

    Returns True (and rewrites the file) if at least one assignment matched,
    False otherwise.
    """
    content = path.read_text(encoding="utf-8")

    def keep_prefix(value: Any):
        # Re-emit the matched `attr = ` prefix followed by the new quoted value.
        return lambda m: f'{m.group(1)}"{value}"'

    def keep_attr_name(value: Any):
        # Re-emit the matched attribute name (sha256 or hash) with the new value.
        return lambda m: f'{m.group(1)} = "{value}"'

    # Applied in this order; each rule only fires when its key is in `updates`.
    rules = (
        ("version", r'(version\s*=\s*)"([^"]+)"', keep_prefix),
        ("hash", r'(sha256|hash)\s*=\s*"([^"]+)"', keep_attr_name),
        ("tag", r'(tag\s*=\s*)"([^"]+)"', keep_prefix),
        ("rev", r'(rev\s*=\s*)"([^"]+)"', keep_prefix),
    )

    modified = False
    for key, pattern, make_replacement in rules:
        if key not in updates:
            continue
        content, count = re.subn(pattern, make_replacement(updates[key]), content)
        if count > 0:
            modified = True

    if modified:
        path.write_text(content, encoding="utf-8")
    return modified
def parse_homeassistant_component(path: Path) -> Dict[str, Any]:
    """
    Parse a Home Assistant component's default.nix file to extract version and source information.

    Returns a dict shaped like a version.json document: ``variables`` (with
    ``version`` and ``domain`` when found) and ``sources`` holding one entry
    keyed by the domain (or the lower-cased directory name). With an ``owner``
    attribute the entry is a GitHub source; otherwise its fetcher is
    ``unknown``.
    """
    content = path.read_text(encoding="utf-8")

    def grab(pattern: str, group: int = 1) -> str:
        # First match of `pattern` in the file, or "" when absent.
        found = re.search(pattern, content)
        return found.group(group) if found else ""

    domain = grab(r'domain\s*=\s*"([^"]+)"')
    version = grab(r'version\s*=\s*"([^"]+)"')
    owner = grab(r'owner\s*=\s*"([^"]+)"')
    repo = grab(r'repo\s*=\s*"([^"]+)"')
    rev = grab(r'rev\s*=\s*"([^"]+)"')
    tag = grab(r'tag\s*=\s*"([^"]+)"')
    hash_value = grab(r'(sha256|hash)\s*=\s*"([^"]+)"', 2)

    # Only non-empty values are recorded as variables.
    variables: Dict[str, Any] = {}
    if version:
        variables["version"] = version
    if domain:
        variables["domain"] = domain

    # Source name: the domain, or the directory name as a fallback.
    source_name = domain or path.parent.name.lower()

    source: Dict[str, Any]
    if not owner:
        # No clear source information in the file.
        source = {"fetcher": "unknown"}
        if hash_value:
            source["hash"] = hash_value
    else:
        source = {
            "fetcher": "github",
            "owner": owner,
            "repo": repo or source_name,
        }
        if hash_value:
            source["hash"] = hash_value
        # Tag wins over rev; ${version} references are resolved.
        if tag:
            source["tag"] = _resolve_nix_str(tag, "", version)
        elif rev:
            resolved = _resolve_nix_str(rev, "", version)
            # A rev that is the version (templated or literal) is really a tag.
            looks_like_tag = "${version}" in rev or resolved == version
            source["tag" if looks_like_tag else "rev"] = resolved
        elif version:
            # Neither given: assume the release is tagged with the version.
            source["tag"] = version

    return {"variables": variables, "sources": {source_name: source}}
def update_homeassistant_component(
    path: Path, source_name: str, updates: Dict[str, Any]
) -> bool:
    """
    Update a Home Assistant component's default.nix file with new version and/or hash.

    Recognised keys in ``updates``:

    - ``version``: rewrites every ``version = "..."`` assignment;
    - ``hash``: rewrites ``hash``/``sha256`` inside ``src = fetchFromGitHub { ... }``;
    - ``tag`` / ``rev``: rewrites the existing ``tag``/``rev`` value inside
      that block (keeping whichever attribute name is already there), or, if
      neither exists, inserts a new attribute right after ``hash = "..."``.

    New values are inserted literally: backslashes and ``\\g<...>`` sequences
    in them are not treated as regex replacement escapes.

    ``source_name`` is accepted for signature parity with the other update
    helpers and is not used.

    Returns True (and rewrites the file) if at least one edit matched,
    False otherwise.
    """
    content = path.read_text(encoding="utf-8")
    modified = False

    def set_ref(text: str, alternation: str, attr: str, value: Any):
        # Replace an existing tag/rev value; otherwise add `attr` after hash.
        text, count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)('
            + alternation
            + r')\s*=\s*"([^"]+)"([^}]*\})',
            lambda m: f'{m.group(1)}{m.group(2)} = "{value}"{m.group(4)}',
            text,
        )
        if count == 0:
            text, count = re.subn(
                r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})',
                lambda m: f'{m.group(1)}{m.group(2)};\n {attr} = "{value}"{m.group(3)}',
                text,
            )
        return text, count

    # Update version if provided
    if "version" in updates:
        new_version = updates["version"]
        content, version_count = re.subn(
            r'(version\s*=\s*)"([^"]+)"',
            lambda m: f'{m.group(1)}"{new_version}"',
            content,
        )
        if version_count > 0:
            modified = True

    # Update hash if provided (sha256 or hash inside src = fetchFromGitHub { ... })
    if "hash" in updates:
        new_hash = updates["hash"]
        content, hash_count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(sha256|hash)\s*=\s*"([^"]+)"([^}]*\})',
            lambda m: f'{m.group(1)}{m.group(2)} = "{new_hash}"{m.group(4)}',
            content,
        )
        if hash_count > 0:
            modified = True

    # Update tag if provided
    if "tag" in updates:
        content, tag_count = set_ref(content, "tag|rev", "tag", updates["tag"])
        if tag_count > 0:
            modified = True

    # Update rev if provided
    if "rev" in updates:
        content, rev_count = set_ref(content, "rev|tag", "rev", updates["rev"])
        if rev_count > 0:
            modified = True

    if modified:
        path.write_text(content, encoding="utf-8")
    return modified
# ------------------------------ TUI helpers ------------------------------
# Color pair numbers, registered by init_colors() and used via curses.color_pair().
COLOR_NORMAL = 1  # white on the default background
COLOR_HIGHLIGHT = 2  # black on cyan
COLOR_HEADER = 3  # cyan on the default background
COLOR_STATUS = 4  # yellow on the default background
COLOR_ERROR = 5  # red on the default background
COLOR_SUCCESS = 6  # green on the default background
COLOR_BORDER = 7  # blue on the default background
COLOR_TITLE = 8  # magenta on the default background
def init_colors():
    """
    Initialize color pairs for the TUI.

    Enables color support with the terminal's default colors and registers
    one foreground/background pair per ``COLOR_*`` constant; a background of
    -1 selects the terminal default.
    """
    curses.start_color()
    curses.use_default_colors()

    palette = (
        (COLOR_NORMAL, curses.COLOR_WHITE, -1),
        (COLOR_HIGHLIGHT, curses.COLOR_BLACK, curses.COLOR_CYAN),
        (COLOR_HEADER, curses.COLOR_CYAN, -1),
        (COLOR_STATUS, curses.COLOR_YELLOW, -1),
        (COLOR_ERROR, curses.COLOR_RED, -1),
        (COLOR_SUCCESS, curses.COLOR_GREEN, -1),
        (COLOR_BORDER, curses.COLOR_BLUE, -1),
        (COLOR_TITLE, curses.COLOR_MAGENTA, -1),
    )
    for pair_number, foreground, background in palette:
        curses.init_pair(pair_number, foreground, background)
def draw_border(win, y, x, h, w):
    """
    Draw a border around a region of the window.

    The region's top-left cell is ``(y, x)`` and it spans ``h`` rows by ``w``
    columns. All border cells use the ``COLOR_BORDER`` pair.
    """
    attr = curses.color_pair(COLOR_BORDER)
    bottom = y + h - 1
    right = x + w - 1

    # Corners
    win.addch(y, x, curses.ACS_ULCORNER, attr)
    win.addch(y, right, curses.ACS_URCORNER, attr)
    win.addch(bottom, x, curses.ACS_LLCORNER, attr)
    try:
        win.addch(bottom, right, curses.ACS_LRCORNER, attr)
    except curses.error:
        # This is expected when trying to write to the bottom-right corner
        pass

    # Top and bottom edges
    for col in range(x + 1, right):
        win.addch(y, col, curses.ACS_HLINE, attr)
        win.addch(bottom, col, curses.ACS_HLINE, attr)

    # Left and right edges
    for row in range(y + 1, bottom):
        win.addch(row, x, curses.ACS_VLINE, attr)
        win.addch(row, right, curses.ACS_VLINE, attr)
class ScreenBase:
    """
    Base class for TUI screens.

    Holds the curses window and a one-line status message shown on the last
    row. Subclasses implement ``run``.
    """

    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.status = ""
        self.status_type = "normal"  # "normal", "error", "success"

    def draw_status(self, height, width):
        """
        Draw the status line on row ``height - 1``.

        Shows the current status message in a color chosen by
        ``status_type``, or a default key hint when no message is set. The
        text is always cut to ``width - 1`` characters so it fits on narrow
        terminals.
        """
        if self.status:
            text = self.status
            if self.status_type == "error":
                color = COLOR_ERROR
            elif self.status_type == "success":
                color = COLOR_SUCCESS
            else:
                color = COLOR_STATUS
        else:
            text = "q: quit, Backspace: back, Enter: select"
            color = COLOR_STATUS
        self.stdscr.addstr(
            height - 1,
            0,
            text[: max(0, width - 1)],
            curses.color_pair(color),
        )

    def set_status(self, text: str, status_type="normal"):
        """Set the status message and its type ("normal", "error" or "success")."""
        self.status = text
        self.status_type = status_type

    def run(self):
        """Run the screen's event loop; must be provided by subclasses."""
        raise NotImplementedError
def prompt_input(stdscr, prompt: str) -> Optional[str]:
    """
    Show ``prompt`` at the current cursor position and read a line of input.

    Echo is switched on while the user types and is always switched off again
    afterwards, even if reading or decoding the input raises.

    Returns the entered text decoded as UTF-8.
    """
    curses.echo()
    try:
        stdscr.addstr(prompt, curses.color_pair(COLOR_HEADER))
        stdscr.clrtoeol()
        return stdscr.getstr().decode("utf-8")
    finally:
        # Never leave the terminal echoing keystrokes into the TUI.
        curses.noecho()
def show_popup(stdscr, lines: List[str], title: str = ""):
    """
    Show a centered, bordered popup and wait for a key press.

    The box is sized to fit the longest of the content lines, the title and
    the footer, clamped to the terminal size. Content lines that do not fit
    are cut off (horizontally) or dropped (vertically). ``lines`` may be
    empty.
    """
    footer = "Press any key to continue"
    h, w = stdscr.getmaxyx()

    # Include title and footer so neither is placed at a negative column,
    # and so an empty `lines` list does not break max().
    widest = max([len(line) for line in lines] + [len(title), len(footer)])
    box_h = min(len(lines) + 4, h - 2)
    box_w = min(widest + 6, w - 2)
    top = (h - box_h) // 2
    left = (w - box_w) // 2
    win = curses.newwin(box_h, box_w, top, left)

    # Draw fancy border
    draw_border(win, 0, 0, box_h, box_w)

    # Title, padded with one space on each side, on the top border
    if title:
        title_x = max(1, (box_w - len(title)) // 2)
        label = f" {title} "[: max(0, box_w - title_x - 1)]
        win.addstr(0, title_x, label, curses.color_pair(COLOR_TITLE))

    # Content, one line per row inside the border
    for i, line in enumerate(lines, start=1):
        if i >= box_h - 1:
            break
        win.addstr(i, 2, line[: box_w - 4], curses.color_pair(COLOR_NORMAL))

    # Footer on the bottom border, kept clear of the corners
    footer_x = max(1, (box_w - len(footer)) // 2)
    win.addstr(
        box_h - 1,
        footer_x,
        footer[: max(0, box_w - footer_x - 1)],
        curses.color_pair(COLOR_STATUS),
    )
    win.refresh()
    win.getch()
# ------------------------------ Screens ------------------------------
class PackagesScreen(ScreenBase):
    """Top-level screen: filterable package list plus a read-only preview.

    The left pane lists the packages returned by ``find_packages()``; the
    right pane (drawn only when at least 20 columns are available) summarises
    the sources of the selected package. Enter opens a
    ``PackageDetailScreen`` for the selection.
    """

    def __init__(self, stdscr):
        super().__init__(stdscr)
        # Entries are (name, path, is_python, is_homeassistant) tuples.
        self.packages = find_packages()
        self.idx = 0  # selection index into the *filtered* list
        self.filter_mode = "all"  # "all", "regular", "python"
        self.scroll_offset = 0  # index of the first visible list row

    # ------------------------------ helpers ------------------------------

    def _filtered_packages(self):
        """Return the packages visible under the current filter mode."""
        if self.filter_mode == "regular":
            # Everything that is not a Python package
            return [p for p in self.packages if not p[2]]
        if self.filter_mode == "python":
            return [p for p in self.packages if p[2]]
        return self.packages

    def _clamp_selection(self, count):
        """Keep ``self.idx`` within ``[0, count - 1]`` (0 for an empty list)."""
        self.idx = max(0, min(self.idx, count - 1))

    @staticmethod
    def _load_spec(path, is_python, is_homeassistant):
        """Load the package spec with the parser matching the package kind."""
        if is_python:
            return parse_python_package(path)
        if is_homeassistant:
            return parse_homeassistant_component(path)
        return load_json(path)

    @staticmethod
    def _fit(text, width):
        """Clip ``text`` to ``width`` columns, ending in '...' when clipped."""
        if len(text) <= width:
            return text
        if width <= 3:
            return text[: max(0, width)]
        return text[: width - 3] + "..."

    @staticmethod
    def _source_ref_label(comp, merged_vars):
        """Build the concise reference shown for one source in the preview."""
        fetcher = comp.get("fetcher", "none")
        display_ref = comp.get("tag") or comp.get("rev") or comp.get("version") or ""
        if fetcher == "github":
            rendered = render_templates(comp, merged_vars)
            tag = rendered.get("tag")
            rev = rendered.get("rev")
            owner = rendered.get("owner") or merged_vars.get("owner") or ""
            repo = rendered.get("repo") or merged_vars.get("repo") or ""
            if tag and owner and repo:
                display_ref = f"{owner}/{repo}@{tag}"
            elif tag:
                display_ref = tag
            elif rev and owner and repo:
                display_ref = f"{owner}/{repo}@{rev[:7]}"
            elif rev:
                display_ref = rev[:12]
        elif fetcher == "url":
            rendered = render_templates(comp, merged_vars)
            url = rendered.get("url") or rendered.get("urlTemplate") or ""
            if url:
                owner = str(merged_vars.get("owner", "") or "")
                repo = str(merged_vars.get("repo", "") or "")
                rp = str(merged_vars.get("releasePrefix", "") or "")
                rs = str(merged_vars.get("releaseSuffix", "") or "")
                base = str(merged_vars.get("base", "") or "")
                rel = str(merged_vars.get("release", "") or "")
                tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
                parsed = urlparse(url)
                filename = (
                    os.path.basename(parsed.path) if parsed and parsed.path else ""
                )
                if owner and repo and tag and filename:
                    display_ref = f"{owner}/{repo}@{tag} · {filename}"
                elif filename:
                    display_ref = filename
                else:
                    display_ref = url
            else:
                display_ref = ""
        return display_ref if isinstance(display_ref, str) else f"{display_ref}"

    def _draw_package_list(self, h, left_w, filtered_packages):
        """Draw the visible slice of the package list into the left pane."""
        max_rows = max(1, h - 3)
        total_packages = len(filtered_packages)

        # Scroll just far enough to keep the selection visible.
        if self.idx >= self.scroll_offset + max_rows:
            self.scroll_offset = self.idx - max_rows + 1
        elif self.idx < self.scroll_offset:
            self.scroll_offset = self.idx

        visible_packages = filtered_packages[
            self.scroll_offset : self.scroll_offset + max_rows
        ]

        # Scroll indicators (previously empty strings, which drew nothing).
        if self.scroll_offset > 0:
            self.stdscr.addstr(1, left_w - 3, "▲", curses.color_pair(COLOR_STATUS))
        if self.scroll_offset + max_rows < total_packages:
            self.stdscr.addstr(
                min(1 + len(visible_packages), h - 2),
                left_w - 3,
                "▼",
                curses.color_pair(COLOR_STATUS),
            )

        for i, (name, _path, is_python, is_homeassistant) in enumerate(
            visible_packages
        ):
            if i + self.scroll_offset == self.idx:
                attr = curses.color_pair(COLOR_HIGHLIGHT)
                sel = "▶"  # one column wide, like the blank marker below
            else:
                attr = curses.color_pair(COLOR_NORMAL)
                sel = " "
            # Small icon for Python packages or Home Assistant components
            if is_python:
                pkg_type = "🐍 "
            elif is_homeassistant:
                pkg_type = "🏠 "
            else:
                pkg_type = ""
            self.stdscr.addstr(
                1 + i, 2, f"{sel} {pkg_type}{name}"[: max(0, left_w - 5)], attr
            )

    def _draw_preview(self, h, right_x, right_w, entry):
        """Draw the non-interactive source summary for ``entry`` in the right pane."""
        name, path, is_python, is_homeassistant = entry

        # Package name centred in the pane header, clipped to the pane.
        shown_name = f" {name} "[: max(0, right_w - 2)]
        title_x = right_x + max(1, (right_w - len(shown_name)) // 2)
        self.stdscr.addstr(
            0, title_x, shown_name, curses.color_pair(COLOR_TITLE) | curses.A_BOLD
        )

        self.stdscr.addstr(1, right_x + 2, "Path:", curses.color_pair(COLOR_HEADER))
        self.stdscr.addstr(
            1,
            right_x + 8,
            f"{path}"[: max(0, right_w - 10)],
            curses.color_pair(COLOR_NORMAL),
        )
        self.stdscr.addstr(
            2, right_x + 2, "Sources:", curses.color_pair(COLOR_HEADER)
        )

        spec = self._load_spec(path, is_python, is_homeassistant)
        merged_vars, merged_srcs, _ = merged_view(spec, None)
        snames = sorted(merged_srcs)
        max_src_rows = max(0, h - 6)
        # Room left for the reference column, which starts 29 columns in.
        ref_room = max(0, right_w - 31)

        for i2, sname in enumerate(snames[:max_src_rows]):
            comp = merged_srcs[sname]
            fetcher = comp.get("fetcher", "none")
            row = 3 + i2

            fetcher_color = COLOR_NORMAL
            if fetcher == "github":
                fetcher_color = COLOR_SUCCESS
            elif fetcher == "url":
                fetcher_color = COLOR_STATUS
            elif fetcher == "git":
                fetcher_color = COLOR_HEADER

            # Source name (clipped so it cannot spill into the next column)
            self.stdscr.addstr(
                row,
                right_x + 2,
                f"{sname[:18]:<18}"[: max(0, right_w - 3)],
                curses.color_pair(COLOR_NORMAL),
            )
            # Fetcher, only when its column fits inside the pane
            if right_w >= 30:
                self.stdscr.addstr(
                    row,
                    right_x + 21,
                    f"{fetcher:<7}"[:7],
                    curses.color_pair(fetcher_color),
                )
            # Reference; _fit keeps the "..." visible when the text is clipped
            if ref_room:
                self.stdscr.addstr(
                    row,
                    right_x + 29,
                    self._fit(self._source_ref_label(comp, merged_vars), ref_room),
                    curses.color_pair(COLOR_NORMAL),
                )

        # Hint line for workflow
        hint = "Enter: open details | k/j: move | q: quit"
        if h >= 5:
            shown_hint = hint[: max(0, right_w - 2)]
            hint_x = right_x + max(1, (right_w - len(shown_hint)) // 2)
            self.stdscr.addstr(
                h - 5, hint_x, shown_hint, curses.color_pair(COLOR_STATUS)
            )

    # ------------------------------ main loop ------------------------------

    def run(self):
        """Run the package list until the user quits; always returns None."""
        while True:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()

            # Determine split layout
            left_w = max(30, min(60, w // 3))
            right_x = left_w + 1
            right_w = max(0, w - right_x)

            # Draw borders for left and right panes
            draw_border(self.stdscr, 0, 0, h - 1, left_w)
            if right_w >= 20:
                draw_border(self.stdscr, 0, right_x, h - 1, right_w)

            # Left pane title
            if self.filter_mode == "regular":
                title = "Packages (version.json)"
            elif self.filter_mode == "python":
                title = "Python Packages"
            else:
                title = "All Packages [f to filter]"
            title_x = (left_w - len(title)) // 2
            self.stdscr.addstr(
                0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD
            )

            filtered_packages = self._filtered_packages()
            # The list may have shrunk (filter change, reload); never index past it.
            self._clamp_selection(len(filtered_packages))

            self._draw_package_list(h, left_w, filtered_packages)

            # Right pane: preview of selected package (non-interactive summary)
            if right_w >= 20 and filtered_packages:
                try:
                    self._draw_preview(
                        h, right_x, right_w, filtered_packages[self.idx]
                    )
                except Exception as e:
                    # Best-effort preview: a broken package file must not take
                    # down the list, so show the error in the pane instead.
                    self.stdscr.addstr(
                        2, right_x + 2, "Error:", curses.color_pair(COLOR_ERROR)
                    )
                    self.stdscr.addstr(
                        2,
                        right_x + 9,
                        f"{e}"[: max(0, right_w - 11)],
                        curses.color_pair(COLOR_ERROR),
                    )

            self.draw_status(h, w)
            self.stdscr.refresh()

            ch = self.stdscr.getch()
            last = max(0, len(filtered_packages) - 1)
            page = max(1, h - 4)
            if ch in (ord("q"), 27):  # q or ESC
                return None
            elif ch in (curses.KEY_UP, ord("k")):
                self.idx = max(0, self.idx - 1)
            elif ch in (curses.KEY_DOWN, ord("j")):
                self.idx = min(last, self.idx + 1)
            elif ch == curses.KEY_PPAGE:  # Page Up
                self.idx = max(0, self.idx - page)
            elif ch == curses.KEY_NPAGE:  # Page Down
                self.idx = min(last, self.idx + page)
            elif ch == ord("g"):  # Go to top
                self.idx = 0
            elif ch == ord("G"):  # Go to bottom
                self.idx = last
            elif ch == ord("f"):
                # Cycle through filter modes
                if self.filter_mode == "all":
                    self.filter_mode = "regular"
                elif self.filter_mode == "regular":
                    self.filter_mode = "python"
                else:
                    self.filter_mode = "all"
                self.idx = 0  # Reset selection when changing filters
            elif ch in (curses.KEY_ENTER, 10, 13):
                if not filtered_packages:
                    continue
                name, path, is_python, is_homeassistant = filtered_packages[self.idx]
                try:
                    spec = self._load_spec(path, is_python, is_homeassistant)
                except Exception as e:
                    self.set_status(f"Failed to load {path}: {e}", "error")
                    continue
                screen = PackageDetailScreen(
                    self.stdscr, name, path, spec, is_python, is_homeassistant
                )
                ret = screen.run()
                if ret == "reload":
                    # Re-scan, then clamp against the list actually displayed.
                    self.packages = find_packages()
                    self._clamp_selection(len(self._filtered_packages()))
class PackageDetailScreen(ScreenBase):
def __init__(
    self,
    stdscr,
    pkg_name: str,
    path: Path,
    spec: Json,
    is_python: bool = False,
    is_homeassistant: bool = False,
):
    """Set up the detail view for one package and compute its initial (base) view.

    Args:
        stdscr: The curses window to draw on.
        pkg_name: Package name shown in the title and used for special cases.
        path: File the spec was loaded from (and is saved back to).
        spec: The parsed package spec.
        is_python: True when the spec came from a Python package.
        is_homeassistant: True when the spec came from a Home Assistant component.
    """
    super().__init__(stdscr)
    self.pkg_name, self.path, self.spec = pkg_name, path, spec
    self.is_python = is_python
    self.is_homeassistant = is_homeassistant

    # Index 0 is the pseudo-variant "<base>"; real variants follow, sorted.
    variant_names = sorted(self.spec.get("variants", {}))
    self.variants = ["<base>", *variant_names]
    self.vidx = 0

    self.gh_token = os.environ.get("GITHUB_TOKEN")

    # name -> {release, tag, commit}
    self.candidates: Dict[str, Dict[str, str]] = {}
    # name -> {base, release, tag}
    self.url_candidates: Dict[str, Dict[str, str]] = {}

    # initialize view
    self.recompute_view()
def select_variant(self):
    """Rebuild the merged and target views after ``self.vidx`` has changed."""
    self.recompute_view()
def recompute_view(self):
    """Recompute all variant-dependent state and reset the source selection.

    Sets ``cursor`` (the dict manual edits apply to: the spec itself for the
    base, otherwise the selected variant's dict), the merged variables and
    sources, the target dict for writes, the sorted source names, and resets
    ``sidx`` to 0.
    """
    at_base = self.vidx == 0
    variant_name = None if at_base else self.variants[self.vidx]
    self.cursor = self.spec if at_base else self.spec["variants"][variant_name]

    # Compute merged view and target dict for writing
    view = merged_view(self.spec, variant_name)
    self.merged_vars, self.merged_srcs, self.target_dict = view

    self.snames = sorted(self.merged_srcs)
    self.sidx = 0
def fetch_candidates_for(self, name: str):
    """Look up the newest upstream refs for source ``name`` and cache them.

    The result is stored in ``self.candidates[name]`` as a dict with the keys
    "release", "tag" and "commit" (empty string when unknown). What is
    fetched depends on the source's fetcher:

    - "github": latest release, latest tag and head commit. When the source
      has a ``branch``, only the head commit of that branch is fetched.
      raspberrypi/linux gets a dedicated tag choice (see
      ``_raspberrypi_linux_tag``).
    - "git": head commit of the (optional) branch.
    - "url": when the merged variables carry ``owner`` and ``repo``, the
      first release tag matching ``releasePrefix``/``releaseSuffix``; its
      "base"/"release" parts are stored in ``self.url_candidates[name]``.
    """
    comp = self.merged_srcs[name]
    fetcher = comp.get("fetcher", "none")
    branch = comp.get("branch") or None  # optional branch override
    c = {"release": "", "tag": "", "commit": ""}

    if fetcher == "github":
        owner = comp.get("owner")
        repo = comp.get("repo")
        if owner and repo:
            # Only fetch release/tag candidates when not locked to a specific branch
            if not branch:
                r = gh_latest_release(owner, repo, self.gh_token)
                if r:
                    c["release"] = r
                t = gh_latest_tag(owner, repo, self.gh_token)
                if t:
                    c["tag"] = t
            m = gh_head_commit(owner, repo, branch)
            if m:
                c["commit"] = m

            # Special-case raspberrypi/linux (only when not branch-locked, as
            # a branch-locked source tracks a rolling branch via commit).
            if not branch and owner == "raspberrypi" and repo == "linux":
                try:
                    special = self._raspberrypi_linux_tag(comp, owner, repo)
                except Exception:
                    # Best effort: keep the previously computed tag.
                    special = None
                if special:
                    c["tag"] = special

    elif fetcher == "git":
        url = comp.get("url")
        if url:
            commit = git_branch_commit(url, branch)
            if commit:
                c["commit"] = commit

    elif fetcher == "url":
        # Heuristic for GitHub release assets with variables in version.json (e.g., proton-cachyos)
        owner = self.merged_vars.get("owner")
        repo = self.merged_vars.get("repo")
        if owner and repo:
            tags = gh_release_tags_api(str(owner), str(repo), self.gh_token)
            # A refresh replaces whatever was cached before, so a stale entry
            # cannot outlive a lookup that no longer matches.
            self.url_candidates.pop(name, None)
            # "or ''" so an explicit null does not become the string "None".
            prefix = str(self.merged_vars.get("releasePrefix") or "")
            suffix = str(self.merged_vars.get("releaseSuffix") or "")
            latest = next(
                (
                    t
                    for t in tags
                    if (t and t.startswith(prefix) and t.endswith(suffix))
                ),
                None,
            )
            if latest:
                c["release"] = latest
                mid = latest
                if prefix and mid.startswith(prefix):
                    mid = mid[len(prefix) :]
                if suffix and mid.endswith(suffix):
                    mid = mid[: -len(suffix)]
                # Expected shape is "<base>-<release>"; anything in between is ignored.
                parts = mid.split("-")
                if len(parts) >= 2:
                    self.url_candidates[name] = {
                        "base": parts[0],
                        "release": parts[-1],
                        "tag": latest,
                    }

    self.candidates[name] = c

def _raspberrypi_linux_tag(self, comp, owner, repo) -> Optional[str]:
    """Pick the preferred tag for raspberrypi/linux, or None when there is none.

    If the current (rendered) tag starts with "stable_", the greatest
    ``stable_YYYYMMDD`` tag is chosen. Otherwise the greatest tag in the
    ``rpi-<major>.<minor>.`` series of the ``modDirVersion`` variable is
    chosen, comparing numeric parts as numbers.
    """
    tags_all = gh_list_tags(owner, repo, self.gh_token)
    rendered = render_templates(comp, self.merged_vars)
    cur_tag = str(rendered.get("tag") or "")

    if cur_tag.startswith("stable_"):
        # Fixed-width dates, so plain string order is chronological.
        stable_tags = [x for x in tags_all if re.match(r"^stable_\d{8}$", x)]
        return max(stable_tags, default=None)

    mm = str(self.merged_vars.get("modDirVersion") or "")
    m2 = re.match(r"^(\d+)\.(\d+)", mm)
    if not m2:
        return None
    series_prefix = f"rpi-{m2.group(1)}.{m2.group(2)}."

    def natural_key(tag):
        # Split into digit / non-digit runs so that e.g. ".10" sorts above
        # ".9"; text runs rank above numbers, which keeps a ".y" tag ahead
        # of numbered ones exactly as plain string order did.
        return [
            (0, int(part), "") if "0" <= part[0] <= "9" else (1, 0, part)
            for part in re.split(r"([0-9]+)", tag)
            if part
        ]

    series_tags = [x for x in tags_all if x.startswith(series_prefix)]
    return max(series_tags, key=natural_key, default=None)
def prefetch_hash_for(self, name: str) -> Optional[str]:
    """Prefetch source ``name`` and return its hash, or None when not possible.

    - "github": prefetches the tarball for the rendered tag (or rev).
    - "git": prefetches the repository at the rendered rev.
    - "url": prefetches the rendered ``url`` (or ``urlTemplate``).

    Returns None when the fetcher is unknown or a required field is missing;
    otherwise whatever the prefetch helper returns.
    """
    comp = self.merged_srcs[name]
    fetcher = comp.get("fetcher", "none")

    if fetcher == "github":
        owner = comp.get("owner")
        repo = comp.get("repo")
        rendered = render_templates(comp, self.merged_vars)
        ref = rendered.get("tag") or rendered.get("rev")
        if owner and repo and ref:
            url = gh_tarball_url(owner, repo, ref)
            return nix_prefetch_url(url)
    elif fetcher == "git":
        url = comp.get("url")
        # Use the rendered rev, like the github/url branches and
        # prefetch_cargo_hash_for do, so a templated rev resolves to a real
        # revision instead of being passed on with its placeholders.
        rendered = render_templates(comp, self.merged_vars)
        rev = rendered.get("rev")
        if url and rev:
            return nix_prefetch_git(url, rev)
    elif fetcher == "url":
        rendered = render_templates(comp, self.merged_vars)
        url = rendered.get("url") or rendered.get("urlTemplate")
        if url:
            return nix_prefetch_url(url)
    return None
def prefetch_cargo_hash_for(self, name: str) -> Optional[str]:
    """
    Compute the cargo vendor hash for source ``name``.

    Delegates to nix_prefetch_cargo_vendor() with the source's current
    'hash', its optional 'cargoSubdir', and the rendered ref. Only "github"
    and "git" sources are supported.

    Returns None when the fetcher is unsupported or when the source hash,
    the ref, or the repository coordinates are missing.
    """
    comp = self.merged_srcs[name]
    kind = comp.get("fetcher", "none")
    source_hash = comp.get("hash", "")
    cargo_subdir = comp.get("cargoSubdir", "")
    rendered = render_templates(comp, self.merged_vars)

    if kind == "github":
        owner, repo = comp.get("owner", ""), comp.get("repo", "")
        # github sources prefer the tag over the rev
        ref = rendered.get("tag") or rendered.get("rev") or ""
        if not (owner and repo and ref and source_hash):
            return None
        return nix_prefetch_cargo_vendor(
            "github",
            source_hash,
            owner=owner,
            repo=repo,
            rev=ref,
            subdir=cargo_subdir,
        )

    if kind == "git":
        url = comp.get("url", "")
        # git sources prefer the rev over the tag
        ref = rendered.get("rev") or rendered.get("tag") or ""
        if not (url and ref and source_hash):
            return None
        return nix_prefetch_cargo_vendor(
            "git",
            source_hash,
            url=url,
            rev=ref,
            subdir=cargo_subdir,
        )

    return None
def _source_has_cargo(self, name: str) -> bool:
    """Tell whether the merged source ``name`` exists and has a 'cargoHash' key."""
    if name not in self.merged_srcs:
        return False
    return "cargoHash" in self.merged_srcs[name]
def _apply_cargo_hash_to_sibling(self, name: str, cargo_hash: str):
    """
    Copy ``cargo_hash`` into the "hash" of every sibling source that mirrors it.

    A sibling (any merged source other than ``name``) is updated when either:
      - it is named "cargoDeps", or
      - it has no (truthy) fetcher and, apart from "fetcher", its only key
        is "hash".
    The write goes to ``self.target_dict["sources"]``, which is created if
    missing even when no sibling matches.
    """
    target_sources = self.target_dict.setdefault("sources", {})
    for other_name, other in list(self.merged_srcs.items()):
        if other_name == name:
            continue
        if other_name != "cargoDeps":
            # Not the conventional name: accept only a pure hash mirror.
            if other.get("fetcher"):
                continue
            if set(other) - {"fetcher"} != {"hash"}:
                continue
        target_sources.setdefault(other_name, {})["hash"] = cargo_hash
def cachyos_suffix(self) -> str:
    """Return the directory suffix for the selected variant.

    "-rc", "-hardened" or "-lts" for the variants of those names; "" for the
    base and for any other variant.
    """
    if self.vidx == 0:
        return ""
    variant = self.variants[self.vidx]
    if variant in ("rc", "hardened", "lts"):
        return f"-{variant}"
    return ""
def fetch_cachyos_linux_latest(self, suffix: str) -> Optional[str]:
    """
    Determine the latest linux version published in the linux-cachyos repo.

    For each org spelling ('CachyOS', then 'cachyos') the function tries
    linux-cachyos{suffix}/.SRCINFO first and linux-cachyos{suffix}/PKGBUILD
    second, returning the first 'pkgver' it can parse. A '.rc' in the
    version is normalised to '-rc'. Returns None when nothing could be
    fetched or parsed.
    """
    repo_roots = (
        "https://raw.githubusercontent.com/CachyOS/linux-cachyos/master",
        "https://raw.githubusercontent.com/cachyos/linux-cachyos/master",
    )
    pkg_dir = f"linux-cachyos{suffix}"

    def unquote(value: str) -> str:
        # Drop one pair of matching surrounding quotes, if present.
        for quote in ('"', "'"):
            if value.startswith(quote) and value.endswith(quote):
                return value[1:-1]
        return value

    def version_from_srcinfo(text: str) -> Optional[str]:
        # .SRCINFO is preprocessed, so pkgver is a literal value.
        found = re.search(r"^\s*pkgver\s*=\s*([^\s#]+)\s*$", text, re.MULTILINE)
        return found.group(1).strip() if found else None

    def version_from_pkgbuild(text: str) -> Optional[str]:
        # Collect simple VAR=value assignments so pkgver can reference them.
        assignments: Dict[str, str] = {}
        for raw_line in text.splitlines():
            stripped = raw_line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            assign = re.match(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", stripped)
            if assign is None:
                continue
            # Remove trailing comments, then surrounding quotes
            value = re.sub(r"\s+#.*$", "", assign.group(2).strip()).strip()
            assignments[assign.group(1)] = unquote(value)

        pkgver = re.search(r"^\s*pkgver\s*=\s*(.+)$", text, re.MULTILINE)
        if pkgver is None:
            return None
        template = unquote(pkgver.group(1).strip())

        def substitute(match):
            # Unknown or empty variables are left as written.
            return assignments.get(match.group(1), match.group(0)) or match.group(0)

        # Expand ${var} then $var (a single pass each)
        expanded = re.sub(r"\$\{([^}]+)\}", substitute, template)
        expanded = re.sub(r"\$([A-Za-z_][A-Za-z0-9_]*)", substitute, expanded)
        # normalize rc form like 6.19.rc6 -> 6.19-rc6
        return expanded.strip().replace(".rc", "-rc")

    attempts = (
        (".SRCINFO", version_from_srcinfo),
        ("PKGBUILD", version_from_pkgbuild),
    )
    for root in repo_roots:
        for filename, parse in attempts:
            body = http_get_text(f"{root}/{pkg_dir}/{filename}")
            if not body:
                continue
            version = parse(body)
            if version:
                return version.replace(".rc", "-rc")
    return None
def linux_tarball_url_for_version(self, version: str) -> str:
    """Return the kernel tarball URL for ``version``.

    Versions containing "-rc" map to the torvalds snapshot on
    git.kernel.org (.tar.gz); all others map to cdn.kernel.org (.tar.xz),
    where a version ending in ".0" is shortened to "<major>.<minor>".
    """
    if "-rc" in version:
        return f"https://git.kernel.org/torvalds/t/linux-{version}.tar.gz"

    # str.split always yields at least one element, so pieces[0] exists.
    pieces = version.split(".")
    major = pieces[0]
    tar_version = version
    if version.endswith(".0"):
        tar_version = ".".join(pieces[:2])
    return (
        f"https://cdn.kernel.org/pub/linux/kernel/v{major}.x/linux-{tar_version}.tar.xz"
    )
def update_linux_from_pkgbuild(self, name: str):
    """Set source ``name`` to the latest upstream linux version and its hash.

    Looks up the latest version for the selected variant, prefetches the
    matching kernel tarball, writes "version" and "hash" into the target
    dict, and refreshes the merged view. On failure only the status message
    is set and nothing is written.
    """
    latest = self.fetch_cachyos_linux_latest(self.cachyos_suffix())
    if not latest:
        self.set_status("linux: failed to get version from PKGBUILD")
        return

    sri = nix_prefetch_url(self.linux_tarball_url_for_version(latest))
    if not sri:
        self.set_status("linux: prefetch failed")
        return

    entry = self.target_dict.setdefault("sources", {}).setdefault(name, {})
    entry.update(version=latest, hash=sri)
    self._refresh_merged()
    self.set_status(f"{name}: updated version to {latest} and refreshed hash")
def _refresh_merged(self):
    """Recompute merged_vars/merged_srcs/target_dict/snames, keeping the selection.

    Unlike recompute_view(), ``sidx`` is not reset; it is only pulled back
    to the last entry when the source list is non-empty.
    """
    at_base = self.vidx == 0
    variant_name = None if at_base else self.variants[self.vidx]
    view = merged_view(self.spec, variant_name)
    self.merged_vars, self.merged_srcs, self.target_dict = view
    self.snames = sorted(self.merged_srcs)
    if not self.snames:
        return
    # Clamp sidx in case source list changed
    self.sidx = min(self.sidx, len(self.snames) - 1)
def set_ref(self, name: str, kind: str, value: str):
    """Point source ``name`` at a new ref in the target dict, then refresh.

    ``kind`` "release" or "tag" stores ``value`` as "tag" and removes "rev";
    "commit" stores it as "rev" and removes "tag". Any other kind leaves the
    entry unchanged. The merged view is refreshed afterwards so that
    prefetch_hash_for() sees the new ref.
    """
    # Write to selected target dict (base or variant override)
    entry = self.target_dict.setdefault("sources", {}).setdefault(name, {})
    if kind == "commit":
        entry["rev"] = value
        entry.pop("tag", None)
    elif kind in ("release", "tag"):
        entry["tag"] = value
        entry.pop("rev", None)
    self._refresh_merged()
def save(self):
    """Persist the current state and return True.

    Regular packages write ``self.spec`` to ``self.path`` as JSON. Python
    packages and Home Assistant components instead pass, per source, the
    merged "version" variable and the source's "hash", "tag" and "rev" (each
    only when present) to their respective update function; sources with
    nothing to update are skipped.
    """
    if not (self.is_python or self.is_homeassistant):
        # For regular packages, save to version.json
        save_json(self.path, self.spec)
        return True

    for name in self.snames:
        source = self.merged_srcs[name]
        updates = {}
        if "version" in self.merged_vars:
            updates["version"] = self.merged_vars["version"]
        updates.update(
            {key: source[key] for key in ("hash", "tag", "rev") if key in source}
        )
        if not updates:
            continue
        # Python takes precedence when both flags are set.
        if self.is_python:
            update_python_package(self.path, name, updates)
        else:
            update_homeassistant_component(self.path, name, updates)
    return True
def run(self):
while True:
self.stdscr.clear()
h, w = self.stdscr.getmaxyx()
# Draw main border around the entire screen
draw_border(self.stdscr, 0, 0, h - 1, w)
# Title with package name and path
title = f"{self.pkg_name} [{self.path}]"
if self.is_python:
title += " [Python Package]"
# Center the title
title_x = (w - len(title)) // 2
self.stdscr.addstr(
0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD
)
# Variant line with highlighting for selected variant
if not self.is_python:
vline_parts = []
for i, v in enumerate(self.variants):
if i == self.vidx:
vline_parts.append(f"[{v}]")
else:
vline_parts.append(v)
vline = "Variants: " + " | ".join(vline_parts)
self.stdscr.addstr(1, 2, "Variants:", curses.color_pair(COLOR_HEADER))
# Display each variant with appropriate highlighting
x_pos = 12 # Position after "Variants: "
for i, v in enumerate(self.variants):
if i > 0:
self.stdscr.addstr(
1, x_pos, " | ", curses.color_pair(COLOR_NORMAL)
)
x_pos += 3
if i == self.vidx:
self.stdscr.addstr(
1, x_pos, f"[{v}]", curses.color_pair(COLOR_HIGHLIGHT)
)
x_pos += len(f"[{v}]")
else:
self.stdscr.addstr(1, x_pos, v, curses.color_pair(COLOR_NORMAL))
x_pos += len(v)
else:
# For Python packages, show version instead of variants
version = self.merged_vars.get("version", "")
self.stdscr.addstr(1, 2, "Version:", curses.color_pair(COLOR_HEADER))
self.stdscr.addstr(1, 11, version, curses.color_pair(COLOR_SUCCESS))
# Sources header with decoration
self.stdscr.addstr(
2, 2, "Sources:", curses.color_pair(COLOR_HEADER) | curses.A_BOLD
)
# Draw a separator line under the header
for i in range(1, w - 1):
self.stdscr.addch(
3, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)
)
# List sources
for i, name in enumerate(self.snames[: h - 10], start=0):
comp = self.merged_srcs[name]
fetcher = comp.get("fetcher", "none")
# Render refs so variables resolve; compress long forms for display
display_ref = (
comp.get("tag") or comp.get("rev") or comp.get("version") or ""
)
if fetcher == "github":
rendered = render_templates(comp, self.merged_vars)
tag = rendered.get("tag")
rev = rendered.get("rev")
owner = rendered.get("owner") or self.merged_vars.get("owner") or ""
repo = rendered.get("repo") or self.merged_vars.get("repo") or ""
if tag and owner and repo:
display_ref = f"{owner}/{repo}@{tag}"
elif tag:
display_ref = tag
elif rev and owner and repo:
display_ref = f"{owner}/{repo}@{rev[:7]}"
elif rev:
display_ref = rev[:12]
elif fetcher == "url":
rendered = render_templates(comp, self.merged_vars)
url = rendered.get("url") or rendered.get("urlTemplate") or ""
if url:
# Prefer a concise label like owner/repo@tag · filename
owner = str(self.merged_vars.get("owner", "") or "")
repo = str(self.merged_vars.get("repo", "") or "")
rp = str(self.merged_vars.get("releasePrefix", "") or "")
rs = str(self.merged_vars.get("releaseSuffix", "") or "")
base = str(self.merged_vars.get("base", "") or "")
rel = str(self.merged_vars.get("release", "") or "")
tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
parsed = urlparse(url)
filename = (
os.path.basename(parsed.path)
if parsed and parsed.path
else ""
)
if owner and repo and tag and filename:
display_ref = f"{owner}/{repo}@{tag} · {filename}"
elif filename:
display_ref = filename
else:
display_ref = url
else:
display_ref = ""
ref_short = (
display_ref
if not isinstance(display_ref, str)
else (display_ref[:60] + ("..." if len(display_ref) > 60 else ""))
)
# Determine colors and styles based on selection and fetcher type
if i == self.sidx:
# Selected item
attr = curses.color_pair(COLOR_HIGHLIGHT)
sel = "" # Use a fancier selector
else:
# Non-selected item
attr = curses.color_pair(COLOR_NORMAL)
sel = " "
# Determine fetcher color
fetcher_color = COLOR_NORMAL
if fetcher == "github":
fetcher_color = COLOR_SUCCESS
elif fetcher == "url":
fetcher_color = COLOR_STATUS
elif fetcher == "git":
fetcher_color = COLOR_HEADER
# Display source name with selection indicator
self.stdscr.addstr(4 + i, 2, f"{sel} {name:<20}", attr)
# Display fetcher with appropriate color
self.stdscr.addstr(4 + i, 24, fetcher, curses.color_pair(fetcher_color))
# Display reference, with optional branch and cargo indicators
branch = comp.get("branch") or ""
branch_suffix = f" [{branch}]" if branch else ""
cargo_suffix = " [cargo]" if "cargoHash" in comp else ""
ref_with_extras = f"ref={ref_short}{branch_suffix}{cargo_suffix}"
self.stdscr.addstr(
4 + i,
32,
ref_with_extras[: w - 34],
curses.color_pair(COLOR_NORMAL),
)
# Draw a separator line before the latest candidates section
y_latest = h - 8
for i in range(1, w - 1):
self.stdscr.addch(
y_latest, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)
)
# Latest candidates section for selected component (auto-fetched)
if self.snames:
_sel_name = self.snames[self.sidx]
_comp = self.merged_srcs[_sel_name]
_fetcher = _comp.get("fetcher", "none")
# Preload candidates lazily for selected item
if (
_fetcher in ("github", "git", "url")
and _sel_name not in self.candidates
):
self.fetch_candidates_for(_sel_name)
# Latest header with decoration — show branch if locked
_branch = _comp.get("branch") or ""
_latest_hdr = (
f"Latest Versions: (branch: {_branch})"
if _branch
else "Latest Versions:"
)
self.stdscr.addstr(
y_latest + 1,
2,
_latest_hdr[: w - 4],
curses.color_pair(COLOR_HEADER) | curses.A_BOLD,
)
if _fetcher in ("github", "git"):
_cand = self.candidates.get(_sel_name, {})
# Display each candidate with appropriate color
if _cand.get("release"):
self.stdscr.addstr(
y_latest + 2, 4, "Release:", curses.color_pair(COLOR_HEADER)
)
self.stdscr.addstr(
y_latest + 2,
13,
_cand.get("release"),
curses.color_pair(COLOR_SUCCESS),
)
if _cand.get("tag"):
self.stdscr.addstr(
y_latest + 2, 30, "Tag:", curses.color_pair(COLOR_HEADER)
)
self.stdscr.addstr(
y_latest + 2,
35,
_cand.get("tag"),
curses.color_pair(COLOR_SUCCESS),
)
if _cand.get("commit"):
self.stdscr.addstr(
y_latest + 3, 4, "Commit:", curses.color_pair(COLOR_HEADER)
)
self.stdscr.addstr(
y_latest + 3,
12,
(_cand.get("commit") or "")[:12],
curses.color_pair(COLOR_NORMAL),
)
elif _fetcher == "url":
_cand_u = self.url_candidates.get(_sel_name, {}) or {}
_tag = _cand_u.get("tag") or (
self.candidates.get(_sel_name, {}).get("release") or "-"
)
if _tag != "-":
self.stdscr.addstr(
y_latest + 2, 4, "Tag:", curses.color_pair(COLOR_HEADER)
)
self.stdscr.addstr(
y_latest + 2, 9, _tag, curses.color_pair(COLOR_SUCCESS)
)
if _cand_u.get("base"):
self.stdscr.addstr(
y_latest + 2, 30, "Base:", curses.color_pair(COLOR_HEADER)
)
self.stdscr.addstr(
y_latest + 2,
36,
_cand_u.get("base"),
curses.color_pair(COLOR_NORMAL),
)
if _cand_u.get("release"):
self.stdscr.addstr(
y_latest + 3, 4, "Release:", curses.color_pair(COLOR_HEADER)
)
self.stdscr.addstr(
y_latest + 3,
13,
_cand_u.get("release"),
curses.color_pair(COLOR_NORMAL),
)
else:
if self.pkg_name == "linux-cachyos" and _sel_name == "linux":
_suffix = self.cachyos_suffix()
_latest = self.fetch_cachyos_linux_latest(_suffix)
self.stdscr.addstr(
y_latest + 2,
4,
"Linux from PKGBUILD:",
curses.color_pair(COLOR_HEADER),
)
if _latest:
self.stdscr.addstr(
y_latest + 2,
24,
_latest,
curses.color_pair(COLOR_SUCCESS),
)
else:
self.stdscr.addstr(
y_latest + 2, 24, "-", curses.color_pair(COLOR_NORMAL)
)
else:
self.stdscr.addstr(
y_latest + 2,
4,
"No candidates available",
curses.color_pair(COLOR_NORMAL),
)
# Draw a separator line before the footer
for i in range(1, w - 1):
self.stdscr.addch(
h - 5, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER)
)
# Footer instructions with better formatting
footer = "Enter: component actions | r: refresh | h: hash | i: url | e: edit | s: save | ←/→: variant | Backspace: back | q: quit"
footer_x = (w - len(footer)) // 2
self.stdscr.addstr(h - 4, footer_x, footer, curses.color_pair(COLOR_STATUS))
# Draw status at the bottom
self.draw_status(h, w)
self.stdscr.refresh()
ch = self.stdscr.getch()
if ch in (ord("q"), 27):
return None
elif ch == curses.KEY_BACKSPACE or ch == 127:
return "reload"
elif ch in (curses.KEY_LEFT,):
self.vidx = max(0, self.vidx - 1)
self.select_variant()
elif ch in (curses.KEY_RIGHT,):
self.vidx = min(len(self.variants) - 1, self.vidx + 1)
self.select_variant()
elif ch in (curses.KEY_UP, ord("k")):
self.sidx = max(0, self.sidx - 1)
elif ch in (curses.KEY_DOWN, ord("j")):
self.sidx = min(len(self.snames) - 1, self.sidx + 1)
elif ch in (ord("r"),):
if self.snames:
name = self.snames[self.sidx]
comp = self.merged_srcs[name]
fetcher = comp.get("fetcher", "none")
if self.pkg_name == "linux-cachyos" and name == "linux":
# Show available linux version from upstream PKGBUILD (.SRCINFO)
suffix = self.cachyos_suffix()
latest = self.fetch_cachyos_linux_latest(suffix)
rendered = render_templates(comp, self.merged_vars)
cur_version = str(rendered.get("version") or "")
url_hint = (
self.linux_tarball_url_for_version(latest)
if latest
else "-"
)
lines = [
f"linux-cachyos ({'base' if self.vidx == 0 else self.variants[self.vidx]}):",
f" current : {cur_version or '-'}",
f" available: {latest or '-'}",
f" tarball : {url_hint}",
]
show_popup(self.stdscr, lines)
else:
self.fetch_candidates_for(name)
cand = self.candidates.get(name, {})
branch = comp.get("branch") or ""
lines = [
f"Candidates for {name}:"
+ (f" (branch: {branch})" if branch else ""),
f" latest release: {cand.get('release') or '-'}",
f" latest tag : {cand.get('tag') or '-'}",
f" latest commit : {cand.get('commit') or '-'}",
]
show_popup(self.stdscr, lines)
elif ch in (ord("i"),):
# Show full rendered URL for URL-based sources
if self.snames:
name = self.snames[self.sidx]
comp = self.merged_srcs[name]
if comp.get("fetcher", "none") == "url":
rendered = render_templates(comp, self.merged_vars)
url = rendered.get("url") or rendered.get("urlTemplate") or ""
if url:
show_popup(self.stdscr, ["Full URL:", url])
else:
self.set_status("No URL available")
elif ch in (ord("h"),):
if self.snames:
name = self.snames[self.sidx]
sri = self.prefetch_hash_for(name)
if sri:
ts = self.target_dict.setdefault("sources", {})
compw = ts.setdefault(name, {})
compw["hash"] = sri
self._refresh_merged()
# If this source also has a cargoHash, recompute it now
if self._source_has_cargo(name):
self.set_status(
f"{name}: updated hash; computing cargo hash..."
)
self.stdscr.refresh()
cargo_sri = self.prefetch_cargo_hash_for(name)
if cargo_sri:
compw["cargoHash"] = cargo_sri
self._apply_cargo_hash_to_sibling(name, cargo_sri)
self._refresh_merged()
self.set_status(f"{name}: updated hash + cargo hash")
else:
self.set_status(
f"{name}: updated hash; cargo hash failed"
)
else:
self.set_status(f"{name}: updated hash")
else:
self.set_status(f"{name}: hash prefetch failed")
elif ch in (ord("e"),):
s = prompt_input(
self.stdscr, "Edit path=value (relative to selected base/variant): "
)
if s:
if "=" not in s:
self.set_status("Invalid input, expected key.path=value")
else:
k, v = s.split("=", 1)
path = [p for p in k.split(".") if p]
deep_set(self.cursor, path, v)
self.set_status(f"Set {k}={v}")
elif ch in (ord("s"),):
try:
self.save()
self.set_status("Saved.")
except Exception as e:
self.set_status(f"Save failed: {e}")
elif ch in (curses.KEY_ENTER, 10, 13):
if not self.snames:
continue
name = self.snames[self.sidx]
comp = self.merged_srcs[name]
fetcher = comp.get("fetcher", "none")
if fetcher in ("github", "git"):
# Ensure candidates loaded
if name not in self.candidates:
self.fetch_candidates_for(name)
cand = self.candidates.get(name, {})
branch = comp.get("branch") or ""
# Present small menu
items = []
if fetcher == "github":
# When branch-locked, only offer latest commit (tags are irrelevant)
if branch:
items = [
(
"Use latest commit (rev)",
("commit", cand.get("commit")),
),
("Recompute hash", ("hash", None)),
("Cancel", ("cancel", None)),
]
else:
items = [
(
"Use latest release (tag)",
("release", cand.get("release")),
),
("Use latest tag", ("tag", cand.get("tag"))),
(
"Use latest commit (rev)",
("commit", cand.get("commit")),
),
("Recompute hash", ("hash", None)),
("Cancel", ("cancel", None)),
]
else:
items = [
("Use latest commit (rev)", ("commit", cand.get("commit"))),
("Recompute hash", ("hash", None)),
("Cancel", ("cancel", None)),
]
# Inject cargo hash option before Cancel when applicable
has_cargo = self._source_has_cargo(name)
if has_cargo:
items = [item for item in items if item[1][0] != "cancel"] + [
("Recompute cargo hash", ("cargo_hash", None)),
("Cancel", ("cancel", None)),
]
# Build header with current and available refs
rendered = render_templates(comp, self.merged_vars)
cur_tag = rendered.get("tag") or ""
cur_rev = rendered.get("rev") or ""
cur_version = rendered.get("version") or ""
if cur_tag:
current_str = f"current: tag={cur_tag}"
elif cur_rev:
current_str = f"current: rev={cur_rev[:12]}"
elif cur_version:
current_str = f"current: version={cur_version}"
else:
current_str = "current: -"
if branch:
current_str += f" (branch: {branch})"
cur_cargo = comp.get("cargoHash", "")
header_lines = [
current_str,
f"available: release={cand.get('release') or '-'} tag={cand.get('tag') or '-'} commit={(cand.get('commit') or '')[:12] or '-'}",
]
if has_cargo:
header_lines.append(
f"cargoHash: {cur_cargo[:32] + '...' if len(cur_cargo) > 32 else cur_cargo or '-'}"
)
choice = select_menu(
self.stdscr,
f"Actions for {name}",
[label for label, _ in items],
header=header_lines,
)
if choice is not None:
kind, val = items[choice][1]
if kind in ("release", "tag", "commit"):
if val:
self.set_ref(name, kind, val)
# update src hash
sri = self.prefetch_hash_for(name)
if sri:
ts = self.target_dict.setdefault("sources", {})
compw = ts.setdefault(name, {})
compw["hash"] = sri
self._refresh_merged()
# also update cargo hash if applicable
if has_cargo:
self.set_status(
f"{name}: set {kind}, hashing (src)..."
)
cargo_sri = self.prefetch_cargo_hash_for(name)
if cargo_sri:
ts = self.target_dict.setdefault("sources", {})
compw = ts.setdefault(name, {})
compw["cargoHash"] = cargo_sri
self._apply_cargo_hash_to_sibling(
name, cargo_sri
)
self._refresh_merged()
self.set_status(
f"{name}: set {kind}, updated src + cargo hash"
)
else:
self.set_status(
f"{name}: set {kind}, updated src hash; cargo hash failed"
)
else:
self.set_status(
f"{name}: set {kind} and updated hash"
)
else:
self.set_status(f"No candidate {kind}")
elif kind == "hash":
sri = self.prefetch_hash_for(name)
if sri:
ts = self.target_dict.setdefault("sources", {})
compw = ts.setdefault(name, {})
compw["hash"] = sri
self._refresh_merged()
self.set_status(f"{name}: updated hash")
else:
self.set_status("hash prefetch failed")
elif kind == "cargo_hash":
self.set_status(f"{name}: computing cargo hash...")
self.stdscr.refresh()
cargo_sri = self.prefetch_cargo_hash_for(name)
if cargo_sri:
ts = self.target_dict.setdefault("sources", {})
compw = ts.setdefault(name, {})
compw["cargoHash"] = cargo_sri
self._apply_cargo_hash_to_sibling(name, cargo_sri)
self._refresh_merged()
self.set_status(f"{name}: updated cargo hash")
else:
self.set_status(
f"{name}: cargo hash computation failed"
)
else:
pass
elif fetcher == "url":
# Offer latest release update (for proton-cachyos-like schemas) and/or hash recompute
cand = self.url_candidates.get(name)
menu_items: List[
Tuple[str, Tuple[str, Optional[Dict[str, str]]]]
] = []
if cand and cand.get("base") and cand.get("release"):
menu_items.append(
(
"Use latest release (update variables.base/release)",
("update_vars", cand),
)
)
menu_items.append(("Recompute hash (prefetch)", ("hash", None)))
menu_items.append(("Cancel", ("cancel", None)))
# Build header with current and available release info
base = str(self.merged_vars.get("base") or "")
rel = str(self.merged_vars.get("release") or "")
rp = str(self.merged_vars.get("releasePrefix") or "")
rs = str(self.merged_vars.get("releaseSuffix") or "")
current_tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
if current_tag:
current_str = f"current: {current_tag}"
elif base or rel:
current_str = (
f"current: base={base or '-'} release={rel or '-'}"
)
else:
current_str = "current: -"
header_lines = [
current_str,
f"available: tag={(cand.get('tag') or '-') if cand else '-'} base={(cand.get('base') or '-') if cand else '-'} release={(cand.get('release') or '-') if cand else '-'}",
]
choice = select_menu(
self.stdscr,
f"Actions for {name}",
[label for label, _ in menu_items],
header=header_lines,
)
if choice is not None:
kind, payload = menu_items[choice][1]
if kind == "update_vars" and isinstance(payload, dict):
# Write variables into selected base/variant dict
vars_dict = self.target_dict.setdefault("variables", {})
vars_dict["base"] = payload["base"]
vars_dict["release"] = payload["release"]
# Recompute merged view to reflect new variables
self.recompute_view()
# Prefetch and update hash
sri = self.prefetch_hash_for(name)
if sri:
ts = self.target_dict.setdefault("sources", {})
compw = ts.setdefault(name, {})
compw["hash"] = sri
self.set_status(
f"{name}: updated to {payload['base']}.{payload['release']} and refreshed hash"
)
else:
self.set_status(
"hash prefetch failed after variable update"
)
elif kind == "hash":
sri = self.prefetch_hash_for(name)
if sri:
ts = self.target_dict.setdefault("sources", {})
compw = ts.setdefault(name, {})
compw["hash"] = sri
self.set_status(f"{name}: updated hash")
else:
self.set_status("hash prefetch failed")
else:
pass
else:
if self.pkg_name == "linux-cachyos" and name == "linux":
# Offer update of linux version from upstream PKGBUILD (.SRCINFO)
suffix = self.cachyos_suffix()
latest = self.fetch_cachyos_linux_latest(suffix)
rendered = render_templates(comp, self.merged_vars)
cur_version = str(rendered.get("version") or "")
header_lines = [
f"current: version={cur_version or '-'}",
f"available: version={latest or '-'}",
]
opts = []
if latest:
opts.append(
f"Update linux version to {latest} from PKGBUILD (.SRCINFO)"
)
else:
opts.append("Update linux version from PKGBUILD (.SRCINFO)")
opts.append("Cancel")
choice = select_menu(
self.stdscr,
f"Actions for {name}",
opts,
header=header_lines,
)
if choice == 0 and latest:
self.update_linux_from_pkgbuild(name)
else:
pass
else:
show_popup(
self.stdscr,
[
f"{name}: fetcher={fetcher}",
"Use 'e' to edit fields manually.",
],
)
else:
pass
def _menu_addstr(stdscr, y: int, x: int, text: str, attr: int) -> None:
    """Write ``text`` at ``(y, x)``, clipped to the screen instead of raising.

    ``addstr`` raises ``curses.error`` when any part of the write falls outside
    the window, which would otherwise abort the menu on a small terminal.
    """
    height, width = stdscr.getmaxyx()
    x = max(0, x)
    if not 0 <= y < height or x >= width:
        return
    clipped = text[: width - x]
    if not clipped:
        return
    try:
        stdscr.addstr(y, x, clipped, attr)
    except curses.error:
        # Filling the bottom-right cell draws the text but still raises because
        # the cursor cannot advance past it; there is nothing further to do.
        pass


def select_menu(
    stdscr, title: str, options: List[str], header: Optional[List[str]] = None
) -> Optional[int]:
    """Show a centered modal menu and return the index of the chosen option.

    The menu is redrawn after every key press. Up/Down (or ``k``/``j``) move
    the highlight, Enter confirms, and Backspace/Escape cancels. When there are
    more options than rows available, the option list scrolls so the
    highlighted entry always stays visible.

    Args:
        stdscr: curses window to draw on and read keys from.
        title: Text drawn on the top border of the menu.
        options: Labels of the selectable entries, in display order.
        header: Optional informational lines drawn above the options,
            followed by a horizontal separator.

    Returns:
        The index into ``options`` of the confirmed entry, or ``None`` if the
        user cancelled or there were no options to choose from.
    """
    idx = 0  # currently highlighted option
    top = 0  # index of the first option drawn in the scroll window
    last_idx = max(0, len(options) - 1)
    while True:
        stdscr.clear()
        h, w = stdscr.getmaxyx()

        # Size the box to fit the title and the longest option, within the screen.
        max_opt_len = max((len(opt) + 4 for opt in options), default=0)
        title_len = len(title) + 4
        menu_width = min(w - 4, max(40, title_len, max_opt_len))
        menu_height = min(h - 4, len(options) + (len(header) if header else 0) + 4)
        text_width = menu_width - 4  # usable columns inside the border padding

        # Center the box on the screen.
        start_x = (w - menu_width) // 2
        start_y = (h - menu_height) // 2

        draw_border(stdscr, start_y, start_x, menu_height, menu_width)

        # Title sits on the top border row.
        title_x = start_x + (menu_width - len(title)) // 2
        _menu_addstr(
            stdscr,
            start_y,
            title_x,
            f" {title} ",
            curses.color_pair(COLOR_TITLE) | curses.A_BOLD,
        )

        # Header lines, then a separator row.
        y = start_y + 1
        if header:
            for line in header:
                # Always leave room for the separator and at least one option.
                if y >= start_y + menu_height - 2:
                    break
                line_str = str(line)
                if len(line_str) > text_width:
                    line_str = line_str[: max(0, text_width - 3)] + "..."
                _menu_addstr(
                    stdscr,
                    y,
                    start_x + 2,
                    line_str,
                    curses.color_pair(COLOR_HEADER),
                )
                y += 1
            border_attr = curses.color_pair(COLOR_BORDER)
            try:
                for i in range(1, menu_width - 1):
                    stdscr.addch(y, start_x + i, curses.ACS_HLINE, border_attr)
            except curses.error:
                # Terminal too small for the separator; the menu stays usable.
                pass
            y += 1

        # Options: rows from here up to (not including) the footer row.
        options_start_y = y
        max_visible_options = max(1, start_y + menu_height - options_start_y - 1)

        # Keep the scroll window valid after a resize and make sure it
        # contains the highlighted option.
        top = min(top, max(0, len(options) - max_visible_options))
        if idx < top:
            top = idx
        elif idx >= top + max_visible_options:
            top = idx - max_visible_options + 1

        for row, opt in enumerate(options[top : top + max_visible_options]):
            if top + row == idx:
                attr = curses.color_pair(COLOR_HIGHLIGHT)
                sel = ">"  # same width as the blank marker so labels stay aligned
            else:
                attr = curses.color_pair(COLOR_NORMAL)
                sel = " "
            opt_text = f"{sel} {opt}"
            if len(opt_text) > text_width:
                opt_text = opt_text[: max(0, text_width - 3)] + "..."
            _menu_addstr(stdscr, options_start_y + row, start_x + 2, opt_text, attr)

        # Footer sits on the bottom border row.
        footer = "Enter: select | Backspace: cancel"
        footer_x = start_x + (menu_width - len(footer)) // 2
        _menu_addstr(
            stdscr,
            start_y + menu_height - 1,
            footer_x,
            footer,
            curses.color_pair(COLOR_STATUS),
        )

        stdscr.refresh()
        ch = stdscr.getch()
        if ch in (curses.KEY_UP, ord("k")):
            idx = max(0, idx - 1)
        elif ch in (curses.KEY_DOWN, ord("j")):
            idx = min(last_idx, idx + 1)
        elif ch in (curses.KEY_ENTER, 10, 13):
            # With nothing to select there is no valid index to hand back.
            return idx if options else None
        elif ch in (curses.KEY_BACKSPACE, 127, 27):
            return None
# ------------------------------ main ------------------------------
def main(stdscr):
    """Curses entry point: draw a splash screen, then run the package browser.

    Intended to be passed to ``curses.wrapper``. Any exception raised while
    drawing the splash or while the package screen is running is printed as a
    traceback to stderr after curses is shut down, and the process exits with
    status 1.

    Args:
        stdscr: The top-level curses window supplied by ``curses.wrapper``.
    """
    try:
        curses.curs_set(0)  # Hide cursor
    except curses.error:
        # Some terminals cannot hide the cursor; that is purely cosmetic and
        # must not prevent the program from starting.
        pass
    stdscr.nodelay(False)  # Blocking input

    # Initialize colors
    if curses.has_colors():
        init_colors()

    try:
        h, w = stdscr.getmaxyx()

        # Build the welcome box from its text so every row is exactly as wide
        # as the border and the frame renders as a rectangle.
        top_border = "╔═══════════════════════════════════════════╗"
        bottom_border = "╚═══════════════════════════════════════════╝"
        inner_width = len(top_border) - 2
        title_text = "NixOS Package Version Manager"
        loading_text = "Loading packages..."
        body_text = [
            "Browse and update package versions with",
            "this interactive TUI. Navigate using",
            "arrow keys and Enter to select.",
        ]
        blank_row = "║" + " " * inner_width + "║"
        welcome_lines = [
            top_border,
            blank_row,
            "║" + title_text.center(inner_width) + "║",
            blank_row,
            *("║" + ("  " + text).ljust(inner_width) + "║" for text in body_text),
            blank_row,
            bottom_border,
            "",
            loading_text,
        ]

        # Center the welcome message, clipping it to the terminal so a small
        # window degrades the splash instead of aborting startup.
        start_y = (h - len(welcome_lines)) // 2
        for offset, line in enumerate(welcome_lines):
            y = start_y + offset
            if not line or y < 0 or y >= h:
                continue
            # Stay one column short of the full width: writing into the last
            # cell of the bottom row makes addstr raise.
            text = line[: max(0, w - 1)]
            if not text:
                continue
            x = (w - len(text)) // 2
            if title_text in line:
                attr = curses.color_pair(COLOR_TITLE) | curses.A_BOLD
            elif loading_text in line:
                attr = curses.color_pair(COLOR_STATUS)
            else:
                attr = curses.color_pair(COLOR_NORMAL)
            stdscr.addstr(y, x, text, attr)
        stdscr.refresh()

        # No artificial delay: the splash stays up until the package screen
        # draws over it.
        screen = PackagesScreen(stdscr)
        screen.run()
    except Exception:
        # Top-level boundary: leave curses mode first so the traceback is
        # readable on a normal terminal, then signal failure to the caller.
        curses.endwin()
        traceback.print_exc()
        sys.exit(1)
# Script entry point. curses.wrapper initializes curses, passes the screen to
# main(), and restores the terminal on exit, including when main() raises.
if __name__ == "__main__":
    curses.wrapper(main)