Inventory Transaction Header Documents
InventoryTransactionHeaderDocuments(perfdb)
Class used for handling Inventory Transaction Header Documents. Can be accessed via perfdb.inventory.transactions.headers.documents.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store the PerfDB handle; this is the base initializer every subclass inherits.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # Kept on a private attribute so handler methods can reach the shared
    # connection (and sibling handlers) through ``self._perfdb``.
    self._perfdb: e_pg.PerfDB = perfdb
delete(transaction_header_id, document_id)
¶
Removes a document link from a transaction header.
Parameters:
- transaction_header_id (int) – ID of the transaction header.
- document_id (int) – ID of the document to unlink.
Returns:
- int – Number of rows deleted.
Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def delete(self, transaction_header_id: int, document_id: int) -> int:
    """Unlink a single document from a transaction header.

    Only the row in ``performance.inv_transaction_header_documents`` that matches
    both IDs is targeted by the DELETE statement.

    Parameters
    ----------
    transaction_header_id : int
        ID of the transaction header.
    document_id : int
        ID of the document to unlink.

    Returns
    -------
    int
        Number of rows deleted.

    """
    statement_template = sql.SQL(
        "DELETE FROM performance.inv_transaction_header_documents WHERE transaction_header_id = {header_id} AND document_id = {doc_id}",
    )
    # Both IDs are embedded as SQL literals rather than via string interpolation.
    header_literal = sql.Literal(transaction_header_id)
    document_literal = sql.Literal(document_id)
    statement = statement_template.format(header_id=header_literal, doc_id=document_literal)

    self._perfdb.conn.execute(statement)
    # The connection handler exposes the affected-row count of the last statement.
    return self._perfdb.conn.rowcount
