Skip to content

Raw Data Values

RawDataValues(perfdb)

Class used for handling raw data values. Can be accessed via perfdb.rawdata.values.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store a reference to the top-level :class:`PerfDB` object.

    All handler subclasses inherit this constructor.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # keep a back-reference so every handler can reach the shared connection
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, object_names=None, raw_data_names=None)

Delete raw data values based on the specified criteria.

Parameters:

  • period

    (DateTimeRange) –

    The time period within which the raw data values should be deleted.

  • object_names

    (list[str] | None, default: None ) –

    A list of object names. Only raw data values associated with these object names will be deleted. If None, all object names will be considered.

  • raw_data_names

    (list[str] | None, default: None ) –

    A list of raw data names. Only raw data values with these names will be deleted. If None, all raw data names will be considered.

Source code in echo_postgres/rawdata_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    raw_data_names: list[str] | None = None,
) -> None:
    """
    Delete raw data values based on the specified criteria.

    Parameters
    ----------
    period : DateTimeRange
        The time period within which the raw data values should be deleted.
        Both ends are inclusive; timestamps are compared at whole-second
        precision (sub-second parts of the range are truncated).
    object_names : list[str] | None, optional
        A list of object names. Only raw data values associated with these object names will be deleted.
        If None, all object names will be considered.
    raw_data_names : list[str] | None, optional
        A list of raw data names. Only raw data values with these names will be deleted.
        If None, all raw data names will be considered.

    Raises
    ------
    ValueError
        If any of the given object names does not exist in the database.
    """
    # resolving object names -> ids; the models are kept to scope raw data names below
    object_ids: list[int] | None = None
    obj_models: dict[str, str] = {}
    if object_names:
        obj_models = self._perfdb.objects.instances.get(object_names=object_names, output_type="DataFrame")[
            "object_model_name"
        ].to_dict()
        id_map = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # comparing as sets so duplicated input names don't trigger a false mismatch
        missing = set(object_names) - set(id_map)
        if missing:
            raise ValueError(f"Object names {missing} not found in the database")
        object_ids = list(id_map.values())

    # resolving raw data names -> ids, restricted to the models of the requested objects (if any)
    raw_data_ids: list[int] | None = None
    if raw_data_names:
        ids_by_model: dict[str, dict[str, int]] = self._perfdb.rawdata.definitions.get_ids(
            raw_data_names=raw_data_names,
            object_models=list(set(obj_models.values())) if object_names else None,
        )
        raw_data_ids = [x for v in ids_by_model.values() for x in v.values()]

    # building query
    query = [
        sql.SQL(
            "DELETE FROM performance.raw_data_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_ids:
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, object_ids))))
    if raw_data_ids:
        query.append(sql.SQL(" AND raw_data_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, raw_data_ids))))

    # executing the query (compose once; the original wrapped an already-Composed query again)
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(sql.Composed(query))

    logger.debug(f"Deleted {result.rowcount} rows from performance.raw_data_values")

get(period, object_names=None, object_models=None, raw_data_names=None, filter_type='and', save_dir=None, output_type='DataFrame')

Gets the raw data values for a given period and set of filters.

Parameters:

  • period

    (DateTimeRange | list[date]) –

    Can be a DateTimeRange or a list of dates.

    • If DateTimeRange, will get the data for the entire range (limiting on start and end)
    • If list of dates, will get the data for each date in the list.
  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter the results, by default None.

  • object_models

    (list[str] | None, default: None ) –

    List of object model names to filter the results, by default None.

  • raw_data_names

    (list[str] | None, default: None ) –

    List of raw data value names to filter the results, by default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"], by default "and".

  • save_dir

    (Path | None, default: None ) –

    If set to a directory, will try to save all the data as files in it. The files will have a name like {object_name}_{raw_data_name}_{YYYYmmddHHMMSS}.{extension}. Instead of the converted value, the DataFrame or dictionary will contain the path to the file. If set to None, no files will be saved but the data will be converted from binary and returned directly. By default None

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    The format of the returned data. Can be one of ["dict", "DataFrame"], by default "DataFrame".

Returns:

  • DataFrame

    DataFrame in the format index=MultiIndex[object_name, raw_data_name, timestamp], columns=[value, metadata, ...]

  • dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]

    A dictionary in the format {object_name: {raw_data_name: {timestamp: {value: value, metadata: metadata, ...}, ...}, ...}, ...}

Source code in echo_postgres/rawdata_values.py
@validate_call
def get(
    self,
    period: DateTimeRange | list[date],
    object_names: list[str] | None = None,
    object_models: list[str] | None = None,
    raw_data_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    save_dir: Path | None = None,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]:
    """Retrieve raw data values matching the given period and filters.

    Parameters
    ----------
    period : DateTimeRange | list[date]
        Either a DateTimeRange or a list of dates.

        - DateTimeRange: fetches everything between start and end (inclusive).
        - list of dates: fetches the values recorded on each listed date.

    object_names : list[str] | None, optional
        Restrict results to these object names, by default None.
    object_models : list[str] | None, optional
        Restrict results to these object model names, by default None.
    raw_data_names : list[str] | None, optional
        Restrict results to these raw data value names, by default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. Can be one of ["and", "or"], by default "and".
    save_dir : Path | None, optional
        When set to a directory, each value is written to a file in it named
        {object_name}_{raw_data_name}_{YYYYmmddHHMMSS}.{extension}, and the returned
        DataFrame/dict carries the file path in place of the decoded value.
        When None, values are decoded from binary and returned directly.
        By default None
    output_type : Literal["dict", "DataFrame"], optional
        Shape of the returned data. Can be one of ["dict", "DataFrame"], by default "DataFrame".

    Returns
    -------
    DataFrame
        DataFrame in the format index=MultiIndex[object_name, raw_data_name, timestamp], columns=[value, metadata, ...]
    dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]
        A dictionary in the format {object_name: {raw_data_name: {timestamp: {value: value, metadata: metadata, ...}, ...}, ...}, ...}
    """
    where = self._check_get_args(
        object_names=object_names,
        object_models=object_models,
        raw_data_names=raw_data_names,
        filter_type=filter_type,
    )

    # period filter: contiguous range vs. explicit list of days
    if isinstance(period, DateTimeRange):
        period_clause = sql.SQL(" (timestamp >= {period_start} AND timestamp <= {period_end}) ").format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        )
    else:
        period_clause = sql.SQL(" (timestamp::date IN ({dates})) ").format(
            dates=sql.SQL(", ").join(sql.Literal(x.strftime("%Y-%m-%d")) for x in period),
        )

    query = sql.Composed(
        [
            sql.SQL(
                "SELECT object_name, raw_data_name, timestamp, value, metadata, data_type_name FROM performance.v_raw_data_values WHERE ",
            ),
            period_clause,
            where,
            sql.SQL(" ORDER BY object_name, raw_data_name, timestamp"),
        ],
    )

    # getting the data
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # keep the documented output shape even when nothing matched
    if df.empty:
        if output_type == "DataFrame":
            return df.set_index(["object_name", "raw_data_name", "timestamp"])
        return {}

    # target file path per row; stays None when data is returned in memory
    # (the column is always created because convert_from_binary takes it as an argument)
    if save_dir:
        df["file_path"] = df.apply(
            lambda row: save_dir
            / f"{row['object_name']}_{row['raw_data_name']}_{row['timestamp']:%Y%m%d%H%M%S}{row['data_type_name']}",
            axis=1,
        )
    else:
        df["file_path"] = None

    # decode the binary payloads (or dump them to disk when a file path is set)
    df["value"] = df.apply(lambda row: convert_from_binary(row["value"], row["data_type_name"], row["file_path"]), axis=1)

    # when saving, callers receive the file path instead of the decoded value
    if save_dir:
        df["value"] = df["file_path"]
    df = df.drop(columns=["file_path"])

    df = df.set_index(["object_name", "raw_data_name", "timestamp"])

    if output_type == "DataFrame":
        return df

    # reshape {(obj, raw, ts): {col: val}} into {obj: {raw: {ts: {col: val}}}}
    nested: dict[str, dict[str, dict[Timestamp, dict[str, Any]]]] = {}
    for (obj_name, raw_data_name, timestamp), row_data in df.to_dict(orient="index").items():
        nested.setdefault(obj_name, {}).setdefault(raw_data_name, {})[timestamp] = row_data

    return nested

