Inventory Transaction Headers
InventoryTransactionHeaders(perfdb)
Class used for handling Inventory Transaction Headers. Can be accessed via perfdb.inventory.transactions.headers.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/inventory_transaction_headers.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler for Inventory Transaction Headers, reachable through `perfdb.inventory.transactions.headers`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # NOTE(review): imported inside the method rather than at module level,
    # presumably to avoid a circular import - confirm before moving it.
    from .inventory_transaction_header_documents import InventoryTransactionHeaderDocuments as _HeaderDocuments

    # * subclasses
    # child handler for the documents attached to transaction headers
    self.documents = _HeaderDocuments(perfdb)
delete(header_ids)
¶
Deletes inventory transaction headers from the database.
Parameters:
- header_ids (list[int]) – List of transaction header IDs to be deleted.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def delete(self, header_ids: list[int]) -> int:
    """Removes the given inventory transaction headers from `performance.inv_transaction_headers`.

    Parameters
    ----------
    header_ids : list[int]
        IDs of the transaction headers that should be deleted.

    Returns
    -------
    int
        Row count reported by the connection after the DELETE statement ran.
    """
    perfdb = self._perfdb

    # the ID list is embedded as a SQL literal and matched with ANY(...)
    delete_stmt = sql.SQL("DELETE FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(header_ids),
    )
    perfdb.conn.execute(delete_stmt)

    # rowcount is read from the connection handler right after the statement
    deleted = perfdb.conn.rowcount
    logger.debug(f"Deleted {deleted} transaction header(s).")

    return deleted
get(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, has_documents=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory transaction headers and their attributes.
The most useful keys/columns returned are:
- id
- transaction_date
- storage_location_name
- center_name
- transaction_type_name
- direction
- withdrawal_id
- service_order_name
- reference_number
- is_sla_violated
- notes
- creator_name
- display_label
Parameters:
- ids (list[int] | None, default: None) – List of transaction header IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- transaction_type_names (list[str] | None, default: None) – List of transaction type names to filter. By default None.
- directions (list[str] | None, default: None) – List of directions to filter. By default None.
- service_order_names (list[str] | None, default: None) – List of service order names to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range to filter by transaction_date. By default None.
- notes_search (str | None, default: None) – Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
- has_documents (bool | None, default: None) – If True, only return headers that have at least one attached document. If False, only return headers with no documents. If None, no filter is applied. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type of the data. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    has_documents: bool | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Reads inventory transaction headers, with all their attributes, from `performance.v_inv_transaction_headers`.

    The most useful keys/columns returned are:

    - id
    - transaction_date
    - storage_location_name
    - center_name
    - transaction_type_name
    - direction
    - withdrawal_id
    - service_order_name
    - reference_number
    - is_sla_violated
    - notes
    - creator_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        Transaction header IDs to keep. By default None.
    center_names : list[str] | None, optional
        Center names to keep. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to keep. By default None.
    transaction_type_names : list[str] | None, optional
        Transaction type names to keep. By default None.
    directions : list[str] | None, optional
        Directions to keep. By default None.
    service_order_names : list[str] | None, optional
        Service order names to keep. By default None.
    period : DateTimeRange | None, optional
        Date range applied to transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column
        (GIN index, Portuguese language). By default None.
    has_documents : bool | None, optional
        True keeps only headers with at least one attached document, False keeps only headers
        without documents, None applies no filter. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Desired output container. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        When output_type is "dict": a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame with index = id.
    pl.DataFrame
        When output_type is "pl.DataFrame": a Polars DataFrame.
    """
    # all filter arguments are forwarded untouched to the shared WHERE builder
    filters = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "transaction_type_names": transaction_type_names,
        "directions": directions,
        "service_order_names": service_order_names,
        "period": period,
        "notes_search": notes_search,
        "has_documents": has_documents,
        "filter_type": filter_type,
    }
    where_clause = self._check_get_args(**filters)

    select_stmt = sql.SQL("SELECT * FROM performance.v_inv_transaction_headers {where} ORDER BY id").format(
        where=where_clause,
    )
    headers = self._perfdb.conn.read_to_polars(select_stmt, schema_overrides=self._cols_schema)

    # notes_searchable (the column notes_search is matched against) is removed before returning
    return convert_output(headers.drop("notes_searchable"), output_type, index_col="id")
get_ids(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, has_documents=None, filter_type='and')
¶
Gets all inventory transaction header IDs matching the provided filters.
Parameters:
- ids (list[int] | None, default: None) – List of transaction header IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- transaction_type_names (list[str] | None, default: None) – List of transaction type names to filter. By default None.
- directions (list[str] | None, default: None) – List of directions to filter. By default None.
- service_order_names (list[str] | None, default: None) – List of service order names to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range to filter by transaction_date. By default None.
- notes_search (str | None, default: None) – Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
- has_documents (bool | None, default: None) – If True, only return headers that have at least one attached document. If False, only return headers with no documents. If None, no filter is applied. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
Returns:
- list[int] – List of transaction header IDs matching the filters.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    has_documents: bool | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Returns the IDs of the inventory transaction headers that match the provided filters.

    Parameters
    ----------
    ids : list[int] | None, optional
        Transaction header IDs to keep. By default None.
    center_names : list[str] | None, optional
        Center names to keep. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to keep. By default None.
    transaction_type_names : list[str] | None, optional
        Transaction type names to keep. By default None.
    directions : list[str] | None, optional
        Directions to keep. By default None.
    service_order_names : list[str] | None, optional
        Service order names to keep. By default None.
    period : DateTimeRange | None, optional
        Date range applied to transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column
        (GIN index, Portuguese language). By default None.
    has_documents : bool | None, optional
        True keeps only headers with at least one attached document, False keeps only headers
        without documents, None applies no filter. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".

    Returns
    -------
    list[int]
        Matching transaction header IDs, in the order produced by ORDER BY id.
    """
    # all filter arguments are forwarded untouched to the shared WHERE builder
    filters = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "transaction_type_names": transaction_type_names,
        "directions": directions,
        "service_order_names": service_order_names,
        "period": period,
        "notes_search": notes_search,
        "has_documents": has_documents,
        "filter_type": filter_type,
    }
    where_clause = self._check_get_args(**filters)

    # only the id column is selected from the headers view
    select_stmt = sql.SQL("SELECT id FROM performance.v_inv_transaction_headers {where} ORDER BY id").format(
        where=where_clause,
    )
    id_frame = self._perfdb.conn.read_to_polars(select_stmt, schema_overrides=self._cols_schema)

    return id_frame["id"].to_list()
