Source code for libmuscle.native_instantiator.map_server

import errno
import logging
from typing import Any, Dict, cast, List, Optional
from typing_extensions import Buffer

import msgpack

from libmuscle.mcp.protocol import AgentCommandType, RequestType, ResponseType
from libmuscle.mcp.tcp_transport_server import TcpTransportServer
from libmuscle.mcp.transport_server import RequestHandler
from libmuscle.native_instantiator.agent.agent_commands import (
        AgentCommand, CancelAllCommand, ShutdownCommand, StartCommand)
from libmuscle.native_instantiator.iagent_manager import IAgentManager
from libmuscle.planner.resources import Core, CoreSet, OnNodeResources
from libmuscle.post_office import PostOffice

from ymmsl.v0_2 import Reference


_logger = logging.getLogger(__name__)


[docs] class MAPRequestHandler(RequestHandler): """Handles Agent requests.""" def __init__(self, agent_manager: IAgentManager, post_office: PostOffice) -> None: """Create a MAPRequestHandler. Args: agent_manager: The AgentManager to forward reports to post_office: The PostOffice to get commands from """ self._agent_manager = agent_manager self._post_office = post_office
[docs] def handle_request(self, request: Buffer) -> Buffer: """Handles an agent request. Args: request: The encoded request Returns: response: An encoded response """ req_list = msgpack.unpackb(request, raw=False) req_type = req_list[0] req_args = req_list[1:] if req_type == RequestType.REPORT_RESOURCES.value: response = self._report_resources(*req_args) elif req_type == RequestType.GET_COMMAND.value: response = self._get_command(*req_args) elif req_type == RequestType.REPORT_RESULT.value: response = self._report_result(*req_args) return cast(Buffer, msgpack.packb(response, use_bin_type=True))
def _report_resources( self, node_name: str, data: Dict[str, Any]) -> Any: """Handle a report resources request. This is used by the agent to report available resources on its node when it starts up. Args: node_name: Name (hostname) of the node data: Resource dictionary, containing a single key 'cpu' which maps to a list of cores, where each core is a list of ints, starting with the core id at index [0] followed by the hwthread ids of all hwthreads in this core. """ cores = CoreSet((Core(ids[0], set(ids[1:])) for ids in data['cpu'])) node_resources = OnNodeResources(node_name, cores) self._agent_manager.report_resources(node_resources) return [ResponseType.SUCCESS.value] def _get_command(self, node_name: str) -> Any: """Handle a get command request. This is used by the agent to ask if there's anything we would like it to do. Command sounds a bit brusque, but we already have the agent sending requests to this handler, so I needed a different word to distinguish them. Requests are sent by the agent to the manager (because it's the client in an RPC setup), commands are returned by the manager to the agent (because it tells it what to do). Args: node_name: Hostname (name) of the agent's node """ node_ref = Reference('_' + node_name.replace('-', '_')) next_request: Optional[Buffer] = None if self._post_office.have_message(node_ref): next_request = self._post_office.get_message(node_ref) if next_request is not None: return [ResponseType.SUCCESS.value, next_request] return [ResponseType.PENDING.value] def _report_result(self, instances: List[List[Any]]) -> Any: """Handle a report result request. This is sent by the agent if an instance it launched exited. Args: instances: List of instance descriptions, comprising an id str and exit code int. Really a List[Tuple[str, int]] but msgpack doesn't know about tuples. """ self._agent_manager.report_result(list(map(tuple, instances))) return [ResponseType.SUCCESS.value]
[docs] class MAPServer: """The MUSCLE Agent Protocol server. This class accepts connections from the agents and services them using a MAPRequestHandler. """ def __init__(self, agent_manager: IAgentManager) -> None: """Create a MAPServer. This starts a TCP Transport server and connects it to a MAPRequestHandler, which uses the given agent manager to service the requests. By default, we listen on port 9009, unless it's not available in which case we use a random other one. Args: agent_manager: AgentManager to forward requests to """ self._post_office = PostOffice() self._handler = MAPRequestHandler(agent_manager, self._post_office) try: self._server = TcpTransportServer(self._handler, 9009) except OSError as e: if e.errno != errno.EADDRINUSE: raise self._server = TcpTransportServer(self._handler)
[docs] def get_location(self) -> str: """Return this server's network location. This is a string of the form tcp:<hostname>:<port>. """ return self._server.get_location()
[docs] def stop(self) -> None: """Stop the server. This makes the server stop serving requests, and shuts down its background threads. """ self._server.close()
[docs] def deposit_command(self, node_name: str, command: AgentCommand) -> None: """Deposit a command for the given agent. This takes the given command and queues it for the given agent to pick up next time it asks us for one. Args: node_name: Name of the node whose agent should execute the command command: The command to send """ agent = Reference('_' + node_name.replace('-', '_')) if isinstance(command, StartCommand): command_obj = [ AgentCommandType.START.value, command.name, str(command.work_dir), command.args, command.env, str(command.stdout), str(command.stderr) ] elif isinstance(command, CancelAllCommand): command_obj = [AgentCommandType.CANCEL_ALL.value] elif isinstance(command, ShutdownCommand): command_obj = [AgentCommandType.SHUTDOWN.value] encoded_command = cast(bytes, msgpack.packb(command_obj, use_bin_type=True)) self._post_office.deposit(agent, encoded_command)