Skip to content

Inventory Materials

InventoryMaterials(perfdb)

Class used for handling Inventory Materials. Can be accessed via perfdb.inventory.materials.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Keeps a reference to the top level PerfDB object on ``self._perfdb`` so the
    other methods can reach the connection handler and sibling handlers through it.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(material_id)

Deletes an inventory material from the database.

Parameters:

  • material_id

    (int) –

    ID of the material to be deleted.

Returns:

  • int

    Number of rows deleted (0 if no material with the given ID was found).

Source code in echo_postgres/inventory_materials.py
@validate_call
def delete(self, material_id: int) -> int:
    """Remove the inventory material with the given ID from the database.

    Parameters
    ----------
    material_id : int
        ID of the material to be deleted.

    Returns
    -------
    int
        Number of rows deleted (0 if no material with the given ID was found).
    """
    material_literal = sql.Literal(material_id)
    delete_stmt = sql.SQL("DELETE FROM performance.inv_materials WHERE id = {id}").format(id=material_literal)

    with self._perfdb.conn.reconnect() as conn:
        conn.execute(delete_stmt)

        # the row count is read inside the connection context, right after the statement ran
        deleted_rows = conn.rowcount
        logger.debug(f"Deleted {deleted_rows} material(s) from table inv_materials with ID {material_id}.")

        return deleted_rows

get(descriptions=None, sap_ids=None, is_active=None, base_units=None, filter_type='and', output_type='pl.DataFrame')

Gets all inventory materials and their attributes.

The most useful keys/columns returned are:

  • id
  • sap_id
  • description
  • display_name
  • criticality_weight
  • base_unit
  • is_active
  • creator_name
  • modifier_name

Parameters:

  • descriptions

    (list[str] | None, default: None ) –

    List of material descriptions to filter the results. By default None.

  • sap_ids

    (list[int] | None, default: None ) –

    List of SAP IDs to filter the results. By default None.

  • is_active

    (bool | None, default: None ) –

    Filter by active status. By default None.

  • base_units

    (list[str] | None, default: None ) –

    List of base units to filter the results. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'pl.DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "pl.DataFrame".

