from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from ymmsl.v0_2 import (
Configuration, Model, Component, Conduit, KeepsStateForNextUse, Ports, Program,
Reference)
from libmuscle.manager.snapshot_registry import (
SnapshotNode, SnapshotRegistry, calc_consistency, calc_consistency_list,
safe_get, _ConnectionInfo)
from libmuscle.manager.topology_store import TopologyStore
from libmuscle.snapshot import SnapshotMetadata
[docs]
def make_snapshot(**msg_counts) -> SnapshotMetadata:
    """Build a minimal SnapshotMetadata that only carries message counts.

    All positional fields other than the message counts are fixed
    placeholders (empty trigger list, zeros, False, empty snapshot
    filename), so that tests can focus on the consistency bookkeeping.

    Args:
        msg_counts: Port name mapped to a list of message counts for that
            port, given as keyword arguments (e.g. ``o_i=[3]``).

    Returns:
        A new SnapshotMetadata instance.
    """
    port_message_counts = dict(msg_counts)
    return SnapshotMetadata(
        [], 0, 0, 0, port_message_counts, False, '')
[docs]
@pytest.fixture(params=[True, False])
def micro_is_stateless(request: pytest.FixtureRequest) -> bool:
    """Run dependent tests once with a stateless and once with a stateful micro.

    Returns:
        True for the first parametrization, False for the second.
    """
    stateless: bool = request.param
    return stateless
[docs]
@pytest.fixture
def macro_micro(micro_is_stateless: bool) -> Configuration:
    """Configuration with a macro model coupled to a micro model.

    macro.o_i is connected to micro.f_i, and micro.o_f back to macro.s.
    When ``micro_is_stateless`` is set, the micro program is declared with
    ``KeepsStateForNextUse.NO``; otherwise the Program default is used.
    """
    macro = Component('macro', Ports(), '', 'macro_impl')
    micro = Component('micro', Ports(), '', 'micro_impl')
    conduits = [
        Conduit('macro.o_i', 'micro.f_i'),
        Conduit('micro.o_f', 'macro.s')]
    model = Model('macro_micro', None, '', None, [macro, micro], conduits)

    # Only pass keeps_state_for_next_use in the stateless case, so that the
    # stateful case uses whatever default Program has.
    micro_extra_args = (
        {'keeps_state_for_next_use': KeepsStateForNextUse.NO}
        if micro_is_stateless else {})
    programs = [
        Program('macro_impl', executable='pass'),
        Program('micro_impl', executable='pass', **micro_extra_args)]
    return Configuration('macro_micro', [], [model], programs=programs)
[docs]
@pytest.fixture
def uq(macro_micro: Configuration) -> Configuration:
    """Extend the macro-micro configuration into a five-fold ensemble.

    The existing macro and micro components get multiplicity [5]. A 'qmc'
    component and an 'rr' component are added. rr is connected to qmc on
    its front ports and to macro on its back ports. Programs for both new
    components are registered as well.
    """
    model = macro_micro.models['macro_micro']

    # Done before adding qmc and rr, so only macro and micro are affected
    for existing_component in model.components.values():
        existing_component.multiplicity = [5]

    added_components = {
        'qmc': Component(
            'qmc', Ports(o_i='parameters_out', s='states_in'), '',
            'qmc_impl'),
        'rr': Component(
            'rr',
            Ports(f_init='front_in', o_i='back_out', s='back_in',
                  o_f='front_out'),
            '', 'rr_impl'),
    }
    for name, component in added_components.items():
        model.components[Reference(name)] = component

    model.conduits.extend([
        Conduit('qmc.parameters_out', 'rr.front_in'),
        Conduit('rr.front_out', 'qmc.states_in'),
        Conduit('rr.back_out', 'macro.muscle_settings_in'),
        Conduit('macro.final_state_out', 'rr.back_in')])

    for impl_name in ('qmc_impl', 'rr_impl'):
        macro_micro.programs[Reference(impl_name)] = Program(
            impl_name, executable='pass')
    return macro_micro
