libmuscle.checkpoint_triggers module

class libmuscle.checkpoint_triggers.AtCheckpointTrigger(at_rules: List[CheckpointAtRule])

Bases: CheckpointTrigger

Represents a trigger based on an “at” checkpoint rule

This triggers at the specified times.

next_checkpoint(cur_time: float) → float | None

Calculate the next checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a next checkpoint should be taken, or None if this trigger has no checkpoint after cur_time.

previous_checkpoint(cur_time: float) → float | None

Calculate the previous checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a previous checkpoint should have been taken, or None if this trigger has no checkpoint before cur_time.
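The lookup an “at” trigger performs can be illustrated with a sorted list and a binary search. This is a minimal standalone sketch, not the libmuscle implementation: the class name AtTriggerSketch is hypothetical, it takes a plain list of times instead of CheckpointAtRule objects, and treating a checkpoint exactly at cur_time as “previous” is an assumption.

```python
import bisect
from typing import List, Optional


class AtTriggerSketch:
    """Hypothetical stand-in for an "at" trigger; not the libmuscle class."""

    def __init__(self, times: List[float]) -> None:
        # Keep the times sorted so they can be searched with bisect.
        self._at = sorted(times)

    def next_checkpoint(self, cur_time: float) -> Optional[float]:
        # First listed time strictly after cur_time, if there is one.
        idx = bisect.bisect_right(self._at, cur_time)
        return self._at[idx] if idx < len(self._at) else None

    def previous_checkpoint(self, cur_time: float) -> Optional[float]:
        # Last listed time at or before cur_time (boundary choice is assumed).
        idx = bisect.bisect_right(self._at, cur_time)
        return self._at[idx - 1] if idx > 0 else None


trigger = AtTriggerSketch([1.0, 5.0, 2.5])
print(trigger.next_checkpoint(2.0))      # 2.5
print(trigger.previous_checkpoint(2.0))  # 1.0
print(trigger.next_checkpoint(5.0))      # None
print(trigger.previous_checkpoint(0.5))  # None
```

Both methods return None once the list is exhausted in the relevant direction, which matches the return contract documented above.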

class libmuscle.checkpoint_triggers.CheckpointTrigger

Bases: object

Represents a trigger for creating snapshots

next_checkpoint(cur_time: float) → float | None

Calculate the next checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a next checkpoint should be taken, or None if this trigger has no checkpoint after cur_time.

previous_checkpoint(cur_time: float) → float | None

Calculate the previous checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a previous checkpoint should have been taken, or None if this trigger has no checkpoint before cur_time.

class libmuscle.checkpoint_triggers.CombinedCheckpointTriggers(checkpoint_rules: List[CheckpointRule])

Bases: CheckpointTrigger

Checkpoint trigger based on a combination of “at” and “ranges” rules

next_checkpoint(cur_time: float) → float | None

Calculate the next checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a next checkpoint should be taken, or None if this trigger has no checkpoint after cur_time.

previous_checkpoint(cur_time: float) → float | None

Calculate the previous checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a previous checkpoint should have been taken, or None if this trigger has no checkpoint before cur_time.
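One natural way to combine several triggers is to take the earliest of their next checkpoints and the latest of their previous checkpoints, ignoring triggers that return None. The sketch below illustrates that idea under stated assumptions: FixedTimes and CombinedSketch are hypothetical names, and the source does not confirm that libmuscle combines its triggers exactly this way.

```python
from typing import List, Optional, Sequence


class FixedTimes:
    """Hypothetical minimal trigger over a fixed list of times."""

    def __init__(self, times: Sequence[float]) -> None:
        self._times: List[float] = sorted(times)

    def next_checkpoint(self, cur_time: float) -> Optional[float]:
        later = [t for t in self._times if t > cur_time]
        return later[0] if later else None

    def previous_checkpoint(self, cur_time: float) -> Optional[float]:
        earlier = [t for t in self._times if t <= cur_time]
        return earlier[-1] if earlier else None


class CombinedSketch:
    """Hypothetical combination of triggers; not the libmuscle class."""

    def __init__(self, triggers: Sequence[FixedTimes]) -> None:
        self._triggers = list(triggers)

    def next_checkpoint(self, cur_time: float) -> Optional[float]:
        # Earliest upcoming checkpoint over all triggers that still have one.
        candidates = (t.next_checkpoint(cur_time) for t in self._triggers)
        return min((c for c in candidates if c is not None), default=None)

    def previous_checkpoint(self, cur_time: float) -> Optional[float]:
        # Most recent past checkpoint over all triggers that have one.
        candidates = (t.previous_checkpoint(cur_time) for t in self._triggers)
        return max((c for c in candidates if c is not None), default=None)


combined = CombinedSketch([FixedTimes([1.0, 4.0]), FixedTimes([2.0, 3.0])])
print(combined.next_checkpoint(1.5))      # 2.0
print(combined.previous_checkpoint(3.5))  # 3.0
print(combined.next_checkpoint(4.0))      # None
```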

class libmuscle.checkpoint_triggers.RangeCheckpointTrigger(range: CheckpointRangeRule)

Bases: CheckpointTrigger

Represents a trigger based on a “ranges” checkpoint rule

This triggers at a range of checkpoint moments.

Equivalent to an “at” rule [start, start + step, start + 2*step, ...] for as long as start + i*step <= stop.

Stop may be omitted, in which case the range is infinite.

Start may be omitted, in which case the range is equivalent to an “at” rule [..., -n*step, ..., -step, 0, step, 2*step, ...] for as long as i*step <= stop.

next_checkpoint(cur_time: float) → float | None

Calculate the next checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a next checkpoint should be taken, or None if this trigger has no checkpoint after cur_time.

previous_checkpoint(cur_time: float) → float | None

Calculate the previous checkpoint time

Parameters:

cur_time – current time.

Returns:

The time when a previous checkpoint should have been taken, or None if this trigger has no checkpoint before cur_time.
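The range semantics described for this class (a grid of step-spaced times, optionally bounded by start and stop, and anchored at 0 when start is omitted) can be worked through with a little arithmetic. The following is a minimal sketch only: RangeTriggerSketch is a hypothetical name, it takes start, stop and step directly rather than a CheckpointRangeRule, it assumes start <= stop, and treating a grid point exactly at cur_time as “previous” is an assumption.

```python
import math
from typing import Optional


class RangeTriggerSketch:
    """Hypothetical stand-in for a "ranges" trigger; not the libmuscle class."""

    def __init__(self, step: float, start: Optional[float] = None,
                 stop: Optional[float] = None) -> None:
        self._step = step
        self._start = start
        self._stop = stop
        # With start omitted, the grid is anchored at 0 and extends both ways.
        self._origin = 0.0 if start is None else start

    def next_checkpoint(self, cur_time: float) -> Optional[float]:
        if self._start is not None and cur_time < self._start:
            nxt = self._start
        else:
            # Index of the first grid point strictly after cur_time.
            n = math.floor((cur_time - self._origin) / self._step) + 1
            nxt = self._origin + n * self._step
        if self._stop is not None and nxt > self._stop:
            return None
        return nxt

    def previous_checkpoint(self, cur_time: float) -> Optional[float]:
        if self._start is not None and cur_time < self._start:
            return None
        # Past stop, the previous checkpoint is the last grid point <= stop.
        ref = cur_time
        if self._stop is not None and cur_time > self._stop:
            ref = self._stop
        n = math.floor((ref - self._origin) / self._step)
        return self._origin + n * self._step


bounded = RangeTriggerSketch(step=2.0, start=1.0, stop=6.0)  # grid: 1, 3, 5
print(bounded.next_checkpoint(0.0))       # 1.0
print(bounded.next_checkpoint(5.0))       # None
print(bounded.previous_checkpoint(10.0))  # 5.0

unbounded = RangeTriggerSketch(step=10.0)  # grid: ..., -10, 0, 10, ...
print(unbounded.next_checkpoint(-25.0))      # -20.0
print(unbounded.previous_checkpoint(-25.0))  # -30.0
```

Computing the index from a floor division avoids enumerating the grid, which matters when the range is infinite in one or both directions.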

class libmuscle.checkpoint_triggers.TriggerManager

Bases: object

Manages all checkpoint triggers and checks if a snapshot must be saved.

checkpoints_considered_until() → float

Return the elapsed time at the last call to a should_save* method.

elapsed_walltime() → float

Return the elapsed wall-clock time in seconds.

get_triggers() → List[str]

Get trigger description(s) for the current reason for checkpointing.

harmonise_wall_time(at_least: float) → None

Ensure our elapsed time is at least the given value

set_checkpoint_info(elapsed: float, checkpoints: Checkpoints) → None

Register checkpoint info received from the muscle manager.

should_save_final_snapshot(do_reuse: bool, f_init_max_timestamp: float | None) → bool

Handles instance.should_save_final_snapshot

should_save_snapshot(timestamp: float) → bool

Handles instance.should_save_snapshot

update_checkpoints(timestamp: float) → None

Update last and next checkpoint times when a snapshot is made.

Parameters:

timestamp – timestamp as reported by the instance (or from incoming F_INIT messages for save_final_snapshot).
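The interplay between should_save_snapshot and update_checkpoints can be pictured with a toy model that remembers the next checkpoint time and advances it whenever a snapshot is made. This is purely an illustrative sketch: ToyManager is a hypothetical class driven by a fixed list of simulation times, and the actual TriggerManager logic (including its wall-clock handling) is not described in this reference and may differ.

```python
import bisect
from typing import List, Optional


class ToyManager:
    """Hypothetical, simplified model; not the libmuscle TriggerManager."""

    def __init__(self, times: List[float]) -> None:
        self._times = sorted(times)
        self._last: Optional[float] = None
        self._next: Optional[float] = self._times[0] if self._times else None

    def should_save_snapshot(self, timestamp: float) -> bool:
        # A snapshot is due once the timestamp reaches the next checkpoint.
        return self._next is not None and timestamp >= self._next

    def update_checkpoints(self, timestamp: float) -> None:
        # Recompute the last and next checkpoint times around this snapshot.
        idx = bisect.bisect_right(self._times, timestamp)
        self._last = self._times[idx - 1] if idx > 0 else None
        self._next = self._times[idx] if idx < len(self._times) else None


manager = ToyManager([2.0, 4.0])
saved = []
for step in range(6):
    t = float(step)
    if manager.should_save_snapshot(t):
        saved.append(t)
        manager.update_checkpoints(t)
print(saved)  # [2.0, 4.0]
```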