Source code for libmuscle.test.conftest

from copy import copy
import pytest
from typing import Dict, List, Set, Union
from unittest.mock import patch

from ymmsl.v0_2 import Operator, Reference, Settings

from libmuscle.api_guard import APIGuard
from libmuscle.communicator import Message
from libmuscle.mcp.transport_client import ProfileData
from libmuscle.mmp_client import MMPClient
from libmuscle.planner.resources import Core, CoreSet, OnNodeResources, Resources
from libmuscle.port import Port
from libmuscle.profiler import Profiler
from libmuscle.timestamp import Timestamp


[docs] @pytest.fixture def mocked_mmp_client(): with patch('libmuscle.mmp_client.TcpTransportClient') as mock_ttc: yield MMPClient(Reference('component[13]'), ''), mock_ttc.return_value
[docs] @pytest.fixture def message() -> Message: return Message(0.0, None, b'test', Settings())
[docs] @pytest.fixture def message2() -> Message: return Message(0.0, None, {'test': 17}, Settings())
[docs] @pytest.fixture def guard() -> APIGuard: return APIGuard(True)
[docs] @pytest.fixture def profile_data() -> ProfileData: return Timestamp(0.0), Timestamp(0.0), Timestamp(0.0)
[docs] @pytest.fixture def mocked_profiler(): class MockMMPClient: def __init__(self): self.sent_events = None def submit_profile_events(self, events): self.sent_events = copy(events) mock_mmp_client = MockMMPClient() profiler = Profiler(mock_mmp_client) yield profiler, mock_mmp_client profiler.shutdown()
[docs] @pytest.fixture def profiler_comm_int_10ms(): with patch('libmuscle.profiler._COMMUNICATION_INTERVAL', 0.01): yield None
[docs] @pytest.fixture def declared_ports(): return { Operator.F_INIT: ['in', 'not_connected'], Operator.O_I: ['out_v', 'out_r'], Operator.S: ['in_v', 'in_r', 'not_connected_v'], Operator.O_F: ['out']}
[docs] @pytest.fixture def mock_ports(): in_port = Port('in', Operator.F_INIT, False, True, 0, []) nc_port = Port('not_connected', Operator.F_INIT, False, False, 0, []) outv_port = Port('out_v', Operator.O_I, True, True, 0, [13]) outr_port = Port('out_r', Operator.O_I, True, True, 0, []) inv_port = Port('in_v', Operator.S, True, True, 0, [13]) inr_port = Port('in_r', Operator.S, True, True, 0, []) ncv_port = Port('not_connected_v', Operator.S, True, False, 0, []) out_port = Port('out', Operator.O_F, False, True, 0, []) return { 'in': in_port, 'not_connected': nc_port, 'out_v': outv_port, 'out_r': outr_port, 'in_v': inv_port, 'in_r': inr_port, 'not_connected_v': ncv_port, 'out': out_port}
[docs] @pytest.fixture def connected_port_manager(port_manager, declared_ports, mock_ports): def get_port(name): return mock_ports[name] def port_exists(name): return name in mock_ports port_manager.get_port = get_port port_manager.list_ports.return_value = declared_ports port_manager.port_exists = port_exists return port_manager
[docs] def core(hwthread_id: int) -> Core: """Helper that defines a core with the given core and hwthread id.""" return Core(hwthread_id, {hwthread_id})
[docs] def on_node_resources(node_name: str, cores: Union[int, Set[int]]) -> OnNodeResources: """Helper that defines resources on a node from the name and a CPU core.""" if isinstance(cores, int): cores = {cores} return OnNodeResources(node_name, CoreSet([Core(core, {core}) for core in cores]))
[docs] def resources(node_resources: Dict[str, List[Core]]) -> Resources: """Helper that defines a Resources from a dict.""" return Resources([ OnNodeResources(node_name, CoreSet(cores)) for node_name, cores in node_resources.items()])