"""Implements the MDCEV model as a generic class
Michel Bierlaire
Sun Apr 7 16:52:33 2024
"""
from __future__ import annotations
import logging
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Iterable
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from biogeme.biogeme import BIOGEME
from biogeme.database import Database
from biogeme.exceptions import BiogemeError
from biogeme.expressions import Expression, Elem, bioMultSum, log, exp, Beta, Numeric
from biogeme.function_output import FunctionOutput
from biogeme.results import bioResults
from biogeme.tools.checks import validate_dict_types
logger = logging.getLogger(__name__)
SMALLEST_NON_ZERO_NUMBER = 1.0e-6
@dataclass(frozen=True)
class MdcevConfiguration:
    """Identify various configurations of the model"""

    # True if the gamma parameter of the alternative is None, that is, if the
    # alternative is treated as the outside good.
    gamma_is_none: bool
    # True if no price is associated with the alternative.
    # NOTE(review): prices are not visible in this file — confirm semantics
    # against the concrete model classes.
    price_is_none: bool
class Mdcev(ABC):
def __init__(
    self,
    model_name: str,
    baseline_utilities: dict[int, Expression],
    gamma_parameters: dict[int, Expression | None],
    alpha_parameters: dict[int, Expression] | None = None,
    scale_parameter: Expression | None = None,
    weights: Expression | None = None,
) -> None:
    """Generic MDCEV model.

    :param model_name: name of the model, used to label the Biogeme estimation.
    :param baseline_utilities: Biogeme expression of the baseline utility of
        each alternative, indexed by alternative id.
    :param gamma_parameters: gamma parameter of each alternative, indexed by
        alternative id. The outside good (at most one) is identified by the
        value None.
    :param alpha_parameters: alpha parameter of each alternative, indexed by
        alternative id, if the specification involves them.
    :param scale_parameter: expression of the scale parameter, if any.
    :param weights: expression of the observation weights, if any.
    :raise BiogemeError: if the keys of the gamma or alpha dictionaries do not
        coincide with the set of alternative ids, if more than one gamma
        parameter is None, or if the provided objects have the wrong types.
    """
    self.model_name: str = model_name
    self.baseline_utilities: dict[int, Expression] = baseline_utilities
    self.gamma_parameters: dict[int, Expression | None] = gamma_parameters
    self.alpha_parameters: dict[int, Expression] | None = alpha_parameters
    self.scale_parameter: Expression | None = scale_parameter
    self.weights: Expression | None = weights
    # Set by the `estimation_results` setter after estimation.
    self._estimation_results: bioResults | None = None
    # Last database used for forecasting (set in forecast_bruteforce_one_draw).
    self.database: Database | None = None
    # Check the numbering
    self.alternatives: set[int] = set(self.baseline_utilities)
    # Map the indices with the keys
    self.index_to_key: list[int] = [key for key in self.alternatives]
    self.key_to_index: dict[int, int] = {
        key: index for index, key in enumerate(self.index_to_key)
    }
    # NOTE(review): dict keys are unique by construction, so this condition
    # can never be True; kept as a safety net.
    if len(self.alternatives) != len(list(self.baseline_utilities)):
        error_msg = (
            f'Some alternatives appear more than once in baseline utilities: '
            f'{list(self.baseline_utilities)}'
        )
        raise BiogemeError(error_msg)
    if error_messages := self._verify_dict_keys_against_set(
        dict_to_check=self.gamma_parameters
    ):
        error_msg = f'Gamma parameters: {", ".join(error_messages)}'
        raise BiogemeError(error_msg)
    if self.alpha_parameters:
        if error_messages := self._verify_dict_keys_against_set(
            dict_to_check=self.alpha_parameters
        ):
            error_msg = f'Alpha parameters: {", ".join(error_messages)}'
            raise BiogemeError(error_msg)
    # Check if there is an outside good. It corresponds to a gamma parameter set to None.
    none_keys = [
        key for key, value in self.gamma_parameters.items() if value is None
    ]
    if len(none_keys) > 1:
        error_msg = f'Only one outside good is allowed, not {len(none_keys)}'
        raise BiogemeError(error_msg)
    self.outside_good_key = none_keys[0] if len(none_keys) != 0 else None
    validate_dict_types(self.baseline_utilities, 'baseline_utilities', Expression)
    validate_dict_types(self.gamma_parameters, 'gamma_parameters', Expression)
    if self.alpha_parameters is not None:
        validate_dict_types(self.alpha_parameters, 'alpha_parameters', Expression)
    if self.scale_parameter is not None:
        if not isinstance(self.scale_parameter, Expression):
            error_msg = f'Expecting a Biogeme expression and not {type(self.scale_parameter)}'
            raise BiogemeError(error_msg)
@property
def outside_good_index(self) -> int | None:
    """Index of the outside good in the internal ordering, or None if the
    model has no outside good."""
    key = self.outside_good_key
    return None if key is None else self.key_to_index[key]
@property
def estimation_results(self) -> bioResults | None:
    """Estimation results, or None if the model has not been estimated yet."""
    return self._estimation_results
@estimation_results.setter
def estimation_results(self, the_results: bioResults):
    """Store the estimation results and propagate the estimated beta values
    into all the expressions of the model."""
    self._estimation_results = the_results
    # Inject the estimated values into the model expressions so that
    # subsequent evaluations (e.g. forecasting) use them.
    self._update_parameters_in_expressions()
@property
def number_of_alternatives(self) -> int:
    """Number of alternatives in the model."""
    return len(self.alternatives)
def _verify_dict_keys_against_set(self, dict_to_check: dict[int, Any]) -> list[str]:
    """Compare the keys of a dictionary with the set of alternative ids.

    :param dict_to_check: dictionary indexed by alternative ids.
    :return: list of error messages; empty if the keys coincide with the ids.
    """
    keys = set(dict_to_check)
    messages: list[str] = []
    # Keys present in the dictionary but unknown to the model.
    unexpected = keys - self.alternatives
    if unexpected:
        messages.append(f'Extra alternatives in dictionary: {unexpected}.')
    # Alternatives of the model without an entry in the dictionary.
    absent = self.alternatives - keys
    if absent:
        messages.append(f'Missing alternatives from dictionary: {absent}.')
    return messages
