libmuscle.communicator module

class libmuscle.communicator.Communicator(kernel: Reference, index: List[int], declared_ports: Dict[Operator, List[str]] | None, profiler: Profiler)[source]

Bases: object

Communication engine for MUSCLE3.

This class is the mailroom for a kernel that uses MUSCLE3. It manages the sending and receiving of messages, although it leaves the actual data transmission to various protocol-specific servers and clients.

close_port(port_name: str, slot: int | None = None) None[source]

Closes the given port.

This signals to any connected instance that no more messages will be sent on this port, which it can use to decide whether to shut down or continue running.

Parameters:

port_name – The name of the port to close.

connect(conduits: List[Conduit], peer_dims: Dict[Reference, List[int]], peer_locations: Dict[Reference, List[str]]) None[source]

Connect this Communicator to its peers.

This is the second stage in the simulation wiring process.

Peers here are instances, and peer_dims and peer_locations are indexed by a Reference to an instance. Instance sets are multi-dimensional arrays with sizes given by peer_dims.

Parameters:
  • conduits – A list of conduits attached to this component, as received from the manager.

  • peer_dims – For each peer we share a conduit with, the dimensions of the instance set.

  • peer_locations – A list of locations for each peer instance we share a conduit with.

get_locations() List[str][source]

Returns a list of locations that we can be reached at.

These locations are of the form ‘protocol:location’, where the protocol name does not contain a colon and location may be an arbitrary string.

Returns:

A list of strings describing network locations.

get_message_counts() Dict[str, List[int]][source]

Get message counts for all ports on the communicator.

get_port(port_name: str) Port[source]

Returns a Port object describing a port with the given name.

Parameters:

port – The port to retrieve.

list_ports() Dict[Operator, List[str]][source]

Returns a description of the ports this Communicator has.

Returns:

A dictionary, indexed by Operator, containing lists of port names. Operators with no associated ports are not included.

port_exists(port_name: str) bool[source]

Returns whether a port with the given name exists.

Parameters:

port_name – Port name to check.

receive_message(port_name: str, slot: int | None = None, default: Message | None = None) Tuple[Message, float][source]

Receive a message and attached settings overlay.

Receiving is a blocking operation. This function will contact the sender, wait for a message to be available, and receive and return it.

If the port is not connected, then the default value will be returned if one was given, exactly as it was given. If no default was given then a RuntimeError will be raised.

Parameters:
  • port_name – The endpoint on which a message is to be received.

  • slot – The slot to receive the message on, if any.

  • default – A message to return if this port is not connected.

Returns:

The received message, with message.settings holding the settings overlay. The settings attribute is guaranteed to not be None. Secondly, the saved_until metadata field from the received message.

Raises:

RuntimeError – If no default was given and the port is not connected.

restore_message_counts(port_message_counts: Dict[str, List[int]]) None[source]

Restore message counts on all ports.

send_message(port_name: str, message: Message, slot: int | None = None, checkpoints_considered_until: float = -inf) None[source]

Send a message and settings to the outside world.

Sending is non-blocking, a copy of the message will be made and stored until the receiver is ready to receive it.

Parameters:
  • port_name – The port on which this message is to be sent.

  • message – The message to be sent.

  • slot – The slot to send the message on, if any.

  • checkpoints_considered_until – When we last checked if we should save a snapshot (wallclock time).

settings_in_connected() bool[source]

Returns True iff muscle_settings_in is connected.

shutdown() None[source]

Shuts down the Communicator, closing connections.

class libmuscle.communicator.Message(timestamp: float, next_timestamp: float | None = None, data: Any | None = None, settings: Settings | None = None)[source]

Bases: object

A message to be sent or received.

This class describes a message to be sent or that has been received.

timestamp

Simulation time for which this data is valid.

Type:

float

next_timestamp

Simulation time for the next message to be transmitted through this port.

Type:

Optional[float]

data

An object to send or that was received.

Type:

MessageObject

settings

Overlay settings to send or that was received.

Type:

Settings