Inventory Storage Locations¶
InventoryStorageLocations(perfdb)
¶
Class used for handling Inventory Storage Locations. Can be accessed via perfdb.inventory.storage_locations.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
self._perfdb: e_pg.PerfDB = perfdb
delete(name, center_name)
¶
Deletes an inventory storage location from the database.
Parameters:
-
(name¶str) –Name of the storage location to be deleted.
-
(center_name¶str) –Name of the center where the storage location is located.
Returns:
-
int–Number of rows deleted (0 if no matching storage location was found).
Source code in echo_postgres/inventory_storage_locations.py
@validate_call
def delete(self, name: str, center_name: str) -> int:
"""Deletes an inventory storage location from the database.
Parameters
----------
name : str
Name of the storage location to be deleted.
center_name : str
Name of the center where the storage location is located.
Returns
-------
int
Number of rows deleted (0 if no matching storage location was found).
"""
# resolve center_id
center_ids = self._perfdb.inventory.centers.get_ids()
if center_name not in center_ids:
raise ValueError(f"Center with name '{center_name}' does not exist in the database.")
center_id = center_ids[center_name]
query = sql.SQL("DELETE FROM performance.inv_storage_locations WHERE name = {name} AND center_id = {center_id}").format(
name=sql.Literal(name),
center_id=sql.Literal(center_id),
)
with self._perfdb.conn.reconnect() as conn:
conn.execute(query)
return conn.rowcount
get(names=None, center_names=None, is_active=None, is_serviceable_stock=None, filter_type='and', output_type='pl.DataFrame')
¶
Gets all inventory storage locations and their attributes.
The most useful keys/columns returned are:
- id
- center_name
- storage_location_name
- is_serviceable_stock
- is_active
Parameters:
-
(names¶list[str] | None, default:None) –List of storage location names to filter the results. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter the results. By default None.
-
(is_active¶bool | None, default:None) –Filter by active status. By default None.
-
(is_serviceable_stock¶bool | None, default:None) –Filter by serviceable stock status. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
-
(output_type¶Literal['dict', 'DataFrame', 'pl.DataFrame'], default:'pl.DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "pl.DataFrame".
Returns:
-
dict[str, dict[str, dict[str, Any]]]–In case output_type is "dict", returns a nested dictionary {center_name: {storage_location_name: {attribute: value, ...}, ...}, ...}.
-
DataFrame–In case output_type is "DataFrame", returns a pandas DataFrame with MultiIndex = [center_name, storage_location_name].
-
DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame.
Source code in echo_postgres/inventory_storage_locations.py
@validate_call
def get(
self,
names: list[str] | None = None,
center_names: list[str] | None = None,
is_active: bool | None = None,
is_serviceable_stock: bool | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "pl.DataFrame",
) -> dict[str, dict[str, dict[str, Any]]] | pd.DataFrame | pl.DataFrame:
"""Gets all inventory storage locations and their attributes.
The most useful keys/columns returned are:
- id
- center_name
- storage_location_name
- is_serviceable_stock
- is_active
Parameters
----------
names : list[str] | None, optional
List of storage location names to filter the results. By default None.
center_names : list[str] | None, optional
List of center names to filter the results. By default None.
is_active : bool | None, optional
Filter by active status. By default None.
is_serviceable_stock : bool | None, optional
Filter by serviceable stock status. By default None.
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"].
By default "pl.DataFrame".
Returns
-------
dict[str, dict[str, dict[str, Any]]]
In case output_type is "dict", returns a nested dictionary
{center_name: {storage_location_name: {attribute: value, ...}, ...}, ...}.
pd.DataFrame
In case output_type is "DataFrame", returns a pandas DataFrame with
MultiIndex = [center_name, storage_location_name].
pl.DataFrame
In case output_type is "pl.DataFrame", returns a Polars DataFrame.
"""
where = self._check_get_args(
names=names,
center_names=center_names,
is_active=is_active,
is_serviceable_stock=is_serviceable_stock,
filter_type=filter_type,
)
query = sql.SQL(
"SELECT * FROM performance.v_inv_storage_locations {where} ORDER BY center_name, storage_location_name",
).format(where=where)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_polars(query)
if output_type == "pl.DataFrame":
return df
df = df.to_pandas(use_pyarrow_extension_array=True)
df = df.set_index(["center_name", "storage_location_name"])
if output_type == "DataFrame":
return df
result: dict[str, dict[str, dict[str, Any]]] = {}
for (center, loc), values in df.to_dict(orient="index").items():
result.setdefault(center, {})[loc] = values
return result
get_ids(names=None, center_names=None, is_active=None, is_serviceable_stock=None, filter_type='and')
¶
Gets all inventory storage locations and their respective ids.
Parameters:
-
(names¶list[str] | None, default:None) –List of storage location names to filter the results. By default None.
-
(center_names¶list[str] | None, default:None) –List of center names to filter the results. By default None.
-
(is_active¶bool | None, default:None) –Filter by active status. By default None.
-
(is_serviceable_stock¶bool | None, default:None) –Filter by serviceable stock status. By default None.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
Returns:
-
dict[str, dict[str, int]]–Nested dictionary in the format {center_name: {storage_location_name: id, ...}, ...}.
Source code in echo_postgres/inventory_storage_locations.py
@validate_call
def get_ids(
self,
names: list[str] | None = None,
center_names: list[str] | None = None,
is_active: bool | None = None,
is_serviceable_stock: bool | None = None,
filter_type: Literal["and", "or"] = "and",
) -> dict[str, dict[str, int]]:
"""Gets all inventory storage locations and their respective ids.
Parameters
----------
names : list[str] | None, optional
List of storage location names to filter the results. By default None.
center_names : list[str] | None, optional
List of center names to filter the results. By default None.
is_active : bool | None, optional
Filter by active status. By default None.
is_serviceable_stock : bool | None, optional
Filter by serviceable stock status. By default None.
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
Returns
-------
dict[str, dict[str, int]]
Nested dictionary in the format {center_name: {storage_location_name: id, ...}, ...}.
"""
where = self._check_get_args(
names=names,
center_names=center_names,
is_active=is_active,
is_serviceable_stock=is_serviceable_stock,
filter_type=filter_type,
)
query = sql.SQL(
"SELECT center_name, storage_location_name, id FROM performance.v_inv_storage_locations {where} ORDER BY center_name, storage_location_name",
).format(where=where)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_polars(query)
result: dict[str, dict[str, int]] = {}
for center, loc, loc_id in zip(
df["center_name"].to_list(),
df["storage_location_name"].to_list(),
df["id"].to_list(),
strict=False,
):
result.setdefault(center, {})[loc] = loc_id
return result
insert(name=None, center_name=None, is_active=True, is_serviceable_stock=True, data_df=None, on_conflict='ignore')
¶
Inserts a new inventory storage location into the database.
You can either pass individual values to insert a single storage location, or pass a DataFrame to insert multiple storage locations at once.
Parameters:
-
(name¶str | None, default:None) –Name of the storage location. By default None.
-
(center_name¶str | None, default:None) –Name of the center where the storage location is located. Must exist in the centers table. By default None.
-
(is_active¶bool, default:True) –Whether the storage location is active. By default True.
-
(is_serviceable_stock¶bool, default:True) –Whether the storage location holds serviceable stock. By default True.
-
(data_df¶DataFrame | None, default:None) –Polars DataFrame containing multiple storage locations to be inserted. Required columns: name, center_name. Optional: is_active, is_serviceable_stock. If this is used, all individual parameters will be ignored. By default None.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Returns:
-
int | list[int] | None–If inserting a single storage location, returns the ID of the inserted record. If inserting multiple, returns a list of IDs. If no record was inserted (due to conflicts and on_conflict="ignore"), returns None.
Source code in echo_postgres/inventory_storage_locations.py
@validate_call
def insert(
self,
name: str | None = None,
center_name: str | None = None,
is_active: bool = True,
is_serviceable_stock: bool = True,
data_df: pl.DataFrame | None = None,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | list[int] | None:
"""Inserts a new inventory storage location into the database.
You can either pass individual values to insert a single storage location, or pass a DataFrame
to insert multiple storage locations at once.
Parameters
----------
name : str | None, optional
Name of the storage location. By default None.
center_name : str | None, optional
Name of the center where the storage location is located. Must exist in the centers table. By default None.
is_active : bool, optional
Whether the storage location is active. By default True.
is_serviceable_stock : bool, optional
Whether the storage location holds serviceable stock. By default True.
data_df : pl.DataFrame | None, optional
Polars DataFrame containing multiple storage locations to be inserted.
Required columns: name, center_name. Optional: is_active, is_serviceable_stock.
If this is used, all individual parameters will be ignored. By default None.
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Returns
-------
int | list[int] | None
If inserting a single storage location, returns the ID of the inserted record.
If inserting multiple, returns a list of IDs.
If no record was inserted (due to conflicts and on_conflict="ignore"), returns None.
"""
df_schema = {
"name": pl.Utf8,
"center_name": pl.Utf8,
"is_active": pl.Boolean,
"is_serviceable_stock": pl.Boolean,
}
if data_df is None:
single_insert = True
data_df = pl.DataFrame(
{
"name": [name],
"center_name": [center_name],
"is_active": [is_active],
"is_serviceable_stock": [is_serviceable_stock],
},
schema=df_schema,
)
else:
single_insert = False
# validate required columns
required_cols = {"name", "center_name"}
missing = required_cols - set(data_df.columns)
if missing:
raise ValueError(f"data_df is missing required columns: {missing}")
# resolve center_name to center_id
center_ids = self._perfdb.inventory.centers.get_ids()
if wrong_centers := set(data_df["center_name"].to_list()) - set(center_ids.keys()):
raise ValueError(f"Center names not found in the database: {wrong_centers}")
data_df = data_df.with_columns(
pl.col("center_name").replace_strict(center_ids, return_dtype=pl.Int64).alias("center_id"),
)
data_df = data_df.drop(["center_name"])
ids_df = self._perfdb.conn.polars_to_sql(
df=data_df,
table_name="inv_storage_locations",
schema="performance",
conflict_cols=["center_id", "name"],
return_cols=["id"],
if_exists="append" if on_conflict == "ignore" else "update",
ignore_null_cols=single_insert,
)
ids = ids_df["id"].to_list()
logger.debug(f"Inserted/updated {len(ids)} storage location(s) with IDs: {ids}")
return ids if not single_insert else ids[0] if ids else None