import logging
from contextlib import nullcontext as does_not_raise
from unittest.mock import MagicMock, patch
import pytest
from ymmsl.v0_2 import Checkpoints, Operator, Settings
from ymmsl.v0_2 import Reference as Ref
from libmuscle.instance import Instance, InstanceFlags
from libmuscle.mpp_message import ClosePort
from libmuscle.peer_info import PeerInfo
[docs]
@pytest.fixture
def logger():
    """Yield a mock that replaces ``libmuscle.instance._logger``."""
    logger_patch = patch('libmuscle.instance._logger')
    with logger_patch as mock_logger:
        yield mock_logger
[docs]
@pytest.fixture(autouse=True)
def MMPClient():
    """Yield a mock that replaces the ``libmuscle.instance.MMPClient`` class.

    The client object the mocked class produces is preconfigured so that
    ``request_peers()`` returns a ``PeerInfo`` built from six references to
    a single ``MagicMock``, and ``get_checkpoint_info()`` returns a list of
    four mocks of which the second evaluates as falsy.
    """
    with patch('libmuscle.instance.MMPClient') as mock_cls:
        client = mock_cls.return_value

        # The same mock object is passed for all six PeerInfo arguments.
        shared_arg = MagicMock()
        client.request_peers.return_value = PeerInfo(*([shared_arg] * 6))

        falsy_checkpoints = MagicMock()
        falsy_checkpoints.__bool__.return_value = False
        client.get_checkpoint_info.return_value = [
            MagicMock(), falsy_checkpoints, MagicMock(), MagicMock()]

        yield mock_cls
[docs]
@pytest.fixture
def mmp_client(MMPClient):
    """Return the client object that the patched ``MMPClient`` class creates."""
    client_mock = MMPClient.return_value
    return client_mock
[docs]
@pytest.fixture
def api_guard():
    """Patch ``libmuscle.instance.APIGuard`` and yield its instance mock."""
    with patch('libmuscle.instance.APIGuard') as mock_cls:
        guard = mock_cls.return_value
        yield guard
[docs]
@pytest.fixture
def profiler():
    """Patch ``libmuscle.instance.Profiler`` and yield its instance mock."""
    with patch('libmuscle.instance.Profiler') as mock_cls:
        profiler_mock = mock_cls.return_value
        yield profiler_mock
[docs]
@pytest.fixture
def port_manager():
    """Patch ``libmuscle.instance.PortManager`` and yield its instance mock.

    The mock's ``settings_in_connected()`` is set to return ``False``.
    """
    with patch('libmuscle.instance.PortManager') as mock_cls:
        manager = mock_cls.return_value
        manager.settings_in_connected.return_value = False
        yield manager
[docs]
@pytest.fixture
def communicator():
    """Patch ``libmuscle.instance.Communicator`` and yield its instance mock."""
    with patch('libmuscle.instance.Communicator') as mock_cls:
        communicator_mock = mock_cls.return_value
        yield communicator_mock
[docs]
@pytest.fixture
def settings_manager():
    """Patch ``libmuscle.instance.SettingsManager`` and yield its instance mock.

    ``get_setting()`` on the mock raises ``KeyError`` by default, which
    emulates the situation where no settings are available.
    """
    with patch('libmuscle.instance.SettingsManager') as mock_cls:
        manager = mock_cls.return_value
        # Emulate no settings available
        manager.get_setting.side_effect = KeyError()
        yield manager
[docs]
@pytest.fixture(autouse=True)
def snapshot_manager():
    """Patch ``libmuscle.instance.SnapshotManager`` and yield its instance mock."""
    with patch('libmuscle.instance.SnapshotManager') as mock_cls:
        manager = mock_cls.return_value
        yield manager
