from typing import cast
from ymmsl.v0_2 import Conduit, Identifier, Port, Reference
from libmuscle.endpoint import Endpoint
[docs]
class PeerInfo:
"""Interprets information about peers for a Communicator
"""
def __init__(self, kernel: Reference, index: list[int],
conduits: list[Conduit],
peer_dims: dict[Reference, list[int]],
peer_locations: dict[Reference, list[str]],
ymmsl_ports: list[Port]) -> None:
"""Create a PeerInfo.
Peers here are instances, and peer_dims and peer_locations are
indexed by a Reference to an instance. Instance sets are
multi-dimensional arrays with sizes given by peer_dims.
Args:
kernel: The kernel for the instance whose peers we're
managing.
index: The index of the instance whose peers we're
managing.
conduits: A list of conduits attached to this component,
as received from the manager.
peer_dims: For each peer we share a conduit with, the
dimensions of the instance set.
peer_locations: A list of locations for each peer instance
we share a conduit with.
ymmsl_ports: Port declaration for this component from the yMMSL
configuration.
"""
self._kernel = kernel
self._index = index
self._conduits = conduits
self._incoming_ports: list[Reference] = []
self._outgoing_ports: list[Reference] = []
# peer port ids, indexed by local kernel.port id
self._peers: dict[Reference, list[Reference]] = {}
for conduit in conduits:
if str(conduit.sending_component()) == str(kernel):
# we send on the port this conduit attaches to
if conduit.sender not in self._outgoing_ports:
self._outgoing_ports.append(conduit.sender)
self._peers.setdefault(conduit.sender, []).append(
conduit.receiver)
if str(conduit.receiving_component()) == str(kernel):
# we receive on the port this conduit attaches to
if conduit.receiver in self._peers:
raise RuntimeError(
f'Receiving port "{conduit.receiving_port()}" is connected'
' by multiple conduits, but at most one is allowed.')
self._incoming_ports.append(conduit.receiver)
self._peers[conduit.receiver] = [conduit.sender]
self._peer_dims = peer_dims # indexed by kernel id
self._peer_locations = peer_locations # indexed by instance id
self._ymmsl_ports = ymmsl_ports
[docs]
def list_ymmsl_ports(self) -> list[Port]:
"""list ports declared in the yMMSL configuration"""
return self._ymmsl_ports
[docs]
def list_incoming_ports(self) -> list[tuple[Identifier, Reference]]:
"""list incoming ports.
Returns:
A list of tuples containing a port id and a reference to the
peer endpoint.
"""
return [
(cast(Identifier, port_ref[-1]), self._peers[port_ref][0])
for port_ref in self._incoming_ports]
[docs]
def list_outgoing_ports(self) -> list[tuple[Identifier, list[Reference]]]:
"""list outgoing ports.
Returns:
A list of tuples containing a port id and a list of references
to the peer endpoint(s).
"""
return [
(cast(Identifier, port_ref[-1]), self._peers[port_ref])
for port_ref in self._outgoing_ports]
[docs]
def is_connected(self, port: Identifier) -> bool:
"""Determine whether the given port is connected.
Args:
port: The port to check.
"""
recv_port_full = self._kernel + port
return recv_port_full in self._peers
[docs]
def get_peer_ports(self, port: Identifier) -> list[Reference]:
"""Get a reference for the peer ports.
Args:
port: Name of the port on this side.
"""
return self._peers[self._kernel + port]
[docs]
def get_peer_dims(self, peer_kernel: Reference) -> list[int]:
"""Get the dimensions of a peer kernel.
Args:
peer_kernel: The peer kernel whose dimensions to get.
"""
return self._peer_dims[peer_kernel]
[docs]
def get_peer_locations(self, peer_instance: Reference) -> list[str]:
"""Get the locations of a peer instance.
There may be multiple, if the peer supports more than one
protocol.
Args:
peer_instance: The instance whose locations to get.
"""
return self._peer_locations[peer_instance]
[docs]
def get_peer_endpoints(self, port: Identifier, slot: list[int]
) -> list[Endpoint]:
"""Determine the peer endpoints for the given port and slot.
Args:
port: The port on our side to send or receive on.
slot: The slot to send or receive on.
Returns:
The peer endpoints.
"""
peers = self._peers[self._kernel + port]
endpoints = []
for peer in peers:
peer_kernel = peer[:-1]
peer_port = cast(Identifier, peer[-1])
total_index = self._index + slot
# rebalance the indices
peer_dim = len(self._peer_dims[peer_kernel])
peer_index = total_index[0:peer_dim]
peer_slot = total_index[peer_dim:]
endpoints.append(
Endpoint(peer_kernel, peer_index, peer_port, peer_slot))
return endpoints
[docs]
def check_peer_dimensions(self, port_id: Identifier) -> list[int]:
"""Checks peer dimensions are as expected.
Args:
port_id: Port to check peer dimensions for.
Returns:
Dimensions of connected peers.
"""
if not self.is_connected(port_id):
return []
peer_ports = self.get_peer_ports(port_id)
peer_port = peer_ports[0]
peer_component = peer_port[:-1]
port_peer_dims = self.get_peer_dims(peer_component)
for peer_port in peer_ports[1:]:
peer_component = peer_port[:-1]
if port_peer_dims != self.get_peer_dims(peer_component):
port_strs = ", ".join(map(str, peer_ports))
raise RuntimeError(
f'Multicast port "{port_id}" is connected to peers with'
" different dimensions. All peer components that"
" this port is connected to must have the same"
f" multiplicity. Connected to ports: {port_strs}."
)
return port_peer_dims