Source code for underworld3.meshing.surfaces

"""
Surface Module for Underworld3.

This module provides classes for representing and manipulating embedded discretized
surfaces within computational meshes. Surfaces are represented as:
- 3D: triangulated 2D manifolds (triangle meshes)
- 2D: polylines (connected line segments)

Surfaces can represent:

- Fault surfaces (tectonic, shear zones)
- Geological horizons (Moho, sediment layers)
- Phase boundaries (solid-liquid interfaces)
- Material boundaries (compositional interfaces)
- Subduction zones (slab interfaces)

Key features:
- pyvista PolyData as the storage backend (geometry + variable data)
- SurfaceVariable for per-vertex data with .sym access for expressions
- Lazy evaluation via stale flags (distance field, proxy MeshVariables)
- VTK I/O with automatic inclusion of all variable data
- Global/redundant storage (all ranks have full copy, ~10 MB for 100k vertices)

Example:
    >>> # Create a surface from points
    >>> surface = uw.meshing.Surface("fault1", mesh, points)
    >>> surface.discretize()
    >>>
    >>> # Add variables
    >>> friction = surface.add_variable("friction", size=1)
    >>> friction.data[:] = 0.6
    >>>
    >>> # Use in expressions via .sym
    >>> eta_weak = surface.influence_function(
    ...     width=0.05,
    ...     value_near=friction.sym,
    ...     value_far=1.0,
    ...     profile="gaussian",
    ... )
"""

from __future__ import annotations

import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union

import numpy as np
import sympy

import underworld3 as uw
from underworld3 import mpi

if TYPE_CHECKING:
    from underworld3.discretisation import Mesh, MeshVariable


def _to_nd_length(value) -> float:
    """Convert a length value to nondimensional mesh coordinates.

    Accepts either a plain float (returned as-is) or a unit-aware quantity
    (e.g., ``uw.quantity(10, "km")``) which is nondimensionalised via the
    model's reference scales.

    This helper is used by Surface methods that accept physical distances
    (``h_near``, ``h_far``, ``width``, etc.) so that users can specify
    distances in natural units rather than manually dividing by a reference
    length.

    Parameters
    ----------
    value : float or UWQuantity
        A length value.  If it has a ``.magnitude`` attribute (Pint/UWQuantity),
        it is nondimensionalised.  Plain numbers pass through unchanged.

    Returns
    -------
    float
        Nondimensional value in mesh coordinate space.
    """
    if isinstance(value, (int, float, np.floating)):
        return float(value)

    # UWQuantity or Pint quantity — nondimensionalise via the units system
    try:
        nd_val = uw.non_dimensionalise(value)
        # non_dimensionalise may return UWQuantity or scalar — extract float
        if hasattr(nd_val, "value"):
            return float(nd_val.value)
        if hasattr(nd_val, "magnitude"):
            return float(nd_val.magnitude)
        return float(nd_val)
    except (TypeError, AttributeError):
        # Fallback: try to extract raw magnitude
        if hasattr(value, "magnitude"):
            return float(value.magnitude)
        return float(value)


def _require_pyvista():
    """Check pyvista availability with helpful error message."""
    try:
        import pyvista

        return pyvista
    except ImportError:
        raise ImportError(
            "Surface discretization and operations require pyvista. "
            "Install with: pixi install -e runtime"
        )


def _depth_to_km(value) -> float:
    """Convert a depth value to kilometres.

    Accepts a plain float (assumed km) or a ``uw.quantity`` which is
    converted to km via Pint.
    """
    if isinstance(value, (int, float, np.floating)):
        return float(value)
    # UWQuantity / Pint quantity — convert to km
    try:
        if hasattr(value, "to"):
            return float(value.to("km").magnitude)
        if hasattr(value, "magnitude"):
            return float(value.magnitude)
    except Exception:
        pass
    return float(value)


def _order_polyline(points: np.ndarray) -> np.ndarray:
    """Order 2D points along a polyline.

    First removes near-duplicate points (within 1e-10 distance), then
    finds the two true endpoints of the trace (the farthest-apart pair)
    and chains from one endpoint using nearest-neighbour traversal.

    Starting from a true endpoint avoids the common failure mode where
    nearest-neighbour begins mid-trace and creates loops at bends.

    Parameters
    ----------
    points : ndarray, shape (N, 2)
        Unordered 2D points.

    Returns
    -------
    ndarray, shape (M, 2)
        Points reordered along the polyline (M <= N after dedup).
    """
    if len(points) <= 2:
        return points.copy()

    # Remove near-duplicate points
    _, unique_idx = np.unique(np.round(points, decimals=10), axis=0, return_index=True)
    pts = points[np.sort(unique_idx)]
    n = len(pts)
    if n <= 2:
        return pts.copy()

    # Find the farthest-apart pair — these are the trace endpoints.
    # For small N (typical fault traces), brute-force pairwise is fine.
    dist_sq = np.sum((pts[:, None, :] - pts[None, :, :]) ** 2, axis=2)
    i_max, j_max = np.unravel_index(dist_sq.argmax(), dist_sq.shape)
    start = i_max  # begin from one endpoint

    # Nearest-neighbour chain from the endpoint
    visited = np.zeros(n, dtype=bool)
    order = np.empty(n, dtype=int)
    order[0] = start
    visited[start] = True

    for step in range(1, n):
        current = order[step - 1]
        diffs = pts[~visited] - pts[current]
        dists = np.einsum("ij,ij->i", diffs, diffs)
        unvisited_idx = np.where(~visited)[0]
        nearest = unvisited_idx[np.argmin(dists)]
        order[step] = nearest
        visited[nearest] = True

    return pts[order]


def _interpolate_trace(
    points: np.ndarray,
    target_spacing: float,
    smoothing: float = 0.0,
    is_geographic: bool = False,
) -> np.ndarray:
    """Interpolate a 2D polyline to a target point spacing.

    Uses scipy parametric spline fitting (``splprep`` / ``splev``).

    Parameters
    ----------
    points : ndarray, shape (N, 2)
        Ordered polyline vertices (lon/lat or x/y).
    target_spacing : float
        Desired spacing between output points.  For geographic traces
        this is in km; for Cartesian traces it is in model coordinates.
    smoothing : float
        Spline smoothing parameter (0 = exact interpolation).
    is_geographic : bool
        If True, arc lengths are estimated in km using a rough
        degree-to-km conversion (~111 km/deg).

    Returns
    -------
    ndarray, shape (M, 2)
        Resampled polyline with approximately *target_spacing* between
        consecutive points.
    """
    from scipy.interpolate import splprep, splev

    if len(points) < 2:
        return points.copy()

    # Estimate cumulative arc length
    diffs = np.diff(points, axis=0)
    if is_geographic:
        # Rough conversion: 1 degree ≈ 111 km (adequate for spacing estimates)
        seg_lengths = np.sqrt((diffs[:, 0] * 111) ** 2 + (diffs[:, 1] * 111) ** 2)
    else:
        seg_lengths = np.sqrt((diffs ** 2).sum(axis=1))

    total_length = seg_lengths.sum()
    if total_length < 1e-12:
        return points.copy()

    n_out = max(3, int(round(total_length / target_spacing)) + 1)

    # Fit parametric spline
    try:
        tck, u = splprep([points[:, 0], points[:, 1]], s=smoothing)
        u_new = np.linspace(0, 1, n_out)
        x_new, y_new = splev(u_new, tck)
        return np.column_stack([x_new, y_new])
    except Exception:
        # Fallback: linear interpolation along arc length
        cum_length = np.concatenate([[0], np.cumsum(seg_lengths)])
        t_old = cum_length / total_length
        t_new = np.linspace(0, 1, n_out)
        x_new = np.interp(t_new, t_old, points[:, 0])
        y_new = np.interp(t_new, t_old, points[:, 1])
        return np.column_stack([x_new, y_new])


def _profile_to_edge_lengths(
    dist_values: np.ndarray,
    h_near: float,
    h_far: float,
    width: float,
    profile: str,
) -> np.ndarray:
    """Compute target edge lengths from distance values using a profile function.

    Parameters
    ----------
    dist_values : ndarray
        Unsigned distance values at each mesh node.
    h_near : float
        Target edge length near the surface.
    h_far : float
        Target edge length far from the surface.
    width : float
        Transition distance from h_near to h_far.
    profile : str
        One of "linear", "smoothstep", or "gaussian".

    Returns
    -------
    ndarray
        Target edge lengths, same shape as *dist_values*.
    """
    if profile == "linear":
        t = np.minimum(dist_values / width, 1.0)
        return h_near + (h_far - h_near) * t
    elif profile == "smoothstep":
        t = np.minimum(dist_values / width, 1.0)
        smooth_t = 3 * t**2 - 2 * t**3
        return h_near + (h_far - h_near) * smooth_t
    elif profile == "gaussian":
        sigma = width / 3.0
        gaussian = np.exp(-(dist_values**2) / (2 * sigma**2))
        return h_far - (h_far - h_near) * gaussian
    else:
        raise ValueError(
            f"Unknown profile: {profile}. Use 'linear', 'smoothstep', or 'gaussian'"
        )


def _compute_trace_perpendicular(
    trace_points: np.ndarray,
    direction: str = "right",
    is_geographic: bool = False,
) -> np.ndarray:
    """Compute the perpendicular direction at each point of a 2D trace.

    Returns vectors such that ``offset_km * result`` (geographic) or
    ``offset * result`` (Cartesian) gives the correct horizontal
    displacement perpendicular to the trace.

    Parameters
    ----------
    trace_points : ndarray, shape (N, 2)
        Ordered trace points — ``(lon, lat)`` for geographic or
        ``(x, y)`` for Cartesian.
    direction : str
        ``"right"`` or ``"left"`` relative to the trace direction
        (from first point to last).
    is_geographic : bool
        If True, account for the latitude-dependent metric when
        computing directions and return results in degrees so that
        ``offset_km * perp`` gives the geographic offset.

    Returns
    -------
    ndarray, shape (N, 2)
        Perpendicular direction at each trace point.
    """
    n = len(trace_points)
    tangent = np.zeros_like(trace_points)

    # Central differences for interior, forward/backward for endpoints
    if n > 2:
        tangent[1:-1] = trace_points[2:] - trace_points[:-2]
    tangent[0] = trace_points[min(1, n - 1)] - trace_points[0]
    tangent[-1] = trace_points[-1] - trace_points[max(0, n - 2)]

    if is_geographic:
        cos_lat = np.cos(np.radians(trace_points[:, 1]))
        km_per_deg_lon = 111.32 * cos_lat
        km_per_deg_lat = 111.32

        tangent_km = np.column_stack([
            tangent[:, 0] * km_per_deg_lon,
            tangent[:, 1] * km_per_deg_lat,
        ])
    else:
        tangent_km = tangent.copy()

    # Normalise to unit length
    lengths = np.sqrt(tangent_km[:, 0] ** 2 + tangent_km[:, 1] ** 2)
    lengths = np.maximum(lengths, 1e-10)
    tangent_km /= lengths[:, np.newaxis]

    # Rotate 90° to get perpendicular
    if direction == "right":
        perp_km = np.column_stack([tangent_km[:, 1], -tangent_km[:, 0]])
    else:  # "left"
        perp_km = np.column_stack([-tangent_km[:, 1], tangent_km[:, 0]])

    if is_geographic:
        # Convert back to degrees (so offset_km * perp gives degree offset)
        perp = np.column_stack([
            perp_km[:, 0] / km_per_deg_lon,
            perp_km[:, 1] / km_per_deg_lat,
        ])
    else:
        perp = perp_km

    return perp