[docs]
@pytest.fixture(autouse=True)
def no_resume_snapshot_manager(snapshot_manager):
    """Configure the snapshot manager mock to report that nothing is resumed.

    Both ``resuming_from_intermediate()`` and ``resuming_from_final()`` are
    set to return ``False`` and ``resume_overlay`` is set to ``None``. The
    same mock object is returned.
    """
    manager = snapshot_manager
    for query in (manager.resuming_from_intermediate,
                  manager.resuming_from_final):
        query.return_value = False
    manager.resume_overlay = None
    return manager
[docs]
@pytest.fixture(autouse=True)
def trigger_manager():
    """Patch ``libmuscle.instance.TriggerManager`` and yield its instance mock."""
    with patch('libmuscle.instance.TriggerManager') as mock_cls:
        manager = mock_cls.return_value
        yield manager
[docs]
@pytest.fixture
def sys_argv():
    """Replace ``libmuscle.instance.sys.argv`` with an empty list and yield it.

    Other fixtures extend the yielded list to emulate command line
    arguments.
    """
    fake_argv = []
    # With an explicit replacement object, the patch hands back that object,
    # so yielding the list directly is the same as yielding the patch result.
    with patch('libmuscle.instance.sys.argv', fake_argv):
        yield fake_argv
[docs]
@pytest.fixture
def manager_location_argv(sys_argv):
    """Add a ``--muscle-manager`` option, between two other arguments, to argv."""
    extra_args = ['--test', '--muscle-manager=tcp:localhost:9001', 'bla']
    for arg in extra_args:
        sys_argv.append(arg)
    yield None
[docs]
@pytest.fixture
def instance_argv(sys_argv):
    """Add a ``--muscle-instance`` option, after an unrelated argument, to argv."""
    extra_args = ['distraction', '--muscle-instance=component']
    for arg in extra_args:
        sys_argv.append(arg)
    yield None
[docs]
@pytest.fixture
def os_environ():
    """Replace ``libmuscle.instance.os.environ`` with an empty dict and yield it."""
    fake_environ = {}
    # With an explicit replacement object, the patch hands back that object,
    # so yielding the dict directly is the same as yielding the patch result.
    with patch('libmuscle.instance.os.environ', fake_environ):
        yield fake_environ
[docs]
@pytest.fixture
def manager_location_envvar(os_environ):
    """Set ``MUSCLE_MANAGER`` in the patched environment."""
    os_environ.update({'MUSCLE_MANAGER': 'tcp:localhost:9002'})
    yield None
[docs]
@pytest.fixture
def instance_envvar(os_environ):
    """Set ``MUSCLE_INSTANCE`` in the patched environment."""
    os_environ.update({'MUSCLE_INSTANCE': 'component[13]'})
    yield None
[docs]
@pytest.fixture
def instance(
        logger, MMPClient, mmp_client, api_guard, profiler,
        connected_port_manager, communicator, settings_manager,
        no_resume_snapshot_manager, trigger_manager, manager_location_argv,
        instance_argv, declared_ports):
    """Return an ``Instance`` created from ``declared_ports``.

    The other fixtures are requested so that their patches and command line
    arguments are in place before the ``Instance`` is constructed.
    """
    new_instance = Instance(declared_ports)
    return new_instance
[docs]
@pytest.fixture
def instance_dont_apply_overlay(
        logger, MMPClient, mmp_client, api_guard, profiler,
        connected_port_manager, communicator, settings_manager,
        no_resume_snapshot_manager, trigger_manager, manager_location_argv,
        instance_argv, declared_ports):
    """Return an ``Instance`` created with ``InstanceFlags.DONT_APPLY_OVERLAY``.

    The other fixtures are requested so that their patches and command line
    arguments are in place before the ``Instance`` is constructed.
    """
    flags = InstanceFlags.DONT_APPLY_OVERLAY
    new_instance = Instance(declared_ports, flags)
    return new_instance
[docs]
def test_create_instance_manager_location_default(
        instance_argv, MMPClient, declared_ports):
    """Without a manager location given, the client gets ``tcp:localhost:9000``."""
    inst = Instance(declared_ports)
    inst.error_shutdown("")     # ensure all threads and resources are cleaned up

    expected_name = Ref('component')
    MMPClient.assert_called_once_with(expected_name, 'tcp:localhost:9000')