get(transaction_header_ids=None, document_ids=None, filter_type='and', retrieve_binary_data=False, output_type='pl.DataFrame')
¶
Gets all inventory transaction header document links and their attributes.
Parameters:
- transaction_header_ids (list[int] | None, default: None) – List of transaction header IDs to filter. By default None.
- document_ids (list[int] | None, default: None) – List of document IDs to filter. By default None.
- filter_type (Literal['and', 'or'], default: 'and') – How to treat multiple filters. By default "and".
- retrieve_binary_data (bool, default: False) – Whether to include the document_data (binary) column in the result. Set to False to avoid the overhead of transmitting large binary data. By default False.
- output_type (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame') – Output type of the data. By default "pl.DataFrame".
Returns:
- dict[int, dict[str, Any]] – In case output_type is "dict", returns a dictionary in the format {transaction_header_id: {attribute: value, ...}, ...}.
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def get(
    self,
    transaction_header_ids: list[int] | None = None,
    document_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    retrieve_binary_data: bool = False,
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Read transaction header / document links from ``performance.v_inv_transaction_header_documents``.

    Parameters
    ----------
    transaction_header_ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    document_ids : list[int] | None, optional
        List of document IDs to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    retrieve_binary_data : bool, optional
        Whether to include the document_data (binary) column in the result.
        Set to False to avoid the overhead of transmitting large binary data. By default False.
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {transaction_header_id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.

    """
    view_name = "v_inv_transaction_header_documents"

    # The filter arguments are turned into the WHERE clause by the shared helper.
    where_clause = self._check_get_args(
        transaction_header_ids=transaction_header_ids,
        document_ids=document_ids,
        filter_type=filter_type,
    )

    # The column list is read from the view itself so the SELECT tracks its definition.
    view_columns = self._perfdb.conn.get_table_columns(
        schema="performance",
        table_names=[view_name],
        table_types=["view"],
    )[view_name]
    # The binary payload column is only selected when explicitly requested.
    selected_columns = [
        name for name in view_columns.keys() if retrieve_binary_data or name != "document_data"
    ]

    select_template = sql.SQL(
        "SELECT {columns} FROM performance.v_inv_transaction_header_documents {where} ORDER BY transaction_header_id, document_id",
    )
    column_identifiers = [sql.Identifier(name) for name in selected_columns]
    select_stmt = select_template.format(
        columns=sql.SQL(", ").join(column_identifiers),
        where=where_clause,
    )

    result_df = self._perfdb.conn.read_to_polars(select_stmt)
    # NOTE(review): convert_output is called with orient="records"; confirm that the
    # "dict" output really is keyed by transaction_header_id as the docstring states.
    return convert_output(result_df, output_type, orient="records")
insert(transaction_header_id, document_path, document_type, linked_by_name=None, document_name=None, document_date=None, description=None, labels=None, event_ids=None, delete_after_insert=False)
¶
Inserts a document into the database and links it to a transaction header.
This method creates a new document via perfdb.documents.instances.insert and then
links it to the specified transaction header.
Parameters:
- transaction_header_id (int) – ID of the transaction header to link the document to.
- document_path (Path | bytes) – Path to the document file, or raw bytes to insert directly.
- document_type (str) – Type of the document. Must be a valid document type in the database.
- linked_by_name (str | None, default: None) – Name of the user who linked the document. Must exist in users table. By default None.
- document_name (str | None, default: None) – Name of the document. If not set, the file name (without extension) is used. By default None.
- document_date (datetime | None, default: None) – Date of the document. If not set, the current date is used. By default None.
- description (str | None, default: None) – Description of the document. By default None.
- labels (list[str] | None, default: None) – List of labels to add to the document. By default None.
- event_ids (list[int] | None, default: None) – List of event IDs to associate with the document. By default None.
- delete_after_insert (bool, default: False) – If True, deletes the source file after inserting. By default False.
Returns:
- int – ID of the inserted document instance.
Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def insert(
    self,
    transaction_header_id: int,
    document_path: Path | bytes,
    document_type: str,
    linked_by_name: str | None = None,
    document_name: str | None = None,
    document_date: datetime | None = None,
    description: str | None = None,
    labels: list[str] | None = None,
    event_ids: list[int] | None = None,
    delete_after_insert: bool = False,
) -> int:
    """Create a document and attach it to a transaction header in one call.

    The document is first created through `perfdb.documents.instances.insert`; the
    resulting ID is then passed to `link` together with the transaction header ID.

    Parameters
    ----------
    transaction_header_id : int
        ID of the transaction header to link the document to.
    document_path : Path | bytes
        Path to the document file, or raw bytes to insert directly.
    document_type : str
        Type of the document. Must be a valid document type in the database.
    linked_by_name : str | None, optional
        Name of the user who linked the document. Must exist in users table. By default None.
    document_name : str | None, optional
        Name of the document. If not set, the file name (without extension) is used. By default None.
    document_date : datetime | None, optional
        Date of the document. If not set, the current date is used. By default None.
    description : str | None, optional
        Description of the document. By default None.
    labels : list[str] | None, optional
        List of labels to add to the document. By default None.
    event_ids : list[int] | None, optional
        List of event IDs to associate with the document. By default None.
    delete_after_insert : bool, optional
        If True, deletes the source file after inserting. By default False.

    Returns
    -------
    int
        ID of the inserted document instance.

    """
    # Everything describing the document itself is forwarded untouched; only the
    # header ID and the linking user are held back for the link step.
    document_kwargs = {
        "document_path": document_path,
        "document_type": document_type,
        "document_name": document_name,
        "document_date": document_date,
        "description": description,
        "labels": labels,
        "event_ids": event_ids,
        "delete_after_insert": delete_after_insert,
    }
    new_document_id = self._perfdb.documents.instances.insert(**document_kwargs)

    # The link can only be written once the document ID exists.
    self.link(
        transaction_header_id=transaction_header_id,
        document_id=new_document_id,
        linked_by_name=linked_by_name,
    )
    return new_document_id
link(transaction_header_id=None, document_id=None, linked_by_name=None, data_df=None, on_conflict='ignore')
¶
Links an existing document to a transaction header.
You can either pass individual values to link a single document, or pass a DataFrame to link multiple documents at once.
Parameters:
- transaction_header_id (int | None, default: None) – ID of the transaction header. By default None.
- document_id (int | None, default: None) – ID of the document to link. By default None.
- linked_by_name (str | None, default: None) – Name of the user performing the link. Must exist in users table. By default None.
- data_df (pl.DataFrame | None, default: None) – Polars DataFrame containing multiple links. Required columns: transaction_header_id, document_id, linked_by_name. If this is used, all individual parameters will be ignored. By default None.
- on_conflict (Literal['ignore', 'update'], default: 'ignore') – What to do in case of conflict. By default "ignore".
Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def link(
    self,
    transaction_header_id: int | None = None,
    document_id: int | None = None,
    linked_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Links an existing document to a transaction header.

    You can either pass individual values to link a single document, or pass a DataFrame
    to link multiple documents at once.

    Parameters
    ----------
    transaction_header_id : int | None, optional
        ID of the transaction header. Required when `data_df` is not given. By default None.
    document_id : int | None, optional
        ID of the document to link. Required when `data_df` is not given. By default None.
    linked_by_name : str | None, optional
        Name of the user performing the link. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple links.
        Required columns: transaction_header_id, document_id, linked_by_name.
        If this is used, all individual parameters will be ignored. By default None.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. By default "ignore".

    Raises
    ------
    ValueError
        If `data_df` is None and `transaction_header_id` or `document_id` is missing.
    ValueError
        If `data_df` lacks any of the required columns.
    ValueError
        If any non-null `linked_by_name` is not found among the database users.

    """
    df_schema = {
        "transaction_header_id": pl.Int64,
        "document_id": pl.Int64,
        "linked_by_name": pl.Utf8,
    }

    if data_df is None:
        # Single-link mode: fail early instead of building a row with null keys
        # and leaving the rejection to the database.
        supplied_ids = {
            "transaction_header_id": transaction_header_id,
            "document_id": document_id,
        }
        if missing_args := [name for name, value in supplied_ids.items() if value is None]:
            raise ValueError(f"Missing required arguments when data_df is not provided: {missing_args}")
        single_insert = True
        data_df = pl.DataFrame(
            {
                "transaction_header_id": [transaction_header_id],
                "document_id": [document_id],
                "linked_by_name": [linked_by_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False
        # Enforce the documented column contract up front, so a missing column is
        # reported clearly rather than as a lookup or database error further down.
        if missing_cols := [col for col in df_schema if col not in data_df.columns]:
            raise ValueError(f"data_df is missing required columns: {missing_cols}")
        # converting to expected schema
        data_df = data_df.cast(df_schema)

    # resolve linked_by_name to linked_by_id
    user_names = data_df["linked_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("linked_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("linked_by_id"),
        )
    else:
        # No user given on any row: still add the column, filled with nulls.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("linked_by_id"))

    # The table stores the user ID, not the name.
    data_df = data_df.drop(["linked_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_header_documents",
        schema="performance",
        conflict_cols=["transaction_header_id", "document_id"],
        if_exists="append" if on_conflict == "ignore" else "update",
        # NOTE(review): presumably drops all-null columns (e.g. linked_by_id) from a
        # single-row insert so database defaults apply - confirm in polars_to_sql.
        ignore_null_cols=single_insert,
    )

    logger.debug(f"Linked {len(data_df)} document(s) to transaction header(s).")