#!/usr/bin/env python3 """ Automatic Nix package update checker Auto-discovers and checks GitHub-based Nix packages for updates """ import re import json import argparse import requests import subprocess import shutil from pathlib import Path from typing import Dict, List, Optional, Tuple, NamedTuple from dataclasses import dataclass import sys class PackageInfo(NamedTuple): owner: str repo: str version: str rev: str current_hash: str package_name: str file_path: Path @dataclass class UpdateResult: name: str current_version: str latest_version: str has_update: bool file_path: Path repo_url: str current_hash: Optional[str] = None new_hash: Optional[str] = None error: Optional[str] = None class NixPackageChecker: def __init__(self, search_paths: List[str] = None, max_depth: int = 3): self.search_paths = search_paths or ["."] self.max_depth = max_depth self.session = requests.Session() self.session.headers.update({'User-Agent': 'nix-package-checker'}) def find_nix_packages(self) -> List[Path]: """Auto-discover Nix package files with fetchFromGitHub""" packages = [] for search_path in self.search_paths: base_path = Path(search_path) if not base_path.exists(): continue # Find .nix files up to max_depth for depth in range(self.max_depth + 1): pattern = "**/" * depth + "*.nix" for nix_file in base_path.glob(pattern): if self._is_github_package(nix_file): packages.append(nix_file) return sorted(set(packages)) def _is_github_package(self, nix_file: Path) -> bool: """Check if a .nix file contains fetchFromGitHub""" try: content = nix_file.read_text(encoding='utf-8') return 'fetchFromGitHub' in content and any( pattern in content for pattern in ['owner =', 'repo =', 'version ='] ) except (UnicodeDecodeError, PermissionError): return False def compare_versions(self, current: str, latest: str) -> bool: """Compare versions, return True if latest is newer""" if current == latest: return False # Handle HACS-X format hacs_current = re.match(r'HACS-(\d+)', current) hacs_latest = re.match(r'HACS-(\d+)', latest) if hacs_current and hacs_latest: return int(hacs_latest.group(1)) > int(hacs_current.group(1)) # Handle semantic versioning vX.Y.Z sem_current = re.match(r'v?(\d+)\.(\d+)\.(\d+)', current) sem_latest = re.match(r'v?(\d+)\.(\d+)\.(\d+)', latest) if sem_current and sem_latest: curr_parts = tuple(map(int, sem_current.groups())) lat_parts = tuple(map(int, sem_latest.groups())) return lat_parts > curr_parts # Fallback to string comparison return latest > current def parse_nix_file(self, nix_file: Path) -> Optional[PackageInfo]: """Extract package information from a .nix file""" try: content = nix_file.read_text(encoding='utf-8') except (UnicodeDecodeError, PermissionError) as e: print(f"❌ Error reading {nix_file}: {e}") return None # Patterns to extract fields patterns = { 'owner': r'owner\s*=\s*"([^"]+)"', 'repo': r'repo\s*=\s*"([^"]+)"', 'version': r'version\s*=\s*"([^"]+)"', 'rev': r'rev\s*=\s*(?:"([^"]+)"|([^;"\s]+))', 'hash': r'hash\s*=\s*"([^"]+)"', # Package name patterns (in order of preference) 'domain': r'domain\s*=\s*"([^"]+)"', # Home Assistant components 'pname': r'pname\s*=\s*"([^"]+)"', # Standard Nix convention 'name': r'name\s*=\s*"([^"]+)"' # Older convention } extracted = {} for field, pattern in patterns.items(): match = re.search(pattern, content) if match: if field == 'rev': # Handle both quoted and unquoted rev values extracted[field] = match.group(1) or match.group(2) else: extracted[field] = match.group(1) # Validate required fields required = ['owner', 'repo', 'version'] if not all(field in extracted for field in required): missing = [f for f in required if f not in extracted] print(f"⚠️ {nix_file.name}: Missing fields: {missing}") return None # Handle rev = version case rev = extracted.get('rev', extracted['version']) if rev == 'version': rev = extracted['version'] # Extract current hash (may not exist for all packages) current_hash = extracted.get('hash', '') # Determine package name (priority: domain > pname > name > repo > directory) package_name = None for name_field in ['domain', 'pname', 'name']: if name_field in extracted: package_name = extracted[name_field] break if not package_name: # Fall back to repo name package_name = extracted['repo'] # If still no name and it's in a subdirectory, use directory name if not package_name or package_name == extracted['repo']: parent_dir = nix_file.parent.name if parent_dir != '.' and parent_dir != nix_file.parent.parent.name: package_name = f"{parent_dir}-{extracted['repo']}" if package_name == extracted['repo'] else parent_dir return PackageInfo( owner=extracted['owner'], repo=extracted['repo'], version=extracted['version'], rev=rev, current_hash=current_hash, package_name=package_name, file_path=nix_file ) def get_latest_release(self, owner: str, repo: str) -> Optional[str]: """Get latest GitHub release tag""" url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest" try: response = self.session.get(url, timeout=10) if response.status_code == 200: return response.json().get('tag_name') elif response.status_code == 404: # Try getting tags if no releases return self._get_latest_tag(owner, repo) else: print(f"⚠️ API error for {owner}/{repo}: {response.status_code}") return None except requests.RequestException as e: print(f"⚠️ Network error for {owner}/{repo}: {e}") return None def _get_latest_tag(self, owner: str, repo: str) -> Optional[str]: """Fallback to get latest tag if no releases""" url = f"https://api.github.com/repos/{owner}/{repo}/tags" try: response = self.session.get(url, timeout=10) if response.status_code == 200: tags = response.json() return tags[0]['name'] if tags else None except requests.RequestException: pass return None def get_github_hash(self, owner: str, repo: str, rev: str) -> Optional[str]: """Get hash for GitHub source using nix-prefetch-url or nix-prefetch-github""" # Try nix-prefetch-url first (more commonly available) if shutil.which('nix-prefetch-url'): return self._get_hash_with_prefetch_url(owner, repo, rev) # Fall back to nix-prefetch-github elif shutil.which('nix-prefetch-github'): return self._get_hash_with_prefetch_github(owner, repo, rev) else: print("⚠️ Neither nix-prefetch-url nor nix-prefetch-github found.") print(" nix-prefetch-url is included in nix by default") print(" nix-prefetch-github: nix-env -iA nixpkgs.nix-prefetch-github") return None def _get_hash_with_prefetch_url(self, owner: str, repo: str, rev: str) -> Optional[str]: """Get hash using nix-prefetch-url with GitHub archive URL""" # GitHub archive URL format url = f"https://github.com/{owner}/{repo}/archive/{rev}.tar.gz" try: # Use --unpack to match fetchFromGitHub behavior cmd = ['nix-prefetch-url', '--unpack', url] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode == 0: # nix-prefetch-url outputs the hash directly (sha256) hash_value = result.stdout.strip() return hash_value else: print(f"⚠️ nix-prefetch-url failed for {owner}/{repo}@{rev}:") print(f" URL: {url}") print(f" Error: {result.stderr.strip()}") return None except subprocess.TimeoutExpired: print(f"⚠️ Timeout fetching hash for {owner}/{repo}@{rev} (60s limit)") return None except subprocess.SubprocessError as e: print(f"⚠️ Error with nix-prefetch-url for {owner}/{repo}@{rev}: {e}") return None def _get_hash_with_prefetch_github(self, owner: str, repo: str, rev: str) -> Optional[str]: """Get hash using nix-prefetch-github (fallback method)""" try: cmd = ['nix-prefetch-github', owner, repo, '--rev', rev] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode == 0: # Parse JSON output to get sha256 data = json.loads(result.stdout) return data.get('sha256') else: print(f"⚠️ nix-prefetch-github failed for {owner}/{repo}@{rev}:") print(f" {result.stderr.strip()}") return None except subprocess.TimeoutExpired: print(f"⚠️ Timeout fetching hash for {owner}/{repo}@{rev}") return None except (subprocess.SubprocessError, json.JSONDecodeError) as e: print(f"⚠️ Error with nix-prefetch-github for {owner}/{repo}@{rev}: {e}") return None def update_nix_file(self, pkg_info: PackageInfo, new_version: str, new_hash: str) -> bool: """Update a .nix file with new version and hash""" try: content = pkg_info.file_path.read_text(encoding='utf-8') # Create backup backup_path = pkg_info.file_path.with_suffix('.nix.backup') backup_path.write_text(content, encoding='utf-8') # Update version content = re.sub( r'(version\s*=\s*)"[^"]+";', f'\\1"{new_version}";', content ) # Update hash if pkg_info.current_hash: content = re.sub( r'(hash\s*=\s*)"[^"]+";', f'\\1"sha256-{new_hash}";', content ) # Write updated content pkg_info.file_path.write_text(content, encoding='utf-8') return True except Exception as e: print(f"❌ Error updating {pkg_info.file_path}: {e}") return False """Compare versions, return True if latest is newer""" if current == latest: return False # Handle HACS-X format hacs_current = re.match(r'HACS-(\d+)', current) hacs_latest = re.match(r'HACS-(\d+)', latest) if hacs_current and hacs_latest: return int(hacs_latest.group(1)) > int(hacs_current.group(1)) # Handle semantic versioning vX.Y.Z sem_current = re.match(r'v?(\d+)\.(\d+)\.(\d+)', current) sem_latest = re.match(r'v?(\d+)\.(\d+)\.(\d+)', latest) if sem_current and sem_latest: curr_parts = tuple(map(int, sem_current.groups())) lat_parts = tuple(map(int, sem_latest.groups())) return lat_parts > curr_parts # Fallback to string comparison return latest > current def check_package(self, pkg_info: PackageInfo, fetch_hash: bool = False) -> UpdateResult: """Check a single package for updates""" latest_version = self.get_latest_release(pkg_info.owner, pkg_info.repo) if latest_version is None: return UpdateResult( name=pkg_info.package_name, current_version=pkg_info.version, latest_version="unknown", has_update=False, file_path=pkg_info.file_path, repo_url=f"https://github.com/{pkg_info.owner}/{pkg_info.repo}", current_hash=pkg_info.current_hash, error="Could not fetch latest release" ) has_update = self.compare_versions(pkg_info.version, latest_version) new_hash = None # Fetch new hash if update available and requested if has_update and fetch_hash: print(f" 🔄 Fetching hash for {pkg_info.package_name} ({pkg_info.owner}/{pkg_info.repo}@{latest_version})...") new_hash = self.get_github_hash(pkg_info.owner, pkg_info.repo, latest_version) return UpdateResult( name=pkg_info.package_name, current_version=pkg_info.version, latest_version=latest_version, has_update=has_update, file_path=pkg_info.file_path, repo_url=f"https://github.com/{pkg_info.owner}/{pkg_info.repo}", current_hash=pkg_info.current_hash, new_hash=new_hash ) def check_all_packages(self, fetch_hash: bool = False, auto_update: bool = False) -> List[UpdateResult]: """Check all discovered packages""" nix_files = self.find_nix_packages() if not nix_files: print("No Nix package files found") return [] print(f"Found {len(nix_files)} package files") # Show which hash fetching tool is available if fetch_hash or auto_update: if shutil.which('nix-prefetch-url'): print("Hash fetching: using nix-prefetch-url") elif shutil.which('nix-prefetch-github'): print("Hash fetching: using nix-prefetch-github") else: print("⚠️ No hash fetching tool available") results = [] for nix_file in nix_files: pkg_info = self.parse_nix_file(nix_file) if pkg_info: result = self.check_package(pkg_info, fetch_hash=fetch_hash) results.append(result) # Auto-update if requested and update available if auto_update and result.has_update and result.new_hash: print(f" 🔄 Auto-updating {result.name}...") if self.update_nix_file(pkg_info, result.latest_version, result.new_hash): print(f" ✅ Updated {result.file_path}") else: print(f" ❌ Failed to update {result.file_path}") return results def print_results(results: List[UpdateResult], show_all: bool = True, show_hashes: bool = False): """Print results in a nice format""" if not results: return updates_available = [r for r in results if r.has_update] if show_all: print(f"\n{'Package':<25} {'Current':<15} {'Latest':<15} {'Status'}") print("-" * 70) for result in results: if result.error: status = f"❌ {result.error}" elif result.has_update: status = "🔄 Update available" else: status = "✅ Up to date" print(f"{result.name:<25} {result.current_version:<15} {result.latest_version:<15} {status}") # Show file path for default.nix files or when there might be confusion if result.file_path.name == 'default.nix' or len([r for r in results if r.name == result.name]) > 1: rel_path = result.file_path.relative_to(Path.cwd()) if result.file_path.is_absolute() else result.file_path print(f"{'':>25} File: {rel_path}") # Show hash information if available and requested if show_hashes and result.has_update and result.new_hash: print(f"{'':>25} Current hash: {result.current_hash[:16]}..." if result.current_hash else "") print(f"{'':>25} New hash: sha256-{result.new_hash[:16]}...") # Summary print(f"\nSummary:") print(f" Total packages: {len(results)}") print(f" Updates available: {len(updates_available)}") if updates_available: print(f"\nPackages with updates:") for result in updates_available: rel_path = result.file_path.relative_to(Path.cwd()) if result.file_path.is_absolute() else result.file_path print(f" • {result.name}: {result.current_version} → {result.latest_version}") print(f" File: {rel_path}") print(f" Repo: {result.repo_url}") if show_hashes and result.new_hash: print(f" New hash: sha256-{result.new_hash}") def print_updates_only(results: List[UpdateResult], show_hashes: bool = False): """Print only packages with updates""" updates = [r for r in results if r.has_update] if not updates: print("No updates available") return print("Updates available:") for result in updates: rel_path = result.file_path.relative_to(Path.cwd()) if result.file_path.is_absolute() else result.file_path print(f" {result.name}: {result.current_version} → {result.latest_version}") print(f" File: {rel_path}") if show_hashes and result.new_hash: print(f" New hash: sha256-{result.new_hash}") elif show_hashes: print(f" Hash: (not fetched)") def output_json(results: List[UpdateResult]): """Output results as JSON""" data = {} for result in results: data[result.name] = { "current_version": result.current_version, "latest_version": result.latest_version, "has_update": result.has_update, "file_path": str(result.file_path), "repo_url": result.repo_url, "current_hash": result.current_hash, "new_hash": result.new_hash, "error": result.error } print(json.dumps(data, indent=2)) def main(): parser = argparse.ArgumentParser( description="Automatically check Nix packages for GitHub updates", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s # Check all packages in current directory %(prog)s --updates # Show only packages with updates %(prog)s --fetch-hash # Also fetch new hashes for updates %(prog)s --auto-update # Automatically update .nix files %(prog)s --json # Output as JSON %(prog)s --path ./packages # Check specific directory %(prog)s --depth 5 # Search deeper directory levels Requirements: For hash fetching: nix-prefetch-url (part of nix) or nix-prefetch-github nix-prefetch-url is preferred and usually already available """ ) parser.add_argument('--updates', action='store_true', help='Show only packages with updates available') parser.add_argument('--fetch-hash', action='store_true', help='Fetch new hashes for packages with updates (requires nix-prefetch-url or nix-prefetch-github)') parser.add_argument('--auto-update', action='store_true', help='Automatically update .nix files with new versions and hashes') parser.add_argument('--json', action='store_true', help='Output results as JSON') parser.add_argument('--path', action='append', default=[], help='Search path for .nix files (can be used multiple times)') parser.add_argument('--depth', type=int, default=3, help='Maximum directory depth to search (default: 3)') parser.add_argument('--list', action='store_true', help='List discovered package files without checking updates') args = parser.parse_args() # Auto-update implies fetch-hash if args.auto_update: args.fetch_hash = True # Use provided paths or default to current directory search_paths = args.path if args.path else ["."] checker = NixPackageChecker(search_paths=search_paths, max_depth=args.depth) if args.list: # Just list discovered files nix_files = checker.find_nix_packages() print(f"Discovered {len(nix_files)} package files:") for nix_file in nix_files: pkg_info = checker.parse_nix_file(nix_file) if pkg_info: rel_path = nix_file.relative_to(Path.cwd()) if nix_file.is_absolute() else nix_file print(f" {pkg_info.package_name:<25} {pkg_info.owner}/{pkg_info.repo} ({pkg_info.version}) - {rel_path}") else: rel_path = nix_file.relative_to(Path.cwd()) if nix_file.is_absolute() else nix_file print(f" {'(parse failed)':<25} - {rel_path}") return # Check for updates results = checker.check_all_packages( fetch_hash=args.fetch_hash, auto_update=args.auto_update ) if not results: print("No packages found to check") return # Output results if args.json: output_json(results) elif args.updates: print_updates_only(results, show_hashes=args.fetch_hash) else: print_results(results, show_all=True, show_hashes=args.fetch_hash) # Set exit code based on updates available updates_available = any(r.has_update for r in results) sys.exit(1 if updates_available else 0) if __name__ == "__main__": main()