Skip to content

Calculation Model Instances - Files Values

CalcModelInstanceFileValues(perfdb)

Class used for handling calculation model instance file values. Can be accessed via perfdb.calcmodels.instances.files.values.

Parameters:

  • perfdb

    (PerfDB) –

    Top level object carrying all functionality and the connection handler.

Source code in echo_postgres/perfdb_root.py
def __init__(self, perfdb: e_pg.PerfDB) -> None:
    """Store a reference to the top-level database handler.

    Common constructor inherited by all accessor subclasses.

    Parameters
    ----------
    perfdb : PerfDB
        Top level object carrying all functionality and the connection handler.

    """
    # keep a handle to the root object so this accessor can reach the connection
    self._perfdb: e_pg.PerfDB = perfdb

delete(object_names=None, calcmodels=None, file_names=None)

Deletes the file values from the database.

If all objects associated are specified the file is deleted, otherwise only the connection is deleted.

Parameters:

  • object_names

    (list[str] | Literal['NULL'] | None, default: None ) –

    List of object names related to the file. It will be handled in the following way:

    • If None, no filter is applied (the file will be deleted regardless of the objects).
    • If "NULL", only files without connected objects will be deleted.
    • If a list, will first try to delete the connections and then the file itself (if all connections are deleted).
  • calcmodels

    (list[str] | None, default: None ) –

    Name of the calculation models of the files. If None, will not filter by calculation model name. By default None

  • file_names

    (list[str] | None, default: None ) –

    Names of the files to delete. If None, will not filter by file name. By default None

Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def delete(
    self,
    object_names: list[str] | Literal["NULL"] | None = None,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
) -> None:
    """Deletes the file values from the database.

    If all objects associated are specified the file is deleted, otherwise only the connection is deleted.

    Parameters
    ----------
    object_names : list[str] | Literal["NULL"] | None, optional
        List of object names related to the file. It will be handled in the following way:

        - If None, no filter is applied (the file will be deleted regardless of the objects).
        - If "NULL", only files without connected objects will be deleted.
        - If a list, will first try to delete the connections and then the file itself (if all connections are deleted).
    calcmodels : list[str] | None, optional
        Name of the calculation models of the files. If None, will not filter by calculation model name.
        By default None
    file_names : list[str] | None, optional
        Names of the files to delete. If None, will not filter by file name.
        By default None

    """
    # checking inputs
    self._check_get_args(
        object_names=object_names,
        calcmodel_types=None,
        calcmodels=calcmodels,
        file_names=file_names,
        filter_type="and",
        model_as_regex=False,
    )

    # getting all file to object connections that match the desired criteria
    file_connections = self.get_connected_objects(calcmodels=calcmodels, file_names=file_names)

    # getting object_ids (only needed when an explicit list of objects was given)
    obj_ids: dict = {}
    if object_names and object_names != "NULL":
        obj_ids = self._perfdb.objects.instances.get_ids(object_names=object_names)
        if len(obj_ids) != len(object_names):
            missing_objs = set(object_names) - set(obj_ids)
            logger.warning(f"Could not find the following objects: {missing_objs}")

    # getting the file ids that match the filters (answer from previous query might include more than what we want)
    # it will be a dict in the format {calc_model: {file_name: {file_id: {"action": "delete" | "disconnect", "object_ids": [obj_id, ...]}, ...}, ...}, ...}
    wanted_file_ids = {}
    for calc_model, files in file_connections.items():
        if calcmodels and calc_model not in calcmodels:
            continue
        for file_name, file_ids in files.items():
            if file_names and file_name not in file_names:
                continue
            for file_id, file_vals in file_ids.items():
                # a file without connections is represented either by an empty list or by [None]
                has_objects = bool(file_vals["object_ids"]) and file_vals["object_ids"] != [None]
                if object_names is None:
                    # no filter on objects: delete the file regardless of its connections
                    entry = {"action": "delete", "object_ids": []}
                elif object_names == "NULL":
                    # BUGFIX: previously "NULL" fell through to set operations on the literal
                    # string "NULL" (i.e. the character set {"N", "U", "L"}), which could wrongly
                    # match single-letter object names and reach `obj_ids` while it was undefined.
                    # Files with any connected object are now skipped explicitly.
                    if has_objects:
                        continue
                    entry = {"action": "delete", "object_ids": []}
                else:
                    file_objs = set(file_vals["object_names"])
                    wanted = set(object_names)
                    if file_objs.issubset(wanted):
                        # all connected objects are in the filter: delete the file itself
                        entry = {
                            "action": "delete",
                            "object_ids": file_vals["object_ids"] if has_objects else [],
                        }
                    elif file_objs & wanted:
                        # partial overlap: only disconnect the requested objects from the file
                        entry = {
                            "action": "disconnect",
                            "object_ids": [obj_ids[obj] for obj in file_vals["object_names"] if obj in wanted],
                        }
                    else:
                        # no objects are in the filter: leave the file untouched
                        continue
                wanted_file_ids.setdefault(calc_model, {}).setdefault(file_name, {})[file_id] = entry

    # applying the decided actions
    for calc_model, files in wanted_file_ids.items():
        for file_name, file_ids in files.items():
            for file_id, file_vals in file_ids.items():
                if file_vals["action"] == "delete":
                    # first drop the object connections so the file row can be removed
                    query = sql.SQL(
                        "DELETE FROM performance.calculation_model_files_data_object_connections WHERE file_id = {file_id}",
                    ).format(
                        file_id=sql.Literal(file_id),
                    )
                    with self._perfdb.conn.reconnect() as conn:
                        conn.execute(query)
                    # then delete the file itself
                    query = sql.SQL("DELETE FROM performance.calculation_model_files_data WHERE file_id = {file_id}").format(
                        file_id=sql.Literal(file_id),
                    )
                    with self._perfdb.conn.reconnect() as conn:
                        conn.execute(query)
                    logger.debug(f"File '{file_name}' of '{calc_model}' with file_id '{file_id}' deleted successfully")
                elif file_vals["action"] == "disconnect":
                    # only remove the connections to the requested objects
                    query = sql.SQL(
                        "DELETE FROM performance.calculation_model_files_data_object_connections WHERE file_id = {file_id} AND object_id IN ({obj_ids})",
                    ).format(
                        file_id=sql.Literal(file_id),
                        obj_ids=sql.SQL(", ").join(map(sql.Literal, file_vals["object_ids"])),
                    )
                    with self._perfdb.conn.reconnect() as conn:
                        conn.execute(query)
                    logger.debug(
                        f"File '{file_name}' of '{calc_model}' with file_id '{file_id}' disconnected from objects {file_vals['object_ids']}",
                    )
                else:
                    raise ValueError(
                        f"Invalid action '{file_vals['action']}' for file '{file_name}' of '{calc_model}' with file_id '{file_id}'",
                    )

