Alarms Definitions¶
AlarmDefinitions(perfdb)
¶
Class used for handling alarm definitions. Can be accessed via perfdb.alarms.definitions.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class constructor that all handler subclasses inherit.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a reference to the root PerfDB so every method can reach the
    # shared connection handler and sibling accessors.
    self._perfdb: e_pg.PerfDB = perfdb
delete(alarms)
¶
Deletes alarm definitions from the database.
Parameters:
-
(alarms¶dict[str, list[tuple[int, str]]]) –Dict containing the alarm definitions to be deleted. Must be in the format {object_model_name: [(manufacturer_id, alarm_type), ...], ...}
Source code in echo_postgres/alarm_definitions.py
@validate_call
def delete(
    self,
    alarms: dict[str, list[tuple[int, str]]],
) -> None:
    """Deletes alarm definitions from the database.

    Parameters
    ----------
    alarms : dict[str, list[tuple[int, str]]]
        Dict containing the alarm definitions to be deleted. Must be in the format
        {object_model_name: [(manufacturer_id, alarm_type), ...], ...}

    Raises
    ------
    ValueError
        If any of the given object model names does not exist in the database.
    """
    # empty dict -> nothing to delete; bail out before building a
    # "DELETE ... WHERE " statement with an empty condition list (invalid SQL)
    if not alarms:
        return

    # getting ids of the object models
    object_models = list(alarms.keys())
    object_model_ids = self._perfdb.objects.models.get_ids(object_models=object_models)
    if len(object_models) != len(object_model_ids):
        missing_models = set(object_models) - set(object_model_ids.keys())
        raise ValueError(f"The following object model names do not exist: {missing_models}")

    # all value lists empty -> no (manufacturer_id, alarm_type) pairs to match;
    # the OR-join below would be empty and yield invalid SQL, so skip the delete
    # (model validation above has already run, preserving its ValueError behavior)
    if not any(alarms.values()):
        return

    # creating query: one OR-ed predicate per (model, manufacturer_id, alarm_type)
    query = [
        sql.SQL("DELETE FROM performance.alarms_def WHERE "),
        sql.SQL(" OR ").join(
            [
                sql.SQL("(object_model_id = {model_id} AND manufacturer_id = {manufacturer_id} AND alarm_type = {alarm_type})").format(
                    model_id=sql.Literal(object_model_ids[object_model_name]),
                    manufacturer_id=sql.Literal(manufacturer_id),
                    alarm_type=sql.Literal(alarm_type),
                )
                for object_model_name, values in alarms.items()
                for manufacturer_id, alarm_type in values
            ],
        ),
    ]
    query = sql.Composed(query)

    # executing query
    self._perfdb.conn.execute(query)
    logger.debug(f"Deleted {self._perfdb.conn.rowcount} rows from the alarms_def table")