[docs]
def test_safe_get() -> None:
    """safe_get returns the element at an index, or the default if absent."""
    assert safe_get([], 0, 1) == 1
    assert safe_get([3], 0, 1) == 3
    assert safe_get([3], 1, 5) == 5

    reference_values = [3, 4, 5]
    for index in range(10):
        if index < len(reference_values):
            expected = reference_values[index]
        else:
            expected = -1
        assert safe_get([3, 4, 5], index, -1) == expected
[docs]
def test_calc_consistency() -> None:
    """Check calc_consistency on single counts, in both argument orders.

    In the cases below, a pair is consistent exactly when the received count
    minus the sent count is 0 or 1.
    """
    num_sent = 3
    cases = [(num_sent, received, received in (3, 4))
             for received in (2, 3, 4, 5)]
    num_received = 10
    cases += [(sent, num_received, sent in (9, 10))
              for sent in (8, 9, 10, 11)]

    for sent, received, expected in cases:
        assert calc_consistency(sent, received, True, False) is expected
        assert calc_consistency(received, sent, False, False) is expected
[docs]
def test_calc_consistency_with_restart() -> None:
    """Check calc_consistency when its fourth argument (restart) is set."""
    # Check normal rules
    normal_cases = [
        ((0, 0, True, True), True),
        ((0, 0, False, True), True),
        ((1, 0, True, True), False),
        ((1, 0, True, False), False),
        ((1, 0, False, False), True),
    ]
    for arguments, expected in normal_cases:
        assert bool(calc_consistency(*arguments)) is expected

    # Different: num2 == 0 comes from the restarted actor, we do not want a
    # resume file to be created in this instance (because an instance further
    # in the call chain is ahead of the one that would be restarted):
    assert not calc_consistency(1, 0, False, True)
[docs]
def test_calc_consistency_list() -> None:
    """Check calc_consistency_list on per-slot message count lists.

    The expectations below correspond to padding the shorter list with
    zeros and applying the single-count rule to every slot.
    """
    num_sent = [3, 3]
    inconsistent = [[2, 3], [3, 2], [3, 5], [], [4, 4, 0, 0, 2]]
    consistent = [
        [3, 3], [3, 4], [4, 3], [4, 4], [3, 3, 1], [4, 4, 0, 0, 0, 1, 0, 1]]

    cases = [(counts, False) for counts in inconsistent]
    cases.extend((counts, True) for counts in consistent)
    for num_received, expected in cases:
        assert bool(calc_consistency_list(
            num_sent, num_received, True, False)) is expected
        assert bool(calc_consistency_list(
            num_received, num_sent, False, False)) is expected
[docs]
def test_write_ymmsl(tmp_path: Path) -> None:
    """_write_snapshot_ymmsl writes one file and never overwrites others.

    A first write into an empty directory must create exactly one .ymmsl
    file. A second write, while files named after the current second and
    the next two seconds already exist, must add a new file with a ``_1``
    suffix rather than overwrite an existing one.
    """
    configuration = Configuration('', [], [Model('empty', None, '')])
    snapshot_registry = SnapshotRegistry(
        configuration, tmp_path, TopologyStore(configuration))

    snapshot_registry._write_snapshot_ymmsl([])
    paths = list(tmp_path.iterdir())
    assert len(paths) == 1
    assert paths[0].suffix == ".ymmsl"
    paths[0].unlink()

    # Occupy the names for this second and the next two. Presumably the
    # extra seconds keep the collision in place even if the clock ticks
    # over before the write below happens.
    now = datetime.now()
    for seconds in range(3):
        stamp_time = now + timedelta(seconds=seconds)
        timestamp = stamp_time.strftime("%Y%m%d_%H%M%S")
        (tmp_path / f'snapshot_{timestamp}.ymmsl').touch()

    snapshot_registry._write_snapshot_ymmsl([])
    paths = list(tmp_path.iterdir())
    assert len(paths) == 4
    # The colliding name got a _1 suffix instead of replacing the file
    paths = list(tmp_path.glob('*_1.ymmsl'))
    assert len(paths) == 1
