Skip to content

Job Steps

JobSteps(perfdb)

Class used for handling job steps. Can be accessed via perfdb.jobs.steps.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store the top-level PerfDB handle shared by all subclasses.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # keep a reference so subclasses can reach the connection handler
    self._perfdb: e_pg.PerfDB = perfdb

delete(job_step_ids)

Deletes job steps from the database.

Parameters:

  • job_step_ids

    (list[int]) –

    List of ids of the job steps to delete.

Source code in echo_postgres/job_steps.py
@validate_call
def delete(
    self,
    job_step_ids: Annotated[list[int], Field(min_length=1)],
) -> None:
    """Deletes job steps from the database.

    Parameters
    ----------
    job_step_ids : list[int]
        List of ids of the job steps to delete.

    """
    # build the comma-separated list of literal ids for the IN clause
    id_list = sql.SQL(", ").join(sql.Literal(step_id) for step_id in job_step_ids)
    query = sql.SQL("DELETE FROM performance.job_steps WHERE id IN ({ids})").format(ids=id_list)

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)

get(job_instance_ids=None, job_step_ids=None, job_type_names=None, job_state_names=None, created_period=None, object_names=None, feature_names=None, alarm_ids=None, filter_type='and', output_type='dict')

Gets all job steps definitions with detailed information.

The most useful keys/columns returned are:

  • id (index)
  • job_instance_id
  • job_type_id
  • job_type_name
  • job_state_id
  • job_state_name
  • priority
  • execution_order
  • object_id
  • object_name
  • alarm_id
  • alarm_type
  • alarm_manufacturer_id
  • alarm_name
  • feature_id
  • feature_name
  • parameters
  • created
  • started
  • finished
  • duration
  • last_state_change
  • comment

Parameters:

  • job_instance_ids

    (list[int] | None, default: None ) –

    List of job instance ids to filter the results. By default None

  • job_step_ids

    (list[int] | None, default: None ) –

    List of job step ids to filter the results. By default None

  • job_type_names

    (list[str] | None, default: None ) –

    List of job type names to filter the results. By default None

  • job_state_names

    (list[str] | None, default: None ) –

    List of job state names to filter the results. By default None

  • created_period

    (DateTimeRange | None, default: None ) –

    Period of creation of the job steps. By default None

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter the results. By default None

  • feature_names

    (list[str] | None, default: None ) –

    List of feature names to filter the results. By default None

  • alarm_ids

    (list[int] | None, default: None ) –

    List of alarm ids to filter the results. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]

