# Source code for libmuscle.mmp_client

import dataclasses
from pathlib import Path
from random import uniform
from threading import get_ident, RLock
from time import perf_counter, sleep
from typing import Any, Dict, Iterable, List, Optional, Tuple

import msgpack
from ymmsl.v0_2 import (
        Conduit, Operator, Port, Reference, Settings, Checkpoints,
        CheckpointRule, CheckpointRangeRule, CheckpointAtRule, Identifier)

import libmuscle
from libmuscle.mcp.protocol import RequestType, ResponseType
from libmuscle.mcp.tcp_transport_client import TcpTransportClient
from libmuscle.peer_info import PeerInfo
from libmuscle.profiling import ProfileEvent
from libmuscle.logging import LogMessage
from libmuscle.snapshot import SnapshotMetadata
from libmuscle.util import instance_to_kernel, instance_indices


# Seconds a "timid" manager call (see MMPClient._call_manager) waits for
# the connection lock before giving up with ConnectionLockedError.
TIMID_WAIT = 1.0

# Peer lookup settings for MMPClient.request_peers(), all in seconds:
# the overall deadline, and the range from which polling intervals are
# drawn once the initial back-off has reached the minimum.
PEER_TIMEOUT = 600
PEER_INTERVAL_MIN, PEER_INTERVAL_MAX = 5.0, 10.0

# Decoded checkpoint information, as returned by decode_checkpoint_info().
_CheckpointInfoType = Tuple[
        float,              # elapsed time according to the manager
        Checkpoints,        # checkpoint configuration
        Optional[Path],     # snapshot to resume from, if any
        Optional[Path]]     # directory to store new snapshots in, if any


def encode_operator(op: Operator) -> str:
    """Encode an Operator as a MsgPack-friendly value.

    Args:
        op: The operator to encode.

    Returns:
        The operator's name.
    """
    operator_name = op.name
    return operator_name
def encode_port(port: Port) -> List[str]:
    """Encode a Port as a MsgPack-friendly value.

    Args:
        port: The port to encode.

    Returns:
        A two-element list: the port's name as a string, followed by the
        encoded operator of the port.
    """
    port_name = str(port.name)
    operator_name = encode_operator(port.operator)
    return [port_name, operator_name]
def encode_profile_event(event: ProfileEvent) -> Any:
    """Serialise a ProfileEvent into a list for MMP.

    Args:
        event: The profile event to encode. Both its start and its stop
            time must be set.

    Returns:
        A list with, in this order: the event type's value, the start and
        stop times in nanoseconds, the encoded port (None if the event has
        no port), the port length, the slot, the message number, the
        message size and the message timestamp.

    Raises:
        RuntimeError: If the start or stop time is missing.
    """
    times_complete = (
            event.start_time is not None and event.stop_time is not None)
    if not times_complete:
        raise RuntimeError(
                'Incomplete ProfileEvent sent. This is a bug, please'
                ' report it.')

    encoded_port = None
    if event.port:
        encoded_port = encode_port(event.port)

    fields = [
            event.event_type.value,
            event.start_time.nanoseconds,
            event.stop_time.nanoseconds,
            encoded_port,
            event.port_length,
            event.slot,
            event.message_number,
            event.message_size,
            event.message_timestamp]
    return fields
def decode_checkpoint_rule(rule: Dict[str, Any]) -> CheckpointRule:
    """Decode a checkpoint rule from a MsgPack-compatible dictionary.

    The rule type is chosen by the exact set of keys present. A lone
    ``at`` key yields a CheckpointAtRule. Exactly ``start``, ``stop`` and
    ``every`` yield a CheckpointRangeRule.

    Args:
        rule: The rule's fields, passed on as keyword arguments.

    Returns:
        The decoded checkpoint rule.

    Raises:
        ValueError: If the keys match neither kind of rule.
    """
    rule_kinds = (
            (frozenset({'at'}), CheckpointAtRule),
            (frozenset({'start', 'stop', 'every'}), CheckpointRangeRule))

    for expected_keys, rule_class in rule_kinds:
        if rule.keys() == expected_keys:
            return rule_class(**rule)

    raise ValueError(f'Cannot convert {rule} to a checkpoint rule.')
