import dataclasses
from pathlib import Path
from random import uniform
from threading import get_ident, RLock
from time import perf_counter, sleep
from typing import Any, Dict, Iterable, List, Optional, Tuple
import msgpack
from ymmsl.v0_2 import (
Conduit, Operator, Port, Reference, Settings, Checkpoints,
CheckpointRule, CheckpointRangeRule, CheckpointAtRule, Identifier)
import libmuscle
from libmuscle.mcp.protocol import RequestType, ResponseType
from libmuscle.mcp.tcp_transport_client import TcpTransportClient
from libmuscle.peer_info import PeerInfo
from libmuscle.profiling import ProfileEvent
from libmuscle.logging import LogMessage
from libmuscle.snapshot import SnapshotMetadata
from libmuscle.util import instance_to_kernel, instance_indices
# Seconds a timid (log-message) manager call waits for the connection
# lock before giving up with ConnectionLockedError.
TIMID_WAIT: float = 1.0
# Seconds after which request_peers() stops polling the manager.
PEER_TIMEOUT: int = 600
# Bounds, in seconds, of the random polling interval request_peers()
# switches to once its initial backoff reaches PEER_INTERVAL_MIN.
PEER_INTERVAL_MIN: float = 5.0
PEER_INTERVAL_MAX: float = 10.0
# (elapsed_time, checkpoints, resume, snapshot_dir), as produced by
# decode_checkpoint_info() and MMPClient.get_checkpoint_info().
_CheckpointInfoType = Tuple[float, Checkpoints, Optional[Path], Optional[Path]]
def encode_operator(op: Operator) -> str:
    """Encode an Operator as its name, a value MsgPack can serialise."""
    encoded: str = op.name
    return encoded
def encode_port(port: Port) -> List[str]:
    """Encode a Port as a ``[name, operator]`` list for MsgPack.

    Args:
        port: The port to encode.

    Returns:
        The port name as a string, followed by the encoded operator.
    """
    port_name = str(port.name)
    operator_name = encode_operator(port.operator)
    return [port_name, operator_name]
def encode_profile_event(event: ProfileEvent) -> Any:
    """Flatten a ProfileEvent into a list for MMP serialisation.

    Args:
        event: A profile event; its start and stop time must both be set.

    Returns:
        A list holding, in order: the event type value, the start and stop
        times in nanoseconds, the encoded port (or None if the event has no
        port), the port length, slot, message number, message size and
        message timestamp.

    Raises:
        RuntimeError: If the start or stop time is missing.
    """
    if any(t is None for t in (event.start_time, event.stop_time)):
        raise RuntimeError(
                'Incomplete ProfileEvent sent. This is a bug, please'
                ' report it.')

    if event.port:
        port_data = encode_port(event.port)
    else:
        port_data = None

    start_ns = event.start_time.nanoseconds
    stop_ns = event.stop_time.nanoseconds
    return [
            event.event_type.value,
            start_ns,
            stop_ns,
            port_data,
            event.port_length,
            event.slot,
            event.message_number,
            event.message_size,
            event.message_timestamp]
def decode_checkpoint_rule(rule: Dict[str, Any]) -> CheckpointRule:
    """Build a checkpoint rule from its MsgPack dictionary form.

    A dict whose only key is ``at`` becomes a CheckpointAtRule; a dict with
    exactly the keys ``start``, ``stop`` and ``every`` becomes a
    CheckpointRangeRule.

    Args:
        rule: The decoded rule dictionary.

    Returns:
        The corresponding checkpoint rule object.

    Raises:
        ValueError: If the keys match neither form.
    """
    present_keys = set(rule)
    if present_keys == {'at'}:
        return CheckpointAtRule(**rule)
    if present_keys == {'start', 'stop', 'every'}:
        return CheckpointRangeRule(**rule)
    raise ValueError(f'Cannot convert {rule} to a checkpoint rule.')
def decode_checkpoint_info(
        elapsed_time: float,
        checkpoints_dict: Dict[str, Any],
        resume: Optional[str],
        snapshot_dir: Optional[str]
        ) -> _CheckpointInfoType:
    """Decode checkpoint info from a MsgPack-compatible value.

    Args:
        elapsed_time: current elapsed time according to the manager
        checkpoints_dict: checkpoint definitions from the MsgPack, with
            keys ``at_end``, ``wallclock_time`` and ``simulation_time``
        resume: path to the snapshot we should resume from, if any
        snapshot_dir: path to the directory to store new snapshots in

    Returns:
        elapsed_time: current elapsed time according to the manager
        checkpoints: checkpoint configuration
        resume: path to the snapshot we should resume from, if any
        snapshot_dir: path to the directory to store new snapshots in
    """
    # Same lookup order as before: at_end, then wallclock, then simulation.
    at_end = checkpoints_dict["at_end"]
    wallclock_rules = [
            decode_checkpoint_rule(r)
            for r in checkpoints_dict["wallclock_time"]]
    simulation_rules = [
            decode_checkpoint_rule(r)
            for r in checkpoints_dict["simulation_time"]]
    checkpoints = Checkpoints(
            at_end=at_end,
            wallclock_time=wallclock_rules,
            simulation_time=simulation_rules)

    resume_path = Path(resume) if resume is not None else None
    snapshot_path = Path(snapshot_dir) if snapshot_dir is not None else None
    return elapsed_time, checkpoints, resume_path, snapshot_path
