"""
DataContainer: Responsible for holding and safely manipulating
the Biogeme dataset stored as a Pandas DataFrame.
Michel Bierlaire
Wed Mar 26 19:30:57 2025
"""
from __future__ import annotations
import logging
from collections.abc import Callable
import jax.numpy as jnp
import numpy as np
import pandas as pd
from biogeme.deprecated import deprecated
from biogeme.exceptions import BiogemeError
from biogeme.expressions import (
Expression,
ExpressionOrNumeric,
Variable,
validate_and_convert,
)
from biogeme.floating_point import JAX_FLOAT, PANDAS_FLOAT
from biogeme.second_derivatives import SecondDerivativesMode
from biogeme.segmentation import (
DiscreteSegmentationTuple,
generate_segmentation,
verify_segmentation,
)
from .sampling import sample_with_replacement
# Module-level logger, named after this module through ``__name__``.
logger = logging.getLogger(__name__)
"""Logger that controls the output of
messages to the screen and log file.
"""
[docs]
class Database:
"""Encapsulates a pandas DataFrame for Biogeme, providing safe access
and basic operations such as checking for emptiness, scaling,
and column manipulation.
"""
def __init__(self, name: str, dataframe: pd.DataFrame, use_jit: bool = True):
    """
    Constructor

    The content of ``dataframe`` is converted to ``PANDAS_FLOAT`` and the
    result of that conversion is what the database stores.

    :param name: name of the database
    :param dataframe: the data in pandas format
    :param use_jit: stored as ``self.use_jit``; in this class it is
        forwarded as ``use_jit`` when expressions are evaluated on the
        data (``remove`` and ``define_variable``).
    :raises BiogemeError: if the dataframe is empty, or if its content
        cannot be converted to ``PANDAS_FLOAT``
    """
    self.name = name
    if dataframe.empty:
        raise BiogemeError('Database has no entry')
    try:
        self._df = dataframe.astype(PANDAS_FLOAT)
    except (ValueError, TypeError) as e:
        # Chain the original exception so that the offending value and
        # the pandas traceback stay visible to the caller.
        raise BiogemeError(f'Data type conversion failed: {e}') from e
    self.use_jit = use_jit
    # Overwritten by ``remove`` with the number of rows it excluded.
    self.number_of_excluded_data = 0
    # Callbacks called by ``remove_rows`` with the index of removed rows.
    self._listeners: list[Callable[[pd.Index], None]] = []
    # Name of the panel column; None as long as ``panel`` was not called.
    self.panel_column: str | None = None
[docs]
@classmethod
def dummy_database(
    cls,
) -> Database:
    """Build a minimal database named ``'dummy'``.

    The data consists of a single column ``x`` with a single row
    containing 0.

    :return: an instance of the class the method is called on.
    """
    df = pd.DataFrame({'x': [0]})  # single-row dummy input
    # Use ``cls`` so that a subclass obtains an instance of its own type.
    return cls('dummy', df)
def __str__(self) -> str:
return f'biogeme database {self.name}'
[docs]
def register_listener(self, callback: Callable[[pd.Index], None]):
    """Add a callback to the list of listeners of the database.

    In this class, the listeners are called by ``remove_rows``, each one
    receiving the index labels of the rows that have been removed.

    :param callback: function taking a pandas Index as only argument.
    """
    listeners = self._listeners
    listeners.append(callback)
@property
def dataframe(self) -> pd.DataFrame:
    """Internal DataFrame itself (not a copy): changes made to the
    returned object are changes to the database.
    """
    internal_frame = self._df
    return internal_frame
[docs]
def bootstrap_sample(self) -> Database:
    """Returns a bootstrap sample of the data.

    :return: a new database named ``<name>_bootstrap``, built from the
        output of ``sample_with_replacement`` applied to the data, and
        using the same ``use_jit`` setting as this database.
    """
    df = sample_with_replacement(self._df)
    # Propagate ``use_jit``: without it the sample would silently fall
    # back to the constructor's default.
    # NOTE(review): ``panel_column`` is not propagated — confirm whether
    # bootstrap samples of panel data are expected to remain panel.
    return Database(f'{self.name}_bootstrap', df, use_jit=self.use_jit)
