Feature Values - When At Condition¶
FeatureValuesWhenAtCondition(perfdb)
¶
Class used for getting timestamps when the feature meets a desired condition. Can be accessed via perfdb.features.values.whenatcondition.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
get(features)
¶
Gets the timestamps when the feature meets a desired condition.
Parameters:
-
(features¶dict[str, dict[str, dict[str, str]]]) –Dictionary in the format (values are examples):
{ object_name: { feature_name: { "condition": "feature > 1000 AND timestamp_local > '2021-01-01'", "order_by": "timestamp_local DESC", "limit": 1, }, ... }, ... }The keys needed for each feature are:
- condition: The condition to be met. Any valid SQL condition can be used. The "feature" string is replaced by the feature id automatically.
- order_by: The order by clause to be used. Any valid SQL order by clause can be used. The "feature" string is replaced by the feature id automatically.
- limit: The number of timestamps to return. A minimum of 1 is required.
Returns:
-
dict[str, dict[str, list[datetime] | None]]–Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:
{ object_name: { feature_name: [timestamp1, timestamp2, ...], ... }, ... }If the condition is not met, the value is None.
Source code in echo_postgres/feature_values_whenatcondition.py
@validate_call
def get(
self,
features: dict[str, dict[str, dict[str, str | int]]],
) -> dict[str, dict[str, list[datetime] | None]]:
"""Gets the timestamps when the feature meets a desired condition.
Parameters
----------
features : dict[str, dict[str, dict[str, str]]]
Dictionary in the format (values are examples):
```python
{
object_name: {
feature_name: {
"condition": "feature > 1000 AND timestamp_local > '2021-01-01'",
"order_by": "timestamp_local DESC",
"limit": 1,
},
...
},
...
}
```
The keys needed for each feature are:
- **condition**: The condition to be met. Any valid SQL condition can be used. The "feature" string is replaced by the feature id automatically.
- **order_by**: The order by clause to be used. Any valid SQL order by clause can be used. The "feature" string is replaced by the feature id automatically.
- **limit**: The number of timestamps to return. A minimum of 1 is required.
Returns
-------
dict[str, dict[str, list[datetime] | None]]
Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:
```python
{
object_name: {
feature_name: [timestamp1, timestamp2, ...],
...
},
...
}
```
If the condition is not met, the value is None.
"""
# defining json schema to check the input
json_schema = {
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"condition": {"type": "string"},
"order_by": {"type": "string"},
"limit": {"type": "integer", "minimum": 1},
},
"required": ["condition", "order_by", "limit"],
"additionalProperties": False,
},
},
}
# checking arguments
try:
validate(instance=features, schema=json_schema)
except Exception as e:
raise ValueError("Invalid format for features argument.") from e
# dict to store the results
results = {}
# iterating over the objects
for object_name, features_dict in features.items():
# dict to store the results for the object
results[object_name] = {}
# iterating over the features
for feature_name, feature_dict in features_dict.items():
# replacing the feature string in the condition and order_by, only full words using regex
condition = re.sub(r"\bfeature\b", feature_name, feature_dict["condition"])
order_by = re.sub(r"\bfeature\b", feature_name, feature_dict["order_by"])
# getting the query to get the timestamps
fn_query = sql.SQL(
"SELECT * FROM performance.fn_get_object_feature_values_query({object_name}, ARRAY[{feature}], 'name', {where_clause}, ARRAY[{order_by}], {limit})",
).format(
object_name=sql.Literal(object_name),
feature=sql.Literal(feature_name),
where_clause=sql.Literal(condition),
order_by=sql.Literal(order_by),
limit=sql.Literal(feature_dict["limit"]),
)
# getting the query
try:
with self._perfdb.conn.reconnect() as conn:
cur = conn.execute(fn_query)
except Exception as e:
raise RuntimeError(f"Error while trying to get the query for {object_name} and {feature_name}.") from e
if cur.rowcount == 0:
raise RuntimeError(f"Error while trying to get the query for {object_name} and {feature_name}.")
values_query = cur.fetchone()[0]
# getting the timestamps
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_pandas(query=values_query, dtype_backend=None)
# storing the timestamps
if df.empty:
results[object_name][feature_name] = None
else:
results[object_name][feature_name] = df["timestamp_local"].tolist()
return results