Inventory Transaction Header Documents¶
InventoryTransactionHeaderDocuments(perfdb)
¶
Class used for handling Inventory Transaction Header Documents. Can be accessed via perfdb.inventory.transactions.headers.documents.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
delete(transaction_header_id, document_id)
¶
Removes a document link from a transaction header.
Parameters:
-
(transaction_header_id¶int) –ID of the transaction header.
-
(document_id¶int) –ID of the document to unlink.
Returns:
-
int–Number of rows deleted.
Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def delete(self, transaction_header_id: int, document_id: int) -> int:
"""Removes a document link from a transaction header.
Parameters
----------
transaction_header_id : int
ID of the transaction header.
document_id : int
ID of the document to unlink.
Returns
-------
int
Number of rows deleted.
"""
query = sql.SQL(
"DELETE FROM performance.inv_transaction_header_documents WHERE transaction_header_id = {header_id} AND document_id = {doc_id}",
).format(
header_id=sql.Literal(transaction_header_id),
doc_id=sql.Literal(document_id),
)
with self._perfdb.conn.reconnect() as conn:
conn.execute(query)
return conn.rowcount
get(transaction_header_ids=None, document_ids=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory transaction header document links and their attributes.
Parameters:
-
(transaction_header_ids¶list[int] | None, default:None) –List of transaction header IDs to filter. By default None.
-
(document_ids¶list[int] | None, default:None) –List of document IDs to filter. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'pl.DataFrame') –Output type of the data. By default "pl.DataFrame".
Returns:
-
dict[int, dict[str, Any]]–In case output_type is "dict", returns a dictionary in the format {transaction_header_id: {attribute: value, ...}, ...}.
-
DataFrame–In case output_type is "DataFrame", returns a pandas DataFrame.
-
DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def get(
self,
transaction_header_ids: list[int] | None = None,
document_ids: list[int] | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
"""Gets all inventory transaction header document links and their attributes.
Parameters
----------
transaction_header_ids : list[int] | None, optional
List of transaction header IDs to filter. By default None.
document_ids : list[int] | None, optional
List of document IDs to filter. By default None.
filter_type : Literal["and", "or"], optional
How to treat multiple filters. By default "and".
output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
Output type of the data. By default "pl.DataFrame".
Returns
-------
dict[int, dict[str, Any]]
In case output_type is "dict", returns a dictionary in the format {transaction_header_id: {attribute: value, ...}, ...}.
pd.DataFrame
In case output_type is "DataFrame", returns a pandas DataFrame.
pl.DataFrame
In case output_type is "pl.DataFrame", returns a Polars DataFrame.
"""
where = self._check_get_args(
transaction_header_ids=transaction_header_ids,
document_ids=document_ids,
filter_type=filter_type,
)
query = sql.SQL(
"SELECT * FROM performance.inv_transaction_header_documents {where} ORDER BY transaction_header_id, document_id",
).format(where=where)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_polars(query)
if output_type == "pl.DataFrame":
return df
df = df.to_pandas(use_pyarrow_extension_array=True)
if output_type == "DataFrame":
return df
return df.to_dict(orient="records")
insert(transaction_header_id=None, document_id=None, linked_by_name=None, data_df=None, on_conflict='ignore')
¶
Links a document to a transaction header.
You can either pass individual values to insert a single link, or pass a DataFrame to insert multiple links at once.
Parameters:
-
(transaction_header_id¶int | None, default:None) –ID of the transaction header. By default None.
-
(document_id¶int | None, default:None) –ID of the document to link. By default None.
-
(linked_by_name¶str | None, default:None) –Name of the user performing the link. Must exist in users table. By default None.
-
(data_df¶DataFrame | None, default:None) –Polars DataFrame containing multiple links. Required columns: transaction_header_id, document_id, linked_by_name. If this is used, all individual parameters will be ignored. By default None.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. By default "ignore".
Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def insert(
self,
transaction_header_id: int | None = None,
document_id: int | None = None,
linked_by_name: str | None = None,
data_df: pl.DataFrame | None = None,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
"""Links a document to a transaction header.
You can either pass individual values to insert a single link, or pass a DataFrame
to insert multiple links at once.
Parameters
----------
transaction_header_id : int | None, optional
ID of the transaction header. By default None.
document_id : int | None, optional
ID of the document to link. By default None.
linked_by_name : str | None, optional
Name of the user performing the link. Must exist in users table. By default None.
data_df : pl.DataFrame | None, optional
Polars DataFrame containing multiple links.
Required columns: transaction_header_id, document_id, linked_by_name.
If this is used, all individual parameters will be ignored. By default None.
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict. By default "ignore".
"""
df_schema = {
"transaction_header_id": pl.Int64,
"document_id": pl.Int64,
"linked_by_name": pl.Utf8,
}
if data_df is None:
single_insert = True
data_df = pl.DataFrame(
{
"transaction_header_id": [transaction_header_id],
"document_id": [document_id],
"linked_by_name": [linked_by_name],
},
schema=df_schema,
)
else:
single_insert = False
# resolve linked_by_name to linked_by_id
user_names = data_df["linked_by_name"].drop_nulls().unique().to_list()
if user_names:
user_ids = self._perfdb.users.instances.get_ids(names=user_names)
if wrong_users := set(user_names) - set(user_ids.keys()):
raise ValueError(f"User names not found in the database: {wrong_users}")
data_df = data_df.with_columns(
pl.col("linked_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("linked_by_id"),
)
else:
data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("linked_by_id"))
data_df = data_df.drop(["linked_by_name"])
self._perfdb.conn.polars_to_sql(
df=data_df,
table_name="inv_transaction_header_documents",
schema="performance",
conflict_cols=["transaction_header_id", "document_id"],
if_exists="append" if on_conflict == "ignore" else "update",
ignore_null_cols=single_insert,
)
logger.debug(f"Linked {len(data_df)} document(s) to transaction header(s).")