Distributed execution

In the previous section, we created a simple macro-micro multiscale model with MUSCLE3, and ran it as a single Python script. This section briefly explains how to go from there to a distributed simulation, possibly on multiple nodes.

Note that distributed simulations are not as easy to use as we would like them to be yet. All the necessary pieces are in place however. Below, we explain how they work. If you want to run the example below using a premade script, then you can go to the docs/examples/ directory in the source code, and set up and run the Python example like this:

docs/examples$ make python
docs/examples$ ./reaction_diffusion_python.sh

Below, we explain how it works and how to run it by hand. First though, a bit of terminology:

Component

Coupled simulations are built out of components. A component is a model, or some none-model helper component that e.g. converts data, samples parameters, or does load balancing. Components have ports, which can be connected by conduits. Components are abstract things in the MUSCLE3 configuration.

Implementation

An implementation is a computer program that implements a particular component. If you browse through the examples, you’ll find implementations of the reaction and diffusion models in different programming languages. They all implement the same model (component) though.

Instance

An instance is a running program. Instances are made by starting, or instantiating, an implementation. Components have an attribute called their multiplicity which specifies how many instances of the associated implementation there are. Often, and by default, this will be one but for e.g. UQ ensembles or spatial scale separation many instances of a component may be required.

Making separate implementations

Previously, we started a simulation by starting a single Python script that contained all of the implementations and also the model configuration. For distributed running, we want to run the models separately, so our script needs to be split up. Doing that is very simple: just copy the model function (and anything it depends on) to a new file, and add a main clause that runs it. For the reaction model of the previous example, that looks like this:

docs/source/examples/python/reaction.py
import logging

from libmuscle import Grid, Instance, Message
from ymmsl import Operator


def reaction() -> None:
    """A simple exponential reaction model on a 1D grid.
    """
    instance = Instance({
            Operator.F_INIT: ['initial_state'],     # 1D Grid
            Operator.O_F: ['final_state']})         # 1D Grid

    while instance.reuse_instance():
        # F_INIT
        t_max = instance.get_setting('t_max', 'float')
        dt = instance.get_setting('dt', 'float')
        k = instance.get_setting('k', 'float')

        msg = instance.receive('initial_state')
        U = msg.data.array.copy()

        t_cur = msg.timestamp
        while t_cur + dt < msg.timestamp + t_max:
            # O_I

            # S
            U += k * U * dt
            t_cur += dt

        # O_F
        instance.send('final_state', Message(t_cur, data=Grid(U, ['x'])))


if __name__ == '__main__':
    logging.basicConfig()
    logging.getLogger().setLevel(logging.INFO)
    reaction()

Note that the code is exactly the same, we’ve just removed everything related to the diffusion model and to the coupling between them. We can do the same to the diffusion model:

docs/source/examples/python/diffusion.py
import logging
import os

import numpy as np

from libmuscle import Grid, Instance, Message
from ymmsl import Operator


def laplacian(Z: np.array, dx: float) -> np.array:
    """Calculates the Laplacian of vector Z.

    Args:
        Z: A vector representing a series of samples along a line.
        dx: The spacing between the samples.

    Returns:
        The second spatial derivative of Z.
    """
    Zleft = Z[:-2]
    Zright = Z[2:]
    Zcenter = Z[1:-1]
    return (Zleft + Zright - 2. * Zcenter) / dx**2


def diffusion() -> None:
    """A simple diffusion model on a 1d grid.

    The state of this model is a 1D grid of concentrations. It sends
    out the state on each timestep on `state_out`, and can receive an
    updated state on `state_in` at each state update.
    """
    logger = logging.getLogger()
    instance = Instance({
            Operator.O_I: ['state_out'],
            Operator.S: ['state_in'],
            Operator.O_F: ['final_state_out']})

    while instance.reuse_instance():
        # F_INIT
        t_max = instance.get_setting('t_max', 'float')
        dt = instance.get_setting('dt', 'float')
        x_max = instance.get_setting('x_max', 'float')
        dx = instance.get_setting('dx', 'float')
        d = instance.get_setting('d', 'float')

        U = np.zeros(int(round(x_max / dx))) + 1e-20
        U[25] = 2.0
        U[50] = 2.0
        U[75] = 2.0
        Us = U

        t_cur = 0.0
        while t_cur + dt <= t_max:
            # O_I
            t_next = t_cur + dt
            if t_next + dt > t_max:
                t_next = None
            cur_state_msg = Message(t_cur, t_next, Grid(U, ['x']))
            instance.send('state_out', cur_state_msg)

            # S
            msg = instance.receive('state_in', default=cur_state_msg)
            if msg.timestamp > t_cur + dt:
                logger.warning('Received a message from the future!')
            np.copyto(U, msg.data.array)

            dU = np.zeros_like(U)
            dU[1:-1] = d * laplacian(U, dx) * dt
            dU[0] = dU[1]
            dU[-1] = dU[-2]

            U += dU
            Us = np.vstack((Us, U))
            t_cur += dt

        # O_F
        final_state_msg = Message(t_cur, data=Grid(U, ['x']))
        instance.send('final_state_out', final_state_msg)

        if 'DONTPLOT' not in os.environ and 'SLURM_NODENAME' not in os.environ:
            from matplotlib import pyplot as plt
            plt.figure()
            plt.imshow(
                    np.log(Us + 1e-20),
                    origin='upper',
                    extent=[
                        -0.5*dx, x_max - 0.5*dx,
                        (t_max - 0.5*dt) * 1000.0, -0.5*dt * 1000.0],
                    interpolation='none',
                    aspect='auto'
                    )
            cbar = plt.colorbar()
            cbar.set_label('log(Concentration)', rotation=270, labelpad=20)
            plt.xlabel('x')
            plt.ylabel('t (ms)')
            plt.title('Concentration over time')
            plt.show()


