CCEE PPA Definitions¶
CceePpaDefinitions(perfdb)
¶
Class used for handling CCEE PPA definitions. Can be accessed via perfdb.ccee.ppa.definitions.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Keep a reference to the top level PerfDB object.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # stored privately; the methods of this class reach the database through it
    self._perfdb: e_pg.PerfDB = perfdb
get(spes=None, market_types=None, filter_type='and', output_type='DataFrame')
¶
Gets PPA definitions from the database.
The definitions do not contain adjusted values. The values presented here are the base when the PPA was signed.
The main keys/columns returned are:
- spe_name
- submarket_acronym
- market_type_name
- start
- end
- price
- base_date
- price_adjustment_month
- sold_mw_avg
Parameters:
-
(spes¶list[str], default:None) –List of SPES to get the PPA definitions from. By default None, which means all SPES.
-
(market_types¶list[str], default:None) –List of market types (Free or Regulated) to get the PPA definitions from. By default None, which means all market types.
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame".
Returns:
-
DataFrame–In case output_type = 'DataFrame', returns a DataFrame with the PPA definitions. The index will be a MultiIndex with the levels: spe_name and start. Columns will have the other attributes.
-
dict[str, dict[datetime, Any]]–In case output_type = 'dict', returns a dictionary with the PPA definitions. The structure is {spe_name: {start: {attribute: value, ...}, ...}, ...}
Source code in echo_postgres/ccee_ppa_definitions.py
@validate_call
def get(
    self,
    spes: list[str] | None = None,
    market_types: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[datetime, Any]]:
    """Fetch the PPA definitions stored in the database.

    Values are the base ones from when the PPA was signed; no adjusted values are included.

    The main keys/columns returned are:

    - spe_name
    - submarket_acronym
    - market_type_name
    - start
    - end
    - price
    - base_date
    - price_adjustment_month
    - sold_mw_avg

    Parameters
    ----------
    spes : list[str], optional
        SPEs whose PPA definitions are wanted. By default None, which means all SPEs.
    market_types : list[str], optional
        Market types (Free or Regulated) whose PPA definitions are wanted. By default None, which means all market types.
    filter_type : Literal["and", "or"]
        Operator used to combine the filters when more than one is given. Can be one of ["and", "or"]. By default "and"
    output_type : Literal["dict", "DataFrame"], optional
        Shape of the returned data. Can be one of ["dict", "DataFrame"].
        By default "DataFrame"

    Returns
    -------
    DataFrame
        When output_type = 'DataFrame'. The index is a MultiIndex with the levels spe_name and start; the remaining attributes are the columns.
    dict[str, dict[datetime, Any]]
        When output_type = 'dict'. The structure is {spe_name: {start: {attribute: value, ...}, ...}, ...}

    Raises
    ------
    ValueError
        If filter_type or output_type is not one of the accepted values.
    TypeError
        If spes or market_types is neither a list nor None.
    """
    # argument validation (same order as the checks are documented above)
    if filter_type not in ("and", "or"):
        raise ValueError(f"filter_type must be 'and' or 'or', got {filter_type}")
    if output_type not in ("dict", "DataFrame"):
        raise ValueError(f"output_type must be 'dict' or 'DataFrame', got {output_type}")
    if spes is not None and not isinstance(spes, list):
        raise TypeError(f"spes must be a list of str, got {type(spes)}")
    if market_types is not None and not isinstance(market_types, list):
        raise TypeError(f"market_types must be a list of str, got {type(market_types)}")

    # one SQL condition per filter that was actually supplied (None and empty lists add nothing)
    conditions = []
    if spes:
        spe_literals = sql.SQL(", ").join(sql.Literal(spe) for spe in spes)
        conditions.append(sql.SQL("spe_name IN ({spes})").format(spes=spe_literals))
    if market_types:
        market_literals = sql.SQL(", ").join(sql.Literal(market) for market in market_types)
        conditions.append(
            sql.SQL("market_type_name IN ({market_types})").format(market_types=market_literals),
        )

    # assembling the statement: base SELECT plus an optional WHERE clause
    statement_parts = [sql.SQL("SELECT * FROM v_ccee_ppa_def")]
    if conditions:
        # filter_type was validated above, so only AND / OR can be interpolated here
        operator = sql.SQL(f" {filter_type.upper()} ")
        statement_parts.append(sql.SQL(" WHERE ") + operator.join(conditions))
    query = sql.Composed(statement_parts)

    # running the query
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query)

    # (spe_name, start) becomes the index
    df = df.set_index(["spe_name", "start"])

    if output_type == "DataFrame":
        return df

    # to_dict yields {(spe_name, start): {attribute: value, ...}, ...};
    # regroup it as {spe_name: {start: {attribute: value, ...}, ...}, ...}
    nested = {}
    for (spe_name, start), attributes in df.to_dict(orient="index").items():
        nested.setdefault(spe_name, {})[start] = attributes
    return nested