Skip to content

KPI Energy Values

KpiEnergyValues(perfdb)

Class used for handling energy KPI values. Can be accessed via perfdb.kpis.energy.values.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store the top-level PerfDB handle shared by all accessor subclasses.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # Private reference; subclasses reach the connection handler through it.
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, object_names=None, measurement_points=None)

Deletes energy values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to delete the data for. By default None

  • measurement_points

    (list[ALLOWED_MEASUREMENT_POINTS], default: None ) –

    List of measurement points to delete the data for, like Connection Point, Gravity Center, Asset, etc. By default None which means all measurement points will be deleted.

Source code in echo_postgres/kpi_energy_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    measurement_points: list[ALLOWED_MEASUREMENT_POINTS] | None = None,
) -> None:
    """Deletes energy values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. By default None, which means all objects.
    measurement_points : list[ALLOWED_MEASUREMENT_POINTS], optional
        List of measurement points to delete the data for, like Connection Point, Gravity Center, Asset, etc. By default None which means all measurement points will be deleted.

    Raises
    ------
    ValueError
        If any name in object_names does not exist in the database.
    """
    # base query: restrict by period (inclusive on both ends)
    query = [
        sql.SQL("DELETE FROM performance.energy_values WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names:
        # resolving object names to ids
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # compare as sets: the id mapping de-duplicates keys, so a plain length
        # comparison would falsely report missing objects when object_names
        # contains duplicates
        if missing_objs := set(object_names) - set(obj_ids):
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if measurement_points:
        # resolving measurement point names to ids (names are pre-validated by
        # the ALLOWED_MEASUREMENT_POINTS annotation, so direct lookup is safe)
        mp_ids = self._perfdb.kpis.energy.measurementpoints.get_ids()
        mp_ids = {mp: mp_ids[mp] for mp in measurement_points}
        query.append(sql.SQL(" AND measurement_point_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, mp_ids.values()))))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from energy_values table")

get(period, time_res='daily', aggregation_window=None, object_or_group_names=None, object_group_types=None, measurement_points=None, filter_type='and', output_type='DataFrame', values_only=False)

Gets energy values for the desired period and objects.

The most useful keys/columns returned are:

  • energy

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • time_res

    (Literal['daily', 'monthly', 'quarterly', 'yearly'], default: 'daily' ) –

    Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"

  • aggregation_window

    (Literal['mtd', 'ytd', '12m'] | None, default: None ) –

    Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None

  • object_or_group_names

    (list[str], default: None ) –

    List of object or group names to get the data for. By default None

  • object_group_types

    (list[str], default: None ) –

    List of object group types to get the data for. By default None

  • measurement_points

    (list[ALLOWED_MEASUREMENT_POINTS], default: None ) –

    List of measurement points to get the data for, like Connection Point, Gravity Center, Asset, etc. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

  • values_only

    (bool, default: False ) –

    If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "measurement_point_name", "date"], columns = [value, target, efficiency, modified_date]

  • dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]

    In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {measurement_point_name: {date: {attribute: value, ...}, ...}, ...}, ...}

