Skip to content

KPI Energy Losses - Targets

For more details on energy losses data and waterfall calculation see this dedicated page in the reference section.

KpiEnergyLossesTargets(perfdb)

Class used for handling energy losses KPI values. Can be accessed via perfdb.kpis.energy.losses.targets.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Stores a reference to the top-level ``PerfDB`` object so subclasses can
    reach the shared connection handler and sibling accessors via ``self._perfdb``.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # keep a handle on the root object; all DB access goes through it
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, object_names=None, energy_losses_types=None)

Deletes energy loss values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to delete the data for. By default None

  • energy_losses_types

    (list[str], default: None ) –

    List of energy loss types to delete the data for. By default None

Source code in echo_postgres/kpi_energy_losses_targets.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    energy_losses_types: list[str] | None = None,
) -> None:
    """Deletes energy loss values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. By default None
    energy_losses_types : list[str], optional
        List of energy loss types to delete the data for. By default None

    Raises
    ------
    ValueError
        If any of the given object names or energy loss types cannot be found.
    """
    # base query restricted to the requested period (inclusive bounds)
    query = [
        sql.SQL("DELETE FROM performance.energyloss_targets WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names:
        # resolve object names to ids, failing loudly on unknown names
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if energy_losses_types:
        # resolve energy loss type names to ids; validate explicitly so an
        # unknown type raises a clear ValueError instead of a bare KeyError
        elt_ids = self._perfdb.kpis.energy.losses.types.get_ids()
        if missing_types := set(energy_losses_types) - set(elt_ids):
            raise ValueError(f"Could not find the following energy loss types: {missing_types}")
        elt_ids = {elt: elt_ids[elt] for elt in energy_losses_types}
        query.append(sql.SQL(" AND energyloss_type_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, elt_ids.values()))))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from energyloss_targets table")

get(period, object_names=None, energy_losses_types=None, filter_type='and', output_type='DataFrame', values_only=False)

Gets energy losses values for the desired period and objects.

Please keep in mind that this should only be used to get targets for objects (not groups) and for specific days (not time aggregates). If you want targets for groups and/or time aggregates use baze.kpis.energy.losses.values which will also return the targets with the actual values.

The most useful keys/columns returned are:

  • value
  • modified_date

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to get the data for. By default None

  • energy_losses_types

    (list[str], default: None ) –

    List of energy loss types to get the data for. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"] By default "DataFrame"

  • values_only

    (bool, default: False ) –

    If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["object_name", "energyloss_type_name", "date"], columns = [energy, modified_date]

  • dict[str, dict[str, dict[Timestamp, dict[str, Any]]]] | dict[str, dict[str, dict[Timestamp, Any]]]

    In case output_type is "dict", returns a dictionary in the format {object_name: {energyloss_type_name: {date: {attribute: value, ...}, ...}, ...} In case values_only is True, the dictionary will be in the format {object_name: {energyloss_type_name: {date: value, ...}, ...}

Source code in echo_postgres/kpi_energy_losses_targets.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    energy_losses_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> DataFrame | dict[str, dict[str, dict[Timestamp, dict[str, Any]]]] | dict[str, dict[str, dict[Timestamp, Any]]]:
    """Gets energy losses values for the desired period and objects.

    Please keep in mind that this should only be used to get targets for objects (not groups) and for specific days (not time aggregates). If you want targets for groups and/or time aggregates use baze.kpis.energy.losses.values which will also return the targets with the actual values.

    The most useful keys/columns returned are:

    - value
    - modified_date

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    object_names : list[str], optional
        List of object names to get the data for. By default None
    energy_losses_types : list[str], optional
        List of energy loss types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"
    values_only : bool, optional
        If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["object_name", "energyloss_type_name", "date"], columns = [energy, modified_date]
    dict[str, dict[str, dict[Timestamp, dict[str, Any]]]] | dict[str, dict[str, dict[Timestamp, Any]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {energyloss_type_name: {date: {attribute: value, ...}, ...}, ...}
        In case values_only is True, the dictionary will be in the format {object_name: {energyloss_type_name: {date: value, ...}, ...}

    Raises
    ------
    ValueError
        If any of the given energy loss types is not defined in the database.
    """
    # getting loss type definitions to validate the requested types against
    loss_types_def = self._perfdb.kpis.energy.losses.types.get(output_type="dict")

    # checking if all energy losses types are valid
    if energy_losses_types is not None and (wrong_types := list(set(energy_losses_types) - set(loss_types_def.keys()))):
        raise ValueError(f"Invalid energy losses types: {wrong_types}")

    # base query restricted to the requested period (inclusive bounds);
    # NULL values are never returned
    query = [
        sql.SQL(
            "SELECT * FROM performance.v_energyloss_targets WHERE (date >= {start} AND date <= {end}) AND value IS NOT NULL ",
        ).format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    # optional filters, combined with filter_type ("and"/"or")
    where = []
    if object_names:
        where.append(
            sql.SQL("object_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_names)),
            ),
        )
    if energy_losses_types:
        where.append(
            sql.SQL("energyloss_type_name IN ({points})").format(
                points=sql.SQL(", ").join(map(sql.Literal, energy_losses_types)),
            ),
        )

    if where:
        query.append(sql.SQL(" AND ("))
        # filter_type is validated by validate_call to "and"/"or", so this is safe
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_name, energyloss_type_name, date"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing name columns to be strings and id columns to be integers
    df = df.astype(
        {"object_name": "string[pyarrow]", "energyloss_type_name": "string[pyarrow]"},
    )
    df = df.astype(
        {"object_id": "int64[pyarrow]", "energyloss_type_id": "int16[pyarrow]"},
    )

    df = df.set_index(["object_name", "energyloss_type_name", "date"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to nested dict {object: {type: {date: data}}}
    result = df.to_dict(orient="index")
    final_result = {}
    for (object_name_key, energyloss_type_name, date), data in result.items():
        # setdefault builds the nesting; (object, type, date) tuples are unique,
        # so each date is assigned exactly once
        type_dict = final_result.setdefault(object_name_key, {}).setdefault(energyloss_type_name, {})
        type_dict[date] = data["value"] if values_only else data

    return final_result

insert(df, on_conflict='ignore')

Inserts energy losses values into the database (table energyloss_targets)

Parameters:

  • df

    (DataFrame) –

    DataFrame with the following columns:

    • object_name
    • date
    • energy_loss_type ('park_curtailment', ...)
    • value
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/kpi_energy_losses_targets.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts energy losses values into the database (table energyloss_targets)

    Parameters
    ----------
    df : DataFrame
        DataFrame with the following columns:

        - object_name
        - date
        - energy_loss_type ('park_curtailment', ...)
        - value
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df has NaN values, the wrong set of columns, or references unknown
        objects or energy loss types.
    """
    # checking inputs
    required_columns = {"object_name", "date", "energy_loss_type", "value"}
    if df.isna().any().any():
        raise ValueError("df cannot have NaN values")
    if set(df.columns) != required_columns:
        additional_cols = set(df.columns) - required_columns
        missing_cols = required_columns - set(df.columns)
        raise ValueError(
            f"df must have the following columns: object_name, date, energy_loss_type, value. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # making a copy of df so the caller's frame is not mutated
    df = df.copy()

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting energy loss type id
    wanted_energy_losses_types = df["energy_loss_type"].unique().tolist()
    elt_ids = self._perfdb.kpis.energy.losses.types.get_ids()
    if wrong_wlt := set(wanted_energy_losses_types) - set(elt_ids.keys()):
        # message previously said "measurement points"; these are energy loss types
        raise ValueError(f"Could not find the following energy loss types: {wrong_wlt}")
    df["energyloss_type_id"] = df["energy_loss_type"].map(elt_ids)

    # removing name columns now that ids are resolved
    df = df.drop(columns=["object_name", "energy_loss_type"])

    # converting value column to float
    df["value"] = df["value"].astype("float32")

    # dropping rows whose value became NaN after the float conversion
    nan_rows = df[df["value"].isna()].index
    if len(nan_rows) > 0:
        logger.warning(
            f"Found NaN values in value column. Dropping {len(nan_rows)} rows (indexes: {df['date'].loc[nan_rows].tolist()})",
        )
        df = df[~df.index.isin(nan_rows)].copy()

    # inserting data; "ignore" maps to a plain append, "update" upserts
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="energyloss_targets",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug(f"Energy loss targets inserted into the database. Total of {len(df)} rows inserted/updated.")