Inventory Withdrawals¶
InventoryWithdrawals(perfdb)
¶
Class used for handling Inventory Withdrawals. Can be accessed via perfdb.inventory.withdrawals.
Parameters:
Source code in echo_postgres/inventory_withdrawals.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler for Inventory Withdrawals, reachable as `perfdb.inventory.withdrawals`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # NOTE(review): imported locally, presumably to avoid a circular import at
    # module load time — confirm before hoisting to the top of the file.
    from .inventory_withdrawal_items import InventoryWithdrawalItems
    from .inventory_withdrawal_service_orders import InventoryWithdrawalServiceOrders

    # * subclasses — child handlers exposed as attributes of this namespace
    self.items = InventoryWithdrawalItems(perfdb)
    self.service_orders = InventoryWithdrawalServiceOrders(perfdb)
delete(withdrawal_ids)
¶
Deletes inventory withdrawals from the database.
Parameters:
- withdrawal_ids (list[int]) – List of withdrawal IDs to be deleted.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def delete(self, withdrawal_ids: list[int]) -> int:
    """Deletes inventory withdrawals from the database.

    Parameters
    ----------
    withdrawal_ids : list[int]
        List of withdrawal IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted.
    """
    delete_stmt = sql.SQL("DELETE FROM performance.inv_withdrawals WHERE id = ANY({ids})").format(
        ids=sql.Literal(withdrawal_ids),
    )
    self._perfdb.conn.execute(delete_stmt)
    # rowcount reflects how many rows the DELETE actually removed
    n_deleted = self._perfdb.conn.rowcount
    logger.debug(f"Deleted {n_deleted} withdrawal(s).")
    return n_deleted
get(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, sla_period=None, finished_period=None, sla_violated=None, creator_names=None, created_by_ids=None, modifier_names=None, modified_by_ids=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory withdrawals and their attributes.
The most useful keys/columns returned are:
- id
- service_order_ids
- service_order_names
- service_order_sap_ids
- storage_location_name
- center_name
- withdrawal_date
- sla_deadline_date
- finished_date
- status
- is_sla_violated
- creator_name
- modifier_name
- display_label
Parameters:
-
(ids¶list[int] | None, default:None) –List of withdrawal IDs to filter. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter. By default None.
-
(storage_location_names¶list[str] | None, default:None) –List of storage location names to filter. By default None.
-
(service_order_names¶list[str] | None, default:None) –List of service order names to filter. By default None.
-
(statuses¶list[str] | None, default:None) –List of statuses to filter. By default None.
-
(period¶DateTimeRange | None, default:None) –Date range to filter by withdrawal_date. By default None.
-
(sla_period¶DateTimeRange | None, default:None) –Date range to filter by sla_deadline_date. By default None.
-
(finished_period¶DateTimeRange | None, default:None) –Date range to filter by finished_date. By default None.
-
(sla_violated¶bool | None, default:None) –Filter by whether the SLA has been violated (is_sla_violated column). By default None.
-
(creator_names¶list[str] | None, default:None) –List of creator names to filter. By default None.
-
(created_by_ids¶list[int] | None, default:None) –List of created_by user IDs to filter. By default None.
-
(modifier_names¶list[str] | None, default:None) –List of modifier names to filter. By default None.
-
(modified_by_ids¶list[int] | None, default:None) –List of modified_by user IDs to filter. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'pl.DataFrame') –Output type of the data. By default "pl.DataFrame".
Returns:
-
dict[int, dict[str, Any]]–In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
-
DataFrame–In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
-
pl.DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    sla_period: DateTimeRange | None = None,
    finished_period: DateTimeRange | None = None,
    sla_violated: bool | None = None,
    creator_names: list[str] | None = None,
    created_by_ids: list[int] | None = None,
    modifier_names: list[str] | None = None,
    modified_by_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory withdrawals and their attributes.

    The most useful keys/columns returned are:

    - id
    - service_order_ids
    - service_order_names
    - service_order_sap_ids
    - storage_location_name
    - center_name
    - withdrawal_date
    - sla_deadline_date
    - finished_date
    - status
    - is_sla_violated
    - creator_name
    - modifier_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        List of withdrawal IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by withdrawal_date. By default None.
    sla_period : DateTimeRange | None, optional
        Date range to filter by sla_deadline_date. By default None.
    finished_period : DateTimeRange | None, optional
        Date range to filter by finished_date. By default None.
    sla_violated : bool | None, optional
        Filter by whether the SLA has been violated (is_sla_violated column). By default None.
    creator_names : list[str] | None, optional
        List of creator names to filter. By default None.
    created_by_ids : list[int] | None, optional
        List of created_by user IDs to filter. By default None.
    modifier_names : list[str] | None, optional
        List of modifier names to filter. By default None.
    modified_by_ids : list[int] | None, optional
        List of modified_by user IDs to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # Translate the keyword filters into a SQL WHERE clause.
    where_clause = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        service_order_names=service_order_names,
        statuses=statuses,
        period=period,
        sla_period=sla_period,
        finished_period=finished_period,
        sla_violated=sla_violated,
        creator_names=creator_names,
        created_by_ids=created_by_ids,
        modifier_names=modifier_names,
        modified_by_ids=modified_by_ids,
        filter_type=filter_type,
    )
    select_query = sql.SQL("SELECT * FROM performance.v_inv_withdrawals {where} ORDER BY id").format(where=where_clause)
    polars_df = self._perfdb.conn.read_to_polars(select_query, schema_overrides=self._cols_schema)

    if output_type == "pl.DataFrame":
        return polars_df

    # Both pandas-based outputs share the same conversion, indexed by withdrawal id.
    pandas_df = polars_df.to_pandas(use_pyarrow_extension_array=True).set_index("id")
    return pandas_df if output_type == "DataFrame" else pandas_df.to_dict(orient="index")
