diff --git a/awinlib/KasaSmartPowerStrip.py b/awinlib/KasaSmartPowerStrip.py new file mode 100644 index 0000000..661890d --- /dev/null +++ b/awinlib/KasaSmartPowerStrip.py @@ -0,0 +1,239 @@ +import socket +import json +import struct +from builtins import bytes + +class SmartPowerStrip(object): + + def __init__(self, ip, device_id=None, timeout=2.0, protocol='tcp'): + self.ip = ip + self.port = 9999 + self.protocol = protocol + self.device_id = device_id + self.sys_info = None + self.timeout = timeout + + self.sys_info = self.get_system_info()['system']['get_sysinfo'] + + if not self.device_id: + self.device_id = self.sys_info['deviceId'] + + def set_wifi_credentials(self, ssid, psk, key_type='3'): + ''' + :param ssid: router ssid + :param psk: router passkey + :param key_type: 3 is WPA2, 2 might be WPA and 1 might be WEP? + :return: command response + ''' + + wifi_command = '{"netif":{"set_stainfo":{"ssid":"' + ssid + '","password":"' + \ + psk + '","key_type":' + key_type + '}}}' + + return self.send_command(wifi_command, self.protocol) + + def set_cloud_server_url(self, server_url=''): + + server_command = '{"cnCloud":{"set_server_url":{"server":"' + server_url + '"}}}' + + return self.send_command(server_command, self.protocol) + + def get_system_info(self): + + return self._udp_send_command('{"system":{"get_sysinfo":{}}}') + + def get_realtime_energy_info(self, plug_num=None, plug_name=None): + + plug_id = self._get_plug_id(plug_num=plug_num, plug_name=plug_name) + + energy_command = '{"context":{"child_ids":["' + plug_id + '"]},"emeter":{"get_realtime":{}}}' + + response = self.send_command(energy_command, self.protocol) + + realtime_energy_data = response['emeter']['get_realtime'] + + return realtime_energy_data + + def get_historical_energy_info(self, month, year, plug_num=None, plug_name=None): + + plug_id = self._get_plug_id(plug_num=plug_num, plug_name=plug_name) + + energy_command = '{"context":{"child_ids":["' + plug_id + '"]},' + \ + '"emeter":{"get_daystat":{"month": ' + month + ',"year":' + year + '}}}' + + response = self.send_command(energy_command, self.protocol) + + historical_energy_data = response['emeter']['get_daystat']['day_list'] + + return historical_energy_data + + def toggle_relay_leds(self, state): + + state_int = self._get_plug_state_int(state, reverse=True) + + led_command = '{"system":{"set_led_off":{"off":' + str(state_int) + '}}}' + + return self.send_command(led_command, self.protocol) + + def set_plug_name(self, plug_num, plug_name): + + plug_id = self._get_plug_id(plug_num=plug_num) + + set_name_command = '{"context":{"child_ids":["' + plug_id + \ + '"]},"system":{"set_dev_alias":{"alias":"' + plug_name + '"}}}' + + return self.send_command(set_name_command, self.protocol) + + def get_plug_info(self, plug_num): + + target_plug = [plug for plug in self.sys_info['children'] if plug['id'] == str(int(plug_num)-1).zfill(2)] + + return target_plug + + # toggle multiple plugs by id or name + def toggle_plugs(self, state, plug_num_list=None, plug_name_list=None): + + state_int = self._get_plug_state_int(state) + + plug_id_list_str = self._get_plug_id_list_str(plug_num_list=plug_num_list, plug_name_list=plug_name_list) + + all_relay_command = '{"context":{"child_ids":' + plug_id_list_str + '},' + \ + '"system":{"set_relay_state":{"state":' + str(state_int) + '}}}' + + return self.send_command(all_relay_command, self.protocol) + + # toggle a single plug + def toggle_plug(self, state, plug_num=None, plug_name=None): + + state_int = self._get_plug_state_int(state) + + plug_id = self._get_plug_id(plug_num=plug_num, plug_name=plug_name) + + relay_command = '{"context":{"child_ids":["' + plug_id + '"]},' + \ + '"system":{"set_relay_state":{"state":' + str(state_int) + '}}}' + + return self.send_command(relay_command, self.protocol) + + def reboot(self, delay=1): + reboot_command = '{"system":{"reboot":{"delay":' + str(delay) + '}}}' + return self.send_command(reboot_command, self.protocol) + + # manually send a command + def send_command(self, command, protocol='tcp'): + + if protocol == 'tcp': + return self._tcp_send_command(command) + elif protocol == 'udp': + return self._udp_send_command(command) + else: + raise ValueError("Protocol must be 'tcp' or 'udp'") + + def _get_plug_state_int(self, state, reverse=False): + + if state.lower() == 'on': + if reverse: + state_int = 0 + else: + state_int = 1 + elif state.lower() == 'off': + if reverse: + state_int = 1 + else: + state_int = 0 + else: + raise ValueError("Invalid state, must be 'on' or 'off'") + + return state_int + + # create a string with a list of plug_ids that can be inserted directly into a command + def _get_plug_id_list_str(self, plug_num_list=None, plug_name_list=None): + + plug_id_list = [] + + if plug_num_list: + for plug_num in plug_num_list: + + # add as str to remove the leading u + plug_id_list.append(str(self._get_plug_id(plug_num=plug_num))) + + elif plug_name_list: + + for plug_name in plug_name_list: + # add as str to remove the leading u + plug_id_list.append(str(self._get_plug_id(plug_name=plug_name))) + + # convert to double quotes and turn the whole list into a string + plug_id_list_str = str(plug_id_list).replace("'", '"') + + return plug_id_list_str + + # get the plug child_id to be used with commands + def _get_plug_id(self, plug_num=None, plug_name=None): + + if plug_num and self.device_id: + plug_id = self.device_id + str(plug_num-1).zfill(2) + + elif plug_name and self.sys_info: + target_plug = [plug for plug in self.sys_info['children'] if plug['alias'] == plug_name] + if target_plug: + plug_id = self.device_id + target_plug[0]['id'] + else: + raise ValueError('Unable to find plug with name ' + plug_name) + else: + raise ValueError('Unable to find plug. Provide a valid plug_num or plug_name') + + return plug_id + + def _tcp_send_command(self, command): + + sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock_tcp.settimeout(self.timeout) + sock_tcp.connect((self.ip, self.port)) + + sock_tcp.send(self._encrypt_command(command)) + + data = sock_tcp.recv(2048) + sock_tcp.close() + + # the first 4 chars are the length of the command so can be excluded + return json.loads(self._decrypt_command(data[4:])) + + def _udp_send_command(self, command): + + client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + client_socket.settimeout(self.timeout) + + addr = (self.ip, self.port) + + client_socket.sendto(self._encrypt_command(command, prepend_length=False), addr) + + data, server = client_socket.recvfrom(2048) + + return json.loads(self._decrypt_command(data)) + + @staticmethod + def _encrypt_command(string, prepend_length=True): + + key = 171 + result = b'' + + # when sending get_sysinfo using udp the length of the command is not needed but + # with all other commands using tcp it is + if prepend_length: + result = struct.pack(">I", len(string)) + + for i in bytes(string.encode('latin-1')): + a = key ^ i + key = a + result += bytes([a]) + return result + + @staticmethod + def _decrypt_command(string): + + key = 171 + result = b'' + for i in bytes(string): + a = key ^ i + key = i + result += bytes([a]) + return result.decode('latin-1') diff --git a/kasa_exporter.py b/kasa_exporter.py new file mode 100755 index 0000000..2ed483e --- /dev/null +++ b/kasa_exporter.py @@ -0,0 +1,124 @@ +#!/usr/bin/python3 + +import time +import argparse +# import awinlib +# import awinlib.KasaSmartPowerStrip +import awinlib.KasaSmartPowerStrip as Kasa +from rich import print as pp +from prometheus_client import start_http_server, Gauge + + +ENABLE_EXPORTER = True +DEFAULT_PORT = 8088 +DEFAULT_POLLING_INTERVAL = 5 +DEFAULT_CONFIGS = { + "port": DEFAULT_PORT, + "polling_interval": DEFAULT_POLLING_INTERVAL +} + + +class KasaPowerStrip: + def __init__(self, areaName, ip): + self.areaName = areaName + self.ip = ip + try: + self.powerStrip = Kasa.SmartPowerStrip(ip) + except: + self.powerStrip = None + + def collectData(self): + dataDict = {} + devSysInfo = self.powerStrip.get_system_info() + for c in devSysInfo['system']['get_sysinfo']['children']: + try: + cid = int(c['id']) + 1 + except: + continue + + # aliasName = c['alias'].encode('latin1').decode('UTF-8') + gaugeName_Current_ma = f"{self.areaName}_{cid}_current_ma" + gaugeName_Current_ma_desc = f"Current(milliampere) of {self.areaName}:{cid}" + gaugeName_Voltage_mv = f"{self.areaName}_{cid}_voltage_mv" + gaugeName_Voltage_mv_desc = f"Voltage(millivolt) of {self.areaName}:{cid}" + gaugeName_Power_mw = f"{self.areaName}_{cid}_power_mw" + gaugeName_Power_mw_desc = f"Power(milliwatt) of {self.areaName}:{cid}" + gaugeName_TotalWh = f"{self.areaName}_{cid}_total_wh" + gaugeName_TotalWh_desc = f"Total watt-hour of {self.areaName}:{cid}" + + ## Get Data + realtimeInfo = self.powerStrip.get_realtime_energy_info(plug_num=cid) + realTime_current_ma = realtimeInfo.get("current_ma", 0) + realTime_voltage_mv = realtimeInfo.get("voltage_mv", 0) + realTime_power_mw = realtimeInfo.get("power_mw", 0) + realTime_total_wh = realtimeInfo.get("total_wh", 0) + + pp(f"Set {gaugeName_Current_ma}: {realTime_current_ma}") + pp(f"Set {gaugeName_Voltage_mv}: {realTime_voltage_mv}") + pp(f"Set {gaugeName_Power_mw}: {realTime_power_mw}") + pp(f"Set {gaugeName_TotalWh}: {realTime_total_wh}") + + dataDict[cid] = { + gaugeName_Current_ma: { + "description": gaugeName_Current_ma_desc, + "value": realTime_current_ma, + }, + gaugeName_Voltage_mv: { + "description": gaugeName_Voltage_mv_desc, + "value": realTime_voltage_mv, + }, + gaugeName_Power_mw: { + "description": gaugeName_Power_mw_desc, + "value": realTime_power_mw, + }, + gaugeName_TotalWh: { + "description": gaugeName_TotalWh_desc, + "value": realTime_total_wh, + } + } + return dataDict + + +def main(configs): + time.sleep(10) + gaugeDict = {} + pollingInterval = configs.get("polling_interval", DEFAULT_POLLING_INTERVAL) + start_http_server(configs.get("port", DEFAULT_PORT)) if ENABLE_EXPORTER else None + + powerStripList = [ + KasaPowerStrip("livingroom", "192.168.1.203"), + KasaPowerStrip("attic", "192.168.1.203") + ] + + while True: + try: + # collectData(gaugeDict) + for powerStrip in powerStripList: + powerStripData = powerStrip.collectData() + for keyId in sorted(powerStripData.keys()): + for gaugeName in powerStripData[keyId].keys(): + gaugeDesc = powerStripData[keyId][gaugeName]["description"] + gaugeValue = powerStripData[keyId][gaugeName]["value"] + + if gaugeName not in gaugeDict: + gaugeDict[gaugeName] = Gauge(gaugeName, gaugeDesc) + + if ENABLE_EXPORTER: + gaugeDict[gaugeName].set(gaugeValue) + + pp(f"Set {gaugeName}: {gaugeValue}") + + time.sleep(pollingInterval) + except KeyboardInterrupt: + print("\nUser stopped process.") + break + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="The port of exportor") + parser.add_argument("-i", "--polling_interval", default=DEFAULT_POLLING_INTERVAL, help="Polling interval for collect HDD temperature") + args = parser.parse_args() + DEFAULT_CONFIGS["port"] = args.port + DEFAULT_CONFIGS["polling_interval"] = args.polling_interval + main(DEFAULT_CONFIGS)