Skip to content

Inventory Transaction Headers

InventoryTransactionHeaders(perfdb)

Class used for handling Inventory Transaction Headers. Can be accessed via perfdb.inventory.transactions.headers.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/inventory_transaction_headers.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler for Inventory Transaction Headers, reachable through `perfdb.inventory.transactions.headers`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # NOTE(review): imported inside the constructor rather than at module level,
    # presumably to avoid a circular import - confirm before moving it.
    from .inventory_transaction_header_documents import (
        InventoryTransactionHeaderDocuments as _HeaderDocuments,
    )

    # * subclasses

    # handler for the documents attached to transaction headers
    self.documents = _HeaderDocuments(perfdb)

delete(header_ids)

Deletes inventory transaction headers from the database.

Parameters:

  • header_ids

    (list[int]) –

    List of transaction header IDs to be deleted.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def delete(self, header_ids: list[int]) -> int:
    """Deletes inventory transaction headers from the database.

    Parameters
    ----------
    header_ids : list[int]
        List of transaction header IDs to be deleted. An empty list is a no-op.

    Returns
    -------
    int
        Number of rows deleted (0 when `header_ids` is empty).
    """
    # an empty list cannot match any row, so skip the database round-trip entirely
    if not header_ids:
        return 0

    query = sql.SQL("DELETE FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(header_ids),
    )

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
        # row count reported by the connection right after the DELETE
        deleted = conn.rowcount

    logger.debug(f"Deleted {deleted} transaction header(s).")
    return deleted

get(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory transaction headers and their attributes.

The most useful keys/columns returned are:

  • id
  • transaction_date
  • storage_location_name
  • center_name
  • transaction_type_name
  • direction
  • withdrawal_id
  • service_order_name
  • reference_number
  • is_sla_violated
  • notes
  • creator_name
  • display_label

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of transaction header IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • transaction_type_names

    (list[str] | None, default: None ) –

    List of transaction type names to filter. By default None.

  • directions

    (list[str] | None, default: None ) –

    List of directions to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by transaction_date. By default None.

  • notes_search

    (str | None, default: None ) –

    Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Fetches inventory transaction headers, with their attributes, that match the given filters.

    Rows are read from the `performance.v_inv_transaction_headers` view ordered by `id`.
    The most useful keys/columns returned are:

    - id
    - transaction_date
    - storage_location_name
    - center_name
    - transaction_type_name
    - direction
    - withdrawal_id
    - service_order_name
    - reference_number
    - is_sla_violated
    - notes
    - creator_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        Transaction header IDs to keep. By default None.
    center_names : list[str] | None, optional
        Center names to keep. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to keep. By default None.
    transaction_type_names : list[str] | None, optional
        Transaction type names to keep. By default None.
    directions : list[str] | None, optional
        Directions to keep. By default None.
    service_order_names : list[str] | None, optional
        Service order names to keep. By default None.
    period : DateTimeRange | None, optional
        Date range applied to transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Shape of the returned data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        When output_type is "dict": a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame with index = id.
    pl.DataFrame
        When output_type is "pl.DataFrame": a Polars DataFrame.
    """
    filters = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "transaction_type_names": transaction_type_names,
        "directions": directions,
        "service_order_names": service_order_names,
        "period": period,
        "notes_search": notes_search,
        "filter_type": filter_type,
    }
    # the WHERE clause is assembled by the shared argument checker
    where = self._check_get_args(**filters)

    template = sql.SQL("SELECT * FROM performance.v_inv_transaction_headers {where} ORDER BY id")
    query = template.format(where=where)

    with self._perfdb.conn.reconnect() as conn:
        headers = conn.read_to_polars(query)

    # notes_searchable is removed so it is never handed back to the caller
    headers = headers.drop("notes_searchable")

    if output_type == "pl.DataFrame":
        return headers

    # both remaining output types are derived from the id-indexed pandas frame
    pandas_headers = headers.to_pandas(use_pyarrow_extension_array=True).set_index("id")

    return pandas_headers if output_type == "DataFrame" else pandas_headers.to_dict(orient="index")

get_ids(ids=None, center_names=None, storage_location_names=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, notes_search=None, filter_type='and')

Gets all inventory transaction header IDs matching the provided filters.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of transaction header IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • transaction_type_names

    (list[str] | None, default: None ) –

    List of transaction type names to filter. By default None.

  • directions

    (list[str] | None, default: None ) –

    List of directions to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by transaction_date. By default None.

  • notes_search

    (str | None, default: None ) –

    Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • list[int]

    List of transaction header IDs matching the filters.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    notes_search: str | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Fetches the IDs of the inventory transaction headers that match the given filters.

    Only the `id` column of the `performance.v_inv_transaction_headers` view is read, ordered by `id`.

    Parameters
    ----------
    ids : list[int] | None, optional
        Transaction header IDs to keep. By default None.
    center_names : list[str] | None, optional
        Center names to keep. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to keep. By default None.
    transaction_type_names : list[str] | None, optional
        Transaction type names to keep. By default None.
    directions : list[str] | None, optional
        Directions to keep. By default None.
    service_order_names : list[str] | None, optional
        Service order names to keep. By default None.
    period : DateTimeRange | None, optional
        Date range applied to transaction_date. By default None.
    notes_search : str | None, optional
        Plain-text search string matched against the notes_searchable tsvector column (GIN index, Portuguese language). By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".

    Returns
    -------
    list[int]
        IDs of the transaction headers matching the filters.
    """
    filters = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "transaction_type_names": transaction_type_names,
        "directions": directions,
        "service_order_names": service_order_names,
        "period": period,
        "notes_search": notes_search,
        "filter_type": filter_type,
    }
    # the WHERE clause is assembled by the shared argument checker
    where = self._check_get_args(**filters)

    template = sql.SQL("SELECT id FROM performance.v_inv_transaction_headers {where} ORDER BY id")
    query = template.format(where=where)

    with self._perfdb.conn.reconnect() as conn:
        matching = conn.read_to_polars(query)

    id_column = matching["id"]
    return id_column.to_list()

