Source code for juham.base.jmqtt
from .object import Object
[docs]
class JMqtt(Object):
"""Abstract base class for MQTT brokers."""
_class_id = None
def __init__(self, name):
super().__init__(name)
[docs]
def disconnect_from_server(self):
"""Disconnect from the MQTT broker.
It is up to the sub classes to implement the method.
"""
[docs]
def connect_to_server(
self, host: str = "localhost", port: int = 1883, keepalive: int = 60
):
"""Connect to MQTT server
Args:
host (str, optional): host. Defaults to "localhost".
port (int, optional): port. Defaults to 1883.
keepalive (int, optional): keep alive, in seconds. Defaults to 60.
"""
[docs]
def loop_stop(self):
"""Stop the network loop.
No further messages shall be dispatched.
"""
[docs]
@classmethod
def register(cls):
if cls._class_id is None:
Object.register()
cls.initialize_class()