Source code for juham.shelly.shellymotion_simulator
import json
import time
from juham.base import Base
from juham.shelly.jshelly import JShelly
from juham.web import RCloud
from juham.web import IWorkerThread, RThread
class ShellyMotionSimulatorThread(IWorkerThread):
"""Thread simulating Shelly Plus 1 wifi relay with four temperature
sensors."""
_class_id = "ShellyMotionSimulatorThread"
def __init__(self, topic: str = "", interval: float = 60) -> None:
"""Construct thread for simulating data from Shelly motion sensors.
Args:
topic (str, optional): MQTT topic to post the sensor readings. Defaults to None.
interval (float, optional): Interval specifying how often the sensor is read. Defaults to 60 seconds.
"""
super().__init__()
self.shelly_topic: str = topic
self.interval: float = interval
def update_interval(self) -> float:
return self.interval
def update(self) -> bool:
super().update()
m = {
"tmp": {"value": 22.5}, # Room temperature value
"sensor": {
"vibration": True, # Vibration status
"motion": False, # Motion status
},
"unixtime": int(time.time()),
}
msg = json.dumps(m)
self.mqtt_client.publish(self.shelly_topic, msg, qos=1, retain=True)
return True
@classmethod
def register(cls):
IWorkerThread.register()
Base.register_class(cls._class_id, cls)
[docs]
class ShellyMotionSimulator(RThread, JShelly):
"""Simulator for Shelly Motion 2 - a wifi motion sensor. Spawns a thread
to generate MQTT messages as if they origin from the actual Shelly motion sensor"""
_class_id = None
workerThreadId = ShellyMotionSimulatorThread.get_class_id()
shelly_topic = "shellies/shellymotion2/info"
update_interval = 60
def __init__(
self,
name="shellymotionsensor",
topic: str = "",
interval: float = 60,
) -> None:
"""Create Shelly motion sensor simulator.
Args:
name (str, optional): Name of the object. Defaults to 'shellymotionsensor'.
topic (str, optional): MQTT topic to publish motion sensor events. Defaults to None.
interval (float, optional): interval between events, in seconds. Defaults to None.
"""
super().__init__(name)
self.active_liter_lpm = -1
self.update_ts = None
if topic is not None:
self.topic = topic
if interval is not None:
self.interval = interval
[docs]
def on_connect(self, client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if rc == 0:
self.subscribe(self.shelly_topic)
[docs]
def on_message(self, client, userdata, msg):
if msg.topic == self.shelly_topic:
em = json.loads(msg.payload.decode())
self.on_sensor(em)
else:
super().on_message(client, userdata, msg)
[docs]
def on_sensor(self, em: dict) -> None:
"""Handle data coming from the Shelly motion sensors.
Simply log the event to indicate the presense of simulated device.
Args:
em (dict): data from the sensor
"""
self.debug(f"Motion sensor sensor {em}")
[docs]
def run(self):
self.worker = Base.instantiate(ShellyMotionSimulatorThread.get_class_id())
self.worker.shelly_topic = self.shelly_topic
self.worker.interval = self.update_interval
super().run()
[docs]
def to_dict(self):
data = super().to_dict()
data["_shellymotionsimulator"] = {"shelly_topic": self.shelly_topic}
return data
[docs]
def from_dict(self, data):
super().from_dict(data)
if "_shellymotionsimulator" in data:
for key, value in data["_shellymotionsimulator"].items():
setattr(self, key, value)
[docs]
@classmethod
def register(cls):
if cls._class_id is None:
RCloud.register()
ShellyMotionSimulatorThread.register()
cls.initialize_class()