insert(storage_location_name=None, center_name=None, transaction_date=None, transaction_type_name=None, reference_number=None, withdrawal_id=None, notes=None, created_by_name=None, service_order_name=None, data_df=None)

Inserts a new inventory transaction header into the database.

You can either pass individual values to insert a single header, or pass a DataFrame to insert multiple headers at once.

Parameters:

  • storage_location_name

    (str | None, default: None ) –

    Name of the storage location. By default None.

  • center_name

    (str | None, default: None ) –

    Name of the center (used together with storage_location_name to resolve the storage location ID). By default None.

  • transaction_date

    (date | None, default: None ) –

    Date of the transaction. By default None.

  • transaction_type_name

    (str | None, default: None ) –

    Name of the transaction type. Must exist in inv_transaction_types table. By default None.

  • reference_number

    (str | None, default: None ) –

    Optional reference number for the transaction. By default None.

  • withdrawal_id

    (int | None, default: None ) –

    Optional ID of an associated withdrawal. By default None.

  • notes

    (str | None, default: None ) –

    Optional notes. By default None.

  • created_by_name

    (str | None, default: None ) –

    Name of the user creating the transaction. Must exist in users table. By default None.

  • service_order_name

    (str | None, default: None ) –

    Name of the associated service order. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple headers to be inserted. Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name. Optional: reference_number, withdrawal_id, notes, service_order_name. If this is used, all individual parameters will be ignored. By default None.

