"""A simple runner for Python-only models.
Starting instances is out of scope for MUSCLE3, but is also very
useful for testing and prototyping. So we have a little bit of
support for it in this module.
"""
import logging
import multiprocessing as mp
import multiprocessing.connection as mpc
from pathlib import Path
import sys
from time import sleep
import traceback
from typing import Callable, Dict, List, Set, Tuple, cast
from warnings import catch_warnings, filterwarnings
from ymmsl import convert_to, Document
import ymmsl.v0_1 as v0_1
from ymmsl.v0_2 import Configuration, Identifier, Reference
from libmuscle.util import generate_indices
from libmuscle.manager.hammer import flatten
from libmuscle.manager.logger import last_lines
from libmuscle.manager.manager import Manager
__all__ = ['run_simulation']
# Module logger; implementation_process() uses it to report crashed components.
_logger = logging.getLogger(__name__)
# Both ends of a multiprocessing.Pipe(). In this module the parent process
# uses index 0 and the child (manager) process uses index 1.
Pipe = Tuple[mpc.Connection, mpc.Connection]
class MMPServerController:
    """Handle for a MUSCLE manager that runs in a separate process."""

    def __init__(
            self, process: mp.Process, control_pipe: Pipe,
            manager_location: str) -> None:
        """Create an MMPServerController.

        This class controls a manager running in a separate process.

        Args:
            process: The process the server is running in
            control_pipe: The control pipe for the server process
            manager_location: Network location of the manager
        """
        self.manager_location = manager_location
        self._control_pipe = control_pipe
        self._process = process

    def stop(self) -> None:
        """Stop the server process.

        Sends the shutdown command over the parent's end of the control
        pipe, closes that end, and then blocks until the server process
        has exited.
        """
        parent_end = self._control_pipe[0]
        parent_end.send(True)
        parent_end.close()
        self._process.join()
def manager_process(control_pipe: Pipe, configuration: Configuration) -> None:
    """Run function for a separate manager process.

    Creates a Manager for the configuration, sends its server location
    to the parent, then waits for a shutdown command before stopping
    the manager.

    Args:
        control_pipe: The pipe through which to communicate with the
            parent. Index 0 is the parent's end and is closed here;
            index 1 is used by this process.
        configuration: The configuration to run.
    """
    parent_end, child_end = control_pipe
    # this end belongs to the parent; we don't use it
    parent_end.close()
    manager = Manager(configuration)
    try:
        child_end.send(manager.get_server_location())
        # wait for shutdown command; the value received is ignored
        child_end.recv()
    finally:
        # also clean up if the parent went away (recv() raising EOFError)
        child_end.close()
        manager.stop()
def start_server_process(configuration: Configuration) -> MMPServerController:
    """Start a manager in a separate process.

    Blocks until the manager process reports its network location.

    Args:
        configuration: The configuration to run.

    Returns:
        A controller through which the manager can be shut down.
    """
    control_pipe = mp.Pipe()
    parent_end, child_end = control_pipe

    server = mp.Process(
        target=manager_process, args=(control_pipe, configuration),
        name='MuscleManager')
    server.start()

    # the child process uses this end; the parent keeps only index 0
    child_end.close()

    # the manager sends its location once it has started
    location = parent_end.recv()
    return MMPServerController(server, control_pipe, location)
