import dataclasses
from pathlib import Path
from unittest.mock import MagicMock
import msgpack
from ymmsl import (
Operator, Reference, Checkpoints, CheckpointRangeRule, CheckpointAtRule)
import libmuscle
from libmuscle.logging import LogLevel
from libmuscle.manager.mmp_server import MMPRequestHandler
from libmuscle.mcp.protocol import RequestType, ResponseType
from libmuscle.snapshot import SnapshotMetadata
[docs]def test_create_servicer(
logger, profile_store, mmp_configuration, instance_registry,
topology_store, snapshot_registry):
MMPRequestHandler(
logger, profile_store, mmp_configuration, instance_registry,
topology_store, snapshot_registry, None)
[docs]def test_log_message(mmp_request_handler, caplog):
request = [
RequestType.SUBMIT_LOG_MESSAGE.value,
'test_instance_id', 0.0, LogLevel.WARNING.value,
'Testing log message']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert isinstance(decoded_result, list)
assert len(decoded_result) == 1
assert decoded_result[0] == ResponseType.SUCCESS.value
assert caplog.records[0].name == 'test_instance_id'
assert caplog.records[0].levelname == 'WARNING'
assert caplog.records[0].message == 'Testing log message'
[docs]def test_get_settings(mmp_configuration, mmp_request_handler):
request = [RequestType.GET_SETTINGS.value]
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert len(decoded_result) == 2
assert decoded_result[0] == ResponseType.SUCCESS.value
assert decoded_result[1] == {}
mmp_configuration.settings['test1'] = 13
mmp_configuration.settings['test2'] = 12.3
mmp_configuration.settings['test3'] = 'testing'
mmp_configuration.settings['test4'] = True
mmp_configuration.settings['test5'] = [2.3, 7.4]
mmp_configuration.settings['test6'] = [[1.0, 2.0], [2.0, 1.0]]
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert len(decoded_result) == 2
assert decoded_result[0] == ResponseType.SUCCESS.value
result_dict = decoded_result[1]
assert len(result_dict) == 6
assert result_dict['test1'] == 13
assert result_dict['test2'] == 12.3
assert result_dict['test3'] == 'testing'
assert result_dict['test4'] is True
assert result_dict['test5'] == [2.3, 7.4]
assert result_dict['test6'] == [[1.0, 2.0], [2.0, 1.0]]
assert result_dict == mmp_configuration.settings.as_ordered_dict()
[docs]def test_register_instance(mmp_request_handler, instance_registry):
request = [
RequestType.REGISTER_INSTANCE.value,
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']],
libmuscle.__version__]
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.SUCCESS.value
assert (instance_registry._locations['test_instance'] ==
['tcp://localhost:10000'])
registered_ports = instance_registry._ports
assert registered_ports['test_instance'][0].name == 'test_in'
assert registered_ports['test_instance'][0].operator == Operator.F_INIT
[docs]def test_register_instance_no_version(mmp_request_handler):
request = [
RequestType.REGISTER_INSTANCE.value,
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']]]
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.ERROR.value
assert 'version' in decoded_result[1]
[docs]def test_register_instance_version_mismatch(mmp_request_handler):
request = [
RequestType.REGISTER_INSTANCE.value,
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']],
libmuscle.__version__ + "dev"]
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.ERROR.value
assert 'version' in decoded_result[1]
[docs]def test_get_checkpoint_info(mmp_configuration, mmp_request_handler):
resume_path = Path('/path/to/resume.pack')
mmp_configuration.resume = {Reference('test_instance'): resume_path}
mmp_configuration.checkpoints = Checkpoints(
True,
[CheckpointRangeRule(every=10), CheckpointAtRule([1, 2, 3.0])])
request = [RequestType.GET_CHECKPOINT_INFO.value, 'test_instance']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.SUCCESS.value
elapsed_time, checkpoints, resume, snapshot_directory = decoded_result[1:]
assert elapsed_time > 0.0
assert isinstance(checkpoints, dict)
assert checkpoints.keys() == {'at_end', 'wallclock_time', 'simulation_time'}
assert checkpoints['at_end'] is True
assert checkpoints['simulation_time'] == []
wallclock_time = checkpoints['wallclock_time']
assert len(wallclock_time) == 2
assert wallclock_time[0] == {'start': None, 'stop': None, 'every': 10}
assert all(isinstance(obj, (type(None), float))
for obj in wallclock_time[0].values())
assert wallclock_time[1] == {'at': [1, 2, 3.0]}
assert all(isinstance(obj, (type(None), float))
for obj in wallclock_time[1]['at'])
assert resume is not None
assert Path(resume) == resume_path
assert snapshot_directory is None
[docs]def test_get_checkpoint_info2(registered_mmp_request_handler2, tmp_path):
request = [RequestType.GET_CHECKPOINT_INFO.value, 'test_instance']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler2.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.SUCCESS.value
snapshot_directory = decoded_result[4]
assert snapshot_directory == (
str(tmp_path) + '/instances/test_instance/snapshots')
[docs]def test_double_register_instance(mmp_request_handler):
request = [
RequestType.REGISTER_INSTANCE.value,
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']],
libmuscle.__version__]
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.SUCCESS.value
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.ERROR.value
assert 'test_instance' in decoded_result[1]
[docs]def test_deregister_instance(
registered_mmp_request_handler, instance_registry):
assert Reference('macro') in instance_registry._locations
request = [RequestType.DEREGISTER_INSTANCE.value, 'macro']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.SUCCESS.value
assert Reference('macro') not in instance_registry._locations
[docs]def test_get_peers_pending(mmp_request_handler):
request = [RequestType.GET_PEERS.value, 'micro[0][0]']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.PENDING.value
[docs]def test_request_peers_fanout(registered_mmp_request_handler):
request = [RequestType.GET_PEERS.value, 'macro']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
status, conduits, dims, locations = decoded_result
assert status == ResponseType.SUCCESS.value
assert conduits[0][0] == 'macro.out'
assert conduits[0][1] == 'micro.in'
assert conduits[1][0] == 'micro.out'
assert conduits[1][1] == 'macro.in'
assert dims['micro'] == [10, 10]
for i, (name, locs) in enumerate(locations.items()):
assert name == f'micro[{i // 10}][{i % 10}]'
assert locs == [f'direct:{name}']
[docs]def test_request_peers_fanin(registered_mmp_request_handler):
request = [RequestType.GET_PEERS.value, 'micro[4][3]']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
status, conduits, dims, locations = decoded_result
assert status == ResponseType.SUCCESS.value
assert conduits[0][0] == 'macro.out'
assert conduits[0][1] == 'micro.in'
assert conduits[1][0] == 'micro.out'
assert conduits[1][1] == 'macro.in'
assert dims['macro'] == []
assert locations['macro'] == ['direct:macro']
[docs]def test_request_peers_bidir(registered_mmp_request_handler2):
request = [RequestType.GET_PEERS.value, 'meso[2]']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler2.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
status, conduits, dims, locations = decoded_result
assert status == ResponseType.SUCCESS.value
assert conduits[0][0] == 'macro.out'
assert conduits[0][1] == 'meso.in'
assert conduits[1][0] == 'meso.out'
assert conduits[1][1] == 'micro.in'
assert conduits[2][0] == 'micro.out'
assert conduits[2][1] == 'meso.in'
assert conduits[3][0] == 'meso.out'
assert conduits[3][1] == 'macro.in'
assert dims['micro'] == [5, 10]
assert dims['macro'] == []
assert locations['macro'] == ['direct:macro']
assert locations['macro'] == ['direct:macro']
for i in range(10):
assert locations[f'micro[2][{i}]'] == [f'direct:micro[2][{i}]']
[docs]def test_request_peers_own_conduits(registered_mmp_request_handler2):
request = [RequestType.GET_PEERS.value, 'macro']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler2.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
status, conduits, dims, locations = decoded_result
assert status == ResponseType.SUCCESS.value
assert conduits[0][0] == 'macro.out'
assert conduits[0][1] == 'meso.in'
assert conduits[1][0] == 'meso.out'
assert conduits[1][1] == 'macro.in'
[docs]def test_request_peers_unknown(registered_mmp_request_handler2):
request = [RequestType.GET_PEERS.value, 'does_not_exist']
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler2.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
status, error_msg = decoded_result
assert status == ResponseType.ERROR.value
assert error_msg is not None
assert 'does_not_exist' in error_msg
[docs]def test_submit_snapshot(registered_mmp_request_handler):
register_snapshot = MagicMock()
registered_mmp_request_handler._snapshot_registry.register_snapshot = \
register_snapshot
instance_id = 'micro[1][2]'
snapshot = SnapshotMetadata(
['1', '2'], 1.234, 2.345, 3.456,
{'in': [1], 'out': [0]}, True, 'fname')
snapshot_dict = dataclasses.asdict(snapshot)
request = [RequestType.SUBMIT_SNAPSHOT.value, instance_id, snapshot_dict]
encoded_request = msgpack.packb(request, use_bin_type=True)
result = registered_mmp_request_handler.handle_request(encoded_request)
decoded_result = msgpack.unpackb(result, raw=False)
assert decoded_result[0] == ResponseType.SUCCESS.value
register_snapshot.assert_called_once_with(Reference(instance_id), snapshot)