@property
def data_jax(self) -> jnp.ndarray:
    """Content of the DataFrame converted to a JAX array of dtype
    ``JAX_FLOAT``. The array is rebuilt at each access.
    """
    raw_values = self._df.to_numpy()
    return jnp.asarray(raw_values, dtype=JAX_FLOAT)
[docs]
def is_empty(self) -> bool:
    """Tells if the data container is empty, as reported by the
    ``empty`` attribute of the internal DataFrame.
    """
    frame = self._df
    return frame.empty
[docs]
def num_rows(self) -> int:
    """Number of rows of the dataset (first entry of its shape)."""
    row_count, _ = self._df.shape
    return row_count
[docs]
def num_columns(self) -> int:
    """Number of columns of the dataset (second entry of its shape)."""
    _, column_count = self._df.shape
    return column_count
[docs]
def column_exists(self, column: str) -> bool:
    """Tells if ``column`` is the name of a column of the data.

    :param column: name to look for.
    """
    known_columns = self._df.columns
    return column in known_columns
[docs]
def scale_column(self, column: str, scale: float):
    """Multiplies, in place, every value of a column by a scalar.

    :param column: name of the column to scale
    :param scale: scalar to multiply the column values by
    :raises BiogemeError: if the column is not found
    """
    if column in self._df:
        self._df[column] *= scale
        return
    raise BiogemeError(f'Column {column} not found in the database.')
[docs]
def add_column(self, column: str, values: pd.Series) -> None:
    """Appends a new column to the dataset.

    :param column: name of the new column
    :param values: a pandas Series of same length as data
    :raises ValueError: if column already exists or lengths mismatch
    """
    if column in self._df.columns:
        raise ValueError(f'Column "{column}" already exists.')
    provided = len(values)
    expected = self.num_rows()
    if provided != expected:
        raise ValueError(
            f'Length mismatch: column has {provided} values, '
            f'expected {expected}.'
        )
    self._df[column] = values
[docs]
def remove_rows(self, condition: pd.Series):
    """Removes all rows where the condition is True.

    A pandas Series is matched with the rows using its index labels. Any
    other input (list, numpy array, ...) is matched with the rows by
    position. Entries that are missing, or that cannot be interpreted as
    a boolean, are treated as False: the corresponding rows are kept.

    After the removal, the index of the data is reset, and each
    registered listener is called with the index labels (as they were
    before the reset) of the removed rows.

    :param condition: Boolean Series of same length as the data
    :raises ValueError: if the condition does not contain as many entries
        as there are rows in the data.
    """
    n_rows = len(self._df)
    is_labelled = isinstance(condition, pd.Series)
    cond = condition if is_labelled else pd.Series(condition)

    # The length must be verified on the raw input: once aligned on the
    # index of the data, the length always matches and a mismatch would
    # go unnoticed.
    if len(cond) != n_rows:
        raise ValueError(
            f"Condition length {len(cond)} != dataframe length {n_rows}"
        )

    if is_labelled:
        # Label-based alignment; labels that are absent become NA.
        cond = cond.reindex(self._df.index)
    else:
        # No labels were provided: entry i refers to row i, whatever the
        # index of the data is.
        cond.index = self._df.index

    # Prefer pandas' nullable boolean (NA support), which avoids the
    # warning about silent downcasting when filling NAs on object dtype.
    try:
        cond = cond.astype("boolean")
    except (TypeError, ValueError):
        # Heterogeneous or non 0/1 values.
        cond = cond.infer_objects()
        if cond.dtype == object:
            # Map common textual/numeric truthy/falsey values to
            # booleans, and leave everything else as NA.
            _TRUE = {True, 1, 1.0, "True", "true", "TRUE"}
            _FALSE = {False, 0, 0.0, "False", "false", "FALSE", ""}

            def _to_flag(value):
                try:
                    if value in _TRUE:
                        return True
                    if value in _FALSE:
                        return False
                except TypeError:
                    # Unhashable value: it cannot be a known flag.
                    pass
                return pd.NA

            cond = cond.map(_to_flag).astype("boolean")
        else:
            # Nonzero test. NaN compares different from 0, so missing
            # entries are explicitly excluded to keep them False.
            cond = (cond.ne(0) & cond.notna()).astype("boolean")

    # Now safely fill NA and convert to plain bool
    mask = cond.fillna(False).astype(bool)

    # ``mask`` carries the index of the data: a positional selection is
    # equivalent and does not depend on the labels being unique.
    self._df = self._df.loc[(~mask).to_numpy()].reset_index(drop=True)
    removed_index = mask[mask].index
    for callback in self._listeners:
        callback(removed_index)
