CCEE PLD¶
CceePld(perfdb)
¶
Class used for handling CCEE PLD (Energy Spot Price). Can be accessed via perfdb.ccee.pld.
Parameters:
- perfdb (PerfDB) – Top level object carrying all functionality and the connection handler.
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store the top level PerfDB object on the instance.

    Every subclass is expected to inherit this initializer.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
        It is kept in the private attribute ``_perfdb``.
    """
    self._perfdb: e_pg.PerfDB = perfdb
delete(period, submarkets=None)
¶
Deletes PLD values from the database.
Parameters:
- period (DateTimeRange) – Period for which to delete the PLD values.
- submarkets (list[str], default: None) – List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.
Source code in echo_postgres/ccee_pld.py
@validate_call
def delete(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
) -> None:
    """Deletes PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to delete the PLD values. Both ends are inclusive.
    submarkets : list[str], optional
        List of submarkets to delete the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.

    Raises
    ------
    ValueError
        If any of the requested submarkets is not registered in the database.
    """
    # getting all the possible submarkets (submarket name -> submarket id)
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()

    # validating the requested submarkets and keeping only their ids
    if submarkets is not None:
        wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarket_ids]
        if wrong_submarkets:
            raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # defining the query
    query = [
        sql.SQL("DELETE FROM performance.ccee_pld"),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # the submarket filter is only added when the caller restricted the submarkets
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # deleting from the database
    self._perfdb.conn.execute(query)
    logger.debug(f"Deleted {self._perfdb.conn.rowcount} rows from performance.ccee_pld table")
get(period, submarkets=None, resolution='hourly', output_type='DataFrame')
¶
Gets PLD values from the database.
Parameters:
- period (DateTimeRange) – Period for which to get the PLD values.
- submarkets (list[str], default: None) – List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"]. By default None, which means all submarkets.
- resolution (Literal['hourly', 'daily', 'weekly', 'monthly', 'yearly'], default: 'hourly') – Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"]. By default "hourly".
- output_type (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]. By default "DataFrame".
Returns:
- pd.DataFrame – In case output_type is "DataFrame", returns a pandas DataFrame with the PLD values.
- pl.DataFrame – In case output_type is "pl.DataFrame", returns a Polars DataFrame with the PLD values.
It will have the columns:
- submarket (NE, N, SE, S)
- timestamp (datetime with the desired frequency)
- value (float), which represents the average PLD value for the period
The index can be ignored.
Source code in echo_postgres/ccee_pld.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    submarkets: list[str] | None = None,
    resolution: Literal["hourly", "daily", "weekly", "monthly", "yearly"] = "hourly",
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> pd.DataFrame | pl.DataFrame:
    """Gets PLD values from the database.

    Parameters
    ----------
    period : DateTimeRange
        Period for which to get the PLD values. Both ends are inclusive.
    submarkets : list[str], optional
        List of submarkets to get the PLD values from. Can be one or more of ["NE", "N", "SE", "S"].
        By default None, which means all submarkets.
    resolution : Literal["hourly", "daily", "weekly", "monthly", "yearly"], optional
        Resolution of the data to be returned. Can be one of ["hourly", "daily", "weekly", "monthly", "yearly"].
        By default "hourly"
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]
        By default "DataFrame"

    Returns
    -------
    pd.DataFrame
        In case output_type is "DataFrame", returns a pandas DataFrame with the PLD values.
    pl.DataFrame
        In case output_type is "pl.DataFrame", returns a Polars DataFrame with the PLD values.

        It will have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with the desired frequency)
        - value (float), which represents the average PLD value for the period

        The index can be ignored.

    Raises
    ------
    ValueError
        If any of the requested submarkets is not registered in the database.
    """
    # getting all the possible submarkets (submarket name -> submarket id)
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()

    # validating the requested submarkets and keeping only their ids
    if submarkets is not None:
        wrong_submarkets = [submarket for submarket in submarkets if submarket not in submarket_ids]
        if wrong_submarkets:
            raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")
        submarket_ids = {submarket: submarket_ids[submarket] for submarket in submarkets}

    # the source view is chosen by the requested resolution
    table = f"v_ccee_pld_{resolution}"

    # defining the query
    query = [
        sql.SQL("SELECT * FROM {table}").format(
            table=sql.Identifier(table),
        ),
        sql.SQL(" WHERE timestamp >= {start} AND timestamp <= {end}").format(
            start=sql.Literal(f"{period.start:%Y-%m-%d %H:%M:%S}"),
            end=sql.Literal(f"{period.end:%Y-%m-%d %H:%M:%S}"),
        ),
    ]
    # the submarket filter is only added when the caller restricted the submarkets
    if submarkets is not None:
        query.append(
            sql.SQL(" AND submarket_id IN ({submarket_ids})").format(
                submarket_ids=sql.SQL(",").join(map(sql.Literal, submarket_ids.values())),
            ),
        )
    query = sql.Composed(query)

    # getting the data (always read as Polars, converted to pandas only if requested)
    df = self._perfdb.conn.read_to_polars(query)
    if output_type == "pl.DataFrame":
        return df
    return df.to_pandas(use_pyarrow_extension_array=True)
insert(df, on_conflict='ignore')
¶
Inserts PLD values into the database.
Parameters:
- df (pd.DataFrame | pl.DataFrame) – DataFrame with PLD values to be inserted. Must have the columns:
- submarket (NE, N, SE, S)
- timestamp (datetime with hourly frequency)
- value (float)
- on_conflict (Literal['ignore', 'update'], default: 'ignore') – What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore".
Source code in echo_postgres/ccee_pld.py
@validate_call
def insert(
    self,
    df: pd.DataFrame | pl.DataFrame,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts PLD values into the database.

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame with PLD values to be inserted. Must have the columns:

        - submarket (NE, N, SE, S)
        - timestamp (datetime with hourly frequency)
        - value (float)

        Any other column is ignored.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If df is missing any of the required columns or if it contains submarkets that are not
        registered in the database.
    """
    # converting pd.DataFrame to pl.DataFrame if necessary
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)

    # checking columns
    required_cols = [
        "submarket",
        "timestamp",
        "value",
    ]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"df is missing the following columns: {missing_cols}")

    # getting only the wanted columns (select returns a new DataFrame, the caller's one is not modified)
    df = df.select(required_cols)

    # getting all the possible submarkets (submarket name -> submarket id)
    submarket_ids = self._perfdb.ccee.submarkets.get_ids()
    df_submarkets = df["submarket"].unique().to_list()
    wrong_submarkets = [submarket for submarket in df_submarkets if submarket not in submarket_ids]
    if wrong_submarkets:
        raise ValueError(f"Following submarkets are not registered in the database: {wrong_submarkets}")

    # replacing submarket by its id
    df = df.with_columns(pl.col("submarket").replace_strict(submarket_ids).alias("submarket_id"))
    df = df.drop(["submarket"])

    # casting timestamp to datetime to avoid errors
    df = df.with_columns(pl.col("timestamp").cast(pl.Datetime("ms")))  # assuming timestamp is in milliseconds, adjust if necessary

    # casting value to float to avoid errors
    df = df.with_columns(pl.col("value").cast(pl.Float64))

    # dropping duplicates
    df = df.unique()

    # inserting (on_conflict is translated to the if_exists argument of polars_to_sql)
    if_exists_mapping = {
        "ignore": "append",
        "update": "update",
    }
    self._perfdb.conn.polars_to_sql(
        df=df,
        table_name="ccee_pld",
        schema="performance",
        if_exists=if_exists_mapping[on_conflict],
    )
    logger.debug(f"Inserted {df.shape[0]} rows into ccee_pld table")