Source code for libmuscle.test.test_outbox

from copy import copy

import pytest
from ymmsl.v0_2 import Reference

from libmuscle.mpp_message import MPPMessage
from libmuscle.outbox import Outbox


[docs] @pytest.fixture def outbox(): return Outbox()
[docs] @pytest.fixture def message(): Ref = Reference return MPPMessage( Ref('sender.out'), Ref('receiver.in'), None, 0.0, 1.0, b'', 0, 1.0, b'testing')
[docs] def test_create_outbox(): box = Outbox() assert box._Outbox__queue.qsize() == 0
[docs] def test_deposit_message(outbox, message): outbox.deposit(message) assert outbox._Outbox__queue.qsize() == 1 assert outbox._Outbox__queue.get(message)
[docs] def test_retrieve_message(outbox, message): outbox._Outbox__queue.put(message) assert outbox.retrieve() == message
[docs] def test_deposit_retrieve_order(outbox, message): m1 = copy(message) m2 = copy(message) outbox.deposit(m1) outbox.deposit(m2) assert outbox.retrieve() == m1 assert outbox.retrieve() == m2