User Role Permission Values¶
UserRolePermissionValues(perfdb)
¶
Class used for handling user role permissions. Can be accessed via perfdb.users.roles.permissions.values.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
delete(role_names, permission_type_names=None, values=None)
¶
Deletes user role permissions from the database.
Parameters:
-
(role_names¶list[str]) –List of user role names to delete permissions for.
-
(permission_type_names¶list[str] | None, default:None) –List of permission type names to delete. If None, deletes all permissions for the roles.
-
(values¶list[str] | None, default:None) –List of values to delete. If None, deletes all permissions for the roles and permission types.
Source code in echo_postgres/user_role_permission_values.py
@validate_call
def delete(
self,
role_names: list[str],
permission_type_names: list[str] | None = None,
values: list[str] | None = None,
) -> None:
"""Deletes user role permissions from the database.
Parameters
----------
role_names : list[str]
List of user role names to delete permissions for.
permission_type_names : list[str] | None, optional
List of permission type names to delete. If None, deletes all permissions for the roles.
values : list[str] | None, optional
List of values to delete. If None, deletes all permissions for the roles and permission types.
"""
# checks if the roles exist
role_ids = self._perfdb.users.roles.instances.get_ids(names=role_names)
if not role_ids:
raise ValueError(f"Roles {role_names} do not exist.")
where_query = [
sql.SQL("role_id = ANY({role_ids})").format(
role_ids=sql.Literal(list(role_ids.values())),
),
]
if permission_type_names:
permission_type_ids = self._perfdb.users.roles.permissions.types.get_ids(names=permission_type_names)
if not permission_type_ids:
raise ValueError(f"Permission types {permission_type_names} do not exist.")
where_query.append(
sql.SQL("permission_type_id = ANY({permission_type_ids})").format(
permission_type_ids=sql.Literal(list(permission_type_ids.values())),
),
)
if values:
where_query.append(
sql.SQL("value = ANY({values})").format(
values=sql.Literal(values),
),
)
where_query = (
sql.SQL("WHERE {conditions}").format(
conditions=sql.SQL(" AND ").join(where_query),
)
if where_query
else sql.SQL("")
)
query = sql.SQL("DELETE FROM performance.user_role_permissions {where_query}").format(
where_query=where_query,
)
with self._perfdb.conn.reconnect() as conn:
result = conn.execute(query)
logger.debug(f"Deleted {result.rowcount} user role permissions from the database")
get(role_names=None, permission_types=None, filter_type='and', output_type='dict')
¶
Gets all user role permissions.
The most useful keys/columns returned are:
- value
- permitted
Parameters:
-
(role_names¶list[str] | None, default:None) –List of role names to retrieve permissions for.
If None, retrieves permissions for all user roles.
-
(permission_types¶list[str] | None, default:None) –List of permission types to filter by.
If None, retrieves all permission types.
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
Returns:
-
dict[str, dict[str, dict[str, Any]]]–In case of
output_type="dict", returns a dictionary in the format {role_name: {permission_type_name: {attribute: value, ...}, ...}, ...} -
DataFrame–In case of
output_type="DataFrame", returns a DataFrame with MultiIndex with levels [role_name, permission_type_name] and columns as attributes of the permissions.
Source code in echo_postgres/user_role_permission_values.py
@validate_call
def get(
self,
role_names: list[str] | None = None,
permission_types: list[str] | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, dict[str, Any]]] | pd.DataFrame:
"""Gets all user role permissions.
The most useful keys/columns returned are:
- value
- permitted
Parameters
----------
role_names : list[str] | None, optional
List of role names to retrieve permissions for.
If None, retrieves permissions for all user roles.
permission_types : list[str] | None, optional
List of permission types to filter by.
If None, retrieves all permission types.
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "dict"
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
Returns
-------
dict[str, dict[str, dict[str, Any]]]
In case of `output_type="dict"`, returns a dictionary in the format {role_name: {permission_type_name: {attribute: value, ...}, ...}, ...}
pd.DataFrame
In case of `output_type="DataFrame"`, returns a DataFrame with MultiIndex with levels [role_name, permission_type_name] and columns as attributes of the permissions.
"""
where_query = []
if role_names:
where_query.append(
sql.SQL("role_name = ANY({role_names})").format(
role_names=sql.Literal(role_names),
),
)
if permission_types:
where_query.append(
sql.SQL("permission_type_name = ANY({permission_types})").format(
permission_types=sql.Literal(permission_types),
),
)
where_query = (
sql.SQL("WHERE {conditions}").format(
conditions=sql.SQL(f" {filter_type.upper()} ").join(where_query),
)
if where_query
else sql.SQL("")
)
query = sql.SQL(
"SELECT * FROM performance.v_user_role_permissions {where_query} ORDER BY role_name, permission_type_name",
).format(
where_query=where_query,
)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_pandas(query)
df = df.set_index(["role_name", "permission_type_name"])
if output_type == "DataFrame":
return df
result = df.to_dict(orient="index")
final_result = {}
for (role_name, permission_type_name), values in result.items():
if role_name not in final_result:
final_result[role_name] = {}
final_result[role_name][permission_type_name] = values
return final_result
insert(role_name, permission_type_name, value=None, permitted=True, on_conflict='ignore')
¶
Inserts a new user role permission.
Parameters:
-
(role_name¶str) –Name of the user role to insert the permission for.
-
(permission_type_name¶str) –Name of the permission type to insert. Must already exist in the database.
-
(value¶str | None, default:None) –Value of the permission. If None, the permission will be inserted without a value.
-
(permitted¶bool, default:True) –Whether the permission is permitted or not. By default True.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/user_role_permission_values.py
@validate_call
def insert(
self,
role_name: str,
permission_type_name: str,
value: str | None = None,
permitted: bool = True,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
"""Inserts a new user role permission.
Parameters
----------
role_name : str
Name of the user role to insert the permission for.
permission_type_name : str
Name of the permission type to insert. Must already exist in the database.
value : str | None, optional
Value of the permission. If None, the permission will be inserted without a value.
permitted : bool, optional
Whether the permission is permitted or not. By default True.
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict. Can be one of ["ignore", "update"].
By default "ignore"
"""
# checks if the role exists
role_ids = self._perfdb.users.roles.instances.get_ids(names=[role_name])
if not role_ids:
raise ValueError(f"Role '{role_name}' does not exist.")
# checks if the permission type exists
permission_type_ids = self._perfdb.users.roles.permissions.types.get_ids(names=[permission_type_name])
if not permission_type_ids:
raise ValueError(f"Permission type '{permission_type_name}' does not exist.")
# creating polars DataFrame with the data to insert
data = {
"permission_type_id": [permission_type_ids[permission_type_name]],
"role_id": [role_ids[role_name]],
"value": [value],
"permitted": [permitted],
}
df = pl.DataFrame(data)
# inserting the data
if_exists_mapping = {
"ignore": "append",
"update": "update",
}
with self._perfdb.conn.reconnect() as conn:
conn.polars_to_sql(
df=df,
table_name="user_role_permissions",
schema="performance",
if_exists=if_exists_mapping[on_conflict],
conflict_cols=["role_id", "permission_type_id", "value"],
)
logger.debug(f"Inserted/updated {len(df)} user role permissions into the database")