# KPI Energy Targets

## `KpiEnergyTargets(perfdb)`

Class used for handling energy targets. Can be accessed via `perfdb.kpis.energy.targets`.

Parameters:

- `perfdb` (`PerfDB`) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Shared constructor inherited by every accessor subclass.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a reference to the root PerfDB so methods can reach the connection.
    self._perfdb: e_pg.PerfDB = perfdb
### `delete(period, object_names=None)`

Deletes energy targets from the database.

Parameters:

- `period` (`DateTimeRange`) – Period of time to delete the data for.
- `object_names` (`list[str]`, default: `None`) – List of object names to delete the data for. By default None.
Source code in echo_postgres/kpi_energy_targets.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
) -> None:
    """Deletes energy targets from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. By default None

    Raises
    ------
    ValueError
        If any of the requested object names cannot be resolved to an id.
    """
    # The date filter is always present; an object filter is appended on demand.
    parts = [
        sql.SQL("DELETE FROM performance.energy_targets WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names:
        # Resolve names to ids; every requested name must exist in the database.
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        id_list = sql.SQL(", ").join(sql.Literal(oid) for oid in obj_ids.values())
        parts.append(sql.SQL(" AND object_id IN ({ids})").format(ids=id_list))
    # Execute the composed statement and report how many rows were removed.
    self._perfdb.conn.execute(sql.Composed(parts))
    logger.debug(f"Deleted {self._perfdb.conn.rowcount} rows from energy_targets table")
### `get(period, time_res='daily', aggregation_window=None, object_or_group_names=None, object_group_types=None, measurement_points=None, filter_type='and', output_type='DataFrame', values_only=False)`

Gets energy targets for the desired period and objects.

The most useful keys/columns returned are:

- target
- target_pxx
- target_evaluation_period
- target_resource_assessment_id
Parameters:

- `period` (`DateTimeRange`) – Period of time to get the data for.
- `time_res` (`Literal['daily', 'monthly', 'quarterly', 'yearly']`, default: `'daily'`) – Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily".
- `aggregation_window` (`Literal['mtd', 'ytd', '12m'] | None`, default: `None`) – Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None.
- `object_or_group_names` (`list[str]`, default: `None`) – List of object or group names to get the data for. By default None.
- `object_group_types` (`list[str]`, default: `None`) – List of object group types to get the data for. By default None.
- `measurement_points` (`list[ALLOWED_MEASUREMENT_POINTS]`, default: `None`) – List of measurement points to get the data for, like Connection Point, Gravity Center, Asset, etc. By default None.
- `filter_type` (`Literal['and', 'or']`, default: `'and'`) – How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
- `output_type` (`Literal['dict', 'DataFrame', 'pl.DataFrame']`, default: `'DataFrame'`) – Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "DataFrame".
- `values_only` (`bool`, default: `False`) – If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False.
Returns:

- `pd.DataFrame` – In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "date"], columns = [target, modified_date].
- `pl.DataFrame` – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
- `dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]` – In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {date: {attribute: value, ...}, ...}, ...}.
Source code in echo_postgres/kpi_energy_targets.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    object_or_group_names: list[str] | None = None,
    object_group_types: list[str] | None = None,
    measurement_points: list[ALLOWED_MEASUREMENT_POINTS] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> pd.DataFrame | pl.DataFrame | dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]:
    """Gets energy targets for the desired period and objects.

    The most useful keys/columns returned are:

    - target
    - target_pxx
    - target_evaluation_period
    - target_resource_assessment_id

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    object_or_group_names : list[str], optional
        List of object or group names to get the data for. By default None
    object_group_types : list[str], optional
        List of object group types to get the data for. By default None
    measurement_points : list[ALLOWED_MEASUREMENT_POINTS], optional
        List of measurement points to get the data for, like Connection Point, Gravity Center, Asset, etc. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]
        By default "DataFrame"
    values_only : bool, optional
        If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

    Returns
    -------
    pd.DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "date"], columns = [target, modified_date]
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame
    dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {date: {attribute: value, ...}, ...}, ...}
    """
    # build the query: the materialized-view name encodes the time resolution
    # and, when given, the aggregation window (e.g. mv_energy_daily,
    # mv_energy_monthly_ytd). Both values are validated by the Literal
    # annotations above, so the identifier cannot carry injected content.
    query = [
        sql.SQL(
            "SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            table=sql.Identifier(
                f"mv_energy_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}",
            ),
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # optional filters collected first, then joined with AND/OR per filter_type
    where = []
    if object_or_group_names:
        where.append(
            sql.SQL("object_or_group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_or_group_names)),
            ),
        )
    if object_group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_group_types)),
            ),
        )
    if measurement_points:
        where.append(
            sql.SQL("measurement_point_name IN ({points})").format(
                points=sql.SQL(", ").join(map(sql.Literal, measurement_points)),
            ),
        )
    if where:
        # wrap the optional filters in parentheses so AND/OR precedence is explicit
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))
    query.append(sql.SQL(" ORDER BY object_or_group_name, group_type_name, date"))
    query = sql.Composed(query)
    df = self._perfdb.conn.read_to_polars(query)
    # dropping value and efficiency columns (targets only)
    cols_to_drop = [c for c in ("value", "efficiency") if c in df.columns]
    if cols_to_drop:
        df = df.drop(cols_to_drop)
    # convert_output reshapes the flat frame into the requested output_type
    return convert_output(
        df,
        output_type=output_type,
        index_col=["group_type_name", "object_or_group_name", "measurement_point_name", "date"],
        drop_id_cols=True,
        nest_by_index=True,
        values_only_key="target" if values_only else None,
    )
