# Source code for biogeme.tools

"""Implements some useful functions

:author: Michel Bierlaire

:date: Sun Apr 14 10:46:10 2019

"""

import sys
from datetime import timedelta
import logging
from itertools import product
from os import path
import shutil
from collections import defaultdict
import types
from typing import NamedTuple, Callable, Any, Optional, Type
import tempfile
import uuid
import numpy as np
import pandas as pd
from scipy.stats import chi2
import biogeme.exceptions as excep

logger = logging.getLogger(__name__)

class LRTuple(NamedTuple):
    """Outcome of a likelihood ratio test.

    Groups the textual conclusion of the test with the statistic and
    the chi-square threshold it was compared against.
    """

    # Human-readable conclusion of the test
    message: str
    # Minus two times the loglikelihood difference between the two models
    statistic: float
    # Critical value of the chi-square distribution at the chosen level
    threshold: float

def findiff_g(
    the_function: Callable[[np.ndarray], tuple[float, ...]], x: np.ndarray
) -> np.ndarray:
    """Approximate the gradient of a function :math:`f` by forward
    finite differences.

    :param the_function: function object taking a vector argument and
        returning a tuple whose first element is the value of
        :math:`f`. The remaining elements are ignored.

    :param x: point at which the gradient is approximated.

    :return: numpy vector of the same dimension as x, containing the
        finite difference approximation of the gradient.
    """
    x = x.astype(float)
    step_base = 0.0000001
    dim = len(x)
    gradient = np.zeros(dim)
    f_at_x = the_function(x)[0]
    for index in range(dim):
        component = x.item(index)
        # Relative step for large components, absolute step (with the
        # sign of the component) otherwise.
        if abs(component) >= 1:
            step = step_base * component
        else:
            step = step_base if component >= 0 else -step_base
        perturbed = x.copy()
        perturbed[index] = component + step
        f_perturbed = the_function(perturbed)[0]
        gradient[index] = (f_perturbed - f_at_x) / step
    return gradient

def findiff_H(
    the_function: Callable[[np.ndarray], tuple[float, np.ndarray, Any]], x: np.ndarray
) -> np.ndarray:
    """Approximate the hessian of a function :math:`f` by forward
    finite differences on its analytical gradient.

    :param the_function: function object taking a vector argument and
        returning a tuple whose first element is the value of
        :math:`f` and whose second element is the gradient. The
        remaining elements are ignored.

    :param x: point at which the hessian is approximated.

    :return: numpy matrix containing the finite difference
        approximation of the hessian.
    """
    step_base = 1.0e-7
    dim = len(x)
    hessian = np.zeros((dim, dim))
    gradient_at_x = the_function(x)[1]
    identity = np.eye(dim, dim)
    for col in range(dim):
        component = x.item(col)
        # Relative step for large components, absolute step (with the
        # sign of the component) otherwise.
        if abs(component) >= 1:
            step = step_base * component
        elif component >= 0:
            step = step_base
        else:
            step = -step_base
        unit = identity[col]
        gradient_perturbed = the_function(x + step * unit)[1]
        hessian[:, col] = (gradient_perturbed - gradient_at_x).flatten() / step
    return hessian

