Implement Prometheus data exporters for CPU fan temperature and HDD temperature, along with utility functions for logging and command execution.

This commit is contained in:
2025-02-05 10:47:08 +08:00
parent d34588467b
commit 665a8ea05a
8 changed files with 546 additions and 0 deletions

View File

@@ -0,0 +1,122 @@
import asyncio
import weakref
import awinlib.cpufantemp as cpufantemp
import awinlib.hdd as hdd
import awinlib.PrometheusDataExporterInterface as interface
import awinlib.tapo
import awinlib.utilities
from awinlib.utilities import log
class HddTempDataExporter(interface.IPrometheusDataExporter):
DEFAULT_ERROR_HDD_TEMP = -273
def export(self):
exportData = []
hddTempDict = hdd.getHddTemp()
for hddKey in sorted(hddTempDict.keys()):
hddName = hddKey.replace("/dev/", "")
hddType = hddTempDict[hddKey].get("TYPE", "")
hddTemperature = int(hddTempDict[hddKey].get("temp", HddTempDataExporter.DEFAULT_ERROR_HDD_TEMP))
# uuid = hddTempDict[hddKey].get("UUID", "").replace("-", "_")
# partUuid = hddTempDict[hddKey].get("PARTUUID", "")
gaugeName = f"{hddName}_{hddType}_celsius"
gaugeDescription = f"HDD temperature of {gaugeName}"
exportData.append({
'name': gaugeName,
'description': gaugeDescription,
'value': hddTemperature
})
return exportData
class CpuFanTempDataExporter(interface.IPrometheusDataExporter):
def export(self):
exportData = []
fanTempDict = cpufantemp.getCpuTempAndFanSpeed()
if "fan" not in fanTempDict:
return exportData
for fanKey in sorted(fanTempDict["fan"].keys()):
fanSpeed = fanTempDict["fan"][fanKey]
gaugeName = f"{fanKey}_rpm"
gaugeDescription = f"Fan RPM of {gaugeName}"
exportData.append({
'name': gaugeName,
'description': gaugeDescription,
'value': fanSpeed
})
return exportData
class KasaPowerStripDataExporter(interface.IPrometheusDataExporter):
def __init__(self, areaName, ip):
self.powerStrip = awinlib.tapo.KasaPowerStrip(areaName, ip)
def export(self):
exportData = []
powerStripData = self.powerStrip.collectData()
for keyId in sorted(powerStripData.keys()):
for gaugeName in powerStripData[keyId].keys():
gaugeDesc = powerStripData[keyId][gaugeName]["description"]
gaugeValue = powerStripData[keyId][gaugeName]["value"]
exportData.append({
'name': gaugeName,
'description': gaugeDesc,
'value': gaugeValue
})
return exportData
class TapoT315DataExporter(interface.IPrometheusDataExporter):
def __init__(self, ip, username, password, devName, devNameInEnglish=""):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
self.devIp = ip
self.devUsername = username
self.devPassword = password
self.devName = devName
self.devNameInEnglish = devNameInEnglish
self.devInstance = awinlib.tapo.TplinkT315(ip, username, password, devName)
if awinlib.utilities.syncRun(self.devInstance.connect()) is True:
log(f'Connected to T315({ip}, {devName})')
self.__finalizer = weakref.finalize(self, self._del, self)
else:
self.devInstance = None
raise ConnectionError(f"Failed to connect to T315({ip}, {devName})")
@staticmethod
def _del(this):
if this.devInstance:
log('Disconnecting from T315')
awinlib.utilities.syncRun(this.devInstance.disconnect())
def export(self):
exportData = []
temperatureValue = awinlib.utilities.syncRun(self.devInstance.getTemperature())
if temperatureValue is not None:
gaugeName = f"{self.normalizeGaugename(self.devNameInEnglish)}_temperature_celsius" if self.devNameInEnglish != "" else "temperature_celsius"
gaugeDescription = "Temperature of T315"
exportData.append({
'name': gaugeName,
'description': gaugeDescription,
'value': temperatureValue
})
humidityValue = awinlib.utilities.syncRun(self.devInstance.getHumidity())
if humidityValue is not None:
gaugeName = f"{self.normalizeGaugename(self.devNameInEnglish)}_humidity_percent" if self.devNameInEnglish != "" else "humidity_percent"
gaugeDescription = "Humidity of T315"
exportData.append({
'name': gaugeName,
'description': gaugeDescription,
'value': humidityValue
})
return exportData

