import json
from typing import Any
from juham.base import Base
# TODO: rewrite and implement heat sensors to publish boiler temperatures in device independent format and more
[docs]
class RPowerPlan(Base):
"""Automation class for optimized control of home energy consumers e.g hot
water boilers. Reads spot prices, boiler water temperatures and controls
heating radiators.
"""
_class_id = None
topic_spot = Base.mqtt_root_topic + "/spot"
topic_forecast = Base.mqtt_root_topic + "/forecast"
topic_temperature = Base.mqtt_root_topic + "/temperature/102"
topic_powerplan = Base.mqtt_root_topic + "/powerplan"
topic_power = Base.mqtt_root_topic + "/power"
topic_in_powerconsumption = Base.mqtt_root_topic + "/powerconsumption"
uoi_limit = 0.75
maximum_boiler_temperature = 70
minimum_boiler_temperature = 45
def __init__(self, name="rpowerplan"):
super().__init__(name)
self.main_boiler_temperature = 100
self.pre_boiler_temperature = 0
self.current_heating_plan = 0
self.heating_plan = None
self.power_plan = None
self.ranked_spot_prices = None
self.ranked_solarpower = None
self.relay = 0
self.relay_started = 0
self.current_power = 0
[docs]
def on_connect(self, client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if rc == 0:
self.subscribe(self.topic_spot)
self.subscribe(self.topic_forecast)
self.subscribe(self.topic_temperature)
self.subscribe(self.topic_in_powerconsumption)
[docs]
def sort_by_rank(self, hours: list, ts_utc_now: float) -> list:
"""Sort the given electricity prices by their rank value. Given a list
of electricity prices, return a sorted list from the cheapest to the
most expensive hours. Entries that represent electricity prices in the
past are excluded.
Args:
hours (list): list of hourly electricity prices
ts_utc_now (float): current time
Returns:
list: sorted list of electricity prices
"""
sh = sorted(hours, key=lambda x: x["Rank"])
ranked_hours = []
for h in sh:
utc_ts = h["Timestamp"]
if utc_ts > ts_utc_now:
ranked_hours.append(h)
return ranked_hours
[docs]
def sort_by_power(self, solarpower: list, ts_utc_now: float) -> list:
"""Sort forecast of solarpower to decreasing order.
Args:
solarpower (list): list of entries describing hourly solar energy forecast
ts_utc_now (float): curren time, for exluding entries that are in the past
Returns:
list: list from the highest solarenergy to lowest.
"""
self.debug(
f"Sorting {len(solarpower)} days of forecast starting at {self.timestampstr(ts_utc_now)}"
)
sh = sorted(solarpower, key=lambda x: x["solarenergy"], reverse=True)
ranked_hours = []
for h in sh:
utc_ts = h["ts"]
if utc_ts > ts_utc_now:
ranked_hours.append(h)
self.debug(f"Forecast sorted for the next {str(len(ranked_hours))} days")
return ranked_hours
[docs]
def on_message(self, client: object, userdata: Any, msg: object) -> None:
"""Process MQTT message
Args:
client (object): _description_
userdata (Any): _description_
msg (object): _description_
"""
m = None
ts_utc_now = self.timestamp()
if msg.topic == self.topic_spot:
self.ranked_spot_prices = self.sort_by_rank(
json.loads(msg.payload.decode()), ts_utc_now
)
self.power_plan = None # reset power plan, it depends on spot prices
return
elif msg.topic == self.topic_forecast:
self.ranked_solarpower = self.sort_by_power(
json.loads(msg.payload.decode()), ts_utc_now
)
self.power_plan = None # reset power plan, it depends on forecast
return
elif msg.topic == self.topic_temperature:
m = json.loads(msg.payload.decode())
self.main_boiler_temperature = m["temperature"]
elif msg.topic == self.topic_in_powerconsumption:
m = json.loads(msg.payload.decode())
self.current_power = m["real_total"]
self.debug(f"Current power {self.current_power/1000.0} kW")
else:
super().on_message(client, userdata, msg)
return
self.on_powerplan(ts_utc_now)
[docs]
def on_powerplan(self, ts_utc_now: float) -> None:
"""Apply power plan.
Args:
ts_utc_now (float): utc time
"""
if self.ranked_solarpower is None:
self.debug("waiting forecast ...", "")
return
if self.ranked_spot_prices is None:
self.debug("Waiting spot prices...", "")
return
if self.power_plan is None:
self.power_plan = self.create_power_plan()
self.heating_plan = None
self.info(
f"Power plan of length {len(self.power_plan)} created",
str(self.power_plan),
)
if self.power_plan is None:
self.error("Failed to create a power plan", "")
return
if len(self.power_plan) < 4:
self.warning(
f"Suspiciously short {len(self.power_plan)} power plan, wait more data ..",
"",
)
self.heating_plan = None
self.power_plan = None
return
if self.ranked_solarpower is None or len(self.ranked_solarpower) < 4:
self.warning("No forecast, optimization compromized..", "")
if self.heating_plan is None:
self.heating_plan = self.create_heating_plan()
self.info(f"Heating plan of length {len(self.heating_plan)} created", "")
if self.heating_plan is None:
self.error("Failed to create heating plan")
return
if len(self.heating_plan) < 4:
self.info("Ditch remaining short heating plan ..", "")
self.heating_plan = None
self.power_plan = None
return
if ts_utc_now - self.relay_started < 10:
self.info(
f"Suspend relay update, started just {int(ts_utc_now - self.relay_started)} s ago"
)
return
self.relay_started = ts_utc_now
relay = self.consider_heating(ts_utc_now)
heat = {"Unit": "main_boiler", "Timestamp": ts_utc_now, "State": relay}
self.mqtt_client.publish(self.topic_power, json.dumps(heat), 1, True)
self.info(f"Heating state published with relay state {relay}", "")
[docs]
def consider_heating(self, ts: float) -> int:
"""Consider whether the target boiler needs heating.
Args:
ts (float): current UTC time
Returns:
int: 1 if heating is needed, 0 if not
"""
if self.main_boiler_temperature < 45.0:
self.info(
f"Low temp, force heating because {self.main_boiler_temperature} is less than {self.minimum_boiler_temperature}"
)
return 1
if self.main_boiler_temperature > self.maximum_boiler_temperature:
self.info(
f"Temperature beyond maximum already {self.main_boiler_temperature}"
)
return 0
# Check if heating plan has FOM > 0
if self.heating_plan[0]["FOM"] > self.uoi_limit:
self.info(
f"Best UOI is {self.heating_plan[0]['FOM']}, good enough to proceed..."
)
for p in self.heating_plan:
if ts > float(p["Timestamp"]) and ts < float(p["Timestamp"]) + 3600:
if float(p["FOM"]) > 1.5:
self.info(f"Cheap electricity at {self.timestampstr(p['Timestamp'])} UOI > {p['FOM']}")
return 1
# sorted by price, sheapest hours at the head
tplan = self.heating_plan[0:4]
plan = sorted(tplan, key=lambda x: x["Timestamp"])
ts_utc_start = plan[0]["Timestamp"]
ts_utc_stop = plan[-1]["Timestamp"]
if ts > ts_utc_stop:
self.error(
f"Run out of heating plan { self.timestampstr(ts_utc_start)} ... {self.timestampstr(ts_utc_stop)}"
)
self.heating_plan = None
self.power_plan = None
return 0
if ts < ts_utc_start:
self.info(f"Heating will start at {self.timestampstr(ts_utc_start)}")
return 0
# check the current plan slot and if FOM > 1 then yes
for p in plan:
if ts > p["Timestamp"] and ts < ts > p["Timestamp"] + 3600:
if p["FOM"] > self.uoi_limit:
self.info(f"Heat {self.timestampstr(p['Timestamp'])} because UOI > {p['FOM']}")
return 1
else:
self.info(f"No heating: {self.timestampstr(p["Timestamp"])}, because UOI > {p['FOM']}")
return 0
else:
self.info(
f"Expensive hours, no heating as UOI < {self.uoi_limit}",
self.heating_plan,
)
return 0
return 0
# compute figure of merit (FOM) for each hour
# the higher the solarenergy and the lower the spot the higher the FOM
# compute fom
[docs]
def compute_fom(self, solpower: float, spot: float) -> float:
"""Compute UOI - utilization optimization index.
Args:
solpower (float): current solar power forecast
spot (float): spot price
Returns:
float: utilization optimization index
"""
# total solar power is 6kW and max pow consumption about twice as much
# so when sun is shining with full power nearly half of the energy comes for free
if spot < 0.001:
return 2 # use
elif spot > 0.1:
return 0 # try not to use
else:
fom = 2 * (0.101 - spot) / 0.1
return fom
[docs]
def create_power_plan(self) -> list:
"""Create power plan.
Returns:
list: list of utilization entries
"""
ts_now = self.timestamp()
self.info("Creating new powerplan", "")
# syncronize spot and solarenergy by timestamp
spots = []
for s in self.ranked_spot_prices:
if s["Timestamp"] > ts_now:
spots.append(
{"Timestamp": s["Timestamp"], "PriceWithTax": s["PriceWithTax"]}
)
powers = []
for s in self.ranked_solarpower:
if s["ts"] > ts_now:
powers.append({"Timestamp": s["ts"], "Solarenergy": s["solarenergy"]})
hplan = []
for spot, solar in zip(spots, powers):
# maximum FOM is if spot is negative
solarenergy = solar["Solarenergy"]
spotprice = spot["PriceWithTax"]
fom = self.compute_fom(solarenergy, spotprice)
plan = {"Timestamp": spot["Timestamp"], "FOM": fom}
hplan.append(plan)
shplan = sorted(hplan, key=lambda x: x["FOM"], reverse=True)
self.info("Powerplan created", str(shplan))
return shplan
[docs]
def create_heating_plan(self) -> list:
"""Create heating plan.
Returns:
int: list of heating entries
"""
self.info("Creating heating plan", "")
state = 0
heating_plan = []
hours = 0
needed_hours = 5
for hp in self.power_plan:
fom = hp["FOM"]
if float(fom) >= self.uoi_limit and hours < needed_hours:
state = 1
else:
state = 0
heat = {
"Unit": "main_boiler",
"Timestamp": hp["Timestamp"],
"State": state,
"FOM": fom,
"UOI": fom,
}
self.mqtt_client.publish(self.topic_powerplan, json.dumps(heat), 1, True)
heating_plan.append(hp)
hours = hours + 1
self.info(
"Heating plan published starting "
+ self.timestampstr(heating_plan[0]["Timestamp"]),
"",
)
return heating_plan
[docs]
@classmethod
def register(cls) ->None:
if cls._class_id is None:
Base.register()
cls.initialize_class()