Inventory Physical Counts¶
InventoryPhysicalCounts(perfdb)
¶
Class used for handling Inventory Physical Counts. Can be accessed via perfdb.inventory.physical_counts.
Parameters:
Source code in echo_postgres/inventory_physical_counts.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler for Inventory Physical Counts, reachable as `perfdb.inventory.physical_counts`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # imported inside the method — presumably to avoid a circular import at module load; confirm
    from .inventory_physical_count_documents import InventoryPhysicalCountDocuments
    from .inventory_physical_count_items import InventoryPhysicalCountItems

    # * subclasses exposed as attributes
    self.items = InventoryPhysicalCountItems(perfdb)
    self.documents = InventoryPhysicalCountDocuments(perfdb)
delete(physical_count_ids)
¶
Deletes inventory physical counts from the database.
Parameters:
-
physical_count_ids (list[int]) – List of physical count IDs to be deleted.
Returns:
-
int – Number of rows deleted.
Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def delete(self, physical_count_ids: list[int]) -> int:
    """Deletes inventory physical counts from the database.

    Parameters
    ----------
    physical_count_ids : list[int]
        List of physical count IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted.
    """
    # short-circuit: nothing to delete, and an empty-array literal in
    # `id = ANY({...})` can fail type resolution in PostgreSQL
    if not physical_count_ids:
        return 0
    query = sql.SQL("DELETE FROM performance.inv_physical_counts WHERE id = ANY({ids})").format(
        ids=sql.Literal(physical_count_ids),
    )
    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
        deleted = conn.rowcount
    logger.debug(f"Deleted {deleted} physical count(s).")
    return deleted
get(ids=None, center_names=None, storage_location_names=None, statuses=None, is_planned=None, period=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory physical counts and their attributes.
The most useful keys/columns returned are:
- id
- description
- is_planned
- status
- planned_date
- executed_date
- storage_location_name
- center_name
- executor_name
- creator_name
- display_label
Parameters:
-
(ids¶list[int] | None, default:None) –List of physical count IDs to filter. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter. By default None.
-
(storage_location_names¶list[str] | None, default:None) –List of storage location names to filter. By default None.
-
(statuses¶list[str] | None, default:None) –List of statuses to filter. By default None.
-
(is_planned¶bool | None, default:None) –Filter by planned status. By default None.
-
(period¶DateTimeRange | None, default:None) –Date range to filter by planned_date or executed_date. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'pl.DataFrame') –Output type of the data. By default "pl.DataFrame".
Returns:
-
dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
-
pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
-
pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    statuses: list[str] | None = None,
    is_planned: bool | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory physical counts and their attributes.

    The most useful keys/columns returned are:

    - id
    - description
    - is_planned
    - status
    - planned_date
    - executed_date
    - storage_location_name
    - center_name
    - executor_name
    - creator_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    is_planned : bool | None, optional
        Filter by planned status. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by planned_date or executed_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # translate the filter arguments into a composed WHERE clause
    where_clause = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        statuses=statuses,
        is_planned=is_planned,
        period=period,
        filter_type=filter_type,
    )
    query = sql.SQL("SELECT * FROM performance.v_inv_physical_counts {where} ORDER BY id").format(where=where_clause)
    with self._perfdb.conn.reconnect() as conn:
        counts = conn.read_to_polars(query)

    # cheapest path first: hand back the Polars frame untouched
    if output_type == "pl.DataFrame":
        return counts

    # both pandas-based outputs share the conversion and id-indexing step
    counts_pd = counts.to_pandas(use_pyarrow_extension_array=True).set_index("id")
    return counts_pd if output_type == "DataFrame" else counts_pd.to_dict(orient="index")
