Skip to content

Inventory Withdrawals

InventoryWithdrawals(perfdb)

Class used for handling Inventory Withdrawals. Can be accessed via perfdb.inventory.withdrawals.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/inventory_withdrawals.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Set up the Inventory Withdrawals handler and its nested handlers.

    The handler can be accessed via `perfdb.inventory.withdrawals`. Two nested handlers
    are attached to it: `items` and `service_orders`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    # Imported at call time rather than at module level
    # (presumably to avoid circular imports between the inventory modules - TODO confirm).
    from .inventory_withdrawal_items import InventoryWithdrawalItems
    from .inventory_withdrawal_service_orders import InventoryWithdrawalServiceOrders

    # * nested handlers (composed objects sharing the same perfdb, not subclasses)

    self.items = InventoryWithdrawalItems(perfdb)
    self.service_orders = InventoryWithdrawalServiceOrders(perfdb)

delete(withdrawal_ids)

Deletes inventory withdrawals from the database.

Parameters:

  • withdrawal_ids

    (list[int]) –

    List of withdrawal IDs to be deleted.

Returns:

  • int

    Number of rows deleted.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def delete(self, withdrawal_ids: list[int]) -> int:
    """Remove the given inventory withdrawals from ``performance.inv_withdrawals``.

    Parameters
    ----------
    withdrawal_ids : list[int]
        IDs of the withdrawals that should be removed.

    Returns
    -------
    int
        Row count reported by the connection after the DELETE statement ran.
    """
    # The ID list is embedded as a single SQL literal and matched with ANY(...)
    id_array = sql.Literal(withdrawal_ids)
    statement = sql.SQL("DELETE FROM performance.inv_withdrawals WHERE id = ANY({ids})").format(ids=id_array)

    self._perfdb.conn.execute(statement)
    n_removed = self._perfdb.conn.rowcount

    logger.debug(f"Deleted {n_removed} withdrawal(s).")
    return n_removed

