Source code for libmuscle.manager.instance_manager

import logging
from pathlib import Path
from textwrap import indent
from threading import Thread
from typing import Dict, List, Optional, Tuple, Union
from multiprocessing import Queue
import queue

from ymmsl import Configuration, Reference

from libmuscle.manager.instance_registry import InstanceRegistry
from libmuscle.manager.instantiator import (
        CancelAllRequest, CrashedResult, InstantiatorRequest,
        InstantiationRequest, ProcessStatus, ShutdownRequest)
from libmuscle.manager.logger import last_lines
from libmuscle.manager.qcgpj_instantiator import Process, QCGPJInstantiator
from libmuscle.manager.run_dir import RunDir
from libmuscle.planner.planner import Planner, Resources


# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


class LogHandlingThread(Thread):
    """Forwards queued log records to the logging system.

    Records are taken off a queue one at a time and handed to the
    logger whose name is stored in the record.
    """
    def __init__(self, queue: Queue) -> None:
        """Sets up the thread; it is not started here.

        Args:
            queue: Queue to read the log records from.
        """
        super().__init__()
        self._shutting_down = False
        self._queue = queue

    def shutdown(self) -> None:
        """Asks the thread to stop once the queue has run empty."""
        self._shutting_down = True

    def run(self) -> None:
        """Thread body: forwards records until asked to stop."""
        while True:
            try:
                # Wait only briefly, so that the shutdown flag gets
                # checked regularly while the queue is idle.
                record = self._queue.get(True, 0.1)
            except queue.Empty:
                # Only stop on an empty queue, so that records still
                # pending at shutdown time get processed first.
                if self._shutting_down:
                    break
                continue

            logging.getLogger(record.name).handle(record)
