Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Weibull CDF and PDF Adstock Transformations #499

Merged
merged 12 commits into from
Jan 29, 2024
103 changes: 103 additions & 0 deletions pymc_marketing/mmm/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import numpy as np
import numpy.typing as npt
import pymc as pm
import pytensor.tensor as pt
from pytensor.tensor.random.utils import params_broadcast_shapes

Expand All @@ -13,6 +14,11 @@ class ConvMode(Enum):
Overlap = "Overlap"


class WeibullType(Enum):
PDF = "PDF"
CDF = "CDF"


def batched_convolution(x, w, axis: int = 0, mode: ConvMode = ConvMode.Before):
R"""Apply a 1D convolution in a vectorized way across multiple batch dimensions.

Expand Down Expand Up @@ -260,6 +266,103 @@ def delayed_adstock(
return batched_convolution(x, w, axis=axis)


def weibull_adstock(
x,
lam=1,
k=1,
l_max: int = 12,
axis: int = 0,
type: WeibullType = WeibullType.PDF,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use string or a boolean variable so users don't have to import an object to be able to parametrize the function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

@abdalazizrashid abdalazizrashid Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a similar interface for batched_convolution too. Maybe make it type: WeibullType | str = WeibullType.PDF?

Copy link
Contributor

@wd60622 wd60622 Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will only every be pdf or not pdf (cdf), right? Think the boolean simplifies it heavily

):
R"""Weibull Adstocking Transformation.

This transformation is similar to geometric adstock transformation but has more degrees of freedom, adding more flexibility.
.. plot::
:context: close-figs
abdalazizrashid marked this conversation as resolved.
Show resolved Hide resolved
import matplotlib.pyplot as plt
import numpy as np
import arviz as az
from pymc_marketing.mmm.transformers import WeibullType, weibull_adstock
plt.style.use('arviz-darkgrid')

spend = np.zeros(50)
spend[0] = 1

shapes = [0.5, 1., 1.5, 5.]
scales = [10, 20, 40]
modes = [WeibullType.PDF, WeibullType.CDF]

fig, axes = plt.subplots(
len(shapes), len(modes), figsize=(12, 8), sharex=True, sharey=True
)
fig.suptitle("Effect of Changing Weibull Adstock Parameters", fontsize=16)

for m, mode in enumerate(modes):
axes[0, m].set_title(f"Mode: {mode.value}")

for i, shape in enumerate(shapes):
for j, scale in enumerate(scales):
adstock = weibull_adstock(
spend, lam=scale, k=shape, type=mode, l_max=len(spend)
).eval()

axes[i, m].plot(
np.arange(len(spend)),
adstock,
label=f"Scale={scale}",
linestyle="-",
alpha=0.5
)

fig.legend(
*axes[0, 0].get_legend_handles_labels(),
loc="center right",
bbox_to_anchor=(1.2, 0.85),
)

plt.tight_layout(rect=[0, 0, 0.9, 1])
plt.show()



Parameters
----------
x : tensor
Input tensor.
lam : float, by default 1.
Scale parameter of the Weibull distribution. Must be positive.
k : float, by default 1.
Shape parameter of the Weibull distribution. Must be positive.
l_max : int, by default 12
Maximum duration of carryover effect.
type : WeibullType, by default WeibullType.PDF
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe good to add string to match with the type hint

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Type of Weibull adstock transformation to be applied (PDF or CDF).

Returns
-------
tensor
Transformed tensor based on Weibull adstock transformation.
"""
lam = pt.as_tensor(lam)[..., None]
k = pt.as_tensor(k)[..., None]
t = pt.arange(l_max, dtype=x.dtype) + 1

if type == WeibullType.PDF:
w = pt.exp(pm.Weibull.logp(t, k, lam))
w = (w - pt.min(w, axis=-1)[..., None]) / (
pt.max(w, axis=-1)[..., None] - pt.min(w, axis=-1)[..., None]
)
elif type == WeibullType.CDF:
w = 1 - pt.exp(pm.Weibull.logcdf(t, k, lam))
shape = (*w.shape[:-1], w.shape[-1] + 1)
padded_w = pt.ones(shape, dtype=w.dtype)
padded_w = pt.set_subtensor(padded_w[..., 1:], w)
w = pt.cumprod(padded_w, axis=-1)
else:
raise ValueError(f"Wrong WeibullType: {type}, expected of WeibullType")
return batched_convolution(x, w, axis=axis)


def logistic_saturation(x, lam: Union[npt.NDArray[np.float_], float] = 0.5):
"""Logistic saturation transformation.

