Source code for juham.automation.energycostcalculator
import json
from influxdb_client_3 import Point
from juham.base import Base
[docs]
class EnergyCostCalculator(Base):
"""Energy revenue/cost calculator for calculating energy balance between
produced and consumed energy.
Subscribes to spot and power MQTT topics, calculates per hour, per
day, per month and per year costs and writes them the it to the
time series database.
"""
_class_id = ""
topic_in_spot = Base.mqtt_root_topic + "/spot"
topic_in_powerconsumption = Base.mqtt_root_topic + "/powerconsumption"
to_joule_coeff = 1.0 / (1000.0 * 3600)
def __init__(self, name="ecc"):
super().__init__(name)
self.current_ts = 0
self.current_cost = 0
[docs]
def on_connect(self, client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if rc == 0:
self.subscribe(self.topic_in_spot)
self.subscribe(self.topic_in_powerconsumption)
[docs]
def on_message(self, client, userdata, msg):
"""Handle MQTT message.
Args:
client (object) : client
userdata (any): user data
msg (object): mqtt message
"""
ts_now = self.timestamp()
m = json.loads(msg.payload.decode())
if msg.topic == self.topic_in_spot:
self.on_spot(m)
elif msg.topic == self.topic_in_powerconsumption:
self.on_powerconsumption(ts_now, m)
else:
self.error(f"Unknown event {msg.topic}")
[docs]
def on_spot(self, spot):
"""Stores the received per hour electricity prices to spots list.
Args:
spot (list): list of hourly spot prices
"""
self.spots = []
for s in spot:
self.spots.append(
{"Timestamp": s["Timestamp"], "PriceWithTax": s["PriceWithTax"]}
)
[docs]
def map_prices_to_joules(self, price):
"""Convert the given electricity price in kWh to Watt seconds (J)
Args:
price (float): electricity price given as kWh
Returns:
Electricity price per watt second (J)
"""
return price * self.to_joule_coeff
[docs]
def get_prices(self, ts_prev, ts_now):
"""Fetch the electricity prices for the given two subsequent time
stamps.
Args:
ts_prev (timestamp): previous time
ts_now (timestamp): current time
Returns:
Electricity prices for the given interval
"""
prev_price = None
current_price = None
for i in range(0, len(self.spots) - 1):
r0 = self.spots[i]
r1 = self.spots[i + 1]
ts0 = r0["Timestamp"]
ts1 = r1["Timestamp"]
if ts_prev >= ts0 and ts_prev <= ts1:
prev_price = r0["PriceWithTax"]
if ts_now >= ts0 and ts_now <= ts1:
current_price = r0["PriceWithTax"]
if prev_price is not None and current_price is not None:
return prev_price, current_price
self.error("PANIC: run out of spot prices")
return 0.0, 0.0
[docs]
def calculate_net_energy_cost(self, ts_prev, ts_now, energy):
"""Given time interval as start and stop Calculate the cost over the
given time period. Positive values indicate revenue, negative cost.
Args:
ts_prev (timestamp): beginning time stamp of the interval
ts_now (timestamp): end of the interval
energy (float): energy consumed during the time interval
Returns:
Cost or revenue
"""
cost = 0
prev = ts_prev
while prev < ts_now:
elapsed_seconds = ts_now - prev
if elapsed_seconds > 3600:
elapsed_seconds = 3600
now = prev + elapsed_seconds
start_per_kwh, stop_per_kwh = self.get_prices(prev, now)
start_price = self.map_prices_to_joules(start_per_kwh)
stop_price = self.map_prices_to_joules(stop_per_kwh)
if abs(stop_price - start_price) < 1e-24:
cost = cost + energy * elapsed_seconds * start_price
self.debug(
f"Energy cost {str(cost)} e = {str(energy)} J x {str(start_price)} e/J x {str(elapsed_seconds)} s"
)
else:
# calcualte cost over hour boundary
ts_0 = (now // 3600.0) * 3600
t1 = (ts_0 - prev) / (now - prev)
t2 = (now - ts_0) / (now - prev)
cost = (
cost
+ energy
* ((1.0 - t1) * start_price + t2 * stop_price)
* elapsed_seconds
)
self.debug(
f"Cost over hour boundary {str(cost)} e = {str(energy)} J x {str(start_price)} e/J x {str(t1)} s + {str(stop_price)}e x {str(t2)} s"
)
prev = prev + elapsed_seconds
return cost
[docs]
def on_powerconsumption(self, ts_now, m):
"""Update energy consumption.
Args:
ts_now (timestamp): time stamp of the energy consumed
m (dictionary): Juham MQTT message holding energy reading
"""
current_power = m["real_total"]
if self.spots is None:
self.info("Waiting for electricity prices...")
elif self.current_ts == 0:
self.current_ts = ts_now
self.info("Energy cost calculator initialized")
else:
dp = self.calculate_net_energy_cost(self.current_ts, ts_now, current_power)
self.current_cost = self.current_cost + dp
self.current_ts = ts_now
self.record_energycost(ts_now, self.name, self.current_cost)
[docs]
def record_energycost(self, ts_now, site, cost):
"""Record energy cost/revenue to data storage. Positive values represent
revenue whereas negative cost.
Args:
ts_now (float): timestamp
site ([type]): site
cost ([type]): cost or revenue.
"""
try:
point = (
Point("energycost")
.tag("site", site)
.field("cost", cost)
.time(self.epoc2utc(ts_now))
)
self.write(point)
except Exception as e:
self.error(f"Cannot write energycost at{self.timestampstr(ts_now)}", str(e))
[docs]
@classmethod
def register(cls):
if cls._class_id == "":
Base.register()
cls.initialize_class()
cls.register_topic(Base.mqtt_root_topic + "/spot")
cls.register_topic(Base.mqtt_root_topic + "/powerconsumption")