Skip to content

IV Curve Reports

IVCurveReports(perfdb)

Class used for handling IV Curve Reports data. Can be accessed via perfdb.ivcurves.reports.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/ivcurve_reports.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Set up the IV Curve Reports handler, reachable as `perfdb.ivcurves.reports`.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # imported at call time instead of module level (presumably to avoid a circular import - confirm)
    from .ivcurve_report_details import IVCurveReportDetails

    super().__init__(perfdb)

    # * subclasses

    # handler for the per-report details, exposed as `self.details`
    details_handler = IVCurveReportDetails(perfdb)
    self.details = details_handler

delete(ids=None, ts_names=None, timestamps=None, document_ids=None)

Deletes IV Curve Reports from the database based on the provided filters.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of IV Curve Report IDs to filter the deletion.

    If None, does not filter by IV Curve Report ID.

  • ts_names

    (list[str] | None, default: None ) –

    List of transformer station names to filter the deletion.

    If None, does not filter by transformer station name.

  • timestamps

    (list[datetime] | None, default: None ) –

    List of timestamps to filter the deletion.

    If None, does not filter by timestamp.

  • document_ids

    (list[int] | None, default: None ) –

    List of document ids to filter the deletion.

    If None, does not filter by document id.

Source code in echo_postgres/ivcurve_reports.py
@validate_call
def delete(
    self,
    ids: list[int] | None = None,
    ts_names: list[str] | None = None,
    timestamps: list[datetime] | None = None,
    document_ids: list[int] | None = None,
) -> None:
    """Deletes IV Curve Reports from the database based on the provided filters.

    All provided filters are combined with AND. Empty lists are treated the same as None.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of IV Curve Report IDs to filter the deletion.

        If None, does not filter by IV Curve Report ID.
    ts_names : list[str] | None, optional
        List of transformer station names to filter the deletion.

        If None, does not filter by transformer station name.

        If provided but none of the names resolve to an object ID, nothing is deleted.
    timestamps : list[datetime] | None, optional
        List of timestamps to filter the deletion.

        If None, does not filter by timestamp.
    document_ids : list[int] | None, optional
        List of document ids to filter the deletion.

        If None, does not filter by document id.

    Raises
    ------
    ValueError
        If no filter is provided, to avoid deleting the whole table by accident.
    """
    where_conditions = []

    if ids:
        where_conditions.append(
            sql.SQL("id = ANY({ids})").format(
                ids=sql.Literal(ids),
            ),
        )

    if ts_names:
        ts_ids = self._perfdb.objects.instances.get_ids(object_names=ts_names)
        if ts_ids:
            where_conditions.append(
                sql.SQL("ts_id = ANY({ts_ids})").format(
                    ts_ids=sql.Literal(list(ts_ids.values())),
                ),
            )
        else:
            # The caller asked to restrict by transformer station but none of the names
            # resolved. Dropping the filter here would widen the deletion to rows the
            # caller never targeted, so add a condition that matches nothing instead.
            where_conditions.append(sql.SQL("FALSE"))

    if timestamps:
        where_conditions.append(
            sql.SQL("timestamp::TIMESTAMP = ANY({timestamps})").format(
                timestamps=sql.Literal([ts.isoformat() for ts in timestamps]),
            ),
        )

    if document_ids:
        where_conditions.append(
            sql.SQL("document_id = ANY({document_ids})").format(
                document_ids=sql.Literal(document_ids),
            ),
        )

    if not where_conditions:
        raise ValueError("At least one filter must be provided to delete IV Curve Reports data.")

    # at this point there is always at least one condition, so the WHERE clause is never empty
    where_query = sql.SQL("WHERE ") + sql.SQL(" AND ").join(where_conditions)

    query = sql.SQL(
        "DELETE FROM performance.iv_curve_reports {where_query}",
    ).format(where_query=where_query)

    with self._perfdb.conn.reconnect() as conn:
        # deleting
        conn.execute(query)

        logger.debug(f"Deleted {conn.rowcount} IV Curve Reports records from the database")

get(ids=None, ts_names=None, timestamps=None, document_ids=None, filter_type='and', output_type='dict')

Retrieves IV Curve Reports information from the database.

The most useful keys/columns returned are:

  • id (index in case of DataFrame)
  • ts_id
  • ts_name
  • timestamp
  • irradiance
  • module_temperature
  • document_id
  • document_name

Parameters:

  • ts_names

    (list[str] | None, default: None ) –

    List of transformer station names to retrieve information for.

    If None, retrieves information for all available.

  • timestamps

    (list[datetime] | None, default: None ) –

    List of timestamps to retrieve information for.

    If None, retrieves information for all available.

  • document_ids

    (list[int] | None, default: None ) –

    List of document ids to retrieve information for.

    If None, retrieves information for all available.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and".

  • output_type

    (Literal['dict', 'DataFrame', 'pl.DataFrame'], default: 'dict' ) –

    Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]. By default "dict".

