Source code for libmuscle.peer_manager

from typing import cast, Dict, List

from ymmsl import Conduit, Identifier, Reference

from libmuscle.endpoint import Endpoint


class PeerManager:
    """Keeps track of the peers of one instance, for use by a Communicator.

    For every local port that has a conduit attached, the manager
    remembers which remote ports are on the other end, and it can
    translate a local port and slot into the endpoints to talk to.
    """

    def __init__(self, kernel: Reference, index: List[int],
                 conduits: List[Conduit],
                 peer_dims: Dict[Reference, List[int]],
                 peer_locations: Dict[Reference, List[str]]) -> None:
        """Create a PeerManager.

        Peers are instances. Sets of instances form multi-dimensional
        arrays whose sizes are given by peer_dims.

        Args:
            kernel: The kernel of the instance whose peers are managed.
            index: The index of the instance whose peers are managed.
            conduits: The conduits attached to this component, as
                    received from the manager.
            peer_dims: Dimensions of the instance set of each peer we
                    share a conduit with; looked up by peer kernel.
            peer_locations: Locations of each peer instance we share a
                    conduit with; looked up by peer instance.

        Raises:
            RuntimeError: If more than one conduit is attached to one
                    of our receiving ports.
        """
        self.__kernel = kernel
        self.__index = index
        self.__peer_dims = peer_dims
        self.__peer_locations = peer_locations

        # Remote ports for each of our connected ports. Keys are the
        # full local port references (kernel followed by port name).
        self.__peers: Dict[Reference, List[Reference]] = {}

        # Components are matched on their string form.
        own_name = str(kernel)
        for conduit in conduits:
            sender = conduit.sender
            receiver = conduit.receiver

            if str(conduit.sending_component()) == own_name:
                # Our side sends; a sending port may have several
                # conduits, so collect all receivers.
                if sender not in self.__peers:
                    self.__peers[sender] = []
                self.__peers[sender].append(receiver)

            if str(conduit.receiving_component()) == own_name:
                # Our side receives; only a single conduit may be
                # attached to a receiving port.
                if receiver in self.__peers:
                    message = ('Receiving port "{}" is connected by multiple'
                               ' conduits, but at most one is allowed.')
                    raise RuntimeError(
                            message.format(conduit.receiving_port()))
                self.__peers[receiver] = [sender]

    def is_connected(self, port: Identifier) -> bool:
        """Tell whether a conduit is attached to the given port.

        Args:
            port: The port to check.

        Returns:
            True if at least one peer port is known for this port.
        """
        return (self.__kernel + port) in self.__peers

    def get_peer_ports(self, port: Identifier) -> List[Reference]:
        """Get references to the ports at the other end of a port.

        Args:
            port: Name of the port on this side.

        Returns:
            The peer ports recorded for the given port.

        Raises:
            KeyError: If the port is not connected.
        """
        local_port = self.__kernel + port
        return self.__peers[local_port]

    def get_peer_dims(self, peer_kernel: Reference) -> List[int]:
        """Get the dimensions of a peer kernel's instance set.

        Args:
            peer_kernel: The peer kernel whose dimensions to get.
        """
        dims_by_kernel = self.__peer_dims
        return dims_by_kernel[peer_kernel]

    def get_peer_locations(self, peer_instance: Reference) -> List[str]:
        """Get the locations at which a peer instance can be reached.

        There may be more than one location, if the peer supports
        more than one protocol.

        Args:
            peer_instance: The instance whose locations to get.
        """
        locations_by_instance = self.__peer_locations
        return locations_by_instance[peer_instance]

    def get_peer_endpoints(self, port: Identifier, slot: List[int]
                           ) -> List[Endpoint]:
        """Work out the peer endpoints for a local port and slot.

        Args:
            port: The port on our side to send or receive on.
            slot: The slot to send or receive on.

        Returns:
            One endpoint for each peer port connected to the port.

        Raises:
            KeyError: If the port is not connected.
        """
        def endpoint_for(remote_port: Reference) -> Endpoint:
            # The last part of the reference names the port, all that
            # comes before it identifies the peer kernel.
            remote_kernel = remote_port[:-1]
            remote_port_name = cast(Identifier, remote_port[-1])

            # Our index and the slot together form a single sequence,
            # which is redistributed over the peer's side: the first
            # entries (as many as the peer has dimensions) form its
            # instance index, any remaining ones form its slot.
            combined = self.__index + slot
            num_peer_dims = len(self.__peer_dims[remote_kernel])
            return Endpoint(
                    remote_kernel, combined[0:num_peer_dims],
                    remote_port_name, combined[num_peer_dims:])

        return [endpoint_for(remote)
                for remote in self.__peers[self.__kernel + port]]