
ONS Limitations

OnsLimitations(perfdb)

Class used for handling ONS Limitations. Can be accessed via perfdb.ons.limitations.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

get(period, time_res=None, aggregation_window=None, object_names=None, reasons=None, group_type='spe', filter_type='and', realtime=False, output_type='DataFrame')

Gets the ONS limitations for a given period.

The main keys/columns returned are:

  • object_name / spe_name: If group_type is set to site the column is called object_name, if set to spe it is called spe_name.
  • ons_site_name: Only present if group_type is set to spe.
  • start: Only present if time_res is None.
  • end: Only present if time_res is None.
  • date: Only present if time_res is not None.
  • duration: Duration of the limitation.
  • reimbursable_accum_duration: Duration of the reimbursable accumulated limitation. Will have values only for REL limitations.
  • hourly_allowance: Hourly allowance for the limitation. Will have values only for REL limitations.
  • reimbursable: If the limitation is reimbursable.
  • reason: Reason for the limitation (ENE, CNF, REL, etc.).
  • limitation_value: Value of the limitation.
  • lost_energy: Lost energy in MWh.
  • produced_energy: Produced energy during the period of the limitation in MWh.
  • lost_energy_ons: Lost energy in MWh according to ONS.
  • produced_energy_ons: Produced energy during the period of the limitation in MWh according to ONS.
  • spe_count: Number of SPEs affected by the limitation. Only present if group_type is set to site.
  • market_type: Free or Regulated, used for calculation of lost revenue.
  • submarket_acronym: Submarket acronym. Used for calculation of lost revenue.
  • energy_price_type: Type of energy price (PLD or PPA). Used for calculation of lost revenue.
  • energy_price: Energy price used for calculation of lost revenue.
  • lost_revenue: Lost revenue due to the limitation.
  • lost_revenue_ons: Lost revenue due to the limitation according to ONS.

Parameters:

  • period

    (DateTimeRange) –

    Desired period.

  • time_res

    (Literal['daily', 'monthly', 'quarterly', 'yearly'] | None, default: None ) –

    Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly", None], by default None

  • aggregation_window

    (Literal['mtd', 'ytd', '12m'] | None, default: None ) –

    Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None

  • object_names

    (list[str] | None, default: None ) –

    Names of the objects to get the limitations for. If None will get all. By default None

  • reasons

    (list[str] | None, default: None ) –

    Reasons (ENE, CNF, REL, etc.) to get the limitations for. If None will get all. By default None

  • realtime

    (bool, default: False ) –

    If set to True, will get the data from the view v_ons_limitations_realtime. This view is only available for ONS Sites and does not get data from the materialized view mv_ons_spe_limitations, so no revenue data will be available.

    This is useful for getting the most recent data, especially by the routines that update the limitations by calculating lost energy.

    By default, False

  • group_type

    (Literal['spe', 'site'], default: 'spe' ) –

    If set to spe will get the limitations per SPE. If set to site will get them per site (as defined by ONS). By default "spe"

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
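The object_names and reasons filters above are combined according to filter_type (which only matters when both are given). A plain-string sketch of that combination rule — the actual source below composes the clause safely with psycopg's sql module instead:

```python
def combine_filters(object_ids=None, reasons=None, filter_type="and"):
    """Join the optional object and reason filters with AND/OR, as get() does."""
    clauses = []
    if object_ids:
        clauses.append("spe_id IN ({})".format(", ".join(map(str, object_ids))))
    if reasons:
        clauses.append("reason IN ({})".format(", ".join(f"'{r}'" for r in reasons)))
    if not clauses:
        return ""
    return "({})".format(f" {filter_type.upper()} ".join(clauses))
```

For example, `combine_filters([1, 2], ["ENE"], "or")` yields `"(spe_id IN (1, 2) OR reason IN ('ENE'))"`.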

Returns:

  • DataFrame

    If output_type is set to DataFrame will return a DataFrame in the format index = MultiIndex[object_name, start, reason, reimbursable], columns = [end, duration, ...]

  • dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]

    If output_type is set to dict, will return a dict in the format {object_name: {start: {reason: {reimbursable: {end: ..., duration: ...}}}}}
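The dict output is just the MultiIndex rows re-nested level by level. A stand-alone sketch with hypothetical sample values showing the resulting shape:

```python
# Hypothetical rows keyed by (object_name, start, reason, reimbursable),
# mirroring the MultiIndex of the DataFrame output.
rows = {
    ("SPE A", "2024-01-01 00:00:00", "ENE", True): {"end": "2024-01-01 06:00:00", "duration": 21600},
    ("SPE A", "2024-01-02 00:00:00", "REL", False): {"end": "2024-01-02 03:00:00", "duration": 10800},
}

nested = {}
for (obj, start, reason, reimbursable), data in rows.items():
    # builds {object_name: {start: {reason: {reimbursable: row}}}}
    nested.setdefault(obj, {}).setdefault(start, {}).setdefault(reason, {})[reimbursable] = data
```

`nested["SPE A"]["2024-01-01 00:00:00"]["ENE"][True]` then holds the remaining columns of that row.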

Source code in echo_postgres/ons_limitations.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] | None = None,
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    object_names: list[str] | None = None,
    reasons: list[str] | None = None,
    group_type: Literal["spe", "site"] = "spe",
    filter_type: Literal["and", "or"] = "and",
    realtime: bool = False,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]:
    """Gets the ONS limitations for a given period.

    The main keys/columns returned are:

    - object_name / spe_name: If group_type is set to site the column is called object_name, if set to spe it is called spe_name.
    - ons_site_name: Only present if group_type is set to spe.
    - start: Only present if time_res is None.
    - end: Only present if time_res is None.
    - date: Only present if time_res is not None.
    - duration: Duration of the limitation.
    - reimbursable_accum_duration: Duration of the reimbursable accumulated limitation. Will have values only for REL limitations.
    - hourly_allowance: Hourly allowance for the limitation. Will have values only for REL limitations.
    - reimbursable: If the limitation is reimbursable.
    - reason: Reason for the limitation (ENE, CNF, REL, etc.).
    - limitation_value: Value of the limitation.
    - lost_energy: Lost energy in MWh.
    - produced_energy: Produced energy during the period of the limitation in MWh.
    - lost_energy_ons: Lost energy in MWh according to ONS.
    - produced_energy_ons: Produced energy during the period of the limitation in MWh according to ONS.
    - spe_count: Number of SPEs affected by the limitation. Only present if group_type is set to site.
    - market_type: Free or Regulated, used for calculation of lost revenue.
    - submarket_acronym: Submarket acronym. Used for calculation of lost revenue.
    - energy_price_type: Type of energy price (PLD or PPA). Used for calculation of lost revenue.
    - energy_price: Energy price used for calculation of lost revenue.
    - lost_revenue: Lost revenue due to the limitation.
    - lost_revenue_ons: Lost revenue due to the limitation according to ONS.

    Parameters
    ----------
    period : DateTimeRange
        Desired period.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"] | None, optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly", None], by default None
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    object_names : list[str] | None, optional
        Names of the objects to get the limitations for. If None will get all. By default None
    reasons : list[str] | None, optional
        Reasons (ENE, CNF, REL, etc.) to get the limitations for. If None will get all. By default None
    realtime : bool, optional
        If set to True, will get the data from the view v_ons_limitations_realtime. This view is only available for ONS Sites and does not get data from the materialized view mv_ons_spe_limitations, so no revenue data will be available.

        This is useful for getting the most recent data, especially by the routines that update the limitations by calculating lost energy.

        By default, False
    group_type : Literal["spe", "site"], optional
        If set to spe will get the limitations per SPE. If set to site will get them per site (as defined by ONS). By default "spe"
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        If output_type is set to DataFrame will return a DataFrame in the format index = MultiIndex[object_name, start, reason, reimbursable], columns = [end, duration, ...]
    dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]
        If output_type is set to dict, will return a dict in the format {object_name: {start: {reason: {reimbursable: {end: ..., duration: ...}}}}}
    """
    if realtime and group_type == "spe":
        raise ValueError("realtime is only available for group_type site")

    table = f"mv_ons_{'spe_' if group_type == 'spe' else ''}limitations{'' if time_res is None else f'_{time_res}'}{'' if aggregation_window is None else f'_{aggregation_window}'}"
    if realtime:
        table = "v_ons_limitations_realtime"
    ref_time_col = "start" if time_res is None else "date"

    # building the query
    query = [
        sql.SQL("""SELECT * FROM performance.{table} WHERE ({time_col} >= {period_start} AND {time_col} <= {period_end})""").format(
            time_col=sql.Identifier(ref_time_col),
            table=sql.Identifier(table),
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_names:
        # getting id's of the objects for faster query
        object_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(object_ids) != len(object_names):
            missing_names = [name for name in object_names if name not in object_ids]
            raise ValueError(f"Could not find the following object names: {missing_names}")
        object_ids = list(object_ids.values())

        where.append(
            sql.SQL("{objects_col} IN ({names})").format(
                objects_col=sql.Identifier("object_id") if group_type == "site" else sql.Identifier("spe_id"),
                names=sql.SQL(", ").join(map(sql.Literal, object_ids)),
            ),
        )
    if reasons:
        where.append(
            sql.SQL("reason IN ({reasons})").format(reasons=sql.SQL(", ").join(map(sql.Literal, reasons))),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(
        sql.SQL("""ORDER BY {time_col}, {object_col}""").format(
            time_col=sql.Identifier(ref_time_col),
            object_col=sql.Identifier("object_name") if group_type == "site" else sql.Identifier("spe_name"),
        ),
    )

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # adjusting some data types
    col_dtypes = {
        "duration": "timedelta64[s]",
        "reimbursable_accum_duration": "timedelta64[s]",
        "hourly_allowance": "timedelta64[s]",
        "limitation_value": "double[pyarrow]",
    }
    if ref_time_col == "date":
        col_dtypes["date"] = "datetime64[s]"
    else:
        col_dtypes["start"] = "datetime64[s]"
        col_dtypes["end"] = "datetime64[s]"

    # removing columns that are not needed
    col_dtypes = {col: dtype for col, dtype in col_dtypes.items() if col in df.columns}

    df = df.astype(col_dtypes)

    # defining index
    df = (
        df.set_index(["spe_name", ref_time_col, "reason", "reimbursable"])
        if group_type == "spe"
        else df.set_index(["object_name", ref_time_col, "reason", "reimbursable"])
    )

    if output_type == "DataFrame":
        return df

    # converting to dictionary
    result = df.to_dict(orient="index")
    final_result = {}
    for (obj, time_col, reason, reimbursable), data in result.items():
        if obj not in final_result:
            final_result[obj] = {}
        if time_col not in final_result[obj]:
            final_result[obj][time_col] = {}
        if reason not in final_result[obj][time_col]:
            final_result[obj][time_col][reason] = {}
        final_result[obj][time_col][reason][reimbursable] = data

    return final_result
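Which relation get() queries follows directly from group_type, time_res, aggregation_window, and realtime. A stand-alone restatement of the naming rule used in the source above:

```python
def limitations_relation(group_type="spe", time_res=None, aggregation_window=None, realtime=False):
    """Reproduce the relation-name rule used by get()."""
    if realtime:
        # realtime bypasses the materialized views entirely
        return "v_ons_limitations_realtime"
    name = "mv_ons_" + ("spe_" if group_type == "spe" else "") + "limitations"
    if time_res is not None:
        name += f"_{time_res}"
    if aggregation_window is not None:
        name += f"_{aggregation_window}"
    return name
```

So the default call reads from mv_ons_spe_limitations, while e.g. group_type="site" with time_res="monthly" and aggregation_window="ytd" reads from mv_ons_limitations_monthly_ytd.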

insert(df, group_type='spe', on_conflict='ignore')

Inserts ONS limitations into the database.

Parameters:

  • df

    (DataFrame) –

    DataFrame with the limitations to insert.

    If group_type is set to spe, the DataFrame must have the columns:

    • ons_site_name
    • spe_name
    • start
    • lost_energy
    • produced_energy
    • lost_energy_ons
    • produced_energy_ons

    If group_type is set to site, the DataFrame must have the columns:

    • object_name
    • start
    • end
    • reason
    • origin
    • limitation_value
    • description
  • group_type

    (Literal['spe', 'site'], default: 'spe' ) –

    If set to spe will save data to table "ons_spe_limitations". If set to site will save data to table "ons_limitations". By default "spe"

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
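Since insert raises on missing columns, a pre-flight check can be useful. A sketch using the required-column lists above (the helper name is hypothetical, not part of the library):

```python
# Required columns per group_type, as documented for insert()
REQUIRED_COLUMNS = {
    "spe": ["ons_site_name", "spe_name", "start", "lost_energy",
            "produced_energy", "lost_energy_ons", "produced_energy_ons"],
    "site": ["object_name", "start", "end", "reason", "origin",
             "limitation_value", "description"],
}

def missing_columns(columns, group_type="spe"):
    """Return the required columns absent from `columns` for the given group_type."""
    present = set(columns)
    return [c for c in REQUIRED_COLUMNS[group_type] if c not in present]
```

An empty return value means the DataFrame's columns satisfy the contract for that group_type; anything else is the list insert() would report in its ValueError.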

Source code in echo_postgres/ons_limitations.py
@validate_call
def insert(
    self,
    df: DataFrame,
    group_type: Literal["spe", "site"] = "spe",
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts ONS limitations into the database.

    Parameters
    ----------
    df : DataFrame
        DataFrame with the limitations to insert.

        If group_type is set to spe, the DataFrame must have the columns:

        - ons_site_name
        - spe_name
        - start
        - lost_energy
        - produced_energy
        - lost_energy_ons
        - produced_energy_ons

        If group_type is set to site, the DataFrame must have the columns:

        - object_name
        - start
        - end
        - reason
        - origin
        - limitation_value
        - description
    group_type : Literal["spe", "site"], optional
        If set to spe will save data to table "ons_spe_limitations". If set to site will save data to table "ons_limitations". By default "spe"
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checking inputs
    if group_type not in ["spe", "site"]:
        raise ValueError(f"group_type must be one of ['spe', 'site'], not {group_type}")
    if on_conflict not in ["ignore", "update"]:
        raise ValueError(f"on_conflict must be one of ['ignore', 'update'], not {on_conflict}")

    # checking columns
    if group_type == "spe":
        required_cols = [
            "ons_site_name",
            "spe_name",
            "start",
            "lost_energy",
            "produced_energy",
            "lost_energy_ons",
            "produced_energy_ons",
        ]
    else:
        required_cols = ["object_name", "start", "end", "reason", "origin", "limitation_value", "description"]
    if any(col not in df.columns for col in required_cols):
        missing_cols = [col for col in required_cols if col not in df.columns]
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # making a copy of the DataFrame
    df = df.copy()

    # getting only the wanted columns
    df = df[required_cols].copy()

    # replacing object_name, spe_name and ons_site_name by their ids
    if group_type == "spe":
        wanted_obj_names = df["spe_name"].unique().tolist() + df["ons_site_name"].unique().tolist()
    else:
        wanted_obj_names = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_obj_names)

    if len(obj_ids) != len(wanted_obj_names):
        missing_names = [name for name in wanted_obj_names if name not in obj_ids]
        raise ValueError(f"Could not find the following object names: {missing_names}")

    if group_type == "spe":
        df["spe_id"] = df["spe_name"].map(obj_ids)
        df["ons_site_id"] = df["ons_site_name"].map(obj_ids)
        df = df.drop(columns=["spe_name", "ons_site_name"])
    else:
        df["object_id"] = df["object_name"].map(obj_ids)
        df = df.drop(columns=["object_name"])

    # dropping duplicates
    df = df.drop_duplicates()

    # inserting
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    table = "ons_spe_limitations" if group_type == "spe" else "ons_limitations"
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name=table,
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug(f"Inserted {df.shape[0]} rows into {table} table")