Calculation Model Instances - Files Values¶
CalcModelInstanceFileValues(perfdb)
¶
Class used for handling calculation model instance file values. Can be accessed via perfdb.calcmodels.instances.files.values.
Parameters:
Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Initialize the handler with a reference to the top-level PerfDB object.

    Shared constructor inherited by all perfdb accessor subclasses.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a back-reference so methods can reach the connection handler
    # (and sibling accessors) through self._perfdb.
    self._perfdb: e_pg.PerfDB = perfdb
delete(object_names=None, calcmodels=None, file_names=None)
¶
Deletes the file values from the database.
If all of the objects associated with a file are included in the filter, the file itself is deleted; otherwise only the connections to the specified objects are removed.
Parameters:
-
(object_names¶list[str] | Literal['NULL'] | None, default:None) –List of object names related to the file. It will be handled in the following way:
- If None, no filter is applied (the file will be deleted regardless of the objects).
- If "NULL", only files without connected objects will be deleted.
- If a list, will first try to delete the connections and then the file itself (if all connections are deleted).
-
(calcmodels¶list[str] | None, default:None) –Name of the calculation models of the files. If None, will not filter by calculation model name. By default None
-
(file_names¶list[str] | None, default:None) –Names of the files to delete. If None, will not filter by file name. By default None
Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def delete(
    self,
    object_names: list[str] | Literal["NULL"] | None = None,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
) -> None:
    """Deletes the file values from the database.

    If all objects associated are specified the file is deleted, otherwise only the connection is deleted.

    Parameters
    ----------
    object_names : list[str] | Literal["NULL"] | None, optional
        List of object names related to the file. It will be handled in the following way:

        - If None, no filter is applied (the file will be deleted regardless of the objects).
        - If "NULL", only files without connected objects will be deleted.
        - If a list, will first try to delete the connections and then the file itself (if all connections are deleted).
    calcmodels : list[str] | None, optional
        Name of the calculation models of the files. If None, will not filter by calculation model name.
        By default None
    file_names : list[str] | None, optional
        Names of the files to delete. If None, will not filter by file name.
        By default None
    """
    # checking inputs (argument validation only; the returned WHERE clause is not needed here)
    self._check_get_args(
        object_names=object_names,
        calcmodel_types=None,
        calcmodels=calcmodels,
        file_names=file_names,
        filter_type="and",
        model_as_regex=False,
    )
    # getting all file to object connections that match the desired criteria
    file_connections = self.get_connected_objects(calcmodels=calcmodels, file_names=file_names)
    # getting object_ids
    # NOTE: obj_ids is only bound when object_names is a non-empty list; the
    # "disconnect" branch below reads it and can only be reached in that case.
    if object_names and object_names != "NULL":
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            # best-effort: warn about unknown objects instead of raising
            missing_objs = set(object_names) - set(obj_ids)
            logger.warning(f"Could not find the following objects: {missing_objs}")
    # getting the file ids that match the filters (answer from previous query might include more than what we want)
    # it will be a dict in the format {calc_model: {file_name: {file_id: {"action": "delete" | "disconnect", "object_ids": [obj_id, ...]}, ...}, ...}, ...}
    wanted_file_ids = {}
    for calc_model, files in file_connections.items():
        if calcmodels and calc_model not in calcmodels:
            continue
        for file_name, file_ids in files.items():
            if file_names and file_name not in file_names:
                continue
            for file_id, file_vals in file_ids.items():
                # no filters on objects, deleting the file
                # (the view reports files without connections as object_ids == [None])
                if object_names is None or (
                    object_names == "NULL" and (not file_vals["object_ids"] or file_vals["object_ids"] == [None])
                ):
                    wanted_file_ids.setdefault(calc_model, {}).setdefault(file_name, {})[file_id] = {
                        "action": "delete",
                        "object_ids": [],
                    }
                # all objects are in the filter, deleting the file
                # NOTE(review): when object_names == "NULL", set(object_names) iterates the
                # string and yields {'N', 'U', 'L'}, so this and the next branch rely on real
                # object names never being those single characters — confirm intended.
                elif set(file_vals["object_names"]).issubset(set(object_names)):
                    wanted_file_ids.setdefault(calc_model, {}).setdefault(file_name, {})[file_id] = {
                        "action": "delete",
                        "object_ids": file_vals["object_ids"] if file_vals["object_ids"] != [None] else [],
                    }
                # some objects are not in the filter, disconnecting the wanted objects from the file
                elif set(file_vals["object_names"]).intersection(set(object_names)):
                    disconnect_ids = [obj_ids[obj] for obj in file_vals["object_names"] if obj in object_names]
                    wanted_file_ids.setdefault(calc_model, {}).setdefault(file_name, {})[file_id] = {
                        "action": "disconnect",
                        "object_ids": disconnect_ids,
                    }
                # no objects are in the filter, ignoring the file
                else:
                    continue
    # deleting the files
    for calc_model, files in wanted_file_ids.items():
        for file_name, file_ids in files.items():
            for file_id, file_vals in file_ids.items():
                # deleting the file
                if file_vals["action"] == "delete":
                    # disconnecting the objects first so the FK on the connections
                    # table does not block the row deletion below
                    query = sql.SQL(
                        "DELETE FROM performance.calculation_model_files_data_object_connections WHERE file_id = {file_id}",
                    ).format(
                        file_id=sql.Literal(file_id),
                    )
                    with self._perfdb.conn.reconnect() as conn:
                        conn.execute(query)
                    # deleting the file
                    query = sql.SQL("DELETE FROM performance.calculation_model_files_data WHERE file_id = {file_id}").format(
                        file_id=sql.Literal(file_id),
                    )
                    with self._perfdb.conn.reconnect() as conn:
                        conn.execute(query)
                    logger.debug(f"File '{file_name}' of '{calc_model}' with file_id '{file_id}' deleted successfully")
                # disconnecting the objects
                elif file_vals["action"] == "disconnect":
                    query = sql.SQL(
                        "DELETE FROM performance.calculation_model_files_data_object_connections WHERE file_id = {file_id} AND object_id IN ({obj_ids})",
                    ).format(
                        file_id=sql.Literal(file_id),
                        obj_ids=sql.SQL(", ").join(map(sql.Literal, file_vals["object_ids"])),
                    )
                    with self._perfdb.conn.reconnect() as conn:
                        conn.execute(query)
                    logger.debug(
                        f"File '{file_name}' of '{calc_model}' with file_id '{file_id}' disconnected from objects {file_vals['object_ids']}",
                    )
                # defensive: only "delete" and "disconnect" are produced above
                else:
                    raise ValueError(
                        f"Invalid action '{file_vals['action']}' for file '{file_name}' of '{calc_model}' with file_id '{file_id}'",
                    )
