KPI Stopped Assets¶
KpiStoppedAssets(perfdb)
Class used for handling Stopped Assets KPI. Can be accessed via perfdb.kpis.stoppedassets.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
get(end_date, group_names=None, group_types=None, request_interval=None, filter_type='and', output_type='DataFrame')
Gets the stopped assets in a specific date.
The result will have the following attributes for each stopped asset:
- object_name: Name of the object.
- start_date: Date when the object stopped (more precisely, the first day the object was unavailable for the entire day; the actual stop happened on the previous day).
- duration_days: Number of days the object was stopped.
- lost_energy: Energy lost during the stoppage period (in kWh).
It uses the availability data as a reference to find the stopped assets, so the result only has daily resolution; the exact time an asset stopped cannot be determined. The exact duration of a stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.
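As a small illustration of this daily-resolution convention (all dates are hypothetical, not taken from a real database), the reported fields relate to the actual stop like this:

```python
from datetime import date, timedelta

# Hypothetical values, not from a real query:
end_date = date(2024, 3, 15)    # the date passed to get()
start_date = date(2024, 3, 10)  # first FULL day the asset was unavailable

# duration_days counts the fully unavailable days up to and including end_date
duration_days = (end_date - start_date).days + 1
# the actual stop happened at some unknown time on the previous day
actual_stop_day = start_date - timedelta(days=1)
```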
The logic used here is to start the search from the end_date and go back in time until the start dates of all objects have been reached. For each period, it gets the availability data and checks whether any objects had available energy at the most recent date of the period. If so, those objects are removed from the list and the data for the previous period is fetched. This continues until every asset was available at least once in the accumulated period.
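The backward search can be sketched in plain Python over synthetic data (object names and availability values below are invented; the real method fetches availability in monthly chunks via the database):

```python
from datetime import date, timedelta

# Hypothetical daily available energy (kWh) per object; in the real method this
# comes from perfdb.kpis.availability.values.get, fetched in monthly chunks.
availability = {
    "WTG01": {date(2024, 3, d): (0.0 if d >= 10 else 50.0) for d in range(1, 16)},
    "WTG02": {date(2024, 3, d): 50.0 for d in range(1, 16)},  # never stopped
}
end_date = date(2024, 3, 15)

stopped = {}
for obj, daily in availability.items():
    # walk back from end_date until we hit a day with available energy
    day = end_date
    while day in daily and daily[day] == 0.0:
        day -= timedelta(days=1)
    if day != end_date:  # at least one fully unavailable day at the end
        boundary = day   # most recent day the object had available energy
        stopped[obj] = {
            "start_date": boundary + timedelta(days=1),
            "duration_days": (end_date - boundary).days,
        }
```

Objects that were available on end_date itself (like WTG02 above) are never classified as stopped, matching the first filtering step of the real implementation.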
Note
TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.
Parameters:
- end_date (date) – Date to get the stopped assets.
- group_names (list[str] | None, default: None) – Names of the groups to filter, by default None
- group_types (list[str] | None, default: None) – Types of the groups to filter, by default None
- request_interval (timedelta | relativedelta | None, default: None) – Interval of the requests to the database. If set to None, defaults to 1 month. By default None
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "DataFrame"
Returns:
- DataFrame – In case output_type is "DataFrame", it will return a pandas DataFrame with the stopped assets, with object_name as the index and the columns start_date, duration_days and lost_energy.
- dict[str, Any] – In case output_type is "dict", it will return a dictionary with the stopped assets. The keys will be the object names and the values will be dictionaries with the keys start_date, duration_days and lost_energy.
- pl.DataFrame – In case output_type is "pl.DataFrame", it will return a Polars DataFrame with the stopped assets, with an object_name column and the columns start_date, duration_days and lost_energy.
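For reference, the "dict" output has the following shape (object name and values below are invented for illustration):

```python
from datetime import date

# Hypothetical shape of the "dict" output of get(); all values are invented
result = {
    "WTG01": {
        "start_date": date(2024, 3, 10),
        "duration_days": 6,
        "lost_energy": 7200.0,  # kWh
    },
}
```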
Source code in echo_postgres/kpi_stoppedassets.py
@validate_call
def get(
self,
end_date: date,
group_names: list[str] | None = None,
group_types: list[str] | None = None,
request_interval: timedelta | relativedelta | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | dict[str, Any] | pl.DataFrame:
"""Gets the stopped assets in a specific date.
The result will have the following attributes for each stopped asset:
- `object_name`: Name of the object.
- `start_date`: Date when the object stopped (actually, that represents the first day the object was unavailable during the entire day, actual stop date is the previous day).
- `duration_days`: Number of days the object was stopped.
- `lost_energy`: Energy lost during the stoppage period (in kWh)
It uses the availability data as a reference to find the stopped assets, so the result only has daily resolution; the exact time an asset stopped cannot be determined. The exact duration of a stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.
The logic used here is to start the search from the end_date and go back in time until the start dates of all objects have been reached. For each period, it will get the availability data and check if there are any objects with available energy at the most recent date of the period. If so, it will remove these objects from the list and get the data for the previous period. This will continue until we have reached a point where all assets were available at least once in the period.
> [!NOTE]
>
> TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.
Parameters
----------
end_date : date
Date to get the stopped assets.
group_names : list[str] | None, optional
Names of the groups to filter, by default None
group_types : list[str] | None, optional
Types of the groups to filter, by default None
request_interval : timedelta | relativedelta | None, optional
Interval of the requests to the database. If set to None will default to 1 month. By default None
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and"
output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]
By default "DataFrame"
Returns
-------
DataFrame
In case output_type is "DataFrame", it will return a DataFrame with the stopped assets. It will have the object_name as the index and the columns start_date, duration_days and lost_energy.
dict[str, Any]
In case output_type is "dict", it will return a dictionary with the stopped assets. The keys will be the object names and the values will be a dictionary with the keys start_date, duration_days and lost_energy.
pl.DataFrame
In case output_type is "pl.DataFrame", it will return a Polars DataFrame with the stopped assets. It will have an object_name column and the columns start_date, duration_days and lost_energy.
"""
if not request_interval:
request_interval = relativedelta(months=1)
# getting all objects
objs_df = self._perfdb.objects.groups.instances.get(
object_group_types=group_types,
object_group_names=group_names,
filter_type=filter_type,
output_type="pl.DataFrame",
)
if objs_df.is_empty():
raise ValueError(f"No objects found for the given group_names and group_types: {group_names}, {group_types}")
objs = []
for obj_list in objs_df["object_names"]:
objs.extend(obj_list)
objs = list(set(objs))
objs.sort()
# while loop that runs until we find the start date of all objects
# it will start from the end date and go back in time until the start date of all objects are reached
all_data: pl.DataFrame | None = None
all_boundaries: pl.DataFrame | None = None
wanted_objs = objs.copy()
request_period = None
iterations = 0
while wanted_objs:
iterations += 1
# defining the period to get the data
if request_period is None:
request_period = DateTimeRange(end_date - request_interval, end_date)
else:
request_period = DateTimeRange(request_period.start - request_interval, request_period.start - timedelta(days=1))
logger.debug(f"Requesting data for period: {request_period} and objects: {wanted_objs}")
# getting availability for the period
avail_df = self._perfdb.kpis.availability.values.get(
period=request_period,
time_res="daily",
object_or_group_names=wanted_objs,
availability_types=["Technical"],
output_type="pl.DataFrame",
)
# selecting and renaming needed columns
avail_df = avail_df.select(
pl.col("object_or_group_name").alias("object_name"),
"date",
"energy_available",
"energy_unavailable",
)
if avail_df.is_empty():
logger.warning(f"No availability data found for period {request_period}. Stopping search.")
break
# removing all objects that have available energy at the most recent date of the period
max_date = avail_df["date"].max()
objs_with_avail = (
avail_df.filter((pl.col("date") == max_date) & (pl.col("energy_available") > 0))
.get_column("object_name")
.unique()
.to_list()
)
if objs_with_avail:
avail_df = avail_df.filter(~pl.col("object_name").is_in(objs_with_avail))
if avail_df.is_empty():
break
# concatenating with accumulated data
all_data = pl.concat([all_data, avail_df], how="diagonal_relaxed") if all_data is not None else avail_df
# for each object, find the boundary date (most recent date with energy_available > 0)
new_boundaries = (
all_data.filter(pl.col("energy_available") > 0).group_by("object_name").agg(pl.col("date").max().alias("boundary_date"))
)
if not new_boundaries.is_empty():
all_boundaries = (
(
pl.concat([all_boundaries, new_boundaries], how="diagonal_relaxed")
.group_by("object_name")
.agg(pl.col("boundary_date").max())
)
if all_boundaries is not None
else new_boundaries
)
# filter out rows at or before the boundary date for each object
if all_boundaries is not None:
all_data = (
all_data.join(all_boundaries, on="object_name", how="left")
.filter(pl.col("boundary_date").is_null() | (pl.col("date") > pl.col("boundary_date")))
.drop("boundary_date")
)
# objects without a boundary still need more historical data
done_objs = set(all_boundaries.get_column("object_name").to_list()) if all_boundaries is not None else set()
remaining_objs = set(all_data.get_column("object_name").unique().to_list()) if not all_data.is_empty() else set()
wanted_objs = sorted(remaining_objs - done_objs)
n_stopped = all_data.get_column("object_name").n_unique() if all_data is not None and not all_data.is_empty() else 0
logger.info(f"Finished after {iterations} iterations. Found {n_stopped} objects that are stopped.")
# building results
if all_data is None or all_data.is_empty():
results = pl.DataFrame(
schema={
"object_name": pl.Utf8,
"start_date": pl.Datetime("ms"),
"duration_days": pl.Int64,
"lost_energy": pl.Float64,
},
)
else:
# compute lost energy per object from the stoppage rows
lost_energy = all_data.group_by("object_name").agg(pl.col("energy_unavailable").sum().alias("lost_energy"))
# join with boundaries and compute start_date and duration
if all_boundaries is not None:
results = lost_energy.join(all_boundaries, on="object_name", how="left")
else:
results = lost_energy.with_columns(pl.lit(None, dtype=pl.Date).alias("boundary_date"))
results = (
results.with_columns(
pl.col("boundary_date").cast(pl.Datetime("ms")).alias("start_date"),
(pl.lit(end_date) - pl.col("boundary_date")).dt.total_days().cast(pl.Int64).alias("duration_days"),
)
.select(["object_name", "start_date", "duration_days", "lost_energy"])
.sort("object_name")
)
return convert_output(results, output_type, index_col="object_name")
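The final lost_energy aggregation (summing energy_unavailable over each object's stoppage rows) can be pictured without Polars; a minimal sketch with invented rows mirroring what all_data holds after the boundary filtering step:

```python
from collections import defaultdict

# Hypothetical stoppage rows (object_name, date, energy_unavailable in kWh);
# in the real method these are the rows remaining in all_data after filtering
rows = [
    ("WTG01", "2024-03-10", 1200.0),
    ("WTG01", "2024-03-11", 1350.0),
    ("WTG03", "2024-03-14", 900.0),
]

# sum energy_unavailable per object, like the group_by/agg in the source
lost_energy = defaultdict(float)
for obj, _day, kwh in rows:
    lost_energy[obj] += kwh
```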