"""Implements the MDCEV model as a generic class
Michel Bierlaire
Sun Apr 7 16:52:33 2024
"""
from __future__ import annotations
import logging
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Iterable
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from biogeme.biogeme import BIOGEME
from biogeme.calculator import (
evaluate_expression,
get_value_and_derivatives,
)
from biogeme.database import Database
from biogeme.exceptions import BiogemeError
from biogeme.expressions import Beta, Elem, Expression, Numeric, bioMultSum, exp, log
from biogeme.function_output import FunctionOutput
from biogeme.results_processing import EstimationResults
from biogeme.tools.checks import validate_dict_types
from .database_utils import mdcev_row_split
logger = logging.getLogger(__name__)
SMALLEST_NON_ZERO_NUMBER = 1.0e-6
[docs]
@dataclass(frozen=True)
class MdcevConfiguration:
    """Identify various configurations of the model.

    Immutable pair of flags characterizing how an alternative is specified.
    """

    # True when the alternative has no gamma parameter (i.e. it is the
    # outside good — see the gamma_parameters convention in Mdcev).
    gamma_is_none: bool
    # True when no price is associated with the alternative
    # (presumably consumption is measured directly — TODO confirm with callers).
    price_is_none: bool
[docs]
class Mdcev(ABC):
def __init__(
    self,
    model_name: str,
    baseline_utilities: dict[int, Expression],
    gamma_parameters: dict[int, Expression | None],
    alpha_parameters: dict[int, Expression] | None = None,
    scale_parameter: Expression | None = None,
    weights: Expression | None = None,
) -> None:
    """Generic MDCEV model.

    :param model_name: name given to the model for reporting purposes.
    :param baseline_utilities: dict mapping each alternative id to the
        expression of its baseline utility.
    :param gamma_parameters: dict mapping each alternative id to its gamma
        parameter. The outside good, if any, must be associated with None;
        at most one None entry is allowed.
    :param alpha_parameters: dict mapping each alternative id to its alpha
        parameter, or None if the specification does not involve them.
    :param scale_parameter: expression of the scale parameter, or None.
    :param weights: expression of the observation weights, or None.
    :raise BiogemeError: if the keys of the dictionaries are inconsistent,
        if more than one outside good is declared, or if the values are not
        Biogeme expressions.
    """
    self.model_name: str = model_name
    self.baseline_utilities: dict[int, Expression] = baseline_utilities
    self.gamma_parameters: dict[int, Expression | None] = gamma_parameters
    self.alpha_parameters: dict[int, Expression] | None = alpha_parameters
    self.scale_parameter: Expression | None = scale_parameter
    self.weights: Expression | None = weights
    self._estimation_results: EstimationResults | None = None
    self.database: Database | None = None
    # The ids of the alternatives are the keys of the baseline utilities.
    self.alternatives: set[int] = set(self.baseline_utilities)
    # Map contiguous indices (used for arrays of draws) to alternative ids,
    # and back. NOTE(review): the order is the iteration order of the set,
    # which may vary across runs — confirm determinism is not required.
    self.index_to_key: list[int] = list(self.alternatives)
    self.key_to_index: dict[int, int] = {
        key: index for index, key in enumerate(self.index_to_key)
    }
    # Defensive check kept from the original implementation (a dict cannot
    # actually contain duplicate keys, so this is not expected to trigger).
    if len(self.alternatives) != len(list(self.baseline_utilities)):
        error_msg = (
            f'Some alternatives appear more than once in baseline utilities: '
            f'{list(self.baseline_utilities)}'
        )
        raise BiogemeError(error_msg)
    if error_messages := self._verify_dict_keys_against_set(
        dict_to_check=self.gamma_parameters
    ):
        error_msg = f'Gamma parameters: {", ".join(error_messages)}'
        raise BiogemeError(error_msg)
    if self.alpha_parameters:
        if error_messages := self._verify_dict_keys_against_set(
            dict_to_check=self.alpha_parameters
        ):
            error_msg = f'Alpha parameters: {", ".join(error_messages)}'
            raise BiogemeError(error_msg)
    # Check if there is an outside good. It corresponds to a gamma parameter set to None.
    none_keys = [
        key for key, value in self.gamma_parameters.items() if value is None
    ]
    if len(none_keys) > 1:
        error_msg = f'Only one outside good is allowed, not {len(none_keys)}'
        raise BiogemeError(error_msg)
    self.outside_good_key = none_keys[0] if len(none_keys) != 0 else None
    validate_dict_types(self.baseline_utilities, 'baseline_utilities', Expression)
    validate_dict_types(self.gamma_parameters, 'gamma_parameters', Expression)
    if self.alpha_parameters is not None:
        validate_dict_types(self.alpha_parameters, 'alpha_parameters', Expression)
    if self.scale_parameter is not None:
        if not isinstance(self.scale_parameter, Expression):
            error_msg = f'Expecting a Biogeme expression and not {type(self.scale_parameter)}'
            raise BiogemeError(error_msg)
@property
def outside_good_index(self) -> int | None:
"""Obtain the index of the outside good."""
if self.outside_good_key is None:
return None
return self.key_to_index[self.outside_good_key]
@property
def estimation_results(self) -> EstimationResults | None:
    """Property for the estimation results.

    None as long as no estimation has been performed or assigned.
    """
    return self._estimation_results
