Skip to content

ONS Contestations

OnsContestations(perfdb)

Class used for handling ONS data contestations. Can be accessed via perfdb.ons.contestations.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/ons_contestations.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling ONS data contestations.

    Instances are meant to be reached through ``perfdb.ons.contestations``.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # All setup is delegated to the parent class; presumably it stores the handler as
    # ``self._perfdb``, which the other methods of this class read (parent not shown here).
    super().__init__(perfdb)

delete(period, object_names=None)

Deletes ONS contestation values from the database. Only queued contestations will be deleted.

Parameters:

  • period

    (DateTimeRange) –

    Period of time to delete the data for.

  • object_names

    (list[str], default: None ) –

    List of object names to delete the data for. By default None

Source code in echo_postgres/ons_contestations.py
Python
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
) -> None:
    """Deletes ONS contestation values from the database.

    Rows of ``performance.ons_data_contestation`` whose ``date`` lies within ``period``
    (both ends inclusive) are removed, optionally restricted to the given objects.

    NOTE(review): this method has been documented as deleting only the queued contestations,
    but the statement built here carries no status filter. Confirm whether that restriction
    is enforced on the database side.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for.
    object_names : list[str], optional
        List of object names to delete the data for. Repeated names are accepted.
        When None or empty, no object filter is applied. By default None

    Raises
    ------
    ValueError
        If any of the requested object names is not present in the ids returned by the lookup.
    """
    # build the query
    query = [
        sql.SQL(
            """DELETE FROM performance.ons_data_contestation WHERE (date >= {period_start} AND date <= {period_end})""",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if object_names:
        # getting object id
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # Compare by name rather than by length: a repeated name in object_names would make
        # the lengths differ even though every object was found.
        missing_objs = set(object_names) - set(obj_ids)
        if missing_objs:
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(sql.SQL(" AND object_id IN ({ids})").format(ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values()))))

    query = sql.Composed(query)

    # deleting
    self._perfdb.conn.execute(query)

    logger.debug(f"Deleted {self._perfdb.conn.rowcount} rows from ons_data_contestation table")

get(period, object_names=None, output_type='DataFrame')

Get ONS data contestations for a period.

This method reads from the view performance.v_ons_data_contestation and returns contestation records indexed by object_name and date.

Columns returned by the view (and present in the output) include:

  • object_id (int)
  • object_name (str)
  • date (date)
  • echo_values (json/jsonb)
  • ons_values (json/jsonb)
  • contestation_timestamp (datetime)
  • user_id (int)
  • user_name (str)
  • echo_status (str)
  • ons_status (str)

Parameters:

  • period

    (DateTimeRange) –

    Desired period to query (start/end).

  • object_names

    (list[str] | None, default: None ) –

    If provided, filter results to these object names (uses internal ids for performance).

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output format. If dict, structure is {object_name: {date: {column: value}}}.

Returns:

  • pandas.DataFrame | polars.DataFrame | dict

    Depending on output_type, returns a pandas.DataFrame, polars.DataFrame, or nested dict with the structure {object_name: {date: {column: value}}}.

