CCEE TUST

CceeTust(perfdb)
Class used for handling TUST values. Can be accessed via perfdb.ccee.tust.
Parameters:

- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py

def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    self._perfdb: e_pg.PerfDB = perfdb
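
A minimal access sketch is shown below. The PerfDB constructor arguments are an assumption here (connection setup is project-specific); only the perfdb.ccee.tust accessor path comes from the description above.

import echo_postgres as e_pg

# hypothetical setup: constructor arguments depend on your connection settings
perfdb = e_pg.PerfDB()

# the TUST handler is reached through the accessor documented above
tust = perfdb.ccee.tust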
delete(period, object_names=None)

Deletes TUST values from the database.

Parameters:

- period (DateTimeRange) – Period of time to delete the data for.
- object_names (list[str], default: None) – List of object names to delete the data for. By default None.
Source code in echo_postgres/ccee_tust.py

@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
) -> None:
    """Deletes TUST values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. By default None
    """
    # build the query
    query = [
        sql.SQL("DELETE FROM performance.tust_values WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))

    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)
        logger.debug(f"Deleted {result.rowcount} rows from tust_values table")
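
Usage sketch for delete. The DateTimeRange import path and constructor arguments, as well as the object names, are assumptions for illustration; only the delete signature itself comes from the source above.

from datetime import datetime

from echo_postgres.date_time_range import DateTimeRange  # hypothetical import path

# period covering all of 2024 (this DateTimeRange construction is assumed)
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 12, 31, 23, 59, 59))

# delete TUST values for two objects only; omitting object_names would delete
# the whole period for every object
perfdb.ccee.tust.delete(period=period, object_names=["WF-Alpha", "WF-Beta"])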
get(period, object_names=None, filter_type='and', output_type='DataFrame', values_only=False)

Gets TUST values for the desired period and objects.

The most useful keys/columns returned are:

- value

Parameters:

- period (DateTimeRange) – Period of time to get the data for.
- object_names (list[str], default: None) – List of object names to get the data for. By default None (meaning all objects).
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".
- values_only (bool, default: False) – If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Ignored when output_type is "DataFrame". By default False.

Returns:

- DataFrame – In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["object_name", "date"], columns = [value, modified_date].
- dict[str, dict[Timestamp, dict[str, dict[str, Any]]]] – In case output_type is "dict", returns a dictionary in the format {object_name: {date: {attribute: value, ...}, ...}, ...} (or {object_name: {date: value, ...}, ...} when values_only is True).
Source code in echo_postgres/ccee_tust.py

@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> DataFrame | dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]:
    """Gets TUST values for the desired period and objects.

    The most useful keys/columns returned are:

    - value

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    object_names : list[str], optional
        List of object names to get the data for. By default None (meaning all objects).
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"
    values_only : bool, optional
        If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["object_name", "date"], columns = [value, modified_date]
    dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {date: {attribute: value, ...}, ...}, ...}
    """
    # Adjust period to first day of the month for start and end dates
    adjusted_start = period.start.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    adjusted_end = period.end.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # build the query
    query = [
        sql.SQL(
            "SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            table=sql.Identifier("v_tust_values"),
            start=sql.Literal(f"{adjusted_start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{adjusted_end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_names:
        where.append(
            sql.SQL("object_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_names)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_name, date"))
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing object_name to be a string
    df = df.astype(
        {"object_name": "string[pyarrow]"},
    )
    df = df.astype(
        {"object_id": "int64[pyarrow]"},
    )

    df = df.set_index(["object_name", "date"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])

    # converting to dict
    result = df.to_dict(orient="index")
    final_result = {}
    for (object_name, date), data in result.items():
        if object_name not in final_result:
            final_result[object_name] = {}
        if date not in final_result[object_name]:
            final_result[object_name][date] = data["value"] if values_only else data

    return final_result
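
Usage sketch for get, reusing the period built in the delete example above (object names and values are illustrative assumptions):

# DataFrame output (the default): MultiIndex (object_name, date) with value and
# modified_date among the columns
df = perfdb.ccee.tust.get(period=period, object_names=["WF-Alpha"])

# dict output with values_only=True collapses each date entry to the bare value,
# e.g. {"WF-Alpha": {Timestamp("2024-01-01"): 12.34, ...}}
values = perfdb.ccee.tust.get(
    period=period,
    object_names=["WF-Alpha"],
    output_type="dict",
    values_only=True,
)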
insert(df, on_conflict='ignore')

Inserts TUST values into the database (table tust_values).

Parameters:

- df (DataFrame) – DataFrame with the following columns:
    - object_name
    - date (must be a date referring to the beginning of the month)
    - value
- on_conflict (Literal['ignore', 'update'], default: 'ignore') – What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Source code in echo_postgres/ccee_tust.py

@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts TUST values into the database (table tust_values)

    Parameters
    ----------
    df : DataFrame
        DataFrame with the following columns:

        - object_name
        - date (must be a date referring to the beginning of the month)
        - value
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checking inputs
    if df.isna().any().any():
        raise ValueError("df cannot have NaN values")
    if set(df.columns) != {"object_name", "date", "value"}:
        additional_cols = set(df.columns) - {"object_name", "date", "value"}
        missing_cols = {"object_name", "date", "value"} - set(df.columns)
        raise ValueError(
            f"df must have the following columns: object_name, date, value. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # Only let date be a value with day 1 of the month
    if not all(df["date"].dt.day == 1):
        raise ValueError("The TUST value is monthly, so enter a date that represents the entire month (day 01)")

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # removing unwanted columns
    df = df.drop(columns=["object_name"])

    # converting value column to float
    df["value"] = df["value"].astype("float32")

    # inserting data
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="tust_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("TUST values inserted into the database")
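
Usage sketch for insert (object names and values are illustrative). Dates must fall on day 1 of each month, as enforced by the validation above.

import pandas as pd

# monthly TUST values; every date is the first day of its month
df = pd.DataFrame(
    {
        "object_name": ["WF-Alpha", "WF-Alpha"],
        "date": pd.to_datetime(["2024-01-01", "2024-02-01"]),
        "value": [10.5, 11.2],
    }
)

# "update" overwrites conflicting rows; "ignore" (the default) keeps existing ones
perfdb.ccee.tust.insert(df, on_conflict="update")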