Add awinlib, kasa_exporter.py
This commit is contained in:
239
awinlib/KasaSmartPowerStrip.py
Normal file
239
awinlib/KasaSmartPowerStrip.py
Normal file
@@ -0,0 +1,239 @@
|
||||
import socket
|
||||
import json
|
||||
import struct
|
||||
from builtins import bytes
|
||||
|
||||
class SmartPowerStrip(object):
|
||||
|
||||
def __init__(self, ip, device_id=None, timeout=2.0, protocol='tcp'):
|
||||
self.ip = ip
|
||||
self.port = 9999
|
||||
self.protocol = protocol
|
||||
self.device_id = device_id
|
||||
self.sys_info = None
|
||||
self.timeout = timeout
|
||||
|
||||
self.sys_info = self.get_system_info()['system']['get_sysinfo']
|
||||
|
||||
if not self.device_id:
|
||||
self.device_id = self.sys_info['deviceId']
|
||||
|
||||
def set_wifi_credentials(self, ssid, psk, key_type='3'):
|
||||
'''
|
||||
:param ssid: router ssid
|
||||
:param psk: router passkey
|
||||
:param key_type: 3 is WPA2, 2 might be WPA and 1 might be WEP?
|
||||
:return: command response
|
||||
'''
|
||||
|
||||
wifi_command = '{"netif":{"set_stainfo":{"ssid":"' + ssid + '","password":"' + \
|
||||
psk + '","key_type":' + key_type + '}}}'
|
||||
|
||||
return self.send_command(wifi_command, self.protocol)
|
||||
|
||||
def set_cloud_server_url(self, server_url=''):
|
||||
|
||||
server_command = '{"cnCloud":{"set_server_url":{"server":"' + server_url + '"}}}'
|
||||
|
||||
return self.send_command(server_command, self.protocol)
|
||||
|
||||
def get_system_info(self):
|
||||
|
||||
return self._udp_send_command('{"system":{"get_sysinfo":{}}}')
|
||||
|
||||
def get_realtime_energy_info(self, plug_num=None, plug_name=None):
|
||||
|
||||
plug_id = self._get_plug_id(plug_num=plug_num, plug_name=plug_name)
|
||||
|
||||
energy_command = '{"context":{"child_ids":["' + plug_id + '"]},"emeter":{"get_realtime":{}}}'
|
||||
|
||||
response = self.send_command(energy_command, self.protocol)
|
||||
|
||||
realtime_energy_data = response['emeter']['get_realtime']
|
||||
|
||||
return realtime_energy_data
|
||||
|
||||
def get_historical_energy_info(self, month, year, plug_num=None, plug_name=None):
|
||||
|
||||
plug_id = self._get_plug_id(plug_num=plug_num, plug_name=plug_name)
|
||||
|
||||
energy_command = '{"context":{"child_ids":["' + plug_id + '"]},' + \
|
||||
'"emeter":{"get_daystat":{"month": ' + month + ',"year":' + year + '}}}'
|
||||
|
||||
response = self.send_command(energy_command, self.protocol)
|
||||
|
||||
historical_energy_data = response['emeter']['get_daystat']['day_list']
|
||||
|
||||
return historical_energy_data
|
||||
|
||||
def toggle_relay_leds(self, state):
|
||||
|
||||
state_int = self._get_plug_state_int(state, reverse=True)
|
||||
|
||||
led_command = '{"system":{"set_led_off":{"off":' + str(state_int) + '}}}'
|
||||
|
||||
return self.send_command(led_command, self.protocol)
|
||||
|
||||
def set_plug_name(self, plug_num, plug_name):
|
||||
|
||||
plug_id = self._get_plug_id(plug_num=plug_num)
|
||||
|
||||
set_name_command = '{"context":{"child_ids":["' + plug_id + \
|
||||
'"]},"system":{"set_dev_alias":{"alias":"' + plug_name + '"}}}'
|
||||
|
||||
return self.send_command(set_name_command, self.protocol)
|
||||
|
||||
def get_plug_info(self, plug_num):
|
||||
|
||||
target_plug = [plug for plug in self.sys_info['children'] if plug['id'] == str(int(plug_num)-1).zfill(2)]
|
||||
|
||||
return target_plug
|
||||
|
||||
# toggle multiple plugs by id or name
|
||||
def toggle_plugs(self, state, plug_num_list=None, plug_name_list=None):
|
||||
|
||||
state_int = self._get_plug_state_int(state)
|
||||
|
||||
plug_id_list_str = self._get_plug_id_list_str(plug_num_list=plug_num_list, plug_name_list=plug_name_list)
|
||||
|
||||
all_relay_command = '{"context":{"child_ids":' + plug_id_list_str + '},' + \
|
||||
'"system":{"set_relay_state":{"state":' + str(state_int) + '}}}'
|
||||
|
||||
return self.send_command(all_relay_command, self.protocol)
|
||||
|
||||
# toggle a single plug
|
||||
def toggle_plug(self, state, plug_num=None, plug_name=None):
|
||||
|
||||
state_int = self._get_plug_state_int(state)
|
||||
|
||||
plug_id = self._get_plug_id(plug_num=plug_num, plug_name=plug_name)
|
||||
|
||||
relay_command = '{"context":{"child_ids":["' + plug_id + '"]},' + \
|
||||
'"system":{"set_relay_state":{"state":' + str(state_int) + '}}}'
|
||||
|
||||
return self.send_command(relay_command, self.protocol)
|
||||
|
||||
def reboot(self, delay=1):
|
||||
reboot_command = '{"system":{"reboot":{"delay":' + str(delay) + '}}}'
|
||||
return self.send_command(reboot_command, self.protocol)
|
||||
|
||||
# manually send a command
|
||||
def send_command(self, command, protocol='tcp'):
|
||||
|
||||
if protocol == 'tcp':
|
||||
return self._tcp_send_command(command)
|
||||
elif protocol == 'udp':
|
||||
return self._udp_send_command(command)
|
||||
else:
|
||||
raise ValueError("Protocol must be 'tcp' or 'udp'")
|
||||
|
||||
def _get_plug_state_int(self, state, reverse=False):
|
||||
|
||||
if state.lower() == 'on':
|
||||
if reverse:
|
||||
state_int = 0
|
||||
else:
|
||||
state_int = 1
|
||||
elif state.lower() == 'off':
|
||||
if reverse:
|
||||
state_int = 1
|
||||
else:
|
||||
state_int = 0
|
||||
else:
|
||||
raise ValueError("Invalid state, must be 'on' or 'off'")
|
||||
|
||||
return state_int
|
||||
|
||||
# create a string with a list of plug_ids that can be inserted directly into a command
|
||||
def _get_plug_id_list_str(self, plug_num_list=None, plug_name_list=None):
|
||||
|
||||
plug_id_list = []
|
||||
|
||||
if plug_num_list:
|
||||
for plug_num in plug_num_list:
|
||||
|
||||
# add as str to remove the leading u
|
||||
plug_id_list.append(str(self._get_plug_id(plug_num=plug_num)))
|
||||
|
||||
elif plug_name_list:
|
||||
|
||||
for plug_name in plug_name_list:
|
||||
# add as str to remove the leading u
|
||||
plug_id_list.append(str(self._get_plug_id(plug_name=plug_name)))
|
||||
|
||||
# convert to double quotes and turn the whole list into a string
|
||||
plug_id_list_str = str(plug_id_list).replace("'", '"')
|
||||
|
||||
return plug_id_list_str
|
||||
|
||||
# get the plug child_id to be used with commands
|
||||
def _get_plug_id(self, plug_num=None, plug_name=None):
|
||||
|
||||
if plug_num and self.device_id:
|
||||
plug_id = self.device_id + str(plug_num-1).zfill(2)
|
||||
|
||||
elif plug_name and self.sys_info:
|
||||
target_plug = [plug for plug in self.sys_info['children'] if plug['alias'] == plug_name]
|
||||
if target_plug:
|
||||
plug_id = self.device_id + target_plug[0]['id']
|
||||
else:
|
||||
raise ValueError('Unable to find plug with name ' + plug_name)
|
||||
else:
|
||||
raise ValueError('Unable to find plug. Provide a valid plug_num or plug_name')
|
||||
|
||||
return plug_id
|
||||
|
||||
def _tcp_send_command(self, command):
|
||||
|
||||
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock_tcp.settimeout(self.timeout)
|
||||
sock_tcp.connect((self.ip, self.port))
|
||||
|
||||
sock_tcp.send(self._encrypt_command(command))
|
||||
|
||||
data = sock_tcp.recv(2048)
|
||||
sock_tcp.close()
|
||||
|
||||
# the first 4 chars are the length of the command so can be excluded
|
||||
return json.loads(self._decrypt_command(data[4:]))
|
||||
|
||||
def _udp_send_command(self, command):
|
||||
|
||||
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
client_socket.settimeout(self.timeout)
|
||||
|
||||
addr = (self.ip, self.port)
|
||||
|
||||
client_socket.sendto(self._encrypt_command(command, prepend_length=False), addr)
|
||||
|
||||
data, server = client_socket.recvfrom(2048)
|
||||
|
||||
return json.loads(self._decrypt_command(data))
|
||||
|
||||
@staticmethod
|
||||
def _encrypt_command(string, prepend_length=True):
|
||||
|
||||
key = 171
|
||||
result = b''
|
||||
|
||||
# when sending get_sysinfo using udp the length of the command is not needed but
|
||||
# with all other commands using tcp it is
|
||||
if prepend_length:
|
||||
result = struct.pack(">I", len(string))
|
||||
|
||||
for i in bytes(string.encode('latin-1')):
|
||||
a = key ^ i
|
||||
key = a
|
||||
result += bytes([a])
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _decrypt_command(string):
|
||||
|
||||
key = 171
|
||||
result = b''
|
||||
for i in bytes(string):
|
||||
a = key ^ i
|
||||
key = i
|
||||
result += bytes([a])
|
||||
return result.decode('latin-1')
|
||||
Reference in New Issue
Block a user