Source code for pyjabber.network.StreamAlivenessMonitor
import asyncio
import warnings
[docs]classStreamAlivenessMonitor:""" This class is a helper to monitor the aliveness of a stream. It will call a callback if the stream is not alive after a timeout. """def__init__(self,timeout=60,callback=None):self._timeout=timeoutself._timeout_callback=callbackself._timeout_task=Noneself._reset_event=asyncio.Event()def__del__(self):self.cancel()asyncdef_timeout_task_coro(self):try:awaitasyncio.wait_for(self._reset_event.wait(),timeout=self._timeout)exceptasyncio.TimeoutError:ifself._timeout_callbackisnotNone:self._timeout_callback()
[docs]defreset(self):""" Reset the timer. Called always after received a message from the client/server """ifself._timeout_taskisnotNone:self._reset_event.set()self._timeout_task.cancel()self._reset_event.clear()withwarnings.catch_warnings():warnings.simplefilter("ignore")self._timeout_task=asyncio.create_task(self._timeout_task_coro())