Inventory Withdrawals
InventoryWithdrawals(perfdb)
Class used for handling Inventory Withdrawals. Can be accessed via perfdb.inventory.withdrawals.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/inventory_withdrawals.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler class for Inventory Withdrawals, reachable through `perfdb.inventory.withdrawals`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # NOTE(review): imported inside the method rather than at module level,
    # presumably to avoid a circular import -- confirm before moving it.
    from .inventory_withdrawal_items import InventoryWithdrawalItems

    super().__init__(perfdb)

    # * subclasses
    self.items = InventoryWithdrawalItems(perfdb)
delete(withdrawal_ids)
¶
Deletes inventory withdrawals from the database.
Parameters:
- withdrawal_ids (list[int]) – List of withdrawal IDs to be deleted.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def delete(self, withdrawal_ids: list[int]) -> int:
    """Remove the given inventory withdrawals from `performance.inv_withdrawals`.

    Parameters
    ----------
    withdrawal_ids : list[int]
        IDs of the withdrawals to delete.

    Returns
    -------
    int
        Row count reported by the connection after the DELETE statement.
    """
    statement = sql.SQL("DELETE FROM performance.inv_withdrawals WHERE id = ANY({ids})").format(
        ids=sql.Literal(withdrawal_ids),
    )

    with self._perfdb.conn.reconnect() as connection:
        connection.execute(statement)
        n_deleted = connection.rowcount

    logger.debug(f"Deleted {n_deleted} withdrawal(s).")
    return n_deleted
get(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory withdrawals and their attributes.
The most useful keys/columns returned are:
- id
- service_order_name
- service_order_sap_id
- storage_location_name
- center_name
- withdrawal_date
- status
- creator_name
- modifier_name
- display_label
Parameters:
- ids (list[int] | None, default: None) – List of withdrawal IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- service_order_names (list[str] | None, default: None) – List of service order names to filter. By default None.
- statuses (list[str] | None, default: None) – List of statuses to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range to filter by withdrawal_date. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type of the data. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Fetch inventory withdrawals and their attributes from `performance.v_inv_withdrawals`.

    Rows are ordered by ``id``. The most useful keys/columns returned are:

    - id
    - service_order_name
    - service_order_sap_id
    - storage_location_name
    - center_name
    - withdrawal_date
    - status
    - creator_name
    - modifier_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        List of withdrawal IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by withdrawal_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # The WHERE clause is built by the shared argument checker.
    where_clause = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        service_order_names=service_order_names,
        statuses=statuses,
        period=period,
        filter_type=filter_type,
    )
    statement = sql.SQL("SELECT * FROM performance.v_inv_withdrawals {where} ORDER BY id").format(
        where=where_clause,
    )

    with self._perfdb.conn.reconnect() as connection:
        polars_df = connection.read_to_polars(statement)

    # Polars is the native read format, so it needs no conversion.
    if output_type == "pl.DataFrame":
        return polars_df

    # Both remaining output types are derived from the id-indexed pandas frame.
    pandas_df = polars_df.to_pandas(use_pyarrow_extension_array=True).set_index("id")
    return pandas_df if output_type == "DataFrame" else pandas_df.to_dict(orient="index")
