Source code for juham.shelly.shellypro3em

from juham.base import Base
from juham.shelly.jshelly import JShelly
from influxdb_client_3 import Point
import json


class ShellyPro3EM(JShelly):
    """The Shelly Pro3EM energy meter.

    Publishes the active power (also called real power) to MQTT, as it
    represents the part of the power that can be converted to useful work.
    Full set of measurements e.g. reactive power, current, and voltage,
    available from the sensor, are written to a time series database for
    inspection purposes.
    """

    _class_id = None
    shelly_topic = "shellypro3em-powermeter/events/rpc"  # input topic
    power_topic = Base.mqtt_root_topic + "/powerconsumption"  # target topic

    def __init__(self, name="shellypro3em"):
        """Create the power meter object.

        Args:
            name (str): name of the object, passed on to the base class
        """
        super().__init__(name)

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the Shelly input topic when the connection succeeds.

        Args:
            client: MQTT client, forwarded to the base class
            userdata: user data, forwarded to the base class
            flags: connection flags, forwarded to the base class
            rc (int): connection result code; 0 is treated as success
        """
        super().on_connect(client, userdata, flags, rc)
        if rc == 0:
            # Use the instance attribute so that a topic restored by
            # from_dict() is honored; it falls back to the class default.
            self.subscribe(self.shelly_topic)

    def on_message(self, client, userdata, msg):
        """Dispatch an incoming Shelly RPC notification.

        ``NotifyStatus`` messages carrying ``em:0`` or ``emdata:0`` are
        forwarded to :meth:`on_em_0` and :meth:`on_emdata_0` respectively,
        ``NotifyEvent`` messages are ignored, and anything else is reported
        through ``self.error``.

        Args:
            client: MQTT client, forwarded to the base class
            userdata: user data, forwarded to the base class
            msg: received message; its ``payload`` is decoded as JSON
        """
        super().on_message(client, userdata, msg)

        try:
            m = json.loads(msg.payload.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # A corrupt payload must not raise out of the MQTT callback.
            self.error(f"Invalid powermeter message {str(e)}", str(msg.payload))
            return

        if not isinstance(m, dict):
            self.error("Invalid powermeter message", str(m))
            return

        mth = m.get("method")
        if mth == "NotifyStatus":
            params = m.get("params")
            if not isinstance(params, dict) or "ts" not in params:
                self.error("Malformed powermeter status message", str(m))
                return
            ts = params["ts"]
            if "em:0" in params:
                self.on_em_0(ts, params["em:0"])
            elif "emdata:0" in params:
                self.on_emdata_0(ts, params["emdata:0"])
            else:
                self.error("Unknown powermeter id", str(m))
        elif mth == "NotifyEvent":
            pass  # events carry no measurements of interest here
        else:
            self.error(f"UNKNOWN POWERMETER METHOD {mth}", str(m))

    def on_em_0(self, ts, em):
        """Handle the incoming Shelly message containing all power meter
        readings.

        Publish the real power component, which is of interest to Juham,
        and record all measurements to the time series database.

        Args:
            ts (float): time stamp of the event, as found in the message
            em (dict): message from the Shelly device
        """
        self.publish_active_power(ts, em)
        self.record_power(ts, em)

    def publish_active_power(self, ts, em):
        """Publish the active power, also known as real power.

        This is that part of the power that can be converted to useful work.

        Args:
            ts (float): time stamp of the event, as found in the message
            em (dict): message from the Shelly device
        """
        msg = {
            "timestamp": ts,
            "real_a": em["a_act_power"],
            "real_b": em["b_act_power"],
            "real_c": em["c_act_power"],
            "real_total": em["total_act_power"],
        }
        self.mqtt_client.publish(self.power_topic, json.dumps(msg), 1, True)

    def record_power(self, ts: float, em: dict):
        """Given current time in UTC and energy meter message update the
        time series database accordingly.

        Write failures are reported through ``self.error`` and not raised.

        Args:
            ts (float): utc time
            em (dict): energy meter message
        """
        point = (
            Point("powermeter")
            .tag("sensor", "em0")
            .field("real_A", em["a_act_power"])
            .field("real_B", em["b_act_power"])
            .field("real_C", em["c_act_power"])
            .field("total_real_power", em["total_act_power"])
            .field("total_apparent_power", em["total_aprt_power"])
            .field("apparent_a", em["a_aprt_power"])
            .field("apparent_b", em["b_aprt_power"])
            .field("apparent_c", em["c_aprt_power"])
            .field("current_a", em["a_current"])
            .field("current_b", em["b_current"])
            .field("current_c", em["c_current"])
            .field("current_n", em["n_current"])
            .field("current_total", em["total_current"])
            .field("voltage_a", em["a_voltage"])
            .field("voltage_b", em["b_voltage"])
            .field("voltage_c", em["c_voltage"])
            .field("freq_a", em["a_freq"])
            .field("freq_b", em["b_freq"])
            .field("freq_c", em["c_freq"])
            .time(self.epoc2utc(ts))
        )
        try:
            self.write(point)
        except Exception as e:
            self.error(f"Writing to influx failed {str(e)}")

    def on_emdata_0(self, ts, em):
        """Handle energy meter sensor message.

        Records the energy counters to the time series database. Write
        failures are reported through ``self.error`` and not raised.

        Args:
            ts (float): utc time
            em (dict): energy meter sensor specific message
        """
        point = (
            Point("powermeter")
            .tag("sensor", "emdata0")
            .field("total_act_energy_A", em["a_total_act_energy"])
            .field("total_act_energy_B", em["b_total_act_energy"])
            .field("total_act_energy_C", em["c_total_act_energy"])
            .field("total_act_ret_energy_A", em["a_total_act_ret_energy"])
            .field("total_act_ret_energy_B", em["b_total_act_ret_energy"])
            .field("total_act_ret_energy_C", em["c_total_act_ret_energy"])
            .field("total_act", em["total_act"])
            .field("total_act_ret", em["total_act_ret"])
            .time(self.epoc2utc(ts))
        )
        try:
            self.write(point)
        except Exception as e:
            self.error(f"Writing to influx failed {str(e)}")
        else:
            # Logged outside the try so a logging problem is never reported
            # as a database failure. Single quotes inside the replacement
            # fields keep this valid on Python versions before 3.12.
            self.debug(
                f"Total energy consumed {em['total_act']}, "
                f"exported {em['total_act_ret']}"
            )

    def to_dict(self):
        """Serialize the object, adding the topics under ``_shellypro3em``.

        Returns:
            dict: the base class dictionary extended with this class's
            ``shelly_topic`` and ``power_topic``
        """
        data = super().to_dict()
        data["_shellypro3em"] = {
            "shelly_topic": self.shelly_topic,
            "power_topic": self.power_topic,
        }
        return data

    def from_dict(self, data):
        """Restore the object from a dictionary produced by :meth:`to_dict`.

        Every key found under ``_shellypro3em`` is set as an attribute on
        this instance.

        Args:
            data (dict): serialized object
        """
        super().from_dict(data)
        if "_shellypro3em" in data:
            for key, value in data["_shellypro3em"].items():
                setattr(self, key, value)

    @classmethod
    def register(cls):
        """Register the class, and its base class, if not yet registered.

        Does nothing when ``_class_id`` is already set.
        """
        if cls._class_id is None:
            JShelly.register()
            cls.initialize_class()