Files
nix-config/scripts/lib.py
mjallen18 70002a19e2 hmm
2026-04-07 18:39:42 -05:00

858 lines
28 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Shared library for version.json management.
Provides:
- JSON load/save
- Variable template rendering
- Base+variant merge (mirrors lib/versioning/default.nix)
- GitHub/Git candidate fetching
- Nix hash prefetching (fetchFromGitHub, fetchgit, fetchurl, fetchzip, cargo vendor)
- Package scanning
"""
from __future__ import annotations
import json
import os
import re
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
Json = Dict[str, Any]
ROOT = Path(__file__).resolve().parents[1]
PKGS_DIR = ROOT / "packages"
# ---------------------------------------------------------------------------
# I/O
# ---------------------------------------------------------------------------
def load_json(path: Path) -> Json:
    """Read *path* as UTF-8 and return the parsed JSON object."""
    return json.loads(path.read_text(encoding="utf-8"))
def save_json(path: Path, data: Json) -> None:
    """Atomically write *data* to *path* as pretty-printed UTF-8 JSON.

    The payload is written to a sibling temp file first, then renamed over
    *path*, so a reader never observes a half-written file.
    """
    # Append ".tmp" to the full filename instead of swapping the suffix:
    # with_suffix(".tmp") maps both "a.json" and "a.yaml" to the same
    # "a.tmp", so concurrent saves of sibling files could clobber each
    # other's temp file (and it raises ValueError on suffix-less paths).
    tmp = path.with_name(path.name + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
        f.write("\n")  # trailing newline keeps diffs/tools happy
    tmp.replace(path)  # atomic on POSIX when on the same filesystem
def eprint(*args: Any, **kwargs: Any) -> None:
    """Like print(), but targets stderr so it never pollutes piped stdout."""
    print(*args, file=sys.stderr, **kwargs)
# ---------------------------------------------------------------------------
# Template rendering
# ---------------------------------------------------------------------------
def render(value: Any, variables: Dict[str, Any]) -> Any:
    """Recursively substitute ${var} in strings using the given variable map."""
    if isinstance(value, dict):
        return {key: render(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [render(item, variables) for item in value]
    if not isinstance(value, str):
        return value

    def _expand(match: "re.Match[str]") -> str:
        # Unknown variables are left untouched (the full ${...} text).
        return str(variables.get(match.group(1), match.group(0)))

    return re.sub(r"\$\{([^}]+)\}", _expand, value)
# ---------------------------------------------------------------------------
# Merge (matches lib/versioning/default.nix)
# ---------------------------------------------------------------------------
def _deep_merge(a: Json, b: Json) -> Json:
    """Recursively merge mapping *b* over *a*; *b* wins on non-dict conflicts."""
    merged = dict(a)
    for key, val in b.items():
        if isinstance(merged.get(key), dict) and isinstance(val, dict):
            merged[key] = _deep_merge(merged[key], val)
        else:
            merged[key] = val
    return merged
def _merge_sources(base: Json, overrides: Json) -> Json:
    """Merge per-source entries: deep-merge where both sides hold a dict,
    otherwise the override replaces the base value."""
    result: Json = dict(base)
    for name, override in overrides.items():
        current = result.get(name)
        if isinstance(current, dict) and isinstance(override, dict):
            result[name] = _deep_merge(current, override)
        else:
            result[name] = override
    return result
def merged_view(spec: Json, variant_name: Optional[str]) -> Tuple[Json, Json, Json]:
    """
    Return (merged_variables, merged_sources, write_target).
    merged_variables / merged_sources: what to use for display and prefetching.
    write_target: the dict to mutate when saving changes (base spec or the
    variant sub-dict).
    """
    base_vars: Json = spec.get("variables") or {}
    base_srcs: Json = spec.get("sources") or {}
    if not variant_name:
        # No variant selected: shallow copies of the base, writing to spec.
        return dict(base_vars), dict(base_srcs), spec
    vdict = (spec.get("variants") or {}).get(variant_name)
    if not isinstance(vdict, dict):
        raise ValueError(f"Variant '{variant_name}' not found in spec")
    merged_vars = {**base_vars, **(vdict.get("variables") or {})}
    merged_srcs = _merge_sources(base_srcs, vdict.get("sources") or {})
    return merged_vars, merged_srcs, vdict
# ---------------------------------------------------------------------------
# Shell helpers
# ---------------------------------------------------------------------------
def _run(args: List[str], *, capture_stderr: bool = True) -> Tuple[int, str, str]:
    """Run *args* without a shell; return (returncode, stdout, stderr) stripped."""
    stderr_dest = subprocess.PIPE if capture_stderr else None
    proc = subprocess.run(
        args,
        text=True,
        stdout=subprocess.PIPE,
        stderr=stderr_dest,
        check=False,
    )
    out = (proc.stdout or "").strip()
    err = (proc.stderr or "").strip()
    return proc.returncode, out, err
def _run_out(args: List[str]) -> Optional[str]:
    """Return the stdout of *args*, or None (after logging) on failure."""
    code, out, err = _run(args)
    if code == 0:
        return out
    eprint(f"Command failed: {' '.join(args)}\n{err}")
    return None
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def http_get_json(url: str, token: Optional[str] = None) -> Optional[Any]:
    """GET *url* and decode the body as JSON; None on any failure.

    Sends the GitHub JSON Accept header; *token*, if given, becomes a
    Bearer Authorization header.
    """
    try:
        req = urllib.request.Request(
            url, headers={"Accept": "application/vnd.github+json"}
        )
        if token:
            req.add_header("Authorization", f"Bearer {token}")
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        eprint(f"HTTP {e.code} for {url}: {e.reason}")
    except Exception as e:
        eprint(f"Request failed for {url}: {e}")
    return None
def http_get_text(url: str) -> Optional[str]:
    """GET *url* and return the response body as UTF-8 text; None on failure."""
    headers = {"User-Agent": "nix-version-manager/2.0"}
    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=15) as resp:
            return resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        eprint(f"HTTP {e.code} for {url}: {e.reason}")
    except Exception as e:
        eprint(f"Request failed for {url}: {e}")
    return None
# ---------------------------------------------------------------------------
# GitHub API helpers
# ---------------------------------------------------------------------------
def gh_token() -> Optional[str]:
    """GitHub API token from the GITHUB_TOKEN env var, or None if unset."""
    return os.environ.get("GITHUB_TOKEN")
def gh_latest_release(owner: str, repo: str) -> Optional[str]:
    """Tag name of the repo's latest GitHub release, or None on failure."""
    data = http_get_json(
        f"https://api.github.com/repos/{owner}/{repo}/releases/latest", gh_token()
    )
    if isinstance(data, dict):
        return data.get("tag_name")
    return None
