Skip to content

Feature Values - When At Condition

FeatureValuesWhenAtCondition(perfdb)

Class used for getting timestamps when the feature meets a desired condition. Can be accessed via perfdb.features.values.whenatcondition.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

get(features)

Gets the timestamps when the feature meets a desired condition.

Parameters:

  • features

    (dict[str, dict[str, dict[str, str]]]) –

    Dictionary in the format (values are examples):

    Python
    {
        object_name: {
            feature_name: {
                "condition": "value > 1000 AND timestamp > '2021-01-01'",
                "order_by": "timestamp DESC",
                "limit": 1,
            },
            ...
        },
        ...
    }
    

    The keys needed for each feature are:

    • condition: The condition to be met. Any valid SQL condition can be used. You must always do the comparisons against "value" and "timestamp" columns.
    • order_by: The order by clause to be used. Any valid SQL order by clause can be used. You must always order by "value" or "timestamp" columns.
    • limit: The number of timestamps to return. A minimum of 1 is required.

Returns:

  • dict[str, dict[str, list[datetime] | None]]

    Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:

    Python
    {
        object_name: {
            feature_name: [timestamp1, timestamp2, ...],
            ...
        },
        ...
    }
    

    If the condition is not met, the value is None.

Source code in echo_postgres/feature_values_whenatcondition.py
Python
@validate_call
def get(
    self,
    features: dict[str, dict[str, dict[str, str | int]]],
) -> dict[str, dict[str, list[datetime] | None]]:
    """Gets the timestamps when the feature meets a desired condition.

    Parameters
    ----------
    features : dict[str, dict[str, dict[str, str]]]
        Dictionary in the format (values are examples):

        ```python
        {
            object_name: {
                feature_name: {
                    "condition": "value > 1000 AND timestamp > '2021-01-01'",
                    "order_by": "timestamp DESC",
                    "limit": 1,
                },
                ...
            },
            ...
        }
        ```

        The keys needed for each feature are:

        - **condition**: The condition to be met. Any valid SQL condition can be used. You must always do the comparisons against "value" and "timestamp" columns.
        - **order_by**: The order by clause to be used. Any valid SQL order by clause can be used. You must always order by "value" or "timestamp" columns.
        - **limit**: The number of timestamps to return. A minimum of 1 is required.

    Returns
    -------
    dict[str, dict[str, list[datetime] | None]]
        Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:

        ```python
        {
            object_name: {
                feature_name: [timestamp1, timestamp2, ...],
                ...
            },
            ...
        }
        ```

        If the condition is not met, the value is None.
    """
    # defining json schema to check the input
    json_schema = {
        "type": "object",
        "additionalProperties": {
            "type": "object",
            "additionalProperties": {
                "type": "object",
                "properties": {
                    "condition": {"type": "string"},
                    "order_by": {"type": "string"},
                    "limit": {"type": "integer", "minimum": 1},
                },
                "required": ["condition", "order_by", "limit"],
                "additionalProperties": False,
            },
        },
    }

    # checking arguments
    try:
        validate(instance=features, schema=json_schema)
    except Exception as e:
        raise ValueError("Invalid format for features argument.") from e

    # getting the models of all the objects
    object_names = list(features.keys())
    objs = self._perfdb.objects.instances.get(object_names=object_names, output_type="pl.DataFrame")
    # validating if all objects were found
    if objs.shape[0] != len(features):
        missing_objs = set(features.keys()) - set(objs["name"].to_list())
        raise ValueError(f"The following objects were not found in the database: {missing_objs}")

    # pre-build lookup dicts to avoid repeated DataFrame filtering
    obj_id_map: dict[str, int] = dict(zip(objs["name"].to_list(), objs["id"].to_list(), strict=False))
    obj_model_map: dict[str, str] = dict(
        zip(objs["name"].to_list(), objs["object_model_name"].to_list(), strict=False),
    )

    obj_models = sorted(set(obj_model_map.values()))

    # getting the features for the desired models and building a lookup dict
    features_def = self._perfdb.features.definitions.get(object_models=obj_models, output_type="pl.DataFrame")
    feature_id_map: dict[tuple[str, str], int] = {
        (model, name): fid
        for model, name, fid in zip(
            features_def["object_model_name"].to_list(),
            features_def["name"].to_list(),
            features_def["id"].to_list(),
            strict=False,
        )
    }

    # collect all individual queries and their result keys
    queries: list[tuple[str, str, sql.Composed]] = []
    for object_name, features_dict in features.items():
        obj_id = obj_id_map[object_name]
        obj_model = obj_model_map[object_name]

        for feature_name, feature_dict in features_dict.items():
            # checking if the feature exists for the object
            key = (obj_model, feature_name)
            if key not in feature_id_map:
                raise ValueError(f"Feature {feature_name} not found for object {object_name}.")

            feature_id = feature_id_map[key]

            # making sure "value" or "timestamp" are in the condition and order_by
            for field in ["order_by", "condition"]:
                if not re.search(r"\bvalue\b", feature_dict[field]) and not re.search(r"\btimestamp\b", feature_dict[field]):
                    raise ValueError(f'{field} for feature {feature_name} in object {object_name} must contain "value" or "timestamp".')

            query = sql.SQL(
                """
                SELECT timestamp::TIMESTAMP FROM performance.feature_values
                WHERE object_id = {object_id}
                AND feature_id = {feature_id}
                AND {condition}
                ORDER BY {order_by}
                LIMIT {limit}
                """,
            ).format(
                object_id=sql.Literal(obj_id),
                feature_id=sql.Literal(feature_id),
                condition=sql.SQL(feature_dict["condition"]),
                order_by=sql.SQL(feature_dict["order_by"]),
                limit=sql.Literal(feature_dict["limit"]),
            )

            queries.append((object_name, feature_name, query))

    # execute all queries and collect results
    results: dict[str, dict[str, list[datetime] | None]] = {obj: {} for obj in features}
    for object_name, feature_name, query in queries:
        df = self._perfdb.conn.read_to_polars(query=query, schema_overrides={"timestamp": pl.Datetime})
        results[object_name][feature_name] = None if df.is_empty() else df["timestamp"].to_list()

    return results