1866 lines
81 KiB
Python
Executable File
1866 lines
81 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Interactive TUI for browsing and updating unified version.json files.
|
|
|
|
Features:
|
|
- Scans packages/**/version.json and lists all packages
|
|
- Per-package view:
|
|
- Choose base or any variant
|
|
- List all sources/components with current ref (tag/rev/url/version) and hash
|
|
- For GitHub sources: fetch candidates (latest release tag, latest tag, latest commit)
|
|
- For Git sources: fetch latest commit (HEAD)
|
|
- For URL sources: recompute hash (url/urlTemplate with rendered variables)
|
|
- Actions on a component:
|
|
- Update to one of the candidates (sets tag or rev) and optionally re-hash
|
|
- Recompute hash (prefetch)
|
|
- Edit any field via path=value (e.g., variables.version=2025.07)
|
|
- Writes changes back to version.json
|
|
|
|
Dependencies:
|
|
- Standard library + external CLI tools:
|
|
- nix-prefetch-url (or `nix prefetch-url`) and `nix hash to-sri`
|
|
- nix-prefetch-git
|
|
- git
|
|
- Optional: GITHUB_TOKEN env var to increase GitHub API rate limits
|
|
|
|
Usage:
|
|
scripts/version_tui.py
|
|
Controls:
|
|
- Up/Down to navigate lists
|
|
- Enter to select
|
|
- Backspace to go back
|
|
- q to quit
|
|
- On component screen:
|
|
r = refresh candidates
|
|
h = recompute hash (prefetch)
|
|
e = edit arbitrary field (path=value)
|
|
s = save to disk
|
|
"""
|
|
|
|
import curses
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import urllib.request
|
|
import urllib.error
|
|
from urllib.parse import urlparse
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
# Repository root: this script lives in <root>/scripts/, so parents[1] is the repo root.
ROOT = Path(__file__).resolve().parents[1]
# All package definitions live under <root>/packages/.
PKGS_DIR = ROOT / "packages"

# Alias for a parsed JSON object (the contents of a version.json file).
Json = Dict[str, Any]
|
|
|
|
# ------------------------------ Utilities ------------------------------
|
|
|
|
def eprintln(*args, **kwargs):
    """Print to stderr — thin wrapper around print() for diagnostics."""
    print(*args, **kwargs, file=sys.stderr)
|
|
|
|
def load_json(path: Path) -> Dict[str, Any]:
    """Read and parse *path* as UTF-8 JSON."""
    return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
def save_json(path: Path, data: Dict[str, Any]):
    """Write *data* as pretty-printed JSON, via a sibling .tmp file + atomic rename."""
    tmp = path.with_suffix(".tmp")
    text = json.dumps(data, indent=2, ensure_ascii=False) + "\n"
    tmp.write_text(text, encoding="utf-8")
    # Atomic on POSIX: readers never observe a half-written version.json.
    tmp.replace(path)
|
|
|
|
def render_templates(value: Any, variables: Dict[str, Any]) -> Any:
    """Recursively substitute ${name} placeholders in strings using *variables*.

    Dicts and lists are walked recursively; unknown placeholder names are
    left verbatim; non-string scalars are returned unchanged.
    """
    if isinstance(value, dict):
        return {key: render_templates(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [render_templates(item, variables) for item in value]
    if not isinstance(value, str):
        return value

    def substitute(match):
        key = match.group(1)
        if key in variables:
            return str(variables[key])
        return match.group(0)  # unknown variable: keep the ${...} literal

    return re.sub(r"\$\{([^}]+)\}", substitute, value)
|
|
|
|
def deep_set(o: Dict[str, Any], path: List[str], value: Any):
    """Set a nested key in *o*, creating (or replacing non-dict) intermediate dicts."""
    node = o
    for key in path[:-1]:
        if not isinstance(node.get(key), dict):
            node[key] = {}
        node = node[key]
    node[path[-1]] = value
|
|
|
|
# ------------------------------ Merge helpers (match lib/versioning.nix) ------------------------------
|
|
|
|
def deep_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with *b* merged onto *a* (inputs left untouched).

    When both sides hold a dict for the same key, merge recursively;
    otherwise *b*'s value wins.
    """
    merged = dict(a)
    for key, value in b.items():
        both_dicts = isinstance(merged.get(key), dict) and isinstance(value, dict)
        merged[key] = deep_merge(merged[key], value) if both_dicts else value
    return merged
