KPI Energy Losses - Values¶
For more details on energy losses data and the waterfall calculation, see this dedicated page in the reference section.
KpiEnergyLossesValues(perfdb)
¶
Class used for handling energy losses KPI values. Can be accessed via perfdb.kpis.energy.losses.values.
Parameters:
-
(perfdb¶PerfDB) –Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
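A minimal access sketch (the PerfDB construction shown is an assumption; only the accessor chain comes from this page):

import echo_postgres as e_pg

# hypothetical construction; the real PerfDB constructor arguments may differ
perfdb = e_pg.PerfDB()
# handler for energy losses KPI values, as documented above
losses_values = perfdb.kpis.energy.losses.values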
delete(period, object_names=None, energy_losses_types=None)
¶
Deletes energy loss values from the database.
Parameters:
-
(period¶DateTimeRange) –Period of time to delete the data for.
-
(object_names¶list[str], default:None) –List of object names to delete the data for. By default None
-
(energy_losses_types¶list[str], default:None) –List of energy loss types to delete the data for. By default None
Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def delete(
self,
period: DateTimeRange,
object_names: list[str] | None = None,
energy_losses_types: list[str] | None = None,
) -> None:
"""Deletes energy loss values from the database.
Parameters
----------
period : DateTimeRange
Period of time to delete the data for.
object_names : list[str], optional
List of object names to delete the data for. By default None
energy_losses_types : list[str], optional
List of energy loss types to delete the data for. By default None
"""
# build the query
query = [
sql.SQL("DELETE FROM performance.energyloss_values WHERE (date >= {start} AND date <= {end})").format(
start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
),
]
if object_names:
# getting object id
obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
if len(obj_ids) != len(object_names):
missing_objs = set(object_names) - set(obj_ids)
raise ValueError(f"Could not find the following objects: {missing_objs}")
query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if energy_losses_types:
        # getting energy losses types ids
        elt_ids = self._perfdb.kpis.energy.losses.types.get_ids()
        # raising a clear error instead of a KeyError when a type is unknown
        if missing_types := set(energy_losses_types) - set(elt_ids):
            raise ValueError(f"Could not find the following energy loss types: {missing_types}")
        elt_ids = {elt: elt_ids[elt] for elt in energy_losses_types}
        query.append(sql.SQL(" AND energyloss_type_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, elt_ids.values()))))
query = sql.Composed(query)
with self._perfdb.conn.reconnect() as conn:
# deleting
result = conn.execute(query)
logger.debug(f"Deleted {result.rowcount} rows from energyloss_values table")
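A hedged usage sketch (the DateTimeRange(start, end) construction is an assumption; delete itself only reads period.start and period.end; object and loss type names are hypothetical):

from datetime import datetime

# assumed constructor: DateTimeRange(start, end)
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 31, 23, 59, 59))
# remove curtailment losses for one park over January 2024
perfdb.kpis.energy.losses.values.delete(
    period=period,
    object_names=["PARK_A"],
    energy_losses_types=["park_curtailment"],
)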
get(period, time_res='daily', aggregation_window=None, object_or_group_names=None, object_group_types=None, energy_losses_types=None, filter_type='and', output_type='DataFrame', values_only=False)
¶
Gets energy losses values for the desired period and objects.
The most useful keys/columns returned are:
- measured: Actual measured value for the period, in kWh
- measured_as_percentage: Actual measured value for the period, as a percentage
- measured_after_loss: The measured energy production after the loss is applied, in kWh
- target: Target value for the period, in kWh
- target_as_percentage: Target value for the period, as a percentage
- target_after_loss: The target energy production after the loss is applied, in kWh
Keep in mind that:
- as_percentage and after_loss take into account the loss_order defined in table energyloss_types. Currently the last loss is park_curtailment, so its after_loss value will be equal to the measured value (from the energy_values tables) or the target value (from the energy_targets tables) at the connection point.
- Percentage values are always calculated against the gross energy production (before losses). As an example, for measured it is equivalent to measured / (measured + measured_after_loss); see the worked example below.
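A quick worked example of the percentage relation (illustrative numbers): if park_curtailment has measured = 50 kWh and measured_after_loss = 950 kWh for a given day, the gross production is 50 + 950 = 1000 kWh, so measured_as_percentage = 50 / 1000 = 5 %.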
Parameters:
-
(period¶DateTimeRange) –Period of time to get the data for.
-
(time_res¶Literal['daily', 'monthly', 'quarterly', 'yearly'], default:'daily') –Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
-
(aggregation_window¶Literal['mtd', 'ytd', '12m'] | None, default:None) –Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
-
(object_or_group_names¶list[str], default:None) –List of object or group names to get the data for. By default None
-
(object_group_types¶list[str], default:None) –List of object group types to get the data for. By default None
-
(energy_losses_types¶list[str], default:None) –List of energy loss types to get the data for. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
-
(values_only¶bool, default:False) –If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "energyloss_type_name", "date"], columns = the measured and target columns described above, plus modified_date
-
dict[str, dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]] | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]–In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: {attribute: value, ...}, ...}, ...}, ...}, ...}. In case values_only is True, will return a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: value, ...}, ...}, ...}, ...}.
Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def get(
self,
period: DateTimeRange,
time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
object_or_group_names: list[str] | None = None,
object_group_types: list[str] | None = None,
energy_losses_types: list[str] | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame"] = "DataFrame",
values_only: bool = False,
) -> (
DataFrame | dict[str, dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]] | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]
):
"""Gets energy losses values for the desired period and objects.
    The most useful keys/columns returned are:
    - measured: Actual measured value for the period, in kWh
    - measured_as_percentage: Actual measured value for the period, as a percentage
    - measured_after_loss: The measured energy production after the loss is applied, in kWh
    - target: Target value for the period, in kWh
    - target_as_percentage: Target value for the period, as a percentage
    - target_after_loss: The target energy production after the loss is applied, in kWh
    Keep in mind that:
    - as_percentage and after_loss take into account the loss_order defined in table energyloss_types. Currently the last loss is park_curtailment, so its after_loss value will be equal to the measured value (from the energy_values tables) or the target value (from the energy_targets tables) at the connection point.
    - Percentage values are always calculated against the gross energy production (before losses). As an example, for measured it is equivalent to measured / (measured + measured_after_loss).
Parameters
----------
period : DateTimeRange
Period of time to get the data for.
time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
object_or_group_names : list[str], optional
List of object or group names to get the data for. By default None
object_group_types : list[str], optional
List of object group types to get the data for. By default None
    energy_losses_types : list[str], optional
        List of energy loss types to get the data for. By default None
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"
values_only : bool, optional
If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False
Returns
-------
DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "energyloss_type_name", "date"], columns = the measured and target columns described above, plus modified_date
dict[str, dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]] | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: {attribute: value, ...}, ...}, ...}, ...}, ...}
        In case values_only is True, will return a dictionary in the format {group_type_name: {object_or_group_name: {energyloss_type_name: {date: value, ...}, ...}, ...}, ...}
"""
    # getting loss type definitions to validate the requested energy loss types
loss_types_def = self._perfdb.kpis.energy.losses.types.get(output_type="dict")
# checking if all energy losses types are valid
if energy_losses_types is not None and (wrong_types := list(set(energy_losses_types) - set(loss_types_def.keys()))):
raise ValueError(f"Invalid energy losses types: {wrong_types}")
# build the query
query = [
sql.SQL(
"SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
).format(
table=sql.Identifier(
f"mv_energyloss_values_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}",
),
start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
),
]
where = []
if object_or_group_names:
where.append(
sql.SQL("object_or_group_name IN ({names})").format(
names=sql.SQL(", ").join(map(sql.Literal, object_or_group_names)),
),
)
if object_group_types:
where.append(
sql.SQL("group_type_name IN ({names})").format(
names=sql.SQL(", ").join(map(sql.Literal, object_group_types)),
),
)
if energy_losses_types:
where.append(
sql.SQL("energyloss_type_name IN ({points})").format(
points=sql.SQL(", ").join(map(sql.Literal, energy_losses_types)),
),
)
if where:
query.append(sql.SQL(" AND ("))
query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
query.append(sql.SQL(")"))
query.append(sql.SQL(" ORDER BY object_or_group_name, group_type_name, energyloss_type_name, date"))
query = sql.Composed(query)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_pandas(query, post_convert="pyarrow")
# forcing date to be a Timestamp
df["date"] = df["date"].astype("datetime64[s]")
# forcing object_name and object_group_name to be a string
df = df.astype(
{"object_or_group_name": "string[pyarrow]", "group_type_name": "string[pyarrow]", "energyloss_type_name": "string[pyarrow]"},
)
df = df.astype(
{"object_or_group_id": "int64[pyarrow]", "group_type_id": "int64[pyarrow]", "energyloss_type_id": "int16[pyarrow]"},
)
df = df.set_index(["group_type_name", "object_or_group_name", "energyloss_type_name", "date"])
if output_type == "DataFrame":
return df
# dropping id columns not used in dict format
df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
# converting to Dict
result = df.to_dict(orient="index")
final_result = {}
for (object_group_type_name, object_or_group_name, energyloss_type_name, date), data in result.items():
if object_group_type_name not in final_result:
final_result[object_group_type_name] = {}
if object_or_group_name not in final_result[object_group_type_name]:
final_result[object_group_type_name][object_or_group_name] = {}
if energyloss_type_name not in final_result[object_group_type_name][object_or_group_name]:
final_result[object_group_type_name][object_or_group_name][energyloss_type_name] = {}
        if date not in final_result[object_group_type_name][object_or_group_name][energyloss_type_name]:
final_result[object_group_type_name][object_or_group_name][energyloss_type_name][date] = (
data["measured"] if values_only else data
)
return final_result
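A usage sketch building on the period defined earlier (names are hypothetical):

# monthly loss values for one park, as a DataFrame
df = perfdb.kpis.energy.losses.values.get(
    period=period,
    time_res="monthly",
    object_or_group_names=["PARK_A"],
    energy_losses_types=["park_curtailment"],
    output_type="DataFrame",
)
# index levels: group_type_name, object_or_group_name, energyloss_type_name, date
monthly_losses_kwh = df["measured"]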
insert(df, on_conflict='ignore')
¶
Inserts energy losses values into the database (table energyloss_values)
Parameters:
-
(df¶DataFrame) –DataFrame with the following columns:
- object_name
- date
- energy_loss_type ('park_curtailment', ...)
- value
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def insert(
self,
df: DataFrame,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
"""Inserts energy losses values into the database (table energyloss_values)
Parameters
----------
df : DataFrame
DataFrame with the following columns:
- object_name
- date
- energy_loss_type ('park_curtailment', ...)
- value
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict. Can be one of ["ignore", "update"].
By default "ignore"
"""
# checking inputs
required_columns = {"object_name", "date", "energy_loss_type", "value"}
if df.isna().any().any():
raise ValueError("df cannot have NaN values")
if set(df.columns) != required_columns:
additional_cols = set(df.columns) - required_columns
missing_cols = required_columns - set(df.columns)
raise ValueError(
f"df must have the following columns: object_name, date, energy_loss_type, value. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
)
# making a copy of df
df = df.copy()
# getting object id
wanted_objs = df["object_name"].unique().tolist()
obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
if len(obj_ids) != len(wanted_objs):
missing_objs = set(wanted_objs) - set(obj_ids)
raise ValueError(f"Could not find the following objects: {missing_objs}")
df["object_id"] = df["object_name"].map(obj_ids)
# getting energy loss type id
wanted_energy_losses_types = df["energy_loss_type"].unique().tolist()
elt_ids = self._perfdb.kpis.energy.losses.types.get_ids()
if wrong_wlt := set(wanted_energy_losses_types) - set(elt_ids.keys()):
        raise ValueError(f"Could not find the following energy loss types: {wrong_wlt}")
df["energyloss_type_id"] = df["energy_loss_type"].map(elt_ids)
# removing unwanted columns
df = df.drop(columns=["object_name", "energy_loss_type"])
# converting energy column to float
df["value"] = df["value"].astype("float32")
# checking if there are NaNs in energy column
nan_rows = df[df["value"].isna()].index
if len(nan_rows) > 0:
logger.warning(
f"Found NaN values in value column. Dropping {len(nan_rows)} rows (indexes: {df['date'].loc[nan_rows].tolist()})",
)
df = df[~df.index.isin(nan_rows)].copy()
# inserting data
if_exists_mapping = {
"ignore": "append",
"update": "update",
}
with self._perfdb.conn.reconnect() as conn:
conn.pandas_to_sql(
df=df,
table_name="energyloss_values",
schema="performance",
if_exists=if_exists_mapping[on_conflict],
ignore_index=True,
)
logger.debug(f"Energy loss values inserted into the database. Total of {len(df)} rows inserted/updated.")
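A minimal input sketch (object name, dates and values are hypothetical; value is interpreted as kWh):

import pandas as pd

df = pd.DataFrame(
    {
        "object_name": ["PARK_A", "PARK_A"],
        "date": pd.to_datetime(["2024-01-01", "2024-01-02"]),
        "energy_loss_type": ["park_curtailment", "park_curtailment"],
        "value": [120.5, 98.0],
    }
)
# update existing rows on conflict instead of ignoring them
perfdb.kpis.energy.losses.values.insert(df=df, on_conflict="update")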
sync_bazefield(period, object_names=None, energy_losses_types=None, overwrite=False)
¶
Method to get energy numbers from Bazefield and insert them into the database.
This will save the results in the table "energyloss_values" of performance_db.
Parameters:
-
(period¶DateTimeRange) –Period to get energy numbers from Bazefield. Values will be rounded to the nearest day. It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
-
(object_names¶list[str] | None, default:None) –Names of the objects to get the energy from (must be an SPE). If set to None, will get all objects matching the object models allowed in ALLOWED_ENERGY_LOSSES_OBJECT_MODELS. By default None
-
(energy_losses_types¶list[str] | None, default:None) –List of energy loss types (park_curtailment, etc...) to get the energy from. By default None
-
(overwrite¶bool, default:False) –If set to True, will overwrite the existing values in the database, by default False
Returns:
-
DataFrame–DataFrame with the energy loss values inserted in the database for the last processed loss type
Source code in echo_postgres/kpi_energy_losses_values.py
@validate_call
def sync_bazefield(
self,
period: DateTimeRange,
object_names: list[str] | None = None,
energy_losses_types: list[str] | None = None,
overwrite: bool = False,
) -> DataFrame:
"""Method to get energy numbers from Bazefield and insert them into the database.
    This will save the results in the table "energyloss_values" of performance_db.
Parameters
----------
period : DateTimeRange
        Period to get energy numbers from Bazefield. Values will be rounded to the nearest day.
        It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
    object_names : list[str] | None, optional
        Names of the objects to get the energy from (must be an SPE). If set to None, will get all objects matching the object models allowed in ALLOWED_ENERGY_LOSSES_OBJECT_MODELS.
        By default None
energy_losses_types : list[str] | None, optional
List of energy loss types (park_curtailment, etc...) to get the energy from. By default None
overwrite : bool, optional
If set to True, will overwrite the existing values in the database, by default False
Returns
-------
DataFrame
        DataFrame with the energy loss values inserted in the database for the last processed loss type
"""
t0 = perf_counter()
# adjusting period to cover the whole day
period = period.copy()
period = period.round(timedelta(days=1), start="floor", end="ceil")
# getting all objects that are allowed to have energy values
allowed_objects = self._perfdb.objects.instances.get(object_models=ALLOWED_ENERGY_LOSSES_OBJECT_MODELS, output_type="DataFrame")
# checking if provided object names are valid
if object_names is None:
object_names = list(allowed_objects.index)
# removing all objects with TEST in their name
object_names = [name for name in object_names if "TEST" not in name]
elif wrong_names := list(set(object_names) - set(allowed_objects.index)):
raise ValueError(f"Invalid object names: {wrong_names}")
# getting loss type definitions to get bazefield point
loss_types_def = self._perfdb.kpis.energy.losses.types.get(output_type="dict")
# removing all loss types where source != "bazefield" and in case bazefield_point is None
loss_types_def = {k: v for k, v in loss_types_def.items() if v["source"] == "bazefield" and v["bazefield_point"] is not None}
# checking if all energy losses types are valid
if energy_losses_types is not None:
if wrong_types := list(set(energy_losses_types) - set(loss_types_def.keys())):
raise ValueError(
f"Invalid energy losses types: {wrong_types}. Check if they are defined and if they have source='bazefield' and a valid bazefield_point",
)
# filtering loss_types_def
loss_types_def = {k: v for k, v in loss_types_def.items() if k in energy_losses_types}
# creating connection to Bazefield
baze = Baze()
# iterating each resource type
for loss_type, loss_type_attrs in loss_types_def.items():
# getting the bazefield point
bazefield_point = loss_type_attrs["bazefield_point"]
        # adjusting the object list depending on the applicable source
        # using a per-iteration variable so one loss type's filtering does not leak into the next iteration
        applicable_object_names = object_names
        applicable_to = loss_type_attrs["applicable_to"]
        if applicable_to in {"solar", "wind"}:
            model = f"{applicable_to}_farm"
            applicable_object_names = [obj for obj in object_names if allowed_objects.loc[obj, "object_model_name"] == model]
        # getting values from tag for all objects
        wanted_points = {obj: [bazefield_point] for obj in applicable_object_names}
point_period = period.copy()
point_period.start = point_period.start - timedelta(minutes=10)
point_period.end = point_period.end + timedelta(minutes=10)
# regex to get 5min or 10min from bazefield point
feature_freq = re.findall(r"\d{1,2}min", bazefield_point)
if len(feature_freq) > 1:
raise ValueError(f"Found more than one frequency in {bazefield_point}")
if not feature_freq:
logger.info(f"Could not find frequency in {bazefield_point}. Considering reindex as none to get value from Bazefield")
feature_freq = feature_freq[0] if feature_freq else None
# getting values
values = baze.points.values.series.get(points=wanted_points, reindex=feature_freq, period=point_period)
# dropping second level
values = values.droplevel(1, axis=1)
# converting from kW to kWh
if feature_freq is not None and "min" in feature_freq:
freq = int(feature_freq.replace("min", ""))
elif re.findall(r"1d", bazefield_point) == ["1d"]:
freq = 1440 # number of minutes in one day
values.index.name = None
else:
raise ValueError(f"Could not find frequency in {bazefield_point}. Please check the bazefield point format.")
values = values * (freq / 60)
# resampling to day
daily_values = values.resample("D").sum()
# adjusting values to upload to the database
# melting the DataFrame
values = daily_values.reset_index().melt(id_vars="index", var_name="object_name", value_name="value")
values = values.rename(columns={"index": "date"})
values["energy_loss_type"] = loss_type
# removing outside period
values = values[
(values["date"] >= period.start) & (values["date"] < period.end)
] # < used at end to avoid including the next day at 00:00:00
# inserting energy loss data into the database
logger.info("Inserting energy loss values data into the database")
if values.empty:
            logger.info(
                f"No energy loss values found for {loss_type} in period {period} for objects {applicable_object_names}. Skipping insertion.",
            )
continue
self.insert(df=values, on_conflict="update" if overwrite else "ignore")
        logger.info(
            f"Energy loss values for {loss_type} inserted into the database in {perf_counter() - t0:.2f} seconds. Period {period} and objects {applicable_object_names}",
        )
del baze
return values
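A usage sketch (names hypothetical; as documented above, the period is rounded to whole days before fetching):

inserted = perfdb.kpis.energy.losses.values.sync_bazefield(
    period=period,
    object_names=["PARK_A"],
    energy_losses_types=["park_curtailment"],
    overwrite=False,  # keep existing database values on conflict
)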