get(object_names=None, calcmodel_types=None, calcmodels=None, file_names=None, filter_type='and', model_as_regex=False, save_dir=None, skip_value=False, output_type='DataFrame')
¶
Gets all calculation model instance file values with detailed information.
The most useful keys/columns returned are:
- file_id
- value (if skip_value is False)
- modified_date
Parameters:
-
(object_names¶list[str] | Literal['NULL'] | None, default:None) –Names of the objects to filter the results. If None, no filter is applied. If "NULL", will filter the results where the object_name is NULL. By default None
-
(calcmodel_types¶list[str] | None, default:None) –Names of the calculation model types to filter the results. If None, no filter is applied. By default None
-
(calcmodels¶list[str] | None, default:None) –Names of the calculation model instances to filter the results. If None, no filter is applied. By default None
-
(file_names¶list[str] | None, default:None) –Names of the files to filter the results. If None, no filter is applied. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(model_as_regex¶bool, default:False) –If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter. By default False
-
(skip_value¶bool, default:False) –If set to True, will skip the value column and return only other columns. This is useful for quick checks of the data without loading the full values.
-
(save_dir¶Path | None, default:None) –If set to a directory, will try to save all the data as files in it. The files will have the name like {object_name}_{calculation_model_name}_{file_name}.json.
Instead of the converted value, the DataFrame or dictionary will contain the path to the file.
If set to None, no files will be saved but the data will be converted from json and returned directly.
By default None
-
(output_type¶Literal['dict', 'DataFrame'], default:'DataFrame') –Output type of the data. Can be one of ["dict", "DataFrame"] By default "DataFrame"
Returns:
-
DataFrame–In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, calculation_model_name, file_name], columns = [attribute, ...]
-
dict[str, dict[str, dict[str, Any]]]–In case output_type is "dict", returns a dictionary in the format {object_name: {calculation_model_name: {file_name: {attribute: value, ...}, ...}, ...}, ...}
Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def get(
    self,
    object_names: list[str] | Literal["NULL"] | None = None,
    calcmodel_types: list[str] | None = None,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    model_as_regex: bool = False,
    save_dir: Path | None = None,
    skip_value: bool = False,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[str, Any]]]:
    """Gets all calculation model instance file values with detailed information.

    The most useful keys/columns returned are:

    - file_id
    - value (if skip_value is False)
    - modified_date

    Parameters
    ----------
    object_names : list[str] | Literal["NULL"] | None, optional
        Names of the objects to filter the results. If None, no filter is applied.
        If "NULL", will filter the results where the object_name is NULL.
        By default None
    calcmodel_types : list[str] | None, optional
        Names of the calculation model types to filter the results. If None, no filter is applied.
        By default None
    calcmodels : list[str] | None, optional
        Names of the calculation model instances to filter the results. If None, no filter is applied.
        By default None
    file_names: list[str] | None, optional
        Names of the files to filter the results. If None, no filter is applied.
        By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    model_as_regex : bool, optional
        If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter.
        By default False
    save_dir : Path | None, optional
        If set to a directory, will try to save all the data as files in it. The files will have the name like {object_name}_{calculation_model_name}_{file_name}.json.
        Instead of the converted value, the DataFrame or dictionary will contain the path to the file.
        If set to None, no files will be saved but the data will be converted from json and returned directly.
        By default None
    skip_value : bool, optional
        If set to True, will skip the value column and return only other columns. This is useful for quick checks of the data without loading the full values.
        By default False
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, calculation_model_name, file_name], columns = [attribute, ...]
    dict[str, dict[str, dict[str, Any]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {calculation_model_name: {file_name: {attribute: value, ...}, ...}, ...}, ...}
    """
    # checking arguments (also builds the WHERE clause used below)
    where = self._check_get_args(
        object_names=object_names,
        calcmodel_types=calcmodel_types,
        calcmodels=calcmodels,
        file_names=file_names,
        filter_type=filter_type,
        model_as_regex=model_as_regex,
    )
    query = [
        sql.SQL(
            "SELECT object_name, calculation_model_name, file_name, file_id{value_col}, modified_date FROM performance.v_calculation_model_files_data ",
        ).format(
            # skip the (potentially huge) json column entirely when not wanted
            value_col=sql.SQL(", json_data AS value") if not skip_value else sql.SQL(""),
        ),
        where,
        sql.SQL(" ORDER BY object_name, calculation_model_name, file_name"),
    ]
    query = sql.Composed(query)
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing object_name col to be string[pyarrow] as when it is returned only as NA it is returned as null[pyarrow]
    df["object_name"] = df["object_name"].astype("string[pyarrow]")
    if df.empty:
        return df.set_index(["object_name", "calculation_model_name", "file_name"]) if output_type == "DataFrame" else {}
    # creating file_path column (not used in all cases but needed for the convert/save logic below)
    if save_dir:
        save_dir.mkdir(parents=True, exist_ok=True)
        df["file_path"] = df.apply(
            lambda row: save_dir / f"{row['object_name']}_{row['calculation_model_name']}_{row['file_name']}.json",
            axis=1,
        )
    else:
        df["file_path"] = None

    def _convert_model(model_value: Any) -> Any:
        """Best-effort deserialization of a raw json value into a model object.

        Returns the input unchanged when it cannot be interpreted as a
        serialized model.
        """
        # local imports to avoid a hard dependency at module import time
        from echo_calcmodels import PredictiveModel  # noqa: PLC0415
        from echo_datetimerange import DeserializableBaseModel  # noqa: PLC0415

        model = model_value
        if isinstance(model_value, str):
            # strings that are not valid json are returned as-is
            with contextlib.suppress(json.JSONDecodeError):
                model_value = json.loads(model_value)
                model = model_value
        if isinstance(model_value, dict) and "model_class" in model_value:
            # BUG FIX: previously the PredictiveModel result was unconditionally
            # overwritten by the DeserializableBaseModel branch; the two loaders
            # are now mutually exclusive.
            if "PredictiveModel" in model_value["model_class"]:
                model = PredictiveModel.load(json=model_value)
            else:
                model = DeserializableBaseModel.from_json(loadable=model_value)
        return model

    def _save_row(row) -> None:
        """Write one row's json value to its target file, closing the handle."""
        # BUG FIX: the previous Path.open("w") handle was never closed
        with Path(row["file_path"]).open("w") as f:
            json.dump(row["value"], f)

    if not skip_value:
        if not save_dir:
            # converting the values in place
            df["value"] = df.apply(
                lambda row: _convert_model(row["value"]),
                axis=1,
            )
        else:
            # saving json data to file
            df.apply(_save_row, axis=1)
    # making value be the name of the file in case of saving
    if save_dir:
        df["value"] = df["file_path"]
    # dropping the file_path column
    df = df.drop(columns=["file_path"])
    df = df.set_index(["object_name", "calculation_model_name", "file_name"])
    if output_type == "DataFrame":
        return df
    result = df.to_dict(orient="index")
    final_result = {}
    for (object_name, calc_model, file_name), file_vals in result.items():
        # pandas NA (NULL object_name) maps to a plain None key in the dict output
        corrected_object_name = None if object_name is NA else object_name
        if corrected_object_name not in final_result:
            final_result[corrected_object_name] = {}
        if calc_model not in final_result[corrected_object_name]:
            final_result[corrected_object_name][calc_model] = {}
        final_result[corrected_object_name][calc_model][file_name] = file_vals
    return final_result
