Skip to content

Inventory Withdrawals

InventoryWithdrawals(perfdb)

Class used for handling Inventory Withdrawals. Can be accessed via perfdb.inventory.withdrawals.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/inventory_withdrawals.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Handler for Inventory Withdrawals, reachable through `perfdb.inventory.withdrawals`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # Imported at call time instead of module import time
    # (presumably to avoid a circular import between the two modules - TODO confirm).
    from .inventory_withdrawal_items import InventoryWithdrawalItems as _WithdrawalItems

    # * subclasses

    # handler for the items of a withdrawal, exposed as `.items`
    self.items = _WithdrawalItems(perfdb)

delete(withdrawal_ids)

Deletes inventory withdrawals from the database.

Parameters:

  • withdrawal_ids

    (list[int]) –

    List of withdrawal IDs to be deleted.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def delete(self, withdrawal_ids: list[int]) -> int:
    """Removes the given inventory withdrawals from `performance.inv_withdrawals`.

    Parameters
    ----------
    withdrawal_ids : list[int]
        IDs of the withdrawals that should be deleted.

    Returns
    -------
    int
        Row count reported by the connection after the DELETE statement ran.
    """
    # the id list is embedded as a SQL literal and matched with ANY(...)
    ids_literal = sql.Literal(withdrawal_ids)
    statement = sql.SQL("DELETE FROM performance.inv_withdrawals WHERE id = ANY({ids})")
    statement = statement.format(ids=ids_literal)

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(statement)
        # read the row count while the connection is still open
        n_deleted = conn.rowcount

    logger.debug(f"Deleted {n_deleted} withdrawal(s).")
    return n_deleted

get(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory withdrawals and their attributes.

The most useful keys/columns returned are:

  • id
  • service_order_name
  • service_order_sap_id
  • storage_location_name
  • center_name
  • withdrawal_date
  • status
  • creator_name
  • modifier_name
  • display_label

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of withdrawal IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • statuses

    (list[str] | None, default: None ) –

    List of statuses to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by withdrawal_date. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Reads inventory withdrawals and their attributes from the `performance.v_inv_withdrawals` view.

    Rows are returned ordered by id. The most useful keys/columns returned are:

    - id
    - service_order_name
    - service_order_sap_id
    - storage_location_name
    - center_name
    - withdrawal_date
    - status
    - creator_name
    - modifier_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        Withdrawal IDs to filter by. By default None.
    center_names : list[str] | None, optional
        Center names to filter by. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to filter by. By default None.
    service_order_names : list[str] | None, optional
        Service order names to filter by. By default None.
    statuses : list[str] | None, optional
        Statuses to filter by. By default None.
    period : DateTimeRange | None, optional
        Date range applied to withdrawal_date. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Desired container for the result. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        When output_type is "dict": a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame with index = id.
    pl.DataFrame
        When output_type is "pl.DataFrame": the Polars DataFrame as read from the database.
    """
    # all filter arguments are forwarded untouched to the shared WHERE-clause builder
    filters = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "service_order_names": service_order_names,
        "statuses": statuses,
        "period": period,
        "filter_type": filter_type,
    }
    where_clause = self._check_get_args(**filters)

    select_stmt = sql.SQL("SELECT * FROM performance.v_inv_withdrawals {where} ORDER BY id").format(
        where=where_clause,
    )

    with self._perfdb.conn.reconnect() as conn:
        polars_df = conn.read_to_polars(select_stmt)

    if output_type == "pl.DataFrame":
        return polars_df

    # both remaining output types are derived from the id-indexed pandas frame
    pandas_df = polars_df.to_pandas(use_pyarrow_extension_array=True).set_index("id")

    return pandas_df if output_type == "DataFrame" else pandas_df.to_dict(orient="index")

get_ids(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, filter_type='and')

Gets all inventory withdrawal IDs matching the provided filters.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of withdrawal IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • statuses

    (list[str] | None, default: None ) –

    List of statuses to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by withdrawal_date. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • list[int]

    List of withdrawal IDs matching the filters.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Returns the IDs of the inventory withdrawals that match the provided filters.

    The IDs are read from the `performance.v_inv_withdrawals` view, ordered by id.

    Parameters
    ----------
    ids : list[int] | None, optional
        Withdrawal IDs to filter by. By default None.
    center_names : list[str] | None, optional
        Center names to filter by. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to filter by. By default None.
    service_order_names : list[str] | None, optional
        Service order names to filter by. By default None.
    statuses : list[str] | None, optional
        Statuses to filter by. By default None.
    period : DateTimeRange | None, optional
        Date range applied to withdrawal_date. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".

    Returns
    -------
    list[int]
        IDs of the withdrawals matching the filters.
    """
    # all filter arguments are forwarded untouched to the shared WHERE-clause builder
    filters = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "service_order_names": service_order_names,
        "statuses": statuses,
        "period": period,
        "filter_type": filter_type,
    }
    where_clause = self._check_get_args(**filters)

    select_stmt = sql.SQL("SELECT id FROM performance.v_inv_withdrawals {where} ORDER BY id").format(
        where=where_clause,
    )

    with self._perfdb.conn.reconnect() as conn:
        result_df = conn.read_to_polars(select_stmt)

    id_column = result_df["id"]
    return id_column.to_list()