def decode_checkpoint_info(
        elapsed_time: float,
        checkpoints_dict: Dict[str, Any],
        resume: Optional[str],
        snapshot_dir: Optional[str]
        ) -> _CheckpointInfoType:
    """Decode checkpoint info received from the manager via MsgPack.

    Args:
        elapsed_time: current elapsed time according to the manager
        checkpoints_dict: checkpoint definitions from the MsgPack, with
            keys "at_end", "wallclock_time" and "simulation_time"
        resume: path to the snapshot we should resume from, if any
        snapshot_dir: path to the directory to store new snapshots in

    Returns:
        A tuple (elapsed_time, checkpoints, resume, snapshot_dir) where
        checkpoints is the decoded checkpoint configuration and the two
        paths are converted to Path objects (or None if not given).
    """
    def decode_rules(key):
        # Decode every rule in the list stored under the given key.
        return [decode_checkpoint_rule(item) for item in checkpoints_dict[key]]

    # Read the entries in the same order as the Checkpoints arguments.
    at_end = checkpoints_dict["at_end"]
    wallclock_rules = decode_rules("wallclock_time")
    simulation_rules = decode_rules("simulation_time")
    checkpoints = Checkpoints(
            at_end=at_end,
            wallclock_time=wallclock_rules,
            simulation_time=simulation_rules)

    resume_path = None
    if resume is not None:
        resume_path = Path(resume)

    snapshot_path = None
    if snapshot_dir is not None:
        snapshot_path = Path(snapshot_dir)

    return elapsed_time, checkpoints, resume_path, snapshot_path
class ConnectionLockedError(RuntimeError):
    """Raised when the connection to the manager cannot be used right now.

    MMPClient raises this from timid calls, such as submitting a log
    message, in two cases. The first is when the calling thread already
    holds the connection lock (a recursive call). The second is when the
    lock could not be acquired within the timid wait time.
    """
