ONS Limitations Time Series¶
OnsLimitationsSeries(perfdb)
¶
Class used for handling Time Series of ONS Limitations. Can be accessed via perfdb.ons.limitations.series.
Parameters:
- `perfdb` (`PerfDB`) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Stores a reference to the top-level ``PerfDB`` object so that subclasses
    can reach the shared connection handler through ``self._perfdb``.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a handle on the top-level PerfDB facade; all database access in
    # subclasses goes through this object.
    self._perfdb: e_pg.PerfDB = perfdb
get(period, object_names, group_type='spe', output_type='DataFrame', errors='raise')
¶
Gets a high resolution time series of ONS Limitations for the given objects.
This is based on the materialized view mv_ons_limitations which contains the start and end time of each limitation event, allowing us to build a one minute resolution time series.
For this method to work, the following attributes must be defined for the objects:
- ons_spes for sites
- nominal_power for both sites and SPEs
The main keys/columns returned are:
- 'timestamp': Timestamp of the limitation value (1 min resolution). Will be the index if output_type is "DataFrame".
- '<object_name>': Limitation value in kW for the given object at that timestamp. There will be one column per object name requested.
Parameters:
- `period` (`DateTimeRange`) – Desired period.
- `object_names` (`list[str]`) – Names of the objects to get the limitations for. Must match the types specified by group_type.
- `group_type` (`Literal['spe', 'site']`, default: `'spe'`) – If set to spe will get the limitations per SPE. If set to site will get them per site (as defined by ONS). By default "spe".
- `output_type` (`Literal['DataFrame', 'pl.DataFrame']`, default: `'DataFrame'`) – Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]. By default "DataFrame".
- `errors` (`Literal['raise', 'ignore']`, default: `'raise'`) – Whether to raise an error or just log a warning if some of the object_names are not found in the database. By default "raise".
Returns:
- `pd.DataFrame | pl.DataFrame` – Pandas or Polars DataFrame containing the time series of limitations.
Source code in echo_postgres/ons_limitations_series.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str],
    group_type: Literal["spe", "site"] = "spe",
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
    errors: Literal["raise", "ignore"] = "raise",
) -> pd.DataFrame | pl.DataFrame:
    """Gets a high resolution time series of ONS Limitations for the given objects.

    This is based on the materialized view `mv_ons_limitations` which contains the start and end time of each
    limitation event, allowing us to build a one minute resolution time series.

    For this method to work, the following attributes must be defined for the objects:

    - `ons_spes` for sites
    - `nominal_power` for both sites and SPEs

    The main keys/columns returned are:

    - 'timestamp': Timestamp of the limitation value (1 min resolution). Will be the index if output_type is "DataFrame".
    - '<object_name>': Limitation value in kW for the given object at that timestamp. There will be one column per object name requested.

    Parameters
    ----------
    period : DateTimeRange
        Desired period.
    object_names : list[str]
        Names of the objects to get the limitations for. Must match the types specified by group_type.
    group_type : Literal["spe", "site"], optional
        If set to spe will get the limitations per SPE. If set to site will get them per site (as defined by ONS). By default "spe"
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]
        By default "DataFrame"
    errors : Literal["raise", "ignore"], optional
        Whether to raise an error or just log a warning if some of the object_names are not found in the database.
        By default "raise"

    Returns
    -------
    pd.DataFrame | pl.DataFrame
        Pandas or Polars DataFrame containing the time series of limitations.

    Raises
    ------
    ValueError
        If errors is "raise" and some object_names are missing from the database; if the nominal power of an SPE
        cannot be found; or if the summed nominal power of the sites associated with an SPE is zero.
    """
    # first getting all the associated SPEs/sites.
    # "ons_site" objects carry a list-valued "ons_spes" attribute and a "nominal_power" attribute.
    spes_sites_mapping = self._perfdb.objects.instances.get(
        object_types=["ons_site"],
        attribute_names=["ons_spes", "nominal_power"],
        get_attributes=True,
        output_type="DataFrame",
    )
    # explode the list of SPEs so we end up with one row per (site_name, spe_name) pair,
    # each row also carrying the SITE nominal power
    spes_sites_mapping = (
        spes_sites_mapping.reset_index()[["name", "ons_spes", "nominal_power"]]
        .rename(columns={"name": "site_name", "ons_spes": "spe_name"})
        .explode("spe_name")
    )
    spes_sites_mapping = pl.from_pandas(spes_sites_mapping)
    # validating if the object names exist (against the relevant column for the requested group_type)
    if group_type == "spe":
        missing_objects = set(object_names) - set(spes_sites_mapping["spe_name"].unique().to_list())
    else:
        missing_objects = set(object_names) - set(spes_sites_mapping["site_name"].unique().to_list())
    if missing_objects:
        if errors == "raise":
            raise ValueError(f"The following {group_type} names were not found in the database: {missing_objects}")
        # errors == "ignore": warn and carry on; missing objects will simply yield empty/null columns below
        logger.warning(f"The following {group_type} names were not found in the database: {missing_objects}")
    # now getting the power for SPEs if group_type is spe
    # (needed later to pro-rate site-level limitation values down to each SPE)
    if group_type == "spe":
        spes_power = self._perfdb.objects.instances.get(
            object_names=object_names,
            attribute_names=["nominal_power"],
            get_attributes=True,
            output_type="DataFrame",
        )
        spes_power = spes_power.reset_index()[["name", "nominal_power"]].rename(columns={"name": "spe_name"})
        spes_power = pl.from_pandas(spes_power)
    # getting the limitation events (one row per event with its start/end timestamps)
    df: pl.DataFrame = self._perfdb.ons.limitations.get(
        period=period,
        object_names=object_names,
        group_type=group_type,
        output_type="pl.DataFrame",
    )
    # rename columns so the rest of the method can treat both group types uniformly.
    # NOTE(review): only the "spe" branch is renamed — presumably the site-level result
    # already exposes an "object_name" column; verify against ons.limitations.get.
    if group_type == "spe":
        df = df.rename({"spe_name": "object_name"})
    # getting only the relevant columns
    df = df.select(
        [
            "object_name",
            "limitation_value",
            "start",
            "end",
        ],
    )
    # converting limitation values from MW to kW
    df = df.with_columns(
        (pl.col("limitation_value") * 1000).alias("limitation_value"),
    )
    # building the time series, one object (column) at a time
    final_df = None
    for object_name in object_names:
        df_object = df.filter(pl.col("object_name") == object_name)
        # create a dataframe in 1 min frequency with the limitation values for this SPE using polars only.
        # closed="left" excludes period.end itself from the index.
        df_time_index = pl.DataFrame().with_columns(
            pl.datetime_range(
                start=period.start,
                end=period.end,
                interval="1m",
                closed="left",
            )
            .dt.cast_time_unit("ms")
            .alias("timestamp"),  # Cast to milliseconds
        )
        # First expand the datetime ranges into individual timestamps (one row per minute of each event).
        # NOTE(review): these expanded timestamps are NOT cast to "ms" like df_time_index above —
        # this assumes the "start"/"end" columns already come back in ms resolution; if their time
        # unit differs, the join below will not align. Confirm the dtype returned by the DB layer.
        df_object_expanded = (
            df_object.with_columns(
                pl.datetime_ranges("start", "end", interval="1m", closed="left").alias("timestamp_ranges"),
            )
            .explode("timestamp_ranges")
            .rename({"timestamp_ranges": "timestamp"})
        )
        # Now join with the time index; minutes with no limitation event stay null
        df_expanded = df_time_index.join(
            df_object_expanded,
            on="timestamp",
            how="left",
        )
        # now lets just keep timestamp and limitation_value, renaming limitation_value to the object name
        df_expanded = df_expanded.select(
            [
                "timestamp",
                pl.col("limitation_value").alias(object_name),
            ],
        )
        # in case of SPEs, we need to convert limitation_value from site to SPE by doing the ration from nominal powers
        # TODO we should change this from nominal power to MUST
        if group_type == "spe":
            # getting nominal power for this SPE
            spe_nominal_power = spes_power.filter(pl.col("spe_name") == object_name)["nominal_power"]
            # NOTE(review): this raises even when errors == "ignore", unlike the missing-object and
            # missing-site checks around it — confirm whether that asymmetry is intentional.
            if len(spe_nominal_power) == 0:
                raise ValueError(f"Nominal power not found for SPE {object_name}")
            spe_nominal_power = spe_nominal_power[0]
            # getting site nominal power (an SPE may map to more than one site; powers are summed below)
            site_names = spes_sites_mapping.filter(pl.col("spe_name") == object_name)["site_name"].unique().to_list()
            if len(site_names) == 0:
                if errors == "raise":
                    raise ValueError(f"Site not found for SPE {object_name}")
                logger.warning(f"Site not found for SPE {object_name}, limitation value will be considered as nominal power.")
                # fall back: without a site we cannot pro-rate, so report the SPE's own nominal power
                # for every timestamp (note this overwrites nulls as well)
                df_expanded = df_expanded.with_columns(
                    pl.lit(spe_nominal_power).alias(object_name),
                )
            else:
                site_nominal_power = spes_sites_mapping.filter(pl.col("site_name").is_in(site_names))["nominal_power"].sum()
                # guard against division by zero in the pro-rating below
                if site_nominal_power == 0:
                    raise ValueError(f"Site nominal power is zero for SPE {object_name}")
                # adjusting limitation value: pro-rate the site-level limitation by the SPE's
                # share of the total site nominal power
                df_expanded = df_expanded.with_columns(
                    (pl.col(object_name) * spe_nominal_power / site_nominal_power).alias(object_name),
                )
        # merging all DataFrames on timestamp (each df_expanded shares the same 1-min index)
        final_df = df_expanded if final_df is None else final_df.join(df_expanded, on="timestamp", how="left")
    # NOTE(review): if object_names is empty, final_df is still None here and the conversion
    # below will fail with an AttributeError — confirm whether empty input should be rejected earlier.
    if output_type == "pl.DataFrame":
        return final_df
    # pandas output: convert and promote the timestamp column to the index
    final_df = final_df.to_pandas(use_pyarrow_extension_array=True)
    final_df = final_df.set_index("timestamp")
    return final_df