import struct
import msgpack
import numpy as np
from ymmsl import Reference, Settings
from libmuscle.grid import Grid
from libmuscle.mpp_message import MPPMessage
def test_create() -> None:
    """Check that MPPMessage exposes its constructor arguments unchanged.

    Builds a message with a ``None`` port length and two-byte payloads
    for the settings overlay and the data, then reads every attribute
    back and compares it with what was passed in.
    """
    sender = Reference('sender.port')
    receiver = Reference('receiver.port')
    timestamp = 10.0
    next_timestamp = 11.0
    # Arbitrary two-byte payloads; only their identity matters here.
    settings_overlay = (6789).to_bytes(2, 'little', signed=True)
    message_number = 0
    saved_until = 1.6
    data = (12345).to_bytes(2, 'little', signed=True)

    msg = MPPMessage(
            sender, receiver, None, timestamp, next_timestamp,
            settings_overlay, message_number, saved_until, data)

    assert msg.sender == sender
    assert msg.receiver == receiver
    assert msg.port_length is None
    assert msg.timestamp == 10.0
    assert msg.next_timestamp == 11.0
    assert msg.settings_overlay == settings_overlay
    assert msg.message_number == 0
    assert msg.saved_until == 1.6
    assert msg.data == data
def test_grid_encode() -> None:
    """Check the wire representation of a message carrying a Grid.

    Encodes a 2x2x3 float32 grid, unpacks the result with msgpack and
    inspects the nested grid dictionary: element type, shape, storage
    order, raw element bytes and index names.
    """
    sender = Reference('sender.port')
    receiver = Reference('receiver.port')
    timestamp = 10.0
    next_timestamp = 11.0

    array = np.array(
            [[[1.0, 2.0, 3.0],
              [4.0, 5.0, 6.0]],
             [[7.0, 8.0, 9.0],
              [10.0, 11.0, 12.0]]], np.float32)

    grid = Grid(array, ['x', 'y', 'z'])
    msg = MPPMessage(
            sender, receiver, None, timestamp, next_timestamp, Settings(),
            0, 1.0, grid)

    wire_data = msg.encoded()
    mcp_decoded = msgpack.unpackb(wire_data, raw=False)
    # The grid travels as a msgpack extension object whose payload is
    # itself a msgpack-encoded dictionary.
    grid_decoded = msgpack.unpackb(mcp_decoded['data'].data, raw=False)

    assert grid_decoded['type'] == 'float32'
    assert grid_decoded['shape'] == [2, 2, 3]
    assert grid_decoded['order'] == 'la'

    # Compare the whole sequence rather than element by element, so that
    # a truncated or empty payload fails instead of passing vacuously.
    values = [
            item[0]
            for item in struct.iter_unpack('<f', grid_decoded['data'])]
    assert values == [float(i) for i in range(1, 13)]

    assert grid_decoded['indexes'] == ['x', 'y', 'z']
def test_grid_decode() -> None:
    """Check that a hand-built wire message decodes into a Grid.

    The same 24-byte buffer is decoded twice, once tagged with order
    ``'la'`` and once with ``'fa'``, and the resulting array layout and
    element positions are checked for both.
    """
    settings_data = msgpack.packb({}, use_bin_type=True)

    # Read as little-endian int32 these are
    # 1, 2, 3, 4 * 65536, 5 * 65536, 6 * 65536.
    grid_buf = bytes(
            [1, 0, 0, 0,
             2, 0, 0, 0,
             3, 0, 0, 0,
             0, 0, 4, 0,
             0, 0, 5, 0,
             0, 0, 6, 0])

    grid_dict = {
            'type': 'int32',
            'shape': [2, 3],
            'order': 'la',
            'data': grid_buf,
            'indexes': []}
    grid_data = msgpack.packb(grid_dict, use_bin_type=True)

    # This test wraps the settings in extension code 1 and the grid in
    # extension code 2.
    msg_dict = {
            'sender': 'elem1.port1',
            'receiver': 'elem2.port2',
            'port_length': 0,
            'timestamp': 0.0,
            'next_timestamp': None,
            'settings_overlay': msgpack.ExtType(1, settings_data),
            'message_number': 0,
            'saved_until': 9.9,
            'data': msgpack.ExtType(2, grid_data)}

    wire_data = msgpack.packb(msg_dict, use_bin_type=True)
    msg = MPPMessage.from_bytes(wire_data)

    # Order 'la': the buffer fills the array row by row.
    assert isinstance(msg.data, Grid)
    assert msg.data.array.dtype == np.int32
    assert msg.data.array.shape == (2, 3)
    assert msg.data.array.flags.c_contiguous
    assert msg.data.array[0, 0] == 1
    assert msg.data.array[0, 1] == 2
    assert msg.data.array[1, 1] == 5 * 65536
    # An empty list of index names comes back as None.
    assert msg.data.indexes is None

    # Same bytes, now tagged 'fa': the buffer fills column by column, so
    # the same positions hold different values.
    grid_dict['order'] = 'fa'
    grid_data = msgpack.packb(grid_dict, use_bin_type=True)
    msg_dict['data'] = msgpack.ExtType(2, grid_data)

    wire_data = msgpack.packb(msg_dict, use_bin_type=True)
    msg = MPPMessage.from_bytes(wire_data)

    assert isinstance(msg.data, Grid)
    assert msg.data.array.dtype == np.int32
    assert msg.data.array.shape == (2, 3)
    assert msg.data.array.flags.f_contiguous
    assert msg.data.array[0, 0] == 1
    assert msg.data.array[0, 1] == 3
    assert msg.data.array[0, 2] == 5 * 65536
    assert msg.data.indexes is None