class MMPClient():
    """The client for the MUSCLE Manager Protocol.

    This class connects to the Manager and communicates with it on behalf
    of the rest of libmuscle. It manages the connection, and converts
    between our native types and the MsgPack-encoded messages that are
    exchanged with the manager over a TCP transport.

    Communication is protected by an internal lock, so this class can be
    called simultaneously from different threads.
    """
    def __init__(self, instance_id: Reference, location: str) -> None:
        """Create an MMPClient

        Args:
            instance_id: Id of the instance this client acts on behalf of;
                it is included in most requests to the manager.
            location: A connection string of the form hostname:port
        """
        self._instance_id = instance_id
        self._transport_client = TcpTransportClient(location)
        # Reentrant, so that a recursive timid call from the thread that
        # already holds it acquires immediately and can be detected in
        # _call_manager() instead of blocking on itself.
        self._mutex = RLock()
        # Ident of the thread currently talking to the manager, 0 if none.
        self._cur_owner = 0

    def close(self) -> None:
        """Close the connection

        This closes the connection. After this no other member functions
        can be called.
        """
        self._transport_client.close()

    def submit_log_message(self, message: LogMessage) -> None:
        """Send a log message to the manager.

        This particular call is a bit tricky because of its potentially
        recursive nature. It's used by a special logging handler (see
        logging_handler.py) to send high-priority log messages to the
        manager for inclusion in the manager log. The problem is that the
        connection to the manager may fail while doing so, which causes
        more log messages to be generated (dropped connections are rare
        and shouldn't really happen, so we want the user to know about
        them). Of course those then get picked up by the handler, which
        sends them here recursively.

        The Python logging system has an internal mutex, and we've got a
        mutex here as well, and this recursion causes a thread to try to
        lock them alternatingly. If the thread starts with making a
        request to the manager, then it will try to lock the MMPClient
        mutex first, then the logging mutex, but if it starts with sending
        a log message, then it will try to lock the logging mutex first
        and then the MMPClient one. If two threads try to lock two mutexes
        in a different order then you'll get a deadlock.

        To avoid this, this function will not wait for the internal lock
        forever, but time out after a while and raise. Note that the
        actual implementation is in _call_manager().

        Args:
            message: The message to send.

        Raises:
            ConnectionLockedError: if the connection to the manager was
                being used already.
        """
        request = [
                RequestType.SUBMIT_LOG_MESSAGE.value,
                message.instance_id,
                message.timestamp.seconds,
                message.level.value,
                message.text]
        self._call_manager(request, True)

    def submit_profile_events(self, events: Iterable[ProfileEvent]) -> None:
        """Sends profiling events to the manager.

        Args:
            events: The events to send.
        """
        request = [
                RequestType.SUBMIT_PROFILE_EVENTS.value,
                str(self._instance_id),
                [encode_profile_event(e) for e in events]]
        self._call_manager(request)

    def submit_snapshot_metadata(
            self, snapshot_metadata: SnapshotMetadata) -> None:
        """Send snapshot metadata to the manager.

        Args:
            snapshot_metadata: Snapshot metadata to supply to the manager;
                it is sent as a dictionary (dataclasses.asdict).
        """
        request = [
                RequestType.SUBMIT_SNAPSHOT.value,
                str(self._instance_id),
                dataclasses.asdict(snapshot_metadata)]
        self._call_manager(request)

    def get_settings(self) -> Settings:
        """Get the central settings from the manager.

        Returns:
            The requested settings.
        """
        request = [RequestType.GET_SETTINGS.value]
        response = self._call_manager(request)
        return Settings(response[1])

    def get_checkpoint_info(self) -> _CheckpointInfoType:
        """Get the checkpoint info from the manager.

        Returns:
            elapsed_time: current elapsed time
            checkpoints: checkpoint configuration
            resume: path to the resume snapshot
            snapshot_directory: path to store snapshots
        """
        request = [
                RequestType.GET_CHECKPOINT_INFO.value,
                str(self._instance_id)]
        response = self._call_manager(request)
        return decode_checkpoint_info(*response[1:])

    def register_instance(
            self, locations: List[str], ports: List[Port]) -> None:
        """Register a component instance with the manager.

        Args:
            locations: List of places where the instance can be reached.
            ports: List of ports of this instance.

        Raises:
            RuntimeError: If the manager reports an error.
        """
        request = [
                RequestType.REGISTER_INSTANCE.value,
                str(self._instance_id),
                locations,
                [encode_port(p) for p in ports],
                libmuscle.__version__]
        response = self._call_manager(request)
        if response[0] == ResponseType.ERROR.value:
            raise RuntimeError(
                    f'Error registering instance: {response[1]}')

    def request_peers(self) -> PeerInfo:
        """Request connection information about peers.

        This will repeat the request at an exponentially increasing
        query interval at first, until it reaches the interval specified
        by PEER_INTERVAL_MIN and PEER_INTERVAL_MAX. From there on,
        intervals are drawn randomly from that range. The manager is
        polled for at most PEER_TIMEOUT seconds.

        Returns:
            PeerInfo received from the muscle manager.

        Raises:
            RuntimeError: If the peers are still pending after
                PEER_TIMEOUT, or if the manager reports an error.
        """
        sleep_time = 0.1
        deadline = perf_counter() + PEER_TIMEOUT

        request = [RequestType.GET_PEERS.value, str(self._instance_id)]
        response = self._call_manager(request)

        # Exponential back-off phase, until the interval reaches the minimum
        while (response[0] == ResponseType.PENDING.value and
                perf_counter() < deadline and
                sleep_time < PEER_INTERVAL_MIN):
            sleep(sleep_time)
            response = self._call_manager(request)
            sleep_time *= 1.5

        # Randomised polling phase, to avoid synchronised bursts of requests
        while (response[0] == ResponseType.PENDING.value and
                perf_counter() < deadline):
            sleep(uniform(PEER_INTERVAL_MIN, PEER_INTERVAL_MAX))
            response = self._call_manager(request)

        if response[0] == ResponseType.PENDING.value:
            raise RuntimeError('Timeout waiting for peers to appear')

        if response[0] == ResponseType.ERROR.value:
            raise RuntimeError(
                    f'Error getting peers from manager: {response[1]}')

        conduits = [Conduit(snd, recv) for snd, recv in response[1]]
        peer_dimensions = {
                Reference(component): dims
                for component, dims in response[2].items()}
        peer_locations = {
                Reference(instance): locs
                for instance, locs in response[3].items()}
        ports = [
                Port(Identifier(name), Operator[op])
                for name, op in response[4]]

        name = instance_to_kernel(self._instance_id)
        index = instance_indices(self._instance_id)
        return PeerInfo(
                name, index, conduits, peer_dimensions, peer_locations, ports)

    def deregister_instance(self) -> None:
        """Deregister a component instance with the manager.

        Raises:
            RuntimeError: If the manager reports an error.
        """
        request = [
                RequestType.DEREGISTER_INSTANCE.value, str(self._instance_id)]
        response = self._call_manager(request)
        if response[0] == ResponseType.ERROR.value:
            raise RuntimeError(
                    f'Error deregistering instance: {response[1]}')

    def waiting_for_receive(
            self, peer_instance_id: Reference, port_name: str,
            slot: Optional[int]
            ) -> None:
        """Notify the manager that we're waiting to receive a message.

        Args:
            peer_instance_id: The peer instance involved.
            port_name: Name of the port on which we are receiving.
            slot: The slot, if any.
        """
        request = [
                RequestType.WAITING_FOR_RECEIVE.value,
                str(self._instance_id),
                str(peer_instance_id), port_name, slot]
        self._call_manager(request)

    def waiting_for_receive_done(
            self, peer_instance_id: Reference, port_name: str,
            slot: Optional[int]
            ) -> None:
        """Notify the manager that we're done waiting to receive a message.

        Args:
            peer_instance_id: The peer instance involved.
            port_name: Name of the port on which we were receiving.
            slot: The slot, if any.
        """
        request = [
                RequestType.WAITING_FOR_RECEIVE_DONE.value,
                str(self._instance_id),
                str(peer_instance_id), port_name, slot]
        self._call_manager(request)

    def is_deadlocked(self) -> bool:
        """Ask the manager if we're part of a deadlock."""
        request = [
                RequestType.IS_DEADLOCKED.value, str(self._instance_id)]
        response = self._call_manager(request)
        return bool(response[1])

    def _call_manager(self, request: Any, timid: bool = False) -> Any:
        """Call the manager and do en/decoding.

        Args:
            request: The request to encode and send
            timid: If True, raise if we can't get the lock after a while

        Returns:
            The decoded response

        Raises:
            ConnectionLockedError: If timid was True and we failed to get
                the lock
        """
        if timid:
            if self._mutex.acquire(timeout=TIMID_WAIT):
                # The lock is reentrant, so this succeeds immediately if this
                # thread already holds it. That happens when we were trying
                # to log, disconnected, then logged the disconnect and are
                # now back here. Since we're still disconnected, we need to
                # raise and drop this message.
                if self._cur_owner == get_ident():
                    self._mutex.release()
                    raise ConnectionLockedError()
            else:
                # Timed out, someone else has held the lock for a long time,
                # so either the manager is overloaded or there are connection
                # problems. Raise and drop.
                raise ConnectionLockedError()
        else:
            self._mutex.acquire()

        self._cur_owner = get_ident()
        try:
            encoded_request = msgpack.packb(request, use_bin_type=True)
            response, _ = self._transport_client.call(encoded_request)
            return msgpack.unpackb(response, raw=False)
        finally:
            self._cur_owner = 0
            self._mutex.release()