[docs]
def test_snapshot_config() -> None:
    """Check the configuration produced by _generate_snapshot_config().

    Every snapshot node must show up in ``resume``, mapped to the snapshot
    filename from its metadata. The human-readable description is printed
    but not checked automatically.
    """
    configuration = Configuration('', [], [Model('empty', None, '')])
    snapshot_registry = SnapshotRegistry(
        configuration, None, TopologyStore(configuration))

    micro_metadata = SnapshotMetadata(
        ['simulation_time >= 24.0', 'wallclocktime >= 10'],
        10.123456789, 24.3456789, None, {}, False, 'micro_snapshot')
    macro_metadata = SnapshotMetadata(
        ['simulation_time >= 12.0', 'wallclocktime >= 10'],
        10.123456789, 12.3456789, None, {}, False, 'macro_snapshot')
    snapshots = [
        SnapshotNode(1, Reference('micro'), micro_metadata, set()),
        SnapshotNode(1, Reference('macro'), macro_metadata, set())]

    now = datetime.now()
    config = snapshot_registry._generate_snapshot_config(snapshots, now)
    assert len(config.resume) == 2
    assert config.resume[Reference('macro')] == Path('macro_snapshot')
    assert config.resume[Reference('micro')] == Path('micro_snapshot')
    # Note: the description formatting is not tested automatically; verify
    # by eye that the printed output looks okay.
    print(config.description)

    # Extreme numbers plus a long reference and path, to exercise the
    # description formatting
    long_metadata = SnapshotMetadata(
        ['simulation_time >= 24.0'], 1.23456789e-10, 1.23456789e10, None,
        {}, False, '/this/is/a/long/path/to/the/snapshot/file.pack')
    snapshots.append(SnapshotNode(
        1, Reference('this.is.a.long.reference[10]'), long_metadata, set()))
    config = snapshot_registry._generate_snapshot_config(snapshots, now)
    assert len(config.resume) == 3
    assert config.resume[Reference('this.is.a.long.reference[10]')] == Path(
        '/this/is/a/long/path/to/the/snapshot/file.pack')
    print(config.description)
[docs]
def test_peers(uq: Configuration) -> None:
    """Check instance enumeration and peer lookup for the UQ configuration."""
    snapshot_registry = SnapshotRegistry(uq, None, TopologyStore(uq))
    macro, micro, qmc, rr = map(Reference, ('macro', 'micro', 'qmc', 'rr'))
    macros = [macro + i for i in range(5)]
    micros = [micro + i for i in range(5)]

    assert snapshot_registry._instances == {qmc, rr, *macros, *micros}
    assert snapshot_registry._get_peers(qmc) == {rr}
    assert snapshot_registry._get_peers(rr) == {qmc, *macros}
    for macro_instance, micro_instance in zip(macros, micros):
        assert snapshot_registry._get_peers(macro_instance) == {
            rr, micro_instance}
        assert snapshot_registry._get_peers(micro_instance) == {
            macro_instance}
[docs]
def _assert_connection_pair(
        connections: list, sending: tuple, receiving: tuple,
        self_is_vector: bool, peer_is_vector: bool) -> None:
    """Check the two connections returned by _get_connections().

    Asserts that there are exactly two connections. One must be sending and
    one receiving, and each must link the expected own port to the expected
    peer port. The vector flags must match on both.

    Args:
        connections: (own port, peer port, info) tuples as returned by
            SnapshotRegistry._get_connections().
        sending: (own port name, peer port name) of the connection on which
            the instance sends.
        receiving: (own port name, peer port name) of the connection on which
            the instance receives.
        self_is_vector: Expected state of the SELF_IS_VECTOR flag.
        peer_is_vector: Expected state of the PEER_IS_VECTOR flag.
    """
    assert len(connections) == 2
    directions = []
    for self_port, peer_port, info in connections:
        is_sending = bool(info & _ConnectionInfo.SELF_IS_SENDING)
        self_name, peer_name = sending if is_sending else receiving
        # Ports must be paired as in the conduit, not just be valid names
        assert self_port == Reference(self_name)
        assert peer_port == Reference(peer_name)
        assert bool(info & _ConnectionInfo.SELF_IS_VECTOR) is self_is_vector
        assert bool(info & _ConnectionInfo.PEER_IS_VECTOR) is peer_is_vector
        directions.append(is_sending)
    # One sending and one receiving connection, not the same one twice
    assert sorted(directions) == [False, True]