Source code in echo_postgres/job_steps.py
@validate_call
def get(
    self,
    job_instance_ids: list[int] | None = None,
    job_step_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    object_names: list[str] | None = None,
    feature_names: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[int, dict[str, Any]] | DataFrame:
    """Gets all job steps definitions with detailed information.

    The most useful keys/columns returned are:

    - id (index)
    - job_instance_id
    - job_type_id
    - job_type_name
    - job_state_id
    - job_state_name
    - priority
    - execution_order
    - object_id
    - object_name
    - alarm_id
    - alarm_type
    - alarm_manufacturer_id
    - alarm_name
    - feature_id
    - feature_name
    - parameters
    - created
    - started
    - finished
    - duration
    - last_state_change
    - comment

    Parameters
    ----------
    job_instance_ids : list[int] | None, optional
        List of job instance ids to filter the results. By default None
    job_step_ids : list[int] | None, optional
        List of job step ids to filter the results. By default None
    job_type_names : list[str] | None, optional
        List of job type names to filter the results. By default None
    job_state_names : list[str] | None, optional
        List of job state names to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Period of creation of the job steps. By default None
    object_names : list[str] | None, optional
        List of object names to filter the results. By default None
    feature_names : list[str] | None, optional
        List of feature names to filter the results. By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]

    """
    # checking inputs and building the WHERE clause shared with get_ids
    where = self._check_get_args(
        job_instance_ids=job_instance_ids,
        job_step_ids=job_step_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        object_names=object_names,
        feature_names=feature_names,
        alarm_ids=alarm_ids,
        filter_type=filter_type,
    )

    query = [
        sql.SQL("SELECT * FROM performance.v_job_steps "),
        where,
        sql.SQL(" ORDER BY execution_order"),
    ]
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    df = df.set_index("id")

    return df.to_dict(orient="index") if output_type == "dict" else df

get_ids(job_instance_ids=None, job_step_ids=None, job_type_names=None, job_state_names=None, created_period=None, object_names=None, feature_names=None, alarm_ids=None, filter_type='and')

Gets all job steps and their respective ids.

Parameters:

  • job_instance_ids

    (list[int] | None, default: None ) –

    List of job instance ids to filter the results. By default None

  • job_step_ids

    (list[int] | None, default: None ) –

    List of job step ids to filter the results. By default None

  • job_type_names

    (list[str] | None, default: None ) –

    List of job type names to filter the results. By default None

  • job_state_names

    (list[str] | None, default: None ) –

    List of job state names to filter the results. By default None

  • created_period

    (DateTimeRange | None, default: None ) –

    Period of creation of the job steps. By default None

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter the results. By default None

  • feature_names

    (list[str] | None, default: None ) –

    List of feature names to filter the results. By default None

  • alarm_ids

    (list[int] | None, default: None ) –

    List of alarm ids to filter the results. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"].

Returns:

  • list[int]

    List of job step ids.

Source code in echo_postgres/job_steps.py
@validate_call
def get_ids(
    self,
    job_instance_ids: list[int] | None = None,
    job_step_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    object_names: list[str] | None = None,
    feature_names: list[str] | None = None,
    alarm_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets the ids of all job steps matching the given filters.

    Parameters
    ----------
    job_instance_ids : list[int] | None, optional
        List of job instance ids to filter the results. By default None
    job_step_ids : list[int] | None, optional
        List of job step ids to filter the results. By default None
    job_type_names : list[str] | None, optional
        List of job type names to filter the results. By default None
    job_state_names : list[str] | None, optional
        List of job state names to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Period of creation of the job steps. By default None
    object_names : list[str] | None, optional
        List of object names to filter the results. By default None
    feature_names : list[str] | None, optional
        List of feature names to filter the results. By default None
    alarm_ids : list[int] | None, optional
        List of alarm ids to filter the results. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].

    Returns
    -------
    list[int]
        List of job step ids, ordered by execution_order.

    """
    # validate inputs and build the WHERE clause shared with get()
    where = self._check_get_args(
        job_instance_ids=job_instance_ids,
        job_step_ids=job_step_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        object_names=object_names,
        feature_names=feature_names,
        alarm_ids=alarm_ids,
        filter_type=filter_type,
    )

    query = sql.Composed(
        [
            sql.SQL("SELECT id FROM performance.job_steps "),
            where,
            sql.SQL(" ORDER BY execution_order"),
        ],
    )

    with self._perfdb.conn.reconnect() as conn:
        result = conn.read_to_pandas(query)

    return result["id"].tolist()

insert(job_steps)

Inserts job steps into the database.

Parameters:

  • job_steps

    (list[dict[str, Any]]) –

    List of dictionaries with the following keys:

    • job_instance_id (int): Id of the job instance to which the job step belongs.
    • execution_order (int | None): Execution order (1 being the first). If set to None, the default value will be used (1).
    • object_name (str | None): Name of the object to which the job step is related. If set to None, the object will not be set.
    • alarm_id (int | None): Id of the alarm (database id, not manufacturer_id) to which the job step is related. If set to None, the alarm will not be set.
    • feature_name (str | None): Name of the feature to which the job step is related. If set to None, the feature will not be set.
    • parameters (dict[str, Any] | None): Parameters of the job step. If set to None, the parameters will not be set.

Returns:

  • list[int]

    List of ids of the inserted job steps. They are ordered based on the execution_order column.

Source code in echo_postgres/job_steps.py
@validate_call
def insert(
    self,
    job_steps: list[dict[str, Any]],
) -> list[int]:
    """Inserts job steps into the database.

    Parameters
    ----------
    job_steps : list[dict[str, Any]]
        List of dictionaries with the following keys:

        - **job_instance_id** (int): Id of the job instance to which the job step belongs.
        - **execution_order** (int | None): Execution order (1 being the first). If set to None, the default value will be used (1).
        - **object_name** (str | None): Name of the object to which the job step is related. If set to None, the object will not be set.
        - **alarm_id** (int | None): Id of the alarm (database id, not manufacturer_id) to which the job step is related. If set to None, the alarm will not be set.
        - **feature_name** (str | None): Name of the feature to which the job step is related. If set to None, the feature will not be set.
        - **parameters** (dict[str, Any] | None): Parameters of the job step. If set to None, the parameters will not be set.

    Returns
    -------
    list[int]
        List of ids of the inserted job steps. They are ordered based on the execution_order column.

    """
    # checking input

    # collecting the distinct job instances, objects and alarms referenced by the
    # steps so their existence can be validated in bulk below
    wanted_job_instance_ids = []
    wanted_object_names = []
    wanted_alarm_ids = []
    for job_step in job_steps:
        if job_step["job_instance_id"]:
            if not isinstance(job_step["job_instance_id"], int):
                raise TypeError(f"job_instance_id must be an integer, not {type(job_step['job_instance_id'])}")
            wanted_job_instance_ids.append(job_step["job_instance_id"])
        if job_step.get("object_name", None):
            wanted_object_names.append(job_step["object_name"])
        if job_step.get("alarm_id", None):
            wanted_alarm_ids.append(job_step["alarm_id"])

    # deduplicate before querying the database
    wanted_job_instance_ids = list(set(wanted_job_instance_ids))
    wanted_object_names = list(set(wanted_object_names))
    wanted_alarm_ids = list(set(wanted_alarm_ids))
    wanted_feature_names = [job_step["feature_name"] for job_step in job_steps if job_step.get("feature_name", None)]

    job_instances = self._perfdb.jobs.instances.get_ids(job_ids=wanted_job_instance_ids)

    # objects
    if wanted_object_names:
        objects: DataFrame = self._perfdb.objects.instances.get(object_names=wanted_object_names, output_type="DataFrame")
        # map object_model_name -> object_model_id for the models seen among the wanted objects
        all_obj_models = objects[["object_model_id", "object_model_name"]].set_index("object_model_name")["object_model_id"].to_dict()
    else:
        objects = None
        all_obj_models = None
    # alarms
    # NOTE(review): if alarm_ids (or feature_names) are given without any object_name,
    # all_obj_models is None here and list(all_obj_models.keys()) raises AttributeError —
    # confirm callers always provide object names alongside alarm_ids/feature_names.
    if wanted_alarm_ids:
        alarms: DataFrame = self._perfdb.alarms.definitions.get(
            alarm_ids=wanted_alarm_ids,
            match_alarm_id_on="id",
            object_models=list(all_obj_models.keys()),
            output_type="DataFrame",
        )
        alarms = alarms.reset_index().set_index("id")
    else:
        alarms = None
    # features
    if wanted_feature_names:
        features = self._perfdb.features.definitions.get(
            feature_names=wanted_feature_names,
            object_models=list(all_obj_models.keys()),
            output_type="DataFrame",
        )
        # indexed by (object_model_id, name) so the later join can resolve per-model features
        features = features.reset_index().set_index(["object_model_id", "name"])
    else:
        features = None

    # checking if all job instances, objects, alarms, and features exist
    wrong_job_instances = set(wanted_job_instance_ids) - set(job_instances)
    if wrong_job_instances:
        raise ValueError(f"Job instances {wrong_job_instances} do not exist")
    if objects is not None:
        wrong_object_names = set(wanted_object_names) - set(objects.index)
        if wrong_object_names:
            raise ValueError(f"Objects {wrong_object_names} do not exist")
    if alarms is not None:
        wrong_alarm_ids = set(wanted_alarm_ids) - set(alarms.index)
        if wrong_alarm_ids:
            raise ValueError(f"Alarms {wrong_alarm_ids} do not exist")
    if features is not None:
        wrong_feature_names = set(wanted_feature_names) - set(features.index.get_level_values("name"))
        if wrong_feature_names:
            raise ValueError(f"Features {wrong_feature_names} do not exist")

    # making sure all parameters are json serializable (fail fast before touching the db)
    for job_step in job_steps:
        if "parameters" in job_step and job_step["parameters"] is not None:
            try:
                json.dumps(job_step["parameters"])
            except (TypeError, ValueError) as e:
                raise ValueError(f"Parameters of job step {job_step} are not JSON serializable") from e

    # creating a DataFrame with the job steps
    df = pl.from_dicts(
        job_steps,
        schema={
            "job_instance_id": pl.Int64,
            "execution_order": pl.Int32,
            "object_name": pl.String,
            "alarm_id": pl.Int32,
            "feature_name": pl.String,
            "parameters": pl.Object,
        },
    )

    # merging objects: resolve object_name -> object_id (+ object_model_id for the feature join)
    if objects is not None:
        objects_df: pl.DataFrame = pl.from_pandas(
            objects.reset_index()[["name", "id", "object_model_id"]].astype({"name": "string", "id": "int", "object_model_id": "int"}),
        ).rename(
            {"id": "object_id", "name": "object_name"},
        )
        df = df.join(objects_df, on="object_name", how="left")
    # merging features: resolve (feature_name, object_model_id) -> feature_id
    if features is not None:
        features_df = pl.from_pandas(
            features.reset_index()[["name", "id", "object_model_id"]].astype(
                {"name": "string", "id": "int", "object_model_id": "int"},
            ),
        ).rename(
            {"name": "feature_name", "id": "feature_id"},
        )
        df = df.join(features_df, on=["feature_name", "object_model_id"], how="left")
    # adding missing columns so the insert below always sees the same schema
    for col in ["object_id", "feature_id"]:
        if col not in df.columns:
            df = df.with_columns(
                pl.lit(None).cast(pl.Int32).alias(col),
            )

    # keeping only the columns that will be inserted
    df = df[["job_instance_id", "execution_order", "object_id", "alarm_id", "feature_id", "parameters"]]

    # converting back to pandas and inserting the job steps
    df = df.to_pandas()

    t0 = perf_counter()
    with self._perfdb.conn.reconnect() as conn:
        # server-side timestamp taken just before the insert; used below to find the new rows
        ref_time = conn.execute("SELECT NOW()").fetchone()[0]
        conn.pandas_to_sql(
            df,
            table_name="job_steps",
            schema="performance",
            if_exists="skip_row_check",
            ignore_index=True,
        )
    logger.debug(f"Inserting job steps took {perf_counter() - t0:.2f} seconds")

    # getting the ids of the inserted job steps
    # NOTE(review): ref_time is truncated to whole seconds, so rows created up to ~1s
    # before this insert could also be returned — confirm this over-inclusion is acceptable.
    query = sql.SQL(
        "SELECT id FROM performance.job_steps WHERE job_instance_id IN ({ids}) AND created >= {ref_time} ORDER BY execution_order",
    ).format(
        ids=sql.SQL(", ").join(map(sql.Literal, wanted_job_instance_ids)),
        ref_time=sql.Literal(f"{ref_time:%Y-%m-%d %H:%M:%S}"),
    )
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)

    return df["id"].tolist()

update(job_steps)

Updates job steps in the database.

Parameters:

  • job_steps

    (dict[int, dict[str, Any]]) –

    Dictionary with the job steps to update. The keys are the ids of the job steps and the values are dictionaries with the following keys:

    • job_state_name (str | None): Name of the job state to set. If set to None or not present, the job state will not be updated.
    • comment (str | None): Comment to add to the job step. If set to None or not present, the comment will not be updated.

Returns:

  • dict[int, dict[str, Any]]

    Dictionary with the job steps that were not updated.

Source code in echo_postgres/job_steps.py
@validate_call
def update(
    self,
    job_steps: dict[int, dict[str, Any]],
) -> dict[int, dict[str, Any]]:
    """Updates job steps in the database.

    Parameters
    ----------
    job_steps : dict[int, dict[str, Any]]
        Dictionary with the job steps to update. The keys are the ids of the job steps and the values are dictionaries with the following keys:

        - **job_state_name** (str | None): Name of the job state to set. If set to None or not present, the job state will not be updated.
        - **comment** (str | None): Comment to add to the job step. If set to None or not present, the comment will not be updated.

    Returns
    -------
    dict[int, dict[str, Any]]
        Dictionary with the job steps that were not updated.

    """
    # checking input
    if not job_steps:
        raise ValueError("jobs must not be empty")

    # name -> id mapping used to resolve job_state_name into job_state_id
    job_states = self._perfdb.jobs.states.get_ids()

    failed_jobs = {}
    for step_id, step_data in job_steps.items():
        try:
            # collect the SET assignments for the fields actually provided
            assignments = []

            state_name = step_data.get("job_state_name")
            if state_name is not None:
                if state_name not in job_states:
                    raise ValueError(f"Job state {state_name} does not exist")
                assignments.append(
                    sql.SQL("job_state_id = {job_state_id}").format(job_state_id=sql.Literal(job_states[state_name])),
                )

            comment = step_data.get("comment")
            if comment is not None:
                assignments.append(sql.SQL("comment = {comment}").format(comment=sql.Literal(comment)))

            if not assignments:
                logger.warning(f"Job step {step_id} was not updated because no valid fields were provided")
                failed_jobs[step_id] = step_data
                continue

            query = sql.Composed(
                [
                    sql.SQL("UPDATE performance.job_steps SET "),
                    sql.SQL(", ").join(assignments),
                    sql.SQL(" WHERE id = {job_id}").format(job_id=sql.Literal(step_id)),
                ],
            )

            with self._perfdb.conn.reconnect() as conn:
                conn.execute(query)

        except Exception:
            # keep going on failure; the caller gets the failed entries back
            failed_jobs[step_id] = step_data
            logger.exception(f"Error updating job step {step_id}")

    return failed_jobs