Job Instances
JobInstances(perfdb)
Class used for handling job instances. Can be accessed via perfdb.jobs.instances.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Keep a reference to the top-level PerfDB object.

    This is the base initializer shared by every subclass.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # all database access in subclasses goes through this stored handle
    self._perfdb: e_pg.PerfDB = perfdb
delete(job_ids)
¶
Deletes job instances from the database.
Parameters:
-
(job_ids¶list[int]) –List of ids of the job instances to delete.
Source code in echo_postgres/job_instances.py
@validate_call
def delete(
    self,
    job_ids: Annotated[list[int], Field(min_length=1)],
) -> None:
    """Deletes job instances (and their associated job steps) from the database.

    The rows of ``performance.job_steps`` whose ``job_instance_id`` is in
    ``job_ids`` are deleted first, then the matching rows of
    ``performance.job_instances``. Both statements are executed within a single
    connection context instead of two separate ones.

    Parameters
    ----------
    job_ids : list[int]
        List of ids of the job instances to delete. Must contain at least one id.
    """
    # the same id list is used by both statements, so compose it only once
    ids = sql.SQL(", ").join(map(sql.Literal, job_ids))
    # job steps are associated with the job instances, so they are removed first
    delete_steps = sql.SQL("DELETE FROM performance.job_steps WHERE job_instance_id IN ({ids})").format(ids=ids)
    delete_instances = sql.SQL("DELETE FROM performance.job_instances WHERE id IN ({ids})").format(ids=ids)
    # NOTE(review): whether this context is a single transaction depends on
    # reconnect()'s semantics — confirm if atomic deletion is required
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(delete_steps)
        conn.execute(delete_instances)
get(job_ids=None, job_type_names=None, job_state_names=None, created_period=None, n_jobs=None, filter_type='and', output_type='dict')
¶
Gets all job instances definitions with detailed information.
The most useful keys/columns returned are:
- id (index)
- job_type_id
- job_type_name
- job_state_id
- job_state_name
- priority
- description
- created_by_used_id
- created_by_used_name
- period_start
- period_end
- parameters
- created
- started
- finished
- duration
- last_state_change
- comment
Parameters:
-
(job_ids¶list[int] | None, default:None) –List of job ids to filter the results. By default None
-
(job_type_names¶list[str] | None, default:None) –List of job type names to filter the results. By default None
-
(job_state_names¶list[str] | None, default:None) –List of job state names to filter the results. By default None
-
(created_period¶DateTimeRange | None, default:None) –Period of creation of the job instances. By default None
-
(n_jobs¶int | None, default:None) –Number of job instances to return. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"
Returns:
-
dict[int, dict[str, Any]]–In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
Source code in echo_postgres/job_instances.py
@validate_call
def get(
    self,
    job_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    n_jobs: int | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[int, dict[str, Any]] | DataFrame:
    """Gets all job instances definitions with detailed information.

    The most useful keys/columns returned are:

    - id (index)
    - job_type_id
    - job_type_name
    - job_state_id
    - job_state_name
    - priority
    - description
    - created_by_used_id
    - created_by_used_name
    - period_start
    - period_end
    - parameters
    - created
    - started
    - finished
    - duration
    - last_state_change
    - comment

    Parameters
    ----------
    job_ids : list[int] | None, optional
        List of job ids to filter the results. By default None
    job_type_names : list[str] | None, optional
        List of job type names to filter the results. By default None
    job_state_names : list[str] | None, optional
        List of job state names to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Period of creation of the job instances. By default None
    n_jobs : int | None, optional
        Number of job instances to return. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "dict"

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
    """
    # check the filters and build the WHERE clause from them
    where = self._check_get_args(
        job_ids=job_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        n_jobs=n_jobs,
        filter_type=filter_type,
    )

    query = sql.Composed(
        [
            sql.SQL("SELECT * FROM performance.v_job_instances "),
            where,
        ],
    )

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, dtype_backend=None, post_convert="pyarrow")

    df = df.set_index("id")

    return df.to_dict(orient="index") if output_type == "dict" else df