[docs]
def test_create_instance_manager_location_argv(
        manager_location_argv, instance_argv, MMPClient, declared_ports):
    """The manager location given on the command line is passed to the client."""
    inst = Instance(declared_ports)
    inst.error_shutdown("")     # ensure all threads and resources are cleaned up

    expected_name = Ref('component')
    MMPClient.assert_called_once_with(expected_name, 'tcp:localhost:9001')
[docs]
def test_create_instance_manager_location_envvar(
        manager_location_envvar, instance_envvar, MMPClient, declared_ports):
    """Manager location and instance name from the environment reach the client."""
    inst = Instance(declared_ports)
    inst.error_shutdown("")     # ensure all threads and resources are cleaned up

    expected_name = Ref('component[13]')
    MMPClient.assert_called_once_with(expected_name, 'tcp:localhost:9002')
[docs]
def test_create_instance_registration(
        manager_location_argv, instance_argv, mmp_client, communicator,
        port_manager, profiler, declared_ports):
    """Check the arguments with which a new Instance registers itself.

    The first positional argument must be the communicator's locations, and
    the second a sequence of port descriptions with the expected names and
    operators, in order.
    """
    inst = Instance(declared_ports)
    inst.error_shutdown("")     # ensure all threads and resources are cleaned up

    register = mmp_client.register_instance
    register.assert_called_once()

    positional = register.call_args[0]
    assert positional[0] == communicator.get_locations.return_value

    port_desc = positional[1]
    expected_ports = (
        ('in', Operator.F_INIT),
        ('not_connected', Operator.F_INIT),
        ('out_v', Operator.O_I),
        ('out_r', Operator.O_I),
        ('in_v', Operator.S),
        ('in_r', Operator.S),
        ('not_connected_v', Operator.S),
        ('out', Operator.O_F),
    )
    # Index explicitly so that a too-short description list is an error.
    for idx, (name, operator) in enumerate(expected_ports):
        assert port_desc[idx].name == name
        assert port_desc[idx].operator == operator
[docs]
def test_create_instance_profiling(
        manager_location_argv, instance_argv, profiler, declared_ports):
    """Check that creating an Instance records exactly two profile events.

    The count is checked before the instance is shut down. The shutdown is
    done in a ``finally`` clause, so that the instance is cleaned up even if
    the assertion fails.
    """
    instance = Instance(declared_ports)
    try:
        assert len(profiler.record_event.mock_calls) == 2
    finally:
        instance.error_shutdown(
            "Ensure all threads and resources are cleaned up")
[docs]
def test_create_instance_connecting(
        manager_location_argv, instance_argv, mmp_client, port_manager,
        communicator, settings_manager, declared_ports):
    """Check that a new Instance passes on the peers and settings it gets.

    The peer info returned by the client must reach both the port manager
    and the communicator, and the settings returned by the client must
    become the base of the settings manager. The shutdown is done in a
    ``finally`` clause, so that the instance is cleaned up even if one of
    the assertions fails.
    """
    peer_info = MagicMock()
    mmp_client.request_peers.return_value = peer_info
    settings = MagicMock()
    mmp_client.get_settings.return_value = settings

    instance = Instance(declared_ports)
    try:
        port_manager.connect_ports.assert_called_once_with(peer_info)
        communicator.set_peer_info.assert_called_once_with(peer_info)
        assert settings_manager.base == settings
    finally:
        instance.error_shutdown(
            "Ensure all threads and resources are cleaned up")