get_ids(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, filter_type='and')
¶
Gets all inventory withdrawal IDs matching the provided filters.
Parameters:
- ids (list[int] | None, default: None) – List of withdrawal IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- service_order_names (list[str] | None, default: None) – List of service order names to filter. By default None.
- statuses (list[str] | None, default: None) – List of statuses to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range to filter by withdrawal_date. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
Returns:
- list[int] – List of withdrawal IDs matching the filters.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Fetch the IDs of all inventory withdrawals matching the provided filters, ordered by ID.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of withdrawal IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by withdrawal_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    list[int]
        List of withdrawal IDs matching the filters.
    """
    # Same filter handling as `get`; only the id column is selected.
    where_clause = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        service_order_names=service_order_names,
        statuses=statuses,
        period=period,
        filter_type=filter_type,
    )
    statement = sql.SQL("SELECT id FROM performance.v_inv_withdrawals {where} ORDER BY id").format(
        where=where_clause,
    )

    with self._perfdb.conn.reconnect() as connection:
        id_frame = connection.read_to_polars(statement)

    return id_frame.get_column("id").to_list()
insert(service_order_name=None, storage_location_name=None, center_name=None, withdrawal_date=None, status='ABERTA', created_by_name=None, data_df=None)
¶
Inserts a new inventory withdrawal into the database.
You can either pass individual values to insert a single withdrawal, or pass a DataFrame to insert multiple withdrawals at once.
Parameters:
- service_order_name (str | None, default: None) – Name of the service order. Must exist in service_orders table. By default None.
- storage_location_name (str | None, default: None) – Name of the storage location. By default None.
- center_name (str | None, default: None) – Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
- withdrawal_date (date | None, default: None) – Date of the withdrawal. By default None.
- status (str, default: 'ABERTA') – Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default "ABERTA".
- created_by_name (str | None, default: None) – Name of the user creating the withdrawal. Must exist in users table. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple withdrawals to insert. Required columns: service_order_name, storage_location_name, center_name, withdrawal_date, created_by_name. Optional: status. If this is used, all individual parameters will be ignored. By default None.
Returns:
- int | list[int] | None – If inserting a single withdrawal, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def insert(
    self,
    service_order_name: str | None = None,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    withdrawal_date: date | None = None,
    status: str = "ABERTA",
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Insert one or more inventory withdrawals into `performance.inv_withdrawals`.

    Either pass the individual values to insert a single withdrawal, or pass a DataFrame
    to insert several withdrawals at once.

    Parameters
    ----------
    service_order_name : str | None, optional
        Name of the service order. Must exist in service_orders table. By default None.
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
    withdrawal_date : date | None, optional
        Date of the withdrawal. By default None.
    status : str, optional
        Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default "ABERTA".
    created_by_name : str | None, optional
        Name of the user creating the withdrawal. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to insert.
        Required columns: service_order_name, storage_location_name, center_name, withdrawal_date, created_by_name.
        Optional: status.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single withdrawal, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.

    Raises
    ------
    ValueError
        If `data_df` lacks a required column or has nulls in one, or if a storage location,
        service order name or user name is not found in the database.
    """
    single_insert = data_df is None

    if single_insert:
        # Wrap the scalar arguments in a one-row frame so both modes share the code below.
        data_df = pl.DataFrame(
            {
                "service_order_name": [service_order_name],
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "withdrawal_date": [withdrawal_date],
                "status": [status],
                "created_by_name": [created_by_name],
            },
            schema={
                "service_order_name": pl.Utf8,
                "storage_location_name": pl.Utf8,
                "center_name": pl.Utf8,
                "withdrawal_date": pl.Date,
                "status": pl.Utf8,
                "created_by_name": pl.Utf8,
            },
        )
    else:
        # check required columns
        mandatory = ("storage_location_name", "center_name", "withdrawal_date", "created_by_name", "service_order_name")
        for column in mandatory:
            if column not in data_df.columns:
                raise ValueError(f"data_df is missing required column '{column}'.")
            if not data_df.filter(pl.col(column).is_null()).is_empty():
                raise ValueError(f"data_df column '{column}' contains null values, which are not allowed.")

    # resolve storage_location_id: flatten {center: {location: id}} into {(center, location): id}
    location_id_by_pair = {}
    for center, locations in self._perfdb.inventory.storage_locations.get_ids().items():
        for location, location_id in locations.items():
            location_id_by_pair[(center, location)] = location_id

    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    unknown_locations = set(pairs) - set(location_id_by_pair)
    if unknown_locations:
        raise ValueError(f"Storage locations not found in the database: {unknown_locations}")
    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [location_id_by_pair[pair] for pair in pairs], dtype=pl.Int64),
    )

    # resolve service_order_name to service_order_id
    service_order_names = data_df["service_order_name"].drop_nulls().unique().to_list()
    if not service_order_names:
        # No name given: keep the column present but entirely null.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("service_order_id"))
    else:
        service_order_ids = self._perfdb.service_orders.get_ids(names=service_order_names)
        unknown_orders = set(service_order_names) - set(service_order_ids.keys())
        if unknown_orders:
            raise ValueError(f"Service order names not found in the database: {unknown_orders}")
        data_df = data_df.with_columns(
            pl.col("service_order_name").replace_strict(service_order_ids, return_dtype=pl.Int64).alias("service_order_id"),
        )

    # resolve created_by_name to created_by_id
    creator_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if not creator_names:
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))
    else:
        creator_ids = self._perfdb.users.instances.get_ids(names=creator_names)
        unknown_users = set(creator_names) - set(creator_ids.keys())
        if unknown_users:
            raise ValueError(f"User names not found in the database: {unknown_users}")
        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(creator_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )

    # drop human-readable columns
    data_df = data_df.drop(["service_order_name", "storage_location_name", "center_name", "created_by_name"])

    returned = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )
    new_ids = returned["id"].to_list()

    logger.debug(f"Inserted/updated {len(new_ids)} withdrawal(s) with IDs: {new_ids}")

    if not single_insert:
        return new_ids
    return new_ids[0] if new_ids else None
