import sys
import traceback
import json
import requests
import humanize
import schedule
import dateutil.parser
from datetime import datetime, timezone
from django.core.management.base import BaseCommand
from django.conf import settings
from influxdb import InfluxDBClient
from ...models import Node, Timer, Device
from ...api.serializers import NodeSerializer
from ...device_templates.mqtt_device import init_mqtt
# MIGRATE_LABEL
# ERROR
# NOTICE
# SUCCESS
# HTTP_NOT_MODIFIED
# MIGRATE_HEADING
def broadcast_gotify_message(title, message, node, payload):
    """Post a push notification to the configured Gotify server.

    NOTE(review): the function returns immediately, so no notification is
    ever sent and everything after the first ``return`` is unreachable.
    This looks like a deliberate kill switch -- remove the early return to
    re-enable delivery.

    Args:
        title: Notification title; also stored in the intent payload when
            the payload has no ``"title"`` key.
        message: Notification body; also stored in the intent payload when
            the payload has no ``"message"`` key.
        node: Node the notification refers to.  When the payload has no
            ``"node"`` key it is serialized with ``NodeSerializer``.
        payload: Dict of extra data that is JSON-encoded into the Android
            intent URL.  The caller's dict is not modified.

    Returns:
        ``None`` while delivery is disabled; otherwise the result of
        ``requests.post``.
    """
    # Delivery is switched off: bail out before touching any argument.
    return

    # --- unreachable while the early return above is in place ---

    # Work on a copy so the defaults filled in below never leak back into
    # the caller's dict (which may be reused across notifications).
    payload = dict(payload)
    if "title" not in payload:
        payload['title'] = title
    if "message" not in payload:
        payload['message'] = message
    if "node" not in payload:
        payload['node'] = NodeSerializer(node).to_representation(node)

    data = {
        "priority": 0,
        "title": title,
        "message": message,
        "extras": {
            "client::display": {
                "contentType": "text/markdown"
            },
            "android::action": {
                "onReceive": {
                    # NOTE(review): the JSON is appended to the URL without
                    # percent-encoding -- confirm the receiving app expects
                    # it that way.
                    "intentUrl": "festivalgrid://" + json.dumps(payload)
                }
            }
        },
    }
    # NOTE(review): no ``timeout`` is passed, so an unresponsive Gotify host
    # would block the caller indefinitely once delivery is re-enabled.
    return requests.post(
        "%s:%d/message?token=%s" % (
            settings.GOTIFY_HOST,
            int(settings.GOTIFY_PORT),
            settings.GOTIFY_TOKEN,
        ),
        json=data,
    )
