Skip to content

Commit

Permalink
Deprecate several pymc_marketing.mmm.utils functions (#1078)
Browse files Browse the repository at this point in the history
  • Loading branch information
PabloRoque authored Oct 7, 2024
1 parent 9bb18a1 commit 95bc1fa
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 252 deletions.
171 changes: 0 additions & 171 deletions pymc_marketing/mmm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,177 +20,6 @@
import numpy.typing as npt
import pandas as pd
import xarray as xr
from scipy.optimize import curve_fit, minimize_scalar

from pymc_marketing.mmm.transformers import michaelis_menten


def estimate_menten_parameters(
channel: str | Any,
original_dataframe: pd.DataFrame | Any,
contributions: xr.DataArray | Any,
**kwargs,
) -> list[float]:
"""Estimate the parameters for the Michaelis-Menten function using curve fitting.
This function extracts the relevant data for the specified channel from both
the original_dataframe and contributions DataArray resulting from the model.
It then utilizes scipy's curve_fit method to find the optimal parameters for
an Menten function, aiming to minimize the least squares difference between
the observed and predicted data.
Parameters
----------
channel : str
The name of the marketing channel for which parameters are to be estimated.
original_dataframe : Union[pd.DataFrame, Any]
The original DataFrame containing the channel data.
contributions : xr.DataArray
An xarray DataArray containing the contributions data, indexed by channel.
**kwargs : dict
Additional keyword arguments to pass to the curve_fit function.
Returns
-------
List[float]
The estimated parameters of the extended sigmoid function.
"""
maxfev = kwargs.get("maxfev", 5000)
lam_initial_estimate = kwargs.get("lam_initial_estimate", 0.001)

x = kwargs.get("x", original_dataframe[channel].to_numpy())
y = kwargs.get("y", contributions.sel(channel=channel).to_numpy())

alpha_initial_estimate = kwargs.get("alpha_initial_estimate", max(y))

# Initial guess for L and k
initial_guess = [alpha_initial_estimate, lam_initial_estimate]
# Curve fitting
popt, _ = curve_fit(michaelis_menten, x, y, p0=initial_guess, maxfev=maxfev)

# Save the parameters
return popt


def estimate_sigmoid_parameters(
channel: str | Any,
original_dataframe: pd.DataFrame | Any,
contributions: xr.DataArray | Any,
**kwargs,
) -> list[float]:
"""Estimate the parameters for the sigmoid function using curve fitting.
This function extracts the relevant data for the specified channel from both
the original_dataframe and contributions DataArray resulting from the model.
It then utilizes scipy's curve_fit method to find the optimal parameters for
an sigmoid function, aiming to minimize the least squares difference between
the observed and predicted data.
Parameters
----------
channel : str
The name of the marketing channel for which parameters are to be estimated.
original_dataframe : Union[pd.DataFrame, Any]
The original DataFrame containing the channel data.
contributions : xr.DataArray
An xarray DataArray containing the contributions data, indexed by channel.
Returns
-------
List[float]
The estimated parameters of the extended sigmoid function.
"""
maxfev = kwargs.get("maxfev", 5000)
lam_initial_estimate = kwargs.get("lam_initial_estimate", 0.00001)

x = kwargs.get("x", original_dataframe[channel].to_numpy())
y = kwargs.get("y", contributions.sel(channel=channel).to_numpy())

alpha_initial_estimate = kwargs.get("alpha_initial_estimate", 3 * max(y))

parameter_bounds_modified = ([0, 0], [alpha_initial_estimate, np.inf])
popt, _ = curve_fit(
sigmoid_saturation,
x,
y,
p0=[alpha_initial_estimate, lam_initial_estimate],
bounds=parameter_bounds_modified,
maxfev=maxfev,
)

return popt


def compute_sigmoid_second_derivative(
x: float | np.ndarray | npt.NDArray[np.float64],
alpha: float | np.ndarray | npt.NDArray[np.float64],
lam: float | np.ndarray | npt.NDArray[np.float64],
) -> float | Any:
"""Compute the second derivative of the extended sigmoid function.
The second derivative of a function gives us information about the curvature of the function.
In the context of the sigmoid function, it helps us identify the inflection point, which is
the point where the function changes from being concave up to concave down, or vice versa.
Parameters
----------
x : float
The input value for which the second derivative is to be computed.
alpha : float
The asymptotic maximum or ceiling value of the sigmoid function.
lam : float
The parameter that affects how quickly the function approaches its upper and lower asymptotes.
Returns
-------
float
The second derivative of the sigmoid function at the input value.
"""
return (
-alpha
* lam**2
* np.exp(-lam * x)
* (1 - np.exp(-lam * x) - 2 * lam * x * np.exp(-lam * x))
/ (1 + np.exp(-lam * x)) ** 3
)


def find_sigmoid_inflection_point(
alpha: float | np.ndarray | npt.NDArray[np.float64],
lam: float | np.ndarray | npt.NDArray[np.float64],
) -> tuple[Any, float]:
"""Find the inflection point of the extended sigmoid function.
The inflection point of a function is the point where the function changes its curvature,
i.e., it changes from being concave up to concave down, or vice versa. For the sigmoid
function, this is the point where the function has its maximum rate of growth.
Parameters
----------
alpha : float
The asymptotic maximum or ceiling value of the sigmoid function.
lam : float
The parameter that affects how quickly the function approaches its upper and lower asymptotes.
Returns
-------
tuple
The x and y coordinates of the inflection point.
"""
# Minimize the negative of the absolute value of the second derivative
result = minimize_scalar(
lambda x: -abs(compute_sigmoid_second_derivative(x, alpha, lam))
)

# Evaluate the original function at the inflection point
x_inflection = result.x
y_inflection = sigmoid_saturation(x_inflection, alpha, lam)

return x_inflection, y_inflection


def apply_sklearn_transformer_across_dim(
Expand Down
81 changes: 0 additions & 81 deletions tests/mmm/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,94 +19,13 @@

from pymc_marketing.mmm.utils import (
apply_sklearn_transformer_across_dim,
compute_sigmoid_second_derivative,
create_new_spend_data,
drop_scalar_coords,
estimate_menten_parameters,
estimate_sigmoid_parameters,
find_sigmoid_inflection_point,
sigmoid_saturation,
transform_1d_array,
)


# Test estimate_menten_parameters with valid inputs
@pytest.mark.parametrize(
"channel,original_dataframe,contributions,expected",
[
(
"channel1",
pd.DataFrame({"channel1": [0, 1, 2, 3, 4], "channel2": [0, 2, 4, 6, 8]}),
xr.DataArray(
np.array([[0, 2.85714286, 5, 6.66666667, 8]]),
coords=[("channel", ["channel1"]), ("observation", list(range(5)))],
),
[20, 6],
),
# Add more test cases as needed
],
)
def test_estimate_menten_parameters_valid(
channel, original_dataframe, contributions, expected
):
result = estimate_menten_parameters(channel, original_dataframe, contributions)
np.testing.assert_allclose(result, expected, rtol=1e-5, atol=1e-8)


# Test estimate_sigmoid_parameters with valid inputs
@pytest.mark.parametrize(
"channel,original_dataframe,contributions,expected",
[
(
"channel1",
pd.DataFrame({"channel1": [0, 1, 2, 3, 4], "channel2": [0, 2, 4, 6, 8]}),
xr.DataArray(
np.array([[0, 1, 2, 2.8, 2.95]]),
coords=[("channel", ["channel1"]), ("observation", list(range(5)))],
),
[3.53, 0.648],
),
# Add more test cases as needed
],
)
def test_estimate_sigmoid_parameters_valid(
channel, original_dataframe, contributions, expected
):
result = estimate_sigmoid_parameters(channel, original_dataframe, contributions)
np.testing.assert_allclose(result, expected, rtol=1e-2, atol=1e-4)


# Test compute_sigmoid_second_derivative with valid inputs
@pytest.mark.parametrize(
"x,alpha,lam,expected",
[
(
np.array([0, 1, 2, 3, 4]),
3.53,
0.648,
np.array([-0, 0.04411199, -0.00336529, -0.04266177, -0.04798905]),
),
# Add more test cases as needed
],
)
def test_compute_sigmoid_second_derivative_valid(x, alpha, lam, expected):
result = compute_sigmoid_second_derivative(x, alpha, lam)
np.testing.assert_allclose(result, expected, rtol=1e-5, atol=1e-8)


# Test find_sigmoid_inflection_point with valid inputs
@pytest.mark.parametrize(
"alpha,lam,expected",
[
(3.53, 0.648, (0.8041700751856726, 0.8994825718533391)),
# Add more test cases as needed
],
)
def test_find_sigmoid_inflection_point_valid(alpha, lam, expected):
result = find_sigmoid_inflection_point(alpha, lam)
np.testing.assert_allclose(result, expected, rtol=1e-5, atol=1e-8)


@pytest.fixture
def mock_method():
def _mock_method(x):
Expand Down

0 comments on commit 95bc1fa

Please sign in to comment.