def gh_latest_tag(
    owner: str, repo: str, *, tag_regex: Optional[str] = None
) -> Optional[str]:
    """First tag of the repo (optionally filtered by *tag_regex*), or None.

    Delegates the fetch/extract to gh_list_tags instead of duplicating it.
    NOTE(review): this relies on the tags API listing order — the first
    entry is not guaranteed to be the highest semver; confirm for repos
    with non-chronological tagging.
    """
    tags = gh_list_tags(owner, repo)
    if tag_regex:
        rx = re.compile(tag_regex)
        tags = [t for t in tags if rx.search(t)]
    return tags[0] if tags else None
def gh_list_tags(owner: str, repo: str) -> List[str]:
    """Tag names for a GitHub repo (first 100); empty list on failure."""
    data = http_get_json(
        f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", gh_token()
    )
    if isinstance(data, list):
        return [t["name"] for t in data if isinstance(t, dict) and t.get("name")]
    return []
def gh_head_commit(
    owner: str, repo: str, branch: Optional[str] = None
) -> Optional[str]:
    """SHA at the tip of *branch* (or HEAD) of a GitHub repo, or None.

    Thin wrapper over git_branch_commit — the original duplicated its
    ls-remote parsing verbatim; delegating keeps the two in sync.
    """
    return git_branch_commit(f"https://github.com/{owner}/{repo}.git", branch)
