Simulation checkpoints
When you execute a long-running simulation, it can be very helpful to store the state of a simulation at certain intervals. For example, your simulation running on a HPC cluster may crash due to insufficient memory available. Instead of restarting this simulation from scratch, you could restart it – with an increased memory allocation – from a checkpoint, which would save a lot of compute time!
Checkpointing in distributed simulations is difficult. Fortunately, MUSCLE3 comes with built-in checkpointing support. This page describes in detail how to use the MUSCLE3 checkpointing API, how to specify checkpoints in the workflow configuration and how to resume a workflow.
In the User tutorial, you can read about the checkpointing concepts and how to use the API when running and resuming MUSCLE3 simulations. This is followed by a Developer tutorial, which explains how to add checkpointing capabilities to your MUSCLE3 component. Finally, the Checkpointing deep-dive describes in detail the (inner) working of checkpointing in MUSCLE3; though this level of detail is not required for general usage of the API.
Glossary
- Checkpoint
A checkpoint is a moment during the workflow where the user wants to have the state of the whole workflow stored.
- Snapshot
A snapshot is the stored state of an instance in the workflow.
- Workflow snapshot
A workflow snapshot is a collection of snapshots for all instances in the workflow, which can be resumed from. This means that the snapshots of every combination of peer instances must be consistent.
- Peer instances
Two instances that are connected by a Conduit.
User tutorial
This user tutorial explains all you need to know about checkpointing for running and resuming simulations. Some details are deliberately left out, though you can read all about those in the Developer tutorial or Checkpointing deep-dive.
Defining checkpoints
The first step for using checkpoints is to define checkpoints in your workflow. The checkpoint definitions are for your whole workflow, and you can specify them in yMMSL as in the following example:
checkpoints:
at_end: true
simulation_time:
- every: 10
start: 0
stop: 100
- every: 20
start: 100
wallclock_time:
- every: 3600
- at:
- 300
- 600
- 1800
Let’s break this down: the first element in this example checkpoints
definition is at_end
. When this is set to true
(as in the example), it
means that every instance in the workflow will create a snapshot just before the
workflow finishes. This set of snapshots can be used to resume a simulation near
the end and, for example, let it run for a longer time. Some caveats apply,
though, see Resuming from at_end snapshots for full details.
The other two items in the checkpoints
definition are the time-based
simulation time and
wallclock time. You can use two types of
rules to set checkpoint moments for these:
at
rules select specific moments. The example rule above request a checkpoint to be taken at 300, 600 and 1800 seconds after the start of the simulation. You can define multiple times in oneat
rule, but you may also add multipleat
rules. The following definitions are all equivalent:checkpoints: wallclock_time: - at: - 300 - 600 - 1800
checkpoints: wallclock_time: - at: [300, 600, 1800]
checkpoints: wallclock_time: - at: 300 - at: 600 - at: 1800
every
rules define a recurring set of checkpoints. In the simplest form you indicate the interval at which checkpoints should be taken – every hour in thewallclock_time
example above. You may optionally indicate astart
orstop
– as in thesimulation_time
example above.checkpoints: wallclock_time: every: 3600
checkpoints: simulation_time: - every: 10 start: 0 stop: 100 - every: 20 start: 100
checkpoints: simulation_time: - every: 1 - every: 0.25 start: 0 stop: 2
Note
When
stop
is specified, the stop time is included whenstop == start + n * every
, withn
a positive whole number. However, this might give surprising results due to the inaccuracies of floating point computations. Compare for example:checkpoints: simulation_time: - every: 1 start: 0 stop: 7
checkpoints: simulation_time: - every: 0.1 start: 0 stop: 0.7
Why the difference? Well - compare in python:
>>> 7 * 1.0 7.0 >>> 7 * 0.1 0.7000000000000001
Since
0.7000000000000001
is larger than0.7
, no checkpoint will be generated for this time.
See also
yMMSL documentation on Checkpoints
yMMSL API reference: ymmsl.Checkpoints
,
ymmsl.CheckpointAtRule
,
ymmsl.CheckpointRangeRule
.
Simulation time checkpoints
Checkpoints defined in the simulation_time
section are taken based on the
time inside your simulation. This will only work correctly if all components in
the simulation have a shared concept of time, which only increases during the
simulation. This should be no problem for physics-based simulations, though it
does require that the instances make correct use of the timestamp in
MUSCLE3 messages. When this requirement is fulfilled,
checkpoints based on simulation time are the most reliable way to checkpoint
your workflow.
MUSCLE3 does not interpret or convert the units that you configure in the checkpoints. The units are the same as the components in the simulation use for the timestamps in the messages. Typically this will be in SI seconds, but components may deviate from this standard. MUSCLE3 assumes that all components in the workflow use the same time units in the interfaces to libmuscle.
Note
MUSCLE3 does not assume anything about the start time of a simulation. Your
simulation time may start at any value, even negative! Therefore,
checkpoint ranges include 0 and negative
numbers when no start
value is provided.
Because MUSCLE3 does not know what internal time your simulation starts on,
an every
rule without a start
value will always trigger a checkpoint
at the first possible moment in the simulation. You should supply a
start
value if you do not want this to happen.
Wallclock time checkpoints
Checkpoints defined in the wallclock_time
section are taken based on the
elapsed wallclock time of your simulation (also known as elapsed real time).
Each component in the simulation will make a snapshot at the earliest possible
moment after a checkpoint is passed.
The checkpoint times in the configuration are interpreted as seconds since the
initialization of muscle_manager
.
Warning
Wallclock time checkpoint definitions are (currently) not a reliable way to create workflow snapshots. While each instance in the simulation will create a snapshot when requested, there is no guarantee that all snapshots are consistent.
When a simulation has relatively simple coupling between components, checkpointing based on wallclock time usually works fine.
However for co-simulation (the interact coupling type) and more complex coupling, it is likely that not all checkpoints lead to a consistent workflow snapshot.
If you intend to use wallclock time checkpoints and find that you often don’t get a consistent workflow snapshot, you may try the following workaround: instead of requesting a wallclock time checkpoint at (for example) 600 seconds, you can specify checkpoints at 600, 601, 602, 603, 604 and 605 seconds. The “right” interval to use will depend on the typical compute times of your components and coupling in the simulation.
Running a simulation with checkpoints
Starting a simulation with checkpoints is no different than starting one
without. You need to start the muscle_manager
with the configuration yMMSL
file (or files), as well as the individual components (or let muscle_manager
start them for you with the --start-all
flag). The sole difference is that
the yMMSL configuration must contain a checkpoints section.
When muscle_manager
is started with checkpoints configured, a couple of
things change. First, all of the component implementations must support
checkpointing: the simulation will stop with an error if this is not the case.
The simulation may also stop with an error if there is an issue in the
checkpointing implementation of any of the components.
Second, all components are instructed to make snapshots according to the
configured checkpoints. muscle_manager
keeps track of all created snapshots
during the simulation, looking for workflow snapshots. When a workflow snapshot is detected, muscle_manager
writes a
yMMSL file that can be used to resume the simulation.
During the simulation, all of the created snapshots are stored on the file
system. See the table below for the directories where MUSCLE3 stores the files.
Note: a run-directory is automatically created when using the --start-all
flag for muscle_manager
. You may also specify a custom run directory through
the --run-dir DIRECTORY
option. When you do not provide a run directory, the
last column in the table below indicates where snapshots are stored.
Snapshot type |
Run directory provided |
No run directory provided |
---|---|---|
Workflow |
|
Working directory of |
Instance |
with |
Working directory of the instance |
Note
When running a distributed simulation on multiple compute nodes, MUSCLE3 assumes that the run directory is accessible to all nodes (i.e. on a shared or distributed file system). This is usually the case on HPC clusters.
Example: running the reaction-diffusion model with checkpoints
The reaction-diffusion example model from the Tutorial with Python also
has a variant with checkpointing enabled. To run this yourself, navigate in a
command line prompt to the docs/source/examples
folder in the MUSCLE3 git
repository. Then execute the following command:
$ mkdir run_rd_example
$ muscle_manager --start-all --run-dir run_rd_example rd_implementations.ymmsl rd_checkpoints_python.ymmsl rd_settings.ymmsl
Note
You may get an error File 'rd_implementations.ymmsl' does not exist.
To
fix this, you need to build the examples in the MUSCLE3 source; in the root
of the git repository, execute:
$ make test_examples
The above command runs the muscle_manager
and starts all components (the
reaction model and the diffusion model). The rd_checkpoints_python.ymmsl
file
contains the checkpoint definitions used in this example:
checkpoints:
simulation_time:
- every: 2.0e-05
MUSCLE3 will create the run directory run_rd_example
for you. In it you’ll
find the instance snapshots in instances/macro/snapshots
and
instances/micro/snapshots
. The workflow snapshots are stored in the
snapshots
folder in the run directory.
Resuming a simulation
You can resume a simulation from a workflow snapshot stored in a previous run of the simulation. This works by appending a workflow snapshot yMMSL file from a previous run to the regular yMMSL configuration. If you started your original simulation with:
$ muscle_manager --run-dir ./run1 configuration.ymmsl
You can resume it from a snapshot of this run like so:
$ muscle_manager --run-dir ./run2 configuration.ymmsl ./run1/snapshots/snapshot_20221202_112840.ymmsl
Here we choose a different run directory, and resume from the snapshot file
snapshot_20221202_112840.ymmsl
that was produced by the first run. This file
contains the information required to resume the workflow:
It contains a
description
which allows you to inspect metadata of the workflow snapshot. It indicates the trigger or triggers leading to this snapshot, and some information of the state of each component in the workflow. This data is for informational purposes only, and ignored bymuscle_manager
.It also contains the paths to the snapshots that each instance needs to resume. Note that these snapshots must still exist on the same location. If you move or delete them (or a parent directory), resuming your simulation will fail with an error message:
Unable to load snapshot: <snapshot filename> is not a file. Please ensure this path exists and can be read.
Example: resuming the reaction-diffusion model
To resume the reaction-diffusion model from a snapshot created in the
previous section, replace <date>
and <time>
in the following command to
point to the snapshot you want to resume from.
$ mkdir run_rd_resume
$ muscle_manager --start-all --run-dir run_rd_resume rd_implementations.ymmsl rd_checkpoints_python.ymmsl rd_settings.ymmsl run_rd_example/snapshots/snapshot_<date>_<time>.ymmsl
When the command completes you can see the output in the new working directory
run_rd_resume
.
Making changes to your simulation
MUSCLE3 checkpointing is designed for resuming simulations as if they never stopped. This means that resuming is only supported for consistent snapshots and for simulation configurations that have not changed.
MUSCLE3 does not support any changes to the model when resuming, such as adding or removing components, or changing conduits. Attempting this will likely lead to deadlocks or error messages.
You are allowed to change the settings of your simulation when resuming. However, it depends on the implementation of your components if and when changed settings take effect. Please ask the developers of your simulation components for this information.
Resuming from at_end snapshots
Warning
Resuming from an at_end
snapshot only will immediately complete.
Snapshot consistency
MUSCLE3 checkpointing was designed for consistency: no messages between the components must be lost when restarting. When we fulfill this criterium, a simulation can resume from a checkpoint as if it was never interrupted.
During a simulation run, each component creates snapshots independently from all other components. For Simulation time checkpoints, the MUSCLE3 checkpointing algorithm is guaranteed to give consistent workflow snapshots when all components adhere to the Multiscale Modeling and Simulation Framework (MMSF).
Wallclock time checkpoints in the currrent implementation are less
reliable: components may take snapshots while messages are still in transit.
When that happens an inconsistent state is produced and no workflow snapshots
are written by muscle_manager
.
MUSCLE3 does not support combining inconsistent snapshots, so it is not possible to freely mix snapshots produced during a simulation. When resuming, MUSCLE3 checks the consistency of all snapshots. The run will end with an error when an inconsistent state is detected:
Received message on <port> with unexpected message number <num>. Was
expecting <num>. Are you resuming from an inconsistent snapshot?
When resuming from a snapshot yMMSL written by muscle_manager
, you should not encounter this
error.
Troubleshooting
General troubleshooting strategy:
First try to find the root cause of the problem that your simulation ran into. You can start by looking in the log file of the
muscle_manager
, located in<run directory>/muscle3_manager.log
. This log file may show the error message or point you in the right direction.If the
muscle_manager
log did not display an error, it may indicate which component failed first. Have a look at the logs of that component to figure out what went wrong. The output of an instance is usually found in<run directory>/instances/<instance name>/
. Openstdout.txt
andstderr.txt
to find out what went wrong.If the
muscle_manager
logs did not point to a specific instance, you should have a look at the log files of each instance (see point 2 for instructions). Note that some instances may logBroken Pipe
errors – this usually happens when a peer component has crashed and it is typically not the root cause of your simulation crash.
Once you find the root cause of your problem, check the list below for common issues and their resolutions. You may also have found a bug in MUSCLE3: please help us and your fellow MUSCLE3 users by creating an issue on GitHub.
- The simulation crashes when using checkpoints.
The first thing you should check is: does the simulation run error-free when checkpoints are disabled? You can test this by commenting the checkpointing section of your input ymmsl file(s).
If it runs error-free without checkpoints, have a look at the error message in the log file generated by your run. MUSCLE3 attempts to have clear error messages to explain what went wrong and give you pointers to a solution.
When the error message indicates a problem with the implementation of the checkpointing API, please check with the developer of the component to fix this. If you are the developer of the component, please see the Developer tutorial section for additional resources.
- The simulation crashes when resuming.
Some common causes for this are:
The snapshot files that the instances are resuming from no longer exist. This could for example happen when a previous run directory has been moved or deleted. For distributed execution, some compute nodes may not be able to access the directories where the instance snapshots are stored. See also Resuming a simulation.
Your simulation configuration has incompatible changes compared to the original simulation that the snapshots were from. See Making changes to your simulation. Luckily, MUSCLE3 stores the previous simulation configuration in the run directory. If the snapshot that your resume from is stored in
run1/snapshots/snapshot_xyz.ymmsl
, then you can find that configuration inrun1/configuration.ymmsl
. Try resuming with that configuration first to see if this is the real problem:$ muscle_manager --run-dir run2 run1/configuration.ymmsl run1/snapshots/snapshot_xyz.ymmsl
One of your components has a bug that is triggered when resuming from a previous snapshot, or perhaps your snapshot belonged to a different version of the component. Please ask your component developer(s) for help.
Developer tutorial
This developer tutorial explains all you need to know about implementing checkpointing in your MUSCLE3 simulation component. If you’re not a developer and want to learn how to define checkpoints and resume simulations, please have a look at the User tutorial.
Some details are deliberately left out in this developer tutorial, though you can read all about those in the Checkpointing deep-dive.
Start situation: components without checkpointing
In this tutorial we will add checkpointing to the reaction and diffusion components from the Python, C++ and Fortran tutorials.
Additionally, we will do the same for a generic MUSCLE3 component template. These templates illustrate the structure of a MUSCLE3 component, but they are not complete and cannot be executed.
import logging
from libmuscle import Grid, Instance, Message
from ymmsl import Operator
def reaction() -> None:
"""A simple exponential reaction model on a 1D grid.
"""
instance = Instance({
Operator.F_INIT: ['initial_state'], # 1D Grid
Operator.O_F: ['final_state']}) # 1D Grid
while instance.reuse_instance():
# F_INIT
t_max = instance.get_setting('t_max', 'float')
dt = instance.get_setting('dt', 'float')
k = instance.get_setting('k', 'float')
msg = instance.receive('initial_state')
U = msg.data.array.copy()
t_cur = msg.timestamp
while t_cur + dt < msg.timestamp + t_max:
# O_I
# S
U += k * U * dt
t_cur += dt
# O_F
instance.send('final_state', Message(t_cur, data=Grid(U, ['x'])))
if __name__ == '__main__':
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
reaction()
#include <cstdlib>
#include <vector>
#include <libmuscle/libmuscle.hpp>
#include <ymmsl/ymmsl.hpp>
using libmuscle::Data;
using libmuscle::DataConstRef;
using libmuscle::Instance;
using libmuscle::Message;
using ymmsl::Operator;
/** A simple exponential reaction model on a 1D grid.
*/
void reaction(int argc, char * argv[]) {
Instance instance(argc, argv, {
{Operator::F_INIT, {"initial_state"}}, // 1D Grid
{Operator::O_F, {"final_state"}}}); // 1D Grid
while (instance.reuse_instance()) {
// F_INIT
double t_max = instance.get_setting_as<double>("t_max");
double dt = instance.get_setting_as<double>("dt");
double k = instance.get_setting_as<double>("k");
auto msg = instance.receive("initial_state");
auto data_ptr = msg.data().elements<double>();
std::vector<double> U(data_ptr, data_ptr + msg.data().size());
double t_cur = msg.timestamp();
while (t_cur + dt < msg.timestamp() + t_max) {
// O_I
// S
for (double & u : U)
u += k * u * dt;
t_cur += dt;
}
// O_F
auto result = Data::grid(U.data(), {U.size()}, {"x"});
instance.send("final_state", Message(t_cur, result));
}
}
int main(int argc, char * argv[]) {
reaction(argc, argv);
return EXIT_SUCCESS;
}
program reaction
use ymmsl
use libmuscle
implicit none
type(LIBMUSCLE_PortsDescription) :: ports
type(LIBMUSCLE_Instance) :: instance
type(LIBMUSCLE_Message) :: rmsg
type(LIBMUSCLE_DataConstRef) :: rdata, item
type(LIBMUSCLE_Message) :: smsg
type(LIBMUSCLE_Data) :: sdata
real (selected_real_kind(15)) :: t_cur, t_max, dt, k
real (selected_real_kind(15)), dimension(:), allocatable :: U
ports = LIBMUSCLE_PortsDescription()
call ports%add(YMMSL_Operator_F_INIT, 'initial_state')
call ports%add(YMMSL_Operator_O_F, 'final_state')
instance = LIBMUSCLE_Instance(ports)
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
! F_INIT
t_max = instance%get_setting_as_real8('t_max')
dt = instance%get_setting_as_real8('dt')
k = instance%get_setting_as_real8('k')
rmsg = instance%receive('initial_state')
rdata = rmsg%get_data()
allocate (U(rdata%size()))
call rdata%elements(U)
call LIBMUSCLE_DataConstRef_free(rdata)
t_cur = rmsg%timestamp()
t_max = rmsg%timestamp() + t_max
call LIBMUSCLE_Message_free(rmsg)
do while (t_cur + dt < t_max)
! O_I
! S
U = k * U * dt
t_cur = t_cur + dt
end do
! O_F
sdata = LIBMUSCLE_Data_create_grid(U, 'x')
smsg = LIBMUSCLE_Message(t_cur, sdata)
call instance%send('final_state', smsg)
call LIBMUSCLE_Message_free(smsg)
call LIBMUSCLE_Data_free(sdata)
deallocate (U)
end do
call LIBMUSCLE_Instance_free(instance)
end program reaction
import logging
import os
import numpy as np
from libmuscle import Grid, Instance, Message
from ymmsl import Operator
def laplacian(Z: np.array, dx: float) -> np.array:
"""Calculates the Laplacian of vector Z.
Args:
Z: A vector representing a series of samples along a line.
dx: The spacing between the samples.
Returns:
The second spatial derivative of Z.
"""
Zleft = Z[:-2]
Zright = Z[2:]
Zcenter = Z[1:-1]
return (Zleft + Zright - 2. * Zcenter) / dx**2
def diffusion() -> None:
"""A simple diffusion model on a 1d grid.
The state of this model is a 1D grid of concentrations. It sends
out the state on each timestep on `state_out`, and can receive an
updated state on `state_in` at each state update.
"""
logger = logging.getLogger()
instance = Instance({
Operator.O_I: ['state_out'],
Operator.S: ['state_in'],
Operator.O_F: ['final_state_out']})
while instance.reuse_instance():
# F_INIT
t_max = instance.get_setting('t_max', 'float')
dt = instance.get_setting('dt', 'float')
x_max = instance.get_setting('x_max', 'float')
dx = instance.get_setting('dx', 'float')
d = instance.get_setting('d', 'float')
U = np.zeros(int(round(x_max / dx))) + 1e-20
U[25] = 2.0
U[50] = 2.0
U[75] = 2.0
Us = U
t_cur = 0.0
while t_cur + dt <= t_max:
# O_I
t_next = t_cur + dt
if t_next + dt > t_max:
t_next = None
cur_state_msg = Message(t_cur, t_next, Grid(U, ['x']))
instance.send('state_out', cur_state_msg)
# S
msg = instance.receive('state_in', default=cur_state_msg)
if msg.timestamp > t_cur + dt:
logger.warning('Received a message from the future!')
np.copyto(U, msg.data.array)
dU = np.zeros_like(U)
dU[1:-1] = d * laplacian(U, dx) * dt
dU[0] = dU[1]
dU[-1] = dU[-2]
U += dU
Us = np.vstack((Us, U))
t_cur += dt
# O_F
final_state_msg = Message(t_cur, data=Grid(U, ['x']))
instance.send('final_state_out', final_state_msg)
if 'DONTPLOT' not in os.environ and 'SLURM_NODENAME' not in os.environ:
from matplotlib import pyplot as plt
plt.figure()
plt.imshow(
np.log(Us + 1e-20),
origin='upper',
extent=[
-0.5*dx, x_max - 0.5*dx,
(t_max - 0.5*dt) * 1000.0, -0.5*dt * 1000.0],
interpolation='none',
aspect='auto'
)
cbar = plt.colorbar()
cbar.set_label('log(Concentration)', rotation=270, labelpad=20)
plt.xlabel('x')
plt.ylabel('t (ms)')
plt.title('Concentration over time')
plt.show()
if __name__ == '__main__':
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
diffusion()
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <ostream>
#include <vector>
#include <libmuscle/libmuscle.hpp>
#include <ymmsl/ymmsl.hpp>
using libmuscle::Data;
using libmuscle::Instance;
using libmuscle::Message;
using ymmsl::Operator;
/* Calculates the Laplacian of vector Z.
*
* @param Z A vector representing a series of samples along a line.
* @param dx The spacing between the samples.
*/
std::vector<double> laplacian(std::vector<double> const & Z, double dx) {
std::vector<double> result(Z.size() - 2);
for (std::size_t i = 0u; i < result.size(); ++i)
result[i] = (Z[i] + Z[i+2] - 2.0 * Z[i+1]) / (dx * dx);
return result;
}
/** A simple diffusion model on a 1d grid.
*
* The state of this model is a 1D grid of concentrations. It sends out the
* state on each timestep on 'state_out', and can receive an updated state
* on 'state_in' at each state update.
*/
void diffusion(int argc, char * argv[]) {
Instance instance(argc, argv, {
{Operator::O_I, {"state_out"}},
{Operator::S, {"state_in"}},
{Operator::O_F, {"final_state_out"}}});
while (instance.reuse_instance()) {
// F_INIT
double t_max = instance.get_setting_as<double>("t_max");
double dt = instance.get_setting_as<double>("dt");
double x_max = instance.get_setting_as<double>("x_max");
double dx = instance.get_setting_as<double>("dx");
double d = instance.get_setting_as<double>("d");
std::vector<double> U(lrint(x_max / dx), 1e-20);
U[25] = 2.0;
U[50] = 2.0;
U[75] = 2.0;
std::vector<std::vector<double>> Us;
Us.push_back(U);
double t_cur = 0.0;
while (t_cur + dt <= t_max) {
std::cerr << "t_cur: " << t_cur << ", t_max: " << t_max << std::endl;
// O_I
auto data = Data::grid(U.data(), {U.size()}, {"x"});
Message cur_state_msg(t_cur, data);
double t_next = t_cur + dt;
if (t_next + dt <= t_max)
cur_state_msg.set_next_timestamp(t_next);
instance.send("state_out", cur_state_msg);
// S
auto msg = instance.receive("state_in", cur_state_msg);
if (msg.data().shape().size() != 1u || msg.data().size() != U.size()) {
auto msg = "Received state of incorrect shape or size!";
instance.error_shutdown(msg);
throw std::runtime_error(msg);
}
std::copy_n(msg.data().elements<double>(), msg.data().size(), U.begin());
std::vector<double> dU(U.size());
auto lpl = laplacian(U, dx);
for (std::size_t i = 1u; i < dU.size() - 1; ++i)
dU[i] = d * lpl[i-1] * dt;
dU[0] = dU[1];
dU[dU.size() - 1] = dU[dU.size() - 2];
for (std::size_t i = 0u; i < dU.size(); ++i)
U[i] += dU[i];
Us.push_back(U);
t_cur += dt;
}
// O_F
auto data = Data::grid(U.data(), {U.size()}, {"x"});
instance.send("final_state_out", Message(t_cur, data));
std::cerr << "All done" << std::endl;
}
}
int main(int argc, char * argv[]) {
diffusion(argc, argv);
return EXIT_SUCCESS;
}
! A simple diffusion model on a 1D grid.
!
! The state of this model is a 1D grid of concentrations. It sends out the
! state on each timestep on 'state_out', and can receive an updated state
! on 'state_in' at each state update.
program diffusion
use ymmsl
use libmuscle
implicit none
type(LIBMUSCLE_PortsDescription) :: ports
type(LIBMUSCLE_Instance) :: instance
type(LIBMUSCLE_Message) :: rmsg
type(LIBMUSCLE_DataConstRef) :: rdata, item
type(LIBMUSCLE_Message) :: smsg
type(LIBMUSCLE_Data) :: sdata
real (selected_real_kind(15)) :: t_cur, t_next, t_max, dt, x_max, dx, d
integer (LIBMUSCLE_size) :: U_size, n_steps, iteration
real (selected_real_kind(15)), dimension(:), allocatable :: U, dU
real (selected_real_kind(15)), dimension(:, :), allocatable :: Us
ports = LIBMUSCLE_PortsDescription()
call ports%add(YMMSL_Operator_O_I, 'state_out')
call ports%add(YMMSL_Operator_S, 'state_in')
call ports%add(YMMSL_Operator_O_F, 'final_state_out')
instance = LIBMUSCLE_Instance(ports)
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
! F_INIT
t_max = instance%get_setting_as_real8('t_max')
dt = instance%get_setting_as_real8('dt')
x_max = instance%get_setting_as_real8('x_max')
dx = instance%get_setting_as_real8('dx')
d = instance%get_setting_as_real8('d')
U_size = nint(x_max / dx)
allocate (U(U_size), dU(U_size))
U = 1e-20
U(26) = 2.0
U(51) = 2.0
U(76) = 2.0
n_steps = int(t_max / dt)
allocate (Us(U_size, n_steps))
iteration = 1
Us(:, iteration) = U
t_cur = 0.0
do while (t_cur + dt < t_max .and. iteration < n_steps - 1)
print *, 't_cur: ', t_cur, 't_max: ', t_max
! O_I
sdata = LIBMUSCLE_Data_create_grid(U, 'x')
smsg = LIBMUSCLE_Message(t_cur, sdata)
call LIBMUSCLE_Data_free(sdata)
t_next = t_cur + dt
if (t_next + dt <= t_max) then
call smsg%set_next_timestamp(t_next)
end if
call instance%send('state_out', smsg)
! S
rmsg = instance%receive('state_in', smsg)
rdata = rmsg%get_data()
call rdata%elements(U)
call LIBMUSCLE_DataConstRef_free(rdata)
call LIBMUSCLE_Message_free(rmsg)
call LIBMUSCLE_Message_free(smsg)
dU(2:U_size-1) = d * laplacian(U, dx) * dt
dU(1) = dU(2)
dU(U_size) = dU(U_size - 1)
U = U + dU
iteration = iteration + 1
Us(:, iteration) = U
t_cur = t_cur + dt
end do
! O_F
sdata = LIBMUSCLE_Data_create_grid(U, 'x')
smsg = LIBMUSCLE_Message(t_cur, sdata)
call instance%send('final_state_out', smsg)
call LIBMUSCLE_Message_free(smsg)
call LIBMUSCLE_Data_free(sdata)
deallocate (U, dU, Us)
print *, 'All done'
end do
call LIBMUSCLE_Instance_free(instance)
contains
! Calculates the Laplacian of vector Z.
!
! @param Z A vector representing a series of samples along a line.
! @param dx The spacing between the samples.
function laplacian(Z, dx)
real (selected_real_kind(15)), dimension(:), intent(in) :: Z
real (selected_real_kind(15)), intent(in) :: dx
real (selected_real_kind(15)), allocatable, dimension(:) :: laplacian
integer :: n
n = size(Z)
allocate(laplacian(size(Z) - 2))
laplacian = (Z(1:n-2) + Z(3:n) - 2.0d0 * Z(2:n-1)) / (dx * dx)
end function laplacian
end program diffusion
"""MUSCLE3 Python component template.
Note that this template is not executable as is, please have a look at the
examples in ``docs/source/examples`` to see working components."""
from libmuscle import Instance, Message
from ymmsl import Operator
instance = Instance({
Operator.F_INIT: ["F_INIT_Port"],
Operator.O_I: ["O_I_Port"],
Operator.S: ["S_Port"],
Operator.O_F: ["O_F_Port"]})
while instance.reuse_instance():
# F_INIT
setting = instance.get_setting("setting")
...
instance.receive("F_INIT_Port")
...
while t_cur <= t_max:
# O_I
t_next = t + dt
if t_next > t_max:
t_next = None
instance.send("O_I_Port", Message(t_cur, t_next, data))
...
# S
instance.receive("S_Port")
...
t_cur += dt
# O_F
instance.send("O_F_Port", Message(t_cur, None, data))
#include <cstdlib>
#include <vector>
#include <libmuscle/libmuscle.hpp>
#include <ymmsl/ymmsl.hpp>
using libmuscle::Instance;
using libmuscle::Message;
using ymmsl::Operator;
/**
* MUSCLE3 C++ component template.
*
* Note that this template is not executable as is, please have a look at the
* examples in ``docs/source/examples`` to see working components.
*/
int main(int argc, char * argv[]) {
Instance instance(argc, argv, {
{Operator::F_INIT, {"F_INIT_Port"}},
{Operator::O_I, {"O_I_Port"}},
{Operator::S, {"S_Port"}},
{Operator::O_F, {"O_F_Port"}}});
while (instance.reuse_instance()) {
// F_INIT
auto setting = instance.get_setting("setting");
// ...
instance.receive("F_INIT_Port");
// ...
while (t_cur <= t_max) {
// O_I
Message msg(t_cur, data);
if(t_cur + dt <= t_max)
msg.set_next_timestamp(t_cur + dt);
instance.send("O_I_Port", msg);
// ...
// S
instance.receive("S_Port");
// ...
t_cur += dt;
}
// O_F
instance.send("final_state", Message(t_cur, data));
}
return EXIT_SUCCESS;
}
program instance
! MUSCLE3 Fortran component template.
!
! Note that this template is not executable as is, please have a look at the
! examples in ``docs/source/examples`` to see working components.
use ymmsl
use libmuscle
implicit none
type(LIBMUSCLE_PortsDescription) :: ports
type(LIBMUSCLE_Instance) :: instance
type(LIBMUSCLE_Message) :: rmsg, smsg
type(LIBMUSCLE_Data) :: data
real (selected_real_kind(15)) :: t_cur, t_max, dt
ports = LIBMUSCLE_PortsDescription()
call ports%add(YMMSL_Operator_F_INIT, 'F_INIT_Port')
call ports%add(YMMSL_Operator_O_I, 'O_I_Port')
call ports%add(YMMSL_Operator_S, 'S_Port')
call ports%add(YMMSL_Operator_O_F, 'O_F_Port')
instance = LIBMUSCLE_Instance(ports)
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
! F_INIT
setting = instance%get_setting_as_real8('setting')
! ...
rmsg = instance%receive('F_INIT')
call LIBMUSCLE_Message_free(rmsg)
! ...
do while (t_cur <= t_max)
! O_I
smsg = LIBMUSCLE_Message(t_cur, data)
if (t_cur + dt <= t_max) then
call smsg%set_next_timestamp(t_cur + dt)
end if
call instance%send('O_I', smsg)
call LIBMUSCLE_Message_free(smsg)
! ...
! S
rmsg = instance%receive('S')
call LIBMUSCLE_Message_free(rmsg)
! ...
t_cur = t_cur + dt
end do
! O_F
smsg = LIBMUSCLE_Message(t_cur, data)
call instance%send('final_state', smsg)
call LIBMUSCLE_Message_free(smsg)
! ...
end do
call LIBMUSCLE_Instance_free(instance)
end program reaction
Step 1: Set USES_CHECKPOINT_API
on instance creation
As first step, you need to indicate that you intend to use the checkpoint API.
You do this through the USES_CHECKPOINT_API
flag when
creating the instance:
from libmuscle import Instance, USES_CHECKPOINT_API
...
ports = ...
instance = Instance(ports, USES_CHECKPOINT_API)
See also
API documentation for
USES_CHECKPOINT_API
.
#include <libmuscle/libmuscle.hpp>
#include <ymmsl/ymmsl.hpp>
using libmuscle::PortsDescription;
using libmuscle::Instance;
using libmuscle::InstanceFlags;
...
int main(int argc, char * argv[]) {
PortsDescription ports = ...;
Instance instance(argc, argv, ports, InstanceFlags::USES_CHECKPOINT_API);
...
}
See also
API documentation for
USES_CHECKPOINT_API
.
use ymmsl
use libmuscle
type(LIBMUSCLE_PortsDescription) :: ports
type(LIBMUSCLE_Instance) :: instance
ports = ...
instance = LIBMUSCLE_Instance_create( &
ports, LIBMUSCLE_InstanceFlags(USES_CHECKPOINT_API=.true.))
See also
API documentation for LIBMUSCLE_InstanceFlags()
.
If you do not set this flag, you’ll get a runtime error when trying to use any of the checkpointing API calls on the Instance object.
Step 2: Implement checkpoint hooks
The first step in implementing the checkpointing API is implementing the checkpoint hooks. These are the points where your component can make checkpoints:
-
Intermediate snapshots are taken inside the reuse-loop, immediately after the
S
Operator of your component. -
Final snapshots are taken at the end of the reuse-loop, after the
O_F
Operator of your component.
Intermediate snapshots
Intermediate snapshots are taken inside the reuse-loop, immediately after
the S
Operator of your component.
Taking intermediate snapshots is optional. However, we recommend implementing intermediate snapshots when any of the following points holds for your component:
Your component has a loop containing
O_I
andS
, and you communicate during OperatorO_I
or OperatorS
.Implementing intermediate checkpointing allows submodels connected to your component to also create checkpoints.
Warning
If you do not implement intermediate checkpoints in this case, then it is likely that a large amount of user-provided checkpoints will not lead to consistent workflow snapshots. Please implement intermediate snapshots to give the users of your component a good checkpointing experience.
There is no communication during
O_I
andS
, but the state updateS
is executed in a (time-integration) loop which takes a relatively long time.In this case, intermediate checkpointing allows users to create checkpoints of your component during long-running computations.
In all other cases, there usually is little or no added value in implementing intermediate snapshots in addition to Final snapshots.
You implement taking intermediate snapshots as follows:
Find out where in your code to implement the checkpointing calls. Typically there is a state update loop (e.g. a
while
orfor
loop) in a component. You should implement the checkpointing calls at the end of this state update loop. In this way, your code can resume immediately at the begin of that loop. This allows for consistent restarts with the least amount of code.Ask libmuscle if you need to store your state and create an intermediate snapshot with the API call
should_save_snapshot(t)
. You must provide the current timet
in your simulation, such that MUSCLE3 can determine if Simulation time checkpoints are triggered.Collect the state that you need to store.
Create a
libmuscle.Message
object to put your state in.Store the snapshot Message with the API call
save_snapshot(message)
.
See Example: implemented checkpoint hooks for example implementations in the reaction-diffusion models and the component template.
See also
Python API documentation:
should_save_snapshot()
,save_snapshot()
.C++ API documentation:
should_save_snapshot()
,save_snapshot()
.Fortran API documentation:
LIBMUSCLE_Instance_should_save_snapshot()
,LIBMUSCLE_Instance_save_snapshot()
.
Final snapshots
Final snapshots must be implemented by all components supporting checkpointing. You implement taking a final snapshot as follows:
You must implement the checkpoint calls at the end of the reuse loop.
Ask libmuscle if you need to store your state and create a final snapshot with the API call
should_save_final_checkpoint()
. Contrary to the intermediate checkpoints, this call may block to determine if a checkpoint is needed (this is also the reason it must happen at the end of the reuse loop).Collect the state that you need to store.
Create a
libmuscle.Message
object to put your state in.Store the snapshot Message with the API call
save_final_snapshot(message)
.
See Example: implemented checkpoint hooks for example implementations in the reaction-diffusion models and the component template.
See also
Python API documentation:
should_save_final_snapshot()
,save_final_snapshot()
.C++ API documentation:
should_save_final_snapshot()
,save_final_snapshot()
.Fortran API documentation:
LIBMUSCLE_Instance_should_save_final_snapshot()
,LIBMUSCLE_Instance_save_final_snapshot()
.
Example: implemented checkpoint hooks
Note that below examples only shows the changes compared to the start situation. You can view the full contents of the files in the git repository.
Intermediate snapshots
The state we need to store consists of three parts: the current U
,
the current time t_cur
and the end-time for the time integration
t_stop
. The current time is stored as the timestamp
attribute of
the Message
object. The rest is stored in Message.data
.
Final snapshots
For the final snapshot there is no state that is required for resuming.
The complete state will be received with the next message on the
initial_state
port.
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/reaction.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_reaction_partial.py
@@ -1,6 +1,6 @@
import logging
-from libmuscle import Grid, Instance, Message
+from libmuscle import Grid, Instance, Message, USES_CHECKPOINT_API
from ymmsl import Operator
@@ -9,7 +9,8 @@
"""
instance = Instance({
Operator.F_INIT: ['initial_state'], # 1D Grid
- Operator.O_F: ['final_state']}) # 1D Grid
+ Operator.O_F: ['final_state']}, # 1D Grid
+ USES_CHECKPOINT_API)
while instance.reuse_instance():
# F_INIT
@@ -21,15 +22,24 @@
U = msg.data.array.copy()
t_cur = msg.timestamp
- while t_cur + dt < msg.timestamp + t_max:
+ t_stop = msg.timestamp + t_max
+ while t_cur + dt < t_stop:
# O_I
# S
U += k * U * dt
t_cur += dt
+ if instance.should_save_snapshot(t_cur):
+ msg = Message(t_cur, data=[Grid(U, ['x']), t_stop])
+ instance.save_snapshot(msg)
+
# O_F
instance.send('final_state', Message(t_cur, data=Grid(U, ['x'])))
+
+ if instance.should_save_final_snapshot():
+ msg = Message(t_cur)
+ instance.save_final_snapshot(msg)
if __name__ == '__main__':
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/reaction.cpp
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_reaction_partial.cpp
@@ -8,6 +8,7 @@
using libmuscle::Data;
using libmuscle::DataConstRef;
using libmuscle::Instance;
+using libmuscle::InstanceFlags;
using libmuscle::Message;
using ymmsl::Operator;
@@ -17,7 +18,8 @@
void reaction(int argc, char * argv[]) {
Instance instance(argc, argv, {
{Operator::F_INIT, {"initial_state"}}, // 1D Grid
- {Operator::O_F, {"final_state"}}}); // 1D Grid
+ {Operator::O_F, {"final_state"}}}, // 1D Grid
+ InstanceFlags::USES_CHECKPOINT_API);
while (instance.reuse_instance()) {
@@ -31,18 +33,30 @@
std::vector<double> U(data_ptr, data_ptr + msg.data().size());
double t_cur = msg.timestamp();
- while (t_cur + dt < msg.timestamp() + t_max) {
+ double t_stop = msg.timestamp() + t_max;
+ while (t_cur + dt < t_stop) {
// O_I
// S
for (double & u : U)
u += k * u * dt;
t_cur += dt;
+
+ if (instance.should_save_snapshot(t_cur)) {
+ Message msg(t_cur,
+ Data::list(Data::grid(U.data(), {U.size()}, {"x"}), t_stop));
+ instance.save_snapshot(msg);
+ }
}
// O_F
auto result = Data::grid(U.data(), {U.size()}, {"x"});
instance.send("final_state", Message(t_cur, result));
+
+ if (instance.should_save_final_snapshot()) {
+ Message msg(t_cur);
+ instance.save_final_snapshot(msg);
+ }
}
}
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/reaction.f90
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_reaction_partial.f90
@@ -11,7 +11,7 @@
type(LIBMUSCLE_Message) :: smsg
- type(LIBMUSCLE_Data) :: sdata
+ type(LIBMUSCLE_Data) :: sdata, sitem
real (selected_real_kind(15)) :: t_cur, t_max, dt, k
real (selected_real_kind(15)), dimension(:), allocatable :: U
@@ -20,7 +20,8 @@
ports = LIBMUSCLE_PortsDescription()
call ports%add(YMMSL_Operator_F_INIT, 'initial_state')
call ports%add(YMMSL_Operator_O_F, 'final_state')
- instance = LIBMUSCLE_Instance(ports)
+ instance = LIBMUSCLE_Instance(ports, &
+ LIBMUSCLE_InstanceFlags(USES_CHECKPOINT_API=.true.))
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
@@ -45,6 +46,18 @@
! S
U = k * U * dt
t_cur = t_cur + dt
+
+ if (instance%should_save_snapshot(t_cur)) then
+ sdata = LIBMUSCLE_Data_create_nils(2_LIBMUSCLE_size)
+ sitem = LIBMUSCLE_Data_create_grid(U, 'x')
+ call sdata%set_item(1_LIBMUSCLE_size, sitem)
+ call sdata%set_item(2_LIBMUSCLE_size, t_max)
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sitem)
+ call LIBMUSCLE_Data_free(sdata)
+ end if
end do
! O_F
@@ -54,6 +67,12 @@
call LIBMUSCLE_Message_free(smsg)
call LIBMUSCLE_Data_free(sdata)
deallocate (U)
+
+ if (instance%should_save_final_snapshot()) then
+ smsg = LIBMUSCLE_Message(t_cur)
+ call instance%save_final_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ end if
end do
call LIBMUSCLE_Instance_free(instance)
Intermediate snapshots
The state we need to store consists of two parts: the current time
t_cur
and the history of U
: Us
. Note that the last value of
U
is contained in Us
, so we do not need to save U
explicitly. The current time is stored as the timestamp
attribute of
the Message
object. Us
is stored in Message.data
.
Final snapshots
The same state is stored as for intermediate snapshots.
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/diffusion.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_diffusion_partial.py
@@ -3,7 +3,7 @@
import numpy as np
-from libmuscle import Grid, Instance, Message
+from libmuscle import Grid, Instance, Message, USES_CHECKPOINT_API
from ymmsl import Operator
@@ -34,7 +34,7 @@
instance = Instance({
Operator.O_I: ['state_out'],
Operator.S: ['state_in'],
- Operator.O_F: ['final_state_out']})
+ Operator.O_F: ['final_state_out']}, USES_CHECKPOINT_API)
while instance.reuse_instance():
# F_INIT
@@ -74,6 +74,10 @@
Us = np.vstack((Us, U))
t_cur += dt
+ if instance.should_save_snapshot(t_cur):
+ msg = Message(t_cur, data=Grid(Us))
+ instance.save_snapshot(msg)
+
# O_F
final_state_msg = Message(t_cur, data=Grid(U, ['x']))
instance.send('final_state_out', final_state_msg)
@@ -97,6 +101,10 @@
plt.title('Concentration over time')
plt.show()
+ if instance.should_save_final_snapshot():
+ msg = Message(t_cur, data=Grid(Us))
+ instance.save_final_snapshot(msg)
+
if __name__ == '__main__':
logging.basicConfig()
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/diffusion.cpp
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_diffusion_partial.cpp
@@ -9,6 +9,7 @@
using libmuscle::Data;
using libmuscle::Instance;
+using libmuscle::InstanceFlags;
using libmuscle::Message;
using ymmsl::Operator;
@@ -26,6 +27,18 @@
}
+/** Utility function packing data in a message for snapshotting
+ */
+Message create_state_message(
+ double t_cur, std::vector<std::vector<double>> const & Us)
+{
+ Data data = Data::nils(Us.size());
+ for (std::size_t i = 0; i < data.size(); ++i)
+ data[i] = Data::grid(Us[i].data(), {Us[i].size()});
+ return Message(t_cur, data);
+}
+
+
/** A simple diffusion model on a 1d grid.
*
* The state of this model is a 1D grid of concentrations. It sends out the
@@ -36,7 +49,8 @@
Instance instance(argc, argv, {
{Operator::O_I, {"state_out"}},
{Operator::S, {"state_in"}},
- {Operator::O_F, {"final_state_out"}}});
+ {Operator::O_F, {"final_state_out"}}},
+ InstanceFlags::USES_CHECKPOINT_API);
while (instance.reuse_instance()) {
// F_INIT
@@ -86,12 +100,21 @@
Us.push_back(U);
t_cur += dt;
+
+ if (instance.should_save_snapshot(t_cur)) {
+ instance.save_snapshot(create_state_message(t_cur, Us));
+ }
}
// O_F
auto data = Data::grid(U.data(), {U.size()}, {"x"});
instance.send("final_state_out", Message(t_cur, data));
std::cerr << "All done" << std::endl;
+
+
+ if (instance.should_save_final_snapshot()) {
+ instance.save_snapshot(create_state_message(t_cur, Us));
+ }
}
}
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/diffusion.f90
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_diffusion_partial.f90
@@ -14,10 +14,10 @@
type(LIBMUSCLE_Message) :: rmsg
type(LIBMUSCLE_DataConstRef) :: rdata, item
-
+ integer (LIBMUSCLE_size), dimension(2) :: shp
type(LIBMUSCLE_Message) :: smsg
- type(LIBMUSCLE_Data) :: sdata
+ type(LIBMUSCLE_Data) :: sdata, sitem
real (selected_real_kind(15)) :: t_cur, t_next, t_max, dt, x_max, dx, d
integer (LIBMUSCLE_size) :: U_size, n_steps, iteration
@@ -29,11 +29,12 @@
call ports%add(YMMSL_Operator_O_I, 'state_out')
call ports%add(YMMSL_Operator_S, 'state_in')
call ports%add(YMMSL_Operator_O_F, 'final_state_out')
- instance = LIBMUSCLE_Instance(ports)
+ instance = LIBMUSCLE_Instance(ports, &
+ LIBMUSCLE_InstanceFlags(USES_CHECKPOINT_API=.true.))
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
- ! F_INIT
+ ! F_INIT
t_max = instance%get_setting_as_real8('t_max')
dt = instance%get_setting_as_real8('dt')
x_max = instance%get_setting_as_real8('x_max')
@@ -54,7 +55,7 @@
Us(:, iteration) = U
t_cur = 0.0
- do while (t_cur + dt < t_max .and. iteration < n_steps - 1)
+ do while (t_cur + dt < t_max)
print *, 't_cur: ', t_cur, 't_max: ', t_max
! O_I
sdata = LIBMUSCLE_Data_create_grid(U, 'x')
@@ -83,6 +84,18 @@
Us(:, iteration) = U
t_cur = t_cur + dt
+
+ if (instance%should_save_snapshot(t_cur)) then
+ sdata = LIBMUSCLE_Data_create_nils(2_LIBMUSCLE_size)
+ sitem = LIBMUSCLE_Data_create_grid(Us)
+ call sdata%set_item(1_LIBMUSCLE_size, sitem)
+ call sdata%set_item(2_LIBMUSCLE_size, iteration)
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ call LIBMUSCLE_Data_free(sitem)
+ end if
end do
! O_F
@@ -91,6 +104,19 @@
call instance%send('final_state_out', smsg)
call LIBMUSCLE_Message_free(smsg)
call LIBMUSCLE_Data_free(sdata)
+
+ if (instance%should_save_final_snapshot()) then
+ sdata = LIBMUSCLE_Data_create_nils(2_LIBMUSCLE_size)
+ sitem = LIBMUSCLE_Data_create_grid(Us)
+ call sdata%set_item(1_LIBMUSCLE_size, sitem)
+ call sdata%set_item(2_LIBMUSCLE_size, iteration)
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ call LIBMUSCLE_Data_free(sitem)
+ end if
+
deallocate (U, dU, Us)
print *, 'All done'
end do
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/instance.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_instance_partial.py
@@ -3,14 +3,14 @@
Note that this template is not executable as is, please have a look at the
examples in ``docs/source/examples`` to see working components."""
-from libmuscle import Instance, Message
+from libmuscle import Instance, Message, USES_CHECKPOINT_API
from ymmsl import Operator
instance = Instance({
Operator.F_INIT: ["F_INIT_Port"],
Operator.O_I: ["O_I_Port"],
Operator.S: ["S_Port"],
- Operator.O_F: ["O_F_Port"]})
+ Operator.O_F: ["O_F_Port"]}, USES_CHECKPOINT_API)
while instance.reuse_instance():
# F_INIT
@@ -35,5 +35,15 @@
t_cur += dt
+ if instance.should_save_snapshot(t_cur):
+ state = ... # component implementation
+ msg = Message(t_cur, data=state)
+ instance.save_snapshot(msg)
+
# O_F
- instance.send("O_F_Port", Message(t_cur, None, data))
+ instance.send("O_F_Port", Message(t_cur, data=data))
+
+ if instance.should_save_final_snapshot():
+ state = ... # component implementation
+ msg = Message(t_cur, data=state)
+ instance.save_final_snapshot(msg)
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/instance.cpp
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_instance_partial.cpp
@@ -5,7 +5,9 @@
#include <ymmsl/ymmsl.hpp>
+using libmuscle::Data;
using libmuscle::Instance;
+using libmuscle::InstanceFlags;
using libmuscle::Message;
using ymmsl::Operator;
@@ -21,7 +23,8 @@
{Operator::F_INIT, {"F_INIT_Port"}},
{Operator::O_I, {"O_I_Port"}},
{Operator::S, {"S_Port"}},
- {Operator::O_F, {"O_F_Port"}}});
+ {Operator::O_F, {"O_F_Port"}}},
+ InstanceFlags::USES_CHECKPOINT_API);
while (instance.reuse_instance()) {
// F_INIT
@@ -44,10 +47,22 @@
// ...
t_cur += dt;
+
+ if (instance.should_save_snapshot(t_cur)) {
+ Data state; // collect state
+ Message msg(t_cur, state);
+ instance.save_snapshot(msg);
+ }
}
// O_F
instance.send("final_state", Message(t_cur, data));
+
+ if (instance.should_save_final_snapshot()) {
+ Data state; // collect state
+ Message msg(t_cur, state);
+ instance.save_final_snapshot(msg);
+ }
}
return EXIT_SUCCESS;
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/instance.f90
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/tutorial_code/checkpointing_instance_partial.f90
@@ -22,7 +22,8 @@
call ports%add(YMMSL_Operator_O_I, 'O_I_Port')
call ports%add(YMMSL_Operator_S, 'S_Port')
call ports%add(YMMSL_Operator_O_F, 'O_F_Port')
- instance = LIBMUSCLE_Instance(ports)
+ instance = LIBMUSCLE_Instance(ports, &
+ LIBMUSCLE_InstanceFlags(USES_CHECKPOINT_API=.true.))
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
@@ -50,6 +51,14 @@
! ...
t_cur = t_cur + dt
+
+ if (instance%should_save_snapshot(t_cur)) then
+ sdata = LIBMUSCLE_Data() ! collect state
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ end if
end do
! O_F
@@ -58,6 +67,14 @@
call LIBMUSCLE_Message_free(smsg)
! ...
+ if (instance%should_save_final_snapshot()) then
+ sdata = LIBMUSCLE_Data() ! collect state
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_final_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ end if
+
end do
call LIBMUSCLE_Instance_free(instance)
Step 3: Implement resume
Now that the checkpoint hooks are implemented, we can add support for resuming from a previously created checkpoint. When resuming, there are two options: resuming from an intermediate checkpoint and resuming from a final checkpoint.
When resuming from an intermediate checkpoint, your component first loads its
state from the checkpoint. Then it should continue where it left off, which is
at the beginning of O_I
. This means that it has to skip F_INIT
in
order to run as if it had never stopped.
When resuming from a final checkpoint, your component first loads its state from
the checkpoint. Next, your component executes the F_INIT
operator as usual,
as it would have had it continued after writing the snapshot.
Steps to implement the resumption logic:
At the start of – but inside – the reuse loop you check if you need to resume from a previous snapshot with the API call
resuming()
.Note
This takes place inside the reuse loop. Currently resuming can only happen during the first iteration of the reuse loop. However, additional checkpointing features are planned that would allow a model to resume multiple times inside one run. By implementing the resume logic inside the reuse loop, your component will be forwards-compatible with this.
When resuming, you load the previously stored snapshot with
load_snapshot()
and restore the state of your component.Afterwards check if initialization is required with
should_init()
and run the regular initialization logic.Continue with the time-integration loop.
See Example: implemented checkpoint hooks and resume for example implementations in the reaction-diffusion models and the component template.
See also
Python API documentation:
resuming()
,load_snapshot()
,should_init()
.C++ API documentation:
resuming()
,load_snapshot()
,should_init()
.Fortran API documentation:
LIBMUSCLE_Instance_resuming()
,LIBMUSCLE_Instance_load_snapshot()
,LIBMUSCLE_Instance_should_init()
.
Reload settings when resuming
You will notice in the examples that the resume logic is not executed first in the reuse-loop. Instead, the components all retrieve settings. The reason behind this is that it allows the user to resume a simulation with slightly different settings and have those settings take effect immediately after resuming.
It is not required to do this, so you get to decide if (and when) you reload settings after resuming. Be sure to include the behaviour of your component in the documentation, such that users of your component know what they can expect.
Example: implemented checkpoint hooks and resume
Note that below examples only shows the changes compared to the start situation. You can view the full contents of the files in the git repository.
Resume logic
In Example: implemented checkpoint hooks we made the choice to store different data in the message for intermediate and final snapshots. When resuming we therefore need to handle these two cases.
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/reaction.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/checkpointing_reaction.py
@@ -1,6 +1,6 @@
import logging
-from libmuscle import Grid, Instance, Message
+from libmuscle import Grid, Instance, Message, USES_CHECKPOINT_API
from ymmsl import Operator
@@ -9,27 +9,47 @@
"""
instance = Instance({
Operator.F_INIT: ['initial_state'], # 1D Grid
- Operator.O_F: ['final_state']}) # 1D Grid
+ Operator.O_F: ['final_state']}, # 1D Grid
+ USES_CHECKPOINT_API)
while instance.reuse_instance():
- # F_INIT
t_max = instance.get_setting('t_max', 'float')
dt = instance.get_setting('dt', 'float')
k = instance.get_setting('k', 'float')
- msg = instance.receive('initial_state')
- U = msg.data.array.copy()
+ if instance.resuming():
+ msg = instance.load_snapshot()
+ if msg.data is not None:
+ # A final snapshot does not have data in it, but that's fine: we
+ # will do the F_INIT step inside `should_init()` below.
+ U = msg.data[0].array.copy()
+ t_cur = msg.timestamp
+ t_stop = msg.data[1]
- t_cur = msg.timestamp
- while t_cur + dt < msg.timestamp + t_max:
+ # F_INIT
+ if instance.should_init():
+ msg = instance.receive('initial_state')
+ U = msg.data.array.copy()
+ t_cur = msg.timestamp
+ t_stop = msg.timestamp + t_max
+
+ while t_cur + dt < t_stop:
# O_I
# S
U += k * U * dt
t_cur += dt
+ if instance.should_save_snapshot(t_cur):
+ msg = Message(t_cur, data=[Grid(U, ['x']), t_stop])
+ instance.save_snapshot(msg)
+
# O_F
instance.send('final_state', Message(t_cur, data=Grid(U, ['x'])))
+
+ if instance.should_save_final_snapshot():
+ msg = Message(t_cur)
+ instance.save_final_snapshot(msg)
if __name__ == '__main__':
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/reaction.cpp
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/checkpointing_reaction.cpp
@@ -8,6 +8,7 @@
using libmuscle::Data;
using libmuscle::DataConstRef;
using libmuscle::Instance;
+using libmuscle::InstanceFlags;
using libmuscle::Message;
using ymmsl::Operator;
@@ -17,7 +18,8 @@
void reaction(int argc, char * argv[]) {
Instance instance(argc, argv, {
{Operator::F_INIT, {"initial_state"}}, // 1D Grid
- {Operator::O_F, {"final_state"}}}); // 1D Grid
+ {Operator::O_F, {"final_state"}}}, // 1D Grid
+ InstanceFlags::USES_CHECKPOINT_API);
while (instance.reuse_instance()) {
@@ -25,24 +27,52 @@
double t_max = instance.get_setting_as<double>("t_max");
double dt = instance.get_setting_as<double>("dt");
double k = instance.get_setting_as<double>("k");
+ double t_stop, t_cur;
+ std::vector<double> U;
- auto msg = instance.receive("initial_state");
- auto data_ptr = msg.data().elements<double>();
- std::vector<double> U(data_ptr, data_ptr + msg.data().size());
+ if (instance.resuming()) {
+ auto msg = instance.load_snapshot();
+ if (!msg.data().is_nil()) {
+ // A final snapshot does not have data in it, but that's fine: we
+ // will do the F_INIT step inside `should_init()` below.
+ auto data_ptr = msg.data()[0].elements<double>();
+ U = std::vector<double>(data_ptr, data_ptr + msg.data()[0].size());
+ t_cur = msg.timestamp();
+ t_stop = msg.data()[1].as<double>();
+ }
+ }
- double t_cur = msg.timestamp();
- while (t_cur + dt < msg.timestamp() + t_max) {
+ if (instance.should_init()) {
+ auto msg = instance.receive("initial_state");
+ auto data_ptr = msg.data().elements<double>();
+ U = std::vector<double>(data_ptr, data_ptr + msg.data().size());
+ t_cur = msg.timestamp();
+ t_stop = msg.timestamp() + t_max;
+ }
+
+ while (t_cur + dt < t_stop) {
// O_I
// S
for (double & u : U)
u += k * u * dt;
t_cur += dt;
+
+ if (instance.should_save_snapshot(t_cur)) {
+ Message msg(t_cur,
+ Data::list(Data::grid(U.data(), {U.size()}, {"x"}), t_stop));
+ instance.save_snapshot(msg);
+ }
}
// O_F
auto result = Data::grid(U.data(), {U.size()}, {"x"});
instance.send("final_state", Message(t_cur, result));
+
+ if (instance.should_save_final_snapshot()) {
+ Message msg(t_cur);
+ instance.save_final_snapshot(msg);
+ }
}
}
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/reaction.f90
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/checkpointing_reaction.f90
@@ -11,7 +11,7 @@
type(LIBMUSCLE_Message) :: smsg
- type(LIBMUSCLE_Data) :: sdata
+ type(LIBMUSCLE_Data) :: sdata, sitem
real (selected_real_kind(15)) :: t_cur, t_max, dt, k
real (selected_real_kind(15)), dimension(:), allocatable :: U
@@ -20,7 +20,8 @@
ports = LIBMUSCLE_PortsDescription()
call ports%add(YMMSL_Operator_F_INIT, 'initial_state')
call ports%add(YMMSL_Operator_O_F, 'final_state')
- instance = LIBMUSCLE_Instance(ports)
+ instance = LIBMUSCLE_Instance(ports, &
+ LIBMUSCLE_InstanceFlags(USES_CHECKPOINT_API=.true.))
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
@@ -29,15 +30,38 @@
dt = instance%get_setting_as_real8('dt')
k = instance%get_setting_as_real8('k')
- rmsg = instance%receive('initial_state')
- rdata = rmsg%get_data()
- allocate (U(rdata%size()))
- call rdata%elements(U)
- call LIBMUSCLE_DataConstRef_free(rdata)
+ if (instance%resuming()) then
+ rmsg = instance%load_snapshot()
+ rdata = rmsg%get_data()
+ if (.not. rdata%is_nil()) then
+ ! A final snapshot does not have data in it, but that's fine: we
+ ! will do the F_INIT step inside `should_init` below.
+ item = rdata%get_item(1_LIBMUSCLE_size)
+ allocate (U(item%size()))
+ call item%elements(U)
+ call LIBMUSCLE_DataConstRef_free(item)
- t_cur = rmsg%timestamp()
- t_max = rmsg%timestamp() + t_max
- call LIBMUSCLE_Message_free(rmsg)
+ t_cur = rmsg%timestamp()
+
+ item = rdata%get_item(2_LIBMUSCLE_size)
+ t_max = item%as_real8()
+ call LIBMUSCLE_DataConstRef_free(item)
+ end if
+ call LIBMUSCLE_DataConstRef_free(rdata)
+ call LIBMUSCLE_Message_free(rmsg)
+ end if
+
+ if (instance%should_init()) then
+ rmsg = instance%receive('initial_state')
+ rdata = rmsg%get_data()
+ allocate (U(rdata%size()))
+ call rdata%elements(U)
+ call LIBMUSCLE_DataConstRef_free(rdata)
+
+ t_cur = rmsg%timestamp()
+ t_max = rmsg%timestamp() + t_max
+ call LIBMUSCLE_Message_free(rmsg)
+ end if
do while (t_cur + dt < t_max)
! O_I
@@ -45,6 +69,18 @@
! S
U = k * U * dt
t_cur = t_cur + dt
+
+ if (instance%should_save_snapshot(t_cur)) then
+ sdata = LIBMUSCLE_Data_create_nils(2_LIBMUSCLE_size)
+ sitem = LIBMUSCLE_Data_create_grid(U, 'x')
+ call sdata%set_item(1_LIBMUSCLE_size, sitem)
+ call sdata%set_item(2_LIBMUSCLE_size, t_max)
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sitem)
+ call LIBMUSCLE_Data_free(sdata)
+ end if
end do
! O_F
@@ -54,6 +90,12 @@
call LIBMUSCLE_Message_free(smsg)
call LIBMUSCLE_Data_free(sdata)
deallocate (U)
+
+ if (instance%should_save_final_snapshot()) then
+ smsg = LIBMUSCLE_Message(t_cur)
+ call instance%save_final_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ end if
end do
call LIBMUSCLE_Instance_free(instance)
Resume logic
For the diffusion model we stored the same state for intermediate and
final snapshots. This makes resuming easier because we do not have to
distinguish between the data stored in the loaded Message
object.
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/diffusion.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/checkpointing_diffusion.py
@@ -3,7 +3,7 @@
import numpy as np
-from libmuscle import Grid, Instance, Message
+from libmuscle import Grid, Instance, Message, USES_CHECKPOINT_API
from ymmsl import Operator
@@ -34,7 +34,7 @@
instance = Instance({
Operator.O_I: ['state_out'],
Operator.S: ['state_in'],
- Operator.O_F: ['final_state_out']})
+ Operator.O_F: ['final_state_out']}, USES_CHECKPOINT_API)
while instance.reuse_instance():
# F_INIT
@@ -44,13 +44,20 @@
dx = instance.get_setting('dx', 'float')
d = instance.get_setting('d', 'float')
- U = np.zeros(int(round(x_max / dx))) + 1e-20
- U[25] = 2.0
- U[50] = 2.0
- U[75] = 2.0
- Us = U
+ if instance.resuming():
+ msg = instance.load_snapshot()
+ Us = msg.data.array.copy()
+ U = Us[-1]
+ t_cur = msg.timestamp
- t_cur = 0.0
+ if instance.should_init():
+ U = np.zeros(int(round(x_max / dx))) + 1e-20
+ U[25] = 2.0
+ U[50] = 2.0
+ U[75] = 2.0
+ Us = U
+ t_cur = 0.0
+
while t_cur + dt <= t_max:
# O_I
t_next = t_cur + dt
@@ -73,6 +80,10 @@
U += dU
Us = np.vstack((Us, U))
t_cur += dt
+
+ if instance.should_save_snapshot(t_cur):
+ msg = Message(t_cur, data=Grid(Us))
+ instance.save_snapshot(msg)
# O_F
final_state_msg = Message(t_cur, data=Grid(U, ['x']))
@@ -97,6 +108,10 @@
plt.title('Concentration over time')
plt.show()
+ if instance.should_save_final_snapshot():
+ msg = Message(t_cur, data=Grid(Us))
+ instance.save_final_snapshot(msg)
+
if __name__ == '__main__':
logging.basicConfig()
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/diffusion.cpp
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/checkpointing_diffusion.cpp
@@ -9,6 +9,7 @@
using libmuscle::Data;
using libmuscle::Instance;
+using libmuscle::InstanceFlags;
using libmuscle::Message;
using ymmsl::Operator;
@@ -26,6 +27,18 @@
}
+/** Utility function packing data in a message for snapshotting
+ */
+Message create_state_message(
+ double t_cur, std::vector<std::vector<double>> const & Us)
+{
+ Data data = Data::nils(Us.size());
+ for (std::size_t i = 0; i < data.size(); ++i)
+ data[i] = Data::grid(Us[i].data(), {Us[i].size()});
+ return Message(t_cur, data);
+}
+
+
/** A simple diffusion model on a 1d grid.
*
* The state of this model is a 1D grid of concentrations. It sends out the
@@ -36,7 +49,8 @@
Instance instance(argc, argv, {
{Operator::O_I, {"state_out"}},
{Operator::S, {"state_in"}},
- {Operator::O_F, {"final_state_out"}}});
+ {Operator::O_F, {"final_state_out"}}},
+ InstanceFlags::USES_CHECKPOINT_API);
while (instance.reuse_instance()) {
// F_INIT
@@ -46,15 +60,36 @@
double dx = instance.get_setting_as<double>("dx");
double d = instance.get_setting_as<double>("d");
+ double t_cur;
+ std::vector<std::vector<double>> Us;
std::vector<double> U(lrint(x_max / dx), 1e-20);
- U[25] = 2.0;
- U[50] = 2.0;
- U[75] = 2.0;
- std::vector<std::vector<double>> Us;
- Us.push_back(U);
+ if (instance.resuming()) {
+ auto msg = instance.load_snapshot();
+ for (int i = 0; i < msg.data().size(); ++i) {
+ auto const & data_i = msg.data()[i];
+ if (data_i.shape().size() != 1u || data_i.size() != U.size()) {
+ auto err_msg = "Received state of incorrect shape or size!";
+ instance.error_shutdown(err_msg);
+ throw std::runtime_error(err_msg);
+ }
+ std::copy_n(data_i.elements<double>(), data_i.size(), U.begin());
+ Us.push_back(U);
+ }
+ t_cur = msg.timestamp();
+ }
- double t_cur = 0.0;
+ if (instance.should_init()) {
+ U = std::vector<double>(lrint(x_max / dx), 1e-20);
+ U[25] = 2.0;
+ U[50] = 2.0;
+ U[75] = 2.0;
+
+ Us.push_back(U);
+
+ t_cur = 0.0;
+ }
+
while (t_cur + dt <= t_max) {
std::cerr << "t_cur: " << t_cur << ", t_max: " << t_max << std::endl;
// O_I
@@ -86,12 +121,21 @@
Us.push_back(U);
t_cur += dt;
+
+ if (instance.should_save_snapshot(t_cur)) {
+ instance.save_snapshot(create_state_message(t_cur, Us));
+ }
}
// O_F
auto data = Data::grid(U.data(), {U.size()}, {"x"});
instance.send("final_state_out", Message(t_cur, data));
std::cerr << "All done" << std::endl;
+
+
+ if (instance.should_save_final_snapshot()) {
+ instance.save_snapshot(create_state_message(t_cur, Us));
+ }
}
}
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/diffusion.f90
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/checkpointing_diffusion.f90
@@ -14,10 +14,10 @@
type(LIBMUSCLE_Message) :: rmsg
type(LIBMUSCLE_DataConstRef) :: rdata, item
-
+ integer (LIBMUSCLE_size), dimension(2) :: shp
type(LIBMUSCLE_Message) :: smsg
- type(LIBMUSCLE_Data) :: sdata
+ type(LIBMUSCLE_Data) :: sdata, sitem
real (selected_real_kind(15)) :: t_cur, t_next, t_max, dt, x_max, dx, d
integer (LIBMUSCLE_size) :: U_size, n_steps, iteration
@@ -29,7 +29,8 @@
call ports%add(YMMSL_Operator_O_I, 'state_out')
call ports%add(YMMSL_Operator_S, 'state_in')
call ports%add(YMMSL_Operator_O_F, 'final_state_out')
- instance = LIBMUSCLE_Instance(ports)
+ instance = LIBMUSCLE_Instance(ports, &
+ LIBMUSCLE_InstanceFlags(USES_CHECKPOINT_API=.true.))
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
@@ -40,20 +41,47 @@
dx = instance%get_setting_as_real8('dx')
d = instance%get_setting_as_real8('d')
- U_size = nint(x_max / dx)
- allocate (U(U_size), dU(U_size))
- U = 1e-20
- U(26) = 2.0
- U(51) = 2.0
- U(76) = 2.0
+ if (instance%resuming()) then
+ rmsg = instance%load_snapshot()
+ rdata = rmsg%get_data()
- n_steps = int(t_max / dt)
- allocate (Us(U_size, n_steps))
+ item = rdata%get_item(1_LIBMUSCLE_size)
+ call item%shape(shp)
+ U_size = shp(1)
+ n_steps = shp(2)
+ allocate (Us(shp(1), shp(2)))
+ call item%elements(Us)
+ call LIBMUSCLE_DataConstRef_free(item)
- iteration = 1
- Us(:, iteration) = U
+ item = rdata%get_item(2_LIBMUSCLE_size)
+ iteration = item%as_int()
+ call LIBMUSCLE_DataConstRef_free(item)
- t_cur = 0.0
+ U = Us(:, iteration)
+ allocate(dU(U_size))
+ t_cur = rmsg%timestamp()
+
+ call LIBMUSCLE_DataConstRef_free(rdata)
+ call LIBMUSCLE_Message_free(rmsg)
+ end if
+
+ if (instance%should_init()) then
+ U_size = nint(x_max / dx)
+ allocate (U(U_size), dU(U_size))
+ U = 1e-20
+ U(26) = 2.0
+ U(51) = 2.0
+ U(76) = 2.0
+
+ n_steps = int(t_max / dt)
+ allocate (Us(U_size, n_steps))
+
+ iteration = 1
+ Us(:, iteration) = U
+
+ t_cur = 0.0
+ end if
+
do while (t_cur + dt < t_max .and. iteration < n_steps - 1)
print *, 't_cur: ', t_cur, 't_max: ', t_max
! O_I
@@ -83,6 +111,18 @@
Us(:, iteration) = U
t_cur = t_cur + dt
+
+ if (instance%should_save_snapshot(t_cur)) then
+ sdata = LIBMUSCLE_Data_create_nils(2_LIBMUSCLE_size)
+ sitem = LIBMUSCLE_Data_create_grid(Us)
+ call sdata%set_item(1_LIBMUSCLE_size, sitem)
+ call sdata%set_item(2_LIBMUSCLE_size, iteration)
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ call LIBMUSCLE_Data_free(sitem)
+ end if
end do
! O_F
@@ -91,6 +131,19 @@
call instance%send('final_state_out', smsg)
call LIBMUSCLE_Message_free(smsg)
call LIBMUSCLE_Data_free(sdata)
+
+ if (instance%should_save_final_snapshot()) then
+ sdata = LIBMUSCLE_Data_create_nils(2_LIBMUSCLE_size)
+ sitem = LIBMUSCLE_Data_create_grid(Us)
+ call sdata%set_item(1_LIBMUSCLE_size, sitem)
+ call sdata%set_item(2_LIBMUSCLE_size, iteration)
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ call LIBMUSCLE_Data_free(sitem)
+ end if
+
deallocate (U, dU, Us)
print *, 'All done'
end do
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/instance.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/checkpointing_instance.py
@@ -17,8 +17,13 @@
setting = instance.get_setting("setting")
...
- instance.receive("F_INIT_Port")
- ...
+ if instance.resuming():
+ msg = instance.load_snapshot()
+ ... # restore state from message
+
+ if instance.should_init():
+ instance.receive("F_INIT_Port")
+ ...
while t_cur <= t_max:
# O_I
@@ -35,5 +40,15 @@
t_cur += dt
+ if instance.should_save_snapshot(t_cur):
+ state = ... # component implementation
+ msg = Message(t_cur, None, state)
+ instance.save_snapshot(msg)
+
# O_F
instance.send("O_F_Port", Message(t_cur, None, data))
+
+ if instance.should_save_final_snapshot():
+ state = ... # component implementation
+ msg = Message(t_cur, None, state)
+ instance.save_final_snapshot(msg)
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/instance.cpp
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/checkpointing_instance.cpp
@@ -5,7 +5,9 @@
#include <ymmsl/ymmsl.hpp>
+using libmuscle::Data;
using libmuscle::Instance;
+using libmuscle::InstanceFlags;
using libmuscle::Message;
using ymmsl::Operator;
@@ -21,15 +23,23 @@
{Operator::F_INIT, {"F_INIT_Port"}},
{Operator::O_I, {"O_I_Port"}},
{Operator::S, {"S_Port"}},
- {Operator::O_F, {"O_F_Port"}}});
+ {Operator::O_F, {"O_F_Port"}}},
+ InstanceFlags::USES_CHECKPOINT_API);
while (instance.reuse_instance()) {
// F_INIT
auto setting = instance.get_setting("setting");
// ...
- instance.receive("F_INIT_Port");
- // ...
+ if (instance.resuming()) {
+ auto msg = instance.load_snapshot();
+ // ... restore state from message
+ }
+
+ if (instance.should_init()) {
+ instance.receive("F_INIT_Port");
+ // ...
+ }
while (t_cur <= t_max) {
// O_I
@@ -44,10 +54,22 @@
// ...
t_cur += dt;
+
+ if (instance.should_save_snapshot(t_cur)) {
+ Data state; // collect state
+ Message msg(t_cur, state);
+ instance.save_snapshot(msg);
+ }
}
// O_F
instance.send("final_state", Message(t_cur, data));
+
+ if (instance.should_save_final_snapshot()) {
+ Data state; // collect state
+ Message msg(t_cur, state);
+ instance.save_final_snapshot(msg);
+ }
}
return EXIT_SUCCESS;
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/instance.f90
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/templates/checkpointing_instance.f90
@@ -22,7 +22,8 @@
call ports%add(YMMSL_Operator_O_I, 'O_I_Port')
call ports%add(YMMSL_Operator_S, 'S_Port')
call ports%add(YMMSL_Operator_O_F, 'O_F_Port')
- instance = LIBMUSCLE_Instance(ports)
+ instance = LIBMUSCLE_Instance(ports, &
+ LIBMUSCLE_InstanceFlags(USES_CHECKPOINT_API=.true.))
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
@@ -30,9 +31,18 @@
setting = instance%get_setting_as_real8('setting')
! ...
- rmsg = instance%receive('F_INIT')
- call LIBMUSCLE_Message_free(rmsg)
- ! ...
+
+ if (instance%resuming()) then
+ rmsg = instance%load_snapshot()
+ ! ... restore state from message
+ call LIBMUSCLE_Message_free(rmsg)
+ end if
+
+ if (instance%should_init()) then
+ rmsg = instance%receive('F_INIT')
+ call LIBMUSCLE_Message_free(rmsg)
+ ! ...
+ end if
do while (t_cur <= t_max)
! O_I
@@ -50,6 +60,14 @@
! ...
t_cur = t_cur + dt
+
+ if (instance%should_save_snapshot(t_cur)) then
+ sdata = LIBMUSCLE_Data() ! collect state
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ end if
end do
! O_F
@@ -58,6 +76,14 @@
call LIBMUSCLE_Message_free(smsg)
! ...
+ if (instance%should_save_final_snapshot()) then
+ sdata = LIBMUSCLE_Data() ! collect state
+ smsg = LIBMUSCLE_Message(t_cur, sdata)
+ call instance%save_final_snapshot(smsg)
+ call LIBMUSCLE_Message_free(smsg)
+ call LIBMUSCLE_Data_free(sdata)
+ end if
+
end do
call LIBMUSCLE_Instance_free(instance)
Components that do not keep state between reuse
Some components do not need to keep state between reuses. An example of that is the reaction model from the above examples. In the final snapshot, no state needs to be stored to allow properly resuming this component, see Example: implemented checkpoint hooks.
Other examples of such components may be data transformers, receiving data
on an F_INIT
port and sending the converted data on an O_F
port.
If you indicate to libmuscle that your component does not keep state between
reuse, libmuscle automatically provides checkpointing for your component. You do
this by providing the ~InstanceFlags.KEEPS_NO_STATE_FOR_NEXT_USE
flag
when creating the instance. See the below example for a variant of the example
reaction model.
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/reaction.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/python/reaction_no_state_for_next_use.py
@@ -1,6 +1,6 @@
import logging
-from libmuscle import Grid, Instance, Message
+from libmuscle import Grid, Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE
from ymmsl import Operator
@@ -9,7 +9,8 @@
"""
instance = Instance({
Operator.F_INIT: ['initial_state'], # 1D Grid
- Operator.O_F: ['final_state']}) # 1D Grid
+ Operator.O_F: ['final_state']}, # 1D Grid
+ KEEPS_NO_STATE_FOR_NEXT_USE)
while instance.reuse_instance():
# F_INIT
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/reaction.cpp
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/cpp/reaction_no_state_for_next_use.cpp
@@ -8,6 +8,7 @@
using libmuscle::Data;
using libmuscle::DataConstRef;
using libmuscle::Instance;
+using libmuscle::InstanceFlags;
using libmuscle::Message;
using ymmsl::Operator;
@@ -17,7 +18,8 @@
void reaction(int argc, char * argv[]) {
Instance instance(argc, argv, {
{Operator::F_INIT, {"initial_state"}}, // 1D Grid
- {Operator::O_F, {"final_state"}}}); // 1D Grid
+ {Operator::O_F, {"final_state"}}}, // 1D Grid
+ InstanceFlags::KEEPS_NO_STATE_FOR_NEXT_USE);
while (instance.reuse_instance()) {
--- /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/reaction.f90
+++ /home/docs/checkouts/readthedocs.org/user_builds/muscle3/checkouts/latest/docs/source/examples/fortran/reaction_no_state_for_next_use.f90
@@ -20,7 +20,8 @@
ports = LIBMUSCLE_PortsDescription()
call ports%add(YMMSL_Operator_F_INIT, 'initial_state')
call ports%add(YMMSL_Operator_O_F, 'final_state')
- instance = LIBMUSCLE_Instance(ports)
+ instance = LIBMUSCLE_Instance(ports, &
+ LIBMUSCLE_InstanceFlags(KEEPS_NO_STATE_FOR_NEXT_USE=.true.))
call LIBMUSCLE_PortsDescription_free(ports)
do while (instance%reuse_instance())
See also
Python API documentation:
InstanceFlags
.C++ API documentation
InstanceFlags
.Fortran API documentation
LIBMUSCLE_InstanceFlags
.
Builtin validation
MUSCLE3’s checkpointing API was carefully designed to allow consistenly resuming a simulation. This is only possible when components carefully implement the checkpointing API. To support you in this task, MUSCLE3 tries to detect any issues with the checkpointing implementation. When MUSCLE3 detects a problem, an error is raised to indicate what went wrong and point you in the right direction for fixing the problem.
Checkpointing in MPI-enabled components
Checkpionting in MPI-enabled components works in the same way as for non-MPI components. The main difference is that some API methods must be called by all processes, while others can only be called from the root process.
resuming()
must be called simultaneously in all processes.load_snapshot()
may only be called on the root process. It is up to the model code to scatter or broadcast the snapshot state to the non-root processes, if necessary.should_init()
must be called simultaneously in all processes.should_save_snapshot()
andshould_save_final_snapshot()
must be called simultaneously in all processes.save_snapshot()
andsave_final_snapshot()
may only be called on the root process. It is therefore up to the model code to gather the necessary state from the non-root processes before saving the snapshot.
See also
C++ API documentation:
resuming()
load_snapshot()
should_init()
should_save_final_snapshot()
save_final_snapshot()
should_save_snapshot()
save_snapshot()
Fortran API documentation: