API Documentation for Python

class libmuscle.Grid(array: ndarray, indexes: List[str] | None = None)

Bases: object

Represents a grid of data to send or receive.

Note that for received grids, the array of data is a read-only NumPy array. If you have another array that you want to put the received data into, use np.copyto(dest, source) to copy the contents of the received array across into your destination array. If you don’t have an array yet and want a writable version of the received array, use array.copy() to create a writable copy. See the tutorial for examples.
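The two copying options above can be sketched as follows. This is a minimal illustration that assumes only NumPy: the read-only array is created locally with setflags(write=False) as a stand-in for the array of a received Grid, and no libmuscle call is made.

```python
import numpy as np

# Stand-in for the read-only array of a received Grid (assumption:
# in a real component this would come from the received message).
received = np.arange(6, dtype=np.float64).reshape(2, 3)
received.setflags(write=False)

# Option 1: copy the contents into an array you already have.
dest = np.zeros((2, 3), dtype=np.float64)
np.copyto(dest, received)

# Option 2: create a new, writable copy.
writable = received.copy()
writable[0, 0] = 42.0   # allowed; `received` itself is unchanged
```

Option 1 avoids allocating a new array on every receive, which may matter when the grid is large and received often.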

array

An array of data

Type:

np.ndarray

indexes

The names of the array’s indexes.

Type:

Optional[List[str]]

__init__(array: ndarray, indexes: List[str] | None = None) → None

Creates a Grid object.

A Grid object represents a multi-dimensional array of data. It has a type, a shape, and optionally a list of index names.

Supported data types are 4- and 8-byte integers (numpy.int32, numpy.int64), 4- and 8-byte floats (numpy.float32, numpy.float64), and booleans (numpy.bool_, numpy.bool8). The array argument must be a NumPy array of one of those types.

If indexes is given, then it must be a list of strings with one entry per dimension of array, containing the names of the indexes of the array. For a 2D Cartesian grid, these may be 'x' and 'y' for example, or for a polar grid, 'phi' and 'rho'.

Parameters:
  • array – An array of data, of a supported type (see above).

  • indexes – Names of the indexes (see above).

class libmuscle.Instance(ports: Dict[Operator, List[str]] | None = None, flags: InstanceFlags = InstanceFlags.None)

Bases: object

Represents a component instance in a MUSCLE3 simulation.

This class provides a low-level send/receive API for the instance to use.

__init__(ports: Dict[Operator, List[str]] | None = None, flags: InstanceFlags = InstanceFlags.None) → None

Create an Instance.

Parameters:
  • ports – A dictionary mapping each Operator of this component to a list of its port names.

  • flags – Indicate properties for this instance. See InstanceFlags for a detailed description of possible flags.

error_shutdown(message: str) → None

Logs an error and shuts down the Instance.

If you detect that something is wrong (invalid input, invalid settings, simulation diverged, or anything else really), you should call this method before calling exit() or raising an exception that you don’t expect to catch.

If you do so, the Instance will tell the rest of the simulation that it encountered an error and will shut down. That makes it easier to debug the situation (the message will be logged), and it reduces the chance that other parts of the simulation will sit around waiting forever for a message that this instance was supposed to send.

Parameters:

message – An error message describing the problem.
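As a sketch of the pattern described above (report the problem first, then raise), the helper below checks a value and calls error_shutdown() before raising. The helper name and the check itself are hypothetical; only error_shutdown() comes from this API, and instance is assumed to be a libmuscle.Instance.

```python
def require_positive(instance, name, value):
    """Shut the instance down cleanly if `value` is not positive.

    Illustrative helper, not part of libmuscle.
    """
    if value <= 0:
        message = f'Setting {name} must be positive, got {value}'
        # Tell the rest of the simulation first, then raise.
        instance.error_shutdown(message)
        raise ValueError(message)
    return value
```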

get_port_length(port: str) → int

Returns the current length of the port.

Parameters:

port – The name of the port to measure.

Raises:

RuntimeError – If this is a scalar port.

get_setting(name: str, typ: typing_extensions.Literal[str]) → str
get_setting(name: str, typ: typing_extensions.Literal[int]) → int
get_setting(name: str, typ: typing_extensions.Literal[float]) → float
get_setting(name: str, typ: typing_extensions.Literal[bool]) → bool
get_setting(name: str, typ: typing_extensions.Literal[[float]]) → List[float]
get_setting(name: str, typ: typing_extensions.Literal[[[float]]]) → List[List[float]]
get_setting(name: str, typ: None = None) → str | int | float | bool | List[float] | List[List[float]] | bool_union_fix

Returns the value of a model setting.

Parameters:
  • name – The name of the setting, without any instance prefix.

  • typ – The expected type of the value. If the value does not match this type, a TypeError will be raised. If not specified, any of the supported types will be accepted, and you’ll have to figure out what you got yourself.

Raises:
  • KeyError – If no value was set for this setting.

  • TypeError – If the type of the setting’s value was not as expected.
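The two exceptions above can be handled differently, as in this minimal sketch. The helper is hypothetical, and it assumes that type names are passed as strings such as 'float'. A missing setting falls back to a default, while a TypeError is left to propagate because it usually points to a configuration mistake.

```python
def get_setting_or(instance, name, typ, fallback):
    """Return a setting's value, or `fallback` if it was never set.

    Illustrative helper, not part of libmuscle. A TypeError raised for
    a value of the wrong type is deliberately not caught.
    """
    try:
        return instance.get_setting(name, typ)
    except KeyError:
        return fallback
```

With a real Instance this might be called as get_setting_or(instance, 'dt', 'float', 0.01), where the setting name dt is only an example.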

is_connected(port: str) → bool

Returns whether the given port is connected.

Parameters:

port – The name of the port to inspect.

Returns:

True if there is a conduit attached to this port, False if not.

is_resizable(port: str) → bool

Returns whether the given port is resizable.

Scalar ports are never resizable. Whether a vector port is resizable depends on what it is connected to.

Parameters:

port – Name of the port to inspect.

Returns:

True if the port can be resized.

is_vector_port(port: str) → bool

Returns whether a port is a vector or scalar port.

If a port has been declared to be a vector port (i.e. the name passed when creating this Instance had ‘[]’ at the end), then you can pass a ‘slot’ argument when sending or receiving. It’s like the port is a vector of slots on which you can send or receive messages.

This function returns True if the given port is a vector port, and False if it is a scalar port.

Parameters:

port – The port to check this property of.

list_ports() → Dict[Operator, List[str]]

Returns a description of the ports that this Instance has.

Note that the result has almost the same format as the port declarations you pass when making an Instance. The only difference is that the port names never have [] at the end, even if the port is a vector port.

Returns:

A dictionary, indexed by Operator, containing lists of port names. Operators with no associated ports are not included.
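To make the difference between declared and listed ports concrete, the sketch below converts a declaration into the shape described above. It is an illustration only, not libmuscle's implementation, and it uses plain strings as stand-ins for Operator values.

```python
def as_listed(declared_ports):
    """Mimic the documented shape of list_ports() for a declaration.

    Illustration only. Keys stand in for Operator values.
    """
    listed = {}
    for operator, names in declared_ports.items():
        if names:   # operators without ports are left out
            listed[operator] = [
                name[:-2] if name.endswith('[]') else name
                for name in names]
    return listed


# A declaration with one scalar port, one vector port and an
# operator that has no ports at all.
declared = {'F_INIT': ['initial_state'], 'O_I': ['state_out[]'], 'S': []}
listed = as_listed(declared)
```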

list_settings() → List[str]

List settings by name.

This function returns a list of names of the available settings.

Returns:

A list of setting names.

load_snapshot() → Message

Load a snapshot.

Must only be called when resuming() returns True.

Returns:

Message object containing the state as saved in a previous run through save_snapshot() or save_final_snapshot().

Raises:

RuntimeError – if not resuming from a snapshot.

receive(port_name: str, slot: int | None = None, default: Message | None = None) → Message

Receive a message from the outside world.

Receiving is a blocking operation. This function will contact the sender, wait for a message to be available, and receive and return it.

If the port you are receiving on is not connected, the default value you specified will be returned exactly as you passed it. If you didn’t specify a default value (e.g. because there is no reasonable default, and you really need the outside input) and the port is not connected, you’ll get a RuntimeError.

Parameters:
  • port_name – The endpoint on which a message is to be received.

  • slot – The slot to receive the message on, if any.

  • default – A default value to return if this port is not connected.

Returns:

The received message. The settings attribute of the received message will be None.

Raises:

RuntimeError – If the given port is not connected and no default value was given.

receive_with_settings(port_name: str, slot: int | None = None, default: Message | None = None) → Message

Receive a message with attached settings overlay.

This function should not be used in submodels. It is intended for use by special components that are ensemble-aware and have to pass on overlay settings explicitly.

Receiving is a blocking operation. This function will contact the sender, wait for a message to be available, and receive and return it.

If the port you are receiving on is not connected, the default value you specified will be returned exactly as you passed it. If you didn’t specify a default value (e.g. because there is no reasonable default, and you really need the outside input) and the port is not connected, then you’ll get a RuntimeError.

Parameters:
  • port_name – The endpoint on which a message is to be received.

  • slot – The slot to receive the message on, if any.

  • default – A default value to return if this port is not connected.

Returns:

The received message. The settings attribute will contain the received Settings, and will not be None.

Raises:

RuntimeError – If the given port is not connected and no default value was given.

resuming() → bool

Check if this instance is resuming from a snapshot.

Must be used by submodels that implement the checkpointing API. A RuntimeError is raised if an iteration of the reuse loop does not call this method.

This method returns True for the first iteration of the reuse loop after resuming from a previously taken snapshot. When resuming from a snapshot, the submodel must load its state from the snapshot as returned by load_snapshot().

Returns:

True iff the submodel must resume from a snapshot.

reuse_instance() → bool

Decide whether to run this instance again.

In a multiscale simulation, instances get reused all the time. For example, in a macro-micro simulation, the micromodel does a complete run for every timestep of the macromodel. Rather than starting up a new instance of the micromodel, which could be expensive, we reuse a single instance many times.

This may bring other advantages, such as faster convergence when starting from the previous final state, and in some cases may be necessary if micromodel state needs to be preserved from one macro timestep to the next.

So in MUSCLE, submodels run in a reuse loop, which runs them over and over again until their work is done and they should be shut down. Whether to do another F_INIT, O_I, S, O_F cycle is decided by this method.

This method must be called at the beginning of the reuse loop, i.e. before the F_INIT operator, and its return value should decide whether to enter that loop again.

Raises:

RuntimeError – When implementing the checkpointing API, but libmuscle detected incorrect API calls. The description of the RuntimeError indicates which calls are incorrect or missing. For more information see the checkpointing API documentation in resuming(), load_snapshot(), should_save_snapshot(), save_snapshot(), should_save_final_snapshot() and save_final_snapshot(), or the checkpointing tutorial.
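The reuse loop described above can be sketched as follows. This is a minimal, hypothetical submodel rather than a reference implementation: the setting names dt, t_max and rate and the decay update are invented for illustration, instance is assumed to be a libmuscle.Instance, and type names are assumed to be passed as strings. The fallback Message class exists only so that the sketch also runs where MUSCLE3 is not installed.

```python
try:
    from libmuscle import Message
except ImportError:
    # Fallback so the sketch runs without MUSCLE3 installed; it mirrors
    # only the documented constructor arguments of libmuscle.Message.
    from dataclasses import dataclass
    from typing import Any, Optional

    @dataclass
    class Message:
        timestamp: float
        next_timestamp: Optional[float] = None
        data: Any = None
        settings: Any = None


def run_micro(instance):
    """Hypothetical submodel: explicit decay of a scalar state."""
    while instance.reuse_instance():
        # F_INIT: read settings and receive the initial state.
        dt = instance.get_setting('dt', 'float')
        t_max = instance.get_setting('t_max', 'float')
        rate = instance.get_setting('rate', 'float')
        msg = instance.receive('initial_state')
        state = msg.data
        t = msg.timestamp

        # S: advance the state (this sketch has no O_I ports).
        for _ in range(int(round(t_max / dt))):
            state -= rate * state * dt
            t += dt

        # O_F: send the final state.
        instance.send('final_state', Message(t, data=state))
```

Because reuse_instance() is the loop condition, the function returns on its own once the instance is no longer needed.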

save_final_snapshot(message: Message) → None

Save a snapshot at the end of the reuse loop.

Before saving a snapshot, you should check using should_save_final_snapshot() if a snapshot should be saved according to the checkpoint rules specified in the ymmsl configuration.

See also save_snapshot() for the variant that may be called after each S Operator of the submodel.

Parameters:

message – Message object that is saved as snapshot. The data attribute can be used to store the internal state of the submodel.

save_snapshot(message: Message) → None

Save a snapshot after the S Operator of the submodel.

Before saving a snapshot, you should check using should_save_snapshot() if a snapshot should be saved according to the checkpoint rules specified in the ymmsl configuration. You should use the same timestamp in the provided Message object as used to query should_save_snapshot.

See also save_final_snapshot() for the variant that must be called at the end of the reuse loop.

Parameters:

message – Message object that is saved as snapshot. The message timestamp attribute should be the same as passed to should_save_snapshot(). The data attribute can be used to store the internal state of the submodel.

send(port_name: str, message: Message, slot: int | None = None) → None

Send a message to the outside world.

Sending is non-blocking: a copy of the message will be made and stored in memory until the receiver is ready to receive it.

Parameters:
  • port_name – The port on which this message is to be sent.

  • message – The message to be sent.

  • slot – The slot to send the message on, if any.

set_port_length(port: str, length: int) → None

Resizes the port to the given length.

Check whether the port is resizable using is_resizable() first; whether it is depends on how this component is wired up.

Parameters:
  • port – Name of the port to resize.

  • length – The new length.

Raises:

RuntimeError – If the port is not resizable.
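The vector port methods documented above (is_vector_port(), is_resizable(), get_port_length(), set_port_length() and send() with a slot) can be combined as in the following sketch. The helper is hypothetical and assumes that instance is a libmuscle.Instance; it resizes the port when that is allowed and otherwise insists that the number of messages matches the current length.

```python
def send_to_all_slots(instance, port, messages):
    """Send one message per slot on a vector port.

    Illustrative helper, not part of libmuscle.
    """
    if not instance.is_vector_port(port):
        raise ValueError(f'{port} is a scalar port')

    if instance.is_resizable(port):
        instance.set_port_length(port, len(messages))
    elif instance.get_port_length(port) != len(messages):
        raise ValueError(
            f'{port} has {instance.get_port_length(port)} slots, '
            f'but {len(messages)} messages were given')

    for slot, message in enumerate(messages):
        instance.send(port, message, slot)
```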

should_init() → bool

Check if this instance should initialize.

Must be used by submodels that implement the checkpointing API.

When resuming from a previous snapshot, instances need not always execute the F_INIT phase of the submodel execution loop. Use this method before attempting to receive data on F_INIT ports.

Returns:

True if the submodel must execute the F_INIT step, False otherwise.

should_save_final_snapshot() → bool

Check if a snapshot should be saved at the end of the reuse loop.

This method checks if a snapshot should be saved at the end of the reuse loop. All your communication on O_F ports must be finished before calling this method, otherwise your simulation may deadlock.

When this method returns True, the submodel must also save a snapshot through save_final_snapshot(). A RuntimeError will be generated if this is not done.

See also should_save_snapshot() for the variant that may be called inside of a time-integration loop of the submodel.

Note

This method will block until it can determine whether a final snapshot should be taken, because it must determine if this instance is reused.

Returns:

True iff a final snapshot should be taken by the submodel according to the checkpoint rules provided in the ymmsl configuration.

should_save_snapshot(timestamp: float) → bool

Check if a snapshot should be saved after the S Operator of the submodel.

This method checks if a snapshot should be saved right now, based on the provided timestamp and the elapsed wallclock time.

When this method returns True, the submodel must also save a snapshot through save_snapshot(). A RuntimeError will be generated if this is not done.

See also should_save_final_snapshot() for the variant that must be called at the end of the reuse loop.

Parameters:

timestamp – current timestamp of the submodel.

Returns:

True iff a snapshot should be taken by the submodel according to the checkpoint rules provided in the ymmsl configuration.
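How the checkpointing calls fit together in one reuse loop can be sketched as below. This is a hedged illustration, not a reference implementation: the setting names dt and n_steps, the halving update and the [step, value] snapshot layout are all invented, instance is assumed to be a libmuscle.Instance created with the USES_CHECKPOINT_API flag, and the fallback Message class is there only so the sketch runs without MUSCLE3 installed.

```python
try:
    from libmuscle import Message
except ImportError:
    # Fallback mirroring the documented Message constructor arguments.
    from dataclasses import dataclass
    from typing import Any, Optional

    @dataclass
    class Message:
        timestamp: float
        next_timestamp: Optional[float] = None
        data: Any = None
        settings: Any = None


def run_with_checkpoints(instance):
    """Hypothetical submodel that halves a value n_steps times."""
    while instance.reuse_instance():
        if instance.resuming():
            # Restore the state saved by an earlier run.
            snapshot = instance.load_snapshot()
            step, value = snapshot.data
            t = snapshot.timestamp
        if instance.should_init():
            # F_INIT: only executed when libmuscle says so.
            msg = instance.receive('initial_state')
            step, value, t = 0, msg.data, msg.timestamp

        dt = instance.get_setting('dt', 'float')
        n_steps = instance.get_setting('n_steps', 'int')

        while step < n_steps:
            # S: one trivial state update.
            value *= 0.5
            step += 1
            t += dt
            if instance.should_save_snapshot(t):
                # Same timestamp as passed to should_save_snapshot().
                instance.save_snapshot(Message(t, data=[step, value]))

        # O_F: finish all sends before asking about a final snapshot.
        instance.send('final_state', Message(t, data=value))
        if instance.should_save_final_snapshot():
            instance.save_final_snapshot(Message(t, data=[step, value]))
```

Storing the step counter in the snapshot lets a resumed run continue the inner loop where the earlier run left off.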

class libmuscle.InstanceFlags(value)

Bases: Flag

Enumeration of properties that an instance may have.

You may combine multiple flags using the bitwise OR operator |. For example:

from libmuscle import (
        Instance, USES_CHECKPOINT_API, DONT_APPLY_OVERLAY)

ports = ...
flags = USES_CHECKPOINT_API | DONT_APPLY_OVERLAY
instance = Instance(ports, flags)

DONT_APPLY_OVERLAY = 1

Do not apply the received settings overlay during prereceive of F_INIT messages. If you’re going to use Instance.receive_with_settings() on your F_INIT ports, you need to set this flag when creating an Instance.

If you don’t know what that means, do not specify this flag and everything will be fine. If it turns out that you did need to specify the flag, MUSCLE3 will tell you about it in an error message and you can still add it.

KEEPS_NO_STATE_FOR_NEXT_USE = 4

Indicate this instance does not carry state between iterations of the reuse loop. Specifying this flag is equivalent to ymmsl.KeepsStateForNextUse.NO.

By default (if neither KEEPS_NO_STATE_FOR_NEXT_USE nor STATE_NOT_REQUIRED_FOR_NEXT_USE is provided), the instance is assumed to keep state between reuses and to require that state (equivalent to ymmsl.KeepsStateForNextUse.NECESSARY).

STATE_NOT_REQUIRED_FOR_NEXT_USE = 8

Indicate this instance carries state between iterations of the reuse loop, but that this state is not required for restarting. Specifying this flag is equivalent to ymmsl.KeepsStateForNextUse.HELPFUL.

By default (if neither KEEPS_NO_STATE_FOR_NEXT_USE nor STATE_NOT_REQUIRED_FOR_NEXT_USE is provided), the instance is assumed to keep state between reuses and to require that state (equivalent to ymmsl.KeepsStateForNextUse.NECESSARY).

USES_CHECKPOINT_API = 2

Indicate that this instance supports checkpointing.

You may not use any checkpointing API calls when this flag is not supplied.
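Since InstanceFlags derives from Flag, the standard library's flag semantics apply. The sketch below uses a stand-in enumeration with the same member values as listed above, so that it runs without MUSCLE3 installed; DemoFlags is a hypothetical name, not part of libmuscle.

```python
from enum import Flag


class DemoFlags(Flag):
    """Stand-in with the member values documented for InstanceFlags."""
    DONT_APPLY_OVERLAY = 1
    USES_CHECKPOINT_API = 2
    KEEPS_NO_STATE_FOR_NEXT_USE = 4
    STATE_NOT_REQUIRED_FOR_NEXT_USE = 8


# Combine flags with bitwise OR.
flags = DemoFlags.USES_CHECKPOINT_API | DemoFlags.DONT_APPLY_OVERLAY

# Test for membership with `in`.
uses_checkpoints = DemoFlags.USES_CHECKPOINT_API in flags
keeps_no_state = DemoFlags.KEEPS_NO_STATE_FOR_NEXT_USE in flags

# The empty combination has value 0 and is falsy.
no_flags = DemoFlags(0)
```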

class libmuscle.Message(timestamp: float, next_timestamp: float | None = None, data: Any | None = None, settings: Settings | None = None)

Bases: object

A message to be sent or received.

This class describes a message to be sent or that has been received.

timestamp

Simulation time for which this data is valid.

Type:

float

next_timestamp

Simulation time for the next message to be transmitted through this port.

Type:

Optional[float]

data

An object to send or that was received.

Type:

MessageObject

settings

Overlay settings to send or that were received.

Type:

Settings

__init__(timestamp: float, next_timestamp: float | None = None, data: Any | None = None, settings: Settings | None = None) → None

Create a Message.

Parameters:
  • timestamp – Simulation time for which this data is valid.

  • next_timestamp – Simulation time for the next message to be transmitted through this port.

  • data – An object to send or that was received.

  • settings – Overlay settings to send or that were received.
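A common use of next_timestamp is to announce the next message from inside a time loop and to leave it as None on the last step. The helper below is a hypothetical sketch of that idea; its name and the t_max cut-off are assumptions, and the fallback Message class only exists so the sketch runs without MUSCLE3 installed.

```python
try:
    from libmuscle import Message
except ImportError:
    # Fallback mirroring the documented Message constructor arguments.
    from dataclasses import dataclass
    from typing import Any, Optional

    @dataclass
    class Message:
        timestamp: float
        next_timestamp: Optional[float] = None
        data: Any = None
        settings: Any = None


def make_state_message(t_cur, dt, t_max, state):
    """Build a message for time t_cur, announcing the next one if any."""
    t_next = t_cur + dt
    if t_next > t_max:
        t_next = None   # no further message will follow on this port
    return Message(t_cur, t_next, state)
```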

class libmuscle.ProfileDatabase(db_file: str | Path)

Bases: object

Accesses a profiling database.

This class accesses a MUSCLE3 profiling database and provides basic analysis functionality.

__init__(db_file: str | Path) → None

Open a ProfileDatabase.

This opens the database file and creates a ProfileDatabase object that operates on it.

Note that the connection to the database needs to be closed again when done. You should use this as a context manager for that, e.g.

with ProfileDatabase('performance.sqlite') as db:
    # use db here

close() → None

Close the connection to the database.

This should be called once by each thread that used this object before it goes down, so that its connection to the database can be closed.

It is usually better to use this class as a context manager, e.g.

with ProfileDatabase('performance.sqlite') as db:
    # use db here

instance_stats() → Tuple[List[str], List[float], List[float], List[float]]

Calculate per-instance statistics.

This calculates the total time spent computing, the total time spent communicating, and the total time spent waiting for a message to arrive, for each instance, in seconds.

It returns a tuple of four lists, containing instance names, run times, communication times, and wait times. Note that the run times do not include data transfer or waiting for messages, so these three are exclusive and add up to the total time the instance was active.

Note that sending messages in MUSCLE3 is partially done in the background. Transfer times include encoding and queueing of any sent messages as well as downloading received messages, but it does not include the sending side of the transfer, as this is done by a background thread in parallel with other work that is recorded (usually waiting, sometimes computing or sending another message).

Nevertheless, this should give you an idea of which instances use the most resources. Keep in mind though that different instances may use different numbers of cores, so a model that doesn’t spend much wallclock time may still spend many core hours. Secondly, waiting does not necessarily leave a core idle as MUSCLE3 is capable of detecting when models will not compute at the same time and having them share cores.

See the profiling documentation page for an example.

Returns:

A list of instance names, a corresponding list of compute times, a corresponding list of transfer times, and a corresponding list of wait times.
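The returned tuple can be post-processed with plain Python. The following sketch computes, per instance, the total active time and the fraction of it spent computing. The helper name is hypothetical and the numbers are made up purely to show the documented shape; with a real database, the argument would be the result of instance_stats().

```python
def summarize_instance_stats(stats):
    """Return {instance: (total seconds, fraction spent computing)}.

    `stats` is the 4-tuple of lists described above: names, compute
    times, transfer times and wait times, all in seconds.
    """
    names, compute, transfer, wait = stats
    summary = {}
    for name, c, t, w in zip(names, compute, transfer, wait):
        total = c + t + w
        fraction = c / total if total > 0 else 0.0
        summary[name] = (total, fraction)
    return summary


# Made-up numbers in the documented shape, for illustration only.
example = (['macro', 'micro'], [6.0, 30.0], [1.0, 2.0], [33.0, 8.0])
for name, (total, fraction) in summarize_instance_stats(example).items():
    print(f'{name}: {total:.1f} s active, {fraction:.0%} computing')
```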

resource_stats() → Dict[str, Dict[str, float]]

Calculate per-core statistics.

This function calculates the amount of time each core has spent running each component assigned to it. It returns a dictionary indexed by (node, core) tuples which contains for each core a nested dictionary mapping components to the number of seconds that component used that core for. This includes time spent calculating and time spent receiving data, but not time spent waiting for input.

Returns:

A dictionary containing for each (node, core) tuple a dictionary containing for each component the total time it ran on that core.
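Because the result is a nested dictionary, per-core totals follow from a short comprehension. The sketch below is a hypothetical helper applied to made-up data in the shape described above, with (node, core) tuples as keys; the node name and the timings are invented.

```python
def busy_time_per_core(resource_stats):
    """Sum, for each core, the seconds used by all components on it."""
    return {
        core: sum(per_component.values())
        for core, per_component in resource_stats.items()}


# Made-up data in the documented shape, for illustration only.
example = {
    ('node001', 0): {'macro': 7.0, 'micro': 32.0},
    ('node001', 1): {'micro': 32.0},
}
totals = busy_time_per_core(example)
```

Keep in mind that these totals exclude time spent waiting for input, as stated above.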

time_taken(*, etype: str, instance: str | None = None, port: str | None = None, slot: int | None = None, time: str | None = 'start', etype2: str | None = None, port2: str | None = None, slot2: int | None = None, time2: str | None = 'stop', aggregate: str = 'mean') → float

Calculate time of and between events.

This function returns the mean or total time spent on or between selected points in time recorded in the database, in nanoseconds. Note that due to operating system limitations, actual precision for individual measurements is limited to about a microsecond.

For profiling purposes, an event is an operation performed by one of the instances in the simulation. It has a type, a start time, and a stop time. For example, when an instance sends a message, this is recorded as an event of type SEND, with associated timestamps. For some events, other information may also be recorded, such as the port and slot a message was sent or received on, message size, and so on.

This function takes two points in time, each of which is the beginning or the end of a certain kind of event, and calculates the time between those two points. For example, to calculate how long it takes instance micro to send a message on port final_state, you can do

db.time_taken(
        etype='SEND', instance='micro', port='final_state')

This selects events of type SEND, as well as an instance and a port, and since we didn’t specify anything else, we get the time taken from the beginning to the end of the selected event. The micro model is likely to have sent many messages, and this function will automatically calculate the mean duration. So this tells us on average how long it takes micro to send a message on final_state.

Averaging will be done over all attributes that are not specified, so for example if final_state is a vector port, then the average will be taken over all sends on all slots, unless a specific slot is specified by a slot argument.

It is also possible to calculate time between different events. For example, if we know that micro receives on initial_state, does some calculations, and then sends on final_state, and we want to know how long the calculations take, then we can use

db.time_taken(
        instance='micro', port='initial_state',
        etype='RECEIVE', time='stop',
        port2='final_state', etype2='SEND',
        time2='start')

This gives the time between the end of a receive on initial_state and the start of a subsequent send on final_state. The arguments with a 2 at the end of their name refer to the end of the period we’re measuring, and by default their value is taken from the corresponding start argument. So, the first command is actually equivalent to

db.time_taken(
    etype='SEND', instance='micro', port='final_state',
    slot=None, time='start', etype2='SEND',
    port2='final_state', slot2=None, time2='stop')

which says that we measure the time from the start of each send by micro on final_state to the end of each send on final_state, aggregating over all slots if applicable.

Speaking of aggregation, there is a final argument aggregate which defaults to mean, but can be set to sum to calculate the sum instead. For example:

db.time_taken(
        etype='RECEIVE_WAIT', instance='macro',
        port='state_in', aggregate='sum')

gives the total time that macro has spent waiting for a message to arrive on its state_in port.

If you are taking points in time from different events (e.g. different instances, ports, slots or types) then there must be the same number of events in the database for the start and end event. So starting at the end of REGISTER and stopping at the beginning of a SEND on an O_I port will likely not work, because the instance only registers once and probably sends more than once.

Parameters:
  • etype – Type of event to get the starting point from. Possible values: ‘REGISTER’, ‘CONNECT’, ‘SHUTDOWN_WAIT’, ‘DISCONNECT_WAIT’, ‘SHUTDOWN’, ‘DEREGISTER’, ‘SEND’, ‘RECEIVE’, ‘RECEIVE_WAIT’, ‘RECEIVE_TRANSFER’, ‘RECEIVE_DECODE’. See the documentation for a description of each.

  • instance – Name of the instance to get the event from. You can use % as a wildcard matching anything. For example, ‘macro[%’ will match all instances of the macro component, if it has many.

  • port – Selected port, for send and receive events.

  • slot – Selected slot, for send and receive events.

  • time – Either ‘start’ or ‘stop’, to select the beginning or the end of the specified event. Defaults to ‘start’.

  • etype2 – Type of event to get the stopping point from. See etype. Defaults to the value of etype.

  • port2 – Selected port. See port. Defaults to the value of port.

  • slot2 – Selected slot. See slot. Defaults to the value of slot.

  • time2 – Either ‘start’ or ‘stop’, to select the beginning or the end of the specified event. Defaults to ‘stop’.

  • aggregate – Either ‘mean’ (default) or ‘sum’, to calculate that statistic.

Returns:

The mean or total time taken in nanoseconds.
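Since the result is in nanoseconds, a thin wrapper that converts to seconds can be convenient. The sketch below wraps the receive-to-send query from the example above; the helper name is hypothetical, the port names initial_state and final_state are taken from that example, and db is assumed to be an open ProfileDatabase.

```python
def mean_compute_seconds(db, instance):
    """Mean time from the end of a receive to the start of the next send.

    Illustrative helper, not part of libmuscle. Returns seconds.
    """
    nanoseconds = db.time_taken(
        instance=instance, etype='RECEIVE', port='initial_state',
        time='stop', etype2='SEND', port2='final_state', time2='start')
    return nanoseconds / 1e9
```

As noted above, this only works if the database holds the same number of selected receive and send events.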

A simple runner for Python-only models.

Starting instances is out of scope for MUSCLE3, but is also very useful for testing and prototyping. So we have a little bit of support for it in this module.

libmuscle.runner.run_simulation(configuration: Configuration, implementations: Dict[str, Callable]) → None

Runs a simulation with the given configuration and implementations.

The yMMSL document must contain both a model and settings.

This function will start the necessary instances described in the yMMSL document. To do so, it needs the corresponding implementations, which are given as a dictionary mapping the implementation name to a Python function (or any callable).

Parameters:
  • configuration – A description of the model and settings.

  • implementations – A dictionary mapping each implementation name to the Python function (or other callable) that runs it.