
Resource Assessments PXX

ResourceAssessmentPxx(perfdb)

Class for handling resource assessment pxx values. It can be accessed via perfdb.resourceassessments.pxx.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

get(period, time_res='daily', aggregation_window=None, group_names=None, group_types=None, resource_types=None, pxx=None, evaluation_periods=None, filter_type='and', output_type='DataFrame')

Gets the equivalent pxx values for the given resource types, evaluation periods and objects.

The most useful keys/columns returned are:

  • group_type_name
  • group_name
  • resource_type_name
  • evaluation_period
  • pxx
  • date
  • value

This method assumes that at least P50 and P90 are defined in the materialized views in the database. It uses these two values to derive the standard deviation of the underlying distribution and then calculates any other requested pxx values from it.
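Concretely, the source below derives a relative standard deviation as `std = (P50 - P90) / P50 / norm.ppf(0.9)` and computes any other pxx as `Pxx = P50 - (P50 * std) * norm.ppf(p)` (exceedance convention, so P90 < P50). A stdlib-only sketch of the same math, with made-up P50/P90 values; `NormalDist().inv_cdf` is the same quantile function as `scipy.stats.norm.ppf`:

```python
from statistics import NormalDist

ppf = NormalDist().inv_cdf  # same quantile function as scipy.stats.norm.ppf

p50, p90 = 100.0, 85.0  # made-up example values

# Relative standard deviation implied by the P50/P90 pair (exceedance convention)
std = (p50 - p90) / p50 / ppf(0.9)


def pxx(p: float) -> float:
    """Any other pxx follows from the same normal assumption."""
    return p50 - (p50 * std) * ppf(p)


print(round(pxx(0.9), 6))  # 85.0  (round-trips back to the input P90)
print(round(pxx(0.5), 6))  # 100.0 (ppf(0.5) == 0, so P50 is unchanged)
print(pxx(0.75))           # a P75 between P90 and P50
```

Note that because the method interpolates from only two quantiles under a normality assumption, pxx values far in the tails inherit that assumption rather than coming from the database directly.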

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • time_res

    (Literal['daily', 'monthly', 'quarterly', 'yearly'], default: 'daily' ) –

    Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"

  • aggregation_window

    (Literal['mtd', 'ytd', '12m'] | None, default: None ) –

    Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None

  • group_names

    (list[str], default: None ) –

    List of group names to get the data for. By default None

  • group_types

    (list[str], default: None ) –

    List of object group types to get the data for. By default None

  • resource_types

    (list[str], default: None ) –

    List of resource types to get the data for (e.g. "wind_speed", "solar_irradiance", "average_power").

    If set to None, it will return all available resource types. By default None

  • pxx

    (list[float], default: None ) –

    List of pxx values to get the data for. All values must be strictly between 0 and 1 and must not be repeated. By default [0.9, 0.5].

  • evaluation_periods

    (list[Literal['longterm', '1year', '1month', '1day']], default: None ) –

    List of evaluation periods to get the data for. By default None, which will return all available evaluation periods.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...]

  • dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]

    In case output_type is "dict", returns a dictionary in the format:

    {"group_type_name":
        {"group_name":
            {"resource_type_name":
                {"evaluation_period":
                    {"date":
                        {"pxx":
                            {"attribute": value, ...},
                        ...},
                    ...},
                ...},
            ...},
        ...},
    ...}
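When `output_type` is "dict", the nested structure is typically consumed by walking the levels in order. A small sketch over a hand-built sample (the group, resource, and value entries are illustrative only, not real output):

```python
from datetime import datetime

# Hand-built sample matching the nested layout above (illustrative values)
result = {
    "wind_farm": {
        "WF_A": {
            "wind_speed": {
                "longterm": {
                    datetime(2024, 1, 1): {
                        0.5: {"value": 8.1},
                        0.9: {"value": 6.9},
                    },
                },
            },
        },
    },
}

# Flatten to (group_type, group, resource, eval_period, date, pxx, value) rows
rows = [
    (gt, g, rt, ep, date, pxx, attrs["value"])
    for gt, groups in result.items()
    for g, resources in groups.items()
    for rt, periods in resources.items()
    for ep, dates in periods.items()
    for date, pxxs in dates.items()
    for pxx, attrs in pxxs.items()
]
print(rows[0][-1])  # 8.1
```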
    
Source code in echo_postgres/resourceassessment_pxx.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    group_names: list[str] | None = None,
    group_types: list[str] | None = None,
    resource_types: list[str] | None = None,
    pxx: list[float] | None = None,
    evaluation_periods: list[Literal["longterm", "1year", "1month", "1day"]] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]:
    """Gets the equivalent pxx values for the given resource types, evaluation periods and objects.

    The most useful keys/columns returned are:

    - group_type_name
    - group_name
    - resource_type_name
    - evaluation_period
    - pxx
    - date
    - value

    This method assumes that at least P50 and P90 are defined in the materialized views in the database. It uses these two values to derive the standard deviation of the underlying distribution and then calculates any other requested pxx values from it.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    group_names : list[str], optional
        List of group names to get the data for. By default None
    group_types : list[str], optional
        List of object group types to get the data for. By default None
    resource_types : list[str], optional
        List of resource types to get the data for (e.g. "wind_speed", "solar_irradiance", "average_power").

        If set to None, it will return all available resource types. By default None
    pxx : list[float], optional
        List of pxx values to get the data for. All values must be strictly between 0 and 1 and must not be repeated. By default [0.9, 0.5].
    evaluation_periods : list[Literal["longterm", "1year", "1month", "1day"]], optional
        List of evaluation periods to get the data for. By default None, which will return all available evaluation periods.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[group_type_name, group_name, resource_type_name, evaluation_period, date, pxx], columns = [value, ...]
    dict[str, dict[str, dict[str, dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]]]]
        In case output_type is "dict", returns a dictionary in the format:

        ```python
        {"group_type_name":
            {"group_name":
                {"resource_type_name":
                    {"evaluation_period":
                        {"date":
                            {"pxx":
                                {"attribute": value, ...},
                            ...},
                        ...},
                    ...},
                ...},
            ...},
        ...}
        ```
    """
    # building the query
    query = [
        sql.SQL("SELECT * FROM {table_name}").format(
            table_name=sql.Identifier(f"mv_resource_assessment_pxx_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}"),
        ),
        sql.SQL(" WHERE pxx IN (0.5, 0.9) AND date >= {start} AND date <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []

    if group_names:
        where.append(
            sql.SQL("group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_names)),
            ),
        )
    if group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, group_types)),
            ),
        )
    if resource_types:
        where.append(
            sql.SQL("resource_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, resource_types)),
            ),
        )
    if evaluation_periods:
        # adding longterm to force getting p50
        request_eval_periods = [*evaluation_periods, "longterm"]
        request_eval_periods = list(set(request_eval_periods))
        where.append(
            sql.SQL("evaluation_period IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, request_eval_periods)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))
    query.append(sql.SQL(" ORDER BY group_type_name, group_name, resource_type_name, evaluation_period, date, pxx"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow", dtype={"date": "datetime64[s]"})

    # checking if we have the necessary pxx values
    if not {0.5, 0.9}.issubset(set(df["pxx"].unique())):
        raise ValueError(
            f"The materialized views do not contain the necessary pxx values. Please check the database. Found pxx values: {set(df['pxx'].unique())} but expected { {0.5, 0.9} }",
        )

    # pivoting so pxx are the columns
    df = df.pivot(
        columns=["pxx"],
        index=[col for col in df.columns if col not in ["pxx", "value"]],
        values="value",
    )
    df = df.reset_index(drop=False).set_index(["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date"])

    # forcing all pxx cols to have double[pyarrow] type
    df = df.astype({0.5: "double[pyarrow]", 0.9: "double[pyarrow]"})

    # filling p50 of all evaluation periods with the same value based on longterm
    for group_type_name in df.index.get_level_values("group_type_name").unique():
        for group_name in df.index.get_level_values("group_name").unique():
            for resource_type_name in df.index.get_level_values("resource_type_name").unique():
                for evaluation_period in df.index.get_level_values("evaluation_period").unique():
                    if evaluation_period == "longterm":
                        continue
                    p50 = df.loc[IndexSlice[group_type_name, group_name, resource_type_name, "longterm", :], 0.5].copy()
                    p50 = p50.reset_index(drop=False)
                    p50["evaluation_period"] = evaluation_period
                    p50 = p50.set_index(["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date"])
                    df.loc[IndexSlice[group_type_name, group_name, resource_type_name, evaluation_period, :], 0.5] = p50

    # calculating std
    df["std"] = (df[0.5] - df[0.9]) / df[0.5] / norm.ppf(0.9)

    # calculating the other pxx values
    if not pxx:
        pxx = [0.5, 0.9]
    pxx = sorted(pxx)
    for p in pxx:
        if p not in df.columns:
            df[p] = df[0.5] - (df[0.5] * df["std"]) * norm.ppf(p)

    # dropping unnecessary columns
    remove_cols = ["std"]
    if 0.5 not in pxx:
        remove_cols.append(0.5)
    if 0.9 not in pxx:
        remove_cols.append(0.9)
    df = df.drop(columns=remove_cols)

    # melting back so pxx are in the rows
    df = df.melt(
        id_vars=[col for col in df.columns if col not in pxx],
        value_vars=pxx,
        var_name="pxx",
        value_name="value",
        ignore_index=False,
    ).reset_index()

    # changing data type of value
    df = df.astype({"value": "double[pyarrow]", "pxx": "double[pyarrow]"})

    # keeping only the wanted evaluation periods
    if evaluation_periods:
        df = df[df["evaluation_period"].isin(evaluation_periods)].copy()

    # setting the index
    df = df.set_index(["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date", "pxx"])

    # converting to desired format
    if output_type == "DataFrame":
        return df

    result = df.to_dict(orient="index")
    final_result = {}
    for (group_type_name, group_name, resource_type_name, evaluation_period, date, pxx_val), data in result.items():
        if group_type_name not in final_result:
            final_result[group_type_name] = {}
        if group_name not in final_result[group_type_name]:
            final_result[group_type_name][group_name] = {}
        if resource_type_name not in final_result[group_type_name][group_name]:
            final_result[group_type_name][group_name][resource_type_name] = {}
        if evaluation_period not in final_result[group_type_name][group_name][resource_type_name]:
            final_result[group_type_name][group_name][resource_type_name][evaluation_period] = {}
        if date not in final_result[group_type_name][group_name][resource_type_name][evaluation_period]:
            final_result[group_type_name][group_name][resource_type_name][evaluation_period][date] = {}
        final_result[group_type_name][group_name][resource_type_name][evaluation_period][date][pxx_val] = data

    return final_result
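Since the DataFrame returned by get uses a six-level MultiIndex, downstream selection is typically done with `DataFrame.xs` or `pandas.IndexSlice`. A minimal sketch over a fabricated frame mirroring the index layout (the group and resource names are made up for illustration):

```python
import pandas as pd

# Fabricated example mirroring the index layout returned by get():
# MultiIndex[group_type_name, group_name, resource_type_name,
#            evaluation_period, date, pxx] with a "value" column.
idx = pd.MultiIndex.from_tuples(
    [
        ("wind_farm", "WF_A", "wind_speed", "longterm", pd.Timestamp("2024-01-01"), 0.5),
        ("wind_farm", "WF_A", "wind_speed", "longterm", pd.Timestamp("2024-01-01"), 0.9),
    ],
    names=["group_type_name", "group_name", "resource_type_name", "evaluation_period", "date", "pxx"],
)
df = pd.DataFrame({"value": [8.1, 6.9]}, index=idx)

# Select all P90 rows; xs drops the matched "pxx" level from the index
p90 = df.xs(0.9, level="pxx")
print(p90["value"].iloc[0])  # 6.9
```

Cross-sectioning on the "pxx" level like this is usually more convenient than boolean masks when comparing, say, P90 against P50 for the same dates.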