Source code for juham.shelly.shellymotion_simulator

import json
import time
from juham.base import Base
from juham.shelly.jshelly import JShelly
from juham.web import RCloud
from juham.web import IWorkerThread, RThread


class ShellyMotionSimulatorThread(IWorkerThread):
    """Worker thread that publishes simulated Shelly motion sensor readings
    to an MQTT topic."""

    _class_id = "ShellyMotionSimulatorThread"

    def __init__(self, topic: str = "", interval: float = 60) -> None:
        """Create the simulator worker.

        Args:
            topic (str, optional): MQTT topic the simulated readings are
                published to. Defaults to an empty string.
            interval (float, optional): Value reported by
                :meth:`update_interval`, in seconds. Defaults to 60.
        """
        super().__init__()
        self.interval: float = interval
        self.shelly_topic: str = topic

    def update_interval(self) -> float:
        """Return the configured interval between updates, in seconds."""
        return self.interval

    def update(self) -> bool:
        """Publish one simulated sensor reading.

        The reading carries a fixed temperature, fixed vibration/motion
        flags and the current Unix time, serialized as JSON.

        Returns:
            bool: Always True.
        """
        super().update()

        # Key order (tmp, sensor, unixtime) is kept so the serialized JSON
        # is identical from one reading to the next apart from the time.
        sensor_flags = {"vibration": True, "motion": False}
        reading = {
            "tmp": {"value": 22.5},
            "sensor": sensor_flags,
            "unixtime": int(time.time()),
        }

        payload = json.dumps(reading)
        # NOTE(review): mqtt_client is not set in this class; presumably
        # provided by the base class or the owning thread - confirm.
        self.mqtt_client.publish(self.shelly_topic, payload, qos=1, retain=True)
        return True

    @classmethod
    def register(cls):
        """Register the base worker class, then this class under its id."""
        IWorkerThread.register()
        Base.register_class(cls._class_id, cls)


class ShellyMotionSimulator(RThread, JShelly):
    """Simulator for Shelly Motion 2 - a wifi motion sensor.

    Spawns a worker thread that generates MQTT messages as if they
    originated from an actual Shelly motion sensor, and subscribes to the
    same topic so the simulated events can be logged.
    """

    _class_id = None
    workerThreadId = ShellyMotionSimulatorThread.get_class_id()
    shelly_topic = "shellies/shellymotion2/info"
    update_interval = 60

    def __init__(
        self,
        name="shellymotionsensor",
        topic: str = "",
        interval: float = 60,
    ) -> None:
        """Create Shelly motion sensor simulator.

        Args:
            name (str, optional): Name of the object. Defaults to
                'shellymotionsensor'.
            topic (str, optional): MQTT topic to publish motion sensor
                events. When empty (the default) the class-level
                ``shelly_topic`` is used.
            interval (float, optional): Interval between events, in
                seconds. Defaults to 60.
        """
        super().__init__(name)
        self.active_liter_lpm = -1
        self.update_ts = None
        if topic is not None:
            # Kept for backward compatibility with code reading `topic`.
            self.topic = topic
            # The rest of the class reads `shelly_topic`; without this the
            # constructor argument had no effect. An empty topic must not
            # clobber the class-level default.
            if topic:
                self.shelly_topic = topic
        if interval is not None:
            # Kept for backward compatibility with code reading `interval`.
            self.interval = interval
            # run() hands `update_interval` to the worker, so the argument
            # has to land there to take effect.
            self.update_interval = interval

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the simulated sensor topic once connected.

        The subscription is made only when ``rc`` is 0.
        """
        super().on_connect(client, userdata, flags, rc)
        if rc == 0:
            self.subscribe(self.shelly_topic)

    def on_message(self, client, userdata, msg):
        """Dispatch an incoming MQTT message.

        Messages on ``shelly_topic`` are decoded from JSON and passed to
        :meth:`on_sensor`; anything else is delegated to the superclass.
        """
        if msg.topic == self.shelly_topic:
            em = json.loads(msg.payload.decode())
            self.on_sensor(em)
        else:
            super().on_message(client, userdata, msg)

    def on_sensor(self, em: dict) -> None:
        """Handle data coming from the Shelly motion sensors.

        Simply logs the event to indicate the presence of the simulated
        device.

        Args:
            em (dict): data from the sensor
        """
        self.debug(f"Motion sensor sensor {em}")

    def run(self):
        """Instantiate and configure the worker, then run the thread.

        The worker receives this object's topic and update interval before
        control is handed to the superclass ``run``.
        """
        self.worker = Base.instantiate(ShellyMotionSimulatorThread.get_class_id())
        self.worker.shelly_topic = self.shelly_topic
        self.worker.interval = self.update_interval
        super().run()

    def to_dict(self):
        """Serialize the object, adding this class's own attributes under
        the ``_shellymotionsimulator`` key."""
        data = super().to_dict()
        data["_shellymotionsimulator"] = {"shelly_topic": self.shelly_topic}
        return data

    def from_dict(self, data):
        """Restore the object from ``data``.

        Every key/value pair found under ``_shellymotionsimulator`` is set
        as an attribute on this instance.
        """
        super().from_dict(data)
        if "_shellymotionsimulator" in data:
            for key, value in data["_shellymotionsimulator"].items():
                setattr(self, key, value)

    @classmethod
    def register(cls):
        """Register this class and its dependencies.

        Does nothing when ``_class_id`` is already set.
        """
        if cls._class_id is None:
            RCloud.register()
            ShellyMotionSimulatorThread.register()
            cls.initialize_class()