get(object_names=None, calcmodel_types=None, calcmodels=None, file_names=None, filter_type='and', model_as_regex=False, save_dir=None, skip_value=False, output_type='DataFrame')

Gets all calculation model instance file values with detailed information.

The most useful keys/columns returned are:

  • file_id
  • value (if skip_value is False)
  • modified_date

Parameters:

  • object_names

    (list[str] | Literal['NULL'] | None, default: None ) –

    Names of the objects to filter the results. If None, no filter is applied. If "NULL", will filter the results where the object_name is NULL. By default None

  • calcmodel_types

    (list[str] | None, default: None ) –

    Names of the calculation model types to filter the results. If None, no filter is applied. By default None

  • calcmodels

    (list[str] | None, default: None ) –

    Names of the calculation model instances to filter the results. If None, no filter is applied. By default None

  • file_names

    (list[str] | None, default: None ) –

    Names of the files to filter the results. If None, no filter is applied. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • model_as_regex

    (bool, default: False ) –

    If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter. By default False

  • skip_value

    (bool, default: False ) –

    If set to True, will skip the value column and return only other columns. This is useful for quick checks of the data without loading the full values.

  • save_dir

    (Path | None, default: None ) –

    If set to a directory, will try to save all the data as files in it. The files will have the name like {object_name}_{calculation_model_name}_{file_name}.json.

    Instead of the converted value, the DataFrame or dictionary will contain the path to the file.

    If set to None, no files will be saved but the data will be converted from json and returned directly.

    By default None

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["dict", "DataFrame"]. By default "DataFrame"

