Feature Values - Aggregations

FeatureValuesAggregations(perfdb)

Class used for handling Aggregations of Feature Values over a period of time. Can be accessed via perfdb.features.values.aggregations.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb
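For context, a minimal access sketch (assuming an already connected PerfDB instance; how you construct it depends on your connection setup):

import echo_postgres as e_pg

perfdb = e_pg.PerfDB()  # hypothetical construction; pass your usual connection arguments
aggs = perfdb.features.values.aggregations  # the FeatureValuesAggregations handler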

get(features, period, time_weight_method='LOCF', integral_time_unit='second', output_type='Series')

Get the aggregated feature values for the given features and period.

Parameters:

  • features

    (dict[str, dict[str, list[NON_RAW_AGGREGATIONS]]]) –

    Dictionary in the format {object_name: {feature_name: [aggregation1, aggregation2, ...]}, ...}.

    The possible aggregation types are:

    • AVG
    • TimeAverage
    • COUNT
    • MAX
    • MIN
    • SUM
    • STDDEV
    • CORR
    • VARIANCE
    • FirstValue
    • LastValue
    • Integral

    Keep in mind that, because filtering exact (object, feature) pairs is resource intensive in SQL, the query only filters on the sets of requested objects and features. The result may therefore contain more rows than expected: it includes every combination of object_name and feature_name that has one of the requested aggregations, even if that feature was not requested for that specific object.

    This is not harmful, but it might lead to confusion if you are not aware of it (a post-filtering sketch is shown after the source listing below).

  • period

    (DateTimeRange) –

    The period to get the feature values for.

  • time_weight_method

    (Literal['Linear', 'LOCF'], default: 'LOCF' ) –

    The method to use for time-weighted calculations. Can be one of:

    • Linear: linear interpolation between values
    • LOCF: last observation carried forward (the value is carried forward until a new value is observed)

    By default "LOCF".

    Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".

    More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).

  • integral_time_unit

    (Literal['microsecond', 'millisecond', 'second', 'minute', 'hour'], default: 'second' ) –

    The time unit to use for the integral aggregation. Can be one of:

    • microsecond
    • millisecond
    • second
    • minute
    • hour

    It defines the time unit used in the integral calculation: the integral is computed as the sum of each value multiplied by its duration in the specified time unit.

    Only applicable if aggregation=="Integral".

    More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).

    By default "second".

  • output_type

    (Literal['Series', 'pl.DataFrame'], default: 'Series' ) –

    The type of the output. Can be either "Series" (pandas Series) or "pl.DataFrame" (polars DataFrame).

Returns:

  • Series

    Series with the aggregated feature values. The index is a MultiIndex with the object_name, feature_name and aggregation as levels.

  • pl.DataFrame

    DataFrame with the aggregated feature values. The columns are "object_name", "feature_name", "aggregation" and "value".
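Example:

A minimal usage sketch. The object and feature names below are hypothetical, and the DateTimeRange construction is assumed to accept start and end datetimes; adapt both to your setup.

from datetime import datetime

# hypothetical object/feature names; use ones that exist in your database
features = {
    "turbine_01": {
        "active_power": ["AVG", "TimeAverage", "MAX"],
        "wind_speed": ["Integral"],
    },
}

# assumed constructor; build the period however your DateTimeRange expects
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 2))

# pandas Series indexed by (object_name, feature_name, aggregation)
series = perfdb.features.values.aggregations.get(
    features,
    period,
    time_weight_method="LOCF",
    integral_time_unit="hour",
    output_type="Series",
)

# or a polars DataFrame with columns object_name, feature_name, aggregation, value
df = perfdb.features.values.aggregations.get(features, period, output_type="pl.DataFrame")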

Source code in echo_postgres/feature_values_aggregations.py
@validate_call
def get(
    self,
    features: dict[str, dict[str, list[NON_RAW_AGGREGATIONS]]],
    period: DateTimeRange,
    time_weight_method: Literal["Linear", "LOCF"] = "LOCF",
    integral_time_unit: Literal["microsecond", "millisecond", "second", "minute", "hour"] = "second",
    output_type: Literal["Series", "pl.DataFrame"] = "Series",
) -> pd.Series | pl.DataFrame:
    """Get the aggregated feature values for the given features and period.

    Parameters
    ----------
    features : dict[str, dict[str, list[NON_RAW_AGGREGATIONS]]]
        Dictionary in the format {object_name: {feature_name: [aggregation1, aggregation2, ...]}, ...}.

        The possible aggregation types are:

        - AVG
        - TimeAverage
        - COUNT
        - MAX
        - MIN
        - SUM
        - STDDEV
        - CORR
        - VARIANCE
        - FirstValue
        - LastValue
        - Integral

        Keep in mind that, because filtering exact (object, feature) pairs is resource intensive in SQL, the query only filters on the sets of requested objects and features. The result may therefore contain more rows than expected: it includes every combination of object_name and feature_name that has one of the requested aggregations, even if that feature was not requested for that specific object.

        This is not harmful, but it might lead to confusion if you are not aware of it.

    period : DateTimeRange
        The period to get the feature values for.
    time_weight_method : Literal["Linear", "LOCF"], optional
        The method to use for time-weighted calculations. Can be one of:
        - Linear: linear interpolation between values
        - LOCF: last observation carried forward (the value is carried forward until a new value is observed)

        By default "LOCF".

        Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".

        More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).
    integral_time_unit : Literal["microsecond", "millisecond", "second", "minute", "hour"], optional
        The time unit to use for the integral aggregation. Can be one of:
        - microsecond
        - millisecond
        - second
        - minute
        - hour

        It defines the time unit used in the integral calculation: the integral is computed as the sum of each value multiplied by its duration in the specified time unit.

        Only applicable if aggregation=="Integral".

        More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).

        By default "second".
    output_type : Literal["Series", "pl.DataFrame"], optional
        The type of the output. Can be either "Series" (pandas Series) or "pl.DataFrame" (polars DataFrame).

    Returns
    -------
    Series
        Series with the aggregated feature values. The index is a MultiIndex with the object_name, feature_name and aggregation as levels.
    pl.DataFrame
        DataFrame with the aggregated feature values. The columns are "object_name", "feature_name", "aggregation" and "value".
    """
    # getting models of the wanted objects
    objs_def = self._perfdb.objects.instances.get(
        object_names=list(features.keys()),
        output_type="DataFrame",
    )
    # getting feature definitions for the wanted objects
    features_def = self._perfdb.features.definitions.get(
        object_models=objs_def["object_model_name"].unique().tolist(),
        output_type="DataFrame",
    )
    # getting all the wanted aggregations
    all_aggregations = [agg_type for obj_features in features.values() for agg_types in obj_features.values() for agg_type in agg_types]
    all_aggregations = sorted(set(all_aggregations))

    dfs = []
    # iterating aggregations
    for agg in all_aggregations:
        # getting the feature ids and objects for the current aggregation
        obj_ids = []
        feature_ids = []
        for object_name, obj_features in features.items():
            # getting the object id
            obj_id = objs_def.loc[object_name, "id"]
            # getting the object model
            object_model = objs_def.loc[object_name, "object_model_name"]
            for feature_name, agg_types in obj_features.items():
                # checking if the aggregation is in the list of aggregations for the feature
                if agg in agg_types:
                    # getting the feature id
                    feature_id = features_def.loc[
                        pd.IndexSlice[object_model, feature_name],
                        "id",
                    ]
                    obj_ids.append(obj_id)
                    feature_ids.append(feature_id)

        # get unique values of object_ids and feature_ids
        obj_ids = sorted(set(obj_ids))
        feature_ids = sorted(set(feature_ids))

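        # maps this API's aggregation names to the TimescaleDB Toolkit accessors
        # that are applied to the time_weight() summary built in the fv_initial CTE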
        timescale_aggs = {
            "TimeAverage": "interpolated_average",
            "FirstValue": "first_val",
            "LastValue": "last_val",
            "Integral": "interpolated_integral",
        }

        # initial_agg_query
        match agg:
            case "TimeAverage" | "Integral" | "FirstValue" | "LastValue":
                initial_agg_query = sql.SQL("time_weight({time_weight_method}, fv.timestamp, fv.value) AS tws").format(
                    time_weight_method=sql.Literal(time_weight_method),
                )
            case _:
                initial_agg_query = sql.SQL("{agg}(fv.value) AS value").format(agg=sql.SQL(agg))

        # agg_query
        match agg:
            case "TimeAverage":
                agg_query = sql.SQL("{agg}(fv.tws, {start}, params.agg_interval)").format(
                    agg=sql.SQL(timescale_aggs[agg]),
                    start=sql.Literal(period["start"].strftime("%Y-%m-%d %H:%M:%S")),
                )
            case "Integral":
                agg_query = sql.SQL("{agg}(fv.tws, {start}, params.agg_interval, NULL, NULL, {integral_time_unit})").format(
                    agg=sql.SQL(timescale_aggs[agg]),
                    start=sql.Literal(period["start"].strftime("%Y-%m-%d %H:%M:%S")),
                    integral_time_unit=sql.Literal(integral_time_unit),
                )
            case "FirstValue" | "LastValue":
                agg_query = sql.SQL("{agg}(fv.tws)").format(
                    agg=sql.SQL(timescale_aggs[agg]),
                )
            case _:
                agg_query = sql.SQL("fv.value")

        # defining agg_interval as the number of seconds in the period
        # (+1 second so the inclusive end timestamp is covered by the window)
        agg_interval = sql.SQL("'{seconds} seconds'::INTERVAL").format(
            seconds=sql.SQL(str(int((period["end"] - period["start"]).total_seconds() + 1))),
        )

        query = sql.SQL(
            """WITH params AS (SELECT {agg_interval} AS agg_interval),
            fv_initial AS (
            -- This CTE creates the time weight summary needed for timescale
            -- It also groups the data by object_id, feature_id
            SELECT
                fv.object_id,
                fv.feature_id,
                {initial_agg_query}
            FROM performance.feature_values fv
            WHERE fv.object_id IN ({object_ids})
            AND fv.feature_id IN ({feature_ids})
            AND fv.timestamp >= {start}
            AND fv.timestamp <= {end}
            GROUP BY fv.object_id, fv.feature_id
            )
            SELECT
                fv.object_id,
                fv.feature_id,
                {agg_query} AS value
            FROM fv_initial fv, params
            ORDER BY fv.object_id, fv.feature_id;
            """,
        ).format(
            agg_interval=agg_interval,
            object_ids=sql.SQL(", ").join(map(sql.Literal, obj_ids)),
            feature_ids=sql.SQL(", ").join(map(sql.Literal, feature_ids)),
            start=sql.Literal(period["start"].strftime("%Y-%m-%d %H:%M:%S")),
            end=sql.Literal(period["end"].strftime("%Y-%m-%d %H:%M:%S")),
            initial_agg_query=initial_agg_query,
            agg_query=agg_query,
        )

        # getting the data
        with self._perfdb.conn.reconnect() as conn:
            # we are using polars for faster processing
            df = conn.read_to_polars(
                query=query,
                schema_overrides={"value": pl.Float64, "object_id": pl.Int64, "feature_id": pl.Int64},
            )

        # adding aggregation column
        df = df.with_columns(
            pl.lit(agg).alias("aggregation"),
        )

        # appending the DataFrame to the list
        dfs.append(df)

    # converting objs_def and features_def to polars DataFrame
    objs_def = pl.from_pandas(objs_def, include_index=True)
    features_def = pl.from_pandas(features_def, include_index=True)

    # concatenating all DataFrames
    df = pl.concat(dfs, how="vertical")
    # replacing object_id and feature_id with object_name and feature_name
    df = df.join(
        objs_def.select(pl.col("id").alias("object_id"), pl.col("name").alias("object_name")),
        on="object_id",
        how="left",
    ).join(
        features_def.select(pl.col("id").alias("feature_id"), pl.col("name").alias("feature_name")),
        on="feature_id",
        how="left",
    )

    #  dropping unwanted columns, reordering columns and sorting
    df = (
        df.drop(["object_id", "feature_id"])
        .select(
            pl.col("object_name"),
            pl.col("feature_name"),
            pl.col("aggregation"),
            pl.col("value"),
        )
        .sort(
            by=["object_name", "feature_name", "aggregation"],
        )
    )

    # returning polars DataFrame if output_type is "pl.DataFrame"
    if output_type == "pl.DataFrame":
        return df

    # converting to pandas DataFrame
    df = df.to_pandas()

    # creating the MultiIndex (object_name, feature_name, aggregation)
    df = df.set_index(["object_name", "feature_name", "aggregation"])

    # returning pandas Series if output_type is "Series"
    return df["value"]
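As noted above, the result can contain (object, feature) combinations that were not explicitly requested. A small post-filtering sketch against the Series output, using the same features dict that was passed to get:

# keep only the (object_name, feature_name, aggregation) triples actually requested
wanted = {
    (obj, feat, agg)
    for obj, obj_features in features.items()
    for feat, agg_types in obj_features.items()
    for agg in agg_types
}
filtered = series[[idx in wanted for idx in series.index]]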