ONS SPE Site Mapping

OnsSpesSiteMapping(perfdb)

Class used for handling ONS SPE Site Mapping. Can be accessed via perfdb.ons.spes.sitemapping.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb
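
A minimal access sketch (hedged: `perfdb` is assumed to be an already constructed PerfDB instance; construction details depend on your connection setup):

    # `perfdb` is assumed to be an existing echo_postgres PerfDB instance
    sitemapping = perfdb.ons.spes.sitemapping
    current_mappings = sitemapping.get(only_most_recent=True)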

delete(relations)

Deletes ONS SPE to Site mappings.

Parameters:

  • relations

    (list[OnsSpesSiteMappingDict]) –

    List of relations to delete. Each relation must be a dictionary containing the following keys:

    • ons_site_name: Name of the ONS Site
    • ons_spe_name: Name of the ONS SPE
    • relation_start: Start of the relation (date)

    Example:

    relations = [
        {
            "ons_site_name": "Site A",
            "ons_spe_name": "SPE 1",
            "relation_start": date(2023, 1, 1),
        },
        {
            "ons_site_name": "Site B",
            "ons_spe_name": "SPE 2",
            "relation_start": date(2023, 2, 1),
        },
    ]
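
    These relations could then be removed with a call such as the following (a sketch; `perfdb` is assumed to be an existing PerfDB instance):

    from datetime import date  # needed for the relation_start values above

    # `relations` is the list shown above
    perfdb.ons.spes.sitemapping.delete(relations)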
    
Source code in echo_postgres/ons_spes_sitemapping.py
@validate_call
def delete(
    self,
    relations: list[OnsSpesSiteMappingDict],
) -> None:
    """Deletes ONS SPE to Site mappings.

    Parameters
    ----------
    relations : list[OnsSpesSiteMappingDict]
        List of relations to delete. Each relation must be a dictionary containing the following keys:

        - `ons_site_name`: Name of the ONS Site
        - `ons_spe_name`: Name of the ONS SPE
        - `relation_start`: Start of the relation (date)

        Example:

        ```python
        relations = [
            {
                "ons_site_name": "Site A",
                "ons_spe_name": "SPE 1",
                "relation_start": date(2023, 1, 1),
            },
            {
                "ons_site_name": "Site B",
                "ons_spe_name": "SPE 2",
                "relation_start": date(2023, 2, 1),
            },
        ]
        ```
    """
    if not relations:
        return

    # getting the ids of the objects
    wanted_obj_names = set()
    for relation in relations:
        wanted_obj_names.add(relation["ons_site_name"])
        wanted_obj_names.add(relation["ons_spe_name"])

    existing_objs = self._perfdb.objects.instances.get_ids(object_names=list(wanted_obj_names))

    # checking if all objects exist
    missing_objs = wanted_obj_names - set(existing_objs.keys())
    if missing_objs:
        raise ValueError(f"The following objects do not exist in the database: {', '.join(missing_objs)}")

    for relation in relations:
        site_id = existing_objs[relation["ons_site_name"]]
        spe_id = existing_objs[relation["ons_spe_name"]]

        query = sql.SQL("""
            DELETE FROM
                performance.ons_spe_site_mapping
            WHERE
                ons_site_id = {site_id} AND
                ons_spe_id = {spe_id} AND
                relation_start = {relation_start}
        """).format(
            site_id=sql.Literal(site_id),
            spe_id=sql.Literal(spe_id),
            relation_start=sql.Literal(relation["relation_start"]),
        )

        with self._perfdb.conn.reconnect() as conn:
            result = conn.execute(query)
            if result.rowcount == 0:
                logger.debug(f"No mapping found for deletion: {relation}")
            else:
                logger.debug(f"Deleted mapping: {relation}")

get(site_names=None, spe_names=None, filter_type='and', period=None, only_most_recent=False, output_type='DataFrame')

Gets the mapping from ONS SPEs to ONS Sites.

The mapping is time-dependent, as some SPEs might be moved from one site to another. Besides the ons_site_name and ons_spe_name columns, the result will also contain relation_start and relation_end columns indicating the period during which the mapping was valid. A null relation_end means the mapping is the current one.

Parameters:

  • site_names

    (list[str] | None, default: None ) –

    Name of the sites to filter the results, by default None

  • spe_names

    (list[str] | None, default: None ) –

    Name of the SPEs to filter the results, by default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • period

    (DateTimeRange | None, default: None ) –

    Time period to filter the results. It is matched against relation_start and relation_end, so any relation that overlaps the desired period will be returned. If None, no time constraint is applied.

    By default None

  • only_most_recent

    (bool, default: False ) –

    Whether to return only the most recent mapping. If set to True, the period parameter is ignored. By default False

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    The type of output to return, by default a pandas "DataFrame"

Returns:

  • pd.DataFrame

    In case of output_type="DataFrame", a pandas DataFrame

  • pl.DataFrame

    In case of output_type="pl.DataFrame", a polars DataFrame
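
A usage sketch (the site and SPE names are placeholders; `perfdb` is assumed to be an existing PerfDB instance):

    # most recent mapping for a single SPE, as a pandas DataFrame
    df = perfdb.ons.spes.sitemapping.get(
        spe_names=["SPE 1"],
        only_most_recent=True,
    )

    # all mappings for two sites, as a polars DataFrame
    df_pl = perfdb.ons.spes.sitemapping.get(
        site_names=["Site A", "Site B"],
        output_type="pl.DataFrame",
    )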

Source code in echo_postgres/ons_spes_sitemapping.py
@validate_call
def get(
    self,
    site_names: list[str] | None = None,
    spe_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    period: DateTimeRange | None = None,
    only_most_recent: bool = False,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame:
    """Gets the mapping from ONS SPEs to ONS Sites.

    The mapping is time-dependent, as some SPEs might be moved from one site to another. Besides the `ons_site_name` and `ons_spe_name` columns, the result will also contain `relation_start` and `relation_end` columns indicating the period during which the mapping was valid. A null `relation_end` means the mapping is the current one.

    Parameters
    ----------
    site_names : list[str] | None, optional
        Name of the sites to filter the results, by default None
    spe_names : list[str] | None, optional
        Name of the SPEs to filter the results, by default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    period : DateTimeRange | None, optional
        Time period to filter the results. It is matched against relation_start and relation_end, so any relation that overlaps the desired period will be returned. If None, no time constraint is applied.

        By default None
    only_most_recent : bool, optional
        Whether to return only the most recent mapping. If set to True, the `period` parameter is ignored. By default False
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        The type of output to return, by default a pandas "DataFrame"

    Returns
    -------
    pd.DataFrame
        In case of `output_type="DataFrame"`, a pandas DataFrame
    pl.DataFrame
        In case of `output_type="pl.DataFrame"`, a polars DataFrame
    """
    where = []

    if site_names is not None:
        where.append(sql.SQL("ons_site_name = ANY(ARRAY[{names}])").format(names=sql.SQL(", ").join(map(sql.Literal, site_names))))
    if spe_names is not None:
        where.append(sql.SQL("ons_spe_name = ANY(ARRAY[{names}])").format(names=sql.SQL(", ").join(map(sql.Literal, spe_names))))

    if where:
        match filter_type:
            case "and":
                where_clause = sql.SQL(" AND ").join(where)
            case "or":
                where_clause = sql.SQL(" OR ").join(where)
    else:
        where_clause = None

    if period is not None and not only_most_recent:
        where_clause = sql.SQL("({existing}) AND ({new})").format(
            existing=where_clause if where_clause else sql.SQL("TRUE"),
            new=sql.SQL("NOT (COALESCE(relation_end, NOW() + INTERVAL '1 day') < {start} OR relation_start > {end})").format(
                start=sql.Literal(period.start),
                end=sql.Literal(period.end),
            ),
        )
    if only_most_recent:
        where_clause = sql.SQL("({existing}) AND ({new})").format(
            existing=where_clause if where_clause else sql.SQL("TRUE"),
            new=sql.SQL("relation_end IS NULL"),
        )

    query = sql.SQL("""
        SELECT
            *
        FROM
            performance.v_ons_spe_site_mapping
        {where_clause}
        ORDER BY
            ons_site_name,
            ons_spe_name,
            relation_start DESC
    """).format(where_clause=sql.Composed([sql.SQL("WHERE "), where_clause]) if where_clause else sql.SQL(""))

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_polars(query)

    if output_type == "DataFrame":
        return df.to_pandas(use_pyarrow_extension_array=True)
    return df

insert(data, on_conflict='ignore')

Inserts new ONS SPE to Site mappings.

Parameters:

  • data

    (pd.DataFrame | pl.DataFrame) –

    Data to insert. Must contain the following columns:

    • ons_site_name: Name of the ONS Site
    • ons_spe_name: Name of the ONS SPE
    • relation_start: Start of the relation (datetime)
    • relation_end: End of the relation (datetime), can be null if current relation

    Any additional columns will be ignored.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    Action to take in case of conflict (i.e., trying to insert a mapping that already exists). Can be one of:

    • "ignore": Do nothing, skip the conflicting row.
    • "update": Update the existing row with the new data.

    By default "ignore"
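
A call sketch (the names and dates are placeholders; `perfdb` is assumed to be an existing PerfDB instance):

    from datetime import datetime

    import polars as pl

    data = pl.DataFrame(
        {
            "ons_site_name": ["Site A"],
            "ons_spe_name": ["SPE 1"],
            "relation_start": [datetime(2023, 3, 1)],
            "relation_end": [None],  # null marks the current relation
        },
        schema_overrides={"relation_end": pl.Datetime},
    )
    perfdb.ons.spes.sitemapping.insert(data, on_conflict="update")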

Source code in echo_postgres/ons_spes_sitemapping.py
@validate_call
def insert(
    self,
    data: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts new ONS SPE to Site mappings.

    Parameters
    ----------
    data : pd.DataFrame | pl.DataFrame
        Data to insert. Must contain the following columns:

        - `ons_site_name`: Name of the ONS Site
        - `ons_spe_name`: Name of the ONS SPE
        - `relation_start`: Start of the relation (datetime)
        - `relation_end`: End of the relation (datetime), can be null if current relation

        Any additional columns will be ignored.
    on_conflict : Literal["ignore", "update"], optional
        Action to take in case of conflict (i.e., trying to insert a mapping that already exists). Can be one of:

        - `"ignore"`: Do nothing, skip the conflicting row.
        - `"update"`: Update the existing row with the new data.

        By default "ignore"
    """
    required_columns = {"ons_site_name", "ons_spe_name", "relation_start", "relation_end"}

    existing_columns = set(data.columns)
    missing_columns = required_columns - existing_columns
    if missing_columns:
        raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")

    # converting to polars in case of pandas
    if isinstance(data, pd.DataFrame):
        data = pl.from_pandas(
            data,
            schema_overrides={
                "ons_site_name": pl.String,
                "ons_spe_name": pl.String,
                "relation_start": pl.Datetime,
                "relation_end": pl.Datetime,
            },
        )

    # keeping only required columns
    data = data.select(required_columns)

    # getting the ids of the objects
    wanted_obj_names = set(data["ons_site_name"].unique().to_list() + data["ons_spe_name"].unique().to_list())
    existing_objs = self._perfdb.objects.instances.get_ids(object_names=list(wanted_obj_names))

    missing_objs = wanted_obj_names - set(existing_objs.keys())
    if missing_objs:
        raise ValueError(f"The following objects do not exist in the database: {', '.join(missing_objs)}")

    data = data.with_columns(
        pl.col("ons_site_name").replace(existing_objs).cast(pl.Int64).alias("ons_site_id"),
        pl.col("ons_spe_name").replace(existing_objs).cast(pl.Int64).alias("ons_spe_id"),
    ).select(
        ["ons_site_id", "ons_spe_id", "relation_start", "relation_end"],
    )

    # inserting
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.polars_to_sql(df=data, table_name="ons_spe_site_mapping", if_exists=if_exists_mapping[on_conflict], schema="performance")

    logger.debug(f"Inserted {len(data)} rows into ons_spe_site_mapping with on_conflict='{on_conflict}'")