#!/usr/bin/env python3 """ Shared library for version.json management. Provides: - JSON load/save - Variable template rendering - Base+variant merge (mirrors lib/versioning/default.nix) - GitHub/Git candidate fetching - Nix hash prefetching (fetchFromGitHub, fetchgit, fetchurl, fetchzip, cargo vendor) - Package scanning """ from __future__ import annotations import json import os import re import subprocess import sys import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any, Dict, List, Optional, Tuple Json = Dict[str, Any] ROOT = Path(__file__).resolve().parents[1] PKGS_DIR = ROOT / "packages" # --------------------------------------------------------------------------- # I/O # --------------------------------------------------------------------------- def load_json(path: Path) -> Json: with path.open("r", encoding="utf-8") as f: return json.load(f) def save_json(path: Path, data: Json) -> None: tmp = path.with_suffix(".tmp") with tmp.open("w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) f.write("\n") tmp.replace(path) def eprint(*args: Any, **kwargs: Any) -> None: print(*args, file=sys.stderr, **kwargs) # --------------------------------------------------------------------------- # Template rendering # --------------------------------------------------------------------------- def render(value: Any, variables: Dict[str, Any]) -> Any: """Recursively substitute ${var} in strings using the given variable map.""" if isinstance(value, str): return re.sub( r"\$\{([^}]+)\}", lambda m: str(variables.get(m.group(1), m.group(0))), value, ) if isinstance(value, dict): return {k: render(v, variables) for k, v in value.items()} if isinstance(value, list): return [render(v, variables) for v in value] return value # --------------------------------------------------------------------------- # Merge (matches lib/versioning/default.nix) # --------------------------------------------------------------------------- def _deep_merge(a: Json, b: Json) -> Json: out = dict(a) for k, v in b.items(): if k in out and isinstance(out[k], dict) and isinstance(v, dict): out[k] = _deep_merge(out[k], v) else: out[k] = v return out def _merge_sources(base: Json, overrides: Json) -> Json: names = set(base) | set(overrides) result: Json = {} for n in names: if n in base and n in overrides: b, o = base[n], overrides[n] result[n] = ( _deep_merge(b, o) if isinstance(b, dict) and isinstance(o, dict) else o ) elif n in overrides: result[n] = overrides[n] else: result[n] = base[n] return result def merged_view(spec: Json, variant_name: Optional[str]) -> Tuple[Json, Json, Json]: """ Return (merged_variables, merged_sources, write_target). merged_variables / merged_sources: what to use for display and prefetching. write_target: the dict to mutate when saving changes (base spec or the variant sub-dict). """ base_vars: Json = spec.get("variables") or {} base_srcs: Json = spec.get("sources") or {} if variant_name: vdict = (spec.get("variants") or {}).get(variant_name) if not isinstance(vdict, dict): raise ValueError(f"Variant '{variant_name}' not found in spec") v_vars: Json = vdict.get("variables") or {} v_srcs: Json = vdict.get("sources") or {} merged_vars = {**base_vars, **v_vars} merged_srcs = _merge_sources(base_srcs, v_srcs) return merged_vars, merged_srcs, vdict return dict(base_vars), dict(base_srcs), spec # --------------------------------------------------------------------------- # Shell helpers # --------------------------------------------------------------------------- def _run(args: List[str], *, capture_stderr: bool = True) -> Tuple[int, str, str]: p = subprocess.run( args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE if capture_stderr else None, check=False, ) return p.returncode, (p.stdout or "").strip(), (p.stderr or "").strip() def _run_out(args: List[str]) -> Optional[str]: code, out, err = _run(args) if code != 0: eprint(f"Command failed: {' '.join(args)}\n{err}") return None return out # --------------------------------------------------------------------------- # HTTP helpers # --------------------------------------------------------------------------- def http_get_json(url: str, token: Optional[str] = None) -> Optional[Any]: try: req = urllib.request.Request( url, headers={"Accept": "application/vnd.github+json"} ) if token: req.add_header("Authorization", f"Bearer {token}") with urllib.request.urlopen(req, timeout=15) as resp: return json.loads(resp.read().decode("utf-8")) except urllib.error.HTTPError as e: eprint(f"HTTP {e.code} for {url}: {e.reason}") except Exception as e: eprint(f"Request failed for {url}: {e}") return None def http_get_text(url: str) -> Optional[str]: try: req = urllib.request.Request( url, headers={"User-Agent": "nix-version-manager/2.0"} ) with urllib.request.urlopen(req, timeout=15) as resp: return resp.read().decode("utf-8") except urllib.error.HTTPError as e: eprint(f"HTTP {e.code} for {url}: {e.reason}") except Exception as e: eprint(f"Request failed for {url}: {e}") return None # --------------------------------------------------------------------------- # GitHub API helpers # --------------------------------------------------------------------------- def gh_token() -> Optional[str]: return os.environ.get("GITHUB_TOKEN") def gh_latest_release(owner: str, repo: str) -> Optional[str]: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases/latest", gh_token() ) return data.get("tag_name") if isinstance(data, dict) else None def gh_latest_tag( owner: str, repo: str, *, tag_regex: Optional[str] = None ) -> Optional[str]: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", gh_token() ) if not isinstance(data, list): return None tags = [t["name"] for t in data if isinstance(t, dict) and t.get("name")] if tag_regex: rx = re.compile(tag_regex) tags = [t for t in tags if rx.search(t)] return tags[0] if tags else None def gh_list_tags(owner: str, repo: str) -> List[str]: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", gh_token() ) if not isinstance(data, list): return [] return [t["name"] for t in data if isinstance(t, dict) and t.get("name")] def gh_head_commit( owner: str, repo: str, branch: Optional[str] = None ) -> Optional[str]: ref = f"refs/heads/{branch}" if branch else "HEAD" out = _run_out(["git", "ls-remote", f"https://github.com/{owner}/{repo}.git", ref]) if not out: return None for line in out.splitlines(): parts = line.split() if parts: return parts[0] return None def gh_release_tags(owner: str, repo: str) -> List[str]: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", gh_token() ) if not isinstance(data, list): return [] return [r["tag_name"] for r in data if isinstance(r, dict) and r.get("tag_name")] def _iso_to_date(iso: str) -> str: return iso[:10] if iso and len(iso) >= 10 else "" def gh_ref_date(owner: str, repo: str, ref: str) -> str: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/commits/{urllib.parse.quote(ref, safe='')}", gh_token(), ) if not isinstance(data, dict): return "" iso = ( (data.get("commit") or {}).get("committer", {}).get("date") or (data.get("commit") or {}).get("author", {}).get("date") or "" ) return _iso_to_date(iso) def gh_release_date(owner: str, repo: str, tag: str) -> str: data = http_get_json( f"https://api.github.com/repos/{owner}/{repo}/releases/tags/{urllib.parse.quote(tag, safe='')}", gh_token(), ) if isinstance(data, dict): iso = data.get("published_at") or data.get("created_at") or "" if iso: return _iso_to_date(iso) return gh_ref_date(owner, repo, tag) def git_branch_commit(url: str, branch: Optional[str] = None) -> Optional[str]: ref = f"refs/heads/{branch}" if branch else "HEAD" out = _run_out(["git", "ls-remote", url, ref]) if not out: return None for line in out.splitlines(): parts = line.split() if parts: return parts[0] return None def git_commit_date_for_github(url: str, sha: str) -> str: """Only works for github.com URLs; returns YYYY-MM-DD or empty string.""" try: parsed = urllib.parse.urlparse(url) if parsed.hostname != "github.com": return "" parts = [p for p in parsed.path.split("/") if p] if len(parts) < 2: return "" owner = parts[0] repo = parts[1].removesuffix(".git") return gh_ref_date(owner, repo, sha) except Exception: return "" # --------------------------------------------------------------------------- # Nix prefetch helpers # --------------------------------------------------------------------------- def _nix_fakehash_build(expr: str) -> Optional[str]: """ Build a Nix expression that intentionally uses lib.fakeHash, parse the correct hash from the 'got:' line in the error output. """ p = subprocess.run( ["nix", "build", "--impure", "--expr", expr], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) m = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", p.stderr) if m: return m.group(1) eprint(f"nix fakeHash build failed:\n{p.stderr[-800:]}") return None def prefetch_github( owner: str, repo: str, rev: str, *, submodules: bool = False ) -> Optional[str]: """ Hash for fetchFromGitHub — NAR hash of unpacked tarball. Must use the fakeHash trick; nix store prefetch-file gives the wrong hash. """ sub = "true" if submodules else "false" expr = ( f"let pkgs = import {{}};\n" f"in pkgs.fetchFromGitHub {{\n" f' owner = "{owner}";\n' f' repo = "{repo}";\n' f' rev = "{rev}";\n' f" fetchSubmodules = {sub};\n" f" hash = pkgs.lib.fakeHash;\n" f"}}" ) return _nix_fakehash_build(expr) def prefetch_url(url: str) -> Optional[str]: """ Flat (non-unpacked) hash for fetchurl. Uses nix store prefetch-file; falls back to nix-prefetch-url. """ out = _run_out( ["nix", "store", "prefetch-file", "--hash-type", "sha256", "--json", url] ) if out: try: data = json.loads(out) if "hash" in data: return data["hash"] except Exception: pass out = _run_out(["nix-prefetch-url", "--type", "sha256", url]) if out is None: out = _run_out(["nix-prefetch-url", url]) if out is None: return None return _run_out(["nix", "hash", "to-sri", "--type", "sha256", out]) def prefetch_fetchzip(url: str, *, strip_root: bool = True) -> Optional[str]: """Hash for fetchzip — NAR of unpacked archive. Must use the fakeHash trick.""" expr = ( f"let pkgs = import {{}};\n" f"in pkgs.fetchzip {{\n" f' url = "{url}";\n' f" stripRoot = {'true' if strip_root else 'false'};\n" f" hash = pkgs.lib.fakeHash;\n" f"}}" ) return _nix_fakehash_build(expr) def prefetch_git(url: str, rev: str) -> Optional[str]: """Hash for fetchgit.""" out = _run_out(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url]) if out is not None: base32 = None try: data = json.loads(out) base32 = data.get("sha256") or data.get("hash") except Exception: lines = [l for l in out.splitlines() if l.strip()] if lines: base32 = lines[-1].strip() if base32: return _run_out(["nix", "hash", "to-sri", "--type", "sha256", base32]) # Fallback: builtins.fetchGit + nix hash path (commit SHA only) if re.match(r"^[0-9a-f]{40}$", rev): expr = f'builtins.fetchGit {{ url = "{url}"; rev = "{rev}"; }}' store_path = _run_out(["nix", "eval", "--raw", "--expr", expr]) if store_path: return _run_out(["nix", "hash", "path", "--type", "sha256", store_path]) return None def prefetch_cargo_vendor( fetcher: str, src_hash: str, *, url: str = "", owner: str = "", repo: str = "", rev: str = "", subdir: str = "", ) -> Optional[str]: """Compute the cargo vendor hash via fetchCargoVendor + fakeHash.""" if fetcher == "github" and owner and repo and rev and src_hash: src_expr = ( f'pkgs.fetchFromGitHub {{ owner = "{owner}"; repo = "{repo}";' f' rev = "{rev}"; hash = "{src_hash}"; }}' ) elif fetcher == "git" and url and rev and src_hash: parsed = urllib.parse.urlparse(url) parts = [p for p in parsed.path.split("/") if p] if parsed.hostname == "github.com" and len(parts) >= 2: gh_owner, gh_repo = parts[0], parts[1] src_expr = ( f'pkgs.fetchFromGitHub {{ owner = "{gh_owner}"; repo = "{gh_repo}";' f' rev = "{rev}"; hash = "{src_hash}"; }}' ) else: src_expr = f'pkgs.fetchgit {{ url = "{url}"; rev = "{rev}"; hash = "{src_hash}"; }}' else: return None subdir_attr = f'sourceRoot = "${{src.name}}/{subdir}";' if subdir else "" expr = ( f"let pkgs = import {{}};\n" f" src = {src_expr};\n" f"in pkgs.rustPlatform.fetchCargoVendor {{\n" f" inherit src;\n" f" {subdir_attr}\n" f" hash = pkgs.lib.fakeHash;\n" f"}}" ) p = subprocess.run( ["nix", "build", "--impure", "--expr", expr], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) m = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", p.stderr) if m: return m.group(1) eprint(f"cargo vendor prefetch failed:\n{p.stderr[-600:]}") return None # --------------------------------------------------------------------------- # Source prefetch dispatch # --------------------------------------------------------------------------- def prefetch_source(comp: Json, merged_vars: Json) -> Optional[str]: """ Compute and return the SRI hash for a source component using the correct Nix fetcher. Returns None on failure. """ fetcher = comp.get("fetcher", "none") rendered = render(comp, merged_vars) if fetcher == "github": owner = comp.get("owner") or "" repo = comp.get("repo") or "" ref = rendered.get("tag") or rendered.get("rev") or "" submodules = bool(comp.get("submodules", False)) if owner and repo and ref: return prefetch_github(owner, repo, ref, submodules=submodules) elif fetcher == "git": url = comp.get("url") or "" rev = rendered.get("rev") or rendered.get("tag") or "" if url and rev: return prefetch_git(url, rev) elif fetcher == "url": url = rendered.get("url") or rendered.get("urlTemplate") or "" if url: extra = comp.get("extra") or {} if extra.get("unpack") == "zip": return prefetch_fetchzip(url, strip_root=extra.get("stripRoot", True)) return prefetch_url(url) return None # --------------------------------------------------------------------------- # Candidate fetching (what versions are available upstream) # --------------------------------------------------------------------------- class Candidates: """Latest available refs for a source component.""" __slots__ = ("release", "release_date", "tag", "tag_date", "commit", "commit_date") def __init__(self) -> None: self.release = self.release_date = "" self.tag = self.tag_date = "" self.commit = self.commit_date = "" def fetch_candidates(comp: Json, merged_vars: Json) -> Candidates: """ Fetch the latest release, tag, and commit for a source component. For 'url' fetcher with github variables, fetches the latest release tag. """ c = Candidates() fetcher = comp.get("fetcher", "none") branch: Optional[str] = comp.get("branch") or None if fetcher == "github": owner = comp.get("owner") or "" repo = comp.get("repo") or "" if not (owner and repo): return c if not branch: r = gh_latest_release(owner, repo) if r: c.release = r c.release_date = gh_release_date(owner, repo, r) t = gh_latest_tag(owner, repo) if t: c.tag = t c.tag_date = gh_ref_date(owner, repo, t) m = gh_head_commit(owner, repo, branch) if m: c.commit = m c.commit_date = gh_ref_date(owner, repo, m) elif fetcher == "git": url = comp.get("url") or "" if url: m = git_branch_commit(url, branch) if m: c.commit = m c.commit_date = git_commit_date_for_github(url, m) elif fetcher == "url": url_info = _url_source_info(comp, merged_vars) kind = url_info.get("kind") if kind == "github": owner = url_info["owner"] repo = url_info["repo"] tags = gh_release_tags(owner, repo) prefix = str(merged_vars.get("releasePrefix") or "") suffix = str(merged_vars.get("releaseSuffix") or "") if prefix or suffix: latest = next( (t for t in tags if t.startswith(prefix) and t.endswith(suffix)), None, ) else: latest = tags[0] if tags else None if latest: c.release = latest c.release_date = gh_release_date(owner, repo, latest) elif kind == "pypi": name = url_info["name"] latest = pypi_latest_version(name) if latest: c.release = latest elif kind == "openvsx": publisher = url_info["publisher"] ext_name = url_info["ext_name"] latest = openvsx_latest_version(publisher, ext_name) if latest: c.release = latest return c # --------------------------------------------------------------------------- # Non-git upstream version helpers # --------------------------------------------------------------------------- def pypi_latest_version(name: str) -> Optional[str]: """Return the latest stable release version from PyPI.""" data = http_get_json(f"https://pypi.org/pypi/{urllib.parse.quote(name)}/json") if not isinstance(data, dict): return None return (data.get("info") or {}).get("version") or None def pypi_hash(name: str, version: str) -> Optional[str]: """ Return the SRI hash for a PyPI sdist or wheel using nix-prefetch. Falls back to a fake-hash Nix build if nix-prefetch-url is unavailable. """ data = http_get_json( f"https://pypi.org/pypi/{urllib.parse.quote(name)}/{urllib.parse.quote(version)}/json" ) if not isinstance(data, dict): return None urls = data.get("urls") or [] # Prefer sdist; fall back to any wheel sdist_url = next((u["url"] for u in urls if u.get("packagetype") == "sdist"), None) wheel_url = next( (u["url"] for u in urls if u.get("packagetype") == "bdist_wheel"), None ) url = sdist_url or wheel_url if not url: return None return prefetch_url(url) def openvsx_latest_version(publisher: str, ext_name: str) -> Optional[str]: """Return the latest version of an extension from Open VSX Registry.""" data = http_get_json( f"https://open-vsx.org/api/{urllib.parse.quote(publisher)}/{urllib.parse.quote(ext_name)}" ) if not isinstance(data, dict): return None return data.get("version") or None def _url_source_info(comp: Json, merged_vars: Json) -> Json: """ Classify a url-fetcher source and extract the relevant identifiers. Returns a dict with at least 'kind' in: 'github' — GitHub release asset; includes 'owner', 'repo' 'pypi' — PyPI package; includes 'name', 'version_var' 'openvsx' — Open VSX extension; includes 'publisher', 'ext_name', 'version_var' 'plain' — plain URL with a version variable; includes 'version_var' if found 'static' — hardcoded URL with no variable parts """ tmpl = comp.get("urlTemplate") or comp.get("url") or "" # Check merged_vars for explicit github owner/repo owner = str(merged_vars.get("owner") or "") repo = str(merged_vars.get("repo") or "") if owner and repo: return {"kind": "github", "owner": owner, "repo": repo} # Detect from URL template gh_m = re.search(r"github\.com/([^/\$]+)/([^/\$]+)/releases/download", tmpl) if gh_m: vvar = _find_version_var(tmpl, merged_vars) return { "kind": "github", "owner": gh_m.group(1), "repo": gh_m.group(2), "version_var": vvar, } # Open VSX (open-vsx.org/api/${publisher}/${name}/${version}/...) vsx_m = re.search( r"open-vsx\.org/api/([^/\$]+)/([^/\$]+)/(?:\$\{[^}]+\}|[^/]+)/file", tmpl ) if not vsx_m: # Also match when publisher/name come from variables if "open-vsx.org/api/" in tmpl: publisher = str(merged_vars.get("publisher") or "") ext_name = str(merged_vars.get("name") or "") if publisher and ext_name: vvar = _find_version_var(tmpl, merged_vars) return { "kind": "openvsx", "publisher": publisher, "ext_name": ext_name, "version_var": vvar, } if vsx_m: publisher = vsx_m.group(1) ext_name = vsx_m.group(2) # publisher/ext_name may be literal or variable refs publisher = str(merged_vars.get(publisher.lstrip("${").rstrip("}"), publisher)) ext_name = str(merged_vars.get(ext_name.lstrip("${").rstrip("}"), ext_name)) vvar = _find_version_var(tmpl, merged_vars) return { "kind": "openvsx", "publisher": publisher, "ext_name": ext_name, "version_var": vvar, } # PyPI: files.pythonhosted.org URLs if "files.pythonhosted.org" in tmpl or "pypi.org" in tmpl: pypi_name = str(merged_vars.get("name") or "") if not pypi_name: m = re.search(r"/packages/[^/]+/[^/]+/([^/]+)-\d", tmpl) pypi_name = m.group(1).replace("_", "-") if m else "" vvar = _find_version_var(tmpl, merged_vars) return {"kind": "pypi", "name": pypi_name, "version_var": vvar} vvar = _find_version_var(tmpl, merged_vars) if vvar: return {"kind": "plain", "version_var": vvar} return {"kind": "static"} def _find_version_var(tmpl: str, merged_vars: Json) -> str: """ Return the name of the variable in merged_vars that looks most like a version string and appears in the template, or '' if none found. Prefers keys named 'version', then anything whose value looks like a semver/calver string. """ candidates = [k for k in merged_vars if f"${{{k}}}" in tmpl] if "version" in candidates: return "version" # Pick the one whose value most resembles a version ver_re = re.compile(r"^\d+[\.\-]\d") for k in candidates: if ver_re.match(str(merged_vars.get(k, ""))): return k return candidates[0] if candidates else "" def apply_version_update( comp: Json, merged_vars: Json, target_dict: Json, new_version: str, version_var: str = "version", ) -> None: """ Write `new_version` into the correct location in `target_dict`. For url sources the version lives in `variables.`. For pypi sources it also lives in `variables.version` (the name is fixed). Clears any URL-path hash so it gets re-prefetched. """ # Update the variable vs = target_dict.setdefault("variables", {}) vs[version_var] = new_version # Clear the old hash on the source so it must be re-prefetched src_name = None for k, v in (target_dict.get("sources") or {}).items(): if isinstance(v, dict) and "hash" in v: src_name = k break # If no source entry yet, or the hash is on the base spec, clear it there too if src_name: target_dict["sources"][src_name].pop("hash", None) else: # Hash might be at base level (non-variant path) for k, v in (comp if isinstance(comp, dict) else {}).items(): pass # read-only; we write through target_dict only # --------------------------------------------------------------------------- # Package discovery # --------------------------------------------------------------------------- def find_packages() -> List[Tuple[str, Path]]: """ Scan packages/ for version.json files. Returns sorted list of (display_name, path) tuples. """ results: List[Tuple[str, Path]] = [] for p in PKGS_DIR.rglob("version.json"): rel = p.relative_to(PKGS_DIR).parent results.append((str(rel), p)) results.sort() return results # --------------------------------------------------------------------------- # Source display helper # --------------------------------------------------------------------------- def source_ref_label(comp: Json, merged_vars: Json) -> str: """Return a short human-readable reference string for a source.""" fetcher = comp.get("fetcher", "none") rendered = render(comp, merged_vars) if fetcher == "github": tag = rendered.get("tag") or "" rev = rendered.get("rev") or "" owner = rendered.get("owner") or str(merged_vars.get("owner") or "") repo = rendered.get("repo") or str(merged_vars.get("repo") or "") if tag and owner and repo: return f"{owner}/{repo}@{tag}" if tag: return tag if rev and owner and repo: return f"{owner}/{repo}@{rev[:7]}" if rev: return rev[:12] return "" if fetcher == "git": ref = rendered.get("tag") or rendered.get("rev") or comp.get("version") or "" if len(ref) == 40 and all(c in "0123456789abcdef" for c in ref): return ref[:12] return ref if fetcher == "url": url = rendered.get("url") or rendered.get("urlTemplate") or "" if not url: return "" if "${" in url: tmpl = comp.get("urlTemplate") or comp.get("url") or url filename = os.path.basename(urllib.parse.urlparse(tmpl).path) return re.sub(r"\$\{([^}]+)\}", r"<\1>", filename) filename = os.path.basename(urllib.parse.urlparse(url).path) owner = str(merged_vars.get("owner") or "") repo = str(merged_vars.get("repo") or "") rp = str(merged_vars.get("releasePrefix") or "") rs = str(merged_vars.get("releaseSuffix") or "") base = str(merged_vars.get("base") or "") rel = str(merged_vars.get("release") or "") tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else "" if owner and repo and tag and filename: return f"{owner}/{repo}@{tag} · {filename}" return filename or url return str(comp.get("version") or comp.get("tag") or comp.get("rev") or "") # --------------------------------------------------------------------------- # Deep set helper # --------------------------------------------------------------------------- def deep_set(obj: Json, path: List[str], value: Any) -> None: cur = obj for key in path[:-1]: if key not in cur or not isinstance(cur[key], dict): cur[key] = {} cur = cur[key] cur[path[-1]] = value