class ConnectionLockedError(RuntimeError):
    """The connection to the manager could not be used for this call.

    MMPClient raises this for timid calls (log messages) when another
    thread has held the connection lock for too long, or when the calling
    thread is itself already in the middle of a manager call.
    """
class MMPClient:
    """The client for the MUSCLE Manager Protocol.

    This class connects to the Manager and communicates with it on
    behalf of the rest of libmuscle.

    It manages the connection, and converts between our native types
    and the MsgPack-encoded messages exchanged with the manager.

    Communication is protected by an internal lock, so this class can
    be called simultaneously from different threads.
    """
    def __init__(self, instance_id: Reference, location: str) -> None:
        """Create an MMPClient

        Args:
            instance_id: Id of the instance this client acts for; it is
                sent along with most requests.
            location: A connection string of the form hostname:port
        """
        self._instance_id = instance_id
        self._transport_client = TcpTransportClient(location)
        self._mutex = RLock()
        # Thread id of the thread currently inside _call_manager(), or 0
        # when the lock is free. Used to detect recursive timid calls.
        self._cur_owner = 0

    def close(self) -> None:
        """Close the connection

        This closes the connection. After this no other member
        functions can be called.
        """
        self._transport_client.close()

    def submit_log_message(self, message: LogMessage) -> None:
        """Send a log message to the manager.

        This particular call is a bit tricky because of its potentially
        recursive nature. It's used by a special logging handler (see
        logging_handler.py) to send high-priority log messages to the
        manager for inclusion in the manager log. The problem is that the
        connection to the manager may fail while doing so, which causes
        more log messages to be generated (dropped connections are rare and
        shouldn't really happen, so we want the user to know about them).
        Of course those then get picked up by the handler, which sends them
        here recursively.

        The Python logging system has an internal mutex, and we've got a
        mutex here as well, and this recursion causes a thread to try to
        lock them alternatingly. If the thread starts with making a request
        to the manager, then it will try to lock the MMPClient mutex first,
        then the logging mutex, but if it starts with sending a log
        message, then it will try to lock the logging mutex first and then
        the MMPClient one. If two threads try to lock two mutexes in a
        different order then you'll get a deadlock.

        To avoid this, this function will not wait for the internal lock
        forever, but time out after a while and raise. Note that the actual
        implementation is in _call_manager().

        Args:
            message: The message to send.

        Raises:
            ConnectionLockedError: if the connection to the manager was
                being used already.
        """
        request = [
                RequestType.SUBMIT_LOG_MESSAGE.value,
                message.instance_id, message.timestamp.seconds,
                message.level.value, message.text]
        self._call_manager(request, True)

    def submit_profile_events(self, events: Iterable[ProfileEvent]) -> None:
        """Sends profiling events to the manager.

        Args:
            events: The events to send.
        """
        request = [
                RequestType.SUBMIT_PROFILE_EVENTS.value,
                str(self._instance_id),
                [encode_profile_event(e) for e in events]]
        self._call_manager(request)

    def get_settings(self) -> Settings:
        """Get the central settings from the manager.

        Returns:
            The requested settings.
        """
        request = [RequestType.GET_SETTINGS.value]
        response = self._call_manager(request)
        return Settings(response[1])

    def get_checkpoint_info(self) -> _CheckpointInfoType:
        """Get the checkpoint info from the manager.

        Returns:
            elapsed_time: current elapsed time
            checkpoints: checkpoint configuration
            resume: path to the resume snapshot
            snapshot_directory: path to store snapshots
        """
        request = [
                RequestType.GET_CHECKPOINT_INFO.value, str(self._instance_id)]
        response = self._call_manager(request)
        return decode_checkpoint_info(*response[1:])

    def register_instance(
            self, locations: List[str], ports: List[Port]) -> None:
        """Register a component instance with the manager.

        Args:
            locations: List of places where the instance can be
                reached.
            ports: List of ports of this instance.

        Raises:
            RuntimeError: If the manager reports an error.
        """
        request = [
                RequestType.REGISTER_INSTANCE.value,
                str(self._instance_id), locations,
                [encode_port(p) for p in ports],
                libmuscle.__version__]
        response = self._call_manager(request)
        if response[0] == ResponseType.ERROR.value:
            raise RuntimeError(
                    f'Error registering instance: {response[1]}')

    def request_peers(self) -> PeerInfo:
        """Request connection information about peers.

        This will repeat the request at an exponentially increasing
        query interval at first, until it reaches the interval
        specified by PEER_INTERVAL_MIN and PEER_INTERVAL_MAX. From
        there on, intervals are drawn randomly from that range. No new
        retry round is started once PEER_TIMEOUT seconds have passed.

        Returns:
            PeerInfo received from the muscle manager.

        Raises:
            RuntimeError: If the peers did not become available before the
                timeout, or if the manager returned an error.
        """
        sleep_time = 0.1
        start_time = perf_counter()
        request = [RequestType.GET_PEERS.value, str(self._instance_id)]
        response = self._call_manager(request)

        # Phase 1: geometric backoff (x1.5) from 0.1s up to
        # PEER_INTERVAL_MIN.
        while (response[0] == ResponseType.PENDING.value and
                perf_counter() < start_time + PEER_TIMEOUT and
                sleep_time < PEER_INTERVAL_MIN):
            sleep(sleep_time)
            response = self._call_manager(request)
            sleep_time *= 1.5

        # Phase 2: poll at intervals drawn uniformly from
        # [PEER_INTERVAL_MIN, PEER_INTERVAL_MAX].
        while (response[0] == ResponseType.PENDING.value and
                perf_counter() < start_time + PEER_TIMEOUT):
            sleep(uniform(PEER_INTERVAL_MIN, PEER_INTERVAL_MAX))
            response = self._call_manager(request)

        if response[0] == ResponseType.PENDING.value:
            raise RuntimeError('Timeout waiting for peers to appear')

        if response[0] == ResponseType.ERROR.value:
            raise RuntimeError(
                    f'Error getting peers from manager: {response[1]}')

        conduits = [Conduit(snd, recv) for snd, recv in response[1]]
        peer_dimensions = {
                Reference(component): dims
                for component, dims in response[2].items()}
        peer_locations = {
                Reference(instance): locs
                for instance, locs in response[3].items()}
        ports = [
                Port(Identifier(name), Operator[op])
                for name, op in response[4]]
        name = instance_to_kernel(self._instance_id)
        index = instance_indices(self._instance_id)
        return PeerInfo(
                name, index, conduits, peer_dimensions, peer_locations, ports)

    def deregister_instance(self) -> None:
        """Deregister a component instance with the manager.

        Raises:
            RuntimeError: If the manager reports an error.
        """
        request = [
                RequestType.DEREGISTER_INSTANCE.value, str(self._instance_id)]
        response = self._call_manager(request)
        if response[0] == ResponseType.ERROR.value:
            raise RuntimeError(
                    f'Error deregistering instance: {response[1]}')

    def waiting_for_receive(
            self, peer_instance_id: Reference, port_name: str,
            slot: Optional[int]) -> None:
        """Notify the manager that we're waiting to receive a message."""
        request = [
                RequestType.WAITING_FOR_RECEIVE.value,
                str(self._instance_id),
                str(peer_instance_id), port_name, slot]
        self._call_manager(request)

    def waiting_for_receive_done(
            self, peer_instance_id: Reference, port_name: str,
            slot: Optional[int]) -> None:
        """Notify the manager that we're done waiting to receive a message."""
        request = [
                RequestType.WAITING_FOR_RECEIVE_DONE.value,
                str(self._instance_id),
                str(peer_instance_id), port_name, slot]
        self._call_manager(request)

    def is_deadlocked(self) -> bool:
        """Ask the manager if we're part of a deadlock."""
        request = [RequestType.IS_DEADLOCKED.value, str(self._instance_id)]
        response = self._call_manager(request)
        return bool(response[1])

    def _call_manager(self, request: Any, timid: bool = False) -> Any:
        """Call the manager and do en/decoding.

        Args:
            request: The request to encode and send
            timid: If True, raise if we can't get the lock after a while

        Returns:
            The decoded response

        Raises:
            ConnectionLockedError: If timid was True and we failed to get
                the lock, or this thread is already inside a manager call
        """
        if timid:
            if self._mutex.acquire(timeout=TIMID_WAIT):
                # We were trying to log, disconnected, then logged the
                # disconnect and now we're back here. Since we're still
                # disconnected, we need to raise and drop this message.
                if self._cur_owner == get_ident():
                    self._mutex.release()
                    raise ConnectionLockedError()
            else:
                # Timed out, someone else has held the lock for a long
                # time, so either the manager is overloaded or there are
                # connection problems. Raise and drop.
                raise ConnectionLockedError()
        else:
            self._mutex.acquire()

        # The mutex is an RLock, so calls may nest on one thread. Restore
        # the previous owner on exit rather than clearing it, so returning
        # from a nested call does not hide the fact that an outer call on
        # this thread still holds the lock.
        prev_owner = self._cur_owner
        self._cur_owner = get_ident()
        try:
            encoded_request = msgpack.packb(request, use_bin_type=True)
            response, _ = self._transport_client.call(encoded_request)
            return msgpack.unpackb(response, raw=False)
        finally:
            self._cur_owner = prev_owner
            self._mutex.release()