@lru_cache
def calculate_baseline_utility(
    self, alternative_id: int, one_observation: Database
) -> float:
    """Calculate the value of the baseline utility of one alternative.

    As this function may be called many times with the same input in
    forecasting mode, the result is memoized with lru_cache.

    NOTE(review): lru_cache on an instance method keeps ``self`` and the
    database alive for the lifetime of the cache (ruff B019).

    :param alternative_id: id of the alternative.
    :param one_observation: database containing exactly one observation.
    :return: value of the baseline utility for that observation.
    :raise BiogemeError: if the database does not contain exactly one
        observation.
    """
    # BUG FIX: the original used `assert` for input validation, which is
    # silently stripped when Python runs with -O. Raise explicitly instead.
    if one_observation.get_sample_size() != 1:
        error_msg = (
            f'Expecting a database with exactly one observation, not '
            f'{one_observation.get_sample_size()}'
        )
        raise BiogemeError(error_msg)
    if self.estimation_results:
        # Evaluate with the estimated values of the parameters.
        return self.baseline_utilities[alternative_id].get_value_c(
            database=one_observation,
            betas=self.estimation_results.get_beta_values(),
            prepare_ids=True,
        )[0]
    # No estimation results: evaluate with the current parameter values.
    return self.baseline_utilities[alternative_id].get_value_c(
        database=one_observation,
        prepare_ids=True,
    )[0]
def calculate_utilities(
    self, consumption: dict[int, Expression]
) -> dict[int, Expression]:
    """Build the transformed utility expression of each alternative.

    :param consumption: expression of the consumed quantity of each
        alternative, indexed by alternative id.
    :return: transformed utility expressions, indexed by alternative id.
    """
    result: dict[int, Expression] = {}
    for alt_id, quantity in consumption.items():
        result[alt_id] = self.transformed_utility(alt_id, quantity)
    return result
@abstractmethod
def calculate_log_determinant_one_alternative(
    self, the_id: int, consumption: Expression
) -> Expression:
    """Expression of one entry of the log-determinant term of the log
    likelihood (see :meth:`loglikelihood`). Must be implemented by the
    concrete model.

    :param the_id: id of the alternative.
    :param consumption: expression of the consumed quantity.
    :return: Biogeme expression of the entry.
    """
    pass
def calculate_log_determinant_entries(
    self, consumption: dict[int, Expression]
) -> dict[int, Expression]:
    """Build, for each alternative, the log-determinant entry involved in the
    log likelihood.

    :param consumption: expression of the consumed quantity of each
        alternative, indexed by alternative id.
    :return: log-determinant entries, indexed by alternative id.
    """
    entries: dict[int, Expression] = {}
    for alt_id, quantity in consumption.items():
        entries[alt_id] = self.calculate_log_determinant_one_alternative(
            alt_id, quantity
        )
    return entries
@abstractmethod
def calculate_inverse_of_determinant_one_alternative(
    self, the_id: int, consumption: Expression
) -> Expression:
    """Expression of one entry of the inverse-determinant term of the log
    likelihood (see :meth:`loglikelihood`). Must be implemented by the
    concrete model.

    :param the_id: id of the alternative.
    :param consumption: expression of the consumed quantity.
    :return: Biogeme expression of the entry.
    """
    pass
def calculate_inverse_of_determinant_entries(
    self, consumption: dict[int, Expression]
) -> dict[int, Expression]:
    """Build, for each alternative, the inverse-determinant entry involved in
    the log likelihood.

    :param consumption: expression of the consumed quantity of each
        alternative, indexed by alternative id.
    :return: inverse-determinant entries, indexed by alternative id.
    """
    entries: dict[int, Expression] = {}
    for alt_id, quantity in consumption.items():
        entries[alt_id] = self.calculate_inverse_of_determinant_one_alternative(
            alt_id, quantity
        )
    return entries
def info_gamma_parameters(self) -> str:
    """Report how many outside goods (gamma parameters set to None) the model
    contains, and log the report.

    :return: the message that was logged.
    """
    outside_goods = sum(
        value is None for value in self.gamma_parameters.values()
    )
    if outside_goods == 0:
        report = 'No outside good is included in the model.'
        logger.info(report)
        return report
    if outside_goods == 1:
        report = 'One outside good is included in the model.'
        logger.info(report)
        return report
    # More than one None: probably a specification error.
    report = (
        'Several outside goods are included in the model. If it is intentional, ignore this warning. '
        'If not, update the definition of the gamma parameters. The outside good, if any, must be associated with '
        'None, and all other alternative with a gamma parameter.'
    )
    logger.warning(report)
    return report