insert(service_order_name=None, storage_location_name=None, center_name=None, withdrawal_date=None, status='ABERTA', created_by_name=None, data_df=None)

Inserts a new inventory withdrawal into the database.

You can either pass individual values to insert a single withdrawal, or pass a DataFrame to insert multiple withdrawals at once.

Parameters:

  • service_order_name

    (str | None, default: None ) –

    Name of the service order. Must exist in service_orders table. By default None.

  • storage_location_name

    (str | None, default: None ) –

    Name of the storage location. By default None.

  • center_name

    (str | None, default: None ) –

    Name of the center (used together with storage_location_name to resolve storage location ID). By default None.

  • withdrawal_date

    (date | None, default: None ) –

    Date of the withdrawal. By default None.

  • status

    (str, default: 'ABERTA' ) –

    Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default "ABERTA".

  • created_by_name

    (str | None, default: None ) –

    Name of the user creating the withdrawal. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple withdrawals to insert. Required columns: service_order_name, storage_location_name, center_name, withdrawal_date, created_by_name. Optional: status. If this is used, all individual parameters will be ignored. By default None.

Returns:

  • int | list[int] | None

    If inserting a single withdrawal, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def insert(
    self,
    service_order_name: str | None = None,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    withdrawal_date: date | None = None,
    status: str = "ABERTA",
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts one or more inventory withdrawals into `performance.inv_withdrawals`.

    Either pass the individual values to insert a single withdrawal, or pass `data_df`
    to insert several withdrawals in one call. Names are resolved to database IDs before
    the rows are written.

    Parameters
    ----------
    service_order_name : str | None, optional
        Name of the service order. Must be known to `perfdb.service_orders`. By default None.
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve the storage location ID).
        By default None.
    withdrawal_date : date | None, optional
        Date of the withdrawal. By default None.
    status : str, optional
        Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA
        (not validated by this method). By default "ABERTA".
    created_by_name : str | None, optional
        Name of the user creating the withdrawal. Must be known to `perfdb.users.instances`. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to insert.
        Required columns: service_order_name, storage_location_name, center_name, withdrawal_date, created_by_name.
        Optional: status.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        For a single insert, the ID of the new withdrawal, or None if no ID was returned.
        When `data_df` is used, the list of returned IDs.

    Raises
    ------
    ValueError
        If a required column is missing or contains nulls, or if a storage location,
        service order or user name cannot be resolved.
    """
    single_insert = data_df is None
    if single_insert:
        # wrap the scalar arguments into a one-row frame so both modes share the same code path
        data_df = pl.DataFrame(
            {
                "service_order_name": [service_order_name],
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "withdrawal_date": [withdrawal_date],
                "status": [status],
                "created_by_name": [created_by_name],
            },
            schema={
                "service_order_name": pl.Utf8,
                "storage_location_name": pl.Utf8,
                "center_name": pl.Utf8,
                "withdrawal_date": pl.Date,
                "status": pl.Utf8,
                "created_by_name": pl.Utf8,
            },
        )

    # check required columns: each one must be present and fully populated
    for required in ("storage_location_name", "center_name", "withdrawal_date", "created_by_name", "service_order_name"):
        if required not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{required}'.")
        if data_df[required].null_count() > 0:
            raise ValueError(f"data_df column '{required}' contains null values, which are not allowed.")

    # resolve storage_location_id: flatten {center: {location: id}} into {(center, location): id}
    location_ids = {}
    for center, locations in self._perfdb.inventory.storage_locations.get_ids().items():
        for location, location_id in locations.items():
            location_ids[(center, location)] = location_id

    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    wrong_sls = set(pairs) - set(location_ids)
    if wrong_sls:
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")

    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [location_ids[pair] for pair in pairs], dtype=pl.Int64),
    )

    # resolve service_order_name to service_order_id
    so_names = data_df["service_order_name"].drop_nulls().unique().to_list()
    if not so_names:
        # nothing to look up (frame without rows): keep the column with the expected dtype
        service_order_id = pl.lit(None).cast(pl.Int64)
    else:
        so_ids = self._perfdb.service_orders.get_ids(names=so_names)
        wrong_sos = set(so_names) - set(so_ids.keys())
        if wrong_sos:
            raise ValueError(f"Service order names not found in the database: {wrong_sos}")
        service_order_id = pl.col("service_order_name").replace_strict(so_ids, return_dtype=pl.Int64)
    data_df = data_df.with_columns(service_order_id.alias("service_order_id"))

    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if not user_names:
        created_by_id = pl.lit(None).cast(pl.Int64)
    else:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        wrong_users = set(user_names) - set(user_ids.keys())
        if wrong_users:
            raise ValueError(f"User names not found in the database: {wrong_users}")
        created_by_id = pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64)
    data_df = data_df.with_columns(created_by_id.alias("created_by_id"))

    # drop human-readable columns, only the resolved IDs are written to the table
    data_df = data_df.drop(["service_order_name", "storage_location_name", "center_name", "created_by_name"])

    inserted_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )

    new_ids = inserted_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(new_ids)} withdrawal(s) with IDs: {new_ids}")

    if not single_insert:
        return new_ids
    # single insert: unwrap the only ID, or signal that nothing came back
    return new_ids[0] if new_ids else None

update(withdrawal_id=None, status=None, withdrawal_date=None, modified_by_name=None, data_df=None)

Updates an existing inventory withdrawal in the database.

Parameters:

  • withdrawal_id

    (int | None, default: None ) –

    ID of the withdrawal to be updated. Required for single updates. By default None.

  • status

    (str | None, default: None ) –

    New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.

  • withdrawal_date

    (date | None, default: None ) –

    New withdrawal date. By default None.

  • modified_by_name

    (str | None, default: None ) –

    Name of the user performing the update. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple withdrawals to update. Required column: id. Optional: status, withdrawal_date, modified_by_name. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def update(
    self,
    withdrawal_id: int | None = None,
    status: str | None = None,
    withdrawal_date: date | None = None,
    modified_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates one or more existing inventory withdrawals in `performance.inv_withdrawals`.

    Either pass the individual values to update a single withdrawal, or pass `data_df`
    to update several withdrawals in one call.

    Parameters
    ----------
    withdrawal_id : int | None, optional
        ID of the withdrawal to be updated. Required for single updates. By default None.
    status : str | None, optional
        New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA
        (not validated by this method). By default None.
    withdrawal_date : date | None, optional
        New withdrawal date. By default None.
    modified_by_name : str | None, optional
        Name of the user performing the update. Must be known to `perfdb.users.instances`. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to update.
        Required column: id. Optional: status, withdrawal_date, modified_by_name.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If the `id` column is missing or contains nulls (including a single update called
        without `withdrawal_id`), if any ID does not exist in the database, or if a
        modifier name cannot be resolved.
    """
    single_update = data_df is None
    if single_update:
        # wrap the scalar arguments into a one-row frame so both modes share the same code path
        data_df = pl.DataFrame(
            {
                "id": [withdrawal_id],
                "status": [status],
                "withdrawal_date": [withdrawal_date],
                "modified_by_name": [modified_by_name],
            },
            schema={
                "id": pl.Int64,
                "status": pl.Utf8,
                "withdrawal_date": pl.Date,
                "modified_by_name": pl.Utf8,
            },
        )

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # Fail fast on missing IDs: without this check a null ID is sent to the database and
    # only surfaces later as a confusing "Withdrawal IDs {None} do not exist" error.
    if data_df["id"].null_count() > 0:
        if single_update:
            raise ValueError("withdrawal_id is required when data_df is not provided.")
        raise ValueError("data_df column 'id' contains null values, which are not allowed.")

    # check if IDs exist
    target_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_withdrawals WHERE id = ANY({ids})").format(
        ids=sql.Literal(target_ids),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(target_ids) - set(existing_df["id"].to_list()):
        raise ValueError(f"Withdrawal IDs {wrong_ids} do not exist in the database.")

    # resolve modified_by_name to modified_by_id
    has_modifier_col = "modified_by_name" in data_df.columns
    user_names = data_df["modified_by_name"].drop_nulls().unique().to_list() if has_modifier_col else []
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        # default=None keeps rows without a modifier as null instead of raising on them
        data_df = data_df.with_columns(
            pl.col("modified_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("modified_by_id"),
        )
    else:
        # NOTE(review): an all-null modified_by_id column is still sent when no modifier is given.
        # In batch mode ignore_null_cols is False - confirm polars_to_sql does not overwrite
        # existing modified_by_id values with NULL in that case.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("modified_by_id"))

    if has_modifier_col:
        # only the resolved ID is written to the table
        data_df = data_df.drop(["modified_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} withdrawal(s).")