Returns:

  • dict[str, dict[str, Any]]

    In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame

Source code in echo_postgres/ivcurve_reports.py
@validate_call
def get(
    self,
    ids: list[int] | None = None,
    ts_names: list[str] | None = None,
    timestamps: list[datetime] | None = None,
    document_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame", "pl.DataFrame"] = "dict",
) -> dict[str, dict[str, Any]] | pd.DataFrame | pl.DataFrame:
    """Retrieves IV Curve Reports information from the database.

    Rows are read from the `performance.v_iv_curve_reports` view, ordered by `ts_name`.

    The most useful keys/columns returned are:

    - id (index in case of DataFrame)
    - ts_id
    - ts_name
    - timestamp
    - irradiance
    - module_temperature
    - document_id
    - document_name

    Parameters
    ----------
    ids : list[int] | None, optional
        List of IV Curve Report IDs to retrieve information for.

        If None, retrieves information for all available.

    ts_names : list[str] | None, optional
        List of transformer station names to retrieve information for.

        If None, retrieves information for all available.

    timestamps : list[datetime] | None, optional
        List of timestamps to retrieve information for.

        If None, retrieves information for all available.

    document_ids : list[int] | None, optional
        List of document ids to retrieve information for.

        If None, retrieves information for all available.

    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and".

    output_type : Literal["dict", "DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame", "pl.DataFrame"]
        By default "dict"

    Returns
    -------
    dict[str, dict[str, Any]]
        In case output_type is "dict", returns a dictionary in the format {id: {attribute: value, ...}, ...}
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = id, columns = [attribute, ...]
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame
    """
    # the shared helper turns the filter arguments into the WHERE clause
    filters = self._check_get_args(
        ids=ids,
        ts_names=ts_names,
        timestamps=timestamps,
        document_ids=document_ids,
        filter_type=filter_type,
    )

    statement = sql.SQL(
        "SELECT * FROM performance.v_iv_curve_reports {where_query} ORDER BY ts_name",
    ).format(
        where_query=filters,
    )

    with self._perfdb.conn.reconnect() as conn:
        reports = conn.read_to_polars(statement)

    # polars output is returned as read, with "id" kept as a regular column
    if output_type == "pl.DataFrame":
        return reports

    # both remaining output types are built from a pandas frame indexed by report id
    indexed_reports = reports.to_pandas(use_pyarrow_extension_array=True).set_index("id")

    if output_type == "dict":
        return indexed_reports.to_dict(orient="index")

    return indexed_reports

get_ids(ids=None, ts_names=None, timestamps=None, document_ids=None, filter_type='and')

Gets the IV Curve Report IDs based on the provided filters.

Parameters:

  • ids

    (list[int] | None, default: None ) –

    List of IV Curve Report IDs to filter the query.

    If None, does not filter by IV Curve Report ID.

  • ts_names

    (list[str] | None, default: None ) –

    List of transformer station names to filter the query.

    If None, does not filter by transformer station name.

  • timestamps

    (list[datetime] | None, default: None ) –

    List of timestamps to filter the query.

    If None, does not filter by timestamp.

  • document_ids

    (list[int] | None, default: None ) –

    List of document ids to filter the query.

    If None, does not filter by document id.

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"].

    By default "and".

Returns:

  • list[int]

    List of IV Curve Report IDs matching the provided filters.

