diff --git a/hosts/pi4/configuration.nix b/hosts/pi4/configuration.nix index 3f52c60..bf5dd3d 100644 --- a/hosts/pi4/configuration.nix +++ b/hosts/pi4/configuration.nix @@ -19,6 +19,7 @@ in ./hardware-configuration.nix ./impermanence.nix ./sops.nix + ./ups-monitor.nix ../default.nix ]; @@ -56,6 +57,10 @@ in }; }; + services.ups-monitor = { + enable = true; + }; + # hardware = { # raspberry-pi."4".fkms-3d.enable = true; # raspberry-pi."4".apply-overlays-dtmerge.enable = true; diff --git a/hosts/pi4/ups-monitor.nix b/hosts/pi4/ups-monitor.nix new file mode 100644 index 0000000..7a25430 --- /dev/null +++ b/hosts/pi4/ups-monitor.nix @@ -0,0 +1,310 @@ +# /etc/nixos/modules/ups-monitor/default.nix +{ config, lib, pkgs, ... }: + +with lib; + +let + cfg = config.services.ups-monitor; + + pythonScript = pkgs.writeText "ups_monitor.py" '' + import smbus + import time + import logging + import json + from datetime import datetime + import os + from pathlib import Path + import sqlite3 + import signal + import sys + + class INA219: + def __init__(self, i2c_bus=1, addr=0x40): + self.bus = smbus.SMBus(i2c_bus) + self.addr = addr + self._cal_value = 0 + self._current_lsb = 0 + self._power_lsb = 0 + self.set_calibration_32V_2A() + + def read(self,address): + data = self.bus.read_i2c_block_data(self.addr, address, 2) + return ((data[0] * 256 ) + data[1]) + + def write(self,address,data): + temp = [0,0] + temp[1] = data & 0xFF + temp[0] =(data & 0xFF00) >> 8 + self.bus.write_i2c_block_data(self.addr,address,temp) + + def set_calibration_32V_2A(self): + self._current_lsb = .1 + self._cal_value = 4096 + self._power_lsb = .002 + self.write(0x05, self._cal_value) + self.bus_voltage_range = 0x01 + self.gain = 0x03 + self.bus_adc_resolution = 0x0D + self.shunt_adc_resolution = 0x0D + self.mode = 0x07 + self.config = self.bus_voltage_range << 13 | \ + self.gain << 11 | \ + self.bus_adc_resolution << 7 | \ + self.shunt_adc_resolution << 3 | \ + self.mode + self.write(0x00, self.config) + + def getBusVoltage_V(self): + self.write(0x05, self._cal_value) + self.read(0x02) + return (self.read(0x02) >> 3) * 0.004 + + def getShuntVoltage_mV(self): + self.write(0x05, self._cal_value) + value = self.read(0x01) + if value > 32767: + value -= 65535 + return value * 0.01 + + def getCurrent_mA(self): + value = self.read(0x04) + if value > 32767: + value -= 65535 + return value * self._current_lsb + + def getPower_W(self): + self.write(0x05, self._cal_value) + value = self.read(0x03) + if value > 32767: + value -= 65535 + return value * self._power_lsb + + class UPSMonitor: + def __init__(self, i2c_address, log_interval, battery_warning_threshold, + voltage_warning_threshold, log_dir): + self.ina219 = INA219(addr=i2c_address) + self.log_interval = log_interval + self.battery_warning_threshold = battery_warning_threshold + self.voltage_warning_threshold = voltage_warning_threshold + self.log_dir = Path(log_dir) + self.running = True + + self.log_dir.mkdir(parents=True, exist_ok=True) + self._setup_logging() + self._setup_database() + + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + def _setup_logging(self): + self.logger = logging.getLogger('UPSMonitor') + self.logger.setLevel(logging.INFO) + + fh = logging.FileHandler(self.log_dir / 'ups_monitor.log') + fh.setLevel(logging.INFO) + + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + ch.setFormatter(formatter) + + self.logger.addHandler(fh) + self.logger.addHandler(ch) + + def _setup_database(self): + db_path = self.log_dir / 'ups_metrics.db' + self.conn = sqlite3.connect(str(db_path)) + cursor = self.conn.cursor() + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS metrics ( + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + bus_voltage REAL, + shunt_voltage REAL, + current REAL, + power REAL, + battery_percentage REAL + ) + ''') + self.conn.commit() + + def _signal_handler(self, signum, frame): + self.logger.info("Received shutdown signal, cleaning up...") + self.running = False + self.conn.close() + sys.exit(0) + + def _calculate_battery_percentage(self, voltage): + p = (voltage - 6) / 2.4 * 100 + return max(0, min(100, p)) + + def _check_thresholds(self, voltage, battery_percentage): + if battery_percentage <= self.battery_warning_threshold: + self.logger.warning( + f"Low battery warning: {battery_percentage:.1f}%") + + if voltage <= self.voltage_warning_threshold: + self.logger.warning( + f"Low voltage warning: {voltage:.2f}V") + + def _store_metrics(self, metrics): + cursor = self.conn.cursor() + cursor.execute(''' + INSERT INTO metrics ( + bus_voltage, shunt_voltage, current, power, battery_percentage + ) VALUES (?, ?, ?, ?, ?) + ''', ( + metrics['bus_voltage'], + metrics['shunt_voltage'], + metrics['current'], + metrics['power'], + metrics['battery_percentage'] + )) + self.conn.commit() + + def get_metrics(self): + bus_voltage = self.ina219.getBusVoltage_V() + shunt_voltage = self.ina219.getShuntVoltage_mV() / 1000 + current = self.ina219.getCurrent_mA() / 1000 + power = self.ina219.getPower_W() + battery_percentage = self._calculate_battery_percentage(bus_voltage) + + return { + 'timestamp': datetime.now().isoformat(), + 'bus_voltage': bus_voltage, + 'shunt_voltage': shunt_voltage, + 'current': current, + 'power': power, + 'battery_percentage': battery_percentage + } + + def run(self): + self.logger.info("Starting UPS monitoring service...") + + while self.running: + try: + metrics = self.get_metrics() + self._store_metrics(metrics) + self._check_thresholds( + metrics['bus_voltage'], + metrics['battery_percentage'] + ) + + self.logger.info( + f"Status: {metrics['battery_percentage']:.1f}% | " + f"Voltage: {metrics['bus_voltage']:.2f}V | " + f"Current: {metrics['current']:.3f}A | " + f"Power: {metrics['power']:.2f}W" + ) + + time.sleep(self.log_interval) + + except Exception as e: + self.logger.error(f"Error in monitoring loop: {str(e)}") + time.sleep(self.log_interval) + + if __name__ == '__main__': + monitor = UPSMonitor( + i2c_address=int(os.getenv('UPS_I2C_ADDRESS', '0x42'), 16), + log_interval=int(os.getenv('UPS_LOG_INTERVAL', '2')), + battery_warning_threshold=float(os.getenv('UPS_BATTERY_WARNING_THRESHOLD', '20')), + voltage_warning_threshold=float(os.getenv('UPS_VOLTAGE_WARNING_THRESHOLD', '6.5')), + log_dir=os.getenv('UPS_LOG_DIR', '/var/log/ups_monitor') + ) + monitor.run() + ''; + + pythonEnv = pkgs.python3.withPackages (ps: with ps; [ + smbus2 + ]); + +in { + options.services.ups-monitor = { + enable = mkEnableOption "UPS HAT monitoring service"; + + i2cAddress = mkOption { + type = types.str; + default = "0x42"; + description = "I2C address of the UPS HAT"; + }; + + logInterval = mkOption { + type = types.int; + default = 2; + description = "Interval between log entries in seconds"; + }; + + batteryWarningThreshold = mkOption { + type = types.float; + default = 20.0; + description = "Battery percentage threshold for warnings"; + }; + + voltageWarningThreshold = mkOption { + type = types.float; + default = 6.5; + description = "Voltage threshold for warnings"; + }; + + logDir = mkOption { + type = types.str; + default = "/var/log/ups_monitor"; + description = "Directory for storing logs and metrics"; + }; + + user = mkOption { + type = types.str; + default = "root"; + description = "User to run the service as"; + }; + + group = mkOption { + type = types.str; + default = "root"; + description = "Group to run the service as"; + }; + }; + + config = mkIf cfg.enable { + systemd.services.ups-monitor = { + description = "UPS HAT Monitoring Service"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + + environment = { + UPS_I2C_ADDRESS = cfg.i2cAddress; + UPS_LOG_INTERVAL = toString cfg.logInterval; + UPS_BATTERY_WARNING_THRESHOLD = toString cfg.batteryWarningThreshold; + UPS_VOLTAGE_WARNING_THRESHOLD = toString cfg.voltageWarningThreshold; + UPS_LOG_DIR = cfg.logDir; + PYTHONPATH = "${pythonEnv}/${pythonEnv.python.sitePackages}"; + }; + + serviceConfig = { + ExecStart = "${pythonEnv}/bin/python3 ${pythonScript}"; + User = cfg.user; + Group = cfg.group; + Restart = "always"; + RestartSec = "10s"; + + # Hardening options + ProtectSystem = "strict"; + ProtectHome = true; + PrivateTmp = true; + ReadWritePaths = [ cfg.logDir ]; + CapabilityBoundingSet = [ "CAP_SYS_RAWIO" ]; + DeviceAllow = [ "/dev/i2c-1 rw" ]; + }; + }; + + # Ensure the log directory exists and has correct permissions + systemd.tmpfiles.rules = [ + "d '${cfg.logDir}' 0750 ${cfg.user} ${cfg.group} -" + ]; + + # Enable I2C + hardware.i2c.enable = true; + }; +} \ No newline at end of file