Distributed execution

In the previous section, we created a simple macro-micro multiscale model with MUSCLE 3, and ran it as a single Python script. This section briefly explains how to go from there to a distributed simulation, possibly on multiple nodes.

Note that distributed simulations are not yet as easy to use as we would like them to be, but all the necessary pieces are in place. If you want to run the example below using a premade script, go to the docs/examples/ directory in the source code and set up and run the Python example like this:

docs/examples$ make python
docs/examples$ ./reaction_diffusion_python.sh

Below, we explain how it works and how to run it by hand.

Making separate instances

Previously, we started a simulation by starting a single Python script that contained all of the implementations and also the model configuration. For distributed running, we want to run the instances separately, so our script needs to be split up. Doing that is very simple: just copy the model function (and anything it depends on) to a new file, and add a main clause that runs it. For the reaction model of the previous example, that looks like this:

docs/source/examples/python/reaction.py
import numpy as np

from libmuscle import Grid, Instance, Message
from ymmsl import Operator


def reaction() -> None:
    """A simple exponential reaction model on a 1D grid.
    """
    instance = Instance({
            Operator.F_INIT: ['initial_state'],     # list of float
            Operator.O_F: ['final_state']})         # list of float

    while instance.reuse_instance():
        # F_INIT
        t_max = instance.get_setting('t_max', 'float')
        dt = instance.get_setting('dt', 'float')
        k = instance.get_setting('k', 'float')

        msg = instance.receive('initial_state')
        U = msg.data.array.copy()

        t_cur = msg.timestamp
        while t_cur + dt < msg.timestamp + t_max:
            # O_I

            # S
            U += k * U * dt
            t_cur += dt

        # O_F
        instance.send('final_state', Message(t_cur, None, Grid(U, ['x'])))


if __name__ == '__main__':
    reaction()

Note that the code is exactly the same; we’ve just removed everything related to the diffusion model and to the coupling between the two models. We can do the same for the diffusion model:

docs/source/examples/python/diffusion.py
import logging
import os

import numpy as np

from libmuscle import Grid, Instance, Message
from ymmsl import Operator


def laplacian(Z: np.ndarray, dx: float) -> np.ndarray:
    """Calculates the Laplacian of vector Z.

    Args:
        Z: A vector representing a series of samples along a line.
        dx: The spacing between the samples.

    Returns:
        The second spatial derivative of Z.
    """
    Zleft = Z[:-2]
    Zright = Z[2:]
    Zcenter = Z[1:-1]
    return (Zleft + Zright - 2. * Zcenter) / dx**2


def diffusion() -> None:
    """A simple diffusion model on a 1d grid.

    The state of this model is a 1D grid of concentrations. It sends
    out the state on each timestep on `state_out`, and can receive an
    updated state on `state_in` at each state update.
    """
    logger = logging.getLogger()
    instance = Instance({
            Operator.O_I: ['state_out'],
            Operator.S: ['state_in']})

    while instance.reuse_instance():
        # F_INIT
        t_max = instance.get_setting('t_max', 'float')
        dt = instance.get_setting('dt', 'float')
        x_max = instance.get_setting('x_max', 'float')
        dx = instance.get_setting('dx', 'float')
        d = instance.get_setting('d', 'float')

        U = np.zeros(int(round(x_max / dx))) + 1e-20
        U[25] = 2.0
        U[50] = 2.0
        U[75] = 2.0
        Us = U

        t_cur = 0.0
        while t_cur + dt <= t_max:
            # O_I
            t_next = t_cur + dt
            if t_next + dt > t_max:
                t_next = None
            cur_state_msg = Message(t_cur, t_next, Grid(U, ['x']))
            instance.send('state_out', cur_state_msg)

            # S
            msg = instance.receive('state_in', default=cur_state_msg)
            if msg.timestamp > t_cur + dt:
                logger.warning('Received a message from the future!')
            np.copyto(U, msg.data.array)

            dU = np.zeros_like(U)
            dU[1:-1] = d * laplacian(U, dx) * dt
            dU[0] = dU[1]
            dU[-1] = dU[-2]

            U += dU
            Us = np.vstack((Us, U))
            t_cur += dt

        if 'DONTPLOT' not in os.environ:
            from matplotlib import pyplot as plt
            plt.figure()
            plt.imshow(
                    np.log(Us + 1e-20),
                    origin='upper',
                    extent=[
                        -0.5*dx, x_max - 0.5*dx,
                        (t_max - 0.5*dt) * 1000.0, -0.5*dt * 1000.0],
                    interpolation='none',
                    aspect='auto'
                    )
            cbar = plt.colorbar()
            cbar.set_label('log(Concentration)', rotation=270, labelpad=20)
            plt.xlabel('x')
            plt.ylabel('t (ms)')
            plt.title('Concentration over time')
            plt.show()


if __name__ == '__main__':
    diffusion()

Again, it’s exactly the same code as before, just split off into a separate file.

yMMSL files

In a distributed set-up, the manager and each instance run as separate programs, communicating via the network. To make this work, the manager is started first, with a yMMSL file that describes the simulation. The yMMSL file corresponding to our previous example looks like this:

docs/source/examples/reaction_diffusion.ymmsl
ymmsl_version: v0.1

model:
  name: reaction_diffusion
  components:
    macro: diffusion
    micro: reaction
  conduits:
    macro.state_out: micro.initial_state
    micro.final_state: macro.state_in
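
The submodels also ask for settings via get_setting (t_max, dt, k, x_max, dx and d), and it is the manager that distributes these to the instances. They go into a settings section of the same file, where a setting prefixed with an instance name applies only to that instance. The values below are placeholders for illustration only, not the values used in the previous section:

settings:            # illustrative placeholder values
  micro.t_max: 1.0e-6
  micro.dt: 1.0e-8
  macro.t_max: 1.0e-4
  macro.dt: 1.0e-6
  x_max: 1.0
  dx: 0.01
  k: -10000.0
  d: 0.1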

As you can see, this looks very much like the object representation we used in the previous section. You can load a yMMSL file into objects using ymmsl.load, and save it back using ymmsl.save.
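
For example, a minimal sketch of reading and writing such a file (assuming that ymmsl.load accepts a path and that the resulting configuration object exposes the model by attribute) could look like this:

from pathlib import Path

import ymmsl

# Load the yMMSL file into its object representation
config = ymmsl.load(Path('reaction_diffusion.ymmsl'))

# The model description is now available as Python objects
print(config.model.name)

# ...and can be written back out to a new file
ymmsl.save(config, Path('reaction_diffusion_copy.ymmsl'))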

Starting the manager

Every MUSCLE 3 simulation needs a manager. The manager helps the instances find each other, and distributes settings to them. To start the manager, run

(venv)$ muscle_manager reaction_diffusion.ymmsl

in an environment in which you’ve installed MUSCLE 3. The manager will start, and will print its location on the standard output. This is the network address on which it listens for connections from instances.

Starting instances

Starting an instance works just like starting any other Python script:

(venv)$ python ./reaction.py --muscle-instance=micro

In the original script, we made a dictionary that mapped compute element names to Python functions, which MUSCLE 3 used to start the submodel instances. Here, we need to pass the same information. Instead of mapping a name to a function, we start a particular script and pass the instance name using the --muscle-instance= argument. Note that the argument parser is not very flexible, so the argument must be written exactly like that, and of course the instance name must match the compute element name in the yMMSL file passed to the manager.
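
As a reminder, the single-script version from the previous section ended with roughly the following (a sketch assuming the run_simulation helper used there, with diffusion, reaction and configuration defined earlier in that script):

from libmuscle.runner import run_simulation

# Map compute element names to the Python functions implementing them;
# diffusion, reaction and configuration are as defined in the previous
# section's single script.
implementations = {'macro': diffusion, 'micro': reaction}
run_simulation(configuration, implementations)

In the distributed set-up, starting reaction.py with --muscle-instance=micro plays the role of the 'micro': reaction entry in that dictionary.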

When the instance starts, it will register with the manager. Since we didn’t pass a location using the --muscle-manager=<host:port> option, MUSCLE 3 will try to connect to the default location (localhost:9000), on which the manager is conveniently listening. The micro instance will register, and if you have a look at the muscle3_manager.log file produced by the manager (in the directory in which you started it), you will find a record of this there.
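
If the manager runs on a different machine or port, pass its location explicitly when starting each instance. The address below is just a placeholder; use the location your manager printed at start-up:

(venv)$ python ./reaction.py --muscle-instance=micro --muscle-manager=node001:9000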

Now, we need to start the second submodel in the same way:

(venv)$ python ./diffusion.py --muscle-instance=macro

and then the simulation will run. Once it’s done, the instances disconnect and the manager shuts itself down automatically.
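
If you want to run the whole simulation from a single terminal, one way is to start the manager and the first instance in the background. This is ordinary shell job control, not a MUSCLE 3 feature, and the log file name is just an example:

(venv)$ muscle_manager reaction_diffusion.ymmsl &
(venv)$ python ./reaction.py --muscle-instance=micro >micro.log 2>&1 &
(venv)$ python ./diffusion.py --muscle-instance=macro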