Skip to content

Commit

Permalink
issue 7247 move CustomDist test to a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
markgreene74 committed Jun 15, 2024
1 parent 48aaf63 commit 16a1a9d
Show file tree
Hide file tree
Showing 2 changed files with 284 additions and 253 deletions.
280 changes: 280 additions & 0 deletions tests/distributions/test_custom_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
# Copyright 2024 The PyMC Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings

import cloudpickle
import numpy as np
import numpy.random as npr
import pytensor
import pytensor.tensor as pt
import pytest
import scipy.stats as st

import pymc as pm

from pymc.distributions import Normal
from pymc.distributions.custom_distribution import CustomDist, CustomDistRV
from pymc.distributions.distribution import support_point
from pymc.distributions.shape_utils import to_tuple
from pymc.logprob.basic import logp
from pymc.model import Deterministic, Model
from pymc.sampling import sample
from pymc.testing import assert_support_point_is_expected


class TestCustomDist:
@pytest.mark.parametrize("size", [(), (3,), (3, 2)], ids=str)
def test_custom_dist_with_random(self, size):
with Model() as model:
mu = Normal("mu", 0, 1)
obs = CustomDist(
"custom_dist",
mu,
random=lambda mu, rng=None, size=None: rng.normal(loc=mu, scale=1, size=size),
observed=np.random.randn(100, *size),
)
assert isinstance(obs.owner.op, CustomDistRV)
assert obs.eval().shape == (100, *size)

def test_custom_dist_with_random_invalid_observed(self):
with pytest.raises(
TypeError,
match=(
"Since ``v4.0.0`` the ``observed`` parameter should be of type"
" ``pd.Series``, ``np.array``, or ``pm.Data``."
" Previous versions allowed passing distribution parameters as"
" a dictionary in ``observed``, in the current version these "
"parameters are positional arguments."
),
):
size = (3,)
with Model() as model:
mu = Normal("mu", 0, 1)
CustomDist(
"custom_dist",
mu,
random=lambda mu, rng=None, size=None: rng.normal(loc=mu, scale=1, size=size),
observed={"values": np.random.randn(100, *size)},
)

def test_custom_dist_without_random(self):
with Model() as model:
mu = Normal("mu", 0, 1)
custom_dist = CustomDist(
"custom_dist",
mu,
logp=lambda value, mu: logp(pm.Normal.dist(mu, 1, size=100), value),
observed=np.random.randn(100),
initval=0,
)
assert isinstance(custom_dist.owner.op, CustomDistRV)
idata = sample(tune=50, draws=100, cores=1, step=pm.Metropolis())

with pytest.raises(NotImplementedError):
pm.sample_posterior_predictive(idata, model=model)

@pytest.mark.xfail(
NotImplementedError,
reason="Support shape of multivariate CustomDist cannot be inferred. See https://github.com/pymc-devs/pytensor/pull/388",
)
@pytest.mark.parametrize("size", [(), (3,), (3, 2)], ids=str)
def test_custom_dist_with_random_multivariate(self, size):
supp_shape = 5
with Model() as model:
mu = Normal("mu", 0, 1, size=supp_shape)
obs = CustomDist(
"custom_dist",
mu,
random=lambda mu, rng=None, size=None: rng.multivariate_normal(
mean=mu, cov=np.eye(len(mu)), size=size
),
observed=np.random.randn(100, *size, supp_shape),
ndims_params=[1],
ndim_supp=1,
)

assert isinstance(obs.owner.op, CustomDistRV)
assert obs.eval().shape == (100, *size, supp_shape)

def test_serialize_custom_dist(self):
def func(x):
return -2 * (x**2).sum()

def random(rng, size):
return rng.uniform(-2, 2, size=size)

with Model():
Normal("x")
y = CustomDist("y", logp=func, random=random)
y_dist = CustomDist.dist(logp=func, random=random)
Deterministic("y_dist", y_dist)
assert isinstance(y.owner.op, CustomDistRV)
assert isinstance(y_dist.owner.op, CustomDistRV)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", ".*number of samples.*", UserWarning)
sample(draws=5, tune=1, mp_ctx="spawn")

cloudpickle.loads(cloudpickle.dumps(y))
cloudpickle.loads(cloudpickle.dumps(y_dist))

def test_custom_dist_old_api_error(self):
with Model():
with pytest.raises(
TypeError, match="The DensityDist API has changed, you are using the old API"
):
CustomDist("a", lambda x: x)

@pytest.mark.xfail(
NotImplementedError,
reason="Support shape of multivariate CustomDist cannot be inferred. See https://github.com/pymc-devs/pytensor/pull/388",
)
@pytest.mark.parametrize("size", [None, (), (2,)], ids=str)
def test_custom_dist_multivariate_logp(self, size):
supp_shape = 5
with Model() as model:

def logp(value, mu):
return pm.MvNormal.logp(value, mu, pt.eye(mu.shape[0]))

mu = Normal("mu", size=supp_shape)
a = CustomDist("a", mu, logp=logp, ndims_params=[1], ndim_supp=1, size=size)