Returns:

  • dict[int, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.

  • DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with index = id.

  • DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame.

Source code in echo_postgres/inventory_materials.py
@validate_call
def get(
    self,
    descriptions: list[str] | None = None,
    sap_ids: list[int] | None = None,
    is_active: bool | None = None,
    base_units: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[int, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Gets all inventory materials and their attributes.

    Rows are read from the ``performance.v_inv_materials`` view, ordered by description.

    The most useful keys/columns returned are:

    - id
    - sap_id
    - description
    - display_name
    - criticality_weight
    - base_unit
    - is_active
    - creator_name
    - modifier_name

    Parameters
    ----------
    descriptions : list[str] | None, optional
        List of material descriptions to filter the results. By default None.
    sap_ids : list[int] | None, optional
        List of SAP IDs to filter the results. By default None.
    is_active : bool | None, optional
        Filter by active status. By default None.
    base_units : list[str] | None, optional
        List of base units to filter the results. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"].
        By default "pl.DataFrame".

    Returns
    -------
    dict[int, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}.
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with index = id.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame.
    """
    # the filter arguments are turned into the WHERE clause by the shared helper
    where = self._check_get_args(
        descriptions=descriptions,
        sap_ids=sap_ids,
        is_active=is_active,
        base_units=base_units,
        filter_type=filter_type,
    )

    query = sql.SQL("SELECT * FROM performance.v_inv_materials {where} ORDER BY description").format(
        where=where,
    )

    with self._perfdb.conn.reconnect() as conn:
        materials = conn.read_to_polars(query)

    if output_type == "pl.DataFrame":
        return materials

    # both remaining output types are built from a pandas frame indexed by id
    pandas_df = materials.to_pandas(use_pyarrow_extension_array=True).set_index("id")

    if output_type == "DataFrame":
        return pandas_df

    # orient="index" keys the outer dict by the index, i.e. the material id
    return pandas_df.to_dict(orient="index")

get_ids(descriptions=None, sap_ids=None, is_active=None, base_units=None, filter_type='and')

Gets all inventory materials and their respective ids.

Parameters:

  • descriptions

    (list[str] | None, default: None ) –

    List of material descriptions to filter the results. By default None.

  • sap_ids

    (list[int] | None, default: None ) –

    List of SAP IDs to filter the results. By default None.

  • is_active

    (bool | None, default: None ) –

    Filter by active status. By default None.

  • base_units

    (list[str] | None, default: None ) –

    List of base units to filter the results. By default None.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and".

Returns:

  • dict[str, int]

    Dictionary with all materials and their respective ids in the format {description: id, ...}.

Source code in echo_postgres/inventory_materials.py
@validate_call
def get_ids(
    self,
    descriptions: list[str] | None = None,
    sap_ids: list[int] | None = None,
    is_active: bool | None = None,
    base_units: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[str, int]:
    """Maps the description of every matching inventory material to its id.

    Parameters
    ----------
    descriptions : list[str] | None, optional
        List of material descriptions to filter the results. By default None.
    sap_ids : list[int] | None, optional
        List of SAP IDs to filter the results. By default None.
    is_active : bool | None, optional
        Filter by active status. By default None.
    base_units : list[str] | None, optional
        List of base units to filter the results. By default None.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"]. By default "and".

    Returns
    -------
    dict[str, int]
        Dictionary with all materials and their respective ids in the format {description: id, ...}.
    """
    where_clause = self._check_get_args(
        descriptions=descriptions,
        sap_ids=sap_ids,
        is_active=is_active,
        base_units=base_units,
        filter_type=filter_type,
    )

    base_query = sql.SQL("SELECT description, id FROM performance.v_inv_materials {where} ORDER BY description")
    query = base_query.format(where=where_clause)

    with self._perfdb.conn.reconnect() as conn:
        rows = conn.read_to_polars(query)

    # pair each description with the id found on the same row
    material_names = rows["description"].to_list()
    material_ids = rows["id"].to_list()

    return dict(zip(material_names, material_ids, strict=False))

insert(description=None, sap_id=None, criticality_weight=1, base_unit=None, is_active=True, created_by_name=None, data_df=None, on_conflict='ignore')

Inserts a new inventory material into the database.

You can either pass individual values to insert a single material, or pass a DataFrame to insert multiple materials at once.

Parameters:

  • description

    (str | None, default: None ) –

    Description of the material. By default None.

  • sap_id

    (int | None, default: None ) –

    SAP ID of the material. Must be unique and > 0. By default None.

  • criticality_weight

    (int, default: 1 ) –

    Criticality weight of the material. Must be > 0. By default 1.

  • base_unit

    (str | None, default: None ) –

    Base unit of measurement for the material. By default None.

  • is_active

    (bool, default: True ) –

    Whether the material is active. By default True.

  • created_by_name

    (str | None, default: None ) –

    Name of the user who created the material. Must exist in the users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple materials to be inserted. Required columns: description, base_unit, created_by_name. Optional: sap_id, criticality_weight, is_active. If this is used, all individual parameters will be ignored. By default None.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict (the implementation detects conflicts on the description column). Can be one of ["ignore", "update"]. By default "ignore".

Returns:

  • int | list[int] | None

    If inserting a single material, returns the ID of the inserted record. If inserting multiple, returns a list of IDs. If no record was inserted (due to conflicts and on_conflict="ignore"), returns None.

Source code in echo_postgres/inventory_materials.py
@validate_call
def insert(
    self,
    description: str | None = None,
    sap_id: int | None = None,
    criticality_weight: int = 1,
    base_unit: str | None = None,
    is_active: bool = True,
    created_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | list[int] | None:
    """Inserts a new inventory material into the database.

    You can either pass individual values to insert a single material, or pass a DataFrame
    to insert multiple materials at once.

    Parameters
    ----------
    description : str | None, optional
        Description of the material. By default None.
    sap_id : int | None, optional
        SAP ID of the material. Must be unique and > 0. By default None.
    criticality_weight : int, optional
        Criticality weight of the material. Must be > 0. By default 1.
    base_unit : str | None, optional
        Base unit of measurement for the material. By default None.
    is_active : bool, optional
        Whether the material is active. By default True.
    created_by_name : str | None, optional
        Name of the user who created the material. Must exist in the users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple materials to be inserted.
        Required columns: description, base_unit, created_by_name.
        Optional: sap_id, criticality_weight, is_active.
        If this is used, all individual parameters will be ignored. By default None.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. The conflict is detected on the ``description`` column,
        which is the conflict key passed to the insert helper.
        Can be one of ["ignore", "update"]. By default "ignore".

    Returns
    -------
    int | list[int] | None
        If inserting a single material, returns the ID of the inserted record, or None when the
        insert helper returned no ID (e.g. due to conflicts and on_conflict="ignore").
        If inserting multiple (data_df given), returns the list of returned IDs, which may be empty.

    Raises
    ------
    ValueError
        If data_df is missing one of the required columns, or if any user name in
        created_by_name is not found in the database.
    """
    df_schema = {
        "description": pl.Utf8,
        "sap_id": pl.Int64,
        "criticality_weight": pl.Int64,
        "base_unit": pl.Utf8,
        "is_active": pl.Boolean,
        "created_by_name": pl.Utf8,
    }

    # a single insert is handled as a one-row DataFrame so both modes share the same path
    single_insert = data_df is None
    if single_insert:
        data_df = pl.DataFrame(
            {
                "description": [description],
                "sap_id": [sap_id],
                "criticality_weight": [criticality_weight],
                "base_unit": [base_unit],
                "is_active": [is_active],
                "created_by_name": [created_by_name],
            },
            schema=df_schema,
        )

    # validate required columns
    required_cols = {"description", "base_unit", "created_by_name"}
    missing = required_cols - set(data_df.columns)
    if missing:
        raise ValueError(f"data_df is missing required columns: {missing}")

    # resolve created_by_name to created_by_id
    user_names = data_df["created_by_name"].drop_nulls().unique().to_list()
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("created_by_name").replace_strict(user_ids, return_dtype=pl.Int64).alias("created_by_id"),
        )
    else:
        # no creator given on any row: still provide the column, filled with nulls
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("created_by_id"))

    # the table receives the resolved id, not the name
    data_df = data_df.drop(["created_by_name"])

    # NOTE(review): conflicts are keyed on "description" here, while sap_id is also documented
    # as unique — confirm that "description" is the intended conflict key.
    ids_df = self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_materials",
        schema="performance",
        conflict_cols=["description"],
        return_cols=["id"],
        if_exists="append" if on_conflict == "ignore" else "update",
        # presumably lets the helper skip all-null columns for single inserts — TODO confirm
        ignore_null_cols=single_insert,
    )

    ids = ids_df["id"].to_list()
    logger.debug(f"Inserted/updated {len(ids)} material(s) with IDs: {ids}")

    if not single_insert:
        return ids

    # single insert: unwrap the only ID, or report that nothing was returned
    return ids[0] if ids else None

update(material_id=None, description=None, sap_id=None, criticality_weight=None, base_unit=None, is_active=None, modified_by_name=None, data_df=None)

Updates an existing inventory material in the database.

You can either pass individual values to update a single material, or pass a DataFrame to update multiple materials at once.

Parameters:

  • material_id

    (int | None, default: None ) –

    ID of the material to be updated. Required for single updates. By default None.

  • description

    (str | None, default: None ) –

    New description of the material. By default None.

  • sap_id

    (int | None, default: None ) –

    New SAP ID of the material. By default None.

  • criticality_weight

    (int | None, default: None ) –

    New criticality weight. By default None.

  • base_unit

    (str | None, default: None ) –

    New base unit. By default None.

  • is_active

    (bool | None, default: None ) –

    New active status. By default None.

  • modified_by_name

    (str | None, default: None ) –

    Name of the user performing the update. Must exist in the users table. By default None.

  • data_df

    (DataFrame | None, default: None ) –

    Polars DataFrame containing multiple materials to be updated. Required column: id. Optional: description, sap_id, criticality_weight, base_unit, is_active, modified_by_name. If this is used, all individual parameters will be ignored. By default None.

Source code in echo_postgres/inventory_materials.py
@validate_call
def update(
    self,
    material_id: int | None = None,
    description: str | None = None,
    sap_id: int | None = None,
    criticality_weight: int | None = None,
    base_unit: str | None = None,
    is_active: bool | None = None,
    modified_by_name: str | None = None,
    data_df: pl.DataFrame | None = None,
) -> None:
    """Updates an existing inventory material in the database.

    You can either pass individual values to update a single material, or pass a DataFrame
    to update multiple materials at once.

    Parameters
    ----------
    material_id : int | None, optional
        ID of the material to be updated. Required for single updates. By default None.
    description : str | None, optional
        New description of the material. By default None.
    sap_id : int | None, optional
        New SAP ID of the material. By default None.
    criticality_weight : int | None, optional
        New criticality weight. By default None.
    base_unit : str | None, optional
        New base unit. By default None.
    is_active : bool | None, optional
        New active status. By default None.
    modified_by_name : str | None, optional
        Name of the user performing the update. Must exist in the users table. By default None.
    data_df : pl.DataFrame | None, optional
        Polars DataFrame containing multiple materials to be updated.
        Required column: id.
        Optional: description, sap_id, criticality_weight, base_unit, is_active, modified_by_name.
        If this is used, all individual parameters will be ignored. By default None.

    Raises
    ------
    ValueError
        If neither material_id nor data_df is given, if data_df is missing the id column,
        if any of the IDs does not exist in the database, or if any user name in
        modified_by_name is not found in the database.
    """
    df_schema = {
        "id": pl.Int64,
        "description": pl.Utf8,
        "sap_id": pl.Int64,
        "criticality_weight": pl.Int64,
        "base_unit": pl.Utf8,
        "is_active": pl.Boolean,
        "modified_by_name": pl.Utf8,
    }

    # a single update is handled as a one-row DataFrame so both modes share the same path
    single_update = data_df is None
    if single_update:
        # fail early with a clear message instead of reporting that ID "None" does not exist
        if material_id is None:
            raise ValueError("material_id is required when data_df is not provided.")

        data_df = pl.DataFrame(
            {
                "id": [material_id],
                "description": [description],
                "sap_id": [sap_id],
                "criticality_weight": [criticality_weight],
                "base_unit": [base_unit],
                "is_active": [is_active],
                "modified_by_name": [modified_by_name],
            },
            schema=df_schema,
        )

    # validate required columns (modified_by_name is optional and handled below)
    required_cols = {"id"}
    missing = required_cols - set(data_df.columns)
    if missing:
        raise ValueError(f"data_df is missing required columns: {missing}")

    # check if IDs exist
    material_ids = data_df["id"].to_list()
    existing_query = sql.SQL("SELECT id FROM performance.inv_materials WHERE id = ANY({ids})").format(
        ids=sql.Literal(material_ids),
    )
    with self._perfdb.conn.reconnect() as conn:
        existing_df = conn.read_to_polars(existing_query)
    if wrong_ids := set(material_ids) - set(existing_df["id"].to_list()):
        raise ValueError(f"Material IDs {wrong_ids} do not exist in the database.")

    # resolve modified_by_name to modified_by_id
    has_modifier_col = "modified_by_name" in data_df.columns
    user_names = data_df["modified_by_name"].drop_nulls().unique().to_list() if has_modifier_col else []
    if user_names:
        user_ids = self._perfdb.users.instances.get_ids(names=user_names)
        if wrong_users := set(user_names) - set(user_ids.keys()):
            raise ValueError(f"User names not found in the database: {wrong_users}")

        data_df = data_df.with_columns(
            pl.col("modified_by_name").replace_strict(user_ids, return_dtype=pl.Int64, default=None).alias("modified_by_id"),
        )
    else:
        # no modifier given on any row: still provide the column, filled with nulls
        data_df = data_df.with_columns(pl.lit(None).cast(pl.Int64).alias("modified_by_id"))

    # the table receives the resolved id, not the name
    if has_modifier_col:
        data_df = data_df.drop(["modified_by_name"])

    self._perfdb.conn.polars_to_sql(
        df=data_df,
        table_name="inv_materials",
        schema="performance",
        conflict_cols=["id"],
        if_exists="update_only",
        # presumably lets the helper skip all-null columns for single updates — TODO confirm
        ignore_null_cols=single_update,
    )

    logger.debug(f"Updated {len(data_df)} material(s).")