Returns:

  • DataFrame

    In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, calculation_model_name, file_name], columns = [attribute, ...]

  • dict[str, dict[str, dict[str, Any]]]

    In case output_type is "dict", returns a dictionary in the format {object_name: {calculation_model_name: {file_name: {attribute: value, ...}, ...}, ...}, ...}

Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def get(
    self,
    object_names: list[str] | Literal["NULL"] | None = None,
    calcmodel_types: list[str] | None = None,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    model_as_regex: bool = False,
    save_dir: Path | None = None,
    skip_value: bool = False,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> DataFrame | dict[str, dict[str, dict[str, Any]]]:
    """Gets all calculation model instance file values with detailed information.

    The most useful keys/columns returned are:

    - file_id
    - value (if skip_value is False)
    - modified_date

    Parameters
    ----------
    object_names : list[str] | Literal["NULL"] | None, optional
        Names of the objects to filter the results. If None, no filter is applied.
        If "NULL", will filter the results where the object_name is NULL.
        By default None
    calcmodel_types : list[str] | None, optional
        Names of the calculation model types to filter the results. If None, no filter is applied.
        By default None
    calcmodels : list[str] | None, optional
        Names of the calculation model instances to filter the results. If None, no filter is applied.
        By default None
    file_names: list[str] | None, optional
        Names of the files to filter the results. If None, no filter is applied.
        By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    model_as_regex : bool, optional
        If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter.
        By default False
    skip_value : bool, optional
        If set to True, will skip the value column and return only other columns. This is useful for quick checks of the data without loading the full values.
    save_dir : Path | None, optional
        If set to a directory, will try to save all the data as files in it. The files will have the name like {object_name}_{calculation_model_name}_{file_name}.json.

        Instead of the converted value, the DataFrame or dictionary will contain the path to the file.

        If set to None, no files will be saved but the data will be converted from json and returned directly.

        By default None
    output_type : Literal["dict", "DataFrame"], optional
        Output type of the data. Can be one of ["dict", "DataFrame"]
        By default "DataFrame"

    Returns
    -------
    DataFrame
        In case output_type is "DataFrame", returns a DataFrame with the following format: index = MultiIndex[object_name, calculation_model_name, file_name], columns = [attribute, ...]
    dict[str, dict[str, dict[str, Any]]]
        In case output_type is "dict", returns a dictionary in the format {object_name: {calculation_model_name: {file_name: {attribute: value, ...}, ...}, ...}, ...}

    """
    # checking arguments and building the WHERE clause
    where = self._check_get_args(
        object_names=object_names,
        calcmodel_types=calcmodel_types,
        calcmodels=calcmodels,
        file_names=file_names,
        filter_type=filter_type,
        model_as_regex=model_as_regex,
    )

    query = [
        sql.SQL(
            "SELECT object_name, calculation_model_name, file_name, file_id{value_col}, modified_date FROM performance.v_calculation_model_files_data ",
        ).format(
            # json_data is potentially huge, so it is only selected when requested
            value_col=sql.SQL(", json_data AS value") if not skip_value else sql.SQL(""),
        ),
        where,
        sql.SQL(" ORDER BY object_name, calculation_model_name, file_name"),
    ]
    query = sql.Composed(query)

    with self._perfdb.conn.reconnect() as conn:
        df = conn.read_to_pandas(query, post_convert="pyarrow")
    # forcing object_name col to be string[pyarrow] as when it is returned only as NA it is returned as null[pyarrow]
    df["object_name"] = df["object_name"].astype("string[pyarrow]")

    if df.empty:
        return df.set_index(["object_name", "calculation_model_name", "file_name"]) if output_type == "DataFrame" else {}

    # creating file_path column (not used in all cases but needed for the convert_binary function)
    if save_dir:
        save_dir.mkdir(parents=True, exist_ok=True)
        df["file_path"] = df.apply(
            lambda row: save_dir / f"{row['object_name']}_{row['calculation_model_name']}_{row['file_name']}.json",
            axis=1,
        )
    else:
        df["file_path"] = None

    @validate_call
    def convert_model(model_value: Any) -> Any:
        """Deserialize a raw json value into a model object when possible."""
        from echo_calcmodels import PredictiveModel  # noqa: PLC0415
        from echo_datetimerange import DeserializableBaseModel  # noqa: PLC0415

        model = model_value

        # values may arrive as json strings; decode them first (best effort)
        if isinstance(model_value, str):
            with contextlib.suppress(json.JSONDecodeError):
                model_value = json.loads(model_value)
                model = model_value
        if isinstance(model_value, dict) and "model_class" in model_value:
            if "PredictiveModel" in model_value["model_class"]:
                # BUGFIX: this result used to be immediately overwritten by the generic
                # DeserializableBaseModel branch below; the two branches are now exclusive
                model = PredictiveModel.load(json=model_value)
            else:
                model = DeserializableBaseModel.from_json(loadable=model_value)

        return model

    if not skip_value:
        # converting the values
        if not save_dir:
            df["value"] = df.apply(
                lambda row: convert_model(row["value"]),
                axis=1,
            )
        else:
            # saving json data to file (context manager ensures the handle is closed)
            def _dump_to_file(row) -> None:
                with Path(row["file_path"]).open("w") as fh:
                    json.dump(row["value"], fh)

            df.apply(_dump_to_file, axis=1)

        # making value be the name of the file in case of saving
        if save_dir:
            df["value"] = df["file_path"]
        # dropping the file_path column
        df = df.drop(columns=["file_path"])

    df = df.set_index(["object_name", "calculation_model_name", "file_name"])

    if output_type == "DataFrame":
        return df

    # reshaping the flat records into the nested output dict
    result = df.to_dict(orient="index")
    final_result = {}
    for (object_name, calc_model, file_name), file_vals in result.items():
        # NA object names (files not bound to an object) are exposed as None keys
        corrected_object_name = None if object_name is NA else object_name
        final_result.setdefault(corrected_object_name, {}).setdefault(calc_model, {})[file_name] = file_vals

    return final_result

get_connected_objects(calcmodels=None, file_names=None, filter_type='and')

Gets all the objects connected to the calculation model files.

Parameters:

  • calcmodels

    (list[str] | None, default: None ) –

    Names of the calculation models whose files to get, by default None

  • file_names

    (list[str] | None, default: None ) –

    Names of the files to get, by default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

Returns:

  • dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]]

    Dictionary in the format {calculation_model_name: {file_name: {file_id: {"object_names": [object_name, ...], "object_ids": [object_id, ...]}, ...}, ...}, ...}

Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def get_connected_objects(
    self,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
) -> dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]]:
    """Gets all the objects connected to the calculation model files.

    Parameters
    ----------
    calcmodels : list[str] | None, optional
        Names of the calculation models whose files to get, by default None
    file_names : list[str] | None, optional
        Names of the files to get, by default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"

    Returns
    -------
    dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]]
        Dictionary in the format {calculation_model_name: {file_name: {file_id: {"object_names": [object_name, ...], "object_ids": [object_id, ...]}, ...}, ...}, ...}

    """
    # assembling the optional WHERE conditions first
    conditions = []
    if calcmodels:
        conditions.append(
            sql.SQL("calculation_model_name IN ({values})").format(
                values=sql.SQL(", ").join(sql.Literal(name) for name in calcmodels),
            ),
        )
    if file_names:
        conditions.append(
            sql.SQL("file_name IN ({values})").format(
                values=sql.SQL(", ").join(sql.Literal(name) for name in file_names),
            ),
        )

    # base query over the connections view, filters joined with AND/OR as requested
    parts = [
        sql.SQL(
            "SELECT calculation_model_name, file_name, file_id, object_names, object_ids FROM performance.v_calculation_model_files_data_object_connections",
        ),
    ]
    if conditions:
        parts.append(sql.SQL(" WHERE "))
        parts.append(sql.SQL(f" {filter_type.upper()} ").join(conditions))
    parts.append(sql.SQL(" ORDER BY calculation_model_name, file_name, file_id"))

    with self._perfdb.conn.reconnect() as conn:
        connections_df = conn.read_to_pandas(sql.Composed(parts), post_convert="pyarrow")
    connections_df = connections_df.set_index(["calculation_model_name", "file_name", "file_id"])

    if connections_df.empty:
        return {}

    # reshape the flat records into the nested {model: {file: {id: vals}}} structure
    nested: dict[str, dict[str, dict[int, dict[str, list[str] | list[int]]]]] = {}
    for (model_name, fname, fid), vals in connections_df.to_dict(orient="index").items():
        nested.setdefault(model_name, {}).setdefault(fname, {})[fid] = vals

    return nested