def loglikelihood(
    self,
    number_of_chosen_alternatives: Expression,
    consumed_quantities: dict[int, Expression],
) -> Expression:
    """Generate the Biogeme formula for the log probability of the MDCEV model

    :param number_of_chosen_alternatives: see the module documentation :mod:`biogeme.mdcev_no_outside_good`
    :param consumed_quantities: see the module documentation :mod:`biogeme.mdcev_no_outside_good`

    A detailed explanation is provided in the technical report
    "Estimating the MDCEV model with Biogeme"
    """
    validate_dict_types(consumed_quantities, 'consumed_quantities', Expression)

    def only_if_chosen(alt_id: int, expression: Expression) -> Expression:
        # The term contributes only when the alternative is consumed.
        return Elem({0: 0.0, 1: expression}, consumed_quantities[alt_id] > 0)

    utilities = self.calculate_utilities(consumption=consumed_quantities)
    log_determinant_entries = self.calculate_log_determinant_entries(
        consumption=consumed_quantities
    )
    inverse_entries = self.calculate_inverse_of_determinant_entries(
        consumption=consumed_quantities
    )
    # Utility of the chosen goods.
    utility_sum = bioMultSum(
        [only_if_chosen(i, util) for i, util in utilities.items()]
    )
    baseline_term = (
        utility_sum
        if self.scale_parameter is None
        else self.scale_parameter * utility_sum
    )
    # Determinant: first term.
    first_determinant = bioMultSum(
        [only_if_chosen(i, entry) for i, entry in log_determinant_entries.items()]
    )
    # Determinant: second term.
    second_determinant = log(
        bioMultSum(
            [only_if_chosen(i, entry) for i, entry in inverse_entries.items()]
        )
    )
    # Logsum over all alternatives.
    if self.scale_parameter is None:
        exponentials = [exp(util) for util in utilities.values()]
    else:
        exponentials = [
            exp(self.scale_parameter * util) for util in utilities.values()
        ]
    logsum_term = number_of_chosen_alternatives * log(bioMultSum(exponentials))
    log_prob = baseline_term + first_determinant + second_determinant - logsum_term
    # Correction involving the scale parameter, if present.
    if self.scale_parameter is not None:
        log_prob += (number_of_chosen_alternatives - 1) * log(self.scale_parameter)
    return log_prob
def estimate_parameters(
    self,
    database: Database,
    number_of_chosen_alternatives: Expression,
    consumed_quantities: dict[int, Expression],
    **kwargs,
) -> bioResults:
    """Estimate the parameters of the MDCEV model.

    :param database: data needed for the estimation of the parameters, in Biogeme format.
    :param number_of_chosen_alternatives: see the module documentation :mod:`biogeme.mdcev_no_outside_good`
    :param consumed_quantities: see the module documentation :mod:`biogeme.mdcev_no_outside_good`
    :param **kwargs: additional parameters that are transmitted as such to the constructor of the Biogeme object.
    :return: the estimation results.

    A detailed explanation is provided in the technical report
    "Estimating the MDCEV model with Biogeme":
    """
    logprob = self.loglikelihood(
        number_of_chosen_alternatives=number_of_chosen_alternatives,
        consumed_quantities=consumed_quantities,
    )
    # Build the Biogeme object, with weights if they are specified.
    if self.weights is None:
        the_biogeme = BIOGEME(database, logprob, **kwargs)
    else:
        the_biogeme = BIOGEME(
            database, {'log_like': logprob, 'weight': self.weights}, **kwargs
        )
    the_biogeme.modelName = self.model_name
    # The setter of `estimation_results` propagates the estimates into the
    # model expressions.
    self.estimation_results = the_biogeme.estimate()
    return self.estimation_results
@abstractmethod
def _list_of_expressions(self) -> list[Expression]:
    """Extract the list of expressions involved in the model, beyond the ones
    stored by this base class. Used to propagate estimated parameter values.
    Must be implemented by the concrete model."""
    raise NotImplementedError
def _update_parameters_in_expressions(self) -> None:
    """Propagate the estimated beta values into every expression of the model.

    :raise BiogemeError: if no estimation results are available.
    """
    if self._estimation_results is None:
        error_msg = 'No estimation result is available'
        raise BiogemeError(error_msg)
    betas = self._estimation_results.get_beta_values()
    # Gather all expressions of the base class, skipping absent ones.
    expressions: list[Expression] = list(self.baseline_utilities.values())
    expressions.extend(
        gamma for gamma in self.gamma_parameters.values() if gamma is not None
    )
    if self.alpha_parameters is not None:
        expressions.extend(self.alpha_parameters.values())
    if self.scale_parameter is not None:
        expressions.append(self.scale_parameter)
    if self.weights is not None:
        expressions.append(self.weights)
    # Add the expressions contributed by the concrete subclass.
    expressions.extend(self._list_of_expressions())
    # Update each expression with the estimated beta values.
    for expression in expressions:
        expression.change_init_values(betas)
@abstractmethod
def utility_one_alternative(
    self,
    the_id: int,
    the_consumption: float,
    epsilon: float,
    one_observation: Database,
) -> float:
    """Numerical value of the utility of one alternative, needed for
    forecasting. Must be implemented by the concrete model.

    :param the_id: id of the alternative.
    :param the_consumption: consumed quantity.
    :param epsilon: draw from the error term.
    :param one_observation: database containing one observation.
    :return: value of the utility.
    """
    raise NotImplementedError
@abstractmethod
def utility_expression_one_alternative(
    self,
    the_id: int,
    the_consumption: Expression,
    unscaled_epsilon: Expression,
) -> Expression:
    """Biogeme expression of the utility of one alternative. Used only for
    code validation (see :meth:`validation_one_alternative_one_consumption`).
    Must be implemented by the concrete model.
    """
    raise NotImplementedError
def sum_of_utilities(
    self, consumptions: np.ndarray, epsilon: np.ndarray, data_row: Database
) -> float:
    """Calculates the sum of the utilities of all alternatives. Used for forecasting.

    :param consumptions: consumed quantity of each alternative, ordered as
        ``self.index_to_key``.
    :param epsilon: draw from the error term of each alternative, same order.
    :param data_row: database containing one observation.
    :return: sum of the utilities.
    """
    # CLEANUP: the original wrapped the comprehension in
    # `try/except IndexError as e: raise e`, which is a no-op re-raise; an
    # out-of-range index propagates identically without it.
    utilities = [
        self.utility_one_alternative(
            the_id=key,
            the_consumption=float(consumptions[index]),
            epsilon=float(epsilon[index]),
            one_observation=data_row,
        )
        for index, key in enumerate(self.index_to_key)
    ]
    return np.sum(utilities)