Source code in echo_postgres/ivcurve_reports.py
@validate_call
def get_ids(
    self,
    ids: list[int] | None = None,
    ts_names: list[str] | None = None,
    timestamps: list[datetime] | None = None,
    document_ids: list[int] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> list[int]:
    """Looks up the IDs of the IV Curve Reports that match the provided filters.

    Parameters
    ----------
    ids : list[int] | None, optional
        List of IV Curve Report IDs to filter the query.

        If None, does not filter by IV Curve Report ID.
    ts_names : list[str] | None, optional
        List of transformer station names to filter the query.

        If None, does not filter by transformer station name.
    timestamps : list[datetime] | None, optional
        List of timestamps to filter the query.

        If None, does not filter by timestamp.
    document_ids : list[int] | None, optional
        List of document ids to filter the query.

        If None, does not filter by document id.
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].

        By default "and".

    Returns
    -------
    list[int]
        List of IV Curve Report IDs matching the provided filters. Empty list if nothing matches.
    """
    # the shared helper turns the filter arguments into the WHERE clause
    filters = self._check_get_args(
        ids=ids,
        ts_names=ts_names,
        timestamps=timestamps,
        document_ids=document_ids,
        filter_type=filter_type,
    )

    statement = sql.SQL(
        "SELECT id FROM performance.iv_curve_reports {where_query}",
    ).format(where_query=filters)

    with self._perfdb.conn.reconnect() as conn:
        matches = conn.read_to_polars(statement)

    # no rows matched: return a plain empty list
    if len(matches) == 0:
        return []

    return matches["id"].to_list()

insert(document_id, ts_id, timestamp, irradiance, module_temperature, on_conflict='ignore')

Inserts or updates an IV Curve Report.

Parameters:

  • document_id

    (int) –

    ID of the document associated with the IV curve report.

  • ts_id

    (int) –

    ID of the transformer station.

  • timestamp

    (datetime) –

    Timestamp of the IV curve report.

  • irradiance

    (float) –

    Irradiance value in W/m².

  • module_temperature

    (float) –

    Module temperature in °C.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Returns:

  • int | None

    ID of the inserted or updated IV Curve Report. Will be None if on_conflict is "ignore" and a conflict occurs.

Source code in echo_postgres/ivcurve_reports.py
@validate_call
def insert(
    self,
    document_id: int,
    ts_id: int,
    timestamp: datetime,
    irradiance: float,
    module_temperature: float,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | None:
    """Inserts a single IV Curve Report, optionally updating it when it already exists.

    A conflict is detected on the (document_id, ts_id, timestamp) combination. When updating,
    only irradiance and module_temperature are overwritten.

    Parameters
    ----------
    document_id : int
        ID of the document associated with the IV curve report.
    ts_id : int
        ID of the transformer station.
    timestamp : datetime
        Timestamp of the IV curve report.
    irradiance : float
        Irradiance value in W/m².
    module_temperature : float
        Module temperature in °C.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Returns
    -------
    int | None
        ID of the inserted or updated IV Curve Report. Will be None if on_conflict is "ignore" and a conflict occurs.
    """
    # choosing what the ON CONFLICT clause should do
    if on_conflict == "ignore":
        conflict_sql = "NOTHING"
    else:
        conflict_sql = "UPDATE SET irradiance = EXCLUDED.irradiance, module_temperature = EXCLUDED.module_temperature"

    # building the statement
    statement = sql.SQL(
        """
        INSERT INTO performance.iv_curve_reports (document_id, ts_id, timestamp, irradiance, module_temperature)
        VALUES ({document_id}, {ts_id}, {timestamp}, {irradiance}, {module_temperature})
        ON CONFLICT (document_id, ts_id, timestamp) DO {conflict_action}
        RETURNING id
        """,
    ).format(
        document_id=sql.Literal(document_id),
        ts_id=sql.Literal(ts_id),
        timestamp=sql.Literal(timestamp.isoformat()),
        irradiance=sql.Literal(irradiance),
        module_temperature=sql.Literal(module_temperature),
        conflict_action=sql.SQL(conflict_sql),
    )

    # running it; RETURNING yields no row when the insert was skipped due to a conflict
    with self._perfdb.conn.reconnect() as conn:
        cursor = conn.execute(statement)
        returned_row = cursor.fetchone()
        report_id = None if not returned_row else returned_row[0]

    logger.debug(
        f"IV Curve info from '{timestamp}' object '{ts_id}' inserted/updated in the database - New ID {report_id}",
    )

    return report_id

insert_by_pdf(pdf_file_path, document_name, insert_details=False, excel_file_path=None, on_conflict='ignore')

Inserts IV Curve Info data from a PDF file into the database.

Parameters:

  • pdf_file_path

    (Path | BytesIO) –

    In case of Path, path to the PDF file containing IV Curve Info data.

    In case of BytesIO, BytesIO object containing the PDF data. Can be used to avoid saving the PDF to disk.

  • document_name

    (str) –

    Name to be assigned to the document in the database. Must end with ".pdf".

  • insert_details

    (bool, default: False ) –

    Whether to also extract and insert IV Curve Details data from the PDF.

    By default False.

  • excel_file_path

    (Path | BytesIO | None, default: None ) –

    Excel file with the iv curve array data as exported by Huawei software.

    Only applicable if insert_details is True.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict when inserting the IV Curve Report. Can be one of ["ignore", "update"]. By default "ignore"

Returns:

  • int | None

    ID of the inserted IV Curve Report. Will be None if on_conflict is "ignore" and a conflict occurs.

Source code in echo_postgres/ivcurve_reports.py
@validate_call
def insert_by_pdf(
    self,
    pdf_file_path: Path | BytesIO,
    document_name: str,
    insert_details: bool = False,
    excel_file_path: Path | BytesIO | None = None,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> int | None:
    """Inserts IV Curve Info data from a PDF file into the database.

    The PDF is parsed, stored as a document (unless a report for the same transformer station and
    timestamp already exists, in which case that report's document is reused), and the report row
    is then inserted. Optionally the per-string details are inserted as well.

    If anything fails after the report row was created by this call, that row is deleted again
    before the error is re-raised. A report that already existed before the call is never deleted.

    Parameters
    ----------
    pdf_file_path : Path | BytesIO
        In case of Path, path to the PDF file containing IV Curve Info data.

        In case of BytesIO, BytesIO object containing the PDF data. Can be used to avoid saving the PDF to disk.
    document_name : str
        Name to be assigned to the document in the database. Must end with ".pdf".
    insert_details : bool, optional
        Whether to also extract and insert IV Curve Details data from the PDF.

        By default False.
    excel_file_path : Path | BytesIO | None, optional
        Excel file with the iv curve array data as exported by Huawei software.

        Only applicable if insert_details is True.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict when inserting the IV Curve Report. Can be one of ["ignore", "update"].
        By default "ignore"

    Returns
    -------
    int | None
        ID of the inserted IV Curve Report. Will be None if on_conflict is "ignore" and a conflict occurs.

    Raises
    ------
    ValueError
        If document_name does not end with ".pdf".
    Exception
        Any error raised while reading the PDF or writing to the database is logged and re-raised.
    """
    # checking document_name ends with .pdf
    if not document_name.lower().endswith(".pdf"):
        raise ValueError("The document_name must end with '.pdf'")

    new_id = None
    # True only once this call has created a brand new report row; drives the rollback below
    report_created = False
    try:
        # converting to BytesIO if Path
        if isinstance(pdf_file_path, Path):
            pdf_file_path = BytesIO(pdf_file_path.read_bytes())

        # reading the pdf
        iv_curve_info = self._read_pdf(
            pdf_file_path=pdf_file_path,
            get_details=insert_details,
            excel_file_path=excel_file_path if insert_details else None,
        )

        # checking if the report already exists
        existing_reports = self.get(
            ts_names=[iv_curve_info["ts_name"]],
            timestamps=[iv_curve_info["timestamp"]],
            output_type="pl.DataFrame",
        )
        report_exists = not existing_reports.is_empty()

        # in case it already exists, lets skip inserting the document and reuse the stored one
        if report_exists:
            # getting current id
            doc_id = existing_reports["document_id"][0]
            logger.info(
                f"IV Curve Report for TS {iv_curve_info['ts_name']} at timestamp {iv_curve_info['timestamp']} already exists in the database with ID {doc_id}. Skipping insertion.",
            )
        else:
            # inserting the pdf
            doc_id = self._perfdb.documents.instances.insert(
                document_path=pdf_file_path.getvalue(),
                document_type="IV Curve Report",
                document_name=document_name,
                document_date=iv_curve_info["timestamp"].to_pydatetime(),
            )

        iv_curve_info["document_id"] = doc_id

        # get the dict to insert without details
        iv_curve_info_no_details = {k: v for k, v in iv_curve_info.items() if k not in {"details", "ts_name"}}

        new_id = self.insert(**iv_curve_info_no_details, on_conflict=on_conflict)
        # with on_conflict="update" the returned id may belong to a report that already existed,
        # so only flag the report as ours when no matching report was found beforehand
        report_created = not report_exists and new_id is not None

        # inserting details if requested
        # if new_id is None, it means the insert was ignored due to conflict
        if insert_details and "details" in iv_curve_info and new_id is not None:
            # creating polars DataFrame to bulk insert
            details_df = pl.DataFrame(
                iv_curve_info["details"],
                schema={
                    "inverter_name": pl.Utf8,
                    "string_number": pl.Int64,
                    "v_oc": pl.Float64,
                    "i_sc": pl.Float64,
                    "source_diagnostic": pl.Utf8,
                    "iv_curve_array": pl.List(pl.List(pl.Float64)),
                },
            )
            # adding iv_curve_report_id column
            details_df = details_df.with_columns(
                pl.lit(new_id).cast(pl.Int64).alias("iv_curve_report_id"),
            )

            self.details.insert(data_df=details_df, on_conflict=on_conflict)

    except Exception:
        # Roll back only what this call created. Deleting a report that existed before the call
        # (possible when on_conflict="update") would destroy data the caller never asked to remove.
        # NOTE(review): a document inserted by this call is not removed here - confirm whether it should be.
        if report_created:
            self.delete(ids=[new_id])
            logger.exception("Error while inserting IV Curve Report from PDF. The transaction has been rolled back.")
        else:
            logger.exception(
                "Error while inserting IV Curve Report from PDF. No new IV Curve Report was created by this call, so no report was deleted.",
            )
        # bare raise keeps the original traceback intact
        raise

    return new_id