Source code for libmuscle.communicator

import logging
from typing import Any, Dict, List, Optional, Tuple, cast
from ymmsl import Identifier, Reference, Settings

from libmuscle.endpoint import Endpoint
from libmuscle.mpp_message import ClosePort, MPPMessage
from libmuscle.mpp_client import MPPClient
from libmuscle.mpp_server import MPPServer
from libmuscle.mcp.tcp_util import SocketClosed
from libmuscle.peer_info import PeerInfo
from libmuscle.port_manager import PortManager
from libmuscle.profiler import Profiler
from libmuscle.profiling import (
        ProfileEvent, ProfileEventType, ProfileTimestamp)

_logger = logging.getLogger(__name__)

MessageObject = Any

[docs]class Message: """A message to be sent or received. This class describes a message to be sent or that has been received. Attributes: timestamp (float): Simulation time for which this data is valid. next_timestamp (Optional[float]): Simulation time for the next message to be transmitted through this port. data (MessageObject): An object to send or that was received. settings (Settings): Overlay settings to send or that was received. """ # Note: This is for communication with the user, it's not what # actually goes out on the wire, see libmuscle.mcp.Message for that.
[docs] def __init__(self, timestamp: float, next_timestamp: Optional[float] = None, data: MessageObject = None, settings: Optional[Settings] = None ) -> None: """Create a Message. Args: timestamp: Simulation time for which this data is valid. next_timestamp: Simulation time for the next message to be transmitted through this port. data: An object to send or that was received. settings: Overlay settings to send or that were received. """ # make sure timestamp and next_timestamp are floats timestamp = float(timestamp) if next_timestamp is not None: next_timestamp = float(next_timestamp) self.timestamp = timestamp self.next_timestamp = next_timestamp = data self.settings = settings
[docs]class Communicator: """Communication engine for MUSCLE3. This class is the mailroom for a kernel that uses MUSCLE3. It manages the sending and receiving of messages, although it leaves the actual data transmission to various protocol-specific servers and clients. """ def __init__( self, kernel: Reference, index: List[int], port_manager: PortManager, profiler: Profiler) -> None: """Create a Communicator. The instance reference must start with one or more Identifiers, giving the kernel id, followed by one or more integers which specify the instance index. Args: kernel: The kernel this is the Communicator for. index: The index for this instance. port_manager: The PortManager to use. profiler: The profiler to use for recording sends and receives. """ self._kernel = kernel self._index = index self._port_manager = port_manager self._profiler = profiler self._server = MPPServer() # indexed by remote instance id self._clients: Dict[Reference, MPPClient] = {}
[docs] def get_locations(self) -> List[str]: """Returns a list of locations that we can be reached at. These locations are of the form 'protocol:location', where the protocol name does not contain a colon and location may be an arbitrary string. Returns: A list of strings describing network locations. """ return self._server.get_locations()
[docs] def set_peer_info(self, peer_info: PeerInfo) -> None: """Inform this Communicator about its peers. This tells the Communicator about its peers, so that it can route messages accordingly. Args: peer_info: Information about the peers. """ self._peer_info = peer_info
[docs] def send_message( self, port_name: str, message: Message, slot: Optional[int] = None, checkpoints_considered_until: float = float('-inf')) -> None: """Send a message and settings to the outside world. Sending is non-blocking, a copy of the message will be made and stored until the receiver is ready to receive it. Args: port_name: The port on which this message is to be sent. message: The message to be sent. slot: The slot to send the message on, if any. checkpoints_considered_until: When we last checked if we should save a snapshot (wallclock time). """ if slot is None: _logger.debug('Sending message on {}'.format(port_name)) slot_list: List[int] = [] else: _logger.debug('Sending message on {}[{}]'.format(port_name, slot)) slot_list = [slot] snd_endpoint = self.__get_endpoint(port_name, slot_list) if not self._port_manager.get_port(str(snd_endpoint.port)).is_connected(): # log sending on disconnected port return port = self._port_manager.get_port(port_name) profile_event = ProfileEvent( ProfileEventType.SEND, ProfileTimestamp(), None, port, None, slot, port.get_num_messages(slot), None, message.timestamp) recv_endpoints = self._peer_info.get_peer_endpoints( snd_endpoint.port, slot_list) port_length = None if port.is_resizable(): port_length = port.get_length() for recv_endpoint in recv_endpoints: mpp_message = MPPMessage(snd_endpoint.ref(), recv_endpoint.ref(), port_length, message.timestamp, message.next_timestamp, cast(Settings, message.settings), port.get_num_messages(slot), checkpoints_considered_until, encoded_message = mpp_message.encoded() self._server.deposit(recv_endpoint.ref(), encoded_message) port.increment_num_messages(slot) profile_event.stop() if port.is_vector(): profile_event.port_length = port.get_length() profile_event.message_size = len(encoded_message) if not isinstance(, ClosePort): self._profiler.record_event(profile_event)
[docs] def receive_message( self, port_name: str, slot: Optional[int] = None) -> Tuple[Message, float]: """Receive a message and attached settings overlay. Receiving is a blocking operation. This function will contact the sender, wait for a message to be available, and receive and return it. If the port is not connected, then the default value will be returned if one was given, exactly as it was given. If no default was given then a RuntimeError will be raised. Args: port_name: The endpoint on which a message is to be received. slot: The slot to receive the message on, if any. Returns: The received message, with message.settings holding the settings overlay. The settings attribute is guaranteed to not be None. Secondly, the saved_until metadata field from the received message. Raises: RuntimeError: If the network connection had an error, or the message number was incorrect. """ if port_name == 'muscle_settings_in': port = self._port_manager._muscle_settings_in else: port = self._port_manager.get_port(port_name) if slot is None: port_and_slot = port_name slot_list: List[int] = [] else: port_and_slot = f"{port_name}[{slot}]" slot_list = [slot] _logger.debug('Waiting for message on {}'.format(port_and_slot)) recv_endpoint = self.__get_endpoint(port_name, slot_list) receive_event = ProfileEvent( ProfileEventType.RECEIVE, ProfileTimestamp(), None, port, None, slot, port.get_num_messages()) # peer_info already checks that there is at most one snd_endpoint # connected to the port we receive on snd_endpoint = self._peer_info.get_peer_endpoints( recv_endpoint.port, slot_list)[0] client = self.__get_client(snd_endpoint.instance()) try: mpp_message_bytes, profile = client.receive(recv_endpoint.ref()) except (ConnectionError, SocketClosed) as exc: raise RuntimeError( "Error while receiving a message: connection with peer" f" '{snd_endpoint.kernel}' was lost. Did the peer crash?" ) from exc recv_decode_event = ProfileEvent( ProfileEventType.RECEIVE_DECODE, ProfileTimestamp(), None, port, None, slot, port.get_num_messages(), len(mpp_message_bytes)) mpp_message = MPPMessage.from_bytes(mpp_message_bytes) recv_decode_event.stop() if mpp_message.port_length is not None: if port.is_resizable(): port.set_length(mpp_message.port_length) if isinstance(, ClosePort): port.set_closed(slot) message = Message( mpp_message.timestamp, mpp_message.next_timestamp,, mpp_message.settings_overlay) recv_wait_event = ProfileEvent( ProfileEventType.RECEIVE_WAIT, profile[0], profile[1], port, mpp_message.port_length, slot, port.get_num_messages(), len(mpp_message_bytes), message.timestamp) recv_xfer_event = ProfileEvent( ProfileEventType.RECEIVE_TRANSFER, profile[1], profile[2], port, mpp_message.port_length, slot, port.get_num_messages(), len(mpp_message_bytes), message.timestamp) recv_decode_event.message_timestamp = message.timestamp receive_event.message_timestamp = message.timestamp if port.is_vector(): receive_event.port_length = port.get_length() recv_wait_event.port_length = port.get_length() recv_xfer_event.port_length = port.get_length() recv_decode_event.port_length = port.get_length() receive_event.message_size = len(mpp_message_bytes) if not isinstance(, ClosePort): self._profiler.record_event(recv_wait_event) self._profiler.record_event(recv_xfer_event) self._profiler.record_event(recv_decode_event) self._profiler.record_event(receive_event) expected_message_number = port.get_num_messages(slot) if expected_message_number != mpp_message.message_number: if (expected_message_number - 1 == mpp_message.message_number and port.is_resuming(slot)): _logger.debug(f'Discarding received message on {port_and_slot}' ': resuming from weakly consistent snapshot') port.set_resumed(slot) return self.receive_message(port_name, slot) raise RuntimeError(f'Received message on {port_and_slot} with' ' unexpected message number' f' {mpp_message.message_number}. Was expecting' f' {expected_message_number}. Are you resuming' ' from an inconsistent snapshot?') port.increment_num_messages(slot) _logger.debug('Received message on {}'.format(port_and_slot)) if isinstance(, ClosePort): _logger.debug('Port {} is now closed'.format(port_and_slot)) return message, mpp_message.saved_until
[docs] def shutdown(self) -> None: """Shuts down the Communicator, closing connections. """ self._close_ports() for client in self._clients.values(): client.close() wait_event = ProfileEvent(ProfileEventType.DISCONNECT_WAIT, ProfileTimestamp()) self._server.wait_for_receivers() self._profiler.record_event(wait_event) shutdown_event = ProfileEvent(ProfileEventType.SHUTDOWN, ProfileTimestamp()) self._server.shutdown() self._profiler.record_event(shutdown_event)
def __instance_id(self) -> Reference: """Returns our complete instance id. """ return self._kernel + self._index def __get_client(self, instance: Reference) -> MPPClient: """Get or create a client to connect to the given instance. Args: instance: A reference to the instance to connect to. Returns: An existing or new MCP client. """ if instance not in self._clients: locations = self._peer_info.get_peer_locations(instance)'Connecting to peer {instance} at {locations}') self._clients[instance] = MPPClient(locations) return self._clients[instance] def __get_endpoint(self, port_name: str, slot: List[int]) -> Endpoint: """Determines the endpoint on our side. Args: port_name: Name of the port to send or receive on. slot: Slot to send or receive on. """ try: port = Identifier(port_name) except ValueError as e: raise ValueError('"{}" is not a valid port name: {}'.format( port_name, e)) return Endpoint(self._kernel, self._index, port, slot) def _close_port(self, port_name: str, slot: Optional[int] = None) -> None: """Closes the given port. This signals to any connected instance that no more messages will be sent on this port, which it can use to decide whether to shut down or continue running. Args: port_name: The name of the port to close. """ message = Message(float('inf'), None, ClosePort(), Settings()) if slot is None: _logger.debug('Closing port {}'.format(port_name)) else: _logger.debug('Closing port {}[{}]'.format(port_name, slot)) self.send_message(port_name, message, slot) def _close_outgoing_ports(self) -> None: """Closes outgoing ports. This sends a close port message on all slots of all outgoing ports. """ for operator, ports in self._port_manager.list_ports().items(): if operator.allows_sending(): for port_name in ports: port = self._port_manager.get_port(port_name) if port.is_vector(): for slot in range(port.get_length()): self._close_port(port_name, slot) else: self._close_port(port_name) def _drain_incoming_port(self, port_name: str) -> None: """Receives messages until a ClosePort is received. Receives at least once. Args: port_name: Port to drain. """ port = self._port_manager.get_port(port_name) while port.is_open(): # TODO: log warning if not a ClosePort self.receive_message(port_name) def _drain_incoming_vector_port(self, port_name: str) -> None: """Receives messages until a ClosePort is received. Works with (resizable) vector ports. Args: port_name: Port to drain. """ port = self._port_manager.get_port(port_name) while not all([not port.is_open(slot) for slot in range(port.get_length())]): for slot in range(port.get_length()): if port.is_open(slot): self.receive_message(port_name, slot) def _close_incoming_ports(self) -> None: """Closes incoming ports. This receives on all incoming ports until a ClosePort is received on them, signaling that there will be no more messages, and allowing the sending instance to shut down cleanly. """ for operator, port_names in self._port_manager.list_ports().items(): if operator.allows_receiving(): for port_name in port_names: port = self._port_manager.get_port(port_name) if not port.is_connected(): continue if not port.is_vector(): self._drain_incoming_port(port_name) else: self._drain_incoming_vector_port(port_name) def _close_ports(self) -> None: """Closes all ports. This sends a close port message on all slots of all outgoing ports, then receives one on all incoming ports. """ self._close_outgoing_ports() self._close_incoming_ports()