Resource Assessments PXX¶
ResourceAssessmentPxx(perfdb)
¶
Class used for handling resource assessment pxx values. Can be accessed via perfdb.resourceassessments.pxx.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store the top-level PerfDB handle shared by all subclasses.

    Parameters
    ----------
    perfdb : PerfDB
        Parent object providing the database connection handler and all
        related functionality.
    """
    # keep a private reference so methods can reach the connection handler
    self._perfdb: e_pg.PerfDB = perfdb
get(period, time_res='daily', aggregation_window=None, group_names=None, group_types=None, resource_types=None, pxx=None, evaluation_periods=None, filter_type='and', output_type='DataFrame')
¶
Gets the equivalent pxx values for the given resource types, evaluation periods and objects.
The most useful keys/columns returned are:
- group_type_name
- group_name
- resource_type_name
- evaluation_period
- pxx
- date
- value
This method assumes that at least P50 and P90 are defined in the materialized views at the database. It will use both values to determine the standard deviation of the data and then calculate the other pxx values.
Parameters:
-
(period¶DateTimeRange) –Period of time to get the data for.
-
(time_res¶Literal['daily', 'monthly', 'quarterly', 'yearly'], default:'daily') –Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
-
(aggregation_window¶Literal['mtd', 'ytd', '12m'] | None, default:None) –Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
-
(group_names¶list[str], default:None) –List of group names to get the data for. By default None
-
(group_types¶list[str], default:None) –List of object group types to get the data for. By default None
-
(resource_types¶list[str], default:None) –List of resource types to get the data for (eg. "wind_speed", "solar_irradiance", "average_power").
If set to None, it will return all available resource types. By default None
-
(pxx¶list[float], default:None) –List of pxx values to get the data for. All values must be between 0 and 1 (not included) and should not be repeated. By default [0.9, 0.5].
-
(evaluation_periods¶list[Literal['longterm', '1year', '1month', '1day']], default:None) –List of evaluation periods to get the data for. By default None, which will return all available evaluation periods.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "DataFrame"
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...]
-
pl.DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame.
-
dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]–In case output_type is "dict", returns a dictionary in the format:
Python: {"group_type_name": {"group_name": {"resource_type_name": {"evaluation_period": {"date": {"pxx": {"attribute": value, ...}, ...}, ...}, ...}, ...}, ...}, ...}
Source code in echo_postgres/resourceassessment_pxx.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    group_names: list[str] | None = None,
    group_types: list[str] | None = None,
    resource_types: list[str] | None = None,
    pxx: list[float] | None = None,
    evaluation_periods: list[Literal["longterm", "1year", "1month", "1day"]] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame | dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]:
    """Gets the equivalent pxx values for the given resource types, evaluation periods and objects.

    The most useful keys/columns returned are:

    - group_type_name
    - group_name
    - resource_type_name
    - evaluation_period
    - pxx
    - date
    - value

    This method assumes that at least P50 and P90 are defined in the materialized views at the database. It will use both values to determine the standard deviation of the data and then calculate the other pxx values.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    group_names : list[str], optional
        List of group names to get the data for. By default None
    group_types : list[str], optional
        List of object group types to get the data for. By default None
    resource_types : list[str], optional
        List of resource types to get the data for (eg. "wind_speed", "solar_irradiance", "average_power").
        If set to None, it will return all available resource types. By default None
    pxx : list[float], optional
        List of pxx values to get the data for. All values must be between 0 and 1 (not included).
        Duplicated values are silently removed. By default [0.9, 0.5].
    evaluation_periods : list[Literal["longterm", "1year", "1month", "1day"]], optional
        List of evaluation periods to get the data for. By default None, which will return all available evaluation periods.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...]
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]
        In case output_type is "dict", returns a dictionary in the format:

        ```python
        {"group_type_name":
            {"group_name":
                {"resource_type_name":
                    {"evaluation_period":
                        {"date":
                            {"pxx":
                                {"attribute": value, ...},
                            ...},
                        ...},
                    ...},
                ...},
            ...},
        ...}
        ```

    Raises
    ------
    ValueError
        If any requested pxx value is not strictly between 0 and 1, or if the
        materialized views do not contain the base 0.5 and 0.9 pxx values.
    """
    # validating and normalizing the requested pxx values before doing any database work.
    # values outside (0, 1) would make norm.ppf return +-inf and silently produce inf/NaN
    # results; duplicates would create colliding column names in the pivot/unpivot below
    if not pxx:
        pxx = [0.5, 0.9]
    if any(not 0.0 < p < 1.0 for p in pxx):
        raise ValueError(f"All pxx values must be between 0 and 1 (not included). Got {pxx}")
    pxx = sorted(set(pxx))
    # building the query. only 0.5 and 0.9 are fetched; all other pxx values are derived from them
    query = [
        sql.SQL("SELECT * FROM {table_name}").format(
            table_name=sql.Identifier(f"mv_resource_assessment_pxx_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}"),
        ),
        sql.SQL(" WHERE pxx IN (0.5, 0.9) AND date >= {start} AND date <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    where = []
    if group_names:
        where.append(
            sql.SQL("group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_names)),
            ),
        )
    if group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_types)),
            ),
        )
    if resource_types:
        where.append(
            sql.SQL("resource_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, resource_types)),
            ),
        )
    if evaluation_periods:
        # adding longterm to force getting p50 (needed below to fill p50 of all periods)
        request_eval_periods = [*evaluation_periods, "longterm"]
        request_eval_periods = list(set(request_eval_periods))
        where.append(
            sql.SQL("evaluation_period IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, request_eval_periods)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        # filter_type is constrained by the Literal annotation, so interpolating it is safe
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))
    query.append(sql.SQL(" ORDER BY group_type_name, group_name, resource_type_name, evaluation_period, date, pxx"))
    query = sql.Composed(query)
    df = self._perfdb.conn.read_to_polars(query)
    # ensure pxx is Float64 for precision (database may store as Float32)
    if "pxx" in df.columns:
        df = df.with_columns(pl.col("pxx").cast(pl.Float64).round(2))
    # checking if we have the necessary pxx values
    pxx_available = set(df["pxx"].unique().to_list())
    if not {0.5, 0.9}.issubset(pxx_available):
        raise ValueError(
            f"The materialized views do not contain the necessary pxx values. Please check the database. Found pxx values: {pxx_available} but expected { {0.5, 0.9} }",
        )
    # identify non-pivot columns (everything except pxx and value)
    id_cols = [col for col in df.columns if col not in ("pxx", "value")]
    group_keys = ["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date"]
    # pivoting so pxx are the columns (0.5 and 0.9)
    df = df.pivot(on="pxx", index=id_cols, values="value")
    # ensure Float64 for precision
    df = df.with_columns(
        pl.col("0.5").cast(pl.Float64),
        pl.col("0.9").cast(pl.Float64),
    )
    # filling p50 of all evaluation periods with the same value based on longterm
    # extract longterm p50 values
    longterm_p50 = df.filter(pl.col("evaluation_period") == "longterm").select(
        "group_type_name",
        "group_name",
        "resource_type_name",
        "date",
        pl.col("0.5").alias("p50_longterm"),
    )
    # join longterm p50 onto all rows and overwrite 0.5 for non-longterm periods
    df = df.join(longterm_p50, on=["group_type_name", "group_name", "resource_type_name", "date"], how="left")
    df = df.with_columns(
        pl.when(pl.col("evaluation_period") != "longterm").then(pl.col("p50_longterm")).otherwise(pl.col("0.5")).alias("0.5"),
    ).drop("p50_longterm")
    # calculating std: (p50 - p90) / p50 / norm.ppf(0.9)
    ppf_09 = float(norm.ppf(0.9))
    df = df.with_columns(
        ((pl.col("0.5") - pl.col("0.9")) / pl.col("0.5") / ppf_09).alias("std"),
    )
    # calculating the other pxx values from p50 and the derived std
    for p in pxx:
        col_name = str(p)
        if col_name not in df.columns:
            ppf_p = float(norm.ppf(p))
            df = df.with_columns(
                (pl.col("0.5") - pl.col("0.5") * pl.col("std") * ppf_p).alias(col_name),
            )
    # dropping unnecessary columns (base pxx columns only kept if explicitly requested)
    remove_cols = ["std"]
    if 0.5 not in pxx:
        remove_cols.append("0.5")
    if 0.9 not in pxx:
        remove_cols.append("0.9")
    df = df.drop([c for c in remove_cols if c in df.columns])
    # melting back so pxx are in the rows
    pxx_col_names = [str(p) for p in pxx]
    non_pxx_cols = [col for col in df.columns if col not in pxx_col_names]
    df = df.unpivot(
        on=pxx_col_names,
        index=non_pxx_cols,
        variable_name="pxx",
        value_name="value",
    )
    # convert pxx column from string back to float
    df = df.with_columns(pl.col("pxx").cast(pl.Float64), pl.col("value").cast(pl.Float64))
    # keeping only the wanted evaluation periods (longterm may have been fetched only for p50)
    if evaluation_periods:
        df = df.filter(pl.col("evaluation_period").is_in(evaluation_periods))
    # sort for consistent output
    df = df.sort([*group_keys, "pxx"])
    # ensure proper types for output conversion
    df = df.with_columns(
        pl.col("date").cast(pl.Datetime("ms")),
        pl.col("value").cast(pl.Float64),
        pl.col("pxx").cast(pl.Float64),
    )
    return convert_output(df, output_type, index_col=[*group_keys, "pxx"], nest_by_index=True)