Source code for festivalgrid.management.commands.festivalgrid_worker

import sys
import traceback

import json
import requests
import humanize
import schedule
import dateutil.parser

from datetime import datetime, timezone

from django.core.management.base import BaseCommand
from django.conf import settings

from influxdb import InfluxDBClient

from ...models import Node, Timer, Device
from ...api.serializers import NodeSerializer
from ...device_templates.mqtt_device import init_mqtt


# MIGRATE_LABEL
# ERROR
# NOTICE
# SUCCESS
# HTTP_NOT_MODIFIED
# MIGRATE_HEADING


# Gotify push notifications are switched off. The function used to start with
# an unconditional ``return`` that left the rest of its body unreachable; this
# flag keeps that behaviour while making the switch explicit.
_GOTIFY_ENABLED = False


def broadcast_gotify_message(title, message, node, payload):
    """Send a notification to the configured Gotify server.

    While ``_GOTIFY_ENABLED`` is false this is a no-op that returns ``None``.

    Args:
        title: Notification title.
        message: Notification body (sent with a markdown content type).
        node: Node the notification is about; serialized with
            ``NodeSerializer`` into the payload unless the payload already
            has a ``"node"`` entry.
        payload: Extra data embedded as JSON in the ``festivalgrid://``
            intent URL. The caller's dict is not modified.

    Returns:
        The ``requests`` response of the POST, or ``None`` when disabled.
    """
    if not _GOTIFY_ENABLED:
        return None

    # Work on a copy so defaults added here never leak into the caller's dict.
    payload = dict(payload)
    payload.setdefault('title', title)
    payload.setdefault('message', message)
    if "node" not in payload:
        payload['node'] = NodeSerializer(node).to_representation(node)

    data = {
        "priority": 0,
        "title": title,
        "message": message,
        "extras": {
            "client::display": {
                "contentType": "text/markdown"
            },
            "android::action": {
                "onReceive": {
                    "intentUrl": "festivalgrid://" + json.dumps(payload)
                }
            }
        },
    }
    url = "%s:%d/message?token=%s" % (
        settings.GOTIFY_HOST,
        int(settings.GOTIFY_PORT),
        settings.GOTIFY_TOKEN,
    )
    # Without a timeout an unreachable Gotify server would block the worker.
    return requests.post(url, json=data, timeout=5)
