KPI Stopped Assets

KpiStoppedAssets(perfdb)

Class used for handling Stopped Assets KPI. Can be accessed via perfdb.kpis.stoppedassets.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

get(end_date, group_names=None, group_types=None, request_interval=None, filter_type='and', output_type='DataFrame')

Gets the stopped assets on a specific date.

The result will have the following attributes for each stopped asset:

  • object_name: Name of the object.
  • start_date: First day on which the object was unavailable for the entire day. The actual stop happened on the previous day.
  • duration_days: Number of days the object was stopped.
  • lost_energy: Energy lost during the stoppage period (in kWh)

The method uses the availability data as a reference to find the stopped assets. Because this data only has daily resolution, the exact time the asset stopped cannot be determined. The exact duration of the stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.
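As a worked illustration of the paragraph above, the sketch below computes duration_days the same way the source does (end date minus start date, both at midnight) and then adds the partial time from the previous day. The dates and the `actual_stop` timestamp are hypothetical; the method itself has no access to the actual stop time.

```python
from datetime import date, datetime, time, timedelta

end_date = date(2024, 3, 15)
start_date = date(2024, 3, 10)  # hypothetical first fully unavailable day

# Whole days reported by the method: midnight of end_date minus midnight of start_date.
duration_days = (datetime.combine(end_date, time()) - datetime.combine(start_date, time())).days

# Hypothetical actual stop time on the previous day (not known to the method).
actual_stop = datetime(2024, 3, 9, 18, 0)

# Extra time between the actual stop and the start of the first fully unavailable day.
extra = datetime.combine(start_date, time()) - actual_stop

# Exact duration = reported days + partial time from the previous day.
exact_duration = timedelta(days=duration_days) + extra

print(duration_days)   # 5
print(exact_duration)  # 5 days, 6:00:00
```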

The search starts at end_date and goes back in time until the start date of all objects is reached. For each period, it gets the availability data and checks whether any objects have available energy on the most recent date of the period. If so, those objects are removed from the list before the data for the previous period is requested. This continues until every asset has been available at least once in the period searched.
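The backward search can be sketched in plain Python. This is a simplified illustration, not the library's implementation: it replaces the database calls with an in-memory mapping, and the names `find_stopped`, `daily`, `window_days` and the sample objects are all hypothetical. It assumes each object has at least one day of data.

```python
from datetime import date, timedelta


def daily(start: date, end: date, value: float) -> dict[date, float]:
    """Build a daily series with a constant value between start and end (inclusive)."""
    return {start + timedelta(days=i): value for i in range((end - start).days + 1)}


def find_stopped(available: dict[str, dict[date, float]], end_date: date, window_days: int = 30) -> dict[str, date]:
    """Return the first fully unavailable day for each object stopped on end_date."""
    # Objects with available energy on end_date are running and are ignored.
    pending = {name for name, series in available.items() if series.get(end_date, 0) <= 0}
    first_unavailable: dict[str, date] = {}
    window_end = end_date
    while pending:
        window_start = window_end - timedelta(days=window_days)
        for name in sorted(pending):
            series = available[name]
            oldest = min(series)
            day = window_end
            # Walk back day by day inside the current window.
            while day >= max(window_start, oldest):
                if series.get(day, 0) > 0:
                    # Last day with energy found: the stoppage starts the day after.
                    first_unavailable[name] = day + timedelta(days=1)
                    break
                day -= timedelta(days=1)
            else:
                # No energy found; if the object's history is exhausted, stop searching.
                if oldest >= window_start:
                    first_unavailable[name] = oldest
        pending -= first_unavailable.keys()
        # Move to the previous window.
        window_end = window_start - timedelta(days=1)
    return first_unavailable


end_date = date(2024, 3, 15)
available = {
    "WTG-A": daily(date(2024, 1, 1), end_date, 10.0),
    "WTG-B": {**daily(date(2024, 1, 1), date(2024, 3, 9), 10.0), **daily(date(2024, 3, 10), end_date, 0.0)},
    "WTG-C": {**daily(date(2024, 1, 1), date(2024, 1, 19), 10.0), **daily(date(2024, 1, 20), end_date, 0.0)},
}
result = find_stopped(available, end_date)
print(result)
```

In this sample, WTG-A is running on end_date and is skipped, WTG-B is resolved in the first window, and WTG-C needs a second window because its stoppage is longer than 30 days.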

Note

TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.

Parameters:

  • end_date

    (date) –

    Date for which to get the stopped assets.

  • group_names

    (list[str] | None, default: None ) –

    Names of the groups to filter, by default None

  • group_types

    (list[str] | None, default: None ) –

    Types of the groups to filter, by default None

  • request_interval

    (timedelta | relativedelta | None, default: None ) –

    Interval covered by each request to the database. If set to None, it defaults to 1 month. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
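To illustrate how request_interval shapes the database requests, the sketch below reproduces the period arithmetic visible in the source: the first period ends at end_date, and each following period ends one day before the previous period's start. The helper name `request_windows` and the `count` argument are hypothetical, and a fixed 30-day timedelta stands in for the 1-month default.

```python
from datetime import date, timedelta


def request_windows(end_date: date, interval: timedelta, count: int) -> list[tuple[date, date]]:
    """List the first `count` (start, end) periods, newest first."""
    windows = []
    start, end = end_date - interval, end_date
    for _ in range(count):
        windows.append((start, end))
        # The next period ends the day before the current one starts.
        start, end = start - interval, start - timedelta(days=1)
    return windows


windows = request_windows(date(2024, 3, 15), timedelta(days=30), 3)
for start, end in windows:
    print(start, "->", end)
```

A shorter interval means more, smaller requests; a longer one means fewer requests that may fetch more history than needed.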

Returns:

  • DataFrame

    In case output_type is "DataFrame", it will return a DataFrame with the stopped assets. It will have the object_name as the index and the columns start_date, duration_days and lost_energy.

  • dict[str, Any]

    In case output_type is "dict", it will return a dictionary with the stopped assets. The keys will be the object names and the values will be a dictionary with the keys start_date, duration_days and lost_energy.
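The two output shapes described above can be compared with a small pandas sketch. The object names and values are made up for illustration; only the column names, the index, and the `to_dict(orient="index")` conversion come from the source.

```python
import pandas as pd

# Hypothetical result in the DataFrame shape: object_name as index, three columns.
results = pd.DataFrame(
    {
        "object_name": ["WTG-B", "WTG-C"],
        "start_date": pd.to_datetime(["2024-03-10", "2024-01-20"]),
        "duration_days": [5, 55],
        "lost_energy": [1200.0, 15400.0],
    }
).set_index("object_name")

# Same data in the dict shape: one entry per object, keyed by object name.
as_dict = results.to_dict(orient="index")

print(results)
print(as_dict["WTG-B"])
```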

Source code in echo_postgres/kpi_stoppedassets.py
@validate_call
def get(
    self,
    end_date: date,
    group_names: list[str] | None = None,
    group_types: list[str] | None = None,
    request_interval: timedelta | relativedelta | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, Any]:
    """Gets the stopped assets on a specific date.

    The result will have the following attributes for each stopped asset:

    - `object_name`: Name of the object.
    - `start_date`: First day on which the object was unavailable for the entire day. The actual stop happened on the previous day.
    - `duration_days`: Number of days the object was stopped.
    - `lost_energy`: Energy lost during the stoppage period (in kWh)

    The method uses the availability data as a reference to find the stopped assets. Because this data only has daily resolution, the exact time the asset stopped cannot be determined. The exact duration of the stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.

    The search starts at end_date and goes back in time until the start date of all objects is reached. For each period, it gets the availability data and checks whether any objects have available energy on the most recent date of the period. If so, those objects are removed from the list before the data for the previous period is requested. This continues until every asset has been available at least once in the period searched.

    > [!NOTE]
    >
    > TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.

    Parameters
    ----------
    end_date : date
        Date to get the stopped assets.
    group_names : list[str] | None, optional
        Names of the groups to filter, by default None
    group_types : list[str] | None, optional
        Types of the groups to filter, by default None
    request_interval : timedelta | relativedelta | None, optional
        Interval covered by each request to the database. If set to None, it defaults to 1 month. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", it will return a DataFrame with the stopped assets. It will have the object_name as the index and the columns start_date, duration_days and lost_energy.
    dict[str, Any]
        In case output_type is "dict", it will return a dictionary with the stopped assets. The keys will be the object names and the values will be a dictionary with the keys start_date, duration_days and lost_energy.
    """
    if not request_interval:
        request_interval = relativedelta(months=1)

    # getting all objects
    objs_df = self._perfdb.objects.groups.instances.get(
        object_group_types=group_types,
        object_group_names=group_names,
        filter_type=filter_type,
        output_type="DataFrame",
    )
    if objs_df.empty:
        raise ValueError(f"No objects found for the given group_names and group_types: {group_names}, {group_types}")

    objs = []
    for obj_list in objs_df["object_names"]:
        objs.extend(obj_list)
    objs = list(set(objs))
    objs.sort()

    finished = False
    # while loop that runs until we reach the start date of all objects
    # it starts from the end date and goes back in time until the start date of all objects is reached
    unavailable_energy_df = DataFrame()
    available_energy_df = DataFrame()
    wanted_objs = objs.copy()
    request_period = None
    iterations = 0
    while not finished:
        iterations += 1
        # defining the period to get the data
        if request_period is None:
            request_period = DateTimeRange(end_date - request_interval, end_date)
        else:
            request_period = DateTimeRange(request_period.start - request_interval, request_period.start - timedelta(days=1))
        logger.debug(f"Requesting data for period: {request_period} and objects: {wanted_objs}")
        # getting availability for the period
        avail_df = self._perfdb.kpis.availability.values.get(
            period=request_period,
            time_res="daily",
            object_or_group_names=wanted_objs,
            availability_types=["Technical"],
        )
        avail_df = avail_df.droplevel(level=["group_type_name", "availability_type_name"], axis=0)
        avail_df = avail_df.reset_index()
        avail_df = avail_df[["object_or_group_name", "date", "energy_available", "energy_unavailable"]]
        # pivoting the DataFrame to have the objects as columns
        period_unavailable_energy_df = avail_df.pivot(index="date", columns="object_or_group_name", values="energy_unavailable")
        period_available_energy_df = avail_df.pivot(index="date", columns="object_or_group_name", values="energy_available")
        # making sure index is sorted
        period_unavailable_energy_df = period_unavailable_energy_df.sort_index(ascending=False)
        period_available_energy_df = period_available_energy_df.sort_index(ascending=False)
        # removing all objects that have available energy at the end of the period
        remove_objs = period_available_energy_df.columns[period_available_energy_df.iloc[0] > 0].to_list()
        if remove_objs:
            period_unavailable_energy_df = period_unavailable_energy_df.drop(columns=remove_objs)
            period_available_energy_df = period_available_energy_df.drop(columns=remove_objs)
        # concatenating the DataFrames
        unavailable_energy_df = concat([unavailable_energy_df, period_unavailable_energy_df], axis=0, join="outer")
        available_energy_df = concat([available_energy_df, period_available_energy_df], axis=0, join="outer", sort=True)

        # for each object (column), set to NA all rows after (older than) the first row with energy available
        # to do this, find the index of the first row with energy available for every column
        # mask available_energy_df so that values <= 0 become NA
        masked_available_energy_df = available_energy_df.mask(available_energy_df <= 0)
        # find the first row with energy available for each object
        first_available_energy = np.where(
            masked_available_energy_df.iloc[0].isna(),
            masked_available_energy_df.notna().idxmax(),
            masked_available_energy_df.notna()[::-1].idxmax(),
        )
        # set to NA all rows after (older) the first row with energy available
        for obj, first_available in zip(available_energy_df.columns, first_available_energy, strict=False):
            # in case first_available is equal to the end_date, it means that the object has no available energy in the period, so we should skip it
            if first_available.astype(datetime).date() == end_date:
                continue
            available_energy_df.loc[first_available:, obj] = NA
            unavailable_energy_df.loc[first_available:, obj] = NA

        # check if there are any objects with not NA values in the last rows, if so, we need to get data for the previous period
        wanted_objs = available_energy_df.columns[available_energy_df.iloc[-1].notna()].to_list()
        if not wanted_objs:
            finished = True

    logger.info(f"Finished after {iterations} iterations. Found {len(unavailable_energy_df.columns)} objects that are stopped.")

    # creating DataFrame to store results
    results = DataFrame(columns=["object_name", "start_date", "duration_days", "lost_energy"])

    results["object_name"] = unavailable_energy_df.columns.values
    # finding first NA row for each object
    results["start_date"] = to_datetime(unavailable_energy_df.isna().idxmax()).values
    # calculating duration
    results["duration_days"] = (datetime.combine(end_date, time()) - results["start_date"]).dt.days
    # calculating lost energy
    results["lost_energy"] = unavailable_energy_df.sum().values

    results = results.astype(
        {
            "object_name": "string[pyarrow]",
            "start_date": "datetime64[s]",
            "duration_days": "int64[pyarrow]",
            "lost_energy": "double[pyarrow]",
        },
    )

    results = results.set_index("object_name")

    # returning in case output_type is DataFrame
    if output_type == "DataFrame":
        return results

    # returning in case output_type is dict
    results_dict = results.to_dict(orient="index")
    return results_dict