Feature Values - Aggregations
FeatureValuesAggregations(perfdb)
Class used for handling Aggregations of Feature Values over a period of time. Can be accessed via perfdb.features.values.aggregations.
Parameters:
-
perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
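A minimal access sketch, assuming perfdb is an already-constructed and connected PerfDB instance:
aggs = perfdb.features.values.aggregations  # FeatureValuesAggregations instance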
get(features, period, time_weight_method='LOCF', integral_time_unit='second', output_type='Series')
Get the aggregated feature values for the given features and period.
Parameters:
-
features (dict[str, dict[str, list[NON_RAW_AGGREGATIONS]]]) – Dictionary in the format {object_name: {feature_name: [aggregation1, aggregation2, ...]}, ...}.
The possible aggregation types are:
- AVG
- TimeAverage
- COUNT
- MAX
- MIN
- SUM
- STDDEV
- CORR
- VARIANCE
- FirstValue
- LastValue
- Integral
Keep in mind that, because filtering is resource intensive in SQL, the result may contain more rows than expected: the output includes every combination of object_name and feature_name that has one of the requested aggregations, even if that feature was not requested for that particular object.
This is not harmful, but it can cause confusion if you are not aware of it. See the sketch after this parameter for an illustrative example.
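For example, a features argument like the one below (object and feature names are purely illustrative) requests two aggregations of wind_speed for WTG-01 and one aggregation of power for WTG-02:
features = {
    "WTG-01": {"wind_speed": ["TimeAverage", "MAX"]},
    "WTG-02": {"power": ["MAX"]},
}
Since MAX is requested for both objects, the underlying query filters on all object ids and feature ids for that aggregation at once, so the result may additionally contain ("WTG-01", "power", "MAX") and ("WTG-02", "wind_speed", "MAX") if those features exist for those objects.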
-
period (DateTimeRange) – The period to get the feature values for.
-
time_weight_method (Literal['Linear', 'LOCF'], default: 'LOCF') – The method to use for time-weighted calculations. Can be one of:
- Linear: linear interpolation between values
- LOCF: last observation carried forward (the value is carried forward until a new value is observed)
By default "LOCF".
Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".
More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).
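As a toy illustration of the difference (plain Python, not part of this API): two samples, value 0 at t=0 s and value 10 at t=10 s, time-averaged over the 10-second span:
samples = [(0.0, 0.0), (10.0, 10.0)]  # (timestamp in seconds, value)
span = samples[-1][0] - samples[0][0]
# LOCF: each value is held constant until the next observation arrives.
locf_avg = sum(v1 * (t2 - t1) for (t1, v1), (t2, _) in zip(samples, samples[1:])) / span  # 0.0
# Linear: interpolate between consecutive observations (trapezoidal rule).
linear_avg = sum((v1 + v2) / 2 * (t2 - t1) for (t1, v1), (t2, v2) in zip(samples, samples[1:])) / span  # 5.0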
-
integral_time_unit (Literal['microsecond', 'millisecond', 'second', 'minute', 'hour'], default: 'second') – The time unit to use for the integral aggregation. Can be one of:
- microsecond
- millisecond
- second
- minute
- hour
This is used when the Integral aggregation is requested. It defines the time unit for the integral calculation: the integral is computed as the sum of the values multiplied by their duration in the specified time unit.
Only applicable if aggregation=="Integral".
More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).
By default "second".
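To make the unit's effect concrete: a constant value of 10 held for one minute yields integrals of different magnitude depending on integral_time_unit (hand arithmetic, not API code):
value, duration_s = 10.0, 60.0
integral_second = value * duration_s         # 600.0 with integral_time_unit="second"
integral_minute = value * duration_s / 60.0  # 10.0 with integral_time_unit="minute"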
-
output_type (Literal['Series', 'pl.DataFrame'], default: 'Series') – The type of the output. Can be either "Series" (pandas Series) or "pl.DataFrame" (polars DataFrame).
Returns:
-
Series – Series with the aggregated feature values. The index is a MultiIndex with object_name, feature_name, and aggregation as levels.
-
pl.DataFrame – DataFrame with the aggregated feature values. The columns are "object_name", "feature_name", "aggregation", and "value".
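A minimal usage sketch; the object and feature names are illustrative, and the DateTimeRange construction is elided (get() only reads period["start"] and period["end"]):
period = ...  # a DateTimeRange covering, e.g., 2024-01-01 to 2024-01-02
result = perfdb.features.values.aggregations.get(
    features={"WTG-01": {"wind_speed": ["TimeAverage", "MAX"]}},
    period=period,
    time_weight_method="LOCF",
)
# With the default output_type="Series", a value is addressed as:
value = result.loc[("WTG-01", "wind_speed", "TimeAverage")]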
Source code in echo_postgres/feature_values_aggregations.py
@validate_call
def get(
self,
features: dict[str, dict[str, list[NON_RAW_AGGREGATIONS]]],
period: DateTimeRange,
time_weight_method: Literal["Linear", "LOCF"] = "LOCF",
integral_time_unit: Literal["microsecond", "millisecond", "second", "minute", "hour"] = "second",
output_type: Literal["Series", "pl.DataFrame"] = "Series",
) -> pd.Series | pl.DataFrame:
"""Get the aggregated feature values for the given features and period.
Parameters
----------
features : dict[str, dict[str, list[NON_RAW_AGGREGATIONS]]]
Dictionary in the format {object_name: {feature_name: [aggregation1, aggregation2, ...]}, ...}.
The possible aggregation types are:
- AVG
- TimeAverage
- COUNT
- MAX
- MIN
- SUM
- STDDEV
- CORR
- VARIANCE
- FirstValue
- LastValue
- Integral
Keep in mind that, because filtering is resource intensive in SQL, the result may contain more rows than expected: the output includes every combination of object_name and feature_name that has one of the requested aggregations, even if that feature was not requested for that particular object.
This is not harmful, but it can cause confusion if you are not aware of it.
period : DateTimeRange
The period to get the feature values for.
time_weight_method : Literal["Linear", "LOCF"], optional
The method to use for time-weighted calculations. Can be one of:
- Linear: linear interpolation between values
- LOCF: last observation carried forward (the value is carried forward until a new value is observed)
By default "LOCF".
Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".
More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).
integral_time_unit : Literal["microsecond", "millisecond", "second", "minute", "hour"], optional
The time unit to use for the integral aggregation. Can be one of:
- microsecond
- millisecond
- second
- minute
- hour
This is used when the Integral aggregation is requested. It defines the time unit for the integral calculation: the integral is computed as the sum of the values multiplied by their duration in the specified time unit.
Only applicable if aggregation=="Integral".
More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).
By default "second".
output_type : Literal["Series", "pl.DataFrame"], optional
The type of the output. Can be either "Series" (pandas Series) or "pl.DataFrame" (polars DataFrame).
Returns
-------
Series
Series with the aggregated feature values. The index is a MultiIndex with the object_name, feature_name and aggregation as levels.
pl.DataFrame
DataFrame with the aggregated feature values. The columns are "object_name", "feature_name", "aggregation" and "value".
"""
# getting models of the wanted objects
objs_def = self._perfdb.objects.instances.get(
object_names=list(features.keys()),
output_type="DataFrame",
)
# getting feature definitions for the wanted objects
features_def = self._perfdb.features.definitions.get(
object_models=objs_def["object_model_name"].unique().tolist(),
output_type="DataFrame",
)
# getting all the wanted aggregations
all_aggregations = [agg_type for obj_features in features.values() for agg_types in obj_features.values() for agg_type in agg_types]
all_aggregations = sorted(set(all_aggregations))
dfs = []
# iterating aggregations
for agg in all_aggregations:
# getting the feature ids and objects for the current aggregation
obj_ids = []
feature_ids = []
for object_name, obj_features in features.items():
# getting the object id
obj_id = objs_def.loc[object_name, "id"]
# getting the object model
object_model = objs_def.loc[object_name, "object_model_name"]
for feature_name, agg_types in obj_features.items():
# checking if the aggregation is in the list of aggregations for the feature
if agg in agg_types:
# getting the feature id
feature_id = features_def.loc[
pd.IndexSlice[object_model, feature_name],
"id",
]
obj_ids.append(obj_id)
feature_ids.append(feature_id)
# get unique values of object_ids and feature_ids
obj_ids = sorted(set(obj_ids))
feature_ids = sorted(set(feature_ids))
timescale_aggs = {
"TimeAverage": "interpolated_average",
"FirstValue": "first_val",
"LastValue": "last_val",
"Integral": "interpolated_integral",
}
# initial_agg_query
match agg:
case "TimeAverage" | "Integral" | "FirstValue" | "LastValue":
initial_agg_query = sql.SQL("time_weight({time_weight_method}, fv.timestamp, fv.value) AS tws").format(
time_weight_method=sql.Literal(time_weight_method),
)
case _:
initial_agg_query = sql.SQL("{agg}(fv.value) AS value").format(agg=sql.SQL(agg))
# agg_query
match agg:
case "TimeAverage":
agg_query = sql.SQL("{agg}(fv.tws, {start}, params.agg_interval)").format(
agg=sql.SQL(timescale_aggs[agg]),
start=sql.Literal(period["start"].strftime("%Y-%m-%d %H:%M:%S")),
)
case "Integral":
agg_query = sql.SQL("{agg}(fv.tws, {start}, params.agg_interval, NULL, NULL, {integral_time_unit})").format(
agg=sql.SQL(timescale_aggs[agg]),
start=sql.Literal(period["start"].strftime("%Y-%m-%d %H:%M:%S")),
integral_time_unit=sql.Literal(integral_time_unit),
)
case "FirstValue" | "LastValue":
agg_query = sql.SQL("{agg}(fv.tws)").format(
agg=sql.SQL(timescale_aggs[agg]),
)
case _:
agg_query = sql.SQL("fv.value")
# defining agg_interval as the number of seconds in the period
agg_interval = sql.SQL("'{seconds} seconds'::INTERVAL").format(
seconds=sql.SQL(str(int((period["end"] - period["start"]).total_seconds() + 1))),
)
query = sql.SQL(
"""WITH params AS (SELECT {agg_interval} AS agg_interval),
fv_initial AS (
-- This CTE creates the time weight summary needed for timescale
-- It also groups the data by object_id, feature_id
SELECT
fv.object_id,
fv.feature_id,
{initial_agg_query}
FROM performance.feature_values fv
WHERE fv.object_id IN ({object_ids})
AND fv.feature_id IN ({feature_ids})
AND fv.timestamp >= {start}
AND fv.timestamp <= {end}
GROUP BY fv.object_id, fv.feature_id
)
SELECT
fv.object_id,
fv.feature_id,
{agg_query} AS value
FROM fv_initial fv, params
ORDER BY fv.object_id, fv.feature_id;
""",
).format(
agg_interval=agg_interval,
object_ids=sql.SQL(", ").join(map(sql.Literal, obj_ids)),
feature_ids=sql.SQL(", ").join(map(sql.Literal, feature_ids)),
start=sql.Literal(period["start"].strftime("%Y-%m-%d %H:%M:%S")),
end=sql.Literal(period["end"].strftime("%Y-%m-%d %H:%M:%S")),
initial_agg_query=initial_agg_query,
agg_query=agg_query,
)
# getting the data
with self._perfdb.conn.reconnect() as conn:
# we are using polars for faster processing
df = conn.read_to_polars(
query=query,
schema_overrides={"value": pl.Float64, "object_id": pl.Int64, "feature_id": pl.Int64},
)
# adding aggregation column
df = df.with_columns(
pl.lit(agg).alias("aggregation"),
)
# appending the DataFrame to the list
dfs.append(df)
# converting objs_def and features_def to polars DataFrame
objs_def = pl.from_pandas(objs_def, include_index=True)
features_def = pl.from_pandas(features_def, include_index=True)
# concatenating all DataFrames
df = pl.concat(dfs, how="vertical")
# replacing object_id and feature_id with object_name and feature_name
df = df.join(
objs_def.select(pl.col("id").alias("object_id"), pl.col("name").alias("object_name")),
on="object_id",
how="left",
).join(
features_def.select(pl.col("id").alias("feature_id"), pl.col("name").alias("feature_name")),
on="feature_id",
how="left",
)
# dropping unwanted columns, reordering columns and sorting
df = (
df.drop(["object_id", "feature_id"])
.select(
pl.col("object_name"),
pl.col("feature_name"),
pl.col("aggregation"),
pl.col("value"),
)
.sort(
by=["object_name", "feature_name", "aggregation"],
)
)
# returning polars DataFrame if output_type is "pl.DataFrame"
if output_type == "pl.DataFrame":
return df
# converting to pandas DataFrame
df = df.to_pandas()
# creating multindex
df = df.set_index(["object_name", "feature_name", "aggregation"])
# returning pandas Series if output_type is "Series"
return df["value"]
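Since the pandas output is a MultiIndex Series, a common follow-up is to pivot the aggregations into columns (assuming result came from get() with the default output_type="Series"):
table = result.unstack("aggregation")  # one row per (object_name, feature_name), one column per aggregation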