Forecast Models - Attributes¶
ForecastModelAttributes(perfdb)
¶
Class used for handling forecast model attributes. Can be accessed via perfdb.forecasts.models.attributes.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the handler by storing the top-level PerfDB object.

    This constructor belongs to the base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # kept as a private attribute so the handler can reach the shared PerfDB object later
    self._perfdb: e_pg.PerfDB = perfdb
get(forecast_models=None, attribute_names=None, filter_type='and', output_type='dict', values_only=False)
¶
Method to get the attributes of the given forecast models.
The most useful keys/columns returned are:
- attribute_name
- attribute_value
- data_type_name
Parameters:
-
(forecast_models¶list[str] | None, default:None) –Names of the forecast models to filter the results. By default None
-
(attribute_names¶list[str] | None, default:None) –List of attribute names to filter the results. If set to None will get all. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"
-
(values_only¶bool, default:False) –If set to True, will only return the values of the attributes, skipping display_name, id, etc.
Returns:
-
dict[str, dict[str, dict[str, Any | dict[str, Any]]]]–In case output_type is "dict", returns a dictionary in the format {forecast_model_name: {attribute_name: {attribute: value, ...}, ...}, ...} If values_only is True, the innermost dictionary will be {attribute_name: value, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[forecast_model_name, attribute_name], columns = [attribute, ...] If values_only is True, the columns will be ["attribute_value"]
Source code in echo_postgres/forecast_model_attributes.py
@validate_call
def get(
    self,
    forecast_models: list[str] | None = None,
    attribute_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
    values_only: bool = False,
) -> dict[str, dict[str, dict[str, Any | dict[str, Any]]]] | DataFrame:
    """Get the attributes of forecast models.

    Rows are read from the ``performance.v_forecast_model_attributes`` view, ordered by
    forecast model name and attribute name.

    The most useful keys/columns returned are:

    - attribute_name
    - attribute_value
    - data_type_name

    Parameters
    ----------
    forecast_models : list[str] | None, optional
        Names of the forecast models to filter the results. None or an empty list applies no filter.
        By default None
    attribute_names : list[str] | None, optional
        Attribute names to filter the results. None or an empty list gets all attributes.
        By default None
    filter_type : Literal["and", "or"], optional
        How the two filters are combined when both are given. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "dict"
    values_only : bool, optional
        If set to True, only the attribute values are returned, skipping the remaining columns
        (ids, data type, modified date).
        By default False

    Returns
    -------
    dict[str, dict[str, dict[str, Any | dict[str, Any]]]]
        In case output_type is "dict", a dictionary in the format
        {forecast_model_name: {attribute_name: {attribute: value, ...}, ...}, ...}, where the
        innermost keys are attribute_id, attribute_value, data_type_id, data_type_name and modified_date.
        If values_only is True, the format is {forecast_model_name: {attribute_name: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", a DataFrame with
        index = MultiIndex[forecast_model_name, attribute_name] and columns = [attribute, ...].
        If values_only is True, the columns will be ["attribute_value"]
    """
    # one (template, requested values) pair per filterable column; falsy values mean "no filter"
    candidate_filters = (
        (sql.SQL("forecast_model_name IN ({names})"), forecast_models),
        (sql.SQL("attribute_name IN ({names})"), attribute_names),
    )
    conditions = [
        template.format(names=sql.SQL(", ").join(sql.Literal(name) for name in names))
        for template, names in candidate_filters
        if names
    ]

    # SELECT part: a reduced column list is enough when only the values are wanted
    if values_only:
        selected = sql.SQL("forecast_model_name, attribute_name, attribute_value, data_type_name")
    else:
        selected = sql.SQL("*")
    parts = [
        sql.SQL("SELECT {values} FROM performance.v_forecast_model_attributes").format(values=selected),
    ]

    # WHERE part: only emitted when at least one filter is active
    if conditions:
        parts.append(sql.SQL(" WHERE "))
        parts.append(sql.SQL(f" {filter_type.upper()} ").join(conditions))

    parts.append(sql.SQL(" ORDER BY forecast_model_name, attribute_name"))
    query = sql.Composed(parts)

    # running the query
    with self._perfdb.conn.reconnect() as conn:
        # NOTE(review): post_convert="pyarrow" presumably keeps JSON attribute values from being
        # coerced to plain strings - confirm against read_to_pandas
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # converting attribute values to their proper types, then indexing by (model, attribute)
    df = cast_attributes(df=df, index_cols=["forecast_model_name"])
    df = df.set_index(["forecast_model_name", "attribute_name"])

    # DataFrame output (anything that is not "dict") is returned right away
    if output_type != "dict":
        if output_type == "DataFrame" and values_only:
            return df[["attribute_value"]].copy()
        return df

    # dict output: first a flat mapping keyed by (forecast_model_name, attribute_name)
    if values_only:
        df = df["attribute_value"]
        flat = df.to_dict()
    else:
        wanted_columns = ["attribute_id", "attribute_value", "data_type_id", "data_type_name", "modified_date"]
        flat = df[wanted_columns].to_dict(orient="index")

    # regrouping to {forecast_model_name: {attribute_name: value, ...}, ...}
    nested = {}
    for (model_name, attr_name), attr_data in flat.items():
        nested.setdefault(model_name, {})[attr_name] = attr_data
    return nested