from typing import List
from libmuscle.peer_info import PeerInfo
from libmuscle.port_manager import PortManager
from ymmsl.v0_2 import Conduit, Identifier as Id, Operator, Reference as Ref, Port
import pytest
[docs]
@pytest.fixture
def index() -> List[int]:
    """Provide a one-element index, ``[13]``, to tests requesting ``index``."""
    instance_index = [13]
    return instance_index
[docs]
@pytest.fixture
def port_manager(index) -> PortManager:
    """Build a PortManager declaring an O_I port ``out`` and an S port ``in``.

    The ports are only declared here; the fixture does not call
    ``connect_ports``, so tests using it do that themselves.
    """
    ports_by_operator = {Operator.O_I: ['out'], Operator.S: ['in']}
    return PortManager(index, ports_by_operator)
[docs]
@pytest.fixture
def index2() -> List[int]:
    """Provide an empty index to tests requesting ``index2``."""
    empty_index: List[int] = []
    return empty_index
[docs]
@pytest.fixture
def port_manager2(index2) -> PortManager:
    """Build a PortManager with vector ports that is already connected.

    Declares an F_INIT port ``in[]`` and an O_F port ``out[]``, then calls
    ``connect_ports`` with a PeerInfo for instance ``other`` whose peer
    ``component`` is given dimensions ``[20]``.
    """
    manager = PortManager(
        index2,
        {Operator.F_INIT: ['in[]'], Operator.O_F: ['out[]']})

    peers = PeerInfo(
        Ref('other'),
        index2,
        [
            Conduit('component.out', 'other.in'),
            Conduit('other.out', 'component.in'),
        ],
        {Ref('component'): [20]},
        {Ref('component'): ['direct:test']},
        [])
    manager.connect_ports(peers)
    return manager
[docs]
def test_connect_ports(index, port_manager) -> None:
    """Connect scalar declared ports and inspect the resulting port objects.

    Checks the automatically created ``muscle_settings_in`` port as well as
    the declared ``in`` and ``out`` ports; all are expected to have no length.
    """
    peers = PeerInfo(
        Ref('component'),
        index,
        [
            Conduit('component.out', 'other.in'),
            Conduit('other.settings_out', 'component.muscle_settings_in'),
            Conduit('other.out', 'component.in'),
        ],
        {Ref('other'): []},
        {Ref('other'): ['direct:test']},
        [])
    port_manager.connect_ports(peers)

    # The settings port is not declared by the fixture, yet must be present.
    assert port_manager.settings_in_connected()
    settings_port = port_manager._muscle_settings_in
    assert settings_port.name == Id('muscle_settings_in')
    assert settings_port.operator == Operator.F_INIT
    assert settings_port._length is None

    # Declared ports keep their name and operator and are scalar.
    ports = port_manager._ports
    for port_name, expected_operator in (
            ('in', Operator.S), ('out', Operator.O_I)):
        port = ports[port_name]
        assert port.name == Id(port_name)
        assert port.operator == expected_operator
        assert port._length is None
[docs]
def test_connect_vector_ports(index) -> None:
    """Connect a mix of scalar and vector declared ports to three peers.

    After connecting, ``in`` is expected to have a fixed length of 7,
    ``out1`` to be scalar (no length), and ``out2`` to be resizable with an
    initial length of 0.
    """
    manager = PortManager(index, {
        Operator.F_INIT: ['in[]'],
        Operator.O_F: ['out1', 'out2[]']})

    peers = PeerInfo(
        Ref('component'),
        index,
        [
            Conduit('other1.out', 'component.in'),
            Conduit('component.out1', 'other.in'),
            Conduit('component.out2', 'other3.in'),
        ],
        {
            Ref('other1'): [20, 7],
            Ref('other'): [25],
            Ref('other3'): [20],
        },
        {
            Ref('other'): ['direct:test'],
            Ref('other1'): ['direct:test1'],
            Ref('other3'): ['direct:test3'],
        },
        [])
    manager.connect_ports(peers)

    connected = manager._ports

    in_port = connected['in']
    assert in_port.name == Id('in')
    assert in_port.operator == Operator.F_INIT
    assert in_port._length == 7
    assert in_port._is_resizable is False

    out1_port = connected['out1']
    assert out1_port.name == Id('out1')
    assert out1_port.operator == Operator.O_F
    assert out1_port._length is None

    out2_port = connected['out2']
    assert out2_port.name == Id('out2')
    assert out2_port.operator == Operator.O_F
    assert out2_port._length == 0
    assert out2_port._is_resizable is True
[docs]
def test_connect_multidimensional_ports() -> None:
    """Expect ``connect_ports`` to raise ValueError for a port ``in[][]``."""
    instance_index = [13]
    manager = PortManager(instance_index, {Operator.F_INIT: ['in[][]']})

    peers = PeerInfo(
        Ref('component'),
        instance_index,
        [Conduit('other.out', 'component.in')],
        {Ref('other'): [20, 7, 30]},
        {Ref('other'): ['direct:test']},
        [])

    with pytest.raises(ValueError):
        manager.connect_ports(peers)
