KPI Resource Values¶
KpiResourceValues(perfdb)
¶
Class used for handling resource KPI values. Can be accessed via perfdb.kpis.resource.values.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base initializer shared by every perfdb sub-handler.

    Parameters
    ----------
    perfdb : PerfDB
        Top-level object carrying all functionality and the connection handler;
        kept on the instance so sub-handlers can reach the shared connection.
    """
    # store the root handle for later database access
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, object_names=None, resource_types=None)
¶
Deletes resource values from the database.
Parameters:
- period (DateTimeRange) – Period of time to delete the data for.
- object_names (list[str], default: None) – List of object names to delete the data for. By default None.
- resource_types (list[str], default: None) – List of resource types to delete the data for. By default None.
Source code in echo_postgres/kpi_resource_values.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    resource_types: list[str] | None = None,
) -> None:
    """Deletes resource values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. By default None
    resource_types : list[str], optional
        List of resource types to delete the data for. By default None
    """
    # resolve resource type ids up front so an unknown name fails before touching the table
    if resource_types:
        rs_ids = self._perfdb.kpis.resource.types.get_ids()
        if wrong_rst := set(resource_types) - set(rs_ids):
            raise ValueError(f"Could not find the following resource types: {wrong_rst}")
    # base statement: restrict to the requested period (inclusive on both ends)
    parts = [
        sql.SQL("DELETE FROM performance.resource_values WHERE (date >= {start} AND date <= {end})").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names:
        # translate object names to ids, failing if any name is unknown
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        parts.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))
    if resource_types:
        # narrow the id map to the requested resource types only
        rs_ids = {rt: rs_ids[rt] for rt in resource_types}
        parts.append(sql.SQL(" AND resource_type_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, rs_ids.values()))))
    statement = sql.Composed(parts)
    with self._perfdb.conn.reconnect() as conn:
        # run the delete and report how many rows were affected
        result = conn.execute(statement)
        logger.debug(f"Deleted {result.rowcount} rows from resource_values table")
get(period, time_res='daily', aggregation_window=None, object_or_group_names=None, object_group_types=None, resource_types=None, filter_type='and', output_type='DataFrame', values_only=False)
¶
Gets resource values for the desired period and objects.
The most useful keys/columns returned are:
- value
Parameters:
- period (DateTimeRange) – Period of time to get the data for.
- time_res (Literal['daily', 'monthly', 'quarterly', 'yearly'], default: 'daily') – Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily".
- aggregation_window (Literal['mtd', 'ytd', '12m'] | None, default: None) – Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None.
- object_or_group_names (list[str], default: None) – List of object or group names to get the data for. By default None.
- object_group_types (list[str], default: None) – List of object group types to get the data for. By default None.
- resource_types (list[str], default: None) – List of resource types to get the data for. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".
- values_only (bool, default: False) – If set to True, when returning a dict will only return the values, ignoring other attributes like modified_date. Ignored when output_type is "DataFrame". By default False.
Returns:
- DataFrame – In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex["group_type_name", "object_or_group_name", "resource_type_name", "date"], columns = [resource, modified_date].
- dict – In case output_type is "dict", returns a dictionary in the format {group_type_name: {object_or_group_name: {resource_type_name: {date: {attribute: value, ...}, ...}, ...}, ...}.
Source code in echo_postgres/kpi_resource_values.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    time_res: Literal["daily", "monthly", "quarterly", "yearly"] = "daily",
    aggregation_window: Literal["mtd", "ytd", "12m"] | None = None,
    object_or_group_names: list[str] | None = None,
    object_group_types: list[str] | None = None,
    resource_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
    values_only: bool = False,
) -> DataFrame | dict[str, dict[str, dict[str, dict[Timestamp, Any]]]]:
    """Gets resource values for the desired period and objects.

    The most useful keys/columns returned are:
    - value

    Parameters
    ----------
    period : DateTimeRange
        Period of time to get the data for.
    time_res : Literal["daily", "monthly", "quarterly", "yearly"], optional
        Time resolution of the data. Can be one of ["daily", "monthly", "quarterly", "yearly"], by default "daily"
    aggregation_window : Literal["mtd", "ytd", "12m"] | None, optional
        Aggregation window to use. Can be one of ["mtd", "ytd", "12m"], by default None
    object_or_group_names : list[str], optional
        List of object or group names to get the data for. By default None
    object_group_types : list[str], optional
        List of object group types to get the data for. By default None
    resource_types : list[str], optional
        List of resource types to get the data for. By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"
    values_only : bool, optional
        If set to True, when returning a dict will only return the values, ignoring other
        attributes like modified_date. Is ignored when output_type is "DataFrame". By default False

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format:
        index = MultiIndex["group_type_name", "object_or_group_name", "resource_type_name", "date"],
        columns = [resource, modified_date]
    dict
        In case output_type is "dict", returns a dictionary in the format
        {group_type_name: {object_or_group_name: {resource_type_name: {date: {attribute: value, ...}, ...}, ...}, ...}
    """
    # build the query against the materialized view matching the requested
    # resolution and (optional) aggregation window
    query = [
        sql.SQL(
            "SELECT * FROM performance.{table} WHERE (date >= {start} AND date <= {end})",
        ).format(
            table=sql.Identifier(
                f"mv_resource_values_{time_res}{f'_{aggregation_window}' if aggregation_window else ''}",
            ),
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    where = []
    if object_or_group_names:
        where.append(
            sql.SQL("object_or_group_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_or_group_names)),
            ),
        )
    if object_group_types:
        where.append(
            sql.SQL("group_type_name IN ({names})").format(
                names=sql.SQL(", ").join(map(sql.Literal, object_group_types)),
            ),
        )
    if resource_types:
        where.append(
            sql.SQL("resource_type_name IN ({points})").format(
                points=sql.SQL(", ").join(map(sql.Literal, resource_types)),
            ),
        )
    if where:
        # filters are combined with AND or OR depending on filter_type
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(f" {filter_type.upper()} ").join(where))
        query.append(sql.SQL(")"))
    query.append(sql.SQL(" ORDER BY object_or_group_name, group_type_name, resource_type_name, date"))
    query = sql.Composed(query)
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing date to be a Timestamp
    df["date"] = df["date"].astype("datetime64[s]")
    # forcing name columns to be strings and id columns to be integers
    df = df.astype(
        {"object_or_group_name": "string[pyarrow]", "group_type_name": "string[pyarrow]", "resource_type_name": "string[pyarrow]"},
    )
    df = df.astype(
        {"object_or_group_id": "int64[pyarrow]", "group_type_id": "int64[pyarrow]", "resource_type_id": "int16[pyarrow]"},
    )
    df = df.set_index(["group_type_name", "object_or_group_name", "resource_type_name", "date"])
    if output_type == "DataFrame":
        return df
    # dropping id columns not used in dict format
    df = df.drop(columns=[col for col in df.columns if col.endswith("_id")])
    # converting to dict; MultiIndex keys are unique, so plain assignment is safe.
    # FIX: the previous version tested `date not in final_result[gt][obj]` -- the wrong
    # nesting level (that dict is keyed by resource_type_name, not by date).
    result = df.to_dict(orient="index")
    final_result = {}
    for (object_group_type_name, object_or_group_name, resource_type_name, date), data in result.items():
        final_result.setdefault(object_group_type_name, {}).setdefault(object_or_group_name, {}).setdefault(
            resource_type_name, {},
        )[date] = data["value"] if values_only else data
    return final_result
insert(df, on_conflict='ignore')
¶
Inserts resource values into the database (table resource_values)
Parameters:
- df (DataFrame) – DataFrame with the following columns:
  - object_name
  - date
  - resource_type ('wind_speed', 'solar_irradiance_poa', ...)
  - value
- on_conflict (Literal['ignore', 'update'], default: 'ignore') – What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Source code in echo_postgres/kpi_resource_values.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts resource values into the database (table resource_values).

    Parameters
    ----------
    df : DataFrame
        DataFrame with the following columns:
        - object_name
        - date
        - resource_type ('wind_speed', 'solar_irradiance_poa', ...)
        - value
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df contains NaN values, has wrong columns, or references unknown
        object names or resource types.
    """
    # checking inputs: reject NaNs and enforce the exact column set
    required_columns = {"object_name", "date", "resource_type", "value"}
    if df.isna().any().any():
        raise ValueError("df cannot have NaN values")
    if set(df.columns) != required_columns:
        additional_cols = set(df.columns) - required_columns
        missing_cols = required_columns - set(df.columns)
        raise ValueError(
            f"df must have the following columns: object_name, date, resource_type, value. Additional columns: {additional_cols}. Missing columns: {missing_cols}",
        )
    # making a copy of df so the caller's frame is not mutated
    df = df.copy()
    # getting object id
    wanted_objs = df["object_name"].unique().tolist()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        missing_objs = set(wanted_objs) - set(obj_ids)
        raise ValueError(f"Could not find the following objects: {missing_objs}")
    df["object_id"] = df["object_name"].map(obj_ids)
    # getting resource type id
    wanted_resource_types = df["resource_type"].unique().tolist()
    rt_ids = self._perfdb.kpis.resource.types.get_ids()
    if wrong_rt := set(wanted_resource_types) - set(rt_ids.keys()):
        # FIX: message previously said "measurement points" although it lists resource types
        raise ValueError(f"Could not find the following resource types: {wrong_rt}")
    df["resource_type_id"] = df["resource_type"].map(rt_ids)
    # removing name columns now that ids are resolved
    df = df.drop(columns=["object_name", "resource_type"])
    # converting value column to float
    df["value"] = df["value"].astype("float32")
    # checking for NaNs introduced by the float conversion (the up-front isna()
    # check cannot catch values that only become NaN after astype)
    nan_rows = df[df["value"].isna()].index
    if len(nan_rows) > 0:
        logger.warning(
            f"Found NaN values in value column. Dropping {len(nan_rows)} rows (indexes: {df['date'].loc[nan_rows].tolist()})",
        )
        df = df[~df.index.isin(nan_rows)].copy()
    # inserting data; "ignore" maps to plain append, "update" upserts
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="resource_values",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )
    logger.debug("Resource values inserted into the database")
