Skip to content

KPI Availability Forecast Revision Assignments

KpiAvailabilityForecastRevisionAssignments(perfdb)

Class used for handling availability forecast revision assignments. Can be accessed via perfdb.kpis.availability.forecasts.revisions.assignments.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initializer of the base class shared by all subclasses; keeps the PerfDB handle.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # Stored privately so the methods of the class can reach the rest of the API and the connection.
    self._perfdb: e_pg.PerfDB = perfdb

delete(object_names=None, years=None, months=None)

Deletes revision assignments matching the provided filters.

At least one filter must be provided.

Parameters:

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter. By default None.

  • years

    (list[int] | None, default: None ) –

    List of years to filter. By default None.

  • months

    (list[int] | None, default: None ) –

    List of months to filter. By default None.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
Python
@validate_call
def delete(
    self,
    object_names: list[str] | None = None,
    years: list[int] | None = None,
    months: list[int] | None = None,
) -> int:
    """Deletes revision assignments matching the provided filters.

    At least one non-empty filter must be provided. An empty list is treated the same as None
    (that filter is not applied). The provided filters are combined with AND.

    Parameters
    ----------
    object_names : list[str] | None, optional
        List of object names to filter. By default None.
    years : list[int] | None, optional
        List of years to filter. By default None.
    months : list[int] | None, optional
        List of months to filter. By default None.

    Returns
    -------
    int
        Number of rows deleted.

    Raises
    ------
    ValueError
        If every filter is None or empty, or if any name in object_names cannot be resolved to an id.
    """
    # The guard must use the same truthiness test as the condition builders below. Checking only
    # "is None" lets calls such as delete(object_names=[]) through with zero conditions, which
    # builds a "DELETE ... WHERE " statement with nothing after WHERE.
    if not (object_names or years or months):
        raise ValueError("At least one of 'object_names', 'years', or 'months' must be provided.")

    conditions = []
    if object_names:
        # the table is keyed by object_id, so names are resolved first and unknown names abort the delete
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if missing := set(object_names) - set(obj_ids.keys()):
            raise ValueError(f"Objects not found: {missing}")
        conditions.append(sql.SQL("object_id = ANY({ids})").format(ids=sql.Literal(list(obj_ids.values()))))
    if years:
        conditions.append(sql.SQL("year = ANY({years})").format(years=sql.Literal(years)))
    if months:
        conditions.append(sql.SQL("month = ANY({months})").format(months=sql.Literal(months)))

    query = sql.SQL("DELETE FROM performance.availability_forecast_revision_assignments WHERE ") + sql.SQL(" AND ").join(conditions)
    self._perfdb.conn.execute(query)
    deleted = self._perfdb.conn.rowcount
    logger.debug(f"Deleted {deleted} revision assignment(s).")
    return deleted

get(object_names=None, years=None, months=None, revision_names=None, filter_type='and', output_type='pl.DataFrame')

Gets availability forecast revision assignments.

The most useful keys/columns returned are:

  • object_name
  • year
  • month
  • revision_name
  • user_name
  • modified_date

Parameters:

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter. By default None.

  • years

    (list[int] | None, default: None ) –

    List of years to filter. By default None.

  • months

    (list[int] | None, default: None ) –

    List of months (1-12) to filter. By default None.

  • revision_names

    (list[str] | None, default: None ) –

    List of revision names to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type. By default "pl.DataFrame".

