Python für MQTT-Steuerung

Gespeichert von Erik Wegner am/um
Aufmacherbild

Mit dem folgenden Script werden nacheinander zwei Steuerbefehle an die Heimautomation abgesendet. Sobald die Sensoren die Ausführung eines Befehls bestätigen, wird zum nächsten Befehl übergegangen.

#!/usr/bin/env python
import time

import paho.mqtt.client as mqtt


# --- Broker connection settings ---------------------------------------------
MQTT_SERVER = "mqtt.example.com"
MQTT_USERNAME = "user"
MQTT_PASSWORD = "secret"
CA_FILE = "ca.crt"  # CA certificate file handed to client.tls_set()

# --- Topics -----------------------------------------------------------------
# A device name is appended to SENSOR_TOPIC_BASE to form its sensor topic;
# every command is published on ACTOR_TOPIC.
SENSOR_TOPIC_BASE = "/a/b/position/"
ACTOR_TOPIC = "/a/b/c/cmd"

# --- Device names as used in topics and command payloads --------------------
ESSTISCH = "P1"
TV = "P2"


# --- State shared between the MQTT callbacks and down() ---------------------
values = {}              # most recent payload received per topic
topic_to_check = ""      # sensor topic that down() is currently waiting on
value_to_check = b"1"    # payload that counts as "command executed"

def on_connect(client, userdata, flags, rc):
    """Report the connection result and subscribe to all sensor topics.

    paho-mqtt calls this after the broker has answered a connection request,
    which also happens after automatic reconnects made by the network loop.

    Args:
        client: The MQTT client instance that connected.
        userdata: User data registered on the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; printed as-is.
    """
    print("Connected with result code " + str(rc))
    # Subscribing here renews the subscription after every (re)connect.
    # Without it, a dropped connection would leave the script without
    # sensor updates and down() would wait forever.
    client.subscribe(F"{SENSOR_TOPIC_BASE}+")

def on_message(client, userdata, msg):
    """Cache the payload of every received message, keyed by topic.

    The payload is additionally printed when the message arrived on the
    topic that is currently stored in ``topic_to_check``.

    Args:
        client: The MQTT client instance that received the message (unused).
        userdata: User data registered on the client (unused).
        msg: Received message; its ``topic`` and ``payload`` are read.
    """
    payload = msg.payload
    values[msg.topic] = payload
    if msg.topic != topic_to_check:
        return
    print(str(payload))

def down(text, name, pause=2, timeout=None):
    """Send the "<name>-down" command and block until the sensor confirms it.

    Publishes ``<name>-down`` on ACTOR_TOPIC, then polls the ``values`` cache
    once per second until the sensor topic ``SENSOR_TOPIC_BASE + name`` holds
    ``value_to_check``. After the confirmation it sleeps ``pause`` seconds.

    Args:
        text: Message printed before the command is sent.
        name: Device name; used for both the command payload and the
            sensor topic.
        pause: Seconds to sleep once the confirmation has been seen.
        timeout: Maximum number of seconds to wait for the confirmation.
            ``None`` (the default) waits indefinitely.

    Raises:
        TimeoutError: If ``timeout`` is set and the sensor did not report
            ``value_to_check`` in time.
    """
    global topic_to_check
    print(text)
    topic = F"{SENSOR_TOPIC_BASE}{name}"
    # Published for on_message, which echoes payloads arriving on this topic.
    topic_to_check = topic
    client.publish(ACTOR_TOPIC, payload=F"{name}-down")

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while topic not in values or values[topic] != value_to_check:
        wait = 1
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    F"no confirmation on {topic} within {timeout} s"
                )
            # Never sleep past the deadline.
            wait = min(wait, remaining)
        time.sleep(wait)

    print("Pause")
    time.sleep(pause)

# Create the client and register the callbacks before connecting so that no
# event is missed.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# TLS-secured, authenticated connection (port 8883, keepalive 60 seconds).
client.tls_set(ca_certs=CA_FILE)
client.username_pw_set(MQTT_USERNAME, password=MQTT_PASSWORD)
client.connect(MQTT_SERVER, 8883, 60)
# The single-level wildcard "+" covers the sensor topic of every device.
client.subscribe(F"{SENSOR_TOPIC_BASE}+")
# Process network traffic in a background thread; down() polls from here.
client.loop_start()

try:
    down("Esstisch absenken", ESSTISCH)
    down("TV absenken", TV)
finally:
    # Always close the session, even if a command fails or the script is
    # interrupted. loop_stop() waits for the background thread so that the
    # queued DISCONNECT is sent before the interpreter exits.
    client.disconnect()
    client.loop_stop()

Das Projekt verwendet die Bibliothek paho-mqtt.