def gh_release_tags(owner: str, repo: str) -> List[str]:
    """Tag names of the repo's most recent 50 releases; empty list on failure."""
    data = http_get_json(
        f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=50", gh_token()
    )
    if not isinstance(data, list):
        return []
    return [
        r["tag_name"] for r in data if isinstance(r, dict) and r.get("tag_name")
    ]
def _iso_to_date(iso: str) -> str:
    """Truncate an ISO-8601 timestamp to YYYY-MM-DD; '' if too short/empty."""
    if iso and len(iso) >= 10:
        return iso[:10]
    return ""
def gh_ref_date(owner: str, repo: str, ref: str) -> str:
    """Commit date (YYYY-MM-DD) for any ref via the commits API; '' on failure."""
    quoted = urllib.parse.quote(ref, safe="")
    data = http_get_json(
        f"https://api.github.com/repos/{owner}/{repo}/commits/{quoted}",
        gh_token(),
    )
    if not isinstance(data, dict):
        return ""
    commit = data.get("commit") or {}
    # Prefer the committer date, fall back to the author date.
    iso = (
        commit.get("committer", {}).get("date")
        or commit.get("author", {}).get("date")
        or ""
    )
    return _iso_to_date(iso)
def gh_release_date(owner: str, repo: str, tag: str) -> str:
    """Publish date (YYYY-MM-DD) of a release; falls back to the tag's commit date."""
    quoted = urllib.parse.quote(tag, safe="")
    data = http_get_json(
        f"https://api.github.com/repos/{owner}/{repo}/releases/tags/{quoted}",
        gh_token(),
    )
    if isinstance(data, dict):
        iso = data.get("published_at") or data.get("created_at") or ""
        if iso:
            return _iso_to_date(iso)
    # No release object (or no dates on it): use the commit date instead.
    return gh_ref_date(owner, repo, tag)
def git_branch_commit(url: str, branch: Optional[str] = None) -> Optional[str]:
    """Tip SHA of *branch* (or HEAD) for an arbitrary git remote; None on failure."""
    ref = f"refs/heads/{branch}" if branch else "HEAD"
    listing = _run_out(["git", "ls-remote", url, ref])
    if not listing:
        return None
    # ls-remote prints "<sha>\t<ref>"; take the SHA from the first usable line.
    for fields in (line.split() for line in listing.splitlines()):
        if fields:
            return fields[0]
    return None
def git_commit_date_for_github(url: str, sha: str) -> str:
    """Only works for github.com URLs; returns YYYY-MM-DD or empty string."""
    try:
        parsed = urllib.parse.urlparse(url)
        if parsed.hostname != "github.com":
            return ""
        segments = [s for s in parsed.path.split("/") if s]
        if len(segments) < 2:
            return ""
        owner, repo = segments[0], segments[1].removesuffix(".git")
        return gh_ref_date(owner, repo, sha)
    except Exception:
        # Best-effort helper: any parse/lookup failure yields "".
        return ""
