KPI Availability Forecast Curtailment Event Assets¶
KpiAvailabilityForecastCurtailmentEventAssets(perfdb)
¶
Class used for linking assets to curtailment events. Can be accessed via perfdb.kpis.availability.forecasts.curtailment_events.assets.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
delete(curtailment_event_ids=None, object_names=None)
¶
Removes asset links from curtailment events.
At least one filter must be provided.
Parameters:
-
(curtailment_event_ids¶list[int] | None, default:None) –List of curtailment event IDs to filter. By default None.
-
(object_names¶list[str] | None, default:None) –List of object names to filter. By default None.
Returns:
-
int–Number of rows deleted.
Source code in echo_postgres/kpi_availability_forecast_curtailment_event_assets.py
@validate_call
def delete(
self,
curtailment_event_ids: list[int] | None = None,
object_names: list[str] | None = None,
) -> int:
"""Removes asset links from curtailment events.
At least one filter must be provided.
Parameters
----------
curtailment_event_ids : list[int] | None, optional
List of curtailment event IDs to filter. By default None.
object_names : list[str] | None, optional
List of object names to filter. By default None.
Returns
-------
int
Number of rows deleted.
"""
if curtailment_event_ids is None and object_names is None:
raise ValueError("At least one of 'curtailment_event_ids' or 'object_names' must be provided.")
conditions = []
if curtailment_event_ids:
conditions.append(sql.SQL("curtailment_event_id = ANY({ids})").format(ids=sql.Literal(curtailment_event_ids)))
if object_names:
obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
if missing := set(object_names) - set(obj_ids.keys()):
raise ValueError(f"Objects not found: {missing}")
conditions.append(sql.SQL("object_id = ANY({ids})").format(ids=sql.Literal(list(obj_ids.values()))))
query = sql.SQL("DELETE FROM performance.power_forecast_curtailment_event_assets WHERE ") + sql.SQL(" AND ").join(conditions)
self._perfdb.conn.execute(query)
deleted = self._perfdb.conn.rowcount
logger.debug(f"Deleted {deleted} curtailment event asset link(s).")
return deleted
get(curtailment_event_ids=None, object_names=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets assets linked to curtailment events.
The most useful keys/columns returned are:
- curtailment_event_id
- object_name
- object_type_name
- revision_name
- description
- curtailment_percentage
- power_fraction_available
- start_time
- end_time
- duration_hours
- duration_days
Parameters:
-
(curtailment_event_ids¶list[int] | None, default:None) –List of curtailment event IDs to filter. By default None.
-
(object_names¶list[str] | None, default:None) –List of object names to filter. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'pl.DataFrame') –Output type. By default "pl.DataFrame".
Returns:
-
dict[Any, dict[str, Any]]–In case output_type is "dict", returns {(curtailment_event_id, object_name): {col: val, ...}, ...}.
-
DataFrame–In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex.
-
DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/kpi_availability_forecast_curtailment_event_assets.py
@validate_call
def get(
self,
curtailment_event_ids: list[int] | None = None,
object_names: list[str] | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[Any, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
"""Gets assets linked to curtailment events.
The most useful keys/columns returned are:
- curtailment_event_id
- object_name
- object_type_name
- revision_name
- description
- curtailment_percentage
- power_fraction_available
- start_time
- end_time
- duration_hours
- duration_days
Parameters
----------
curtailment_event_ids : list[int] | None, optional
List of curtailment event IDs to filter. By default None.
object_names : list[str] | None, optional
List of object names to filter. By default None.
filter_type : Literal["and", "or"], optional
How to treat multiple filters. By default "and".
output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
Output type. By default "pl.DataFrame".
Returns
-------
dict[Any, dict[str, Any]]
In case output_type is "dict", returns {(curtailment_event_id, object_name): {col: val, ...}, ...}.
pd.DataFrame
In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex.
pl.DataFrame
In case output_type is "pl.DataFrame", returns a Polars DataFrame.
"""
where = (
WhereClauseBuilder(filter_type=filter_type)
.add_any("curtailment_event_id", curtailment_event_ids)
.add_any("object_name", object_names)
.build()
)
query = sql.SQL(
"SELECT * FROM performance.v_power_forecast_curtailment_event_assets {where} ORDER BY curtailment_event_id, object_name",
).format(where=where)
df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)
return convert_output(df, output_type, index_col=["curtailment_event_id", "object_name"])
insert(curtailment_event_id=None, object_name=None, data_df=None, on_conflict='ignore')
¶
Links one or more assets to a curtailment event.
You can pass individual values to link a single asset, or a DataFrame for batch linking.
Parameters:
-
(curtailment_event_id¶int | None, default:None) –ID of the curtailment event. Required for single insert. By default None.
-
(object_name¶str | None, default:None) –Name of the asset to link. Must be a wind_turbine or solar_inverter. Required for single insert. By default None.
-
(data_df¶DataFrame | None, default:None) –DataFrame for batch insert. Required columns: curtailment_event_id, object_name. When provided, individual parameters are ignored. By default None.
-
(on_conflict¶Literal['raise', 'ignore'], default:'ignore') –Behavior if the (curtailment_event_id, object_id) pair already exists. By default "ignore".
Source code in echo_postgres/kpi_availability_forecast_curtailment_event_assets.py
@validate_call
def insert(
self,
curtailment_event_id: int | None = None,
object_name: str | None = None,
data_df: pl.DataFrame | None = None,
on_conflict: Literal["raise", "ignore"] = "ignore",
) -> None:
"""Links one or more assets to a curtailment event.
You can pass individual values to link a single asset, or a DataFrame for batch linking.
Parameters
----------
curtailment_event_id : int | None, optional
ID of the curtailment event. Required for single insert. By default None.
object_name : str | None, optional
Name of the asset to link. Must be a wind_turbine or solar_inverter. Required for single insert. By default None.
data_df : pl.DataFrame | None, optional
DataFrame for batch insert. Required columns: curtailment_event_id, object_name.
When provided, individual parameters are ignored. By default None.
on_conflict : Literal["raise", "ignore"], optional
Behavior if the (curtailment_event_id, object_id) pair already exists. By default "ignore".
"""
df_schema = {"curtailment_event_id": pl.Int64, "object_name": pl.Utf8}
if data_df is None:
data_df = pl.DataFrame(
{"curtailment_event_id": [curtailment_event_id], "object_name": [object_name]},
schema=df_schema,
)
else:
data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})
for col in ["curtailment_event_id", "object_name"]:
if col not in data_df.columns:
raise ValueError(f"data_df is missing required column '{col}'.")
if data_df[col].is_null().any():
raise ValueError(f"Column '{col}' contains null values but is required.")
# resolve object_name → object_id
obj_names = data_df["object_name"].unique().to_list()
obj_ids = self._perfdb.objects.instances.get_ids(object_names=obj_names)
if missing := set(obj_names) - set(obj_ids.keys()):
raise ValueError(f"Objects not found: {missing}")
data_df = data_df.with_columns(pl.col("object_name").replace_strict(obj_ids, return_dtype=pl.Int64).alias("object_id"))
data_df = data_df.drop("object_name")
if_exists = "append" if on_conflict == "ignore" else "skip_row_check"
self._perfdb.conn.polars_to_sql(
df=data_df,
table_name="power_forecast_curtailment_event_assets",
schema="performance",
if_exists=if_exists,
conflict_cols=["curtailment_event_id", "object_id"] if on_conflict == "ignore" else None,
)
logger.debug(f"Inserted {len(data_df)} curtailment event asset link(s).")