Planification avec PSC

Solutions du code

Module .../moteur_psc_planification/axiomecadre.py

from moteur_psc.contrainte import Contrainte

class ContrainteAxiomeCadre(Contrainte):
    """ Contrainte forçant l'utilisation d'opérateurs pour agir sur les\
        variables d'un problème."""

    def __init__(self, var_pre, ops, var_post):
        """
            :param var_pre: la variable au début de l'état - prop(Si).
            :param list var_ops: les variables correspondant aux opérateurs qui\
            ont prop comme postcondition.
            :param var_post: la variable à la fin de l'état - prop(Si+1).
        """

        Contrainte.__init__(self, (var_pre, var_post) + tuple(ops))

        self.var_pre = var_pre
        self.var_post = var_post
        self.vars_ops = ops

    def est_valide(self, var, val):
        """ Evalue la validité de la contrainte pour une valeur de variable\
            donnée.

            L'évaluation est paresseuse : si au moins une variable n'est pas\
            instanciée, on retourne ``True``.
            Sinon, si la variable change de valeur, alors au moins une variable\
            d'opérateur doit être ``True``.

            :param var: la variable à laquelle assigner la valeur ``val``.
            :param val: la valeur à assigner à ``var``.
            :return: ``True`` si la contrainte est valide pour la paire\
            variable/valeur passée en paramètre.
        """
        ancienne_valeur = var.val
        var.val = val

        # On part du principe que la contrainte est valide si au moins une
        # variable n'est pas instanciée.
        for var2 in self.variables:
            if var2.val is None:
                var.val = ancienne_valeur
                return True

        valide = False
        # Si toutes les variables sont instanciées et qu'une variable passe de
        # False à True.
        if self.var_pre.val == False and self.var_post.val == True:
            # Vérifie qu'au moins un des opérateurs est appliqué.
            for op in self.vars_ops:
                if op.val == True:
                    valide = True
                    break
        else:
            valide = True

        var.val = ancienne_valeur
        return valide

    def propage(self, var):
        """ Propage la nouvelle valeur d'une variable afin de vérifier que la\
            contrainte est toujours satisfaisable.

            La propagation est paresseuse: elle retourne ``True`` si plus d'une\
            variable n'a pas encore la valeur ``None``.

            :param var: la variable ayant reçu une nouvelle valeur.
            :return: ``True`` si la contrainte peut encore être satisfaite.
        """
        var2 = None
        for var_i in self.variables:
            if var_i.val is None:
                if var2 is None:
                    var2 = var_i
                else:
                    # Il reste plus d'une variable à instancier.
                    return True

        # Teste les valeurs du label pour la dernière variable non instanciée.
        for val in var2.label[:]:
            if not self.est_valide(var2, val):
                var2.label.remove(val)

        return len(var2.label) > 0

    def reviser(self):
        """ Réviser n'est pas définie pour les contraintes n-aires.

            :return: ``False``.
        """

        return False

    def __repr__(self):
        return 'Axiome de cadre:\n\t{}\n\t{}\n\t{}'.format(self.var_pre,
                                                           [op for op in self.vars_ops],
                                                           self.var_post)

Module .../moteur_planification/etat.py

from moteur_psc_heuristique.variable_avec_label import VariableAvecLabel

class Etat:
    """ Un état du plan."""

    def __init__(self, no_etat, propositions, operateurs, etat_prec=None):
        """
            :param no_etat: numéro de l'état dans le plan.
            :param list propositions: liste des propositions de planification.
            :param list operateurs: liste des opérateurs applicables dans le plan.
            :param etat_prec: l'état précédent.
        """

        self.no_etat = no_etat
        self.etat_prec = etat_prec

        self.operateurs = { op.nom: op for op in operateurs }

        self.vars_initiales = {}
        self.vars_finales = {}

        self.construire_vars_operateurs(operateurs)
        self.construire_vars_propositions(propositions)

    def construire_vars_operateurs(self, ops):
        """ Construit les variables de l'état représentant l'utilisation des\
            opérateurs.
        """

        self.vars_operateurs = {}

        for op in ops:
            var_nom = '{} état {}'.format(op.nom, self.no_etat)
            self.vars_operateurs[op.nom] = VariableAvecLabel(var_nom,
                                                             [True, False])

    def construire_vars_propositions(self, props):
        """ Construit les variables de l'état représentant les propositions.

            Les variables initiales sont les variables finales de l'état\
            précédent sauf lorsqu'il n'en existe pas.
        """
        if self.no_etat > 0:
            self.vars_initiales = self.etat_prec.vars_finales

        for prop in props:
            var_nom = '{} état {}'.format(prop, self.no_etat + 1)
            self.vars_finales[prop] = VariableAvecLabel(var_nom,
                                                        [True, False])
            if self.no_etat == 0:
                var_nom = '{} état 0'.format(prop)
                self.vars_initiales[prop] = VariableAvecLabel(var_nom,
                                                              [True, False])

    def variables(self):
        """ La liste des variables de l'état (variables initiales, finales et\
            variables d'opérateurs).

            :return: une liste de variables.
        """

        return (list(self.vars_initiales.values()) +
                list(self.vars_finales.values()) +
                list(self.vars_operateurs.values()))

