KPI Availability Forecast Revision Assignments¶
KpiAvailabilityForecastRevisionAssignments(perfdb)
¶
Class used for handling availability forecast revision assignments. Can be accessed via perfdb.kpis.availability.forecasts.revisions.assignments.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Stores the top-level PerfDB handle on the instance.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Kept as a private attribute; the methods of this handler reach the
    # connection and the other handlers through it.
    self._perfdb: e_pg.PerfDB = perfdb
delete(object_names=None, years=None, months=None)
¶
Deletes revision assignments matching the provided filters.
At least one filter must be provided.
Parameters:
- object_names (list[str] | None, default: None) – List of object names to filter. By default None.
- years (list[int] | None, default: None) – List of years to filter. By default None.
- months (list[int] | None, default: None) – List of months to filter. By default None.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
@validate_call
def delete(
    self,
    object_names: list[str] | None = None,
    years: list[int] | None = None,
    months: list[int] | None = None,
) -> int:
    """Deletes revision assignments matching the provided filters.

    At least one non-empty filter must be provided. Filters are combined with AND.
    An empty list is treated the same as None (the filter is not applied).

    Parameters
    ----------
    object_names : list[str] | None, optional
        List of object names to filter. By default None.
    years : list[int] | None, optional
        List of years to filter. By default None.
    months : list[int] | None, optional
        List of months to filter. By default None.

    Returns
    -------
    int
        Number of rows deleted.

    Raises
    ------
    ValueError
        If no filter is provided (all filters are None or empty), or if any of the
        requested object names cannot be resolved to an id.
    """
    # Guard on truthiness rather than `is None`: the filters below are applied only
    # when non-empty, so a call with only empty lists would otherwise produce a
    # DELETE statement with a dangling, empty WHERE clause.
    if not (object_names or years or months):
        raise ValueError("At least one of 'object_names', 'years', or 'months' must be provided.")

    conditions = []

    if object_names:
        # The table stores object ids, so names are resolved first.
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if missing := set(object_names) - set(obj_ids.keys()):
            raise ValueError(f"Objects not found: {missing}")
        conditions.append(sql.SQL("object_id = ANY({ids})").format(ids=sql.Literal(list(obj_ids.values()))))

    if years:
        conditions.append(sql.SQL("year = ANY({years})").format(years=sql.Literal(years)))

    if months:
        conditions.append(sql.SQL("month = ANY({months})").format(months=sql.Literal(months)))

    query = sql.SQL("DELETE FROM performance.availability_forecast_revision_assignments WHERE ") + sql.SQL(" AND ").join(conditions)

    self._perfdb.conn.execute(query)
    deleted = self._perfdb.conn.rowcount

    logger.debug(f"Deleted {deleted} revision assignment(s).")
    return deleted
