IV Curve Reports¶
IVCurveReports(perfdb)
¶
Class used for handling IV Curve Reports data. Can be accessed via perfdb.ivcurves.reports.
Parameters:
Source code in echo_postgres/ivcurve_reports.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
"""Class used for handling IV Curve Reports data. Can be accessed via `perfdb.ivcurves.reports`.
Parameters
----------
perfdb : PerfDB
Top level object carrying all functionality and the connection handler.
"""
super().__init__(perfdb)
from .ivcurve_report_details import IVCurveReportDetails
# * subclasses
self.details = IVCurveReportDetails(perfdb)
delete(ids=None, ts_names=None, timestamps=None, document_ids=None)
¶
Deletes IV Curve Reports from the database based on the provided filters.
Parameters:
-
(ids¶list[int] | None, default:None) –List of IV Curve Report IDs to filter the deletion.
If None, does not filter by IV Curve Report ID.
-
(ts_names¶list[str] | None, default:None) –List of transformer station names to filter the deletion.
If None, does not filter by transformer station name.
-
(timestamps¶list[datetime] | None, default:None) –List of timestamps to filter the deletion.
If None, does not filter by timestamp.
-
(document_ids¶list[int] | None, default:None) –List of document ids to filter the deletion.
If None, does not filter by document id.
Source code in echo_postgres/ivcurve_reports.py
@validate_call
def delete(
self,
ids: list[int] | None = None,
ts_names: list[str] | None = None,
timestamps: list[datetime] | None = None,
document_ids: list[int] | None = None,
) -> None:
"""Deletes IV Curve Reports from the database based on the provided filters.
Parameters
----------
ids : list[int] | None
List of IV Curve Report IDs to filter the deletion.
If None, does not filter by IV Curve Report ID.
ts_names : list[str] | None, optional
List of transformer station names to filter the deletion.
If None, does not filter by transformer station name.
timestamps : list[datetime] | None, optional
List of timestamps to filter the deletion.
If None, does not filter by timestamp.
document_ids : list[int] | None, optional
List of document ids to filter the deletion.
If None, does not filter by document id.
"""
where_conditions = []
if ids:
where_conditions.append(
sql.SQL("id = ANY({ids})").format(
ids=sql.Literal(ids),
),
)
if ts_names:
ts_ids = self._perfdb.objects.instances.get_ids(object_names=ts_names)
if ts_ids:
where_conditions.append(
sql.SQL("ts_id = ANY({ts_ids})").format(
ts_ids=sql.Literal(list(ts_ids.values())),
),
)
if timestamps:
where_conditions.append(
sql.SQL("timestamp::TIMESTAMP = ANY({timestamps})").format(
timestamps=sql.Literal([ts.isoformat() for ts in timestamps]),
),
)
if document_ids:
where_conditions.append(
sql.SQL("document_id = ANY({document_ids})").format(
document_ids=sql.Literal(document_ids),
),
)
if len(where_conditions) == 0:
raise ValueError("At least one filter must be provided to delete IV Curve Reports data.")
where_query = sql.SQL("WHERE ") + sql.SQL(" AND ").join(where_conditions) if where_conditions else sql.SQL("")
query = sql.SQL(
"DELETE FROM performance.iv_curve_reports {where_query}",
).format(where_query=where_query)
with self._perfdb.conn.reconnect() as conn:
# deleting
conn.execute(query)
logger.debug(f"Deleted {conn.rowcount} IV Curve Reports records from the database")
get(ids=None, ts_names=None, timestamps=None, document_ids=None, filter_type='and', output_type='dict')
¶
Retrieves IV Curve Info information from the database.
The most useful keys/columns returned are:
- id (index in case of DataFrame)
- ts_id
- ts_name
- timestamp
- irradiance
- module_temperature
- document_id
- document_name
Parameters:
-
(ts_names¶list[str] | None, default:None) –List of transformer station names to retrieve information for.
If None, retrieves information for all available.
-
(timestamps¶list[datetime] | None, default:None) –List of timestamps to retrieve information for.
If None, retrieves information for all available.
-
(document_ids¶list[int] | None, default:None) –List of document ids to retrieve information for.
If None, retrieves information for all available.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and".
-
(output_type¶Literal['dict', 'DataFrame'], default:'dict') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "dict"
Returns:
-
dict[str, dict[str, Any]]–In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
-
DataFrame–In case output_type is "pl.DataFrame", returns a Polars DataFrame
Source code in echo_postgres/ivcurve_reports.py
@validate_call
def get(
self,
ids: list[int] | None = None,
ts_names: list[str] | None = None,
timestamps: list[datetime] | None = None,
document_ids: list[int] | None = None,
filter_type: Literal["and", "or"] = "and",
output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "dict",
) -> dict[str, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
"""Retrieves IV Curve Info information from the database.
The most useful keys/columns returned are:
- id (index in case of DataFrame)
- ts_id
- ts_name
- timestamp
- irradiance
- module_temperature
- document_id
- document_name
Parameters
----------
ts_names : list[str] | None, optional
List of transformer station names to retrieve information for.
If None, retrieves information for all available.
timestamps : list[datetime] | None, optional
List of timestamps to retrieve information for.
If None, retrieves information for all available.
document_ids : list[int] | None, optional
List of document ids to retrieve information for.
If None, retrieves information for all available.
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and".
output_type : Literal["dict", "DataFrame"], optional
Output type of the data. Can be one of ["dict", "DataFrame"]
By default "dict"
Returns
-------
dict[str, dict[str, Any]]
In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
DataFrame
In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
pl.DataFrame
In case output_type is "pl.DataFrame", returns a Polars DataFrame
"""
where_query = self._check_get_args(
ids=ids,
ts_names=ts_names,
timestamps=timestamps,
document_ids=document_ids,
filter_type=filter_type,
)
query = sql.SQL(
"SELECT * FROM performance.v_iv_curve_reports {where_query} ORDER BY ts_name",
).format(
where_query=where_query,
)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_polars(query)
if output_type == "pl.DataFrame":
return df
df = df.to_pandas(use_pyarrow_extension_array=True).set_index("id")
return df.to_dict(orient="index") if output_type == "dict" else df
get_ids(ids=None, ts_names=None, timestamps=None, document_ids=None, filter_type='and')
¶
Gets the IV Curve Report IDs based on the provided filters.
Parameters:
-
(ids¶list[int] | None, default:None) –List of IV Curve Report IDs to filter the query.
If None, does not filter by IV Curve Report ID.
-
(ts_names¶list[str] | None, default:None) –List of transformer station names to filter the query.
If None, does not filter by transformer station name.
-
(timestamps¶list[datetime] | None, default:None) –List of timestamps to filter the query.
If None, does not filter by timestamp.
-
(document_ids¶list[int] | None, default:None) –List of document ids to filter the query.
If None, does not filter by document id.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"].
By default "and".
Returns:
-
list[int]–List of IV Curve Report IDs matching the provided filters.
Source code in echo_postgres/ivcurve_reports.py
@validate_call
def get_ids(
self,
ids: list[int] | None = None,
ts_names: list[str] | None = None,
timestamps: list[datetime] | None = None,
document_ids: list[int] | None = None,
filter_type: Literal["and", "or"] = "and",
) -> list[int]:
"""Gets the IV Curve Report IDs based on the provided filters.
Parameters
----------
ids : list[int] | None
List of IV Curve Report IDs to filter the query.
If None, does not filter by IV Curve Report ID.
ts_names : list[str] | None, optional
List of transformer station names to filter the query.
If None, does not filter by transformer station name.
timestamps : list[datetime] | None, optional
List of timestamps to filter the query.
If None, does not filter by timestamp.
document_ids : list[int] | None, optional
List of document ids to filter the query.
If None, does not filter by document id.
filter_type : Literal["and", "or"], optional
How to treat multiple filters. Can be one of ["and", "or"].
By default "and".
Returns
-------
list[int]
List of IV Curve Report IDs matching the provided filters.
"""
where_query = self._check_get_args(
ids=ids,
ts_names=ts_names,
timestamps=timestamps,
document_ids=document_ids,
filter_type=filter_type,
)
query = sql.SQL(
"SELECT id FROM performance.iv_curve_reports {where_query}",
).format(where_query=where_query)
with self._perfdb.conn.reconnect() as conn:
df = conn.read_to_polars(query)
return df["id"].to_list() if len(df) != 0 else []
insert(document_id, ts_id, timestamp, irradiance, module_temperature, on_conflict='ignore')
¶
Inserts or updates an IV curve information.
Parameters:
-
(document_id¶int) –ID of the document associated with the IV curve report.
-
(ts_id¶int) –ID of the transformer station.
-
(timestamp¶datetime) –Timestamp of the IV curve report.
-
(irradiance¶float) –Irradiance value in W/m².
-
(module_temperature¶float) –Module temperature in °C.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Returns:
-
int | None–ID of the inserted or updated IV Curve Report. Will be None if on_conflict is "ignore" and a conflict occurs.
Source code in echo_postgres/ivcurve_reports.py
@validate_call
def insert(
self,
document_id: int,
ts_id: int,
timestamp: datetime,
irradiance: float,
module_temperature: float,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | None:
"""Inserts or updates an IV curve information.
Parameters
----------
document_id : int
ID of the document associated with the IV curve report.
ts_id : int
ID of the transformer station.
timestamp : datetime
Timestamp of the IV curve report.
irradiance : float
Irradiance value in W/m².
module_temperature : float
Module temperature in °C.
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict. Can be one of ["ignore", "update"].
By default "ignore"
Returns
-------
int | None
ID of the inserted or updated IV Curve Report. Will be None if on_conflict is "ignore" and a conflict occurs.
"""
# defining the query
query = sql.SQL(
"""
INSERT INTO performance.iv_curve_reports (document_id, ts_id, timestamp, irradiance, module_temperature)
VALUES ({document_id}, {ts_id}, {timestamp}, {irradiance}, {module_temperature})
ON CONFLICT (document_id, ts_id, timestamp) DO {conflict_action}
RETURNING id
""",
).format(
document_id=sql.Literal(document_id),
ts_id=sql.Literal(ts_id),
timestamp=sql.Literal(timestamp.isoformat()),
irradiance=sql.Literal(irradiance),
module_temperature=sql.Literal(module_temperature),
conflict_action=sql.SQL(
"NOTHING"
if on_conflict == "ignore"
else "UPDATE SET irradiance = EXCLUDED.irradiance, module_temperature = EXCLUDED.module_temperature",
),
)
# inserting the data
with self._perfdb.conn.reconnect() as conn:
result = conn.execute(query)
row = result.fetchone()
iv_curve_report_id = row[0] if row else None
logger.debug(
f"IV Curve info from '{timestamp}' object '{ts_id}' inserted/updated in the database - New ID {iv_curve_report_id}",
)
return iv_curve_report_id
insert_by_pdf(pdf_file_path, document_name, insert_details=False, excel_file_path=None, on_conflict='ignore')
¶
Inserts IV Curve Info data from a PDF file into the database.
Parameters:
-
(pdf_file_path¶Path | BytesIO) –In case of Path, path to the PDF file containing IV Curve Info data.
In case of BytesIO, BytesIO object containing the PDF data. Can be used to avoid saving the PDF to disk.
-
(document_name¶str) –Name to be assigned to the document in the database. Must end with ".pdf".
-
(insert_details¶bool, default:False) –Whether to also extract and insert IV Curve Details data from the PDF.
By default False.
-
(excel_file_path¶Path | BytesIO | None, default:None) –Excel file with the iv curve array data as exported by Huawei software.
Only applicable if insert_details is True.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict when inserting the IV Curve Report. Can be one of ["ignore", "update"]. By default "ignore"
Returns:
-
int | None–ID of the inserted IV Curve Report. Will be None if on_conflict is "ignore
Source code in echo_postgres/ivcurve_reports.py
@validate_call
def insert_by_pdf(
self,
pdf_file_path: Path | BytesIO,
document_name: str,
insert_details: bool = False,
excel_file_path: Path | BytesIO | None = None,
on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | None:
"""Inserts IV Curve Info data from a PDF file into the database.
Parameters
----------
pdf_file_path : Path | BytesIO
In case of Path, path to the PDF file containing IV Curve Info data.
In case of BytesIO, BytesIO object containing the PDF data. Can be used to avoid saving the PDF to disk.
document_name : str
Name to be assigned to the document in the database. Must end with ".pdf".
insert_details : bool, optional
Whether to also extract and insert IV Curve Details data from the PDF.
By default False.
excel_file_path : Path | BytesIO | None, optional
Excel file with the iv curve array data as exported by Huawei software.
Only applicable if insert_details is True.
on_conflict : Literal["ignore", "update"], optional
What to do in case of conflict when inserting the IV Curve Report. Can be one of ["ignore", "update"].
By default "ignore"
Returns
-------
int | None
ID of the inserted IV Curve Report. Will be None if on_conflict is "ignore
"""
# checking document_name ends with .pdf
if not document_name.lower().endswith(".pdf"):
raise ValueError("The document_name must end with '.pdf'")
new_id = None
try:
# converting to BytesIO if Path
if isinstance(pdf_file_path, Path):
with pdf_file_path.open("rb") as f:
pdf_file_path = BytesIO(f.read())
# reading the pdf
iv_curve_info = self._read_pdf(
pdf_file_path=pdf_file_path,
get_details=insert_details,
excel_file_path=excel_file_path if insert_details else None,
)
# checking if the report already exists
existing_reports = self.get(
ts_names=[iv_curve_info["ts_name"]],
timestamps=[iv_curve_info["timestamp"]],
output_type="pl.DataFrame",
)
# in case it already exists, lets skip inserting
if not existing_reports.is_empty():
# getting current id
doc_id = existing_reports["document_id"][0]
logger.info(
f"IV Curve Report for TS {iv_curve_info['ts_name']} at timestamp {iv_curve_info['timestamp']} already exists in the database with ID {doc_id}. Skipping insertion.",
)
else:
# inserting the pdf
doc_id = self._perfdb.documents.instances.insert(
document_path=pdf_file_path.getvalue(),
document_type="IV Curve Report",
document_name=document_name,
document_date=iv_curve_info["timestamp"].to_pydatetime(),
)
iv_curve_info["document_id"] = doc_id
# get the dict to insert without details
iv_curve_info_no_details = {k: v for k, v in iv_curve_info.items() if k not in {"details", "ts_name"}}
new_id = self.insert(**iv_curve_info_no_details, on_conflict=on_conflict)
# inserting details if requested
# if new_id is None, it means the insert was ignored due to conflict
if insert_details and "details" in iv_curve_info and new_id is not None:
# creating polars DataFrame to bulk insert
details_df = pl.DataFrame(
iv_curve_info["details"],
schema={
"inverter_name": pl.Utf8,
"string_number": pl.Int64,
"v_oc": pl.Float64,
"i_sc": pl.Float64,
"source_diagnostic": pl.Utf8,
"iv_curve_array": pl.List(pl.List(pl.Float64)),
},
)
# adding iv_curve_report_id column
details_df = details_df.with_columns(
pl.lit(new_id).cast(pl.Int64).alias("iv_curve_report_id"),
)
self.details.insert(data_df=details_df, on_conflict=on_conflict)
except Exception as e:
# deleting the report in case of error
if new_id is not None:
self.delete(ids=[new_id])
logger.exception("Error while inserting IV Curve Report from PDF. The transaction has been rolled back.")
raise e
return new_id