Source code for libmuscle.manager.mmp_server

import errno
import logging
import time
from typing import Any, Dict, cast, List, Optional

import msgpack
from ymmsl import (
        Conduit, Identifier, Operator, Port, Reference, PartialConfiguration,
        Checkpoints, CheckpointRule, CheckpointAtRule, CheckpointRangeRule)

import libmuscle
from libmuscle.logging import LogLevel
from libmuscle.manager.instance_registry import (
        AlreadyRegistered, InstanceRegistry)
from libmuscle.manager.logger import Logger
from libmuscle.manager.run_dir import RunDir
from libmuscle.manager.snapshot_registry import SnapshotRegistry
from libmuscle.manager.topology_store import TopologyStore
from libmuscle.mcp.protocol import RequestType, ResponseType
from libmuscle.mcp.tcp_transport_server import TcpTransportServer
from libmuscle.mcp.transport_server import RequestHandler
from libmuscle.manager.profile_store import ProfileStore
from libmuscle.profiling import (
        ProfileEvent, ProfileEventType, ProfileTimestamp)
from libmuscle.snapshot import SnapshotMetadata
from libmuscle.timestamp import Timestamp


_logger = logging.getLogger(__name__)


[docs]def decode_operator(data: str) -> Operator: """Create an Operator from a MsgPack-compatible value.""" return Operator[data]
[docs]def decode_port(data: List[str]) -> Port: """Create a Port from a MsgPack-compatible value.""" return Port(Identifier(data[0]), decode_operator(data[1]))
[docs]def encode_conduit(conduit: Conduit) -> List[str]: """Convert a Conduit to a MsgPack-compatible value.""" return [str(conduit.sender), str(conduit.receiver)]
[docs]def encode_checkpoint_rule(rule: CheckpointRule) -> Dict[str, Any]: """Convert a CheckpointRule to a MsgPack-compatible value.""" if isinstance(rule, CheckpointAtRule): return {'at': list(map(float, rule.at))} if isinstance(rule, CheckpointRangeRule): return { 'start': None if rule.start is None else float(rule.start), 'stop': None if rule.stop is None else float(rule.stop), 'every': float(rule.every)} raise TypeError(f"Unknown checkpoint rule type: {type(rule)}.")
[docs]def encode_checkpoints(checkpoints: Checkpoints) -> Dict[str, Any]: """Convert a Checkpoins to a MsgPack-compatible value.""" return { "at_end": checkpoints.at_end, "wallclock_time": list(map(encode_checkpoint_rule, checkpoints.wallclock_time)), "simulation_time": list(map(encode_checkpoint_rule, checkpoints.simulation_time)), }
[docs]class MMPRequestHandler(RequestHandler): """Handles Manager requests.""" def __init__( self, logger: Logger, profile_store: ProfileStore, configuration: PartialConfiguration, instance_registry: InstanceRegistry, topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, run_dir: Optional[RunDir] ) -> None: """Create an MMPRequestHandler. Args: logger: The Logger component to log messages to. settings: The global settings to serve to instances. instance_registry: The database for instances. topology_store: Keeps track of how to connect things. """ self._logger = logger self._profile_store = profile_store self._configuration = configuration self._instance_registry = instance_registry self._topology_store = topology_store self._snapshot_registry = snapshot_registry self._run_dir = run_dir self._reference_time = time.monotonic()
[docs] def handle_request(self, request: bytes) -> bytes: """Handles a manager request. Args: request: The encoded request Returns: response: An encoded response """ req_list = msgpack.unpackb(request, raw=False) req_type = req_list[0] req_args = req_list[1:] if req_type == RequestType.REGISTER_INSTANCE.value: response = self._register_instance(*req_args) elif req_type == RequestType.GET_PEERS.value: response = self._get_peers(*req_args) elif req_type == RequestType.DEREGISTER_INSTANCE.value: response = self._deregister_instance(*req_args) elif req_type == RequestType.GET_SETTINGS.value: response = self._get_settings(*req_args) elif req_type == RequestType.SUBMIT_LOG_MESSAGE.value: response = self._submit_log_message(*req_args) elif req_type == RequestType.SUBMIT_PROFILE_EVENTS.value: response = self._submit_profile_events(*req_args) elif req_type == RequestType.SUBMIT_SNAPSHOT.value: response = self._submit_snapshot(*req_args) elif req_type == RequestType.GET_CHECKPOINT_INFO.value: response = self._get_checkpoint_info(*req_args) return cast(bytes, msgpack.packb(response, use_bin_type=True))
[docs] def close(self) -> None: """Free per-thread resources. On shutdown of the server, this will be called by each server thread before it shuts down. """ self._profile_store.close()
def _register_instance( self, instance_id: str, locations: List[str], ports: List[List[str]], version: str = '') -> Any: """Handle a register instance request. Args: instance_id: ID of the instance to register locations: Locations where it can be reached ports: Ports of this instance version: Version of libmuscle that this instance uses Returns: A list containing the following values: status (ResponseType): SUCCESS or ERROR error_msg (str): An error message, only present if status equals ERROR """ if version != libmuscle.__version__: return [ ResponseType.ERROR.value, f'Instance libmuscle version ({version}) does not match' f' manager libmuscle version ({libmuscle.__version__}).' ' Please ensure that the instance and the manager use the' ' same version of libmuscle.'] port_objs = [decode_port(p) for p in ports] instance = Reference(instance_id) try: self._instance_registry.add(instance, locations, port_objs) _logger.info(f'Registered instance {instance_id}') return [ResponseType.SUCCESS.value] except AlreadyRegistered: return [ ResponseType.ERROR.value, f'An instance with name {instance_id} was already' ' registered. Did you start a non-MPI component using' ' mpirun?'] def _get_peers(self, instance_id: str) -> Any: """Handle a get peers request. Args: instance_id: ID of the instance requesting peers Returns: A list containing the following values on success: status (ResponseType): SUCCESS conduits (List[List[str]]): Conduits from/to peers dimensions (Dict[str, List[int]]): Dimensions of peer components locations (Dict[str, List[str]]): Locations where peer instances can be contacted. Or the following values on error: status (ResponseType): ERROR error_msg (str): An error message Or the following values if the result is not yet available: status (ResponseType): PENDING status_msg (str): A message on what we're waiting for. """ # get info from yMMSL instance = Reference(instance_id) component = instance.without_trailing_ints() if not self._topology_store.has_kernel(component): return [ResponseType.ERROR.value, f'Unknown component {component}'] conduits = self._topology_store.get_conduits(component) mmp_conduits = [encode_conduit(c) for c in conduits] peer_dims = self._topology_store.get_peer_dimensions(component) mmp_dimensions = {str(name): dims for name, dims in peer_dims.items()} # generate instances try: peers = self._topology_store.get_peer_instances(instance) instance_locations = { str(peer): self._instance_registry.get_locations(peer) for peer in peers} except KeyError as e: return [ ResponseType.PENDING.value, f'Waiting for component {e.args[0]}'] _logger.debug(f'Sent peers to {instance_id}') return [ ResponseType.SUCCESS.value, mmp_conduits, mmp_dimensions, instance_locations] def _deregister_instance(self, instance_id: str) -> Any: """Handle a deregister instance request. Args: instance_id: ID of the instance to deregister Returns: A list containing the following values on success: status (ResponseType): SUCCESS Or the following if an error occurred: status (ResponseType): ERROR error_msg (str): An error message """ try: self._instance_registry.remove(Reference(instance_id)) _logger.info(f'Deregistered instance {instance_id}') return [ResponseType.SUCCESS.value] except ValueError: return [ ResponseType.ERROR.value, f'No instance with name {instance_id} was registered'] def _get_settings(self) -> Any: """Handle a get settings request. Returns: A list containing the following values on success: status (ResponseType): SUCCESS settings (Dict[str, SettingValue]): The global settings """ return [ ResponseType.SUCCESS.value, self._configuration.settings.as_ordered_dict()] def _submit_log_message( self, instance_id: str, timestamp: float, level: int, text: str ) -> Any: """Handle a submit log message request. Args: instance_id: Sending instance timestamp: Time since epoch of the logged event level: Log level of the message text: Message text Returns: A list containing the following values on success: status (ResponseType): SUCCESS """ self._logger.log_message( instance_id, Timestamp(timestamp), LogLevel(level), text) return [ResponseType.SUCCESS.value] def _submit_profile_events( self, instance_id: str, events: List[List[Any]]) -> Any: """Handle a submit profile events request. Args: instance_id: Instance that sent these events events: Profiling events to store Returns: A list containing the following values on success: status (ResponseType): SUCCESS """ ev = [ ProfileEvent( ProfileEventType(e[0]), ProfileTimestamp(e[1]), ProfileTimestamp(e[2]), Port(e[3][0], Operator[e[3][1]]) if e[3] else None, e[4], e[5], e[6], e[7], e[8]) for e in events] self._profile_store.add_events(Reference(instance_id), ev) return [ResponseType.SUCCESS.value] def _submit_snapshot( self, instance_id: str, snapshot: Dict[str, Any]) -> Any: """Handle a submit snapshot request. Returns: A list containing the following values on success: status (ResponseType): SUCCESS """ snapshot_obj = SnapshotMetadata(**snapshot) instance = Reference(instance_id) self._snapshot_registry.register_snapshot(instance, snapshot_obj) return [ResponseType.SUCCESS.value] def _get_checkpoint_info(self, instance_id: str) -> Any: """Get checkpoint info for an instance Args: instance: The instance whose checkpoint info to get Returns: A list containing the following values on success: status (ResponseType): SUCCESS wallclock_reference_time (float): Unix timestamp (in UTC) indicating wallclock time of the start of the workflow. checkpoints (dict): Dictionary encoding a ymmsl.Checkpoints object. resume_path (Optional[str]): Checkpoint filename to resume from. snapshot_directory (Optional[str]): Directory to store instance snapshots. """ instance = Reference(instance_id) resume = None if instance in self._configuration.resume: resume = str(self._configuration.resume[instance]) snapshot_directory = None if self._run_dir is not None: snapshot_directory = str(self._run_dir.snapshot_dir(instance)) return [ResponseType.SUCCESS.value, time.monotonic() - self._reference_time, encode_checkpoints(self._configuration.checkpoints), resume, snapshot_directory]
[docs]class MMPServer: """The MUSCLE Manager Protocol server. This class accepts connections from the instances comprising the multiscale model to be executed, and services them using an MMPRequestHandler. """ def __init__( self, logger: Logger, profile_store: ProfileStore, configuration: PartialConfiguration, instance_registry: InstanceRegistry, topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, run_dir: Optional[RunDir] ) -> None: """Create an MMPServer. This starts a TCP Transport server and connects it to an MMPRequestHandler, which uses the given components to service the requests. By default, we listen on port 9000, unless it's not available in which case we use a random other one. Args: logger: Logger to send log messages to profile_store: ProfileStore to store profile data in configuration: Configuration component to get settings, checkpoints and resumes from instance_registry: To register instances with and get peer locations from topology_store: To get peers and conduits from snapshot_registry: To register snapshots with run_dir: To save snapshots to """ self._handler = MMPRequestHandler( logger, profile_store, configuration, instance_registry, topology_store, snapshot_registry, run_dir) try: self._server = TcpTransportServer(self._handler, 9000) except OSError as e: if e.errno != errno.EADDRINUSE: raise self._server = TcpTransportServer(self._handler)
[docs] def get_location(self) -> str: """Returns this server's network location. This is a string of the form tcp:<hostname>:<port>. """ return self._server.get_location()
[docs] def stop(self) -> None: """Stops the server. This makes the server stop serving requests, and shuts down its background threads. """ self._server.close()