Skip to content

Operations UST

OperationsUST(perfdb)

Class used for handling TUST/MUST values. Can be accessed via perfdb.operations.ust.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the shared base for all operation subclasses.

    Stores a reference to the top-level ``PerfDB`` object so subclasses
    can reach the connection handler and sibling functionality.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # Keep the handle private; subclasses access the database through it.
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, attributes_names=None, object_names=None)

Deletes TUST/MUST values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data for.

  • attributes_names

    (list[Literal['must', 'tust']], default: None ) –

    List of attribute names to delete. Can be ["must"], ["tust"], or ["must", "tust"]. By default ["must", "tust"] (both values are deleted)

  • object_names

    (list[str], default: None ) –

    List of object names to delete the data for. By default None (all objects)

Source code in echo_postgres/operations_ust.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    attributes_names: list[Literal["must", "tust"]] | None = None,
    object_names: list[str] | None = None,
) -> None:
    """Deletes TUST/MUST values from the database.

    When both attributes are selected the matching rows are removed outright;
    when only one is selected, that column is set to NULL so the other
    attribute's data in the same rows is preserved.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    attributes_names : list[Literal["must", "tust"]], optional
        List of attribute names to delete. Can be ["must"], ["tust"], or ["must", "tust"].
        By default ["must", "tust"] (both values are deleted)
    object_names : list[str], optional
        List of object names to delete the data for. By default None (all objects)

    Raises
    ------
    ValueError
        If an attribute name is invalid or an object name cannot be resolved.
    """
    # Both attributes are removed unless the caller narrows the selection
    attributes_names = attributes_names if attributes_names is not None else ["must", "tust"]

    # Translate user-facing attribute names into ust_values column names
    column_by_attr = {
        "must": "must_value",
        "tust": "tust_value",
    }
    valid_attrs = set(column_by_attr)
    invalid_attrs = set(attributes_names) - valid_attrs
    if invalid_attrs:
        raise ValueError(
            f"Invalid attribute names: {invalid_attrs}. Must be one of: {sorted(valid_attrs)}",
        )

    target_columns = [column_by_attr[name] for name in attributes_names]

    # Period bounds rendered once; reused by either statement shape
    period_start = sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}")
    period_end = sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}")

    if set(target_columns) == {"must_value", "tust_value"}:
        # Both columns requested -> drop the whole rows
        statement = sql.SQL("DELETE FROM performance.ust_values WHERE (date >= {start} AND date <= {end})").format(
            start=period_start,
            end=period_end,
        )
    else:
        # Single column requested -> blank it out, keeping the row
        null_assignments = sql.SQL(", ").join(sql.SQL("{col} = NULL").format(col=sql.Identifier(name)) for name in target_columns)
        statement = sql.SQL("UPDATE performance.ust_values SET {set_clause} WHERE (date >= {start} AND date <= {end})").format(
            set_clause=null_assignments,
            start=period_start,
            end=period_end,
        )
    statement_parts = [statement]

    if object_names:
        # Resolve names to ids, failing loudly on any unknown object
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        statement_parts.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))

    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(sql.Composed(statement_parts))

    deleted_attrs = ", ".join(name.upper() for name in attributes_names)
    logger.debug(f"Deleted {result.rowcount} rows - {deleted_attrs} values removed from ust_values table")

get(period, object_names=None, attributes_names=None, filter_type='and', output_type='DataFrame', values_only=False)

Gets TUST values for the desired period and objects.

The most useful keys/columns returned are:

  • must_value
  • tust_value

Parameters:

  • period

    (DateTimeRange) –

    Period of time to get the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to get the data for. By default None (meaning all objects).
  • attributes_names

    (list[str], default: None ) –

    List of attribute names to get the data for, can be "Must" or "Tust". By default None (meaning all attributes).

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

  • values_only

    (bool, default: False ) –

    If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["object_name", "date"], columns = [values, modified_date]

  • dict[str, dict[Timestamp, dict[str, Any]]]

    In case output_type is "dict", returns a dictionary in the format {object_name: {date: {attribute: value, ...}, ...}, ...}

