Inventory Transaction Headers

InventoryTransactionHeaders(perfdb)

Class used for handling Inventory Transaction Headers. Can be accessed via perfdb.inventory.transactions.headers.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/inventory_transaction_headers.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling Inventory Transaction Headers. Can be accessed via `perfdb.inventory.transactions.headers`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    from .inventory_transaction_header_documents import InventoryTransactionHeaderDocuments

    # * subclasses

    self.documents = InventoryTransactionHeaderDocuments(perfdb)
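
A minimal usage sketch, assuming a PerfDB instance has already been constructed (the constructor arguments are omitted here and depend on your connection setup):

import echo_postgres as e_pg

# Hypothetical setup; pass whatever connection settings your PerfDB requires.
perfdb = e_pg.PerfDB()

# The handler is reached through the attribute chain documented above.
headers = perfdb.inventory.transactions.headers

# Document handling for headers lives on the `documents` sub-handler.
docs = headers.documents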

delete(header_ids)

Deletes inventory transaction headers from the database.

Parameters:

  • header_ids

    (list[int]) –

    List of transaction header IDs to be deleted.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def delete(self, header_ids: list[int]) -> int:
    """Deletes inventory transaction headers from the database.

    Parameters
    ----------
    header_ids : list[int]
        List of transaction header IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted.
    """
    query = sql.SQL("DELETE FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(header_ids),
    )

    self._perfdb.conn.execute(query)
    deleted = self._perfdb.conn.rowcount

    logger.debug(f"Deleted {deleted} transaction header(s).")
    return deleted
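
A short sketch of a delete call, reusing the perfdb instance from the sketch above (the IDs are illustrative):

# Delete two headers by ID; the return value is the number of rows removed,
# which may be lower than len(header_ids) if some IDs no longer exist.
deleted = perfdb.inventory.transactions.headers.delete(header_ids=[101, 102])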

get(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, has_documents=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory transaction headers and their attributes.

The most useful keys/columns returned are:

  • id
  • transaction_date
  • storage_location_name
  • center_name
  • transaction_type_name
  • direction
  • withdrawal_id
  • service_order_name
  • reference_number
  • is_sla_violated
  • notes
  • creator_name
  • display_label

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of transaction header IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • transaction_type_names

    (list[str] | None, default: None ) –

    List of transaction type names to filter. By default None.

  • directions

    (list[str] | None, default: None ) –

    List of directions to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by transaction_date. By default None.

  • notes_search

    (str | None, default: None ) –

    Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.

  • has_documents

    (bool | None, default: None ) –

    If True, only return headers that have at least one attached document. If False, only return headers with no documents. If None, no filter is applied. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    has_documents: bool | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory transaction headers and their attributes.

    The most useful keys/columns returned are:

    - id
    - transaction_date
    - storage_location_name
    - center_name
    - transaction_type_name
    - direction
    - withdrawal_id
    - service_order_name
    - reference_number
    - is_sla_violated
    - notes
    - creator_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    transaction_type_names : list[str] | None, optional
        List of transaction type names to filter. By default None.
    directions : list[str] | None, optional
        List of directions to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
    has_documents : bool | None, optional
        If True, only return headers that have at least one attached document. If False, only return headers with no documents. If None, no filter is applied. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    where = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        notes_search=notes_search,
        has_documents=has_documents,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT * FROM performance.v_inv_transaction_headers {where} ORDER BY id").format(where=where)

    df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)

    # drop notes_searchable column before returning
    df = df.drop("notes_searchable")

    return convert_output(df, output_type, index_col="id")
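
An illustrative call combining several filters. The center name is made up, and the DateTimeRange construction assumes the datetimerange package's signature; adapt both to your data and codebase:

from datetime import datetime

from datetimerange import DateTimeRange  # assumed import path

# Recent inbound headers for one center that have at least one document.
df = perfdb.inventory.transactions.headers.get(
    center_names=["CENTER_A"],  # illustrative center name
    directions=["ENTRADA"],
    period=DateTimeRange(datetime(2024, 1, 1), datetime(2024, 3, 31)),
    has_documents=True,
    output_type="pl.DataFrame",
)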

get_ids(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, has_documents=None, filter_type='and')

Gets all inventory transaction header IDs matching the provided filters.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of transaction header IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • transaction_type_names

    (list[str] | None, default: None ) –

    List of transaction type names to filter. By default None.

  • directions

    (list[str] | None, default: None ) –

    List of directions to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by transaction_date. By default None.

  • notes_search

    (str | None, default: None ) –

    Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.

  • has_documents

    (bool | None, default: None ) –

    If True, only return headers that have at least one attached document. If False, only return headers with no documents. If None, no filter is applied. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • list[int]

    List of transaction header IDs matching the filters.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    has_documents: bool | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Gets all inventory transaction header IDs matching the provided filters.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    transaction_type_names : list[str] | None, optional
        List of transaction type names to filter. By default None.
    directions : list[str] | None, optional
        List of directions to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
    has_documents : bool | None, optional
        If True, only return headers that have at least one attached document. If False, only return headers with no documents. If None, no filter is applied. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    list[int]
        List of transaction header IDs matching the filters.
    """
    where = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        notes_search=notes_search,
        has_documents=has_documents,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT id FROM performance.v_inv_transaction_headers {where} ORDER BY id").format(where=where)

    df = self._perfdb.conn.read_to_polars(query, schema_overrides=self._cols_schema)

    return df["id"].to_list()

get_stats(center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, creator_names=None, period=None, filter_type='and', top_n=20)

Returns a collection of aggregated statistics about transaction headers for use in dashboards.

All filters are applied consistently across every stat category, so only the relevant subset of data is analyzed and the queries stay lightweight.

Parameters:

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • transaction_type_names

    (list[str] | None, default: None ) –

    List of transaction type names to filter. By default None.

  • directions

    (list[str] | None, default: None ) –

    List of directions to filter. Valid values: ENTRADA, SAIDA, NEUTRO. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • creator_names

    (list[str] | None, default: None ) –

    List of creator (user) names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by transaction_date. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • top_n

    (int, default: 20 ) –

    Maximum number of rows returned for ranked/top lists (e.g. top_materials, sla_by_creator). By default 20.

Returns:

  • dict[str, pl.DataFrame]

    Dictionary with the following keys, each containing a Polars DataFrame:

    • "by_type" — header count and SLA violations per transaction type and direction. Columns: transaction_type_name, direction, header_count, sla_violation_count.

    • "by_center" — header count and SLA violation rate per center. Columns: center_name, header_count, sla_violation_count, sla_violation_rate.

    • "by_storage_location" — header count and SLA violation rate per storage location. Columns: center_name, storage_location_name, header_count, sla_violation_count, sla_violation_rate.

    • "by_creator" — header count and SLA violation rate per creator. Columns: creator_name, header_count, sla_violation_count, sla_violation_rate.

    • "by_direction" — aggregated item quantities per movement direction. Columns: direction, header_count, item_count, total_quantity.

    • "by_date" — daily header count and SLA violations. Columns: transaction_date, header_count, sla_violation_count.

    • "sla_summary" — single-row global SLA summary. Columns: total_headers, sla_ok_count, sla_violation_count, sla_violation_rate.

    • "sla_by_creator" — top_n creators ranked by SLA violation count (descending). Columns: creator_name, sla_violation_count, total_headers, sla_violation_rate.

    • "top_materials" — top_n most transacted materials by total quantity (descending). Columns: material_description, material_sap_id, base_unit, total_quantity, header_count.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get_stats(
    self,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    creator_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    top_n: int = 20,
) -> dict[str, pl.DataFrame]:
    """Returns a collection of aggregated statistics about transaction headers for use in dashboards.

    All filters are applied consistently across every stat category, so only the relevant
    subset of data is analyzed and the queries stay lightweight.

    Parameters
    ----------
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    transaction_type_names : list[str] | None, optional
        List of transaction type names to filter. By default None.
    directions : list[str] | None, optional
        List of directions to filter. Valid values: ENTRADA, SAIDA, NEUTRO. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    creator_names : list[str] | None, optional
        List of creator (user) names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    top_n : int, optional
        Maximum number of rows returned for ranked/top lists (e.g. top_materials, sla_by_creator).
        By default 20.

    Returns
    -------
    dict[str, pl.DataFrame]
        Dictionary with the following keys, each containing a Polars DataFrame:

        - ``"by_type"`` — header count and SLA violations per transaction type and direction.
          Columns: transaction_type_name, direction, header_count, sla_violation_count.

        - ``"by_center"`` — header count and SLA violation rate per center.
          Columns: center_name, header_count, sla_violation_count, sla_violation_rate.

        - ``"by_storage_location"`` — header count and SLA violation rate per storage location.
          Columns: center_name, storage_location_name, header_count, sla_violation_count, sla_violation_rate.

        - ``"by_creator"`` — header count and SLA violation rate per creator.
          Columns: creator_name, header_count, sla_violation_count, sla_violation_rate.

        - ``"by_direction"`` — aggregated item quantities per movement direction.
          Columns: direction, header_count, item_count, total_quantity.

        - ``"by_date"`` — daily header count and SLA violations.
          Columns: transaction_date, header_count, sla_violation_count.

        - ``"sla_summary"`` — single-row global SLA summary.
          Columns: total_headers, sla_ok_count, sla_violation_count, sla_violation_rate.

        - ``"sla_by_creator"`` — top_n creators ranked by SLA violation count (descending).
          Columns: creator_name, sla_violation_count, total_headers, sla_violation_rate.

        - ``"top_materials"`` — top_n most transacted materials by total quantity (descending).
          Columns: material_description, material_sap_id, base_unit, total_quantity, header_count.
    """
    # Build common WHERE from shared filters (reuse _check_get_args, ids/notes_search/has_documents fixed to None)
    where_base = self._check_get_args(
        ids=None,
        center_names=center_names,
        storage_location_names=storage_location_names,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        notes_search=None,
        has_documents=None,
        filter_type=filter_type,
    )

    # Append creator_names filter on top of the shared WHERE
    if creator_names:
        creator_cond = sql.SQL("creator_name = ANY({creator_names})").format(creator_names=sql.Literal(creator_names))
        if where_base == sql.SQL(""):
            where = sql.SQL("WHERE ") + creator_cond
        else:
            where = where_base + sql.SQL(f" {filter_type.upper()} ") + creator_cond
    else:
        where = where_base

    result: dict[str, pl.DataFrame] = {}

    # -- by_type --
    result["by_type"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                transaction_type_name,
                direction,
                COUNT(*) AS header_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count
            FROM performance.v_inv_transaction_headers
            {where}
            GROUP BY transaction_type_name, direction
            ORDER BY header_count DESC
        """).format(where=where),
    )

    # -- by_center --
    result["by_center"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                center_name,
                COUNT(*) AS header_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
                ROUND(
                    SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                    2
                ) AS sla_violation_rate
            FROM performance.v_inv_transaction_headers
            {where}
            GROUP BY center_name
            ORDER BY header_count DESC
        """).format(where=where),
    )

    # -- by_storage_location --
    result["by_storage_location"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                center_name,
                storage_location_name,
                COUNT(*) AS header_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
                ROUND(
                    SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                    2
                ) AS sla_violation_rate
            FROM performance.v_inv_transaction_headers
            {where}
            GROUP BY center_name, storage_location_name
            ORDER BY header_count DESC
        """).format(where=where),
    )

    # -- by_creator --
    result["by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                COUNT(*) AS header_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
                ROUND(
                    SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                    2
                ) AS sla_violation_rate
            FROM performance.v_inv_transaction_headers
            {where}
            GROUP BY creator_name
            ORDER BY header_count DESC
        """).format(where=where),
    )

    # -- by_direction  (uses v_inv_transaction_items which carries all header columns) --
    result["by_direction"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                direction,
                COUNT(DISTINCT transaction_header_id) AS header_count,
                COUNT(*) AS item_count,
                SUM(absolute_quantity) AS total_quantity
            FROM performance.v_inv_transaction_items
            {where}
            GROUP BY direction
            ORDER BY total_quantity DESC
        """).format(where=where),
    )

    # -- by_date --
    result["by_date"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                transaction_date,
                COUNT(*) AS header_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count
            FROM performance.v_inv_transaction_headers
            {where}
            GROUP BY transaction_date
            ORDER BY transaction_date
        """).format(where=where),
    )

    # -- sla_summary --
    result["sla_summary"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                COUNT(*) AS total_headers,
                SUM(CASE WHEN NOT is_sla_violated THEN 1 ELSE 0 END) AS sla_ok_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
                ROUND(
                    SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                    2
                ) AS sla_violation_rate
            FROM performance.v_inv_transaction_headers
            {where}
        """).format(where=where),
    )

    # -- sla_by_creator (top_n) --
    result["sla_by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violation_count,
                COUNT(*) AS total_headers,
                ROUND(
                    SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                    2
                ) AS sla_violation_rate
            FROM performance.v_inv_transaction_headers
            {where}
            GROUP BY creator_name
            ORDER BY sla_violation_count DESC
            LIMIT {top_n}
        """).format(where=where, top_n=sql.Literal(top_n)),
    )

    # -- top_materials (top_n, uses v_inv_transaction_items) --
    result["top_materials"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                material_description,
                material_sap_id,
                base_unit,
                SUM(absolute_quantity) AS total_quantity,
                COUNT(DISTINCT transaction_header_id) AS header_count
            FROM performance.v_inv_transaction_items
            {where}
            GROUP BY material_description, material_sap_id, base_unit
            ORDER BY total_quantity DESC
            LIMIT {top_n}
        """).format(where=where, top_n=sql.Literal(top_n)),
    )

    logger.debug(
        f"get_stats() returned {len(result)} stat categories for transaction headers.",
    )
    return result
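
A sketch of consuming the stats dictionary (filter values are illustrative):

# Outbound-only stats, keeping ranked lists to 10 rows.
stats = perfdb.inventory.transactions.headers.get_stats(directions=["SAIDA"], top_n=10)

# Each value is a Polars DataFrame keyed by the category names listed above.
print(stats["sla_by_creator"])

# The global summary is a single row; pull the violation rate out of it.
violation_rate = stats["sla_summary"]["sla_violation_rate"][0]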

insert(storage_location_name=None, center_name=None, transaction_date=None, transaction_type_name=None, reference_number=None, withdrawal_id=None, notes=None, created_by_name=None, service_order_name=None, data_df=None)

Inserts a new inventory transaction header into the database.

You can either pass individual values to insert a single header, or pass a DataFrame to insert multiple headers at once.

Parameters:

  • storage_location_name

    (str | None, default: None ) –

    Name of the storage location. By default None.

  • center_name

    (str | None, default: None ) –

    Name of the center (used together with storage_location_name to resolve the storage location ID). By default None.

  • transaction_date

    (date | None, default: None ) –

    Date of the transaction. By default None.

  • transaction_type_name

    (str | None, default: None ) –

    Name of the transaction type. Must exist in inv_transaction_types table. By default None.

  • reference_number

    (str | None, default: None ) –

    Optional reference number for the transaction. By default None.

  • withdrawal_id

    (int | None, default: None ) –

    Optional ID of an associated withdrawal. By default None.

  • notes

    (str | None, default: None ) –

    Optional notes. By default None.

  • created_by_name

    (str | None, default: None ) –

    Name of the user creating the transaction. Must exist in users table. By default None.

  • service_order_name

    (str | None, default: None ) –

    Name of the associated service order. By default None.

  • data_df

    (pl.DataFrame | None, default: None ) –

    Polars DataFrame containing multiple headers to be inserted. Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name. Optional: reference_number, withdrawal_id, notes, service_order_name. If this is used, all individual parameters will be ignored. By default None.

Returns:

  • int | list[int] | None

    If inserting a single header, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def insert(
    self,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    transaction_date: date | None = None,
    transaction_type_name: str | None = None,
    reference_number: str | None = None,
    withdrawal_id: int | None = None,
    notes: str | None = None,
    created_by_name: str | None = None,
    service_order_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory transaction header into the database.

    You can either pass individual values to insert a single header, or pass a DataFrame
    to insert multiple headers at once.

    Parameters
    ----------
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve the storage location ID). By default None.
    transaction_date : date | None, optional
        Date of the transaction. By default None.
    transaction_type_name : str | None, optional
        Name of the transaction type. Must exist in inv_transaction_types table. By default None.
    reference_number : str | None, optional
        Optional reference number for the transaction. By default None.
    withdrawal_id : int | None, optional
        Optional ID of an associated withdrawal. By default None.
    notes : str | None, optional
        Optional notes. By default None.
    created_by_name : str | None, optional
        Name of the user creating the transaction. Must exist in users table. By default None.
    service_order_name : str | None, optional
        Name of the associated service order. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to be inserted.
        Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name.
        Optional: reference_number, withdrawal_id, notes, service_order_name.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single header, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.
    """
    df_schema = {
        "storage_location_name": pl.Utf8,
        "center_name": pl.Utf8,
        "transaction_date": pl.Date,
        "transaction_type_name": pl.Utf8,
        "reference_number": pl.Utf8,
        "withdrawal_id": pl.Int64,
        "notes": pl.Utf8,
        "created_by_name": pl.Utf8,
        "service_order_name": pl.Utf8,
    }

    if data_df is None:
        single_insert = True
        data_df = pl.DataFrame(
            {
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "transaction_date": [transaction_date],
                "transaction_type_name": [transaction_type_name],
                "reference_number": [reference_number],
                "withdrawal_id": [withdrawal_id],
                "notes": [notes],
                "created_by_name": [created_by_name],
                "service_order_name": [service_order_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False
        # converting to expected schema
        data_df = data_df.cast({col: dtype for col, dtype in df_schema.items() if col in data_df.columns})

    # checking required columns
    required_cols = ["storage_location_name", "center_name", "transaction_date", "transaction_type_name", "created_by_name"]
    for col in required_cols:
        # check if column exists and has no nulls
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"data_df column '{col}' contains null values, but it is required.")

    # resolve storage_location_id from storage_location_name + center_name
    sl_ids_nested = self._perfdb.inventory.storage_locations.get_ids()
    # flatten to {(center, loc): id} for lookup
    sl_ids_flat = {(c, loc): loc_id for c, locs in sl_ids_nested.items() for loc, loc_id in locs.items()}

    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    if wrong_sls := set(pairs) - set(sl_ids_flat.keys()):
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")

    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [sl_ids_flat[(c, loc)] for c, loc in pairs], dtype=pl.Int64),
    )

    # resolve transaction_type_name to transaction_type_id
    type_ids = self._perfdb.inventory.transactions.types.get_ids()
    if wrong_types := set(data_df["transaction_type_name"].to_list()) - set(type_ids.keys()):
        raise ValueError(f"Transaction type names not found in the database: {wrong_types}")

    data_df = data_df.with_columns(
        pl.col("transaction_type_name").replace_strict(type_ids, return_dtype=pl.Int64).alias("transaction_type_id"),
    )

    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))

    # resolve service_order_name to service_order_id
    if "service_order_name" in data_df.columns:
        if len(data_df.filter(pl.col("service_order_name").is_not_null())) > 0:
            so_names = data_df["service_order_name"].drop_nulls().unique().to_list()
            so_ids = self._perfdb.service_orders.get_ids(names=so_names)
            if wrong_sos := set(so_names) - set(so_ids.keys()):
                raise ValueError(f"Service order names not found in the database: {wrong_sos}")

            data_df = data_df.with_columns(
                pl.col("service_order_name").replace_strict(so_ids, return_dtype=pl.Int64, default=None).alias("service_order_id"),
            )
        else:
            data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("service_order_id"))

    # drop human-readable columns
    data_df = data_df.drop(
        ["storage_location_name", "center_name", "transaction_type_name", "created_by_name", "service_order_name"],
        strict=False,
    )

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} transaction header(s) with IDs: {ids}")

    return ids if not single_insert else ids[0] if ids else None
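
Two illustrative insert calls, one per mode. All names (storage location, center, transaction type, user) must already exist in the database; the values below are made up:

from datetime import date

import polars as pl

# Single insert: individual keyword arguments, returns the new header ID.
new_id = perfdb.inventory.transactions.headers.insert(
    storage_location_name="ALMOX-01",
    center_name="CENTER_A",
    transaction_date=date(2024, 5, 10),
    transaction_type_name="TRANSFERENCIA",
    created_by_name="jane.doe",
)

# Batch insert: a DataFrame with the required columns, returns a list of IDs.
batch = pl.DataFrame(
    {
        "storage_location_name": ["ALMOX-01", "ALMOX-02"],
        "center_name": ["CENTER_A", "CENTER_A"],
        "transaction_date": [date(2024, 5, 10), date(2024, 5, 11)],
        "transaction_type_name": ["TRANSFERENCIA", "DEVOLUCAO"],
        "created_by_name": ["jane.doe", "jane.doe"],
    }
)
new_ids = perfdb.inventory.transactions.headers.insert(data_df=batch)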

update(header_id=None, transaction_date=None, reference_number=None, is_sla_violated=None, notes=None, data_df=None)

Updates an existing inventory transaction header in the database.

Parameters:

  • header_id

    (int | None, default: None ) –

    ID of the header to be updated. Required for single updates. By default None.

  • transaction_date

    (date | None, default: None ) –

    New transaction date. By default None.

  • reference_number

    (str | None, default: None ) –

    New reference number. By default None.

  • is_sla_violated

    (bool | None, default: None ) –

    New SLA violation status. By default None.

  • notes

    (str | None, default: None ) –

    New notes. By default None.

  • data_df

    (pl.DataFrame | None, default: None ) –

    Polars DataFrame containing multiple headers to update. Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def update(
    self,
    header_id: int | None = None,
    transaction_date: date | None = None,
    reference_number: str | None = None,
    is_sla_violated: bool | None = None,
    notes: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory transaction header in the database.

    Parameters
    ----------
    header_id : int | None, optional
        ID of the header to be updated. Required for single updates. By default None.
    transaction_date : date | None, optional
        New transaction date. By default None.
    reference_number : str | None, optional
        New reference number. By default None.
    is_sla_violated : bool | None, optional
        New SLA violation status. By default None.
    notes : str | None, optional
        New notes. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to update.
        Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes.
        If this is used, all individual parameters will be ignored. By default None.
    """
    df_schema = {
        "id": pl.Int64,
        "transaction_date": pl.Date,
        "reference_number": pl.Utf8,
        "is_sla_violated": pl.Boolean,
        "notes": pl.Utf8,
    }

    if data_df is None:
        data_df = pl.DataFrame(
            {
                "id": [header_id],
                "transaction_date": [transaction_date],
                "reference_number": [reference_number],
                "is_sla_violated": [is_sla_violated],
                "notes": [notes],
            },
            schema=df_schema,
        )
        single_update = True
    else:
        single_update = False
        # converting to expected schema
        data_df = data_df.cast({col: dtype for col, dtype in df_schema.items() if col in data_df.columns})

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # check if IDs exist
    existing_query = sql.SQL("SELECT id FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(data_df["id"].to_list()),
    )
    existing_df = self._perfdb.conn.read_to_polars(existing_query, schema_overrides=self._cols_schema)
    if wrong_ids := set(data_df["id"].to_list()) - set(existing_df["id"].to_list()):
        raise ValueError(f"Transaction header IDs {wrong_ids} do not exist in the database.")

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} transaction header(s).")