Job Steps¶
JobSteps(perfdb)
¶
Class used for handling job steps. Can be accessed via perfdb.jobs.steps.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Stores the top-level ``PerfDB`` handle so subclasses can reach the
    connection handler and sibling accessors via ``self._perfdb``.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    self._perfdb: e_pg.PerfDB = perfdb
delete(job_step_ids)
¶
Deletes job steps from the database.
Parameters:
-
(job_step_ids¶list[int]) –List of ids of the job steps to delete.
Source code in echo_postgres/job_steps.py
@validate_call
def delete(
    self,
    job_step_ids: Annotated[list[int], Field(min_length=1)],
) -> None:
    """Deletes job steps from the database.

    Parameters
    ----------
    job_step_ids : list[int]
        List of ids of the job steps to delete. Must contain at least one id.
    """
    # Render each id as a SQL literal and join them into the IN (...) list.
    id_list = sql.SQL(", ").join(sql.Literal(step_id) for step_id in job_step_ids)
    query = sql.SQL("DELETE FROM performance.job_steps WHERE id IN ({ids})").format(ids=id_list)
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
get(job_instance_ids=None, job_step_ids=None, job_type_names=None, job_state_names=None, created_period=None, object_names=None, feature_names=None, alarm_ids=None, filter_type='and', output_type='dict')
¶
Gets all job steps definitions with detailed information.
The most useful keys/columns returned are:
- id (index)
- job_instance_id
- job_type_id
- job_type_name
- job_state_id
- job_state_name
- priority
- execution_order
- object_id
- object_name
- alarm_id
- alarm_type
- alarm_manufacturer_id
- alarm_name
- feature_id
- feature_name
- parameters
- created
- started
- finished
- duration
- last_state_change
- comment
Parameters:
-
(job_instance_ids¶list[int] | None, default:None) –List of job instance ids to filter the results. By default None
-
(job_step_ids¶list[int] | None, default:None) –List of job step ids to filter the results. By default None
-
(job_type_names¶list[str] | None, default:None) –List of job type names to filter the results. By default None
-
(job_state_names¶list[str] | None, default:None) –List of job state names to filter the results. By default None
-
(created_period¶DateTimeRange | None, default:None) –Period of creation of the job steps. By default None
-
(object_names¶list[str] | None, default:None) –List of object names to filter the results. By default None
-
(feature_names¶list[str] | None, default:None) –List of feature names to filter the results. By default None
-
(alarm_ids¶list[int] | None, default:None) –List of alarm ids to filter the results. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"
Returns:
-
dict[int, dict[str, Any]]–In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
Source code in echo_postgres/job_steps.py
@validate_call
def get(
    self,
    job_instance_ids: list[int] | None = None,
    job_step_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    object_names: list[str] | None = None,
    feature_names: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[int, dict[str, Any]] | DataFrame:
    """Gets all job steps definitions with detailed information.

    The most useful keys/columns returned are:

    - id (index)
    - job_instance_id
    - job_type_id
    - job_type_name
    - job_state_id
    - job_state_name
    - priority
    - execution_order
    - object_id
    - object_name
    - alarm_id
    - alarm_type
    - alarm_manufacturer_id
    - alarm_name
    - feature_id
    - feature_name
    - parameters
    - created
    - started
    - finished
    - duration
    - last_state_change
    - comment

    Parameters
    ----------
    job_instance_ids : list[int] | None, optional
        List of job instance ids to filter the results. By default None
    job_step_ids : list[int] | None, optional
        List of job step ids to filter the results. By default None
    job_type_names : list[str] | None, optional
        List of job type names to filter the results. By default None
    job_state_names : list[str] | None, optional
        List of job state names to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Period of creation of the job steps. By default None
    object_names : list[str] | None, optional
        List of object names to filter the results. By default None
    feature_names : list[str] | None, optional
        List of feature names to filter the results. By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
    """
    # checking inputs
    where = self._check_get_args(
        job_instance_ids=job_instance_ids,
        job_step_ids=job_step_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        object_names=object_names,
        feature_names=feature_names,
        alarm_ids=alarm_ids,
        filter_type=filter_type,
    )
    query = [
        sql.SQL("SELECT * FROM performance.v_job_steps "),
        where,
        sql.SQL(" ORDER BY execution_order"),
    ]
    query = sql.Composed(query)
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    df = df.set_index("id")
    return df.to_dict(orient="index") if output_type == "dict" else df
get_ids(job_instance_ids=None, job_step_ids=None, job_type_names=None, job_state_names=None, created_period=None, object_names=None, feature_names=None, alarm_ids=None, filter_type='and')
¶
Gets all job steps and their respective ids.
Parameters:
-
(job_instance_ids¶list[int] | None, default:None) –List of job instance ids to filter the results. By default None
-
(job_step_ids¶list[int] | None, default:None) –List of job step ids to filter the results. By default None
-
(job_type_names¶list[str] | None, default:None) –List of job type names to filter the results. By default None
-
(job_state_names¶list[str] | None, default:None) –List of job state names to filter the results. By default None
-
(created_period¶DateTimeRange | None, default:None) –Period of creation of the job steps. By default None
-
(object_names¶list[str] | None, default:None) –List of object names to filter the results. By default None
-
(feature_names¶list[str] | None, default:None) –List of feature names to filter the results. By default None
-
(alarm_ids¶list[int] | None, default:None) –List of alarm ids to filter the results. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"].
Returns:
-
list[int]–List of job ids.
Source code in echo_postgres/job_steps.py
@validate_call
def get_ids(
    self,
    job_instance_ids: list[int] | None = None,
    job_step_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    object_names: list[str] | None = None,
    feature_names: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets all job steps and their respective ids.

    Parameters
    ----------
    job_instance_ids : list[int] | None, optional
        List of job instance ids to filter the results. By default None
    job_step_ids : list[int] | None, optional
        List of job step ids to filter the results. By default None
    job_type_names : list[str] | None, optional
        List of job type names to filter the results. By default None
    job_state_names : list[str] | None, optional
        List of job state names to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Period of creation of the job steps. By default None
    object_names : list[str] | None, optional
        List of object names to filter the results. By default None
    feature_names : list[str] | None, optional
        List of feature names to filter the results. By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"

    Returns
    -------
    list[int]
        List of job ids, ordered by execution_order.
    """
    # Filter validation and WHERE-clause construction are shared with get().
    where_clause = self._check_get_args(
        job_instance_ids=job_instance_ids,
        job_step_ids=job_step_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        object_names=object_names,
        feature_names=feature_names,
        alarm_ids=alarm_ids,
        filter_type=filter_type,
    )
    query = sql.Composed(
        [
            sql.SQL("SELECT id FROM performance.job_steps "),
            where_clause,
            sql.SQL(" ORDER BY execution_order"),
        ],
    )
    with self._perfdb.conn.reconnect() as conn:
        result = conn.read_to_pandas(query)
    return result["id"].tolist()
insert(job_steps)
¶
Inserts job steps into the database.
Parameters:
-
(job_steps¶list[dict[str, Any]]) –List of dictionaries with the following keys:
- job_instance_id (int): Id of the job instance to which the job step belongs.
- execution_order (int | None): Execution order (1 being the first). If set to None, the default value will be used (1).
- object_name (str | None): Name of the object to which the job step is related. If set to None, the object will not be set.
- alarm_id (int | None): Id of the alarm (database id, not manufacturer_id) to which the job step is related. If set to None, the alarm will not be set.
- feature_name (str | None): Name of the feature to which the job step is related. If set to None, the feature will not be set.
- parameters (dict[str, Any] | None): Parameters of the job step. If set to None, the parameters will not be set.
Returns:
-
list[int]–List of ids of the inserted job steps. They are ordered based on execution_order column.
Source code in echo_postgres/job_steps.py
@validate_call
def insert(
    self,
    job_steps: list[dict[str, Any]],
) -> list[int]:
    """Inserts job steps into the database.

    Parameters
    ----------
    job_steps : list[dict[str, Any]]
        List of dictionaries with the following keys:

        - **job_instance_id** (int): Id of the job instance to which the job step belongs.
        - **execution_order** (int | None): Execution order (1 being the first). If set to None, the default value will be used (1).
        - **object_name** (str | None): Name of the object to which the job step is related. If set to None, the object will not be set.
        - **alarm_id** (int | None): Id of the alarm (database id, not manufacturer_id) to which the job step is related. If set to None, the alarm will not be set.
        - **feature_name** (str | None): Name of the feature to which the job step is related. If set to None, the feature will not be set.
        - **parameters** (dict[str, Any] | None): Parameters of the job step. If set to None, the parameters will not be set.

    Returns
    -------
    list[int]
        List of ids of the inserted job steps. They are ordered based on execution_order column.

    Raises
    ------
    TypeError
        If a job_instance_id is not an integer.
    ValueError
        If a referenced job instance, object, alarm, or feature does not exist,
        or if a job step's parameters are not JSON serializable.
    """
    # checking input
    # validating job instances, object_names, alarm_ids, and feature_names
    wanted_job_instance_ids = []
    wanted_object_names = []
    wanted_alarm_ids = []
    for job_step in job_steps:
        if job_step["job_instance_id"]:
            if not isinstance(job_step["job_instance_id"], int):
                raise TypeError(f"job_instance_id must be an integer, not {type(job_step['job_instance_id'])}")
            wanted_job_instance_ids.append(job_step["job_instance_id"])
        if job_step.get("object_name", None):
            wanted_object_names.append(job_step["object_name"])
        if job_step.get("alarm_id", None):
            wanted_alarm_ids.append(job_step["alarm_id"])
    wanted_job_instance_ids = list(set(wanted_job_instance_ids))
    wanted_object_names = list(set(wanted_object_names))
    wanted_alarm_ids = list(set(wanted_alarm_ids))
    # deduplicated like the other wanted-lists (duplicates were previously kept)
    wanted_feature_names = list({job_step["feature_name"] for job_step in job_steps if job_step.get("feature_name", None)})

    job_instances = self._perfdb.jobs.instances.get_ids(job_ids=wanted_job_instance_ids)

    # objects
    if wanted_object_names:
        objects: DataFrame = self._perfdb.objects.instances.get(object_names=wanted_object_names, output_type="DataFrame")
        all_obj_models = objects[["object_model_id", "object_model_name"]].set_index("object_model_name")["object_model_id"].to_dict()
    else:
        objects = None
        all_obj_models = None

    # FIX: all_obj_models is None when no object_name was provided; calling
    # list(None.keys()) raised AttributeError when alarms/features were
    # requested without objects. Fall back to no model filter in that case.
    obj_model_names = list(all_obj_models.keys()) if all_obj_models else None

    # alarms
    if wanted_alarm_ids:
        alarms: DataFrame = self._perfdb.alarms.definitions.get(
            alarm_ids=wanted_alarm_ids,
            match_alarm_id_on="id",
            object_models=obj_model_names,
            output_type="DataFrame",
        )
        alarms = alarms.reset_index().set_index("id")
    else:
        alarms = None

    # features
    if wanted_feature_names:
        features = self._perfdb.features.definitions.get(
            feature_names=wanted_feature_names,
            object_models=obj_model_names,
            output_type="DataFrame",
        )
        features = features.reset_index().set_index(["object_model_id", "name"])
    else:
        features = None

    # checking if all job instances, objects, alarms, and features exist
    wrong_job_instances = set(wanted_job_instance_ids) - set(job_instances)
    if wrong_job_instances:
        raise ValueError(f"Job instances {wrong_job_instances} do not exist")
    if objects is not None:
        wrong_object_names = set(wanted_object_names) - set(objects.index)
        if wrong_object_names:
            raise ValueError(f"Objects {wrong_object_names} do not exist")
    if alarms is not None:
        wrong_alarm_ids = set(wanted_alarm_ids) - set(alarms.index)
        if wrong_alarm_ids:
            raise ValueError(f"Alarms {wrong_alarm_ids} do not exist")
    if features is not None:
        wrong_feature_names = set(wanted_feature_names) - set(features.index.get_level_values("name"))
        if wrong_feature_names:
            raise ValueError(f"Features {wrong_feature_names} do not exist")

    # making sure all parameters are json serializable
    for job_step in job_steps:
        if "parameters" in job_step and job_step["parameters"] is not None:
            try:
                json.dumps(job_step["parameters"])
            except (TypeError, ValueError) as e:
                raise ValueError(f"Parameters of job step {job_step} are not JSON serializable") from e

    # creating a DataFrame with the job steps
    df = pl.from_dicts(
        job_steps,
        schema={
            "job_instance_id": pl.Int64,
            "execution_order": pl.Int32,
            "object_name": pl.String,
            "alarm_id": pl.Int32,
            "feature_name": pl.String,
            "parameters": pl.Object,
        },
    )
    # merging objects
    if objects is not None:
        objects_df: pl.DataFrame = pl.from_pandas(
            objects.reset_index()[["name", "id", "object_model_id"]].astype({"name": "string", "id": "int", "object_model_id": "int"}),
        ).rename(
            {"id": "object_id", "name": "object_name"},
        )
        df = df.join(objects_df, on="object_name", how="left")
    # merging features
    if features is not None:
        features_df = pl.from_pandas(
            features.reset_index()[["name", "id", "object_model_id"]].astype(
                {"name": "string", "id": "int", "object_model_id": "int"},
            ),
        ).rename(
            {"name": "feature_name", "id": "feature_id"},
        )
        # FIX: when the objects merge never ran, df has no object_model_id
        # column and the join below raised. Add a null column so the left
        # join simply yields null feature_ids.
        if "object_model_id" not in df.columns:
            df = df.with_columns(pl.lit(None).cast(pl.Int64).alias("object_model_id"))
        df = df.join(features_df, on=["feature_name", "object_model_id"], how="left")
    # adding missing columns
    for col in ["object_id", "feature_id"]:
        if col not in df.columns:
            df = df.with_columns(
                pl.lit(None).cast(pl.Int32).alias(col),
            )
    # keeping only the columns that will be inserted
    df = df[["job_instance_id", "execution_order", "object_id", "alarm_id", "feature_id", "parameters"]]

    # converting back to pandas and inserting the job steps
    df = df.to_pandas()
    t0 = perf_counter()
    with self._perfdb.conn.reconnect() as conn:
        # ref_time lets us find the freshly inserted rows afterwards
        ref_time = conn.execute("SELECT NOW()").fetchone()[0]
        conn.pandas_to_sql(
            df,
            table_name="job_steps",
            schema="performance",
            if_exists="skip_row_check",
            ignore_index=True,
        )
    logger.debug(f"Inserting job steps took {perf_counter() - t0:.2f} seconds")

    # getting the ids of the inserted job steps
    query = sql.SQL(
        "SELECT id FROM performance.job_steps WHERE job_instance_id IN ({ids}) AND created >= {ref_time} ORDER BY execution_order",
    ).format(
        ids=sql.SQL(", ").join(map(sql.Literal, wanted_job_instance_ids)),
        ref_time=sql.Literal(f"{ref_time:%Y-%m-%d %H:%M:%S}"),
    )
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)
    return df["id"].tolist()
update(job_steps)
¶
Updates job steps in the database.
Parameters:
-
(job_steps¶dict[int, dict[str, Any]]) –Dictionary with the job steps to update. The keys are the ids of the job steps and the values are dictionaries with the following keys:
- job_state_name (str | None): Name of the job state to set. If set to None or not present, the job state will not be updated.
- comment (str | None): Comment to add to the job step. If set to None or not present, the comment will not be updated.
Returns:
-
dict[int, dict[str, Any]]–Dictionary with the job steps that were not updated.
Source code in echo_postgres/job_steps.py
@validate_call
def update(
    self,
    job_steps: dict[int, dict[str, Any]],
) -> dict[int, dict[str, Any]]:
    """Updates job steps in the database.

    Parameters
    ----------
    job_steps : dict[int, dict[str, Any]]
        Dictionary with the job steps to update. The keys are the ids of the job steps and the values are dictionaries with the following keys:

        - **job_state_name** (str | None): Name of the job state to set. If set to None or not present, the job state will not be updated.
        - **comment** (str | None): Comment to add to the job step. If set to None or not present, the comment will not be updated.

    Returns
    -------
    dict[int, dict[str, Any]]
        Dictionary with the job steps that were not updated.

    Raises
    ------
    ValueError
        If job_steps is empty.
    """
    # checking input
    if not job_steps:
        # FIX: message previously said "jobs", which is not this parameter's name
        raise ValueError("job_steps must not be empty")
    # getting the ids of the job states
    job_states = self._perfdb.jobs.states.get_ids()
    # iterating job steps; failures are collected, never raised, so one bad
    # entry does not prevent the rest from being updated
    failed_jobs = {}
    for job_id, job_data in job_steps.items():
        try:
            # creating the query to update the job step
            query = [sql.SQL("UPDATE performance.job_steps SET ")]
            update_attrs = []
            if "job_state_name" in job_data and job_data["job_state_name"] is not None:
                if job_data["job_state_name"] not in job_states:
                    raise ValueError(f"Job state {job_data['job_state_name']} does not exist")
                job_state_id = job_states[job_data["job_state_name"]]
                update_attrs.append(sql.SQL("job_state_id = {job_state_id}").format(job_state_id=sql.Literal(job_state_id)))
            if "comment" in job_data and job_data["comment"] is not None:
                update_attrs.append(sql.SQL("comment = {comment}").format(comment=sql.Literal(job_data["comment"])))
            if len(update_attrs) == 0:
                logger.warning(f"Job step {job_id} was not updated because no valid fields were provided")
                failed_jobs[job_id] = job_data
                continue
            query.append(sql.SQL(", ").join(update_attrs))
            query.append(sql.SQL(" WHERE id = {job_id}").format(job_id=sql.Literal(job_id)))
            query = sql.Composed(query)
            with self._perfdb.conn.reconnect() as conn:
                conn.execute(query)
        except Exception:
            failed_jobs[job_id] = job_data
            logger.exception(f"Error updating job step {job_id}")
    return failed_jobs