Inventory Transaction Headers¶
InventoryTransactionHeaders(perfdb)
¶
Class used for handling Inventory Transaction Headers. Can be accessed via perfdb.inventory.transactions.headers.
Parameters:
Source code in echo_postgres/inventory_transaction_headers.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling Inventory Transaction Headers. Can be accessed via `perfdb.inventory.transactions.headers`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # * subclasses
    # NOTE(review): import kept local, presumably to avoid a circular import at module load time — confirm
    from .inventory_transaction_header_documents import InventoryTransactionHeaderDocuments

    self.documents = InventoryTransactionHeaderDocuments(perfdb)
delete(header_ids)
¶
Deletes inventory transaction headers from the database.
Parameters:
- header_ids (list[int]) – List of transaction header IDs to be deleted.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def delete(self, header_ids: list[int]) -> int:
    """Deletes inventory transaction headers from the database.

    Parameters
    ----------
    header_ids : list[int]
        List of transaction header IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted.
    """
    # an empty list can never delete anything; skip the DB round trip entirely
    if not header_ids:
        logger.debug("Deleted 0 transaction header(s).")
        return 0

    # IDs are passed as a single array literal, matched with ANY()
    query = sql.SQL("DELETE FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(header_ids),
    )

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
        # rowcount reflects how many rows the DELETE actually removed
        deleted = conn.rowcount
        logger.debug(f"Deleted {deleted} transaction header(s).")

    return deleted
