Skip to content

KPI Availability Forecast Assumptions

KpiAvailabilityForecastAssumptions(perfdb)

Class used for handling availability forecast assumptions. Can be accessed via perfdb.kpis.availability.forecasts.assumptions.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/kpi_availability_forecast_assumptions.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Accessor for availability forecast assumptions.

    Reached via ``perfdb.kpis.availability.forecasts.assumptions``.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # imported locally rather than at module top (presumably to avoid a
    # circular import between the forecast modules — TODO confirm)
    from .kpi_availability_forecast_assumption_assets import KpiAvailabilityForecastAssumptionAssets

    # sub-accessor handling the assets linked to each assumption
    self.assets = KpiAvailabilityForecastAssumptionAssets(perfdb)

delete(ids)

Deletes availability forecast assumptions by ID.

Note: linked assets (in availability_forecast_assumption_assets) are cascade-deleted.

Parameters:

  • ids

    (list[int]) –

    List of assumption IDs to delete.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/kpi_availability_forecast_assumptions.py
Python
@validate_call
def delete(self, ids: list[int]) -> int:
    """Deletes availability forecast assumptions by ID.

    Note: linked assets (in availability_forecast_assumption_assets) are cascade-deleted.

    Parameters
    ----------
    ids : list[int]
        List of assumption IDs to delete.

    Returns
    -------
    int
        Number of rows deleted.
    """
    # id = ANY(...) deletes an arbitrary-length list of IDs in one statement
    stmt = sql.SQL("DELETE FROM performance.availability_forecast_assumptions WHERE id = ANY({ids})").format(
        ids=sql.Literal(ids),
    )
    conn = self._perfdb.conn
    conn.execute(stmt)
    n_deleted = conn.rowcount
    logger.debug(f"Deleted {n_deleted} availability forecast assumption(s).")
    return n_deleted

get(ids=None, revision_names=None, availability_categories=None, period=None, filter_type='and', output_type='pl.DataFrame')

Gets availability forecast assumptions with full details.

