Source code for biogeme.sampling_of_alternatives.choice_set_generation
"""Module in charge of functionalities related to the choice set generation. For the main sample, all alternatives except the last one must be used: 0 to J-1. For MEV models, the approximation of the sum capturing the nests requires another sample not based on the choice.:author: Michel Bierlaire:date: Fri Oct 27 12:50:06 2023"""importloggingimportosimportpandasaspdfromtqdmimporttqdmfrombiogeme.databaseimportDatabasefrombiogeme.expressionsimport(Expression,OldNewName,list_of_variables_in_expression,rename_all_variables,)from.sampling_contextimportMEV_PREFIX,SamplingContextfrom.sampling_of_alternativesimportSamplingOfAlternativestqdm.pandas()logger=logging.getLogger(__name__)
class ChoiceSetsGeneration:
    """Generate, for each individual, a data row extended with sampled alternatives.

    Each row of the individuals' data is completed with the values of the
    alternatives drawn by a ``SamplingOfAlternatives`` object, flattened
    into one column per (attribute, position) pair. The combined
    variables listed in the context are then defined in the resulting
    database, once per position in the sample.
    """

    def __init__(self, context: SamplingContext):
        """Constructor

        :param context: contains all the information that is needed to
            perform the sampling of alternatives.
        """
        # The sampler is built first, directly from the context.
        self.sampling_of_alternatives = SamplingOfAlternatives(context)

        # The remaining attributes are plain copies of context fields.
        self.alternatives = context.alternatives
        self.individuals = context.individuals
        self.choice_column = context.choice_column
        self.number_of_individuals = context.number_of_individuals
        self.id_column = context.id_column
        self.partition = context.sampling_protocol
        self.second_partition = context.mev_sampling_protocol
        self.combined_variables = context.combined_variables
        self.biogeme_file_name = context.biogeme_file_name
        self.total_sample_size = context.total_sample_size
        self.second_sample_size = context.total_mev_sample_size
        self.cnl_nests = context.cnl_nests
        self.biogeme_data = None

    def get_attributes_from_expression(self, expression: Expression) -> set[str]:
        """Identify the attributes of alternatives involved in an expression

        :param expression: expression to inspect.
        :return: names of the variables of the expression that are also
            columns of the table of alternatives.
        """
        names_in_expression = set()
        for variable in list_of_variables_in_expression(the_expression=expression):
            names_in_expression.add(variable.name)
        return names_in_expression.intersection(self.alternatives.columns)

    @staticmethod
    def _flatten_sample(sample, prefix: str = '') -> dict:
        """Turn a sample into a flat dictionary of values

        :param sample: object whose ``stack()`` result iterates as
            ``((row, column), value)`` pairs (presumably a pandas data
            frame with one sampled alternative per row -- to be confirmed
            against ``SamplingOfAlternatives``).
        :param prefix: text inserted in front of each generated key.
        :return: dictionary mapping ``{prefix}{column}_{row}`` to the
            corresponding value.
        """
        flat_values = {}
        for (row, col_name), value in sample.stack().items():
            flat_values[f'{prefix}{col_name}_{row}'] = value
        return flat_values

    def process_row(self, individual_row: pd.Series) -> dict:
        """Build the extended data row of one individual

        :param individual_row: row corresponding to one individual
        :return: a dictionary containing the original entries of the
            row, followed by the flattened main sample and, when a
            second sampling protocol is set, by the flattened MEV sample.
        """
        sampler = self.sampling_of_alternatives

        # Main sample: drawn conditionally on the chosen alternative.
        main_sample = sampler.sample_alternatives(
            chosen=individual_row[self.choice_column]
        )
        extended_row = {
            **individual_row.to_dict(),
            **self._flatten_sample(main_sample),
        }

        if self.second_partition is None:
            return extended_row

        # Second sample: drawn without reference to the choice, and
        # stored under keys carrying the MEV prefix.
        mev_sample = sampler.sample_mev_alternatives()
        extended_row.update(self._flatten_sample(mev_sample, prefix=MEV_PREFIX))
        return extended_row

    def _define_indexed_variable(
        self, database: Database, new_variable, index: int, prefix: str
    ) -> None:
        """Define one combined variable for one position in a sample

        A copy of the formula is made, each attribute of the
        alternatives appearing in it is renamed
        ``{prefix}{attribute}_{index}``, and the result is registered in
        the database as ``{prefix}{name}_{index}``.

        :param database: database where the variable is defined.
        :param new_variable: object providing ``name`` and ``formula``.
        :param index: position in the sample.
        :param prefix: text inserted in front of the generated names.
        """
        expression = new_variable.formula.deep_flat_copy()
        renaming_list = [
            OldNewName(old_name=name, new_name=f'{prefix}{name}_{index}')
            for name in self.get_attributes_from_expression(expression)
        ]
        rename_all_variables(expr=expression, renaming_list=renaming_list)
        database.define_variable(f'{prefix}{new_variable.name}_{index}', expression)

    def define_new_variables(self, database: Database):
        """Create the new variables

        :param database: database, in Biogeme format.
        """
        # The progress bar tracks the main sample only.
        number_of_steps = len(self.combined_variables) * self.total_sample_size
        with tqdm(total=number_of_steps, desc="Defining new variables...") as bar:
            for new_variable in self.combined_variables:
                for index in range(self.total_sample_size):
                    self._define_indexed_variable(
                        database, new_variable, index, prefix=''
                    )
                    bar.update(1)

                if self.second_partition is None:
                    continue

                for index in range(self.second_sample_size):
                    self._define_indexed_variable(
                        database, new_variable, index, prefix=MEV_PREFIX
                    )

    def sample_and_merge(self, recycle: bool = False) -> Database:
        """Loop on the individuals and generate a choice set for each of them

        :param recycle: if True, if the data file already exists, it is
            not re-created.
        :return: database for Biogeme
        """
        file_name = self.biogeme_file_name

        if recycle:
            if not os.path.exists(file_name):
                # Nothing to recycle: warn, then generate the data below.
                logger.warning(f"File {file_name} does not exist.")
            else:
                return Database("merged_data", pd.read_csv(file_name))

        if self.second_sample_size is None:
            size = f'{self.total_sample_size}'
        else:
            size = f'{self.total_sample_size} + {self.second_sample_size}'
        logger.info(
            f"Generating {size} alternatives for "
            f"{self.number_of_individuals} observations"
        )

        # One extended row per individual; "expand" turns the returned
        # dictionaries into columns.
        merged_data = self.individuals.progress_apply(
            self.process_row, axis=1, result_type="expand"
        )
        merged_database = Database("merged_data", merged_data)

        logger.info("Define new variables")
        self.define_new_variables(merged_database)

        # The file is written only after the new variables have been defined.
        merged_data.to_csv(file_name, index=False)
        logger.info(f"File {file_name} has been created.")
        return merged_database