Source code for libmuscle.mpp_message

from enum import IntEnum
from typing import Any, cast, Optional

import msgpack
import numpy as np

from ymmsl import Reference, Settings

from libmuscle.grid import Grid


[docs]class ExtTypeId(IntEnum): """MessagePack extension type ids. MessagePack lets you define your own types as an extension to the built-in ones. These are distinguished by a number from 0 to 127. This class is our registry of extension type ids. """ CLOSE_PORT = 0 SETTINGS = 1 GRID_INT32 = 2 GRID_INT64 = 3 GRID_FLOAT32 = 4 GRID_FLOAT64 = 5 GRID_BOOL = 6
_grid_types = { ExtTypeId.GRID_INT32, ExtTypeId.GRID_INT64, ExtTypeId.GRID_FLOAT32, ExtTypeId.GRID_FLOAT64, ExtTypeId.GRID_BOOL}
[docs]class ClosePort: """Sentinel value to send when closing a port. Sending an object of this class on a port/conduit conveys to the receiver the message that no further messages will be sent on this port during the simulation. All information is carried by the type, this has no attributes. """ pass
def _encode_grid(grid: Grid) -> msgpack.ExtType: """Encodes a Grid object into the wire format. """ ext_type_map = { 'int32': ExtTypeId.GRID_INT32, 'int64': ExtTypeId.GRID_INT64, 'float32': ExtTypeId.GRID_FLOAT32, 'float64': ExtTypeId.GRID_FLOAT64, 'bool': ExtTypeId.GRID_BOOL} array = grid.array if array.flags.f_contiguous: # indexes that differ in the first place are adjacent order = 'fa' else: # indexes that differ in the last place are adjacent order = 'la' # dtype is a bit weird, but this seems to be consistent if isinstance(array.dtype, np.dtype): array_type = str(array.dtype) else: array_type = str(np.dtype(array_type)) if array_type not in ext_type_map: raise RuntimeError('Unsupported array data type') buf = array.tobytes(order='A') # array_type is redundant, but useful metadata. grid_dict = { 'type': array_type, 'shape': list(array.shape), 'order': order, 'data': buf, 'indexes': grid.indexes} packed_data = msgpack.packb(grid_dict, use_bin_type=True) return msgpack.ExtType(ext_type_map[array_type], packed_data) def _decode_grid(code: int, data: bytes) -> Grid: """Creates a Grid from serialised data. """ type_map = { ExtTypeId.GRID_INT32: np.int32, ExtTypeId.GRID_INT64: np.int64, ExtTypeId.GRID_FLOAT32: np.float32, ExtTypeId.GRID_FLOAT64: np.float64, ExtTypeId.GRID_BOOL: np.bool_} order_map = { 'fa': 'F', 'la': 'C'} grid_dict = msgpack.unpackb(data, raw=False) order = order_map[grid_dict['order']] shape = tuple(grid_dict['shape']) dtype = type_map[ExtTypeId(code)] array = np.ndarray(shape, dtype, grid_dict['data'], order=order) # type: ignore indexes = grid_dict['indexes'] if indexes == []: indexes = None return Grid(array, indexes) def _data_encoder(obj: Any) -> Any: """Encodes custom objects for MessagePack. In particular, this takes care of any Settings, Grid and numpy.ndarray objects the user may want to send. """ if isinstance(obj, ClosePort): return msgpack.ExtType(ExtTypeId.CLOSE_PORT, bytes()) elif isinstance(obj, Settings): packed_data = msgpack.packb(obj.as_ordered_dict(), use_bin_type=True) return msgpack.ExtType(ExtTypeId.SETTINGS, packed_data) elif isinstance(obj, np.ndarray): return _encode_grid(Grid(obj)) elif isinstance(obj, Grid): return _encode_grid(obj) return obj def _ext_decoder(code: int, data: bytes) -> msgpack.ExtType: if code == ExtTypeId.CLOSE_PORT: return ClosePort() elif code == ExtTypeId.SETTINGS: plain_dict = msgpack.unpackb(data, raw=False) return Settings(plain_dict) elif code in _grid_types: return _decode_grid(code, data) return msgpack.ExtType(code, data)
[docs]class MPPMessage: """A MUSCLE Peer Protocol message. Messages carry the identity of their sender and receiver, so that they can be routed by a MUSCLE Transport Overlay when we get to multi-site running in the future. """ def __init__(self, sender: Reference, receiver: Reference, port_length: Optional[int], timestamp: float, next_timestamp: Optional[float], settings_overlay: Settings, message_number: int, saved_until: float, data: Any ) -> None: """Create an MPPMessage. Senders and receivers are refered to by a Reference, which contains Instance[InstanceNumber].Port[Slot]. The port_length field is only used if two vector ports are connected together. In that case the number of slots is not determined by the number of instances, and must be set by the sender and then communicated to the receiver in this additional field in all messages sent on the port. Args: sender: The sending endpoint. receiver: The receiving endpoint. port_length: Length of the slot, where applicable. settings_overlay: The serialised overlay settings. message_number: Sequence number on this conduit. saved_until: Elapsed time until which the sender has processed checkpoints. data: The serialised contents of the message. """ # make sure timestamp and next_timestamp are floats timestamp = float(timestamp) if next_timestamp is not None: next_timestamp = float(next_timestamp) self.sender = sender self.receiver = receiver self.port_length = port_length self.timestamp = timestamp self.next_timestamp = next_timestamp self.settings_overlay = settings_overlay self.message_number = message_number self.saved_until = saved_until if isinstance(data, np.ndarray): self.data = Grid(data) else: self.data = data
[docs] @staticmethod def from_bytes(message: bytes) -> 'MPPMessage': """Create an MPP Message from an encoded buffer. Args: message: MessagePack encoded message data. """ message_dict = msgpack.unpackb( message, ext_hook=_ext_decoder, raw=False) sender = Reference(message_dict["sender"]) receiver = Reference(message_dict["receiver"]) port_length = message_dict["port_length"] timestamp = message_dict["timestamp"] next_timestamp = message_dict["next_timestamp"] settings_overlay = message_dict["settings_overlay"] message_number = message_dict["message_number"] saved_until = message_dict["saved_until"] data = message_dict["data"] return MPPMessage( sender, receiver, port_length, timestamp, next_timestamp, settings_overlay, message_number, saved_until, data)
[docs] def encoded(self) -> bytes: """Encode the message and return as a bytes buffer. """ message_dict = { 'sender': str(self.sender), 'receiver': str(self.receiver), 'port_length': self.port_length, 'timestamp': self.timestamp, 'next_timestamp': self.next_timestamp, 'settings_overlay': self.settings_overlay, 'message_number': self.message_number, 'saved_until': self.saved_until, 'data': self.data } return cast(bytes, msgpack.packb( message_dict, default=_data_encoder, use_bin_type=True))