get_ids(job_ids=None, job_type_names=None, job_state_names=None, created_period=None, n_jobs=None, filter_type='and')
¶
Gets all job instances and their respective ids.
Parameters:
-
(job_ids¶list[int] | None, default:None) –List of job ids to filter the results. By default None
-
(job_type_names¶list[str] | None, default:None) –List of job type names to filter the results. By default None
-
(job_state_names¶list[str] | None, default:None) –List of job state names to filter the results. By default None
-
(created_period¶DateTimeRange | None, default:None) –Period of creation of the job instances. By default None
-
(n_jobs¶int | None, default:None) –Number of job instances to return. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"].
Returns:
-
list[int]–List of job ids.
Source code in echo_postgres/job_instances.py
@validate_call
def get_ids(
    self,
    job_ids: list[int] | None = None,
    job_type_names: list[str] | None = None,
    job_state_names: list[str] | None = None,
    created_period: DateTimeRange | None = None,
    n_jobs: int | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Return the ids of the job instances that match the given filters.

    Parameters
    ----------
    job_ids : list[int] | None, optional
        Job ids used to filter the results. By default None
    job_type_names : list[str] | None, optional
        Job type names used to filter the results. By default None
    job_state_names : list[str] | None, optional
        Job state names used to filter the results. By default None
    created_period : DateTimeRange | None, optional
        Creation period of the job instances. By default None
    n_jobs : int | None, optional
        Maximum number of job instances to return. By default None
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined, either "and" or "or". By default "and"

    Returns
    -------
    list[int]
        The matching job ids.
    """
    # the shared argument checker turns the filters into a WHERE clause
    where_clause = self._check_get_args(
        job_ids=job_ids,
        job_type_names=job_type_names,
        job_state_names=job_state_names,
        created_period=created_period,
        n_jobs=n_jobs,
        filter_type=filter_type,
    )
    select_ids = sql.Composed([sql.SQL("SELECT id FROM performance.v_job_instances "), where_clause])

    with self._perfdb.conn.reconnect() as conn:
        frame = conn.read_to_pandas(select_ids)

    id_column = frame["id"]
    return id_column.tolist()
insert(jobs)
¶
Inserts job instances into the database.
Parameters:
-
(jobs¶list[dict[str, Any]]) –List of dictionaries with the following keys:
- job_type_name (str): Name of the job type.
- priority (int | None): Priority of the job instance (1 to 100, 1 being the highest priority). If set to None, the default priority will be used (99).
- description (str): Description of the job.
- created_by_user_name (str): Name of the user that created the job.
- parameters (dict): Parameters of the job instance. Must match the parameters schema of the job type.
Returns:
-
list[int]–List of ids of the inserted job instances.
-
list[dict[str, Any]]–List of dictionaries with the job instances that were not inserted.
Source code in echo_postgres/job_instances.py
@validate_call
def insert(
    self,
    jobs: list[dict[str, Any]],
) -> tuple[list[int], list[dict[str, Any]]]:
    """Inserts job instances into the database.

    All jobs are validated before anything is inserted. If any job is invalid,
    a ``ValueError`` is raised and no job is inserted. Valid jobs are then
    inserted one at a time. A job whose insertion fails is logged and returned
    in the second element of the result; the remaining jobs are still inserted.

    The dictionaries in ``jobs`` are never modified. Failed jobs are returned
    exactly as they were passed in, so they can be resubmitted to this method.

    Parameters
    ----------
    jobs : list[dict[str, Any]]
        List of dictionaries with the following keys (all required, no other keys allowed):

        - **job_type_name** (str): Name of the job type.
        - **priority** (int | None): Priority of the job instance (1 to 100, 1 being the highest priority). If set to None, the default priority will be used (99).
        - **description** (str): Description of the job.
        - **created_by_user_name** (str): Name of the user that created the job.
        - **parameters** (dict): Parameters of the job instance. Must match the parameters schema of the job type.

    Returns
    -------
    list[int]
        List of ids of the inserted job instances.
    list[dict[str, Any]]
        List of dictionaries with the job instances that were not inserted.

    Raises
    ------
    ValueError
        If any job does not match the expected schema: unknown job type or user
        name, priority outside 1-100, wrong value types, or missing/extra keys.
    """
    # name -> id lookups, used both to validate the input and to resolve ids
    job_types = self._perfdb.jobs.types.get_ids()
    users = self._perfdb.users.instances.get_ids()

    # schema that will be used by jsonschema to validate the input
    dict_schema = {
        "type": "object",
        "properties": {
            "job_type_name": {"type": ["string"], "enum": list(job_types.keys())},
            "priority": {"type": ["integer", "null"], "minimum": 1, "maximum": 100},
            "description": {"type": ["string"]},
            "created_by_user_name": {"type": ["string"], "enum": list(users.keys())},
            "parameters": {"type": ["object"]},
        },
        "required": [
            "job_type_name",
            "priority",
            "description",
            "created_by_user_name",
            "parameters",
        ],
        "additionalProperties": False,
    }

    # validate everything up front so an invalid job aborts before any insert
    for job in jobs:
        try:
            validate(instance=job, schema=dict_schema)
        except Exception as e:
            raise ValueError(f"Invalid job instance {job}") from e

    job_ids = []
    failed_jobs = []
    for job in jobs:
        # best-effort: one failing job must not prevent the others from being inserted
        try:
            # ids and priority are resolved into the query only; the caller's
            # dict is left intact so a failed job can be resubmitted as-is
            priority = job["priority"]
            query = sql.SQL(
                """
                INSERT INTO performance.job_instances (job_type_id, job_state_id, priority, description, created_by_user_id, parameters)
                VALUES (
                    {job_type_id},
                    DEFAULT,
                    {priority},
                    {description},
                    {created_by_user_id},
                    {parameters}
                )
                RETURNING id
                """,
            ).format(
                job_type_id=sql.Literal(job_types[job["job_type_name"]]),
                # a None priority falls back to the column default
                priority=sql.SQL("DEFAULT") if priority is None else sql.Literal(priority),
                description=sql.Literal(job["description"]),
                created_by_user_id=sql.Literal(users[job["created_by_user_name"]]),
                parameters=sql.Literal(json.dumps(job["parameters"])),
            )
            with self._perfdb.conn.reconnect() as conn:
                cur = conn.execute(query)
                job_id = cur.fetchone()[0]
            job_ids.append(job_id)
        except Exception:
            failed_jobs.append(job)
            logger.exception(f"Error inserting job instance {job}")

    return job_ids, failed_jobs
update(jobs)
¶
Updates job instances in the database.
Parameters:
-
(jobs¶dict[int, dict[str, Any]]) –Dictionary with the job instances to update. The keys are the ids of the job instances and the values are dictionaries with the following keys:
- job_state_name (str | None): Name of the job state to set. If set to None or not present, the job state will not be updated.
- comment (str | None): Comment to add to the job instance. If set to None or not present, the comment will not be updated.
Returns:
-
dict[int, dict[str, Any]]–Dictionary with the job instances that were not updated.
Source code in echo_postgres/job_instances.py
@validate_call
def update(
    self,
    jobs: dict[int, dict[str, Any]],
) -> dict[int, dict[str, Any]]:
    """Update the state and/or comment of existing job instances.

    Each job is updated independently. A job whose update fails, or that has
    nothing to update, is logged and reported back instead of stopping the
    remaining updates.

    Parameters
    ----------
    jobs : dict[int, dict[str, Any]]
        Mapping of job instance id to a dictionary that may contain:

        - **job_state_name** (str | None): Name of the job state to set. If None or absent, the job state is left unchanged.
        - **comment** (str | None): Comment to set on the job instance. If None or absent, the comment is left unchanged.

    Returns
    -------
    dict[int, dict[str, Any]]
        The entries of ``jobs`` that were not updated.

    Raises
    ------
    ValueError
        If ``jobs`` is empty.
    """
    if not jobs:
        raise ValueError("jobs must not be empty")

    # job state name -> job state id
    job_states = self._perfdb.jobs.states.get_ids()

    failed_jobs = {}
    for job_id, job_data in jobs.items():
        try:
            assignments = []

            state_name = job_data.get("job_state_name")
            if state_name is not None:
                # an unknown state name fails only this job (caught below)
                if state_name not in job_states:
                    raise ValueError(f"Job state {job_data['job_state_name']} does not exist")
                assignments.append(
                    sql.SQL("job_state_id = {job_state_id}").format(job_state_id=sql.Literal(job_states[state_name])),
                )

            comment = job_data.get("comment")
            if comment is not None:
                assignments.append(sql.SQL("comment = {comment}").format(comment=sql.Literal(comment)))

            if not assignments:
                logger.warning(f"Job instance {job_id} was not updated because no valid fields were provided")
                failed_jobs[job_id] = job_data
                continue

            statement = sql.Composed(
                [
                    sql.SQL("UPDATE performance.job_instances SET "),
                    sql.SQL(", ").join(assignments),
                    sql.SQL(" WHERE id = {job_id}").format(job_id=sql.Literal(job_id)),
                ],
            )
            with self._perfdb.conn.reconnect() as conn:
                conn.execute(statement)
        except Exception:
            failed_jobs[job_id] = job_data
            logger.exception(f"Error updating job instance {job_id}")

    return failed_jobs