get_timestamps(period, object_names=None, object_models=None, raw_data_names=None, sort_timestamps=None, limit=None, filter_type='and', output_type='dict')

Gets the timestamps of raw data entries for a given period and set of filters.

Parameters:

  • period

    (DateTimeRange) –

    The time range for which to get the timestamps.

  • object_names

    (list[str] | None, default: None ) –

    List of object names to filter the results, by default None.

  • object_models

    (list[str] | None, default: None ) –

    List of object model names to filter the results, by default None.

  • raw_data_names

    (list[str] | None, default: None ) –

    List of raw data value names to filter the results, by default None.

  • sort_timestamps

    (None | Literal['ASC', 'DESC'], default: None ) –

    If set, will sort the timestamps in the specified order. Can be one of ["ASC", "DESC"], by default None.

  • limit

    (int | None, default: None ) –

    The maximum number of timestamps to return. If None, all timestamps will be returned, by default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"], by default "and".

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    The format of the returned data. Can be one of ["dict", "DataFrame"], by default "dict".

Returns:

  • dict[str, dict[str, list[Timestamp]]]

    If output_type is "dict", a dictionary in the format {object_name: {raw_data_name: [timestamp, ...], ...}, ...}.

  • DataFrame

    If output_type is "DataFrame", a DataFrame in the format index=MultiIndex[object_name, raw_data_name], columns=timestamp.