@estimation_results.setter
def estimation_results(self, the_results: EstimationResults):
    # Assigning results also propagates the estimated beta values into
    # all the expressions of the model.
    self._estimation_results = the_results
    self._update_parameters_in_expressions()
@property
def number_of_alternatives(self) -> int:
    """Number of alternatives in the model."""
    return len(self.alternatives)
def _verify_dict_keys_against_set(self, dict_to_check: dict[int, Any]) -> list[str]:
# Convert dictionary keys to a set
dict_keys = set(dict_to_check.keys())
# Find keys in the dictionary that are not in the reference set
extra_keys = dict_keys - self.alternatives
# Find keys in the reference set that are not in the dictionary
missing_keys = self.alternatives - dict_keys
# Constructing the error message based on the findings
error_messages = []
if extra_keys:
error_messages.append(f'Extra alternatives in dictionary: {extra_keys}.')
if missing_keys:
error_messages.append(
f'Missing alternatives from dictionary: {missing_keys}.'
)
return error_messages
[docs]
@lru_cache
def calculate_baseline_utility(
    self, alternative_id: int, one_observation: Database
) -> float:
    """Evaluate the baseline utility of one alternative for one observation.

    As this function may be called many times with the same input in
    forecasting mode, we use the lru_cache decorator.

    NOTE(review): `lru_cache` on an instance method keeps `self` and the
    `Database` argument alive in the cache and requires both to be hashable
    — confirm this is intended (ruff B019).

    :param alternative_id: id of the alternative.
    :param one_observation: database containing exactly one row.
    :return: value of the baseline utility; the estimated beta values are
        used when estimation results are available.
    """
    assert one_observation.num_rows() == 1
    if self.estimation_results:
        return evaluate_expression(
            expression=self.baseline_utilities[alternative_id],
            numerically_safe=False,
            database=one_observation,
            betas=self.estimation_results.get_beta_values(),
            aggregation=True,
            use_jit=True,
        )
    # No estimation results: evaluate with the current values of the betas.
    return evaluate_expression(
        expression=self.baseline_utilities[alternative_id],
        numerically_safe=False,
        database=one_observation,
        aggregation=True,
        use_jit=True,
    )
[docs]
def calculate_utilities(
self, consumption: dict[int, Expression]
) -> dict[int, Expression]:
return {
alt_id: self.transformed_utility(alt_id, the_consumption)
for alt_id, the_consumption in consumption.items()
}
[docs]
@abstractmethod
def calculate_log_determinant_one_alternative(
    self, the_id: int, consumption: Expression
) -> Expression:
    """Expression of the log of one entry of the determinant.

    Implemented by each model specification; used by `loglikelihood`.

    :param the_id: id of the alternative.
    :param consumption: expression of the consumed quantity.
    """
    pass
[docs]
def calculate_log_determinant_entries(
self, consumption: dict[int, Expression]
) -> dict[int, Expression]:
return {
alt_id: self.calculate_log_determinant_one_alternative(
alt_id, the_consumption
)
for alt_id, the_consumption in consumption.items()
}
[docs]
@abstractmethod
def calculate_inverse_of_determinant_one_alternative(
    self, the_id: int, consumption: Expression
) -> Expression:
    """Expression of the inverse of one entry of the determinant.

    Implemented by each model specification; used by `loglikelihood`.

    :param the_id: id of the alternative.
    :param consumption: expression of the consumed quantity.
    """
    pass
[docs]
def calculate_inverse_of_determinant_entries(
self, consumption: dict[int, Expression]
) -> dict[int, Expression]:
return {
alt_id: self.calculate_inverse_of_determinant_one_alternative(
alt_id, the_consumption
)
for alt_id, the_consumption in consumption.items()
}
[docs]
def info_gamma_parameters(self) -> str:
    """Provides logging information about the outside good.

    :return: the message that has been logged.
    """
    # An outside good is identified by a gamma parameter set to None.
    number_of_none = list(self.gamma_parameters.values()).count(None)
    if number_of_none == 0:
        report = 'No outside good is included in the model.'
        log_function = logger.info
    elif number_of_none == 1:
        report = 'One outside good is included in the model.'
        log_function = logger.info
    else:
        report = (
            'Several outside goods are included in the model. If it is intentional, ignore this warning. '
            'If not, update the definition of the gamma parameters. The outside good, if any, must be associated with '
            'None, and all other alternative with a gamma parameter.'
        )
        log_function = logger.warning
    log_function(report)
    return report
