Skip to content

Job Instances

JobInstances(perfdb)

Class used for handling job instances. Can be accessed via perfdb.jobs.instances.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the handler with a reference to the top-level PerfDB object.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # keep a private reference so methods can reach the shared connection handler
    self._perfdb: e_pg.PerfDB = perfdb

delete(job_ids)

Deletes job instances from the database.

Parameters:

  • job_ids

    (list[int]) –

    List of ids of the job instances to delete.

Source code in echo_postgres/job_instances.py
@validate_call
def delete(
    self,
    job_ids: Annotated[list[int], Field(min_length=1)],
) -> None:
    """Deletes job instances from the database.

    The job steps referencing the instances are removed first, then the
    instances themselves, so the parent/child relationship is respected.

    Parameters
    ----------
    job_ids : list[int]
        List of ids of the job instances to delete.
    """
    # the same literal id list is used by both statements
    ids = sql.SQL(", ").join(map(sql.Literal, job_ids))

    # first we need to delete the job steps associated with the job instances
    steps_query = sql.SQL("DELETE FROM performance.job_steps WHERE job_instance_id IN ({ids})").format(ids=ids)
    # then the job instances themselves
    instances_query = sql.SQL("DELETE FROM performance.job_instances WHERE id IN ({ids})").format(ids=ids)

    # run both statements on the same connection so the child rows (job_steps)
    # and parent rows (job_instances) are removed together; previously each
    # DELETE opened its own connection, so a failure in the second statement
    # could leave instances behind with their steps already gone
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(steps_query)
        conn.execute(instances_query)

get(job_ids=None, job_type_names=None, job_state_names=None, created_period=None, n_jobs=None, filter_type='and', output_type='dict')

Gets all job instances definitions with detailed information.

The most useful keys/columns returned are:

  • id (index)
  • job_type_id
  • job_type_name
  • job_state_id
  • job_state_name
  • priority
  • description
  • created_by_used_id
  • created_by_used_name
  • period_start
  • period_end
  • parameters
  • created
  • started
  • finished
  • duration
  • last_state_change
  • comment

Parameters:

  • job_ids

    (list[int] | None, default: None ) –

    List of job ids to filter the results. By default None

  • job_type_names

    (list[str] | None, default: None ) –

    List of job type names to filter the results. By default None

  • job_state_names

    (list[str] | None, default: None ) –

    List of job state names to filter the results. By default None

  • created_period

    (DateTimeRange | None, default: None ) –

    Period of creation of the job instances. By default None

  • n_jobs

    (int | None, default: None ) –

    Number of job instances to return. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]

Source code in echo_postgres/job_instances.py
@validate_call
def get(
    self,
    job_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    n_jobs: int | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[int, dict[str, Any]] | DataFrame:
    """Gets all job instances definitions with detailed information.

    The most useful keys/columns returned are:

    - id (index)
    - job_type_id
    - job_type_name
    - job_state_id
    - job_state_name
    - priority
    - description
    - created_by_used_id
    - created_by_used_name
    - period_start
    - period_end
    - parameters
    - created
    - started
    - finished
    - duration
    - last_state_change
    - comment

    Parameters
    ----------
    job_ids : list[int] | None, optional
        List of job ids to filter the results. By default None
    job_type_names : list[str] | None, optional
        List of job type names to filter the results. By default None
    job_state_names : list[str] | None, optional
        List of job state names to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Period of creation of the job instances. By default None
    n_jobs : int | None, optional
        Number of job instances to return. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "dict"

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
    """
    # checking inputs (also builds the WHERE/ORDER/LIMIT clause)
    where = self._check_get_args(
        job_ids=job_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        n_jobs=n_jobs,
        filter_type=filter_type,
    )

    query = [
        sql.SQL("SELECT * FROM performance.v_job_instances "),
        where,
    ]
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, dtype_backend=None, post_convert="pyarrow")
    df = df.set_index("id")

    return df.to_dict(orient="index") if output_type == "dict" else df

get_ids(job_ids=None, job_type_names=None, job_state_names=None, created_period=None, n_jobs=None, filter_type='and')

Gets all job instances and their respective ids.