[docs] class SurfaceVariable: """Variable defined on surface vertices, stored in pyvista point_data. Provides direct access to per-vertex data via .data property, and symbolic access for expressions via .sym property. The .sym property uses a proxy MeshVariable that is lazily interpolated from surface vertices to mesh nodes when accessed. Unit awareness is provided via UnitAwareArray wrapper on .data access. Distance-based masking is available via .mask property when mask_width is set. Attributes: name: Variable name (also the key in pyvista point_data) surface: Parent Surface object size: Number of components per vertex units: Optional units for this variable (e.g., "Pa", "m/s") Example: >>> friction = surface.add_variable("friction", size=1, mask_width=0.1) >>> friction.data[:] = 0.6 # Set values on surface vertices >>> friction.data[weak_mask] = 0.3 # Set weak zone values >>> >>> # Use in expressions with explicit masking >>> eta = friction.sym[0] * friction.mask """
[docs] def __init__( self, name: str, surface: "Surface", size: int = 1, proxy_degree: int = 1, existing: bool = False, units: Optional[str] = None, mask_width: Optional[float] = None, mask_profile: str = "gaussian", ): """Create a variable on surface vertices. Args: name: Variable name (key in pyvista point_data) surface: Parent Surface object size: Number of components per vertex (1 for scalar, 3 for vector) proxy_degree: Degree of proxy MeshVariable for .sym access existing: If True, wraps existing point_data (for loading from VTK) units: Optional units for this variable (e.g., "Pa", "m/s") mask_width: Width for distance-based mask (enables .mask property) mask_profile: Profile for mask function ("step", "linear", "gaussian", "smoothstep") """ self.name = name self.surface = surface self.size = size self._proxy_degree = proxy_degree self._units = units self._mask_width = mask_width self._mask_profile = mask_profile # Create array in pyvista's point_data (unless wrapping existing) if not existing: n_verts = surface.n_vertices if size == 1: surface._pv_mesh.point_data[name] = np.zeros(n_verts) else: surface._pv_mesh.point_data[name] = np.zeros((n_verts, size)) # Proxy MeshVariable for .sym access (created lazily) self._proxy: Optional[uw.discretisation.MeshVariable] = None self._proxy_stale = True
@property def units(self) -> Optional[str]: """Units for this variable (None if dimensionless).""" return self._units @property def has_units(self) -> bool: """Check if this variable has units.""" return self._units is not None @property def data(self): """Direct access to vertex data with optional unit awareness. Returns: UnitAwareArray if units are set, otherwise plain numpy array. Shape is (n_vertices,) for size=1, or (n_vertices, size) otherwise. Note: When units are set, modifications via array operations automatically sync to pyvista storage and mark proxy as stale. For raw numpy arrays, call mark_stale() after modifications. """ raw = self.surface._pv_mesh.point_data[self.name] if self._units is not None: # Import here to avoid circular imports from underworld3.utilities.unit_aware_array import UnitAwareArray # Create callback to sync changes to pyvista and mark stale def sync_to_pyvista(array, info): self.surface._pv_mesh.point_data[self.name] = np.asarray(array) self._proxy_stale = True return UnitAwareArray( raw, units=self._units, callback=sync_to_pyvista, owner=self, ) return raw @data.setter def data(self, values) -> None: """Set vertex data and mark proxy as stale. If values have units (magnitude attribute), the magnitude is extracted for storage in pyvista. Units are tracked separately. """ # Strip units if present if hasattr(values, 'magnitude'): values = values.magnitude self.surface._pv_mesh.point_data[self.name] = np.asarray(values) self._proxy_stale = True
[docs] def mark_stale(self) -> None: """Mark the proxy as stale so it will be recomputed on next .sym access.""" self._proxy_stale = True
@property def mask(self) -> sympy.Expr: """Distance-based mask: 1 near surface, 0 far away. Uses the surface's signed distance field with the configured profile. Must be explicitly applied in expressions: use `var.sym * var.mask`. Returns: sympy.Expr representing the mask (1 near, 0 far) Raises: ValueError: If mask_width was not set in add_variable() Example: >>> friction = surface.add_variable("friction", mask_width=0.1) >>> friction.data[:] = 0.6 >>> eta = friction.sym[0] * friction.mask # Masked value """ if self._mask_width is None: raise ValueError( f"SurfaceVariable '{self.name}' has no mask_width set. " "Set mask_width in add_variable() or use surface.influence_function() directly." ) return self.surface.influence_function( width=self._mask_width, value_near=1.0, value_far=0.0, profile=self._mask_profile, ) @property def sym(self) -> sympy.Matrix: """Symbolic representation for use in expressions. Returns: sympy.Matrix that can be used in Underworld expressions. Note: On first access (or after mark_stale()), this triggers interpolation from surface vertices to mesh nodes. The interpolation uses inverse distance weighting from nearby surface vertices. """ if self._proxy is None: self._create_proxy() if self._proxy_stale: self._interpolate_to_proxy() self._proxy_stale = False return self._proxy.sym def _create_proxy(self) -> None: """Create the proxy MeshVariable for .sym access.""" self._proxy = uw.discretisation.MeshVariable( f"surf_{self.surface.name}_{self.name}", self.surface.mesh, self.size, degree=self._proxy_degree, ) def _interpolate_to_proxy(self) -> None: """Interpolate surface vertex data to mesh nodes. Each rank populates its LOCAL mesh nodes only using inverse distance weighting from surface vertices. """ if self._proxy is None: self._create_proxy() # Get local mesh node coordinates in model (internal) space # to match surface vertex coordinates mesh_coords = np.asarray(self.surface.mesh._coords) # Get surface vertex data in model (internal) space — bypass the # dimensionalising output gateway so coordinates match mesh._coords. surface_coords = np.array(self.surface._pv_mesh.points) if self.surface._pv_mesh is not None else None surface_values = self.data # For 2D surfaces, use only x,y components for KDTree if self.surface.is_2d: surface_coords = np.ascontiguousarray(surface_coords[:, :2]) # Build KDTree for surface vertices kdtree = uw.kdtree.KDTree(surface_coords) # Find nearest surface vertex for each mesh node # (Simple nearest-neighbor for now; could use inverse distance weighting) _, nearest_idx = kdtree.query(mesh_coords) # Transfer values with uw.synchronised_array_update(): if self.size == 1: self._proxy.data[:, 0] = surface_values[nearest_idx.flatten()] else: self._proxy.data[:] = surface_values[nearest_idx.flatten()] def __repr__(self) -> str: parts = [ f"SurfaceVariable(name='{self.name}'", f"surface='{self.surface.name}'", f"size={self.size}", ] if self._units is not None: parts.append(f"units='{self._units}'") if self._mask_width is not None: parts.append(f"mask_width={self._mask_width}") # Get raw data length to avoid triggering UnitAwareArray raw = self.surface._pv_mesh.point_data[self.name] parts.append(f"n_vertices={len(raw)}") return ", ".join(parts) + ")"
[docs] class Surface: """A discretized embedded surface with variable storage. Represents a surface embedded in a computational mesh using pyvista PolyData for storage. In 3D, surfaces are triangulated meshes; in 2D, they are polylines. Supports per-vertex variables with symbolic access for use in Underworld expressions. The surface uses lazy evaluation with stale flags: - Discretization is computed when first accessed - Distance field is computed when first accessed - Proxy MeshVariables are updated when .sym is accessed Attributes: name: Identifier for this surface mesh: Associated computational mesh (for proxy MeshVariables) vertices: (N, 3) array of vertex positions (via pyvista) normals: (N, 3) array of vertex normals (via pyvista) n_vertices: Number of vertices n_triangles: Number of triangles (3D only) Example: >>> # Create from points and discretize >>> surface = uw.meshing.Surface("fault", mesh, points) >>> surface.discretize() >>> >>> # Add a variable >>> friction = surface.add_variable("friction") >>> friction.data[:] = 0.6 >>> >>> # Use distance field in influence function >>> eta_weak = surface.influence_function( ... width=0.05, ... value_near=0.01, ... value_far=1.0, ... profile="gaussian", ... ) """
[docs] def __init__( self, name: str, mesh: "Mesh" = None, control_points: np.ndarray = None, symbol: str = None, ): """Create a surface. Parameters ---------- name : str Identifier for this surface. mesh : Mesh, optional Computational mesh (required for ``.sym`` access and distance field). control_points : ndarray, optional (N, 3) array of 3D points defining the surface. If None, the surface is empty and must be loaded or have points set later. symbol : str, optional Short LaTeX-friendly symbol for math display (e.g., ``"F"``). If None, defaults to first letter of name capitalized. """ self.name = name self.mesh = mesh # Register with mesh for adaptation notifications if mesh is not None and hasattr(mesh, 'register_surface'): mesh.register_surface(self) # Math symbol for clean LaTeX display # Default: first letter capitalized (e.g., "main_fault" -> "M") if symbol is not None: self._symbol = symbol else: # Default to the full (unique) name. The old `name[0].upper()` # collapsed distinct surfaces sharing a first letter (e.g. # "fault1"/"fault2" -> both "F") onto the IDENTICAL distance # varsymbol `d_{F}`, so their (correctly distinct) distance fields # silently aliased to one in `function.evaluate`. The full name is # unique per surface; pass `symbol=` for compact LaTeX if desired. self._symbol = name if name else "S" # Level 1: Control points (primary for evolving surfaces) self._control_points = None # Level 2: pyvista PolyData (global/redundant on all ranks) self._pv_mesh = None # SurfaceVariable wrappers self._variables: Dict[str, SurfaceVariable] = {} # Stale flags for lazy evaluation self._discretization_stale = True self._distance_stale = True # Level 3: Cached proxy MeshVariable for distance self._distance_var: Optional[uw.discretisation.MeshVariable] = None # Companion UNSIGNED (edge-clamped) distance field. influence_function() # interpolates THIS rather than Abs() of the signed field — see # _compute_distance_field for why the signed field is unsafe to abs. self._abs_distance_var: Optional[uw.discretisation.MeshVariable] = None # Dimension (2 or 3) - detected from mesh or control points self._dim = None # Set control points if provided if control_points is not None: self.set_control_points(control_points)
@property def dim(self) -> int: """Spatial dimension (2 or 3). Detected from mesh.dim if available, otherwise from control points shape. """ if self._dim is not None: return self._dim # Try to get from mesh if self.mesh is not None and hasattr(self.mesh, 'dim'): self._dim = self.mesh.dim return self._dim # Infer from control points if self._control_points is not None: if self._control_points.shape[1] == 2: self._dim = 2 else: self._dim = 3 return self._dim # Default to 3D return 3 @property def is_2d(self) -> bool: """True if this is a 2D surface (1D curve in 2D space).""" return self.dim == 2 @property def symbol(self) -> str: """Short LaTeX-friendly symbol for math expressions. Used in distance field expressions like $d_F$ instead of the full variable name {surf_fault_distance}. """ return self._symbol @symbol.setter def symbol(self, value: str) -> None: """Set the math symbol. Marks distance field as stale if changed.""" if value != self._symbol: self._symbol = value # If distance var exists, it needs to be recreated with new symbol if self._distance_var is not None: self._distance_var = None self._abs_distance_var = None self._distance_stale = True def _dimensionalise_coords(self, coords: np.ndarray) -> np.ndarray: """Apply dimensional scaling to internal model coordinates. Follows the same gateway pattern as ``mesh.X.coords``: internal storage is in model (ND) space; user-facing properties return physical (dimensional) coordinates when the units system is active. Returns the original array unchanged when no units are configured. """ if coords is None: return None if self.mesh is None: return coords cs = getattr(self.mesh, "CoordinateSystem", None) if cs is not None and getattr(cs, "_scaled", False): coords = coords * cs._length_scale if self.mesh.units is not None: from underworld3.utilities.unit_aware_array import UnitAwareArray return UnitAwareArray(coords, units="meter") return coords @property def control_points(self) -> Optional[np.ndarray]: """(N, 3) array of control points defining the surface. Returns coordinates in physical (dimensional) units when the units system is active, matching the ``mesh.X.coords`` convention. Internally, points are stored in model (non-dimensional) space. """ if self._control_points is None: return None return self._dimensionalise_coords(np.array(self._control_points))
[docs] def set_control_points(self, points: np.ndarray) -> None: """Set control points and mark discretization as stale. Coordinates are stored internally in model (non-dimensional) space, matching the mesh's internal coordinate representation (``mesh._coords``). Args: points: (N, 2) or (N, 3) array of points in one of these forms: - **Raw numpy array**: Assumed to be in model coordinates (same space as ``mesh._coords``). When units are active, this means nondimensional coordinates. - **Pint Quantity / UnitAwareArray**: Automatically converted to model coordinates via the scaling system. For 2D points, a z=0 column is appended automatically. """ # Handle unit conversion: Pint Quantity or UnitAwareArray → model coords # Use the mesh's coordinate system scale factor for consistency with # the output gateway (_dimensionalise_coords). if hasattr(points, "magnitude"): cs = getattr(self.mesh, "CoordinateSystem", None) if cs is not None and getattr(cs, "_scaled", False): # Convert to base SI (meters) then divide by scale factor if hasattr(points, "to_base_units"): points = np.asarray(points.to_base_units().magnitude) / cs._length_scale else: points = np.asarray(points.magnitude) / cs._length_scale else: # No scaling active — just strip units points = np.asarray(points.magnitude) points = np.asarray(points) if points.ndim != 2 or points.shape[1] not in (2, 3): raise ValueError( f"Points must be (N, 2) or (N, 3) array, got shape {points.shape}" ) # For 2D points, pad with z=0 if points.shape[1] == 2: self._dim = 2 points = np.column_stack([points, np.zeros(len(points))]) self._control_points = points self._discretization_stale = True self._distance_stale = True self._mark_all_proxies_stale()
def _mark_all_proxies_stale(self) -> None: """Mark all SurfaceVariable proxies as stale.""" for var in self._variables.values(): var.mark_stale() @property def vertices(self) -> Optional[np.ndarray]: """(N, 3) array of vertex positions. Returns coordinates in physical (dimensional) units when the units system is active, matching the ``mesh.X.coords`` convention. Internally, vertices are stored in model (non-dimensional) space. """ if self._pv_mesh is None: return None return self._dimensionalise_coords(np.array(self._pv_mesh.points)) @property def n_vertices(self) -> int: """Number of vertices in the discretized surface.""" if self._pv_mesh is None: return 0 return self._pv_mesh.n_points @property def n_triangles(self) -> int: """Number of triangles in the surface.""" if self._pv_mesh is None: return 0 return self._pv_mesh.n_cells @property def normals(self) -> Optional[np.ndarray]: """(N, 3) array of vertex normals (point normals from pyvista).""" self._ensure_discretized() if self._pv_mesh is None: return None return np.array(self._pv_mesh.point_normals) @property def triangles(self) -> Optional[np.ndarray]: """(M, 3) array of triangle vertex indices.""" if self._pv_mesh is None: return None faces = self._pv_mesh.faces if len(faces) == 0: return None return faces.reshape(-1, 4)[:, 1:4] @property def face_centers(self) -> Optional[np.ndarray]: """(M, 3) array of triangle centroids.""" self._ensure_discretized() if self._pv_mesh is None: return None return np.array(self._pv_mesh.cell_centers().points) @property def face_normals(self) -> Optional[np.ndarray]: """(M, 3) array of face normals (cell normals from pyvista).""" self._ensure_discretized() if self._pv_mesh is None: return None return np.array(self._pv_mesh.cell_normals) @property def pv_mesh(self): """PyVista PolyData mesh in the same coordinate space as ``mesh.X.coords``. Returns a shallow copy whose points have been dimensionalised so that they overlay correctly with ``vis.mesh_to_pv_mesh(mesh)``. The internal ``_pv_mesh`` stays in nondimensional model space so that distance calculations remain consistent with the solver. Returns None if the surface has not been discretized. """ if self._pv_mesh is None: return None dim_coords = self._dimensionalise_coords(np.array(self._pv_mesh.points)) copy = self._pv_mesh.copy(deep=True) copy.points = np.asarray(dim_coords, dtype=float) return copy @property def is_discretized(self) -> bool: """Whether the surface has been discretized.""" return self._pv_mesh is not None and self._pv_mesh.n_cells > 0 # --- Discretization --- def _ensure_discretized(self) -> None: """Ensure discretization is computed (lazy evaluation).""" if self._discretization_stale and self._control_points is not None: self.discretize()
[docs] def discretize(self, offset: float = 0.01, n_segments: int = None) -> None: """Discretize control points into a surface mesh. For 3D surfaces: Uses pyvista delaunay_2d to create a triangulated mesh. For 2D surfaces: Uses scipy to fit a spline and create a polyline. Args: offset: (3D only) Height offset for delaunay_2d (controls curvature tolerance). n_segments: (2D only) Number of line segments, or None for auto. Raises: ImportError: If pyvista not available ValueError: If points too sparse for discretization RuntimeError: If discretization fails """ if self._control_points is None or len(self._control_points) == 0: raise ValueError(f"Surface '{self.name}' has no control points to discretize") if self.is_2d: self._discretize_2d(n_segments=n_segments) else: self._discretize_3d(offset=offset) # Clear stale flags self._discretization_stale = False self._distance_stale = True
def _discretize_3d(self, offset: float = 0.01) -> None: """Discretize 3D control points into triangulated mesh using pyvista delaunay_2d.""" pv = _require_pyvista() if len(self._control_points) < 3: raise ValueError( f"Surface '{self.name}' has only {len(self._control_points)} points. " "Need at least 3 points for 3D discretization." ) # Check for degenerate cases (all points nearly collinear) extents = self._control_points.max(axis=0) - self._control_points.min(axis=0) sorted_extents = np.sort(extents) if sorted_extents[0] < 1e-10 * sorted_extents[2] and sorted_extents[1] < 1e-10 * sorted_extents[2]: raise ValueError( f"Surface '{self.name}' points appear to be nearly collinear. " "Cannot create a 2D surface from a 1D line." ) # Create PolyData from points and triangulate polydata = pv.PolyData(self._control_points) self._pv_mesh = polydata.delaunay_2d(offset=offset) if self._pv_mesh.n_cells == 0: raise RuntimeError( f"Triangulation failed for surface '{self.name}'. " "Try adjusting the offset parameter or check point distribution." ) # Compute normals (both point and cell) self._pv_mesh.compute_normals(inplace=True) def _discretize_2d(self, n_segments: int = None) -> None: """Create 2D surface as ordered line segments using scipy spline fitting. For 2D, a "surface" is a 1D curve (polyline) embedded in 2D space. """ pv = _require_pyvista() from scipy.interpolate import splprep, splev if len(self._control_points) < 2: raise ValueError( f"Surface '{self.name}' has only {len(self._control_points)} points. " "Need at least 2 points for 2D line segments." ) # Get 2D coordinates points_2d = self._control_points[:, :2] # For just 2 points, connect them directly if len(points_2d) == 2: vertices_2d = points_2d.copy() else: # Use scipy to fit a parametric spline through points # This naturally orders points along the curve try: # s=0 means interpolate exactly through points tck, u = splprep([points_2d[:, 0], points_2d[:, 1]], s=0) # Determine number of output points if n_segments is None: n_out = len(points_2d) else: n_out = n_segments + 1 # Evaluate spline at uniform parameter values u_new = np.linspace(0, 1, n_out) x, y = splev(u_new, tck) vertices_2d = np.column_stack([x, y]) except Exception as e: # Fall back to original points if spline fitting fails # (e.g., for nearly collinear points) vertices_2d = points_2d.copy() # Compute normals using geometry_tools from underworld3.utilities.geometry_tools import linesegment_normals_2d _, vertex_normals_2d = linesegment_normals_2d(vertices_2d) # Pad to 3D for pyvista storage (z=0) vertices_3d = np.column_stack([vertices_2d, np.zeros(len(vertices_2d))]) vertex_normals_3d = np.column_stack([ vertex_normals_2d, np.zeros(len(vertex_normals_2d)) ]) # Create pyvista line mesh self._pv_mesh = pv.lines_from_points(vertices_3d) # Store vertex normals in point_data (since compute_normals doesn't work for lines) self._pv_mesh.point_data["Normals"] = vertex_normals_3d # Store 2D vertices for distance computation (avoid z=0 overhead) self._vertices_2d = vertices_2d
[docs] def deform_vertices(self, displacement: np.ndarray) -> None: """Deform surface vertices in-place (no re-discretization). This modifies vertex positions while keeping topology fixed. Normals are automatically recomputed. Args: displacement: (n_vertices, 3) array of displacements to add Note: This does NOT update control points. If you want topology changes, use set_control_points() instead and call discretize(). """ self._ensure_discretized() if self._pv_mesh is None: raise RuntimeError(f"Surface '{self.name}' must be discretized before deforming") displacement = np.asarray(displacement) if displacement.shape != self._pv_mesh.points.shape: raise ValueError( f"Displacement shape {displacement.shape} doesn't match " f"vertex shape {self._pv_mesh.points.shape}" ) self._pv_mesh.points += displacement self._pv_mesh.compute_normals(inplace=True) self._distance_stale = True self._mark_all_proxies_stale()
# --- Distance field --- @property def distance(self) -> "uw.discretisation.MeshVariable": """Signed distance from mesh nodes to surface (lazily computed). The signed distance is positive on one side of the surface and negative on the other. Use sympy.Abs(surface.distance.sym[0]) for unsigned distance, or use influence_function() which does this automatically. Returns: MeshVariable with signed distance values at each mesh node. Access .sym[0] for use in expressions. Raises: RuntimeError: If mesh not set or surface not discretized Example: >>> # Use signed distance for different properties on each side >>> d = surface.distance.sym[0] >>> prop = sympy.Piecewise((upper_value, d > 0), (lower_value, True)) >>> >>> # Use absolute distance for symmetric influence >>> mask = sympy.Piecewise((1, sympy.Abs(d) < width), (0, True)) """ if self.mesh is None: raise RuntimeError( f"Surface '{self.name}' requires a mesh to compute distance field. " "Set mesh in constructor or via surface.mesh = mesh" ) self._ensure_discretized() if self._distance_stale: self._compute_distance_field() self._distance_stale = False return self._distance_var @property def abs_distance(self) -> "uw.discretisation.MeshVariable": """Unsigned (edge-clamped) distance from mesh nodes to the surface. Unlike ``Abs(self.distance.sym[0])``, this field is safe to interpolate near a surface edge. The signed ``distance`` field changes sign across the surface, and its zero-contour extends along the surface's infinite-line/plane PAST the finite edge; interpolating it (any non-nodal evaluation) and taking ``Abs`` produces a spurious near-zero valley beyond the edge. This unsigned field is ``>= 0`` everywhere and, beyond the edge, is simply the radial distance to the endpoint, so its interpolant never crosses zero there. Used by :meth:`influence_function`. Returns: MeshVariable with unsigned distance values at each mesh node. Access ``.sym[0]`` for use in expressions. """ if self.mesh is None: raise RuntimeError( f"Surface '{self.name}' requires a mesh to compute distance field. " "Set mesh in constructor or via surface.mesh = mesh" ) self._ensure_discretized() if self._distance_stale: self._compute_distance_field() self._distance_stale = False return self._abs_distance_var def _signed_distance_at(self, coords: np.ndarray) -> np.ndarray: """Exact signed distance from arbitrary query points to this surface. This is the geometric primitive underneath the ``distance`` / ``abs_distance`` fields, but evaluated at *any* points rather than only the mesh nodes. It does **not** touch or interpolate the P1 distance field — it re-runs the exact point-to-polyline (2D) / implicit-distance (3D) computation directly. That makes it safe to call on the centroids of a *refined* mesh during adaptation: the distance resolves itself at the new resolution instead of aliasing a coarse P1 field. Parameters ---------- coords : ndarray, shape (N, 2) or (N, 3) Query points in model (internal) coordinate space — the same space the surface control points live in (cf. ``mesh._coords``). Returns ------- ndarray, shape (N,) Signed distance at each query point (edge-clamped: beyond a finite edge it is the radial distance to the nearest endpoint). """ self._ensure_discretized() coords = np.asarray(coords, dtype=float) if self.is_2d: from underworld3.utilities.geometry_tools import ( signed_distance_pointcloud_polyline_2d ) coords_2d = coords[:, :2] if hasattr(self, "_vertices_2d") and self._vertices_2d is not None: vertices_2d = self._vertices_2d else: vertices_2d = self._pv_mesh.points[:, :2] return signed_distance_pointcloud_polyline_2d(coords_2d, vertices_2d) # 3D: pyvista implicit distance to the surface polydata pv = _require_pyvista() pv_pts = pv.PolyData(np.ascontiguousarray(coords[:, :3])) dist_result = pv_pts.compute_implicit_distance(self._pv_mesh) return np.asarray(dist_result.point_data["implicit_distance"])
[docs] def signed_distance(self, coords: np.ndarray) -> np.ndarray: """Exact **signed** distance from arbitrary points to the surface. Unlike ``distance`` (a P1 :class:`MeshVariable` sampled at the mesh nodes), this evaluates the exact geometry at *whatever* points you pass — so it stays accurate on a refined mesh. Positive on one side of the surface, negative on the other. Parameters ---------- coords : ndarray, shape (N, 2) or (N, 3) Query points in model coordinate space. Returns ------- ndarray, shape (N,) Signed distance at each point. """ return self._signed_distance_at(coords)
[docs] def unsigned_distance(self, coords: np.ndarray) -> np.ndarray: """Exact **unsigned**, edge-clamped distance from arbitrary points. ``abs()`` of :meth:`signed_distance`. Because the underlying distance is edge-clamped (it falls back to the nearest endpoint beyond a finite edge), this is the true distance to the *finite* surface — the right quantity for a distance-driven refinement metric. Parameters ---------- coords : ndarray, shape (N, 2) or (N, 3) Query points in model coordinate space. Returns ------- ndarray, shape (N,) Unsigned distance at each point (``>= 0``). """ return np.abs(self._signed_distance_at(coords))
@property def director(self) -> sympy.Matrix: r"""Unit surface-normal as a symbolic column vector — the TI director n̂. The gradient of a signed-distance field is its unit normal, so this is ``∇d / |∇d|`` built from the :attr:`distance` field. It is the natural **director** for :class:`~underworld3.constitutive_models.TransverseIsotropicFlowModel` (the weak-plane orientation): the same fault object that drives the refinement metric and the weak-zone viscosity also supplies the normal:: ti = uw.constitutive_models.TransverseIsotropicFlowModel stokes.constitutive_model = ti stokes.constitutive_model.Parameters.director = fault.director For a **planar** surface the signed distance is linear, so ``∇d`` is the exact constant unit normal everywhere. Near a curved surface or a finite edge it is the local unit normal wherever the field is smooth. The normalisation makes it robust where ``|∇d|`` drifts from 1 (interpolated distance, edge clamping). Returns ------- sympy.Matrix ``(dim, 1)`` unit normal expression, evaluatable on this surface's mesh (call :meth:`remap_to` first if the surface was built on a different mesh, e.g. an ``adapt`` parent). """ d = self.distance.sym[0] X = self.mesh.X dim = self.mesh.dim grad = [sympy.diff(d, X[i]) for i in range(dim)] norm = sympy.sqrt(sum(g ** 2 for g in grad)) + sympy.sympify(1e-30) return sympy.Matrix([g / norm for g in grad]) def _compute_distance_field(self) -> None: """Compute signed distance field from mesh nodes to surface. The signed distance is positive on one side of the surface and negative on the other. Helper functions like influence_function() use sympy.Abs() when unsigned distance is needed. For 3D surfaces: Uses pyvista's compute_implicit_distance. For 2D surfaces: Uses geometry_tools signed_distance_pointcloud_polyline_2d. """ if self._distance_var is None: # Use varsymbol for clean LaTeX display: d_{F} instead of {surf_fault_distance} # Always wrap symbol in braces for proper LaTeX grouping (e.g., d_{F_1} not d_F_1) self._distance_var = uw.discretisation.MeshVariable( f"surf_{self.name}_distance", self.mesh, 1, degree=self.mesh.degree, varsymbol=f"d_{{{self._symbol}}}", ) # Get mesh coordinates in model (internal) space. # Must use raw model coordinates (mesh._coords) rather than mesh.X.coords # because mesh.X.coords returns dimensional/scaled coordinates when units # are active, while surface points (self._pv_mesh) are stored in the same # coordinate space used to create them — typically model coordinates. coords = np.asarray(self.mesh._coords) # Exact signed distance at the mesh nodes (same primitive that # signed_distance()/unsigned_distance() expose for arbitrary points). distances = self._signed_distance_at(coords) # Companion UNSIGNED distance field. The per-node distance is already # edge-clamped (the segment/surface distance falls back to the nearest # ENDPOINT beyond the edge), so |distances| is the true, edge-aware # distance to the FINITE surface at every node. We store it separately # because the *signed* field is unsafe to interpolate near an edge: # its zero-contour follows the surface's infinite-line/plane and so # extends PAST the finite edge. A consumer that interpolates the signed # field (any non-nodal evaluation) and then takes Abs() gets a spurious # near-zero |distance| valley along that extension — i.e. the influence # bleeds out beyond the end of the surface. The unsigned field is >= 0 # and beyond the edge it is just the radial distance to the endpoint, # so its interpolant never crosses zero there. influence_function() # therefore uses this field, not Abs(signed). if self._abs_distance_var is None: self._abs_distance_var = uw.discretisation.MeshVariable( f"surf_{self.name}_absdistance", self.mesh, 1, degree=self.mesh.degree, varsymbol=f"|d_{{{self._symbol}}}|", ) with uw.synchronised_array_update(): self._distance_var.data[:, 0] = distances self._abs_distance_var.data[:, 0] = np.abs(distances) # --- Influence function ---
[docs] def influence_function( self, width: float, value_near: Union[float, sympy.Expr] = 1.0, value_far: Union[float, sympy.Expr] = 0.0, profile: str = "step", ) -> sympy.Expr: """Create level-set-like influence function based on distance. Creates a sympy expression that varies from value_near (at the surface) to value_far (far from the surface) based on the chosen profile. Uses the unsigned, edge-clamped distance field (:attr:`abs_distance`), so the influence is symmetric on both sides of the surface AND decays correctly beyond a finite edge (it does not bleed along the surface's line/plane past the end). For asymmetric behaviour, access the signed distance directly via ``surface.distance.sym[0]``. Parameters ---------- width : float Characteristic width of the transition zone. value_near : float or sympy.Expr, optional Value at/near the surface. Default is 1.0. value_far : float or sympy.Expr, optional Value far from the surface. Default is 0.0. profile : str, optional Transition profile type. One of ``"step"`` (sharp transition), ``"linear"`` (linear ramp), ``"gaussian"`` (smooth decay), or ``"smoothstep"`` (C1-continuous Hermite). Default is ``"step"``. Returns ------- sympy.Expr Expression that can be used in Underworld equations. Examples -------- Step function for fault zone viscosity: >>> eta = surface.influence_function( ... width=0.05, ... value_near=0.01, ... value_far=1.0, ... profile="step", ... ) >>> >>> # Gaussian decay for smooth transitions >>> eta = surface.influence_function( ... width=0.1, ... value_near=friction.sym, # Variable on surface ... value_far=1.0, ... profile="gaussian", ... ) """ # Accept quantities and convert to nondimensional mesh coordinates width = _to_nd_length(width) # Use the UNSIGNED (edge-clamped) distance FIELD, not Abs() of the signed # field. The signed field's zero-contour extends past a finite edge, so # Abs() of its interpolant lights up a spurious weak zone beyond the # surface end; the unsigned field is edge-aware between nodes too. See # abs_distance / _compute_distance_field. d = self.abs_distance.sym[0] if profile == "step": return sympy.Piecewise( (value_near, d < width), (value_far, True), ) elif profile == "linear": t = sympy.Max(0, 1 - d / width) return value_far + (value_near - value_far) * t elif profile == "gaussian": return value_far + (value_near - value_far) * sympy.exp(-(d / width) ** 2) elif profile == "smoothstep": # Hermite smoothstep: 3t^2 - 2t^3 for t in [0,1] t = sympy.Max(0, sympy.Min(1, 1 - d / width)) smooth = 3 * t**2 - 2 * t**3 return value_far + (value_near - value_far) * smooth else: raise ValueError( f"Unknown profile '{profile}'. " "Choose from: step, linear, gaussian, smoothstep" )
# --- Variables ---
[docs] def add_variable( self, name: str, size: int = 1, proxy_degree: int = 1, units: Optional[str] = None, mask_width: Optional[float] = None, mask_profile: str = "gaussian", ) -> SurfaceVariable: """Add a variable on surface vertices. Creates a SurfaceVariable stored in pyvista point_data with symbolic access via .sym for use in expressions. Args: name: Variable name size: Number of components (1 for scalar, 3 for vector) proxy_degree: Degree of proxy MeshVariable for .sym access units: Optional units for this variable (e.g., "Pa", "m/s") mask_width: Width for distance-based mask (enables .mask property) mask_profile: Profile for mask function ("step", "linear", "gaussian", "smoothstep") Returns: SurfaceVariable that can be modified via .data and used via .sym Example: >>> # Variable with units and mask >>> friction = surface.add_variable("friction", size=1, mask_width=0.1) >>> friction.data[:] = 0.6 >>> >>> # Use in expressions with explicit masking >>> eta = friction.sym[0] * friction.mask """ self._ensure_discretized() if name in self._variables: raise ValueError( f"Variable '{name}' already exists on surface '{self.name}'" ) var = SurfaceVariable( name, self, size, proxy_degree, units=units, mask_width=mask_width, mask_profile=mask_profile ) self._variables[name] = var return var
[docs] def get_variable(self, name: str) -> SurfaceVariable: """Get an existing variable by name. Args: name: Variable name Returns: SurfaceVariable Raises: KeyError: If variable doesn't exist """ return self._variables[name]
@property def variables(self) -> Dict[str, SurfaceVariable]: """Dictionary of all variables on this surface.""" return self._variables # --- VTK I/O ---
[docs] def save(self, filename: str) -> None: """Save surface with all variables to VTK file. All SurfaceVariable data is automatically included in the VTK file as point_data arrays. Args: filename: Output path (.vtk or .vtp) """ self._ensure_discretized() if self._pv_mesh is None: raise RuntimeError( f"Surface '{self.name}' must be discretized before saving" ) self._pv_mesh.save(str(filename))
# --- Mesh Adaptation Support --- def _on_mesh_adapted(self, adapted_mesh: "Mesh") -> None: """Called by mesh.adapt() to update after mesh adaptation. Marks the distance field as stale so it will be recomputed on next access. The surface geometry (control points, pyvista mesh) is unchanged - only the cached distance values need updating. The distance MeshVariable itself is reinitialized by mesh.adapt() along with all other MeshVariables - we just need to mark the data as stale. Args: adapted_mesh: The mesh (same object, updated internals) """ # Mark distance as stale - will be recomputed on next access # The MeshVariable stays in mesh._vars and gets reinitialized by adapt() # just like any other variable (same pattern as swarm proxy variables) self._distance_stale = True # Mark all variable proxies as stale (they project to mesh nodes) self._mark_all_proxies_stale()
[docs] def remap_to(self, new_mesh: "Mesh") -> "Surface": """Re-home this surface onto a different mesh and return ``self``. The surface **geometry** (control points, polyline/polydata vertices) is unchanged — only the mesh its *fields* are computed against changes. Cached distance fields (which were bound to the old mesh) are dropped so they recompute, at **exact** distance, on the new mesh's nodes on next access. This is the companion to :meth:`Mesh.adapt` for the nested (child- returning) engines (``engine="sbr"``/``"nvb"``): ``adapt`` leaves the base mesh untouched and returns a refined *child*, so — unlike the in-place :meth:`Mesh.remesh` — registered surfaces are not auto-notified. Re-homing lets **one** fault object drive both the geometry-only refinement metric (:meth:`refinement_metric_function`, evaluated before the child exists) and the child-side constitutive model (its ``distance`` / normal live on the child):: fault = uw.meshing.Surface("fault", base, trace) metric = fault.refinement_metric_function(h_near, h_far, width) child = base.adapt(metric, engine="nvb") fault.remap_to(child) # distance now on the child eta = fault.influence_function(width, value_near=1e-3, value_far=1.0) Parameters ---------- new_mesh : Mesh The mesh to re-home onto (e.g. an ``adapt`` child). Returns ------- Surface ``self`` (for chaining). """ if new_mesh is self.mesh: return self old_mesh = self.mesh if old_mesh is not None and hasattr(old_mesh, "unregister_surface"): old_mesh.unregister_surface(self) self.mesh = new_mesh self._dim = None # re-detect from the new mesh if new_mesh is not None and hasattr(new_mesh, "register_surface"): new_mesh.register_surface(self) # Drop cached fields bound to the OLD mesh; the geometry (pyvista mesh / # 2D vertices) is mesh-independent and stays as-is. .distance / .abs_distance # rebuild lazily on the new mesh's nodes (exact) on next access. self._distance_var = None self._abs_distance_var = None self._distance_stale = True self._mark_all_proxies_stale() return self
[docs] def refinement_metric( self, h_near, h_far, width=None, profile: str = "linear", name: str = None, ) -> "MeshVariable": r"""Create a metric field for mesh adaptation based on distance from this surface. Returns a MeshVariable containing refinement metric values that can be passed directly to mesh.adapt(). Higher metric values produce finer mesh (smaller elements). Parameters ---------- h_near : float or quantity Target edge length near the surface (smaller = finer mesh). Accepts a plain float (in nondimensional mesh coordinates) or a ``uw.quantity`` (e.g., ``uw.quantity(3, "km")``) which is automatically nondimensionalised. h_far : float or quantity Target edge length far from the surface (larger = coarser mesh). Same unit handling as *h_near*. width : float or quantity, optional Distance over which to transition from h_near to h_far. If None, defaults to 2 * h_far. Same unit handling as *h_near*. profile : str, optional Transition profile: "linear", "smoothstep", or "gaussian". Default is "linear". name : str, optional Name for the metric MeshVariable. Defaults to "{surface_name}_metric". Returns ------- MeshVariable Scalar MeshVariable containing refinement metric values. Notes ----- **Metric Tensor Mathematics** For isotropic mesh adaptation, MMG/PETSc uses a metric tensor: .. math:: M = h^{-2} \cdot I where :math:`h` is the target edge length and :math:`I` is the identity matrix. This relationship is **dimension-independent** - the same formula applies in 2D and 3D because the metric defines edge lengths, not areas or volumes. The adaptation algorithm seeks to make all edges have unit length in the metric space (i.e., :math:`\mathbf{e}^T M \mathbf{e} = 1` for edge vector :math:`\mathbf{e}`). Higher metric values produce smaller elements. **Refinement Ratio** The refinement ratio is ``h_far / h_near``. For example, if ``h_near=0.01`` and ``h_far=0.1``, the mesh will be ~10× finer near the surface. **Element Count Control** To maintain approximately the same total element count while refining near the surface, the far-field should use similar h to the original mesh's cell size. The refined region is small, so coarsening the far-field slightly can compensate. References ---------- .. [1] MMG Platform documentation: http://www.mmgtools.org/ .. [2] Alauzet, F. "Metric-based anisotropic mesh adaptation" (2010) Examples -------- >>> fault = uw.meshing.Surface("fault", mesh, fault_points) >>> fault.discretize() >>> >>> # With plain floats (nondimensional coordinates) >>> metric = fault.refinement_metric(h_near=0.005, h_far=0.05) >>> mesh.adapt(metric) >>> >>> # With quantities (automatic nondimensionalisation) >>> metric = fault.refinement_metric( ... h_near=uw.quantity(3, "km"), ... h_far=uw.quantity(30, "km"), ... width=uw.quantity(10, "km"), ... ) >>> mesh.adapt(metric) """ if self.mesh is None: raise RuntimeError( f"Surface '{self.name}' must be attached to a mesh to create refinement metric" ) # Accept quantities and convert to nondimensional mesh coordinates h_near = _to_nd_length(h_near) h_far = _to_nd_length(h_far) if width is not None: width = _to_nd_length(width) else: width = 2.0 * h_far # Create metric MeshVariable if name is None: name = f"{self.name}_metric" metric = uw.discretisation.MeshVariable(name, self.mesh, 1, degree=1) # Get distance values directly from the distance MeshVariable dist_var = self.distance dist_values = np.abs(dist_var.data[:, 0]) # Compute target edge lengths based on distance profile h_values = _profile_to_edge_lengths(dist_values, h_near, h_far, width, profile) # Convert to metric tensor: M = 1/h² × I (isotropic) # This is dimension-independent: same formula for 2D and 3D # The metric defines edge lengths, not areas/volumes # Higher metric values → finer mesh (smaller elements) metric.data[:, 0] = 1.0 / (h_values ** 2) return metric
[docs] def refinement_metric_function( self, h_near, h_far, width=None, profile: str = "linear", ): r"""Return a **callable** refinement metric based on exact distance. This is the self-resolving companion to :meth:`refinement_metric`. Where ``refinement_metric`` bakes ``M = 1/h²`` into a P1 :class:`MeshVariable` sampled at the *base* mesh nodes, this returns a plain function ``metric(coords) -> M`` that computes the **exact** distance (:meth:`unsigned_distance`) at whatever points it is handed and maps it through the same profile. Pass it straight to :meth:`Mesh.adapt`, which re-evaluates the callable at the cell centroids of **each refined level**. The metric therefore resolves itself at the new resolution as mesh levels appear — it never interpolates a coarse P1 field, so a thin feature refines to a clean, uniform-width band instead of the *patchy* levels a P1-interpolated ``1/h²`` produces (that peaked quantity aliases across a base cell). Parameters ---------- h_near, h_far : float or quantity Target edge length near / far from the surface (same unit handling as :meth:`refinement_metric`). width : float or quantity, optional Transition distance; defaults to ``2 * h_far``. profile : {"linear", "smoothstep", "gaussian"} Distance-to-size profile. Default ``"linear"``. Returns ------- callable ``metric(coords: ndarray[N, dim]) -> ndarray[N]`` giving ``M = 1/h²`` at each query point. Examples -------- >>> fault = uw.meshing.Surface("fault", mesh, fault_points) >>> fault.discretize() >>> metric = fault.refinement_metric_function(h_near=0.005, h_far=0.05, ... width=0.02) >>> child = mesh.adapt(metric, max_levels=3, engine="nvb") """ if self.mesh is None: raise RuntimeError( f"Surface '{self.name}' must be attached to a mesh to create " "a refinement metric" ) h_near = _to_nd_length(h_near) h_far = _to_nd_length(h_far) width = _to_nd_length(width) if width is not None else 2.0 * h_far def metric(coords: np.ndarray) -> np.ndarray: d = self.unsigned_distance(coords) h = _profile_to_edge_lengths(d, h_near, h_far, width, profile) return 1.0 / (h ** 2) return metric
# --- Factory methods ---
[docs] @classmethod def from_trace( cls, name: str, mesh: "Mesh", trace_points: np.ndarray, depth_range: tuple, n_depth_layers: int = None, depth_spacing=None, trace_resolution=None, smoothing: float = 0.0, dip: float = None, dip_direction: str = "right", symbol: str = None, ) -> "Surface": """Create a surface by extruding a polyline trace to depth. This is the recommended way to create fault surfaces from map-view trace data. The trace polyline is optionally interpolated to a target resolution, then extruded to depth to create a ruled surface with explicit triangulation. When *dip* is specified, the surface follows a parabolic curve from vertical at the surface to the given dip angle at maximum depth. The offset is applied perpendicular to the local trace direction. This produces geologically realistic fault geometry where faults are steep near the surface and flatten at depth. For geographic meshes, trace points are (lon, lat) in degrees and depths are physical distances below the ellipsoid surface. The ellipsoid parameters are read from the mesh automatically. For Cartesian meshes, trace points are (x, y) in model coordinates and depth is the z-coordinate (downward positive). Parameters ---------- name : str Surface identifier. mesh : Mesh Computational mesh. For geographic meshes the ellipsoid is read from ``mesh.CoordinateSystem.ellipsoid``. trace_points : ndarray, shape (N, 2) Surface trace polyline. For geographic meshes: ``(lon, lat)`` in degrees. For Cartesian meshes: ``(x, y)`` in the same coordinate space as control points (model or physical, depending on whether units are active). depth_range : tuple of (min_depth, max_depth) Depth extent for extrusion (positive downward). Each value can be a plain float (km for geographic, model coords for Cartesian) or a ``uw.quantity``. n_depth_layers : int, optional Number of depth levels (including surface and deepest). If None, computed from *depth_spacing*. If both are None, defaults to 7 layers. depth_spacing : float or quantity, optional Spacing between depth layers. Alternative to *n_depth_layers*. trace_resolution : float or quantity, optional Target point spacing along the trace. If None the trace is used as-is. For geographic meshes this is in km; for Cartesian meshes it is in model coordinates. smoothing : float Spline smoothing parameter for ``splprep`` (0 = interpolate exactly through points, >0 allows smoothing). dip : float, optional Dip angle in degrees measured from horizontal. ``90`` means vertical (no offset), ``45`` means equal horizontal and vertical extent at maximum depth. When *None* (default), the surface is extruded vertically. The surface profile is parabolic: vertical at the surface (``depth = depth_range[0]``) and reaching *dip* at the deepest layer. dip_direction : str Direction of the dip offset relative to the trace: ``"right"`` (default) or ``"left"`` when looking along the trace from first to last point. symbol : str, optional Short LaTeX symbol for expressions (e.g. ``"F"``). Returns ------- Surface A fully discretized surface ready for use. Examples -------- >>> # Geographic mesh — trace in lon/lat, depths in km >>> trace = np.column_stack([lon_points, lat_points]) >>> s = Surface.from_trace( ... "fault_1", mesh, trace, ... depth_range=(uw.quantity(0, "km"), uw.quantity(30, "km")), ... depth_spacing=uw.quantity(5, "km"), ... trace_resolution=uw.quantity(3, "km"), ... dip=60, # 60° from horizontal, parabolic profile ... ) """ pv = _require_pyvista() from scipy.interpolate import splprep, splev trace_points = np.asarray(trace_points) if trace_points.ndim != 2 or trace_points.shape[1] != 2: raise ValueError( f"trace_points must be (N, 2) array, got shape {trace_points.shape}" ) if trace_points.shape[0] < 2: raise ValueError("Need at least 2 trace points") # --- Detect geographic vs Cartesian --- cs = getattr(mesh, "CoordinateSystem", None) ellipsoid = getattr(cs, "ellipsoid", None) if cs is not None else None is_geographic = ellipsoid is not None # --- Parse depth range --- d_min_raw, d_max_raw = depth_range if is_geographic: # For geographic: depths are in km (or quantities → km) d_min_km = _depth_to_km(d_min_raw) d_max_km = _depth_to_km(d_max_raw) else: # For Cartesian: depths in model coordinates d_min = _to_nd_length(d_min_raw) if not isinstance(d_min_raw, (int, float)) else float(d_min_raw) d_max = _to_nd_length(d_max_raw) if not isinstance(d_max_raw, (int, float)) else float(d_max_raw) # --- Determine depth layers --- if n_depth_layers is not None: n_layers = int(n_depth_layers) elif depth_spacing is not None: if is_geographic: ds_km = _depth_to_km(depth_spacing) n_layers = max(2, int(round((d_max_km - d_min_km) / ds_km)) + 1) else: ds = _to_nd_length(depth_spacing) if not isinstance(depth_spacing, (int, float)) else float(depth_spacing) n_layers = max(2, int(round((d_max - d_min) / ds)) + 1) else: n_layers = 7 # sensible default if is_geographic: depth_values_km = np.linspace(d_min_km, d_max_km, n_layers) else: depth_values = np.linspace(d_min, d_max, n_layers) # --- Order trace points along the curve --- trace_ordered = _order_polyline(trace_points) # --- Interpolate trace if requested --- if trace_resolution is not None: if is_geographic: target_spacing_km = _depth_to_km(trace_resolution) else: target_spacing_km = None target_spacing_nd = ( _to_nd_length(trace_resolution) if not isinstance(trace_resolution, (int, float)) else float(trace_resolution) ) trace_ordered = _interpolate_trace( trace_ordered, target_spacing=target_spacing_km if is_geographic else target_spacing_nd, smoothing=smoothing, is_geographic=is_geographic, ) n_trace = len(trace_ordered) # --- Parabolic dip offset --- # When dip < 90°, each depth layer is offset perpendicular to the # trace by a parabolic amount: offset(z) = z² / (2·z_max·tan(dip)) # This gives vertical at the surface (z=0) and the specified dip at # maximum depth. has_dip = dip is not None and float(dip) < 90.0 if has_dip: dip_deg = float(dip) perp = _compute_trace_perpendicular( trace_ordered, direction=dip_direction, is_geographic=is_geographic ) if is_geographic: z_total_km = d_max_km - d_min_km else: z_total = d_max - d_min # --- Build 3D points for each depth layer --- if is_geographic: # Extract km floats — ellipsoid["a"] is uw.quantity when units active a_raw = ellipsoid["a"] b_raw = ellipsoid["b"] a_km = float(a_raw.to("km").magnitude) if hasattr(a_raw, "to") else float(a_raw) b_km = float(b_raw.to("km").magnitude) if hasattr(b_raw, "to") else float(b_raw) from underworld3.coordinates import geographic_to_cartesian all_points_km = [] for depth_km in depth_values_km: if has_dip and z_total_km > 0: z = depth_km - d_min_km offset_km = z ** 2 / (2 * z_total_km * np.tan(np.radians(dip_deg))) lon_layer = trace_ordered[:, 0] + offset_km * perp[:, 0] lat_layer = trace_ordered[:, 1] + offset_km * perp[:, 1] else: lon_layer = trace_ordered[:, 0] lat_layer = trace_ordered[:, 1] x, y, z_cart = geographic_to_cartesian(lon_layer, lat_layer, depth_km, a_km, b_km) all_points_km.append(np.column_stack([x, y, z_cart])) all_points_km = np.vstack(all_points_km) # Nondimensionalise all_points_nd = np.asarray( uw.non_dimensionalise(uw.quantity(all_points_km, "km")) ) else: # Cartesian: extrude along z (downward positive → negative z) all_points = [] for depth_val in depth_values: if has_dip and z_total > 0: z_offset = depth_val - d_min offset = z_offset ** 2 / (2 * z_total * np.tan(np.radians(dip_deg))) x_layer = trace_ordered[:, 0] + offset * perp[:, 0] y_layer = trace_ordered[:, 1] + offset * perp[:, 1] else: x_layer = trace_ordered[:, 0] y_layer = trace_ordered[:, 1] layer = np.column_stack([ x_layer, y_layer, np.full(n_trace, -depth_val), ]) all_points.append(layer) all_points_nd = np.vstack(all_points) # --- Build explicit triangulation --- # Grid is n_layers rows × n_trace columns # Vertex index: layer * n_trace + i faces = [] for layer in range(n_layers - 1): for i in range(n_trace - 1): # Quad: (layer, i) → (layer, i+1) → (layer+1, i+1) → (layer+1, i) v00 = layer * n_trace + i v10 = layer * n_trace + (i + 1) v01 = (layer + 1) * n_trace + i v11 = (layer + 1) * n_trace + (i + 1) # Two triangles per quad faces.append([3, v00, v10, v01]) faces.append([3, v10, v11, v01]) faces = np.array(faces, dtype=np.int64) # --- Create Surface with explicit mesh --- surface = cls(name, mesh, symbol=symbol) surface._control_points = all_points_nd.copy() pv_mesh = pv.PolyData(all_points_nd, faces=faces.ravel()) pv_mesh.compute_normals(inplace=True) surface._pv_mesh = pv_mesh surface._discretization_stale = False surface._distance_stale = True return surface
[docs] @classmethod def from_vtk( cls, filename: str, mesh: "Mesh" = None, name: str = None, ) -> "Surface": """Load surface from VTK file. All point_data arrays in the VTK file are automatically wrapped as SurfaceVariables. Args: filename: Path to VTK file (.vtk or .vtp) mesh: Computational mesh (required for .sym access) name: Name for the surface. If None, uses filename stem. Returns: Surface with all variables from VTK file Raises: FileNotFoundError: If file doesn't exist ImportError: If pyvista not available """ pv = _require_pyvista() filepath = Path(filename) if not filepath.exists(): raise FileNotFoundError(f"VTK file not found: {filename}") if name is None: name = filepath.stem surface = cls(name, mesh) surface._pv_mesh = pv.read(str(filepath)) surface._discretization_stale = False surface._distance_stale = True # Compute normals if not present if "Normals" not in surface._pv_mesh.point_data: surface._pv_mesh.compute_normals(inplace=True) # Wrap existing point_data as SurfaceVariables for data_name in surface._pv_mesh.point_data.keys(): if data_name not in ["Normals"]: # Skip built-in normals data = surface._pv_mesh.point_data[data_name] size = 1 if data.ndim == 1 else data.shape[1] surface._variables[data_name] = SurfaceVariable( data_name, surface, size, existing=True ) return surface
# --- Compatibility ---
[docs] def compute_normals(self, consistent_normals: bool = True) -> None: """Recompute vertex and face normals. Args: consistent_normals: If True, attempt to make normals consistently oriented """ self._ensure_discretized() if self._pv_mesh is not None: self._pv_mesh.compute_normals( inplace=True, consistent_normals=consistent_normals ) self._mark_all_proxies_stale()
[docs] def flip_normals(self) -> None: """Flip the direction of all normals. This directly negates the normal vectors. Note that pyvista's compute_normals() uses consistent orientation so we can't rely on flip_faces() to reverse normals - we must negate them explicitly. """ self._ensure_discretized() if self._pv_mesh is not None: # Ensure normals are computed if "Normals" not in self._pv_mesh.point_data: self._pv_mesh.compute_normals(inplace=True) # Directly negate the point normals if "Normals" in self._pv_mesh.point_data: self._pv_mesh.point_data["Normals"] = ( -self._pv_mesh.point_data["Normals"] ) # Also negate cell normals if present if "Normals" in self._pv_mesh.cell_data: self._pv_mesh.cell_data["Normals"] = ( -self._pv_mesh.cell_data["Normals"] ) self._mark_all_proxies_stale()
# --- Representation --- def __repr__(self) -> str: status = "discretized" if self.is_discretized else "not discretized" mesh_name = self.mesh.name if self.mesh is not None else "None" n_vars = len(self._variables) return ( f"Surface(name='{self.name}', " f"mesh='{mesh_name}', " f"n_vertices={self.n_vertices}, " f"n_triangles={self.n_triangles}, " f"n_variables={n_vars}, " f"status={status})" )
[docs] class SurfaceCollection: """Collection of surfaces for combined operations. Manages multiple Surface objects and provides methods to: - Compute minimum distance from mesh points to any surface - Transfer surface normals to mesh variables via nearest-neighbor - Create combined influence functions Example: >>> surfaces = uw.meshing.SurfaceCollection() >>> surfaces.add(fault1) >>> surfaces.add(fault2) >>> >>> # Compute combined distance field >>> dist = surfaces.compute_distance_field(mesh) >>> >>> # Combined influence function >>> eta = surfaces.influence_function( ... width=0.05, ... value_near=0.01, ... value_far=1.0, ... ) """
[docs] def __init__(self): """Create an empty surface collection.""" self.surfaces: Dict[str, Surface] = {} self._distance_var: Optional[uw.discretisation.MeshVariable] = None self._distance_stale = True
[docs] def add(self, surface: Surface) -> None: """Add a surface to the collection. Args: surface: Surface to add Raises: ValueError: If surface with same name already exists """ if surface.name in self.surfaces: raise ValueError( f"Surface '{surface.name}' already exists in collection. " "Use a different name or remove the existing surface." ) self.surfaces[surface.name] = surface self._distance_stale = True
[docs] def add_from_vtk( self, filename: str, mesh: "Mesh" = None, name: str = None, ) -> Surface: """Load and add a surface from VTK file. Args: filename: Path to VTK file mesh: Computational mesh name: Name for the surface. If None, uses filename stem. Returns: The loaded Surface """ surface = Surface.from_vtk(filename, mesh, name) self.add(surface) return surface
[docs] def remove(self, name: str) -> Surface: """Remove and return a surface from the collection. Args: name: Name of surface to remove Returns: The removed Surface Raises: KeyError: If surface not found """ surface = self.surfaces.pop(name) self._distance_stale = True return surface
def __getitem__(self, name: str) -> Surface: """Get a surface by name.""" return self.surfaces[name] def __iter__(self): """Iterate over surface names.""" return iter(self.surfaces) def __len__(self): """Number of surfaces in collection.""" return len(self.surfaces) @property def names(self) -> List[str]: """List of surface names.""" return list(self.surfaces.keys())
[docs] def compute_distance_field( self, mesh: "Mesh", distance_var: "MeshVariable" = None, variable_name: str = "surface_distance", ) -> "MeshVariable": """Compute minimum distance from mesh points to any surface. Args: mesh: The mesh to compute distances on distance_var: Optional existing MeshVariable to store results variable_name: Name for new variable if distance_var is None Returns: MeshVariable with distance values (scalar) """ pv = _require_pyvista() if len(self.surfaces) == 0: raise ValueError("Cannot compute distance field: no surfaces in collection") # Ensure all surfaces are discretized for name, surface in self.surfaces.items(): surface._ensure_discretized() if not surface.is_discretized: raise ValueError( f"Surface '{name}' must be discretized before computing distances" ) # Create or use existing variable if distance_var is None: if self._distance_var is None or self._distance_stale: self._distance_var = uw.discretisation.MeshVariable( variable_name, mesh, 1, degree=mesh.degree ) distance_var = self._distance_var # Get mesh coordinates in model (internal) space # to match surface point coordinates coords = np.asarray(mesh._coords) pv_mesh = pv.PolyData(coords) # Initialize with large distance with uw.synchronised_array_update(): distance_var.data[:, 0] = 1e10 # Compute unsigned distance to each surface, take minimum. # Unlike single Surface (which stores signed distance), collections # store unsigned because signed distance is ambiguous with multiple surfaces. for surface in self.surfaces.values(): dist_result = pv_mesh.compute_implicit_distance(surface._pv_mesh) surface_dist = np.abs(dist_result.point_data["implicit_distance"]) distance_var.data[:, 0] = np.minimum( distance_var.data[:, 0], surface_dist ) self._distance_stale = False return distance_var
[docs] def transfer_normals( self, mesh: "Mesh", coords: np.ndarray = None, normal_var: "MeshVariable" = None, variable_name: str = "surface_normals", ) -> "MeshVariable": """Transfer surface normals to mesh points via nearest-neighbor. For each mesh point, finds the closest surface face and copies that face's normal vector. Args: mesh: The mesh to transfer normals to coords: Optional coordinates to query. If None, uses mesh.data normal_var: Optional existing MeshVariable variable_name: Name for new variable Returns: MeshVariable with normal vectors (3 components) """ if len(self.surfaces) == 0: raise ValueError("Cannot transfer normals: no surfaces in collection") # Ensure all surfaces are discretized for name, surface in self.surfaces.items(): surface._ensure_discretized() # Get query coordinates in model (internal) space # to match surface point coordinates if coords is None: coords = np.asarray(mesh._coords) elif hasattr(coords, 'magnitude'): coords = coords.magnitude elif hasattr(coords, '__array__'): coords = np.asarray(coords) # Create or validate output variable if normal_var is None: normal_var = uw.discretisation.MeshVariable( variable_name, mesh, 3, degree=mesh.degree ) # Build combined arrays of all face centers and normals all_centers = [] all_normals = [] for surface in self.surfaces.values(): all_centers.append(surface.face_centers) all_normals.append(surface.face_normals) combined_centers = np.vstack(all_centers) combined_normals = np.vstack(all_normals) # Build KDTree and query kdtree = uw.kdtree.KDTree(combined_centers) _, closest_idx = kdtree.query(coords) # Transfer normals with uw.synchronised_array_update(): normal_var.data[:] = combined_normals[closest_idx.flatten()] return normal_var
[docs] def influence_function( self, mesh: "Mesh", width: float, value_near: Union[float, sympy.Expr] = 1.0, value_far: Union[float, sympy.Expr] = 0.0, profile: str = "step", ) -> sympy.Expr: """Create combined influence function from all surfaces. Args: mesh: Computational mesh width: Characteristic width of the transition zone value_near: Value at/near surfaces value_far: Value far from surfaces profile: Transition profile (step, linear, gaussian, smoothstep) Returns: sympy.Expr based on combined distance field """ distance_var = self.compute_distance_field(mesh) d = distance_var.sym[0] if profile == "step": return sympy.Piecewise( (value_near, d < width), (value_far, True), ) elif profile == "linear": t = sympy.Max(0, 1 - d / width) return value_far + (value_near - value_far) * t elif profile == "gaussian": return value_far + (value_near - value_far) * sympy.exp(-(d / width) ** 2) elif profile == "smoothstep": t = sympy.Max(0, sympy.Min(1, 1 - d / width)) smooth = 3 * t**2 - 2 * t**3 return value_far + (value_near - value_far) * smooth else: raise ValueError(f"Unknown profile '{profile}'")
[docs] def refinement_metric( self, mesh: "Mesh", h_near, h_far, width=None, profile: str = "linear", variable_name: str = "fault_metric", ) -> "MeshVariable": r"""Create a combined refinement metric for mesh adaptation. Computes a single metric field using the minimum unsigned distance across all surfaces in the collection. This creates **2 MeshVariables** (distance + metric) regardless of how many surfaces are in the collection, avoiding the O(N²) DM-rebuild cost of computing per-surface metrics in a loop. Parameters ---------- mesh : Mesh The computational mesh. h_near : float or quantity Target edge length near any surface. h_far : float or quantity Target edge length far from surfaces. width : float or quantity, optional Transition distance. Defaults to ``2 * h_far``. profile : str ``"linear"``, ``"smoothstep"``, or ``"gaussian"``. variable_name : str Name for the metric MeshVariable. Returns ------- MeshVariable Scalar metric field suitable for ``mesh.adapt()``. """ h_near = _to_nd_length(h_near) h_far = _to_nd_length(h_far) width = _to_nd_length(width) if width is not None else 2.0 * h_far # Compute (or reuse) the collection-wide minimum unsigned distance distance_var = self.compute_distance_field(mesh) metric = uw.discretisation.MeshVariable(variable_name, mesh, 1, degree=1) dist_values = np.abs(distance_var.data[:, 0]) h_values = _profile_to_edge_lengths(dist_values, h_near, h_far, width, profile) metric.data[:, 0] = 1.0 / (h_values ** 2) return metric
[docs] def compute_nearest_fields( self, mesh: "Mesh", fault_width=None, normal_var_name: str = "fault_n", id_var_name: str = "fault_id", distance_var_name: str = "d_faults", weight_var_name: str = "fault_w", ) -> dict: """Compute per-node nearest-surface fields for rheology. For every mesh node, finds the closest surface vertex across all surfaces and returns: - **normal**: unit normal of the nearest surface vertex - **id**: surface identifier for the nearest surface - **distance**: minimum unsigned distance to any surface - **weight** *(optional)*: Gaussian influence ``exp(-0.5*(d/width)²)`` This consolidates what would otherwise be manual KDTree code into a single reusable call, and creates only 3–4 MeshVariables total. Parameters ---------- mesh : Mesh The computational mesh. fault_width : float or quantity, optional Gaussian half-width for the weight field. If ``None``, the weight field is omitted from the returned dict. normal_var_name, id_var_name, distance_var_name, weight_var_name : str Names for the output MeshVariables. Returns ------- dict Keys: ``"normal"``, ``"id"``, ``"distance"``, and optionally ``"weight"``. Values are MeshVariable instances. """ if len(self.surfaces) == 0: raise ValueError("Cannot compute nearest fields: no surfaces in collection") # Ensure discretized for name, surface in self.surfaces.items(): surface._ensure_discretized() # --- Gather combined vertex data from all surfaces --- all_vertices = [] all_normals = [] all_ids = [] for sid, surface in self.surfaces.items(): pts = np.array(surface._pv_mesh.points) # ND model coords norms = surface.normals # triangulation normals # Use a numeric ID: try float(sid), fall back to index try: numeric_id = float(sid) except (TypeError, ValueError): numeric_id = float(len(all_vertices)) all_vertices.append(pts) all_normals.append(norms) all_ids.append(np.full(len(pts), numeric_id)) combined_vertices = np.vstack(all_vertices) combined_normals = np.vstack(all_normals) combined_ids = np.concatenate(all_ids) # --- KDTree query --- kdtree = uw.kdtree.KDTree(combined_vertices) mesh_coords = np.asarray(mesh._coords) _, closest_idx = kdtree.query(mesh_coords) closest_idx = closest_idx.flatten() # --- Create output MeshVariables --- dim = mesh.dim fault_normal = uw.discretisation.MeshVariable( normal_var_name, mesh, dim, degree=1, varsymbol=r"\hat{\mathbf{n}}_f", ) fault_id_var = uw.discretisation.MeshVariable( id_var_name, mesh, 1, degree=1, varsymbol=r"f_{id}", ) with uw.synchronised_array_update(): fault_normal.data[:] = combined_normals[closest_idx] fault_id_var.data[:, 0] = combined_ids[closest_idx] # --- Distance field (reuse collection's cached distance) --- distance_var = self.compute_distance_field( mesh, variable_name=distance_var_name, ) result = { "normal": fault_normal, "id": fault_id_var, "distance": distance_var, } # --- Optional Gaussian weight --- if fault_width is not None: fw_nd = _to_nd_length(fault_width) dist_values = np.abs(distance_var.data[:, 0]) weight_var = uw.discretisation.MeshVariable( weight_var_name, mesh, 1, degree=1, varsymbol=r"w_f", ) weight_var.data[:, 0] = np.exp(-0.5 * (dist_values / fw_nd) ** 2) result["weight"] = weight_var return result
# --- Backward compatibility ---
[docs] def create_weakness_function( self, distance_var: "MeshVariable", fault_width: float, eta_weak: float = 0.01, eta_background: float = 1.0, ) -> sympy.Expr: """Create Piecewise viscosity function for fault weakness. DEPRECATED: Use influence_function() instead. Args: distance_var: MeshVariable containing distances fault_width: Width of the weak zone eta_weak: Viscosity within weak zone eta_background: Viscosity outside weak zone Returns: sympy.Piecewise expression """ import warnings warnings.warn( "create_weakness_function is deprecated. Use influence_function() instead.", DeprecationWarning, stacklevel=2, ) return sympy.Piecewise( (eta_weak, distance_var.sym[0] < fault_width), (eta_background, True), )
def __repr__(self) -> str: surface_strs = [ f" {name}: {surface}" for name, surface in self.surfaces.items() ] surfaces_repr = "\n".join(surface_strs) if surface_strs else " (empty)" return f"SurfaceCollection(\n{surfaces_repr}\n)"
# --------------------------------------------------------------------------- # Anisotropic fault metric tensor (for the supplied-tensor r-adapt mover) # --------------------------------------------------------------------------- def _fault_seg_distance_sym(X, a, b): """Analytic point-to-segment distance (sympy) for the 2D segment a→b. A pure function of the symbolic coordinates ``X`` and the FIXED segment endpoints, so it re-evaluates exactly (Eulerian) wherever sampled — the property the fault metric needs (a nodal distance field would bridge the sub-cell dip and convect under iteration).""" abx, aby = b[0] - a[0], b[1] - a[1] ab2 = float(abx * abx + aby * aby) if ab2 == 0.0: return sympy.sqrt((X[0] - a[0]) ** 2 + (X[1] - a[1]) ** 2) t = sympy.Min(1, sympy.Max( 0, ((X[0] - a[0]) * abx + (X[1] - a[1]) * aby) / ab2)) return sympy.sqrt((X[0] - (a[0] + t * abx)) ** 2 + (X[1] - (a[1] + t * aby)) ** 2) def _fault_collect_polylines(faults): """Group ``faults`` into a list of per-fault segment lists. Returns ``[[(a,b), ...], ...]`` — one inner list per fault, holding the consecutive segments of that fault's polyline. Preserving the per-fault grouping matters for the comb metric, whose teeth are placed at distances from each fault's MIN-distance (so a curved/polyline fault gets bands that follow the curve, not a tangle of per-segment bands). ``faults`` may be a single :class:`Surface`, a single segment / polyline array, or a list mixing those. A :class:`Surface` contributes the segments of its control-point polyline (model-space, matching ``mesh.X``); an ``(N, 2)``/``(N, 3)`` array a polyline (``N≥2``).""" if isinstance(faults, Surface) or hasattr(faults, "ndim"): items = [faults] else: items = list(faults) polylines = [] for item in items: if isinstance(item, Surface): cp = item._control_points # model space ≡ mesh.X coords if cp is None: raise ValueError( f"Surface {item.name!r} has no control points") pts = np.asarray(cp, dtype=float)[:, :2] else: pts = np.asarray(item, dtype=float) if pts.ndim != 2 or pts.shape[0] < 2 or pts.shape[1] not in (2, 3): raise ValueError( "fault segment must be an (N>=2, 2|3) array of points; " f"got shape {pts.shape}") pts = pts[:, :2] polylines.append([(pts[k], pts[k + 1]) for k in range(len(pts) - 1)]) return polylines def _fault_collect_segments(faults): """Flatten ``faults`` into a single list of ``(a, b)`` 2D endpoint pairs (segment grouping discarded — used by the anisotropic tensor builder, where each segment contributes its own normal-aligned bump).""" return [seg for poly in _fault_collect_polylines(faults) for seg in poly] def fault_metric_tensor(mesh, faults, refinement=3.0, width="auto", base=1.0): r"""Build the analytic, Eulerian **normal-aligned anisotropic metric tensor** ``M(x)`` for refining a thin band of cells **across** one or more codimension-1 faults, for the supplied-tensor r-adapt mover. Pass the result straight to the mover:: M = uw.meshing.fault_metric_tensor(mesh, faults, refinement=3.0) uw.meshing.smooth_mesh_interior( mesh, metric=M, method="anisotropic", boundary_slip=False, method_kwargs=dict(n_outer=12, relax=0.4)) Construction — summed over every fault segment ``i`` (normal ``n_i``, point-to-segment distance ``d_i(x)``): .. math:: M(x) = \mathtt{base}\,\Big[\,I + (R^2 - 1)\textstyle\sum_i e^{-(d_i(x)/W)^2}\, n_i n_i^{\mathsf T}\Big]. At a fault the across-fault eigenvalue is ``base·R²`` (cell size ``h_0/R``) and the along-fault eigenvalue is ``base`` (size ``h_0``): a thin strip refined **only across** the fault, so there is no along-fault budget competition and the band centres on the line. Used directly as the mover's tensor ``D``; the overall ``base`` scale is irrelevant to the mover (only the ``R²`` anisotropy ratio and the spatial variation matter). Parameters ---------- mesh : Mesh 2D mesh (the anisotropic mover is 2D-only). faults : Surface | array | list The fault geometry, in **mesh coordinate space**: a :class:`Surface` (uses its control-point polyline), an ``(N>=2, 2|3)`` polyline array, or a list mixing those (each polyline segment contributes a bump with its own normal — handles 1/2/3 faults, parallel or not, straight or kinked). refinement : float, default 3.0 ``R`` — the across-fault refinement ratio. Cells refine to ``≈ h_0/R`` across the fault (eigenvalue ratio ``R²:1``). Larger ``R`` ⇒ finer across-fault cells (down to the fixed-node-budget floor). width : float | quantity | "auto", default "auto" ``W`` — the half-width (length-scale) of the refined strip. ``"auto"`` ≈ ``h_0/6`` (the mesh's mean cell size / 6 — resolvable yet tight). **Smaller ``W`` centres the band more tightly** (the residual offset scales with ``W``), but must stay resolvable by the (Eulerian-refined) mesh — too thin (``≲ h_0/12``) and the strip under-resolves on the starting mesh. ``h_0/4 … h_0/8`` is the sweet spot. base : float, default 1.0 Overall isotropic scale (mover-irrelevant; kept for generality). Returns ------- sympy.Matrix The ``2×2`` analytic metric tensor ``M(x)`` (a function of ``mesh.CoordinateSystem.X``), to pass as ``metric=`` with ``method="anisotropic"``. """ cdim = mesh.cdim if cdim != 2: raise NotImplementedError( "fault_metric_tensor is 2D only (matches the anisotropic mover)") R = float(refinement) if isinstance(width, str): if width.strip().lower() != "auto": raise ValueError( f"width string must be 'auto'; got {width!r} (pass a number " "or a uw.quantity length otherwise)") from underworld3.meshing.smoothing import _edge_pairs ep = _edge_pairs(mesh.dm) Xc = np.asarray(mesh.X.coords) if ep.shape[0]: _el = np.linalg.norm(Xc[ep[:, 1]] - Xc[ep[:, 0]], axis=1) _esum, _ecnt = float(_el.sum()), int(_el.shape[0]) else: _esum, _ecnt = 0.0, 0 if uw.mpi.size > 1: _esum = uw.mpi.comm.allreduce(_esum) _ecnt = uw.mpi.comm.allreduce(_ecnt) # TRUE global mean edge length (sum/count). Averaging per-rank means # (allreduce(mean)/size) mis-weights ranks with unequal edge counts and # lets an empty partition's sentinel 1.0 pollute the result. h0 = (_esum / _ecnt) if _ecnt > 0 else 1.0 W = h0 / 6.0 else: try: W = float(uw.scaling.non_dimensionalise(width)) except Exception: W = float(width) if not (W > 0.0): raise ValueError(f"width must be positive; got {W}") segs = _fault_collect_segments(faults) if not segs: raise ValueError("fault_metric_tensor: no fault segments found") X = mesh.CoordinateSystem.X amp = base * (R ** 2 - 1.0) M = base * sympy.eye(2) for (a, b) in segs: ab = np.asarray(b, dtype=float) - np.asarray(a, dtype=float) seglen = float(np.linalg.norm(ab)) if seglen == 0.0: continue nx, ny = -ab[1] / seglen, ab[0] / seglen d = _fault_seg_distance_sym(X, a, b) bump = amp * sympy.exp(-(d / W) ** 2) M = M + bump * sympy.Matrix([[nx * nx, nx * ny], [nx * ny, ny * ny]]) return M def fault_comb_metric(mesh, faults, cell_size, n_across=4, amplitude=6.0, tooth_width=None, combine="sum"): r"""Build a scalar **comb** metric ``ρ(x)`` that refines a band of a controlled number of roughly-**uniform** cells *across* one or more faults, for the isotropic equidistribution mover (``method="ma"``). Pass the result straight to the mover:: rho = uw.meshing.fault_comb_metric(mesh, faults, cell_size=0.006, n_across=4) uw.meshing.smooth_mesh_interior( mesh, metric=rho, method="ma", method_kwargs=dict(n_outer=1, n_picard=25)) # single-shot Use the **single-shot** map (``n_outer=1``): one Caffarelli-clean Monge–Ampère solve, untangled by construction (no folding), with no outer-iteration compounding and nothing to tune — the most robust configuration, and the comb's teeth give the single map all the row structure it needs (~``n_across``−1 even layers, centred). ``n_outer=2`` realises a touch more of the requested ``n_across`` (the single map is mildly node-budget-capped) at ~1.6× the cost; rarely needed. **Why a comb.** An equidistribution mover places node density ∝ √ρ, so a single peaked "refine-this-band" metric piles all the nodes at the maximum (finest at the fault, coarsening out — *graded*, not uniform). The comb instead places **discrete equal teeth at the exact distances where node layers are wanted** — ``d = 0, dx, 2 dx, …`` — so the mover drops one node **row at each tooth**: equal teeth ⇒ evenly-spaced rows ⇒ a roughly-uniform band of cell size ``dx``. The ``d=0`` tooth sits on the fault, so a layer is pinned to the line (this also *centres* the band, even for two close faults). Per fault the distance is the **min over its segments**, so a curved/polyline fault gets bands that follow the curve (offset curves), not a tangle of per-segment bands. The realised band is ~2.5:1 in cell size (the metric valleys between teeth still want to coarsen) — uniform *enough* for a slip rheology; a perfectly uniform band needs added nodes (h-adapt). .. math:: ρ(x) = 1 + A \sum_i \sum_{k=0}^{m} \exp\!\big(-((d_i(x) - k\,dx)/w)^2\big), teeth ``k = 0…⌊n_across/2⌋``, ``d_i`` the distance to fault ``i``. Parameters ---------- mesh : Mesh 2D mesh. (The isotropic equidistribution movers — ``ma``/``ot`` — are 2D-only; 3D would need a 3D equidistribution mover, which does not yet exist, so this builder is 2D-only.) faults : Surface | array | list Fault geometry in mesh coordinate space — a :class:`Surface`, an ``(N>=2, 2|3)`` polyline array, or a list mixing those. Each fault's band is built from its own min-distance (handles 1/2/3 faults, any orientation, straight or curved). cell_size : float ``dx`` — the tooth spacing = the target uniform across-fault cell size. n_across : int, default 4 Number of elements across each band; teeth fill the half-width ``(n_across/2)*cell_size`` on each side of the fault. amplitude : float, default 6.0 ``A`` — how strongly each tooth attracts a node row (contrast vs the unrefined background). ~6 is a good operating point. tooth_width : float, optional Gaussian half-width of each tooth. Default ``cell_size/4`` — narrow enough for distinct rows, wide enough to be resolvable on the starting mesh (``≲ cell_size/6`` can be sub-cell and fail to form a row). combine : {"sum", "max"}, default "sum" How to combine faults. ``"sum"`` (default) superposes the per-fault combs (fine for separated faults); ``"max"`` takes the strongest comb (cleaner when two faults are closer than a band width, avoiding doubled teeth in the gap). Returns ------- sympy.Expr The scalar comb metric ``ρ(x)``, to pass as ``metric=`` with ``method="ma"``. """ cdim = mesh.cdim if cdim != 2: raise NotImplementedError( "fault_comb_metric is 2D only (the isotropic equidistribution " "movers are 2D; 3D needs a 3D equidistribution mover)") dx = float(cell_size) if not (dx > 0.0): raise ValueError(f"cell_size must be positive; got {cell_size}") if combine not in ("sum", "max"): raise ValueError(f"combine must be 'sum' or 'max'; got {combine!r}") wn = float(tooth_width) if tooth_width is not None else dx / 4.0 nteeth = int(round(n_across / 2.0)) + 1 A = float(amplitude) polylines = _fault_collect_polylines(faults) if not polylines: raise ValueError("fault_comb_metric: no faults found") X = mesh.CoordinateSystem.X per_fault = [] for segs in polylines: d = None for (a, b) in segs: ds = _fault_seg_distance_sym(X, a, b) d = ds if d is None else sympy.Min(d, ds) # min-distance = the fault comb_i = sum(sympy.exp(-((d - k * dx) / wn) ** 2) for k in range(nteeth)) per_fault.append(comb_i) if combine == "sum": total = sum(per_fault, sympy.Integer(0)) else: # "max" total = per_fault[0] for ci in per_fault[1:]: total = sympy.Max(total, ci) return 1 + A * total def compose_metrics(metrics, compose="max"): r"""Combine several scalar density metrics into one, for the equidistribution mover (``method="ma"``). Each item may be either a metric (a scalar sympy expression or MeshVariable) or a ``(metric, weight)`` tuple. The default ``"max"`` composition is a **weighted maximum on the excess density** .. math:: \rho_{\mathrm{combined}}(x) = 1 + \max_i\; w_i\,\bigl(\rho_i(x) - 1\bigr), so equal weights reduce to plain ``max(ρ_i)`` ("refine to the finest demand from any feature") and larger ``w_i`` amplifies that feature's relative demand (the way to e.g. make a fault "heavier" than a thermal boundary layer in the same run). The result is itself a valid scalar density (``≥ 1``). Examples -------- :: rho_T = uw.meshing.metric_density_from_gradient(mesh, T, metric_choice="arc-length") rho_F = uw.meshing.fault_comb_metric(mesh, faults, cell_size=0.008) rho = uw.meshing.compose_metrics([(rho_T, 1.0), (rho_F, 3.0)]) # fault heavier uw.meshing.smooth_mesh_interior(mesh, metric=rho, method="ma", method_kwargs=dict(n_outer=1, n_picard=25)) Parameters ---------- metrics : sequence Items are scalar metrics or ``(metric, weight)`` tuples. compose : {"max"}, default "max" Composition operator. Only ``"max"`` (weighted-max-on-excess) is implemented; the kwarg exists to leave room for other strategies. Returns ------- sympy.Expr The composed scalar density. """ if compose != "max": raise ValueError( f"compose must be 'max' (got {compose!r}); other strategies " "are not implemented yet") pairs = [] for item in metrics: if isinstance(item, tuple) and len(item) == 2: m, w = item[0], float(item[1]) else: m, w = item, 1.0 if isinstance(m, sympy.MatrixBase): raise ValueError( "compose_metrics: only scalar density metrics compose by " "max; tensor metrics need metric intersection (not " "implemented). Got a sympy Matrix.") pairs.append((m, w)) if not pairs: raise ValueError("compose_metrics: at least one metric required") if len(pairs) == 1: m, w = pairs[0] return m if w == 1.0 else 1 + w * (m - 1) excess = [w * (m - 1) for (m, w) in pairs] return 1 + sympy.Max(*excess) def _mesh_h0(mesh): """Mean undeformed edge length (parallel-safe) — the mesh's characteristic cell size, used to translate an absolute ``cell_size`` into the anisotropic mover's relative refinement ratio.""" from underworld3.meshing.smoothing import _edge_pairs ep = _edge_pairs(mesh.dm) Xc = np.asarray(mesh.X.coords) if ep.shape[0]: _el = np.linalg.norm(Xc[ep[:, 1]] - Xc[ep[:, 0]], axis=1) _esum, _ecnt = float(_el.sum()), int(_el.shape[0]) else: _esum, _ecnt = 0.0, 0 if uw.mpi.size > 1: _esum = uw.mpi.comm.allreduce(_esum) _ecnt = uw.mpi.comm.allreduce(_ecnt) # TRUE global mean edge length (sum/count), not a mean of per-rank means. return (_esum / _ecnt) if _ecnt > 0 else 1.0 def _fault_min_distance_np(P, polylines): """Numpy min point-to-polyline distance from points ``P`` (k, 2) to all segments of all faults — used to build the nodal MMG metric.""" d = np.full(P.shape[0], np.inf) for segs in polylines: for (a, b) in segs: a = np.asarray(a, float); b = np.asarray(b, float) ab = b - a; ab2 = float(ab @ ab) if ab2 == 0.0: dd = np.linalg.norm(P - a, axis=1) else: t = np.clip(((P - a) @ ab) / ab2, 0.0, 1.0) dd = np.linalg.norm(P - (a + np.outer(t, ab)), axis=1) d = np.minimum(d, dd) return d def _fault_mmg_metric(mesh, faults, cell_size, n_across, h_far, name): """Build the ``h⁻²`` isotropic metric MeshVariable for ``mesh.adapt`` (MMG): edge length ``cell_size`` in the band ``|d| < (n_across/2)·dx``, ramping (smoothstep) to ``h_far`` outside. MMG *adds* nodes to honour this absolute spacing, so the band is genuinely uniform.""" dx = float(cell_size) D = (n_across / 2.0) * dx h_far = float(h_far) if h_far is not None else _mesh_h0(mesh) tau = max(D, 2.0 * dx) # transition width metric = uw.discretisation.MeshVariable( name, mesh, vtype=uw.VarType.SCALAR, degree=1, continuous=True) polylines = _fault_collect_polylines(faults) P = np.asarray(metric.coords)[:, :mesh.cdim] d = _fault_min_distance_np(P, polylines) xc = np.clip((d - D) / tau, 0.0, 1.0) ramp = 3.0 * xc ** 2 - 2.0 * xc ** 3 # 0 in band -> 1 far h = dx + (h_far - dx) * ramp metric.data[:, 0] = 1.0 / h ** 2 # isotropic h^-2 metric return metric def fault_metric(mesh, faults, method="ma", *, cell_size, n_across=4, h_far=None, name="fault_metric", **kwargs): r"""Build the fault-refinement metric appropriate for the chosen adaptation ``method``, from one shared physical intent: *resolve ``n_across`` elements of size ``cell_size`` across a band around the fault(s)*. The three movers consume **different metric objects with different semantics**, so this facade unifies the *intent* and emits the right representation — it does not pretend they are interchangeable: =================== ============================ =========================== ``method`` returns pass to =================== ============================ =========================== ``"ma"`` (default) scalar comb density (sympy) ``smooth_mesh_interior( method="ma")`` ``"anisotropic"`` 2×2 tensor (sympy Matrix) ``smooth_mesh_interior( method="anisotropic")`` ``"adapt"``/``"mmg"`` ``h⁻²`` MeshVariable ``mesh.adapt(...)`` =================== ============================ =========================== **``cell_size`` is honoured differently by each** — this is the key distinction, not a detail: * ``"adapt"`` (MMG) **adds nodes**, so ``cell_size`` is an *absolute*, *exact* target: you get a genuinely uniform band of that spacing (topology changes). * ``"ma"`` / ``"anisotropic"`` only **redistribute a fixed node budget** (topology preserved), so ``cell_size`` is a *target*: ``"ma"`` reaches a roughly-uniform ~2.5:1 band near it; ``"anisotropic"`` grades (finest at the fault), is the most node-efficient, and ``n_across`` is only indicative. Parameters ---------- mesh : Mesh (2D) faults : Surface | array | list Fault geometry (``Surface``, polyline array, or a list); passed through to the per-method builder. method : {"ma", "anisotropic", "adapt"/"mmg"}, default "ma" cell_size : float (keyword-only, required) Target across-fault cell size. Exact for ``adapt``; a target for the r-adapt methods. n_across : int, default 4 Elements across the band → band half-width ``(n_across/2)·cell_size``. h_far : float, optional ``adapt`` only — far-field edge length (default ≈ mesh cell size). name : str ``adapt`` only — name for the metric MeshVariable. **kwargs Forwarded to the underlying builder (e.g. ``amplitude``, ``tooth_width``, ``combine`` for ``ma``; ``base`` for ``anisotropic``). Returns ------- sympy.Expr | sympy.Matrix | MeshVariable The metric object for the chosen ``method`` (see table). Examples -------- :: # uniform-ish band, fixed topology (the slip-rheology recipe) rho = uw.meshing.fault_metric(mesh, faults, method="ma", cell_size=0.006, n_across=4) uw.meshing.smooth_mesh_interior(mesh, metric=rho, method="ma", method_kwargs=dict(n_outer=1, n_picard=25)) """ if cell_size is None or not (float(cell_size) > 0.0): raise ValueError("cell_size must be a positive number") m = method.strip().lower() if m in ("ma", "monge-ampere", "monge_ampere", "comb"): return fault_comb_metric(mesh, faults, cell_size=cell_size, n_across=n_across, **kwargs) if m in ("anisotropic", "aniso", "tensor"): # translate absolute intent -> relative refinement ratio + strip width R = max(_mesh_h0(mesh) / float(cell_size), 1.0) width = (n_across / 2.0) * float(cell_size) return fault_metric_tensor(mesh, faults, refinement=R, width=width, **kwargs) if m in ("adapt", "mmg", "h-adapt", "h_adapt"): return _fault_mmg_metric(mesh, faults, cell_size, n_across, h_far, name) raise ValueError( f"unknown method {method!r}; choose 'ma' (comb density, r-adapt), " "'anisotropic' (tensor, r-adapt) or 'adapt'/'mmg' (h^-2 MeshVariable, " "mesh.adapt, adds nodes)")