Source code for maxent_disaggregation.aggregate

from scipy.stats import truncnorm, lognorm
from scipy.optimize import least_squares
import warnings
import numpy as np


[docs] def sample_aggregate( n: int, mean: float = None, sd: float = None, low_bound: float = 0, high_bound: float = np.inf, log: bool = True, suppress_warnings: bool = False, seed: int = None, ) -> np.ndarray: """ Generate random aggregate values based on the information provided. The distribution from which to sample is determined internally based on the information provided by the user." Parameters ---------- n : int The number of samples to generate. mean: The best guess of the aggregate value. sd: The standard deviation of the aggregate value. low_bound: The lower boundary of the aggregate value. high_bound: The upper boundary of the aggregate value. log: If True, the lognormal distribution is used for the aggregate value when a mean and a standard deviation are provided. If False, samples are drawn from a truncated normal distribution, which is the maximum entropy solution but produces a biased mean. Default is True suppress_warnings : bool, optional If True, suppress warnings about sample statistics deviating from input values. Default is False. seed : int, optional Random seed for reproducibility. Default is None. """ rng = np.random.default_rng(seed) # harmonize input of sd if sd is not None and np.isnan(sd): sd = None # Determine distribution and sample # Normal distribution if ( mean is not None and sd is not None and (low_bound == -np.inf or low_bound is None) and (high_bound == np.inf or high_bound is None) ): # Normal distribution sample = rng.normal(loc=mean, scale=sd, size=n) # Truncated normal or lognormal elif mean is not None and sd is not None: if log == False: # Truncated normal from observed parameters sample = sample_truncnorm(mean, sd, low_bound, high_bound, size=n, seed=rng) else: # use lognormal if low_bound < 0: warnings.warn( "You provided a negative lower bound but the lognormal distribution cannot be used with negative values. Setting low_bound to 0. Alternatively set log=False." ) low_bound = 0 if high_bound != np.inf: warnings.warn( "You provided a finite high bound, currently this not supported for the lognormal distribution. High bound is ignored. Alternatively set log=False." ) # Lognormal distribution sigma = np.sqrt(np.log(1 + (sd / mean) ** 2)) mu = np.log(mean) - 0.5 * sigma**2 sample = lognorm.rvs(s=sigma, scale=np.exp(mu), size=n, random_state=rng) elif ( mean is not None and sd is None and low_bound == 0 and (high_bound == np.inf or high_bound is None) ): # Exponential sample = rng.exponential(scale=mean, size=n) elif ( mean is None and sd is None and np.isfinite(low_bound) and np.isfinite(high_bound) ): # Uniform sample = rng.uniform(low=low_bound, high=high_bound, size=n) elif mean is not None and sd is None and low_bound not in [0, None]: raise ValueError( "Case with mean, no sd, and non-zero lower bound, or non-finite high bound is not implemented at the moment." ) elif mean is not None and sd is None and np.isfinite(high_bound): raise ValueError( "Case with mean, no sd, and non-zero lower bound, or non-finite high bound is not implemented at the moment." ) else: raise ValueError( "Combination of inputs not implemented. Please check the input values." ) # Check sample vs input check_sample_vs_input( mean=mean, sd=sd, low_bound=low_bound, high_bound=high_bound, samples=sample, suppress_warnings=suppress_warnings) return sample
[docs] def sample_truncnorm(obs_mean, obs_std, a=None, b=None, size=1000, seed=None): """ Draw random samples from a truncated normal distribution given observed mean, standard deviation, and bounds. Parameters ---------- obs_mean : float Observed mean used to infer the underlying normal distribution's location. obs_std : float Observed standard deviation used to infer the underlying normal distribution's scale. a : float Lower truncation bound (expressed in the same units as obs_mean/obs_std). If None, defaults to 0. b : float Upper truncation bound (expressed in the same units as obs_mean/obs_std). If None, defaults to infinity. size : int, optional Number of random samples to draw. Default is 1000. Returns ------- numpy.ndarray 1-D array of random variates drawn from the truncated normal distribution. Notes ----- This function relies on estimate_truncnormparams(obs_mean, obs_std, a, b) to compute parameters (mu, sigma, alpha, beta) suitable for scipy.stats.truncnorm.rvs, where mu and sigma are the location and scale of the underlying normal distribution and alpha, beta are the standardized truncation limits accepted by scipy.stats.truncnorm. Examples -------- >>> samples = sample_truncnorm(10.0, 2.0, 5.0, 15.0, size=500) >>> samples.shape(500,) """ if a is None: a = 0 if b is None: b = np.inf # Checks on min and max if a >= b: raise ValueError("a should be less than b.") if a is None: a = -np.inf if b is None: b = np.inf if obs_mean < a or obs_mean > b: raise ValueError("obs_mean should be between a and b.") # Check Bhatia–Davis inequality sigma_bd = np.sqrt((b - obs_mean) * (obs_mean - a)) if obs_std > sigma_bd: warnings.warn( f"Observed standard deviation exceeds the Bhatia–Davis inequality bound." f"The inputs are inconsistent: obs_std must be less than or equal to {sigma_bd:.2f}." f" Adjusting obs_std to {sigma_bd:.2f}." ) obs_std = sigma_bd mu, sigma, alpha, beta = estimate_truncnormparams(obs_mean, obs_std, a, b) rng = np.random.default_rng(seed) return truncnorm.rvs(alpha, beta, loc=mu, scale=sigma, size=size, random_state=rng)
[docs] def estimate_truncnormparams( obs_mean, obs_std, a, b, mu_init=None, sigma_init=None, mean_weight=10 ): """ Estimate the Gaussian parameters of a truncated normal distribution given observed statistics. This function finds the parameters (mu, sigma) of a truncated normal distribution that best match the observed mean and standard deviation, given truncation bounds. Parameters ---------- obs_mean : float The observed mean of the truncated distribution. obs_std : float The observed standard deviation of the truncated distribution. a : float The lower truncation bound. b : float The upper truncation bound. mu_init : float, optional Initial guess for the location parameter (mu). Defaults to obs_mean. sigma_init : float, optional Initial guess for the scale parameter (sigma). Defaults to obs_std. mean_weight : float, optional Weighting factor for the mean relative to the standard deviation in the optimization objective. Higher values prioritize matching the mean more closely. Default is 10. Adjust as needed. Returns ------- mu_opt : float Optimal location parameter of the underlying normal distribution. sigma_opt : float Optimal scale parameter of the underlying normal distribution. alpha_opt : float Standardized lower truncation bound: (a - mu_opt) / sigma_opt. beta_opt : float Standardized upper truncation bound: (b - mu_opt) / sigma_opt. Notes ----- The function uses least squares optimization to minimize the difference between the theoretical and observed moments of the truncated normal distribution. The scale parameter (sigma) is optimized in logscale to ensure positivity without boundary issues. Examples -------- >>> mu, sigma, alpha, beta = estimate_truncnormparams(5.0, 1.5, 0, 10) >>> print(f"Estimated mu: {mu:.2f}, sigma: {sigma:.2f}") """ def objective(params): mu, log_sigma = params sigma = np.exp(log_sigma) alpha = (a - mu) / sigma beta = (b - mu) / sigma tn = truncnorm(alpha, beta, loc=mu, scale=sigma) return [mean_weight * (tn.mean() - obs_mean), tn.std() - obs_std] if mu_init is None: mu_init = obs_mean if sigma_init is None: sigma_init = obs_std res = least_squares( objective, x0=[mu_init, np.log(sigma_init)], ) mu_opt, log_sigma_opt = res.x sigma_opt = np.exp(log_sigma_opt) alpha_opt = (a - mu_opt) / sigma_opt beta_opt = (b - mu_opt) / sigma_opt return mu_opt, sigma_opt, alpha_opt, beta_opt
[docs] def check_sample_vs_input( mean, sd, low_bound, high_bound, samples, threshold_shares=0.05, threshold_sd=0.2, suppress_warnings=False, ): """ Check if the sample mean and standard deviation are close to the input values. Raise warnings if the mean and standard deviation deviate beyond specified thresholds. Raise a ValueError if samples fall outside the specified bounds. Parameters ---------- mean : float The input mean value. sd : float The input standard deviation value. low_bound : float The lower bound used in sampling. high_bound : float The upper bound used in sampling. samples : numpy.ndarray The array of sampled values. threshold_shares : float, optional The relative tolerance for mean comparison. Default is 0.05 (5%). threshold_sd : float, optional The relative tolerance for standard deviation comparison. Default is 0.2 (20%). suppress_warnings : bool, optional If True, suppress warnings about sample statistics deviating from input values. Default is False. Returns ------- None - Warnings are printed if the sample statistics deviate significantly from the input values for the mean or standard deviation. - Raises ValueError if samples fall outside the specified bounds. """ sample_mean = samples.mean() sample_sd = samples.std() if not np.isclose(sample_mean, mean, rtol=threshold_shares) and not suppress_warnings: warnings.warn( f"Sample mean {sample_mean:.2f} deviates from input mean {mean:.2f} " f"by more than {threshold_shares*100:.1f}%.\n" f"To suppress this warning, set suppress_warnings=True or increase threshold_shares." ) if not np.isclose(sample_sd, sd, rtol=threshold_sd) and not suppress_warnings: warnings.warn( f"Sample standard deviation {sample_sd:.2f} deviates from input standard deviation {sd:.2f} " f"by more than {threshold_sd*100:.1f}%.\n" f"To suppress this warning, set suppress_warnings=True or increase threshold_sd." ) if low_bound is not None and samples.min() < low_bound: raise ValueError( f"Sample minimum {samples.min():.2f} is below the lower bound {low_bound:.2f}." ) if high_bound is not None and samples.max() > high_bound: raise ValueError( f"Sample maximum {samples.max():.2f} is above the upper bound {high_bound:.2f}." )