Source code for libmuscle.test.test_mmp_client

from unittest.mock import patch

import msgpack
import pytest
from ymmsl.v0_2 import Conduit, Operator, Port, Reference

import libmuscle
from libmuscle.logging import LogLevel, LogMessage, Timestamp
from libmuscle.mcp.protocol import RequestType, ResponseType
from libmuscle.mmp_client import MMPClient


[docs] def test_init() -> None: with patch('libmuscle.mmp_client.TcpTransportClient') as mock_ttc: stub = mock_ttc.return_value client = MMPClient(Reference([]), '') assert client._transport_client == stub # type: ignore
[docs] def test_connection_fail() -> None: ttc = 'libmuscle.mcp.tcp_transport_client' with patch(f'{ttc}._CONNECT_TIMEOUT', 0.1): with patch(f'{ttc}.RECONNECT_TIMEOUT', 0.5): with pytest.raises(ConnectionError): # Port 255 is reserved and privileged, so there's probably # nothing there. client = MMPClient(Reference([]), 'tcp:localhost:255') client.get_settings()
[docs] def test_submit_log_message(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client result = [ResponseType.SUCCESS.value] stub.call.return_value = ( msgpack.packb(result, use_bin_type=True), profile_data) message = LogMessage( 'test_mmp_client', Timestamp(1.0), LogLevel.WARNING, 'Testing the MMPClient') client.submit_log_message(message) assert stub.call.called sent_request = stub.call.call_args[0][0] decoded_request = msgpack.unpackb(sent_request, raw=False) assert decoded_request == [ RequestType.SUBMIT_LOG_MESSAGE.value, 'test_mmp_client', 1.0, LogLevel.WARNING.value, 'Testing the MMPClient']
[docs] def test_get_settings(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client settings_msg = { 'test1': 'test', 'test2': 12, 'test3': 3.14, 'test4': True, 'test5': [1.2, 3.4], 'test6': [[1.2, 3.4], [5.6, 7.8]]} transport_result = [ResponseType.SUCCESS.value, settings_msg] stub.call.return_value = ( msgpack.packb(transport_result, use_bin_type=True), profile_data) settings = client.get_settings() assert len(settings) == 6 assert settings['test1'] == 'test' assert settings['test2'] == 12 assert settings['test3'] == 3.14 assert settings['test4'] is True assert settings['test5'] == [1.2, 3.4] assert settings['test6'] == [[1.2, 3.4], [5.6, 7.8]]
[docs] def test_register_instance(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client result = [ResponseType.SUCCESS.value] stub.call.return_value = ( msgpack.packb(result, use_bin_type=True), profile_data) client.register_instance( ['direct:test', 'tcp:test'], [Port('out', Operator.O_I), Port('in', Operator.S)]) assert stub.call.called sent_msg = msgpack.unpackb(stub.call.call_args[0][0], raw=False) assert sent_msg == [ RequestType.REGISTER_INSTANCE.value, 'component[13]', ['direct:test', 'tcp:test'], [['out', 'O_I'], ['in', 'S']], libmuscle.__version__]
[docs] def test_request_peers(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client result_msg = [ ResponseType.SUCCESS.value, [['component.out', 'other.in']], {'other': [20]}, {'other': ['direct:test', 'tcp:test']}, [["out", "O_F"]]] stub.call.return_value = ( msgpack.packb(result_msg, use_bin_type=True), profile_data) peer_info = client.request_peers() assert stub.call.called sent_msg = msgpack.unpackb(stub.call.call_args[0][0], raw=False) assert sent_msg[0] == RequestType.GET_PEERS.value assert sent_msg[1] == 'component[13]' assert peer_info._kernel == Reference("component") assert peer_info._index == [13] assert len(peer_info._conduits) == 1 assert isinstance(peer_info._conduits[0], Conduit) assert peer_info._conduits[0].sender == 'component.out' assert peer_info._conduits[0].receiver == 'other.in' assert peer_info._incoming_ports == [] assert peer_info._outgoing_ports == [Reference("component.out")] assert peer_info._peers == {"component.out": ["other.in"]} assert peer_info._peer_dims == {"other": [20]} assert peer_info._peer_locations == {"other": ['direct:test', 'tcp:test']} assert len(peer_info._ymmsl_ports) == 1 assert peer_info._ymmsl_ports[0].name == "out" assert peer_info._ymmsl_ports[0].operator == Operator.O_F
[docs] def test_request_peers_error(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client result_msg = [ResponseType.ERROR.value, 'test_error_message'] stub.call.return_value = ( msgpack.packb(result_msg, use_bin_type=True), profile_data) with pytest.raises(RuntimeError): client.request_peers()
[docs] def test_request_peers_timeout(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client result_msg = [ResponseType.PENDING.value, 'test_status_message'] stub.call.return_value = ( msgpack.packb(result_msg, use_bin_type=True), profile_data) with patch('libmuscle.mmp_client.PEER_TIMEOUT', 1), \ patch('libmuscle.mmp_client.PEER_INTERVAL_MIN', 0.1), \ patch('libmuscle.mmp_client.PEER_INTERVAL_MAX', 1.0): with pytest.raises(RuntimeError): client.request_peers()
[docs] def test_deregister_instance(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client result = [ResponseType.SUCCESS.value] stub.call.return_value = ( msgpack.packb(result, use_bin_type=True), profile_data) client.deregister_instance() assert stub.call.called sent_msg = msgpack.unpackb(stub.call.call_args[0][0], raw=False) assert sent_msg == [RequestType.DEREGISTER_INSTANCE.value, 'component[13]']
[docs] def test_deregister_instance_error(mocked_mmp_client, profile_data) -> None: client, stub = mocked_mmp_client result = [ResponseType.ERROR.value, 'Instance component[13] unknown'] stub.call.return_value = ( msgpack.packb(result, use_bin_type=True), profile_data) with pytest.raises(RuntimeError): client.deregister_instance() assert stub.call.called sent_msg = msgpack.unpackb(stub.call.call_args[0][0], raw=False) assert sent_msg == [RequestType.DEREGISTER_INSTANCE.value, 'component[13]']