Source code for libmuscle.pytest.implementation_tester

import logging
import os
from typing import Optional
from libmuscle import Instance, InstanceFlags, Message
from ymmsl.v0_2 import Configuration, Operator, Reference


_logger = logging.getLogger(__name__)


[docs] class ImplementationTester: """ The ImplementationTester creates a MUSCLE3 Instance that acts as the "tester" component, which is connected to the implementation under test. """ def __init__(self, default_timeout: float, muscle_manager_address: str, test_ymmsl_config: Configuration) -> None: """ Initialize the implementation tester. Args: default_timeout: Default timeout for receive operations in seconds. """ self._default_timeout = default_timeout self._is_shut_down = False # Pass manager address and instance name through environment os.environ["MUSCLE_MANAGER"] = muscle_manager_address os.environ["MUSCLE_INSTANCE"] = "muscle3_implementation_tester" test_model = test_ymmsl_config.programs[ Reference("muscle3_implementation_tester") ] instance_ports = { Operator.O_I: [str(p) for p in test_model.ports.sending_port_names()], Operator.S: [str(p) for p in test_model.ports.receiving_port_names()] } self._instance = Instance( ports=instance_ports, flags=InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS) # Configure the deadlock-detection timeout to match the default receive # timeout: after `default_timeout` seconds of waiting the manager is # notified, and if a deadlock is detected the simulation is aborted. self._instance._communicator.set_receive_timeout(default_timeout) self._instance.reuse_instance()
[docs] def send( self, port_name: str, message: Message, slot: Optional[int] = None ) -> None: """ Send a message on the specified port. Args: port_name: Name of the port to send on (without 'send_' prefix). message: The message to send. slot: Optional slot number for vector ports. """ self._instance.send(port_name, message, slot)
[docs] def receive( self, port_name: str, slot: Optional[int] = None, *, timeout: Optional[float] = None, ) -> Message: """ Receive a message from the specified port. Args: port_name: Name of the port to receive from (without 'receive_' prefix). slot: Optional slot number for vector ports. timeout: Timeout in seconds. If None, uses default_timeout. Raises: RuntimeError: If a deadlock is detected or the connection to the implementation was lost while waiting for a message. """ if timeout is None: timeout = self._default_timeout self._instance._communicator.set_receive_timeout(timeout) try: return self._instance.receive(port_name, slot) except RuntimeError as exc: # A RuntimeError here means either a deadlock was detected by the # manager, or the connection to the implementation was lost (e.g. # it crashed) _logger.error( "ImplementationTester: error while waiting for a message on" " port '%s'. Shutting down.", port_name ) self._is_shut_down = True self._instance.error_shutdown(str(exc)) raise
[docs] def cleanup(self) -> None: """Clean up the tester instance. Safe to call even if the instance was already shut down due to a timeout or deadlock error. """ if not self._is_shut_down: while self._instance.reuse_instance(): pass