def test_grid_roundtrip() -> None:
    """Check that a float64 Grid survives encode followed by decode.

    The round trip is performed once for a C-ordered and once for a
    Fortran-ordered input array; in both cases the decoded grid must
    have the same index names, dtype, shape and element positions.
    """
    sender = Reference('sender.port')
    receiver = Reference('receiver.port')
    timestamp = 10.0
    next_timestamp = 11.0

    # Everything below runs per memory order, so that both layouts are
    # actually encoded and decoded rather than only the last one.
    for order in ('C', 'F'):
        array = np.array(
                [[[1.0, 2.0, 3.0],
                  [4.0, 5.0, 6.0]],
                 [[7.0, 8.0, 9.0],
                  [10.0, 11.0, 12.0]]], np.float64, order=order)

        assert array[0, 0, 0] == 1.0

        grid = Grid(array, ['x', 'y', 'z'])
        msg = MPPMessage(
                sender, receiver, None, timestamp, next_timestamp,
                Settings(), 0, 1.0, grid)

        wire_data = msg.encoded()
        msg_out = MPPMessage.from_bytes(wire_data)

        assert isinstance(msg_out.data, Grid)
        grid_out = msg_out.data
        assert grid_out.indexes == ['x', 'y', 'z']
        assert isinstance(grid_out.array, np.ndarray)
        assert grid_out.array.dtype == np.float64
        assert grid_out.array.shape == (2, 2, 3)
        assert grid_out.array.size == 12
        assert grid_out.array[1, 0, 1] == 8.0
        assert grid_out.array[0, 0, 2] == 3.0
def test_non_contiguous_grid_roundtrip() -> None:
    """Check that a non-contiguous Grid survives encode and decode.

    The input is the real part of a complex64 array: a float32 view that
    skips over the interleaved imaginary components and therefore is
    neither C- nor Fortran-contiguous.
    """
    sender = Reference('sender.port')
    receiver = Reference('receiver.port')
    timestamp = 10.0
    next_timestamp = 11.0

    array = np.array(
            [[[1.0 + 0.125j, 2.0 + 0.25j, 3.0 + 0.375j],
              [4.0 + 0.4375j, 5.0 + 0.5j, 6.0 + 0.625j]],
             [[7.0 + 0.75j, 8.0 + 0.875j, 9.0 + 0.9375j],
              [10.0 + 0.10j, 11.0 + 0.11j, 12.0 + 0.12j]]], np.complex64)

    assert array.real[0, 0, 0] == 1.0
    assert array.imag[0, 0, 0] == 0.125

    # Guard the premise of this test: if the view were contiguous, the
    # non-contiguous code path would not be exercised at all.
    real_part = array.real
    assert not real_part.flags.c_contiguous
    assert not real_part.flags.f_contiguous

    grid = Grid(real_part, ['a', 'b', 'c'])
    msg = MPPMessage(
            sender, receiver, None, timestamp, next_timestamp, Settings(),
            0, 7.7, grid)

    wire_data = msg.encoded()
    msg_out = MPPMessage.from_bytes(wire_data)

    assert isinstance(msg_out.data, Grid)
    grid_out = msg_out.data
    # Index names must round-trip here just as for contiguous grids.
    assert grid_out.indexes == ['a', 'b', 'c']
    assert isinstance(grid_out.array, np.ndarray)
    assert grid_out.array.dtype == np.float32
    assert grid_out.array.shape == (2, 2, 3)
    assert grid_out.array.size == 12
    assert grid_out.array[1, 0, 1] == 8.0
    assert grid_out.array[0, 0, 2] == 3.0