#!/usr/bin/env python3
#####################################################################
#
# P0188
#
# World domination panel
#
# This software checks the status of systems (ping/website/MQTT)
# and writes the status out to a serial interface for an
# RGB LED strip.
#
#####################################################################
#
# 2024-09-08 xsider version for public
# 2024-08-28 xsider code cleanup
# 2024-08-21 xsider created
#
#####################################################################

import html
import json
import os
import queue
import time
from enum import Enum, auto
from http.server import HTTPServer, BaseHTTPRequestHandler  # Python's built-in library
from threading import Thread
from urllib.request import urlopen

import paho.mqtt.client as mqtt
import requests
import serial
from ping3 import ping

hostName = "0.0.0.0"
serverPort = 8082  # You can choose any available port; by default, it is 8000

DISPTTY = "/dev/ttyACM0"
DISPBAUD = 115200

MQTTHOST = "sigint5.internal"
MQTTPORT = 1883

# Upper bound (seconds) for every HTTP status request, so that one hung
# device cannot stall the whole website polling thread.
HTTP_TIMEOUT = 10

brightness = 0

# this is a replacement for enum
BLACK = "BLACK"
RED = "RED"
GREEN = "GREEN"
BLUE = "BLUE"
YELLOW = "YELLOW"
WHITE = "WHITE"

NOBLINK = False
BLINK = True

# Per colour: "html" is the hex value used on the status page, "rgb" is the
# byte fragment appended to the serial command for the display.
colordef = {
    BLACK: {"html": "000000", "rgb": b";0;0;0"},
    RED: {"html": "FF0000", "rgb": b";255;0;0"},
    GREEN: {"html": "00FF00", "rgb": b";0;255;0"},
    BLUE: {"html": "7070FF", "rgb": b";0;0;255"},
    YELLOW: {"html": "FFFF00", "rgb": b";255;255;0"},
    WHITE: {"html": "FFFFFF", "rgb": b";255;255;255"},
}


class AlarmTyp(Enum):
    """State of the radio alarm field: idle (RUHE) or test alarm (TEST)."""

    RUHE = auto()
    TEST = auto()


# Serial commands produced by set_color() and consumed by serialcommunication().
q = queue.Queue()

