"""Poll a set of data exporters on a schedule and publish their readings as Prometheus gauges."""

import argparse
import time

import schedule
from prometheus_client import Gauge, start_http_server

from awinlib.PrometheusDataExporter import (
    CpuFanTempDataExporter,
    HddTempDataExporter,
    KasaPowerStripDataExporter,
    TapoT315DataExporter,
)
from awinlib.utilities import datetimeNow, loadJson, log

# When False, gauges are still created but never updated, and the HTTP server is not started.
ENABLE_EXPORTER = True
DEFAULT_PORT = 8087
DEFAULT_POLLING_INTERVAL = 60
DEFAULT_SECRET_FILEPATH = "~/.secret"
DEFAULT_CONFIGS = {
    "port": DEFAULT_PORT,
    "polling_interval": DEFAULT_POLLING_INTERVAL,
    "secret_filepath": DEFAULT_SECRET_FILEPATH,
}


def job_5s(gaugeDict, exporters):
    """Run one export pass; thin wrapper around :func:`exportData`."""
    exportData(gaugeDict, exporters)


def job_60s(gaugeDict, exporters):
    """Run one export pass; thin wrapper around :func:`exportData`."""
    exportData(gaugeDict, exporters)


def exportData(gaugeDict, exporters):
    """Collect readings from every exporter and push them into Prometheus gauges.

    Args:
        gaugeDict: Mutable mapping of gauge name -> ``Gauge``. Gauges are created
            lazily on first sight of a name and cached here, so the same dict
            must be passed on every call.
        exporters: Iterable of objects exposing ``export()``. Each call is
            expected to yield dicts with the keys ``"name"``, ``"description"``
            and ``"value"``.
    """
    for exporter in exporters:
        for reading in exporter.export():
            gaugeName = reading["name"]
            gaugeDesc = reading["description"]
            gaugeValue = reading["value"]
            log(f"Set {gaugeName}: {gaugeValue}")

            # Register each gauge only once and reuse the cached instance afterwards.
            if gaugeName not in gaugeDict:
                gaugeDict[gaugeName] = Gauge(gaugeName, gaugeDesc)

            if ENABLE_EXPORTER:
                gaugeDict[gaugeName].set(gaugeValue)


def main(configs):
    """Load secrets, build the exporters, schedule the polling jobs and serve metrics.

    Args:
        configs: Mapping that may contain ``"port"`` and ``"secret_filepath"``;
            missing keys fall back to ``DEFAULT_PORT`` / ``DEFAULT_SECRET_FILEPATH``.

    Returns:
        None. Returns early (after logging) when the secret file cannot be read;
        otherwise loops until interrupted with Ctrl-C.
    """
    log(f"Start: {datetimeNow()}")

    # Fall back to the secret *path* default (the original fell back to the
    # polling-interval integer by mistake).
    secretFilepath = configs.get("secret_filepath", DEFAULT_SECRET_FILEPATH)
    secret = loadJson(secretFilepath)
    if secret is None:
        log(f'Cannot read secret file({secretFilepath})')
        return

    hddTempExporter = HddTempDataExporter()
    cpuFanTempExporter = CpuFanTempDataExporter()
    powerStripLivingroomExporter = KasaPowerStripDataExporter("livingroom", "192.168.1.203")
    # NOTE(review): same IP as the living-room strip above -- confirm this is intended.
    powerStripAtticExporter = KasaPowerStripDataExporter("attic", "192.168.1.203")
    # The fourth argument is Chinese for "metal-roof room - temperature/humidity sensor".
    t315Atticexporter = TapoT315DataExporter(
        secret["ip"],
        secret["username"],
        secret["password"],
        "鐵皮屋-溫濕度感測器",
        "attic-sensors",
    )

    gaugeDict = {}
    # NOTE(review): the intervals are hard-coded; configs["polling_interval"] is not used.
    schedule.every(5).seconds.do(
        exportData,
        gaugeDict=gaugeDict,
        exporters=(
            hddTempExporter,
            cpuFanTempExporter,
            powerStripLivingroomExporter,
            powerStripAtticExporter,
        ),
    )
    schedule.every(60).seconds.do(
        exportData,
        gaugeDict=gaugeDict,
        exporters=(t315Atticexporter,),
    )

    # Start Prometheus server
    prometheusServerPort = configs.get("port", DEFAULT_PORT)
    log(f"Start Prometheus server on port {prometheusServerPort}")
    if ENABLE_EXPORTER:
        start_http_server(prometheusServerPort)

    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except KeyboardInterrupt:
            log("User stopped process.")
            break


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="The port of exportor")
    parser.add_argument(
        "-i",
        "--polling_interval",
        type=int,
        default=DEFAULT_POLLING_INTERVAL,
        help="Polling interval for collect HDD temperature",
    )
    # The misspelled flag name is kept so existing command lines keep working.
    # NOTE(review): this default (".secret") always overrides DEFAULT_SECRET_FILEPATH.
    parser.add_argument("-f", "--secrect_file", default=".secret", help="The file that contains the secrets")
    args = parser.parse_args()

    # Build a fresh config dict instead of mutating the module-level defaults.
    main(
        {
            **DEFAULT_CONFIGS,
            "port": args.port,
            "polling_interval": args.polling_interval,
            "secret_filepath": args.secrect_file,
        }
    )