[docs]
def loglikelihood(
    self,
    number_of_chosen_alternatives: Expression,
    consumed_quantities: dict[int, Expression],
) -> Expression:
    """Generate the Biogeme formula for the log probability of the MDCEV model

    :param number_of_chosen_alternatives: see the module documentation :mod:`biogeme.mdcev_no_outside_good`
    :param consumed_quantities: see the module documentation :mod:`biogeme.mdcev_no_outside_good`

    A detailed explanation is provided in the technical report
    "Estimating the MDCEV model with Biogeme"
    """
    validate_dict_types(consumed_quantities, 'consumed_quantities', Expression)
    utilities = self.calculate_utilities(consumption=consumed_quantities)
    log_determinant_entries = self.calculate_log_determinant_entries(
        consumption=consumed_quantities
    )
    inverse_of_determinant_entries = self.calculate_inverse_of_determinant_entries(
        consumption=consumed_quantities
    )
    # Utility of chosen goods: only alternatives with a positive consumption
    # contribute (Elem selects 0.0 otherwise).
    utility_terms = [
        Elem({0: 0.0, 1: util}, consumed_quantities[i] > 0)
        for i, util in utilities.items()
    ]
    if self.scale_parameter is None:
        baseline_term = bioMultSum(utility_terms)
    else:
        baseline_term = self.scale_parameter * bioMultSum(utility_terms)
    # Determinant: first term — sum of the log entries of the chosen goods.
    first_determinant_terms = [
        Elem({0: 0.0, 1: z}, consumed_quantities[i] > 0)
        for i, z in log_determinant_entries.items()
    ]
    first_determinant = bioMultSum(first_determinant_terms)
    # Determinant: second term — log of the sum of the inverse entries of
    # the chosen goods.
    second_determinant_terms = [
        Elem({0: 0.0, 1: z}, consumed_quantities[i] > 0)
        for i, z in inverse_of_determinant_entries.items()
    ]
    second_determinant = log(bioMultSum(second_determinant_terms))
    # Logsum over all alternatives, chosen or not.
    if self.scale_parameter is None:
        terms_for_logsum = [exp(util) for util in utilities.values()]
    else:
        terms_for_logsum = [
            exp(self.scale_parameter * util) for util in utilities.values()
        ]
    logsum_term = number_of_chosen_alternatives * log(bioMultSum(terms_for_logsum))
    log_prob = baseline_term + first_determinant + second_determinant - logsum_term
    # Correction term involving the scale parameter, if any.
    if self.scale_parameter is not None:
        log_prob += (number_of_chosen_alternatives - 1) * log(self.scale_parameter)
    return log_prob
[docs]
def estimate_parameters(
    self,
    database: Database,
    number_of_chosen_alternatives: Expression,
    consumed_quantities: dict[int, Expression],
    **kwargs,
) -> EstimationResults:
    """Estimate the parameters of the MDCEV model.

    :param database: data needed for the estimation of the parameters, in Biogeme format.
    :param number_of_chosen_alternatives: see the module documentation :mod:`biogeme.mdcev_no_outside_good`
    :param consumed_quantities: see the module documentation :mod:`biogeme.mdcev_no_outside_good`
    :param kwargs: additional parameters that are transmitted as such to the constructor of the Biogeme object.
    :return: the estimation results; they are also stored in the object.

    A detailed explanation is provided in the technical report
    "Estimating the MDCEV model with Biogeme".
    """
    logprob = self.loglikelihood(
        number_of_chosen_alternatives=number_of_chosen_alternatives,
        consumed_quantities=consumed_quantities,
    )
    # Create the Biogeme object; the weight formula is included when provided.
    if self.weights is None:
        the_biogeme = BIOGEME(database, logprob, **kwargs)
    else:
        formulas = {'log_like': logprob, 'weight': self.weights}
        the_biogeme = BIOGEME(database, formulas, **kwargs)
    the_biogeme.modelName = self.model_name
    # The property setter also propagates the estimates into the expressions.
    self.estimation_results = the_biogeme.estimate()
    return self.estimation_results
@abstractmethod
def _list_of_expressions(self) -> list[Expression]:
    """Extract the list of expressions involved in the model.

    Implemented by each child class; used when propagating estimated
    parameter values into the expressions.
    """
    raise NotImplementedError
def _update_parameters_in_expressions(self) -> None:
"""Update the value of the unknown parameters in expression, after estimation"""
if self._estimation_results is None:
error_msg = 'No estimation result is available'
raise BiogemeError(error_msg)
betas = self._estimation_results.get_beta_values()
all_expressions = [
expression
for group in [
self.baseline_utilities.values(),
self.gamma_parameters.values(),
(
self.alpha_parameters.values()
if self.alpha_parameters is not None
else []
),
[self.scale_parameter] if self.scale_parameter is not None else [],
[self.weights] if self.weights is not None else [],
]
for expression in group
if expression is not None
]
# ADd the expressions from the child class
all_expressions += self._list_of_expressions()
# Update each expression with the new beta values
for expression in all_expressions:
expression.change_init_values(betas)
[docs]
@abstractmethod
def utility_one_alternative(
    self,
    the_id: int,
    the_consumption: float,
    epsilon: float,
    one_observation: Database,
) -> float:
    """Utility needed for forecasting.

    :param the_id: id of the alternative.
    :param the_consumption: consumed quantity of the alternative.
    :param epsilon: draw from the error term.
    :param one_observation: database containing exactly one row.
    """
    raise NotImplementedError
[docs]
@abstractmethod
def utility_expression_one_alternative(
    self,
    the_id: int,
    the_consumption: Expression,
    unscaled_epsilon: Expression,
) -> Expression:
    """Utility expression. Used only for code validation.

    :param the_id: id of the alternative.
    :param the_consumption: expression of the consumed quantity.
    :param unscaled_epsilon: expression of the (unscaled) error term.
    """
    raise NotImplementedError
