Source code for districtheatingsim.net_simulation_pandapipes.utilities

"""
Pandapipes network utility functions for district heating simulation, optimization,
and analysis including heat pump calculations, controller creation, and diameter optimization.

:author: Dipl.-Ing. (FH) Jonas Pfeiffer
"""

import time
import logging
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import LineString
from scipy.interpolate import RegularGridInterpolator

import pandapipes as pp
from pandapipes.control.run_control import run_control

from pandapower.timeseries import DFData
from pandapower.control.controller.const_control import ConstControl

from districtheatingsim.utilities.utilities import get_resource_path
from districtheatingsim.net_simulation_pandapipes.controllers import MinimumSupplyTemperatureController, BadPointPressureLiftController

from districtheatingsim.net_generation.network_geojson_schema import NetworkGeoJSONSchema

from typing import Optional, List, Tuple, Dict, Any, Union

# Initialize logging
logging.basicConfig(level=logging.INFO)

[docs] def validate_minimum_pressure_difference(net, target_dp_min_bar: float = 1.0, verbose: bool = True) -> Tuple[bool, List[Dict[str, Any]]]: """ Validate that all heat consumers meet minimum pressure difference requirements. :param net: Network with current pipeflow results :type net: pandapipes.pandapipesNet :param target_dp_min_bar: Minimum required pressure difference [bar] :type target_dp_min_bar: float :param verbose: Print detailed warnings for violations :type verbose: bool :return: (all_ok, violations) - True if all consumers meet requirements, list of violations :rtype: Tuple[bool, List[Dict[str, Any]]] .. note:: Design validation only - does NOT modify pump parameters like BadPointPressureLiftController. """ violations = [] if verbose: print(f"\n{'='*80}") print(f"VALIDATING MINIMUM PRESSURE DIFFERENCE") print(f"Target: dp_min >= {target_dp_min_bar} bar") print(f"{'='*80}\n") # Check each active heat consumer for idx in net.heat_consumer.index: qext = net.heat_consumer.at[idx, 'qext_w'] # Only check active consumers if qext == 0: continue p_from = net.res_heat_consumer.at[idx, 'p_from_bar'] p_to = net.res_heat_consumer.at[idx, 'p_to_bar'] dp = p_from - p_to consumer_name = net.heat_consumer.at[idx, 'name'] if 'name' in net.heat_consumer.columns else f"Consumer {idx}" if dp < target_dp_min_bar: violation = { 'index': idx, 'name': consumer_name, 'dp_bar': dp, 'deficit_bar': target_dp_min_bar - dp, 'p_from_bar': p_from, 'p_to_bar': p_to, 'qext_w': qext } violations.append(violation) if verbose: print(f"⚠ WARNING: {consumer_name}") print(f" Pressure difference: {dp:.3f} bar < {target_dp_min_bar} bar") print(f" Deficit: {target_dp_min_bar - dp:.3f} bar") print(f" p_from: {p_from:.3f} bar, p_to: {p_to:.3f} bar") print(f" Heat demand: {qext/1000:.1f} kW\n") else: if verbose: print(f"✓ OK: {consumer_name} - dp={dp:.3f} bar >= {target_dp_min_bar} bar") all_ok = len(violations) == 0 if verbose: print(f"\n{'='*80}") if all_ok: print(f"✓ VALIDATION PASSED - All consumers meet minimum pressure requirements") else: print(f"✗ VALIDATION FAILED - {len(violations)} consumers below minimum pressure") print(f"\nRecommendations:") print(f" 1. Increase pump pressure (p_flow_bar or plift_bar)") print(f" 2. Use larger pipe diameters") print(f" 3. Reduce pipe lengths if possible") print(f" 4. Check for flow restrictions") print(f"{'='*80}\n") return all_ok, violations
[docs] def COP_WP(VLT_L: Union[float, np.ndarray], QT: Union[float, np.ndarray], values: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]: """ Calculate heat pump Coefficient of Performance (COP) based on supply and source temperatures. :param VLT_L: Supply temperature(s) for heat pump [°C] :type VLT_L: Union[float, np.ndarray] :param QT: Source temperature(s) for heat pump [°C] :type QT: Union[float, np.ndarray] :param values: Heat pump performance data matrix (default loads from CSV) :type values: Optional[np.ndarray] :return: (COP values [-], adjusted supply temperatures [°C]) :rtype: Tuple[np.ndarray, np.ndarray] :raises ValueError: If QT array length doesn't match VLT_L length :raises FileNotFoundError: If default COP data file not found .. note:: Technical constraints: max temp lift 75°C (VLT_L ≤ QT + 75°C), min supply temp 35°C. """ # Load default COP data if not provided if values is None: values = np.genfromtxt(get_resource_path('data/COP/Kennlinien WP.csv'), delimiter=';') # Extract temperature grids and COP matrix row_header = values[0, 1:] # Supply temperatures [°C] col_header = values[1:, 0] # Source temperatures [°C] cop_matrix = values[1:, 1:] # COP values [-] # Create interpolation function f = RegularGridInterpolator((col_header, row_header), cop_matrix, method='linear') # Apply technical limits of heat pump operation VLT_L = np.minimum(VLT_L, 75 + QT) # Maximum temperature lift constraint VLT_L = np.maximum(VLT_L, 35) # Minimum supply temperature constraint # Handle scalar vs array input for source temperature if np.isscalar(QT): QT_array = np.full_like(VLT_L, QT) else: if len(QT) != len(VLT_L): raise ValueError("QT must either be a single number or an array with the same length as VLT_L.") QT_array = QT # Calculate COP using bilinear interpolation COP_L = f(np.column_stack((QT_array, VLT_L))) return COP_L, VLT_L
[docs] def create_controllers(net, qext_w: np.ndarray, supply_temperature_heat_generator: float, min_supply_temperature_heat_consumer: Optional[np.ndarray], return_temperature_heat_consumer: np.ndarray, secondary_producers: Optional[List[Dict[str, Any]]] = None): """ Create comprehensive control systems for district heating network operation. :param net: Pandapipes network object :type net: pandapipes.pandapipesNet :param qext_w: External heat demand values for each consumer [W] :type qext_w: np.ndarray :param supply_temperature_heat_generator: Supply temperature setpoint for main generator [°C] :type supply_temperature_heat_generator: float :param min_supply_temperature_heat_consumer: Minimum required supply temperatures [°C] (None if no constraints) :type min_supply_temperature_heat_consumer: Optional[np.ndarray] :param return_temperature_heat_consumer: Return temperature setpoints for consumers [°C] :type return_temperature_heat_consumer: np.ndarray :param secondary_producers: List of secondary producer objects with index, mass_flow, load_percentage :type secondary_producers: Optional[List[Dict[str, Any]]] :return: Network with all controllers added :rtype: pandapipes.pandapipesNet .. note:: Controllers include heat consumer demand, temperature regulation, secondary producers, and pressure management. """ # Create controllers for each heat consumer for i in range(len(net.heat_consumer)): # Heat demand controller placeholder_df_qext = pd.DataFrame({f'qext_w_{i}': [qext_w[i]]}) placeholder_data_source_qext = DFData(placeholder_df_qext) ConstControl(net, element='heat_consumer', variable='qext_w', element_index=i, data_source=placeholder_data_source_qext, profile_name=f'qext_w_{i}') # Return temperature controller placeholder_df_treturn = pd.DataFrame({f'treturn_k_{i}': [return_temperature_heat_consumer[i] + 273.15]}) placeholder_data_source_treturn = DFData(placeholder_df_treturn) ConstControl(net, element='heat_consumer', variable='treturn_k', element_index=i, data_source=placeholder_data_source_treturn, profile_name=f'treturn_k_{i}') # Minimum supply temperature controller (if required) if min_supply_temperature_heat_consumer is not None and np.any(np.array(min_supply_temperature_heat_consumer) != 0): print(f"Creating temperature controller for heat consumer {i} with min supply temperature {min_supply_temperature_heat_consumer[i]} °C") min_supply_temp_profile = pd.DataFrame({ f'min_supply_temperature': [min_supply_temperature_heat_consumer[i]] }) min_supply_temp_data_source = DFData(min_supply_temp_profile) T_controller = MinimumSupplyTemperatureController( net, heat_consumer_idx=i, min_supply_temperature=min_supply_temperature_heat_consumer[i] ) T_controller.data_source = min_supply_temp_data_source # Manual registration required (BasicCtrl doesn't auto-register) net.controller.loc[len(net.controller)] = [T_controller, True, -1, -1, False, False, None] # Main heat generator supply temperature controller placeholder_df_supply_temp = pd.DataFrame({'supply_temperature': [supply_temperature_heat_generator + 273.15]}) placeholder_data_source_supply_temp = DFData(placeholder_df_supply_temp) ConstControl(net, element='circ_pump_pressure', variable='t_flow_k', element_index=0, data_source=placeholder_data_source_supply_temp, profile_name='supply_temperature') # Secondary producer controllers if secondary_producers: for producer in secondary_producers: # KORRIGIERT: Verwende Attribut-Zugriff statt Dictionary-Zugriff mass_flow = producer.mass_flow if hasattr(producer, 'mass_flow') else 0 producer_index = producer.index if hasattr(producer, 'index') else 0 # Mass flow controller for circulation pump placeholder_df = pd.DataFrame({f'mdot_flow_kg_per_s_{producer_index}': [mass_flow]}) placeholder_data_source = DFData(placeholder_df) ConstControl(net, element='circ_pump_mass', variable='mdot_flow_kg_per_s', element_index=0, data_source=placeholder_data_source, profile_name=f'mdot_flow_kg_per_s_{producer_index}') # Flow control for producer placeholder_df_flow = pd.DataFrame({f'controlled_mdot_kg_per_s_{producer_index}': [mass_flow]}) placeholder_data_source_flow = DFData(placeholder_df_flow) ConstControl(net, element='flow_control', variable='controlled_mdot_kg_per_s', element_index=0, data_source=placeholder_data_source_flow, profile_name=f'controlled_mdot_kg_per_s_{producer_index}') # Supply temperature for secondary producers ConstControl(net, element='circ_pump_mass', variable='t_flow_k', element_index=0, data_source=placeholder_data_source_supply_temp, profile_name='supply_temperature') # System pressure management controller dp_controller = BadPointPressureLiftController(net) # Manual registration required (BasicCtrl doesn't auto-register) net.controller.loc[len(net.controller)] = [dp_controller, True, -1, -1, False, False, None] return net
[docs] def correct_flow_directions(net) -> pp.pandapipesNet: """ Correct hydraulic flow directions by analyzing velocities and swapping junction connections. :param net: Network with potentially incorrect flow directions :type net: pandapipes.pandapipesNet :return: Network with corrected flow directions :rtype: pp.pandapipesNet .. note:: Identifies pipes with negative velocities and swaps from_junction/to_junction for proper flow representation. """ # Initial pipeflow calculation to determine actual flow directions pp.pipeflow(net, mode="bidirectional", iter=100) # Identify and correct pipes with reverse flow corrections_made = 0 for pipe_idx in net.pipe.index: if net.res_pipe.v_mean_m_per_s[pipe_idx] < 0: # Swap junction connections for reverse flow pipes from_junction = net.pipe.at[pipe_idx, 'from_junction'] to_junction = net.pipe.at[pipe_idx, 'to_junction'] net.pipe.at[pipe_idx, 'from_junction'] = to_junction net.pipe.at[pipe_idx, 'to_junction'] = from_junction corrections_made += 1 # Recalculate with corrected flow directions pp.pipeflow(net, mode="bidirectional", iter=100) if corrections_made > 0: logging.info(f"Corrected flow directions for {corrections_made} pipes") return net
[docs] def optimize_diameter_parameters(net, element: str = "pipe", v_max: float = 2.0, dx: float = 0.001, safety_factor: float = 1.5) -> pp.pandapipesNet: """ Optimize network element diameters to meet maximum velocity constraints using continuous adjustment. :param net: Pandapipes network to optimize :type net: pandapipes.pandapipesNet :param element: Network element type (default 'pipe') :type element: str :param v_max: Maximum allowable velocity [m/s] :type v_max: float :param dx: Diameter adjustment step size [m] :type dx: float :param safety_factor: Safety factor applied to v_max :type safety_factor: float :return: Network with optimized diameters :rtype: pp.pandapipesNet .. note:: Iteratively adjusts diameters with step size dx. Effective v_max = v_max / safety_factor. """ # Apply safety factor to velocity limit effective_v_max = v_max / safety_factor # Initial flow calculation pp.pipeflow(net, mode="bidirectional", iter=100) run_control(net, mode="bidirectional", iter=100) element_df = getattr(net, element) res_df = getattr(net, f"res_{element}") iteration_count = 0 change_made = True while change_made: change_made = False iteration_count += 1 for idx in element_df.index: # Calculate velocity using volumetric flow and cross-sectional area current_velocity = res_df.vdot_m3_per_s[idx] / (np.pi * (element_df.at[idx, 'diameter_m'] / 2)**2) current_diameter = element_df.at[idx, 'diameter_m'] # Increase diameter if velocity exceeds limit if current_velocity > effective_v_max: element_df.at[idx, 'diameter_m'] += dx change_made = True # Attempt diameter reduction if below limit elif current_velocity < effective_v_max: element_df.at[idx, 'diameter_m'] -= dx # Validate reduction doesn't violate constraints pp.pipeflow(net, mode="bidirectional", iter=100) run_control(net, mode="bidirectional", iter=100) element_df = getattr(net, element) res_df = getattr(net, f"res_{element}") new_velocity = res_df.vdot_m3_per_s[idx] / (np.pi * (element_df.at[idx, 'diameter_m'] / 2)**2) if new_velocity > effective_v_max: # Revert if reduction violates constraint element_df.at[idx, 'diameter_m'] = current_diameter else: change_made = True # Recalculate only if changes were made if change_made: pp.pipeflow(net, mode="bidirectional", iter=100) run_control(net, mode="bidirectional", iter=100) element_df = getattr(net, element) res_df = getattr(net, f"res_{element}") logging.info(f"Diameter optimization completed in {iteration_count} iterations") return net
[docs] def init_diameter_types(net, v_max_pipe: float = 1.0, material_filter: str = "KMR", k: float = 0.1) -> pp.pandapipesNet: """ Initialize pipe diameters using standard pipe types based on velocity requirements. :param net: Pandapipes network to initialize :type net: pandapipes.pandapipesNet :param v_max_pipe: Maximum allowable velocity in pipes [m/s] :type v_max_pipe: float :param material_filter: Pipe material filter for standard types :type material_filter: str :param k: Pipe roughness coefficient [mm] :type k: float :return: Network with initialized standard pipe types :rtype: pp.pandapipesNet .. note:: Selects closest available standard type from filtered catalog based on required diameter. """ start_time = time.time() # Initial hydraulic calculation print(f"\n{'='*80}") print(f"INIT_DIAMETER_TYPES: Initial calculation (pipeflow + control)") print(f"{'='*80}") # Step 1: Calculate velocities with current diameters pp.pipeflow(net, mode="bidirectional", iter=100) # Step 2: Let controller adjust pump parameters for proper pressures run_control(net, mode="bidirectional", iter=100) # Load and filter standard pipe types pipe_std_types = pp.std_types.available_std_types(net, "pipe") filtered_by_material = pipe_std_types[pipe_std_types['material'] == material_filter] if filtered_by_material.empty: raise ValueError(f"No standard pipe types found for material filter: {material_filter}") # Initialize pipe diameters based on velocity requirements print(f"\n{'='*80}") print(f"INIT_DIAMETER_TYPES: Initializing {len(net.pipe)} pipes") print(f"v_max_pipe = {v_max_pipe} m/s, material = {material_filter}") print(f"Available types: {filtered_by_material.index.tolist()}") print(f"{'='*80}\n") for pipe_idx, velocity in enumerate(net.res_pipe.v_mean_m_per_s): current_diameter = net.pipe.at[pipe_idx, 'diameter_m'] pipe_name = net.pipe.at[pipe_idx, 'name'] if 'name' in net.pipe.columns else f"Pipe {pipe_idx}" # Calculate required diameter using continuity equation required_diameter = current_diameter * (velocity / v_max_pipe)**0.5 # Find closest available standard type closest_type = min( filtered_by_material.index, key=lambda x: abs(filtered_by_material.loc[x, 'inner_diameter_mm'] / 1000 - required_diameter) ) # Update pipe properties properties = filtered_by_material.loc[closest_type] selected_diameter = properties['inner_diameter_mm'] / 1000 net.pipe.std_type.at[pipe_idx] = closest_type net.pipe.at[pipe_idx, 'diameter_m'] = selected_diameter net.pipe.at[pipe_idx, 'u_w_per_m2k'] = properties['u_w_per_m2k'] net.pipe.at[pipe_idx, 'k_mm'] = k # Final hydraulic calculation with updated pipe properties print(f"\n{'='*80}") print(f"INIT_DIAMETER_TYPES: Final calculation with new diameters") print(f"{'='*80}") # Step 1: Calculate velocities with new diameters pp.pipeflow(net, mode="bidirectional", iter=100) # Step 2: Adjust pump parameters to meet pressure requirements run_control(net, mode="bidirectional", iter=100) if hasattr(net, 'circ_pump_pressure') and len(net.circ_pump_pressure) > 0: print(f"Final pump pressure: {net.circ_pump_pressure.at[0, 'p_flow_bar']:.2f} bar") print(f"Final pump lift: {net.circ_pump_pressure.at[0, 'plift_bar']:.2f} bar") print(f"{'='*80}\n") total_time = time.time() - start_time logging.info(f"Pipe diameter initialization completed in {total_time:.2f} seconds") return net
[docs] def optimize_diameter_types(net, v_max: float = 1.0, material_filter: str = "KMR", k: float = 0.1) -> pp.pandapipesNet: """ Optimize pipe diameters using discrete standard pipe types through iterative adjustment. :param net: Pandapipes network to optimize :type net: pandapipes.pandapipesNet :param v_max: Maximum allowable velocity [m/s] :type v_max: float :param material_filter: Pipe material filter for standard types :type material_filter: str :param k: Pipe surface roughness [mm] :type k: float :return: Network with optimized standard pipe types :rtype: pp.pandapipesNet .. note:: Iteratively adjusts standard types: upsize pipes exceeding v_max, attempt downsizing below v_max. """ start_time = time.time() # Load and filter standard pipe types pipe_std_types = pp.std_types.available_std_types(net, "pipe") filtered_by_material = pipe_std_types[pipe_std_types['material'] == material_filter] if filtered_by_material.empty: raise ValueError(f"No standard pipe types found for material filter: {material_filter}") # Create type position mapping for optimization type_position_dict = {type_name: i for i, type_name in enumerate(filtered_by_material.index)} # Initial system state calculation print(f"\n{'='*80}") print(f"OPTIMIZE_DIAMETER_TYPES: Starting optimization") print(f"v_max = {v_max} m/s, material = {material_filter}") print(f"Available types: {filtered_by_material.index.tolist()}") print(f"{'='*80}") # Calculate current state (assumes init_diameter_types was called before) pp.pipeflow(net, mode="bidirectional", iter=100) run_control(net, mode="bidirectional", iter=100) # Initialize optimization tracking net.pipe['optimized'] = False change_made = True iteration_count = 0 # Iterative optimization loop print(f"\n{'='*80}") print(f"OPTIMIZE_DIAMETER_TYPES: Starting iterative optimization") print(f"{'='*80}\n") while change_made: iteration_start = time.time() change_made = False pipes_within_target = 0 pipes_outside_target = 0 print(f"\n--- Iteration {iteration_count + 1} ---") for pipe_idx, velocity in enumerate(net.res_pipe.v_mean_m_per_s): # Skip already optimized pipes within limits if net.pipe.at[pipe_idx, 'optimized'] and velocity <= v_max: pipes_within_target += 1 continue current_type = net.pipe.std_type.at[pipe_idx] current_position = type_position_dict[current_type] # Upsize pipes exceeding velocity limit if velocity > v_max and current_position < len(filtered_by_material) - 1: new_type = filtered_by_material.index[current_position + 1] properties = filtered_by_material.loc[new_type] pipe_name = net.pipe.at[pipe_idx, 'name'] if 'name' in net.pipe.columns else f"Pipe {pipe_idx}" print(f" {pipe_name}: UPSIZE v={velocity:.3f} > {v_max} m/s | {current_type}{new_type}") net.pipe.std_type.at[pipe_idx] = new_type net.pipe.at[pipe_idx, 'diameter_m'] = properties['inner_diameter_mm'] / 1000 net.pipe.at[pipe_idx, 'u_w_per_m2k'] = properties['u_w_per_m2k'] net.pipe.at[pipe_idx, 'k_mm'] = k change_made = True pipes_outside_target += 1 # Attempt downsizing for pipes within limits elif velocity <= v_max and current_position > 0: new_type = filtered_by_material.index[current_position - 1] properties = filtered_by_material.loc[new_type] pipe_name = net.pipe.at[pipe_idx, 'name'] if 'name' in net.pipe.columns else f"Pipe {pipe_idx}" # Temporarily apply smaller diameter net.pipe.std_type.at[pipe_idx] = new_type net.pipe.at[pipe_idx, 'diameter_m'] = properties['inner_diameter_mm'] / 1000 net.pipe.at[pipe_idx, 'u_w_per_m2k'] = properties['u_w_per_m2k'] net.pipe.at[pipe_idx, 'k_mm'] = k # Validate downsizing doesn't violate constraints print(f" Testing downsize: {current_type}{new_type}") # Step 1: Calculate new velocities pp.pipeflow(net, mode="bidirectional", iter=100) # Step 2: Adjust pump if needed run_control(net, mode="bidirectional", iter=100) new_velocity = net.res_pipe.v_mean_m_per_s[pipe_idx] print(f" New velocity: {new_velocity:.3f} m/s") if new_velocity <= v_max: print(f" {pipe_name}: DOWNSIZE v={velocity:.3f}{v_max} m/s | {current_type}{new_type} (new v={new_velocity:.3f})") change_made = True else: print(f" {pipe_name}: REVERT v={velocity:.3f}{new_velocity:.3f} > {v_max} m/s | {new_type}{current_type}") # Revert to previous size and mark as optimized properties = filtered_by_material.loc[current_type] net.pipe.std_type.at[pipe_idx] = current_type net.pipe.at[pipe_idx, 'diameter_m'] = properties['inner_diameter_mm'] / 1000 net.pipe.at[pipe_idx, 'u_w_per_m2k'] = properties['u_w_per_m2k'] net.pipe.at[pipe_idx, 'k_mm'] = k net.pipe.at[pipe_idx, 'optimized'] = True pipes_within_target += 1 else: # Mark as optimized if no further changes possible net.pipe.at[pipe_idx, 'optimized'] = True pipes_within_target += 1 iteration_count += 1 # Recalculate if changes were made if change_made: print(f"\nRecalculating network after changes...") # Step 1: Calculate velocities pp.pipeflow(net, mode="bidirectional", iter=100) # Step 2: Adjust pump parameters run_control(net, mode="bidirectional", iter=100) print(f"Network recalculated with adjusted pump parameters") iteration_time = time.time() - iteration_start print(f"\nIteration {iteration_count} summary: {pipes_within_target} pipes OK, {pipes_outside_target} pipes adjusted ({iteration_time:.2f}s)") logging.info(f"Iteration {iteration_count}: {pipes_within_target} pipes within target, " f"{pipes_outside_target} pipes outside target ({iteration_time:.2f}s)") if not change_made: print(f"\n{'='*80}") print(f"OPTIMIZATION CONVERGED after {iteration_count} iterations") print(f"{'='*80}\n") # Final calculation with optimized parameters print(f"\n{'='*80}") print(f"OPTIMIZE: Final calculation") print(f"{'='*80}") # Step 1: Calculate final velocities pp.pipeflow(net, mode="bidirectional", iter=100) print(f"Final velocities: {net.res_pipe.v_mean_m_per_s.values}") # Step 2: Final pump adjustment run_control(net, mode="bidirectional", iter=100) print(f"\nOptimization complete!") if hasattr(net, 'circ_pump_pressure') and len(net.circ_pump_pressure) > 0: print(f"Optimized pump pressure: {net.circ_pump_pressure.at[0, 'p_flow_bar']:.2f} bar") print(f"Optimized pump lift: {net.circ_pump_pressure.at[0, 'plift_bar']:.2f} bar") print(f"Optimized pipe types: {net.pipe.std_type.unique()}") print(f"{'='*80}") total_time = time.time() - start_time logging.info(f"Total optimization time: {total_time:.2f} seconds") return net
[docs] def export_net_geojson(net, filename: str) -> dict: """ Export pandapipes network data to unified GeoJSON format (Version 2.0). :param net: Pandapipes network with topology, geodata, and component properties :type net: pandapipes.pandapipesNet :param filename: Output file path for GeoJSON export :type filename: str :return: Feature counts {'flow': int, 'return': int, 'building': int, 'generator': int} :rtype: dict .. note:: Creates single file with flow/return lines, building connections, and generator connections. """ print(f"\n{'='*80}") print(f"EXPORT_NET_GEOJSON: Starting export to {filename}") print(f"{'='*80}\n") # Extract flow and return lines from pipes flow_features = [] return_features = [] print(f"Checking pipe_geodata... hasattr: {hasattr(net, 'pipe_geodata')}") if hasattr(net, 'pipe_geodata'): print(f"pipe_geodata empty: {net.pipe_geodata.empty}") if hasattr(net, 'pipe_geodata') and not net.pipe_geodata.empty: pipe_count = len(net.pipe) for idx, row in net.pipe_geodata.iterrows(): pipe_data = net.pipe.loc[idx] geometry = LineString(row['coords']) # Split pipes into flow and return (first half = flow, second half = return) feature_data = { 'geometry': geometry, 'segment_id': f"{'flow' if idx < pipe_count/2 else 'return'}_{idx:03d}", 'diameter_mm': pipe_data.get('diameter_m', 0) * 1000, 'std_type': pipe_data.get('std_type', ''), 'length_m': pipe_data.get('length_km', 0) * 1000 } if idx < pipe_count / 2: flow_features.append(feature_data) else: return_features.append(feature_data) # Create GeoDataFrames flow_gdf = gpd.GeoDataFrame(flow_features, crs="EPSG:25833") if flow_features else gpd.GeoDataFrame() return_gdf = gpd.GeoDataFrame(return_features, crs="EPSG:25833") if return_features else gpd.GeoDataFrame() # Extract building connections from heat consumers building_features = [] if hasattr(net, 'heat_consumer') and not net.heat_consumer.empty: for idx, consumer in net.heat_consumer.iterrows(): from_junction = net.junction_geodata.loc[consumer['from_junction']] to_junction = net.junction_geodata.loc[consumer['to_junction']] geometry = LineString([ (from_junction['x'], from_junction['y']), (to_junction['x'], to_junction['y']) ]) building_features.append({ 'geometry': geometry, 'connection_id': f"hast_{idx:03d}", 'heat_demand_W': consumer.get('qext_w', 0) }) building_gdf = gpd.GeoDataFrame(building_features, crs="EPSG:25833") if building_features else gpd.GeoDataFrame() # Extract generator connections from circulation pumps generator_features = [] if hasattr(net, 'circ_pump_pressure') and not net.circ_pump_pressure.empty: for idx, pump in net.circ_pump_pressure.iterrows(): return_junction = net.junction_geodata.loc[pump['return_junction']] flow_junction = net.junction_geodata.loc[pump['flow_junction']] geometry = LineString([ (return_junction['x'], return_junction['y']), (flow_junction['x'], flow_junction['y']) ]) generator_features.append({ 'geometry': geometry, 'generator_id': f"gen_{idx:03d}" }) generator_gdf = gpd.GeoDataFrame(generator_features, crs="EPSG:25833") if generator_features else gpd.GeoDataFrame() print(f"\nExtracted features:") print(f" Flow: {len(flow_features)}") print(f" Return: {len(return_features)}") print(f" Buildings: {len(building_features)}") print(f" Generators: {len(generator_features)}") print(f"\nCalling create_network_geojson...") # Create unified GeoJSON using NetworkGeoJSONSchema # (calculated data is automatically included from GeoDataFrame columns) unified_geojson = NetworkGeoJSONSchema.create_network_geojson( flow_lines=flow_gdf, return_lines=return_gdf, building_connections=building_gdf, generator_connections=generator_gdf, state='dimensioned' ) print(f"Exporting network to unified GeoJSON format: {filename}") logging.info(f"Created unified GeoJSON with {len(unified_geojson.get('features', []))} total features") # Export to file NetworkGeoJSONSchema.export_to_file(unified_geojson, filename) logging.info( f"Network exported to unified GeoJSON: {filename} " f"(Flow: {len(flow_features)}, Return: {len(return_features)}, " f"Buildings: {len(building_features)}, Generators: {len(generator_features)})" ) return { 'flow': len(flow_features), 'return': len(return_features), 'building': len(building_features), 'generator': len(generator_features) }