Skip to content

KPI Availability Forecast Assumption Assets

KpiAvailabilityForecastAssumptionAssets(perfdb)

Class used for linking assets to forecast assumptions. Can be accessed via perfdb.kpis.availability.forecasts.assumptions.assets.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store a reference to the top-level PerfDB handler.

    Shared base-class initializer for all PerfDB sub-accessors.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # Private handle so subclasses can reach the connection and sibling accessors.
    self._perfdb: e_pg.PerfDB = perfdb

delete(assumption_ids=None, object_names=None)

Removes asset links from forecast assumptions.

At least one filter must be provided.

Parameters:

  • assumption_ids

    (list[int] | None, default: None ) –

    List of assumption IDs to filter. By default None.

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter. By default None.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/kpi_availability_forecast_assumption_assets.py
Python
@validate_call
def delete(
    self,
    assumption_ids: list[int] | None = None,
    object_names: list[str] | None = None,
) -> int:
    """Removes asset links from forecast assumptions.

    At least one non-empty filter must be provided.

    Parameters
    ----------
    assumption_ids : list[int] | None, optional
        List of assumption IDs to filter. By default None.
    object_names : list[str] | None, optional
        List of object names to filter. By default None.

    Returns
    -------
    int
        Number of rows deleted.

    Raises
    ------
    ValueError
        If no non-empty filter is provided, or if any object name is unknown.
    """
    # Falsy check (not just `is None`): empty lists would otherwise slip past the
    # guard, leave `conditions` empty, and build "DELETE ... WHERE " — invalid SQL.
    if not assumption_ids and not object_names:
        raise ValueError("At least one of 'assumption_ids' or 'object_names' must be provided.")

    conditions = []
    if assumption_ids:
        conditions.append(sql.SQL("assumption_id = ANY({ids})").format(ids=sql.Literal(assumption_ids)))
    if object_names:
        # Resolve names to IDs; fail fast on unknown names instead of silently matching nothing.
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if missing := set(object_names) - set(obj_ids.keys()):
            raise ValueError(f"Objects not found: {missing}")
        conditions.append(sql.SQL("object_id = ANY({ids})").format(ids=sql.Literal(list(obj_ids.values()))))

    query = sql.SQL("DELETE FROM performance.availability_forecast_assumption_assets WHERE ") + sql.SQL(" AND ").join(conditions)
    self._perfdb.conn.execute(query)
    deleted = self._perfdb.conn.rowcount
    logger.debug(f"Deleted {deleted} assumption asset link(s).")
    return deleted

get(assumption_ids=None, object_names=None, filter_type='and', output_type='pl.DataFrame')

Gets assets linked to forecast assumptions.

The most useful keys/columns returned are:

  • assumption_id
  • object_name
  • object_type_name
  • revision_name
  • availability_category_name
  • description
  • applicable_from
  • applicable_to
  • period_days
  • expected_downtime_hours
  • availability_impact

Parameters:

  • assumption_ids

    (list[int] | None, default: None ) –

    List of assumption IDs to filter. By default None.

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type. By default "pl.DataFrame".