[docs]
def test_connect_inferred_ports() -> None:
    """Connect ports when the PortManager was given no declared ports.

    The PortManager is created with ``None`` for the declared ports, and the
    ports are described by ymmsl ``Port`` objects passed to PeerInfo instead.
    ``in`` is expected to end up with a fixed length of 7; ``out1`` and
    ``out3`` are expected to be scalar.
    """
    instance_index = [13]
    manager = PortManager(instance_index, None)

    instance_name = Ref('component')
    conduit_list = [
        Conduit('other1.out', 'component.in'),
        Conduit('component.out1', 'other.in'),
        Conduit('component.out3', 'other2.in'),
    ]
    dims_of_peers = {
        Ref('other1'): [20, 7],
        Ref('other'): [25],
        Ref('other2'): [],
    }
    locations_of_peers = {
        Ref('other'): ['direct:test'],
        Ref('other1'): ['direct:test1'],
        Ref('other2'): ['direct:test2'],
    }
    port_descriptions = [
        Port(Id("in"), Operator.F_INIT),
        Port(Id("out1"), Operator.O_F),
        Port(Id("out3"), Operator.O_F),
    ]
    manager.connect_ports(PeerInfo(
        instance_name, instance_index, conduit_list, dims_of_peers,
        locations_of_peers, port_descriptions))

    connected = manager._ports

    in_port = connected['in']
    assert in_port.name == Id('in')
    assert in_port.operator == Operator.F_INIT
    assert in_port._length == 7
    assert in_port._is_resizable is False

    # Both sending ports are expected to be scalar O_F ports.
    for port_name in ('out1', 'out3'):
        out_port = connected[port_name]
        assert out_port.name == Id(port_name)
        assert out_port.operator == Operator.O_F
        assert out_port._length is None
[docs]
def test_connect_inferred_ports_oi_s() -> None:
    """Check that O_I and S ports described via ymmsl ``Port`` are created.

    The PortManager is created with ``None`` for the declared ports, so the
    ports come from the ``Port`` objects handed to PeerInfo. Both resulting
    ports are expected to keep their operator and to be scalar.
    """
    port_manager = PortManager([], None)

    peer_info = PeerInfo(
        # Spelled exactly like the 'component' prefix of the conduit
        # endpoints below, so that the conduits refer to this instance.
        Ref("component"),
        [],
        [
            Conduit("component.oi", "other.init"),
            Conduit("other.final", "component.s"),
        ],
        {Ref("other"): []},
        {Ref("other"): ["direct:test"]},
        [Port(Id("oi"), Operator.O_I), Port(Id("s"), Operator.S)],
    )
    port_manager.connect_ports(peer_info)

    ports = port_manager._ports
    for port_name, expected_operator in (
            ("oi", Operator.O_I), ("s", Operator.S)):
        port = ports[port_name]
        assert port.name == Id(port_name)
        assert port.operator == expected_operator
        assert port._length is None
[docs]
def test_port_message_counts(port_manager) -> None:
    """Track per-port message counts through increments and a restore.

    Connects the ports without any conduits, increments the counters of
    ``in``, ``out`` and the settings port one after another, and checks the
    counts reported after each step. Restoring the initially reported counts
    must reset everything to zero, and restoring counts for an unknown port
    name must raise RuntimeError.
    """
    # Pass the instance name as a Reference, as the other tests in this
    # module do, rather than as a bare string.
    port_manager.connect_ports(PeerInfo(Ref('component'), [], [], {}, {}, []))

    initial_counts = port_manager.get_message_counts()
    assert initial_counts == {
        'in': [0], 'out': [0], 'muscle_settings_in': [0]}

    port_manager.get_port('in').increment_num_messages()
    assert port_manager.get_message_counts() == {
        'in': [1], 'out': [0], 'muscle_settings_in': [0]}

    out_port = port_manager.get_port('out')
    out_port.increment_num_messages()
    out_port.increment_num_messages()
    assert port_manager.get_message_counts() == {
        'in': [1], 'out': [2], 'muscle_settings_in': [0]}

    port_manager._muscle_settings_in.increment_num_messages()
    assert port_manager.get_message_counts() == {
        'in': [1], 'out': [2], 'muscle_settings_in': [1]}

    port_manager.restore_message_counts(initial_counts)
    assert port_manager.get_message_counts() == {
        'in': [0], 'out': [0], 'muscle_settings_in': [0]}

    # Counts are lists everywhere else in this module, so the invalid entry
    # is given a list too; only the unknown port name should be at fault.
    with pytest.raises(RuntimeError):
        port_manager.restore_message_counts({"x?invalid_port": [3]})
[docs]
def test_vector_port_message_counts(port_manager2) -> None:
    """Track per-slot message counts on the connected vector ports.

    The vector ports are expected to report 20 counters each. Incrementing
    slot 13 of ``out`` must change only that slot, both from the initial
    all-zero state and after restoring a distinct count for every slot.
    """
    slot = 13
    untouched_in = [0] * 20

    assert port_manager2.get_message_counts() == {
        'out': [0] * 20,
        'in': untouched_in,
        'muscle_settings_in': [0]}

    port_manager2.get_port('out').increment_num_messages(slot)
    assert port_manager2.get_message_counts() == {
        'out': [0] * 13 + [1] + [0] * 6,
        'in': untouched_in,
        'muscle_settings_in': [0]}

    # Restore slot i to count i, so a misplaced increment would be visible.
    port_manager2.restore_message_counts({
        'out': list(range(20)),
        'in': list(range(20)),
        'muscle_settings_in': [4]})
    port_manager2.get_port('out').increment_num_messages(slot)

    expected_out = list(range(13)) + [14] + list(range(14, 20))
    assert port_manager2.get_message_counts() == {
        'out': expected_out,
        'in': list(range(20)),
        'muscle_settings_in': [4]}