Skip to content

Data Source Instances

DataSourceInstances(perfdb)

Class used for handling data source instances. Can be accessed via perfdb.datasources.instances.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/datasource_instances.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling data source instances. Can be accessed via `perfdb.datasources.instances`.

    Besides calling the parent initializer with `perfdb`, this creates the `attributes` sub-handler
    (a `DataSourceInstanceAttributes` built from the same `perfdb`).

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # NOTE(review): imported here instead of at module level - presumably to avoid a circular import; confirm
    from .datasource_instance_attributes import DataSourceInstanceAttributes

    # * subclasses
    # handlers exposed as attributes of this object

    self.attributes = DataSourceInstanceAttributes(perfdb)

add_objects(data_source_name, object_names)

Adds objects to a data source instance.

Parameters:

  • data_source_name

    (str) –

    Name of the data source instance.

  • object_names

    (list[str]) –

    List of object names to be added to the data source instance.

Source code in echo_postgres/datasource_instances.py
@validate_call
def add_objects(self, data_source_name: str, object_names: list[str]) -> None:
    """Adds objects to a data source instance.

    Connections that already exist are left untouched (the insert uses `ON CONFLICT DO NOTHING`).
    Repeated names in `object_names` are considered only once and an empty list results in no changes.

    Parameters
    ----------
    data_source_name : str
        Name of the data source instance.
    object_names : list[str]
        List of object names to be added to the data source instance.

    Raises
    ------
    ValueError
        If `data_source_name` does not exist or if any of the names in `object_names` does not exist.
    """
    # checking if data source exists
    existing_data_sources = self.get_ids()
    if data_source_name not in existing_data_sources:
        raise ValueError(f"data_source_name {data_source_name} does not exist")

    # dropping repeated names (order preserved) so duplicates are not mistaken for missing objects
    unique_names = list(dict.fromkeys(object_names))
    if not unique_names:
        # an INSERT with an empty VALUES list is not valid SQL, so there is nothing to send
        logger.debug(f"No objects to add to data source {data_source_name}")
        return

    # checking if objects exist (comparing names instead of lengths, which breaks with repeated names)
    objs = self._perfdb.objects.instances.get_ids(object_names=unique_names)
    missing_objs = set(unique_names) - set(objs)
    if missing_objs:
        raise ValueError(f"object_names '{missing_objs}' do not exist")

    data_source_id = existing_data_sources[data_source_name]
    object_ids = [objs[name] for name in unique_names]

    # constructing query: one (data_source_id, object_id) tuple per object
    query = [
        sql.SQL(
            "INSERT INTO performance.data_source_object_connections (data_source_id, object_id) VALUES ",
        ),
    ]
    query.append(
        sql.SQL(", ").join(
            sql.SQL("({data_source_id}, {obj_id})").format(data_source_id=sql.Literal(data_source_id), obj_id=sql.Literal(obj_id))
            for obj_id in object_ids
        ),
    )
    # connections that already exist are skipped instead of raising
    query.append(sql.SQL(" ON CONFLICT DO NOTHING"))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)

    logger.debug(f"Added objects {object_names} to data source {data_source_name}")

get(data_source_names=None, data_source_types_names=None, filter_type='and', get_attributes=False, attribute_names=None, output_type='dict')

Gets all data source instances definitions with detailed information.

The most useful keys/columns returned are:

  • id
  • display_name
  • description
  • data_source_type_id
  • data_source_type_name
  • object_ids
  • object_names

Parameters:

  • data_source_names

    (list[str], default: None ) –

    List of data source names to be retrieved. If None, retrieves all data sources. By default None

  • data_source_types_names

    (list[str], default: None ) –

    List of data source type names to be retrieved. If None, retrieves all data source types. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • get_attributes

    (bool, default: False ) –

    If True, will also get the attributes of the data sources. It's highly recommended to specify the attribute names to get to speed up the query.

    By default False

  • attribute_names

    (list[str] | None, default: None ) –

    List of attribute names to get if get_attributes is True. In case get_attributes is False, this parameter is ignored. If None, all attributes are returned.

    By default None

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict"

Returns:

  • dict[str, dict[str, str | int]]

    In case output_type is "dict", returns a dictionary in the format {name: {attribute: value, ...}, ...}

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = name, columns = [attribute, ...]