get_ids(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, sla_period=None, finished_period=None, sla_violated=None, creator_names=None, created_by_ids=None, modifier_names=None, modified_by_ids=None, filter_type='and')
¶
Gets all inventory withdrawal IDs matching the provided filters.
Parameters:
-
(ids¶list[int] | None, default:None) –List of withdrawal IDs to filter. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter. By default None.
-
(storage_location_names¶list[str] | None, default:None) –List of storage location names to filter. By default None.
-
(service_order_names¶list[str] | None, default:None) –List of service order names to filter. By default None.
-
(statuses¶list[str] | None, default:None) –List of statuses to filter. By default None.
-
(period¶DateTimeRange | None, default:None) –Date range to filter by withdrawal_date. By default None.
-
(sla_period¶DateTimeRange | None, default:None) –Date range to filter by sla_deadline_date. By default None.
-
(finished_period¶DateTimeRange | None, default:None) –Date range to filter by finished_date. By default None.
-
(sla_violated¶bool | None, default:None) –Filter by whether the SLA has been violated (is_sla_violated column). By default None.
-
(creator_names¶list[str] | None, default:None) –List of creator names to filter. By default None.
-
(created_by_ids¶list[int] | None, default:None) –List of created_by user IDs to filter. By default None.
-
(modifier_names¶list[str] | None, default:None) –List of modifier names to filter. By default None.
-
(modified_by_ids¶list[int] | None, default:None) –List of modified_by user IDs to filter. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
Returns:
-
list[int]–List of withdrawal IDs matching the filters.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    sla_period: DateTimeRange | None = None,
    finished_period: DateTimeRange | None = None,
    sla_violated: bool | None = None,
    creator_names: list[str] | None = None,
    created_by_ids: list[int] | None = None,
    modifier_names: list[str] | None = None,
    modified_by_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets all inventory withdrawal IDs matching the provided filters.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of withdrawal IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by withdrawal_date. By default None.
    sla_period : DateTimeRange | None, optional
        Date range to filter by sla_deadline_date. By default None.
    finished_period : DateTimeRange | None, optional
        Date range to filter by finished_date. By default None.
    sla_violated : bool | None, optional
        Filter by whether the SLA has been violated (is_sla_violated column). By default None.
    creator_names : list[str] | None, optional
        List of creator names to filter. By default None.
    created_by_ids : list[int] | None, optional
        List of created_by user IDs to filter. By default None.
    modifier_names : list[str] | None, optional
        List of modifier names to filter. By default None.
    modified_by_ids : list[int] | None, optional
        List of modified_by user IDs to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    list[int]
        List of withdrawal IDs matching the filters.
    """
    # Same filter resolution as get(); only the id column is fetched here.
    where_clause = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        service_order_names=service_order_names,
        statuses=statuses,
        period=period,
        sla_period=sla_period,
        finished_period=finished_period,
        sla_violated=sla_violated,
        creator_names=creator_names,
        created_by_ids=created_by_ids,
        modifier_names=modifier_names,
        modified_by_ids=modified_by_ids,
        filter_type=filter_type,
    )
    id_query = sql.SQL("SELECT id FROM performance.v_inv_withdrawals {where} ORDER BY id").format(where=where_clause)
    id_df = self._perfdb.conn.read_to_polars(id_query, schema_overrides=self._cols_schema)
    return id_df["id"].to_list()
get_stats(center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, sla_period=None, finished_period=None, sla_violated=None, creator_names=None, created_by_ids=None, modifier_names=None, modified_by_ids=None, filter_type='and', top_n=20)
¶
Returns a collection of aggregated statistics about withdrawals for use in dashboards.
All filters are applied consistently across every stat category so only the relevant subset of data is analyzed, keeping queries light.
Parameters:
-
(center_names¶list[str] | None, default:None) –List of center names to filter. By default None.
-
(storage_location_names¶list[str] | None, default:None) –List of storage location names to filter. By default None.
-
(service_order_names¶list[str] | None, default:None) –List of service order names to filter. By default None.
-
(statuses¶list[str] | None, default:None) –List of statuses to filter. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
-
(period¶DateTimeRange | None, default:None) –Date range to filter by withdrawal_date. By default None.
-
(sla_period¶DateTimeRange | None, default:None) –Date range to filter by sla_deadline_date. By default None.
-
(finished_period¶DateTimeRange | None, default:None) –Date range to filter by finished_date. By default None.
-
(sla_violated¶bool | None, default:None) –Filter by SLA violation status. By default None.
-
(creator_names¶list[str] | None, default:None) –List of creator names to filter. By default None.
-
(created_by_ids¶list[int] | None, default:None) –List of created_by user IDs to filter. By default None.
-
(modifier_names¶list[str] | None, default:None) –List of modifier names to filter. By default None.
-
(modified_by_ids¶list[int] | None, default:None) –List of modified_by user IDs to filter. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
-
(top_n¶int, default:20) –Maximum number of rows returned for ranked/top lists (e.g. open_by_creator, top_materials). By default 20.
Returns:
-
dict[str, DataFrame]–Dictionary with the following keys, each containing a Polars DataFrame:
-
"by_status"— withdrawal count per status. Columns: status, count. -
"by_center"— totals and open/SLA-violated counts per center. Columns: center_name, total_count, open_count, sla_violated_count. -
"by_storage_location"— totals and open/SLA-violated counts per storage location. Columns: center_name, storage_location_name, total_count, open_count, sla_violated_count. -
"by_creator"— per-creator totals, open count, SLA-violated count and average days to close. Columns: creator_name, total_count, open_count, sla_violated_count, avg_days_to_close. -
"by_date"— daily withdrawal count. Columns: withdrawal_date, count. -
"open_by_creator"— top_n creators ranked by number of open withdrawals (descending). Columns: creator_name, open_count. -
"avg_close_time_by_creator"— top_n creators ranked by average days to close (descending). Only considers withdrawals in status APONTADA or CANCELADA with a non-null finished_date. Columns: creator_name, avg_days_to_close, closed_count. -
"sla_summary"— single-row global SLA summary. Columns: total_count, sla_ok_count, sla_violated_count, sla_violation_rate. -
"top_materials"— top_n most withdrawn materials by total quantity (descending). Columns: material_description, material_sap_id, base_unit, total_withdrawn_qty, withdrawal_count.
-
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get_stats(
    self,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    sla_period: DateTimeRange | None = None,
    finished_period: DateTimeRange | None = None,
    sla_violated: bool | None = None,
    creator_names: list[str] | None = None,
    created_by_ids: list[int] | None = None,
    modifier_names: list[str] | None = None,
    modified_by_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    top_n: int = 20,
) -> dict[str, pl.DataFrame]:
    """Returns a collection of aggregated statistics about withdrawals for use in dashboards.

    All filters are applied consistently across every stat category so only the relevant subset
    of data is analyzed, keeping queries light.

    Parameters
    ----------
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by withdrawal_date. By default None.
    sla_period : DateTimeRange | None, optional
        Date range to filter by sla_deadline_date. By default None.
    finished_period : DateTimeRange | None, optional
        Date range to filter by finished_date. By default None.
    sla_violated : bool | None, optional
        Filter by SLA violation status. By default None.
    creator_names : list[str] | None, optional
        List of creator names to filter. By default None.
    created_by_ids : list[int] | None, optional
        List of created_by user IDs to filter. By default None.
    modifier_names : list[str] | None, optional
        List of modifier names to filter. By default None.
    modified_by_ids : list[int] | None, optional
        List of modified_by user IDs to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    top_n : int, optional
        Maximum number of rows returned for ranked/top lists (e.g. open_by_creator, top_materials).
        By default 20.

    Returns
    -------
    dict[str, pl.DataFrame]
        Dictionary with the following keys, each containing a Polars DataFrame:

        - ``"by_status"`` — withdrawal count per status.
          Columns: status, count.
        - ``"by_center"`` — totals and open/SLA-violated counts per center.
          Columns: center_name, total_count, open_count, sla_violated_count.
        - ``"by_storage_location"`` — totals and open/SLA-violated counts per storage location.
          Columns: center_name, storage_location_name, total_count, open_count, sla_violated_count.
        - ``"by_creator"`` — per-creator totals, open count, SLA-violated count and average days to close.
          Columns: creator_name, total_count, open_count, sla_violated_count, avg_days_to_close.
        - ``"by_date"`` — daily withdrawal count.
          Columns: withdrawal_date, count.
        - ``"open_by_creator"`` — top_n creators ranked by number of open withdrawals (descending).
          Columns: creator_name, open_count.
        - ``"avg_close_time_by_creator"`` — top_n creators ranked by average days to close (descending).
          Only considers withdrawals in status APONTADA or CANCELADA with a non-null finished_date.
          Columns: creator_name, avg_days_to_close, closed_count.
        - ``"sla_summary"`` — single-row global SLA summary.
          Columns: total_count, sla_ok_count, sla_violated_count, sla_violation_rate.
        - ``"top_materials"`` — top_n most withdrawn materials by total quantity (descending).
          Columns: material_description, material_sap_id, base_unit, total_withdrawn_qty, withdrawal_count.
    """
    # Shared WHERE clause used by every per-withdrawal stat query below.
    where = self._check_get_args(
        ids=None,
        center_names=center_names,
        storage_location_names=storage_location_names,
        service_order_names=service_order_names,
        statuses=statuses,
        period=period,
        sla_period=sla_period,
        finished_period=finished_period,
        sla_violated=sla_violated,
        creator_names=creator_names,
        created_by_ids=created_by_ids,
        modifier_names=modifier_names,
        modified_by_ids=modified_by_ids,
        filter_type=filter_type,
    )
    result: dict[str, pl.DataFrame] = {}
    # -- by_status --
    result["by_status"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT status, COUNT(*) AS count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY status
            ORDER BY count DESC
        """).format(where=where),
    )
    # -- by_center --
    result["by_center"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                center_name,
                COUNT(*) AS total_count,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY center_name
            ORDER BY total_count DESC
        """).format(where=where),
    )
    # -- by_storage_location --
    result["by_storage_location"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                center_name,
                storage_location_name,
                COUNT(*) AS total_count,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY center_name, storage_location_name
            ORDER BY total_count DESC
        """).format(where=where),
    )
    # -- by_creator --
    # NOTE(review): avg_days_to_close here intentionally counts only APONTADA rows,
    # unlike avg_close_time_by_creator below which also includes CANCELADA — confirm
    # this asymmetry is desired.
    result["by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                COUNT(*) AS total_count,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count,
                ROUND(
                    AVG(
                        CASE
                            WHEN status = 'APONTADA' AND finished_date IS NOT NULL
                            THEN EXTRACT(EPOCH FROM (finished_date - created_date)) / 86400
                            ELSE NULL
                        END
                    ),
                    2
                ) AS avg_days_to_close
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY creator_name
            ORDER BY total_count DESC
        """).format(where=where),
    )
    # -- by_date --
    result["by_date"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT withdrawal_date, COUNT(*) AS count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY withdrawal_date
            ORDER BY withdrawal_date
        """).format(where=where),
    )
    # -- open_by_creator (top_n) --
    result["open_by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY creator_name
            ORDER BY open_count DESC
            LIMIT {top_n}
        """).format(where=where, top_n=sql.Literal(top_n)),
    )
    # -- avg_close_time_by_creator (top_n) --
    # Build an additional WHERE clause that restricts to closed/cancelled rows.
    # We always AND this on top of the existing WHERE regardless of filter_type.
    # FIX: previously only APONTADA was matched, silently excluding CANCELADA rows
    # even though the docstring promises both. The condition is parenthesized so it
    # stays atomic when ANDed onto an OR-joined filter.
    closed_filter = sql.SQL(
        "(status IN ('APONTADA', 'CANCELADA') AND finished_date IS NOT NULL)",
    )
    where_closed = sql.SQL("WHERE ") + closed_filter if where == sql.SQL("") else where + sql.SQL(" AND ") + closed_filter
    result["avg_close_time_by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                ROUND(
                    AVG(EXTRACT(EPOCH FROM (finished_date - created_date)) / 86400)::numeric,
                    2
                ) AS avg_days_to_close,
                COUNT(*) AS closed_count
            FROM performance.v_inv_withdrawals
            {where_closed}
            GROUP BY creator_name
            ORDER BY avg_days_to_close DESC
            LIMIT {top_n}
        """).format(where_closed=where_closed, top_n=sql.Literal(top_n)),
    )
    # -- sla_summary --
    result["sla_summary"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                COUNT(*) AS total_count,
                SUM(CASE WHEN NOT is_sla_violated THEN 1 ELSE 0 END) AS sla_ok_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count,
                ROUND(
                    SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                    2
                ) AS sla_violation_rate
            FROM performance.v_inv_withdrawals
            {where}
        """).format(where=where),
    )
    # -- top_materials (top_n, uses v_inv_withdrawal_items) --
    # Build a simplified WHERE for the withdrawal_items view (different column names).
    # Only a subset of the method's filters can be overlaid on that view:
    # center_names, storage_location_names, creator_names, statuses (as
    # withdrawal_status) and period. The remaining filters (service_order_names,
    # SLA/finished filters, user IDs) are NOT applied to this stat.
    wi_conditions = []
    if center_names:
        wi_conditions.append(
            sql.SQL("center_name = ANY({v})").format(v=sql.Literal(center_names)),
        )
    if storage_location_names:
        wi_conditions.append(
            sql.SQL("storage_location_name = ANY({v})").format(v=sql.Literal(storage_location_names)),
        )
    if creator_names:
        wi_conditions.append(
            sql.SQL("creator_name = ANY({v})").format(v=sql.Literal(creator_names)),
        )
    if statuses:
        wi_conditions.append(
            sql.SQL("withdrawal_status = ANY({v})").format(v=sql.Literal(statuses)),
        )
    if period:
        wi_conditions.append(
            sql.SQL("(withdrawal_date >= {start} AND withdrawal_date <= {end})").format(
                start=sql.Literal(period.start.date().isoformat()),
                end=sql.Literal(period.end.date().isoformat()),
            ),
        )
    # filter_type is pydantic-validated to "and"/"or", so interpolating it is safe.
    where_wi = sql.SQL("WHERE ") + sql.SQL(f" {filter_type.upper()} ").join(wi_conditions) if wi_conditions else sql.SQL("")
    result["top_materials"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                material_description,
                material_sap_id,
                base_unit,
                SUM(withdrawn_quantity) AS total_withdrawn_qty,
                COUNT(DISTINCT withdrawal_id) AS withdrawal_count
            FROM performance.v_inv_withdrawal_items
            {where_wi}
            GROUP BY material_description, material_sap_id, base_unit
            ORDER BY total_withdrawn_qty DESC
            LIMIT {top_n}
        """).format(where_wi=where_wi, top_n=sql.Literal(top_n)),
    )
    logger.debug(
        f"get_stats() returned {len(result)} stat categories for withdrawals.",
    )
    return result