get(object_names=None, years=None, months=None, revision_names=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets availability forecast revision assignments.
The most useful keys/columns returned are:
- object_name
- year
- month
- revision_name
- user_name
- modified_date
Parameters:
- object_names (list[str] | None, default: None) – List of object names to filter. By default None.
- years (list[int] | None, default: None) – List of years to filter. By default None.
- months (list[int] | None, default: None) – List of months (1-12) to filter. By default None.
- revision_names (list[str] | None, default: None) – List of revision names to filter. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type. By default "pl.DataFrame".
Returns:
- dict[Any, dict[str, Any]] – In case output_type is "dict", returns {(object_name, year, month): {col: val, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex (object_name, year, month).
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
@validate_call
def get(
    self,
    object_names: list[str] | None = None,
    years: list[int] | None = None,
    months: list[int] | None = None,
    revision_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[Any, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Reads availability forecast revision assignments from the database view.

    The most useful keys/columns returned are:

    - object_name
    - year
    - month
    - revision_name
    - user_name
    - modified_date

    Parameters
    ----------
    object_names : list[str] | None, optional
        Object names to keep. By default None.
    years : list[int] | None, optional
        Years to keep. By default None.
    months : list[int] | None, optional
        Months (1-12) to keep. By default None.
    revision_names : list[str] | None, optional
        Revision names to keep. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Desired output container. By default "pl.DataFrame".

    Returns
    -------
    dict[Any, dict[str, Any]]
        In case output_type is "dict", returns {(object_name, year, month): {col: val, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex (object_name, year, month).
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # Each entry pairs a view column with the list of values requested for it.
    column_filters = (
        ("object_name", object_names),
        ("year", years),
        ("month", months),
        ("revision_name", revision_names),
    )

    builder = WhereClauseBuilder(filter_type=filter_type)
    for column, values in column_filters:
        builder = builder.add_any(column, values)
    where_clause = builder.build()

    statement = sql.SQL(
        "SELECT * FROM performance.v_availability_forecast_revision_assignments {where} ORDER BY object_name, year, month",
    )
    query = statement.format(where=where_clause)

    result = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)

    # The (object_name, year, month) triple is the key used for indexed outputs.
    key_cols = ["object_name", "year", "month"]
    return convert_output(result, output_type, index_col=key_cols)
insert(object_name=None, year=None, month=None, revision_name=None, user_name=None, data_df=None, on_conflict='raise')
¶
Inserts one or more revision assignments (asset x year x month -> revision).
You can pass individual values to insert a single assignment, or a DataFrame for batch insert.
Parameters:
- object_name (str | None, default: None) – Name of the asset. Required for single insert. By default None.
- year (int | None, default: None) – Calendar year (> 2000). Required for single insert. By default None.
- month (int | None, default: None) – Calendar month (1-12). Required for single insert. By default None.
- revision_name (str | None, default: None) – Name of the revision to assign. Required for single insert. By default None.
- user_name (str | None, default: None) – Name of the user creating the assignment. Required for single insert. By default None.
- data_df (pl.DataFrame | None, default: None) – DataFrame for batch insert. Required columns: object_name, year, month, revision_name, user_name. When provided, individual parameters are ignored. By default None.
- on_conflict (Literal['raise', 'ignore', 'update'], default: 'raise') – Behavior on (object_id, year, month) conflict. By default "raise".
Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
@validate_call
def insert(
    self,
    object_name: str | None = None,
    year: int | None = None,
    month: int | None = None,
    revision_name: str | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["raise", "ignore", "update"] = "raise",
) -> None:
    """Inserts one or more revision assignments (asset x year x month -> revision).

    Either pass the individual values to insert a single assignment, or pass a
    DataFrame to insert many at once.

    Parameters
    ----------
    object_name : str | None, optional
        Name of the asset. Required for single insert. By default None.
    year : int | None, optional
        Calendar year (> 2000). Required for single insert. By default None.
    month : int | None, optional
        Calendar month (1-12). Required for single insert. By default None.
    revision_name : str | None, optional
        Name of the revision to assign. Required for single insert. By default None.
    user_name : str | None, optional
        Name of the user creating the assignment. Required for single insert. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch insert. Required columns: object_name, year, month, revision_name, user_name.
        When provided, individual parameters are ignored. By default None.
    on_conflict : Literal["raise", "ignore", "update"], optional
        Behavior on (object_id, year, month) conflict. By default "raise".

    Raises
    ------
    ValueError
        If a required column is missing or contains nulls, or if any object,
        revision or user name cannot be resolved to an id.
    """
    df_schema = {
        "object_name": pl.Utf8,
        "year": pl.Int16,
        "month": pl.Int16,
        "revision_name": pl.Utf8,
        "user_name": pl.Utf8,
    }

    single_insert = data_df is None
    if single_insert:
        # Wrap the scalar arguments in a one-row frame so both modes share one path.
        data_df = pl.DataFrame(
            {
                "object_name": [object_name],
                "year": [year],
                "month": [month],
                "revision_name": [revision_name],
                "user_name": [user_name],
            },
            schema=df_schema,
        )
    else:
        # Only cast the columns that are present; missing ones are reported below.
        present_casts = {name: dtype for name, dtype in df_schema.items() if name in data_df.columns}
        data_df = data_df.cast(present_casts)

    # Every schema column is mandatory for an insert.
    for required in df_schema:
        if required not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{required}'.")
        if data_df[required].is_null().any():
            raise ValueError(f"Column '{required}' contains null values but is required.")

    # (name column, id column, label used in the error, lookup).
    # Lookups are wrapped in lambdas so each handler is only touched when its turn
    # comes, in the order object -> revision -> user.
    resolvers = (
        (
            "object_name",
            "object_id",
            "Objects",
            lambda names: self._perfdb.objects.instances.get_ids(object_names=names),
        ),
        (
            "revision_name",
            "revision_id",
            "Revisions",
            lambda names: self._perfdb.kpis.availability.forecasts.revisions.get_ids(names=names),
        ),
        (
            "user_name",
            "user_id",
            "Users",
            lambda names: self._perfdb.users.instances.get_ids(names=names),
        ),
    )
    for name_col, id_col, label, lookup in resolvers:
        wanted = data_df[name_col].unique().to_list()
        name_to_id = lookup(wanted)
        if missing := set(wanted) - set(name_to_id.keys()):
            raise ValueError(f"{label} not found: {missing}")
        id_expr = pl.col(name_col).replace_strict(name_to_id, return_dtype=pl.Int64).alias(id_col)
        data_df = data_df.with_columns(id_expr).drop(name_col)

    if_exists_map = {"raise": "skip_row_check", "ignore": "append", "update": "update"}
    # Conflict columns are only passed when conflicts are handled rather than raised.
    conflict_cols = None if on_conflict == "raise" else ["object_id", "year", "month"]

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_revision_assignments",
        schema="performance",
        if_exists=if_exists_map[on_conflict],
        conflict_cols=conflict_cols,
        ignore_null_cols=single_insert,
    )

    logger.debug(f"Inserted {len(data_df)} revision assignment(s).")
update(object_name=None, year=None, month=None, revision_name=None, user_name=None, data_df=None)
¶
Updates one or more revision assignments.
You can pass individual values to update a single assignment, or a DataFrame for batch update.
Parameters:
- object_name (str | None, default: None) – Object name identifying the assignment to update. Required for single update. By default None.
- year (int | None, default: None) – Year identifying the assignment to update. Required for single update. By default None.
- month (int | None, default: None) – Month identifying the assignment to update. Required for single update. By default None.
- revision_name (str | None, default: None) – New revision name to assign. By default None.
- user_name (str | None, default: None) – New user name. By default None.
- data_df (pl.DataFrame | None, default: None) – DataFrame for batch update. Required columns: object_name, year, month. Optional: revision_name, user_name. When provided, individual parameters are ignored. By default None.
Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
@validate_call
def update(
    self,
    object_name: str | None = None,
    year: int | None = None,
    month: int | None = None,
    revision_name: str | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates one or more revision assignments.

    Either pass the individual values to update a single assignment, or pass a
    DataFrame to update many at once.

    Parameters
    ----------
    object_name : str | None, optional
        Object name identifying the assignment to update. Required for single update. By default None.
    year : int | None, optional
        Year identifying the assignment to update. Required for single update. By default None.
    month : int | None, optional
        Month identifying the assignment to update. Required for single update. By default None.
    revision_name : str | None, optional
        New revision name to assign. By default None.
    user_name : str | None, optional
        New user name. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch update. Required columns: object_name, year, month.
        Optional: revision_name, user_name. When provided, individual parameters are ignored. By default None.

    Raises
    ------
    ValueError
        If a key column (object_name, year, month) is missing or contains nulls,
        or if any object, revision or user name cannot be resolved to an id.
    """
    df_schema = {
        "object_name": pl.Utf8,
        "year": pl.Int16,
        "month": pl.Int16,
        "revision_name": pl.Utf8,
        "user_name": pl.Utf8,
    }

    single_update = data_df is None
    if single_update:
        # Wrap the scalar arguments in a one-row frame so both modes share one path.
        data_df = pl.DataFrame(
            {
                "object_name": [object_name],
                "year": [year],
                "month": [month],
                "revision_name": [revision_name],
                "user_name": [user_name],
            },
            schema=df_schema,
        )
    else:
        present_casts = {name: dtype for name, dtype in df_schema.items() if name in data_df.columns}
        data_df = data_df.cast(present_casts)

    # The key columns identify the row to update and must be fully populated.
    for key_col in ("object_name", "year", "month"):
        if key_col not in data_df.columns or data_df[key_col].is_null().any():
            raise ValueError(f"'{key_col}' is required and cannot contain nulls.")

    # resolve object_name → object_id
    wanted_objects = data_df["object_name"].unique().to_list()
    object_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objects)
    if missing := set(wanted_objects) - set(object_ids.keys()):
        raise ValueError(f"Objects not found: {missing}")
    object_id_expr = pl.col("object_name").replace_strict(object_ids, return_dtype=pl.Int64).alias("object_id")
    data_df = data_df.with_columns(object_id_expr).drop("object_name")

    # Optional columns: (name column, id column, label used in the error, lookup).
    # Lookups are lambdas so a handler is only touched when there is something to resolve.
    optional_resolvers = (
        (
            "revision_name",
            "revision_id",
            "Revisions",
            lambda names: self._perfdb.kpis.availability.forecasts.revisions.get_ids(names=names),
        ),
        (
            "user_name",
            "user_id",
            "Users",
            lambda names: self._perfdb.users.instances.get_ids(names=names),
        ),
    )
    for name_col, id_col, label, lookup in optional_resolvers:
        if name_col not in data_df.columns:
            continue
        wanted = data_df[name_col].drop_nulls().unique().to_list()
        if wanted:
            name_to_id = lookup(wanted)
            if missing := set(wanted) - set(name_to_id.keys()):
                raise ValueError(f"{label} not found: {missing}")
            id_expr = pl.col(name_col).replace_strict(name_to_id, return_dtype=pl.Int64, default=None).alias(id_col)
            data_df = data_df.with_columns(id_expr)
        # The name column is dropped even when it held only nulls; in that case no
        # id column is added at all.
        data_df = data_df.drop(name_col)

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_revision_assignments",
        schema="performance",
        conflict_cols=["object_id", "year", "month"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} revision assignment(s).")