Skip to content

Alarms Definitions

AlarmDefinitions(perfdb)

Class used for handling alarm definitions. Can be accessed via perfdb.alarms.definitions.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class initializer that all subclasses should inherit.

    Stores the top level PerfDB object on the instance.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # kept as a private attribute so the handler can reach the rest of PerfDB through it
    self._perfdb: e_pg.PerfDB = perfdb

delete(alarms)

Deletes alarm definitions from the database.

Parameters:

  • alarms

    (dict[str, list[tuple[int, str]]]) –

    Dict containing the alarm definitions to be deleted. Must be in the format {object_model_name: [(manufacturer_id, alarm_type), ...], ...}

Source code in echo_postgres/alarm_definitions.py
Python
@validate_call
def delete(
    self,
    alarms: dict[str, list[tuple[int, str]]],
) -> None:
    """Deletes alarm definitions from the database.

    A single DELETE statement is issued against performance.alarms_def, matching each requested
    (object model, manufacturer_id, alarm_type) combination. If no combination is requested
    (empty dict, or only empty lists) nothing is sent to the database.

    Parameters
    ----------
    alarms : dict[str, list[tuple[int, str]]]
        Dict containing the alarm definitions to be deleted. Must be in the format {object_model_name: [(manufacturer_id, alarm_type), ...], ...}

    Raises
    ------
    ValueError
        If any of the object model names in `alarms` could not be resolved to an id.
    """
    # nothing requested: return before any lookup so we never build a DELETE with an empty WHERE clause
    if not alarms:
        logger.debug("No alarm definitions were provided, nothing to delete")
        return

    # getting ids of the object models
    object_models = list(alarms.keys())
    object_model_ids = self._perfdb.objects.models.get_ids(object_models=object_models)
    if len(object_models) != len(object_model_ids):
        missing_models = set(object_models) - set(object_model_ids.keys())
        raise ValueError(f"The following object model names do not exist: {missing_models}")

    # one "(... AND ... AND ...)" condition per requested alarm definition
    conditions = [
        sql.SQL("(object_model_id = {model_id} AND manufacturer_id = {manufacturer_id} AND alarm_type = {alarm_type})").format(
            model_id=sql.Literal(object_model_ids[object_model_name]),
            manufacturer_id=sql.Literal(manufacturer_id),
            alarm_type=sql.Literal(alarm_type),
        )
        for object_model_name, values in alarms.items()
        for manufacturer_id, alarm_type in values
    ]

    # every model mapped to an empty list: joining zero conditions would yield invalid SQL ("... WHERE ")
    if not conditions:
        logger.debug("No alarm definitions were provided, nothing to delete")
        return

    # creating query
    query = sql.Composed(
        [
            sql.SQL("DELETE FROM performance.alarms_def WHERE "),
            sql.SQL(" OR ").join(conditions),
        ],
    )

    # deleting
    self._perfdb.conn.execute(query)

    logger.debug(f"Deleted {self._perfdb.conn.rowcount} rows from the alarms_def table")

get(object_models=None, data_source_types=None, alarm_ids=None, match_alarm_id_on='manufacturer_id', filter_type='and', output_type='DataFrame')

Gets all alarm definitions with detailed information.

The most useful keys/columns returned are:

  • id
  • name
  • description
  • data_source_type_name
  • non_overlapping_alarms
  • trigger
  • modified_date

Parameters:

  • object_models

    (list[str] | None, default: None ) –

    List of object model names to filter the results. By default None

  • data_source_types

    (list[str] | None, default: None ) –

    List of data source type names to filter the results. By default None

  • alarm_ids

    (list[int] | None, default: None ) –

    List of alarm ids to filter the results. By default None

  • match_alarm_id_on

    (Literal['id', 'manufacturer_id'], default: 'manufacturer_id' ) –

    What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"]. By default "manufacturer_id"

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with the following format: index = MultiIndex[object_model_name, manufacturer_id, alarm_type], columns = [attribute, ...]

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame

  • dict[str, dict[int, dict[str, dict[str, Any]]]]

    In case output_type is "dict", returns a dictionary in the format {object_model_name: {manufacturer_id: {alarm_type: {attribute: value, ...}, ...}, ...}, ...}

