from unittest.mock import patch
import msgpack
import pytest
from ymmsl.v0_2 import Conduit, Operator, Port, Reference
import libmuscle
from libmuscle.logging import LogLevel, LogMessage, Timestamp
from libmuscle.mcp.protocol import RequestType, ResponseType
from libmuscle.mmp_client import MMPClient
[docs]
def test_init() -> None:
    """Check that MMPClient keeps the transport client it creates.

    TcpTransportClient is patched where libmuscle.mmp_client looks it
    up, so the client under test receives the mock's return value.
    """
    patch_target = 'libmuscle.mmp_client.TcpTransportClient'
    with patch(patch_target) as transport_cls:
        expected_transport = transport_cls.return_value
        mmp_client = MMPClient(Reference([]), '')
        # _transport_client is private, hence the type-checker override.
        assert mmp_client._transport_client == expected_transport  # type: ignore
[docs]
def test_connection_fail() -> None:
    """Check that a failed connection surfaces as a ConnectionError.

    The two timeout constants of the TCP transport client module are
    patched to small values (presumably so the test fails fast).
    """
    module = 'libmuscle.mcp.tcp_transport_client'
    with patch(f'{module}._CONNECT_TIMEOUT', 0.1), \
            patch(f'{module}.RECONNECT_TIMEOUT', 0.5), \
            pytest.raises(ConnectionError):
        # Port 255 is reserved and privileged, so there's probably
        # nothing there.
        unreachable = MMPClient(Reference([]), 'tcp:localhost:255')
        unreachable.get_settings()
[docs]
def test_submit_log_message(mocked_mmp_client, profile_data) -> None:
    """Check the request that submit_log_message() sends.

    The stubbed transport answers with a SUCCESS response; the test then
    decodes the first positional argument of the transport call and
    compares it with the expected SUBMIT_LOG_MESSAGE request.
    """
    client, stub = mocked_mmp_client

    success_reply = msgpack.packb(
            [ResponseType.SUCCESS.value], use_bin_type=True)
    stub.call.return_value = (success_reply, profile_data)

    log_message = LogMessage(
            'test_mmp_client', Timestamp(1.0), LogLevel.WARNING,
            'Testing the MMPClient')
    client.submit_log_message(log_message)

    assert stub.call.called

    positional_args = stub.call.call_args[0]
    request = msgpack.unpackb(positional_args[0], raw=False)
    expected_request = [
            RequestType.SUBMIT_LOG_MESSAGE.value,
            'test_mmp_client', 1.0, LogLevel.WARNING.value,
            'Testing the MMPClient']
    assert request == expected_request
[docs]
def test_get_settings(mocked_mmp_client, profile_data) -> None:
    """Check that get_settings() returns the settings sent in the reply.

    The stubbed transport returns a SUCCESS response carrying six
    settings of different value types (str, int, float, bool, list and
    nested list).
    """
    client, stub = mocked_mmp_client

    expected = {
            'test1': 'test',
            'test2': 12,
            'test3': 3.14,
            'test4': True,
            'test5': [1.2, 3.4],
            'test6': [[1.2, 3.4], [5.6, 7.8]]}
    reply = msgpack.packb(
            [ResponseType.SUCCESS.value, expected], use_bin_type=True)
    stub.call.return_value = (reply, profile_data)

    settings = client.get_settings()

    assert len(settings) == 6
    for name in ('test1', 'test2', 'test3', 'test5', 'test6'):
        assert settings[name] == expected[name]
    # The boolean must come back as a real bool, not merely a truthy value.
    assert settings['test4'] is True
[docs]
def test_register_instance(mocked_mmp_client, profile_data) -> None:
    """Check the request that register_instance() sends.

    The expected request holds the instance id 'component[13]'
    (presumably configured by the mocked_mmp_client fixture), the
    locations, the ports as [name, operator name] pairs and the
    libmuscle version.
    """
    client, stub = mocked_mmp_client

    success_reply = msgpack.packb(
            [ResponseType.SUCCESS.value], use_bin_type=True)
    stub.call.return_value = (success_reply, profile_data)

    locations = ['direct:test', 'tcp:test']
    ports = [Port('out', Operator.O_I), Port('in', Operator.S)]
    client.register_instance(locations, ports)

    assert stub.call.called

    request = msgpack.unpackb(stub.call.call_args[0][0], raw=False)
    expected_request = [
            RequestType.REGISTER_INSTANCE.value,
            'component[13]',
            ['direct:test', 'tcp:test'],
            [['out', 'O_I'], ['in', 'S']],
            libmuscle.__version__]
    assert request == expected_request
