KPI Energy Losses - Waterfall

For more details on energy losses data and waterfall calculation see this dedicated page in the reference section.

KpiEnergyWaterfall(perfdb)

Class used for getting energy waterfall values. Can be accessed via perfdb.kpis.energy.waterfall.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

get(period, group_name, group_type_name, waterfall_type='relative_perc', include_p50_deviation=False, output_type='pd.Series')

Gets the energy waterfall values for a specific period, group name, group type name and waterfall type.

The resulting values will be positive or negative depending on how each loss impacted the final value. The first and last entries in the result are always positive, as they represent totals: Gross and Measured (measured, measured_perc), Gross and Target (target, target_perc), or Target and Measured (relative_abs, relative_perc).

The output is intended to be used directly in a waterfall chart, where the first and last entries are the total values and the intermediate entries are the losses.

In case include_p50_deviation is set to True, two extra entries will be added at the beginning of the result: P50 and Target Adjustment. P50 is a total value and Target Adjustment is the difference between Target and P50.

Currently this method gets its values from perfdb.kpis.energy.losses.values, as that source contains the lost energy values as well as the target and measured totals.

For more details on how the waterfall is calculated, especially on relative_abs and relative_perc, check the Reference/Energy Losses section of the documentation.
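To illustrate the relative waterfall logic, here is a minimal, self-contained sketch of the calculation described above, using made-up loss names and fractions (not real perfdb data): each loss removes a fraction of the energy under target and under measured conditions, the measured net is grossed up by undoing all measured losses, and each loss's impact is obtained by switching losses one at a time from measured to target fractions and differencing consecutive simulated nets.

```python
from math import prod

# Hypothetical per-loss fractions (share of energy removed by each loss)
# under target (budgeted) and measured (actual) conditions.
losses = ["curtailment", "downtime", "performance"]
target_as_perc = [0.02, 0.03, 0.01]
measured_as_perc = [0.04, 0.02, 0.015]

measured_net = 900.0  # MWh actually produced after all losses
# gross up the measured net by undoing all measured losses
grossed_up = measured_net / prod(1 - p for p in measured_as_perc)
# net that would result if every loss behaved as targeted
target_net = grossed_up * prod(1 - p for p in target_as_perc)

# simulate the net with target fractions applied to losses before i
# and measured fractions from loss i onwards
net_simulated = [
    grossed_up
    * prod(1 - p for p in target_as_perc[:i])
    * prod(1 - p for p in measured_as_perc[i:])
    for i in range(len(losses))
]
net_simulated.append(target_net)

# each impact is the difference between consecutive simulated nets;
# the impacts telescope, so Target + sum(impacts) == Measured
impacts = [net_simulated[i] - net_simulated[i + 1] for i in range(len(losses))]
waterfall = [("Target", target_net), *zip(losses, impacts), ("Measured", measured_net)]
```

Note that the real method reads target_after_loss from the database rather than deriving it from the grossed-up value; the telescoping property (first total plus all intermediate impacts equals the last total) holds either way.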

Parameters:

  • period

    (DateTimeRange) –

    Period for which to get the values. Hours and minutes are discarded; only the dates are considered, and both endpoint dates are inclusive.

  • group_name

    (str) –

    Name of the desired group.

  • group_type_name

    (str) –

    Name of the desired group type.

  • waterfall_type

    (Literal['measured', 'measured_perc', 'target', 'target_perc', 'relative_abs', 'relative_perc'], default: 'relative_perc' ) –

    Type of the waterfall to get. Can be one of:

    • measured: Actual values in MWh
    • measured_perc: Actual values in percentage
    • target: Target values in MWh
    • target_perc: Target values in percentage
    • relative_abs: Difference of measured and target values in MWh
    • relative_perc: Difference of measured and target values in percentage

    By default, relative_perc is used.

  • include_p50_deviation

    (bool, default: False ) –

    Whether to include the P50 deviation in the waterfall or not. Only applicable if waterfall_type is one of relative_abs or relative_perc. By default, False.

  • output_type

    (Literal['pd.Series', 'pl.DataFrame'], default: 'pd.Series' ) –

    Whether to return a pandas Series or a polars DataFrame. By default, pandas Series is used.

Returns:

  • Series or DataFrame –

    Series or DataFrame with results for the requested group. The index is the name of the loss. Depending on the waterfall_type, there will be special rows added:

    • Gross: First row with the total value of the group. Applicable to all types, except relative types. If a percentage type is selected it will be equal to 100%.
    • Measured: Last row containing the actual measured values (net energy). Applicable to all types, except target and target_perc.
    • Target: Depending on the waterfall_type:
      • target and target_perc: Last row with the target values (expected net energy).
      • relative_abs and relative_perc: First row with the target values (expected net energy).

    If it is a polars DataFrame, it will contain the following columns: name and value.
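Since the result is shaped for direct charting, a short sketch of how such a name/value result maps to waterfall bars may help: totals are absolute bars from zero, while the intermediate losses are floating bars offset by the running total. The loss names and values below are invented for illustration; the real result is a pandas Series or polars DataFrame.

```python
# hypothetical relative_perc result as (name, value) pairs
result = [
    ("Target", 1.00),
    ("Curtailment", -0.03),
    ("Downtime", 0.01),
    ("Measured", 0.98),
]

n = len(result)
bottoms, heights = [], []
running = 0.0
for i, (name, value) in enumerate(result):
    if i == 0 or i == n - 1:
        # total bars (first and last entries) are drawn from zero
        bottoms.append(0.0)
        heights.append(value)
        if i == 0:
            running = value
    else:
        # floating loss bars start at the running total, extending down for negatives
        bottoms.append(running + min(value, 0.0))
        heights.append(abs(value))
        running += value
```

Feeding `bottoms` and `heights` to any bar-chart API (e.g. matplotlib's `bar(x, heights, bottom=bottoms)`) produces the waterfall; the running total after the last loss should equal the final total bar.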

Source code in echo_postgres/kpi_energy_waterfall.py
Python
@validate_call
def get(
    self,
    period: DateTimeRange,
    group_name: str,
    group_type_name: str,
    waterfall_type: Literal["measured", "measured_perc", "target", "target_perc", "relative_abs", "relative_perc"] = "relative_perc",
    include_p50_deviation: bool = False,
    output_type: Literal["pd.Series", "pl.DataFrame"] = "pd.Series",
) -> pd.Series | pl.DataFrame:
    """Gets the energy waterfall values for a specific period, group name, group type name and waterfall type.

    The resulting values will be positive or negative depending on how each loss impacted the final value. The first and last entries in the result are always positive, as they represent totals: Gross and Measured (measured, measured_perc), Gross and Target (target, target_perc), or Target and Measured (relative_abs, relative_perc).

    The output is intended to be used directly in a waterfall chart, where the first and last entries are the total values and the intermediate entries are the losses.

    In case `include_p50_deviation` is set to True, two extra entries will be added at the beginning of the result: `P50` and `Target Adjustment`. `P50` is a total value and `Target Adjustment` is the difference between Target and P50.

    Currently this method gets its values from `perfdb.kpis.energy.losses.values`, as that source contains the lost energy values as well as the target and measured totals.

    For more details on how the waterfall is calculated, especially on relative_abs and relative_perc, check the `Reference/Energy Losses` section of the documentation.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to get the values. Hours and minutes are discarded; only the dates are considered, and both endpoint dates are inclusive.
    group_name : str
        Name of the desired group.
    group_type_name : str
        Name of the desired group type.
    waterfall_type : Literal["measured", "measured_perc", "target", "target_perc", "relative_abs", "relative_perc"], optional
        Type of the waterfall to get. Can be one of:

        - **measured**: Actual values in MWh
        - **measured_perc**: Actual values in percentage
        - **target**: Target values in MWh
        - **target_perc**: Target values in percentage
        - **relative_abs**: Difference of measured and target values in MWh
        - **relative_perc**: Difference of measured and target values in percentage

        By default, **relative_perc** is used.
    include_p50_deviation : bool, optional
        Whether to include the P50 deviation in the waterfall or not. Only applicable if waterfall_type is one of `relative_abs` or `relative_perc`. By default, False.
    output_type : Literal["pd.Series", "pl.DataFrame"], optional
        Whether to return a pandas Series or a polars DataFrame. By default, pandas Series is used.

    Returns
    -------
    pd.Series or pl.DataFrame
        Series or DataFrame with results for the requested group. The index is the name of the loss. Depending on the `waterfall_type`, there will be special rows added:

        - `Gross`: First row with the total value of the group. Applicable to all types, except `relative` types. If a percentage type is selected it will be equal to 100%.
        - `Measured`: Last row containing the actual measured values (net energy). Applicable to all types, except `target` and `target_perc`.
        - `Target`: Depending on the `waterfall_type`:
            - `target` and `target_perc`: Last row with the target values (expected net energy).
            - `relative_abs` and `relative_perc`: First row with the target values (expected net energy).

        If it is a polars DataFrame, it will contain the following columns: name and value.
    """
    # checking if group exists
    group_ids = self._perfdb.objects.groups.instances.get_ids()
    if group_type_name not in group_ids:
        raise ValueError(f"group_type_name {group_type_name} does not exist")
    if group_name not in group_ids[group_type_name]:
        raise ValueError(f"group_name {group_name} does not exist")

    # adjusting period
    period.start = period.start.replace(hour=0, minute=0, second=0, microsecond=0)
    period.end = period.end.replace(hour=0, minute=0, second=0, microsecond=0)

    # getting definition of losses to get order and grouping in the Waterfall

    loss_def: pl.DataFrame = self._perfdb.kpis.energy.losses.types.get(output_type="pl.DataFrame")
    # removing "considered_in_waterfall" = False
    loss_def = loss_def.filter(pl.col("considered_in_waterfall"))
    # sorting losses by order
    loss_def = loss_def.sort("loss_order")

    logger.info(
        f"Getting energy waterfall values for {period.start.date():%Y-%m-%d} to {period.end.date():%Y-%m-%d}, group {group_name}, group type {group_type_name} and waterfall type {waterfall_type}",
    )
    logger.info(f"The order of losses is {loss_def['name'].to_list()}")

    # getting measured losses values
    df: pl.DataFrame = self._perfdb.kpis.energy.losses.values.get(
        period=period,
        time_res="daily",
        aggregation_window=None,
        object_or_group_names=[group_name],
        object_group_types=[group_type_name],
        energy_losses_types=loss_def["name"].to_list(),
        output_type="pl.DataFrame",
    )

    # summing all days in the period
    df = (
        df.select("energyloss_type_name", "measured", "measured_after_loss", "target", "target_after_loss")
        .group_by("energyloss_type_name")
        .sum()
    )

    # converting to MWh (cast to Float64 for numerical precision)
    numeric_cols = ["measured", "measured_after_loss", "target", "target_after_loss"]
    df = df.with_columns([(pl.col(c) / 1000).cast(pl.Float64) for c in numeric_cols])

    # sorting losses by order - join with loss_def to get loss_order, then sort
    df = df.join(
        loss_def.select("name", "loss_order"),
        left_on="energyloss_type_name",
        right_on="name",
    ).sort("loss_order")

    # summing values of losses that should be grouped based on waterfall_group
    # ! for this to work these losses must be sequential!
    waterfall_groups = loss_def["waterfall_group"].drop_nulls().unique().to_list()
    if waterfall_groups:
        for group in waterfall_groups:
            group_losses = loss_def.filter(pl.col("waterfall_group") == group)["name"].to_list()
            # checking if group_losses are sequential in loss order
            group_orders = loss_def.filter(pl.col("name").is_in(group_losses))["loss_order"]
            min_group_order = group_orders.min()
            max_group_order = group_orders.max()
            losses_between_group = loss_def.filter(
                (pl.col("loss_order") >= min_group_order) & (pl.col("loss_order") <= max_group_order),
            )["name"].to_list()
            if set(group_losses) != set(losses_between_group):
                wrong_losses = set(losses_between_group) - set(group_losses)
                raise ValueError(
                    f"Losses in waterfall group {group} are not sequential in loss order. The following losses are in between: {wrong_losses}",
                )

            # creating a row with the sum of the group losses
            group_row = df.filter(pl.col("energyloss_type_name").is_in(group_losses)).select(
                pl.lit(group).alias("energyloss_type_name"),
                pl.col("measured").sum(),
                pl.col("measured_after_loss").min(),
                pl.col("target").sum(),
                pl.col("target_after_loss").min(),
                pl.col("loss_order").min(),
            )
            # dropping group losses from original df and inserting group row
            before_group = df.filter(pl.col("loss_order") < min_group_order)
            after_group = df.filter(pl.col("loss_order") > max_group_order)
            df = pl.concat([before_group, group_row, after_group])

    # * Calculating waterfall

    match waterfall_type:
        # measured and target values in MWh
        case "measured" | "measured_perc" | "target" | "target_perc":
            # getting main col
            main_col = "measured" if "measured" in waterfall_type else "target"

            result = df.clone()

            # dropping "uncertainty" loss
            result = result.filter(pl.col("energyloss_type_name") != "uncertainty")

            # "Gross" value: first row's after_loss + first row's loss
            first_row = result.row(0, named=True)
            gross_value = first_row[f"{main_col}_after_loss"] + first_row[main_col]

            # "Measured" or "Target" value: last row's after_loss from df (includes uncertainty)
            last_row = df.row(-1, named=True)
            end_value = last_row[f"{main_col}_after_loss"]

            # build result as name/value DataFrame
            # Gross at start, losses (negated) in middle, Measured/Target at end
            result = pl.concat(
                [
                    pl.DataFrame({"name": ["Gross"], "value": [gross_value]}),
                    result.select(
                        pl.col("energyloss_type_name").alias("name"),
                        (-pl.col(main_col)).alias("value"),
                    ),
                    pl.DataFrame({"name": [main_col.capitalize()], "value": [end_value]}),
                ],
                how="vertical_relaxed",
            )

            # converting to percentage if needed
            if waterfall_type == f"{main_col}_perc":
                result = result.with_columns(pl.col("value") / gross_value)

        # relative to target in MWh or percentage
        case "relative_abs" | "relative_perc":
            result = df.clone()

            # calculating as_percentage loss for both target and measured
            result = result.with_columns(
                (pl.col("target") / (pl.col("target") + pl.col("target_after_loss"))).alias("target_as_perc"),
                (pl.col("measured") / (pl.col("measured") + pl.col("measured_after_loss"))).alias("measured_as_perc"),
            )

            # extract values as lists for iteration
            names = result["energyloss_type_name"].to_list()
            target_as_perc_vals = result["target_as_perc"].to_list()
            measured_as_perc_vals = result["measured_as_perc"].to_list()
            last_measured_after_loss = result["measured_after_loss"][-1]
            last_target_after_loss = result["target_after_loss"][-1]

            # grossed_up = last measured_after_loss / product of (1 - measured_as_perc) for all losses
            grossed_up = last_measured_after_loss / (1 - result["measured_as_perc"]).product()

            # compute net_simulated for each loss
            # the idea here is to start from the gross up of the target (or measured if you consider that uncertainty is a loss)
            # and then simulate the net considering the target losses up to this point and the measured losses after this point
            # finally the impact is the difference between this simulated net and the previous simulated net
            # As an example, consider 3 losses A, B and C:
            # - grossed up = measured_after_loss / (1 - measured_as_perc).prod()
            # - for loss A:
            #     - net_simulated_A = grossed_up * (1 - target_as_perc_A) * (1 - measured_as_perc_B) * (1 - measured_as_perc_C)
            # - for loss B:
            #     - net_simulated_B = grossed_up * (1 - target_as_perc_A) * (1 - target_as_perc_B) * (1 - measured_as_perc_C)
            # - for loss C:
            #     - net_simulated_C = grossed_up * (1 - target_as_perc_A) * (1 - target_as_perc_B) * (1 - target_as_perc_C)
            # then the impacts are:
            # - impact_A = grossed_up - net_simulated_A
            # - impact_B = net_simulated_A - net_simulated_B
            # - impact_C = net_simulated_B - net_simulated_C
            net_simulated_vals: list[float] = []
            for i in range(len(names)):
                # product of (1 - target_as_perc) for losses BEFORE current
                target_prod = 1.0
                for j in range(i):
                    target_prod *= 1 - target_as_perc_vals[j]
                # product of (1 - measured_as_perc) for losses FROM current onwards
                measured_prod = 1.0
                for j in range(i, len(names)):
                    measured_prod *= 1 - measured_as_perc_vals[j]

                net_simulated_vals.append(grossed_up * measured_prod * target_prod)

            # calculating the relative impact
            # append target_after_loss of the last loss to net_simulated
            net_simulated_vals.append(last_target_after_loss)
            # impact = net_simulated[i] - net_simulated[i+1]  # noqa: ERA001
            impact = [net_simulated_vals[i] - net_simulated_vals[i + 1] for i in range(len(names))]

            # build result: Target at start, losses in middle, Measured at end
            result = pl.DataFrame(
                {
                    "name": ["Target"] + names + ["Measured"],  # noqa: RUF005
                    "value": [last_target_after_loss] + impact + [last_measured_after_loss],  # noqa: RUF005
                },
            )

            # adding P50 deviation if wanted
            if include_p50_deviation:
                # in case the current group type is not SPE, lets get all the SPEs that are part of the group
                if group_type_name != "SPE":
                    group_def: pl.DataFrame = self._perfdb.objects.groups.instances.get(
                        object_group_names=[group_name],
                        object_group_types=[group_type_name],
                        output_type="pl.DataFrame",
                    )
                    spe_names: list[str] = group_def.filter(
                        (pl.col("object_group_type_name") == group_type_name) & (pl.col("object_group_name") == group_name),
                    )["spe_names"][0]
                else:
                    spe_names = [group_name]
                # getting target energy to find the resource assessment used
                target_energy: pl.DataFrame = self._perfdb.kpis.energy.targets.get(
                    period=period,
                    time_res="daily",
                    object_or_group_names=spe_names,
                    object_group_types=["SPE"],
                    measurement_points=["Connection Point"],
                    output_type="pl.DataFrame",
                )
                target_resource_assessments = target_energy.select(
                    "object_or_group_name",
                    "date",
                    "target_resource_assessment_id",
                )
                resource_assessment_ids = target_energy["target_resource_assessment_id"].unique().to_list()  # type: ignore # noqa: F841
                # TODO: currently the materialized view in resourceassessments.pxx only returns the default resource assessment. We need to fix this to allow getting Pxx for the correct pxx in each year
                # getting the P50 from the resource assessments
                p50: pl.DataFrame = self._perfdb.resourceassessments.pxx.get(
                    period=period,
                    time_res="daily",
                    group_names=spe_names,
                    group_types=["SPE"],
                    resource_types=["average_power"],
                    pxx=[0.5],
                    evaluation_periods=["longterm"],
                    output_type="pl.DataFrame",
                )
                # convert to MWh and select relevant columns
                p50 = p50.select(
                    pl.col("group_name").alias("object_or_group_name"),
                    pl.col("date").cast(pl.Date),
                    (pl.col("value") / 1000 * 24).alias("p50"),  # from kW to MWh
                )
                # merge with target resource assessments
                p50 = p50.join(
                    target_resource_assessments,
                    on=["object_or_group_name", "date"],
                )
                # sum of p50 for all SPES in the group for the period
                total_p50 = p50["p50"].sum()
                # adding two rows at start for results: P50 and Target Adjustment
                target_val = result.filter(pl.col("name") == "Target")["value"][0]
                p50_rows = pl.DataFrame(
                    {
                        "name": ["P50", "Target Adjustment"],
                        "value": [total_p50, target_val - total_p50],
                    },
                )
                result = pl.concat([p50_rows, result], how="vertical_relaxed")

            # converting to percentage if needed
            if waterfall_type == "relative_perc":
                target_val = result.filter(pl.col("name") == "Target")["value"][0]
                result = result.with_columns(pl.col("value") / target_val)

    # creating display name mapping and renaming loss names
    name_to_display = dict(zip(loss_def["name"].to_list(), loss_def["display_name"].to_list(), strict=False))
    result = result.with_columns(
        pl.col("name").replace_strict(name_to_display, default=pl.col("name")),
    )

    if output_type == "pd.Series":
        result_series = result.to_pandas().set_index("name")["value"]
        result_series.name = "Loss"
        return result_series

    return result
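The waterfall_group handling in the source above relies on grouped losses being contiguous in loss_order; otherwise collapsing them into one row would reorder the waterfall. That sequentiality check can be sketched in isolation (with hypothetical loss names, outside of any polars DataFrame):

```python
def check_group_sequential(loss_order: dict[str, int], group_losses: list[str]) -> None:
    """Raise if the grouped losses do not form a contiguous run of loss_order values."""
    orders = [loss_order[name] for name in group_losses]
    lo, hi = min(orders), max(orders)
    # every loss whose order falls between the group's min and max order
    between = {name for name, order in loss_order.items() if lo <= order <= hi}
    wrong = between - set(group_losses)
    if wrong:
        raise ValueError(
            f"Losses in waterfall group are not sequential in loss order. "
            f"The following losses are in between: {wrong}"
        )

# hypothetical loss ordering: grouping A with B is valid, A with C is not
loss_order = {"A": 1, "B": 2, "C": 3}
check_group_sequential(loss_order, ["A", "B"])  # contiguous, passes silently
```

This mirrors the min/max-order filter the method applies against loss_def before summing a group's rows.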