Source code for libmuscle.manager.manager

from pathlib import Path
import logging
import sys
import traceback
from typing import Optional

from ymmsl import Model, PartialConfiguration, save as save_ymmsl

from libmuscle.manager.instance_registry import InstanceRegistry
from libmuscle.manager.logger import Logger
from libmuscle.manager.mmp_server import MMPServer
from libmuscle.manager.instance_manager import InstanceManager
from libmuscle.manager.profile_store import ProfileStore
from libmuscle.manager.run_dir import RunDir
from libmuscle.manager.snapshot_registry import SnapshotRegistry
from libmuscle.manager.topology_store import TopologyStore


_logger = logging.getLogger(__name__)


[docs]class Manager: """The MUSCLE3 manager. This creates and manages instances and connects them together according to the simulation configuration. """ def __init__( self, configuration: PartialConfiguration, run_dir: Optional[RunDir] = None, log_level: Optional[str] = None ) -> None: """Create a Manager. This creates the manager and the associated server, but does not start any instances. Args: configuration: The simulation configuration. run_dir: Main working directory. """ self._configuration = configuration self._run_dir = run_dir log_dir = self._run_dir.path if self._run_dir else Path.cwd() self._logger = Logger(log_dir, log_level) self._profile_store = ProfileStore(log_dir) self._topology_store = TopologyStore(configuration) self._instance_registry = InstanceRegistry() if run_dir is not None: snapshot_dir = run_dir.snapshot_dir() else: snapshot_dir = Path.cwd() if self._configuration.checkpoints: _logger.warning('Checkpoints are configured but no run' ' directory is provided. Snapshots will be' ' stored in the current working directory.') if self._run_dir: save_ymmsl( self._configuration, self._run_dir.path / 'configuration.ymmsl') if isinstance(self._configuration.model, Model): self._profile_store.store_instances([ instance_name for c in self._configuration.model.components for instance_name in c.instances()]) self._instance_manager: Optional[InstanceManager] = None try: configuration = self._configuration.as_configuration() if self._run_dir is not None: self._instance_manager = InstanceManager( configuration, self._run_dir, self._instance_registry) except ValueError: pass # SnapshotRegistry creates a worker thread, must be created after # instance_manager which forks the process self._snapshot_registry = SnapshotRegistry( configuration, snapshot_dir, self._topology_store) self._snapshot_registry.start() self._server = MMPServer( self._logger, self._profile_store, self._configuration, self._instance_registry, self._topology_store, self._snapshot_registry, run_dir) if self._instance_manager: self._instance_manager.set_manager_location( self.get_server_location())
[docs] def get_server_location(self) -> str: """Returns the network location of the server.""" return self._server.get_location()
[docs] def start_instances(self) -> None: """Starts all required component instances.""" if self._run_dir is None: message = 'No run dir specified' _logger.error(message) raise RuntimeError(message) if not self._instance_manager: message = ( 'For MUSCLE3 to be able to start instances, the' ' configuration must contain a model, implementations,' ' and resources. Please make sure they are all there.') _logger.error(message) raise RuntimeError(message) try: self._instance_manager.start_all() self._profile_store.store_resources( self._instance_manager.get_resources()) except: # noqa _logger.error('An error occurred while starting the components:') for line in traceback.format_exception(*sys.exc_info()): _logger.error(line) self._instance_manager.shutdown() raise
[docs] def stop(self) -> None: """Shuts down the manager.""" if self._instance_manager: self._instance_manager.shutdown() self._server.stop() self._snapshot_registry.shutdown() self._snapshot_registry.join() self._profile_store.shutdown() self._logger.close()
[docs] def wait(self) -> bool: """Blocks until the simulation is done, then shuts down. Returns: True if success, False if an error occurred. """ if self._instance_manager: try: success = self._instance_manager.wait() finally: self._instance_manager.shutdown() else: self._instance_registry.wait() success = True self.stop() return success