Source code for libmuscle.manager.test.test_profile_database

from unittest.mock import patch

from libmuscle.manager.profile_database import ProfileDatabase
from libmuscle.manager.profile_store import ProfileStore
from libmuscle.planner.planner import ResourceAssignment
from libmuscle.profiling import (
        ProfileEvent, ProfileEventType, ProfileTimestamp)

from libmuscle.test.conftest import on_node_resources as onr

from ymmsl.v0_2 import Operator, Port, Reference

import pytest

from pathlib import Path


[docs] @pytest.fixture def db_file(tmp_path) -> Path: with patch('libmuscle.manager.profile_store._SYNCHED', True): store = ProfileStore(tmp_path) t1 = ProfileTimestamp() store.store_instances([Reference('instance1'), Reference('instance2')]) resources1 = ResourceAssignment([ onr('node001', {0, 1}), onr('node002', {0, 1})]) resources2 = ResourceAssignment([ onr('node001', {0}), onr('node002', {0, 1, 2})]) store.store_resources({ Reference('instance1'): resources1, Reference('instance2'): resources2}) def t(offset: int) -> ProfileTimestamp: return ProfileTimestamp(t1.nanoseconds + offset) e1 = [ ProfileEvent(ProfileEventType.REGISTER, t(0), t(1000)), ProfileEvent(ProfileEventType.CONNECT, t(1000), t(2000)), ProfileEvent( ProfileEventType.SEND, t(2000), t(2100), Port('out', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent( ProfileEventType.SEND, t(2200), t(2400), Port('out', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent( ProfileEventType.SEND, t(2600), t(2900), Port('out', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)), ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))] store.add_events(Reference('instance1'), e1) e2 = [ ProfileEvent(ProfileEventType.REGISTER, t(0), t(800)), ProfileEvent(ProfileEventType.CONNECT, t(800), t(1600)), ProfileEvent( ProfileEventType.RECEIVE, t(2000), t(2100), Port('in', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent( ProfileEventType.RECEIVE_WAIT, t(2000), t(2090), Port('in', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent( ProfileEventType.RECEIVE, t(2200), t(2400), Port('in', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent( ProfileEventType.RECEIVE_WAIT, t(2200), t(2380), Port('in', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent( ProfileEventType.RECEIVE, t(2600), t(2900), Port('in', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent( ProfileEventType.RECEIVE_WAIT, t(2600), t(2870), Port('in', Operator.O_I), None, None, 1000000, 0.0), ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)), ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))] store.add_events(Reference('instance2'), e2) db_file = store._db_file store.shutdown() return db_file
[docs] def test_instance_stats(db_file): with ProfileDatabase(db_file) as db: instances, run_time, comm_time, wait_time = db.instance_stats() assert set(instances) == {'instance1', 'instance2'} i1 = instances.index('instance1') assert run_time[i1] == pytest.approx(7400.0 * 1e-9) assert comm_time[i1] == pytest.approx(600.0 * 1e-9) assert wait_time[i1] == 0.0 i2 = instances.index('instance2') assert run_time[i2] == pytest.approx(7800.0 * 1e-9) assert comm_time[i2] == pytest.approx(60.0 * 1e-9) assert wait_time[i2] == pytest.approx(540.0 * 1e-9)
[docs] def test_resource_stats(db_file): with ProfileDatabase(db_file) as db: stats = db.resource_stats() assert set(stats.keys()) == set([ 'node001:0', 'node001:1', 'node002:0', 'node002:1', 'node002:2']) assert stats['node001:0'] == { 'instance1': 8000.0 * 1e-9, 'instance2': 7860.0 * 1e-9} assert stats['node001:1'] == { 'instance1': 8000.0 * 1e-9} assert stats['node002:0'] == { 'instance1': 8000.0 * 1e-9, 'instance2': 7860.0 * 1e-9} assert stats['node002:1'] == { 'instance1': 8000.0 * 1e-9, 'instance2': 7860.0 * 1e-9} assert stats['node002:2'] == { 'instance2': 7860.0 * 1e-9}
[docs] def test_time_taken(db_file): with ProfileDatabase(db_file) as db: assert 1000.0 == db.time_taken(etype='REGISTER', instance='instance1') assert 100.0 == db.time_taken(etype='DEREGISTER') assert 11100.0 == db.time_taken( etype='REGISTER', instance='instance1', etype2='DEREGISTER') assert 10000.0 == db.time_taken( etype='REGISTER', instance='instance1', time='stop', etype2='DEREGISTER', time2='start') assert 200.0 == db.time_taken(etype='SEND') assert 200.0 == db.time_taken(etype='DEREGISTER', aggregate='sum') assert 600.0 == db.time_taken(etype='SEND', aggregate='sum')