|
|
|
|
def merge_sources(base_sources: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge per-source overrides onto base sources (mirrors lib/versioning.nix).

    Sources present on both sides merge dict-deep; non-dict collisions and
    override-only sources take the override value; base-only sources pass through.
    """
    def _merge(a, b):
        # Recursive dict merge: b wins, nested dicts combine key-by-key.
        merged = dict(a)
        for key, val in b.items():
            if key in merged and isinstance(merged[key], dict) and isinstance(val, dict):
                merged[key] = _merge(merged[key], val)
            else:
                merged[key] = val
        return merged

    result: Dict[str, Any] = {}
    for name in set(base_sources) | set(overrides):
        if name in base_sources and name in overrides:
            base_val = base_sources[name]
            over_val = overrides[name]
            if isinstance(base_val, dict) and isinstance(over_val, dict):
                result[name] = _merge(base_val, over_val)
            else:
                result[name] = over_val
        elif name in overrides:
            result[name] = overrides[name]
        else:
            result[name] = base_sources[name]
    return result
|
|
|
|
def merged_view(spec: Dict[str, Any], variant_name: Optional[str]) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]:
    """Return (merged_variables, merged_sources, target_dict_to_write).

    The merged_* dicts drive display/prefetch; the target dict is where edits
    must be written — the base spec itself, or the selected variant's dict.
    Raises ValueError when *variant_name* names no variant in *spec*.
    """
    base_vars = spec.get("variables", {}) or {}
    base_sources = spec.get("sources", {}) or {}
    if not variant_name:
        # Base view: shallow copies so callers can't mutate the spec by accident.
        return dict(base_vars), dict(base_sources), spec

    vdict = spec.get("variants", {}).get(variant_name)
    if not isinstance(vdict, dict):
        raise ValueError(f"Variant '{variant_name}' not found")
    merged_vars = dict(base_vars)
    merged_vars.update(vdict.get("variables", {}) or {})
    merged_srcs = merge_sources(base_sources, vdict.get("sources", {}) or {})
    return merged_vars, merged_srcs, vdict
|
|
|
|
def run_cmd(args: List[str]) -> Tuple[int, str, str]:
    """Run *args* (no shell) and return (returncode, stdout, stderr), both stripped.

    Never raises: a failure to spawn is reported as (1, "", error message).
    """
    try:
        proc = subprocess.run(args, text=True, capture_output=True, check=False)
    except Exception as e:
        return 1, "", str(e)
    return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
|
|
|
|
def run_get_stdout(args: List[str]) -> Optional[str]:
    """Run a command and return its stripped stdout, or None (logging to stderr) on failure."""
    code, out, err = run_cmd(args)
    if code == 0:
        return out
    eprintln(f"Command failed: {' '.join(args)}\n{err}")
    return None
|
|
|
|
def nix_prefetch_url(url: str) -> Optional[str]:
    """Prefetch *url* and return its sha256 as an SRI string, or None on failure.

    Tries the classic nix-prefetch-url first, then the newer `nix prefetch-url`;
    the resulting hash is normalized with `nix hash to-sri`.
    """
    raw = run_get_stdout(["nix-prefetch-url", "--type", "sha256", url])
    if raw is None:
        raw = run_get_stdout(["nix", "prefetch-url", url])
    if raw is None:
        return None
    return run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", raw.strip()])
|
|
|
|
def nix_prefetch_git(url: str, rev: str) -> Optional[str]:
    """Prefetch git revision *rev* of *url* and return its sha256 as SRI (None on failure).

    nix-prefetch-git normally emits JSON with a "sha256"/"hash" key; if the
    output is not JSON, the last non-empty line is treated as the hash.
    """
    out = run_get_stdout(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url])
    if out is None:
        return None
    raw_hash = None
    try:
        parsed = json.loads(out)
        raw_hash = parsed.get("sha256") or parsed.get("hash")
    except Exception:
        non_empty = [line for line in out.splitlines() if line.strip()]
        if non_empty:
            raw_hash = non_empty[-1].strip()
    if not raw_hash:
        return None
    return run_get_stdout(["nix", "hash", "to-sri", "--type", "sha256", raw_hash])
|
|
|
|
def http_get_json(url: str, token: Optional[str] = None) -> Any:
    """GET *url* and decode the body as JSON (GitHub v3 Accept header).

    Attaches a bearer *token* when given; network/HTTP errors propagate
    to the caller.
    """
    headers = {"Accept": "application/vnd.github+json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    req = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
|
|
|
|
def http_get_text(url: str) -> Optional[str]:
    """GET *url* and return the body as UTF-8 text; None (logged) on any error."""
    try:
        # Provide a basic User-Agent to avoid some hosts rejecting the request
        request = urllib.request.Request(url, headers={"User-Agent": "version-tui/1.0"})
        with urllib.request.urlopen(request) as resp:
            return resp.read().decode("utf-8")
    except Exception as e:
        eprintln(f"http_get_text failed for {url}: {e}")
        return None
|
|
|
|
def gh_latest_release(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    """Return the tag name of the repo's latest GitHub release, or None on error."""
    url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest"
    try:
        return http_get_json(url, token).get("tag_name")
    except Exception as e:
        eprintln(f"latest_release failed for {owner}/{repo}: {e}")
        return None
|
|
|
|
def gh_latest_tag(owner: str, repo: str, token: Optional[str]) -> Optional[str]:
    """Return the most recent tag name (GitHub API ordering), or None on error.

    FIX: skip non-dict entries in the API payload (matching gh_list_tags).
    The old `"name" in t` test raised TypeError on a non-dict item, which was
    swallowed by the except and turned valid responses into None.
    """
    try:
        data = http_get_json(f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", token)
        tags = [t.get("name") for t in data if isinstance(t, dict) and "name" in t]
        return tags[0] if tags else None
    except Exception as e:
        eprintln(f"latest_tag failed for {owner}/{repo}: {e}")
        return None
|
|
|
|
def gh_list_tags(owner: str, repo: str, token: Optional[str]) -> List[str]:
    """Return up to 100 tag names for owner/repo; empty list on any error."""
    api_url = f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100"
    try:
        payload = http_get_json(api_url, token)
        return [entry.get("name") for entry in payload if isinstance(entry, dict) and "name" in entry]
    except Exception as e:
        eprintln(f"list_tags failed for {owner}/{repo}: {e}")
        return []
|
|
|
|
def gh_head_commit(owner: str, repo: str) -> Optional[str]:
    """Return the commit hash of the repo's default-branch HEAD via `git ls-remote`."""
    listing = run_get_stdout(["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", "HEAD"])
    # ls-remote prints "<sha>\tHEAD"; the first whitespace-separated field is the hash.
    return listing.split()[0] if listing else None
|
|
|
|
def gh_tarball_url(owner: str, repo: str, ref: str) -> str:
    """Return the codeload tarball URL for a given repo ref (tag or commit)."""
    return "https://codeload.github.com/" + "/".join((owner, repo, "tar.gz", ref))
|
|
|
|
def gh_release_tags_api(owner: str, repo: str, token: Optional[str]) -> List[str]:
    """Return recent release tag names (up to 50) via the GitHub releases API.

    Non-dict payload entries are skipped; any error yields an empty list.
    """
    endpoint = f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50"
    try:
        return [
            item.get("tag_name")
            for item in http_get_json(endpoint, token)
            if isinstance(item, dict) and "tag_name" in item
        ]
    except Exception as e:
        eprintln(f"releases list failed for {owner}/{repo}: {e}")
        return []
|
|
|
|
# ------------------------------ Data scanning ------------------------------
|
|
|
|
def find_packages() -> List[Tuple[str, Path, bool, bool]]:
    """Discover every updatable package under packages/.

    Returns sorted (name, path, is_python, is_homeassistant) tuples:
    *name* is the directory path relative to packages/ and *path* is the
    version.json (regular packages) or default.nix (python/homeassistant)
    describing the package.
    """
    results: List[Tuple[str, Path, bool, bool]] = []

    # Regular packages: any directory holding a version.json.
    for p in PKGS_DIR.rglob("version.json"):
        rel = p.relative_to(PKGS_DIR).parent
        results.append((str(rel), p, False, False))

    # Python packages and Home Assistant components share the same layout
    # (one default.nix per subdirectory of their category directory), so the
    # two former copy-pasted loops are folded into one helper.
    results.extend(_scan_default_nix_dir(PKGS_DIR / "python", True, False))
    results.extend(_scan_default_nix_dir(PKGS_DIR / "homeassistant", False, True))

    results.sort()
    return results


def _scan_default_nix_dir(root: Path, is_python: bool, is_homeassistant: bool) -> List[Tuple[str, Path, bool, bool]]:
    """One (name, default.nix, flags) entry per subdirectory of *root* containing a default.nix."""
    found: List[Tuple[str, Path, bool, bool]] = []
    if root.exists():
        for pkg_dir in root.iterdir():
            nix_file = pkg_dir / "default.nix"
            if pkg_dir.is_dir() and nix_file.exists():
                rel = pkg_dir.relative_to(PKGS_DIR)
                found.append((str(rel), nix_file, is_python, is_homeassistant))
    return found
|
|
|
|
def parse_python_package(path: Path) -> Dict[str, Any]:
    """Parse a Python package's default.nix file to extract version and source information.

    Best-effort regex scrape (no Nix evaluation).  Returns a dict shaped like
    version.json — {"variables": {...}, "sources": {name: {...}}} — so the
    rest of the TUI can treat every package flavor uniformly.

    Recognized layouts, in priority order:
      1. src = fetchFromGitHub { ... }
      2. src = ... fetchPypi { ... } (GitHub metadata pulled from meta.homepage)
      3. loose owner/repo/rev/tag/url attributes anywhere in the file

    NOTE(review): the `[^}]+` fetch-block patterns stop at the first `}`, so a
    nested attrset inside the fetcher arguments would truncate the captured
    block; assumed not to occur in these files — verify if parsing misbehaves.
    """
    with path.open("r", encoding="utf-8") as f:
        content = f.read()

    # Extract version (first occurrence anywhere in the file)
    version_match = re.search(r'version\s*=\s*"([^"]+)"', content)
    version = version_match.group(1) if version_match else ""

    # Extract pname (package name)
    pname_match = re.search(r'pname\s*=\s*"([^"]+)"', content)
    pname = pname_match.group(1) if pname_match else ""

    # Check for fetchFromGitHub pattern
    fetch_github_match = re.search(r'src\s*=\s*fetchFromGitHub\s*\{([^}]+)\}', content, re.DOTALL)

    # Check for fetchPypi pattern
    # NOTE(review): the greedy `.*` with DOTALL can span unrelated text between
    # `src =` and `fetchPypi` — presumed harmless for these small files.
    fetch_pypi_match = re.search(r'src\s*=\s*.*fetchPypi\s*\{([^}]+)\}', content, re.DOTALL)

    # Create a structure similar to version.json for compatibility
    result = {
        "variables": {},
        "sources": {}
    }

    # Only add non-empty values to variables
    if version:
        result["variables"]["version"] = version

    # Determine source name - use pname, repo name, or derive from path
    source_name = ""
    if pname:
        source_name = pname.lower()
    else:
        # Use directory name as source name
        source_name = path.parent.name.lower()

    # Handle fetchFromGitHub pattern
    if fetch_github_match:
        fetch_block = fetch_github_match.group(1)

        # Extract GitHub info from the fetchFromGitHub block
        owner_match = re.search(r'owner\s*=\s*"([^"]+)"', fetch_block)
        repo_match = re.search(r'repo\s*=\s*"([^"]+)"', fetch_block)
        rev_match = re.search(r'rev\s*=\s*"([^"]+)"', fetch_block)
        hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block)

        owner = owner_match.group(1) if owner_match else ""
        repo = repo_match.group(1) if repo_match else ""
        rev = rev_match.group(1) if rev_match else ""
        hash_value = hash_match.group(2) if hash_match else ""

        # Create source entry
        result["sources"][source_name] = {
            "fetcher": "github",
            "owner": owner,
            "repo": repo,
            "hash": hash_value
        }

        # Handle rev field which might contain a tag or version reference.
        # Ordering matters: tag-like values ("v...", "${version}") are stored
        # as "tag", everything else (branches, plain commits) as "rev".
        if rev:
            # Check if it's a tag reference (starts with v)
            if rev.startswith("v"):
                result["sources"][source_name]["tag"] = rev
            # Check if it contains ${version} variable
            elif "${version}" in rev:
                result["sources"][source_name]["tag"] = rev
            # Check if it's "master" or a specific branch
            elif rev in ["master", "main"]:
                result["sources"][source_name]["rev"] = rev
            # Otherwise treat as a regular revision
            else:
                result["sources"][source_name]["rev"] = rev
    # Handle fetchPypi pattern
    elif fetch_pypi_match:
        fetch_block = fetch_pypi_match.group(1)

        # Extract PyPI info
        hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', fetch_block)
        hash_value = hash_match.group(2) if hash_match else ""

        # Look for GitHub info in meta section
        homepage_match = re.search(r'homepage\s*=\s*"https://github.com/([^/]+)/([^"]+)"', content)

        if homepage_match:
            owner = homepage_match.group(1)
            repo = homepage_match.group(2)

            # Create source entry with GitHub info so candidates can be fetched
            result["sources"][source_name] = {
                "fetcher": "github",
                "owner": owner,
                "repo": repo,
                "hash": hash_value,
                "pypi": True # Mark as PyPI source
            }

            # Add version as tag if available (assumes upstream uses v-prefixed tags)
            if version:
                result["sources"][source_name]["tag"] = f"v{version}"
        else:
            # Create PyPI source entry
            result["sources"][source_name] = {
                "fetcher": "pypi",
                "pname": pname,
                "version": version,
                "hash": hash_value
            }
    else:
        # Try to extract standalone GitHub info if present
        owner_match = re.search(r'owner\s*=\s*"([^"]+)"', content)
        repo_match = re.search(r'repo\s*=\s*"([^"]+)"', content)
        rev_match = re.search(r'rev\s*=\s*"([^"]+)"', content)
        tag_match = re.search(r'tag\s*=\s*"([^"]+)"', content)
        hash_match = re.search(r'(sha256|hash)\s*=\s*"([^"]+)"', content)

        owner = owner_match.group(1) if owner_match else ""
        repo = repo_match.group(1) if repo_match else ""
        rev = rev_match.group(1) if rev_match else ""
        tag = tag_match.group(1) if tag_match else ""
        hash_value = hash_match.group(2) if hash_match else ""

        # Try to extract URL if GitHub info is not present
        url_match = re.search(r'url\s*=\s*"([^"]+)"', content)
        url = url_match.group(1) if url_match else ""

        # Check for GitHub homepage in meta section
        homepage_match = re.search(r'homepage\s*=\s*"https://github.com/([^/]+)/([^"]+)"', content)
        if homepage_match and not (owner and repo):
            owner = homepage_match.group(1)
            repo = homepage_match.group(2)

        # Handle GitHub sources
        if owner and repo:
            result["sources"][source_name] = {
                "fetcher": "github",
                "owner": owner,
                "repo": repo,
                "hash": hash_value
            }

            # Handle tag (preferred over rev when both are present)
            if tag:
                result["sources"][source_name]["tag"] = tag
            # Handle rev
            elif rev:
                result["sources"][source_name]["rev"] = rev
        # Handle URL sources
        elif url:
            result["sources"][source_name] = {
                "fetcher": "url",
                "url": url,
                "hash": hash_value
            }
        # Fallback for packages with no clear source info
        else:
            # Create a minimal source entry so the package shows up in the UI
            result["sources"][source_name] = {
                "fetcher": "unknown",
                "hash": hash_value
            }

    return result
|
|
|
|
def update_python_package(path: Path, source_name: str, updates: Dict[str, Any]) -> bool:
    """Update a Python package's default.nix in place.

    Supported *updates* keys: "version", "hash", "tag", "rev".
    *source_name* is accepted for interface parity with
    update_homeassistant_component but is not needed for a per-package file.
    Returns True when the file content changed (and was written back).

    FIX: substitutions are now limited to the FIRST matching attribute
    (count=1).  The old code rewrote every occurrence, so e.g. a vendored
    dependency's hash or a second `version = "..."` elsewhere in the file was
    clobbered, while parse_python_package only ever reads the first match.
    """
    content = path.read_text(encoding="utf-8")
    modified = False

    def _sub_first(pattern: str, repl: str) -> bool:
        # Replace only the first occurrence; returns True if anything matched.
        nonlocal content
        content, n = re.subn(pattern, repl, content, count=1)
        return n > 0

    if "version" in updates:
        modified |= _sub_first(r'(version\s*=\s*)"([^"]+)"', f'\\1"{updates["version"]}"')

    if "hash" in updates:
        # Match both sha256 and hash attributes (normalizes spacing to `X = "..."`).
        modified |= _sub_first(r'(sha256|hash)\s*=\s*"([^"]+)"', f'\\1 = "{updates["hash"]}"')

    if "tag" in updates:
        modified |= _sub_first(r'(tag\s*=\s*)"([^"]+)"', f'\\1"{updates["tag"]}"')

    if "rev" in updates:
        modified |= _sub_first(r'(rev\s*=\s*)"([^"]+)"', f'\\1"{updates["rev"]}"')

    if modified:
        path.write_text(content, encoding="utf-8")

    return modified
|
|
|
|
def parse_homeassistant_component(path: Path) -> Dict[str, Any]:
    """Parse a Home Assistant component's default.nix into a version.json-like dict.

    Best-effort regex scrape (no Nix evaluation).  Produces
    {"variables": {...}, "sources": {name: {...}}} so the TUI can treat
    components like regular packages.
    """
    content = path.read_text(encoding="utf-8")

    def first_group(pattern: str, group: int = 1) -> str:
        # First match of *pattern* in the file, or "" when absent.
        m = re.search(pattern, content)
        return m.group(group) if m else ""

    domain = first_group(r'domain\s*=\s*"([^"]+)"')
    version = first_group(r'version\s*=\s*"([^"]+)"')
    owner = first_group(r'owner\s*=\s*"([^"]+)"')
    repo = first_group(r'repo\s*=\s*"([^"]+)"')
    rev = first_group(r'rev\s*=\s*"([^"]+)"')
    tag = first_group(r'tag\s*=\s*"([^"]+)"')
    hash_value = first_group(r'(sha256|hash)\s*=\s*"([^"]+)"', group=2)

    result: Dict[str, Any] = {"variables": {}, "sources": {}}

    # Only record non-empty variables.
    if version:
        result["variables"]["version"] = version
    if domain:
        result["variables"]["domain"] = domain

    # Source name: prefer the HA domain, fall back to the directory name.
    source_name = domain or path.parent.name.lower()

    if owner:
        # GitHub-backed component.
        entry: Dict[str, Any] = {
            "fetcher": "github",
            "owner": owner,
            "repo": repo or source_name,
        }
        if hash_value:
            entry["hash"] = hash_value
        # Reference priority: explicit tag, then rev, then version-as-tag.
        if tag:
            entry["tag"] = tag
        elif rev:
            entry["rev"] = rev
        elif version:
            entry["tag"] = version
        result["sources"][source_name] = entry
    else:
        # No recognizable source info; emit a stub so the UI still lists it.
        entry = {"fetcher": "unknown"}
        if hash_value:
            entry["hash"] = hash_value
        result["sources"][source_name] = entry

    return result
|
|
|
|
def update_homeassistant_component(path: Path, source_name: str, updates: Dict[str, Any]) -> bool:
    """Update a Home Assistant component's default.nix file with new version and/or hash.

    *updates* may contain "version", "hash", "tag" and/or "rev".  Hash/tag/rev
    edits are scoped to the `src = fetchFromGitHub { ... }` block; every
    `version = "..."` attribute in the file is rewritten.  When no tag/rev
    attribute exists, one is appended after the block's `hash` attribute.
    Returns True when the file content changed (and was written back).

    NOTE(review): *source_name* is unused here; kept for interface parity with
    update_python_package.  The `[^}]*` patterns assume the fetchFromGitHub
    block contains no nested `{}` — verify if edits ever miss.  A "tag" update
    rewrites whichever of tag/rev is present, keeping the existing attribute
    name (and vice versa for "rev").
    """
    with path.open("r", encoding="utf-8") as f:
        content = f.read()

    modified = False

    # Update version if provided (all occurrences in the file)
    if "version" in updates:
        new_version = updates["version"]
        content, version_count = re.subn(
            r'(version\s*=\s*)"([^"]+)"',
            f'\\1"{new_version}"',
            content
        )
        if version_count > 0:
            modified = True

    # Update hash if provided
    if "hash" in updates:
        new_hash = updates["hash"]
        # Match both sha256 and hash attributes in src = fetchFromGitHub { ... }
        content, hash_count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(sha256|hash)\s*=\s*"([^"]+)"([^}]*\})',
            f'\\1\\2 = "{new_hash}"\\4',
            content
        )
        if hash_count > 0:
            modified = True

    # Update tag if provided
    if "tag" in updates:
        new_tag = updates["tag"]
        content, tag_count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(tag|rev)\s*=\s*"([^"]+)"([^}]*\})',
            f'\\1\\2 = "{new_tag}"\\4',
            content
        )
        if tag_count == 0: # If no tag/rev found, try to add it
            # Appends after `hash = "..."`; the original `;` ends up after the
            # inserted tag because group 3 starts with it.
            content, tag_count = re.subn(
                r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})',
                f'\\1\\2;\n tag = "{new_tag}"\\3',
                content
            )
        if tag_count > 0:
            modified = True

    # Update rev if provided
    if "rev" in updates:
        new_rev = updates["rev"]
        content, rev_count = re.subn(
            r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(rev|tag)\s*=\s*"([^"]+)"([^}]*\})',
            f'\\1\\2 = "{new_rev}"\\4',
            content
        )
        if rev_count == 0: # If no rev/tag found, try to add it
            content, rev_count = re.subn(
                r'(src\s*=\s*fetchFromGitHub\s*\{[^}]*)(hash\s*=\s*"[^"]+")([^}]*\})',
                f'\\1\\2;\n rev = "{new_rev}"\\3',
                content
            )
        if rev_count > 0:
            modified = True

    if modified:
        with path.open("w", encoding="utf-8") as f:
            f.write(content)

    return modified
