Skip to content

Inventory Transaction Header Documents

InventoryTransactionHeaderDocuments(perfdb)

Class used for handling Inventory Transaction Header Documents. Can be accessed via perfdb.inventory.transactions.headers.documents.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the base class by keeping a reference to the PerfDB object.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
        It is stored on the instance as ``_perfdb``.

    """
    # Single private handle used to reach the connection and sibling handlers.
    self._perfdb: e_pg.PerfDB = perfdb

delete(transaction_header_id, document_id)

Removes a document link from a transaction header.

Parameters:

  • transaction_header_id

    (int) –

    ID of the transaction header.

  • document_id

    (int) –

    ID of the document to unlink.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def delete(self, transaction_header_id: int, document_id: int) -> int:
    """Unlink one document from one transaction header.

    Parameters
    ----------
    transaction_header_id : int
        ID of the transaction header that owns the link.
    document_id : int
        ID of the document whose link should be removed.

    Returns
    -------
    int
        Row count reported by the connection after the DELETE statement runs.
    """
    # Values are injected as SQL literals through the composition API, never by string concatenation.
    literals = {
        "header_id": sql.Literal(transaction_header_id),
        "doc_id": sql.Literal(document_id),
    }
    statement = sql.SQL(
        "DELETE FROM performance.inv_transaction_header_documents WHERE transaction_header_id = {header_id} AND document_id = {doc_id}",
    ).format(**literals)

    with self._perfdb.conn.reconnect() as db_conn:
        db_conn.execute(statement)
        # Read the count while the connection context is still open.
        return db_conn.rowcount

get(transaction_header_ids=None, document_ids=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory transaction header document links and their attributes.

Parameters:

  • transaction_header_ids

    (list[int] | None, default: None ) –

    List of transaction header IDs to filter. By default None.

  • document_ids

    (list[int] | None, default: None ) –

    List of document IDs to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • list[dict[str, Any]]

    In case output_type is "dict", returns a list with one dictionary per link, in the format [{column: value, ...}, ...].

  • DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame.

  • DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def get(
    self,
    transaction_header_ids: list[int] | None = None,
    document_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> list[dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory transaction header document links and their attributes.

    Rows are read from ``performance.inv_transaction_header_documents`` and ordered by
    ``transaction_header_id`` and then ``document_id``.

    Parameters
    ----------
    transaction_header_ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    document_ids : list[int] | None, optional
        List of document IDs to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    list[dict[str, Any]]
        In case output_type is "dict", returns a list with one dictionary per link
        in the format [{column: value, ...}, ...]. The result is not keyed by
        transaction_header_id, since one header may have several rows.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # The filter arguments are turned into the WHERE fragment by a helper defined elsewhere in the class.
    where = self._check_get_args(
        transaction_header_ids=transaction_header_ids,
        document_ids=document_ids,
        filter_type=filter_type,
    )

    query = sql.SQL(
        "SELECT * FROM performance.inv_transaction_header_documents {where} ORDER BY transaction_header_id, document_id",
    ).format(where=where)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_polars(query)

    if output_type == "pl.DataFrame":
        return df

    # Both remaining output types are derived from the pandas conversion.
    df = df.to_pandas(use_pyarrow_extension_array=True)

    if output_type == "DataFrame":
        return df

    # orient="records" yields a list of row dictionaries, not a mapping keyed by ID.
    return df.to_dict(orient="records")

insert(transaction_header_id=None, document_id=None, linked_by_name=None, data_df=None, on_conflict='ignore')

Links a document to a transaction header.

You can either pass individual values to insert a single link, or pass a DataFrame to insert multiple links at once.

Parameters:

  • transaction_header_id

    (int | None, default: None ) –

    ID of the transaction header. By default None.

  • document_id

    (int | None, default: None ) –

    ID of the document to link. By default None.

  • linked_by_name

    (str | None, default: None ) –

    Name of the user performing the link. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple links. Required columns: transaction_header_id, document_id, linked_by_name. If this is used, all individual parameters will be ignored. By default None.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. By default "ignore".

Source code in echo_postgres/inventory_transaction_header_documents.py
@validate_call
def insert(
    self,
    transaction_header_id: int | None = None,
    document_id: int | None = None,
    linked_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Links one or more documents to transaction headers.

    A single link is described with the scalar arguments; several links are
    described with ``data_df``, in which case the scalar arguments are not used.

    Parameters
    ----------
    transaction_header_id : int | None, optional
        ID of the transaction header. By default None.
    document_id : int | None, optional
        ID of the document to link. By default None.
    linked_by_name : str | None, optional
        Name of the user performing the link. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple links.
        Required columns: transaction_header_id, document_id, linked_by_name.
        If this is used, all individual parameters will be ignored. By default None.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict on (transaction_header_id, document_id).
        "ignore" is written with ``if_exists="append"`` and "update" with
        ``if_exists="update"``. By default "ignore".

    Raises
    ------
    ValueError
        If any non-null name in ``linked_by_name`` is not returned by the users lookup.
    """
    # No DataFrame means the caller described exactly one link through the scalar arguments.
    single_insert = data_df is None
    if single_insert:
        data_df = pl.DataFrame(
            {
                "transaction_header_id": [transaction_header_id],
                "document_id": [document_id],
                "linked_by_name": [linked_by_name],
            },
            schema={
                "transaction_header_id": pl.Int64,
                "document_id": pl.Int64,
                "linked_by_name": pl.Utf8,
            },
        )

    # Build the expression that turns user names into user IDs.
    distinct_names = data_df["linked_by_name"].drop_nulls().unique().to_list()
    if not distinct_names:
        # No names at all: the ID column is entirely null.
        linked_by_id = pl.lit(None).cast(pl.Int64)
    else:
        name_to_id = self._perfdb.users.instances.get_ids(names=distinct_names)
        if wrong_users := set(distinct_names) - set(name_to_id.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")
        linked_by_id = pl.col("linked_by_name").replace_strict(name_to_id, return_dtype=pl.Int64)

    # Only the resolved ID is written; the name column is dropped before the write.
    data_df = data_df.with_columns(linked_by_id.alias("linked_by_id")).drop(["linked_by_name"])

    write_mode = "append" if on_conflict == "ignore" else "update"
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_header_documents",
        schema="performance",
        conflict_cols=["transaction_header_id", "document_id"],
        if_exists=write_mode,
        ignore_null_cols=single_insert,
    )

    logger.debug(f"Linked {len(data_df)} document(s) to transaction header(s).")