[docs]
def test_request_peers(mocked_mmp_client, profile_data) -> None:
    """Check request_peers() against a successful reply.

    Verifies both the GET_PEERS request that was sent and the contents
    of the returned peer info object: conduits, port lists, peers, peer
    dimensions, peer locations and yMMSL ports.
    """
    client, stub = mocked_mmp_client

    conduits = [['component.out', 'other.in']]
    peer_dims = {'other': [20]}
    peer_locations = {'other': ['direct:test', 'tcp:test']}
    ports = [['out', 'O_F']]
    reply = msgpack.packb(
            [ResponseType.SUCCESS.value, conduits, peer_dims,
             peer_locations, ports],
            use_bin_type=True)
    stub.call.return_value = (reply, profile_data)

    peer_info = client.request_peers()

    # What was sent
    assert stub.call.called
    request = msgpack.unpackb(stub.call.call_args[0][0], raw=False)
    assert request[0] == RequestType.GET_PEERS.value
    assert request[1] == 'component[13]'

    # Who we are
    assert peer_info._kernel == Reference('component')
    assert peer_info._index == [13]

    # The single conduit; length is checked before indexing into it
    assert len(peer_info._conduits) == 1
    conduit = peer_info._conduits[0]
    assert isinstance(conduit, Conduit)
    assert conduit.sender == 'component.out'
    assert conduit.receiver == 'other.in'

    # Ports and peers derived from that conduit
    assert peer_info._incoming_ports == []
    assert peer_info._outgoing_ports == [Reference('component.out')]
    assert peer_info._peers == {'component.out': ['other.in']}
    assert peer_info._peer_dims == {'other': [20]}
    assert peer_info._peer_locations == {
            'other': ['direct:test', 'tcp:test']}

    # The single yMMSL port
    assert len(peer_info._ymmsl_ports) == 1
    ymmsl_port = peer_info._ymmsl_ports[0]
    assert ymmsl_port.name == 'out'
    assert ymmsl_port.operator == Operator.O_F
[docs]
def test_request_peers_error(mocked_mmp_client, profile_data) -> None:
    """Check that an ERROR reply makes request_peers() raise RuntimeError."""
    client, stub = mocked_mmp_client

    error_reply = msgpack.packb(
            [ResponseType.ERROR.value, 'test_error_message'],
            use_bin_type=True)
    stub.call.return_value = (error_reply, profile_data)

    with pytest.raises(RuntimeError):
        client.request_peers()
[docs]
def test_request_peers_timeout(mocked_mmp_client, profile_data) -> None:
    """Check that repeated PENDING replies end in a RuntimeError.

    The stub always answers PENDING. The PEER_TIMEOUT and PEER_INTERVAL_*
    constants of libmuscle.mmp_client are patched to small values
    (presumably so that the client gives up quickly).
    """
    client, stub = mocked_mmp_client

    pending_reply = msgpack.packb(
            [ResponseType.PENDING.value, 'test_status_message'],
            use_bin_type=True)
    stub.call.return_value = (pending_reply, profile_data)

    with patch('libmuscle.mmp_client.PEER_TIMEOUT', 1):
        with patch('libmuscle.mmp_client.PEER_INTERVAL_MIN', 0.1):
            with patch('libmuscle.mmp_client.PEER_INTERVAL_MAX', 1.0):
                with pytest.raises(RuntimeError):
                    client.request_peers()
[docs]
def test_deregister_instance(mocked_mmp_client, profile_data) -> None:
    """Check the request that deregister_instance() sends on success.

    The expected request is the DEREGISTER_INSTANCE request type followed
    by the instance id 'component[13]' (presumably configured by the
    mocked_mmp_client fixture).
    """
    client, stub = mocked_mmp_client

    success_reply = msgpack.packb(
            [ResponseType.SUCCESS.value], use_bin_type=True)
    stub.call.return_value = (success_reply, profile_data)

    client.deregister_instance()

    assert stub.call.called
    request = msgpack.unpackb(stub.call.call_args[0][0], raw=False)
    expected_request = [
            RequestType.DEREGISTER_INSTANCE.value, 'component[13]']
    assert request == expected_request
[docs]
def test_deregister_instance_error(mocked_mmp_client, profile_data) -> None:
    """Check deregister_instance() when the reply is an ERROR.

    A RuntimeError must be raised, and the DEREGISTER_INSTANCE request
    must still have been sent through the transport.
    """
    client, stub = mocked_mmp_client

    error_reply = msgpack.packb(
            [ResponseType.ERROR.value, 'Instance component[13] unknown'],
            use_bin_type=True)
    stub.call.return_value = (error_reply, profile_data)

    with pytest.raises(RuntimeError):
        client.deregister_instance()

    # Checked after the raises block so these assertions actually run.
    assert stub.call.called
    request = msgpack.unpackb(stub.call.call_args[0][0], raw=False)
    expected_request = [
            RequestType.DEREGISTER_INSTANCE.value, 'component[13]']
    assert request == expected_request