The most useful keys/columns returned are:

  • id
  • revision_name
  • availability_category_name
  • description
  • applicable_from
  • applicable_to
  • period_days
  • expected_downtime_hours
  • availability_impact
  • number_of_assets
  • assets_involved

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of assumption IDs to filter. By default None.

  • revision_names

    (list[str] | None, default: None ) –

    List of revision names to filter. By default None.

  • availability_categories

    (list[str] | None, default: None ) –

    List of availability category names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range; returns assumptions whose applicable period overlaps with this range. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns {id: {col: val, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame indexed by id.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/kpi_availability_forecast_assumptions.py
Python
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    revision_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets availability forecast assumptions with full details.

    The most useful keys/columns returned are:

    - id
    - revision_name
    - availability_category_name
    - description
    - applicable_from
    - applicable_to
    - period_days
    - expected_downtime_hours
    - availability_impact
    - number_of_assets
    - assets_involved

    Parameters
    ----------
    ids : list[int] | None, optional
        List of assumption IDs to filter. By default None.
    revision_names : list[str] | None, optional
        List of revision names to filter. By default None.
    availability_categories : list[str] | None, optional
        List of availability category names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range; returns assumptions whose applicable period overlaps with this range. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns {id: {col: val, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame indexed by id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # argument validation + WHERE construction is shared with get_ids
    where_clause = self._check_get_args(ids, revision_names, availability_categories, period, filter_type)
    query = sql.SQL(
        "SELECT * FROM performance.v_availability_forecast_assumptions {where} ORDER BY revision_name, applicable_from",
    ).format(where=where_clause)
    result_df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)
    return convert_output(result_df, output_type, index_col="id")

get_defaults(years=None, months=None, object_names=None, revision_names=None, filter_type='and', output_type='pl.DataFrame')

Gets the per-asset / per-month resolution of the assumptions that are the default for each (asset, year, month) according to availability_forecast_revision_assignments.

Backed by performance.v_availability_forecast_defaults_assumptions. Emits one row per (asset, year, month, assumption) where the asset's revision assignment for that month points to the assumption's revision and the assumption's applicable period overlaps the month.

asset_hours is capped at days_in_slice * 24 and availability_impact is the slice-relative fraction (0..1); month_availability_impact is the calendar-month-relative fraction (0..1).

Parameters:

  • years

    (list[int] | None, default: None ) –

    List of years to filter. By default None.

  • months

    (list[int] | None, default: None ) –

    List of months (1-12) to filter. By default None.

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter. By default None.

  • revision_names

    (list[str] | None, default: None ) –

    List of revision names to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type. By default "pl.DataFrame".

Returns:

  • dict[Any, dict[str, Any]]

    In case output_type is "dict", returns {(object_name, year, month, record_id): {col: val, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex (object_name, year, month, record_id).

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/kpi_availability_forecast_assumptions.py
Python
@validate_call
def get_defaults(
    self,
    years: list[int] | None = None,
    months: list[int] | None = None,
    object_names: list[str] | None = None,
    revision_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[Any, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets the per-asset / per-month resolution of the assumptions that are the default for each
    (asset, year, month) according to availability_forecast_revision_assignments.

    Backed by ``performance.v_availability_forecast_defaults_assumptions``. Emits one row per
    (asset, year, month, assumption) where the asset's revision assignment for that month points
    to the assumption's revision and the assumption's applicable period overlaps the month.

    ``asset_hours`` is capped at ``days_in_slice * 24`` and ``availability_impact`` is the slice-relative
    fraction (0..1); ``month_availability_impact`` is the calendar-month-relative fraction (0..1).

    Parameters
    ----------
    years : list[int] | None, optional
        List of years to filter. By default None.
    months : list[int] | None, optional
        List of months (1-12) to filter. By default None.
    object_names : list[str] | None, optional
        List of object names to filter. By default None.
    revision_names : list[str] | None, optional
        List of revision names to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type. By default "pl.DataFrame".

    Returns
    -------
    dict[Any, dict[str, Any]]
        In case output_type is "dict", returns {(object_name, year, month, record_id): {col: val, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex
        (object_name, year, month, record_id).
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # all four filters go through the same builder; filter_type decides AND vs OR
    where_clause = (
        WhereClauseBuilder(filter_type=filter_type)
        .add_any("year", years)
        .add_any("month", months)
        .add_any("object_name", object_names)
        .add_any("revision_name", revision_names)
        .build()
    )
    query = sql.SQL(
        "SELECT * FROM performance.v_availability_forecast_defaults_assumptions {where} "
        "ORDER BY object_name, year, month, record_id",
    ).format(where=where_clause)
    defaults_df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._defaults_cols_schema)
    return convert_output(defaults_df, output_type, index_col=["object_name", "year", "month", "record_id"])

get_ids(revision_names=None, availability_categories=None, period=None, filter_type='and')

Gets IDs of availability forecast assumptions.

Parameters:

  • revision_names

    (list[str] | None, default: None ) –

    List of revision names to filter. By default None.

  • availability_categories

    (list[str] | None, default: None ) –

    List of availability category names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range; returns assumptions whose applicable period overlaps with this range. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • list[int]

    List of assumption IDs.

Source code in echo_postgres/kpi_availability_forecast_assumptions.py
Python
@validate_call
def get_ids(
    self,
    revision_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets IDs of availability forecast assumptions.

    Parameters
    ----------
    revision_names : list[str] | None, optional
        List of revision names to filter. By default None.
    availability_categories : list[str] | None, optional
        List of availability category names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range; returns assumptions whose applicable period overlaps with this range. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    list[int]
        List of assumption IDs.
    """
    # no ID filter on this path, hence the leading None
    where_clause = self._check_get_args(None, revision_names, availability_categories, period, filter_type)
    query = sql.SQL("SELECT id FROM performance.v_availability_forecast_assumptions {where} ORDER BY id").format(
        where=where_clause,
    )
    id_df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)
    return id_df["id"].to_list()

insert(revision_name=None, availability_category_name=None, description=None, applicable_from=None, applicable_to=None, expected_downtime_hours=None, availability_impact=None, user_name=None, data_df=None, on_conflict='raise')

Inserts one or more availability forecast assumptions.

You can pass individual values to insert a single assumption, or a DataFrame for batch insert.

Exactly one of expected_downtime_hours or availability_impact must be provided per row.

Parameters:

  • revision_name

    (str | None, default: None ) –

    Name of the revision this assumption belongs to. Required for single insert. By default None.

  • availability_category_name

    (str | None, default: None ) –

    Availability category name. Required for single insert. By default None.

  • description

    (str | None, default: None ) –

    Human-readable description of the assumption. Required for single insert. By default None.

  • applicable_from

    (date | None, default: None ) –

    Start date of the applicable period (inclusive). Required for single insert. By default None.

  • applicable_to

    (date | None, default: None ) –

    End date of the applicable period (inclusive, must be >= applicable_from). Required for single insert. By default None.

  • expected_downtime_hours

    (float | None, default: None ) –

    Total expected lost hours. Mutually exclusive with availability_impact. By default None.

  • availability_impact

    (float | None, default: None ) –

    Fractional availability loss (0.0-1.0). Mutually exclusive with expected_downtime_hours. By default None.

  • user_name

    (str | None, default: None ) –

    Name of the user creating the assumption. Required for single insert. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    DataFrame for batch insert. Required columns: revision_name, availability_category_name, description, applicable_from, applicable_to, user_name. Exactly one of expected_downtime_hours or availability_impact per row. When provided, individual parameters are ignored. By default None.

  • on_conflict

    (Literal['raise', 'ignore'], default: 'raise' ) –

    Behavior on unexpected conflict. By default "raise".

Returns:

  • int | list[int] | None

    ID(s) of the inserted assumption(s).

Source code in echo_postgres/kpi_availability_forecast_assumptions.py
Python
@validate_call
def insert(
    self,
    revision_name: str | None = None,
    availability_category_name: str | None = None,
    description: str | None = None,
    applicable_from: date | None = None,
    applicable_to: date | None = None,
    expected_downtime_hours: float | None = None,
    availability_impact: float | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["raise", "ignore"] = "raise",
) -> int | list[int] | None:
    """Inserts one or more availability forecast assumptions.

    You can pass individual values to insert a single assumption, or a DataFrame for batch insert.

    Exactly one of ``expected_downtime_hours`` or ``availability_impact`` must be provided per row.

    Parameters
    ----------
    revision_name : str | None, optional
        Name of the revision this assumption belongs to. Required for single insert. By default None.
    availability_category_name : str | None, optional
        Availability category name. Required for single insert. By default None.
    description : str | None, optional
        Human-readable description of the assumption. Required for single insert. By default None.
    applicable_from : date | None, optional
        Start date of the applicable period (inclusive). Required for single insert. By default None.
    applicable_to : date | None, optional
        End date of the applicable period (inclusive, must be >= applicable_from). Required for single insert. By default None.
    expected_downtime_hours : float | None, optional
        Total expected lost hours. Mutually exclusive with availability_impact. By default None.
    availability_impact : float | None, optional
        Fractional availability loss (0.0-1.0). Mutually exclusive with expected_downtime_hours. By default None.
    user_name : str | None, optional
        Name of the user creating the assumption. Required for single insert. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch insert. Required columns: revision_name, availability_category_name,
        description, applicable_from, applicable_to, user_name. Exactly one of
        expected_downtime_hours or availability_impact per row. When provided, individual
        parameters are ignored. By default None.
    on_conflict : Literal["raise", "ignore"], optional
        Behavior on unexpected conflict. By default "raise".

    Returns
    -------
    int | list[int] | None
        ID(s) of the inserted assumption(s).
    """
    # Target dtypes for the insert frame; also used to normalize a caller-provided data_df.
    df_schema = {
        "revision_name": pl.Utf8,
        "availability_category_name": pl.Utf8,
        "description": pl.Utf8,
        "applicable_from": pl.Date,
        "applicable_to": pl.Date,
        "expected_downtime_hours": pl.Float64,
        "availability_impact": pl.Float64,
        "user_name": pl.Utf8,
    }

    if data_df is None:
        # Single-row path: build a one-row frame from the scalar arguments so both
        # paths share the same validation and insert logic below.
        single_insert = True
        data_df = pl.DataFrame(
            {
                "revision_name": [revision_name],
                "availability_category_name": [availability_category_name],
                "description": [description],
                "applicable_from": [applicable_from],
                "applicable_to": [applicable_to],
                "expected_downtime_hours": [expected_downtime_hours],
                "availability_impact": [availability_impact],
                "user_name": [user_name],
            },
            schema=df_schema,
        )
    else:
        # Batch path: cast only the columns actually present to the expected dtypes.
        single_insert = False
        data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})

    required_cols = ["revision_name", "availability_category_name", "description", "applicable_from", "applicable_to", "user_name"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"Column '{col}' contains null values but is required.")

    # validate mutually exclusive columns
    # XOR: exactly one of the two must be non-null per row. A column that is absent
    # entirely counts as "not provided" for every row (pl.lit(False)).
    has_hours = pl.col("expected_downtime_hours").is_not_null() if "expected_downtime_hours" in data_df.columns else pl.lit(value=False)
    has_impact = pl.col("availability_impact").is_not_null() if "availability_impact" in data_df.columns else pl.lit(value=False)
    invalid = data_df.filter(~(has_hours ^ has_impact))
    if len(invalid) > 0:
        raise ValueError(
            "Exactly one of 'expected_downtime_hours' or 'availability_impact' must be provided per row (they are mutually exclusive).",
        )

    # resolve revision_name → revision_id
    rev_names = data_df["revision_name"].unique().to_list()
    rev_ids = self._perfdb.kpis.availability.forecasts.revisions.get_ids(names=rev_names)
    if missing := set(rev_names) - set(rev_ids.keys()):
        raise ValueError(f"Revisions not found: {missing}")
    data_df = data_df.with_columns(pl.col("revision_name").replace_strict(rev_ids, return_dtype=pl.Int64).alias("revision_id"))
    data_df = data_df.drop("revision_name")

    # resolve availability_category_name → availability_category_id
    # (get_ids here returns all categories; filtered down to the requested names)
    cat_names = data_df["availability_category_name"].unique().to_list()
    cat_ids = self._perfdb.kpis.availability.categories.get_ids()
    cat_ids = {k: v for k, v in cat_ids.items() if k in cat_names}
    if missing := set(cat_names) - set(cat_ids.keys()):
        raise ValueError(f"Availability categories not found: {missing}")
    data_df = data_df.with_columns(
        pl.col("availability_category_name").replace_strict(cat_ids, return_dtype=pl.Int64).alias("availability_category_id"),
    )
    data_df = data_df.drop("availability_category_name")

    # resolve user_name → user_id
    user_names = data_df["user_name"].unique().to_list()
    user_ids = self._perfdb.users.instances.get_ids(names=user_names)
    if missing := set(user_names) - set(user_ids.keys()):
        raise ValueError(f"Users not found: {missing}")
    data_df = data_df.with_columns(pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("user_id"))
    data_df = data_df.drop("user_name")

    # NOTE(review): "ignore" maps to "append" and "raise" to "skip_row_check" —
    # presumably polars_to_sql's own row check is skipped so the DB raises on
    # conflict itself; confirm against polars_to_sql's if_exists semantics.
    if_exists = "append" if on_conflict == "ignore" else "skip_row_check"

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_assumptions",
        schema="performance",
        return_cols=["id"],
        if_exists=if_exists,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted {len(ids)} availability forecast assumption(s): {ids}")
    # Single insert returns a scalar ID for convenience; batch returns the full list.
    return ids[0] if single_insert else ids

update(assumption_id=None, description=None, applicable_from=None, applicable_to=None, expected_downtime_hours=None, availability_impact=None, user_name=None, data_df=None)

Updates one or more availability forecast assumptions.

You can pass individual values to update a single assumption, or a DataFrame for batch update.

Parameters:

  • assumption_id

    (int | None, default: None ) –

    ID of the assumption to update. Required for single update. By default None.

  • description

    (str | None, default: None ) –

    New description. By default None.

  • applicable_from

    (date | None, default: None ) –

    New applicable_from date. By default None.

  • applicable_to

    (date | None, default: None ) –

    New applicable_to date. By default None.

  • expected_downtime_hours

    (float | None, default: None ) –

    New expected downtime hours. By default None.

  • availability_impact

    (float | None, default: None ) –

    New availability impact (0.0-1.0). By default None.

  • user_name

    (str | None, default: None ) –

    Name of the user performing the update. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    DataFrame for batch update. Required column: id. Optional: description, applicable_from, applicable_to, expected_downtime_hours, availability_impact, user_name. When provided, individual parameters are ignored. By default None.

Source code in echo_postgres/kpi_availability_forecast_assumptions.py
Python
@validate_call
def update(
    self,
    assumption_id: int | None = None,
    description: str | None = None,
    applicable_from: date | None = None,
    applicable_to: date | None = None,
    expected_downtime_hours: float | None = None,
    availability_impact: float | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates one or more availability forecast assumptions.

    You can pass individual values to update a single assumption, or a DataFrame for batch update.

    Parameters
    ----------
    assumption_id : int | None, optional
        ID of the assumption to update. Required for single update. By default None.
    description : str | None, optional
        New description. By default None.
    applicable_from : date | None, optional
        New applicable_from date. By default None.
    applicable_to : date | None, optional
        New applicable_to date. By default None.
    expected_downtime_hours : float | None, optional
        New expected downtime hours. By default None.
    availability_impact : float | None, optional
        New availability impact (0.0-1.0). By default None.
    user_name : str | None, optional
        Name of the user performing the update. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch update. Required column: id. Optional: description, applicable_from,
        applicable_to, expected_downtime_hours, availability_impact, user_name. When provided,
        individual parameters are ignored. By default None.
    """
    # Target dtypes for the update frame; also used to normalize a caller-provided data_df.
    df_schema = {
        "id": pl.Int64,
        "description": pl.Utf8,
        "applicable_from": pl.Date,
        "applicable_to": pl.Date,
        "expected_downtime_hours": pl.Float64,
        "availability_impact": pl.Float64,
        "user_name": pl.Utf8,
    }

    if data_df is None:
        # Single-row path: fold the scalar arguments into a one-row frame.
        single_update = True
        data_df = pl.DataFrame(
            {
                "id": [assumption_id],
                "description": [description],
                "applicable_from": [applicable_from],
                "applicable_to": [applicable_to],
                "expected_downtime_hours": [expected_downtime_hours],
                "availability_impact": [availability_impact],
                "user_name": [user_name],
            },
            schema=df_schema,
        )
    else:
        # Batch path: cast only the columns actually present to the expected dtypes.
        single_update = False
        data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})

    if "id" not in data_df.columns or data_df["id"].is_null().any():
        raise ValueError("'id' column is required and cannot contain nulls.")

    # verify IDs exist
    # (fail fast with a clear error instead of silently updating zero rows)
    existing = self._perfdb.conn.read_to_polars(
        sql.SQL("SELECT id FROM performance.availability_forecast_assumptions WHERE id = ANY({ids})").format(
            ids=sql.Literal(data_df["id"].to_list()),
        ),
        schema_overrides={"id": pl.Int64},
    )
    if missing := set(data_df["id"].to_list()) - set(existing["id"].to_list()):
        raise ValueError(f"Assumption IDs not found: {missing}")

    # resolve user_name → user_id if provided
    if "user_name" in data_df.columns:
        user_names = data_df["user_name"].drop_nulls().unique().to_list()
        if user_names:
            user_ids = self._perfdb.users.instances.get_ids(names=user_names)
            if missing := set(user_names) - set(user_ids.keys()):
                raise ValueError(f"Users not found: {missing}")
            # default=None keeps rows without a user_name at a null user_id
            data_df = data_df.with_columns(
                pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("user_id"),
            )
        data_df = data_df.drop("user_name")

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_assumptions",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        # Single update: None arguments mean "leave the column unchanged".
        # NOTE(review): in batch mode nulls are passed through — presumably they
        # overwrite the stored values; confirm against polars_to_sql semantics.
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} availability forecast assumption(s).")