Source code for districtheatingsim.heat_generators.energy_system

"""
Energy System Module
=====================

Multi-technology energy system modeling with optimization and visualization.

:author: Dipl.-Ing. (FH) Jonas Pfeiffer
"""

import logging
logging.basicConfig(level=logging.INFO)

import numpy as np
import matplotlib.pyplot as plt
import copy
import json
import pandas as pd

from typing import Dict, Tuple, List, Optional, Union

from scipy.optimize import minimize as scipy_minimize

from districtheatingsim.heat_generators import *
from districtheatingsim.gui.EnergySystemTab._10_utilities import CustomJSONEncoder
import itertools
from matplotlib import cm

[docs] class EnergySystem: """ Multi-technology district heating system integration. :param time_steps: Simulation time steps :type time_steps: numpy.ndarray :param load_profile: Hourly thermal load [kW] :type load_profile: numpy.ndarray :param VLT_L: Supply temperature profile [°C] :type VLT_L: numpy.ndarray :param RLT_L: Return temperature profile [°C] :type RLT_L: numpy.ndarray :param TRY_data: Test Reference Year meteorological data :type TRY_data: object :param COP_data: Heat pump performance data :type COP_data: object :param economic_parameters: Economic parameters dict :type economic_parameters: dict .. note:: Supports multi-technology dispatch, storage integration and optimization. """
[docs] def __init__(self, time_steps: np.ndarray, load_profile: np.ndarray, VLT_L: np.ndarray, RLT_L: np.ndarray, TRY_data: object, COP_data: object, economic_parameters: dict): """ Initialize energy system. :param time_steps: Time steps for simulation :type time_steps: numpy.ndarray :param load_profile: Hourly thermal load [kW] :type load_profile: numpy.ndarray :param VLT_L: Supply temperature [°C] :type VLT_L: numpy.ndarray :param RLT_L: Return temperature [°C] :type RLT_L: numpy.ndarray :param TRY_data: Test Reference Year data :type TRY_data: object :param COP_data: Heat pump performance data :type COP_data: object :param economic_parameters: Economic parameters :type economic_parameters: dict """ self.time_steps = time_steps self.load_profile = load_profile self.VLT_L = VLT_L self.RLT_L = RLT_L self.TRY_data = TRY_data self.COP_data = COP_data self.economic_parameters = economic_parameters self.technologies = [] # List to store generator objects self.storage = None self.results = {} self.duration = (np.diff(self.time_steps[:2]) / np.timedelta64(1, 'h'))[0]
[docs] def add_technology(self, tech) -> None: """ Add a heat generation technology to the energy system. :param tech: Technology object to add. :type tech: BaseHeatGenerator .. note:: Technologies operate based on priority and control strategies. """ self.technologies.append(tech)
[docs] def add_storage(self, storage) -> None: """ Add a seasonal thermal energy storage system to the energy system. :param storage: Seasonal Thermal Energy Storage object. :type storage: STES .. note:: Enables temporal decoupling of generation and demand for improved efficiency. """ self.storage = storage
[docs] def initialize_results(self) -> None: """ Initialize the results dictionary for energy system calculations. .. note:: Sets up structure for energy balance, economic, environmental, and performance results. """ if not hasattr(self, 'results') or not isinstance(self.results, dict): self.results = {} self.results.update({ 'time_steps': self.time_steps, 'Last_L': self.load_profile, 'VLT_L': self.VLT_L, 'RLT_L': self.RLT_L, 'Jahreswärmebedarf': (np.sum(self.load_profile) / 1000) * self.duration, 'Restlast_L': self.load_profile.copy(), 'Restwärmebedarf': (np.sum(self.load_profile) / 1000) * self.duration, 'WGK_Gesamt': 0, 'Strombedarf': 0, 'Strommenge': 0, 'el_Leistungsbedarf_L': np.zeros_like(self.load_profile), 'el_Leistung_L': np.zeros_like(self.load_profile), 'el_Leistung_ges_L': np.zeros_like(self.load_profile), 'specific_emissions_Gesamt': 0, 'primärenergiefaktor_Gesamt': 0, }) # Ensure lists are initialized or cleared for key in ['Wärmeleistung_L', 'colors', 'Wärmemengen', 'Anteile', 'WGK', 'specific_emissions_L', 'primärenergie_L', 'techs']: if key not in self.results: self.results[key] = [] else: self.results[key].clear()
[docs] def set_optimization_variables(self, variables: list, variables_order: list) -> None: """ Set optimization variables for technologies. :param variables: Optimization variable values :type variables: list :param variables_order: Variable names :type variables_order: list """ for tech in self.technologies: if len(variables) > 0: idx = tech.name.split("_")[-1] tech.set_parameters(variables, variables_order, idx)
[docs] def aggregate_results(self, tech_results: dict) -> None: """ Aggregate technology results into system-level metrics. :param tech_results: Technology results dictionary :type tech_results: dict """ self.results['techs'].append(tech_results.get('tech_name', 'unknown')) self.results['Wärmeleistung_L'].append(tech_results.get('Wärmeleistung_L', np.zeros_like(self.load_profile))) self.results['Wärmemengen'].append(tech_results.get('Wärmemenge', 0)) self.results['Anteile'].append(tech_results.get('Wärmemenge', 0) / self.results['Jahreswärmebedarf']) self.results['WGK'].append(tech_results.get('WGK', 0)) self.results['specific_emissions_L'].append(tech_results.get('spec_co2_total', 0)) self.results['primärenergie_L'].append(tech_results.get('primärenergie', 0)) self.results['colors'].append(tech_results.get('color', 'gray')) if tech_results.get('Wärmemenge', 0) > 1e-6: self.results['Restlast_L'] -= tech_results.get('Wärmeleistung_L', np.zeros_like(self.load_profile)) self.results['Restwärmebedarf'] -= tech_results.get('Wärmemenge', 0) self.results['WGK_Gesamt'] += (tech_results['Wärmemenge'] * tech_results['WGK']) / self.results['Jahreswärmebedarf'] self.results['specific_emissions_Gesamt'] += (tech_results['Wärmemenge'] * tech_results['spec_co2_total']) / self.results['Jahreswärmebedarf'] self.results['primärenergiefaktor_Gesamt'] += tech_results['primärenergie'] / self.results['Jahreswärmebedarf'] if tech_results.get("Strommenge"): self.results['Strommenge'] += tech_results["Strommenge"] self.results['el_Leistung_L'] += tech_results["el_Leistung_L"] self.results['el_Leistung_ges_L'] += tech_results["el_Leistung_L"] if tech_results.get("Strombedarf"): self.results['Strombedarf'] += tech_results["Strombedarf"] self.results['el_Leistungsbedarf_L'] += tech_results["el_Leistung_L"] self.results['el_Leistung_ges_L'] -= tech_results["el_Leistung_L"] if "Wärmeleistung_Speicher_L" in tech_results.keys(): self.results['Restlast_L'] -= tech_results["Wärmeleistung_Speicher_L"] self.results['Wärmeleistung_L'].append(tech_results["Wärmeleistung_Speicher_L"]) self.results['techs'].append(f"{tech_results['tech_name']}_Speicher") self.results['Anteile'].append(0) self.results['colors'].append("gray")
[docs] def calculate_mix(self, variables: list = [], variables_order: list = []) -> dict: """ Calculate energy generation mix with technology dispatch and storage. :param variables: Optimization variables, defaults to [] :type variables: list :param variables_order: Variable order, defaults to [] :type variables_order: list :return: System results dictionary :rtype: dict """ self.initialize_results() # Initialize optimization variables self.set_optimization_variables(variables, variables_order) for tech in self.technologies: if isinstance(tech, STES): self.storage = tech # remove the storage from the technologies list self.technologies.remove(tech) else: # Initialize each technology tech.init_operation(8760) if self.storage: self.storage_state = np.zeros(len(self.time_steps)) # Initialize results for each time step time_steps = len(self.time_steps) for t in range(time_steps): Q_in_total = 0 # Total heat input T_Q_in_flow = self.VLT_L[t] # Supply temperature T_Q_out_return = self.RLT_L[t] # Return temperature Q_out_total = self.load_profile[t] # Heat demand remaining_load = Q_out_total # Get storage state and temperatures upper_storage_temperature, lower_storage_temperature = self.storage.current_storage_temperatures(t-1) if t > 0 else (0, 0) # Get storage state and available energy current_storage_state, available_energy, max_energy = self.storage.current_storage_state(t-1, T_Q_out_return, T_Q_in_flow) if t > 0 else (0, 0, 0) # Calculate storage losses Q_loss = self.storage.Q_loss[t - 1] if t > 0 else 0 # Control generators based on priority for i, tech in enumerate(self.technologies): tech.active = tech.strategy.decide_operation(tech.active, upper_storage_temperature, lower_storage_temperature, remaining_load) if tech.active: # Create kwargs dictionary with technology-specific data kwargs = { "remaining_load": remaining_load, "VLT_L": self.VLT_L[t], "COP_data": self.COP_data, "time_steps": self.time_steps, "duration": self.duration, "TRY_data": self.TRY_data, "RLT_L": self.RLT_L[t], "upper_storage_temperature": upper_storage_temperature, "lower_storage_temperature": lower_storage_temperature, "current_storage_state": current_storage_state, "available_energy": available_energy, "max_energy": max_energy, "Q_loss": Q_loss, } Q_in, _ = tech.generate(t, **kwargs) remaining_load -= Q_in Q_in_total += Q_in tech.calculated = True # Mark technology as calculated # Update storage self.storage.simulate_stratified_temperature_mass_flows(t, Q_in_total, Q_out_total, T_Q_in_flow, T_Q_out_return) # Calculate storage results self.storage.calculate_efficiency(self.load_profile) # self.storage.calculate_operational_costs(0.10) # TODO: needs to be implemented in STES class self.results['storage_class'] = self.storage for tech in self.technologies: # Perform technology-specific calculation tech_results = tech.calculate(economic_parameters=self.economic_parameters, duration=self.duration, load_profile=self.results["Restlast_L"], VLT_L=self.VLT_L, RLT_L=self.RLT_L, TRY_data=self.TRY_data, COP_data=self.COP_data, time_steps=self.time_steps) if tech_results['Wärmemenge'] > 1e-6: self.aggregate_results(tech_results) else: # Add technology as inactive with zero contribution self.aggregate_results({'tech_name': tech.name}) # Calculate unmet demand after processing all technologies if np.any(self.results['Restlast_L'] > 1e-6): unmet_demand = np.sum(self.results['Restlast_L']) / 1000 * self.duration self.results['Wärmeleistung_L'].append(self.results['Restlast_L']) self.results['Wärmemengen'].append(unmet_demand) self.results['techs'].append("Ungedeckter Bedarf") self.results['Anteile'].append(unmet_demand / self.results['Jahreswärmebedarf']) self.results['colors'].append("black") self.getInitialPlotData() return self.results
[docs] def optimize_mix(self, weights: dict, num_restarts: int = 5): """ Optimize energy mix for multi-objective performance. :param weights: Optimization weights (WGK_Gesamt, specific_emissions_Gesamt, primärenergiefaktor_Gesamt) :type weights: dict :param num_restarts: Number of random restarts, defaults to 5 :type num_restarts: int :return: Optimized energy system :rtype: EnergySystem """ optimizer = EnergySystemOptimizer(self, weights, num_restarts) self.optimized_energy_system = optimizer.optimize() return self.optimized_energy_system
[docs] def getInitialPlotData(self) -> tuple: """ Extract and prepare data for visualization. :return: (extracted_data, initial_vars) :rtype: tuple """ # Extract data self.extracted_data = {} for tech_class in self.technologies: for var_name in dir(tech_class): var_value = getattr(tech_class, var_name) if isinstance(var_value, (list, np.ndarray)) and len(var_value) == len(self.time_steps): unique_var_name = f"{tech_class.name}_{var_name}" self.extracted_data[unique_var_name] = var_value # Add storage data if self.storage: Q_net_storage_flow = self.storage.Q_net_storage_flow # Separate storage charging (negative values) and discharging (positive values) Q_net_positive = np.maximum(Q_net_storage_flow, 0) # Storage discharging Q_net_negative = np.minimum(Q_net_storage_flow, 0) # Storage charging # Add storage data to extracted data structure self.extracted_data['Speicherbeladung_kW'] = Q_net_negative self.extracted_data['Speicherentladung_kW'] = Q_net_positive if "Ungedeckter Bedarf" in self.results['techs']: # Find index of "Ungedeckter Bedarf" in technology list if isinstance(self.results['techs'], list): unmet_demand_index = self.results['techs'].index("Ungedeckter Bedarf") elif isinstance(self.results['techs'], np.ndarray): unmet_demand_index = np.where(self.results['techs'] == "Ungedeckter Bedarf")[0][0] else: # Skip unmet demand if data type is unknown unmet_demand_index = None # Add unmet demand to extracted data structure if index was found if unmet_demand_index is not None: self.extracted_data['Ungedeckter_Bedarf_kW'] = self.results["Wärmeleistung_L"][unmet_demand_index] # Initial selection self.initial_vars = [var_name for var_name in self.extracted_data.keys() if "_Wärmeleistung" in var_name] self.initial_vars.append("Last_L") if self.storage: self.initial_vars.append("Speicherbeladung_kW") self.initial_vars.append("Speicherentladung_kW") return self.extracted_data, self.initial_vars
[docs] def plot_stack_plot(self, figure=None, selected_vars=None, second_y_axis=False) -> None: """ Create stack plot visualization of energy system operation. :param figure: Figure object, defaults to None :type figure: matplotlib.figure.Figure, optional :param selected_vars: Selected variables, defaults to None :type selected_vars: list, optional :param second_y_axis: Use second y-axis, defaults to False :type second_y_axis: bool, optional """ if figure is None: figure = plt.figure() if selected_vars is None: selected_vars = self.initial_vars # X-Achse: Jahresstunden als int n_steps = len(self.time_steps) x = np.arange(n_steps) import matplotlib.gridspec as gridspec figure.clear() # Breitere Legenden-Spalten für lange Namen gs = gridspec.GridSpec(1, 3, width_ratios=[0.22, 0.56, 0.22], figure=figure) ax_legend_left = figure.add_subplot(gs[0, 0]) ax_main = figure.add_subplot(gs[0, 1]) ax_legend_right = figure.add_subplot(gs[0, 2]) ax_legend_left.axis('off') ax_legend_right.axis('off') ax_main.set_prop_cycle(color=cm.tab10.colors) # Stackplot- und Linienplot-Logik auf ax_main stackplot_vars = [] if "Speicherbeladung_kW" in selected_vars: stackplot_vars.append("Speicherbeladung_kW") if "Speicherentladung_kW" in selected_vars: stackplot_vars.append("Speicherentladung_kW") stackplot_vars += [var for var in selected_vars if var not in stackplot_vars and "_Wärmeleistung" in var] if "Ungedeckter_Bedarf_kW" in selected_vars: stackplot_vars.append("Ungedeckter_Bedarf_kW") if "Speicherentladung_kW" in stackplot_vars: stackplot_vars.remove("Speicherentladung_kW") stackplot_vars.append("Speicherentladung_kW") line_vars = [var for var in selected_vars if var not in stackplot_vars and var != "Last_L"] stackplot_data = [] stackplot_labels = [] for var in stackplot_vars: if var == "Speicherbeladung_kW" and var in self.extracted_data: ax_main.fill_between( x, 0, self.extracted_data[var], label=var, step="mid", color="gray", alpha=1.0, ) elif var in self.extracted_data: stackplot_data.append(self.extracted_data[var]) stackplot_labels.append(var) if stackplot_data: ax_main.stackplot( x, stackplot_data, labels=stackplot_labels, step="mid", edgecolor='none' ) ax2 = ax_main.twinx() if second_y_axis else None lines_ax1 = [] labels_ax1 = [] lines_ax2 = [] labels_ax2 = [] import itertools color_cycle = itertools.cycle(cm.Dark2.colors) for var_name in line_vars: if var_name in self.extracted_data: if ax2: line, = ax2.plot( x, self.extracted_data[var_name], label=var_name, color=next(color_cycle) ) lines_ax2.append(line) labels_ax2.append(var_name) else: line, = ax_main.plot(x, self.extracted_data[var_name], label=var_name) lines_ax1.append(line) labels_ax1.append(var_name) if "Last_L" in selected_vars: line, = ax_main.plot(x, self.results["Last_L"], color='blue', label='Last', linewidth=0.25) lines_ax1.append(line) labels_ax1.append('Last') # Achsenbeschriftung und Grid ax_main.set_title("Jahresganglinie", fontsize=16) ax_main.set_xlabel("Jahresstunden", fontsize=14) ax_main.set_ylabel("Wärmeleistung [kW]", fontsize=14) ax_main.grid() if ax2: ax2.set_ylabel('Temperatur (°C)', fontsize=14) ax2.tick_params(axis='y', labelsize=14) step = 1000 ax_main.set_xticks(np.arange(0, n_steps+step, step)) ax_main.set_xticklabels([str(i) for i in np.arange(0, n_steps+step, step)]) # Legenden in eigenen Achsen def get_ncol(n): return 1 if n <= 18 else 2 if lines_ax1 or stackplot_labels: ncol_left = get_ncol(len(lines_ax1) + len(stackplot_labels)) ax_legend_left.legend( ax_main.get_legend_handles_labels()[0], ax_main.get_legend_handles_labels()[1], loc='best', fontsize=12, frameon=False, ncol=ncol_left ) if lines_ax2: ncol_right = get_ncol(len(lines_ax2)) ax_legend_right.legend(lines_ax2, labels_ax2, loc='best', fontsize=12, frameon=False, ncol=ncol_right) # Weniger Rand, damit die Daten direkt an den Achsen anliegen figure.subplots_adjust(left=0.08, right=0.92, wspace=0.18) # X-Achse: min/max exakt an Daten ax_main.set_xlim(x[0], x[-1]) # Y-Achse: min/max exakt an Daten y_data_ax1 = [] for arr in stackplot_data: y_data_ax1.append(np.asarray(arr)) for var_name in line_vars: if var_name in self.extracted_data: y_data_ax1.append(np.asarray(self.extracted_data[var_name])) if "Last_L" in selected_vars: y_data_ax1.append(np.asarray(self.results["Last_L"])) if y_data_ax1: y_min = min(arr.min() for arr in y_data_ax1) y_max = max(arr.max() for arr in y_data_ax1) ax_main.set_ylim(y_min, y_max) if ax2: y_data_ax2 = [] for var_name in line_vars: if var_name in self.extracted_data: y_data_ax2.append(np.asarray(self.extracted_data[var_name])) if y_data_ax2: y2_min = min(arr.min() for arr in y_data_ax2) y2_max = max(arr.max() for arr in y_data_ax2) ax2.set_ylim(y2_min, y2_max)
[docs] def plot_pie_chart(self, figure=None) -> None: """ Create pie chart visualization of technology contributions. :param figure: Figure object, defaults to None :type figure: matplotlib.figure.Figure, optional """ if figure is None: figure = plt.figure() # clear the figure if it already exists if figure.axes: for ax in figure.axes: ax.clear() ax = figure.add_subplot(111) labels = self.results['techs'] Anteile = self.results['Anteile'] colors = self.results['colors'] # Create the pie chart without percentage labels on the chart wedges, _ = ax.pie( Anteile, labels=None, colors=colors, startangle=90 ) ax.set_title("Anteile Wärmeerzeugung") ax.axis("equal") # Ensure the pie chart is circular # Prepare legend labels with percentages percent_labels = [f"{label}: {100 * anteil:.1f}%" for label, anteil in zip(labels, Anteile)] ax.legend(wedges, percent_labels, loc="center left")
[docs] def copy(self): """ Create deep copy of EnergySystem instance. :return: Deep copy of energy system :rtype: EnergySystem """ # Create a new EnergySystem instance with copied basic attributes copied_system = EnergySystem( time_steps=self.time_steps.copy(), load_profile=self.load_profile.copy(), VLT_L=self.VLT_L.copy(), RLT_L=self.RLT_L.copy(), TRY_data=copy.deepcopy(self.TRY_data), COP_data=copy.deepcopy(self.COP_data), economic_parameters=copy.deepcopy(self.economic_parameters) ) # Deep-copy the technologies copied_system.technologies = [copy.deepcopy(tech) for tech in self.technologies] # Deep-copy the storage, if it exists if self.storage: copied_system.storage = copy.deepcopy(self.storage) # Deep-copy the results dictionary copied_system.results = copy.deepcopy(self.results) # Copy any additional attributes that may have been added dynamically for attr_name, attr_value in self.__dict__.items(): if attr_name not in copied_system.__dict__: copied_system.__dict__[attr_name] = copy.deepcopy(attr_value) return copied_system
[docs] def to_dict(self) -> dict: """ Convert EnergySystem to dictionary for serialization and storage. Returns ------- dict Dictionary representation of the complete energy system. """ return { 'time_steps': self.time_steps.astype(str).tolist(), # Convert datetime64 to string 'load_profile': self.load_profile.tolist(), 'VLT_L': self.VLT_L.tolist(), 'RLT_L': self.RLT_L.tolist(), 'TRY_data': [data.tolist() for data in self.TRY_data], 'COP_data': self.COP_data.tolist(), 'economic_parameters': self.economic_parameters, 'technologies': [tech.to_dict() for tech in self.technologies], 'storage': self.storage.to_dict() if self.storage else None, 'results': { key: (value.to_dict(orient='split') if isinstance(value, pd.DataFrame) else value) for key, value in self.results.items() }, }
[docs] @classmethod def from_dict(cls, data: dict): """ Recreate EnergySystem instance from dictionary representation. Parameters ---------- data : dict Dictionary representation of the EnergySystem. Returns ------- EnergySystem Fully initialized EnergySystem object. """ # Restore basic attributes time_steps = np.array(data['time_steps'], dtype='datetime64') load_profile = np.array(data['load_profile']) VLT_L = np.array(data['VLT_L']) RLT_L = np.array(data['RLT_L']) TRY_data = [np.array(item) for item in data['TRY_data']] COP_data = np.array(data['COP_data']) economic_parameters = data['economic_parameters'] # Create the EnergySystem object obj = cls( time_steps=time_steps, load_profile=load_profile, VLT_L=VLT_L, RLT_L=RLT_L, TRY_data=TRY_data, COP_data=COP_data, economic_parameters=economic_parameters ) # Restore technologies obj.technologies = [] for tech_data in data.get('technologies', []): for prefix, tech_class in TECH_CLASS_REGISTRY.items(): if tech_data['name'].startswith(prefix): obj.technologies.append(tech_class.from_dict(tech_data)) break # Restore storage if data.get('storage'): obj.storage = STES.from_dict(data['storage']) # Restore results (if available) obj.results = {} if 'results' in data: for key, value in data['results'].items(): if isinstance(value, dict) and 'columns' in value and 'data' in value: obj.results[key] = pd.DataFrame(**value) elif isinstance(value, list): if all(isinstance(v, list) for v in value): obj.results[key] = [np.array(v) for v in value] else: obj.results[key] = np.array(value) else: obj.results[key] = value return obj
[docs] def save_to_csv(self, file_path: str) -> None: """ Save energy system results to CSV file. :param file_path: Path for CSV output :type file_path: str """ if not self.results: raise ValueError("No results available to save.") # Initialize the DataFrame with the timestamps df = pd.DataFrame({'time_steps': self.results['time_steps']}) # Add the load data df['Last_L'] = self.results['Last_L'] # Add the heat generation data for each technology for tech_results, techs in zip(self.results['Wärmeleistung_L'], self.results['techs']): df[techs] = tech_results # Add the electrical power data df['el_Leistungsbedarf_L'] = self.results['el_Leistungsbedarf_L'] df['el_Leistung_L'] = self.results['el_Leistung_L'] df['el_Leistung_ges_L'] = self.results['el_Leistung_ges_L'] # Save the DataFrame as a CSV file df.to_csv(file_path, index=False, sep=";", encoding='utf-8-sig')
[docs] def save_to_json(self, file_path: str) -> None: """ Save complete EnergySystem object to JSON file for persistence. Parameters ---------- file_path : str Path for JSON file output. """ with open(file_path, 'w') as json_file: json.dump(self.to_dict(), json_file, indent=4, cls=CustomJSONEncoder)
[docs] @classmethod def load_from_json(cls, file_path: str): """ Load complete EnergySystem object from JSON file. Parameters ---------- file_path : str Path to JSON file for loading. Returns ------- EnergySystem Loaded EnergySystem object with complete configuration. """ try: with open(file_path, 'r') as json_file: data_loaded = json.load(json_file) return cls.from_dict(data_loaded) except Exception as e: raise ValueError(f"Error loading JSON file: {e}")
[docs] class EnergySystemOptimizer: """ Multi-objective optimizer for energy system configuration. :param initial_energy_system: Initial system configuration :type initial_energy_system: EnergySystem :param weights: Optimization weights dict with 'WGK_Gesamt', 'specific_emissions_Gesamt', 'primärenergiefaktor_Gesamt' :type weights: dict :param num_restarts: Number of random restart runs, defaults to 5 :type num_restarts: int, optional .. note:: Uses SLSQP with random restarts for multi-objective optimization. """
[docs] def __init__(self, initial_energy_system: 'EnergySystem', weights: Dict[str, float], num_restarts: int = 5): """ Initialize multi-objective optimizer. :param initial_energy_system: Initial system configuration :type initial_energy_system: EnergySystem :param weights: Optimization weights :type weights: dict :param num_restarts: Number of random restarts, defaults to 5 :type num_restarts: int :raises ValueError: If required weights missing or negative """ self.initial_energy_system = initial_energy_system self.weights = weights self.num_restarts = num_restarts # Validate optimization weights required_weights = ['WGK_Gesamt', 'specific_emissions_Gesamt', 'primärenergiefaktor_Gesamt'] for weight_key in required_weights: if weight_key not in weights: raise ValueError(f"Required weight '{weight_key}' missing from weights dictionary") if weights[weight_key] < 0: raise ValueError(f"Weight '{weight_key}' must be non-negative")
[docs] def optimize(self) -> 'EnergySystem': """ Perform multi-objective optimization with random restarts. :return: Optimized energy system :rtype: EnergySystem :raises ValueError: If no optimization parameters available :raises RuntimeError: If optimization fails in all restarts """ best_solution = None best_objective_value = float('inf') # Validate that technologies have optimization parameters has_optimization_params = False for tech in self.initial_energy_system.technologies: idx = tech.name.split("_")[-1] if "_" in tech.name else "0" tech_values, tech_variables, tech_bounds = tech.add_optimization_parameters(idx) if tech_values and tech_variables and tech_bounds: has_optimization_params = True break if not has_optimization_params: raise ValueError("No optimization parameters available. Energy system optimization requires " "technologies with configurable parameters (e.g., capacity, storage volume).") for restart in range(self.num_restarts): print(f"Starting optimization run {restart + 1}/{self.num_restarts}") # Create fresh copy for this optimization run self.energy_system_copy = self.initial_energy_system.copy() # Extract optimization parameters from all technologies initial_values = [] bounds = [] variables_mapping = {} for tech in self.energy_system_copy.technologies: idx = tech.name.split("_")[-1] if "_" in tech.name else "0" tech_values, tech_variables, tech_bounds = tech.add_optimization_parameters(idx) # Skip technologies without optimization parameters if not tech_values or not tech_variables or not tech_bounds: continue initial_values.extend(tech_values) bounds.extend(tech_bounds) # Map variables to technology for solution interpretation for var in tech_variables: variables_mapping[var] = tech.name variables_order = list(variables_mapping.keys()) if not initial_values: print("No optimization parameters found. Skipping optimization.") return self.initial_energy_system # Generate random initial values within parameter bounds random_initial_values = [ np.random.uniform(low=bound[0], high=bound[1]) if bound[1] > bound[0] else bound[0] for bound in bounds ] print(f"Initial values for restart {restart + 1}: {random_initial_values}") def objective_function(variables): """ Multi-objective function for energy system optimization. Parameters ---------- variables : array_like Technology parameter values for evaluation. Returns ------- float Weighted sum of optimization criteria. """ try: # Create fresh copy for objective evaluation fresh_energy_system = self.energy_system_copy.copy() # Calculate energy system performance with given parameters results = fresh_energy_system.calculate_mix(variables, variables_order) # Calculate weighted multi-objective value weighted_sum = ( self.weights['WGK_Gesamt'] * results['WGK_Gesamt'] + self.weights['specific_emissions_Gesamt'] * results['specific_emissions_Gesamt'] + self.weights['primärenergiefaktor_Gesamt'] * results['primärenergiefaktor_Gesamt'] ) return weighted_sum except Exception as e: print(f"Error in objective function evaluation: {e}") return float('inf') # Return large value for infeasible solutions # Perform optimization with SLSQP algorithm try: result = scipy_minimize( objective_function, random_initial_values, method='SLSQP', bounds=bounds, options={'maxiter': 1000, 'ftol': 1e-6} ) # Check if current solution is better than previous best if result.success and result.fun < best_objective_value: best_objective_value = result.fun best_solution = result print(f"New best solution found in restart {restart + 1}: {result.fun:.4f}") except Exception as e: print(f"Optimization failed in restart {restart + 1}: {e}") continue # Apply best solution if found if best_solution is not None: print(f"Optimization completed successfully. Best objective value: {best_objective_value:.4f}") # Apply optimal parameters to energy system for tech in self.energy_system_copy.technologies: idx = tech.name.split("_")[-1] if "_" in tech.name else "0" tech.set_parameters(best_solution.x, variables_order, idx) # Store optimization results self.best_solution = best_solution self.best_objective_value = best_objective_value return self.energy_system_copy else: raise RuntimeError("Optimization failed to find valid solution in all restart attempts. " "Consider adjusting parameter bounds, weights, or increasing restart attempts.")
[docs] def get_optimization_summary(self) -> Dict[str, Union[float, int, bool]]: """ Generate optimization summary report. :return: Summary dict with success, best_objective_value, num_restarts, etc. :rtype: dict """ if hasattr(self, 'best_solution') and self.best_solution is not None: return { 'success': True, 'best_objective_value': self.best_objective_value, 'num_restarts': self.num_restarts, 'optimization_message': f"Optimization successful with {self.num_restarts} restarts", 'solution_variables': self.best_solution.x.tolist(), 'function_evaluations': getattr(self.best_solution, 'nfev', 0), 'iterations': getattr(self.best_solution, 'nit', 0) } else: return { 'success': False, 'best_objective_value': float('inf'), 'num_restarts': self.num_restarts, 'optimization_message': "Optimization failed to find valid solution", 'solution_variables': [], 'function_evaluations': 0, 'iterations': 0 }