"""
Solar Thermal Collector System Module
=====================================
Solar thermal collector modeling with flat-plate and vacuum tube technologies.
:author: Dipl.-Ing. (FH) Jonas Pfeiffer
.. note::
Based on Scenocalc 2.0 solar thermal model (https://www.scfw.de)
"""
import numpy as np
from math import pi, exp, log, sqrt
from typing import Dict, Tuple, List, Optional, Union, Any
from districtheatingsim.heat_generators.solar_radiation import calculate_solar_radiation
from districtheatingsim.heat_generators.base_heat_generator import BaseHeatGenerator, BaseStrategy
[docs]
class SolarThermal(BaseHeatGenerator):
"""
Solar thermal collector system with storage.
:param name: Unique identifier
:type name: str
:param bruttofläche_STA: Gross collector area [m²]
:type bruttofläche_STA: float
:param vs: Storage volume [m³]
:type vs: float
:param Typ: Collector type ("Flachkollektor" or "Vakuumröhrenkollektor")
:type Typ: str
:param kosten_speicher_spez: Storage costs [€/m³], defaults to 750
:type kosten_speicher_spez: float, optional
:param kosten_fk_spez: Flat-plate costs [€/m²], defaults to 430
:type kosten_fk_spez: float, optional
.. note::
Includes detailed solar radiation calculations and efficiency modeling.
"""
[docs]
def __init__(self, name: str, bruttofläche_STA: float, vs: float, Typ: str,
kosten_speicher_spez: float = 750, kosten_fk_spez: float = 430,
kosten_vrk_spez: float = 590, Tsmax: float = 90, Longitude: float = -14.4222,
STD_Longitude: float = -15, Latitude: float = 51.1676,
East_West_collector_azimuth_angle: float = 0, Collector_tilt_angle: float = 36,
Tm_rl: float = 60, Qsa: float = 0, Vorwärmung_K: float = 8,
DT_WT_Solar_K: float = 5, DT_WT_Netz_K: float = 5,
opt_volume_min: float = 0, opt_volume_max: float = 200,
opt_area_min: float = 0, opt_area_max: float = 2000, active: bool = True):
"""
Initialize solar thermal collector system with technical and economic parameters.
:param name: Unique identifier
:type name: str
:param bruttofläche_STA: Gross collector area [m²]
:type bruttofläche_STA: float
:param vs: Storage volume [m³]
:type vs: float
:param Typ: Collector type ("Flachkollektor" or "Vakuumröhrenkollektor")
:type Typ: str
:param kosten_speicher_spez: Storage costs [€/m³], defaults to 750
:type kosten_speicher_spez: float
:param kosten_fk_spez: Flat-plate costs [€/m²], defaults to 430
:type kosten_fk_spez: float
:param kosten_vrk_spez: Vacuum tube costs [€/m²], defaults to 590
:type kosten_vrk_spez: float
"""
super().__init__(name)
self.bruttofläche_STA = bruttofläche_STA
self.vs = vs
self.Typ = Typ
self.kosten_speicher_spez = kosten_speicher_spez
self.kosten_fk_spez = kosten_fk_spez
self.kosten_vrk_spez = kosten_vrk_spez
self.Tsmax = Tsmax
self.Longitude = Longitude
self.STD_Longitude = STD_Longitude
self.Latitude = Latitude
self.East_West_collector_azimuth_angle = East_West_collector_azimuth_angle
self.Collector_tilt_angle = Collector_tilt_angle
self.Tm_rl = Tm_rl
self.Qsa = Qsa
self.Vorwärmung_K = Vorwärmung_K
self.DT_WT_Solar_K = DT_WT_Solar_K
self.DT_WT_Netz_K = DT_WT_Netz_K
self.opt_volume_min = opt_volume_min
self.opt_volume_max = opt_volume_max
self.opt_area_min = opt_area_min
self.opt_area_max = opt_area_max
self.active = active
# Technology-specific cost structure
self.kosten_pro_typ = {
# Viessmann Flachkollektor Vitosol 200-FM, 2,56 m²: 697,9 € (brutto); 586,5 € (netto) -> 229 €/m²
# + 200 €/m² Installation/Zubehör
"Flachkollektor": self.kosten_fk_spez,
# Ritter Vakuumröhrenkollektor CPC XL1921 (4,99m²): 2299 € (brutto); 1932 € (Netto) -> 387 €/m²
# + 200 €/m² Installation/Zubehör
"Vakuumröhrenkollektor": self.kosten_vrk_spez
}
self.Kosten_STA_spez = self.kosten_pro_typ[self.Typ] # €/m²
self.Nutzungsdauer = 20 # Jahre
self.f_Inst, self.f_W_Insp, self.Bedienaufwand = 0.5, 1, 0
self.Anteil_Förderung_BEW = 0.4
self.Betriebskostenförderung_BEW = 10 # €/MWh 10 Jahre
self.co2_factor_solar = 0.0 # tCO2/MWh heat is 0
self.primärenergiefaktor = 0.0
self.strategy = SolarThermalStrategy(charge_on=0, charge_off=0)
self.init_calculation_constants()
self.init_operation(8760)
[docs]
def init_calculation_constants(self) -> None:
"""
Initialize technology-specific calculation constants for collector performance modeling.
.. note::
Sets efficiency parameters, heat loss coefficients, IAM data, and geometric factors based on collector type.
Flat-plate: η0=0.763, c1=1.969 W/(m²·K). Vacuum tube: η0=0.693, c1=0.583 W/(m²·K).
"""
# Environmental parameters
self.Albedo = 0.2 # Ground reflection coefficient
self.wcorr = 0.5 # Wind speed correction factor
if self.Typ == "Flachkollektor":
# Flat-plate collector parameters (Vitosol 200-F XL13)
# Gross area is reference area
self.Eta0b_neu = 0.763
self.Kthetadiff = 0.931
self.Koll_c1 = 1.969
self.Koll_c2 = 0.015
self.Koll_c3 = 0
self.KollCeff_A = 9.053
self.KollAG = 13.17
self.KollAAp = 12.35
self.Aperaturfläche = self.bruttofläche_STA * (self.KollAAp / self.KollAG)
self.Bezugsfläche = self.bruttofläche_STA
self.IAM_W = {0: 1, 10: 1, 20: 0.99, 30: 0.98, 40: 0.96, 50: 0.91, 60: 0.82, 70: 0.53, 80: 0.27, 90: 0.0}
self.IAM_N = {0: 1, 10: 1, 20: 0.99, 30: 0.98, 40: 0.96, 50: 0.91, 60: 0.82, 70: 0.53, 80: 0.27, 90: 0.0}
elif self.Typ == "Vakuumröhrenkollektor":
# Vacuum tube collector parameters
# Aperture area is reference area
self.Eta0hem = 0.688
self.a1 = 0.583
self.a2 = 0.003
self.KollCeff_A = 8.78
self.KollAG = 4.94
self.KollAAp = 4.5
self.Koll_c1 = self.a1
self.Koll_c2 = self.a2
self.Koll_c3 = 0
self.Eta0b_neu = 0.693
self.Kthetadiff = 0.951
self.Aperaturfläche = self.bruttofläche_STA * (self.KollAAp / self.KollAG)
self.Bezugsfläche = self.Aperaturfläche
self.IAM_W = {0: 1, 10: 1.02, 20: 1.03, 30: 1.03, 40: 1.03, 50: 0.96, 60: 1.07, 70: 1.19, 80: 0.595, 90: 0.0}
self.IAM_N = {0: 1, 10: 1, 20: 0.99, 30: 0.96, 40: 0.93, 50: 0.9, 60: 0.87, 70: 0.86, 80: 0.43, 90: 0.0}
# Storage system parameters
self.QSmax = 1.16 * self.vs * (self.Tsmax - self.Tm_rl)
[docs]
def init_operation(self, hours: int) -> None:
"""
Initialize operational arrays for annual simulation.
:param hours: Number of simulation hours (typically 8760)
:type hours: int
"""
self.betrieb_mask = np.array([False] * hours)
self.Wärmeleistung_kW = np.zeros(hours, dtype=float)
self.Speicherinhalt = np.zeros(hours, dtype=float)
self.Speicherfüllstand = np.zeros(hours, dtype=float)
self.Wärmemenge_MWh = 0
self.Anzahl_Starts = 0
self.Betriebsstunden = 0
self.Betriebsstunden_pro_Start = 0
self.calculated = False # Flag to indicate if calculation is complete
# Initialize detailed calculation arrays
self.Tm_a_L = np.zeros(hours, dtype=float)
self.Pkoll_a_L = np.zeros(hours, dtype=float)
self.Pkoll_b_L = np.zeros(hours, dtype=float)
self.T_koll_a_L = np.zeros(hours, dtype=float)
self.T_koll_b_L = np.zeros(hours, dtype=float)
self.Tgkoll_a_L = np.zeros(hours, dtype=float)
self.Tgkoll_L = np.zeros(hours, dtype=float)
self.Tm_koll_L = np.zeros(hours, dtype=float)
self.Tm_L = np.zeros(hours, dtype=float)
self.Kollektorfeldertrag_L = np.zeros(hours, dtype=float)
self.Zieltemperatur_Solaranlage_L = np.zeros(hours, dtype=float)
self.TRL_Solar_L = np.zeros(hours, dtype=float)
self.TS_unten_L = np.zeros(hours, dtype=float)
self.Verlustwärmestrom_Speicher_L = np.zeros(hours, dtype=float)
self.Stagnation_L = np.zeros(hours, dtype=float)
[docs]
def calculate_heat_generation_costs(self, economic_parameters: Dict) -> float:
"""
Calculate levelized heat generation costs with subsidy integration.
:param economic_parameters: Economic parameters (interest_rate, inflation_rate, subsidy_eligibility, etc.)
:type economic_parameters: Dict
:return: Heat generation cost [€/MWh]
:rtype: float
.. note::
Includes BEW program: 40% investment cost reduction and 10 €/MWh operational incentive for 10 years.
"""
self.Strompreis = economic_parameters['electricity_price']
self.Gaspreis = economic_parameters['gas_price']
self.Holzpreis = economic_parameters['wood_price']
self.q = economic_parameters['capital_interest_rate']
self.r = economic_parameters['inflation_rate']
self.T = economic_parameters['time_period']
self.BEW = economic_parameters['subsidy_eligibility']
self.stundensatz = economic_parameters['hourly_rate']
if self.Wärmemenge_MWh == 0:
return 0
# Calculate investment costs
self.Investitionskosten_Speicher = self.vs * self.kosten_speicher_spez
self.Investitionskosten_STA = self.bruttofläche_STA * self.Kosten_STA_spez
self.Investitionskosten = self.Investitionskosten_Speicher + self.Investitionskosten_STA
# Calculate standard annuity without subsidies
self.A_N = self.annuity(
initial_investment_cost=self.Investitionskosten,
asset_lifespan_years=self.Nutzungsdauer,
installation_factor=self.f_Inst,
maintenance_inspection_factor=self.f_W_Insp,
operational_effort_h=self.Bedienaufwand,
interest_rate_factor=self.q,
inflation_rate_factor=self.r,
consideration_time_period_years=self.T,
annual_energy_demand=0,
energy_cost_per_unit=0,
annual_revenue=0,
hourly_rate=self.stundensatz
)
self.WGK = self.A_N / self.Wärmemenge_MWh
# Calculate subsidized costs with BEW program
self.Eigenanteil = 1 - self.Anteil_Förderung_BEW
self.Investitionskosten_Gesamt_BEW = self.Investitionskosten * self.Eigenanteil
self.Annuität_BEW = self.annuity(
initial_investment_cost=self.Investitionskosten_Gesamt_BEW,
asset_lifespan_years=self.Nutzungsdauer,
installation_factor=self.f_Inst,
maintenance_inspection_factor=self.f_W_Insp,
operational_effort_h=self.Bedienaufwand,
interest_rate_factor=self.q,
inflation_rate_factor=self.r,
consideration_time_period_years=self.T,
annual_energy_demand=0,
energy_cost_per_unit=0,
annual_revenue=0,
hourly_rate=self.stundensatz
)
self.WGK_BEW = self.Annuität_BEW / self.Wärmemenge_MWh
self.WGK_BEW_BKF = self.WGK_BEW - self.Betriebskostenförderung_BEW
if self.BEW == "Nein":
return self.WGK
elif self.BEW == "Ja":
return self.WGK_BEW_BKF
[docs]
def calculate_environmental_impact(self) -> None:
"""
Calculate environmental impact metrics (zero CO2 emissions, zero primary energy factor).
.. note::
Solar thermal has zero direct emissions and no fossil fuel dependency.
"""
# Calculate CO2 emissions (zero for renewable solar energy)
self.co2_emissions = self.Wärmemenge_MWh * self.co2_factor_solar # tCO2
# Calculate specific CO2 emissions per unit heat generated
self.spec_co2_total = (self.co2_emissions / self.Wärmemenge_MWh
if self.Wärmemenge_MWh > 0 else 0) # tCO2/MWh_heat
# Calculate primary energy consumption (zero for solar energy)
self.primärenergie_Solarthermie = self.Wärmemenge_MWh * self.primärenergiefaktor
[docs]
def calculate_solar_thermal_with_storage(self, Last_L: np.ndarray, VLT_L: np.ndarray,
RLT_L: np.ndarray, TRY_data: Tuple,
time_steps: np.ndarray, duration: float) -> None:
"""
Hourly solar thermal simulation with storage integration.
:param Last_L: Heat demand profile [kW]
:type Last_L: numpy.ndarray
:param VLT_L: Supply temperature [°C]
:type VLT_L: numpy.ndarray
:param RLT_L: Return temperature [°C]
:type RLT_L: numpy.ndarray
:param TRY_data: Weather data (air_temp, wind_speed, direct_rad, global_rad)
:type TRY_data: Tuple
:param time_steps: Time step array
:type time_steps: numpy.ndarray
:param duration: Time step duration [hours]
:type duration: float
.. note::
Calculates solar radiation, collector efficiency, storage stratification, and heat generation.
"""
self.Lufttemperatur_L, self.Windgeschwindigkeit_L, self.Direktstrahlung_L, self.Globalstrahlung_L = TRY_data[0], TRY_data[1], TRY_data[2], TRY_data[3]
# Calculate solar radiation on collector surface
self.GT_H_Gk, self.K_beam_L, self.GbT_L, self.GdT_H_Dk_L = calculate_solar_radiation(
time_steps, self.Globalstrahlung_L, self.Direktstrahlung_L, self.Longitude,
self.STD_Longitude, self.Latitude, self.Albedo,
self.East_West_collector_azimuth_angle,
self.Collector_tilt_angle, self.IAM_W, self.IAM_N
)
# Hourly simulation loop
n_steps = len(time_steps)
for i in range(n_steps):
# Calculate effective solar radiation terms
Eta0b_neu_K_beam_GbT = self.Eta0b_neu * self.K_beam_L[i] * self.GbT_L[i]
Eta0b_neu_Kthetadiff_GdT_H_Dk = self.Eta0b_neu * self.Kthetadiff * self.GdT_H_Dk_L[i]
if i == 0:
# Initialize first time step
self.TS_unten_L[i] = RLT_L[i]
self.TRL_Solar_L[i] = RLT_L[i]
self.Zieltemperatur_Solaranlage_L[i] = self.TS_unten_L[i] + self.Vorwärmung_K + self.DT_WT_Solar_K + self.DT_WT_Netz_K
self.Tm_a_L[i] = (self.Zieltemperatur_Solaranlage_L[i] + self.TRL_Solar_L[i]) / 2
self.Pkoll_a_L[i] = 0
self.Tgkoll_a_L[i] = 9.3
self.T_koll_a_L[i] = self.Lufttemperatur_L[i] - (self.Lufttemperatur_L[i] - self.Tgkoll_a_L[i]) * exp(-self.Koll_c1 / self.KollCeff_A * 3.6) + (self.Pkoll_a_L[i] * 3600) / (
self.KollCeff_A * self.Bezugsfläche)
self.Pkoll_b_L[i] = 0
self.T_koll_b_L[i] = self.Lufttemperatur_L[i] - (self.Lufttemperatur_L[i] - 0) * exp(-self.Koll_c1 / self.KollCeff_A * 3.6) + (self.Pkoll_b_L[i] * 3600) / (
self.KollCeff_A * self.Bezugsfläche)
self.Tgkoll_L[i] = 9.3
self.Tm_koll_L[i] = (self.T_koll_a_L[i] + self.T_koll_b_L[i]) / 2
self.Kollektorfeldertrag_L[i] = 0
self.Wärmeleistung_kW[i] = min(self.Kollektorfeldertrag_L[i], Last_L[i])
self.Verlustwärmestrom_Speicher_L[i] = 0
self.Speicherinhalt[i] = self.Qsa * 1000
self.Speicherfüllstand[i] = self.Speicherinhalt[i] / self.QSmax
self.Stagnation_L[i] = 0
else:
# Calculate storage temperature stratification
if self.Speicherfüllstand[i - 1] >= 0.8:
self.TS_unten_L[i] = RLT_L[i] + self.DT_WT_Netz_K + (2/3 * (VLT_L[i] - RLT_L[i]) / 0.2 * self.Speicherfüllstand[i - 1]) + (1 / 3 * (VLT_L[i] - RLT_L[i])) - (2/3 * (VLT_L[i] - RLT_L[i]) / 0.2 * self.Speicherfüllstand[i - 1])
else:
self.TS_unten_L[i] = RLT_L[i] + self.DT_WT_Netz_K + (1 / 3 * (VLT_L[i] - RLT_L[i]) / 0.8) * self.Speicherfüllstand[i - 1]
# Calculate solar circuit temperatures
self.Zieltemperatur_Solaranlage_L[i] = self.TS_unten_L[i] + self.Vorwärmung_K + self.DT_WT_Solar_K + self.DT_WT_Netz_K
self.TRL_Solar_L[i] = self.TS_unten_L[i] + self.DT_WT_Solar_K
self.Tm_a_L[i] = (self.Zieltemperatur_Solaranlage_L[i] + self.TRL_Solar_L[i]) / 2
# Calculate collector performance with thermal losses
c1a = self.Koll_c1 * (self.Tm_a_L[i] - self.Lufttemperatur_L[i])
c2a = self.Koll_c2 * (self.Tm_a_L[i] - self.Lufttemperatur_L[i]) ** 2
c3a = self.Koll_c3 * self.wcorr * self.Windgeschwindigkeit_L[i] * (self.Tm_a_L[i] - self.Lufttemperatur_L[i])
self.Pkoll_a_L[i] = max(0, (Eta0b_neu_K_beam_GbT + Eta0b_neu_Kthetadiff_GdT_H_Dk - c1a - c2a - c3a) * self.Bezugsfläche / 1000)
self.T_koll_a_L[i] = self.Lufttemperatur_L[i] - (self.Lufttemperatur_L[i] - self.Tgkoll_a_L[i - 1]) * exp(-self.Koll_c1 / self.KollCeff_A * 3.6) + (self.Pkoll_a_L[i] * 3600) / (
self.KollCeff_A * self.Bezugsfläche)
# Calculate alternative collector state
c1b = self.Koll_c1 * (self.T_koll_b_L[i - 1] - self.Lufttemperatur_L[i])
c2b = self.Koll_c2 * (self.T_koll_b_L[i - 1] - self.Lufttemperatur_L[i]) ** 2
c3b = self.Koll_c3 * self.wcorr * self.Windgeschwindigkeit_L[i] * (self.T_koll_b_L[i - 1] - self.Lufttemperatur_L[i])
self.Pkoll_b_L[i] = max(0, (Eta0b_neu_K_beam_GbT + Eta0b_neu_Kthetadiff_GdT_H_Dk - c1b - c2b - c3b) * self.Bezugsfläche / 1000)
self.T_koll_b_L[i] = self.Lufttemperatur_L[i] - (self.Lufttemperatur_L[i] - self.Tgkoll_a_L[i - 1]) * exp(-self.Koll_c1 / self.KollCeff_A * 3.6) + (self.Pkoll_b_L[i] * 3600) / (
self.KollCeff_A * self.Bezugsfläche)
# Temperature control and collector field output
self.Tgkoll_a_L[i] = min(self.Zieltemperatur_Solaranlage_L[i], self.T_koll_a_L[i])
self.Tm_koll_L[i] = (self.T_koll_a_L[i] + self.T_koll_b_L[i]) / 2
Tm_sys = (self.Zieltemperatur_Solaranlage_L[i] + self.TRL_Solar_L[i]) / 2
if self.Tm_koll_L[i] < Tm_sys and self.Tm_koll_L[i - 1] < Tm_sys:
self.Tm_L[i] = self.Tm_koll_L[i]
else:
self.Tm_L[i] = Tm_sys
# Final collector output calculation
c1 = self.Koll_c1 * (self.Tm_L[i] - self.Lufttemperatur_L[i])
c2 = self.Koll_c2 * (self.Tm_L[i] - self.Lufttemperatur_L[i]) ** 2
c3 = self.Koll_c3 * self.wcorr * self.Windgeschwindigkeit_L[i] * (self.Tm_L[i] - self.Lufttemperatur_L[i])
Pkoll = max(0, (Eta0b_neu_K_beam_GbT + Eta0b_neu_Kthetadiff_GdT_H_Dk - c1 - c2 - c3) * self.Bezugsfläche / 1000)
# Temperature rise calculation
T_koll = self.Lufttemperatur_L[i] - (self.Lufttemperatur_L[i] - self.Tgkoll_L[i - 1]) * exp(-self.Koll_c1 / self.KollCeff_A * 3.6) + (Pkoll * 3600) / (
self.KollCeff_A * self.Bezugsfläche)
self.Tgkoll_L[i] = min(self.Zieltemperatur_Solaranlage_L[i], T_koll)
# Collector field yield calculation
if T_koll > self.Tgkoll_L[i - 1]:
Pkoll_temp_corr = (T_koll-self.Tgkoll_L[i])/(T_koll-self.Tgkoll_L[i - 1]) * Pkoll if self.Tgkoll_L[i] >= self.Zieltemperatur_Solaranlage_L[i] else 0
self.Kollektorfeldertrag_L[i] = max(0, min(Pkoll, Pkoll_temp_corr)) if self.Stagnation_L[i - 1] <= 0 else 0
else:
self.Kollektorfeldertrag_L[i] = 0
# Heat output and storage balance
self.Wärmeleistung_kW[i] = min(self.Kollektorfeldertrag_L[i] + self.Speicherinhalt[i - 1], Last_L[i]) if self.Kollektorfeldertrag_L[i] + self.Speicherinhalt[i - 1] > 0 else 0
# Storage energy balance
Stagnationsverluste = max(0, self.Speicherinhalt[i - 1] - self.Verlustwärmestrom_Speicher_L[i - 1] + self.Kollektorfeldertrag_L[i] - self.Wärmeleistung_kW[i] - self.QSmax)
PSin = self.Kollektorfeldertrag_L[i] - Stagnationsverluste
if self.Speicherinhalt[i - 1] - self.Verlustwärmestrom_Speicher_L[i - 1] + PSin - self.Wärmeleistung_kW[i] > self.QSmax:
self.Speicherinhalt[i] = self.QSmax
else:
self.Speicherinhalt[i] = self.Speicherinhalt[i - 1] - self.Verlustwärmestrom_Speicher_L[i - 1] + PSin - self.Wärmeleistung_kW[i]
# Storage temperature and heat loss calculation
self.Speicherfüllstand[i] = self.Speicherinhalt[i] / self.QSmax
TS_oben = self.Zieltemperatur_Solaranlage_L[i] - self.DT_WT_Solar_K
if self.Speicherinhalt[i] <= 0:
berechnete_temperatur = TS_oben
else:
temperaturverhältnis = (TS_oben - self.Tm_rl) / (self.Tsmax - self.Tm_rl)
if self.Speicherfüllstand[i] < temperaturverhältnis:
berechnete_temperatur = VLT_L[i] + self.DT_WT_Netz_K
else:
berechnete_temperatur = self.Tsmax
gewichtete_untere_temperatur = (1 - self.Speicherfüllstand[i]) * self.TS_unten_L[i]
Tms = self.Speicherfüllstand[i] * berechnete_temperatur + gewichtete_untere_temperatur
self.Verlustwärmestrom_Speicher_L[i] = 0.75 * (self.vs * 1000) ** 0.5 * 0.16 * (Tms - self.Lufttemperatur_L[i]) / 1000
# Stagnation detection
self.Stagnation_L[i] = 1 if np.datetime_as_string(time_steps[i], unit='D') == np.datetime_as_string(time_steps[i - 1], unit='D') and self.Kollektorfeldertrag_L[i] > Last_L[i] and self.Speicherinhalt[i] >= self.QSmax else 0
# Calculate total annual heat generation
self.Wärmemenge_MWh = np.sum(self.Wärmeleistung_kW) * duration / 1000 # kWh -> MWh
[docs]
def generate(self, t: int, **kwargs) -> Tuple[float, float]:
"""
Generate instantaneous heat output with detailed collector and storage modeling.
:param t: Current time step index
:type t: int
:param kwargs: Simulation parameters (remaining_load, storage temps, TRY_data, time_steps, etc.)
:return: (heat_output[kW], electrical_output[kW]=0)
:rtype: Tuple[float, float]
.. note::
Implements dual collector A/B approach, temperature stratification, and stagnation prevention.
"""
# Extract simulation parameters from kwargs
remaining_load = kwargs.get('remaining_load', 0.0)
upper_storage_temperature = kwargs.get('upper_storage_temperature', 70.0)
lower_storage_temperature = kwargs.get('lower_storage_temperature', 40.0)
current_storage_state = kwargs.get('current_storage_state', 0.0)
available_energy = kwargs.get('available_energy', 0.0)
max_energy = kwargs.get('max_energy', 1000.0)
Q_loss = kwargs.get('Q_loss', 0.0)
TRY_data = kwargs.get('TRY_data', (20.0, 2.0, 200.0, 400.0))
time_steps = kwargs.get('time_steps', np.array([]))
duration = kwargs.get('duration', 1.0)
# Initialize weather data and solar radiation calculations at t=0
if t == 0:
# Extract weather data components
self.Lufttemperatur_L, self.Windgeschwindigkeit_L, self.Direktstrahlung_L, self.Globalstrahlung_L = TRY_data[0], TRY_data[1], TRY_data[2], TRY_data[3]
# Calculate solar radiation on collector surface
self.GT_H_Gk, self.K_beam_L, self.GbT_L, self.GdT_H_Dk_L = calculate_solar_radiation(
time_steps, self.Globalstrahlung_L, self.Direktstrahlung_L, self.Longitude,
self.STD_Longitude, self.Latitude, self.Albedo,
self.East_West_collector_azimuth_angle,
self.Collector_tilt_angle, self.IAM_W, self.IAM_N
)
# Perform heat generation calculation only if system is active
if self.active:
# Calculate optical efficiency components for current time step
Eta0b_neu_K_beam_GbT = self.Eta0b_neu * self.K_beam_L[t] * self.GbT_L[t]
Eta0b_neu_Kthetadiff_GdT_H_Dk = self.Eta0b_neu * self.Kthetadiff * self.GdT_H_Dk_L[t]
# Update storage parameters from external storage system
self.QSmax = max_energy # Maximum storage energy capacity [kWh]
self.Speicherinhalt[t] = available_energy # Current storage energy content [kWh]
self.Speicherfüllstand[t] = current_storage_state # Storage filling level [-]
# Update storage heat losses and initialize stagnation state
self.Verlustwärmestrom_Speicher_L[t] = Q_loss # Storage heat losses [kW]
self.Stagnation_L[t] = 0 # Initialize stagnation indicator
# Initial conditions for first time step
if t == 0:
# Initialize storage and solar circuit temperatures
self.TS_unten_L[t] = lower_storage_temperature
self.TRL_Solar_L[t] = lower_storage_temperature
# Calculate target temperature for solar circuit
self.Zieltemperatur_Solaranlage_L[t] = (self.TS_unten_L[t] + self.Vorwärmung_K +
self.DT_WT_Solar_K + self.DT_WT_Netz_K)
# Calculate mean collector A temperature
self.Tm_a_L[t] = (self.Zieltemperatur_Solaranlage_L[t] + self.TRL_Solar_L[t]) / 2
# Initialize collector power outputs and temperatures
self.Pkoll_a_L[t] = 0
self.Tgkoll_a_L[t] = 9.3 # Initial glycol temperature [°C]
# Calculate initial collector A temperature using thermal time constant
self.T_koll_a_L[t] = (self.Lufttemperatur_L[t] -
(self.Lufttemperatur_L[t] - self.Tgkoll_a_L[t]) *
exp(-self.Koll_c1 / self.KollCeff_A * 3.6) +
(self.Pkoll_a_L[t] * 3600) / (self.KollCeff_A * self.Bezugsfläche))
# Initialize collector B with zero initial conditions
self.Pkoll_b_L[t] = 0
self.T_koll_b_L[t] = (self.Lufttemperatur_L[t] -
(self.Lufttemperatur_L[t] - 0) *
exp(-self.Koll_c1 / self.KollCeff_A * 3.6) +
(self.Pkoll_b_L[t] * 3600) / (self.KollCeff_A * self.Bezugsfläche))
# Initialize system glycol temperature
self.Tgkoll_L[t] = 9.3
# No collector field output at initialization
self.Kollektorfeldertrag_L[t] = 0
self.Wärmeleistung_kW[t] = min(self.Kollektorfeldertrag_L[t], remaining_load)
else:
# Calculate lower storage tank temperature based on stratification model
if self.Speicherfüllstand[t - 1] >= 0.8:
# High filling level: complex stratification calculation
self.TS_unten_L[t] = (lower_storage_temperature + self.DT_WT_Netz_K +
(2/3 * (upper_storage_temperature - lower_storage_temperature) / 0.2 *
self.Speicherfüllstand[t - 1]) +
(1/3 * (upper_storage_temperature - lower_storage_temperature)) -
(2/3 * (upper_storage_temperature - lower_storage_temperature) / 0.2 *
self.Speicherfüllstand[t - 1]))
else:
# Low filling level: simplified linear stratification
self.TS_unten_L[t] = (lower_storage_temperature + self.DT_WT_Netz_K +
(1/3 * (upper_storage_temperature - lower_storage_temperature) / 0.8) *
self.Speicherfüllstand[t - 1])
# Calculate solar circuit target and return temperatures
self.Zieltemperatur_Solaranlage_L[t] = (self.TS_unten_L[t] + self.Vorwärmung_K +
self.DT_WT_Solar_K + self.DT_WT_Netz_K)
self.TRL_Solar_L[t] = self.TS_unten_L[t] + self.DT_WT_Solar_K
# Calculate mean collector A temperature for performance calculation
self.Tm_a_L[t] = (self.Zieltemperatur_Solaranlage_L[t] + self.TRL_Solar_L[t]) / 2
# Calculate collector A thermal loss coefficients
c1a = self.Koll_c1 * (self.Tm_a_L[t] - self.Lufttemperatur_L[t])
c2a = self.Koll_c2 * (self.Tm_a_L[t] - self.Lufttemperatur_L[t]) ** 2
c3a = self.Koll_c3 * self.wcorr * self.Windgeschwindigkeit_L[t] * (self.Tm_a_L[t] - self.Lufttemperatur_L[t])
# Calculate collector A power output with thermal losses
self.Pkoll_a_L[t] = max(0, (Eta0b_neu_K_beam_GbT + Eta0b_neu_Kthetadiff_GdT_H_Dk -
c1a - c2a - c3a) * self.Bezugsfläche / 1000)
# Calculate collector A temperature evolution
self.T_koll_a_L[t] = (self.Lufttemperatur_L[t] -
(self.Lufttemperatur_L[t] - self.Tgkoll_a_L[t - 1]) *
exp(-self.Koll_c1 / self.KollCeff_A * 3.6) +
(self.Pkoll_a_L[t] * 3600) / (self.KollCeff_A * self.Bezugsfläche))
# Calculate collector B thermal loss coefficients using previous temperature
c1b = self.Koll_c1 * (self.T_koll_b_L[t - 1] - self.Lufttemperatur_L[t])
c2b = self.Koll_c2 * (self.T_koll_b_L[t - 1] - self.Lufttemperatur_L[t]) ** 2
c3b = self.Koll_c3 * self.wcorr * self.Windgeschwindigkeit_L[t] * (self.T_koll_b_L[t - 1] - self.Lufttemperatur_L[t])
# Calculate collector B power output
self.Pkoll_b_L[t] = max(0, (Eta0b_neu_K_beam_GbT + Eta0b_neu_Kthetadiff_GdT_H_Dk -
c1b - c2b - c3b) * self.Bezugsfläche / 1000)
# Calculate collector B temperature evolution
self.T_koll_b_L[t] = (self.Lufttemperatur_L[t] -
(self.Lufttemperatur_L[t] - self.Tgkoll_a_L[t - 1]) *
exp(-self.Koll_c1 / self.KollCeff_A * 3.6) +
(self.Pkoll_b_L[t] * 3600) / (self.KollCeff_A * self.Bezugsfläche))
# Calculate glycol temperature with target temperature limitation
self.Tgkoll_a_L[t] = min(self.Zieltemperatur_Solaranlage_L[t], self.T_koll_a_L[t])
# Calculate average collector temperature and system mean temperature
self.Tm_koll_L[t] = (self.T_koll_a_L[t] + self.T_koll_b_L[t]) / 2
Tm_sys = (self.Zieltemperatur_Solaranlage_L[t] + self.TRL_Solar_L[t]) / 2
# Select appropriate mean temperature for system calculation
if self.Tm_koll_L[t] < Tm_sys and self.Tm_koll_L[t - 1] < Tm_sys:
self.Tm_L[t] = self.Tm_koll_L[t]
else:
self.Tm_L[t] = Tm_sys
# Calculate system collector power output using system mean temperature
c1 = self.Koll_c1 * (self.Tm_L[t] - self.Lufttemperatur_L[t])
c2 = self.Koll_c2 * (self.Tm_L[t] - self.Lufttemperatur_L[t]) ** 2
c3 = self.Koll_c3 * self.wcorr * self.Windgeschwindigkeit_L[t] * (self.Tm_L[t] - self.Lufttemperatur_L[t])
Pkoll = max(0, (Eta0b_neu_K_beam_GbT + Eta0b_neu_Kthetadiff_GdT_H_Dk -
c1 - c2 - c3) * self.Bezugsfläche / 1000)
# Calculate collector temperature with thermal inertia
T_koll = (self.Lufttemperatur_L[t] -
(self.Lufttemperatur_L[t] - self.Tgkoll_L[t - 1]) *
exp(-self.Koll_c1 / self.KollCeff_A * 3.6) +
(Pkoll * 3600) / (self.KollCeff_A * self.Bezugsfläche))
# Apply target temperature limitation to glycol temperature
self.Tgkoll_L[t] = min(self.Zieltemperatur_Solaranlage_L[t], T_koll)
# Calculate collector field output with temperature corrections
if T_koll > self.Tgkoll_L[t - 1]:
# Apply temperature correction factor if target temperature is reached
Pkoll_temp_corr = ((T_koll - self.Tgkoll_L[t]) / (T_koll - self.Tgkoll_L[t - 1]) * Pkoll
if self.Tgkoll_L[t] >= self.Zieltemperatur_Solaranlage_L[t] else 0)
# Determine collector field output considering stagnation protection
self.Kollektorfeldertrag_L[t] = (max(0, min(Pkoll, Pkoll_temp_corr))
if self.Stagnation_L[t - 1] <= 0 else 0)
else:
# No output if collector temperature is not increasing
self.Kollektorfeldertrag_L[t] = 0
# Set system heat output to collector field output
self.Wärmeleistung_kW[t] = self.Kollektorfeldertrag_L[t]
# Check for stagnation conditions and activate protection
same_day = (np.datetime_as_string(time_steps[t], unit='D') ==
np.datetime_as_string(time_steps[t - 1], unit='D'))
excess_generation = self.Kollektorfeldertrag_L[t] > remaining_load
storage_full = self.Speicherinhalt[t] >= self.QSmax
self.Stagnation_L[t] = 1 if (same_day and excess_generation and storage_full) else 0
else:
# System inactive: no heat generation
self.Wärmeleistung_kW[t] = 0
# Update cumulative energy generation
self.Wärmemenge_MWh += self.Wärmeleistung_kW[t] * duration / 1000 # Convert kW·h to MWh
# Return heat output and zero electrical output (solar thermal only)
return self.Wärmeleistung_kW[t], 0
[docs]
def calculate(self, economic_parameters: Dict[str, Union[float, str]], duration: float,
load_profile: np.ndarray, **kwargs) -> Dict[str, Union[str, float, np.ndarray]]:
"""
Comprehensive system analysis including performance and economic evaluation.
:param economic_parameters: Economic parameters (interest_rate, inflation_rate, subsidies, etc.)
:type economic_parameters: Dict[str, Union[float, str]]
:param duration: Time step duration [hours]
:type duration: float
:param load_profile: Heat demand profile [kW]
:type load_profile: numpy.ndarray
:param kwargs: VLT_L, RLT_L, TRY_data, time_steps
:return: Results dict with Wärmemenge, Wärmeleistung_L, WGK, operational stats, environmental data
:rtype: Dict[str, Union[str, float, np.ndarray]]
.. note::
Performs thermal simulation, operational analysis, economic cost assessment, and environmental evaluation.
"""
# Extract additional calculation parameters
VLT_L = kwargs.get('VLT_L', np.full(len(load_profile), 70.0))
RLT_L = kwargs.get('RLT_L', np.full(len(load_profile), 40.0))
TRY_data = kwargs.get('TRY_data', (np.full(len(load_profile), 10.0),) * 4)
time_steps = kwargs.get('time_steps', np.arange(len(load_profile)))
# Perform thermal calculation if not already completed
if not self.calculated:
# Execute complete solar thermal system calculation
self.calculate_solar_thermal_with_storage(
load_profile,
VLT_L,
RLT_L,
TRY_data,
time_steps,
duration
)
# Mark calculation as completed
self.calculated = True
# Calculate operational statistics
betrieb_mask = self.Wärmeleistung_kW > 0
starts = np.diff(betrieb_mask.astype(int)) > 0
self.Anzahl_Starts = np.sum(starts)
self.Betriebsstunden = np.sum(betrieb_mask) * duration
self.Betriebsstunden_pro_Start = (self.Betriebsstunden / self.Anzahl_Starts
if self.Anzahl_Starts > 0 else 0)
# Calculate economic performance
self.WGK = self.calculate_heat_generation_costs(economic_parameters)
# Calculate environmental impact
self.calculate_environmental_impact()
# Compile comprehensive results dictionary
results = {
'tech_name': self.name,
'Wärmemenge': self.Wärmemenge_MWh,
'Wärmeleistung_L': self.Wärmeleistung_kW,
'WGK': self.WGK,
'Anzahl_Starts': self.Anzahl_Starts,
'Betriebsstunden': self.Betriebsstunden,
'Betriebsstunden_pro_Start': self.Betriebsstunden_pro_Start,
'spec_co2_total': self.spec_co2_total,
'primärenergie': self.primärenergie_Solarthermie,
'Speicherladung_L': self.Speicherinhalt,
'Speicherfüllstand_L': self.Speicherfüllstand,
'color': "red" # Red color for solar thermal visualization
}
return results
[docs]
def set_parameters(self, variables: List[float], variables_order: List[str], idx: int) -> None:
"""
Set optimization parameters from optimizer variable list.
:param variables: Optimization variable values
:type variables: List[float]
:param variables_order: Variable names corresponding to values
:type variables_order: List[str]
:param idx: Technology index for unique parameter identification
:type idx: int
.. note::
Updates bruttofläche_STA and vs from variables with names f"bruttofläche_STA_{idx}" and f"vs_{idx}".
"""
try:
# Extract collector area from optimization variables
area_var_name = f"bruttofläche_STA_{idx}"
if area_var_name in variables_order:
area_index = variables_order.index(area_var_name)
self.bruttofläche_STA = variables[area_index]
# Extract storage volume from optimization variables
volume_var_name = f"vs_{idx}"
if volume_var_name in variables_order:
volume_index = variables_order.index(volume_var_name)
self.vs = variables[volume_index]
# Recalculate dependent parameters after optimization update
self.init_calculation_constants()
except (ValueError, IndexError) as e:
print(f"Error setting optimization parameters for {self.name}: {e}")
print(f"Available variables: {variables_order}")
print(f"Expected variables: bruttofläche_STA_{idx}, vs_{idx}")
[docs]
def add_optimization_parameters(self, idx: int) -> Tuple[List[float], List[str], List[Tuple[float, float]]]:
"""
Define optimization parameters for solar thermal system sizing.
:param idx: Technology index for unique parameter names
:type idx: int
:return: (initial_values, variables_order, bounds)
:rtype: Tuple[List[float], List[str], List[Tuple[float, float]]]
.. note::
Returns [bruttofläche_STA, vs] with bounds from opt_area_min/max and opt_volume_min/max.
"""
# Define initial values from current system configuration
initial_values = [self.bruttofläche_STA, self.vs]
# Create unique variable names using technology index
variables_order = [f"bruttofläche_STA_{idx}", f"vs_{idx}"]
# Define optimization bounds from system constraints
bounds = [
(self.opt_area_min, self.opt_area_max), # Collector area bounds [m²]
(self.opt_volume_min, self.opt_volume_max) # Storage volume bounds [m³]
]
return initial_values, variables_order, bounds
[docs]
def get_display_text(self) -> str:
"""
Generate formatted display text for GUI representation.
:return: Formatted text with key system parameters
:rtype: str
"""
return (f"{self.name}: Bruttokollektorfläche: {self.bruttofläche_STA:.1f} m², "
f"Volumen Solarspeicher: {self.vs:.1f} m³, Kollektortyp: {self.Typ}, "
f"spez. Kosten Speicher: {self.kosten_speicher_spez:.1f} €/m³, "
f"spez. Kosten Flachkollektor: {self.kosten_fk_spez:.1f} €/m², "
f"spez. Kosten Röhrenkollektor: {self.kosten_vrk_spez:.1f} €/m²")
[docs]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SolarThermal':
"""
Create SolarThermal object from dictionary representation.
:param data: Dictionary containing SolarThermal attributes
:type data: Dict[str, Any]
:return: Restored SolarThermal object
:rtype: SolarThermal
.. note::
Ensures IAM dictionaries are properly restored when loading saved objects.
"""
# Create object using base class method
obj = super().from_dict(data)
# Ensure IAM dictionaries exist and are valid
# If they are missing or empty, reinitialize calculation constants
if not hasattr(obj, 'IAM_W') or not obj.IAM_W or not isinstance(obj.IAM_W, dict):
obj.init_calculation_constants()
elif not hasattr(obj, 'IAM_N') or not obj.IAM_N or not isinstance(obj.IAM_N, dict):
obj.init_calculation_constants()
return obj
[docs]
class SolarThermalStrategy(BaseStrategy):
"""
Control strategy for solar thermal systems.
:param charge_on: Temperature threshold for activation [°C]
:type charge_on: int
:param charge_off: Temperature threshold for deactivation [°C], optional
:type charge_off: int, optional
.. note::
Operates continuously when solar irradiation available.
"""
[docs]
def __init__(self, charge_on: int, charge_off: Optional[int] = None):
"""
Initialize solar thermal control strategy.
:param charge_on: Activation temperature threshold [°C]
:type charge_on: int
:param charge_off: Deactivation temperature threshold [°C], optional
:type charge_off: int, optional
"""
super().__init__(charge_on, charge_off)
[docs]
def decide_operation(self, current_state: float, upper_storage_temp: float,
lower_storage_temp: float, remaining_demand: float) -> bool:
"""
Decide solar thermal operation (always True for renewable energy priority).
:param current_state: Current system state (not used)
:type current_state: float
:param upper_storage_temp: Upper storage temperature [°C] (not used)
:type upper_storage_temp: float
:param lower_storage_temp: Lower storage temperature [°C] (not used)
:type lower_storage_temp: float
:param remaining_demand: Remaining heat demand [kW] (not used)
:type remaining_demand: float
:return: Always True (renewable energy priority strategy)
:rtype: bool
"""
# Solar thermal operates continuously when solar irradiation is available
# Operation is weather-dependent and prioritizes renewable energy harvesting
return True