Data Source Instances¶
DataSourceInstances(perfdb)
¶
Class used for handling data source instances. Can be accessed via perfdb.datasources.instances.
Parameters:
Source code in echo_postgres/datasource_instances.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling data source instances. Can be accessed via `perfdb.datasources.instances`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # NOTE(review): local import — presumably deferred to avoid a circular
    # dependency between this module and the attributes module; confirm.
    from .datasource_instance_attributes import DataSourceInstanceAttributes

    super().__init__(perfdb)

    # * subclasses
    self.attributes = DataSourceInstanceAttributes(perfdb)
add_objects(data_source_name, object_names)
¶
Adds objects to a data source instance.
Parameters:
-
(data_source_name¶str) –Name of the data source instance.
-
(object_names¶list[str]) –List of object names to be added to the data source instance.
Source code in echo_postgres/datasource_instances.py
@validate_call
def add_objects(self, data_source_name: str, object_names: list[str]) -> None:
    """Adds objects to a data source instance.

    Parameters
    ----------
    data_source_name : str
        Name of the data source instance.
    object_names : list[str]
        List of object names to be added to the data source instance.

    Raises
    ------
    ValueError
        If the data source instance does not exist, or if any of the given
        object names does not exist.
    """
    # checking if data source exists
    existing_data_sources = self.get_ids()
    if data_source_name not in existing_data_sources:
        raise ValueError(f"data_source_name {data_source_name} does not exist")
    # checking if objects exists
    objs = self._perfdb.objects.instances.get_ids(object_names=object_names)
    if len(objs) != len(object_names):
        missing_objs = set(object_names) - set(objs)
        raise ValueError(f"object_names '{missing_objs}' do not exist")
    data_source_id = existing_data_sources[data_source_name]
    object_ids = [objs[name] for name in object_names]
    # constructing query
    query = [
        sql.SQL(
            "INSERT INTO performance.data_source_object_connections (data_source_id, object_id) VALUES ",
        ),
    ]
    # one (data_source_id, object_id) tuple per object, joined with commas
    query.append(
        sql.SQL(", ").join(
            sql.SQL("({data_source_id}, {obj_id})").format(data_source_id=sql.Literal(data_source_id), obj_id=sql.Literal(obj_id))
            for obj_id in object_ids
        ),
    )
    # ON CONFLICT DO NOTHING makes the insert idempotent for already-linked objects
    query.append(sql.SQL(" ON CONFLICT DO NOTHING"))
    query = sql.Composed(query)
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
    logger.debug(f"Added objects {object_names} to data source {data_source_name}")
get(data_source_names=None, data_source_types_names=None, filter_type='and', get_attributes=False, attribute_names=None, output_type='dict')
¶
Gets all data source instances definitions with detailed information.
The most useful keys/columns returned are:
- id
- display_name
- description
- data_source_type_id
- data_source_type_name
- object_ids
- object_names
Parameters:
-
(data_source_names¶list[str], default:None) –List of data source names to be retrieved. If None, retrieves all data sources. By default None
-
(data_source_types_names¶list[str], default:None) –List of data source type names to be retrieved. If None, retrieves all data source types. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(get_attributes¶bool, default:False) –If True, will also get the attributes of the data sources. It's highly recommended to specify the attribute names to get to speed up the query.
By default False
-
(attribute_names¶list[str] | None, default:None) –List of attribute names to get if get_attributes is True. In case get_attributes is False, this parameter is ignored. If None, all attributes are returned.
By default None
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"
Returns:
-
dict[str, dict[str, str | int]]–In case output_type is "dict", returns a dictionary in the format {name: {attribute: value, ...}, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = name, columns = [attribute, ...]
Source code in echo_postgres/datasource_instances.py
@validate_call
def get(
    self,
    data_source_names: list[str] | None = None,
    data_source_types_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    get_attributes: bool = False,
    attribute_names: list[str] | None = None,
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, str | int]] | DataFrame:
    """Gets all data source instances definitions with detailed information.

    The most useful keys/columns returned are:

    - id
    - display_name
    - description
    - data_source_type_id
    - data_source_type_name
    - object_ids
    - object_names

    Parameters
    ----------
    data_source_names : list[str], optional
        List of data source names to be retrieved. If None, retrieves all data sources.
        By default None
    data_source_types_names : list[str], optional
        List of data source type names to be retrieved. If None, retrieves all data source types.
        By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    get_attributes : bool, optional
        If True, will also get the attributes of the data sources. It's highly recommended to specify the attribute names to get
        to speed up the query.
        By default False
    attribute_names : list[str] | None, optional
        List of attribute names to get if get_attributes is True. In case get_attributes is False, this parameter is ignored. If None, all attributes are returned.
        By default None
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    dict[str, dict[str, str | int]]
        In case output_type is "dict", returns a dictionary in the format {name: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = name, columns = [attribute, ...]
    """
    # building query
    query = [sql.SQL("SELECT * FROM performance.v_data_sources")]
    where = []
    if data_source_names:
        where.append(
            sql.SQL("name IN ({data_source_names})").format(
                data_source_names=sql.SQL(", ").join(sql.Literal(name) for name in data_source_names),
            ),
        )
    if data_source_types_names:
        where.append(
            sql.SQL("data_source_type_name IN ({data_source_types_names})").format(
                data_source_types_names=sql.SQL(", ").join(sql.Literal(name) for name in data_source_types_names),
            ),
        )
    if where:
        query.append(sql.SQL(" WHERE "))
        # safe to interpolate: validate_call restricts filter_type to "and"/"or"
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
    # adding order by
    query.append(sql.SQL(" ORDER BY name"))
    query = sql.Composed(query)
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    df = df.set_index("name")
    # getting attributes
    if get_attributes:
        # names of the data sources actually returned by the filtered query
        got_data_sources = df.index.tolist()
        attrs: DataFrame = self._perfdb.datasources.instances.attributes.get(
            data_sources=got_data_sources,
            attribute_names=attribute_names,
            output_type="DataFrame",
            values_only=True,
        )
        # pivot the attributes so each attribute becomes a column
        attrs = attrs.reset_index(drop=False).pivot(index="data_source_name", columns="attribute_name", values="attribute_value")
        # merging the attributes with the data sources (left join keeps sources without attributes)
        df = df.merge(attrs, left_index=True, right_index=True, how="left")
    return df.to_dict(orient="index") if output_type == "dict" else df