|
|
|
|
# ------------------------------ TUI helpers ------------------------------
|
|
|
|
# Color-pair indices; registered with curses.init_pair() in init_colors().
COLOR_NORMAL = 1     # default text
COLOR_HIGHLIGHT = 2  # selected list row (black on cyan)
COLOR_HEADER = 3     # field labels / section headers
COLOR_STATUS = 4     # status-bar hints
COLOR_ERROR = 5      # error messages
COLOR_SUCCESS = 6    # success messages
COLOR_BORDER = 7     # pane borders
COLOR_TITLE = 8      # pane titles
|
|
|
|
def init_colors():
    """Register the curses color pairs used throughout the TUI."""
    curses.start_color()
    curses.use_default_colors()  # allows -1 = terminal default background

    # (pair id, foreground, background) for every pair the UI draws with.
    pair_spec = [
        (COLOR_NORMAL, curses.COLOR_WHITE, -1),
        (COLOR_HIGHLIGHT, curses.COLOR_BLACK, curses.COLOR_CYAN),
        (COLOR_HEADER, curses.COLOR_CYAN, -1),
        (COLOR_STATUS, curses.COLOR_YELLOW, -1),
        (COLOR_ERROR, curses.COLOR_RED, -1),
        (COLOR_SUCCESS, curses.COLOR_GREEN, -1),
        (COLOR_BORDER, curses.COLOR_BLUE, -1),
        (COLOR_TITLE, curses.COLOR_MAGENTA, -1),
    ]
    for pair_id, fg, bg in pair_spec:
        curses.init_pair(pair_id, fg, bg)
|
|
|
|
def draw_border(win, y, x, h, w):
    """Draw a box border over the region (y, x, h, w) of *win*.

    The bottom-right cell is written inside a try/except because curses
    raises after writing the last cell of a window; that error is expected.
    """
    color = curses.color_pair(COLOR_BORDER)

    # Corners first.
    win.addch(y, x, curses.ACS_ULCORNER, color)
    win.addch(y, x + w - 1, curses.ACS_URCORNER, color)
    win.addch(y + h - 1, x, curses.ACS_LLCORNER, color)
    try:
        win.addch(y + h - 1, x + w - 1, curses.ACS_LRCORNER, color)
    except curses.error:
        pass  # expected when painting the window's bottom-right cell

    # Top and bottom edges.
    for dx in range(1, w - 1):
        win.addch(y, x + dx, curses.ACS_HLINE, color)
        win.addch(y + h - 1, x + dx, curses.ACS_HLINE, color)

    # Left and right edges.
    for dy in range(1, h - 1):
        win.addch(y + dy, x, curses.ACS_VLINE, color)
        win.addch(y + dy, x + w - 1, curses.ACS_VLINE, color)
