Source code for libmuscle.mcp.test.conftest

from unittest.mock import MagicMock

import pytest
from ymmsl.v0_2 import Reference

from libmuscle.mcp.tcp_transport_server import TcpTransportServer
from libmuscle.mpp_message import MPPMessage
from libmuscle.outbox import Outbox
from libmuscle.post_office import PostOffice


[docs] @pytest.fixture def receiver(): return Reference('test_receiver.test_port')
[docs] @pytest.fixture def post_office(receiver): class MockPO(PostOffice): outboxes = {receiver: Outbox()} def get_message(self, receiver: Reference) -> MPPMessage: return self.outboxes[receiver].retrieve() return MockPO()
[docs] @pytest.fixture def tcp_transport_server(): server = TcpTransportServer(MagicMock()) yield server server.close()