Feature Values - When At Condition¶
FeatureValuesWhenAtCondition(perfdb)
¶
Class used for getting timestamps when the feature meets a desired condition. Can be accessed via perfdb.features.values.whenatcondition.
Parameters:
-
perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
get(features)
¶
Gets the timestamps when the feature meets a desired condition.
Parameters:
-
features (dict[str, dict[str, dict[str, str | int]]]) – Dictionary in the format (values are examples):
Python{ object_name: { feature_name: { "condition": "value > 1000 AND timestamp > '2021-01-01'", "order_by": "timestamp DESC", "limit": 1, }, ... }, ... }The keys needed for each feature are:
- condition: The condition to be met. Any valid SQL condition can be used. You must always do the comparisons against "value" and "timestamp" columns.
- order_by: The order by clause to be used. Any valid SQL order by clause can be used. You must always order by "value" or "timestamp" columns.
- limit: The number of timestamps to return. A minimum of 1 is required.
Returns:
-
dict[str, dict[str, list[datetime] | None]] – Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:
Python{ object_name: { feature_name: [timestamp1, timestamp2, ...], ... }, ... }If the condition is not met, the value is None.
Source code in echo_postgres/feature_values_whenatcondition.py
Python
@validate_call
def get(
    self,
    features: dict[str, dict[str, dict[str, str | int]]],
) -> dict[str, dict[str, list[datetime] | None]]:
    """Gets the timestamps when the feature meets a desired condition.

    Parameters
    ----------
    features : dict[str, dict[str, dict[str, str | int]]]
        Dictionary in the format (values are examples):

        ```python
        {
            object_name: {
                feature_name: {
                    "condition": "value > 1000 AND timestamp > '2021-01-01'",
                    "order_by": "timestamp DESC",
                    "limit": 1,
                },
                ...
            },
            ...
        }
        ```

        The keys needed for each feature are:

        - **condition**: The condition to be met. Any valid SQL condition can be used. You must always do the comparisons against "value" and "timestamp" columns.
        - **order_by**: The order by clause to be used. Any valid SQL order by clause can be used. You must always order by "value" or "timestamp" columns.
        - **limit**: The number of timestamps to return. A minimum of 1 is required.

    Returns
    -------
    dict[str, dict[str, list[datetime] | None]]
        Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:

        ```python
        {
            object_name: {
                feature_name: [timestamp1, timestamp2, ...],
                ...
            },
            ...
        }
        ```

        If the condition is not met, the value is None.

    Raises
    ------
    ValueError
        If ``features`` does not match the expected schema, if an object is not
        found in the database, if a feature is not defined for an object's model,
        or if a condition/order_by clause references neither "value" nor "timestamp".
    """
    # JSON Schema mirroring the documented shape:
    # {object_name: {feature_name: {"condition": str, "order_by": str, "limit": int >= 1}}}
    json_schema = {
        "type": "object",
        "additionalProperties": {
            "type": "object",
            "additionalProperties": {
                "type": "object",
                "properties": {
                    "condition": {"type": "string"},
                    "order_by": {"type": "string"},
                    "limit": {"type": "integer", "minimum": 1},
                },
                "required": ["condition", "order_by", "limit"],
                "additionalProperties": False,
            },
        },
    }
    # checking arguments; re-raise as ValueError so callers get a uniform error type
    try:
        validate(instance=features, schema=json_schema)
    except Exception as e:
        raise ValueError("Invalid format for features argument.") from e

    # getting the models of all the objects
    object_names = list(features.keys())
    objs = self._perfdb.objects.instances.get(object_names=object_names, output_type="pl.DataFrame")
    # validating if all objects were found
    if objs.shape[0] != len(features):
        missing_objs = set(features.keys()) - set(objs["name"].to_list())
        raise ValueError(f"The following objects were not found in the database: {missing_objs}")

    # pre-build lookup dicts to avoid repeated DataFrame filtering.
    # strict=True: all columns come from the same DataFrame, so a length
    # mismatch is a data error we want raised, not silently truncated.
    obj_names_list = objs["name"].to_list()
    obj_id_map: dict[str, int] = dict(zip(obj_names_list, objs["id"].to_list(), strict=True))
    obj_model_map: dict[str, str] = dict(
        zip(obj_names_list, objs["object_model_name"].to_list(), strict=True),
    )
    obj_models = sorted(set(obj_model_map.values()))

    # getting the features for the desired models and building a lookup dict
    features_def = self._perfdb.features.definitions.get(object_models=obj_models, output_type="pl.DataFrame")
    feature_id_map: dict[tuple[str, str], int] = {
        (model, name): fid
        for model, name, fid in zip(
            features_def["object_model_name"].to_list(),
            features_def["name"].to_list(),
            features_def["id"].to_list(),
            strict=True,
        )
    }

    # hoisted out of the loop: whole-word match for "value" or "timestamp"
    required_cols_re = re.compile(r"\b(?:value|timestamp)\b")

    # collect all individual queries and their result keys
    queries: list[tuple[str, str, sql.Composed]] = []
    for object_name, features_dict in features.items():
        obj_id = obj_id_map[object_name]
        obj_model = obj_model_map[object_name]
        for feature_name, feature_dict in features_dict.items():
            # checking if the feature exists for the object
            key = (obj_model, feature_name)
            if key not in feature_id_map:
                raise ValueError(f"Feature {feature_name} not found for object {object_name}.")
            feature_id = feature_id_map[key]
            # making sure "value" or "timestamp" are in the condition and order_by
            for field in ["order_by", "condition"]:
                if not required_cols_re.search(feature_dict[field]):
                    raise ValueError(f'{field} for feature {feature_name} in object {object_name} must contain "value" or "timestamp".')
            # NOTE(security): condition/order_by are interpolated as raw SQL by design
            # (the public contract allows arbitrary SQL fragments). Callers must never
            # pass untrusted input here; object_id/feature_id/limit are escaped Literals.
            query = sql.SQL(
                """
                SELECT timestamp::TIMESTAMP FROM performance.feature_values
                WHERE object_id = {object_id}
                AND feature_id = {feature_id}
                AND {condition}
                ORDER BY {order_by}
                LIMIT {limit}
                """,
            ).format(
                object_id=sql.Literal(obj_id),
                feature_id=sql.Literal(feature_id),
                condition=sql.SQL(feature_dict["condition"]),
                order_by=sql.SQL(feature_dict["order_by"]),
                limit=sql.Literal(feature_dict["limit"]),
            )
            queries.append((object_name, feature_name, query))

    # execute all queries and collect results; None marks an unmet condition
    results: dict[str, dict[str, list[datetime] | None]] = {obj: {} for obj in features}
    for object_name, feature_name, query in queries:
        df = self._perfdb.conn.read_to_polars(query=query, schema_overrides={"timestamp": pl.Datetime})
        results[object_name][feature_name] = None if df.is_empty() else df["timestamp"].to_list()
    return results