sync_bazefield(period, object_names=None, resource_types=None, overwrite=False)
¶
Method to get resource KPIs numbers from Bazefield and insert them into the database.
This will save the results in the table "resource_values" of performance_db.
Parameters:
- period (DateTimeRange) – Period to get resource KPIs numbers from Bazefield. Values will be rounded to the nearest day. It is recommended that the start is at 00:00:00 and the end is at 23:59:59.
- object_names (list[str] | None, default: None) – Name of the objects to get the resource values from. If set to None, will get all that match the object types allowed in ALLOWED_RESOURCE_OBJECT_MODELS. By default None.
- resource_types (list[str] | None, default: None) – List of resource types to get the values from. Usually 'wind_speed' or 'solar_irradiance_poa' should be used. By default None.
- overwrite (bool, default: False) – If set to True, will overwrite the existing values in the database, by default False.

Returns:

- DataFrame – DataFrame with resource values inserted in the database.
Source code in echo_postgres/kpi_resource_values.py
@validate_call
def sync_bazefield(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    resource_types: list[str] | None = None,
    overwrite: bool = False,
) -> DataFrame:
    """Get resource KPI numbers from Bazefield and insert them into the database.

    This will save the results in the table "resource_values" of performance_db.

    Parameters
    ----------
    period : DateTimeRange
        Period to get resource KPIs numbers from Bazefield. Values will be rounded to the nearest day.
        It is recommended that the start is at 00:00:00 and the end is at 23:59:59.
    object_names : list[str] | None, optional
        Name of the objects to get the resource values from. If set to None will get all that match
        the object types allowed in ALLOWED_RESOURCE_OBJECT_MODELS. By default None
    resource_types : list[str] | None, optional
        List of resource types to sync. Usually 'wind_speed' or 'solar_irradiance_poa' should be used.
        By default None
    overwrite : bool, optional
        If set to True, will overwrite the existing values in the database, by default False

    Returns
    -------
    DataFrame
        DataFrame with resource values inserted in the database.
        NOTE(review): only the frame for the LAST resource type processed is returned (the loop
        variable `values` is reused per iteration) -- confirm whether callers expect all resource
        types concatenated.
    """
    # imported here to avoid circular imports
    from echo_meteo.utils import resample_mean

    t0 = perf_counter()
    # adjusting period to cover the whole day
    period = period.copy()
    period = period.round(timedelta(days=1), start="floor", end="ceil")
    # getting all objects that are allowed to have resource values,
    # keyed by resource type ({resource_type: [object_name, ...]})
    allowed_objects = {}
    for resource_type, allowed_object_models in ALLOWED_RESOURCE_OBJECT_MODELS.items():
        # skip resource types the caller did not ask for
        if resource_types and resource_type not in resource_types:
            continue
        objs = self._perfdb.objects.instances.get_ids(object_models=allowed_object_models)
        allowed_objects[resource_type] = list(objs.keys())
    # checking if provided object names are valid; either way `object_names`
    # ends up as a {resource_type: [object_name, ...]} mapping
    if object_names is None:
        object_names = allowed_objects
    else:
        not_found_objs = []
        found_objs = {}
        for obj in object_names:
            found_obj = False
            for resource_type, objs in allowed_objects.items():
                if obj in objs:
                    found_obj = True
                    if resource_type not in found_objs:
                        found_objs[resource_type] = []
                    found_objs[resource_type].append(obj)
                    break
            if not found_obj:
                not_found_objs.append(obj)
        if not_found_objs:
            raise ValueError(
                f"Could not find the following objects {not_found_objs} considering resource types {list(allowed_objects.keys())}",
            )
        object_names = found_objs
    # getting resource type definitions to get bazefield point
    resource_types_def = self._perfdb.kpis.resource.types.get(output_type="dict")
    # creating connection to Bazefield
    baze = Baze()
    # iterating each resource type
    for resource_type, objects in object_names.items():
        # getting the bazefield point for the resource type
        bazefield_point = resource_types_def[resource_type]["bazefield_point"]
        # getting values from tag for all objects
        wanted_points = {obj: [bazefield_point] for obj in objects}
        # pad the query window by 10 minutes on each side so boundary samples exist for resampling
        point_period = period.copy()
        point_period.start = point_period.start - timedelta(minutes=10)
        point_period.end = point_period.end + timedelta(minutes=10)
        # regex to get 5min or 10min sampling frequency from the bazefield point name
        feature_freq = re.findall(r"\d{1,2}min", bazefield_point)
        if not feature_freq:
            raise ValueError(f"Could not find frequency in {bazefield_point}")
        if len(feature_freq) > 1:
            raise ValueError(f"Found more than one frequency in {bazefield_point}")
        feature_freq = feature_freq[0]
        # getting values
        values = baze.points.values.series.get(points=wanted_points, reindex=feature_freq, period=point_period)
        # filling with zero (only needed as currently we might have missing values at night for irradiance features), we should delete this in the future to avoid these zero values changing the mean incorrectly
        values = values.fillna(0.0)
        # dropping second level of the column MultiIndex (the point name)
        values = values.droplevel(1, axis=1)
        # resampling to day (min_rr is the minimum recovery rate accepted per day)
        daily_values = resample_mean(values, "D", min_rr=0.3)
        # adjusting values to upload to the database
        # melting the DataFrame to long format: one row per (date, object_name)
        values = daily_values.reset_index().melt(id_vars="index", var_name="object_name", value_name="value")
        values = values.rename(columns={"index": "date"})
        values["resource_type"] = resource_type
        # removing outside period
        values = values[
            (values["date"] >= period.start) & (values["date"] < period.end)
        ]  # < used at end to avoid including the next day at 00:00:00
        # checking if any rows have values lower or equal to 0 (invalid)
        wrong_idx = values[values["value"] <= 0].index
        if len(wrong_idx) > 0:
            logger.warning(
                f"Found {len(wrong_idx)} rows with values lower or equal to 0. Dropping these rows \n{values.loc[wrong_idx]}",
            )
            values = values[~values.index.isin(wrong_idx)].copy()
        # inserting resource data into the database
        logger.info("Inserting resource values data into the database")
        self.insert(df=values, on_conflict="update" if overwrite else "ignore")
        logger.info(
            f"Resource values for {resource_type} inserted into the database in {perf_counter() - t0:.2f} seconds. Period {period} and objects {objects}",
        )
    del baze
    return values