import enum
import logging
import multiprocessing as mp
import os
import traceback
from pathlib import Path
from typing import Optional
from ymmsl.v0_2 import BaseEnv, Program, Reference, ResourceRequirements
from libmuscle.planner.planner import ResourceAssignment
[docs]
class ProcessStatus(enum.Enum):
    """Lifecycle state of a process (instance)."""
    STARTED = 0
    RUNNING = 1
    SUCCESS = 2
    ERROR = 3
    CANCELED = 4

    def is_finished(self) -> bool:
        """Tell whether this status is a final one.

        Canceled jobs count as finished, even if they have never
        actually run.

        Returns:
            True if the status is SUCCESS, ERROR or CANCELED, False
            otherwise.
        """
        final_states = (
                ProcessStatus.SUCCESS,
                ProcessStatus.ERROR,
                ProcessStatus.CANCELED)
        return self in final_states
[docs]
class Process:
    """A process that was started for an instance.

    Attributes:
        instance: Name of the instance this is the process of
        resources: The resources allocated to this process
        status: Current status of the process
        exit_code: Exit code, if status is ERROR
        error_msg: Error message, if status is ERROR
    """
    def __init__(self, instance: Reference, resources: ResourceAssignment) -> None:
        """Create a Process object.

        A new Process has status STARTED, and neither an exit code nor
        an error message.

        Args:
            instance: Name of the instance this is the process of.
            resources: Resources allocated to this process.
        """
        self.instance, self.resources = instance, resources
        self.status = ProcessStatus.STARTED

        # These stay None until something else fills them in.
        self.exit_code: Optional[int] = None
        self.error_msg: Optional[str] = None
[docs]
class InstantiatorRequest:
    """Common base class of all requests sent to an instantiator.

    It has no attributes or behaviour of its own; the request classes
    derive from it.
    """
[docs]
class ShutdownRequest(InstantiatorRequest):
    """Request to shut down the background process.

    The background process will stop once all running processes are
    done.
    """
[docs]
class InstantiationRequest(InstantiatorRequest):
    """Request to instantiate a new process.

    This object only stores the values it is given.

    Attributes:
        instance: The name of the instance
        program: The program to start for it
        res_req: The resource requirements for this instance
        resources: The specific resources to start it on
        instance_dir: The main directory for this instance
        work_dir: The directory in which to start it
        stdout_path: Path of file to redirect stdout to
        stderr_path: Path of file to redirect stderr to
    """
    def __init__(
            self,
            instance: Reference,
            program: Program,
            res_req: ResourceRequirements,
            resources: ResourceAssignment,
            instance_dir: Path,
            work_dir: Path,
            stdout_path: Path,
            stderr_path: Path
            ) -> None:
        """Create an InstantiationRequest.

        Each argument is stored in the attribute of the same name.

        Args:
            instance: The name of the instance
            program: The program to start for it
            res_req: The resource requirements for this instance
            resources: The specific resources to instantiate on
            instance_dir: The main directory for this instance
            work_dir: The directory in which to start it
            stdout_path: Path of file to redirect stdout to
            stderr_path: Path of file to redirect stderr to
        """
        # What to start, and what it needs
        self.instance = instance
        self.program = program
        self.res_req = res_req
        self.resources = resources

        # Where to start it, and where its output goes
        self.instance_dir = instance_dir
        self.work_dir = work_dir
        self.stdout_path = stdout_path
        self.stderr_path = stderr_path
[docs]
class CancelAllRequest(InstantiatorRequest):
    """Request to stop all running processes."""
[docs]
class CrashedResult:
    """Signals that the instantiator process crashed.

    Attributes:
        exception: The exception passed in at construction, or None if
            none was given.
    """
    def __init__(self, exception: Optional[BaseException] = None) -> None:
        """Create a CrashedResult.

        Args:
            exception: An exception to store on this object, optional.
        """
        self.exception = exception
