KPI Resource Values

KpiResourceValues(perfdb)

Class used for handling resource KPI values. Can be accessed via perfdb.kpis.resource.values.
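
For example, assuming an already constructed PerfDB instance (a minimal sketch; the constructor arguments are omitted and hypothetical):

import echo_postgres as e_pg

perfdb = e_pg.PerfDB()  # hypothetical: real connection arguments omitted
resource_values = perfdb.kpis.resource.values  # KpiResourceValues instance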

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, object_names=None, resource_types=None)

Deletes resource values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to delete the data for. By default None

  • resource_types

    (list[str], default: None ) –

    List of resource types to delete the data for. By default None
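
A minimal usage sketch of deleting one month of wind speed values (the object name and the DateTimeRange import are assumptions, not part of this documented API surface):

from datetime import datetime

from echo_datetime import DateTimeRange  # assumed import path and constructor

period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 31, 23, 59, 59))
perfdb.kpis.resource.values.delete(
    period=period,
    object_names=["Park A"],  # hypothetical object name
    resource_types=["wind_speed"],
)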

Source code in echo_postgres/kpi_resource_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    resource_types: list[str] | None = None,
) -> None:
    """Deletes resource values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. By default None
    resource_types : list[str], optional
        List of resource types to delete the data for. By default None
    """
    # validate the input
    if resource_types:
        rs_ids = self._perfdb.kpis.resource.types.get_ids()
        if wrong_rst := set(resource_types) - set(rs_ids):
            raise ValueError(f"Could not find the following resource types: {wrong_rst}")

    # build the query
    query = [
        sql.SQL("DELETE FROM performance.resource_values WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if resource_types:
        rs_ids = {rt: rs_ids[rt] for rt in resource_types}
        query.append(sql.SQL(" AND resource_type_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, rs_ids.values()))))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from resource_values table")

get(period, time_res='daily', aggregation_window=None, object_or_group_names=None, object_group_types=None, resource_types=None, filter_type='and', output_type='DataFrame', values_only=False)

Gets resource values for the desired period and objects.

The most useful keys/columns returned are:

  • value

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • time_res

    (Literal['daily', 'monthly', 'quarterly', 'yearly'], default: 'daily' ) –

    Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"

  • aggregation_window

    (Literal['mtd', 'ytd', '12m'] | None, default: None ) –

    Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None

  • object_or_group_names

    (list[str], default: None ) –

    List of object or group names to get the data for. By default None

  • object_group_types

    (list[str], default: None ) –

    List of object group types to get the data for. By default None

  • resource_types

    (list[str], default: None ) –

    List of resource types to get the data for. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

  • values_only

    (bool, default: False ) –

    If set to True and output_type is "dict", only the values are returned, ignoring other attributes like modified_date. Ignored when output_type is "DataFrame". By default False

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "resource_type_name", "date"], columns = [resource, modified_date]

  • dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]

    In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {resource_type_name: {date: {attribute: value, ...}, ...}, ...}, ...}

Source code in echo_postgres/kpi_resource_values.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    object_or_group_names: list[str] | None = None,
    object_group_types: list[str] | None = None,
    resource_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> DataFrame | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]:
    """Gets resource values for the desired period and objects.

    The most useful keys/columns returned are:

    - value

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    object_or_group_names : list[str], optional
        List of object or group names to get the data for. By default None
    object_group_types : list[str], optional
        List of object group types to get the data for. By default None
    resource_types : list[str], optional
        List of resource types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"
    values_only : bool, optional
        If set to True and output_type is "dict", only the values are returned, ignoring other attributes like modified_date. Ignored when output_type is "DataFrame". By default False

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "resource_type_name", "date"], columns = [value, modified_date]
    dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {resource_type_name: {date: {attribute: value, ...}, ...}, ...}, ...}
    """
    # build the query
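    # the materialized view name encodes the resolution and the optional
    # aggregation window, e.g. mv_resource_values_daily or
    # mv_resource_values_monthly_ytd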
    query = [
        sql.SQL(
            "SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            table=sql.Identifier(
                f"mv_resource_values_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}",
            ),
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_or_group_names:
        where.append(
            sql.SQL("object_or_group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_or_group_names)),
            ),
        )
    if object_group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_group_types)),
            ),
        )
    if resource_types:
        where.append(
            sql.SQL("resource_type_name IN ({points})").format(
                points=sql.SQL(", ").join(map(sql.Literal, resource_types)),
            ),
        )

    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_or_group_name, group_type_name, resource_type_name, date"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing the name columns (object_or_group_name, group_type_name, resource_type_name) to be strings
    df = df.astype(
        {"object_or_group_name": "string[pyarrow]", "group_type_name": "string[pyarrow]", "resource_type_name": "string[pyarrow]"},
    )
    df = df.astype(
        {"object_or_group_id": "int64[pyarrow]", "group_type_id": "int64[pyarrow]", "resource_type_id": "int16[pyarrow]"},
    )

    df = df.set_index(["group_type_name", "object_or_group_name", "resource_type_name", "date"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to Dict
    result = df.to_dict(orient="index")
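    # reshape the flat {(group_type, object_or_group, resource_type, date): row}
    # mapping into the nested dict structure described in the docstring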
    final_result = {}
    for (object_group_type_name, object_or_group_name, resource_type_name, date), data in result.items():
        if object_group_type_name not in final_result:
            final_result[object_group_type_name] = {}
        if object_or_group_name not in final_result[object_group_type_name]:
            final_result[object_group_type_name][object_or_group_name] = {}
        if resource_type_name not in final_result[object_group_type_name][object_or_group_name]:
            final_result[object_group_type_name][object_or_group_name][resource_type_name] = {}
        if date not in final_result[object_group_type_name][object_or_group_name][resource_type_name]:
            final_result[object_group_type_name][object_or_group_name][resource_type_name][date] = (
                data["value"] if values_only else data
            )

    return final_result
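
When output_type is "dict", the nested result can be walked level by level (a sketch assuming values_only=True, so the innermost values are scalars):

data = perfdb.kpis.resource.values.get(period=period, output_type="dict", values_only=True)
for group_type, objects in data.items():
    for obj_name, resources in objects.items():
        for resource_type, dates in resources.items():
            for date, value in dates.items():
                print(group_type, obj_name, resource_type, date, value)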

insert(df, on_conflict='ignore')

Inserts resource values into the database (table resource_values)

Parameters:

  • df

    (DataFrame) –

    DataFrame with the following columns:

    • object_name
    • date
    • resource_type ('wind_speed', 'solar_irradiance_poa', ...)
    • value
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
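
A minimal sketch of the expected input (object name and values are hypothetical):

from pandas import DataFrame, Timestamp

df = DataFrame(
    {
        "object_name": ["Park A", "Park A"],  # hypothetical object
        "date": [Timestamp("2024-01-01"), Timestamp("2024-01-02")],
        "resource_type": ["wind_speed", "wind_speed"],
        "value": [7.3, 6.8],
    }
)
perfdb.kpis.resource.values.insert(df, on_conflict="update")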

Source code in echo_postgres/kpi_resource_values.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts resource values into the database (table resource_values)

    Parameters
    ----------
    df : DataFrame
        DataFrame with the following columns:

        - object_name
        - date
        - resource_type ('wind_speed', 'solar_irradiance_poa', ...)
        - value
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checking inputs
    required_columns = {"object_name", "date", "resource_type", "value"}
    if df.isna().any().any():
        raise ValueError("df cannot have NaN values")
    if set(df.columns) != required_columns:
        additional_cols = set(df.columns) - required_columns
        missing_cols = required_columns - set(df.columns)
        raise ValueError(
            f"df must have the following columns: object_name, date, resource_type, value. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # making a copy of df
    df = df.copy()

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting resource type id
    wanted_resource_types = df["resource_type"].unique().tolist()
    rt_ids = self._perfdb.kpis.resource.types.get_ids()
    if wrong_rt := set(wanted_resource_types) - set(rt_ids.keys()):
        raise ValueError(f"Could not find the following measurement points: {wrong_rt}")
    df["resource_type_id"] = df["resource_type"].map(rt_ids)

    # removing unwanted columns
    df = df.drop(columns=["object_name", "resource_type"])

    # converting the value column to float
    df["value"] = df["value"].astype("float32")

    # checking if there are NaNs in the value column
    nan_rows = df[df["value"].isna()].index
    if len(nan_rows) > 0:
        logger.warning(
            f"Found NaN values in value column. Dropping {len(nan_rows)} rows (indexes: {df['date'].loc[nan_rows].tolist()})",
        )
        df = df[~df.index.isin(nan_rows)].copy()

    # inserting data
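    # map the public on_conflict option onto the if_exists modes understood by
    # pandas_to_sql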
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="resource_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("Resource values inserted into the database")

sync_bazefield(period, object_names=None, resource_types=None, overwrite=False)

Gets resource KPI numbers from Bazefield and inserts them into the database.

This will save the results in the table "resource_values" of performance_db.

Parameters:

  • period

    (DateTimeRange) –

    Period to get resource KPI numbers from Bazefield. Values will be rounded to the nearest day. It's recommended that the start is at 00:00:00 and the end is at 23:59:59.

  • object_names

    (list[str] | None, default: None ) –

    Name of the objects to get the resource values from. If set to None will get all that match the object types allowed in ALLOWED_RESOURCE_OBJECT_MODELS. By default None

  • resource_types

    (list[str] | None, default: None ) –

    List of resource types to get the values for. Usually 'wind_speed' or 'solar_irradiance_poa' should be used. By default None

  • overwrite

    (bool, default: False ) –

    If set to True, will overwrite the existing values in the database, by default False

Returns:

  • DataFrame

    DataFrame with the resource values that were inserted into the database (all resource types combined)
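
A minimal usage sketch (object name hypothetical; DateTimeRange constructed as in the examples above):

period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 31, 23, 59, 59))
inserted = perfdb.kpis.resource.values.sync_bazefield(
    period=period,
    object_names=["Park A"],
    resource_types=["wind_speed"],
    overwrite=True,  # update values already stored for the period
)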

Source code in echo_postgres/kpi_resource_values.py
@validate_call
def sync_bazefield(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    resource_types: list[str] | None = None,
    overwrite: bool = False,
) -> DataFrame:
    """Method to get resource KPIs numbers from Bazefield and insert them into the database.

    This will save the results in the table "resource_values" of performance_db.

    Parameters
    ----------
    period : DateTimeRange
        Period to get resource KPI numbers from Bazefield. Values will be rounded to the nearest day.
        It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
    object_names : list[str] | None, optional
        Name of the objects to get the resource values from. If set to None will get all that match the object types allowed in ALLOWED_RESOURCE_OBJECT_MODELS.
        By default None
    resource_types : list[str] | None, optional
        List of resource types to get the values for. Usually 'wind_speed' or 'solar_irradiance_poa' should be used. By default None
    overwrite : bool, optional
        If set to True, will overwrite the existing values in the database, by default False

    Returns
    -------
    DataFrame
        DataFrame with the resource values that were inserted into the database (all resource types combined)
    """
    # imported here to avoid circular imports
    from echo_meteo.utils import resample_mean

    t0 = perf_counter()

    # adjusting period to cover the whole day
    period = period.copy()
    period = period.round(timedelta(days=1), start="floor", end="ceil")

    # getting all objects that are allowed to have resource values
    allowed_objects = {}
    for resource_type, allowed_object_models in ALLOWED_RESOURCE_OBJECT_MODELS.items():
        if resource_types and resource_type not in resource_types:
            continue
        objs = self._perfdb.objects.instances.get_ids(object_models=allowed_object_models)
        allowed_objects[resource_type] = list(objs.keys())

    # checking if provided object names are valid
    if object_names is None:
        object_names = allowed_objects
    else:
        not_found_objs = []
        found_objs = {}
        for obj in object_names:
            found_obj = False
            for resource_type, objs in allowed_objects.items():
                if obj in objs:
                    found_obj = True
                    if resource_type not in found_objs:
                        found_objs[resource_type] = []
                    found_objs[resource_type].append(obj)
                    break
            if not found_obj:
                not_found_objs.append(obj)
        if not_found_objs:
            raise ValueError(
                f"Could not find the following objects {not_found_objs} considering resource types {list(allowed_objects.keys())}",
            )
        object_names = found_objs

    # getting resource type definitions to get bazefield point
    resource_types_def = self._perfdb.kpis.resource.types.get(output_type="dict")

    # creating connection to Bazefield
    baze = Baze()

    # collecting every inserted DataFrame so all resource types are returned,
    # not just the last one processed in the loop below
    inserted_values: list[DataFrame] = []

    # iterating each resource type
    for resource_type, objects in object_names.items():
        # getting the bazefield point for the resource type
        bazefield_point = resource_types_def[resource_type]["bazefield_point"]
        # getting values from tag for all objects
        wanted_points = {obj: [bazefield_point] for obj in objects}
        point_period = period.copy()
        point_period.start = point_period.start - timedelta(minutes=10)
        point_period.end = point_period.end + timedelta(minutes=10)

        # regex to get 5min or 10min from bazefield point
        feature_freq_match = re.search(r"(\d{1,2})min", bazefield_point)
        if not feature_freq_match:
            raise ValueError(f"Could not find frequency in {bazefield_point}")
        feature_freq = feature_freq_match.group(0)
        feature_freq_int = int(feature_freq_match.group(1))

        # getting values
        values = baze.points.values.series.get(
            points=wanted_points,
            reindex=feature_freq,
            period=point_period,
            round_timestamps={"freq": timedelta(minutes=feature_freq_int), "tolerance": timedelta(minutes=2)},
        )

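        # fill short gaps in both directions; the 10 min padding added to
        # point_period gives the edge timestamps neighbors to fill from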
        values = values.ffill().bfill()

        # dropping second level
        values = values.droplevel(1, axis=1)

        # resampling to day
        daily_values = resample_mean(values, "D", min_rr=0.3)

        # adjusting values to upload to the database

        # melting the DataFrame
        values = daily_values.reset_index().melt(id_vars="index", var_name="object_name", value_name="value")
        values = values.rename(columns={"index": "date"})
        values["resource_type"] = resource_type

        # removing outside period
        values = values[
            (values["date"] >= period.start) & (values["date"] < period.end)
        ]  # < used at end to avoid including the next day at 00:00:00

        # checking if any rows have values lower than or equal to 0 (invalid)
        wrong_idx = values[values["value"] <= 0].index
        if len(wrong_idx) > 0:
            logger.warning(
                f"Found {len(wrong_idx)} rows with values lower or equal to 0. Dropping these rows \n{values.loc[wrong_idx]}",
            )
            values = values[~values.index.isin(wrong_idx)].copy()

        # inserting resource data into the database
        logger.info("Inserting resource values data into the database")

        self.insert(df=values, on_conflict="update" if overwrite else "ignore")
        inserted_values.append(values)

        logger.info(
            f"Resource values for {resource_type} inserted into the database in {perf_counter() - t0:.2f} seconds. Period {period} and objects {objects}",
        )

    del baze

    # concat is pandas.concat, assumed to be imported at module level like
    # DataFrame and Timestamp
    return concat(inserted_values, ignore_index=True)