def forecast_bruteforce_one_draw(
    self, one_row_database: Database, total_budget: float, epsilon: np.ndarray
) -> dict[int, float] | None:
    """Forecast the optimal expenditures given a budget and one realization of epsilon for each alternative.
    If there is an issue with the optimization algorithm, None is returned.

    :param one_row_database: database containing exactly one observation.
    :param total_budget: total budget of the constraint.
    :param epsilon: one draw of the error term per alternative, ordered as
        ``self.index_to_key``.
    :return: optimal consumption of each alternative, indexed by alternative
        id, or None if the scipy optimizer triggered the clipping warning.
    :raise BiogemeError: if epsilon has the wrong size, or if the database
        does not contain exactly one row.
    """
    if len(epsilon) != self.number_of_alternatives:
        error_msg = f'epsilon must be a vector of size {self.number_of_alternatives}, not {epsilon.shape}'
        raise BiogemeError(error_msg)
    if len(one_row_database.data) != 1:
        error_msg = f'Expecting exactly one row, not {len(one_row_database.data)}'
        raise BiogemeError(error_msg)
    self.database = one_row_database
    # If there is an outside good, we need to impose that the corresponding consumption is not zero.
    # To do that, we impose that it is equal to the exponential of a new variable.
    # NOTE(review): this flag is never used below; the outside good is instead
    # handled through its strictly positive lower bound. Candidate for removal.
    outside_good = self.outside_good_key is not None
    def objective_function(x: np.ndarray) -> float:
        """Opposite of the sum of utilities, as scipy minimizes. The number
        of variables is the number of alternatives."""
        result = self.sum_of_utilities(
            consumptions=x, epsilon=epsilon, data_row=one_row_database
        )
        return -result
    def budget_constraint(x: np.ndarray) -> float:
        """Non-negative when the total consumption does not exceed the
        budget."""
        return total_budget - x.sum()
    def opposite_budget_constraint(x: np.ndarray) -> float:
        """Non-negative when the total consumption reaches the budget."""
        return x.sum() - total_budget
    # The two opposite inequalities together enforce the equality
    # x.sum() == total_budget.
    constraints = [
        {
            'type': 'ineq',
            'fun': budget_constraint,
        },
        {
            'type': 'ineq',
            'fun': opposite_budget_constraint,
        },
    ]
    number_of_alternatives = len(self.alternatives)
    # Bounds: each consumption lies between 0 and the total budget.
    bounds: list[tuple[float | None, float | None]] = [
        (0, total_budget) for _ in range(number_of_alternatives)
    ]
    # The outside good must be consumed: strictly positive lower bound.
    if self.outside_good_index is not None:
        bounds[self.outside_good_index] = (SMALLEST_NON_ZERO_NUMBER, total_budget)
    # Starting point
    # We split the budget equally across alternatives
    initial_consumption = total_budget / number_of_alternatives
    # We split equally across alternatives
    initial_guess = np.array([initial_consumption] * number_of_alternatives)
    # There is a bug in the minimize function. It generates a warning that crashes the script.
    # We are converting this warning into an exception in order to catch it.
    # Convert specific warnings to exceptions
    # NOTE(review): this modifies the warnings filters globally, a side effect
    # that persists outside this method.
    warnings.filterwarnings(
        'error',
        message='Values in x were outside bounds during a minimize step, clipping to bounds',
    )
    try:
        optimization_result = minimize(
            objective_function,
            initial_guess,
            method='SLSQP',
            bounds=bounds,
            constraints=constraints,
        )
        optimal_consumption = {
            self.index_to_key[index]: optimization_result.x[index]
            for index in range(number_of_alternatives)
        }
        # NOTE(review): the dict above already covers all alternatives, so
        # this loop never adds anything; kept as a safety net.
        for alternative in self.alternatives:
            if alternative not in optimal_consumption:
                optimal_consumption[alternative] = 0
        return optimal_consumption
    except RuntimeWarning as e:
        # The optimizer clipped the variables: report failure to the caller.
        logger.warning(e)
        return None
@abstractmethod
def derivative_utility_one_alternative(
    self,
    the_id: int,
    the_consumption: float,
    epsilon: float,
    one_observation: Database,
) -> float:
    """Derivative of the utility of one alternative with respect to the
    consumption. Used in the optimization problem solved for forecasting to
    calculate the dual variable. Must be implemented by the concrete model.

    :param the_id: id of the alternative.
    :param the_consumption: consumed quantity.
    :param epsilon: draw from the error term.
    :param one_observation: database containing one observation.
    :return: value of the derivative.
    """
    raise NotImplementedError
def optimal_consumption(
    self,
    chosen_alternatives: Iterable[int],
    dual_variable: float,
    epsilon: np.ndarray,
    one_observation: Database,
) -> dict[int, float]:
    """Analytical calculation of the optimal consumption if the dual variable is known.

    :param chosen_alternatives: ids of the alternatives consumed at the optimum.
    :param dual_variable: value of the dual variable of the budget constraint.
    :param epsilon: draws from the error terms, ordered as ``self.index_to_key``.
    :param one_observation: database containing one observation.
    :return: optimal consumption of each chosen alternative, indexed by id.
    """
    # CLEANUP: the original wrapped the comprehension in
    # `try/except KeyError as e: raise e`, which is a no-op re-raise; an
    # unknown alternative id propagates identically without it.
    return {
        alt_id: self.optimal_consumption_one_alternative(
            the_id=alt_id,
            dual_variable=dual_variable,
            epsilon=float(epsilon[self.key_to_index[alt_id]]),
            one_observation=one_observation,
        )
        for alt_id in chosen_alternatives
    }
@abstractmethod
def optimal_consumption_one_alternative(
    self,
    the_id: int,
    dual_variable: float,
    epsilon: float,
    one_observation: Database,
) -> float:
    """Analytical calculation of the optimal consumption if the dual variable
    is known. Must be implemented by the concrete model.

    :param the_id: id of the alternative.
    :param dual_variable: value of the dual variable of the budget constraint.
    :param epsilon: draw from the error term.
    :param one_observation: database containing one observation.
    :return: optimal consumption of the alternative.
    """
    raise NotImplementedError
