"""Prometheus data exporters for HDD temperature, fan speed and TP-Link devices.

Every exporter implements ``interface.IPrometheusDataExporter`` and exposes an
``export()`` method returning a list of gauge dictionaries with the keys
``'name'``, ``'description'`` and ``'value'``.
"""

import asyncio
import weakref

import awinlib.cpufantemp as cpufantemp
import awinlib.hdd as hdd
import awinlib.PrometheusDataExporterInterface as interface
import awinlib.tapo
import awinlib.utilities
from awinlib.utilities import log


class HddTempDataExporter(interface.IPrometheusDataExporter):
    """Export one temperature gauge per entry reported by ``hdd.getHddTemp()``."""

    # Value reported when an entry carries no "temp" field.
    DEFAULT_ERROR_HDD_TEMP = -273

    def export(self):
        """Return a list of gauge dicts, ordered by sorted device key.

        Each gauge is named ``<device>_<TYPE>_celsius`` where ``<device>`` is
        the device key with any "/dev/" substring removed.
        """
        exportData = []
        hddTempDict = hdd.getHddTemp()
        for hddKey in sorted(hddTempDict):
            hddInfo = hddTempDict[hddKey]
            hddName = hddKey.replace("/dev/", "")
            hddType = hddInfo.get("TYPE", "")
            hddTemperature = int(
                hddInfo.get("temp", HddTempDataExporter.DEFAULT_ERROR_HDD_TEMP)
            )
            # uuid = hddTempDict[hddKey].get("UUID", "").replace("-", "_")
            # partUuid = hddTempDict[hddKey].get("PARTUUID", "")
            gaugeName = f"{hddName}_{hddType}_celsius"
            gaugeDescription = f"HDD temperature of {gaugeName}"
            exportData.append({
                'name': gaugeName,
                'description': gaugeDescription,
                'value': hddTemperature,
            })
        return exportData


class CpuFanTempDataExporter(interface.IPrometheusDataExporter):
    """Export one RPM gauge per fan reported by ``cpufantemp``."""

    def export(self):
        """Return a list of ``<fanKey>_rpm`` gauges, ordered by sorted fan key.

        Returns an empty list when the collected data has no "fan" entry.
        """
        exportData = []
        fanTempDict = cpufantemp.getCpuTempAndFanSpeed()
        if "fan" not in fanTempDict:
            return exportData

        fanDict = fanTempDict["fan"]
        for fanKey in sorted(fanDict):
            gaugeName = f"{fanKey}_rpm"
            gaugeDescription = f"Fan RPM of {gaugeName}"
            exportData.append({
                'name': gaugeName,
                'description': gaugeDescription,
                'value': fanDict[fanKey],
            })
        return exportData


class KasaPowerStripDataExporter(interface.IPrometheusDataExporter):
    """Export the gauges collected from an ``awinlib.tapo.KasaPowerStrip``."""

    def __init__(self, areaName, ip):
        """Create the underlying power strip object for ``areaName`` at ``ip``."""
        self.powerStrip = awinlib.tapo.KasaPowerStrip(areaName, ip)

    def export(self):
        """Return the collected gauges as a flat list.

        The collected data is walked as a two-level mapping
        ``{keyId: {gaugeName: {"description": ..., "value": ...}}}``; the
        outer keys are visited in sorted order.
        """
        exportData = []
        powerStripData = self.powerStrip.collectData()
        for keyId in sorted(powerStripData):
            for gaugeName, gaugeInfo in powerStripData[keyId].items():
                exportData.append({
                    'name': gaugeName,
                    'description': gaugeInfo["description"],
                    'value': gaugeInfo["value"],
                })
        return exportData


class TapoT315DataExporter(interface.IPrometheusDataExporter):
    """Export temperature and humidity gauges read from a T315 device."""

    def __init__(self, ip, username, password, devName, devNameInEnglish=""):
        """Connect to the device and register a disconnect finalizer.

        Raises:
            ConnectionError: if the connection attempt does not return True.
        """
        self.devIp = ip
        self.devUsername = username
        self.devPassword = password
        self.devName = devName
        self.devNameInEnglish = devNameInEnglish
        self.devInstance = awinlib.tapo.TplinkT315(ip, username, password, devName)
        if awinlib.utilities.syncRun(self.devInstance.connect()) is True:
            log(f'Connected to T315({ip}, {devName})')
            # The finalizer keeps strong references to its callback arguments.
            # Passing `self` here would keep this exporter alive forever, so
            # the disconnect would only ever happen at interpreter exit. Hand
            # over just the device object instead.
            # NOTE(review): assumes devInstance holds no reference back to
            # this exporter - confirm against awinlib.tapo.TplinkT315.
            self.__finalizer = weakref.finalize(
                self, type(self)._disconnectDevice, self.devInstance
            )
        else:
            self.devInstance = None
            raise ConnectionError(f"Failed to connect to T315({ip}, {devName})")

    @staticmethod
    def _disconnectDevice(devInstance):
        """Disconnect ``devInstance`` if it is truthy; otherwise do nothing."""
        if devInstance:
            log('Disconnecting from T315')
            awinlib.utilities.syncRun(devInstance.disconnect())

    @staticmethod
    def _del(this):
        """Disconnect the device held by ``this``.

        Kept for backward compatibility with callers that pass an exporter.
        """
        TapoT315DataExporter._disconnectDevice(this.devInstance)

    def _gaugeName(self, suffix):
        """Return ``suffix``, prefixed with the normalized English name if set."""
        if self.devNameInEnglish != "":
            return f"{self.normalizeGaugename(self.devNameInEnglish)}_{suffix}"
        return suffix

    def export(self):
        """Return the temperature and humidity gauges.

        A reading that comes back as None is left out of the result.
        """
        exportData = []

        temperatureValue = awinlib.utilities.syncRun(self.devInstance.getTemperature())
        if temperatureValue is not None:
            exportData.append({
                'name': self._gaugeName("temperature_celsius"),
                'description': "Temperature of T315",
                'value': temperatureValue,
            })

        humidityValue = awinlib.utilities.syncRun(self.devInstance.getHumidity())
        if humidityValue is not None:
            exportData.append({
                'name': self._gaugeName("humidity_percent"),
                'description': "Humidity of T315",
                'value': humidityValue,
            })

        return exportData