Python für MQTT-Steuerung
Mit dem folgenden Script werden nacheinander zwei Steuerbefehle an die Heimautomation abgesendet. Sobald die Sensoren die Ausführung eines Befehls bestätigen, wird zum nächsten Befehl übergegangen.
#!/usr/bin/env python
"""Send two "<name>-down" commands to a home-automation setup over MQTT.

Each command is published to ACTOR_TOPIC. The script then waits until the
matching sensor topic below SENSOR_TOPIC_BASE reports ``value_to_check``
before it continues with the next command.
"""
import time

import paho.mqtt.client as mqtt

ESSTISCH = "P1"
TV = "P2"
CA_FILE = "ca.crt"
MQTT_USERNAME = "user"
MQTT_PASSWORD = "secret"
MQTT_SERVER = "mqtt.example.com"
SENSOR_TOPIC_BASE = "/a/b/position/"
ACTOR_TOPIC = "/a/b/c/cmd"

# Most recent payload seen per topic; written by on_message().
values = {}
# Sensor topic the currently running command is waiting on.
topic_to_check = ""
# Payload that down() waits for on the sensor topic.
value_to_check = b'1'


def on_connect(client, userdata, flags, rc):
    """Report the connection result and subscribe to all sensor topics.

    Subscribing here (instead of once after ``connect()``) ensures the
    subscription is only requested after the broker accepted the connection
    and is requested again after every reconnect.
    """
    print("Connected with result code " + str(rc))
    if rc == 0:
        client.subscribe(f"{SENSOR_TOPIC_BASE}+")


def on_message(client, userdata, msg):
    """Remember the latest payload per topic and echo the awaited one."""
    values[msg.topic] = msg.payload
    if msg.topic == topic_to_check:
        print(str(msg.payload))


def down(text, name, pause=2, timeout=None):
    """Publish the "<name>-down" command and wait for the sensor to confirm.

    Args:
        text: Message printed before the command is sent.
        name: Device name; used for the command payload and the sensor topic.
        pause: Seconds to sleep after the confirmation was received.
        timeout: Maximum number of seconds to wait for the confirmation.
            ``None`` (the default) waits indefinitely.

    Raises:
        TimeoutError: If ``timeout`` is set and no confirmation arrived in time.
    """
    global topic_to_check
    print(text)
    topic_to_check = f"{SENSOR_TOPIC_BASE}{name}"
    client.publish(ACTOR_TOPIC, payload=f"{name}-down")

    deadline = None if timeout is None else time.monotonic() + timeout
    # A missing entry yields None, which never equals the expected payload.
    while values.get(topic_to_check) != value_to_check:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"no confirmation on {topic_to_check} within {timeout} s"
            )
        time.sleep(1)

    print("Pause")
    time.sleep(pause)


# paho-mqtt 2.x requires an explicit callback API version; VERSION1 keeps the
# callback signatures used above. Older releases do not have the enum.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.tls_set(ca_certs=CA_FILE)
client.username_pw_set(MQTT_USERNAME, password=MQTT_PASSWORD)
client.connect(MQTT_SERVER, 8883, 60)
client.loop_start()

try:
    down("Esstisch absenken", ESSTISCH)
    down("TV absenken", TV)
finally:
    # Always close the connection and stop the background network loop,
    # even when a command fails.
    client.disconnect()
    client.loop_stop()
Das Projekt verwendet die Bibliothek paho-mqtt.