[docs]
def sum_of_utilities(
self, consumptions: np.ndarray, epsilon: np.ndarray, data_row: Database
) -> float:
"""Calculates the sum of all utilities. Used for forecasting."""
try:
utilities = [
self.utility_one_alternative(
the_id=key,
the_consumption=float(consumptions[index]),
epsilon=float(epsilon[index]),
one_observation=data_row,
)
for index, key in enumerate(self.index_to_key)
]
except IndexError as e:
raise e
return np.sum(utilities)
[docs]
def forecast_bruteforce_one_draw(
self, one_row_database: Database, total_budget: float, epsilon: np.ndarray
) -> dict[int, float] | None:
"""Forecast the optimal expenditures given a budget and one realization of epsilon for each alternative.
If there is an issue with the optimization algorithm, None is returned.
"""
if len(epsilon) != self.number_of_alternatives:
error_msg = f'epsilon must be a vector of size {self.number_of_alternatives}, not {epsilon.shape}'
raise BiogemeError(error_msg)
if len(one_row_database.dataframe) != 1:
error_msg = (
f'Expecting exactly one row, not {len(one_row_database.dataframe)}'
)
raise BiogemeError(error_msg)
self.database = one_row_database
# If there is an outside good, we need to impose that the corresponding consumption is not zero.
# To do that, we impose that it is equal to the exponential of a new variable.
outside_good = self.outside_good_key is not None
def objective_function(x: np.ndarray) -> float:
"""Objective function. The number of variables is the number of
alternatives"""
result = self.sum_of_utilities(
consumptions=x, epsilon=epsilon, data_row=one_row_database
)
return -result
def budget_constraint(x: np.ndarray) -> float:
"""Budget constraint when there is no outside good. The number of variables is the number of
alternatives"""
return total_budget - x.sum()
def opposite_budget_constraint(x: np.ndarray) -> float:
"""Budget constraint when there is no outside good. The number of variables is the number of
alternatives"""
return x.sum() - total_budget
constraints = [
{
'type': 'ineq',
'fun': budget_constraint,
},
{
'type': 'ineq',
'fun': opposite_budget_constraint,
},
]
number_of_alternatives = len(self.alternatives)
# Bounds
bounds: list[tuple[float | None, float | None]] = [
(0, total_budget) for _ in range(number_of_alternatives)
]
if self.outside_good_index is not None:
bounds[self.outside_good_index] = (SMALLEST_NON_ZERO_NUMBER, total_budget)
# Starting point
# We split the budget equally across alternatives
initial_consumption = total_budget / number_of_alternatives
# We split equally across alternatives
initial_guess = np.array([initial_consumption] * number_of_alternatives)
# There is a bug in the minimize function. It generates a warning that crashes the script.
# We are converting this warning into an exception in order to catch it.
# Convert specific warnings to exceptions
warnings.filterwarnings(
'error',
message='Values in x were outside bounds during a minimize step, clipping to bounds',
)
try:
optimization_result = minimize(
objective_function,
initial_guess,
method='SLSQP',
bounds=bounds,
constraints=constraints,
)
optimal_consumption = {
self.index_to_key[index]: optimization_result.x[index]
for index in range(number_of_alternatives)
}
for alternative in self.alternatives:
if alternative not in optimal_consumption:
optimal_consumption[alternative] = 0
return optimal_consumption
except RuntimeWarning as e:
logger.warning(e)
return None
[docs]
@abstractmethod
def derivative_utility_one_alternative(
    self,
    the_id: int,
    the_consumption: float,
    epsilon: float,
    one_observation: Database,
) -> float:
    """Used in the optimization problem solved for forecasting to calculate the dual variable.

    :param the_id: id of the alternative.
    :param the_consumption: consumed quantity of the alternative.
    :param epsilon: draw from the error term.
    :param one_observation: database containing exactly one row.
    """
    raise NotImplementedError
[docs]
def optimal_consumption(
self,
chosen_alternatives: Iterable[int],
dual_variable: float,
epsilon: np.ndarray,
one_observation: Database,
) -> dict[int, float]:
"""Analytical calculation of the optimal consumption if the dual variable is known."""
try:
result = {
alt_id: self.optimal_consumption_one_alternative(
the_id=alt_id,
dual_variable=dual_variable,
epsilon=float(epsilon[self.key_to_index[alt_id]]),
one_observation=one_observation,
)
for alt_id in chosen_alternatives
}
except KeyError as e:
raise e
return result
[docs]
@abstractmethod
def optimal_consumption_one_alternative(
    self,
    the_id: int,
    dual_variable: float,
    epsilon: float,
    one_observation: Database,
) -> float:
    """Analytical calculation of the optimal consumption if the dual variable is known.

    :param the_id: id of the alternative.
    :param dual_variable: value of the dual variable of the budget constraint.
    :param epsilon: draw from the error term.
    :param one_observation: database containing exactly one row.
    """
    raise NotImplementedError
[docs]
@abstractmethod
def lower_bound_dual_variable(
    self,
    chosen_alternatives: set[int],
    one_observation: Database,
    epsilon: np.ndarray,
) -> float:
    """Method providing model specific bounds on the dual variable. If not overloaded,
    default values are used.

    :param chosen_alternatives: list of alternatives that are chosen at the optimal solution
    :param one_observation: data for one observation.
    :param epsilon: draws from the error term.
    :return: a lower bound on the dual variable, such that the expenditure calculated for any larger value is
        well-defined and non-negative.
    """
    raise NotImplementedError