def test_connections(uq: Configuration) -> None:
    """Check the port connections between instances of the UQ configuration."""
    snapshot_registry = SnapshotRegistry(uq, None, TopologyStore(uq))
    macro = Reference('macro')
    micro = Reference('micro')
    qmc = Reference('qmc')
    rr = Reference('rr')

    # No conduits between these pairs
    assert not snapshot_registry._get_connections(qmc, macro + 1)
    assert not snapshot_registry._get_connections(macro + 3, qmc)
    assert not snapshot_registry._get_connections(qmc, micro + 0)
    assert not snapshot_registry._get_connections(micro + 1, qmc)
    assert not snapshot_registry._get_connections(rr, micro + 4)
    assert not snapshot_registry._get_connections(micro + 0, rr)

    # Note: actually both are vector ports, but this is undetectable from
    # the ymmsl configuration. Luckily we treat it the same as scalar-scalar
    _assert_connection_pair(
        snapshot_registry._get_connections(rr, qmc),
        sending=('front_out', 'states_in'),
        receiving=('front_in', 'parameters_out'),
        self_is_vector=False, peer_is_vector=False)

    # Seen from a macro instance, the rr side is a vector port
    _assert_connection_pair(
        snapshot_registry._get_connections(macro + 0, rr),
        sending=('final_state_out', 'back_in'),
        receiving=('muscle_settings_in', 'back_out'),
        self_is_vector=False, peer_is_vector=True)

    # Seen from rr, its own side of the macro connection is the vector port
    _assert_connection_pair(
        snapshot_registry._get_connections(rr, macro + 1),
        sending=('back_out', 'muscle_settings_in'),
        receiving=('back_in', 'final_state_out'),
        self_is_vector=True, peer_is_vector=False)
[docs]
def test_implementation(uq: Configuration) -> None:
    """_implementation finds a component's program, or None if unknown."""
    snapshot_registry = SnapshotRegistry(uq, None, TopologyStore(uq))
    lookup = snapshot_registry._implementation
    assert lookup(Reference('qmc')).name == 'qmc_impl'
    assert lookup(Reference('missing')) is None
