
Data Source Instance Attributes

DataSourceInstanceAttributes(perfdb)

Class used for handling data source instance attributes. Can be accessed via perfdb.datasources.attributes.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(data_source_name, attribute_name)

Deletes an attribute value.

Parameters:

  • data_source_name

    (str) –

    Name of the data source to delete the attribute value from.

  • attribute_name

    (str) –

    Name of the attribute to delete the value from.

Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def delete(
    self,
    data_source_name: str,
    attribute_name: str,
) -> None:
    """Deletes an attribute value.

    Parameters
    ----------
    data_source_name : str
        Name of the data source to delete the attribute value from.
    attribute_name : str
        Name of the attribute to delete the value from.
    """
    # building the query
    query = [
        sql.SQL(
            "DELETE FROM performance.data_source_attributes "
            "WHERE data_source_id = (SELECT id FROM performance.data_sources WHERE name = {data_source_name}) "
            "AND attribute_id = (SELECT id FROM performance.attributes_def WHERE name = {attribute_name}) ",
        ).format(
            data_source_name=sql.Literal(data_source_name),
            attribute_name=sql.Literal(attribute_name),
        ),
    ]
    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(sql.Composed(query))

    logger.debug(f"Deleted {result.rowcount} rows from performance.data_source_attributes")

get(data_sources=None, attribute_names=None, filter_type='and', output_type='dict', values_only=False)

Method to get the attributes of the given data source instance.

The most useful keys/columns returned are:

  • data_source_id
  • data_source_name
  • data_source_type_id
  • data_source_type_name
  • attribute_id
  • attribute_name
  • attribute_display_name
  • attribute_value
  • data_type_name

Parameters:

  • data_sources

    (list[str] | None, default: None ) –

    List of data source instances to get the attributes from. If set to None, gets the attributes from all data source instances. By default None.

  • attribute_names

    (list[str] | None, default: None ) –

    List of attribute names to filter the results. If set to None, gets all attributes. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict".

  • values_only

    (bool, default: False ) –

    If set to True, will only return the values of the attributes, skipping display_name, id, etc.

Returns:

  • dict[str, dict[str, Any | dict[str, Any]]]

    In case output_type is "dict", returns a dictionary in the format {data_source_name: {attribute_name: {attribute: value, ...}, ...}, ...}. If values_only is set to True, returns a dictionary in the format {data_source_name: {attribute_name: value, ...}, ...}.

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = [attribute, ...]. If values_only is set to True, returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = ["attribute_value"].
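
The nested dictionary shape described above can be illustrated without a database. The following is a minimal sketch, assuming a flat mapping keyed by (data_source_name, attribute_name) tuples, which is what converting a MultiIndex-ed pandas object to a dict produces. The helper name nest_by_data_source and the sample data source and attribute names are hypothetical and not part of echo_postgres.

```python
from __future__ import annotations

from typing import Any


def nest_by_data_source(flat: dict[tuple[str, str], Any]) -> dict[str, dict[str, Any]]:
    """Turn {(data_source_name, attribute_name): value} into
    {data_source_name: {attribute_name: value}}."""
    nested: dict[str, dict[str, Any]] = {}
    for (data_source_name, attribute_name), value in flat.items():
        # create the inner dict the first time a data source is seen
        nested.setdefault(data_source_name, {})[attribute_name] = value
    return nested


# hypothetical sample data, standing in for the values_only=True case
flat_values = {
    ("ds_a", "timezone"): "UTC",
    ("ds_a", "enabled"): True,
    ("ds_b", "timezone"): "America/Sao_Paulo",
}
nested_values = nest_by_data_source(flat_values)
print(nested_values)
```

With values_only=False the same reshaping applies, except that each innermost value is itself a dictionary of columns rather than a single attribute value.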

Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def get(
    self,
    data_sources: list[str] | None = None,
    attribute_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
    values_only: bool = False,
) -> dict[str, dict[str, Any | dict[str, Any]]] | DataFrame:
    """Method to get the attributes of the given data source instance.

    The most useful keys/columns returned are:

    - data_source_id
    - data_source_name
    - data_source_type_id
    - data_source_type_name
    - attribute_id
    - attribute_name
    - attribute_display_name
    - attribute_value
    - data_type_name

    Parameters
    ----------
    data_sources : list[str] | None, optional
        List of data source instances to get the attributes from. If set to None, gets the attributes from all data source instances. By default None.
    attribute_names : list[str] | None, optional
        List of attribute names to filter the results. If set to None, gets all attributes. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"
    values_only : bool, optional
        If set to True, will only return the values of the attributes, skipping display_name, id, etc.

    Returns
    -------
    dict[str, dict[str, Any | dict[str, Any]]]
        In case output_type is "dict", returns a dictionary in the format {data_source_name: {attribute_name: {attribute: value, ...}, ...}, ...}
        If values_only is set to True, returns a dictionary in the format {data_source_name: {attribute_name: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = [attribute, ...]
        If values_only is set to True, returns a DataFrame with the following format: index = MultiIndex[data_source_name, attribute_name], columns = ["attribute_value"]
    """
    # checking if all requested data source instances exist
    if data_sources:
        existing_data_sources = self._perfdb.datasources.instances.get_ids()
        if missing_data_sources := set(data_sources) - set(
            existing_data_sources.keys(),
        ):
            raise ValueError(f"The following data source instances do not exist: {missing_data_sources}")

    # checking if all attribute names are valid and also getting its ids
    if attribute_names:
        existing_attributes = self._perfdb.attributes.get_ids(attribute_names=attribute_names)
        if missing_attributes := set(attribute_names) - set(existing_attributes.keys()):
            raise ValueError(f"The following attributes do not exist: {missing_attributes}")

    # building the query
    query = [
        sql.SQL(
            "SELECT {values} FROM performance.v_data_source_attributes",
        ).format(
            values=sql.SQL(
                "data_source_name, attribute_name, attribute_value, data_type_name",
            )
            if values_only
            else sql.SQL("*"),
        ),
    ]
    where = []
    if data_sources:
        where.append(
            sql.SQL(" data_source_name IN ({data_source}) ").format(
                data_source=sql.SQL(",").join(sql.Literal(ds) for ds in data_sources),
            ),
        )
    if attribute_names:
        where.append(
            sql.SQL(" attribute_id IN ({attribute_ids}) ").format(
                attribute_ids=sql.SQL(",").join(sql.Literal(existing_attributes[an]) for an in attribute_names),
            ),
        )
    if where:
        where = sql.SQL(f" {filter_type.upper()} ").join(where)
        query.append(sql.SQL(" WHERE "))
        query.append(where)
    query.append(sql.SQL(" ORDER BY data_source_name, attribute_name"))
    query = sql.Composed(query)

    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        # setting attribute_value as object to avoid casting json column as string
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # casting the attribute values
    df = cast_attributes(df=df, index_cols=["data_source_name"])

    df = df.set_index(["data_source_name", "attribute_name"])

    # returning the result
    if output_type == "dict":
        # dropping unwanted columns
        if values_only:
            df = df["attribute_value"]
            output = df.to_dict()
        else:
            output = df[["attribute_id", "attribute_value", "data_type_id", "data_type_name", "modified_date"]].to_dict(orient="index")
        # converting dict where the keys are tuples {(key1, key2): value}, to a dict where the keys are strings like {key1: {key2: value}}
        new_output = {}
        for (om, an), values in output.items():
            if om not in new_output:
                new_output[om] = {}
            new_output[om][an] = values
        return new_output
    if output_type == "DataFrame" and values_only:
        df = df[["attribute_value"]].copy()

    return df
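
To make the effect of filter_type concrete, the sketch below mirrors how the listing above joins its optional conditions with AND or OR. It is a simplified, dependency-free illustration: the function build_where is hypothetical, and it uses %s placeholders with a separate parameter list instead of the sql.SQL and sql.Literal objects used in the listing.

```python
from __future__ import annotations

from typing import Any, Literal


def build_where(
    data_sources: list[str] | None,
    attribute_ids: list[int] | None,
    filter_type: Literal["and", "or"] = "and",
) -> tuple[str, list[Any]]:
    """Return a WHERE clause (possibly empty) and its parameters."""
    conditions: list[str] = []
    params: list[Any] = []
    if data_sources:
        placeholders = ", ".join(["%s"] * len(data_sources))
        conditions.append(f"data_source_name IN ({placeholders})")
        params.extend(data_sources)
    if attribute_ids:
        placeholders = ", ".join(["%s"] * len(attribute_ids))
        conditions.append(f"attribute_id IN ({placeholders})")
        params.extend(attribute_ids)
    if not conditions:
        # no filters given: select everything
        return "", params
    # filter_type decides how the individual conditions are combined
    return " WHERE " + f" {filter_type.upper()} ".join(conditions), params


clause, params = build_where(["ds_a", "ds_b"], [3], filter_type="or")
print(clause)
print(params)
```

With filter_type="and" a row must match both the data source filter and the attribute filter. With "or", matching either one is enough, which usually returns a larger result.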

insert(data_source_name, attribute_name, attribute_value, on_conflict='raise')

Inserts a new attribute value.

Parameters:

  • data_source_name

    (str) –

    Name of the data source to insert the attribute value to.

  • attribute_name

    (str) –

    Name of the attribute to insert the value to.

  • attribute_value

    (Any) –

    Value of the attribute.

  • on_conflict

    (Literal['raise', 'ignore', 'update'], default: 'raise' ) –

    What to do in case of conflict. Can be one of ["raise", "ignore", "update"]. By default "raise"

Raises:

  • ValueError

    If the data source does not exist, or in case of conflict when on_conflict is set to "raise".
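
The three on_conflict options each map to a different suffix on the INSERT statement. The sketch below restates that mapping with plain strings, copying the SQL fragments from the source listing of this method. The helper conflict_clause is hypothetical and exists only for illustration.

```python
def conflict_clause(on_conflict: str) -> str:
    """Return the SQL suffix appended to the INSERT for each on_conflict mode."""
    clauses = {
        # nothing is appended, so the database reports the conflict itself
        "raise": "",
        # the conflicting row is left untouched
        "ignore": " ON CONFLICT DO NOTHING",
        # the stored value is overwritten with the newly supplied one
        "update": (
            " ON CONFLICT (data_source_id, attribute_id)"
            " DO UPDATE SET value = EXCLUDED.value"
        ),
    }
    if on_conflict not in clauses:
        raise ValueError(f"Unsupported on_conflict value: {on_conflict!r}")
    return clauses[on_conflict]


for mode in ("raise", "ignore", "update"):
    print(mode, "->", repr(conflict_clause(mode)))
```

In practice, "update" makes insert behave like an upsert, while "ignore" is useful when an existing value must never be overwritten.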

Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def insert(
    self,
    data_source_name: str,
    attribute_name: str,
    attribute_value: Any,
    on_conflict: Literal["raise", "ignore", "update"] = "raise",
) -> None:
    """Inserts a new attribute value.

    Parameters
    ----------
    data_source_name : str
        Name of the data source to insert the attribute value to.
    attribute_name : str
        Name of the attribute to insert the value to.
    attribute_value : Any
        Value of the attribute.
    on_conflict : Literal["raise", "ignore", "update"], optional
        What to do in case of conflict. Can be one of ["raise", "ignore", "update"].
        By default "raise"

    Raises
    ------
    ValueError
        In case of conflict and on_conflict is set to "raise".
    """
    # checking if data source exists
    existing_data_sources = self._perfdb.datasources.instances.get_ids()
    if data_source_name not in existing_data_sources:
        raise ValueError(f"The data source {data_source_name} does not exist")

    # checking and casting the attribute value
    insert_attribute_value, attribute_id = check_attribute_dtype(
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        perfdb=self._perfdb,
    )

    # building the query
    query = [
        sql.SQL(
            "INSERT INTO performance.data_source_attributes (data_source_id, attribute_id, value) "
            "VALUES ({data_source_id}, {attribute_id}, {attribute_value}) ",
        ).format(
            data_source_id=sql.Literal(
                existing_data_sources[data_source_name],
            ),
            attribute_id=sql.Literal(attribute_id),
            attribute_value=sql.Literal(insert_attribute_value),
        ),
    ]
    match on_conflict:
        case "raise":
            # doing nothing will raise conflicts as expected
            pass
        case "ignore":
            query.append(sql.SQL(" ON CONFLICT DO NOTHING "))
        case "update":
            query.append(
                sql.SQL(
                    " ON CONFLICT (data_source_id, attribute_id) DO UPDATE SET value = EXCLUDED.value ",
                ),
            )

    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(sql.Composed(query))

    logger.debug(f"Attribute '{attribute_name}' inserted to data source '{data_source_name}'")

update(data_source_name, attribute_name, attribute_value)

Updates an attribute value.

Parameters:

  • data_source_name

    (str) –

    Name of the data source whose attribute value will be updated.

  • attribute_name

    (str) –

    Name of the attribute whose value will be updated.

  • attribute_value

    (Any) –

    New value of the attribute.

Source code in echo_postgres/datasource_instance_attributes.py
@validate_call
def update(
    self,
    data_source_name: str,
    attribute_name: str,
    attribute_value: Any,
) -> None:
    """Updates an attribute value.

    Parameters
    ----------
    data_source_name : str
        Name of the data source whose attribute value will be updated.
    attribute_name : str
        Name of the attribute whose value will be updated.
    attribute_value : Any
        New value of the attribute.
    """
    # checking if data source exists
    existing_data_sources = self._perfdb.datasources.instances.get_ids()
    if data_source_name not in existing_data_sources:
        raise ValueError(f"The data source {data_source_name} does not exist")

    # checking and casting the attribute value
    insert_attribute_value, attribute_id = check_attribute_dtype(
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        perfdb=self._perfdb,
    )

    # building the query
    query = [
        sql.SQL(
            "UPDATE performance.data_source_attributes "
            "SET value = {attribute_value} "
            "WHERE data_source_id = {data_source_id} "
            "AND attribute_id = {attribute_id} ",
        ).format(
            data_source_id=sql.Literal(
                existing_data_sources[data_source_name],
            ),
            attribute_id=sql.Literal(attribute_id),
            attribute_value=sql.Literal(insert_attribute_value),
        ),
    ]
    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(sql.Composed(query))

    logger.debug(f"Attribute '{attribute_name}' updated in data source '{data_source_name}'")