Source code for libmuscle.native_instantiator.agent.map_client

from pathlib import Path
from typing import Any, Optional

import msgpack

from libmuscle.mcp.protocol import AgentCommandType, RequestType, ResponseType
from libmuscle.mcp.tcp_transport_client import TcpTransportClient
from libmuscle.native_instantiator.agent.agent_commands import (
    AgentCommand,
    CancelAllCommand,
    ShutdownCommand,
    StartCommand,
)
from libmuscle.planner.resources import OnNodeResources


class MAPClient:
    """The client for the MUSCLE Agent Protocol.

    This class connects to the AgentManager and communicates with it.

    Requests are msgpack-encoded lists that are sent through a
    TcpTransportClient; replies are msgpack-decoded before they are
    returned. An instance can be used as a context manager, in which
    case the connection is closed when the ``with`` block is left.
    """

    def __init__(self, node_name: str, location: str) -> None:
        """Create a MAPClient

        Args:
            node_name: Name (hostname) of the local node
            location: A connection string of the form hostname:port
        """
        self._node_name = node_name
        self._transport_client = TcpTransportClient(location)

    def __enter__(self) -> 'MAPClient':
        """Enter a ``with`` block.

        Returns:
            This client itself.
        """
        return self

    def __exit__(self, *exc_info: Any) -> None:
        """Close the connection when the ``with`` block ends.

        Nothing is returned, so an exception raised inside the block is
        not suppressed.
        """
        self.close()

    def close(self) -> None:
        """Close the connection

        This closes the connection. After this no other member functions
        can be called.
        """
        self._transport_client.close()

    def report_resources(self, resources: OnNodeResources) -> None:
        """Report local resources

        The node name sent along is the one stored in ``resources``, not
        the one this client was created with.

        Args:
            resources: Description of the resources on this node
        """
        # Each core becomes one flat list: its cid followed by its hwthreads.
        enc_cpu_resources = [
            [core.cid, *core.hwthreads] for core in resources.cpu_cores]

        request = [
            RequestType.REPORT_RESOURCES.value,
            resources.node_name,
            {'cpu': enc_cpu_resources},
        ]
        self._call_agent_manager(request)

    def get_command(self) -> Optional[AgentCommand]:
        """Get a command from the agent manager.

        Returns:
            A command, or None if there are no commands pending.

        Raises:
            RuntimeError: If the received command has an unknown type.
        """
        request = [RequestType.GET_COMMAND.value, self._node_name]
        response = self._call_agent_manager(request)

        if response[0] == ResponseType.PENDING.value:
            return None

        # The command is itself a msgpack blob nested inside the (already
        # decoded) response, so it needs a second decoding step.
        command = msgpack.unpackb(response[1], raw=False)
        command_type = command[0]

        if command_type == AgentCommandType.START.value:
            name = command[1]
            workdir = Path(command[2])
            args = command[3]
            env = command[4]
            stdout = Path(command[5])
            stderr = Path(command[6])
            return StartCommand(name, workdir, args, env, stdout, stderr)

        if command_type == AgentCommandType.CANCEL_ALL.value:
            return CancelAllCommand()

        if command_type == AgentCommandType.SHUTDOWN.value:
            return ShutdownCommand()

        # RuntimeError is a subclass of Exception, so callers that catch
        # Exception keep working.
        raise RuntimeError('Unknown AgentCommand')

    def report_result(self, names_exit_codes: list[tuple[str, int]]) -> None:
        """Report results of finished processes.

        Args:
            names_exit_codes: A list of names and exit codes of finished
                processes.
        """
        request = [RequestType.REPORT_RESULT.value, names_exit_codes]
        self._call_agent_manager(request)

    def _call_agent_manager(self, request: Any) -> Any:
        """Call the manager and do en/decoding.

        Args:
            request: The request to encode and send

        Returns:
            The decoded response
        """
        encoded_request = msgpack.packb(request, use_bin_type=True)
        # call() returns a pair; only its first element, the encoded
        # reply, is used here.
        response, _ = self._transport_client.call(encoded_request)
        return msgpack.unpackb(response, raw=False)