assert isinstance(a.owner.op, CustomDistRV)
mu_test_value = npr.normal(loc=0, scale=1, size=supp_shape).astype(pytensor.config.floatX)
a_test_value = npr.normal(
loc=mu_test_value, scale=1, size=(*to_tuple(size), supp_shape)
).astype(pytensor.config.floatX)
log_densityf = model.compile_logp(vars=[a], sum=False)
assert log_densityf({"a": a_test_value, "mu": mu_test_value})[0].shape == to_tuple(size)

@pytest.mark.parametrize(
"support_point, size, expected",
[
(None, None, 0.0),
(None, 5, np.zeros(5)),
("custom_support_point", None, 5),
("custom_support_point", (2, 5), np.full((2, 5), 5)),
],
)
def test_custom_dist_default_support_point_univariate(self, support_point, size, expected):
if support_point == "custom_support_point":
support_point = lambda rv, size, *rv_inputs: 5 * pt.ones(size, dtype=rv.dtype) # noqa E731
with pm.Model() as model:
x = CustomDist("x", support_point=support_point, size=size)
assert isinstance(x.owner.op, CustomDistRV)
assert_support_point_is_expected(model, expected, check_finite_logp=False)

def test_custom_dist_moment_future_warning(self):
moment = lambda rv, size, *rv_inputs: 5 * pt.ones(size, dtype=rv.dtype) # noqa E731
with pm.Model() as model:
with pytest.warns(
FutureWarning, match="`moment` argument is deprecated. Use `support_point` instead."
):
x = CustomDist("x", moment=moment)
assert_support_point_is_expected(model, 5, check_finite_logp=False)

@pytest.mark.parametrize("size", [(), (2,), (3, 2)], ids=str)
def test_custom_dist_custom_support_point_univariate(self, size):
def density_support_point(rv, size, mu):
return (pt.ones(size) * mu).astype(rv.dtype)

mu_val = np.array(np.random.normal(loc=2, scale=1)).astype(pytensor.config.floatX)
with Model():
mu = Normal("mu")
a = CustomDist("a", mu, support_point=density_support_point, size=size)
assert isinstance(a.owner.op, CustomDistRV)
evaled_support_point = support_point(a).eval({mu: mu_val})
assert evaled_support_point.shape == to_tuple(size)
assert np.all(evaled_support_point == mu_val)

@pytest.mark.xfail(
NotImplementedError,
reason="Support shape of multivariate CustomDist cannot be inferred. See https://github.com/pymc-devs/pytensor/pull/388",
)
@pytest.mark.parametrize("size", [(), (2,), (3, 2)], ids=str)
def test_custom_dist_custom_support_point_multivariate(self, size):
def density_support_point(rv, size, mu):
return (pt.ones(size)[..., None] * mu).astype(rv.dtype)

mu_val = np.random.normal(loc=2, scale=1, size=5).astype(pytensor.config.floatX)
with Model():
mu = Normal("mu", size=5)
a = CustomDist(
"a",
mu,
support_point=density_support_point,
ndims_params=[1],
ndim_supp=1,
size=size,
)
assert isinstance(a.owner.op, CustomDistRV)
evaled_support_point = support_point(a).eval({mu: mu_val})
assert evaled_support_point.shape == (*to_tuple(size), 5)
assert np.all(evaled_support_point == mu_val)

@pytest.mark.xfail(
NotImplementedError,
reason="Support shape of multivariate CustomDist cannot be inferred. See https://github.com/pymc-devs/pytensor/pull/388",
)
@pytest.mark.parametrize(
"with_random, size",
[
(True, ()),
(True, (2,)),
(True, (3, 2)),
(False, ()),
(False, (2,)),
],
)
def test_custom_dist_default_support_point_multivariate(self, with_random, size):
def _random(mu, rng=None, size=None):
return rng.normal(mu, scale=1, size=to_tuple(size) + mu.shape)

if with_random:
random = _random
else:
random = None

mu_val = np.random.normal(loc=2, scale=1, size=5).astype(pytensor.config.floatX)
with Model():
mu = Normal("mu", size=5)
a = CustomDist("a", mu, random=random, ndims_params=[1], ndim_supp=1, size=size)
assert isinstance(a.owner.op, CustomDistRV)
if with_random:
evaled_support_point = support_point(a).eval({mu: mu_val})
assert evaled_support_point.shape == (*to_tuple(size), 5)
assert np.all(evaled_support_point == 0)
else:
with pytest.raises(
TypeError,
match="Cannot safely infer the size of a multivariate random variable's support_point.",
):
evaled_support_point = support_point(a).eval({mu: mu_val})

def test_dist(self):
mu = 1
x = pm.CustomDist.dist(
mu,
logp=lambda value, mu: pm.logp(pm.Normal.dist(mu), value),
random=lambda mu, rng=None, size=None: rng.normal(loc=mu, scale=1, size=size),
shape=(3,),
)

x = cloudpickle.loads(cloudpickle.dumps(x))

test_value = pm.draw(x, random_seed=1)
assert np.all(test_value == pm.draw(x, random_seed=1))

x_logp = pm.logp(x, test_value)
assert np.allclose(x_logp.eval(), st.norm(1).logpdf(test_value))
Loading

0 comments on commit 16a1a9d

Please sign in to comment.