Parameters:

  • job_ids

    (list[int] | None, default: None ) –

    List of job ids to filter the results. By default None

  • job_type_names

    (list[str] | None, default: None ) –

    List of job type names to filter the results. By default None

  • job_state_names

    (list[str] | None, default: None ) –

    List of job state names to filter the results. By default None

  • created_period

    (DateTimeRange | None, default: None ) –

    Period of creation of the job instances. By default None

  • n_jobs

    (int | None, default: None ) –

    Number of job instances to return. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"].

Returns:

  • list[int]

    List of job ids.

Source code in echo_postgres/job_instances.py
@validate_call
def get_ids(
    self,
    job_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    n_jobs: int | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets all job instances and their respective ids.

    Parameters
    ----------
    job_ids : list[int] | None, optional
        List of job ids to filter the results. By default None
    job_type_names : list[str] | None, optional
        List of job type names to filter the results. By default None
    job_state_names : list[str] | None, optional
        List of job state names to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Period of creation of the job instances. By default None
    n_jobs : int | None, optional
        Number of job instances to return. By default None
    filter_type : Literal["and", "or"]
        How to treat multiple filters. Can be one of ["and", "or"].

    Returns
    -------
    list[int]
        List of job ids.
    """
    # validate the filters and build the corresponding WHERE clause
    where_clause = self._check_get_args(
        job_ids=job_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        n_jobs=n_jobs,
        filter_type=filter_type,
    )

    # only the id column is needed here
    statement = sql.Composed(
        [
            sql.SQL("SELECT id FROM performance.v_job_instances "),
            where_clause,
        ],
    )

    with self._perfdb.conn.reconnect() as conn:
        ids_df = conn.read_to_pandas(statement)

    return ids_df["id"].tolist()

insert(jobs)

Inserts job instances into the database.

Parameters:

  • jobs

    (list[dict[str, Any]]) –

    List of dictionaries with the following keys:

    • job_type_name (str): Name of the job type.
    • priority (int | None): Priority of the job instance (1 to 100, 1 being the highest priority). If set to None, the default priority will be used (99).
    • description (str): Description of the job.
    • created_by_user_name (str): Name of the user that created the job.
    • parameters (dict): Parameters of the job instance. Must match the parameters schema of the job type.

Returns:

  • list[int]

    List of ids of the inserted job instances.

  • list[dict[str, Any]]

    List of dictionaries with the job instances that were not inserted.

Source code in echo_postgres/job_instances.py
@validate_call
def insert(
    self,
    jobs: list[dict[str, Any]],
) -> tuple[list[int], list[dict[str, Any]]]:
    """Inserts job instances into the database.

    Parameters
    ----------
    jobs : list[dict[str, Any]]
        List of dictionaries with the following keys:

        - **job_type_name** (str): Name of the job type.
        - **priority** (int | None): Priority of the job instance (1 to 100, 1 being the highest priority). If set to None, the default priority will be used (99).
        - **description** (str): Description of the job.
        - **created_by_user_name** (str): Name of the user that created the job.
        - **parameters** (dict): Parameters of the job instance. Must match the parameters schema of the job type.

    Returns
    -------
    list[int]
        List of ids of the inserted job instances.
    list[dict[str, Any]]
        List of dictionaries with the job instances that were not inserted,
        in their original (unmodified) form.
    """
    # checking input

    # getting the ids of the job types
    job_types = self._perfdb.jobs.types.get_ids()
    # getting user ids
    users = self._perfdb.users.instances.get_ids()

    # schema that will be used by jsonschema to validate the input
    dict_schema = {
        "type": "object",
        "properties": {
            "job_type_name": {"type": ["string"], "enum": list(job_types.keys())},
            "priority": {"type": ["integer", "null"], "minimum": 1, "maximum": 100},
            "description": {"type": ["string"]},
            "created_by_user_name": {"type": ["string"], "enum": list(users.keys())},
            "parameters": {"type": ["object"]},
        },
        "required": [
            "job_type_name",
            "priority",
            "description",
            "created_by_user_name",
            "parameters",
        ],
        "additionalProperties": False,
    }

    # validating the input
    for job in jobs:
        try:
            validate(instance=job, schema=dict_schema)
        except Exception as e:
            raise ValueError(f"Invalid job instance {job}") from e

    # iterating jobs
    job_ids = []
    failed_jobs = []
    for job in jobs:
        try:
            # work on a shallow copy so the caller's dictionaries are not
            # mutated; previously the originals were modified in place, so
            # failed_jobs contained partially transformed dicts and the
            # caller's input was corrupted
            row = dict(job)
            # translating the job type name to its id
            row["job_type_id"] = job_types[row.pop("job_type_name")]
            # translating the user name to its id
            row["created_by_user_id"] = users[row.pop("created_by_user_name")]
            # a None priority means "use the table default"
            if row["priority"] is None:
                row.pop("priority")

            # creating the query to insert the job instance
            query = sql.SQL(
                """
                INSERT INTO performance.job_instances (job_type_id, job_state_id, priority, description, created_by_user_id, parameters)
                VALUES (
                    {job_type_id},
                    DEFAULT,
                    {priority},
                    {description},
                    {created_by_user_id},
                    {parameters}
                )
                RETURNING id
                """,
            ).format(
                job_type_id=sql.Literal(row["job_type_id"]),
                priority=sql.SQL("DEFAULT") if "priority" not in row else sql.Literal(row["priority"]),
                description=sql.Literal(row["description"]),
                created_by_user_id=sql.Literal(row["created_by_user_id"]),
                parameters=sql.Literal(json.dumps(row["parameters"])),
            )

            # inserting the job instance and capturing the generated id
            with self._perfdb.conn.reconnect() as conn:
                cur = conn.execute(query)
                job_id = cur.fetchone()[0]
            job_ids.append(job_id)
        except Exception:
            failed_jobs.append(job)
            logger.exception(f"Error inserting job instance {job}")

    return job_ids, failed_jobs

update(jobs)

Updates job instances in the database.

Parameters:

  • jobs

    (dict[int, dict[str, Any]]) –

    Dictionary with the job instances to update. The keys are the ids of the job instances and the values are dictionaries with the following keys

    • job_state_name (str | None): Name of the job state to set. If set to None or not present, the job state will not be updated.
    • comment (str | None): Comment to add to the job instance. If set to None or not present, the comment will not be updated.

Returns:

  • dict[int, dict[str, Any]]

    Dictionary with the job instances that were not updated.

Source code in echo_postgres/job_instances.py
@validate_call
def update(
    self,
    jobs: dict[int, dict[str, Any]],
) -> dict[int, dict[str, Any]]:
    """Updates job instances in the database.

    Parameters
    ----------
    jobs : dict[int, dict[str, Any]]
        Dictionary with the job instances to update. The keys are the ids of the job instances and the values are dictionaries with the following keys

        - **job_state_name** (str | None): Name of the job state to set. If set to None or not present, the job state will not be updated.
        - **comment** (str | None): Comment to add to the job instance. If set to None or not present, the comment will not be updated.

    Returns
    -------
    dict[int, dict[str, Any]]
        Dictionary with the job instances that were not updated.
    """
    # checking input
    if not jobs:
        raise ValueError("jobs must not be empty")

    # mapping of state name -> state id, used to translate the input
    job_states = self._perfdb.jobs.states.get_ids()

    failed_jobs = {}
    for job_id, job_data in jobs.items():
        try:
            # collect the SET assignments requested for this instance
            assignments = []

            state_name = job_data.get("job_state_name")
            if state_name is not None:
                if state_name not in job_states:
                    raise ValueError(f"Job state {job_data['job_state_name']} does not exist")
                assignments.append(
                    sql.SQL("job_state_id = {job_state_id}").format(job_state_id=sql.Literal(job_states[state_name])),
                )

            new_comment = job_data.get("comment")
            if new_comment is not None:
                assignments.append(sql.SQL("comment = {comment}").format(comment=sql.Literal(new_comment)))

            # nothing to change -> record the instance as not updated
            if not assignments:
                logger.warning(f"Job instance {job_id} was not updated because no valid fields were provided")
                failed_jobs[job_id] = job_data
                continue

            statement = sql.Composed(
                [
                    sql.SQL("UPDATE performance.job_instances SET "),
                    sql.SQL(", ").join(assignments),
                    sql.SQL(" WHERE id = {job_id}").format(job_id=sql.Literal(job_id)),
                ],
            )

            with self._perfdb.conn.reconnect() as conn:
                conn.execute(statement)

        except Exception:
            failed_jobs[job_id] = job_data
            logger.exception(f"Error updating job instance {job_id}")

    return failed_jobs