[docs]
def test_macro_micro_snapshots(macro_micro: Configuration) -> None:
    """Check consistency detection and cleanup for a macro-micro coupling.

    Snapshots are fed to SnapshotRegistry._add_snapshot() one at a time. The
    test checks which stored snapshot nodes are marked consistent, which are
    cleaned up, and when a snapshot ymmsl would be written (the writer is
    replaced by a MagicMock). Through the micro_is_stateless fixture this
    runs with both a stateless and a stateful micro program.
    """
    snapshot_registry = SnapshotRegistry(
        macro_micro, None, TopologyStore(macro_micro))
    # prevent actually writing a ymmsl file, testing that separately
    snapshot_registry._write_snapshot_ymmsl = MagicMock()

    macro = Reference('macro')
    micro = Reference('micro')

    # A lone macro snapshot: stored, not yet consistent, nothing written
    macro_snapshot = make_snapshot(o_i=[3], s=[3])
    snapshot_registry._add_snapshot(macro, macro_snapshot)
    assert len(snapshot_registry._snapshots[macro]) == 1
    node = snapshot_registry._snapshots[macro][0]
    assert node.consistent is False
    assert node.consistent_peers == {}
    assert node.instance == macro
    assert node.num == 1
    assert node.snapshot is macro_snapshot
    assert node.peers == {micro}
    snapshot_registry._write_snapshot_ymmsl.assert_not_called()

    # Note: this snapshot is not realistic, it should have come in before
    # the macro snapshot above. However, it's still useful for testing the
    # consistency algorithm
    micro_snapshot = make_snapshot(f_i=[2], o_f=[1])
    snapshot_registry._add_snapshot(micro, micro_snapshot)
    assert len(snapshot_registry._snapshots[micro]) == 1
    assert snapshot_registry._snapshots[micro][0].consistent is False
    snapshot_registry._write_snapshot_ymmsl.assert_not_called()

    # This micro snapshot matches the macro one, so a ymmsl is written
    micro_snapshot = make_snapshot(f_i=[3], o_f=[2])
    snapshot_registry._add_snapshot(micro, micro_snapshot)
    # The first micro snapshot should be cleaned up now
    assert len(snapshot_registry._snapshots[micro]) == 1
    micro_node = snapshot_registry._snapshots[micro][0]
    assert micro_node.consistent
    snapshot_registry._write_snapshot_ymmsl.assert_called_once_with(
        [micro_node, node])
    snapshot_registry._write_snapshot_ymmsl.reset_mock()

    # 3 micro snapshots in the same reuse:
    for _ in range(3):
        micro_snapshot = make_snapshot(f_i=[4], o_f=[3])
        snapshot_registry._add_snapshot(micro, micro_snapshot)
        # Previous micro snapshot should be cleaned up now
        assert len(snapshot_registry._snapshots[micro]) == 1
        micro_node = snapshot_registry._snapshots[micro][-1]
    # One ymmsl write per micro snapshot, the last one with the newest node
    assert snapshot_registry._write_snapshot_ymmsl.call_count == 3
    snapshot_registry._write_snapshot_ymmsl.assert_called_with(
        [micro_node, node])
    snapshot_registry._write_snapshot_ymmsl.reset_mock()

    macro_snapshot = make_snapshot(o_i=[4], s=[4])
    snapshot_registry._add_snapshot(macro, macro_snapshot)
    snapshot_registry._write_snapshot_ymmsl.assert_called_once()
    snapshot_registry._write_snapshot_ymmsl.reset_mock()

    # 3 micro snapshots in the same reuse, but inconsistent with previous macro
    for _ in range(3):
        micro_snapshot = make_snapshot(f_i=[6], o_f=[5])
        snapshot_registry._add_snapshot(micro, micro_snapshot)
    # All three should be present now in addition to the one last used in
    # workflow snapshot
    assert len(snapshot_registry._snapshots[micro]) == 4
    snapshot_registry._write_snapshot_ymmsl.assert_not_called()

    # A matching macro snapshot yields three writes, presumably one per
    # waiting micro snapshot, after which only one node per instance remains
    macro_snapshot = make_snapshot(o_i=[6], s=[6])
    snapshot_registry._add_snapshot(macro, macro_snapshot)
    assert snapshot_registry._write_snapshot_ymmsl.call_count == 3
    assert len(snapshot_registry._snapshots[micro]) == 1
    assert len(snapshot_registry._snapshots[macro]) == 1
assert len(snapshot_registry._snapshots[macro]) == 1
[docs]
def test_uq(uq: Configuration) -> None:
    """Check consistency detection across the UQ ensemble.

    The ensemble consists of qmc, rr, macro[0-4] and micro[0-4]. Snapshots
    are added component by component. The test checks which peers each new
    node is consistent with, and that a snapshot ymmsl is written only once
    every instance has a matching snapshot (the writer is a MagicMock).
    """
    snapshot_registry = SnapshotRegistry(uq, None, TopologyStore(uq))
    # prevent actually writing a ymmsl file, testing that separately
    snapshot_registry._write_snapshot_ymmsl = MagicMock()

    macro = Reference('macro')
    micro = Reference('micro')
    qmc = Reference('qmc')
    rr = Reference('rr')

    qmc_snapshot = make_snapshot(parameters_out=[], states_in=[])
    snapshot_registry._add_snapshot(qmc, qmc_snapshot)

    # back_out/back_in have 5 entries, presumably one per macro instance
    rr_snapshot = make_snapshot(
        front_in=[1, 1, 1, 1, 1, 0, 0, 0, 0, 0],
        front_out=[0] * 10,
        back_out=[1, 1, 1, 1, 1],
        back_in=[0] * 5)
    snapshot_registry._add_snapshot(rr, rr_snapshot)
    node = snapshot_registry._snapshots[rr][-1]
    assert qmc in node.consistent_peers
    snapshot_registry._write_snapshot_ymmsl.assert_not_called()

    # Each macro instance only becomes consistent with rr: micro has no
    # snapshot yet
    macro_snapshot = make_snapshot(
        muscle_settings_in=[1], final_state_out=[0], o_i=[0], s=[0])
    for i in range(5):
        snapshot_registry._add_snapshot(macro + i, macro_snapshot)
        node = snapshot_registry._snapshots[macro + i][-1]
        assert node.consistent_peers.keys() == {rr}
    snapshot_registry._write_snapshot_ymmsl.assert_not_called()

    # Only the snapshot of the last micro instance completes the set and
    # triggers a write
    micro_snapshot = make_snapshot(f_i=[1], o_f=[0])
    for i in range(5):
        snapshot_registry._add_snapshot(micro + i, micro_snapshot)
        node = snapshot_registry._snapshots[micro + i][-1]
        assert node.consistent_peers.keys() == {macro + i}
        if i == 4:
            snapshot_registry._write_snapshot_ymmsl.assert_called_once()
            snapshot_registry._write_snapshot_ymmsl.reset_mock()
        else:
            snapshot_registry._write_snapshot_ymmsl.assert_not_called()

    qmc_snapshot = make_snapshot(parameters_out=[1, 1, 1, 1, 1], states_in=[])
    snapshot_registry._add_snapshot(qmc, qmc_snapshot)
    node = snapshot_registry._snapshots[qmc][-1]
    assert node.consistent_peers.keys() == {rr}
    snapshot_registry._write_snapshot_ymmsl.assert_called_once()
    snapshot_registry._write_snapshot_ymmsl.reset_mock()
    assert len(snapshot_registry._snapshots[qmc]) == 1  # previous is cleaned up
