Source code for libmuscle.manager.instance_manager

import logging
from multiprocessing import Queue
from pathlib import Path
from queue import Empty
from textwrap import indent
from threading import Thread
from typing import Optional, Union

from ymmsl.v0_2 import Configuration, ExecutionModel, Reference

from libmuscle.errors import ConfigurationError
from libmuscle.manager.instance_registry import InstanceRegistry
from libmuscle.manager.instantiator import (
    CancelAllRequest,
    CrashedResult,
    InstantiationRequest,
    InstantiatorRequest,
    Process,
    ProcessStatus,
    ShutdownRequest,
)
from libmuscle.manager.logger import last_lines
from libmuscle.manager.run_dir import RunDir
from libmuscle.native_instantiator.native_instantiator import NativeInstantiator
from libmuscle.planner.planner import Planner, ResourceAssignment
from libmuscle.planner.resources import Resources

_logger = logging.getLogger(__name__)


[docs] class LogHandlingThread(Thread): """Pumps log records from a queue. This gets log records from a queue and sends them to the logging system. """ def __init__(self, queue: Queue) -> None: """Creates a LogHandlingThread. Args: queue: Queue to get log records from. """ super().__init__() self._queue = queue self._shutting_down = False
[docs] def shutdown(self) -> None: """Process remaining records and stop the thread.""" self._shutting_down = True
[docs] def run(self) -> None: """The thread's entry point.""" while True: try: record = self._queue.get(True, 0.1) logger = logging.getLogger(record.name) logger.handle(record) except Empty: if self._shutting_down: break
_ResultType = Union[Process, CrashedResult]
[docs] class InstanceManager: """Instantiates and manages running instances""" def __init__( self, configuration: Configuration, run_dir: RunDir, instance_registry: InstanceRegistry) -> None: """Create an InstanceManager. Args: configuration: The global configuration, flattened run_dir: Directory to run in instance_registry: The InstanceRegistry to use """ self._configuration = configuration self._run_dir = run_dir self._instance_registry = instance_registry self._resources_in: Queue[Resources] = Queue() self._requests_out: Queue[InstantiatorRequest] = Queue() self._results_in: Queue[_ResultType] = Queue() self._log_records_in: Queue[logging.LogRecord] = Queue() self._instantiator = NativeInstantiator( self._resources_in, self._requests_out, self._results_in, self._log_records_in, self._run_dir.path) self._instantiator.start() self._log_handler = LogHandlingThread(self._log_records_in) self._log_handler.start() self._allocations: Optional[dict[Reference, ResourceAssignment]] = None resources = self._resources_in.get() _logger.debug(f'Got resources {resources}') if isinstance(resources, CrashedResult): msg = ( 'Instantiator crashed. This should not happen, please file a bug' ' report.') _logger.error(msg) raise RuntimeError(msg) from resources.exception self._planner = Planner(resources) self._num_running = 0
[docs] def set_manager_location(self, location: str) -> None: """Sets the network location of the manager. Call this before starting any instances so that the location can be passed to them. Args: location: The network location (e.g. localhost:5000) """ self._manager_location = location
[docs] def start_all(self) -> None: """Starts all the instances of the model.""" self._allocations = self._planner.allocate_all(self._configuration) for instance, resources in self._allocations.items(): _logger.info(f'Planned {instance} on {resources.as_resources()}') model = self._configuration.root_model() for instance, resources in self._allocations.items(): component = model.components[instance.without_trailing_ints()] if component.implementation is None: _logger.warning( f'No implementation specified for {component.name}' ', not starting it.') continue program = self._configuration.programs[component.implementation] if program.execution_model is ExecutionModel.MANUAL: _logger.info( f'Instance {instance} has execution_model MANUAL' ' - please start it manually.') continue program.env['MUSCLE_MANAGER'] = self._manager_location idir = self._run_dir.add_instance_dir(instance) workdir = idir / 'workdir' workdir.mkdir() stdout_path = idir / 'stdout.txt' stderr_path = idir / 'stderr.txt' res_req = self._configuration.get_resources(model.name + component.name) request = InstantiationRequest( instance, program, res_req, resources, idir, workdir, stdout_path, stderr_path) _logger.info(f'Instantiating {instance}') self._requests_out.put(request) self._num_running += 1
[docs] def get_resources(self) -> dict[Reference, ResourceAssignment]: """Returns the resources allocated to each instance. Only call this after start_all() has been called, or it will raise an exception because the information is not available. Return: The resources for each instance instantiated by start_all() """ if self._allocations is None: raise RuntimeError( 'Tried to get resources but we are running without --start-all') return self._allocations
[docs] def wait(self) -> bool: """Waits for all instances to be done.""" all_seemingly_okay = True def cancel_all() -> None: nonlocal all_seemingly_okay if all_seemingly_okay: self._requests_out.put(CancelAllRequest()) all_seemingly_okay = False # Get all results results: list[Process] = list() while self._num_running > 0: result = self._results_in.get() if isinstance(result, CrashedResult): if isinstance(result.exception, ConfigurationError): _logger.error(str(result.exception)) else: _logger.error( 'Instantiator crashed. This should not happen, please file' ' a bug report.') return False results.append(result) if result.status != ProcessStatus.CANCELED: registered = self._instance_registry.did_register(result.instance) if result.exit_code != 0 or not registered: cancel_all() self._num_running -= 1 # Summarise outcome crashes: list[tuple[Process, Path]] = list() indirect_crashes: list[tuple[Process, Path]] = list() for result in results: if result.status == ProcessStatus.CANCELED: if result.exit_code == 0: _logger.info( f'Instance {result.instance} was not started' f' because of an error elsewhere') else: _logger.info( f'Instance {result.instance} was shut down by' f' MUSCLE3 because an error occurred elsewhere') # Ensure we don't see this as a succesful run when shutdown() is called # by another thread: all_seemingly_okay = False else: stderr_file = ( self._run_dir.instance_dir(result.instance) / 'stderr.txt') if result.exit_code == 0: if self._instance_registry.did_register(result.instance): _logger.info( f'Instance {result.instance} finished with' ' exit code 0') else: _logger.error( f'Instance {result.instance} quit with no error' ' (exit code 0), but it never registered with the' ' manager. Maybe it never created an Instance' ' object?') crashes.append((result, stderr_file)) else: try: with stderr_file.open() as f: peer_crash = any(['peer crash?' in line for line in f]) except FileNotFoundError: peer_crash = False if peer_crash: _logger.warning( f'Instance {result.instance} crashed, likely because' f' an error occurred elsewhere.') indirect_crashes.append((result, stderr_file)) else: _logger.error( f'Instance {result.instance} quit with exit code' f' {result.exit_code}') crashes.append((result, stderr_file)) _logger.debug(f'Status: {result.status}') _logger.debug(f'Exit code: {result.exit_code}') _logger.debug(f'Error msg: {result.error_msg}') # Show errors from crashed components if crashes: for result, stderr_file in crashes: _logger.error( f'The last error output of {result.instance} was:') _logger.error( '\n' + indent(last_lines(stderr_file, 20), ' ')) _logger.error( 'More output may be found in' f' {self._run_dir.instance_dir(result.instance)}\n' ) elif indirect_crashes: # Possibly a component exited without error, but prematurely. If this # caused ancillary crashes due to dropped connections, then the logs # of those will give a hint as to what the problem may be, so print # those instead. _logger.error( 'At this point, one or more instances crashed because they' ' lost their connection to another instance, but no other' ' crashing instance was found that could have caused this.') _logger.error( 'This means that either another instance quit before it was' ' supposed to, but with exit code 0, or there was an actual' ' network problem that caused the connection to drop.') _logger.error( 'Here is the output of the instances that lost connection:') for result, stderr_file in indirect_crashes: _logger.error( f'The last error output of {result.instance} was:') _logger.error( '\n' + indent(last_lines(stderr_file, 20), ' ')) _logger.error( 'More output may be found in' f' {self._run_dir.instance_dir(result.instance)}\n' ) elif not all_seemingly_okay: # shutdown() was called by another thread (e.g. the DeadlockDetector): _logger.error('The simulation was aborted.') else: _logger.info('The simulation finished without error.') return all_seemingly_okay
[docs] def shutdown(self) -> None: """Shuts down the backend. This will wait for any processes still running before shutting down and returning. """ _logger.debug('Shutting down instance manager') self._requests_out.put(CancelAllRequest()) self._requests_out.put(ShutdownRequest()) self._instantiator.join() self._log_handler.shutdown() self._log_handler.join() # Close multiprocessing.Queues and ensure their feeder threads exit queues: list[Queue] = [ self._resources_in, self._requests_out, self._results_in, self._log_records_in] for queue in queues: queue.close() queue.join_thread() _logger.debug('Instance manager shut down cleanly')