Inventory Transaction Items
InventoryTransactionItems(perfdb)

Class used for handling Inventory Transaction Items. Can be accessed via perfdb.inventory.transactions.items.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Keep a reference to the top-level PerfDB object on the instance.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
        It is stored as ``self._perfdb``.
    """
    self._perfdb: e_pg.PerfDB = perfdb
delete(item_ids)

Deletes inventory transaction items from the database.
Parameters:
- item_ids (list[int]) – List of transaction item IDs to be deleted.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def delete(self, item_ids: list[int]) -> int:
    """Remove the given inventory transaction items from the database.

    Parameters
    ----------
    item_ids : list[int]
        IDs of the transaction items to remove.

    Returns
    -------
    int
        Number of rows deleted, as reported by the connection's ``rowcount``
        after the statement has run.
    """
    delete_stmt = sql.SQL("DELETE FROM performance.inv_transaction_items WHERE id = ANY({ids})")
    delete_stmt = delete_stmt.format(ids=sql.Literal(item_ids))

    with self._perfdb.conn.reconnect() as connection:
        connection.execute(delete_stmt)
        n_deleted = connection.rowcount

    logger.debug(f"Deleted {n_deleted} transaction item(s).")
    return n_deleted
get(ids=None, transaction_header_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, material_sap_ids=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory transaction items and their attributes.
The most useful keys/columns returned are:
- id
- transaction_header_id
- transaction_date
- storage_location_name
- center_name
- material_description
- material_sap_id
- base_unit
- transaction_type_name
- direction
- signed_quantity
- absolute_quantity
- service_order_name
- creator_name
Parameters:
-
(ids¶list[int] | None, default:None) –List of transaction item IDs to filter. By default None.
-
(transaction_header_ids¶list[int] | None, default:None) –List of transaction header IDs to filter. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter. By default None.
-
(storage_location_names¶list[str] | None, default:None) –List of storage location names to filter. By default None.
-
(material_descriptions¶list[str] | None, default:None) –List of material descriptions to filter. By default None.
-
(material_sap_ids¶list[int] | None, default:None) –List of material SAP IDs to filter. By default None.
-
(transaction_type_names¶list[str] | None, default:None) –List of transaction type names to filter. By default None.
-
(directions¶list[str] | None, default:None) –List of directions to filter. By default None.
-
(service_order_names¶list[str] | None, default:None) –List of service order names to filter. By default None.
-
(period¶DateTimeRange | None, default:None) –Date range to filter by transaction_date. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'pl.DataFrame') –Output type of the data. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    transaction_header_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    material_sap_ids: list[int] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Fetch inventory transaction items together with their attributes.

    Rows are read from the ``performance.v_inv_transaction_items`` view and
    ordered by ``transaction_header_id`` and then ``id``.

    The most useful keys/columns returned are:

    - id
    - transaction_header_id
    - transaction_date
    - storage_location_name
    - center_name
    - material_description
    - material_sap_id
    - base_unit
    - transaction_type_name
    - direction
    - signed_quantity
    - absolute_quantity
    - service_order_name
    - creator_name

    Parameters
    ----------
    ids : list[int] | None, optional
        Transaction item IDs to filter by. By default None.
    transaction_header_ids : list[int] | None, optional
        Transaction header IDs to filter by. By default None.
    center_names : list[str] | None, optional
        Center names to filter by. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to filter by. By default None.
    material_descriptions : list[str] | None, optional
        Material descriptions to filter by. By default None.
    material_sap_ids : list[int] | None, optional
        Material SAP IDs to filter by. By default None.
    transaction_type_names : list[str] | None, optional
        Transaction type names to filter by. By default None.
    directions : list[str] | None, optional
        Directions to filter by. By default None.
    service_order_names : list[str] | None, optional
        Service order names to filter by. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        When output_type is "dict": a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame with index = id.
    pl.DataFrame
        When output_type is "pl.DataFrame": a Polars DataFrame.
    """
    # All filter arguments are forwarded unchanged to the shared WHERE-clause builder.
    filters = {
        "ids": ids,
        "transaction_header_ids": transaction_header_ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "material_descriptions": material_descriptions,
        "material_sap_ids": material_sap_ids,
        "transaction_type_names": transaction_type_names,
        "directions": directions,
        "service_order_names": service_order_names,
        "period": period,
        "filter_type": filter_type,
    }
    where_clause = self._check_get_args(**filters)

    select_stmt = sql.SQL(
        "SELECT * FROM performance.v_inv_transaction_items {where} ORDER BY transaction_header_id, id",
    ).format(where=where_clause)

    with self._perfdb.conn.reconnect() as connection:
        items_pl = connection.read_to_polars(select_stmt)

    if output_type == "pl.DataFrame":
        return items_pl

    # Both remaining output types go through a pandas frame indexed by item id.
    items_pd = items_pl.to_pandas(use_pyarrow_extension_array=True).set_index("id")
    if output_type == "DataFrame":
        return items_pd
    return items_pd.to_dict(orient="index")