Source code in echo_postgres/ons_contestations.py
Python
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame | dict[str, dict[pd.Timestamp, dict[str, Any]]]:
    """Get ONS data contestations for a period.

    This method reads from the view `performance.v_ons_data_contestation` and returns
    contestation records indexed by `object_name` and `date`.

    Columns returned by the view (and present in the output) include:
    - `object_id` (int)
    - `object_name` (str)
    - `date` (date)
    - `echo_values` (json/jsonb)
    - `ons_values` (json/jsonb)
    - `contestation_timestamp` (datetime)
    - `user_id` (int)
    - `user_name` (str)
    - `echo_status` (str)
    - `ons_status` (str)

    Parameters
    ----------
    period : DateTimeRange
        Desired period to query (start/end, both inclusive).
    object_names : list[str] | None, optional
        If provided, filter results to these object names (uses internal ids for performance).
        Repeated names are accepted. When None or empty, no object filter is applied.
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output format. If `dict`, structure is {object_name: {date: {column: value}}}.

    Returns
    -------
    pandas.DataFrame | polars.DataFrame | dict
        Depending on `output_type`, returns a `pandas.DataFrame`, `polars.DataFrame`,
        or nested `dict` with the structure `{object_name: {date: {column: value}}}`.

    Raises
    ------
    ValueError
        If any of the requested object names is not present in the ids returned by the lookup.
    """
    # building the query
    query = [
        sql.SQL(
            """SELECT * FROM performance.{view} WHERE (date >= {period_start} AND date <= {period_end})""",
        ).format(
            view=sql.Identifier("v_ons_data_contestation"),
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if object_names:
        # getting id's of the objects for faster query
        object_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # Check by name, not by length: a repeated name in object_names would make the
        # lengths differ even though every object was found. dict.fromkeys de-duplicates
        # while keeping the caller's order for the error message.
        missing_names = [name for name in dict.fromkeys(object_names) if name not in object_ids]
        if missing_names:
            raise ValueError(f"Could not find the following object names: {missing_names}")

        query.append(
            sql.SQL(" AND (object_id IN ({ids}))").format(
                ids=sql.SQL(", ").join(map(sql.Literal, object_ids.values())),
            ),
        )

    # leading space keeps ORDER BY from being glued to the preceding closing parenthesis
    query.append(
        sql.SQL(""" ORDER BY date, {object_col}""").format(
            object_col=sql.Identifier("object_name"),
        ),
    )

    query = sql.Composed(query)

    df = self._perfdb.conn.read_to_polars(
        query,
        infer_schema_length=None,
        use_adbc=False,
    )

    # results are indexed (and, for dict output, nested) by object name and then date
    index_cols = ["object_name", "date"]
    return convert_output(df, output_type, index_col=index_cols, nest_by_index=True)

insert(df, on_conflict='update')

Inserts ONS data contestations into the database.

Parameters:

  • df

    (pandas.DataFrame | polars.DataFrame) –

    DataFrame with the contestation data to insert.

    Required columns:

      • object_name (str): Name of the object.
      • date (date): Contestation date.
      • user_name (str): Name of the user who created the contestation.
      • echo_values (json/dict/str): Echo values data (will be serialized to JSON).
      • echo_status (str): Status name (e.g., 'Queued', 'Contested'); must exist in the contestation status lookup.

    Optional columns:

      • ons_values (json/dict/str): ONS values data (will be serialized to JSON). If not provided, set to NULL.
      • contestation_timestamp: Will be ignored if provided; always auto-filled by the database.

  • on_conflict

    (Literal['ignore', 'update'], default: 'update' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "update"

Raises:

  • ValueError

    If required columns are missing or if object_name, user_name, or echo_status values cannot be resolved to ids.

Source code in echo_postgres/ons_contestations.py
Python
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "update",
) -> None:
    """Inserts ONS data contestations into the database.

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame with the contestation data to insert.

        Required columns:
        - `object_name` (str): Name of the object.
        - `date` (date): Contestation date.
        - `user_name` (str): Name of the user who created the contestation.
        - `echo_values` (json/dict/str): Echo values data. Struct (dict-like) columns are
          serialized to JSON strings; columns of any other dtype are passed through unchanged.
        - `echo_status` (str): Status name (e.g., 'Queued', 'Contested'). It is forwarded to the
          table as given. NOTE(review): no lookup validation happens in this method; confirm
          that the database enforces that the status exists.

        Optional columns:
        - `ons_values` (json/dict/str): ONS values data, handled like `echo_values`.
          If the column is absent it is not sent to the table.
        - `contestation_timestamp`: Ignored if provided (only the columns listed above are kept).

    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "update"

    Raises
    ------
    ValueError
        If required columns are missing or if object_name or user_name values cannot be resolved to ids.
    """
    # converting to polars if it is a pandas DataFrame
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    df: pl.DataFrame

    # checking columns
    required_cols = [
        "object_name",
        "date",
        "user_name",
        "echo_values",
        "echo_status",
    ]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # Keep only the columns this method handles. Any other column (including
    # contestation_timestamp) is discarded here, and select() already returns a new frame,
    # so the caller's DataFrame is left untouched.
    cols_to_select = required_cols + (["ons_values"] if "ons_values" in df.columns else [])
    df = df.select(cols_to_select)

    # replacing object_name by their ids
    wanted_obj_names = df["object_name"].unique().to_list()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_obj_names)

    if len(obj_ids) != len(wanted_obj_names):
        missing_names = [name for name in wanted_obj_names if name not in obj_ids]
        raise ValueError(f"Could not find the following object names: {missing_names}")

    # replacing user_name by their ids
    wanted_user_names = df["user_name"].unique().to_list()
    user_ids = self._perfdb.users.instances.get_ids(names=wanted_user_names)

    if len(user_ids) != len(wanted_user_names):
        missing_names = [name for name in wanted_user_names if name not in user_ids]
        raise ValueError(f"Could not find the following user names: {missing_names}")

    # add object_id and user_id columns
    df = df.with_columns(
        pl.col("object_name").replace_strict(obj_ids, return_dtype=pl.Int32).alias("object_id"),
        pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int32).alias("user_id"),
    )
    df = df.drop(["object_name", "user_name"])

    # dropping duplicates
    df = df.unique()

    # Serialize echo_values / ons_values to JSON strings only when they are struct columns.
    # The struct namespace is only valid on Struct dtypes, so columns that already hold
    # strings (or are entirely null) must be left as they are instead of raising.
    for col in ("echo_values", "ons_values"):
        if col in df.columns and isinstance(df.schema[col], pl.Struct):
            df = df.with_columns(pl.col(col).struct.json_encode().alias(col))

    # inserting
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    self._perfdb.conn.polars_to_sql(
        df=df,
        table_name="ons_data_contestation",
        schema="performance",
        if_exists=if_exists_mapping[on_conflict],
    )

    logger.debug(f"Inserted {df.shape[0]} rows into ons_data_contestation table.")