update(withdrawal_id=None, status=None, withdrawal_date=None, modified_by_name=None, data_df=None)
¶
Updates an existing inventory withdrawal in the database.
Parameters:
- withdrawal_id (int | None, default: None) – ID of the withdrawal to be updated. Required for single updates. By default None.
- status (str | None, default: None) – New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
- withdrawal_date (date | None, default: None) – New withdrawal date. By default None.
- modified_by_name (str | None, default: None) – Name of the user performing the update. Must exist in users table. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple withdrawals to update. Required column: id. Optional: status, withdrawal_date, modified_by_name. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def update(
    self,
    withdrawal_id: int | None = None,
    status: str | None = None,
    withdrawal_date: date | None = None,
    modified_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory withdrawal in the database.

    Parameters
    ----------
    withdrawal_id : int | None, optional
        ID of the withdrawal to be updated. Required for single updates. By default None.
    status : str | None, optional
        New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
    withdrawal_date : date | None, optional
        New withdrawal date. By default None.
    modified_by_name : str | None, optional
        Name of the user performing the update. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to update.
        Required column: id. Optional: status, withdrawal_date, modified_by_name.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If `withdrawal_id` is missing for a single update, if `data_df` has no `id` column
        or contains null IDs, if any ID does not exist in `performance.inv_withdrawals`,
        or if a user name is not found in the database.
    """
    single_update = data_df is None

    if single_update:
        # Fail early with a clear message instead of querying the database for a NULL id.
        if withdrawal_id is None:
            raise ValueError("withdrawal_id is required when data_df is not provided.")
        data_df = pl.DataFrame(
            {
                "id": [withdrawal_id],
                "status": [status],
                "withdrawal_date": [withdrawal_date],
                "modified_by_name": [modified_by_name],
            },
            schema={
                "id": pl.Int64,
                "status": pl.Utf8,
                "withdrawal_date": pl.Date,
                "modified_by_name": pl.Utf8,
            },
        )
    elif "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")
    elif data_df["id"].null_count() > 0:
        # A null id can never match a row, so reject it before the existence check.
        raise ValueError("data_df column 'id' contains null values, which are not allowed.")

    # check if IDs exist
    target_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_withdrawals WHERE id = ANY({ids})").format(
        ids=sql.Literal(target_ids),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(target_ids) - set(existing_df["id"].to_list()):
        raise ValueError(f"Withdrawal IDs {wrong_ids} do not exist in the database.")

    # resolve modified_by_name to modified_by_id
    has_name_col = "modified_by_name" in data_df.columns
    user_names = data_df["modified_by_name"].drop_nulls().unique().to_list() if has_name_col else []
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        # default=None keeps rows without a modifier name as null instead of raising.
        data_df = data_df.with_columns(
            pl.col("modified_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("modified_by_id"),
        )
    else:
        # NOTE(review): in bulk mode ignore_null_cols is False, so this all-null column is passed
        # to polars_to_sql as-is -- confirm whether that overwrites modified_by_id with NULL.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("modified_by_id"))

    if has_name_col:
        data_df = data_df.drop(["modified_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} withdrawal(s).")