get(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory transaction headers and their attributes.
The most useful keys/columns returned are:
- id
- transaction_date
- storage_location_name
- center_name
- transaction_type_name
- direction
- withdrawal_id
- service_order_name
- reference_number
- is_sla_violated
- notes
- creator_name
- display_label
Parameters:
- ids (list[int] | None, default: None) – List of transaction header IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- transaction_type_names (list[str] | None, default: None) – List of transaction type names to filter. By default None.
- directions (list[str] | None, default: None) – List of directions to filter. By default None.
- service_order_names (list[str] | None, default: None) – List of service order names to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range to filter by transaction_date. By default None.
- notes_search (str | None, default: None) – Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type of the data. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory transaction headers and their attributes.

    The most useful keys/columns returned are:

    - id
    - transaction_date
    - storage_location_name
    - center_name
    - transaction_type_name
    - direction
    - withdrawal_id
    - service_order_name
    - reference_number
    - is_sla_violated
    - notes
    - creator_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    transaction_type_names : list[str] | None, optional
        List of transaction type names to filter. By default None.
    directions : list[str] | None, optional
        List of directions to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # validate the filters and build the WHERE clause from them
    where = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        notes_search=notes_search,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT * FROM performance.v_inv_transaction_headers {where} ORDER BY id").format(where=where)

    with self._perfdb.conn.reconnect() as conn:
        result = conn.read_to_polars(query)

    # the tsvector helper column is an implementation detail of the search; never expose it
    result = result.drop("notes_searchable")

    if output_type == "pl.DataFrame":
        return result

    # both remaining output types go through the same pandas conversion
    pandas_df = result.to_pandas(use_pyarrow_extension_array=True).set_index("id")
    return pandas_df if output_type == "DataFrame" else pandas_df.to_dict(orient="index")
get_ids(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, filter_type='and')
¶
Gets all inventory transaction header IDs matching the provided filters.
Parameters:
- ids (list[int] | None, default: None) – List of transaction header IDs to filter. By default None.
- center_names (list[str] | None, default: None) – List of center names to filter. By default None.
- storage_location_names (list[str] | None, default: None) – List of storage location names to filter. By default None.
- transaction_type_names (list[str] | None, default: None) – List of transaction type names to filter. By default None.
- directions (list[str] | None, default: None) – List of directions to filter. By default None.
- service_order_names (list[str] | None, default: None) – List of service order names to filter. By default None.
- period (DateTimeRange | None, default: None) – Date range to filter by transaction_date. By default None.
- notes_search (str | None, default: None) – Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
Returns:
- list[int] – List of transaction header IDs matching the filters.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets all inventory transaction header IDs matching the provided filters.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    transaction_type_names : list[str] | None, optional
        List of transaction type names to filter. By default None.
    directions : list[str] | None, optional
        List of directions to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    list[int]
        List of transaction header IDs matching the filters.
    """
    # validate the filters and build the WHERE clause from them
    where = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        notes_search=notes_search,
        filter_type=filter_type,
    )

    # only the id column is needed here, unlike get() which selects everything
    query = sql.SQL("SELECT id FROM performance.v_inv_transaction_headers {where} ORDER BY id").format(where=where)

    with self._perfdb.conn.reconnect() as conn:
        ids_df = conn.read_to_polars(query)

    return ids_df.get_column("id").to_list()
insert(storage_location_name=None, center_name=None, transaction_date=None, transaction_type_name=None, reference_number=None, withdrawal_id=None, notes=None, created_by_name=None, service_order_name=None, data_df=None)
¶
Inserts a new inventory transaction header into the database.
You can either pass individual values to insert a single header, or pass a DataFrame to insert multiple headers at once.
Parameters:
- storage_location_name (str | None, default: None) – Name of the storage location. By default None.
- center_name (str | None, default: None) – Name of the center (used together with storage_location_name to resolve the storage location ID). By default None.
- transaction_date (date | None, default: None) – Date of the transaction. By default None.
- transaction_type_name (str | None, default: None) – Name of the transaction type. Must exist in inv_transaction_types table. By default None.
- reference_number (str | None, default: None) – Optional reference number for the transaction. By default None.
- withdrawal_id (int | None, default: None) – Optional ID of an associated withdrawal. By default None.
- notes (str | None, default: None) – Optional notes. By default None.
- created_by_name (str | None, default: None) – Name of the user creating the transaction. Must exist in users table. By default None.
- service_order_name (str | None, default: None) – Name of the associated service order. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple headers to be inserted. Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name. Optional: reference_number, withdrawal_id, notes, service_order_name. If this is used, all individual parameters will be ignored. By default None.
Returns:
- int | list[int] | None – If inserting a single header, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def insert(
    self,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    transaction_date: date | None = None,
    transaction_type_name: str | None = None,
    reference_number: str | None = None,
    withdrawal_id: int | None = None,
    notes: str | None = None,
    created_by_name: str | None = None,
    service_order_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory transaction header into the database.

    You can either pass individual values to insert a single header, or pass a DataFrame
    to insert multiple headers at once.

    Parameters
    ----------
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve the storage location ID). By default None.
    transaction_date : date | None, optional
        Date of the transaction. By default None.
    transaction_type_name : str | None, optional
        Name of the transaction type. Must exist in inv_transaction_types table. By default None.
    reference_number : str | None, optional
        Optional reference number for the transaction. By default None.
    withdrawal_id : int | None, optional
        Optional ID of an associated withdrawal. By default None.
    notes : str | None, optional
        Optional notes. By default None.
    created_by_name : str | None, optional
        Name of the user creating the transaction. Must exist in users table. By default None.
    service_order_name : str | None, optional
        Name of the associated service order. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to be inserted.
        Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name.
        Optional: reference_number, withdrawal_id, notes, service_order_name.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single header, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.

    Raises
    ------
    ValueError
        If data_df is missing a required column or has nulls in one, or if any
        storage location, transaction type, user, or service order name does not
        exist in the database.
    """
    # canonical dtypes; used to build the one-row frame on the single-insert path
    # (a multi-row data_df is NOT cast to this schema — caller-supplied dtypes are used as-is)
    df_schema = {
        "storage_location_name": pl.Utf8,
        "center_name": pl.Utf8,
        "transaction_date": pl.Date,
        "transaction_type_name": pl.Utf8,
        "reference_number": pl.Utf8,
        "withdrawal_id": pl.Int64,
        "notes": pl.Utf8,
        "created_by_name": pl.Utf8,
        "service_order_name": pl.Utf8,
    }

    if data_df is None:
        # single insert: wrap the individual arguments into a one-row DataFrame so
        # the rest of the method can treat both paths uniformly
        single_insert = True
        data_df = pl.DataFrame(
            {
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "transaction_date": [transaction_date],
                "transaction_type_name": [transaction_type_name],
                "reference_number": [reference_number],
                "withdrawal_id": [withdrawal_id],
                "notes": [notes],
                "created_by_name": [created_by_name],
                "service_order_name": [service_order_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False
        # checking required columns
        required_cols = ["storage_location_name", "center_name", "transaction_date", "transaction_type_name", "created_by_name"]
        for col in required_cols:
            # check if column exists and has no nulls
            if col not in data_df.columns:
                raise ValueError(f"data_df is missing required column '{col}'.")
            if data_df[col].is_null().any():
                raise ValueError(f"data_df column '{col}' contains null values, but it is required.")

    # resolve storage_location_id from storage_location_name + center_name
    sl_ids_nested = self._perfdb.inventory.storage_locations.get_ids()
    # flatten to {(center, loc): id} for lookup
    sl_ids_flat = {(c, loc): loc_id for c, locs in sl_ids_nested.items() for loc, loc_id in locs.items()}
    # NOTE(review): strict=False here — both columns come from the same frame so lengths
    # always match; strict=True would be equivalent and more self-checking
    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    if wrong_sls := set(pairs) - set(sl_ids_flat.keys()):
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")
    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [sl_ids_flat[(c, loc)] for c, loc in pairs], dtype=pl.Int64),
    )

    # resolve transaction_type_name to transaction_type_id
    type_ids = self._perfdb.inventory.transactions.types.get_ids()
    if wrong_types := set(data_df["transaction_type_name"].to_list()) - set(type_ids.keys()):
        raise ValueError(f"Transaction type names not found in the database: {wrong_types}")
    data_df = data_df.with_columns(
        pl.col("transaction_type_name").replace_strict(type_ids, return_dtype=pl.Int64).alias("transaction_type_id"),
    )

    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        # no creator names at all (only possible on the single path with created_by_name=None)
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))

    # resolve service_order_name to service_order_id
    if "service_order_name" in data_df.columns:
        if len(data_df.filter(pl.col("service_order_name").is_not_null())) > 0:
            so_names = data_df["service_order_name"].drop_nulls().unique().to_list()
            so_ids = self._perfdb.service_orders.get_ids(names=so_names)
            if wrong_sos := set(so_names) - set(so_ids.keys()):
                raise ValueError(f"Service order names not found in the database: {wrong_sos}")
            # default=None keeps rows whose service_order_name is null as null IDs
            data_df = data_df.with_columns(
                pl.col("service_order_name").replace_strict(so_ids, return_dtype=pl.Int64, default=None).alias("service_order_id"),
            )
        else:
            data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("service_order_id"))
    # if the column is absent entirely, no service_order_id column is added;
    # the strict=False drop below and the DB column default cover that case

    # drop human-readable columns; strict=False tolerates optional columns missing from data_df
    data_df = data_df.drop(
        ["storage_location_name", "center_name", "transaction_type_name", "created_by_name", "service_order_name"],
        strict=False,
    )

    # insert and get the generated IDs back; ignore_null_cols only on the single
    # path so unset optional arguments fall back to DB defaults
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )
    ids = ids_df["id"].to_list()

    logger.debug(f"Inserted/updated {len(ids)} transaction header(s) with IDs: {ids}")

    return ids if not single_insert else ids[0] if ids else None