get_connected_objects(calcmodels=None, file_names=None, filter_type='and')
¶
Gets all the objects connected to the calculation model files.
Parameters:
-
(calcmodels¶list[str] | None, default:None) –Names of the calculation models whose file connections to get, by default None
-
(file_names¶list[str] | None, default:None) –Names of the files to get, by default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
Returns:
-
dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]]–Dictionary in the format {calculation_model_name: {file_name: {file_id: {"object_names": [object_name, ...], "object_ids": [object_id, ...]}, ...}, ...}, ...}
Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def get_connected_objects(
    self,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]]:
    """Gets all the objects connected to the calculation model files.

    Parameters
    ----------
    calcmodels : list[str] | None, optional
        Name of the calculation model files to get, by default None
    file_names : list[str] | None, optional
        Names of the files to get, by default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"

    Returns
    -------
    dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]]
        Dictionary in the format {calculation_model_name: {file_name: {file_id: {"object_names": [object_name, ...], "object_ids": [object_id, ...]}, ...}, ...}, ...}
    """
    # assemble the statement: base SELECT, optional WHERE, fixed ORDER BY
    parts = [
        sql.SQL(
            "SELECT calculation_model_name, file_name, file_id, object_names, object_ids FROM performance.v_calculation_model_files_data_object_connections",
        ),
    ]
    conditions = []
    if calcmodels:
        conditions.append(
            sql.SQL("calculation_model_name IN ({calcmodels})").format(
                calcmodels=sql.SQL(", ").join(sql.Literal(name) for name in calcmodels),
            ),
        )
    if file_names:
        conditions.append(
            sql.SQL("file_name IN ({file_names})").format(
                file_names=sql.SQL(", ").join(sql.Literal(name) for name in file_names),
            ),
        )
    if conditions:
        # join the individual filters with AND/OR as requested
        parts.append(sql.SQL(" WHERE "))
        parts.append(sql.SQL(f" {filter_type.upper()} ").join(conditions))
    parts.append(sql.SQL(" ORDER BY calculation_model_name, file_name, file_id"))
    composed = sql.Composed(parts)
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(composed, post_convert="pyarrow")
    df = df.set_index(["calculation_model_name", "file_name", "file_id"])
    if df.empty:
        return {}
    # re-nest the flat (calc_model, file_name, file_id) rows into the output dict
    nested: dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]] = {}
    for (calc_model, file_name, file_id), row_vals in df.to_dict(orient="index").items():
        nested.setdefault(calc_model, {}).setdefault(file_name, {})[file_id] = row_vals
    return nested
