Source code for juham.shelly.shellymotion

import json
from typing import Any
from influxdb_client_3 import Point
from juham.base import Base, MqttMsg
from juham.shelly.jshelly import JShelly


[docs] class ShellyMotion(JShelly): """Shelly Motion 2 - a wifi motion sensor with light and temperature metering.""" _class_id = "" shelly_topic = "shellies/shellymotion2/info" # source topic motion_topic = Base.mqtt_root_topic + "/motion/" # target topic def __init__(self, name="shellymotion"): super().__init__(name) self.shelly_topic = ShellyMotion.shelly_topic
[docs] def on_connect(self, client, userdata, flags, rc): super().on_connect(client, userdata, flags, rc) if rc == 0: self.subscribe(self.shelly_topic) self.debug("Topic " + self.shelly_topic + " subscribed")
[docs] def on_message(self, client: object, userdata: Any, msg: MqttMsg) -> None: if msg.topic == self.shelly_topic: m = json.loads(msg.payload.decode()) self.on_sensor(m) else: super().on_message(client, userdata, msg)
[docs] def on_sensor(self, m: dict) -> None: """Handle motion sensor event. This method reads the incoming event, translates it, and publishes it to the Juham topic. It also writes the attributes to the time series database. Args: m (dict): MQTT event from Shelly motion sensor """ tmp = m["tmp"] sensor_id = "motion" roomtemperature = tmp["value"] sensor = m["sensor"] vibration = sensor["vibration"] motion = sensor["motion"] timestamp = m["unixtime"] self.debug(f"Motion sensor event {self.timestampstr(timestamp)}") msg = { "sensor": sensor_id, "timestamp": timestamp, "temperature": int(roomtemperature), "motion": motion, "vibration": vibration, } self.publish(self.motion_topic, json.dumps(msg), 1, True) point = ( Point("motion") .tag("sensor", sensor_id) .field("motion", motion) .field("vibration", vibration) .field("roomtemp", roomtemperature) .field("timestamp", int(timestamp)) .time(self.epoc2utc(timestamp)) ) self.write(point)
[docs] def to_dict(self): data = super().to_dict() data["_shellymotion"] = { "shelly_topic": self.shelly_topic, "motion_topic": self.motion_topic, } return data
[docs] def from_dict(self, data): super().from_dict(data) if "_shellymotion" in data: for key, value in data["_shellymotion"].items(): setattr(self, key, value)
[docs] @classmethod def register(cls): if cls._class_id == "": JShelly.register() cls.initialize_class() cls.register_topic(cls.motion_topic) cls.register_topic("shellies/shellymotion2/#")