KPI Availability Amounts¶
KpiAvailabilityAmounts(perfdb)
¶
Class used for handling availability amounts. Can be accessed via perfdb.kpis.availability.amounts.
Parameters:
-
(perfdb¶PerfDB) –Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialise the handler with the top level PerfDB object.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # kept as a private attribute; the methods of this class reach the database through it
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, object_names=None, availability_categories=None)
¶
Deletes availability amounts from the database.
Parameters:
-
(period¶DateTimeRange) –Period of time to delete the data.
-
(object_names¶list[str], default:None) –List of object names to delete data. If None will delete for all objects. By default None
-
(availability_categories¶list[str], default:None) –List of availability categories to delete data. If None will delete for all categories. By default None
Source code in echo_postgres/kpi_availability_amounts.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
) -> None:
    """Deletes availability amounts from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data. Both ends are inclusive.
    object_names : list[str], optional
        List of object names to delete data for. If None (or empty) will delete for all objects. By default None
    availability_categories : list[str], optional
        List of availability categories to delete data for. If None (or empty) will delete for all categories. By default None

    Raises
    ------
    ValueError
        If any of the requested object names or availability categories cannot be resolved to an id.
    """
    # the date window is always applied; the optional filters below only narrow it further
    query = [
        sql.SQL("DELETE FROM performance.availability_amounts WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # validate by name instead of by count, so a name repeated in the input
        # is not reported as a (nonexistent) missing object
        if missing_objs := set(object_names) - set(obj_ids):
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        # only the ids of the requested names go into the filter (duplicates collapsed)
        wanted_obj_ids = [obj_ids[name] for name in dict.fromkeys(object_names)]
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, wanted_obj_ids))))

    if availability_categories:
        # getting availability category id
        wanted_cats = set(availability_categories)
        all_cat_ids = self._perfdb.kpis.availability.categories.get_ids()
        cat_ids = {name: cat_id for name, cat_id in all_cat_ids.items() if name in wanted_cats}
        # same by-name validation as for the objects above
        if missing_cats := wanted_cats - set(cat_ids):
            raise ValueError(f"Could not find the following availability categories: {missing_cats}")
        query.append(
            sql.SQL(" AND availability_category_id IN ({ids})").format(
                ids=sql.SQL(", ").join(map(sql.Literal, cat_ids.values())),
            ),
        )

    query = sql.Composed(query)

    # deleting data
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from availability_amounts table")
get(period, object_names=None, availability_categories=None, filter_type='and', output_type='DataFrame')
¶
Gets all availability amounts with detailed information.
The most useful keys/columns returned are:
- time_amount
- energy_amount
Parameters:
-
(period¶DateTimeRange) –Period of time to get the data for.
-
(object_names¶list[str], default:None) –List of object names to get the data for. By default None
-
(availability_categories¶list[str], default:None) –List of availability categories to get the data for. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_category_name], columns = [time_amount, energy_amount, ...]
-
dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]–In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_category_name: {attribute: value, ...}, ...}, ...}, ...}
Source code in echo_postgres/kpi_availability_amounts.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_categories: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]:
    """Gets all availability amounts with detailed information.

    The most useful keys/columns returned are:

    - time_amount
    - energy_amount

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for. Both ends are inclusive.
    object_names : list[str], optional
        List of object names to get the data for. By default None
    availability_categories : list[str], optional
        List of availability categories to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to combine the object and category filters with each other. The period filter is always applied.
        Can be one of ["and", "or"].
        By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, date, availability_category_name], columns = [time_amount, energy_amount, ...]
    dict[str, dict[Timestamp, dict[str, dict[str, Any]]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {date: {availability_category_name: {attribute: value, ...}, ...}, ...}, ...}
        Columns whose name ends in "_id" are not included in this format.
    """
    # build the query
    query = [
        sql.SQL("SELECT * FROM performance.v_availability_amounts WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    # optional filters; they are joined with filter_type and then ANDed with the period
    where = []
    if object_names:
        where.append(sql.SQL("object_name IN ({names})").format(names=sql.SQL(", ").join(map(sql.Literal, object_names))))
    if availability_categories:
        where.append(
            sql.SQL("availability_category_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, availability_categories)),
            ),
        )
    if where:
        query.append(sql.SQL(" AND ("))
        # filter_type is restricted to "and"/"or" by its Literal annotation, so it is safe to embed
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))

    query.append(sql.SQL(" ORDER BY object_name, date, availability_category_name"))
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    df = df.set_index(["object_name", "date", "availability_category_name"])

    if output_type == "DataFrame":
        return df

    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])

    # converting the flat {(object, date, category): row} mapping into nested dicts
    final_result = {}
    for (object_name, date, availability_category_name), data in df.to_dict(orient="index").items():
        final_result.setdefault(object_name, {}).setdefault(date, {})[availability_category_name] = data

    return final_result
