Data Source Instance Attributes¶
DataSourceInstanceAttributes(perfdb)
¶
Class used for handling data source instance attributes. Can be accessed via perfdb.datasources.attributes.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Shared constructor for all PerfDB accessor subclasses.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a reference to the parent PerfDB so methods can reach the
    # connection handler and sibling accessors.
    self._perfdb: e_pg.PerfDB = perfdb
delete(data_source_name, attribute_name)
¶
Deletes an attribute value.
Parameters:
-
(data_source_name¶str) –Name of the data source to delete the attribute value from.
-
(attribute_name¶str) –Name of the attribute to delete the value from.
Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def delete(
    self,
    data_source_name: str,
    attribute_name: str,
) -> None:
    """Deletes an attribute value.

    Parameters
    ----------
    data_source_name : str
        Name of the data source to delete the attribute value from.
    attribute_name : str
        Name of the attribute to delete the value from.
    """
    # Resolve both names to ids with subqueries so one DELETE statement
    # suffices; names are injected as SQL literals (no string building).
    statement = sql.SQL(
        "DELETE FROM performance.data_source_attributes "
        "WHERE data_source_id = (SELECT id FROM performance.data_sources WHERE name = {data_source_name}) "
        "AND attribute_id = (SELECT id FROM performance.attributes_def WHERE name = {attribute_name}) ",
    ).format(
        data_source_name=sql.Literal(data_source_name),
        attribute_name=sql.Literal(attribute_name),
    )

    # run the statement on a fresh/验证-free connection from the handler
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(sql.Composed([statement]))
        logger.debug(f"Deleted {result.rowcount} rows from performance.data_source_attributes")
