Source code for juham.base.jmqtt

from .object import Object


class JMqtt(Object):
    """Abstract base class describing the interface of an MQTT broker client.

    The connection-related methods defined here have empty bodies; concrete
    sub classes are expected to override them with a real implementation.
    """

    # Stays ``None`` until the class has been registered; ``register()``
    # only performs its work while this is still ``None``.
    _class_id = None

    def __init__(self, name):
        """Create the object and forward ``name`` to the ``Object`` base.

        Args:
            name: Name passed unchanged to ``Object.__init__``.
        """
        super().__init__(name)

    def disconnect_from_server(self):
        """Disconnect from the MQTT broker.

        This base implementation does nothing; sub classes are responsible
        for implementing the actual disconnect.
        """

    def connect_to_server(
        self, host: str = "localhost", port: int = 1883, keepalive: int = 60
    ):
        """Connect to the MQTT server.

        This base implementation does nothing; sub classes are responsible
        for establishing the actual connection.

        Args:
            host (str, optional): Host to connect to. Defaults to "localhost".
            port (int, optional): Port to connect to. Defaults to 1883.
            keepalive (int, optional): Keep alive, in seconds. Defaults to 60.
        """

    def loop_stop(self):
        """Stop the network loop.

        After this call no further messages shall be dispatched. This base
        implementation does nothing; sub classes provide the behavior.
        """

    @classmethod
    def register(cls):
        """Register the class, unless ``_class_id`` has already been set.

        While ``cls._class_id`` is ``None`` this calls ``Object.register()``
        first and ``cls.initialize_class()`` second. Otherwise the call is a
        no-op.
        """
        if cls._class_id is not None:
            # Already registered -- nothing left to do.
            return

        # The base class is registered before this class is initialized.
        Object.register()
        cls.initialize_class()