insert(df, on_conflict='ignore')
¶
Inserts availability amounts into the database.
Parameters:
-
(df¶DataFrame) –DataFrame with the availability amounts. Must have the following columns:
- object_name
- availability_category_name
- date
- time_amount
- energy_amount
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/kpi_availability_amounts.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts availability amounts into the database.

    Parameters
    ----------
    df : DataFrame
        DataFrame with the availability amounts. Must have exactly the following columns and no NaN values:

        - object_name
        - availability_category_name
        - date
        - time_amount
        - energy_amount
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df contains NaN values, does not have exactly the required columns, or references
        object names / availability categories that cannot be resolved to an id.
    """
    required_cols = {"object_name", "availability_category_name", "date", "time_amount", "energy_amount"}

    # checking inputs
    if df.isna().any().any():
        raise ValueError("df cannot have any NaN values")
    present_cols = set(df.columns)
    if present_cols != required_cols:
        additional_cols = present_cols - required_cols
        missing_cols = required_cols - present_cols
        raise ValueError(
            f"df must have the following columns: object_name, availability_category_name, date, time_amount, energy_amount. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )

    # working on a copy so the caller's DataFrame is not modified by the id columns added below
    # (no dropna needed: NaN values were already rejected above)
    df = df.copy()

    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    # validate by name so the error is raised exactly when a name is really missing
    if missing_objs := set(wanted_objs) - set(obj_ids):
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)

    # getting availability category id
    wanted_cats = set(df["availability_category_name"].unique().tolist())
    all_cat_ids = self._perfdb.kpis.availability.categories.get_ids()
    cat_ids = {name: cat_id for name, cat_id in all_cat_ids.items() if name in wanted_cats}
    if missing_cats := wanted_cats - set(cat_ids):
        raise ValueError(f"Could not find the following availability categories: {missing_cats}")
    df["availability_category_id"] = df["availability_category_name"].map(cat_ids)

    # removing unwanted columns (the table is keyed by ids, not names)
    df = df.drop(columns=["object_name", "availability_category_name"])

    # translating the public on_conflict option into the if_exists value passed to pandas_to_sql
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }

    # inserting data
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="availability_amounts",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug("Availability amounts inserted into the database")
sync_bazefield(period, object_names=None, availability_type='all', overwrite=False)
¶
Method to get availability numbers from Bazefield and insert them into the database.
This will save the results in the table "availability_amounts" of performance_db.
Parameters:
-
(period¶DateTimeRange) –Period to get availability numbers from Bazefield. The period is expanded to whole days (start rounded down, end rounded up). It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
-
(object_names¶list[str] | None, default:None) –Name of the objects to get the availability from. If set to None will get all that match the object types allowed in ALLOWED_AVAILABILITY_OBJECT_TYPES. By default None
-
(availability_type¶Literal['time', 'energy', 'all'], default:'all') –Type of availability to calculate, by default "all"
-
(overwrite¶bool, default:False) –If set to True, will overwrite the existing values in the database, by default False
Returns:
-
DataFrame–DataFrame with the availability numbers.
-
DataFrame–DataFrame with the time and / or energy amounts.
-
dict[str, list[DateTimeRange] | str]–Dict containing the SPEs that failed to calculate the amounts and the periods that failed.
The key is the SPE name and the value is a list of periods that failed to calculate the amounts or a string with the error message.
Source code in echo_postgres/kpi_availability_amounts.py
@validate_call
def sync_bazefield(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    availability_type: Literal["time", "energy", "all"] = "all",
    overwrite: bool = False,
) -> tuple[DataFrame, DataFrame, dict[str, list[DateTimeRange] | str]]:
    """Fetches availability numbers from Bazefield and stores the amounts in the database.

    The amounts are written to the table "availability_amounts" of performance_db through ``insert``.

    Parameters
    ----------
    period : DateTimeRange
        Period to get availability numbers from Bazefield. A copy of it is rounded with a one day step,
        flooring the start and ceiling the end.
        It's recommended that the start is at 00:00:00 and the end is at 23:59:59.
    object_names : list[str] | None, optional
        Name of the objects to get the availability from. If set to None will get all that match the object types allowed in ALLOWED_AVAILABILITY_OBJECT_TYPES.
        By default None
    availability_type : Literal["time", "energy", "all"], optional
        Type of availability to calculate, by default "all"
    overwrite : bool, optional
        If set to True, will overwrite the existing values in the database, by default False

    Returns
    -------
    DataFrame
        DataFrame with the availability numbers.
    DataFrame
        DataFrame with the time and / or energy amounts, as returned by Bazefield (before reshaping for upload).
    dict[str, list[DateTimeRange] | str]
        Dict containing the SPEs that failed to calculate the amounts and the periods that failed.
        The key is the SPE name and the value is a list of periods that failed to calculate the amounts or a string with the error message.

    Raises
    ------
    ValueError
        If any name in object_names is not an allowed object, or if negative time / energy amounts are found.
    """
    started_at = perf_counter()

    # working on a copy of the period, widened with a one day step
    period = period.copy().round(timedelta(days=1), start="floor", end="ceil")

    # only objects of the allowed types can have availability values
    allowed_objects = self._perfdb.objects.instances.get(object_types=ALLOWED_AVAILABILITY_OBJECT_TYPES, output_type="DataFrame")

    if object_names is None:
        # no explicit selection: use every allowed object
        object_names = allowed_objects.index.tolist()
    else:
        wrong_names = list(set(object_names) - set(allowed_objects.index))
        if wrong_names:
            raise ValueError(f"Invalid object names: {wrong_names}")

    # getting availability data from Bazefield
    logger.info(f"Getting availability data from Bazefield for period {period} and objects {object_names}")
    baze = Baze()
    availability, amounts, failed_spes = baze.kpis.availability.get(
        object_names=object_names,
        period=period,
        subperiod_size=timedelta(days=1),
        availability_type=availability_type,
    )

    # untouched copy that is handed back to the caller
    original_amounts = amounts.copy()

    # reshaping for upload:
    # 1. drop the "Period" row from the index
    # 2. melt so the column levels become rows
    # 3. pivot so "Time", "Object" and "Category" are the index and "Type" (Time/Energy) the columns
    to_upload = (
        amounts[amounts.index != "Period"]
        .melt(ignore_index=False)
        .reset_index(drop=False)
        .pivot_table(index=["Time", "Object", "Category"], columns=["Type"], values="value")
    )

    # the "Total" pseudo-object is not uploaded
    to_upload = to_upload[to_upload.index.get_level_values("Object") != "Total"]

    # the amount columns must be renamed before the index is reset, because the index
    # level "Time" would otherwise collide with the "Time" amount column
    to_upload = (
        to_upload.rename(columns={"Energy": "energy_amount", "Time": "time_amount"})
        .reset_index(drop=False)
        .rename(columns={"Time": "date", "Object": "object_name", "Category": "availability_category_name"})
    )

    # checking if time and energy amounts are positive
    # NOTE(review): both columns are accessed unconditionally; presumably one of them is absent
    # when availability_type is "time" or "energy" - confirm against Baze output
    negative_checks = (
        ("time_amount", "Negative time amounts found in the data"),
        ("energy_amount", "Negative energy amounts found in the data"),
    )
    for column, message in negative_checks:
        if (to_upload[column] < 0).any():
            raise ValueError(message)

    # inserting availability data into the database
    logger.info("Inserting availability data into the database")
    conflict_mode = "update" if overwrite else "ignore"
    self.insert(df=to_upload, on_conflict=conflict_mode)

    logger.info(
        f"Availability amounts inserted into the database in {perf_counter() - started_at:.2f} seconds. Period {period} and objects {object_names}",
    )

    del baze

    return availability, original_amounts, failed_spes