Returns:

  • dict[Any, dict[str, Any]]

    In case output_type is "dict", returns {(object_name, year, month): {col: val, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex (object_name, year, month).

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
Python
@validate_call
def get(
    self,
    object_names: list[str] | None = None,
    years: list[int] | None = None,
    months: list[int] | None = None,
    revision_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[Any, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Reads availability forecast revision assignments from the database view.

    The most useful keys/columns returned are:

    - object_name
    - year
    - month
    - revision_name
    - user_name
    - modified_date

    Rows are ordered by object_name, year and month.

    Parameters
    ----------
    object_names : list[str] | None, optional
        List of object names to filter. By default None.
    years : list[int] | None, optional
        List of years to filter. By default None.
    months : list[int] | None, optional
        List of months (1-12) to filter. By default None.
    revision_names : list[str] | None, optional
        List of revision names to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type. By default "pl.DataFrame".

    Returns
    -------
    dict[Any, dict[str, Any]]
        In case output_type is "dict", returns {(object_name, year, month): {col: val, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex (object_name, year, month).
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # (view column, requested values) pairs, in the order they are fed to the builder
    column_filters = (
        ("object_name", object_names),
        ("year", years),
        ("month", months),
        ("revision_name", revision_names),
    )
    builder = WhereClauseBuilder(filter_type=filter_type)
    for column, values in column_filters:
        # rebinding keeps this equivalent to a fluent chain regardless of what add_any returns
        builder = builder.add_any(column, values)
    where_clause = builder.build()

    select_template = sql.SQL(
        "SELECT * FROM performance.v_availability_forecast_revision_assignments {where} ORDER BY object_name, year, month",
    )
    query = select_template.format(where=where_clause)
    assignments = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)
    return convert_output(assignments, output_type, index_col=["object_name", "year", "month"])

insert(object_name=None, year=None, month=None, revision_name=None, user_name=None, data_df=None, on_conflict='raise')

Inserts one or more revision assignments (asset x year x month -> revision).

You can pass individual values to insert a single assignment, or a DataFrame for batch insert.

Parameters:

  • object_name

    (str | None, default: None ) –

    Name of the asset. Required for single insert. By default None.

  • year

    (int | None, default: None ) –

    Calendar year (> 2000). Required for single insert. By default None.

  • month

    (int | None, default: None ) –

    Calendar month (1-12). Required for single insert. By default None.

  • revision_name

    (str | None, default: None ) –

    Name of the revision to assign. Required for single insert. By default None.

  • user_name

    (str | None, default: None ) –

    Name of the user creating the assignment. Required for single insert. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    DataFrame for batch insert. Required columns: object_name, year, month, revision_name, user_name. When provided, individual parameters are ignored. By default None.

  • on_conflict

    (Literal['raise', 'ignore', 'update'], default: 'raise' ) –

    Behavior on (object_id, year, month) conflict. By default "raise".

Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
Python
@validate_call
def insert(
    self,
    object_name: str | None = None,
    year: int | None = None,
    month: int | None = None,
    revision_name: str | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["raise", "ignore", "update"] = "raise",
) -> None:
    """Inserts one or more revision assignments (asset x year x month -> revision).

    Either pass the individual values to insert a single assignment, or pass a DataFrame
    to insert many at once. Object, revision and user names are resolved to their ids
    before the rows are written.

    Parameters
    ----------
    object_name : str | None, optional
        Name of the asset. Required for single insert. By default None.
    year : int | None, optional
        Calendar year (> 2000). Required for single insert. By default None.
    month : int | None, optional
        Calendar month (1-12). Required for single insert. By default None.
    revision_name : str | None, optional
        Name of the revision to assign. Required for single insert. By default None.
    user_name : str | None, optional
        Name of the user creating the assignment. Required for single insert. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch insert. Required columns: object_name, year, month, revision_name, user_name.
        When provided, individual parameters are ignored. By default None.
    on_conflict : Literal["raise", "ignore", "update"], optional
        Behavior on (object_id, year, month) conflict. By default "raise".

    Raises
    ------
    ValueError
        If a required column is missing or contains nulls, or if any object, revision or
        user name cannot be resolved to an id.
    """
    required_cols = ["object_name", "year", "month", "revision_name", "user_name"]
    df_schema = {
        "object_name": pl.Utf8,
        "year": pl.Int16,
        "month": pl.Int16,
        "revision_name": pl.Utf8,
        "user_name": pl.Utf8,
    }

    single_insert = data_df is None
    if single_insert:
        # wrap the scalar arguments in a one-row frame so both modes share the code below
        single_row = {
            "object_name": [object_name],
            "year": [year],
            "month": [month],
            "revision_name": [revision_name],
            "user_name": [user_name],
        }
        data_df = pl.DataFrame(single_row, schema=df_schema)
    else:
        # only cast the known columns that are present; missing ones are reported just below
        casts = {name: dtype for name, dtype in df_schema.items() if name in data_df.columns}
        data_df = data_df.cast(casts)

    for column in required_cols:
        if column not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{column}'.")
        if data_df[column].is_null().any():
            raise ValueError(f"Column '{column}' contains null values but is required.")

    # object_name -> object_id
    wanted_objects = data_df["object_name"].unique().to_list()
    object_id_by_name = self._perfdb.objects.instances.get_ids(object_names=wanted_objects)
    unknown_objects = set(wanted_objects) - set(object_id_by_name.keys())
    if unknown_objects:
        raise ValueError(f"Objects not found: {unknown_objects}")
    object_id_expr = pl.col("object_name").replace_strict(object_id_by_name, return_dtype=pl.Int64).alias("object_id")
    data_df = data_df.with_columns(object_id_expr).drop("object_name")

    # revision_name -> revision_id
    wanted_revisions = data_df["revision_name"].unique().to_list()
    revision_id_by_name = self._perfdb.kpis.availability.forecasts.revisions.get_ids(names=wanted_revisions)
    unknown_revisions = set(wanted_revisions) - set(revision_id_by_name.keys())
    if unknown_revisions:
        raise ValueError(f"Revisions not found: {unknown_revisions}")
    revision_id_expr = pl.col("revision_name").replace_strict(revision_id_by_name, return_dtype=pl.Int64).alias("revision_id")
    data_df = data_df.with_columns(revision_id_expr).drop("revision_name")

    # user_name -> user_id
    wanted_users = data_df["user_name"].unique().to_list()
    user_id_by_name = self._perfdb.users.instances.get_ids(names=wanted_users)
    unknown_users = set(wanted_users) - set(user_id_by_name.keys())
    if unknown_users:
        raise ValueError(f"Users not found: {unknown_users}")
    user_id_expr = pl.col("user_name").replace_strict(user_id_by_name, return_dtype=pl.Int64).alias("user_id")
    data_df = data_df.with_columns(user_id_expr).drop("user_name")

    # translate the public on_conflict option into the writer's if_exists mode
    if_exists_by_conflict = {"raise": "skip_row_check", "ignore": "append", "update": "update"}
    if on_conflict == "raise":
        conflict_cols = None
    else:
        conflict_cols = ["object_id", "year", "month"]

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_revision_assignments",
        schema="performance",
        if_exists=if_exists_by_conflict[on_conflict],
        conflict_cols=conflict_cols,
        ignore_null_cols=single_insert,
    )
    logger.debug(f"Inserted {len(data_df)} revision assignment(s).")

update(object_name=None, year=None, month=None, revision_name=None, user_name=None, data_df=None)

Updates one or more revision assignments.

You can pass individual values to update a single assignment, or a DataFrame for batch update.

Parameters:

  • object_name

    (str | None, default: None ) –

    Object name identifying the assignment to update. Required for single update. By default None.

  • year

    (int | None, default: None ) –

    Year identifying the assignment to update. Required for single update. By default None.

  • month

    (int | None, default: None ) –

    Month identifying the assignment to update. Required for single update. By default None.

  • revision_name

    (str | None, default: None ) –

    New revision name to assign. By default None.

  • user_name

    (str | None, default: None ) –

    New user name. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    DataFrame for batch update. Required columns: object_name, year, month. Optional: revision_name, user_name. When provided, individual parameters are ignored. By default None.

Source code in echo_postgres/kpi_availability_forecast_revision_assignments.py
Python
@validate_call
def update(
    self,
    object_name: str | None = None,
    year: int | None = None,
    month: int | None = None,
    revision_name: str | None = None,
    user_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates one or more revision assignments.

    Either pass the individual values to update a single assignment, or pass a DataFrame
    to update many at once. Rows are matched on (object_id, year, month); object, revision
    and user names are resolved to their ids before the rows are written.

    Parameters
    ----------
    object_name : str | None, optional
        Object name identifying the assignment to update. Required for single update. By default None.
    year : int | None, optional
        Year identifying the assignment to update. Required for single update. By default None.
    month : int | None, optional
        Month identifying the assignment to update. Required for single update. By default None.
    revision_name : str | None, optional
        New revision name to assign. By default None.
    user_name : str | None, optional
        New user name. By default None.
    data_df : pl.DataFrame | None, optional
        DataFrame for batch update. Required columns: object_name, year, month.
        Optional: revision_name, user_name. When provided, individual parameters are ignored. By default None.

    Raises
    ------
    ValueError
        If object_name, year or month is missing or contains nulls, or if any object,
        revision or user name cannot be resolved to an id.
    """
    df_schema = {
        "object_name": pl.Utf8,
        "year": pl.Int16,
        "month": pl.Int16,
        "revision_name": pl.Utf8,
        "user_name": pl.Utf8,
    }

    single_update = data_df is None
    if single_update:
        # wrap the scalar arguments in a one-row frame so both modes share the code below
        single_row = {
            "object_name": [object_name],
            "year": [year],
            "month": [month],
            "revision_name": [revision_name],
            "user_name": [user_name],
        }
        data_df = pl.DataFrame(single_row, schema=df_schema)
    else:
        # only cast the known columns that are actually present in the caller's frame
        casts = {name: dtype for name, dtype in df_schema.items() if name in data_df.columns}
        data_df = data_df.cast(casts)

    # the key columns must exist and be fully populated
    for key_col in ("object_name", "year", "month"):
        absent = key_col not in data_df.columns
        if absent or data_df[key_col].is_null().any():
            raise ValueError(f"'{key_col}' is required and cannot contain nulls.")

    # object_name -> object_id
    wanted_objects = data_df["object_name"].unique().to_list()
    object_id_by_name = self._perfdb.objects.instances.get_ids(object_names=wanted_objects)
    unknown_objects = set(wanted_objects) - set(object_id_by_name.keys())
    if unknown_objects:
        raise ValueError(f"Objects not found: {unknown_objects}")
    object_id_expr = pl.col("object_name").replace_strict(object_id_by_name, return_dtype=pl.Int64).alias("object_id")
    data_df = data_df.with_columns(object_id_expr).drop("object_name")

    # revision_name -> revision_id, only when the column exists; nulls are left out of the lookup
    if "revision_name" in data_df.columns:
        wanted_revisions = data_df["revision_name"].drop_nulls().unique().to_list()
        if wanted_revisions:
            revision_id_by_name = self._perfdb.kpis.availability.forecasts.revisions.get_ids(names=wanted_revisions)
            unknown_revisions = set(wanted_revisions) - set(revision_id_by_name.keys())
            if unknown_revisions:
                raise ValueError(f"Revisions not found: {unknown_revisions}")
            revision_id_expr = pl.col("revision_name").replace_strict(revision_id_by_name, return_dtype=pl.Int64, default=None)
            data_df = data_df.with_columns(revision_id_expr.alias("revision_id"))
        # the name column is dropped even when no revision_id column was produced
        data_df = data_df.drop("revision_name")

    # user_name -> user_id, same treatment as revisions
    if "user_name" in data_df.columns:
        wanted_users = data_df["user_name"].drop_nulls().unique().to_list()
        if wanted_users:
            user_id_by_name = self._perfdb.users.instances.get_ids(names=wanted_users)
            unknown_users = set(wanted_users) - set(user_id_by_name.keys())
            if unknown_users:
                raise ValueError(f"Users not found: {unknown_users}")
            user_id_expr = pl.col("user_name").replace_strict(user_id_by_name, return_dtype=pl.Int64, default=None)
            data_df = data_df.with_columns(user_id_expr.alias("user_id"))
        data_df = data_df.drop("user_name")

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="availability_forecast_revision_assignments",
        schema="performance",
        conflict_cols=["object_id", "year", "month"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} revision assignment(s).")