get(object_models=None, data_source_types=None, alarm_ids=None, match_alarm_id_on='manufacturer_id', filter_type='and', output_type='DataFrame')
¶
Gets all alarm definitions with detailed information.
The most useful keys/columns returned are:
- id
- name
- description
- data_source_type_name
- non_overlapping_alarms
- trigger
- modified_date
Parameters:
-
(object_models¶list[str] | None, default:None) –List of object model names to filter the results. By default None
-
(data_source_types¶list[str] | None, default:None) –List of data source type names to filter the results. By default None
-
(alarm_ids¶list[int] | None, default:None) –List of alarm ids to filter the results. By default None
-
(match_alarm_id_on¶Literal['id', 'manufacturer_id'], default:'manufacturer_id') –What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"]. By default "manufacturer_id"
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"] By default "DataFrame"
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a pandas DataFrame with the following format: index = MultiIndex[object_model_name, manufacturer_id, alarm_type], columns = [attribute, ...]
-
DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame
-
dict[str, dict[int, dict[str, dict[str, Any]]]]–In case output_type is "dict", returns a dictionary in the format {object_model_name: {manufacturer_id: {alarm_type: {attribute: value, ...}, ...}, ...}, ...}
Source code in echo_postgres/alarm_definitions.py
@validate_call
def get(
    self,
    object_models: list[str] | None = None,
    data_source_types: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    match_alarm_id_on: Literal["id", "manufacturer_id"] = "manufacturer_id",
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame | dict[str, dict[int, dict[str, dict[str, Any]]]]:
    """Gets all alarm definitions with detailed information.

    The most useful keys/columns returned are:

    - id
    - name
    - description
    - data_source_type_name
    - non_overlapping_alarms
    - trigger
    - modified_date

    Parameters
    ----------
    object_models : list[str] | None, optional
        List of object model names to filter the results.
        By default None
    data_source_types : list[str] | None, optional
        List of data source type names to filter the results.
        By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results.
        By default None
    match_alarm_id_on : Literal["id", "manufacturer_id"], optional
        What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"].
        By default "manufacturer_id"
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]
        By default "DataFrame"

    Returns
    -------
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with the following format: index = MultiIndex[object_model_name, manufacturer_id, alarm_type], columns = [attribute, ...]
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame
    dict[str, dict[int, dict[str, dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {object_model_name: {manufacturer_id: {alarm_type: {attribute: value, ...}, ...}, ...}, ...}
    """
    # checking inputs and building the WHERE clause from the requested filters
    where = self._check_get_args(
        object_models=object_models,
        data_source_types=data_source_types,
        alarm_ids=alarm_ids,
        match_alarm_id_on=match_alarm_id_on,
        filter_type=filter_type,
    )
    # trigger is cast to TEXT so it arrives as a JSON string (decoded below)
    query = [
        sql.SQL(
            "SELECT object_model_id, object_model_name, id, manufacturer_id, name, description, data_source_type_id, data_source_type_name, alarm_type, non_overlapping_alarms, trigger::TEXT, modified_date FROM performance.v_alarms_def ",
        ),
        where,
        sql.SQL(" ORDER BY object_model_name, manufacturer_id, alarm_type"),
    ]
    query = sql.Composed(query)
    df = self._perfdb.conn.read_to_polars(
        query,
        schema_overrides=self._cols_schema,
    )
    # decode the JSON text of "trigger" into Python objects (None when empty)
    df = df.with_columns(
        pl.col("trigger").map_elements(lambda x: orjson.loads(x) if x else None, return_dtype=pl.Object),
    )
    # convert_output handles the dict/pandas/polars output selection and indexing
    return convert_output(df, output_type, index_col=["object_model_name", "manufacturer_id", "alarm_type"], nest_by_index=True)
get_ids(object_models=None, data_source_types=None, alarm_ids=None, match_alarm_id_on='manufacturer_id', filter_type='and')
¶
Gets all alarm definitions and their respective ids.
Parameters:
-
(object_models¶list[str] | None, default:None) –List of object model names to filter the results. By default None
-
(data_source_types¶list[str] | None, default:None) –List of data source type names to filter the results. By default None
-
(alarm_ids¶list[int] | None, default:None) –List of alarm ids to filter the results. By default None
-
(match_alarm_id_on¶Literal['id', 'manufacturer_id'], default:'manufacturer_id') –What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"]. By default "manufacturer_id"
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
Returns:
-
dict[str, dict[int, dict[str, int]]]–Dictionary with all alarm definitions and their respective ids in the format {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}.
Source code in echo_postgres/alarm_definitions.py
@validate_call
def get_ids(
    self,
    object_models: list[str] | None = None,
    data_source_types: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    match_alarm_id_on: Literal["id", "manufacturer_id"] = "manufacturer_id",
    filter_type: Literal["and", "or"] = "and",
) -> dict[str, dict[int, dict[str, int]]]:
    """Gets all alarm definitions and their respective ids.

    Parameters
    ----------
    object_models : list[str] | None, optional
        List of object model names to filter the results.
        By default None
    data_source_types : list[str] | None, optional
        List of data source type names to filter the results.
        By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results.
        By default None
    match_alarm_id_on : Literal["id", "manufacturer_id"], optional
        What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"].
        By default "manufacturer_id"
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"

    Returns
    -------
    dict[str, dict[int, dict[str, int]]]
        Dictionary with all alarm definitions and their respective ids in the format {object_model_name: {manufacturer_id: {alarm_type: id, ...}, ...}, ...}.
    """
    # validate the filter arguments and obtain the corresponding WHERE clause
    where = self._check_get_args(
        object_models=object_models,
        data_source_types=data_source_types,
        alarm_ids=alarm_ids,
        match_alarm_id_on=match_alarm_id_on,
        filter_type=filter_type,
    )
    query = sql.Composed(
        [
            sql.SQL("SELECT object_model_name, manufacturer_id, id, alarm_type FROM performance.v_alarms_def "),
            where,
            sql.SQL(" ORDER BY object_model_name, manufacturer_id, alarm_type"),
        ],
    )
    # pull the rows, then index by (model, manufacturer_id, alarm_type) to get a
    # flat {(model, manufacturer_id, alarm_type): id, ...} mapping
    frame = self._perfdb.conn.read_to_polars(
        query,
        schema_overrides=self._cols_schema,
    ).to_pandas(use_pyarrow_extension_array=True)
    frame = frame.set_index(["object_model_name", "manufacturer_id", "alarm_type"])
    flat_ids = frame["id"].to_dict()
    # reshape the flat tuple-keyed mapping into the nested
    # {model: {manufacturer_id: {alarm_type: id}}} structure
    nested: dict[str, dict[int, dict[str, int]]] = {}
    for (model_name, manufacturer_id, alarm_type), alarm_id in flat_ids.items():
        nested.setdefault(model_name, {}).setdefault(manufacturer_id, {})[alarm_type] = alarm_id
    return nested
insert(df, on_conflict='ignore')
¶
Inserts new alarm definitions into the database.
Parameters:
-
(df¶DataFrame | DataFrame) –DataFrame with the data to be inserted. The required columns are:
- object_model_name (str)
- manufacturer_id (int)
- alarm_type (str) (one of "A", "W", "S")
- name (str)
- description (str), optional
- data_source_type_name (str)
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/alarm_definitions.py
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts new alarm definitions into the database.

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame with the data to be inserted. The required columns are:

        - object_model_name (str)
        - manufacturer_id (int)
        - alarm_type (str) (one of "A", "W", "S")
        - name (str)
        - description (str), optional
        - data_source_type_name (str)
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df is missing required columns, contains unexpected columns, or
        references object models / data source types that do not exist.
    """
    # converting to polars
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    # checking if the required columns are present
    required_columns = {"object_model_name", "manufacturer_id", "alarm_type", "name", "data_source_type_name"}
    optional_columns = {"description"}
    if not required_columns.issubset(set(df.columns)):
        wrong_columns = required_columns - set(df.columns)
        raise ValueError(f"df is missing the following required columns: {wrong_columns}")
    if not required_columns.union(optional_columns).issuperset(set(df.columns)):
        wrong_columns = set(df.columns) - required_columns.union(optional_columns)
        raise ValueError(f"df has the following invalid columns: {wrong_columns}")
    # "description" is optional: materialize it as a null column when absent so
    # the cast/replace steps below don't fail with a missing-column error
    if "description" not in df.columns:
        df = df.with_columns(pl.lit(None, dtype=pl.Utf8).alias("description"))
    # checking if the models exist
    wanted_models = df["object_model_name"].unique().to_list()
    wanted_model_ids = self._perfdb.objects.models.get_ids(object_models=wanted_models)
    if len(wanted_models) != len(wanted_model_ids):
        missing_models = set(wanted_models) - set(wanted_model_ids.keys())
        raise ValueError(f"The following object model names do not exist: {missing_models}")
    # checking if the data source types exist
    wanted_data_source_types = df["data_source_type_name"].unique().to_list()
    wanted_data_source_type_ids = self._perfdb.datasources.types.get_ids()
    if any(data_source_type not in wanted_data_source_type_ids for data_source_type in wanted_data_source_types):
        missing_data_source_types = set(wanted_data_source_types) - set(wanted_data_source_type_ids)
        raise ValueError(f"The following data source type names do not exist: {missing_data_source_types}")
    # dropping rows with empty values in any required column
    df = df.drop_nulls(subset=list(required_columns))
    # changing model and data source type names to ids
    df = df.with_columns(
        [
            pl.col("object_model_name").replace_strict(wanted_model_ids, return_dtype=pl.Int32).alias("object_model_id"),
            pl.col("data_source_type_name")
            .replace_strict(wanted_data_source_type_ids, return_dtype=pl.Int32)
            .alias("data_source_type_id"),
        ],
    )
    df = df.drop(["object_model_name", "data_source_type_name"])
    # skipping if empty DataFrame
    if df.is_empty():
        return
    # making sure all columns can be casted to the correct types
    df = df.cast(
        {
            "manufacturer_id": pl.Int32,
            "object_model_id": pl.Int32,
            "data_source_type_id": pl.Int32,
            "alarm_type": pl.Utf8,
            "name": pl.Utf8,
            "description": pl.Utf8,
        },
    )
    # adjusting some characters that could cause errors downstream
    replace_list = [(";", ""), (",", "")]
    for replace_tuple in replace_list:
        df = df.with_columns(
            pl.col("name").str.replace_all(replace_tuple[0], replace_tuple[1]).alias("name"),
            pl.col("description").str.replace_all(replace_tuple[0], replace_tuple[1]).alias("description"),
        )
    # inserting the data; "ignore" maps to a plain append (conflicts skipped),
    # "update" performs an upsert on the conflict columns
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    self._perfdb.conn.polars_to_sql(
        df=df,
        table_name="alarms_def",
        schema="performance",
        if_exists=if_exists_mapping[on_conflict],
        conflict_cols=["object_model_id", "manufacturer_id", "alarm_type"],
    )
    logger.debug(f"Inserted/updated {len(df)} alarm definitions into the database")