[docs]
def is_next_alternative_chosen(
self,
chosen_alternatives: set[int],
candidate_alternative: int,
derivative_zero_expenditure_candidate: float,
database: Database,
total_budget: float,
epsilon: np.ndarray,
) -> tuple[bool, float | None]:
# If the candidate alternative is the outside good, it is in the choice set by definition.
model_lower_bound = self.lower_bound_dual_variable(
chosen_alternatives=chosen_alternatives,
one_observation=database,
epsilon=epsilon,
)
if derivative_zero_expenditure_candidate < model_lower_bound:
# Lower bound is violated. Alternative not in choice set.
return False, model_lower_bound
candidate_set = chosen_alternatives | {candidate_alternative}
# Derive the optimal consumption
estimate_optimal_consumption_candidate = self.optimal_consumption(
chosen_alternatives=candidate_set,
dual_variable=derivative_zero_expenditure_candidate,
epsilon=epsilon,
one_observation=database,
)
total_consumption_candidate = sum(
estimate_optimal_consumption_candidate.values()
)
if total_budget <= total_consumption_candidate:
# Total budget is overpassed. Alternative not in the choice set.
return False, derivative_zero_expenditure_candidate
return True, None
[docs]
def identification_chosen_alternatives(
    self,
    database: Database,
    total_budget: float,
    epsilon: np.ndarray,
) -> tuple[set[int], float, float]:
    """Algorithm identifying the chosen alternatives at the optimal solution

    :param database: one row of the database.
    :param total_budget: total budget for the constraint.
    :param epsilon: vector of draws from the error term.
    :return: the set of chosen alternatives, as well as a lower and an upper bound on the dual variable.
    """
    # Derivative of the utility at zero consumption, for every alternative
    # except the outside good.
    the_unsorted_w: dict[int, float] = {
        alt_id: self.derivative_utility_one_alternative(
            the_id=alt_id,
            one_observation=database,
            the_consumption=0,
            epsilon=float(epsilon[self.key_to_index[alt_id]]),
        )
        for alt_id in self.alternatives
        if alt_id != self.outside_good_key
    }
    # Sort the dictionary by values (descending order)
    ordered_alternatives: list[int] = [
        alt_id
        for alt_id, _ in sorted(
            the_unsorted_w.items(), key=lambda item: item[1], reverse=True
        )
    ]
    # The outside good, if any, is chosen by definition.
    chosen_alternatives: set[int] = (
        set() if self.outside_good_key is None else {self.outside_good_key}
    )
    last_chosen_alternative = None
    # Add alternatives one by one, in decreasing order of the derivative at
    # zero, until a candidate is rejected.
    for candidate_alternative_id in ordered_alternatives:
        derivative_zero_expenditure_candidate = the_unsorted_w[
            candidate_alternative_id
        ]
        next_chosen, lower_bound = self.is_next_alternative_chosen(
            chosen_alternatives=chosen_alternatives,
            candidate_alternative=candidate_alternative_id,
            derivative_zero_expenditure_candidate=derivative_zero_expenditure_candidate,
            database=database,
            total_budget=total_budget,
            epsilon=epsilon,
        )
        if not next_chosen:
            # The upper bound is the derivative of the last accepted
            # alternative, or the largest float if none has been accepted.
            upper_bound = (
                np.finfo(np.float64).max
                if last_chosen_alternative is None
                else the_unsorted_w[last_chosen_alternative]
            )
            return (
                chosen_alternatives,
                lower_bound,
                upper_bound,
            )
        chosen_alternatives |= {candidate_alternative_id}
        last_chosen_alternative = candidate_alternative_id
    # The full choice set is chosen
    lower_bound = self.lower_bound_dual_variable(
        chosen_alternatives=chosen_alternatives,
        one_observation=database,
        epsilon=epsilon,
    )
    upper_bound = the_unsorted_w[ordered_alternatives[-1]]
    return chosen_alternatives, lower_bound, upper_bound