View File

@@ -0,0 +1,10 @@
import abc
class IPrometheusDataExporter:
@abc.abstractmethod
def export(self):
pass
def normalizeGaugename(self, originalName):
return originalName.replace("-", "_").lower()

15
awinlib/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
import logging
from rich.logging import RichHandler
LOGFORMAT_RICH = '%(thread)d %(message)s'
richHandler = RichHandler(
rich_tracebacks=True,
)
richHandler.setFormatter(logging.Formatter(LOGFORMAT_RICH))
logging.basicConfig(
level=logging.INFO,
handlers=[
richHandler,
]
)

54
awinlib/cpufantemp.py Normal file
View File

@@ -0,0 +1,54 @@
#!/usr/bin/python3
import json
import os
from .utilities import runCmd
def getCpuTempAndFanSpeed():
if os.name == "nt":
return {}
cmd = "sensors"
ret = runCmd(cmd).split("\n")
fanTempDict = {
"cpu": {},
"fan": {}
}
for line in ret:
if line.startswith("Core"):
line = line.split(":")
if len(line) > 1:
cpuIndex, tempString = line[0], line[1]
tempString = tempString.split(" ")
tempString = [x for x in tempString if x != ""]
if len(tempString) > 1:
temp = float(tempString[0].replace("+", "").replace("°C", ""))
else:
temp = -999.9
fanTempDict["cpu"][cpuIndex] = temp
elif line.startswith("fan"):
line = line.split(":")
if len(line) > 1:
fanId, fanNumber = line[0], line[1]
fanNumber = fanNumber.split(" ")
fanNumber = [x for x in fanNumber if x != ""]
if len(fanNumber) > 1:
fanSpeed = int(fanNumber[0])
else:
fanSpeed = 0
if fanSpeed > 0:
fanTempDict["fan"][fanId] = fanSpeed
return fanTempDict
def main():
fanTempDict = getCpuTempAndFanSpeed()
print(f"{json.dumps(fanTempDict, indent=4)}")
if __name__ == "__main__":
main()

49
awinlib/hdd.py Normal file
View File

@@ -0,0 +1,49 @@
import os
from .utilities import runCmd
def findHddIdInNt():
return {}
def findHddIdInPosix():
hddDict = {}
results = runCmd("blkid").split("\n")
for line in results:
if line == "":
continue
hddInfoList = line.split(" ")
if len(hddInfoList) > 1 and hddInfoList[0].startswith("/dev/sd"):
hddName = hddInfoList[0].replace(":", "")
hddDict[hddName] = {}
for hddInfoline in hddInfoList[1:]:
kvList = hddInfoline.split("=")
if len(kvList) > 1:
hddDict[hddName][kvList[0].replace("\"", "")] = kvList[1].replace("\"", "")
return hddDict
def findHddId():
hddDict = {}
if os.name == "nt":
hddDict = findHddIdInNt()
elif os.name == "posix":
hddDict = findHddIdInPosix()
return hddDict
def findHddTemp(hddDict):
for hddName in sorted(hddDict.keys()):
cmd = f"smartctl -A {hddName} | grep Temperature_Celsius"
hddTempString = runCmd(cmd)
hddTempList = hddTempString.split(" ")
hddTempList = [item for item in hddTempList if item != ""]
if len(hddTempList) > 9:
hddDict[hddName]["temp"] = int(hddTempList[9])
def getHddTemp():
hddDict = findHddId()
findHddTemp(hddDict)
return hddDict

132
awinlib/tapo.py Normal file
View File

