"""
DataContainer: Responsible for holding and safely manipulating
the Biogeme dataset stored as a Pandas DataFrame.
Michel Bierlaire
Wed Mar 26 19:30:57 2025
"""
from __future__ import annotations
import logging
from collections.abc import Callable
import numpy as np
import pandas as pd
from biogeme.deprecated import deprecated
from biogeme.exceptions import BiogemeError
from biogeme.expressions import (
Expression,
ExpressionOrNumeric,
Variable,
validate_and_convert,
)
from biogeme.floating_point import JAX_FLOAT, PANDAS_FLOAT
from biogeme.second_derivatives import SecondDerivativesMode
from biogeme.segmentation import (
DiscreteSegmentationTuple,
generate_segmentation,
verify_segmentation,
)
from jax import numpy as jnp
from .sampling import sample_with_replacement
# Module-level logger, named after this module through ``__name__``.
logger = logging.getLogger(__name__)
"""Logger that controls the output of
messages to the screen and log file.
"""
# Main class of this module.
class Database:
    """Encapsulates a pandas DataFrame for Biogeme, providing safe access
    and basic operations such as checking for emptiness, scaling,
    and column manipulation.

    The data is converted to ``PANDAS_FLOAT`` at construction. Callbacks
    registered with :meth:`register_listener` are notified each time rows
    are removed by :meth:`remove_rows`.
    """

    def __init__(self, name: str, dataframe: pd.DataFrame, use_jit: bool = True):
        """
        Constructor

        :param name: name of the database
        :param dataframe: the data in pandas format. It is converted to
            ``PANDAS_FLOAT`` with ``DataFrame.astype``.
        :param use_jit: stored as ``self.use_jit`` and forwarded to the
            expression evaluator by :meth:`remove` and
            :meth:`define_variable`.

        :raises BiogemeError: if the dataframe is empty, or if the type
            conversion raises a ``ValueError``.
        """
        self.name = name
        if dataframe.empty:
            raise BiogemeError('Database has no entry')
        try:
            self._df = dataframe.astype(PANDAS_FLOAT)
        except ValueError as e:
            # Chain the original exception so that its traceback is kept.
            raise BiogemeError(f'Data type conversion failed: {e}') from e
        self.use_jit = use_jit
        # Number of rows flagged by the last call to ``remove``.
        self.number_of_excluded_data = 0
        # Callbacks invoked by ``remove_rows`` with the index of removed rows.
        self._listeners: list[Callable[[pd.Index], None]] = []
        # Name of the column set by ``panel``; None if it was never called.
        self.panel_column: str | None = None

    @classmethod
    def dummy_database(
        cls,
    ) -> Database:
        """Builds a database named ``'dummy'`` with one column ``x`` and
        a single row containing 0.

        :return: the dummy database, built with ``cls``.
        """
        df = pd.DataFrame({'x': [0]})  # single-row dummy input
        return cls('dummy', df)

    def __str__(self) -> str:
        return f'biogeme database {self.name}'

    def register_listener(self, callback: Callable[[pd.Index], None]):
        """Registers a callback that is called by :meth:`remove_rows`.

        :param callback: function receiving the index labels (before the
            index is reset) of the rows that have been removed.
        """
        self._listeners.append(callback)

    @property
    def dataframe(self) -> pd.DataFrame:
        """Returns a reference to the internal DataFrame."""
        return self._df

    def get_copy(self, name_of_copy: str | None = None) -> Database:
        """Returns a copy of the database

        The copy holds a copy of the DataFrame and the same ``use_jit``
        setting. It is built by the constructor, so the registered
        listeners and the panel column are not carried over.

        :param name_of_copy: name of the new database. If None, the name
            of the current database followed by ``_copy`` is used.
        :return: the new database.
        """
        the_name = f'{self.name}_copy' if name_of_copy is None else name_of_copy
        return Database(the_name, self.dataframe.copy(), use_jit=self.use_jit)

    def bootstrap_sample(self) -> Database:
        """Returns a bootstrap sample of the data.

        :return: a new database named ``<name>_bootstrap``, built from the
            output of ``sample_with_replacement`` and with the same
            ``use_jit`` setting as the current one.
        """
        df = sample_with_replacement(self._df)
        return Database(f'{self.name}_bootstrap', df, use_jit=self.use_jit)

    @property
    def data_jax(self) -> jnp.ndarray:
        """Returns the data as a JAX array of type ``JAX_FLOAT``"""
        return jnp.asarray(self._df.to_numpy(), dtype=JAX_FLOAT)

    def is_empty(self) -> bool:
        """Returns True if the data container is empty"""
        return self._df.empty

    def num_rows(self) -> int:
        """Returns the number of rows in the dataset"""
        return self._df.shape[0]

    def num_columns(self) -> int:
        """Returns the number of columns in the dataset"""
        return self._df.shape[1]

    def column_exists(self, column: str) -> bool:
        """Check if a column exists in the data"""
        return column in self._df.columns

    def scale_column(self, column: str, scale: float):
        """Scales all values in a given column

        :param column: name of the column to scale
        :param scale: scalar to multiply the column values by

        :raises BiogemeError: if the column is not found
        """
        if column not in self._df:
            raise BiogemeError(f'Column {column} not found in the database.')
        self._df[column] *= scale

    def add_column(self, column: str, values: pd.Series) -> None:
        """Adds a new column to the dataset

        :param column: name of the new column
        :param values: a pandas Series of same length as data

        :raises ValueError: if column already exists or lengths mismatch
        """
        if column in self._df.columns:
            raise ValueError(f'Column "{column}" already exists.')
        if len(values) != self.num_rows():
            raise ValueError(
                f'Length mismatch: column has {len(values)} values, '
                f'expected {self.num_rows()}.'
            )
        self._df[column] = values

    @staticmethod
    def _as_boolean_mask(cond: pd.Series) -> pd.Series:
        """Converts a Series into a plain ``bool`` mask. Missing values
        are mapped to False.

        :param cond: Series to convert.
        :return: Series of dtype ``bool`` with the same index.
        """
        try:
            # Nullable boolean dtype: keeps missing values as NA.
            mask = cond.astype('boolean')
        except (TypeError, ValueError):
            # The values are not directly bool-like.
            mask = cond.infer_objects(copy=False)
            if mask.dtype == object:
                # Map common textual/numeric truthy/falsey values to
                # booleans, and leave everything else as NA.
                true_values = {True, 1, 1.0, 'True', 'true', 'TRUE'}
                false_values = {False, 0, 0.0, 'False', 'false', 'FALSE', ''}
                mask = mask.map(
                    lambda v: (
                        True
                        if v in true_values
                        else (False if v in false_values else pd.NA)
                    )
                )
                mask = mask.astype('boolean')
            else:
                # Nonzero test. NaN != 0 evaluates to True, so missing
                # entries are masked out explicitly to be treated as False,
                # as in the other branches.
                mask = ((mask != 0) & mask.notna()).astype('boolean')
        # Fill the remaining NA and convert to plain bool.
        return mask.fillna(False).astype(bool)

    def remove_rows(self, condition: pd.Series):
        """Removes all rows where the condition is True

        The condition is aligned on the index of the data. Entries that
        are missing after the alignment are treated as False. Once the
        rows are removed, the index is reset and each registered listener
        is called with the index labels of the removed rows.

        :param condition: Boolean Series of same length as the data

        :raises ValueError: if the length of the condition is different
            from the number of rows.
        """
        cond = pd.Series(condition)
        # The length must be verified before the alignment: reindexing
        # always produces a Series as long as the DataFrame.
        if len(cond) != len(self._df):
            raise ValueError(
                f'Condition length {len(cond)} != dataframe length {len(self._df)}'
            )
        cond = self._as_boolean_mask(cond.reindex(self._df.index))
        self._df = self._df.loc[~cond].reset_index(drop=True)
        removed_index = cond[cond].index
        for callback in self._listeners:
            callback(removed_index)

    def reset_indices(self) -> None:
        """Replaces the index of the data by the default integer index."""
        self._df = self._df.reset_index(drop=True)

    def remove(self, exclude_condition: ExpressionOrNumeric):
        """
        Removes rows from the database that satisfy a given condition.

        This method evaluates a Biogeme expression row by row on the database.
        All rows where the expression evaluates to a truthy value are removed.
        The number of rows flagged for removal is stored in
        ``self.number_of_excluded_data``.

        :param exclude_condition: A Biogeme expression that returns a boolean-like value
                                  for each row in the dataset. Rows where the result is
                                  True (nonzero) will be excluded.
        """
        # NOTE(review): imported locally, presumably to avoid a circular
        # import -- confirm.
        from biogeme.jax_calculator import evaluate_simple_expression_per_row

        expression: Expression = validate_and_convert(exclude_condition)
        values = evaluate_simple_expression_per_row(
            expression=expression,
            database=self,
            numerically_safe=True,
            second_derivatives_mode=SecondDerivativesMode.NEVER,
            use_jit=self.use_jit,
        )
        # The values are given row by row: attach the index of the data, as
        # in ``define_variable``, so that the label-based alignment performed
        # by ``remove_rows`` is correct even if the index is not 0..n-1.
        to_remove = pd.Series(values != 0.0, index=self._df.index)
        self.number_of_excluded_data = int(to_remove.sum())
        self.remove_rows(to_remove)

    def define_variable(self, name: str, expression: Expression) -> Variable:
        """
        This method evaluates a Biogeme expression row by row on the database
        and creates a new column in the internal DataFrame with the results.

        :param name: Name of the new column to be added.
        :param expression: Biogeme expression to evaluate for each row.
        :return: a ``Variable`` object with the name of the new column.

        :raises ValueError: if a column with that name already exists.
        :raises BiogemeError: if the database is empty, or if at least one
            evaluated value is NaN.
        """
        if name in self.dataframe.columns:
            error_msg = f'Variable {name} already exists'
            raise ValueError(error_msg)
        if self.dataframe.empty:
            error_msg = 'Empty database.'
            raise BiogemeError(error_msg)
        # NOTE(review): imported locally, presumably to avoid a circular
        # import -- confirm.
        from biogeme.jax_calculator import evaluate_simple_expression_per_row

        new_values = evaluate_simple_expression_per_row(
            expression=expression,
            database=self,
            numerically_safe=True,
            second_derivatives_mode=SecondDerivativesMode.NEVER,
            use_jit=self.use_jit,
        )
        # The NaN mask is calculated once, and reused for the report.
        nan_mask = np.isnan(new_values)
        if nan_mask.any():
            num_total = len(new_values)
            num_nan = nan_mask.sum()
            nan_indices = np.where(nan_mask)[0].tolist()
            message = f"The evaluated values for '{name}' contain NaN entries.\n"
            message += f'Total values: {num_total}, NaN values: {num_nan}.\n'
            if num_nan == num_total:
                message += 'All values are NaN.'
            else:
                message += f'Indices with NaN: {nan_indices}'
            raise BiogemeError(message)
        self.dataframe[name] = pd.Series(
            new_values, index=self.dataframe.index, dtype=PANDAS_FLOAT
        )
        return Variable(name)

    @deprecated(new_func=define_variable)
    def DefineVariable(self, name: str, expression: Expression) -> Variable:
        """
        Deprecated name of :meth:`define_variable`.

        :param name: Name of the new column to be added.
        :param expression: Biogeme expression to evaluate for each row.
        """
        # NOTE(review): the body is intentionally empty; the ``deprecated``
        # decorator presumably forwards the call to ``define_variable`` --
        # confirm.
        pass

    def remove_column(self, column: str):
        """Removes a column from the dataset

        Nothing is done if the column does not exist.

        :param column: name of the column to remove.
        """
        if column in self._df.columns:
            self._df.drop(columns=[column], inplace=True)

    def get_column(self, column: str) -> pd.Series:
        """Returns the values of a column

        :param column: name of the column.

        :raises BiogemeError: if the column is not found.
        """
        if column not in self._df.columns:
            raise BiogemeError(f'Column "{column}" not found.')
        return self._df[column]

    def generate_segmentation(
        self,
        variable: Variable | str,
        mapping: dict[int, str] | None = None,
        reference: str | None = None,
    ) -> DiscreteSegmentationTuple:
        """Generate a segmentation tuple for a variable.

        The work is delegated to the function ``generate_segmentation``
        of the module ``biogeme.segmentation``, applied on the data.

        :param variable: Variable object or name of the variable
        :param mapping: mapping associating values of the variable to
            names. If incomplete, default names are provided.
        :param reference: name of the reference category. If None, an
            arbitrary category is selected as reference.
        """
        return generate_segmentation(
            dataframe=self.dataframe,
            variable=variable,
            mapping=mapping,
            reference=reference,
        )

    def panel(self, column_name: str):
        """Stores the name of the panel column.

        After this call, :meth:`is_panel` returns True.

        :param column_name: name stored in ``self.panel_column``. It is
            not verified that the column exists in the data.
        """
        self.panel_column = column_name

    def verify_segmentation(self, segmentation: DiscreteSegmentationTuple) -> None:
        """Verifies if the definition of the segmentation is consistent with the data

        :param segmentation: definition of the segmentation

        :raise BiogemeError: if the segmentation is not consistent with the data.
        """
        verify_segmentation(dataframe=self.dataframe, segmentation=segmentation)

    def suggest_scaling(
        self, columns: list[str] | None = None, report_all: bool = False
    ) -> pd.DataFrame:
        """Suggest a scaling of the variables in the database.

        For each column, :math:`\\delta` is the largest absolute value,
        obtained from the maximum and the minimum of the column, or
        one if that value is smaller than one. The level of magnitude
        is evaluated as a power of 10. The suggested scale is the
        inverse of this value.

        .. math:: s = \\frac{1}{10^{|\\log_{10} \\delta|}}

        where :math:`|x|` is the integer closest to :math:`x`.

        :param columns: list of columns to be considered.
                        If None, all of them will be considered.
        :param report_all: if False, remove entries where the suggested
            scale is 1, 0.1 or 10

        :return: A Pandas dataframe with the columns ``Column``,
            ``Scale`` and ``Largest``, where each row contains the name
            of the variable, the suggested scale s, and the largest
            absolute value. Ideally, the column should be multiplied
            by s.

        :raise BiogemeError: if a variable in ``columns`` is unknown.
        """
        if columns is None:
            columns = self.dataframe.columns
        else:
            for c in columns:
                if c not in self.dataframe:
                    error_msg = f'Variable {c} not found.'
                    raise BiogemeError(error_msg)

        largest_value = [
            max(np.abs(self.dataframe[col].max()), np.abs(self.dataframe[col].min()))
            for col in columns
        ]
        res = [
            [col, 1 / 10 ** np.round(np.log10(max(1.0, lv))), lv]
            for col, lv in zip(columns, largest_value)
        ]
        df = pd.DataFrame(res, columns=['Column', 'Scale', 'Largest'])
        if report_all:
            return df
        # Remove entries where the suggested scale is 1, 0.1 or 10
        remove = (df.Scale == 1) | (df.Scale == 0.1) | (df.Scale == 10)
        return df.drop(index=df.index[remove])

    def is_panel(self) -> bool:
        """Returns True if a panel column has been set with :meth:`panel`"""
        return self.panel_column is not None