Object Group Instances¶
ObjectGroupInstances(perfdb)
¶
Class used for handling object group instances. Can be accessed via perfdb.objects.groups.instances.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initializer of the base class that all subclasses should inherit from.

    Keeps a reference to the top level PerfDB object in ``self._perfdb``.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb
add_objects(object_group_type_name, object_group_name, object_names)
¶
Adds objects to an object group.
Parameters:
- object_group_type_name (str) – Name of the object group type.
- object_group_name (str) – Name of the object group.
- object_names (list[str]) – List of object names to be added to the object group.
Source code in echo_postgres/object_group_instances.py
@validate_call
def add_objects(self, object_group_type_name: str, object_group_name: str, object_names: list[str]) -> None:
    """Adds objects to an object group.

    Repeated names in ``object_names`` are inserted only once, and an empty list is a no-op.
    Objects that already belong to the group are left untouched (``ON CONFLICT DO NOTHING``).

    Parameters
    ----------
    object_group_type_name : str
        Name of the object group type.
    object_group_name : str
        Name of the object group.
    object_names : list[str]
        List of object names to be added to the object group.

    Raises
    ------
    ValueError
        If the object group type, the object group or any of the requested objects does not exist.

    """
    # checking if the object group exists
    existing_groups = self.get_ids()
    if object_group_type_name not in existing_groups:
        raise ValueError(f"object_group_type_name '{object_group_type_name}' does not exist")
    if object_group_name not in existing_groups[object_group_type_name]:
        raise ValueError(f"object_group_name '{object_group_name}' does not exist")
    obj_group_id = existing_groups[object_group_type_name][object_group_name]

    # dropping repeated names (keeping order) so a duplicate is not mistaken for a missing object
    unique_names = list(dict.fromkeys(object_names))
    if not unique_names:
        # an INSERT with an empty VALUES list is not valid SQL, so there is nothing to do
        logger.debug(f"No objects given to add to object group '{object_group_name}' of type '{object_group_type_name}'")
        return

    # checking if the objects exist
    objs = self._perfdb.objects.instances.get_ids(object_names=unique_names)
    missing_objs = set(unique_names) - set(objs)
    if missing_objs:
        raise ValueError(f"object_names '{missing_objs}' do not exist")
    obj_ids = [objs[obj] for obj in unique_names]

    # constructing query to add to table object_groups_mapping
    query = [
        sql.SQL(
            "INSERT INTO performance.object_groups_mapping (object_group_id, object_id) VALUES ",
        ),
    ]
    query.append(
        sql.SQL(", ").join(
            sql.SQL("({obj_group_id}, {obj_id})").format(obj_group_id=sql.Literal(obj_group_id), obj_id=sql.Literal(obj_id))
            for obj_id in obj_ids
        ),
    )
    query.append(sql.SQL(" ON CONFLICT DO NOTHING"))
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)

    logger.debug(f"Objects {object_names} added to object group '{object_group_name}' of type '{object_group_type_name}'")
get(object_group_types=None, object_group_names=None, filter_type='and', output_type='dict')
¶
Gets all object group instances with detailed information.
The most useful keys/columns returned are:
- object_group_id
- object_group_type_id
- object_ids
- object_names
- spe_ids
- spe_names
- description
Parameters:
- object_group_types (list[str], default: None) – List of object group types to filter the results.
- object_group_names (list[str], default: None) – List of object group names to filter the results.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
- output_type (Literal['dict', 'DataFrame'], default: 'dict') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict".
Returns:
- dict[str, dict[str, dict[str, Any]]] – In case output_type is "dict", returns a dictionary in the format {object_group_type_name: {object_group_name: {attribute: value, ...}, ...}, ...}
- DataFrame – In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_group_type_name, object_group_name], columns = [attribute, ...]
Source code in echo_postgres/object_group_instances.py
@validate_call
def get(
    self,
    object_group_types: list[str] | None = None,
    object_group_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, dict[str, Any]]] | DataFrame:
    """Gets all object group instances with detailed information.

    Rows are read from ``performance.v_object_groups``, optionally filtered by type and/or name.

    The most useful keys/columns returned are:

    - object_group_id
    - object_group_type_id
    - object_ids
    - object_names
    - spe_ids
    - spe_names
    - description

    Parameters
    ----------
    object_group_types : list[str], optional
        List of object group types to filter the results.
    object_group_names : list[str], optional
        List of object group names to filter the results.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    dict[str, dict[str, dict[str, Any]]]
        In case output_type is "dict", returns a dictionary in the format {object_group_type_name: {object_group_name: {attribute: value, ...}, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_group_type_name, object_group_name], columns = [attribute, ...]

    """
    # one condition per filter that was actually provided
    conditions = []
    if object_group_types:
        type_literals = sql.SQL(", ").join(sql.Literal(ot) for ot in object_group_types)
        conditions.append(
            sql.SQL("object_group_type_name IN ({object_group_type_names})").format(object_group_type_names=type_literals),
        )
    if object_group_names:
        name_literals = sql.SQL(", ").join(sql.Literal(og) for og in object_group_names)
        conditions.append(
            sql.SQL("object_group_name IN ({object_group_names})").format(object_group_names=name_literals),
        )

    # assembling the statement: SELECT [WHERE ...] ORDER BY
    parts = [sql.SQL("SELECT * FROM performance.v_object_groups")]
    if conditions:
        parts.append(sql.SQL(" WHERE "))
        parts.append(sql.SQL(f" {filter_type.upper()} ").join(conditions))
    parts.append(sql.SQL(" ORDER BY object_group_type_name, object_group_name"))
    statement = sql.Composed(parts)

    with self._perfdb.conn.reconnect() as conn:
        # post_convert used to keep columns with list type
        df = conn.read_to_pandas(statement, post_convert="pyarrow")
    df = df.set_index(["object_group_type_name", "object_group_name"])

    if output_type != "dict":
        return df

    # going from {(type_name, group_name): attributes} to {type_name: {group_name: attributes}}
    nested = {}
    for (type_name, group_name), attributes in df.to_dict(orient="index").items():
        nested.setdefault(type_name, {})[group_name] = attributes
    return nested
