KPI Availability Targets¶
KpiAvailabilityTargets(perfdb)
¶
Class used for handling availability targets. Can be accessed via perfdb.kpis.availability.targets.
Parameters:
-
(perfdb¶PerfDB) –Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initializes the base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # kept as a private attribute; this is the only state set up here
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, object_names=None, availability_types=None)
¶
Deletes availability target values from the database.
Parameters:
-
(period¶DateTimeRange) –Period of time to delete the data.
-
(object_names¶list[str], default:None) –List of object names to delete data. If None will delete for all objects. By default None
-
(availability_types¶list[str], default:None) –List of availability types to delete data. If None will delete for all types. By default None
Source code in echo_postgres/kpi_availability_targets.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_types: list[str] | None = None,
) -> None:
    """Deletes availability target values from the database.

    Rows of ``performance.availability_targets`` whose ``date`` lies between ``period.start`` and ``period.end`` (both inclusive) are deleted, optionally restricted to the given objects and/or availability types.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data.
    object_names : list[str], optional
        List of object names to delete data. If None (or empty) will delete for all objects. By default None
    availability_types : list[str], optional
        List of availability types to delete data. If None (or empty) will delete for all types. By default None

    Raises
    ------
    ValueError
        If any requested object name or availability type has no matching id.
    """
    # build the query
    query = [
        sql.SQL("DELETE FROM performance.availability_targets WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # compare as sets: a length comparison would flag repeated names in the input as "missing"
        missing_objs = set(object_names) - set(obj_ids)
        if missing_objs:
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))

    if availability_types:
        # getting availability type id (all types are fetched, then narrowed to the requested ones)
        wanted_types = set(availability_types)
        cat_ids = {k: v for k, v in self._perfdb.kpis.availability.types.get_ids().items() if k in wanted_types}
        # same set-based check as above so duplicated type names do not raise
        missing_cats = wanted_types - set(cat_ids)
        if missing_cats:
            raise ValueError(f"Could not find the following availability types: {missing_cats}")
        query.append(
            sql.SQL(" AND availability_type_id IN ({ids})").format(
                ids=sql.SQL(", ").join(map(sql.Literal, cat_ids.values())),
            ),
        )

    query = sql.Composed(query)

    # deleting data
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from availability_targets table")
get(period, object_names=None, availability_types=None, filter_type='and', output_type='DataFrame')
¶
Gets all availability targets for objects.
Please keep in mind that this should only be used to get targets for objects (not groups) and for specific days (not time aggregates). If you want targets for groups and/or time aggregates use baze.kpis.availability.values which will also return the targets with the actual values.
The most useful keys/columns returned are:
- availability
- modified_date
Parameters:
-
(period¶DateTimeRange) –Period of time to get the data for.
-
(object_names¶list[str], default:None) –List of object names to get the data for. By default None
-
(availability_types¶list[str], default:None) –List of availability types to get the data for. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_type_name], columns = [availability, modified_date, ...]
-
dict[str, dict[Timestamp, dict[str, float]]]–In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_type_name: value, ...}, ...}, ...}
Source code in echo_postgres/kpi_availability_targets.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[Timestamp, dict[str, float]]]:
    """Gets all availability targets for objects.

    Please keep in mind that this should only be used to get targets for objects (not groups) and for specific days (not time aggregates). If you want targets for groups and/or time aggregates use baze.kpis.availability.values which will also return the targets with the actual values.

    The most useful keys/columns returned are:

    - availability
    - modified_date

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for. Both ends are inclusive.
    object_names : list[str], optional
        List of object names to get the data for. By default None
    availability_types : list[str], optional
        List of availability types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to combine the object_names and availability_types filters. The period filter is always applied. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_type_name], columns = [availability, modified_date, ...]
    dict[str, dict[Timestamp, dict[str, float]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_type_name: value, ...}, ...}, ...}
    """
    # build the query
    query = [
        sql.SQL(
            "SELECT * FROM performance.v_availability_targets WHERE (date >= {start} AND date <= {end})",
        ).format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    # optional filters, combined with filter_type and AND-ed to the period condition
    where = []
    if object_names:
        where.append(
            sql.SQL("object_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_names)),
            ),
        )
    if availability_types:
        where.append(
            sql.SQL("availability_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, availability_types)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_name, date, availability_type_name"))
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing names to be strings and ids to be integers
    df = df.astype(
        {
            "object_name": "string[pyarrow]",
            "availability_type_name": "string[pyarrow]",
            "object_id": "int64[pyarrow]",
            "availability_type_id": "int64[pyarrow]",
        },
    )
    df = df.set_index(["object_name", "date", "availability_type_name"])

    if output_type == "DataFrame":
        return df

    # dict output only carries the availability value, so the other columns are simply not read
    final_result: dict[str, dict[Timestamp, dict[str, float]]] = {}
    for (object_name, date, availability_type_name), value in df["availability"].to_dict().items():
        final_result.setdefault(object_name, {}).setdefault(date, {})[availability_type_name] = value

    return final_result
insert(df, on_conflict='ignore')
¶
Inserts availability target values into table availability_targets
Parameters:
-
(df¶DataFrame) –DataFrame with the availability targets. Must have the following columns:
- object_name
- availability_type_name
- date
- availability: Value ranging from 0 to 1 representing the availability target for that availability_type.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/kpi_availability_targets.py
@validate_call
def insert(self, df: DataFrame, on_conflict: Literal["ignore", "update"] = "ignore") -> None:
    """Inserts availability target values into table availability_targets

    Parameters
    ----------
    df : DataFrame
        DataFrame with the availability targets. Must have the following columns:

        - object_name
        - availability_type_name
        - date
        - availability: Value ranging from 0 to 1 representing the availability target for that availability_type.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df contains NaN values, does not have exactly the required columns, or references object names / availability types that could not be found.
    """
    # checking inputs
    if df.isna().any().any():
        raise ValueError("df cannot have any NaN values")

    required_cols = {"object_name", "availability_type_name", "date", "availability"}
    given_cols = set(df.columns)
    if given_cols != required_cols:
        additional_cols = given_cols - required_cols
        missing_cols = required_cols - given_cols
        raise ValueError(
            f"df must have the following columns: object_name, availability_type_name, date, availability. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # making a copy of the DataFrame so the caller's object is not modified
    # (no dropna needed: NaN values were already rejected above)
    df = df.copy()

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    missing_objs = set(wanted_objs) - set(obj_ids)
    if missing_objs:
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting availability type id (all types are fetched, then narrowed to the ones present in df)
    wanted_cats = set(df["availability_type_name"].unique().tolist())
    cat_ids = {k: v for k, v in self._perfdb.kpis.availability.types.get_ids().items() if k in wanted_cats}
    missing_cats = wanted_cats - set(cat_ids)
    if missing_cats:
        raise ValueError(f"Could not find the following availability types: {missing_cats}")
    df["availability_type_id"] = df["availability_type_name"].map(cat_ids)

    # removing unwanted columns: only the id columns are written to the table
    df = df.drop(columns=["object_name", "availability_type_name"])

    # inserting data
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="availability_targets",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("Availability targets inserted into the database")