### `insert(df, on_conflict='ignore')`

Inserts energy targets into the database (table energy_targets).

Parameters:

- `df` (`pd.DataFrame | pl.DataFrame`) – DataFrame with the following columns:
  - object_name
  - date
  - energy
  - pxx
  - evaluation_period (one of 'longterm', '1year', '1month')
- `on_conflict` (`Literal['ignore', 'update']`, default: `'ignore'`) – What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Source code in echo_postgres/kpi_energy_targets.py
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts energy targets into the database (table energy_targets)

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame with the following columns:

        - object_name
        - date
        - energy
        - pxx
        - evaluation_period (one of 'longterm', '1year', '1month')
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df contains nulls, has a column set different from the expected
        one, references unknown objects, or holds invalid evaluation_period
        values.
    """
    # Normalize to polars so the rest of the method works against one API.
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    df: pl.DataFrame
    # Reject any null cell anywhere in the frame.
    if df.select(pl.any_horizontal(pl.all().is_null().any())).item():
        raise ValueError("df cannot have NaN values")
    # The column set must match exactly: no extras, none missing.
    wanted_cols = {"object_name", "date", "energy", "pxx", "evaluation_period"}
    actual_cols = set(df.columns)
    if actual_cols != wanted_cols:
        additional_cols = actual_cols - wanted_cols
        missing_cols = wanted_cols - actual_cols
        raise ValueError(
            f"df must have the following columns: {wanted_cols}. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )
    # Work on a copy so the caller's frame is left untouched.
    df = df.clone()
    # Resolve object names to ids; every referenced object must exist.
    wanted_objs = df["object_name"].unique().to_list()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    # Map name -> id, drop the name column, and coerce numeric dtypes.
    df = (
        df.with_columns(
            pl.col("object_name").replace_strict(obj_ids, return_dtype=pl.Int64).alias("object_id"),
        )
        .drop(["object_name"])
        .with_columns(
            pl.col("energy").cast(pl.Float32),
            pl.col("pxx").cast(pl.Float32),
        )
    )
    # evaluation_period must stay within the allowed domain.
    allowed_eval_periods = ["longterm", "1year", "1month"]
    if not df["evaluation_period"].is_in(allowed_eval_periods).all():
        invalid_values = df.filter(~pl.col("evaluation_period").is_in(allowed_eval_periods))["evaluation_period"].unique().to_list()
        raise ValueError(
            f"evaluation_period column can only have the following values: {allowed_eval_periods}. Invalid values found: {invalid_values}",
        )
    # "ignore" becomes a plain append; "update" performs an upsert.
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    self._perfdb.conn.polars_to_sql(
        df=df,
        table_name="energy_targets",
        schema="performance",
        if_exists=if_exists_mapping[on_conflict],
    )
    logger.debug("Energy targets inserted into the database")