class Command(BaseCommand):
    """Management command running the FestivalGrid worker loop.

    The worker connects to InfluxDB and MQTT, writes digested MQTT messages
    and polled device data to InfluxDB, checks node power levels every ten
    seconds, and runs the enabled timers once a day at their configured time.
    """

    help = 'FestivalGrid worker'

    # Id of the last alert broadcast per node; used to suppress repeats.
    node_last_message = {}
    # InfluxDB client, created in handle().
    influx_client = None
    # Device templates keyed by the device's mqtt_topic.
    nodes_template = {}

    def broadcast_message(self, id, title, message, node=None, payload=None):
        """Broadcast a notification, skipping immediate repeats per node.

        Args:
            id: Identifier of the alert kind. If it equals the id last
                broadcast for ``node``, nothing is sent.
            title: Notification title.
            message: Notification body.
            node: Node the alert refers to; without a node no de-duplication
                takes place.
            payload: Optional dict of extra data for the notification.
        """
        # NOTE(review): the remembered id is never cleared when a node
        # recovers, so the same alert recurring later is still suppressed
        # -- confirm this is intended.
        if payload is None:
            # A fresh dict per call; a shared ``{}`` default could be mutated
            # by one call and leak into the next.
            payload = {}
        if node:
            if node in self.node_last_message and self.node_last_message[node] == id:
                return
            self.node_last_message[node] = id
        broadcast_gotify_message(title, message, node, payload)

    def monitor_nodes(self):
        """Check the latest power reading of every node and print a status table.

        For each node that has a device, the last "Power" value is read from
        the InfluxDB measurement named after the node's primary key. A reading
        above ``shutdown_level`` sets ``cut_power`` on the node and saves it;
        a reading above ``warning_level`` or older than 60 seconds triggers a
        warning broadcast.
        """
        nodes = Node.objects.filter(device__isnull=False).order_by('name')
        now = datetime.now(timezone.utc)

        self.stdout.write("\n---------------------- [%s] ----------------------\n\n" % now.strftime("%H:%M:%S"))
        self.stdout.write(self.style.MIGRATE_LABEL(" name power timestamp stale status\n"))

        for node in nodes:
            sql = 'SELECT last("Power") FROM "%d"' % node.pk
            points = self.influx_client.query(sql).get_points()

            for point in points:
                power = int(point["last"])
                timestamp = dateutil.parser.parse(point["time"])
                age = now - timestamp

                # (status text, console style) used for the table row.
                log = ('OK', self.style.SUCCESS)

                if node.cut_power:
                    log = ("POWER OFF", self.style.ERROR)
                elif power > node.shutdown_level:
                    log = ("EMERGENCY SHUTDOWN", self.style.ERROR)
                    node.cut_power = True
                    node.save()
                    self.broadcast_message(
                        log[0],
                        "Emergency Shutdown: %s" % node,
                        "%s has been shutdown because power consumption exceeded %d W (was %d W)." % (node, node.shutdown_level, power),
                        node,
                        {"color": "#ffaaaa"}
                    )
                elif power > node.warning_level:
                    log = ("POWER LEVEL WARNING", self.style.WARNING)
                    self.broadcast_message(
                        log[0],
                        "Warning: %s @ %d W" % (node, power),
                        "Warning level: %d W | Shutdown level: %d W" % (node.warning_level, node.shutdown_level),
                        node,
                        {"color": "#ffcc6c"}
                    )

                # NOTE(review): nesting reconstructed from a flattened source;
                # the stale check is assumed to apply to every node, including
                # powered-off ones -- confirm.
                if age.total_seconds() > 60:
                    log = ("STALE", self.style.WARNING)
                    self.broadcast_message(
                        log[0],
                        "Stale: %s" % node,
                        "%s has not sent any updates for %s." % (node.name, humanize.naturaldelta(age)),
                        node,
                        {"color": "#ffcc6c"}
                    )

                self.stdout.write(log[1](" %s %5s W %s %5s s %s" % (
                    node.name.ljust(36),
                    power,
                    timestamp.strftime("%H:%M:%S"),
                    int(age.total_seconds()),
                    log[0]
                )))

    def poll_devices(self):
        """Poll nodes whose device template name contains "demo".

        Data returned by the template's ``handle_polling`` is written to
        InfluxDB as a single point; falsy results are ignored.
        """
        for node in Node.objects.filter(device__template__contains="demo"):  # ".polling"
            try:
                data = node.device.get_template().handle_polling(node)
                if data:
                    self.stdout.write(" data: %s" % data)
                    self.influx_client.write_points([data])
            except Device.DoesNotExist:
                # Node has no device (any more); nothing to poll.
                pass

    def timer_action(self, timer):
        """Apply a timer's action to its node.

        ``"TOGGLE"`` inverts the node's ``cut_power``; any other action sets
        ``cut_power`` to true exactly when the action is ``'OFF'``.

        Args:
            timer: The Timer whose action is due.
        """
        self.stdout.write('running timer action!')
        node = timer.node
        try:
            # The timer (and its cached node) was loaded when the timers were
            # scheduled. Reload the node so a toggle acts on the current
            # state and save() does not write stale field values back.
            node.refresh_from_db()
        except Node.DoesNotExist:
            self.stdout.write(self.style.WARNING('timer node no longer exists; skipping.'))
            return

        if timer.action == "TOGGLE":
            node.cut_power = not node.cut_power
        else:
            node.cut_power = timer.action == 'OFF'
        node.save()

    def schedule_timers(self):
        """Drop all jobs tagged 'timers' and schedule every enabled Timer daily."""
        schedule.clear('timers')
        self.stdout.write("(re)scheduling...")
        for timer in Timer.objects.filter(enabled=True):
            self.stdout.write(" %s %s" % (timer.name, timer.time))
            schedule.every().day.at(str(timer.time)).do(self.timer_action, timer).tag('timers')

    def cache_node_templates(self):
        """Rebuild the mqtt_topic -> device template cache from the database."""
        self.nodes_template = {}
        for node in Node.objects.filter(device__mqtt_topic__isnull=False):
            try:
                self.nodes_template[node.device.mqtt_topic] = node.device.get_template()
            except Device.DoesNotExist:
                pass

    def on_message(self, mosq, userdata, msg):
        """MQTT message callback.

        A topic containing "setup_changed" refreshes the template cache and
        the timer schedule. Any other message is passed to the template
        registered for the second topic segment, and the resulting data (if
        any) is written to InfluxDB.
        """
        self.stdout.write(self.style.MIGRATE_LABEL('\ntopic: %s' % msg.topic))
        self.stdout.write(' payload: %s' % msg.payload)

        if "setup_changed" in msg.topic:
            self.stdout.write("\n Setup changed. Re-caching node templates...")
            # NOTE(review): topics are only subscribed in on_connect, so a
            # topic that is new in the refreshed cache is not subscribed
            # until the next connect -- confirm this is acceptable.
            self.cache_node_templates()
            self.schedule_timers()
            return

        influx_name = msg.topic.split('/')[1]
        if influx_name in self.nodes_template:
            try:
                data = self.nodes_template[influx_name].digest_message(msg.topic, msg.payload)
                if data:
                    self.stdout.write(" data: %s" % data)
                    self.influx_client.write_points([data])
            except Exception:
                # A single bad message must not stop the worker, but
                # SystemExit / KeyboardInterrupt still have to get through.
                traceback.print_exc()

    def on_connect(self, client, userdata, flags, result_code):
        """MQTT connect callback: subscribe to setup changes and all cached topics."""
        self.stdout.write("connected to %s:%d" % (settings.MQTT_HOST, settings.MQTT_PORT))
        client.subscribe('festivalgrid/+/setup_changed', 2)
        for mqtt_topic in self.nodes_template:
            topic = 'festivalgrid/%s/#' % mqtt_topic
            self.stdout.write(' subscribing to: %s' % topic)
            client.subscribe(topic, 2)

    def on_disconnect(self, mosq, userdata, rc):
        """MQTT disconnect callback: terminate the worker process."""
        sys.exit()

    def handle(self, *args, **options):
        """Entry point: connect to InfluxDB and MQTT, then loop forever."""
        self.cache_node_templates()

        # NOTE(review): per the influxdb-python docs ``retries=0`` means
        # "retry until success" rather than "never retry" -- confirm intended.
        self.influx_client = InfluxDBClient(
            settings.INFLUXDB_HOST,
            settings.INFLUXDB_PORT,
            settings.INFLUXDB_USERNAME,
            settings.INFLUXDB_PASSWORD,
            timeout=1,
            retries=0,
        )
        self.influx_client.ping()
        self.stdout.write("connected to influxdb.")
        self.influx_client.create_database(settings.INFLUXDB_DATABASE_NAME)
        self.influx_client.switch_database(settings.INFLUXDB_DATABASE_NAME)

        schedule.every(10).seconds.do(self.monitor_nodes)
        schedule.every(10).seconds.do(self.poll_devices)
        # Run both periodic jobs once right away instead of waiting 10 s.
        schedule.run_all()
        self.schedule_timers()

        mqtt_client = init_mqtt('festivalgrid-worker', self.on_message, self.on_connect, self.on_disconnect)

        while True:
            mqtt_client.loop(timeout=1)
            schedule.run_pending()