Source code for libmuscle.post_office

from threading import Lock
import time
from typing import Dict

import msgpack
from ymmsl import Reference

from libmuscle.mcp.protocol import RequestType
from libmuscle.mcp.transport_server import RequestHandler
from libmuscle.outbox import Outbox


class PostOffice(RequestHandler):
    """A PostOffice is an object that holds messages to be retrieved.

    A PostOffice holds outboxes with messages for receivers. It also
    acts as a request handler for incoming requests for messages.

    Outboxes are created lazily, the first time a receiver is
    mentioned by any of the public methods.
    """
    def __init__(self) -> None:
        """Create a PostOffice.
        """
        self._outboxes: Dict[Reference, Outbox] = {}
        # Guards insertion into, and snapshots of, _outboxes.
        self._outbox_lock = Lock()

    def handle_request(self, request: bytes) -> bytes:
        """Handle a request.

        This receives an MCP request and handles it by blocking until
        the requested message is available, then returning it.

        Args:
            request: A received request

        Returns:
            An encoded response

        Raises:
            RuntimeError: If the decoded request does not consist of
                exactly two items, or its first item is not the
                GET_NEXT_MESSAGE request type.
        """
        req = msgpack.unpackb(request, raw=False)
        if len(req) != 2 or req[0] != RequestType.GET_NEXT_MESSAGE.value:
            raise RuntimeError(
                    'Invalid request type. Did the streams get crossed?')
        recv_port = Reference(req[1])
        self._ensure_outbox_exists(recv_port)
        return self._outboxes[recv_port].retrieve()

    def get_message(self, receiver: Reference) -> bytes:
        """Get a message from a receiver's outbox.

        Used by servers to get messages that have been sent to another
        instance.

        Args:
            receiver: The receiver of the message.

        Returns:
            The message retrieved from the receiver's outbox.
        """
        self._ensure_outbox_exists(receiver)
        return self._outboxes[receiver].retrieve()

    def deposit(self, receiver: Reference, message: bytes) -> None:
        """Deposits a message into an outbox.

        Args:
            receiver: Receiver of the message.
            message: The message to deposit.
        """
        self._ensure_outbox_exists(receiver)
        self._outboxes[receiver].deposit(message)

    def wait_for_receivers(self) -> None:
        """Waits until all outboxes are empty.

        Only the outboxes that exist at the time of the call are
        waited for. Each one is polled every 0.1 seconds.
        """
        # Iterate over a snapshot taken under the lock. Iterating the
        # dict itself would raise RuntimeError if another thread
        # created a new outbox while we are sleeping in the loop.
        with self._outbox_lock:
            outboxes = list(self._outboxes.values())

        for outbox in outboxes:
            while not outbox.is_empty():
                time.sleep(0.1)

    def _ensure_outbox_exists(self, receiver: Reference) -> None:
        """Ensure that an outbox exists.

        Outboxes are created dynamically, the first time a message is
        sent to a receiver. This function checks that an outbox exists
        for a receiver, and if not, creates one.

        Args:
            receiver: The receiver that should have an outbox.
        """
        # Use the lock as a context manager so that it is released
        # even if creating the outbox raises; a leaked lock would
        # block every subsequent caller forever.
        with self._outbox_lock:
            if receiver not in self._outboxes:
                self._outboxes[receiver] = Outbox()