ONS SPE Site Mapping

OnsSpesSiteMapping(perfdb)

Class used for handling ONS SPE Site Mapping. Can be accessed via `perfdb.ons.spes.sitemapping`.

Parameters:

- `perfdb` (`PerfDB`) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
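
A minimal usage sketch, assuming a `PerfDB` instance has already been created (the constructor arguments are environment specific and shown here only as a placeholder):

```python
import echo_postgres as e_pg

# Hypothetical setup; pass whatever connection arguments your environment requires.
perfdb = e_pg.PerfDB()

# The mapping handler is reached through the documented accessor chain.
sitemapping = perfdb.ons.spes.sitemapping
print(sitemapping.get(only_most_recent=True))
```
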
delete(relations)
Deletes ONS SPE to Site mappings.
Parameters:
- `relations` (`list[OnsSpesSiteMappingDict]`) – List of relations to delete. Each relation must be a dictionary containing the following keys:
    - `ons_site_name`: Name of the ONS Site
    - `ons_spe_name`: Name of the ONS SPE
    - `relation_start`: Start of the relation (date)

Example:

```python
relations = [
    {
        "ons_site_name": "Site A",
        "ons_spe_name": "SPE 1",
        "relation_start": date(2023, 1, 1),
    },
    {
        "ons_site_name": "Site B",
        "ons_spe_name": "SPE 2",
        "relation_start": date(2023, 2, 1),
    },
]
```
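
For completeness, a hedged call sketch (assumes `perfdb` is an existing `PerfDB` instance; the site and SPE names are placeholders that must already exist as objects in the database):

```python
from datetime import date

# Placeholder names; a ValueError is raised if any referenced object is missing.
relations = [
    {"ons_site_name": "Site A", "ons_spe_name": "SPE 1", "relation_start": date(2023, 1, 1)},
]

# Relations that do not match an existing mapping are skipped and only logged at debug level.
perfdb.ons.spes.sitemapping.delete(relations)
```
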
Source code in echo_postgres/ons_spes_sitemapping.py
@validate_call
def delete(
self,
relations: list[OnsSpesSiteMappingDict],
) -> None:
"""Deletes ONS SPE to Site mappings.
Parameters
----------
relations : list[OnsSpesSiteMappingDict]
List of relations to delete. Each relation must be a dictionary containing the following keys:
- `ons_site_name`: Name of the ONS Site
- `ons_spe_name`: Name of the ONS SPE
- `relation_start`: Start of the relation (date)
Example:
```python
relations = [
{
"ons_site_name": "Site A",
"ons_spe_name": "SPE 1",
"relation_start": date(2023, 1, 1),
},
{
"ons_site_name": "Site B",
"ons_spe_name": "SPE 2",
"relation_start": date(2023, 2, 1),
},
]
```
"""
if not relations:
return
# getting the ids of the objects
wanted_obj_names = set()
for relation in relations:
wanted_obj_names.add(relation["ons_site_name"])
wanted_obj_names.add(relation["ons_spe_name"])
existing_objs = self._perfdb.objects.instances.get_ids(object_names=list(wanted_obj_names))
# checking if all objects exist
missing_objs = wanted_obj_names - set(existing_objs.keys())
if missing_objs:
raise ValueError(f"The following objects do not exist in the database: {', '.join(missing_objs)}")
for relation in relations:
site_id = existing_objs[relation["ons_site_name"]]
spe_id = existing_objs[relation["ons_spe_name"]]
query = sql.SQL("""
DELETE FROM
performance.ons_spe_site_mapping
WHERE
ons_site_id = {site_id} AND
ons_spe_id = {spe_id} AND
relation_start = {relation_start}
""").format(
site_id=sql.Literal(site_id),
spe_id=sql.Literal(spe_id),
relation_start=sql.Literal(relation["relation_start"]),
)
with self._perfdb.conn.reconnect() as conn:
result = conn.execute(query)
if result.rowcount == 0:
logger.debug(f"No mapping found for deletion: {relation}")
else:
logger.debug(f"Deleted mapping: {relation}")
get(site_names=None, spe_names=None, filter_type='and', period=None, only_most_recent=False, output_type='DataFrame')
Gets the mapping from ONS SPEs to ONS Sites.
The mapping is time dependent, as some SPEs might be moved from one site to another. Besides the `ons_site_name` and `ons_spe_name` columns, the result will also contain `relation_start` and `relation_end` columns indicating the period when the mapping was valid. A null `relation_end` indicates the current mapping.
Parameters:

- `site_names` (`list[str] | None`, default: `None`) – Names of the sites to filter the results, by default None
- `spe_names` (`list[str] | None`, default: `None`) – Names of the SPEs to filter the results, by default None
- `filter_type` (`Literal["and", "or"]`, default: `"and"`) – How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
- `period` (`DateTimeRange | None`, default: `None`) – Time period to filter the results. It will be used against start and end, so any relation that covers the desired period will be returned. If None, no time constraint is applied. By default None
- `only_most_recent` (`bool`, default: `False`) – Whether to return only the most recent mapping. If set to True, the `period` parameter is ignored. By default False
- `output_type` (`Literal["DataFrame", "pl.DataFrame"]`, default: `"DataFrame"`) – The type of output to return, by default a pandas "DataFrame"

Returns:

- `pd.DataFrame` – In case of `output_type="DataFrame"`, a pandas DataFrame
- `pl.DataFrame` – In case of `output_type="pl.DataFrame"`, a polars DataFrame
Source code in echo_postgres/ons_spes_sitemapping.py
@validate_call
def get(
self,
site_names: list[str] | None = None,
spe_names: list[str] | None = None,
filter_type: Literal["and", "or"] = "and",
period: DateTimeRange | None = None,
only_most_recent: bool = False,
output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame:
"""Gets the mapping from ONS SPEs to ONS Sites.
The mapping is time dependent as some SPEs might be moved from one site to another. Besides the `ons_site_name`, `ons_spe_name` columns the result will also contain `relation_start` and `relation_end` columns indicating the period when the mapping was valid. If the `relation_end` is null it means that it is the current mapping.
Parameters
----------
site_names : list[str] | None, optional
Name of the sites to filter the results, by default None
spe_names : list[str] | None, optional
Name of the SPEs to filter the results, by default None
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and"
period : DateTimeRange | None, optional
Time period to filter the results. It will be used against start and end, so any relation that covers the desired period will be returned. If None, no time constraint is applied.
By default None
only_most_recent : bool, optional
Whether to return only the most recent mapping. If set to True, the `period` parameter is ignored. By default False
output_type : Literal["DataFrame", "pl.DataFrame"], optional
The type of output to return, by default a pandas "DataFrame"
Returns
-------
pd.DataFrame
In case of `output_type="DataFrame"`, a pandas DataFrame
pl.DataFrame
In case of `output_type="pl.DataFrame"`, a polars DataFrame
"""
where = []
if site_names is not None:
where.append(sql.SQL("ons_site_name = ANY(ARRAY[{names}])").format(names=sql.SQL(", ").join(map(sql.Literal, site_names))))
if spe_names is not None:
where.append(sql.SQL("ons_spe_name = ANY(ARRAY[{names}])").format(names=sql.SQL(", ").join(map(sql.Literal, spe_names))))
if where:
match filter_type:
case "and":
where_clause = sql.SQL(" AND ").join(where)
case "or":
where_clause = sql.SQL(" OR ").join(where)
else:
where_clause = None
if period is not None and not only_most_recent:
where_clause = sql.SQL("({existing}) AND ({new})").format(
existing=where_clause if where_clause else sql.SQL("TRUE"),
new=sql.SQL("NOT (COALESCE(relation_end, NOW() + INTERVAL '1 day') < {start} OR relation_start > {end})").format(
start=sql.Literal(period.start),
end=sql.Literal(period.end),
),
)
if only_most_recent:
where_clause = sql.SQL("({existing}) AND ({new})").format(
existing=where_clause if where_clause else sql.SQL("TRUE"),
new=sql.SQL("relation_end IS NULL"),
)
query = sql.SQL("""
SELECT
*
FROM
performance.v_ons_spe_site_mapping
{where_clause}
ORDER BY
ons_site_name,
ons_spe_name,
relation_start DESC
""").format(where_clause=sql.Composed([sql.SQL("WHERE "), where_clause]) if where_clause else sql.SQL(""))
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_polars(query)
if output_type == "DataFrame":
return df.to_pandas(use_pyarrow_extension_array=True)
return df
insert(data, on_conflict='ignore')
Inserts new ONS SPE to Site mappings.
Parameters:
- `data` (`pd.DataFrame | pl.DataFrame`) – Data to insert. Must contain the following columns:
    - `ons_site_name`: Name of the ONS Site
    - `ons_spe_name`: Name of the ONS SPE
    - `relation_start`: Start of the relation (datetime)
    - `relation_end`: End of the relation (datetime), can be null if current relation

    Any additional columns will be ignored.

- `on_conflict` (`Literal["ignore", "update"]`, default: `"ignore"`) – Action to take in case of conflict (i.e., trying to insert a mapping that already exists). Can be one of:
    - `"ignore"`: Do nothing, skip the conflicting row.
    - `"update"`: Update the existing row with the new data.

    By default "ignore"
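
A sketch of building the input frame and inserting it (names and dates are placeholders; a pandas DataFrame with the same columns works as well):

```python
from datetime import datetime

import polars as pl

data = pl.DataFrame(
    {
        "ons_site_name": ["Site A"],
        "ons_spe_name": ["SPE 1"],
        "relation_start": [datetime(2023, 1, 1)],
        "relation_end": [None],  # null marks the current relation
    },
    schema_overrides={"relation_end": pl.Datetime},
)

# Mappings that already exist are skipped with the default on_conflict="ignore";
# use on_conflict="update" to overwrite them instead.
perfdb.ons.spes.sitemapping.insert(data, on_conflict="ignore")
```
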
Source code in echo_postgres/ons_spes_sitemapping.py
@validate_call
def insert(
self,
data: pd.DataFrame | pl.DataFrame,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
"""Inserts new ONS SPE to Site mappings.
Parameters
----------
data : pd.DataFrame | pl.DataFrame
Data to insert. Must contain the following columns:
- `ons_site_name`: Name of the ONS Site
- `ons_spe_name`: Name of the ONS SPE
- `relation_start`: Start of the relation (datetime)
- `relation_end`: End of the relation (datetime), can be null if current relation
Any additional columns will be ignored.
on_conflict : Literal["ignore", "update"], optional
Action to take in case of conflict (i.e., trying to insert a mapping that already exists). Can be one of:
- `"ignore"`: Do nothing, skip the conflicting row.
- `"update"`: Update the existing row with the new data.
By default "ignore"
"""
required_columns = {"ons_site_name", "ons_spe_name", "relation_start", "relation_end"}
existing_columns = set(data.columns)
missing_columns = required_columns - existing_columns
if missing_columns:
raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")
# converting to polars in case of pandas
if isinstance(data, pd.DataFrame):
data = pl.from_pandas(
data,
schema_overrides={
"ons_site_name": pl.String,
"ons_spe_name": pl.String,
"relation_start": pl.Datetime,
"relation_end": pl.Datetime,
},
)
# keeping only required columns
data = data.select(required_columns)
# getting the ids of the objects
wanted_obj_names = set(data["ons_site_name"].unique().to_list() + data["ons_spe_name"].unique().to_list())
existing_objs = self._perfdb.objects.instances.get_ids(object_names=list(wanted_obj_names))
missing_objs = wanted_obj_names - set(existing_objs.keys())
if missing_objs:
raise ValueError(f"The following objects do not exist in the database: {', '.join(missing_objs)}")
data = data.with_columns(
pl.col("ons_site_name").replace(existing_objs).cast(pl.Int64).alias("ons_site_id"),
pl.col("ons_spe_name").replace(existing_objs).cast(pl.Int64).alias("ons_spe_id"),
).select(
["ons_site_id", "ons_spe_id", "relation_start", "relation_end"],
)
# inserting
if_exists_mapping = {
"ignore": "append",
"update": "update",
}
with self._perfdb.conn.reconnect() as conn:
conn.polars_to_sql(df=data, table_name="ons_spe_site_mapping", if_exists=if_exists_mapping[on_conflict], schema="performance")
logger.debug(f"Inserted {len(data)} rows into ons_spe_site_mapping with on_conflict='{on_conflict}'")