Settings¶
Settings(perfdb)
¶
Class used for handling database settings. Can be accessed via perfdb.settings.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the object with a reference to the top-level PerfDB instance.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # stored privately under the name _perfdb
    self._perfdb: e_pg.PerfDB = perfdb
delete(section, key)
¶
Deletes a setting from the database.
Parameters:
- section (str) – Section name
- key (str) – Setting name
Returns:
- None
Source code in echo_postgres/settings.py
@validate_call
def delete(self, section: str, key: str) -> None:
    """Remove the setting identified by a section and key from the database.

    Parameters
    ----------
    section : str
        Section name
    key : str
        Setting name

    Returns
    -------
    None
    """
    # the user-supplied values are wrapped in sql.Literal instead of being
    # interpolated into the statement text directly
    literals = {
        "section": sql.Literal(section),
        "key": sql.Literal(key),
    }
    statement = sql.SQL("""
        DELETE FROM performance.settings WHERE section = {section} AND key = {key}
    """).format(**literals)

    with self._perfdb.conn.reconnect() as db_conn:
        db_conn.execute(statement)
get(sections=None, keys=None, output_type='dict')
¶
Gets the database settings, optionally filtered by section and/or key.
The most useful keys/columns returned are:
- id: Unique identifier for the setting
- section: Section name
- key: Setting name
- value: Value of the setting (is a dictionary)
- description: Description of the setting
- schema: Schema used to validate if the setting value is correct, only for database internal checks
Parameters:
- sections (list[str] | None, default: None) – List of sections to filter the settings by. If None, all sections will be returned. By default None.
- keys (list[str] | None, default: None) – List of keys to filter the settings by. If None, all keys will be returned. By default None.
- output_type (Literal['dict', 'DataFrame'], default: 'dict') – Output type of the data. Can be one of ["dict", "DataFrame"]. By default "dict".
Returns:
- dict[str, dict[str, dict[str, Any]]] – In case output_type is "dict", returns a dictionary in the format {section: {key: {"value": value, "description": description, "schema": schema}}, ...}
- DataFrame – In case output_type is "DataFrame", returns a DataFrame with the following format: MultiIndex = [section, key], columns = ["value", "description", "schema"]
Source code in echo_postgres/settings.py
@validate_call
def get(
    self,
    sections: list[str] | None = None,
    keys: list[str] | None = None,
    output_type: Literal["dict", "DataFrame"] = "dict",
) -> dict[str, dict[str, dict[str, Any]]] | DataFrame:
    """Read settings from the database, optionally restricted by section and/or key.

    The most useful keys/columns returned are:

    - id: Unique identifier for the setting
    - section: Section name
    - key: Setting name
    - value: Value of the setting (is a dictionary)
    - description: Description of the setting
    - schema: Schema used to validate if the setting value is correct, only for database internal checks

    Parameters
    ----------
    sections : list[str] | None, optional
        Only settings whose section is in this list are returned. If None, no section filter is applied.
        By default None
    keys : list[str] | None, optional
        Only settings whose key is in this list are returned. If None, no key filter is applied.
        By default None
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "dict"

    Returns
    -------
    dict[str, dict[str, dict[str, Any]]]
        In case output_type is "dict", returns a dictionary in the format {section: {key: {"value": value, "description": description, "schema": schema}}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: MultiIndex = [section, key], columns = ["value", "description", "schema"]
    """
    # one condition per filter that was actually supplied
    conditions = []
    if sections is not None:
        conditions.append(sql.SQL("section = ANY({sections})").format(sections=sql.Literal(sections)))
    if keys is not None:
        conditions.append(sql.SQL("key = ANY({keys})").format(keys=sql.Literal(keys)))

    # without any filter the WHERE clause collapses to an empty fragment
    if conditions:
        where_clause = sql.Composed([sql.SQL("WHERE "), sql.SQL(" AND ").join(conditions)])
    else:
        where_clause = sql.SQL("")

    statement = sql.SQL("SELECT * FROM performance.settings {where} ORDER BY section, key").format(
        where=where_clause,
    )

    with self._perfdb.conn.reconnect() as db_conn:
        frame: DataFrame = db_conn.read_to_pandas(statement, dtype_backend=None, post_convert="pyarrow")
    frame = frame.set_index(["section", "key"])

    if output_type != "dict":
        return frame

    # to_dict(orient="index") yields {(section, key): row}; regroup it as {section: {key: row}}
    nested: dict[str, dict[str, dict[str, Any]]] = {}
    for (section_name, setting_key), row in frame.to_dict(orient="index").items():
        nested.setdefault(section_name, {})[setting_key] = row
    return nested
