""" Defines a class that characterized the context to apply sampling of alternatives
:author: Michel Bierlaire
:date: Wed Sep 6 14:38:31 2023
"""
import logging
from dataclasses import dataclass
from typing import NamedTuple, Optional, Iterable
import pandas as pd
from biogeme.expressions import Expression, TypeOfElementaryExpression
from biogeme.nests import NestsForCrossNestedLogit
from biogeme.exceptions import BiogemeError
from biogeme.partition import Partition, Segment
logger = logging.getLogger(__name__)
MEV_PREFIX = '_MEV_'
LOG_PROBA_COL = '_log_proba'
MEV_WEIGHT = '_mev_weight'
CNL_PREFIX = '_CNL_'
[docs]
class StratumTuple(NamedTuple):
"""A stratum is an element of a partition of the full choice set,
combined with the number of alternatives that must be sampled.
"""
subset: Segment
sample_size: int
[docs]
class CrossVariableTuple(NamedTuple):
"""A cross variable is a variable that involves socio-economic
attributes of the individuals, and attributes of the
alternatives. It can only be calculated after the sampling has
been made.
"""
name: str
formula: Expression
[docs]
@dataclass
class SamplingContext:
"""Class gathering the data needed to perform an estimation with
samples of alternatives
:param the_partition: Partition used for the sampling.
:param sample_sizes: number of alternative to draw from each segment.
:param individuals: Pandas data frame containing all the
individuals as rows. One column must contain the choice of
each individual.
:param choice_column: name of the column containing the choice of
each individual.
:param alternatives: Pandas data frame containing all the
alternatives as rows. One column must contain a unique ID
identifying the alternatives. The other columns contain
variables to include in the data file.
:param id_column: name of the column containing the Ids of the alternatives.
:param utility_function: definition of the generic utility function
:param combined_variables: definition of interaction variables
:param mev_partition: If a second choice set need to be sampled
for the MEV terms, the corresponding partitition is provided
here.
"""
the_partition: Partition
sample_sizes: Iterable[int]
individuals: pd.DataFrame
choice_column: str
alternatives: pd.DataFrame
id_column: str
biogeme_file_name: str
utility_function: Expression
combined_variables: list[CrossVariableTuple]
mev_partition: Optional[Partition] = None
mev_sample_sizes: Optional[Iterable[int]] = None
cnl_nests: Optional[NestsForCrossNestedLogit] = None
[docs]
def check_expression(self, expression: Expression) -> None:
"""Verifies if the variables contained in the expression can be found in the databases"""
variables = expression.set_of_elementary_expression(
TypeOfElementaryExpression.VARIABLE
)
for variable in variables:
if (
variable not in self.individuals.columns
and variable not in self.alternatives.columns
and all(variable != t.name for t in self.combined_variables)
):
error_msg = (
f'Invalid expression. Variable "{variable}" has not been found in '
f'the provided database'
)
raise BiogemeError(error_msg)
[docs]
def check_partition(self) -> None:
"""Check if the partition is truly a partition. If not, an exception is raised
:raise BiogemeError: if some elements are present in more than one subset.
:raise BiogemeError: if the size of the union of the subsets does
not match the expected total size
:raise BiogemeError: if an alternative in the partition does
not appear in the database of alternatives
:raise BiogemeError: if a segment is empty
:raise BiogemeError: if the number of sampled alternatives in
a stratum is incorrect , that is zero, or larger than the
stratum size..
"""
# Verify that all requested alternatives appear in the database of alternatives
for stratum in self.partition:
n = len(stratum.subset)
if n == 0:
error_msg = 'A stratum is empty'
raise BiogemeError(error_msg)
k = stratum.sample_size
if k > n:
error_msg = f'Cannot draw {k} elements in a stratum of size {n}'
raise BiogemeError(error_msg)
if k == 0:
error_msg = 'At least one alternative must be selected in each segment'
raise BiogemeError(error_msg)
for alt in stratum.subset:
if alt not in self.alternatives[self.id_column].values:
error_msg = (
f'Alternative {alt} does not appear in the database of '
f'alternatives'
)
raise BiogemeError(error_msg)
[docs]
def check_mev_partition(self) -> None:
"""Check if the partition is a partition of the MEV
alternatives. It does not need to cover the full choice set"""
if self.mev_partition:
if self.mev_sample_sizes is None:
error_msg = (
'If mev_partition is defined, mev_sample_size must also be defined'
)
raise BiogemeError(error_msg)
if self.mev_sample_sizes:
if self.mev_partition is None:
error_msg = (
'If mev_sample_sizes is defined, mev_partition must also be defined'
)
raise BiogemeError(error_msg)
if self.cnl_nests and self.mev_partition:
if self.cnl_nests.mev_alternatives != self.mev_partition.full_set:
in_nest_not_in_partition = (
self.cnl_nests.mev_alternatives - self.mev_partition.full_set
)
in_partition_not_in_nest = (
self.mev_partition.full_set - self.cnl_nests.mev_alternatives
)
error_msg = ''
if in_nest_not_in_partition:
error_msg += (
f'The following alternative(s) belong to a nest but not to the'
f' partition for the sample: {in_nest_not_in_partition}. '
)
if in_partition_not_in_nest:
error_msg += (
f'The following alternative(s) belong to the partition for '
f'the MEV sample, but not to any nest: {in_partition_not_in_nest}'
)
[docs]
def check_valid_alternatives(self, set_of_ids: set[int]) -> None:
"""Check if the IDs in set are indeed valid
alternatives. Typically used to check if a nest is well
defined
:param set_of_ids: set of identifiers to check
:raise BiogemeError: if at least one id is invalid.
"""
if (
not pd.Series(list(set_of_ids))
.isin(self.alternatives[self.id_column])
.all()
):
missing_values = set_of_ids - set(self.alternatives[self.id_column])
raise BiogemeError(
f'The following IDs are not valid alternative IDs: {missing_values}'
)
[docs]
def include_cnl_alphas(self) -> None:
if self.cnl_nests is None:
return
for nest in self.cnl_nests:
column_name = f'{CNL_PREFIX}{nest.name}'
self.alternatives[column_name] = self.alternatives[self.id_column].map(
lambda x: (
self.cnl_nests.get_alpha_values(alternative_id=x)[nest.name]
if x in nest.dict_of_alpha
else 0.0
)
)
def __post_init__(self) -> None:
# Check for empty utility function
if self.utility_function is None:
raise BiogemeError('No utility function has been provided')
# Check for empty strings
if not self.choice_column:
raise BiogemeError('choice_column should not be an empty string.')
if not self.id_column:
raise BiogemeError('id_column should not be an empty string.')
# Validate that the DataFrames are not empty
if self.individuals.empty or self.alternatives.empty:
raise BiogemeError(
'DataFrames individuals or alternatives should not be empty.'
)
# A previous implementation used a list of StratumTuple. We
# now perform the conversion.
self.partition = [
StratumTuple(subset=segment, sample_size=size)
for segment, size in zip(self.the_partition, self.sample_sizes)
]
self.check_partition()
logger.debug('Check if there is a MEV partition')
if self.mev_partition or self.mev_sample_sizes:
logger.debug('Yes, there is a MEV partition')
self.check_mev_partition()
self.second_partition = [
StratumTuple(subset=segment, sample_size=size)
for segment, size in zip(self.mev_partition, self.mev_sample_sizes)
]
else:
logger.debug('No, there is no MEV partition')
self.second_partition = None
# If CNL nests are defined, check that the alphas are all
# fixed and that the nests have a name.
if self.cnl_nests:
if not self.cnl_nests.all_alphas_fixed():
error_msg = 'For the CNL model, all alpha parameters must be fixed.'
raise BiogemeError(error_msg)
if not self.cnl_nests.check_names():
error_msg = 'For the CNL model, all nests must have a name.'
raise BiogemeError(error_msg)
self.number_of_alternatives = self.alternatives.shape[0]
self.number_of_individuals = self.individuals.shape[0]
# Validate that choice_column is in the individuals DataFrame
if self.choice_column not in self.individuals.columns:
raise BiogemeError(
f'{self.choice_column} is not a column in the individuals DataFrame.'
)
# Validate that id_column is in the alternatives DataFrame
if self.id_column not in self.alternatives.columns:
raise BiogemeError(
f'{self.id_column} is not a column in the alternatives DataFrame.'
)
# Check for data types
if not self.individuals[self.choice_column].dtype in [int, float]:
raise BiogemeError(
f'Column {self.choice_column} in data frame "individuals" should '
f'be of type int or float.'
)
if not self.alternatives[self.id_column].dtype in [int, float]:
raise BiogemeError(
f'Column {self.id_column} in alternatives should be of type int or float.'
)
self.total_sample_size = sum(stratum.sample_size for stratum in self.partition)
self.second_sample_size = (
None
if self.second_partition is None
else sum(stratum.sample_size for stratum in self.second_partition)
)
self.check_expression(self.utility_function)
for cross_variable in self.combined_variables:
self.check_expression(cross_variable.formula)
self.attributes = set(self.alternatives.columns) | {
combined_variable.name for combined_variable in self.combined_variables
}
self.mev_prefix = '' if self.second_partition is None else MEV_PREFIX
self.include_cnl_alphas()
[docs]
def reporting(self) -> str:
"""Summarizes the configuration specified by the context object."""
result = {
'Size of the choice set': self.alternatives.shape[0],
'Main partition': (
f'{self.the_partition.number_of_segments()} segment(s) of size '
f'{", ".join([str(len(segment)) for segment in self.the_partition])}'
),
'Main sample': f'{self.total_sample_size}: ',
}
result['Main sample'] += ', '.join(
[
f'{stratum.sample_size}/{len(stratum.subset)}'
for stratum in self.partition
]
)
if self.mev_partition:
result['Nbr of MEV alternatives'] = len(self.mev_partition.full_set)
result['MEV partition'] = (
f'{self.mev_partition.number_of_segments()} segment(s) of size '
f'{", ".join([str(len(segment)) for segment in self.mev_partition])}'
)
result['MEV sample'] = f'{self.second_sample_size}: '
result['MEV sample'] += ', '.join(
[
f'{stratum.sample_size}/{len(stratum.subset)}'
for stratum in self.second_partition
]
)
output = ''
for section, description in result.items():
output += f'{section}: {description}\n'
return output