#!/usr/bin/env python3
#####################################################################
#
# P0188
#
# World domination panel
#
# This software checks status of systems (Ping/Website/MQTT)
# and writes status out to serial interface for
# RGB-LED-stripe
#
#####################################################################
#
# 2024-09-08 xsider version for public
# 2024-08-28 xsider code cleanup
# 2024-08-21 xsider created
#
#####################################################################
from http.server import HTTPServer, BaseHTTPRequestHandler #Python’s built-in library
import os
import time
from threading import Thread
import serial
import queue
import paho.mqtt.client as mqtt
from enum import Enum,auto
from ping3 import ping
hostName = "0.0.0.0"
serverPort = 8082 #You can choose any available port; by default, it is 8000
DISPTTY="/dev/ttyACM0"
DISPBAUD=115200
MQTTHOST="sigint5.internal"
MQTTPORT=1883
brightness = 0
# this is a replacement for enum
BLACK = "BLACK"
RED = "RED"
GREEN = "GREEN"
BLUE = "BLUE"
YELLOW = "YELLOW"
WHITE = "WHITE"
NOBLINK = False
BLINK = True
colordef = { BLACK : { "html" : "000000", "rgb" : b";0;0;0" },
RED : { "html" : "FF0000", "rgb" : b";255;0;0" },
GREEN : { "html" : "00FF00", "rgb" : b";0;255;0" },
BLUE : { "html" : "7070FF", "rgb": b";0;0;255" },
YELLOW : { "html" : "FFFF00", "rgb" : b";255;255;0" },
WHITE : { "html" : "FFFFFF", "rgb" : b";255;255;255" }
}
class AlarmTyp(Enum):
RUHE = auto()
TEST = auto()
q = queue.Queue()
hostlist = [
[0,"192.168.0.1","Router
AP EG",WHITE,NOBLINK,RED,NOBLINK],
[1,"192.168.0.2","Blackfin
Server",WHITE,NOBLINK,RED,BLINK],
[2,"192.168.0.3","Coreswitch",WHITE,NOBLINK,RED,BLINK],
[3,"192.168.0.4","Odin
Fileserver",WHITE,NOBLINK,RED,NOBLINK],
[4,"192.168.0.5","Thor
Mailserver", WHITE, NOBLINK, RED, NOBLINK],
[5,"192.168.0.6","Tyr
Webserver", WHITE, NOBLINK, RED, NOBLINK],
[6,"192.168.0.7","Frigg
Loginserver", WHITE, NOBLINK, RED, NOBLINK],
[7,"192.168.0.8","Fulla
Lizenzserver", WHITE, NOBLINK, BLACK, NOBLINK],
[8,"192.168.0.9","Observer 1", WHITE, NOBLINK, RED, NOBLINK],
[9,"192.168.0.10","Sigint 5", WHITE, NOBLINK, RED, NOBLINK],
[10,"192.168.0.11","AP4
1.OG Nord", WHITE, NOBLINK, BLACK, NOBLINK],
[11,"192.168.0.12","AP7
1.OG SÜd", WHITE, NOBLINK, BLACK, NOBLINK],
[12,"192.168.0.13","AP5
2.OG", WHITE, NOBLINK, BLACK, NOBLINK],
[13,"192.168.0.14","Bridge
3.OG", WHITE, NOBLINK, BLACK, NOBLINK],
[14,"192.168.0.15","AP3
Labor", WHITE, NOBLINK, BLACK, NOBLINK],
[15,"192.168.0.16","Blitzortung", WHITE, NOBLINK, RED, NOBLINK],
[16,"192.168.0.17","urad
Monitor", WHITE, NOBLINK, RED, NOBLINK],
[17,"192.168.0.18","Feinstaub
Sensor", WHITE, NOBLINK, RED, NOBLINK],
[18,"192.168.0.19","Kiwi
SDR",WHITE , NOBLINK, BLACK, NOBLINK],
[19,"192.168.0.20","LoRaWAN
Gateway", WHITE, NOBLINK, RED, NOBLINK],
[20,"192.168.0.21","APC1", WHITE, NOBLINK, BLACK, NOBLINK],
[21,"192.168.0.22","APC2", WHITE, NOBLINK, BLACK, NOBLINK],
[22,"192.168.0.23","APC3", WHITE, NOBLINK, BLACK, NOBLINK],
[23,"192.168.0.24","Laborraum 4", WHITE, NOBLINK, BLACK, NOBLINK],
[24,"192.168.0.25","Kamera 2", WHITE, NOBLINK, BLACK, NOBLINK],
[25,"192.168.0.26","Kamera 4", WHITE, NOBLINK, BLACK, NOBLINK],
[32,"192.168.0.33","Hamclock", WHITE, NOBLINK, BLACK, NOBLINK],
[33,"192.168.0.34","Manik
Backup", WHITE, NOBLINK, RED, NOBLINK],
[34,"192.168.0.35","Automatix
CNC-Fräse", WHITE, NOBLINK, BLACK, NOBLINK],
[35,"192.168.0.36","Indra
Solarmess", WHITE, NOBLINK, BLACK, NOBLINK],
[36,"192.168.0.37","Shiva
Bat.umschaltung", WHITE, NOBLINK, BLACK, NOBLINK],
[38,"192.168.0.39","Redpitaya", WHITE, NOBLINK, BLACK, NOBLINK],
[39,"192.168.0.40","Rigol
Oszi", WHITE, NOBLINK, BLACK, NOBLINK],
[40,"192.168.0.41","Rigol
Spektrum", WHITE, NOBLINK, BLACK, NOBLINK],
[41,"192.168.0.42","Rigol
DMM", WHITE, NOBLINK, BLACK, NOBLINK],
[42,"192.168.0.43","Rigol
Arb.gen", WHITE, NOBLINK, BLACK, NOBLINK],
[43,"192.168.0.44","Victron
Gateway", WHITE, NOBLINK, BLACK, NOBLINK],
[44,"192.168.0.45","Roboterarm", WHITE, NOBLINK, BLACK, NOBLINK],
[45,"192.168.0.46","Unmatched
RISC V", WHITE, NOBLINK, BLACK, NOBLINK],
[46,"192.168.0.47","Drucker
A4 Labor", WHITE, NOBLINK, BLACK, NOBLINK],
[47,"192.168.0.48","Drucker
A3 Labor", WHITE, NOBLINK, BLACK, NOBLINK],
[48,"192.168.0.49","DSKY", WHITE, NOBLINK, BLACK, NOBLINK],
[49,"192.168.0.50","Oc 5", WHITE, NOBLINK, BLACK, NOBLINK],
[60,"192.168.0.61","Arbeitsplatz
EG", WHITE, NOBLINK, BLACK, NOBLINK],
[61,"192.168.0.62","Drucker
EG", WHITE, NOBLINK, BLACK, NOBLINK],
[62,"192.168.0.63","Inferno
1.OG", WHITE, NOBLINK, BLACK, NOBLINK],
[63,"192.168.0.64","Drucker
1.OG", WHITE, NOBLINK, BLACK, NOBLINK],
[64,"192.168.0.65","Eb
Wetterstation", WHITE, NOBLINK, BLACK, NOBLINK],
[65,"192.168.0.66","Sigint 2
NOAA Sat.", WHITE, NOBLINK, BLACK, NOBLINK],
[66,"192.168.0.67","Weishaupt
Heizung", WHITE, NOBLINK, BLACK, NOBLINK],
[67,"192.168.0.68","Wago 1
SPS", WHITE, NOBLINK, BLACK, NOBLINK],
[68,"192.168.0.69","VUZero", WHITE, NOBLINK, BLACK, NOBLINK],
[69,"192.168.0.70","Vishnu
Thermodrucker", WHITE, NOBLINK, BLACK, NOBLINK],
]
colorlist= ["666666"]*15*5
textlist=[""]*15*5
blinklist=[0]*15*5
funkok=False
funktest=False
def set_color( field, color, blink ):
sendstr = b"%d" % field
colorlist[field] = colordef[color]["html"]
blinklist[field] = blink
sendstr += colordef[color]["rgb"]
if ( blink ):
sendstr += colordef[BLACK]["rgb"]
else:
sendstr += colordef[color]["rgb"]
sendstr += b"\n"
q.put(sendstr)
TOPIC_POC=[("funk/+/ok/#",1), # Netzkennung
("funk/+/test/#",1), # Testkennung
]
def on_connect(client,userdata,flags,reason_code,properties):
print(f"Connected with result code {reason_code}")
for e in TOPIC_POC:
mqttclient.subscribe(e)
print("Subscribed", e)
def on_disconnet(client,userdata,rc):
print("Disconnect with result code"+str(rc))
time.sleep(1)
mqttclient.reconnect()
def on_message(client,userdata,msg):
global funkok,funktest
topic = msg.topic
Void,quelle,kennung,sub = topic.split("/")
if ( kennung == "ok" ):
funkok=True
if ( kennung == "test" ):
funktest=True
# old mosquitto_lib
#mqttclient=mqtt.Client(client_id="Anzeige",clean_session=False)
# new mosquitto_lib
mqttclient = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttclient.on_connect = on_connect
mqttclient.on_message = on_message
def funk_observe():
global funkok,funktest
timeoutfunk=time.time()
timeoutfunktest=time.time()
funkstate=AlarmTyp.RUHE
textlist[70] = "Funk OK"
while ( True ):
try:
if funkok:
timeoutfunk = time.time()
funkok=False;
if funkstate != AlarmTyp.TEST : # a test alarm is running, so keep green light instead of white
set_color( 70, WHITE, NOBLINK )
if funktest:
timeoutfunktest = time.time()
funkstate = AlarmTyp.TEST
funktest=False
set_color(70, GREEN, NOBLINK )
if funkstate == AlarmTyp.TEST and time.time() > (timeoutfunktest + 15*60):
funkstate = AlarmTyp.RUHE
# whitecolor comes from next LoRaWAN signal
if time.time() > ( timeoutfunk + 200 ): # no LoRaWAN for 3 minutes, color red
timeoutfunk = time.time()
set_color(70, RED, NOBLINK )
except Exception as error:
print ("Error: ", error)
time.sleep(0.1)
def HostUp(hostname):
# '''Function returns True if host IP returns a ping, else False'''
if ping(hostname,timeout=1):
HOST_UP = True
else:
HOST_UP = False
return HOST_UP
def ping_hosts():
for i in hostlist:
textlist[ i[0] ] = i[2]
while True:
for i in hostlist:
if HostUp( i[1] ):
set_color( i[0], i[3], i[4] )
# send a rgb string to display
time.sleep(1) # this is mandatory for slow down ping for prevent ping slowdown for unprivileged users
else:
set_color( i[0], i[5], i[6] )
from urllib.request import urlopen
import requests
import json
def decode_prusa_status( fieldnumber, statustext ):
if statustext == 'IDLE' or statustext == 'FINISHED' or statustext == 'READY':
set_color( fieldnumber, WHITE, NOBLINK )
if statustext == 'PRINTING':
set_color( fieldnumber, GREEN, NOBLINK )
if statustext == 'BUSY' or statustext == 'PAUSED' or statustext == 'STOPPED' or statustext == 'ATTENTION':
set_color( fieldnumber, YELLOW, NOBLINK )
if statustext == 'ERROR':
set_color( fieldnumber, RED, NOBLINK )
def decode_octopi_status( fieldnumber, statustext ):
if statustext == "Printing":
set_color( fieldnumber, GREEN, NOBLINK )
elif statustext == "Error" or statustext == "Offline after error":
set_color( fieldnumber, RED, NOBLINK )
elif statustext == "Operational":
set_color( fieldnumber, WHITE, NOBLINK )
else:
set_color( fieldnumber, YELLOW, NOBLINK )
def check_websites():
textlist[74] = "Aktivität < 50 cpm"
textlist[26] = "Ultimaker"
textlist[27] = "Prusa
MK3 MMU2"
textlist[28] = "Prusa
MK3+"
textlist[29] = "Prusa
Mini+"
textlist[30] = "Prusa
XL"
textlist[31] = "Prusa
SL1"
headersmk3={'X-Api-Key': 'keymk3'}
headersmmu={'X-Api-Key': 'keymmu'}
headersmin={'X-Api-Key': 'keymini'}
headersxl={'X-Api-Key': 'keyxl'}
headersulti={'X-Api-Key': 'keyulti'}
authsl = requests.auth.HTTPDigestAuth('maker','123456')
while (True):
time.sleep(60)
try:
r = requests.get("http://geiger.local/j")
a = r.json()
value = a["data"]["cpm"]
if value < 50:
set_color( 74, WHITE, NOBLINK )
else:
set_color( 74, RED, BLINK )
except:
set_color( 74, BLACK, NOBLINK )
try:
r = requests.get('http://octopi.local/api/job', headers=headersulti )
a = r.json()
decode_octopi_status(26, a['state'] )
except:
set_color( 26, BLACK, NOBLINK )
try:
r = requests.get('http://prusamk3mmu.local/api/v1/status', headers=headersmmu)
a = r.json()
decode_prusa_status(27, a['printer']['state'])
except:
set_color( 27, BLACK, NOBLINK )
try:
r = requests.get('http://prusamk3.local/api/v1/status', headers=headersmk3)
a = r.json()
decode_prusa_status(28, a['printer']['state'])
except:
set_color( 28, BLACK, NOBLINK )
try:
r = requests.get('http://prusamini.local/api/v1/status', headers=headersmin)
a = r.json()
decode_prusa_status(29, a['printer']['state'])
except:
set_color( 29, BLACK, NOBLINK )
try:
r = requests.get('http://prusaxl.local/api/v1/status', headers=headersxl)
a = r.json()
decode_prusa_status(30, a['printer']['state'])
except:
set_color( 30, BLACK, NOBLINK )
try:
r = requests.get('http://prusasl1.local/api/printer', auth=authsl)
a = r.json()
decode_prusa_status(31, a['state']['text'].upper())
except:
set_color( 31, BLACK, NOBLINK )
def serialcommunication():
ser = serial.Serial( DISPTTY,DISPBAUD, timeout=1)
time.sleep(1) # wait for ARDUINO for wake up
ser.reset_input_buffer()
ser.write(b"\n")
a = ser.readline() # read buffers
if ( a != b''):
# System not ready yet
time.sleep(1)
else:
print("System Ready")
while True:
item = q.get(block=True)
a = b'-'
while ( a != item ):
ser.reset_input_buffer()
ser.write(item)
a = ser.readline()
if ( a != item ):
time.sleep(1)
class MyServer(BaseHTTPRequestHandler):
# make log quiet
def log_message(self, format, *args):
pass
def do_GET(self): #the do_GET method is inherited from BaseHTTPRequestHandler
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write("\n".encode() )
self.wfile.write("\n".encode() )
self.wfile.write("
" + \
( " " if blinklist[i*15+j*5+k] else " " ) + \
textlist[i*15+j*5+k] + \
( " " if blinklist[i*15+j*5+k] else " " ) + \
" | ")\
.encode() )
self.wfile.write( "
\n".encode() ) self.wfile.write( "".encode() ) if __name__ == "__main__": thread = Thread( target = ping_hosts, daemon=True ) thread2 = Thread( target = serialcommunication,daemon=True ) thread3 = Thread( target = funk_observe,daemon=True ) thread4 = Thread( target = check_websites,daemon=True ) thread.start() thread2.start() thread3.start() thread4.start() webServer = HTTPServer((hostName, serverPort), MyServer) print("Server started http://%s:%s" % (hostName, serverPort)) #Server starts mqttclient.connect(MQTTHOST, port=MQTTPORT,keepalive=60) print("MQTT") mqttclient.loop_start() try: webServer.serve_forever() except KeyboardInterrupt: pass finally: mqttclient.loop_stop() webServer.server_close() #Executes when you hit a keyboard interrupt, closing the server print("Server stopped.")