The code examples below, extracted from open-source Python projects, illustrate how to use scipy.stats.beta().
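Before the project examples, here is a minimal sketch of the scipy.stats.beta API they all rely on: frozen distributions, pdf/cdf evaluation, random sampling, and the "functional" form that takes the shape parameters on every call. The shape parameters a=2, b=5 are arbitrary illustration values.

import numpy as np
from scipy import stats

# Frozen distribution: fix the shape parameters a and b once, reuse everywhere.
dist = stats.beta(2, 5)

x = np.linspace(0, 1, 5)
print(dist.pdf(x))        # density on [0, 1]
print(dist.cdf(0.5))      # cumulative probability
print(dist.rvs(size=3))   # random draws

# Equivalent functional form: pass a, b on each call.
print(stats.beta.logpdf(0.3, 2, 5))
print(stats.beta.mean(2, 5), stats.beta.var(2, 5))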
def prior_contribution_coefficients(self, state):
    """
    Calculate prior contribution from regression coefficients. Log scale.
    """
    # This will need to be revised if we allow different
    # variances for different classes of coefficients, eg
    # microbiome related and host-covariate related
    dimensions = len(state.beta)
    normalization = -0.5 * dimensions * (
        np.log(2.0 * np.pi * self.prior_coefficient_variance)
    )
    exponent = (-0.5 * np.dot(state.beta, state.beta) /
                self.prior_coefficient_variance)
    return normalization + exponent
def update_empty_probability(self):
    """
    Update Theta_0 from its prior based on length of the rule list.
    """
    if len(self.current_state.rl) > 0:
        # effectively, the 'emptiness' trial has failed
        conditional_a = self.model.hyperparameter_a_empty
        conditional_b = self.model.hyperparameter_b_empty + 1
    else:
        conditional_a = self.model.hyperparameter_a_empty + 1
        conditional_b = self.model.hyperparameter_b_empty
    new_empty_probability = scipy.stats.beta.rvs(
        conditional_a, conditional_b
    )
    self.current_state.empty_probability = new_empty_probability
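This Gibbs step relies on beta-Bernoulli conjugacy: a Beta(a, b) prior on a success probability, updated with one observed success or failure, stays Beta with the matching count incremented. A standalone sketch of the same update (the hyperparameter values here are illustrative, not the model's):

import scipy.stats

a, b = 2.0, 2.0          # illustrative Beta prior hyperparameters
rule_list_is_empty = False

if rule_list_is_empty:
    a += 1               # one 'success' for emptiness
else:
    b += 1               # one 'failure' for emptiness

# Draw the updated probability from its Beta posterior.
theta_0 = scipy.stats.beta.rvs(a, b)
print(theta_0)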
def ecc_prior_fn(self, ecc):
    ## UPGRADE TO INCLUDE FURTHER STRINGS TO INDICATE ECC PRIOR OPTIONS
    ## FROM KEPLER AND OTHER SURVEY RESULTS
    ret = 1.0
    if (self.e_prior == False) or (self.e_prior == "uniform"):
        ret = self.uniform_fn(ecc, 4)
    else:
        if ecc != 0:
            if (self.e_prior == True) or (self.e_prior == 'beta'):
                ret = self.ecc_beta.pdf(ecc)
            elif self.e_prior == '2e':
                if (self.mins_ary[6] * self.days_per_year) > 1000.0:
                    ret = 2.0 * ecc
            elif self.e_prior == 'ST08':  # was 'STO8'; the documented choice is 'ST08'
                ret = ((1.0 / self.ecc_ST08_k) * (1.0 / (1.0 + ecc)**self.ecc_ST08_a)
                       - (ecc / 2.0**self.ecc_ST08_a))
            elif self.e_prior == 'J08':
                # the sigma is stored as ecc_J08_sig in __init__ (ecc_R_sig is never defined)
                ret = self.ecc_R.pdf(ecc, scale=self.ecc_J08_sig)
            elif self.e_prior == 'RayExp':
                A = self.ecc_Rexp_a * self.ecc_exp.pdf(ecc, scale=1.0 / self.ecc_Rexp_lamda)
                B = ((1.0 - self.ecc_Rexp_a)
                     * self.ecc_R.pdf(ecc, scale=self.ecc_Rexp_sig) / self.ecc_Rexp_sig)
                ret = A + B
    #if ret==0: ret=-np.inf
    return ret
def gen_x_draws(k):
    """
    Returns a flat array containing k independent draws from the
    distribution of X, the underlying random variable.  This distribution
    is itself a convex combination of three beta distributions.
    """
    bdraws = beta_dist.rvs((3, k))
    # == Transform rows, so each represents a different distribution == #
    bdraws[0, :] -= 0.5
    bdraws[1, :] += 0.6
    bdraws[2, :] -= 1.1
    # == Set X[i] = bdraws[j, i], where j is a random draw from {0, 1, 2} == #
    # np.random.random_integers was removed from NumPy; randint(0, 3) covers {0, 1, 2}
    js = np.random.randint(0, 3, size=k)
    X = bdraws[js, np.arange(k)]
    # == Rescale, so that the random variable is zero mean == #
    m, sigma = X.mean(), X.std()
    return (X - m) / sigma
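gen_x_draws calls rvs without shape parameters, so beta_dist must be a frozen distribution defined elsewhere in the source module; note that rvs accepts a shape tuple for array-valued sampling. A minimal standalone version, with hypothetical shape parameters (2, 2):

import numpy as np
from scipy.stats import beta

beta_dist = beta(2, 2)          # hypothetical frozen distribution

k = 4
draws = beta_dist.rvs((3, k))   # shape tuple: 3 rows of k draws each
print(draws.shape)              # (3, 4)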
def main():
    fig = plt.figure()
    x = np.linspace(0, 1, 100)
    sizes = [1, 2, 20, 50]
    fig_row, fig_col = 2, 4

    # Mean of i.i.d. uniform
    for i, n in enumerate(sizes):
        ax = fig.add_subplot(fig_row, fig_col, i + 1)
        data, gaussian = central_limit(uniform_dist.rvs, n, 1000)
        ax.hist(data, bins=20, density=True, alpha=0.7)  # 'normed' was removed in Matplotlib 3.x
        plt.plot(x, gaussian.pdf(x), 'r')
        plt.title('n={0}'.format(n))

    # Mean of i.i.d. beta(1, 2)
    for i, n in enumerate(sizes):
        ax = fig.add_subplot(fig_row, fig_col, i + fig_col + 1)
        data, gaussian = central_limit(beta_dist(1, 2).rvs, n, 1000)
        ax.hist(data, bins=20, density=True, alpha=0.7)
        plt.plot(x, gaussian.pdf(x), 'r')
        plt.title('n={0}'.format(n))

    plt.show()
def test_beta_binomial_two_identical_models(db_path, sampler):
    binomial_n = 5

    def model_fun(args):
        return {"result": st.binom(binomial_n, args.theta).rvs()}

    models = [model_fun for _ in range(2)]
    models = list(map(SimpleModel, models))
    population_size = ConstantPopulationSize(800)
    parameter_given_model_prior_distribution = [
        Distribution(theta=st.beta(1, 1)) for _ in range(2)]
    abc = ABCSMC(models, parameter_given_model_prior_distribution,
                 MinMaxDistanceFunction(measures_to_use=["result"]),
                 population_size,
                 eps=MedianEpsilon(.1),
                 sampler=sampler)
    abc.new(db_path, {"result": 2})

    minimum_epsilon = .2
    history = abc.run(minimum_epsilon, max_nr_populations=3)
    mp = history.get_model_probabilities(history.max_t)
    assert abs(mp.p[0] - .5) + abs(mp.p[1] - .5) < .08
def test_all_in_one_model(db_path, sampler):
    models = [AllInOneModel() for _ in range(2)]
    population_size = ConstantPopulationSize(800)
    parameter_given_model_prior_distribution = [
        Distribution(theta=RV("beta", 1, 1)) for _ in range(2)]
    abc = ABCSMC(models, parameter_given_model_prior_distribution,
                 MinMaxDistanceFunction(measures_to_use=["result"]),
                 population_size,
                 eps=MedianEpsilon(.1),
                 sampler=sampler)
    abc.new(db_path, {"result": 2})

    minimum_epsilon = .2
    history = abc.run(minimum_epsilon, max_nr_populations=3)
    mp = history.get_model_probabilities(history.max_t)
    assert abs(mp.p[0] - .5) + abs(mp.p[1] - .5) < .08
def test_beta_binomial_two_identical_models_adaptive(db_path, sampler):
    binomial_n = 5

    def model_fun(args):
        return {"result": st.binom(binomial_n, args.theta).rvs()}

    models = [model_fun for _ in range(2)]
    models = list(map(SimpleModel, models))
    population_size = AdaptivePopulationSize(800)
    parameter_given_model_prior_distribution = [
        Distribution(theta=st.beta(1, 1)) for _ in range(2)]
    abc = ABCSMC(models, parameter_given_model_prior_distribution,
                 MinMaxDistanceFunction(measures_to_use=["result"]),
                 population_size,
                 eps=MedianEpsilon(.1),
                 sampler=sampler)
    abc.new(db_path, {"result": 2})

    minimum_epsilon = .2
    history = abc.run(minimum_epsilon, max_nr_populations=3)
    mp = history.get_model_probabilities(history.max_t)
    assert abs(mp.p[0] - .5) + abs(mp.p[1] - .5) < .08
def testDirichletSample(self):
    with self.test_session():
        alpha = [1., 2]
        dirichlet = dirichlet_lib.Dirichlet(alpha)
        n = constant_op.constant(100000)
        samples = dirichlet.sample(n)
        sample_values = samples.eval()
        self.assertEqual(sample_values.shape, (100000, 2))
        self.assertTrue(np.all(sample_values > 0.0))
        self.assertLess(
            stats.kstest(
                # Beta is a univariate distribution.
                sample_values[:, 0],
                stats.beta(a=1., b=2.).cdf)[0],
            0.01)
def testBetaSample(self):
    with self.test_session():
        a = 1.
        b = 2.
        beta = beta_lib.Beta(a, b)
        n = constant_op.constant(100000)
        samples = beta.sample(n)
        sample_values = samples.eval()
        self.assertEqual(sample_values.shape, (100000,))
        self.assertFalse(np.any(sample_values < 0.0))
        self.assertLess(
            stats.kstest(
                # Beta is a univariate distribution.
                sample_values,
                stats.beta(a=1., b=2.).cdf)[0],
            0.01)
        # The standard error of the sample mean is 1 / (sqrt(18 * n))
        self.assertAllClose(
            sample_values.mean(axis=0), stats.beta.mean(a, b), atol=1e-2)
        self.assertAllClose(
            np.cov(sample_values, rowvar=0), stats.beta.var(a, b), atol=1e-1)

# Test that sampling with the same seed twice gives the same results.
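The tolerances in this test follow from the closed-form Beta moments: for Beta(a, b), the mean is a/(a+b) and the variance is ab/((a+b)²(a+b+1)), so Beta(1, 2) has mean 1/3 and variance 1/18, and the sample mean's standard error is 1/sqrt(18 n), as the comment above notes. scipy returns the same closed-form values:

from scipy import stats

a, b = 1.0, 2.0
print(stats.beta.mean(a, b))  # a / (a + b) = 1/3
print(stats.beta.var(a, b))   # a*b / ((a + b)**2 * (a + b + 1)) = 1/18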
def likelihood_z_given_rl(self, omega, rl):
    """
    Log-likelihood of working response kappa/omega given rl.

    Integrates out beta. I think this terminology is somewhat
    misleading and will revise it.
    """
    X = self.data.covariate_matrix(rl)
    return self.likelihood_z_given_X(omega, X)
def likelihood_y_given_X_beta(self, X, beta):
    psi = np.dot(X, beta)
    probabilities = 1.0 / (1.0 + np.exp(-1.0 * psi))
    return np.sum(
        np.log(np.where(self.data.y, probabilities, 1.0 - probabilities))
    )
def prior_contribution_empty_probability(self, state):
    return scipy.stats.beta.logpdf(
        state.empty_probability,
        self.hyperparameter_a_empty,
        self.hyperparameter_b_empty
    )
def flat_prior(self, state):
    """
    Evaluate log-probability of each primitive in the population.

    Return value is properly normalized.
    """
    raw_phylogeny_weights = self.rule_population.flat_variable_weights
    phylogeny_weights = scipy.stats.norm.logpdf(
        np.log(raw_phylogeny_weights),
        loc=state.phylogeny_mean,
        scale=state.phylogeny_std
    )
    total_duration = float(self.data.experiment_end -
                           self.data.experiment_start)
    durations = (self.rule_population.flat_durations /
                 ((1.0 + self.window_duration_epsilon) * total_duration))
    window_a = state.window_concentration * state.window_typical_fraction
    window_b = (state.window_concentration *
                (1.0 - state.window_typical_fraction))
    window_weights = scipy.stats.beta.logpdf(
        durations, window_a, window_b
    )
    weights = phylogeny_weights + window_weights
    normalization = logsumexp(weights)
    return weights - normalization
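The window prior above parameterizes the Beta distribution by its mean (the "typical fraction") and a concentration, with a = concentration × mean and b = concentration × (1 − mean). A standalone sketch of that reparameterization, with illustrative values:

import numpy as np
import scipy.stats

mean, concentration = 0.5, 2.0           # illustrative values
a = concentration * mean
b = concentration * (1.0 - mean)

durations = np.array([0.1, 0.4, 0.9])    # fractions of the total duration
log_weights = scipy.stats.beta.logpdf(durations, a, b)

# The mean of Beta(a, b) is a / (a + b), which recovers `mean`.
assert np.isclose(scipy.stats.beta.mean(a, b), mean)
print(log_weights)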
def __init__(self, rl, omega, beta, phylogeny_mean, phylogeny_std,
             window_typical_fraction=0.5, window_concentration=2.0,
             empty_probability=0.5):
    self.rl = rl
    self.omega = omega
    self.beta = beta
    self.phylogeny_mean = phylogeny_mean
    self.phylogeny_std = phylogeny_std
    self.window_typical_fraction = window_typical_fraction
    self.window_concentration = window_concentration
    self.empty_probability = empty_probability
def copy(self):
    return LogisticRuleState(
        self.rl.copy(), self.omega.copy(), self.beta.copy(),
        self.phylogeny_mean, self.phylogeny_std,
        self.window_typical_fraction, self.window_concentration,
        self.empty_probability
    )
def step(self):
    beta_values = []
    # range rather than Python 2's xrange, for Python 3 compatibility
    for i in range(self.local_updates_per_structure_update):
        self.update_omega()
        self.update_beta()
        beta_values.append(self.current_state.beta)
    self.additional_beta_values.append(beta_values)
    move_type_indicator = np.random.rand()
    if move_type_indicator < 0.45:
        self.update_primitives()
    elif move_type_indicator < 0.9:
        self.add_remove()
    else:
        self.move_primitive()
    # If the structure moves are accepted, beta becomes out of date,
    # perhaps grievously, so we defensively update it before
    # recording the new state
    self.update_beta()
    self.update_empty_probability()
    self.update_window_parameters()
    # As a convenience, have update_phylogeny_parameters return
    # the resulting state's prior value (which it calculates
    # anyway) and record that rather than recalculating it.
    prior = self.update_phylogeny_parameters()
    self.states.append(self.current_state.copy())
    self.likelihoods.append(
        self.model.likelihood_y_given_X_beta(self.current_X,
                                             self.current_state.beta)
    )
    self.priors.append(prior)
def update_beta(self):
    """
    Draw a new beta from its conditional distribution.
    """
    state = self.current_state
    # Note p may not equal len(self.current_state.beta)
    _, p = self.current_X.shape
    v = self.model.prior_coefficient_variance
    kappa = self.model.data.y - 0.5
    # In later revisions we can avoid this inversion
    # CAUTION: the dot product here will be incorrect if current_X
    # is stored as an array of Booleans rather than ints.
    # the np.diag is wasteful here but okay for test purposes
    posterior_variance = np.linalg.inv(
        np.eye(p) / v +
        np.dot(self.current_X.T,
               np.dot(np.diag(state.omega), self.current_X))
    )
    posterior_mean = np.dot(posterior_variance,
                            np.dot(self.current_X.T, kappa))
    beta = np.random.multivariate_normal(posterior_mean, posterior_variance)
    state.beta = beta

########################################
########################################
########################################
##### CORE RL SAMPLING CODE
def probabilities(rule_list, beta, test_data):
    """
    Predict probabilities of outcome according to the rule list.
    """
    X = test_data.covariate_matrix(rule_list)
    psi = np.dot(X, beta)
    probabilities = 1.0 / (1.0 + np.exp(-1.0 * psi))
    return probabilities
def prediction(rule_list, beta, test_data):
    """
    Predict y=1 for subjects with probability >= 0.5.
    """
    X = test_data.covariate_matrix(rule_list)
    psi = np.dot(X, beta)
    probabilities = 1.0 / (1.0 + np.exp(-1.0 * psi))
    prediction = probabilities >= 0.5
    return prediction
def step(self):
    move_type_indicator = np.random.rand()
    if move_type_indicator < -1.0:  # 0.44
        self.update_primitives()
    elif move_type_indicator < 2.0:  # 0.66
        self.add_remove()
    else:
        raise  # self.move_primitive()
    # If the structure moves are accepted, beta becomes out of date,
    # perhaps grievously, so we defensively update it before
    # recording the new state
    self.states.append(self.current_state.copy())
def step(self):
    move_type_indicator = np.random.rand()
    if move_type_indicator < 0.5:  # 0.44
        self.update_primitives()
    elif move_type_indicator < 2.0:  # 0.66
        self.add_remove()
    else:
        raise  # self.move_primitive()
    # If the structure moves are accepted, beta becomes out of date,
    # perhaps grievously, so we defensively update it before
    # recording the new state
    self.states.append(self.current_state.copy())
def cmf_prior(self, m2, m1):
    """from equation 8 of Metchev & Hillenbrand 2009"""
    beta = -0.39
    ret = (m2**beta) * (m1**(-1.0 * beta - 1.0))
    #if ret==0: ret=-np.inf
    return ret
def __init__(self, A=1.4, α=0.6, β=0.96, grid_size=50, G=None,
             π=np.sqrt, F=stats.beta(2, 2)):
    self.A, self.α, self.β = A, α, β
    # === set defaults for G, π and F === #
    self.G = G if G is not None else lambda x, ϕ: A * (x * ϕ)**α
    self.π = π
    self.F = F
    # === Set up grid over the state space for DP === #
    # Max of grid is the max of a large quantile value for F and the
    # fixed point y = G(y, 1).
    # ϵ is a small module-level constant (e.g. 1e-4) that keeps the
    # grid away from the endpoints.
    grid_max = max(A**(1 / (1 - α)), self.F.ppf(1 - ϵ))
    self.x_grid = np.linspace(ϵ, grid_max, grid_size)
def __init__(self, β=0.95, c=0.6, F_a=1, F_b=1, G_a=3, G_b=1.2,
             w_max=2, w_grid_size=40, π_grid_size=40):
    self.β, self.c, self.w_max = β, c, w_max
    # beta_distribution aliases scipy.stats.beta, rescaled to [0, w_max]
    self.F = beta_distribution(F_a, F_b, scale=w_max)
    self.G = beta_distribution(G_a, G_b, scale=w_max)
    self.f, self.g = self.F.pdf, self.G.pdf       # Density functions
    self.π_min, self.π_max = 1e-3, 1 - 1e-3       # Avoids instability
    self.w_grid = np.linspace(0, w_max, w_grid_size)
    self.π_grid = np.linspace(self.π_min, self.π_max, π_grid_size)
    x, y = np.meshgrid(self.w_grid, self.π_grid)
    # column-major (Fortran) order; older code passed the integer 1 here
    self.grid_points = np.column_stack((x.ravel('F'), y.ravel('F')))
def test_beta_binomial_different_priors(db_path, sampler):
    binomial_n = 5

    def model(args):
        return {"result": st.binom(binomial_n, args['theta']).rvs()}

    models = [model for _ in range(2)]
    models = list(map(SimpleModel, models))
    population_size = ConstantPopulationSize(800)
    a1, b1 = 1, 1
    a2, b2 = 10, 1
    parameter_given_model_prior_distribution = [
        Distribution(theta=RV("beta", a1, b1)),
        Distribution(theta=RV("beta", a2, b2))]
    abc = ABCSMC(models, parameter_given_model_prior_distribution,
                 MinMaxDistanceFunction(measures_to_use=["result"]),
                 population_size,
                 eps=MedianEpsilon(.1),
                 sampler=sampler)
    n1 = 2
    abc.new(db_path, {"result": n1})

    minimum_epsilon = .2
    history = abc.run(minimum_epsilon, max_nr_populations=3)
    mp = history.get_model_probabilities(history.max_t)

    def B(a, b):
        return gamma(a) * gamma(b) / gamma(a + b)

    def expected_p(a, b, n1):
        return binom(binomial_n, n1) * B(a + n1, b + binomial_n - n1) / B(a, b)

    p1_expected_unnormalized = expected_p(a1, b1, n1)
    p2_expected_unnormalized = expected_p(a2, b2, n1)
    p1_expected = p1_expected_unnormalized / (p1_expected_unnormalized +
                                              p2_expected_unnormalized)
    p2_expected = p2_expected_unnormalized / (p1_expected_unnormalized +
                                              p2_expected_unnormalized)
    assert abs(mp.p[0] - p1_expected) + abs(mp.p[1] - p2_expected) < .08
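The expected model probabilities in this test come from the beta-binomial marginal likelihood, P(n1 | a, b) = C(n, n1) · B(a + n1, b + n − n1) / B(a, b). A quick standalone check of those numbers, using scipy.special.beta in place of the gamma-function ratio defined in the test:

from scipy.special import beta as B, binom

binomial_n, n1 = 5, 2
a1, b1 = 1, 1
a2, b2 = 10, 1

def marginal(a, b):
    # Beta-binomial evidence for observing n1 successes out of binomial_n.
    return binom(binomial_n, n1) * B(a + n1, b + binomial_n - n1) / B(a, b)

m1, m2 = marginal(a1, b1), marginal(a2, b2)
print(m1 / (m1 + m2), m2 / (m1 + m2))  # expected posterior model probabilities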
def testBetaMean(self):
    with session.Session():
        a = [1., 2, 3]
        b = [2., 4, 1.2]
        expected_mean = stats.beta.mean(a, b)
        dist = beta_lib.Beta(a, b)
        self.assertEqual(dist.mean().get_shape(), (3,))
        self.assertAllClose(expected_mean, dist.mean().eval())
def testBetaVariance(self):
    with session.Session():
        a = [1., 2, 3]
        b = [2., 4, 1.2]
        expected_variance = stats.beta.var(a, b)
        dist = beta_lib.Beta(a, b)
        self.assertEqual(dist.variance().get_shape(), (3,))
        self.assertAllClose(expected_variance, dist.variance().eval())
def testBetaEntropy(self):
    with session.Session():
        a = [1., 2, 3]
        b = [2., 4, 1.2]
        expected_entropy = stats.beta.entropy(a, b)
        dist = beta_lib.Beta(a, b)
        self.assertEqual(dist.entropy().get_shape(), (3,))
        self.assertAllClose(expected_entropy, dist.entropy().eval())
def testBetaCdf(self):
    with self.test_session():
        shape = (30, 40, 50)
        for dt in (np.float32, np.float64):
            a = 10. * np.random.random(shape).astype(dt)
            b = 10. * np.random.random(shape).astype(dt)
            x = np.random.random(shape).astype(dt)
            actual = beta_lib.Beta(a, b).cdf(x).eval()
            # np.bool was removed from modern NumPy; np.bool_ is equivalent
            self.assertAllEqual(np.ones(shape, dtype=np.bool_), 0. <= x)
            self.assertAllEqual(np.ones(shape, dtype=np.bool_), 1. >= x)
            self.assertAllClose(stats.beta.cdf(x, a, b), actual,
                                rtol=1e-4, atol=0)
def testBetaLogCdf(self):
    with self.test_session():
        shape = (30, 40, 50)
        for dt in (np.float32, np.float64):
            a = 10. * np.random.random(shape).astype(dt)
            b = 10. * np.random.random(shape).astype(dt)
            x = np.random.random(shape).astype(dt)
            actual = math_ops.exp(beta_lib.Beta(a, b).log_cdf(x)).eval()
            self.assertAllEqual(np.ones(shape, dtype=np.bool_), 0. <= x)
            self.assertAllEqual(np.ones(shape, dtype=np.bool_), 1. >= x)
            self.assertAllClose(stats.beta.cdf(x, a, b), actual,
                                rtol=1e-4, atol=0)
def __init__(self, model, r0, local_updates_per_structure_update=10):
    self.model = model
    # Ensure r0 is sorted within each sublist
    r0 = r0.copy()
    for i in range(len(r0)):  # range rather than xrange, for Python 3 compatibility
        model.rule_population.sort_list_of_primitives(r0[i])
    # This may be a bad choice of omega_0...
    omega = np.ones(self.model.data.y.shape)
    # Come up with some sensible initial values for the various
    # structure parameters
    phylogeny_mean = self.model.phylogeny_lambda_l
    phylogeny_std = min(self.model.phylogeny_std_upper_bound,
                        5.0 * self.model.phylogeny_delta_l)
    self.current_state = LogisticRuleState(
        r0, omega, np.zeros(len(r0) + model.data.n_fixed_covariates),
        phylogeny_mean, phylogeny_std
    )
    # Could include X in the state except we don't really want to
    # track it over the whole sampling process; definitely do
    # need it to persist between subparts of iterations at least, though
    self.current_X = self.model.data.covariate_matrix(r0)
    # We specified an arbitrary beta: we will overwrite it when we draw
    # it from its conditional in the first iteration. Because this
    # is not a valid value of beta, we don't add the initial state
    # to self.states.
    self.initial_state = self.current_state.copy()
    self.states = []
    ### The following are not updated after updating beta/z, but are
    # after every change or attempted change to the rule list structure.
    self.likelihoods = []
    self.priors = []
    self.comments = []
    self.attempts = {}
    self.successes = {}
    self.local_updates_per_structure_update = (
        local_updates_per_structure_update
    )
    # We do want some better resolution on the distribution of
    # coefficients for each state, so we'll store the interstitial beta
    # samples before each structure update step:
    self.additional_beta_values = []
    ### These don't conceptually need to be attributes of the sampler
    # object but this way we avoid recalculating them every iteration
    self.phylogeny_mean_proposal_std = 0.5 * self.model.phylogeny_delta_l
    self.phylogeny_std_proposal_std = 0.5 * self.model.phylogeny_delta_l
    self.window_fraction_proposal_std = (
        self.model.tmin /
        (self.model.data.experiment_end - self.model.data.experiment_start)
    )
    self.window_concentration_proposal_std = (
        self.model.window_concentration_typical *
        self.model.window_concentration_update_ratio
    )
def __init__(self, ecc_prior=True, p_prior=True, inc_prior=True,
             m1_prior=True, m2_prior=True, para_prior=True,
             para_est=0, para_err=0, m1_est=0, m1_err=0,
             m2_est=0, m2_err=0,
             ecc_beta_a=0.867, ecc_beta_b=3.03,
             ecc_J08_sig=0.3, ecc_Rexp_lamda=5.12, ecc_Rexp_a=0.781,
             ecc_Rexp_sig=0.272, ecc_ST08_a=4.33, ecc_ST08_k=0.2431,
             p_gamma=-0.7, mins_ary=[], maxs_ary=[]):
    ## check min and max range arrays
    if (len(mins_ary) > 1) and (len(maxs_ary) > 1):
        self.mins_ary = mins_ary
        self.maxs_ary = maxs_ary
    else:
        raise IOError('\n\n No min/max ranges were provided to the priors object!!')
    ## push in two manual constants
    self.days_per_year = 365.2422
    self.sec_per_year = 60 * 60 * 24 * self.days_per_year
    ## choices: '2e', 'ST08', 'J08', 'RayExp', 'beta', 'uniform'.
    ## Default is 'beta'.
    self.e_prior = ecc_prior
    ## 'best-fit' alpha and beta from Kipping+2013
    self.ecc_beta = stats.beta(ecc_beta_a, ecc_beta_b)
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.beta.html
    ## 'best-fit' for the basic Rayleigh from Juric+2008
    self.ecc_J08_sig = ecc_J08_sig        ### Put this into the advanced settings ???
    ## 'best-fit' alpha, lambda and sig from Kipping+2013
    self.ecc_Rexp_lamda = ecc_Rexp_lamda  ### Put this into the advanced settings ???
    self.ecc_Rexp_a = ecc_Rexp_a          ### Put this into the advanced settings ???
    self.ecc_Rexp_sig = ecc_Rexp_sig      ### Put this into the advanced settings ???
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.rayleigh.html#scipy.stats.rayleigh
    self.ecc_R = stats.rayleigh()
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.expon.html
    self.ecc_exp = stats.expon()
    ## 'best-fit' for the Shen & Turner 2008 pdf
    self.ecc_ST08_a = ecc_ST08_a          ### Put this into the advanced settings ???
    self.ecc_ST08_k = ecc_ST08_k          ### Put this into the advanced settings ???
    #self.ecc_norm = stats.norm
    #self.ecc_norm_mean = ##
    #self.ecc_norm_sig = ##
    #self.ecc_norm.pdf(ecc, loc=self.ecc_norm_mean, scale=self.ecc_norm_sig)
    ## choices: power-law, Jeffrey's
    self.p_prior = p_prior
    self.p_gamma = p_gamma
    ## choices: sin, cos
    self.inc_prior = inc_prior
    ## choices: IMF, PDMF
    self.m1_prior = m1_prior
    ## choices: CMF, IMF, PDMF
    self.m2_prior = m2_prior
    ## choices: gauss
    self.para_prior = para_prior
    ## values necessary to calculate priors
    self.para_est = para_est
    self.para_err = para_err
    self.m1_est = m1_est
    self.m1_err = m1_err
    self.m2_est = m2_est
    self.m2_err = m2_err
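The Kipping (2013) eccentricity prior instantiated above is an ordinary frozen scipy.stats.beta object, so its density can be evaluated directly; a minimal sketch using the same default shape parameters:

from scipy import stats

# Kipping (2013) best-fit shape parameters, as in the defaults above.
ecc_beta = stats.beta(0.867, 3.03)
for ecc in (0.05, 0.3, 0.9):
    print(ecc, ecc_beta.pdf(ecc))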
def __init__(self, **kwargs):
    # define the [Fe/H] array; these are the values on which the MIST
    # and C3K grids are built
    self.FeHarr = [-4.0, -3.5, -3.0, -2.75, -2.5, -2.25, -2.0, -1.75,
                   -1.5, -1.25, -1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5]

    # define the [alpha/Fe] array
    self.alphaarr = [0.0, 0.2, 0.4]

    # define aliases for the MIST isochrones and C3K/CKC files
    self.MISTpath = kwargs.get('MISTpath', Payne.__abspath__ + 'data/MIST/')
    self.C3Kpath = kwargs.get('C3Kpath', Payne.__abspath__ + 'data/C3K/')

    # load MIST models
    self.MIST = h5py.File(self.MISTpath + '/MIST_1.2_EEPtrk.h5', 'r')
    self.MISTindex = list(self.MIST['index'])

    # create weights for Teff
    # determine the min/max Teff from MIST
    MISTTeffmin = np.inf
    MISTTeffmax = 0.0
    for ind in self.MISTindex:
        MISTTeffmin_i = self.MIST[ind]['log_Teff'].min()
        MISTTeffmax_i = self.MIST[ind]['log_Teff'].max()
        if MISTTeffmin_i < MISTTeffmin:
            MISTTeffmin = MISTTeffmin_i
        if MISTTeffmax_i > MISTTeffmax:
            MISTTeffmax = MISTTeffmax_i
    self.teffwgts = beta(0.5, 0.5,
                         loc=MISTTeffmin - 10.0,
                         scale=(MISTTeffmax + 10.0) - (MISTTeffmin - 10.0))

    # create weights for [Fe/H]
    self.fehwgts = beta(1.0, 0.5, loc=-2.1, scale=2.7).pdf(self.FeHarr)
    self.fehwgts = self.fehwgts / np.sum(self.fehwgts)

    # create a dictionary for the C3K models and populate it for
    # different metallicities
    self.C3K = {}
    for aa in self.alphaarr:
        self.C3K[aa] = {}
        for mm in self.FeHarr:
            self.C3K[aa][mm] = h5py.File(
                self.C3Kpath + '/c3k_v1.3_feh{0:+4.2f}_afe{1:+3.1f}.full.h5'.format(mm, aa),
                'r')
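This example uses the loc/scale mechanism shared by all scipy.stats continuous distributions: beta(a, b, loc=L, scale=S) shifts and stretches the support from [0, 1] to [L, L + S]. A standalone sketch using the same [Fe/H] interval as above:

from scipy import stats

# Stretch a Beta(1.0, 0.5) from [0, 1] onto [-2.1, 0.6],
# i.e. loc = -2.1 and scale = 2.7, as in the [Fe/H] weights above.
dist = stats.beta(1.0, 0.5, loc=-2.1, scale=2.7)
print(dist.ppf([0, 1]))          # support endpoints: [-2.1, 0.6]
print(dist.pdf([-2.0, -1.0, 0.5]))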
def test_beta_binomial_different_priors_initial_epsilon_from_sample(db_path, sampler):
    binomial_n = 5

    def model(args):
        return {"result": st.binom(binomial_n, args.theta).rvs()}

    models = [model for _ in range(2)]
    models = list(map(SimpleModel, models))
    population_size = ConstantPopulationSize(800)
    a1, b1 = 1, 1
    a2, b2 = 10, 1
    parameter_given_model_prior_distribution = [
        Distribution(theta=RV("beta", a1, b1)),
        Distribution(theta=RV("beta", a2, b2))]
    abc = ABCSMC(models, parameter_given_model_prior_distribution,
                 MinMaxDistanceFunction(measures_to_use=["result"]),
                 population_size,
                 eps=MedianEpsilon(median_multiplier=.9),
                 sampler=sampler)
    n1 = 2
    abc.new(db_path, {"result": n1})

    minimum_epsilon = -1
    history = abc.run(minimum_epsilon, max_nr_populations=5)
    mp = history.get_model_probabilities(history.max_t)

    def B(a, b):
        return gamma(a) * gamma(b) / gamma(a + b)

    def expected_p(a, b, n1):
        return binom(binomial_n, n1) * B(a + n1, b + binomial_n - n1) / B(a, b)

    p1_expected_unnormalized = expected_p(a1, b1, n1)
    p2_expected_unnormalized = expected_p(a2, b2, n1)
    p1_expected = p1_expected_unnormalized / (p1_expected_unnormalized +
                                              p2_expected_unnormalized)
    p2_expected = p2_expected_unnormalized / (p1_expected_unnormalized +
                                              p2_expected_unnormalized)
    assert abs(mp.p[0] - p1_expected) + abs(mp.p[1] - p2_expected) < .08