Returns:

  • int | list[int] | None

    If inserting a single header, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def insert(
    self,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    transaction_date: date | None = None,
    transaction_type_name: str | None = None,
    reference_number: str | None = None,
    withdrawal_id: int | None = None,
    notes: str | None = None,
    created_by_name: str | None = None,
    service_order_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts one or many inventory transaction headers into the database.

    A single header is built from the individual arguments. When `data_df` is given, every row of
    the DataFrame is inserted instead and the individual arguments are ignored.

    Parameters
    ----------
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve the storage location ID). By default None.
    transaction_date : date | None, optional
        Date of the transaction. By default None.
    transaction_type_name : str | None, optional
        Name of the transaction type. Must exist in inv_transaction_types table. By default None.
    reference_number : str | None, optional
        Optional reference number for the transaction. By default None.
    withdrawal_id : int | None, optional
        Optional ID of an associated withdrawal. By default None.
    notes : str | None, optional
        Optional notes. By default None.
    created_by_name : str | None, optional
        Name of the user creating the transaction. Must exist in users table. By default None.
    service_order_name : str | None, optional
        Name of the associated service order. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to be inserted.
        Required columns: storage_location_name, center_name, transaction_date, transaction_type_name, created_by_name.
        Optional: reference_number, withdrawal_id, notes, service_order_name.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        Single insert: the ID of the new header, or None when no ID came back.
        Insert from `data_df`: the list of IDs that came back.

    Raises
    ------
    ValueError
        If a required column is missing or holds nulls, or if a storage location, transaction type,
        user or service order name is not found in the database.
    """
    single_insert = data_df is None

    if single_insert:
        # one-row frame built from the individual arguments, with an explicit schema
        single_values = {
            "storage_location_name": (storage_location_name, pl.Utf8),
            "center_name": (center_name, pl.Utf8),
            "transaction_date": (transaction_date, pl.Date),
            "transaction_type_name": (transaction_type_name, pl.Utf8),
            "reference_number": (reference_number, pl.Utf8),
            "withdrawal_id": (withdrawal_id, pl.Int64),
            "notes": (notes, pl.Utf8),
            "created_by_name": (created_by_name, pl.Utf8),
            "service_order_name": (service_order_name, pl.Utf8),
        }
        data_df = pl.DataFrame(
            {name: [value] for name, (value, _) in single_values.items()},
            schema={name: dtype for name, (_, dtype) in single_values.items()},
        )

    # every required column must be present and fully populated
    for col in ("storage_location_name", "center_name", "transaction_date", "transaction_type_name", "created_by_name"):
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"data_df column '{col}' contains null values, but it is required.")

    # storage locations: flatten {center: {location: id}} into {(center, location): id}
    location_ids = {}
    for center, locations in self._perfdb.inventory.storage_locations.get_ids().items():
        for location, location_id in locations.items():
            location_ids[(center, location)] = location_id

    center_values = data_df["center_name"].to_list()
    location_values = data_df["storage_location_name"].to_list()
    pairs = list(zip(center_values, location_values, strict=False))

    wrong_sls = set(pairs) - set(location_ids)
    if wrong_sls:
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")

    storage_location_id = pl.Series("storage_location_id", [location_ids[pair] for pair in pairs], dtype=pl.Int64)
    data_df = data_df.with_columns(storage_location_id)

    # transaction types: name -> id
    type_ids = self._perfdb.inventory.transactions.types.get_ids()
    wrong_types = set(data_df["transaction_type_name"].to_list()) - set(type_ids.keys())
    if wrong_types:
        raise ValueError(f"Transaction type names not found in the database: {wrong_types}")

    transaction_type_id = pl.col("transaction_type_name").replace_strict(type_ids, return_dtype=pl.Int64)
    data_df = data_df.with_columns(transaction_type_id.alias("transaction_type_id"))

    # users: name -> id (an all-null id column is used when there are no names to resolve)
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if not user_names:
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))
    else:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        wrong_users = set(user_names) - set(user_ids.keys())
        if wrong_users:
            raise ValueError(f"User names not found in the database: {wrong_users}")

        created_by_id = pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64)
        data_df = data_df.with_columns(created_by_id.alias("created_by_id"))

    # service orders: optional column, name -> id with nulls kept as nulls
    if "service_order_name" in data_df.columns:
        so_names = data_df["service_order_name"].drop_nulls().unique().to_list()
        if not so_names:
            data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("service_order_id"))
        else:
            so_ids = self._perfdb.service_orders.get_ids(names=so_names)
            wrong_sos = set(so_names) - set(so_ids.keys())
            if wrong_sos:
                raise ValueError(f"Service order names not found in the database: {wrong_sos}")

            service_order_id = pl.col("service_order_name").replace_strict(so_ids, return_dtype=pl.Int64, default=None)
            data_df = data_df.with_columns(service_order_id.alias("service_order_id"))

    # the human-readable columns have been replaced by their *_id counterparts
    name_columns = ["storage_location_name", "center_name", "transaction_type_name", "created_by_name", "service_order_name"]
    data_df = data_df.drop(name_columns, strict=False)

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} transaction header(s) with IDs: {ids}")

    if not single_insert:
        return ids
    return ids[0] if ids else None

update(header_id=None, transaction_date=None, reference_number=None, is_sla_violated=None, notes=None, data_df=None)

Updates an existing inventory transaction header in the database.

Parameters:

  • header_id

    (int | None, default: None ) –

    ID of the header to be updated. Required for single updates. By default None.

  • transaction_date

    (date | None, default: None ) –

    New transaction date. By default None.

  • reference_number

    (str | None, default: None ) –

    New reference number. By default None.

  • is_sla_violated

    (bool | None, default: None ) –

    New SLA violation status. By default None.

  • notes

    (str | None, default: None ) –

    New notes. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple headers to update. Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_transaction_headers.py
@validate_call
def update(
    self,
    header_id: int | None = None,
    transaction_date: date | None = None,
    reference_number: str | None = None,
    is_sla_violated: bool | None = None,
    notes: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory transaction header in the database.

    Parameters
    ----------
    header_id : int | None, optional
        ID of the header to be updated. Required for single updates. By default None.
    transaction_date : date | None, optional
        New transaction date. By default None.
    reference_number : str | None, optional
        New reference number. By default None.
    is_sla_violated : bool | None, optional
        New SLA violation status. By default None.
    notes : str | None, optional
        New notes. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple headers to update.
        Required column: id. Optional: transaction_date, reference_number, is_sla_violated, notes.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If `header_id` is missing on a single update, if `data_df` has no `id` column or holds null
        IDs, or if any of the IDs does not exist in the database.
    """
    single_update = data_df is None

    if single_update:
        # fail fast: a null ID can never match a row, so there is no point in querying the database
        if header_id is None:
            raise ValueError("header_id is required when data_df is not provided.")

        df_schema = {
            "id": pl.Int64,
            "transaction_date": pl.Date,
            "reference_number": pl.Utf8,
            "is_sla_violated": pl.Boolean,
            "notes": pl.Utf8,
        }
        data_df = pl.DataFrame(
            {
                "id": [header_id],
                "transaction_date": [transaction_date],
                "reference_number": [reference_number],
                "is_sla_violated": [is_sla_violated],
                "notes": [notes],
            },
            schema=df_schema,
        )

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")
    # same wording as the required-column check used when inserting headers
    if data_df["id"].is_null().any():
        raise ValueError("data_df column 'id' contains null values, but it is required.")

    # check if IDs exist (the list is materialised once and reused for the comparison below)
    requested_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_transaction_headers WHERE id = ANY({ids})").format(
        ids=sql.Literal(requested_ids),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(requested_ids) - set(existing_df["id"].to_list()):
        raise ValueError(f"Transaction header IDs {wrong_ids} do not exist in the database.")

    # NOTE(review): ignore_null_cols is only enabled for single updates, presumably so that
    # arguments left as None are not written - confirm against polars_to_sql.
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_headers",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} transaction header(s).")