Skip to content

KPI Availability Forecasts

KpiAvailabilityForecasts(perfdb)

Class used for handling availability forecasts. Can be accessed via perfdb.kpis.availability.forecasts.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, object_names=None, availability_categories=None)

Deletes availability forecast values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data.

  • object_names

    (list[str], default: None ) –

    List of object names to delete data. If None will delete for all objects. By default None

  • availability_categories

    (list[str], default: None ) –

    List of availability categories to delete data. If None will delete for all categories. By default None

Source code in echo_postgres/kpi_availability_forecasts.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
) -> None:
    """Deletes availability forecast values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data.
    object_names : list[str], optional
        List of object names to delete data. If None will delete for all objects. By default None
    availability_categories : list[str], optional
        List of availability categories to delete data. If None will delete for all categories. By default None
    """
    # build the query
    query = [
        sql.SQL("DELETE FROM performance.availability_forecasts WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if availability_categories:
        # getting availability category id
        cat_ids = self._perfdb.kpis.availability.categories.get_ids()
        cat_ids = {k: v for k, v in cat_ids.items() if k in availability_categories}
        if len(cat_ids) != len(availability_categories):
            missing_cats = set(availability_categories) - set(cat_ids)
            raise ValueError(f"Could not find the following availability categories: {missing_cats}")
        query.append(
            sql.SQL(" AND availability_category_id IN ({ids})").format(
                ids=sql.SQL(", ").join(map(sql.Literal, cat_ids.values())),
            ),
        )

    query = sql.Composed(query)

    # deleting data
    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from availability_forecasts table")

get(period, object_names=None, availability_categories=None, filter_type='and', output_type='DataFrame')

Gets all availability forecasts with detailed information.

The most useful keys/columns returned are:

  • availability
  • modified_date

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to get the data for. By default None

  • availability_categories

    (list[str], default: None ) –

    List of availability categories to get the data for. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_category_name], columns = [availability, modified_date, ...]

  • dict[str, dict[Timestamp, dict[str, float]]]

    In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_category_name: value}, ...}, ...}, ...}

Source code in echo_postgres/kpi_availability_forecasts.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[Timestamp, dict[str, float]]]:
    """Gets all availability forecasts with detailed information.

    The most useful keys/columns returned are:

    - availability
    - modified_date

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    object_names : list[str], optional
        List of object names to get the data for. By default None
    availability_categories : list[str], optional
        List of availability categories to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_category_name], columns = [availability, modified_date, ...]
    dict[str, dict[Timestamp, dict[str, float]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_category_name: value}, ...}, ...}, ...}
    """
    # build the query
    query = [
        sql.SQL(
            "SELECT * FROM performance.v_availability_forecasts WHERE (date >= {start} AND date <= {end})",
        ).format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_names:
        where.append(
            sql.SQL("object_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_names)),
            ),
        )
    if availability_categories:
        where.append(
            sql.SQL("availability_category_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, availability_categories)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_name, date, availability_category_name"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing object_name and availability_category_name to be a string
    df = df.astype(
        {"object_name": "string[pyarrow]", "availability_category_name": "string[pyarrow]"},
    )
    df = df.astype(
        {"object_id": "int64[pyarrow]", "availability_category_id": "int64[pyarrow]"},
    )
    df = df.set_index(["object_name", "date", "availability_category_name"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")] + ["modified_date"])
    # converting to Dict
    result = df["availability"].to_dict()
    final_result = {}
    for (object_name, date, availability_category_name), value in result.items():
        if object_name not in final_result:
            final_result[object_name] = {}
        if date not in final_result[object_name]:
            final_result[object_name][date] = {}
        final_result[object_name][date][availability_category_name] = value

    return final_result

insert(df, on_conflict='ignore')

Inserts availability forecast values into table availability_forecasts

Parameters:

  • df

    (DataFrame) –

    DataFrame with the availability amounts. Must have the following columns:

    • object_name
    • availability_category_name
    • date
    • availability: Value ranging from 0 to 1 representing the availability for that day considering only this category. You can interpret 1 minus value as the unavailability impact for that category in that day.
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/kpi_availability_forecasts.py
@validate_call
def insert(self, df: DataFrame, on_conflict: Literal["ignore", "update"] = "ignore") -> None:
    """Inserts availability forecast values into table availability_forecasts

    Parameters
    ----------
    df : DataFrame
        DataFrame with the availability amounts. Must have the following columns:

        - object_name
        - availability_category_name
        - date
        - availability: Value ranging from 0 to 1 representing the availability for that day considering only this category. You can interpret 1 minus value as the unavailability impact for that category in that day.

    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checking inputs
    if df.isna().any().any():
        raise ValueError("df cannot have any NaN values")
    if set(df.columns) != {
        "object_name",
        "availability_category_name",
        "date",
        "availability",
    }:
        additional_cols = set(df.columns) - {"object_name", "availability_category_name", "date", "availability"}
        missing_cols = {"object_name", "availability_category_name", "date", "availability"} - set(df.columns)
        raise ValueError(
            f"df must have the following columns: object_name, availability_category_name, date, availability. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # making a copy of the DataFrame
    df = df.copy()
    # dropping NaN
    df = df.dropna(how="any")

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting availability category id
    wanted_cats = df["availability_category_name"].unique().tolist()
    cat_ids = self._perfdb.kpis.availability.categories.get_ids()
    cat_ids = {k: v for k, v in cat_ids.items() if k in wanted_cats}
    if len(cat_ids) != len(wanted_cats):
        missing_cats = set(wanted_cats) - set(cat_ids)
        raise ValueError(f"Could not find the following availability categories: {missing_cats}")
    df["availability_category_id"] = df["availability_category_name"].map(cat_ids)

    # removing unwanted columns
    df = df.drop(columns=["object_name", "availability_category_name"])

    # inserting data
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="availability_forecasts",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("Availability forecasts inserted into the database")