[docs]
def reset_indices(self) -> None:
    """Replaces the index of the data by the default one, dropping the
    previous index instead of keeping it as a column.
    """
    renumbered = self._df.reset_index(drop=True)
    self._df = renumbered
[docs]
def remove(self, exclude_condition: ExpressionOrNumeric):
    """
    Removes rows from the database that satisfy a given condition.

    This method evaluates a Biogeme expression row by row on the database.
    All rows where the expression evaluates to a nonzero value are removed.
    ``number_of_excluded_data`` is overwritten with the number of rows
    removed by this call.

    :param exclude_condition: A Biogeme expression that returns a boolean-like value
        for each row in the dataset. Rows where the result is
        True (nonzero) will be excluded.
    """
    from biogeme.calculator import evaluate_simple_expression_per_row

    expression: Expression = validate_and_convert(exclude_condition)
    condition = evaluate_simple_expression_per_row(
        expression=expression,
        database=self,
        numerically_safe=True,
        second_derivatives_mode=SecondDerivativesMode.NEVER,
        use_jit=self.use_jit,
    )
    # The values are positional (one per row). They are labelled with the
    # index of the data, because ``remove_rows`` aligns a Series on the
    # labels: a default 0..n-1 index would not match data with another
    # index, and no row would be removed.
    to_remove = pd.Series(np.asarray(condition) != 0.0, index=self._df.index)
    self.number_of_excluded_data = int(to_remove.sum())
    self.remove_rows(to_remove)
[docs]
def define_variable(self, name: str, expression: Expression) -> Variable:
    """
    Evaluates a Biogeme expression row by row on the database, and stores
    the results as a new column of the internal DataFrame.

    :param name: Name of the new column to be added.
    :param expression: Biogeme expression to evaluate for each row.
    :return: a ``Variable`` object with the name of the new column.
    :raises ValueError: if a column with that name already exists.
    :raises BiogemeError: if the database is empty, or if the evaluation
        generates at least one NaN.
    """
    if name in self.dataframe.columns:
        raise ValueError(f'Variable {name} already exists')
    if self.dataframe.empty:
        raise BiogemeError('Empty database.')

    from biogeme.calculator import evaluate_simple_expression_per_row

    evaluated = evaluate_simple_expression_per_row(
        expression=expression,
        database=self,
        numerically_safe=True,
        second_derivatives_mode=SecondDerivativesMode.NEVER,
        use_jit=self.use_jit,
    )

    # A column containing NaN is refused; the error reports where they are.
    nan_mask = np.isnan(evaluated)
    if nan_mask.any():
        n_total = len(evaluated)
        n_nan = nan_mask.sum()
        if n_nan == n_total:
            detail = "All values are NaN."
        else:
            detail = f"Indices with NaN: {np.where(nan_mask)[0].tolist()}"
        raise BiogemeError(
            f"The evaluated values for '{name}' contain NaN entries.\n"
            f"Total values: {n_total}, NaN values: {n_nan}.\n"
            f"{detail}"
        )

    new_column = pd.Series(evaluated, index=self.dataframe.index, dtype=PANDAS_FLOAT)
    self.dataframe[name] = new_column
    return Variable(name)
[docs]
@deprecated(new_func=define_variable)
def DefineVariable(self, name: str, expression: Expression) -> Variable:
    """
    Deprecated spelling of ``define_variable``.

    The body is intentionally empty. The ``deprecated`` decorator receives
    ``define_variable`` as ``new_func``.
    NOTE(review): presumably the decorator forwards the call to
    ``define_variable`` — confirm in ``biogeme.deprecated``.

    :param name: Name of the new column to be added.
    :param expression: Biogeme expression to evaluate for each row.
    """