def implementation_process(
        instance_id: str, manager_location: str,
        implementation: Callable) -> None:
    """Run function for a separate instance process.

    Prepares ``sys.argv`` so the instance can find its identity and the
    manager, adds a per-instance log file handler to the root logger,
    and then calls the implementation.

    Handling of ``sys.argv``:
      - If a ``--muscle-prefix=`` argument is present, it is combined
        with ``instance_id`` and replaced by a ``--muscle-instance=``
        argument.
      - Otherwise a ``--muscle-instance=`` argument for ``instance_id``
        is appended.
      - A ``--muscle-manager=`` argument is appended unless one is
        already present.

    If the implementation raises, the traceback is printed and the
    process exits with status 1.

    Args:
        instance_id: Id of the instance, e.g. 'macro' or 'micro[12]'.
        manager_location: Network location of the manager.
        implementation: The function implementing the component;
            called without arguments.
    """
    prefix_tag = '--muscle-prefix='
    instance = Reference(instance_id)

    for i, arg in enumerate(sys.argv):
        if arg.startswith(prefix_tag):
            name_prefix, index_prefix = _parse_prefix(arg[len(prefix_tag):])
            name, index = _split_reference(instance)
            if name_prefix:
                name = Reference(name_prefix) + name
            index = index_prefix + index
            # replace the prefix argument with the combined instance id
            sys.argv[i] = '--muscle-instance={}'.format(str(name + index))
            break
    else:
        sys.argv.append('--muscle-instance={}'.format(instance_id))

    if not any(arg.startswith('--muscle-manager=') for arg in sys.argv):
        sys.argv.append(f'--muscle-manager={manager_location}')

    # run_instances() reads the tail of this file when the instance fails
    root_logger = logging.getLogger()
    handler = logging.FileHandler(f'muscle3.{instance}.log', 'w')
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s'))
    root_logger.addHandler(handler)

    # chain call
    try:
        implementation()
    except Exception:
        traceback.print_exc()
        _logger.error(
            f'Component {instance} crashed, please check the log file'
            ' for error messages')
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under `python -S` or when frozen)
        sys.exit(1)
def _parse_prefix(prefix: str) -> Tuple[str, List[int]]:
"""Parse a --muscle-prefix argument.
This is like a Reference, but not quite, because the
initial identifier may be omitted. That is, [1][2] is
also a valid prefix.
This parses an initial identifier, subsequent identifiers
separated by periods, then a list of square-bracketed integers.
Args:
prefix: The prefix to parse.
Returns:
The identifier sequence and the list of ints.
"""
def parse_identifier(prefix: str, i: int) -> Tuple[str, int]:
name = str()
while i < len(prefix) and prefix[i] not in '[.':
name += prefix[i]
i += 1
return name, i
def parse_number(prefix: str, i: int) -> Tuple[int, int]:
number = str()
while i < len(prefix) and prefix[i] in '0123456789':
number += prefix[i]
i += 1
return int(number), i
name = str()
index: List[int] = []
i = 0
if i == len(prefix):
return name, index
idt, i = parse_identifier(prefix, i)
name += idt
while i < len(prefix) and prefix[i] == '.':
name += '.'
part, i = parse_identifier(prefix, i + 1)
name += part
while i < len(prefix) and prefix[i] == '[':
nmb, i = parse_number(prefix, i + 1)
index.append(nmb)
if prefix[i] != ']':
raise ValueError('Missing closing bracket in'
' --muscle-prefix.')
i += 1
if i < len(prefix):
raise ValueError(('Found invalid extra character {} in'
' --muscle-prefix.').format(prefix[i]))
return name, index
def _split_reference(ref: Reference) -> Tuple[Reference, List[int]]:
    """Split a Reference into its leading identifiers and its indices.

    Args:
        ref: The reference to split, e.g. ``a.b[1][2]``.

    Returns:
        The leading run of Identifier parts as a Reference, and the list
        of integer parts that directly follow it. Scanning for integers
        stops at the first part that is not an int.
    """
    length = len(ref)

    split_at = 0
    while split_at < length and isinstance(ref[split_at], Identifier):
        split_at += 1

    indices: List[int] = []
    for position in range(split_at, length):
        part = ref[position]
        if not isinstance(part, int):
            break
        indices.append(cast(int, part))

    return ref[:split_at], indices