get_stats(center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, creator_names=None, period=None, filter_type='and', top_n=20)
¶
Returns a collection of aggregated statistics about transaction headers for use in dashboards.
All filters are applied consistently across every stat category so only the relevant subset of data is analyzed, keeping queries light.
Parameters:
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- transaction_type_names (list[str] | None, default: None) – List of transaction type names to filter. By default None.
- directions (list[str] | None, default: None) – List of directions to filter. Valid values: ENTRADA, SAIDA, NEUTRO. By default None.
- service_order_names (list[str] | None, default: None) – List of service order names to filter. By default None.
- creator_names (list[str] | None, default: None) – List of creator (user) names to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range to filter by transaction_date. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- top_n (int, default: 20) – Maximum number of rows returned for ranked/top lists (e.g. top_materials, sla_by_creator). By default 20.
Returns:
- dict[str, pl.DataFrame] – Dictionary with the following keys, each containing a Polars DataFrame:
  - "by_type" — header count and SLA violations per transaction type and direction. Columns: transaction_type_name, direction, header_count, sla_violation_count.
  - "by_center" — header count and SLA violation rate per center. Columns: center_name, header_count, sla_violation_count, sla_violation_rate.
  - "by_storage_location" — header count and SLA violation rate per storage location. Columns: center_name, storage_location_name, header_count, sla_violation_count, sla_violation_rate.
  - "by_creator" — header count and SLA violation rate per creator. Columns: creator_name, header_count, sla_violation_count, sla_violation_rate.
  - "by_direction" — aggregated item quantities per movement direction. Columns: direction, header_count, item_count, total_quantity.
  - "by_date" — daily header count and SLA violations. Columns: transaction_date, header_count, sla_violation_count.
  - "sla_summary" — single-row global SLA summary. Columns: total_headers, sla_ok_count, sla_violation_count, sla_violation_rate.
  - "sla_by_creator" — top_n creators ranked by SLA violation count (descending). Columns: creator_name, sla_violation_count, total_headers, sla_violation_rate.
  - "top_materials" — top_n most transacted materials by total quantity (descending). Columns: material_description, material_sap_id, base_unit, total_quantity, header_count.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get_stats(
    self,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    creator_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    top_n: int = 20,
) -> dict[str, pl.DataFrame]:
    """Computes aggregated statistics about transaction headers, intended for dashboards.

    The same WHERE clause is used by every statistic, so each category is computed
    over the same filtered subset of data.

    Parameters
    ----------
    center_names : list[str] | None, optional
        Center names to keep. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to keep. By default None.
    transaction_type_names : list[str] | None, optional
        Transaction type names to keep. By default None.
    directions : list[str] | None, optional
        Directions to keep. Valid values: ENTRADA, SAIDA, NEUTRO. By default None.
    service_order_names : list[str] | None, optional
        Service order names to keep. By default None.
    creator_names : list[str] | None, optional
        Creator (user) names to keep. By default None.
    period : DateTimeRange | None, optional
        Date range applied to transaction_date. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".
    top_n : int, optional
        Maximum number of rows returned for ranked/top lists (e.g. top_materials, sla_by_creator).
        By default 20.

    Returns
    -------
    dict[str, pl.DataFrame]
        Dictionary with the following keys, each containing a Polars DataFrame:

        - ``"by_type"`` — header count and SLA violations per transaction type and direction.
          Columns: transaction_type_name, direction, header_count, sla_violation_count.
        - ``"by_center"`` — header count and SLA violation rate per center.
          Columns: center_name, header_count, sla_violation_count, sla_violation_rate.
        - ``"by_storage_location"`` — header count and SLA violation rate per storage location.
          Columns: center_name, storage_location_name, header_count, sla_violation_count, sla_violation_rate.
        - ``"by_creator"`` — header count and SLA violation rate per creator.
          Columns: creator_name, header_count, sla_violation_count, sla_violation_rate.
        - ``"by_direction"`` — aggregated item quantities per movement direction.
          Columns: direction, header_count, item_count, total_quantity.
        - ``"by_date"`` — daily header count and SLA violations.
          Columns: transaction_date, header_count, sla_violation_count.
        - ``"sla_summary"`` — single-row global SLA summary.
          Columns: total_headers, sla_ok_count, sla_violation_count, sla_violation_rate.
        - ``"sla_by_creator"`` — top_n creators ranked by SLA violation count (descending).
          Columns: creator_name, sla_violation_count, total_headers, sla_violation_rate.
        - ``"top_materials"`` — top_n most transacted materials by total quantity (descending).
          Columns: material_description, material_sap_id, base_unit, total_quantity, header_count.
    """
    # Shared WHERE clause: _check_get_args is reused with ids, notes_search and has_documents pinned to None
    where_base = self._check_get_args(
        ids=None,
        center_names=center_names,
        storage_location_names=storage_location_names,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        notes_search=None,
        has_documents=None,
        filter_type=filter_type,
    )

    # creator_names is not passed to _check_get_args, so its condition is appended here
    where = where_base
    if creator_names:
        creator_cond = sql.SQL("creator_name = ANY({creator_names})").format(creator_names=sql.Literal(creator_names))
        if where_base == sql.SQL(""):
            # no other filter produced a clause, so this condition opens the WHERE
            where = sql.SQL("WHERE ") + creator_cond
        else:
            # joined with the same operator requested for the other filters
            where = where_base + sql.SQL(f" {filter_type.upper()} ") + creator_cond

    def _run(template: str, **params) -> pl.DataFrame:
        """Formats one statistic query with the shared WHERE clause and reads it into Polars."""
        return self._perfdb.conn.read_to_polars(sql.SQL(template).format(where=where, **params))

    result: dict[str, pl.DataFrame] = {}

    # -- by_type --
    result["by_type"] = _run(
        """
        SELECT
            transaction_type_name,
            direction,
            COUNT(*) AS header_count,
            SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count
        FROM performance.v_inv_transaction_headers
        {where}
        GROUP BY transaction_type_name, direction
        ORDER BY header_count DESC
        """,
    )

    # -- by_center --
    result["by_center"] = _run(
        """
        SELECT
            center_name,
            COUNT(*) AS header_count,
            SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
            ROUND(
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                2
            ) AS sla_violation_rate
        FROM performance.v_inv_transaction_headers
        {where}
        GROUP BY center_name
        ORDER BY header_count DESC
        """,
    )

    # -- by_storage_location --
    result["by_storage_location"] = _run(
        """
        SELECT
            center_name,
            storage_location_name,
            COUNT(*) AS header_count,
            SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
            ROUND(
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                2
            ) AS sla_violation_rate
        FROM performance.v_inv_transaction_headers
        {where}
        GROUP BY center_name, storage_location_name
        ORDER BY header_count DESC
        """,
    )

    # -- by_creator --
    result["by_creator"] = _run(
        """
        SELECT
            creator_name,
            COUNT(*) AS header_count,
            SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
            ROUND(
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                2
            ) AS sla_violation_rate
        FROM performance.v_inv_transaction_headers
        {where}
        GROUP BY creator_name
        ORDER BY header_count DESC
        """,
    )

    # -- by_direction (uses v_inv_transaction_items which carries all header columns) --
    result["by_direction"] = _run(
        """
        SELECT
            direction,
            COUNT(DISTINCT transaction_header_id) AS header_count,
            COUNT(*) AS item_count,
            SUM(absolute_quantity) AS total_quantity
        FROM performance.v_inv_transaction_items
        {where}
        GROUP BY direction
        ORDER BY total_quantity DESC
        """,
    )

    # -- by_date --
    result["by_date"] = _run(
        """
        SELECT
            transaction_date,
            COUNT(*) AS header_count,
            SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count
        FROM performance.v_inv_transaction_headers
        {where}
        GROUP BY transaction_date
        ORDER BY transaction_date
        """,
    )

    # -- sla_summary --
    result["sla_summary"] = _run(
        """
        SELECT
            COUNT(*) AS total_headers,
            SUM(CASE WHEN NOT is_sla_violated THEN 1 ELSE 0 END) AS sla_ok_count,
            SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
            ROUND(
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                2
            ) AS sla_violation_rate
        FROM performance.v_inv_transaction_headers
        {where}
        """,
    )

    # -- sla_by_creator (top_n) --
    result["sla_by_creator"] = _run(
        """
        SELECT
            creator_name,
            SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
            COUNT(*) AS total_headers,
            ROUND(
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                2
            ) AS sla_violation_rate
        FROM performance.v_inv_transaction_headers
        {where}
        GROUP BY creator_name
        ORDER BY sla_violation_count DESC
        LIMIT {top_n}
        """,
        top_n=sql.Literal(top_n),
    )

    # -- top_materials (top_n, uses v_inv_transaction_items) --
    result["top_materials"] = _run(
        """
        SELECT
            material_description,
            material_sap_id,
            base_unit,
            SUM(absolute_quantity) AS total_quantity,
            COUNT(DISTINCT transaction_header_id) AS header_count
        FROM performance.v_inv_transaction_items
        {where}
        GROUP BY material_description, material_sap_id, base_unit
        ORDER BY total_quantity DESC
        LIMIT {top_n}
        """,
        top_n=sql.Literal(top_n),
    )

    logger.debug(
        f"get_stats() returned {len(result)} stat categories for transaction headers.",
    )

    return result
