"""Combine several arithmetic expressions and a database to obtain formulas
:author: Michel Bierlaire
:date: Sat Jul 30 12:36:40 2022
"""
from __future__ import annotations
import logging
from typing import Generic, Iterable, NamedTuple, TYPE_CHECKING, TypeVar
import numpy as np
from biogeme.exceptions import BiogemeError
if TYPE_CHECKING:
from biogeme.database import Database
from biogeme.expressions import (
Expression,
TypeOfElementaryExpression,
Beta,
Variable,
RandomVariable,
Draws,
list_of_fixed_betas_in_expression,
list_of_free_betas_in_expression,
list_of_random_variables_in_expression,
list_of_draws_in_expression,
)
# Type variable for the elementary expressions stored in an ElementsTuple.
# NOTE(review): the bound 'Elementary' is a forward reference that is not
# imported in the visible part of this module -- confirm where it is defined.
T = TypeVar('T', bound='Elementary')

try:

    class ElementsTuple(Generic[T], NamedTuple):
        """Data structure for elementary expressions."""

        # Elementary expressions, keyed by name.
        expressions: dict[str, T] | None
        # Index assigned to each name.
        indices: dict[str, int] | None
        # Names of the elementary expressions.
        names: list[str]

except TypeError:
    # Python 3.10 raises TypeError when a NamedTuple class is combined with
    # another base class such as Generic. In that case, the named tuple is
    # defined first, and the generic behavior is added by a subclass.
    class _ElementsTuple(NamedTuple):
        """Data structure for elementary expressions."""

        # Elementary expressions, keyed by name.
        expressions: dict[str, T] | None
        # Index assigned to each name.
        indices: dict[str, int] | None
        # Names of the elementary expressions.
        names: list[str]

    class ElementsTuple(Generic[T], _ElementsTuple):
        """Generic version of the data structure for elementary expressions."""


logger = logging.getLogger(__name__)
def expressions_names_indices(dict_of_elements: dict[str, type[T]]) -> ElementsTuple[T]:
    """Assign consecutive indices to expressions, following the sorted
    order of their names.

    :param dict_of_elements: dictionary of expressions. The keys
        are the names.

    :return: a tuple with the original dictionary (the same object, not a
        copy), the index associated with each name, and the sorted names.
    """
    names = sorted(dict_of_elements)
    # The index of a name is its position in the sorted list.
    indices = {name: position for position, name in enumerate(names)}
    return ElementsTuple(expressions=dict_of_elements, indices=indices, names=names)
