User Instances¶
UserInstances(perfdb)
¶
Class used for handling Users. Can be accessed via perfdb.users.instances.
Parameters:
-
(perfdb¶PerfDB) –Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Keeps a reference to the top level PerfDB object so that the methods of
    the class can reach the connection handler through ``self._perfdb``.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # stored as a private attribute; the methods use self._perfdb.conn to talk to the database
    self._perfdb: e_pg.PerfDB = perfdb
assign_roles(user_name, role_names)
¶
Assign roles to a user.
Parameters:
-
(user_name¶str) –Name of the user to assign roles to.
-
(role_names¶list[str]) –List of role names to assign to the user.
Returns:
-
list[str]–List of role names that were successfully assigned to the user.
Source code in echo_postgres/user_instances.py
@validate_call
def assign_roles(self, user_name: str, role_names: list[str]) -> list[str]:
    """Assign roles to a user.

    The role names are joined with "," and handed to
    ``performance.fn_assign_user_roles`` as one string argument. The value
    returned by that function is split on "," to build the result.

    Parameters
    ----------
    user_name : str
        Name of the user to assign roles to.
    role_names : list[str]
        List of role names to assign to the user.

    Returns
    -------
    list[str]
        List of role names that were successfully assigned to the user.

    Raises
    ------
    ValueError
        If the database function returned no role names (nothing was assigned).

    """
    # The joined names go in as ONE escaped SQL literal. Splicing them as raw SQL
    # between hand-written quotes would let a quote character inside a role name
    # terminate the string and inject arbitrary SQL.
    query = sql.SQL("SELECT * FROM performance.fn_assign_user_roles({user_name}, {role_names})").format(
        user_name=sql.Literal(user_name),
        role_names=sql.Literal(",".join(role_names)),
    )

    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query, skip_retry=True)

    # getting string with the role names
    role_names_str = result.fetchone()[0] if result.rowcount > 0 else ""

    # splitting the string into a list (an empty/NULL answer means nothing was assigned)
    assigned_roles = role_names_str.split(",") if role_names_str else []

    if not assigned_roles:
        raise ValueError(f"No roles were assigned for user '{user_name}'")

    return assigned_roles
delete(names)
¶
Deletes users from the database.
Parameters:
-
(names¶list[str]) –List of user names to delete.
Source code in echo_postgres/user_instances.py
def delete(self, names: list[str]) -> None:
    """Deletes users from the database.

    Roles assigned to the users that currently exist are removed before the
    rows are deleted from ``performance.users``. An empty list is a no-op.

    Parameters
    ----------
    names : list[str]
        List of user names to delete.

    """
    # nothing requested, nothing to do
    if not names:
        return

    # only users that really exist are looked at when removing roles
    for existing_name in self.get_ids(names=names):
        self.unassign_roles(user_name=existing_name)

    name_filter = sql.SQL("WHERE name = ANY({user_names})").format(
        user_names=sql.Literal(names),
    )
    delete_query = sql.SQL("DELETE FROM performance.users {where_query}").format(
        where_query=name_filter,
    )

    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(delete_query)

    logger.debug(f"Deleted {result.rowcount} users from the database")
get(names=None, get_permissions=False, output_type='dict')
¶
Gets all users with detailed information.
The most useful keys/columns returned are:
- id
- email
- disabled
- login_count
- last_login
- wrong_password_attempts
- password_modified_date
- role_ids
- role_names
- permissions (from all roles), which is a list of dicts with the following keys:
- permission_type_id
- permission_type_name
- value
- permitted
Parameters:
-
(names¶list[str] | None, default:None) –List of user names to retrieve information for.
If None, retrieves information for all users.
-
(get_permissions¶bool, default:False) –Whether to retrieve permissions for the users. By default False.
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"
Returns:
-
dict[str, dict[str, Any]]–In case output_type is "dict", returns a dictionary in the format {name: {attribute: value, ...}, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = name, columns = [attribute, ...]
Source code in echo_postgres/user_instances.py
@validate_call
def get(
    self,
    names: list[str] | None = None,
    get_permissions: bool = False,
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, Any]] | pd.DataFrame:
    """Gets all users with detailed information.

    The data is read from the ``performance.v_users`` view, ordered by name.

    The most useful keys/columns returned are:

    - id
    - email
    - disabled
    - login_count
    - last_login
    - wrong_password_attempts
    - password_modified_date
    - role_ids
    - role_names
    - permissions (from all roles, only when ``get_permissions`` is True), which is a list of dicts with the following keys:
        - permission_type_id
        - permission_type_name
        - value
        - permitted

    Parameters
    ----------
    names : list[str] | None, optional
        List of user names to retrieve information for.
        If None (or an empty list), retrieves information for all users.
    get_permissions : bool, optional
        Whether to retrieve permissions for the users. By default False.
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    dict[str, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {name: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = name, columns = [attribute, ...]

    """
    # restrict to the requested users only when a non-empty list was given
    if names:
        name_filter = sql.SQL("WHERE name = ANY({user_names})").format(
            user_names=sql.Literal(names),
        )
    else:
        name_filter = sql.SQL("")

    # the permissions column is optional
    permissions_column = sql.SQL(", permissions") if get_permissions else sql.SQL("")

    query = sql.SQL(
        "SELECT id, name, email, disabled, login_count, last_login, wrong_password_attempts, password_modified_date, role_ids, role_names {permissions} FROM performance.v_users {where_query} ORDER BY name",
    ).format(
        where_query=name_filter,
        permissions=permissions_column,
    )

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")

    users = df.set_index("name")

    if output_type == "dict":
        return users.to_dict(orient="index")
    return users
