Raw Data Values¶
RawDataValues(perfdb)
¶
Class used for handling raw data values. Can be accessed via perfdb.rawdata.values.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # store the top-level handler; subclasses use it to reach the shared connection
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, object_names=None, raw_data_names=None)
¶
Delete raw data values based on the specified criteria.
Parameters:
- period (DateTimeRange) – The time period within which the raw data values should be deleted.
- object_names (list[str] | None, default: None) – A list of object names. Only raw data values associated with these object names will be deleted. If None, all object names will be considered.
- raw_data_names (list[str] | None, default: None) – A list of raw data names. Only raw data values with these names will be deleted. If None, all raw data names will be considered.
Source code in echo_postgres/rawdata_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    raw_data_names: list[str] | None = None,
) -> None:
    """
    Delete raw data values based on the specified criteria.

    Parameters
    ----------
    period : DateTimeRange
        The time period within which the raw data values should be deleted.
    object_names : list[str] | None, optional
        A list of object names. Only raw data values associated with these object names will be deleted.
        If None, all object names will be considered.
    raw_data_names : list[str] | None, optional
        A list of raw data names. Only raw data values with these names will be deleted.
        If None, all raw data names will be considered.

    Raises
    ------
    ValueError
        If any of the given object_names does not exist in the database.
    """
    # resolving object names to ids; models are kept to scope the raw data name lookup below
    if object_names:
        obj_models = self._perfdb.objects.instances.get(object_names=object_names, output_type="DataFrame")[
            "object_model_name"
        ].to_dict()
        object_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(object_ids) != len(object_names):
            wrong_objs = set(object_names) - set(object_ids.keys())
            raise ValueError(f"Object names {wrong_objs} not found in the database")
        # converting to list
        object_ids = list(object_ids.values())
    else:
        object_ids = None
    # raw_data_names
    if raw_data_names:
        # mapping is {object_model: {raw_data_name: raw_data_id}}
        raw_data_id_map: dict[str, dict[str, int]] = self._perfdb.rawdata.definitions.get_ids(
            raw_data_names=raw_data_names,
            object_models=list(set(obj_models.values())) if object_names else None,
        )
        # flattening the nested mapping to a plain list of ids
        raw_data_ids = [x for v in raw_data_id_map.values() for x in v.values()]
    else:
        raw_data_ids = None
    # building query
    query = [
        sql.SQL(
            "DELETE FROM performance.raw_data_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_ids:
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, object_ids))))
    if raw_data_ids:
        query.append(sql.SQL(" AND raw_data_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, raw_data_ids))))
    query = sql.Composed(query)
    # executing the query
    with self._perfdb.conn.reconnect() as conn:
        # deleting (query is already a Composed; wrapping it again was redundant)
        result = conn.execute(query)
        logger.debug(f"Deleted {result.rowcount} rows from performance.raw_data_values")
get(period, object_names=None, object_models=None, raw_data_names=None, filter_type='and', save_dir=None, output_type='DataFrame')
¶
Gets the raw data values for a given period and set of filters.
Parameters:
-
(period¶DateTimeRange | list[date]) –Can be a DateTimeRange or a list of dates.
- If DateTimeRange, will get the data for the entire range (limiting on start and end)
- If list of dates, will get the data for each date in the list.
-
(object_names¶list[str] | None, default:None) –List of object names to filter the results, by default None.
-
(object_models¶list[str] | None, default:None) –List of object model names to filter the results, by default None.
-
(raw_data_names¶list[str] | None, default:None) –List of raw data value names to filter the results, by default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"], by default "and".
-
save_dir (Path | None, default: None) – If set to a directory, will try to save all the data as files in it. The files will have a name like {object_name}_{raw_data_name}_{YYYYmmddHHMMSS}.{extension}. Instead of the converted value, the DataFrame or dictionary will contain the path to the file. If set to None, no files will be saved but the data will be converted from binary and returned directly. By default None.
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –The format of the returned data. Can be one of ["dict", "DataFrame"], by default "DataFrame".
Returns:
-
DataFrame–DataFrame in the format index=MultiIndex[object_name, raw_data_name, timestamp], columns=[value, metadata, ...]
-
dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]–A dictionary in the format {object_name: {raw_data_name: {timestamp: {value: value, metadata: metadata, ...}, ...}, ...}, ...}
Source code in echo_postgres/rawdata_values.py
@validate_call
def get(
    self,
    period: DateTimeRange | list[date],
    object_names: list[str] | None = None,
    object_models: list[str] | None = None,
    raw_data_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    save_dir: Path | None = None,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]:
    """Retrieve raw data values matching the given period and filters.

    Parameters
    ----------
    period : DateTimeRange | list[date]
        Either a DateTimeRange (everything between start and end is fetched)
        or a list of dates (each listed day is fetched).
    object_names : list[str] | None, optional
        Restrict results to these object names, by default None.
    object_models : list[str] | None, optional
        Restrict results to these object model names, by default None.
    raw_data_names : list[str] | None, optional
        Restrict results to these raw data value names, by default None.
    filter_type : Literal["and", "or"], optional
        Whether multiple filters are combined with AND or OR, by default "and".
    save_dir : Path | None, optional
        When set to a directory, each value is written there as a file named
        {object_name}_{raw_data_name}_{YYYYmmddHHMMSS}.{extension} and the returned
        value holds the file path instead of the converted data.
        When None, values are converted from binary and returned directly.
        By default None.
    output_type : Literal["dict", "DataFrame"], optional
        Output format. Can be one of ["dict", "DataFrame"], by default "DataFrame".

    Returns
    -------
    DataFrame
        DataFrame in the format index=MultiIndex[object_name, raw_data_name, timestamp], columns=[value, metadata, ...]
    dict[str, dict[str, dict[Timestamp, dict[str, Any]]]]
        A dictionary in the format {object_name: {raw_data_name: {timestamp: {value: value, metadata: metadata, ...}, ...}, ...}, ...}
    """
    where = self._check_get_args(
        object_names=object_names,
        object_models=object_models,
        raw_data_names=raw_data_names,
        filter_type=filter_type,
    )
    # period filter: a continuous range or a discrete set of days
    if isinstance(period, DateTimeRange):
        period_clause = sql.SQL(" (timestamp >= {period_start} AND timestamp <= {period_end}) ").format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        )
    else:
        period_clause = sql.SQL(" (timestamp::date IN ({dates})) ").format(
            dates=sql.SQL(", ").join(sql.Literal(x.strftime("%Y-%m-%d")) for x in period),
        )
    # assembling the full statement
    query = sql.Composed(
        [
            sql.SQL(
                "SELECT object_name, raw_data_name, timestamp, value, metadata, data_type_name FROM performance.v_raw_data_values WHERE ",
            ),
            period_clause,
            where,
            sql.SQL(" ORDER BY object_name, raw_data_name, timestamp"),
        ],
    )
    # fetching the data
    with self._perfdb.conn.reconnect() as conn:
        frame = conn.read_to_pandas(query, post_convert="pyarrow")
    if frame.empty:
        if output_type == "DataFrame":
            return frame.set_index(["object_name", "raw_data_name", "timestamp"])
        return {}
    # file_path column is always created (convert_from_binary needs it even when not saving)
    if save_dir:
        frame["file_path"] = frame.apply(
            lambda row: save_dir
            / f"{row['object_name']}_{row['raw_data_name']}_{row['timestamp']:%Y%m%d%H%M%S}{row['data_type_name']}",
            axis=1,
        )
    else:
        frame["file_path"] = None
    # converting the binary values
    frame["value"] = frame.apply(
        lambda row: convert_from_binary(row["value"], row["data_type_name"], row["file_path"]),
        axis=1,
    )
    # when saving, the returned value is the path of the written file
    if save_dir:
        frame["value"] = frame["file_path"]
    frame = frame.drop(columns=["file_path"]).set_index(["object_name", "raw_data_name", "timestamp"])
    if output_type == "DataFrame":
        return frame
    # nesting {(obj, raw, ts): row} into {obj: {raw: {ts: row}}}
    nested: dict[str, dict[str, dict[Timestamp, dict[str, Any]]]] = {}
    for (obj_name, raw_data_name, timestamp), row_data in frame.to_dict(orient="index").items():
        nested.setdefault(obj_name, {}).setdefault(raw_data_name, {})[timestamp] = row_data
    return nested
