Source code for libmuscle.manager.instantiator

import enum
import logging
import multiprocessing as mp
from pathlib import Path
import traceback
from typing import Optional

from ymmsl import Implementation, Reference, ResourceRequirements

from libmuscle.planner.planner import Resources


[docs]class ProcessStatus(enum.Enum): """Status of a process (instance).""" STARTED = 0 RUNNING = 1 SUCCESS = 2 ERROR = 3 CANCELED = 4
[docs] def is_finished(self) -> bool: """Returns whether the job is finished. Canceled jobs are considered finished, even if they've never run. """ return self in ( ProcessStatus.SUCCESS, ProcessStatus.ERROR, ProcessStatus.CANCELED)
[docs]class Process: """Represents a process. Attributes: instance: Name of instance this is the process of resources: The resources allocated to this process status: Current status of the process exit_code: Exit code, if status is ERROR error_msg: Error message, if status is ERROR """ def __init__(self, instance: Reference, resources: Resources) -> None: """Create a Process object. Args: resources: Resources allocated to this process. """ self.instance = instance self.resources = resources self.status = ProcessStatus.STARTED self.exit_code: Optional[int] = None self.error_msg: Optional[str] = None
[docs]class InstantiatorRequest: """Base class for requests to an instantiator.""" pass
[docs]class ShutdownRequest(InstantiatorRequest): """Requests shutting down the background process. The process will stop once all running processes are done. """ pass
[docs]class InstantiationRequest(InstantiatorRequest): """Requests instantiating a new process. Attributes: instance: The name of the instance implementation: The implementation to start for it resources: The resources to start it on """ def __init__( self, instance: Reference, implementation: Implementation, res_req: ResourceRequirements, resources: Resources, instance_dir: Path, work_dir: Path, stdout_path: Path, stderr_path: Path ) -> None: """Create an InstantiationRequest. Args: instance: The name of the instance implementation: The implementation to start for it res_req: The resource requirements for this instance resources: The resources to instantiate on instance_dir: The main directory for this instance work_dir: The directory in which to start it stdout_path: Path of file to redirect stdout to stderr_path: Path of file to redirect stderr to """ self.instance = instance self.implementation = implementation self.res_req = res_req self.resources = resources self.instance_dir = instance_dir self.work_dir = work_dir self.stdout_path = stdout_path self.stderr_path = stderr_path
[docs]class CancelAllRequest(InstantiatorRequest): """Requests stopping all running processes.""" pass
[docs]class CrashedResult: """Signals that the instantiator process crashed.""" pass
[docs]class QueueingLogHandler(logging.Handler): """A logging Handler that enqueues records.""" def __init__(self, queue: mp.Queue) -> None: """Create a QueueingLogHandler. Args: level: The level of this handler """ super().__init__() self._queue = queue
[docs] def emit(self, record: logging.LogRecord) -> None: """Emit the record by enqueueing it. Args: record: A log record to enqueue. """ if record.exc_info: record.msg += '\n' + ''.join( traceback.format_exception(*record.exc_info)) record.exc_info = None self._queue.put(record)