Source code for libmuscle.planner.planner

from copy import copy, deepcopy
import logging
from typing import Dict, Iterable, List, Mapping, Optional, Set, Tuple

from ymmsl import (
        Component, Configuration, Model, MPICoresResReq, MPINodesResReq,
        Operator, Reference, ResourceRequirements, ThreadedResReq)

from libmuscle.util import instance_indices


_logger = logging.getLogger(__name__)


_PredSuccType = Dict[Component, Set[Tuple[Component, int]]]


[docs]class ModelGraph: """Represents a yMMSL model as a graph.""" def __init__(self, model: Model) -> None: """Create a ModelGraph. This is essentially a helper class that makes it easier to analyse a yMMSL model definition. Args: model: The model to represent. """ self._model = model self._direct_superpreds: _PredSuccType = {} self._direct_predecessors: _PredSuccType = {} self._direct_subpreds: _PredSuccType = {} self._direct_supersuccs: _PredSuccType = {} self._direct_successors: _PredSuccType = {} self._direct_subsuccs: _PredSuccType = {} self._superpreds: _PredSuccType = {} self._predecessors: _PredSuccType = {} self._subpreds: _PredSuccType = {} self._supersuccs: _PredSuccType = {} self._successors: _PredSuccType = {} self._subsuccs: _PredSuccType = {} self._calc_direct_succs_preds() self._calc_predecessors() self._calc_successors()
[docs] def components(self) -> Iterable[Component]: """Return the components of the model (nodes).""" return self._model.components
[docs] def component(self, name: Reference) -> Component: """Return a component by name. Args: name: Name of the component to look up Returns: The component with the given name Raises: KeyError: If no component could be found """ for component in self._model.components: if component.name == name: return component raise KeyError('Component {} not found'.format(name))
[docs] def successors(self, component: Component) -> Set[Tuple[Component, int]]: """Return the successors of the given component. Args: component: The reference component Returns: The set of components for which a path exists from component's O_F to their F_INIT. Raises: KeyError: If the component is not in the model used to construct this object. """ return self._successors[component]
[docs] def predecessors(self, component: Component) -> Set[Tuple[Component, int]]: """Return the predecessors of the given component. Args: component: The reference component Returns: The set of components for which a path exists from their O_F to component's F_INIT. Raises: KeyError: If the component is not in the model used to construct this object. """ return self._predecessors[component]
[docs] def macros(self, component: Component) -> Set[Tuple[Component, int]]: """Return the macros of the given component. These are components that are both before the component's F_INIT and after its O_F. Args: component: The reference component Returns: The set of components that are both super-predecessor and super-successor of component. """ return self._superpreds[component] & self._supersuccs[component]
[docs] def micros(self, component: Component) -> Set[Tuple[Component, int]]: """Return the micros of the given component. These are components that are in between the component's O_I and its S. Args: component: The reference component Returns: The set of components that are both sub-successor and sub-predecessor of component. """ return self._subsuccs[component] & self._subpreds[component]
def _propagate( self, from_set: Set[Tuple[Component, int]], to_set: Set[Tuple[Component, int]], shared_dims: int ) -> None: """Propagates from_set into to_set. Note: Modifies to_set. Args: from_set: Set to propagate components from to_set: Set to propagate them into shared_dims: Maximum shared dimensions to carry """ to_set.update({ (cmp, min(sd, shared_dims)) for cmp, sd in from_set}) def _calc_predecessors(self) -> None: """Calculates predecessors of each component in the model. This function determines for each component in the model which other components can be reached by following incoming conduits. Preconditions: self._model set self._direct_* calculated Side effects: Fills self._superpreds, self._predecessors, self._subpreds with for each component in the model the corresponding set of components. """ self._superpreds = {c: set() for c in self._model.components} self._predecessors = {c: set() for c in self._model.components} self._subpreds = {c: set() for c in self._model.components} num_remaining_preds = { c: ( len(self._direct_predecessors[c]) + len(self._direct_superpreds[c])) for c in self._model.components} num_remaining_subpreds = { c: len(self._direct_subpreds[c]) for c in self._model.components} todo = set(self._model.components) started: Set[Component] = set() doing: Set[Component] = set() finished: Set[Component] = set() done: Set[Component] = set() while todo or doing: started.clear() for component in todo: if num_remaining_preds[component] == 0: for subsucc, shared_dims in self._direct_subsuccs[component]: self._superpreds[subsucc].add((component, shared_dims)) self._propagate( self._predecessors[component], self._predecessors[subsucc], shared_dims) self._propagate( self._superpreds[component], self._superpreds[subsucc], shared_dims) num_remaining_preds[subsucc] -= 1 started.add(component) todo -= started doing |= started finished.clear() for component in doing: if num_remaining_subpreds[component] == 0: for succ, shared_dims in self._direct_successors[component]: self._predecessors[succ].add((component, shared_dims)) self._propagate( self._subpreds[component], self._predecessors[succ], shared_dims) self._propagate( self._predecessors[component], self._predecessors[succ], shared_dims) self._propagate( self._superpreds[component], self._superpreds[succ], shared_dims) num_remaining_preds[succ] -= 1 for supersucc, shared_dims in self._direct_supersuccs[component]: self._subpreds[supersucc].add((component, shared_dims)) self._propagate( self._subpreds[component], self._subpreds[supersucc], shared_dims) self._propagate( self._predecessors[component], self._subpreds[supersucc], shared_dims) num_remaining_subpreds[supersucc] -= 1 finished.add(component) doing -= finished done |= finished if not started and not finished: raise RuntimeError( 'Could not plan resource allocation for this model.' ' Do you have a cycle of O_F -> F_INIT conduits?' ' That does not work, because the models will all be' ' waiting for each other to start.') def _calc_successors(self) -> None: """Calculates successors of each component in the model. This function determines for each component in the model which other components can be reached by following outgoing conduits. Preconditions: self._model set self._direct_* calculated Side effects: Fills self._supersuccs, self._successors, self._subsuccs with for each component in the model the corresponding set of components. """ self._supersuccs = {c: set() for c in self._model.components} self._successors = {c: set() for c in self._model.components} self._subsuccs = {c: set() for c in self._model.components} num_remaining_succs = { c: ( len(self._direct_successors[c]) + len(self._direct_supersuccs[c])) for c in self._model.components} num_remaining_subsuccs = { c: len(self._direct_subsuccs[c]) for c in self._model.components} todo = set(self._model.components) started: Set[Component] = set() doing: Set[Component] = set() finished: Set[Component] = set() done: Set[Component] = set() while todo or doing: started.clear() for component in todo: if num_remaining_succs[component] == 0: for subpred, shared_dims in self._direct_subpreds[component]: self._supersuccs[subpred].add((component, shared_dims)) self._propagate( self._successors[component], self._successors[subpred], shared_dims) self._propagate( self._supersuccs[component], self._supersuccs[subpred], shared_dims) num_remaining_succs[subpred] -= 1 started.add(component) todo -= started doing |= started finished.clear() for component in doing: if num_remaining_subsuccs[component] == 0: for pred, shared_dims in self._direct_predecessors[component]: self._successors[pred].add((component, shared_dims)) self._propagate( self._successors[component], self._successors[pred], shared_dims) self._propagate( self._supersuccs[component], self._supersuccs[pred], shared_dims) self._propagate( self._subsuccs[component], self._successors[pred], shared_dims) num_remaining_succs[pred] -= 1 for superpred, shared_dims in self._direct_superpreds[component]: self._subsuccs[superpred].add((component, shared_dims)) self._propagate( self._successors[component], self._subsuccs[superpred], shared_dims) self._propagate( self._subsuccs[component], self._subsuccs[superpred], shared_dims) num_remaining_subsuccs[superpred] -= 1 finished.add(component) doing -= finished done |= finished def _calc_direct_succs_preds(self) -> None: """Calculates all successors and predecessors of components. Preconditions: self._model set Side effects: Sets self._direct_successors to a dictionary mapping each component in the model to the set of components that it has a * -> F_INIT conduit to. """ components = {c.name: c for c in self._model.components} self._direct_supersuccs = {c: set() for c in self._model.components} self._direct_successors = {c: set() for c in self._model.components} self._direct_subsuccs = {c: set() for c in self._model.components} self._direct_superpreds = {c: set() for c in self._model.components} self._direct_predecessors = {c: set() for c in self._model.components} self._direct_subpreds = {c: set() for c in self._model.components} for conduit in self._model.conduits: sender = components[conduit.sending_component()] snd_op = None if sender.ports: if conduit.sending_port() in sender.ports.o_i: snd_op = Operator.O_I elif conduit.sending_port() in sender.ports.o_f: snd_op = Operator.O_F receiver = components[conduit.receiving_component()] recv_op = None if conduit.receiving_port() == 'muscle_settings_in': recv_op = Operator.F_INIT elif receiver.ports: if conduit.receiving_port() in receiver.ports.f_init: recv_op = Operator.F_INIT elif conduit.receiving_port() in receiver.ports.s: recv_op = Operator.S shared_dims = min(len(sender.multiplicity), len(receiver.multiplicity)) if (snd_op, recv_op) == (Operator.O_I, Operator.F_INIT): self._direct_superpreds[receiver].add((sender, shared_dims)) self._direct_subsuccs[sender].add((receiver, shared_dims)) elif (snd_op, recv_op) == (Operator.O_F, Operator.F_INIT): self._direct_successors[sender].add((receiver, shared_dims)) self._direct_predecessors[receiver].add((sender, shared_dims)) elif (snd_op, recv_op) == (Operator.O_F, Operator.S): self._direct_subpreds[receiver].add((sender, shared_dims)) self._direct_supersuccs[sender].add((receiver, shared_dims))
[docs]class Resources: """Designates a (sub)set of resources. Whether these resources are free or allocated in general or by something specific depends on the context, this just says which resources we're talking about. Attributes: cores: A dictionary mapping designated nodes to designated cores on them. """ def __init__(self, cores: Optional[Dict[str, Set[int]]] = None) -> None: """Create a Resources object with the given cores. Args: cores: Cores to be designated by this object. """ if cores is None: self.cores: Dict[str, Set[int]] = {} else: self.cores = cores def __copy__(self) -> 'Resources': """Copy the object.""" return Resources(deepcopy(self.cores)) def __eq__(self, other: object) -> bool: """Check for equality.""" if not isinstance(other, Resources): return NotImplemented if len(self.cores) != len(other.cores): return False for node, cores in self.cores.items(): if node not in other.cores: return False if other.cores[node] != cores: return False return True def __iadd__(self, other: 'Resources') -> 'Resources': """Add the resources in the argument to this object.""" for node in other.cores: if node in self.cores: self.cores[node] |= other.cores[node] else: self.cores[node] = set(other.cores[node]) return self def __isub__(self, other: 'Resources') -> 'Resources': """Remove the resources in the argument from this object.""" for node in other.cores: if node in self.cores: self.cores[node] -= other.cores[node] if not self.cores[node]: del self.cores[node] return self def __str__(self) -> str: """Return a human-readable string representation.""" def collapse_ranges(cores: Set[int]) -> str: if len(cores) == 0: return '' result = list() scores = sorted(cores) start = 0 i = 1 while i <= len(scores): if (i == len(scores)) or (scores[i-1] != scores[i] - 1): if start == i - 1: # run of one result.append(str(scores[i-1])) else: # run of at least two result.append(f'{scores[start]}-{scores[i-1]}') start = i i += 1 return ','.join(result) return 'Resources(' + '; '.join([ n + ': ' + collapse_ranges(cs) for n, cs in self.cores.items()]) + ')' def __repr__(self) -> str: """Return a string representation.""" return f'Resources({self.cores})'
[docs] def nodes(self) -> Iterable[str]: """Returns the nodes on which we designate resources.""" return self.cores.keys()
[docs] def total_cores(self) -> int: """Returns the total number of cores designated.""" return sum([len(cs) for cs in self.cores.values()])
[docs] def isdisjoint(self, other: 'Resources') -> bool: """Returns whether we share resources with other.""" for node, cores in self.cores.items(): if node in other.cores: if not cores.isdisjoint(other.cores[node]): return False return True
[docs] @staticmethod def union(resources: Iterable['Resources']) -> 'Resources': """Combines the resources into one. Args: resources: A collection of resources to merge. Return: A Resources object referring to all the resources in the input. """ result = Resources() for cur_resources in resources: result += cur_resources return result
[docs]class InsufficientResourcesAvailable(RuntimeError): pass
[docs]class Planner: """Allocates resources and keeps track of allocations.""" def __init__(self, all_resources: Resources): """Create a ResourceManager. Args: all_resources: An object describing the available resources to be managed by this ResourceManager. """ self._all_resources = all_resources self._allocations: Dict[Reference, Resources] = {} self._oversubscribed: Dict[Reference, Resources] = {} self._next_virtual_node = 1
[docs] def allocate_all( self, configuration: Configuration, virtual: bool = False ) -> Dict[Reference, Resources]: """Allocates resources for the given components. Allocation can occur either on a fixed set of available resources (virtual set to False), or on an elastic set of virtual resources (virtual set to True). Use the former inside of a (fixed) cluster allocation and the latter to determine how many nodes are needed to run a simulation without oversubscribing. If virtual is True, additional nodes will be added as needed to obtain the resources needed to allocate all instances. Each additional node will have as many cores as a random existing one. The intended use is to pass a single node to __init__ when using this mode. Args: configuration: Configuration to allocate all components of virtual: Allocate on virtual resources or not, see above Returns: Resources for each instance required by the model. """ result: Dict[Reference, Resources] = {} # Analyse model model = ModelGraph(configuration.model) requirements = configuration.resources implementations = configuration.implementations exclusive = { i for c in model.components() for i in c.instances() if (c.implementation and not implementations[c.implementation].can_share_resources)} # Allocate unallocated_instances = [ i for c in model.components() for i in c.instances()] leftover_instances: List[Reference] = [] while unallocated_instances: leftover_instances.clear() to_allocate = self._sort_instances( unallocated_instances, requirements) for instance in to_allocate: component = model.component(instance.without_trailing_ints()) conflicting_names = self._conflicting_names( model, exclusive, component, instance) done = False while not done: try: result[instance] = self._allocate_instance( instance, component, requirements[component.name], conflicting_names, virtual) done = True except InsufficientResourcesAvailable: if virtual: self._expand_resources( component.name, requirements[component.name]) else: leftover_instances.append(instance) done = True if leftover_instances: _logger.warning( 'Planner ran out of resources, oversubscribing remaining' ' instances.') self._oversubscribed.update(self._allocations) self._allocations.clear() unallocated_instances.clear() unallocated_instances.extend(leftover_instances) return result
def _sort_instances( self, instances: List[Reference], requirements: Mapping[Reference, ResourceRequirements] ) -> List[Reference]: """Return to be allocated components in optimal order. This is a heuristic, it's not actually optimal but it should give decent results most of the time. Args: instances: The instances to sort requirements: The resource requirements for their components, indexed by name """ cmp_names = map(Reference.without_trailing_ints, instances) reqs = map(lambda n: requirements[n], cmp_names) instances_reqs = list(zip(instances, reqs)) threaded = [ (i, r.threads) for i, r in instances_reqs if isinstance(r, ThreadedResReq)] sorted_threaded = sorted(threaded, key=lambda x: x[1], reverse=True) sorted_threaded_instances = [x[0] for x in sorted_threaded] mpi = [ (i, r.mpi_processes) for i, r in instances_reqs if isinstance(r, MPICoresResReq)] sorted_mpi = sorted(mpi, key=lambda x: x[1], reverse=True) sorted_mpi_instances = [x[0] for x in sorted_mpi] return sorted_threaded_instances + sorted_mpi_instances def _conflicting_names( self, model: ModelGraph, exclusive: Set[Reference], component: Component, instance: Reference) -> Set[Reference]: """Find conflicting components. This returns the names of instances that cannot share resources with the given component, so that they can be avoided when assigning resources. Args: model: Model to search exclusive: List of instances that cannot share resources component: Component to find conflicts for instance: Instance (of component) to find conflicts for """ def indices_match( instance1: Reference, instance2: Reference, dims: int) -> bool: idx1 = instance_indices(instance1) idx2 = instance_indices(instance2) return idx1[:dims] == idx2[:dims] def matching_instances( others: Set[Tuple[Component, int]]) -> Set[Reference]: return { i for c, d in others for i in c.instances() if indices_match(i, instance, d)} conflicting_instances = { i for c in model.components() for i in c.instances()} if instance in exclusive: return conflicting_instances conflicting_instances -= matching_instances(model.predecessors(component)) conflicting_instances -= matching_instances(model.successors(component)) if component not in exclusive: micros = matching_instances(model.micros(component)) macros = matching_instances(model.macros(component)) nonconflicting_mms = (micros | macros) - exclusive conflicting_instances -= nonconflicting_mms return conflicting_instances def _expand_resources( self, name: Reference, req: ResourceRequirements) -> None: """Adds an extra virtual node to the available resources.""" taken = True while taken: new_node = 'node{:06d}'.format(self._next_virtual_node) taken = new_node in self._all_resources.cores self._next_virtual_node += 1 num_cores = len(next(iter(self._all_resources.cores.values()))) if isinstance(req, ThreadedResReq): if req.threads > num_cores: raise InsufficientResourcesAvailable( f'Instance {name} requires {req.threads} threads,' f' which is impossible with {num_cores} cores per' ' node.') if isinstance(req, MPICoresResReq): if req.threads_per_mpi_process > num_cores: raise InsufficientResourcesAvailable( f'Instance {name} requires' f' {req.threads_per_mpi_process} threads per process,' f' which is impossible with {num_cores} cores per' ' node.') self._all_resources.cores[new_node] = set(range(num_cores)) def _allocate_instance( self, instance: Reference, component: Component, requirements: ResourceRequirements, simultaneous_instances: Set[Reference], virtual: bool ) -> Resources: """Allocates resources for the given instance. If we are on real resources, and the instance requires more threads than our nodes have cores, then we'll just give it all cores on a node and hope for the best. If we are on virtual resources, this will raise InsufficientResourcesAvailable. Args: instance: The instance to allocate for component: The component it is an instance of requirements: Its resource requirements simultaneous_instances: Instances which may execute simultaneously and whose resources we therefore cannot overlap with virtual: Whether we are on virtual resources Returns: A Resources object describing the resources allocated """ allocation = Resources({}) free_resources = copy(self._all_resources) for other in self._allocations: if other in simultaneous_instances: free_resources -= self._allocations[other] try: if isinstance(requirements, ThreadedResReq): allocation = self._allocate_thread_block( free_resources, requirements.threads) elif isinstance(requirements, MPICoresResReq): if requirements.threads_per_mpi_process != 1: raise RuntimeError( 'Multiple threads per MPI process is not supported' ' yet. Please make an issue on GitHub.') for proc in range(requirements.mpi_processes): allocation += self._allocate_thread_block( free_resources, requirements.threads_per_mpi_process) free_resources -= allocation elif isinstance(requirements, MPINodesResReq): raise RuntimeError( 'Node-based MPI resource requirements are not' ' supported yet. Please make an issue on GitHub.') except InsufficientResourcesAvailable: if not self._allocations and not virtual: # There are no other allocations and it's still not # enough. Just give it all and hope for the best. _logger.warning(( 'Instance {} requires more resources than are' ' available in total. Oversubscribing this' ' instance.').format(instance)) allocation = copy(self._all_resources) else: raise self._allocations[instance] = allocation return allocation def _allocate_thread_block( self, free_resources: Resources, threads: int) -> Resources: """Allocate resources for a group of threads. This chooses a set of <threads> cores on the same node. It returns the allocated resources; it doesn't update self._allocations or free_resources. Args: threads: Number of cores free_resources: Available resources to allocate from Returns: The allocated resources """ for node in free_resources.nodes(): if len(free_resources.cores[node]) >= threads: available_cores = sorted(free_resources.cores[node]) to_reserve = set(available_cores[:threads]) return Resources({node: to_reserve}) raise InsufficientResourcesAvailable()