Skip to content

Commit

Permalink
Applying Juan corrections
Browse files Browse the repository at this point in the history
  • Loading branch information
cetagostini committed Aug 10, 2023
1 parent a2f31d4 commit 2e008ea
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 43 deletions.
82 changes: 59 additions & 23 deletions pymc_marketing/mmm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,54 +475,86 @@ def compute_channel_contribution_original_scale(self) -> DataArray:
)

def _plot_estimations(
self, x: np.ndarray, ax: plt.Axes, channel: str, i: int
self, x: np.ndarray, ax: plt.Axes, channel: str, color_index: int, x_stop: int
) -> None:
"""
Plot the Michaelis-Menten curve fit for the given channel based on the estimation of the Menten parameters.
The function computes the mean channel contributions, estimates the Michaelis-Menten parameters, and plots
the curve fit. An elbow point on the curve is also highlighted.
Parameters
----------
x : np.ndarray
The x-axis data, usually representing the amount of input (e.g., substrate concentration in enzymology terms).
ax : plt.Axes
The matplotlib axes object where the plot should be drawn.
channel : str
The name of the channel for which the curve fit is being plotted.
color_index : int
An index used for color selection to ensure distinct colors for multiple plots.
Returns
-------
None
The function modifies the given axes object in-place and doesn't return any object.
"""
channel_contributions = self.compute_channel_contribution_original_scale().mean(
["chain", "draw"]
)

L, k = estimate_menten_parameters(channel, self.X, channel_contributions)
plateau_x = k * (0.99 * L / (L * 0.01))
elbow_y = michaelis_menten(k, L, k)

x_fit = np.linspace(0, plateau_x - (max(x) * 2), 1000)
if x_stop is not None:
x_limit = x_stop
else:
x_limit = k * (0.99 * L / (L * 0.01))

x_fit = np.linspace(0, x_limit, 1000)
y_fit = michaelis_menten(x_fit, L, k)

ax.plot(x_fit, y_fit, color=f"C{i}", label="Fit Curve", alpha=0.6)
ax.plot(x_fit, y_fit, color=f"C{color_index}", label="Fit Curve", alpha=0.6)
ax.plot(
k,
elbow_y,
"go",
color=f"C{i}",
color=f"C{color_index}",
markerfacecolor="white",
)

ax.set(xlabel="Spent", ylabel="Contribution")
ax.legend()

