Source code for aiida_optimize.engines._particle_swarm

# -*- coding: utf-8 -*-

# pylint: disable=invalid-name
# Author: Emanuele Bosoni <bosoe4@gmail.com>
"""
Defines a Particle-Swarm optimization engine.
"""

from copy import deepcopy
import typing as ty

from aiida import orm
from decorator import decorator
import numpy as np
from numpy.random import get_state, set_state, uniform

from ..helpers import get_nested_result
from .base import OptimizationEngineImpl, OptimizationEngineWrapper

__all__ = ["ParticleSwarm"]

# Parameters choosen according to http://dx.doi.org/10.1109/CEC.2003.1299391
C1 = 1.49445
C2 = 1.49445
OMEGA = 0.5


def update_method(next_submit=None):
    """
    Decorator for methods which update the results.
    """

    @decorator
    def inner(func, self, outputs):
        self.next_submit = next_submit
        self.next_update = None
        func(self, outputs)

    return inner


def submit_method(next_update=None):
    """
    Decorator for methods which submit new evaluations.
    """

    @decorator
    def inner(func, self):
        self.next_submit = None
        self.next_update = next_update
        return func(self)

    return inner


class _ParticleSwarmImpl(OptimizationEngineImpl):
    """
    Implementation class for the Particle-Swarm optimization engine.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        particles: ty.List[float],  # ty.Optional[ty.List[float]],
        max_iter: int,
        input_key: str,
        result_key: str,
        logger,
        num_iter=0,
        next_submit="submit_initialize",
        next_update=None,
        finished=False,
        exceeded_max_iters=False,
        result_state=None,
        global_best=None,
        fun_global_best=None,
        local_best=None,
        fun_local_best=None,
        velocities=None,
        rand_state=None,
    ):
        super().__init__(logger=logger, result_state=result_state)

        self.particles = np.array(particles)
        n_vars = len(self.particles[0])
        assert all(len(part) == n_vars for part in self.particles)
        self.velocities = velocities

        self.global_best = global_best
        self.fun_global_best = fun_global_best
        self.local_best = local_best
        self.fun_local_best = fun_local_best

        self.max_iter = max_iter
        self.num_iter = num_iter

        self.input_key = input_key
        self.result_key = result_key

        self.next_submit = next_submit
        self.next_update = next_update

        self.finished = finished
        self.exceeded_max_iters = exceeded_max_iters

        self.rand_state = rand_state

    @submit_method(next_update="update_general")
    def submit_initialize(self):
        n_parts = len(self.particles)
        n_vars = len(self.particles[0])
        self.local_best = self.particles
        self.fun_local_best = np.full(n_parts, np.inf)
        self.fun_global_best = np.inf
        # Initialize the velocities to random number in [-1,1]
        self.velocities = np.zeros((n_parts, n_vars))
        for line, _ in enumerate(self.velocities):
            for col, _ in enumerate(self.velocities[line]):
                self.velocities[line][col] = uniform(-1, 1)
        self.rand_state = get_state()
        self._logger.report("Submitting first step.")
        return [self._to_input_list(x) for x in self.particles]

    def _to_input_list(self, x):
        input_list = orm.List()
        input_list.extend(x)
        return {self.input_key: input_list}

    @update_method(next_submit="new_iter")
    def update_general(self, outputs):  # pylint: disable=missing-function-docstring
        fun_particles = np.array(self._get_values(outputs))
        for index, val in enumerate(fun_particles):
            if val < self.fun_local_best[index]:
                self.fun_local_best[index] = val
                self.local_best[index] = self.particles[index]
        for index, val in enumerate(self.fun_local_best):
            if val < self.fun_global_best:
                self.fun_global_best = val
                self.global_best = self.local_best[index]

    def _get_values(self, outputs):
        return [get_nested_result(res, self.result_key).value for _, res in sorted(outputs.items())]

    def create_particle(self):  # pylint: disable=missing-function-docstring
        n_var = len(self.particles[0])
        new_vel = deepcopy(self.velocities)
        for idx, val in enumerate(self.particles):
            new_vel[idx] = [
                self.update_vel(
                    OMEGA,
                    self.velocities[idx][i],
                    C1,
                    C2,
                    val[i],
                    self.local_best[idx][i],
                    self.global_best[i],
                )
                for i in range(n_var)
            ]

        new_parts = deepcopy(self.particles)
        for fd, _ in enumerate(self.particles):
            new_parts[fd] = np.array(new_vel[fd]) + self.particles[fd]

        return np.array(new_parts), np.array(new_vel)

    @staticmethod
    def update_vel(omega, v, c1, c2, x, pi, pg):  # pylint: disable=too-many-arguments
        return omega * v + c1 * uniform(0, 1) * (pi - x) + c2 * uniform(0, 1) * (pg - x)

    @submit_method()
    def new_iter(self):  # pylint: disable=missing-function-docstring
        self.check_finished()
        if self.finished:
            self.next_update = "finalize"
            return []
        self.num_iter += 1
        self._logger.report(
            f"Start of Particle-Swarm iteration {self.num_iter}, max number of iterations: {self.max_iter}."
        )
        self.next_update = "update_general"
        set_state(self.rand_state)
        self.particles, self.velocities = self.create_particle()
        self.rand_state = get_state()

        return [self._to_input_list(x) for x in self.particles]

    @update_method()
    def finalize(self, outputs):
        pass

    def check_finished(self):
        """
        Updates the 'finished' attribute.
        """
        self._logger.report(
            f"End of Particle-Swarm iteration {self.num_iter}, max number of iterations: {self.max_iter}."
        )
        self._logger.report(f"Function value at global best {self.fun_global_best}")
        self._logger.report(f"Variables at global best {self.global_best}")
        if not self.finished:
            if self.num_iter >= self.max_iter:
                self._logger.report("Number of iterations exceeded the maximum. Stop.")
                self.exceeded_max_iters = True
                self.finished = True

    @property
    def _state(self):
        state_dict = {
            k: v
            for k, v in self.__dict__.items()
            if k not in ["_result_mapping", "_logger", "xtol", "ftol"]
        }
        return state_dict

    @property
    def is_finished(self):
        return self.finished

    @property
    def is_finished_ok(self):
        # If reeintroduce the tollerance values, also need to modify this!!!
        # return self.is_finished and not self.exceeded_max_iters
        return self.is_finished

    def _create_inputs(self):
        return getattr(self, self.next_submit)()

    def _update(self, outputs):
        getattr(self, self.next_update)(outputs)

    @property
    def result_value(self):
        value = super().result_value  # pylint: disable=no-member
        return value

    def _get_optimal_result(self):
        """
        Return the index and optimization value of the best evaluation process.
        """
        cost_values = {
            k: get_nested_result(v.output, self.result_key) for k, v in self._result_mapping.items()
        }
        opt_index, opt_output = min(cost_values.items(), key=lambda item: item[1].value)
        opt_input = self._result_mapping[opt_index].input[self.input_key]

        return (opt_index, opt_input, opt_output)

    def get_engine_outputs(self):
        return {"last_particles": orm.List(list=self.local_best.tolist()).store()}


[docs]class ParticleSwarm(OptimizationEngineWrapper): """ Engine to perform the Particle-Swarm optimization (http://dx.doi.org/10.1109/CEC.2003.1299391). :param particles: The current / initial set of particles. Must be a list of shape (M, N), where N is the dimension of the problem and M is free to choose, it will be the number of particles! :type particles: array :param max_iter: Maximum number of iteration steps. :type max_iter: int :param input_key: Name of the input argument in the evaluation process. :type input_key: str :param result_key: Name of the output argument in the evaluation process. :type result_key: str """ _IMPL_CLASS = _ParticleSwarmImpl def __new__( # pylint: disable=arguments-differ,too-many-arguments cls, particles, max_iter=20, input_key="x", result_key="result", logger=None, ): return cls._IMPL_CLASS( # pylint: disable=no-member particles=particles, max_iter=max_iter, input_key=input_key, result_key=result_key, logger=logger, )