Resource Assessments PXX

ResourceAssessmentPxx(perfdb)

Class used for handling resource assessment pxx values. Can be accessed via perfdb.resourceassessments.pxx.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

get(period, time_res='daily', aggregation_window=None, group_names=None, group_types=None, resource_types=None, pxx=None, evaluation_periods=None, filter_type='and', output_type='DataFrame')

Gets the equivalent pxx values for the given resource types, evaluation periods and objects.

The most useful keys/columns returned are:

  • group_type_name
  • group_name
  • resource_type_name
  • evaluation_period
  • pxx
  • date
  • value

This method assumes that at least P50 and P90 are defined in the database's materialized views. It uses both values to determine the standard deviation of the data and then derives the other pxx values.
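That derivation can be sketched as follows. The values are hypothetical, and the standard library's `NormalDist` stands in for the `scipy.stats.norm.ppf` call used internally (both compute the normal quantile function):

```python
from statistics import NormalDist

# Hypothetical longterm P50/P90 values read from the materialized view
p50, p90 = 100.0, 80.0

# Relative standard deviation implied by the P50/P90 pair
std = (p50 - p90) / p50 / NormalDist().inv_cdf(0.9)

# Any other quantile follows from the same normal assumption,
# e.g. P75 sits between P90 and P50
p75 = p50 - p50 * std * NormalDist().inv_cdf(0.75)
```

Plugging 0.9 back into the last formula recovers the original P90, which is a quick sanity check on the construction.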

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • time_res

    (Literal['daily', 'monthly', 'quarterly', 'yearly'], default: 'daily' ) –

    Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"

  • aggregation_window

    (Literal['mtd', 'ytd', '12m'] | None, default: None ) –

    Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None

  • group_names

    (list[str], default: None ) –

    List of group names to get the data for. By default None

  • group_types

    (list[str], default: None ) –

    List of object group types to get the data for. By default None

  • resource_types

    (list[str], default: None ) –

    List of resource types to get the data for (e.g. "wind_speed", "solar_irradiance", "average_power").

    If set to None, it will return all available resource types. By default None

  • pxx

    (list[float], default: None ) –

    List of pxx values to get the data for. All values must be between 0 and 1 (exclusive) and must not be repeated. By default [0.9, 0.5].

  • evaluation_periods

    (list[Literal['longterm', '1year', '1month', '1day']], default: None ) –

    List of evaluation periods to get the data for. By default None, which will return all available evaluation periods.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...]

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

  • dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]

    In case output_type is "dict", returns a dictionary in the format:

    Python
    {"group_type_name":
        {"group_name":
            {"resource_type_name":
                {"evaluation_period":
                    {"date":
                        {"pxx":
                            {"attribute": value, ...},
                        ...},
                    ...},
                ...},
            ...},
        ...},
    ...}
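As an illustration of consuming the "DataFrame" output, here is a sketch with synthetic data shaped like the documented MultiIndex (the group names and values below are made up):

```python
import pandas as pd

# Synthetic frame mimicking the documented output shape (hypothetical values)
df = pd.DataFrame(
    {
        "group_type_name": ["park"] * 2,
        "group_name": ["WF_A"] * 2,
        "resource_type_name": ["wind_speed"] * 2,
        "evaluation_period": ["longterm"] * 2,
        "date": pd.to_datetime(["2024-01-01"] * 2),
        "pxx": [0.5, 0.9],
        "value": [7.5, 6.8],
    }
).set_index(["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date", "pxx"])

# A cross-section on the pxx index level selects all P90 rows
p90 = df.xs(0.9, level="pxx")
```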
    
Source code in echo_postgres/resourceassessment_pxx.py
Python
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    group_names: list[str] | None = None,
    group_types: list[str] | None = None,
    resource_types: list[str] | None = None,
    pxx: list[float] | None = None,
    evaluation_periods: list[Literal["longterm", "1year", "1month", "1day"]] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame | dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]:
    """Gets the equivalent pxx values for the given resource types, evaluation periods and objects.

    The most useful keys/columns returned are:

    - group_type_name
    - group_name
    - resource_type_name
    - evaluation_period
    - pxx
    - date
    - value

    This method assumes that at least P50 and P90 are defined in the database's materialized views. It uses both values to determine the standard deviation of the data and then derives the other pxx values.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    group_names : list[str], optional
        List of group names to get the data for. By default None
    group_types : list[str], optional
        List of object group types to get the data for. By default None
    resource_types : list[str], optional
        List of resource types to get the data for (e.g. "wind_speed", "solar_irradiance", "average_power").

        If set to None, it will return all available resource types. By default None
    pxx : list[float], optional
        List of pxx values to get the data for. All values must be between 0 and 1 (exclusive) and must not be repeated. By default [0.9, 0.5].
    evaluation_periods : list[Literal["longterm", "1year", "1month", "1day"]], optional
        List of evaluation periods to get the data for. By default None, which will return all available evaluation periods.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...]
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]
        In case output_type is "dict", returns a dictionary in the format:

        ```python
        {"group_type_name":
            {"group_name":
                {"resource_type_name":
                    {"evaluation_period":
                        {"date":
                            {"pxx":
                                {"attribute": value, ...},
                            ...},
                        ...},
                    ...},
                ...},
            ...},
        ...}
        ```
    """
    # building the query
    query = [
        sql.SQL("SELECT * FROM {table_name}").format(
            table_name=sql.Identifier(f"mv_resource_assessment_pxx_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}"),
        ),
        sql.SQL(" WHERE pxx IN (0.5, 0.9) AND date >= {start} AND date <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []

    if group_names:
        where.append(
            sql.SQL("group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_names)),
            ),
        )
    if group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_types)),
            ),
        )
    if resource_types:
        where.append(
            sql.SQL("resource_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, resource_types)),
            ),
        )
    if evaluation_periods:
        # adding longterm to force getting p50
        request_eval_periods = [*evaluation_periods, "longterm"]
        request_eval_periods = list(set(request_eval_periods))
        where.append(
            sql.SQL("evaluation_period IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, request_eval_periods)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))
    query.append(sql.SQL(" ORDER BY group_type_name, group_name, resource_type_name, evaluation_period, date, pxx"))

    query = sql.Composed(query)

    df = self._perfdb.conn.read_to_polars(query)

    # ensure pxx is Float64 for precision (database may store as Float32)
    if "pxx" in df.columns:
        df = df.with_columns(pl.col("pxx").cast(pl.Float64).round(2))

    # checking if we have the necessary pxx values
    pxx_available = set(df["pxx"].unique().to_list())
    if not {0.5, 0.9}.issubset(pxx_available):
        raise ValueError(
            f"The materialized views do not contain the necessary pxx values. Please check the database. Found pxx values: {pxx_available} but expected { {0.5, 0.9} }",
        )

    # identify non-pivot columns (everything except pxx and value)
    id_cols = [col for col in df.columns if col not in ("pxx", "value")]
    group_keys = ["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date"]

    # pivoting so pxx are the columns (0.5 and 0.9)
    df = df.pivot(on="pxx", index=id_cols, values="value")
    # ensure Float64 for precision
    df = df.with_columns(
        pl.col("0.5").cast(pl.Float64),
        pl.col("0.9").cast(pl.Float64),
    )

    # filling p50 of all evaluation periods with the same value based on longterm
    # extract longterm p50 values
    longterm_p50 = df.filter(pl.col("evaluation_period") == "longterm").select(
        "group_type_name",
        "group_name",
        "resource_type_name",
        "date",
        pl.col("0.5").alias("p50_longterm"),
    )
    # join longterm p50 onto all rows and overwrite 0.5 for non-longterm periods
    df = df.join(longterm_p50, on=["group_type_name", "group_name", "resource_type_name", "date"], how="left")
    df = df.with_columns(
        pl.when(pl.col("evaluation_period") != "longterm").then(pl.col("p50_longterm")).otherwise(pl.col("0.5")).alias("0.5"),
    ).drop("p50_longterm")

    # calculating std: (p50 - p90) / p50 / norm.ppf(0.9)
    ppf_09 = float(norm.ppf(0.9))
    df = df.with_columns(
        ((pl.col("0.5") - pl.col("0.9")) / pl.col("0.5") / ppf_09).alias("std"),
    )

    # calculating the other pxx values
    if not pxx:
        pxx = [0.5, 0.9]
    pxx = sorted(pxx)
    for p in pxx:
        col_name = str(p)
        if col_name not in df.columns:
            ppf_p = float(norm.ppf(p))
            df = df.with_columns(
                (pl.col("0.5") - pl.col("0.5") * pl.col("std") * ppf_p).alias(col_name),
            )

    # dropping unnecessary columns
    remove_cols = ["std"]
    if 0.5 not in pxx:
        remove_cols.append("0.5")
    if 0.9 not in pxx:
        remove_cols.append("0.9")
    df = df.drop([c for c in remove_cols if c in df.columns])

    # melting back so pxx are in the rows
    pxx_col_names = [str(p) for p in pxx]
    non_pxx_cols = [col for col in df.columns if col not in pxx_col_names]
    df = df.unpivot(
        on=pxx_col_names,
        index=non_pxx_cols,
        variable_name="pxx",
        value_name="value",
    )
    # convert pxx column from string back to float
    df = df.with_columns(pl.col("pxx").cast(pl.Float64), pl.col("value").cast(pl.Float64))

    # keeping only the wanted evaluation periods
    if evaluation_periods:
        df = df.filter(pl.col("evaluation_period").is_in(evaluation_periods))

    # sort for consistent output
    df = df.sort([*group_keys, "pxx"])

    # ensure proper types for output conversion
    df = df.with_columns(
        pl.col("date").cast(pl.Datetime("ms")),
        pl.col("value").cast(pl.Float64),
        pl.col("pxx").cast(pl.Float64),
    )

    return convert_output(df, output_type, index_col=[*group_keys, "pxx"], nest_by_index=True)
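The core pivot → derive → unpivot flow of the source above can be sketched in pandas (the real code uses Polars; the keys and values here are toy data, and `NormalDist` stands in for `scipy.stats.norm.ppf`):

```python
import pandas as pd
from statistics import NormalDist

# Toy long-format input: one P50 and one P90 row per key (hypothetical values)
long = pd.DataFrame(
    {
        "group_name": ["WF_A"] * 2,
        "date": pd.to_datetime(["2024-01-01"] * 2),
        "pxx": [0.5, 0.9],
        "value": [100.0, 80.0],
    }
)

# Pivot so 0.5 and 0.9 become columns
wide = long.pivot(index=["group_name", "date"], columns="pxx", values="value")

# Derive the relative std from the P50/P90 pair, then compute a new quantile
z90 = NormalDist().inv_cdf(0.9)
wide["std"] = (wide[0.5] - wide[0.9]) / wide[0.5] / z90
wide[0.75] = wide[0.5] - wide[0.5] * wide["std"] * NormalDist().inv_cdf(0.75)

# Melt back so pxx values are in the rows again
out = (
    wide.drop(columns=["std"])
    .reset_index()
    .melt(id_vars=["group_name", "date"], var_name="pxx", value_name="value")
)
```

The same round trip in the method itself is `pl.DataFrame.pivot` / `pl.DataFrame.unpivot`, with the longterm P50 joined onto every evaluation period before the derivation.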