# One entry per pinged host:
# [field, address, label, colour if up, blink if up, colour if down, blink if down]
# A "\n" inside a label is rendered as a line break on the status page.
hostlist = [
    [0, "192.168.0.1", "Router\nAP EG", WHITE, NOBLINK, RED, NOBLINK],
    [1, "192.168.0.2", "Blackfin\nServer", WHITE, NOBLINK, RED, BLINK],
    [2, "192.168.0.3", "Coreswitch", WHITE, NOBLINK, RED, BLINK],
    [3, "192.168.0.4", "Odin\nFileserver", WHITE, NOBLINK, RED, NOBLINK],
    [4, "192.168.0.5", "Thor\nMailserver", WHITE, NOBLINK, RED, NOBLINK],
    [5, "192.168.0.6", "Tyr\nWebserver", WHITE, NOBLINK, RED, NOBLINK],
    [6, "192.168.0.7", "Frigg\nLoginserver", WHITE, NOBLINK, RED, NOBLINK],
    [7, "192.168.0.8", "Fulla\nLizenzserver", WHITE, NOBLINK, BLACK, NOBLINK],
    [8, "192.168.0.9", "Observer 1", WHITE, NOBLINK, RED, NOBLINK],
    [9, "192.168.0.10", "Sigint 5", WHITE, NOBLINK, RED, NOBLINK],
    [10, "192.168.0.11", "AP4\n1.OG Nord", WHITE, NOBLINK, BLACK, NOBLINK],
    [11, "192.168.0.12", "AP7\n1.OG SÜd", WHITE, NOBLINK, BLACK, NOBLINK],
    [12, "192.168.0.13", "AP5\n2.OG", WHITE, NOBLINK, BLACK, NOBLINK],
    [13, "192.168.0.14", "Bridge\n3.OG", WHITE, NOBLINK, BLACK, NOBLINK],
    [14, "192.168.0.15", "AP3\nLabor", WHITE, NOBLINK, BLACK, NOBLINK],
    [15, "192.168.0.16", "Blitzortung", WHITE, NOBLINK, RED, NOBLINK],
    [16, "192.168.0.17", "urad\nMonitor", WHITE, NOBLINK, RED, NOBLINK],
    [17, "192.168.0.18", "Feinstaub\nSensor", WHITE, NOBLINK, RED, NOBLINK],
    [18, "192.168.0.19", "Kiwi\nSDR", WHITE, NOBLINK, BLACK, NOBLINK],
    [19, "192.168.0.20", "LoRaWAN\nGateway", WHITE, NOBLINK, RED, NOBLINK],
    [20, "192.168.0.21", "APC1", WHITE, NOBLINK, BLACK, NOBLINK],
    [21, "192.168.0.22", "APC2", WHITE, NOBLINK, BLACK, NOBLINK],
    [22, "192.168.0.23", "APC3", WHITE, NOBLINK, BLACK, NOBLINK],
    [23, "192.168.0.24", "Laborraum 4", WHITE, NOBLINK, BLACK, NOBLINK],
    [24, "192.168.0.25", "Kamera 2", WHITE, NOBLINK, BLACK, NOBLINK],
    [25, "192.168.0.26", "Kamera 4", WHITE, NOBLINK, BLACK, NOBLINK],
    [32, "192.168.0.33", "Hamclock", WHITE, NOBLINK, BLACK, NOBLINK],
    [33, "192.168.0.34", "Manik\nBackup", WHITE, NOBLINK, RED, NOBLINK],
    [34, "192.168.0.35", "Automatix\nCNC-Fräse", WHITE, NOBLINK, BLACK, NOBLINK],
    [35, "192.168.0.36", "Indra\nSolarmess", WHITE, NOBLINK, BLACK, NOBLINK],
    [36, "192.168.0.37", "Shiva\nBat.umschaltung", WHITE, NOBLINK, BLACK, NOBLINK],
    [38, "192.168.0.39", "Redpitaya", WHITE, NOBLINK, BLACK, NOBLINK],
    [39, "192.168.0.40", "Rigol\nOszi", WHITE, NOBLINK, BLACK, NOBLINK],
    [40, "192.168.0.41", "Rigol\nSpektrum", WHITE, NOBLINK, BLACK, NOBLINK],
    [41, "192.168.0.42", "Rigol\nDMM", WHITE, NOBLINK, BLACK, NOBLINK],
    [42, "192.168.0.43", "Rigol\nArb.gen", WHITE, NOBLINK, BLACK, NOBLINK],
    [43, "192.168.0.44", "Victron\nGateway", WHITE, NOBLINK, BLACK, NOBLINK],
    [44, "192.168.0.45", "Roboterarm", WHITE, NOBLINK, BLACK, NOBLINK],
    [45, "192.168.0.46", "Unmatched\nRISC V", WHITE, NOBLINK, BLACK, NOBLINK],
    [46, "192.168.0.47", "Drucker\nA4 Labor", WHITE, NOBLINK, BLACK, NOBLINK],
    [47, "192.168.0.48", "Drucker\nA3 Labor", WHITE, NOBLINK, BLACK, NOBLINK],
    [48, "192.168.0.49", "DSKY", WHITE, NOBLINK, BLACK, NOBLINK],
    [49, "192.168.0.50", "Oc 5", WHITE, NOBLINK, BLACK, NOBLINK],
    [60, "192.168.0.61", "Arbeitsplatz\nEG", WHITE, NOBLINK, BLACK, NOBLINK],
    [61, "192.168.0.62", "Drucker\nEG", WHITE, NOBLINK, BLACK, NOBLINK],
    [62, "192.168.0.63", "Inferno\n1.OG", WHITE, NOBLINK, BLACK, NOBLINK],
    [63, "192.168.0.64", "Drucker\n1.OG", WHITE, NOBLINK, BLACK, NOBLINK],
    [64, "192.168.0.65", "Eb\nWetterstation", WHITE, NOBLINK, BLACK, NOBLINK],
    [65, "192.168.0.66", "Sigint 2\nNOAA Sat.", WHITE, NOBLINK, BLACK, NOBLINK],
    [66, "192.168.0.67", "Weishaupt\nHeizung", WHITE, NOBLINK, BLACK, NOBLINK],
    [67, "192.168.0.68", "Wago 1\nSPS", WHITE, NOBLINK, BLACK, NOBLINK],
    [68, "192.168.0.69", "VUZero", WHITE, NOBLINK, BLACK, NOBLINK],
    [69, "192.168.0.70", "Vishnu\nThermodrucker", WHITE, NOBLINK, BLACK, NOBLINK],
]