Source code in echo_postgres/datasource_instances.py
@validate_call
def get(
    self,
    data_source_names: list[str] | None = None,
    data_source_types_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    get_attributes: bool = False,
    attribute_names: list[str] | None = None,
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, str | int]] | DataFrame:
    """Gets the detailed definitions of data source instances.

    The rows are read from the `performance.v_data_sources` view, ordered by name. The most useful
    keys/columns returned are:

    - id
    - display_name
    - description
    - data_source_type_id
    - data_source_type_name
    - object_ids
    - object_names

    Parameters
    ----------
    data_source_names : list[str], optional
        Names of the data sources wanted. If None (or empty), no filter by name is applied.
        By default None
    data_source_types_names : list[str], optional
        Names of the data source types wanted. If None (or empty), no filter by type is applied.
        By default None
    filter_type : Literal["and", "or"], optional
        How the filters are combined when more than one is given. Can be one of ["and", "or"].
        By default "and"
    get_attributes : bool, optional
        If True, the attributes of the data sources are also returned, one column/key per attribute name.
        It's highly recommended to specify the attribute names to speed up the query.

        By default False
    attribute_names : list[str] | None, optional
        Attribute names to get when get_attributes is True (ignored when it is False).
        If None, all attributes are returned.

        By default None
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "dict"

    Returns
    -------
    dict[str, dict[str, str | int]]
        In case output_type is "dict", returns a dictionary in the format {name: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = name, columns = [attribute, ...]
    """
    # one WHERE condition per filter that was actually provided
    conditions = []
    if data_source_names:
        names_sql = sql.SQL(", ").join(map(sql.Literal, data_source_names))
        conditions.append(sql.SQL("name IN ({data_source_names})").format(data_source_names=names_sql))
    if data_source_types_names:
        types_sql = sql.SQL(", ").join(map(sql.Literal, data_source_types_names))
        conditions.append(
            sql.SQL("data_source_type_name IN ({data_source_types_names})").format(data_source_types_names=types_sql),
        )

    # assembling the statement: SELECT [WHERE cond (AND|OR) cond] ORDER BY
    parts = [sql.SQL("SELECT * FROM performance.v_data_sources")]
    if conditions:
        joiner = sql.SQL(f" {filter_type.upper()} ")
        parts += [sql.SQL(" WHERE "), joiner.join(conditions)]
    parts.append(sql.SQL(" ORDER BY name"))
    statement = sql.Composed(parts)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(statement, post_convert="pyarrow")
    df = df.set_index("name")

    if get_attributes:
        # attributes are requested only for the data sources that were found above
        long_attrs: DataFrame = self._perfdb.datasources.instances.attributes.get(
            data_sources=df.index.tolist(),
            attribute_names=attribute_names,
            output_type="DataFrame",
            values_only=True,
        )
        # reshaping to one row per data source and one column per attribute
        wide_attrs = long_attrs.reset_index(drop=False).pivot(
            index="data_source_name",
            columns="attribute_name",
            values="attribute_value",
        )
        # left merge keeps data sources that have no attributes
        df = df.merge(wide_attrs, left_index=True, right_index=True, how="left")

    if output_type == "dict":
        return df.to_dict(orient="index")
    return df

get_ids()

Gets all data source instances and their respective ids.

Returns:

  • dict[str, int]

    Dictionary with all data source instances and their respective ids in the format {datasource_instance: id, ...}.

Source code in echo_postgres/datasource_instances.py
def get_ids(self) -> dict[str, int]:
    """Gets the id of every data source instance.

    Names and ids are read from the `performance.v_data_sources` view.

    Returns
    -------
    dict[str, int]
        Dictionary with all data source instances and their respective ids in the format {datasource_instance: id, ...}.
    """
    statement = sql.SQL("SELECT name, id FROM performance.v_data_sources ORDER BY name")

    with self._perfdb.conn.reconnect() as conn:
        names_and_ids = conn.read_to_pandas(statement)

    # to_dict() is keyed by column first, so the "id" entry is the {name: id} mapping
    by_column = names_and_ids.set_index("name").to_dict()
    return by_column["id"]

remove_objects(data_source_name, object_names)

Removes objects from a data source instance.

Parameters:

  • data_source_name

    (str) –

    Name of the data source instance.

  • object_names

    (list[str] or Literal['all']) –

    List of object names to be removed from the data source instance. If "all", removes all objects from the data source.

Source code in echo_postgres/datasource_instances.py
@validate_call
def remove_objects(self, data_source_name: str, object_names: list[str] | Literal["all"]) -> None:
    """Removes objects from a data source instance.

    Repeated names in `object_names` are considered only once and an empty list results in no changes
    (it never removes all objects; use "all" for that).

    Parameters
    ----------
    data_source_name : str
        Name of the data source instance.
    object_names : list[str] or Literal["all"]
        List of object names to be removed from the data source instance. If "all", removes all objects from the data source.

    Raises
    ------
    ValueError
        If `data_source_name` does not exist or if any of the names in `object_names` does not exist.
    """
    # checking if data source exists
    existing_data_sources = self.get_ids()
    if data_source_name not in existing_data_sources:
        raise ValueError(f"data_source_name {data_source_name} does not exist")

    # constructing query: without further conditions it removes every connection of the data source
    query = [
        sql.SQL(
            "DELETE FROM performance.data_source_object_connections WHERE data_source_id = {data_source_id}",
        ).format(
            data_source_id=sql.Literal(existing_data_sources[data_source_name]),
        ),
    ]
    if object_names != "all":
        # dropping repeated names (order preserved) so duplicates are not mistaken for missing objects
        unique_names = list(dict.fromkeys(object_names))
        if not unique_names:
            # "IN ()" is not valid SQL and there is nothing to remove anyway
            logger.debug(f"No objects to remove from data source {data_source_name}")
            return

        # checking if objects exist (comparing names instead of lengths, which breaks with repeated names)
        objs = self._perfdb.objects.instances.get_ids(object_names=unique_names)
        missing_objs = set(unique_names) - set(objs)
        if missing_objs:
            raise ValueError(f"object_names '{missing_objs}' do not exist")

        # restricting the delete to the requested objects
        object_ids = [objs[name] for name in unique_names]
        query.append(
            sql.SQL(" AND object_id IN ({object_ids})").format(
                object_ids=sql.SQL(", ").join(sql.Literal(obj_id) for obj_id in object_ids),
            ),
        )

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)

    logger.debug(f"Removed objects {object_names} from data source {data_source_name}")