Source code for libmuscle.manager.qcgpj_instantiator

import asyncio
import logging
import multiprocessing as mp
import os
from pathlib import Path
import queue
import sys
import traceback
from typing import Dict, List, Tuple

from qcg.pilotjob.allocation import (
        Allocation as qcg_Allocation, NodeAllocation as qcg_NodeAllocation)
from qcg.pilotjob.config import Config as qcg_Config
from qcg.pilotjob.errors import InternalError as qcg_InternalError
from qcg.pilotjob.executor import Executor as qcg_Executor
from qcg.pilotjob.joblist import (
        Job as qcg_Job, JobExecution as qcg_JobExecution,
        JobResources as qcg_JobResources)
from qcg.pilotjob.manager import (
        SchedulingIteration as qcg_SchedulingIteration,
        SchedulingJob as qcg_SchedulingJob)
from qcg.pilotjob.parseres import get_resources as qcg_get_resources
from qcg.pilotjob.resources import (
        Node as qcg_Node, ResourcesType as qcg_ResourcesType)

from ymmsl import ExecutionModel, MPICoresResReq, Reference, ThreadedResReq

from libmuscle.manager.instantiator import (
        CancelAllRequest, CrashedResult, InstantiationRequest, Process,
        ProcessStatus, QueueingLogHandler, ShutdownRequest)
from libmuscle.planner.planner import Resources

_logger = logging.getLogger(__name__)

