Skip to content

CCEE PLD

CceePld(perfdb)

Class used for handling CCEE PLD (Energy Spot Price). Can be accessed via perfdb.ccee.pld.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    self._perfdb: e_pg.PerfDB = perfdb

delete(period, submarkets=None)

Deletes PLD values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period for which to delete the PLD values.

  • submarkets

    (list[str], default: None ) –

    List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.

Source code in echo_postgres/ccee_pld.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
) -> None:
    """Deletes PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to delete the PLD values.
    submarkets : list[str], optional
        List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.
    """
    # getting all the possible submarkets
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()
    allowed_submarkets = list(submarket_ids.keys())

    # getting the submarket ids
    if submarkets is not None:
        if any(submarket not in allowed_submarkets for submarket in allowed_submarkets):
            wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarkets]
            raise ValueError(f"Following submarkets are registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # defining the query
    query = [
        sql.SQL("DELETE FROM performance.ccee_pld"),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # deleting from the database
    with self._perfdb.conn.reconnect() as conn:
        # deleting
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from performance.ccee_pld table")

get(period, submarkets=None, resolution='hourly')

Gets PLD values from the database.

Parameters:

  • period

    (DateTimeRange) –

    Period for which to get the PLD values.

  • submarkets

    (list[str], default: None ) –

    List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.

  • resolution

    (Literal['hourly', 'daily', 'weekly', 'monthly', 'yearly'], default: 'hourly' ) –

    Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"]. By default "hourly"

Returns:

  • DataFrame

    DataFrame with the PLD values. It will have the columns:

    • submarket (NE, N, SE, S)
    • timestamp (datetime with the desired frequency)
    • value (float), which represents the average PLD value for the period

    The index can be ignored.

Source code in echo_postgres/ccee_pld.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
    resolution: Literal["hourly", "daily", "weekly", "monthly", "yearly"] = "hourly",
) -> DataFrame:
    """Gets PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to get the PLD values.
    submarkets : list[str], optional
        List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.
    resolution : Literal["hourly", "daily", "weekly", "monthly", "yearly"], optional
        Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"].
        By default "hourly"

    Returns
    -------
    DataFrame
        DataFrame with the PLD values. It will have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with the desired frequency)
        - value (float), which represents the average PLD value for the period

        The index can be ignored.
    """
    # getting all the possible submarkets
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()
    allowed_submarkets = list(submarket_ids.keys())

    # getting the submarket ids
    if submarkets is not None:
        if any(submarket not in allowed_submarkets for submarket in allowed_submarkets):
            wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarkets]
            raise ValueError(f"Following submarkets are registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # defining the table
    table = f"v_ccee_pld_{resolution}"

    # defining the query
    query = [
        sql.SQL("SELECT * FROM {table}").format(
            table=sql.Identifier(table),
        ),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # getting the data
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)

    return df

insert(df, on_conflict='ignore')

Inserts PLD values into the database.

Parameters:

  • df

    (DataFrame) –

    DataFrame with PLD values to be inserted. Must have the columns:

    • submarket (NE, N, SE, S)
    • timestamp (datetime with hourly frequency)
    • value (float)
  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/ccee_pld.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts PLD values into the database.

    Parameters
    ----------
    df : DataFrame
        DataFrame with PLD values to be inserted. Must have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with hourly frequency)
        - value (float)

    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"
    """
    # checking columns
    required_cols = [
        "submarket",
        "timestamp",
        "value",
    ]

    if any(col not in df.columns for col in required_cols):
        missing_cols = [col for col in required_cols if col not in df.columns]
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # making a copy of the DataFrame
    df = df.copy()

    # getting only the wanted columns
    df = df[required_cols].copy()

    # getting all the possible submarkets
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()
    submarkets = list(submarket_ids.keys())
    df_submarkets = df["submarket"].unique().tolist()
    if any(submarket not in submarkets for submarket in df_submarkets):
        wrong_submarkets = [submarket for submarket in df_submarkets if submarket not in submarkets]
        raise ValueError(f"Following submarkets are registered in the database: {wrong_submarkets}")

    # replacing submarket by its id
    df["submarket_id"] = df["submarket"].map(submarket_ids)
    df = df.drop(columns=["submarket"])

    # casting timestamp to datetime to avoid errors
    df["timestamp"] = df["timestamp"].astype("datetime64[s]")

    # casting value to float to avoid errors
    df["value"] = df["value"].astype("float")

    # dropping duplicates
    df = df.drop_duplicates()

    # inserting
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="ccee_pld",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug(f"Inserted {df.shape[0]} rows into ccee_pld table")