Module .../moteur_planification/planification.py

from moteur_psc.contrainte import ContrainteUnaire
from moteur_psc_heuristique.contrainte_avec_propagation import ContrainteAvecPropagation
from moteur_psc_heuristique.psc_heuristique import PSCHeuristique
from moteur_psc_planification.axiomecadre import ContrainteAxiomeCadre
from .etat import Etat

class Planification:
    def __init__(self, propositions, operateurs,
                 mutex_propositions, mutex_operateurs,
                 depart, but, nb_etats):
        """
            :param list propositions: liste des propositions du problème.
            :param list operateurs: liste des opérateurs du problème.
            :param list mutex_propositions: liste des mutex de propositions.
            :param list mutex_operateurs: liste des mutex d'opérateurs.
            :param depart: contraintes de départ.
            :param but: contraintes finales.
            :param nb_etats: nombre d'états du plan.
        """

        self.operateurs = operateurs
        self.mutex_propositions = mutex_propositions
        self.mutex_operateurs = mutex_operateurs

        self.depart = depart
        self.but = but

        self.nb_etats = nb_etats

        self.propositions = propositions

        self.etats = []
        self.construire_etats()

        self.psc = PSCHeuristique(self.variables(), self.construire_contraintes())

    def construire_etats(self):
        """ Construction des états de la planification.

            Chaque état est constitué de ses variables initiales, de ses\
            variables d'opérateurs et de ses variables finales.

            /!\ Les variables finales d'un état coïncident avec les variables\
            initiales de l'état suivant /!\.
        """
        self.etats.append(Etat(0, self.propositions, self.operateurs, None))

        for i in range(1, self.nb_etats):
            self.etats.append(Etat(i, self.propositions,
                                   self.operateurs, self.etats[-1]))

    def variables(self):
        """
            :return: la liste des variables du problème transformé en PSC.
        """

        # Utiliser un set évite les doublons entre variables finales et
        # initiales.
        variables = set()
        for etat in self.etats:
            variables.update(etat.variables())

        return list(variables)

    def construire_contraintes(self):
        """ Méthode intermédiaire pour faire l'aggrégation de toutes les\
            contraintes.

            :return: les contraintes du problème transformé en PSC.
        """

        return (self.construire_contraintes_propositions() +
                self.construire_contraintes_operateurs() +
                self.construire_contraintes_conditions() +
                self.construire_contraintes_axiomes_cadre() +
                self.construire_contraintes_initiales() +
                self.construire_contraintes_finales())

    def construire_contraintes_propositions(self):
        """ Construit les contraintes traduisant les mutex de propositions.

            :return: une liste de contraintes.
        """
        contraintes = []
        nand = lambda x,y: not (x and y)

        # Construction des contraintes générées par les mutex de propositions
        # **pour chaque état**.
        for mutex in self.mutex_propositions:
            for etat in self.etats:
                contr = ContrainteAvecPropagation(etat.vars_initiales[mutex[0]],
                                                  etat.vars_initiales[mutex[1]],
                                                  nand)
                contraintes.append(contr)
                # Les mutex de propositions doivent aussi être valides pour les
                # variables finales du dernier état.
                if etat.no_etat == (self.nb_etats - 1):
                    contr = ContrainteAvecPropagation(etat.vars_finales[mutex[0]],
                                                      etat.vars_finales[mutex[1]],
                                                      nand)
                    contraintes.append(contr)

        return contraintes

    def construire_contraintes_operateurs(self):
        """ Construit les contraintes traduisant les mutex d'opérateurs.

            :return: une liste de Contraintes.
        """
        contraintes = []
        nand = lambda x,y: not (x and y)

        for mutex in self.mutex_operateurs:
            for etat in self.etats:
                contr = ContrainteAvecPropagation(etat.vars_operateurs[mutex[0].nom],
                                                  etat.vars_operateurs[mutex[1].nom],
                                                  nand)
                contraintes.append(contr)

        return contraintes

    def construire_contraintes_conditions(self):
        """ Constuit les contraintes traduisant les pré- et post-conditions des\
            opérateurs.

            :return: une liste de contraintes.
        """
        contraintes = []
        # Contraintes générées par les pré- et post-conditions.
        # Implication logique.
        imp = lambda x,y: (not x) or y

        for etat in self.etats:
            for op in self.operateurs:
                for precond in op.precond:
                    contr = ContrainteAvecPropagation(etat.vars_operateurs[op.nom],
                                                      etat.vars_initiales[precond],
                                                      imp)
                    contraintes.append(contr)
                for postcond in op.postcond:
                    contr = ContrainteAvecPropagation(etat.vars_operateurs[op.nom],
                                                      etat.vars_finales[postcond],
                                                      imp)
                    contraintes.append(contr)

        return contraintes

    def construire_contraintes_axiomes_cadre(self):
        """ Construit les contraintes d'axiomes de cadre.

            Un axiome de cadre traduit le fait qu'une variable ne peut pas\
            changer de valeur sans l'action d'un opérateur.

            :return: une liste de contraintes.
        """
        contraintes = []

        for etat in self.etats:
            for prop in self.propositions:
                vars_ops = [etat.vars_operateurs[op.nom]
                            for op in self.operateurs
                            if prop in op.postcond]
                contr = ContrainteAxiomeCadre(etat.vars_initiales[prop],
                                              vars_ops,
                                              etat.vars_finales[prop])
                contraintes.append(contr)
        return contraintes

    def construire_contraintes_initiales(self):
        """ Construit les contraintes qui fixent la situation de départ.

            :return: une liste de contraintes.
        """
        contraintes = []
        for contrainte in self.depart:
            eq = lambda x: x == contrainte[1]
            contr = ContrainteUnaire(self.etats[0].vars_initiales[contrainte[0]], eq)
            contraintes.append(contr)

        return contraintes

    def construire_contraintes_finales(self):
        """ Construit les contraintes qui fixent la situation but.

            :return: une liste de contraintes.
        """
        contraintes = []

        for contrainte in self.but:
            eq = lambda x: x == contrainte[1]
            contr = ContrainteUnaire(self.etats[-1].vars_finales[contrainte[0]], eq)
            contraintes.append(contr)

        return contraintes

    def resoudre(self):
        """ Utilise le Forward Checking pour résoudre le problème de\
            planification transformé en PSC.

            Note: La méthode ``solution`` permet d'afficher la solution sans\
            avoir à traiter la valeur de retour de cette méthode.

            :return: le dictionnaire ``{variable : valeur}`` retourné par la\
            méthode de Forward Checking.
        """

        self.psc.consistance_noeuds()
        self.psc.consistance_arcs()
        self.psc.variable_ordering()

        self.psc.forward_checking(0, True)
        self.sol = self.psc.solutions

        return self.sol

    def affice_solutions(self):
        """ Méthode pour afficher les solutions découvertes par l'algorithme."""

        print('Recherche terminée en {} itérations'.format(self.psc.iterations))

        if len(self.psc.solutions) == 0:
            print('Aucune solution trouvée')
            return

        for sol in self.psc.solutions:
            print('Solution')
            print('========')
            for etat in self.etats:
                print('État {}: '.format(etat.no_etat))
                print('  Propositions initiales:')
                for nom, var in sorted(etat.vars_initiales.items()):
                    if sol[var.nom]:
                        print('    ' + nom)

                print('  Opérateurs:')
                for nom, var in sorted(etat.vars_operateurs.items()):
                    if sol[var.nom]:
                        print('    ' + nom)

                print('  Propositions finales:')
                for nom, var in sorted(etat.vars_finales.items()):
                    if sol[var.nom]:
                        print('    ' + nom)
                print()