get(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, sla_period=None, finished_period=None, sla_violated=None, creator_names=None, created_by_ids=None, modifier_names=None, modified_by_ids=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory withdrawals and their attributes.

The most useful keys/columns returned are:

  • id
  • service_order_ids
  • service_order_names
  • service_order_sap_ids
  • storage_location_name
  • center_name
  • withdrawal_date
  • sla_deadline_date
  • finished_date
  • status
  • is_sla_violated
  • creator_name
  • modifier_name
  • display_label

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of withdrawal IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • statuses

    (list[str] | None, default: None ) –

    List of statuses to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by withdrawal_date. By default None.

  • sla_period

    (DateTimeRange | None, default: None ) –

    Date range to filter by sla_deadline_date. By default None.

  • finished_period

    (DateTimeRange | None, default: None ) –

    Date range to filter by finished_date. By default None.

  • sla_violated

    (bool | None, default: None ) –

    Filter by whether the SLA has been violated (is_sla_violated column). By default None.

  • creator_names

    (list[str] | None, default: None ) –

    List of creator names to filter. By default None.

  • created_by_ids

    (list[int] | None, default: None ) –

    List of created_by user IDs to filter. By default None.

  • modifier_names

    (list[str] | None, default: None ) –

    List of modifier names to filter. By default None.

  • modified_by_ids

    (list[int] | None, default: None ) –

    List of modified_by user IDs to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    sla_period: DateTimeRange | None = None,
    finished_period: DateTimeRange | None = None,
    sla_violated: bool | None = None,
    creator_names: list[str] | None = None,
    created_by_ids: list[int] | None = None,
    modifier_names: list[str] | None = None,
    modified_by_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Fetch inventory withdrawals (all columns of ``performance.v_inv_withdrawals``), ordered by id.

    The most useful keys/columns returned are:

    - id
    - service_order_ids
    - service_order_names
    - service_order_sap_ids
    - storage_location_name
    - center_name
    - withdrawal_date
    - sla_deadline_date
    - finished_date
    - status
    - is_sla_violated
    - creator_name
    - modifier_name
    - display_label

    Parameters
    ----------
    ids : list[int] | None, optional
        Withdrawal IDs to keep. By default None.
    center_names : list[str] | None, optional
        Center names to keep. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to keep. By default None.
    service_order_names : list[str] | None, optional
        Service order names to keep. By default None.
    statuses : list[str] | None, optional
        Statuses to keep. By default None.
    period : DateTimeRange | None, optional
        Date range applied to withdrawal_date. By default None.
    sla_period : DateTimeRange | None, optional
        Date range applied to sla_deadline_date. By default None.
    finished_period : DateTimeRange | None, optional
        Date range applied to finished_date. By default None.
    sla_violated : bool | None, optional
        Filter on the is_sla_violated column. By default None.
    creator_names : list[str] | None, optional
        Creator names to keep. By default None.
    created_by_ids : list[int] | None, optional
        created_by user IDs to keep. By default None.
    modifier_names : list[str] | None, optional
        Modifier names to keep. By default None.
    modified_by_ids : list[int] | None, optional
        modified_by user IDs to keep. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Shape of the returned data. By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        When output_type is "dict": a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        When output_type is "DataFrame": a pandas DataFrame with index = id.
    pl.DataFrame
        When output_type is "pl.DataFrame": a Polars DataFrame.
    """
    # All filter arguments are forwarded untouched; the helper turns them into the WHERE clause
    criteria = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "service_order_names": service_order_names,
        "statuses": statuses,
        "period": period,
        "sla_period": sla_period,
        "finished_period": finished_period,
        "sla_violated": sla_violated,
        "creator_names": creator_names,
        "created_by_ids": created_by_ids,
        "modifier_names": modifier_names,
        "modified_by_ids": modified_by_ids,
        "filter_type": filter_type,
    }
    filter_clause = self._check_get_args(**criteria)

    statement = sql.SQL("SELECT * FROM performance.v_inv_withdrawals {where} ORDER BY id").format(
        where=filter_clause,
    )
    polars_frame = self._perfdb.conn.read_to_polars(statement, schema_overrides=self._cols_schema)

    # Polars is the native result; the other output types are derived from it
    if output_type == "pl.DataFrame":
        return polars_frame

    pandas_frame = polars_frame.to_pandas(use_pyarrow_extension_array=True).set_index("id")

    return pandas_frame if output_type == "DataFrame" else pandas_frame.to_dict(orient="index")

get_ids(ids=None, center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, sla_period=None, finished_period=None, sla_violated=None, creator_names=None, created_by_ids=None, modifier_names=None, modified_by_ids=None, filter_type='and')

Gets all inventory withdrawal IDs matching the provided filters.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of withdrawal IDs to filter. By default None.

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • statuses

    (list[str] | None, default: None ) –

    List of statuses to filter. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by withdrawal_date. By default None.

  • sla_period

    (DateTimeRange | None, default: None ) –

    Date range to filter by sla_deadline_date. By default None.

  • finished_period

    (DateTimeRange | None, default: None ) –

    Date range to filter by finished_date. By default None.

  • sla_violated

    (bool | None, default: None ) –

    Filter by whether the SLA has been violated (is_sla_violated column). By default None.

  • creator_names

    (list[str] | None, default: None ) –

    List of creator names to filter. By default None.

  • created_by_ids

    (list[int] | None, default: None ) –

    List of created_by user IDs to filter. By default None.

  • modifier_names

    (list[str] | None, default: None ) –

    List of modifier names to filter. By default None.

  • modified_by_ids

    (list[int] | None, default: None ) –

    List of modified_by user IDs to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

Returns:

  • list[int]

    List of withdrawal IDs matching the filters.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    sla_period: DateTimeRange | None = None,
    finished_period: DateTimeRange | None = None,
    sla_violated: bool | None = None,
    creator_names: list[str] | None = None,
    created_by_ids: list[int] | None = None,
    modifier_names: list[str] | None = None,
    modified_by_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Fetch only the IDs of the inventory withdrawals that match the filters, ordered by id.

    Parameters
    ----------
    ids : list[int] | None, optional
        Withdrawal IDs to keep. By default None.
    center_names : list[str] | None, optional
        Center names to keep. By default None.
    storage_location_names : list[str] | None, optional
        Storage location names to keep. By default None.
    service_order_names : list[str] | None, optional
        Service order names to keep. By default None.
    statuses : list[str] | None, optional
        Statuses to keep. By default None.
    period : DateTimeRange | None, optional
        Date range applied to withdrawal_date. By default None.
    sla_period : DateTimeRange | None, optional
        Date range applied to sla_deadline_date. By default None.
    finished_period : DateTimeRange | None, optional
        Date range applied to finished_date. By default None.
    sla_violated : bool | None, optional
        Filter on the is_sla_violated column. By default None.
    creator_names : list[str] | None, optional
        Creator names to keep. By default None.
    created_by_ids : list[int] | None, optional
        created_by user IDs to keep. By default None.
    modifier_names : list[str] | None, optional
        Modifier names to keep. By default None.
    modified_by_ids : list[int] | None, optional
        modified_by user IDs to keep. By default None.
    filter_type : Literal["and", "or"], optional
        How multiple filters are combined. By default "and".

    Returns
    -------
    list[int]
        IDs of the withdrawals matching the filters.
    """
    # Same filter set as get(); the helper turns it into the WHERE clause
    criteria = {
        "ids": ids,
        "center_names": center_names,
        "storage_location_names": storage_location_names,
        "service_order_names": service_order_names,
        "statuses": statuses,
        "period": period,
        "sla_period": sla_period,
        "finished_period": finished_period,
        "sla_violated": sla_violated,
        "creator_names": creator_names,
        "created_by_ids": created_by_ids,
        "modifier_names": modifier_names,
        "modified_by_ids": modified_by_ids,
        "filter_type": filter_type,
    }
    filter_clause = self._check_get_args(**criteria)

    statement = sql.SQL("SELECT id FROM performance.v_inv_withdrawals {where} ORDER BY id").format(
        where=filter_clause,
    )
    id_frame = self._perfdb.conn.read_to_polars(statement, schema_overrides=self._cols_schema)

    return id_frame["id"].to_list()

get_stats(center_names=None, storage_location_names=None, service_order_names=None, statuses=None, period=None, sla_period=None, finished_period=None, sla_violated=None, creator_names=None, created_by_ids=None, modifier_names=None, modified_by_ids=None, filter_type='and', top_n=20)

Returns a collection of aggregated statistics about withdrawals for use in dashboards.

All filters are applied consistently across every stat category so only the relevant subset of data is analyzed, keeping queries light.

Parameters:

  • center_names

    (list[str] | None, default: None ) –

    List of center names to filter. By default None.

  • storage_location_names

    (list[str] | None, default: None ) –

    List of storage location names to filter. By default None.

  • service_order_names

    (list[str] | None, default: None ) –

    List of service order names to filter. By default None.

  • statuses

    (list[str] | None, default: None ) –

    List of statuses to filter. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.

  • period

    (DateTimeRange | None, default: None ) –

    Date range to filter by withdrawal_date. By default None.

  • sla_period

    (DateTimeRange | None, default: None ) –

    Date range to filter by sla_deadline_date. By default None.

  • finished_period

    (DateTimeRange | None, default: None ) –

    Date range to filter by finished_date. By default None.

  • sla_violated

    (bool | None, default: None ) –

    Filter by SLA violation status. By default None.

  • creator_names

    (list[str] | None, default: None ) –

    List of creator names to filter. By default None.

  • created_by_ids

    (list[int] | None, default: None ) –

    List of created_by user IDs to filter. By default None.

  • modifier_names

    (list[str] | None, default: None ) –

    List of modifier names to filter. By default None.

  • modified_by_ids

    (list[int] | None, default: None ) –

    List of modified_by user IDs to filter. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. By default "and".

  • top_n

    (int, default: 20 ) –

    Maximum number of rows returned for ranked/top lists (e.g. open_by_creator, top_materials). By default 20.

Returns:

  • dict[str, DataFrame]

    Dictionary with the following keys, each containing a Polars DataFrame:

    • "by_status" — withdrawal count per status. Columns: status, count.

    • "by_center" — totals and open/SLA-violated counts per center. Columns: center_name, total_count, open_count, sla_violated_count.

    • "by_storage_location" — totals and open/SLA-violated counts per storage location. Columns: center_name, storage_location_name, total_count, open_count, sla_violated_count.

    • "by_creator" — per-creator totals, open count, SLA-violated count and average days to close. Columns: creator_name, total_count, open_count, sla_violated_count, avg_days_to_close.

    • "by_date" — daily withdrawal count. Columns: withdrawal_date, count.

    • "open_by_creator" — top_n creators ranked by number of open withdrawals (descending). Columns: creator_name, open_count.

    • "avg_close_time_by_creator" — top_n creators ranked by average days to close (descending). Only considers withdrawals in status APONTADA with a non-null finished_date. Columns: creator_name, avg_days_to_close, closed_count.

    • "sla_summary" — single-row global SLA summary. Columns: total_count, sla_ok_count, sla_violated_count, sla_violation_rate.

    • "top_materials" — top_n most withdrawn materials by total quantity (descending). Columns: material_description, material_sap_id, base_unit, total_withdrawn_qty, withdrawal_count.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def get_stats(
    self,
    center_names: list[str] | None = None,
    storage_location_names: list[str] | None = None,
    service_order_names: list[str] | None = None,
    statuses: list[str] | None = None,
    period: DateTimeRange | None = None,
    sla_period: DateTimeRange | None = None,
    finished_period: DateTimeRange | None = None,
    sla_violated: bool | None = None,
    creator_names: list[str] | None = None,
    created_by_ids: list[int] | None = None,
    modifier_names: list[str] | None = None,
    modified_by_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    top_n: int = 20,
) -> dict[str, pl.DataFrame]:
    """Returns a collection of aggregated statistics about withdrawals for use in dashboards.

    All filters are applied consistently across every stat category so only the relevant subset
    of data is analyzed, keeping queries light. Every category is computed from the same WHERE
    clause over ``performance.v_inv_withdrawals``; the categories that need an extra restriction
    or a different view wrap that filtered set in a subquery instead of rebuilding the filters.

    Parameters
    ----------
    center_names : list[str] | None, optional
        List of center names to filter. By default None.
    storage_location_names : list[str] | None, optional
        List of storage location names to filter. By default None.
    service_order_names : list[str] | None, optional
        List of service order names to filter. By default None.
    statuses : list[str] | None, optional
        List of statuses to filter. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
    period : DateTimeRange | None, optional
        Date range to filter by withdrawal_date. By default None.
    sla_period : DateTimeRange | None, optional
        Date range to filter by sla_deadline_date. By default None.
    finished_period : DateTimeRange | None, optional
        Date range to filter by finished_date. By default None.
    sla_violated : bool | None, optional
        Filter by SLA violation status. By default None.
    creator_names : list[str] | None, optional
        List of creator names to filter. By default None.
    created_by_ids : list[int] | None, optional
        List of created_by user IDs to filter. By default None.
    modifier_names : list[str] | None, optional
        List of modifier names to filter. By default None.
    modified_by_ids : list[int] | None, optional
        List of modified_by user IDs to filter. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. By default "and".
    top_n : int, optional
        Maximum number of rows returned for ranked/top lists (e.g. open_by_creator, top_materials).
        By default 20.

    Returns
    -------
    dict[str, pl.DataFrame]
        Dictionary with the following keys, each containing a Polars DataFrame:

        - ``"by_status"`` — withdrawal count per status.
          Columns: status, count.

        - ``"by_center"`` — totals and open/SLA-violated counts per center.
          Columns: center_name, total_count, open_count, sla_violated_count.

        - ``"by_storage_location"`` — totals and open/SLA-violated counts per storage location.
          Columns: center_name, storage_location_name, total_count, open_count, sla_violated_count.

        - ``"by_creator"`` — per-creator totals, open count, SLA-violated count and average days to close.
          Columns: creator_name, total_count, open_count, sla_violated_count, avg_days_to_close.

        - ``"by_date"`` — daily withdrawal count.
          Columns: withdrawal_date, count.

        - ``"open_by_creator"`` — top_n creators ranked by number of open withdrawals (descending).
          Columns: creator_name, open_count.

        - ``"avg_close_time_by_creator"`` — top_n creators ranked by average days to close (descending).
          Only considers withdrawals in status APONTADA with a non-null finished_date.
          Columns: creator_name, avg_days_to_close, closed_count.

        - ``"sla_summary"`` — single-row global SLA summary.
          Columns: total_count, sla_ok_count, sla_violated_count, sla_violation_rate.

        - ``"top_materials"`` — top_n most withdrawn materials by total quantity (descending),
          restricted to items whose withdrawal matches the filters.
          Columns: material_description, material_sap_id, base_unit, total_withdrawn_qty, withdrawal_count.
    """
    where = self._check_get_args(
        ids=None,
        center_names=center_names,
        storage_location_names=storage_location_names,
        service_order_names=service_order_names,
        statuses=statuses,
        period=period,
        sla_period=sla_period,
        finished_period=finished_period,
        sla_violated=sla_violated,
        creator_names=creator_names,
        created_by_ids=created_by_ids,
        modifier_names=modifier_names,
        modified_by_ids=modified_by_ids,
        filter_type=filter_type,
    )
    row_limit = sql.Literal(top_n)

    result: dict[str, pl.DataFrame] = {}

    # -- by_status --
    result["by_status"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT status, COUNT(*) AS count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY status
            ORDER BY count DESC
        """).format(where=where),
    )

    # -- by_center --
    result["by_center"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                center_name,
                COUNT(*) AS total_count,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY center_name
            ORDER BY total_count DESC
        """).format(where=where),
    )

    # -- by_storage_location --
    result["by_storage_location"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                center_name,
                storage_location_name,
                COUNT(*) AS total_count,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY center_name, storage_location_name
            ORDER BY total_count DESC
        """).format(where=where),
    )

    # -- by_creator --
    # The average is cast to numeric before rounding, same as in avg_close_time_by_creator:
    # PostgreSQL only provides the two-argument ROUND for numeric, not for double precision.
    result["by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                COUNT(*) AS total_count,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count,
                ROUND(
                    AVG(
                        CASE
                            WHEN status = 'APONTADA' AND finished_date IS NOT NULL
                            THEN EXTRACT(EPOCH FROM (finished_date - created_date)) / 86400
                            ELSE NULL
                        END
                    )::numeric,
                    2
                ) AS avg_days_to_close
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY creator_name
            ORDER BY total_count DESC
        """).format(where=where),
    )

    # -- by_date --
    result["by_date"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT withdrawal_date, COUNT(*) AS count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY withdrawal_date
            ORDER BY withdrawal_date
        """).format(where=where),
    )

    # -- open_by_creator (top_n) --
    result["open_by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                SUM(CASE WHEN status = 'ABERTA' THEN 1 ELSE 0 END) AS open_count
            FROM performance.v_inv_withdrawals
            {where}
            GROUP BY creator_name
            ORDER BY open_count DESC
            LIMIT {top_n}
        """).format(where=where, top_n=row_limit),
    )

    # -- avg_close_time_by_creator (top_n) --
    # The closed-row restriction must always be ANDed on top of the user filters. It is applied
    # in an outer query over the already-filtered rows: appending " AND ..." to the WHERE text
    # would bind tighter than OR when filter_type="or" (a OR b AND c == a OR (b AND c)), and
    # would also require guessing whether the clause returned by _check_get_args is empty.
    # NOTE(review): CANCELADA withdrawals are not included in the close-time average, matching
    # the by_creator query above; confirm whether they should be.
    result["avg_close_time_by_creator"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                creator_name,
                ROUND(
                    AVG(EXTRACT(EPOCH FROM (finished_date - created_date)) / 86400)::numeric,
                    2
                ) AS avg_days_to_close,
                COUNT(*) AS closed_count
            FROM (
                SELECT creator_name, status, finished_date, created_date
                FROM performance.v_inv_withdrawals
                {where}
            ) AS filtered_withdrawals
            WHERE status = 'APONTADA' AND finished_date IS NOT NULL
            GROUP BY creator_name
            ORDER BY avg_days_to_close DESC
            LIMIT {top_n}
        """).format(where=where, top_n=row_limit),
    )

    # -- sla_summary --
    result["sla_summary"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                COUNT(*) AS total_count,
                SUM(CASE WHEN NOT is_sla_violated THEN 1 ELSE 0 END) AS sla_ok_count,
                SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END) AS sla_violated_count,
                ROUND(
                    SUM(CASE WHEN is_sla_violated THEN 1 ELSE 0 END)::numeric / NULLIF(COUNT(*), 0) * 100,
                    2
                ) AS sla_violation_rate
            FROM performance.v_inv_withdrawals
            {where}
        """).format(where=where),
    )

    # -- top_materials (top_n, uses v_inv_withdrawal_items) --
    # The items view has different column names, so instead of re-creating a subset of the
    # filters against it, items are restricted to the withdrawals selected by the main WHERE.
    # This keeps every filter (and filter_type) in effect for this category as well.
    # Assumes v_inv_withdrawal_items.withdrawal_id refers to v_inv_withdrawals.id - TODO confirm.
    result["top_materials"] = self._perfdb.conn.read_to_polars(
        sql.SQL("""
            SELECT
                material_description,
                material_sap_id,
                base_unit,
                SUM(withdrawn_quantity) AS total_withdrawn_qty,
                COUNT(DISTINCT withdrawal_id) AS withdrawal_count
            FROM performance.v_inv_withdrawal_items
            WHERE withdrawal_id IN (
                SELECT id
                FROM performance.v_inv_withdrawals
                {where}
            )
            GROUP BY material_description, material_sap_id, base_unit
            ORDER BY total_withdrawn_qty DESC
            LIMIT {top_n}
        """).format(where=where, top_n=row_limit),
    )

    logger.debug(
        f"get_stats() returned {len(result)} stat categories for withdrawals.",
    )
    return result

