
KPI Energy Losses - Values

For more details on energy losses data and waterfall calculation see this dedicated page in the reference section.

KpiEnergyLossesValues(perfdb)

Class used for handling energy losses KPI values. Can be accessed via perfdb.kpis.energy.losses.values.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, object_names=None, energy_losses_types=None)

Deletes energy loss values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to delete the data for. By default None

  • energy_losses_types

    (list[str], default: None ) –

    List of energy loss types to delete the data for. By default None
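
Before deleting, the method resolves each object name to a database id and fails fast on unknown names. That validation pattern can be sketched with plain set operations (names and ids below are illustrative, standing in for what `perfdb.objects.instances.get_ids()` would return):

```python
# Hypothetical inputs: names requested for deletion and the ids the database resolved
object_names = ["PARK_A", "PARK_B", "PARK_C"]
obj_ids = {"PARK_A": 1, "PARK_B": 2}  # "PARK_C" has no id -> should raise

# set(dict) iterates over its keys, so the difference yields unresolved names
missing_objs = set(object_names) - set(obj_ids)
if len(obj_ids) != len(object_names):
    error = f"Could not find the following objects: {sorted(missing_objs)}"
```

The same check is reused by `insert`, so invalid object names never reach the SQL layer.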

Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    energy_losses_types: list[str] | None = None,
) -> None:
    """Deletes energy loss values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. By default None
    energy_losses_types : list[str], optional
        List of energy loss types to delete the data for. By default None
    """
    # build the query
    query = [
        sql.SQL("DELETE FROM performance.energyloss_values WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if energy_losses_types:
        # getting energy losses types ids
        elt_ids = self._perfdb.kpis.energy.losses.types.get_ids()
        elt_ids = {elt: elt_ids[elt] for elt in energy_losses_types}
        query.append(sql.SQL(" AND energyloss_type_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, elt_ids.values()))))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from energyloss_values table")

get(period, time_res='daily', aggregation_window=None, object_or_group_names=None, object_group_types=None, energy_losses_types=None, filter_type='and', output_type='DataFrame', values_only=False)

Gets energy losses values for the desired period and objects.

The most useful keys/columns returned are:

  • measured: Actual measured value for the period in kWh
  • measured_as_percentage: Actual measured value for the period as a percentage
  • measured_after_loss: The measured energy production after the losses, in kWh
  • target: Target value for the period in kWh
  • target_as_percentage: Target value for the period as a percentage
  • target_after_loss: The target energy production after the losses, in kWh

Keep in mind that:

  • as_percentage and after_loss take into account the loss_order defined in the table energyloss_types. Currently the last loss is park_curtailment, so its after_loss value equals the measured value (from the energy_values tables) or the target value (from the energy_targets tables) at the connection point.
  • Percentage values are always calculated against the gross energy production (before losses). As an example, measured_as_percentage is equivalent to measured / (measured + measured_after_loss).
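
As a worked example of the percentage rule (the numbers are illustrative):

```python
# Illustrative: 50 kWh lost to a given loss type, 950 kWh produced after that loss
measured = 50.0              # energy attributed to the loss, kWh
measured_after_loss = 950.0  # production remaining after the loss, kWh

gross = measured + measured_after_loss      # gross production before losses
measured_as_percentage = measured / gross   # fraction of gross lost to this loss
```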

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • time_res

    (Literal['daily', 'monthly', 'quarterly', 'yearly'], default: 'daily' ) –

    Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"

  • aggregation_window

    (Literal['mtd', 'ytd', '12m'] | None, default: None ) –

    Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None

  • object_or_group_names

    (list[str], default: None ) –

    List of object or group names to get the data for. By default None

  • object_group_types

    (list[str], default: None ) –

    List of object group types to get the data for. By default None

  • energy_losses_types

    (list[str], default: None ) –

    List of energy loss types to get the data for. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

  • values_only

    (bool, default: False ) –

    If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "energyloss_type_name", "date"], columns = [energy, modified_date]

  • dict[str, dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]] | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]

    In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: {attribute: value, ...}, ...}, ...}, ...} In case values_only is True, will return a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: value, ...}, ...}, ...}

Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    object_or_group_names: list[str] | None = None,
    object_group_types: list[str] | None = None,
    energy_losses_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> (
    DataFrame | dict[str, dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]] | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]
):
    """Gets energy losses values for the desired period and objects.

    The most useful keys/columns returned are:

    - measured: Actual measured value for the period in kWh
    - measured_as_percentage: Actual measured value for the period as a percentage
    - measured_after_loss: The measured energy production after the losses, in kWh
    - target: Target value for the period in kWh
    - target_as_percentage: Target value for the period as a percentage
    - target_after_loss: The target energy production after the losses, in kWh

    Keep in mind that:

    - as_percentage and after_loss take into account the loss_order defined in the table energyloss_types. Currently the last loss is park_curtailment, so its after_loss value equals the measured value (from the energy_values tables) or the target value (from the energy_targets tables) at the connection point.
    - Percentage values are always calculated against the gross energy production (before losses). As an example, measured_as_percentage is equivalent to measured / (measured + measured_after_loss).

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    object_or_group_names : list[str], optional
        List of object or group names to get the data for. By default None
    object_group_types : list[str], optional
        List of object group types to get the data for. By default None
    energy_losses_types : list[str], optional
        List of energy loss types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"
    values_only : bool, optional
        If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "energyloss_type_name", "date"], columns = [energy, modified_date]
    dict[str, dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]] | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: {attribute: value, ...}, ...}, ...}, ...}
        In case values_only is True, will return a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: value, ...}, ...}, ...}
    """
    # getting loss type definitions to get bazefield point
    loss_types_def = self._perfdb.kpis.energy.losses.types.get(output_type="dict")

    # checking if all energy losses types are valid
    if energy_losses_types is not None and (wrong_types := list(set(energy_losses_types) - set(loss_types_def.keys()))):
        raise ValueError(f"Invalid energy losses types: {wrong_types}")

    # build the query
    query = [
        sql.SQL(
            "SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            table=sql.Identifier(
                f"mv_energyloss_values_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}",
            ),
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_or_group_names:
        where.append(
            sql.SQL("object_or_group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_or_group_names)),
            ),
        )
    if object_group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_group_types)),
            ),
        )
    if energy_losses_types:
        where.append(
            sql.SQL("energyloss_type_name IN ({points})").format(
                points=sql.SQL(", ").join(map(sql.Literal, energy_losses_types)),
            ),
        )

    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_or_group_name, group_type_name, energyloss_type_name, date"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing object_name and object_group_name to be a string
    df = df.astype(
        {"object_or_group_name": "string[pyarrow]", "group_type_name": "string[pyarrow]", "energyloss_type_name": "string[pyarrow]"},
    )
    df = df.astype(
        {"object_or_group_id": "int64[pyarrow]", "group_type_id": "int64[pyarrow]", "energyloss_type_id": "int16[pyarrow]"},
    )

    df = df.set_index(["group_type_name", "object_or_group_name", "energyloss_type_name", "date"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to Dict
    result = df.to_dict(orient="index")
    final_result = {}
    for (object_group_type_name, object_or_group_name, energyloss_type_name, date), data in result.items():
        if object_group_type_name not in final_result:
            final_result[object_group_type_name] = {}
        if object_or_group_name not in final_result[object_group_type_name]:
            final_result[object_group_type_name][object_or_group_name] = {}
        if energyloss_type_name not in final_result[object_group_type_name][object_or_group_name]:
            final_result[object_group_type_name][object_or_group_name][energyloss_type_name] = {}
        if date not in final_result[object_group_type_name][object_or_group_name][energyloss_type_name]:
            final_result[object_group_type_name][object_or_group_name][energyloss_type_name][date] = (
                data["measured"] if values_only else data
            )

    return final_result

insert(df, on_conflict='ignore')

Inserts energy losses values into the database (table energyloss_values)

Parameters:

  • df

    (DataFrame) –

    DataFrame with the following columns:

    • object_name
    • date
    • energy_loss_type ('park_curtailment', ...)
    • value
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
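
A minimal input frame can be built as follows (object and loss-type names are illustrative; the commented call assumes a live perfdb handle):

```python
import pandas as pd

# Minimal frame with exactly the required columns (illustrative values)
df = pd.DataFrame(
    {
        "object_name": ["PARK_A", "PARK_A"],
        "date": pd.to_datetime(["2024-01-01", "2024-01-02"]),
        "energy_loss_type": ["park_curtailment", "park_curtailment"],
        "value": [120.5, 98.0],
    }
)

# insert() rejects any other column set, so this must hold exactly
required_columns = {"object_name", "date", "energy_loss_type", "value"}
columns_ok = set(df.columns) == required_columns

# perfdb.kpis.energy.losses.values.insert(df, on_conflict="update")  # hypothetical handle
```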

Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts energy losses values into the database (table energyloss_values)

    Parameters
    ----------
    df : DataFrame
        DataFrame with the following columns:

        - object_name
        - date
        - energy_loss_type ('park_curtailment', ...)
        - value
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checking inputs
    required_columns = {"object_name", "date", "energy_loss_type", "value"}
    if df.isna().any().any():
        raise ValueError("df cannot have NaN values")
    if set(df.columns) != required_columns:
        additional_cols = set(df.columns) - required_columns
        missing_cols = required_columns - set(df.columns)
        raise ValueError(
            f"df must have the following columns: object_name, date, energy_loss_type, value. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # making a copy of df
    df = df.copy()

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting energy loss type id
    wanted_energy_losses_types = df["energy_loss_type"].unique().tolist()
    elt_ids = self._perfdb.kpis.energy.losses.types.get_ids()
    if wrong_wlt := set(wanted_energy_losses_types) - set(elt_ids.keys()):
        raise ValueError(f"Could not find the following measurement points: {wrong_wlt}")
    df["energyloss_type_id"] = df["energy_loss_type"].map(elt_ids)

    # removing unwanted columns
    df = df.drop(columns=["object_name", "energy_loss_type"])

    # converting energy column to float
    df["value"] = df["value"].astype("float32")

    # checking if there are NaNs in energy column
    nan_rows = df[df["value"].isna()].index
    if len(nan_rows) > 0:
        logger.warning(
            f"Found NaN values in value column. Dropping {len(nan_rows)} rows (indexes: {df['date'].loc[nan_rows].tolist()})",
        )
        df = df[~df.index.isin(nan_rows)].copy()

    # inserting data
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="energyloss_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug(f"Energy loss values inserted into the database. Total of {len(df)} rows inserted/updated.")

sync_bazefield(period, object_names=None, energy_losses_types=None, overwrite=False)

Method to get energy numbers from Bazefield and insert them into the database.

This will save the results in the table "energyloss_values" of performance_db.

Parameters:

  • period

    (DateTimeRange) –

    Period to get energy numbers from Bazefield. Values will be rounded to the nearest day. It's recommended that the start is at 00:00:00 and the end is at 23:59:59.

  • object_names

    (list[str] | None, default: None ) –

    Name of the objects to get the energy from (must be an SPE). If set to None will get all that match the object types allowed in ALLOWED_ENERGY_LOSSES_OBJECT_MODELS. By default None

  • energy_losses_types

    (list[str] | None, default: None ) –

    List of energy loss types (park_curtailment, etc...) to get the energy from. By default None

  • overwrite

    (bool, default: False ) –

    If set to True, will overwrite the existing values in the database, by default False

Returns:

  • DataFrame

    DataFrame with energy values inserted in the database
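
The period is rounded outward to whole days before anything is fetched. A plain-datetime sketch of that floor/ceil behavior (DateTimeRange itself is not used here):

```python
from datetime import datetime, timedelta

start = datetime(2024, 1, 1, 7, 30)  # partial-day start (illustrative)
end = datetime(2024, 1, 3, 18, 15)   # partial-day end (illustrative)

# floor the start to midnight
floored_start = start.replace(hour=0, minute=0, second=0, microsecond=0)
# ceil the end to the next midnight, unless it is already on a day boundary
ceiled_end = end.replace(hour=0, minute=0, second=0, microsecond=0)
if ceiled_end != end:
    ceiled_end += timedelta(days=1)
```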

Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def sync_bazefield(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    energy_losses_types: list[str] | None = None,
    overwrite: bool = False,
) -> DataFrame:
    """Method to get energy numbers from Bazefield and insert them into the database.

    This will save the results in the table "energyloss_values" of performance_db.

    Parameters
    ----------
    period : DateTimeRange
        Period to get energy numbers from Bazefield. Values will be rounded to the nearest day.
        It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
    object_names : list[str] | None, optional
        Name of the objects to get the energy from (must be an SPE). If set to None will get all that match the object types allowed in ALLOWED_ENERGY_LOSSES_OBJECT_MODELS.
        By default None
    energy_losses_types : list[str] | None, optional
        List of energy loss types (park_curtailment, etc...) to get the energy from. By default None
    overwrite : bool, optional
        If set to True, will overwrite the existing values in the database, by default False

    Returns
    -------
    DataFrame
        DataFrame with energy values inserted in the database
    """
    t0 = perf_counter()

    # adjusting period to cover the whole day
    period = period.copy()
    period = period.round(timedelta(days=1), start="floor", end="ceil")

    # getting all objects that are allowed to have energy values
    allowed_objects = self._perfdb.objects.instances.get(object_models=ALLOWED_ENERGY_LOSSES_OBJECT_MODELS, output_type="DataFrame")

    #  checking if provided object names are valid
    if object_names is None:
        object_names = list(allowed_objects.index)
        # removing all objects with TEST in their name
        object_names = [name for name in object_names if "TEST" not in name]
    elif wrong_names := list(set(object_names) - set(allowed_objects.index)):
        raise ValueError(f"Invalid object names: {wrong_names}")

    # getting loss type definitions to get bazefield point
    loss_types_def = self._perfdb.kpis.energy.losses.types.get(output_type="dict")
    # removing all loss types where source != "bazefield" and in case bazefield_point is None
    loss_types_def = {k: v for k, v in loss_types_def.items() if v["source"] == "bazefield" and v["bazefield_point"] is not None}

    # checking if all energy losses types are valid
    if energy_losses_types is not None:
        if wrong_types := list(set(energy_losses_types) - set(loss_types_def.keys())):
            raise ValueError(
                f"Invalid energy losses types: {wrong_types}. Check if they are defined and if they have source='bazefield' and a valid bazefield_point",
            )
        # filtering loss_types_def
        loss_types_def = {k: v for k, v in loss_types_def.items() if k in energy_losses_types}

    # creating connection to Bazefield
    baze = Baze()

    # iterating each resource type
    for loss_type, loss_type_attrs in loss_types_def.items():
        # getting the bazefield point
        bazefield_point = loss_type_attrs["bazefield_point"]
        # adjusting object_names depending on the applicable source
        applicable_to = loss_type_attrs["applicable_to"]
        if applicable_to in {"solar", "wind"}:
            model = f"{applicable_to}_farm"
            # filter into a local list so later loss types still see the full object_names
            loop_object_names = [obj for obj in object_names if allowed_objects.loc[obj, "object_model_name"] == model]
        else:
            loop_object_names = object_names
        # getting values from tag for all objects
        wanted_points = {obj: [bazefield_point] for obj in loop_object_names}
        point_period = period.copy()
        point_period.start = point_period.start - timedelta(minutes=10)
        point_period.end = point_period.end + timedelta(minutes=10)

        # regex to get 5min or 10min from bazefield point
        feature_freq = re.findall(r"\d{1,2}min", bazefield_point)
        if len(feature_freq) > 1:
            raise ValueError(f"Found more than one frequency in {bazefield_point}")
        if not feature_freq:
            logger.info(f"Could not find frequency in {bazefield_point}. Considering reindex as none to get value from Bazefield")
        feature_freq = feature_freq[0] if feature_freq else None

        # getting values
        values = baze.points.values.series.get(points=wanted_points, reindex=feature_freq, period=point_period)

        # dropping second level
        values = values.droplevel(1, axis=1)

        # converting from kW to kWh
        if feature_freq is not None and "min" in feature_freq:
            freq = int(feature_freq.replace("min", ""))
        elif re.findall(r"1d", bazefield_point) == ["1d"]:
            freq = 1440  # number of minutes in one day
            values.index.name = None
        else:
            raise ValueError(f"Could not find frequency in {bazefield_point}. Please check the bazefield point format.")

        values = values * (freq / 60)
        # resampling to day
        daily_values = values.resample("D").sum()
        # adjusting values to upload to the database

        # melting the DataFrame
        values = daily_values.reset_index().melt(id_vars="index", var_name="object_name", value_name="value")
        values = values.rename(columns={"index": "date"})
        values["energy_loss_type"] = loss_type

        # removing outside period
        values = values[
            (values["date"] >= period.start) & (values["date"] < period.end)
        ]  # < used at end to avoid including the next day at 00:00:00

        # inserting energy loss data into the database
        logger.info("Inserting energy loss values data into the database")

        if values.empty:
            logger.info(
                f"No energy loss values found for {loss_type} in period {period} for object {object_names}. Skipping insertion.",
            )
            continue
        self.insert(df=values, on_conflict="update" if overwrite else "ignore")

        logger.info(
            f"Energy loss values for {loss_type} inserted into the database in {perf_counter() - t0:.2f} seconds. Period {period} and objects {object_names}",
        )
    del baze

    return values
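
The kW-to-kWh conversion above multiplies each power sample by its duration in hours before resampling to daily totals. A self-contained sketch with illustrative 10-minute data:

```python
import pandas as pd

# Six 10-minute power samples of 1200 kW (illustrative values)
power_kw = pd.Series(
    [1200.0] * 6,
    index=pd.date_range("2024-01-01", periods=6, freq="10min"),
)

freq = 10  # minutes per sample; the real code parses this from the Bazefield point name
energy_kwh = power_kw * (freq / 60)  # each sample covers freq/60 hours
daily_kwh = energy_kwh.resample("D").sum()
```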