Source code for libmuscle.receive_timeout_handler

import logging
from typing import Optional

from ymmsl.v0_2 import Reference

from libmuscle.mcp.transport_client import TimeoutHandler
from libmuscle.mmp_client import MMPClient


_logger = logging.getLogger(__name__)


[docs] class Deadlock(Exception): """Exception that is raised when the simulation has deadlocked."""
[docs] class ReceiveTimeoutHandler(TimeoutHandler): """Timeout handler when receiving messages from peers. This handler sends a message to the Muscle Manager when the receive times out (and another message when the message does arrive). This is used by the manager to detect if the simulation is in a deadlock, where a cycle of instances is waiting for each other. """ def __init__( self, manager: MMPClient, peer_instance: Reference, port_name: str, slot: Optional[int], timeout: float ) -> None: """Initialize a new timeout handler. Args: manager: Connection to the muscle manager. peer_instance: the peer instance we try to receive from. port_name: the name of the port we try to receive on. slot: the slot we try to receive on. timeout: Timeout in seconds. """ self._manager = manager self._peer_instance = peer_instance self._port_name = port_name self._slot = slot self._timeout = timeout # Counter to keep track of the number of timeouts self._num_timeouts = 0 @property def timeout(self) -> float: # Increase timeout by a factor 1.5 with every timeout we hit: factor = 1.5 ** self._num_timeouts return self._timeout * factor
[docs] def on_timeout(self) -> None: if self._num_timeouts == 0: # Notify the manager that we're waiting for a receive self._manager.waiting_for_receive( self._peer_instance, self._port_name, self._slot) else: # Ask the manager if we're part of a detected deadlock if self._manager.is_deadlocked(): _logger.error( 'Deadlock detected, shutting down! Please see the manager log' ' for a description of the problem.') raise Deadlock() self._num_timeouts += 1
[docs] def on_receive(self) -> None: self._manager.waiting_for_receive_done( self._peer_instance, self._port_name, self._slot)