Source code for juham.automation.watercirculator
from juham.base import Base
import json
[docs]
class WaterCirculator(Base):
"""Hot Water Circulation Controller.
Uses motion sensor data to determine if someone is at home. If so,
it runs the water circulator pump to ensure that hot water is
instantly available when the tap is turned on.
"""
_class_id = None
uptime = 60 * 5
min_temperature = 37
relay_topic = "shellyplus1-a0a3b3c309c4/command/switch:0"
temperature_topic = Base.mqtt_root_topic + "/temperature/103"
motion_sensor_topic = "shellies/shellymotion2/info"
motion_topics = "shellies/shellymotion2/#"
def __init__(self, name="rwatercirculator"):
super().__init__(name)
self.current_motion = 0
self.relay_started_ts = 0
self.water_temperature = 0
self.water_temperature_updated = 0
[docs]
def on_connect(self, client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if rc == 0:
self.subscribe(self.motion_topics)
self.subscribe(self.temperature_topic)
[docs]
def on_message(self, client, userdata, msg):
if msg.topic == self.temperature_topic:
m = json.loads(msg.payload.decode())
self.on_temperature_sensor(m, self.timestamp())
elif msg.topic == self.motion_sensor_topic:
m = json.loads(msg.payload.decode())
self.on_motion_sensor(m, self.timestamp())
else:
super().on_message(client, userdata, msg)
[docs]
def on_temperature_sensor(self, m: dict, ts_utc_now: float) -> None:
"""Handle message from the hot water pipe temperature sensor.
Records the temperature and updates the water_temperature_updated attribute.
Args:
m (dict): temperature reading from the hot water blump sensor
ts_utc_now (float): _current utc time
"""
self.water_temperature = m["temperature"]
self.water_temperature_updated = ts_utc_now
self.debug(
f"Temperature of circulating water updated to {self.water_temperature} C"
)
[docs]
def on_motion_sensor(self, m: dict, ts_utc_now: float) -> None:
"""Control the water cirulator bump.
Given message from the motion sensor consider switching the
circulator bump on.
Args:
msg (dict): directionary holding motion sensor data
ts_utc_now (float): current time stamp
"""
sensor = m["sensor"]
vibration = sensor["vibration"]
motion = sensor["motion"]
if motion == True:
if not self.current_motion:
temp_check_ok = False
if ts_utc_now - self.water_temperature_updated < 15 * 60:
temp_check_ok = True
if (
self.water_temperature > self.min_temperature
) and temp_check_ok is True:
self.debug(
f"Motion detected but water is warm enough already {self.water_temperature} C"
)
else:
self.current_motion = True
self.relay_started_ts = ts_utc_now
self.mqtt_client.publish(self.relay_topic, "on", 1)
self.info(
f"Water circulator pump switched on for {int(self.uptime / 60)} minutes "
)
else:
self.debug(
f"Circulator pump has been running for {str(int(ts_utc_now - self.relay_started_ts)/60)} minutes",
" ",
)
else:
if self.current_motion == True:
if ts_utc_now - self.relay_started_ts > self.uptime:
self.mqtt_client.publish(self.relay_topic, "off", 1)
self.info("Water circulator pump switched off", "")
self.current_motion = False
else:
self.debug(
f"Pump shutdown countdown {str(int(self.uptime - (ts_utc_now - self.relay_started_ts ))/60)} min"
)
else:
self.debug(
f"Pump off already, temperature {self.water_temperature} C", ""
)
[docs]
@classmethod
def register(cls):
if cls._class_id is None:
Base.register()
cls.initialize_class()