# Source code for gryffin.gryffin

#!/usr/bin/env python

__author__ = 'Florian Hase, Matteo Aldeghi'

from .acquisition import Acquisition
from .bayesian_network import BayesianNetwork
from .descriptor_generator import DescriptorGenerator
from .observation_processor import ObservationProcessor, param_vectors_to_dicts, param_dicts_to_vectors
from .random_sampler import RandomSampler
from .sample_selector import SampleSelector
from .utilities import ConfigParser, Logger, GryffinNotFoundError
from .utilities import (
    parse_time,
    memory_usage,
    estimate_feas_fraction,
    compute_constrained_cartesian,
)

import os
import numpy as np
import pandas as pd
import time
from contextlib import nullcontext
from typing import Callable, Union, List, Dict


class Gryffin(Logger):
    """Main user-facing interface to the Gryffin Bayesian optimizer.

    Wires together the observation processor, descriptor generators, Bayesian
    network surrogate, acquisition optimizer and sample selector.
    """

    def __init__(
        self,
        config_file: str = None,
        config_dict: Dict = None,
        known_constraints: Callable[[Dict], bool] = None,
        frac_feas=None,
        silent: bool = False,
    ) -> None:
        """Initialize Gryffin from a config dict or file.

        A config file or dict must be provided. If both a config file and
        a config dict are provided, the file will be ignored.

        :param config_file: Gryffin config filepath
        :param config_dict: Gryffin config dict
        :param known_constraints: Function imposing constraints on the Gryffin search space.
        :param frac_feas: Feasibility fraction. If None and ``known_constraints`` is given,
            it is estimated by sampling the domain. TODO: confirm intended semantics.
        :param silent: Suppress all standard output. If True, the ``verbosity`` settings
            in ``config`` will be overwritten. Default is False.
        """
        # parse configuration
        self.config = ConfigParser(config_file, config_dict)
        self.config.parse()
        self.config.set_home(os.path.dirname(os.path.abspath(__file__)))

        # set verbosity: silent mode forces verbosity level 2 regardless of config
        if silent:
            self.verbosity = 2
            self.config.general.verbosity = 2
        else:
            self.verbosity = self.config.get('verbosity')
        Logger.__init__(self, 'Gryffin', verbosity=self.verbosity)

        # parse constraints function
        self.known_constraints = known_constraints
        if self.known_constraints:
            if frac_feas is not None:
                # override the feasible fraction
                self.frac_feas = frac_feas
            else:
                # if we have known constraints, estimate the feasible fraction
                self.frac_feas = estimate_feas_fraction(self.known_constraints, self.config)
        else:
            # no known constraints, assume full domain is feasible
            self.frac_feas = 1.

        # if param space is fully categorical/discrete, maintain list of all options
        if np.all([p['type'] in ['categorical', 'discrete'] for p in self.config.parameters]):
            self.all_options = compute_constrained_cartesian(self.known_constraints, self.config)
        else:
            self.all_options = None

        # store timings for possible analysis
        self.timings = {}

        np.random.seed(self.config.get('random_seed'))
        self._create_folders()  # folders created only if we are saving to database

        # Instantiate all objects needed
        self.random_sampler = RandomSampler(self.config, constraints=self.known_constraints)
        self.obs_processor = ObservationProcessor(self.config)
        self.descriptor_generator = DescriptorGenerator(self.config)
        self.descriptor_generator_feas = DescriptorGenerator(self.config)
        self.bayesian_network = BayesianNetwork(config=self.config, frac_feas=self.frac_feas)
        self.acquisition = Acquisition(self.config, known_constraints=self.known_constraints)
        self.sample_selector = SampleSelector(self.config, self.all_options)

        self.iter_counter = 0
        self.sampling_param_values = None
        self.sampling_strategies = None
        self.num_batches = None

        # attributes used mainly for investigation/debugging
        self.parsed_input_data = {}
        self.proposals = None
