# /etc/nixos/modules/ups-monitor/default.nix
#
# NixOS module that runs a small Python daemon which polls an INA219 power
# monitor on a UPS HAT over I2C, logs the readings, stores them in SQLite and
# emits warnings when the battery percentage or bus voltage drops below the
# configured thresholds.
{ config, lib, pkgs, ... }:

with lib;

let
  cfg = config.services.ups-monitor;

  # NOTE: this is a Nix indented string. Inside it, two consecutive single
  # quotes terminate the string and three are an escape sequence, so the
  # embedded Python must never use triple-single-quoted strings or the empty
  # single-quoted string literal. Use double quotes in the script instead.
  pythonScript = pkgs.writeText "ups_monitor.py" ''
    import json
    import logging
    import os
    import signal
    import sqlite3
    import sys
    import time
    from datetime import datetime
    from pathlib import Path

    # The Nix environment below ships smbus2 (not the legacy smbus module).
    from smbus2 import SMBus


    class INA219:
        """Minimal driver for the INA219 current/voltage/power monitor."""

        # Register addresses used by this driver.
        REG_CONFIG = 0x00
        REG_SHUNT_VOLTAGE = 0x01
        REG_BUS_VOLTAGE = 0x02
        REG_POWER = 0x03
        REG_CURRENT = 0x04
        REG_CALIBRATION = 0x05

        def __init__(self, i2c_bus=1, addr=0x40):
            """Open the I2C bus and program the 32V/2A calibration.

            Args:
                i2c_bus: Number of the I2C bus (the N in /dev/i2c-N).
                addr: 7-bit I2C address of the INA219.
            """
            self.bus = SMBus(i2c_bus)
            self.addr = addr
            self._cal_value = 0
            self._current_lsb = 0
            self._power_lsb = 0
            self.set_calibration_32V_2A()

        @staticmethod
        def _to_signed(value):
            """Interpret a raw 16-bit register value as two's complement."""
            return value - 0x10000 if value > 0x7FFF else value

        def read(self, address):
            """Return the unsigned 16-bit big-endian value of a register."""
            data = self.bus.read_i2c_block_data(self.addr, address, 2)
            return (data[0] << 8) | data[1]

        def write(self, address, data):
            """Write a 16-bit value to a register, most significant byte first."""
            payload = [(data >> 8) & 0xFF, data & 0xFF]
            self.bus.write_i2c_block_data(self.addr, address, payload)

        def _write_calibration(self):
            """Re-send the calibration value to the calibration register."""
            self.write(self.REG_CALIBRATION, self._cal_value)

        def set_calibration_32V_2A(self):
            """Program calibration and configuration registers."""
            self._current_lsb = .1    # mA per bit of the current register
            self._cal_value = 4096
            self._power_lsb = .002    # W per bit of the power register
            self._write_calibration()

            self.bus_voltage_range = 0x01
            self.gain = 0x03
            self.bus_adc_resolution = 0x0D
            self.shunt_adc_resolution = 0x0D
            self.mode = 0x07
            self.config = (
                self.bus_voltage_range << 13
                | self.gain << 11
                | self.bus_adc_resolution << 7
                | self.shunt_adc_resolution << 3
                | self.mode
            )
            self.write(self.REG_CONFIG, self.config)

        def getBusVoltage_V(self):
            """Return the bus voltage in volts."""
            self._write_calibration()
            # The first read is discarded; only the second value is used.
            self.read(self.REG_BUS_VOLTAGE)
            # The low three bits are not part of the voltage; 4 mV per bit.
            return (self.read(self.REG_BUS_VOLTAGE) >> 3) * 0.004

        def getShuntVoltage_mV(self):
            """Return the (signed) shunt voltage in millivolts."""
            self._write_calibration()
            raw = self._to_signed(self.read(self.REG_SHUNT_VOLTAGE))
            return raw * 0.01

        def getCurrent_mA(self):
            """Return the (signed) current in milliamps."""
            # Re-send calibration first, consistent with getPower_W: the
            # current register is derived from the calibration value.
            self._write_calibration()
            raw = self._to_signed(self.read(self.REG_CURRENT))
            return raw * self._current_lsb

        def getPower_W(self):
            """Return the power in watts."""
            self._write_calibration()
            # The power register is read as unsigned (see INA219 datasheet).
            return self.read(self.REG_POWER) * self._power_lsb


    class UPSMonitor:
        """Polls the INA219, logs readings and stores them in SQLite."""

        def __init__(self, i2c_address, log_interval,
                     battery_warning_threshold,
                     voltage_warning_threshold, log_dir):
            """Set up sensor, log directory, logging, database and signals.

            Args:
                i2c_address: I2C address of the INA219.
                log_interval: Seconds to sleep between samples.
                battery_warning_threshold: Percentage at or below which a
                    low-battery warning is logged.
                voltage_warning_threshold: Bus voltage (V) at or below which
                    a low-voltage warning is logged.
                log_dir: Directory holding the log file and the database.
            """
            self.ina219 = INA219(addr=i2c_address)
            self.log_interval = log_interval
            self.battery_warning_threshold = battery_warning_threshold
            self.voltage_warning_threshold = voltage_warning_threshold
            self.log_dir = Path(log_dir)
            self.running = True

            self.log_dir.mkdir(parents=True, exist_ok=True)
            self._setup_logging()
            self._setup_database()

            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)

        def _setup_logging(self):
            """Log at INFO level to both a file in log_dir and the console."""
            self.logger = logging.getLogger("UPSMonitor")
            self.logger.setLevel(logging.INFO)

            formatter = logging.Formatter(
                "%(asctime)s - %(levelname)s - %(message)s")
            handlers = (
                logging.FileHandler(self.log_dir / "ups_monitor.log"),
                logging.StreamHandler(),
            )
            for handler in handlers:
                handler.setLevel(logging.INFO)
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)

        def _setup_database(self):
            """Open the metrics database and create the table if missing."""
            db_path = self.log_dir / "ups_metrics.db"
            self.conn = sqlite3.connect(str(db_path))
            cursor = self.conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    bus_voltage REAL,
                    shunt_voltage REAL,
                    current REAL,
                    power REAL,
                    battery_percentage REAL
                )
            """)
            self.conn.commit()

        def _signal_handler(self, signum, frame):
            """Close the database and exit on SIGINT/SIGTERM."""
            self.logger.info("Received shutdown signal, cleaning up...")
            self.running = False
            self.conn.close()
            sys.exit(0)

        def _calculate_battery_percentage(self, voltage):
            """Map bus voltage linearly from 6.0-8.4 V onto 0-100 percent."""
            p = (voltage - 6) / 2.4 * 100
            return max(0, min(100, p))

        def _check_thresholds(self, voltage, battery_percentage):
            """Log a warning for each threshold that has been reached."""
            if battery_percentage <= self.battery_warning_threshold:
                self.logger.warning(
                    f"Low battery warning: {battery_percentage:.1f}%")
            if voltage <= self.voltage_warning_threshold:
                self.logger.warning(
                    f"Low voltage warning: {voltage:.2f}V")

        def _store_metrics(self, metrics):
            """Insert one sample; the timestamp column uses its default."""
            cursor = self.conn.cursor()
            cursor.execute("""
                INSERT INTO metrics (
                    bus_voltage, shunt_voltage, current, power,
                    battery_percentage
                ) VALUES (?, ?, ?, ?, ?)
            """, (
                metrics["bus_voltage"],
                metrics["shunt_voltage"],
                metrics["current"],
                metrics["power"],
                metrics["battery_percentage"],
            ))
            self.conn.commit()

        def get_metrics(self):
            """Read the sensor and return one sample as a dict.

            Shunt voltage and current are converted to volts and amps.
            """
            bus_voltage = self.ina219.getBusVoltage_V()
            shunt_voltage = self.ina219.getShuntVoltage_mV() / 1000
            current = self.ina219.getCurrent_mA() / 1000
            power = self.ina219.getPower_W()
            battery_percentage = self._calculate_battery_percentage(
                bus_voltage)

            return {
                "timestamp": datetime.now().isoformat(),
                "bus_voltage": bus_voltage,
                "shunt_voltage": shunt_voltage,
                "current": current,
                "power": power,
                "battery_percentage": battery_percentage,
            }

        def run(self):
            """Sample, store, check and log until the service is stopped."""
            self.logger.info("Starting UPS monitoring service...")

            while self.running:
                try:
                    metrics = self.get_metrics()
                    self._store_metrics(metrics)
                    self._check_thresholds(
                        metrics["bus_voltage"],
                        metrics["battery_percentage"],
                    )
                    self.logger.info(
                        f"Status: {metrics['battery_percentage']:.1f}% | "
                        f"Voltage: {metrics['bus_voltage']:.2f}V | "
                        f"Current: {metrics['current']:.3f}A | "
                        f"Power: {metrics['power']:.2f}W"
                    )
                except Exception as e:
                    # Keep the daemon alive on transient I2C/DB errors.
                    self.logger.error(
                        f"Error in monitoring loop: {str(e)}")
                time.sleep(self.log_interval)


    if __name__ == "__main__":
        monitor = UPSMonitor(
            # The address is always parsed as hexadecimal, e.g. "0x42".
            i2c_address=int(os.getenv("UPS_I2C_ADDRESS", "0x42"), 16),
            log_interval=int(os.getenv("UPS_LOG_INTERVAL", "2")),
            battery_warning_threshold=float(
                os.getenv("UPS_BATTERY_WARNING_THRESHOLD", "20")),
            voltage_warning_threshold=float(
                os.getenv("UPS_VOLTAGE_WARNING_THRESHOLD", "6.5")),
            log_dir=os.getenv("UPS_LOG_DIR", "/var/log/ups_monitor"),
        )
        monitor.run()
  '';

  # Interpreter bundled with the I2C library the script imports.
  pythonEnv = pkgs.python3.withPackages (ps: with ps; [ smbus2 ]);
in
{
  options.services.ups-monitor = {
    enable = mkEnableOption "UPS HAT monitoring service";

    i2cAddress = mkOption {
      type = types.str;
      default = "0x42";
      description = "I2C address of the UPS HAT";
    };

    logInterval = mkOption {
      type = types.int;
      default = 2;
      description = "Interval between log entries in seconds";
    };

    batteryWarningThreshold = mkOption {
      type = types.float;
      default = 20.0;
      description = "Battery percentage threshold for warnings";
    };

    voltageWarningThreshold = mkOption {
      type = types.float;
      default = 6.5;
      description = "Voltage threshold for warnings";
    };

    logDir = mkOption {
      type = types.str;
      default = "/var/log/ups_monitor";
      description = "Directory for storing logs and metrics";
    };

    user = mkOption {
      type = types.str;
      default = "root";
      description = "User to run the service as";
    };

    group = mkOption {
      type = types.str;
      default = "root";
      description = "Group to run the service as";
    };
  };

  config = mkIf cfg.enable {
    systemd.services.ups-monitor = {
      description = "UPS HAT Monitoring Service";
      wantedBy = [ "multi-user.target" ];
      after = [ "network.target" ];

      # The script reads all of its settings from these variables.
      environment = {
        UPS_I2C_ADDRESS = cfg.i2cAddress;
        UPS_LOG_INTERVAL = toString cfg.logInterval;
        UPS_BATTERY_WARNING_THRESHOLD = toString cfg.batteryWarningThreshold;
        UPS_VOLTAGE_WARNING_THRESHOLD = toString cfg.voltageWarningThreshold;
        UPS_LOG_DIR = cfg.logDir;
        PYTHONPATH = "${pythonEnv}/${pythonEnv.python.sitePackages}";
      };

      serviceConfig = {
        ExecStart = "${pythonEnv}/bin/python3 ${pythonScript}";
        User = cfg.user;
        Group = cfg.group;
        Restart = "always";
        RestartSec = "10s";

        # Hardening options
        ProtectSystem = "strict";
        ProtectHome = true;
        PrivateTmp = true;
        ReadWritePaths = [ cfg.logDir ];
        CapabilityBoundingSet = [ "CAP_SYS_RAWIO" ];
        DeviceAllow = [ "/dev/i2c-1 rw" ];
      };
    };

    # Ensure the log directory exists and has correct permissions
    systemd.tmpfiles.rules = [
      "d '${cfg.logDir}' 0750 ${cfg.user} ${cfg.group} -"
    ];

    # Enable I2C
    hardware.i2c.enable = true;
  };
}