Source code in echo_postgres/alarm_definitions.py
Python
@validate_call
def get(
    self,
    object_models: list[str] | None = None,
    data_source_types: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    match_alarm_id_on: Literal["id", "manufacturer_id"] = "manufacturer_id",
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame | dict[str, dict[int, dict[str, dict[str, Any]]]]:
    """Gets all alarm definitions with detailed information.

    The data is read from the performance.v_alarms_def view, ordered by object model name,
    manufacturer id and alarm type. The most useful keys/columns returned are:

    - id
    - name
    - description
    - data_source_type_name
    - non_overlapping_alarms
    - trigger
    - modified_date

    Parameters
    ----------
    object_models : list[str] | None, optional
        List of object model names to filter the results.
        By default None
    data_source_types : list[str] | None, optional
        List of data source type names to filter the results.
        By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results.
        By default None
    match_alarm_id_on : Literal["id", "manufacturer_id"], optional
        What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"].
        By default "manufacturer_id"
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"].
        By default "DataFrame"

    Returns
    -------
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with the following format: index = MultiIndex[object_model_name, manufacturer_id, alarm_type], columns = [attribute, ...]
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame
    dict[str, dict[int, dict[str, dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {object_model_name: {manufacturer_id: {alarm_type: {attribute: value, ...}, ...}, ...}, ...}
    """

    def _parse_trigger(raw_trigger):
        # trigger is selected as TEXT; falsy values (null / empty string) are mapped to None
        return orjson.loads(raw_trigger) if raw_trigger else None

    # validating the filters and getting the matching WHERE clause
    where_clause = self._check_get_args(
        object_models=object_models,
        data_source_types=data_source_types,
        alarm_ids=alarm_ids,
        match_alarm_id_on=match_alarm_id_on,
        filter_type=filter_type,
    )

    # assembling the statement: SELECT + WHERE + ORDER BY
    select_clause = sql.SQL(
        "SELECT object_model_id, object_model_name, id, manufacturer_id, name, description, data_source_type_id, data_source_type_name, alarm_type, non_overlapping_alarms, trigger::TEXT, modified_date FROM performance.v_alarms_def ",
    )
    order_clause = sql.SQL(" ORDER BY object_model_name, manufacturer_id, alarm_type")
    statement = sql.Composed([select_clause, where_clause, order_clause])

    alarm_defs = self._perfdb.conn.read_to_polars(
        statement,
        schema_overrides=self._cols_schema,
    )

    # decoding the JSON text of the trigger column into Python objects
    alarm_defs = alarm_defs.with_columns(
        pl.col("trigger").map_elements(_parse_trigger, return_dtype=pl.Object),
    )

    return convert_output(
        alarm_defs,
        output_type,
        index_col=["object_model_name", "manufacturer_id", "alarm_type"],
        nest_by_index=True,
    )

get_ids(object_models=None, data_source_types=None, alarm_ids=None, match_alarm_id_on='manufacturer_id', filter_type='and')

Gets all alarm definitions and their respective ids.

Parameters:

  • object_models

    (list[str] | None, default: None ) –

    List of object model names to filter the results. By default None

  • data_source_types

    (list[str] | None, default: None ) –

    List of data source type names to filter the results. By default None

  • alarm_ids

    (list[int] | None, default: None ) –

    List of alarm ids to filter the results. By default None

  • match_alarm_id_on

    (Literal['id', 'manufacturer_id'], default: 'manufacturer_id' ) –

    What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"]. By default "manufacturer_id"

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

Returns:

  • dict[str, dict[int, dict[str, int]]]

    Dictionary with all alarm definitions and their respective ids in the format {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}.

Source code in echo_postgres/alarm_definitions.py
Python
@validate_call
def get_ids(
    self,
    object_models: list[str] | None = None,
    data_source_types: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    match_alarm_id_on: Literal["id", "manufacturer_id"] = "manufacturer_id",
    filter_type: Literal["and", "or"] = "and",
) -> dict[str, dict[int, dict[str, int]]]:
    """Gets all alarm definitions and their respective ids.

    The ids are read from the performance.v_alarms_def view and returned nested by
    object model name, manufacturer id and alarm type.

    Parameters
    ----------
    object_models : list[str] | None, optional
        List of object model names to filter the results.
        By default None
    data_source_types : list[str] | None, optional
        List of data source type names to filter the results.
        By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results.
        By default None
    match_alarm_id_on : Literal["id", "manufacturer_id"], optional
        What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"].
        By default "manufacturer_id"
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"

    Returns
    -------
    dict[str, dict[int, dict[str, int]]]
        Dictionary with all alarm definitions and their respective ids in the format {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}.
    """
    # validating the filters and getting the matching WHERE clause
    where_clause = self._check_get_args(
        object_models=object_models,
        data_source_types=data_source_types,
        alarm_ids=alarm_ids,
        match_alarm_id_on=match_alarm_id_on,
        filter_type=filter_type,
    )

    statement = sql.Composed(
        [
            sql.SQL("SELECT object_model_name, manufacturer_id, id, alarm_type FROM performance.v_alarms_def "),
            where_clause,
            sql.SQL(" ORDER BY object_model_name, manufacturer_id, alarm_type"),
        ],
    )

    ids_frame = self._perfdb.conn.read_to_polars(
        statement,
        schema_overrides=self._cols_schema,
    )
    ids_frame = ids_frame.to_pandas(use_pyarrow_extension_array=True)

    # flat mapping {(object_model_name, manufacturer_id, alarm_type): id, ...}
    id_by_key = ids_frame.set_index(["object_model_name", "manufacturer_id", "alarm_type"])["id"].to_dict()

    # nesting it as {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}
    nested_ids = {}
    for (model_name, manufacturer, kind), alarm_def_id in id_by_key.items():
        nested_ids.setdefault(model_name, {}).setdefault(manufacturer, {})[kind] = alarm_def_id

    return nested_ids

insert(df, on_conflict='ignore')

Inserts new alarm definitions into the database.

Parameters:

  • df

    (pd.DataFrame | pl.DataFrame) –

    DataFrame with the data to be inserted. The required columns are:

    • object_model_name (str)
    • manufacturer_id (int)
    • alarm_type (str) (one of "A", "W", "S")
    • name (str)
    • description (str), optional
    • data_source_type_name (str)
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/alarm_definitions.py
Python
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts new alarm definitions into the database.

    Rows with null values in any required column are dropped before anything else is checked.
    If no rows are left, the method returns without touching the database. Semicolons and commas
    are removed from the name and description values before writing to performance.alarms_def.

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame with the data to be inserted. The required columns are:

        - object_model_name (str)
        - manufacturer_id (int)
        - alarm_type (str) (one of "A", "W", "S")
        - name (str)
        - description (str), optional
        - data_source_type_name (str)

    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df is missing required columns, has columns other than the ones listed above,
        or references object model / data source type names that do not exist.
    """
    # converting to polars
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)

    # checking if the required columns are present
    required_columns = {"object_model_name", "manufacturer_id", "alarm_type", "name", "data_source_type_name"}
    optional_columns = {"description"}
    allowed_columns = required_columns.union(optional_columns)
    given_columns = set(df.columns)
    if not required_columns.issubset(given_columns):
        wrong_columns = required_columns - given_columns
        raise ValueError(f"df is missing the following required columns: {wrong_columns}")
    if not allowed_columns.issuperset(given_columns):
        wrong_columns = given_columns - allowed_columns
        raise ValueError(f"df has the following invalid columns: {wrong_columns}")

    # dropping rows with empty values
    # done before the name lookups so a null name is discarded instead of being reported as a non-existent name
    df = df.drop_nulls(subset=list(required_columns))

    # skipping if empty DataFrame
    if df.is_empty():
        return

    # checking if the models exist
    wanted_models = df["object_model_name"].unique().to_list()
    wanted_model_ids = self._perfdb.objects.models.get_ids(object_models=wanted_models)
    if len(wanted_models) != len(wanted_model_ids):
        missing_models = set(wanted_models) - set(wanted_model_ids.keys())
        raise ValueError(f"The following object model names do not exist: {missing_models}")

    # checking if the data source types exist
    wanted_data_source_types = df["data_source_type_name"].unique().to_list()
    wanted_data_source_type_ids = self._perfdb.datasources.types.get_ids()
    if any(data_source_type not in wanted_data_source_type_ids for data_source_type in wanted_data_source_types):
        missing_data_source_types = set(wanted_data_source_types) - set(wanted_data_source_type_ids)
        raise ValueError(f"The following data source type names do not exist: {missing_data_source_types}")

    # changing model and data source type names to ids
    df = df.with_columns(
        [
            pl.col("object_model_name").replace_strict(wanted_model_ids, return_dtype=pl.Int32).alias("object_model_id"),
            pl.col("data_source_type_name")
            .replace_strict(wanted_data_source_type_ids, return_dtype=pl.Int32)
            .alias("data_source_type_id"),
        ],
    )
    df = df.drop(["object_model_name", "data_source_type_name"])

    # making sure all columns can be casted to the correct types
    # "description" is optional, so only the columns actually present are casted
    column_types = {
        "manufacturer_id": pl.Int32,
        "object_model_id": pl.Int32,
        "data_source_type_id": pl.Int32,
        "alarm_type": pl.Utf8,
        "name": pl.Utf8,
        "description": pl.Utf8,
    }
    df = df.cast({column: dtype for column, dtype in column_types.items() if column in df.columns})

    # adjusting some characters that could cause errors
    # only touching the text columns that exist, otherwise a df without "description" would fail here
    text_columns = [column for column in ("name", "description") if column in df.columns]
    replace_list = [(";", ""), (",", "")]
    for old_value, new_value in replace_list:
        df = df.with_columns(
            [pl.col(column).str.replace_all(old_value, new_value, literal=True) for column in text_columns],
        )

    # inserting the data
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    self._perfdb.conn.polars_to_sql(
        df=df,
        table_name="alarms_def",
        schema="performance",
        if_exists=if_exists_mapping[on_conflict],
        conflict_cols=["object_model_id", "manufacturer_id", "alarm_type"],
    )

    logger.debug(f"Inserted/updated {len(df)} alarm definitions into the database")