@abstractmethod
def lower_bound_dual_variable(
    self,
    chosen_alternatives: set[int],
    one_observation: Database,
    epsilon: np.ndarray,
) -> float:
    """Method providing model specific bounds on the dual variable. If not
    overloaded, default values are used.

    NOTE(review): despite the sentence above, this is an abstract method that
    raises NotImplementedError — the defaults must come from the subclass.

    :param chosen_alternatives: list of alternatives that are chosen at the optimal solution
    :param one_observation: data for one observation.
    :param epsilon: draws from the error term.
    :return: a lower bound on the dual variable, such that the expenditure calculated for any larger value is
        well-defined and non negative.
    """
    raise NotImplementedError
def is_next_alternative_chosen(
    self,
    chosen_alternatives: set[int],
    candidate_alternative: int,
    derivative_zero_expenditure_candidate: float,
    database: Database,
    total_budget: float,
    epsilon: np.ndarray,
) -> tuple[bool, float | None]:
    """Decide whether the next candidate alternative belongs to the optimal
    choice set (one step of the enumeration in
    :meth:`identification_chosen_alternatives`).

    :param chosen_alternatives: alternatives already identified as chosen.
    :param candidate_alternative: id of the alternative under examination.
    :param derivative_zero_expenditure_candidate: derivative of the
        candidate's utility at zero consumption, used as candidate value for
        the dual variable.
    :param database: one row of the database.
    :param total_budget: total budget of the constraint.
    :param epsilon: draws from the error term.
    :return: (False, bound on the dual variable) if the candidate is not
        chosen; (True, None) if it is chosen.
    """
    # If the candidate alternative is the outside good, it is in the choice set by definition.
    model_lower_bound = self.lower_bound_dual_variable(
        chosen_alternatives=chosen_alternatives,
        one_observation=database,
        epsilon=epsilon,
    )
    if derivative_zero_expenditure_candidate < model_lower_bound:
        # Lower bound is violated. Alternative not in choice set.
        return False, model_lower_bound
    candidate_set = chosen_alternatives | {candidate_alternative}
    # Derive the optimal consumption
    estimate_optimal_consumption_candidate = self.optimal_consumption(
        chosen_alternatives=candidate_set,
        dual_variable=derivative_zero_expenditure_candidate,
        epsilon=epsilon,
        one_observation=database,
    )
    total_consumption_candidate = sum(
        estimate_optimal_consumption_candidate.values()
    )
    if total_budget <= total_consumption_candidate:
        # Total budget is overpassed. Alternative not in the choice set.
        return False, derivative_zero_expenditure_candidate
    return True, None
def identification_chosen_alternatives(
    self,
    database: Database,
    total_budget: float,
    epsilon: np.ndarray,
) -> tuple[set[int], float, float]:
    """Algorithm identifying the chosen alternatives at the optimal solution

    Alternatives are examined in decreasing order of the derivative of their
    utility at zero consumption, and added to the choice set until one of
    them fails the test of :meth:`is_next_alternative_chosen`.

    :param database: one row of the database.
    :param total_budget: total budget for the constraint.
    :param epsilon: vector of draws from the error term.
    :return: the set of chosen alternatives, as well as a lower and an upper bound on the dual variable.
    """
    # Derivative of the utility at zero consumption, for each alternative
    # except the outside good (which is always chosen).
    the_unsorted_w: dict[int, float] = {
        alt_id: self.derivative_utility_one_alternative(
            the_id=alt_id,
            one_observation=database,
            the_consumption=0,
            epsilon=float(epsilon[self.key_to_index[alt_id]]),
        )
        for alt_id in self.alternatives
        if alt_id != self.outside_good_key
    }
    # Sort the dictionary by values (descending order)
    ordered_alternatives: list[int] = [
        alt_id
        for alt_id, _ in sorted(
            the_unsorted_w.items(), key=lambda item: item[1], reverse=True
        )
    ]
    # The outside good, if any, is in the choice set by definition.
    chosen_alternatives: set[int] = (
        set() if self.outside_good_key is None else {self.outside_good_key}
    )
    last_chosen_alternative = None
    for candidate_alternative_id in ordered_alternatives:
        derivative_zero_expenditure_candidate = the_unsorted_w[
            candidate_alternative_id
        ]
        next_chosen, lower_bound = self.is_next_alternative_chosen(
            chosen_alternatives=chosen_alternatives,
            candidate_alternative=candidate_alternative_id,
            derivative_zero_expenditure_candidate=derivative_zero_expenditure_candidate,
            database=database,
            total_budget=total_budget,
            epsilon=epsilon,
        )
        if not next_chosen:
            # The candidate is rejected: the choice set is complete. The
            # derivative of the last accepted alternative bounds the dual
            # variable from above.
            upper_bound = (
                np.finfo(np.float64).max
                if last_chosen_alternative is None
                else the_unsorted_w[last_chosen_alternative]
            )
            return (
                chosen_alternatives,
                lower_bound,
                upper_bound,
            )
        chosen_alternatives |= {candidate_alternative_id}
        last_chosen_alternative = candidate_alternative_id
    # The full choice set is chosen
    lower_bound = self.lower_bound_dual_variable(
        chosen_alternatives=chosen_alternatives,
        one_observation=database,
        epsilon=epsilon,
    )
    upper_bound = the_unsorted_w[ordered_alternatives[-1]]
    return chosen_alternatives, lower_bound, upper_bound