get(data_sources=None, attribute_names=None, filter_type='and', output_type='dict', values_only=False)
¶
Method to get the attributes of the given data source instance.
The most useful keys/columns returned are:
- data_source_id
- data_source_name
- data_source_type_id
- data_source_type_name
- attribute_id
- attribute_name
- attribute_display_name
- attribute_value
- data_type_name
Parameters:
-
(data_sources¶list[str] | None, default:None) –List of data source instances to get the attributes from. If set to None will get from all. By default None
-
(attribute_names¶list[str] | None, default:None) –List of attribute names to filter the results. If set to None will get all. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"
-
(values_only¶bool, default:False) –If set to True, will only return the values of the attributes, skipping display_name, id, etc.
Returns:
-
dict[str, dict[str, Any | dict[str, Any]]]–In case output_type is "dict", returns a dictionary in the format {data_source_name: {attribute_name: {attribute: value, ...}, ...}, ...} If values_only is set to True, returns a dictionary in the format {data_source_name: {attribute_name: value, ...}, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = [attribute, ...] If values_only is set to True, returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = ["attribute_value"]
Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def get(
    self,
    data_sources: list[str] | None = None,
    attribute_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
    values_only: bool = False,
) -> dict[str, dict[str, Any | dict[str, Any]]] | DataFrame:
    """Method to get the attributes of the given data source instance.

    The most useful keys/columns returned are:

    - data_source_id
    - data_source_name
    - data_source_type_id
    - data_source_type_name
    - attribute_id
    - attribute_name
    - attribute_display_name
    - attribute_value
    - data_type_name

    Parameters
    ----------
    data_sources : list[str] | None, optional
        List of data source instances to get the attributes from. If set to None will get from all. By default None
    attribute_names : list[str] | None, optional
        List of attribute names to filter the results. If set to None will get all. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"
    values_only : bool, optional
        If set to True, will only return the values of the attributes, skipping display_name, id, etc.

    Returns
    -------
    dict[str, dict[str, Any | dict[str, Any]]]
        In case output_type is "dict", returns a dictionary in the format {data_source_name: {attribute_name: {attribute: value, ...}, ...}, ...}
        If values_only is set to True, returns a dictionary in the format {data_source_name: {attribute_name: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = [attribute, ...]
        If values_only is set to True, returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = ["attribute_value"]

    Raises
    ------
    ValueError
        If any requested data source instance or attribute name does not exist.
    """
    # checking if all data source instance attributes are valid
    # (fail fast with the full set of unknown names instead of returning
    # a silently incomplete result)
    if data_sources:
        existing_data_sources = self._perfdb.datasources.instances.get_ids()
        if missing_data_sources := set(data_sources) - set(
            existing_data_sources.keys(),
        ):
            raise ValueError(f"The following data source instances do not exist: {missing_data_sources}")
    # checking if all attribute names are valid and also getting its ids
    # NOTE: existing_attributes is only bound when attribute_names is truthy;
    # the attribute_id filter below is guarded by the same condition.
    if attribute_names:
        existing_attributes = self._perfdb.attributes.get_ids(attribute_names=attribute_names)
        if missing_attributes := set(attribute_names) - set(existing_attributes.keys()):
            raise ValueError(f"The following attributes do not exist: {missing_attributes}")
    # building the query against the view that joins sources, attributes and values
    query = [
        sql.SQL(
            "SELECT {values} FROM performance.v_data_source_attributes",
        ).format(
            # values_only trims the projection to the essential columns
            values=sql.SQL(
                "data_source_name, attribute_name, attribute_value, data_type_name",
            )
            if values_only
            else sql.SQL("*"),
        ),
    ]
    # optional WHERE clauses; data sources are filtered by name,
    # attributes by the ids resolved above
    where = []
    if data_sources:
        where.append(
            sql.SQL(" data_source_name IN ({data_source}) ").format(
                data_source=sql.SQL(",").join(sql.Literal(ds) for ds in data_sources),
            ),
        )
    if attribute_names:
        where.append(
            sql.SQL(" attribute_id IN ({attribute_ids}) ").format(
                attribute_ids=sql.SQL(",").join(sql.Literal(existing_attributes[an]) for an in attribute_names),
            ),
        )
    if where:
        # join the clauses with AND/OR as requested by filter_type
        where = sql.SQL(f" {filter_type.upper()} ").join(where)
        query.append(sql.SQL(" WHERE "))
        query.append(where)
    query.append(sql.SQL(" ORDER BY data_source_name, attribute_name"))
    query = sql.Composed(query)
    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        # setting attribute_value as object to avoid casting json column as string
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # casting the attribute values
    # (cast_attributes is a project helper; presumably it converts each
    # attribute_value to its declared data type — confirm in its module)
    df = cast_attributes(df=df, index_cols=["data_source_name"])
    df = df.set_index(["data_source_name", "attribute_name"])
    # returning the result
    if output_type == "dict":
        # dropping unwanted columns
        if values_only:
            # Series keyed by (data_source_name, attribute_name) -> value
            df = df["attribute_value"]
            output = df.to_dict()
        else:
            output = df[["attribute_id", "attribute_value", "data_type_id", "data_type_name", "modified_date"]].to_dict(orient="index")
        # converting dict where the keys are tuples {(key1, key2): value}, to a dict where the keys are strings like {key1: {key2: value}}
        new_output = {}
        for (om, an), values in output.items():
            if om not in new_output:
                new_output[om] = {}
            new_output[om][an] = values
        return new_output
    if output_type == "DataFrame" and values_only:
        df = df[["attribute_value"]].copy()
    return df
insert(data_source_name, attribute_name, attribute_value, on_conflict='raise')
¶
Inserts a new attribute value.
Parameters:
-
(data_source_name¶str) –Name of the data source to insert the attribute value to.
-
(attribute_name¶str) –Name of the attribute to insert the value to.
-
(attribute_value¶Any) –Value of the attribute.
-
(on_conflict¶Literal['raise', 'ignore', 'update'], default:'raise') –What to do in case of conflict. Can be one of ["raise", "ignore", "update"]. By default "raise"
Raises:
-
ValueError–In case of conflict and on_conflict is set to "raise".
Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def insert(
    self,
    data_source_name: str,
    attribute_name: str,
    attribute_value: Any,
    on_conflict: Literal["raise", "ignore", "update"] = "raise",
) -> None:
    """Inserts a new attribute value.

    Parameters
    ----------
    data_source_name : str
        Name of the data source to insert the attribute value to.
    attribute_name : str
        Name of the attribute to insert the value to.
    attribute_value : Any
        Value of the attribute.
    on_conflict : Literal["raise", "ignore", "update"], optional
        What to do in case of conflict. Can be one of ["raise", "ignore", "update"].
        By default "raise"

    Raises
    ------
    ValueError
        In case of conflict and on_conflict is set to "raise".
    """
    # fail early if the target data source is unknown
    existing_data_sources = self._perfdb.datasources.instances.get_ids()
    if data_source_name not in existing_data_sources:
        raise ValueError(f"The data source {data_source_name} does not exist")

    # validate/cast the value against the attribute's declared data type;
    # the helper also resolves the attribute id in the same call
    insert_attribute_value, attribute_id = check_attribute_dtype(
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        perfdb=self._perfdb,
    )

    # assemble the INSERT statement; the conflict clause is appended below
    statements = [
        sql.SQL(
            "INSERT INTO performance.data_source_attributes (data_source_id, attribute_id, value) "
            "VALUES ({data_source_id}, {attribute_id}, {attribute_value}) ",
        ).format(
            data_source_id=sql.Literal(existing_data_sources[data_source_name]),
            attribute_id=sql.Literal(attribute_id),
            attribute_value=sql.Literal(insert_attribute_value),
        ),
    ]
    if on_conflict == "ignore":
        statements.append(sql.SQL(" ON CONFLICT DO NOTHING "))
    elif on_conflict == "update":
        statements.append(
            sql.SQL(
                " ON CONFLICT (data_source_id, attribute_id) DO UPDATE SET value = EXCLUDED.value ",
            ),
        )
    # on_conflict == "raise": a plain INSERT lets the unique constraint raise

    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(sql.Composed(statements))
        logger.debug(f"Attribute '{attribute_name}' inserted to data source '{data_source_name}'")
update(data_source_name, attribute_name, attribute_value)
¶
Updates an attribute value.
Parameters:
-
(data_source_name¶str) –Name of the data source to update the attribute value from.
-
(attribute_name¶str) –Name of the attribute to update the value from.
-
(attribute_value¶Any) –New value of the attribute.
Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def update(
    self,
    data_source_name: str,
    attribute_name: str,
    attribute_value: Any,
) -> None:
    """Updates an attribute value.

    Parameters
    ----------
    data_source_name : str
        Name of the data source to update the attribute value from.
    attribute_name : str
        Name of the attribute to update the value from.
    attribute_value : Any
        New value of the attribute.

    Raises
    ------
    ValueError
        If the data source does not exist.
    """
    # checking if data source exists
    existing_data_sources = self._perfdb.datasources.instances.get_ids()
    if data_source_name not in existing_data_sources:
        # fixed: message was missing the space between "source" and the name;
        # now matches the wording used by insert()
        raise ValueError(f"The data source {data_source_name} does not exist")
    # checking and casting the attribute value; the helper also resolves
    # the attribute id in the same call
    insert_attribute_value, attribute_id = check_attribute_dtype(
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        perfdb=self._perfdb,
    )
    # building the query (ids resolved above, value injected as a literal)
    query = [
        sql.SQL(
            "UPDATE performance.data_source_attributes "
            "SET value = {attribute_value} "
            "WHERE data_source_id = {data_source_id} "
            "AND attribute_id = {attribute_id} ",
        ).format(
            data_source_id=sql.Literal(
                existing_data_sources[data_source_name],
            ),
            attribute_id=sql.Literal(attribute_id),
            attribute_value=sql.Literal(insert_attribute_value),
        ),
    ]
    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(sql.Composed(query))
        logger.debug(f"Attribute '{attribute_name}' updated in data source '{data_source_name}'")