CCEE PLD¶
CceePld(perfdb)
¶
Class used for handling CCEE PLD (Energy Spot Price). Can be accessed via perfdb.ccee.pld.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Keeps a reference to the top level PerfDB object.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # stored privately; the reference is kept as-is, nothing is copied or opened here
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, submarkets=None)
¶
Deletes PLD values from the database.
Parameters:
- period (DateTimeRange) – Period for which to delete the PLD values.
- submarkets (list[str], default: None) – List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.
Source code in echo_postgres/ccee_pld.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
) -> None:
    """Deletes PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to delete the PLD values. Both ends are inclusive.
    submarkets : list[str], optional
        List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.

    Raises
    ------
    ValueError
        If any of the requested submarkets is not registered in the database.
    """
    # getting all the possible submarkets (mapping of submarket name to its id)
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()

    # validating the requested submarkets and keeping only their ids
    if submarkets is not None:
        # the requested names must be checked against the registered ones, otherwise
        # an unknown name would only surface as a KeyError in the lookup below
        wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarket_ids]
        if wrong_submarkets:
            raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # defining the query
    query = [
        sql.SQL("DELETE FROM performance.ccee_pld"),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # the submarket filter is only added when the caller restricted the submarkets
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # deleting from the database
    with self._perfdb.conn.reconnect() as conn:
        result = conn.execute(query)

    logger.debug(f"Deleted {result.rowcount} rows from performance.ccee_pld table")
get(period, submarkets=None, resolution='hourly')
¶
Gets PLD values from the database.
Parameters:
- period (DateTimeRange) – Period for which to get the PLD values.
- submarkets (list[str], default: None) – List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.
- resolution (Literal['hourly', 'daily', 'weekly', 'monthly', 'yearly'], default: 'hourly') – Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"]. By default "hourly".
Returns:
- DataFrame – DataFrame with the PLD values. It will have the columns:
- submarket (NE, N, SE, S)
- timestamp (datetime with the desired frequency)
- value (float), which represents the average PLD value for the period
The index can be ignored.
Source code in echo_postgres/ccee_pld.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
    resolution: Literal["hourly", "daily", "weekly", "monthly", "yearly"] = "hourly",
) -> DataFrame:
    """Gets PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to get the PLD values. Both ends are inclusive.
    submarkets : list[str], optional
        List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.
    resolution : Literal["hourly", "daily", "weekly", "monthly", "yearly"], optional
        Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"].
        By default "hourly"

    Returns
    -------
    DataFrame
        DataFrame with the PLD values. It will have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with the desired frequency)
        - value (float), which represents the average PLD value for the period

        The index can be ignored.

    Raises
    ------
    ValueError
        If any of the requested submarkets is not registered in the database.
    """
    # getting all the possible submarkets (mapping of submarket name to its id)
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()

    # validating the requested submarkets and keeping only their ids
    if submarkets is not None:
        # the requested names must be checked against the registered ones, otherwise
        # an unknown name would only surface as a KeyError in the lookup below
        wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarket_ids]
        if wrong_submarkets:
            raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # defining the table (one view per resolution)
    table = f"v_ccee_pld_{resolution}"

    # defining the query
    query = [
        sql.SQL("SELECT * FROM {table}").format(
            table=sql.Identifier(table),
        ),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # the submarket filter is only added when the caller restricted the submarkets
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # getting the data
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)

    return df
insert(df, on_conflict='ignore')
¶
Inserts PLD values into the database.
Parameters:
- df (DataFrame) – DataFrame with PLD values to be inserted. Must have the columns:
- submarket (NE, N, SE, S)
- timestamp (datetime with hourly frequency)
- value (float)
- on_conflict (Literal['ignore', 'update'], default: 'ignore') – What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Source code in echo_postgres/ccee_pld.py
@validate_call
def insert(
    self,
    df: DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts PLD values into the database.

    Parameters
    ----------
    df : DataFrame
        DataFrame with PLD values to be inserted. Must have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with hourly frequency)
        - value (float)

        The DataFrame passed by the caller is not modified.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df is missing any of the required columns or if it contains submarkets that are
        not registered in the database.
    """
    # checking columns
    required_cols = [
        "submarket",
        "timestamp",
        "value",
    ]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # getting only the wanted columns (the copy keeps the caller's DataFrame untouched)
    df = df[required_cols].copy()

    # getting all the possible submarkets (mapping of submarket name to its id)
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()
    df_submarkets = df["submarket"].unique().tolist()
    wrong_submarkets = [submarket for submarket in df_submarkets if submarket not in submarket_ids]
    if wrong_submarkets:
        raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")

    # replacing submarket by its id
    df["submarket_id"] = df["submarket"].map(submarket_ids)
    df = df.drop(columns=["submarket"])

    # casting timestamp to datetime to avoid errors
    df["timestamp"] = df["timestamp"].astype("datetime64[s]")
    # casting value to float to avoid errors
    df["value"] = df["value"].astype("float")

    # dropping duplicates
    df = df.drop_duplicates()

    # translating the on_conflict option to the if_exists value passed to pandas_to_sql
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    with self._perfdb.conn.reconnect() as conn:
        conn.pandas_to_sql(
            df=df,
            table_name="ccee_pld",
            schema="performance",
            if_exists=if_exists_mapping[on_conflict],
            ignore_index=True,
        )

    logger.debug(f"Inserted {df.shape[0]} rows into ccee_pld table")