if __name__ == '__main__':
    logging.basicConfig()
    logging.getLogger().setLevel(logging.INFO)
    diffusion()

Again, it’s exactly the same code as before, just split off into a separate file.

yMMSL files

In a distributed set-up, the manager and each instance run as separate programs, communicating via the network. To make this work, we need to describe how the different models need to be connected, how the programs implementing them need to be started, and we may want to configure them by specifying some simulation settings. All this information is stored in one or more yMMSL files. This is the YAML version of the Multiscale Modelling and Simulation Language, in case you were wondering about the acronym.

It is often convenient to split your configuration over multiple files. That way, you can easily run the same simulation with different settings for example: just specify a different settings file while keeping the others the same.

Here is a yMMSL file describing our previous example. Note that it’s one directory up, in the examples/ dir rather than under examples/python. This is because there are some examples that mix languages as well.

docs/source/examples/rd_python.ymmsl
ymmsl_version: v0.1

model:
  name: reaction_diffusion_python

  components:
    macro:
      implementation: diffusion_python
      ports:
        o_i: state_out
        s: state_in

    micro:
      implementation: reaction_python
      ports:
        f_init: initial_state
        o_f: final_state

  conduits:
    macro.state_out: micro.initial_state
    micro.final_state: macro.state_in

resources:
  macro:
    threads: 1
  micro:
    threads: 1

As you can see, this looks like the object representation, although there are a few more things being specified. You can load a yMMSL file from Python using ymmsl.load and save it back using ymmsl.save.

Let’s have a look at this file:

ymmsl_version: v0.1

model:
  name: reaction_diffusion_python

This specifies the version of the file format (v0.1 is currently the only version), and the name of the model.

components:
  macro:
    implementation: diffusion_python
    ports:
      o_i: state_out
      s: state_in

  micro:
    implementation: reaction_python
    ports:
      f_init: initial_state
      o_f: final_state

We have seen the macro and micro components before. The implementations have an added _python in their names here, because there are also examples in other languages. A new thing is that the ports are now declared in the configuration (they are also still in the code, and need to be). The manager needs this information to be able to assign resources to the components. You can write a list of ports if you have more than one port for an operator, see ymmsl.Ports.

conduits:
  macro.state_out: micro.initial_state
  micro.final_state: macro.state_in

Here are the conduits that connect the models together. The ports used here should match those given above, of course. As before, macro sends messages on its state_out port to micro``s ``initial_state, and micro sends its answer on its final_state port, which is routed to macro``s ``state_in.

resources:
  macro:
    threads: 1
  micro:
    threads: 1

Finally, we need to tell MUSCLE3 which resources each model instance should be given. For the moment, only the number of threads or MPI processes can be specified. In this case, the implementations are single-threaded Python scripts, so we specify one thread each. See the yMMSL documentation on resources for other options.

To be able to instantiate this model, we still need to define the diffusion_python and reaction_python implementations. We also need some settings for the configuration. These are specified in two additional files, rd_implementations.ymmsl and rd_settings.ymmsl. The former actually contains definitions for all the example implementations, but we’ll focus on the parts that define the Python ones we’re using here:

implementations:
  reaction_python:
    virtual_env: <muscle3_src>/docs/source/examples/python/build/venv
    executable: python
    args: <muscle3_src>/docs/source/examples/python/reaction.py

  diffusion_python:
    virtual_env: <muscle3_src>/docs/source/examples/python/build/venv
    executable: python
    args: <muscle3_src>/docs/source/examples/python/diffusion.py

As you can see, there are some absolute paths here, with a prefix shown here as <muscle3_src>. If you’ve run make in the docs/source/examples/ directory then you should have a version of this file with the correct paths for your local setup.

There are two implementations shown here, which the components above refer to by name. They are Python scripts, and they have some dependencies which are installed in the virtual environment created earlier by make. So in order to run them, that virtualenv has to be activated, and then we can run python /path/to/reaction.py or equivalent for the diffusion.py script, and that is exactly these definitions do. Of course, it’s also possible to add a #!/usr/bin/env python3 line to the top of the Python script and start it directly as an executable.

Finally, the settings in rd_settings.ymmsl:

docs/source/examples/rd_settings.ymmsl
ymmsl_version: v0.1

