from random import uniform
from threading import Condition, Lock, Thread
import time
from typing import List
from libmuscle.mmp_client import MMPClient
from libmuscle.profiling import ProfileEvent, ProfileTimestamp
# Time between periodic flushes of queued profiling events to the manager.
# Also the upper bound of the random delay before the first periodic flush.
_COMMUNICATION_INTERVAL = 10.0 # seconds
class Profiler:
    """Collects profiling events and sends them to the manager.

    Events are queued in memory and submitted in batches, either by a
    background daemon thread that wakes up every
    _COMMUNICATION_INTERVAL seconds, or directly from record_event()
    once the queue holds 10000 events.
    """
    def __init__(self, manager: MMPClient) -> None:
        """Create a Profiler.

        This also starts the background communication thread.

        Args:
            manager: The client used to submit data to the manager.
        """
        # Protects all member variables and _flush()
        self._mutex = Lock()
        self._manager = manager
        self._enabled = True
        self._events: List[ProfileEvent] = []

        self._thread = Thread(target=self._communicate, daemon=True)
        # Built on self._mutex, so waiting on it releases the mutex and
        # lets record_event() and shutdown() make progress.
        self._done_cv = Condition(self._mutex)
        self._done = False
        self._next_send = 0.0
        self._thread.start()

    def shutdown(self) -> None:
        """Stop the background thread and send any remaining events.

        Calling this again after a first call has no effect.
        """
        with self._mutex:
            if self._done:
                return
            self._done = True
            self._done_cv.notify_all()

        # Join without holding the mutex: the background thread has to
        # reacquire it to return from its wait and observe self._done.
        self._thread.join()

        # The background thread is gone, but _flush() still requires the
        # mutex, and record_event() may be running on another thread.
        with self._mutex:
            self._flush()

    def set_level(self, level: str) -> None:
        """Set the detail level at which data is collected.

        Args:
            level: Either 'none' or 'all' to disable or enable sending
                events to the manager. Any value other than 'all'
                disables recording.
        """
        with self._mutex:
            self._enabled = level == 'all'

    def record_event(self, event: ProfileEvent) -> None:
        """Record a profiling event.

        This will record the event, and may flush this and previously
        recorded events to the manager. If the time is still running,
        it will be stopped. Other than this the event must be complete
        when it is submitted. Do not use the event object after calling
        this function with it.

        Args:
            event: The event to record.
        """
        # The event is stopped even if recording is currently disabled.
        if event.stop_time is None:
            event.stop_time = ProfileTimestamp()

        # NOTE(review): _enabled is read here without holding the mutex,
        # so a concurrent set_level() can race with this check; at worst
        # one event around the level change is recorded or skipped.
        if self._enabled:
            with self._mutex:
                self._events.append(event)
                # Send early once the queue has grown large, and push
                # the next periodic send back by a full interval.
                if len(self._events) >= 10000:
                    self._flush()
                    self._next_send = (
                            time.monotonic() + _COMMUNICATION_INTERVAL)

    def _communicate(self) -> None:
        """Background thread that communicates with the manager.

        This runs in the background, and periodically sends events to
        the manager. It returns once shutdown() has set self._done.
        """
        # Random offset for the first send; presumably this spreads the
        # load on the manager when many instances start together.
        initial_delay = uniform(0.0, _COMMUNICATION_INTERVAL)
        with self._mutex:
            self._next_send = time.monotonic() + initial_delay
            while not self._done:
                now = time.monotonic()
                # Releases the mutex while waiting; returns True if
                # notified and False if the timeout expired.
                notified = self._done_cv.wait(self._next_send - now)
                if not notified:
                    now = time.monotonic()
                    # record_event() may have flushed and moved
                    # _next_send while we were waiting, so re-check
                    # before sending.
                    if self._next_send <= now:
                        self._flush()
                        self._next_send = now + _COMMUNICATION_INTERVAL

    def _flush(self) -> None:
        """Send events to the manager and empty the queue.

        Does nothing if no events are queued. Make sure to lock
        self._mutex before calling this.
        """
        if self._events:
            self._manager.submit_profile_events(self._events)
            self._events.clear()