[docs]
def forecast_bisection_one_draw(
self,
one_row_of_database: Database,
total_budget: float,
epsilon: np.ndarray,
tolerance_dual=1.0e-13,
tolerance_budget=1.0e-13,
) -> dict[int, float]:
chosen_alternatives, lower_bound, upper_bound = (
self.identification_chosen_alternatives(
database=one_row_of_database, total_budget=total_budget, epsilon=epsilon
)
)
if lower_bound > upper_bound:
error_msg = (
f'Lower bound: {lower_bound} larger than upper bound: {upper_bound}. '
f'Chosen alternatives: {chosen_alternatives}'
)
raise BiogemeError(error_msg)
# Estimate the dual variable by bisection
total_consumption = 0
continue_iterations = True
for _ in range(5000):
if not continue_iterations:
break
dual_variable = (lower_bound + upper_bound) / 2
optimal_consumption = self.optimal_consumption(
chosen_alternatives=chosen_alternatives,
dual_variable=dual_variable,
epsilon=epsilon,
one_observation=one_row_of_database,
)
negative_consumption = any(
value < 0 for value in optimal_consumption.values()
)
if negative_consumption:
raise ValueError('Negative consumption')
if negative_consumption:
# If any consumption is negative, we update the upper bound
upper_bound = dual_variable
else:
total_consumption = sum(optimal_consumption.values())
if total_consumption < total_budget:
upper_bound = dual_variable
elif total_consumption > total_budget:
lower_bound = dual_variable
# Stopping criteria
if (upper_bound - lower_bound) <= tolerance_dual:
continue_iterations = False
elif np.abs(total_consumption - total_budget) <= tolerance_budget:
continue_iterations = False
dual_variable = (lower_bound + upper_bound) / 2
optimal_consumption = self.optimal_consumption(
chosen_alternatives=chosen_alternatives,
dual_variable=dual_variable,
epsilon=epsilon,
one_observation=one_row_of_database,
)
for alternative in self.alternatives:
if alternative not in optimal_consumption:
optimal_consumption[alternative] = 0
return optimal_consumption
[docs]
def validation_one_alternative_one_consumption(
    self, alternative_id: int, value_consumption: float, one_row: Database
) -> list[str]:
    """Validation of some of the abstract methods.

    Compares, for one alternative and one consumption level, the numerical
    implementation (`utility_one_alternative`,
    `derivative_utility_one_alternative`) with the values and derivatives of
    the Biogeme expression (`utility_expression_one_alternative`).

    :param alternative_id: id of the alternative.
    :param value_consumption: consumption level used for the comparison.
    :param one_row: database containing exactly one observation.
    :return: a list of error messages; empty if the implementations agree.
    """
    error_messages = []
    value_epsilon = 0.01
    # A Beta is used for the consumption so that the derivative of the
    # expression with respect to it can be obtained.
    consumption = Beta('consumption', value_consumption, None, None, 0)
    unscaled_epsilon = Numeric(value_epsilon)
    utility = self.utility_expression_one_alternative(
        the_id=alternative_id,
        the_consumption=consumption,
        unscaled_epsilon=unscaled_epsilon,
    )
    result: FunctionOutput = get_value_and_derivatives(
        expression=utility,
        database=one_row,
        gradient=True,
        named_results=True,
        numerically_safe=False,
        use_jit=True,
    )
    # Validate the utility calculation
    calculated_utility_value = self.utility_one_alternative(
        the_id=alternative_id,
        the_consumption=value_consumption,
        epsilon=value_epsilon,
        one_observation=one_row,
    )
    if not np.isclose(result.function, calculated_utility_value):
        error_msg = f'Inconsistent utility values for alt. {alternative_id}: {result.function} and {calculated_utility_value}'
        error_messages.append(error_msg)
    # Validate the derivative
    calculated_derivative = self.derivative_utility_one_alternative(
        the_id=alternative_id,
        the_consumption=value_consumption,
        epsilon=value_epsilon,
        one_observation=one_row,
    )
    the_gradient = result.gradient['consumption']
    if not np.isclose(the_gradient, calculated_derivative):
        error_msg = f'Inconsistent derivatives for alt. {alternative_id}: {the_gradient} and {calculated_derivative}'
        error_messages.append(error_msg)
    return error_messages
