Skip to content

KPI Availability Forecast Revisions

KpiAvailabilityForecastRevisions(perfdb)

Class used for handling availability forecast revisions. Can be accessed via perfdb.kpis.availability.forecasts.revisions.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/kpi_availability_forecast_revisions.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling availability forecast revisions. Can be accessed via `perfdb.kpis.availability.forecasts.revisions`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    from .kpi_availability_forecast_revision_assignments import KpiAvailabilityForecastRevisionAssignments

    self.assignments = KpiAvailabilityForecastRevisionAssignments(perfdb)

clone(new_name, revision_id=None, name=None)

Clones an existing availability forecast revision under new_name.

Wraps the performance.fn_clone_availability_forecast_revisions PL/pgSQL function: all child rows (assumptions, downtime events, curtailment events and their asset junction rows) are duplicated under a new, UNLOCKED revision. user_id and description are inherited from the source.

availability_forecast_revision_assignments are intentionally NOT copied — the new revision starts unassigned.

Cloning a locked source is allowed: writes only target the new (unlocked) revision.

Exactly one of revision_id or name must be provided.

Parameters:

  • new_name

    (str) –

    Name for the cloned revision. Must be unique.

  • revision_id

    (int | None, default: None ) –

    ID of the source revision. Mutually exclusive with name. By default None.

  • name

    (str | None, default: None ) –

    Name of the source revision. Mutually exclusive with revision_id. By default None.

Returns:

  • int

    ID of the newly created revision.

Source code in echo_postgres/kpi_availability_forecast_revisions.py
Python
@validate_call
def clone(
    self,
    new_name: str,
    revision_id: int | None = None,
    name: str | None = None,
) -> int:
    """Clones an existing availability forecast revision under ``new_name``.

    Wraps the ``performance.fn_clone_availability_forecast_revisions`` PL/pgSQL function:
    all child rows (assumptions, downtime events, curtailment events and their asset
    junction rows) are duplicated under a new, UNLOCKED revision. ``user_id`` and
    ``description`` are inherited from the source.

    ``availability_forecast_revision_assignments`` are intentionally NOT copied — the new
    revision starts unassigned.

    Cloning a locked source is allowed: writes only target the new (unlocked) revision.

    Exactly one of ``revision_id`` or ``name`` must be provided.

    Parameters
    ----------
    new_name : str
        Name for the cloned revision. Must be unique.
    revision_id : int | None, optional
        ID of the source revision. Mutually exclusive with ``name``. By default None.
    name : str | None, optional
        Name of the source revision. Mutually exclusive with ``revision_id``. By default None.

    Returns
    -------
    int
        ID of the newly created revision.
    """
    if (revision_id is None) == (name is None):
        raise ValueError("Exactly one of 'revision_id' or 'name' must be provided.")

    query = sql.SQL(
        "SELECT performance.fn_clone_availability_forecast_revisions("
        "_id => {id}, _name => {name}, _new_name => {new_name}) AS new_id",
    ).format(
        id=sql.Literal(revision_id),
        name=sql.Literal(name),
        new_name=sql.Literal(new_name),
    )
    df = self._perfdb.conn.read_to_polars(query, schema_overrides={"new_id": pl.Int64})
    cloned_id = int(df["new_id"][0])
    logger.debug(f"Cloned revision {revision_id if revision_id is not None else name!r} → new revision {cloned_id} ({new_name!r}).")
    return cloned_id

delete(ids=None, names=None)

Deletes availability forecast revisions by ID or name.

Wraps the performance.fn_delete_availability_forecast_revisions PL/pgSQL function: the call is atomic — assumptions, downtime events, curtailment events and their asset junction rows are removed alongside the revisions themselves.

Revisions that are still referenced by availability_forecast_revision_assignments (i.e. set as the active default for some asset/year/month) cannot be deleted; the call raises a database error and nothing is deleted. Remove or reassign the assignments first.

Locked revisions also cannot be deleted — the existing locked-revision triggers fire and roll the transaction back. Toggle locked=False via :meth:update first.

Exactly one of ids or names must be provided.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of revision IDs to delete. Mutually exclusive with names. By default None.

  • names

    (list[str] | None, default: None ) –

    List of revision names to delete. Mutually exclusive with ids. By default None.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/kpi_availability_forecast_revisions.py