def forecast_bisection_one_draw(
    self,
    one_row_of_database: Database,
    total_budget: float,
    epsilon: np.ndarray,
    tolerance_dual=1.0e-13,
    tolerance_budget=1.0e-13,
) -> dict[int, float]:
    """Forecast the optimal expenditures for one draw of the error terms,
    using the analytical method of Pinjari and Bhat (2021): identify the
    chosen alternatives, then estimate the dual variable by bisection.

    :param one_row_of_database: database containing one observation.
    :param total_budget: total budget of the constraint.
    :param epsilon: one draw of the error term per alternative, ordered as
        ``self.index_to_key``.
    :param tolerance_dual: convergence criterion on the dual variable.
    :param tolerance_budget: convergence criterion on the total budget.
    :return: optimal consumption of each alternative, indexed by id.
    :raise BiogemeError: if the bounds on the dual variable are inconsistent.
    :raise ValueError: if a negative consumption is calculated.
    """
    chosen_alternatives, lower_bound, upper_bound = (
        self.identification_chosen_alternatives(
            database=one_row_of_database, total_budget=total_budget, epsilon=epsilon
        )
    )
    if lower_bound > upper_bound:
        error_msg = (
            f'Lower bound: {lower_bound} larger than upper bound: {upper_bound}. '
            f'Chosen alternatives: {chosen_alternatives}'
        )
        raise BiogemeError(error_msg)
    # Estimate the dual variable by bisection.
    total_consumption = 0
    for _ in range(5000):
        dual_variable = (lower_bound + upper_bound) / 2
        optimal_consumption = self.optimal_consumption(
            chosen_alternatives=chosen_alternatives,
            dual_variable=dual_variable,
            epsilon=epsilon,
            one_observation=one_row_of_database,
        )
        # BUG FIX: the original raised here and then tested the same flag
        # again in an unreachable `if` branch; the dead branch is removed.
        if any(value < 0 for value in optimal_consumption.values()):
            raise ValueError('Negative consumption')
        total_consumption = sum(optimal_consumption.values())
        if total_consumption < total_budget:
            # Budget under-used: the dual variable is too large.
            upper_bound = dual_variable
        elif total_consumption > total_budget:
            # Budget exceeded: the dual variable is too small.
            lower_bound = dual_variable
        # Stopping criteria
        if (upper_bound - lower_bound) <= tolerance_dual:
            break
        if np.abs(total_consumption - total_budget) <= tolerance_budget:
            break
    # Final evaluation at the mid-point of the last bracket.
    dual_variable = (lower_bound + upper_bound) / 2
    optimal_consumption = self.optimal_consumption(
        chosen_alternatives=chosen_alternatives,
        dual_variable=dual_variable,
        epsilon=epsilon,
        one_observation=one_row_of_database,
    )
    # Alternatives that are not chosen consume zero.
    for alternative in self.alternatives:
        optimal_consumption.setdefault(alternative, 0)
    return optimal_consumption
def validation_one_alternative_one_consumption(
    self, alternative_id: int, value_consumption: float, one_row: Database
) -> list[str]:
    """Check, for one alternative and one consumption level, that the
    analytical utility and its derivative agree with the Biogeme expression.

    :param alternative_id: id of the alternative to validate.
    :param value_consumption: consumption level at which the check is done.
    :param one_row: database containing one observation.
    :return: list of error messages; empty if everything is consistent.
    """
    messages: list[str] = []
    value_epsilon = 0.01
    # Declare the consumption as a Beta so that Biogeme can provide the
    # derivative of the utility with respect to it.
    consumption = Beta('consumption', value_consumption, None, None, 0)
    unscaled_epsilon = Numeric(value_epsilon)
    utility = self.utility_expression_one_alternative(
        the_id=alternative_id,
        the_consumption=consumption,
        unscaled_epsilon=unscaled_epsilon,
    )
    result: FunctionOutput = utility.get_value_and_derivatives(
        database=one_row, prepare_ids=True, gradient=True, named_results=True
    )
    # Compare the utility values.
    calculated_utility_value = self.utility_one_alternative(
        the_id=alternative_id,
        the_consumption=value_consumption,
        epsilon=value_epsilon,
        one_observation=one_row,
    )
    if not np.isclose(result.function, calculated_utility_value):
        messages.append(
            f'Inconsistent utility values for alt. {alternative_id}: {result.function} and {calculated_utility_value}'
        )
    # Compare the derivatives with respect to the consumption.
    calculated_derivative = self.derivative_utility_one_alternative(
        the_id=alternative_id,
        the_consumption=value_consumption,
        epsilon=value_epsilon,
        one_observation=one_row,
    )
    the_gradient = result.gradient['consumption']
    if not np.isclose(the_gradient, calculated_derivative):
        messages.append(
            f'Inconsistent derivatives for alt. {alternative_id}: {the_gradient} and {calculated_derivative}'
        )
    return messages
def validation_one_alternative(
    self, alternative_id: int, one_row: Database
) -> list[str]:
    """Validate the abstract methods for one alternative.

    :param alternative_id: id of the alternative to validate.
    :param one_row: database containing one observation.
    :return: list of error messages; empty if everything is consistent.
    """
    error_messages: list[str] = []
    # Check utility and derivative at several consumption levels.
    for consumption_value in (1, 10, 100):
        error_messages.extend(
            self.validation_one_alternative_one_consumption(
                alternative_id=alternative_id,
                value_consumption=consumption_value,
                one_row=one_row,
            )
        )
    # Validate the optimal consumption: at the analytically optimal
    # consumption, the derivative of the utility must equal the dual
    # variable.
    dual_variable = 10
    value_epsilon = 0.01
    optimal_consumption = self.optimal_consumption_one_alternative(
        the_id=alternative_id,
        dual_variable=dual_variable,
        epsilon=value_epsilon,
        one_observation=one_row,
    )
    calculated_derivative = self.derivative_utility_one_alternative(
        the_id=alternative_id,
        the_consumption=optimal_consumption,
        epsilon=value_epsilon,
        one_observation=one_row,
    )
    if not np.isclose(calculated_derivative, dual_variable):
        error_messages.append(
            f'Inconsistent dual variables for alt. {alternative_id}: '
            f'{calculated_derivative} and {dual_variable}'
        )
    return error_messages
def validation(self, one_row: Database) -> list[str]:
    """Validation of some abstract methods for all alternatives.

    :param one_row: database containing one observation.
    :return: concatenated list of error messages from all alternatives.
    """
    all_messages: list[str] = []
    for the_id in self.alternatives:
        all_messages.extend(
            self.validation_one_alternative(alternative_id=the_id, one_row=one_row)
        )
    return all_messages
