Inventory Materials
InventoryMaterials(perfdb)

Class used for handling Inventory Materials. Can be accessed via perfdb.inventory.materials.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Keep a reference to the top level PerfDB object.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Stored privately; the handler methods reach the connection through it.
    self._perfdb: e_pg.PerfDB = perfdb
delete(material_id)
¶
Deletes an inventory material from the database.
Parameters:
- material_id (int) – ID of the material to be deleted.
Returns:
- int – Number of rows deleted (0 if no material with the given ID was found).
Source code in echo_postgres/inventory_materials.py
@validate_call
def delete(self, material_id: int) -> int:
    """Remove one inventory material, identified by its ID.

    Parameters
    ----------
    material_id : int
        ID of the material to be deleted.

    Returns
    -------
    int
        Number of rows deleted (0 if no material with the given ID was found).
    """
    delete_stmt = sql.SQL("DELETE FROM performance.inv_materials WHERE id = {id}").format(
        id=sql.Literal(material_id),
    )

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(delete_stmt)
        # Read the row count while the connection context is still active.
        deleted_rows = conn.rowcount

    logger.debug(f"Deleted {deleted_rows} material(s) from table inv_materials with ID {material_id}.")
    return deleted_rows
get(descriptions=None, sap_ids=None, is_active=None, base_units=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory materials and their attributes.
The most useful keys/columns returned are:
- id
- sap_id
- description
- display_name
- criticality_weight
- base_unit
- is_active
- creator_name
- modifier_name
Parameters:
- descriptions (list[str] | None, default: None) – List of material descriptions to filter the results. By default None.
- sap_ids (list[int] | None, default: None) – List of SAP IDs to filter the results. By default None.
- is_active (bool | None, default: None) – Filter by active status. By default None.
- base_units (list[str] | None, default: None) – List of base units to filter the results. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_materials.py
@validate_call
def get(
    self,
    descriptions: list[str] | None = None,
    sap_ids: list[int] | None = None,
    is_active: bool | None = None,
    base_units: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory materials and their attributes.

    The most useful keys/columns returned are:

    - id
    - sap_id
    - description
    - display_name
    - criticality_weight
    - base_unit
    - is_active
    - creator_name
    - modifier_name

    Parameters
    ----------
    descriptions : list[str] | None, optional
        List of material descriptions to filter the results. By default None.
    sap_ids : list[int] | None, optional
        List of SAP IDs to filter the results. By default None.
    is_active : bool | None, optional
        Filter by active status. By default None.
    base_units : list[str] | None, optional
        List of base units to filter the results. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"].
        By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # The filter arguments are turned into the SQL fragment that follows the view name.
    where = self._check_get_args(
        descriptions=descriptions,
        sap_ids=sap_ids,
        is_active=is_active,
        base_units=base_units,
        filter_type=filter_type,
    )
    query = sql.SQL("SELECT * FROM performance.v_inv_materials {where} ORDER BY description").format(
        where=where,
    )

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_polars(query)

    if output_type == "pl.DataFrame":
        return df

    # Both remaining output types are derived from a pandas frame indexed by id,
    # which is why the "dict" output is keyed by the material id.
    pandas_df = df.to_pandas(use_pyarrow_extension_array=True).set_index("id")
    if output_type == "DataFrame":
        return pandas_df
    return pandas_df.to_dict(orient="index")
get_ids(descriptions=None, sap_ids=None, is_active=None, base_units=None, filter_type='and')
¶
Gets all inventory materials and their respective ids.
Parameters:
- descriptions (list[str] | None, default: None) – List of material descriptions to filter the results. By default None.
- sap_ids (list[int] | None, default: None) – List of SAP IDs to filter the results. By default None.
- is_active (bool | None, default: None) – Filter by active status. By default None.
- base_units (list[str] | None, default: None) – List of base units to filter the results. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
Returns:
- dict[str, int] – Dictionary with all materials and their respective ids in the format {description: id, ...}.
Source code in echo_postgres/inventory_materials.py
@validate_call
def get_ids(
    self,
    descriptions: list[str] | None = None,
    sap_ids: list[int] | None = None,
    is_active: bool | None = None,
    base_units: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[str, int]:
    """Map each inventory material description to its id.

    Parameters
    ----------
    descriptions : list[str] | None, optional
        List of material descriptions to filter the results. By default None.
    sap_ids : list[int] | None, optional
        List of SAP IDs to filter the results. By default None.
    is_active : bool | None, optional
        Filter by active status. By default None.
    base_units : list[str] | None, optional
        List of base units to filter the results. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"]. By default "and".

    Returns
    -------
    dict[str, int]
        Dictionary with all materials and their respective ids in the format {description: id, ...}.
    """
    filter_clause = self._check_get_args(
        descriptions=descriptions,
        sap_ids=sap_ids,
        is_active=is_active,
        base_units=base_units,
        filter_type=filter_type,
    )
    select_stmt = sql.SQL(
        "SELECT description, id FROM performance.v_inv_materials {where} ORDER BY description",
    ).format(where=filter_clause)

    with self._perfdb.conn.reconnect() as conn:
        result = conn.read_to_polars(select_stmt)

    # Pair the two columns row by row; a repeated description keeps its last id.
    material_names = result["description"].to_list()
    material_ids = result["id"].to_list()
    return dict(zip(material_names, material_ids, strict=False))
insert(description=None, sap_id=None, criticality_weight=1, base_unit=None, is_active=True, created_by_name=None, data_df=None, on_conflict='ignore')
¶
Inserts a new inventory material into the database.
You can either pass individual values to insert a single material, or pass a DataFrame to insert multiple materials at once.
Parameters:
- description (str | None, default: None) – Description of the material. By default None.
- sap_id (int | None, default: None) – SAP ID of the material. Must be unique and > 0. By default None.
- criticality_weight (int, default: 1) – Criticality weight of the material. Must be > 0. By default 1.
- base_unit (str | None, default: None) – Base unit of measurement for the material. By default None.
- is_active (bool, default: True) – Whether the material is active. By default True.
- created_by_name (str | None, default: None) – Name of the user who created the material. Must exist in the users table. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple materials to be inserted. Required columns: description, base_unit, created_by_name. Optional: sap_id, criticality_weight, is_active. If this is used, all individual parameters will be ignored. By default None.
- on_conflict (Literal['ignore', 'update'], default: 'ignore') – What to do in case of conflict (the implementation detects conflicts on description, passed as conflict_cols=["description"]). Can be one of ["ignore", "update"]. By default "ignore".
Returns:
- int | list[int] | None – If inserting a single material, returns the ID of the inserted record, or None if no record was inserted (due to conflicts and on_conflict="ignore"). If inserting multiple materials through data_df, returns a list of IDs, which may be empty.
Source code in echo_postgres/inventory_materials.py
@validate_call
def insert(
    self,
    description: str | None = None,
    sap_id: int | None = None,
    criticality_weight: int = 1,
    base_unit: str | None = None,
    is_active: bool = True,
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | list[int] | None:
    """Insert one inventory material, or many at once from a DataFrame.

    Pass the individual values to insert a single material, or pass ``data_df``
    to insert several materials in one call.

    Parameters
    ----------
    description : str | None, optional
        Description of the material. By default None.
    sap_id : int | None, optional
        SAP ID of the material. Must be unique and > 0. By default None.
    criticality_weight : int, optional
        Criticality weight of the material. Must be > 0. By default 1.
    base_unit : str | None, optional
        Base unit of measurement for the material. By default None.
    is_active : bool, optional
        Whether the material is active. By default True.
    created_by_name : str | None, optional
        Name of the user who created the material. Must exist in the users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple materials to be inserted.
        Required columns: description, base_unit, created_by_name.
        Optional: sap_id, criticality_weight, is_active.
        If this is used, all individual parameters will be ignored. By default None.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Conflicts are detected on the ``description``
        column, which is what this method passes as ``conflict_cols``.
        Can be one of ["ignore", "update"]. By default "ignore".

    Returns
    -------
    int | list[int] | None
        If inserting a single material, the ID of the returned record, or None when no
        ID came back (e.g. conflict with on_conflict="ignore").
        If inserting through ``data_df``, the list of returned IDs (possibly empty).

    Raises
    ------
    ValueError
        If ``data_df`` lacks a required column, or if a creator name is not found
        among the users.
    """
    single_insert = data_df is None
    if single_insert:
        # Wrap the scalar arguments in a one-row frame so both modes share one code path.
        data_df = pl.DataFrame(
            {
                "description": [description],
                "sap_id": [sap_id],
                "criticality_weight": [criticality_weight],
                "base_unit": [base_unit],
                "is_active": [is_active],
                "created_by_name": [created_by_name],
            },
            schema={
                "description": pl.Utf8,
                "sap_id": pl.Int64,
                "criticality_weight": pl.Int64,
                "base_unit": pl.Utf8,
                "is_active": pl.Boolean,
                "created_by_name": pl.Utf8,
            },
        )

    # Required columns must be present before anything touches the database.
    absent_cols = {"description", "base_unit", "created_by_name"} - set(data_df.columns)
    if absent_cols:
        raise ValueError(f"data_df is missing required columns: {absent_cols}")

    # Translate creator names into user ids (created_by_id).
    creator_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if not creator_names:
        created_by_expr = pl.lit(None).cast(pl.Int64)
    else:
        name_to_id = self._perfdb.users.instances.get_ids(names=creator_names)
        unknown_users = set(creator_names) - set(name_to_id.keys())
        if unknown_users:
            raise ValueError(f"User names not found in the database: {unknown_users}")
        created_by_expr = pl.col("created_by_name").replace_strict(name_to_id, return_dtype=pl.Int64)
    data_df = data_df.with_columns(created_by_expr.alias("created_by_id")).drop(["created_by_name"])

    write_mode = "append" if on_conflict == "ignore" else "update"
    returned = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_materials",
        schema="performance",
        conflict_cols=["description"],
        return_cols=["id"],
        if_exists=write_mode,
        ignore_null_cols=single_insert,
    )
    ids = returned["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} material(s) with IDs: {ids}")

    if not single_insert:
        return ids
    if ids:
        return ids[0]
    return None
update(material_id=None, description=None, sap_id=None, criticality_weight=None, base_unit=None, is_active=None, modified_by_name=None, data_df=None)
¶
Updates an existing inventory material in the database.
You can either pass individual values to update a single material, or pass a DataFrame to update multiple materials at once.
Parameters:
- material_id (int | None, default: None) – ID of the material to be updated. Required for single updates. By default None.
- description (str | None, default: None) – New description of the material. By default None.
- sap_id (int | None, default: None) – New SAP ID of the material. By default None.
- criticality_weight (int | None, default: None) – New criticality weight. By default None.
- base_unit (str | None, default: None) – New base unit. By default None.
- is_active (bool | None, default: None) – New active status. By default None.
- modified_by_name (str | None, default: None) – Name of the user performing the update. Must exist in the users table. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple materials to be updated. Required columns: id, modified_by_name (the implementation rejects a DataFrame without the modified_by_name column; its values may be null). Optional: description, sap_id, criticality_weight, base_unit, is_active. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_materials.py
@validate_call
def update(
    self,
    material_id: int | None = None,
    description: str | None = None,
    sap_id: int | None = None,
    criticality_weight: int | None = None,
    base_unit: str | None = None,
    is_active: bool | None = None,
    modified_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory material in the database.

    You can either pass individual values to update a single material, or pass a DataFrame
    to update multiple materials at once.

    Parameters
    ----------
    material_id : int | None, optional
        ID of the material to be updated. Required for single updates. By default None.
    description : str | None, optional
        New description of the material. By default None.
    sap_id : int | None, optional
        New SAP ID of the material. By default None.
    criticality_weight : int | None, optional
        New criticality weight. By default None.
    base_unit : str | None, optional
        New base unit. By default None.
    is_active : bool | None, optional
        New active status. By default None.
    modified_by_name : str | None, optional
        Name of the user performing the update. Must exist in the users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple materials to be updated.
        Required columns: id, modified_by_name (the column must exist; its values may be null).
        Optional: description, sap_id, criticality_weight, base_unit, is_active.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If neither ``material_id`` nor ``data_df`` is given, if ``data_df`` lacks a
        required column, if any ID is not present in ``performance.inv_materials``,
        or if a modifier name is not found among the users.
    """
    single_update = data_df is None
    if single_update:
        # Fail early with a clear message instead of querying the database for a NULL id.
        if material_id is None:
            raise ValueError("material_id must be provided when data_df is not passed.")
        data_df = pl.DataFrame(
            {
                "id": [material_id],
                "description": [description],
                "sap_id": [sap_id],
                "criticality_weight": [criticality_weight],
                "base_unit": [base_unit],
                "is_active": [is_active],
                "modified_by_name": [modified_by_name],
            },
            schema={
                "id": pl.Int64,
                "description": pl.Utf8,
                "sap_id": pl.Int64,
                "criticality_weight": pl.Int64,
                "base_unit": pl.Utf8,
                "is_active": pl.Boolean,
                "modified_by_name": pl.Utf8,
            },
        )

    # validate required columns
    required_cols = {"id", "modified_by_name"}
    missing = required_cols - set(data_df.columns)
    if missing:
        raise ValueError(f"data_df is missing required columns: {missing}")

    # check if IDs exist
    target_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_materials WHERE id = ANY({ids})").format(
        ids=sql.Literal(target_ids),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(target_ids) - set(existing_df["id"].to_list()):
        raise ValueError(f"Material IDs {wrong_ids} do not exist in the database.")

    # resolve modified_by_name to modified_by_id
    # (the column is guaranteed to exist by the validation above)
    user_names = data_df["modified_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("modified_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("modified_by_id"),
        )
    else:
        # No modifier given anywhere: carry an all-null modified_by_id column.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("modified_by_id"))
    data_df = data_df.drop(["modified_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_materials",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} material(s).")