Python
@validate_call
def delete(
    self,
    ids: list[int] | None = None,
    names: list[str] | None = None,
) -> int:
    """Deletes availability forecast revisions by ID or name.

    Wraps the ``performance.fn_delete_availability_forecast_revisions`` PL/pgSQL function:
    the call is atomic — assumptions, downtime events, curtailment events and their asset
    junction rows are removed alongside the revisions themselves.

    Revisions that are still referenced by ``availability_forecast_revision_assignments``
    (i.e. set as the active default for some asset/year/month) cannot be deleted; the call
    raises a database error and nothing is deleted. Remove or reassign the assignments first.

    Locked revisions also cannot be deleted — the existing locked-revision triggers fire and
    roll the transaction back. Toggle ``locked=False`` via :meth:`update` first.

    Exactly one of ``ids`` or ``names`` must be provided.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of revision IDs to delete. Mutually exclusive with ``names``. By default None.
    names : list[str] | None, optional
        List of revision names to delete. Mutually exclusive with ``ids``. By default None.

    Returns
    -------
    int
        Number of rows deleted.
    """
    if (ids is None) == (names is None):
        raise ValueError("Exactly one of 'ids' or 'names' must be provided.")

    query = sql.SQL(
        "SELECT performance.fn_delete_availability_forecast_revisions("
        "_ids => {ids}, _names => {names}) AS deleted_ids",
    ).format(
        ids=sql.Literal(ids),
        names=sql.Literal(names),
    )
    df = self._perfdb.conn.read_to_polars(query)
    # bigint[] surfaces as a polars List column — `df[col][0]` is the inner Series (or None for NULL)
    inner = df["deleted_ids"][0]
    deleted_ids = inner.to_list() if inner is not None else []
    logger.debug(f"Deleted {len(deleted_ids)} availability forecast revision(s): {deleted_ids}")
    return len(deleted_ids)

get(names=None, locked=None, filter_type='and', output_type='pl.DataFrame')

Gets availability forecast revisions with full details.

The most useful keys/columns returned are:

  • id
  • name
  • description
  • user_name
  • modified_date
  • locked

Parameters:

  • names

    (list[str] | None, default: None ) –

    List of revision names to filter. By default None.

  • locked

    (bool | None, default: None ) –

    If provided, filters revisions by their locked flag. By default None (no filter).

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type. By default "pl.DataFrame".

Returns:

  • dict[str, dict[str, Any]]

    In case output_type is "dict", returns {name: {col: val, ...}, ...}.

  • DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame indexed by name.

  • DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/kpi_availability_forecast_revisions.py
