libmuscle.mmp_client module

exception libmuscle.mmp_client.ConnectionLockedError

Bases: RuntimeError

class libmuscle.mmp_client.MMPClient(instance_id: Reference, location: str)

Bases: object

The client for the MUSCLE Manager Protocol.

This class connects to the Manager and communicates with it on behalf of the rest of libmuscle.

It manages the connection, and converts between our native types and the gRPC generated types.

Communication is protected by an internal lock, so this class can be called simultaneously from different threads.

close() → None

Close the connection.

After this, no other member functions can be called.

deregister_instance() → None

Deregister a component instance with the manager.

get_checkpoint_info() → tuple[float, Checkpoints, Path | None, Path | None]

Get the checkpoint info from the manager.

Returns:
  • elapsed_time – current elapsed time

  • checkpoints – checkpoint configuration

  • resume – path to the resume snapshot

  • snapshot_directory – path to store snapshots

Return type:

tuple[float, Checkpoints, Path | None, Path | None]

get_settings() → Settings

Get the central settings from the manager.

Returns:

The requested settings.

is_deadlocked() → bool

Ask the manager if we’re part of a deadlock.

register_instance(locations: list[str], ports: list[Port]) → None

Register a component instance with the manager.

Parameters:
  • locations – List of places where the instance can be reached.

  • ports – List of ports of this instance.

request_peers() → PeerInfo

Request connection information about peers.

The request is repeated, at first with an exponentially increasing query interval, until the interval reaches the range bounded by PEER_INTERVAL_MIN and PEER_INTERVAL_MAX. From then on, intervals are drawn randomly from that range.

Returns:

PeerInfo received from the muscle manager.
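The polling schedule described for request_peers() can be sketched as a generator of sleep intervals. This is only an illustration under stated assumptions, not the libmuscle implementation: the constant values, the starting interval, the doubling factor, and the name peer_query_intervals are all hypothetical, and growth is assumed to stop once the interval reaches PEER_INTERVAL_MIN.

```python
import random
from typing import Iterator, Optional

# Assumed values for illustration only; the real constants are defined in libmuscle.
PEER_INTERVAL_MIN = 5.0
PEER_INTERVAL_MAX = 10.0


def peer_query_intervals(
        initial: float = 0.1, seed: Optional[int] = None) -> Iterator[float]:
    """Yield sleep intervals: doubling at first, then random within the range."""
    rng = random.Random(seed)
    interval = initial
    # Phase 1: grow exponentially until the lower bound of the range is reached.
    while interval < PEER_INTERVAL_MIN:
        yield interval
        interval *= 2.0
    # Phase 2: draw every further interval uniformly from [MIN, MAX].
    while True:
        yield rng.uniform(PEER_INTERVAL_MIN, PEER_INTERVAL_MAX)
```

A caller would sleep for each yielded interval between requests and stop iterating once the manager returns the peer information. Drawing the later intervals at random spreads out requests from instances that started at the same moment.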

submit_log_message(message: LogMessage) → None

Send a log message to the manager.

This particular call is a bit tricky because of its potentially recursive nature. It’s used by a special logging handler (see logging_handler.py) to send high-priority log messages to the manager for inclusion in the manager log. The problem is that the connection to the manager may fail while doing so, which causes more log messages to be generated (dropped connections are rare and shouldn’t really happen, so we want the user to know about them). Of course those then get picked up by the handler, which sends them here recursively.

The Python logging system has an internal mutex, and we’ve got a mutex here as well, and this recursion causes a thread to try to lock them alternately. If the thread starts by making a request to the manager, it will try to lock the MMPClient mutex first and then the logging mutex, but if it starts by sending a log message, it will try to lock the logging mutex first and then the MMPClient one. If two threads try to lock the same two mutexes in opposite orders, they can deadlock.

To avoid this, this function will not wait for the internal lock forever, but will time out after a while and raise ConnectionLockedError. Note that the actual implementation is in _call_manager().

Parameters:

message – The message to send.

Raises:

ConnectionLockedError – if the connection to the manager was being used already.
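The timeout behaviour described for submit_log_message() can be illustrated with a plain threading.Lock. This is a minimal sketch, not the code in _call_manager(): the class LockedChannel, its call() method, the lock_timeout parameter, and the placeholder reply are hypothetical, and the exception class is a local stand-in for the one documented above.

```python
import threading


class ConnectionLockedError(RuntimeError):
    """Local stand-in for libmuscle.mmp_client.ConnectionLockedError."""


class LockedChannel:
    """Hypothetical stand-in for the lock handling inside MMPClient."""

    def __init__(self) -> None:
        self._lock = threading.Lock()

    def call(self, request: str, lock_timeout: float = -1.0) -> str:
        # timeout=-1 waits indefinitely; a non-negative value gives up after
        # that many seconds, and acquire() then returns False.
        if not self._lock.acquire(timeout=lock_timeout):
            raise ConnectionLockedError("connection is in use")
        try:
            # Placeholder for the real request/response exchange.
            return f"reply to {request}"
        finally:
            self._lock.release()
```

Ordinary requests would use the default and wait for the lock, while the logging path would pass a finite lock_timeout, so that a log message generated while the lock is already held fails with ConnectionLockedError instead of blocking forever.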

submit_profile_events(events: Iterable[ProfileEvent]) → None

Send profiling events to the manager.

Parameters:

events – The events to send.

submit_snapshot_metadata(snapshot_metadata: SnapshotMetadata) → None

Send snapshot metadata to the manager.

Parameters:

snapshot_metadata – Snapshot metadata to supply to the manager.

waiting_for_receive(peer_instance_id: Reference, port_name: str, slot: int | None) → None

Notify the manager that we’re waiting to receive a message.

waiting_for_receive_done(peer_instance_id: Reference, port_name: str, slot: int | None) → None

Notify the manager that we’re done waiting to receive a message.

libmuscle.mmp_client.decode_checkpoint_info(elapsed_time: float, checkpoints_dict: dict[str, Any], resume: str | None, snapshot_dir: str | None) → tuple[float, Checkpoints, Path | None, Path | None]

Decode checkpoint info from a MsgPack-compatible value.

Parameters:
  • elapsed_time – current elapsed time according to the manager

  • checkpoints_dict – checkpoint definitions from the MsgPack

  • resume – path to the snapshot we should resume from, if any

  • snapshot_dir – path to the directory to store new snapshots in

Returns:
  • elapsed_time – current elapsed time according to the manager

  • checkpoints – checkpoint configuration

  • resume – path to the snapshot we should resume from, if any

  • snapshot_dir – path to the directory to store new snapshots in

Return type:

tuple[float, Checkpoints, Path | None, Path | None]
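According to the signature, resume and snapshot_dir arrive as str | None and are returned as Path | None. That part of the decoding can be sketched as below. The helper name optional_path is hypothetical and is not part of libmuscle; decoding of checkpoints_dict is not shown because its layout is not described here.

```python
from pathlib import Path
from typing import Optional


def optional_path(value: Optional[str]) -> Optional[Path]:
    """Hypothetical helper: map a str-or-None wire value to a Path-or-None."""
    # None means "not set" and is passed through unchanged.
    return None if value is None else Path(value)
```

For example, optional_path(None) stays None, while a string value is wrapped in a Path object.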

libmuscle.mmp_client.decode_checkpoint_rule(rule: dict[str, Any]) → CheckpointRule

Decode a checkpoint rule from a MsgPack-compatible value.

libmuscle.mmp_client.encode_operator(op: Operator) → str

Convert an Operator to a MsgPack-compatible value.

libmuscle.mmp_client.encode_port(port: Port) → list[str]

Convert a Port to a MsgPack-compatible value.

libmuscle.mmp_client.encode_profile_event(event: ProfileEvent) → Any

Convert a ProfileEvent to a list.

Parameters:

event – A profile event

Returns:

A list with its attributes, for MMP serialisation.