From 5c58374fa966b0669b4270e952e066220f158959 Mon Sep 17 00:00:00 2001 From: Daniel Weindl Date: Mon, 16 Sep 2024 14:02:14 +0200 Subject: [PATCH] SacessOptimizer: expose more hyperparameters + minor fixes (#1459) * Introduces a new SacessOptions class to collect SacessOptimizer hyperparameters and makes some previously hard-coded values configurable. Closes #1458 * Fixes a mismatch in adaptation conditions between the original SaCeSS implementation and SacessOptimizer (adaptation condition: AND -> OR) * Log final worker configuration * Sort refset before resizing. --- pypesto/optimize/__init__.py | 1 + pypesto/optimize/ess/__init__.py | 1 + pypesto/optimize/ess/refset.py | 7 +- pypesto/optimize/ess/sacess.py | 162 ++++++++++++++++++++++--------- 4 files changed, 124 insertions(+), 47 deletions(-) diff --git a/pypesto/optimize/__init__.py b/pypesto/optimize/__init__.py index 968fe9e55..9c2679562 100644 --- a/pypesto/optimize/__init__.py +++ b/pypesto/optimize/__init__.py @@ -10,6 +10,7 @@ ESSOptimizer, SacessFidesFactory, SacessOptimizer, + SacessOptions, get_default_ess_options, ) from .load import ( diff --git a/pypesto/optimize/ess/__init__.py b/pypesto/optimize/ess/__init__.py index fef613895..c5f2d0df4 100644 --- a/pypesto/optimize/ess/__init__.py +++ b/pypesto/optimize/ess/__init__.py @@ -10,5 +10,6 @@ from .sacess import ( SacessFidesFactory, SacessOptimizer, + SacessOptions, get_default_ess_options, ) diff --git a/pypesto/optimize/ess/refset.py b/pypesto/optimize/ess/refset.py index 5c75d54a2..dd0556c42 100644 --- a/pypesto/optimize/ess/refset.py +++ b/pypesto/optimize/ess/refset.py @@ -80,7 +80,7 @@ def initialize_random( self, n_diverse: int, ): - """Create initial reference set from random parameters. + """Create an initial reference set from random parameters. Sample ``n_diverse`` random points, populate half of the RefSet using the best solutions and fill the rest with random points. @@ -90,7 +90,7 @@ def initialize_random( self.initialize_from_array(x_diverse=x_diverse, fx_diverse=fx_diverse) def initialize_from_array(self, x_diverse: np.array, fx_diverse: np.array): - """Create initial reference set using the provided points. + """Create an initial reference set using the provided points. Populate half of the RefSet using the best given solutions and fill the rest with a random selection from the remaining points. @@ -174,7 +174,8 @@ def resize(self, new_dim: int): If the dimension does not change, do nothing. If size is decreased, drop entries from the end (i.e., the worst values, assuming it is sorted). If size is increased, the new - entries are filled with randomly and the refset is sorted. + entries are filled with randomly sampled parameters and the refset is + sorted. NOTE: Any attributes are just truncated or filled with zeros. """ diff --git a/pypesto/optimize/ess/sacess.py b/pypesto/optimize/ess/sacess.py index a6ed8915f..775d5458f 100644 --- a/pypesto/optimize/ess/sacess.py +++ b/pypesto/optimize/ess/sacess.py @@ -1,4 +1,5 @@ """Self-adaptive cooperative enhanced scatter search (SACESS).""" +from __future__ import annotations import itertools import logging @@ -11,7 +12,7 @@ from multiprocessing import get_context from multiprocessing.managers import SyncManager from pathlib import Path -from typing import Any, Callable, Optional, Union +from typing import Any, Callable from uuid import uuid1 from warnings import warn @@ -31,6 +32,7 @@ "SacessOptimizer", "get_default_ess_options", "SacessFidesFactory", + "SacessOptions", ] logger = logging.getLogger(__name__) @@ -62,13 +64,14 @@ class SacessOptimizer: def __init__( self, - num_workers: Optional[int] = None, - ess_init_args: Optional[list[dict[str, Any]]] = None, + num_workers: int | None = None, + ess_init_args: list[dict[str, Any]] | None = None, max_walltime_s: float = np.inf, sacess_loglevel: int = logging.INFO, ess_loglevel: int = logging.WARNING, - tmpdir: Union[Path, str] = None, + tmpdir: Path | str = None, mp_start_method: str = "spawn", + options: SacessOptions = None, ): """Construct. @@ -110,6 +113,8 @@ def __init__( mp_start_method: The start method for the multiprocessing context. See :mod:`multiprocessing` for details. + options: + Further optimizer hyperparameters. """ if (num_workers is None and ess_init_args is None) or ( num_workers is not None and ess_init_args is not None @@ -138,10 +143,11 @@ def __init__( self._tmpdir = Path(f"SacessOptimizerTemp-{str(uuid1())[:8]}") self._tmpdir = Path(self._tmpdir).absolute() self._tmpdir.mkdir(parents=True, exist_ok=True) - self.histories: Optional[ - list["pypesto.history.memory.MemoryHistory"] - ] = None + self.histories: list[ + pypesto.history.memory.MemoryHistory + ] | None = None self.mp_ctx = get_context(mp_start_method) + self.options = options or SacessOptions() def minimize( self, @@ -212,6 +218,7 @@ def minimize( shmem_manager=shmem_manager, ess_options=ess_init_args, dim=problem.dim, + options=self.options, ) # create workers workers = [ @@ -225,6 +232,7 @@ def minimize( tmp_result_file=SacessWorker.get_temp_result_filename( worker_idx, self._tmpdir ), + options=self.options, ) for worker_idx, ess_kwargs in enumerate(ess_init_args) ] @@ -344,15 +352,13 @@ class SacessManager: more promising the respective worker is considered) _worker_comms: Number of communications received from the individual workers - _rejections: Number of rejected solutions received from workers since last - adaptation of ``_rejection_threshold``. + _rejections: Number of rejected solutions received from workers since the + last adaptation of ``_rejection_threshold``. _rejection_threshold: Threshold for relative objective improvements that incoming solutions have to pass to be accepted - _rejection_threshold_min: ``_rejection_threshold`` will be reduced (halved) - if too few solutions are accepted. This value is the lower limit for - ``_rejection_threshold``. _lock: Lock for accessing shared state. _logger: A logger instance + _options: Further optimizer hyperparameters. """ def __init__( @@ -360,7 +366,9 @@ def __init__( shmem_manager: SyncManager, ess_options: list[dict[str, Any]], dim: int, + options: SacessOptions = None, ): + self._options = options or SacessOptions() self._num_workers = len(ess_options) self._ess_options = [shmem_manager.dict(o) for o in ess_options] self._best_known_fx = shmem_manager.Value("d", np.inf) @@ -370,8 +378,9 @@ def __init__( # [PenasGon2017]_ p.9 is 0.1. # However, their implementation uses 0.1 *percent*. I assume this is a # mistake in the paper. - self._rejection_threshold = shmem_manager.Value("d", 0.001) - self._rejection_threshold_min = 0.001 + self._rejection_threshold = shmem_manager.Value( + "d", self._options.manager_initial_rejection_threshold + ) # scores of the workers, ordered by worker-index # initial score is the worker index @@ -433,7 +442,7 @@ def submit_solution( np.isfinite(fx) and not np.isfinite(self._best_known_fx.value) ) - # avoid division by 0. just accept any improvement if best + # avoid division by 0. just accept any improvement if the best # known value is 0. or (self._best_known_fx.value == 0 and fx < 0) or ( @@ -475,12 +484,12 @@ def submit_solution( f"(threshold: {self._rejection_threshold.value}) " f"(total rejections: {self._rejections.value})." ) - # adapt acceptance threshold if too many solutions have been - # rejected + # adapt the acceptance threshold if too many solutions have + # been rejected if self._rejections.value >= self._num_workers: self._rejection_threshold.value = min( self._rejection_threshold.value / 2, - self._rejection_threshold_min, + self._options.manager_minimum_rejection_threshold, ) self._logger.debug( "Lowered acceptance threshold to " @@ -507,9 +516,6 @@ class SacessWorker: to the manager. _ess_kwargs: ESSOptimizer options for this worker (may get updated during the self-adaptive step). - _acceptance_threshold: Minimum relative improvement of the objective - compared to the best known value to be eligible for submission to the - Manager. _n_sent_solutions: Number of solutions sent to the Manager. _max_walltime_s: Walltime limit. _logger: A Logger instance. @@ -527,6 +533,7 @@ def __init__( loglevel: int = logging.INFO, ess_loglevel: int = logging.WARNING, tmp_result_file: str = None, + options: SacessOptions = None, ): self._manager = manager self._worker_idx = worker_idx @@ -534,8 +541,6 @@ def __init__( self._n_received_solutions = 0 self._neval = 0 self._ess_kwargs = ess_kwargs - # Default value from original SaCeSS implementation - self._acceptance_threshold = 0.0001 self._n_sent_solutions = 0 self._max_walltime_s = max_walltime_s self._start_time = None @@ -544,6 +549,7 @@ def __init__( self._logger = None self._tmp_result_file = tmp_result_file self._refset = None + self._options = options or SacessOptions() def run( self, @@ -618,6 +624,7 @@ def run( exit_flag=ess.exit_flag, ) self._manager._result_queue.put(worker_result) + self._logger.debug(f"Final configuration: {self._ess_kwargs}") ess._report_final() def _setup_ess(self, startpoint_method: StartpointMethod) -> ESSOptimizer: @@ -665,19 +672,23 @@ def _cooperate(self): self.replace_solution(self._refset, x=recv_x, fx=recv_fx) def _maybe_adapt(self, problem: Problem): - """Perform adaptation step. + """Perform the adaptation step if needed. Update ESS settings if conditions are met. """ # Update ESS settings if we received way more solutions than we sent - # Magic numbers from [PenasGon2017]_ algorithm 5 + # Note: [PenasGon2017]_ Algorithm 5 uses AND in the following + # condition, but the accompanying implementation uses OR. if ( - self._n_received_solutions > 10 * self._n_sent_solutions + 20 - and self._neval > problem.dim * 5000 + self._n_received_solutions + > self._options.adaptation_sent_coeff * self._n_sent_solutions + + self._options.adaptation_sent_offset + or self._neval > problem.dim * self._options.adaptation_min_evals ): self._ess_kwargs = self._manager.reconfigure_worker( self._worker_idx ) + self._refset.sort() self._refset.resize(self._ess_kwargs["dim_refset"]) self._logger.debug( f"Updated settings on worker {self._worker_idx} to " @@ -693,17 +704,17 @@ def maybe_update_best(self, x: np.array, fx: float): f"Worker {self._worker_idx} maybe sending solution {fx}. " f"best known: {self._best_known_fx}, " f"rel change: {rel_change:.4g}, " - f"threshold: {self._acceptance_threshold}" + f"threshold: {self._options.worker_acceptance_threshold}" ) - # solution improves best value by at least a factor of ... + # solution improves the best value by at least a factor of ... if ( (np.isfinite(fx) and not np.isfinite(self._best_known_fx)) or (self._best_known_fx == 0 and fx < 0) or ( fx < self._best_known_fx and abs((self._best_known_fx - fx) / fx) - > self._acceptance_threshold + > self._options.worker_acceptance_threshold ) ): self._logger.debug( @@ -738,7 +749,7 @@ def replace_solution(refset: RefSet, x: np.array, fx: float): refset.attributes["cooperative_solution"] ) ).size == 0: - # the attribute exists, but no member is marked as cooperative + # the attribute exists, but no member is marked as the cooperative # solution. this may happen if we shrink the refset. cooperative_solution_idx = np.argmax(refset.fx) @@ -767,9 +778,7 @@ def _keep_going(self): return True @staticmethod - def get_temp_result_filename( - worker_idx: int, tmpdir: Union[str, Path] - ) -> str: + def get_temp_result_filename(worker_idx: int, tmpdir: str | Path) -> str: return str(Path(tmpdir, f"sacess-{worker_idx:02d}_tmp.h5").absolute()) @@ -786,7 +795,7 @@ def _run_worker( # different random seeds per process np.random.seed((os.getpid() * int(time.time() * 1000)) % 2**32) - # Forward log messages to logging process + # Forward log messages to the logging process h = logging.handlers.QueueHandler(log_process_queue) worker._logger = logging.getLogger(multiprocessing.current_process().name) worker._logger.addHandler(h) @@ -797,11 +806,9 @@ def _run_worker( def get_default_ess_options( num_workers: int, dim: int, - local_optimizer: Union[ - bool, - "pypesto.optimize.Optimizer", - Callable[..., "pypesto.optimize.Optimizer"], - ] = True, + local_optimizer: bool + | pypesto.optimize.Optimizer + | Callable[..., pypesto.optimize.Optimizer] = True, ) -> list[dict]: """Get default ESS settings for (SA)CESS. @@ -1017,8 +1024,8 @@ class SacessFidesFactory: def __init__( self, - fides_options: Optional[dict[str, Any]] = None, - fides_kwargs: Optional[dict[str, Any]] = None, + fides_options: dict[str, Any] | None = None, + fides_kwargs: dict[str, Any] | None = None, ): if fides_options is None: fides_options = {} @@ -1038,7 +1045,7 @@ def __init__( def __call__( self, max_walltime_s: int, max_eval: int - ) -> "pypesto.optimize.FidesOptimizer": + ) -> pypesto.optimize.FidesOptimizer: """Create a :class:`FidesOptimizer` instance.""" from fides.constants import Options as FidesOptions @@ -1085,5 +1092,72 @@ class SacessWorkerResult: fx: float n_eval: int n_iter: int - history: "pypesto.history.memory.MemoryHistory" + history: pypesto.history.memory.MemoryHistory exit_flag: ESSExitFlag + + +@dataclass +class SacessOptions: + """Container for :class:`SacessOptimizer` hyperparameters. + + Parameters + ---------- + manager_initial_rejection_threshold, manager_minimum_rejection_threshold: + Initial and minimum threshold for relative objective improvements that + incoming solutions have to pass to be accepted. If the number of + rejected solutions exceeds the number of workers, the threshold is + halved until it reaches ``manager_minimum_rejection_threshold``. + + worker_acceptance_threshold: + Minimum relative improvement of the objective compared to the best + known value to be eligible for submission to the Manager. + + adaptation_min_evals, adaptation_sent_offset, adaptation_sent_coeff: + Hyperparameters that control when the workers will adapt their settings + based on the performance of the other workers. + + The adaptation step is performed if all the following conditions are + met: + + * The number of function evaluations since the last solution was sent + to the manager times the number of optimization parameters is greater + than ``adaptation_min_evals``. + + * The number of solutions received by the worker since the last + solution it sent to the manager is greater than + ``adaptation_sent_coeff * n_sent_solutions + adaptation_sent_offset``, + where ``n_sent_solutions`` is the number of solutions sent to the + manager by the given worker. + + """ + + manager_initial_rejection_threshold: float = 0.001 + manager_minimum_rejection_threshold: float = 0.001 + + # Default value from original SaCeSS implementation + worker_acceptance_threshold: float = 0.0001 + + # Magic numbers for adaptation, taken from [PenasGon2017]_ algorithm 5 + adaptation_min_evals: int = 5000 + adaptation_sent_offset: int = 20 + adaptation_sent_coeff: int = 10 + + def __post_init__(self): + if self.adaptation_min_evals < 0: + raise ValueError("adaptation_min_evals must be non-negative.") + if self.adaptation_sent_offset < 0: + raise ValueError("adaptation_sent_offset must be non-negative.") + if self.adaptation_sent_coeff < 0: + raise ValueError("adaptation_sent_coeff must be non-negative.") + if self.manager_initial_rejection_threshold < 0: + raise ValueError( + "manager_initial_rejection_threshold must be non-negative." + ) + if self.manager_minimum_rejection_threshold < 0: + raise ValueError( + "manager_minimum_rejection_threshold must be non-negative." + ) + if self.worker_acceptance_threshold < 0: + raise ValueError( + "worker_acceptance_threshold must be non-negative." + )