Source code in echo_postgres/kpi_energy_values.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    object_or_group_names: list[str] | None = None,
    object_group_types: list[str] | None = None,
    measurement_points: list[ALLOWED_MEASUREMENT_POINTS] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> DataFrame | dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]:
    """Gets energy values for the desired period and objects.

    The most useful keys/columns returned are:

    - energy

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    object_or_group_names : list[str], optional
        List of object or group names to get the data for. By default None
    object_group_types : list[str], optional
        List of object group types to get the data for. By default None
    measurement_points : list[ALLOWED_MEASUREMENT_POINTS], optional
        List of measurement points to get the data for, like Connection Point, Gravity Center, Asset, etc. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"
    values_only : bool, optional
        If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "measurement_point_name", "date"], columns = [value, target, efficiency, modified_date]
    dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {measurement_point_name: {date: {attribute: value, ...}, ...}, ...}, ...}
    """
    # build the query; the materialized view name encodes both the time
    # resolution and the optional aggregation window, e.g. mv_energy_daily,
    # mv_energy_monthly_ytd
    query = [
        sql.SQL(
            "SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            table=sql.Identifier(
                f"mv_energy_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}",
            ),
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    # optional filters, combined below with the requested AND/OR connector
    where = []
    if object_or_group_names:
        where.append(
            sql.SQL("object_or_group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_or_group_names)),
            ),
        )
    if object_group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_group_types)),
            ),
        )
    if measurement_points:
        where.append(
            sql.SQL("measurement_point_name IN ({points})").format(
                points=sql.SQL(", ").join(map(sql.Literal, measurement_points)),
            ),
        )

    if where:
        # filter_type is constrained to "and"/"or" by its Literal annotation,
        # so interpolating it into the SQL text is safe
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_or_group_name, group_type_name, measurement_point_name, date"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing name columns to be strings and id columns to be integers
    df = df.astype(
        {"object_or_group_name": "string[pyarrow]", "group_type_name": "string[pyarrow]", "measurement_point_name": "string[pyarrow]"},
    )
    df = df.astype(
        {"object_or_group_id": "int64[pyarrow]", "group_type_id": "int64[pyarrow]", "measurement_point_id": "int16[pyarrow]"},
    )

    df = df.set_index(["group_type_name", "object_or_group_name", "measurement_point_name", "date"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to nested dict {group_type: {object: {measurement_point: {date: ...}}}}
    result = df.to_dict(orient="index")
    final_result = {}
    for (group_type_name, object_or_group_name, measurement_point_name, date), data in result.items():
        # setdefault builds each nesting level on first sight; the previous
        # implementation tested `date` membership against the wrong dict level
        # (the measurement-point dict instead of the date dict)
        date_values = (
            final_result.setdefault(group_type_name, {})
            .setdefault(object_or_group_name, {})
            .setdefault(measurement_point_name, {})
        )
        date_values[date] = data["value"] if values_only else data

    return final_result

insert(df, on_conflict='ignore')

Inserts energy values into the database (table energy_values)

Parameters:

  • df

    (DataFrame) –

    DataFrame with the following columns:

    • object_name
    • date
    • measurement_point ('Asset', 'Connection Point', 'Gravity Center', 'Asset Theoretical', 'Asset Operational', 'Asset Theoretical Operational')
    • energy
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/kpi_energy_values.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts energy values into the database (table energy_values)

    Parameters
    ----------
    df : DataFrame
        DataFrame with the following columns:

        - object_name
        - date
        - measurement_point ('Asset', 'Connection Point', 'Gravity Center', 'Asset Theoretical', 'Asset Operational', 'Asset Theoretical Operational')
        - energy
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df has missing/extra columns, contains NaN values, or references
        unknown objects or measurement points.
    """
    # checking inputs: validate the schema first so a malformed frame gets the
    # most informative error, then reject NaNs anywhere in the frame
    required_columns = {"object_name", "date", "measurement_point", "energy"}
    if set(df.columns) != required_columns:
        additional_cols = set(df.columns) - required_columns
        missing_cols = required_columns - set(df.columns)
        raise ValueError(
            f"df must have the following columns: object_name, date, measurement_point, energy. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )
    if df.isna().any().any():
        raise ValueError("df cannot have NaN values")

    # making a copy of df so the caller's frame is never mutated
    df = df.copy()

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting measurement point id
    wanted_measurement_points = df["measurement_point"].unique().tolist()
    mp_ids = self._perfdb.kpis.energy.measurementpoints.get_ids()
    if wrong_mp := set(wanted_measurement_points) - set(mp_ids.keys()):
        raise ValueError(f"Could not find the following measurement points: {wrong_mp}")
    df["measurement_point_id"] = df["measurement_point"].map(mp_ids)

    # removing name columns now that the ids are resolved
    df = df.drop(columns=["object_name", "measurement_point"])

    # converting energy column to float
    df["energy"] = df["energy"].astype("float32")

    # defensive check: NaNs were already rejected above, so this should only
    # fire if a future change introduces NaNs after validation (e.g. a mapping
    # step); kept as a safety net rather than silently inserting NaN rows
    nan_rows = df[df["energy"].isna()].index
    if len(nan_rows) > 0:
        logger.warning(
            f"Found NaN values in energy column. Dropping {len(nan_rows)} rows (indexes: {df['date'].loc[nan_rows].tolist()})",
        )
        df = df[~df.index.isin(nan_rows)].copy()

    # inserting data; "ignore" maps to a plain append (conflicts skipped),
    # "update" upserts existing rows
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="energy_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("Energy values inserted into the database")

sync_bazefield(period, object_names=None, measurement_points=None, overwrite=False)

Method to get energy numbers from Bazefield and insert them into the database.

This will save the results in the table "energy_values" of performance_db.

Parameters:

  • period

    (DateTimeRange) –

    Period to get energy numbers from Bazefield. Values will be rounded to the nearest day. It's recommended that the start is at 00:00:00 and the end is at 23:59:59.

  • object_names

    (dict | None, default: None ) –

    Name of the objects to get the energy from. It needs to be a dictionary indicating the object type as key and a list of the desired objects like {object_type: [object_name1, object_name2, ...]}. The only object types allowed are those in ALLOWED_ENERGY_OBJECT_TYPES (spe, solar_inverter and wind_turbine). Example: {"spe": ["SS3-PEL", "SS3-SS3"], "solar_inverter": ["RBG-RBG1-TS1-INV01", "RBG-RBG1-TS1-INV02"], "wind_turbine": ["SS3-SS3-01", "SS3-SS3-02"]}. If set to None will get all that match the object types allowed in ALLOWED_ENERGY_OBJECT_TYPES. By default None

  • measurement_points

    (list[ALLOWED_MEASUREMENT_POINTS], default: None ) –

    List of measurement points to get the data for, like Connection Point, Gravity Center, Asset, etc. By default None

  • overwrite

    (bool, default: False ) –

    If set to True, will overwrite the existing values in the database, by default False

Returns:

  • DataFrame

    DataFrame with energy values inserted in the database

Source code in echo_postgres/kpi_energy_values.py
@validate_call
def sync_bazefield(
    self,
    period: DateTimeRange,
    object_names: dict | None = None,
    measurement_points: list[ALLOWED_MEASUREMENT_POINTS] | None = None,
    overwrite: bool = False,
) -> DataFrame:
    """Method to get energy numbers from Bazefield and insert them into the database.

    This will save the results in the table "energy_values" of performance_db.

    Parameters
    ----------
    period : DateTimeRange
        Period to get energy numbers from Bazefield. Values will be rounded to the nearest day.
        It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
    object_names : dict | None, optional
        Name of the objects to get the energy from.
        It needs to be a dictionary indicating the object type as key and a list of the desired objects like {object_type: [object_name1, object_name2, ...]}.
        The only object types allowed are those in ALLOWED_ENERGY_OBJECT_TYPES (spe, solar_inverter and wind_turbine).
        Example: {"spe": ["SS3-PEL", "SS3-SS3"], "solar_inverter": ["RBG-RBG1-TS1-INV01", "RBG-RBG1-TS1-INV02"], "wind_turbine": ["SS3-SS3-01", "SS3-SS3-02"]}.
        If set to None will get all that match the object types allowed in ALLOWED_ENERGY_OBJECT_TYPES.
        By default None
    measurement_points : list[ALLOWED_MEASUREMENT_POINTS], optional
        List of measurement points to get the data for, like Connection Point, Gravity Center, Asset, etc. By default None
    overwrite : bool, optional
        If set to True, will overwrite the existing values in the database, by default False

    Returns
    -------
    DataFrame
        DataFrame with energy values inserted in the database

    Raises
    ------
    ValueError
        If object_names contains unknown objects, or a requested measurement
        point has no compatible object type in object_names.
    """

    def transform_df(df: DataFrame, interval: int, measurement_point: str, features: list[str]) -> DataFrame:
        """Helper function to transform the DataFrame to the desired format.

        Assumes df has MultiIndex columns (object_name, feature) as returned by
        the Baze series getter -- TODO confirm against the Baze API.
        """
        if measurement_point in {"Asset Operational", "Asset Theoretical Operational"}:
            names = df.columns.get_level_values("object_name").unique()
            for obj_name in names:
                # Filtering condition where IEC-OperationState_5min.REP is not equal to 4
                condition_op_state = df[obj_name, f"IEC-OperationState_{interval}min.REP"] != 4
                # Filtering condition where CurtailmentState_5min.REP is equal to 1 (park curtailment)
                condition_curtailment = df[obj_name, f"CurtailmentState_{interval}min.REP"] == 1
                # Final condition where either of the above conditions is True
                final_condition = condition_op_state | condition_curtailment
                # Setting values to NaN when either the unit is not producing (state != 4)
                # or the park is curtailed, so those intervals don't count as "operational"
                df.loc[final_condition, (obj_name, features)] = np.nan
                # drop IEC-OperationState_5min.REP column
                # NOTE(review): only the IEC-OperationState column is dropped here;
                # the CurtailmentState_{interval}min.REP column appears to survive
                # the droplevel below, creating a duplicate column per object that
                # would then be scaled, summed and melted as if it were power.
                # Verify upstream whether Baze omits it or whether it should also
                # be dropped here.
                df = df.drop(columns=(obj_name, f"IEC-OperationState_{interval}min.REP"))

        # dropping second level
        df = df.droplevel(1, axis=1)
        # converting to energy
        df = df * (interval / 60)
        # resampling to day
        # closed="right" assumed to match interval-end timestamps (a reading at
        # 00:00 belongs to the previous day) -- TODO confirm convention
        daily_values = df.resample("D", closed="right").sum()

        if measurement_point == "Connection Point":
            # removing "-SMF1" from column names
            daily_values.columns = daily_values.columns.str.replace("-SMF1", "")

        # adjusting values to upload to the database
        # melting the DataFrame into long format: one row per (date, object_name)
        values = daily_values.reset_index().melt(id_vars="index", var_name="object_name", value_name="energy")
        values = values.rename(columns={"index": "date"})
        values["measurement_point"] = measurement_point
        return values

    t0 = perf_counter()

    # adjusting period to cover the whole day (copy first so the caller's
    # DateTimeRange is not mutated)
    period = period.copy()
    period = period.round(timedelta(days=1), start="floor", end="ceil")

    # getting all objects that are allowed to have energy values
    allowed_objects = self._perfdb.objects.instances.get_ids(object_types=ALLOWED_ENERGY_OBJECT_TYPES)

    #  checking if provided object names are valid
    logger.info("Checking object names and measurement points")
    if object_names is None:
        # no explicit selection: take every allowed object, grouped by type
        objects = self._perfdb.objects.instances.get(object_types=ALLOWED_ENERGY_OBJECT_TYPES)
        # removing all objects with TEST in their name
        objects = {k: v for k, v in objects.items() if "TEST" not in k}
        object_names = defaultdict(list)
        for key, value in objects.items():
            object_names[value.get("object_type_name")].append(key)
        object_names = dict(object_names)
    else:
        # flatten the per-type lists and verify every name exists in the database
        object_names_as_list = set(functools.reduce(operator.iadd, object_names.values(), []))
        if wrong_names := list(object_names_as_list - set(allowed_objects.keys())):
            raise ValueError(f"Invalid object names: {wrong_names}")
    logger.info("Finished checking object names and measurement points")

    # checking if object names are valid for the given measurement points
    if measurement_points is None:
        measurement_points = ["Connection Point", "Asset Theoretical", "Asset", "Asset Theoretical Operational", "Asset Operational"]
    if "Connection Point" in measurement_points and "spe" not in object_names:
        raise ValueError("Connection Point measurement point requires 'spe' objects to be provided in object_names.")
    if ("Asset Theoretical" in measurement_points or "Asset Theoretical Operational" in measurement_points) and (
        "wind_turbine" not in object_names and "solar_inverter" not in object_names
    ):
        raise ValueError(
            "Asset Theoretical measurement point requires 'wind_turbine' or 'solar_inverter' objects to be provided in object_names.",
        )
    if ("Asset" in measurement_points or "Asset Operational" in measurement_points) and (
        "wind_turbine" not in object_names and "solar_inverter" not in object_names
    ):
        raise ValueError(
            "Asset measurement point requires 'wind_turbine' or 'solar_inverter' objects to be provided in object_names.",
        )

    baze = Baze()
    # widen the fetch window slightly so boundary samples survive the
    # timestamp rounding applied below
    features_period = period.copy()
    features_period.start = features_period.start - timedelta(minutes=10)
    features_period.end = features_period.end + timedelta(minutes=10)
    values = DataFrame()
    # Bazefield feature tags to fetch, per measurement point and object type;
    # solar/spe data comes at 5-minute resolution, wind at 10-minute
    features_dict = {
        "Connection Point": {"spe": ["ActivePowerCP_5min.AVG"]},
        "Asset": {
            "solar_inverter": ["ActivePower_5min.AVG"],
            "wind_turbine": ["ActivePower_10min.AVG"],
        },
        "Asset Operational": {
            "solar_inverter": ["ActivePower_5min.AVG", "IEC-OperationState_5min.REP", "CurtailmentState_5min.REP"],
            "wind_turbine": ["ActivePower_10min.AVG", "IEC-OperationState_10min.REP", "CurtailmentState_10min.REP"],
        },
        "Asset Theoretical": {
            "solar_inverter": ["ActivePowerTheoretical_5min.AVG"],
            "wind_turbine": ["ActivePowerTheoretical_10min.AVG"],
        },
        "Asset Theoretical Operational": {
            "solar_inverter": ["ActivePowerTheoretical_5min.AVG", "IEC-OperationState_5min.REP", "CurtailmentState_5min.REP"],
            "wind_turbine": ["ActivePowerTheoretical_10min.AVG", "IEC-OperationState_10min.REP", "CurtailmentState_10min.REP"],
        },
    }
    for point in measurement_points:
        if point not in features_dict:
            raise ValueError(f"Invalid measurement point: {point}. Allowed values are {list(features_dict.keys())}")
        for obj_type in object_names:
            if obj_type not in features_dict[point]:
                continue
            # SPE points are measured at the "-SMF1" metering node; the suffix
            # is stripped again in transform_df for Connection Point columns
            wanted_features = (
                {f"{obj}": features_dict[point][obj_type] for obj in object_names[obj_type]}
                if obj_type != "spe"
                else {f"{obj}-SMF1": features_dict[point][obj_type] for obj in object_names[obj_type]}
            )
            frequency = 5 if obj_type in ["solar_inverter", "spe"] else 10
            feature_values = baze.points.values.series.get(
                points=wanted_features,
                reindex=f"{frequency}min",
                period=features_period,
                round_timestamps={"freq": timedelta(minutes=frequency), "tolerance": timedelta(minutes=frequency / 2)},
            )
            values_aux = transform_df(feature_values, frequency, point, features_dict[point][obj_type])
            values = concat([values, values_aux], axis=0)

    # removing outside period
    values = values[
        (values["date"] >= period.start) & (values["date"] < period.end)
    ]  # < used at end to avoid including the next day at 00:00:00

    # inserting energy data into the database
    logger.info("Inserting energy values data into the database")

    self.insert(df=values, on_conflict="update" if overwrite else "ignore")

    logger.info(
        f"Energy values inserted into the database in {perf_counter() - t0:.2f} seconds. Period {period} and objects {object_names}",
    )

    del baze

    return values