Skip to content

KPI Tracker Availability

KpiTrackerAvailabilityValues(perfdb)

Class used for handling tracker availability values. Can be accessed via perfdb.kpis.trackeravailability.values.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Keep a reference to the PerfDB object this handler works through.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # kept as a private attribute so the instance can reach the shared PerfDB object later
    self._perfdb: e_pg.PerfDB = perfdb

get(period, time_res='daily', aggregation_window=None, tracker_or_group_names=None, group_types=None, filter_type='and', output_type='DataFrame')

Gets all tracker availability values with detailed information.

The most useful keys/columns returned are:

  • time_misaligned
  • time_notcommunicating
  • time_available
  • total_time
  • percentage_misaligned
  • percentage_notcommunicating
  • percentage_available

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • time_res

    (Literal['daily', 'weekly', 'monthly', 'quarterly', 'yearly'], default: 'daily' ) –

    Time resolution of the data. Can be one of ["daily", "weekly", "monthly", "quarterly", "yearly"], by default "daily"

  • aggregation_window

    (Literal['mtd', 'ytd', '12m'] | None, default: None ) –

    Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None

  • tracker_or_group_names

    (list[str], default: None ) –

    List of tracker or group names to get the data for. By default None

  • group_types

    (list[str], default: None ) –

    List of group types to get the data for. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"], by default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, tracker_or_group_name, date], columns = [time_misaligned, time_available, ...]

  • dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]

    In case output_type is "dict", returns a dictionary in the format {group_type_name: {tracker_or_group_name: {date: {attribute: value, ...}, ...}, ...}, ...}

Source code in echo_postgres/kpi_trackeravailability_values.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "weekly", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    tracker_or_group_names: list[str] | None = None,
    group_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]:
    """Gets all tracker availability values with detailed information.

    The data is read from the ``performance.mv_tracker_availability_{time_res}``
    table (suffixed with ``_{aggregation_window}`` when an aggregation window is given).

    The most useful keys/columns returned are:

    - time_misaligned
    - time_notcommunicating
    - time_available
    - total_time
    - percentage_misaligned
    - percentage_notcommunicating
    - percentage_available

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for. Both ends are inclusive.
    time_res : Literal["daily", "weekly", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "weekly", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    tracker_or_group_names : list[str], optional
        List of tracker or group names to get the data for. By default None
    group_types : list[str], optional
        List of group types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to combine the name and group type filters with each other. The period filter
        is always applied on top of them. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, tracker_or_group_name, date], columns = [time_misaligned, time_available, ...]
    dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]
        In case output_type is "dict", returns a dictionary in the format {group_type_name: {tracker_or_group_name: {date: {attribute: value, ...}, ...}, ...}, ...}

    Raises
    ------
    ValueError
        If time_res is "weekly" and an aggregation_window is given.
    """
    # aggregation_window is not implemented yet for the weekly time resolution
    if time_res == "weekly" and aggregation_window:
        raise ValueError("aggregation_window is not implemented for weekly time_res")

    # build the query; the table name depends on the resolution and the optional aggregation window
    query = [
        sql.SQL(
            "SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            table=sql.Identifier(
                f"mv_tracker_availability_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}",
            ),
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    # optional filters, combined with each other using filter_type
    where = []
    if tracker_or_group_names:
        where.append(
            sql.SQL("tracker_or_group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, tracker_or_group_names)),
            ),
        )
    if group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_types)),
            ),
        )
    if where:
        # wrapped in parentheses so an "or" between the filters cannot bypass the period filter
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY tracker_or_group_name, group_type_name, date"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing tracker_or_group_name and group_type_name to be strings and the id columns to be integers
    df = df.astype(
        {
            "tracker_or_group_name": "string[pyarrow]",
            "group_type_name": "string[pyarrow]",
            "tracker_or_group_id": "int64[pyarrow]",
            "group_type_id": "int64[pyarrow]",
        },
    )

    df = df.set_index(["group_type_name", "tracker_or_group_name", "date"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to a flat dict keyed by the (group_type_name, tracker_or_group_name, date) index tuples
    result = df.to_dict(orient="index")
    # nesting the flat dict as {group_type_name: {tracker_or_group_name: {date: {attribute: value}}}}
    final_result = {}
    for (group_type_name, tracker_or_group_name, date), data in result.items():
        # setdefault keeps the first value seen for a key, creating intermediate levels on demand
        final_result.setdefault(group_type_name, {}).setdefault(tracker_or_group_name, {}).setdefault(date, data)

    return final_result