[docs]
def validation_one_alternative(
self, alternative_id: int, one_row: Database
) -> list[str]:
values = [1, 10, 100]
error_messages = sum(
(
self.validation_one_alternative_one_consumption(
alternative_id=alternative_id,
value_consumption=consumption_value,
one_row=one_row,
)
for consumption_value in values
),
[],
)
# Validate the optimal consumption
dual_variable = 10
value_epsilon = 0.01
optimal_consumption = self.optimal_consumption_one_alternative(
the_id=alternative_id,
dual_variable=dual_variable,
epsilon=value_epsilon,
one_observation=one_row,
)
# If we calculate the derivative for this level of consumption, we should find the dual variable.
calculated_derivative = self.derivative_utility_one_alternative(
the_id=alternative_id,
the_consumption=optimal_consumption,
epsilon=value_epsilon,
one_observation=one_row,
)
if not np.isclose(calculated_derivative, dual_variable):
error_msg = (
f'Inconsistent dual variables for alt. {alternative_id}: '
f'{calculated_derivative} and {dual_variable}'
)
error_messages.append(error_msg)
return error_messages
[docs]
def validation(self, one_row: Database) -> list[str]:
"""Validation of some abstract methods for all alternatives."""
return sum(
(
self.validation_one_alternative(alternative_id=the_id, one_row=one_row)
for the_id in self.alternatives
),
[],
)
[docs]
def forecast(
    self,
    database: Database,
    total_budget: float,
    epsilons: list[np.ndarray],
    brute_force: bool = False,
    tolerance_dual: float = 1.0e-10,
    tolerance_budget: float = 1.0e-10,
) -> list[pd.DataFrame]:
    """Forecast the optimal expenditures given a budget and one realization of epsilon for each alternative

    :param database: database containing the values of the explanatory variables.
    :param total_budget: total budget.
    :param epsilons: draws from the error terms provided by the user. Each entry of the list corresponds
        to an observation in the database, and consists of an array of
        size=(number_of_draws, self.number_of_alternatives).
    :param brute_force: if True, the brute force algorithm is applied. It solves each optimization problem using
        the scipy optimization algorithm. If False, the method proposed by Pinjari and Bhat (2021) is used.
    :param tolerance_dual: convergence criterion for the estimate of the dual variable.
    :param tolerance_budget: convergence criterion for the estimate of the total budget.
    :return: a list of data frames, each containing the results for each draw
    :raise BiogemeError: if the database is empty, or if the draws are
        inconsistent with the database or the model.

    .. [PinjBhat21] A. R. Pinjari, C. Bhat, Computationally efficient forecasting procedures for Kuhn-Tucker
       consumer demand model systems: Application to residential energy consumption analysis, Journal of Choice
       Modelling, Volume 39, 2021, 100283.
    """
    # One single-row database per observation.
    rows_of_database = [
        Database(name=f'row_{i}', dataframe=database.dataframe.iloc[[i]])
        for i in range(len(database.dataframe))
    ]
    if len(rows_of_database) == 0:
        error_msg = 'Empty database'
        raise BiogemeError(error_msg)
    if len(epsilons) != len(rows_of_database):
        error_msg = (
            f'User provided draws have {len(epsilons)} entries while there are '
            f'{len(rows_of_database)} observations in the sample.'
        )
        raise BiogemeError(error_msg)
    # Validate the shape of the user-provided draws.
    number_of_draws = None
    for index, epsilon in enumerate(epsilons):
        if number_of_draws is None:
            number_of_draws = epsilon.shape[0]
        elif epsilon.shape[0] != number_of_draws:
            error_msg = (
                f'User defined draws for obs. {index} contains {epsilon.shape[0]} '
                f'draws instead of {number_of_draws}'
            )
            # Bug fix: the original code executed `raise error_msg`, which
            # raises a TypeError ("exceptions must derive from BaseException")
            # instead of reporting the actual problem.
            raise BiogemeError(error_msg)
        if epsilon.shape[1] != self.number_of_alternatives:
            error_msg = (
                f'User defined draws for obs. {index} contains {epsilon.shape[1]} '
                f'alternatives instead of {self.number_of_alternatives}'
            )
            # Bug fix: same as above.
            raise BiogemeError(error_msg)
    all_results = []
    for index, row in enumerate(rows_of_database):
        logger.info(
            f'Forecasting observation {index} / {len(rows_of_database)} [{number_of_draws} draws]'
        )
        # Forecasting: one optimal-consumption dict per draw.
        optimal_expenditures = (
            [
                self.forecast_bruteforce_one_draw(
                    one_row_database=row, total_budget=total_budget, epsilon=epsilon
                )
                for epsilon in epsilons[index]
            ]
            if brute_force
            else [
                self.forecast_bisection_one_draw(
                    one_row_of_database=row,
                    total_budget=total_budget,
                    epsilon=epsilon,
                    tolerance_budget=tolerance_budget,
                    tolerance_dual=tolerance_dual,
                )
                for epsilon in epsilons[index]
            ]
        )
        # Collect the values per alternative across draws. Failed draws,
        # reported as None by the brute force algorithm, are skipped.
        gather_data = defaultdict(list)
        for optimal_solution in optimal_expenditures:
            if optimal_solution is not None:
                for key, value in optimal_solution.items():
                    gather_data[key].append(value)
        all_results.append(pd.DataFrame(gather_data).sort_index(axis='columns'))
    return all_results