insert(service_order_names=None, storage_location_name=None, center_name=None, withdrawal_date=None, status='ABERTA', created_by_name=None, data_df=None)

Inserts a new inventory withdrawal into the database and links its service orders.

You can either pass individual values to insert a single withdrawal, or pass a DataFrame to insert multiple withdrawals at once.

Parameters:

  • service_order_names

    (list[str] | None, default: None ) –

    Names of the service orders. Must exist in service_orders table. By default None.

  • storage_location_name

    (str | None, default: None ) –

    Name of the storage location. By default None.

  • center_name

    (str | None, default: None ) –

    Name of the center (used together with storage_location_name to resolve storage location ID). By default None.

  • withdrawal_date

    (date | None, default: None ) –

    Date of the withdrawal. By default None.

  • status

    (str, default: 'ABERTA' ) –

    Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default "ABERTA".

  • created_by_name

    (str | None, default: None ) –

    Name of the user creating the withdrawal. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple withdrawals to insert. Required columns: service_order_names, storage_location_name, center_name, withdrawal_date, created_by_name. Optional: status. If this is used, all individual parameters will be ignored. By default None.

Returns:

  • int | list[int] | None

    If inserting a single withdrawal, returns the ID. If inserting multiple, returns a list of IDs. Returns None if nothing was inserted.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def insert(
    self,
    service_order_names: list[str] | None = None,
    storage_location_name: str | None = None,
    center_name: str | None = None,
    withdrawal_date: date | None = None,
    status: str = "ABERTA",
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> int | list[int] | None:
    """Inserts a new inventory withdrawal into the database and links its service orders.

    You can either pass individual values to insert a single withdrawal, or pass a DataFrame
    to insert multiple withdrawals at once.

    Parameters
    ----------
    service_order_names : list[str] | None, optional
        Names of the service orders. Must exist in service_orders table. By default None.
    storage_location_name : str | None, optional
        Name of the storage location. By default None.
    center_name : str | None, optional
        Name of the center (used together with storage_location_name to resolve storage location ID). By default None.
    withdrawal_date : date | None, optional
        Date of the withdrawal. By default None.
    status : str, optional
        Status of the withdrawal. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default "ABERTA".
    created_by_name : str | None, optional
        Name of the user creating the withdrawal. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to insert.
        Required columns: service_order_names, storage_location_name, center_name, withdrawal_date, created_by_name.
        Optional: status.
        If this is used, all individual parameters will be ignored. By default None.

    Returns
    -------
    int | list[int] | None
        If inserting a single withdrawal, returns the ID, or None if no ID was returned by the database.
        If inserting multiple (data_df), returns the list of IDs (empty if nothing was inserted).

    Raises
    ------
    ValueError
        If a required column is missing or contains nulls (in single-insert mode: if a required
        argument was not provided), if a (center, storage location) pair or a user name is not found
        in the database, or if the number of IDs returned by the insert does not match the number of
        rows sent, which makes it impossible to safely link service orders.
    """
    df_schema = {
        "service_order_names": pl.List(pl.Utf8),
        "storage_location_name": pl.Utf8,
        "center_name": pl.Utf8,
        "withdrawal_date": pl.Date,
        "status": pl.Utf8,
        "created_by_name": pl.Utf8,
    }

    single_insert = data_df is None
    if single_insert:
        # Wrap the individual arguments in a one-row frame so both modes share the same code path
        data_df = pl.DataFrame(
            {
                "service_order_names": [service_order_names],
                "storage_location_name": [storage_location_name],
                "center_name": [center_name],
                "withdrawal_date": [withdrawal_date],
                "status": [status],
                "created_by_name": [created_by_name],
            },
            schema=df_schema,
        )
    else:
        # converting to expected schema (only the columns the caller actually provided)
        data_df = data_df.cast({col: dtype for col, dtype in df_schema.items() if col in data_df.columns})

    # Check required columns
    required_cols = ["storage_location_name", "center_name", "withdrawal_date", "created_by_name", "service_order_names"]
    for col in required_cols:
        if col not in data_df.columns:
            raise ValueError(f"data_df is missing required column '{col}'.")

        if data_df[col].null_count() > 0:
            if single_insert:
                # The caller never built a DataFrame, so point at the missing argument instead
                raise ValueError(f"Argument '{col}' is required when data_df is not provided.")
            raise ValueError(f"data_df column '{col}' contains null values, which are not allowed.")

    # Resolve storage_location_id
    # get_ids() is flattened here as a nested mapping {center: {location: id}} into {(center, location): id}
    sl_ids_nested = self._perfdb.inventory.storage_locations.get_ids()
    sl_ids_flat = {(c, loc): loc_id for c, locs in sl_ids_nested.items() for loc, loc_id in locs.items()}

    pairs = list(zip(data_df["center_name"].to_list(), data_df["storage_location_name"].to_list(), strict=False))
    if wrong_sls := set(pairs) - set(sl_ids_flat.keys()):
        raise ValueError(f"Storage locations not found in the database: {wrong_sls}")

    data_df = data_df.with_columns(
        pl.Series("storage_location_id", [sl_ids_flat[(c, loc)] for c, loc in pairs], dtype=pl.Int64),
    )

    # Resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        # Only reachable with an empty DataFrame, since nulls were rejected above
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))

    # 1. Extract the Service Order Names list BEFORE dropping it from the DataFrame
    # This keeps the exact order aligned with the dataframe rows being inserted
    so_names_list = data_df["service_order_names"].to_list()

    # 2. Drop human-readable columns (service_order_names is linked separately, not stored in the root table)
    data_df = data_df.drop(["service_order_names", "storage_location_name", "center_name", "created_by_name"])

    # 3. Insert into the main inv_withdrawals table
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        return_cols=["id"],
        if_exists="skip_row_check",
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()

    # The linking below pairs IDs with service orders by position. If only part of the rows
    # came back with an ID we cannot tell which ones, so fail loudly instead of linking the
    # wrong service orders. An empty result is left alone (nothing to link, see Returns).
    if ids and len(ids) != len(so_names_list):
        raise ValueError(
            f"Expected {len(so_names_list)} inserted withdrawal(s) but the database returned {len(ids)} ID(s) ({ids}). "
            "Service orders were not linked because rows and IDs cannot be aligned.",
        )

    # 4. Build the mapping dictionary {withdrawal_id: ["OS-1", "OS-2"]}, skipping empty lists
    mapping_dict = {w_id: so_names for w_id, so_names in zip(ids, so_names_list, strict=False) if so_names}

    # 5. Delegate the relationship linking to the Service Orders subclass
    if mapping_dict:
        logger.debug(f"Linking service orders for {len(mapping_dict)} new withdrawal(s)...")
        self.service_orders.insert(mapping_dict)

    logger.debug(f"Inserted {len(ids)} withdrawal(s) with IDs: {ids}")

    if not single_insert:
        return ids
    return ids[0] if ids else None