Module .../exemple_missionnaires.py

from moteur_planification.operateur import Operateur
from moteur_planification.planification import Planification

def format_g(acteur):
    """ Retourne une string représentant un acteur à gauche. """

    return 'g({})'.format(acteur)

def format_d(acteur):
    """ Retourne une string représentant un acteur à droite. """

    return 'd({})'.format(acteur)

def format_dg(bateau, pilote):
    """ Retourne une string représentant une traversée de droite à gauche. """

    return 'dg({}, {})'.format(bateau, pilote)

def format_gd(bateau, pilote, passager):
    """ Retourne une string représentant une traversée de gauche à droite. """

    return 'gd({}, {}, {})'.format(bateau, pilote, passager)


bateaux = ['B']
missionnaires = ['M1', 'M2']
cannibales = ['C1', 'C2']

acteurs = bateaux + missionnaires + cannibales

# Ajoute les propositions pour la position des acteurs.
propositions = []
for acteur in acteurs:
    propositions.append(format_g(acteur))
    propositions.append(format_d(acteur))

# Ajoute les opérateurs de déplacement.
operateurs = []
for bateau in bateaux:
    for pilote in missionnaires:
        # Déplacements du bateau sans passagers (droite à gauche).
        operateurs.append(Operateur(
            format_dg(bateau, pilote),
            [format_d(bateau), format_d(pilote)],
            [format_g(bateau), format_g(pilote)])
            )

        for passager in missionnaires + cannibales:
            # Déplacements du bateau avec passagers (gauche à droite).
            if passager != pilote:
                operateurs.append(Operateur(
                    format_gd(bateau, pilote, passager),
                    [format_g(bateau), format_g(pilote), format_g(passager)],
                    [format_d(bateau), format_d(pilote), format_d(passager)])
                    )

