This commit is contained in:
mjallen18
2025-01-30 20:39:49 -06:00
parent 3ec513162f
commit 19605b7e63
2 changed files with 315 additions and 0 deletions

View File

@@ -19,6 +19,7 @@ in
./hardware-configuration.nix
./impermanence.nix
./sops.nix
./ups-monitor.nix
../default.nix
];
@@ -56,6 +57,10 @@ in
};
};
services.ups-monitor = {
enable = true;
};
# hardware = {
# raspberry-pi."4".fkms-3d.enable = true;
# raspberry-pi."4".apply-overlays-dtmerge.enable = true;

310
hosts/pi4/ups-monitor.nix Normal file
View File

@@ -0,0 +1,310 @@
# /etc/nixos/modules/ups-monitor/default.nix
{ config, lib, pkgs, ... }:
with lib;
let
cfg = config.services.ups-monitor;
pythonScript = pkgs.writeText "ups_monitor.py" ''
import smbus
import time
import logging
import json
from datetime import datetime
import os
from pathlib import Path
import sqlite3
import signal
import sys
class INA219:
def __init__(self, i2c_bus=1, addr=0x40):
self.bus = smbus.SMBus(i2c_bus)
self.addr = addr
self._cal_value = 0
self._current_lsb = 0
self._power_lsb = 0
self.set_calibration_32V_2A()
def read(self,address):
data = self.bus.read_i2c_block_data(self.addr, address, 2)
return ((data[0] * 256 ) + data[1])
def write(self,address,data):
temp = [0,0]
temp[1] = data & 0xFF
temp[0] =(data & 0xFF00) >> 8
self.bus.write_i2c_block_data(self.addr,address,temp)
def set_calibration_32V_2A(self):
self._current_lsb = .1
self._cal_value = 4096
self._power_lsb = .002
self.write(0x05, self._cal_value)
self.bus_voltage_range = 0x01
self.gain = 0x03
self.bus_adc_resolution = 0x0D
self.shunt_adc_resolution = 0x0D
self.mode = 0x07
self.config = self.bus_voltage_range << 13 | \
self.gain << 11 | \
self.bus_adc_resolution << 7 | \
self.shunt_adc_resolution << 3 | \
self.mode
self.write(0x00, self.config)
def getBusVoltage_V(self):
self.write(0x05, self._cal_value)
self.read(0x02)
return (self.read(0x02) >> 3) * 0.004
def getShuntVoltage_mV(self):
self.write(0x05, self._cal_value)
value = self.read(0x01)
if value > 32767:
value -= 65535
return value * 0.01
def getCurrent_mA(self):
value = self.read(0x04)
if value > 32767:
value -= 65535
return value * self._current_lsb
def getPower_W(self):
self.write(0x05, self._cal_value)
value = self.read(0x03)
if value > 32767:
value -= 65535
return value * self._power_lsb
class UPSMonitor:
def __init__(self, i2c_address, log_interval, battery_warning_threshold,
voltage_warning_threshold, log_dir):
self.ina219 = INA219(addr=i2c_address)
self.log_interval = log_interval
self.battery_warning_threshold = battery_warning_threshold
self.voltage_warning_threshold = voltage_warning_threshold
self.log_dir = Path(log_dir)
self.running = True
self.log_dir.mkdir(parents=True, exist_ok=True)
self._setup_logging()
self._setup_database()
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _setup_logging(self):
self.logger = logging.getLogger('UPSMonitor')
self.logger.setLevel(logging.INFO)
fh = logging.FileHandler(self.log_dir / 'ups_monitor.log')
fh.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
self.logger.addHandler(fh)
self.logger.addHandler(ch)
def _setup_database(self):
db_path = self.log_dir / 'ups_metrics.db'
self.conn = sqlite3.connect(str(db_path))
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS metrics (
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
bus_voltage REAL,
shunt_voltage REAL,
current REAL,
power REAL,
battery_percentage REAL
)
''')
self.conn.commit()
def _signal_handler(self, signum, frame):
self.logger.info("Received shutdown signal, cleaning up...")
self.running = False
self.conn.close()
sys.exit(0)
def _calculate_battery_percentage(self, voltage):
p = (voltage - 6) / 2.4 * 100
return max(0, min(100, p))
def _check_thresholds(self, voltage, battery_percentage):
if battery_percentage <= self.battery_warning_threshold:
self.logger.warning(
f"Low battery warning: {battery_percentage:.1f}%")
if voltage <= self.voltage_warning_threshold:
self.logger.warning(
f"Low voltage warning: {voltage:.2f}V")
def _store_metrics(self, metrics):
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO metrics (
bus_voltage, shunt_voltage, current, power, battery_percentage
) VALUES (?, ?, ?, ?, ?)
''', (
metrics['bus_voltage'],
metrics['shunt_voltage'],
metrics['current'],
metrics['power'],
metrics['battery_percentage']
))
self.conn.commit()
def get_metrics(self):
bus_voltage = self.ina219.getBusVoltage_V()
shunt_voltage = self.ina219.getShuntVoltage_mV() / 1000
current = self.ina219.getCurrent_mA() / 1000
power = self.ina219.getPower_W()
battery_percentage = self._calculate_battery_percentage(bus_voltage)
return {
'timestamp': datetime.now().isoformat(),
'bus_voltage': bus_voltage,
'shunt_voltage': shunt_voltage,
'current': current,
'power': power,
'battery_percentage': battery_percentage
}
def run(self):
self.logger.info("Starting UPS monitoring service...")
while self.running:
try:
metrics = self.get_metrics()
self._store_metrics(metrics)
self._check_thresholds(
metrics['bus_voltage'],
metrics['battery_percentage']
)
self.logger.info(
f"Status: {metrics['battery_percentage']:.1f}% | "
f"Voltage: {metrics['bus_voltage']:.2f}V | "
f"Current: {metrics['current']:.3f}A | "
f"Power: {metrics['power']:.2f}W"
)
time.sleep(self.log_interval)
except Exception as e:
self.logger.error(f"Error in monitoring loop: {str(e)}")
time.sleep(self.log_interval)
if __name__ == '__main__':
monitor = UPSMonitor(
i2c_address=int(os.getenv('UPS_I2C_ADDRESS', '0x42'), 16),
log_interval=int(os.getenv('UPS_LOG_INTERVAL', '2')),
battery_warning_threshold=float(os.getenv('UPS_BATTERY_WARNING_THRESHOLD', '20')),
voltage_warning_threshold=float(os.getenv('UPS_VOLTAGE_WARNING_THRESHOLD', '6.5')),
log_dir=os.getenv('UPS_LOG_DIR', '/var/log/ups_monitor')
)
monitor.run()
'';
pythonEnv = pkgs.python3.withPackages (ps: with ps; [
smbus2
]);
in {
options.services.ups-monitor = {
enable = mkEnableOption "UPS HAT monitoring service";
i2cAddress = mkOption {
type = types.str;
default = "0x42";
description = "I2C address of the UPS HAT";
};
logInterval = mkOption {
type = types.int;
default = 2;
description = "Interval between log entries in seconds";
};
batteryWarningThreshold = mkOption {
type = types.float;
default = 20.0;
description = "Battery percentage threshold for warnings";
};
voltageWarningThreshold = mkOption {
type = types.float;
default = 6.5;
description = "Voltage threshold for warnings";
};
logDir = mkOption {
type = types.str;
default = "/var/log/ups_monitor";
description = "Directory for storing logs and metrics";
};
user = mkOption {
type = types.str;
default = "root";
description = "User to run the service as";
};
group = mkOption {
type = types.str;
default = "root";
description = "Group to run the service as";
};
};
config = mkIf cfg.enable {
systemd.services.ups-monitor = {
description = "UPS HAT Monitoring Service";
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
environment = {
UPS_I2C_ADDRESS = cfg.i2cAddress;
UPS_LOG_INTERVAL = toString cfg.logInterval;
UPS_BATTERY_WARNING_THRESHOLD = toString cfg.batteryWarningThreshold;
UPS_VOLTAGE_WARNING_THRESHOLD = toString cfg.voltageWarningThreshold;
UPS_LOG_DIR = cfg.logDir;
PYTHONPATH = "${pythonEnv}/${pythonEnv.python.sitePackages}";
};
serviceConfig = {
ExecStart = "${pythonEnv}/bin/python3 ${pythonScript}";
User = cfg.user;
Group = cfg.group;
Restart = "always";
RestartSec = "10s";
# Hardening options
ProtectSystem = "strict";
ProtectHome = true;
PrivateTmp = true;
ReadWritePaths = [ cfg.logDir ];
CapabilityBoundingSet = [ "CAP_SYS_RAWIO" ];
DeviceAllow = [ "/dev/i2c-1 rw" ];
};
};
# Ensure the log directory exists and has correct permissions
systemd.tmpfiles.rules = [
"d '${cfg.logDir}' 0750 ${cfg.user} ${cfg.group} -"
];
# Enable I2C
hardware.i2c.enable = true;
};
}