Forecast Features - Values

ForecastFeatureValues(perfdb)

Class used for handling forecast feature values. Can be accessed via perfdb.forecasts.features.values.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb
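
A minimal access sketch (assuming an already constructed PerfDB instance; the constructor arguments are omitted since they are not shown on this page):

import echo_postgres as e_pg

perfdb = e_pg.PerfDB()  # hypothetical: real constructor arguments depend on your connection settings
values = perfdb.forecasts.features.values  # the ForecastFeatureValues handler documented here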

delete(period, object_names=None, features=None)

Deletes forecast feature values for the given period, objects and features.

Parameters:

  • period

    (DateTimeRange) –

    Desired period. It considers the date when the forecast was created.

  • object_names

    (list[str] | None, default: None ) –

    Names of the objects to delete forecasts for. If set to None, deletes for all objects. By default None

  • features

    (dict[str, list[str]] | list[str] | None, default: None ) –

    If a dict, it must be in the format {forecast_model: [feature1, feature2, ...]}. If a list, it must be in the format [feature1, feature2, ...]. If set to None, deletes all features. By default None
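
A usage sketch (hypothetical object, model and feature names; the DateTimeRange two-argument constructor is an assumption, chosen to match the period.start/period.end usage in the source below):

from datetime import datetime

period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 7))  # assumed constructor

# delete all forecast feature values for two objects in the period
perfdb.forecasts.features.values.delete(period, object_names=["WTG01", "WTG02"])

# delete only selected features of a specific forecast model
perfdb.forecasts.features.values.delete(
    period,
    features={"model_a": ["wind_speed", "air_density"]},
)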

Source code in echo_postgres/forecast_feature_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    features: dict[str, list[str]] | list[str] | None = None,
) -> None:
    """Deletes forecast feature values for the given period, objects and features.

    Parameters
    ----------
    period : DateTimeRange
        Desired period. It considers the date when the forecast was created.
    object_names : list[str] | None, optional
        Names of the objects to delete forecasts for. If set to None, deletes for all objects. By default None
    features : dict[str, list[str]] | list[str] | None, optional
        If a dict, it must be in the format {forecast_model: [feature1, feature2, ...]}.
        If a list, it must be in the format [feature1, feature2, ...].
        If set to None, deletes all features.
        By default None
    """
    # defining the query
    query = [
        sql.SQL(
            "DELETE FROM performance.forecast_feature_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_names:
        # getting the object_ids
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            wrong_objs = set(object_names) - set(obj_ids.keys())
            raise ValueError(f"object_name {wrong_objs} not found in the database")
        where.append(sql.SQL(" object_id IN ({object_ids}) ").format(object_ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if features:
        desired_feature_ids = []
        if isinstance(features, dict):
            # getting the model ids
            model_ids = self._perfdb.forecasts.models.get_ids(forecast_models=list(features.keys()))
            if len(model_ids) != len(features):
                wrong_models = set(features.keys()) - set(model_ids.keys())
                raise ValueError(f"forecast_model {wrong_models} not found in the database")
            # getting the feature ids
            feature_ids = self._perfdb.forecasts.features.definitions.get_ids(forecast_models=list(features.keys()))
            # creating list of desired feature_ids

            for model, model_features in features.items():
                for feature in model_features:
                    if feature not in feature_ids[model]:
                        raise ValueError(f"feature_name {feature} of model {model} not found in the database")
                    desired_feature_ids.append(feature_ids[model][feature])
        else:
            # getting the feature ids
            feature_ids = self._perfdb.forecasts.features.definitions.get_ids(feature_names=features)
            # creating list of desired feature_ids
            for feature in features:
                desired_feature_ids.extend(
                    model_features[feature] for model_features in feature_ids.values() if feature in model_features
                )
        where.append(
            sql.SQL(" feature_id IN ({feature_ids}) ").format(
                feature_ids=sql.SQL(", ").join(map(sql.Literal, desired_feature_ids)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND "))
        query.append(sql.SQL(" AND ").join(where))

    query = sql.Composed(query)

    # deleting from the database
    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from performance.forecast_feature_values")

get(period, object_names=None, features=None, order_by=None, limit=None, output_type='DataFrame')

Gets forecast feature values for the given period and object names.

Parameters:

  • period

    (DateTimeRange) –

    Desired period. It considers the date when the forecast was created.

  • object_names

    (list[str] | None, default: None ) –

    Names of the objects to get forecasts for. If set to None, gets all objects. By default None

  • features

    (dict[str, list[str]] | list[str] | None, default: None ) –

    Can be one of:

    • A dict: must be in the format {forecast_model: [feature1, feature2, ...]}.
    • A list: must be in the format [feature1, feature2, ...].
    • None: will get all features.

    By default None

  • order_by

    (list[str] | None, default: None ) –

    List of columns by which to order the results. If set to None, defaults to ["timestamp ASC", "object_name ASC", "forecast_model_name ASC", "feature_name ASC"]. By default None

  • limit

    (int | None, default: None ) –

    Limit of entries to get, by default None

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    If output_type is "DataFrame", returns a DataFrame with the forecast feature values in the format index=MultiIndex[forecast_timestamp, timestamp] and columns=MultiIndex[object, model, feature]

  • dict[str, dict[str, dict[str, dict[Timestamp, dict[Timestamp, float]]]]]

    If output_type is "dict", returns a dict with the forecast feature values in the format {object: {model: {feature: {forecast_timestamp: {timestamp: value, ...}, ...}, ...}, ...}, ...}

Source code in echo_postgres/forecast_feature_values.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    features: dict[str, list[str]] | list[str] | None = None,
    order_by: list[str] | None = None,
    limit: int | None = None,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[str, dict[Timestamp, dict[Timestamp, float]]]]]:
    """Gets forecast feature values for the given period and object names.

    Parameters
    ----------
    period : DateTimeRange
        Desired period. It considers the date when the forecast was created.
    object_names : list[str] | None, optional
        Names of the objects to get forecasts for. If set to None, gets all objects. By default None
    features : dict[str, list[str]] | list[str] | None, optional
        Can be one of:

        - A dict: must be in the format {forecast_model: [feature1, feature2, ...]}.
        - A list: must be in the format [feature1, feature2, ...].
        - None: will get all features.

        By default None
    order_by : list[str] | None, optional
        List of columns by which to order the results.
        If set to None, defaults to ["timestamp ASC", "object_name ASC", "forecast_model_name ASC", "feature_name ASC"].
        By default None
    limit : int | None, optional
        Limit of entries to get. By default None
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        If output_type is "DataFrame", returns a DataFrame with the forecast feature values in the format index=MultiIndex[forecast_timestamp, timestamp] and columns=MultiIndex[object, model, feature]
    dict[str, dict[str, dict[str, dict[Timestamp, dict[Timestamp, float]]]]]
        If output_type is "dict", returns a dict with the forecast feature values in the format {object: {model: {feature: {forecast_timestamp: {timestamp: value, ...}, ...}, ...}, ...}, ...}
    """
    # defining the query
    query = [
        sql.SQL(
            "SELECT object_name, forecast_model_name, feature_name, timestamp, value FROM v_forecast_feature_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_names:
        where.append(
            sql.SQL(" object_name IN ({object_names}) ").format(object_names=sql.SQL(", ").join(map(sql.Literal, object_names))),
        )
    if features:
        if isinstance(features, dict):
            feature_where = [
                sql.SQL(
                    " (forecast_model_name = {model} AND feature_name IN ({model_features})) ",
                ).format(
                    model=sql.Literal(model),
                    model_features=sql.SQL(", ").join(
                        map(sql.Literal, model_features),
                    ),
                )
                for model, model_features in features.items()
            ]
            feature_where = sql.SQL(" OR ").join(feature_where)
            feature_where = [sql.SQL("("), feature_where, sql.SQL(")")]
            feature_where = sql.Composed(feature_where)
            where.append(feature_where)
        else:
            where.append(sql.SQL(" feature_name IN ({features}) ").format(features=sql.SQL(", ").join(map(sql.Literal, features))))
    if where:
        query.append(sql.SQL(" AND "))
        query.append(sql.SQL(" AND ").join(where))

    if not order_by:
        order_by = ["timestamp ASC", "object_name ASC", "forecast_model_name ASC", "feature_name ASC"]
    query.append(sql.SQL(" ORDER BY {order_by}").format(order_by=sql.SQL(", ").join(map(sql.SQL, order_by))))

    if limit:
        query.append(sql.SQL(" LIMIT {limit}").format(limit=sql.Literal(limit)))

    query = sql.Composed(query)

    # getting the data
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # converting timestamp to forecast_timestamp
    df = df.rename(columns={"timestamp": "forecast_timestamp"})

    # getting series with exploded values (list() is needed so numpy builds a 2-D array of [timestamp, value] pairs)
    values = df["value"].apply(lambda x: np.array(list(x.items()))).explode()
    values.name = "value"

    # merging with df
    exploded_df = df.drop(columns=["value"]).merge(right=values, how="right", left_index=True, right_index=True)

    # converting value column that has an array of [timestamp, value] to two columns
    exploded_df["timestamp"] = exploded_df["value"].apply(lambda x: x[0])
    exploded_df["value"] = exploded_df["value"].apply(func=lambda x: x[1])

    # converting timestamp to datetime
    exploded_df["timestamp"] = to_datetime(
        exploded_df["timestamp"].astype("string[pyarrow]"),
        format="%Y-%m-%d %H:%M:%S",
    ).astype("datetime64[s]")

    # converting value to float
    exploded_df["value"] = exploded_df["value"].astype("float64[pyarrow]")

    # renaming columns
    exploded_df = exploded_df.rename(columns={"object_name": "object", "forecast_model_name": "model", "feature_name": "feature"})

    # converting to MultiIndex
    exploded_df = exploded_df.set_index(["forecast_timestamp", "timestamp"])
    exploded_df = exploded_df.pivot(columns=["object", "model", "feature"], values="value")

    if output_type == "DataFrame":
        return exploded_df

    # converting to dict
    result = exploded_df.T.to_dict(orient="index")
    # converting to dict of dicts
    final_result = {}
    for (obj, model, feature), values in result.items():
        if obj not in final_result:
            final_result[obj] = {}
        if model not in final_result[obj]:
            final_result[obj][model] = {}
        if feature not in final_result[obj][model]:
            final_result[obj][model][feature] = {}
        for (forecast_timestamp, timestamp), value in values.items():
            if forecast_timestamp not in final_result[obj][model][feature]:
                final_result[obj][model][feature][forecast_timestamp] = {}
            final_result[obj][model][feature][forecast_timestamp][timestamp] = value

    return final_result

insert(df, forecast_model=None, object_name=None, skip_undefined=False, on_conflict='ignore')

Inserts forecast feature values into the database for the given forecast model and object name.

Parameters:

  • df

    (DataFrame) –

    DataFrame with the forecast feature values to be inserted.

    The index must be a MultiIndex with the levels forecast_timestamp and timestamp.

    The columns must be a MultiIndex with the levels object, model and feature. The only exception is when both forecast_model and object_name are not None; in that case the columns can be a single level with the feature names, and the MultiIndex will be built inside the method from forecast_model and object_name.

  • forecast_model

    (str | None, default: None ) –

    forecast_model to use when df.columns has a single level of feature names. By default None

  • object_name

    (str | None, default: None ) –

    object_name to use when df.columns has a single level of feature names. By default None

  • skip_undefined

    (bool, default: False ) –

    If True, skips undefined object, model and feature names, inserting only those that exist.

    If False, raises an error if any are undefined. By default False

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
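
A usage sketch of the single-level shortcut (hypothetical names; since df.columns has one level of feature names, forecast_model and object_name supply the missing column levels):

import pandas as pd

# two values forecast at the same creation time, for two target timestamps
index = pd.MultiIndex.from_arrays(
    [
        [pd.Timestamp("2024-01-01 00:00:00")] * 2,
        [pd.Timestamp("2024-01-01 01:00:00"), pd.Timestamp("2024-01-01 02:00:00")],
    ],
    names=["forecast_timestamp", "timestamp"],
)
df = pd.DataFrame({"wind_speed": [7.1, 6.8]}, index=index)

perfdb.forecasts.features.values.insert(
    df,
    forecast_model="model_a",
    object_name="WTG01",
    on_conflict="update",
)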

Source code in echo_postgres/forecast_feature_values.py
@validate_call
def insert(
    self,
    df: DataFrame,
    forecast_model: str | None = None,
    object_name: str | None = None,
    skip_undefined: bool = False,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts forecast feature values into the database for the given forecast model and object name.

    Parameters
    ----------
    df : DataFrame
        DataFrame with the forecast feature values to be inserted.

        The index must be a MultiIndex with the levels forecast_timestamp and timestamp.

        The columns must be a MultiIndex with the levels object, model and feature. The only exception is when both forecast_model and object_name are not None; in that case the columns can be a single level with the feature names, and the MultiIndex will be built inside the method from forecast_model and object_name.
    forecast_model : str | None, optional
        forecast_model to use when df.columns has a single level of feature names. By default None
    object_name : str | None, optional
        object_name to use when df.columns has a single level of feature names. By default None
    skip_undefined : bool, optional
        If True, skips undefined object, model and feature names, inserting only those that exist.

        If False, raises an error if any are undefined. By default False
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checking if indexes are correct
    if not isinstance(df.index, MultiIndex) or df.index.names != ["forecast_timestamp", "timestamp"]:
        raise ValueError(
            f"df index must be a MultiIndex with the levels forecast_timestamp and timestamp, got type {type(df.index)} and names {df.index.names if isinstance(df.index, MultiIndex) else None}",
        )
    if isinstance(df.columns, MultiIndex):
        if df.columns.names != ["object", "model", "feature"]:
            raise ValueError(
                f"df columns must be a MultiIndex with the levels object, model and feature, got names {df.columns.names}",
            )
    elif not forecast_model or not object_name:
        raise ValueError("forecast_model and object_name must be set if df.columns has only one level")

    # making a copy of the DataFrame
    df = df.copy()

    # creating the MultiIndex if needed
    if not isinstance(df.columns, MultiIndex):
        df.columns = MultiIndex.from_tuples(
            [(object_name, forecast_model, feature) for feature in df.columns],
            names=["object", "model", "feature"],
        )

    # converting the DataFrame to the format of the database
    # database expects columns feature_id, object_id, timestamp and value
    # value is a JSONB column in the format {"timestamp": value, ...} where each timestamp key is a string in the format YYYY-MM-DD HH:MM:SS; the table's timestamp column stores the forecast_timestamp
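    # hypothetical example of one row's stored value:
    #   {"2024-01-01 00:00:00": 7.1, "2024-01-01 01:00:00": 6.8}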

    # first converting timestamp to a string
    df.index = df.index.set_levels(df.index.levels[1].astype("datetime64[s]").strftime("%Y-%m-%d %H:%M:%S"), level=1)
    # then we need to melt the columns
    df = df.melt(ignore_index=False).reset_index(drop=False)
    # dropping rows with NaN values
    df = df.dropna(how="any")
    # changing value to float64 for compatibility
    df["value"] = df["value"].astype("float64")
    # then grouping by so we have timestamp and value as lists
    df = df.groupby(["object", "model", "feature", "forecast_timestamp"]).agg(list)
    # getting final value as a dict
    df["final_value"] = df.apply(lambda row: dict(zip(row["timestamp"], row["value"], strict=False)), axis=1)
    # resetting index and renaming columns
    df = df.reset_index(drop=False)
    df = df.drop(columns=["timestamp", "value"])
    df = df.rename(columns={"forecast_timestamp": "timestamp", "final_value": "value"})

    # getting the object_ids
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=df["object"].unique().tolist())
    if len(obj_ids) != len(df["object"].unique()):
        wrong_objs = set(df["object"].unique()) - set(obj_ids.keys())
        if not skip_undefined:
            raise ValueError(f"object_name {wrong_objs} not found in the database")
        logger.warning(f"object_name {wrong_objs} not found in the database, skipping")
        df = df.loc[~df["object"].isin(wrong_objs)]
    # creating the object_id column
    df["object_id"] = df["object"].map(obj_ids).astype("int64[pyarrow]")

    # getting the feature ids
    feature_ids = self._perfdb.forecasts.features.definitions.get_ids(forecast_models=df["model"].unique().tolist())

    # creating the feature_id column
    df["feature_id"] = NA

    for model in df["model"].unique():
        # creating default dict
        feat_ids = defaultdict(lambda: None, feature_ids[model])
        # mapping the feature names to the feature ids
        df.loc[(df["model"] == model), "feature_id"] = df.loc[(df["model"] == model), "feature"].map(feat_ids)

    # checking if there are any feature_ids that are None
    if df["feature_id"].isna().any():
        wrong_feats = df.loc[df["feature_id"].isna(), "feature"].unique()
        if not skip_undefined:
            raise ValueError(f"feature_name {wrong_feats} not found in the database")
        logger.warning(f"feature_name {wrong_feats} not found in the database, skipping")
        df = df.loc[df["feature_id"].notna()]
    # converting feature_id to int64
    df["feature_id"] = df["feature_id"].astype("int64[pyarrow]")

    # getting only the desired columns
    df = df[["feature_id", "object_id", "timestamp", "value"]]

    # inserting into the database
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="forecast_feature_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )