Source code for gerrychain.optimization.optimization

from ..chain import MarkovChain
from ..partition import Partition
from ..accept import always_accept
import random
from typing import Union, Callable, List, Any
from tqdm import tqdm
import math


[docs]class SingleMetricOptimizer: """ SingleMetricOptimizer represents the class of algorithms / chains that optimize plans with respect to a single metric. An instance of this class encapsulates the following state information: * the dual graph and updaters via the initial partition, * the constraints new proposals are subject to, * the metric over which to optimize, * and whether or not to seek maximal or minimal values of the metric. The SingleMetricOptimizer class implements the following common methods of optimization: * Short Bursts * Simulated Annealing * Tilted Runs Both during and after a optimization run, the class properties `best_part` and `best_score` represent the optimal partition / corresponding score value observed. Note that these properties do NOT persist across multiple optimization runs, as they are reset each time an optimization run is invoked. """ def __init__( self, proposal: Callable[[Partition], Partition], constraints: Union[ Callable[[Partition], bool], List[Callable[[Partition], bool]] ], initial_state: Partition, optimization_metric: Callable[[Partition], Any], maximize: bool = True, step_indexer: str = "step", ): """ :param proposal: Function proposing the next state from the current state. :type proposal: Callable :param constraints: A function, or lists of functions, determining whether the proposed next state is valid (passes all binary constraints). Usually this is a :class:`~gerrychain.constraints.Validator` class instance. :type constraints: Union[Callable[[Partition], bool], List[Callable[[Partition], bool]]] :param initial_state: Initial state of the optimizer. :type initial_state: Partition :param optimization_metric: The score function with which to optimize over. This should have the signature: ``Partition -> 'a`` where 'a is comparable. :type optimization_metric: Callable[[Partition], Any] :param maximize: Boolean indicating whether to maximize or minimize the function. Defaults to True for maximize. :type maximize: bool, optional :param step_indexer: Name of the updater tracking the partitions step in the chain. If not implemented on the partition the constructor creates and adds it. Defaults to "step". :type step_indexer: str, optional :return: A SingleMetricOptimizer object :rtype: SingleMetricOptimizer """ self._initial_part = initial_state self._proposal = proposal self._constraints = constraints self._score = optimization_metric self._maximize = maximize self._best_part = None self._best_score = None self._step_indexer = step_indexer if self._step_indexer not in self._initial_part.updaters: step_updater = lambda p: ( 0 if p.parent is None else p.parent[self._step_indexer] + 1 ) self._initial_part.updaters[self._step_indexer] = step_updater @property def best_part(self) -> Partition: """ Partition object corresponding to best scoring plan observed over the current (or most recent) optimization run. :return: Partition object with the best score. :rtype: Partition """ return self._best_part @property def best_score(self) -> Any: """ Value of score metric corresponding to best scoring plan observed over the current (or most recent) optimization run. :return: Value of the best score. :rtype: Any """ return self._best_score @property def score(self) -> Callable[[Partition], Any]: """ The score function which is being optimized over. :return: The score function. :rtype: Callable[[Partition], Any] """ return self._score def _is_improvement(self, new_score: float, old_score: float) -> bool: """ Helper function defining improvement comparison between scores. Scores can be any comparable type. :param new_score: Score of proposed partition. :type new_score: float :param old_score: Score of previous partition. :type old_score: float :return: Whether the new score is an improvement over the old score. :rtype: bool """ if self._maximize: return new_score >= old_score else: return new_score <= old_score def _tilted_acceptance_function(self, p: float) -> Callable[[Partition], bool]: """ Function factory that binds and returns a tilted acceptance function. :param p: The probability of accepting a worse score. :type p: float :return: An acceptance function for tilted chains. :rtype: Callable[[Partition], bool] """ def tilted_acceptance_function(part): if part.parent is None: return True part_score = self.score(part) prev_score = self.score(part.parent) if self._is_improvement(part_score, prev_score): return True else: return random.random() < p return tilted_acceptance_function def _simulated_annealing_acceptance_function( self, beta_function: Callable[[int], float], beta_magnitude: float ): """ Function factory that binds and returns a simulated annealing acceptance function. :param beta_function: Function (f: t -> beta, where beta is in [0,1]) defining temperature over time. f(t) = 0 the chain is hot and every proposal is accepted. At f(t) = 1 the chain is cold and worse proposal have a low probability of being accepted relative to the magnitude of change in score. :type beta_function: Callable[[int], float] :param beta_magnitude: Scaling parameter for how much to weight changes in score. :type beta_magnitude: float :return: A acceptance function for simulated annealing runs. :rtype: Callable[[Partition], bool] """ def simulated_annealing_acceptance_function(part): if part.parent is None: return True score_delta = self.score(part) - self.score(part.parent) beta = beta_function(part[self._step_indexer]) if self._maximize: score_delta *= -1 return random.random() < math.exp(-beta * beta_magnitude * score_delta) return simulated_annealing_acceptance_function
[docs] @classmethod def jumpcycle_beta_function( cls, duration_hot: int, duration_cold: int ) -> Callable[[int], float]: """ Class method that binds and return simple hot-cold cycle beta temperature function, where the chain runs hot for some given duration and then cold for some duration, and repeats that cycle. :param duration_hot: Number of steps to run chain hot. :type duration_hot: int :param duration_cold: Number of steps to run chain cold. :type duration_cold: int :return: Beta function defining hot-cold cycle. :rtype: Callable[[int], float] """ cycle_length = duration_hot + duration_cold def beta_function(step: int): time_in_cycle = step % cycle_length return float(time_in_cycle >= duration_hot) return beta_function
[docs] @classmethod def linearcycle_beta_function( cls, duration_hot: int, duration_cooldown: int, duration_cold: int ) -> Callable[[int], float]: """ Class method that binds and returns a simple linear hot-cool cycle beta temperature function, where the chain runs hot for some given duration, cools down linearly for some duration, and then runs cold for some duration before warming up again and repeating. :param duration_hot: Number of steps to run chain hot. :type duration_hot: int :param duration_cooldown: Number of steps needed to transition from hot to cold or vice-versa. :type duration_cooldown: int :param duration_cold: Number of steps to run chain cold. :type duration_cold: int :return: Beta function defining linear hot-cool cycle. :rtype: Callable[[int], float] """ cycle_length = duration_hot + 2 * duration_cooldown + duration_cold def beta_function(step: int): time_in_cycle = step % cycle_length if time_in_cycle < duration_hot: return 0 elif time_in_cycle < duration_hot + duration_cooldown: return (time_in_cycle - duration_hot) / duration_cooldown elif time_in_cycle < cycle_length - duration_cooldown: return 1 else: return ( 1 - (time_in_cycle - cycle_length + duration_cooldown) / duration_cooldown ) return beta_function
[docs] @classmethod def linear_jumpcycle_beta_function( cls, duration_hot: int, duration_cooldown, duration_cold: int ): """ Class method that binds and returns a simple linear hot-cool cycle beta temperature function, where the chain runs hot for some given duration, cools down linearly for some duration, and then runs cold for some duration before jumping back to hot and repeating. :param duration_hot: Number of steps to run chain hot. :type duration_hot: int :param duration_cooldown: Number of steps needed to transition from hot to cold. :type duration_cooldown: int :param duration_cold: Number of steps to run chain cold. :type duration_cold: int :return: Beta function defining linear hot-cool cycle. :rtype: Callable[[int], float] """ cycle_length = duration_hot + duration_cooldown + duration_cold def beta_function(step: int): time_in_cycle = step % cycle_length if time_in_cycle < duration_hot: return 0 elif time_in_cycle < duration_hot + duration_cooldown: return (time_in_cycle - duration_hot) / duration_cooldown else: return 1 return beta_function
[docs] @classmethod def logitcycle_beta_function( cls, duration_hot: int, duration_cooldown: int, duration_cold: int ) -> Callable[[int], float]: """ Class method that binds and returns a logit hot-cool cycle beta temperature function, where the chain runs hot for some given duration, cools down according to the logit function :math:`f(x) = (log(x/(1-x)) + 5)/10` for some duration, and then runs cold for some duration before warming up again using the :math:`1-f(x)` and repeating. :param duration_hot: Number of steps to run chain hot. :type duration_hot: int :param duration_cooldown: Number of steps needed to transition from hot to cold or vice-versa. :type duration_cooldown: int :param duration_cold: Number of steps to run chain cold. :type duration_cold: int """ cycle_length = duration_hot + 2 * duration_cooldown + duration_cold # this will scale from 0 to 1 approximately logit = lambda x: (math.log(x / (1 - x)) + 5) / 10 def beta_function(step: int): time_in_cycle = step % cycle_length if time_in_cycle <= duration_hot: return 0 elif time_in_cycle < duration_hot + duration_cooldown: value = logit((time_in_cycle - duration_hot) / duration_cooldown) if value < 0: return 0 if value > 1: return 1 return value elif time_in_cycle <= cycle_length - duration_cooldown: return 1 else: value = 1 - logit( (time_in_cycle - cycle_length + duration_cooldown) / duration_cooldown ) if value < 0: return 0 if value > 1: return 1 return value return beta_function
[docs] @classmethod def logit_jumpcycle_beta_function( cls, duration_hot: int, duration_cooldown: int, duration_cold: int ) -> Callable[[int], float]: """ Class method that binds and returns a logit hot-cool cycle beta temperature function, where the chain runs hot for some given duration, cools down according to the logit function :math:`f(x) = (log(x/(1-x)) + 5)/10` for some duration, and then runs cold for some duration before jumping back to hot and repeating. :param duration_hot: Number of steps to run chain hot. :type duration_hot: int :param duration_cooldown: Number of steps needed to transition from hot to cold or vice-versa. :type duration_cooldown: int :param duration_cold: Number of steps to run chain cold. :type duration_cold: int """ cycle_length = duration_hot + duration_cooldown + duration_cold # this will scale from 0 to 1 approximately logit = lambda x: (math.log(x / (1 - x)) + 5) / 10 def beta_function(step: int): time_in_cycle = step % cycle_length if time_in_cycle <= duration_hot: return 0 elif time_in_cycle < duration_hot + duration_cooldown: value = logit((time_in_cycle - duration_hot) / duration_cooldown) if value < 0: return 0 if value > 1: return 1 return value else: return 1 return beta_function
[docs] def short_bursts( self, burst_length: int, num_bursts: int, accept: Callable[[Partition], bool] = always_accept, with_progress_bar: bool = False, ): """ Performs a short burst run using the instance's score function. Each burst starts at the best performing plan of the previous burst. If there's a tie, the later observed one is selected. :param burst_length: Number of steps to run within each burst. :type burst_length: int :param num_bursts: Number of bursts to perform. :type num_bursts: int :param accept: Function accepting or rejecting the proposed state. Defaults to :func:`~gerrychain.accept.always_accept`. :type accept: Callable[[Partition], bool], optional :param with_progress_bar: Whether or not to draw tqdm progress bar. Defaults to False. :type with_progress_bar: bool, optional :return: Partition generator. :rtype: Generator[Partition] """ if with_progress_bar: for part in tqdm( self.short_bursts( burst_length, num_bursts, accept, with_progress_bar=False ), total=burst_length * num_bursts, ): yield part return self._best_part = self._initial_part self._best_score = self.score(self._best_part) for _ in range(num_bursts): chain = MarkovChain( self._proposal, self._constraints, accept, self._best_part, burst_length ) for part in chain: yield part part_score = self.score(part) if self._is_improvement(part_score, self._best_score): self._best_part = part self._best_score = part_score
[docs] def simulated_annealing( self, num_steps: int, beta_function: Callable[[int], float], beta_magnitude: float = 1, with_progress_bar: bool = False, ): """ Performs simulated annealing with respect to the class instance's score function. :param num_steps: Number of steps to run for. :type num_steps: int :param beta_function: Function (f: t -> beta, where beta is in [0,1]) defining temperature over time. f(t) = 0 the chain is hot and every proposal is accepted. At f(t) = 1 the chain is cold and worse proposal have a low probability of being accepted relative to the magnitude of change in score. :type beta_function: Callable[[int], float] :param beta_magnitude: Scaling parameter for how much to weight changes in score. Defaults to 1. :type beta_magnitude: float, optional :param with_progress_bar: Whether or not to draw tqdm progress bar. Defaults to False. :type with_progress_bar: bool, optional :return: Partition generator. :rtype: Generator[Partition] """ chain = MarkovChain( self._proposal, self._constraints, self._simulated_annealing_acceptance_function( beta_function, beta_magnitude ), self._initial_part, num_steps, ) self._best_part = self._initial_part self._best_score = self.score(self._best_part) chain_generator = tqdm(chain) if with_progress_bar else chain for part in chain_generator: yield part part_score = self.score(part) if self._is_improvement(part_score, self._best_score): self._best_part = part self._best_score = part_score
[docs] def tilted_short_bursts( self, burst_length: int, num_bursts: int, p: float, with_progress_bar: bool = False, ): """ Performs a short burst run using the instance's score function. Each burst starts at the best performing plan of the previous burst. If there's a tie, the later observed one is selected. Within each burst a tilted acceptance function is used where better scoring plans are always accepted and worse scoring plans are accepted with probability `p`. :param burst_length: Number of steps to run within each burst. :type burst_length: int :param num_bursts: Number of bursts to perform. :type num_bursts: int :param p: The probability of accepting a plan with a worse score. :type p: float :param with_progress_bar: Whether or not to draw tqdm progress bar. Defaults to False. :type with_progress_bar: bool, optional :return: Partition generator. :rtype: Generator[Partition] """ return self.short_bursts( burst_length, num_bursts, accept=self._tilted_acceptance_function(p), with_progress_bar=with_progress_bar, )
# TODO: Maybe add a max_time variable so we don't run forever.
[docs] def variable_length_short_bursts( self, num_steps: int, stuck_buffer: int, accept: Callable[[Partition], bool] = always_accept, with_progress_bar: bool = False, ): """ Performs a short burst where the burst length is allowed to increase as it gets harder to find high scoring plans. The initial burst length is set to 2, and it is doubled each time there is no improvement over the passed number (`stuck_buffer`) of runs. :param num_steps: Number of steps to run for. :type num_steps: int :param stuck_buffer: How many bursts of a given length with no improvement to allow before increasing the burst length. :type stuck_buffer: int :param accept: Function accepting or rejecting the proposed state. Defaults to :func:`~gerrychain.accept.always_accept`. :type accept: Callable[[Partition], bool], optional :param with_progress_bar: Whether or not to draw tqdm progress bar. Defaults to False. :type with_progress_bar: bool, optional :return: Partition generator. :rtype: Generator[Partition] """ if with_progress_bar: for part in tqdm( self.variable_length_short_bursts( num_steps, stuck_buffer, accept, with_progress_bar=False ), total=num_steps, ): yield part return self._best_part = self._initial_part self._best_score = self.score(self._best_part) time_stuck = 0 burst_length = 2 i = 0 while i < num_steps: chain = MarkovChain( self._proposal, self._constraints, accept, self._best_part, burst_length ) for part in chain: yield part part_score = self.score(part) if self._is_improvement(part_score, self._best_score): self._best_part = part self._best_score = part_score time_stuck = 0 else: time_stuck += 1 i += 1 if i >= num_steps: break if time_stuck >= stuck_buffer * burst_length: burst_length *= 2
[docs] def tilted_run(self, num_steps: int, p: float, with_progress_bar: bool = False): """ Performs a tilted run. A chain where the acceptance function always accepts better plans and accepts worse plans with some probability `p`. :param num_steps: Number of steps to run for. :type num_steps: int :param p: The probability of accepting a plan with a worse score. :type p: float :param with_progress_bar: Whether or not to draw tqdm progress bar. Defaults to False. :type with_progress_bar: bool, optional :return: Partition generator. :rtype: Generator[Partition] """ chain = MarkovChain( self._proposal, self._constraints, self._tilted_acceptance_function(p), self._initial_part, num_steps, ) self._best_part = self._initial_part self._best_score = self.score(self._best_part) chain_generator = tqdm(chain) if with_progress_bar else chain for part in chain_generator: yield part part_score = self.score(part) if self._is_improvement(part_score, self._best_score): self._best_part = part self._best_score = part_score