Skip to content

Inventory Physical Counts

InventoryPhysicalCounts(perfdb)

Class used for handling Inventory Physical Counts. Can be accessed via perfdb.inventory.physical_counts.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/inventory_physical_counts.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler for Inventory Physical Counts, reachable as `perfdb.inventory.physical_counts`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # deferred imports — presumably to avoid a circular import at module load; confirm
    from .inventory_physical_count_documents import InventoryPhysicalCountDocuments
    from .inventory_physical_count_items import InventoryPhysicalCountItems

    # * subclasses
    self.items = InventoryPhysicalCountItems(perfdb)
    self.documents = InventoryPhysicalCountDocuments(perfdb)

delete(physical_count_ids)

Deletes inventory physical counts from the database.

Parameters:

  • physical_count_ids

    (list[int]) –

    List of physical count IDs to be deleted.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def delete(self, physical_count_ids: list[int]) -> int:
    """Deletes inventory physical counts from the database.

    Parameters
    ----------
    physical_count_ids : list[int]
        List of physical count IDs to be deleted.

    Returns
    -------
    int
        Number of rows deleted.
    """
    # single statement: delete every row whose id appears in the given list
    stmt = sql.SQL("DELETE FROM performance.inv_physical_counts WHERE id = ANY({ids})").format(
        ids=sql.Literal(physical_count_ids),
    )

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(stmt)
        n_deleted = conn.rowcount

    logger.debug(f"Deleted {n_deleted} physical count(s).")
    return n_deleted

