ONS Limitations¶
OnsLimitations(perfdb)
¶
Class used for handling ONS Limitations. Can be accessed via perfdb.ons.limitations.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a reference to the top-level PerfDB so this accessor can reach
    # the shared connection handler and sibling accessors.
    self._perfdb: e_pg.PerfDB = perfdb
get(period, time_res=None, aggregation_window=None, object_names=None, reasons=None, group_type='spe', filter_type='and', realtime=False, output_type='DataFrame')
¶
Gets the ONS limitations for a given period.
The main keys/columns returned are:
- object_name / spe_name: If group_type is set to site the column is called object_name, if set to spe it is called spe_name.
- ons_site_name: Only present if group_type is set to spe.
- start: Only present if time_res is None.
- end: Only present if time_res is None.
- date: Only present if time_res is not None.
- duration: Duration of the limitation.
- reimbursable_accum_duration: Duration of the reimbursable accumulated limitation. Will have values only for REL limitations.
- hourly_allowance: Hourly allowance for the limitation. Will have values only for REL limitations.
- reimbursable: If the limitation is reimbursable.
- reason: Reason for the limitation (ENE, CNF, REL, etc.)
- limitation_value: Value of the limitation.
- lost_energy: Lost energy in MWh.
- produced_energy: Produced energy during the period of the limitation in MWh.
- lost_energy_ons: Lost energy in MWh according to ONS.
- produced_energy_ons: Produced energy during the period of the limitation in MWh according to ONS.
- spe_count: Number of SPEs affected by the limitation. Only present if group_type is set to site.
- market_type: Free or Regulated, used for calculation of lost revenue.
- submarket_acronym: Submarket acronym. Used for calculation of lost revenue.
- energy_price_type: Type of energy price (PLD or PPA). Used for calculation of lost revenue.
- energy_price: Energy price used for calculation of lost revenue.
- lost_revenue: Lost revenue due to the limitation.
- lost_revenue_ons: Lost revenue due to the limitation according to ONS.
Parameters:
-
(period¶DateTimeRange) –Desired period.
-
(time_res¶Literal['daily', 'monthly', 'quarterly', 'yearly'] | None, default:None) –Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly", None], by default None
-
(aggregation_window¶Literal['mtd', 'ytd', '12m'] | None, default:None) –Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
-
(object_names¶list[str] | None, default:None) –Names of the objects to get the limitations for. If None will get all. By default None
-
(reasons¶list[str] | None, default:None) –Reasons (ENE, CNF, REL, etc.) to get the limitations for. If None will get all. By default None
-
(realtime¶bool, default:False) –If set to True, will get the data from view v_ons_limitations_realtime. This view is only available for ONS Sites and does not get data from the materialized view mv_ons_spe_limitations, so no revenue data will be available.
This is useful for getting the most recent data, especially by the routines that update the limitations calculating lost energy.
By default, False
-
(group_type¶Literal['spe', 'site'], default:'spe') –If set to spe will get the limitations per SPE. If set to site will get them per site (as defined by ONS). By default "spe"
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
Returns:
-
DataFrame–If output_type is set to DataFrame will return a DataFrame in the format index = MultiIndex[object_name, start, reason, reimbursable], columns = [end, duration, ...]
-
dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]–If output_type is set to dict, will return a dict in the format {object_name: {start: {reason: {reimbursable: {end: ..., duration: ...}}}}}
Source code in echo_postgres/ons_limitations.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] | None = None,
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    object_names: list[str] | None = None,
    reasons: list[str] | None = None,
    group_type: Literal["spe", "site"] = "spe",
    filter_type: Literal["and", "or"] = "and",
    realtime: bool = False,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]:
    """Gets the ONS limitations for a given period.

    The main keys/columns returned are:

    - object_name / spe_name: If group_type is set to site the column is called object_name, if set to spe it is called spe_name.
    - ons_site_name: Only present if group_type is set to spe.
    - start: Only present if time_res is None.
    - end: Only present if time_res is None.
    - date: Only present if time_res is not None.
    - duration: Duration of the limitation.
    - reimbursable_accum_duration: Duration of the reimbursable accumulated limitation. Will have values only for REL limitations.
    - hourly_allowance: Hourly allowance for the limitation. Will have values only for REL limitations.
    - reimbursable: If the limitation is reimbursable.
    - reason: Reason for the limitation (ENE, CNF, REL, etc.)
    - limitation_value: Value of the limitation.
    - lost_energy: Lost energy in MWh.
    - produced_energy: Produced energy during the period of the limitation in MWh.
    - lost_energy_ons: Lost energy in MWh according to ONS.
    - produced_energy_ons: Produced energy during the period of the limitation in MWh according to ONS.
    - spe_count: Number of SPEs affected by the limitation. Only present if group_type is set to site.
    - market_type: Free or Regulated, used for calculation of lost revenue.
    - submarket_acronym: Submarket acronym. Used for calculation of lost revenue.
    - energy_price_type: Type of energy price (PLD or PPA). Used for calculation of lost revenue.
    - energy_price: Energy price used for calculation of lost revenue.
    - lost_revenue: Lost revenue due to the limitation.
    - lost_revenue_ons: Lost revenue due to the limitation according to ONS.

    Parameters
    ----------
    period : DateTimeRange
        Desired period.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"] | None, optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly", None], by default None
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    object_names : list[str] | None, optional
        Names of the objects to get the limitations for. If None will get all. By default None
    reasons : list[str] | None, optional
        Reasons (ENE, CNF, REL, etc.) to get the limitations for. If None will get all. By default None
    group_type : Literal["spe", "site"], optional
        If set to spe will get the limitations per SPE. If set to site will get them per site (as defined by ONS). By default "spe"
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    realtime : bool, optional
        If set to True, will get the data from view v_ons_limitations_realtime. This view is only available for ONS Sites and does not get data from the materialized view mv_ons_spe_limitations, so no revenue data will be available.
        This is useful for getting the most recent data, especially by the routines that update the limitations calculating lost energy.
        By default, False
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        If output_type is set to DataFrame will return a DataFrame in the format index = MultiIndex[object_name, start, reason, reimbursable], columns = [end, duration, ...]
    dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]
        If output_type is set to dict, will return a dict in the format {object_name: {start: {reason: {reimbursable: {end: ..., duration: ...}}}}}

    Raises
    ------
    ValueError
        If realtime is True with group_type "spe", or if any of object_names cannot be resolved to an id.
    """
    # realtime data only exists at site granularity (view v_ons_limitations_realtime)
    if realtime and group_type == "spe":
        raise ValueError("realtime is only available for group_type site")

    # resolve the source relation from granularity / resolution / aggregation window,
    # e.g. mv_ons_spe_limitations_daily_mtd
    table = f"mv_ons_{'spe_' if group_type == 'spe' else ''}limitations{'' if time_res is None else f'_{time_res}'}{'' if aggregation_window is None else f'_{aggregation_window}'}"
    if realtime:
        table = "v_ons_limitations_realtime"

    # raw (non-aggregated) limitations are keyed by "start"; time-aggregated ones by "date"
    ref_time_col = "start" if time_res is None else "date"

    # building the query
    query = [
        sql.SQL("""SELECT * FROM performance.{table} WHERE ({time_col} >= {period_start} AND {time_col} <= {period_end})""").format(
            time_col=sql.Identifier(ref_time_col),
            table=sql.Identifier(table),
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    where = []
    if object_names:
        # getting id's of the objects for faster query
        object_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(object_ids) != len(object_names):
            missing_names = [name for name in object_names if name not in object_ids]
            raise ValueError(f"Could not find the following object names: {missing_names}")
        object_ids = list(object_ids.values())
        where.append(
            sql.SQL("{objects_col} IN ({names})").format(
                objects_col=sql.Identifier("object_id") if group_type == "site" else sql.Identifier("spe_id"),
                names=sql.SQL(", ").join(map(sql.Literal, object_ids)),
            ),
        )
    if reasons:
        where.append(
            sql.SQL("reason IN ({reasons})").format(reasons=sql.SQL(", ").join(map(sql.Literal, reasons))),
        )
    if where:
        # filter_type decides whether the optional filters are AND'ed or OR'ed together
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))
    query.append(
        sql.SQL("""ORDER BY {time_col}, {object_col}""").format(
            time_col=sql.Identifier(ref_time_col),
            object_col=sql.Identifier("object_name") if group_type == "site" else sql.Identifier("spe_name"),
        ),
    )
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # adjusting some data types
    col_dtypes = {
        "duration": "timedelta64[s]",
        "reimbursable_accum_duration": "timedelta64[s]",
        "hourly_allowance": "timedelta64[s]",
        "limitation_value": "double[pyarrow]",
    }
    if ref_time_col == "date":
        col_dtypes["date"] = "datetime64[s]"
    else:
        col_dtypes["start"] = "datetime64[s]"
        col_dtypes["end"] = "datetime64[s]"
    # keep only the dtype overrides for columns the query actually returned
    col_dtypes = {col: dtype for col, dtype in col_dtypes.items() if col in df.columns}
    df = df.astype(col_dtypes)

    # defining index
    obj_col = "spe_name" if group_type == "spe" else "object_name"
    df = df.set_index([obj_col, ref_time_col, "reason", "reimbursable"])

    if output_type == "DataFrame":
        return df

    # converting to nested dict {object_name: {time: {reason: {reimbursable: row}}}}
    final_result: dict[str, dict[Timestamp, dict[str, dict[str, Any]]]] = {}
    for (obj, time_key, reason, reimbursable), data in df.to_dict(orient="index").items():
        final_result.setdefault(obj, {}).setdefault(time_key, {}).setdefault(reason, {})[reimbursable] = data
    return final_result