Source code in echo_postgres/rawdata_values.py
@validate_call
def get_timestamps(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    object_models: list[str] | None = None,
    raw_data_names: list[str] | None = None,
    sort_timestamps: None | Literal["ASC", "DESC"] = None,
    limit: Annotated[int | None, Field(gt=0)] = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, list[Timestamp]]] | DataFrame:
    """List the timestamps of raw data entries for a period and set of filters.

    Parameters
    ----------
    period : DateTimeRange
        The time range for which to get the timestamps.
    object_names : list[str] | None, optional
        List of object names to filter the results, by default None.
    object_models : list[str] | None, optional
        List of object model names to filter the results, by default None.
    raw_data_names : list[str] | None, optional
        List of raw data value names to filter the results, by default None.
    sort_timestamps : None | Literal["ASC", "DESC"], optional
        When set, timestamps are sorted in this order. Can be one of ["ASC", "DESC"], by default None.
    limit : int | None, optional
        Maximum number of timestamps returned; None means no limit, by default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. Can be one of ["and", "or"], by default "and".
    output_type : Literal["dict", "DataFrame"], optional
        Shape of the returned data. Can be one of ["dict", "DataFrame"], by default "dict".

    Returns
    -------
    dict[str, dict[str, list[Timestamp]]]
        If output_type is "dict", a dictionary in the format {object_name: {raw_data_name: [timestamp, ...], ...}, ...}.
    DataFrame
        If output_type is "DataFrame", a DataFrame in the format index=MultiIndex[object_name, raw_data_name], columns=timestamp.
    """
    where = self._check_get_args(
        object_names=object_names,
        object_models=object_models,
        raw_data_names=raw_data_names,
        filter_type=filter_type,
    )

    # assembling the query piece by piece
    parts = [
        sql.SQL(
            "SELECT object_name, raw_data_name, timestamp FROM performance.v_raw_data_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
        where,
    ]
    # sort_timestamps is validated by the Literal annotation, so it is safe to splice as SQL
    parts.append(
        sql.SQL(" ORDER BY object_name, raw_data_name, timestamp {sort}").format(
            sort=sql.SQL(sort_timestamps) if sort_timestamps else sql.SQL(""),
        ),
    )
    if limit:
        parts.append(sql.SQL(" LIMIT {limit}").format(limit=sql.Literal(limit)))

    query = sql.Composed(parts)

    # getting the data
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)

    if output_type == "DataFrame":
        return df.set_index(["object_name", "raw_data_name"])

    # dict output: collapse rows into per-(object, raw data) timestamp lists...
    grouped = df.groupby(["object_name", "raw_data_name"])["timestamp"].apply(list).reset_index()
    # ...then fold into {object_name: {raw_data_name: [timestamp, ...]}}
    per_object = grouped.groupby("object_name").apply(
        lambda g: dict(
            zip(g["raw_data_name"], g["timestamp"], strict=False),
        ),
        include_groups=False,
    )
    return per_object.to_dict()

insert(data, value_as_path=False, on_conflict='ignore')

Inserts one or multiple raw data values into the database.

Parameters:

  • data

    (dict[str, dict[str, dict[datetime, dict[str, Any]]]]) –

    Dict containing the data to be inserted. The format is {object_name: {raw_data_name: {timestamp: {"value": value, "metadata": dict | None}, ...}, ...}, ...}.

  • value_as_path

    (bool, default: False ) –

    If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/rawdata_values.py