settings:
  muscle_local_log_level: INFO
  muscle_remote_log_level: WARNING

  micro.t_max: 2.469136e-06
  micro.dt: 2.469136e-08
  macro.t_max: 0.0001234568
  macro.dt: 2.469136e-06
  x_max: 1.0
  dx: 0.01
  k: -40500.0
  d: 0.0405

This matches what we’ve seen before in the integrated Python example, it’s now just written up as YAML.

Starting the simulation

With the above in place, we now have all the pieces we need to start a distributed simulation. We do this by starting the MUSCLE3 manager and giving it the configuration files. It will then start the needed model instances, help then to find each other, and distribute the settings to them. The instances will then connect to each other via the network and run the simulation.

The manager is included with the Python version of MUSCLE3. Running make or make python in the docs/source/examples directory will create a virtual env with MUSCLE3 and all the dependencies needed by the examples installed. With that set up, you can run the simulation from the docs/source/examples directory like this:

. python/build/venv/bin/activate
muscle_manager --start-all rd_implementations.ymmsl rd_python.ymmsl rd_settings.ymmsl

This will start the manager, run the simulation, plot the results on the screen, and when you close the plot, finish the simulation. It will also produce a directory named run_reaction_diffusion_python_<date>_<time> in which some output is written. Have a look at the muscle3_manager.log file for an overview of the execution, or see instance/<instance>/stderr.txt for log output of a particular instance (we use the default Python logging configuration here, which writes to standard error, which is redirected to this file by the manager).

If you want, you can change the log levels in rd_settings.ymmsl to DEBUG and re-run the simulation to have a more detailed look at what’s going on. The remote log level setting determines the minimum severity a message needs to have to be sent from the instance to the manager to be included in the manager log. The local log level determines whether the message will be logged to the local Python logger. This is also very useful if something goes wrong and you need to debug, of course.

High-Performance Computing

Coupled simulations often take a significant amount of compute resources, requiring the use of High-Performance Computing facilities. MUSCLE3 has support for running inside of an allocation on an HPC cluster running the SLURM scheduling system. It can determine the available resources (nodes and cores), suballocate them to the various instances required to run the simulation, and start each instance on its allocated resources. (With thanks to its friend QCG-PilotJob.)

Determining resource requirements

The MUSCLE Manager will automatically detect when it’s inside of a SLURM allocation, and instantiate implementations accordingly. However, that allocation still needs to be made by the user, and to do that we need to know how many resources to request. Since instances may share resources (cores) in some cases, this is not easy to see from the configuration.

Fortunately, MUSCLE3 can do this for us, using the muscle3 resources command:

muscle3 resources --cores-per-node <n> <ymmsl files>

This will print a single number, the number of nodes to allocate, which can then be passed to sbatch -N <n>.

For example, for the Python example above, we can run:

muscle3 resources --cores-per-node 16 rd_implementations.ymmsl rd_python.ymmsl rd_settings.ymmsl

and be informed that we’ll need only a single node. (rd_settings.ymmsl is actually redundant here and could be omitted, the analysis is based only on the model components, conduits, implementations and resources.)

To get more insight into how the instances will be divided over the available resources, you can use the -v option to enable verbose output. For the Python model, this prints:

A total of 1 node will be needed, as follows:

macro: Resources(node000001: 0)
micro: Resources(node000001: 0)

As we can see, the models are allocated a single core each (they are both single-threaded), and MUSCLE3 has determined that they can share a core because they won’t be computing at the same time due to the macro-micro multiscale coupling pattern. For more interesting output, try this command on the example from the Uncertainty Quantification section, and increase the number of instances a bit.

To determine whether models can overlap, MUSCLE3 assumes that models do not do any significant amount of computation before receiving the F_INIT messages, in between having sent the O_I messages and receiving the S messages, and after having sent the O_F messages. If an implementation does do this, then the simulation will still run, but different models may end up competing for the same cores. This will slow down the simulation. To avoid this, an implementation may be marked as not being able to share resources:

implementations:
  industrious:
    executable: /home/user/models/industrious
    execution_model: openmpi
    can_share_resources: false

If an implementation is marked as such, the MUSCLE Manager will give it a set of cores of its own, so that it can compute whenever it wants.

Loading modules

On an HPC machine, you often need to load some environment modules to make needed software available. If an implementation needs to have modules available to run, then you should use the modules option when describing the implementation:

implementations:
  on_hpc:
    modules: c++ openmpi
    executable: /home/user/models/mpi_model
    execution_model: openmpi

Hyperthreading

If hyperthreading is enabled on the HPC cluster you are running on, then SLURM will allocate threads, not cores. This may give you a performance boost, it may make no difference, or it may decrease performance. To disable hyperthreading and allocate full physical cores, you can pass --ntasks-per-node=<x> to sbatch, with <x> being the number of physical cores per node. This will cause SLURM to tell MUSCLE3 (via QCG-PilotJob) to only use the first x virtual cores on each machine, which then get a physical core to themselves because the second set of virtual cores isn’t used.