Returns:

  • dict[Any, dict[str, Any]]

    In case output_type is "dict", returns {(assumption_id, object_name): {col: val, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/kpi_availability_forecast_assumption_assets.py
Python
@validate_call
def get(
    self,
    assumption_ids: list[int] | None = None,
    object_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[Any, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets assets linked to forecast assumptions.

    Useful keys/columns in the result include:

    - assumption_id
    - object_name
    - object_type_name
    - revision_name
    - availability_category_name
    - description
    - applicable_from
    - applicable_to
    - period_days
    - expected_downtime_hours
    - availability_impact

    Parameters
    ----------
    assumption_ids : list[int] | None, optional
        List of assumption IDs to filter. By default None.
    object_names : list[str] | None, optional
        List of object names to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type. By default "pl.DataFrame".

    Returns
    -------
    dict[Any, dict[str, Any]]
        In case output_type is "dict", returns {(assumption_id, object_name): {col: val, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # Assemble the WHERE clause from whichever filters were supplied.
    builder = WhereClauseBuilder(filter_type=filter_type)
    builder = builder.add_any("assumption_id", assumption_ids)
    builder = builder.add_any("object_name", object_names)
    where_clause = builder.build()

    base_query = sql.SQL(
        "SELECT * FROM performance.v_availability_forecast_assumption_assets {where} ORDER BY assumption_id, object_name",
    )
    result_df = self._perfdb.conn.read_to_polars(base_query.format(where=where_clause), schema_overrides=self._cols_schema)
    # convert_output handles the dict / pandas / polars shaping uniformly.
    return convert_output(result_df, output_type, index_col=["assumption_id", "object_name"])

insert(assumption_id=None, object_name=None, data_df=None, on_conflict='ignore')

Links one or more assets to a forecast assumption.

You can pass individual values to link a single asset, or a DataFrame for batch linking.

Parameters:

  • assumption_id

    (int | None, default: None ) –

    ID of the assumption. Required for single insert. By default None.

  • object_name

    (str | None, default: None ) –

    Name of the asset to link. Must be a wind_turbine or solar_inverter. Required for single insert. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    DataFrame for batch insert. Required columns: assumption_id, object_name. When provided, individual parameters are ignored. By default None.

  • on_conflict

    (Literal['raise', 'ignore'], default: 'ignore' ) –

    Behavior if the (assumption_id, object_id) pair already exists. By default "ignore".

Source code in echo_postgres/kpi_availability_forecast_assumption_assets.py
Python
@validate_call
def insert(
    self,
    assumption_id: int | None = None,
    object_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["raise", "ignore"] = "ignore",
) -> None:
    """Links one or more assets to a forecast assumption.

    You can pass individual values to link a single asset, or a DataFrame for batch linking.

    Parameters
    ----------
    assumption_id : int | None, optional
        ID of the assumption. Required for single insert. By default None.
    object_name : str | None, optional
        Name of the asset to link. Must be a wind_turbine or solar_inverter. Required for single insert. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch insert. Required columns: assumption_id, object_name.
        When provided, individual parameters are ignored. By default None.
    on_conflict : Literal["raise", "ignore"], optional
        Behavior if the (assumption_id, object_id) pair already exists. By default "ignore".

    Raises
    ------
    ValueError
        If neither data_df nor both individual parameters are provided, if data_df
        is missing a required column or contains nulls, or if an object name is unknown.
    """
    df_schema = {"assumption_id": pl.Int64, "object_name": pl.Utf8}

    if data_df is None:
        # Fail early with a clear message; otherwise the all-null single-row frame
        # built below would trip the generic null-column check with a confusing error.
        if assumption_id is None or object_name is None:
            raise ValueError("Either 'data_df' or both 'assumption_id' and 'object_name' must be provided.")
        data_df = pl.DataFrame(
            {"assumption_id": [assumption_id], "object_name": [object_name]},
            schema=df_schema,
        )
    else:
        # Normalize dtypes of whichever required columns are present.
        data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})

    for col in ["assumption_id", "object_name"]:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"Column '{col}' contains null values but is required.")

    # resolve object_name → object_id, failing fast on unknown names
    obj_names = data_df["object_name"].unique().to_list()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=obj_names)
    if missing := set(obj_names) - set(obj_ids.keys()):
        raise ValueError(f"Objects not found: {missing}")
    data_df = data_df.with_columns(pl.col("object_name").replace_strict(obj_ids, return_dtype=pl.Int64).alias("object_id"))
    data_df = data_df.drop("object_name")

    # NOTE(review): "skip_row_check" is assumed to surface conflicts as errors for
    # on_conflict="raise" — semantics live in polars_to_sql; confirm there.
    if_exists = "append" if on_conflict == "ignore" else "skip_row_check"

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_assumption_assets",
        schema="performance",
        if_exists=if_exists,
        conflict_cols=["assumption_id", "object_id"] if on_conflict == "ignore" else None,
    )
    logger.debug(f"Inserted {len(data_df)} assumption asset link(s).")