insert(service_order_names=None, storage_location_name=None, center_name=None, withdrawal_date=None, status='ABERTA', created_by_name=None, data_df=None)
¶
Inserts a new inventory withdrawal into the database and links its service orders.
You can either pass individual values to insert a single withdrawal, or pass a DataFrame to insert multiple withdrawals at once.
Parameters:
-
(service_order_names¶list[str] | None, default:None) –Names of the service orders. Must exist in service_orders table. By default None.
-
(storage_location_name¶str | None, default:None) –Name of the storage location. By default None.
-
(center_name¶str | None, default:None) –Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
-
(withdrawal_date¶date | None, default:None) –Date of the withdrawal. By default None.
-
(status¶str, default:'ABERTA') –Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default "ABERTA".
-
(created_by_name¶str | None, default:None) –Name of the user creating the withdrawal. Must exist in users table. By default None.
-
(data_df¶DataFrame | None, default:None) –Polars DataFrame containing multiple withdrawals to insert. Required columns: service_order_names, storage_location_name, center_name, withdrawal_date, created_by_name. Optional: status. If this is used, all individual parameters will be ignored. By default None.
Returns:
-
int | list[int] | None–If inserting a single withdrawal, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def insert(
    self,
    service_order_names: list[str] | None = None,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    withdrawal_date: date | None = None,
    status: str = "ABERTA",
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory withdrawal into the database and links its service orders.

    You can either pass individual values to insert a single withdrawal, or pass a DataFrame
    to insert multiple withdrawals at once.

    Parameters
    ----------
    service_order_names : list[str] | None, optional
        Names of the service orders. Must exist in service_orders table. By default None.
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
    withdrawal_date : date | None, optional
        Date of the withdrawal. By default None.
    status : str, optional
        Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default "ABERTA".
    created_by_name : str | None, optional
        Name of the user creating the withdrawal. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to insert.
        Required columns: service_order_names, storage_location_name, center_name, withdrawal_date, created_by_name.
        Optional: status.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single withdrawal, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.
    """
    # Canonical schema both the single- and multi-row paths are normalized to.
    df_schema = {
        "service_order_names": pl.List(pl.Utf8),  # Updated to List type
        "storage_location_name": pl.Utf8,
        "center_name": pl.Utf8,
        "withdrawal_date": pl.Date,
        "status": pl.Utf8,
        "created_by_name": pl.Utf8,
    }
    if data_df is None:
        # Single-row path: wrap the scalar arguments in a one-row DataFrame so the
        # rest of the method only has to handle the bulk case.
        single_insert = True
        data_df = pl.DataFrame(
            {
                "service_order_names": [service_order_names],
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "withdrawal_date": [withdrawal_date],
                "status": [status],
                "created_by_name": [created_by_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False
        # converting to expected schema (only columns present in data_df are cast)
        data_df = data_df.cast({col: dtype for col, dtype in df_schema.items() if col in data_df.columns})
        # Check required columns — NOTE(review): this validation only runs on the
        # bulk path; the single-row path trusts the individual arguments.
        required_cols = ["storage_location_name", "center_name", "withdrawal_date", "created_by_name", "service_order_names"]
        for col in required_cols:
            if col not in data_df.columns:
                raise ValueError(f"data_df is missing required column '{col}'.")
            if len(data_df.filter(pl.col(col).is_null())) > 0:
                raise ValueError(f"data_df column '{col}' contains null values, which are not allowed.")
    # Resolve storage_location_id from (center_name, storage_location_name) pairs;
    # every pair must already exist in the database.
    sl_ids_nested = self._perfdb.inventory.storage_locations.get_ids()
    sl_ids_flat = {(c, loc): loc_id for c, locs in sl_ids_nested.items() for loc, loc_id in locs.items()}
    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    if wrong_sls := set(pairs) - set(sl_ids_flat.keys()):
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")
    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [sl_ids_flat[(c, loc)] for c, loc in pairs], dtype=pl.Int64),
    )
    # Resolve created_by_name to created_by_id (fails fast on unknown user names).
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        # No creator names supplied anywhere — insert NULL creator ids.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))
    # 1. Extract the Service Order Names list BEFORE dropping it from the DataFrame
    # This keeps the exact order aligned with the dataframe rows being inserted
    so_names_list = data_df["service_order_names"].to_list()
    # 2. Drop human-readable columns (including service_order_names, as it doesn't belong in the root table anymore)
    data_df = data_df.drop(["service_order_names", "storage_location_name", "center_name", "created_by_name"])
    # 3. Insert into the main inv_withdrawals table and get the generated IDs back,
    # in the same row order as data_df.
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )
    ids = ids_df["id"].to_list()
    # 4. Build the JSON mapping dictionary {withdrawal_id: ["OS-1", "OS-2"]}
    mapping_dict = {}
    for w_id, so_names in zip(ids, so_names_list, strict=False):
        # Only add to mapping if there is an actual list of service orders provided
        if so_names and len(so_names) > 0:
            mapping_dict[w_id] = so_names  # noqa: PERF403
    # 5. Delegate the relationship linking to the Service Orders subclass
    if mapping_dict:
        logger.debug(f"Linking service orders for {len(mapping_dict)} new withdrawal(s)...")
        self.service_orders.insert(mapping_dict)
    logger.debug(f"Inserted/updated {len(ids)} withdrawal(s) with IDs: {ids}")
    return ids if not single_insert else ids[0] if ids else None
update(withdrawal_id=None, status=None, withdrawal_date=None, modified_by_name=None, data_df=None)
¶
Updates an existing inventory withdrawal in the database.
Parameters:
-
(withdrawal_id¶int | None, default:None) –ID of the withdrawal to be updated. Required for single updates. By default None.
-
(status¶str | None, default:None) –New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
-
(withdrawal_date¶date | None, default:None) –New withdrawal date. By default None.
-
(modified_by_name¶str | None, default:None) –Name of the user performing the update. Must exist in users table. By default None.
-
(data_df¶DataFrame | None, default:None) –Polars DataFrame containing multiple withdrawals to update. Required column: id. Optional: status, withdrawal_date, modified_by_name. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def update(
    self,
    withdrawal_id: int | None = None,
    status: str | None = None,
    withdrawal_date: date | None = None,
    modified_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory withdrawal in the database.

    Parameters
    ----------
    withdrawal_id : int | None, optional
        ID of the withdrawal to be updated. Required for single updates. By default None.
    status : str | None, optional
        New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
    withdrawal_date : date | None, optional
        New withdrawal date. By default None.
    modified_by_name : str | None, optional
        Name of the user performing the update. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to update.
        Required column: id. Optional: status, withdrawal_date, modified_by_name.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If neither data_df nor withdrawal_id is given, if data_df is missing the 'id' column,
        if any referenced withdrawal ID does not exist, if an invalid status value is given,
        or if a modified_by_name is not found in the users table.
    """
    valid_statuses = {"ABERTA", "PARCIALMENTE_APONTADA", "APONTADA", "CANCELADA"}
    df_schema = {
        "id": pl.Int64,
        "status": pl.Utf8,
        "withdrawal_date": pl.Date,
        "modified_by_name": pl.Utf8,
    }
    if data_df is None:
        # single update path: an explicit id is mandatory here, otherwise the
        # existence check below would fail with a confusing "IDs {None} do not
        # exist" error instead of pointing at the missing argument
        if withdrawal_id is None:
            raise ValueError("withdrawal_id must be provided when data_df is not used.")
        data_df = pl.DataFrame(
            {
                "id": [withdrawal_id],
                "status": [status],
                "withdrawal_date": [withdrawal_date],
                "modified_by_name": [modified_by_name],
            },
            schema=df_schema,
        )
        single_update = True
    else:
        single_update = False
        # converting to expected schema (only the columns actually present)
        data_df = data_df.cast({col: dtype for col, dtype in df_schema.items() if col in data_df.columns})
    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")
    # validate statuses early so the database never sees an unknown value
    if "status" in data_df.columns:
        if wrong_statuses := set(data_df["status"].drop_nulls().unique().to_list()) - valid_statuses:
            raise ValueError(f"Invalid status value(s) {wrong_statuses}. Valid values are {sorted(valid_statuses)}.")
    # check if IDs exist
    existing_query = sql.SQL("SELECT id FROM performance.inv_withdrawals WHERE id = ANY({ids})").format(
        ids=sql.Literal(data_df["id"].to_list()),
    )
    existing_df = self._perfdb.conn.read_to_polars(existing_query, schema_overrides={"id": pl.Int64})
    if wrong_ids := set(data_df["id"].to_list()) - set(existing_df["id"].to_list()):
        raise ValueError(f"Withdrawal IDs {wrong_ids} do not exist in the database.")
    # resolve modified_by_name to modified_by_id
    if "modified_by_name" in data_df.columns and len(data_df.filter(pl.col("modified_by_name").is_not_null())) > 0:
        user_names = data_df["modified_by_name"].drop_nulls().unique().to_list()
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("modified_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("modified_by_id"),
        )
    else:
        # NOTE(review): for batch updates (ignore_null_cols=False) this all-null
        # modified_by_id column is written as-is, which presumably nulls out the
        # column for the updated rows — confirm this is the intended behavior
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("modified_by_id"))
    if "modified_by_name" in data_df.columns:
        data_df = data_df.drop(["modified_by_name"])
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        # for single updates, null parameters mean "leave the column untouched"
        ignore_null_cols=single_update,
    )
    logger.debug(f"Updated {len(data_df)} withdrawal(s).")