insert(storage_location_name=None, center_name=None, transaction_date=None, transaction_type_name=None, reference_number=None, withdrawal_id=None, notes=None, created_by_name=None, service_order_name=None, data_df=None)
¶
Inserts a new inventory transaction header into the database.
You can either pass individual values to insert a single header, or pass a DataFrame to insert multiple headers at once.
Parameters:
- storage_location_name (str | None, default: None) – Name of the storage location. By default None.
- center_name (str | None, default: None) – Name of the center (used together with storage_location_name to resolve the storage location ID). By default None.
- transaction_date (date | None, default: None) – Date of the transaction. By default None.
- transaction_type_name (str | None, default: None) – Name of the transaction type. Must exist in inv_transaction_types table. By default None.
- reference_number (str | None, default: None) – Optional reference number for the transaction. By default None.
- withdrawal_id (int | None, default: None) – Optional ID of an associated withdrawal. By default None.
- notes (str | None, default: None) – Optional notes. By default None.
- created_by_name (str | None, default: None) – Name of the user creating the transaction. Must exist in users table. By default None.
- service_order_name (str | None, default: None) – Name of the associated service order. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple headers to be inserted. Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name. Optional: reference_number, withdrawal_id, notes, service_order_name. If this is used, all individual parameters will be ignored. By default None.
Returns:
- int | list[int] | None – If inserting a single header, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def insert(
    self,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    transaction_date: date | None = None,
    transaction_type_name: str | None = None,
    reference_number: str | None = None,
    withdrawal_id: int | None = None,
    notes: str | None = None,
    created_by_name: str | None = None,
    service_order_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Creates one or several inventory transaction headers in `performance.inv_transaction_headers`.

    A single header is described through the individual arguments; several headers
    are described through `data_df`, in which case the individual arguments are ignored.

    Parameters
    ----------
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve the storage location ID).
        By default None.
    transaction_date : date | None, optional
        Date of the transaction. By default None.
    transaction_type_name : str | None, optional
        Name of the transaction type. Must exist in inv_transaction_types table. By default None.
    reference_number : str | None, optional
        Optional reference number for the transaction. By default None.
    withdrawal_id : int | None, optional
        Optional ID of an associated withdrawal. By default None.
    notes : str | None, optional
        Optional notes. By default None.
    created_by_name : str | None, optional
        Name of the user creating the transaction. Must exist in users table. By default None.
    service_order_name : str | None, optional
        Name of the associated service order. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to be inserted.
        Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name.
        Optional: reference_number, withdrawal_id, notes, service_order_name.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single header, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.

    Raises
    ------
    ValueError
        If a required column is missing or contains nulls, or if a storage location,
        transaction type, user or service order name cannot be resolved to an ID.
    """
    df_schema = {
        "storage_location_name": pl.Utf8,
        "center_name": pl.Utf8,
        "transaction_date": pl.Date,
        "transaction_type_name": pl.Utf8,
        "reference_number": pl.Utf8,
        "withdrawal_id": pl.Int64,
        "notes": pl.Utf8,
        "created_by_name": pl.Utf8,
        "service_order_name": pl.Utf8,
    }

    single_insert = data_df is None
    if single_insert:
        # wrap the scalar arguments in a one-row frame so both modes share the code below
        data_df = pl.DataFrame(
            {
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "transaction_date": [transaction_date],
                "transaction_type_name": [transaction_type_name],
                "reference_number": [reference_number],
                "withdrawal_id": [withdrawal_id],
                "notes": [notes],
                "created_by_name": [created_by_name],
                "service_order_name": [service_order_name],
            },
            schema=df_schema,
        )
    else:
        # converting to expected schema (only the known columns that were provided)
        present_dtypes = {col: dtype for col, dtype in df_schema.items() if col in data_df.columns}
        data_df = data_df.cast(present_dtypes)

    # checking required columns: each must exist and have no nulls
    for col in ("storage_location_name", "center_name", "transaction_date", "transaction_type_name", "created_by_name"):
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"data_df column '{col}' contains null values, but it is required.")

    # resolve storage_location_id from storage_location_name + center_name
    sl_ids_nested = self._perfdb.inventory.storage_locations.get_ids()
    # flatten the nested mapping to {(center, loc): id} for lookup
    sl_ids_flat = {}
    for center, locations in sl_ids_nested.items():
        for location, location_id in locations.items():
            sl_ids_flat[(center, location)] = location_id
    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    wrong_sls = set(pairs).difference(sl_ids_flat.keys())
    if wrong_sls:
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")
    storage_location_ids = [sl_ids_flat[pair] for pair in pairs]
    data_df = data_df.with_columns(
        pl.Series("storage_location_id", storage_location_ids, dtype=pl.Int64),
    )

    # resolve transaction_type_name to transaction_type_id
    type_ids = self._perfdb.inventory.transactions.types.get_ids()
    wrong_types = set(data_df["transaction_type_name"].to_list()).difference(type_ids.keys())
    if wrong_types:
        raise ValueError(f"Transaction type names not found in the database: {wrong_types}")
    transaction_type_id = pl.col("transaction_type_name").replace_strict(type_ids, return_dtype=pl.Int64)
    data_df = data_df.with_columns(transaction_type_id.alias("transaction_type_id"))

    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if not user_names:
        # nothing to resolve: the ID column is created filled with nulls
        created_by_id = pl.lit(None).cast(pl.Int64)
    else:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        wrong_users = set(user_names).difference(user_ids.keys())
        if wrong_users:
            raise ValueError(f"User names not found in the database: {wrong_users}")
        created_by_id = pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64)
    data_df = data_df.with_columns(created_by_id.alias("created_by_id"))

    # resolve service_order_name to service_order_id (only when the column was provided)
    if "service_order_name" in data_df.columns:
        so_names = data_df["service_order_name"].drop_nulls().unique().to_list()
        if so_names:
            so_ids = self._perfdb.service_orders.get_ids(names=so_names)
            wrong_sos = set(so_names).difference(so_ids.keys())
            if wrong_sos:
                raise ValueError(f"Service order names not found in the database: {wrong_sos}")
            # default=None keeps rows without a service order as null instead of failing the strict replace
            service_order_id = pl.col("service_order_name").replace_strict(so_ids, return_dtype=pl.Int64, default=None)
        else:
            service_order_id = pl.lit(None).cast(pl.Int64)
        data_df = data_df.with_columns(service_order_id.alias("service_order_id"))

    # drop human-readable columns, now replaced by their ID counterparts
    data_df = data_df.drop(
        ["storage_location_name", "center_name", "transaction_type_name", "created_by_name", "service_order_name"],
        strict=False,
    )

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )
    ids = ids_df["id"].to_list()

    logger.debug(f"Inserted/updated {len(ids)} transaction header(s) with IDs: {ids}")

    if not single_insert:
        return ids
    # single insert: unwrap the only ID, or None when nothing came back
    return ids[0] if ids else None
update(header_id=None, transaction_date=None, reference_number=None, is_sla_violated=None, notes=None, data_df=None)
¶
Updates an existing inventory transaction header in the database.
Parameters:
- header_id (int | None, default: None) – ID of the header to be updated. Required for single updates. By default None.
- transaction_date (date | None, default: None) – New transaction date. By default None.
- reference_number (str | None, default: None) – New reference number. By default None.
- is_sla_violated (bool | None, default: None) – New SLA violation status. By default None.
- notes (str | None, default: None) – New notes. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple headers to update. Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def update(
    self,
    header_id: int | None = None,
    transaction_date: date | None = None,
    reference_number: str | None = None,
    is_sla_violated: bool | None = None,
    notes: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Modifies one or several existing inventory transaction headers.

    A single header is described through the individual arguments; several headers
    are described through `data_df`, in which case the individual arguments are ignored.

    Parameters
    ----------
    header_id : int | None, optional
        ID of the header to be updated. Required for single updates. By default None.
    transaction_date : date | None, optional
        New transaction date. By default None.
    reference_number : str | None, optional
        New reference number. By default None.
    is_sla_violated : bool | None, optional
        New SLA violation status. By default None.
    notes : str | None, optional
        New notes. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to update.
        Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If `data_df` has no `id` column, or if any requested ID is not present in
        `performance.inv_transaction_headers`.
    """
    df_schema = {
        "id": pl.Int64,
        "transaction_date": pl.Date,
        "reference_number": pl.Utf8,
        "is_sla_violated": pl.Boolean,
        "notes": pl.Utf8,
    }

    single_update = data_df is None
    if single_update:
        # wrap the scalar arguments in a one-row frame so both modes share the code below
        data_df = pl.DataFrame(
            {
                "id": [header_id],
                "transaction_date": [transaction_date],
                "reference_number": [reference_number],
                "is_sla_violated": [is_sla_violated],
                "notes": [notes],
            },
            schema=df_schema,
        )
    else:
        # converting to expected schema (only the known columns that were provided)
        present_dtypes = {col: dtype for col, dtype in df_schema.items() if col in data_df.columns}
        data_df = data_df.cast(present_dtypes)

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # check if IDs exist before attempting the update
    requested_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(requested_ids),
    )
    existing_df = self._perfdb.conn.read_to_polars(existing_query, schema_overrides=self._cols_schema)
    wrong_ids = set(requested_ids).difference(existing_df["id"].to_list())
    if wrong_ids:
        raise ValueError(f"Transaction header IDs {wrong_ids} do not exist in the database.")

    # NOTE(review): ignore_null_cols is enabled only for single updates, presumably so that
    # arguments left as None are not written - confirm against polars_to_sql.
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} transaction header(s).")