def checkDerivatives(
    the_function: Callable[[np.ndarray], tuple[float, np.ndarray, np.ndarray]],
    x: np.ndarray,
    names: Optional[list[str]] = None,
    logg: bool = False,
) -> tuple[float, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Verifies the analytical derivatives of a function by comparing
    them with finite difference approximations.

    :param the_function: A function object that takes a vector as an argument,
        and returns a tuple:

        - The first element of the tuple is the value of the
          function :math:`f`,
        - the second is the gradient of the function,
        - the third is the hessian.

    :param x: arguments of the function

    :param names: the names of the entries of x (for reporting).

    :param logg: if True, messages will be displayed.

    :return: tuple f, g, h, gdiff, hdiff where

        - f is the value of the function at x,
        - g is the analytical gradient,
        - h is the analytical hessian,
        - gdiff is the difference between the analytical gradient
          and the finite difference approximation
        - hdiff is the difference between the analytical hessian
          and the finite difference approximation

    """
    x = np.array(x, dtype=float)
    f, g, h = the_function(x)
    g_num = findiff_g(the_function, x)
    gdiff = g - g_num
    # Build the default labels once, up front, so that both the gradient
    # and the hessian reports use the same names. (Previously the default
    # was created lazily inside the first logging branch only.)
    if names is None:
        names = [f'x[{i}]' for i in range(len(x))]
    if logg:
        for k, v in enumerate(gdiff):
            logger.info(f'{names[k]:15}\t{g[k]:+E}\t{g_num[k]:+E}\t{v:+E}')

    h_num = findiff_H(the_function, x)
    hdiff = h - h_num
    if logg:
        logger.info('Row\t\tCol\t\tHessian\tFinDiff\t\tDifference')
        for row in range(len(hdiff)):
            for col in range(len(hdiff)):
                logger.info(
                    f'{names[row]:15}\t{names[col]:15}\t{h[row,col]:+E}\t'
                    f'{h_num[row,col]:+E}\t{hdiff[row,col]:+E}'
                )
    return f, g, h, gdiff, hdiff

def get_prime_numbers(n: int) -> list[int]:
    """Get a given number of prime numbers

    :param n: number of primes that are requested

    :return: array with prime numbers

    :raise BiogemeError: if the requested number is non positive or a float

    """
    if n <= 0:
        raise excep.BiogemeError(f'Incorrect number: {n}')

    # Enlarge the sieve bound by factors of 10 until enough primes are found.
    found = 0
    bound = 100
    while found < n:
        bound *= 10
        primes = calculate_prime_numbers(bound)
        found = len(primes)
    try:
        return primes[0:n]
    except TypeError as e:
        # A float n passes the comparisons above but fails as a slice index.
        raise excep.BiogemeError(f'Incorrect number: {n}') from e

def calculate_prime_numbers(upper_bound: int) -> list[int]:
    """Calculate prime numbers

    :param upper_bound: prime numbers up to this value will be computed

    :return: array with prime numbers

    :raise BiogemeError: if the upper_bound is incorrectly defined
        (negative number, e.g.)

    >>> tools.calculate_prime_numbers(10)
    [2, 3, 5, 7]

    """
    if upper_bound < 0:
        raise excep.BiogemeError(f'Incorrect value: {upper_bound}')
    # Building the sieve also validates the type of upper_bound.
    try:
        sieve = list(range(0, upper_bound + 1))
    except TypeError as e:
        raise excep.BiogemeError(f'Incorrect value: {upper_bound}') from e

    try:
        limit = int(np.ceil(np.sqrt(float(upper_bound))))
    except ValueError as e:
        raise excep.BiogemeError(f'Incorrect value: {upper_bound}') from e

    # Sieve of Eratosthenes: zero out every multiple of each surviving
    # candidate up to sqrt(upper_bound).
    for candidate in range(2, limit + 1):
        if sieve[candidate] != 0:
            for multiple in range(2 * candidate, upper_bound + 1, candidate):
                sieve[multiple] = 0
    # The surviving non-trivial entries are the primes.
    return [value for value in sieve[1:] if value != 0 and value != 1]

def countNumberOfGroups(df: pd.DataFrame, column: str) -> int:
    """
    This function counts the number of groups of same value in a column.
    For instance: 1,2,2,3,3,3,4,1,1  would give 5.

    Example::

        >>> df = pd.DataFrame({'ID': [1, 1, 2, 3, 3, 1, 2, 3],
                               'value':[1000,
                                        2000,
                                        3000,
                                        4000,
                                        5000,
                                        5000,
                                        10000,
                                        20000]})
        >>> tools.countNumberOfGroups(df, 'ID')
        6

        >>> tools.countNumberOfGroups(df, 'value')
        7

    :param df: the data frame to inspect.
    :param column: name of the column to analyze.

    :return: number of runs of consecutive identical values.
    """
    # A new group starts wherever the value differs from the previous row;
    # cumsum assigns a distinct id to each run. Computed as a standalone
    # Series so the caller's DataFrame is never mutated (the previous
    # implementation temporarily inserted a helper column, which leaked on
    # exceptions and clobbered any existing column with the same name).
    group_ids = (df[column] != df[column].shift(1)).cumsum()
    return int(group_ids.nunique())

def likelihood_ratio_test(
    model1: tuple[float, int],
    model2: tuple[float, int],
    significance_level: float = 0.05,
) -> LRTuple:
    """This function performs a likelihood ratio test between a
    restricted and an unrestricted model.

    :param model1: the final loglikelihood of one model, and the number of
        estimated parameters.

    :param model2: the final loglikelihood of the other model, and
        the number of estimated parameters.

    :param significance_level: level of significance of the test. Default: 0.05

    :return: a tuple containing:

        - a message with the outcome of the test
        - the statistic, that is minus two times the difference
          between the loglikelihood  of the two models
        - the threshold of the chi square distribution.

    :raise BiogemeError: if the unrestricted model has a lower log
        likelihood than the restricted model.

    """
    ll_first, params_first = model1
    ll_second, params_second = model2
    # The model with the larger loglikelihood plays the unrestricted role;
    # it must also have more estimated parameters.
    if ll_first > ll_second:
        if params_first < params_second:
            raise excep.BiogemeError(
                f'The unrestricted model {model2} has a '
                f'lower log likelihood than the restricted one {model1}'
            )
        ll_unrestricted, params_unrestricted = ll_first, params_first
        ll_restricted, params_restricted = ll_second, params_second
    else:
        if params_first >= params_second:
            raise excep.BiogemeError(
                f'The unrestricted model {model1} has a '
                f'lower log likelihood than the restricted one {model2}'
            )
        ll_unrestricted, params_unrestricted = ll_second, params_second
        ll_restricted, params_restricted = ll_first, params_first

    statistic = -2 * (ll_restricted - ll_unrestricted)
    degrees_of_freedom = params_unrestricted - params_restricted
    threshold = chi2.ppf(1 - significance_level, degrees_of_freedom)
    if statistic <= threshold:
        final_msg = f'H0 cannot be rejected at level {100*significance_level:.1f}%'
    else:
        final_msg = f'H0 can be rejected at level {100*significance_level:.1f}%'
    return LRTuple(message=final_msg, statistic=statistic, threshold=threshold)

def flatten_database(
    df: pd.DataFrame,
    merge_id: str,
    row_name: Optional[str] = None,
    identical_columns: Optional[list[str]] = None,
) -> pd.DataFrame:
    """Combine several rows of a Pandas database into one.
    For instance, consider the following database::

          ID  Age  Cost   Name
      0   1   23    34  Item3
      1   1   23    45  Item4
      2   1   23    12  Item7
      3   2   45    65  Item3
      4   2   45    34  Item7

    If row_name is 'Name', the function generates the same data in the
    following format::

          Age  Item3_Cost  Item4_Cost  Item7_Cost
      ID
      1    23          34        45.0          12
      2    45          65         NaN          34

    If row_name is None, the function generates the same data in the
    following format::

          Age  1_Cost 1_Name  2_Cost 2_Name  3_Cost 3_Name
      ID
      1    23      34  Item3      45  Item4    12.0  Item7
      2    45      65  Item3      34  Item7     NaN    NaN

    :param df: initial data frame
    :type df: pandas.DataFrame

    :param merge_id: name of the column that identifies rows that
        should be merged. In the above example: 'ID'
    :type merge_id: str

    :param row_name: name of the column that provides the name of the
        rows in the new dataframe. In the example above: 'Name'. If
        None, the rows are numbered sequentially.
    :type row_name: str

    :param identical_columns: name of the columns that contain
        identical values across the rows of a group. In the example
        above: ['Age']. If None, these columns are automatically
        detected. On a large database, there may be a performance issue.
    :type identical_columns: list(str)

    :return: reformatted database
    :rtype: pandas.DataFrame

    """
    grouped = df.groupby(by=merge_id)
    all_columns = set(df.columns)

    def are_values_identical(col: pd.Series) -> bool:
        """This function checks if all the values in a column
        are identical

        :param col: the column

        :return: True if all values are identical. False otherwise.
        """
        # Compare every entry against the first one.
        return (col.iloc[0] == col).all(0)

    def get_varying_cols(g: pd.DataFrame) -> set[str]:
        """This function returns the names of all columns whose
        values vary within a group of data.

        :param g: group of data

        :return: names of the columns whose values are not constant
            within the group.
        """
        return {colname for colname, col in g.items() if not are_values_identical(col)}

    if identical_columns is None:
        # Automatic detection: a column is "varying" if it varies within
        # at least one group; every other column is treated as identical.
        all_varying_cols = grouped.apply(get_varying_cols)
        varying_columns = set.union(*all_varying_cols)
        identical_columns = list(all_columns - varying_columns)
        varying_columns = list(varying_columns)
    else:
        identical_columns = set(identical_columns)
        varying_columns = list(all_columns - identical_columns)

    # Take the first row for columns that are identical.
    # NOTE(review): drop_duplicates(merge_id) requires merge_id to be one
    # of the identical columns. Automatic detection guarantees it (the id
    # is constant within each group by construction), but a user-provided
    # identical_columns list that omits merge_id would raise — TODO confirm.
    if identical_columns:
        common_data = df[list(identical_columns)].drop_duplicates(
            merge_id, keep='first'
        )
        common_data.index = common_data[merge_id]
    # Treat the other columns
    grouped_varying = df[[merge_id] + list(varying_columns)].groupby(by=merge_id)

    def treat(x: pd.DataFrame) -> pd.DataFrame:
        """Treat a group of data.

        :param x: group of data

        :return: the same data organized in one row, with proper column names

        :raise BiogemeError:  if there are duplicates in the name of
            the row. Indeed, in that case, they cannot be used to name the
            new columns.
        """
        if not are_values_identical(x[merge_id]):
            err_msg = f'Group has different IDs: {x[merge_id]}. ' f'Rows id: {x.index}'
            raise excep.BiogemeError(err_msg)
        if row_name is not None and not x[row_name].is_unique:
            err_msg = (
                f'Entries in column [{row_name}] are not unique. '
                f'This column cannot be used to name the new '
                f'columns:\n{x[[row_name, merge_id]]}. '
            )
            raise excep.BiogemeError(err_msg)

        # Columns to spread horizontally: everything except the id column
        # (and the naming column, if one was given).
        the_columns = set(x.columns) - {merge_id}
        if row_name is not None:
            the_columns -= {row_name}
        sorted_list = sorted(list(the_columns))
        first = True
        i = 0
        for _, row in x.iterrows():
            i += 1
            if first:
                # The id appears once, as the first output column.
                all_values = [row[merge_id]]
                all_columns = [merge_id]
                first = False
            # Rows are numbered sequentially unless row_name provides labels.
            name = f'{i}' if row_name is None else row[row_name]
            columns = [f'{name}_{c}' for c in sorted_list]
            all_values.extend([row[c] for c in sorted_list])
            all_columns.extend(columns)
        df = pd.DataFrame([all_values], columns=all_columns)
        return df

    flat_data = grouped_varying.apply(treat)
    flat_data.index = flat_data[merge_id]

    # We remove the column 'merge_id' as it is stored as index.
    if identical_columns:
        return pd.concat([common_data, flat_data], axis='columns').drop(
            columns=[merge_id]
        )
    return flat_data.drop(columns=[merge_id])

class TemporaryFile:
    """Class generating a temporary file, so that the user does not
    bother about its location, or even its name

    Example::

        with TemporaryFile() as filename:
            with open(filename, 'w') as f:
                print('stuff', file=f)
    """

    def __enter__(self, name: str = None) -> str:
        """Create a fresh temporary directory and return the full path
        of a file inside it (the file itself is not created).

        :param name: file name to use; a random UUID if None. Note that
            the ``with`` statement always calls this without arguments.
        """
        self.dir = tempfile.mkdtemp()
        if name is None:
            name = str(uuid.uuid4())
        return path.join(self.dir, name)

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        """Destroys the temporary directory"""
        shutil.rmtree(self.dir)

class ModelNames:
    """Class generating model names from unique configuration string"""

    def __init__(self, prefix: str = 'Model'):
        # Prefix shared by all generated names
        self.prefix = prefix
        # Maps each known ID to the name that was assigned to it
        self.dict_of_names = {}
        # Next sequential number to assign
        self.current_number = 0

    def __call__(self, the_id):
        """Get a short model name from a unique ID

        :param the_id: Id of the model
        :type the_id: str (or anything that can be used as a key for a dict)
        """
        # Assign a new sequential name the first time an ID is seen;
        # afterwards, always return the same name.
        if the_id not in self.dict_of_names:
            self.dict_of_names[the_id] = f'{self.prefix}_{self.current_number:06d}'
            self.current_number += 1
        return self.dict_of_names[the_id]

def generate_unique_ids(list_of_ids):
    """If the list contains duplicates, new names are generated so
    that every entry obtains a unique ID.

    :param list_of_ids: list of ids
    :type list_of_ids: list[str]

    :return: a dict that maps each unique name to the original name
    """
    # Count how many times each ID appears.
    occurrences = {}
    for the_id in list_of_ids:
        occurrences[the_id] = occurrences.get(the_id, 0) + 1

    mapping = {}
    for name, how_many in occurrences.items():
        if how_many == 1:
            # Already unique: keep as is.
            mapping[name] = name
            continue
        # Duplicated: suffix each occurrence with its index.
        for index in range(how_many):
            mapping[f'{name}_{index}'] = name
    return mapping

def unique_product(*iterables, max_memory_mb=1024):
    """Generate the Cartesian product of multiple iterables, keeping
    only the unique entries.  Raises a MemoryError if memory usage
    exceeds the specified threshold.

    :param iterables: Variable number of iterables to compute the
        Cartesian product from.
    :type iterables: Iterable

    :param max_memory_mb: Maximum memory usage in megabytes (default: 1024MB).
    :type max_memory_mb: int

    :return: Yields unique entries from the Cartesian product.
    :rtype: Iterator[tuple]

    """

    MB_TO_BYTES = 1024 * 1024
    max_memory_bytes = max_memory_mb * MB_TO_BYTES  # Convert MB to bytes
    seen = set()  # Set to store seen entries
    total_memory = 0  # Track memory usage

    for items in product(*iterables):
        if items not in seen:
            # BUG FIX: the entry must be recorded, otherwise duplicates
            # were yielded again despite the docstring's promise.
            seen.add(items)
            # Account only for entries actually kept in the seen set.
            item_size = sum(sys.getsizeof(item) for item in items)
            total_memory += item_size
            if total_memory > max_memory_bytes:
                raise MemoryError(
                    f'Memory usage exceeded the specified threshold: '
                    f'{total_memory/MB_TO_BYTES:.1f} MB > '
                    f'{max_memory_bytes/MB_TO_BYTES} MB.'
                )
            yield items

def format_timedelta(td: timedelta) -> str:
    """Format a timedelta in a "human-readable" way

    :param td: the duration to format.

    :return: a string using the most significant unit: hours, minutes,
        seconds (with one decimal), milliseconds, or microseconds.
    """

    # Determine the total amount of seconds
    total_seconds = int(td.total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    # Get the total microseconds remaining
    microseconds = td.microseconds

    # Format based on the most significant unit
    if hours > 0:
        return f'{hours}h {minutes}m {seconds}s'
    if minutes > 0:
        # BUG FIX: referenced the undefined name 'second', which raised
        # NameError for any duration between one minute and one hour.
        return f'{minutes}m {seconds}s'
    if seconds > 0:
        # One decimal of the fractional second (tenths).
        return f'{seconds}.{microseconds // 100000:01}s'
    if microseconds >= 1000:
        return f'{microseconds // 1000}ms'  # Convert to milliseconds

    return f'{microseconds}μs'  # Microseconds as is
