KPI Energy Losses - Waterfall

For more details on energy losses data and the waterfall calculation, see this dedicated page in the reference section.

KpiEnergyWaterfall(perfdb)

Class used for getting energy waterfall values. Can be accessed via perfdb.kpis.energy.waterfall.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb
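
A minimal sketch of how the class is reached (assuming perfdb is an already constructed and connected PerfDB instance; the class is not instantiated directly):

# assuming `perfdb` is an existing, connected PerfDB instance
waterfall = perfdb.kpis.energy.waterfall  # KpiEnergyWaterfall bound to that connection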

get(period, group_name, group_type_name, waterfall_type='relative_perc')

Gets the energy waterfall values for a specific period, group name, group type name and waterfall type.

The resulting values will be positive or negative depending on how each loss impacted the final value. The first and last entries in the result are always positive, as they represent the totals: Gross and Measured/Target (measured and target types) or Target and Measured (relative_abs and relative_perc).

The output aims to be used directly in a waterfall chart, where the first and last entries are the total values and the intermediate entries are the losses.

Currently this method gets the values from perfdb.kpis.energy.losses.values, as it contains both the lost energy values and the measured and target totals (using the after_loss columns of the loss with the highest loss_order).

For more details on how the waterfall is calculated, especially for relative_abs and relative_perc, check the Reference/Energy Losses section of the documentation.
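
As a rough illustration of the per-loss relative calculation implemented in the get() source further below (the numbers here are made up):

# Illustrative numbers only (MWh) for a single loss step of the relative waterfall
measured, measured_after_loss = 50.0, 900.0   # measured loss and energy remaining after it
target, target_after_loss = 30.0, 950.0       # target loss and energy remaining after it

# share of energy the target expected to lose at this step
target_loss_perc = target / (target + target_after_loss)                       # ~0.031
# that share applied to the measured gross at the same step
target_loss_on_measured = (measured + measured_after_loss) * target_loss_perc  # ~29.1 MWh
# negative -> more energy was lost than the target expected
relative = target_loss_on_measured - measured                                  # ~-20.9 MWh

For relative_perc, these per-loss differences are then divided by the Target total.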

Parameters:

  • period

    (DateTimeRange) –

    Period for which to get the values. Hour and minute are discarded; only the dates are considered, and both endpoints are inclusive.

  • group_name

    (str) –

    Name of the desired group.

  • group_type_name

    (str) –

    Name of the desired group type.

  • waterfall_type

    (Literal['measured', 'measured_perc', 'target', 'target_perc', 'relative_abs', 'relative_perc'], default: 'relative_perc' ) –

    Type of the waterfall to get. Can be one of:

    • measured: Actual values in MWh
    • measured_perc: Actual values in percentage
    • target: Target values in MWh
    • target_perc: Target values in percentage
    • relative_abs: Difference of measured and target values in MWh
    • relative_perc: Difference of measured and target values in percentage

    By default, relative_perc is used.

Returns:

  • Series

    Series with results for the wanted group. The index is the name of the loss. Depending on the waterfall_type, special rows will be added:

    • Gross: First row with the total value of the group. Applicable to all types, except relative types. If a percentage type is selected it will be equal to 100%.
    • Measured: Last row containing the actual measured values (net energy). Applicable to all types, except target and target_perc.
    • Target: Depending on the waterfall_type:
      • target and target_perc: Last row with the target values (expected net energy).
      • relative_abs and relative_perc: First row with the target values (expected net energy).
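
Example (a rough sketch; the period construction, group names and the plotly plotting are illustrative assumptions, not guaranteed by this API):

import plotly.graph_objects as go

# assumption: DateTimeRange comes from wherever your project defines it
# and accepts a start and an end date
period = DateTimeRange("2024-01-01", "2024-01-31")

series = perfdb.kpis.energy.waterfall.get(
    period=period,
    group_name="Example Wind Farm",  # hypothetical group
    group_type_name="SPE",
    waterfall_type="relative_perc",
)

# first entry (Target) is the starting total, last entry (Measured) the final total,
# everything in between is a relative loss
fig = go.Figure(
    go.Waterfall(
        x=series.index.to_list(),
        y=series.to_list(),
        measure=["absolute"] + ["relative"] * (len(series) - 2) + ["total"],
    ),
)
fig.show()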
Source code in echo_postgres/kpi_energy_waterfall.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    group_name: str,
    group_type_name: str,
    waterfall_type: Literal["measured", "measured_perc", "target", "target_perc", "relative_abs", "relative_perc"] = "relative_perc",
) -> Series:
    """Gets the energy waterfall values for a specific period, group name, group type name and waterfall type.

    The resulting values will be positive or negative depending on how each loss impacted the final value. The first and last entries in the result are always positive, as they represent the totals: Gross and Measured/Target (measured and target types) or Target and Measured (relative_abs and relative_perc).

    The output aims to be used directly in a waterfall chart, where the first and last entries are the total values and the intermediate entries are the losses.

    Currently this method gets the values from `perfdb.kpis.energy.losses.values`, as it contains both the lost energy values and the measured and target totals (using the `after_loss` columns of the loss with the highest `loss_order`).

    For more details on how the waterfall is calculated, especially for `relative_abs` and `relative_perc`, check the `Reference/Energy Losses` section of the documentation.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to get the values. Hour and minute are discarded; only the dates are considered, and both endpoints are inclusive.
    group_name : str
        Name of the desired group.
    group_type_name : str
        Name of the desired group type.
    waterfall_type : Literal["measured", "measured_perc", "target", "target_perc", "relative_abs", "relative_perc"], optional
        Type of the waterfall to get. Can be one of:

        - **measured**: Actual values in MWh
        - **measured_perc**: Actual values in percentage
        - **target**: Target values in MWh
        - **target_perc**: Target values in percentage
        - **relative_abs**: Difference of measured and target values in MWh
        - **relative_perc**: Difference of measured and target values in percentage

        By default, **relative_perc** is used.

    Returns
    -------
    Series
        Series with results for the wanted group. The index is the name of the loss. Depending on the `waterfall_type`, special rows will be added:

        - `Gross`: First row with the total value of the group. Applicable to all types, except `relative` types. If a percentage type is selected it will be equal to 100%.
        - `Measured`: Last row containing the actual measured values (net energy). Applicable to all types, except `target` and `target_perc`.
        - `Target`: Depending on the `waterfall_type`:
            - `target` and `target_perc`: Last row with the target values (expected net energy).
            - `relative_abs` and `relative_perc`: First row with the target values (expected net energy).
    """
    if group_type_name != "SPE":
        # checking if group exists
        group_ids = self._perfdb.objects.groups.instances.get_ids()
        if group_type_name not in group_ids:
            raise ValueError(f"group_type_name {group_type_name} does not exist")
        if group_name not in group_ids[group_type_name]:
            raise ValueError(f"group_name {group_name} does not exist")
    else:
        spe_ids = self._perfdb.objects.instances.get_ids(object_names=[group_name], object_models=["wind_farm", "solar_farm"])
        if len(spe_ids) != 1:
            raise ValueError(f"Could not find the SPE {group_name}")

    # adjusting period
    period.start = period.start.replace(hour=0, minute=0, second=0, microsecond=0)
    period.end = period.end.replace(hour=0, minute=0, second=0, microsecond=0)

    # getting definition of losses to get order and grouping in the Waterfall

    loss_def = self._perfdb.kpis.energy.losses.types.get(output_type="DataFrame")
    # removing "considered_in_waterfall" = False
    loss_def = loss_def[loss_def["considered_in_waterfall"]].copy()
    # sorting losses by order
    loss_def = loss_def.sort_values("loss_order")

    logger.info(
        f"Getting energy waterfall values for {period.start.date():%Y-%m-%d} to {period.end.date():%Y-%m-%d}, group {group_name}, group type {group_type_name} and waterfall type {waterfall_type}",
    )
    logger.info(f"The order of losses is {loss_def.index.to_list()}")

    # creating dict with loss order
    loss_order = loss_def["loss_order"].to_dict()

    # getting measured losses values
    df = self._perfdb.kpis.energy.losses.values.get(
        period=period,
        time_res="daily",
        aggregation_window=None,
        object_or_group_names=[group_name],
        object_group_types=[group_type_name],
    )

    # summing all days in the period
    df = df.reset_index(drop=False)
    df = (
        df[["energyloss_type_name", "measured", "measured_after_loss", "target", "target_after_loss"]]
        .groupby("energyloss_type_name")
        .sum()
    )
    # adding rows with losses not present in df
    df = df.reindex(loss_def.index, fill_value=0)
    df.index.name = "energyloss_type_name"
    # sorting losses by order using the loss_order dict, where the value is the order
    df = (
        df.reset_index(drop=False)
        .sort_values("energyloss_type_name", key=lambda x: x.map(loss_order))
        .set_index("energyloss_type_name")
    )

    # iterating each loss and filling after_loss values
    # going from last to first loss
    for i, loss in enumerate(loss_def.index[::-1]):
        if i == 0:
            continue
        # getting after_loss values as the sum of after_loss and loss of the previous loss
        previous_loss = loss_def.index[::-1][i - 1]
        df.loc[loss, "measured_after_loss"] = df.loc[previous_loss, "measured"] + df.loc[previous_loss, "measured_after_loss"]
        df.loc[loss, "target_after_loss"] = df.loc[previous_loss, "target"] + df.loc[previous_loss, "target_after_loss"]

    # summing values of losses that should be grouped based on waterfall_group
    waterfall_groups = loss_def["waterfall_group"].dropna().unique().tolist()
    if waterfall_groups:
        for group in waterfall_groups:
            group_losses = loss_def[loss_def["waterfall_group"] == group].index
            # creating a row with the sum of the group losses
            group_row = df.loc[group_losses].agg(
                {"measured": "sum", "measured_after_loss": "max", "target": "sum", "target_after_loss": "max"},
            )
            group_row.name = group
            # dropping group losses from original df
            df = df.drop(group_losses)
            # separating df in rows before and after the group
            min_group_order, max_group_order = (
                loss_def.loc[group_losses, "loss_order"].min(),
                loss_def.loc[group_losses, "loss_order"].max(),
            )
            before_group = df[df.index.map(lambda x: loss_order[x] < min_group_order)]  # noqa: B023
            after_group = df[df.index.map(lambda x: loss_order[x] > max_group_order)]  # noqa: B023
            # adding group row to the end of before_group
            df = concat([before_group, group_row.to_frame().T, after_group])

    # converting to MWh
    df = df / 1000

    # * Calculating waterfall

    match waterfall_type:
        # measured and target values in MWh
        case "measured" | "measured_perc" | "target" | "target_perc":
            # getting main col
            main_col = "measured" if "measured" in waterfall_type else "target"

            result = df.copy()
            # adding "Measured" or "Target" to the end using after_loss of the last loss
            new_line = df.loc[df.index[-1]].copy()
            new_line.name = main_col.capitalize()
            result = concat([result, new_line.to_frame().T])
            result.loc[main_col.capitalize(), main_col] = result.loc[main_col.capitalize(), f"{main_col}_after_loss"]
            # adding "Gross" to the beginning
            new_line = result.loc[result.index[0]].copy()
            new_line.name = "Gross"
            result = concat([new_line.to_frame().T, result])
            result.loc["Gross", main_col] = result.loc["Gross", f"{main_col}_after_loss"] + result.loc["Gross", main_col]
            # converting to a Series in MWh
            result = result[main_col]
            result.name = "Loss"

            # reversing sign of losses
            result.iloc[1:-1] = -result.iloc[1:-1]

            # converting to percentage if needed
            if waterfall_type == f"{main_col}_perc":
                result = result / result["Gross"]

        # relative to target in MWh or percentage
        case "relative_abs" | "relative_perc":
            result = df.copy()
            # adding "Relative" column where final result will be stored
            result["Relative"] = 0.0
            # adding "Target" to the beginning using after_loss of the last loss
            new_line = result.loc[result.index[-1]].copy()
            new_line.name = "Target"
            new_line.loc["Relative"] = new_line["target_after_loss"]
            result = concat([new_line.to_frame().T, result])
            # adding "Measured" to the end using after_loss of the last loss
            new_line = result.loc[result.index[-1]].copy()
            new_line.name = "Measured"
            new_line.loc["Relative"] = new_line["measured_after_loss"]
            result = concat([result, new_line.to_frame().T])
            # iterating losses from the last to the first
            for loss in df.index[::-1]:
                # getting target losses as percentage
                target_loss_perc = result.loc[loss, "target"] / (result.loc[loss, "target"] + result.loc[loss, "target_after_loss"])
                # applying target losses into gross up of measured losses
                target_loss_on_measured = (result.loc[loss, "measured"] + result.loc[loss, "measured_after_loss"]) * target_loss_perc
                # getting difference between measured and target losses applied on measured losses
                result.loc[loss, "Relative"] = target_loss_on_measured - result.loc[loss, "measured"]
            # getting resource impact as what is left after all losses
            # TODO this needs to be improved when we actually calculate the resource impact and the rest goes to "uncertainty"
            new_row = result.loc[result.index[-1]].copy()
            new_row.name = "Resource & Uncertainty"
            new_row.loc["Relative"] = (
                result.loc["Measured", "Relative"] - result.loc[df.index.to_list(), "Relative"].sum() - result.loc["Target", "Relative"]
            )
            # adding row in second position (after target)
            result = concat([result.iloc[:1, :], new_row.to_frame().T, result.iloc[1:, :]], axis=0)

            # getting only series
            result = result["Relative"]
            result.name = "Loss"

            # converting to percentage if needed
            if waterfall_type == "relative_perc":
                result = result / result["Target"]

    # creating a dict from loss name to display name
    loss_display_name = loss_def["display_name"].to_dict()
    # converting names in result to display names
    result = result.rename(index=loss_display_name)

    return result