def forecast(
    self,
    database: Database,
    total_budget: float,
    epsilons: list[np.ndarray],
    brute_force: bool = False,
    tolerance_dual: float = 1.0e-10,
    tolerance_budget: float = 1.0e-10,
) -> list[pd.DataFrame]:
    """Forecast the optimal expenditures given a budget and one realization of epsilon for each alternative

    :param database: database containing the values of the explanatory variables.
    :param total_budget: total budget.
    :param epsilons: draws from the error terms provided by the user. Each entry of the list corresponds
        to an observation in the database, and consists of an array of
        size=(number_of_draws, self.number_of_alternatives).
    :param brute_force: if True, the brute force algorithm is applied. It solves each optimization problem using
        the scipy optimization algorithm. If False, the method proposed by Pinjari and Bhat (2021) is used.
    :param tolerance_dual: convergence criterion for the estimate of the dual variable.
    :param tolerance_budget: convergence criterion for the estimate of the total budget.
    :return: a list of data frames, each containing the results for each draw
    :raise BiogemeError: if the database is empty, or if the user-provided
        draws are inconsistent with the database or with the model.

    .. [PinjBhat21] A. R. Pinjari, C. Bhat, Computationally efficient forecasting procedures for Kuhn-Tucker
       consumer demand model systems: Application to residential energy consumption analysis, Journal of Choice
       Modelling, Volume 39, 2021, 100283.
    """
    # One single-row database per observation.
    rows_of_database = [
        Database(name=f'row_{i}', pandas_database=database.data.iloc[[i]])
        for i in range(len(database.data))
    ]
    if len(rows_of_database) == 0:
        error_msg = 'Empty database'
        raise BiogemeError(error_msg)
    if len(epsilons) != len(rows_of_database):
        error_msg = (
            f'User provided draws have {len(epsilons)} entries while there are '
            f'{len(rows_of_database)} observations in the sample.'
        )
        raise BiogemeError(error_msg)
    # Verify the shape of the user-provided draws.
    number_of_draws = None
    for index, epsilon in enumerate(epsilons):
        if number_of_draws is None:
            number_of_draws = epsilon.shape[0]
        elif epsilon.shape[0] != number_of_draws:
            error_msg = (
                f'User defined draws for obs. {index} contains {epsilon.shape[0]} '
                f'draws instead of {number_of_draws}'
            )
            # BUG FIX: the original executed `raise error_msg`, raising a
            # plain string, which itself fails with
            # "TypeError: exceptions must derive from BaseException".
            raise BiogemeError(error_msg)
        if epsilon.shape[1] != self.number_of_alternatives:
            error_msg = (
                f'User defined draws for obs. {index} contains {epsilon.shape[1]} '
                f'alternatives instead of {self.number_of_alternatives}'
            )
            # BUG FIX: same string-raise defect as above.
            raise BiogemeError(error_msg)
    all_results = []
    for index, row in enumerate(rows_of_database):
        logger.info(
            f'Forecasting observation {index} / {len(rows_of_database)} [{number_of_draws} draws]'
        )
        # Forecasting: one dict of optimal expenditures per draw. The brute
        # force algorithm may fail and return None for a draw.
        optimal_expenditures = (
            [
                self.forecast_bruteforce_one_draw(
                    one_row_database=row, total_budget=total_budget, epsilon=epsilon
                )
                for epsilon in epsilons[index]
            ]
            if brute_force
            else [
                self.forecast_bisection_one_draw(
                    one_row_of_database=row,
                    total_budget=total_budget,
                    epsilon=epsilon,
                    tolerance_budget=tolerance_budget,
                    tolerance_dual=tolerance_dual,
                )
                for epsilon in epsilons[index]
            ]
        )
        gather_data = defaultdict(list)
        # Collect, for each alternative, the values across draws. Failed
        # draws (None) are skipped.
        for optimal_solution in optimal_expenditures:
            if optimal_solution is not None:
                for key, value in optimal_solution.items():
                    gather_data[key].append(value)
        all_results.append(pd.DataFrame(gather_data).sort_index(axis='columns'))
    return all_results