def _create_folders(self): if self.config.get('save_database') is True and not os.path.isdir(self.config.get_db('path')): try: os.mkdir(self.config.get_db('path')) except FileNotFoundError: GryffinNotFoundError('Could not create database directory: %s' % self.config.get_db('path')) if self.config.get('save_database') is True: from .database_handler import DatabaseHandler self.db_handler = DatabaseHandler(self.config)
[docs] def build_surrogate(self, observations: List = None) -> None: """Builds surrogate models of Gryffin without proposing any new experiments. :param observations: List of dictionaries with the previous observations """ self.log('', 'INFO') self.log_chapter("Gryffin", line='=', style='bold #d9ed92') if observations is None or len(observations) == 0: self.log('Could not find any observations, cannot build surrogate models', 'WARNING') return None self.log(f'{len(observations)} observations found', 'STATS') # obs_params == all observed parameters # obs_objs == all observed objective function evaluations (including NaNs) # obs_feas == whether observed parameters are feasible (0) or infeasible (1) # mask_kwn == mask that selects only known/feasible params/objs (including mirrored params) # mask_mirror == mask that selects the parameters that have been mirrored across opt bounds obs_params, obs_objs, obs_feas, mask_kwn, mask_mirror = self.obs_processor.process_observations(observations) # keep for inspection/debugging self.parsed_input_data['obs_params'] = obs_params self.parsed_input_data['obs_objs'] = obs_objs self.parsed_input_data['obs_feas'] = obs_feas self.parsed_input_data['mask_kwn'] = mask_kwn self.parsed_input_data['mask_mirror'] = mask_mirror # ----------------------------- # Build categorical descriptors # ----------------------------- # can generate descriptors if we have: # (i) at least 3 feasible observations (normal desc generation) # (ii) at least 2 feasible and 1 infeasible observation (desc generation for feasibility) can_generate_desc = len(obs_params[mask_kwn]) > 3 or (len(obs_params) > 3 and np.sum(obs_feas) > 0.1) if self.config.get('auto_desc_gen') is True and can_generate_desc is True: self.log_chapter('Descriptor Refinement') start = time.time() # use status context manager only at INFO verbosity level if self.verbosity > 3.5: cm = self.console.status("Refining categories descriptors...") else: cm = nullcontext() with cm: # only feasible 
points with known objectives if len(obs_params[mask_kwn]) > 3: self.descriptor_generator.generate_descriptors(obs_params[mask_kwn], obs_objs[mask_kwn]) # for feasibility descriptors, we use all data, but we run descriptor generation # only if we have at least 1 infeasible point, otherwise they are all feasible and there is no point # running this. Remember that feasible = 0 and infeasible = 1. if len(obs_params) > 3 and np.sum(obs_feas) > 0.1: self.descriptor_generator_feas.generate_descriptors(obs_params, obs_feas) end = time.time() time_string = parse_time(start, end) self.log(f"Categorical descriptors refined by [italic]Dynamic Gryffin[/italic] in {time_string}", "STATS") # extract descriptors and build kernels descriptors_kwn = self.descriptor_generator.get_descriptors() descriptors_feas = self.descriptor_generator_feas.get_descriptors() # ---------------------------------------------- # sample bnn to get kernels for all observations # ---------------------------------------------- self.log_chapter('Bayesian Network') self.bayesian_network.sample(obs_params) # infer kernel densities # build kernel smoothing/classification surrogates self.bayesian_network.build_kernels(descriptors_kwn=descriptors_kwn, descriptors_feas=descriptors_feas, obs_objs=obs_objs, obs_feas=obs_feas, mask_kwn=mask_kwn) # ----------- # Print info # ----------- self.log_chapter('Summary') GB, MB, kB = memory_usage() self.log(f'Memory usage: {GB:.0f} GB, {MB:.0f} MB, {kB:.0f} kB', 'STATS') self.log_chapter("End", line='=', style='bold #d9ed92') self.log('', 'INFO')
[docs] def recommend(self, observations: List = None, sampling_strategies: List = None, num_batches: int = None, as_array: bool = False) -> List: """Recommends the next set(s) of parameters based on the provided observations. :param observations: List of dictionaries with the previous observations. :param sampling_strategies: List of the chosen sampling strategies. When providing this argument, the config setting ``strategies`` will be ignored. :param num_batches: Number of parameter batches requested. When providing this argument, the config setting ``batches`` will be ignored. :param as_array: Whether to return suggested samples as numpy arrays instead of a list of dictionaries. Default is False. :return params: List of dictionaries with the suggested parameters. """ self.log('', 'INFO') self.log_chapter("Gryffin", line='=', style='bold #d9ed92') start_time = time.time() if sampling_strategies is None: num_sampling_strategies = self.config.get('sampling_strategies') # positive lambda is exploitation, negative is exploration # we start with exploitation and follow with exploration # in sample selector, we first choose exploitation, then exploration (with distance penalty for exploration # points close to already selected exploitation ones) sampling_strategies = np.linspace(1, -1, num_sampling_strategies) else: sampling_strategies = np.array(sampling_strategies) num_sampling_strategies = len(sampling_strategies) # register last sampling strategies self.sampling_strategies = sampling_strategies if num_batches is None: self.num_batches = self.config.get('batches') else: self.num_batches = num_batches # print summary of what will be proposed num_recommended_samples = self.num_batches * num_sampling_strategies samples_str = 'samples' if num_recommended_samples > 1 else 'sample' batches_str = 'batches' if self.num_batches > 1 else 'batch' strategy_str = 'strategies' if num_sampling_strategies > 1 else 'strategy' self.log(f'Gryffin will propose {num_recommended_samples} 
{samples_str}: {self.num_batches} {batches_str} with' f' {num_sampling_strategies} sampling {strategy_str}', 'INFO') # ----------------------------------------------------- # no observations, need to fall back to random sampling # ----------------------------------------------------- if observations is None or len(observations) == 0: self.log('Could not find any observations, falling back to random sampling', 'WARNING') samples = self.random_sampler.draw(num=num_recommended_samples) if self.config.process_constrained: dominant_features = self.config.feature_process_constrained samples[:, dominant_features] = samples[0, dominant_features] # if fully categorical, remove random samples from list of available options if np.all([p['type']=='categorical' for p in self.config.parameters]): for sample in samples: sample_ix = np.where(np.all(self.all_options==sample, axis=1))[0] self.all_options = np.delete(self.all_options, sample_ix, axis=0) # update sample selector attribute setattr(self.sample_selector, 'all_options', self.all_options) # -------------------- # we have observations # -------------------- else: self.log(f'{len(observations)} observations found', 'STATS') # obs_params == all observed parameters # obs_objs == all observed objective function evaluations (including NaNs) # obs_feas == whether observed parameters are feasible (0) or infeasible (1) # mask_kwn == mask that selects only known/feasible params/objs (including mirrored params) # mask_mirror == mask that selects the parameters that have been mirrored across opt bounds obs_params, obs_objs, obs_feas, mask_kwn, mask_mirror = self.obs_processor.process_observations(observations) # keep for inspection/debugging self.parsed_input_data['obs_params'] = obs_params self.parsed_input_data['obs_objs'] = obs_objs self.parsed_input_data['obs_feas'] = obs_feas self.parsed_input_data['mask_kwn'] = mask_kwn self.parsed_input_data['mask_mirror'] = mask_mirror # ----------------------------- # Build categorical 
descriptors # ----------------------------- # can generate descriptors if we have: # (i) at least 4 feasible observations (normal desc generation) # (ii) at least 4 feasible and 1 infeasible observation (desc generation for feasibility) can_generate_desc = len(obs_params[mask_kwn]) > 3 or (len(obs_params) > 3 and np.sum(obs_feas) > 0.1) if self.config.get('auto_desc_gen') is True and can_generate_desc is True: self.log_chapter('Descriptor Refinement') start = time.time() #with self.console.status("Refining categories descriptors..."): #import pdb; pdb.set_trace() # only feasible points with known objectives if len(obs_params[mask_kwn]) > 3: self.descriptor_generator.generate_descriptors(obs_params[mask_kwn], obs_objs[mask_kwn]) # for feasibility descriptors, we use all data, but we run descriptor generation # only if we have at least 1 infeasible point, otherwise they are all feasible and there is no point # running this. Remember that feasible = 0 and infeasible = 1. if len(obs_params) > 3 and np.sum(obs_feas) > 0.1: self.descriptor_generator_feas.generate_descriptors(obs_params, obs_feas) end = time.time() time_string = parse_time(start, end) self.log(f"Categorical descriptors refined by [italic]Dynamic Gryffin[/italic] in {time_string}", "STATS") # extract descriptors and build kernels descriptors_kwn = self.descriptor_generator.get_descriptors() descriptors_feas = self.descriptor_generator_feas.get_descriptors() # ---------------------------------------------- # get lambda values for exploration/exploitation # ---------------------------------------------- self.sampling_param_values = sampling_strategies * self.bayesian_network.inverse_volume dominant_strategy_index = self.iter_counter % len(self.sampling_param_values) dominant_strategy_value = np.array([self.sampling_param_values[dominant_strategy_index]]) # ---------------------------------------------- # sample bnn to get kernels for all observations # ---------------------------------------------- 
self.log_chapter('Bayesian Network') self.bayesian_network.sample(obs_params) # infer kernel densities # build kernel smoothing/classification surrogates self.bayesian_network.build_kernels(descriptors_kwn=descriptors_kwn, descriptors_feas=descriptors_feas, obs_objs=obs_objs, obs_feas=obs_feas, mask_kwn=mask_kwn) # get incumbent if len(obs_params[mask_kwn]) > 0: # if we have kwn samples ==> pick params with best merit best_params = obs_params[mask_kwn][np.argmin(obs_objs[mask_kwn])] else: # if we have do not have any feasible sample ==> pick any feasible param at random best_params_idx = np.random.randint(low=0, high=len(obs_params)) best_params = obs_params[best_params_idx] # ---------------------------------------------- # optimize acquisition and select samples # ---------------------------------------------- num_samples_per_dim = self.config.get('num_random_samples') # if there are process constraining parameters, run those first if self.config.process_constrained: self.proposals = self.acquisition.propose(best_params=best_params, bayesian_network=self.bayesian_network, sampling_param_values=self.sampling_param_values, num_samples_per_dim=num_samples_per_dim, dominant_samples=None, timings_dict=None) constraining_samples = self.sample_selector.select(num_batches=self.num_batches, proposals=self.proposals, eval_acquisition=self.acquisition.eval_acquisition, sampling_param_values=dominant_strategy_value, obs_params=obs_params) else: constraining_samples = None # then select the remaining proposals # note num_samples get multiplied by the number of input variables self.log_chapter('Acquisition') self.proposals = self.acquisition.propose(best_params=best_params, bayesian_network=self.bayesian_network, sampling_param_values=self.sampling_param_values, num_samples_per_dim=num_samples_per_dim, dominant_samples=constraining_samples, timings_dict=self.timings) self.log_chapter('Sample Selector') # note: provide `obs_params` as it contains the params for _all_ samples, 
including the unfeasible ones samples = self.sample_selector.select(num_batches=self.num_batches, proposals=self.proposals, eval_acquisition=self.acquisition.eval_acquisition, sampling_param_values=self.sampling_param_values, obs_params=obs_params) # -------------------------------- # Print overall info for recommend # -------------------------------- self.log_chapter('Summary') GB, MB, kB = memory_usage() self.log(f'Memory usage: {GB:.0f} GB, {MB:.0f} MB, {kB:.0f} kB', 'STATS') end_time = time.time() time_string = parse_time(start_time, end_time) self.log(f'Overall time required: {time_string}', 'STATS') self.log_chapter("End", line='=', style='bold #d9ed92') self.log('', 'INFO') # ----------------------- # Return proposed samples # ----------------------- if as_array: # return as is return_samples = samples else: # return as dictionary return_samples = param_vectors_to_dicts(param_vectors=samples, param_names=self.config.param_names, param_options=self.config.param_options, param_types=self.config.param_types) if self.config.get('save_database') is True: db_entry = {'start_time': start_time, 'end_time': end_time, 'received_obs': observations, 'suggested_params': return_samples} if self.config.get('auto_desc_gen') is True: # save summary of learned descriptors descriptor_summary = self.descriptor_generator.get_summary() db_entry['descriptor_summary'] = descriptor_summary self.db_handler.save(db_entry) self.iter_counter += 1 return return_samples
def read_db(self, outfile='database.csv', verbose=True): self.db_handler.read_db(outfile, verbose) @staticmethod def _df_to_list_of_dicts(df): list_of_dicts = [] for index, row in df.iterrows(): d = {} for col in df.columns: d[col] = row[col] list_of_dicts.append(d) return list_of_dicts
[docs] def get_regression_surrogate(self, params: Union[List, pd.DataFrame]): """Retrieve the surrogate model. :param params: List of dicts with input parameters to evaluate. Alternatively it can also be a pandas DataFrame where each column name corresponds to one of the input parameters in Gryffin. :return y_pred: Surrigate model evaluated at the locations defined in params. """ if isinstance(params, pd.DataFrame): params = self._df_to_list_of_dicts(params) X = param_dicts_to_vectors(params, param_names=self.config.param_names, param_options=self.config.param_options, param_types=self.config.param_types) y_preds = [] for x in X: y_pred = self.bayesian_network.regression_surrogate(x.astype(np.float64)) y_preds.append(y_pred) # invert transform the surrogate according to the chosen transform y_preds = np.array(y_preds) transform = self.config.get('obj_transform') if transform is None: pass elif transform == 'sqrt': # accentuate global minimum y_preds = np.square(y_preds) elif transform == 'cbrt': # accentuate global minimum more than sqrt y_preds = np.power(y_preds, 3) elif transform == 'square': # de-emphasise global minimum y_preds = np.sqrt(y_preds) # scale the predicted objective back to the original range if self.obs_processor.min_obj != self.obs_processor.max_obj: y_preds = y_preds * (self.obs_processor.max_obj - self.obs_processor.min_obj) + self.obs_processor.min_obj else: y_preds = y_preds + self.obs_processor.min_obj return y_preds
[docs] def get_feasibility_surrogate(self, params: Union[List, pd.DataFrame], threshold: float = None) -> List: """Retrieve the feasibility surrogate model. :param params: List of dicts with input parameters to evaluate. Alternatively it can also be a pandas DataFrame where each column name corresponds to one of the input parameters in Gryffin. :param threshold: Threshold used to classify whether a set of parameters is feasible or not. If ``None``, the probability of feasibility is returned instead of a binary True/False (feasible/infeasible) output. Default is None. :return y_pred: Surrogate model evaluated at the locations defined in params. """ if isinstance(params, pd.DataFrame): params = self._df_to_list_of_dicts(params) X = param_dicts_to_vectors(params, param_names=self.config.param_names, param_options=self.config.param_options, param_types=self.config.param_types) y_preds = [] for x in X: if threshold is None: y_pred = self.bayesian_network.prob_feasible(x) else: y_pred = self.bayesian_network.classification_surrogate(x, threshold=threshold) y_preds.append(y_pred) return np.array(y_preds)
[docs] def get_kernel_density_estimate(self, params: Union[List, pd.DataFrame], separate_kwn_ukwn: bool = False) -> List: """Retrieve the feasibility surrogate model. :param params: List of dicts with input parameters to evaluate. Alternatively it can also be a pandas DataFrame where each column name corresponds to one of the input parameters in Gryffin. :param separate_kwn_ukwn: Return the density for all samples, or separate the density for feasible/infeasible samples. :return y_pred: Kernel density estimates. """ if isinstance(params, pd.DataFrame): params = self._df_to_list_of_dicts(params) X = param_dicts_to_vectors(params, param_names=self.config.param_names, param_options=self.config.param_options, param_types=self.config.param_types) y_preds = [] for x in X: log_density_0, log_density_1 = self.bayesian_network.kernel_classification.get_binary_kernel_densities(x.astype(np.float64)) density_0 = np.exp(log_density_0) density_1 = np.exp(log_density_1) if separate_kwn_ukwn is True: y_pred = [density_0, density_1] else: y_pred = density_0 + density_1 y_preds.append(y_pred) return np.array(y_preds)
[docs] def get_acquisition(self, X): """Retrieve the last acquisition functions for a specific lambda value. """ if isinstance(X, pd.DataFrame): X = self._df_to_list_of_dicts(X) X_parsed = param_dicts_to_vectors(X, param_names=self.config.param_names, param_options=self.config.param_options, param_types=self.config.param_types) # collect acquisition values acquisition_values = {} for batch_index, sampling_param in enumerate(self.sampling_param_values): acquisition_values_at_l = [] for Xi_parsed in X_parsed: acq_value = self.acquisition.eval_acquisition(Xi_parsed, batch_index) acquisition_values_at_l.append(acq_value) lambda_value = self.sampling_strategies[batch_index] acquisition_values[lambda_value] = np.array(acquisition_values_at_l) return acquisition_values
[docs] def get_descriptor_summary_regression(self): ''' Retrieve a summary of descriptor relavances for the regression surrogate ''' return self.descriptor_generator.get_summary()
[docs] def get_descriptor_summary_feasibility(self): ''' Retrieve a summary of the descriptor relavances for the feasibility surrogate ''' return self.descriptor_generator_feas.get_summary()