[docs]class StateTracker: """Tracks processes and their state. This keeps a list of running processes and their state. It receives callbacks from QCG-PJ when the state of a job changes, and updates the list accordingly. Attributes: processes: Dict mapping instance names to Process objects. """ def __init__(self) -> None: """Create a StateTracker.""" self.processes: Dict[Reference, Process] = {} # These are for communicating with QCG-PJ self.queued_to_execute = 0 self.stop_processing = False self.zmq_address = '' # QCG-PJ callbacks
[docs] def job_executing(self, job_iteration: qcg_SchedulingIteration) -> None: """Called by Executor when a job has started to run. Args: job_iteration: The job iteration that has started """ name = self.processes[name].status = ProcessStatus.RUNNING _logger.debug(f'Job {name} running')
[docs] def job_finished( self, job_iteration: qcg_SchedulingIteration, allocation: qcg_Allocation, exit_code: int, error_msg: str, canceled: bool) -> None: """Called by executor when a job has finished. Args: job_iteration: The job iteration that has finished allocation: The allocation it ran on exit_code: Its exit code error_msg: Its error message canceled: Whether it was canceled """ name = process = self.processes[name] if canceled: process.status = ProcessStatus.CANCELED elif exit_code == 0: process.status = ProcessStatus.SUCCESS else: process.status = ProcessStatus.ERROR process.exit_code = exit_code process.error_msg = error_msg
[docs]class QCGPJInstantiator(mp.Process): """Background process for interacting with the QCG-PJ executor.""" def __init__( self, resources: mp.Queue, requests: mp.Queue, results: mp.Queue, log_records: mp.Queue, run_dir: Path) -> None: """Create a QCGPJProcessManager. Args: resources: Queue for returning the available resources requests: Queue to take requests from results: Queue to communicate finished processes over log_messages: Queue to push log messages to """ super().__init__(name='QCGPJProcessManager') self._resources_out = resources self._requests_in = requests self._results_out = results self._log_records_out = log_records self._run_dir = run_dir
[docs] def run(self) -> None: """Entry point for the process.""" # Put QCG-PJ output in run dir # The configuration setting below is ignored by the agents # due to a bug in QCG-PJ qcgpj_dir = self._run_dir / 'qcgpj' qcgpj_dir.mkdir(exist_ok=True) os.chdir(qcgpj_dir) self._reconfigure_logging() # Executor needs to be instantiated before we go async qcg_config: Dict[str, str] = {qcg_Config.AUX_DIR: str(qcgpj_dir)} self._qcg_resources = qcg_get_resources(qcg_config) self._state_tracker = StateTracker() self._executor = qcg_Executor( self._state_tracker, qcg_config, self._qcg_resources) self._send_resources() try: loop = asyncio.get_event_loop() loop.run_until_complete(self._main()) except: # noqa for line in traceback.format_exception(*sys.exc_info()): _logger.error(line) self._results_out.put(CrashedResult())
async def _main(self) -> None: """Main function for the background process. This sets up QCG-PJ and then accepts requests for instantiating jobs, stopping them, or shutting down. Results of finished jobs are returned via the results queue. """ qcg_iters: Dict[Reference, qcg_SchedulingIteration] = {} await asyncio.sleep(0.01) # allow requests_in queue to be populated shutting_down = False done = False while not done: while not shutting_down: try: request = self._requests_in.get_nowait() if isinstance(request, ShutdownRequest): _logger.debug('Got ShutdownRequest') self._state_tracker.stop_processing = True shutting_down = True elif isinstance(request, CancelAllRequest): _logger.debug('Got CancelAllRequest') await self._cancel_all(qcg_iters) _logger.debug('Done CancelAllRequest') elif isinstance(request, InstantiationRequest): if not shutting_down: qcg_alloc, qcg_iter = self._create_job( request, self._qcg_resources.rtype) qcg_iters[request.instance] = qcg_iter self._state_tracker.processes[request.instance] = ( Process( request.instance, request.resources)) self._state_tracker.queued_to_execute += 1 await self._executor.execute(qcg_alloc, qcg_iter) except queue.Empty: break for name, process in list(self._state_tracker.processes.items()): if process.status.is_finished(): _logger.debug(f'Reporting {name} done') self._results_out.put(process) del self._state_tracker.processes[name] if shutting_down: _logger.debug(f'Done: {self._state_tracker.processes}') done = len(self._state_tracker.processes) == 0 if not done: await asyncio.sleep(0.1) _logger.debug('Stopping executor') await self._executor.stop() def _reconfigure_logging(self) -> None: """Reconfigure logging to send to log_records_out.""" root_logger = logging.getLogger() for h in list(root_logger.handlers): root_logger.removeHandler(h) handler = QueueingLogHandler(self._log_records_out) root_logger.addHandler(handler) def _send_resources(self) -> None: """Converts and sends QCG available resources.""" resources = Resources() for node in self._qcg_resources.nodes: resources.cores[] = set(map(int, node.free_ids)) self._resources_out.put(resources) async def _cancel_all( self, qcg_iters: Dict[Reference, qcg_SchedulingIteration]) -> None: """Cancels all running jobs.""" # Repeat cancel until they're gone to work around QCG-PJ # race condition. while any([ not p.status.is_finished() for p in self._state_tracker.processes.values()]): for instance, process in self._state_tracker.processes.items(): if process.status.is_finished(): continue qcg_iter = qcg_iters[instance] try: await self._executor.cancel_iteration( qcg_iter.job, qcg_iter.iteration) _logger.debug(f'Canceled {instance}') except qcg_InternalError: _logger.debug(f'Canceled {instance} not found') raise except AttributeError: # Workaround for QCG-PJ bug _logger.debug(f'Canceled {instance} not found') raise await asyncio.sleep(0.1) def _create_job( self, request: InstantiationRequest, qcg_resources_type: qcg_ResourcesType ) -> Tuple[qcg_Allocation, qcg_SchedulingIteration]: """Creates a QCG allocation and job for a request.""" total_cores = sum(map(len, request.resources.cores.values())) env = self._create_env(request.instance, request.implementation.env) if request.implementation.script: execution = self._qcg_job_execution_with_script(request, env) else: execution = self._qcg_job_execution_normal( request, env, qcg_resources_type) resources = qcg_JobResources(numCores=total_cores) qcg_job = qcg_Job( str(request.instance), execution=execution, resources=resources) qcg_allocation = qcg_Allocation() for node_name, cores in request.resources.cores.items(): qcg_cores = [str(i) for i in cores] qcg_allocation.add_node( qcg_NodeAllocation(qcg_Node(node_name), qcg_cores, {})) sjob = qcg_SchedulingJob(self._state_tracker, qcg_job) qcg_iteration = qcg_SchedulingIteration(sjob, None, None, resources, []) return qcg_allocation, qcg_iteration def _create_env( self, instance: Reference, overlay: Dict[str, str] ) -> Dict[str, str]: """Updates the environment with the implementation's env. This updates env in-place. Keys from overlay that start with + will have the corresponding value appended to the matching (by key, without the +) value in env, otherwise the value in env gets overwritten. """ env = os.environ.copy() env['MUSCLE_INSTANCE'] = str(instance) for key, value in overlay.items(): if key.startswith('+'): if key[1:] in env: env[key[1:]] += value else: env[key[1:]] = value else: env[key] = value return env def _qcg_job_execution_with_script( self, request: InstantiationRequest, env: Dict[str, str] ) -> qcg_JobExecution: """Create a JobExecution with a run script.""" impl = request.implementation if impl.script is None: raise RuntimeError() script_file = request.instance_dir / '' with'w') as f: f.write(impl.script) script_file.chmod(0o700) if isinstance(request.res_req, ThreadedResReq): env['MUSCLE_THREADS'] = str(request.res_req.threads) elif isinstance(request.res_req, MPICoresResReq): # OpenMPI support rank_file = request.instance_dir / 'rankfile' with'w') as f: i = 0 for node, cores in request.resources.cores.items(): for c in sorted(cores): f.write(f'rank {i}={node} slot={c}\n') i += 1 env['MUSCLE_OPENMPI_RANK_FILE'] = str(rank_file) # IntelMPI support mpi_res_args = list() for node, cores in request.resources.cores.items(): mpi_res_args.extend(['-host', node, '-n', str(len(cores))]) env['MUSCLE_INTELMPI_RESOURCES'] = ' '.join(mpi_res_args) # General environment env['MUSCLE_MPI_PROCESSES'] = str( request.res_req.mpi_processes) env['MUSCLE_THREADS_PER_MPI_PROCESS'] = str( request.res_req.threads_per_mpi_process) return qcg_JobExecution( exec=str(script_file), env=env, stdout=str(request.stdout_path), stderr=str(request.stderr_path), wd=str(request.work_dir), model='default') def _qcg_job_execution_normal( self, request: InstantiationRequest, env: Dict[str, str], qcg_resources_type: qcg_ResourcesType) -> qcg_JobExecution: """Create a JobExecution for a normal description.""" impl = request.implementation total_cores = sum(map(len, request.resources.cores.values())) if impl.execution_model == ExecutionModel.DIRECT: env['OMP_NUM_THREADS'] = str(total_cores) else: env['OMP_NUM_THREADS'] = '1' model_map = { ExecutionModel.DIRECT: 'threads', ExecutionModel.OPENMPI: 'openmpi', ExecutionModel.INTELMPI: 'intelmpi', ExecutionModel.SRUNMPI: 'srunmpi'} qcg_execution_model = model_map[impl.execution_model] executable = str(impl.executable) args = impl.args if impl.args is not None else [] if qcg_resources_type == qcg_ResourcesType.LOCAL: if impl.execution_model == ExecutionModel.DIRECT: if not impl.virtual_env and not impl.modules: # QCG-PJ uses a bash call like this in all but this # case, so we add it here to be consistent. cmd = ' '.join([executable] + args) executable = 'bash' args = ['-l', '-c', cmd] elif impl.execution_model == ExecutionModel.OPENMPI: executable, args = self._with_local_open_mpi( executable, args, total_cores) elif impl.execution_model == ExecutionModel.INTELMPI: executable, args = self._with_local_intel_mpi( executable, args, total_cores) elif impl.execution_model == ExecutionModel.SRUNMPI: raise RuntimeError( f'Cannot instantiate implementation {} with' ' execution model "srunmpi", because there is no' ' Slurm/srun here.') _logger.debug(f'Starting {executable} with {args}') return qcg_JobExecution( exec=executable, args=args, env=env, stdout=str(request.stdout_path), stderr=str(request.stderr_path), modules=impl.modules, venv=str(impl.virtual_env) if impl.virtual_env else None, wd=str(request.work_dir), model=qcg_execution_model, model_opts={'srun_opts': ['--overlap']}) def _with_local_open_mpi( self, executable: str, args: List[str], num_processes: int ) -> Tuple[str, List[str]]: """Create OpenMPI mpirun call.""" new_args = ['-n', str(num_processes), executable] new_args.extend(args) return 'mpirun', new_args def _with_local_intel_mpi( self, executable: str, args: List[str], num_processes: int ) -> Tuple[str, List[str]]: """Create IntelMPI mpirun call.""" new_args = ['-n', str(num_processes), executable] new_args.extend(args) return 'mpirun', new_args