get_ids(object_names=None, calcmodel_types=None, calcmodels=None, file_names=None, filter_type='and', model_as_regex=False)

Gets all file ids for the given filters.

Parameters:

  • object_names

    (list[str] | Literal['NULL'] | None, default: None ) –

    Names of the objects to filter the results. If None, no filter is applied. If "NULL", will filter the results where the object_name is NULL. By default None

  • calcmodel_types

    (list[str] | None, default: None ) –

    Names of the calculation model types to filter the results. If None, no filter is applied. By default None

  • calcmodels

    (list[str] | None, default: None ) –

    Names of the calculation model instances to filter the results. If None, no filter is applied. By default None

  • file_names

    (list[str] | None, default: None ) –

    Names of the files to filter the results. If None, no filter is applied. By default None

  • filter_type

    (Literal['and', 'or'], default: 'and' ) –

    How to treat multiple filters. Can be one of ["and", "or"]. By default "and"

  • model_as_regex

    (bool, default: False ) –

    If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter. By default False

Returns:

  • dict[str, dict[str, dict[str, int]]]

    Dict in the format {object_name: {calculation_model_name: {file_name: file_id, ...}, ...}, ...}

Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def get_ids(
    self,
    object_names: list[str] | Literal["NULL"] | None = None,
    calcmodel_types: list[str] | None = None,
    calcmodels: list[str] | None = None,
    file_names: list[str] | None = None,
    filter_type: Literal["and", "or"] = "and",
    model_as_regex: bool = False,
) -> dict[str, dict[str, dict[str, int]]]:
    """Gets all file ids for the given filters.

    Parameters
    ----------
    object_names : list[str] | Literal["NULL"] | None, optional
        Names of the objects to filter the results. If None, no filter is applied.
        If "NULL", will filter the results where the object_name is NULL.
        By default None
    calcmodel_types : list[str] | None, optional
        Names of the calculation model types to filter the results. If None, no filter is applied.
        By default None
    calcmodels : list[str] | None, optional
        Names of the calculation model instances to filter the results. If None, no filter is applied.
        By default None
    file_names: list[str] | None, optional
        Names of the files to filter the results. If None, no filter is applied.
        By default None
    filter_type : Literal["and", "or"], optional
        How to treat multiple filters. Can be one of ["and", "or"].
        By default "and"
    model_as_regex : bool, optional
        If True, calcmodel_types and calcmodels filters will be treated as regex. Can only be used if a single value is passed to each filter.
        By default False

    Returns
    -------
    dict[str, dict[str, dict[str, int]]]
        Dict in the format {object_name: {calculation_model_name: {file_name: file_id, ...}, ...}, ...}

    """
    # validate the filters and build the WHERE clause in one go
    where_clause = self._check_get_args(
        object_names=object_names,
        calcmodel_types=calcmodel_types,
        calcmodels=calcmodels,
        file_names=file_names,
        filter_type=filter_type,
        model_as_regex=model_as_regex,
    )

    full_query = sql.Composed(
        [
            sql.SQL(
                "SELECT object_name, calculation_model_name, file_name, file_id FROM performance.v_calculation_model_files_data ",
            ),
            where_clause,
            sql.SQL(" ORDER BY object_name, calculation_model_name, file_name"),
        ],
    )

    with self._perfdb.conn.reconnect() as conn:
        ids_df = conn.read_to_pandas(full_query)
    ids_df = ids_df.set_index(["object_name", "calculation_model_name", "file_name"])

    if ids_df.empty:
        return {}

    # reshape the flat index -> file_id mapping into the nested output dict
    nested: dict[str, dict[str, dict[str, int]]] = {}
    for (obj_name, model_name, fname), fid in ids_df.to_dict()["file_id"].items():
        # NA object names (files not bound to an object) are exposed as None keys
        key = None if obj_name is NA else obj_name
        nested.setdefault(key, {}).setdefault(model_name, {})[fname] = fid

    return nested