Source code in echo_postgres/operations_ust.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    attributes_names: list[Literal["Must", "Tust", "must_value", "tust_value"]] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> DataFrame | dict[str, dict[Timestamp, dict[str, Any]]]:
    """Gets TUST/MUST values for the desired period and objects.

    The most useful keys/columns returned are:

    - must_value
    - tust_value

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for. Both start and end are truncated to the
        first day of their month, since values are stored monthly.
    object_names : list[str], optional
        List of object names to get the data for. By default None (meaning all objects).
    attributes_names : list[str], optional
        List of attribute names to get the data for. Accepts the friendly names
        "Must"/"Tust" or the database column names "must_value"/"tust_value".
        By default None (meaning all attributes).
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"
    values_only : bool, optional
        If set to True, when returning a dict will only return the values, ignoring
        other attributes like modified_date. Is ignored when output_type is
        "DataFrame". By default False

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format:
        index = MultiIndex["object_name", "date"], columns = [<value columns>, modified_date]
    dict[str, dict[Timestamp, dict[str, Any]]]
        In case output_type is "dict", returns a dictionary in the format
        {object_name: {date: {attribute: value, ...}, ...}, ...}

    Raises
    ------
    ValueError
        If any attribute name is not recognized.
    """
    # Values are monthly: snap both period bounds to the first day of the month
    # so a mid-month start/end still covers that whole month's row (stored at day 1).
    adjusted_start = period.start.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    adjusted_end = period.end.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # Normalize attribute names from user input to database column names.
    # Column names ("must_value"/"tust_value") pass through unchanged; friendly
    # names ("Must"/"Tust") are mapped. Kept as defense-in-depth even though
    # validate_call already restricts the accepted literals.
    if attributes_names:
        attributes_mapping = {
            "Must": "must_value",
            "Tust": "tust_value",
        }
        valid_attrs = set(attributes_mapping.keys()) | set(attributes_mapping.values())
        invalid_attrs = set(attributes_names) - valid_attrs
        if invalid_attrs:
            raise ValueError(
                f"Invalid attribute names: {invalid_attrs}. Must be one of: {sorted(valid_attrs)}",
            )
        attributes_names = [attributes_mapping.get(attr, attr) for attr in attributes_names]

    # Build the query against the v_ust_values view (joins in object_name).
    query = [
        sql.SQL(
            "SELECT object_id, object_name, date, {attributes}, modified_date FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            attributes=sql.SQL(", ").join(map(sql.Identifier, attributes_names))
            if attributes_names
            else sql.SQL("must_value, tust_value"),
            table=sql.Identifier("v_ust_values"),
            start=sql.Literal(f"{adjusted_start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{adjusted_end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    # Optional filters joined with filter_type; currently only object_name.
    where = []
    if object_names:
        where.append(
            sql.SQL("object_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_names)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY  object_name, date"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing object_name to be a string and object_id to be an int
    df = df.astype(
        {"object_name": "string[pyarrow]", "object_id": "int64[pyarrow]"},
    )

    df = df.set_index(["object_name", "date"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to Dict keyed by the (object_name, date) MultiIndex
    result = df.to_dict(orient="index")
    final_result = {}

    # Get all value columns (excluding modified_date)
    value_columns = [col for col in df.columns if col != "modified_date"]

    for (object_name, date), data in result.items():
        if object_name not in final_result:
            final_result[object_name] = {}
        if date not in final_result[object_name]:
            if values_only:
                # keep only the value columns, dropping metadata like modified_date
                final_result[object_name][date] = {col: data[col] for col in value_columns}
            else:
                final_result[object_name][date] = data

    return final_result

insert(df, on_conflict='ignore')

Inserts TUST or MUST values into the database (table ust_values)

Parameters:

  • df

    (DataFrame) –

    DataFrame with the following columns:

    • object_name
    • date (must be a date referring to the beginning of the month)
    • must_value and/or tust_value (at least one must be present)
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/operations_ust.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts TUST or MUST values into the database (table ust_values)

    Parameters
    ----------
    df : DataFrame
        DataFrame with the following columns:

        - object_name
        - date (must be a date referring to the beginning of the month)
        - must_value and/or tust_value (at least one must be present)
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If the frame contains NaN values, has missing or unexpected columns,
        uses dates that are not the first day of a month, or references
        objects that cannot be found.
    """
    # Reject frames with any missing values up front
    if df.isna().any().any():
        raise ValueError("df cannot have NaN values")

    mandatory = {"object_name", "date"}
    value_columns = {"must_value", "tust_value"}
    frame_columns = set(df.columns)

    # Every mandatory column has to be present
    if not mandatory <= frame_columns:
        missing_cols = mandatory - frame_columns
        raise ValueError(f"df must have the following columns: {missing_cols}")

    # At least one of the value columns has to be present
    present_value_cols = frame_columns & value_columns
    if not present_value_cols:
        raise ValueError("df must have at least one of the following columns: must_value, tust_value")

    # Nothing beyond the known columns is allowed
    additional_cols = frame_columns - (mandatory | value_columns)
    if additional_cols:
        raise ValueError(f"Unexpected columns found: {additional_cols}")

    # Monthly data: every date must fall on day 1 of its month
    if (df["date"].dt.day != 1).any():
        raise ValueError("The value is monthly, so enter a date that represents the entire month (day 01)")

    # Swap object names for their database ids, failing on unknown objects
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df = df.copy()
    df["object_id"] = df["object_name"].map(obj_ids)
    df = df.drop(columns=["object_name"])

    # Values are stored as float32
    for col in present_value_cols:
        df[col] = df[col].astype("float32")

    # Keep only the columns that will be written, in a stable order
    df_to_insert = df[["object_id", "date", *sorted(present_value_cols)]].copy()

    # Single insert operation; conflict policy maps onto pandas_to_sql modes
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }

    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df_to_insert,
            table_name="ust_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    inserted_attrs = ", ".join([col.replace("_value", "").upper() for col in present_value_cols])
    logger.debug(f"{inserted_attrs} values inserted into the database in a single operation")