# Ajoute les mutex de proposition (un acteur ne peut pas être sur les deux rives
# simultanément).
mutex_propositions = []
for acteur in acteurs:
    mutex_propositions.append((format_g(acteur), format_d(acteur)))

# Ajoute les mutex d'opérateurs.
mutex_operateurs = []
for i in range(len(operateurs)):
    for j in range(i+1, len(operateurs)):
        for acteur in acteurs:

            if ((format_d(acteur) in operateurs[i].precond or
                 format_g(acteur) in operateurs[i].precond)
                and
                (format_d(acteur) in operateurs[j].precond or
                 format_g(acteur) in operateurs[j].precond)):

                mutex_operateurs.append((operateurs[i], operateurs[j]))
                break

# Ajoute les contraintes initiales (tous les acteurs à gauche).
depart = []
for acteur in acteurs:
    depart.append((format_g(acteur), True))

# Ajoute les contraintes finales (but: tous les acteurs à droite).
but = []
for acteur in acteurs:
    but.append((format_d(acteur), True))

# Transforme le problème de planification en PSC.
plan = Planification(propositions, operateurs,
                     mutex_propositions, mutex_operateurs,
                     depart, but,
                     nb_etats=5)
plan.resoudre()

plan.affice_solutions()

Documentation du code

class moteur_psc_planification.axiomecadre.ContrainteAxiomeCadre(var_pre, ops, var_post)

Bases: moteur_psc.contrainte.Contrainte

Contrainte forçant l’utilisation d’opérateurs pour agir sur les variables d’un problème.

ContrainteAxiomeCadre.__init__(var_pre, ops, var_post)
Paramètres:
  • var_pre – la variable au début de l’état - prop(Si).
  • var_ops (list) – les variables correspondant aux opérateurs qui ont prop comme postcondition.
  • var_post – la variable à la fin de l’état - prop(Si+1).
ContrainteAxiomeCadre.dimension()
Retourne:le nombre de variables concernées par la contrainte.
ContrainteAxiomeCadre.est_valide(var, val)

Evalue la validité de la contrainte pour une valeur de variable donnée.

L’évaluation est paresseuse : si au moins une variable n’est pas instanciée, on retourne True. Sinon, si la variable change de valeur, alors au moins une variable d’opérateur doit être True.

Paramètres:
  • var – la variable à laquelle assigner la valeur val.
  • val – la valeur à assigner à var.
Retourne:

True si la contrainte est valide pour la paire variable/valeur passée en paramètre.

ContrainteAxiomeCadre.propage(var)

Propage la nouvelle valeur d’une variable afin de vérifier que la contrainte est toujours satisfaisable.

La propagation est paresseuse: elle retourne True si plus d’une variable n’a pas encore la valeur None.

Paramètres:var – la variable ayant reçu une nouvelle valeur.
Retourne:True si la contrainte peut encore être satisfaite.
ContrainteAxiomeCadre.reviser()

Réviser n’est pas définie pour les contraintes n-aires.

Retourne:False.
class moteur_planification.etat.Etat(no_etat, propositions, operateurs, etat_prec=None)

Un état du plan.

__init__(no_etat, propositions, operateurs, etat_prec=None)
Paramètres:
  • no_etat – numéro de l’état dans le plan.
  • propositions (list) – liste des propositions de planification.
  • operateurs (list) – liste des opérateurs applicables dans le plan.
  • etat_prec – l’état précédent.
construire_vars_operateurs(ops)

Construit les variables de l’état représentant l’utilisation des opérateurs.

construire_vars_propositions(props)

Construit les variables de l’état représentant les propositions.

Les variables initiales sont les variables finales de l’état précédent sauf lorsqu’il n’en existe pas.

variables()

La liste des variables de l’état (variables initiales, finales et variables d’opérateurs).

Retourne:une liste de variables.
class moteur_planification.operateur.Operateur(nom, precond, postcond)

Opérateur de planification.

__init__(nom, precond, postcond)
Paramètres:
  • nom (str) – nom représentant l’opérateur.
  • precond (list) – liste de propositions nécessaires à l’opération.
  • postcond (list) – liste de propositions résultant de l’opération.