# Current state of all 75 fields (5 tables of 3 rows x 5 columns).
colorlist = ["666666"] * 15 * 5
textlist = [""] * 15 * 5
blinklist = [0] * 15 * 5

# Flags set by the MQTT callback and consumed by funk_observe().
funkok = False
funktest = False


def set_color(field, color, blink):
    """Record the new state of a field and queue the serial command for it.

    The command is "<field>;r;g;b;r;g;b\\n": the chosen colour followed by a
    second colour, which is black when blinking and the same colour otherwise.

    Args:
        field: index into colorlist/blinklist and field number on the display.
        color: one of the keys of colordef (BLACK, RED, ...).
        blink: truthy if the field should blink.
    """
    rgb = colordef[color]["rgb"]
    colorlist[field] = colordef[color]["html"]
    blinklist[field] = blink
    second = colordef[BLACK]["rgb"] if blink else rgb
    q.put(b"%d" % field + rgb + second + b"\n")


TOPIC_POC = [
    ("funk/+/ok/#", 1),    # Netzkennung
    ("funk/+/test/#", 1),  # Testkennung
]


def on_connect(client, userdata, flags, reason_code, properties):
    """MQTT connect callback: subscribe to every topic in TOPIC_POC."""
    print(f"Connected with result code {reason_code}")
    for e in TOPIC_POC:
        mqttclient.subscribe(e)
        print("Subscribed", e)


def on_disconnet(client, userdata, rc):
    """Log a disconnect and try to reconnect after one second.

    NOTE(review): this handler is never assigned to mqttclient.on_disconnect
    in this file, so it is currently unused.
    """
    print("Disconnect with result code" + str(rc))
    time.sleep(1)
    mqttclient.reconnect()


def on_message(client, userdata, msg):
    """MQTT message callback: raise the funkok/funktest flags.

    The third topic level selects the flag ("ok" or "test"). Topics with a
    different number of levels than four are tolerated; the subscriptions use
    a trailing "#" and can therefore match shorter or longer topics.
    """
    global funkok, funktest
    levels = msg.topic.split("/")
    if len(levels) < 3:
        return
    kennung = levels[2]
    if kennung == "ok":
        funkok = True
    if kennung == "test":
        funktest = True


# old mosquitto_lib
# mqttclient=mqtt.Client(client_id="Anzeige",clean_session=False)
# new mosquitto_lib
mqttclient = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttclient.on_connect = on_connect
mqttclient.on_message = on_message


def funk_observe():
    """Drive field 70 ("Funk OK") from the funkok/funktest flags. Runs forever.

    White: an "ok" message arrived (and no test alarm is active).
    Green: a "test" message arrived; kept for 15 minutes.
    Red:   no "ok" message for 200 seconds.
    """
    global funkok, funktest
    timeoutfunk = time.time()
    timeoutfunktest = time.time()
    funkstate = AlarmTyp.RUHE
    textlist[70] = "Funk OK"
    while True:
        try:
            if funkok:
                timeoutfunk = time.time()
                funkok = False
                # while a test alarm is running, keep the green light
                # instead of switching to white
                if funkstate != AlarmTyp.TEST:
                    set_color(70, WHITE, NOBLINK)
            if funktest:
                timeoutfunktest = time.time()
                funkstate = AlarmTyp.TEST
                funktest = False
                set_color(70, GREEN, NOBLINK)
            if funkstate == AlarmTyp.TEST and time.time() > (timeoutfunktest + 15 * 60):
                # the white colour comes from the next LoRaWAN signal
                funkstate = AlarmTyp.RUHE
            if time.time() > (timeoutfunk + 200):
                # no LoRaWAN signal for 200 seconds: colour red
                timeoutfunk = time.time()
                set_color(70, RED, NOBLINK)
        except Exception as error:
            print("Error: ", error)
        time.sleep(0.1)