get_ids(ids=None, center_names=None, storage_location_names=None, statuses=None, is_planned=None, period=None, filter_type='and')
¶
Gets all inventory physical count IDs matching the provided filters.
Parameters:
-
(ids¶list[int] | None, default:None) –List of physical count IDs to filter. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter. By default None.
-
(storage_location_names¶list[str] | None, default:None) –List of storage location names to filter. By default None.
-
(statuses¶list[str] | None, default:None) –List of statuses to filter. By default None.
-
(is_planned¶bool | None, default:None) –Filter by planned status. By default None.
-
(period¶DateTimeRange | None, default:None) –Date range to filter by planned_date or executed_date. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
Returns:
-
list[int]–List of physical count IDs matching the filters.
Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    statuses: list[str] | None = None,
    is_planned: bool | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets all inventory physical count IDs matching the provided filters.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    is_planned : bool | None, optional
        Filter by planned status. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by planned_date or executed_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    list[int]
        List of physical count IDs matching the filters.
    """
    # same filter handling as get(), but only the id column is fetched
    filters = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "statuses": statuses,
        "is_planned": is_planned,
        "period": period,
        "filter_type": filter_type,
    }
    where = self._check_get_args(**filters)
    query = sql.SQL("SELECT id FROM performance.v_inv_physical_counts {where} ORDER BY id").format(where=where)
    with self._perfdb.conn.reconnect() as conn:
        ids_df = conn.read_to_polars(query)
    return ids_df["id"].to_list()
insert(storage_location_name=None, center_name=None, description=None, is_planned=True, status='PROGRAMADA', planned_date=None, created_by_name=None, data_df=None)
¶
Inserts a new inventory physical count into the database.
You can either pass individual values to insert a single physical count, or pass a DataFrame to insert multiple at once.
Parameters:
-
(storage_location_name¶str | None, default:None) –Name of the storage location. By default None.
-
(center_name¶str | None, default:None) –Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
-
(description¶str | None, default:None) –Description of the physical count. By default None.
-
(is_planned¶bool, default:True) –Whether the physical count is planned. By default True.
-
(status¶str, default:'PROGRAMADA') –Status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default "PROGRAMADA".
-
(planned_date¶date | None, default:None) –Planned date for the count. By default None.
-
(created_by_name¶str | None, default:None) –Name of the user creating the count. Must exist in users table. By default None.
-
(data_df¶DataFrame | None, default:None) –Polars DataFrame containing multiple physical counts to insert. Required columns: storage_location_name, center_name, description, created_by_name. Optional: is_planned, status, planned_date. If this is used, all individual parameters will be ignored. By default None.
Returns:
-
int | list[int] | None–If inserting a single count, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.
Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def insert(
    self,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    description: str | None = None,
    is_planned: bool = True,
    status: str = "PROGRAMADA",
    planned_date: date | None = None,
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory physical count into the database.

    You can either pass individual values to insert a single physical count, or pass a DataFrame
    to insert multiple at once.

    Parameters
    ----------
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
    description : str | None, optional
        Description of the physical count. By default None.
    is_planned : bool, optional
        Whether the physical count is planned. By default True.
    status : str, optional
        Status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default "PROGRAMADA".
    planned_date : date | None, optional
        Planned date for the count. By default None.
    created_by_name : str | None, optional
        Name of the user creating the count. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple physical counts to insert.
        Required columns: storage_location_name, center_name, description, created_by_name.
        Optional: is_planned, status, planned_date.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single count, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.

    Raises
    ------
    ValueError
        If data_df is missing a required column or has an all-null required column,
        if a (center, storage location) pair is unknown, or if a creator name is unknown.
    """
    # canonical column set/dtypes; also coerces the single-row frame built below
    df_schema = {
        "storage_location_name": pl.Utf8,
        "center_name": pl.Utf8,
        "description": pl.Utf8,
        "is_planned": pl.Boolean,
        "status": pl.Utf8,
        "planned_date": pl.Date,
        "created_by_name": pl.Utf8,
    }
    if data_df is None:
        # single insert: wrap the scalar arguments into a one-row frame so both
        # paths share the same resolution/insert logic below
        single_insert = True
        data_df = pl.DataFrame(
            {
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "description": [description],
                "is_planned": [is_planned],
                "status": [status],
                "planned_date": [planned_date],
                "created_by_name": [created_by_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False
        required_cols = ["storage_location_name", "center_name", "description", "created_by_name"]
        for col in required_cols:
            if col not in data_df.columns:
                raise ValueError(f"data_df is missing required column '{col}'.")
            # reject an all-null required column early instead of failing later in the DB
            if len(data_df.filter(pl.col(col).is_not_null())) == 0:
                raise ValueError(f"data_df column '{col}' cannot be all nulls.")
    # * resolve (center_name, storage_location_name) pairs to storage_location_id
    # get_ids() appears to return a nested {center: {location: id}} mapping — flattened here for pair lookup
    sl_ids_nested = self._perfdb.inventory.storage_locations.get_ids()
    sl_ids_flat = {(c, loc): loc_id for c, locs in sl_ids_nested.items() for loc, loc_id in locs.items()}
    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    if wrong_sls := set(pairs) - set(sl_ids_flat.keys()):
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")
    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [sl_ids_flat[(c, loc)] for c, loc in pairs], dtype=pl.Int64),
    )
    # * resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        # no creator names supplied at all: keep the column present (all null) so the
        # insert column set stays consistent across both paths
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))
    # * drop human-readable columns — the table stores the resolved IDs only
    data_df = data_df.drop(["storage_location_name", "center_name", "created_by_name"])
    # ignore_null_cols only for single inserts — presumably so omitted scalar arguments
    # fall back to table defaults; confirm against polars_to_sql semantics
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_counts",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )
    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} physical count(s) with IDs: {ids}")
    # single insert unwraps to a scalar ID (or None when nothing was returned)
    return ids if not single_insert else ids[0] if ids else None