Python
@validate_call
def get(
    self,
    names: list[str] | None = None,
    locked: bool | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[str, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets availability forecast revisions with full details.

    The most useful keys/columns returned are:

    - id
    - name
    - description
    - user_name
    - modified_date
    - locked

    Parameters
    ----------
    names : list[str] | None, optional
        List of revision names to filter. By default None.
    locked : bool | None, optional
        If provided, filters revisions by their `locked` flag. By default None (no filter).
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type. By default "pl.DataFrame".

    Returns
    -------
    dict[str, dict[str, Any]]
        In case output_type is "dict", returns {name: {col: val, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame indexed by name.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    where = WhereClauseBuilder(filter_type=filter_type).add_any("name", names).add_eq("locked", locked).build()
    query = sql.SQL("SELECT * FROM performance.v_availability_forecast_revisions {where} ORDER BY name").format(where=where)
    df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)
    return convert_output(df, output_type, index_col="name")

get_ids(names=None, locked=None, filter_type='and')

Gets availability forecast revision IDs.

Parameters:

  • names

    (list[str] | None, default: None ) –

    List of revision names to filter. By default None.

  • locked

    (bool | None, default: None ) –

    If provided, filters revisions by their locked flag. By default None (no filter).

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • dict[str, int]

    Dictionary mapping revision name to id.

Source code in echo_postgres/kpi_availability_forecast_revisions.py
Python
@validate_call
def get_ids(
    self,
    names: list[str] | None = None,
    locked: bool | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[str, int]:
    """Gets availability forecast revision IDs.

    Parameters
    ----------
    names : list[str] | None, optional
        List of revision names to filter. By default None.
    locked : bool | None, optional
        If provided, filters revisions by their `locked` flag. By default None (no filter).
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    dict[str, int]
        Dictionary mapping revision name to id.
    """
    where = WhereClauseBuilder(filter_type=filter_type).add_any("name", names).add_eq("locked", locked).build()
    query = sql.SQL("SELECT name, id FROM performance.v_availability_forecast_revisions {where} ORDER BY name").format(where=where)
    df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)
    return dict(zip(df["name"].to_list(), df["id"].to_list(), strict=False))

insert(name=None, description=None, user_name=None, locked=None, data_df=None, on_conflict='raise')

Inserts one or more availability forecast revisions.

You can pass individual values to insert a single revision, or a DataFrame for batch insert.

Parameters:

  • name

    (str | None, default: None ) –

    Name of the revision (must be unique). Required for single insert. By default None.

  • description

    (str | None, default: None ) –

    Free-text description of the revision. By default None.

  • user_name

    (str | None, default: None ) –

    Name of the user creating the revision. Required for single insert. By default None.

  • locked

    (bool | None, default: None ) –

    Initial value of the locked flag. When None (default), the database default of FALSE is applied so the revision is unlocked at creation time. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    DataFrame for batch insert. Required columns: name, user_name. Optional: description, locked. When provided, individual parameters are ignored. By default None.

  • on_conflict

    (Literal['raise', 'ignore', 'update'], default: 'raise' ) –

    Behavior when a revision with the same name already exists. "raise" raises a DB error, "ignore" skips the row, "update" overwrites it. By default "raise".

Returns:

  • int | list[int] | None

    ID of the inserted revision (single) or list of IDs (batch). None if nothing was inserted.

Source code in echo_postgres/kpi_availability_forecast_revisions.py
Python
@validate_call
def insert(
    self,
    name: str | None = None,
    description: str | None = None,
    user_name: str | None = None,
    locked: bool | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["raise", "ignore", "update"] = "raise",
) -> int | list[int] | None:
    """Inserts one or more availability forecast revisions.

    You can pass individual values to insert a single revision, or a DataFrame for batch insert.

    Parameters
    ----------
    name : str | None, optional
        Name of the revision (must be unique). Required for single insert. By default None.
    description : str | None, optional
        Free-text description of the revision. By default None.
    user_name : str | None, optional
        Name of the user creating the revision. Required for single insert. By default None.
    locked : bool | None, optional
        Initial value of the `locked` flag. When None (default), the database default of FALSE is applied
        so the revision is unlocked at creation time. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch insert. Required columns: name, user_name. Optional: description, locked.
        When provided, individual parameters are ignored. By default None.
    on_conflict : Literal["raise", "ignore", "update"], optional
        Behavior when a revision with the same name already exists.
        "raise" raises a DB error, "ignore" skips the row, "update" overwrites it. By default "raise".

    Returns
    -------
    int | list[int] | None
        ID of the inserted revision (single) or list of IDs (batch). None if nothing was inserted.
    """
    df_schema = {"name": pl.Utf8, "description": pl.Utf8, "user_name": pl.Utf8, "locked": pl.Boolean}

    if data_df is None:
        single_insert = True
        data_df = pl.DataFrame(
            {"name": [name], "description": [description], "user_name": [user_name], "locked": [locked]},
            schema=df_schema,
        )
    else:
        single_insert = False
        data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})

    for col in ["name", "user_name"]:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"Column '{col}' contains null values but is required.")

    # resolve user_name → user_id
    user_names = data_df["user_name"].unique().to_list()
    user_ids = self._perfdb.users.instances.get_ids(names=user_names)
    if missing := set(user_names) - set(user_ids.keys()):
        raise ValueError(f"Users not found in the database: {missing}")
    data_df = data_df.with_columns(pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("user_id"))
    data_df = data_df.drop("user_name")

    if_exists_map = {"raise": "skip_row_check", "ignore": "append", "update": "update"}
    conflict_cols = ["name"] if on_conflict != "raise" else None

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_revisions",
        schema="performance",
        return_cols=["id"],
        if_exists=if_exists_map[on_conflict],
        conflict_cols=conflict_cols,
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted {len(ids)} availability forecast revision(s): {ids}")
    return ids[0] if single_insert else ids