@validate_call
def insert(
    self,
    data: dict[str, dict[str, dict[datetime, dict[str, Any]]]],
    value_as_path: bool = False,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Insert one or more raw data values into the database.

    The payload is fully validated before any row is written, so a malformed
    entry fails fast without partially inserting.

    Parameters
    ----------
    data : dict[str, dict[str, dict[datetime, dict[str, Any]]]]
        Dict containing the data to be inserted. The format is {object_name: {raw_data_name: {timestamp: {"value": value, "metadata": dict | None}, ...}, ...}, ...}.
    value_as_path : bool, optional
        If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # resolving ids and models for every object referenced in the payload
    payload_objects = list(data.keys())
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=payload_objects)
    obj_models = self._perfdb.objects.instances.get(object_names=payload_objects, output_type="DataFrame")[
        "object_model_name"
    ].to_dict()
    if len(obj_models) != len(data):
        wrong_objs = set(data.keys()) - set(obj_models.keys())
        raise ValueError(f"Object names {wrong_objs} not found in the database")

    raw_data_def = self._perfdb.rawdata.definitions.get(object_models=list(set(obj_models.values())), output_type="dict")

    # validation pass: reject the whole payload before touching the table
    for obj_name, obj_data in data.items():
        model_defs = raw_data_def[obj_models[obj_name]]
        for raw_data_name, per_timestamp in obj_data.items():
            if raw_data_name not in model_defs:
                raise ValueError(
                    f"Raw data name {raw_data_name} not found in the database for object {obj_name} of model {obj_models[obj_name]}",
                )
            for entry in per_timestamp.values():
                if {"value", "metadata"} != set(entry.keys()):
                    raise ValueError(f"Expected keys in timestamp_data to be ['value', 'metadata'], got {list(entry.keys())}")
                if not isinstance(entry["metadata"], dict | type(None)):
                    raise TypeError(f"metadata must be a dict or None, not {type(entry['metadata'])}")
                if value_as_path and not isinstance(entry["value"], Path):
                    raise TypeError(f"if value_as_path is True value must be a Path, not {type(entry['value'])}")

    # conflict action is the same for every row, so build it once
    if on_conflict == "ignore":
        conflict_action = sql.SQL("NOTHING")
    else:
        conflict_action = sql.SQL("UPDATE SET value = EXCLUDED.value, metadata = EXCLUDED.metadata")

    # insert pass: one statement per value
    # NOTE(review): each row reconnects and runs its own statement — presumably deliberate
    # (keeps large binary inserts isolated), but worth confirming for bulk loads
    for obj_name, obj_data in data.items():
        for raw_data_name, per_timestamp in obj_data.items():
            definition = raw_data_def[obj_models[obj_name]][raw_data_name]
            data_type_name = definition["data_type_name"]
            for timestamp, entry in per_timestamp.items():
                value = entry["value"]
                if value_as_path:
                    # the file extension must match the declared data type (both include the dot)
                    if value.suffix.lower() != data_type_name.lower():
                        raise ValueError(f"Expected file to have extension '{data_type_name}', got '{value.suffix}'")
                    value: bytes = value.read_bytes()
                else:
                    value: bytes = convert_to_binary(value, data_type_name)
                metadata = entry["metadata"]

                # value is passed as a bound parameter (%s) rather than a literal
                query = sql.Composed(
                    [
                        sql.SQL(
                            "INSERT INTO performance.raw_data_values (object_id, raw_data_id, timestamp, value, metadata)\n"
                            "VALUES ({object_id}, {raw_data_id}, {timestamp}, %s, {metadata})\n"
                            "ON CONFLICT ON CONSTRAINT raw_data_values_pkey DO\n",
                        ).format(
                            object_id=sql.Literal(obj_ids[obj_name]),
                            raw_data_id=sql.Literal(definition["raw_data_id"]),
                            timestamp=sql.Literal(f"{timestamp:%Y-%m-%d %H:%M:%S}"),
                            metadata=sql.Literal(json.dumps(metadata, default=json_serial)) if metadata else sql.SQL("NULL"),
                        ),
                        conflict_action,
                    ],
                )

                with self._perfdb.conn.reconnect() as conn:
                    conn.execute(query, (value,))

                logger.debug(f"{obj_name} - {raw_data_name} - {timestamp}: value inserted")