KPI Stopped Assets
KpiStoppedAssets(perfdb)
Class used for handling Stopped Assets KPI. Can be accessed via perfdb.kpis.stoppedassets.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
get(end_date, group_names=None, group_types=None, request_interval=None, filter_type='and', output_type='DataFrame')
Gets the stopped assets on a specific date.
The result will have the following attributes for each stopped asset:
- object_name: Name of the object.
- start_date: Date when the object stopped (more precisely, the first day the object was unavailable during the entire day; the actual stop date is the previous day).
- duration_days: Number of days the object was stopped.
- lost_energy: Energy lost during the stoppage period (in kWh).
It uses the availability data as a reference to find the stopped assets, so the resolution is daily and the exact time the asset stopped cannot be determined. The exact duration of the stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.
The logic starts the search from the end_date and goes back in time until the start date of all objects is reached. For each period, it gets the availability data and checks whether any objects had available energy at the start of the period (oldest date). If so, those objects are removed from the list and the data for the previous period is requested. This continues until all remaining assets were available at least once in the period.
Note
TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.
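The backward search described above can be sketched in plain Python. This is a simplified illustration only: the `find_stopped_assets` helper and the per-day availability dicts are invented for the example, and the real method works on batched availability queries rather than day-by-day lookups.

```python
from datetime import date, timedelta


def find_stopped_assets(
    availability: dict[str, dict[date, float]], end_date: date
) -> dict[str, dict]:
    """Simplified backward scan: walk back day by day from end_date
    while an asset's available energy stays at zero."""
    results = {}
    for name, series in availability.items():
        if series.get(end_date, 0) > 0:
            continue  # available on end_date -> not stopped
        day = end_date
        # walk backwards while the asset is fully unavailable and data exists
        while day in series and series[day] == 0:
            day -= timedelta(days=1)
        start = day + timedelta(days=1)  # first fully unavailable day
        results[name] = {
            "start_date": start,
            # days between the first fully unavailable day and end_date,
            # matching the duration_days definition above
            "duration_days": (end_date - start).days,
        }
    return results
```

As in the real method, the reported duration excludes the partial day on which the asset actually stopped.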
Parameters:
- end_date (date) – Date to get the stopped assets.
- group_names (list[str] | None, default: None) – Names of the groups to filter, by default None.
- group_types (list[str] | None, default: None) – Types of the groups to filter, by default None.
- request_interval (timedelta | relativedelta | None, default: None) – Interval of the requests to the database. If set to None, it will default to 1 month.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. Can be one of ["and", "or"].
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame"].
Returns:
- DataFrame – In case output_type is "DataFrame", returns a DataFrame with the stopped assets, with object_name as the index and the columns start_date, duration_days and lost_energy.
- dict[str, Any] – In case output_type is "dict", returns a dictionary with the stopped assets. The keys are the object names and the values are dictionaries with the keys start_date, duration_days and lost_energy.
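The two output shapes can be illustrated with a small constructed DataFrame. The asset names and values below are invented; the dict form is derived from the DataFrame form via `to_dict(orient="index")`, as the source below shows.

```python
import pandas as pd

# Invented example of the "DataFrame" output: object_name as index,
# columns start_date, duration_days and lost_energy.
results = pd.DataFrame(
    {
        "start_date": pd.to_datetime(["2024-01-02", "2024-01-05"]),
        "duration_days": [1, 4],
        "lost_energy": [1200.0, 4800.0],
    },
    index=pd.Index(["WTG01", "WTG07"], name="object_name"),
)

# The "dict" output is the same data keyed by object name.
results_dict = results.to_dict(orient="index")
```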
Source code in echo_postgres/kpi_stoppedassets.py
@validate_call
def get(
self,
end_date: date,
group_names: list[str] | None = None,
group_types: list[str] | None = None,
request_interval: timedelta | relativedelta | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, Any]:
"""Gets the stopped assets in a specific date.
The result will have the following attributes for each stopped asset:
- `object_name`: Name of the object.
- `start_date`: Date when the object stopped (actually, that represents the first day the object was unavailable during the entire day, actual stop date is the previous day).
- `duration_days`: Number of days the object was stopped.
- `lost_energy`: Energy lost during the stoppage period (in kWh)
It uses the availability data as a reference to find the stopped assets, so the resolution is daily and the exact time the asset stopped cannot be determined. The exact duration of the stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.
The logic starts the search from the end_date and goes back in time until the start date of all objects is reached. For each period, it gets the availability data and checks whether any objects had available energy at the start of the period (oldest date). If so, those objects are removed from the list and the data for the previous period is requested. This continues until all remaining assets were available at least once in the period.
> [!NOTE]
>
> TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.
Parameters
----------
end_date : date
Date to get the stopped assets.
group_names : list[str] | None, optional
Names of the groups to filter, by default None
group_types : list[str] | None, optional
Types of the groups to filter, by default None
request_interval : timedelta | relativedelta | None, optional
Interval of the requests to the database. If set to None will default to 1 month. By default None
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and"
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"].
By default "DataFrame"
Returns
-------
DataFrame
In case output_type is "DataFrame", it will return a DataFrame with the stopped assets. It will have the object_name as the index and the columns start_date, duration_days and lost_energy.
dict[str, Any]
In case output_type is "dict", it will return a dictionary with the stopped assets. The keys will be the object names and the values will be a dictionary with the keys start_date, duration_days and lost_energy.
"""
if not request_interval:
request_interval = relativedelta(months=1)
# getting all objects
objs_df = self._perfdb.objects.groups.instances.get(
object_group_types=group_types,
object_group_names=group_names,
filter_type=filter_type,
output_type="DataFrame",
)
if objs_df.empty:
raise ValueError(f"No objects found for the given group_names and group_types: {group_names}, {group_types}")
objs = []
for obj_list in objs_df["object_names"]:
objs.extend(obj_list)
objs = list(set(objs))
objs.sort()
finished = False
# while loop that runs until the start date of all objects is reached
# it will start from the end date and go back in time until the start date of all objects are reached
unavailable_energy_df = DataFrame()
available_energy_df = DataFrame()
wanted_objs = objs.copy()
request_period = None
iterations = 0
while not finished:
iterations += 1
# defining the period to get the data
if request_period is None:
request_period = DateTimeRange(end_date - request_interval, end_date)
else:
request_period = DateTimeRange(request_period.start - request_interval, request_period.start - timedelta(days=1))
logger.debug(f"Requesting data for period: {request_period} and objects: {wanted_objs}")
# getting availability for the period
avail_df = self._perfdb.kpis.availability.values.get(
period=request_period,
time_res="daily",
object_or_group_names=wanted_objs,
availability_types=["Technical"],
)
avail_df = avail_df.droplevel(level=["group_type_name", "availability_type_name"], axis=0)
avail_df = avail_df.reset_index()
avail_df = avail_df[["object_or_group_name", "date", "energy_available", "energy_unavailable"]]
# pivoting the DataFrame to have the objects as columns
period_unavailable_energy_df = avail_df.pivot(index="date", columns="object_or_group_name", values="energy_unavailable")
period_available_energy_df = avail_df.pivot(index="date", columns="object_or_group_name", values="energy_available")
# making sure index is sorted
period_unavailable_energy_df = period_unavailable_energy_df.sort_index(ascending=False)
period_available_energy_df = period_available_energy_df.sort_index(ascending=False)
# removing all objects that have available energy at the end of the period
remove_objs = period_available_energy_df.columns[period_available_energy_df.iloc[0] > 0].to_list()
if remove_objs:
period_unavailable_energy_df = period_unavailable_energy_df.drop(columns=remove_objs)
period_available_energy_df = period_available_energy_df.drop(columns=remove_objs)
# concatenating the DataFrames
unavailable_energy_df = concat([unavailable_energy_df, period_unavailable_energy_df], axis=0, join="outer")
available_energy_df = concat([available_energy_df, period_available_energy_df], axis=0, join="outer", sort=True)
# for each object (column), set all rows after (older than) the first row with available energy to NA
# to do this, find for each column the index of the first row with available energy
# first, mask available_energy_df so that values <= 0 become NA
masked_available_energy_df = available_energy_df.mask(available_energy_df <= 0)
# find the first row with available energy for each object
first_available_energy = np.where(
masked_available_energy_df.iloc[0].isna(),
masked_available_energy_df.notna().idxmax(),
masked_available_energy_df.notna()[::-1].idxmax(),
)
# set to NA all rows after (older) the first row with energy available
for obj, first_available in zip(available_energy_df.columns, first_available_energy, strict=False):
# in case first_available is equal to the end_date, it means that the object has no available energy in the period, so we should skip it
if first_available.astype(datetime).date() == end_date:
continue
available_energy_df.loc[first_available:, obj] = NA
unavailable_energy_df.loc[first_available:, obj] = NA
# check if there are any objects with not NA values in the last rows, if so, we need to get data for the previous period
wanted_objs = available_energy_df.columns[available_energy_df.iloc[-1].notna()].to_list()
if not wanted_objs:
finished = True
logger.info(f"Finished after {iterations} iterations. Found {len(unavailable_energy_df.columns)} objects that are stopped.")
# creating DataFrame to store results
results = DataFrame(columns=["object_name", "start_date", "duration_days", "lost_energy"])
results["object_name"] = unavailable_energy_df.columns.values
# finding first NA row for each object
results["start_date"] = to_datetime(unavailable_energy_df.isna().idxmax()).values
# calculating duration
results["duration_days"] = (datetime.combine(end_date, time()) - results["start_date"]).dt.days
# calculating lost energy
results["lost_energy"] = unavailable_energy_df.sum().values
results = results.astype(
{
"object_name": "string[pyarrow]",
"start_date": "datetime64[s]",
"duration_days": "int64[pyarrow]",
"lost_energy": "double[pyarrow]",
},
)
results = results.set_index("object_name")
# returning in case output_type is DataFrame
if output_type == "DataFrame":
return results
# returning in case output_type is dict
results_dict = results.to_dict(orient="index")
return results_dict
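The pivoting step at the heart of the loop can be seen in isolation with made-up availability rows: long-format daily data becomes one column per object with dates as the index, and objects with available energy on the newest date are excluded from the stopped list. The asset names and energy values here are invented for the sketch.

```python
import pandas as pd

# Long-format daily availability, shaped like the data the loop receives
# after reset_index (values invented).
avail = pd.DataFrame(
    {
        "object_or_group_name": ["WTG01", "WTG01", "WTG02", "WTG02"],
        "date": pd.to_datetime(["2024-01-01", "2024-01-02"] * 2),
        "energy_available": [0.0, 0.0, 300.0, 100.0],
    }
)

# One column per object, newest date first (as in the method).
available = avail.pivot(
    index="date", columns="object_or_group_name", values="energy_available"
).sort_index(ascending=False)

# Objects with available energy on the newest row are not stopped;
# the rest stay candidates for the backward search.
still_stopped = available.columns[~(available.iloc[0] > 0)].to_list()
```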