[docs]
class QueueingLogHandler(logging.Handler):
    """A logging Handler that enqueues records.

    Every record passed to emit() is put on the queue that was given
    at construction. If the record carries exception information, that
    is first folded into the message text.
    """
    def __init__(self, queue: mp.Queue) -> None:
        """Create a QueueingLogHandler.

        Args:
            queue: The queue to put log records on.
        """
        super().__init__()
        self._queue = queue

    def emit(self, record: logging.LogRecord) -> None:
        """Emit the record by enqueueing it.

        If the record has exception information attached, the message
        is formatted with its arguments, the formatted traceback is
        appended to it, and the arguments and exception information
        are cleared. The record is modified in place in that case.
        Records without exception information are enqueued unchanged.

        Args:
            record: A log record to enqueue.
        """
        if record.exc_info:
            # Merge the arguments into the message before appending the
            # traceback. The message may be an arbitrary object rather
            # than a string, and traceback text may contain % characters
            # that would break a later msg % args.
            message = record.getMessage()
            trace = ''.join(traceback.format_exception(*record.exc_info))
            record.msg = message + '\n' + trace
            record.args = None
            # NOTE(review): presumably cleared because traceback objects
            # cannot be pickled for transfer through the queue.
            record.exc_info = None
        self._queue.put(record)
[docs]
def create_instance_env(
        instance: Reference, base_env: BaseEnv, overlay: dict[str, str]
        ) -> dict[str, str]:
    """Creates an environment for an instance.

    The starting point depends on base_env. For BaseEnv.LOGIN, only
    HOME, LOGNAME, SHELL, TERM and USER are copied from the current
    (manager) environment, as far as they are set. Otherwise, a copy
    of the whole current environment is made, and for BaseEnv.CLEAN
    the traces of an active Python virtual environment are removed
    from that copy.

    Next, MUSCLE_INSTANCE is set to the name of the instance, and
    finally the overlay is applied. Keys from overlay that start with
    + will have the corresponding value appended to the matching (by
    key, without the +) value in the environment, or set if there is
    no such value yet. No separator is inserted when appending. For
    all other keys the value in the environment gets overwritten.

    Args:
        instance: Name of the instance to create an environment for
        base_env: Which environment to start from
        overlay: Variables to set or extend

    Returns:
        A new dictionary with the environment variables. The current
        process' environment is not modified.
    """
    if base_env == BaseEnv.LOGIN:
        login_vars = ('HOME', 'LOGNAME', 'SHELL', 'TERM', 'USER')
        env = {
                name: os.environ[name]
                for name in login_vars if name in os.environ}
    else:
        env = os.environ.copy()

    if base_env == BaseEnv.CLEAN:
        # Remove any virtual environment
        #
        # The activate script does not export its deactivate command, nor most
        # of the variables that command relies on, so we cannot simply undo the
        # activation. What follows is a bit of a hack, but it is the best we
        # can do.
        #
        # _OLD_VIRTUAL_PYTHONHOME is not exported by default either, but we
        # check for it anyway. On a system where PYTHONHOME is needed, with the
        # manager running in an active virtualenv and a program that needs a
        # clean shell, the user will have to export it manually between
        # activating the virtualenv and starting the manager, and then things
        # will work.

        # Strip the virtualenv prefix from the prompt, if it is there
        prompt_prefix = env.pop('VIRTUAL_ENV_PROMPT', None)
        if prompt_prefix is not None:
            prompt = env.get('PS1')
            if prompt is not None and prompt.startswith(prompt_prefix):
                env['PS1'] = prompt[len(prompt_prefix):]

        # Put back the PYTHONHOME saved on activation, if we have it
        old_python_home = env.pop('_OLD_VIRTUAL_PYTHONHOME', None)
        if old_python_home is not None:
            env['PYTHONHOME'] = old_python_home

        # Take the virtualenv's bin directory off the PATH
        venv_dir = env.pop('VIRTUAL_ENV', None)
        if venv_dir is not None and 'PATH' in env:
            venv_bin = venv_dir + '/bin'
            env['PATH'] = ':'.join(
                    entry for entry in env['PATH'].split(':')
                    if entry != venv_bin)

    env['MUSCLE_INSTANCE'] = str(instance)

    for key, value in overlay.items():
        if not key.startswith('+'):
            env[key] = value
            continue

        name = key[1:]
        if name in env:
            env[name] += value
        else:
            env[name] = value

    return env