551 lines
22 KiB
Python
Executable File
551 lines
22 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
Automatic Nix package update checker.

Auto-discovers GitHub-based Nix packages and checks them for updates.
"""
|
|
|
|
import re
|
|
import json
|
|
import argparse
|
|
import requests
|
|
import subprocess
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, NamedTuple
|
|
from dataclasses import dataclass
|
|
import sys
|
|
|
|
class PackageInfo(NamedTuple):
    """Immutable record of the fields parsed out of one Nix package file."""

    # Value of the `owner = "..."` attribute.
    owner: str
    # Value of the `repo = "..."` attribute.
    repo: str
    # Value of the `version = "..."` attribute.
    version: str
    # Value of `rev`; the parser substitutes the version when `rev` is
    # missing or is the bare identifier `version`.
    rev: str
    # Value of the `hash = "..."` attribute, or "" when the file has none.
    current_hash: str
    # Display name derived from domain / pname / name / repo / directory.
    package_name: str
    # Location of the .nix file these fields were read from.
    file_path: Path
|
|
|
|
@dataclass
class UpdateResult:
    """Outcome of checking a single package against its GitHub repository."""

    # Package display name.
    name: str
    # Version currently declared in the .nix file.
    current_version: str
    # Latest tag reported by GitHub, or "unknown" when the lookup failed.
    latest_version: str
    # True when the latest version compares newer than the current one.
    has_update: bool
    # The .nix file the package was read from.
    file_path: Path
    # https://github.com/<owner>/<repo> URL of the upstream repository.
    repo_url: str
    # Hash currently declared in the .nix file, if any.
    current_hash: Optional[str] = None
    # Hash prefetched for the latest version, when hash fetching was requested.
    new_hash: Optional[str] = None
    # Human-readable failure description; None on success.
    error: Optional[str] = None
|
|
|
|
class NixPackageChecker:
    """Discover GitHub-based Nix packages and check them for newer releases.

    The checker scans the configured search paths for ``.nix`` files that use
    ``fetchFromGitHub``, extracts owner/repo/version with regular expressions,
    asks the GitHub API for the latest release (or tag), and can optionally
    prefetch the new source hash and rewrite the package file in place.
    """

    def __init__(self, search_paths: Optional[List[str]] = None, max_depth: int = 3):
        """Create a checker.

        Args:
            search_paths: Directories to scan; defaults to the current directory.
            max_depth: How many directory levels below each search path to scan
                (0 means only files directly inside the search path).
        """
        self.search_paths = search_paths or ["."]
        self.max_depth = max_depth
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': 'nix-package-checker'})

    def find_nix_packages(self) -> List[Path]:
        """Auto-discover Nix package files that use fetchFromGitHub.

        Returns:
            Sorted, de-duplicated list of matching ``.nix`` files located at
            most ``max_depth`` directory levels below each search path.
        """
        packages = set()

        for search_path in self.search_paths:
            base_path = Path(search_path)
            if not base_path.exists():
                continue

            for depth in range(self.max_depth + 1):
                # One "*/" per level: "**/" is already recursive and would make
                # every depth >= 1 match the whole tree, ignoring max_depth.
                pattern = "*/" * depth + "*.nix"
                for nix_file in base_path.glob(pattern):
                    if self._is_github_package(nix_file):
                        packages.add(nix_file)

        return sorted(packages)

    def _is_github_package(self, nix_file: Path) -> bool:
        """Return True if the file mentions fetchFromGitHub and a known attribute.

        Unreadable entries (bad encoding, permission problems, directories that
        happen to end in ``.nix``) are treated as "not a package".
        """
        try:
            content = nix_file.read_text(encoding='utf-8')
        except (UnicodeDecodeError, OSError):
            return False

        if 'fetchFromGitHub' not in content:
            return False
        return any(marker in content for marker in ('owner =', 'repo =', 'version ='))

    def compare_versions(self, current: str, latest: str) -> bool:
        """Return True if ``latest`` is newer than ``current``.

        Supported schemes, tried in order:
          1. ``HACS-<n>`` tags, compared by their integer.
          2. Dotted numeric versions with an optional leading ``v`` and any
             number of components; shorter versions are zero-padded, so
             ``1.2`` equals ``1.2.0`` and ``1.10`` is newer than ``1.9``.
             Anything after the numeric prefix (e.g. ``-beta``) is ignored.
          3. Plain string comparison as a last resort.
        """
        if current == latest:
            return False

        # Handle HACS-X format
        hacs_current = re.match(r'HACS-(\d+)', current)
        hacs_latest = re.match(r'HACS-(\d+)', latest)
        if hacs_current and hacs_latest:
            return int(hacs_latest.group(1)) > int(hacs_current.group(1))

        # Handle dotted numeric versions (vX.Y, vX.Y.Z, vX.Y.Z.W, ...)
        num_current = re.match(r'v?(\d+(?:\.\d+)*)', current)
        num_latest = re.match(r'v?(\d+(?:\.\d+)*)', latest)
        if num_current and num_latest:
            curr_parts = [int(part) for part in num_current.group(1).split('.')]
            lat_parts = [int(part) for part in num_latest.group(1).split('.')]
            # Zero-pad so versions of different lengths compare numerically.
            width = max(len(curr_parts), len(lat_parts))
            curr_parts += [0] * (width - len(curr_parts))
            lat_parts += [0] * (width - len(lat_parts))
            return lat_parts > curr_parts

        # Fallback to string comparison
        return latest > current

    def parse_nix_file(self, nix_file: Path) -> Optional["PackageInfo"]:
        """Extract package information from a .nix file.

        Each attribute is located with a regular expression and the first
        match in the file wins.

        Returns:
            A PackageInfo, or None if the file cannot be read or lacks any of
            the required ``owner``, ``repo`` or ``version`` attributes.
        """
        try:
            content = nix_file.read_text(encoding='utf-8')
        except (UnicodeDecodeError, OSError) as e:
            print(f"❌ Error reading {nix_file}: {e}")
            return None

        # Patterns to extract fields
        patterns = {
            'owner': r'owner\s*=\s*"([^"]+)"',
            'repo': r'repo\s*=\s*"([^"]+)"',
            'version': r'version\s*=\s*"([^"]+)"',
            'rev': r'rev\s*=\s*(?:"([^"]+)"|([^;"\s]+))',
            'hash': r'hash\s*=\s*"([^"]+)"',
            # Package name patterns (in order of preference)
            'domain': r'domain\s*=\s*"([^"]+)"',  # Home Assistant components
            'pname': r'pname\s*=\s*"([^"]+)"',  # Standard Nix convention
            'name': r'name\s*=\s*"([^"]+)"',  # Older convention
        }

        extracted = {}
        for field, pattern in patterns.items():
            match = re.search(pattern, content)
            if not match:
                continue
            if field == 'rev':
                # Handle both quoted and unquoted rev values
                extracted[field] = match.group(1) or match.group(2)
            else:
                extracted[field] = match.group(1)

        # Validate required fields
        required = ['owner', 'repo', 'version']
        missing = [f for f in required if f not in extracted]
        if missing:
            print(f"⚠️ {nix_file.name}: Missing fields: {missing}")
            return None

        # Handle rev = version case (rev absent, or the bare identifier `version`)
        rev = extracted.get('rev', extracted['version'])
        if rev == 'version':
            rev = extracted['version']

        # Extract current hash (may not exist for all packages)
        current_hash = extracted.get('hash', '')

        # Determine package name (priority: domain > pname > name > repo)
        package_name = extracted['repo']
        for name_field in ['domain', 'pname', 'name']:
            if name_field in extracted:
                package_name = extracted[name_field]
                break

        # When the name is just the repo name, prefix it with the containing
        # directory to help tell apart packages that share a repository.
        if package_name == extracted['repo']:
            parent_dir = nix_file.parent.name
            if (
                parent_dir
                and parent_dir not in ('.', '..')
                and parent_dir != nix_file.parent.parent.name
            ):
                package_name = f"{parent_dir}-{extracted['repo']}"

        return PackageInfo(
            owner=extracted['owner'],
            repo=extracted['repo'],
            version=extracted['version'],
            rev=rev,
            current_hash=current_hash,
            package_name=package_name,
            file_path=nix_file,
        )

    def get_latest_release(self, owner: str, repo: str) -> Optional[str]:
        """Return the tag name of the latest GitHub release.

        Falls back to the repository's tag list when the releases endpoint
        answers 404. Returns None on any API or network error.
        """
        url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest"

        try:
            response = self.session.get(url, timeout=10)
            if response.status_code == 200:
                return response.json().get('tag_name')
            if response.status_code == 404:
                # Try getting tags if no releases
                return self._get_latest_tag(owner, repo)
            print(f"⚠️ API error for {owner}/{repo}: {response.status_code}")
            return None
        except requests.RequestException as e:
            print(f"⚠️ Network error for {owner}/{repo}: {e}")
            return None

    def _get_latest_tag(self, owner: str, repo: str) -> Optional[str]:
        """Return the first tag listed by the GitHub tags endpoint, or None.

        Used as a fallback when a repository has no releases.
        """
        url = f"https://api.github.com/repos/{owner}/{repo}/tags"

        try:
            response = self.session.get(url, timeout=10)
            if response.status_code == 200:
                tags = response.json()
                return tags[0]['name'] if tags else None
        except requests.RequestException as e:
            # Report the failure like get_latest_release does instead of
            # swallowing it silently.
            print(f"⚠️ Network error for {owner}/{repo}: {e}")
        return None

    def get_github_hash(self, owner: str, repo: str, rev: str) -> Optional[str]:
        """Prefetch the source hash for ``owner/repo`` at ``rev``.

        Prefers nix-prefetch-url and falls back to nix-prefetch-github.
        Returns None when neither tool is on PATH or the prefetch fails.
        """
        # Try nix-prefetch-url first (more commonly available)
        if shutil.which('nix-prefetch-url'):
            return self._get_hash_with_prefetch_url(owner, repo, rev)
        # Fall back to nix-prefetch-github
        if shutil.which('nix-prefetch-github'):
            return self._get_hash_with_prefetch_github(owner, repo, rev)

        print("⚠️ Neither nix-prefetch-url nor nix-prefetch-github found.")
        print(" nix-prefetch-url is included in nix by default")
        print(" nix-prefetch-github: nix-env -iA nixpkgs.nix-prefetch-github")
        return None

    def _get_hash_with_prefetch_url(self, owner: str, repo: str, rev: str) -> Optional[str]:
        """Get the hash by running nix-prefetch-url on the GitHub archive URL.

        Returns the tool's stripped stdout on success, otherwise None.
        """
        # GitHub archive URL format
        url = f"https://github.com/{owner}/{repo}/archive/{rev}.tar.gz"

        try:
            # Use --unpack to match fetchFromGitHub behavior
            cmd = ['nix-prefetch-url', '--unpack', url]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        except subprocess.TimeoutExpired:
            print(f"⚠️ Timeout fetching hash for {owner}/{repo}@{rev} (60s limit)")
            return None
        except (subprocess.SubprocessError, OSError) as e:
            print(f"⚠️ Error with nix-prefetch-url for {owner}/{repo}@{rev}: {e}")
            return None

        if result.returncode != 0:
            print(f"⚠️ nix-prefetch-url failed for {owner}/{repo}@{rev}:")
            print(f" URL: {url}")
            print(f" Error: {result.stderr.strip()}")
            return None

        # nix-prefetch-url outputs the hash directly (sha256)
        return result.stdout.strip()

    def _get_hash_with_prefetch_github(self, owner: str, repo: str, rev: str) -> Optional[str]:
        """Get the hash by running nix-prefetch-github (fallback method).

        Returns the ``sha256`` field of the tool's JSON output, or None.
        """
        try:
            cmd = ['nix-prefetch-github', owner, repo, '--rev', rev]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)

            if result.returncode != 0:
                print(f"⚠️ nix-prefetch-github failed for {owner}/{repo}@{rev}:")
                print(f" {result.stderr.strip()}")
                return None

            # Parse JSON output to get sha256
            data = json.loads(result.stdout)
            return data.get('sha256')
        except subprocess.TimeoutExpired:
            print(f"⚠️ Timeout fetching hash for {owner}/{repo}@{rev}")
            return None
        except (subprocess.SubprocessError, OSError, json.JSONDecodeError) as e:
            print(f"⚠️ Error with nix-prefetch-github for {owner}/{repo}@{rev}: {e}")
            return None

    def update_nix_file(self, pkg_info: "PackageInfo", new_version: str, new_hash: str) -> bool:
        """Rewrite a .nix file with a new version and hash.

        The original content is first saved next to the file with a
        ``.nix.backup`` suffix. Only the first ``version = "...";`` and the
        first ``hash = "...";`` are replaced, which are the same occurrences
        parse_nix_file reads; later ones are left untouched. The hash is only
        rewritten when the package already declared one.

        Returns:
            True on success, False if reading or writing failed.
        """
        try:
            content = pkg_info.file_path.read_text(encoding='utf-8')

            # Create backup
            backup_path = pkg_info.file_path.with_suffix('.nix.backup')
            backup_path.write_text(content, encoding='utf-8')

            # Update version. A function replacement keeps backslashes in the
            # new value from being interpreted as regex escapes.
            content = re.sub(
                r'(version\s*=\s*)"[^"]+";',
                lambda m: f'{m.group(1)}"{new_version}";',
                content,
                count=1,
            )

            # Update hash
            # NOTE(review): the "sha256-" prefix implies an SRI (base64) hash,
            # but nix-prefetch-url appears to print base32 by default; confirm
            # the resulting string is accepted by Nix.
            if pkg_info.current_hash:
                content = re.sub(
                    r'(hash\s*=\s*)"[^"]+";',
                    lambda m: f'{m.group(1)}"sha256-{new_hash}";',
                    content,
                    count=1,
                )

            # Write updated content
            pkg_info.file_path.write_text(content, encoding='utf-8')
            return True
        except Exception as e:
            # Deliberately broad: an update failure for one package is reported
            # and must not abort the whole run.
            print(f"❌ Error updating {pkg_info.file_path}: {e}")
            return False

    def check_package(self, pkg_info: "PackageInfo", fetch_hash: bool = False) -> "UpdateResult":
        """Check a single package for updates.

        Args:
            pkg_info: Parsed package description.
            fetch_hash: When True and an update exists, also prefetch the
                hash of the latest version.

        Returns:
            An UpdateResult; ``error`` is set and ``latest_version`` is
            "unknown" when the latest release could not be determined.
        """
        repo_url = f"https://github.com/{pkg_info.owner}/{pkg_info.repo}"
        latest_version = self.get_latest_release(pkg_info.owner, pkg_info.repo)

        if latest_version is None:
            return UpdateResult(
                name=pkg_info.package_name,
                current_version=pkg_info.version,
                latest_version="unknown",
                has_update=False,
                file_path=pkg_info.file_path,
                repo_url=repo_url,
                current_hash=pkg_info.current_hash,
                error="Could not fetch latest release",
            )

        has_update = self.compare_versions(pkg_info.version, latest_version)
        new_hash = None

        # Fetch new hash if update available and requested
        if has_update and fetch_hash:
            print(f" 🔄 Fetching hash for {pkg_info.package_name} ({pkg_info.owner}/{pkg_info.repo}@{latest_version})...")
            new_hash = self.get_github_hash(pkg_info.owner, pkg_info.repo, latest_version)

        return UpdateResult(
            name=pkg_info.package_name,
            current_version=pkg_info.version,
            latest_version=latest_version,
            has_update=has_update,
            file_path=pkg_info.file_path,
            repo_url=repo_url,
            current_hash=pkg_info.current_hash,
            new_hash=new_hash,
        )

    def check_all_packages(self, fetch_hash: bool = False, auto_update: bool = False) -> List["UpdateResult"]:
        """Check all discovered packages.

        Args:
            fetch_hash: Prefetch hashes for packages that have updates.
            auto_update: Rewrite the .nix file of every package that has an
                update and for which a new hash was obtained.

        Returns:
            One UpdateResult per successfully parsed package file; an empty
            list when no package files were found.
        """
        nix_files = self.find_nix_packages()

        if not nix_files:
            print("No Nix package files found")
            return []

        print(f"Found {len(nix_files)} package files")

        # Show which hash fetching tool is available
        if fetch_hash or auto_update:
            if shutil.which('nix-prefetch-url'):
                print("Hash fetching: using nix-prefetch-url")
            elif shutil.which('nix-prefetch-github'):
                print("Hash fetching: using nix-prefetch-github")
            else:
                print("⚠️ No hash fetching tool available")

        results = []

        for nix_file in nix_files:
            pkg_info = self.parse_nix_file(nix_file)
            if not pkg_info:
                continue

            result = self.check_package(pkg_info, fetch_hash=fetch_hash)
            results.append(result)

            # Auto-update if requested and update available
            if auto_update and result.has_update and result.new_hash:
                print(f" 🔄 Auto-updating {result.name}...")
                if self.update_nix_file(pkg_info, result.latest_version, result.new_hash):
                    print(f" ✅ Updated {result.file_path}")
                else:
                    print(f" ❌ Failed to update {result.file_path}")

        return results
|
|
|
|
def print_results(results: List["UpdateResult"], show_all: bool = True, show_hashes: bool = False):
    """Print results as a table followed by a summary.

    Args:
        results: Check results to display; nothing is printed when empty.
        show_all: Print the per-package table before the summary.
        show_hashes: Also print current/new hashes for packages with updates.
    """
    if not results:
        return

    def display_path(path: Path) -> Path:
        # Show absolute paths relative to the working directory when possible;
        # relative_to() raises ValueError for paths outside of it.
        if not path.is_absolute():
            return path
        try:
            return path.relative_to(Path.cwd())
        except ValueError:
            return path

    updates_available = [r for r in results if r.has_update]

    if show_all:
        # Count names once so duplicate detection is not quadratic.
        name_counts = {}
        for result in results:
            name_counts[result.name] = name_counts.get(result.name, 0) + 1

        print(f"\n{'Package':<25} {'Current':<15} {'Latest':<15} {'Status'}")
        print("-" * 70)

        for result in results:
            if result.error:
                status = f"❌ {result.error}"
            elif result.has_update:
                status = "🔄 Update available"
            else:
                status = "✅ Up to date"

            print(f"{result.name:<25} {result.current_version:<15} {result.latest_version:<15} {status}")

            # Show file path for default.nix files or when there might be confusion
            if result.file_path.name == 'default.nix' or name_counts[result.name] > 1:
                rel_path = display_path(result.file_path)
                print(f"{'':>25} File: {rel_path}")

            # Show hash information if available and requested
            if show_hashes and result.has_update and result.new_hash:
                # Skip the "current" line entirely when there is no current
                # hash instead of printing an empty line.
                if result.current_hash:
                    print(f"{'':>25} Current hash: {result.current_hash[:16]}...")
                print(f"{'':>25} New hash: sha256-{result.new_hash[:16]}...")

    # Summary
    print(f"\nSummary:")
    print(f" Total packages: {len(results)}")
    print(f" Updates available: {len(updates_available)}")

    if updates_available:
        print(f"\nPackages with updates:")
        for result in updates_available:
            rel_path = display_path(result.file_path)
            print(f" • {result.name}: {result.current_version} → {result.latest_version}")
            print(f" File: {rel_path}")
            print(f" Repo: {result.repo_url}")
            if show_hashes and result.new_hash:
                print(f" New hash: sha256-{result.new_hash}")
|
|
|
|
def print_updates_only(results: List["UpdateResult"], show_hashes: bool = False):
    """Print only the packages that have an update available.

    Args:
        results: Check results to filter and display.
        show_hashes: Also print the new hash, or a "(not fetched)" marker
            when no hash was obtained.
    """
    updates = [r for r in results if r.has_update]

    if not updates:
        print("No updates available")
        return

    print("Updates available:")
    for result in updates:
        rel_path = result.file_path
        if rel_path.is_absolute():
            try:
                rel_path = rel_path.relative_to(Path.cwd())
            except ValueError:
                # Path lies outside the working directory; show it unchanged.
                pass

        print(f" {result.name}: {result.current_version} → {result.latest_version}")
        print(f" File: {rel_path}")
        if show_hashes and result.new_hash:
            print(f" New hash: sha256-{result.new_hash}")
        elif show_hashes:
            print(f" Hash: (not fetched)")
|
|
|
|
def output_json(results: List["UpdateResult"]):
    """Print the results to stdout as a JSON object keyed by package name.

    When several results share the same name, each of them is keyed as
    ``"<name> (<file_path>)"`` instead, so that no entry silently overwrites
    another. Uniquely named packages keep the plain name as key.
    """
    name_counts = {}
    for result in results:
        name_counts[result.name] = name_counts.get(result.name, 0) + 1

    data = {}
    for result in results:
        key = result.name
        if name_counts[key] > 1:
            # Disambiguate colliding names with the file they came from.
            key = f"{result.name} ({result.file_path})"
        data[key] = {
            "current_version": result.current_version,
            "latest_version": result.latest_version,
            "has_update": result.has_update,
            "file_path": str(result.file_path),
            "repo_url": result.repo_url,
            "current_hash": result.current_hash,
            "new_hash": result.new_hash,
            "error": result.error,
        }

    print(json.dumps(data, indent=2))
|
|
|
|
def main():
    """Command-line entry point.

    Parses the arguments, discovers package files and either lists them
    (``--list``) or checks them for updates and prints the results in the
    requested format. After an update check the process exits with status 1
    when at least one update is available and 0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Automatically check Nix packages for GitHub updates",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s # Check all packages in current directory
