Python für MQTT-Steuerung
Mit dem folgenden Script werden nacheinander zwei Steuerbefehle an die Heimautomation abgesendet. Sobald die Sensoren die Ausführung eines Befehls bestätigen, wird zum nächsten Befehl übergegangen.
#!/usr/bin/env python
import time
import paho.mqtt.client as mqtt
ESSTISCH = "P1"
TV = "P2"
CA_FILE = "ca.crt"
MQTT_USERNAME = "user"
MQTT_PASSWORD = "secret"
MQTT_SERVER = "mqtt.example.com"
SENSOR_TOPIC_BASE = "/a/b/position/"
ACTOR_TOPIC = "/a/b/c/cmd"
values = {}
topic_to_check = ""
value_to_check = b'1'
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
def on_message(client, userdata, msg):
#print(msg.topic + " " + str(msg.payload))
values[msg.topic] = msg.payload
global topic_to_check
if msg.topic == topic_to_check:
print(str(msg.payload))
# else:
# print(F"{msg.topic} != {topic_to_check}")
def down(text, name, pause = 2):
global topic_to_check
print(text)
topic_to_check = F"{SENSOR_TOPIC_BASE}{name}"
client.publish(ACTOR_TOPIC, payload=F"{name}-down")
while topic_to_check not in values or values[topic_to_check] != value_to_check:
time.sleep(1)
print("Pause")
time.sleep(pause)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.tls_set(ca_certs=CA_FILE)
client.username_pw_set(MQTT_USERNAME, password=MQTT_PASSWORD)
client.connect(MQTT_SERVER, 8883, 60)
client.subscribe(F"{SENSOR_TOPIC_BASE}+")
client.loop_start()
down("Esstisch absenken", ESSTISCH)
down("TV absenken", TV)
client.disconnect()
Das Projekt verwendet die Bibliothek paho-mqtt.