def run_instances(
        instances: Dict[str, Callable], manager_location: str) -> None:
    """Runs the given instances and waits for them to finish.

    The instances are described in a dictionary with their instance
    id (e.g. 'macro' or 'micro[12]' or 'my_mapper') as the key, and
    a function to run as the corresponding value. Each instance
    will be run in a separate process.

    If an instance exits with a non-zero exit code, the instances that
    are still running are terminated (and killed if they have not exited
    within about a second), and an error is raised.

    Args:
        instances: A dictionary of instances to run
        manager_location: Network location of the manager

    Raises:
        RuntimeError: If one or more instances exited with a non-zero
            exit code. The message contains the last lines of their
            log files.
    """
    # start processes
    instance_processes: List[mp.Process] = []
    for instance_id_str, implementation in instances.items():
        process = mp.Process(
            target=implementation_process,
            args=(instance_id_str, manager_location, implementation),
            # normalised via Reference; also determines the log file name
            name=str(Reference(instance_id_str)))
        process.start()
        instance_processes.append(process)

    # wait for them to finish or one to fail
    failed_processes: List[mp.Process] = []
    done_processes: Set[mp.Process] = set()
    while len(done_processes) < len(instance_processes) and not failed_processes:
        sleep(0.5)
        for instance_process in instance_processes:
            if instance_process in done_processes or instance_process.is_alive():
                continue
            done_processes.add(instance_process)
            if instance_process.exitcode != 0:
                failed_processes.append(instance_process)

    if not failed_processes:
        return

    # One failed, shut down the rest. Signal all of them first, so that
    # they can wind down in parallel rather than one second at a time.
    remaining = [p for p in instance_processes if p not in done_processes]
    for process in remaining:
        process.terminate()
    for process in remaining:
        process.join(1.0)
        if process.is_alive():
            process.kill()
            # join again so the killed process is reaped
            process.join()

    failed_names = [proc.name for proc in failed_processes]
    msg = (
        'Instance(s) {} failed to shut down cleanly. Here is the final'
        ' bit of the output:').format(', '.join(failed_names))
    for name in failed_names:
        output = last_lines(Path(f'muscle3.{name}.log'), 20)
        msg += '\n ---------- ' + name + ' ----------\n'
        msg += output + '\n'
        msg += f'See muscle3.{name}.log for the complete output\n'
    raise RuntimeError(msg)
def run_simulation(
        configuration: Document, implementations: Dict[str, Callable]
        ) -> None:
    """Runs a simulation with the given configuration and instances.

    The yMMSL document must contain a complete configuration.

    This function will start the necessary instances described in the configuration. To
    do so, it needs the corresponding implementations, which are given as a dictionary
    mapping the implementation name to a Python function (or any callable).

    Args:
        configuration: A description of the model and settings, either v0.1 or v0.2.
        implementations: A dictionary mapping implementation names to the
            callables that implement them.

    Raises:
        RuntimeError: If the configuration is of an unsupported type, if it
            uses imports, or if any of the instances fails.
        ValueError: If the model uses an implementation that is not in
            ``implementations``.
    """
    if isinstance(configuration, v0_1.PartialConfiguration):
        with catch_warnings():
            filterwarnings('ignore', 'In yMMSL v0.2.*')
            filterwarnings('ignore', 'Comments can unfortunately.*')
            configuration = convert_to(Configuration, configuration)
    elif not isinstance(configuration, Configuration):
        raise RuntimeError(f'Invalid configuration type {type(configuration)}')

    if configuration.imports:
        raise RuntimeError(
            'Imports are not supported when running directly from Python. To run'
            ' larger models, please use the command line.')

    configuration.check_consistent(False)
    configuration = flatten(configuration)
    model = configuration.root_model()

    # map every instance id to the callable that implements it
    instances: Dict[str, Callable] = {}
    for ce in model.components.values():
        impl_name = str(ce.implementation)
        if impl_name not in implementations:
            raise ValueError(('The model specifies an implementation named'
                              ' "{}" but the given set of implementations does'
                              ' not include it.').format(impl_name))
        impl_fn = implementations[impl_name]
        if not ce.multiplicity:
            instances[str(ce.name)] = impl_fn
        else:
            for index in generate_indices(ce.multiplicity):
                instances[str(ce.name + index)] = impl_fn

    controller = start_server_process(configuration)
    try:
        run_instances(instances, controller.manager_location)
    finally:
        # always shut down the manager, also when an instance failed
        controller.stop()