[docs]
def test_create_instance_set_up_checkpointing(
        manager_location_argv, instance_argv, mmp_client, trigger_manager,
        no_resume_snapshot_manager, settings_manager, declared_ports):
    """Check that a new Instance forwards the checkpoint info it gets.

    The elapsed time and checkpoints must reach the trigger manager, and
    the resume and snapshot paths the snapshot manager. The settings overlay
    must differ from the snapshot manager's ``resume_overlay``. The shutdown
    is done in a ``finally`` clause, so that the instance is cleaned up even
    if one of the assertions fails.
    """
    instance = Instance(declared_ports)
    try:
        elapsed_time, checkpoints, resume_path, snapshot_path = (
            mmp_client.get_checkpoint_info.return_value)

        trigger_manager.set_checkpoint_info.assert_called_with(
            elapsed_time, checkpoints)
        no_resume_snapshot_manager.prepare_resume.assert_called_with(
            resume_path, snapshot_path)
        assert (
            settings_manager.overlay
            != no_resume_snapshot_manager.resume_overlay)
    finally:
        instance.error_shutdown(
            "Ensure all threads and resources are cleaned up")
[docs]
def test_create_instance_set_up_logging(
        manager_location_argv, instance_argv, settings_manager,
        declared_ports):
    """Check that the log level settings are applied to handler and loggers.

    The settings manager is made to report a local log level of ``debug``
    and a remote log level of ``error``, and ``logging.getLogger`` as seen
    by ``libmuscle.instance`` is replaced by a lookup of mock loggers. The
    shutdown is done in a ``finally`` clause, so that the instance is
    cleaned up even if one of the assertions fails.
    """
    def get_setting(instance, name, typ, default=None):
        # Raises KeyError for any setting other than the two log levels.
        return {
            'muscle_local_log_level': 'debug',
            'muscle_remote_log_level': 'error'}[str(name)]

    settings_manager.get_setting = get_setting

    root_logger = MagicMock()
    root_logger.isEnabledFor.return_value = False
    libmuscle_logger = MagicMock()
    ymmsl_logger = MagicMock()

    def get_logger(name=''):
        # Raises KeyError for any logger name other than these three.
        return {
            '': root_logger,
            'libmuscle': libmuscle_logger,
            'ymmsl': ymmsl_logger}[name]

    with patch('libmuscle.instance.logging.getLogger', get_logger):
        instance = Instance(declared_ports)
        try:
            assert instance._mmp_handler.level == logging.ERROR
            root_logger.setLevel.assert_called_with(logging.ERROR)
            libmuscle_logger.setLevel.assert_called_with(logging.DEBUG)
            ymmsl_logger.setLevel.assert_called_with(logging.DEBUG)
        finally:
            # Shut down while getLogger is still patched.
            instance.error_shutdown(
                "Ensure all threads and resources are cleaned up")
[docs]
def test_shutdown_instance(
        logger, instance, mmp_client, communicator, profiler):
    """Check what ``error_shutdown()`` logs, closes and records.

    The message must be logged as critical, the communicator shut down, the
    instance deregistered and the client closed, exactly one more profile
    event recorded, and the profiler shut down.
    """
    events_before = len(profiler.record_event.mock_calls)
    message = 'Testing'

    instance.error_shutdown(message)

    logger.critical.assert_called_with(message)
    communicator.shutdown.assert_called()
    mmp_client.deregister_instance.assert_called_once_with()
    mmp_client.close.assert_called_once_with()

    events_after = len(profiler.record_event.mock_calls)
    assert events_after == events_before + 1
    profiler.shutdown.assert_called_once_with()
[docs]
def test_list_settings(instance, settings_manager):
    """``list_settings()`` asks the settings manager using the instance name."""
    instance.list_settings()

    own_name = Ref('component')
    settings_manager.list_settings.assert_called_with(own_name)
[docs]
def test_get_setting(instance, settings_manager):
    """``get_setting()`` passes instance name, setting name and type along."""
    lookup = settings_manager.get_setting
    lookup.side_effect = None   # don't raise KeyError

    instance.get_setting('test', 'int')

    lookup.assert_called_with(Ref('component'), Ref('test'), 'int')