update(withdrawal_id=None, status=None, withdrawal_date=None, modified_by_name=None, data_df=None)

Updates an existing inventory withdrawal in the database.

Parameters:

  • withdrawal_id

    (int | None, default: None ) –

    ID of the withdrawal to be updated. Required for single updates. By default None.

  • status

    (str | None, default: None ) –

    New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.

  • withdrawal_date

    (date | None, default: None ) –

    New withdrawal date. By default None.

  • modified_by_name

    (str | None, default: None ) –

    Name of the user performing the update. Must exist in users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple withdrawals to update. Required column: id. Optional: status, withdrawal_date, modified_by_name. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_withdrawals.py
@validate_call
def update(
    self,
    withdrawal_id: int | None = None,
    status: str | None = None,
    withdrawal_date: date | None = None,
    modified_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory withdrawal in the database.

    Parameters
    ----------
    withdrawal_id : int | None, optional
        ID of the withdrawal to be updated. Required for single updates. By default None.
    status : str | None, optional
        New status. Valid values: ABERTA, PARCIALMENTE_APONTADA, APONTADA, CANCELADA. By default None.
    withdrawal_date : date | None, optional
        New withdrawal date. By default None.
    modified_by_name : str | None, optional
        Name of the user performing the update. Must exist in users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple withdrawals to update.
        Required column: id. Optional: status, withdrawal_date, modified_by_name.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If the ``id`` column is missing, if an ID is missing/null, if any ID does not exist in
        ``performance.inv_withdrawals``, or if a user name is not found in the database.
    """
    df_schema = {
        "id": pl.Int64,
        "status": pl.Utf8,
        "withdrawal_date": pl.Date,
        "modified_by_name": pl.Utf8,
    }

    single_update = data_df is None
    if single_update:
        # Wrap the individual arguments in a one-row frame so both modes share the same code path
        data_df = pl.DataFrame(
            {
                "id": [withdrawal_id],
                "status": [status],
                "withdrawal_date": [withdrawal_date],
                "modified_by_name": [modified_by_name],
            },
            schema=df_schema,
        )
    else:
        # converting to expected schema (only the columns the caller actually provided)
        data_df = data_df.cast({col: dtype for col, dtype in df_schema.items() if col in data_df.columns})

    if "id" not in data_df.columns:
        raise ValueError("data_df is missing required column 'id'.")

    # Reject missing IDs up front; otherwise they only surface below as the confusing
    # "Withdrawal IDs {None} do not exist in the database."
    if data_df["id"].null_count() > 0:
        if single_update:
            raise ValueError("withdrawal_id is required when data_df is not provided.")
        raise ValueError("data_df column 'id' contains null values, which are not allowed.")

    # check if IDs exist
    target_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_withdrawals WHERE id = ANY({ids})").format(
        ids=sql.Literal(target_ids),
    )
    existing_df = self._perfdb.conn.read_to_polars(existing_query, schema_overrides={"id": pl.Int64})
    if wrong_ids := set(target_ids) - set(existing_df["id"].to_list()):
        raise ValueError(f"Withdrawal IDs {wrong_ids} do not exist in the database.")

    # resolve modified_by_name to modified_by_id
    has_name_col = "modified_by_name" in data_df.columns
    user_names = data_df["modified_by_name"].drop_nulls().unique().to_list() if has_name_col else []
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        # default=None keeps rows without a user name as null instead of failing the strict replace
        data_df = data_df.with_columns(
            pl.col("modified_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("modified_by_id"),
        )
    else:
        # NOTE(review): in bulk mode (ignore_null_cols=False) this all-null column is sent to the
        # writer as-is; confirm whether that clears modified_by_id on the updated rows.
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("modified_by_id"))

    # The human-readable name is not sent to the writer, only the resolved ID
    if has_name_col:
        data_df = data_df.drop(["modified_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_withdrawals",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} withdrawal(s).")