#!/usr/bin/python3
"""Prometheus exporter for HDD temperatures and fan speeds.

Periodically reads drive temperatures (via ``blkid`` and ``smartctl``) and
fan speeds (via the ``cpufantemp`` module), publishes each reading as a
Prometheus gauge, and echoes the readings to stdout.
"""
import time
import argparse
import subprocess
import socket

import cpufantemp
from prometheus_client import start_http_server, Gauge

# Value reported for a drive whose temperature could not be read.
DEFAULT_ERROR_HDD_TEMP = -273
DEFAULT_PORT = 8087
DEFAULT_POLLING_INTERVAL = 5
DEFAULT_CONFIGS = {
    "port": DEFAULT_PORT,
    "polling_interval": DEFAULT_POLLING_INTERVAL
}

# Index of the whitespace-separated field of the smartctl attribute line that
# is parsed as the temperature.
# NOTE(review): presumably the RAW_VALUE column of `smartctl -A` - confirm.
_SMARTCTL_TEMP_FIELD = 9


def runCmd(cmdString):
    """Run ``cmdString`` through the shell and return its stdout as text.

    Only stdout is captured and the exit status is not checked, so a failing
    command simply yields whatever it wrote to stdout (possibly nothing).

    Args:
        cmdString: Shell command line to execute.

    Returns:
        The command's standard output decoded as UTF-8.
    """
    result = subprocess.run(
        cmdString, stdout=subprocess.PIPE, shell=True
    ).stdout.decode('utf-8')
    return result


def findHddId():
    """Collect ``blkid`` attributes for every ``/dev/sd*`` device.

    Returns:
        A dict mapping the device path (trailing ``:`` removed) to a dict of
        the ``KEY=value`` attributes printed on that device's ``blkid`` line,
        with double quotes stripped from keys and values.
    """
    hddDict = {}
    for line in runCmd("blkid").split("\n"):
        if line == "":
            continue
        hddInfoList = line.split(" ")
        if len(hddInfoList) <= 1 or not hddInfoList[0].startswith("/dev/sd"):
            continue
        hddName = hddInfoList[0].replace(":", "")
        hddDict[hddName] = {}
        for hddInfoline in hddInfoList[1:]:
            # partition() splits on the first "=" only, so a value that itself
            # contains "=" is kept whole instead of being truncated.
            key, separator, value = hddInfoline.partition("=")
            if separator:
                hddDict[hddName][key.replace("\"", "")] = value.replace("\"", "")
    return hddDict


def findHddTemp(hddDict):
    """Add a ``"temp"`` entry to each device in ``hddDict`` when readable.

    Runs ``smartctl -A`` for every device and parses the
    ``Temperature_Celsius`` line. Devices without a parsable reading are left
    without a ``"temp"`` key.

    Args:
        hddDict: Dict keyed by device path, as returned by ``findHddId``;
            modified in place.
    """
    for hddName in sorted(hddDict.keys()):
        cmd = f"smartctl -A {hddName} | grep Temperature_Celsius"
        # str.split() with no argument drops the empty strings produced by
        # runs of whitespace.
        hddTempList = runCmd(cmd).split()
        if len(hddTempList) <= _SMARTCTL_TEMP_FIELD:
            continue
        try:
            hddDict[hddName]["temp"] = int(hddTempList[_SMARTCTL_TEMP_FIELD])
        except ValueError:
            # Non-numeric field: leave "temp" unset so callers fall back to
            # their default instead of the exporter crashing.
            continue


def getHddTemp():
    """Return the ``blkid`` device dict enriched with ``"temp"`` readings."""
    hddDict = findHddId()
    findHddTemp(hddDict)
    return hddDict


def _getOrCreateGauge(gaugeDict, gaugeName, gaugeDescription):
    """Return the gauge cached under ``gaugeName``, creating it if missing."""
    gaugeObj = gaugeDict.get(gaugeName, None)
    if gaugeObj is None:
        gaugeObj = Gauge(gaugeName, gaugeDescription)
        gaugeDict[gaugeName] = gaugeObj
    return gaugeObj


def collectHddTemp(gaugeDict):
    """Publish the current temperature of every detected drive.

    One gauge named ``<device>_<TYPE>_celsius`` is used per device; a device
    with no temperature reading reports ``DEFAULT_ERROR_HDD_TEMP``. Each
    reading is also printed to stdout.

    Args:
        gaugeDict: Cache of gauges keyed by gauge name; new gauges are added
            to it in place.
    """
    hddTempDict = getHddTemp()
    for hddKey in sorted(hddTempDict.keys()):
        hddInfo = hddTempDict[hddKey]
        hddName = hddKey.replace("/dev/", "")
        hddType = hddInfo.get("TYPE", "")
        hddTemperature = int(hddInfo.get("temp", DEFAULT_ERROR_HDD_TEMP))

        gaugeName = f"{hddName}_{hddType}_celsius"
        gaugeDescription = f"HDD temperature of {gaugeName}"

        gaugeObj = _getOrCreateGauge(gaugeDict, gaugeName, gaugeDescription)
        gaugeObj.set(hddTemperature)
        print(f"{gaugeName} {hddTemperature}°C")
    print("")


def collectFanSpeed(gaugeDict):
    """Publish the current speed of every fan reported by ``cpufantemp``.

    Does nothing when the reading has no ``"fan"`` entry. One gauge named
    ``<fan>_rpm`` is used per fan, and each reading is printed to stdout.

    Args:
        gaugeDict: Cache of gauges keyed by gauge name; new gauges are added
            to it in place.
    """
    fanTempDict = cpufantemp.getCpuTempAndFanSpeed()
    if "fan" not in fanTempDict:
        return

    for fanKey in sorted(fanTempDict["fan"].keys()):
        fanSpeed = fanTempDict["fan"][fanKey]
        gaugeName = f"{fanKey}_rpm"
        gaugeDescription = f"Fan RPM of {gaugeName}"

        gaugeObj = _getOrCreateGauge(gaugeDict, gaugeName, gaugeDescription)
        gaugeObj.set(fanSpeed)
        print(f"{gaugeName}{fanSpeed:>5} RPM")
    print("")


def main(configs):
    """Start the metrics HTTP server and poll until interrupted.

    Args:
        configs: Dict with optional ``"port"`` and ``"polling_interval"``
            (seconds) entries; missing entries fall back to the module
            defaults.
    """
    gaugeDict = {}
    pollingInterval = configs.get("polling_interval", DEFAULT_POLLING_INTERVAL)

    start_http_server(configs.get("port", DEFAULT_PORT))
    while True:
        try:
            collectHddTemp(gaugeDict)
            collectFanSpeed(gaugeDict)
            time.sleep(pollingInterval)
        except KeyboardInterrupt:
            print("\nUser stopped process.")
            break


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
                        help="The port of exportor")
    # type=float is required: without it a value given on the command line
    # stays a str and time.sleep() raises TypeError.
    parser.add_argument("-i", "--polling_interval", type=float,
                        default=DEFAULT_POLLING_INTERVAL,
                        help="Polling interval for collect HDD temperature")
    args = parser.parse_args()

    DEFAULT_CONFIGS["port"] = args.port
    DEFAULT_CONFIGS["polling_interval"] = args.polling_interval

    main(DEFAULT_CONFIGS)