Alarms Definitions¶
AlarmDefinitions(perfdb)
¶
Class used for handling alarm definitions. Can be accessed via perfdb.alarms.definitions.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
delete(alarms)
¶
Deletes alarm definitions from the database.
Parameters:
-
(alarms¶dict[str, list[tuple[int, str]]]) –Dict containing the alarm definitions to be deleted. Must be in the format {object_model_name: [(manufacturer_id, alarm_type), ...], ...}
Source code in echo_postgres/alarm_definitions.py
@validate_call
def delete(
self,
alarms: dict[str, list[tuple[int, str]]],
) -> None:
"""Deletes alarm definitions from the database.
Parameters
----------
alarms : dict[str, list[tuple[int, str]]]
Dict containing the alarm definitions to be deleted. Must be in the format {object_model_name: [(manufacturer_id, alarm_type), ...], ...}
"""
# getting ids of the object models
object_models = list(alarms.keys())
object_model_ids = self._perfdb.objects.models.get_ids(object_models=object_models)
if len(object_models) != len(object_model_ids):
missing_models = set(object_models) - set(object_model_ids.keys())
raise ValueError(f"The following object model names do not exist: {missing_models}")
# creating query
query = [
sql.SQL("DELETE FROM performance.alarms_def WHERE "),
sql.SQL(" OR ").join(
[
sql.SQL("(object_model_id = {model_id} AND manufacturer_id = {manufacturer_id} AND alarm_type = {alarm_type})").format(
model_id=sql.Literal(object_model_ids[object_model_name]),
manufacturer_id=sql.Literal(manufacturer_id),
alarm_type=sql.Literal(alarm_type),
)
for object_model_name, values in alarms.items()
for manufacturer_id, alarm_type in values
],
),
]
query = sql.Composed(query)
# executing query
with self._perfdb.conn.reconnect() as conn:
# deleting
result = conn.execute(query)
logger.debug(f"Deleted {result.rowcount} rows from the alarms_def table")
get(object_models=None, data_source_types=None, alarm_ids=None, match_alarm_id_on='manufacturer_id', filter_type='and', output_type='DataFrame')
¶
Gets all alarm definitions definitions with detailed information.
The most useful keys/columns returned are:
- id
- name
- description
- data_source_type_name
- non_overlapping_alarms
- trigger
- modified_date
Parameters:
-
(object_models¶list[str] | None, default:None) –List of object model names to filter the results. By default None
-
(data_source_types¶list[str] | None, default:None) –List of data source type names to filter the results. By default None
-
(alarm_ids¶list[int] | None, default:None) –List of alarm ids to filter the results. By default None
-
(match_alarm_id_on¶Literal['id', 'manufacturer_id'], default:'manufacturer_id') –What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"]. By default "manufacturer_id"
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "DataFrame"
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_model_name, manufacturer_id, alarm_type], columns = [attribute, ...]
-
dict[str, dict[int, dict[str, dict[str, Any]]]]–In case output_type is "object_models=ict", returns a dictionary in the format {object_model_name: {manufacturer_id: {alarm_type: {attribute: value, ...}, ...}, ...}, ...}
Source code in echo_postgres/alarm_definitions.py
@validate_call
def get(
self,
object_models: list[str] | None = None,
data_source_types: list[str] | None = None,
alarm_ids: list[int] | None = None,
match_alarm_id_on: Literal["id", "manufacturer_id"] = "manufacturer_id",
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[int, dict[str, dict[str, Any]]]]:
"""Gets all alarm definitions definitions with detailed information.
The most useful keys/columns returned are:
- id
- name
- description
- data_source_type_name
- non_overlapping_alarms
- trigger
- modified_date
Parameters
----------
object_models : list[str] | None, optional
List of object model names to filter the results.
By default None
data_source_types : list[str] | None, optional
List of data source type names to filter the results.
By default None
alarm_ids : list[int] | None, optional
List of alarm ids to filter the results.
By default None
match_alarm_id_on : Literal["id", "manufacturer_id"], optional
What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"].
By default "manufacturer_id"
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and"
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "DataFrame"
Returns
-------
DataFrame
In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_model_name, manufacturer_id, alarm_type], columns = [attribute, ...]
dict[str, dict[int, dict[str, dict[str, Any]]]]
In case output_type is "object_models=ict", returns a dictionary in the format {object_model_name: {manufacturer_id: {alarm_type: {attribute: value, ...}, ...}, ...}, ...}
"""
# checking inputs
where = self._check_get_args(
object_models=object_models,
data_source_types=data_source_types,
alarm_ids=alarm_ids,
match_alarm_id_on=match_alarm_id_on,
filter_type=filter_type,
)
query = [
sql.SQL("SELECT * FROM performance.v_alarms_def "),
where,
sql.SQL(" ORDER BY object_model_name, manufacturer_id, alarm_type"),
]
query = sql.Composed(query)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_pandas(query, dtype_backend=None, post_convert="pyarrow")
df = df.set_index(["object_model_name", "manufacturer_id", "alarm_type"])
if output_type == "dict":
result = df.to_dict(orient="index")
# converting from format {(object_model_name, manufacturer_id, alarm_type): {attribute: value, ...}, ...} to {object_model_name: {manufacturer_id: {alarm_type: {attribute: value, ...}, ...}, ...}, ...}
final_result = {}
for (object_model_name, manufacturer_id, alarm_type), values in result.items():
if object_model_name not in final_result:
final_result[object_model_name] = {}
if manufacturer_id not in final_result[object_model_name]:
final_result[object_model_name][manufacturer_id] = {}
final_result[object_model_name][manufacturer_id][alarm_type] = values
return final_result
return df
get_ids(object_models=None, data_source_types=None, alarm_ids=None, match_alarm_id_on='manufacturer_id', filter_type='and')
¶
Gets all alarm definitions and their respective ids.
Parameters:
-
(object_models¶list[str] | None, default:None) –List of object model names to filter the results. By default None
-
(data_source_types¶list[str] | None, default:None) –List of data source type names to filter the results. By default None
-
(alarm_ids¶list[int] | None, default:None) –List of alarm ids to filter the results. By default None
-
(match_alarm_id_on¶Literal['id', 'manufacturer_id'], default:'manufacturer_id') –What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"]. By default "manufacturer_id"
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
Returns:
-
dict[str, dict[int, dict[str, int]]]–Dictionary with all alarm definitions and their respective ids in the format {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}.
Source code in echo_postgres/alarm_definitions.py
@validate_call
def get_ids(
self,
object_models: list[str] | None = None,
data_source_types: list[str] | None = None,
alarm_ids: list[int] | None = None,
match_alarm_id_on: Literal["id", "manufacturer_id"] = "manufacturer_id",
filter_type: Literal["and", "or"] = "and",
) -> dict[str, dict[int, dict[str, int]]]:
"""Gets all alarm definitions and their respective ids.
Parameters
----------
object_models : list[str] | None, optional
List of object model names to filter the results.
By default None
data_source_types : list[str] | None, optional
List of data source type names to filter the results.
By default None
alarm_ids : list[int] | None, optional
List of alarm ids to filter the results.
By default None
match_alarm_id_on : Literal["id", "manufacturer_id"], optional
What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"].
By default "manufacturer_id"
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and"
Returns
-------
dict[str, dict[int, dict[str, int]]]
Dictionary with all alarm definitions and their respective ids in the format {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}.
"""
# checking inputs
where = self._check_get_args(
object_models=object_models,
data_source_types=data_source_types,
alarm_ids=alarm_ids,
match_alarm_id_on=match_alarm_id_on,
filter_type=filter_type,
)
query = [
sql.SQL("SELECT object_model_name, manufacturer_id, id, alarm_type FROM performance.v_alarms_def "),
where,
sql.SQL(" ORDER BY object_model_name, manufacturer_id, alarm_type"),
]
query = sql.Composed(query)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_pandas(query)
df = df.set_index(["object_model_name", "manufacturer_id", "alarm_type"])
result = df["id"].to_dict()
# converting from format {(object_model_name, manufacturer_id, alarm_type): id, ...} to {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}
final_result = {}
for (object_model_name, manufacturer_id, alarm_type), id_ in result.items():
if object_model_name not in final_result:
final_result[object_model_name] = {}
if manufacturer_id not in final_result[object_model_name]:
final_result[object_model_name][manufacturer_id] = {}
final_result[object_model_name][manufacturer_id][alarm_type] = id_
return final_result
insert(df, on_conflict='ignore')
¶
Inserts new alarm definitions into the database.
Parameters:
-
(df¶DataFrame) –DataFrame with the data to be inserted. The required columns are:
- object_model_name (str)
- manufacturer_id (int)
- alarm_type (str) (one of "A", "W", "S")
- name (str)
- description (str), optional
- data_source_type_name (str)
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/alarm_definitions.py
@validate_call
def insert(
self,
df: DataFrame,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
"""Inserts new alarm definitions into the database.
Parameters
----------
df : DataFrame
DataFrame with the data to be inserted. The required columns are:
- object_model_name (str)
- manufacturer_id (int)
- alarm_type (str) (one of "A", "W", "S")
- name (str)
- description (str), optional
- data_source_type_name (str)
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict. Can be one of ["ignore", "update"].
By default "ignore"
"""
# making a copy of the DataFrame
df = df.copy()
# checking if the required columns are present
required_columns = {"object_model_name", "manufacturer_id", "alarm_type", "name", "data_source_type_name"}
optional_columns = {"description"}
if not required_columns.issubset(set(df.columns)):
wrong_columns = required_columns - set(df.columns)
raise ValueError(f"df is missing the following required columns: {wrong_columns}")
if not required_columns.union(optional_columns).issuperset(set(df.columns)):
wrong_columns = set(df.columns) - required_columns.union(optional_columns)
raise ValueError(f"df has the following invalid columns: {wrong_columns}")
# checking if the models exist
wanted_models = df["object_model_name"].unique().tolist()
wanted_model_ids = self._perfdb.objects.models.get_ids(object_models=wanted_models)
if len(wanted_models) != len(wanted_model_ids):
missing_models = set(wanted_models) - set(wanted_model_ids.keys())
raise ValueError(f"The following object model names do not exist: {missing_models}")
# checking if the data source types exist
wanted_data_source_types = df["data_source_type_name"].unique().tolist()
wanted_data_source_type_ids = self._perfdb.datasources.types.get_ids()
if any(data_source_type not in wanted_data_source_type_ids for data_source_type in wanted_data_source_types):
missing_data_source_types = set(wanted_data_source_types) - set(wanted_data_source_type_ids)
raise ValueError(f"The following data source type names do not exist: {missing_data_source_types}")
# dropping rows with empty values
df = df.dropna(subset=list(required_columns), how="any")
# changing model and data source type names to ids
df["object_model_id"] = df["object_model_name"].map(wanted_model_ids)
df["data_source_type_id"] = df["data_source_type_name"].map(wanted_data_source_type_ids)
df = df.drop(columns=["object_model_name", "data_source_type_name"])
# skipping if empty DataFrame
if df.empty:
return
# making sure all columns can be casted to the correct types
df = df.astype(
{
"manufacturer_id": "int32[pyarrow]",
"object_model_id": "int32[pyarrow]",
"data_source_type_id": "int32[pyarrow]",
"alarm_type": "string[pyarrow]",
"name": "string[pyarrow]",
"description": "string[pyarrow]",
},
)
# adjusting some characters that could cause errors
replace_list = [(";", ""), (",", "")]
for replace_tuple in replace_list:
df["name"] = df["name"].str.replace(replace_tuple[0], replace_tuple[1])
# inserting the data
if_exists_mapping = {
"ignore": "append",
"update": "update",
}
with self._perfdb.conn.reconnect() as conn:
conn.pandas_to_sql(
df=df,
table_name="alarms_def",
schema="performance",
if_exists=if_exists_mapping[on_conflict],
ignore_index=True,
conflict_cols=["object_model_id", "manufacturer_id", "alarm_type"],
)
logger.debug(f"Inserted/updated {len(df)} alarm definitions into the database")