Files
linux_script/awinlib/PrometheusDataExporter.py

123 lines
4.5 KiB
Python

import asyncio
import weakref
import awinlib.cpufantemp as cpufantemp
import awinlib.hdd as hdd
import awinlib.PrometheusDataExporterInterface as interface
import awinlib.tapo
import awinlib.utilities
from awinlib.utilities import log
class HddTempDataExporter(interface.IPrometheusDataExporter):
    """Export one temperature gauge per drive reported by ``hdd.getHddTemp()``."""

    # Sentinel exported when a drive record has no usable "temp" entry.
    DEFAULT_ERROR_HDD_TEMP = -273

    @staticmethod
    def _parseTemperature(rawTemp, hddKey):
        """Convert *rawTemp* to ``int``, falling back to the error sentinel.

        A value that ``int()`` rejects (``None``, non-numeric text, infinity)
        must not abort the export of every other drive, so it is logged and
        replaced by ``DEFAULT_ERROR_HDD_TEMP``.
        """
        try:
            return int(rawTemp)
        except (TypeError, ValueError, OverflowError):
            log(f"Invalid temperature {rawTemp!r} for {hddKey}")
            return HddTempDataExporter.DEFAULT_ERROR_HDD_TEMP

    def export(self):
        """Return a list of gauge dicts (``name``, ``description``, ``value``).

        Drives are emitted in sorted key order. The gauge name is built from
        the drive key with any "/dev/" removed and the record's "TYPE" entry
        (empty string when absent).
        """
        exportData = []
        hddTempDict = hdd.getHddTemp()
        for hddKey in sorted(hddTempDict):
            hddInfo = hddTempDict[hddKey]
            hddName = hddKey.replace("/dev/", "")
            hddType = hddInfo.get("TYPE", "")
            # NOTE(review): earlier code had commented-out reads of the "UUID"
            # and "PARTUUID" entries of the record; they are not exported.
            rawTemp = hddInfo.get("temp", HddTempDataExporter.DEFAULT_ERROR_HDD_TEMP)
            gaugeName = f"{hddName}_{hddType}_celsius"
            exportData.append({
                'name': gaugeName,
                'description': f"HDD temperature of {gaugeName}",
                'value': self._parseTemperature(rawTemp, hddKey),
            })
        return exportData
class CpuFanTempDataExporter(interface.IPrometheusDataExporter):
    """Export one RPM gauge per fan found by ``cpufantemp.getCpuTempAndFanSpeed()``."""

    def export(self):
        """Return a list of gauge dicts (``name``, ``description``, ``value``).

        The list is empty when the reading has no "fan" entry; otherwise fans
        are emitted in sorted key order.
        """
        readings = cpufantemp.getCpuTempAndFanSpeed()
        if "fan" not in readings:
            return []

        fanSpeeds = readings["fan"]
        gauges = []
        for fanName in sorted(fanSpeeds.keys()):
            gaugeName = f"{fanName}_rpm"
            gauges.append(dict(
                name=gaugeName,
                description=f"Fan RPM of {gaugeName}",
                value=fanSpeeds[fanName],
            ))
        return gauges
class KasaPowerStripDataExporter(interface.IPrometheusDataExporter):
    """Export the gauges collected from an ``awinlib.tapo.KasaPowerStrip``."""

    def __init__(self, areaName, ip):
        """Create the power-strip wrapper for *areaName* at address *ip*."""
        self.powerStrip = awinlib.tapo.KasaPowerStrip(areaName, ip)

    def export(self):
        """Return a flat list of gauge dicts (``name``, ``description``, ``value``).

        ``collectData()`` is read as a two-level mapping: an outer key, then
        gauge name, then a record holding "description" and "value". Outer
        keys are visited in sorted order, gauges in the mapping's own order.
        """
        collected = self.powerStrip.collectData()
        return [
            {
                'name': gaugeName,
                'description': record["description"],
                'value': record["value"],
            }
            for outerKey in sorted(collected.keys())
            for gaugeName, record in collected[outerKey].items()
        ]
class TapoT315DataExporter(interface.IPrometheusDataExporter):
    """Export temperature and humidity gauges read from a Tapo T315 device.

    The device is wrapped by ``awinlib.tapo.TplinkT315`` and every call on it
    is driven through ``awinlib.utilities.syncRun`` (presumably the device
    methods are coroutines -- confirm against ``awinlib.tapo``).
    """

    def __init__(self, ip, username, password, devName, devNameInEnglish=""):
        """Connect to the device and register a disconnect finalizer.

        Args:
            ip: Address passed to ``TplinkT315``.
            username: Account name passed to ``TplinkT315``.
            password: Account password passed to ``TplinkT315``.
            devName: Device name passed to ``TplinkT315``.
            devNameInEnglish: Optional prefix for the exported gauge names;
                when empty the gauges are exported without a prefix.

        Raises:
            ConnectionError: If ``connect()`` does not return ``True``.
        """
        # asyncio only defines WindowsSelectorEventLoopPolicy on Windows;
        # referencing it unconditionally raises AttributeError elsewhere.
        selectorPolicy = getattr(asyncio, "WindowsSelectorEventLoopPolicy", None)
        if selectorPolicy is not None:
            asyncio.set_event_loop_policy(selectorPolicy())

        self.devIp = ip
        self.devUsername = username
        self.devPassword = password
        self.devName = devName
        self.devNameInEnglish = devNameInEnglish
        self.devInstance = awinlib.tapo.TplinkT315(ip, username, password, devName)

        if awinlib.utilities.syncRun(self.devInstance.connect()) is not True:
            self.devInstance = None
            raise ConnectionError(f"Failed to connect to T315({ip}, {devName})")

        log(f'Connected to T315({ip}, {devName})')
        # The callback arguments must not reference ``self``: weakref.finalize
        # holds them strongly, which would keep this exporter alive until
        # interpreter exit. Only the device handle is captured.
        self.__finalizer = weakref.finalize(
            self, TapoT315DataExporter._disconnectDevice, self.devInstance
        )

    @staticmethod
    def _disconnectDevice(devInstance):
        """Disconnect *devInstance* if it is set."""
        if devInstance:
            log('Disconnecting from T315')
            awinlib.utilities.syncRun(devInstance.disconnect())

    @staticmethod
    def _del(this):
        """Disconnect the device held by exporter *this* (kept for callers)."""
        TapoT315DataExporter._disconnectDevice(this.devInstance)

    def _gaugeName(self, metricSuffix):
        """Return *metricSuffix*, prefixed by the normalized English name if set.

        ``normalizeGaugename`` is not defined in this block; it is presumably
        provided by the base class or elsewhere in this class -- confirm.
        """
        if self.devNameInEnglish != "":
            return f"{self.normalizeGaugename(self.devNameInEnglish)}_{metricSuffix}"
        return metricSuffix

    def export(self):
        """Return a list of gauge dicts (``name``, ``description``, ``value``).

        Temperature is read first, then humidity. A reading of ``None`` is
        skipped, so the list holds zero, one or two gauges.
        """
        readers = (
            ("temperature_celsius", "Temperature of T315", self.devInstance.getTemperature),
            ("humidity_percent", "Humidity of T315", self.devInstance.getHumidity),
        )
        exportData = []
        for metricSuffix, gaugeDescription, reader in readers:
            gaugeValue = awinlib.utilities.syncRun(reader())
            if gaugeValue is None:
                continue
            exportData.append({
                'name': self._gaugeName(metricSuffix),
                'description': gaugeDescription,
                'value': gaugeValue,
            })
        return exportData