update(revision_id=None, name=None, description=None, user_name=None, locked=None, data_df=None)

Updates one or more availability forecast revisions.

You can pass individual values to update a single revision, or a DataFrame for batch update.

Note that when a revision is locked, the database trigger only allows the locked column itself to change — attempting to update name, description or user_name on a locked revision raises a database error. Toggle locked=False first if you need to edit other fields.

Parameters:

  • revision_id

    (int | None, default: None ) –

    ID of the revision to update. Required for single update. By default None.

  • name

    (str | None, default: None ) –

    New name for the revision. By default None.

  • description

    (str | None, default: None ) –

    New description. By default None.

  • user_name

    (str | None, default: None ) –

    Name of the user performing the update. By default None.

  • locked

    (bool | None, default: None ) –

    New value of the locked flag. When True the revision becomes read-only for its child rows; when False writes resume. By default None (left unchanged).

  • data_df

    (DataFrame | None, default: None ) –

    DataFrame for batch update. Required column: id. Optional: name, description, user_name, locked. When provided, individual parameters are ignored. By default None.

Source code in echo_postgres/kpi_availability_forecast_revisions.py
Python
@validate_call
def update(
    self,
    revision_id: int | None = None,
    name: str | None = None,
    description: str | None = None,
    user_name: str | None = None,
    locked: bool | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates one or more availability forecast revisions.

    You can pass individual values to update a single revision, or a DataFrame for batch update.

    Note that when a revision is locked, the database trigger only allows the `locked` column itself
    to change — attempting to update `name`, `description` or `user_name` on a locked revision raises
    a database error. Toggle `locked=False` first if you need to edit other fields.

    Parameters
    ----------
    revision_id : int | None, optional
        ID of the revision to update. Required for single update. By default None.
    name : str | None, optional
        New name for the revision. By default None.
    description : str | None, optional
        New description. By default None.
    user_name : str | None, optional
        Name of the user performing the update. By default None.
    locked : bool | None, optional
        New value of the `locked` flag. When True the revision becomes read-only for its child rows;
        when False writes resume. By default None (left unchanged).
    data_df : pl.DataFrame | None, optional
        DataFrame for batch update. Required column: id. Optional: name, description, user_name, locked.
        When provided, individual parameters are ignored. By default None.
    """
    df_schema = {
        "id": pl.Int64,
        "name": pl.Utf8,
        "description": pl.Utf8,
        "user_name": pl.Utf8,
        "locked": pl.Boolean,
    }

    if data_df is None:
        single_update = True
        data_df = pl.DataFrame(
            {
                "id": [revision_id],
                "name": [name],
                "description": [description],
                "user_name": [user_name],
                "locked": [locked],
            },
            schema=df_schema,
        )
    else:
        single_update = False
        data_df = data_df.cast({c: t for c, t in df_schema.items() if c in data_df.columns})

    if "id" not in data_df.columns or data_df["id"].is_null().any():
        raise ValueError("'id' column is required and cannot contain nulls.")

    # verify IDs exist
    existing = self._perfdb.conn.read_to_polars(
        sql.SQL("SELECT id FROM performance.availability_forecast_revisions WHERE id = ANY({ids})").format(
            ids=sql.Literal(data_df["id"].to_list()),
        ),
        schema_overrides={"id": pl.Int64},
    )
    if missing := set(data_df["id"].to_list()) - set(existing["id"].to_list()):
        raise ValueError(f"Revision IDs not found: {missing}")

    # resolve user_name → user_id if provided
    if "user_name" in data_df.columns:
        user_names = data_df["user_name"].drop_nulls().unique().to_list()
        if user_names:
            user_ids = self._perfdb.users.instances.get_ids(names=user_names)
            if missing_users := set(user_names) - set(user_ids.keys()):
                raise ValueError(f"Users not found: {missing_users}")
            data_df = data_df.with_columns(
                pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("user_id"),
            )
        data_df = data_df.drop("user_name")

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_revisions",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} availability forecast revision(s).")