Skip to content

KPI Tracker Availability Amounts

KpiTrackerAvailabilityAmounts(perfdb)

Class used for handling tracker availability amounts. Can be accessed via perfdb.kpis.trackeravailability.amounts.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base initializer that all subclasses should inherit; stores the PerfDB handle on the instance.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # kept as a private attribute so methods can reach the rest of the API and the connection through it
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, tracker_names=None, tracker_availability_types=None)

Deletes tracker availability amounts from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data.

  • tracker_names

    (list[str], default: None ) –

    List of tracker names to delete data. If None will delete for all trackers. By default None

  • tracker_availability_types

    (list[str], default: None ) –

    List of tracker availability types to delete data. If None will delete for all types. By default None

Source code in echo_postgres/kpi_trackeravailability_amounts.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    tracker_names: list[str] | None = None,
    tracker_availability_types: list[Literal["Available", "Misaligned", "NotCommunicating"]] | None = None,
) -> None:
    """Deletes tracker availability amounts from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data. Both ends are inclusive.
    tracker_names : list[str], optional
        List of tracker names to delete data. If None will delete for all trackers. By default None
    tracker_availability_types : list[str], optional
        List of tracker availability types to delete data. If None will delete for all types. By default None

    Raises
    ------
    ValueError
        If any of the requested trackers or tracker availability types could not be found.
    """
    # build the query
    query = [
        sql.SQL("DELETE FROM performance.tracker_availability_amounts WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if tracker_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(
            object_names=tracker_names,
            object_types=ALLOWED_TRACKER_AVAILABILITY_OBJECT_TYPES,
        )
        # comparing the names themselves (not the lengths) so duplicated names in the input
        # do not trigger a false "could not find" error
        if missing_objs := set(tracker_names) - set(obj_ids):
            raise ValueError(f"Could not find the following trackers: {missing_objs}")
        query.append(sql.SQL(" AND tracker_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if tracker_availability_types:
        # getting tracker availability type id
        wanted_types = set(tracker_availability_types)
        type_ids = self._perfdb.kpis.trackeravailability.types.get_ids()
        type_ids = {k: v for k, v in type_ids.items() if k in wanted_types}
        # same reasoning as above: duplicated types in the input must not be reported as missing
        if missing_types := wanted_types - set(type_ids):
            raise ValueError(f"Could not find the following tracker availability types: {missing_types}")
        query.append(
            sql.SQL(" AND tracker_availability_type_id IN ({ids})").format(
                ids=sql.SQL(", ").join(map(sql.Literal, type_ids.values())),
            ),
        )

    query = sql.Composed(query)

    # deleting data
    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from tracker_availability_amounts table")

get(period, tracker_names=None, tracker_availability_types=None, filter_type='and', output_type='DataFrame')

Gets all tracker availability amounts with detailed information.

The most useful keys/columns returned are:

  • value
  • energy_amount

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • tracker_names

    (list[str], default: None ) –

    List of tracker names to get the data for. By default None

  • tracker_availability_types

    (list[str], default: None ) –

    List of tracker availability types to get the data for. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[tracker_name, date, tracker_availability_type_name], columns = [time_amount, energy_amount, ...]

  • dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]

    In case output_type is "dict", returns a dictionary in the format {tracker_name: {date: {tracker_availability_type_name: {attribute: value, ...}, ...}, ...}, ...}

Source code in echo_postgres/kpi_trackeravailability_amounts.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    tracker_names: list[str] | None = None,
    tracker_availability_types: list[Literal["Available", "Misaligned", "NotCommunicating"]] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]:
    """Gets all tracker availability amounts with detailed information.

    The most useful keys/columns returned are:

    - value
    - energy_amount

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for. Both ends are inclusive.
    tracker_names : list[str], optional
        List of tracker names to get the data for. By default None
    tracker_availability_types : list[str], optional
        List of tracker availability types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to combine the tracker name and tracker availability type filters. The period filter is always applied.
        Can be one of ["and", "or"]. By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[tracker_name, date, tracker_availability_type_name], columns = [time_amount, energy_amount, ...]
    dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {tracker_name: {date: {tracker_availability_type_name: {attribute: value, ...}, ...}, ...}, ...}.
        Columns ending in "_id" are not included in this format.
    """
    # build the query
    query = [
        sql.SQL("SELECT * FROM performance.v_tracker_availability_amounts WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if tracker_names:
        where.append(sql.SQL("tracker_name IN ({names})").format(names=sql.SQL(", ").join(map(sql.Literal, tracker_names))))
    if tracker_availability_types:
        where.append(
            sql.SQL("tracker_availability_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, tracker_availability_types)),
            ),
        )
    if where:
        # filter_type is restricted to "and"/"or" by the signature, so it is safe to place in the SQL text
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY tracker_name, date, tracker_availability_type_name"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    df = df.set_index(["tracker_name", "date", "tracker_availability_type_name"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to Dict: keys of the flat dict are the index tuples, which are nested level by level below
    result = df.to_dict(orient="index")
    final_result = {}
    for (tracker_name, date, tracker_availability_type_name), data in result.items():
        final_result.setdefault(tracker_name, {}).setdefault(date, {})[tracker_availability_type_name] = data

    return final_result

insert(df, on_conflict='ignore')

Inserts tracker availability amounts into the database.

Parameters:

  • df

    (DataFrame) –

    DataFrame with the tracker availability amounts. Must have the following columns:

    • tracker_name
    • tracker_availability_type_name
    • date
    • value
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/kpi_trackeravailability_amounts.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts tracker availability amounts into the database.

    Parameters
    ----------
    df : DataFrame
        DataFrame with the tracker availability amounts. Must have exactly the following columns and no NaN values:

        - tracker_name
        - tracker_availability_type_name
        - date
        - value

    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df has NaN values, if its columns are not exactly the required ones, or if any tracker
        or tracker availability type present in df could not be found.
    """
    required_cols = {"tracker_name", "tracker_availability_type_name", "date", "value"}

    # checking inputs
    if df.isna().any().any():
        raise ValueError("df cannot have any NaN values")
    present_cols = set(df.columns)
    if present_cols != required_cols:
        additional_cols = present_cols - required_cols
        missing_cols = required_cols - present_cols
        raise ValueError(
            f"df must have the following columns: tracker_name, tracker_availability_type_name, date, value. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # making a copy of the DataFrame so the caller's object is not changed by the columns added below
    # (no dropna needed: any NaN has already been rejected above)
    df = df.copy()

    # getting object id
    wanted_objs = df["tracker_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs, object_types=ALLOWED_TRACKER_AVAILABILITY_OBJECT_TYPES)
    if set(obj_ids.keys()) != set(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following trackers: {missing_objs}.")

    df["tracker_id"] = df["tracker_name"].map(obj_ids)

    # getting tracker availability type ids
    wanted_types = df["tracker_availability_type_name"].unique().tolist()
    type_ids = self._perfdb.kpis.trackeravailability.types.get_ids()
    if missing_types := set(wanted_types) - set(type_ids):
        raise ValueError(f"Could not find the following tracker availability types: {missing_types}")
    df["tracker_availability_type_id"] = df["tracker_availability_type_name"].map(type_ids)

    # removing unwanted columns: only ids are sent to the table
    df = df.drop(columns=["tracker_name", "tracker_availability_type_name"])

    # inserting data
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="tracker_availability_amounts",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("Tracker availability amounts inserted into the database")

sync_bazefield(period, tracker_names=None, overwrite=False)

Method to get tracker availability numbers from Bazefield and insert them into the database.

This will save the results in the table "tracker_availability_amounts" of performance_db.

Parameters:

  • period

    (DateTimeRange) –

    Period to get tracker availability numbers from Bazefield. The period is expanded to cover whole days (start rounded down, end rounded up). It's recommended that the start is at 00:00:00 and the end is at 23:59:59.

  • tracker_names

    (list[str] | None, default: None ) –

    Name of the trackers to get the tracker availability from. If set to None will get all that match the object types allowed in ALLOWED_TRACKER_AVAILABILITY_OBJECT_TYPES. By default None

  • overwrite

    (bool, default: False ) –

    If set to True, will overwrite the existing values in the database, by default False

Returns:

  • DataFrame

    DataFrame with the time amounts.

Source code in echo_postgres/kpi_trackeravailability_amounts.py
@validate_call
def sync_bazefield(
    self,
    period: DateTimeRange,
    tracker_names: list[str] | None = None,
    overwrite: bool = False,
) -> DataFrame:
    """Fetches tracker availability amounts from Bazefield and stores them in the database.

    Results are saved through `insert`, i.e. in the table "tracker_availability_amounts" of performance_db.

    Parameters
    ----------
    period : DateTimeRange
        Period to get tracker availability numbers from Bazefield. It is expanded to whole days
        (start floored, end ceiled and then reduced by one second).
        It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
    tracker_names : list[str] | None, optional
        Name of the trackers to get the tracker availability from. If set to None will get all that match the object types allowed in ALLOWED_TRACKER_AVAILABILITY_OBJECT_TYPES.
        By default None
    overwrite : bool, optional
        If set to True, will overwrite the existing values in the database, by default False

    Returns
    -------
    DataFrame
        Copy of the amounts as returned by Bazefield, taken before they are reshaped for insertion.

    Raises
    ------
    ValueError
        If any requested tracker name is not an allowed object, or if any value is outside 0 to 86400 seconds.
    """
    start_time = perf_counter()

    # expanding the period so it covers whole days, ending one second before the next midnight
    period = period.copy().round(timedelta(days=1), start="floor", end="ceil")
    period.end -= timedelta(seconds=1)

    # objects that are allowed to have tracker availability values
    valid_trackers = self._perfdb.objects.instances.get_ids(object_types=ALLOWED_TRACKER_AVAILABILITY_OBJECT_TYPES)

    # defaulting to every allowed object, otherwise validating the provided names
    if tracker_names is None:
        tracker_names = list(valid_trackers)
    else:
        wrong_names = list(set(tracker_names) - set(valid_trackers))
        if wrong_names:
            raise ValueError(f"Invalid object names:\n{wrong_names}")

    # fetching from Bazefield, one value per day
    logger.info(f"Getting tracker availability data from Bazefield for period {period} and objects {tracker_names}")
    baze = Baze()
    amounts = baze.kpis.trackeravailability.get(
        object_names=tracker_names,
        period=period,
        subperiod_size=timedelta(days=1),
        return_type="Amounts",
    )
    # kept untouched to be returned to the caller
    original_amounts = amounts.copy()

    # reshaping to the long format expected by insert

    # column levels become rows, index becomes regular columns
    amounts = amounts.melt(ignore_index=False)
    amounts = amounts.reset_index(drop=False)
    # the "Total" pseudo object is not stored
    amounts = amounts[amounts["object_name"] != "Total"].copy()
    column_renames = {"time": "date", "object_name": "tracker_name", "quantity": "tracker_availability_type_name"}
    amounts = amounts.rename(columns=column_renames)

    # values are seconds within one day, so they must lie within 0 and 60 * 60 * 24
    in_range = amounts["value"].between(0, 60 * 60 * 24)
    if not in_range.all():
        wrong_rows = amounts[~in_range]
        raise ValueError(
            f"time values must be within 0 and 60 * 60 * 24 seconds. Wrong rows: {wrong_rows[['date', 'tracker_name', 'value']]}",
        )

    # storing the reshaped amounts
    logger.info("Inserting tracker availabilitydata into the database")

    conflict_mode = "update" if overwrite else "ignore"
    self.insert(df=amounts, on_conflict=conflict_mode)

    logger.info(
        f"Availability amounts inserted into the database in {perf_counter() - start_time:.2f} seconds. Period {period} and objects {tracker_names}",
    )

    del baze

    return original_amounts