def HostUp(hostname):
    """Return True if the host answers a ping within one second, else False.

    ping3 reports a timeout as None and an error as False; a measured delay
    of 0.0 is a valid answer and must not be mistaken for "down".
    """
    try:
        delay = ping(hostname, timeout=1)
    except OSError:
        # e.g. no permission to open the socket: treat the host as down
        # instead of terminating the ping thread
        return False
    return delay is not None and delay is not False


def ping_hosts():
    """Label all host fields, then ping every host in hostlist forever."""
    for i in hostlist:
        textlist[i[0]] = i[2]
    while True:
        for i in hostlist:
            if HostUp(i[1]):
                set_color(i[0], i[3], i[4])  # send a rgb string to display
                # mandatory slow-down between pings for unprivileged users;
                # a host that is down already costs the one second timeout
                time.sleep(1)
            else:
                set_color(i[0], i[5], i[6])


def decode_prusa_status(fieldnumber, statustext):
    """Map a Prusa printer state to a field colour; unknown states change nothing."""
    if statustext in ('IDLE', 'FINISHED', 'READY'):
        set_color(fieldnumber, WHITE, NOBLINK)
    elif statustext == 'PRINTING':
        set_color(fieldnumber, GREEN, NOBLINK)
    elif statustext in ('BUSY', 'PAUSED', 'STOPPED', 'ATTENTION'):
        set_color(fieldnumber, YELLOW, NOBLINK)
    elif statustext == 'ERROR':
        set_color(fieldnumber, RED, NOBLINK)


def decode_octopi_status(fieldnumber, statustext):
    """Map an OctoPrint job state to a field colour; unknown states are yellow."""
    if statustext == "Printing":
        set_color(fieldnumber, GREEN, NOBLINK)
    elif statustext in ("Error", "Offline after error"):
        set_color(fieldnumber, RED, NOBLINK)
    elif statustext == "Operational":
        set_color(fieldnumber, WHITE, NOBLINK)
    else:
        set_color(fieldnumber, YELLOW, NOBLINK)


def _fetch_json(url, **kwargs):
    """GET url with a bounded timeout and return the decoded JSON body."""
    return requests.get(url, timeout=HTTP_TIMEOUT, **kwargs).json()


def check_websites():
    """Poll the geiger counter and the 3D printers once a minute, forever.

    Any failure while fetching or decoding a status turns the field black.
    """
    textlist[74] = "Aktivität < 50 cpm"
    textlist[26] = "Ultimaker"
    textlist[27] = "Prusa\nMK3 MMU2"
    textlist[28] = "Prusa\nMK3+"
    textlist[29] = "Prusa\nMini+"
    textlist[30] = "Prusa\nXL"
    textlist[31] = "Prusa\nSL1"

    headersmk3 = {'X-Api-Key': 'keymk3'}
    headersmmu = {'X-Api-Key': 'keymmu'}
    headersmin = {'X-Api-Key': 'keymini'}
    headersxl = {'X-Api-Key': 'keyxl'}
    headersulti = {'X-Api-Key': 'keyulti'}
    authsl = requests.auth.HTTPDigestAuth('maker', '123456')

    # (field, status URL, request headers) of the printers that share the
    # same status endpoint layout
    prusa_printers = [
        (27, 'http://prusamk3mmu.local/api/v1/status', headersmmu),
        (28, 'http://prusamk3.local/api/v1/status', headersmk3),
        (29, 'http://prusamini.local/api/v1/status', headersmin),
        (30, 'http://prusaxl.local/api/v1/status', headersxl),
    ]

    while True:
        time.sleep(60)

        try:
            value = _fetch_json("http://geiger.local/j")["data"]["cpm"]
            if value < 50:
                set_color(74, WHITE, NOBLINK)
            else:
                set_color(74, RED, BLINK)
        except Exception:
            set_color(74, BLACK, NOBLINK)

        try:
            a = _fetch_json('http://octopi.local/api/job', headers=headersulti)
            decode_octopi_status(26, a['state'])
        except Exception:
            set_color(26, BLACK, NOBLINK)

        for field, url, headers in prusa_printers:
            try:
                a = _fetch_json(url, headers=headers)
                decode_prusa_status(field, a['printer']['state'])
            except Exception:
                set_color(field, BLACK, NOBLINK)

        try:
            a = _fetch_json('http://prusasl1.local/api/printer', auth=authsl)
            decode_prusa_status(31, a['state']['text'].upper())
        except Exception:
            set_color(31, BLACK, NOBLINK)


