ONS Contestations¶
OnsContestations(perfdb)
¶
Class used for handling ONS data contestations. Can be accessed via perfdb.ons.contestations.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/ons_contestations.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling ONS data contestations.

    All initialization is delegated to the parent class constructor.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)
delete(period, object_names=None)
¶
Deletes ONS contestation values from the database. Only queued contestations are intended to be deleted.
Parameters:
- period (DateTimeRange) – Period of time to delete the data for.
- object_names (list[str], default: None) – List of object names to delete the data for. By default None.
Source code in echo_postgres/ons_contestations.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
) -> None:
    """Deletes ONS contestation values from the database.

    Removes rows of ``performance.ons_data_contestation`` whose ``date`` lies within
    ``period`` (both bounds inclusive), optionally restricted to the given objects.

    NOTE(review): the documentation of this method states that only queued contestations
    are deleted, but the query built here does not filter on contestation status. Confirm
    whether that restriction is enforced elsewhere (e.g. by the database) or whether a
    status filter is missing.

    Parameters
    ----------
    period : DateTimeRange
        Period of time to delete the data for. Both ``period.start`` and ``period.end``
        are inclusive.
    object_names : list[str], optional
        List of object names to delete the data for. By default None; None (or an empty
        list) applies no object filter, so every object in the period is affected.

    Raises
    ------
    ValueError
        If any name in ``object_names`` cannot be resolved to an object id.
    """
    # build the base query; both period bounds are inclusive
    query = [
        sql.SQL(
            """DELETE FROM performance.ons_data_contestation WHERE (date >= {period_start} AND date <= {period_end})""",
        ).format(
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    if object_names:
        # resolving object names to ids; the result is used as a {name: id} mapping
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # compare by membership instead of by length so that duplicated names in
        # object_names do not raise a (misleading, empty) "not found" error
        missing_objs = set(object_names) - set(obj_ids)
        if missing_objs:
            raise ValueError(f"Could not find the following objects: {missing_objs}")
        query.append(
            sql.SQL(" AND object_id IN ({ids})").format(
                ids=sql.SQL(", ").join(map(sql.Literal, obj_ids.values())),
            ),
        )

    # deleting
    self._perfdb.conn.execute(sql.Composed(query))
    # NOTE(review): assumes the connection handler exposes the rowcount of the last
    # executed statement - TODO confirm
    logger.debug(f"Deleted {self._perfdb.conn.rowcount} rows from ons_data_contestation table")
get(period, object_names=None, output_type='DataFrame')
¶
Get ONS data contestations for a period.
This method reads from the view performance.v_ons_data_contestation and returns
contestation records indexed by object_name and date.
Columns returned by the view (and present in the output) include:
- object_id (int)
- object_name (str)
- date (date)
- echo_values (json/jsonb)
- ons_values (json/jsonb)
- contestation_timestamp (datetime)
- user_id (int)
- user_name (str)
- echo_status (str)
- ons_status (str)
Parameters:
- period (DateTimeRange) – Desired period to query (start/end).
- object_names (list[str] | None, default: None) – If provided, filter results to these object names (uses internal ids for performance).
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'DataFrame') – Output format. If dict, the structure is {object_name: {date: {column: value}}}.
Returns:
- pandas.DataFrame | polars.DataFrame | dict – Depending on output_type, returns a pandas.DataFrame, a polars.DataFrame, or a nested dict with the structure {object_name: {date: {column: value}}}.
Source code in echo_postgres/ons_contestations.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    object_names: list[str] | None = None,
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame | dict[str, dict[pd.Timestamp, dict[str, Any]]]:
    """Get ONS data contestations for a period.

    This method reads from the view `performance.v_ons_data_contestation` and returns
    contestation records indexed by `object_name` and `date`, ordered by date and then
    by object name.

    Columns returned by the view (and present in the output) include:

    - `object_id` (int)
    - `object_name` (str)
    - `date` (date)
    - `echo_values` (json/jsonb)
    - `ons_values` (json/jsonb)
    - `contestation_timestamp` (datetime)
    - `user_id` (int)
    - `user_name` (str)
    - `echo_status` (str)
    - `ons_status` (str)

    Parameters
    ----------
    period : DateTimeRange
        Desired period to query (start/end). Both bounds are inclusive.
    object_names : list[str] | None, optional
        If provided, filter results to these object names (uses internal ids for performance).
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output format. If `dict`, structure is {object_name: {date: {column: value}}}.

    Returns
    -------
    pandas.DataFrame | polars.DataFrame | dict
        Depending on `output_type`, returns a `pandas.DataFrame`, `polars.DataFrame`,
        or nested `dict` with the structure `{object_name: {date: {column: value}}}`.

    Raises
    ------
    ValueError
        If any name in `object_names` cannot be resolved to an object id.
    """
    # building the base query; both period bounds are inclusive
    query = [
        sql.SQL(
            """SELECT * FROM performance.{view} WHERE (date >= {period_start} AND date <= {period_end})""",
        ).format(
            view=sql.Identifier("v_ons_data_contestation"),
            period_start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            period_end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]

    where = []
    if object_names:
        # getting id's of the objects for faster query; used as a {name: id} mapping
        object_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        # check membership per distinct name (instead of comparing lengths) so that
        # duplicated names in object_names do not raise a misleading empty error
        missing_names = [name for name in dict.fromkeys(object_names) if name not in object_ids]
        if missing_names:
            raise ValueError(f"Could not find the following object names: {missing_names}")
        where.append(
            sql.SQL("object_id IN ({ids})").format(
                ids=sql.SQL(", ").join(map(sql.Literal, object_ids.values())),
            ),
        )

    if where:
        query.append(sql.SQL(" AND ("))
        query.append(sql.SQL(" AND ").join(where))
        query.append(sql.SQL(")"))

    # explicit leading space so the ORDER BY clause never fuses with the previous token
    query.append(
        sql.SQL(" ORDER BY date, {object_col}").format(
            object_col=sql.Identifier("object_name"),
        ),
    )

    df = self._perfdb.conn.read_to_polars(
        sql.Composed(query),
        infer_schema_length=None,
        use_adbc=False,
    )

    index_cols = ["object_name", "date"]
    return convert_output(df, output_type, index_col=index_cols, nest_by_index=True)
insert(df, on_conflict='update')
¶
Inserts ONS data contestations into the database.
Parameters:
- df (pandas.DataFrame | polars.DataFrame) – DataFrame with the contestation data to insert.
  Required columns:
  - object_name (str): Name of the object.
  - date (date): Contestation date.
  - user_name (str): Name of the user who created the contestation.
  - echo_values (json/dict/str): Echo values data (will be serialized to JSON).
  - echo_status (str): Status name (e.g., 'Queued', 'Contested'); must exist in the contestation status lookup.
  Optional columns:
  - ons_values (json/dict/str): ONS values data (will be serialized to JSON). If not provided, set to NULL.
  - contestation_timestamp: Will be ignored if provided; always auto-filled by the database.
- on_conflict (Literal['ignore', 'update'], default: 'update') – What to do in case of conflict. Can be one of ["ignore", "update"]. By default "update".
Raises:
- ValueError – If required columns are missing or if object_name, user_name, or echo_status values cannot be resolved to ids.
Source code in echo_postgres/ons_contestations.py
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "update",
) -> None:
    """Inserts ONS data contestations into the database.

    Parameters
    ----------
    df : DataFrame | pl.DataFrame
        DataFrame with the contestation data to insert.

        Required columns:

        - `object_name` (str): Name of the object.
        - `date` (date): Contestation date.
        - `user_name` (str): Name of the user who created the contestation.
        - `echo_values` (json/dict/str): Echo values data. Struct (dict) values are
          serialized to JSON; string values are assumed to already be JSON and are kept as-is.
        - `echo_status` (str): Status name (e.g., 'Queued', 'Contested'); must exist in the
          contestation status lookup. It is written as provided (not resolved to an id here).

        Optional columns:

        - `ons_values` (json/dict/str): ONS values data, serialized like `echo_values`.
          If not provided, set to NULL.
        - `contestation_timestamp`: Will be ignored if provided; always auto-filled by the database.

        Any other column is ignored.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "update"

    Raises
    ------
    ValueError
        If required columns are missing or if object_name or user_name values cannot be
        resolved to ids.
        NOTE(review): echo_status is not validated in this method; presumably an invalid
        status is rejected by the database (e.g. a foreign key) - confirm.
    """
    # converting to polars if it is a pandas DataFrame
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    df: pl.DataFrame

    # checking columns
    required_cols = [
        "object_name",
        "date",
        "user_name",
        "echo_values",
        "echo_status",
    ]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # keeping only the columns written to the table; this also discards
    # contestation_timestamp, which is auto-filled by the database
    cols_to_select = required_cols + (["ons_values"] if "ons_values" in df.columns else [])
    df = df.select(cols_to_select)

    # resolving object_name to ids ({name: id} mapping)
    wanted_obj_names = df["object_name"].unique().to_list()
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_obj_names)
    if len(obj_ids) != len(wanted_obj_names):
        missing_names = [name for name in wanted_obj_names if name not in obj_ids]
        raise ValueError(f"Could not find the following object names: {missing_names}")

    # resolving user_name to ids ({name: id} mapping)
    wanted_user_names = df["user_name"].unique().to_list()
    user_ids = self._perfdb.users.instances.get_ids(names=wanted_user_names)
    if len(user_ids) != len(wanted_user_names):
        missing_names = [name for name in wanted_user_names if name not in user_ids]
        raise ValueError(f"Could not find the following user names: {missing_names}")

    # replacing the name columns by their id columns
    df = df.with_columns(
        pl.col("object_name").replace_strict(obj_ids, return_dtype=pl.Int32).alias("object_id"),
        pl.col("user_name").replace_strict(user_ids, return_dtype=pl.Int32).alias("user_id"),
    ).drop(["object_name", "user_name"])

    # serializing the JSON columns to strings; json_encode only works on Struct
    # columns, so values that are already strings are kept and all-null columns are cast
    for col in ("echo_values", "ons_values"):
        if col not in df.columns:
            continue
        dtype = df.schema[col]
        if dtype == pl.String:
            continue
        if dtype == pl.Null:
            encoded = pl.col(col).cast(pl.String)
        else:
            encoded = pl.col(col).struct.json_encode()
        df = df.with_columns(encoded.alias(col))

    # dropping duplicates (after serialization, so every column has a simple comparable dtype)
    df = df.unique()

    # inserting; maps on_conflict to the if_exists mode of polars_to_sql
    # NOTE(review): assumes "append" skips conflicting rows in polars_to_sql - confirm
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    self._perfdb.conn.polars_to_sql(
        df=df,
        table_name="ons_data_contestation",
        schema="performance",
        if_exists=if_exists_mapping[on_conflict],
    )
    logger.debug(f"Inserted {df.shape[0]} rows into ons_data_contestation table.")