def forecast_comparison_one_draw(
    self,
    one_row_of_database: Database,
    total_budget: float,
    epsilon: np.ndarray,
    tolerance_dual: float = 1.0e-10,
    tolerance_budget: float = 1.0e-10,
) -> dict[int, float] | None:
    """Compares the results from each of the two forecasting algorithms.

    :param one_row_of_database: row from the database containing the values of the explanatory variables.
    :param total_budget: total budget.
    :param epsilon: draws from the error terms provided by the user. Length: self.number_of_alternatives.
    :param tolerance_dual: convergence criterion for the estimate of the dual variable.
    :param tolerance_budget: convergence criterion for the estimate of the total budget.
    :return: None. Warnings are triggered if discrepancies are observed.
    :raise BiogemeError: if epsilon does not contain one draw per alternative.
    """
    if len(epsilon) != self.number_of_alternatives:
        error_msg = f'There are {self.number_of_alternatives} alternatives and {len(epsilon)} epsilons'
        raise BiogemeError(error_msg)
    logger.info('============ Comparison ===================')
    # Numerical solution with scipy (may fail and return None).
    brute_force: dict[int, float] | None = self.forecast_bruteforce_one_draw(
        one_row_database=one_row_of_database,
        total_budget=total_budget,
        epsilon=epsilon,
    )
    # Summaries of the brute force solution: consumption vector, objective
    # value and total consumption, or None if the algorithm failed.
    consumption_brute_force = (
        np.array([value for key, value in sorted(brute_force.items())])
        if brute_force is not None
        else None
    )
    obj_brute_force = (
        self.sum_of_utilities(
            consumptions=consumption_brute_force,
            epsilon=epsilon,
            data_row=one_row_of_database,
        )
        if brute_force is not None
        else None
    )
    constraint_brute_force = (
        sum(consumption_brute_force) if brute_force is not None else None
    )
    choice_set_brute_force: set[int] | None = None
    if brute_force is None:
        logger.info('Brute force algorithm failed')
    else:
        # Alternatives with a (numerically) non-zero consumption.
        choice_set_brute_force = set(
            [key for key, value in brute_force.items() if not np.isclose(value, 0)]
        )
        formatted_brute_force = {
            k: f'{value:.3g}' for k, value in brute_force.items()
        }
        logger.info(
            f'Brute force: {formatted_brute_force} objective {obj_brute_force:.3g}, constraint {constraint_brute_force:.3g}, choice set {choice_set_brute_force}'
        )
    # Analytical solution based on Pinjari and Bhat (2021).
    analytical = self.forecast_bisection_one_draw(
        one_row_of_database=one_row_of_database,
        total_budget=total_budget,
        epsilon=epsilon,
        tolerance_budget=tolerance_budget,
        tolerance_dual=tolerance_dual,
    )
    choice_set_analytical: set[int] = set(
        [key for key, value in analytical.items() if not np.isclose(value, 0)]
    )
    # NOTE(review): forecast_bisection_one_draw never returns None, so the
    # `analytical is None` guards below are defensive only.
    consumption_analytical = (
        np.array([value for key, value in sorted(analytical.items())])
        if analytical is not None
        else None
    )
    try:
        obj_analytical = (
            self.sum_of_utilities(
                consumptions=consumption_analytical,
                epsilon=epsilon,
                data_row=one_row_of_database,
            )
            if analytical is not None
            else None
        )
    except RuntimeWarning as e:
        # Log the offending solution before propagating the warning-as-error.
        logger.warning(f'{analytical=}')
        logger.warning(f'{consumption_analytical=}')
        raise e
    constraint_analytical = (
        sum(consumption_analytical) if analytical is not None else None
    )
    if analytical is None:
        logger.info('Analytical algorithm failed')
    else:
        formatted_analytical = {
            k: f'{value:.3g}' for k, value in analytical.items()
        }
        logger.info(
            f'Analytical: {formatted_analytical} objective {obj_analytical:.3g}, constraint {constraint_analytical:.3g}, choice set {choice_set_analytical}'
        )
    # Report failures, then compare the two solutions.
    if brute_force is None and analytical is None:
        logger.warning('Both algorithms failed.')
        return
    if brute_force is None:
        logger.warning('Brute force algorithm failed.')
        return
    if analytical is None:
        logger.warning('Analytical algorithm failed.')
        return
    if (
        choice_set_brute_force != choice_set_analytical
        and choice_set_brute_force is not None
    ):
        logger.warning(
            f'Different optimal choice sets: analytical {choice_set_analytical}, brute force {choice_set_brute_force}'
        )
    if not np.isclose(obj_analytical, obj_brute_force):
        logger.warning(
            f'Difference between optimal utility with analytical [{obj_analytical:.4g}] and brute '
            f'force [{obj_brute_force:.4g}] algorithms.'
        )
        brute_force_results = ', '.join(
            [f'{key}: {value:.3g}' for key, value in brute_force.items()]
        )
        analytical_results = ', '.join(
            [f'{key}: {value:.3g}' for key, value in analytical.items()]
        )
        logger.warning(f'Solution with brute force: {brute_force_results}')
        logger.warning(f'Solution with analytical: {analytical_results}')
    if not np.isclose(constraint_analytical, constraint_brute_force):
        logger.warning(
            f'Difference between constraint with analytical [{constraint_analytical}] and brute force '
            f'[{constraint_brute_force}] algorithms.'
        )
def generate_epsilons(
    self, number_of_observations: int, number_of_draws: int
) -> list[np.ndarray]:
    """Generate Gumbel draws for the error terms.

    :param number_of_observations: number of entries in the database.
    :param number_of_draws: number of draws to generate.
    :return: a list of length "number_of_observations". Each element is a
        number_of_draws x self.number_of_alternatives array.
    """
    draw_shape = (number_of_draws, self.number_of_alternatives)
    draws = []
    for _ in range(number_of_observations):
        draws.append(np.random.gumbel(loc=0, scale=1, size=draw_shape))
    return draws
def validate_forecast(
    self,
    database: Database,
    total_budget: float,
    epsilons: list[np.ndarray],
    tolerance_dual: float = 1.0e-13,
    tolerance_budget: float = 1.0e-13,
) -> None:
    """Compare the two forecasting algorithms (brute force and analytical)
    on every observation and every draw.

    :param database: database containing the values of the explanatory variables.
    :param total_budget: total budget.
    :param epsilons: draws from the error terms. Each entry of the list corresponds
        to an observation in the database, and consists of an array of
        size=(number_of_draws, self.number_of_alternatives).
    :param tolerance_dual: convergence criterion for the estimate of the dual variable.
    :param tolerance_budget: convergence criterion for the estimate of the total budget.
    :return: None. Warnings are triggered if discrepancies are observed.
    :raise BiogemeError: if the database is empty, or if the number of draw
        entries does not match the number of observations.
    """
    rows_of_database = database.mdcev_row_split()
    if not rows_of_database:
        raise BiogemeError('Empty database')
    total_rows = len(rows_of_database)
    if len(epsilons) != total_rows:
        error_msg = (
            f'User provided draws have {len(epsilons)} entries while there are '
            f'{total_rows} observations in the sample.'
        )
        raise BiogemeError(error_msg)
    number_of_draws = len(epsilons[0])
    for index, row in enumerate(rows_of_database):
        logger.info(
            f'Forecasting observation {index} / {total_rows} [{number_of_draws} draws]'
        )
        # Compare the two algorithms on each draw for this observation.
        for epsilon in epsilons[index]:
            self.forecast_comparison_one_draw(
                total_budget=total_budget,
                one_row_of_database=row,
                epsilon=epsilon,
                tolerance_budget=tolerance_budget,
                tolerance_dual=tolerance_dual,
            )