def serialcommunication():
    """Send queued display commands over the serial line. Runs forever.

    Each command is repeated once per second until the display echoes it
    back unchanged.
    """
    ser = serial.Serial(DISPTTY, DISPBAUD, timeout=1)
    time.sleep(1)  # wait for the Arduino to wake up
    ser.reset_input_buffer()
    ser.write(b"\n")
    a = ser.readline()  # read buffers
    if a != b'':
        # System not ready yet
        time.sleep(1)
    else:
        print("System Ready")
    while True:
        item = q.get(block=True)
        a = b'-'
        while a != item:
            ser.reset_input_buffer()
            ser.write(item)
            a = ser.readline()
            if a != item:
                time.sleep(1)


# NOTE(review): the HTML markup of the status page could not be recovered
# from the original source; the markup below (refresh interval, styling,
# blink class) is a reconstruction that follows the original loop structure.
# Please compare against the deployed version.
_PAGE_HEAD = (
    "<!DOCTYPE html>\n"
    "<html>\n"
    "<head>\n"
    "<meta charset=\"utf-8\">\n"
    "<title>World Domination Status</title>"
    "<meta http-equiv=\"refresh\" content=\"5\">"
    "<style>"
    "td { width: 8em; height: 3em; text-align: center; }"
    ".blink { animation: blink 1s steps(2, start) infinite; }"
    "@keyframes blink { to { visibility: hidden; } }"
    "</style>"
    "</head>\n<body>\n"
    "<center><h1>World Domination Panel</h1></center>"
)


def _render_cell(index):
    """Return the table cell for one field: colour, blink marker and label."""
    # Labels are plain text: escape them and turn line breaks into <br>.
    label = html.escape(textlist[index]).replace("\n", "<br>")
    blink_attr = ' class="blink"' if blinklist[index] else ""
    return f'<td{blink_attr} bgcolor="#{colorlist[index]}">{label}</td>'


class MyServer(BaseHTTPRequestHandler):
    """Serve the current field states as an HTML page."""

    # make log quiet
    def log_message(self, format, *args):
        pass

    def do_GET(self):
        """Answer every GET request with the status page."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

        page = [_PAGE_HEAD]
        # one table of 3 rows x 5 columns per 15 fields
        for i in range(len(colorlist) // 15):
            page.append("<table>\n")
            for j in range(3):
                page.append("<tr>")
                for k in range(5):
                    page.append(_render_cell(i * 15 + j * 5 + k))
                page.append("</tr>\n")
            page.append("</table>\n<br>\n")
        page.append("</body></html>")
        # build the page once and send it with a single write
        self.wfile.write("".join(page).encode())


if __name__ == "__main__":
    thread = Thread(target=ping_hosts, daemon=True)
    thread2 = Thread(target=serialcommunication, daemon=True)
    thread3 = Thread(target=funk_observe, daemon=True)
    thread4 = Thread(target=check_websites, daemon=True)
    thread.start()
    thread2.start()
    thread3.start()
    thread4.start()

    webServer = HTTPServer((hostName, serverPort), MyServer)
    print("Server started http://%s:%s" % (hostName, serverPort))  # Server starts

    mqttclient.connect(MQTTHOST, port=MQTTPORT, keepalive=60)
    print("MQTT")
    mqttclient.loop_start()

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        mqttclient.loop_stop()
    # Executes when you hit a keyboard interrupt, closing the server
    webServer.server_close()
    print("Server stopped.")