[docs]
def remove_column(self, column: str):
    """Drops, in place, a column from the dataset. Nothing happens if
    there is no column with that name.

    :param column: name of the column to remove.
    """
    if column not in self._df.columns:
        return
    self._df.drop(columns=[column], inplace=True)
[docs]
def get_column(self, column: str) -> pd.Series:
    """Gives access to the values of a column.

    :param column: name of the column.
    :return: the column, as obtained by indexing the internal DataFrame.
    :raises BiogemeError: if there is no column with that name.
    """
    if column in self._df.columns:
        return self._df[column]
    raise BiogemeError(f'Column "{column}" not found.')
[docs]
def generate_segmentation(
    self,
    variable: Variable | str,
    mapping: dict[int, str] | None = None,
    reference: str | None = None,
) -> DiscreteSegmentationTuple:
    """Generate a segmentation tuple for a variable, by delegating to
    the ``generate_segmentation`` function with the data of this database.

    :param variable: Variable object or name of the variable
    :param mapping: mapping associating values of the variable to
        names. If incomplete, default names are provided.
    :param reference: name of the reference category. If None, an
        arbitrary category is selected as reference.
    :return: the segmentation tuple produced by the function.
    """
    arguments = {
        'dataframe': self.dataframe,
        'variable': variable,
        'mapping': mapping,
        'reference': reference,
    }
    return generate_segmentation(**arguments)
[docs]
def panel(self, column_name: str):
    """Records ``column_name`` as the panel column of the database.

    Once a name has been recorded, ``is_panel`` returns True. The method
    does not check that the column exists in the data.
    NOTE(review): presumably the column identifying the individuals in
    panel data — confirm against the callers.

    :param column_name: name stored in ``panel_column``.
    """
    self.panel_column = column_name
[docs]
def verify_segmentation(self, segmentation: DiscreteSegmentationTuple) -> None:
    """Checks that the definition of a segmentation is consistent with
    the data, by delegating to the ``verify_segmentation`` function.

    :param segmentation: definition of the segmentation
    :raise BiogemeError: if the segmentation is not consistent with the data.
    """
    data = self.dataframe
    verify_segmentation(dataframe=data, segmentation=segmentation)
[docs]
def suggest_scaling(
    self, columns: list[str] | None = None, report_all: bool = False
) -> pd.DataFrame:
    """Suggest a scaling of the variables in the database.

    For each column, :math:`\\delta` is the largest absolute value found
    in the column (the maximum of the absolute values of its largest and
    smallest entries), or one if that value is smaller than one. The
    level of magnitude is evaluated as a power of 10. The suggested
    scale is the inverse of this value.

    .. math:: s = \\frac{1}{10^{|\\log_{10} \\delta|}}

    where :math:`|x|` is the integer closest to :math:`x`.

    :param columns: list of columns to be considered.
        If None, all of them will be considered.
    :param report_all: if False, remove entries where the suggested
        scale is 1, 0.1 or 10
    :return: A Pandas dataframe with columns ``Column``, ``Scale`` and
        ``Largest``, where each row contains the name of the variable,
        the suggested scale s and the largest absolute value. Ideally,
        the column should be multiplied by s.
    :raise BiogemeError: if a variable in ``columns`` is unknown.
    """
    data = self.dataframe
    if columns is None:
        columns = data.columns
    else:
        for candidate in columns:
            if candidate in data:
                continue
            raise BiogemeError(f'Variable {candidate} not found.')

    rows = []
    for col in columns:
        values = data[col]
        largest = max(np.abs(values.max()), np.abs(values.min()))
        # Magnitudes below one are treated as one: the scale never exceeds 1.
        scale = 1 / 10 ** np.round(np.log10(max(1.0, largest)))
        rows.append([col, scale, largest])

    report = pd.DataFrame(rows, columns=['Column', 'Scale', 'Largest'])
    if report_all:
        return report

    # Remove entries where the suggested scale is 1, 0.1 or 10
    trivial = report['Scale'].isin([1, 0.1, 10])
    report.drop(report[trivial].index, inplace=True)
    return report
[docs]
def is_panel(self) -> bool:
    """Tells if a panel column has been recorded, that is, if
    ``panel_column`` is not None.
    """
    no_panel_column = self.panel_column is None
    return not no_panel_column