KPI Availability Forecast Assumptions¶
KpiAvailabilityForecastAssumptions(perfdb)
¶
Class used for handling availability forecast assumptions. Can be accessed via perfdb.kpis.availability.forecasts.assumptions.
Parameters:
Source code in echo_postgres/kpi_availability_forecast_assumptions.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler for availability forecast assumptions.

    Reachable through `perfdb.kpis.availability.forecasts.assumptions`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # local import — presumably deferred to avoid a circular import at module load; confirm
    from .kpi_availability_forecast_assumption_assets import KpiAvailabilityForecastAssumptionAssets

    # sub-handler for the assets linked to each assumption
    self.assets = KpiAvailabilityForecastAssumptionAssets(perfdb)
delete(ids)
¶
Deletes availability forecast assumptions by ID.
Note: linked assets (in availability_forecast_assumption_assets) are cascade-deleted.
Parameters:
- ids (list[int]) – List of assumption IDs to delete.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/kpi_availability_forecast_assumptions.py
@validate_call
def delete(self, ids: list[int]) -> int:
    """Deletes availability forecast assumptions by ID.

    Note: linked assets (in availability_forecast_assumption_assets) are cascade-deleted.

    Parameters
    ----------
    ids : list[int]
        List of assumption IDs to delete.

    Returns
    -------
    int
        Number of rows deleted.
    """
    # nothing to delete for an empty list — skip the database round trip entirely
    if not ids:
        return 0
    query = sql.SQL("DELETE FROM performance.availability_forecast_assumptions WHERE id = ANY({ids})").format(
        ids=sql.Literal(ids),
    )
    self._perfdb.conn.execute(query)
    deleted = self._perfdb.conn.rowcount
    logger.debug(f"Deleted {deleted} availability forecast assumption(s).")
    return deleted
