from typing import cast, Dict, List
from ymmsl import Conduit, Identifier, Reference
from libmuscle.endpoint import Endpoint
[docs]class PeerManager:
"""Manages information about peers for a Communicator
"""
def __init__(self, kernel: Reference, index: List[int],
conduits: List[Conduit],
peer_dims: Dict[Reference, List[int]],
peer_locations: Dict[Reference, List[str]]) -> None:
"""Create a PeerManager.
Peers here are instances, and peer_dims and peer_locations are
indexed by a Reference to an instance. Instance sets are
multi-dimensional arrays with sizes given by peer_dims.
Args:
kernel: The kernel for the instance whose peers we're
managing.
index: The index of the instance whose peers we're
managing.
conduits: A list of conduits attached to this component,
as received from the manager.
peer_dims: For each peer we share a conduit with, the
dimensions of the instance set.
peer_locations: A list of locations for each peer instance
we share a conduit with.
"""
self.__kernel = kernel
self.__index = index
# peer port ids, indexed by local kernel.port id
self.__peers: Dict[Reference, List[Reference]] = {}
for conduit in conduits:
if str(conduit.sending_component()) == str(kernel):
# we send on the port this conduit attaches to
self.__peers.setdefault(
conduit.sender, []).append(conduit.receiver)
if str(conduit.receiving_component()) == str(kernel):
# we receive on the port this conduit attaches to
if conduit.receiver in self.__peers:
raise RuntimeError(('Receiving port "{}" is connected by'
' multiple conduits, but at most one'
' is allowed.'
).format(conduit.receiving_port()))
self.__peers[conduit.receiver] = [conduit.sender]
self.__peer_dims = peer_dims # indexed by kernel id
self.__peer_locations = peer_locations # indexed by instance id
[docs] def is_connected(self, port: Identifier) -> bool:
"""Determine whether the given port is connected.
Args:
port: The port to check.
"""
recv_port_full = self.__kernel + port
return recv_port_full in self.__peers
[docs] def get_peer_ports(self, port: Identifier) -> List[Reference]:
"""Get a reference for the peer ports.
Args:
port: Name of the port on this side.
"""
return self.__peers[self.__kernel + port]
[docs] def get_peer_dims(self, peer_kernel: Reference) -> List[int]:
"""Get the dimensions of a peer kernel.
Args:
peer_kernel: The peer kernel whose dimensions to get.
"""
return self.__peer_dims[peer_kernel]
[docs] def get_peer_locations(self, peer_instance: Reference) -> List[str]:
"""Get the locations of a peer instance.
There may be multiple, if the peer supports more than one
protocol.
Args:
peer_instance: The instance whose locations to get.
"""
return self.__peer_locations[peer_instance]
[docs] def get_peer_endpoints(self, port: Identifier, slot: List[int]
) -> List[Endpoint]:
"""Determine the peer endpoints for the given port and slot.
Args:
port: The port on our side to send or receive on.
slot: The slot to send or receive on.
Returns:
The peer endpoints.
"""
peers = self.__peers[self.__kernel + port]
endpoints = []
for peer in peers:
peer_kernel = peer[:-1]
peer_port = cast(Identifier, peer[-1])
total_index = self.__index + slot
# rebalance the indices
peer_dim = len(self.__peer_dims[peer_kernel])
peer_index = total_index[0:peer_dim]
peer_slot = total_index[peer_dim:]
endpoints.append(
Endpoint(peer_kernel, peer_index, peer_port, peer_slot))
return endpoints