# Source code for libmuscle.planner.planner

import logging
from collections.abc import Iterable, Mapping
from copy import copy

from ymmsl.v0_2 import (
    Component,
    Configuration,
    Model,
    MPICoresResReq,
    MPINodesResReq,
    Operator,
    Reference,
    ResourceRequirements,
    ThreadedResReq,
)

from libmuscle.planner.resources import OnNodeResources, Resources
from libmuscle.util import instance_indices

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


# Maps a component to a set of (other component, shared dims) pairs. The int
# is the number of dimensions the two components share; in this file it is
# computed as the smaller of the lengths of their multiplicities.
_PredSuccType = dict[Component, set[tuple[Component, int]]]


class ModelGraph:
    """Represents a yMMSL model as a graph.

    This introduces some definitions:

    - Call conduits (from O_I to F_INIT) go from a direct superpredecessor
      to a direct subsuccessor.
    - Dispatch conduits (from O_F to F_INIT) go from a direct predecessor
      to a direct successor.
    - Release conduits (from O_F to S) go from a direct subpredecessor to
      a direct supersuccessor.

    - Supersuccessors are components that are reachable from the current
      component via dispatch and at least one release conduit.
    - Subsuccessors are components that are reachable from the current
      component via dispatch and at least one call conduit.
    - Successors are components that will start running after the current
      component finishes.

    - Superpredecessors are the inverse of subsuccessors.
    - Subpredecessors are the inverse of supersuccessors.
    - Predecessors are the inverse of successors.

    - Macros are components that are both a superpredecessor and a
      supersuccessor of the current component.
    - Micros are components that are both a subsuccessor and a
      subpredecessor of the current component.
    """

    # Raised when the start/finish simulation below cannot make progress.
    _DEADLOCK_MESSAGE = (
            'Could not plan resource allocation for this model.'
            ' Do you have a cycle of O_F -> F_INIT conduits?'
            ' That does not work, because the models will all be'
            ' waiting for each other to start.')

    def __init__(self, model: Model) -> None:
        """Create a ModelGraph.

        This is essentially a helper class that makes it easier to
        analyse a yMMSL model definition.

        Args:
            model: The model to represent.

        Raises:
            RuntimeError: If the components of the model would end up
                waiting for each other, so that no order of execution
                can be determined.
        """
        self._model = model

        self._direct_superpreds: _PredSuccType = {}
        self._direct_predecessors: _PredSuccType = {}
        self._direct_subpreds: _PredSuccType = {}

        self._direct_supersuccs: _PredSuccType = {}
        self._direct_successors: _PredSuccType = {}
        self._direct_subsuccs: _PredSuccType = {}

        self._superpreds: _PredSuccType = {}
        self._predecessors: _PredSuccType = {}
        self._subpreds: _PredSuccType = {}

        self._supersuccs: _PredSuccType = {}
        self._successors: _PredSuccType = {}
        self._subsuccs: _PredSuccType = {}

        # The transitive sets are derived from the direct ones, so the
        # direct ones have to be calculated first.
        self._calc_direct_succs_preds()
        self._calc_predecessors()
        self._calc_successors()

    def components(self) -> Iterable[Component]:
        """Return the components of the model (nodes)."""
        return self._model.components.values()

    def component(self, name: Reference) -> Component:
        """Return a component by name.

        Args:
            name: Name of the component to look up

        Returns:
            The component with the given name

        Raises:
            KeyError: If no component could be found
        """
        return self._model.components[name]

    def successors(self, component: Component) -> set[tuple[Component, int]]:
        """Return the successors of the given component.

        Args:
            component: The reference component

        Returns:
            The set of components for which a path exists from
            component's O_F to their F_INIT.

        Raises:
            KeyError: If the component is not in the model used to
                construct this object.
        """
        return self._successors[component]

    def predecessors(self, component: Component) -> set[tuple[Component, int]]:
        """Return the predecessors of the given component.

        Args:
            component: The reference component

        Returns:
            The set of components for which a path exists from
            their O_F to component's F_INIT.

        Raises:
            KeyError: If the component is not in the model used to
                construct this object.
        """
        return self._predecessors[component]

    def macros(self, component: Component) -> set[tuple[Component, int]]:
        """Return the macros of the given component.

        These are components that are both before the component's F_INIT
        and after its O_F.

        Args:
            component: The reference component

        Returns:
            The set of components that are both super-predecessor and
            super-successor of component.
        """
        return self._superpreds[component] & self._supersuccs[component]

    def micros(self, component: Component) -> set[tuple[Component, int]]:
        """Return the micros of the given component.

        These are components that are in between the component's O_I and
        its S.

        Args:
            component: The reference component

        Returns:
            The set of components that are both sub-successor and
            sub-predecessor of component.
        """
        return self._subsuccs[component] & self._subpreds[component]

    def _empty_sets(self) -> _PredSuccType:
        """Return a dict mapping each component in the model to a new set."""
        return {c: set() for c in self._model.components.values()}

    def _propagate(
            self, from_set: set[tuple[Component, int]],
            to_set: set[tuple[Component, int]], shared_dims: int
            ) -> None:
        """Propagates from_set into to_set.

        Note: Modifies to_set.

        Args:
            from_set: Set to propagate components from
            to_set: Set to propagate them into
            shared_dims: Maximum shared dimensions to carry
        """
        # Each entry is carried over with its shared dimension count capped
        # at shared_dims.
        to_set.update({
            (cmp, min(sd, shared_dims)) for cmp, sd in from_set})

    def _calc_predecessors(self) -> None:
        """Calculates predecessors of each component in the model.

        This function determines for each component in the model which
        other components can be reached by following incoming conduits.

        Preconditions:
            self._model set
            self._direct_* calculated

        Side effects:
            Fills self._superpreds, self._predecessors, self._subpreds
            with for each component in the model the corresponding set
            of components.

        Raises:
            RuntimeError: If at some point no component can start or
                finish, so that the calculation cannot make progress.
        """
        components = list(self._model.components.values())

        self._superpreds = self._empty_sets()
        self._predecessors = self._empty_sets()
        self._subpreds = self._empty_sets()

        # A component can start once all of these have been accounted for
        num_remaining_preds = {
                c: (
                    len(self._direct_predecessors[c]) +
                    len(self._direct_superpreds[c]))
                for c in components}

        # A started component can finish once all of these have finished
        num_remaining_subpreds = {
                c: len(self._direct_subpreds[c]) for c in components}

        todo = set(components)
        started: set[Component] = set()
        doing: set[Component] = set()
        finished: set[Component] = set()

        while todo or doing:
            started.clear()
            for component in todo:
                if num_remaining_preds[component] == 0:
                    for subsucc, shared_dims in self._direct_subsuccs[component]:
                        self._superpreds[subsucc].add((component, shared_dims))
                        self._propagate(
                                self._predecessors[component],
                                self._predecessors[subsucc], shared_dims)
                        self._propagate(
                                self._superpreds[component],
                                self._superpreds[subsucc], shared_dims)
                        num_remaining_preds[subsucc] -= 1
                    started.add(component)
            todo -= started
            doing |= started

            finished.clear()
            for component in doing:
                if num_remaining_subpreds[component] == 0:
                    for succ, shared_dims in self._direct_successors[component]:
                        self._predecessors[succ].add((component, shared_dims))
                        self._propagate(
                                self._subpreds[component],
                                self._predecessors[succ], shared_dims)
                        self._propagate(
                                self._predecessors[component],
                                self._predecessors[succ], shared_dims)
                        self._propagate(
                                self._superpreds[component],
                                self._superpreds[succ], shared_dims)
                        num_remaining_preds[succ] -= 1

                    for supersucc, shared_dims in (
                            self._direct_supersuccs[component]):
                        self._subpreds[supersucc].add((component, shared_dims))
                        self._propagate(
                                self._subpreds[component],
                                self._subpreds[supersucc], shared_dims)
                        self._propagate(
                                self._predecessors[component],
                                self._subpreds[supersucc], shared_dims)
                        num_remaining_subpreds[supersucc] -= 1
                    finished.add(component)
            doing -= finished

            # Nothing changed in this round, so nothing will change in the
            # next one either; stop rather than loop forever.
            if not started and not finished:
                raise RuntimeError(self._DEADLOCK_MESSAGE)

    def _calc_successors(self) -> None:
        """Calculates successors of each component in the model.

        This function determines for each component in the model which
        other components can be reached by following outgoing conduits.

        Preconditions:
            self._model set
            self._direct_* calculated

        Side effects:
            Fills self._supersuccs, self._successors, self._subsuccs
            with for each component in the model the corresponding set
            of components.

        Raises:
            RuntimeError: If at some point no component can start or
                finish, so that the calculation cannot make progress.
        """
        components = list(self._model.components.values())

        self._supersuccs = self._empty_sets()
        self._successors = self._empty_sets()
        self._subsuccs = self._empty_sets()

        # This is the mirror image of _calc_predecessors(): the same
        # simulation, run against the direction of the conduits.
        num_remaining_succs = {
                c: (
                    len(self._direct_successors[c]) +
                    len(self._direct_supersuccs[c]))
                for c in components}

        num_remaining_subsuccs = {
                c: len(self._direct_subsuccs[c]) for c in components}

        todo = set(components)
        started: set[Component] = set()
        doing: set[Component] = set()
        finished: set[Component] = set()

        while todo or doing:
            started.clear()
            for component in todo:
                if num_remaining_succs[component] == 0:
                    for subpred, shared_dims in self._direct_subpreds[component]:
                        self._supersuccs[subpred].add((component, shared_dims))
                        self._propagate(
                                self._successors[component],
                                self._successors[subpred], shared_dims)
                        self._propagate(
                                self._supersuccs[component],
                                self._supersuccs[subpred], shared_dims)
                        num_remaining_succs[subpred] -= 1
                    started.add(component)
            todo -= started
            doing |= started

            finished.clear()
            for component in doing:
                if num_remaining_subsuccs[component] == 0:
                    for pred, shared_dims in self._direct_predecessors[component]:
                        self._successors[pred].add((component, shared_dims))
                        self._propagate(
                                self._successors[component],
                                self._successors[pred], shared_dims)
                        self._propagate(
                                self._supersuccs[component],
                                self._supersuccs[pred], shared_dims)
                        self._propagate(
                                self._subsuccs[component],
                                self._successors[pred], shared_dims)
                        num_remaining_succs[pred] -= 1

                    for superpred, shared_dims in (
                            self._direct_superpreds[component]):
                        self._subsuccs[superpred].add((component, shared_dims))
                        self._propagate(
                                self._successors[component],
                                self._subsuccs[superpred], shared_dims)
                        self._propagate(
                                self._subsuccs[component],
                                self._subsuccs[superpred], shared_dims)
                        num_remaining_subsuccs[superpred] -= 1
                    finished.add(component)
            doing -= finished

            # Same guard as in _calc_predecessors(): without progress the
            # state is unchanged and this loop would never terminate.
            if not started and not finished:
                raise RuntimeError(self._DEADLOCK_MESSAGE)

    def _calc_direct_succs_preds(self) -> None:
        """Calculates all direct successors and predecessors of components.

        Conduits whose sending and receiving operators do not form one of
        the three combinations below are ignored.

        Preconditions:
            self._model set

        Side effects:
            Sets self._direct_supersuccs to a dictionary mapping each
            component in the model to the set of components that it has
            an O_F -> S conduit to.

            Sets self._direct_successors to a dictionary mapping each
            component in the model to the set of components that it has
            an O_F -> F_INIT conduit to.

            Sets self._direct_subsuccs to a dictionary mapping each
            component in the model to the set of components that it has
            an O_I -> F_INIT conduit to.

            Sets self._direct_superpreds to a dictionary mapping each
            component in the model to the set of components that it has
            an O_I -> F_INIT conduit from.

            Sets self._direct_predecessors to a dictionary mapping each
            component in the model to the set of components that it has
            an O_F -> F_INIT conduit from.

            Sets self._direct_subpreds to a dictionary mapping each
            component in the model to the set of components that it has
            an O_F -> S conduit from.
        """
        self._direct_supersuccs = self._empty_sets()
        self._direct_successors = self._empty_sets()
        self._direct_subsuccs = self._empty_sets()

        self._direct_superpreds = self._empty_sets()
        self._direct_predecessors = self._empty_sets()
        self._direct_subpreds = self._empty_sets()

        for conduit in self._model.conduits:
            sender = self._model.components[conduit.sending_component()]
            snd_port = conduit.sending_port()
            snd_op = None
            # Guarded the same way as the receiving side below, so that a
            # sender without any declared ports is skipped.
            if sender.ports and snd_port in sender.ports:
                snd_op = sender.ports[snd_port].operator

            receiver = self._model.components[conduit.receiving_component()]
            recv_port = conduit.receiving_port()
            recv_op = None
            if recv_port == 'muscle_settings_in':
                # This port is treated as F_INIT whether or not the
                # component declares it.
                recv_op = Operator.F_INIT
            elif receiver.ports and recv_port in receiver.ports:
                recv_op = receiver.ports[recv_port].operator

            shared_dims = min(
                    len(sender.multiplicity), len(receiver.multiplicity))

            operators = (snd_op, recv_op)
            if operators == (Operator.O_I, Operator.F_INIT):
                self._direct_superpreds[receiver].add((sender, shared_dims))
                self._direct_subsuccs[sender].add((receiver, shared_dims))
            elif operators == (Operator.O_F, Operator.F_INIT):
                self._direct_successors[sender].add((receiver, shared_dims))
                self._direct_predecessors[receiver].add((sender, shared_dims))
            elif operators == (Operator.O_F, Operator.S):
                self._direct_subpreds[receiver].add((sender, shared_dims))
                self._direct_supersuccs[sender].add((receiver, shared_dims))