get_ids(object_names=None, calcmodel_types=None, calcmodels=None, file_names=None, filter_type='and', model_as_regex=False)
¶
Gets all file ids for the given filters.
Parameters:
-
(object_names¶list[str] | Literal['NULL'] | None, default:None) –Names of the objects to filter the results. If None, no filter is applied. If "NULL", will filter the results where the object_name is NULL. By default None
-
(calcmodel_types¶list[str] | None, default:None) –Names of the calculation model types to filter the results. If None, no filter is applied. By default None
-
(calcmodels¶list[str] | None, default:None) –Names of the calculation model instances to filter the results. If None, no filter is applied. By default None
-
(file_names¶list[str] | None, default:None) –Names of the files to filter the results. If None, no filter is applied. By default None
-
(filter_type¶Literal['and', 'or'], default:'and') –How to treat multiple filters. Can be one of ["and", "or"]. By default "and"
-
(model_as_regex¶bool, default:False) –If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter. By default False
Returns:
-
dict[str, dict[str, dict[str, int]]]–Dict in the format {object_name: {calculation_model_name: {file_name: file_id, ...}, ...}, ...}
Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def get_ids(
    self,
    object_names: list[str] | Literal["NULL"] | None = None,
    calcmodel_types: list[str] | None = None,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    model_as_regex: bool = False,
) -> dict[str, dict[str, dict[str, int]]]:
    """Gets all file ids for the given filters.

    Parameters
    ----------
    object_names : list[str] | Literal["NULL"] | None, optional
        Names of the objects to filter the results. If None, no filter is applied.
        If "NULL", will filter the results where the object_name is NULL.
        By default None
    calcmodel_types : list[str] | None, optional
        Names of the calculation model types to filter the results. If None, no filter is applied.
        By default None
    calcmodels : list[str] | None, optional
        Names of the calculation model instances to filter the results. If None, no filter is applied.
        By default None
    file_names: list[str] | None, optional
        Names of the files to filter the results. If None, no filter is applied.
        By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    model_as_regex : bool, optional
        If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter.
        By default False

    Returns
    -------
    dict[str, dict[str, dict[str, int]]]
        Dict in the format {object_name: {calculation_model_name: {file_name: file_id, ...}, ...}, ...}
    """
    # validate arguments and obtain the composed WHERE clause
    where = self._check_get_args(
        object_names=object_names,
        calcmodel_types=calcmodel_types,
        calcmodels=calcmodels,
        file_names=file_names,
        filter_type=filter_type,
        model_as_regex=model_as_regex,
    )
    composed = sql.Composed(
        [
            sql.SQL(
                "SELECT object_name, calculation_model_name, file_name, file_id FROM performance.v_calculation_model_files_data ",
            ),
            where,
            sql.SQL(" ORDER BY object_name, calculation_model_name, file_name"),
        ],
    )
    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(composed)
    df = df.set_index(["object_name", "calculation_model_name", "file_name"])
    if df.empty:
        return {}
    # re-nest the flat index into {object: {calc_model: {file_name: file_id}}}
    nested: dict[str, dict[str, dict[str, int]]] = {}
    for (object_name, calc_model, file_name), file_id in df.to_dict()["file_id"].items():
        # pandas NA (NULL object_name) becomes a plain None key
        key = None if object_name is NA else object_name
        nested.setdefault(key, {}).setdefault(calc_model, {})[file_name] = file_id
    return nested