class Command(BaseCommand):
    """Long-running FestivalGrid worker.

    The worker connects to InfluxDB and MQTT, stores measurements received
    over MQTT, periodically checks every node's latest power reading against
    its warning/shutdown levels, polls devices, and runs the enabled timers.
    """

    help = 'FestivalGrid worker'
    # Maps a node to the id of the last message broadcast for it; used to
    # suppress repeated notifications.  Class-level, so shared by instances.
    node_last_message = {}
    # InfluxDB client, created in handle().
    influx_client = None
    # Maps a device's mqtt_topic to the object returned by
    # device.get_template(); rebuilt by cache_node_templates().
    nodes_template = {}

    def broadcast_message(self, id, title, message, node=None, payload=None):
        """Broadcast a notification, skipping repeats for the same node.

        Args:
            id: Identifier of the message kind.  If the previous message
                recorded for ``node`` has the same id, nothing is sent.
            title: Notification title.
            message: Notification body.
            node: Optional node the message refers to.  Without a node no
                de-duplication takes place.
            payload: Optional dict of extra data handed to the notifier.
                Defaults to a fresh empty dict on every call.
        """
        # A fresh dict per call: a shared ``{}`` default would be reused by
        # every invocation and could accumulate stale keys.
        if payload is None:
            payload = {}
        if node:
            if self.node_last_message.get(node) == id and node in self.node_last_message:
                return
            self.node_last_message[node] = id
        # NOTE(review): nesting reconstructed from flattened source; the
        # ``node=None`` default suggests this call sits outside ``if node``.
        broadcast_gotify_message(title, message, node, payload)

    def monitor_nodes(self):
        """Print a status table and react to each node's last power reading.

        For every node that has a device, the last ``Power`` value is read
        from the InfluxDB measurement named after the node's primary key.
        A node above its shutdown level gets ``cut_power`` set and saved;
        a node above its warning level, or whose reading is older than 60
        seconds, triggers a broadcast.
        """
        nodes = Node.objects.filter(device__isnull=False).order_by('name')
        now = datetime.now(timezone.utc)
        self.stdout.write("\n---------------------- [%s] ----------------------\n\n" % now.strftime("%H:%M:%S"))
        self.stdout.write(self.style.MIGRATE_LABEL(" name power timestamp stale status\n"))

        for node in nodes:
            # node.pk is formatted with %d, so only an integer can reach
            # the query text.
            sql = 'SELECT last("Power") FROM "%d"' % node.pk
            points = self.influx_client.query(sql).get_points()
            for point in points:
                power = int(point["last"])
                timestamp = dateutil.parser.parse(point["time"])
                timedelta = now - timestamp

                # log is a (status text, style function) pair; the status
                # text doubles as the de-duplication id for broadcasts.
                log = ('OK', self.style.SUCCESS)
                if node.cut_power:
                    log = ("POWER OFF", self.style.ERROR)
                else:
                    if power > node.shutdown_level:
                        log = ("EMERGENCY SHUTDOWN", self.style.ERROR)
                        node.cut_power = True
                        node.save()
                        self.broadcast_message(
                            log[0],
                            "Emergency Shutdown: %s" % node,
                            "%s has been shutdown because power consumption exceeded %d W (was %d W)." % (node, node.shutdown_level, power),
                            node,
                            {"color": "#ffaaaa"}
                        )
                    elif power > node.warning_level:
                        log = ("POWER LEVEL WARNING", self.style.WARNING)
                        self.broadcast_message(
                            log[0],
                            "Warning: %s @ %d W" % (node, power),
                            "Warning level: %d W | Shutdown level: %d W" % (node.warning_level, node.shutdown_level),
                            node,
                            {"color": "#ffcc6c"}
                        )
                    # NOTE(review): nesting reconstructed from flattened
                    # source; the stale check is assumed to apply only to
                    # nodes that are not switched off -- confirm.
                    if timedelta.total_seconds() > 60:
                        log = ("STALE", self.style.WARNING)
                        self.broadcast_message(
                            log[0],
                            "Stale: %s" % node,
                            "%s has not sent and updates for %s." % (node.name, humanize.naturaldelta(timedelta)),
                            node,
                            {"color": "#ffcc6c"}
                        )

                self.stdout.write(log[1](" %s %5s W %s %5s s %s" % (
                    node.name.ljust(36),
                    power,
                    timestamp.strftime("%H:%M:%S"),
                    int(timedelta.total_seconds()),
                    log[0]
                )))

    def poll_devices(self):
        """Poll every node whose device template name contains ``demo``.

        Any data returned by the template's ``handle_polling`` is printed
        and written to InfluxDB as a single point.
        """
        for node in Node.objects.filter(device__template__contains="demo"):  # ".polling"
            try:
                data = node.device.get_template().handle_polling(node)
                if data:
                    self.stdout.write(" data: %s" % data)
                    self.influx_client.write_points([data])
            except Device.DoesNotExist:
                # Node without a device: nothing to poll.
                pass

    def timer_action(self, timer):
        """Apply a timer's action to its node and save the node.

        ``"TOGGLE"`` inverts ``cut_power``; any other action cuts power
        only when the action is ``'OFF'``.
        """
        self.stdout.write('running timer action!')
        if timer.action == "TOGGLE":
            timer.node.cut_power = not timer.node.cut_power
        else:
            timer.node.cut_power = timer.action == 'OFF'
        timer.node.save()

    def schedule_timers(self):
        """Drop all jobs tagged ``timers`` and schedule every enabled timer.

        Each enabled timer becomes a daily job at ``timer.time``.
        """
        schedule.clear('timers')
        self.stdout.write("(re)scheduling...")
        for timer in Timer.objects.filter(enabled=True):
            self.stdout.write(" %s %s" % (timer.name, timer.time))
            schedule.every().day.at(str(timer.time)).do(self.timer_action, timer).tag('timers')

    def cache_node_templates(self):
        """Rebuild the mqtt_topic -> device template lookup table."""
        self.nodes_template = {}
        for node in Node.objects.filter(device__mqtt_topic__isnull=False):
            try:
                self.nodes_template[node.device.mqtt_topic] = node.device.get_template()
            except Device.DoesNotExist:
                pass

    def on_message(self, mosq, userdata, msg):
        """MQTT message callback.

        A topic containing ``setup_changed`` refreshes the template cache
        and the timer schedule.  Any other message is handed to the
        template registered for the second topic segment, and the data it
        returns (if any) is written to InfluxDB.
        """
        self.stdout.write(self.style.MIGRATE_LABEL('\ntopic: %s' % msg.topic))
        self.stdout.write(' payload: %s' % msg.payload)

        if "setup_changed" in msg.topic:
            self.stdout.write("\n Setup changed. Re-caching node templates...")
            self.cache_node_templates()
            self.schedule_timers()
            # NOTE(review): topics of newly cached templates are only
            # subscribed in on_connect, not here.
            return

        influx_name = msg.topic.split('/')[1]
        if influx_name in self.nodes_template:
            try:
                data = self.nodes_template[influx_name].digest_message(msg.topic, msg.payload)
                if data:
                    self.stdout.write(" data: %s" % data)
                    self.influx_client.write_points([data])
            except Exception:
                # Best effort: one bad message must not stop the worker.
                # ``Exception`` (not a bare except) keeps SystemExit and
                # KeyboardInterrupt propagating.
                traceback.print_exc()

    def on_connect(self, client, userdata, flags, result_code):
        """MQTT connect callback: subscribe to setup and device topics."""
        self.stdout.write("connected to %s:%d" % (settings.MQTT_HOST, settings.MQTT_PORT))
        client.subscribe('festivalgrid/+/setup_changed', 2)
        for mqtt_topic in self.nodes_template:
            topic = 'festivalgrid/%s/#' % mqtt_topic
            self.stdout.write(' subscribing to: %s' % topic)
            client.subscribe(topic, 2)

    def on_disconnect(self, mosq, userdata, rc):
        """MQTT disconnect callback: terminate the process via ``sys.exit``."""
        sys.exit()

    def handle(self, *args, **options):
        """Entry point: set up InfluxDB, scheduling and MQTT, then loop forever."""
        self.cache_node_templates()

        self.influx_client = InfluxDBClient(
            settings.INFLUXDB_HOST,
            settings.INFLUXDB_PORT,
            settings.INFLUXDB_USERNAME,
            settings.INFLUXDB_PASSWORD,
            timeout=1,
            retries=0,
        )
        self.influx_client.ping()
        self.stdout.write("connected to influxdb.")
        self.influx_client.create_database(settings.INFLUXDB_DATABASE_NAME)
        self.influx_client.switch_database(settings.INFLUXDB_DATABASE_NAME)

        schedule.every(10).seconds.do(self.monitor_nodes)
        schedule.every(10).seconds.do(self.poll_devices)
        # Run both jobs once right away instead of waiting 10 seconds.
        schedule.run_all()
        self.schedule_timers()

        mqtt_client = init_mqtt('festivalgrid-worker', self.on_message, self.on_connect, self.on_disconnect)
        while True:
            mqtt_client.loop(timeout=1)
            schedule.run_pending()