from pathlib import Path
from unittest.mock import MagicMock
from ymmsl import Reference, Settings
from libmuscle.communicator import Message
from libmuscle.snapshot import SnapshotMetadata
from libmuscle.snapshot_manager import SnapshotManager
[docs]def test_no_checkpointing(tmp_path: Path) -> None:
manager = MagicMock()
communicator = MagicMock()
communicator.get_message_counts.return_value = {}
snapshot_manager = SnapshotManager(Reference('test'), manager, communicator)
snapshot_manager.prepare_resume(None, tmp_path)
assert not snapshot_manager.resuming_from_intermediate()
assert not snapshot_manager.resuming_from_final()
[docs]def test_save_load_snapshot(tmp_path: Path) -> None:
manager = MagicMock()
communicator = MagicMock()
port_message_counts = {'in': [1], 'out': [2], 'muscle_settings_in': [0]}
communicator.get_message_counts.return_value = port_message_counts
instance_id = Reference('test[1]')
snapshot_manager = SnapshotManager(instance_id, manager, communicator)
snapshot_manager.prepare_resume(None, tmp_path)
assert not snapshot_manager.resuming_from_intermediate()
assert not snapshot_manager.resuming_from_final()
snapshot_manager.save_snapshot(
Message(0.2, None, 'test data'), False, ['test'], 13.0, None,
Settings())
communicator.get_message_counts.assert_called_with()
manager.submit_snapshot_metadata.assert_called()
metadata, = manager.submit_snapshot_metadata.call_args[0]
assert isinstance(metadata, SnapshotMetadata)
assert metadata.triggers == ['test']
assert metadata.wallclock_time == 13.0
assert metadata.timestamp == 0.2
assert metadata.next_timestamp is None
assert metadata.port_message_counts == port_message_counts
assert not metadata.is_final_snapshot
snapshot_path = Path(metadata.snapshot_filename)
assert snapshot_path.parent == tmp_path
assert snapshot_path.name == 'test-1_1.pack'
snapshot_manager2 = SnapshotManager(instance_id, manager, communicator)
snapshot_manager2.prepare_resume(snapshot_path, tmp_path)
communicator.restore_message_counts.assert_called_with(port_message_counts)
assert snapshot_manager2.resuming_from_intermediate()
assert not snapshot_manager2.resuming_from_final()
msg = snapshot_manager2.load_snapshot()
assert msg.timestamp == 0.2
assert msg.next_timestamp is None
assert msg.data == 'test data'
snapshot_manager2.save_snapshot(
Message(0.6, None, 'test data2'), True, ['test'], 42.2, 1.2,
Settings())
metadata, = manager.submit_snapshot_metadata.call_args[0]
assert isinstance(metadata, SnapshotMetadata)
assert metadata.triggers == ['test']
assert metadata.wallclock_time == 42.2
assert metadata.timestamp == 0.6
assert metadata.next_timestamp is None
assert metadata.port_message_counts == port_message_counts
assert metadata.is_final_snapshot
snapshot_path = Path(metadata.snapshot_filename)
assert snapshot_path.parent == tmp_path
assert snapshot_path.name == 'test-1_3.pack'
[docs]def test_save_load_implicit_snapshot(tmp_path: Path) -> None:
manager = MagicMock()
communicator = MagicMock()
port_message_counts = {'in': [1], 'out': [2], 'muscle_settings_in': [0]}
communicator.get_message_counts.return_value = port_message_counts
instance_id = Reference('test[1]')
snapshot_manager = SnapshotManager(instance_id, manager, communicator)
snapshot_manager.prepare_resume(None, tmp_path)
assert not snapshot_manager.resuming_from_intermediate()
assert not snapshot_manager.resuming_from_final()
# save implicit snapshot
snapshot_manager.save_snapshot(
None, True, ['implicit'], 1.0, 1.5, Settings())
manager.submit_snapshot_metadata.assert_called_once()
metadata, = manager.submit_snapshot_metadata.call_args[0]
assert isinstance(metadata, SnapshotMetadata)
snapshot_path = Path(metadata.snapshot_filename)
manager.submit_snapshot_metadata.reset_mock()
snapshot_manager2 = SnapshotManager(instance_id, manager, communicator)
snapshot_manager2.prepare_resume(snapshot_path, tmp_path)
communicator.restore_message_counts.assert_called_with(port_message_counts)
manager.submit_snapshot_metadata.assert_called_once()
manager.submit_snapshot_metadata.reset_mock()
assert not snapshot_manager2.resuming_from_intermediate()
assert not snapshot_manager2.resuming_from_final()
snapshot_manager2.save_snapshot(
None, True, ['implicit'], 12.3, 2.5, Settings())
manager.submit_snapshot_metadata.assert_called_once()