get(ids=None, revision_names=None, availability_categories=None, period=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets availability forecast assumptions with full details.
The most useful keys/columns returned are:
- id
- revision_name
- availability_category_name
- description
- applicable_from
- applicable_to
- period_days
- expected_downtime_hours
- availability_impact
- number_of_assets
- assets_involved
Parameters:
- ids (list[int] | None, default: None) – List of assumption IDs to filter. By default None.
- revision_names (list[str] | None, default: None) – List of revision names to filter. By default None.
- availability_categories (list[str] | None, default: None) – List of availability category names to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range; returns assumptions whose applicable period overlaps with this range. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns {id: {col: val, ...}, ...}.
- DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame indexed by id.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/kpi_availability_forecast_assumptions.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    revision_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets availability forecast assumptions with full details.

    The most useful keys/columns returned are:

    - id
    - revision_name
    - availability_category_name
    - description
    - applicable_from
    - applicable_to
    - period_days
    - expected_downtime_hours
    - availability_impact
    - number_of_assets
    - assets_involved

    Parameters
    ----------
    ids : list[int] | None, optional
        Restrict the result to these assumption IDs. By default None.
    revision_names : list[str] | None, optional
        Restrict the result to these revision names. By default None.
    availability_categories : list[str] | None, optional
        Restrict the result to these availability category names. By default None.
    period : DateTimeRange | None, optional
        Only return assumptions whose applicable period overlaps this range. By default None.
    filter_type : Literal["and", "or"], optional
        Whether multiple filters are combined with AND or OR. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Desired output container. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        When output_type is "dict": {id: {col: val, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame indexed by id.
    pl.DataFrame
        When output_type is "pl.DataFrame": a Polars DataFrame.
    """
    # shared argument validation / WHERE-clause construction (same helper as get_ids)
    where_clause = self._check_get_args(ids, revision_names, availability_categories, period, filter_type)
    statement = sql.SQL(
        "SELECT * FROM performance.v_availability_forecast_assumptions {where} ORDER BY revision_name, applicable_from",
    ).format(where=where_clause)
    result_df = self._perfdb.conn.read_to_polars(statement, schema_overrides=self._cols_schema)
    return convert_output(result_df, output_type, index_col="id")
get_defaults(years=None, months=None, object_names=None, revision_names=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets the per-asset / per-month resolution of the assumptions that are the default for each (asset, year, month) according to availability_forecast_revision_assignments.
Backed by performance.v_availability_forecast_defaults_assumptions. Emits one row per
(asset, year, month, assumption) where the asset's revision assignment for that month points
to the assumption's revision and the assumption's applicable period overlaps the month.
asset_hours is capped at days_in_slice * 24 and availability_impact is the slice-relative
fraction (0..1); month_availability_impact is the calendar-month-relative fraction (0..1).
Parameters:
- years (list[int] | None, default: None) – List of years to filter. By default None.
- months (list[int] | None, default: None) – List of months (1-12) to filter. By default None.
- object_names (list[str] | None, default: None) – List of object names to filter. By default None.
- revision_names (list[str] | None, default: None) – List of revision names to filter. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type. By default "pl.DataFrame".
Returns:
- dict[Any, dict[str, Any]] – In case output_type is "dict", returns {(object_name, year, month, record_id): {col: val, ...}, ...}.
- DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex (object_name, year, month, record_id).
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/kpi_availability_forecast_assumptions.py
@validate_call
def get_defaults(
    self,
    years: list[int] | None = None,
    months: list[int] | None = None,
    object_names: list[str] | None = None,
    revision_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[Any, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets the per-asset / per-month resolution of the assumptions that are the default for each
    (asset, year, month) according to availability_forecast_revision_assignments.

    Backed by ``performance.v_availability_forecast_defaults_assumptions``. One row is emitted per
    (asset, year, month, assumption) where the asset's revision assignment for that month points
    to the assumption's revision and the assumption's applicable period overlaps the month.
    ``asset_hours`` is capped at ``days_in_slice * 24`` and ``availability_impact`` is the
    slice-relative fraction (0..1); ``month_availability_impact`` is the calendar-month-relative
    fraction (0..1).

    Parameters
    ----------
    years : list[int] | None, optional
        Restrict the result to these years. By default None.
    months : list[int] | None, optional
        Restrict the result to these months (1-12). By default None.
    object_names : list[str] | None, optional
        Restrict the result to these object names. By default None.
    revision_names : list[str] | None, optional
        Restrict the result to these revision names. By default None.
    filter_type : Literal["and", "or"], optional
        Whether multiple filters are combined with AND or OR. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Desired output container. By default "pl.DataFrame".

    Returns
    -------
    dict[Any, dict[str, Any]]
        When output_type is "dict": {(object_name, year, month, record_id): {col: val, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame with MultiIndex
        (object_name, year, month, record_id).
    pl.DataFrame
        When output_type is "pl.DataFrame": a Polars DataFrame.
    """
    # assemble the WHERE clause one filter at a time
    clause_builder = WhereClauseBuilder(filter_type=filter_type)
    clause_builder = clause_builder.add_any("year", years)
    clause_builder = clause_builder.add_any("month", months)
    clause_builder = clause_builder.add_any("object_name", object_names)
    clause_builder = clause_builder.add_any("revision_name", revision_names)
    where_clause = clause_builder.build()
    statement = sql.SQL(
        "SELECT * FROM performance.v_availability_forecast_defaults_assumptions {where} "
        "ORDER BY object_name, year, month, record_id",
    ).format(where=where_clause)
    result_df = self._perfdb.conn.read_to_polars(statement, schema_overrides=self._defaults_cols_schema)
    return convert_output(result_df, output_type, index_col=["object_name", "year", "month", "record_id"])
get_ids(revision_names=None, availability_categories=None, period=None, filter_type='and')
¶
Gets IDs of availability forecast assumptions.
Parameters:
- revision_names (list[str] | None, default: None) – List of revision names to filter. By default None.
- availability_categories (list[str] | None, default: None) – List of availability category names to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range; returns assumptions whose applicable period overlaps with this range. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
Returns:
- list[int] – List of assumption IDs.
Source code in echo_postgres/kpi_availability_forecast_assumptions.py
@validate_call
def get_ids(
    self,
    revision_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets IDs of availability forecast assumptions.

    Parameters
    ----------
    revision_names : list[str] | None, optional
        Restrict the result to these revision names. By default None.
    availability_categories : list[str] | None, optional
        Restrict the result to these availability category names. By default None.
    period : DateTimeRange | None, optional
        Only return assumptions whose applicable period overlaps this range. By default None.
    filter_type : Literal["and", "or"], optional
        Whether multiple filters are combined with AND or OR. By default "and".

    Returns
    -------
    list[int]
        List of assumption IDs.
    """
    # same validation helper as get(); ids filter is intentionally disabled here
    where_clause = self._check_get_args(None, revision_names, availability_categories, period, filter_type)
    statement = sql.SQL("SELECT id FROM performance.v_availability_forecast_assumptions {where} ORDER BY id").format(where=where_clause)
    ids_df = self._perfdb.conn.read_to_polars(statement, schema_overrides=self._cols_schema)
    return ids_df["id"].to_list()
insert(revision_name=None, availability_category_name=None, description=None, applicable_from=None, applicable_to=None, expected_downtime_hours=None, availability_impact=None, user_name=None, data_df=None, on_conflict='raise')
¶
Inserts one or more availability forecast assumptions.
You can pass individual values to insert a single assumption, or a DataFrame for batch insert.
Exactly one of expected_downtime_hours or availability_impact must be provided per row.
Parameters:
- revision_name (str | None, default: None) – Name of the revision this assumption belongs to. Required for single insert. By default None.
- availability_category_name (str | None, default: None) – Availability category name. Required for single insert. By default None.
- description (str | None, default: None) – Human-readable description of the assumption. Required for single insert. By default None.
- applicable_from (date | None, default: None) – Start date of the applicable period (inclusive). Required for single insert. By default None.
- applicable_to (date | None, default: None) – End date of the applicable period (inclusive, must be >= applicable_from). Required for single insert. By default None.
- expected_downtime_hours (float | None, default: None) – Total expected lost hours. Mutually exclusive with availability_impact. By default None.
- availability_impact (float | None, default: None) – Fractional availability loss (0.0-1.0). Mutually exclusive with expected_downtime_hours. By default None.
- user_name (str | None, default: None) – Name of the user creating the assumption. Required for single insert. By default None.
- data_df (DataFrame | None, default: None) – DataFrame for batch insert. Required columns: revision_name, availability_category_name, description, applicable_from, applicable_to, user_name. Exactly one of expected_downtime_hours or availability_impact per row. When provided, individual parameters are ignored. By default None.
- on_conflict (Literal['raise', 'ignore'], default: 'raise') – Behavior on unexpected conflict. By default "raise".
Returns:
- int | list[int] | None – ID(s) of the inserted assumption(s).
Source code in echo_postgres/kpi_availability_forecast_assumptions.py
@validate_call
def insert(
    self,
    revision_name: str | None = None,
    availability_category_name: str | None = None,
    description: str | None = None,
    applicable_from: date | None = None,
    applicable_to: date | None = None,
    expected_downtime_hours: float | None = None,
    availability_impact: float | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["raise", "ignore"] = "raise",
) -> int | list[int] | None:
    """Inserts one or more availability forecast assumptions.

    You can pass individual values to insert a single assumption, or a DataFrame for batch insert.
    Exactly one of ``expected_downtime_hours`` or ``availability_impact`` must be provided per row.

    Parameters
    ----------
    revision_name : str | None, optional
        Name of the revision this assumption belongs to. Required for single insert. By default None.
    availability_category_name : str | None, optional
        Availability category name. Required for single insert. By default None.
    description : str | None, optional
        Human-readable description of the assumption. Required for single insert. By default None.
    applicable_from : date | None, optional
        Start date of the applicable period (inclusive). Required for single insert. By default None.
    applicable_to : date | None, optional
        End date of the applicable period (inclusive, must be >= applicable_from). Required for single insert. By default None.
    expected_downtime_hours : float | None, optional
        Total expected lost hours. Mutually exclusive with availability_impact. By default None.
    availability_impact : float | None, optional
        Fractional availability loss (0.0-1.0). Mutually exclusive with expected_downtime_hours. By default None.
    user_name : str | None, optional
        Name of the user creating the assumption. Required for single insert. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch insert. Required columns: revision_name, availability_category_name,
        description, applicable_from, applicable_to, user_name. Exactly one of
        expected_downtime_hours or availability_impact per row. When provided, individual
        parameters are ignored. By default None.
    on_conflict : Literal["raise", "ignore"], optional
        Behavior on unexpected conflict. By default "raise".

    Returns
    -------
    int | list[int] | None
        ID(s) of the inserted assumption(s): a single int for single insert, a list for batch insert.

    Raises
    ------
    ValueError
        If a required column is missing or contains nulls, if a row does not set exactly one of
        expected_downtime_hours / availability_impact, or if a referenced revision,
        availability category, or user does not exist.
    """
    # Canonical dtypes for the insert payload; also used to coerce user-supplied frames.
    df_schema = {
        "revision_name": pl.Utf8,
        "availability_category_name": pl.Utf8,
        "description": pl.Utf8,
        "applicable_from": pl.Date,
        "applicable_to": pl.Date,
        "expected_downtime_hours": pl.Float64,
        "availability_impact": pl.Float64,
        "user_name": pl.Utf8,
    }
    if data_df is None:
        # Single insert: wrap the scalar arguments into a one-row frame so that both
        # modes share the same validation and insert path below.
        single_insert = True
        data_df = pl.DataFrame(
            {
                "revision_name": [revision_name],
                "availability_category_name": [availability_category_name],
                "description": [description],
                "applicable_from": [applicable_from],
                "applicable_to": [applicable_to],
                "expected_downtime_hours": [expected_downtime_hours],
                "availability_impact": [availability_impact],
                "user_name": [user_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False
        # Cast only the columns actually present to the canonical dtypes.
        data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})
    required_cols = ["revision_name", "availability_category_name", "description", "applicable_from", "applicable_to", "user_name"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"Column '{col}' contains null values but is required.")
    # validate mutually exclusive columns: exactly one of the two value columns must be
    # non-null per row (XOR); a column missing from the frame counts as "not provided"
    has_hours = pl.col("expected_downtime_hours").is_not_null() if "expected_downtime_hours" in data_df.columns else pl.lit(value=False)
    has_impact = pl.col("availability_impact").is_not_null() if "availability_impact" in data_df.columns else pl.lit(value=False)
    invalid = data_df.filter(~(has_hours ^ has_impact))
    if len(invalid) > 0:
        raise ValueError(
            "Exactly one of 'expected_downtime_hours' or 'availability_impact' must be provided per row (they are mutually exclusive).",
        )
    # resolve revision_name → revision_id (replace_strict raises if a name is unmapped,
    # but the explicit check below produces a clearer error first)
    rev_names = data_df["revision_name"].unique().to_list()
    rev_ids = self._perfdb.kpis.availability.forecasts.revisions.get_ids(names=rev_names)
    if missing := set(rev_names) - set(rev_ids.keys()):
        raise ValueError(f"Revisions not found: {missing}")
    data_df = data_df.with_columns(pl.col("revision_name").replace_strict(rev_ids, return_dtype=pl.Int64).alias("revision_id"))
    data_df = data_df.drop("revision_name")
    # resolve availability_category_name → availability_category_id
    cat_names = data_df["availability_category_name"].unique().to_list()
    # categories.get_ids() takes no name filter here, so narrow its result to the names we use
    cat_ids = self._perfdb.kpis.availability.categories.get_ids()
    cat_ids = {k: v for k, v in cat_ids.items() if k in cat_names}
    if missing := set(cat_names) - set(cat_ids.keys()):
        raise ValueError(f"Availability categories not found: {missing}")
    data_df = data_df.with_columns(
        pl.col("availability_category_name").replace_strict(cat_ids, return_dtype=pl.Int64).alias("availability_category_id"),
    )
    data_df = data_df.drop("availability_category_name")
    # resolve user_name → user_id
    user_names = data_df["user_name"].unique().to_list()
    user_ids = self._perfdb.users.instances.get_ids(names=user_names)
    if missing := set(user_names) - set(user_ids.keys()):
        raise ValueError(f"Users not found: {missing}")
    data_df = data_df.with_columns(pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("user_id"))
    data_df = data_df.drop("user_name")
    # NOTE(review): "ignore" maps to "append" and "raise" maps to "skip_row_check" —
    # confirm against polars_to_sql that these modes implement the documented conflict behavior
    if_exists = "append" if on_conflict == "ignore" else "skip_row_check"
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_assumptions",
        schema="performance",
        return_cols=["id"],
        if_exists=if_exists,
    )
    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted {len(ids)} availability forecast assumption(s): {ids}")
    return ids[0] if single_insert else ids
update(assumption_id=None, description=None, applicable_from=None, applicable_to=None, expected_downtime_hours=None, availability_impact=None, user_name=None, data_df=None)
¶
Updates one or more availability forecast assumptions.
You can pass individual values to update a single assumption, or a DataFrame for batch update.
Parameters:
- assumption_id (int | None, default: None) – ID of the assumption to update. Required for single update. By default None.
- description (str | None, default: None) – New description. By default None.
- applicable_from (date | None, default: None) – New applicable_from date. By default None.
- applicable_to (date | None, default: None) – New applicable_to date. By default None.
- expected_downtime_hours (float | None, default: None) – New expected downtime hours. By default None.
- availability_impact (float | None, default: None) – New availability impact (0.0-1.0). By default None.
- user_name (str | None, default: None) – Name of the user performing the update. By default None.
- data_df (DataFrame | None, default: None) – DataFrame for batch update. Required column: id. Optional: description, applicable_from, applicable_to, expected_downtime_hours, availability_impact, user_name. When provided, individual parameters are ignored. By default None.
Source code in echo_postgres/kpi_availability_forecast_assumptions.py
@validate_call
def update(
    self,
    assumption_id: int | None = None,
    description: str | None = None,
    applicable_from: date | None = None,
    applicable_to: date | None = None,
    expected_downtime_hours: float | None = None,
    availability_impact: float | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates one or more availability forecast assumptions.

    You can pass individual values to update a single assumption, or a DataFrame for batch update.

    Parameters
    ----------
    assumption_id : int | None, optional
        ID of the assumption to update. Required for single update. By default None.
    description : str | None, optional
        New description. By default None.
    applicable_from : date | None, optional
        New applicable_from date. By default None.
    applicable_to : date | None, optional
        New applicable_to date. By default None.
    expected_downtime_hours : float | None, optional
        New expected downtime hours. By default None.
    availability_impact : float | None, optional
        New availability impact (0.0-1.0). By default None.
    user_name : str | None, optional
        Name of the user performing the update. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch update. Required column: id. Optional: description, applicable_from,
        applicable_to, expected_downtime_hours, availability_impact, user_name. When provided,
        individual parameters are ignored. By default None.

    Raises
    ------
    ValueError
        If the 'id' column is missing/contains nulls, if any referenced assumption ID
        does not exist, or if a referenced user does not exist.
    """
    # Canonical dtypes for the update payload; also used to coerce user-supplied frames.
    df_schema = {
        "id": pl.Int64,
        "description": pl.Utf8,
        "applicable_from": pl.Date,
        "applicable_to": pl.Date,
        "expected_downtime_hours": pl.Float64,
        "availability_impact": pl.Float64,
        "user_name": pl.Utf8,
    }
    if data_df is None:
        # Single update: wrap the scalar arguments into a one-row frame so that both
        # modes share the same validation and update path below.
        single_update = True
        data_df = pl.DataFrame(
            {
                "id": [assumption_id],
                "description": [description],
                "applicable_from": [applicable_from],
                "applicable_to": [applicable_to],
                "expected_downtime_hours": [expected_downtime_hours],
                "availability_impact": [availability_impact],
                "user_name": [user_name],
            },
            schema=df_schema,
        )
    else:
        single_update = False
        # Cast only the columns actually present to the canonical dtypes.
        data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})
    if "id" not in data_df.columns or data_df["id"].is_null().any():
        raise ValueError("'id' column is required and cannot contain nulls.")
    # verify IDs exist before attempting the update, so a bad ID fails loudly
    existing = self._perfdb.conn.read_to_polars(
        sql.SQL("SELECT id FROM performance.availability_forecast_assumptions WHERE id = ANY({ids})").format(
            ids=sql.Literal(data_df["id"].to_list()),
        ),
        schema_overrides={"id": pl.Int64},
    )
    if missing := set(data_df["id"].to_list()) - set(existing["id"].to_list()):
        raise ValueError(f"Assumption IDs not found: {missing}")
    # resolve user_name → user_id if provided (null user_name rows map to null user_id)
    if "user_name" in data_df.columns:
        user_names = data_df["user_name"].drop_nulls().unique().to_list()
        if user_names:
            user_ids = self._perfdb.users.instances.get_ids(names=user_names)
            if missing := set(user_names) - set(user_ids.keys()):
                raise ValueError(f"Users not found: {missing}")
            data_df = data_df.with_columns(
                pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("user_id"),
            )
        data_df = data_df.drop("user_name")
    # ignore_null_cols is enabled only for single update — presumably so that parameters
    # left at None mean "leave unchanged" rather than "set to NULL"; TODO confirm
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_assumptions",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} availability forecast assumption(s).")