Skip to content

CCEE PLD

CceePld(perfdb)

Class used for handling CCEE PLD (Energy Spot Price). Can be accessed via perfdb.ccee.pld.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
Python
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Keep a reference to the top level PerfDB object for later use.

    This is the base initializer that all subclasses are expected to inherit.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # stored privately; methods reach the connection and sibling handlers through it
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, submarkets=None)

Deletes PLD values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period for which to delete the PLD values.

  • submarkets

    (list[str], default: None ) –

    List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.

Source code in echo_postgres/ccee_pld.py
Python
@validate_call
def delete(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
) -> None:
    """Deletes PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to delete the PLD values. Both ``period.start`` and ``period.end`` are inclusive.
    submarkets : list[str], optional
        List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.

    Raises
    ------
    ValueError
        If any of the requested submarkets is not registered in the database.
    """
    # mapping of every registered submarket name to its id
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()

    # validating the requested submarkets and keeping only their ids
    if submarkets is not None:
        wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarket_ids]
        if wrong_submarkets:
            raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # defining the query
    query = [
        sql.SQL("DELETE FROM performance.ccee_pld"),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # the submarket filter is only added when the caller restricted the submarkets
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # deleting from the database
    self._perfdb.conn.execute(query)

    logger.debug(f"Deleted {self._perfdb.conn.rowcount} rows from performance.ccee_pld table")

get(period, submarkets=None, resolution='hourly', output_type='DataFrame')

Gets PLD values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period for which to get the PLD values.

  • submarkets

    (list[str], default: None ) –

    List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.

  • resolution

    (Literal['hourly', 'daily', 'weekly', 'monthly', 'yearly'], default: 'hourly' ) –

    Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"]. By default "hourly"

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]. By default "DataFrame".

Returns:

  • pd.DataFrame

    In case output_type is "DataFrame", returns a pandas DataFrame with the PLD values.

  • pl.DataFrame

    In case output_type is "pl.DataFrame", returns a Polars DataFrame with the PLD values.

    It will have the columns:

    • submarket (NE, N, SE, S)
    • timestamp (datetime with the desired frequency)
    • value (float), which represents the average PLD value for the period

    The index can be ignored.

Source code in echo_postgres/ccee_pld.py
Python
@validate_call
def get(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
    resolution: Literal["hourly", "daily", "weekly", "monthly", "yearly"] = "hourly",
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame:
    """Gets PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to get the PLD values. Both ``period.start`` and ``period.end`` are inclusive.
    submarkets : list[str], optional
        List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.
    resolution : Literal["hourly", "daily", "weekly", "monthly", "yearly"], optional
        Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"].
        The data is read from the view named ``v_ccee_pld_<resolution>``.
        By default "hourly".
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"].
        By default "DataFrame".

    Returns
    -------
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with the PLD values.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame with the PLD values.

        It will have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with the desired frequency)
        - value (float), which represents the average PLD value for the period

        The index can be ignored.

    Raises
    ------
    ValueError
        If any of the requested submarkets is not registered in the database.
    """
    # mapping of every registered submarket name to its id
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()

    # validating the requested submarkets and keeping only their ids
    if submarkets is not None:
        wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarket_ids]
        if wrong_submarkets:
            raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # defining the table (one view per resolution)
    table = f"v_ccee_pld_{resolution}"

    # defining the query
    query = [
        sql.SQL("SELECT * FROM {table}").format(
            table=sql.Identifier(table),
        ),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # the submarket filter is only added when the caller restricted the submarkets
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # getting the data (always read as Polars, converted to pandas only on request)
    df = self._perfdb.conn.read_to_polars(query)

    if output_type == "pl.DataFrame":
        return df

    return df.to_pandas(use_pyarrow_extension_array=True)

insert(df, on_conflict='ignore')

Inserts PLD values into the database.

Parameters:

  • df

    (pd.DataFrame | pl.DataFrame) –

    DataFrame with PLD values to be inserted. Must have the columns:

    • submarket (NE, N, SE, S)
    • timestamp (datetime with hourly frequency)
    • value (float)
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/ccee_pld.py
Python
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts PLD values into the database.

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame with PLD values to be inserted. Must have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with hourly frequency)
        - value (float)

        Any other column is ignored. The given DataFrame is not modified.

    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df is missing any of the required columns or contains a submarket that is not registered in the database.
    """
    # converting pd.DataFrame to pl.DataFrame if necessary
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)

    # checking columns
    required_cols = [
        "submarket",
        "timestamp",
        "value",
    ]

    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # getting only the wanted columns (as a copy, so the caller's DataFrame is left untouched)
    df = df[required_cols].clone()

    # checking that every submarket in the data is registered in the database
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()
    df_submarkets = df["submarket"].unique().to_list()
    wrong_submarkets = [submarket for submarket in df_submarkets if submarket not in submarket_ids]
    if wrong_submarkets:
        raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")

    # replacing submarket by its id
    df = df.with_columns(pl.col("submarket").replace_strict(submarket_ids).alias("submarket_id"))
    df = df.drop(["submarket"])

    # casting timestamp to a datetime with millisecond time unit to avoid errors
    df = df.with_columns(pl.col("timestamp").cast(pl.Datetime("ms")))

    # casting value to float to avoid errors
    df = df.with_columns(pl.col("value").cast(pl.Float64))

    # dropping duplicates
    df = df.unique()

    # inserting
    # NOTE(review): "ignore" is mapped to "append" - presumably polars_to_sql skips conflicting rows in that mode; confirm
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    self._perfdb.conn.polars_to_sql(
        df=df,
        table_name="ccee_pld",
        schema="performance",
        if_exists=if_exists_mapping[on_conflict],
    )

    logger.debug(f"Inserted {df.shape[0]} rows into ccee_pld table")