get_ids(ids=None, transaction_header_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, material_sap_ids=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, filter_type='and')
¶
Gets all inventory transaction item IDs grouped by header ID.
Parameters:
-
(ids¶list[int] | None, default:None) –List of transaction item IDs to filter. By default None.
-
(transaction_header_ids¶list[int] | None, default:None) –List of transaction header IDs to filter. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter. By default None.
-
(storage_location_names¶list[str] | None, default:None) –List of storage location names to filter. By default None.
-
(material_descriptions¶list[str] | None, default:None) –List of material descriptions to filter. By default None.
-
(material_sap_ids¶list[int] | None, default:None) –List of material SAP IDs to filter. By default None.
-
(transaction_type_names¶list[str] | None, default:None) –List of transaction type names to filter. By default None.
-
(directions¶list[str] | None, default:None) –List of directions to filter. By default None.
-
(service_order_names¶list[str] | None, default:None) –List of service order names to filter. By default None.
-
(period¶DateTimeRange | None, default:None) –Date range to filter by transaction_date. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
Returns:
-
dict[int, list[int]]–Dictionary with header IDs as keys and lists of item IDs as values.
Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    transaction_header_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    material_sap_ids: list[int] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[int, list[int]]:
    """Return inventory transaction item IDs grouped by their header ID.

    Parameters
    ----------
    ids : list[int] | None, optional
        Transaction item IDs to filter by. By default None.
    transaction_header_ids : list[int] | None, optional
        Transaction header IDs to filter by. By default None.
    center_names : list[str] | None, optional
        Center names to filter by. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to filter by. By default None.
    material_descriptions : list[str] | None, optional
        Material descriptions to filter by. By default None.
    material_sap_ids : list[int] | None, optional
        Material SAP IDs to filter by. By default None.
    transaction_type_names : list[str] | None, optional
        Transaction type names to filter by. By default None.
    directions : list[str] | None, optional
        Directions to filter by. By default None.
    service_order_names : list[str] | None, optional
        Service order names to filter by. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    dict[int, list[int]]
        Dictionary with header IDs as keys and lists of item IDs as values.
    """
    # All filter arguments are forwarded unchanged to the shared WHERE-clause builder.
    filters = {
        "ids": ids,
        "transaction_header_ids": transaction_header_ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "material_descriptions": material_descriptions,
        "material_sap_ids": material_sap_ids,
        "transaction_type_names": transaction_type_names,
        "directions": directions,
        "service_order_names": service_order_names,
        "period": period,
        "filter_type": filter_type,
    }
    where_clause = self._check_get_args(**filters)

    select_stmt = sql.SQL(
        "SELECT transaction_header_id, id FROM performance.v_inv_transaction_items {where} ORDER BY transaction_header_id, id",
    ).format(where=where_clause)

    with self._perfdb.conn.reconnect() as connection:
        pairs_df = connection.read_to_polars(select_stmt)

    # One row per header, with its item IDs collected into a list column.
    grouped_df = pairs_df.group_by("transaction_header_id").agg(pl.col("id"))
    grouped_df = grouped_df.sort("transaction_header_id")

    header_ids = grouped_df["transaction_header_id"].to_list()
    item_id_lists = grouped_df["id"].to_list()
    return dict(zip(header_ids, item_id_lists, strict=False))
