Resource Assessments PXX

ResourceAssessmentPxx(perfdb)

Class used for handling resource assessment pxx values. Can be accessed via perfdb.resourceassessments.pxx.

Parameters:

- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    self._perfdb: e_pg.PerfDB = perfdb
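A minimal access sketch (assuming a PerfDB instance has already been created; its constructor arguments are installation-specific and omitted here):

```python
import echo_postgres as e_pg

# Assumption: a configured PerfDB instance already exists; how it is
# constructed depends on your environment and is not shown here.
perfdb: e_pg.PerfDB = ...

# The handler documented on this page is reached through the accessor
# mentioned above, not by instantiating ResourceAssessmentPxx directly.
pxx_handler = perfdb.resourceassessments.pxx
```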
get(period, time_res='daily', aggregation_window=None, group_names=None, group_types=None, resource_types=None, pxx=None, evaluation_periods=None, filter_type='and', output_type='DataFrame')
Gets the equivalent pxx values for the given resource types, evaluation periods and objects.

The most useful keys/columns returned are:

- group_type_name
- group_name
- resource_type_name
- evaluation_period
- pxx
- date
- value

This method assumes that at least P50 and P90 are defined in the materialized views in the database. It uses both values to determine the standard deviation of the data and then derives the remaining pxx values, as shown in the sketch below.
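As a rough sketch of that derivation (mirroring the formulas used in the source below; the numeric P50/P90 values are made up for illustration):

```python
from scipy.stats import norm

# Hypothetical long-term P50 and P90 values read from the materialized view.
# By convention P90 is exceeded 90% of the time, so it sits below P50.
p50, p90 = 100.0, 87.2

# Relative standard deviation implied by the two quantiles of a normal distribution
rel_std = (p50 - p90) / p50 / norm.ppf(0.9)

# Any other exceedance level then follows from the same normal assumption
p75 = p50 - (p50 * rel_std) * norm.ppf(0.75)
p99 = p50 - (p50 * rel_std) * norm.ppf(0.99)
```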
Parameters:

- period (DateTimeRange) – Period of time to get the data for.
- time_res (Literal["daily", "monthly", "quarterly", "yearly"], default: "daily") – Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"]. By default "daily".
- aggregation_window (Literal["mtd", "ytd", "12m"] | None, default: None) – Aggregation window to use. Can be one of ["mtd", "ytd", "12m"]. By default None.
- group_names (list[str], default: None) – List of group names to get the data for. By default None.
- group_types (list[str], default: None) – List of object group types to get the data for. By default None.
- resource_types (list[str], default: None) – List of resource types to get the data for (e.g. "wind_speed", "solar_irradiance", "average_power"). If set to None, all available resource types are returned. By default None.
- pxx (list[float], default: None) – List of pxx values to get the data for. All values must be between 0 and 1 (exclusive) and must not be repeated. By default [0.9, 0.5].
- evaluation_periods (list[Literal["longterm", "1year", "1month", "1day"]], default: None) – List of evaluation periods to get the data for. By default None, which returns all available evaluation periods.
- filter_type (Literal["and", "or"], default: "and") – How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
- output_type (Literal["dict", "DataFrame"], default: "DataFrame") – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".
Returns:

- DataFrame – In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...].
- dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]] – In case output_type is "dict", returns a dictionary in the format:
  {"group_type_name": {"group_name": {"resource_type_name": {"evaluation_period": {"date": {"pxx": {"attribute": value, ...}, ...}, ...}, ...}, ...}, ...}, ...}
Source code in echo_postgres/resourceassessment_pxx.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    group_names: list[str] | None = None,
    group_types: list[str] | None = None,
    resource_types: list[str] | None = None,
    pxx: list[float] | None = None,
    evaluation_periods: list[Literal["longterm", "1year", "1month", "1day"]] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]:
    """Gets the equivalent pxx values for the given resource types, evaluation periods and objects.

    The most useful keys/columns returned are:

    - group_type_name
    - group_name
    - resource_type_name
    - evaluation_period
    - pxx
    - date
    - value

    This method assumes that at least P50 and P90 are defined in the materialized views in the database.
    It uses both values to determine the standard deviation of the data and then derives the remaining pxx values.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    group_names : list[str], optional
        List of group names to get the data for. By default None
    group_types : list[str], optional
        List of object group types to get the data for. By default None
    resource_types : list[str], optional
        List of resource types to get the data for (e.g. "wind_speed", "solar_irradiance", "average_power").
        If set to None, it will return all available resource types. By default None
    pxx : list[float], optional
        List of pxx values to get the data for. All values must be between 0 and 1 (exclusive) and must not be repeated. By default [0.9, 0.5].
    evaluation_periods : list[Literal["longterm", "1year", "1month", "1day"]], optional
        List of evaluation periods to get the data for. By default None, which will return all available evaluation periods.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...]
    dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]
        In case output_type is "dict", returns a dictionary in the format:

        ```python
        {"group_type_name":
            {"group_name":
                {"resource_type_name":
                    {"evaluation_period":
                        {"date":
                            {"pxx":
                                {"attribute": value, ...},
                            ...},
                        ...},
                    ...},
                ...},
            ...},
        ...}
        ```
    """
    # building the query
    query = [
        sql.SQL("SELECT * FROM {table_name}").format(
            table_name=sql.Identifier(f"mv_resource_assessment_pxx_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}"),
        ),
        sql.SQL(" WHERE pxx IN (0.5, 0.9) AND date >= {start} AND date <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if group_names:
        where.append(
            sql.SQL("group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_names)),
            ),
        )
    if group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_types)),
            ),
        )
    if resource_types:
        where.append(
            sql.SQL("resource_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, resource_types)),
            ),
        )
    if evaluation_periods:
        # adding longterm to force getting p50
        request_eval_periods = [*evaluation_periods, "longterm"]
        request_eval_periods = list(set(request_eval_periods))
        where.append(
            sql.SQL("evaluation_period IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, request_eval_periods)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))
    query.append(sql.SQL(" ORDER BY group_type_name, group_name, resource_type_name, evaluation_period, date, pxx"))
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow", dtype={"date": "datetime64[s]"})

    # checking if we have the necessary pxx values
    if not {0.5, 0.9}.issubset(set(df["pxx"].unique())):
        raise ValueError(
            f"The materialized views do not contain the necessary pxx values. Please check the database. Found pxx values: {set(df['pxx'].unique())} but expected { {0.5, 0.9} }",
        )

    # pivoting so pxx are the columns
    df = df.pivot(
        columns=["pxx"],
        index=[col for col in df.columns if col not in ["pxx", "value"]],
        values="value",
    )
    df = df.reset_index(drop=False).set_index(["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date"])

    # forcing all pxx cols to have double[pyarrow] type
    df = df.astype({0.5: "double[pyarrow]", 0.9: "double[pyarrow]"})

    # filling p50 of all evaluation periods with the same value based on longterm
    for group_type_name in df.index.get_level_values("group_type_name").unique():
        for group_name in df.index.get_level_values("group_name").unique():
            for resource_type_name in df.index.get_level_values("resource_type_name").unique():
                for evaluation_period in df.index.get_level_values("evaluation_period").unique():
                    if evaluation_period == "longterm":
                        continue
                    p50 = df.loc[IndexSlice[group_type_name, group_name, resource_type_name, "longterm", :], 0.5].copy()
                    p50 = p50.reset_index(drop=False)
                    p50["evaluation_period"] = evaluation_period
                    p50 = p50.set_index(["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date"])
                    df.loc[IndexSlice[group_type_name, group_name, resource_type_name, evaluation_period, :], 0.5] = p50

    # calculating std
    df["std"] = (df[0.5] - df[0.9]) / df[0.5] / norm.ppf(0.9)

    # calculating the other pxx values
    if not pxx:
        pxx = [0.5, 0.9]
    pxx = sorted(pxx)
    for p in pxx:
        if p not in df.columns:
            df[p] = df[0.5] - (df[0.5] * df["std"]) * norm.ppf(p)

    # dropping unnecessary columns
    remove_cols = ["std"]
    if 0.5 not in pxx:
        remove_cols.append(0.5)
    if 0.9 not in pxx:
        remove_cols.append(0.9)
    df = df.drop(columns=remove_cols)

    # melting back so pxx are in the rows
    df = df.melt(
        id_vars=[col for col in df.columns if col not in pxx],
        value_vars=pxx,
        var_name="pxx",
        value_name="value",
        ignore_index=False,
    ).reset_index()

    # changing data type of value
    df = df.astype({"value": "double[pyarrow]", "pxx": "double[pyarrow]"})

    # keeping only the wanted evaluation periods
    if evaluation_periods:
        df = df[df["evaluation_period"].isin(evaluation_periods)].copy()

    # setting the index
    df = df.set_index(["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date", "pxx"])

    # converting to desired format
    if output_type == "DataFrame":
        return df

    result = df.to_dict(orient="index")
    final_result = {}
    for (group_type_name, group_name, resource_type_name, evaluation_period, date, pxx_val), data in result.items():
        if group_type_name not in final_result:
            final_result[group_type_name] = {}
        if group_name not in final_result[group_type_name]:
            final_result[group_type_name][group_name] = {}
        if resource_type_name not in final_result[group_type_name][group_name]:
            final_result[group_type_name][group_name][resource_type_name] = {}
        if evaluation_period not in final_result[group_type_name][group_name][resource_type_name]:
            final_result[group_type_name][group_name][resource_type_name][evaluation_period] = {}
        if date not in final_result[group_type_name][group_name][resource_type_name][evaluation_period]:
            final_result[group_type_name][group_name][resource_type_name][evaluation_period][date] = {}
        final_result[group_type_name][group_name][resource_type_name][evaluation_period][date][pxx_val] = data

    return final_result
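When output_type="dict" is requested, the nested mapping can be walked as in this sketch (continuing from the earlier example; per the return description above, the innermost dictionary holds the row attributes, i.e. at least "value"):

```python
res = perfdb.resourceassessments.pxx.get(
    period,
    resource_types=["wind_speed"],
    output_type="dict",
)

# group type -> group -> resource type -> evaluation period -> date -> pxx -> attributes
for group_type, groups in res.items():
    for group, resources in groups.items():
        for resource, eval_periods in resources.items():
            for eval_period, dates in eval_periods.items():
                for date, pxx_values in dates.items():
                    for p, attrs in pxx_values.items():
                        print(group_type, group, resource, eval_period, date, p, attrs["value"])
```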