from typing import List
import msgpack
from ymmsl import Reference
from libmuscle.mcp.protocol import RequestType
from libmuscle.mcp.transport_server import RequestHandler, TransportServer
from libmuscle.mcp.type_registry import transport_server_types
from libmuscle.post_office import PostOffice
[docs]class MPPRequestHandler(RequestHandler):
"""Handles peer protocol requests.
This accepts peer protocol message requests and responds to them by
getting messages from a PostOffice.
"""
def __init__(self, post_office: PostOffice) -> None:
"""Create an MPPRequestHandler.
Args:
post_office: The PostOffice to get messages from.
"""
self._post_office = post_office
[docs] def handle_request(self, request: bytes) -> bytes:
"""Handle a request.
This receives an MCP request and handles it by blocking until
the requested message is available, then returning it.
Args:
request: A received request
Returns:
An encoded response
"""
req = msgpack.unpackb(request, raw=False)
if len(req) != 2 or req[0] != RequestType.GET_NEXT_MESSAGE.value:
raise RuntimeError(
'Invalid request type. Did the streams get crossed?')
recv_port = Reference(req[1])
return self._post_office.get_message(recv_port)
[docs]class MPPServer:
"""Serves MPP requests.
This manages a collection of servers for different protocols and a
PostOffice that stores outgoing messages.
"""
def __init__(self) -> None:
self._post_office = PostOffice()
self._handler = MPPRequestHandler(self._post_office)
self._servers: List[TransportServer] = []
for server_type in transport_server_types:
server = server_type(self._handler)
self._servers.append(server)
[docs] def get_locations(self) -> List[str]:
"""Returns a list of locations that we can be reached at.
These locations are of the form 'protocol:location', where
the protocol name does not contain a colon and location may
be an arbitrary string.
Returns:
A list of strings describing network locations.
"""
return [server.get_location() for server in self._servers]
[docs] def deposit(self, receiver: Reference, message: bytes) -> None:
"""Deposits a message for the receiver to retrieve.
Args:
receiver: Receiver of the message.
message: The message to deposit.
"""
self._post_office.deposit(receiver, message)
[docs] def wait_for_receivers(self) -> None:
"""Waits for all deposited messages to have been received."""
self._post_office.wait_for_receivers()
[docs] def shutdown(self) -> None:
"""Shut down all servers."""
for server in self._servers:
server.close()