%(prog)s --updates # Show only packages with updates
%(prog)s --fetch-hash # Also fetch new hashes for updates
%(prog)s --auto-update # Automatically update .nix files
%(prog)s --json # Output as JSON
%(prog)s --path ./packages # Check specific directory
%(prog)s --depth 5 # Search deeper directory levels

Requirements:
For hash fetching: nix-prefetch-url (part of nix) or nix-prefetch-github
nix-prefetch-url is preferred and usually already available
"""
    )

    parser.add_argument('--updates', action='store_true',
                        help='Show only packages with updates available')
    parser.add_argument('--fetch-hash', action='store_true',
                        help='Fetch new hashes for packages with updates (requires nix-prefetch-url or nix-prefetch-github)')
    parser.add_argument('--auto-update', action='store_true',
                        help='Automatically update .nix files with new versions and hashes')
    parser.add_argument('--json', action='store_true',
                        help='Output results as JSON')
    parser.add_argument('--path', action='append', default=[],
                        help='Search path for .nix files (can be used multiple times)')
    parser.add_argument('--depth', type=int, default=3,
                        help='Maximum directory depth to search (default: 3)')
    parser.add_argument('--list', action='store_true',
                        help='List discovered package files without checking updates')

    args = parser.parse_args()

    # Auto-update implies fetch-hash
    if args.auto_update:
        args.fetch_hash = True

    # Use provided paths or default to current directory
    search_paths = args.path if args.path else ["."]

    checker = NixPackageChecker(search_paths=search_paths, max_depth=args.depth)

    if args.list:
        # Just list discovered files
        nix_files = checker.find_nix_packages()
        print(f"Discovered {len(nix_files)} package files:")
        for nix_file in nix_files:
            rel_path = nix_file
            if nix_file.is_absolute():
                try:
                    rel_path = nix_file.relative_to(Path.cwd())
                except ValueError:
                    # Search path lies outside the working directory; show
                    # the absolute path rather than crashing.
                    pass

            pkg_info = checker.parse_nix_file(nix_file)
            if pkg_info:
                print(f" {pkg_info.package_name:<25} {pkg_info.owner}/{pkg_info.repo} ({pkg_info.version}) - {rel_path}")
            else:
                print(f" {'(parse failed)':<25} - {rel_path}")
        return

    # Check for updates
    results = checker.check_all_packages(
        fetch_hash=args.fetch_hash,
        auto_update=args.auto_update,
    )

    if not results:
        print("No packages found to check")
        return

    # Output results
    if args.json:
        output_json(results)
    elif args.updates:
        print_updates_only(results, show_hashes=args.fetch_hash)
    else:
        print_results(results, show_all=True, show_hashes=args.fetch_hash)

    # Set exit code based on updates available
    updates_available = any(r.has_update for r in results)
    sys.exit(1 if updates_available else 0)
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()