From 8dd25b1b63e993ae80055303f859c79a1cf3b28a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Capretto?= Date: Fri, 17 Nov 2023 07:06:24 -0300 Subject: [PATCH] Improve tests (#757) * start moving tests * modifications * Replace numpy.round_ with numpy.round * Keep porting tests.. * Add testcategorical * Finalize test migration * fixes --- bambi/priors/prior.py | 2 +- tests/test_aliases.py | 38 + tests/test_alternative_samplers.py | 78 ++ tests/test_built_models.py | 1075 ---------------------- tests/test_interpret_messages.py | 2 - tests/test_model_construction.py | 111 ++- tests/test_models.py | 1336 ++++++++++++++++++++++++++++ tests/test_predict.py | 429 --------- 8 files changed, 1563 insertions(+), 1508 deletions(-) create mode 100644 tests/test_alternative_samplers.py delete mode 100644 tests/test_built_models.py create mode 100644 tests/test_models.py delete mode 100644 tests/test_predict.py diff --git a/bambi/priors/prior.py b/bambi/priors/prior.py index fd5470d57..d4c2da420 100644 --- a/bambi/priors/prior.py +++ b/bambi/priors/prior.py @@ -65,7 +65,7 @@ def __repr__(self): def format_arg(value, decimals): try: - outcome = np.round_(value, decimals) + outcome = np.round(value, decimals) except: # pylint: disable = bare-except try: outcome = value.name diff --git a/tests/test_aliases.py b/tests/test_aliases.py index e332d7ad7..81a098039 100644 --- a/tests/test_aliases.py +++ b/tests/test_aliases.py @@ -1,6 +1,8 @@ import pytest import bambi as bmb +import numpy as np +import pandas as pd @pytest.fixture(scope="module") @@ -13,6 +15,26 @@ def anes(): return bmb.load_data("ANES") +@pytest.fixture(scope="module") +def data_100(): + size = 100 + rng = np.random.default_rng(121195) + data = pd.DataFrame( + { + "n1": rng.normal(size=size), + "n2": rng.normal(size=size), + "n3": rng.normal(size=size), + "b0": rng.binomial(n=1, p=0.5, size=size), + "b1": rng.choice(["a", "b"], size=size), + "count1": rng.poisson(lam=2, size=size), + "count2": rng.poisson(lam=2, size=size), + "cat1": rng.choice(list("MNOP"), size=size), + "cat2": rng.choice(list("FGHIJK"), size=size), + } + ) + return data + + def test_non_distributional_model(my_data): # Plain model formula = bmb.Formula("y ~ x") @@ -126,3 +148,19 @@ def test_set_alias_warnings(my_data): print(model.constant_components) assert len(record) == 1 assert str(record[0].message) == expected_warning + + +# FIXME: Move somewhere +def test_set_alias(data_100): + model = bmb.Model("n1 ~ n2 + (n2|cat1)", data_100) + aliases = { + "Intercept": "α", + "n2": "β", + "1|cat1": "α_group", + "n2|cat1": "β_group", + "sigma": "σ", + } + model.set_alias(aliases) + model.build() + new_names = set(["α", "β", "α_group", "α_group_σ", "β_group", "β_group_σ", "σ"]) + assert new_names.issubset(set(model.backend.model.named_vars)) diff --git a/tests/test_alternative_samplers.py b/tests/test_alternative_samplers.py new file mode 100644 index 000000000..a16134762 --- /dev/null +++ b/tests/test_alternative_samplers.py @@ -0,0 +1,78 @@ +import bambi as bmb +import numpy as np +import pandas as pd + +import pytest + + +@pytest.fixture(scope="module") +def data_n100(): + size = 100 + rng = np.random.default_rng(121195) + data = pd.DataFrame( + { + "b1": rng.binomial(n=1, p=0.5, size=size), + "n1": rng.poisson(lam=2, size=size), + "n2": rng.poisson(lam=2, size=size), + "y1": rng.normal(size=size), + "y2": rng.normal(size=size), + "y3": rng.normal(size=size), + "cat2": rng.choice(["a", "b"], size=size), + "cat4": rng.choice(list("MNOP"), size=size), + "cat5": 
rng.choice(list("FGHIJK"), size=size), + } + ) + return data + + +def test_laplace(): + data = pd.DataFrame(np.repeat((0, 1), (30, 60)), columns=["w"]) + priors = {"Intercept": bmb.Prior("Uniform", lower=0, upper=1)} + model = bmb.Model("w ~ 1", data=data, family="bernoulli", priors=priors, link="identity") + results = model.fit(inference_method="laplace") + mode_n = results.posterior["Intercept"].mean().item() + std_n = results.posterior["Intercept"].std().item() + mode_a = data.mean() + std_a = data.std() / len(data) ** 0.5 + np.testing.assert_array_almost_equal((mode_n, std_n), (mode_a.item(), std_a.item()), decimal=2) + + +def test_vi(): + data = pd.DataFrame(np.repeat((0, 1), (30, 60)), columns=["w"]) + priors = {"Intercept": bmb.Prior("Uniform", lower=0, upper=1)} + model = bmb.Model("w ~ 1", data=data, family="bernoulli", priors=priors, link="identity") + results = model.fit(inference_method="vi", method="advi") + samples = results.sample(1000).posterior["Intercept"] + mode_n = samples.mean() + std_n = samples.std() + mode_a = data.mean() + std_a = data.std() / len(data) ** 0.5 + np.testing.assert_array_almost_equal( + (mode_n.item(), std_n.item()), (mode_a.item(), std_a.item()), decimal=2 + ) + + +@pytest.mark.parametrize( + "args", + [ + ("mcmc", {}), + ("nuts_numpyro", {"chain_method": "vectorized"}), + ("nuts_blackjax", {"chain_method": "vectorized"}), + ], +) +def test_logistic_regression_categoric_alternative_samplers(data_n100, args): + model = bmb.Model("b1 ~ n1", data_n100, family="bernoulli") + model.fit(tune=50, draws=50, inference_method=args[0], **args[1]) + + +@pytest.mark.parametrize( + "args", + [ + ("mcmc", {}), + ("nuts_numpyro", {"chain_method": "vectorized"}), + ("nuts_blackjax", {"chain_method": "vectorized"}), + ], +) +def test_regression_alternative_samplers(data_n100, args): + model = bmb.Model("n1 ~ n2", data_n100) + model.fit(tune=50, draws=50, inference_method=args[0], **args[1]) diff --git a/tests/test_built_models.py b/tests/test_built_models.py deleted file mode 100644 index 5e58466eb..000000000 --- a/tests/test_built_models.py +++ /dev/null @@ -1,1075 +0,0 @@ -from os.path import dirname, join - -import logging -import re - -import pytest - -import numpy as np -import pandas as pd -import pymc as pm - -from scipy.special import expit - -import bambi as bmb -from bambi.terms import GroupSpecificTerm - - -@pytest.fixture(scope="module") -def crossed_data(): - """ - Group specific effects: - 10 subjects, 12 items, 5 sites - Subjects crossed with items, nested in sites - Items crossed with sites - - common effects: - A continuous predictor, a numeric dummy, and a three-level category - (levels a,b,c) - - Structure: - Subjects nested in dummy (e.g., gender), crossed with threecats - Items crossed with dummy, nested in threecats - Sites partially crossed with dummy (4/5 see a single dummy, 1/5 sees both - dummies) - Sites crossed with threecats - """ - - data_dir = join(dirname(__file__), "data") - data = pd.read_csv(join(data_dir, "crossed_random.csv")) - data["subj"] = data["subj"].astype(str) - return data - - -@pytest.fixture(scope="module") -def dm(): - """ - Data obtained from https://github.com/jswesner/nps_emergence/tree/v2_nps_emerge - and used in Gamma GLM - """ - data_dir = join(dirname(__file__), "data") - data = pd.read_csv(join(data_dir, "dm.csv")) - return data - - -@pytest.fixture(scope="module") -def init_data(): - """ - Data used to test initialization method - """ - data_dir = join(dirname(__file__), "data") - data = 
pd.read_csv(join(data_dir, "obs.csv")) - return data - - -@pytest.fixture(scope="module") -def inhaler(): - data_dir = join(dirname(__file__), "data") - data = pd.read_csv(join(data_dir, "inhaler.csv")) - data["rating"] = pd.Categorical(data["rating"], categories=[1, 2, 3, 4]) - data["treat"] = pd.Categorical(data["treat"]) - return data - - -@pytest.fixture(scope="module") -def categorical_family_categorical_predictor(): - data_dir = join(dirname(__file__), "data") - data = pd.read_csv(join(data_dir, "categorical_family_categorical_predictor.csv")) - return data - - -@pytest.fixture(scope="module") -def data_100(): - size = 100 - rng = np.random.default_rng(121195) - data = pd.DataFrame( - { - "n1": rng.normal(size=size), - "n2": rng.normal(size=size), - "n3": rng.normal(size=size), - "b0": rng.binomial(n=1, p=0.5, size=size), - "b1": rng.choice(["a", "b"], size=size), - "count1": rng.poisson(lam=2, size=size), - "count2": rng.poisson(lam=2, size=size), - "cat1": rng.choice(list("MNOP"), size=size), - "cat2": rng.choice(list("FGHIJK"), size=size), - } - ) - return data - - -@pytest.fixture(scope="module") -def data_1000(): - size = 1000 - rng = np.random.default_rng(121195) - data = pd.DataFrame( - { - "n1": rng.normal(size=size), - "n2": rng.normal(size=size), - "n3": rng.normal(size=size), - "b0": rng.binomial(n=1, p=0.5, size=size), - "b1": rng.choice(["a", "b"], size=size), - "count1": rng.poisson(lam=2, size=size), - "count2": rng.poisson(lam=2, size=size), - "cat1": rng.choice(list("MNOP"), size=size), - "cat2": rng.choice(list("FGHIJK"), size=size), - } - ) - return data - - -@pytest.fixture(scope="module") -def sleepstudy(): - return bmb.load_data("sleepstudy") - - -def test_empty_model(crossed_data): - model0 = bmb.Model("Y ~ 0", crossed_data) - model0.fit(tune=0, draws=1) - - -def test_intercept_only_model(crossed_data): - model0 = bmb.Model("Y ~ 1", crossed_data) - model0.fit(tune=0, draws=1) - - -def test_slope_only_model(crossed_data): - model0 = bmb.Model("Y ~ 0 + continuous", crossed_data) - model0.fit(tune=0, draws=1) - - -def test_cell_means_parameterization(crossed_data): - model0 = bmb.Model("Y ~ 0 + threecats", crossed_data) - model0.fit(tune=0, draws=1) - - -def test_3x4_common_anova(crossed_data): - # add a four-level category that's perfectly crossed with threecats - crossed_data["fourcats"] = sum([[x] * 10 for x in ["a", "b", "c", "d"]], list()) * 3 - - # with intercept - model0 = bmb.Model("Y ~ threecats*fourcats", crossed_data) - fitted0 = model0.fit(tune=0, draws=1) - assert len(fitted0.posterior.data_vars) == 5 - - # without intercept (i.e., 2-factor cell means model) - model1 = bmb.Model("Y ~ 0 + threecats*fourcats", crossed_data) - fitted1 = model1.fit(tune=0, draws=1) - assert len(fitted1.posterior.data_vars) == 4 - - -def test_cell_means_with_covariate(crossed_data): - model = bmb.Model("Y ~ 0 + threecats + continuous", crossed_data) - model.build() - # check that threecats priors have finite variance - assert not np.isinf(model.response_component.terms["threecats"].prior.args["sigma"]).all() - - -def test_many_common_many_group_specific(crossed_data): - # This test is kind of a mess, but it is very important, it checks lots of things. 
- # delete a few values to also test dropna=True functionality - crossed_data_missing = crossed_data.copy() - crossed_data_missing.loc[0, "Y"] = np.nan - crossed_data_missing.loc[1, "continuous"] = np.nan - crossed_data_missing.loc[2, "threecats"] = np.nan - - # Here I'm comparing implicit/explicit intercepts for group specific effects work the same way. - model0 = bmb.Model( - "Y ~ continuous + dummy + threecats + (threecats|subj) + (1|item) + (0+continuous|item) + (dummy|item) + (threecats|site)", - crossed_data_missing, - dropna=True, - ) - model0.fit( - tune=10, - draws=10, - chains=2, - ) - - model1 = bmb.Model( - "Y ~ continuous + dummy + threecats + (threecats|subj) + (continuous|item) + (dummy|item) + (threecats|site)", - crossed_data_missing, - dropna=True, - ) - model1.fit( - tune=10, - draws=10, - chains=2, - ) - # Check that the group specific effects design matrices have the same shape - X0 = pd.concat( - [pd.DataFrame(t.data) for t in model0.response_component.group_specific_terms.values()], - axis=1, - ) - X1 = pd.concat( - [pd.DataFrame(t.data) for t in model1.response_component.group_specific_terms.values()], - axis=1, - ) - assert X0.shape == X1.shape - - # check that the group specific effect design matrix contain the same columns, - # even if term names / columns names / order of columns is different - X0_set = set(tuple(X0.iloc[:, i]) for i in range(len(X0.columns))) - X1_set = set(tuple(X1.iloc[:, i]) for i in range(len(X1.columns))) - assert X0_set == X1_set - - # check that common effect design matrices are the same, - # even if term names / level names / order of columns is different - X0_list = [] - X1_list = [] - for term in model0.response_component.common_terms.values(): - if term.levels is not None: - for level_idx in range(len(term.levels)): - X0_list.append(tuple(term.data[:, level_idx])) - else: - X0_list.append(tuple(term.data)) - - for term in model1.response_component.common_terms.values(): - if term.levels is not None: - for level_idx in range(len(term.levels)): - X1_list.append(tuple(term.data[:, level_idx])) - else: - X1_list.append(tuple(term.data)) - - assert set(X0_list) == set(X1_list) - - # check that models have same priors for common effects - priors0 = { - x.name: x.prior.args - for x in model0.response_component.terms.values() - if not isinstance(x, GroupSpecificTerm) - } - priors1 = { - x.name: x.prior.args - for x in model1.response_component.terms.values() - if not isinstance(x, GroupSpecificTerm) - } - - # check dictionary keys - assert set(priors0) == set(priors1) - - # check dictionary values - def dicts_close(a, b): - if set(a) != set(b): - return False - else: - return [np.allclose(a[x], b[x], atol=0, rtol=0.01) for x in a.keys()] - - assert all([dicts_close(priors0[x], priors1[x]) for x in priors0.keys()]) - - # check that fit and add models have same priors for group specific effects - priors0 = { - x.name: x.prior.args["sigma"].args - for x in model0.response_component.group_specific_terms.values() - } - priors1 = { - x.name: x.prior.args["sigma"].args - for x in model1.response_component.group_specific_terms.values() - } - - # check dictionary keys - assert set(priors0) == set(priors1) - - # check dictionary values - def dicts_close(a, b): - if set(a) != set(b): - return False - else: - return [np.allclose(a[x], b[x], atol=0, rtol=0.01) for x in a.keys()] - - assert all([dicts_close(priors0[x], priors1[x]) for x in priors0.keys()]) - - -def test_cell_means_with_many_group_specific_effects(crossed_data): - # Group specific 
intercepts are added in different way, but the final result should be the same. - formula = "Y ~" + "+".join( - [ - "0", - "threecats", - "(threecats|subj)", - "(1|subj)", - "(0 + continuous|item)", - "(dummy|item)", - "(0 + threecats|site)", - "(1|site)", - ] - ) - model0 = bmb.Model(formula, crossed_data) - model0.fit(tune=0, draws=1) - - formula = "Y ~" + "+".join( - [ - "0", - "threecats", - "(threecats|subj)", - "(continuous|item)", - "(dummy|item)", - "(threecats|site)", - ] - ) - model1 = bmb.Model(formula, crossed_data) - model1.fit(tune=0, draws=1) - - # check that the group specific effects design matrices have the same shape - X0 = pd.concat( - [ - pd.DataFrame(t.data) - if not isinstance(t.data, dict) - else pd.concat([pd.DataFrame(t.data[x]) for x in t.data.keys()], axis=1) - for t in model0.response_component.group_specific_terms.values() - ], - axis=1, - ) - X1 = pd.concat( - [ - pd.DataFrame(t.data) - if not isinstance(t.data, dict) - else pd.concat([pd.DataFrame(t.data[x]) for x in t.data.keys()], axis=1) - for t in model0.response_component.group_specific_terms.values() - ], - axis=1, - ) - assert X0.shape == X1.shape - - # check that the group specific effect design matrix contain the same columns, - # even if term names / columns names / order of columns is different - X0_set = set(tuple(X0.iloc[:, i]) for i in range(len(X0.columns))) - X1_set = set(tuple(X1.iloc[:, i]) for i in range(len(X1.columns))) - assert X0_set == X1_set - - # check that common effect design matrices are the same, - # even if term names / level names / order of columns is different - X0 = set( - [ - tuple(t.data[:, lev]) - for t in model0.response_component.common_terms.values() - for lev in range(len(t.levels)) - ] - ) - X1 = set( - [ - tuple(t.data[:, lev]) - for t in model1.response_component.common_terms.values() - for lev in range(len(t.levels)) - ] - ) - assert X0 == X1 - - # check that fit and add models have same priors for common effects - priors0 = { - x.name: x.prior.args - for x in model0.response_component.terms.values() - if not isinstance(x, GroupSpecificTerm) - } - priors1 = { - x.name: x.prior.args - for x in model1.response_component.terms.values() - if not isinstance(x, GroupSpecificTerm) - } - assert set(priors0) == set(priors1) - - # check that fit and add models have same priors for group specific effects - priors0 = { - x.name: x.prior.args["sigma"].args - for x in model0.response_component.terms.values() - if isinstance(x, GroupSpecificTerm) - } - priors1 = { - x.name: x.prior.args["sigma"].args - for x in model1.response_component.terms.values() - if isinstance(x, GroupSpecificTerm) - } - assert set(priors0) == set(priors1) - - -def test_group_specific_categorical_interaction(crossed_data): - crossed_data["fourcats"] = sum([[x] * 10 for x in ["a", "b", "c", "d"]], list()) * 3 - model = bmb.Model("Y ~ continuous + (threecats:fourcats|site)", crossed_data) - model.fit(tune=10, draws=10) - - -def test_logistic_regression_empty_index(data_100): - model = bmb.Model("b1 ~ n1", data_100, family="bernoulli") - model.fit() - - -def test_logistic_regression_good_numeric(data_100): - model = bmb.Model("b0 ~ n1", data_100, family="bernoulli") - model.fit() - - -def test_logistic_regression_bad_numeric(): - error_msg = "Numeric response must be all 0 and 1 for 'bernoulli' family" - rng = np.random.default_rng(1234) - data = pd.DataFrame({"y": rng.choice([1, 2], 50), "x": rng.normal(size=50)}) - with pytest.raises(ValueError, match=error_msg): - model = bmb.Model("y ~ x", data, 
family="bernoulli") - model.fit() - - -@pytest.mark.parametrize( - "args", - [ - ("mcmc", {}), - ("nuts_numpyro", {"chain_method": "vectorized"}), - ("nuts_blackjax", {"chain_method": "vectorized"}), - ], -) -def test_logistic_regression_categoric_alternative_samplers(data_100, args): - model = bmb.Model("b1 ~ n1", data_100, family="bernoulli") - model.fit(tune=50, draws=50, inference_method=args[0], **args[1]) - - -@pytest.mark.parametrize( - "args", - [ - ("mcmc", {}), - ("nuts_numpyro", {"chain_method": "vectorized"}), - ("nuts_blackjax", {"chain_method": "vectorized"}), - ], -) -def test_regression_alternative_samplers(data_100, args): - model = bmb.Model("n1 ~ n2", data_100) - model.fit(tune=50, draws=50, inference_method=args[0], **args[1]) - - -def test_laplace_regression(data_100): - bmb_model = bmb.Model("n1 ~ n2", data_100, family="laplace") - bmb_model.fit() - - -def test_poisson_regression(crossed_data): - # build model using fit and pymc - crossed_data["count"] = (crossed_data["Y"] - crossed_data["Y"].min()).round() - model0 = bmb.Model("count ~ dummy + continuous + threecats", crossed_data, family="poisson") - model0.fit(tune=0, draws=1) - - # build model using add - model1 = bmb.Model("count ~ threecats + continuous + dummy", crossed_data, family="poisson") - model1.fit(tune=0, draws=1) - - # check that term names agree - assert set(model0.response_component.terms) == set(model1.response_component.terms) - - # check that common effect design matrices are the same, - # even if term names / level names / order of columns is different - - X0_list = [] - X1_list = [] - for term in model0.response_component.common_terms.values(): - if term.levels is not None: - for level_idx in range(len(term.levels)): - X0_list.append(tuple(term.data[:, level_idx])) - else: - X0_list.append(tuple(term.data)) - - for term in model1.response_component.common_terms.values(): - if term.levels is not None: - for level_idx in range(len(term.levels)): - X1_list.append(tuple(term.data[:, level_idx])) - else: - X1_list.append(tuple(term.data)) - - assert set(X0_list) == set(X1_list) - - # check that models have same priors for common effects - priors0 = { - x.name: x.prior.args - for x in model0.response_component.terms.values() - if not isinstance(x, GroupSpecificTerm) - } - priors1 = { - x.name: x.prior.args - for x in model1.response_component.terms.values() - if not isinstance(x, GroupSpecificTerm) - } - # check dictionary keys - assert set(priors0) == set(priors1) - # check dictionary values - def dicts_close(a, b): - if set(a) != set(b): - return False - else: - return [np.allclose(a[x], b[x], atol=0, rtol=0.01) for x in a.keys()] - - assert all([dicts_close(priors0[x], priors1[x]) for x in priors0.keys()]) - - -def test_laplace(): - data = pd.DataFrame(np.repeat((0, 1), (30, 60)), columns=["w"]) - priors = {"Intercept": bmb.Prior("Uniform", lower=0, upper=1)} - model = bmb.Model("w ~ 1", data=data, family="bernoulli", priors=priors, link="identity") - results = model.fit(inference_method="laplace") - mode_n = results.posterior["Intercept"].mean().item() - std_n = results.posterior["Intercept"].std().item() - mode_a = data.mean() - std_a = data.std() / len(data) ** 0.5 - np.testing.assert_array_almost_equal((mode_n, std_n), (mode_a.item(), std_a.item()), decimal=2) - - -def test_vi(): - data = pd.DataFrame(np.repeat((0, 1), (30, 60)), columns=["w"]) - priors = {"Intercept": bmb.Prior("Uniform", lower=0, upper=1)} - model = bmb.Model("w ~ 1", data=data, family="bernoulli", priors=priors, 
link="identity") - results = model.fit(inference_method="vi", method="advi") - samples = results.sample(1000).posterior["Intercept"] - mode_n = samples.mean() - std_n = samples.std() - mode_a = data.mean() - std_a = data.std() / len(data) ** 0.5 - np.testing.assert_array_almost_equal( - (mode_n.item(), std_n.item()), (mode_a.item(), std_a.item()), decimal=2 - ) - - -def test_prior_predictive(crossed_data): - crossed_data["count"] = (crossed_data["Y"] - crossed_data["Y"].min()).round() - # New default priors are too wide for this case... something to keep investigating - model = bmb.Model( - "count ~ threecats + continuous + dummy", - crossed_data, - family="poisson", - ) - model.build() - pps = model.prior_predictive(draws=500, random_seed=1234) - - keys = ["Intercept", "threecats", "continuous", "dummy"] - shapes = [(1, 500), (1, 500, 2), (1, 500), (1, 500)] - - for key, shape in zip(keys, shapes): - assert pps.prior[key].shape == shape - - assert pps.prior_predictive["count"].shape == (1, 500, 120) - assert pps.observed_data["count"].shape == (120,) - - pps = model.prior_predictive(draws=500, var_names=["count"], random_seed=1234) - assert pps.groups() == ["prior_predictive", "observed_data"] - - pps = model.prior_predictive(draws=500, var_names=["Intercept"], random_seed=1234) - assert pps.groups() == ["prior", "observed_data"] - - -def test_posterior_predictive(crossed_data): - crossed_data["count"] = (crossed_data["Y"] - crossed_data["Y"].min()).round() - model = bmb.Model("count ~ threecats + continuous + dummy", crossed_data, family="poisson") - fitted = model.fit(tune=0, draws=10, chains=2) - pps = model.predict(fitted, kind="pps", inplace=False) - - assert pps.posterior_predictive["count"].shape == (2, 10, 120) - - pps = model.predict(fitted, kind="pps", inplace=True) - - assert pps is None - assert fitted.posterior_predictive["count"].shape == (2, 10, 120) - - -def test_gamma_regression(dm): - # simulated data - rng = np.random.default_rng(1234) - a, b, N, shape_true = 0.5, 1.2, 100, 10 # alpha - x = rng.uniform(-1, 1, N) - y_true = np.exp(a + b * x) - - y = rng.gamma(shape_true, y_true / shape_true, N) - data = pd.DataFrame({"x": x, "y": y}) - model = bmb.Model("y ~ x", data, family="gamma", link="log") - model.fit(draws=10, tune=10) - - # Real data, categorical predictor. 
- data = dm[["order", "ind_mg_dry"]] - model = bmb.Model("ind_mg_dry ~ order", data, family="gamma", link="log") - model.fit(draws=10, tune=10) - - -def test_beta_regression(): - data_dir = join(dirname(__file__), "data") - data = pd.read_csv(join(data_dir, "gasoline.csv")) - model = bmb.Model("yield ~ temp + batch", data, family="beta", categorical="batch") - model.fit(draws=10, tune=10, target_accept=0.9) - - -def test_t_regression(data_100): - bmb.Model("n1 ~ n2", data_100, family="t").fit(draws=10, tune=10) - - -def test_vonmises_regression(): - rng = np.random.default_rng(1234) - data = pd.DataFrame({"y": rng.vonmises(0, 1, size=100), "x": rng.normal(size=100)}) - bmb.Model("y ~ x", data, family="vonmises").fit(draws=10, tune=10) - - -def test_quantile_regression(): - rng = np.random.default_rng(1234) - x = rng.uniform(2, 10, 100) - y = 2 * x + rng.normal(0, 0.6 * x**0.75) - data = pd.DataFrame({"x": x, "y": y}) - bmb_model0 = bmb.Model("y ~ x", data, family="asymmetriclaplace", priors={"kappa": 9}) - idata0 = bmb_model0.fit() - bmb_model0.predict(idata0) - - bmb_model1 = bmb.Model("y ~ x", data, family="asymmetriclaplace", priors={"kappa": 0.1}) - idata1 = bmb_model1.fit() - bmb_model1.predict(idata1) - - assert np.all( - idata0.posterior["y_mean"].mean(("chain", "draw")) - > idata1.posterior["y_mean"].mean(("chain", "draw")) - ) - - -def test_plot_priors(crossed_data): - model = bmb.Model("Y ~ 0 + threecats", crossed_data) - with pytest.raises(ValueError, match="Model is not built yet"): - model.plot_priors() - model.build() - model.plot_priors() - - -def test_model_graph(crossed_data): - model = bmb.Model("Y ~ 0 + threecats", crossed_data) - with pytest.raises(ValueError, match="Model is not built yet"): - model.graph() - model.build() - model.graph() - - -def test_potentials(): - data = pd.DataFrame(np.repeat((0, 1), (18, 20)), columns=["w"]) - priors = {"Intercept": bmb.Prior("Uniform", lower=0, upper=1)} - - potentials = [ - (("Intercept", "Intercept"), lambda x, y: bmb.math.switch(x < 0.45, y, -np.inf)), - ("Intercept", lambda x: bmb.math.switch(x > 0.55, 0, -np.inf)), - ] - - model = bmb.Model( - "w ~ 1", - data, - family="bernoulli", - link="identity", - priors=priors, - potentials=potentials, - ) - model.build() - assert len(model.backend.model.potentials) == 2 - - pot0 = model.backend.model.potentials[0].get_parents()[0] - pot1 = model.backend.model.potentials[1].get_parents()[0] - assert pot0.__str__() == "Switch(Lt.0, Intercept, -inf)" - assert pot1.__str__() == "Switch(Gt.0, 0, -inf)" - - -def test_binomial_regression(): - data = pd.DataFrame( - { - "x": np.array([1.6907, 1.7242, 1.7552, 1.7842, 1.8113, 1.8369, 1.8610, 1.8839]), - "n": np.array([59, 60, 62, 56, 63, 59, 62, 60]), - "y": np.array([6, 13, 18, 28, 52, 53, 61, 60]), - } - ) - - model = bmb.Model("prop(y, n) ~ x", data, family="binomial") - model.fit(draws=10, tune=10) - - # Using constant instead of variable in data frame - model = bmb.Model("prop(y, 62) ~ x", data, family="binomial") - model.fit(draws=10, tune=10) - - -@pytest.mark.skip(reason="this example no longer trigger the fallback to adapt_diag") -def test_init_fallback(init_data, caplog): - model = bmb.Model("od ~ temp + (1|source) + 0", init_data) - with caplog.at_level(logging.INFO): - model.fit(draws=100, init="auto") - assert "Initializing NUTS using jitter+adapt_diag..." in caplog.text - assert "The default initialization" in caplog.text - assert "Initializing NUTS using adapt_diag..." 
in caplog.text - - -def test_categorical_family(inhaler): - model = bmb.Model("rating ~ period + carry + treat", inhaler, family="categorical") - model.fit(draws=10, tune=10) - - -def test_categorical_family_varying_intercept(inhaler): - model = bmb.Model( - "rating ~ period + carry + treat + (1|subject)", inhaler, family="categorical" - ) - model.fit(draws=10, tune=10) - - -def test_categorical_family_categorical_predictors(categorical_family_categorical_predictor): - formula = "response ~ group + city" - model = bmb.Model(formula, categorical_family_categorical_predictor, family="categorical") - model.fit(draws=10, tune=10) - - -def test_set_alias(data_100): - model = bmb.Model("n1 ~ n2 + (n2|cat1)", data_100) - aliases = { - "Intercept": "α", - "n2": "β", - "1|cat1": "α_group", - "n2|cat1": "β_group", - "sigma": "σ", - } - model.set_alias(aliases) - model.build() - new_names = set(["α", "β", "α_group", "α_group_σ", "β_group", "β_group_σ", "σ"]) - assert new_names.issubset(set(model.backend.model.named_vars)) - - -def test_fit_include_mean(crossed_data): - draws = 500 - model = bmb.Model("Y ~ continuous * threecats", crossed_data) - idata = model.fit(tune=draws, draws=draws, include_mean=True) - assert idata.posterior["Y_mean"].shape[1:] == (draws, 120) - - # Compare with the mean obtained with `model.predict()` - mean = idata.posterior["Y_mean"].stack(sample=("chain", "draw")).values.mean(1) - - model.predict(idata) - predicted_mean = idata.posterior["Y_mean"].stack(sample=("chain", "draw")).values.mean(1) - - assert np.array_equal(mean, predicted_mean) - - -def test_group_specific_splines(): - x_check = pd.DataFrame( - { - "x": [ - 82.0, - 143.0, - 426.0, - 641.0, - 1156.0, - 986.0, - 365.0, - 187.0, - 254.0, - 550.0, - 101.0, - 661.0, - 327.0, - 119.0, - ], - "day": ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] * 2, - "y": [ - 571.0, - 684.0, - 1652.0, - 2130.0, - 2455.0, - 1874.0, - 1288.0, - 1011.0, - 1004.0, - 1993.0, - 593.0, - 1986.0, - 1503.0, - 711.0, - ], - } - ) - knots = np.array([191.0, 297.0, 512.5]) - - model = bmb.Model("y ~ (bs(x, knots=knots, intercept=False, degree=1)|day)", data=x_check) - model.build() - - -def test_2d_response_no_shape(): - """ - This tests whether a model where there's a single linear predictor and a response with - response.ndim > 1 works well, without Bambi causing any shape problems. 
- See https://github.com/bambinos/bambi/pull/629 - Updated https://github.com/bambinos/bambi/pull/632 - """ - - def fn(name, p, observed, **kwargs): - y = observed[:, 0].flatten() - n = observed[:, 1].flatten() - # It's the users' responsibility to take only the first dim - kwargs["dims"] = kwargs.get("dims")[0] - return pm.Binomial(name, p=p, n=n, observed=y, **kwargs) - - likelihood = bmb.Likelihood("CustomBinomial", params=["p"], parent="p", dist=fn) - link = bmb.Link("logit") - family = bmb.Family("custom-binomial", likelihood, link) - - data = pd.DataFrame( - { - "x": np.array([1.6907, 1.7242, 1.7552, 1.7842, 1.8113, 1.8369, 1.8610, 1.8839]), - "n": np.array([59, 60, 62, 56, 63, 59, 62, 60]), - "y": np.array([6, 13, 18, 28, 52, 53, 61, 60]), - } - ) - - model = bmb.Model("prop(y, n) ~ x", data, family=family) - model.fit(draws=10, tune=10) - - -def test_zero_inflated_poisson(): - rng = np.random.default_rng(121195) - - # Basic intercept-only model - x = np.concatenate([np.zeros(250), rng.poisson(lam=3, size=750)]) - df = pd.DataFrame({"response": x}) - - model = bmb.Model("response ~ 1", df, family="zero_inflated_poisson") - idata = model.fit(chains=2, tune=200, draws=200, random_seed=121195) - model.predict(idata, kind="pps") - - # Distributional model - x = np.sort(rng.uniform(0.2, 3, size=1000)) - - b0, b1 = 0.2, 0.9 - a0, a1 = 2.5, -0.7 - mu = np.exp(b0 + b1 * x) - psi = expit(a0 + a1 * x) - - y = pm.draw(pm.ZeroInflatedPoisson.dist(mu=mu, psi=psi)) - df = pd.DataFrame({"y": y, "x": x}) - - formula = bmb.Formula("y ~ x", "psi ~ x") - model = bmb.Model(formula, df, family="zero_inflated_poisson") - idata = model.fit(chains=2, tune=200, draws=200, random_seed=121195) - model.predict(idata, kind="pps") - - -def test_zero_inflated_binomial(): - rng = np.random.default_rng(121195) - - # Basic intercept-only model - y = pm.draw(pm.ZeroInflatedBinomial.dist(p=0.5, n=30, psi=0.7), draws=500, random_seed=1234) - df = pd.DataFrame({"y": y}) - model = bmb.Model("p(y, 30) ~ 1", df, family="zero_inflated_binomial") - idata = model.fit(chains=2, tune=200, draws=200, random_seed=121195) - model.predict(idata, kind="pps") - - # Distributional model - x = np.sort(rng.uniform(0.2, 3, size=500)) - b0, b1 = -0.5, 0.5 - a0, a1 = 2, -0.7 - p = expit(b0 + b1 * x) - psi = expit(a0 + a1 * x) - - y = pm.draw(pm.ZeroInflatedBinomial.dist(p=p, psi=psi, n=30)) - df = pd.DataFrame({"y": y, "x": x}) - - formula = bmb.Formula("prop(y, 30) ~ x", "psi ~ x") - model = bmb.Model(formula, df, family="zero_inflated_binomial") - idata = model.fit(chains=2, tune=200, draws=200, random_seed=121195) - model.predict(idata, kind="pps") - - -def test_zero_inflated_negativebinomial(): - rng = np.random.default_rng(121195) - - # Basic intercept-only model - y = pm.draw( - pm.ZeroInflatedNegativeBinomial.dist(mu=5, alpha=30, psi=0.7), draws=500, random_seed=1234 - ) - df = pd.DataFrame({"y": y}) - priors = {"alpha": bmb.Prior("HalfNormal", sigma=20)} - model = bmb.Model("y ~ 1", df, family="zero_inflated_negativebinomial", priors=priors) - idata = model.fit(chains=2, tune=200, draws=200, random_seed=121195) - model.predict(idata, kind="pps") - - # Distributional model - x = np.sort(rng.uniform(0.2, 3, size=500)) - b0, b1 = 0.5, 0.35 - a0, a1 = 2, -0.7 - mu = np.exp(b0 + b1 * x) - psi = expit(a0 + a1 * x) - - y = pm.draw(pm.ZeroInflatedNegativeBinomial.dist(mu=mu, alpha=30, psi=psi)) - df = pd.DataFrame({"y": y, "x": x}) - - priors = {"alpha": bmb.Prior("HalfNormal", sigma=20)} - formula = bmb.Formula("y ~ x", "psi ~ x") - model 
= bmb.Model(formula, df, family="zero_inflated_negativebinomial", priors=priors) - idata = model.fit(chains=2, tune=200, draws=200, random_seed=121195) - model.predict(idata, kind="pps") - - -def test_hurlde_families(): - df = pd.DataFrame({"y": pm.draw(pm.HurdlePoisson.dist(0.5, mu=3.5), 1000)}) - model = bmb.Model("y ~ 1", df, family="hurdle_poisson") - idata = model.fit() - model.predict(idata, kind="pps") - - df = pd.DataFrame({"y": pm.draw(pm.HurdleNegativeBinomial.dist(0.6, 5, 3), 1000)}) - model = bmb.Model("y ~ 1", df, family="hurdle_negativebinomial") - idata = model.fit() - model.predict(idata, kind="pps") - - df = pd.DataFrame({"y": pm.draw(pm.HurdleGamma.dist(0.8, alpha=10, beta=1), 1000)}) - model = bmb.Model("y ~ 1", df, family="hurdle_gamma") - idata = model.fit() - model.predict(idata, kind="pps") - - df = pd.DataFrame({"y": pm.draw(pm.HurdleLogNormal.dist(0.7, mu=0, sigma=0.2), 1000)}) - model = bmb.Model("y ~ 1", df, family="hurdle_lognormal") - idata = model.fit() - model.predict(idata, kind="pps") - - -@pytest.mark.parametrize( - "family, link", - [ - ("cumulative", "logit"), - ("cumulative", "probit"), - ("cumulative", "cloglog"), - ("sratio", "logit"), - ("sratio", "probit"), - ("sratio", "cloglog"), - ], -) -def test_ordinal_families(inhaler, family, link): - data = inhaler.copy() - data["carry"] = pd.Categorical(data["carry"]) # To have both numeric and categoric predictors - model = bmb.Model("rating ~ period + carry + treat", data, family=family, link=link) - idata = model.fit(tune=100, draws=100) - model.predict(idata, kind="pps") - assert np.allclose(idata.posterior["rating_mean"].sum("rating_dim").to_numpy(), 1) - assert np.all(np.unique(idata.posterior_predictive["rating"]) == np.array([0, 1, 2, 3])) - - -def test_cumulative_family_priors(inhaler): - priors = { - "threshold": bmb.Prior( - "Normal", - mu=[-0.5, 0, 0.5], - sigma=1.5, - transform=pm.distributions.transforms.ordered, - ) - } - model = bmb.Model( - "rating ~ 0 + period + carry + treat", inhaler, family="cumulative", priors=priors - ) - model.fit(tune=100, draws=100) - - -def test_predict_new_groups_fail(sleepstudy): - model = bmb.Model("Reaction ~ 1 + Days + (1 + Days | Subject)", sleepstudy) - idata = model.fit(tune=20, draws=20) - - df_new = sleepstudy.head(10).reset_index(drop=True) - df_new["Subject"] = "xxx" - to_match = "There are new groups for the factors ('Subject',) and 'sample_new_groups' is False." 
- with pytest.raises(ValueError, match=re.escape(to_match)): - model.predict(idata, data=df_new) - - -@pytest.mark.parametrize( - "data,formula,family,df_new", - [ - ( - "sleepstudy", - "Reaction ~ 1 + Days + (1 + Days | Subject)", - "gaussian", - pd.DataFrame({"Days": [1, 2, 3], "Subject": ["x", "y", "z"]}), - ), - ( - "inhaler", - "rating ~ 1 + period + treat + (1 + treat|subject)", - "categorical", - pd.DataFrame( - { - "subject": [1, 999], - "rating": [1, 1], - "treat": [0.5, 0.5], - "period": [0.5, 0.5], - "carry": [0, 0], - } - ), - ), - ( - "crossed_data", - "Y ~ 0 + threecats + (0 + threecats | subj)", - "gaussian", - pd.DataFrame({"threecats": ["a", "a"], "subj": ["0", "11"]}), - ), - ], -) -def test_predict_new_groups(data, formula, family, df_new, request): - data = request.getfixturevalue(data) - model = bmb.Model(formula, data, family=family) - idata = model.fit(tune=100, draws=100) - model.predict(idata, data=df_new, sample_new_groups=True) - - -def test_censored_response(): - data = bmb.load_data("kidney") - data["status"] = np.where(data["censored"] == 0, "none", "right") - - # Model 1, with intercept - priors = { - "Intercept": bmb.Prior("Normal", mu=0, sigma=1), - "sex": bmb.Prior("Normal", mu=0, sigma=2), - "age": bmb.Prior("Normal", mu=0, sigma=1), - "alpha": bmb.Prior("Gamma", alpha=3, beta=5), - } - model = bmb.Model( - "censored(time, status) ~ 1 + sex + age", data, family="weibull", link="log", priors=priors - ) - idata = model.fit(tune=100, draws=100, random_seed=121195) - model.predict(idata, kind="pps") - model.predict(idata, data=data, kind="pps") - - # Model 2, without intercept - priors = { - "sex": bmb.Prior("Normal", mu=0, sigma=2), - "age": bmb.Prior("Normal", mu=0, sigma=1), - "alpha": bmb.Prior("Gamma", alpha=3, beta=5), - } - model = bmb.Model( - "censored(time, status) ~ 0 + sex + age", data, family="weibull", link="log", priors=priors - ) - idata = model.fit(tune=100, draws=100, random_seed=121195) - model.predict(idata, kind="pps") - model.predict(idata, data=data, kind="pps") - - # Model 3, with group-specific effects - priors = { - "alpha": bmb.Prior("Gamma", alpha=3, beta=5), - "sex": bmb.Prior("Normal", mu=0, sigma=1), - "age": bmb.Prior("Normal", mu=0, sigma=1), - "1|patient": bmb.Prior("Normal", mu=0, sigma=bmb.Prior("InverseGamma", alpha=5, beta=10)), - } - model = bmb.Model( - "censored(time, status) ~ 1 + sex + age + (1|patient)", - data, - family="weibull", - link="log", - priors=priors, - ) - idata = model.fit(tune=100, draws=100, random_seed=121195) - model.predict(idata, kind="pps") - model.predict(idata, data=data, kind="pps") - - -def test_truncated_response(): - rng = np.random.default_rng(12345) - slope, intercept, sigma, N = 1, 0, 2, 200 - x = rng.uniform(-10, 10, N) - y = rng.normal(loc=slope * x + intercept, scale=sigma) - - def truncate_y(x, y, bounds): - - return (x[keep], y[keep]) - - bounds = [-5, 5] - keep = (y >= bounds[0]) & (y <= bounds[1]) - xt = x[keep] - yt = y[keep] - - df = pd.DataFrame({"x": xt, "y": yt}) - priors = { - "Intercept": bmb.Prior("Normal", mu=0, sigma=1), - "x": bmb.Prior("Normal", mu=0, sigma=1), - "sigma": bmb.Prior("HalfNormal", sigma=1), - } - model = bmb.Model("truncated(y, -5, 5) ~ x", df, priors=priors) - idata = model.fit(tune=100, draws=100, random_seed=1234) - model.predict(idata, kind="pps") diff --git a/tests/test_interpret_messages.py b/tests/test_interpret_messages.py index 1fe27eecb..a1a875471 100644 --- a/tests/test_interpret_messages.py +++ b/tests/test_interpret_messages.py @@ -16,8 
+16,6 @@ def mtcars(): # Use caplog fixture to capture log messages generated by the interpret logger - - def test_predictions_list(mtcars, caplog): model, idata = mtcars caplog.set_level("INFO", logger="__bambi_interpret__") diff --git a/tests/test_model_construction.py b/tests/test_model_construction.py index 6a2942e7d..a79d1a5c1 100644 --- a/tests/test_model_construction.py +++ b/tests/test_model_construction.py @@ -1,3 +1,5 @@ +import logging + from functools import reduce from operator import add from os.path import dirname, join @@ -60,6 +62,16 @@ def crossed_data(): return data +@pytest.fixture(scope="module") +def init_data(): + """ + Data used to test initialization method + """ + data_dir = join(dirname(__file__), "data") + data = pd.read_csv(join(data_dir, "obs.csv")) + return data + + def test_term_init(diabetes_data): design = design_matrices("BMI", diabetes_data) term = design.common.terms["BMI"] @@ -207,7 +219,7 @@ def test_model_term_classes(): def test_one_shot_formula_fit(diabetes_data): model = bmb.Model("S3 ~ S1 + S2", diabetes_data) - model.fit(draws=50) + model.fit(tune=100, draws=100) named_vars = model.backend.model.named_vars targets = ["S3", "S1", "Intercept"] assert len(set(named_vars.keys()) & set(targets)) == 3 @@ -467,3 +479,100 @@ def test_extra_namespace(): model = bmb.Model(formula, data, family="poisson", link="log", extra_namespace=extra_namespace) term = model.response_component.terms["C(veh_body, levels=levels)"] assert (np.asarray(term.levels) == data["veh_body"].unique()).all() + + +def test_drop_na(crossed_data, caplog): + crossed_data_missing = crossed_data.copy() + crossed_data_missing.loc[0, "Y"] = np.nan + crossed_data_missing.loc[1, "continuous"] = np.nan + crossed_data_missing.loc[2, "threecats"] = np.nan + + with caplog.at_level(logging.INFO): + bmb.Model("Y ~ continuous + threecats", crossed_data_missing, dropna=True) + assert "Automatically removing 3/120 rows from the dataset." 
in caplog.text
+
+    with pytest.raises(ValueError, match="'data' contains 3 incomplete rows"):
+        bmb.Model("Y ~ continuous + threecats", crossed_data_missing)
+
+
+def test_plot_priors(crossed_data):
+    model = bmb.Model("Y ~ 0 + threecats", crossed_data)
+    with pytest.raises(ValueError, match="Model is not built yet"):
+        model.plot_priors()
+    model.build()
+    model.plot_priors()
+
+
+def test_model_graph(crossed_data):
+    model = bmb.Model("Y ~ 0 + threecats", crossed_data)
+    with pytest.raises(ValueError, match="Model is not built yet"):
+        model.graph()
+    model.build()
+    model.graph()
+
+
+def test_potentials():
+    data = pd.DataFrame(np.repeat((0, 1), (18, 20)), columns=["w"])
+    priors = {"Intercept": bmb.Prior("Uniform", lower=0, upper=1)}
+
+    potentials = [
+        (("Intercept", "Intercept"), lambda x, y: bmb.math.switch(x < 0.45, y, -np.inf)),
+        ("Intercept", lambda x: bmb.math.switch(x > 0.55, 0, -np.inf)),
+    ]
+
+    model = bmb.Model(
+        "w ~ 1",
+        data,
+        family="bernoulli",
+        link="identity",
+        priors=priors,
+        potentials=potentials,
+    )
+    model.build()
+    assert len(model.backend.model.potentials) == 2
+
+    pot0 = model.backend.model.potentials[0].get_parents()[0]
+    pot1 = model.backend.model.potentials[1].get_parents()[0]
+    assert pot0.__str__() == "Switch(Lt.0, Intercept, -inf)"
+    assert pot1.__str__() == "Switch(Gt.0, 0, -inf)"
+
+
+@pytest.mark.skip(reason="this example no longer triggers the fallback to adapt_diag")
+def test_init_fallback(init_data, caplog):
+    model = bmb.Model("od ~ temp + (1|source) + 0", init_data)
+    with caplog.at_level(logging.INFO):
+        model.fit(draws=100, init="auto")
+        assert "Initializing NUTS using jitter+adapt_diag..." in caplog.text
+        assert "The default initialization" in caplog.text
+        assert "Initializing NUTS using adapt_diag..." in caplog.text
+
+
+def test_2d_response_no_shape():
+    """
+    This tests whether a model where there's a single linear predictor and a response with
+    response.ndim > 1 works well, without Bambi causing any shape problems.
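+
+    The custom family below wraps `pm.Binomial`: the response enters as a 2-D array
+    whose first column holds the successes and whose second column holds the trials,
+    and the likelihood callable splits them itself, as in
+    `y, n = observed[:, 0].flatten(), observed[:, 1].flatten()`.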
+ See https://github.com/bambinos/bambi/pull/629 + Updated https://github.com/bambinos/bambi/pull/632 + """ + + def fn(name, p, observed, **kwargs): + y = observed[:, 0].flatten() + n = observed[:, 1].flatten() + # It's the users' responsibility to take only the first dim + kwargs["dims"] = kwargs.get("dims")[0] + return pm.Binomial(name, p=p, n=n, observed=y, **kwargs) + + likelihood = bmb.Likelihood("CustomBinomial", params=["p"], parent="p", dist=fn) + link = bmb.Link("logit") + family = bmb.Family("custom-binomial", likelihood, link) + + data = pd.DataFrame( + { + "x": np.array([1.6907, 1.7242, 1.7552, 1.7842, 1.8113, 1.8369, 1.8610, 1.8839]), + "n": np.array([59, 60, 62, 56, 63, 59, 62, 60]), + "y": np.array([6, 13, 18, 28, 52, 53, 61, 60]), + } + ) + + model = bmb.Model("prop(y, n) ~ x", data, family=family) + model.fit(draws=10, tune=10) diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 000000000..386a08b46 --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,1336 @@ +import re + +from os.path import dirname, join + +import pytest + +import bambi as bmb +import numpy as np +import pandas as pd +import pymc as pm + +from bambi.terms import GroupSpecificTerm + +TUNE = 50 +DRAWS = 50 + + +@pytest.fixture(scope="module") +def crossed_data(): + """ + Group specific effects: + 10 subjects, 12 items, 5 sites + Subjects crossed with items, nested in sites + Items crossed with sites + + common effects: + A continuous predictor, a numeric dummy, and a three-level category + (levels a,b,c) + + Structure: + Subjects nested in dummy (e.g., gender), crossed with threecats + Items crossed with dummy, nested in threecats + Sites partially crossed with dummy (4/5 see a single dummy, 1/5 sees both + dummies) + Sites crossed with threecats + """ + + data_dir = join(dirname(__file__), "data") + data = pd.read_csv(join(data_dir, "crossed_random.csv")) + data["subj"] = data["subj"].astype(str) + data["fourcats"] = sum([[x] * 10 for x in ["a", "b", "c", "d"]], list()) * 3 + return data + + +@pytest.fixture(scope="module") +def data_n100(): + size = 100 + rng = np.random.default_rng(121195) + data = pd.DataFrame( + { + "b1": rng.binomial(n=1, p=0.5, size=size), + "n1": rng.poisson(lam=2, size=size), + "n2": rng.poisson(lam=2, size=size), + "y1": rng.normal(size=size), + "y2": rng.normal(size=size), + "y3": rng.normal(size=size), + "cat2": rng.choice(["a", "b"], size=size), + "cat4": rng.choice(list("MNOP"), size=size), + "cat5": rng.choice(list("FGHIJK"), size=size), + } + ) + return data + + +@pytest.fixture(scope="module") +def beetle_data(): + return pd.DataFrame( + { + "x": np.array([1.6907, 1.7242, 1.7552, 1.7842, 1.8113, 1.8369, 1.8610, 1.8839]), + "n": np.array([59, 60, 62, 56, 63, 59, 62, 60]), + "y": np.array([6, 13, 18, 28, 52, 53, 61, 60]), + } + ) + + +@pytest.fixture(scope="module") +def gasoline_data(): + data_dir = join(dirname(__file__), "data") + return pd.read_csv(join(data_dir, "gasoline.csv")) + + +@pytest.fixture(scope="module") +def inhaler_data(): + data_dir = join(dirname(__file__), "data") + data = pd.read_csv(join(data_dir, "inhaler.csv")) + data["rating"] = pd.Categorical(data["rating"], categories=[1, 2, 3, 4]) + return data + + +@pytest.fixture(scope="module") +def cat_response_cat_preds_data(): + data_dir = join(dirname(__file__), "data") + data = pd.read_csv(join(data_dir, "categorical_family_categorical_predictor.csv")) + return data + + +@pytest.fixture(scope="module") +def zi_count_data(): + rng = np.random.default_rng(1234) + n1, n2 = 30, 
70 + y = np.concatenate([np.zeros(n1), rng.poisson(3, size=n2)]) + x = np.concatenate( + [rng.normal(loc=-1, scale=0.25, size=n1), rng.normal(loc=0.5, scale=0.5, size=n2)] + ) + return pd.DataFrame({"x": x, "y": y}) + + +@pytest.fixture(scope="module") +def zi_bounded_count_data(): + rng = np.random.default_rng(1234) + n1, n2 = 40, 60 + y = np.concatenate([np.zeros(n1), rng.binomial(n=30, p=0.6, size=n2)]) + x = np.concatenate( + [rng.normal(loc=-1, scale=0.25, size=n1), rng.normal(loc=0.5, scale=0.5, size=n2)] + ) + return pd.DataFrame({"x": x, "y": y}) + + +@pytest.fixture(scope="module") +def zi_continuous_data(): + rng = np.random.default_rng(1234) + n1, n2 = 40, 60 + y = np.concatenate([np.zeros(n1), rng.gamma(shape=2, scale=3, size=n2)]) + x = np.concatenate( + [rng.normal(loc=-1, scale=0.25, size=n1), rng.normal(loc=0.5, scale=0.5, size=n2)] + ) + return pd.DataFrame({"x": x, "y": y}) + + +@pytest.fixture(scope="module") +def kidney_data(): + data = bmb.load_data("kidney") + data["status"] = np.where(data["censored"] == 0, "none", "right") + return data + + +@pytest.fixture(scope="module") +def truncated_data(): + rng = np.random.default_rng(12345) + slope, intercept, sigma, N = 1, 0, 2, 200 + x = rng.uniform(-10, 10, N) + y = rng.normal(loc=slope * x + intercept, scale=sigma) + bounds = [-5, 5] + keep = (y >= bounds[0]) & (y <= bounds[1]) + xt = x[keep] + yt = y[keep] + + return pd.DataFrame({"x": xt, "y": yt}) + + +@pytest.fixture(scope="module") +def multinomial_data(inhaler_data): + df = inhaler_data.groupby(["treat", "carry", "rating"], as_index=False).size() + df = df.pivot(index=["treat", "carry"], columns="rating", values="size").reset_index() + df.columns = ["treat", "carry", "y1", "y2", "y3", "y4"] + return df + + +@pytest.fixture(scope="module") +def sleepstudy(): + return bmb.load_data("sleepstudy") + + +class FitPredictParent: + def fit(self, model, **kwargs): + return model.fit(tune=TUNE, draws=DRAWS, **kwargs) + + def predict_oos(self, model, idata, data=None): + # Reuse the original data + if data is None: + data = model.data.head() + return model.predict(idata, kind="pps", data=data, inplace=False) + + +class TestGaussian(FitPredictParent): + def test_intercept_only_model(self, crossed_data): + model = bmb.Model("Y ~ 1", crossed_data) + idata = self.fit(model) + self.predict_oos(model, idata) + + def test_slope_only_model(self, crossed_data): + model = bmb.Model("Y ~ 0 + continuous", crossed_data) + idata = self.fit(model) + self.predict_oos(model, idata) + + def test_cell_means_parameterization(self, crossed_data): + model = bmb.Model("Y ~ 0 + threecats", crossed_data) + idata = self.fit(model) + assert list(idata.posterior["threecats_dim"]) == ["a", "b", "c"] + self.predict_oos(model, idata) + + def test_2_factors_saturated(self, crossed_data): + model = bmb.Model("Y ~ threecats*fourcats", crossed_data) + idata = self.fit(model) + assert list(idata.posterior.data_vars) == [ + "Intercept", + "threecats", + "fourcats", + "threecats:fourcats", + "Y_sigma", + ] + assert list(idata.posterior["threecats_dim"].values) == ["b", "c"] + assert list(idata.posterior["fourcats_dim"].values) == ["b", "c", "d"] + assert list(idata.posterior["threecats:fourcats_dim"].values) == [ + "b, b", + "b, c", + "b, d", + "c, b", + "c, c", + "c, d", + ] + self.predict_oos(model, idata) + + def test_2_factors_no_intercept(self, crossed_data): + model = bmb.Model("Y ~ 0 + threecats*fourcats", crossed_data) + idata = self.fit(model) + assert list(idata.posterior.data_vars) == [ + "threecats", 
+ "fourcats", + "threecats:fourcats", + "Y_sigma", + ] + assert list(idata.posterior["threecats_dim"].values) == ["a", "b", "c"] + assert list(idata.posterior["fourcats_dim"].values) == ["b", "c", "d"] + assert list(idata.posterior["threecats:fourcats_dim"].values) == [ + "b, b", + "b, c", + "b, d", + "c, b", + "c, c", + "c, d", + ] + self.predict_oos(model, idata) + + def test_2_factors_cell_means(self, crossed_data): + model = bmb.Model("Y ~ 0 + threecats:fourcats", crossed_data) + idata = self.fit(model) + assert list(idata.posterior.data_vars) == ["threecats:fourcats", "Y_sigma"] + assert list(idata.posterior["threecats:fourcats_dim"].values) == [ + "a, a", + "a, b", + "a, c", + "a, d", + "b, a", + "b, b", + "b, c", + "b, d", + "c, a", + "c, b", + "c, c", + "c, d", + ] + self.predict_oos(model, idata) + + def test_cell_means_with_covariate(self, crossed_data): + model = bmb.Model("Y ~ 0 + threecats + continuous", crossed_data) + idata = self.fit(model) + assert list(idata.posterior.data_vars) == ["threecats", "continuous", "Y_sigma"] + assert list(idata.posterior["threecats_dim"].values) == ["a", "b", "c"] + self.predict_oos(model, idata) + + def test_many_common_many_group_specific(self, crossed_data): + # Comparing implicit/explicit intercepts for group specific effects work the same way. + terms0 = [ + "continuous", + "dummy", + "threecats", + "(threecats|subj)", + "(1|item)", + "(0 + continuous|item)", + "(dummy|item)", + "(threecats|site)", + ] + terms1 = [ + "continuous", + "dummy", + "threecats", + "(threecats|subj)", + "(continuous|item)", + "(dummy|item)", + "(threecats|site)", + ] + + model0 = bmb.Model("Y ~ " + " + ".join(terms0), crossed_data) + idata0 = self.fit(model0) + self.predict_oos(model0, idata0) + + model1 = bmb.Model("Y ~ " + " + ".join(terms1), crossed_data) + idata1 = self.fit(model1) + self.predict_oos(model1, idata1) + + # Check that the group specific effects design matrices have the same shape + X0 = pd.concat( + [pd.DataFrame(t.data) for t in model0.response_component.group_specific_terms.values()], + axis=1, + ) + X1 = pd.concat( + [pd.DataFrame(t.data) for t in model1.response_component.group_specific_terms.values()], + axis=1, + ) + assert X0.shape == X1.shape + + # check that the group specific effect design matrix contain the same columns, + # even if term names / columns names / order of columns is different + X0_set = set(tuple(X0.iloc[:, i]) for i in range(len(X0.columns))) + X1_set = set(tuple(X1.iloc[:, i]) for i in range(len(X1.columns))) + assert X0_set == X1_set + + # check that common effect design matrices are the same, + # even if term names / level names / order of columns is different + X0_list = [] + X1_list = [] + for term in model0.response_component.common_terms.values(): + if term.levels is not None: + for level_idx in range(len(term.levels)): + X0_list.append(tuple(term.data[:, level_idx])) + else: + X0_list.append(tuple(term.data)) + + for term in model1.response_component.common_terms.values(): + if term.levels is not None: + for level_idx in range(len(term.levels)): + X1_list.append(tuple(term.data[:, level_idx])) + else: + X1_list.append(tuple(term.data)) + + assert set(X0_list) == set(X1_list) + + # check that models have same priors for common effects + priors0 = { + x.name: x.prior.args + for x in model0.response_component.terms.values() + if not isinstance(x, GroupSpecificTerm) + } + priors1 = { + x.name: x.prior.args + for x in model1.response_component.terms.values() + if not isinstance(x, GroupSpecificTerm) + } + + # 
check dictionary keys
+        assert set(priors0) == set(priors1)
+
+        # check dictionary values
+        def dicts_close(a, b):
+            if set(a) != set(b):
+                return False
+            else:
+                return [np.allclose(a[x], b[x], atol=0, rtol=0.01) for x in a.keys()]
+
+        assert all([dicts_close(priors0[x], priors1[x]) for x in priors0.keys()])
+
+        # check that both models have the same priors for group specific effects
+        priors0 = {
+            x.name: x.prior.args["sigma"].args
+            for x in model0.response_component.group_specific_terms.values()
+        }
+        priors1 = {
+            x.name: x.prior.args["sigma"].args
+            for x in model1.response_component.group_specific_terms.values()
+        }
+
+        # check dictionary keys and values, reusing `dicts_close` from above
+        assert set(priors0) == set(priors1)
+        assert all([dicts_close(priors0[x], priors1[x]) for x in priors0.keys()])
+
+    def test_cell_means_with_many_group_specific_effects(self, crossed_data):
+        # Group specific intercepts are added in a different way, but the final result should be the same.
+        terms0 = [
+            "0",
+            "threecats",
+            "(threecats|subj)",
+            "(1|subj)",
+            "(0 + continuous|item)",
+            "(dummy|item)",
+            "(0 + threecats|site)",
+            "(1|site)",
+        ]
+
+        terms1 = [
+            "0",
+            "threecats",
+            "(threecats|subj)",
+            "(continuous|item)",
+            "(dummy|item)",
+            "(threecats|site)",
+        ]
+        model0 = bmb.Model("Y ~ " + " + ".join(terms0), crossed_data)
+        idata0 = self.fit(model0)
+        self.predict_oos(model0, idata0)
+
+        model1 = bmb.Model("Y ~ " + " + ".join(terms1), crossed_data)
+        idata1 = self.fit(model1)
+        self.predict_oos(model1, idata1)
+
+        # check that the group specific effects design matrices have the same shape
+        X0 = pd.concat(
+            [
+                pd.DataFrame(t.data)
+                if not isinstance(t.data, dict)
+                else pd.concat([pd.DataFrame(t.data[x]) for x in t.data.keys()], axis=1)
+                for t in model0.response_component.group_specific_terms.values()
+            ],
+            axis=1,
+        )
+        X1 = pd.concat(
+            [
+                pd.DataFrame(t.data)
+                if not isinstance(t.data, dict)
+                else pd.concat([pd.DataFrame(t.data[x]) for x in t.data.keys()], axis=1)
+                for t in model1.response_component.group_specific_terms.values()
+            ],
+            axis=1,
+        )
+        assert X0.shape == X1.shape
+
+        # check that the group specific effects design matrices contain the same columns,
+        # even if term names / column names / order of columns is different
+        X0_set = set(tuple(X0.iloc[:, i]) for i in range(len(X0.columns)))
+        X1_set = set(tuple(X1.iloc[:, i]) for i in range(len(X1.columns)))
+        assert X0_set == X1_set
+
+        # check that common effect design matrices are the same,
+        # even if term names / level names / order of columns is different
+        X0 = set(
+            [
+                tuple(t.data[:, lev])
+                for t in model0.response_component.common_terms.values()
+                for lev in range(len(t.levels))
+            ]
+        )
+        X1 = set(
+            [
+                tuple(t.data[:, lev])
+                for t in model1.response_component.common_terms.values()
+                for lev in range(len(t.levels))
+            ]
+        )
+        assert X0 == X1
+
+        # check that both models have the same priors for common effects
+        priors0 = {
+            x.name: x.prior.args
+            for x in model0.response_component.terms.values()
+            if not isinstance(x, GroupSpecificTerm)
+        }
+        priors1 = {
+            x.name: x.prior.args
+            for x in model1.response_component.terms.values()
+            if not isinstance(x, GroupSpecificTerm)
+        }
+        assert set(priors0) == set(priors1)
+
+        # check that both models have the same priors for group specific effects
+        priors0 = {
+            x.name: x.prior.args["sigma"].args
+            for x in 
model0.response_component.terms.values() + if isinstance(x, GroupSpecificTerm) + } + priors1 = { + x.name: x.prior.args["sigma"].args + for x in model1.response_component.terms.values() + if isinstance(x, GroupSpecificTerm) + } + assert set(priors0) == set(priors1) + + def test_group_specific_categorical_interaction(self, crossed_data): + model = bmb.Model("Y ~ continuous + (threecats:fourcats|site)", crossed_data) + idata = self.fit(model) + self.predict_oos(model, idata) + + assert list(idata.posterior.data_vars) == [ + "Intercept", + "continuous", + "Y_sigma", + "1|site_sigma", + "threecats:fourcats|site_sigma", + "1|site", + "threecats:fourcats|site", + ] + assert list(idata.posterior["threecats:fourcats|site"].coords) == [ + "chain", + "draw", + "site__factor_dim", + "threecats:fourcats__expr_dim", + ] + assert list(idata.posterior["1|site"].coords) == ["chain", "draw", "site__factor_dim"] + assert list(idata.posterior["1|site_sigma"].coords) == ["chain", "draw"] + assert list(idata.posterior["threecats:fourcats|site_sigma"].coords) == [ + "chain", + "draw", + "threecats:fourcats__expr_dim", + ] + + assert list(idata.posterior["threecats:fourcats__expr_dim"].values) == [ + "b, b", + "b, c", + "b, d", + "c, b", + "c, c", + "c, d", + ] + assert list(idata.posterior["site__factor_dim"].values) == ["0", "1", "2", "3", "4"] + + def test_fit_include_mean(self, crossed_data): + draws = 100 + model = bmb.Model("Y ~ continuous * threecats", crossed_data) + idata = model.fit(tune=draws, draws=draws, include_mean=True) + assert idata.posterior["Y_mean"].shape[1:] == (draws, 120) + + # Compare with the mean obtained with `model.predict()` + mean = idata.posterior["Y_mean"].stack(sample=("chain", "draw")).values.mean(1) + + model.predict(idata) + predicted_mean = idata.posterior["Y_mean"].stack(sample=("chain", "draw")).values.mean(1) + + assert np.array_equal(mean, predicted_mean) + + def test_group_specific_splines(self): + x_check = pd.DataFrame( + { + "x": [ + 82.0, + 143.0, + 426.0, + 641.0, + 1156.0, + 986.0, + 365.0, + 187.0, + 254.0, + 550.0, + 101.0, + 661.0, + 327.0, + 119.0, + ], + "day": ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] * 2, + "y": [ + 571.0, + 684.0, + 1652.0, + 2130.0, + 2455.0, + 1874.0, + 1288.0, + 1011.0, + 1004.0, + 1993.0, + 593.0, + 1986.0, + 1503.0, + 711.0, + ], + } + ) + knots = np.array([191.0, 297.0, 512.5]) + + model = bmb.Model("y ~ (bs(x, knots=knots, intercept=False, degree=1)|day)", data=x_check) + idata = self.fit(model) + self.predict_oos(model, idata) + + +class TestBernoulli(FitPredictParent): + def assert_posterior_predictive_range(self, model, idata): + y_name = model.response_component.response_term.name + y_posterior_predictive = idata.posterior_predictive[y_name].to_numpy() + assert set(np.unique(y_posterior_predictive)) == {0, 1} + + def assert_mean_range(self, model, idata): + y_mean_name = model.response_component.response_term.name + "_mean" + y_mean_posterior = idata.posterior[y_mean_name].to_numpy() + assert ((0 < y_mean_posterior) & (y_mean_posterior < 1)).all() + + def test_bernoulli_empty_index(self, data_n100): + model = bmb.Model("b1 ~ 1 + y1", data_n100, family="bernoulli") + idata = self.fit(model) + model.predict(idata, kind="pps") + self.assert_mean_range(model, idata) + self.assert_posterior_predictive_range(model, idata) + + # out of sample prediction + idata = self.predict_oos(model, idata) + self.assert_mean_range(model, idata) + self.assert_posterior_predictive_range(model, idata) + + def test_bernoulli_good_numeric(self, 
data_n100):
+        model = bmb.Model("b1 ~ y1", data_n100, family="bernoulli")
+        idata = self.fit(model)
+        model.predict(idata, kind="pps")
+        self.assert_mean_range(model, idata)
+        self.assert_posterior_predictive_range(model, idata)
+
+    def test_bernoulli_bad_numeric(self, data_n100):
+        error_msg = "Numeric response must be all 0 and 1 for 'bernoulli' family"
+        with pytest.raises(ValueError, match=error_msg):
+            model = bmb.Model("y1 ~ y2", data_n100, family="bernoulli")
+            self.fit(model)
+
+    def test_categorical_group_specific(self, data_n100):
+        # see https://github.com/bambinos/bambi/issues/447
+
+        formula = "b1 ~ 0 + cat2 + y2 + (0 + cat2|cat5) + (0 + y2|cat4 + cat5)"
+        model = bmb.Model(formula, data_n100, family="bernoulli")
+        idata = self.fit(model, chains=2)
+        idata = self.predict_oos(model, idata)
+
+        self.assert_mean_range(model, idata)
+        self.assert_posterior_predictive_range(model, idata)
+
+
+class TestBinomial(FitPredictParent):
+    def assert_mean_range(self, model, idata):
+        y_mean_name = model.response_component.response_term.name + "_mean"
+        y_mean_posterior = idata.posterior[y_mean_name].to_numpy()
+        assert ((0 < y_mean_posterior) & (y_mean_posterior < 1)).all()
+
+    def test_binomial_regression(self, beetle_data):
+        model = bmb.Model("prop(y, n) ~ x", beetle_data, family="binomial")
+        idata = self.fit(model)
+        model.predict(idata, kind="pps")
+        self.assert_mean_range(model, idata)
+        # Reshape to (1, 1, n_obs) so the bound broadcasts against (chain, draw, obs)
+        y_reshaped = beetle_data["n"].to_numpy()[None, None, :]
+
+        assert (idata.posterior_predictive["prop(y, n)"].to_numpy() <= y_reshaped).all()
+        assert (0 <= idata.posterior_predictive["prop(y, n)"].to_numpy()).all()
+
+        y_reshaped = beetle_data["n"].to_numpy()[None, None, :3]
+        idata = self.predict_oos(model, idata, data=model.data.head(3))
+        self.assert_mean_range(model, idata)
+        assert (idata.posterior_predictive["prop(y, n)"].to_numpy() <= y_reshaped).all()
+
+    def test_binomial_regression_constant(self, beetle_data):
+        # Uses a constant instead of a variable in the data frame
+        model = bmb.Model("prop(y, 62) ~ x", beetle_data, family="binomial")
+        idata = self.fit(model)
+        model.predict(idata, kind="pps")
+        self.assert_mean_range(model, idata)
+        assert (idata.posterior_predictive["prop(y, 62)"].to_numpy() <= 62).all()
+        assert (0 <= idata.posterior_predictive["prop(y, 62)"].to_numpy()).all()
+
+        # Out of sample prediction
+        idata = self.predict_oos(model, idata)
+        self.assert_mean_range(model, idata)
+        assert (idata.posterior_predictive["prop(y, 62)"].to_numpy() <= 62).all()
+
+
+class TestPoisson(FitPredictParent):
+    def assert_mean_range(self, model, idata):
+        y_mean_name = model.response_component.response_term.name + "_mean"
+        y_mean_posterior = idata.posterior[y_mean_name].to_numpy()
+        assert (y_mean_posterior > 0).all()
+
+    def test_poisson_regression(self, crossed_data):
+        crossed_data["count"] = (crossed_data["Y"] - crossed_data["Y"].min()).round()
+        model0 = bmb.Model("count ~ dummy + continuous + threecats", crossed_data, family="poisson")
+        idata0 = self.fit(model0)
+        idata0 = self.predict_oos(model0, idata0)
+        self.assert_mean_range(model0, idata0)
+
+        # Build the same model with the terms in a different order
+        model1 = bmb.Model("count ~ threecats + continuous + dummy", crossed_data, family="poisson")
+        idata1 = self.fit(model1)
+        idata1 = self.predict_oos(model1, idata1)
+        self.assert_mean_range(model1, idata1)
+
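+        # The two formulas specify the same model with the terms in a different
+        # order, so the fitted models should be equivalent; everything below is
+        # therefore compared set-wise rather than positionally.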
+        # check that term names agree
+        assert set(model0.response_component.terms) == set(model1.response_component.terms)
+
+        # check that common effect design matrices are the same,
+        # even if term names / level names / order of columns is different
+
+        X0_list = []
+        X1_list = []
+        for term in model0.response_component.common_terms.values():
+            if term.levels is not None:
+                for level_idx in range(len(term.levels)):
+                    X0_list.append(tuple(term.data[:, level_idx]))
+            else:
+                X0_list.append(tuple(term.data))
+
+        for term in model1.response_component.common_terms.values():
+            if term.levels is not None:
+                for level_idx in range(len(term.levels)):
+                    X1_list.append(tuple(term.data[:, level_idx]))
+            else:
+                X1_list.append(tuple(term.data))
+
+        assert set(X0_list) == set(X1_list)
+
+        # check that models have same priors for common effects
+        priors0 = {
+            x.name: x.prior.args
+            for x in model0.response_component.terms.values()
+            if not isinstance(x, GroupSpecificTerm)
+        }
+        priors1 = {
+            x.name: x.prior.args
+            for x in model1.response_component.terms.values()
+            if not isinstance(x, GroupSpecificTerm)
+        }
+        # check dictionary keys
+        assert set(priors0) == set(priors1)
+
+        # check dictionary values
+        def dicts_close(a, b):
+            if set(a) != set(b):
+                return False
+            return all(np.allclose(a[x], b[x], atol=0, rtol=0.01) for x in a)
+
+        assert all(dicts_close(priors0[x], priors1[x]) for x in priors0)
+
+        # Now test prior predictive
+        pps = model1.prior_predictive(draws=200, random_seed=1234)
+
+        keys = ["Intercept", "threecats", "continuous", "dummy"]
+        shapes = [(1, 200), (1, 200, 2), (1, 200), (1, 200)]
+
+        for key, shape in zip(keys, shapes):
+            assert pps.prior[key].shape == shape
+
+        assert pps.prior_predictive["count"].shape == (1, 200, 120)
+        assert pps.observed_data["count"].shape == (120,)
+
+        pps = model1.prior_predictive(draws=200, var_names=["count"], random_seed=1234)
+        assert pps.groups() == ["prior_predictive", "observed_data"]
+
+        pps = model1.prior_predictive(draws=200, var_names=["Intercept"], random_seed=1234)
+        assert pps.groups() == ["prior", "observed_data"]
+
+        # Now test posterior predictive
+        # Fit again to make sure we fix the number of chains
+        idata = model1.fit(tune=50, draws=50, chains=2)
+        pps = model1.predict(idata, kind="pps", inplace=False)
+        assert pps.posterior_predictive["count"].shape == (2, 50, 120)
+
+        pps = model1.predict(idata, kind="pps", inplace=True)
+        assert pps is None
+        assert idata.posterior_predictive["count"].shape == (2, 50, 120)
+
+
+class TestNegativeBinomial(FitPredictParent):
+    # TODO: could be modified to follow the same format as the other tests
+    def test_predict_negativebinomial(self, data_n100):
+        model = bmb.Model("n1 ~ y1", data_n100, family="negativebinomial")
+        idata = self.fit(model)
+
+        model.predict(idata, kind="mean")
+        assert (0 < idata.posterior["n1_mean"]).all()
+
+        # Posterior predictive draws must be integer-valued counts
+        model.predict(idata, kind="pps")
+        assert (np.equal(np.mod(idata.posterior_predictive["n1"].values, 1), 0)).all()
+
+        model.predict(idata, kind="mean", data=data_n100.iloc[:20, :])
+        assert (0 < idata.posterior["n1_mean"]).all()
+
+        model.predict(idata, kind="pps", data=data_n100.iloc[:20, :])
+        assert (np.equal(np.mod(idata.posterior_predictive["n1"].values, 1), 0)).all()
+
+
+class TestLaplace(FitPredictParent):
+    def test_laplace_regression(self, data_n100):
+        model = bmb.Model("y1 ~ y2", data_n100, family="laplace")
+        idata = self.fit(model)
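+        # The Laplace likelihood adds a positive scale parameter, exposed as "y1_b"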
data_n100["o"] = np.exp(data_n100["y1"]) + model = bmb.Model("o ~ y2 + y3 + n1 + cat4", data_n100, family="gamma", link="log") + idata = self.fit(model) + assert set(idata.posterior.data_vars) == {"Intercept", "y2", "y3", "n1", "cat4", "o_alpha"} + idata = self.predict_oos(model, idata) + assert (idata.posterior_predictive["o"] > 0).all().item() + + def test_gamma_regression_categoric(self, data_n100): + data_n100["o"] = np.exp(data_n100["y1"]) + model = bmb.Model("o ~ 0 + cat2:cat4", data_n100, family="gamma", link="log") + idata = self.fit(model) + assert set(idata.posterior.data_vars) == {"cat2:cat4", "o_alpha"} + idata = self.predict_oos(model, idata) + assert (idata.posterior_predictive["o"] > 0).all().item() + + +class TestBeta(FitPredictParent): + def test_beta_regression(self, gasoline_data): + model = bmb.Model( + "yield ~ temp + batch", gasoline_data, family="beta", categorical="batch" + ) + idata = self.fit(model, target_accept=0.9, random_seed=1234) + + # To Do: Could be adjusted but this is what we had before + model.predict(idata, kind="mean") + model.predict(idata, kind="pps") + + assert (0 < idata.posterior["yield_mean"]).all() & (idata.posterior["yield_mean"] < 1).all() + assert (0 < idata.posterior_predictive["yield"]).all() & ( + idata.posterior_predictive["yield"] < 1 + ).all() + + model.predict(idata, kind="mean", data=gasoline_data.iloc[:20, :]) + model.predict(idata, kind="pps", data=gasoline_data.iloc[:20, :]) + + assert (0 < idata.posterior["yield_mean"]).all() & (idata.posterior["yield_mean"] < 1).all() + assert (0 < idata.posterior_predictive["yield"]).all() & ( + idata.posterior_predictive["yield"] < 1 + ).all() + + +class TestStudentT(FitPredictParent): + def test_t_regression(self, data_n100): + model = bmb.Model("y1 ~ y2", data_n100, family="t") + idata = self.fit(model) + assert set(idata.posterior.data_vars) == {"Intercept", "y2", "y1_nu", "y1_sigma"} + self.predict_oos(model, idata) + + +class TestVonMises(FitPredictParent): + def test_vonmises_regression(self): + rng = np.random.default_rng(1234) + data = pd.DataFrame({"y": rng.vonmises(0, 1, size=100), "x": rng.normal(size=100)}) + model = bmb.Model("y ~ x", data, family="vonmises") + idata = self.fit(model) + assert set(idata.posterior.data_vars) == {"Intercept", "x", "y_kappa"} + idata = self.predict_oos(model, idata) + assert (idata.posterior_predictive["y"].min() >= -np.pi).item() and ( + idata.posterior_predictive["y"].max() <= np.pi + ).item() + + +class TestAsymmetricLaplace(FitPredictParent): + # This test doesn't follow the previous pattern but it works... 
+    def test_quantile_regression(self):
+        rng = np.random.default_rng(1234)
+        x = rng.uniform(2, 10, 100)
+        y = 2 * x + rng.normal(0, 0.6 * x**0.75)
+        data = pd.DataFrame({"x": x, "y": y})
+        bmb_model0 = bmb.Model("y ~ x", data, family="asymmetriclaplace", priors={"kappa": 9})
+        idata0 = bmb_model0.fit()
+        bmb_model0.predict(idata0)
+
+        bmb_model1 = bmb.Model("y ~ x", data, family="asymmetriclaplace", priors={"kappa": 0.1})
+        idata1 = bmb_model1.fit()
+        bmb_model1.predict(idata1)
+
+        assert np.all(
+            idata0.posterior["y_mean"].mean(("chain", "draw"))
+            > idata1.posterior["y_mean"].mean(("chain", "draw"))
+        )
+
+
+class TestCategorical(FitPredictParent):
+    # With the categorical family, parameters are defined for all but the
+    # reference level of the response, so their coords use "rating_reduced_dim",
+    # while fitted probabilities span the full set of levels ("rating_dim").
+    def assert_mean_sum(self, model, idata):
+        y_mean_name = model.response_component.response_term.name + "_mean"
+        y_dim = model.response_component.response_term.name + "_dim"
+        y_mean_posterior = idata.posterior[y_mean_name]
+        assert np.allclose(y_mean_posterior.sum(y_dim).to_numpy(), 1)
+
+    def assert_mean_range(self, model, idata):
+        y_mean_name = model.response_component.response_term.name + "_mean"
+        y_mean_posterior = idata.posterior[y_mean_name].to_numpy()
+        assert ((0 < y_mean_posterior) & (y_mean_posterior < 1)).all()
+
+    def assert_posterior_predictive_range(self, model, idata, n):
+        y_name = model.response_component.response_term.name
+        y_posterior_predictive = idata.posterior_predictive[y_name].to_numpy()
+        assert set(np.unique(y_posterior_predictive)).issubset(set(range(n)))
+
+    def test_basic(self, inhaler_data):
+        model = bmb.Model("rating ~ period + carry + treat", inhaler_data, family="categorical")
+        idata = self.fit(model)
+
+        for name in ["Intercept", "period", "carry", "treat"]:
+            assert list(idata.posterior[name].coords) == ["chain", "draw", "rating_reduced_dim"]
+
+        assert list(idata.posterior.coords["rating_reduced_dim"].values) == ["2", "3", "4"]
+
+        idata = self.predict_oos(model, idata)
+        assert list(idata.posterior["rating_mean"].coords) == [
+            "chain",
+            "draw",
+            "rating_obs",
+            "rating_dim",
+        ]
+        assert list(idata.posterior.coords["rating_dim"].values) == ["1", "2", "3", "4"]
+        self.assert_mean_range(model, idata)
+        self.assert_mean_sum(model, idata)
+        self.assert_posterior_predictive_range(model, idata, len(np.unique(inhaler_data["rating"])))
+
+    def test_varying_intercept(self, inhaler_data):
+        formula = "rating ~ period + carry + treat + (1|subject)"
+        model = bmb.Model(formula, inhaler_data, family="categorical")
+        idata = self.fit(model)
+
+        for name in ["Intercept", "period", "carry", "treat"]:
+            assert set(idata.posterior[name].coords) == {"chain", "draw", "rating_reduced_dim"}
+
+        assert set(idata.posterior["1|subject"].coords) == {
+            "chain",
+            "draw",
+            "rating_reduced_dim",
+            "subject__factor_dim",
+        }
+
+        assert (
+            idata.posterior["subject__factor_dim"].values
+            == np.unique(inhaler_data["subject"]).astype(str)
+        ).all()
+
+        assert list(idata.posterior.coords["rating_reduced_dim"].values) == ["2", "3", "4"]
+
+        idata = self.predict_oos(model, idata)
+        assert set(idata.posterior["rating_mean"].coords) == {
+            "chain",
+            "draw",
+            "rating_obs",
+            "rating_dim",
+        }
+        assert list(idata.posterior.coords["rating_dim"].values) == ["1", "2", "3", "4"]
+        self.assert_mean_range(model, idata)
+        self.assert_mean_sum(model, idata)
+        self.assert_posterior_predictive_range(model, idata, len(np.unique(inhaler_data["rating"])))
+
+    def test_categorical_predictors(self, cat_response_cat_preds_data):
+        formula = "response ~ group + city"
+        model = bmb.Model(formula, cat_response_cat_preds_data, family="categorical")
+        idata = self.fit(model)
+
+        assert set(idata.posterior["group"].coords) == {
+            "chain",
+            "draw",
+            "response_reduced_dim",
+            "group_dim",
+        }
+        assert set(idata.posterior["city"].coords) == {
+            "chain",
+            "draw",
+            "response_reduced_dim",
+            "city_dim",
+        }
+        assert list(idata.posterior["group_dim"].values) == ["group 2", "group 3"]
+        assert list(idata.posterior["city_dim"].values) == ["Rosario", "San Luis"]
+        assert list(idata.posterior["response_reduced_dim"].values) == ["B", "C", "D"]
+
+        idata = self.predict_oos(model, idata)
+        assert list(idata.posterior["response_dim"].values) == ["A", "B", "C", "D"]
+        self.assert_mean_range(model, idata)
+        self.assert_mean_sum(model, idata)
+        self.assert_posterior_predictive_range(model, idata, 4)
+
+
+class TestZeroInflatedFamilies(FitPredictParent):
+    @pytest.mark.parametrize(
+        "formula, data_name, family, priors",
+        [
+            # Zero Inflated Poisson
+            (bmb.Formula("y ~ x"), "zi_count_data", "zero_inflated_poisson", None),
+            (bmb.Formula("y ~ x", "psi ~ x"), "zi_count_data", "zero_inflated_poisson", None),
+            # Zero Inflated Negative Binomial
+            (
+                bmb.Formula("y ~ x"),
+                "zi_count_data",
+                "zero_inflated_negativebinomial",
+                {"alpha": bmb.Prior("HalfNormal", sigma=20)},
+            ),
+            (
+                bmb.Formula("y ~ x", "psi ~ x"),
+                "zi_count_data",
+                "zero_inflated_negativebinomial",
+                {"alpha": bmb.Prior("HalfNormal", sigma=20)},
+            ),
+            # Zero Inflated Binomial
+            (bmb.Formula("p(y, 30) ~ 1"), "zi_bounded_count_data", "zero_inflated_binomial", None),
+            (
+                bmb.Formula("p(y, 30) ~ 1", "psi ~ x"),
+                "zi_bounded_count_data",
+                "zero_inflated_binomial",
+                None,
+            ),
+        ],
+    )
+    def test_family(self, formula, data_name, family, priors, request):
+        data = request.getfixturevalue(data_name)
+        model = bmb.Model(formula, data, priors=priors, family=family)
+        idata = self.fit(model)
+        self.predict_oos(model, idata)
+
+
+class TestHurdle(FitPredictParent):
+    @pytest.mark.parametrize(
+        "data_name, family",
+        [
+            ("zi_count_data", "hurdle_poisson"),
+            ("zi_count_data", "hurdle_negativebinomial"),
+            ("zi_continuous_data", "hurdle_gamma"),
+            ("zi_continuous_data", "hurdle_lognormal"),
+        ],
+    )
+    def test_hurdle_families(self, data_name, family, request):
+        # 'data_name' names a fixture, retrieved through the request object
+        data = request.getfixturevalue(data_name)
+        model = bmb.Model("y ~ 1", data, family=family)
+        idata = self.fit(model, random_seed=1234)
+        self.predict_oos(model, idata)
+
+
+class TestOrdinal(FitPredictParent):
+    @pytest.mark.parametrize(
+        "family, link",
+        [
+            ("cumulative", "logit"),
+            ("cumulative", "probit"),
+            ("cumulative", "cloglog"),
+            ("sratio", "logit"),
+            ("sratio", "probit"),
+            ("sratio", "cloglog"),
+        ],
+    )
+    def test_ordinal_families(self, inhaler_data, family, link):
+        # To have both numeric and categoric predictors
+        inhaler_data["carry"] = pd.Categorical(inhaler_data["carry"])
+        model = bmb.Model("rating ~ period + carry + treat", inhaler_data, family=family, link=link)
+        idata = self.fit(model, random_seed=1234)
+        idata = self.predict_oos(model, idata)
+
+        assert np.allclose(idata.posterior["rating_mean"].sum("rating_dim").to_numpy(), 1)
+        assert set(np.unique(idata.posterior_predictive["rating"])).issubset({0, 1, 2, 3})
+
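+    # The cutpoints of a cumulative model must be increasing, so a vector-valued
+    # prior for "threshold" needs sorted initial values together with the ordered
+    # transform to keep them monotone during sampling.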
+    def test_cumulative_family_priors(self, inhaler_data):
+        priors = {
+            "threshold": bmb.Prior(
+                "Normal",
+                mu=[-0.5, 0, 0.5],
+                sigma=1.5,
+                transform=pm.distributions.transforms.ordered,
+            )
+        }
+        model = bmb.Model(
+            "rating ~ 0 + period + carry + treat", inhaler_data, family="cumulative", priors=priors
+        )
+        idata = self.fit(model, random_seed=1234)
+        self.predict_oos(model, idata)
+
+
+class TestCensoredResponses(FitPredictParent):
+    def test_model_with_intercept(self, kidney_data):
+        priors = {
+            "Intercept": bmb.Prior("Normal", mu=0, sigma=1),
+            "sex": bmb.Prior("Normal", mu=0, sigma=2),
+            "age": bmb.Prior("Normal", mu=0, sigma=1),
+            "alpha": bmb.Prior("Gamma", alpha=3, beta=5),
+        }
+        model = bmb.Model(
+            "censored(time, status) ~ 1 + sex + age",
+            kidney_data,
+            family="weibull",
+            link="log",
+            priors=priors,
+        )
+        idata = self.fit(model, random_seed=121195)
+        self.predict_oos(model, idata)
+        # Assert response is censored
+        assert isinstance(model.backend.model.observed_RVs[0].owner.op, pm.Censored.rv_type)
+
+    def test_model_without_intercept(self, kidney_data):
+        priors = {
+            "sex": bmb.Prior("Normal", mu=0, sigma=2),
+            "age": bmb.Prior("Normal", mu=0, sigma=1),
+            "alpha": bmb.Prior("Gamma", alpha=3, beta=5),
+        }
+        model = bmb.Model(
+            "censored(time, status) ~ 0 + sex + age",
+            kidney_data,
+            family="weibull",
+            link="log",
+            priors=priors,
+        )
+        idata = self.fit(model, random_seed=121195)
+        self.predict_oos(model, idata)
+        # Assert response is censored
+        assert isinstance(model.backend.model.observed_RVs[0].owner.op, pm.Censored.rv_type)
+
+    def test_model_with_group_specific_effects(self, kidney_data):
+        # Model 3, with group-specific effects
+        priors = {
+            "alpha": bmb.Prior("Gamma", alpha=3, beta=5),
+            "sex": bmb.Prior("Normal", mu=0, sigma=1),
+            "age": bmb.Prior("Normal", mu=0, sigma=1),
+            "1|patient": bmb.Prior(
+                "Normal", mu=0, sigma=bmb.Prior("InverseGamma", alpha=5, beta=10)
+            ),
+        }
+        model = bmb.Model(
+            "censored(time, status) ~ 1 + sex + age + (1|patient)",
+            kidney_data,
+            family="weibull",
+            link="log",
+            priors=priors,
+        )
+        idata = self.fit(model, random_seed=121195)
+        self.predict_oos(model, idata)
+        # Assert response is censored
+        assert isinstance(model.backend.model.observed_RVs[0].owner.op, pm.Censored.rv_type)
+
+
+class TestTruncatedResponse(FitPredictParent):
+    def test_truncated_response(self, truncated_data):
+        priors = {
+            "Intercept": bmb.Prior("Normal", mu=0, sigma=1),
+            "x": bmb.Prior("Normal", mu=0, sigma=1),
+            "sigma": bmb.Prior("HalfNormal", sigma=1),
+        }
+        model = bmb.Model("truncated(y, -5, 5) ~ x", truncated_data, priors=priors)
+        idata = self.fit(model, random_seed=121195)
+        self.predict_oos(model, idata)
+        # PyMC seems to automatically dispatch to TruncatedNormal
+        assert isinstance(model.backend.model.observed_RVs[0].owner.op, pm.TruncatedNormal.rv_type)
+
+
+class TestMultinomial(FitPredictParent):
+    def assert_posterior_predictive(self, model, idata):
+        # A multinomial draw distributes a fixed total across the categories, so
+        # the sum over the response dimension must be constant across draws
+        y_name = model.response_component.response_term.name
+        y_posterior_predictive = idata.posterior_predictive[y_name].to_numpy()
+        assert (y_posterior_predictive.sum(-1).var((0, 1)) == 0).all()
+
+    def test_intercept_only(self, multinomial_data):
+        model = bmb.Model("c(y1, y2, y3, y4) ~ 1", multinomial_data, family="multinomial")
+        idata = self.fit(model, random_seed=121195)
+        idata = self.predict_oos(model, idata, data=model.data)
+        self.assert_posterior_predictive(model, idata)
+
+    def test_numerical_predictors(self, multinomial_data):
+        model = bmb.Model(
+            "c(y1, y2, y3, y4) ~ treat + carry", multinomial_data, family="multinomial"
+        )
+        idata = self.fit(model, random_seed=121195)
+        idata = self.predict_oos(model, idata, data=model.data)
+        self.assert_posterior_predictive(model, idata)
+
+    def test_categorical_predictors(self, multinomial_data):
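+        # Same model as above, but with the numeric codes mapped to string labels
+        # so that both predictors enter the model as categoricals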
multinomial_data["treat"] = multinomial_data["treat"].replace({-0.5: "A", 0.5: "B"}) + multinomial_data["carry"] = multinomial_data["carry"].replace({-1: "a", 0: "b", 1: "c"}) + + model = bmb.Model( + "c(y1, y2, y3, y4) ~ treat + carry", multinomial_data, family="multinomial" + ) + idata = self.fit(model, random_seed=121195) + idata = self.predict_oos(model, idata, data=model.data) + self.assert_posterior_predictive(model, idata) + + def test_group_specific_effects(self): + data = pd.DataFrame( + { + "state": ["A", "B", "C"], + "y1": [35298, 1885, 5775], + "y2": [167328, 20731, 21564], + "y3": [212682, 37716, 20222], + "y4": [37966, 5196, 3277], + } + ) + + model = bmb.Model( + "c(y1, y2, y3, y4) ~ 1 + (1 | state)", data, family="multinomial", noncentered=False + ) + idata = self.fit(model, random_seed=121195) + idata = self.predict_oos(model, idata, data=model.data) + self.assert_posterior_predictive(model, idata) + + +class TestDirichletMultinomial(FitPredictParent): + def assert_posterior_predictive(self, model, idata): + y_name = model.response_component.response_term.name + y_posterior_predictive = idata.posterior_predictive[y_name].to_numpy() + assert (y_posterior_predictive.sum(-1).var((0, 1)) == 0).all() + + def test_intercept_only(self, multinomial_data): + model = bmb.Model("c(y1, y2, y3, y4) ~ 1", multinomial_data, family="dirichlet_multinomial") + idata = self.fit(model) + idata = self.predict_oos(model, idata, model.data) + self.assert_posterior_predictive(model, idata) + + def test_predictor(self, multinomial_data): + model = bmb.Model( + "c(y1, y2, y3, y4) ~ 0 + treat", multinomial_data, family="dirichlet_multinomial" + ) + idata = self.fit(model) + idata = self.predict_oos(model, idata, model.data) + self.assert_posterior_predictive(model, idata) + + +class TestBetaBinomial(FitPredictParent): + def test_basic(self, beetle_data): + model = bmb.Model("prop(y, n) ~ x", beetle_data, family="beta_binomial") + idata = model.fit(draws=100, tune=100) + idata = self.fit(model) + idata = self.predict_oos(model, idata, model.data) + n = beetle_data["n"].to_numpy() + assert np.all( + idata.posterior_predictive["prop(y, n)"].values <= n[np.newaxis, np.newaxis, :] + ) + + +def test_wald_family(data_n100): + data_n100["y"] = np.exp(data_n100["y1"]) + priors = {"common": bmb.Prior("Normal", mu=0, sigma=1)} + model = bmb.Model("y ~ y2", data_n100, family="wald", link="log", priors=priors) + idata = model.fit(tune=DRAWS, draws=DRAWS, random_seed=1234) + + model.predict(idata, kind="mean") + model.predict(idata, kind="pps") + + assert (0 < idata.posterior["y_mean"]).all() + assert (0 < idata.posterior_predictive["y"]).all() + + model.predict(idata, kind="mean", data=data_n100.iloc[:20, :]) + model.predict(idata, kind="pps", data=data_n100.iloc[:20, :]) + + assert (0 < idata.posterior["y_mean"]).all() + assert (0 < idata.posterior_predictive["y"]).all() + + +def test_predict_include_group_specific(): + rng = np.random.default_rng(1234) + size = 100 + + data = pd.DataFrame( + { + "y": rng.choice([0, 1], size=size), + "x1": rng.choice(list("abcd"), size=size), + } + ) + + model = bmb.Model("y ~ 1 + (1|x1)", data, family="bernoulli") + idata = model.fit(tune=DRAWS, draws=DRAWS, random_seed=1234) + idata_1 = model.predict(idata, data=data, inplace=False, include_group_specific=True) + idata_2 = model.predict(idata, data=data, inplace=False, include_group_specific=False) + + assert not np.isclose( + idata_1.posterior["y_mean"].values, + idata_2.posterior["y_mean"].values, + ).all() + + # Since it's 
an intercept-only model, predictions are the same for all observations if + # we drop group-specific terms. + assert (idata_2.posterior["y_mean"] == idata_2.posterior["y_mean"][:, :, 0]).all() + + # When we include group-specific terms, these predictions are different + assert not (idata_1.posterior["y_mean"] == idata_1.posterior["y_mean"][:, :, 0]).all() + + +def test_predict_offset(): + # Simple case + data = bmb.load_data("carclaims") + model = bmb.Model("numclaims ~ offset(np.log(exposure))", data, family="poisson", link="log") + idata = model.fit(tune=DRAWS, draws=DRAWS, random_seed=1234) + model.predict(idata) + model.predict(idata, kind="pps") + + # More complex case + rng = np.random.default_rng(121195) + data = pd.DataFrame( + { + "y": rng.poisson(20, size=100), + "x": rng.normal(size=100), + "group": np.tile(np.arange(10), 10), + } + ) + data["time"] = data["y"] - rng.normal(loc=1, size=100) + model = bmb.Model("y ~ offset(np.log(time)) + x + (1 | group)", data, family="poisson") + idata = model.fit(tune=DRAWS, draws=DRAWS, target_accept=0.9, random_seed=1234) + model.predict(idata) + model.predict(idata, kind="pps") + + +def test_predict_new_groups_fail(sleepstudy): + model = bmb.Model("Reaction ~ 1 + Days + (1 + Days | Subject)", sleepstudy) + idata = model.fit(tune=20, draws=20) + + df_new = sleepstudy.head(10).reset_index(drop=True) + df_new["Subject"] = "xxx" + to_match = "There are new groups for the factors ('Subject',) and 'sample_new_groups' is False." + with pytest.raises(ValueError, match=re.escape(to_match)): + model.predict(idata, data=df_new) + + +@pytest.mark.parametrize( + "data,formula,family,df_new", + [ + ( + "sleepstudy", + "Reaction ~ 1 + Days + (1 + Days | Subject)", + "gaussian", + pd.DataFrame({"Days": [1, 2, 3], "Subject": ["x", "y", "z"]}), + ), + ( + "inhaler_data", + "rating ~ 1 + period + treat + (1 + treat|subject)", + "categorical", + pd.DataFrame( + { + "subject": [1, 999], + "rating": [1, 1], + "treat": [0.5, 0.5], + "period": [0.5, 0.5], + "carry": [0, 0], + } + ), + ), + ( + "crossed_data", + "Y ~ 0 + threecats + (0 + threecats | subj)", + "gaussian", + pd.DataFrame({"threecats": ["a", "a"], "subj": ["0", "11"]}), + ), + ], +) +def test_predict_new_groups(data, formula, family, df_new, request): + data = request.getfixturevalue(data) + model = bmb.Model(formula, data, family=family) + idata = model.fit(tune=100, draws=100) + model.predict(idata, data=df_new, sample_new_groups=True) diff --git a/tests/test_predict.py b/tests/test_predict.py deleted file mode 100644 index b3e401b13..000000000 --- a/tests/test_predict.py +++ /dev/null @@ -1,429 +0,0 @@ -from os.path import dirname, join - -import numpy as np -import pandas as pd -import pytest - -import bambi as bmb - - -@pytest.fixture(scope="module") -def data_numeric_xy(): - rng = np.random.default_rng(121195) - x = rng.uniform(size=100) - y = x + rng.normal(scale=0.5, size=100) - data = pd.DataFrame({"y": y, "x": x}) - return data - - -@pytest.fixture(scope="module") -def data_bernoulli(): - # Taken from https://juanitorduz.github.io/glm_pymc3/ - rng = np.random.default_rng(121195) - n = 250 - x1 = rng.normal(loc=0.0, scale=2.0, size=n) - x2 = rng.normal(loc=0.0, scale=2.0, size=n) - intercept = -0.5 - beta_x1 = 1 - beta_x2 = -1 - beta_interaction = 2 - z = intercept + beta_x1 * x1 + beta_x2 * x2 + beta_interaction * x1 * x2 - p = 1 / (1 + np.exp(-z)) - y = rng.binomial(n=1, p=p, size=n) - df = pd.DataFrame(dict(x1=x1, x2=x2, y=y)) - return df - - -@pytest.fixture(scope="module") -def 
data_beta(): - return pd.read_csv(join(dirname(__file__), "data", "gasoline.csv")) - - -@pytest.fixture(scope="module") -def data_gamma(): - rng = np.random.default_rng(121195) - N = 200 - a, b, shape = 0.5, 1.1, 10 - x = rng.uniform(-1, 1, N) - y = rng.gamma(shape, np.exp(a + b * x) / shape, N) - data = pd.DataFrame({"x": x, "y": y}) - return data - - -@pytest.fixture(scope="module") -def data_count(): - rng = np.random.default_rng(121195) - data = pd.DataFrame({"y": rng.poisson(list(range(10)) * 10), "x": rng.uniform(size=100)}) - return data - - -@pytest.fixture(scope="module") -def inhaler(): - data_dir = join(dirname(__file__), "data") - data = pd.read_csv(join(data_dir, "inhaler.csv")) - data["rating"] = pd.Categorical(data["rating"], categories=[1, 2, 3, 4]) - return data - - -def test_predict_bernoulli(data_bernoulli): - data = data_bernoulli - model = bmb.Model("y ~ x1*x2", data, family="bernoulli") - idata = model.fit(tune=100, draws=100, target_accept=0.9) - - # In sample prediction - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - assert (0 < idata.posterior["y_mean"]).all() & (idata.posterior["y_mean"] < 1).all() - assert (idata.posterior_predictive["y"].isin([0, 1])).all() - - # Out of sample prediction - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - assert (0 < idata.posterior["y_mean"]).all() & (idata.posterior["y_mean"] < 1).all() - assert (idata.posterior_predictive["y"].isin([0, 1])).all() - - -def test_predict_beta(data_beta): - data = data_beta - data["batch"] = pd.Categorical(data["batch"], [10, 1, 2, 3, 4, 5, 6, 7, 8, 9], ordered=True) - model = bmb.Model("yield ~ temp + batch", data, family="beta") - idata = model.fit(tune=100, draws=100, target_accept=0.90) - - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - assert (0 < idata.posterior["yield_mean"]).all() & (idata.posterior["yield_mean"] < 1).all() - assert (0 < idata.posterior_predictive["yield"]).all() & ( - idata.posterior_predictive["yield"] < 1 - ).all() - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - assert (0 < idata.posterior["yield_mean"]).all() & (idata.posterior["yield_mean"] < 1).all() - assert (0 < idata.posterior_predictive["yield"]).all() & ( - idata.posterior_predictive["yield"] < 1 - ).all() - - -def test_predict_gamma(data_gamma): - data = data_gamma - - model = bmb.Model("y ~ x", data, family="gamma", link="log") - idata = model.fit(tune=100, draws=100) - - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - assert (0 < idata.posterior["y_mean"]).all() - assert (0 < idata.posterior_predictive["y"]).all() - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - assert (0 < idata.posterior["y_mean"]).all() - assert (0 < idata.posterior_predictive["y"]).all() - - -def test_predict_gaussian(data_numeric_xy): - data = data_numeric_xy - model = bmb.Model("y ~ x", data, family="gaussian") - idata = model.fit(tune=100, draws=100) - - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - -def test_predict_negativebinomial(data_count): - data = data_count - - model = bmb.Model("y ~ x", data, family="negativebinomial") - idata = model.fit(tune=100, draws=100) - - model.predict(idata, 
kind="mean") - model.predict(idata, kind="pps") - - assert (0 < idata.posterior["y_mean"]).all() - assert (np.equal(np.mod(idata.posterior_predictive["y"].values, 1), 0)).all() - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - assert (0 < idata.posterior["y_mean"]).all() - assert (np.equal(np.mod(idata.posterior_predictive["y"].values, 1), 0)).all() - - -def test_predict_poisson(data_count): - data = data_count - - model = bmb.Model("y ~ x", data, family="negativebinomial") - idata = model.fit(tune=100, draws=100) - - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - assert (0 < idata.posterior["y_mean"]).all() - assert (np.equal(np.mod(idata.posterior_predictive["y"].values, 1), 0)).all() - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - assert (0 < idata.posterior["y_mean"]).all() - assert (np.equal(np.mod(idata.posterior_predictive["y"].values, 1), 0)).all() - - -def test_predict_t(data_numeric_xy): - data = data_numeric_xy - model = bmb.Model("y ~ x", data, family="t") - idata = model.fit(tune=100, draws=100) - - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - # A case where the prior for one of the parameters is constant - model = bmb.Model("y ~ x", data, family="t", priors={"nu": 4}) - idata = model.fit(tune=100, draws=100) - - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - -def test_predict_wald(data_gamma): - data = data_gamma - - model = bmb.Model("y ~ x", data, family="wald", link="log") - idata = model.fit(tune=100, draws=100) - - model.predict(idata, kind="mean") - model.predict(idata, kind="pps") - - assert (0 < idata.posterior["y_mean"]).all() - assert (0 < idata.posterior_predictive["y"]).all() - - model.predict(idata, kind="mean", data=data.iloc[:20, :]) - model.predict(idata, kind="pps", data=data.iloc[:20, :]) - - assert (0 < idata.posterior["y_mean"]).all() - assert (0 < idata.posterior_predictive["y"]).all() - - -def test_predict_categorical(inhaler): - model = bmb.Model("rating ~ period + carry + treat", inhaler, family="categorical") - idata = model.fit(tune=100, draws=100) - - model.predict(idata) - assert np.allclose(idata.posterior["rating_mean"].values.sum(-1), 1) - - model.predict(idata, data=inhaler.iloc[:20, :]) - assert np.allclose(idata.posterior["rating_mean"].values.sum(-1), 1) - - model = bmb.Model( - "rating ~ period + carry + treat + (1|subject)", inhaler, family="categorical" - ) - idata = model.fit(tune=100, draws=100) - - model.predict(idata) - assert np.allclose(idata.posterior["rating_mean"].values.sum(-1), 1) - - model.predict(idata, data=inhaler.iloc[:20, :]) - assert np.allclose(idata.posterior["rating_mean"].values.sum(-1), 1) - - -def test_posterior_predictive_categorical(inhaler): - model = bmb.Model("rating ~ period", data=inhaler, family="categorical") - idata = model.fit(tune=100, draws=100) - model.predict(idata, kind="pps") - pps = idata.posterior_predictive["rating"].to_numpy() - - assert pps.shape[-1] == inhaler.shape[0] - assert (np.unique(pps) == [0, 1, 2, 3]).all() - - -def test_predict_categorical_group_specific(): - # see https://github.com/bambinos/bambi/issues/447 - rng = 
np.random.default_rng(1234) - size = 100 - - data = pd.DataFrame( - { - "y": rng.choice([0, 1], size=size), - "x1": rng.choice(list("abcd"), size=size), - "x2": rng.choice(list("XY"), size=size), - "x3": rng.normal(size=size), - } - ) - - model = bmb.Model("y ~ x1 + (0 + x2|x1) + (0 + x3|x1 + x2)", data, family="bernoulli") - - idata = model.fit(tune=100, draws=100, chains=2) - - model.predict(idata, data=data) - - assert idata.posterior.y_mean.values.shape == (2, 100, 100) - assert (idata.posterior.y_mean.values > 0).all() and (idata.posterior.y_mean.values < 1).all() - - -def test_predict_multinomial(inhaler): - df = inhaler.groupby(["treat", "carry", "rating"], as_index=False).size() - df = df.pivot(index=["treat", "carry"], columns="rating", values="size").reset_index() - df.columns = ["treat", "carry", "y1", "y2", "y3", "y4"] - - # Intercept only - model = bmb.Model("c(y1, y2, y3, y4) ~ 1", df, family="multinomial") - idata = model.fit(tune=100, draws=100) - - model.predict(idata) - model.predict(idata, data=df.iloc[:3, :]) - - # Numerical predictors - model = bmb.Model("c(y1, y2, y3, y4) ~ treat + carry", df, family="multinomial") - idata = model.fit(tune=100, draws=100) - - model.predict(idata) - model.predict(idata, data=df.iloc[:3, :]) - - # Categorical predictors - df["treat"] = df["treat"].replace({-0.5: "A", 0.5: "B"}) - df["carry"] = df["carry"].replace({-1: "a", 0: "b", 1: "c"}) - - model = bmb.Model("c(y1, y2, y3, y4) ~ treat + carry", df, family="multinomial") - idata = model.fit(tune=100, draws=100) - - model.predict(idata) - model.predict(idata, data=df.iloc[:3, :]) - - data = pd.DataFrame( - { - "state": ["A", "B", "C"], - "y1": [35298, 1885, 5775], - "y2": [167328, 20731, 21564], - "y3": [212682, 37716, 20222], - "y4": [37966, 5196, 3277], - } - ) - - # Contains group-specific effect - model = bmb.Model( - "c(y1, y2, y3, y4) ~ 1 + (1 | state)", data, family="multinomial", noncentered=False - ) - idata = model.fit(tune=100, draws=100, random_seed=0) - - model.predict(idata) - model.predict(idata, kind="pps") - - -def test_posterior_predictive_multinomial(inhaler): - df = inhaler.groupby(["treat", "carry", "rating"], as_index=False).size() - df = df.pivot(index=["treat", "carry"], columns="rating", values="size").reset_index() - df.columns = ["treat", "carry", "y1", "y2", "y3", "y4"] - - # Intercept only - model = bmb.Model("c(y1, y2, y3, y4) ~ 1", df, family="multinomial") - idata = model.fit(tune=100, draws=100) - - # The sum across the columns of the response is the same for all the chain and draws. - model.predict(idata, kind="pps") - assert np.all(idata.posterior_predictive["c(y1, y2, y3, y4)"].values.sum(-1).var((0, 1)) == 0) - - -def test_predict_include_group_specific(): - rng = np.random.default_rng(1234) - size = 100 - - data = pd.DataFrame( - { - "y": rng.choice([0, 1], size=size), - "x1": rng.choice(list("abcd"), size=size), - } - ) - - model = bmb.Model("y ~ 1 + (1|x1)", data, family="bernoulli") - idata = model.fit(tune=100, draws=100, chains=2, random_seed=1234) - idata_1 = model.predict(idata, data=data, inplace=False, include_group_specific=True) - idata_2 = model.predict(idata, data=data, inplace=False, include_group_specific=False) - - assert not np.isclose( - idata_1.posterior["y_mean"].values, - idata_2.posterior["y_mean"].values, - ).all() - - # Since it's an intercept-only model, predictions are the same for all observations if - # we drop group-specific terms. 
- assert (idata_2.posterior["y_mean"] == idata_2.posterior["y_mean"][:, :, 0]).all() - - # When we include group-specific terms, these predictions are different - assert not (idata_1.posterior["y_mean"] == idata_1.posterior["y_mean"][:, :, 0]).all() - - -def test_predict_offset(): - # Simple case - - data = bmb.load_data("carclaims") - model = bmb.Model("numclaims ~ offset(np.log(exposure))", data, family="poisson", link="log") - idata = model.fit(tune=100, draws=100, chains=2, random_seed=1234) - model.predict(idata) - model.predict(idata, kind="pps") - - # More complex case - rng = np.random.default_rng(121195) - data = pd.DataFrame( - { - "y": rng.poisson(20, size=100), - "x": rng.normal(size=100), - "group": np.tile(np.arange(10), 10), - } - ) - data["time"] = data["y"] - rng.normal(loc=1, size=100) - model = bmb.Model("y ~ offset(np.log(time)) + x + (1 | group)", data, family="poisson") - idata = model.fit(tune=100, draws=100, chains=2, target_accept=0.9, random_seed=1234) - model.predict(idata, kind="pps") - - -def test_posterior_predictive_dirichlet_multinomial(inhaler): - df = inhaler.groupby(["treat", "rating"], as_index=False).size() - df = df.pivot(index=["treat"], columns="rating", values="size").reset_index() - df.columns = ["treat", "y1", "y2", "y3", "y4"] - - # Intercept only - model = bmb.Model("c(y1, y2, y3, y4) ~ 1", df, family="dirichlet_multinomial") - idata = model.fit(tune=100, draws=100) - - # The sum across the columns of the response is the same for all the chain and draws. - model.predict(idata, kind="pps") - assert np.all(idata.posterior_predictive["c(y1, y2, y3, y4)"].values.sum(-1).var((0, 1)) == 0) - - # With predictor only - model = bmb.Model("c(y1, y2, y3, y4) ~ 0 + treat", df, family="dirichlet_multinomial") - idata = model.fit(tune=100, draws=100) - - # The sum across the columns of the response is the same for all the chain and draws. - model.predict(idata, kind="pps") - assert np.all(idata.posterior_predictive["c(y1, y2, y3, y4)"].values.sum(-1).var((0, 1)) == 0) - - -def test_posterior_predictive_beta_binomial(): - data = pd.DataFrame( - { - "x": np.array([1.6907, 1.7242, 1.7552, 1.7842, 1.8113, 1.8369, 1.8610, 1.8839]), - "n": np.array([59, 60, 62, 56, 63, 59, 62, 60]), - "y": np.array([6, 13, 18, 28, 52, 53, 61, 60]), - } - ) - - model = bmb.Model("prop(y, n) ~ x", data, family="beta_binomial") - idata = model.fit(draws=100, tune=100) - model.predict(idata, kind="pps") - - n = data["n"].to_numpy() - assert np.all(idata.posterior_predictive["prop(y, n)"].values <= n[np.newaxis, np.newaxis, :])