1413 lines
51 KiB
Python
Executable File
1413 lines
51 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Interactive TUI for browsing and updating version.json files.
|
|
|
|
Controls:
|
|
Package list:
|
|
j/k / arrows navigate
|
|
PgUp/PgDn page scroll
|
|
g/G top/bottom
|
|
f cycle filter (all / github / git / url)
|
|
Enter open package detail
|
|
q / ESC quit
|
|
|
|
Package detail:
|
|
j/k / arrows navigate sources
|
|
Left/Right cycle variants
|
|
r refresh candidates (fetch from upstream)
|
|
h recompute hash (prefetch)
|
|
c recompute cargo hash (if cargoHash present)
|
|
e edit arbitrary field (path=value)
|
|
s save to disk
|
|
Backspace back to list
|
|
q / ESC quit
|
|
|
|
Action menu / popup:
|
|
j/k / arrows navigate
|
|
Enter confirm
|
|
Backspace/ESC cancel
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import curses
|
|
import json
|
|
import sys
|
|
import traceback
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
|
|
import lib
|
|
import hooks # registers hooks as a side effect # noqa: F401
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Color pairs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Color-pair slot numbers passed to curses.init_pair in _init_colors and
# looked up with curses.color_pair throughout the UI.  Slot 0 is reserved
# by curses, so numbering starts at 1.
C_NORMAL = 1  # default foreground text
C_HIGHLIGHT = 2  # selected row (inverse: black on cyan)
C_HEADER = 3  # section headers / labels
C_STATUS = 4  # status-bar and hint text
C_ERROR = 5  # failure messages
C_SUCCESS = 6  # success messages / up-to-date markers
C_BORDER = 7  # box-drawing characters
C_TITLE = 8  # window / popup titles
C_DIM = 9  # de-emphasized text (combined with A_DIM at call sites)
|
|
|
|
|
|
def _init_colors() -> None:
    """Register every color pair used by the TUI.

    Must run after curses has been initialised; -1 as fore/background keeps
    the terminal's default color (enabled by use_default_colors).
    """
    curses.start_color()
    curses.use_default_colors()
    # (pair id, foreground, background) — order mirrors the constant list.
    palette = (
        (C_NORMAL, curses.COLOR_WHITE, -1),
        (C_HIGHLIGHT, curses.COLOR_BLACK, curses.COLOR_CYAN),
        (C_HEADER, curses.COLOR_CYAN, -1),
        (C_STATUS, curses.COLOR_YELLOW, -1),
        (C_ERROR, curses.COLOR_RED, -1),
        (C_SUCCESS, curses.COLOR_GREEN, -1),
        (C_BORDER, curses.COLOR_BLUE, -1),
        (C_TITLE, curses.COLOR_MAGENTA, -1),
        (C_DIM, curses.COLOR_WHITE, -1),
    )
    for pair_id, fg, bg in palette:
        curses.init_pair(pair_id, fg, bg)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Drawing helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _draw_border(win: Any, y: int, x: int, h: int, w: int) -> None:
    """Draw a single-line box of size h×w with its top-left corner at (y, x).

    Robustness fix: the original guarded only the bottom-right corner against
    ``curses.error`` (writing the last cell of a window always raises), but
    any of these writes can raise when the box does not fit the window — e.g.
    after a terminal resize.  Guard every write, matching the swallow-and-
    continue style of _addstr and _draw_hline.
    """
    bp = curses.color_pair(C_BORDER)

    def _put(py: int, px: int, ch: Any) -> None:
        # curses raises for out-of-window writes; drop them silently so a
        # tiny terminal degrades to a clipped border instead of a crash.
        try:
            win.addch(py, px, ch, bp)
        except curses.error:
            pass

    _put(y, x, curses.ACS_ULCORNER)
    _put(y, x + w - 1, curses.ACS_URCORNER)
    _put(y + h - 1, x, curses.ACS_LLCORNER)
    _put(y + h - 1, x + w - 1, curses.ACS_LRCORNER)
    for i in range(1, w - 1):
        _put(y, x + i, curses.ACS_HLINE)
        _put(y + h - 1, x + i, curses.ACS_HLINE)
    for i in range(1, h - 1):
        _put(y + i, x, curses.ACS_VLINE)
        _put(y + i, x + w - 1, curses.ACS_VLINE)
|
|
|
|
|
|
def _draw_hline(win: Any, y: int, x1: int, x2: int) -> None:
    """Draw a horizontal border line on row *y* from column x1 up to x2 (exclusive)."""
    attr = curses.color_pair(C_BORDER)
    col = x1
    while col < x2:
        try:
            win.addch(y, col, curses.ACS_HLINE, attr)
        except curses.error:
            # Off-window cells are simply skipped.
            pass
        col += 1