|
|
|
|
class ScreenBase:
    """Shared behavior for TUI screens: a status line plus a run() hook."""

    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.status = ""              # text shown on the bottom row
        self.status_type = "normal"   # one of "normal", "error", "success"

    def draw_status(self, height, width):
        """Render the status text (or the default key hints) on the last row."""
        if not self.status:
            self.stdscr.addstr(height-1, 0, "q: quit, Backspace: back, Enter: select", curses.color_pair(COLOR_STATUS))
            return
        severity_colors = {"error": COLOR_ERROR, "success": COLOR_SUCCESS}
        color = severity_colors.get(self.status_type, COLOR_STATUS)
        self.stdscr.addstr(height-1, 0, self.status[:max(0, width-1)], curses.color_pair(color))

    def set_status(self, text: str, status_type="normal"):
        """Record status text and severity for the next draw."""
        self.status = text
        self.status_type = status_type

    def run(self):
        # Subclasses implement their own event loop.
        raise NotImplementedError
|
|
|
|
def prompt_input(stdscr, prompt: str) -> Optional[str]:
    """Show *prompt* at the current cursor position and read one line of input.

    Echo is enabled for the duration of the read and disabled again after.
    Returns the entered text (possibly empty).
    """
    curses.echo()
    stdscr.addstr(prompt, curses.color_pair(COLOR_HEADER))
    stdscr.clrtoeol()
    s = stdscr.getstr().decode("utf-8")
    curses.noecho()
    return s
|
|
|
|
def show_popup(stdscr, lines: List[str], title: str = ""):
    """Display a centered modal popup listing *lines* and wait for any key.

    The box is sized to the content but clamped to the screen; overflowing
    lines are truncated and surplus rows dropped.
    NOTE(review): assumes *lines* is non-empty — max() below raises
    ValueError on an empty list; verify callers never pass [].
    """
    h, w = stdscr.getmaxyx()
    # +4 rows / +6 cols leave room for border, padding, and the footer.
    box_h = min(len(lines)+4, h-2)
    box_w = min(max(max(len(l) for l in lines), len(title))+6, w-2)
    top = (h - box_h)//2
    left = (w - box_w)//2
    win = curses.newwin(box_h, box_w, top, left)

    # Draw fancy border
    draw_border(win, 0, 0, box_h, box_w)

    # Add title if provided (centered on the top border)
    if title:
        title_x = (box_w - len(title)) // 2
        win.addstr(0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE))

    # Add content, truncated to the box interior
    for i, line in enumerate(lines, start=1):
        if i >= box_h-1:
            break
        win.addstr(i, 2, line[:box_w-4], curses.color_pair(COLOR_NORMAL))

    # Add footer (centered on the bottom border)
    footer = "Press any key to continue"
    footer_x = (box_w - len(footer)) // 2
    win.addstr(box_h-1, footer_x, footer, curses.color_pair(COLOR_STATUS))

    win.refresh()
    win.getch()  # block until any key is pressed