update(physical_count_id=None, description=None, status=None, planned_date=None, executed_date=None, executed_by_name=None, data_df=None)
¶
Updates an existing inventory physical count in the database.
Parameters:
-
(physical_count_id¶int | None, default:None) –ID of the physical count to be updated. Required for single updates. By default None.
-
(description¶str | None, default:None) –New description. By default None.
-
(status¶str | None, default:None) –New status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default None.
-
(planned_date¶date | None, default:None) –New planned date. By default None.
-
(executed_date¶date | None, default:None) –New executed date. By default None.
-
(executed_by_name¶str | None, default:None) –Name of the user who executed the count. Must exist in users table. By default None.
-
(data_df¶DataFrame | None, default:None) –Polars DataFrame containing multiple physical counts to update. Required column: id. Optional: description, status, planned_date, executed_date, executed_by_name. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def update(
    self,
    physical_count_id: int | None = None,
    description: str | None = None,
    status: str | None = None,
    planned_date: date | None = None,
    executed_date: date | None = None,
    executed_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory physical count in the database.

    Parameters
    ----------
    physical_count_id : int | None, optional
        ID of the physical count to be updated. Required for single updates. By default None.
    description : str | None, optional
        New description. By default None.
    status : str | None, optional
        New status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default None.
    planned_date : date | None, optional
        New planned date. By default None.
    executed_date : date | None, optional
        New executed date. By default None.
    executed_by_name : str | None, optional
        Name of the user who executed the count. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple physical counts to update.
        Required column: id. Optional: description, status, planned_date, executed_date, executed_by_name.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If physical_count_id is missing for a single update, if data_df lacks the 'id' column,
        if any given ID does not exist, or if an executed_by_name is unknown.
    """
    # canonical dtypes used to coerce the single-row frame built below
    df_schema = {
        "id": pl.Int64,
        "description": pl.Utf8,
        "status": pl.Utf8,
        "planned_date": pl.Date,
        "executed_date": pl.Date,
        "executed_by_name": pl.Utf8,
    }
    if data_df is None:
        # single update: fail fast on a missing target ID instead of letting the
        # existence check below raise a confusing "IDs {None} do not exist" error
        if physical_count_id is None:
            raise ValueError("physical_count_id must be provided when data_df is not used.")
        data_df = pl.DataFrame(
            {
                "id": [physical_count_id],
                "description": [description],
                "status": [status],
                "planned_date": [planned_date],
                "executed_date": [executed_date],
                "executed_by_name": [executed_by_name],
            },
            schema=df_schema,
        )
        single_update = True
    else:
        single_update = False
        if "id" not in data_df.columns:
            raise ValueError("data_df is missing required column 'id'.")
    # * check that all given IDs exist before touching anything
    existing_query = sql.SQL("SELECT id FROM performance.inv_physical_counts WHERE id = ANY({ids})").format(
        ids=sql.Literal(data_df["id"].to_list()),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(data_df["id"].to_list()) - set(existing_df["id"].to_list()):
        raise ValueError(f"Physical count IDs {wrong_ids} do not exist in the database.")
    # * resolve executed_by_name to executed_by_id — only when the caller supplied the
    # column. Previously an all-null executed_by_id column was injected even when data_df
    # had no executed_by_name column, which risked overwriting existing executed_by_id
    # values on bulk updates (ignore_null_cols is False in that path).
    if "executed_by_name" in data_df.columns:
        if len(data_df.filter(pl.col("executed_by_name").is_not_null())) > 0:
            user_names = data_df["executed_by_name"].drop_nulls().unique().to_list()
            user_ids = self._perfdb.users.instances.get_ids(names=user_names)
            if wrong_users := set(user_names) - set(user_ids.keys()):
                raise ValueError(f"User names not found in the database: {wrong_users}")
            data_df = data_df.with_columns(
                pl.col("executed_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("executed_by_id"),
            )
        else:
            # column present but all null: keep a null executed_by_id so the column set is
            # stable; for single updates ignore_null_cols drops it before writing
            data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("executed_by_id"))
        data_df = data_df.drop(["executed_by_name"])
    # ignore_null_cols only for single updates so omitted scalar arguments
    # do not overwrite existing values with NULL
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_counts",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} physical count(s).")