Source code for biogeme.tools.database

import pandas as pd

from biogeme.exceptions import BiogemeError
from biogeme.deprecated import deprecated


def count_number_of_groups(df: pd.DataFrame, column: str) -> int:
    """Count the runs of consecutive identical values in a column.

    A new group starts at the first row and at every row whose value
    differs from the value of the row just above it. For instance,
    ``1, 2, 2, 3, 3, 3, 4, 1, 1`` contains 5 groups.

    The data frame is only read: no temporary column is added to it.

    Example::

        >>> df = pd.DataFrame({'ID': [1, 1, 2, 3, 3, 1, 2, 3],
        ...                    'value': [1000, 2000, 3000, 4000,
        ...                              5000, 5000, 10000, 20000]})
        >>> count_number_of_groups(df, 'ID')
        6
        >>> count_number_of_groups(df, 'value')
        7

    :param df: data frame containing the column to analyze
    :param column: name of the column
    :return: number of groups of consecutive identical values
        (0 if the data frame has no rows)
    """
    values = df[column]
    # True wherever a value differs from its predecessor. The first row is
    # compared with the missing value introduced by the shift, so it always
    # opens a group.
    starts_new_group = values != values.shift(1)
    # With nullable dtypes the comparison against a missing value yields
    # <NA> instead of True; treat it as a change, like NaN != NaN does for
    # float columns.
    starts_new_group = starts_new_group.fillna(True)
    return int(starts_new_group.sum())
@deprecated(count_number_of_groups)
def countNumberOfGroups(df: pd.DataFrame, column: str) -> int:
    """Deprecated camelCase spelling of ``count_number_of_groups``.

    The body is intentionally empty.
    NOTE(review): the ``deprecated`` decorator comes from
    ``biogeme.deprecated``, which is not visible here; it presumably
    forwards the call to ``count_number_of_groups`` -- confirm against
    that module.

    :param df: data frame containing the column to analyze
    :param column: name of the column
    """
    pass
[docs] def flatten_database( df: pd.DataFrame, merge_id: str, row_name: str | None = None, identical_columns: list[str] | None = None, ) -> pd.DataFrame: """Combine several rows of a Pandas database into one. For instance, consider the following database:: ID Age Cost Name 0 1 23 34 Item3 1 1 23 45 Item4 2 1 23 12 Item7 3 2 45 65 Item3 4 2 45 34 Item7 If row_name is 'Name', the function generates the same data in the following format:: Age Item3_Cost Item4_Cost Item7_Cost ID 1 23 34 45.0 12 2 45 65 NaN 34 If row_name is None, the function generates the same data in the following format:: Age 1_Cost 1_Name 2_Cost 2_Name 3_Cost 3_Name ID 1 23 34 Item3 45 Item4 12.0 Item7 2 45 65 Item3 34 Item7 NaN NaN :param df: initial data frame :type df: pandas.DataFrame :param merge_id: name of the column that identifies rows that should be merged. In the above example: 'ID' :type merge_id: str :param row_name: name of the columns that provides the name of the rows in the new dataframe. In the example above: 'Name'. If None, the rows are numbered sequentially. :type row_name: str :param identical_columns: name of the columns that contain identical values across the rows of a group. In the example above: ['Age']. If None, these columns are automatically detected. On large database, there may be a performance issue. :type identical_columns: list(str) :return: reformatted database :rtype: pandas.DataFrame """ df_copy = df.copy() all_columns = set(df_copy.columns) duplicate = f'{merge_id}_biogeme_tmp_duplicate' df_copy[duplicate] = df_copy.loc[:, merge_id] grouped = df_copy.groupby(by=duplicate) def are_values_identical(col: pd.Series) -> bool: """This function checks if all the values in a column are identical :param col: the column :return: True if all values are identical. False otherwise. """ return (col.iloc[0] == col).all(0) def get_varying_cols(g: pd.DataFrame) -> set[str]: """This functions returns the name of all columns that have constant values within each group of data. 
:param g: group of data :return: name of all columns that have constant values within each group of data. """ return {colname for colname, col in g.items() if not are_values_identical(col)} if identical_columns is None: all_varying_cols = grouped.apply(get_varying_cols, include_groups=False) varying_columns = set.union(*all_varying_cols) identical_columns = list(all_columns - varying_columns) varying_columns = list(varying_columns) else: identical_columns = set(identical_columns) identical_columns.add(merge_id) varying_columns = list(all_columns - identical_columns) # Take the first row for columns that are identical if identical_columns: common_data = df_copy[list(identical_columns)].drop_duplicates( merge_id, keep='first' ) common_data.index = common_data[merge_id] # Treat the other columns # Include merge_id and a duplicate tmp_df = df_copy[[merge_id] + list(varying_columns)].copy() tmp_df[duplicate] = tmp_df[merge_id].copy() grouped_varying = tmp_df.groupby(by=duplicate) def treat(x: pd.DataFrame) -> pd.DataFrame: """Treat a group of data. :param x: group of data :return: the same data organized in one row, with proper column names :raise BiogemeError: if there are duplicates in the name of the row. Indeed, in that case, they cannot be used to name the new columns. """ if not are_values_identical(x[merge_id]): err_msg = f'Group has different IDs: {x[merge_id]}. ' f'Rows id: {x.index}' raise BiogemeError(err_msg) if row_name is not None and not x[row_name].is_unique: err_msg = ( f'Entries in column [{row_name}] are not unique. ' f'This column cannot be used to name the new ' f'columns:\n{x[[row_name, merge_id]]}. 
' ) raise BiogemeError(err_msg) the_columns = set(x.columns) - {merge_id} if row_name is not None: the_columns -= {row_name} sorted_list = sorted(list(the_columns)) first = True i = 0 for _, row in x.iterrows(): i += 1 if first: all_values = [row[merge_id]] all_columns = [merge_id] first = False name = f'{i}' if row_name is None else row[row_name] columns = [f'{name}_{c}' for c in sorted_list] all_values.extend([row[c] for c in sorted_list]) all_columns.extend(columns) treated_df = pd.DataFrame([all_values], columns=all_columns) return treated_df flat_data = grouped_varying.apply(treat, include_groups=False) flat_data.index = flat_data[merge_id] # We remove the column 'merge_id' as it is stored as index. if identical_columns: return pd.concat([common_data, flat_data], axis='columns').drop( columns=[merge_id] ) return flat_data.drop(columns=[merge_id])