class ResourceAssignment:
    """Assigned resources for each process of an instance.

    Note that we use the classes from libmuscle.planner.resources to
    generically refer to collections of resources, either to describe
    the available hardware or to designate a subset of it that is
    occupied by a particular instance, or a subset that isn't currently
    occupied.

    This class has more detailed information, because it knows for each
    process (MPI rank) in the instance which subset of the overall
    resources for the instance it should be on, which we need to launch
    it in the right place.

    Attributes:
        by_rank: List of OnNodeResources objects containing assigned
            resources, indexed by rank.
    """

    def __init__(self, by_rank: list[OnNodeResources]) -> None:
        """Create a ResourceAssignment.

        Args:
            by_rank: List of OnNodeResources objects containing assigned
                resources, indexed by rank.
        """
        self.by_rank = by_rank

    def __eq__(self, other: object) -> bool:
        """Return whether other assigns equal resources to every rank."""
        if not isinstance(other, ResourceAssignment):
            return NotImplemented

        if len(self.by_rank) != len(other.by_rank):
            return False

        # A generator lets all() stop at the first rank that differs.
        return all(
                mine == theirs
                for mine, theirs in zip(self.by_rank, other.by_rank))

    def __str__(self) -> str:
        """Return the per-rank resources as a bracketed, comma-separated list."""
        # str(list()) uses repr() on the elements, we want str()
        str_rbr = ', '.join(str(nr) for nr in self.by_rank)
        return f'[{str_rbr}]'

    def __repr__(self) -> str:
        """Return a constructor-like representation of this object."""
        return f'ResourceAssignment({self.by_rank!r})'

    def as_resources(self) -> Resources:
        """Return a Resources representing the combined assigned resources."""
        result = Resources()
        for node_res in self.by_rank:
            result.merge_node(node_res)
        return result