get_timestamps(period, object_names=None, object_models=None, raw_data_names=None, sort_timestamps=None, limit=None, filter_type='and', output_type='dict')
¶
Gets the timestamps of raw data entries for a given period and set of filters.
Parameters:
-
(period¶DateTimeRange) –The time range for which to get the timestamps.
-
(object_names¶list[str] | None, default:None) –List of object names to filter the results, by default None.
-
(object_models¶list[str] | None, default:None) –List of object model names to filter the results, by default None.
-
(raw_data_names¶list[str] | None, default:None) –List of raw data value names to filter the results, by default None.
-
(sort_timestamps¶None | Literal['ASC', 'DESC'], default:None) –If set, will sort the timestamps in the specified order. Can be one of ["ASC", "DESC"], by default None.
-
(limit¶int | None, default:None) –The maximum number of timestamps to return. If None, all timestamps will be returned, by default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"], by default "and".
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –The format of the returned data. Can be one of ["dict", "DataFrame"], by default "dict".
Returns:
-
dict[str, dict[str, list[Timestamp]]]–If output_type is "dict", a dictionary in the format {object_name: {raw_data_name: [timestamp, ...], ...}, ...}.
-
DataFrame–If output_type is "DataFrame", a DataFrame in the format index=MultiIndex[object_name, raw_data_name], columns=timestamp.
Source code in echo_postgres/rawdata_values.py
@validate_call
def get_timestamps(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    object_models: list[str] | None = None,
    raw_data_names: list[str] | None = None,
    sort_timestamps: None | Literal["ASC", "DESC"] = None,
    limit: Annotated[int | None, Field(gt=0)] = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, list[Timestamp]]] | DataFrame:
    """Retrieve the timestamps of raw data entries for a period and set of filters.

    Parameters
    ----------
    period : DateTimeRange
        The time range for which to get the timestamps.
    object_names : list[str] | None, optional
        Restrict results to these object names, by default None.
    object_models : list[str] | None, optional
        Restrict results to these object model names, by default None.
    raw_data_names : list[str] | None, optional
        Restrict results to these raw data value names, by default None.
    sort_timestamps : None | Literal["ASC", "DESC"], optional
        When set, timestamps are sorted in this order. Can be one of ["ASC", "DESC"], by default None.
    limit : int | None, optional
        Maximum number of timestamps to return; None returns all, by default None.
    filter_type : Literal["and", "or"], optional
        Whether multiple filters are combined with AND or OR, by default "and".
    output_type : Literal["dict", "DataFrame"], optional
        Output format. Can be one of ["dict", "DataFrame"], by default "dict".

    Returns
    -------
    dict[str, dict[str, list[Timestamp]]]
        If output_type is "dict", a dictionary in the format {object_name: {raw_data_name: [timestamp, ...], ...}, ...}.
    DataFrame
        If output_type is "DataFrame", a DataFrame in the format index=MultiIndex[object_name, raw_data_name], columns=timestamp.
    """
    where = self._check_get_args(
        object_names=object_names,
        object_models=object_models,
        raw_data_names=raw_data_names,
        filter_type=filter_type,
    )
    # assembling the statement from named clauses
    base_clause = sql.SQL(
        "SELECT object_name, raw_data_name, timestamp FROM performance.v_raw_data_values WHERE (timestamp >= {period_start} AND timestamp <= {period_end})",
    ).format(
        period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
        period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
    )
    # sort_timestamps is constrained to "ASC"/"DESC" by validate_call, so it is safe to inline
    order_clause = sql.SQL(" ORDER BY object_name, raw_data_name, timestamp {sort}").format(
        sort=sql.SQL(sort_timestamps) if sort_timestamps else sql.SQL(""),
    )
    limit_clause = sql.SQL(" LIMIT {limit}").format(limit=sql.Literal(limit)) if limit else sql.SQL("")
    query = sql.Composed([base_clause, where, order_clause, limit_clause])
    # fetching the data
    with self._perfdb.conn.reconnect() as conn:
        frame = conn.read_to_pandas(query)
    if output_type == "DataFrame":
        return frame.set_index(["object_name", "raw_data_name"])
    # collapsing rows into per-(object, raw data) timestamp lists
    grouped = frame.groupby(["object_name", "raw_data_name"])["timestamp"].apply(list).reset_index()
    return (
        grouped.groupby("object_name")
        .apply(
            lambda g: dict(
                zip(g["raw_data_name"], g["timestamp"], strict=False),
            ),
            include_groups=False,
        )
        .to_dict()
    )