# Type of the items received on the results queue: either a Process or a
# CrashedResult.
_ResultType = Union[Process, CrashedResult]
class InstanceManager:
    """Instantiates and manages running instances"""
    def __init__(
            self, configuration: Configuration, run_dir: RunDir,
            instance_registry: InstanceRegistry) -> None:
        """Create an InstanceManager.

        This starts the instantiator and the log handling thread, and
        then blocks until a Resources object has been received on the
        resources queue shared with the instantiator.

        Args:
            configuration: The global configuration
            run_dir: Directory to run in
            instance_registry: The InstanceRegistry to use
        """
        self._configuration = configuration
        self._run_dir = run_dir
        self._instance_registry = instance_registry

        # Set by set_manager_location(); needed before instances can be
        # started.
        self._manager_location: Optional[str] = None

        self._resources_in: Queue[Resources] = Queue()
        self._requests_out: Queue[InstantiatorRequest] = Queue()
        self._results_in: Queue[_ResultType] = Queue()
        self._log_records_in: Queue[logging.LogRecord] = Queue()

        self._instantiator = QCGPJInstantiator(
                self._resources_in, self._requests_out, self._results_in,
                self._log_records_in, self._run_dir.path)
        self._instantiator.start()

        self._log_handler = LogHandlingThread(self._log_records_in)
        self._log_handler.start()

        self._allocations: Optional[Dict[Reference, Resources]] = None
        # Blocks until the resources have arrived on the queue.
        self._planner = Planner(self._resources_in.get())
        self._num_running = 0

    def set_manager_location(self, location: str) -> None:
        """Sets the network location of the manager.

        Call this before starting any instances so that the location
        can be passed to them.

        Args:
            location: The network location (e.g. localhost:5000)
        """
        self._manager_location = location

    def start_all(self) -> None:
        """Starts all the instances of the model.

        Components without an implementation are skipped with a warning.

        Raises:
            RuntimeError: If an instance needs to be started but
                set_manager_location() has not been called yet.
        """
        self._allocations = self._planner.allocate_all(self._configuration)
        for instance, resources in self._allocations.items():
            _logger.info(f'Planned {instance} on {resources}')

        components = {c.name: c for c in self._configuration.model.components}
        for instance, resources in self._allocations.items():
            component = components[instance.without_trailing_ints()]
            if component.implementation is None:
                _logger.warning(
                        f'No implementation specified for {component.name}'
                        ', not starting it.')
                continue

            implementation = self._configuration.implementations[
                    component.implementation]

            manager_location = self._manager_location
            if manager_location is None:
                # Fail with a clear message rather than an AttributeError
                # on a missing attribute.
                raise RuntimeError(
                        'Tried to start instances before the manager'
                        ' location was set, please call'
                        ' set_manager_location() first.')
            implementation.env['MUSCLE_MANAGER'] = manager_location

            idir = self._run_dir.add_instance_dir(instance)
            workdir = idir / 'workdir'
            workdir.mkdir()
            stdout_path = idir / 'stdout.txt'
            stderr_path = idir / 'stderr.txt'

            request = InstantiationRequest(
                    instance, implementation,
                    self._configuration.resources[component.name],
                    resources, idir, workdir, stdout_path, stderr_path)
            _logger.info(f'Instantiating {instance} on {resources}')
            self._requests_out.put(request)
            self._num_running += 1

    def get_resources(self) -> Dict[Reference, Resources]:
        """Returns the resources allocated to each instance.

        Only call this after start_all() has been called, or it will
        raise an exception because the information is not available.

        Returns:
            The resources for each instance instantiated by start_all()

        Raises:
            RuntimeError: If start_all() has not been called.
        """
        if self._allocations is None:
            raise RuntimeError(
                    'Tried to get resources but we are running without'
                    ' --start-all')

        return self._allocations

    def wait(self) -> bool:
        """Waits for all instances to be done.

        Collects one result for every instance that was started. If an
        instance that was not cancelled exits with a nonzero exit code,
        or without having registered with the manager, a single request
        to cancel all instances is sent. Once all results are in, the
        outcome is logged, including the last error output of the
        instances that failed.

        Returns:
            True if all instances finished without a problem being
            detected, False if any instance failed or the instantiator
            itself crashed.
        """
        all_seemingly_okay = True

        def cancel_all() -> None:
            # Only send the cancel request the first time a problem is seen.
            nonlocal all_seemingly_okay
            if all_seemingly_okay:
                self._requests_out.put(CancelAllRequest())
                all_seemingly_okay = False

        # Get all results
        results: List[Process] = list()
        while self._num_running > 0:
            result = self._results_in.get()

            if isinstance(result, CrashedResult):
                _logger.error(
                        'Instantiator crashed. This should not happen, please file'
                        ' a bug report.')
                return False

            results.append(result)
            if result.status != ProcessStatus.CANCELED:
                registered = self._instance_registry.did_register(result.instance)
                if result.exit_code != 0 or not registered:
                    cancel_all()
            self._num_running -= 1

        # Summarise outcome
        crashes: List[Tuple[Process, Path]] = list()
        indirect_crashes: List[Tuple[Process, Path]] = list()

        for result in results:
            if result.status == ProcessStatus.CANCELED:
                if result.exit_code == 0:
                    _logger.info(
                            f'Instance {result.instance} was not started'
                            f' because of an error elsewhere')
                else:
                    _logger.info(
                            f'Instance {result.instance} was shut down by'
                            f' MUSCLE3 because an error occurred elsewhere')
            else:
                stderr_file = (
                        self._run_dir.instance_dir(result.instance) /
                        'stderr.txt')
                if result.exit_code == 0:
                    if self._instance_registry.did_register(result.instance):
                        _logger.info(
                                f'Instance {result.instance} finished with'
                                ' exit code 0')
                    else:
                        _logger.error(
                                f'Instance {result.instance} quit with no error'
                                ' (exit code 0), but it never registered with the'
                                ' manager. Maybe it never created an Instance'
                                ' object?')
                        crashes.append((result, stderr_file))
                else:
                    # An instance whose error output mentions a peer crash
                    # is treated as a victim of a problem elsewhere, not
                    # as its cause.
                    with stderr_file.open() as f:
                        peer_crash = any('peer crash?' in line for line in f)

                    if peer_crash:
                        _logger.warning(
                                f'Instance {result.instance} crashed, likely because'
                                f' an error occurred elsewhere.')
                        indirect_crashes.append((result, stderr_file))
                    else:
                        _logger.error(
                                f'Instance {result.instance} quit with exit code'
                                f' {result.exit_code}')
                        crashes.append((result, stderr_file))

            _logger.debug(f'Status: {result.status}')
            _logger.debug(f'Exit code: {result.exit_code}')
            _logger.debug(f'Error msg: {result.error_msg}')

        # Show errors from crashed components
        if crashes:
            for result, stderr_file in crashes:
                self._log_last_output(result, stderr_file)
        elif indirect_crashes:
            # Possibly a component exited without error, but prematurely. If this
            # caused ancillary crashes due to dropped connections, then the logs
            # of those will give a hint as to what the problem may be, so print
            # those instead.
            #
            # This branch must not run when nothing crashed at all, or a
            # successful run would end with spurious error messages.
            _logger.error(
                    'At this point, one or more instances crashed because they'
                    ' lost their connection to another instance, but no other'
                    ' crashing instance was found that could have caused this.')
            _logger.error(
                    'This means that either another instance quit before it was'
                    ' supposed to, but with exit code 0, or there was an actual'
                    ' network problem that caused the connection to drop.')
            _logger.error(
                    'Here is the output of the instances that lost connection:')
            for result, stderr_file in indirect_crashes:
                self._log_last_output(result, stderr_file)

        return all_seemingly_okay

    def shutdown(self) -> None:
        """Shuts down the backend.

        This will wait for any processes still running before shutting
        down and returning.
        """
        _logger.debug('Shutting down instance manager')
        self._requests_out.put(CancelAllRequest())
        self._requests_out.put(ShutdownRequest())
        self._instantiator.join()

        # Stop the log handler last, so that records produced during
        # shutdown of the instantiator can still be processed.
        self._log_handler.shutdown()
        self._log_handler.join()
        _logger.debug('Instance manager shut down cleanly')

    def _log_last_output(self, result: Process, stderr_file: Path) -> None:
        """Logs the end of an instance's error output and its location.

        Args:
            result: The result of the instance to report on
            stderr_file: File containing the instance's error output
        """
        _logger.error(
                f'The last error output of {result.instance} was:')
        _logger.error(
                '\n' + indent(last_lines(stderr_file, 20), ' '))
        _logger.error(
                'More output may be found in'
                f' {self._run_dir.instance_dir(result.instance)}\n'
                )