def budget_allocation(
def optimize_channel_budget_for_maximum_contribution(
self,
total_budget: int,
parameters: Optional[Dict[str, Tuple[float, float]]],
*,
parameters: Dict[str, Tuple[float, float]],
budget_bounds: Optional[Dict[str, Tuple[float, float]]],
) -> pd.DataFrame:
"""
Allocate the budget optimally among different channels based on estimations and budget constraints.
Optimize the allocation of a given total budget across multiple channels to maximize the expected contribution.
The optimization is based on the Michaelis-Menten equation, where each channel's contribution
follows a saturating function of its allocated budget. The function seeks the budget allocation
that maximizes the total expected contribution across all channels.
Parameters
----------
total_budget : int, requiere
The total budget available for allocation.
The total budget to be distributed across channels.
parameters : dict, requiere
A DataFrame containing estimations and information about different channels.
A dictionary where keys are channel names and values are tuples (L, k) representing the
Michaelis-Menten parameters for each channel.
budget_bounds : dict, optional
A dictionary specifying the budget bounds for each channel.
An optional dictionary defining the minimum and maximum budget for each channel.
If not provided, the budget for each channel is constrained between 0 and its L value.
Returns
-------
Dict
A dictionary containing the allocated budget and contribution information.
DataFrame
A pandas DataFrame containing the allocated budget and contribution information.
Raises
------
Expand All @@ -537,9 +569,14 @@ def budget_allocation(
"The 'total_budget' parameter must be an integer or float."
)

if not parameters:
raise ValueError(
"The 'parameters' argument (keyword-only) must be provided and non-empty."
)

return budget_allocator(
total_budget=total_budget,
channels=self.channel_columns,
channels=list(self.channel_columns),
parameters=parameters,
budget_ranges=budget_bounds,
)
Expand All @@ -553,20 +590,17 @@ def compute_channel_estimate_points_original_scale(self) -> Dict:
Dict
A DataFrame with the estimated points.
"""
parameters = {}
channel_contributions = self.compute_channel_contribution_original_scale().mean(
["chain", "draw"]
)

for channel in self.channel_columns:
parameters[channel] = estimate_menten_parameters(
channel, self.X, channel_contributions
)

return parameters
return {
channel: estimate_menten_parameters(channel, self.X, channel_contributions)
for channel in self.channel_columns
}

def plot_direct_contribution_curves(
self, show_estimations: bool = False
self, show_estimations: bool = False, x_stop=None
) -> plt.Figure:
"""
Plots the direct contribution curves. The term "direct" refers to the fact
Expand Down Expand Up @@ -601,7 +635,9 @@ def plot_direct_contribution_curves(
ax.scatter(x, y, label=f"{channel}", color=f"C{i}")

if show_estimations:
self._plot_estimations(x, ax, channel, i)
self._plot_estimations(
x=x, ax=ax, channel=channel, color_index=i, x_stop=x_stop
)

ax.legend(
loc="upper left",
Expand Down
97 changes: 77 additions & 20 deletions pymc_marketing/mmm/budget_optimizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# optimization_utils.py
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple

import numpy as np
from pandas import DataFrame
Expand All @@ -8,18 +8,33 @@
from pymc_marketing.mmm.utils import michaelis_menten


def calculate_expected_contribution(parameters, optimal_budget):
def calculate_expected_contribution(
parameters: Dict[str, Tuple[float, float]], optimal_budget: Dict[str, float]
) -> Dict[str, float]:
"""
Calculate the total expected contribution of budget allocations across various channels.
Calculate expected contributions using the Michaelis-Menten model.
The Michaelis-Menten model describes the relationship between the allocated budget and
its expected contribution. As the budget increases, the contribution initially rises quickly
but eventually plateaus, highlighting diminishing returns on investment.
Parameters
----------
parameters : Dict
The Michaelis-Menten parameters for each channel. Each entry is a tuple (L, k) where:
- L is the maximum potential contribution.
- k is the budget at which the contribution is half of its maximum.
optimal_budget : Dict
The optimized budget allocations for each channel.
Returns
-------
dict
Dict
A dictionary with channels as keys and their respective contributions as values.
The key 'total' contains the total expected contribution.
"""

total_expected_contribution = 0
total_expected_contribution = 0.0
contributions = {}

for channel, budget in optimal_budget.items():
Expand All @@ -32,22 +47,32 @@ def calculate_expected_contribution(parameters, optimal_budget):
return contributions


def objective_distribution(x, channels, parameters):
def objective_distribution(
x: List[float], channels: List[str], parameters: Dict[str, Tuple[float, float]]
) -> float:
"""
Calculate the objective function value for a given budget distribution.
Compute the total contribution for a given budget distribution.
This function calculates the negative sum of contributions for a proposed budget
distribution using the Michaelis-Menten model. This value will be minimized in
the optimization process to maximize the total expected contribution.
Parameters
----------
x : list of float
The budget distribution across channels.
x : List of float
The proposed budget distribution across channels.
channels : List of str
The List of channels for which the budget is being optimized.
parameters : Dict
Michaelis-Menten parameters for each channel as described in `calculate_expected_contribution`.
Returns
-------
float
The value of the objective function given the budget distribution.
Negative of the total expected contribution for the given budget distribution.
"""

sum_contributions = 0
sum_contributions = 0.0

for channel, budget in zip(channels, x):
L, k = parameters[channel]
Expand All @@ -56,13 +81,44 @@ def objective_distribution(x, channels, parameters):
return -1 * sum_contributions


def optimize_budget_distribution(total_budget, budget_ranges, parameters, channels):
def optimize_budget_distribution(
total_budget: int,
budget_ranges: Optional[Dict[str, Tuple[float, float]]],
parameters: Dict[str, Tuple[float, float]],
channels: List[str],
) -> Dict[str, float]:
"""
Calculate the optimal budget distribution that minimizes the objective function.
Optimize the budget allocation across channels to maximize total contribution.
Using the Michaelis-Menten model, this function seeks the best budget distribution across
channels that maximizes the total expected contribution.
This function leverages the Sequential Least Squares Quadratic Programming (SLSQP) optimization
algorithm to find the best budget distribution across channels that maximizes the total
expected contribution based on the Michaelis-Menten model.
The optimization is constrained such that:
1. The sum of budgets across all channels equals the total available budget.
2. The budget allocated to each individual channel lies within its specified range.
The SLSQP method is particularly suited for this kind of problem as it can handle
both equality and inequality constraints.
Parameters
----------
total_budget : int
The total budget to be distributed across channels.
budget_ranges : Dict or None
An optional dictionary defining the minimum and maximum budget for each channel.
If not provided, the budget for each channel is constrained between 0 and its L value.
parameters : Dict
Michaelis-Menten parameters for each channel as described in `calculate_expected_contribution`.
channels : list of str
The list of channels for which the budget is being optimized.
Returns
-------
dict
Dict
A dictionary with channels as keys and the optimal budget for each channel as values.
"""

Expand All @@ -72,11 +128,11 @@ def optimize_budget_distribution(total_budget, budget_ranges, parameters, channe

if budget_ranges is None:
budget_ranges = {
channel: [0, min(total_budget, parameters[channel][0])]
channel: (0, min(total_budget, parameters[channel][0]))
for channel in channels
}

initial_guess = [total_budget / len(channels)] * len(channels)
initial_guess = [total_budget // len(channels)] * len(channels)

bounds = [budget_ranges[channel] for channel in channels]

Expand All @@ -95,11 +151,12 @@ def optimize_budget_distribution(total_budget, budget_ranges, parameters, channe


def budget_allocator(
total_budget: int = 1000,
channels: Union[List[str], Tuple[str]] = [],
parameters: Optional[Dict[str, Tuple[float, float]]] = {},
budget_ranges: Optional[Dict[str, Tuple[float, float]]] = {},
total_budget: int,
channels: List[str],
parameters: Dict[str, Tuple[float, float]],
budget_ranges: Optional[Dict[str, Tuple[float, float]]],
) -> DataFrame:

optimal_budget = optimize_budget_distribution(
total_budget, budget_ranges, parameters, channels
)
Expand Down

0 comments on commit 2e008ea

Please sign in to comment.