@@ -0,0 +1,132 @@
from kasa import Discover
from kasa.exceptions import KasaException
from . import KasaSmartPowerStrip
from .utilities import log as log
class KasaPowerStrip:
def __init__(self, areaName, ip):
self.areaName = areaName
self.ip = ip
try:
self.powerStrip = KasaSmartPowerStrip.SmartPowerStrip(ip)
except:
self.powerStrip = None
def collectData(self):
dataDict = {}
devSysInfo = self.powerStrip.get_system_info()
for c in devSysInfo['system']['get_sysinfo']['children']:
try:
cid = int(c['id']) + 1
except:
continue
# aliasName = c['alias'].encode('latin1').decode('UTF-8')
gaugeName_Current_ma = f"{self.areaName}_{cid}_current_ma"
gaugeName_Current_ma_desc = f"Current(milliampere) of {self.areaName}:{cid}"
gaugeName_Voltage_mv = f"{self.areaName}_{cid}_voltage_mv"
gaugeName_Voltage_mv_desc = f"Voltage(millivolt) of {self.areaName}:{cid}"
gaugeName_Power_mw = f"{self.areaName}_{cid}_power_mw"
gaugeName_Power_mw_desc = f"Power(milliwatt) of {self.areaName}:{cid}"
gaugeName_TotalWh = f"{self.areaName}_{cid}_total_wh"
gaugeName_TotalWh_desc = f"Total watt-hour of {self.areaName}:{cid}"
## Get Data
realtimeInfo = self.powerStrip.get_realtime_energy_info(plug_num=cid)
realTime_current_ma = realtimeInfo.get("current_ma", 0)
realTime_voltage_mv = realtimeInfo.get("voltage_mv", 0)
realTime_power_mw = realtimeInfo.get("power_mw", 0)
realTime_total_wh = realtimeInfo.get("total_wh", 0)
# log(f"Set {gaugeName_Current_ma}: {realTime_current_ma}")
# log(f"Set {gaugeName_Voltage_mv}: {realTime_voltage_mv}")
# log(f"Set {gaugeName_Power_mw}: {realTime_power_mw}")
# log(f"Set {gaugeName_TotalWh}: {realTime_total_wh}")
dataDict[cid] = {
gaugeName_Current_ma: {
"description": gaugeName_Current_ma_desc,
"value": realTime_current_ma,
},
gaugeName_Voltage_mv: {
"description": gaugeName_Voltage_mv_desc,
"value": realTime_voltage_mv,
},
gaugeName_Power_mw: {
"description": gaugeName_Power_mw_desc,
"value": realTime_power_mw,
},
gaugeName_TotalWh: {
"description": gaugeName_TotalWh_desc,
"value": realTime_total_wh,
}
}
return dataDict
class TplinkT315:
def __init__(self, ip, username, password, devName):
self.ip = ip
self.username = username
self.password = password
self.devName = devName
self.hubDev = None
self.t315Dev = None
async def connect(self):
try:
self.hubDev = await Discover.discover_single(self.ip, username=self.username, password=self.password)
await self.hubDev.update()
except KasaException as e:
self.hubDev = None
log(f"Hub Error: {e}")
return False
try:
self.t315Dev = self.hubDev.get_plug_by_name(self.devName)
await self.t315Dev.update()
except KasaException as e:
self.t315Dev = None
log(f"Device Error: {e}")
return False
return True
async def getTemperature(self):
temperatureValue = await self.getProperty("temperature")
if temperatureValue is None:
log("temperatureValue is None")
return None
temperatureValue = float(temperatureValue)
return temperatureValue
async def getHumidity(self):
humidityValue = await self.getProperty("humidity")
if humidityValue is None:
log("humidityValue is None")
return None
humidityValue = float(humidityValue)
return humidityValue
async def getProperty(self, propName):
if self.t315Dev is None:
return None
await self.hubDev.update()
await self.t315Dev.update()
featuresOfTempDev = self.t315Dev.features
if propName not in featuresOfTempDev:
return None
return featuresOfTempDev[propName].value
async def disconnect(self):
if self.hubDev is None:
return
await self.hubDev.disconnect()

77
awinlib/utilities.py Normal file
View File

