Forecast Models - Attributes¶
ForecastModelAttributes(perfdb)
¶
Class used for handling forecast models attributes. Can be accessed via perfdb.forecasts.models.attributes.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
get(forecast_models=None, attribute_names=None, filter_type='and', output_type='dict', values_only=False)
¶
Method to get the attributes of the given data source instance.
The most useful keys/columns returned are:
- attribute_name
- attribute_value
- data_type_name
Parameters:
-
(forecast_models¶list[str] | None, default:None) –Names of the forecast models to filter the results. By default None
-
(attribute_names¶list[str] | None, default:None) –List of attribute names to filter the results. If set to None will get all. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"
-
(values_only¶bool, default:False) –If set to True, will only return the values of the attributes, skipping display_name, id, etc.
Returns:
-
dict[str, dict[str, dict[str, Any | dict[str, Any]]]]–In case output_type is "dict", returns a dictionary in the format {forecast_model_name: {attribute_name: {attribute: value, ...}, ...}, ...} If values_only is True, the innermost dictionary will be {attribute_name: value, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[feature_name, attribute_name], columns = [attribute, ...] If values_only is True, the columns will be ["attribute_value"]
-
DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame
Source code in echo_postgres/forecast_model_attributes.py
@validate_call
def get(
self,
forecast_models: list[str] | None = None,
attribute_names: list[str] | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "dict",
values_only: bool = False,
) -> dict[str, dict[str, dict[str, Any | dict[str, Any]]]] | pd.DataFrame | pl.DataFrame:
"""Method to get the attributes of the given data source instance.
The most useful keys/columns returned are:
- attribute_name
- attribute_value
- data_type_name
Parameters
----------
forecast_models : list[str] | None, optional
Names of the forecast models to filter the results.
By default None
attribute_names : list[str] | None, optional
List of attribute names to filter the results. If set to None will get all. By default None
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and"
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "dict"
values_only : bool, optional
If set to True, will only return the values of the attributes, skipping display_name, id, etc.
Returns
-------
dict[str, dict[str, dict[str, Any | dict[str, Any]]]]
In case output_type is "dict", returns a dictionary in the format {forecast_model_name: {attribute_name: {attribute: value, ...}, ...}, ...}
If values_only is True, the innermost dictionary will be {attribute_name: value, ...}
pd.DataFrame
In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[feature_name, attribute_name], columns = [attribute, ...]
If values_only is True, the columns will be ["attribute_value"]
pl.DataFrame
In case output_type is "pl.DataFrame", returns a Polars DataFrame
"""
# building the WHERE clause
where = []
if forecast_models:
where.append(
sql.SQL("forecast_model_name IN ({names})").format(names=sql.SQL(", ").join(map(sql.Literal, forecast_models))),
)
if attribute_names:
where.append(
sql.SQL("attribute_name IN ({names})").format(names=sql.SQL(", ").join(map(sql.Literal, attribute_names))),
)
if where:
where = sql.SQL(f" {filter_type.upper()} ").join(where)
# building the query
query = [
sql.SQL(
"SELECT {values} FROM performance.v_forecast_model_attributes",
).format(
values=sql.SQL(
"forecast_model_name, attribute_name, attribute_value::TEXT, data_type_name",
)
if values_only
else sql.SQL(
"forecast_model_id, forecast_model_name, attribute_id, attribute_name, attribute_value::TEXT, data_type_id, data_type_name, modified_date",
),
),
]
if where:
query.append(sql.SQL(" WHERE "))
query.append(where)
query.append(sql.SQL(" ORDER BY forecast_model_name, attribute_name"))
query = sql.Composed(query)
# executing the query
df = self._perfdb.conn.read_to_polars(
query,
schema_overrides=self._cols_schema,
)
# casting the attribute values
df = cast_attributes(df=df, index_cols=["forecast_model_name"])
return convert_output(
df,
output_type,
index_col=["forecast_model_name", "attribute_name"],
nest_by_index=True,
values_only_key="attribute_value" if values_only else None,
)