KPI Stopped Assets

KpiStoppedAssets(perfdb)

Class for handling the Stopped Assets KPI. Can be accessed via perfdb.kpis.stoppedassets.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

get(end_date, group_names=None, group_types=None, request_interval=None, filter_type='and', output_type='DataFrame')

Gets the stopped assets on a specific date.

The result will have the following attributes for each stopped asset:

  • object_name: Name of the object.
  • start_date: First day on which the object was unavailable for the entire day (the asset actually stopped at some point during the previous day).
  • duration_days: Number of days the object was stopped.
  • lost_energy: Energy lost during the stoppage period (in kWh).

It uses daily availability data as a reference to find the stopped assets, so the result only has daily resolution and cannot pinpoint the exact time an asset stopped. The exact duration of the stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.

The search starts at end_date and moves back in time, one request_interval at a time. For each period, the method fetches the availability data, discards objects that had available energy on the most recent date of the period (they are not stopped), and records, for each remaining object, the most recent date with available energy (the stoppage boundary). Objects that were unavailable for the entire period are carried over to the previous period, and the loop continues until every remaining object was available at least once in some period (or no more data is available).
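The backward search described above can be sketched in plain Python. Everything below is invented for illustration (the object names, dates, and the in-memory AVAILABILITY lookup); the real method reads daily availability from the database instead:

```python
from datetime import date, timedelta

# Invented daily available-energy lookup, (object_name, day) -> kWh.
# WTG01 has been fully unavailable since 2024-05-03; WTG02 kept running.
AVAILABILITY = {("WTG01", date(2024, 5, d)): (10.0 if d <= 2 else 0.0) for d in range(1, 11)}
AVAILABILITY.update({("WTG02", date(2024, 5, d)): 10.0 for d in range(1, 11)})
DATA_START = date(2024, 5, 1)


def find_stopped(objs, end_date, interval=timedelta(days=3)):
    """Walk back from end_date until a boundary date is found for every object."""
    boundaries = {}
    wanted = set(objs)
    period_end = end_date
    while wanted and period_end >= DATA_START:
        period_start = period_end - interval
        days = [period_start + timedelta(days=i) for i in range(interval.days + 1)]
        if period_end == end_date:
            # Objects available on the most recent date are not stopped at all.
            wanted -= {o for o in wanted if AVAILABILITY.get((o, end_date), 0.0) > 0}
        for obj in list(wanted):
            available = [d for d in days if AVAILABILITY.get((obj, d), 0.0) > 0]
            if available:
                # Stoppage starts the day after the last day with availability.
                boundaries[obj] = max(available) + timedelta(days=1)
                wanted.discard(obj)
        # Move the search window one interval further back in time.
        period_end = period_start - timedelta(days=1)
    return boundaries
```

Running `find_stopped(["WTG01", "WTG02"], date(2024, 5, 10))` finds only WTG01 stopped, starting 2024-05-03: WTG02 is dropped in the first window because it was available on end_date, while WTG01's window is pushed back until a day with available energy appears.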

Note

TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.

Parameters:

  • end_date

    (date) –

    Date to get the stopped assets.

  • group_names

    (list[str] | None, default: None ) –

    Names of the groups to filter, by default None

  • group_types

    (list[str] | None, default: None ) –

    Types of the groups to filter, by default None

  • request_interval

    (timedelta | relativedelta | None, default: None ) –

    Interval of the requests to the database. If set to None will default to 1 month. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", it will return a DataFrame with the stopped assets. It will have the object_name as the index and the columns start_date, duration_days and lost_energy.

  • dict[str, Any]

    In case output_type is "dict", it will return a dictionary with the stopped assets. The keys will be the object names and the values will be a dictionary with the keys start_date, duration_days and lost_energy.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", it will return a Polars DataFrame with the stopped assets, with an object_name column and the columns start_date, duration_days and lost_energy.
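To illustrate the output shapes (all values below are invented), the "dict" form is simply the "DataFrame" form keyed by object name:

```python
import pandas as pd

# "DataFrame" output: object_name as the index, one row per stopped asset.
df = pd.DataFrame(
    {
        "object_name": ["WTG01", "WTG02"],
        "start_date": pd.to_datetime(["2024-05-03", "2024-05-06"]),
        "duration_days": [7, 4],
        "lost_energy": [1234.5, 678.9],
    }
).set_index("object_name")

# "dict" output: the same rows keyed by object name.
as_dict = df.to_dict(orient="index")
```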

Source code in echo_postgres/kpi_stoppedassets.py
Python
@validate_call
def get(
    self,
    end_date: date,
    group_names: list[str] | None = None,
    group_types: list[str] | None = None,
    request_interval: timedelta | relativedelta | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | dict[str, Any] | pl.DataFrame:
    """Gets the stopped assets in a specific date.

    The result will have the following attributes for each stopped asset:

    - `object_name`: Name of the object.
    - `start_date`: First day on which the object was unavailable for the entire day (the asset actually stopped at some point during the previous day).
    - `duration_days`: Number of days the object was stopped.
    - `lost_energy`: Energy lost during the stoppage period (in kWh).

    It uses daily availability data as a reference to find the stopped assets, so the result only has daily resolution and cannot pinpoint the exact time an asset stopped. The exact duration of the stoppage is the number of days returned by the method plus the additional time from the previous day, when the asset actually stopped.

    The search starts at `end_date` and moves back in time, one `request_interval` at a time. For each period, the method fetches the availability data, discards objects that had available energy on the most recent date of the period (they are not stopped), and records, for each remaining object, the most recent date with available energy (the stoppage boundary). Objects that were unavailable for the entire period are carried over to the previous period, and the loop continues until every remaining object was available at least once in some period (or no more data is available).

    > [!NOTE]
    >
    > TODO: We can improve this method by looking into Bazefield to find the exact time the asset stopped.

    Parameters
    ----------
    end_date : date
        Date to get the stopped assets.
    group_names : list[str] | None, optional
        Names of the groups to filter, by default None
    group_types : list[str] | None, optional
        Types of the groups to filter, by default None
    request_interval : timedelta | relativedelta | None, optional
        Interval of the requests to the database. If set to None will default to 1 month. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", it will return a DataFrame with the stopped assets. It will have the object_name as the index and the columns start_date, duration_days and lost_energy.
    dict[str, Any]
        In case output_type is "dict", it will return a dictionary with the stopped assets. The keys will be the object names and the values will be a dictionary with the keys start_date, duration_days and lost_energy.
    pl.DataFrame
        In case output_type is "pl.DataFrame", it will return a Polars DataFrame with the stopped assets, with an object_name column and the columns start_date, duration_days and lost_energy.
    """
    if not request_interval:
        request_interval = relativedelta(months=1)

    # getting all objects
    objs_df = self._perfdb.objects.groups.instances.get(
        object_group_types=group_types,
        object_group_names=group_names,
        filter_type=filter_type,
        output_type="pl.DataFrame",
    )
    if objs_df.is_empty():
        raise ValueError(f"No objects found for the given group_names and group_types: {group_names}, {group_types}")

    objs = []
    for obj_list in objs_df["object_names"]:
        objs.extend(obj_list)
    objs = list(set(objs))
    objs.sort()

    # while loop that runs until we find the start date of all objects
    # it will start from the end date and go back in time until the start date of all objects are reached
    all_data: pl.DataFrame | None = None
    all_boundaries: pl.DataFrame | None = None
    wanted_objs = objs.copy()
    request_period = None
    iterations = 0
    while wanted_objs:
        iterations += 1
        # defining the period to get the data
        if request_period is None:
            request_period = DateTimeRange(end_date - request_interval, end_date)
        else:
            request_period = DateTimeRange(request_period.start - request_interval, request_period.start - timedelta(days=1))
        logger.debug(f"Requesting data for period: {request_period} and objects: {wanted_objs}")
        # getting availability for the period
        avail_df = self._perfdb.kpis.availability.values.get(
            period=request_period,
            time_res="daily",
            object_or_group_names=wanted_objs,
            availability_types=["Technical"],
            output_type="pl.DataFrame",
        )
        # selecting and renaming needed columns
        avail_df = avail_df.select(
            pl.col("object_or_group_name").alias("object_name"),
            "date",
            "energy_available",
            "energy_unavailable",
        )

        if avail_df.is_empty():
            logger.warning(f"No availability data found for period {request_period}. Stopping search.")
            break

        # removing all objects that have available energy at the most recent date of the period
        max_date = avail_df["date"].max()
        objs_with_avail = (
            avail_df.filter((pl.col("date") == max_date) & (pl.col("energy_available") > 0))
            .get_column("object_name")
            .unique()
            .to_list()
        )
        if objs_with_avail:
            avail_df = avail_df.filter(~pl.col("object_name").is_in(objs_with_avail))

        if avail_df.is_empty():
            break

        # concatenating with accumulated data
        all_data = pl.concat([all_data, avail_df], how="diagonal_relaxed") if all_data is not None else avail_df

        # for each object, find the boundary date (most recent date with energy_available > 0)
        new_boundaries = (
            all_data.filter(pl.col("energy_available") > 0).group_by("object_name").agg(pl.col("date").max().alias("boundary_date"))
        )
        if not new_boundaries.is_empty():
            all_boundaries = (
                (
                    pl.concat([all_boundaries, new_boundaries], how="diagonal_relaxed")
                    .group_by("object_name")
                    .agg(pl.col("boundary_date").max())
                )
                if all_boundaries is not None
                else new_boundaries
            )

        # filter out rows at or before the boundary date for each object
        if all_boundaries is not None:
            all_data = (
                all_data.join(all_boundaries, on="object_name", how="left")
                .filter(pl.col("boundary_date").is_null() | (pl.col("date") > pl.col("boundary_date")))
                .drop("boundary_date")
            )

        # objects without a boundary still need more historical data
        done_objs = set(all_boundaries.get_column("object_name").to_list()) if all_boundaries is not None else set()
        remaining_objs = set(all_data.get_column("object_name").unique().to_list()) if not all_data.is_empty() else set()
        wanted_objs = sorted(remaining_objs - done_objs)

    n_stopped = all_data.get_column("object_name").n_unique() if all_data is not None and not all_data.is_empty() else 0
    logger.info(f"Finished after {iterations} iterations. Found {n_stopped} objects that are stopped.")

    # building results
    if all_data is None or all_data.is_empty():
        results = pl.DataFrame(
            schema={
                "object_name": pl.Utf8,
                "start_date": pl.Datetime("ms"),
                "duration_days": pl.Int64,
                "lost_energy": pl.Float64,
            },
        )
    else:
        # compute lost energy per object from the stoppage rows
        lost_energy = all_data.group_by("object_name").agg(pl.col("energy_unavailable").sum().alias("lost_energy"))

        # join with boundaries and compute start_date and duration
        if all_boundaries is not None:
            results = lost_energy.join(all_boundaries, on="object_name", how="left")
        else:
            results = lost_energy.with_columns(pl.lit(None, dtype=pl.Date).alias("boundary_date"))

        results = (
            results.with_columns(
                pl.col("boundary_date").cast(pl.Datetime("ms")).alias("start_date"),
                (pl.lit(end_date) - pl.col("boundary_date")).dt.total_days().cast(pl.Int64).alias("duration_days"),
            )
            .select(["object_name", "start_date", "duration_days", "lost_energy"])
            .sort("object_name")
        )

    return convert_output(results, output_type, index_col="object_name")