Source code for libmuscle.communicator

import logging
from typing import Any, Dict, List, Optional, Tuple, cast
from ymmsl import Conduit, Identifier, Operator, Reference, Settings

from libmuscle.endpoint import Endpoint
from libmuscle.mpp_message import ClosePort, MPPMessage
from libmuscle.mpp_client import MPPClient
from libmuscle.mcp.tcp_util import SocketClosed
from libmuscle.mcp.transport_server import TransportServer
from libmuscle.mcp.type_registry import transport_server_types
from libmuscle.peer_manager import PeerManager
from libmuscle.post_office import PostOffice
from libmuscle.port import Port
from libmuscle.profiler import Profiler
from libmuscle.profiling import (
        ProfileEvent, ProfileEventType, ProfileTimestamp)


_logger = logging.getLogger(__name__)


MessageObject = Any


[docs]class Message: """A message to be sent or received. This class describes a message to be sent or that has been received. Attributes: timestamp (float): Simulation time for which this data is valid. next_timestamp (Optional[float]): Simulation time for the next message to be transmitted through this port. data (MessageObject): An object to send or that was received. settings (Settings): Overlay settings to send or that was received. """ # Note: This is for communication with the user, it's not what # actually goes out on the wire, see libmuscle.mcp.Message for that.
[docs] def __init__(self, timestamp: float, next_timestamp: Optional[float] = None, data: MessageObject = None, settings: Optional[Settings] = None ) -> None: """Create a Message. Args: timestamp: Simulation time for which this data is valid. next_timestamp: Simulation time for the next message to be transmitted through this port. data: An object to send or that was received. settings: Overlay settings to send or that were received. """ # make sure timestamp and next_timestamp are floats timestamp = float(timestamp) if next_timestamp is not None: next_timestamp = float(next_timestamp) self.timestamp = timestamp self.next_timestamp = next_timestamp self.data = data self.settings = settings
[docs]class Communicator: """Communication engine for MUSCLE3. This class is the mailroom for a kernel that uses MUSCLE3. It manages the sending and receiving of messages, although it leaves the actual data transmission to various protocol-specific servers and clients. """ def __init__(self, kernel: Reference, index: List[int], declared_ports: Optional[Dict[Operator, List[str]]], profiler: Profiler) -> None: """Create a Communicator. The instance reference must start with one or more Identifiers, giving the kernel id, followed by one or more integers which specify the instance index. Args: kernel: The kernel this is the Communicator for. index: The index for this instance. declared_ports: The declared ports for this instance profiler: The profiler to use for recording sends and receives. """ self._kernel = kernel self._index = index self._declared_ports = declared_ports self._post_office = PostOffice() self._profiler = profiler self._servers: List[TransportServer] = [] # indexed by remote instance id self._clients: Dict[Reference, MPPClient] = {} for server_type in transport_server_types: server = server_type(self._post_office) self._servers.append(server) self._ports: Dict[str, Port] = {}
[docs] def get_locations(self) -> List[str]: """Returns a list of locations that we can be reached at. These locations are of the form 'protocol:location', where the protocol name does not contain a colon and location may be an arbitrary string. Returns: A list of strings describing network locations. """ return [server.get_location() for server in self._servers]
[docs] def connect(self, conduits: List[Conduit], peer_dims: Dict[Reference, List[int]], peer_locations: Dict[Reference, List[str]]) -> None: """Connect this Communicator to its peers. This is the second stage in the simulation wiring process. Peers here are instances, and peer_dims and peer_locations are indexed by a Reference to an instance. Instance sets are multi-dimensional arrays with sizes given by peer_dims. Args: conduits: A list of conduits attached to this component, as received from the manager. peer_dims: For each peer we share a conduit with, the dimensions of the instance set. peer_locations: A list of locations for each peer instance we share a conduit with. """ self._peer_manager = PeerManager( self._kernel, self._index, conduits, peer_dims, peer_locations) if self._declared_ports is not None: self._ports = self.__ports_from_declared() else: self._ports = self.__ports_from_conduits(conduits) self._muscle_settings_in = self.__settings_in_port(conduits)
[docs] def settings_in_connected(self) -> bool: """Returns True iff muscle_settings_in is connected. """ return self._muscle_settings_in.is_connected()
[docs] def list_ports(self) -> Dict[Operator, List[str]]: """Returns a description of the ports this Communicator has. Returns: A dictionary, indexed by Operator, containing lists of port names. Operators with no associated ports are not included. """ result: Dict[Operator, List[str]] = {} for port_name, port in self._ports.items(): if port.operator not in result: result[port.operator] = list() result[port.operator].append(port_name) return result
[docs] def port_exists(self, port_name: str) -> bool: """Returns whether a port with the given name exists. Args: port_name: Port name to check. """ return port_name in self._ports
[docs] def get_port(self, port_name: str) -> Port: """Returns a Port object describing a port with the given name. Args: port: The port to retrieve. """ return self._ports[port_name]
[docs] def send_message( self, port_name: str, message: Message, slot: Optional[int] = None, checkpoints_considered_until: float = float('-inf')) -> None: """Send a message and settings to the outside world. Sending is non-blocking, a copy of the message will be made and stored until the receiver is ready to receive it. Args: port_name: The port on which this message is to be sent. message: The message to be sent. slot: The slot to send the message on, if any. checkpoints_considered_until: When we last checked if we should save a snapshot (wallclock time). """ if slot is None: _logger.debug('Sending message on {}'.format(port_name)) slot_list: List[int] = [] else: _logger.debug('Sending message on {}[{}]'.format(port_name, slot)) slot_list = [slot] slot_length = self._ports[port_name].get_length() if slot_length <= slot: raise RuntimeError(('Slot out of bounds. You are sending on' ' slot {} of port "{}", which is of length' ' {}, so that slot does not exist' ).format(slot, port_name, slot_length)) snd_endpoint = self.__get_endpoint(port_name, slot_list) if not self._peer_manager.is_connected(snd_endpoint.port): # log sending on disconnected port return port = self._ports[port_name] profile_event = ProfileEvent( ProfileEventType.SEND, ProfileTimestamp(), None, port, None, slot, port.get_num_messages(slot), None, message.timestamp) recv_endpoints = self._peer_manager.get_peer_endpoints( snd_endpoint.port, slot_list) port_length = None if port.is_resizable(): port_length = port.get_length() for recv_endpoint in recv_endpoints: mpp_message = MPPMessage(snd_endpoint.ref(), recv_endpoint.ref(), port_length, message.timestamp, message.next_timestamp, cast(Settings, message.settings), port.get_num_messages(slot), checkpoints_considered_until, message.data) encoded_message = mpp_message.encoded() self._post_office.deposit(recv_endpoint.ref(), encoded_message) port.increment_num_messages(slot) profile_event.stop() if port.is_vector(): profile_event.port_length = port.get_length() profile_event.message_size = len(encoded_message) if not isinstance(message.data, ClosePort): self._profiler.record_event(profile_event)
[docs] def receive_message(self, port_name: str, slot: Optional[int] = None, default: Optional[Message] = None ) -> Tuple[Message, float]: """Receive a message and attached settings overlay. Receiving is a blocking operation. This function will contact the sender, wait for a message to be available, and receive and return it. If the port is not connected, then the default value will be returned if one was given, exactly as it was given. If no default was given then a RuntimeError will be raised. Args: port_name: The endpoint on which a message is to be received. slot: The slot to receive the message on, if any. default: A message to return if this port is not connected. Returns: The received message, with message.settings holding the settings overlay. The settings attribute is guaranteed to not be None. Secondly, the saved_until metadata field from the received message. Raises: RuntimeError: If no default was given and the port is not connected. """ if slot is None: port_and_slot = port_name slot_list: List[int] = [] else: port_and_slot = f"{port_name}[{slot}]" slot_list = [slot] _logger.debug('Waiting for message on {}'.format(port_and_slot)) recv_endpoint = self.__get_endpoint(port_name, slot_list) if not self._peer_manager.is_connected(recv_endpoint.port): if default is None: raise RuntimeError(('Tried to receive on port "{}", which is' ' disconnected, and no default value was' ' given. Either specify a default, or' ' connect a sending component to this' ' port.').format(port_name)) _logger.debug( 'No message received on {} as it is not connected'.format( port_name)) return default, float('-inf') if port_name in self._ports: port = self._ports[port_name] else: # it's muscle_settings_in here, because we check for unknown # user ports in Instance already, and we don't have any other # built-in automatic ports. port = self._muscle_settings_in receive_event = ProfileEvent( ProfileEventType.RECEIVE, ProfileTimestamp(), None, port, None, slot, port.get_num_messages()) # peer_manager already checks that there is at most one snd_endpoint # connected to the port we receive on snd_endpoint = self._peer_manager.get_peer_endpoints( recv_endpoint.port, slot_list)[0] client = self.__get_client(snd_endpoint.instance()) try: mpp_message_bytes, profile = client.receive(recv_endpoint.ref()) except (ConnectionError, SocketClosed) as exc: raise RuntimeError( "Error while receiving a message: connection with peer" f" '{snd_endpoint.kernel}' was lost. Did the peer crash?" ) from exc recv_decode_event = ProfileEvent( ProfileEventType.RECEIVE_DECODE, ProfileTimestamp(), None, port, None, slot, port.get_num_messages(), len(mpp_message_bytes)) mpp_message = MPPMessage.from_bytes(mpp_message_bytes) recv_decode_event.stop() if mpp_message.port_length is not None: if port.is_resizable(): port.set_length(mpp_message.port_length) if isinstance(mpp_message.data, ClosePort): port.set_closed(slot) message = Message( mpp_message.timestamp, mpp_message.next_timestamp, mpp_message.data, mpp_message.settings_overlay) recv_wait_event = ProfileEvent( ProfileEventType.RECEIVE_WAIT, profile[0], profile[1], port, mpp_message.port_length, slot, port.get_num_messages(), len(mpp_message_bytes), message.timestamp) recv_xfer_event = ProfileEvent( ProfileEventType.RECEIVE_TRANSFER, profile[1], profile[2], port, mpp_message.port_length, slot, port.get_num_messages(), len(mpp_message_bytes), message.timestamp) recv_decode_event.message_timestamp = message.timestamp receive_event.message_timestamp = message.timestamp if port.is_vector(): receive_event.port_length = port.get_length() recv_wait_event.port_length = port.get_length() recv_xfer_event.port_length = port.get_length() recv_decode_event.port_length = port.get_length() receive_event.message_size = len(mpp_message_bytes) if not isinstance(mpp_message.data, ClosePort): self._profiler.record_event(recv_wait_event) self._profiler.record_event(recv_xfer_event) self._profiler.record_event(recv_decode_event) self._profiler.record_event(receive_event) expected_message_number = port.get_num_messages(slot) if expected_message_number != mpp_message.message_number: if (expected_message_number - 1 == mpp_message.message_number and port.is_resuming(slot)): _logger.debug(f'Discarding received message on {port_and_slot}' ': resuming from weakly consistent snapshot') port.set_resumed(slot) return self.receive_message(port_name, slot, default) raise RuntimeError(f'Received message on {port_and_slot} with' ' unexpected message number' f' {mpp_message.message_number}. Was expecting' f' {expected_message_number}. Are you resuming' ' from an inconsistent snapshot?') port.increment_num_messages(slot) _logger.debug('Received message on {}'.format(port_and_slot)) if isinstance(mpp_message.data, ClosePort): _logger.debug('Port {} is now closed'.format(port_and_slot)) return message, mpp_message.saved_until
[docs] def close_port(self, port_name: str, slot: Optional[int] = None ) -> None: """Closes the given port. This signals to any connected instance that no more messages will be sent on this port, which it can use to decide whether to shut down or continue running. Args: port_name: The name of the port to close. """ message = Message(float('inf'), None, ClosePort(), Settings()) if slot is None: _logger.debug('Closing port {}'.format(port_name)) else: _logger.debug('Closing port {}[{}]'.format(port_name, slot)) self.send_message(port_name, message, slot)
[docs] def shutdown(self) -> None: """Shuts down the Communicator, closing connections. """ for client in self._clients.values(): client.close() wait_event = ProfileEvent(ProfileEventType.DISCONNECT_WAIT, ProfileTimestamp()) self._post_office.wait_for_receivers() self._profiler.record_event(wait_event) shutdown_event = ProfileEvent(ProfileEventType.SHUTDOWN, ProfileTimestamp()) for server in self._servers: server.close() self._profiler.record_event(shutdown_event)
[docs] def restore_message_counts(self, port_message_counts: Dict[str, List[int]] ) -> None: """Restore message counts on all ports. """ for port_name, num_messages in port_message_counts.items(): if port_name == "muscle_settings_in": self._muscle_settings_in.restore_message_counts(num_messages) elif port_name in self._ports: self._ports[port_name].restore_message_counts(num_messages) else: raise RuntimeError(f'Unknown port {port_name} in snapshot.' ' Have your port definitions changed since' ' the snapshot was taken?')
[docs] def get_message_counts(self) -> Dict[str, List[int]]: """Get message counts for all ports on the communicator. """ port_message_counts = {port_name: port.get_message_counts() for port_name, port in self._ports.items()} port_message_counts["muscle_settings_in"] = \ self._muscle_settings_in.get_message_counts() return port_message_counts
def __instance_id(self) -> Reference: """Returns our complete instance id. """ return self._kernel + self._index def __ports_from_declared(self) -> Dict[str, Port]: """Derives port definitions from supplied declaration. """ ports = dict() declared_ports = cast(Dict[Operator, List[str]], self._declared_ports) for operator, port_list in declared_ports.items(): for port_desc in port_list: port_name, is_vector = self.__split_port_desc(port_desc) if port_name.startswith('muscle_'): raise RuntimeError(('Port names starting with "muscle_"' ' are reserved for MUSCLE, please' ' rename port "{}"'.format(port_name))) port_id = Identifier(port_name) is_connected = self._peer_manager.is_connected(port_id) if is_connected: peer_ports = self._peer_manager.get_peer_ports(port_id) peer_port = peer_ports[0] peer_ce = peer_port[:-1] port_peer_dims = self._peer_manager.get_peer_dims(peer_ce) for peer_port in peer_ports[1:]: peer_ce = peer_port[:-1] if port_peer_dims != self._peer_manager.get_peer_dims( peer_ce): port_strs = ', '.join(map(str, peer_ports)) raise RuntimeError(('Multicast port "{}" is' ' connected to peers with' ' different dimensions. All' ' peer components that this' ' port is connected to must' ' have the same multiplicity.' ' Connected to ports: {}.' ).format(port_name, port_strs)) else: port_peer_dims = [] ports[port_name] = Port( port_name, operator, is_vector, is_connected, len(self._index), port_peer_dims) return ports def __ports_from_conduits(self, conduits: List[Conduit] ) -> Dict[str, Port]: """Derives port definitions from conduits. Args: conduits: The list of conduits. """ ports = dict() for conduit in conduits: if conduit.sending_component() == self._kernel: port_id = conduit.sending_port() operator = Operator.O_F port_peer_dims = self._peer_manager.get_peer_dims( conduit.receiving_component()) elif conduit.receiving_component() == self._kernel: port_id = conduit.receiving_port() operator = Operator.F_INIT port_peer_dims = self._peer_manager.get_peer_dims( conduit.sending_component()) else: continue ndims = max(0, len(port_peer_dims) - len(self._index)) is_vector = (ndims == 1) is_connected = self._peer_manager.is_connected(port_id) if not str(port_id).startswith('muscle_'): ports[str(port_id)] = Port( str(port_id), operator, is_vector, is_connected, len(self._index), port_peer_dims) return ports def __settings_in_port(self, conduits: List[Conduit]) -> Port: """Creates a Port representing muscle_settings_in. Args: conduits: The list of conduits. """ for conduit in conduits: if conduit.receiving_component() == self._kernel: port_id = conduit.receiving_port() if str(port_id) == 'muscle_settings_in': return Port(str(port_id), Operator.F_INIT, False, self._peer_manager.is_connected(port_id), len(self._index), self._peer_manager.get_peer_dims( conduit.sending_component())) return Port('muscle_settings_in', Operator.F_INIT, False, False, len(self._index), []) def __get_client(self, instance: Reference) -> MPPClient: """Get or create a client to connect to the given instance. Args: instance: A reference to the instance to connect to. Returns: An existing or new MCP client. """ if instance not in self._clients: locations = self._peer_manager.get_peer_locations(instance) _logger.info(f'Connecting to peer {instance} at {locations}') self._clients[instance] = MPPClient(locations) return self._clients[instance] def __get_endpoint(self, port_name: str, slot: List[int]) -> Endpoint: """Determines the endpoint on our side. Args: port_name: Name of the port to send or receive on. slot: Slot to send or receive on. """ try: port = Identifier(port_name) except ValueError as e: raise ValueError('"{}" is not a valid port name: {}'.format( port_name, e)) return Endpoint(self._kernel, self._index, port, slot) def __split_port_desc(self, port_desc: str) -> Tuple[str, bool]: """Split a port description into its name and dimensionality. Expects a port description of the form port_name or port_name[], and returns the port name and whether it is a vector port. Args: port_desc: A port description string, as above. """ is_vector = False if port_desc.endswith('[]'): is_vector = True port_desc = port_desc[:-2] if port_desc.endswith('[]'): raise ValueError(('Port description "{}" is invalid: ports can' ' have at most one dimension.').format( port_desc)) return port_desc, is_vector