@@ -0,0 +1,77 @@
import asyncio
import datetime
import inspect
import json
import logging
import subprocess
from rich import print as pp
def runCmd(cmdString):
result = subprocess.run(cmdString, stdout=subprocess.PIPE, shell=True).stdout.decode('utf-8')
return result
def syncRun(coroutine):
loop = asyncio.get_event_loop()
# coroutine = funcInstace()
return loop.run_until_complete(coroutine)
def datetimeNow():
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def loadJson(fileanme):
try:
with open(fileanme, "r") as f:
secrets = json.load(f)
return secrets
except FileNotFoundError:
log(f"File not found: {fileanme}")
return None
def log2(message, *args):
frame = inspect.currentframe().f_back
class_name = frame.f_locals.get('self', '').__class__.__name__
func_name = frame.f_code.co_name
line_no = frame.f_lineno
caller = f"{class_name}::{func_name},{line_no}"
prefix = f"[{datetimeNow()}|{caller}]"
pp(f"{prefix} {message}", end='')
for arg in args:
pp(arg, end='')
pp("")
def log(message, *args):
log = logging.getLogger()
log.info(message)
def logD(message, *args):
log = logging.getLogger()
log.debug(message)
def logI(message, *args):
log = logging.getLogger()
log.info(message)
def logW(message, *args):
log = logging.getLogger()
log.warning(message)
def logE(message, *args):
log = logging.getLogger()
log.error(message)
def logC(message, *args):
log = logging.getLogger()
log.critical(message)

View File

@@ -0,0 +1,87 @@
import argparse
import time
import schedule
from prometheus_client import Gauge, start_http_server
from awinlib.PrometheusDataExporter import (
CpuFanTempDataExporter,
HddTempDataExporter,
KasaPowerStripDataExporter,
)
from awinlib.utilities import datetimeNow, loadJson, log
ENABLE_EXPORTER = True
DEFAULT_PORT = 8089
DEFAULT_POLLING_INTERVAL = 60
DEFAULT_SECRET_FILEPATH = "~/.secret"
DEFAULT_CONFIGS = {
"port": DEFAULT_PORT,
"polling_interval": DEFAULT_POLLING_INTERVAL,
"secret_filepath": DEFAULT_SECRET_FILEPATH
}
def job_5s(gaugeDict, exporters):
exportData(gaugeDict, exporters)
def job_60s(gaugeDict, exporters):
exportData(gaugeDict, exporters)
def exportData(gaugeDict, exporters):
for exporter in exporters:
data = exporter.export()
for d in data:
gaugeName = d["name"]
gaugeDesc = d["description"]
gaugeValue = d["value"]
log(f"Set {gaugeName}: {gaugeValue}")
if gaugeName not in gaugeDict:
gaugeDict[gaugeName] = Gauge(gaugeName, gaugeDesc)
if ENABLE_EXPORTER:
gaugeDict[gaugeName].set(gaugeValue)
def main(configs):
log(f"Start: {datetimeNow()}")
secrect = loadJson(configs.get("secret_filepath", DEFAULT_POLLING_INTERVAL))
if secrect is None:
log(f'Cannot read secret file({configs.get("secret_filepath", DEFAULT_POLLING_INTERVAL)})')
return
hddTempExporter = HddTempDataExporter()
cpuFanTempExporter = CpuFanTempDataExporter()
powerStripLivingroomExporter = KasaPowerStripDataExporter("livingroom", "192.168.1.203")
powerStripAtticExporter = KasaPowerStripDataExporter("attic", "192.168.1.203")
t315Atticexporter = TapoT315DataExporter(secrect["ip"], secrect["username"], secrect["password"], "鐵皮屋-溫濕度感測器", "attic-sensors")
gaugeDict = {}
schedule.every( 1).seconds.do(exportData, gaugeDict=gaugeDict, exporters=(hddTempExporter, cpuFanTempExporter, powerStripLivingroomExporter, powerStripAtticExporter))
schedule.every(5).seconds.do(exportData, gaugeDict=gaugeDict, exporters=(t315Atticexporter,))
## Start Prometheus server
prometheusServerPort = configs.get("port", DEFAULT_PORT)
log(f"Start Prometheus server on port {prometheusServerPort}")
start_http_server(prometheusServerPort) if ENABLE_EXPORTER else None
while True:
try:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
log("User stopped process.")
break
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="The port of exportor")
parser.add_argument("-i", "--polling_interval", type=int, default=DEFAULT_POLLING_INTERVAL, help="Polling interval for collect HDD temperature")
parser.add_argument("-f", "--secrect_file", default=".secret", help="The file that contains the secrets")
args = parser.parse_args()
DEFAULT_CONFIGS["port"] = args.port
DEFAULT_CONFIGS["polling_interval"] = args.polling_interval
DEFAULT_CONFIGS["secret_filepath"] = args.secrect_file
main(DEFAULT_CONFIGS)