diff --git a/pymc_marketing/mmm/base.py b/pymc_marketing/mmm/base.py index eed60225..9d94535a 100644 --- a/pymc_marketing/mmm/base.py +++ b/pymc_marketing/mmm/base.py @@ -475,54 +475,86 @@ def compute_channel_contribution_original_scale(self) -> DataArray: ) def _plot_estimations( - self, x: np.ndarray, ax: plt.Axes, channel: str, i: int + self, x: np.ndarray, ax: plt.Axes, channel: str, color_index: int, x_stop: int ) -> None: + """ + Plot the Michaelis-Menten curve fit for the given channel based on the estimation of the Menten parameters. + + The function computes the mean channel contributions, estimates the Michaelis-Menten parameters, and plots + the curve fit. An elbow point on the curve is also highlighted. + + Parameters + ---------- + x : np.ndarray + The x-axis data, usually representing the amount of input (e.g., substrate concentration in enzymology terms). + ax : plt.Axes + The matplotlib axes object where the plot should be drawn. + channel : str + The name of the channel for which the curve fit is being plotted. + color_index : int + An index used for color selection to ensure distinct colors for multiple plots. + Returns + ------- + None + The function modifies the given axes object in-place and doesn't return any object. 
+ """ channel_contributions = self.compute_channel_contribution_original_scale().mean( ["chain", "draw"] ) L, k = estimate_menten_parameters(channel, self.X, channel_contributions) - plateau_x = k * (0.99 * L / (L * 0.01)) elbow_y = michaelis_menten(k, L, k) - x_fit = np.linspace(0, plateau_x - (max(x) * 2), 1000) + if x_stop is not None: + x_limit = x_stop + else: + x_limit = k * (0.99 * L / (L * 0.01)) + + x_fit = np.linspace(0, x_limit, 1000) y_fit = michaelis_menten(x_fit, L, k) - ax.plot(x_fit, y_fit, color=f"C{i}", label="Fit Curve", alpha=0.6) + ax.plot(x_fit, y_fit, color=f"C{color_index}", label="Fit Curve", alpha=0.6) ax.plot( k, elbow_y, "go", - color=f"C{i}", + color=f"C{color_index}", markerfacecolor="white", ) ax.set(xlabel="Spent", ylabel="Contribution") ax.legend() - def budget_allocation( + def optimize_channel_budget_for_maximum_contribution( self, total_budget: int, - parameters: Optional[Dict[str, Tuple[float, float]]], + *, + parameters: Dict[str, Tuple[float, float]], budget_bounds: Optional[Dict[str, Tuple[float, float]]], ) -> pd.DataFrame: """ - Allocate the budget optimally among different channels based on estimations and budget constraints. + Optimize the allocation of a given total budget across multiple channels to maximize the expected contribution. + + The optimization is based on the Michaelis-Menten equation, where each channel's contribution + follows a saturating function of its allocated budget. The function seeks the budget allocation + that maximizes the total expected contribution across all channels. Parameters ---------- total_budget : int, requiere - The total budget available for allocation. + The total budget to be distributed across channels. parameters : dict, requiere - A DataFrame containing estimations and information about different channels. + A dictionary where keys are channel names and values are tuples (L, k) representing the + Michaelis-Menten parameters for each channel. 
budget_bounds : dict, optional - A dictionary specifying the budget bounds for each channel. + An optional dictionary defining the minimum and maximum budget for each channel. + If not provided, the budget for each channel is constrained between 0 and its L value. Returns ------- - Dict - A dictionary containing the allocated budget and contribution information. + DataFrame + A pandas DataFrame containing the allocated budget and contribution information. Raises ------ @@ -537,9 +569,14 @@ def budget_allocation( "The 'total_budget' parameter must be an integer or float." ) + if not parameters: + raise ValueError( + "The 'parameters' argument (keyword-only) must be provided and non-empty." + ) + return budget_allocator( total_budget=total_budget, - channels=self.channel_columns, + channels=list(self.channel_columns), parameters=parameters, budget_ranges=budget_bounds, ) @@ -553,20 +590,17 @@ def compute_channel_estimate_points_original_scale(self) -> Dict: Dict A DataFrame with the estimated points. """ - parameters = {} channel_contributions = self.compute_channel_contribution_original_scale().mean( ["chain", "draw"] ) - for channel in self.channel_columns: - parameters[channel] = estimate_menten_parameters( - channel, self.X, channel_contributions - ) - - return parameters + return { + channel: estimate_menten_parameters(channel, self.X, channel_contributions) + for channel in self.channel_columns + } def plot_direct_contribution_curves( - self, show_estimations: bool = False + self, show_estimations: bool = False, x_stop=None ) -> plt.Figure: """ Plots the direct contribution curves. 
The term "direct" refers to the fact @@ -601,7 +635,9 @@ def plot_direct_contribution_curves( ax.scatter(x, y, label=f"{channel}", color=f"C{i}") if show_estimations: - self._plot_estimations(x, ax, channel, i) + self._plot_estimations( + x=x, ax=ax, channel=channel, color_index=i, x_stop=x_stop + ) ax.legend( loc="upper left", diff --git a/pymc_marketing/mmm/budget_optimizer.py b/pymc_marketing/mmm/budget_optimizer.py index d544a084..9c9771ea 100644 --- a/pymc_marketing/mmm/budget_optimizer.py +++ b/pymc_marketing/mmm/budget_optimizer.py @@ -1,5 +1,5 @@ # optimization_utils.py -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple import numpy as np from pandas import DataFrame @@ -8,18 +8,33 @@ from pymc_marketing.mmm.utils import michaelis_menten -def calculate_expected_contribution(parameters, optimal_budget): +def calculate_expected_contribution( + parameters: Dict[str, Tuple[float, float]], optimal_budget: Dict[str, float] +) -> Dict[str, float]: """ - Calculate the total expected contribution of budget allocations across various channels. + Calculate expected contributions using the Michaelis-Menten model. + + The Michaelis-Menten model describes the relationship between the allocated budget and + its expected contribution. As the budget increases, the contribution initially rises quickly + but eventually plateaus, highlighting diminishing returns on investment. + + Parameters + ---------- + parameters : Dict + The Michaelis-Menten parameters for each channel. Each entry is a tuple (L, k) where: + - L is the maximum potential contribution. + - k is the budget at which the contribution is half of its maximum. + optimal_budget : Dict + The optimized budget allocations for each channel. Returns ------- - dict + Dict A dictionary with channels as keys and their respective contributions as values. The key 'total' contains the total expected contribution. 
""" - total_expected_contribution = 0 + total_expected_contribution = 0.0 contributions = {} for channel, budget in optimal_budget.items(): @@ -32,22 +47,32 @@ def calculate_expected_contribution(parameters, optimal_budget): return contributions -def objective_distribution(x, channels, parameters): +def objective_distribution( + x: List[float], channels: List[str], parameters: Dict[str, Tuple[float, float]] +) -> float: """ - Calculate the objective function value for a given budget distribution. + Compute the total contribution for a given budget distribution. + + This function calculates the negative sum of contributions for a proposed budget + distribution using the Michaelis-Menten model. This value will be minimized in + the optimization process to maximize the total expected contribution. Parameters ---------- - x : list of float - The budget distribution across channels. + x : List of float + The proposed budget distribution across channels. + channels : List of str + The List of channels for which the budget is being optimized. + parameters : Dict + Michaelis-Menten parameters for each channel as described in `calculate_expected_contribution`. Returns ------- float - The value of the objective function given the budget distribution. + Negative of the total expected contribution for the given budget distribution. """ - sum_contributions = 0 + sum_contributions = 0.0 for channel, budget in zip(channels, x): L, k = parameters[channel] @@ -56,13 +81,44 @@ def objective_distribution(x, channels, parameters): return -1 * sum_contributions -def optimize_budget_distribution(total_budget, budget_ranges, parameters, channels): +def optimize_budget_distribution( + total_budget: int, + budget_ranges: Optional[Dict[str, Tuple[float, float]]], + parameters: Dict[str, Tuple[float, float]], + channels: List[str], +) -> Dict[str, float]: """ - Calculate the optimal budget distribution that minimizes the objective function. 
+ Optimize the budget allocation across channels to maximize total contribution. + + Using the Michaelis-Menten model, this function seeks the best budget distribution across + channels that maximizes the total expected contribution. + + This function leverages the Sequential Least Squares Programming (SLSQP) optimization + algorithm to find the best budget distribution across channels that maximizes the total + expected contribution based on the Michaelis-Menten model. + + The optimization is constrained such that: + 1. The sum of budgets across all channels equals the total available budget. + 2. The budget allocated to each individual channel lies within its specified range. + + The SLSQP method is particularly suited for this kind of problem as it can handle + both equality and inequality constraints. + + Parameters + ---------- + total_budget : int + The total budget to be distributed across channels. + budget_ranges : Dict or None + An optional dictionary defining the minimum and maximum budget for each channel. + If not provided, each channel is constrained between 0 and the smaller of the total budget and its L value. + parameters : Dict + Michaelis-Menten parameters for each channel as described in `calculate_expected_contribution`. + channels : list of str + The list of channels for which the budget is being optimized. Returns ------- - dict + Dict A dictionary with channels as keys and the optimal budget for each channel as values. 
""" @@ -72,11 +128,11 @@ def optimize_budget_distribution(total_budget, budget_ranges, parameters, channe if budget_ranges is None: budget_ranges = { - channel: [0, min(total_budget, parameters[channel][0])] + channel: (0, min(total_budget, parameters[channel][0])) for channel in channels } - initial_guess = [total_budget / len(channels)] * len(channels) + initial_guess = [total_budget // len(channels)] * len(channels) bounds = [budget_ranges[channel] for channel in channels] @@ -95,11 +151,12 @@ def optimize_budget_distribution(total_budget, budget_ranges, parameters, channe def budget_allocator( - total_budget: int = 1000, - channels: Union[List[str], Tuple[str]] = [], - parameters: Optional[Dict[str, Tuple[float, float]]] = {}, - budget_ranges: Optional[Dict[str, Tuple[float, float]]] = {}, + total_budget: int, + channels: List[str], + parameters: Dict[str, Tuple[float, float]], + budget_ranges: Optional[Dict[str, Tuple[float, float]]], ) -> DataFrame: + optimal_budget = optimize_budget_distribution( total_budget, budget_ranges, parameters, channels )