insert(transaction_header_id=None, material_description=None, quantity=None, created_by_name=None, data_df=None, on_conflict='ignore')
¶
Inserts a new inventory transaction item into the database.
You can either pass individual values to insert a single item, or pass a DataFrame to insert multiple items at once.
Parameters:
-
(transaction_header_id¶int | None, default:None) –ID of the transaction header this item belongs to. By default None.
-
(material_description¶str | None, default:None) –Description of the material. Must exist in inv_materials table. By default None.
-
(quantity¶float | None, default:None) –Quantity of the material. Must be > 0. By default None.
-
(created_by_name¶str | None, default:None) –Name of the user creating the item. Must exist in users table. By default None.
-
(data_df¶pl.DataFrame | None, default:None) –Polars DataFrame containing multiple items to insert. Required columns: transaction_header_id, material_description, quantity, created_by_name. If this is used, all individual parameters will be ignored. By default None.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict (based on transaction_header_id + material_id). By default "ignore".
Returns:
-
int | list[int] | None–If inserting a single item, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.
Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def insert(
    self,
    transaction_header_id: int | None = None,
    material_description: str | None = None,
    quantity: float | None = None,
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | list[int] | None:
    """Inserts one or more inventory transaction items into the database.

    You can either pass individual values to insert a single item, or pass a DataFrame
    to insert multiple items at once. Material descriptions and user names are
    translated to ``material_id`` and ``created_by_id`` before the rows are written
    to ``performance.inv_transaction_items``.

    Parameters
    ----------
    transaction_header_id : int | None, optional
        ID of the transaction header this item belongs to. By default None.
    material_description : str | None, optional
        Description of the material. Must exist in inv_materials table. By default None.
    quantity : float | None, optional
        Quantity of the material. Must be > 0. By default None.
    created_by_name : str | None, optional
        Name of the user creating the item. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to insert.
        Required columns: transaction_header_id, material_description, quantity, created_by_name.
        If this is used, all individual parameters will be ignored. By default None.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict (based on transaction_header_id + material_id). By default "ignore".

    Returns
    -------
    int | list[int] | None
        If inserting a single item, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.

    Raises
    ------
    ValueError
        If a required column is missing from the data or contains null values,
        if a material description is not among the materials returned by
        ``perfdb.inventory.materials.get``, or if a user name is not among the
        users returned by ``perfdb.users.instances.get_ids``.
    """
    # dtypes used when the single-item arguments are wrapped into a one-row DataFrame
    df_schema = {
        "transaction_header_id": pl.Int64,
        "material_description": pl.Utf8,
        "quantity": pl.Float64,
        "created_by_name": pl.Utf8,
    }
    if data_df is None:
        # single-item mode: build a one-row frame from the individual arguments
        single_insert = True
        data_df = pl.DataFrame(
            {
                "transaction_header_id": [transaction_header_id],
                "material_description": [material_description],
                "quantity": [quantity],
                "created_by_name": [created_by_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False
    # check required columns
    # NOTE(review): this validation is assumed to run for both the single-item and
    # the data_df path — the nesting could not be confirmed from the flattened
    # listing, please verify against the original file.
    required_cols = ["transaction_header_id", "material_description", "quantity", "created_by_name"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"data_df column '{col}' contains null values, but it is required.")
    # resolve material_description to material_id
    materials_df = self._perfdb.inventory.materials.get(output_type="pl.DataFrame")
    mat_desc_map = dict(
        zip(materials_df["description"].to_list(), materials_df["id"].to_list(), strict=False),
    )
    # fail early, listing every description that has no match
    if wrong_mats := set(data_df["material_description"].to_list()) - set(mat_desc_map.keys()):
        raise ValueError(f"Material descriptions not found in the database: {wrong_mats}")
    data_df = data_df.with_columns(
        pl.col("material_description").replace_strict(mat_desc_map, return_dtype=pl.Int64).alias("material_id"),
    )
    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        # no user names to resolve: add an all-null created_by_id column instead
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))
    # the name columns have been replaced by their ID counterparts
    data_df = data_df.drop(["material_description", "created_by_name"])
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_items",
        schema="performance",
        conflict_cols=["transaction_header_id", "material_id"],
        return_cols=["id"],
        # on_conflict "ignore" maps to if_exists "append", "update" maps to "update"
        if_exists="append" if on_conflict == "ignore" else "update",
        # presumably skips null-valued columns for single inserts — confirm against polars_to_sql
        ignore_null_cols=single_insert,
    )
    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} transaction item(s) with IDs: {ids}")
    # bulk mode returns the full list; single mode returns the first ID, or None when no ID came back
    return ids if not single_insert else ids[0] if ids else None
update(item_id=None, quantity=None, data_df=None)
¶
Updates an existing inventory transaction item in the database.
Parameters:
-
(item_id¶int | None, default:None) –ID of the item to be updated. Required for single updates. By default None.
-
(quantity¶float | None, default:None) –New quantity. Must be > 0. By default None.
-
(data_df¶pl.DataFrame | None, default:None) –Polars DataFrame containing multiple items to update. Required column: id. Optional: quantity. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def update(
    self,
    item_id: int | None = None,
    quantity: float | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Update existing inventory transaction items in the database.

    Either pass ``item_id`` and ``quantity`` to change one item, or pass
    ``data_df`` to change several at once. Every referenced ID is checked
    against ``performance.inv_transaction_items`` before anything is written.

    Parameters
    ----------
    item_id : int | None, optional
        ID of the item to be updated. Required for single updates. By default None.
    quantity : float | None, optional
        New quantity. Must be > 0. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to update.
        Required column: id. Optional: quantity.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If ``data_df`` has no ``id`` column, or if any requested ID is not
        present in the database.
    """
    single_update = data_df is None
    if single_update:
        # Wrap the individual arguments into a one-row frame with fixed dtypes.
        data_df = pl.DataFrame(
            {"id": [item_id], "quantity": [quantity]},
            schema={"id": pl.Int64, "quantity": pl.Float64},
        )
    elif "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # Look up which of the requested IDs are actually present.
    requested_ids = data_df["id"].to_list()
    lookup_stmt = sql.SQL("SELECT id FROM performance.inv_transaction_items WHERE id = ANY({ids})").format(
        ids=sql.Literal(requested_ids),
    )
    with self._perfdb.conn.reconnect() as connection:
        found_df = connection.read_to_polars(lookup_stmt)

    missing_ids = set(requested_ids) - set(found_df["id"].to_list())
    if missing_ids:
        raise ValueError(f"Transaction item IDs {missing_ids} do not exist in the database.")

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_items",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} transaction item(s).")