Skip to content

User Role Permission Values

UserRolePermissionValues(perfdb)

Class used for handling user role permissions. Can be accessed via perfdb.users.roles.permissions.values.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(role_names, permission_type_names=None, values=None)

Deletes user role permissions from the database.

Parameters:

  • role_names

    (list[str]) –

    List of user role names to delete permissions for.

  • permission_type_names

    (list[str] | None, default: None ) –

    List of permission type names to delete. If None, deletes all permissions for the roles.

  • values

    (list[str] | None, default: None ) –

    List of values to delete. If None, deletes all permissions for the roles and permission types.

Source code in echo_postgres/user_role_permission_values.py
@validate_call
def delete(
    self,
    role_names: list[str],
    permission_type_names: list[str] | None = None,
    values: list[str] | None = None,
) -> None:
    """Deletes user role permissions from the database.

    Parameters
    ----------
    role_names : list[str]
        List of user role names to delete permissions for.
    permission_type_names : list[str] | None, optional
        List of permission type names to delete. If None, deletes all permissions for the roles.
    values : list[str] | None, optional
        List of values to delete. If None, deletes all permissions for the roles and permission types.
    """
    # checks if the roles exist
    role_ids = self._perfdb.users.roles.instances.get_ids(names=role_names)
    if not role_ids:
        raise ValueError(f"Roles {role_names} do not exist.")

    where_query = [
        sql.SQL("role_id = ANY({role_ids})").format(
            role_ids=sql.Literal(list(role_ids.values())),
        ),
    ]
    if permission_type_names:
        permission_type_ids = self._perfdb.users.roles.permissions.types.get_ids(names=permission_type_names)
        if not permission_type_ids:
            raise ValueError(f"Permission types {permission_type_names} do not exist.")
        where_query.append(
            sql.SQL("permission_type_id = ANY({permission_type_ids})").format(
                permission_type_ids=sql.Literal(list(permission_type_ids.values())),
            ),
        )
    if values:
        where_query.append(
            sql.SQL("value = ANY({values})").format(
                values=sql.Literal(values),
            ),
        )
    where_query = (
        sql.SQL("WHERE {conditions}").format(
            conditions=sql.SQL(" AND ").join(where_query),
        )
        if where_query
        else sql.SQL("")
    )

    query = sql.SQL("DELETE FROM performance.user_role_permissions {where_query}").format(
        where_query=where_query,
    )
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} user role permissions from the database")

get(role_names=None, permission_types=None, filter_type='and', output_type='dict')

Gets all user role permissions.

The most useful keys/columns returned are:

  • value
  • permitted

Parameters:

  • role_names

    (list[str] | None, default: None ) –

    List of role names to retrieve permissions for.

    If None, retrieves permissions for all user roles.

  • permission_types

    (list[str] | None, default: None ) –

    List of permission types to filter by.

    If None, retrieves all permission types.

  • output_type

    (Literal['dict', 'DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

Returns:

  • dict[str, dict[str, dict[str, Any]]]

    In case of output_type="dict", returns a dictionary in the format {role_name: {permission_type_name: {attribute: value, ...}, ...}, ...}

  • DataFrame

    In case of output_type="DataFrame", returns a DataFrame with MultiIndex with levels [role_name, permission_type_name] and columns as attributes of the permissions.

Source code in echo_postgres/user_role_permission_values.py
@validate_call
def get(
    self,
    role_names: list[str] | None = None,
    permission_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, dict[str, Any]]] | pd.DataFrame:
    """Gets all user role permissions.

    The most useful keys/columns returned are:

    - value
    - permitted

    Parameters
    ----------
    role_names : list[str] | None, optional
        List of role names to retrieve permissions for.

        If None, retrieves permissions for all user roles.
    permission_types : list[str] | None, optional
        List of permission types to filter by.

        If None, retrieves all permission types.
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

    Returns
    -------
    dict[str, dict[str, dict[str, Any]]]
        In case of `output_type="dict"`, returns a dictionary in the format {role_name: {permission_type_name: {attribute: value, ...}, ...}, ...}
    pd.DataFrame
        In case of `output_type="DataFrame"`, returns a DataFrame with MultiIndex with levels [role_name, permission_type_name] and columns as attributes of the permissions.
    """
    where_query = []
    if role_names:
        where_query.append(
            sql.SQL("role_name = ANY({role_names})").format(
                role_names=sql.Literal(role_names),
            ),
        )
    if permission_types:
        where_query.append(
            sql.SQL("permission_type_name = ANY({permission_types})").format(
                permission_types=sql.Literal(permission_types),
            ),
        )
    where_query = (
        sql.SQL("WHERE {conditions}").format(
            conditions=sql.SQL(f" {filter_type.upper()} ").join(where_query),
        )
        if where_query
        else sql.SQL("")
    )

    query = sql.SQL(
        "SELECT * FROM performance.v_user_role_permissions {where_query} ORDER BY role_name, permission_type_name",
    ).format(
        where_query=where_query,
    )
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)

    df = df.set_index(["role_name", "permission_type_name"])

    if output_type == "DataFrame":
        return df

    result = df.to_dict(orient="index")
    final_result = {}
    for (role_name, permission_type_name), values in result.items():
        if role_name not in final_result:
            final_result[role_name] = {}
        final_result[role_name][permission_type_name] = values

    return final_result

insert(role_name, permission_type_name, value=None, permitted=True, on_conflict='ignore')

Inserts a new user role permission.

Parameters:

  • role_name

    (str) –

    Name of the user role to insert the permission for.

  • permission_type_name

    (str) –

    Name of the permission type to insert. Must already exist in the database.

  • value

    (str | None, default: None ) –

    Value of the permission. If None, the permission will be inserted without a value.

  • permitted

    (bool, default: True ) –

    Whether the permission is permitted or not. By default True.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/user_role_permission_values.py
@validate_call
def insert(
    self,
    role_name: str,
    permission_type_name: str,
    value: str | None = None,
    permitted: bool = True,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts a new user role permission.

    Parameters
    ----------
    role_name : str
        Name of the user role to insert the permission for.
    permission_type_name : str
        Name of the permission type to insert. Must already exist in the database.
    value : str | None, optional
        Value of the permission. If None, the permission will be inserted without a value.
    permitted : bool, optional
        Whether the permission is permitted or not. By default True.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checks if the role exists
    role_ids = self._perfdb.users.roles.instances.get_ids(names=[role_name])
    if not role_ids:
        raise ValueError(f"Role '{role_name}' does not exist.")
    # checks if the permission type exists
    permission_type_ids = self._perfdb.users.roles.permissions.types.get_ids(names=[permission_type_name])
    if not permission_type_ids:
        raise ValueError(f"Permission type '{permission_type_name}' does not exist.")

    # creating polars DataFrame with the data to insert
    data = {
        "permission_type_id": [permission_type_ids[permission_type_name]],
        "role_id": [role_ids[role_name]],
        "value": [value],
        "permitted": [permitted],
    }
    df = pl.DataFrame(data)

    # inserting the data
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.polars_to_sql(
            df=df,
            table_name="user_role_permissions",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            conflict_cols=["role_id", "permission_type_id", "value"],
        )

    logger.debug(f"Inserted/updated {len(df)} user role permissions into the database")