libmuscle.mmp_client module
- exception libmuscle.mmp_client.ConnectionLockedError[source]
Bases:
RuntimeError
- class libmuscle.mmp_client.MMPClient(instance_id: Reference, location: str)[source]
Bases:
objectThe client for the MUSCLE Manager Protocol.
This class connects to the Manager and communicates with it on behalf of the rest of libmuscle.
It manages the connection, and converts between our native types and the gRPC generated types.
Communication is protected by an internal lock, so this class can be called simultaneously from different threads.
- close() None[source]
Close the connection
This closes the connection. After this no other member functions can be called.
- get_checkpoint_info() tuple[float, Checkpoints, Path | None, Path | None][source]
Get the checkpoint info from the manager.
- Returns:
current elapsed time checkpoints: checkpoint configuration resume: path to the resume snapshot snapshot_directory: path to store snapshots
- Return type:
elapsed_time
- get_settings() Settings[source]
Get the central settings from the manager.
- Returns:
The requested settings.
- register_instance(locations: list[str], ports: list[Port]) None[source]
Register a component instance with the manager.
- Parameters:
locations – List of places where the instance can be reached.
ports – List of ports of this instance.
- request_peers() PeerInfo[source]
Request connection information about peers.
This will repeat the request at an exponentially increasing query interval at first, until it reaches the interval specified by PEER_INTERVAL_MIN and PEER_INTERVAL_MAX. From there on, intervals are drawn randomly from that range.
- Returns:
PeerInfo received from the muscle manager.
- submit_log_message(message: LogMessage) None[source]
Send a log message to the manager.
This particular call is a bit tricky because of its potentially recursive nature. It’s used by a special logging handler (see logging_handler.py) to send high-priority log messages to the manager for inclusion in the manager log. The problem is that the connection to the manager may fail while doing so, which causes more log messages to be generated (dropped connection are rare and shouldn’t really happen, so we want the user to know about them). Of course those then get picked up by the handler, which sends them here recursively.
The Python logging system has an internal mutex, and we’ve got a mutex here as well, and this recursion causes a thread to try to lock them alternatingly. If the thread starts with making a request to the manager, then it will try to lock the MMPClient mutex first, then the logging mutex, but if it starts with sending a log message, then it will try to lock the logging mutex first and then the MMPClient one. If two threads try to lock two mutexes in a different order then you’ll get a deadlock.
To avoid this, this function will not wait for the internal lock forever, but time out after a while and raise. Note that the actual implementation is in _call_manager().
- Parameters:
message – The message to send.
- Raises:
ConnectionLockedError – if the connection to the manager was being used already.
- submit_profile_events(events: Iterable[ProfileEvent]) None[source]
Sends profiling events to the manager.
- Parameters:
events – The events to send.
- submit_snapshot_metadata(snapshot_metadata: SnapshotMetadata) None[source]
Send snapshot metadata to the manager.
- Parameters:
snapshot_metadata – Snapshot metadata to supply to the manager.
- libmuscle.mmp_client.decode_checkpoint_info(elapsed_time: float, checkpoints_dict: dict[str, Any], resume: str | None, snapshot_dir: str | None) tuple[float, Checkpoints, Path | None, Path | None][source]
Decode checkpoint info from a MsgPack-compatible value.
- Parameters:
elapsed_time – current elapsed time according to the manager
checkpoints_dict – checkpoint definitions from the MsgPack
resume – path to the snapshot we should resume from, if any
snapshot_dir – path to the directory to store new snapshots in
- Returns:
current elapsed time according to the manager checkpoints: checkpoint configuration resume: path to the snapshot we should resume from, if any snapshot_dir: path to the directory to store new snapshots in
- Return type:
elapsed_time
- libmuscle.mmp_client.decode_checkpoint_rule(rule: dict[str, Any]) CheckpointRule[source]
Decode a checkpoint rule from a MsgPack-compatible value.
- libmuscle.mmp_client.encode_operator(op: Operator) str[source]
Convert an Operator to a MsgPack-compatible value.
- libmuscle.mmp_client.encode_port(port: Port) list[str][source]
Convert a Port to a MsgPack-compatible value.
- libmuscle.mmp_client.encode_profile_event(event: ProfileEvent) Any[source]
Converts a ProfileEvent to a list.
- Parameters:
event – A profile event
- Returns:
A list with its attributes, for MMP serialisation.