insert(df, group_type='spe', on_conflict='ignore')
¶
Inserts ONS limitations into the database.
Parameters:
-
(df¶DataFrame) –DataFrame with the limitations to insert.
If group_type is set to spe, the DataFrame must have the columns:
- ons_site_name
- spe_name
- start
- lost_energy
- produced_energy
- lost_energy_ons
- produced_energy_ons
If group_type is set to site, the DataFrame must have the columns:
- object_name
- start
- end
- reason
- origin
- limitation_value
- description
-
(group_type¶Literal['spe', 'site'], default:'spe') –If set to spe will save data to table "ons_spe_limitations". If set to site will save data for table "ons_limitations". By default "spe"
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/ons_limitations.py
@validate_call
def insert(
    self,
    df: DataFrame,
    group_type: Literal["spe", "site"] = "spe",
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts ONS limitations into the database.

    Parameters
    ----------
    df : DataFrame
        DataFrame with the limitations to insert.

        If group_type is set to spe, the DataFrame must have the columns:

        - ons_site_name
        - spe_name
        - start
        - lost_energy
        - produced_energy
        - lost_energy_ons
        - produced_energy_ons

        If group_type is set to site, the DataFrame must have the columns:

        - object_name
        - start
        - end
        - reason
        - origin
        - limitation_value
        - description
    group_type : Literal["spe", "site"], optional
        If set to spe will save data to table "ons_spe_limitations". If set to site will save data to table "ons_limitations". By default "spe"
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df is missing any required column, or if any referenced name cannot be resolved to an object id.
    """
    # NOTE: group_type and on_conflict are already constrained by @validate_call
    # via their Literal annotations, so no manual membership checks are needed.

    # checking columns
    if group_type == "spe":
        required_cols = [
            "ons_site_name",
            "spe_name",
            "start",
            "lost_energy",
            "produced_energy",
            "lost_energy_ons",
            "produced_energy_ons",
        ]
    else:
        required_cols = ["object_name", "start", "end", "reason", "origin", "limitation_value", "description"]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # work on a trimmed copy so the caller's DataFrame is never mutated
    df = df[required_cols].copy()

    # replacing object_name, spe_name and ons_site_name by their ids
    if group_type == "spe":
        # deduplicate across the two name columns: a name present as both an SPE
        # and an ONS site would otherwise be requested twice and trip the
        # missing-name check below even though it resolves fine
        wanted_obj_names = list(dict.fromkeys(df["spe_name"].tolist() + df["ons_site_name"].tolist()))
    else:
        wanted_obj_names = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_obj_names)
    missing_names = [name for name in wanted_obj_names if name not in obj_ids]
    if missing_names:
        raise ValueError(f"Could not find the following object names: {missing_names}")
    if group_type == "spe":
        df["spe_id"] = df["spe_name"].map(obj_ids)
        df["ons_site_id"] = df["ons_site_name"].map(obj_ids)
        df = df.drop(columns=["spe_name", "ons_site_name"])
    else:
        df["object_id"] = df["object_name"].map(obj_ids)
        df = df.drop(columns=["object_name"])

    # dropping duplicates
    df = df.drop_duplicates()

    # inserting ("ignore" maps to a plain append; "update" upserts)
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    table = "ons_spe_limitations" if group_type == "spe" else "ons_limitations"
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name=table,
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )
    logger.debug(f"Inserted {df.shape[0]} rows into {table} table")