insert(data, value_as_path=False, on_conflict='ignore')
¶
Inserts one or multiple raw data values into the database.
Parameters:
-
(data¶dict[str, dict[str, dict[datetime, dict[str, Any]]]]) –Dict containing the data to be inserted. The format is {object_name: {raw_data_name: {timestamp: {"value": value, "metadata": dict | None}, ...}, ...}, ...}.
-
(value_as_path¶bool, default:False) –If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/rawdata_values.py
@validate_call
def insert(
    self,
    data: dict[str, dict[str, dict[datetime, dict[str, Any]]]],
    value_as_path: bool = False,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts one or multiple raw data values into the database.

    Parameters
    ----------
    data : dict[str, dict[str, dict[datetime, dict[str, Any]]]]
        Dict containing the data to be inserted. The format is {object_name: {raw_data_name: {timestamp: {"value": value, "metadata": dict | None}, ...}, ...}, ...}.
    value_as_path : bool, optional
        If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If an object or raw data name is not found in the database, if a
        timestamp entry does not contain exactly the keys {"value", "metadata"},
        or if a file's extension does not match the expected data type.
    TypeError
        If metadata is neither a dict nor None, or if value_as_path is True
        but a value is not a Path.
    """
    # resolving object ids and models for every object name in the payload
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=list(data.keys()))
    obj_models = self._perfdb.objects.instances.get(object_names=list(data.keys()), output_type="DataFrame")[
        "object_model_name"
    ].to_dict()
    if len(obj_models) != len(data):
        wrong_objs = set(data.keys()) - set(obj_models.keys())
        raise ValueError(f"Object names {wrong_objs} not found in the database")
    # raw data definitions for the models involved (provides raw_data_id and data_type_name)
    raw_data_def = self._perfdb.rawdata.definitions.get(object_models=list(set(obj_models.values())), output_type="dict")
    # validation pass over the whole payload: fail before writing anything
    for obj_name, obj_data in data.items():
        for raw_data_name, raw_data_data in obj_data.items():
            if raw_data_name not in raw_data_def[obj_models[obj_name]]:
                raise ValueError(
                    f"Raw data name {raw_data_name} not found in the database for object {obj_name} of model {obj_models[obj_name]}",
                )
            for timestamp_data in raw_data_data.values():
                if {"value", "metadata"} != set(timestamp_data.keys()):
                    raise ValueError(f"Expected keys in timestamp_data to be ['value', 'metadata'], got {list(timestamp_data.keys())}")
                if not isinstance(timestamp_data["metadata"], dict | type(None)):
                    raise TypeError(f"metadata must be a dict or None, not {type(timestamp_data['metadata'])}")
                if value_as_path and not isinstance(timestamp_data["value"], Path):
                    raise TypeError(f"if value_as_path is True value must be a Path, not {type(timestamp_data['value'])}")
    # inserting the data
    for obj_name, obj_data in data.items():
        for raw_data_name, raw_data_data in obj_data.items():
            data_type_name = raw_data_def[obj_models[obj_name]][raw_data_name]["data_type_name"]
            for timestamp, timestamp_data in raw_data_data.items():
                value = timestamp_data["value"]
                # reading from file
                if value_as_path:
                    # validating extension
                    # NOTE(review): data_type_name is compared against Path.suffix, so it
                    # appears to include the leading dot — confirm against the definitions table
                    if value.suffix.lower() != data_type_name.lower():
                        raise ValueError(f"Expected file to have extension '{data_type_name}', got '{value.suffix}'")
                    with value.open(mode="rb") as f:
                        value: bytes = f.read()
                # direct conversion
                else:
                    value: bytes = convert_to_binary(value, data_type_name)
                metadata = timestamp_data["metadata"]
                # inserting
                # the binary value goes through a %s query parameter; all other fields are inlined as literals
                query = [
                    sql.SQL(
                        "INSERT INTO performance.raw_data_values (object_id, raw_data_id, timestamp, value, metadata)\n"
                        "VALUES ({object_id}, {raw_data_id}, {timestamp}, %s, {metadata})\n"
                        "ON CONFLICT ON CONSTRAINT raw_data_values_pkey DO\n",
                    ).format(
                        object_id=sql.Literal(obj_ids[obj_name]),
                        raw_data_id=sql.Literal(raw_data_def[obj_models[obj_name]][raw_data_name]["raw_data_id"]),
                        timestamp=sql.Literal(f"{timestamp:%Y-%m-%d %H:%M:%S}"),
                        metadata=sql.Literal(json.dumps(metadata, default=json_serial)) if metadata else sql.SQL("NULL"),
                    ),
                ]
                # completing the ON CONFLICT clause according to the requested policy
                if on_conflict == "ignore":
                    query.append(sql.SQL("NOTHING"))
                else:
                    query.append(sql.SQL("UPDATE SET value = EXCLUDED.value, metadata = EXCLUDED.metadata"))
                query = sql.Composed(query)
                # one connection/execute per value; rows are inserted individually
                with self._perfdb.conn.reconnect() as conn:
                    conn.execute(
                        query,
                        (value,),
                    )
                logger.debug(f"{obj_name} - {raw_data_name} - {timestamp}: value inserted")