Expand Down
83 changes: 83 additions & 0 deletions tests/mmm/test_transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
import pytensor
import pytensor.tensor as pt
import pytest
import scipy as sp
from pytensor.tensor.var import TensorVariable

from pymc_marketing.mmm.transformers import (
ConvMode,
WeibullType,
batched_convolution,
delayed_adstock,
geometric_adstock,
logistic_saturation,
tanh_saturation,
weibull_adstock,
)


Expand Down Expand Up @@ -166,6 +169,86 @@ def test_delayed_adstock_vectorized(self, dummy_design_matrix):
assert y.shape == x.shape
np.testing.assert_almost_equal(actual=y, desired=ys, decimal=12)

def test_weibull_adstock_output_type(self):
abdalazizrashid marked this conversation as resolved.
Show resolved Hide resolved
x = np.ones(shape=(100))
y = weibull_adstock(x=x, lam=1, k=1, l_max=7, type=WeibullType.PDF)
assert isinstance(y, TensorVariable)
assert isinstance(y.eval(), np.ndarray)

def test_weibull_adstock_x_zero(self):
abdalazizrashid marked this conversation as resolved.
Show resolved Hide resolved
x = np.zeros(shape=(100))
y = weibull_adstock(x=x, lam=1, k=1, l_max=4, type=WeibullType.PDF)
np.testing.assert_array_equal(x=x, y=y.eval())

@pytest.mark.parametrize(
"x, lam, k, l_max",
[
(np.ones(shape=(100)), 0.3, 0.5, 10),
(np.ones(shape=(100)), 0.7, 1, 100),
(np.zeros(shape=(100)), 0.2, 0.2, 5),
(np.ones(shape=(100)), 0.5, 0.8, 7),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1.5, 3),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1, 50),
],
)
def test_weibull_pdf_adstock(self, x, lam, k, l_max):
y = weibull_adstock(x=x, lam=lam, k=k, l_max=l_max, type=WeibullType.PDF).eval()

assert np.all(np.isfinite(y))
w = sp.stats.weibull_min.pdf(np.arange(l_max) + 1, c=k, scale=lam)
w = (w - np.min(w)) / (np.max(w) - np.min(w))
sp_y = batched_convolution(x, w).eval()

np.testing.assert_almost_equal(y, sp_y)

@pytest.mark.parametrize(
"x, lam, k, l_max",
[
(np.ones(shape=(100)), 0.3, 0.5, 10),
(np.ones(shape=(100)), 0.7, 1, 100),
(np.zeros(shape=(100)), 0.2, 0.2, 5),
(np.ones(shape=(100)), 0.5, 0.8, 7),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1.5, 3),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1, 50),
],
)
def test_weibull_cdf_adsotck(self, x, lam, k, l_max):
y = weibull_adstock(x=x, lam=lam, k=k, l_max=l_max, type=WeibullType.CDF).eval()

assert np.all(np.isfinite(y))
w = 1 - sp.stats.weibull_min.cdf(np.arange(l_max) + 1, c=k, scale=lam)
w = sp.cumprod(np.concatenate([[1], w]))
sp_y = batched_convolution(x, w).eval()
np.testing.assert_almost_equal(y, sp_y)

@pytest.mark.parametrize(
"type",
[
WeibullType.PDF,
WeibullType.CDF,
],
)
def test_weibull_adstock_vectorized(self, type, dummy_design_matrix):
x = dummy_design_matrix.copy()
x_tensor = pt.as_tensor_variable(x)
lam = [0.9, 0.33, 0.5, 0.1, 1.0]
lam_tensor = pt.as_tensor_variable(lam)
k = [0.8, 0.2, 0.6, 0.4, 1.0]
k_tensor = pt.as_tensor_variable(k)
y = weibull_adstock(
x=x_tensor, lam=lam_tensor, k=k_tensor, l_max=12, type=type
).eval()

y_tensors = [
weibull_adstock(
x=x_tensor[:, i], lam=lam_tensor[i], k=k_tensor[i], l_max=12, type=type
)
for i in range(x.shape[1])
]
ys = np.concatenate([y_t.eval()[..., None] for y_t in y_tensors], axis=1)
assert y.shape == x.shape
np.testing.assert_almost_equal(actual=y, desired=ys, decimal=12)


class TestSaturationTransformers:
def test_logistic_saturation_lam_zero(self):
Expand Down
Loading