Skip to content

Feature Values - When At Condition

FeatureValuesWhenAtCondition(perfdb)

Class used for getting timestamps when the feature meets a desired condition. Can be accessed via perfdb.features.values.whenatcondition.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store the top level PerfDB object on the instance.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # kept as a private attribute; nothing else is initialized here
    self._perfdb: e_pg.PerfDB = perfdb

get(features)

Gets the timestamps when the feature meets a desired condition.

Parameters:

  • features

    (dict[str, dict[str, dict[str, str | int]]]) –

    Dictionary in the format (values are examples):

    {
        object_name: {
            feature_name: {
                "condition": "feature > 1000 AND timestamp_local > '2021-01-01'",
                "order_by": "timestamp_local DESC",
                "limit": 1,
            },
            ...
        },
        ...
    }
    

    The keys needed for each feature are:

    • condition: The condition to be met. Any valid SQL condition can be used. The whole word "feature" is replaced by the feature name automatically.
    • order_by: The order by clause to be used. Any valid SQL order by clause can be used. The whole word "feature" is replaced by the feature name automatically.
    • limit: The number of timestamps to return. A minimum of 1 is required.

Returns:

  • dict[str, dict[str, list[datetime] | None]]

    Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:

    {
        object_name: {
            feature_name: [timestamp1, timestamp2, ...],
            ...
        },
        ...
    }
    

    If the condition is not met, the value is None.

Source code in echo_postgres/feature_values_whenatcondition.py
@validate_call
def get(
    self,
    features: dict[str, dict[str, dict[str, str | int]]],
) -> dict[str, dict[str, list[datetime] | None]]:
    """Gets the timestamps when the feature meets a desired condition.

    Parameters
    ----------
    features : dict[str, dict[str, dict[str, str | int]]]
        Dictionary in the format (values are examples):

        ```python
        {
            object_name: {
                feature_name: {
                    "condition": "feature > 1000 AND timestamp_local > '2021-01-01'",
                    "order_by": "timestamp_local DESC",
                    "limit": 1,
                },
                ...
            },
            ...
        }
        ```

        The keys needed for each feature are (all three are required, no other keys are accepted):

        - **condition**: The condition to be met. Any valid SQL condition can be used. The whole word "feature" is replaced by the feature name automatically.
        - **order_by**: The order by clause to be used. Any valid SQL order by clause can be used. The whole word "feature" is replaced by the feature name automatically.
        - **limit**: The number of timestamps to return. A minimum of 1 is required.

    Returns
    -------
    dict[str, dict[str, list[datetime] | None]]
        Dict containing the timestamps when the condition is met. The number of timestamps returned will be dictated by the limit key in the arguments. It is in the following format:

        ```python
        {
            object_name: {
                feature_name: [timestamp1, timestamp2, ...],
                ...
            },
            ...
        }
        ```

        If the condition is not met, the value is None.

    Raises
    ------
    ValueError
        If ``features`` does not match the expected format.
    RuntimeError
        If the values query cannot be obtained from the database for an object and feature.
    """
    # defining json schema to check the input
    json_schema = {
        "type": "object",
        "additionalProperties": {
            "type": "object",
            "additionalProperties": {
                "type": "object",
                "properties": {
                    "condition": {"type": "string"},
                    "order_by": {"type": "string"},
                    "limit": {"type": "integer", "minimum": 1},
                },
                "required": ["condition", "order_by", "limit"],
                "additionalProperties": False,
            },
        },
    }

    # checking arguments
    try:
        validate(instance=features, schema=json_schema)
    except Exception as e:
        raise ValueError("Invalid format for features argument.") from e

    # matches the placeholder only as a full word; compiled once since it does not change per feature
    feature_placeholder = re.compile(r"\bfeature\b")

    # dict to store the results
    results = {}
    # iterating over the objects
    for object_name, features_dict in features.items():
        # dict to store the results for the object
        results[object_name] = {}
        # iterating over the features
        for feature_name, feature_dict in features_dict.items():
            # replacing the feature placeholder in the condition and order_by.
            # A callable replacement inserts the name verbatim: a plain string would be parsed as a
            # replacement template, so a backslash in the feature name would be treated as an escape
            # or group reference (corrupting the text or raising re.error).
            condition = feature_placeholder.sub(lambda _match: feature_name, feature_dict["condition"])
            order_by = feature_placeholder.sub(lambda _match: feature_name, feature_dict["order_by"])

            # building the call that asks the database for the query text that returns the values
            fn_query = sql.SQL(
                "SELECT * FROM performance.fn_get_object_feature_values_query({object_name}, ARRAY[{feature}], 'name', {where_clause}, ARRAY[{order_by}], {limit})",
            ).format(
                object_name=sql.Literal(object_name),
                feature=sql.Literal(feature_name),
                where_clause=sql.Literal(condition),
                order_by=sql.Literal(order_by),
                limit=sql.Literal(feature_dict["limit"]),
            )

            # getting the query
            try:
                with self._perfdb.conn.reconnect() as conn:
                    cur = conn.execute(fn_query)
            except Exception as e:
                raise RuntimeError(f"Error while trying to get the query for {object_name} and {feature_name}.") from e

            # no row means the database did not return a query to run
            if cur.rowcount == 0:
                raise RuntimeError(f"Error while trying to get the query for {object_name} and {feature_name}.")

            # first column of the first row holds the query text
            values_query = cur.fetchone()[0]

            # getting the timestamps
            with self._perfdb.conn.reconnect() as conn:
                df = conn.read_to_pandas(query=values_query, dtype_backend=None)

            # storing the timestamps
            if df.empty:
                results[object_name][feature_name] = None
            else:
                results[object_name][feature_name] = df["timestamp_local"].tolist()

    return results