diff --git a/bayes_opt/acquisition.py b/bayes_opt/acquisition.py index f8dfef1c..68cdf3ae 100644 --- a/bayes_opt/acquisition.py +++ b/bayes_opt/acquisition.py @@ -63,39 +63,27 @@ class AcquisitionFunction(abc.ABC): """ def __init__(self, random_state: int | RandomState | None = None) -> None: - self.random_state = ensure_rng(random_state) - self.i = 0 - - def _serialize_random_state(self) -> dict | None: - """Convert random state to JSON serializable format.""" - if self.random_state is not None: - state = self.random_state.get_state() - return { - "bit_generator": state[0], - "state": state[1].tolist(), # Convert numpy array to list - "pos": state[2], - "has_gauss": state[3], - "cached_gaussian": state[4], - } - return None - - def _deserialize_random_state(self, state_dict: dict | None) -> None: - """Restore random state from JSON serializable format.""" - if state_dict is not None: - if self.random_state is None: - self.random_state = RandomState() - state = ( - state_dict["bit_generator"], - np.array(state_dict["state"], dtype=np.uint32), - state_dict["pos"], - state_dict["has_gauss"], - state_dict["cached_gaussian"], + """ + Initializes the acquisition function and issues a deprecation warning if a random state is provided. + + Args: + random_state: Deprecated. If provided, a warning is issued and the value is ignored. + """ + if random_state is not None: + msg = ( + "Providing a random_state to an acquisition function during initialization is deprecated " + "and will be ignored. The random_state should be provided during the suggest() call." ) - self.random_state.set_state(state) + warnings.warn(msg, DeprecationWarning, stacklevel=2) + self.i = 0 @abc.abstractmethod def base_acq(self, *args: Any, **kwargs: Any) -> NDArray[Float]: - """Provide access to the base acquisition function.""" + """ + Computes the base acquisition function value. + + This method should be implemented by subclasses to return the acquisition value given predictive statistics such as mean and standard deviation. + """ def _fit_gp(self, gp: GaussianProcessRegressor, target_space: TargetSpace) -> None: # Sklearn's GP throws a large number of warnings at times, but @@ -139,34 +127,28 @@ def suggest( gp: GaussianProcessRegressor, target_space: TargetSpace, n_random: int = 10_000, - n_l_bfgs_b: int = 10, + n_smart: int = 10, fit_gp: bool = True, + random_state: int | RandomState | None = None, ) -> NDArray[Float]: - """Suggest a promising point to probe next. - - Parameters - ---------- - gp : GaussianProcessRegressor - A fitted Gaussian Process. - - target_space : TargetSpace - The target space to probe. - - n_random : int, default 10_000 - Number of random samples to use. - - n_l_bfgs_b : int, default 10 - Number of starting points for the L-BFGS-B optimizer. - - fit_gp : bool, default True - Whether to fit the Gaussian Process to the target space. - Set to False if the GP is already fitted. - - Returns - ------- - np.ndarray - Suggested point to probe next. """ + Suggests the next candidate point to evaluate by maximizing the acquisition function. + + Selects a promising point in the parameter space using a combination of random sampling and local optimization, optionally fitting the Gaussian Process to the current data. Raises an error if no previous samples exist in the target space. + + Args: + n_random: Number of random samples for initial exploration. + n_smart: Number of local optimization runs for refining the suggestion. + fit_gp: If True, fits the Gaussian Process to the target space before suggestion. 
+ random_state: Seed or random state for reproducibility. + + Returns: + The suggested parameter vector as a NumPy array. + + Raises: + TargetSpaceEmptyError: If the target space contains no previous samples. + """ + random_state = ensure_rng(random_state) if len(target_space) == 0: msg = ( "Cannot suggest a point without previous samples. Use " @@ -179,31 +161,22 @@ def suggest( self._fit_gp(gp=gp, target_space=target_space) acq = self._get_acq(gp=gp, constraint=target_space.constraint) - return self._acq_min(acq, target_space, n_random=n_random, n_l_bfgs_b=n_l_bfgs_b) + return self._acq_min(acq, target_space, n_random=n_random, n_smart=n_smart, random_state=random_state) def _get_acq( self, gp: GaussianProcessRegressor, constraint: ConstraintModel | None = None ) -> Callable[[NDArray[Float]], NDArray[Float]]: - """Prepare the acquisition function for minimization. - - Transforms a base_acq Callable, which takes `mean` and `std` as - input, into an acquisition function that only requires an array of - parameters. - Handles GP predictions and constraints. - - Parameters - ---------- - gp : GaussianProcessRegressor - A fitted Gaussian Process. - - constraint : ConstraintModel, default None - A fitted constraint model, if constraints are present and the - acquisition function supports them. - - Returns - ------- - Callable - Function to minimize. + """ + Creates a callable acquisition function for minimization, incorporating GP predictions and optional constraints. + + If a constraint model is provided, the acquisition function multiplies the base acquisition value by the predicted constraint satisfaction probability. The returned function takes an array of parameters and returns the negative acquisition value for use in minimization routines. + + Args: + gp: A fitted Gaussian Process regressor. + constraint: Optional fitted constraint model. + + Returns: + A callable that computes the (negative) acquisition value for given parameters, accounting for constraints if provided. """ dim = gp.X_train_.shape[1] if constraint is not None: @@ -235,86 +208,72 @@ def _acq_min( self, acq: Callable[[NDArray[Float]], NDArray[Float]], space: TargetSpace, + random_state: RandomState, n_random: int = 10_000, - n_l_bfgs_b: int = 10, + n_smart: int = 10, ) -> NDArray[Float]: - """Find the maximum of the acquisition function. - - Uses a combination of random sampling (cheap) and the 'L-BFGS-B' - optimization method. First by sampling `n_warmup` (1e5) points at random, - and then running L-BFGS-B from `n_iter` (10) random starting points. - - Parameters - ---------- - acq : Callable - Acquisition function to use. Should accept an array of parameters `x`. - - space : TargetSpace - The target space over which to optimize. - - n_random : int - Number of random samples to use. - - n_l_bfgs_b : int - Number of starting points for the L-BFGS-B optimizer. - - Returns - ------- - np.ndarray - Parameters maximizing the acquisition function. - """ - if n_random == 0 and n_l_bfgs_b == 0: - error_msg = "Either n_random or n_l_bfgs_b needs to be greater than 0." + Finds the parameters that maximize the acquisition function over the target space. + + Combines random sampling and local optimization to efficiently search for the maximum. First, it samples `n_random` random points and identifies the best candidates. Then, it refines the search using local optimization (L-BFGS-B for continuous spaces or differential evolution for mixed spaces) starting from the top candidates. 
Returns the parameters corresponding to the highest acquisition value found. + + Args: + acq: Acquisition function to maximize. + space: Target space for optimization. + random_state: Random state for reproducibility. + n_random: Number of random samples to evaluate. + n_smart: Number of local optimization runs for refinement. + + Returns: + Parameters that maximize the acquisition function. + """ + if n_random == 0 and n_smart == 0: + error_msg = "Either n_random or n_smart needs to be greater than 0." raise ValueError(error_msg) x_min_r, min_acq_r, x_seeds = self._random_sample_minimize( - acq, space, n_random=max(n_random, n_l_bfgs_b), n_x_seeds=n_l_bfgs_b + acq, space, random_state, n_random=max(n_random, n_smart), n_x_seeds=n_smart ) - if n_l_bfgs_b: - x_min_l, min_acq_l = self._smart_minimize(acq, space, x_seeds=x_seeds) - # Either n_random or n_l_bfgs_b is not 0 => at least one of x_min_r and x_min_l is not None - if min_acq_r > min_acq_l: - return x_min_l + if n_smart: + x_min_s, min_acq_s = self._smart_minimize(acq, space, x_seeds=x_seeds, random_state=random_state) + # Either n_random or n_smart is not 0 => at least one of x_min_r and x_min_s is not None + if min_acq_r > min_acq_s: + return x_min_s return x_min_r def _random_sample_minimize( self, acq: Callable[[NDArray[Float]], NDArray[Float]], space: TargetSpace, + random_state: RandomState, n_random: int, n_x_seeds: int = 0, - ) -> tuple[NDArray[Float] | None, float]: - """Random search to find the minimum of `acq` function. - - Parameters - ---------- - acq : Callable - Acquisition function to use. Should accept an array of parameters `x`. - - space : TargetSpace - The target space over which to optimize. - - n_random : int - Number of random samples to use. - - n_x_seeds : int - Number of top points to return, for use as starting points for L-BFGS-B. - Returns - ------- - x_min : np.ndarray - Random sample minimizing the acquisition function. - - min_acq : float - Acquisition function value at `x_min` + ) -> tuple[NDArray[Float] | None, float, NDArray[Float]]: + """ + Performs random sampling to find the minimum of the acquisition function. + + Randomly samples points from the target space, evaluates the acquisition function at each, and returns the point with the lowest acquisition value. Optionally returns the top points for use as seeds in further optimization. + + Args: + acq: Acquisition function to minimize. + space: Target space to sample from. + random_state: Random state for reproducibility. + n_random: Number of random samples to draw. + n_x_seeds: Number of top points to return as seeds. + + Returns: + A tuple containing: + - The sampled point with the lowest acquisition value (or None if n_random is 0). + - The minimum acquisition value found. + - An array of the top n_x_seeds points with the lowest acquisition values. 
""" if n_random == 0: - return None, np.inf - x_tries = space.random_sample(n_random, random_state=self.random_state) + return None, np.inf, space.random_sample(n_x_seeds, random_state=random_state) + x_tries = space.random_sample(n_random, random_state=random_state) ys = acq(x_tries) x_min = x_tries[ys.argmin()] min_acq = ys.min() if n_x_seeds != 0: - idxs = np.argsort(ys)[-n_x_seeds:] + idxs = np.argsort(ys)[:n_x_seeds] x_seeds = x_tries[idxs] else: x_seeds = [] @@ -324,28 +283,22 @@ def _smart_minimize( self, acq: Callable[[NDArray[Float]], NDArray[Float]], space: TargetSpace, - x_seeds: NDArray[Float] | None = None, + x_seeds: NDArray[Float], + random_state: RandomState, ) -> tuple[NDArray[Float] | None, float]: - """Random search to find the minimum of `acq` function. - - Parameters - ---------- - acq : Callable - Acquisition function to use. Should accept an array of parameters `x`. - - space : TargetSpace - The target space over which to optimize. - - x_seeds : int - Starting points for the L-BFGS-B optimizer. - - Returns - ------- - x_min : np.ndarray - Minimal result of the L-BFGS-B optimizer. - - min_acq : float - Acquisition function value at `x_min` + """ + Performs local optimization to find the minimum of the acquisition function. + + Uses L-BFGS-B for continuous parameter spaces and differential evolution for mixed-integer spaces, optionally refining continuous parameters after global optimization. Returns the best found point and its acquisition value. + + Args: + acq: Acquisition function to minimize. + space: Target space defining parameter bounds and types. + x_seeds: Initial points for local or global optimization. + random_state: Random state for reproducibility. + + Returns: + A tuple containing the best found point and its acquisition function value. 
""" continuous_dimensions = space.continuous_dimensions continuous_bounds = space.bounds[continuous_dimensions] @@ -366,50 +319,58 @@ def _smart_minimize( x_try = res.x x_min = x_try min_acq = np.squeeze(res.fun) - # Case of mixed-integer optimization else: - ntrials = max(1, len(x_seeds) // 100) - - for _ in range(ntrials): - xinit = space.random_sample(15 * len(space.bounds), random_state=self.random_state) - - de_parameters = {"func": acq, "bounds": space.bounds, "polish": False, "init": xinit} - if version.parse(scipy_version) < version.parse("1.15.0"): - de_parameters["seed"] = self.random_state - else: - de_parameters["rng"] = self.random_state - - de = DifferentialEvolutionSolver(**de_parameters) - res_de: OptimizeResult = de.solve() - # Check if success - if not res_de.success: - continue - - x_min = res_de.x - min_acq = np.squeeze(res_de.fun) - - # Refine the identification of continous parameters with deterministic search - if any(continuous_dimensions): - x_try = x_min.copy() - - def continuous_acq(x: NDArray[Float], x_try=x_try) -> NDArray[Float]: - x_try[continuous_dimensions] = x - return acq(x_try) - - res: OptimizeResult = minimize( - continuous_acq, x_min[continuous_dimensions], bounds=continuous_bounds - ) - if np.squeeze(res.fun) >= min_acq and res.success: - x_try[continuous_dimensions] = res.x - x_min = x_try - min_acq = np.squeeze(res.fun) + xinit = space.random_sample(15 * len(space.bounds), random_state=random_state) + if len(x_seeds) > 0: + n_seeds = min(len(x_seeds), len(xinit)) + xinit[:n_seeds] = x_seeds[:n_seeds] + + de_parameters = {"func": acq, "bounds": space.bounds, "polish": False, "init": xinit} + if version.parse(scipy_version) < version.parse("1.15.0"): + de_parameters["seed"] = random_state + else: + de_parameters["rng"] = random_state + + de = DifferentialEvolutionSolver(**de_parameters) + res_de: OptimizeResult = de.solve() + # Check if success + if not res_de.success: + msg = f"Differential evolution optimization failed. Message: {res_de.message}" + raise RuntimeError(msg) + + x_min = res_de.x + min_acq = np.squeeze(res_de.fun) + + # Refine the identification of continous parameters with deterministic search + if any(continuous_dimensions): + x_try = x_min.copy() + + def continuous_acq(x: NDArray[Float], x_try=x_try) -> NDArray[Float]: + """ + Evaluates the acquisition function at a point with updated continuous dimensions. + + Args: + x: Array of values for the continuous dimensions. + + Returns: + The acquisition function value at the point formed by replacing the continuous dimensions of `x_try` with `x`. + """ + x_try[continuous_dimensions] = x + return acq(x_try) + + res: OptimizeResult = minimize( + continuous_acq, x_min[continuous_dimensions], bounds=continuous_bounds + ) + if res.success and np.squeeze(res.fun) < min_acq: + x_try[continuous_dimensions] = res.x + x_min = x_try + min_acq = np.squeeze(res.fun) if min_acq is None: min_acq = np.inf x_min = np.array([np.nan] * space.bounds.shape[0]) - x_min = space.kernel_transform(x_min).reshape(x_min.shape) # Clip output to make sure it lies within the bounds. Due to floating # point technicalities this is not always the case. return np.clip(x_min, space.bounds[:, 0], space.bounds[:, 1]), min_acq @@ -435,9 +396,6 @@ class UpperConfidenceBound(AcquisitionFunction): exploration_decay_delay : int, default None Delay for decay. If None, decay is applied from the start. - random_state : int, RandomState, default None - Set the random state for reproducibility. 
- """ def __init__( @@ -447,6 +405,18 @@ def __init__( exploration_decay_delay: int | None = None, random_state: int | RandomState | None = None, ) -> None: + """ + Initializes the Upper Confidence Bound (UCB) acquisition function. + + Args: + kappa: Controls the exploration/exploitation tradeoff; must be non-negative. + exploration_decay: Optional decay rate for kappa after each suggestion. + exploration_decay_delay: Optional delay (in iterations) before decay starts. + random_state: Deprecated. If provided, triggers a warning; use random_state in suggest() instead. + + Raises: + ValueError: If kappa is negative. + """ if kappa < 0: error_msg = "kappa must be greater than or equal to 0." raise ValueError(error_msg) @@ -479,33 +449,18 @@ def suggest( gp: GaussianProcessRegressor, target_space: TargetSpace, n_random: int = 10_000, - n_l_bfgs_b: int = 10, + n_smart: int = 10, fit_gp: bool = True, + random_state: int | RandomState | None = None, ) -> NDArray[Float]: - """Suggest a promising point to probe next. - - Parameters - ---------- - gp : GaussianProcessRegressor - A fitted Gaussian Process. - - target_space : TargetSpace - The target space to probe. - - n_random : int, default 10_000 - Number of random samples to use. - - n_l_bfgs_b : int, default 10 - Number of starting points for the L-BFGS-B optimizer. - - fit_gp : bool, default True - Whether to fit the Gaussian Process to the target space. - Set to False if the GP is already fitted. - - Returns - ------- - np.ndarray - Suggested point to probe next. + """ + Suggests the next point to evaluate using the Upper Confidence Bound acquisition function. + + Raises: + ConstraintNotSupportedError: If constraints are present in the target space. + + Returns: + The suggested point as a NumPy array. """ if target_space.constraint is not None: msg = ( @@ -514,7 +469,12 @@ def suggest( ) raise ConstraintNotSupportedError(msg) x_max = super().suggest( - gp=gp, target_space=target_space, n_random=n_random, n_l_bfgs_b=n_l_bfgs_b, fit_gp=fit_gp + gp=gp, + target_space=target_space, + n_random=n_random, + n_smart=n_smart, + fit_gp=fit_gp, + random_state=random_state, ) self.decay_exploration() return x_max @@ -535,32 +495,29 @@ def decay_exploration(self) -> None: self.kappa = self.kappa * self.exploration_decay def get_acquisition_params(self) -> dict: - """Get the current acquisition function parameters. - - Returns - ------- - dict - Dictionary containing the current acquisition function parameters. + """ + Returns the current parameters of the Upper Confidence Bound acquisition function. + + Returns: + A dictionary with keys 'kappa', 'exploration_decay', and 'exploration_decay_delay' + representing the current values of these parameters. """ return { "kappa": self.kappa, "exploration_decay": self.exploration_decay, "exploration_decay_delay": self.exploration_decay_delay, - "random_state": self._serialize_random_state(), } def set_acquisition_params(self, params: dict) -> None: - """Set the acquisition function parameters. - - Parameters - ---------- - params : dict - Dictionary containing the acquisition function parameters. + """ + Sets the parameters for the Upper Confidence Bound acquisition function. + + Args: + params: Dictionary with keys "kappa", "exploration_decay", and "exploration_decay_delay". 
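With the constructor-level `random_state` deprecated, reproducibility is now requested per `suggest()` call. A hedged usage sketch of the new surface; the single registered point and the default-kernel GP are illustrative only:

from sklearn.gaussian_process import GaussianProcessRegressor

from bayes_opt import acquisition
from bayes_opt.target_space import TargetSpace

space = TargetSpace(target_func=None, pbounds={"x": (1, 4), "y": (0, 3)})
space.register(params={"x": 2.5, "y": 0.5}, target=3.0)
gp = GaussianProcessRegressor()

acq = acquisition.UpperConfidenceBound(kappa=2.576)              # no random_state here anymore
x_next = acq.suggest(gp=gp, target_space=space, random_state=7)  # seed passed per call

# (De)serialization now round-trips only hyperparameters, not RNG state:
params = acq.get_acquisition_params()
acq.set_acquisition_params(params)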
""" self.kappa = params["kappa"] self.exploration_decay = params["exploration_decay"] self.exploration_decay_delay = params["exploration_decay_delay"] - self._deserialize_random_state(params["random_state"]) class ProbabilityOfImprovement(AcquisitionFunction): @@ -636,33 +593,18 @@ def suggest( gp: GaussianProcessRegressor, target_space: TargetSpace, n_random: int = 10_000, - n_l_bfgs_b: int = 10, + n_smart: int = 10, fit_gp: bool = True, + random_state: int | RandomState | None = None, ) -> NDArray[Float]: - """Suggest a promising point to probe next. - - Parameters - ---------- - gp : GaussianProcessRegressor - A fitted Gaussian Process. - - target_space : TargetSpace - The target space to probe. - - n_random : int, default 10_000 - Number of random samples to use. - - n_l_bfgs_b : int, default 10 - Number of starting points for the L-BFGS-B optimizer. - - fit_gp : bool, default True - Whether to fit the Gaussian Process to the target space. - Set to False if the GP is already fitted. - - Returns - ------- - np.ndarray - Suggested point to probe next. + """ + Suggests the next point to evaluate using the Expected Improvement acquisition function. + + Raises: + NoValidPointRegisteredError: If no valid points satisfying constraints are available in the target space. + + Returns: + The suggested point as a NumPy array. """ y_max = target_space._target_max() if y_max is None and not target_space.empty: @@ -675,7 +617,12 @@ def suggest( raise NoValidPointRegisteredError(msg) self.y_max = y_max x_max = super().suggest( - gp=gp, target_space=target_space, n_random=n_random, n_l_bfgs_b=n_l_bfgs_b, fit_gp=fit_gp + gp=gp, + target_space=target_space, + n_random=n_random, + n_smart=n_smart, + fit_gp=fit_gp, + random_state=random_state, ) self.decay_exploration() return x_max @@ -696,32 +643,27 @@ def decay_exploration(self) -> None: self.xi = self.xi * self.exploration_decay def get_acquisition_params(self) -> dict: - """Get the current acquisition function parameters. - - Returns - ------- - dict - Dictionary containing the current acquisition function parameters. + """ + Returns the current parameters of the acquisition function as a dictionary. + + The dictionary includes the exploration parameter `xi`, the decay rate, and the decay delay. """ return { "xi": self.xi, "exploration_decay": self.exploration_decay, "exploration_decay_delay": self.exploration_decay_delay, - "random_state": self._serialize_random_state(), } def set_acquisition_params(self, params: dict) -> None: - """Set the acquisition function parameters. - - Parameters - ---------- - params : dict - Dictionary containing the acquisition function parameters. + """ + Sets the acquisition function parameters from a dictionary. + + Args: + params: Dictionary with keys "xi", "exploration_decay", and "exploration_decay_delay" specifying the acquisition function's configuration. """ self.xi = params["xi"] self.exploration_decay = params["exploration_decay"] self.exploration_decay_delay = params["exploration_decay_delay"] - self._deserialize_random_state(params["random_state"]) class ExpectedImprovement(AcquisitionFunction): @@ -804,33 +746,18 @@ def suggest( gp: GaussianProcessRegressor, target_space: TargetSpace, n_random: int = 10_000, - n_l_bfgs_b: int = 10, + n_smart: int = 10, fit_gp: bool = True, + random_state: int | RandomState | None = None, ) -> NDArray[Float]: - """Suggest a promising point to probe next. - - Parameters - ---------- - gp : GaussianProcessRegressor - A fitted Gaussian Process. 
- - target_space : TargetSpace - The target space to probe. - - n_random : int, default 10_000 - Number of random samples to use. - - n_l_bfgs_b : int, default 10 - Number of starting points for the L-BFGS-B optimizer. - - fit_gp : bool, default True - Whether to fit the Gaussian Process to the target space. - Set to False if the GP is already fitted. - - Returns - ------- - np.ndarray - Suggested point to probe next. + """ + Suggests the next point to evaluate using the Expected Improvement acquisition function. + + Raises: + NoValidPointRegisteredError: If no valid points satisfying constraints are available in the target space. + + Returns: + The next suggested point as a NumPy array. """ y_max = target_space._target_max() if y_max is None and not target_space.empty: @@ -844,7 +771,12 @@ def suggest( self.y_max = y_max x_max = super().suggest( - gp=gp, target_space=target_space, n_random=n_random, n_l_bfgs_b=n_l_bfgs_b, fit_gp=fit_gp + gp=gp, + target_space=target_space, + n_random=n_random, + n_smart=n_smart, + fit_gp=fit_gp, + random_state=random_state, ) self.decay_exploration() return x_max @@ -865,32 +797,27 @@ def decay_exploration(self) -> None: self.xi = self.xi * self.exploration_decay def get_acquisition_params(self) -> dict: - """Get the current acquisition function parameters. - - Returns - ------- - dict - Dictionary containing the current acquisition function parameters. + """ + Returns the current parameters of the acquisition function as a dictionary. + + The dictionary includes the exploration parameter `xi`, the decay rate, and the decay delay. """ return { "xi": self.xi, "exploration_decay": self.exploration_decay, "exploration_decay_delay": self.exploration_decay_delay, - "random_state": self._serialize_random_state(), } def set_acquisition_params(self, params: dict) -> None: - """Set the acquisition function parameters. - - Parameters - ---------- - params : dict - Dictionary containing the acquisition function parameters. + """ + Sets the acquisition function parameters from a dictionary. + + Args: + params: Dictionary with keys "xi", "exploration_decay", and "exploration_decay_delay" specifying the acquisition function's configuration. """ self.xi = params["xi"] self.exploration_decay = params["exploration_decay"] self.exploration_decay_delay = params["exploration_decay_delay"] - self._deserialize_random_state(params["random_state"]) class ConstantLiar(AcquisitionFunction): @@ -955,26 +882,23 @@ def base_acq(self, *args: Any, **kwargs: Any) -> NDArray[Float]: return self.base_acquisition.base_acq(*args, **kwargs) def _copy_target_space(self, target_space: TargetSpace) -> TargetSpace: - """Create a copy of the target space. - - Parameters - ---------- - target_space : TargetSpace - The target space to copy. - - Returns - ------- - TargetSpace - A copy of the target space. + """ + Creates a deep copy of the given target space, including its bounds, parameters, targets, and constraints. + + Args: + target_space: The target space to be copied. + + Returns: + A new TargetSpace instance with duplicated data and constraints. 
""" keys = target_space.keys pbounds = {key: bound for key, bound in zip(keys, target_space.bounds)} target_space_copy = TargetSpace( - None, - pbounds=pbounds, - constraint=target_space.constraint, - allow_duplicate_points=target_space._allow_duplicate_points, + None, pbounds=pbounds, allow_duplicate_points=target_space._allow_duplicate_points ) + if target_space._constraint is not None: + target_space_copy.set_constraint(deepcopy(target_space.constraint)) + target_space_copy._params = deepcopy(target_space._params) target_space_copy._target = deepcopy(target_space._target) @@ -1004,33 +928,22 @@ def suggest( gp: GaussianProcessRegressor, target_space: TargetSpace, n_random: int = 10_000, - n_l_bfgs_b: int = 10, + n_smart: int = 10, fit_gp: bool = True, + random_state: int | RandomState | None = None, ) -> NDArray[Float]: - """Suggest a promising point to probe next. - - Parameters - ---------- - gp : GaussianProcessRegressor - A fitted Gaussian Process. - - target_space : TargetSpace - The target space to probe. - - n_random : int, default 10_000 - Number of random samples to use. - - n_l_bfgs_b : int, default 10 - Number of starting points for the L-BFGS-B optimizer. - - fit_gp : bool, default True - Whether to fit the Gaussian Process to the target space. - Set to False if the GP is already fitted. - - Returns - ------- - np.ndarray - Suggested point to probe next. + """ + Suggests the next point to evaluate using the Constant Liar meta acquisition strategy. + + Creates a copy of the target space augmented with "dummy" points representing pending evaluations, assigns them a target value based on the specified strategy, and fits the Gaussian Process to this augmented space. Returns a new candidate point by delegating to the wrapped base acquisition function. Raises an error if no previous samples exist or if constraints are present. + + Args: + n_random: Number of random samples for initial exploration. + n_smart: Number of starting points for local optimization. + random_state: Random seed or generator for reproducibility. + + Returns: + The suggested point as a NumPy array. """ if len(target_space) == 0: msg = ( @@ -1074,7 +987,12 @@ def suggest( # Fit the GP to the dummy target space and suggest a point self._fit_gp(gp=gp, target_space=dummy_target_space) x_max = self.base_acquisition.suggest( - gp, dummy_target_space, n_random=n_random, n_l_bfgs_b=n_l_bfgs_b, fit_gp=False + gp, + dummy_target_space, + n_random=n_random, + n_smart=n_smart, + fit_gp=False, + random_state=random_state, ) # Register the suggested point as a dummy @@ -1083,12 +1001,10 @@ def suggest( return x_max def get_acquisition_params(self) -> dict: - """Get the current acquisition function parameters. - - Returns - ------- - dict - Dictionary containing the current acquisition function parameters. + """ + Returns a dictionary of the current ConstantLiar acquisition function parameters. + + The dictionary includes the list of dummy points, the parameters of the wrapped base acquisition function, the dummy value strategy, and tolerance settings. """ return { "dummies": [dummy.tolist() for dummy in self.dummies], @@ -1096,23 +1012,19 @@ def get_acquisition_params(self) -> dict: "strategy": self.strategy, "atol": self.atol, "rtol": self.rtol, - "random_state": self._serialize_random_state(), } def set_acquisition_params(self, params: dict) -> None: - """Set the acquisition function parameters. - - Parameters - ---------- - params : dict - Dictionary containing the acquisition function parameters. 
+ """ + Sets the acquisition function parameters from a dictionary. + + Updates the dummy points, base acquisition parameters, strategy, and tolerances based on the provided dictionary. """ self.dummies = [np.array(dummy) for dummy in params["dummies"]] self.base_acquisition.set_acquisition_params(params["base_acquisition_params"]) self.strategy = params["strategy"] self.atol = params["atol"] self.rtol = params["rtol"] - self._deserialize_random_state(params["random_state"]) class GPHedge(AcquisitionFunction): @@ -1147,7 +1059,11 @@ def __init__( self.previous_candidates = None def base_acq(self, *args: Any, **kwargs: Any) -> NoReturn: - """Raise an error, since the base acquisition function is ambiguous.""" + """ + Raises an error because the base acquisition function is not defined for GPHedge. + + Calling this method is invalid, as GPHedge combines multiple acquisition functions and does not have a single base acquisition function. Use the base acquisition function of a specific component instead. + """ msg = ( "GPHedge base acquisition function is ambiguous." " You may use self.base_acquisitions[i].base_acq(mean, std)" @@ -1155,14 +1071,27 @@ def base_acq(self, *args: Any, **kwargs: Any) -> NoReturn: ) raise TypeError(msg) - def _sample_idx_from_softmax_gains(self) -> int: - """Sample an index weighted by the softmax of the gains.""" + def _sample_idx_from_softmax_gains(self, random_state: RandomState) -> int: + """ + Samples an index according to the softmax-weighted probabilities of the current gains. + + Args: + random_state: Random state used for reproducible sampling. + + Returns: + The index of the selected base acquisition function, sampled proportionally to its softmax gain. + """ cumsum_softmax_g = np.cumsum(softmax(self.gains)) - r = self.random_state.rand() + r = random_state.rand() return np.argmax(r <= cumsum_softmax_g) # Returns the first True value def _update_gains(self, gp: GaussianProcessRegressor) -> None: - """Update the gains of the base acquisition functions.""" + """ + Updates the cumulative gains for each base acquisition function using the predicted rewards of previous candidates. + + Args: + gp: A fitted GaussianProcessRegressor used to predict rewards for previous candidates. + """ with warnings.catch_warnings(): warnings.simplefilter("ignore") rewards = gp.predict(self.previous_candidates) @@ -1174,33 +1103,26 @@ def suggest( gp: GaussianProcessRegressor, target_space: TargetSpace, n_random: int = 10_000, - n_l_bfgs_b: int = 10, + n_smart: int = 10, fit_gp: bool = True, + random_state: int | RandomState | None = None, ) -> NDArray[Float]: - """Suggest a promising point to probe next. - - Parameters - ---------- - gp : GaussianProcessRegressor - A fitted Gaussian Process. - - target_space : TargetSpace - The target space to probe. - - n_random : int, default 10_000 - Number of random samples to use. - - n_l_bfgs_b : int, default 10 - Number of starting points for the L-BFGS-B optimizer. - - fit_gp : bool, default True - Whether to fit the Gaussian Process to the target space. - Set to False if the GP is already fitted. - - Returns - ------- - np.ndarray - Suggested point to probe next. + """ + Suggests the next point to evaluate by combining multiple acquisition functions using the GP-Hedge strategy. + + Selects a candidate from several base acquisition functions, weighted by their historical performance, to balance exploration and exploitation in Bayesian Optimization. 
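`_sample_idx_from_softmax_gains` above draws an index with probability proportional to `softmax(gains)` by inverse-CDF sampling on the cumulative sum. A self-contained sketch of the mechanism, with made-up gains:

import numpy as np
from scipy.special import softmax

rng = np.random.RandomState(0)
gains = np.array([0.0, 2.0, 4.0])  # cumulative rewards of three base acquisitions
cum = np.cumsum(softmax(gains))    # monotone CDF, ends at 1.0
r = rng.rand()                     # uniform draw in [0, 1)
idx = int(np.argmax(r <= cum))     # first bucket whose CDF value covers r
# Across many draws, idx == 2 dominates, since softmax(gains)[2] is roughly 0.87.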
+ + Args: + n_random: Number of random samples for candidate generation per acquisition function. + n_smart: Number of local optimizer starts per acquisition function. + fit_gp: If True, fits the Gaussian Process to the target space before suggesting. + random_state: Seed or random state for reproducibility. + + Returns: + The selected point to probe next as a NumPy array. + + Raises: + TargetSpaceEmptyError: If no previous samples exist in the target space. """ if len(target_space) == 0: msg = ( @@ -1210,6 +1132,7 @@ def suggest( ) raise TargetSpaceEmptyError(msg) self.i += 1 + random_state = ensure_rng(random_state) if fit_gp: self._fit_gp(gp=gp, target_space=target_space) @@ -1223,22 +1146,24 @@ def suggest( gp=gp, target_space=target_space, n_random=n_random // self.n_acq, - n_l_bfgs_b=n_l_bfgs_b // self.n_acq, + n_smart=n_smart // self.n_acq, fit_gp=False, + random_state=random_state, ) for base_acq in self.base_acquisitions ] self.previous_candidates = np.array(x_max) - idx = self._sample_idx_from_softmax_gains() + idx = self._sample_idx_from_softmax_gains(random_state=random_state) return x_max[idx] def get_acquisition_params(self) -> dict: - """Get the current acquisition function parameters. - + """ + Returns a dictionary of the current parameters, gains, and previous candidates for the GPHedge acquisition function. + Returns ------- dict - Dictionary containing the current acquisition function parameters. + A dictionary with keys 'base_acquisitions_params', 'gains', and 'previous_candidates' representing the state of the GPHedge acquisition function. """ return { "base_acquisitions_params": [acq.get_acquisition_params() for acq in self.base_acquisitions], @@ -1246,16 +1171,14 @@ def get_acquisition_params(self) -> dict: "previous_candidates": self.previous_candidates.tolist() if self.previous_candidates is not None else None, - "gphedge_random_state": self._serialize_random_state(), } def set_acquisition_params(self, params: dict) -> None: - """Set the acquisition function parameters. - - Parameters - ---------- - params : dict - Dictionary containing the acquisition function parameters. + """ + Sets the parameters for the GPHedge acquisition function and its base acquisition functions. + + Args: + params: Dictionary containing serialized parameters for each base acquisition, cumulative gains, and previous candidates. """ for acq, acq_params in zip(self.base_acquisitions, params["base_acquisitions_params"]): acq.set_acquisition_params(acq_params) @@ -1264,5 +1187,3 @@ def set_acquisition_params(self, params: dict) -> None: self.previous_candidates = ( np.array(params["previous_candidates"]) if params["previous_candidates"] is not None else None ) - - self._deserialize_random_state(params["gphedge_random_state"]) diff --git a/bayes_opt/bayesian_optimization.py b/bayes_opt/bayesian_optimization.py index 2cad5b8f..c84948d0 100644 --- a/bayes_opt/bayesian_optimization.py +++ b/bayes_opt/bayesian_optimization.py @@ -89,41 +89,40 @@ def __init__( bounds_transformer: DomainTransformer | None = None, allow_duplicate_points: bool = False, ): + """ + Initializes a BayesianOptimization instance for maximizing a target function over a parameter space. + + Configures the optimizer with the target function, parameter bounds, optional acquisition function, optional nonlinear constraint, random state, verbosity, optional domain bounds transformer, and duplicate point handling. Sets up the internal Gaussian Process regressor, parameter space management, constraint modeling, and logging. 
If no acquisition function is provided, selects an appropriate default based on the presence of constraints. + """ self._random_state = ensure_rng(random_state) self._allow_duplicate_points = allow_duplicate_points self._queue: deque[ParamsType] = deque() if acquisition_function is None: if constraint is None: - self._acquisition_function = acquisition.UpperConfidenceBound( - kappa=2.576, random_state=self._random_state - ) + self._acquisition_function = acquisition.UpperConfidenceBound(kappa=2.576) else: - self._acquisition_function = acquisition.ExpectedImprovement( - xi=0.01, random_state=self._random_state - ) + self._acquisition_function = acquisition.ExpectedImprovement(xi=0.01) else: self._acquisition_function = acquisition_function + # Data structure containing the function to be optimized, the + # bounds of its domain, and a record of the evaluations we have + # done so far + self._space = TargetSpace( + f, pbounds, random_state=random_state, allow_duplicate_points=self._allow_duplicate_points + ) if constraint is None: - # Data structure containing the function to be optimized, the - # bounds of its domain, and a record of the evaluations we have - # done so far - self._space = TargetSpace( - f, pbounds, random_state=random_state, allow_duplicate_points=self._allow_duplicate_points - ) self.is_constrained = False else: constraint_ = ConstraintModel( - constraint.fun, constraint.lb, constraint.ub, random_state=random_state - ) - self._space = TargetSpace( - f, - pbounds, - constraint=constraint_, + constraint.fun, + constraint.lb, + constraint.ub, + transform=self._space.kernel_transform, random_state=random_state, - allow_duplicate_points=self._allow_duplicate_points, ) + self._space.set_constraint(constraint_) self.is_constrained = True # Internal GP regressor @@ -248,12 +247,22 @@ def probe(self, params: ParamsType, lazy: bool = True) -> None: ) def suggest(self) -> dict[str, float | NDArray[Float]]: - """Suggest a promising point to probe next.""" + """ + Suggests the next set of parameters to evaluate by maximizing the acquisition function. + + If no points have been evaluated yet, returns a random sample from the parameter space. + Otherwise, uses the acquisition function to propose the next promising point based on the current Gaussian Process model. + + Returns: + A dictionary mapping parameter names to their suggested values. + """ if len(self._space) == 0: return self._space.array_to_params(self._space.random_sample(random_state=self._random_state)) # Finding argmax of the acquisition function. - suggestion = self._acquisition_function.suggest(gp=self._gp, target_space=self._space, fit_gp=True) + suggestion = self._acquisition_function.suggest( + gp=self._gp, target_space=self._space, fit_gp=True, random_state=self._random_state + ) return self._space.array_to_params(suggestion) diff --git a/bayes_opt/target_space.py b/bayes_opt/target_space.py index ba41bbfe..42a735ab 100644 --- a/bayes_opt/target_space.py +++ b/bayes_opt/target_space.py @@ -71,10 +71,20 @@ def __init__( self, target_func: Callable[..., float] | None, pbounds: BoundsMapping, - constraint: ConstraintModel | None = None, random_state: int | RandomState | None = None, allow_duplicate_points: bool | None = False, ) -> None: + """ + Initializes a TargetSpace for managing parameter points and target values. + + Configures the optimization domain with parameter bounds, types, and random state. Prepares internal storage for registered parameter points, target values, and uniqueness checks. 
Constraints are not set at initialization and must be configured separately if needed. + + Args: + target_func: The function to be optimized, or None if not provided. + pbounds: Mapping of parameter names to their bounds and types. + random_state: Seed or random state for reproducible sampling (optional). + allow_duplicate_points: If True, allows duplicate parameter points to be registered. + """ self.random_state = ensure_rng(random_state) self._allow_duplicate_points = allow_duplicate_points or False self.n_duplicate_points = 0 @@ -98,24 +108,32 @@ def __init__( # keep track of unique points we have seen so far self._cache: dict[tuple[float, ...], float | tuple[float, float | NDArray[Float]]] = {} - self._constraint: ConstraintModel | None = constraint + self._constraint: ConstraintModel | None = None - if constraint is not None: - # preallocated memory for constraint fulfillment - self._constraint_values: NDArray[Float] - if constraint.lb.size == 1: - self._constraint_values = np.empty(shape=(0), dtype=float) - else: - self._constraint_values = np.empty(shape=(0, self._constraint.lb.size), dtype=float) + def set_constraint(self, constraint: ConstraintModel) -> None: + """ + Sets the constraint model for the parameter space. + + Initializes internal storage for constraint values based on the dimensionality of the constraint model. + """ + self._constraint = constraint + + # preallocated memory for constraint fulfillment + self._constraint_values: NDArray[Float] + if constraint.lb.size == 1: + self._constraint_values = np.empty(shape=(0), dtype=float) else: - self._constraint = None + self._constraint_values = np.empty(shape=(0, self._constraint.lb.size), dtype=float) def __contains__(self, x: NDArray[Float]) -> bool: - """Check if this parameter has already been registered. - - Returns - ------- - bool + """ + Checks whether a parameter point has already been registered. + + Args: + x: A flattened numpy array representing a parameter point. + + Returns: + True if the parameter point is present in the internal cache, False otherwise. """ return _hashable(x) in self._cache diff --git a/tests/test_acquisition.py b/tests/test_acquisition.py index 5ecd04fb..1aba6daa 100644 --- a/tests/test_acquisition.py +++ b/tests/test_acquisition.py @@ -43,7 +43,13 @@ def target_func(): @pytest.fixture def random_state(): - return np.random.RandomState() + """ + Creates a NumPy RandomState instance with a fixed seed for reproducible results. + + Returns: + numpy.random.RandomState: Random number generator seeded with 0. + """ + return np.random.RandomState(0) @pytest.fixture @@ -63,17 +69,31 @@ def constraint_func(): @pytest.fixture def constrained_target_space(target_func): + """ + Creates a TargetSpace with parameter bounds and a constraint on the sum of 'x' and 'y'. + + The returned TargetSpace enforces that x + y is between 0.0 and 1.0, inclusive. + """ constraint_model = ConstraintModel(fun=lambda params: params["x"] + params["y"], lb=0.0, ub=1.0) - return TargetSpace( - target_func=target_func, pbounds={"x": (1, 4), "y": (0, 3)}, constraint=constraint_model - ) + space = TargetSpace(target_func=target_func, pbounds={"x": (1, 4), "y": (0, 3)}) + space.set_constraint(constraint_model) + return space class MockAcquisition(acquisition.AcquisitionFunction): - def __init__(self, random_state=None): - super().__init__(random_state=random_state) + def __init__(self): + """ + Initializes the MockAcquisition instance for testing purposes. 
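Since `TargetSpace` no longer accepts a constraint in its constructor, constrained spaces are built in two steps via `set_constraint` (defined above). A short sketch of the new pattern; the bounds and constraint function are borrowed from the test fixture below, and the import paths are assumed from the package layout:

from bayes_opt.constraint import ConstraintModel
from bayes_opt.target_space import TargetSpace

space = TargetSpace(target_func=None, pbounds={"x": (1, 4), "y": (0, 3)})
model = ConstraintModel(fun=lambda params: params["x"] + params["y"], lb=0.0, ub=1.0)
space.set_constraint(model)  # also allocates storage for constraint values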
+ """ + super().__init__() def _get_acq(self, gp, constraint=None): + """ + Returns a mock acquisition function for testing that computes a fixed quadratic form. + + The returned function ignores the Gaussian process and constraint inputs, and evaluates + the acquisition value as (3 - x[0])^2 + (1 - x[1])^2 for each input point x. + """ def mock_acq(x: np.ndarray): return (3 - x[..., 0]) ** 2 + (1 - x[..., 1]) ** 2 @@ -86,45 +106,67 @@ def get_acquisition_params(self) -> dict: return {} def set_acquisition_params(self, params: dict) -> None: + """ + Sets the parameters for the acquisition function. + + This method should be implemented by subclasses to update internal parameters + based on the provided dictionary. + """ pass -def test_base_acquisition(): - acq = acquisition.UpperConfidenceBound() - assert isinstance(acq.random_state, np.random.RandomState) - acq = acquisition.UpperConfidenceBound(random_state=42) - assert isinstance(acq.random_state, np.random.RandomState) - - def test_acquisition_optimization(gp, target_space): - acq = MockAcquisition(random_state=42) + """ + Tests that MockAcquisition suggests the correct next point and raises an error when no optimization strategies are enabled. + """ + acq = MockAcquisition() target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) res = acq.suggest(gp=gp, target_space=target_space) assert np.array([3.0, 1.0]) == pytest.approx(res) with pytest.raises(ValueError): - acq.suggest(gp=gp, target_space=target_space, n_random=0, n_l_bfgs_b=0) + acq.suggest(gp=gp, target_space=target_space, n_random=0, n_smart=0) -def test_acquisition_optimization_only_random(gp, target_space): - acq = MockAcquisition(random_state=42) +def test_acquisition_optimization_only_random(gp, target_space, random_state): + """ + Tests that acquisition optimization using only random sampling suggests the correct point. + + Verifies that the suggestion from the acquisition function matches the expected value when only random sampling is used, and ensures that the best random sample is included among the seed points. + """ + acq = MockAcquisition() target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) - res = acq.suggest(gp=gp, target_space=target_space, n_l_bfgs_b=0, n_random=10_000) + res = acq.suggest(gp=gp, target_space=target_space, n_smart=0, n_random=10_000, random_state=random_state) # very lenient comparison as we're just considering random samples assert np.array([3.0, 1.0]) == pytest.approx(res, abs=1e-1, rel=1e-1) + # make sure that the best random sample is in the seeds + acq_f = acq._get_acq(gp=gp, constraint=target_space.constraint) + x_min, _, x_seeds = acq._random_sample_minimize( + acq_f, target_space, random_state=random_state, n_random=10_000, n_x_seeds=3 + ) + assert x_min in x_seeds + def test_acquisition_optimization_only_l_bfgs_b(gp, target_space): - acq = MockAcquisition(random_state=42) + """ + Tests that the acquisition function suggests the correct point using only L-BFGS-B optimization. + + Verifies that when random sampling is disabled and only L-BFGS-B optimization is used, the suggested point matches the expected optimum. 
+ """ + acq = MockAcquisition() target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) - res = acq.suggest(gp=gp, target_space=target_space, n_l_bfgs_b=10, n_random=0) + res = acq.suggest(gp=gp, target_space=target_space, n_smart=10, n_random=0) assert np.array([3.0, 1.0]) == pytest.approx(res) def test_upper_confidence_bound(gp, target_space, random_state): - acq = acquisition.UpperConfidenceBound( - exploration_decay=0.5, exploration_decay_delay=2, kappa=1.0, random_state=random_state - ) + """ + Tests the behavior of the UpperConfidenceBound acquisition function. + + Verifies parameter initialization, error handling when the Gaussian process is unfitted, correct suggestion of new points, and proper decay of the exploration parameter `kappa` after multiple suggestions. + """ + acq = acquisition.UpperConfidenceBound(exploration_decay=0.5, exploration_decay_delay=2, kappa=1.0) assert acq.kappa == 1.0 # Test that the suggest method raises an error if the GP is unfitted @@ -134,27 +176,44 @@ def test_upper_confidence_bound(gp, target_space, random_state): acq.suggest(gp=gp, target_space=target_space) target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.kappa == 1.0 - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.kappa == 0.5 def test_smart_minimize_fails(target_space, random_state): - acq = acquisition.UpperConfidenceBound(random_state=random_state) + """ + Tests that `_smart_minimize` returns infinity when the objective function produces NaN values. + """ + acq = acquisition.UpperConfidenceBound() def fun(x): + """ + Returns an array of NaNs matching the shape of the first column of `x`, or a single NaN if `x` is not two-dimensional. + + Args: + x: Input array. + + Returns: + An array of NaNs with the same shape as `x[:, 0]`, or a scalar NaN if indexing fails. + """ try: return np.nan * np.zeros_like(x[:, 0]) except IndexError: return np.nan - _, min_acq_l = acq._smart_minimize(fun, space=target_space, x_seeds=np.array([[2.5, 0.5]])) + _, min_acq_l = acq._smart_minimize( + fun, space=target_space, x_seeds=np.array([[2.5, 0.5]]), random_state=random_state + ) assert min_acq_l == np.inf -def test_upper_confidence_bound_with_constraints(gp, constrained_target_space, random_state): - acq = acquisition.UpperConfidenceBound(random_state=random_state) +def test_upper_confidence_bound_with_constraints(gp, constrained_target_space): + """ + Tests that UpperConfidenceBound acquisition raises ConstraintNotSupportedError when used with a constrained target space. + """ + acq = acquisition.UpperConfidenceBound() constrained_target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0, constraint_value=0.5) with pytest.raises(exception.ConstraintNotSupportedError): @@ -162,93 +221,110 @@ def test_upper_confidence_bound_with_constraints(gp, constrained_target_space, r def test_probability_of_improvement(gp, target_space, random_state): - acq = acquisition.ProbabilityOfImprovement( - exploration_decay=0.5, exploration_decay_delay=2, xi=0.01, random_state=random_state - ) + """ + Tests the ProbabilityOfImprovement acquisition function's parameter handling, error conditions, and exploration decay. 
+ + Verifies correct initialization of `xi`, raises an error if `y_max` is unset, checks that suggestions update `xi` according to decay settings, and confirms that no decay leaves `xi` unchanged. + """ + acq = acquisition.ProbabilityOfImprovement(exploration_decay=0.5, exploration_decay_delay=2, xi=0.01) assert acq.xi == 0.01 with pytest.raises(ValueError, match="y_max is not set"): acq.base_acq(0.0, 0.0) target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.01 - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.005 # no decay - acq = acquisition.ProbabilityOfImprovement(exploration_decay=None, xi=0.01, random_state=random_state) + acq = acquisition.ProbabilityOfImprovement(exploration_decay=None, xi=0.01) assert acq.xi == 0.01 - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.01 - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.01 def test_probability_of_improvement_with_constraints(gp, constrained_target_space, random_state): - acq = acquisition.ProbabilityOfImprovement( - exploration_decay=0.5, exploration_decay_delay=2, xi=0.01, random_state=random_state - ) + """ + Tests the ProbabilityOfImprovement acquisition function with constrained target spaces. + + Verifies correct error handling when no valid points are registered, and ensures that suggestions are made only when valid points exist under constraints. + """ + acq = acquisition.ProbabilityOfImprovement(exploration_decay=0.5, exploration_decay_delay=2, xi=0.01) assert acq.xi == 0.01 with pytest.raises(ValueError, match="y_max is not set"): acq.base_acq(0.0, 0.0) with pytest.raises(exception.TargetSpaceEmptyError): - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) constrained_target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0, constraint_value=3.0) with pytest.raises(exception.NoValidPointRegisteredError): - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) constrained_target_space.register(params={"x": 1.0, "y": 0.0}, target=1.0, constraint_value=1.0) - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) def test_expected_improvement(gp, target_space, random_state): - acq = acquisition.ExpectedImprovement( - exploration_decay=0.5, exploration_decay_delay=2, xi=0.01, random_state=random_state - ) + """ + Tests the behavior of the ExpectedImprovement acquisition function. + + Verifies correct initialization of parameters, error handling when `y_max` is unset, suggestion logic, and the effect of exploration decay on the `xi` parameter. 
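The decay assertions in these tests follow one schedule: once `exploration_decay_delay` suggestions have been made, the parameter is multiplied by `exploration_decay` on every further call. A plain-Python restatement of that schedule (an illustrative model of the observed behavior, not the library code):

def decayed(value, decay, delay, n_suggests):
    # Value after n_suggests calls; decay applies once the delay has passed.
    for step in range(1, n_suggests + 1):
        if decay is not None and (delay is None or step >= delay):
            value *= decay
    return value

assert decayed(1.0, 0.5, 2, 2) == 0.5        # kappa in the UCB test above
assert decayed(0.01, 0.5, 2, 2) == 0.005     # xi in the PoI/EI tests
assert decayed(0.01, None, None, 5) == 0.01  # no decay configured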
+ """ + acq = acquisition.ExpectedImprovement(exploration_decay=0.5, exploration_decay_delay=2, xi=0.01) assert acq.xi == 0.01 with pytest.raises(ValueError, match="y_max is not set"): acq.base_acq(0.0, 0.0) target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.01 - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.005 - acq = acquisition.ExpectedImprovement(exploration_decay=None, xi=0.01, random_state=random_state) + acq = acquisition.ExpectedImprovement(exploration_decay=None, xi=0.01) assert acq.xi == 0.01 - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.01 - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert acq.xi == 0.01 def test_expected_improvement_with_constraints(gp, constrained_target_space, random_state): - acq = acquisition.ExpectedImprovement( - exploration_decay=0.5, exploration_decay_delay=2, xi=0.01, random_state=random_state - ) + """ + Tests the ExpectedImprovement acquisition function with constrained target spaces. + + Verifies correct error handling when no valid points are registered, checks parameter initialization, and ensures that suggestions are made only when valid points exist under constraints. + """ + acq = acquisition.ExpectedImprovement(exploration_decay=0.5, exploration_decay_delay=2, xi=0.01) assert acq.xi == 0.01 with pytest.raises(ValueError, match="y_max is not set"): acq.base_acq(0.0, 0.0) with pytest.raises(exception.TargetSpaceEmptyError): - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) constrained_target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0, constraint_value=3.0) with pytest.raises(exception.NoValidPointRegisteredError): - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) constrained_target_space.register(params={"x": 1.0, "y": 0.0}, target=1.0, constraint_value=1.0) - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) @pytest.mark.parametrize("strategy", [0.0, "mean", "min", "max"]) def test_constant_liar(gp, target_space, target_func, random_state, strategy): - base_acq = acquisition.UpperConfidenceBound(random_state=random_state) - acq = acquisition.ConstantLiar(base_acquisition=base_acq, strategy=strategy, random_state=random_state) + """ + Tests the behavior of the Constant Liar acquisition strategy in Bayesian optimization. + + Verifies that dummy points are managed correctly, that suggested samples are more diverse than those from the base acquisition, and that new points are registered properly as the optimization progresses. 
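The Constant Liar test here exercises batched suggesting: every pending suggestion is registered with a fabricated target so that subsequent calls steer away from it. A hedged, self-contained sketch of that pattern (objective and bounds are illustrative):

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor

from bayes_opt import acquisition
from bayes_opt.target_space import TargetSpace

def f(x):
    return -(x[0] - 2.0) ** 2 - (x[1] - 1.0) ** 2

space = TargetSpace(target_func=None, pbounds={"x": (1, 4), "y": (0, 3)})
space.register(params={"x": 2.5, "y": 0.5}, target=f(np.array([2.5, 0.5])))
gp = GaussianProcessRegressor()

acq = acquisition.ConstantLiar(base_acquisition=acquisition.UpperConfidenceBound())
batch = [acq.suggest(gp=gp, target_space=space, random_state=i) for i in range(3)]
assert len(acq.dummies) == 3   # every pending point was registered as a dummy

for x in batch:                # once real evaluations arrive...
    space.register(x, f(x))    # ...register them; matching dummies are dropped
                               # on the next suggest() call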
+ """ + base_acq = acquisition.UpperConfidenceBound() + acq = acquisition.ConstantLiar(base_acquisition=base_acq, strategy=strategy) target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) target_space.register(params={"x": 1.0, "y": 1.5}, target=2.5) @@ -257,7 +333,7 @@ def test_constant_liar(gp, target_space, target_func, random_state, strategy): assert len(acq.dummies) == 0 for _ in range(10): - samples.append(acq.suggest(gp=gp, target_space=target_space)) + samples.append(acq.suggest(gp=gp, target_space=target_space, random_state=random_state)) assert len(acq.dummies) == len(samples) samples = np.array(samples) @@ -271,7 +347,7 @@ def test_constant_liar(gp, target_space, target_func, random_state, strategy): for i in range(10): target_space.register(params={"x": samples[i][0], "y": samples[i][1]}, target=target_func(samples[i])) - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) assert len(acq.dummies) == 1 @@ -282,15 +358,20 @@ def test_constant_liar_invalid_strategy(): def test_constant_liar_with_constraints(gp, constrained_target_space, random_state): - base_acq = acquisition.UpperConfidenceBound(random_state=random_state) - acq = acquisition.ConstantLiar(base_acquisition=base_acq, random_state=random_state) + """ + Tests ConstantLiar acquisition with constrained target spaces. + + Verifies that ConstantLiar raises TargetSpaceEmptyError when no valid points are registered and ConstraintNotSupportedError when constraints are present. Also checks that the base acquisition function produces consistent results. + """ + base_acq = acquisition.UpperConfidenceBound() + acq = acquisition.ConstantLiar(base_acquisition=base_acq) with pytest.raises(exception.TargetSpaceEmptyError): - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) constrained_target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0, constraint_value=0.5) with pytest.raises(exception.ConstraintNotSupportedError): - acq.suggest(gp=gp, target_space=constrained_target_space) + acq.suggest(gp=gp, target_space=constrained_target_space, random_state=random_state) mean = random_state.rand(10) std = random_state.rand(10) @@ -298,10 +379,12 @@ def test_constant_liar_with_constraints(gp, constrained_target_space, random_sta def test_gp_hedge(random_state): - acq = acquisition.GPHedge( - base_acquisitions=[acquisition.UpperConfidenceBound(random_state=random_state)], - random_state=random_state, - ) + """ + Tests the initialization and base acquisition behavior of the GPHedge acquisition function. + + Verifies that GPHedge raises a TypeError when base acquisitions are ambiguous, and checks that its base acquisition functions produce expected outputs when provided with mean and standard deviation arrays. + """ + acq = acquisition.GPHedge(base_acquisitions=[acquisition.UpperConfidenceBound()]) with pytest.raises(TypeError, match="GPHedge base acquisition function is ambiguous"): acq.base_acq(0.0, 0.0) @@ -319,14 +402,23 @@ def test_gp_hedge(random_state): def test_gphedge_update_gains(random_state): - base_acq1 = acquisition.UpperConfidenceBound(random_state=random_state) - base_acq2 = acquisition.ProbabilityOfImprovement(xi=0.01, random_state=random_state) + """ + Tests that GPHedge acquisition correctly updates its internal gains using predictions from a mock Gaussian process. 
+ """ + base_acq1 = acquisition.UpperConfidenceBound() + base_acq2 = acquisition.ProbabilityOfImprovement(xi=0.01) base_acquisitions = [base_acq1, base_acq2] - acq = acquisition.GPHedge(base_acquisitions=base_acquisitions, random_state=random_state) + acq = acquisition.GPHedge(base_acquisitions=base_acquisitions) class MockGP1: def __init__(self, n): + """ + Initializes the object with a gains array of zeros of length n. + + Args: + n: The number of elements in the gains array. + """ self.gains = np.zeros(n) def predict(self, x): @@ -343,14 +435,25 @@ def predict(self, x): def test_gphedge_softmax_sampling(random_state): - base_acq1 = acquisition.UpperConfidenceBound(random_state=random_state) - base_acq2 = acquisition.ProbabilityOfImprovement(xi=0.01, random_state=random_state) + """ + Tests that GPHedge acquisition correctly samples the index of the base acquisition + with the highest gain using softmax sampling, ensuring deterministic selection + when one gain is dominant. + """ + base_acq1 = acquisition.UpperConfidenceBound() + base_acq2 = acquisition.ProbabilityOfImprovement(xi=0.01) base_acquisitions = [base_acq1, base_acq2] - acq = acquisition.GPHedge(base_acquisitions=base_acquisitions, random_state=random_state) + acq = acquisition.GPHedge(base_acquisitions=base_acquisitions) class MockGP2: def __init__(self, good_index=0): + """ + Initializes the object with a specified index value. + + Args: + good_index: The index to be stored, defaulting to 0. + """ self.good_index = good_index def predict(self, x): @@ -363,22 +466,27 @@ def predict(self, x): acq = acquisition.GPHedge(base_acquisitions=base_acquisitions) acq.previous_candidates = np.zeros(len(base_acquisitions)) acq._update_gains(MockGP2(good_index=good_index)) - assert good_index == acq._sample_idx_from_softmax_gains() + assert good_index == acq._sample_idx_from_softmax_gains(random_state=random_state) def test_gphedge_integration(gp, target_space, random_state): - base_acq1 = acquisition.UpperConfidenceBound(random_state=random_state) - base_acq2 = acquisition.ProbabilityOfImprovement(xi=0.01, random_state=random_state) + """ + Tests integration of the GPHedge acquisition function with Bayesian optimization. + + Verifies that GPHedge correctly stores its base acquisitions, raises an error when the target space is empty, and can suggest new points after registration. Also checks that new points can be registered iteratively using the suggested parameters. 
+ """ + base_acq1 = acquisition.UpperConfidenceBound() + base_acq2 = acquisition.ProbabilityOfImprovement(xi=0.01) base_acquisitions = [base_acq1, base_acq2] - acq = acquisition.GPHedge(base_acquisitions=base_acquisitions, random_state=random_state) + acq = acquisition.GPHedge(base_acquisitions=base_acquisitions) assert acq.base_acquisitions == base_acquisitions with pytest.raises(exception.TargetSpaceEmptyError): - acq.suggest(gp=gp, target_space=target_space) + acq.suggest(gp=gp, target_space=target_space, random_state=random_state) target_space.register(params={"x": 2.5, "y": 0.5}, target=3.0) for _ in range(5): - p = acq.suggest(gp=gp, target_space=target_space) + p = acq.suggest(gp=gp, target_space=target_space, random_state=random_state) target_space.register(p, sum(p)) @@ -605,10 +713,23 @@ def test_custom_acquisition_without_get_params(): """Test that a custom acquisition function without get_acquisition_params raises NotImplementedError.""" class CustomAcqWithoutGetParams(acquisition.AcquisitionFunction): - def __init__(self, random_state=None): - super().__init__(random_state=random_state) + def __init__(self): + """ + Initializes the MockAcquisition instance for testing purposes. + """ + super().__init__() def base_acq(self, mean, std): + """ + Computes a simple acquisition value as the sum of mean and standard deviation. + + Args: + mean: The predicted mean value(s). + std: The predicted standard deviation(s). + + Returns: + The sum of mean and standard deviation, representing a basic acquisition score. + """ return mean + std def set_acquisition_params(self, params): @@ -623,13 +744,28 @@ def set_acquisition_params(self, params): def test_custom_acquisition_without_set_params(): - """Test that a custom acquisition function without set_acquisition_params raises NotImplementedError.""" + """ + Tests that a custom acquisition function lacking set_acquisition_params raises NotImplementedError. + """ class CustomAcqWithoutSetParams(acquisition.AcquisitionFunction): - def __init__(self, random_state=None): - super().__init__(random_state=random_state) + def __init__(self): + """ + Initializes the MockAcquisition instance for testing purposes. + """ + super().__init__() def base_acq(self, mean, std): + """ + Computes a simple acquisition value as the sum of mean and standard deviation. + + Args: + mean: The predicted mean value(s). + std: The predicted standard deviation(s). + + Returns: + The sum of mean and standard deviation, representing a basic acquisition score. + """ return mean + std def get_acquisition_params(self): diff --git a/tests/test_constraint.py b/tests/test_constraint.py index 495dc9d1..5f77f235 100644 --- a/tests/test_constraint.py +++ b/tests/test_constraint.py @@ -60,6 +60,11 @@ def test_single_constraint_lower(target_function, constraint_function): def test_single_constraint_lower_upper(target_function, constraint_function): + """ + Tests Bayesian optimization with a nonlinear constraint that has both lower and upper bounds. + + Verifies that the optimizer's constraint bounds match the original constraint, that the optimal solution satisfies the constraint limits, and that the constraint model's approximation closely matches the true constraint function values at sampled points within a specified tolerance. 
+ """ constraint_limit_lower = -0.5 constraint_limit_upper = 0.5 @@ -89,13 +94,15 @@ def test_single_constraint_lower_upper(target_function, constraint_function): y = res[:, 3] # Check accuracy of approximation for sampled points - assert constraint_function(x, y) == pytest.approx(optimizer.constraint.approx(xy), rel=1e-5, abs=1e-5) - assert constraint_function(x, y) == pytest.approx( - optimizer.space.constraint_values[:-1], rel=1e-5, abs=1e-5 - ) + assert constraint_function(x, y) == pytest.approx(optimizer.constraint.approx(xy), rel=1e-4, abs=1e-4) def test_multiple_constraints(target_function): + """ + Tests Bayesian optimization with a two-dimensional nonlinear constraint. + + Verifies that the optimizer respects multiple upper-bounded constraints, and that the constraint model's approximation matches the true constraint function values within a specified tolerance. + """ def constraint_function_2_dim(x, y): return np.array( [-np.cos(x) * np.cos(y) + np.sin(x) * np.sin(y), -np.cos(x) * np.cos(-y) + np.sin(x) * np.sin(-y)] diff --git a/tests/test_target_space.py b/tests/test_target_space.py index c269569d..9a557662 100644 --- a/tests/test_target_space.py +++ b/tests/test_target_space.py @@ -99,8 +99,13 @@ def test_register(): def test_register_with_constraint(): - constraint = ConstraintModel(lambda x: x, -2, 2, transform=lambda x: x) - space = TargetSpace(target_func, PBOUNDS, constraint=constraint) + """ + Tests that registering points with a constraint in TargetSpace requires a constraint value, + correctly stores constraint values, and raises a ValueError if the constraint value is missing. + """ + constraint = ConstraintModel(lambda x: x, -2, 2, transform=None) + space = TargetSpace(target_func, PBOUNDS) + space.set_constraint(constraint) assert len(space) == 0 # registering with dict @@ -193,9 +198,15 @@ def test_y_max(): def test_y_max_with_constraint(): + """ + Tests that `_target_max` returns the maximum target value among feasible points when a constraint is set. + + Verifies that only points satisfying the constraint are considered, and that the method returns `None` if no feasible points exist. + """ PBOUNDS = {"p1": (0, 10), "p2": (1, 100)} constraint = ConstraintModel(lambda p1, p2: p1 - p2, -2, 2) - space = TargetSpace(target_func, PBOUNDS, constraint) + space = TargetSpace(target_func, PBOUNDS) + space.set_constraint(constraint) assert space._target_max() is None space.probe(params={"p1": 1, "p2": 2}) # Feasible space.probe(params={"p1": 5, "p2": 1}) # Unfeasible @@ -227,9 +238,15 @@ def test_max(): def test_max_with_constraint(): + """ + Tests that the `max` method of `TargetSpace` returns the best feasible point when a constraint is set. + + Verifies that only points satisfying the constraint are considered, and the returned result includes the parameters, target value, and constraint value. + """ PBOUNDS = {"p1": (0, 10), "p2": (1, 100)} constraint = ConstraintModel(lambda p1, p2: p1 - p2, -2, 2) - space = TargetSpace(target_func, PBOUNDS, constraint=constraint) + space = TargetSpace(target_func, PBOUNDS) + space.set_constraint(constraint) assert space.max() is None space.probe(params={"p1": 1, "p2": 2}) # Feasible @@ -240,9 +257,15 @@ def test_max_with_constraint(): def test_max_with_constraint_identical_target_value(): + """ + Tests that `TargetSpace.max()` returns the best feasible point when multiple points have identical target values but different constraint satisfaction. 
@@ -193,9 +198,15 @@ def test_y_max():


 def test_y_max_with_constraint():
+    """
+    Tests that `_target_max` returns the maximum target value among feasible points when a constraint is set.
+
+    Verifies that only points satisfying the constraint are considered, and that the method returns `None` if no feasible points exist.
+    """
     PBOUNDS = {"p1": (0, 10), "p2": (1, 100)}
     constraint = ConstraintModel(lambda p1, p2: p1 - p2, -2, 2)
-    space = TargetSpace(target_func, PBOUNDS, constraint)
+    space = TargetSpace(target_func, PBOUNDS)
+    space.set_constraint(constraint)
     assert space._target_max() is None
     space.probe(params={"p1": 1, "p2": 2})  # Feasible
     space.probe(params={"p1": 5, "p2": 1})  # Unfeasible
@@ -227,9 +238,15 @@ def test_max():


 def test_max_with_constraint():
+    """
+    Tests that the `max` method of `TargetSpace` returns the best feasible point when a constraint is set.
+
+    Verifies that only points satisfying the constraint are considered, and the returned result includes the parameters, target value, and constraint value.
+    """
     PBOUNDS = {"p1": (0, 10), "p2": (1, 100)}
     constraint = ConstraintModel(lambda p1, p2: p1 - p2, -2, 2)
-    space = TargetSpace(target_func, PBOUNDS, constraint=constraint)
+    space = TargetSpace(target_func, PBOUNDS)
+    space.set_constraint(constraint)
     assert space.max() is None

     space.probe(params={"p1": 1, "p2": 2})  # Feasible
@@ -240,9 +257,15 @@ def test_max_with_constraint():


 def test_max_with_constraint_identical_target_value():
+    """
+    Tests that `TargetSpace.max()` returns the best feasible point when multiple points have identical target values but different constraint satisfaction.
+
+    Ensures that only points satisfying the constraint are considered, and among feasible points with the same target value, the correct one is selected.
+    """
     PBOUNDS = {"p1": (0, 10), "p2": (1, 100)}
     constraint = ConstraintModel(lambda p1, p2: p1 - p2, -2, 2)
-    space = TargetSpace(target_func, PBOUNDS, constraint=constraint)
+    space = TargetSpace(target_func, PBOUNDS)
+    space.set_constraint(constraint)
     assert space.max() is None

     space.probe(params={"p1": 1, "p2": 2})  # Feasible