|
|
|
|
# ------------------------------ Screens ------------------------------
|
|
|
|
class PackagesScreen(ScreenBase):
    """Top-level screen: package list on the left, source preview on the right.

    FIX: navigation previously clamped self.idx against the FULL package list
    while drawing and Enter indexed the FILTERED list, so with an active
    filter ('f' key) the selection could point past the end of the visible
    list and Enter raised an uncaught IndexError.  All bounds now use the
    filtered list.
    """

    def __init__(self, stdscr):
        super().__init__(stdscr)
        self.packages = find_packages()
        self.idx = 0
        self.filter_mode = "all"  # "all", "regular", "python"
        self.scroll_offset = 0    # first visible row of the (possibly long) list

    def _filtered(self):
        """Packages visible under the current filter mode."""
        if self.filter_mode == "regular":
            return [p for p in self.packages if not p[2]]
        if self.filter_mode == "python":
            return [p for p in self.packages if p[2]]
        return self.packages

    def _load_spec(self, path, is_python, is_homeassistant):
        """Parse the spec behind *path* according to the package flavor."""
        if is_python:
            return parse_python_package(path)
        if is_homeassistant:
            return parse_homeassistant_component(path)
        return load_json(path)

    def _source_ref(self, comp, fetcher, merged_vars):
        """Build a concise human-readable reference string for one source."""
        ref = comp.get("tag") or comp.get("rev") or comp.get("version") or ""
        if fetcher == "github":
            rendered = render_templates(comp, merged_vars)
            tag = rendered.get("tag")
            rev = rendered.get("rev")
            owner = rendered.get("owner") or merged_vars.get("owner") or ""
            repo = rendered.get("repo") or merged_vars.get("repo") or ""
            if tag and owner and repo:
                ref = f"{owner}/{repo}@{tag}"
            elif tag:
                ref = tag
            elif rev and owner and repo:
                ref = f"{owner}/{repo}@{rev[:7]}"
            elif rev:
                ref = rev[:12]
            return ref
        if fetcher == "url":
            rendered = render_templates(comp, merged_vars)
            url = rendered.get("url") or rendered.get("urlTemplate") or ""
            if not url:
                return ref
            owner = str(merged_vars.get("owner", "") or "")
            repo = str(merged_vars.get("repo", "") or "")
            rp = str(merged_vars.get("releasePrefix", "") or "")
            rs = str(merged_vars.get("releaseSuffix", "") or "")
            base = str(merged_vars.get("base", "") or "")
            rel = str(merged_vars.get("release", "") or "")
            tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
            parsed = urlparse(url)
            filename = os.path.basename(parsed.path) if parsed and parsed.path else ""
            if owner and repo and tag and filename:
                return f"{owner}/{repo}@{tag} · (unknown)"
            if filename:
                return filename
            return url
        # Any other fetcher (git, pypi, unknown, none): no concise ref.
        return ""

    def _draw_preview(self, filtered, h, right_x, right_w):
        """Render the right-hand summary (path + sources) for the selection."""
        try:
            name, path, is_python, is_homeassistant = filtered[self.idx]

            title_x = right_x + (right_w - len(name)) // 2
            self.stdscr.addstr(0, title_x, f" {name} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD)

            self.stdscr.addstr(1, right_x + 2, "Path:", curses.color_pair(COLOR_HEADER))
            self.stdscr.addstr(1, right_x + 8, f"{path}"[:max(0, right_w-10)], curses.color_pair(COLOR_NORMAL))

            self.stdscr.addstr(2, right_x + 2, "Sources:", curses.color_pair(COLOR_HEADER))

            spec = self._load_spec(path, is_python, is_homeassistant)
            merged_vars, merged_srcs, _ = merged_view(spec, None)
            snames = sorted(merged_srcs)
            max_src_rows = max(0, h - 6)
            fetcher_colors = {"github": COLOR_SUCCESS, "url": COLOR_STATUS, "git": COLOR_HEADER}
            for row, sname in enumerate(snames[:max_src_rows]):
                comp = merged_srcs[sname]
                fetcher = comp.get("fetcher", "none")
                display_ref = self._source_ref(comp, fetcher, merged_vars)
                if isinstance(display_ref, str):
                    max_ref = max(0, right_w - 30)
                    ref_short = display_ref[:max_ref] + ("..." if len(display_ref) > max_ref else "")
                else:
                    ref_short = display_ref

                self.stdscr.addstr(3 + row, right_x + 2, f"{sname:<18}", curses.color_pair(COLOR_NORMAL))
                self.stdscr.addstr(3 + row, right_x + 21, f"{fetcher:<7}",
                                   curses.color_pair(fetcher_colors.get(fetcher, COLOR_NORMAL)))
                self.stdscr.addstr(3 + row, right_x + 29, f"{ref_short}"[:max(0, right_w-31)],
                                   curses.color_pair(COLOR_NORMAL))

            hint = "Enter: open details | k/j: move | q: quit"
            if h >= 5:
                hint_x = right_x + (right_w - len(hint)) // 2
                self.stdscr.addstr(h - 5, hint_x, hint[:max(0, right_w-1)], curses.color_pair(COLOR_STATUS))
        except Exception as e:
            # Preview must never crash the screen — show the error inline.
            self.stdscr.addstr(2, right_x + 2, "Error:", curses.color_pair(COLOR_ERROR))
            self.stdscr.addstr(2, right_x + 9, f"{e}"[:max(0, right_w-11)], curses.color_pair(COLOR_ERROR))

    def run(self):
        """Event loop: draw the split view and react to navigation keys."""
        while True:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()

            filtered = self._filtered()
            # Keep the selection inside the filtered list (BUGFIX, see class doc).
            self.idx = min(self.idx, len(filtered) - 1) if filtered else 0

            # Split layout: list pane on the left, preview pane on the right.
            left_w = max(30, min(60, w // 3))
            right_x = left_w + 1
            right_w = max(0, w - right_x)

            draw_border(self.stdscr, 0, 0, h-1, left_w)
            if right_w >= 20:
                draw_border(self.stdscr, 0, right_x, h-1, right_w)

            # Left pane title reflects the active filter.
            if self.filter_mode == "regular":
                title = "Packages (version.json)"
            elif self.filter_mode == "python":
                title = "Python Packages"
            else:
                title = "All Packages [f to filter]"
            title_x = (left_w - len(title)) // 2
            self.stdscr.addstr(0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD)

            # Scrolling window over the filtered list.
            max_rows = h - 3
            total_packages = len(filtered)
            if self.idx >= self.scroll_offset + max_rows:
                self.scroll_offset = self.idx - max_rows + 1
            elif self.idx < self.scroll_offset:
                self.scroll_offset = self.idx

            visible = filtered[self.scroll_offset:self.scroll_offset + max_rows]

            # Scroll indicators when rows are hidden above/below.
            if self.scroll_offset > 0:
                self.stdscr.addstr(1, left_w - 3, "↑", curses.color_pair(COLOR_STATUS))
            if self.scroll_offset + max_rows < total_packages:
                self.stdscr.addstr(min(1 + len(visible), h - 2), left_w - 3, "↓", curses.color_pair(COLOR_STATUS))

            for row, (name, _path, is_python, is_homeassistant) in enumerate(visible):
                if row + self.scroll_offset == self.idx:
                    attr = curses.color_pair(COLOR_HIGHLIGHT)
                    sel = "►"
                else:
                    attr = curses.color_pair(COLOR_NORMAL)
                    sel = " "
                # Flavor icon: Python or Home Assistant.
                pkg_type = ""
                if is_python:
                    pkg_type = "🐍 "
                elif is_homeassistant:
                    pkg_type = "🏠 "
                self.stdscr.addstr(1 + row, 2, f"{sel} {pkg_type}{name}"[:max(0, left_w-5)], attr)

            # Right pane: non-interactive summary of the selection.
            if right_w >= 20 and filtered:
                self._draw_preview(filtered, h, right_x, right_w)

            self.draw_status(h, w)
            self.stdscr.refresh()

            ch = self.stdscr.getch()
            limit = max(0, len(filtered) - 1)
            if ch in (ord('q'), 27):  # q or ESC
                return None
            elif ch in (curses.KEY_UP, ord('k')):
                self.idx = max(0, self.idx-1)
            elif ch in (curses.KEY_DOWN, ord('j')):
                self.idx = min(limit, self.idx+1)
            elif ch == curses.KEY_PPAGE:  # Page Up
                self.idx = max(0, self.idx - (h - 4))
            elif ch == curses.KEY_NPAGE:  # Page Down
                self.idx = min(limit, self.idx + (h - 4))
            elif ch == ord('g'):  # Go to top
                self.idx = 0
            elif ch == ord('G'):  # Go to bottom
                self.idx = limit
            elif ch == ord('f'):
                # Cycle through filter modes; reset selection for the new set.
                cycle = {"all": "regular", "regular": "python", "python": "all"}
                self.filter_mode = cycle[self.filter_mode]
                self.idx = 0
            elif ch in (curses.KEY_ENTER, 10, 13):
                if not filtered:
                    continue
                name, path, is_python, is_homeassistant = filtered[self.idx]
                try:
                    spec = self._load_spec(path, is_python, is_homeassistant)
                except Exception as e:
                    self.set_status(f"Failed to load {path}: {e}")
                    continue
                screen = PackageDetailScreen(self.stdscr, name, path, spec, is_python, is_homeassistant)
                if screen.run() == "reload":
                    # Re-scan after a save so new/removed packages appear.
                    self.packages = find_packages()
                    self.idx = min(self.idx, max(0, len(self.packages)-1))
|
|
|
|
class PackageDetailScreen(ScreenBase):
    """Detail view for one package: variant strip, source table, update actions."""

    def __init__(self, stdscr, pkg_name: str, path: Path, spec: Json, is_python: bool = False, is_homeassistant: bool = False):
        """Capture the package identity/spec and derive the initial view state."""
        super().__init__(stdscr)
        # Identity of the package being edited.
        self.pkg_name = pkg_name
        self.path = path
        self.spec = spec
        # These flags select the save strategy (default.nix rewrite vs version.json).
        self.is_python = is_python
        self.is_homeassistant = is_homeassistant
        # "<base>" is always first; real variant names follow in sorted order.
        self.variants = ["<base>"] + sorted(self.spec.get("variants", {}))
        self.vidx = 0
        # Optional token raises GitHub API rate limits.
        self.gh_token = os.environ.get("GITHUB_TOKEN")
        # Lazily-populated candidate caches, keyed by source name.
        self.candidates: Dict[str, Dict[str, str]] = {}      # name -> {release, tag, commit}
        self.url_candidates: Dict[str, Dict[str, str]] = {}  # name -> {base, release, tag}
        # Build the merged variables/sources view for the initial selection.
        self.recompute_view()
|
|
|
|
def select_variant(self):
    """Rebuild the merged/target view after the variant selection changed."""
    self.recompute_view()
|
|
|
|
def recompute_view(self):
    """Recompute cursor, merged variables/sources, and the write target.

    Index 0 is the "<base>" pseudo-variant; manual edits then land on the spec
    root, otherwise on the selected variant's override dict.
    """
    variant_name = None if self.vidx == 0 else self.variants[self.vidx]
    self.cursor = self.spec if variant_name is None else self.spec["variants"][variant_name]
    # merged_view overlays the variant on the base and returns the dict edits go to.
    self.merged_vars, self.merged_srcs, self.target_dict = merged_view(self.spec, variant_name)
    self.snames = sorted(self.merged_srcs)
    self.sidx = 0
|
|
|
|
def fetch_candidates_for(self, name: str):
    """Populate self.candidates[name] (and possibly self.url_candidates[name]).

    Looks up "what is newest upstream" for the given source:
      - github: latest release, latest tag and HEAD commit via the GitHub API,
        plus a special case for raspberrypi/linux tag schemes
      - git: HEAD commit via `git ls-remote`
      - url: heuristic over GitHub release tags driven by the
        owner/repo/releasePrefix/releaseSuffix variables (proton-cachyos style)
    Unresolved entries stay as empty strings.
    """
    comp = self.merged_srcs[name]
    fetcher = comp.get("fetcher", "none")
    c = {"release": "", "tag": "", "commit": ""}
    if fetcher == "github":
        owner = comp.get("owner")
        repo = comp.get("repo")
        if owner and repo:
            r = gh_latest_release(owner, repo, self.gh_token)
            if r:
                c["release"] = r
            t = gh_latest_tag(owner, repo, self.gh_token)
            if t:
                c["tag"] = t
            m = gh_head_commit(owner, repo)
            if m:
                c["commit"] = m

            # Special-case raspberrypi/linux: prefer latest stable_* tag or series-specific tags
            try:
                if owner == "raspberrypi" and repo == "linux":
                    tags_all = gh_list_tags(owner, repo, self.gh_token)
                    rendered = render_templates(comp, self.merged_vars)
                    cur_tag = str(rendered.get("tag") or "")
                    # If current tag uses stable_YYYYMMDD scheme, pick latest stable_* tag
                    if cur_tag.startswith("stable_"):
                        # Lexicographic sort is safe here: the date part is fixed-width.
                        stable_tags = sorted(
                            [x for x in tags_all if re.match(r"^stable_\d{8}$", x)],
                            reverse=True,
                        )
                        if stable_tags:
                            c["tag"] = stable_tags[0]
                    else:
                        # Try to pick a tag matching the current major.minor series if available
                        mm = str(self.merged_vars.get("modDirVersion") or "")
                        m2 = re.match(r"^(\d+)\.(\d+)", mm)
                        if m2:
                            base = f"rpi-{m2.group(1)}.{m2.group(2)}"
                            series_tags = [x for x in tags_all if (
                                x == f"{base}.y"
                                or x.startswith(f"{base}.y")
                                or x.startswith(f"{base}.")
                            )]
                            # NOTE(review): plain string sort — patch numbers >= 10 may
                            # order lexicographically, not numerically; confirm acceptable.
                            series_tags.sort(reverse=True)
                            if series_tags:
                                c["tag"] = series_tags[0]
            except Exception as _e:
                # Fallback to previously computed values
                pass
    elif fetcher == "git":
        url = comp.get("url")
        if url:
            # `git ls-remote <url> HEAD` prints "<sha>\tHEAD" on success.
            out = run_get_stdout(["git", "ls-remote", url, "HEAD"])
            if out:
                c["commit"] = out.split()[0]
    elif fetcher == "url":
        # Heuristic for GitHub release assets with variables in version.json (e.g., proton-cachyos)
        owner = self.merged_vars.get("owner")
        repo = self.merged_vars.get("repo")
        if owner and repo:
            tags = gh_release_tags_api(str(owner), str(repo), self.gh_token)
            prefix = str(self.merged_vars.get("releasePrefix", ""))
            suffix = str(self.merged_vars.get("releaseSuffix", ""))
            # First tag matching both affixes is treated as the newest release.
            latest = next((t for t in tags if (t and t.startswith(prefix) and t.endswith(suffix))), None)
            if latest:
                c["release"] = latest
                # Strip the affixes, then split "<base>-<release>" on dashes.
                mid = latest
                if prefix and mid.startswith(prefix):
                    mid = mid[len(prefix):]
                if suffix and mid.endswith(suffix):
                    mid = mid[:-len(suffix)]
                parts = mid.split("-")
                if len(parts) >= 2:
                    base, rel = parts[0], parts[-1]
                    self.url_candidates[name] = {"base": base, "release": rel, "tag": latest}
    self.candidates[name] = c
|
|
|
|
def prefetch_hash_for(self, name: str) -> Optional[str]:
    """Prefetch the source `name` and return its SRI hash, or None on failure.

    The fetch strategy follows the source's "fetcher" field; missing required
    fields (owner/repo/ref, url/rev, url) also yield None.
    """
    comp = self.merged_srcs[name]
    fetcher = comp.get("fetcher", "none")
    if fetcher == "github":
        rendered = render_templates(comp, self.merged_vars)
        ref = rendered.get("tag") or rendered.get("rev")
        owner, repo = comp.get("owner"), comp.get("repo")
        if owner and repo and ref:
            # Hash the GitHub tarball for the resolved tag/rev.
            return nix_prefetch_url(gh_tarball_url(owner, repo, ref))
    elif fetcher == "git":
        url, rev = comp.get("url"), comp.get("rev")
        if url and rev:
            return nix_prefetch_git(url, rev)
    elif fetcher == "url":
        rendered = render_templates(comp, self.merged_vars)
        target = rendered.get("url") or rendered.get("urlTemplate")
        if target:
            return nix_prefetch_url(target)
    # Unknown fetcher, or required fields were missing.
    return None
|
|
|
|
def cachyos_suffix(self) -> str:
    """Map the selected variant to the linux-cachyos PKGBUILD directory suffix.

    The base variant and unknown variants map to "" (the plain linux-cachyos dir).
    """
    if self.vidx == 0:
        return ""
    return {"rc": "-rc", "hardened": "-hardened", "lts": "-lts"}.get(self.variants[self.vidx], "")
|
|
|
|
def fetch_cachyos_linux_latest(self, suffix: str) -> Optional[str]:
    """
    Try to determine latest linux version from upstream:
    - Prefer .SRCINFO (preprocessed)
    - Fallback to PKGBUILD (parse pkgver= line)
    Tries both 'CachyOS' and 'cachyos' org casing just in case.

    `suffix` selects the variant directory (e.g. "-rc", "-hardened", "-lts").
    Returns the version with rc normalized to "X.Y-rcN", or None if nothing parsed.
    """
    bases = [
        "https://raw.githubusercontent.com/CachyOS/linux-cachyos/master",
        "https://raw.githubusercontent.com/cachyos/linux-cachyos/master",
    ]
    paths = [
        f"linux-cachyos{suffix}/.SRCINFO",
        f"linux-cachyos{suffix}/PKGBUILD",
    ]

    def parse_srcinfo(text: str) -> Optional[str]:
        """Extract pkgver from .SRCINFO (already expanded to a literal value)."""
        m = re.search(r"^\s*pkgver\s*=\s*([^\s#]+)\s*$", text, re.MULTILINE)
        if not m:
            return None
        v = m.group(1).strip()
        return v

    def parse_pkgbuild(text: str) -> Optional[str]:
        """Extract pkgver from a raw PKGBUILD, expanding $var/${var} references."""
        # Parse assignments and expand variables in pkgver
        # Build a simple env map from VAR=value lines
        env: Dict[str, str] = {}
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            m_assign = re.match(r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$', line)
            if m_assign:
                key = m_assign.group(1)
                val = m_assign.group(2).strip()
                # Remove trailing comments
                val = re.sub(r'\s+#.*$', '', val).strip()
                # Strip surrounding quotes
                if (val.startswith('"') and val.endswith('"')) or (val.startswith("'") and val.endswith("'")):
                    val = val[1:-1]
                env[key] = val

        m = re.search(r"^\s*pkgver\s*=\s*(.+)$", text, re.MULTILINE)
        if not m:
            return None
        raw = m.group(1).strip()
        # Strip quotes
        if (raw.startswith('"') and raw.endswith('"')) or (raw.startswith("'") and raw.endswith("'")):
            raw = raw[1:-1]

        def expand_vars(s: str) -> str:
            # Shell-style expansion against the env map; unknown vars are left as-is.
            def repl_braced(mb):
                key = mb.group(1)
                return env.get(key, mb.group(0))
            def repl_unbraced(mu):
                key = mu.group(1)
                return env.get(key, mu.group(0))
            # Expand ${var} then $var
            s = re.sub(r"\$\{([^}]+)\}", repl_braced, s)
            s = re.sub(r"\$([A-Za-z_][A-Za-z0-9_]*)", repl_unbraced, s)
            return s

        v = expand_vars(raw).strip()
        # normalize rc form like 6.19.rc6 -> 6.19-rc6
        v = v.replace(".rc", "-rc")
        return v

    # Try .SRCINFO first, then PKGBUILD
    for base in bases:
        # .SRCINFO
        url = f"{base}/{paths[0]}"
        text = http_get_text(url)
        if text:
            ver = parse_srcinfo(text)
            if ver:
                return ver.replace(".rc", "-rc")
        # PKGBUILD fallback
        url = f"{base}/{paths[1]}"
        text = http_get_text(url)
        if text:
            ver = parse_pkgbuild(text)
            if ver:
                return ver.replace(".rc", "-rc")

    return None
|
|
|
|
def linux_tarball_url_for_version(self, version: str) -> str:
    """Return the upstream tarball URL for a kernel `version`.

    Release candidates only exist as torvalds snapshot tarballs; stable
    releases come from the kernel.org CDN, where "X.Y.0" is published as
    linux-X.Y.tar.xz.
    """
    if "-rc" in version:
        return f"https://git.kernel.org/torvalds/t/linux-{version}.tar.gz"
    pieces = version.split(".")
    major = pieces[0] if pieces else "6"
    major_minor = ".".join(pieces[:2]) if len(pieces) >= 2 else version
    # Drop the trailing ".0" for the tarball name.
    tar_ver = major_minor if version.endswith(".0") else version
    return f"https://cdn.kernel.org/pub/linux/kernel/v{major}.x/linux-{tar_ver}.tar.xz"
|
|
|
|
def update_linux_from_pkgbuild(self, name: str):
    """Update source `name` to the latest upstream CachyOS kernel and re-hash.

    Resolves the version from the variant's PKGBUILD/.SRCINFO, prefetches the
    matching tarball, and writes version+hash into the active write target.
    Failures are reported via the status line and leave the spec untouched.
    """
    latest = self.fetch_cachyos_linux_latest(self.cachyos_suffix())
    if not latest:
        self.set_status("linux: failed to get version from PKGBUILD")
        return
    sri = nix_prefetch_url(self.linux_tarball_url_for_version(latest))
    if not sri:
        self.set_status("linux: prefetch failed")
        return
    # Write into the selected base/variant override dict.
    compw = self.target_dict.setdefault("sources", {}).setdefault(name, {})
    compw["version"] = latest
    compw["hash"] = sri
    self.set_status(f"{name}: updated version to {latest} and refreshed hash")
|
|
|
|
def set_ref(self, name: str, kind: str, value: str):
    """Record a new ref for source `name` in the active write target.

    "release"/"tag" store a tag; "commit" stores a rev. The opposing key is
    removed so tag and rev never coexist on the same source.
    """
    sources = self.target_dict.setdefault("sources", {})
    comp = sources.setdefault(name, {})
    if kind in ("release", "tag"):
        comp["tag"] = value
        comp.pop("rev", None)
    elif kind == "commit":
        comp["rev"] = value
        comp.pop("tag", None)
|
|
|
|
def save(self):
    """Persist the current spec to disk.

    Python packages and Home Assistant components are written back into their
    default.nix via the matching updater; regular packages are saved as
    version.json. Returns True on success; exceptions propagate to the caller.
    """
    if self.is_python or self.is_homeassistant:
        # Both flavors collected identical update payloads; only the writer
        # function differs, so the duplicated branches are folded into one.
        writer = update_python_package if self.is_python else update_homeassistant_component
        for name in self.snames:
            updates = self._collect_updates(self.merged_srcs[name])
            if updates:
                writer(self.path, name, updates)
        return True
    # Regular packages: write the (possibly edited) spec back to version.json.
    save_json(self.path, self.spec)
    return True

def _collect_updates(self, source: Json) -> Dict[str, Any]:
    """Gather the version/hash/tag/rev fields to write into a default.nix source."""
    updates: Dict[str, Any] = {}
    # Version comes from the merged variables, everything else from the source.
    if "version" in self.merged_vars:
        updates["version"] = self.merged_vars["version"]
    for key in ("hash", "tag", "rev"):
        if key in source:
            updates[key] = source[key]
    return updates
|
|
|
|
def run(self):
    """Main event loop for the package detail screen.

    Each iteration redraws the variant strip, the source table and a
    "latest candidates" panel, then dispatches on one keypress.
    Returns None when the user quits and "reload" when they go back
    (the caller re-scans the package list on "reload").
    """

    def write_hash(name: str) -> bool:
        """Prefetch `name` and store the SRI hash in the write target; True on success."""
        sri = self.prefetch_hash_for(name)
        if not sri:
            return False
        self.target_dict.setdefault("sources", {}).setdefault(name, {})["hash"] = sri
        return True

    while True:
        self.stdscr.clear()
        h, w = self.stdscr.getmaxyx()

        # Outer frame and centered title.
        draw_border(self.stdscr, 0, 0, h-1, w)
        title = f"{self.pkg_name} [{self.path}]"
        if self.is_python:
            title += " [Python Package]"
        title_x = (w - len(title)) // 2
        self.stdscr.addstr(0, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD)

        if not self.is_python:
            # Variant strip: the active variant is bracketed and highlighted.
            self.stdscr.addstr(1, 2, "Variants:", curses.color_pair(COLOR_HEADER))
            x_pos = 12  # column just after "Variants: "
            for i, v in enumerate(self.variants):
                if i > 0:
                    self.stdscr.addstr(1, x_pos, " | ", curses.color_pair(COLOR_NORMAL))
                    x_pos += 3
                if i == self.vidx:
                    self.stdscr.addstr(1, x_pos, f"[{v}]", curses.color_pair(COLOR_HIGHLIGHT))
                    x_pos += len(f"[{v}]")
                else:
                    self.stdscr.addstr(1, x_pos, v, curses.color_pair(COLOR_NORMAL))
                    x_pos += len(v)
        else:
            # Python packages have no variants; show the merged version instead.
            version = self.merged_vars.get("version", "")
            self.stdscr.addstr(1, 2, "Version:", curses.color_pair(COLOR_HEADER))
            self.stdscr.addstr(1, 11, version, curses.color_pair(COLOR_SUCCESS))

        # Sources header plus a horizontal separator.
        self.stdscr.addstr(2, 2, "Sources:", curses.color_pair(COLOR_HEADER) | curses.A_BOLD)
        for i in range(1, w-1):
            self.stdscr.addch(3, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER))

        # Source table (clipped to the rows available above the candidate panel).
        for i, name in enumerate(self.snames[:h-10], start=0):
            comp = self.merged_srcs[name]
            fetcher = comp.get("fetcher", "none")
            # Render refs so variables resolve; compress long forms for display.
            display_ref = comp.get("tag") or comp.get("rev") or comp.get("version") or ""
            if fetcher == "github":
                rendered = render_templates(comp, self.merged_vars)
                tag = rendered.get("tag")
                rev = rendered.get("rev")
                owner = (rendered.get("owner") or self.merged_vars.get("owner") or "")
                repo = (rendered.get("repo") or self.merged_vars.get("repo") or "")
                if tag and owner and repo:
                    display_ref = f"{owner}/{repo}@{tag}"
                elif tag:
                    display_ref = tag
                elif rev and owner and repo:
                    display_ref = f"{owner}/{repo}@{rev[:7]}"
                elif rev:
                    display_ref = rev[:12]
            elif fetcher == "url":
                rendered = render_templates(comp, self.merged_vars)
                url = rendered.get("url") or rendered.get("urlTemplate") or ""
                if url:
                    # Prefer a concise "owner/repo@tag · filename" label.
                    owner = str(self.merged_vars.get("owner", "") or "")
                    repo = str(self.merged_vars.get("repo", "") or "")
                    rp = str(self.merged_vars.get("releasePrefix", "") or "")
                    rs = str(self.merged_vars.get("releaseSuffix", "") or "")
                    base = str(self.merged_vars.get("base", "") or "")
                    rel = str(self.merged_vars.get("release", "") or "")
                    tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
                    parsed = urlparse(url)
                    filename = os.path.basename(parsed.path) if parsed and parsed.path else ""
                    if owner and repo and tag and filename:
                        # BUGFIX: show the actual asset filename (previously a
                        # literal "(unknown)" despite filename being computed).
                        display_ref = f"{owner}/{repo}@{tag} · {filename}"
                    elif filename:
                        display_ref = filename
                    else:
                        display_ref = url
                else:
                    display_ref = ""
            ref_short = display_ref if not isinstance(display_ref, str) else (display_ref[:60] + ("..." if len(display_ref) > 60 else ""))

            if i == self.sidx:
                attr = curses.color_pair(COLOR_HIGHLIGHT)
                sel = "►"
            else:
                attr = curses.color_pair(COLOR_NORMAL)
                sel = " "

            # Color-code the fetcher column by type.
            fetcher_color = COLOR_NORMAL
            if fetcher == "github":
                fetcher_color = COLOR_SUCCESS
            elif fetcher == "url":
                fetcher_color = COLOR_STATUS
            elif fetcher == "git":
                fetcher_color = COLOR_HEADER

            self.stdscr.addstr(4+i, 2, f"{sel} {name:<20}", attr)
            self.stdscr.addstr(4+i, 24, fetcher, curses.color_pair(fetcher_color))
            self.stdscr.addstr(4+i, 32, f"ref={ref_short}"[:w-34], curses.color_pair(COLOR_NORMAL))

        # Separator above the "latest candidates" panel.
        y_latest = h - 8
        for i in range(1, w-1):
            self.stdscr.addch(y_latest, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER))

        # Latest candidates for the highlighted source (fetched lazily, cached by name).
        if self.snames:
            _sel_name = self.snames[self.sidx]
            _comp = self.merged_srcs[_sel_name]
            _fetcher = _comp.get("fetcher", "none")
            if _fetcher in ("github", "git", "url") and _sel_name not in self.candidates:
                self.fetch_candidates_for(_sel_name)

            self.stdscr.addstr(y_latest+1, 2, "Latest Versions:", curses.color_pair(COLOR_HEADER) | curses.A_BOLD)

            if _fetcher in ("github", "git"):
                _cand = self.candidates.get(_sel_name, {})
                if _cand.get('release'):
                    self.stdscr.addstr(y_latest+2, 4, "Release:", curses.color_pair(COLOR_HEADER))
                    self.stdscr.addstr(y_latest+2, 13, _cand.get('release'), curses.color_pair(COLOR_SUCCESS))
                if _cand.get('tag'):
                    self.stdscr.addstr(y_latest+2, 30, "Tag:", curses.color_pair(COLOR_HEADER))
                    self.stdscr.addstr(y_latest+2, 35, _cand.get('tag'), curses.color_pair(COLOR_SUCCESS))
                if _cand.get('commit'):
                    self.stdscr.addstr(y_latest+3, 4, "Commit:", curses.color_pair(COLOR_HEADER))
                    self.stdscr.addstr(y_latest+3, 12, (_cand.get('commit') or '')[:12], curses.color_pair(COLOR_NORMAL))
            elif _fetcher == "url":
                _cand_u = self.url_candidates.get(_sel_name, {}) or {}
                _tag = _cand_u.get("tag") or (self.candidates.get(_sel_name, {}).get("release") or "-")
                if _tag != "-":
                    self.stdscr.addstr(y_latest+2, 4, "Tag:", curses.color_pair(COLOR_HEADER))
                    self.stdscr.addstr(y_latest+2, 9, _tag, curses.color_pair(COLOR_SUCCESS))
                if _cand_u.get('base'):
                    self.stdscr.addstr(y_latest+2, 30, "Base:", curses.color_pair(COLOR_HEADER))
                    self.stdscr.addstr(y_latest+2, 36, _cand_u.get('base'), curses.color_pair(COLOR_NORMAL))
                if _cand_u.get('release'):
                    self.stdscr.addstr(y_latest+3, 4, "Release:", curses.color_pair(COLOR_HEADER))
                    self.stdscr.addstr(y_latest+3, 13, _cand_u.get('release'), curses.color_pair(COLOR_NORMAL))
            else:
                if self.pkg_name == "linux-cachyos" and _sel_name == "linux":
                    # NOTE: this hits the network on every redraw; consider caching.
                    _suffix = self.cachyos_suffix()
                    _latest = self.fetch_cachyos_linux_latest(_suffix)
                    self.stdscr.addstr(y_latest+2, 4, "Linux from PKGBUILD:", curses.color_pair(COLOR_HEADER))
                    if _latest:
                        self.stdscr.addstr(y_latest+2, 24, _latest, curses.color_pair(COLOR_SUCCESS))
                    else:
                        self.stdscr.addstr(y_latest+2, 24, "-", curses.color_pair(COLOR_NORMAL))
                else:
                    self.stdscr.addstr(y_latest+2, 4, "No candidates available", curses.color_pair(COLOR_NORMAL))

        # Footer separator, key help and status line.
        for i in range(1, w-1):
            self.stdscr.addch(h-5, i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER))
        footer = "Enter: component actions | r: refresh | h: hash | e: edit | s: save | Backspace: back | q: quit"
        footer_x = (w - len(footer)) // 2
        self.stdscr.addstr(h-4, footer_x, footer, curses.color_pair(COLOR_STATUS))
        self.draw_status(h, w)
        self.stdscr.refresh()

        ch = self.stdscr.getch()
        if ch in (ord('q'), 27):
            return None
        elif ch == curses.KEY_BACKSPACE or ch == 127:
            return "reload"
        # BUGFIX: 'h' used to alias KEY_LEFT here, which shadowed the advertised
        # 'h' = recompute-hash binding below (that branch was unreachable).
        # Variant switching to the left is now arrow-key only.
        elif ch == curses.KEY_LEFT:
            self.vidx = max(0, self.vidx-1)
            self.select_variant()
        elif ch in (curses.KEY_RIGHT, ord('l')):
            self.vidx = min(len(self.variants)-1, self.vidx+1)
            self.select_variant()
        elif ch in (curses.KEY_UP, ord('k')):
            self.sidx = max(0, self.sidx-1)
        elif ch in (curses.KEY_DOWN, ord('j')):
            self.sidx = min(len(self.snames)-1, self.sidx+1)
        elif ch == ord('r'):
            # Refresh and display upstream candidates for the highlighted source.
            if self.snames:
                name = self.snames[self.sidx]
                comp = self.merged_srcs[name]
                if self.pkg_name == "linux-cachyos" and name == "linux":
                    # Show available linux version from upstream PKGBUILD (.SRCINFO).
                    suffix = self.cachyos_suffix()
                    latest = self.fetch_cachyos_linux_latest(suffix)
                    rendered = render_templates(comp, self.merged_vars)
                    cur_version = str(rendered.get("version") or "")
                    url_hint = self.linux_tarball_url_for_version(latest) if latest else "-"
                    show_popup(self.stdscr, [
                        f"linux-cachyos ({'base' if self.vidx == 0 else self.variants[self.vidx]}):",
                        f"  current  : {cur_version or '-'}",
                        f"  available: {latest or '-'}",
                        f"  tarball  : {url_hint}",
                    ])
                else:
                    self.fetch_candidates_for(name)
                    cand = self.candidates.get(name, {})
                    show_popup(self.stdscr, [
                        f"Candidates for {name}:",
                        f"  latest release: {cand.get('release') or '-'}",
                        f"  latest tag    : {cand.get('tag') or '-'}",
                        f"  latest commit : {cand.get('commit') or '-'}",
                    ])
        elif ch == ord('i'):
            # Show the fully rendered URL for url-type sources.
            if self.snames:
                name = self.snames[self.sidx]
                comp = self.merged_srcs[name]
                if comp.get("fetcher", "none") == "url":
                    rendered = render_templates(comp, self.merged_vars)
                    url = rendered.get("url") or rendered.get("urlTemplate") or ""
                    if url:
                        show_popup(self.stdscr, ["Full URL:", url])
                    else:
                        self.set_status("No URL available")
        elif ch == ord('h'):
            # Recompute the hash of the highlighted source (reachable now that
            # 'h' no longer doubles as KEY_LEFT).
            if self.snames:
                name = self.snames[self.sidx]
                if write_hash(name):
                    self.set_status(f"{name}: updated hash")
                else:
                    self.set_status(f"{name}: hash prefetch failed")
        elif ch == ord('e'):
            # Free-form edit of any field relative to the active base/variant dict.
            s = prompt_input(self.stdscr, "Edit path=value (relative to selected base/variant): ")
            if s:
                if "=" not in s:
                    self.set_status("Invalid input, expected key.path=value")
                else:
                    k, v = s.split("=", 1)
                    path = [p for p in k.split(".") if p]
                    deep_set(self.cursor, path, v)
                    self.set_status(f"Set {k}={v}")
        elif ch == ord('s'):
            try:
                self.save()
                self.set_status("Saved.")
            except Exception as e:
                self.set_status(f"Save failed: {e}")
        elif ch in (curses.KEY_ENTER, 10, 13):
            # Per-component action menu.
            if not self.snames:
                continue
            name = self.snames[self.sidx]
            comp = self.merged_srcs[name]
            fetcher = comp.get("fetcher", "none")
            if fetcher in ("github", "git"):
                if name not in self.candidates:
                    self.fetch_candidates_for(name)
                cand = self.candidates.get(name, {})
                if fetcher == "github":
                    items = [
                        ("Use latest release (tag)", ("release", cand.get("release"))),
                        ("Use latest tag", ("tag", cand.get("tag"))),
                        ("Use latest commit (rev)", ("commit", cand.get("commit"))),
                        ("Recompute hash", ("hash", None)),
                        ("Cancel", ("cancel", None)),
                    ]
                else:
                    items = [
                        ("Use latest commit (rev)", ("commit", cand.get("commit"))),
                        ("Recompute hash", ("hash", None)),
                        ("Cancel", ("cancel", None)),
                    ]
                # Menu header: current ref plus available candidates.
                rendered = render_templates(comp, self.merged_vars)
                cur_tag = rendered.get("tag") or ""
                cur_rev = rendered.get("rev") or ""
                cur_version = rendered.get("version") or ""
                if cur_tag:
                    current_str = f"current: tag={cur_tag}"
                elif cur_rev:
                    current_str = f"current: rev={cur_rev[:12]}"
                elif cur_version:
                    current_str = f"current: version={cur_version}"
                else:
                    current_str = "current: -"
                header_lines = [
                    current_str,
                    f"available: release={cand.get('release') or '-'} tag={cand.get('tag') or '-'} commit={(cand.get('commit') or '')[:12] or '-'}",
                ]
                choice = select_menu(self.stdscr, f"Actions for {name}", [label for label, _ in items], header=header_lines)
                if choice is not None:
                    kind, val = items[choice][1]
                    if kind in ("release", "tag", "commit"):
                        if val:
                            self.set_ref(name, kind, val)
                            if write_hash(name):
                                self.set_status(f"{name}: set {kind} and updated hash")
                            else:
                                # ROBUSTNESS: a failed prefetch after setting a
                                # ref previously gave no feedback.
                                self.set_status(f"{name}: set {kind}, but hash prefetch failed")
                        else:
                            self.set_status(f"No candidate {kind}")
                    elif kind == "hash":
                        if write_hash(name):
                            self.set_status(f"{name}: updated hash")
                        else:
                            self.set_status("hash prefetch failed")
            elif fetcher == "url":
                # Offer latest-release update (proton-cachyos-like schemas) and/or rehash.
                cand = self.url_candidates.get(name)
                menu_items: List[Tuple[str, Tuple[str, Optional[Dict[str, str]]]]] = []
                if cand and cand.get("base") and cand.get("release"):
                    menu_items.append(("Use latest release (update variables.base/release)", ("update_vars", cand)))
                menu_items.append(("Recompute hash (prefetch)", ("hash", None)))
                menu_items.append(("Cancel", ("cancel", None)))

                # Menu header: current release composed from variables.
                base = str(self.merged_vars.get("base") or "")
                rel = str(self.merged_vars.get("release") or "")
                rp = str(self.merged_vars.get("releasePrefix") or "")
                rs = str(self.merged_vars.get("releaseSuffix") or "")
                current_tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
                if current_tag:
                    current_str = f"current: {current_tag}"
                elif base or rel:
                    current_str = f"current: base={base or '-'} release={rel or '-'}"
                else:
                    current_str = "current: -"
                header_lines = [
                    current_str,
                    f"available: tag={(cand.get('tag') or '-') if cand else '-'} base={(cand.get('base') or '-') if cand else '-'} release={(cand.get('release') or '-') if cand else '-'}",
                ]
                choice = select_menu(self.stdscr, f"Actions for {name}", [label for label, _ in menu_items], header=header_lines)
                if choice is not None:
                    kind, payload = menu_items[choice][1]
                    if kind == "update_vars" and isinstance(payload, dict):
                        # Write new variables into the selected base/variant dict.
                        vars_dict = self.target_dict.setdefault("variables", {})
                        vars_dict["base"] = payload["base"]
                        vars_dict["release"] = payload["release"]
                        # Re-merge so the URL template sees the new variables.
                        self.recompute_view()
                        if write_hash(name):
                            self.set_status(f"{name}: updated to {payload['base']}.{payload['release']} and refreshed hash")
                        else:
                            self.set_status("hash prefetch failed after variable update")
                    elif kind == "hash":
                        if write_hash(name):
                            self.set_status(f"{name}: updated hash")
                        else:
                            self.set_status("hash prefetch failed")
            else:
                if self.pkg_name == "linux-cachyos" and name == "linux":
                    # Offer update of linux version from upstream PKGBUILD (.SRCINFO).
                    suffix = self.cachyos_suffix()
                    latest = self.fetch_cachyos_linux_latest(suffix)
                    rendered = render_templates(comp, self.merged_vars)
                    cur_version = str(rendered.get("version") or "")
                    header_lines = [
                        f"current: version={cur_version or '-'}",
                        f"available: version={latest or '-'}",
                    ]
                    opts = []
                    if latest:
                        opts.append(f"Update linux version to {latest} from PKGBUILD (.SRCINFO)")
                    else:
                        opts.append("Update linux version from PKGBUILD (.SRCINFO)")
                    opts.append("Cancel")
                    choice = select_menu(self.stdscr, f"Actions for {name}", opts, header=header_lines)
                    if choice == 0 and latest:
                        self.update_linux_from_pkgbuild(name)
                else:
                    show_popup(self.stdscr, [f"{name}: fetcher={fetcher}", "Use 'e' to edit fields manually."])
|
|
|
|
def select_menu(stdscr, title: str, options: List[str], header: Optional[List[str]] = None) -> Optional[int]:
    """Draw a centered modal menu and return the chosen option index, or None on cancel.

    `header`, if given, is a list of context lines shown above the options.
    Navigation: Up/Down (or k/j) move, Enter selects, Backspace/ESC cancels.
    """
    idx = 0
    while True:
        stdscr.clear()
        h, w = stdscr.getmaxyx()

        # Size the box to fit the widest of title/options, clamped to the screen.
        menu_width = min(w - 4, max(40, max(len(title) + 4, max(len(opt) + 4 for opt in options))))
        menu_height = min(h - 4, len(options) + (len(header) if header else 0) + 4)

        # Center the box on screen.
        start_x = (w - menu_width) // 2
        start_y = (h - menu_height) // 2

        draw_border(stdscr, start_y, start_x, menu_height, menu_width)

        # Title, centered on the top border.
        title_x = start_x + (menu_width - len(title)) // 2
        stdscr.addstr(start_y, title_x, f" {title} ", curses.color_pair(COLOR_TITLE) | curses.A_BOLD)

        # Header lines (clipped so they never overrun the box).
        y = start_y + 1
        if header:
            for line in header:
                if y >= start_y + menu_height - 2:
                    break
                stdscr.addstr(y, start_x + 2, str(line)[:menu_width-4], curses.color_pair(COLOR_HEADER))
                y += 1

            # Add separator line after header
            for i in range(1, menu_width - 1):
                stdscr.addch(y, start_x + i, curses.ACS_HLINE, curses.color_pair(COLOR_BORDER))
            y += 1

        # Options, highlighting the current selection; clip to remaining rows.
        options_start_y = y
        visible_options = min(len(options), start_y + menu_height - options_start_y - 1)

        for i, opt in enumerate(options[:visible_options], start=0):
            if i == idx:
                attr = curses.color_pair(COLOR_HIGHLIGHT)
                sel = "►"  # Use a fancier selector
            else:
                attr = curses.color_pair(COLOR_NORMAL)
                sel = " "

            stdscr.addstr(options_start_y + i, start_x + 2, f"{sel} {opt}"[:menu_width-4], attr)

        # Footer, centered on the bottom border.
        footer = "Enter: select | Backspace: cancel"
        footer_x = start_x + (menu_width - len(footer)) // 2
        stdscr.addstr(start_y + menu_height - 1, footer_x, footer, curses.color_pair(COLOR_STATUS))

        stdscr.refresh()
        ch = stdscr.getch()
        if ch in (curses.KEY_UP, ord('k')):
            idx = max(0, idx-1)
        elif ch in (curses.KEY_DOWN, ord('j')):
            idx = min(len(options)-1, idx+1)
        elif ch in (curses.KEY_ENTER, 10, 13):
            return idx
        elif ch == curses.KEY_BACKSPACE or ch == 127 or ch == 27:
            return None
|
|
|
|
# ------------------------------ main ------------------------------
|
|
|
|
def main(stdscr):
    """curses entry point: show a welcome banner, then run the package browser."""
    curses.curs_set(0)     # Hide cursor
    stdscr.nodelay(False)  # Blocking input

    if curses.has_colors():
        init_colors()

    try:
        rows, cols = stdscr.getmaxyx()
        banner = [
            "╔═══════════════════════════════════════════╗",
            "║                                           ║",
            "║       NixOS Package Version Manager       ║",
            "║                                           ║",
            "║  Browse and update package versions with  ║",
            "║   this interactive TUI. Navigate using    ║",
            "║      arrow keys and Enter to select.      ║",
            "║                                           ║",
            "╚═══════════════════════════════════════════╝",
            "",
            "Loading packages..."
        ]

        # Center the banner vertically and horizontally, skipping rows that
        # would fall off a small terminal.
        top = (rows - len(banner)) // 2
        for offset, text in enumerate(banner):
            if top + offset >= rows:
                continue
            left = (cols - len(text)) // 2
            if "NixOS Package Version Manager" in text:
                attr = curses.color_pair(COLOR_TITLE) | curses.A_BOLD
            elif "Loading packages..." in text:
                attr = curses.color_pair(COLOR_STATUS)
            else:
                attr = curses.color_pair(COLOR_NORMAL)
            stdscr.addstr(top + offset, left, text, attr)

        stdscr.refresh()

        # Hand control to the package list screen.
        PackagesScreen(stdscr).run()
    except Exception:
        # Restore the terminal before dumping the traceback.
        curses.endwin()
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
if __name__ == "__main__":
    # curses.wrapper restores the terminal state even if main() raises.
    curses.wrapper(main)
|