Skip to content

Inventory Transaction Items

InventoryTransactionItems(perfdb)

Class used for handling Inventory Transaction Items. Can be accessed via perfdb.inventory.transactions.items.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(item_ids)

Deletes inventory transaction items from the database.

Parameters:

  • item_ids

    (list[int]) –

    List of transaction item IDs to be deleted.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def delete(self, item_ids: list[int]) -> int:
    """Deletes inventory transaction items from the database.

    Parameters
    ----------
    item_ids : list[int]
        List of transaction item IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted.
    """
    query = sql.SQL("DELETE FROM performance.inv_transaction_items WHERE id = ANY({ids})").format(
        ids=sql.Literal(item_ids),
    )

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
        deleted = conn.rowcount

    logger.debug(f"Deleted {deleted} transaction item(s).")
    return deleted

get(ids=None, transaction_header_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, material_sap_ids=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory transaction items and their attributes.

The most useful keys/columns returned are:

  • id
  • transaction_header_id
  • transaction_date
  • storage_location_name
  • center_name
  • material_description
  • material_sap_id
  • base_unit
  • transaction_type_name
  • direction
  • signed_quantity
  • absolute_quantity
  • service_order_name
  • creator_name

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of transaction item IDs to filter. By default None.

  • transaction_header_ids

    (list[int] | None, default: None ) –

    List of transaction header IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • material_descriptions

    (list[str] | None, default: None ) –

    List of material descriptions to filter. By default None.

  • material_sap_ids

    (list[int] | None, default: None ) –

    List of material SAP IDs to filter. By default None.

  • transaction_type_names

    (list[str] | None, default: None ) –

    List of transaction type names to filter. By default None.

  • directions

    (list[str] | None, default: None ) –

    List of directions to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by transaction_date. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    transaction_header_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    material_sap_ids: list[int] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory transaction items and their attributes.

    The most useful keys/columns returned are:

    - id
    - transaction_header_id
    - transaction_date
    - storage_location_name
    - center_name
    - material_description
    - material_sap_id
    - base_unit
    - transaction_type_name
    - direction
    - signed_quantity
    - absolute_quantity
    - service_order_name
    - creator_name

    Parameters
    ----------
    ids : list[int] | None, optional
        List of transaction item IDs to filter. By default None.
    transaction_header_ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    material_descriptions : list[str] | None, optional
        List of material descriptions to filter. By default None.
    material_sap_ids : list[int] | None, optional
        List of material SAP IDs to filter. By default None.
    transaction_type_names : list[str] | None, optional
        List of transaction type names to filter. By default None.
    directions : list[str] | None, optional
        List of directions to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    where = self._check_get_args(
        ids=ids,
        transaction_header_ids=transaction_header_ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        material_descriptions=material_descriptions,
        material_sap_ids=material_sap_ids,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT * FROM performance.v_inv_transaction_items {where} ORDER BY transaction_header_id, id").format(
        where=where,
    )

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_polars(query)

    if output_type == "pl.DataFrame":
        return df

    df = df.to_pandas(use_pyarrow_extension_array=True)
    df = df.set_index("id")

    if output_type == "DataFrame":
        return df

    return df.to_dict(orient="index")

get_ids(ids=None, transaction_header_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, material_sap_ids=None, transaction_type_names=None, directions=None, service_order_names=None, period=None, filter_type='and')

Gets all inventory transaction item IDs grouped by header ID.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of transaction item IDs to filter. By default None.

  • transaction_header_ids

    (list[int] | None, default: None ) –

    List of transaction header IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • material_descriptions

    (list[str] | None, default: None ) –

    List of material descriptions to filter. By default None.

  • material_sap_ids

    (list[int] | None, default: None ) –

    List of material SAP IDs to filter. By default None.

  • transaction_type_names

    (list[str] | None, default: None ) –

    List of transaction type names to filter. By default None.

  • directions

    (list[str] | None, default: None ) –

    List of directions to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by transaction_date. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • dict[int, list[int]]

    Dictionary with header IDs as keys and lists of item IDs as values.

Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    transaction_header_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    material_sap_ids: list[int] | None = None,
    transaction_type_names: list[str] | None = None,
    directions: list[str] | None = None,
    service_order_names: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[int, list[int]]:
    """Gets all inventory transaction item IDs grouped by header ID.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of transaction item IDs to filter. By default None.
    transaction_header_ids : list[int] | None, optional
        List of transaction header IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    material_descriptions : list[str] | None, optional
        List of material descriptions to filter. By default None.
    material_sap_ids : list[int] | None, optional
        List of material SAP IDs to filter. By default None.
    transaction_type_names : list[str] | None, optional
        List of transaction type names to filter. By default None.
    directions : list[str] | None, optional
        List of directions to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by transaction_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    dict[int, list[int]]
        Dictionary with header IDs as keys and lists of item IDs as values.
    """
    where = self._check_get_args(
        ids=ids,
        transaction_header_ids=transaction_header_ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        material_descriptions=material_descriptions,
        material_sap_ids=material_sap_ids,
        transaction_type_names=transaction_type_names,
        directions=directions,
        service_order_names=service_order_names,
        period=period,
        filter_type=filter_type,
    )

    query = sql.SQL(
        "SELECT transaction_header_id, id FROM performance.v_inv_transaction_items {where} ORDER BY transaction_header_id, id",
    ).format(where=where)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_polars(query)

    df = df.group_by("transaction_header_id").agg(pl.col("id")).sort("transaction_header_id")
    return dict(zip(df["transaction_header_id"].to_list(), df["id"].to_list(), strict=False))

insert(transaction_header_id=None, material_description=None, quantity=None, created_by_name=None, data_df=None, on_conflict='ignore')

Inserts a new inventory transaction item into the database.

You can either pass individual values to insert a single item, or pass a DataFrame to insert multiple items at once.

Parameters:

  • transaction_header_id

    (int | None, default: None ) –

    ID of the transaction header this item belongs to. By default None.

  • material_description

    (str | None, default: None ) –

    Description of the material. Must exist in inv_materials table. By default None.

  • quantity

    (float | None, default: None ) –

    Quantity of the material. Must be > 0. By default None.

  • created_by_name

    (str | None, default: None ) –

    Name of the user creating the item. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple items to insert. Required columns: transaction_header_id, material_description, quantity, created_by_name. If this is used, all individual parameters will be ignored. By default None.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict (based on transaction_header_id + material_id). By default "ignore".

Returns:

  • int | list[int] | None

    If inserting a single item, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.

Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def insert(
    self,
    transaction_header_id: int | None = None,
    material_description: str | None = None,
    quantity: float | None = None,
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | list[int] | None:
    """Inserts a new inventory transaction item into the database.

    You can either pass individual values to insert a single item, or pass a DataFrame
    to insert multiple items at once.

    Parameters
    ----------
    transaction_header_id : int | None, optional
        ID of the transaction header this item belongs to. By default None.
    material_description : str | None, optional
        Description of the material. Must exist in inv_materials table. By default None.
    quantity : float | None, optional
        Quantity of the material. Must be > 0. By default None.
    created_by_name : str | None, optional
        Name of the user creating the item. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to insert.
        Required columns: transaction_header_id, material_description, quantity, created_by_name.
        If this is used, all individual parameters will be ignored. By default None.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict (based on transaction_header_id + material_id). By default "ignore".

    Returns
    -------
    int | list[int] | None
        If inserting a single item, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.
    """
    df_schema = {
        "transaction_header_id": pl.Int64,
        "material_description": pl.Utf8,
        "quantity": pl.Float64,
        "created_by_name": pl.Utf8,
    }

    if data_df is None:
        single_insert = True
        data_df = pl.DataFrame(
            {
                "transaction_header_id": [transaction_header_id],
                "material_description": [material_description],
                "quantity": [quantity],
                "created_by_name": [created_by_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False

    # check required columns
    required_cols = ["transaction_header_id", "material_description", "quantity", "created_by_name"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if data_df[col].is_null().any():
            raise ValueError(f"data_df column '{col}' contains null values, but it is required.")

    # resolve material_description to material_id
    materials_df = self._perfdb.inventory.materials.get(output_type="pl.DataFrame")
    mat_desc_map = dict(
        zip(materials_df["description"].to_list(), materials_df["id"].to_list(), strict=False),
    )

    if wrong_mats := set(data_df["material_description"].to_list()) - set(mat_desc_map.keys()):
        raise ValueError(f"Material descriptions not found in the database: {wrong_mats}")

    data_df = data_df.with_columns(
        pl.col("material_description").replace_strict(mat_desc_map, return_dtype=pl.Int64).alias("material_id"),
    )

    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))

    data_df = data_df.drop(["material_description", "created_by_name"])

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_items",
        schema="performance",
        conflict_cols=["transaction_header_id", "material_id"],
        return_cols=["id"],
        if_exists="append" if on_conflict == "ignore" else "update",
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} transaction item(s) with IDs: {ids}")

    return ids if not single_insert else ids[0] if ids else None

update(item_id=None, quantity=None, data_df=None)

Updates an existing inventory transaction item in the database.

Parameters:

  • item_id

    (int | None, default: None ) –

    ID of the item to be updated. Required for single updates. By default None.

  • quantity

    (float | None, default: None ) –

    New quantity. Must be > 0. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple items to update. Required column: id. Optional: quantity. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_transaction_items.py
@validate_call
def update(
    self,
    item_id: int | None = None,
    quantity: float | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory transaction item in the database.

    Parameters
    ----------
    item_id : int | None, optional
        ID of the item to be updated. Required for single updates. By default None.
    quantity : float | None, optional
        New quantity. Must be > 0. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to update.
        Required column: id. Optional: quantity.
        If this is used, all individual parameters will be ignored. By default None.
    """
    df_schema = {
        "id": pl.Int64,
        "quantity": pl.Float64,
    }

    if data_df is None:
        data_df = pl.DataFrame(
            {
                "id": [item_id],
                "quantity": [quantity],
            },
            schema=df_schema,
        )
        single_update = True
    else:
        single_update = False

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # check if IDs exist
    existing_query = sql.SQL("SELECT id FROM performance.inv_transaction_items WHERE id = ANY({ids})").format(
        ids=sql.Literal(data_df["id"].to_list()),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(data_df["id"].to_list()) - set(existing_df["id"].to_list()):
        raise ValueError(f"Transaction item IDs {wrong_ids} do not exist in the database.")

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_transaction_items",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} transaction item(s).")