insert(data, value_as_path=False, on_conflict='ignore')

Inserts one or multiple calculation model files into the database.

Parameters:

  • data

    (dict[str, dict[str, dict[str, Any | list[str]]]]) –

    Dict containing the data to be inserted. The format is {calc_model_name: {file_name: {"value": value, "objects": [obj1, obj2, ...]}, ...}, ...}. Objects are passed as a list as one file can be reused for multiple objects. If the list is empty, the file will be used for all objects.

  • value_as_path

    (bool, default: False ) –

    If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.

  • on_conflict

    (Literal['ignore', 'update'], default: 'ignore' ) –

    What to do in case of conflict. Can be one of ["ignore", "update"]. By default "ignore"

Source code in echo_postgres/calcmodel_instance_file_values.py
@validate_call
def insert(
    self,
    data: dict[str, dict[str, dict[str, Any | list[str]]]],
    value_as_path: bool = False,
    on_conflict: Literal["ignore", "update"] = "ignore",
) -> None:
    """Inserts one or multiple calculation model files into the database.

    Parameters
    ----------
    data : dict[str, dict[str, dict[str, Any | list[str]]]]
        Dict containing the data to be inserted. The format is {calc_model_name: {file_name: {"value": value, "objects": [obj1, obj2, ...]}, ...}, ...}.
        Objects are passed as a list as one file can be reused for multiple objects. If the list is empty, the file will be used for all objects.
    value_as_path : bool, optional
        If set to True, will treat the value as a file path and try to read the file and insert it as binary, by default False.
    on_conflict : Literal["ignore", "update"], optional
        What to do in case of conflict. Can be one of ["ignore", "update"].
        By default "ignore"

    """
    wanted_objs = []

    # validating the shape of every entry up front so nothing is half-inserted
    for calc_model, files in data.items():
        for file_name, file_data in files.items():
            if {"value", "objects"} != set(file_data.keys()):
                raise ValueError(f"file data for {calc_model}/{file_name} must contain 'value' and 'objects' keys")
            if not isinstance(file_data["objects"], list):
                raise TypeError(f"objects for {calc_model}/{file_name} must be a list, not {type(file_data['objects'])}")
            wanted_objs.extend(file_data["objects"])

    # getting ids of the objects
    wanted_objs = list(set(wanted_objs))
    obj_ids = self._perfdb.objects.instances.get_ids(object_names=wanted_objs)
    if len(obj_ids) != len(wanted_objs):
        # NOTE: missing objects are only warned about here; referencing one below
        # still raises a KeyError when building file_obj_ids
        missing_objs = set(wanted_objs) - set(obj_ids)
        logger.warning(f"Could not find the following objects: {missing_objs}")

    # getting file definitions
    all_files = list({file_name for files in data.values() for file_name in files})
    file_defs = self._perfdb.calcmodels.instances.files.definitions.get(calcmodels=list(data.keys()), file_names=all_files)

    # inserting the data
    for calc_model, files in data.items():
        for file_name, file_data in files.items():
            value = file_data["value"]

            if value is None:
                raise ValueError(f"Value for {calc_model}/{file_name} cannot be None")

            # reading from file
            if value_as_path:
                value = Path(value)
                # validating extension
                if value.suffix.lower() != ".json":
                    raise ValueError(f"Expected file to have extension '.json', got '{value.suffix}'")
                # BUGFIX: read_text + loads instead of json.load(value.open("r")),
                # which leaked an open file handle
                value = json.loads(value.read_text())

            file_def_id = file_defs[calc_model][file_name]["file_def_id"]
            file_obj_ids = [obj_ids[obj] for obj in file_data["objects"]]

            # checking if the file already exists for the given objects
            existing_files = self.get_ids(object_names=file_data["objects"] or "NULL", calcmodels=[calc_model], file_names=[file_name])

            # making sure all point to the same file id
            file_ids = [
                file_id for calc_models in existing_files.values() for files in calc_models.values() for file_id in files.values()
            ]
            if len(set(file_ids)) > 1:
                raise ValueError(
                    f"Trying to insert the same file '{file_name}' of calc model '{calc_model}' to multiple objects but there's already different files associated with them. Objects {file_data['objects']}",
                )

            file_id = file_ids[0] if file_ids else None

            # getting new file id if needed
            if file_id is None:
                # BUGFIX: schema-qualified table name to match every other query in this
                # class instead of relying on the session's search_path
                query = sql.SQL("SELECT nextval(pg_get_serial_sequence('performance.calculation_model_files_data', 'file_id'))")
                with self._perfdb.conn.reconnect() as conn:
                    file_id = conn.read_to_pandas(query).iloc[0, 0]

            # serializing anything that is not already a json string
            # BUGFIX: previously only dicts were serialized, so a list value (valid JSON)
            # crashed on the .replace() call below
            if not isinstance(value, str):
                value = json.dumps(value, ensure_ascii=False)
            # replacing NaN values with null (NaN is not valid JSON)
            value = value.replace(" NaN", "null")
            query = [
                sql.SQL(
                    "INSERT INTO performance.calculation_model_files_data (file_id, file_def_id, json_data)\n"
                    "VALUES ({file_id}, {file_def_id}, {json_data})\n"
                    "ON CONFLICT (file_id) DO ",
                ).format(
                    file_id=sql.Literal(file_id),
                    file_def_id=sql.Literal(file_def_id),
                    json_data=sql.Literal(value),
                ),
            ]
            if on_conflict == "ignore":
                query.append(sql.SQL("NOTHING"))
            else:
                query.append(sql.SQL("UPDATE SET json_data = EXCLUDED.json_data"))

            query = sql.Composed(query)

            with self._perfdb.conn.reconnect() as conn:
                conn.execute(query=query)

            # connecting this file to the objects
            if file_obj_ids:
                query = [
                    sql.SQL(
                        "INSERT INTO performance.calculation_model_files_data_object_connections (file_id, object_id) VALUES\n",
                    ),
                ]
                connect_query = [
                    sql.SQL("({file_id}, {obj_id})").format(
                        file_id=sql.Literal(file_id),
                        obj_id=sql.Literal(obj_id),
                    )
                    for obj_id in file_obj_ids
                ]
                connect_query = sql.SQL(", ").join(connect_query)
                query.append(connect_query)
                query.append(sql.SQL("ON CONFLICT DO NOTHING"))

                query = sql.Composed(query)

                with self._perfdb.conn.reconnect() as conn:
                    conn.execute(query)

            logger.debug(
                f"'{file_name}' of '{calc_model}' inserted successfully with file_id '{file_id}' and connected to objects {file_data['objects']}",
            )