Skip to content

Inventory Physical Count Items

InventoryPhysicalCountItems(perfdb)

Class used for handling Inventory Physical Count Items. Can be accessed via perfdb.inventory.physical_counts.items.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(item_ids)

Deletes inventory physical count items from the database.

Parameters:

  • item_ids

    (list[int]) –

    List of item IDs to be deleted.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def delete(self, item_ids: list[int]) -> int:
    """Deletes inventory physical count items from the database.

    Parameters
    ----------
    item_ids : list[int]
        List of item IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted.
    """
    query = sql.SQL("DELETE FROM performance.inv_physical_count_items WHERE id = ANY({ids})").format(
        ids=sql.Literal(item_ids),
    )

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(query)
        deleted = conn.rowcount

    logger.debug(f"Deleted {deleted} physical count item(s).")
    return deleted

get(ids=None, physical_count_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, count_statuses=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory physical count items and their attributes.

The most useful keys/columns returned are:

  • id
  • physical_count_id
  • count_description
  • count_status
  • executed_date
  • material_description
  • material_sap_id
  • base_unit
  • system_quantity
  • counted_quantity
  • difference_quantity
  • counter_name
  • storage_location_name
  • center_name

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of item IDs to filter. By default None.

  • physical_count_ids

    (list[int] | None, default: None ) –

    List of physical count IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • material_descriptions

    (list[str] | None, default: None ) –

    List of material descriptions to filter. By default None.

  • count_statuses

    (list[str] | None, default: None ) –

    List of count statuses to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    physical_count_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    count_statuses: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory physical count items and their attributes.

    The most useful keys/columns returned are:

    - id
    - physical_count_id
    - count_description
    - count_status
    - executed_date
    - material_description
    - material_sap_id
    - base_unit
    - system_quantity
    - counted_quantity
    - difference_quantity
    - counter_name
    - storage_location_name
    - center_name

    Parameters
    ----------
    ids : list[int] | None, optional
        List of item IDs to filter. By default None.
    physical_count_ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    material_descriptions : list[str] | None, optional
        List of material descriptions to filter. By default None.
    count_statuses : list[str] | None, optional
        List of count statuses to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    where = self._check_get_args(
        ids=ids,
        physical_count_ids=physical_count_ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        material_descriptions=material_descriptions,
        count_statuses=count_statuses,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT * FROM performance.v_inv_physical_count_items {where} ORDER BY physical_count_id, id").format(
        where=where,
    )

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_polars(query)

    if output_type == "pl.DataFrame":
        return df

    df = df.to_pandas(use_pyarrow_extension_array=True)
    df = df.set_index("id")

    if output_type == "DataFrame":
        return df

    return df.to_dict(orient="index")

get_ids(ids=None, physical_count_ids=None, center_names=None, storage_location_names=None, material_descriptions=None, count_statuses=None, filter_type='and')

Gets all inventory physical count item IDs grouped by physical count ID.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of item IDs to filter. By default None.

  • physical_count_ids

    (list[int] | None, default: None ) –

    List of physical count IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • material_descriptions

    (list[str] | None, default: None ) –

    List of material descriptions to filter. By default None.

  • count_statuses

    (list[str] | None, default: None ) –

    List of count statuses to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • dict[int, list[int]]

    Dictionary with physical count IDs as keys and lists of item IDs as values.

Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    physical_count_ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    material_descriptions: list[str] | None = None,
    count_statuses: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[int, list[int]]:
    """Gets all inventory physical count item IDs grouped by physical count ID.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of item IDs to filter. By default None.
    physical_count_ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    material_descriptions : list[str] | None, optional
        List of material descriptions to filter. By default None.
    count_statuses : list[str] | None, optional
        List of count statuses to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".

    Returns
    -------
    dict[int, list[int]]
        Dictionary with physical count IDs as keys and lists of item IDs as values.
    """
    where = self._check_get_args(
        ids=ids,
        physical_count_ids=physical_count_ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        material_descriptions=material_descriptions,
        count_statuses=count_statuses,
        filter_type=filter_type,
    )

    query = sql.SQL(
        "SELECT physical_count_id, id FROM performance.v_inv_physical_count_items {where} ORDER BY physical_count_id, id",
    ).format(where=where)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_polars(query)

    df = df.group_by("physical_count_id").agg(pl.col("id")).sort("physical_count_id")
    return dict(zip(df["physical_count_id"].to_list(), df["id"].to_list(), strict=False))

insert(physical_count_id=None, material_description=None, counted_quantity=None, counted_by_name=None, data_df=None)

Inserts a new inventory physical count item into the database.

You can either pass individual values to insert a single item, or pass a DataFrame to insert multiple items at once.

Parameters:

  • physical_count_id

    (int | None, default: None ) –

    ID of the physical count this item belongs to. By default None.

  • material_description

    (str | None, default: None ) –

    Description of the material. Must exist in inv_materials table. By default None.

  • counted_quantity

    (float | None, default: None ) –

    Counted quantity. Must be >= 0. By default None.

  • counted_by_name

    (str | None, default: None ) –

    Name of the user who counted. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple items to insert. Required columns: physical_count_id, material_description. Optional: counted_quantity, counted_by_name, system_quantity. If this is used, all individual parameters will be ignored. By default None.

Returns:

  • int | list[int] | None

    If inserting a single item, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.

Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def insert(
    self,
    physical_count_id: int | None = None,
    material_description: str | None = None,
    counted_quantity: float | None = None,
    counted_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory physical count item into the database.

    You can either pass individual values to insert a single item, or pass a DataFrame
    to insert multiple items at once.

    Parameters
    ----------
    physical_count_id : int | None, optional
        ID of the physical count this item belongs to. By default None.
    material_description : str | None, optional
        Description of the material. Must exist in inv_materials table. By default None.
    counted_quantity : float | None, optional
        Counted quantity. Must be >= 0. By default None.
    counted_by_name : str | None, optional
        Name of the user who counted. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to insert.
        Required columns: physical_count_id, material_description.
        Optional: counted_quantity, counted_by_name, system_quantity.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single item, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.
    """
    df_schema = {
        "physical_count_id": pl.Int64,
        "material_description": pl.Utf8,
        "counted_quantity": pl.Float64,
        "counted_by_name": pl.Utf8,
    }

    if data_df is None:
        single_insert = True
        data_df = pl.DataFrame(
            {
                "physical_count_id": [physical_count_id],
                "material_description": [material_description],
                "counted_quantity": [counted_quantity],
                "counted_by_name": [counted_by_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False

    required_cols = ["physical_count_id", "material_description", "counted_quantity", "counted_by_name"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if len(data_df.filter(pl.col(col).is_not_null())) == 0:
            raise ValueError(f"data_df column '{col}' cannot be all nulls.")

    # resolve material_description to material_id
    materials_df = self._perfdb.inventory.materials.get(output_type="pl.DataFrame")
    mat_desc_map = dict(
        zip(materials_df["description"].to_list(), materials_df["id"].to_list(), strict=False),
    )

    if wrong_mats := set(data_df["material_description"].drop_nulls().to_list()) - set(mat_desc_map.keys()):
        raise ValueError(f"Material descriptions not found in the database: {wrong_mats}")

    data_df = data_df.with_columns(
        pl.col("material_description").replace_strict(mat_desc_map, return_dtype=pl.Int64).alias("material_id"),
    )

    # resolve counted_by_name to counted_by_id
    if "counted_by_name" in data_df.columns and len(data_df.filter(pl.col("counted_by_name").is_not_null())) > 0:
        user_names = data_df["counted_by_name"].drop_nulls().unique().to_list()
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("counted_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("counted_by_id"),
        )
    else:
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("counted_by_id"))

    data_df = data_df.drop(["material_description", "counted_by_name"])

    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_count_items",
        schema="performance",
        conflict_cols=["physical_count_id", "material_id"],
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} physical count item(s) with IDs: {ids}")

    return ids if not single_insert else ids[0] if ids else None

update(item_id=None, counted_quantity=None, counted_by_name=None, system_quantity=None, data_df=None)

Updates an existing inventory physical count item in the database.

Parameters:

  • item_id

    (int | None, default: None ) –

    ID of the item to be updated. Required for single updates. By default None.

  • counted_quantity

    (float | None, default: None ) –

    New counted quantity. Must be >= 0. By default None.

  • counted_by_name

    (str | None, default: None ) –

    Name of the user who performed the count. Must exist in users table. By default None.

  • system_quantity

    (float | None, default: None ) –

    New system quantity. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple items to update. Required column: id. Optional: counted_quantity, counted_by_name, system_quantity. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_physical_count_items.py
@validate_call
def update(
    self,
    item_id: int | None = None,
    counted_quantity: float | None = None,
    counted_by_name: str | None = None,
    system_quantity: float | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory physical count item in the database.

    Parameters
    ----------
    item_id : int | None, optional
        ID of the item to be updated. Required for single updates. By default None.
    counted_quantity : float | None, optional
        New counted quantity. Must be >= 0. By default None.
    counted_by_name : str | None, optional
        Name of the user who performed the count. Must exist in users table. By default None.
    system_quantity : float | None, optional
        New system quantity. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple items to update.
        Required column: id. Optional: counted_quantity, counted_by_name, system_quantity.
        If this is used, all individual parameters will be ignored. By default None.
    """
    df_schema = {
        "id": pl.Int64,
        "counted_quantity": pl.Float64,
        "counted_by_name": pl.Utf8,
        "system_quantity": pl.Float64,
    }

    if data_df is None:
        data_df = pl.DataFrame(
            {
                "id": [item_id],
                "counted_quantity": [counted_quantity],
                "counted_by_name": [counted_by_name],
                "system_quantity": [system_quantity],
            },
            schema=df_schema,
        )
        single_update = True
    else:
        single_update = False

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # check if IDs exist
    existing_query = sql.SQL("SELECT id FROM performance.inv_physical_count_items WHERE id = ANY({ids})").format(
        ids=sql.Literal(data_df["id"].to_list()),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(data_df["id"].to_list()) - set(existing_df["id"].to_list()):
        raise ValueError(f"Physical count item IDs {wrong_ids} do not exist in the database.")

    # resolve counted_by_name to counted_by_id
    if "counted_by_name" in data_df.columns and len(data_df.filter(pl.col("counted_by_name").is_not_null())) > 0:
        user_names = data_df["counted_by_name"].drop_nulls().unique().to_list()
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("counted_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("counted_by_id"),
        )
    else:
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("counted_by_id"))

    if "counted_by_name" in data_df.columns:
        data_df = data_df.drop(["counted_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_count_items",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} physical count item(s).")