Feature Values - Series
FeatureValuesSeries(perfdb)
Class used for handling Feature Values Series. Can be accessed via perfdb.features.values.series.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
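Example (a minimal sketch; the PerfDB construction shown here is an assumption and depends on your connection setup):

import echo_postgres as e_pg

# Hypothetical connection setup; adjust to however PerfDB is configured in your environment.
perfdb = e_pg.PerfDB()

# The series handler is reached through the accessor chain documented above.
series = perfdb.features.values.series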
delete(features, period, bazefield_delete=False)
Delete the feature values for the given features and period.
The deletion will not remove the rows, only set the values to NULL to avoid deleting data from other features.
Parameters:
- features (dict[str, list[str]]) – Dictionary in the format {object_name: [feature_name, ...], ...}.
- period (DateTimeRange) – The period to delete the feature values for.
- bazefield_delete (bool, default: False) – Whether to delete the data from Bazefield, by default False.
Source code in echo_postgres/feature_values_series.py
@validate_call
def delete(
self,
features: dict[str, list[str]],
period: DateTimeRange,
bazefield_delete: bool = False,
) -> None:
"""Delete the feature values for the given features and period.
The deletion will not remove the rows, only set the values to NULL to avoid deleting data from other features.
Parameters
----------
features : dict[str, list[str]]
Dictionary in the format {object_name: [feature_name, ...], ...}.
period : DateTimeRange
The period to delete the feature values for.
bazefield_delete : bool, optional
Whether to delete the data from Bazefield, by default False.
"""
# getting object ids
objects_def = self._perfdb.objects.instances.get(
object_names=list(features.keys()),
)
object_models = sorted(obj["object_model_name"] for obj in objects_def.values())
# getting feature ids
features_def = self._perfdb.features.definitions.get(object_models=object_models, output_type="DataFrame")
# iterating over objects
for object_name, feature_names in features.items():
obj_id = objects_def[object_name]["id"]
obj_model_name = objects_def[object_name]["object_model_name"]
feature_ids = features_def.loc[pd.IndexSlice[obj_model_name, feature_names], :]["id"].tolist()
# writing the query
query = sql.SQL(
"""DELETE FROM performance.feature_values WHERE "object_id" = {object_id} AND "feature_id" IN ({feature_ids}) AND "timestamp" >= {start} AND "timestamp" <= {end};""",
).format(
object_id=sql.Literal(obj_id),
feature_ids=sql.SQL(", ").join(sql.Literal(fid) for fid in feature_ids),
start=sql.Literal(period.start.strftime("%Y-%m-%d %H:%M:%S")),
end=sql.Literal(period.end.strftime("%Y-%m-%d %H:%M:%S")),
)
# executing the query
with self._perfdb.conn.reconnect() as conn:
result = conn.execute(query)
logger.debug(f"{object_name} - Deleted {result.rowcount} rows from performance.feature_values")
# deleting from Bazefield
if bazefield_delete:
# getting objects to check if they exist in Bazefield
objs = self._perfdb.objects.instances.get(
object_names=list(features.keys()),
output_type="DataFrame",
get_attributes=True,
attribute_names=["exists_in_bazefield"],
)
# adding column "exists_in_bazefield" if it does not exist
if "exists_in_bazefield" not in objs.columns:
objs["exists_in_bazefield"] = True
# connecting to Bazefield
try:
baze = Baze()
for object_name, feature_names in features.items():
# checking if the object exists in Bazefield
if not objs.loc[object_name, "exists_in_bazefield"]:
logger.debug(f"{object_name} - not deleted from Bazefield because exists_in_bazefield attribute is False")
continue
baze.points.values.series.delete(points={object_name: feature_names}, period=period)
except Exception:
logger.exception("Error deleting from Bazefield")
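Example (a hedged usage sketch; the object and feature names are hypothetical, and the DateTimeRange shown is assumed to be the same range type used throughout this module, built from two datetimes):

from datetime import datetime

# Assumed construction of the period; use whatever DateTimeRange the rest of your code already uses.
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 8))

perfdb.features.values.series.delete(
    features={"WTG01": ["active_power", "wind_speed"]},  # hypothetical object and feature names
    period=period,
    bazefield_delete=False,  # keep the corresponding points in Bazefield
)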
get(features, period, aggregation='Raw', aggregation_interval=None, aggregation_convention=None, timestamp_res=None, time_weight_method='LOCF', integral_time_unit='second', reindex=None, output_type='DataFrame')
Get the feature values for the given features and period.
Parameters:
- features (dict[str, list[str]]) – Dictionary in the format {object_name: [feature_name, ...], ...}.
- period (DateTimeRange) – The period to get the feature values for.
- aggregation (SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], default: "Raw") – Type of aggregation to be used. Can be one of:
  - Raw
  - AVG
  - TimeAverage
  - COUNT
  - MAX
  - MIN
  - SUM
  - STDDEV
  - CORR
  - VARIANCE
  - FirstValue
  - LastValue
  - Integral
  A list of aggregations can be passed to get multiple aggregations at once.
  This is preferred to resampling after getting Raw values, as it offloads the aggregation to the database and avoids transferring large amounts of raw data over the network when only a few aggregations are needed.
  Some of the methods are based on TimescaleDB functions. If you need more information about them, refer to the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight).
  By default "Raw".
- aggregation_interval (timedelta | None, default: None) – Length of each aggregation interval (time bucket). Not necessary if aggregation == "Raw". By default None.
- aggregation_convention (Literal["left", "right"] | None, default: None) – The convention to use for the aggregation. Can be either "left" or "right".
  Left assumes that the timestamp is the start of the interval, while right assumes that the timestamp is the end of the interval.
  For example, considering 10 minute intervals aggregated to 30 minutes:
  - left: 00:00, 00:10, 00:20 will be aggregated to 00:00; 00:30, 00:40, 00:50 will be aggregated to 00:30
  - right: 23:40, 23:50, 00:00 will be aggregated to 00:00; 00:10, 00:20, 00:30 will be aggregated to 00:30
  Not necessary if aggregation == "Raw".
  Usually you should set this to "right", as most SCADA systems use this convention.
- timestamp_res (timedelta | None, default: None) – The resolution of the timestamps of the requested data. Used when aggregation is not "Raw" and aggregation_convention is "right": the timestamps are shifted before the aggregation so they are equivalent to the "left" convention, which is what the aggregation expects.
  Usually this should be 5 or 10 minutes.
  Because of this argument, when doing aggregations it is not recommended to query, in the same call, features whose timestamp resolutions differ, as it will lead to unexpected results.
- time_weight_method (Literal["Linear", "LOCF"], default: "LOCF") – The method to use for time-weighted calculations. Can be one of:
  - Linear: linear interpolation between values
  - LOCF: last observation carried forward (the value is carried forward until a new value is observed)
  By default "LOCF".
  Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".
  More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).
- integral_time_unit (Literal["microsecond", "millisecond", "second", "minute", "hour"], default: "second") – The time unit to use for the Integral aggregation. Can be one of:
  - microsecond
  - millisecond
  - second
  - minute
  - hour
  It defines the time unit for the integral calculation: the integral is computed as the sum of the values multiplied by their duration in the specified time unit.
  Only applicable if aggregation == "Integral".
  More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).
  By default "second".
- reindex (str | None, default: None) – String containing the timedelta that should be used to reindex the DataFrame (ex: "10min", "5min", etc.).
  This does not resample the DataFrame! It is only used to add any timestamps that are missing from the data (the created timestamps will be set to NaN). The operation is done after getting the data from the database.
  If set to "infer", the timedelta will be inferred from the data acquired from the database. If set to None, no reindexing is done.
  By default None.
- output_type (Literal["DataFrame", "pl.DataFrame"], default: "DataFrame") – The type of the output DataFrame. Can be either "DataFrame" (pandas DataFrame) or "pl.DataFrame" (polars DataFrame).
Returns:
- DataFrame – In case output_type="DataFrame", a Pandas DataFrame with the feature values.
  If just one aggregation is requested, timestamp will be the index and the columns will be a MultiIndex with levels ["object_name", "feature_name"].
  If multiple aggregations are requested, timestamp will be the index and the columns will be a MultiIndex with levels ["object_name", "feature_name", "aggregation"].
- pl.DataFrame – In case output_type="pl.DataFrame", a Polars DataFrame with the feature values. It will have a column "timestamp" with the timestamp.
  If just one aggregation is requested, the columns will be in the format "object_name@feature_name".
  If multiple aggregations are requested, the columns will be in the format "object_name@feature_name#aggregation".
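Example (a minimal sketch of a Raw query with hypothetical object and feature names; the resulting column layouts follow the Returns description above):

# Raw values for the period, as a pandas DataFrame (the default output_type).
df = perfdb.features.values.series.get(
    features={"WTG01": ["active_power", "wind_speed"]},
    period=period,  # a DateTimeRange covering the window of interest (see the delete() example above)
    reindex="10min",  # add any missing 10-minute timestamps as NaN rows
)
power = df[("WTG01", "active_power")]  # columns are a (object_name, feature_name) MultiIndex

# The same query as a polars DataFrame uses "object_name@feature_name" column names.
pldf = perfdb.features.values.series.get(
    features={"WTG01": ["active_power", "wind_speed"]},
    period=period,
    output_type="pl.DataFrame",
)
wind = pldf["WTG01@wind_speed"]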
Source code in echo_postgres/feature_values_series.py
@validate_call
def get(
self,
features: dict[str, list[str]],
period: DateTimeRange,
aggregation: SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS] = "Raw",
aggregation_interval: timedelta | None = None,
aggregation_convention: Literal["left", "right"] | None = None,
timestamp_res: timedelta | None = None,
time_weight_method: Literal["Linear", "LOCF"] = "LOCF",
integral_time_unit: Literal["microsecond", "millisecond", "second", "minute", "hour"] = "second",
reindex: str | None = None,
output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame:
"""Get the feature values for the given features and period.
Parameters
----------
features : dict[str, list[str]]
Dictionary in the format {object_name: [feature_name, ...], ...}.
period : DateTimeRange
The period to get the feature values for.
aggregation : SUPPORTED_AGGREGATIONS | list[SUPPORTED_AGGREGATIONS], optional
Type of aggregation to be used. Can be one of:
- Raw
- AVG
- TimeAverage
- COUNT
- MAX
- MIN
- SUM
- STDDEV
- CORR
- VARIANCE
- FirstValue
- LastValue
- Integral
A list of aggregations can be passed to get multiple aggregations at once.
This is preferred to resampling after getting Raw values as it offloads the aggregation to the database, which will avoid transferring large amounts of raw data over the network when only a few aggregations are needed.
Some of the methods are based on timescale functions. If you need more information about them, please refer to the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight).
By default "Raw"
aggregation_interval : timedelta | None, optional
Length of each aggregation interval (time bucket). Not necessary if aggregation=="Raw". By default None
aggregation_convention : Literal["left", "right"] | None, optional
The convention to use for the aggregation. Can be either "left" or "right".
Left assumes that the timestamp is the start of the interval, while right assumes that the timestamp is the end of the interval.
To give an example, considering 10 minute intervals aggregated to 30 minutes:
- left: 00:00, 00:10, 00:20 will be aggregated to 00:00; 00:30, 00:40, 00:50 will be aggregated to 00:30
- right: 23:40, 23:50, 00:00 will be aggregated to 00:00; 00:10, 00:20, 00:30 will be aggregated to 00:30
Not necessary if aggregation=="Raw".
Usually you should set this to "right" as most SCADA systems use this convention.
timestamp_res : timedelta | None, optional
The resolution of the timestamps of the requested data. This is used when aggregation is not "Raw" and aggregation_convention is "right". It is used to shift the timestamps before doing the aggregation so they are equivalent to the "left" convention, which is what the aggregation expects.
Usually this should be 5 or 10 minutes.
Because of this argument, when doing aggregations it is not recommended to query, in the same call, features whose timestamp resolutions differ, as it will lead to unexpected results.
time_weight_method : Literal["Linear", "LOCF"], optional
The method to use for time-weighted calculations. Can be one of:
- Linear: linear interpolation between values
- LOCF: last observation carried forward (the value is carried forward until a new value is observed)
By default "LOCF".
Only applicable if aggregation is one of "TimeAverage", "FirstValue", "LastValue", or "Integral".
More details on how this works can be found in the TimescaleDB documentation (https://www.tigerdata.com/blog/what-time-weighted-averages-are-and-why-you-should-care).
integral_time_unit : Literal["microsecond", "millisecond", "second", "minute", "hour"], optional
The time unit to use for the integral aggregation. Can be one of:
- microsecond
- millisecond
- second
- minute
- hour
This is used when the Integral aggregation is requested. It defines the time unit for the integral calculation. The integral will be calculated as the sum of the values multiplied by their duration in the specified time unit.
Only applicable if aggregation=="Integral".
More details on how this works can be found in the TimescaleDB documentation (https://docs.tigerdata.com/api/latest/hyperfunctions/time-weighted-calculations/time_weight/#integral).
By default "second".
reindex : str | None, optional
String containing the timedelta that should be used to reindex the DataFrame (ex: "10min", "5min", etc.).
This does not resample the DataFrame! It is only used to add any timestamps that are missing from the data (the created timestamps will be set to NaN). The operation is done after getting the data from the database.
If set to "infer", the timedelta will be inferred from the data acquired from the database.
If set to None, no reindexing is done.
By default None
output_type : Literal["DataFrame", "pl.DataFrame"], optional
The type of the output DataFrame. Can be either "DataFrame" (pandas DataFrame) or "pl.DataFrame" (polars DataFrame).
Returns
-------
pd.DataFrame
In case output_type="DataFrame", a Pandas DataFrame with the feature values.
If just one aggregation is requested, timestamp will be the index and columns will be MultiIndex with levels ["object_name", "feature_name"].
If multiple aggregations are requested, timestamp will be the index and columns will be MultiIndex with levels ["object_name", "feature_name", "aggregation"].
pl.DataFrame
In case output_type="pl.DataFrame", a Polars DataFrame with the feature values. It will have a column "timestamp" with the timestamp.
If just one aggregation is requested, the columns will be in the format "object_name@feature_name".
If multiple aggregations are requested, the columns will be in the format "object_name@feature_name#aggregation".
"""
# getting models of the wanted objects
objs_def = self._perfdb.objects.instances.get(
object_names=list(features.keys()),
output_type="DataFrame",
)
# getting feature definitions for the wanted objects
features_def = self._perfdb.features.definitions.get(
object_models=objs_def["object_model_name"].unique().tolist(),
output_type="DataFrame",
)
# checking if all features exist in the database
where_clauses = []
for object_name, feature_names in features.items():
# getting the object id
object_id = objs_def.loc[object_name, "id"]
# getting the model of the object
object_model = objs_def.loc[object_name, "object_model_name"]
# getting the features for the object
obj_features = features_def.loc[pd.IndexSlice[object_model, :], :].index.get_level_values("name").unique().tolist()
missing_features = set(feature_names).difference(obj_features)
if missing_features:
raise ValueError(
f"Features {missing_features} do not exist in the database for object {object_name} of model {object_model}",
)
# adding the where clause for the object and features
obj_where = sql.SQL(
"(object_id = {object_id} AND feature_id IN ({feature_ids}))",
).format(
object_id=sql.Literal(object_id),
feature_ids=sql.SQL(", ").join(
sql.Literal(fid) for fid in features_def.loc[pd.IndexSlice[object_model, feature_names], :]["id"].tolist()
),
)
where_clauses.append(obj_where)
# converting objs_def and features_def to polars DataFrame
objs_def = pl.from_pandas(objs_def, include_index=True)
features_def = pl.from_pandas(features_def, include_index=True)
if isinstance(aggregation, str):
aggregation = [aggregation]
# in case we are doing aggregations we will adjust query period +- the aggregation_interval to make sure the calculated values consider the whole interval
if aggregation != ["Raw"]:
query_period = period.copy()
query_period.start -= aggregation_interval
query_period.end += aggregation_interval
else:
query_period = period
# creating agg_interval in SQL format if anything other than "Raw" is requested
if aggregation != ["Raw"]:
# make sure aggregation_interval is set
if aggregation_interval is None:
raise ValueError("aggregation_interval must be set if aggregation is not 'Raw'")
aggregation_interval = sql.SQL(
"'{agg_seconds} seconds'::INTERVAL",
).format(agg_seconds=sql.SQL(str(int(aggregation_interval.total_seconds()))))
# making sure aggregation_convention is set
if aggregation_convention is None:
raise ValueError(
"aggregation_convention must be set if aggregation is not 'Raw'. It can be either 'left' or 'right'.",
)
# checking if timestamp_res is needed
if timestamp_res is None and aggregation != ["Raw"] and aggregation_convention == "right":
raise ValueError(
"timestamp_res must be set if aggregation is not 'Raw' and aggregation_convention is 'right'.",
)
# converting timestamp_res to SQL format if needed
if timestamp_res is None:
timestamp_res = timedelta(seconds=1)
timestamp_res = sql.SQL(
"'{timestamp_res_seconds} seconds'::INTERVAL",
).format(timestamp_res_seconds=sql.SQL(str(int(timestamp_res.total_seconds()))))
dfs = []
for agg in aggregation:
if agg == "Raw":
# writing the query
# we are only getting the columns: object_id, feature_id, timestamp, value to avoid fetching unnecessary data multiple times
query = sql.SQL(
"SELECT object_id, feature_id, timestamp::TIMESTAMP, value FROM performance.feature_values WHERE timestamp >= {start} AND timestamp <= {end} AND ({where_clauses}) ",
).format(
start=sql.Literal(query_period.start.strftime("%Y-%m-%d %H:%M:%S")),
end=sql.Literal(query_period.end.strftime("%Y-%m-%d %H:%M:%S")),
where_clauses=sql.SQL(" OR ").join(where_clauses),
)
else:
# in case of aggregation we need to group by object_id, feature_id and timestamp
timescale_aggs = {
"TimeAverage": "interpolated_average",
"FirstValue": "first_val",
"LastValue": "last_val",
"Integral": "interpolated_integral",
}
# bucket_agg_query
match agg:
case "TimeAverage" | "Integral" | "FirstValue" | "LastValue":
bucket_agg_query = sql.SQL("time_weight({time_weight_method}, fv.timestamp, fv.value) AS tws").format(
time_weight_method=sql.Literal(time_weight_method),
)
case _:
bucket_agg_query = sql.SQL("{agg}(fv.value) AS value").format(agg=sql.SQL(agg))
# agg_query
match agg:
case "TimeAverage":
agg_query = sql.SQL("{agg}(fv.tws, fv.time_bucket, params.agg_interval)").format(
agg=sql.SQL(timescale_aggs[agg]),
)
case "Integral":
agg_query = sql.SQL("{agg}(fv.tws, fv.time_bucket, params.agg_interval, NULL, NULL, {integral_time_unit})").format(
agg=sql.SQL(timescale_aggs[agg]),
integral_time_unit=sql.Literal(integral_time_unit),
)
case "FirstValue" | "LastValue":
agg_query = sql.SQL("{agg}(fv.tws)").format(
agg=sql.SQL(timescale_aggs[agg]),
)
case _:
agg_query = sql.SQL("fv.value")
# writing the query
query = sql.SQL(
"""WITH params AS (SELECT {agg_interval} AS agg_interval, {timestamp_res} AS timestamp_res),
fv_adj AS (
-- This CTE adjusts the timestamps to the aggregation convention
-- If aggregation_convention is "right", we adjust the timestamps to the left convention
SELECT
fv.object_id,
fv.feature_id,
fv.timestamp {timestamp_adj} AS "timestamp",
value
FROM performance.feature_values fv, params
WHERE fv.timestamp >= {start} AND fv.timestamp <= {end} AND ({where_clauses})
),
fv_bucketed AS (
-- This CTE buckets the adjusted timestamps into intervals of agg_interval
-- It also applies the time_weight function if needed
-- In case no timescale function is used, it will just group by the time_bucket
SELECT
fv.object_id,
fv.feature_id,
time_bucket(params.agg_interval, fv.timestamp) AS time_bucket,
{bucket_agg_query}
FROM fv_adj fv, params
GROUP BY fv.object_id, fv.feature_id, time_bucket
)
SELECT
fv.object_id,
fv.feature_id,
fv.time_bucket::TIMESTAMP AS "timestamp",
{agg_query} AS value
FROM fv_bucketed fv, params
ORDER BY fv.object_id, fv.feature_id, fv.time_bucket;
""",
).format(
agg_interval=aggregation_interval,
timestamp_res=timestamp_res,
timestamp_adj=sql.SQL("+ params.agg_interval - params.timestamp_res")
if aggregation_convention == "right"
else sql.SQL(""),
start=sql.Literal(query_period.start.strftime("%Y-%m-%d %H:%M:%S")),
end=sql.Literal(query_period.end.strftime("%Y-%m-%d %H:%M:%S")),
where_clauses=sql.SQL(" OR ").join(where_clauses),
bucket_agg_query=bucket_agg_query,
agg_query=agg_query,
)
# getting the data
with self._perfdb.conn.reconnect() as conn:
# we are using polars for faster processing
df = conn.read_to_polars(
query=query,
schema_overrides={"timestamp": pl.Datetime("ms"), "value": pl.Float64, "object_id": pl.Int64, "feature_id": pl.Int64},
)
# adding the aggregation type to the DataFrame
df = df.with_columns(pl.lit(agg, dtype=pl.Utf8).alias("aggregation"))
dfs.append(df)
# concatenating all DataFrames
df = pl.concat(dfs, how="vertical")
# replacing object_id and feature_id with object_name and feature_name
df = df.join(
objs_def.select(pl.col("id").alias("object_id"), pl.col("name").alias("object_name")),
on="object_id",
how="left",
).join(
features_def.select(pl.col("id").alias("feature_id"), pl.col("name").alias("feature_name")),
on="feature_id",
how="left",
)
# pivoting the DataFrame to have object_feature as columns (timestamp will also be a column) and value as values
df = df.with_columns(pl.format("{}@{}#{}", "object_name", "feature_name", "aggregation").alias("pivot_key")).pivot(
on=["pivot_key"],
index="timestamp",
values="value",
aggregate_function=None,
sort_columns=False,
)
# sort the index to have timestamps in ascending order
df = df.sort("timestamp")
# adding columns for features that did not have any values in the period
wanted_cols = [f"{obj}@{feat}" for obj, feat_list in features.items() for feat in feat_list]
# adding aggregation type to the columns
wanted_cols = [f"{col}#{agg}" for col in wanted_cols for agg in aggregation]
missing_columns = set(wanted_cols) - set(df.columns)
if missing_columns:
# adding missing columns with NaN values
for col in missing_columns:
df = df.with_columns(pl.lit(None, dtype=pl.Float64).alias(col))
# reindexing
if reindex is not None:
final_reindex = copy(reindex)
# reindexing DataFrame to have all timestamps if desired
if final_reindex is not None:
# inferring frequency if reindex is "infer"
if final_reindex == "infer" and len(df) > 3:
final_reindex = get_index_freq(df["timestamp"])
elif final_reindex == "infer":
logger.debug("Cannot infer frequency from data because it has 3 or fewer timestamps. Using '10min' as default.")
final_reindex = "10min"
# if still failed, lets raise an error
if final_reindex is None:
raise RuntimeError("Cannot infer frequency from data. Please set 'reindex' argument manually.")
# converting final_reindex from string to timedelta
if isinstance(final_reindex, str):
final_reindex = pd.to_timedelta(final_reindex).to_pytimedelta()
timestamp_start = period["start"].replace(minute=0, second=0, hour=0)
final_reindex = pl.datetime_range(
start=timestamp_start,
end=period["end"],
interval=final_reindex,
closed="both",
eager=True,
time_unit="ms",
)
# reindexing the DataFrame
df = df.join(
pl.DataFrame({"final_timestamp": final_reindex}),
left_on="timestamp",
right_on="final_timestamp",
how="full",
)
# dropping the old timestamp column and renaming the new one
df = df.drop("timestamp").rename({"final_timestamp": "timestamp"})
# sorting the DataFrame by timestamp
df = df.sort("timestamp")
# restricting to desired period only
df = df.filter((pl.col("timestamp") >= period["start"]) & (pl.col("timestamp") <= period["end"]))
# sorting columns and leaving timestamp as the first column
df = df.select(["timestamp", *sorted(set(df.columns).difference({"timestamp"}))])
# removing the aggregation if just one aggregation is requested
if len(aggregation) == 1:
# removing the aggregation from the column names
df = df.rename({col: col.split("#")[0] for col in df.columns if "#" in col})
# returning the DataFrame in the requested format
if output_type == "pl.DataFrame":
return df
# converting to pandas DataFrame
df = df.to_pandas(use_pyarrow_extension_array=True)
# setting the index to timestamp
df = df.set_index("timestamp")
if len(aggregation) == 1:
# converting column names to tuples of (object_name, feature_name) to later convert to MultiIndex
df.columns = pd.MultiIndex.from_tuples(
[tuple(col.split("@")) for col in df.columns],
names=["object_name", "feature_name"],
)
else:
# converting column names to tuples of (object_name, feature_name, aggregation) to later convert to MultiIndex
col_names = [(*tuple(col.split("@")), col.split("#")[1]) for col in df.columns]
# removing #AGG from feature level
col_names = [(obj, feat.split("#")[0], agg) for obj, feat, agg in col_names]
df.columns = pd.MultiIndex.from_tuples(
col_names,
names=["object_name", "feature_name", "aggregation"],
)
# converting index to DatetimeIndex
df.index = df.index.astype("datetime64[s]")
return df
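Example (a hedged sketch of an aggregated query: 10-minute data bucketed into 30-minute intervals using the right-labelled convention described above; object and feature names are hypothetical):

from datetime import timedelta

agg = perfdb.features.values.series.get(
    features={"WTG01": ["active_power"]},
    period=period,  # a DateTimeRange covering the window of interest
    aggregation=["TimeAverage", "MAX"],          # multiple aggregations in one round trip
    aggregation_interval=timedelta(minutes=30),  # size of each time bucket
    aggregation_convention="right",              # timestamps label the end of each interval
    timestamp_res=timedelta(minutes=10),         # resolution of the stored data
    time_weight_method="LOCF",
)

# With more than one aggregation, the pandas columns gain an "aggregation" level.
max_power = agg[("WTG01", "active_power", "MAX")]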
insert(df, on_conflict='update', bazefield_upload=True)
Insert the feature values into the database.
Parameters:
- df (pd.DataFrame | pl.DataFrame) – DataFrame with the feature values.
  In case it is a Pandas DataFrame:
  - It must be in the format: index=timestamp, columns=MultiIndex[object_name, feature_name].
  - Index must be a DatetimeIndex and values must be convertible to float.
  In case it is a Polars DataFrame:
  - It must be in the format: columns=["timestamp", "object_name@feature_name", ...].
  - The "timestamp" column must be of type Datetime and the values must be convertible to float.
- on_conflict (Literal["ignore", "update"], default: "update") – What to do in case of conflict, by default "update". It will do the following:
  - "ignore": ignore the conflict and do not insert the row (the entire row will be ignored, not individual features).
  - "update": update the row with the new values.
- bazefield_upload (bool, default: True) – Whether to upload the data to Bazefield, by default True.
  If the object has the attribute exists_in_bazefield set to False, it will not be uploaded anyway.
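Example (a hedged sketch of building a pandas DataFrame in the expected shape and inserting it; the object and feature names are made up):

import pandas as pd

idx = pd.date_range("2024-01-01 00:00", periods=3, freq="10min", name="timestamp")
cols = pd.MultiIndex.from_tuples(
    [("WTG01", "active_power"), ("WTG01", "wind_speed")],
    names=["object_name", "feature_name"],
)
values = pd.DataFrame(
    [[1500.0, 7.2], [1480.5, 7.0], [1512.3, 7.4]],
    index=idx,
    columns=cols,
)

perfdb.features.values.series.insert(values, on_conflict="update", bazefield_upload=False)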
Source code in echo_postgres/feature_values_series.py
@validate_call
def insert(
self,
df: pd.DataFrame | pl.DataFrame,
on_conflict: Literal["ignore", "update"] = "update",
bazefield_upload: bool = True,
) -> None:
"""Insert the feature values into the database.
Parameters
----------
df : pd.DataFrame | pl.DataFrame
DataFrame with the feature values.
In case it is a **Pandas** DataFrame:
- It must be in the format: index=timestamp, columns=MultiIndex[object_name, feature_name].
- Index must be a DatetimeIndex and values must be convertible to float.
In case it is a **Polars** DataFrame:
- It must be in the format: columns=["timestamp", "object_name@feature_name", ...].
- The "timestamp" column must be of type Datetime and the values must be convertible to float.
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict, by default "update". It will do the following:
- "ignore": ignore the conflict and do not insert the row (entire row will be ignored, not individual features).
- "update": update the row with the new values.
bazefield_upload : bool, optional
Whether to upload the data to Bazefield, by default True.
If the object has attribute `exists_in_bazefield` set to False, it will not be uploaded anyway.
"""
# pandas checks
if isinstance(df, pd.DataFrame):
# checking if the DataFrame is empty
if df.empty:
return
# checking if the DataFrame has MultiIndex
if not isinstance(df.columns, pd.MultiIndex):
raise ValueError(f"df must have MultiIndex columns, got {type(df.columns)}")
if df.columns.names != ["object_name", "feature_name"]:
raise ValueError(f"df columns must have names 'object_name' and 'feature_name', got {df.columns.names}")
# checking if index is a DatetimeIndex
if not isinstance(df.index, pd.DatetimeIndex):
raise ValueError(f"df index must be a DatetimeIndex, got {type(df.index)}")
# checking if all columns are numeric
try:
df = df.astype("float64[pyarrow]")
except Exception as e:
raise ValueError("All columns must be numeric") from e
# joining the two column levels into one
df.columns = df.columns.map(lambda x: f"{x[0]}@{x[1]}")
# forcing the index to be named timestamp and them resetting it
df.index.name = "timestamp"
df = df.reset_index(drop=False)
# converting the DataFrame to Polars
df = pl.from_pandas(df)
# polars checks
# checking if column timestamp exists
if "timestamp" not in df.columns:
raise ValueError("df must have a column 'timestamp'")
# checking if timestamp column is of type Datetime
if not isinstance(df["timestamp"].dtype, pl.Datetime):
raise ValueError("df 'timestamp' column must be of type Datetime")
# getting the list of all other columns and checking if they are in the format "object_name@feature_name"
# a dict like {object: [feature1, feature2, ...], ...} will be created
upload_features = {}
for col in df.columns:
if col == "timestamp":
continue
# checking if the column is in the format "object_name@feature_name"
if "@" not in col:
raise ValueError(f"Column {col} must be in the format 'object_name@feature_name'")
# splitting the column name into object and feature
obj, feat = col.split("@")
if not obj or not feat:
raise ValueError(f"Column {col} must be in the format 'object_name@feature_name' with non-empty object and feature names")
# adding the feature to the object in the dict
if obj not in upload_features:
upload_features[obj] = []
upload_features[obj].append(feat)
# making sure the data type is Float64, if not convert it
if not isinstance(df[col].dtype, pl.Float64):
try:
df = df.with_columns(pl.col(col).cast(pl.Float64))
except Exception as e:
raise ValueError(f"Column {col} must be convertible to Float64") from e
object_names = sorted(upload_features.keys())
# converting to a Lazy Frame for faster processing
df = df.lazy()
# connecting to Bazefield
baze = Baze() if bazefield_upload else None
# checking if the objects exist in the database
objs = self._perfdb.objects.instances.get(
object_names=object_names,
output_type="DataFrame",
get_attributes=True,
attribute_names=["exists_in_bazefield"],
)
if len(objs) != len(object_names):
missing_objects = set(object_names) - set(objs.index)
raise ValueError(f"Objects {missing_objects} do not exist in the database")
# adding column "exists_in_bazefield" if it does not exist
if "exists_in_bazefield" not in objs.columns:
objs["exists_in_bazefield"] = True
# getting features for all models
features = self._perfdb.features.definitions.get(object_models=objs["object_model_name"].unique().tolist(), output_type="DataFrame")
# iterating over objects
for object_name, object_features_list in upload_features.items():
# getting the model of the object
object_model = objs.loc[object_name, "object_model_name"]
# getting object id
object_id: int = objs.loc[object_name, "id"]
# getting a slice of the DataFrame for the object
obj_df = df.select(
pl.col("timestamp"),
*[pl.col(f"{object_name}@{feat}") for feat in object_features_list],
)
# checking if all features exist in the database
obj_features = features.loc[pd.IndexSlice[object_model, :], :].index.get_level_values("name").unique().tolist()
# getting dict of feature names to feature ids
feature_ids: dict[str, int] = features.loc[pd.IndexSlice[object_model, :], :]["id"].droplevel(level=0, axis=0).to_dict()
missing_features = set(object_features_list).difference(obj_features)
if missing_features:
raise ValueError(
f"Features {missing_features} do not exist in the database for object {object_name} of model {object_model}",
)
# keeping a copy for bazefield use
baze_df = obj_df.clone()
# melting the DataFrame to have columns: timestamp, object_id, feature_id, value
# feature column will be categorical to save memory
obj_df = (
obj_df.unpivot(
index=["timestamp"],
variable_name="feature",
value_name="value",
)
# Split the 'feature' column into a struct with named fields
.with_columns(
pl.col("feature")
.str.split_exact("@", 1) # Use split_exact for fixed number of splits
.struct.rename_fields(["object_name", "feature_name"])
.alias("feature_struct"),
)
# Unnest the struct columns into the main DataFrame
.unnest("feature_struct")
# Cast to Enum (less memory usage) and drop the original feature column
.with_columns(
pl.col("feature_name").cast(pl.Enum(categories=list(feature_ids.keys()))).alias("feature_name"),
)
# drop the original 'feature' column
.drop(["feature", "object_name"])
# drop all NA rows
.drop_nulls()
# add object_id column
.with_columns(
pl.lit(object_id, dtype=pl.Int64).alias("object_id"),
)
# add feature_id column mapping from feature names to feature ids
.with_columns(pl.col("feature_name").replace_strict(feature_ids, default=None, return_dtype=pl.Int64).alias("feature_id"))
# dropping feature_name column as it is not needed anymore
.drop("feature_name")
# dropping duplicate rows
.unique(subset=["timestamp", "object_id", "feature_id"])
# sorting by timestamp, object_id and feature_id
.sort(["timestamp", "object_id", "feature_id"])
)
# convert value column to Float32
obj_df = obj_df.with_columns(pl.col("value").cast(pl.Float32))
# collecting results to then insert them into the database
obj_df = obj_df.collect()
# inserting the data
with self._perfdb.conn.reconnect() as conn:
conn.polars_to_sql(
obj_df,
table_name="feature_values",
schema="performance",
if_exists="append" if on_conflict == "ignore" else "update",
)
logger.debug(f"{object_name} - {len(obj_df)} rows inserted")
# uploading to Bazefield
try:
exists_in_bazefield = objs.loc[object_name, "exists_in_bazefield"]
if not bazefield_upload or not exists_in_bazefield:
logger.debug(
f"{object_name} - not uploaded to Bazefield because either exists_in_bazefield object attribute or bazefield_upload argument is False. {bazefield_upload=} {exists_in_bazefield=}",
)
else:
baze.points.values.series.insert(data=baze_df.collect())
except Exception:
logger.exception(f"{object_name} - Error uploading to Bazefield")
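Example (the Polars input format from the docstring, sketched with the same hypothetical names):

from datetime import datetime

import polars as pl

values_pl = pl.DataFrame(
    {
        "timestamp": [datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 0, 10)],
        "WTG01@active_power": [1500.0, 1480.5],
        "WTG01@wind_speed": [7.2, 7.0],
    }
)

perfdb.features.values.series.insert(values_pl, on_conflict="ignore")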