[docs]
def forecast_comparison_one_draw(
    self,
    one_row_of_database: Database,
    total_budget: float,
    epsilon: np.ndarray,
    tolerance_dual: float = 1.0e-10,
    tolerance_budget: float = 1.0e-10,
) -> None:
    """Compares the results from each of the two forecasting algorithms.

    :param one_row_of_database: one row from the database containing the values of the explanatory variables.
    :param total_budget: total budget.
    :param epsilon: draws from the error terms provided by the user. Length: self.number_of_alternatives.
    :param tolerance_dual: convergence criterion for the estimate of the dual variable.
    :param tolerance_budget: convergence criterion for the estimate of the total budget.
    :return: None. Warnings are triggered if discrepancies are observed.
    """
    if len(epsilon) != self.number_of_alternatives:
        error_msg = f'There are {self.number_of_alternatives} alternatives and {len(epsilon)} epsilons'
        raise BiogemeError(error_msg)
    logger.info('============ Comparison ===================')
    # Numerical solution (may fail and return None).
    brute_force: dict[int, float] | None = self.forecast_bruteforce_one_draw(
        one_row_database=one_row_of_database,
        total_budget=total_budget,
        epsilon=epsilon,
    )
    consumption_brute_force = (
        np.array([value for key, value in sorted(brute_force.items())])
        if brute_force is not None
        else None
    )
    obj_brute_force = (
        self.sum_of_utilities(
            consumptions=consumption_brute_force,
            epsilon=epsilon,
            data_row=one_row_of_database,
        )
        if brute_force is not None
        else None
    )
    constraint_brute_force = (
        sum(consumption_brute_force) if brute_force is not None else None
    )
    # Alternatives with a non-zero consumption in the numerical solution.
    choice_set_brute_force: set[int] | None = None
    if brute_force is None:
        logger.info('Brute force algorithm failed')
    else:
        choice_set_brute_force = set(
            [key for key, value in brute_force.items() if not np.isclose(value, 0)]
        )
        formatted_brute_force = {
            k: f'{value:.3g}' for k, value in brute_force.items()
        }
        logger.info(
            f'Brute force: {formatted_brute_force} objective {obj_brute_force:.3g}, constraint {constraint_brute_force:.3g}, choice set {choice_set_brute_force}'
        )
    # Analytical (bisection) solution.
    analytical = self.forecast_bisection_one_draw(
        one_row_of_database=one_row_of_database,
        total_budget=total_budget,
        epsilon=epsilon,
        tolerance_budget=tolerance_budget,
        tolerance_dual=tolerance_dual,
    )
    choice_set_analytical: set[int] = set(
        [key for key, value in analytical.items() if not np.isclose(value, 0)]
    )
    consumption_analytical = (
        np.array([value for key, value in sorted(analytical.items())])
        if analytical is not None
        else None
    )
    try:
        obj_analytical = (
            self.sum_of_utilities(
                consumptions=consumption_analytical,
                epsilon=epsilon,
                data_row=one_row_of_database,
            )
            if analytical is not None
            else None
        )
    except RuntimeWarning as e:
        # Log the inputs that triggered the warning before re-raising.
        logger.warning(f'{analytical=}')
        logger.warning(f'{consumption_analytical=}')
        raise e
    constraint_analytical = (
        sum(consumption_analytical) if analytical is not None else None
    )
    if analytical is None:
        logger.info('Analytical algorithm failed')
    else:
        formatted_analytical = {
            k: f'{value:.3g}' for k, value in analytical.items()
        }
        logger.info(
            f'Analytical: {formatted_analytical} objective {obj_analytical:.3g}, constraint {constraint_analytical:.3g}, choice set {choice_set_analytical}'
        )
    if brute_force is None and analytical is None:
        logger.warning('Both algorithms failed.')
        return
    if brute_force is None:
        logger.warning('Brute force algorithm failed.')
        return
    if analytical is None:
        logger.warning('Analytical algorithm failed.')
        return
    # Compare the optimal choice sets.
    if (
        choice_set_brute_force != choice_set_analytical
        and choice_set_brute_force is not None
    ):
        logger.warning(
            f'Different optimal choice sets: analytical {choice_set_analytical}, brute force {choice_set_brute_force}'
        )
    # Compare the optimal values of the objective function, and report the
    # two solutions in detail when they differ.
    if not np.isclose(obj_analytical, obj_brute_force):
        logger.warning(
            f'Difference between optimal utility with analytical [{obj_analytical:.4g}] and brute '
            f'force [{obj_brute_force:.4g}] algorithms.'
        )
        brute_force_results = ', '.join(
            [f'{key}: {value:.3g}' for key, value in brute_force.items()]
        )
        analytical_results = ', '.join(
            [f'{key}: {value:.3g}' for key, value in analytical.items()]
        )
        logger.warning(f'Solution with brute force: {brute_force_results}')
        logger.warning(f'Solution with analytical: {analytical_results}')
    # Compare the budget constraints.
    if not np.isclose(constraint_analytical, constraint_brute_force):
        logger.warning(
            f'Difference between constraint with analytical [{constraint_analytical}] and brute force '
            f'[{constraint_brute_force}] algorithms.'
        )
[docs]
def generate_epsilons(
self, number_of_observations: int, number_of_draws: int
) -> list[np.ndarray]:
"""Generate draws for the error terms.
:param number_of_observations: number of entries in the database.
:param number_of_draws: number of draws to generate
:return: a list of length "number_of_observations". Each element is a
number_of_draws x self.number_of_alternatives array.
"""
return [
np.random.gumbel(
loc=0, scale=1, size=(number_of_draws, self.number_of_alternatives)
)
for _ in range(number_of_observations)
]
[docs]
def validate_forecast(
    self,
    database: Database,
    total_budget: float,
    epsilons: list[np.ndarray],
    tolerance_dual: float = 1.0e-13,
    tolerance_budget: float = 1.0e-13,
) -> None:
    """Compare the two algorithms to forecast the optimal expenditures given a budget and one realization of
    epsilon for each alternative

    :param database: database containing the values of the explanatory variables.
    :param total_budget: total budget.
    :param epsilons: draws from the error terms. Each entry of the list corresponds
        to an observation in the database, and consists of an array of
        size=(number_of_draws, self.number_of_alternatives).
    :param tolerance_dual: convergence criterion for the estimate of the dual variable.
    :param tolerance_budget: convergence criterion for the estimate of the total budget.
    :return: None. Warnings are triggered if discrepancies are observed
    """
    # One single-row database per observation.
    rows_of_database = mdcev_row_split(database)
    if len(rows_of_database) == 0:
        error_msg = 'Empty database'
        raise BiogemeError(error_msg)
    if len(epsilons) != len(rows_of_database):
        error_msg = (
            f'User provided draws have {len(epsilons)} entries while there are '
            f'{len(rows_of_database)} observations in the sample.'
        )
        raise BiogemeError(error_msg)
    number_of_draws = len(epsilons[0])
    for index, row in enumerate(rows_of_database):
        logger.info(
            f'Forecasting observation {index} / {len(rows_of_database)} [{number_of_draws} draws]'
        )
        # Compare the two algorithms for each draw of this observation.
        for epsilon in epsilons[index]:
            self.forecast_comparison_one_draw(
                total_budget=total_budget,
                one_row_of_database=row,
                epsilon=epsilon,
                tolerance_budget=tolerance_budget,
                tolerance_dual=tolerance_dual,
            )