Source code for festivalgrid.device_templates.tasmota
import json
import urllib
import urllib.error
import urllib.request
from django.conf import settings
from .mqtt_device import MQTTDevice
[docs]class Tasmota(MQTTDevice):
sensor_keys = ['Power', 'ApparentPower', 'ReactivePower', 'Factor', 'Voltage', 'Current']
state_keys = ['Uptime', 'Heap', 'POWER']
state_wifi_keys = ['AP', 'SSId', 'BSSId', 'Channel', 'RSSI', 'Signal', 'LinkCount', 'Downtime']
[docs] def get_device_admin_link(self):
return {'title': 'Tasmota Web UI', 'url': 'http://' + self.device.ip}
[docs] def set_cut_power(self, state):
self.send_mqtt_message('cmnd/POWER', 'OFF' if state is True else 'ON')
[docs] def digest_message(self, topic, payload):
try:
payload = json.loads(payload.decode("utf-8"))
except json.decoder.JSONDecodeError:
return
measurement = self.device.node.pk
if topic.endswith('SENSOR'):
return {
"measurement": measurement,
"fields": {your_key: payload["ENERGY"][your_key] for your_key in self.sensor_keys}
# "time": dateutil.parser.parse(payload["Time"]).strftime('%Y-%m-%dT%H:%M:%SZ'),
# "tags": {},
}
elif topic.endswith('STATE'):
state_fields = {your_key: payload[your_key] for your_key in self.state_keys}
wifi_fields = {your_key: payload["Wifi"][your_key] for your_key in self.state_wifi_keys}
return {
"measurement": measurement,
"fields": {**state_fields, **wifi_fields}
}
elif topic.endswith('RESULT'):
return {
"measurement": measurement,
"fields": {"on": payload['POWER'] == "ON"}
}
[docs] @staticmethod
def provision(ip, stdout):
url = 'http://' + str(ip) + '/cm?cmnd=status%200'
with urllib.request.urlopen(url, timeout=10) as request:
status = json.loads(request.read())
tid = int(status["StatusNET"]["Mac"].replace(":", "")[-3:], 16)
commands = [
'WifiConfig 5',
'TelePeriod 10',
'Hostname tasmota-festivalgrid-%04d' % tid,
'MqttHost %s' % settings.MQTT_HOST,
'MqttPort %d' % settings.MQTT_PORT,
'Topic tasmota-festivalgrid-%04d' % tid,
'FullTopic festivalgrid/%topic%/%prefix%/',
'FriendlyName Tasmota FestivalGrid %04d' % tid
]
backlog = "Backlog " + ('; '.join(commands))
try:
url = 'http://%s/cm?%s' % (ip, urllib.parse.urlencode({'cmnd': backlog}))
with urllib.request.urlopen(url) as request:
pass
except urllib.error.URLError:
stdout.write("url error")
TEMPLATE_CLASS = Tasmota