Document Instances
DocumentInstances(perfdb)
Class used for handling document instances. Can be accessed via perfdb.documents.instances.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/document_instances.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Class used for handling document instances. Can be accessed via `perfdb.documents.instances`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    super().__init__(perfdb)

    from .document_instance_events import DocumentInstanceEvents
    from .document_instance_labels import DocumentInstanceLabels

    # * subclasses
    self.events = DocumentInstanceEvents(perfdb)
    self.labels = DocumentInstanceLabels(perfdb)
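A minimal access sketch. How the PerfDB connection is created below is an assumption for illustration; only the perfdb.documents.instances access path comes from this page.

import echo_postgres as e_pg

perfdb = e_pg.PerfDB()  # assumed constructor; connection setup depends on your environment
docs = perfdb.documents.instances  # DocumentInstances handler
docs.events  # DocumentInstanceEvents sub-handler
docs.labels  # DocumentInstanceLabels sub-handler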
delete(document_ids)
Deletes the document instances with the given ids.
Parameters:
- document_ids (list[int]) – List of the ids of the document instances to delete.
Source code in echo_postgres/document_instances.py
@validate_call
def delete(self, document_ids: list[int]) -> None:
"""Deletes the document instances with the given ids.
Parameters
----------
document_ids : list[int]
List of the ids of the document instances to delete.
"""
if not document_ids:
return
query = sql.SQL("DELETE FROM performance.documents WHERE id in ({ids})").format(
ids=sql.SQL(", ").join(sql.Literal(doc_id) for doc_id in document_ids),
)
with self._perfdb.conn.reconnect() as conn:
result = conn.execute(query)
logger.info(f"Deleted {result.rowcount} documents")
get(document_names=None, description_regex=None, data_types=None, document_types=None, period=None, labels=None, event_ids=None, filter_type='and', dest_folder=None, output_type='DataFrame')
Gets the document instances that match the given filters and saves them to the destination folder.
The most useful keys/columns returned are:
- id (index)
- document_name
- description
- data_type_id
- data_type_name
- document_type_id
- document_type_name
- document_date
- label_ids
- labels
- event_ids
- file_path (if dest_folder is set)
Parameters:
- document_names (list[str] | None, default: None) – Names of the documents to get. If None, gets all.
- description_regex (str | None, default: None) – Regex used to filter the descriptions of the documents. If None, gets all.
- data_types (list[str] | None, default: None) – Data types to get the documents for. If None, gets all.
- document_types (list[str] | None, default: None) – Document types to get the documents for. If None, gets all.
- period (DateTimeRange | None, default: None) – Period to get the instances for. If None, gets all.
- labels (list[str] | None, default: None) – Labels to filter the documents by. If None, gets all.
- event_ids (list[int] | None, default: None) – Ids of the events to filter the documents by. If None, gets all.
- filter_type (Literal['and', 'or'], default: 'and') – Type of filter to apply to the where clause. Can be one of ["and", "or"].
- dest_folder (Path | None, default: None) – Folder where the files will be saved. If it does not exist, it will be created.
  If set to None, the actual documents are not retrieved from the database and only the other columns are returned. This is useful when you only need the metadata of the documents, as it is much faster than retrieving the documents themselves.
  If set to a Path, the documents are saved to this folder and a new "file_path" column is added to the DataFrame with the path to each saved document.
- output_type (Literal['dict', 'DataFrame'], default: 'DataFrame') – The format of the returned data. Can be one of ["dict", "DataFrame"].
Returns:
- DataFrame – If output_type is "DataFrame", returns a DataFrame with the columns of the v_documents view, indexed by the id of the document instance. If dest_folder is set, an additional "file_path" column holds the path to each saved document.
- dict[int, dict[str, Any]] – If output_type is "dict", returns a dictionary keyed by document instance id, with the attributes of each document instance as the value. If dest_folder is set, one of the attributes is "file_path" with the path to the saved document.
Source code in echo_postgres/document_instances.py
@validate_call
def get(
self,
document_names: list[str] | None = None,
description_regex: str | None = None,
data_types: list[str] | None = None,
document_types: list[str] | None = None,
period: DateTimeRange | None = None,
labels: list[str] | None = None,
event_ids: list[int] | None = None,
filter_type: Literal["and", "or"] = "and",
dest_folder: Path | None = None,
output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[int, dict[str, Any]]:
"""Gets the document instances that match the given filters and saves them to the destination folder.
The most useful keys/columns returned are:
- id (index)
- document_name
- description
- data_type_id
- data_type_name
- document_type_id
- document_type_name
- document_date
- label_ids
- labels
- event_ids
- file_path (if dest_folder is set)
Parameters
----------
document_names : list[str] | None, optional
Names of the documents to get the documents for. If None will get all. By default None.
description_regex : str | None, optional
Regex to filter the description of the documents. If None will get all. By default None.
data_types : list[str] | None, optional
Types of the data to get the documents for. If None will get all. By default None.
document_types : list[str] | None, optional
Types of the documents to get the documents for. If None will get all. By default None.
period : DateTimeRange | None, optional
Period to get the instances for. If None will get all. By default None.
labels : list[str] | None, optional
Labels to filter the documents by. If None will get all. By default None.
event_ids : list[int] | None, optional
Ids of the events to filter the documents by. If None will get all. By default None.
filter_type : Literal["and", "or"], optional
Type of filter to apply to the where clause. Can be one of ["and", "or"]. By default "and".
dest_folder : Path, optional
Folder where the files will be saved. If this folder does not exist it will be created.
In case this is set to None, the actual document will not be retrieved from the database, only the other columns will be returned. This is useful when you only need the metadata of the documents as it will be much faster than retrieving the actual documents.
If this is set to a Path, the documents will be saved to this folder and a new column "file_path" will be added to the DataFrame with the path to the saved document.
output_type : Literal["dict", "DataFrame"], optional
The format of the returned data. Can be one of ["dict", "DataFrame"], by default "DataFrame".
Returns
-------
DataFrame
In case output_type is "DataFrame", will return a DataFrame with the columns of the v_documents view. The index will be the id of the document instance. If dest_folder is set, will have an additional column "file_path" with the path to the saved document.
dict[int, dict[str, Any]]
In case output_type is "dict", will return a dictionary with the id of the document instance as the key and the attributes of the document instance as the value. If dest_folder is set, one of the attributes will be "file_path" with the path to the saved document.
"""
# checking arguments
where = self._check_get_args(
document_names=document_names,
description_regex=description_regex,
data_types=data_types,
document_types=document_types,
period=period,
labels=labels,
event_ids=event_ids,
filter_type=filter_type,
)
# creating the destination folder if it does not exist
if dest_folder is not None:
dest_folder.mkdir(parents=True, exist_ok=True)
# getting list of columns from the view
with self._perfdb.conn.reconnect() as conn:
columns = list(
conn.get_table_columns(schema="performance", table_names=["v_documents"], table_types=["view"])["v_documents"].keys(),
)
# if the destination folder is None, remove the document_data column
if dest_folder is None:
columns = [col for col in columns if col != "document_data"]
query = [
sql.SQL("SELECT {columns} FROM performance.v_documents ").format(
columns=sql.SQL(", ").join(sql.Identifier(col) for col in columns),
),
where,
]
query = sql.Composed(query)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_pandas(query, post_convert="pyarrow")
# creating the file path column if the destination folder is set
if dest_folder is not None:
logger.info(f"Saving {len(df)} documents to folder '{dest_folder}'")
df["file_path"] = df[["document_name", "data_type_name"]].apply(
lambda x: dest_folder / Path(f"{x['document_name']}{x['data_type_name']}"),
axis=1,
)
# converting the values
df["document_data"] = df.apply(
lambda row: Path(convert_from_binary(row["document_data"], row["data_type_name"], row["file_path"])),
axis=1,
)
# deleting the document_data column
df["file_path"] = df["document_data"]
df = df.drop(columns=["document_data"])
df = df.set_index("id")
# returning if the output type is a DataFrame
if output_type == "DataFrame":
return df
# converting to dict if needed
return df.to_dict(orient="index")
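Usage sketch under the same assumptions as the earlier examples; the filter values and destination folder are illustrative. The first call keeps dest_folder=None to fetch only metadata, the second downloads the matching files and adds the "file_path" column.

from pathlib import Path

# metadata only: document_data is never read from the database
meta = perfdb.documents.instances.get(
    document_types=["report"],  # hypothetical document type
    labels=["monthly"],         # hypothetical label
    filter_type="and",
)

# same filters, but the files are written to disk
docs = perfdb.documents.instances.get(
    document_types=["report"],
    labels=["monthly"],
    dest_folder=Path("downloads"),
)
print(docs["file_path"].tolist())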
get_ids(document_names=None, description_regex=None, data_types=None, document_types=None, period=None, labels=None, event_ids=None, filter_type='and')
Gets the ids of the document instances that match the given filters.
Parameters:
- document_names (list[str] | None, default: None) – Names of the documents to get. If None, gets all.
- description_regex (str | None, default: None) – Regex used to filter the descriptions of the documents. If None, gets all.
- data_types (list[str] | None, default: None) – Data types to get the documents for. If None, gets all.
- document_types (list[str] | None, default: None) – Document types to get the documents for. If None, gets all.
- period (DateTimeRange | None, default: None) – Period to get the instances for. If None, gets all.
- labels (list[str] | None, default: None) – Labels to filter the documents by. If None, gets all.
- event_ids (list[int] | None, default: None) – Ids of the events to filter the documents by. If None, gets all.
- filter_type (Literal['and', 'or'], default: 'and') – Type of filter to apply to the where clause. Can be one of ["and", "or"].
Returns:
- list[int] – List of the ids of the document instances that match the filters.
Source code in echo_postgres/document_instances.py
@validate_call
def get_ids(
self,
document_names: list[str] | None = None,
description_regex: str | None = None,
data_types: list[str] | None = None,
document_types: list[str] | None = None,
period: DateTimeRange | None = None,
labels: list[str] | None = None,
event_ids: list[int] | None = None,
filter_type: Literal["and", "or"] = "and",
) -> list[int]:
"""Gets the ids of the document instances that match the given filters.
Parameters
----------
document_names : list[str] | None, optional
Names of the documents to get the documents for. If None will get all. By default None.
description_regex : str | None, optional
Regex to filter the description of the documents. If None will get all. By default None.
data_types : list[str] | None, optional
Types of the data to get the documents for. If None will get all. By default None.
document_types : list[str] | None, optional
Types of the documents to get the documents for. If None will get all. By default None.
period : DateTimeRange | None, optional
Period to get the instances for. If None will get all. By default None.
labels : list[str] | None, optional
Labels to filter the documents by. If None will get all. By default None.
event_ids : list[int] | None, optional
Ids of the events to filter the documents by. If None will get all. By default None.
filter_type : Literal["and", "or"], optional
Type of filter to apply to the where clause. Can be one of ["and", "or"]. By default "and".
Returns
-------
list[int]
List of the ids of the document instances that match the filters.
"""
# checking arguments
where = self._check_get_args(
document_names=document_names,
description_regex=description_regex,
data_types=data_types,
document_types=document_types,
period=period,
labels=labels,
event_ids=event_ids,
filter_type=filter_type,
)
query = [sql.SQL("SELECT id FROM performance.v_documents "), where]
query = sql.Composed(query)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_pandas(query)
return df["id"].tolist()
insert(document_path, document_type, document_name=None, document_date=None, description=None, labels=None, event_ids=None, delete_after_insert=False)
Inserts a document instance into the database.
This will convert the document into a bytea and insert it into the database.
Parameters:
- document_path (Path | bytes) – Path to the document to insert. If a bytes object is given, the bytes are inserted directly into the database.
- document_type (str) – Type of the document to insert. Must be a valid document type already present in the document_types table in the database.
- document_name (str | None, default: None) – Name of the document. If not set, the file name without its extension is used.
- document_date (datetime | None, default: None) – Date of the document. If not set, the current date is used.
- description (str | None, default: None) – Description of the document.
- labels (list[str] | None, default: None) – List of labels to add to the document.
- event_ids (list[int] | None, default: None) – List of event ids to associate with the document.
- delete_after_insert (bool, default: False) – If True, the file is deleted after being inserted into the database. Useful when the file is a temporary file that is no longer needed.
Returns:
- int – Id of the inserted document instance in the database.
Source code in echo_postgres/document_instances.py
@validate_call
def insert(
self,
document_path: Path | bytes,
document_type: str,
document_name: str | None = None,
document_date: datetime | None = None,
description: str | None = None,
labels: list[str] | None = None,
event_ids: list[int] | None = None,
delete_after_insert: bool = False,
) -> int:
"""Inserts a document instance into the database.
This will convert the document into a bytea and insert it into the database.
Parameters
----------
document_path : Path | bytes
Path to the document to insert. If receives a bytes object will insert the bytes directly into the database.
document_type : str
Type of the document to insert. Must be a valid document type already in document_types table in the database.
document_name : str | None, optional
Name of the document. If not set will use the name of the file without the extension. By default None.
document_date : datetime | None, optional
Date of the document. If not set will use the current date. By default None.
description : str | None, optional
Description of the document, by default None
labels : list[str] | None, optional
List of labels to add to the document, by default None
event_ids : list[int] | None, optional
List of event ids to associate with the document, by default None
delete_after_insert : bool, optional
If set to True will delete the file after inserting it into the database. It is useful when the file is a temporary file that is not needed anymore. By default False.
Returns
-------
int
Id of the inserted document instance in the database.
"""
# getting all the possible data types
data_types = self._perfdb.datatypes.get_ids()
if isinstance(document_path, Path): # noqa: SIM108
# checking if the document type is valid
doc_data_type = document_path.suffix.lower()
else:
doc_data_type = f".{document_name.split('.')[-1]}"
# replacing some specific cases that might have different suffixes
match doc_data_type:
case ".jpeg":
doc_data_type = ".jpg"
case ".tiff":
doc_data_type = ".tif"
case ".yml":
doc_data_type = ".yaml"
if doc_data_type not in data_types:
raise ValueError(
f"Document type '{doc_data_type}' is not a valid document type. Please check the valid data types in data_types table in the database.",
)
doc_data_type_id = data_types[doc_data_type]
# checking if the document type is valid
doc_types = self._perfdb.documents.types.get_ids()
if document_type not in doc_types:
raise ValueError(
f"Document type '{document_type}' is not a valid document type. Please check the valid document types in document_types table in the database.",
)
doc_type_id = doc_types[document_type]
# getting the document name
if document_name is None:
document_name = document_path.stem
# getting the document date
if document_date is None:
document_date = datetime.now()
if isinstance(document_path, Path):
# reading the file
with Path(document_path).open("rb") as f:
bytes_value = f.read()
else:
bytes_value = document_path
document_name = f"{document_name.split('.')[0]}"
# inserting the document
query = sql.SQL(
"INSERT INTO performance.documents (name, document_date, description, document_type_id, data_type_id, document_data) VALUES ({name}, {date}, {desc}, {doc_type_id}, {data_type_id}, %s) RETURNING id",
).format(
name=sql.Literal(document_name),
date=sql.Literal(document_date),
desc=sql.Literal(description),
doc_type_id=sql.Literal(doc_type_id),
data_type_id=sql.Literal(doc_data_type_id),
)
with self._perfdb.conn.reconnect() as conn:
cursor = conn.cursor()
cursor.execute(
query,
(bytes_value,),
)
doc_id = cursor.fetchone()[0]
if labels:
try:
self._perfdb.documents.instances.labels.insert(document_id=doc_id, labels=labels)
logger.info(f"Labels {labels} inserted for document {document_name}")
except Exception as e:
logger.exception(
f"Error inserting labels for document {document_name}. The document was inserted but the labels were not.",
)
raise RuntimeError(
f"Error inserting labels for document {document_name}. The document was inserted but the labels were not.",
) from e
if event_ids:
try:
self._perfdb.documents.instances.events.insert(document_id=doc_id, event_ids=event_ids)
logger.info(f"Events {event_ids} inserted for document {document_name}")
except Exception as e:
logger.exception(
f"Error inserting events for document {document_name}. The document was inserted but the events were not.",
)
raise RuntimeError(
f"Error inserting events for document {document_name}. The document was inserted but the events were not.",
) from e
logger.info(f"Document {document_name} inserted with id {doc_id}")
# deleting the file
if delete_after_insert:
logger.debug(f"Deleting file '{document_path}'")
document_path.unlink()
return doc_id
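Usage sketch; the file path, document type, label, and event id are hypothetical, and the document type must already exist in the document_types table.

from datetime import datetime
from pathlib import Path

doc_id = perfdb.documents.instances.insert(
    document_path=Path("reports/monthly_summary.pdf"),  # suffix must match a known data type
    document_type="report",
    document_date=datetime(2024, 5, 1),
    description="Monthly performance summary",
    labels=["monthly"],
    event_ids=[42],
)
print(f"Inserted document with id {doc_id}")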