get_ids()
¶
Gets all object group instances and their respective ids.
Returns:
- dict[str, dict[str, int]] – Dictionary with all object groups and their respective ids in the format {object_group_type_name: {object_group_name: object_group_id, ...}, ...}.
Source code in echo_postgres/object_group_instances.py
def get_ids(self) -> dict[str, dict[str, int]]:
    """Gets all object group instances and their respective ids.

    Returns
    -------
    dict[str, dict[str, int]]
        Dictionary with all object groups and their respective ids in the format {object_group_type_name: {object_group_name: object_group_id, ...}, ...}.

    """
    statement = sql.SQL(
        "SELECT object_group_type_name, object_group_name, object_group_id FROM performance.v_object_groups ORDER BY object_group_type_name, object_group_name",
    )
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(statement)

    indexed = df.set_index(["object_group_type_name", "object_group_name"])
    id_by_key = indexed["object_group_id"].to_dict()

    # reshaping {(object_group_type_name, object_group_name): object_group_id, ...}
    # into {object_group_type_name: {object_group_name: object_group_id, ...}, ...}
    nested = {}
    for (type_name, group_name), group_id in id_by_key.items():
        nested.setdefault(type_name, {})[group_name] = group_id
    return nested
remove_objects(object_group_type_name, object_group_name, object_names)
¶
Removes objects from an object group.
Parameters:
- object_group_type_name (str) – Name of the object group type.
- object_group_name (str) – Name of the object group.
- object_names (list[str] | Literal['all']) – List of object names to be removed from the object group. If "all" is passed, all objects will be removed from the object group.
Source code in echo_postgres/object_group_instances.py
@validate_call
def remove_objects(self, object_group_type_name: str, object_group_name: str, object_names: list[str] | Literal["all"]) -> None:
    """Removes objects from an object group.

    When a list is given, repeated names are considered only once and an empty list is a no-op
    (it never removes every object; use "all" for that).

    Parameters
    ----------
    object_group_type_name : str
        Name of the object group type.
    object_group_name : str
        Name of the object group.
    object_names : list[str] | Literal["all"]
        List of object names to be removed from the object group.
        If "all" is passed, all objects will be removed from the object group.

    Raises
    ------
    ValueError
        If the object group type or the object group does not exist, or if a list of names is given
        and any of them does not exist.

    """
    # checking if the object group exists
    existing_groups = self.get_ids()
    if object_group_type_name not in existing_groups:
        raise ValueError(f"object_group_type_name '{object_group_type_name}' does not exist")
    if object_group_name not in existing_groups[object_group_type_name]:
        raise ValueError(f"object_group_name '{object_group_name}' does not exist")
    obj_group_id = existing_groups[object_group_type_name][object_group_name]

    # constructing query to remove from table object_groups_mapping
    query = [
        sql.SQL(
            "DELETE FROM performance.object_groups_mapping WHERE object_group_id = {obj_group_id}",
        ).format(obj_group_id=sql.Literal(obj_group_id)),
    ]

    # names are only resolved when a list was given; "all" is a sentinel, not an object name,
    # so it must not be sent to the existence check nor iterated character by character
    if object_names != "all":
        # dropping repeated names (keeping order) so a duplicate is not mistaken for a missing object
        unique_names = list(dict.fromkeys(object_names))
        if not unique_names:
            # "IN ()" is not valid SQL and an empty list must never widen the delete to the whole group
            logger.debug(f"No objects given to remove from object group '{object_group_name}' of type '{object_group_type_name}'")
            return

        # checking if the objects exist
        objs = self._perfdb.objects.instances.get_ids(object_names=unique_names)
        missing_objs = set(unique_names) - set(objs)
        if missing_objs:
            raise ValueError(f"object_names '{missing_objs}' do not exist")
        obj_ids = [objs[obj] for obj in unique_names]

        query.append(
            sql.SQL(" AND object_id IN ({obj_ids})").format(
                obj_ids=sql.SQL(", ").join(map(sql.Literal, obj_ids)),
            ),
        )

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)

    logger.debug(f"Objects {object_names} removed from object group '{object_group_name}' of type '{object_group_type_name}'")