Source code for ogstools.meshlib.gmsh_BHE

# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#


from dataclasses import dataclass
from math import ceil
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Literal

import gmsh
import numpy as np
import shapely

import ogstools as ot


[docs] @dataclass(frozen=True) class Groundwater: begin: float = -30 """ depth of groundwater begin (negative) in m from top surface """ isolation_layer_id: int = 1 """ number of the groundwater isolation layer (count starts with 0)""" upstream: tuple[float, float] = (160, 200) """ Tuple of length 2 defining the angular range (in degrees) of groundwater inflow surfaces. Angles are measured on a 0 - 359° circle, where 0° corresponds to the +x axis direction and values increase counterclockwise. The first value defines the start angle, the second defines the end angle. If the start angle is larger than the end angle, the range wraps around 0° (e.g., (359, 1) covers 359° -> 0° -> 1°). """ downstream: tuple[float, float] = (340, 20) """ Tuple of length 2 defining the angular range (in degrees) of groundwater outflow surfaces. Angles are measured on a 0 - 359° circle, where 0° corresponds to the +x axis direction and values increase counterclockwise. The first value defines the start angle, the second defines the end angle. If the start angle is larger than the end angle, the range wraps around 0° (e.g., (340, 20) covers 340° -> 359° -> 0° -> 20°). """
[docs] @dataclass(frozen=True) class BHE: """(B)orehole (H)eat (E)xchanger""" x: float = 50.0 """x-coordinate of the BHE in m""" y: float = 50.0 """y-coordinate of the BHE in m""" z_begin: float = -1.0 """BHE begin depth (zero or negative) in m""" z_end: float = -60.0 """BHE end depth (negative) in m""" borehole_radius: float = 0.076 """borehole radius in m"""
[docs] def gen_bhe_mesh( model_area: shapely.Polygon, layer: float | list[float], # e.g. 100 groundwater: Groundwater | list[Groundwater], BHE_Array: BHE | list[BHE], refinement_area: shapely.Polygon, target_z_size_coarse: float = 7.5, target_z_size_fine: float = 1.5, n_refinement_layers: int = 2, meshing_type: Literal["prism", "structured"] = "prism", inner_mesh_size: float = 5.0, outer_mesh_size: float = 10.0, propagation: float = 1.1, order: int = 1, meshname: str = "bhe_mesh", ) -> ot.Meshes: """ Create a generic BHE mesh for the Heat_Transport_BHE-Process with additionally submeshes at the top, at the bottom and the groundwater in- and outflow, which is returned as :py:mod:`ogstools.meshlib.Meshes` Refinement layers are placed at the BHE-begin, the BHE-end and the groundwater start/end. See detailed description of the parameters below: :param model_area: A shapely.Polygon (see https://shapely.readthedocs.io/en/stable/reference/shapely.Polygon.html) of the model. No holes are allowed. :param layer: List of the soil layer thickness in m :param groundwater: List of groundwater layers, where every is specified by a tuple of three entries: [depth of groundwater begin (negative), number of the groundwater isolation layer (count starts with 0), groundwater upstream and downstream as tuple of 2 thresholds angles starting with 0 at +x (first value start, second end), empty list [] for no groundwater flow :param BHE_Array: List of BHEs, where every BHE is specified by a tuple of five floats: [x-coordinate BHE, y-coordinate BHE, BHE begin depth (zero or negative), BHE end depth (negative), borehole radius in m] :param target_z_size_coarse: maximum edge length of the elements in m in z-direction, if no refinemnt needed :param target_z_size_fine: maximum edge length of the elements in the refinement zone in m in z-direction :param n_refinement_layers: number of refinement layers which are evenly set above and beneath the refinemnt depths (see general description above) :param meshing_type: 'structured' and 'prism' are supported :param refinement_area: A shapely.Polygon (see https://shapely.readthedocs.io/en/stable/reference/shapely.Polygon.html) of the refinement_area. No holes are allowed. :param inner_mesh_size: mesh size inside the refinement area in m :param outer_mesh_size: mesh size outside of the refinement area in m :param propagation: growth of the outer_mesh_size, only supported by meshing_type 'structured' :param order: Define the order of the mesh: 1 for linear finite elements / 2 for quadratic finite elements :param meshname: The name of the domain mesh. :returns: A ot.Meshes object # .. image:: ../../examples/howto_preprocessing/gen_bhe_mesh.png """ def _compute_layer_spacing( z_min: float, z_max: float, z_min_id: int, id_j: float, id_j_max: float ) -> None: delta_z = np.abs(z_min - z_max) size_fine = n_refinement_layers * target_z_size_fine target_layer: list = number_of_layers[len(number_of_layers) - 1] if z_min_id == 3: # and id_j==1 - not needed, but logically safer if id_j == id_j_max: space = delta_z - size_fine - space_next_layer_refined space_fine_next = space + size_fine + space_next_layer_refined if space_next_layer_refined == 0: if space <= target_z_size_fine: absolute_height_of_layers.append(delta_z) target_layer.append(ceil(delta_z / target_z_size_fine)) else: absolute_height_of_layers.extend([size_fine, space]) target_layer.append(n_refinement_layers) target_layer.append(ceil(space / target_z_size_coarse)) else: if space <= target_z_size_fine: absolute_height_of_layers.append(space_fine_next) target_layer.append( ceil(space_fine_next / target_z_size_fine) ) else: absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) absolute_height_of_layers.append(space) target_layer.append(ceil(space / target_z_size_coarse)) absolute_height_of_layers.append( space_next_layer_refined ) target_layer.append( ceil(space_next_layer_refined / target_z_size_fine) ) else: # all negative values, because of the extrusion in negative z-direction -> z_min -- End Layer, z_max -- start layer space = delta_z - size_fine if ( np.abs(space) <= (n_refinement_layers + 1) * target_z_size_fine ): absolute_height_of_layers.append(space + size_fine) target_layer.append( ceil((space + size_fine) / target_z_size_fine) ) else: absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) absolute_height_of_layers.append(space - size_fine) target_layer.append( ceil((space - size_fine) / target_z_size_coarse) ) absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) # erstes Mesh in jeweiligen Soil-Layer elif id_j == 1 and id_j != id_j_max: space = delta_z - space_last_layer_refined space_last = space + space_last_layer_refined if np.abs(space) <= (n_refinement_layers + 1) * target_z_size_fine: absolute_height_of_layers.append(space_last) target_layer.append(ceil(space_last / target_z_size_fine)) else: if space_last_layer_refined == 0: absolute_height_of_layers.append(space - size_fine) target_layer.append( ceil((space - size_fine) / target_z_size_coarse) ) absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) else: absolute_height_of_layers.append(space_last_layer_refined) target_layer.append( ceil(space_last_layer_refined / target_z_size_fine) ) absolute_height_of_layers.append(space - size_fine) target_layer.append( ceil((space - size_fine) / target_z_size_coarse) ) absolute_height_of_layers.append(2 * target_z_size_fine) target_layer.append(n_refinement_layers) elif id_j == id_j_max and id_j != 1: space = delta_z - space_next_layer_refined space_next = space + space_next_layer_refined if space <= (n_refinement_layers + 1) * target_z_size_fine: absolute_height_of_layers.append(space_next) target_layer.append(ceil(space_next / target_z_size_fine)) else: if space_next_layer_refined == 0: absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) absolute_height_of_layers.append(space - size_fine) target_layer.append( ceil((space - size_fine) / target_z_size_coarse) ) else: absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) absolute_height_of_layers.append(space - size_fine) target_layer.append( ceil((space - size_fine) / target_z_size_coarse) ) absolute_height_of_layers.append(space_next_layer_refined) target_layer.append( ceil(space_next_layer_refined / target_z_size_fine) ) # Layer without a needed depth of BHE or Groundwater elif id_j == id_j_max and id_j == 1: space = delta_z space_next = space - space_next_layer_refined space_last = space - space_last_layer_refined space_nextlast = space_next - space_last_layer_refined if space_last_layer_refined == 0 and space_next_layer_refined == 0: absolute_height_of_layers.append(space) target_layer.append(ceil(space / target_z_size_coarse)) elif space_next_layer_refined == 0: if space_last <= target_z_size_fine: absolute_height_of_layers.append(space) target_layer.append(ceil(space / target_z_size_fine)) else: absolute_height_of_layers.append(space_last_layer_refined) target_layer.append( ceil(space_last_layer_refined / target_z_size_fine) ) absolute_height_of_layers.append(space_last) target_layer.append(ceil(space_last / target_z_size_coarse)) elif space_last_layer_refined == 0: if space_next <= target_z_size_fine: absolute_height_of_layers.append(space) target_layer.append(ceil(space / target_z_size_fine)) else: absolute_height_of_layers.append(space_next) target_layer.append(ceil(space_next / target_z_size_coarse)) absolute_height_of_layers.append(space_next_layer_refined) target_layer.append( ceil(space_next_layer_refined / target_z_size_fine) ) else: if space_nextlast <= target_z_size_fine: absolute_height_of_layers.append(space) target_layer.append(ceil(space / target_z_size_fine)) else: absolute_height_of_layers.append(space_next_layer_refined) target_layer.append( ceil(space_next_layer_refined / target_z_size_fine) ) absolute_height_of_layers.append(space_nextlast) target_layer.append( ceil(space_nextlast / target_z_size_coarse) ) absolute_height_of_layers.append(space_last_layer_refined) target_layer.append( ceil(space_last_layer_refined / target_z_size_fine) ) else: space = delta_z if space <= (2 * n_refinement_layers + 1) * target_z_size_fine: absolute_height_of_layers.append(space) target_layer.append(ceil(space / target_z_size_fine)) else: absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) absolute_height_of_layers.append(space - 2 * size_fine) target_layer.append( ceil((space - 2 * size_fine) / target_z_size_coarse) ) absolute_height_of_layers.append(size_fine) target_layer.append(n_refinement_layers) # to flat a list, seems not so easy with a ready to use function --> this code is from https://realpython.com/python-flatten-list/ def _flatten_concatenation(matrix: list[list[Any]]) -> list: flat_list = [] for row in matrix: flat_list += row return flat_list def _insert_BHE(inTag: int) -> list: bhe_top_nodes = [] BHE_i: BHE for BHE_i in BHE_array: x, y, z = (BHE_i.x, BHE_i.y, 0) # meshsize at BHE and distance of the surrounding optimal mesh points # see Diersch et al. 2011 Part 2 for 6 surrounding nodes, not to be defined by user delta = 6.134 * BHE_i.borehole_radius bhe_center = gmsh.model.geo.addPoint(x, y, z, delta) gmsh.model.geo.addPoint(x, y - delta, z, delta) gmsh.model.geo.addPoint(x, y + delta, z, delta) dx, dy = (0.866 * delta, 0.5 * delta) gmsh.model.geo.addPoint(x + dx, y + dy, z, delta) gmsh.model.geo.addPoint(x - dx, y + dy, z, delta) gmsh.model.geo.addPoint(x + dx, y - dy, z, delta) gmsh.model.geo.addPoint(x - dx, y - dy, z, delta) if BHE_i.z_begin != 0: bhe_top_nodes.append( gmsh.model.geo.addPoint(x, y, BHE_i.z_begin, delta) ) else: bhe_top_nodes.append(bhe_center) gmsh.model.geo.synchronize() gmsh.model.mesh.embed( 0, list(range(bhe_center, bhe_center + 7)), 2, inTag ) return bhe_top_nodes def _mesh_structured() -> None: if shapely.get_coordinate_dimension( refinement_area ) != shapely.get_coordinate_dimension(model_area): msg = f"Dimension of model area is {shapely.get_coordinate_dimension(model_area)}, but refinement area is of dimension {shapely.get_coordinate_dimension(refinement_area)}" raise ValueError(msg) point_registry_refinement_area: dict[frozenset[float], int] = {} line_registry_refinement_area = {} # define the refinement area point_ids_refinement = [] if shapely.get_coordinate_dimension(refinement_area) == 2: for i, (x, y) in enumerate(refinement_area.exterior.coords[:-1]): point_ids_refinement.append( geo.addPoint(x, y, 0, outer_mesh_size, tag=i) ) point_registry_refinement_area[frozenset([x, y])] = ( point_ids_refinement[-1] ) else: for i, (x, y, z) in enumerate(refinement_area.exterior.coords[:-1]): point_ids_refinement.append( geo.addPoint(x, y, z, outer_mesh_size, tag=i) ) point_registry_refinement_area[frozenset([x, y, z])] = ( point_ids_refinement[-1] ) refinement_area_lines = list( zip( point_ids_refinement, point_ids_refinement[1:] + point_ids_refinement[:1], strict=True, ) ) refinement_area_line_ids = [] for p1, p2 in refinement_area_lines: refinement_area_line_ids.append(geo.addLine(p1, p2)) line_registry_refinement_area[frozenset([p1, p2])] = ( refinement_area_line_ids[-1] ) cl_refinement_area = geo.addCurveLoop(refinement_area_line_ids) surface_refinemment_area = geo.addPlaneSurface([cl_refinement_area]) max_point_tag = geo.getMaxTag(dim=0) + 1 # define the model area point_model_area_registry: dict[int, tuple[float, ...]] = {} if shapely.get_coordinate_dimension(model_area) == 2: for i, (x, y) in enumerate(model_area.exterior.coords[:-1]): point_model_area_registry[ geo.addPoint( x, y, 0, outer_mesh_size, tag=max_point_tag + i ) ] = (x, y, 0) else: for i, (x, y, z) in enumerate(model_area.exterior.coords[:-1]): point_model_area_registry[ geo.addPoint( x, y, z, outer_mesh_size, tag=max_point_tag + i ) ] = (x, y, z) point_ids_model_area = list(point_model_area_registry.keys()) model_area_lines = list( zip( point_ids_model_area, point_ids_model_area[1:] + point_ids_model_area[:1], strict=True, ) ) model_surfaces = [] model_surfaces_n_sides = [ len(refinement_area_lines) ] # TODO: Check this connection_lines = [] transfinite_surfaces = [] transfinite_curves: dict[int, tuple[int, float]] = {} connection_line_registry: dict[frozenset[int], int] = {} connection_line_transfinite_rings: list[list] = [[]] gw_upstream: list[list[int]] = [[] for _ in range(len(groundwaters))] gw_downstream: list[list[int]] = [[] for _ in range(len(groundwaters))] ref_vec = [1, 0] # in positive x-dir for i, (p1, p2) in enumerate(model_area_lines): # determine, if a line corresponds to the groundwater p1_x, p1_y = point_model_area_registry[p1][:2] p2_x, p2_y = point_model_area_registry[p2][:2] dx = p2_x - p1_x dy = p2_y - p1_y normal_vec = [dy, -dx] # Calculate dot product dot_product = np.dot(ref_vec, normal_vec) # Calculate magnitudes (lengths of the vectors) magnitude_A = np.linalg.norm(ref_vec) magnitude_B = np.linalg.norm(normal_vec) # Calculate angle in radians angle_radians = np.arccos(dot_product / (magnitude_A * magnitude_B)) # Convert radians to degrees normal_vec_angle = ( 360 - np.degrees(angle_radians) if normal_vec[1] < 0 else np.degrees(angle_radians) ) for i_gw, groundwater in enumerate(groundwaters): if groundwater.upstream[0] > groundwater.upstream[1]: if ( normal_vec_angle > groundwater.upstream[0] or normal_vec_angle < groundwater.upstream[1] ): gw_upstream[i_gw].append(i) elif ( normal_vec_angle > groundwater.upstream[0] and normal_vec_angle < groundwater.upstream[1] ): gw_upstream[i_gw].append(i) if groundwater.downstream[0] > groundwater.downstream[1]: if ( normal_vec_angle > groundwater.downstream[0] or normal_vec_angle < groundwater.downstream[1] ): gw_downstream[i_gw].append(i) elif ( normal_vec_angle > groundwater.downstream[0] and normal_vec_angle < groundwater.downstream[1] ): gw_downstream[i_gw].append(i) model_line = geo.addLine(p1, p2) point1 = shapely.Point( model_area.exterior.coords[p1 - max_point_tag] ) point2 = shapely.Point( model_area.exterior.coords[p2 - max_point_tag] ) # find nearest point of the refinement area nearest_to_point1 = min( refinement_area.exterior.coords, key=lambda p: point1.distance(shapely.Point(p)), ) nearest_to_point2 = min( refinement_area.exterior.coords, key=lambda p: point2.distance(shapely.Point(p)), ) id_nearest_to_point1 = point_registry_refinement_area[ frozenset(nearest_to_point1) ] id_nearest_to_point2 = point_registry_refinement_area[ frozenset(nearest_to_point2) ] if nearest_to_point1 == nearest_to_point2 and i != 0: connection_line_transfinite_rings.append([]) triangle_surface = True else: triangle_surface = False if ( frozenset([p2, id_nearest_to_point2]) in connection_line_registry ): connection_1 = connection_line_registry[ frozenset([p2, id_nearest_to_point2]) ] if not triangle_surface: connection_line_transfinite_rings[-1].append(connection_1) else: connection_lines.append(geo.addLine(p2, id_nearest_to_point2)) connection_line_registry[ frozenset([p2, id_nearest_to_point2]) ] = connection_lines[-1] connection_1 = connection_lines[-1] if ( frozenset([p1, id_nearest_to_point1]) in connection_line_registry ): connection_2 = connection_line_registry[ frozenset([p1, id_nearest_to_point1]) ] if not triangle_surface: connection_line_transfinite_rings[-1].append(connection_2) else: connection_lines.append(geo.addLine(p1, id_nearest_to_point1)) connection_line_registry[ frozenset([p1, id_nearest_to_point1]) ] = connection_lines[-1] connection_2 = connection_lines[-1] if nearest_to_point1 == nearest_to_point2: cl_surface = geo.addCurveLoop( [model_line, connection_1, -connection_2] ) model_surfaces.append(geo.addPlaneSurface([cl_surface])) model_surfaces_n_sides.append(3) else: if ( frozenset([id_nearest_to_point1, id_nearest_to_point2]) in line_registry_refinement_area ): refinement_area_line = -line_registry_refinement_area[ frozenset([id_nearest_to_point1, id_nearest_to_point2]) ] elif ( frozenset([id_nearest_to_point2, id_nearest_to_point1]) in line_registry_refinement_area ): refinement_area_line = line_registry_refinement_area[ frozenset([id_nearest_to_point2, id_nearest_to_point1]) ] else: msg = "Something went wrong with building the surface line loops. Please check, that your model and refinement area defined properly." raise Exception(msg) cl_surface = geo.addCurveLoop( [ model_line, connection_1, refinement_area_line, -connection_2, ], reorient=True, ) model_surfaces.append(geo.addPlaneSurface([cl_surface])) model_surfaces_n_sides.append(4) geo.synchronize() dist_c1 = point1.distance(shapely.Point(nearest_to_point1)) dist_c2 = point2.distance(shapely.Point(nearest_to_point2)) dist_refinement_line = shapely.Point( nearest_to_point1 ).distance(shapely.Point(nearest_to_point2)) dist_model_line = point1.distance(point2) n_elem_con_1 = ( transfinite_curves[connection_1][0] if connection_1 in transfinite_curves else ceil(dist_c1 / outer_mesh_size_inner) + 1 ) n_elem_con_2 = ( transfinite_curves[connection_2][0] if connection_2 in transfinite_curves else ceil(dist_c2 / outer_mesh_size_inner) + 1 ) max_element_n = max(n_elem_con_1, n_elem_con_2) transfinite_curves[connection_1] = (max_element_n, -propagation) transfinite_curves[connection_2] = (max_element_n, -propagation) num_elements_refine_and_model = max( ceil(dist_refinement_line / inner_mesh_size) + 1, ceil(dist_model_line / outer_mesh_size) + 1, ) transfinite_curves[model_line] = ( num_elements_refine_and_model, 1.0, ) transfinite_curves[refinement_area_line] = ( num_elements_refine_and_model, 1.0, ) transfinite_surfaces.append(model_surfaces[-1]) gmsh.model.geo.synchronize() if groundwaters: if not _flatten_concatenation(gw_downstream): msg = "No groundwater upstream surfaces detected, please check your specified angles!" raise ValueError(msg) if not _flatten_concatenation(gw_downstream): msg = "No groundwater downstream surfaces detected, please check your specified angles!" raise ValueError(msg) for connection_ring in connection_line_transfinite_rings: mesh_sizes = [] for connection_line in connection_ring: mesh_sizes.append(transfinite_curves[connection_line][0]) minimum_mesh_size = max(mesh_sizes) for connection_line in connection_ring: transfinite_curves[connection_line] = ( minimum_mesh_size, -propagation, ) bhe_top_nodes = _insert_BHE(inTag=1) # Extrude the surface mesh according to the previously evaluated structure volumes_list_for_layers = [] top_surface = [ surface_refinemment_area ] + model_surfaces # list(range(1, 10)) surface_list = [(2, tag) for tag in top_surface] gw_downstream_tags: list[list[int]] = [ [] for _ in range(len(groundwaters)) ] gw_upstream_tags: list[list[int]] = [ [] for _ in range(len(groundwaters)) ] for j, num_elements in enumerate(number_of_layers): # spacing of the each layer must be evaluated according to the implementation of the bhe extrusion_tags = geo.extrude( surface_list, 0, 0, -depth_of_extrusion[j], num_elements, cummulative_height_of_layers[j], True, ) # soil 1 geo.synchronize() # list of volume numbers and new bottom surfaces, which were extruded by the five surfaces n_surfaces = ( -1 ) # the number of surfaces is number of model area lines + 1 for the refinement area volume_list = [] surface_list = [] for i, (dim, tag) in enumerate(extrusion_tags): if dim == 3: volume_list.append(tag) surface_list.append(extrusion_tags[i - 1]) for i_gw in range(len(groundwaters)): if n_surfaces in gw_downstream[i_gw]: gw_downstream_tags[i_gw].append( extrusion_tags[i + 1][1] ) if n_surfaces in gw_upstream[i_gw]: gw_upstream_tags[i_gw].append( extrusion_tags[i + 1][1] ) n_surfaces += 1 volumes_list_for_layers.append(volume_list) k = 0 BHE_group = [] for i, BHE_i in enumerate(BHE_array): extrusion_tags = geo.extrude( [(0, bhe_top_nodes[i])], 0, 0, BHE_i.z_end - BHE_i.z_begin, BHE_extrusion_layers[i], BHE_extrusion_depths[i], True, ) BHE_group.append(extrusion_tags[1][1]) geo.synchronize() for i, volume_i in enumerate(volumes_list_for_layers): model.addPhysicalGroup(3, volume_i, i) for k, BHE_group_k in enumerate(BHE_group, start=i + 1): model.addPhysicalGroup(1, [BHE_group_k], k) model.addPhysicalGroup(2, top_surface, k + 1, name="Top_Surface") model.addPhysicalGroup( 2, np.array(surface_list)[:, 1].tolist(), k + 2, name="Bottom_Surface", ) for key, (numNodes, prop) in transfinite_curves.items(): mesh.setTransfiniteCurve(key, numNodes, coef=prop) for surface in transfinite_surfaces: mesh.setTransfiniteSurface(surface) mesh.setRecombine(2, surface) mesh.recombine() gw_counter = 0 # counter_for_gw_start_at_soil_layer for i, groundwater_entry in enumerate(groundwater_list): # add loop for different groundwater flow directions offset = np.abs(groundwater_entry[2]) in np.cumsum(layer) start_id = groundwater_entry[0] + i + int(not offset) - gw_counter end_id = groundwater_entry[3] + i + int(not offset) - gw_counter if offset: gw_counter += 1 model.addPhysicalGroup( 2, gw_downstream_tags[i][start_id:end_id], tag=k + 3, name=f"Groundwater_downstream_{i}", ) model.addPhysicalGroup( 2, gw_upstream_tags[i][start_id:end_id], tag=k + 4, name=f"Groundwater_upstream_{i}", ) k += 2 def _mesh_prism() -> None: # define the outer boundaries square point_model_area_registry: dict[int, tuple[float, ...]] = {} if shapely.get_coordinate_dimension(model_area) == 2: for x, y in model_area.exterior.coords[:-1]: point_model_area_registry[ geo.addPoint(x, y, 0, outer_mesh_size) ] = (x, y, 0) else: for x, y, z in model_area.exterior.coords[:-1]: point_model_area_registry[ geo.addPoint(x, y, z, outer_mesh_size) ] = (x, y, z) point_ids = list(point_model_area_registry.keys()) lines = list(zip(point_ids, point_ids[1:] + point_ids[:1], strict=True)) line_ids = [] gw_upstream: list[list[int]] = [[] for _ in range(len(groundwaters))] gw_downstream: list[list[int]] = [[] for _ in range(len(groundwaters))] ref_vec = [1, 0] # in positive x-dir for i, (p1, p2) in enumerate(lines): line_ids.append(geo.addLine(p1, p2)) p1_x, p1_y = point_model_area_registry[p1][:2] p2_x, p2_y = point_model_area_registry[p2][:2] dx = p2_x - p1_x dy = p2_y - p1_y normal_vec = [dy, -dx] # Calculate dot product dot_product = np.dot(ref_vec, normal_vec) # Calculate magnitudes (lengths of the vectors) magnitude_A = np.linalg.norm(ref_vec) magnitude_B = np.linalg.norm(normal_vec) # Calculate angle in radians angle_radians = np.arccos(dot_product / (magnitude_A * magnitude_B)) # Convert radians to degrees normal_vec_angle = ( 360 - np.degrees(angle_radians) if normal_vec[1] < 0 else np.degrees(angle_radians) ) for i_gw, groundwater in enumerate(groundwaters): if groundwater.upstream[0] > groundwater.upstream[1]: if ( normal_vec_angle > groundwater.upstream[0] or normal_vec_angle < groundwater.upstream[1] ): gw_upstream[i_gw].append(i) elif ( normal_vec_angle > groundwater.upstream[0] and normal_vec_angle < groundwater.upstream[1] ): gw_upstream[i_gw].append(i) if groundwater.downstream[0] > groundwater.downstream[1]: if ( normal_vec_angle > groundwater.downstream[0] or normal_vec_angle < groundwater.downstream[1] ): gw_downstream[i_gw].append(i) elif ( normal_vec_angle > groundwater.downstream[0] and normal_vec_angle < groundwater.downstream[1] ): gw_downstream[i_gw].append(i) if groundwaters: if not _flatten_concatenation(gw_downstream): msg = "No groundwater upstream surfaces detected, please check your specified angles!" raise ValueError(msg) if not _flatten_concatenation(gw_downstream): msg = "No groundwater downstream surfaces detected, please check your specified angles!" raise ValueError(msg) geo.addCurveLoop(line_ids, 1) geo.addPlaneSurface([1], 1) geo.synchronize() bhe_top_nodes = _insert_BHE(inTag=1) # Extrude the surface mesh according to the previously evaluated structure volumes_list_for_layers = [] top_surface = [1] surface_list = [(2, 1)] gw_downstream_tags: list[list[int]] = [ [] for _ in range(len(groundwaters)) ] gw_upstream_tags: list[list[int]] = [ [] for _ in range(len(groundwaters)) ] for j, num_elements in enumerate(number_of_layers): # spacing of the each layer must be evaluated according to the implementation of the bhe extrusion_tags = gmsh.model.geo.extrude( surface_list, 0, 0, -depth_of_extrusion[j], num_elements, cummulative_height_of_layers[j], True, ) # soil 1 # list of new bottom surfaces, extruded by the five surfaces surface_list = [extrusion_tags[0]] for i_gw in range(len(groundwaters)): for i_tags in range(len(gw_upstream)): for tag in gw_upstream[i_tags]: gw_upstream_tags[i_gw].append( extrusion_tags[tag + 2][1] ) for tag in gw_downstream[i_tags]: gw_downstream_tags[i_gw].append( extrusion_tags[tag + 2][1] ) volumes_list_for_layers.append([extrusion_tags[1][1]]) BHE_group = [] for i, BHE_i in enumerate(BHE_array): extrusion_tags = gmsh.model.geo.extrude( [(0, bhe_top_nodes[i])], 0, 0, BHE_i.z_end - BHE_i.z_begin, BHE_extrusion_layers[i], BHE_extrusion_depths[i], ) BHE_group.append(extrusion_tags[1][1]) geo.synchronize() for i, volume_i in enumerate(volumes_list_for_layers): model.addPhysicalGroup(3, volume_i, i) for k, BHE_group_k in enumerate(BHE_group, start=i + 1): model.addPhysicalGroup(1, [BHE_group_k], k) model.addPhysicalGroup(2, top_surface, k + 1, name="Top_Surface") model.addPhysicalGroup( 2, np.array(surface_list)[:, 1].tolist(), k + 2, name="Bottom_Surface", ) gw_counter = 0 # counter_for_gw_start_at_soil_layer for i, groundwater_entry in enumerate(groundwater_list): # add loop for different groundwater flow directions offset = np.abs(groundwater_entry[2]) in np.cumsum(layer) start_id = groundwater_entry[0] + i + int(not offset) - gw_counter end_id = groundwater_entry[3] + i + int(not offset) - gw_counter if offset: gw_counter += 1 model.addPhysicalGroup( 2, gw_downstream_tags[i][start_id:end_id], tag=k + 3, name=f"Groundwater_downstream_{i}", ) model.addPhysicalGroup( 2, gw_upstream_tags[i][start_id:end_id], tag=k + 4, name=f"Groundwater_upstream_{i}", ) k += 2 def mesh_size_callback( _dim: int, _tag: int, x: float, y: float, z: float, lc: float ) -> float: if refinement_area.contains(shapely.Point(x, y, z)): return min(lc, inner_mesh_size) return min(lc, outer_mesh_size) mesh.setSizeCallback(callback=mesh_size_callback) layer = layer if isinstance(layer, list) else [layer] groundwaters: list[Groundwater] = ( [groundwater] if isinstance(groundwater, Groundwater) else groundwater ) BHE_Array = [BHE_Array] if isinstance(BHE_Array, BHE) else BHE_Array model_area = shapely.orient_polygons(model_area) refinement_area = shapely.orient_polygons(refinement_area) # detect the soil layer, in which the groundwater flow starts groundwater_list: list[list] = [] for groundwater in groundwaters: start_groundwater = -1000 # Index for critical layer structure, 0: not critical, 1: top critical, # 2: bottom critical, 3: groundwater at layer transition icl: float = -1 # needed_medias_in_ogs=len(layer)+1 for i, _ in enumerate(layer): if ( np.abs(groundwater.begin) < np.sum(layer[: i + 1]) and start_groundwater == -1000 ): start_groundwater = i if ( # previous elif, one semantic block of different cases -> switch to if, because of ruff error np.abs(groundwater.begin) - np.sum(layer[:start_groundwater]) < n_refinement_layers * target_z_size_fine ): # print('difficult meshing at the top of the soil layer - GW') icl = 1 # beginning of groundwater at a transition of two soil layers - special case if np.abs(groundwater.begin) == np.sum( layer[:start_groundwater] ): icl = 3 # needed_medias_in_ogs=len(layer) # needed_extrusions=len(layer) elif ( np.sum(layer[: start_groundwater + 1]) - np.abs(groundwater.begin) < n_refinement_layers * target_z_size_fine ): # for layers, which are top and bottom critical icl = 1.2 elif ( np.sum(layer[: start_groundwater + 1]) - np.abs(groundwater.begin) < n_refinement_layers * target_z_size_fine and icl != 1.2 ): # print('critical at the bottom of the soil layer-GW') icl = 2 else: icl = 0 groundwater_list.append( [ start_groundwater, icl, groundwater.begin, groundwater.isolation_layer_id, groundwater.downstream, groundwater.upstream, ] ) ### Start of the algorithm ### BHE_array = np.asarray(BHE_Array) i = 0 BHE_to_soil = np.zeros( shape=(len(BHE_array),), dtype=[ ("BHE_index", np.uint16), ("BHE_start_layer", np.uint8), # Soil layer, in which the respective BHE starts ("BHE_start_critical", np.float16), # define, where is a critical transition for BHE start with 0 - not critical, 1 - top critical, 2 - bottom critical, 3 - bhe at layer transition ("BHE_end_layer", np.uint8), # Soil layer, in which the respective BHE ends ("BHE_end_critical", np.float16), # define, where is a critical transition for BHE end with see BHE_start_critical plus 1.2 - top and bottom critical ], ) # detect the soil layer, in which the BHE ends for j, BHE_j in enumerate(BHE_array): if BHE_j.z_end >= BHE_j.z_begin: # pragma: no cover msg = f"BHE end depth must be smaller than BHE begin depth for BHE {j}" raise ValueError(msg) if BHE_j.z_begin > 0: # pragma: no cover msg = "BHE begin depth must be zero or negative for BHE {j}" raise ValueError(msg) for i, _ in enumerate(layer): # detect the soil layer, in which the BHE starts - for the moment only for detection if np.abs(BHE_j.z_begin) < np.sum(layer[: i + 1]) and np.abs( BHE_j.z_begin ) >= np.sum(layer[:i]): BHE_to_soil[j]["BHE_index"] = j BHE_to_soil[j]["BHE_start_layer"] = i if ( np.abs(BHE_j.z_end) - np.abs(BHE_j.z_begin) <= n_refinement_layers * target_z_size_fine ): # pragma: no cover msg = "BHE to short, must be longer than n_refinement_layers * target_z_size_fine!" raise ValueError(msg) if ( # previous elif, one semantic block of different cases -> switch to if, because of ruff error np.abs(BHE_j.z_begin) - np.sum(layer[:i]) < n_refinement_layers * target_z_size_fine ): # print('difficult meshing at the top of the soil layer - BHE %d'%j) # beginning a transition of two soil layers - special case BHE_to_soil[j]["BHE_start_critical"] = 1 if np.abs(BHE_j.z_begin) == np.sum(layer[:i]): BHE_to_soil[j]["BHE_start_critical"] = 3 elif ( np.sum(layer[: i + 1]) - np.abs(BHE_j.z_begin) < n_refinement_layers * target_z_size_fine ): # print('critical at the bottom of the soil layer - BHE %d'%j) BHE_to_soil[j]["BHE_start_critical"] = 2 else: BHE_to_soil[j]["BHE_start_critical"] = 0 # detect the soil layer, in which the BHE ends if np.abs(BHE_j.z_end) < np.sum(layer[: i + 1]) and np.abs( BHE_j.z_end ) >= np.sum(layer[:i]): BHE_to_soil[j]["BHE_end_layer"] = i if ( np.abs(BHE_j.z_end) - np.sum(layer[:i]) < n_refinement_layers * target_z_size_fine ): # print('difficult meshing at the top of the soil layer - BHE %d'%j) BHE_to_soil[j]["BHE_end_critical"] = 1 # beginning at a transition of two soil layers - special case if np.abs(BHE_j.z_end) == np.sum(layer[:i]): BHE_to_soil[j]["BHE_end_critical"] = 3 elif ( np.sum(layer[: i + 1]) - np.abs(BHE_j.z_end) < n_refinement_layers * target_z_size_fine ): # for layers, which are top and bottom critical BHE_to_soil[j]["BHE_end_critical"] = 1.2 elif ( np.sum(layer[: i + 1]) - np.abs(BHE_j.z_end) < n_refinement_layers * target_z_size_fine ): # print('critical at the bottom of the soil layer - BHE %d'%j) BHE_to_soil[j]["BHE_end_critical"] = 2 else: BHE_to_soil[j]["BHE_end_critical"] = 0 elif np.abs(BHE_j.z_end) >= np.sum(layer): # pragma: no cover msg = f"BHE {j} ends at bottom boundary or outside of the model" raise ValueError(msg) needed_depths: list = [] # interesting depths for i, _ in enumerate(layer): # only the interesting depths in the i-th layer # TODO: Rename the variable BHE_end_depths = [] # filter, which BHE's ends in this layer BHE_end_in_Layer = BHE_to_soil[BHE_to_soil["BHE_end_layer"] == i] for k in BHE_end_in_Layer["BHE_index"]: BHE_end_depths.append( [BHE_array[k].z_end, BHE_to_soil[k]["BHE_end_critical"]] ) # filter, which BHE's starts in this layer BHE_starts_in_Layer = BHE_to_soil[BHE_to_soil["BHE_start_layer"] == i] for k in BHE_starts_in_Layer["BHE_index"]: BHE_end_depths.append( [BHE_array[k].z_begin, BHE_to_soil[k]["BHE_start_critical"]] ) groundwater_list_0 = np.array( [inner_list[0] for inner_list in groundwater_list] ) if i in groundwater_list_0: if len(np.argwhere(groundwater_list_0 == i)) == 1: BHE_end_depths.append( [ groundwaters[ np.argwhere(groundwater_list_0 == i)[0, 0] ].begin, icl, ] ) else: # pragma: no cover msg = "Two or more groundwater flows starts in the same soil layer, this is not allowed !" raise Exception(msg) # print(BHE_end_depths) groundwater_list_3 = np.array( [inner_list[3] for inner_list in groundwater_list] ) if i in groundwater_list_3: BHE_end_depths.append([-np.sum(layer[:i]), 3]) BHE_end_depths.append([-np.sum(layer[:i]), 0]) BHE_end_depths.append([-np.sum(layer[: i + 1]), 0]) depths = np.unique(BHE_end_depths, axis=0) # [::-1] test, counts = np.unique(depths[:, 0], return_counts=True) arg_indexes = np.argwhere(counts > 1) if arg_indexes.size != 0: new_test = depths[depths[:, 0] == test[arg_indexes[0]]][:, 1] if np.argwhere(new_test > 0) == 1: duplicate_depth = np.argwhere( depths[:, 0] == test[arg_indexes[0]] ) not_needed_icl = np.argwhere(depths[:, 1] == 0) depths = np.delete( depths, np.intersect1d(duplicate_depth, not_needed_icl), axis=0, ) else: # pragma: no cover msg = "Layering to difficult, groundwater, BHE depths and needed layers are very close - behaviour currently not implemented" raise Exception(msg) BHE_end_depths = depths[::-1] needed_depths.append(BHE_end_depths) number_of_layers: list = [] cummulative_height_of_layers: list = [] depth_of_extrusion: list = [] for i, _ in enumerate(layer): # Schleife zum Berechnen der Layer-Struktur # all depths, which needs a node in the mesh list_of_needed_depths = needed_depths[i] # vorheriger_layer - Abstand für top-critical etc. if i > 0: if ( 2 in needed_depths[i - 1][:, 1] or 1.2 in needed_depths[i - 1][:, 1] ): space_last_layer_refined = ( needed_depths[i - 1][-1, 0] - needed_depths[i - 1][-2, 0] + n_refinement_layers * target_z_size_fine ) if space_last_layer_refined < target_z_size_fine: space_last_layer_refined = target_z_size_fine else: space_last_layer_refined = 0 else: space_last_layer_refined = 0 # nächster_Layer - Abstand für top-critical etc. if i < len(layer) - 1: if ( 1 in needed_depths[i + 1][:, 1] or 3 in needed_depths[i + 1][:, 1] or 1.2 in needed_depths[i + 1][:, 1] ): if 3 in needed_depths[i + 1][:, 1]: space_next_layer_refined = ( n_refinement_layers * target_z_size_fine ) else: space_next_layer_refined = ( needed_depths[i + 1][1, 0] - needed_depths[i + 1][0, 0] + n_refinement_layers * target_z_size_fine ) if space_next_layer_refined < target_z_size_fine: space_next_layer_refined = target_z_size_fine else: space_next_layer_refined = 0 # nichts beim layering zu beachten elif i + 1 in groundwater_list_3: # i+1 == groundwater[1]: space_next_layer_refined = ( n_refinement_layers * target_z_size_fine ) # if, groundwater isolator is deeper than the model area else: space_next_layer_refined = 0 absolute_height_of_layers: list = [] number_of_layers.append([]) # Evaluate Mesh for the Soil-Layers for j in range( 1, len(list_of_needed_depths) ): # not sure, hope to run up to the bounds of the layer groundwater_list_2 = np.array( [inner_list[2] for inner_list in groundwater_list] ) if ( list_of_needed_depths[j, 0] in groundwater_list_2 and list_of_needed_depths[j, 0] != list_of_needed_depths[-1, 0] ): _compute_layer_spacing( list_of_needed_depths[j - 1, 0], list_of_needed_depths[j, 0], list_of_needed_depths[j - 1, 1], j, len(list_of_needed_depths) - 1, ) # Befehle zum Anlegen einer neuen Extrusion cummulative_height_of_layers.append( ( np.cumsum( np.array(absolute_height_of_layers) / np.sum(absolute_height_of_layers) ) ).tolist() ) depth_of_extrusion.append(np.sum(absolute_height_of_layers)) absolute_height_of_layers = [] number_of_layers.append([]) else: _compute_layer_spacing( list_of_needed_depths[j - 1, 0], list_of_needed_depths[j, 0], list_of_needed_depths[j - 1, 1], j, len(list_of_needed_depths) - 1, ) # print('no fine mesh at layer transition') cummulative_height_of_layers.append( ( np.cumsum( np.array(absolute_height_of_layers) / np.sum(absolute_height_of_layers) ) ).tolist() ) depth_of_extrusion.append(np.sum(absolute_height_of_layers)) # evaluate all extrusion depths with according num of layers All_extrusion_depths: list = [] All_extrusion_layers: list = [] last_height = 0 for i, num_layer in enumerate(number_of_layers): All_extrusion_depths.append( ( depth_of_extrusion[i] * np.array(cummulative_height_of_layers[i]) + last_height ).tolist() ) last_height = All_extrusion_depths[-1][-1] All_extrusion_layers.append(num_layer) all_extrusion = np.array( [ _flatten_concatenation(All_extrusion_depths), _flatten_concatenation(All_extrusion_layers), ] ).transpose() BHE_extrusion_layers: list = [] BHE_extrusion_depths: list = [] # evaluate the extrusion for the BHE's for BHE_i in BHE_array: # add little relax tolerance 0.001 needed_extrusion = all_extrusion[ ( (all_extrusion[:, 0] >= np.abs(BHE_i.z_begin)) & (all_extrusion[:, 0] <= np.abs(BHE_i.z_end) + 0.001) ) ] BHE_extrusion_layers.append(needed_extrusion[:, 1]) BHE_extrusion_depths.append( (needed_extrusion[:, 0] - np.abs(BHE_i.z_begin)) / (needed_extrusion[-1, 0] - np.abs(BHE_i.z_begin)) ) outer_mesh_size_inner = (outer_mesh_size + inner_mesh_size) / 2 with TemporaryDirectory() as tmpdir: msh_file = Path(tmpdir) / f"{meshname}.msh" gmsh.initialize() gmsh.option.setNumber("General.Verbosity", 2) model = gmsh.model geo = model.geo mesh = model.mesh model.add(msh_file.stem) if meshing_type == "structured": _mesh_structured() elif meshing_type == "prism": _mesh_prism() mesh.generate(3) gmsh.option.setNumber("Mesh.SecondOrderIncomplete", 1) mesh.setOrder(order) mesh.removeDuplicateNodes() # delete zero-volume elements # 1 for line elements --> BHE's are the reason elem_tags, node_tags = mesh.getElementsByType(1) elem_qualities = mesh.getElementQualities( elementTags=elem_tags, qualityName="volume" ) zero_volume_elements_id = np.argwhere(elem_qualities == 0) # only possible with the hack over the visibilitiy, see https://gitlab.onelab.info/gmsh/gmsh/-/issues/2006 mesh.setVisibility( elem_tags[zero_volume_elements_id].ravel().tolist(), 0 ) gmsh.plugin.setNumber("Invisible", "DeleteElements", 1) gmsh.plugin.run("Invisible") gmsh.write(str(msh_file)) gmsh.finalize() return ot.Meshes.from_gmsh( msh_file, dim=[1, 3], log=False, meshname=meshname )