Forecast Features - Values¶
ForecastFeatureValues(perfdb)
¶
Class used for handling forecast feature values. Can be accessed via perfdb.forecasts.features.values.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class constructor shared by all perfdb accessor subclasses.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # keep a reference back to the root object so this accessor can reach
    # the shared connection handler and sibling accessors
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, object_names=None, features=None)
¶
Deletes forecast feature values for the given period, objects and features.
Parameters:
- period (DateTimeRange) – Desired period. It considers the date when the forecast was created.
- object_names (list[str] | None, default: None) – Name of the objects to delete the forecasts for. If set to None, will delete all. By default None.
- features (dict[str, list[str]] | list[str] | None, default: None) – If it is a dict, must be in the format {forecast_model: [feature1, feature2, ...]}. If it is a list, it must be in the format [feature1, feature2, ...]. If set to None, will delete all features. By default None.
Source code in echo_postgres/forecast_feature_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    features: dict[str, list[str]] | list[str] | None = None,
) -> None:
    """Deletes forecast feature values for the given period, objects and features.

    Parameters
    ----------
    period : DateTimeRange
        Desired period. It considers the date when the forecast was created.
    object_names : list[str] | None, optional
        Name of the objects to delete the forecasts for. If set to None will delete all. By default None
    features : dict[str, list[str]] | list[str] | None, optional
        If it is a dict, must be in the format {forecast_model: [feature1, feature2, ...]}.
        If it is a list, it must be in the format [feature1, feature2, ...]
        If set to None, will delete all features.
        By default None

    Raises
    ------
    ValueError
        If any requested object name, forecast model or feature name is not found in the database.
    """
    # base statement; period bounds are rendered as SQL literals
    query = [
        sql.SQL(
            "DELETE FROM performance.forecast_feature_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    where = []
    if object_names:
        # translating object names to ids, failing fast on unknown names
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            wrong_objs = set(object_names) - set(obj_ids.keys())
            raise ValueError(f"object_name {wrong_objs} not found in the database")
        where.append(sql.SQL(" object_id IN ({object_ids}) ").format(object_ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if features:
        desired_feature_ids = []
        if isinstance(features, dict):
            # validating the model names first
            model_ids = self._perfdb.forecasts.models.get_ids(forecast_models=list(features.keys()))
            if len(model_ids) != len(features):
                wrong_models = set(features.keys()) - set(model_ids.keys())
                raise ValueError(f"forecast_model {wrong_models} not found in the database")
            # getting the feature ids per model
            feature_ids = self._perfdb.forecasts.features.definitions.get_ids(forecast_models=list(features.keys()))
            # creating list of desired feature_ids
            for model, model_features in features.items():
                for feature in model_features:
                    if feature not in feature_ids[model]:
                        raise ValueError(f"feature_name {feature} of model {model} not found in the database")
                    desired_feature_ids.append(feature_ids[model][feature])
        else:
            # getting the feature ids across all models that define these feature names
            feature_ids = self._perfdb.forecasts.features.definitions.get_ids(feature_names=features)
            # creating list of desired feature_ids
            for feature in features:
                matches = [model_features[feature] for model_features in feature_ids.values() if feature in model_features]
                # BUGFIX: unknown feature names used to be silently ignored here, unlike the
                # dict branch; with none matching this also produced an invalid "IN ()" clause
                if not matches:
                    raise ValueError(f"feature_name {feature} not found in the database")
                desired_feature_ids.extend(matches)
        where.append(
            sql.SQL(" feature_id IN ({feature_ids}) ").format(
                feature_ids=sql.SQL(", ").join(map(sql.Literal, desired_feature_ids)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND "))
        query.append(sql.SQL(" AND ").join(where))
    query = sql.Composed(query)
    # deleting from the database
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query)
        logger.debug(f"Deleted {result.rowcount} rows from performance.forecast_feature_values")
get(period, object_names=None, features=None, order_by=None, limit=None, output_type='DataFrame')
¶
Gets forecast feature values for the given period and object names.
Parameters:
- period (DateTimeRange) – Desired period. It considers the date when the forecast was created.
- object_names (list[str] | None, default: None) – Name of the objects to get the forecasts for. If set to None, will get all. By default None.
- features (dict[str, list[str]] | list[str] | None, default: None) – Can be one of:
  - A dict: must be in the format {forecast_model: [feature1, feature2, ...]}.
  - A list: must be in the format [feature1, feature2, ...].
  - None: will get all features.
  By default None.
- order_by (list[str] | None, default: None) – List of columns to order the results. If set to None, will default to ["timestamp ASC", "object_name ASC", "forecast_model_name ASC", "feature_name ASC"]. By default None.
- limit (int | None, default: None) – Limit of entries to get. By default None.
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".
Returns:
- DataFrame – If output_type is "DataFrame", returns a DataFrame with the forecast feature values in the format index=MultiIndex[forecast_timestamp, timestamp] and columns=MultiIndex[object, model, feature].
- dict[str, dict[str, dict[str, dict[Timestamp, dict[Timestamp, float]]]]] – If output_type is "dict", returns a dict with the forecast feature values in the format {object: {model: {feature: {forecast_timestamp: {timestamp: value, ...}, ...}, ...}, ...}.
Source code in echo_postgres/forecast_feature_values.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    features: dict[str, list[str]] | list[str] | None = None,
    order_by: list[str] | None = None,
    limit: int | None = None,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[str, dict[Timestamp, dict[Timestamp, float]]]]]:
    """Gets forecast feature values for the given period and object names.

    Parameters
    ----------
    period : DateTimeRange
        Desired period. It considers the date when the forecast was created.
    object_names : list[str] | None, optional
        Name of the objects to get the forecasts for. If set to None will get all. By default None
    features : dict[str, list[str]] | list[str] | None, optional
        Can be one of:
        - A dict: must be in the format {forecast_model: [feature1, feature2, ...]}.
        - A list: it must be in the format [feature1, feature2, ...]
        - None: will get all features.
        By default None
    order_by : list[str] | None, optional
        List of columns to order the results.
        If set to None will default to ["timestamp ASC", "object_name ASC", "forecast_model_name ASC", "feature_name ASC"]
        By default None
    limit : int | None, optional
        Limit of entries to get, by default None
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        If output_type is "DataFrame", returns a DataFrame with the forecast feature values in the format index=MultiIndex[forecast_timestamp, timestamp] and columns=MultiIndex[object, model, feature]
    dict[str, dict[str, dict[str, dict[Timestamp, dict[Timestamp, float]]]]]
        If output_type is "dict", returns a dict with the forecast feature values in the format {object: {model: {feature: {forecast_timestamp: {timestamp: value, ...}, ...}, ...}, ...}, ...}
    """
    # defining the query; period bounds are rendered as SQL literals
    query = [
        sql.SQL(
            "SELECT object_name, forecast_model_name, feature_name, timestamp, value FROM v_forecast_feature_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    where = []
    if object_names:
        where.append(
            sql.SQL(" object_name IN ({object_names}) ").format(object_names=sql.SQL(", ").join(map(sql.Literal, object_names))),
        )
    if features:
        if isinstance(features, dict):
            # one "(model = X AND feature IN (...))" clause per model, OR-ed together
            feature_where = [
                sql.SQL(
                    " (forecast_model_name = {model} AND feature_name IN ({model_features})) ",
                ).format(
                    model=sql.Literal(model),
                    model_features=sql.SQL(", ").join(
                        map(sql.Literal, model_features),
                    ),
                )
                for model, model_features in features.items()
            ]
            feature_where = sql.SQL(" OR ").join(feature_where)
            # wrapping in parentheses so the OR group AND-combines correctly with other filters
            feature_where = [sql.SQL("("), feature_where, sql.SQL(")")]
            feature_where = sql.Composed(feature_where)
            where.append(feature_where)
        else:
            # plain list: match feature names across every model
            where.append(sql.SQL(" feature_name IN ({features}) ").format(features=sql.SQL(", ").join(map(sql.Literal, features))))
    if where:
        query.append(sql.SQL(" AND "))
        query.append(sql.SQL(" AND ").join(where))
    if not order_by:
        order_by = ["timestamp ASC", "object_name ASC", "forecast_model_name ASC", "feature_name ASC"]
    # NOTE(review): order_by entries are interpolated as raw SQL (sql.SQL), not as literals —
    # only trusted column expressions should ever be passed here
    query.append(sql.SQL(" ORDER BY {order_by}").format(order_by=sql.SQL(", ").join(map(sql.SQL, order_by))))
    if limit:
        query.append(sql.SQL(" LIMIT {limit}").format(limit=sql.Literal(limit)))
    query = sql.Composed(query)
    # getting the data
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # converting timestamp to forecast_timestamp: the row-level "timestamp" is the forecast
    # creation time; per-horizon timestamps live inside the "value" payload
    df = df.rename(columns={"timestamp": "forecast_timestamp"})
    # getting series with exploded values
    # NOTE(review): assumes each "value" cell behaves like a mapping whose .items() yields
    # (timestamp-string, value) pairs (JSONB) — confirm against v_forecast_feature_values
    values = df["value"].apply(lambda x: np.array(x.items())).explode()
    values.name = "value"
    # merging with df (right-merge keeps one row per exploded pair, repeating df rows)
    exploded_df = df.drop(columns=["value"]).merge(right=values, how="right", left_index=True, right_index=True)
    # converting value column that has an array of [timestamp, value] to two columns
    exploded_df["timestamp"] = exploded_df["value"].apply(lambda x: x[0])
    exploded_df["value"] = exploded_df["value"].apply(func=lambda x: x[1])
    # converting timestamp to datetime
    exploded_df["timestamp"] = to_datetime(
        exploded_df["timestamp"].astype("string[pyarrow]"),
        format="%Y-%m-%d %H:%M:%S",
    ).astype("datetime64[s]")
    # converting value to float
    exploded_df["value"] = exploded_df["value"].astype("float64[pyarrow]")
    # renaming columns
    exploded_df = exploded_df.rename(columns={"object_name": "object", "forecast_model_name": "model", "feature_name": "feature"})
    # converting to MultiIndex rows, then pivoting (object, model, feature) into columns
    exploded_df = exploded_df.set_index(["forecast_timestamp", "timestamp"])
    exploded_df = exploded_df.pivot(columns=["object", "model", "feature"], values="value")
    if output_type == "DataFrame":
        return exploded_df
    # converting to dict: transpose so rows are keyed by (object, model, feature)
    result = exploded_df.T.to_dict(orient="index")
    # converting to dict of dicts, nesting one level per key component
    final_result = {}
    for (obj, model, feature), values in result.items():
        if obj not in final_result:
            final_result[obj] = {}
        if model not in final_result[obj]:
            final_result[obj][model] = {}
        if feature not in final_result[obj][model]:
            final_result[obj][model][feature] = {}
        for (forecast_timestamp, timestamp), value in values.items():
            if forecast_timestamp not in final_result[obj][model][feature]:
                final_result[obj][model][feature][forecast_timestamp] = {}
            final_result[obj][model][feature][forecast_timestamp][timestamp] = value
    return final_result
insert(df, forecast_model=None, object_name=None, skip_undefined=False, on_conflict='ignore')
¶
Inserts forecast feature values into the database for the given forecast model and object name.
Parameters:
- df (DataFrame) – DataFrame with the forecast feature values to be inserted. The index must be a MultiIndex with the levels forecast_timestamp and timestamp. The columns must be a MultiIndex with the levels object, model and feature. The only exception to this is if both forecast_model and object_name are not None; then the columns can be a single level with the feature names and the MultiIndex will be created inside the method using the forecast_model and object_name.
- forecast_model (str | None, default: None) – forecast_model to consider when the columns of df have a single level. This is only needed if df.columns has only one level with feature names. By default None.
- object_name (str | None, default: None) – object_name to consider when the columns of df have a single level. This is only needed if df.columns has only one level with feature names. By default None.
- skip_undefined (bool, default: False) – If True, will skip the undefined object, model and feature names, inserting only the existing ones. If False, will raise an error if there are undefined ones. By default False.
- on_conflict (Literal['ignore', 'update'], default: 'ignore') – What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Source code in echo_postgres/forecast_feature_values.py
@validate_call
def insert(
    self,
    df: DataFrame,
    forecast_model: str | None = None,
    object_name: str | None = None,
    skip_undefined: bool = False,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts forecast feature values into the database for the given forecast model and object name.

    Parameters
    ----------
    df : DataFrame
        DataFrame with the forecast feature values to be inserted.
        The index must be a MultiIndex with the levels forecast_timestamp and timestamp.
        The columns must be a MultiIndex with the levels object, model and feature. The only exception to this is if both forecast_model and object_name are not None, then the columns can be a single level with the feature names and the MultiIndex will be created inside the method using the forecast_model and object_name.
    forecast_model : str | None, optional
        forecast_model to consider when the columns of df have a single level. This is only needed if df.columns has only one level with feature names. By default None
    object_name : str | None, optional
        object_name to consider when the columns of df have a single level. This is only needed if df.columns has only one level with feature names. By default None
    skip_undefined : bool, optional
        If True, will skip the undefined object, model and feature names, inserting only the existing ones.
        If False will raise an error if there are undefined ones. By default False
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df's index/columns are malformed, or if skip_undefined is False and any object,
        model or feature is not defined in the database.
    """
    # checking if indexes are correct
    # BUGFIX: was "and", which let a MultiIndex with wrong level names pass validation
    if not isinstance(df.index, MultiIndex) or df.index.names != ["forecast_timestamp", "timestamp"]:
        raise ValueError(
            f"df index must be a MultiIndex with the levels forecast_timestamp and timestamp, got type {type(df.index)} and names {df.index.names if isinstance(df.index, MultiIndex) else None}",
        )
    if isinstance(df.columns, MultiIndex):
        if df.columns.names != ["object", "model", "feature"]:
            raise ValueError(
                f"df columns must be a MultiIndex with the levels object, model and feature, got names {df.columns.names}",
            )
    # BUGFIX: was "not ... and not ...", which accepted the case where only one of the two was set
    elif not forecast_model or not object_name:
        raise ValueError("forecast_model and object_name must be set if df.columns has only one level")
    # making a copy of the DataFrame so the caller's frame is never mutated
    df = df.copy()
    # creating the MultiIndex if needed
    if not isinstance(df.columns, MultiIndex):
        df.columns = MultiIndex.from_tuples(
            [(object_name, forecast_model, feature) for feature in df.columns],
            names=["object", "model", "feature"],
        )
    # converting the DataFrame to the format of the database
    # database expects columns feature_id, object_id, timestamp and value
    # value is a JSONB column in the format {"forecast_timestamp": value, ...} where forecast_timestamp is converted to a string in the format YYYY-MM-DD HH:MM:SS
    # first converting the inner timestamp level to a string
    df.index = df.index.set_levels(df.index.levels[1].astype("datetime64[s]").strftime("%Y-%m-%d %H:%M:%S"), level=1)
    # then we need to melt the columns into long format
    df = df.melt(ignore_index=False).reset_index(drop=False)
    # dropping rows with NaN values
    df = df.dropna(how="any")
    # changing value to float64 for compatibility
    df["value"] = df["value"].astype("float64")
    # then grouping by so we have timestamp and value as lists
    df = df.groupby(["object", "model", "feature", "forecast_timestamp"]).agg(list)
    # getting final value as a dict of {timestamp: value}
    df["final_value"] = df.apply(lambda row: dict(zip(row["timestamp"], row["value"], strict=False)), axis=1)
    # resetting index and renaming columns
    df = df.reset_index(drop=False)
    df = df.drop(columns=["timestamp", "value"])
    df = df.rename(columns={"forecast_timestamp": "timestamp", "final_value": "value"})
    # getting the object_ids
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=df["object"].unique().tolist())
    if len(obj_ids) != len(df["object"].unique()):
        wrong_objs = set(df["object"].unique()) - set(obj_ids.keys())
        if not skip_undefined:
            raise ValueError(f"object_name {wrong_objs} not found in the database")
        logger.warning(f"object_name {wrong_objs} not found in the database, skipping")
        df = df.loc[~df["object"].isin(wrong_objs)]
    # creating the object_id column
    df["object_id"] = df["object"].map(obj_ids).astype("int64[pyarrow]")
    # getting the feature ids
    feature_ids = self._perfdb.forecasts.features.definitions.get_ids(forecast_models=df["model"].unique().tolist())
    # creating the feature_id column
    df["feature_id"] = NA
    for model in df["model"].unique():
        # defaultdict maps unknown feature names to None so .map leaves them NA;
        # .get(model, {}) avoids a KeyError for a model absent from the database so the
        # skip_undefined handling below can deal with its features uniformly
        feat_ids = defaultdict(lambda: None, feature_ids.get(model, {}))
        # mapping the feature names to the feature ids
        df.loc[(df["model"] == model), "feature_id"] = df.loc[(df["model"] == model), "feature"].map(feat_ids)
    # checking if there are any feature_ids that are None
    if df["feature_id"].isna().any():
        wrong_feats = df.loc[df["feature_id"].isna(), "feature"].unique()
        if not skip_undefined:
            raise ValueError(f"feature_name {wrong_feats} not found in the database")
        logger.warning(f"feature_name {wrong_feats} not found in the database, skipping")
        df = df.loc[df["feature_id"].notna()]
    # converting feature_id to int64
    df["feature_id"] = df["feature_id"].astype("int64[pyarrow]")
    # getting only the desired columns, in table order
    df = df[["feature_id", "object_id", "timestamp", "value"]]
    # mapping the public on_conflict option to pandas_to_sql's if_exists argument
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="forecast_feature_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )