"""Optimisation class"""
"""
Copyright (c) 2016-2022, EPFL/Blue Brain Project
This file is part of BluePyOpt <https://github.com/BlueBrain/BluePyOpt>
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License version 3.0 as published
by the Free Software Foundation.
This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
# pylint: disable=R0912, R0914
import random
import logging
import functools
import deap
import deap.base
import deap.algorithms
import deap.tools
from . import algorithms
from . import tools
from . import utils
import bluepyopt.optimisations
logger = logging.getLogger('__main__')
# TODO decide which variables go in constructor,which ones go in 'run' function
# TODO abstract the algorithm by creating a class for every algorithm, that way
# settings of the algorithm can be stored in objects of these classes
[docs]
class WeightedSumFitness(deap.base.Fitness):
"""Fitness that compares by weighted sum"""
def __init__(self, values=(), obj_size=None):
self.weights = [-1.0] * obj_size if obj_size is not None else [-1]
super(WeightedSumFitness, self).__init__(values)
@property
def weighted_sum(self):
"""Weighted sum of wvalues"""
return sum(self.wvalues)
@property
def sum(self):
"""Weighted sum of values"""
return sum(self.values)
def __le__(self, other):
return self.weighted_sum <= other.weighted_sum
def __lt__(self, other):
return self.weighted_sum < other.weighted_sum
def __deepcopy__(self, _):
"""Override deepcopy"""
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
return result
[docs]
class WSListIndividual(list):
"""Individual consisting of list with weighted sum field"""
def __init__(self, *args, **kwargs):
"""Constructor"""
self.fitness = WeightedSumFitness(obj_size=kwargs['obj_size'])
del kwargs['obj_size']
super(WSListIndividual, self).__init__(*args, **kwargs)
[docs]
class DEAPOptimisation(bluepyopt.optimisations.Optimisation):
"""DEAP Optimisation class"""
def __init__(self, evaluator=None,
use_scoop=False,
seed=1,
offspring_size=10,
eta=10,
mutpb=1.0,
cxpb=1.0,
map_function=None,
hof=None,
selector_name=None):
"""Constructor
Args:
evaluator (Evaluator): Evaluator object
use_scoop (bool): use scoop map for parallel computation
seed (float): Random number generator seed
offspring_size (int): Number of offspring individuals in each
generation
eta (float): Parameter that controls how far the crossover and
mutation operator disturbe the original individuals
mutpb (float): Mutation probability
cxpb (float): Crossover probability
map_function (function): Function used to map (parallelise) the
evaluation function calls
hof (hof): Hall of Fame object
selector_name (str): The selector used in the evolutionary
algorithm, possible values are 'IBEA' or 'NSGA2'
"""
super(DEAPOptimisation, self).__init__(evaluator=evaluator)
self.use_scoop = use_scoop
self.seed = seed
self.offspring_size = offspring_size
self.eta = eta
self.cxpb = cxpb
self.mutpb = mutpb
self.map_function = map_function
self.selector_name = selector_name
if self.selector_name is None:
self.selector_name = 'IBEA'
self.hof = hof
if self.hof is None:
self.hof = deap.tools.HallOfFame(10)
# Create a DEAP toolbox
self.toolbox = deap.base.Toolbox()
self.setup_deap()
[docs]
def setup_deap(self):
"""Set up optimisation"""
# Number of objectives
OBJ_SIZE = len(self.evaluator.objectives)
# Set random seed
random.seed(self.seed)
# Eta parameter of crossover / mutation parameters
# Basically defines how much they 'spread' solution around
# The lower this value, the more spread
ETA = self.eta
# Number of parameters
IND_SIZE = len(self.evaluator.params)
if IND_SIZE == 0:
raise ValueError(
"Length of evaluator.params is zero. At least one "
"non-fix parameter is needed to run an optimization."
)
# Bounds for the parameters
LOWER = []
UPPER = []
for parameter in self.evaluator.params:
LOWER.append(parameter.lower_bound)
UPPER.append(parameter.upper_bound)
# Register the 'uniform' function
self.toolbox.register("uniformparams", utils.uniform, LOWER, UPPER,
IND_SIZE)
# Register the individual format
# An indiviual is create by WSListIndividual and parameters
# are initially
# picked by 'uniform'
self.toolbox.register(
"Individual",
deap.tools.initIterate,
functools.partial(WSListIndividual, obj_size=OBJ_SIZE),
self.toolbox.uniformparams)
# Register the population format. It is a list of individuals
self.toolbox.register(
"population",
deap.tools.initRepeat,
list,
self.toolbox.Individual)
# Register the evaluation function for the individuals
# import deap_efel_eval1
self.toolbox.register(
"evaluate",
self.evaluator.init_simulator_and_evaluate_with_lists
)
# Register the mate operator
self.toolbox.register(
"mate",
deap.tools.cxSimulatedBinaryBounded,
eta=ETA,
low=LOWER,
up=UPPER)
# Register the mutation operator
self.toolbox.register(
"mutate",
deap.tools.mutPolynomialBounded,
eta=ETA,
low=LOWER,
up=UPPER,
indpb=0.5)
# Register the variate operator
self.toolbox.register("variate", deap.algorithms.varAnd)
# Register the selector (picks parents from population)
if self.selector_name == 'IBEA':
self.toolbox.register("select", tools.selIBEA)
elif self.selector_name == 'NSGA2':
self.toolbox.register("select", deap.tools.emo.selNSGA2)
else:
raise ValueError('DEAPOptimisation: Constructor selector_name '
'argument only accepts "IBEA" or "NSGA2"')
import copyreg
import types
copyreg.pickle(types.MethodType, utils.reduce_method)
if self.use_scoop:
if self.map_function:
raise Exception(
'Impossible to use scoop is providing self '
'defined map function: %s' %
self.map_function)
from scoop import futures
self.toolbox.register("map", futures.map)
elif self.map_function:
self.toolbox.register("map", self.map_function)
[docs]
def run(self,
max_ngen=10,
offspring_size=None,
continue_cp=False,
cp_filename=None,
cp_frequency=1,
cp_period=None,
parent_population=None,
terminator=None):
"""Run optimisation"""
# Allow run function to override offspring_size
# TODO probably in the future this should not be an object field
# anymore
# keeping for backward compatibility
if offspring_size is None:
offspring_size = self.offspring_size
# Generate the population object
if parent_population is not None:
if len(parent_population) != offspring_size:
offspring_size = len(parent_population)
self.offspring_size = len(parent_population)
logger.warning(
'The length of the provided population is different from '
'the offspring_size. The offspring_size will be '
'overwritten.'
)
OBJ_SIZE = len(self.evaluator.objectives)
IND_SIZE = len(self.evaluator.params)
pop = []
for ind in parent_population:
if len(ind) != IND_SIZE:
raise Exception(
'The length of the provided individual is not equal '
'to the number of parameter in the evaluator ')
pop.append(WSListIndividual(ind, obj_size=OBJ_SIZE))
else:
pop = self.toolbox.population(n=offspring_size)
stats = deap.tools.Statistics(key=lambda ind: ind.fitness.sum)
import numpy
stats.register("avg", numpy.mean)
stats.register("std", numpy.std)
stats.register("min", numpy.min)
stats.register("max", numpy.max)
param_names = []
if hasattr(self.evaluator, "param_names"):
param_names = self.evaluator.param_names
pop, hof, log, history = algorithms.eaAlphaMuPlusLambdaCheckpoint(
pop,
self.toolbox,
offspring_size,
self.cxpb,
self.mutpb,
max_ngen,
stats=stats,
halloffame=self.hof,
cp_frequency=cp_frequency,
cp_period=None,
continue_cp=continue_cp,
cp_filename=cp_filename,
terminator=terminator,
param_names=param_names)
# Update hall of fame
self.hof = hof
return pop, self.hof, log, history
[docs]
class IBEADEAPOptimisation(DEAPOptimisation):
"""IBEA DEAP class"""
def __init__(self, *args, **kwargs):
"""Constructor"""
super(IBEADEAPOptimisation, self).__init__(*args, **kwargs)