[docs]
def test_heuristic_rollbacks() -> None:
    """Check snapshot bookkeeping in a linear chain comp0 -> ... -> comp3.

    Each component's o_f port is connected to the next component's f_i port.
    Many snapshots are accumulated for comp0-comp2 without any write. The
    first snapshot of comp3 then triggers a write, and the stored snapshots
    are reduced to the counts asserted at the end.
    """
    components = [
        Component(f'comp{i}', Ports(f_init='f_i', o_f='o_f'), '', f'impl{i}')
        for i in range(4)]
    conduits = [Conduit(f'comp{i}.o_f', f'comp{i+1}.f_i') for i in range(3)]
    model = Model('linear', None, '', None, components, conduits)
    programs = [Program(f'impl{i}', script='xyz') for i in range(4)]
    config = Configuration('', [], [model], programs=programs)
    # Local names match the component names ('comp0' .. 'comp3')
    comp0, comp1, comp2, comp3 = (Reference(f'comp{i}') for i in range(4))

    snapshot_registry = SnapshotRegistry(config, None, TopologyStore(config))
    # prevent actually writing a ymmsl file, testing that separately
    snapshot_registry._write_snapshot_ymmsl = MagicMock()

    for num_sent in range(4):
        snapshot_registry._add_snapshot(comp0, make_snapshot(o_f=[num_sent]))
    assert len(snapshot_registry._snapshots[comp0]) == 4

    for _ in range(10):
        snapshot_registry._add_snapshot(
            comp1, make_snapshot(f_i=[1], o_f=[0]))
        snapshot_registry._add_snapshot(
            comp2, make_snapshot(f_i=[1], o_f=[0]))
    assert len(snapshot_registry._snapshots[comp1]) == 10
    assert len(snapshot_registry._snapshots[comp2]) == 10

    snapshot_registry._add_snapshot(comp1, make_snapshot(f_i=[2], o_f=[1]))
    assert len(snapshot_registry._snapshots[comp1]) == 11
    snapshot_registry._add_snapshot(comp1, make_snapshot(f_i=[3], o_f=[2]))
    assert len(snapshot_registry._snapshots[comp1]) == 12
    # No snapshot ymmsl has been written so far
    snapshot_registry._write_snapshot_ymmsl.assert_not_called()

    # The first snapshot of the last component triggers a write...
    snapshot_registry._add_snapshot(
        comp3, make_snapshot(f_i=[1]))
    snapshot_registry._write_snapshot_ymmsl.assert_called()
    # ...and older snapshots are cleaned up
    assert len(snapshot_registry._snapshots[comp0]) == 2
    assert len(snapshot_registry._snapshots[comp1]) == 2
    assert len(snapshot_registry._snapshots[comp2]) == 1
    assert len(snapshot_registry._snapshots[comp3]) == 1