[docs]
def test_get_setting_with_default(instance, settings_manager):
    """``get_setting()`` with only a default looks up with a type of None."""
    lookup = settings_manager.get_setting
    lookup.side_effect = None   # don't raise KeyError
    lookup.return_value = 'default_value'

    value = instance.get_setting('test', default='default_value')

    lookup.assert_called_with(Ref('component'), Ref('test'), None)
    assert value == 'default_value'
[docs]
def test_get_setting_with_default_and_type(instance, settings_manager):
    """``get_setting()`` returns the stored value when given type and default."""
    lookup = settings_manager.get_setting
    lookup.side_effect = None   # don't raise KeyError
    lookup.return_value = 42

    value = instance.get_setting('test', 'int', default=0)

    lookup.assert_called_with(Ref('component'), Ref('test'), 'int')
    assert value == 42
[docs]
def test_list_ports(instance, port_manager):
    """``list_ports()`` is forwarded to the port manager.

    The port manager has already been asked once by the time the instance
    fixture is available, so that call is checked and cleared first.
    """
    lister = port_manager.list_ports
    lister.assert_called_once_with()
    lister.reset_mock()

    instance.list_ports()

    lister.assert_called_once_with()
[docs]
def test_is_connected(instance):
    """``is_connected()`` reports the expected state for each test port."""
    expected = (
        ('in', True),
        ('not_connected', False),
        ('out_v', True),
        ('out_r', True),
        ('in_v', True),
        ('in_r', True),
        ('not_connected_v', False),
        ('out', True),
    )
    for port, connected in expected:
        assert instance.is_connected(port) is connected
[docs]
def test_is_vector_port(instance):
    """``is_vector_port()`` reports the expected kind for each test port."""
    expected = (
        ('in', False),
        ('not_connected', False),
        ('out_v', True),
        ('out_r', True),
        ('in_v', True),
        ('in_r', True),
        ('not_connected_v', True),
        ('out', False),
    )
    for port, is_vector in expected:
        assert instance.is_vector_port(port) is is_vector
[docs]
def test_is_resizable(instance):
    """``is_resizable()`` is True only for the resizable test ports."""
    fixed_ports = ['in', 'not_connected', 'out_v', 'in_v', 'out']
    resizable_ports = ['out_r', 'in_r', 'not_connected_v']

    for name in fixed_ports:
        assert instance.is_resizable(name) is False

    for name in resizable_ports:
        assert instance.is_resizable(name) is True
[docs]
def test_get_port_length(instance):
    """``get_port_length()`` returns the expected length for vector ports."""
    expected_lengths = (
        ('out_v', 13),
        ('out_r', 0),
        ('in_v', 13),
        ('in_r', 0),
    )
    for port, length in expected_lengths:
        assert instance.get_port_length(port) == length
[docs]
def test_set_port_length(instance, port_manager):
    """``set_port_length()`` changes the length of the port in the manager."""
    port_name = 'not_connected_v'
    instance.set_port_length(port_name, 7)

    resized_port = port_manager.get_port(port_name)
    assert resized_port.get_length() == 7
[docs]
def test_reuse_set_overlay(
        instance, port_manager, mock_ports, communicator, settings_manager):
    """``reuse_instance()`` builds the overlay from the settings message.

    With ``muscle_settings_in`` connected and the ``in`` port disconnected,
    the overlay must contain both the settings attached to the received
    message and the settings carried as its data.
    """
    port_manager.settings_in_connected.return_value = True
    mock_ports['in']._is_connected = False

    settings_msg = MagicMock()
    settings_msg.data = Settings({'s1': 1, 's2': 2})
    settings_msg.settings = Settings({'s0': 0})
    communicator.receive_message.return_value = (settings_msg, 0.0)

    instance.reuse_instance()

    communicator.receive_message.assert_called_with('muscle_settings_in')
    overlay = settings_manager.overlay
    assert overlay['s0'] == 0
    assert overlay['s1'] == 1
    assert overlay['s2'] == 2
