Inventory Physical Count Items¶
InventoryPhysicalCountItems(perfdb)
¶
Class used for handling Inventory Physical Count Items. Can be accessed via perfdb.inventory.physical_counts.items.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the handler with the shared PerfDB object.

    This constructor belongs to the base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # Kept as a private attribute; it is the only state this constructor sets.
    self._perfdb: e_pg.PerfDB = perfdb
delete(item_ids)
¶
Deletes inventory physical count items from the database.
Parameters:
- item_ids (list[int]) – List of item IDs to be deleted.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def delete(self, item_ids: list[int]) -> int:
    """Remove inventory physical count items from the database by ID.

    Parameters
    ----------
    item_ids : list[int]
        List of item IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted, as reported by the connection's ``rowcount``.

    """
    # The ID list is embedded as a SQL literal and matched with ANY(...).
    statement = sql.SQL("DELETE FROM performance.inv_physical_count_items WHERE id = ANY({ids})").format(
        ids=sql.Literal(item_ids),
    )

    with self._perfdb.conn.reconnect() as connection:
        connection.execute(statement)
        # Read the row count while the connection context is still open.
        deleted = connection.rowcount

    logger.debug(f"Deleted {deleted} physical count item(s).")
    return deleted
get(ids=None, physical_count_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, count_statuses=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory physical count items and their attributes.
The most useful keys/columns returned are:
- id
- physical_count_id
- count_description
- count_status
- executed_date
- material_description
- material_sap_id
- base_unit
- system_quantity
- counted_quantity
- difference_quantity
- counter_name
- storage_location_name
- center_name
Parameters:
- ids (list[int] | None, default: None) – List of item IDs to filter. By default None.
- physical_count_ids (list[int] | None, default: None) – List of physical count IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- material_descriptions (list[str] | None, default: None) – List of material descriptions to filter. By default None.
- count_statuses (list[str] | None, default: None) – List of count statuses to filter. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type of the data. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    physical_count_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    count_statuses: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Fetch inventory physical count items together with their attributes.

    Rows are read from the ``performance.v_inv_physical_count_items`` view, ordered by
    ``physical_count_id`` and then ``id``.

    The most useful keys/columns returned are:

    - id
    - physical_count_id
    - count_description
    - count_status
    - executed_date
    - material_description
    - material_sap_id
    - base_unit
    - system_quantity
    - counted_quantity
    - difference_quantity
    - counter_name
    - storage_location_name
    - center_name

    Parameters
    ----------
    ids : list[int] | None, optional
        List of item IDs to filter. By default None.
    physical_count_ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    material_descriptions : list[str] | None, optional
        List of material descriptions to filter. By default None.
    count_statuses : list[str] | None, optional
        List of count statuses to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.

    """
    # All filter arguments are forwarded unchanged; the helper returns the WHERE fragment.
    filters = {
        "ids": ids,
        "physical_count_ids": physical_count_ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "material_descriptions": material_descriptions,
        "count_statuses": count_statuses,
    }
    where_clause = self._check_get_args(**filters, filter_type=filter_type)

    statement = sql.SQL("SELECT * FROM performance.v_inv_physical_count_items {where} ORDER BY physical_count_id, id").format(
        where=where_clause,
    )

    with self._perfdb.conn.reconnect() as connection:
        polars_df = connection.read_to_polars(statement)

    # Polars is the native read format, so it is returned without conversion.
    if output_type == "pl.DataFrame":
        return polars_df

    # Both remaining output types are derived from a pandas frame indexed by item id.
    pandas_df = polars_df.to_pandas(use_pyarrow_extension_array=True).set_index("id")
    return pandas_df if output_type == "DataFrame" else pandas_df.to_dict(orient="index")
get_ids(ids=None, physical_count_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, count_statuses=None, filter_type='and')
¶
Gets all inventory physical count item IDs grouped by physical count ID.
Parameters:
- ids (list[int] | None, default: None) – List of item IDs to filter. By default None.
- physical_count_ids (list[int] | None, default: None) – List of physical count IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- material_descriptions (list[str] | None, default: None) – List of material descriptions to filter. By default None.
- count_statuses (list[str] | None, default: None) – List of count statuses to filter. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
Returns:
- dict[int, list[int]] – Dictionary with physical count IDs as keys and lists of item IDs as values.
Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    physical_count_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    count_statuses: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[int, list[int]]:
    """Fetch inventory physical count item IDs, grouped by physical count ID.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of item IDs to filter. By default None.
    physical_count_ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    material_descriptions : list[str] | None, optional
        List of material descriptions to filter. By default None.
    count_statuses : list[str] | None, optional
        List of count statuses to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    dict[int, list[int]]
        Dictionary with physical count IDs as keys and lists of item IDs as values.

    """
    where_clause = self._check_get_args(
        ids=ids,
        physical_count_ids=physical_count_ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        material_descriptions=material_descriptions,
        count_statuses=count_statuses,
        filter_type=filter_type,
    )

    # Only the two ID columns are needed to build the mapping.
    statement = sql.SQL(
        "SELECT physical_count_id, id FROM performance.v_inv_physical_count_items {where} ORDER BY physical_count_id, id",
    ).format(where=where_clause)

    with self._perfdb.conn.reconnect() as connection:
        rows = connection.read_to_polars(statement)

    # One row per physical count, with its item IDs collected into a list column.
    grouped = rows.group_by("physical_count_id").agg(pl.col("id"))
    grouped = grouped.sort("physical_count_id")

    count_ids = grouped["physical_count_id"].to_list()
    item_id_lists = grouped["id"].to_list()
    return dict(zip(count_ids, item_id_lists, strict=False))
insert(physical_count_id=None, material_description=None, counted_quantity=None, counted_by_name=None, data_df=None)
¶
Inserts a new inventory physical count item into the database.
You can either pass individual values to insert a single item, or pass a DataFrame to insert multiple items at once.
Parameters:
- physical_count_id (int | None, default: None) – ID of the physical count this item belongs to. By default None.
- material_description (str | None, default: None) – Description of the material. Must exist in inv_materials table. By default None.
- counted_quantity (float | None, default: None) – Counted quantity. Must be >= 0. By default None.
- counted_by_name (str | None, default: None) – Name of the user who counted. Must exist in users table. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple items to insert. Required columns: physical_count_id, material_description. Optional: counted_quantity, counted_by_name, system_quantity. If this is used, all individual parameters will be ignored. By default None.
Returns:
- int | list[int] | None – If inserting a single item, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.
Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def insert(
    self,
    physical_count_id: int | None = None,
    material_description: str | None = None,
    counted_quantity: float | None = None,
    counted_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory physical count item into the database.

    You can either pass individual values to insert a single item, or pass a DataFrame
    to insert multiple items at once.

    Parameters
    ----------
    physical_count_id : int | None, optional
        ID of the physical count this item belongs to. By default None.
    material_description : str | None, optional
        Description of the material. Must exist in inv_materials table. By default None.
    counted_quantity : float | None, optional
        Counted quantity. Must be >= 0. By default None.
    counted_by_name : str | None, optional
        Name of the user who counted. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to insert.
        Required columns: physical_count_id, material_description.
        Optional: counted_quantity, counted_by_name, system_quantity.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single item, returns the ID (or None when no ID came back).
        If inserting multiple, returns a list of IDs.

    Raises
    ------
    ValueError
        If a required column is missing or contains only nulls, if a material description
        is not found in the materials table, or if a user name is not found in the users table.

    """
    single_insert = data_df is None
    if single_insert:
        # Wrap the scalar arguments in a one-row frame so both paths share the same code.
        data_df = pl.DataFrame(
            {
                "physical_count_id": [physical_count_id],
                "material_description": [material_description],
                "counted_quantity": [counted_quantity],
                "counted_by_name": [counted_by_name],
            },
            schema={
                "physical_count_id": pl.Int64,
                "material_description": pl.Utf8,
                "counted_quantity": pl.Float64,
                "counted_by_name": pl.Utf8,
            },
        )

    # Only these two columns are mandatory. counted_quantity and counted_by_name are
    # documented as optional, so they must not be rejected when absent or null.
    required_cols = ["physical_count_id", "material_description"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].null_count() == data_df.height:
            raise ValueError(f"data_df column '{col}' cannot be all nulls.")

    # resolve material_description to material_id
    materials_df = self._perfdb.inventory.materials.get(output_type="pl.DataFrame")
    mat_desc_map = dict(
        zip(materials_df["description"].to_list(), materials_df["id"].to_list(), strict=False),
    )
    if wrong_mats := set(data_df["material_description"].drop_nulls().to_list()) - set(mat_desc_map):
        raise ValueError(f"Material descriptions not found in the database: {wrong_mats}")
    data_df = data_df.with_columns(
        pl.col("material_description").replace_strict(mat_desc_map, return_dtype=pl.Int64).alias("material_id"),
    )

    # resolve counted_by_name to counted_by_id
    has_counter_names = (
        "counted_by_name" in data_df.columns and data_df["counted_by_name"].null_count() < data_df.height
    )
    if has_counter_names:
        user_names = data_df["counted_by_name"].drop_nulls().unique().to_list()
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        # default=None keeps rows without a counter name as null instead of failing the mapping.
        data_df = data_df.with_columns(
            pl.col("counted_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("counted_by_id"),
        )
    else:
        # No counter information supplied: send an explicit null counter id.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("counted_by_id"))

    # Drop the human-readable columns now that they are resolved to IDs. counted_by_name is
    # optional, so only drop what is actually present.
    name_cols = [col for col in ("material_description", "counted_by_name") if col in data_df.columns]
    data_df = data_df.drop(name_cols)

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_count_items",
        schema="performance",
        conflict_cols=["physical_count_id", "material_id"],
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} physical count item(s) with IDs: {ids}")

    if not single_insert:
        return ids
    return ids[0] if ids else None
update(item_id=None, counted_quantity=None, counted_by_name=None, system_quantity=None, data_df=None)
¶
Updates an existing inventory physical count item in the database.
Parameters:
- item_id (int | None, default: None) – ID of the item to be updated. Required for single updates. By default None.
- counted_quantity (float | None, default: None) – New counted quantity. Must be >= 0. By default None.
- counted_by_name (str | None, default: None) – Name of the user who performed the count. Must exist in users table. By default None.
- system_quantity (float | None, default: None) – New system quantity. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple items to update. Required column: id. Optional: counted_quantity, counted_by_name, system_quantity. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def update(
    self,
    item_id: int | None = None,
    counted_quantity: float | None = None,
    counted_by_name: str | None = None,
    system_quantity: float | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory physical count item in the database.

    Parameters
    ----------
    item_id : int | None, optional
        ID of the item to be updated. Required for single updates. By default None.
    counted_quantity : float | None, optional
        New counted quantity. Must be >= 0. By default None.
    counted_by_name : str | None, optional
        Name of the user who performed the count. Must exist in users table. By default None.
    system_quantity : float | None, optional
        New system quantity. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to update.
        Required column: id. Optional: counted_quantity, counted_by_name, system_quantity.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If neither item_id nor data_df is given, if data_df has no "id" column, if any ID
        does not exist in the table, or if a user name is not found in the users table.

    """
    single_update = data_df is None
    if single_update:
        # Fail before touching the database: a single update cannot target a null ID.
        if item_id is None:
            raise ValueError("item_id must be provided when data_df is not used.")
        data_df = pl.DataFrame(
            {
                "id": [item_id],
                "counted_quantity": [counted_quantity],
                "counted_by_name": [counted_by_name],
                "system_quantity": [system_quantity],
            },
            schema={
                "id": pl.Int64,
                "counted_quantity": pl.Float64,
                "counted_by_name": pl.Utf8,
                "system_quantity": pl.Float64,
            },
        )

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # check if IDs exist
    item_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_physical_count_items WHERE id = ANY({ids})").format(
        ids=sql.Literal(item_ids),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(item_ids) - set(existing_df["id"].to_list()):
        raise ValueError(f"Physical count item IDs {wrong_ids} do not exist in the database.")

    # resolve counted_by_name to counted_by_id
    if "counted_by_name" in data_df.columns:
        if data_df["counted_by_name"].null_count() < data_df.height:
            user_names = data_df["counted_by_name"].drop_nulls().unique().to_list()
            user_ids = self._perfdb.users.instances.get_ids(names=user_names)
            if wrong_users := set(user_names) - set(user_ids):
                raise ValueError(f"User names not found in the database: {wrong_users}")
            # default=None keeps rows without a counter name as null instead of failing the mapping.
            counter_expr = pl.col("counted_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None)
        else:
            # The column was supplied but holds only nulls: carry that through as a null ID.
            counter_expr = pl.lit(None).cast(pl.Int64)
        data_df = data_df.with_columns(counter_expr.alias("counted_by_id")).drop(["counted_by_name"])
    # When counted_by_name was not supplied at all, no counted_by_id column is added, so a bulk
    # update of other columns does not send a null counter id for every row.
    # NOTE(review): assumes polars_to_sql writes null columns when ignore_null_cols is False - confirm.

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_count_items",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} physical count item(s).")