get(ids=None, center_names=None, storage_location_names=None, statuses=None, is_planned=None, period=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory physical counts and their attributes.

The most useful keys/columns returned are:

  • id
  • description
  • is_planned
  • status
  • planned_date
  • executed_date
  • storage_location_name
  • center_name
  • executor_name
  • creator_name
  • display_label

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of physical count IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • statuses

    (list[str] | None, default: None ) –

    List of statuses to filter. By default None.

  • is_planned

    (bool | None, default: None ) –

    Filter by planned status. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by planned_date or executed_date. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • pd.DataFrame (pandas)

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • pl.DataFrame (Polars)

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    statuses: list[str] | None = None,
    is_planned: bool | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Fetches inventory physical counts together with all their attributes.

    Notable keys/columns in the result:

    - id
    - description
    - is_planned
    - status
    - planned_date
    - executed_date
    - storage_location_name
    - center_name
    - executor_name
    - creator_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    is_planned : bool | None, optional
        Filter by planned status. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by planned_date or executed_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to combine multiple filters. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        When output_type is "dict": a mapping {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame with index = id.
    pl.DataFrame
        When output_type is "pl.DataFrame": a Polars DataFrame.
    """
    # translate the filter arguments into a composed WHERE clause
    where_clause = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        statuses=statuses,
        is_planned=is_planned,
        period=period,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT * FROM performance.v_inv_physical_counts {where} ORDER BY id").format(where=where_clause)

    with self._perfdb.conn.reconnect() as conn:
        counts = conn.read_to_polars(query)

    if output_type == "pl.DataFrame":
        return counts

    # both remaining output types go through the id-indexed pandas form
    pandas_counts = counts.to_pandas(use_pyarrow_extension_array=True).set_index("id")

    return pandas_counts if output_type == "DataFrame" else pandas_counts.to_dict(orient="index")

get_ids(ids=None, center_names=None, storage_location_names=None, statuses=None, is_planned=None, period=None, filter_type='and')

Gets all inventory physical count IDs matching the provided filters.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of physical count IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • statuses

    (list[str] | None, default: None ) –

    List of statuses to filter. By default None.

  • is_planned

    (bool | None, default: None ) –

    Filter by planned status. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by planned_date or executed_date. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • list[int]

    List of physical count IDs matching the filters.

Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    statuses: list[str] | None = None,
    is_planned: bool | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Fetches the IDs of all inventory physical counts matching the provided filters.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of physical count IDs to filter. By default None.
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. By default None.
    is_planned : bool | None, optional
        Filter by planned status. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by planned_date or executed_date. By default None.
    filter_type : Literal["and", "or"], optional
        How to combine multiple filters. By default "and".

    Returns
    -------
    list[int]
        List of physical count IDs matching the filters.
    """
    # same filter resolution as get(), but only the id column is selected
    where_clause = self._check_get_args(
        ids=ids,
        center_names=center_names,
        storage_location_names=storage_location_names,
        statuses=statuses,
        is_planned=is_planned,
        period=period,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT id FROM performance.v_inv_physical_counts {where} ORDER BY id").format(where=where_clause)

    with self._perfdb.conn.reconnect() as conn:
        id_frame = conn.read_to_polars(query)

    return id_frame["id"].to_list()

insert(storage_location_name=None, center_name=None, description=None, is_planned=True, status='PROGRAMADA', planned_date=None, created_by_name=None, data_df=None)

Inserts a new inventory physical count into the database.

You can either pass individual values to insert a single physical count, or pass a DataFrame to insert multiple at once.

Parameters:

  • storage_location_name

    (str | None, default: None ) –

    Name of the storage location. By default None.

  • center_name

    (str | None, default: None ) –

    Name of the center (used together with storage_location_name to resolve storage location ID). By default None.

  • description

    (str | None, default: None ) –

    Description of the physical count. By default None.

  • is_planned

    (bool, default: True ) –

    Whether the physical count is planned. By default True.

  • status

    (str, default: 'PROGRAMADA' ) –

    Status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default "PROGRAMADA".

  • planned_date

    (date | None, default: None ) –

    Planned date for the count. By default None.

  • created_by_name

    (str | None, default: None ) –

    Name of the user creating the count. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple physical counts to insert. Required columns: storage_location_name, center_name, description, created_by_name. Optional: is_planned, status, planned_date. If this is used, all individual parameters will be ignored. By default None.

Returns:

  • int | list[int] | None

    If inserting a single count, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.

Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def insert(
    self,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    description: str | None = None,
    is_planned: bool = True,
    status: str = "PROGRAMADA",
    planned_date: date | None = None,
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory physical count into the database.

    You can either pass individual values to insert a single physical count, or pass a DataFrame
    to insert multiple at once.

    Parameters
    ----------
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
    description : str | None, optional
        Description of the physical count. By default None.
    is_planned : bool, optional
        Whether the physical count is planned. By default True.
    status : str, optional
        Status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default "PROGRAMADA".
    planned_date : date | None, optional
        Planned date for the count. By default None.
    created_by_name : str | None, optional
        Name of the user creating the count. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple physical counts to insert.
        Required columns: storage_location_name, center_name, description, created_by_name.
        Optional: is_planned, status, planned_date.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single count, returns the ID.
        If inserting multiple, returns a list of IDs.
        Returns None if nothing was inserted.

    Raises
    ------
    ValueError
        If a required column is missing or all-null, or if a storage location or
        user name cannot be resolved against the database.
    """
    # canonical column names/dtypes; used when wrapping the scalar arguments below
    df_schema = {
        "storage_location_name": pl.Utf8,
        "center_name": pl.Utf8,
        "description": pl.Utf8,
        "is_planned": pl.Boolean,
        "status": pl.Utf8,
        "planned_date": pl.Date,
        "created_by_name": pl.Utf8,
    }

    if data_df is None:
        # single-row mode: build a one-row frame from the individual parameters
        single_insert = True
        data_df = pl.DataFrame(
            {
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "description": [description],
                "is_planned": [is_planned],
                "status": [status],
                "planned_date": [planned_date],
                "created_by_name": [created_by_name],
            },
            schema=df_schema,
        )
    else:
        single_insert = False

    # each required column must be present and hold at least one non-null value
    # (note: this also applies to the single-row frame built above)
    required_cols = ["storage_location_name", "center_name", "description", "created_by_name"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")
        if len(data_df.filter(pl.col(col).is_not_null())) == 0:
            raise ValueError(f"data_df column '{col}' cannot be all nulls.")

    # resolve storage_location_id
    # get_ids() returns a nested mapping {center: {location: id}}; flatten to
    # {(center, location): id} so each row's pair can be looked up directly
    sl_ids_nested = self._perfdb.inventory.storage_locations.get_ids()
    sl_ids_flat = {(c, loc): loc_id for c, locs in sl_ids_nested.items() for loc, loc_id in locs.items()}

    # pair each row's center/location; both come from the same frame so lengths match
    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    if wrong_sls := set(pairs) - set(sl_ids_flat.keys()):
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")

    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [sl_ids_flat[(c, loc)] for c, loc in pairs], dtype=pl.Int64),
    )

    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        # no creator names supplied at all: carry an all-null id column instead
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))

    # drop human-readable columns — only the resolved id columns go to the table
    data_df = data_df.drop(["storage_location_name", "center_name", "created_by_name"])

    # ignore_null_cols only for single inserts — presumably so unset scalar
    # parameters fall back to DB defaults; confirm polars_to_sql semantics
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_counts",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} physical count(s) with IDs: {ids}")

    # single insert: unwrap to a scalar id (or None when nothing came back)
    return ids if not single_insert else ids[0] if ids else None

update(physical_count_id=None, description=None, status=None, planned_date=None, executed_date=None, executed_by_name=None, data_df=None)

Updates an existing inventory physical count in the database.

Parameters:

  • physical_count_id

    (int | None, default: None ) –

    ID of the physical count to be updated. Required for single updates. By default None.

  • description

    (str | None, default: None ) –

    New description. By default None.

  • status

    (str | None, default: None ) –

    New status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default None.

  • planned_date

    (date | None, default: None ) –

    New planned date. By default None.

  • executed_date

    (date | None, default: None ) –

    New executed date. By default None.

  • executed_by_name

    (str | None, default: None ) –

    Name of the user who executed the count. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple physical counts to update. Required column: id. Optional: description, status, planned_date, executed_date, executed_by_name. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_physical_counts.py
@validate_call
def update(
    self,
    physical_count_id: int | None = None,
    description: str | None = None,
    status: str | None = None,
    planned_date: date | None = None,
    executed_date: date | None = None,
    executed_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory physical count in the database.

    Parameters
    ----------
    physical_count_id : int | None, optional
        ID of the physical count to be updated. Required for single updates. By default None.
    description : str | None, optional
        New description. By default None.
    status : str | None, optional
        New status. Valid values: PROGRAMADA, INICIADA, CONTADA, ENCERRADA, CANCELADA. By default None.
    planned_date : date | None, optional
        New planned date. By default None.
    executed_date : date | None, optional
        New executed date. By default None.
    executed_by_name : str | None, optional
        Name of the user who executed the count. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple physical counts to update.
        Required column: id. Optional: description, status, planned_date, executed_date, executed_by_name.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If the 'id' column is missing, an ID does not exist in the database,
        or an executed_by_name cannot be resolved to a user.
    """
    # canonical column names/dtypes; used when wrapping the scalar arguments below
    df_schema = {
        "id": pl.Int64,
        "description": pl.Utf8,
        "status": pl.Utf8,
        "planned_date": pl.Date,
        "executed_date": pl.Date,
        "executed_by_name": pl.Utf8,
    }

    if data_df is None:
        # single-row mode: build a one-row frame from the individual parameters
        data_df = pl.DataFrame(
            {
                "id": [physical_count_id],
                "description": [description],
                "status": [status],
                "planned_date": [planned_date],
                "executed_date": [executed_date],
                "executed_by_name": [executed_by_name],
            },
            schema=df_schema,
        )
        single_update = True
    else:
        single_update = False

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # check if IDs exist
    existing_query = sql.SQL("SELECT id FROM performance.inv_physical_counts WHERE id = ANY({ids})").format(
        ids=sql.Literal(data_df["id"].to_list()),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(data_df["id"].to_list()) - set(existing_df["id"].to_list()):
        raise ValueError(f"Physical count IDs {wrong_ids} do not exist in the database.")

    # resolve executed_by_name to executed_by_id
    if "executed_by_name" in data_df.columns and len(data_df.filter(pl.col("executed_by_name").is_not_null())) > 0:
        user_names = data_df["executed_by_name"].drop_nulls().unique().to_list()
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        # default=None keeps rows whose executed_by_name is null as null ids
        data_df = data_df.with_columns(
            pl.col("executed_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("executed_by_id"),
        )
    else:
        # NOTE(review): this adds an all-null executed_by_id column even for
        # multi-row updates that never mentioned executed_by_name; with
        # ignore_null_cols=False below it may overwrite existing values with
        # null — confirm polars_to_sql semantics
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("executed_by_id"))

    # drop the human-readable column — only the resolved id column goes to the table
    if "executed_by_name" in data_df.columns:
        data_df = data_df.drop(["executed_by_name"])

    # ignore_null_cols only for single updates — presumably so unset scalar
    # parameters leave the row's current values untouched; confirm
    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_physical_counts",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} physical count(s).")