[docs]
@pytest.mark.parametrize('closed_port', ['muscle_settings_in', 'in'])
def test_reuse_closed_port(instance, port_manager, communicator, closed_port):
    """``reuse_instance()`` returns False if ``closed_port`` receives ClosePort."""
    def receive_message(port, slot=None, default=None):
        # ClosePort on the port under test, empty Settings on any other
        reply = MagicMock()
        reply.data = ClosePort() if port == closed_port else Settings()
        return reply, 0.0

    port_manager.settings_in_connected.return_value = True
    communicator.receive_message = receive_message

    do_reuse = instance.reuse_instance()
    assert do_reuse is False
[docs]
def test_reuse_f_init_vector_port(instance, port_manager, communicator):
    """``reuse_instance()`` returns True when ``in`` has its length set to 10."""
    in_port = port_manager.get_port('in')
    in_port._length = 10

    def receive_message(port, slot=None, default=None):
        # Every receive yields a message carrying empty Settings
        reply = MagicMock()
        reply.data = Settings()
        return reply, 0.0

    communicator.receive_message = receive_message

    do_reuse = instance.reuse_instance()
    assert do_reuse is True
[docs]
def test_reuse_no_f_init_ports(instance, connected_port_manager, communicator):
    """With no ports listed, ``reuse_instance()`` is True once and then False."""
    connected_port_manager.list_ports.return_value = {}

    first_result = instance.reuse_instance()
    assert first_result is True

    second_result = instance.reuse_instance()
    assert second_result is False
[docs]
def test_send_message(instance, settings_manager, communicator):
    """``send()`` forwards port, message and slot to the communicator.

    A message without settings must be sent with the current overlay as its
    settings.
    """
    outgoing = MagicMock()
    outgoing.settings = None
    port_name, slot_nr = 'out_v', 3

    instance.send(port_name, outgoing, slot_nr)

    communicator.send_message.assert_called_once()
    positional = communicator.send_message.call_args[0]
    assert positional[0] == port_name
    assert positional[1].settings == settings_manager.overlay
    assert positional[2] == slot_nr
[docs]
def test_send_on_invalid_port(instance):
    """Sending on a port that does not exist raises ``RuntimeError``."""
    outgoing = MagicMock()
    with pytest.raises(RuntimeError):
        instance.send('does_not_exist', outgoing)
[docs]
def test_send_after_resize(instance, message):
    """Sending on slot 13 of ``out_r`` only works after resizing to 20."""
    port_name, slot_nr = 'out_r', 13

    # Before the resize, the send must fail
    with pytest.raises(RuntimeError):
        instance.send(port_name, message, slot_nr)

    instance.set_port_length(port_name, 20)

    # After the resize, the same send must not raise
    instance.send(port_name, message, slot_nr)
[docs]
def test_send_on_receiving_port(instance, message):
    """Sending on the ``in_v`` port raises ``RuntimeError``."""
    port_name, slot_nr = "in_v", 3
    with pytest.raises(RuntimeError):
        instance.send(port_name, message, slot_nr)
[docs]
def test_receive_on_invalid_port(instance):
    """Receiving on a port that does not exist raises ``RuntimeError``."""
    port_name = 'does_not_exist'
    with pytest.raises(RuntimeError):
        instance.receive(port_name)
[docs]
def test_receive_on_sending_port(instance):
    """Receiving on the ``out_v`` port raises ``RuntimeError``."""
    port_name, slot_nr = "out_v", 3
    with pytest.raises(RuntimeError):
        instance.receive(port_name, slot_nr)
[docs]
def test_receive_f_init(instance, port_manager, communicator):
    """After ``reuse_instance()``, ``receive('in')`` gives the received message."""
    incoming = MagicMock()
    incoming.data = Settings()
    communicator.receive_message.return_value = (incoming, 0.0)

    instance.reuse_instance()
    received = instance.receive('in')

    assert received == incoming
