Inventory Physical Count Documents¶
InventoryPhysicalCountDocuments(perfdb)
¶
Class used for handling Inventory Physical Count Documents. Can be accessed via perfdb.inventory.physical_counts.documents.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the handler with the shared PerfDB object.

    This initializer belongs to the base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # Stored privately so the handler's methods can use the top level object later.
    self._perfdb: e_pg.PerfDB = perfdb
delete(physical_count_ids=None, document_ids=None)
¶
Deletes inventory physical count document links from the database.
At least one parameter must be provided.
Parameters:
-
(physical_count_ids¶list[int] | None, default:None) –List of physical count IDs to be deleted. By default None.
-
(document_ids¶list[int] | None, default:None) –List of document IDs to be deleted. By default None.
Returns:
-
int–Number of rows deleted.
Source code in echo_postgres/inventory_physical_count_documents.py
@validate_call
def delete(
self,
physical_count_ids: list[int] | None = None,
document_ids: list[int] | None = None,
) -> int:
"""Deletes inventory physical count document links from the database.
At least one parameter must be provided.
Parameters
----------
physical_count_ids : list[int] | None, optional
List of physical count IDs to be deleted. By default None.
document_ids : list[int] | None, optional
List of document IDs to be deleted. By default None.
Returns
-------
int
Number of rows deleted.
"""
if not physical_count_ids and not document_ids:
raise ValueError("At least one of 'physical_count_ids' or 'document_ids' must be provided.")
where = self._check_get_args(
physical_count_ids=physical_count_ids,
document_ids=document_ids,
filter_type="and",
)
query = sql.SQL("DELETE FROM performance.inv_physical_count_documents {where}").format(where=where)
self._perfdb.conn.execute(query)
deleted = self._perfdb.conn.rowcount
logger.debug(f"Deleted {deleted} physical count document link(s).")
return deleted
get(physical_count_ids=None, document_ids=None, filter_type='and', retrieve_binary_data=False, output_type='pl.DataFrame')
¶
Gets all inventory physical count documents and their attributes.
The most useful keys/columns returned are:
- physical_count_id
- document_id
- document_name
- document_description
- document_date
- data_type_name
- document_type_name
- document_data (only when retrieve_binary_data is True)
- linked_date
- linked_by_id
- linker_name
Parameters:
-
(physical_count_ids¶list[int] | None, default:None) –List of physical count IDs to filter. By default None.
-
(document_ids¶list[int] | None, default:None) –List of document IDs to filter. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. By default "and".
-
(retrieve_binary_data¶bool, default:False) –Whether to include the document_data (binary) column in the result. Set to False to avoid the overhead of transmitting large binary data. By default False.
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'pl.DataFrame') –Output type of the data. By default "pl.DataFrame".
Returns:
-
dict[int, dict[str, Any]]–In case output_type is "dict", returns a dictionary in the format {physical_count_id: {attribute: value, ...}, ...}.
-
DataFrame–In case output_type is "DataFrame", returns a pandas DataFrame.
-
DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_physical_count_documents.py
@validate_call
def get(
    self,
    physical_count_ids: list[int] | None = None,
    document_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    retrieve_binary_data: bool = False,
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Fetch inventory physical count documents together with their attributes.

    Data is read from the ``performance.v_inv_physical_count_documents`` view, ordered by
    ``physical_count_id`` and ``document_id``.

    The most useful keys/columns returned are:

    - physical_count_id
    - document_id
    - document_name
    - document_description
    - document_date
    - data_type_name
    - document_type_name
    - document_data (only when retrieve_binary_data is True)
    - linked_date
    - linked_by_id
    - linker_name

    Parameters
    ----------
    physical_count_ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    document_ids : list[int] | None, optional
        List of document IDs to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    retrieve_binary_data : bool, optional
        Whether to include the document_data (binary) column in the result.
        Leave as False to avoid the overhead of transmitting large binary data. By default False.
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format
        {physical_count_id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.

    """
    where_clause = self._check_get_args(
        physical_count_ids=physical_count_ids,
        document_ids=document_ids,
        filter_type=filter_type,
    )

    # Column names are looked up from the view itself rather than hard-coded here.
    view_name = "v_inv_physical_count_documents"
    view_columns = self._perfdb.conn.get_table_columns(
        schema="performance",
        table_names=[view_name],
        table_types=["view"],
    )[view_name]

    # The binary payload is only selected on explicit request.
    selected_columns = [
        column_name for column_name in view_columns.keys() if retrieve_binary_data or column_name != "document_data"
    ]

    column_list = sql.SQL(", ").join(sql.Identifier(column_name) for column_name in selected_columns)
    query = sql.SQL(
        "SELECT {columns} FROM performance.v_inv_physical_count_documents {where} ORDER BY physical_count_id, document_id",
    ).format(columns=column_list, where=where_clause)

    result_df = self._perfdb.conn.read_to_polars(query)
    # NOTE(review): orient="records" is passed here; confirm against convert_output that the
    # "dict" layout documented above is what callers actually receive.
    return convert_output(result_df, output_type, orient="records")
insert(physical_count_id, document_path, document_type, linked_by_name=None, document_name=None, document_date=None, description=None, labels=None, event_ids=None, delete_after_insert=False)
¶
Inserts a document into the database and links it to a physical count.
This method creates a new document via perfdb.documents.instances.insert and then
links it to the specified physical count.
Parameters:
-
(physical_count_id¶int) –ID of the physical count to link the document to.
-
(document_path¶Path | bytes) –Path to the document file, or raw bytes to insert directly.
-
(document_type¶str) –Type of the document. Must be a valid document type in the database.
-
(linked_by_name¶str | None, default:None) –Name of the user who linked the document. Must exist in users table. By default None.
-
(document_name¶str | None, default:None) –Name of the document. If not set, the file name (without extension) is used. By default None.
-
(document_date¶datetime | None, default:None) –Date of the document. If not set, the current date is used. By default None.
-
(description¶str | None, default:None) –Description of the document. By default None.
-
(labels¶list[str] | None, default:None) –List of labels to add to the document. By default None.
-
(event_ids¶list[int] | None, default:None) –List of event IDs to associate with the document. By default None.
-
(delete_after_insert¶bool, default:False) –If True, deletes the source file after inserting. By default False.
Returns:
-
int–ID of the inserted document instance.
Source code in echo_postgres/inventory_physical_count_documents.py
@validate_call
def insert(
self,
physical_count_id: int,
document_path: Path | bytes,
document_type: str,
linked_by_name: str | None = None,
document_name: str | None = None,
document_date: datetime | None = None,
description: str | None = None,
labels: list[str] | None = None,
event_ids: list[int] | None = None,
delete_after_insert: bool = False,
) -> int:
"""Inserts a document into the database and links it to a physical count.
This method creates a new document via `perfdb.documents.instances.insert` and then
links it to the specified physical count.
Parameters
----------
physical_count_id : int
ID of the physical count to link the document to.
document_path : Path | bytes
Path to the document file, or raw bytes to insert directly.
document_type : str
Type of the document. Must be a valid document type in the database.
linked_by_name : str | None, optional
Name of the user who linked the document. Must exist in users table. By default None.
document_name : str | None, optional
Name of the document. If not set, the file name (without extension) is used. By default None.
document_date : datetime | None, optional
Date of the document. If not set, the current date is used. By default None.
description : str | None, optional
Description of the document. By default None.
labels : list[str] | None, optional
List of labels to add to the document. By default None.
event_ids : list[int] | None, optional
List of event IDs to associate with the document. By default None.
delete_after_insert : bool, optional
If True, deletes the source file after inserting. By default False.
Returns
-------
int
ID of the inserted document instance.
"""
doc_id = self._perfdb.documents.instances.insert(
document_path=document_path,
document_type=document_type,
document_name=document_name,
document_date=document_date,
description=description,
labels=labels,
event_ids=event_ids,
delete_after_insert=delete_after_insert,
)
self.link(
physical_count_id=physical_count_id,
document_id=doc_id,
linked_by_name=linked_by_name,
)
return doc_id
link(physical_count_id=None, document_id=None, linked_by_name=None, data_df=None, on_conflict='ignore')
¶
Links an existing document to a physical count.
You can either pass individual values to link a single document, or pass a DataFrame to link multiple documents at once.
Parameters:
-
(physical_count_id¶int | None, default:None) –ID of the physical count. By default None.
-
(document_id¶int | None, default:None) –ID of the document. By default None.
-
(linked_by_name¶str | None, default:None) –Name of the user who linked the document. Must exist in users table. By default None.
-
(data_df¶DataFrame | None, default:None) –Polars DataFrame containing multiple links to insert. Required columns: physical_count_id, document_id. Optional: linked_by_name. If this is used, all individual parameters will be ignored. By default None.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict (based on physical_count_id + document_id). By default "ignore".
Source code in echo_postgres/inventory_physical_count_documents.py
@validate_call
def link(
    self,
    physical_count_id: int | None = None,
    document_id: int | None = None,
    linked_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Links an existing document to a physical count.

    You can either pass individual values to link a single document, or pass a DataFrame
    to link multiple documents at once.

    Parameters
    ----------
    physical_count_id : int | None, optional
        ID of the physical count. Required when ``data_df`` is not used. By default None.
    document_id : int | None, optional
        ID of the document. Required when ``data_df`` is not used. By default None.
    linked_by_name : str | None, optional
        Name of the user who linked the document. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple links to insert.
        Required columns: physical_count_id, document_id.
        Optional: linked_by_name.
        If this is used, physical_count_id, document_id and linked_by_name are ignored. By default None.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict (based on physical_count_id + document_id). By default "ignore".

    Raises
    ------
    ValueError
        If ``data_df`` is not used and ``physical_count_id`` or ``document_id`` is None,
        if ``data_df`` lacks one of the required columns, or if any user name is not found
        in the database.

    """
    df_schema = {
        "physical_count_id": pl.Int64,
        "document_id": pl.Int64,
        "linked_by_name": pl.Utf8,
    }
    required_cols = ("physical_count_id", "document_id")

    if data_df is None:
        # A link row needs both ids; fail early instead of sending a null row to the database.
        if physical_count_id is None or document_id is None:
            raise ValueError("Both 'physical_count_id' and 'document_id' must be provided when 'data_df' is not used.")
        single_insert = True
        data_df = pl.DataFrame(
            {
                "physical_count_id": [physical_count_id],
                "document_id": [document_id],
                "linked_by_name": [linked_by_name],
            },
            schema=df_schema,
        )
    else:
        if missing_cols := [col for col in required_cols if col not in data_df.columns]:
            raise ValueError(f"'data_df' is missing required column(s): {missing_cols}")
        single_insert = False
        # converting to expected schema (only the columns that are actually present)
        data_df = data_df.cast({col: dtype for col, dtype in df_schema.items() if col in data_df.columns})

    # linked_by_name is optional in data_df, so it may be absent altogether
    has_name_col = "linked_by_name" in data_df.columns
    user_names = data_df["linked_by_name"].drop_nulls().unique().to_list() if has_name_col else []

    # resolve linked_by_name to linked_by_id
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        data_df = data_df.with_columns(
            pl.col("linked_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("linked_by_id"),
        )
    else:
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("linked_by_id"))

    # Only drop the name column when it exists: dropping a missing column raises in Polars.
    if has_name_col:
        data_df = data_df.drop(["linked_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_count_documents",
        schema="performance",
        conflict_cols=["physical_count_id", "document_id"],
        if_exists="append" if on_conflict == "ignore" else "update",
        ignore_null_cols=single_insert,
    )

    logger.debug(f"Linked {len(data_df)} document(s) to physical count(s).")