class InsufficientResourcesAvailable(RuntimeError):
    """Raised when requested resources do not fit the available ones.

    The planner raises this when it cannot find a node with enough free
    cores for a block of threads, and when a virtual node would have too
    few cores for what an instance requires.
    """
class Planner:
    """Allocates resources and keeps track of allocations."""

    def __init__(self, all_resources: Resources) -> None:
        """Create a Planner.

        Args:
            all_resources: An object describing the available resources
                for the planner to use.
        """
        self._all_resources = all_resources
        # Resources handed out in the current allocation round, by instance
        self._allocations: dict[Reference, Resources] = {}
        # Allocations of earlier rounds, moved here when resources ran out
        self._oversubscribed: dict[Reference, Resources] = {}
        # Counter used to generate names for virtual nodes
        self._next_virtual_node = 1

    def allocate_all(
            self, configuration: Configuration, virtual: bool = False
            ) -> dict[Reference, ResourceAssignment]:
        """Allocates resources for the given components.

        Allocation can occur either on a fixed set of available
        resources (virtual set to False), or on an elastic set of
        virtual resources (virtual set to True). Use the former inside
        of a (fixed) cluster allocation and the latter to determine how
        many nodes are needed to run a simulation without
        oversubscribing.

        If virtual is True, additional nodes will be added as needed to
        obtain the resources needed to allocate all instances. Each
        additional node will have as many cores as the first existing
        one. The intended use is to pass a single node to __init__ when
        using this mode.

        Args:
            configuration: Configuration to allocate all components of
            virtual: Allocate on virtual resources or not, see above

        Returns:
            Assigned resources for each instance required by the model.

        Raises:
            InsufficientResourcesAvailable: If virtual is True and an
                instance needs more cores on a single node than a
                virtual node has.
            RuntimeError: If the resource requirements of a component
                are of a kind that is not supported.
        """
        result: dict[Reference, ResourceAssignment] = {}

        _logger.debug(f'Planning on resources {self._all_resources}')

        # Analyse model
        root_model = configuration.root_model()
        model = ModelGraph(root_model)
        programs = configuration.programs
        exclusive = {
                i for c in model.components() for i in c.instances()
                if (c.implementation and
                    not programs[c.implementation].can_share_resources)}

        requirements: dict[Reference, ResourceRequirements] = {
                root_model.name + component.name:
                    configuration.get_resources(root_model.name + component.name)
                for component in model.components()}

        # Allocate
        unallocated_instances = [
                i for c in model.components() for i in c.instances()]

        leftover_instances: list[Reference] = []

        while unallocated_instances:
            leftover_instances.clear()
            to_allocate = self._sort_instances(
                    root_model.name, unallocated_instances, requirements)

            for instance in to_allocate:
                _logger.debug(f'Placing {instance}')
                component = model.component(instance.without_trailing_ints())
                component_reqs = requirements[root_model.name + component.name]
                conflicting_names = self._conflicting_names(
                        model, exclusive, component, instance)

                while True:
                    try:
                        result[instance] = self._assign_instance(
                                instance, component, component_reqs,
                                conflicting_names, virtual)
                        break
                    except InsufficientResourcesAvailable:
                        if not virtual:
                            # Fixed resources: try again in the next round
                            leftover_instances.append(instance)
                            break
                        # Virtual resources: add a node and retry
                        self._expand_resources(component.name, component_reqs)

            if leftover_instances:
                _logger.warning(
                        'Planner ran out of resources, oversubscribing remaining'
                        ' instances.')
                # Start the next round with everything free again, so that
                # the leftover instances overlap with the earlier ones.
                self._oversubscribed.update(self._allocations)
                self._allocations.clear()

            unallocated_instances.clear()
            unallocated_instances.extend(leftover_instances)

        return result

    def _sort_instances(
            self, model_name: Reference, instances: list[Reference],
            requirements: Mapping[Reference, ResourceRequirements]
            ) -> list[Reference]:
        """Return to be allocated components in optimal order.

        This is a heuristic, it's not actually optimal but it should
        give decent results most of the time.

        Threaded instances come first, from most to fewest threads, then
        core-based MPI instances from most to fewest processes. Instances
        with any other kind of requirements come last in their original
        order, so that they are still passed on for assignment instead of
        being dropped.

        Args:
            model_name: Name of the model we're planning for
            instances: The instances to sort
            requirements: The resource requirements for their
                components, indexed by name

        Returns:
            All of the given instances, in the order described above.

        Raises:
            KeyError: If there are no requirements for the component of
                one of the instances.
        """
        instances_reqs = [
                (i, requirements[model_name + i.without_trailing_ints()])
                for i in instances]

        threaded = [
                (i, r.threads) for i, r in instances_reqs
                if isinstance(r, ThreadedResReq)]
        threaded.sort(key=lambda x: x[1], reverse=True)

        mpi = [
                (i, r.mpi_processes) for i, r in instances_reqs
                if isinstance(r, MPICoresResReq)]
        mpi.sort(key=lambda x: x[1], reverse=True)

        other = [
                i for i, r in instances_reqs
                if not isinstance(r, (ThreadedResReq, MPICoresResReq))]

        return [i for i, _ in threaded] + [i for i, _ in mpi] + other

    def _conflicting_names(
            self, model: ModelGraph, exclusive: set[Reference],
            component: Component, instance: Reference) -> set[Reference]:
        """Find conflicting components.

        This returns the names of instances that cannot share resources
        with the given component, so that they can be avoided when
        assigning resources.

        Args:
            model: Model to search
            exclusive: Set of instances that cannot share resources
            component: Component to find conflicts for
            instance: Instance (of component) to find conflicts for

        Returns:
            The instances whose resources must not be shared with the
            given instance.
        """
        def indices_match(
                instance1: Reference, instance2: Reference, dims: int) -> bool:
            """Return whether the first dims indices of both are equal."""
            idx1 = instance_indices(instance1)
            idx2 = instance_indices(instance2)
            return idx1[:dims] == idx2[:dims]

        def matching_instances(
                others: set[tuple[Component, int]]) -> set[Reference]:
            """Return instances in others whose shared indices match ours."""
            return {
                    i for c, d in others for i in c.instances()
                    if indices_match(i, instance, d)}

        # Start from everything, then remove what can safely overlap
        conflicting_instances = {
                i for c in model.components() for i in c.instances()}

        if instance in exclusive:
            return conflicting_instances

        conflicting_instances -= matching_instances(model.predecessors(component))
        conflicting_instances -= matching_instances(model.successors(component))

        # The instance is known not to be exclusive here, so it may share
        # with its micros and macros unless those are exclusive themselves.
        micros = matching_instances(model.micros(component))
        macros = matching_instances(model.macros(component))
        conflicting_instances -= (micros | macros) - exclusive

        return conflicting_instances

    def _expand_resources(
            self, name: Reference, req: ResourceRequirements) -> None:
        """Adds an extra virtual node to the available resources.

        The new node is a copy of the first existing node with a new
        name.

        Args:
            name: Name of the component we need more resources for, used
                in error messages
            req: The resource requirements of that component

        Raises:
            InsufficientResourcesAvailable: If the requirements need more
                cores on one node than the new node would have, in which
                case adding nodes cannot help.
        """
        # Find the next generated name that isn't in use yet
        taken = True
        while taken:
            new_node_name = f'node{self._next_virtual_node:06d}'
            taken = new_node_name in self._all_resources.nodes()
            self._next_virtual_node += 1

        new_node = copy(next(iter(self._all_resources)))
        new_node.node_name = new_node_name

        num_cores = len(new_node.cpu_cores)
        if isinstance(req, ThreadedResReq):
            if req.threads > num_cores:
                raise InsufficientResourcesAvailable(
                        f'Instance {name} requires {req.threads} threads,'
                        f' which is impossible with {num_cores} cores per'
                        ' node.')

        if isinstance(req, MPICoresResReq):
            if req.threads_per_mpi_process > num_cores:
                raise InsufficientResourcesAvailable(
                        f'Instance {name} requires'
                        f' {req.threads_per_mpi_process} threads per process,'
                        f' which is impossible with {num_cores} cores per'
                        ' node.')

        self._all_resources.add_node(new_node)

    def _assign_instance(
            self, instance: Reference, component: Component,
            requirements: ResourceRequirements,
            simultaneous_instances: set[Reference], virtual: bool
            ) -> ResourceAssignment:
        """Allocates resources for the given instance.

        If we are on real resources, and the instance requires more
        threads than our nodes have cores, then we'll just give it all
        cores on a node and hope for the best. If we are on virtual
        resources, this will raise InsufficientResourcesAvailable.

        Args:
            instance: The instance to assign resources to
            component: The component it is an instance of
            requirements: Its resource requirements
            simultaneous_instances: Instances which may execute
                simultaneously and whose resources we therefore
                cannot overlap with
            virtual: Whether we are on virtual resources

        Returns:
            The resources assigned to each process in the instance

        Raises:
            InsufficientResourcesAvailable: If the instance does not fit
                and either we are on virtual resources or there are
                other allocations in the current round.
            RuntimeError: If the requirements are node-based, or ask for
                more than one thread per MPI process.
        """
        assignment = ResourceAssignment([])

        free_resources = copy(self._all_resources)
        for other, allocated in self._allocations.items():
            if other in simultaneous_instances:
                free_resources -= allocated

        _logger.debug(f'Free resources: {free_resources}')

        try:
            if isinstance(requirements, ThreadedResReq):
                assignment.by_rank.append(self._assign_thread_block(
                        free_resources, requirements.threads))

            elif isinstance(requirements, MPICoresResReq):
                if requirements.threads_per_mpi_process != 1:
                    raise RuntimeError(
                            'Multiple threads per MPI process is not supported'
                            ' yet. Please make an issue on GitHub.')
                for _ in range(requirements.mpi_processes):
                    block = self._assign_thread_block(
                            free_resources, requirements.threads_per_mpi_process)
                    assignment.by_rank.append(block)
                    # Keep the next rank from getting the same cores
                    free_resources -= Resources([block])

            elif isinstance(requirements, MPINodesResReq):
                raise RuntimeError(
                        'Node-based MPI resource requirements are not'
                        ' supported yet. Please make an issue on GitHub.')

        except InsufficientResourcesAvailable:
            if not self._allocations and not virtual:
                # There are no other allocations and it's still not
                # enough. Just give it all and hope for the best.
                assignment = self._oversubscribe_instance(instance, requirements)
            else:
                raise

        self._allocations[instance] = assignment.as_resources()
        return assignment

    def _assign_thread_block(
            self, free_resources: Resources, num_threads: int
            ) -> OnNodeResources:
        """Assign resources for a group of threads.

        This chooses a set of <num_threads> cores on the same node. It
        returns the assigned resources; it doesn't update
        self._allocations or free_resources.

        Args:
            free_resources: Available resources to allocate from
            num_threads: Number of threads to allocate for

        Returns:
            The assigned resources

        Raises:
            InsufficientResourcesAvailable: If none of the nodes has at
                least num_threads cores.
        """
        # First fit: take the first node with enough cores
        for node in free_resources:
            if len(node.cpu_cores) >= num_threads:
                available_cores = node.cpu_cores
                _logger.debug(f'available cores: {available_cores}')
                to_reserve = available_cores.get_first_cores(num_threads)
                _logger.debug(f'assigned {to_reserve}')
                return OnNodeResources(node.node_name, to_reserve)
        raise InsufficientResourcesAvailable()

    def _oversubscribe_instance(
            self, instance: Reference, requirements: ResourceRequirements
            ) -> ResourceAssignment:
        """Oversubscribe an instance.

        This is called when all resources are available and we still
        cannot fit an instance, i.e. that single instance requires more
        resources than we have available in total. In that case, we're
        just going to map it onto the resources we have and hope for the
        best, which is what this function does.

        There's a lot of repetition between this and the code above.
        There's probably a cleaner way to do this, but it'll do for now.
        Eventually we'll have an optimiser and all this goes away
        anyway.

        Args:
            instance: The instance we're oversubscribing
            requirements: The required resources

        Returns:
            An oversubscribed resource assignment

        Raises:
            RuntimeError: If the requirements ask for more than one
                thread per MPI process.
        """
        _logger.warning(
                f'Instance {instance} requires more resources than are available in'
                ' total. Oversubscribing this instance.')

        res_by_rank: list[OnNodeResources] = []

        if isinstance(requirements, ThreadedResReq):
            # Give it the whole of the first node
            res_by_rank.append(copy(next(iter(self._all_resources))))

        elif isinstance(requirements, MPICoresResReq):
            if requirements.threads_per_mpi_process != 1:
                raise RuntimeError(
                        'Multiple threads per MPI process is not supported yet. Please'
                        ' make an issue on GitHub.')

            free_resources = copy(self._all_resources)
            for _ in range(requirements.mpi_processes):
                # Once everything has been handed out, start over and
                # hand out the same resources again.
                if (
                        free_resources.total_cores() <
                        requirements.threads_per_mpi_process):
                    free_resources = copy(self._all_resources)

                block = self._assign_thread_block(
                        free_resources, requirements.threads_per_mpi_process)
                res_by_rank.append(block)
                free_resources -= Resources([block])

        return ResourceAssignment(res_by_rank)