|
|
|
|
|
|
def _addstr(win: Any, y: int, x: int, text: str, attr: int = 0, max_w: int = 0) -> None:
|
|
"""Safe addstr that clips to max_w and ignores curses.error."""
|
|
try:
|
|
h, w = win.getmaxyx()
|
|
avail = (w - x - 1) if max_w <= 0 else min(max_w, w - x - 1)
|
|
if avail <= 0:
|
|
return
|
|
win.addstr(y, x, text[:avail], attr)
|
|
except curses.error:
|
|
pass
|
|
|
|
|
|
def _show_popup(stdscr: Any, lines: List[str], title: str = "") -> None:
    """Show a modal, centered message box and wait for a single keypress.

    The box is sized to the widest line (min 44 cols) and clamped to the
    screen; overflowing lines are dropped, overflowing text is clipped.
    """
    h, w = stdscr.getmaxyx()
    content_w = max((len(l) for l in lines), default=0)
    # +4 leaves room for the border plus one column of padding per side.
    box_w = min(w - 4, max(44, len(title) + 4, content_w + 4))
    box_h = min(h - 4, len(lines) + 4)
    top = (h - box_h) // 2
    left = (w - box_w) // 2
    win = curses.newwin(box_h, box_w, top, left)
    _draw_border(win, 0, 0, box_h, box_w)
    if title:
        # Center the title on the top border row.
        tx = max(1, (box_w - len(title) - 2) // 2)
        _addstr(win, 0, tx, f" {title} ", curses.color_pair(C_TITLE))
    for i, line in enumerate(lines, 1):
        # Stop before the footer/bottom-border rows.
        if i >= box_h - 2:
            break
        _addstr(win, i, 2, line, curses.color_pair(C_NORMAL), box_w - 4)
    footer = "any key to close"
    _addstr(
        win,
        box_h - 1,
        max(1, (box_w - len(footer)) // 2),
        footer,
        curses.color_pair(C_STATUS),
    )
    win.refresh()
    # Block until any key is pressed; the popup window is then discarded.
    win.getch()
|
|
|
|
|
|
def _select_menu(
    stdscr: Any,
    title: str,
    options: List[str],
    header: Optional[List[str]] = None,
) -> Optional[int]:
    """Show a centered menu; returns chosen index or None on cancel.

    *header* lines, when given, are rendered above the options and separated
    from them by a horizontal rule.  Navigation: j/k or arrow keys; Enter
    confirms, Backspace/ESC cancels.  Redraws from scratch on every keypress.
    """
    idx = 0
    while True:
        stdscr.clear()
        h, w = stdscr.getmaxyx()
        hdr_lines = header or []
        opt_h = len(options)
        # Height: options + header (+2 for the separator spacing) + chrome,
        # clamped to the screen.
        box_h = min(h - 4, opt_h + len(hdr_lines) + (2 if hdr_lines else 0) + 4)
        max_len = max(
            [len(title) + 4]
            + [len(o) + 4 for o in options]
            + [len(str(l)) + 4 for l in hdr_lines]
        )
        box_w = min(w - 4, max(44, max_len))
        sy = (h - box_h) // 2
        sx = (w - box_w) // 2
        _draw_border(stdscr, sy, sx, box_h, box_w)
        tx = sx + max(1, (box_w - len(title) - 2) // 2)
        _addstr(
            stdscr, sy, tx, f" {title} ", curses.color_pair(C_TITLE) | curses.A_BOLD
        )

        # Header lines (clipped to the box interior).
        y = sy + 1
        for line in hdr_lines:
            if y >= sy + box_h - 2:
                break
            _addstr(
                stdscr, y, sx + 2, str(line), curses.color_pair(C_HEADER), box_w - 4
            )
            y += 1
        if hdr_lines:
            _draw_hline(stdscr, y, sx + 1, sx + box_w - 1)
            y += 1

        # Options, the current one highlighted with a ► marker.
        opt_start = y  # NOTE: currently unused; kept for layout bookkeeping
        for i, opt in enumerate(options):
            if y >= sy + box_h - 1:
                break
            sel = "►" if i == idx else " "
            attr = (
                curses.color_pair(C_HIGHLIGHT)
                if i == idx
                else curses.color_pair(C_NORMAL)
            )
            _addstr(stdscr, y, sx + 2, f"{sel} {opt}", attr, box_w - 4)
            y += 1

        footer = "Enter:select Bksp/ESC:cancel"
        _addstr(
            stdscr,
            sy + box_h - 1,
            sx + max(1, (box_w - len(footer)) // 2),
            footer,
            curses.color_pair(C_STATUS),
        )
        stdscr.refresh()

        ch = stdscr.getch()
        if ch in (curses.KEY_UP, ord("k")):
            idx = max(0, idx - 1)
        elif ch in (curses.KEY_DOWN, ord("j")):
            idx = min(len(options) - 1, idx + 1)
        elif ch in (curses.KEY_ENTER, 10, 13):  # 10/13: plain Enter variants
            return idx
        elif ch in (curses.KEY_BACKSPACE, 127, 27):  # 127: DEL, 27: ESC
            return None
|
|
|
|
|
|
def _prompt(stdscr: Any, prompt: str) -> Optional[str]:
    """Prompt for one line of input on the bottom screen row.

    Returns the entered text with surrounding whitespace stripped — which
    may be the empty string when the user just presses Enter — or None when
    the input could not be read at all (callers treat None as cancel).

    BUG FIX: this used to end with ``return s or None``, collapsing blank
    input into None.  That made it impossible for callers to offer
    "leave blank to clear" semantics (e.g. _action_change_branch advertises
    "blank to clear" but a blank line was indistinguishable from a cancel,
    so its clear path could never run).  Blank now round-trips as "".
    """
    h, w = stdscr.getmaxyx()
    curses.echo()
    # Paint the prompt across the whole bottom row so stale text is erased.
    _addstr(
        stdscr,
        h - 1,
        0,
        prompt + " " * (w - len(prompt) - 1),
        curses.color_pair(C_HEADER),
    )
    stdscr.move(h - 1, len(prompt))
    stdscr.refresh()
    try:
        return stdscr.getstr().decode("utf-8").strip()
    except Exception:
        # Read failure (e.g. interrupted input) → signal cancel.
        return None
    finally:
        # Always restore no-echo mode, whichever way we leave.
        curses.noecho()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status bar mixin
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _StatusMixin:
    """Mixin holding a one-line status message plus the color to show it in."""

    def __init__(self) -> None:
        self._status = ""
        self._status_color = C_STATUS

    def set_status(self, text: str, *, error: bool = False, ok: bool = False) -> None:
        """Remember *text* for the next draw; *error* wins over *ok*."""
        self._status = text
        if error:
            self._status_color = C_ERROR
        elif ok:
            self._status_color = C_SUCCESS
        else:
            self._status_color = C_STATUS

    def draw_status(self, win: Any, row: int) -> None:
        """Render the stored status message (if any) on *row* of *win*."""
        if self._status:
            _, width = win.getmaxyx()
            _addstr(
                win,
                row,
                0,
                self._status,
                curses.color_pair(self._status_color),
                width - 1,
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Package list screen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Cycle order for the package-list fetcher filter (the `f` key).
_FETCHER_FILTERS = ["all", "github", "git", "url", "pypi"]
|
|
|
|
|
|
class PackagesScreen(_StatusMixin):
    """Top-level screen: scrollable package list with a live preview pane."""

    def __init__(self, stdscr: Any) -> None:
        super().__init__()
        self.stdscr = stdscr
        self.packages: List[Tuple[str, Path]] = lib.find_packages()
        self.idx = 0  # selected row within the *filtered* list
        self.scroll = 0  # first visible row of the filtered list
        self.filter_fetcher = "all"  # filter by primary fetcher

    def _filtered(self) -> List[Tuple[str, Path]]:
        """Packages whose version.json uses the active fetcher filter.

        Re-reads each spec from disk on every call; unreadable specs are
        silently skipped (best effort — this is only a display filter).
        """
        if self.filter_fetcher == "all":
            return self.packages
        result = []
        for name, path in self.packages:
            try:
                spec = lib.load_json(path)
                srcs = spec.get("sources") or {}
                fetchers = {(s.get("fetcher") or "none") for s in srcs.values()}
                if self.filter_fetcher in fetchers:
                    result.append((name, path))
            except Exception:
                pass
        return result

    def run(self) -> None:
        """Main event loop: draw, read one key, dispatch, repeat until quit."""
        while True:
            filtered = self._filtered()
            self._draw(filtered)
            ch = self.stdscr.getch()

            if ch in (ord("q"), 27):  # q / ESC
                return
            elif ch in (curses.KEY_UP, ord("k")):
                self.idx = max(0, self.idx - 1)
            elif ch in (curses.KEY_DOWN, ord("j")):
                self.idx = min(max(0, len(filtered) - 1), self.idx + 1)
            elif ch == curses.KEY_PPAGE:
                h, _ = self.stdscr.getmaxyx()
                self.idx = max(0, self.idx - (h - 4))
            elif ch == curses.KEY_NPAGE:
                h, _ = self.stdscr.getmaxyx()
                self.idx = min(max(0, len(filtered) - 1), self.idx + (h - 4))
            elif ch == ord("g"):  # jump to top
                self.idx = 0
            elif ch == ord("G"):  # jump to bottom
                self.idx = max(0, len(filtered) - 1)
            elif ch == ord("f"):
                # Cycle the fetcher filter and reset the view position.
                fi = _FETCHER_FILTERS.index(self.filter_fetcher)
                self.filter_fetcher = _FETCHER_FILTERS[(fi + 1) % len(_FETCHER_FILTERS)]
                self.idx = 0
                self.scroll = 0
            elif ch in (curses.KEY_ENTER, 10, 13):
                if not filtered:
                    continue
                name, path = filtered[self.idx]
                try:
                    spec = lib.load_json(path)
                except Exception as e:
                    self.set_status(f"Failed to load {path.name}: {e}", error=True)
                    continue
                detail = PackageDetailScreen(self.stdscr, name, path, spec)
                detail.run()
                # Reload in case something changed on disk
                self.packages = lib.find_packages()
                self.idx = min(self.idx, max(0, len(self._filtered()) - 1))

    def _draw(self, filtered: List[Tuple[str, Path]]) -> None:
        """Render the two-pane layout: list on the left, preview on the right."""
        self.stdscr.clear()
        h, w = self.stdscr.getmaxyx()
        # Left pane takes a third of the width, clamped to [30, 55] cols.
        left_w = max(30, min(55, w // 3))
        right_x = left_w + 1
        right_w = max(0, w - right_x)

        _draw_border(self.stdscr, 0, 0, h - 1, left_w)
        if right_w >= 20:  # preview pane only when there is room for it
            _draw_border(self.stdscr, 0, right_x, h - 1, right_w)

        filt_label = "" if self.filter_fetcher == "all" else f" [{self.filter_fetcher}]"
        title = f" Packages{filt_label} [{len(filtered)}] f:filter "
        tx = max(1, (left_w - len(title)) // 2)
        _addstr(self.stdscr, 0, tx, title, curses.color_pair(C_TITLE) | curses.A_BOLD)

        # Keep the selection inside the visible window.
        max_rows = h - 3
        if self.idx >= self.scroll + max_rows:
            self.scroll = self.idx - max_rows + 1
        elif self.idx < self.scroll:
            self.scroll = self.idx

        visible = filtered[self.scroll : self.scroll + max_rows]

        # Scroll indicators on the right edge of the list pane.
        if self.scroll > 0:
            _addstr(self.stdscr, 1, left_w - 3, "↑", curses.color_pair(C_STATUS))
        if self.scroll + max_rows < len(filtered):
            _addstr(
                self.stdscr,
                min(1 + len(visible), h - 2),
                left_w - 3,
                "↓",
                curses.color_pair(C_STATUS),
            )

        for i, (name, _path) in enumerate(visible):
            row = i + self.scroll  # absolute index in the filtered list
            if row == self.idx:
                attr = curses.color_pair(C_HIGHLIGHT)
                sel = "►"
            else:
                attr = curses.color_pair(C_NORMAL)
                sel = " "
            _addstr(self.stdscr, 1 + i, 2, f"{sel} {name}", attr, left_w - 4)

        # Right pane: preview
        if right_w >= 20 and filtered:
            try:
                name, path = filtered[self.idx]
                hdr = f" {name} "
                _addstr(
                    self.stdscr,
                    0,
                    right_x + max(1, (right_w - len(hdr)) // 2),
                    hdr,
                    curses.color_pair(C_TITLE) | curses.A_BOLD,
                    right_w - 2,
                )
                # Show the path relative to the repo root when possible.
                try:
                    rel_path = str(path.relative_to(lib.ROOT))
                except ValueError:
                    rel_path = str(path)
                _addstr(
                    self.stdscr,
                    1,
                    right_x + 2,
                    rel_path,
                    curses.color_pair(C_NORMAL) | curses.A_DIM,
                    right_w - 3,
                )
                _addstr(
                    self.stdscr, 2, right_x + 2, "Sources:", curses.color_pair(C_HEADER)
                )

                # One row per source: name | fetcher | current ref.
                spec = lib.load_json(path)
                mvars, msrcs, _ = lib.merged_view(spec, None)
                NAME_W, FETCH_W = 16, 7
                ref_col = right_x + 2 + NAME_W + 1 + FETCH_W + 1
                for i2, (sname, comp) in enumerate(sorted(msrcs.items())):
                    if 3 + i2 >= h - 3:
                        break
                    fetcher = comp.get("fetcher", "none")
                    ref = lib.source_ref_label(comp, mvars)
                    max_ref = max(0, right_w - (NAME_W + 1 + FETCH_W + 1) - 3)
                    # Color-code the fetcher column by type.
                    fc = (
                        C_SUCCESS
                        if fetcher == "github"
                        else (
                            C_STATUS
                            if fetcher == "url"
                            else (C_HEADER if fetcher == "git" else C_NORMAL)
                        )
                    )
                    _addstr(
                        self.stdscr,
                        3 + i2,
                        right_x + 2,
                        f"{sname[:NAME_W]:<{NAME_W}}",
                        curses.color_pair(C_NORMAL),
                    )
                    _addstr(
                        self.stdscr,
                        3 + i2,
                        right_x + 2 + NAME_W + 1,
                        f"{fetcher[:FETCH_W]:<{FETCH_W}}",
                        curses.color_pair(fc),
                    )
                    _addstr(
                        self.stdscr,
                        3 + i2,
                        ref_col,
                        ref[:max_ref] + ("…" if len(ref) > max_ref else ""),
                        curses.color_pair(C_NORMAL),
                    )

                hint = "Enter:details j/k:move f:filter q:quit"
                _addstr(
                    self.stdscr,
                    h - 2,
                    right_x + max(1, (right_w - len(hint)) // 2),
                    hint,
                    curses.color_pair(C_STATUS),
                    right_w - 2,
                )
            except Exception as e:
                # Preview failures (bad JSON, missing file) must not kill
                # the list view — surface the error inside the pane.
                _addstr(
                    self.stdscr,
                    2,
                    right_x + 2,
                    f"Error: {e}",
                    curses.color_pair(C_ERROR),
                    right_w - 4,
                )

        self.draw_status(self.stdscr, h - 1)
        self.stdscr.refresh()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Package detail screen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class PackageDetailScreen(_StatusMixin):
    """Detail screen for one package: variants, sources, and upstream candidates."""

    # Layout constants
    _SRC_NAME_W = 20
    _SRC_FETCH_W = 6
    _SRC_REF_COL = 2 + 20 + 1 + 6 + 1  # = 30

    # How many rows the "latest" + "footer" sections occupy at the bottom
    _BOTTOM_ROWS = 9  # sep + 3 latest rows + sep + 2 footer + status
|
|
|
|
    def __init__(
        self,
        stdscr: Any,
        pkg_name: str,
        path: Path,
        spec: lib.Json,
    ) -> None:
        """Bind the screen to one package's parsed version.json *spec*."""
        super().__init__()
        self.stdscr = stdscr
        self.pkg_name = pkg_name
        self.path = path
        self.spec = spec

        # "<base>" is a pseudo-entry meaning "no variant selected".
        self.variants: List[str] = ["<base>"] + list(
            (spec.get("variants") or {}).keys()
        )
        default = spec.get("defaultVariant")
        self.vidx = self.variants.index(default) if default in self.variants else 0  # type: ignore[arg-type]

        self.sidx = 0  # selected source row
        # Lazily-filled caches of upstream candidates, keyed by source name.
        self.candidates: Dict[str, lib.Candidates] = {}
        self.url_candidates: Dict[
            str, Dict[str, str]
        ] = {}  # name -> {base, release, tag}

        self._refresh_view()
|
|
|
|
# ------------------------------------------------------------------
|
|
# View management
|
|
# ------------------------------------------------------------------
|
|
|
|
def _variant_name(self) -> Optional[str]:
|
|
return None if self.vidx == 0 else self.variants[self.vidx]
|
|
|
|
    def _refresh_view(self) -> None:
        """Recompute the merged vars/sources view for the active variant.

        Called after every mutation of the spec so the screen always renders
        the merged (base + variant) state.  Also clamps the source cursor.
        """
        vname = self._variant_name()
        self.merged_vars, self.merged_srcs, self.target_dict = lib.merged_view(
            self.spec, vname
        )
        self.snames = sorted(self.merged_srcs.keys())
        self.sidx = min(self.sidx, max(0, len(self.snames) - 1))
        # Inject variant suffix hint for special hooks
        if self.pkg_name == "linux-cachyos":
            # Local import: _cachyos_linux_suffix is a private hook helper.
            from hooks import _cachyos_linux_suffix

            self.merged_vars["_cachyos_suffix"] = _cachyos_linux_suffix(vname)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Candidate fetching
|
|
# ------------------------------------------------------------------
|
|
|
|
    def _fetch_candidates_for(self, name: str) -> None:
        """Fetch upstream version candidates for source *name* into the cache.

        A package/source-specific hook takes precedence over the generic
        lib.fetch_candidates.  For URL fetchers the release string is also
        split into {base, release, tag} (stored in url_candidates) after
        stripping the configured releasePrefix/releaseSuffix.
        """
        comp = self.merged_srcs.get(name, {})
        hook = hooks.get_candidates_hook(self.pkg_name, name)
        if hook:
            self.candidates[name] = hook(comp, self.merged_vars)
        else:
            self.candidates[name] = lib.fetch_candidates(comp, self.merged_vars)

        # For URL fetcher with github variables, parse base/release from the tag
        c = self.candidates[name]
        if comp.get("fetcher") == "url" and c.release:
            prefix = str(self.merged_vars.get("releasePrefix") or "")
            suffix = str(self.merged_vars.get("releaseSuffix") or "")
            mid = c.release
            if prefix and mid.startswith(prefix):
                mid = mid[len(prefix) :]
            if suffix and mid.endswith(suffix):
                mid = mid[: -len(suffix)]
            # "base-release" convention: first part is base, last is release
            # (middle parts, if any, are discarded).
            parts = mid.split("-")
            if len(parts) >= 2:
                self.url_candidates[name] = {
                    "base": parts[0],
                    "release": parts[-1],
                    "tag": c.release,
                }
|
|
|
|
# ------------------------------------------------------------------
|
|
# Hash prefetch
|
|
# ------------------------------------------------------------------
|
|
|
|
def _prefetch_hash(self, name: str) -> Optional[str]:
|
|
comp = self.merged_srcs[name]
|
|
return lib.prefetch_source(comp, self.merged_vars)
|
|
|
|
def _has_cargo(self, name: str) -> bool:
|
|
return "cargoHash" in self.merged_srcs.get(name, {})
|
|
|
|
def _prefetch_cargo(self, name: str) -> Optional[str]:
|
|
comp = self.merged_srcs[name]
|
|
rendered = lib.render(comp, self.merged_vars)
|
|
fetcher = comp.get("fetcher", "none")
|
|
src_hash = comp.get("hash", "")
|
|
subdir = comp.get("cargoSubdir", "")
|
|
return lib.prefetch_cargo_vendor(
|
|
fetcher,
|
|
src_hash,
|
|
url=comp.get("url", ""),
|
|
owner=comp.get("owner", ""),
|
|
repo=comp.get("repo", ""),
|
|
rev=rendered.get("tag") or rendered.get("rev") or "",
|
|
subdir=subdir,
|
|
)
|
|
|
|
def _propagate_cargo_hash(self, name: str, cargo_hash: str) -> None:
|
|
"""Copy cargo hash to any sibling cargoDeps or hash-only source."""
|
|
ts = self.target_dict.setdefault("sources", {})
|
|
for sib_name, sib in self.merged_srcs.items():
|
|
if sib_name == name:
|
|
continue
|
|
is_cargo_deps = sib_name == "cargoDeps"
|
|
is_hash_only = not sib.get("fetcher") and list(sib.keys()) == ["hash"]
|
|
if is_cargo_deps or is_hash_only:
|
|
ts.setdefault(sib_name, {})["hash"] = cargo_hash
|
|
|
|
# ------------------------------------------------------------------
|
|
# Write helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _set_ref(self, name: str, kind: str, value: str) -> None:
|
|
ts = self.target_dict.setdefault("sources", {})
|
|
comp = ts.setdefault(name, {})
|
|
if kind in ("release", "tag"):
|
|
comp["tag"] = value
|
|
comp.pop("rev", None)
|
|
elif kind == "commit":
|
|
comp["rev"] = value
|
|
comp.pop("tag", None)
|
|
self._refresh_view()
|
|
|
|
def _write_hash(self, name: str, sri: str) -> None:
|
|
ts = self.target_dict.setdefault("sources", {})
|
|
ts.setdefault(name, {})["hash"] = sri
|
|
self._refresh_view()
|
|
|
|
def _write_cargo_hash(self, name: str, sri: str) -> None:
|
|
ts = self.target_dict.setdefault("sources", {})
|
|
ts.setdefault(name, {})["cargoHash"] = sri
|
|
self._propagate_cargo_hash(name, sri)
|
|
self._refresh_view()
|
|
|
|
    def _save(self) -> None:
        """Persist the (possibly mutated) spec back to its version.json file."""
        lib.save_json(self.path, self.spec)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Drawing
|
|
# ------------------------------------------------------------------
|
|
|
|
    def _draw(self) -> None:
        """Render the detail screen: title, variants, sources, candidates, footer."""
        self.stdscr.clear()
        h, w = self.stdscr.getmaxyx()
        _draw_border(self.stdscr, 0, 0, h - 1, w)

        # Title
        title = f" {self.pkg_name} "
        _addstr(
            self.stdscr,
            0,
            max(1, (w - len(title)) // 2),
            title,
            curses.color_pair(C_TITLE) | curses.A_BOLD,
        )

        # Path (relative to the repo root when possible)
        try:
            rel = str(self.path.relative_to(lib.ROOT))
        except ValueError:
            rel = str(self.path)
        _addstr(
            self.stdscr, 1, 2, rel, curses.color_pair(C_NORMAL) | curses.A_DIM, w - 4
        )

        # Variants row: " | "-separated, active one bracketed + highlighted.
        _addstr(self.stdscr, 2, 2, "Variants:", curses.color_pair(C_HEADER))
        xp = 12
        for i, v in enumerate(self.variants):
            if xp >= w - 4:
                break
            if i > 0:
                _addstr(self.stdscr, 2, xp, " | ", curses.color_pair(C_NORMAL))
                xp += 3
            if i == self.vidx:
                _addstr(self.stdscr, 2, xp, f"[{v}]", curses.color_pair(C_HIGHLIGHT))
                xp += len(v) + 2
            else:
                _addstr(self.stdscr, 2, xp, v, curses.color_pair(C_NORMAL))
                xp += len(v)

        # Sources section separator
        _draw_hline(self.stdscr, 3, 1, w - 1)
        _addstr(
            self.stdscr, 3, 2, " Sources ", curses.color_pair(C_HEADER) | curses.A_BOLD
        )

        # Latest section layout: separator at y_latest, 3 content rows below
        y_latest = h - self._BOTTOM_ROWS
        _max_src_rows = max(0, y_latest - 4)

        # One row per source: selector | name | fetcher | ref + badges.
        for i, name in enumerate(self.snames[:_max_src_rows]):
            comp = self.merged_srcs[name]
            fetcher = comp.get("fetcher", "none")
            ref = lib.source_ref_label(comp, self.merged_vars)
            branch = comp.get("branch") or ""
            has_cargo = "cargoHash" in comp
            badges = (f" [{branch}]" if branch else "") + (
                " [cargo]" if has_cargo else ""
            )
            ref_text = ref + badges
            # Clip the ref column, marking truncation with an ellipsis.
            ref_short = ref_text[: w - self._SRC_REF_COL - 2]
            if len(ref_text) > w - self._SRC_REF_COL - 2:
                ref_short = ref_short[:-1] + "…"

            if i == self.sidx:
                row_attr = curses.color_pair(C_HIGHLIGHT)
                sel = "►"
            else:
                row_attr = curses.color_pair(C_NORMAL)
                sel = " "

            # Fetcher column color mirrors the list screen's scheme.
            fc = (
                C_SUCCESS
                if fetcher == "github"
                else (
                    C_STATUS
                    if fetcher == "url"
                    else (C_HEADER if fetcher == "git" else C_NORMAL)
                )
            )

            row = 4 + i
            _addstr(
                self.stdscr,
                row,
                2,
                f"{sel} {name[: self._SRC_NAME_W - 2]:<{self._SRC_NAME_W - 2}}",
                row_attr,
            )
            _addstr(
                self.stdscr,
                row,
                2 + self._SRC_NAME_W,
                f"{fetcher[: self._SRC_FETCH_W]:<{self._SRC_FETCH_W}}",
                curses.color_pair(fc),
            )
            _addstr(
                self.stdscr,
                row,
                self._SRC_REF_COL,
                ref_short,
                curses.color_pair(C_NORMAL),
            )

        # Latest candidates section
        _draw_hline(self.stdscr, y_latest, 1, w - 1)
        self._draw_candidates(y_latest, w)

        # Footer separator + keys
        _draw_hline(self.stdscr, h - 4, 1, w - 1)
        f1 = "Enter:actions r:refresh h:hash c:cargo e:edit s:save"
        f2 = "←/→:variant j/k:source Bksp:back q:quit"
        _addstr(
            self.stdscr,
            h - 3,
            max(1, (w - len(f1)) // 2),
            f1,
            curses.color_pair(C_STATUS),
        )
        _addstr(
            self.stdscr,
            h - 2,
            max(1, (w - len(f2)) // 2),
            f2,
            curses.color_pair(C_STATUS),
        )

        self.draw_status(self.stdscr, h - 1)
        self.stdscr.refresh()
|
|
|
|
    def _draw_candidates(self, y: int, w: int) -> None:
        """Render the "Latest Versions" section for the selected source.

        *y* is the row of the section separator; content goes on y+1..y+4.
        Candidates are fetched lazily the first time a source is shown.
        """
        if not self.snames:
            return
        name = self.snames[self.sidx]
        comp = self.merged_srcs[name]
        fetcher = comp.get("fetcher", "none")
        branch = comp.get("branch") or ""

        hdr = "Latest Versions:" + (f" (branch: {branch})" if branch else "")
        _addstr(self.stdscr, y + 1, 2, hdr, curses.color_pair(C_HEADER) | curses.A_BOLD)

        # Lazy-load candidates on first draw of this source
        if fetcher in ("github", "git", "url", "pypi") and name not in self.candidates:
            self._fetch_candidates_for(name)

        c = self.candidates.get(name)
        dim = curses.color_pair(C_DIM) | curses.A_DIM

        def _row(r: int, label: str, value: str, date: str, color: int) -> None:
            # Label column (9 wide), value, then the date dimmed if it fits.
            lw = 9
            _addstr(self.stdscr, r, 4, f"{label:<{lw}}", curses.color_pair(C_HEADER))
            _addstr(
                self.stdscr, r, 4 + lw, value[: w - lw - 6], curses.color_pair(color)
            )
            if date and 4 + lw + len(value) + 2 < w - 2:
                _addstr(self.stdscr, r, 4 + lw + len(value) + 1, date, dim)

        if c and fetcher in ("github", "git"):
            # Up to three rows: release, tag, commit (whichever exist).
            row = y + 2
            if c.release:
                _row(row, "Release:", c.release, c.release_date, C_SUCCESS)
                row += 1
            if c.tag:
                _row(row, "Tag:", c.tag, c.tag_date, C_SUCCESS)
                row += 1
            if c.commit:
                _row(row, "Commit:", c.commit[:12], c.commit_date, C_NORMAL)

        elif fetcher in ("url", "pypi"):
            url_info = lib._url_source_info(comp, self.merged_vars)
            kind = url_info.get("kind", "plain")
            version_var = url_info.get("version_var") or "version"
            cur_ver = str(self.merged_vars.get(version_var) or "")

            if kind == "github":
                # URL source backed by a GitHub release (base/release vars).
                uc = self.url_candidates.get(name)
                tag = (uc or {}).get("tag") or (c.release if c else "")
                if tag:
                    # "same" when the tag matches the current version either
                    # verbatim or via the parsed base-release pair.
                    same = tag == cur_ver or (
                        uc and f"{uc.get('base', '')}-{uc.get('release', '')}" in tag
                    )
                    _row(
                        y + 2,
                        "Latest:",
                        tag,
                        (c.release_date if c else ""),
                        C_NORMAL if same else C_SUCCESS,
                    )
                    if uc and uc.get("base") and uc.get("release"):
                        _addstr(
                            self.stdscr,
                            y + 3,
                            4,
                            f"base={uc['base']} release={uc['release']}",
                            curses.color_pair(C_NORMAL),
                            w - 6,
                        )
            else:
                # pypi / openvsx / plain
                latest = c.release if c else ""
                if cur_ver:
                    _addstr(
                        self.stdscr,
                        y + 2,
                        4,
                        f"{'current':<9}{cur_ver}",
                        curses.color_pair(C_NORMAL),
                        w - 6,
                    )
                if latest:
                    same = latest == cur_ver
                    _row(y + 3, "Latest:", latest, "", C_NORMAL if same else C_SUCCESS)
                    if same:
                        _addstr(
                            self.stdscr,
                            y + 4,
                            4,
                            "(up to date)",
                            curses.color_pair(C_DIM) | curses.A_DIM,
                        )
                else:
                    _addstr(
                        self.stdscr,
                        y + 3,
                        4,
                        "No candidates (press r to fetch)",
                        curses.color_pair(C_NORMAL),
                    )

        else:
            # Special case display: CachyOS hooks return value in c.tag slot
            if c and c.tag:
                _row(y + 2, "Latest:", c.tag, c.tag_date, C_SUCCESS)
            elif c and c.commit:
                _row(y + 2, "Commit:", c.commit[:12], c.commit_date, C_NORMAL)
            else:
                _addstr(
                    self.stdscr,
                    y + 2,
                    4,
                    "No candidates (press r to fetch)",
                    curses.color_pair(C_NORMAL),
                )
|
|
|
|
# ------------------------------------------------------------------
|
|
# Action dispatch
|
|
# ------------------------------------------------------------------
|
|
|
|
def _action_for_source(self, name: str) -> None:
|
|
comp = self.merged_srcs[name]
|
|
fetcher = comp.get("fetcher", "none")
|
|
has_cargo = self._has_cargo(name)
|
|
|
|
if fetcher in ("github", "git"):
|
|
self._action_github_git(name, comp, fetcher, has_cargo)
|
|
elif fetcher in ("url", "pypi"):
|
|
self._action_url(name, comp)
|
|
else:
|
|
_show_popup(
|
|
self.stdscr,
|
|
[f"fetcher: {fetcher}", "Use 'e' to edit fields manually."],
|
|
title=name,
|
|
)
|
|
|
|
    def _action_github_git(
        self, name: str, comp: lib.Json, fetcher: str, has_cargo: bool
    ) -> None:
        """Action menu for github/git sources: pick a ref, recompute hashes.

        Builds a header describing the current ref and the available
        candidates, offers the applicable actions, then performs the chosen
        one (updating the spec in memory; saving is a separate 's' action).
        """
        if name not in self.candidates:
            self._fetch_candidates_for(name)
        c = self.candidates.get(name, lib.Candidates())
        branch = comp.get("branch") or ""

        # Describe the currently-pinned ref for the menu header.
        rendered = lib.render(comp, self.merged_vars)
        cur_tag = rendered.get("tag") or ""
        cur_rev = rendered.get("rev") or ""
        cur_str = (
            f"current: tag={cur_tag}"
            if cur_tag
            else f"current: rev={cur_rev[:12]}"
            if cur_rev
            else "current: -"
        )
        if branch:
            cur_str += f" (branch: {branch})"

        def _av(v: str, d: str) -> str:
            # "value date" when both known, else the value or a dash.
            return f"{v} {d}" if v and d else (v or "-")

        hdr = [
            cur_str,
            "available:",
            f" release : {_av(c.release, c.release_date)}",
            f" tag : {_av(c.tag, c.tag_date)}",
            f" commit : {_av(c.commit[:12] if c.commit else '', c.commit_date)}",
        ]
        if has_cargo:
            cargo = comp.get("cargoHash", "")
            hdr.append(
                f"cargoHash: {cargo[:32]}{'...' if len(cargo) > 32 else cargo if not cargo else ''}"
            )

        # Each item is (menu label, (action kind, payload)).
        items: List[Tuple[str, Tuple[str, str]]] = []
        # Release/tag choices only make sense for github sources not pinned
        # to a branch; commit is always offered when known.
        if fetcher == "github" and not branch:
            if c.release:
                items.append(
                    (f"Use latest release ({c.release})", ("release", c.release))
                )
            if c.tag:
                items.append((f"Use latest tag ({c.tag})", ("tag", c.tag)))
        if c.commit:
            items.append(
                (f"Use latest commit ({c.commit[:12]})", ("commit", c.commit))
            )
        items.append(("Recompute hash", ("hash", "")))
        if has_cargo:
            items.append(("Recompute cargo hash", ("cargo_hash", "")))
        items.append(("Change branch", ("change_branch", "")))
        items.append(("Cancel", ("cancel", "")))

        choice = _select_menu(
            self.stdscr,
            f"Actions: {name}",
            [label for label, _ in items],
            header=hdr,
        )
        if choice is None:
            return

        kind, val = items[choice][1]
        if kind == "cancel":
            return

        if kind in ("release", "tag", "commit"):
            if not val:
                self.set_status(f"No candidate for {kind}", error=True)
                return
            # Point at the new ref, then refresh the source hash (and the
            # cargo hash when the source vendors cargo dependencies).
            self._set_ref(name, kind, val)
            self.set_status(f"{name}: fetching hash...")
            self.stdscr.refresh()
            sri = self._prefetch_hash(name)
            if sri:
                self._write_hash(name, sri)
                if has_cargo:
                    self.set_status(f"{name}: computing cargo hash...")
                    self.stdscr.refresh()
                    cargo = self._prefetch_cargo(name)
                    if cargo:
                        self._write_cargo_hash(name, cargo)
                        self.set_status(
                            f"{name}: updated ref + hash + cargo hash", ok=True
                        )
                    else:
                        self.set_status(
                            f"{name}: updated ref + hash; cargo hash failed", error=True
                        )
                else:
                    self.set_status(f"{name}: updated ref and hash", ok=True)
            else:
                self.set_status(f"{name}: hash prefetch failed", error=True)

        elif kind == "hash":
            self.set_status(f"{name}: fetching hash...")
            self.stdscr.refresh()
            sri = self._prefetch_hash(name)
            if sri:
                self._write_hash(name, sri)
                self.set_status(f"{name}: hash updated", ok=True)
            else:
                self.set_status(f"{name}: hash prefetch failed", error=True)

        elif kind == "cargo_hash":
            self.set_status(f"{name}: computing cargo hash...")
            self.stdscr.refresh()
            cargo = self._prefetch_cargo(name)
            if cargo:
                self._write_cargo_hash(name, cargo)
                self.set_status(f"{name}: cargo hash updated", ok=True)
            else:
                self.set_status(f"{name}: cargo hash failed", error=True)

        elif kind == "change_branch":
            self._action_change_branch(name, comp, fetcher, has_cargo)
|
|
|
|
    def _action_change_branch(
        self, name: str, comp: lib.Json, fetcher: str, has_cargo: bool
    ) -> None:
        """Change (or clear) the tracked branch of a github/git source.

        After updating the branch the flow is: resolve the branch HEAD,
        pin rev to it (dropping any tag), invalidate cached candidates,
        prefetch the source hash and — when applicable — the cargo hash.
        """
        cur_branch = comp.get("branch") or ""
        prompt = (
            f"New branch for '{name}' (blank to clear, current: {cur_branch!r}): "
            if cur_branch
            else f"Branch to track for '{name}' (blank to cancel): "
        )
        new_branch = _prompt(self.stdscr, prompt)

        # blank input when there was no branch → cancelled
        if new_branch is None:
            self.set_status("Cancelled.")
            return

        ts = self.target_dict.setdefault("sources", {})
        comp_w = ts.setdefault(name, {})

        if new_branch:
            comp_w["branch"] = new_branch
        else:
            comp_w.pop("branch", None)
            # Also remove from merged view target if previously set at this level
            if not new_branch and not cur_branch:
                self.set_status("No branch to clear.")
                return

        # Resolve HEAD of the new branch
        self.set_status(
            f"{name}: resolving HEAD of {new_branch!r}..."
            if new_branch
            else f"{name}: branch cleared, fetching HEAD..."
        )
        self.stdscr.refresh()
        self._refresh_view()

        if fetcher == "github":
            owner = comp.get("owner") or ""
            repo = comp.get("repo") or ""
            # None branch → repository default branch.
            rev = (
                lib.gh_head_commit(owner, repo, new_branch or None)
                if (owner and repo)
                else None
            )
        else:  # git
            url = comp.get("url") or ""
            rev = lib.git_branch_commit(url, new_branch or None) if url else None

        if not rev:
            # Branch change already applied; only the HEAD lookup failed.
            self.set_status(
                f"{name}: branch {'set' if new_branch else 'cleared'} but could not resolve HEAD",
                error=True,
            )
            return

        # Pin to the resolved commit; a branch-tracking source uses rev, not tag.
        comp_w["rev"] = rev
        comp_w.pop("tag", None)
        self._refresh_view()

        # Invalidate cached candidates so next fetch uses the new branch
        self.candidates.pop(name, None)

        self.set_status(f"{name}: fetching hash for {rev[:12]}...")
        self.stdscr.refresh()
        sri = self._prefetch_hash(name)
        if not sri:
            self.set_status(
                f"{name}: branch updated to {rev[:12]}; hash prefetch failed",
                error=True,
            )
            return

        self._write_hash(name, sri)

        if has_cargo:
            self.set_status(f"{name}: computing cargo hash...")
            self.stdscr.refresh()
            cargo = self._prefetch_cargo(name)
            if cargo:
                self._write_cargo_hash(name, cargo)
                result = f"branch={'none' if not new_branch else new_branch!r}, rev={rev[:12]}, hash+cargo updated"
            else:
                result = f"branch={'none' if not new_branch else new_branch!r}, rev={rev[:12]}, hash updated; cargo hash failed"
            self.set_status(
                f"{name}: {result}", ok=cargo is not None, error=cargo is None
            )
        else:
            self.set_status(
                f"{name}: branch={'none' if not new_branch else repr(new_branch)}, rev={rev[:12]}, hash updated",
                ok=True,
            )
|
|
|
|
    def _action_url(self, name: str, comp: lib.Json) -> None:
        """Show and run the action menu for a url-fetcher source.

        Displays the current vs. latest upstream version, then lets the
        user update version variables, recompute the source hash, or
        cancel.  All updates are written into ``self.target_dict`` (the
        in-memory edit buffer); nothing is saved to disk here.

        Args:
            name: Source name (key into ``self.merged_srcs`` / candidates).
            comp: The merged source component dict for *name*.
        """
        # Lazily fetch upstream candidates on first entry into the menu.
        if name not in self.candidates:
            self._fetch_candidates_for(name)
        c = self.candidates.get(name, lib.Candidates())

        # Classify the URL source (pypi / openvsx / github / plain) and
        # find which variable holds its version.
        url_info = lib._url_source_info(comp, self.merged_vars)
        kind_label = url_info.get("kind", "plain")

        # Determine current version display
        cur_version = ""
        version_var = url_info.get("version_var") or "version"
        if kind_label in ("pypi", "openvsx", "plain"):
            cur_version = str(self.merged_vars.get(version_var) or "")
        elif kind_label == "github":
            # proton-cachyos style: base+release variables
            # NOTE(review): ``uc`` is assigned but unused in this branch —
            # presumably a leftover from an earlier revision; confirm.
            uc = self.url_candidates.get(name)
            base = str(self.merged_vars.get("base") or "")
            rel = str(self.merged_vars.get("release") or "")
            rp = str(self.merged_vars.get("releasePrefix") or "")
            rs = str(self.merged_vars.get("releaseSuffix") or "")
            # Compose "<prefix><base>-<release><suffix>" when both parts
            # are known; otherwise fall back to the generic ref label.
            cur_version = (
                f"{rp}{base}-{rel}{rs}"
                if (base and rel)
                else (lib.source_ref_label(comp, self.merged_vars))
            )

        latest = c.release or ""
        # Header lines shown above the action menu.
        hdr = [
            f"type : {kind_label}",
            f"current : {cur_version or '-'}",
            f"latest : {latest or '(press r to fetch)'}",
        ]

        # (label, action-id) pairs; built per fetcher kind, so the menu
        # only offers updates that make sense for this source.
        items: List[Tuple[str, str]] = []

        if kind_label == "github":
            uc = self.url_candidates.get(name)
            if uc and uc.get("base") and uc.get("release"):
                # Structured base+release candidate available.
                items.append((f"Use latest release ({uc['tag']})", "update_vars"))
            elif latest:
                items.append((f"Use latest release ({latest})", "update_version"))

        elif kind_label in ("pypi", "openvsx"):
            if latest and latest != cur_version:
                items.append((f"Update to {latest}", "update_version"))
            elif latest:
                hdr.append("(already at latest)")

        elif kind_label == "plain":
            if latest and latest != cur_version:
                items.append((f"Update to {latest}", "update_version"))

        items.append(("Recompute hash", "hash"))
        items.append(("Cancel", "cancel"))

        choice = _select_menu(
            self.stdscr,
            f"Actions: {name}",
            [label for label, _ in items],
            header=hdr,
        )
        if choice is None:
            # Menu dismissed (ESC/backspace) — nothing to do.
            return
        _, action = items[choice]
        if action == "cancel":
            return

        if action == "update_vars":
            # GitHub release with base+release variable split (proton-cachyos style)
            uc = self.url_candidates.get(name)
            if uc:
                vs = self.target_dict.setdefault("variables", {})
                vs["base"] = uc["base"]
                vs["release"] = uc["release"]
                self._refresh_view()
                self.set_status(f"{name}: fetching hash for {uc['tag']}...")
                self.stdscr.refresh()
                sri = self._prefetch_hash(name)
                if sri:
                    self._write_hash(name, sri)
                    self.set_status(f"{name}: updated to {uc['tag']}", ok=True)
                else:
                    # Variables were already written; only the hash failed.
                    self.set_status(
                        f"{name}: variables updated; hash prefetch failed", error=True
                    )

        elif action == "update_version":
            new_ver = latest
            self.set_status(f"{name}: updating to {new_ver}...")
            self.stdscr.refresh()

            if kind_label == "pypi":
                # For pypi fetcher: also need to fetch a new hash from PyPI directly
                pkg_name = url_info.get("name") or str(
                    self.merged_vars.get("name") or name
                )
                vs = self.target_dict.setdefault("variables", {})
                vs[version_var] = new_ver
                self._refresh_view()
                self.set_status(f"{name}: fetching PyPI hash for {new_ver}...")
                self.stdscr.refresh()
                sri = lib.pypi_hash(pkg_name, new_ver)
                if sri:
                    self._write_hash(name, sri)
                    self.set_status(f"{name}: updated to {new_ver}", ok=True)
                else:
                    self.set_status(
                        f"{name}: version updated; hash prefetch failed", error=True
                    )

            else:
                # url/openvsx/plain: update variable, render new URL, prefetch
                vs = self.target_dict.setdefault("variables", {})
                vs[version_var] = new_ver
                # For GitHub release assets, also update the tag field on the source
                if kind_label == "github":
                    tag_tmpl = comp.get("tag") or ""
                    if tag_tmpl:
                        ts = self.target_dict.setdefault("sources", {})
                        # Render the tag template with the new version
                        # overriding the merged variables.
                        ts.setdefault(name, {})["tag"] = lib.render(
                            tag_tmpl, {**self.merged_vars, version_var: new_ver}
                        )
                self._refresh_view()
                self.set_status(f"{name}: fetching hash for {new_ver}...")
                self.stdscr.refresh()
                sri = self._prefetch_hash(name)
                if sri:
                    self._write_hash(name, sri)
                    self.set_status(f"{name}: updated to {new_ver}", ok=True)
                else:
                    self.set_status(
                        f"{name}: version updated; hash prefetch failed", error=True
                    )

        elif action == "hash":
            # Recompute the hash for the current version without changing it.
            self.set_status(f"{name}: fetching hash...")
            self.stdscr.refresh()
            sri = self._prefetch_hash(name)
            if sri:
                self._write_hash(name, sri)
                self.set_status(f"{name}: hash updated", ok=True)
            else:
                self.set_status(f"{name}: hash prefetch failed", error=True)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Main loop
|
|
# ------------------------------------------------------------------
|
|
|
|
def run(self) -> None:
|
|
while True:
|
|
self._draw()
|
|
ch = self.stdscr.getch()
|
|
|
|
if ch in (ord("q"), 27):
|
|
return
|
|
elif ch in (curses.KEY_BACKSPACE, 127):
|
|
return
|
|
elif ch in (curses.KEY_LEFT,):
|
|
self.vidx = max(0, self.vidx - 1)
|
|
self.candidates.clear()
|
|
self.url_candidates.clear()
|
|
self._refresh_view()
|
|
elif ch in (curses.KEY_RIGHT,):
|
|
self.vidx = min(len(self.variants) - 1, self.vidx + 1)
|
|
self.candidates.clear()
|
|
self.url_candidates.clear()
|
|
self._refresh_view()
|
|
elif ch in (curses.KEY_UP, ord("k")):
|
|
self.sidx = max(0, self.sidx - 1)
|
|
elif ch in (curses.KEY_DOWN, ord("j")):
|
|
self.sidx = min(max(0, len(self.snames) - 1), self.sidx + 1)
|
|
elif ch == ord("r"):
|
|
if self.snames:
|
|
name = self.snames[self.sidx]
|
|
self.set_status(f"{name}: fetching candidates...")
|
|
self.stdscr.refresh()
|
|
self._fetch_candidates_for(name)
|
|
c = self.candidates.get(name, lib.Candidates())
|
|
|
|
def _fv(v: str, d: str) -> str:
|
|
return f"{v} {d}" if v and d else (v or "-")
|
|
|
|
_show_popup(
|
|
self.stdscr,
|
|
[
|
|
f"Candidates for {name}:",
|
|
f" release : {_fv(c.release, c.release_date)}",
|
|
f" tag : {_fv(c.tag, c.tag_date)}",
|
|
f" commit : {_fv(c.commit[:12] if c.commit else '', c.commit_date)}",
|
|
],
|
|
title=name,
|
|
)
|
|
self.set_status("")
|
|
|
|
elif ch == ord("h"):
|
|
if self.snames:
|
|
name = self.snames[self.sidx]
|
|
self.set_status(f"{name}: fetching hash...")
|
|
self.stdscr.refresh()
|
|
sri = self._prefetch_hash(name)
|
|
if sri:
|
|
self._write_hash(name, sri)
|
|
if self._has_cargo(name):
|
|
self.set_status(f"{name}: computing cargo hash...")
|
|
self.stdscr.refresh()
|
|
cargo = self._prefetch_cargo(name)
|
|
if cargo:
|
|
self._write_cargo_hash(name, cargo)
|
|
self.set_status(
|
|
f"{name}: updated hash + cargo hash", ok=True
|
|
)
|
|
else:
|
|
self.set_status(
|
|
f"{name}: updated hash; cargo hash failed",
|
|
error=True,
|
|
)
|
|
else:
|
|
self.set_status(f"{name}: hash updated", ok=True)
|
|
else:
|
|
self.set_status(f"{name}: hash prefetch failed", error=True)
|
|
|
|
elif ch == ord("c"):
|
|
if self.snames:
|
|
name = self.snames[self.sidx]
|
|
if self._has_cargo(name):
|
|
self.set_status(f"{name}: computing cargo hash...")
|
|
self.stdscr.refresh()
|
|
cargo = self._prefetch_cargo(name)
|
|
if cargo:
|
|
self._write_cargo_hash(name, cargo)
|
|
self.set_status(f"{name}: cargo hash updated", ok=True)
|
|
else:
|
|
self.set_status(f"{name}: cargo hash failed", error=True)
|
|
else:
|
|
self.set_status(f"{self.snames[self.sidx]}: no cargoHash field")
|
|
|
|
elif ch == ord("e"):
|
|
val = _prompt(
|
|
self.stdscr, "Edit path=value (relative to selected base/variant):"
|
|
)
|
|
if val and "=" in val:
|
|
k, v = val.split("=", 1)
|
|
path_tokens = [p for p in k.split(".") if p]
|
|
# Write to the current base/variant cursor dict
|
|
cursor = (
|
|
self.spec
|
|
if self.vidx == 0
|
|
else (
|
|
self.spec.get("variants", {}).get(
|
|
self._variant_name() or "", self.spec
|
|
)
|
|
)
|
|
)
|
|
lib.deep_set(cursor, path_tokens, v)
|
|
self._refresh_view()
|
|
self.set_status(f"Set {k} = {v!r}", ok=True)
|
|
elif val:
|
|
self.set_status(
|
|
"Invalid format; expected key.path=value", error=True
|
|
)
|
|
|
|
elif ch == ord("i"):
|
|
# Show full rendered URL for url fetcher sources
|
|
if self.snames:
|
|
name = self.snames[self.sidx]
|
|
comp = self.merged_srcs[name]
|
|
if comp.get("fetcher") == "url":
|
|
rendered = lib.render(comp, self.merged_vars)
|
|
url = rendered.get("url") or rendered.get("urlTemplate") or ""
|
|
_show_popup(self.stdscr, ["Full URL:", url], title=name)
|
|
else:
|
|
self.set_status(f"Not a url fetcher source")
|
|
|
|
elif ch == ord("s"):
|
|
try:
|
|
self._save()
|
|
self.set_status("Saved.", ok=True)
|
|
except Exception as e:
|
|
self.set_status(f"Save failed: {e}", error=True)
|
|
|
|
elif ch in (curses.KEY_ENTER, 10, 13):
|
|
if self.snames:
|
|
self._action_for_source(self.snames[self.sidx])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _main(stdscr: Any) -> None:
    """``curses.wrapper`` entry point: set up the terminal and run the TUI.

    Hides the cursor, switches to blocking input, initializes colors when
    the terminal supports them, and hands control to ``PackagesScreen``.
    On an unhandled exception the terminal is restored first so the
    traceback prints legibly, then the process exits with status 1.
    """
    stdscr.nodelay(False)  # blocking getch(): one key per loop iteration
    curses.curs_set(0)     # hide the hardware cursor
    if curses.has_colors():
        _init_colors()
    try:
        screen = PackagesScreen(stdscr)
        screen.run()
    except Exception:
        # Leave curses mode before printing, otherwise the traceback
        # would be garbled by the alternate screen.
        curses.endwin()
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
    # curses.wrapper handles terminal setup/teardown around _main, even
    # when it raises.
    curses.wrapper(_main)
|