class ExpressionRegistry:
    """Class managing the ids of the elementary expressions involved in a
    set of arithmetic expressions.

    The free betas, fixed betas, random variables and draws are collected
    from the expressions, and the variables from the columns of the
    database. Each list is calculated the first time it is requested, and
    stored. Within each category, the index of an element is its position in
    the corresponding list.
    """

    def __init__(
        self,
        expressions: Iterable[Expression],
        database: Database,
    ):
        """Ctor

        :param expressions: list of expressions
        :type expressions: list(biogeme.expressions.Expression)

        :param database: database with the variables as column names. If it
            is None, the list of variables is empty.
        :type database: biogeme.database.Database

        NOTE(review): the previous documentation mentioned that a
        BiogemeError is raised if an expression contains a variable and no
        database is provided. Nothing in this class raises it directly; it
        may come from the expressions themselves -- to be confirmed.
        """
        self._expressions: list[Expression] = list(expressions)
        self.database: Database = database

        # Everything below is calculated when first requested.
        self._free_betas: list[Beta] | None = None
        self._fixed_betas: list[Beta] | None = None
        self._random_variables: list[RandomVariable] | None = None
        self._draws: list[Draws] | None = None
        self._variables: list[Variable] | None = None
        self._bounds: list[tuple[float, float]] | None = None
        self._require_draws: bool | None = None
        self._free_betas_indices: dict[str, int] | None = None
        self._fixed_betas_indices: dict[str, int] | None = None
        self._variables_indices: dict[str, int] | None = None
        self._random_variables_indices: dict[str, int] | None = None
        self._draws_indices: dict[str, int] | None = None

        self.broadcast()

    @staticmethod
    def _unique_by_name(elements: Iterable) -> list:
        """Remove the duplicates from a sequence of named elements.

        :param elements: objects with a ``name`` attribute.

        :return: list with one element per name. The names appear in the
            order of their first occurrence. If several elements share the
            same name, the last one is kept.
        """
        return list({element.name: element for element in elements}.values())

    @property
    def bounds(self) -> list[tuple[float, float]]:
        """Lower and upper bounds of the free betas, in the order of
        ``free_betas``.
        """
        if self._bounds is None:
            # Go through the property, and not the private attribute, as the
            # latter is still None if the free betas have not been requested
            # yet (e.g. when the registry contains no expression).
            self._bounds = [
                (beta.lower_bound, beta.upper_bound) for beta in self.free_betas
            ]
        return self._bounds

    @property
    def expressions(self) -> list[Expression]:
        """Expressions managed by the registry."""
        return self._expressions

    @property
    def requires_draws(self) -> bool:
        """True if at least one expression requires draws."""
        if self._require_draws is None:
            self._require_draws = any(
                expression.requires_draws() for expression in self.expressions
            )
        return self._require_draws

    @property
    def free_betas_names(self) -> list[str]:
        """Names of the free betas, in the order of ``free_betas``."""
        return [beta.name for beta in self.free_betas]

    @property
    def draws_names(self) -> list[str]:
        """Names of the draws, in the order of ``draws``."""
        return [draw.name for draw in self.draws]

    @property
    def free_betas(self) -> list[Beta]:
        """Free betas involved in the expressions, without duplicated names."""
        if self._free_betas is None:
            self._free_betas = self._unique_by_name(
                beta
                for expression in self.expressions
                for beta in list_of_free_betas_in_expression(expression)
            )
        return self._free_betas

    @property
    def free_betas_indices(self) -> dict[str, int]:
        """Index of each free beta, identified by its name."""
        if self._free_betas_indices is None:
            self._free_betas_indices = {
                beta.name: i for i, beta in enumerate(self.free_betas)
            }
        return self._free_betas_indices

    @property
    def fixed_betas(self) -> list[Beta]:
        """Fixed betas involved in the expressions, without duplicated names."""
        if self._fixed_betas is None:
            self._fixed_betas = self._unique_by_name(
                beta
                for expression in self.expressions
                for beta in list_of_fixed_betas_in_expression(expression)
            )
        return self._fixed_betas

    @property
    def fixed_betas_indices(self) -> dict[str, int]:
        """Index of each fixed beta, identified by its name."""
        if self._fixed_betas_indices is None:
            self._fixed_betas_indices = {
                beta.name: i for i, beta in enumerate(self.fixed_betas)
            }
        return self._fixed_betas_indices

    @property
    def random_variables(self) -> list[RandomVariable]:
        """Random variables involved in the expressions, without duplicated
        names.
        """
        if self._random_variables is None:
            self._random_variables = self._unique_by_name(
                rv
                for expression in self.expressions
                for rv in list_of_random_variables_in_expression(expression)
            )
        return self._random_variables

    @property
    def random_variables_indices(self) -> dict[str, int]:
        """Index of each random variable, identified by its name."""
        if self._random_variables_indices is None:
            self._random_variables_indices = {
                rv.name: i for i, rv in enumerate(self.random_variables)
            }
        return self._random_variables_indices

    @property
    def draws(self) -> list[Draws]:
        """Draws involved in the expressions, without duplicated names."""
        if self._draws is None:
            self._draws = self._unique_by_name(
                draw
                for expression in self.expressions
                for draw in list_of_draws_in_expression(expression)
            )
        return self._draws

    @property
    def draws_indices(self) -> dict[str, int]:
        """Index of each draw, identified by its name."""
        if self._draws_indices is None:
            self._draws_indices = {draw.name: i for i, draw in enumerate(self.draws)}
        return self._draws_indices

    @property
    def variables(self) -> list[Variable]:
        """One variable per column of the database, in the order of the
        columns. The list is empty if there is no database.

        Note that the variables are obtained from the database, and not
        from the expressions.
        """
        if self._variables is None:
            self._variables = (
                []
                if self.database is None
                else [
                    Variable(name) for name in self.database.dataframe.columns.to_list()
                ]
            )
        return self._variables

    @property
    def variables_indices(self) -> dict[str, int]:
        """Index of each variable, identified by its name."""
        if self._variables_indices is None:
            self._variables_indices = {
                variable.name: i for i, variable in enumerate(self.variables)
            }
        return self._variables_indices

    @property
    def number_of_free_betas(self) -> int:
        """Number of free betas."""
        return len(self.free_betas)

    @property
    def number_of_random_variables(self) -> int:
        """Number of random variables."""
        return len(self.random_variables)

    @property
    def free_betas_init_values(self) -> dict[str, float]:
        """
        Retrieve the initial values of all free beta parameters.

        :return: A dictionary mapping parameter names to their initial values.
        """
        return {
            expression.name: float(expression.init_value)
            for expression in self.free_betas
        }

    def get_betas_array(self, betas_dict: dict[str, float]) -> np.ndarray:
        """Organize the values of the free betas into an array.

        :param betas_dict: value of each free beta, identified by its name.
            Entries that do not correspond to a free beta are ignored.

        :return: array of values, in the order of ``free_betas``.

        :raises BiogemeError: if the value of a free beta is missing.
        """
        try:
            return np.array(
                [float(betas_dict[beta.name]) for beta in self.free_betas]
            )
        except KeyError as e:
            betas_input = set(betas_dict)
            betas_names = {beta.name for beta in self.free_betas}
            # The KeyError is triggered by a free beta without a value. The
            # unknown names are also reported, as they help to identify a
            # parameter that has been misspelled.
            missing_betas = betas_names - betas_input
            unknown_betas = betas_input - betas_names
            err_msg = (
                f'Missing parameters: {missing_betas}. '
                f'Unknown parameters: {unknown_betas}. '
                f'List of known parameters: {betas_names}'
            )
            raise BiogemeError(err_msg) from e

    def get_complete_betas_array(self, betas_dict: dict[str, float]) -> np.ndarray:
        """Organize the values of the free betas into an array, using the
        initial value for each beta that is not provided.

        :param betas_dict: user provided values, identified by the names of
            the betas.

        :return: array of values, in the order of ``free_betas``.
        """
        complete_dict = self.complete_dict_of_free_beta_values(the_betas=betas_dict)
        return np.array([float(complete_dict[beta.name]) for beta in self.free_betas])

    def get_named_betas_values(self, values: np.ndarray) -> dict[str, float]:
        """Associate each value with the name of the corresponding free beta.

        :param values: values of the free betas, in the order of
            ``free_betas``.

        :return: A dictionary mapping parameter names to their values.
        """
        return {beta.name: float(values[i]) for i, beta in enumerate(self.free_betas)}

    @property
    def list_of_free_betas_init_values(self) -> list[float]:
        """
        Retrieve the initial values of all free beta parameters as a list.

        :return: A list with parameter initial values.
        """
        return list(self.free_betas_init_values.values())

    def complete_dict_of_free_beta_values(
        self, the_betas: dict[str, float]
    ) -> dict[str, float]:
        """Build a dictionary of values for the beta parameters, some of them
        provided by the user. The others are the initial values.

        :param the_betas: user provided values. The dictionary is not
            modified.

        :return: A new dictionary with the user provided values, completed
            with the initial value of each free beta that was not provided.
        """
        # Work on a copy, so that the dictionary of the caller is untouched.
        completed = dict(the_betas)
        for beta in self.free_betas:
            completed.setdefault(beta.name, beta.init_value)
        return completed

    @property
    def fixed_betas_values(self) -> dict[str, float]:
        """
        Retrieve the values of all fixed beta parameters.

        :return: A dictionary mapping parameter names to their fixed values.
        """
        return {
            expression.name: expression.init_value for expression in self.fixed_betas
        }

    def draw_types(self) -> dict[str, str]:
        """Retrieve the type of draw for each draw expression"""
        return {expression.name: expression.draw_type for expression in self.draws}

    def broadcast(self) -> None:
        """Broadcast the ids to the expressions"""
        for expression in self.expressions:
            for name, index in self.free_betas_indices.items():
                expression.set_specific_id(
                    name, index, TypeOfElementaryExpression.FREE_BETA
                )
            for name, index in self.fixed_betas_indices.items():
                expression.set_specific_id(
                    name, index, TypeOfElementaryExpression.FIXED_BETA
                )
            for name, index in self.variables_indices.items():
                expression.set_specific_id(
                    name, index, TypeOfElementaryExpression.VARIABLE
                )
            for name, index in self.draws_indices.items():
                expression.set_specific_id(
                    name, index, TypeOfElementaryExpression.DRAWS
                )
            for name, index in self.random_variables_indices.items():
                expression.set_specific_id(
                    name, index, TypeOfElementaryExpression.RANDOM_VARIABLE
                )