get_ids(names=None)
¶
Gets all users and their respective ids.
Parameters:
-
(names¶list[str] | None, default:None) –List of user names to retrieve ids for.
If None, retrieves ids for all users.
Returns:
-
dict[str, int]–Dictionary with all users and their respective ids in the format {name: id, ...}.
Source code in echo_postgres/user_instances.py
def get_ids(self, names: list[str] | None = None) -> dict[str, int]:
    """Gets all users and their respective ids.

    The ids are read from the ``performance.users`` table, ordered by name.

    Parameters
    ----------
    names : list[str] | None, optional
        List of user names to retrieve ids for.
        If None (or an empty list), retrieves ids for all users.

    Returns
    -------
    dict[str, int]
        Dictionary with all users and their respective ids in the format {name: id, ...}.

    """
    # restrict to the requested users only when a non-empty list was given
    if names:
        name_filter = sql.SQL("WHERE name = ANY({user_names})").format(
            user_names=sql.Literal(names),
        )
    else:
        name_filter = sql.SQL("")

    query = sql.SQL("SELECT name, id FROM performance.users {where_query} ORDER BY name").format(
        where_query=name_filter,
    )

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)

    # {column: {name: value}} -> keep only the "id" column mapping
    by_column = df.set_index("name").to_dict()
    return by_column["id"]