update(header_id=None, transaction_date=None, reference_number=None, is_sla_violated=None, notes=None, data_df=None)
¶
Updates an existing inventory transaction header in the database.
Parameters:
- header_id (int | None, default: None) – ID of the header to be updated. Required for single updates. By default None.
- transaction_date (date | None, default: None) – New transaction date. By default None.
- reference_number (str | None, default: None) – New reference number. By default None.
- is_sla_violated (bool | None, default: None) – New SLA violation status. By default None.
- notes (str | None, default: None) – New notes. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple headers to update. Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes. If this is used, all individual parameters will be ignored. By default None.
Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def update(
    self,
    header_id: int | None = None,
    transaction_date: date | None = None,
    reference_number: str | None = None,
    is_sla_violated: bool | None = None,
    notes: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory transaction header in the database.

    Parameters
    ----------
    header_id : int | None, optional
        ID of the header to be updated. Required for single updates. By default None.
    transaction_date : date | None, optional
        New transaction date. By default None.
    reference_number : str | None, optional
        New reference number. By default None.
    is_sla_violated : bool | None, optional
        New SLA violation status. By default None.
    notes : str | None, optional
        New notes. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to update.
        Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If header_id is missing on a single update, if data_df lacks the id column,
        or if any referenced ID does not exist in the database.
    """
    # canonical dtypes for the one-row frame built on the single-update path
    df_schema = {
        "id": pl.Int64,
        "transaction_date": pl.Date,
        "reference_number": pl.Utf8,
        "is_sla_violated": pl.Boolean,
        "notes": pl.Utf8,
    }

    if data_df is None:
        # fail fast with a clear message; otherwise the existence check below would
        # raise the confusing "Transaction header IDs {None} do not exist"
        if header_id is None:
            raise ValueError("header_id is required when data_df is not provided.")
        data_df = pl.DataFrame(
            {
                "id": [header_id],
                "transaction_date": [transaction_date],
                "reference_number": [reference_number],
                "is_sla_violated": [is_sla_violated],
                "notes": [notes],
            },
            schema=df_schema,
        )
        single_update = True
    else:
        single_update = False
        if "id" not in data_df.columns:
            raise ValueError("data_df is missing required column 'id'.")

    # check if IDs exist
    existing_query = sql.SQL("SELECT id FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(data_df["id"].to_list()),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(data_df["id"].to_list()) - set(existing_df["id"].to_list()):
        raise ValueError(f"Transaction header IDs {wrong_ids} do not exist in the database.")

    # update_only + conflict on id performs an UPDATE for each row; ignore_null_cols
    # only on the single path so unset arguments leave existing values untouched
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} transaction header(s).")