Skip to content

KPI Availability Targets

KpiAvailabilityTargets(perfdb)

Class used for handling availability targets. Can be accessed via perfdb.kpis.availability.targets.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initializes the base class that all subclasses should inherit from.

    The received PerfDB object is only stored on the instance (as ``self._perfdb``).

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, object_names=None, availability_types=None)

Deletes availability target values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data.

  • object_names

    (list[str], default: None ) –

    List of object names to delete data. If None will delete for all objects. By default None

  • availability_types

    (list[str], default: None ) –

    List of availability types to delete data. If None will delete for all types. By default None

Source code in echo_postgres/kpi_availability_targets.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_types: list[str] | None = None,
) -> None:
    """Deletes availability target values from the database.

    Rows of performance.availability_targets whose date lies within the period (both ends inclusive) are deleted, optionally restricted to the given objects and/or availability types.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data.
    object_names : list[str], optional
        List of object names to delete data. If None (or empty) will delete for all objects. By default None
    availability_types : list[str], optional
        List of availability types to delete data. If None (or empty) will delete for all types. By default None

    Raises
    ------
    ValueError
        If any of the requested object names or availability types could not be found.
    """
    # build the query (the date filter is always applied)
    query = [
        sql.SQL("DELETE FROM performance.availability_targets WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # comparing names (not lengths) so that repeated names in the input do not raise a spurious error
        missing_objs = set(object_names) - set(obj_ids)
        if missing_objs:
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if availability_types:
        # getting availability type id (all types are fetched, then only the wanted ones are kept)
        wanted_cats = set(availability_types)
        cat_ids = {k: v for k, v in self._perfdb.kpis.availability.types.get_ids().items() if k in wanted_cats}
        # same name based check as above, tolerant to repeated names
        missing_cats = wanted_cats - set(cat_ids)
        if missing_cats:
            raise ValueError(f"Could not find the following availability types: {missing_cats}")
        query.append(
            sql.SQL(" AND availability_type_id IN ({ids})").format(
                ids=sql.SQL(", ").join(map(sql.Literal, cat_ids.values())),
            ),
        )

    query = sql.Composed(query)

    # deleting data
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from availability_targets table")

get(period, object_names=None, availability_types=None, filter_type='and', output_type='DataFrame')

Gets all availability targets for objects.

Please keep in mind that this should only be used to get targets for objects (not groups) and for specific days (not time aggregates). If you want targets for groups and/or time aggregates use baze.kpis.availability.values which will also return the targets with the actual values.

The most useful keys/columns returned are:

  • availability
  • modified_date

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to get the data for. By default None

  • availability_types

    (list[str], default: None ) –

    List of availability types to get the data for. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_type_name], columns = [availability, modified_date, ...]

  • dict[str, dict[Timestamp, dict[str, float]]]

    In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_type_name: value, ...}, ...}, ...}

Source code in echo_postgres/kpi_availability_targets.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[Timestamp, dict[str, float]]]:
    """Gets all availability targets for objects.

    Please keep in mind that this should only be used to get targets for objects (not groups) and for specific days (not time aggregates). If you want targets for groups and/or time aggregates use baze.kpis.availability.values which will also return the targets with the actual values.

    The most useful keys/columns returned are:

    - availability
    - modified_date

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for. Both ends are inclusive.
    object_names : list[str], optional
        List of object names to get the data for. By default None
    availability_types : list[str], optional
        List of availability types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        It only combines the object_names and availability_types filters; the period filter is always applied.
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_type_name], columns = [availability, modified_date, ...]
    dict[str, dict[Timestamp, dict[str, float]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_type_name: value, ...}, ...}, ...}
    """
    # build the query (the date filter is always applied)
    query = [
        sql.SQL(
            "SELECT * FROM performance.v_availability_targets WHERE (date >= {start} AND date <= {end})",
        ).format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    # optional filters, combined among themselves using filter_type
    where = []
    if object_names:
        where.append(
            sql.SQL("object_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_names)),
            ),
        )
    if availability_types:
        where.append(
            sql.SQL("availability_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, availability_types)),
            ),
        )
    if where:
        # NOTE: filter_type is written directly into the SQL text, which relies on it being limited
        # to "and"/"or" by its Literal annotation (assumes validate_call enforces it - TODO confirm)
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_name, date, availability_type_name"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing names to be strings and ids to be integers
    df = df.astype(
        {
            "object_name": "string[pyarrow]",
            "availability_type_name": "string[pyarrow]",
            "object_id": "int64[pyarrow]",
            "availability_type_id": "int64[pyarrow]",
        },
    )
    df = df.set_index(["object_name", "date", "availability_type_name"])

    if output_type == "DataFrame":
        return df

    # converting to Dict: only the availability column is used, so there is no need to drop the other columns first
    final_result = {}
    for (object_name, date, availability_type_name), value in df["availability"].to_dict().items():
        final_result.setdefault(object_name, {}).setdefault(date, {})[availability_type_name] = value

    return final_result

insert(df, on_conflict='ignore')

Inserts availability target values into table availability_targets

Parameters:

  • df

    (DataFrame) –

    DataFrame with the availability targets. Must have the following columns:

    • object_name
    • availability_type_name
    • date
    • availability: Value ranging from 0 to 1 representing the availability target for that availability_type.
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/kpi_availability_targets.py
@validate_call
def insert(self, df: DataFrame, on_conflict: Literal["ignore", "update"] = "ignore") -> None:
    """Inserts availability target values into table availability_targets

    Object and availability type names are converted to their ids before the rows are written to performance.availability_targets. The DataFrame passed by the caller is not modified.

    Parameters
    ----------
    df : DataFrame
        DataFrame with the availability targets. Must have exactly the following columns and no NaN values:

        - object_name
        - availability_type_name
        - date
        - availability: Value ranging from 0 to 1 representing the availability target for that availability_type.

    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df has NaN values, if its columns are not exactly the expected ones or if any object name or availability type could not be found.
    """
    # the exact set of columns df must have
    required_cols = {"object_name", "availability_type_name", "date", "availability"}

    # checking inputs
    if df.isna().any().any():
        raise ValueError("df cannot have any NaN values")
    given_cols = set(df.columns)
    if given_cols != required_cols:
        additional_cols = given_cols - required_cols
        missing_cols = required_cols - given_cols
        raise ValueError(
            f"df must have the following columns: object_name, availability_type_name, date, availability. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # making a copy of the DataFrame so the id columns are not added to the caller's object
    # (no dropna is needed here as NaN values were already rejected above)
    df = df.copy()

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    missing_objs = set(wanted_objs) - set(obj_ids)
    if missing_objs:
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting availability type id (all types are fetched, then only the wanted ones are kept)
    wanted_cats = set(df["availability_type_name"].unique().tolist())
    cat_ids = {k: v for k, v in self._perfdb.kpis.availability.types.get_ids().items() if k in wanted_cats}
    missing_cats = wanted_cats - set(cat_ids)
    if missing_cats:
        raise ValueError(f"Could not find the following availability types: {missing_cats}")
    df["availability_type_id"] = df["availability_type_name"].map(cat_ids)

    # removing unwanted columns (only the id columns, date and availability are written)
    df = df.drop(columns=["object_name", "availability_type_name"])

    # inserting data
    # translating on_conflict to the if_exists value passed to pandas_to_sql
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="availability_targets",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("Availability targets inserted into the database")