# Source code for juham.ts.forecast_record
from juham.base import Object
from juham.base import Base
from influxdb_client_3 import Point
import json
# [docs]
class ForecastRecord(Base):
    """Forecast database record.

    This class listens to the forecast topic and writes the received
    forecast records to the time series database.
    """

    # Guard used by register(): class-level registration runs only while
    # this is still None.
    _class_id = None

    def __init__(self, name="forecastrecord"):
        """Construct forecast record object with the given name.

        Args:
            name: Name of the object, passed on to the base class.
        """
        super().__init__(name)

    def on_connect(self, client, userdata, flags, rc):
        """Standard mqtt connect notification.

        This method is called when the client connection with the MQTT
        broker is established. After delegating to the base class it
        subscribes to the ``<mqtt_root_topic>/forecast`` topic.

        Args:
            client: MQTT client instance, forwarded to the base class.
            userdata: User data, forwarded to the base class.
            flags: Connection flags, forwarded to the base class.
            rc: Connection result code, forwarded to the base class.
        """
        super().on_connect(client, userdata, flags, rc)
        self.subscribe(Base.mqtt_root_topic + "/forecast")
        self.debug(f"Subscribed to {Base.mqtt_root_topic}/forecast")

    def on_message(self, client, userdata, msg):
        """Standard mqtt message notification method.

        This method is called upon new arrived message. The payload is
        expected to be a JSON array of records, each holding the keys
        ``ts``, ``temperature``, ``windspeed`` and ``solarenergy``. Every
        record is written as one ``forecast`` point to the time series
        database.

        Malformed payloads and faulty records are reported through
        ``self.error`` instead of raising out of the MQTT callback; a
        faulty record does not prevent the remaining records from being
        written.

        Args:
            client: MQTT client instance (unused).
            userdata: User data (unused).
            msg: Received message; ``msg.payload`` holds the JSON bytes.
        """
        try:
            forecast = json.loads(msg.payload.decode())
        except (UnicodeDecodeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            self.error("Cannot decode forecast message", str(e))
            return

        if not isinstance(forecast, list):
            self.error(
                "Cannot write forecast time series",
                f"expected a JSON array, got {type(forecast).__name__}",
            )
            return

        for h in forecast:
            try:
                # Key lookups live inside the try so that a record with a
                # missing field is logged and skipped rather than aborting
                # the whole message.
                ts = h["ts"]
                temperature = h["temperature"]
                wind_speed = h["windspeed"]
                solarenergy = h["solarenergy"]
                point = (
                    Point("forecast")
                    .field("wind_speed", wind_speed)
                    .field("temperature", temperature)
                    .field("solarenergy", solarenergy)
                    # NOTE(review): epoc2utc is inherited; presumably it
                    # converts an epoch timestamp to UTC - confirm.
                    .time(self.epoc2utc(ts))
                )
                self.write(point)
            except Exception as e:
                self.error("Cannot write forecast time series", str(e))

    @classmethod
    def register(cls):
        """Register the class, unless ``_class_id`` is already set.

        Registers the base class first and then calls
        ``cls.initialize_class()``.
        """
        if cls._class_id is None:
            Base.register()
            # NOTE(review): initialize_class is defined outside this file;
            # presumably it assigns _class_id - confirm.
            cls.initialize_class()