from typing import List, Optional, Tuple
import msgpack
from ymmsl import Reference
from libmuscle.mcp.protocol import RequestType
from libmuscle.mcp.transport_client import ProfileData, TransportClient
from libmuscle.mcp.type_registry import transport_client_types
class MPPClient:
    """A client that connects to an MPP server.

    This client connects to a peer to retrieve messages. It uses an MCP
    Transport to connect.
    """
    def __init__(self, locations: List[str]) -> None:
        """Create an MPPClient for the given peer.

        The client will connect to the peer on one of its locations.
        Transport client types are tried in the order in which they
        appear in ``transport_client_types`` (presumably most efficient
        first), and for each type the locations are tried in the order
        given. The first successful connection is used. Once connected,
        the client can request messages from any component and port
        represented by the peer.

        Args:
            locations: The peer's location strings

        Raises:
            RuntimeError: If no locations were given, or if no
                connection could be made to any of them. When connection
                attempts failed, the last failure is chained as the
                cause and all failures are listed in the message.
        """
        if not locations:
            # Nothing to try; fail with a specific message.
            raise RuntimeError('Failed to connect: no peer locations given')

        client: Optional[TransportClient] = None
        failures: List[str] = []
        last_error: Optional[Exception] = None

        for ClientType in transport_client_types:
            for location in locations:
                if not ClientType.can_connect_to(location):
                    continue
                try:
                    client = ClientType(location)
                except Exception as exc:
                    # Best effort: remember why this attempt failed and
                    # move on to the next candidate location.
                    failures.append('{} at {}: {!r}'.format(
                        getattr(ClientType, '__name__', ClientType),
                        location, exc))
                    last_error = exc
                else:
                    break

            # Compare against None explicitly so that a client object
            # that happens to be falsy still counts as connected.
            if client is not None:
                break

        if client is None:
            if failures:
                detail = '; '.join(failures)
            else:
                detail = 'no transport supports any of the given locations'
            raise RuntimeError(
                    'Failed to connect to any of {}: {}'.format(
                        locations, detail)) from last_error

        self._transport_client = client

    def receive(self, receiver: Reference) -> Tuple[bytes, ProfileData]:
        """Receive a message from a port this client connects to.

        Sends a msgpack-encoded request consisting of the
        GET_NEXT_MESSAGE request type value and the receiver converted
        to a string, and returns what the transport client's call
        yields.

        Args:
            receiver: The receiving (local) port.

        Returns:
            The received message, and profiling data
        """
        request = [RequestType.GET_NEXT_MESSAGE.value, str(receiver)]
        encoded_request = msgpack.packb(request, use_bin_type=True)
        return self._transport_client.call(encoded_request)

    def close(self) -> None:
        """Closes this client.

        This closes any connections this client has and/or performs
        other shutdown activities, by closing the underlying transport
        client.
        """
        self._transport_client.close()