[docs]
def test_receive_default_f_init(instance):
    """Receiving on ``not_connected`` returns the given default message."""
    fallback = MagicMock()
    received = instance.receive('not_connected', default=fallback)
    assert received == fallback
[docs]
def test_receive_default(instance):
    """Receiving on a slot of ``not_connected_v`` returns the given default."""
    fallback = MagicMock()
    received = instance.receive('not_connected_v', 42, fallback)
    assert received == fallback
[docs]
def test_receive_no_default(instance):
    """Receiving on an unconnected port without a default raises an error."""
    # Scalar port
    with pytest.raises(RuntimeError):
        instance.receive('not_connected')

    # Vector port, with a slot
    with pytest.raises(RuntimeError):
        instance.receive('not_connected_v', 14)
[docs]
def test_receive_inconsistent_settings(
        instance, settings_manager, port_manager, communicator):
    """``reuse_instance()`` raises if messages carry differing settings.

    The message on ``muscle_settings_in`` has ``s1`` as data and empty
    settings, while messages on every other port carry ``s0`` as settings.
    """
    def receive_message(port, slot=None):
        reply = MagicMock()
        if port != 'muscle_settings_in':
            reply.data = None
            reply.settings = Settings({'s0': 0})
        else:
            reply.data = Settings({'s1': 1})
            reply.settings = Settings()
        return reply, 0.0

    communicator.receive_message.side_effect = receive_message
    port_manager.settings_in_connected.return_value = True

    with pytest.raises(RuntimeError):
        instance.reuse_instance()
[docs]
def test_receive_with_settings(
        instance_dont_apply_overlay, settings_manager, communicator):
    """With ``DONT_APPLY_OVERLAY``, settings stay attached to the message.

    The received message must still carry its settings, and the overlay of
    the settings manager must be empty.
    """
    inst = instance_dont_apply_overlay

    incoming = MagicMock()
    incoming.settings = Settings({'s0': 0, 's1': 1})
    communicator.receive_message.return_value = (incoming, 0.0)

    inst.reuse_instance()
    received = inst.receive('in')

    assert received.settings['s0'] == 0
    assert received.settings['s1'] == 1
    assert len(settings_manager.overlay) == 0
[docs]
def test_receive_with_settings_default(
        instance_dont_apply_overlay, settings_manager, port_manager,
        communicator):
    """With ``DONT_APPLY_OVERLAY``, a default message is returned unchanged.

    The ``in`` port is marked as not connected, so the default message is
    expected back with its own settings, and the overlay of the settings
    manager must be empty.
    """
    inst = instance_dont_apply_overlay
    port_manager.get_port('in')._is_connected = False

    inst.reuse_instance()

    fallback = MagicMock()
    fallback.settings = MagicMock()
    received = inst.receive('in', default=fallback)

    assert received == fallback
    assert received.settings == fallback.settings
    assert len(settings_manager.overlay) == 0
[docs]
@pytest.mark.parametrize('flags, expectation', [
    (InstanceFlags(0), pytest.raises(RuntimeError)),
    (InstanceFlags.USES_CHECKPOINT_API, does_not_raise()),
    (InstanceFlags.KEEPS_NO_STATE_FOR_NEXT_USE, does_not_raise()),
    (InstanceFlags.STATE_NOT_REQUIRED_FOR_NEXT_USE, does_not_raise())])
def test_checkpoint_support(
        instance_argv, mmp_client, tmp_path, flags, expectation):
    """Check which instance flags allow creation when checkpoints are defined.

    The client reports checkpoints with ``at_end=True``. Creating an
    Instance without any flags must raise ``RuntimeError``, while each of
    the three listed flags must allow it to be created.
    """
    mmp_client.get_checkpoint_info.return_value = (
        0.0, Checkpoints(at_end=True), None, tmp_path)

    with expectation:
        # The shutdown is only reached if the constructor did not raise.
        inst = Instance(flags=flags)
        inst.error_shutdown("Ensure all threads and resources are cleaned up")