insert(name, email=None, disabled=None, password=None, role_names=None, on_conflict='ignore')
¶
Inserts or updates a user into the database.
Fields left as None will not be inserted/updated.
Parameters:
-
(name¶str) –Name of the user to insert. Keep in mind that only lower case letters, numbers, and "." are allowed.
-
(email¶EmailStr | None, default:None) –Email of the user. By default None (not set).
-
(disabled¶bool | None, default:None) –Whether the user is disabled. By default None (not set).
-
(password¶str | None, default:None) –Password of the user. By default None (not set).
-
(role_names¶list[str] | None, default:None) –List of role names to assign to the user. This will remove all previously assigned roles and assign the new ones.
By default None (not set).
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/user_instances.py
def insert(
    self,
    name: str,
    email: EmailStr | None = None,
    disabled: bool | None = None,
    password: str | None = None,
    role_names: list[str] | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts or updates a user into the database.

    Fields left as None will not be inserted/updated.

    The row is written to ``performance.users`` (conflicts are detected on
    ``name``); afterwards the password is set through
    ``performance.fn_set_user_password`` and the roles are replaced, when given.

    Parameters
    ----------
    name : str
        Name of the user to insert. Keep in mind that only lower case letters, numbers, and "." are allowed.
    email : EmailStr | None, optional
        Email of the user. By default None (not set).
    disabled : bool | None, optional
        Whether the user is disabled. By default None (not set).
    password : str | None, optional
        Password of the user. By default None (not set).
    role_names : list[str] | None, optional
        List of role names to assign to the user. This will remove all previously assigned roles and assign the new ones.
        By default None (not set). An empty list leaves the current roles untouched.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If the password could not be set.

    """
    # single-row frame holding only the columns that were provided
    data = {"name": [name]}
    for column, value in (("email", email), ("disabled", disabled)):
        if value is not None:
            data[column] = [value]
    df = pl.DataFrame(data)

    # translation of on_conflict into the if_exists argument of polars_to_sql
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }

    with self._perfdb.conn.reconnect() as conn:
        conn.polars_to_sql(
            df=df,
            table_name="users",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            conflict_cols=["name"],
        )

    # password is handled by a database function, only when one was given
    if password is not None:
        password_query = sql.SQL("SELECT performance.fn_set_user_password({user_name}, {password})").format(
            user_name=sql.Literal(name),
            password=sql.Literal(password),
        )
        with self._perfdb.conn.reconnect() as conn:
            result = conn.execute(password_query, skip_retry=True)

        # an integer in the first column is treated as success
        if result.rowcount == 0 or not isinstance(result.fetchone()[0], int):
            raise ValueError(f"Could not set password for user '{name}'")

    # roles are replaced as a whole: drop everything, then assign the new set
    if role_names:
        self.unassign_roles(user_name=name)
        assigned_roles = self.assign_roles(user_name=name, role_names=role_names)
        logger.debug(f"Assigned roles {assigned_roles} to user '{name}'")

    logger.debug(f"User '{name}' inserted/updated in the database")
login(user_name, password)
¶
Logs in a user by checking the password.
If the user is not found, or the password is incorrect, or the user is disabled, will raise an error.
Parameters:
-
(user_name¶str) –Name of the user to log in.
-
(password¶str) –Password of the user.
Returns:
-
int–User ID if login is successful.
Source code in echo_postgres/user_instances.py
def login(
    self,
    user_name: str,
    password: str,
) -> int:
    """Logs in a user by checking the password.

    If the user is not found, or the password is incorrect, or the user is disabled, will raise an error.

    The check is delegated to ``performance.fn_login_user``; its integer answer
    is either the user ID or one of the error codes 0, -1, -2 handled below.

    Parameters
    ----------
    user_name : str
        Name of the user to log in.
    password : str
        Password of the user.

    Returns
    -------
    int
        User ID if login is successful.

    Raises
    ------
    WrongUserError
        If the database function returned 0 (user not found).
    WrongPasswordError
        If the database function returned -1 (incorrect password).
    DisabledUserError
        If the database function returned -2 (user disabled).
    ValueError
        If the database function returned no row or a non-integer value.

    """
    query = sql.SQL("SELECT * FROM performance.fn_login_user({user_name}, {password})").format(
        user_name=sql.Literal(user_name),
        password=sql.Literal(password),
    )

    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query, skip_retry=True)

    # The row has to be validated BEFORE it is indexed: fetchone() gives None when
    # nothing came back, and indexing None would raise TypeError instead of the
    # intended ValueError.
    row = result.fetchone() if result.rowcount != 0 else None
    result_code = row[0] if row is not None else None
    if not isinstance(result_code, int):
        raise ValueError(f"Could not log in user '{user_name}'")

    # error codes returned by the database function
    result_mapping = {
        0: WrongUserError(f"User {user_name} not found"),
        -1: WrongPasswordError(f"Incorrect password for user {user_name}"),
        -2: DisabledUserError(f"User {user_name} is disabled"),
    }
    if result_code in result_mapping:
        raise result_mapping[result_code]

    return result_code  # Return the user ID if login is successful
unassign_roles(user_name, role_names=None)
¶
Unassigns (removes) roles from a user.
Parameters:
-
(user_name¶str) –Name of the user to unassign roles from.
-
(role_names¶list[str] | None, default:None) –List of role names to unassign from the user.
If None, unassigns all roles from the user.
Returns:
-
list[str]–List of role names that were removed from the user.
Source code in echo_postgres/user_instances.py
@validate_call
def unassign_roles(self, user_name: str, role_names: list[str] | None = None) -> list[str]:
    """Unassigns (removes) roles from a user.

    The role names are joined with "," and handed to
    ``performance.fn_unassign_user_roles`` as one string argument. The value
    returned by that function is split on "," to build the result.

    Parameters
    ----------
    user_name : str
        Name of the user to unassign roles from.
    role_names : list[str] | None, optional
        List of role names to unassign from the user.
        If None, unassigns all roles from the user.

    Returns
    -------
    list[str]
        List of role names that were removed from the user.
        Empty when there was nothing to remove.

    Raises
    ------
    KeyError
        If ``role_names`` is None and ``user_name`` is not among the users returned by ``get``.

    """
    # in case no role names are provided, unassigns all roles
    # to do that, we need to get all role names first
    if role_names is None:
        role_names = self.get(names=[user_name])[user_name]["role_names"]

    if not role_names:
        return []

    # The joined names go in as ONE escaped SQL literal. Splicing them as raw SQL
    # between hand-written quotes would let a quote character inside a role name
    # terminate the string and inject arbitrary SQL.
    query = sql.SQL("SELECT * FROM performance.fn_unassign_user_roles({user_name}, {role_names})").format(
        user_name=sql.Literal(user_name),
        role_names=sql.Literal(",".join(role_names)),
    )

    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query, skip_retry=True)

    # getting string with the role names
    role_names_str = result.fetchone()[0] if result.rowcount > 0 else ""

    # splitting the string into a list
    return role_names_str.split(",") if role_names_str else []
update(user_id, name=None, email=None, disabled=None, password=None, role_names=None)
¶
Updates a user in the database.
Fields left as None will not be updated.
Parameters:
-
(user_id¶int) –ID of the user to update.
-
(name¶str | None, default:None) –Name of the user to update. Keep in mind that only lower case letters, numbers, and "." are allowed. By default None (not set).
-
(email¶EmailStr | None, default:None) –Email of the user. By default None (not set).
-
(disabled¶bool | None, default:None) –Whether the user is disabled. By default None (not set).
-
(password¶str | None, default:None) –Password of the user. By default None (not set).
-
(role_names¶list[str] | None, default:None) –List of role names to assign to the user.
This will remove all previously assigned roles and assign the new ones.
By default None (not set).
Source code in echo_postgres/user_instances.py
def update(
    self,
    user_id: int,
    name: str | None = None,
    email: EmailStr | None = None,
    disabled: bool | None = None,
    password: str | None = None,
    role_names: list[str] | None = None,
) -> None:
    """Updates a user in the database.

    Fields left as None will not be updated.

    Column values (name, email, disabled) are written with a single UPDATE on
    ``performance.users``. The password is set through
    ``performance.fn_set_user_password`` and roles are replaced as a whole.
    When every optional argument is None the call does nothing.

    Parameters
    ----------
    user_id : int
        ID of the user to update.
    name : str | None, optional
        Name of the user to update. Keep in mind that only lower case letters, numbers, and "." are allowed.
        By default None (not set).
    email : EmailStr | None, optional
        Email of the user. By default None (not set).
    disabled : bool | None, optional
        Whether the user is disabled. By default None (not set).
    password : str | None, optional
        Password of the user. By default None (not set).
    role_names : list[str] | None, optional
        List of role names to assign to the user.
        This will remove all previously assigned roles and assign the new ones.
        By default None (not set). An empty list leaves the current roles untouched.

    Raises
    ------
    ValueError
        If no user with the given ID exists, or if the password could not be set.

    """
    # Only the columns that were actually provided take part in the UPDATE.
    # Filtering happens on the values themselves: unpacking a None placeholder
    # as (col, value) would raise TypeError.
    candidate_columns = {"name": name, "email": email, "disabled": disabled}
    assignments = [
        sql.SQL("{} = {}").format(sql.Identifier(col), sql.Literal(value))
        for col, value in candidate_columns.items()
        if value is not None
    ]

    # an UPDATE with an empty SET clause is invalid SQL, so it is skipped when
    # only the password and/or the roles are being changed
    if assignments:
        query = sql.SQL(
            "UPDATE performance.users SET {set_clause} WHERE id = {user_id}",
        ).format(
            set_clause=sql.SQL(", ").join(assignments),
            user_id=sql.Literal(user_id),
        )
        with self._perfdb.conn.reconnect() as conn:
            result = conn.execute(query)

        if result.rowcount == 0:
            raise ValueError(f"User with ID {user_id} not found")

    if role_names is None and password is None:
        return

    # getting user name for setting password and assigning roles
    # (read after the UPDATE so that a rename done above is taken into account)
    query = sql.SQL("SELECT name FROM performance.users WHERE id = {user_id}").format(
        user_id=sql.Literal(user_id),
    )
    with self._perfdb.conn.reconnect() as conn:
        row = conn.execute(query).fetchone()

    # no row means the ID does not exist (possible when the UPDATE above was skipped)
    if row is None:
        raise ValueError(f"User with ID {user_id} not found")
    user_name = row[0]

    # defining the password if provided
    if password is not None:
        query = sql.SQL("SELECT performance.fn_set_user_password({user_name}, {password})").format(
            user_name=sql.Literal(user_name),
            password=sql.Literal(password),
        )
        with self._perfdb.conn.reconnect() as conn:
            result = conn.execute(query, skip_retry=True)

        # checking if user id was returned (successfully)
        if result.rowcount == 0 or not isinstance(result.fetchone()[0], int):
            raise ValueError(f"Could not set password for user '{user_name}'")

    # assigning roles if provided
    if role_names:
        self.unassign_roles(user_name=user_name)  # unassigning all roles first
        assigned_roles = self.assign_roles(user_name=user_name, role_names=role_names)
        logger.debug(f"Assigned roles {assigned_roles} to user '{user_name}'")