
Feature Values - Series

FeatureValuesSeries(perfdb)

Class used for handling Feature Values Series. Can be accessed via perfdb.features.values.series.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(features, period, bazefield_delete=False)

Delete the feature values for the given features and period.

The deletion will not remove the rows, only set the values to NULL to avoid deleting data from other features.

Parameters:

  • features

    (dict[str, list[str]]) –

    Dictionary in the format {object_name: [feature_name, ...], ...}.

  • period

    (DateTimeRange) –

    The period to delete the feature values for.

  • bazefield_delete

    (bool, default: False ) –

    Whether to delete the data from Bazefield, by default False.
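
Below is a minimal usage sketch. The object name "WTG01" and the feature names are placeholders, and the DateTimeRange construction is only assumed to match the class used throughout this page, so adapt both to your environment.

from datetime import datetime

# Placeholder period; construct DateTimeRange however it is defined in your environment.
period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 1, 2))

# Remove the stored values of two features of "WTG01" for that period,
# leaving the corresponding Bazefield points untouched (bazefield_delete defaults to False).
perfdb.features.values.series.delete(
    features={"WTG01": ["power_active", "wind_speed"]},
    period=period,
    bazefield_delete=False,
)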

Source code in echo_postgres/feature_values_series.py
@validate_call
def delete(
    self,
    features: dict[str, list[str]],
    period: DateTimeRange,
    bazefield_delete: bool = False,
) -> None:
    """Delete the feature values for the given features and period.

    The deletion will not remove the rows, only set the values to NULL to avoid deleting data from other features.

    Parameters
    ----------
    features : dict[str, list[str]]
        Dictionary in the format {object_name: [feature_name, ...], ...}.
    period : DateTimeRange
        The period to delete the feature values for.
    bazefield_delete : bool, optional
        Whether to delete the data from Bazefield, by default False.

    """
    # getting object ids
    objects_def = self._perfdb.objects.instances.get(
        object_names=list(features.keys()),
    )
    object_models = sorted(obj["object_model_name"] for obj in objects_def.values())

    # getting feature ids
    features_def = self._perfdb.features.definitions.get(object_models=object_models, output_type="DataFrame")

    # iterating over objects
    for object_name, feature_names in features.items():
        obj_id = objects_def[object_name]["id"]
        obj_model_name = objects_def[object_name]["object_model_name"]

        feature_ids = features_def.loc[pd.IndexSlice[obj_model_name, feature_names], :]["id"].tolist()

        # writing the query
        query = sql.SQL(
            """DELETE FROM performance.feature_values WHERE "object_id" = {object_id} AND "feature_id" IN ({feature_ids}) AND "timestamp" >= {start} AND "timestamp" <= {end};""",
        ).format(
            object_id=sql.Literal(obj_id),
            feature_ids=sql.SQL(", ").join(sql.Literal(fid) for fid in feature_ids),
            start=sql.Literal(period.start.strftime("%Y-%m-%d %H:%M:%S")),
            end=sql.Literal(period.end.strftime("%Y-%m-%d %H:%M:%S")),
        )

        # executing the query
        with self._perfdb.conn.reconnect() as conn:
            result = conn.execute(query)

        logger.debug(f"{object_name} - Deleted {result.rowcount} rows from performance.feature_values")

    # deleting from Bazefield
    if bazefield_delete:
        # getting objects to check if they exist in Bazefield
        objs = self._perfdb.objects.instances.get(
            object_names=list(features.keys()),
            output_type="DataFrame",
            get_attributes=True,
            attribute_names=["exists_in_bazefield"],
        )
        # adding column "exists_in_bazefield" if it does not exist
        if "exists_in_bazefield" not in objs.columns:
            objs["exists_in_bazefield"] = True

        # connecting to Bazefield
        try:
            baze = Baze()
            for object_name, feature_names in features.items():
                # checking if the object exists in Bazefield
                if not objs.loc[object_name, "exists_in_bazefield"]:
                    logger.debug(f"{object_name} - not deleted from Bazefield because exists_in_bazefield attribute is False")
                    continue
                baze.points.values.series.delete(points={object_name: feature_names}, period=period)
        except Exception:
            logger.exception("Error deleting from Bazefield")

get(features, period, aggregation='Raw', aggregation_interval=None, aggregation_convention=None, timestamp_res=None, time_weight_method='LOCF', integral_time_unit='second', reindex=None, output_type='DataFrame')

Get the feature values for the given features and period.

Parameters:

  • features

    (dict[str, list[str]]) –

    Dictionary in the format {object_name: [feature_name, ...], ...}.

  • period

    (DateTimeRange) –

    The period to get the feature values for.

  • aggregation

    (SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], default: 'Raw' ) –

    Type of aggregation to be used. Can be one of:

    • Raw
    • AVG
    • TimeAverage
    • COUNT
    • MAX
    • MIN
    • SUM
    • STDDEV
    • CORR
    • VARIANCE
    • FirstValue
    • LastValue
    • Integral

    A list of aggregations can be passed to get multiple aggregations at once.

    This is preferred to resampling after getting Raw values as it offloads the aggregation to the database, which will avoid transferring large amounts of raw data over the network when only a few aggregations are needed.

    Some of the methods are based on timescale functions. If you need more information about them, please refer to the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight).

    By default "Raw"

  • aggregation_interval

    (timedelta | None, default: None ) –

    Length of each aggregation bucket. Not necessary if aggregation=="Raw". By default None.

  • aggregation_convention

    (Literal['left', 'right'], default: None ) –

    The convention to use for the aggregation. Can be either "left" or "right".

    Left assumes that the timestamp is the start of the interval, while right assumes that the timestamp is the end of the interval.

    To give an example, considering 10 minute intervals aggregated to 30 minutes:

    • left: 00:00, 00:10, 00:20 will be aggregated to 00:00; 00:30, 00:40, 00:50 will be aggregated to 00:30
    • right: 23:40, 23:50, 00:00 will be aggregated to 00:00; 00:10, 00:20, 00:30 will be aggregated to 00:30

    Not necessary if aggregation=="Raw".

    Usually you should set this to "right", as most SCADA systems use this convention (see the usage sketch after the Returns section below).

  • timestamp_res

    (timedelta | None, default: None ) –

    The resolution of the timestamps of the requested data. This is only used when aggregation is not "Raw" and aggregation_convention is "right". It is used to shift the timestamps before the aggregation so they match the "left" convention, which is what the aggregation expects.

    Usually this should be 5 or 10 minutes.

    Because of this shifting, when doing aggregations it is not recommended to query features with different timestamp resolutions in the same call, as it will lead to unexpected results.

  • time_weight_method

    (Literal['Linear', 'LOCF'], default: 'LOCF' ) –

    The method to use for time-weighted calculations. Can be one of:

    • Linear: linear interpolation between values
    • LOCF: last observation carried forward (the value is carried forward until a new value is observed)

    By default "LOCF".

    Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".

    More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).

  • integral_time_unit

    (Literal['microsecond', 'millisecond', 'second', 'minute', 'hour'], default: 'second' ) –

    The time unit to use for the integral aggregation. Can be one of:

    • microsecond
    • millisecond
    • second
    • minute
    • hour

    This is used when the Integral aggregation is requested. It defines the time unit for the integral calculation: the integral is computed as the sum of the values multiplied by their duration in the specified time unit.

    Only applicable if aggregation=="Integral".

    More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).

    By default "second".

  • reindex

    (str | None, default: None ) –

    String containing the timedelta that should be used to reindex the DataFrame (ex: "10min", "5min", etc.).

    This does not resample the DataFrame! It is only used to add any timestamps that are missing from the data (the created rows are filled with NaN). The operation is done after getting the data from the database.

    If set to "infer" the timedelta will be inferred from the data retrieved from the database. If set to None no reindex will be done.

    By default None.

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    The type of the output DataFrame. Can be either "DataFrame" (pandas DataFrame) or "pl.DataFrame" (polars DataFrame).

Returns:

  • DataFrame (pandas)

    In case output_type="DataFrame", a Pandas DataFrame with the feature values.

    If just one aggregation is requested, timestamp will be the index and columns will be MultiIndex with levels ["object_name", "feature_name"].

    If multiple aggregations are requested, timestamp will be the index and columns will be MultiIndex with levels ["object_name", "feature_name", "aggregation"].

  • DataFrame (polars)

    In case output_type="pl.DataFrame", a Polars DataFrame with the feature values. It will have a column "timestamp" with the timestamp.

    If just one aggregation is requested, the columns will be in the format "object_name@feature_name".

    If multiple aggregations are requested, the columns will be in the format "object_name@feature_name#aggregation".
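
A hedged usage sketch of get follows. The object and feature names are placeholders, and the DateTimeRange construction is only assumed to match the class used elsewhere on this page.

from datetime import datetime, timedelta

# Placeholder period; constructor assumed, as in the delete example above.
period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 1, 2))

series = perfdb.features.values.series

# Raw values: a pandas DataFrame indexed by timestamp with
# MultiIndex columns ("object_name", "feature_name").
raw = series.get(
    features={"WTG01": ["power_active", "wind_speed"]},
    period=period,
)

# Two aggregations at once: 10-minute source data bucketed into 30-minute
# intervals using the "right" timestamp convention. Because more than one
# aggregation is requested, the columns gain a third level ("aggregation").
agg = series.get(
    features={"WTG01": ["power_active"]},
    period=period,
    aggregation=["AVG", "MAX"],
    aggregation_interval=timedelta(minutes=30),
    aggregation_convention="right",
    timestamp_res=timedelta(minutes=10),
    reindex="30min",  # add NaN rows for any missing 30-minute timestamps
)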

Source code in echo_postgres/feature_values_series.py
@validate_call
def get(
    self,
    features: dict[str, list[str]],
    period: DateTimeRange,
    aggregation: SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS] = "Raw",
    aggregation_interval: timedelta | None = None,
    aggregation_convention: Literal["left", "right"] | None = None,
    timestamp_res: timedelta | None = None,
    time_weight_method: Literal["Linear", "LOCF"] = "LOCF",
    integral_time_unit: Literal["microsecond", "millisecond", "second", "minute", "hour"] = "second",
    reindex: str | None = None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame:
    """Get the feature values for the given features and period.

    Parameters
    ----------
    features : dict[str, list[str]]
        Dictionary in the format {object_name: [feature_name, ...], ...}.
    period : DateTimeRange
        The period to get the feature values for.
    aggregation : SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], optional
        Type of aggregation to be used. Can be one of:

        - Raw
        - AVG
        - TimeAverage
        - COUNT
        - MAX
        - MIN
        - SUM
        - STDDEV
        - CORR
        - VARIANCE
        - FirstValue
        - LastValue
        - Integral

        A list of aggregations can be passed to get multiple aggregations at once.

        This is preferred to resampling after getting Raw values as it offloads the aggregation to the database, which will avoid transferring large amounts of raw data over the network when only a few aggregations are needed.

        Some of the methods are based on timescale functions. If you need more information about them, please refer to the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight).

        By default "Raw"
    aggregation_interval : timedelta | None, optional
        Length of each aggregation bucket. Not necessary if aggregation=="Raw". By default None
    aggregation_convention : Literal["left", "right"]
        The convention to use for the aggregation. Can be either "left" or "right".

        Left assumes that the timestamp is the start of the interval, while right assumes that the timestamp is the end of the interval.

        To give an example, considering 10 minute intervals aggregated to 30 minutes:
        - left: 00:00, 00:10, 00:20 will be aggregated to 00:00; 00:30, 00:40, 00:50 will be aggregated to 00:30
        - right: 23:40, 23:50, 00:00 will be aggregated to 00:00; 00:10, 00:20, 00:30 will be aggregated to 00:30

        Not necessary if aggregation=="Raw".

        Usually you should set this to "right" as most SCADA systems use this convention.
    timestamp_res : timedelta | None, optional
        The resolution of the timestamps of the requested data. This is only used when aggregation is not "Raw" and aggregation_convention is "right". It is used to shift the timestamps before the aggregation so they match the "left" convention, which is what the aggregation expects.

        Usually this should be 5 or 10 minutes.

        Because of this shifting, when doing aggregations it is not recommended to query features with different timestamp resolutions in the same call, as it will lead to unexpected results.
    time_weight_method : Literal["Linear", "LOCF"], optional
        The method to use for time-weighted calculations. Can be one of:
        - Linear: linear interpolation between values
        - LOCF: last observation carried forward (the value is carried forward until a new value is observed)

        By default "LOCF".

        Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".

        More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).
    integral_time_unit : Literal["microsecond", "millisecond", "second", "minute", "hour"], optional
        The time unit to use for the integral aggregation. Can be one of:
        - microsecond
        - millisecond
        - second
        - minute
        - hour

        This is used when the Integral aggregation is requested. It defines the time unit for the integral calculation: the integral is computed as the sum of the values multiplied by their duration in the specified time unit.

        Only applicable if aggregation=="Integral".

        More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).

        By default "second".
    reindex : str | None, optional
        String containing the timedelta that should be used to reindex the DataFrame (ex: "10min", "5min", etc.).

        This does not resample the DataFrame! It is only used to add any timestamps that are missing from the data (the created rows are filled with NaN). The operation is done after getting the data from the database.

        If set to "infer" the timedelta will be inferred from the data retrieved from the database.
        If set to None no reindex will be done.

        By default None
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        The type of the output DataFrame. Can be either "DataFrame" (pandas DataFrame) or "pl.DataFrame" (polars DataFrame).

    Returns
    -------
    pd.DataFrame
        In case output_type="DataFrame", a Pandas DataFrame with the feature values.

        If just one aggregation is requested, timestamp will be the index and columns will be MultiIndex with levels ["object_name", "feature_name"].

        If multiple aggregations are requested, timestamp will be the index and columns will be MultiIndex with levels ["object_name", "feature_name", "aggregation"].
    pl.DataFrame
        In case output_type="pl.DataFrame", a Polars DataFrame with the feature values. It will have a column "timestamp" with the timestamp.

        If just one aggregation is requested, the columns will be in the format "object_name@feature_name".

        If multiple aggregations are requested, the columns will be in the format "object_name@feature_name#aggregation".
    """
    # getting models of the wanted objects
    objs_def = self._perfdb.objects.instances.get(
        object_names=list(features.keys()),
        output_type="DataFrame",
    )
    # getting feature definitions for the wanted objects
    features_def = self._perfdb.features.definitions.get(
        object_models=objs_def["object_model_name"].unique().tolist(),
        output_type="DataFrame",
    )
    # checking if all features exist in the database
    where_clauses = []
    for object_name, feature_names in features.items():
        # getting the object id
        object_id = objs_def.loc[object_name, "id"]
        # getting the model of the object
        object_model = objs_def.loc[object_name, "object_model_name"]

        # getting the features for the object
        obj_features = features_def.loc[pd.IndexSlice[object_model, :], :].index.get_level_values("name").unique().tolist()

        missing_features = set(feature_names).difference(obj_features)
        if missing_features:
            raise ValueError(
                f"Features {missing_features} do not exist in the database for object {object_name} of model {object_model}",
            )

        # adding the where clause for the object and features
        obj_where = sql.SQL(
            "(object_id = {object_id} AND feature_id IN ({feature_ids}))",
        ).format(
            object_id=sql.Literal(object_id),
            feature_ids=sql.SQL(", ").join(
                sql.Literal(fid) for fid in features_def.loc[pd.IndexSlice[object_model, feature_names], :]["id"].tolist()
            ),
        )
        where_clauses.append(obj_where)

    # converting objs_def and features_def to polars DataFrame
    objs_def = pl.from_pandas(objs_def, include_index=True)
    features_def = pl.from_pandas(features_def, include_index=True)

    if isinstance(aggregation, str):
        aggregation = [aggregation]

    # in case we are doing aggregations we will adjust query period +- the aggregation_interval to make sure the calculated values consider the whole interval
    if aggregation != ["Raw"]:
        query_period = period.copy()
        query_period.start -= aggregation_interval
        query_period.end += aggregation_interval
    else:
        query_period = period

    # creating agg_interval in SQL format if anything other than "Raw" is requested
    if aggregation != ["Raw"]:
        # make sure aggregation_interval is set
        if aggregation_interval is None:
            raise ValueError("aggregation_interval must be set if aggregation is not 'Raw'")

        aggregation_interval = sql.SQL(
            "'{agg_seconds} seconds'::INTERVAL",
        ).format(agg_seconds=sql.SQL(str(int(aggregation_interval.total_seconds()))))

        # making sure aggregation_convention is set
        if aggregation_convention is None:
            raise ValueError(
                "aggregation_convention must be set if aggregation is not 'Raw'. It can be either 'left' or 'right'.",
            )

    # checking if timestamp_res is needed
    if timestamp_res is None and aggregation != ["Raw"] and aggregation_convention == "right":
        raise ValueError(
            "timestamp_res must be set if aggregation is not 'Raw' and aggregation_convention is 'right'.",
        )

    # converting timestamp_res to SQL format if needed
    if timestamp_res is None:
        timestamp_res = timedelta(seconds=1)
    timestamp_res = sql.SQL(
        "'{timestamp_res_seconds} seconds'::INTERVAL",
    ).format(timestamp_res_seconds=sql.SQL(str(int(timestamp_res.total_seconds()))))

    dfs = []
    for agg in aggregation:
        if agg == "Raw":
            # writing the query
            # we are only getting the columns: object_id, feature_id, timestamp, value to avoid fetching unnecessary data multiple times
            query = sql.SQL(
                "SELECT object_id, feature_id, timestamp::TIMESTAMP, value FROM performance.feature_values WHERE timestamp >= {start} AND timestamp <= {end} AND ({where_clauses}) ",
            ).format(
                start=sql.Literal(query_period.start.strftime("%Y-%m-%d %H:%M:%S")),
                end=sql.Literal(query_period.end.strftime("%Y-%m-%d %H:%M:%S")),
                where_clauses=sql.SQL(" OR ").join(where_clauses),
            )
        else:
            # in case of aggregation we need to group by object_id, feature_id and timestamp

            timescale_aggs = {
                "TimeAverage": "interpolated_average",
                "FirstValue": "first_val",
                "LastValue": "last_val",
                "Integral": "interpolated_integral",
            }

            # bucket_agg_query
            match agg:
                case "TimeAverage" | "Integral" | "FirstValue" | "LastValue":
                    bucket_agg_query = sql.SQL("time_weight({time_weight_method}, fv.timestamp, fv.value) AS tws").format(
                        time_weight_method=sql.Literal(time_weight_method),
                    )
                case _:
                    bucket_agg_query = sql.SQL("{agg}(fv.value) AS value").format(agg=sql.SQL(agg))

            # agg_query
            match agg:
                case "TimeAverage":
                    agg_query = sql.SQL("{agg}(fv.tws, fv.time_bucket, params.agg_interval)").format(
                        agg=sql.SQL(timescale_aggs[agg]),
                    )
                case "Integral":
                    agg_query = sql.SQL("{agg}(fv.tws, fv.time_bucket, params.agg_interval, NULL, NULL, {integral_time_unit})").format(
                        agg=sql.SQL(timescale_aggs[agg]),
                        integral_time_unit=sql.Literal(integral_time_unit),
                    )
                case "FirstValue" | "LastValue":
                    agg_query = sql.SQL("{agg}(fv.tws)").format(
                        agg=sql.SQL(timescale_aggs[agg]),
                    )
                case _:
                    agg_query = sql.SQL("fv.value")

            # writing the query
            query = sql.SQL(
                """WITH params AS (SELECT {agg_interval} AS agg_interval, {timestamp_res} AS timestamp_res),
                fv_adj AS (
                -- This CTE adjusts the timestamps to the aggregation convention
                -- If aggregation_convention is "right", we adjust the timestamps to the left convention
                SELECT
                    fv.object_id,
                    fv.feature_id,
                    fv.timestamp {timestamp_adj} AS "timestamp",
                    value
                FROM performance.feature_values fv, params
                WHERE fv.timestamp >= {start} AND fv.timestamp <= {end} AND ({where_clauses})
                ),
                fv_bucketed AS (
                -- This CTE buckets the adjusted timestamps into intervals of agg_interval
                -- It also applies the time_weight function if needed
                -- In case no timescale function is used, it will just group by the time_bucket
                SELECT
                    fv.object_id,
                    fv.feature_id,
                    time_bucket(params.agg_interval, fv.timestamp) AS time_bucket,
                    {bucket_agg_query}
                FROM fv_adj fv, params
                GROUP BY fv.object_id, fv.feature_id, time_bucket
                )
                SELECT
                    fv.object_id,
                    fv.feature_id,
                    fv.time_bucket::TIMESTAMP AS "timestamp",
                    {agg_query} AS value
                FROM fv_bucketed fv, params
                ORDER BY fv.object_id, fv.feature_id, fv.time_bucket;
                """,
            ).format(
                agg_interval=aggregation_interval,
                timestamp_res=timestamp_res,
                timestamp_adj=sql.SQL("+ params.agg_interval - params.timestamp_res")
                if aggregation_convention == "right"
                else sql.SQL(""),
                start=sql.Literal(query_period.start.strftime("%Y-%m-%d %H:%M:%S")),
                end=sql.Literal(query_period.end.strftime("%Y-%m-%d %H:%M:%S")),
                where_clauses=sql.SQL(" OR ").join(where_clauses),
                bucket_agg_query=bucket_agg_query,
                agg_query=agg_query,
            )

        # getting the data
        with self._perfdb.conn.reconnect() as conn:
            # we are using polars for faster processing
            df = conn.read_to_polars(
                query=query,
                schema_overrides={"timestamp": pl.Datetime("ms"), "value": pl.Float64, "object_id": pl.Int64, "feature_id": pl.Int64},
            )

        # adding the aggregation type to the DataFrame
        df = df.with_columns(pl.lit(agg, dtype=pl.Utf8).alias("aggregation"))

        dfs.append(df)

    # concatenating all DataFrames
    df = pl.concat(dfs, how="vertical")

    # replacing object_id and feature_id with object_name and feature_name
    df = df.join(
        objs_def.select(pl.col("id").alias("object_id"), pl.col("name").alias("object_name")),
        on="object_id",
        how="left",
    ).join(
        features_def.select(pl.col("id").alias("feature_id"), pl.col("name").alias("feature_name")),
        on="feature_id",
        how="left",
    )

    # pivoting the DataFrame to have object_feature as columns (timestamp will also be a column) and value as values
    df = df.with_columns(pl.format("{}@{}#{}", "object_name", "feature_name", "aggregation").alias("pivot_key")).pivot(
        on=["pivot_key"],
        index="timestamp",
        values="value",
        aggregate_function=None,
        sort_columns=False,
    )
    # sort the index to have timestamps in ascending order
    df = df.sort("timestamp")

    # adding columns for features that did not have any values in the period
    wanted_cols = [f"{obj}@{feat}" for obj, feat_list in features.items() for feat in feat_list]
    # adding aggregation type to the columns
    wanted_cols = [f"{col}#{agg}" for col in wanted_cols for agg in aggregation]
    missing_columns = set(wanted_cols) - set(df.columns)
    if missing_columns:
        # adding missing columns with NaN values
        for col in missing_columns:
            df = df.with_columns(pl.lit(None, dtype=pl.Float64).alias(col))

    # reindexing
    if reindex is not None:
        final_reindex = copy(reindex)

        # reindexing DataFrame to have all timestamps if desired
        if final_reindex is not None:
            # inferring frequency if reindex is "infer"
            if final_reindex == "infer" and len(df) > 3:
                final_reindex = get_index_freq(df["timestamp"])
            elif final_reindex == "infer":
                logger.debug("Cannot infer frequency from data because it has less than 3 timestamps. Using '10min' as default.")
                final_reindex = "10min"

            # if still failed, lets raise an error
            if final_reindex is None:
                raise RuntimeError("Cannot infer frequency from data. Please set 'reindex' argument manually.")

            # converting final_reindex from string to timedelta
            if isinstance(final_reindex, str):
                final_reindex = pd.to_timedelta(final_reindex).to_pytimedelta()

            timestamp_start = period["start"].replace(minute=0, second=0, hour=0)

            final_reindex = pl.datetime_range(
                start=timestamp_start,
                end=period["end"],
                interval=final_reindex,
                closed="both",
                eager=True,
                time_unit="ms",
            )

            # reindexing the DataFrame
            df = df.join(
                pl.DataFrame({"final_timestamp": final_reindex}),
                left_on="timestamp",
                right_on="final_timestamp",
                how="full",
            )
            # dropping the old timestamp column and renaming the new one
            df = df.drop("timestamp").rename({"final_timestamp": "timestamp"})
            # sorting the DataFrame by timestamp
            df = df.sort("timestamp")

    # restricting to desired period only
    df = df.filter((pl.col("timestamp") >= period["start"]) & (pl.col("timestamp") <= period["end"]))

    # sorting columns and leaving timestamp as the first column
    df = df.select(["timestamp", *sorted(set(df.columns).difference({"timestamp"}))])

    # removing the aggregation if just one aggregation is requested
    if len(aggregation) == 1:
        # removing the aggregation from the column names
        df = df.rename({col: col.split("#")[0] for col in df.columns if "#" in col})

    # returning the DataFrame in the requested format
    if output_type == "pl.DataFrame":
        return df

    # converting to pandas DataFrame
    df = df.to_pandas(use_pyarrow_extension_array=True)

    # setting the index to timestamp
    df = df.set_index("timestamp")

    if len(aggregation) == 1:
        # converting column names to tuples of (object_name, feature_name) to later convert to MultiIndex
        df.columns = pd.MultiIndex.from_tuples(
            [tuple(col.split("@")) for col in df.columns],
            names=["object_name", "feature_name"],
        )
    else:
        # converting column names to tuples of (object_name, feature_name, aggregation) to later convert to MultiIndex
        col_names = [(*tuple(col.split("@")), col.split("#")[1]) for col in df.columns]
        # removing #AGG from feature level
        col_names = [(obj, feat.split("#")[0], agg) for obj, feat, agg in col_names]
        df.columns = pd.MultiIndex.from_tuples(
            col_names,
            names=["object_name", "feature_name", "aggregation"],
        )

    # converting index to DatetimeIndex
    df.index = df.index.astype("datetime64[s]")

    return df
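
When multiple aggregations are requested with the pandas output, a cross-section on the "aggregation" column level brings the frame back to the single-aggregation shape. A small sketch, assuming agg is the multi-aggregation DataFrame from the example above:

# Keep only the AVG values; the remaining column levels are again
# ("object_name", "feature_name").
avg_only = agg.xs("AVG", level="aggregation", axis=1)

# Or address one object/feature/aggregation column directly.
wtg01_avg_power = agg[("WTG01", "power_active", "AVG")]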

insert(df, on_conflict='update', bazefield_upload=True)

Insert the feature values into the database.

Parameters:

  • df

    (pd.DataFrame | pl.DataFrame) –

    DataFrame with the feature values.

    In case it is a Pandas DataFrame:

    • It must be in the format: index=timestamp, columns=MultiIndex[object_name, feature_name].
    • Index must be a DateTimeIndex and values must be convertible to float.

    In case it is a Polars DataFrame:

    • It must be in the format: columns=["timestamp", "object_name@feature_name", ...].
    • The "timestamp" column must be of type Datetime and the values must be convertible to float.

  • on_conflict

    (Literal['ignore', 'update'], default: 'update' ) –

    What to do in case of conflict, by default "update". It will do the following:

    • "ignore": ignore the conflict and do not insert the row (entire row will be ignored, not individual features).
    • "update": update the row with the new values.
  • bazefield_upload

    (bool, default: True ) –

    Whether to upload the data to Bazefield, by default True.

    If the object has the attribute exists_in_bazefield set to False, its data will not be uploaded regardless of this setting.
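
A hedged sketch of both accepted input layouts follows. Object and feature names are placeholders, and bazefield_upload is disabled so the example only touches the database.

from datetime import datetime

import pandas as pd
import polars as pl

# Pandas input: DatetimeIndex plus MultiIndex columns (object_name, feature_name).
timestamps = pd.date_range("2024-01-01 00:00", periods=3, freq="10min")
pdf = pd.DataFrame(
    {
        ("WTG01", "power_active"): [1500.0, 1520.5, None],
        ("WTG01", "wind_speed"): [7.1, 7.4, 7.2],
    },
    index=timestamps,
)
pdf.columns.names = ["object_name", "feature_name"]
perfdb.features.values.series.insert(pdf, on_conflict="update", bazefield_upload=False)

# Equivalent Polars input: a "timestamp" column plus "object_name@feature_name" columns.
pldf = pl.DataFrame(
    {
        "timestamp": [datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 0, 10)],
        "WTG01@power_active": [1500.0, 1520.5],
        "WTG01@wind_speed": [7.1, 7.4],
    }
).with_columns(pl.col("timestamp").cast(pl.Datetime("ms")))
perfdb.features.values.series.insert(pldf, bazefield_upload=False)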

Source code in echo_postgres/feature_values_series.py
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "update",
    bazefield_upload: bool = True,
) -> None:
    """Insert the feature values into the database.

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame with the feature values.

        In case it is a **Pandas** DataFrame:
        - It must be in the format: index=timestamp, columns=MultiIndex[object_name, feature_name].
        - Index must be a DateTimeIndex and values must be convertible to float.

        In case it is a **Polars** DataFrame:
        - It must be in the format: columns=["timestamp", "object_name@feature_name", ...].
        - The "timestamp" column must be of type Datetime and the values must be convertible to float.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict, by default "update". It will do the following:

        - "ignore": ignore the conflict and do not insert the row (entire row will be ignored, not individual features).
        - "update": update the row with the new values.
    bazefield_upload : bool, optional
        Whether to upload the data to Bazefield, by default True.

        If the object has the attribute `exists_in_bazefield` set to False, its data will not be uploaded regardless of this setting.

    """
    # pandas checks
    if isinstance(df, pd.DataFrame):
        # checking if the DataFrame is empty
        if df.empty:
            return

        # checking if the DataFrame has MultiIndex
        if not isinstance(df.columns, pd.MultiIndex):
            raise ValueError(f"df must have MultiIndex columns, got {type(df.columns)}")
        if df.columns.names != ["object_name", "feature_name"]:
            raise ValueError(f"df columns must have names 'object_name' and 'feature_name', got {df.columns.names}")

        # checking if index is a DatetimeIndex
        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError(f"df index must be a DatetimeIndex, got {type(df.index)}")

        # checking if all columns are numeric
        try:
            df = df.astype("float64[pyarrow]")
        except Exception as e:
            raise ValueError("All columns must be numeric") from e

        # joining the two column levels into one
        df.columns = df.columns.map(lambda x: f"{x[0]}@{x[1]}")

        # forcing the index to be named timestamp and them resetting it
        df.index.name = "timestamp"
        df = df.reset_index(drop=False)

        # converting the DataFrame to Polars
        df = pl.from_pandas(df)

    # polars checks

    # checking if column timestamp exists
    if "timestamp" not in df.columns:
        raise ValueError("df must have a column 'timestamp'")
    # checking if timestamp column is of type Datetime
    if not isinstance(df["timestamp"].dtype, pl.Datetime):
        raise ValueError("df 'timestamp' column must be of type Datetime")

    # getting the list of all other columns and checking if they are in the format "object_name@feature_name"
    # a dict like {object: [feature1, feature2, ...], ...} will be created
    upload_features = {}
    for col in df.columns:
        if col == "timestamp":
            continue
        # checking if the column is in the format "object_name@feature_name"
        if "@" not in col:
            raise ValueError(f"Column {col} must be in the format 'object_name@feature_name'")
        # splitting the column name into object and feature
        obj, feat = col.split("@")
        if not obj or not feat:
            raise ValueError(f"Column {col} must be in the format 'object_name@feature_name' with non-empty object and feature names")
        # adding the feature to the object in the dict
        if obj not in upload_features:
            upload_features[obj] = []
        upload_features[obj].append(feat)

        # making sure the data type is Float64, if not convert it
        if not isinstance(df[col].dtype, pl.Float64):
            try:
                df = df.with_columns(pl.col(col).cast(pl.Float64))
            except Exception as e:
                raise ValueError(f"Column {col} must be convertible to Float64") from e

    object_names = sorted(upload_features.keys())

    # converting to a Lazy Frame for faster processing
    df = df.lazy()

    # connecting to Bazefield
    baze = Baze() if bazefield_upload else None

    # checking if the objects exist in the database
    objs = self._perfdb.objects.instances.get(
        object_names=object_names,
        output_type="DataFrame",
        get_attributes=True,
        attribute_names=["exists_in_bazefield"],
    )
    if len(objs) != len(object_names):
        missing_objects = set(object_names) - set(objs.index)
        raise ValueError(f"Objects {missing_objects} do not exist in the database")
    # adding column "exists_in_bazefield" if it does not exist
    if "exists_in_bazefield" not in objs.columns:
        objs["exists_in_bazefield"] = True

    # getting features for all models
    features = self._perfdb.features.definitions.get(object_models=objs["object_model_name"].unique().tolist(), output_type="DataFrame")

    # iterating over objects
    for object_name, object_features_list in upload_features.items():
        # getting the model of the object
        object_model = objs.loc[object_name, "object_model_name"]
        # getting object id
        object_id: int = objs.loc[object_name, "id"]

        # getting a slice of the DataFrame for the object
        obj_df = df.select(
            pl.col("timestamp"),
            *[pl.col(f"{object_name}@{feat}") for feat in object_features_list],
        )

        # checking if all features exist in the database
        obj_features = features.loc[pd.IndexSlice[object_model, :], :].index.get_level_values("name").unique().tolist()

        # getting dict of feature names to feature ids
        feature_ids: dict[str, int] = features.loc[pd.IndexSlice[object_model, :], :]["id"].droplevel(level=0, axis=0).to_dict()

        missing_features = set(object_features_list).difference(obj_features)
        if missing_features:
            raise ValueError(
                f"Features {missing_features} do not exist in the database for object {object_name} of model {object_model}",
            )

        # keeping a copy for bazefield use
        baze_df = obj_df.clone()

        # melting the DataFrame to have columns: timestamp, object_id, feature_id, value
        # feature column will be categorical to save memory
        obj_df = (
            obj_df.unpivot(
                index=["timestamp"],
                variable_name="feature",
                value_name="value",
            )
            # Split the 'feature' column into a struct with named fields
            .with_columns(
                pl.col("feature")
                .str.split_exact("@", 1)  # Use split_exact for fixed number of splits
                .struct.rename_fields(["object_name", "feature_name"])
                .alias("feature_struct"),
            )
            # Unnest the struct columns into the main DataFrame
            .unnest("feature_struct")
            # Cast to Enum (less memory usage) and drop the original feature column
            .with_columns(
                pl.col("feature_name").cast(pl.Enum(categories=list(feature_ids.keys()))).alias("feature_name"),
            )
            # drop the original 'feature' column
            .drop(["feature", "object_name"])
            # drop all NA rows
            .drop_nulls()
            # add object_id column
            .with_columns(
                pl.lit(object_id, dtype=pl.Int64).alias("object_id"),
            )
            # add feature_id column mapping from feature names to feature ids
            .with_columns(pl.col("feature_name").replace_strict(feature_ids, default=None, return_dtype=pl.Int64).alias("feature_id"))
            # dropping feature_name column as it is not needed anymore
            .drop("feature_name")
            # dropping duplicate rows
            .unique(subset=["timestamp", "object_id", "feature_id"])
            # sorting by timestamp, object_id and feature_id
            .sort(["timestamp", "object_id", "feature_id"])
        )

        # convert value column to Float32
        obj_df = obj_df.with_columns(pl.col("value").cast(pl.Float32))

        # collecting results to then insert them into the database
        obj_df = obj_df.collect()

        # inserting the data
        with self._perfdb.conn.reconnect() as conn:
            conn.polars_to_sql(
                obj_df,
                table_name="feature_values",
                schema="performance",
                if_exists="append" if on_conflict == "ignore" else "update",
            )

        logger.debug(f"{object_name} - {len(obj_df)} rows inserted")

        # uploading to Bazefield
        try:
            exists_in_bazefield = objs.loc[object_name, "exists_in_bazefield"]
            if not bazefield_upload or not exists_in_bazefield:
                logger.debug(
                    f"{object_name} - not uploaded to Bazefield because either exists_in_bazefield object attribute or bazefield_upload argument is False. {bazefield_upload=} {exists_in_bazefield=}",
                )
            else:
                baze.points.values.series.insert(data=baze_df.collect())
        except Exception:
            logger.exception(f"{object_name} - Error uploading to Bazefield")