# ---------------------------------------------------------------------------
# Nix prefetch helpers
# ---------------------------------------------------------------------------
def _nix_fakehash_build(expr: str) -> Optional[str]:
    """
    Build a Nix expression that intentionally uses lib.fakeHash, parse the
    correct hash from the 'got:' line in the error output.
    """
    proc = subprocess.run(
        ["nix", "build", "--impure", "--expr", expr],
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    match = re.search(r"got:\s+(sha256-[A-Za-z0-9+/=]+)", proc.stderr)
    if match:
        return match.group(1)
    eprint(f"nix fakeHash build failed:\n{proc.stderr[-800:]}")
    return None
def prefetch_github(
    owner: str, repo: str, rev: str, *, submodules: bool = False
) -> Optional[str]:
    """
    Hash for fetchFromGitHub — NAR hash of unpacked tarball.
    Must use the fakeHash trick; nix store prefetch-file gives the wrong hash.
    """
    sub = "true" if submodules else "false"
    lines = [
        "let pkgs = import <nixpkgs> {};",
        "in pkgs.fetchFromGitHub {",
        f' owner = "{owner}";',
        f' repo = "{repo}";',
        f' rev = "{rev}";',
        f" fetchSubmodules = {sub};",
        " hash = pkgs.lib.fakeHash;",
        "}",
    ]
    return _nix_fakehash_build("\n".join(lines))
def prefetch_url(url: str) -> Optional[str]:
    """
    Flat (non-unpacked) hash for fetchurl.
    Uses nix store prefetch-file; falls back to nix-prefetch-url.
    """
    prefetched = _run_out(
        ["nix", "store", "prefetch-file", "--hash-type", "sha256", "--json", url]
    )
    if prefetched:
        try:
            info = json.loads(prefetched)
            if "hash" in info:
                return info["hash"]
        except Exception:
            pass  # unparsable JSON: fall through to the legacy tool
    base32 = _run_out(["nix-prefetch-url", "--type", "sha256", url])
    if base32 is None:
        base32 = _run_out(["nix-prefetch-url", url])
    if base32 is None:
        return None
    # Legacy tools emit base32; convert to SRI form for version.json.
    return _run_out(["nix", "hash", "to-sri", "--type", "sha256", base32])
def prefetch_fetchzip(url: str, *, strip_root: bool = True) -> Optional[str]:
    """Hash for fetchzip — NAR of unpacked archive. Must use the fakeHash trick."""
    lines = [
        "let pkgs = import <nixpkgs> {};",
        "in pkgs.fetchzip {",
        f' url = "{url}";',
        f" stripRoot = {'true' if strip_root else 'false'};",
        " hash = pkgs.lib.fakeHash;",
        "}",
    ]
    return _nix_fakehash_build("\n".join(lines))
def prefetch_git(url: str, rev: str) -> Optional[str]:
    """Hash for fetchgit."""
    out = _run_out(["nix-prefetch-git", "--no-deepClone", "--rev", rev, url])
    if out is not None:
        base32 = None
        try:
            # Normal case: JSON output with a sha256/hash field.
            info = json.loads(out)
            base32 = info.get("sha256") or info.get("hash")
        except Exception:
            # Non-JSON output: take the last non-blank line as the hash.
            nonblank = [ln for ln in out.splitlines() if ln.strip()]
            if nonblank:
                base32 = nonblank[-1].strip()
        if base32:
            return _run_out(["nix", "hash", "to-sri", "--type", "sha256", base32])
    # Fallback: builtins.fetchGit + nix hash path (commit SHA only)
    if re.match(r"^[0-9a-f]{40}$", rev):
        expr = f'builtins.fetchGit {{ url = "{url}"; rev = "{rev}"; }}'
        store_path = _run_out(["nix", "eval", "--raw", "--expr", expr])
        if store_path:
            return _run_out(["nix", "hash", "path", "--type", "sha256", store_path])
    return None
def prefetch_cargo_vendor(
    fetcher: str,
    src_hash: str,
    *,
    url: str = "",
    owner: str = "",
    repo: str = "",
    rev: str = "",
    subdir: str = "",
) -> Optional[str]:
    """Compute the cargo vendor hash via fetchCargoVendor + fakeHash.

    Builds a src expression matching the package's fetcher ('github' or
    'git'; git URLs that point at github.com are routed through
    fetchFromGitHub), then derives the vendor hash with the shared
    fakeHash trick. Returns None when required identifiers are missing
    or the build produced no 'got:' hash line.
    """
    if fetcher == "github" and owner and repo and rev and src_hash:
        src_expr = (
            f'pkgs.fetchFromGitHub {{ owner = "{owner}"; repo = "{repo}";'
            f' rev = "{rev}"; hash = "{src_hash}"; }}'
        )
    elif fetcher == "git" and url and rev and src_hash:
        parsed = urllib.parse.urlparse(url)
        parts = [p for p in parsed.path.split("/") if p]
        if parsed.hostname == "github.com" and len(parts) >= 2:
            gh_owner, gh_repo = parts[0], parts[1]
            src_expr = (
                f'pkgs.fetchFromGitHub {{ owner = "{gh_owner}"; repo = "{gh_repo}";'
                f' rev = "{rev}"; hash = "{src_hash}"; }}'
            )
        else:
            src_expr = f'pkgs.fetchgit {{ url = "{url}"; rev = "{rev}"; hash = "{src_hash}"; }}'
    else:
        # Unsupported fetcher or missing identifiers: nothing to prefetch.
        return None
    subdir_attr = f'sourceRoot = "${{src.name}}/{subdir}";' if subdir else ""
    expr = (
        f"let pkgs = import <nixpkgs> {{}};\n"
        f" src = {src_expr};\n"
        f"in pkgs.rustPlatform.fetchCargoVendor {{\n"
        f" inherit src;\n"
        f" {subdir_attr}\n"
        f" hash = pkgs.lib.fakeHash;\n"
        f"}}"
    )
    # Reuse the shared fakeHash build/parse helper instead of duplicating
    # the subprocess call + regex parsing inline (errors are now reported
    # the same way as the other fakeHash-based prefetchers).
    return _nix_fakehash_build(expr)
# ---------------------------------------------------------------------------
# Source prefetch dispatch
# ---------------------------------------------------------------------------
def prefetch_source(comp: Json, merged_vars: Json) -> Optional[str]:
    """
    Compute and return the SRI hash for a source component using the correct
    Nix fetcher. Returns None on failure.
    """
    fetcher = comp.get("fetcher", "none")
    rendered = render(comp, merged_vars)
    if fetcher == "github":
        owner = comp.get("owner") or ""
        repo = comp.get("repo") or ""
        ref = rendered.get("tag") or rendered.get("rev") or ""
        if not (owner and repo and ref):
            return None
        return prefetch_github(
            owner, repo, ref, submodules=bool(comp.get("submodules", False))
        )
    if fetcher == "git":
        url = comp.get("url") or ""
        rev = rendered.get("rev") or rendered.get("tag") or ""
        return prefetch_git(url, rev) if url and rev else None
    if fetcher == "url":
        url = rendered.get("url") or rendered.get("urlTemplate") or ""
        if not url:
            return None
        extra = comp.get("extra") or {}
        if extra.get("unpack") == "zip":
            # Zip assets are unpacked by fetchzip, which needs a NAR hash.
            return prefetch_fetchzip(url, strip_root=extra.get("stripRoot", True))
        return prefetch_url(url)
    return None
# ---------------------------------------------------------------------------
# Candidate fetching (what versions are available upstream)
# ---------------------------------------------------------------------------
class Candidates:
    """Latest available refs for a source component."""

    __slots__ = ("release", "release_date", "tag", "tag_date", "commit", "commit_date")

    def __init__(self) -> None:
        # All fields start empty; fetch_candidates fills in what it discovers.
        for slot in self.__slots__:
            setattr(self, slot, "")
def fetch_candidates(comp: Json, merged_vars: Json) -> Candidates:
    """
    Fetch the latest release, tag, and commit for a source component.
    For 'url' fetcher with github variables, fetches the latest release tag.
    """
    cand = Candidates()
    fetcher = comp.get("fetcher", "none")
    branch: Optional[str] = comp.get("branch") or None
    if fetcher == "github":
        owner = comp.get("owner") or ""
        repo = comp.get("repo") or ""
        if not (owner and repo):
            return cand
        if not branch:
            # No branch pin: also surface the newest release and tag.
            release = gh_latest_release(owner, repo)
            if release:
                cand.release = release
                cand.release_date = gh_release_date(owner, repo, release)
            tag = gh_latest_tag(owner, repo)
            if tag:
                cand.tag = tag
                cand.tag_date = gh_ref_date(owner, repo, tag)
        head = gh_head_commit(owner, repo, branch)
        if head:
            cand.commit = head
            cand.commit_date = gh_ref_date(owner, repo, head)
        return cand
    if fetcher == "git":
        url = comp.get("url") or ""
        if url:
            head = git_branch_commit(url, branch)
            if head:
                cand.commit = head
                cand.commit_date = git_commit_date_for_github(url, head)
        return cand
    if fetcher == "url":
        info = _url_source_info(comp, merged_vars)
        kind = info.get("kind")
        if kind == "github":
            owner, repo = info["owner"], info["repo"]
            tags = gh_release_tags(owner, repo)
            prefix = str(merged_vars.get("releasePrefix") or "")
            suffix = str(merged_vars.get("releaseSuffix") or "")
            if prefix or suffix:
                # Constrain to release tags matching the configured affixes.
                latest = next(
                    (t for t in tags if t.startswith(prefix) and t.endswith(suffix)),
                    None,
                )
            else:
                latest = tags[0] if tags else None
            if latest:
                cand.release = latest
                cand.release_date = gh_release_date(owner, repo, latest)
        elif kind == "pypi":
            latest = pypi_latest_version(info["name"])
            if latest:
                cand.release = latest
        elif kind == "openvsx":
            latest = openvsx_latest_version(info["publisher"], info["ext_name"])
            if latest:
                cand.release = latest
    return cand
# ---------------------------------------------------------------------------
# Non-git upstream version helpers
# ---------------------------------------------------------------------------
def pypi_latest_version(name: str) -> Optional[str]:
    """Return the latest stable release version from PyPI."""
    data = http_get_json(f"https://pypi.org/pypi/{urllib.parse.quote(name)}/json")
    if isinstance(data, dict):
        return (data.get("info") or {}).get("version") or None
    return None
def pypi_hash(name: str, version: str) -> Optional[str]:
    """
    Return the SRI hash for a PyPI sdist or wheel using nix-prefetch.
    Falls back to a fake-hash Nix build if nix-prefetch-url is unavailable.
    """
    data = http_get_json(
        f"https://pypi.org/pypi/{urllib.parse.quote(name)}/{urllib.parse.quote(version)}/json"
    )
    if not isinstance(data, dict):
        return None
    files = data.get("urls") or []

    def _first_of(pkg_type: str) -> Optional[str]:
        # Download URL of the first file with the given packagetype, if any.
        return next(
            (f["url"] for f in files if f.get("packagetype") == pkg_type), None
        )

    # Prefer sdist; fall back to any wheel
    target = _first_of("sdist") or _first_of("bdist_wheel")
    return prefetch_url(target) if target else None
def openvsx_latest_version(publisher: str, ext_name: str) -> Optional[str]:
    """Return the latest version of an extension from Open VSX Registry."""
    data = http_get_json(
        f"https://open-vsx.org/api/{urllib.parse.quote(publisher)}/{urllib.parse.quote(ext_name)}"
    )
    if isinstance(data, dict):
        return data.get("version") or None
    return None
def _url_source_info(comp: Json, merged_vars: Json) -> Json:
    """
    Classify a url-fetcher source and extract the relevant identifiers.
    Returns a dict with at least 'kind' in:
    'github' — GitHub release asset; includes 'owner', 'repo'
    'pypi' — PyPI package; includes 'name', 'version_var'
    'openvsx' — Open VSX extension; includes 'publisher', 'ext_name', 'version_var'
    'plain' — plain URL with a version variable; includes 'version_var' if found
    'static' — hardcoded URL with no variable parts
    """
    tmpl = comp.get("urlTemplate") or comp.get("url") or ""
    # Check merged_vars for explicit github owner/repo
    owner = str(merged_vars.get("owner") or "")
    repo = str(merged_vars.get("repo") or "")
    if owner and repo:
        return {"kind": "github", "owner": owner, "repo": repo}
    # Detect from URL template
    gh_m = re.search(r"github\.com/([^/\$]+)/([^/\$]+)/releases/download", tmpl)
    if gh_m:
        return {
            "kind": "github",
            "owner": gh_m.group(1),
            "repo": gh_m.group(2),
            "version_var": _find_version_var(tmpl, merged_vars),
        }
    # Open VSX (open-vsx.org/api/${publisher}/${name}/${version}/...)
    vsx_m = re.search(
        r"open-vsx\.org/api/([^/\$]+)/([^/\$]+)/(?:\$\{[^}]+\}|[^/]+)/file", tmpl
    )
    if vsx_m:
        publisher = vsx_m.group(1)
        ext_name = vsx_m.group(2)
        # publisher/ext_name may be literal or variable refs. Use
        # removeprefix/removesuffix rather than the original lstrip("${")/
        # rstrip("}"): lstrip/rstrip strip *character sets*, not the exact
        # affix, so they can mangle literal names edged with those chars.
        publisher = str(
            merged_vars.get(publisher.removeprefix("${").removesuffix("}"), publisher)
        )
        ext_name = str(
            merged_vars.get(ext_name.removeprefix("${").removesuffix("}"), ext_name)
        )
        return {
            "kind": "openvsx",
            "publisher": publisher,
            "ext_name": ext_name,
            "version_var": _find_version_var(tmpl, merged_vars),
        }
    if "open-vsx.org/api/" in tmpl:
        # Regex missed: publisher/name are supplied via variables instead
        # of appearing literally in the URL.
        publisher = str(merged_vars.get("publisher") or "")
        ext_name = str(merged_vars.get("name") or "")
        if publisher and ext_name:
            return {
                "kind": "openvsx",
                "publisher": publisher,
                "ext_name": ext_name,
                "version_var": _find_version_var(tmpl, merged_vars),
            }
    # PyPI: files.pythonhosted.org URLs
    if "files.pythonhosted.org" in tmpl or "pypi.org" in tmpl:
        pypi_name = str(merged_vars.get("name") or "")
        if not pypi_name:
            # Recover the project name from the sdist/wheel filename.
            m = re.search(r"/packages/[^/]+/[^/]+/([^/]+)-\d", tmpl)
            pypi_name = m.group(1).replace("_", "-") if m else ""
        return {
            "kind": "pypi",
            "name": pypi_name,
            "version_var": _find_version_var(tmpl, merged_vars),
        }
    vvar = _find_version_var(tmpl, merged_vars)
    if vvar:
        return {"kind": "plain", "version_var": vvar}
    return {"kind": "static"}
def _find_version_var(tmpl: str, merged_vars: Json) -> str:
    """
    Return the name of the variable in merged_vars that looks most like
    a version string and appears in the template, or '' if none found.
    Prefers keys named 'version', then anything whose value looks like a
    semver/calver string.
    """
    present = [name for name in merged_vars if f"${{{name}}}" in tmpl]
    if "version" in present:
        return "version"
    looks_like_version = re.compile(r"^\d+[\.\-]\d")
    for name in present:
        if looks_like_version.match(str(merged_vars.get(name, ""))):
            return name
    return present[0] if present else ""
def apply_version_update(
    comp: Json,
    merged_vars: Json,
    target_dict: Json,
    new_version: str,
    version_var: str = "version",
) -> None:
    """
    Write `new_version` into the correct location in `target_dict`.
    For url sources the version lives in `variables.<version_var>`.
    For pypi sources it also lives in `variables.version` (the name is fixed).
    Clears any URL-path hash so it gets re-prefetched.

    `comp` and `merged_vars` are accepted for interface stability; all
    writes go through `target_dict`.
    """
    # Update the variable
    target_dict.setdefault("variables", {})[version_var] = new_version
    # Clear the old hash on the first source entry that carries one, so it
    # must be re-prefetched.
    for src in (target_dict.get("sources") or {}).values():
        if isinstance(src, dict) and "hash" in src:
            del src["hash"]
            break
    # NOTE(review): the original also intended to clear a base-level hash
    # via `comp` when no variant source carries one, but that branch was
    # never implemented (it iterated `comp` and did nothing). Kept as a
    # no-op to preserve behavior; implement if base-level clearing is
    # actually needed.
# ---------------------------------------------------------------------------
# Package discovery
# ---------------------------------------------------------------------------
def find_packages() -> List[Tuple[str, Path]]:
    """
    Scan packages/ for version.json files.
    Returns sorted list of (display_name, path) tuples.
    """
    found = [
        (str(path.relative_to(PKGS_DIR).parent), path)
        for path in PKGS_DIR.rglob("version.json")
    ]
    return sorted(found)
# ---------------------------------------------------------------------------
# Source display helper
# ---------------------------------------------------------------------------
def source_ref_label(comp: Json, merged_vars: Json) -> str:
    """Return a short human-readable reference string for a source."""
    fetcher = comp.get("fetcher", "none")
    rendered = render(comp, merged_vars)
    if fetcher == "github":
        tag = rendered.get("tag") or ""
        rev = rendered.get("rev") or ""
        owner = rendered.get("owner") or str(merged_vars.get("owner") or "")
        repo = rendered.get("repo") or str(merged_vars.get("repo") or "")
        if tag:
            return f"{owner}/{repo}@{tag}" if owner and repo else tag
        if rev:
            return f"{owner}/{repo}@{rev[:7]}" if owner and repo else rev[:12]
        return ""
    if fetcher == "git":
        ref = rendered.get("tag") or rendered.get("rev") or comp.get("version") or ""
        # A full 40-char hex SHA is abbreviated for display.
        is_full_sha = len(ref) == 40 and not set(ref) - set("0123456789abcdef")
        return ref[:12] if is_full_sha else ref
    if fetcher == "url":
        url = rendered.get("url") or rendered.get("urlTemplate") or ""
        if not url:
            return ""
        if "${" in url:
            # Unresolved variables remain: show the template's filename with
            # ${var} markers rewritten as <var>.
            tmpl = comp.get("urlTemplate") or comp.get("url") or url
            filename = os.path.basename(urllib.parse.urlparse(tmpl).path)
            return re.sub(r"\$\{([^}]+)\}", r"<\1>", filename)
        filename = os.path.basename(urllib.parse.urlparse(url).path)
        owner = str(merged_vars.get("owner") or "")
        repo = str(merged_vars.get("repo") or "")
        rp = str(merged_vars.get("releasePrefix") or "")
        rs = str(merged_vars.get("releaseSuffix") or "")
        base = str(merged_vars.get("base") or "")
        rel = str(merged_vars.get("release") or "")
        tag = f"{rp}{base}-{rel}{rs}" if (base and rel) else ""
        if owner and repo and tag and filename:
            return f"{owner}/{repo}@{tag} · (unknown)"
        return filename or url
    return str(comp.get("version") or comp.get("tag") or comp.get("rev") or "")
# ---------------------------------------------------------------------------
# Deep set helper
# ---------------------------------------------------------------------------
def deep_set(obj: Json, path: List[str], value: Any) -> None:
    """Set obj[path[0]]...[path[-1]] = value, creating intermediate dicts.

    Non-dict intermediate values along the path are overwritten with {}.
    """
    node = obj
    for key in path[:-1]:
        child = node.get(key)
        if not isinstance(child, dict):
            child = {}
            node[key] = child
        node = child
    node[path[-1]] = value