get_ids()
¶
Gets all data source instances and their respective ids.
Returns:
-
dict[str, int]–Dictionary with all data source instances and their respective ids in the format {datasource_instance: id, ...}.
Source code in echo_postgres/datasource_instances.py
def get_ids(self) -> dict[str, int]:
    """Gets all data source instances and their respective ids.

    Returns
    -------
    dict[str, int]
        Dictionary with all data source instances and their respective ids in the format {datasource_instance: id, ...}.
    """
    query = sql.SQL("SELECT name, id FROM performance.v_data_sources ORDER BY name")
    with self._perfdb.conn.reconnect() as conn:
        result = conn.read_to_pandas(query)
    # map each data source name to its id via the "id" column, keyed by name
    id_by_name = result.set_index("name")["id"]
    return id_by_name.to_dict()
remove_objects(data_source_name, object_names)
¶
Removes objects from a data source instance.
Parameters:
-
(data_source_name¶str) –Name of the data source instance.
-
(object_names¶list[str] or Literal['all']) –List of object names to be removed from the data source instance. If "all", removes all objects from the data source.
Source code in echo_postgres/datasource_instances.py
@validate_call
def remove_objects(self, data_source_name: str, object_names: list[str] | Literal["all"]) -> None:
    """Removes objects from a data source instance.

    Parameters
    ----------
    data_source_name : str
        Name of the data source instance.
    object_names : list[str] or Literal["all"]
        List of object names to be removed from the data source instance. If "all", removes all objects from the data source.

    Raises
    ------
    ValueError
        If the data source instance does not exist, or if any of the given
        object names does not exist.
    """
    # checking if data source exists
    existing_data_sources = self.get_ids()
    if data_source_name not in existing_data_sources:
        raise ValueError(f"data_source_name {data_source_name} does not exist")
    # constructing query
    query = [
        sql.SQL(
            "DELETE FROM performance.data_source_object_connections WHERE data_source_id = {data_source_id}",
        ).format(
            data_source_id=sql.Literal(existing_data_sources[data_source_name]),
        ),
    ]
    if object_names != "all":
        # narrow the DELETE to the requested objects only; validate they all exist first
        objs = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(objs) != len(object_names):
            missing_objs = set(object_names) - set(objs)
            raise ValueError(f"object_names '{missing_objs}' do not exist")
        object_ids = [objs[name] for name in object_names]
        query.append(
            sql.SQL(" AND object_id IN ({object_ids})").format(
                object_ids=sql.SQL(", ").join(sql.Literal(obj_id) for obj_id in object_ids),
            ),
        )
    query = sql.Composed(query)
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
    logger.debug(f"Removed objects {object_names} from data source {data_source_name}")