insert(section, key, value, description, schema)
¶
Inserts a new setting into the database
Parameters:
- section (str) – Section name
- key (str) – Setting name
- value (Any) – Value of the Setting
- description (str) – Description of the Setting
- schema (dict[str, Any]) – A JSON schema used to validate if the setting value is correct. If the value is not valid against the schema, an error will be raised.
Returns:
- int – The ID of the inserted setting
Source code in echo_postgres/settings.py
@validate_call
def insert(
    self,
    section: str,
    key: str,
    value: Any,
    description: str,
    schema: dict[str, Any],
) -> int:
    """Create a new setting row after checking the value against its JSON schema.

    Parameters
    ----------
    section : str
        Section name
    key : str
        Setting name
    value : Any
        Value of the Setting
    description : str
        Description of the Setting
    schema : dict[str, Any]
        A JSON schema used to validate if the setting value is correct. If the value is not valid against the schema, an error will be raised.

    Returns
    -------
    int
        The ID of the inserted setting

    Raises
    ------
    ValueError
        If the value does not validate against the schema.
    """
    # reject the value up front when it does not satisfy the supplied schema
    try:
        jsonschema.validate(instance=value, schema=schema)
    except jsonschema.ValidationError as err:
        raise ValueError(f"Value {value} is not valid against the schema {schema}. Error: {err.message}") from err

    # value and schema are serialized to JSON text before being embedded as literals
    literals = {
        "section": sql.Literal(section),
        "key": sql.Literal(key),
        "value": sql.Literal(json.dumps(value)),
        "description": sql.Literal(description),
        "schema": sql.Literal(json.dumps(schema)),
    }
    statement = sql.SQL("""
        INSERT INTO performance.settings (section, key, value, description, schema)
        VALUES ({section}, {key}, {value}, {description}, {schema})
        RETURNING id
    """).format(**literals)

    with self._perfdb.conn.reconnect() as db_conn:
        # NOTE(review): the declared int return relies on conn.execute handing back
        # the RETURNING id value - confirm against the connection handler.
        return db_conn.execute(statement)
update(setting_id, section=None, key=None, value=None, description=None, schema=None)
¶
Updates a setting in the database. At least one field must be provided for update.
Parameters:
- setting_id (int) – ID of the setting to update
- section (str, default: None) – Section name, by default None (will not be updated)
- key (str, default: None) – Setting name, by default None (will not be updated)
- value (Any, default: None) – Value of the Setting, by default None (will not be updated)
- description (str, default: None) – Description of the Setting, by default None (will not be updated)
- schema (dict[str, Any], default: None) – A JSON schema used to validate if the setting value is correct. If the value is not valid against the schema, an error will be raised. By default None (will not be updated)
Returns:
- None
Source code in echo_postgres/settings.py
@validate_call
def update(
    self,
    setting_id: int,
    section: str | None = None,
    key: str | None = None,
    value: Any | None = None,
    description: str | None = None,
    schema: dict[str, Any] | None = None,
) -> None:
    """Change one or more fields of an existing setting, selected by its ID.

    Fields left as None are not touched. At least one field must be provided for update.

    Parameters
    ----------
    setting_id : int
        ID of the setting to update
    section : str, optional
        Section name, by default None (will not be updated)
    key : str, optional
        Setting name, by default None (will not be updated)
    value : Any, optional
        Value of the Setting, by default None (will not be updated)
    description : str, optional
        Description of the Setting, by default None (will not be updated)
    schema : dict[str, Any], optional
        A JSON schema used to validate if the setting value is correct. If the value is not valid against the schema, an error will be raised. By default None (will not be updated)

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If no field is provided, or if both value and schema are given and the value does not validate against the schema.
    """
    # an update without any new field value is rejected
    if all(field is None for field in (section, key, value, description, schema)):
        raise ValueError("At least one field must be provided for update")

    # local validation only happens when value and schema are supplied together
    if not (value is None or schema is None):
        try:
            jsonschema.validate(instance=value, schema=schema)
        except jsonschema.ValidationError as err:
            raise ValueError(f"Value {value} is not valid against the schema {schema}. Error: {err.message}") from err

    # (template, placeholder, payload) per column, in SET-clause order;
    # value and schema are serialized to JSON text, a None payload means "leave untouched"
    candidates = (
        ("section = {section}", "section", section),
        ("key = {key}", "key", key),
        ("value = {value}", "value", None if value is None else json.dumps(value)),
        ("description = {description}", "description", description),
        ("schema = {schema}", "schema", None if schema is None else json.dumps(schema)),
    )
    assignments = [
        sql.SQL(template).format(**{placeholder: sql.Literal(payload)})
        for template, placeholder, payload in candidates
        if payload is not None
    ]

    statement = sql.SQL("""
        UPDATE performance.settings
        SET {set_clause}
        WHERE id = {id}
    """).format(
        set_clause=sql.SQL(", ").join(assignments),
        id=sql.Literal(setting_id),
    )

    with self._perfdb.conn.reconnect() as db_conn:
        db_conn.execute(statement)