Source code for juham.web.rcloud
import requests
from typing import Any
from juham.web.rthread import RThread, IWorkerThread
[docs]
class RCloudThread(IWorkerThread):
    """Worker thread that pulls data from a web resource over HTTP.

    Sub classes describe *where* to fetch from by overriding
    make_url() and *what to do* with the reply by overriding
    process_data(). The update() method glues the two together with a
    single ``requests.get()`` call.
    """

    # Passed unchanged as the ``timeout`` argument of requests.get().
    timeout: float = 60

    def __init__(self, client=None):
        """Initialise the worker thread.

        NOTE(review): ``client`` is accepted but not forwarded; the base
        class initialiser is invoked without arguments. Confirm against
        IWorkerThread whether this is intentional.
        """
        super().__init__()

    def make_url(self) -> str:
        """Return the http url from which update() fetches its data.

        The base implementation yields an empty string; sub classes are
        expected to override it.

        Returns: Url handed as first argument to requests.get().
        """
        return ""

    def update(self) -> bool:
        """Fetch the resource once and hand the reply over for processing.

        The url is obtained from make_url() and requested with empty
        headers and query parameters, using the class level timeout.
        A reply with status code 200 is passed to process_data(); any
        other status code is reported through self.error().

        Any exception raised while requesting *or* while processing the
        reply is caught and reported through self.error() as well.

        Returns: True if the reply had status 200 and was processed
        without an exception, False otherwise. A False return signals
        an error; retrying (and how many times) is up to the caller.
        """
        url = self.make_url()
        try:
            response = requests.get(url, headers={}, params={}, timeout=self.timeout)
            if response.status_code != 200:
                # Anything but a plain "200 OK" counts as a failed read.
                self.error(f"Reading {url} failed: {str(response)}")
                return False
            self.process_data(response)
        except Exception as e:
            # Covers network failures as well as errors raised by process_data().
            self.error(f"Requesting data from {url} failed", str(e))
            return False
        return True

    def process_data(self, data: Any) -> None:
        """Consume the data acquired by update().

        update() calls this with the response object returned by
        requests.get(). The base implementation does nothing; sub
        classes are expected to override it.
        """

    @classmethod
    def register(cls):
        """Delegate to IWorkerThread.register()."""
        IWorkerThread.register()
[docs]
class RCloud(RThread):
    """Base class for automation objects that obtain data by querying a URL.

    As originally described, such an object spawns an asynchronous
    thread that acquires the data at a specified time interval. That
    machinery is not defined in this class; presumably it is inherited
    from RThread (confirm there).
    """

    @classmethod
    def register(cls):
        """Delegate to RThread.register()."""
        RThread.register()