insert(data, value_as_path=False, on_conflict='ignore')
¶
Inserts one or multiple calculation model files into the database.
Parameters:
-
(data¶dict[str, dict[str, dict[str, Any | list[str]]]]) –Dict containing the data to be inserted. The format is {calc_model_name: {file_name: {"value": value, "objects": [obj1, obj2, ...]}, ...}, ...}. Objects are passed as a list as one file can be reused for multiple objects. If the list is empty, the file will be used for all objects.
-
(value_as_path¶bool, default:False) –If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.
-
(on_conflict¶Literal['ignore', 'update'], default:'ignore') –What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"
Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def insert(
    self,
    data: dict[str, dict[str, dict[str, Any | list[str]]]],
    value_as_path: bool = False,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts one or multiple calculation model files into the database.

    Parameters
    ----------
    data : dict[str, dict[str, dict[str, Any | list[str]]]]
        Dict containing the data to be inserted. The format is {calc_model_name: {file_name: {"value": value, "objects": [obj1, obj2, ...]}, ...}, ...}.
        Objects are passed as a list as one file can be reused for multiple objects. If the list is empty, the file will be used for all objects.
    value_as_path : bool, optional
        If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    Raises
    ------
    ValueError
        If a file entry is missing the "value"/"objects" keys, a value is None,
        a path value does not have a ".json" extension, or the same file would
        be associated with objects already pointing at different file_ids.
    TypeError
        If a file entry's "objects" is not a list.
    """
    wanted_objs = []
    # validating types
    for calc_model, files in data.items():
        for file_name, file_data in files.items():
            if {"value", "objects"} != set(file_data.keys()):
                raise ValueError(f"file data for {calc_model}/{file_name} must contain 'value' and 'objects' keys")
            if not isinstance(file_data["objects"], list):
                raise TypeError(f"objects for {calc_model}/{file_name} must be a list, not {type(file_data['objects'])}")
            wanted_objs.extend(file_data["objects"])
    # getting ids of the objects
    wanted_objs = list(set(wanted_objs))
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        # NOTE(review): only a warning is emitted here, but a missing object will
        # raise KeyError at the obj_ids lookup below — confirm intended.
        missing_objs = set(wanted_objs) - set(obj_ids)
        logger.warning(f"Could not find the following objects: {missing_objs}")
    # getting file definitions
    all_files = list({file_name for files in data.values() for file_name in files})
    file_defs = self._perfdb.calcmodels.instances.files.definitions.get(calcmodels=list(data.keys()), file_names=all_files)
    # inserting the data
    for calc_model, files in data.items():
        for file_name, file_data in files.items():
            value = file_data["value"]
            if value is None:
                raise ValueError(f"Value for {calc_model}/{file_name} cannot be None")
            # reading from file
            if value_as_path:
                value = Path(value)
                # validating extension
                if value.suffix.lower() != ".json":
                    raise ValueError(f"Expected file to have extension '.json', got '{value.suffix}'")
                # BUG FIX: the file handle from value.open("r") was never closed
                with value.open("r") as f:
                    value = json.load(f)
            file_def_id = file_defs[calc_model][file_name]["file_def_id"]
            file_obj_ids = [obj_ids[obj] for obj in file_data["objects"]]
            # checking if the file already exists for the given objects
            existing_files = self.get_ids(object_names=file_data["objects"] or "NULL", calcmodels=[calc_model], file_names=[file_name])
            # making sure all point to the same file id
            file_ids = [
                file_id for calc_models in existing_files.values() for files in calc_models.values() for file_id in files.values()
            ]
            if len(set(file_ids)) > 1:
                raise ValueError(
                    f"Trying to insert the same file '{file_name}' of calc model '{calc_model}' to multiple objects but there's already different files associated with them. Objects {file_data['objects']}",
                )
            file_id = file_ids[0] if file_ids else None
            # getting new file id if needed
            if file_id is None:
                # NOTE(review): pg_get_serial_sequence is called without the schema
                # qualifier, so this relies on search_path — confirm intended.
                query = sql.SQL("SELECT nextval(pg_get_serial_sequence('calculation_model_files_data', 'file_id'))")
                with self._perfdb.conn.reconnect() as conn:
                    file_id = conn.read_to_pandas(query).iloc[0, 0]
            # inserting the file
            if isinstance(value, dict):
                value = json.dumps(value, ensure_ascii=False)
                # replacing any NaN values with null (json.dumps emits the non-standard
                # NaN token, always preceded by a space with default separators).
                # NOTE(review): this also rewrites " NaN" inside string values — confirm acceptable.
                value = value.replace(" NaN", "null")
            query = [
                sql.SQL(
                    "INSERT INTO performance.calculation_model_files_data (file_id, file_def_id, json_data)\n"
                    "VALUES ({file_id}, {file_def_id}, {json_data})\n"
                    "ON CONFLICT (file_id) DO ",
                ).format(
                    file_id=sql.Literal(file_id),
                    file_def_id=sql.Literal(file_def_id),
                    json_data=sql.Literal(value),
                ),
            ]
            if on_conflict == "ignore":
                query.append(sql.SQL("NOTHING"))
            else:
                query.append(sql.SQL("UPDATE SET json_data = EXCLUDED.json_data"))
            query = sql.Composed(query)
            with self._perfdb.conn.reconnect() as conn:
                conn.execute(query=query)
            # connecting this file to the objects
            if file_obj_ids:
                query = [
                    sql.SQL(
                        "INSERT INTO performance.calculation_model_files_data_object_connections (file_id, object_id) VALUES\n",
                    ),
                ]
                connect_query = [
                    sql.SQL("({file_id}, {obj_id})").format(
                        file_id=sql.Literal(file_id),
                        obj_id=sql.Literal(obj_id),
                    )
                    for obj_id in file_obj_ids
                ]
                connect_query = sql.SQL(", ").join(connect_query)
                query.append(connect_query)
                query.append(sql.SQL("ON CONFLICT DO NOTHING"))
                query = sql.Composed(query)
                with self._perfdb.conn.reconnect() as conn:
                    conn.execute(query)
            logger.debug(
                f"'{file_name}' of '{calc_model}' inserted successfully with file_id '{file_id}' and connected to objects {file_data['objects']}",
            )