Alarms History¶
AlarmHistory(perfdb)
¶
Class used for handling alarm history. Can be accessed via perfdb.alarms.history.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the handler with a reference to the top-level ``PerfDB``.

    Base class constructor that all perfdb handler subclasses inherit.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Keep the top-level object so subclasses can reach the connection handler.
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, object_names=None, alarm_ids=None)
¶
Deletes alarm history.
Parameters:
-
(period¶DateTimeRange) –Period to delete. Will only filter the start column.
-
(object_names¶list[str] | None, default:None) –Names of the objects to delete alarms history, by default None
-
(alarm_ids¶list[int] | None, default:None) –IDs of the alarms to delete, by default None
Currently only looks at the alarm_id, not the manufacturer_id
Source code in echo_postgres/alarm_history.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    alarm_ids: list[int] | None = None,
) -> None:
    """Deletes alarm history.

    Parameters
    ----------
    period : DateTimeRange
        Period to delete. Will only filter the "start" column.
    object_names : list[str] | None, optional
        Names of the objects to delete alarms history for. If None, no object
        filter is applied and history for all objects in the period is deleted.
        By default None
    alarm_ids : list[int] | None, optional
        IDs of the alarms to delete, by default None.
        Currently only looks at the alarm_id, not the manufacturer_id.

    Raises
    ------
    ValueError
        If any of the provided object names is not present in the database.
    """
    # base statement: period filter on "start" only
    query = [
        sql.SQL("""DELETE FROM performance.alarms_history WHERE "start" BETWEEN {period_min} AND {period_max}""").format(
            period_min=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_max=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # BUGFIX: the previous version unconditionally called len(object_names) and
    # iterated object_names, so the documented default (None) raised TypeError.
    # Only resolve and apply the object filter when names were actually given.
    if object_names is not None:
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing = set(object_names) - set(obj_ids.keys())
            raise ValueError(f"Object names {missing} are not present in the database")
        if obj_ids:
            query.append(
                sql.SQL(" AND object_id IN ({obj_ids}) ").format(obj_ids=sql.SQL(",").join(sql.Literal(obj_ids[x]) for x in object_names)),
            )
    if alarm_ids:
        query.append(
            sql.SQL(" AND alarm_id IN ({alarm_ids}) ").format(alarm_ids=sql.SQL(",").join(sql.Literal(x) for x in alarm_ids)),
        )
    query = sql.Composed(query)
    # deleting data
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query)
        logger.debug(f"Deleted {result.rowcount} alarms from alarms_history table")
get(period, object_names=None, object_models=None, object_types=None, alarm_ids=None, match_alarm_id_on='manufacturer_id', alarm_types=None, data_source_types=None, filter_type='and', output_type='DataFrame')
¶
Gets alarm history.
The most relevant columns are: - object_id - object_name - alarm_id - manufacturer_id - alarm_name - alarm_type - data_source_type_name - start - end
Parameters:
-
(period¶DateTimeRange) –Desired period.
-
(object_names¶list[str] | None, default:None) –List of object names to filter the results. By default None
-
(object_models¶list[str] | None, default:None) –List of object model names to filter the results. By default None
-
(object_types¶list[str] | None, default:None) –List of object type names to filter the results. By default None
-
(alarm_ids¶list[int] | None, default:None) –IDs of the alarms, by default None
-
(match_alarm_id_on¶Literal['id', 'manufacturer_id'], default:'manufacturer_id') –What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"]. By default "manufacturer_id"
-
(alarm_types¶list[Literal['A', 'W', 'S']] | None, default:None) –Types of the alarms (A, W, S), by default None
-
(data_source_types¶list[str] | None, default:None) –Names of the data sources used to get the alarms from, by default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with no index and columns as the attributes.
-
dict[str, dict[str, list[dict[str, Any]]]]–In case output_type is "dict", returns a dictionary with the following structure: {object_name: [{attribute: value, ...}, ...], ...}
Source code in echo_postgres/alarm_history.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    object_models: list[str] | None = None,
    object_types: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    match_alarm_id_on: Literal["id", "manufacturer_id"] = "manufacturer_id",
    alarm_types: list[Literal["A", "W", "S"]] | None = None,
    data_source_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, list[dict[str, Any]]]]:
    """Gets alarm history.

    The most relevant columns are:

    - object_id
    - object_name
    - alarm_id
    - manufacturer_id
    - alarm_name
    - alarm_type
    - data_source_type_name
    - start
    - end

    Parameters
    ----------
    period : DateTimeRange
        Desired period.
    object_names : list[str] | None, optional
        List of object names to filter the results. By default None
    object_models : list[str] | None, optional
        List of object model names to filter the results. By default None
    object_types : list[str] | None, optional
        List of object type names to filter the results. By default None
    alarm_ids : list[int] | None, optional
        IDs of the alarms, by default None
    match_alarm_id_on : Literal["id", "manufacturer_id"], optional
        What to match the alarm_ids on. Can be one of ["id", "manufacturer_id"].
        By default "manufacturer_id"
    alarm_types : list[Literal["A", "W", "S"]] | None, optional
        Types of the alarms (A, W, S), by default None
    data_source_types : list[str] | None, optional
        Names of the data sources used to get the alarms from, by default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with no index and columns as the attributes.
    dict[str, dict[str, list[dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary with the following structure: {object_name: [{attribute: value, ...}, ...], ...}
    """
    # writing query
    # it will be written using the union operator to avoid using OR in the WHERE clause (start and end), which is slow
    query_start = [
        sql.SQL(
            """SELECT DISTINCT * FROM (SELECT * FROM v_alarms_hist WHERE "start" BETWEEN {period_min} AND {period_max}""",
        ).format(
            period_min=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_max=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    query_end = [
        sql.SQL(
            """SELECT * FROM v_alarms_hist WHERE "end" BETWEEN {period_min} AND {period_max}""",
        ).format(
            period_min=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_max=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names is not None or object_models is not None or object_types is not None:
        # getting ids of objects matching any of the three object filters
        obj_ids: dict[str, dict[str, Any]] = self._perfdb.objects.instances.get(
            object_names=object_names,
            object_models=object_models,
            object_types=object_types,
            filter_type=filter_type,
        )
        # forcing object_models to be a list of the object models we have
        object_models = list({obj_ids[x]["object_model_name"] for x in obj_ids})
        for query in [query_start, query_end]:
            query.append(
                # BUGFIX: iterate the resolved obj_ids mapping, not object_names.
                # When filtering only by object_models/object_types, object_names
                # is None and the previous code raised TypeError.
                sql.SQL(" AND object_id IN ({obj_ids}) ").format(
                    obj_ids=sql.SQL(",").join(sql.Literal(obj_ids[x]["id"]) for x in obj_ids),
                ),
            )
    if alarm_ids is not None:
        # getting alarms by id or manufacturer_id
        if match_alarm_id_on == "manufacturer_id":
            # translate manufacturer ids into internal alarm ids
            alarms_def = self._perfdb.alarms.definitions.get(
                object_models=object_models,
                data_source_types=data_source_types,
                alarm_ids=alarm_ids,
                match_alarm_id_on="manufacturer_id",
                filter_type=filter_type,
            )
            alarm_ids = alarms_def["id"].tolist()
        for query in [query_start, query_end]:
            query.append(
                sql.SQL(""" AND "alarm_id" IN ({alarm_ids}) """).format(
                    alarm_ids=sql.SQL(",").join(sql.Literal(x) for x in alarm_ids),
                ),
            )
    if alarm_types is not None:
        for query in [query_start, query_end]:
            query.append(
                sql.SQL(" AND alarm_type IN ({alarm_types}) ").format(
                    alarm_types=sql.SQL(",").join(sql.Literal(x) for x in alarm_types),
                ),
            )
    if data_source_types is not None:
        for query in [query_start, query_end]:
            query.append(
                sql.SQL(" AND data_source_type_name IN ({data_source_types}) ").format(
                    data_source_types=sql.SQL(",").join(sql.Literal(x) for x in data_source_types),
                ),
            )
    # UNION of the "start in period" and "end in period" selects, then close the
    # outer DISTINCT subquery and order the result
    query = [*query_start, sql.SQL(" UNION "), *query_end]
    query.append(sql.SQL(""") ORDER BY object_name, "start" """))
    query = sql.Composed(query)
    # getting data
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)
    # converting start and end to datetime64 to make sure it is the correct type
    df["start"] = df["start"].astype("datetime64[ns]")
    df["end"] = df["end"].astype("datetime64[ns]")
    # returning data if it is a DataFrame
    if output_type == "DataFrame":
        return df
    # returning data if it is a dictionary, grouped by object_name
    temp_result = df.to_dict(orient="records")
    result = {}
    for row in temp_result:
        if row["object_name"] not in result:
            result[row["object_name"]] = []
        result[row["object_name"]].append(row)
    return result
get_count(object_names, period, alarm_ids=None, output_type='DataFrame')
¶
Gets the count and duration of alarms for the provided objects within a specified period.
The resulting DataFrame will have the following columns: - object_id: ID of the object - object_name: Name of the object - alarm_id: ID of the alarm - alarm_manufacturer_id: Manufacturer ID of the alarm - alarm_name: Name of the alarm - alarm_type: Type of the alarm (A, W, S) - occurrences: Number of occurrences of the alarm within the period - occurrences_with_end_time: Number of occurrences of the alarm with an end time within the period - duration: Total duration of the alarm within the period
Parameters:
-
(object_names¶list[str]) –Names of the objects to get the count of alarms for.
-
(period¶DateTimeRange) –Period to get the count of alarms for. Will only filter the start column.
-
(alarm_ids¶list[int], default:None) –list of desired alarm ids (number as defined by the manufacturer). If is set to [] or None will get all alarms for each turbine, by default []
-
(output_type¶Literal['DataFrame', 'pl.DataFrame'], default:'DataFrame') –Type of output, by default "DataFrame"
Returns:
-
DataFrame–If output_type is "DataFrame", returns a pandas DataFrame; if "pl.DataFrame", returns a polars DataFrame. One row per (object, alarm) pair with the columns listed above.
Source code in echo_postgres/alarm_history.py
@validate_call
def get_count(
    self,
    object_names: list[str],
    period: DateTimeRange,
    alarm_ids: list[int] | None = None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> DataFrame | pl.DataFrame:
    """Gets the count and duration of alarms for the provided objects within a specified period.

    The resulting DataFrame will have the following columns:

    - object_id: ID of the object
    - object_name: Name of the object
    - alarm_id: ID of the alarm
    - alarm_manufacturer_id: Manufacturer ID of the alarm
    - alarm_name: Name of the alarm
    - alarm_type: Type of the alarm (A, W, S)
    - occurrences: Number of occurrences of the alarm within the period
    - occurrences_with_end_time: Number of occurrences of the alarm with an end time within the period
    - duration: Total duration of the alarm within the period

    Parameters
    ----------
    object_names : list[str]
        Names of the objects to get the count of alarms for.
    period : DateTimeRange
        Period to get the count of alarms for. Will only filter the start column.
    alarm_ids : list[int], optional
        list of desired alarm ids (number as defined by the manufacturer). If is set to [] or None will get all alarms for each turbine, by default []
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Type of output, by default "DataFrame"

    Returns
    -------
    DataFrame | pl.DataFrame
        Counts and durations per (object, alarm) pair. A pandas DataFrame when
        output_type is "DataFrame", a polars DataFrame when it is "pl.DataFrame".
    """
    # getting object ids (and model names, used to scope the alarm definitions)
    objs_def = self._perfdb.objects.instances.get(object_names=object_names, output_type="DataFrame")
    # getting alarms definitions; alarm_ids here are manufacturer ids, so this
    # lookup translates them into internal alarm ids for the SQL filter below
    alarms_def = self._perfdb.alarms.definitions.get(
        object_models=objs_def["object_model_name"].unique().tolist(),
        alarm_ids=alarm_ids,
        match_alarm_id_on="manufacturer_id",
        output_type="DataFrame",
    )
    # writing query: aggregate per (object_id, alarm_id); COUNT("end") only
    # counts rows whose end is not null, SUM("end" - "start") is the total duration
    query = sql.SQL(
        """SELECT
        object_id,
        alarm_id,
        COUNT("start") AS occurrences,
        COUNT("end") occurrences_with_end_time,
        SUM("end" - "start") duration
        FROM performance.alarms_history
        WHERE "start" <= {period_max} AND "start" >= {period_min}
        AND object_id IN ({object_ids})
        {alarms_filter}
        GROUP BY object_id, alarm_id
        ORDER BY object_id, alarm_id
        """,
    ).format(
        period_min=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
        period_max=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        object_ids=sql.SQL(",").join(sql.Literal(objs_def["id"][x]) for x in objs_def.index),
        # the alarm filter is only emitted when the caller asked for specific alarms
        alarms_filter=sql.SQL(
            """AND alarm_id IN ({alarm_ids})""",
        ).format(alarm_ids=sql.SQL(",").join(sql.Literal(x) for x in alarms_def["id"].tolist()))
        if alarm_ids
        else sql.SQL(""),
    )
    # getting data
    with self._perfdb.conn.reconnect() as conn:
        # NOTE(review): "start"/"end" overrides look unused — the SELECT returns
        # neither column; confirm whether they can be dropped.
        df = conn.read_to_polars(
            query,
            schema_overrides={
                "start": pl.Datetime("ms"),
                "end": pl.Datetime("ms"),
                "duration": pl.Duration("ms"),
                "occurrences": pl.Int64,
                "occurrences_with_end_time": pl.Int64,
            },
        )
    # converting objs_def and alarms_def to polars so they can be joined below
    objs_def = pl.from_pandas(objs_def.reset_index())
    alarms_def = pl.from_pandas(alarms_def.reset_index())
    # joining with objects definitions to attach object_name
    df = df.join(objs_def[["id", "name"]], left_on="object_id", right_on="id", how="left")
    df = df.rename({"name": "object_name"})
    # joining with alarm definitions to attach name/manufacturer id/type
    df = df.join(alarms_def[["id", "name", "manufacturer_id", "alarm_type"]], left_on="alarm_id", right_on="id", how="left")
    df = df.rename({"name": "alarm_name", "manufacturer_id": "alarm_manufacturer_id", "alarm_type": "alarm_type"})
    # sorting columns into the documented order
    df = df.select(
        [
            "object_id",
            "object_name",
            "alarm_id",
            "alarm_manufacturer_id",
            "alarm_name",
            "alarm_type",
            "occurrences",
            "occurrences_with_end_time",
            "duration",
        ],
    )
    if output_type == "pl.DataFrame":
        return df
    # converting to pandas DataFrame
    df = df.to_pandas()
    return df
get_latest(object_names, only_timestamp=False)
¶
Gets the latest alarm for the provided objects.
Parameters:
-
(object_names¶list[str]) –Names of the objects to get the latest alarm history.
-
(only_timestamp¶bool, default:False) –If set to True, will only return the timestamp of the latest alarm, by default False
Returns:
-
dict[str, dict[str, Any]]–If only_timestamp is False, returns a dictionary with the following structure: {object_name: {attribute: value, ...}, ...}
-
dict[str, datetime]–If only_timestamp is True, returns a dictionary with the following structure: {object_name: timestamp, ...}
Source code in echo_postgres/alarm_history.py
@validate_call
def get_latest(
    self,
    object_names: list[str],
    only_timestamp: bool = False,
) -> dict[str, dict[str, Any]] | dict[str, datetime]:
    """Gets the latest alarm for the provided objects.

    Parameters
    ----------
    object_names : list[str]
        Names of the objects to get the latest alarm history.
    only_timestamp : bool, optional
        If set to True, will only return the timestamp of the latest alarm, by default False

    Returns
    -------
    dict[str, dict[str, Any]]
        If only_timestamp is False, returns a dictionary with the following structure: {object_name: {attribute: value, ...}, ...}
    dict[str, datetime]
        If only_timestamp is True, returns a dictionary with the following structure: {object_name: timestamp, ...}
    """
    # resolve names to ids up front so each per-object query can use the
    # object_id index instead of matching on the name
    ids_by_name = self._perfdb.objects.instances.get_ids(object_names=object_names)
    latest: dict[str, Any] = {}
    for name in object_names:
        stmt = sql.SQL(
            """SELECT * FROM v_alarms_hist WHERE object_id = {obj} ORDER BY "start" DESC LIMIT 1""",
        ).format(obj=sql.Literal(ids_by_name[name]))
        with self._perfdb.conn.reconnect() as conn:
            frame = conn.read_to_pandas(stmt)
        # no history at all for this object -> empty dict marker
        if frame.empty:
            latest[name] = {}
            continue
        if only_timestamp:
            latest[name] = frame["start"].iloc[0].to_pydatetime()
            continue
        # full record: convert pandas timestamps to plain datetime objects
        record = frame.to_dict(orient="records")[0]
        record["start"] = record["start"].to_pydatetime()
        if record["end"] is not None:
            record["end"] = record["end"].to_pydatetime()
        latest[name] = record
    return latest
insert(df, create_definitions=False, on_conflict='ignore')
¶
Inserts alarm history into the database.
Parameters:
-
(df¶DataFrame) –DataFrame containing the data to be inserted. It must have the following columns:
- object_name: str
- manufacturer_id: int
- alarm_type: str (A, W, S)
- start: datetime
- end: datetime, optional
- alarm_name: str, optional
- alarm_description: str, optional
- data_source_type: str, optional
Columns listed as optional are not mandatory, but they might be needed if create_definitions is set to True and any alarm definitions are missing.
-
(create_definitions¶bool, default:False) –If set to True, will create the alarm definitions that are not present in the data base. If set to False, alarms not defined in the database will be skipped. By default False
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/alarm_history.py
@validate_call
def insert(
    self,
    df: DataFrame,
    create_definitions: bool = False,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts alarm history into the database.

    Parameters
    ----------
    df : DataFrame
        DataFrame containing the data to be inserted. It must have the following columns:

        - object_name: str
        - manufacturer_id: int
        - alarm_type: str (A, W, S)
        - start: datetime
        - end: datetime, optional
        - alarm_name: str, optional
        - alarm_description: str, optional
        - data_source_type: str, optional

        Columns listed as optional are not mandatory, but they might be needed if create_definitions is set to True and any alarm definitions are missing.
    create_definitions : bool, optional
        If set to True, will create the alarm definitions that are not present in the data base.
        If set to False, alarms not defined in the database will be skipped.
        By default False
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df is missing any of the required columns.
    """
    # checking columns
    required_columns = {"object_name", "manufacturer_id", "alarm_type", "start"}
    if not required_columns.issubset(set(df.columns)):
        missing_columns = required_columns - set(df.columns)
        raise ValueError(f"df is missing the following columns: {missing_columns}")
    # creating a copy of the DataFrame so the caller's frame is not mutated
    df = df.copy()
    # creating end column if missing (all-NA, cast to datetime so the DB gets NULLs)
    if "end" not in df.columns:
        df["end"] = NA
        df["end"] = df["end"].astype("datetime64[s]")
    # creating definitions if needed
    if create_definitions:
        self._create_definitions(df)
    # getting object id
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=df["object_name"].unique().tolist())
    # creating column with object_id
    df["object_id"] = df["object_name"].map(lambda x: obj_ids[x])
    # getting object model of all objects (needed to look up alarm definitions per model)
    obj_names = df["object_name"].unique().tolist()
    obj_models = self._perfdb.objects.instances.get(object_names=obj_names, output_type="dict")
    df["object_model_name"] = df["object_name"].map(lambda x: obj_models[x]["object_model_name"])
    # getting alarm definitions to get alarm_id
    alarms_def = self._perfdb.alarms.definitions.get(object_models=df["object_model_name"].unique().tolist(), output_type="DataFrame")
    # creating column with alarm_id: index-aligned merge on
    # (object_model_name, manufacturer_id, alarm_type); assumes alarms_def is
    # indexed by the same triple — TODO confirm against definitions.get
    df = df.set_index(["object_model_name", "manufacturer_id", "alarm_type"])
    df = df.merge(alarms_def[["id"]], how="left", left_index=True, right_index=True)
    df = df.reset_index(drop=False)
    # renaming id to alarm_id
    df = df.rename(columns={"id": "alarm_id"})
    # getting only the necessary columns
    df = df[["object_id", "alarm_id", "start", "end"]].copy()
    # making sure object_id and alarm_id are int columns
    # (pyarrow-backed ints are nullable, so unmatched alarm_ids survive until dropna)
    for col in ["object_id", "alarm_id"]:
        df[col] = df[col].astype("int32[pyarrow]")
    # dropping rows with null keys — e.g. alarms that are not defined in the database
    df = df.dropna(subset=["object_id", "alarm_id", "start"], how="any")
    # dropping duplicates on the (object, alarm, start) key
    df = df.drop_duplicates(subset=["object_id", "alarm_id", "start"])
    # inserting data; "ignore" maps to a plain append, "update" upserts
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="alarms_history",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )
    logger.debug(f"Inserted {df.shape[0]} rows into alarms_history table")